
    tni%                         d Z ddlZddlZddlZddlZddlmZmZ ddlm	Z	m
Z
mZ ddlmZmZ ddlmZ dd	lmZmZ  e            Zd
 ZdedefdZ G d d          ZdS )zh
runpod | serverless | rp_scale.py
Provides the functionality for scaling the runpod serverless worker.
    N)AnyDict   )AsyncClientSessionClientSessionTooManyRequests   )get_job
handle_job)RunPodLogger)JobsProgressIS_LOCAL_TESTc                 l    t          j        | ||          }t                              d|            d S )NzUncaught exception | )	tracebackformat_exceptionlogerror)exc_type	exc_valueexc_tracebackexcs       _/var/www/html/gpu-tools/venv/lib/python3.11/site-packages/runpod/serverless/modules/rp_scale.py_handle_uncaught_exceptionr      s7    

$Xy-
H
HCII+c++,,,,,    current_concurrencyreturnc                     | S )z
    Default concurrency modifier.

    This function returns the current concurrency without any modification.

    Args:
        current_concurrency (int): The current concurrency.

    Returns:
        int: The current concurrency.
     )r   s    r   _default_concurrency_modifierr      s
     r   c                       e Zd ZdZdeeef         fdZd Zd Z	d Z
d Zd Zd	 Zd
efdZdefdZdefdZdedefdZdS )	JobScalerz^
    Job Scaler. This class is responsible for scaling the number of concurrent requests.
    configc                 0   t          j                    | _        d| _        || _        t                      | _        t          j        | j                  | _        t          | _
        t          | _        d| _        t          | _        |                    d          x}r|| _
        t"          sd S | j                            d          x}r|| _        | j                            d          x}r|| _        | j                            d          x}r	|| _        d S d S )Nr	   maxsizeZ   concurrency_modifierjobs_fetcherjobs_fetcher_timeoutjobs_handler)asyncioEvent_shutdown_eventr   r"   r   job_progressQueue
jobs_queuer   r'   r
   r(   r)   r   r*   getr   )selfr"   r'   r(   r)   r*   s         r   __init__zJobScaler.__init__-   s   &}#$ (NN!-0HIII$A!#$&!&#)::.D#E#EE 	=(<D% 	F;??>:::< 	- ,D#';??3I#J#JJ 	=(<D%;??>:::< 	- ,D	- 	-r   c                 l  K   |                      | j                  | _        | j        r| j        | j        j        k    rd S |                                 dk    rt          j        d           d {V  3t          j        | j                  | _        t          	                    d| j                    d S )Nr   r	   r$   z.JobScaler.set_scale | New concurrency set to: )
r'   r   r0   r%   current_occupancyr+   sleepr/   r   debugr2   s    r   	set_scalezJobScaler.set_scaleJ   s      #'#<#<T=U#V#V ? 	 8DO<S S SF$$&&**-"""""""""!-0HIII		WT=UWW	
 	
 	
 	
 	
r   c                 \   t           t          _        	 t          j        t          j        | j                   t          j        t          j        | j                   n*# t          $ r t          	                    d           Y nw xY wt          j        |                                            dS )z
        This is required for the worker to be able to shut down gracefully
        when the user sends a SIGTERM or SIGINT signal. This is typically
        the case when the worker is running in a container.
        z5Signal handling is only supported in the main thread.N)r   sys
excepthooksignalSIGTERMhandle_shutdownSIGINT
ValueErrorr   warningr+   runr8   s    r   startzJobScaler.start[   s     4	QM&.$*>???M&-)=>>>> 	Q 	Q 	QKKOPPPPP	Q
 	DHHJJs   AA $BBc                 j    t                               d| d           |                                  dS )a  
        Called when the worker is signalled to shut down.

        This function is called when the worker receives a signal to shut down, such as
        SIGTERM or SIGINT. It sets the shutdown event, which will cause the worker to
        exit its main loop and shut down gracefully.

        Args:
            signum: The signal number that was received.
            frame: The current stack frame.
        zReceived shutdown signal: .N)r   r7   kill_worker)r2   signumframes      r   r?   zJobScaler.handle_shutdownn   s:     			8v888999r   c                 H  K   t                      4 d {V }t          j        |                     |                    }t          j        |                     |                    }||g}t          j        |  d {V  d d d           d {V  d S # 1 d {V swxY w Y   d S )N)r   r+   create_taskget_jobsrun_jobsgather)r2   sessionjobtake_taskjobrun_tasktaskss        r   rC   zJobScaler.run}   s0     %'' 	) 	) 	) 	) 	) 	) 	)7".t}}W/E/EFFL!-dmmG.D.DEEK!;/E .%((((((((	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	)s   A'B
BBc                 6    | j                                          S )z<
        Return whether the worker is alive or not.
        )r-   is_setr8   s    r   is_alivezJobScaler.is_alive   s     '..0000r   c                 l    t                               d           | j                                         dS )z-
        Whether to kill the worker.
        zKill worker.N)r   r7   r-   setr8   s    r   rG   zJobScaler.kill_worker   s1     			.!!!  """""r   r   c                     | j                                         }| j                                        }t                              d| j         d| d|            ||z   S )Nz JobScaler.status | concurrency: z	; queue: z; progress: )r0   qsizer.   get_job_countr   r7   r   )r2   current_queue_countcurrent_progress_counts      r   r5   zJobScaler.current_occupancy   s    "o3355!%!2!@!@!B!B		 Lt/G  L  LRe  L  L  tJ  L  L	
 	
 	
 &(;;;r   rO   c           	      &  K   |                                  r|                                  d{V  | j        |                                 z
  }|dk    r5t                              d           t          j        d           d{V  	 t                              d           t          j        | 	                    ||          | j
                   d{V }|s7t                              d           	 t          j        d           d{V  |D ]]}| j                            |           d{V  | j                            |           t                              d|d	                    ^t                              d
| j                                                    n# t"          $ r7 t                              d           t          j        d           d{V  Y nt          j        $ r t                              d            t          j        $ r t                              d           Y nt(          $ r(}t                              d| d           Y d}~nYd}~wt*          $ rI}t                              dt/          |          j         dt3          |                      Y d}~nd}~ww xY wt          j        d           d{V  n# t          j        d           d{V  w xY w|                                  dS dS )z
        Retrieve multiple jobs from the server in batches using blocking requests.

        Runs the block in an infinite loop while the worker is alive.

        Adds jobs to the JobsQueue
        Nr   z2JobScaler.get_jobs | Queue is full. Retrying soon.r	   z.JobScaler.get_jobs | Starting job acquisition.)timeoutz&JobScaler.get_jobs | No jobs acquired.z
Job QueuedidzJobs in queue: z?JobScaler.get_jobs | Too many requests. Debounce for 5 seconds.   z+JobScaler.get_jobs | Request was cancelled.z9JobScaler.get_jobs | Job acquisition timed out. Retrying.z'JobScaler.get_jobs | Unexpected error: rF   z!Failed to get job. | Error Type: z | Error Message: )rU   r9   r   r5   r   r7   r+   r6   wait_forr(   r)   r0   putr.   addinforY   r   CancelledErrorTimeoutError	TypeError	Exceptionr   type__name__str)r2   rO   jobs_neededacquired_jobsjobr   s         r   rL   zJobScaler.get_jobs   s      mmoo /	'.."""""""""2T5K5K5M5MMKa		NOOOmA&&&&&&&&&&'		JKKK '.&6%%g{;; 5' ' ' ! ! ! ! ! !
 % IIFGGG6 mA&&&&&&&&&&3 ) 7 7C/--c222222222%))#...IIlCI6666D4?+@+@+B+BDDEEEE" ' ' '		U   mA&&&&&&&&&&&)   		GHHH' W W W		UVVVVV N N N		LELLLMMMMMMMM   		lU8Lll`cdi`j`jll        mA&&&&&&&&&&gmA&&&&&&&&&&_ mmoo /	' /	' /	' /	' /	'sc   
A+F( BF( &K (>J=&K (AJ=9K ;	J=I'"K 'J=4?J83K 8J==K K7c                 h  K   g }|                                  s| j                                        sjt          |          | j        k     r| j                                        s| j                                         d{V }t          j        |                     ||                    }|	                    |           t          |          | j        k     r| j                                        |rat                              dt          |                      t          j        |t          j                   d{V \  }fd|D             }t          j        d           d{V  |                                  P| j                                        jt          j        |  d{V  dS )z
        Retrieve jobs from the jobs queue and process them concurrently.

        Runs the block in an infinite loop while the worker is alive or jobs queue is not empty.
        NzJobs in progress: )return_whenc                     g | ]}|v|	S r   r   ).0tdones     r   
<listcomp>z&JobScaler.run_jobs.<locals>.<listcomp>   s    ;;;qQd]]]]]r   r   )rU   r0   emptylenr   r1   r+   rK   r   appendr   rd   waitFIRST_COMPLETEDr6   rN   )r2   rO   rR   rn   taskpendingrt   s         @r   rM   zJobScaler.run_jobs   s      mmoo 	#T_%:%:%<%< 	#e**t777@U@U@W@W7 O//11111111 *4??7C+H+HIIT""" e**t777@U@U@W@W7  <:c%jj::;;;&-lw'>' ' ' ! ! ! ! ! !g
 <;;;E;;; -"""""""""+ mmoo 	#T_%:%:%<%< 	#0 ne$$$$$$$$$$r   rn   c                   K   	 t                               d|d                    |                     || j        |           d{V  | j                            dd          r|                                  n8# t          $ r+}t                               d| |d                    |d}~ww xY w| j        	                                 | j
                            |           t                               d|d                    dS # | j        	                                 | j
                            |           t                               d|d                    w xY w)za
        Process an individual job. This function is run concurrently for multiple jobs.
        zHandling Jobr_   Nrefresh_workerFzError handling job: zFinished Job)r   r7   r*   r"   r1   rG   rh   r   r0   	task_doner.   remove)r2   rO   rn   errs       r   r   zJobScaler.handle_job   s[     	1IInc$i000##GT[#>>>>>>>>>{/77 #  """ 	 	 	II2S22CI>>>I	 O%%''' $$S)))IInc$i00000 O%%''' $$S)))IInc$i0000s+   A2A7 6D 7
B,&B''B,,D AEN)rj   
__module____qualname____doc__r   rk   r   r3   r9   rD   r?   rC   rU   rG   intr5   r   rL   rM   dictr   r   r   r   r!   r!   (   s        -tCH~ - - - -:
 
 
"     &  
) 
) 
)1 1 1# # #<3 < < < <7'm 7' 7' 7' 7'r %m  %  %  %  %D1 1D 1 1 1 1 1 1r   r!   )r   r+   r=   r;   r   typingr   r   http_clientr   r   r   rp_jobr
   r   	rp_loggerr   worker_stater   r   r   r   r   r   r!   r   r   r   <module>r      s(   
   



             M M M M M M M M M M ' ' ' ' ' ' ' ' # # # # # # 5 5 5 5 5 5 5 5lnn- - -
s s    i1 i1 i1 i1 i1 i1 i1 i1 i1 i1r   