Source code for arq.worker

"""
:mod:`worker`
=============

Responsible for executing jobs on the worker.
"""
import asyncio
import logging
import os
import signal
import sys
import time
from datetime import datetime
from importlib import import_module, reload
from multiprocessing import Process
from signal import Signals
from typing import Dict, List, Type  # noqa

from async_timeout import timeout

from .drain import Drain
from .jobs import ArqError, Job
from .logs import default_log_config
from .main import Actor  # noqa
from .utils import RedisMixin, timestamp, truncate

__all__ = ['BaseWorker', 'RunWorkerProcess', 'StopJob', 'import_string']

work_logger = logging.getLogger('arq.work')
ctrl_logger = logging.getLogger('arq.control')
jobs_logger = logging.getLogger('arq.jobs')


class HandledExit(ArqError):
    pass


class ImmediateExit(ArqError):
    pass


class StopJob(ArqError):
    def __init__(self, reason: str='', warning: bool=False) -> None:
        self.warning = warning
        super().__init__(reason)


# special signal sent by the main process in case the worker process hasn't received a signal (eg. SIGTERM or SIGINT)
SIG_PROXY = signal.SIGUSR1


class BaseWorker(RedisMixin):
    """
    Base class for Workers to inherit from.
    """
    #: maximum number of jobs which can be executed at the same time by the event loop
    max_concurrent_tasks = 50

    #: number of seconds after a termination signal (SIGINT or SIGTERM) is received to force quit the worker
    shutdown_delay = 6

    #: default maximum duration of a job
    timeout_seconds = 60

    #: shadow classes, a list of Actor classes for the Worker to run
    shadows = None

    #: number of seconds between calls to health_check
    health_check_interval = 60
    health_check_key = b'arq:health-check'

    #: Mostly used in tests; if true actors and the redis pool will not be closed at the end of run()
    # allowing reuse of the worker, eg. ``worker.run()`` can be called multiple times.
    reusable = False

    #: length at which arguments of concurrent functions and job results are curtailed
    # when logging job execution
    log_curtail = 80

    #: whether or not to repeat health check log messages even when nothing has changed
    repeat_health_check_logs = False

    drain_class = Drain

    _shadow_factory_timeout = 10

    def __init__(self, *,
                 burst: bool=False,
                 shadows: list=None,
                 queues: list=None,
                 timeout_seconds: int=None,
                 **kwargs) -> None:
        """
        :param burst: if true the worker will close as soon as no new jobs are found in the queue lists
        :param shadows: list of :class:`arq.main.Actor` classes for the worker to run,
          overrides shadows already defined in the class definition
        :param queues: list of queue names for the worker to listen on, if None queues is taken from the shadows
        :param timeout_seconds: maximum duration of a job, after that the job will be cancelled by the event loop
        :param re_queue: whether or not to re-queue jobs if the worker quits before the job has time to finish
        :param kwargs: other keyword arguments, see :class:`arq.utils.RedisMixin` for all available options
        """
        self._burst_mode = burst
        self.shadows = shadows or self.shadows
        self.queues: List[str] = queues
        self.timeout_seconds = timeout_seconds or self.timeout_seconds
        self._shadow_lookup: Dict[str, Actor] = {}
        self.start: float = None
        self.last_health_check = 0
        self._last_health_check_log = None
        self._closed = False
        self.drain: Drain = None
        self.job_class: Type[Job] = None
        signal.signal(signal.SIGINT, self.handle_sig)
        signal.signal(signal.SIGTERM, self.handle_sig)
        signal.signal(SIG_PROXY, self.handle_proxy_signal)
        super().__init__(**kwargs)
        self._shutdown_lock = asyncio.Lock(loop=self.loop)
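
    # Usage sketch (illustrative, not part of this module): a minimal worker definition;
    # ``Downloader`` stands in for any :class:`arq.main.Actor` subclass defined elsewhere.
    #
    #     class Worker(BaseWorker):
    #         shadows = [Downloader]
    #         timeout_seconds = 30
    #
    #     # run in burst mode until the queues are empty, eg. in tests
    #     worker = Worker(burst=True)
    #     worker.run_until_complete()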

    async def shadow_factory(self) -> list:
        """
        Initialise list of shadows and return them.

        Override to customise the way shadows are initialised.
        """
        if self.shadows is None:
            raise TypeError('shadows not defined on worker')
        kwargs = await self.shadow_kwargs()
        shadows = [s(**kwargs) for s in self.shadows]
        await asyncio.gather(*[s.startup() for s in shadows], loop=self.loop)
        return shadows
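
    # Example override (a sketch): build shadows by hand when they need per-worker setup;
    # ``Downloader`` and ``Emailer`` are assumed Actor subclasses, not part of this module.
    #
    #     class CustomWorker(BaseWorker):
    #         async def shadow_factory(self) -> list:
    #             kwargs = await self.shadow_kwargs()
    #             shadows = [Downloader(**kwargs), Emailer(**kwargs)]
    #             await asyncio.gather(*[s.startup() for s in shadows], loop=self.loop)
    #             return shadows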

    async def shadow_kwargs(self):
        """
        Prepare the keyword arguments for initialising all shadows.

        Override to customise the kwargs used to initialise shadows.
        """
        return dict(
            redis_settings=self.redis_settings,
            worker=self,
            loop=self.loop,
            existing_pool=await self.get_redis_pool(),
        )
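
    # Example override (a sketch): inject an extra keyword argument into every shadow;
    # ``settings`` and ``load_settings`` are hypothetical and only make sense if your actors accept them.
    #
    #     class CustomWorker(BaseWorker):
    #         async def shadow_kwargs(self):
    #             kwargs = await super().shadow_kwargs()
    #             kwargs['settings'] = load_settings()  # hypothetical helper
    #             return kwargs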

    @classmethod
    def logging_config(cls, verbose) -> dict:
        """
        Override to customise the logging setup for the arq worker.
        By default just uses :func:`arq.logs.default_log_config`.

        :param verbose: verbose flag from cli, by default log level is INFO if false and DEBUG if true
        :return: dict suitable for ``logging.config.dictConfig``
        """
        return default_log_config(verbose)
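
    # Example override (a sketch): tweak the default config before it's applied; this assumes
    # the dict returned by default_log_config defines an 'arq.jobs' entry under 'loggers'.
    #
    #     class QuietWorker(BaseWorker):
    #         @classmethod
    #         def logging_config(cls, verbose) -> dict:
    #             config = default_log_config(verbose)
    #             config['loggers']['arq.jobs']['level'] = 'WARNING'
    #             return config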

    @property
    def shadow_names(self):
        return ', '.join(self._shadow_lookup.keys())

    def get_redis_queues(self):
        q_lookup = {}
        for s in self._shadow_lookup.values():
            q_lookup.update(s.queue_lookup)
        try:
            queues = [(q_lookup[q], q) for q in self.queues]
        except KeyError as e:
            raise KeyError(f'queue not found in queue lookups from shadows, '
                           f'queues: {self.queues}, combined shadow queue lookups: {q_lookup}') from e
        return [r for r, q in queues], dict(queues)

    def run_until_complete(self, log_redis_version=False):
        self.loop.run_until_complete(self.run(log_redis_version))

    async def run(self, log_redis_version=False):
        """
        Main entry point for the worker: initialises shadows, checks they look ok,
        then runs ``work`` to perform jobs.
        """
        if log_redis_version:
            await self.log_redis_info(work_logger.info)
        self._stopped = False
        work_logger.info('Initialising work manager, burst mode: %s, creating shadows...', self._burst_mode)

        with timeout(self._shadow_factory_timeout):
            shadows = await self.shadow_factory()
        assert isinstance(shadows, list), 'shadow_factory should return a list not %s' % type(shadows)

        self.job_class = shadows[0].job_class
        work_logger.debug('Using first shadow\'s job class "%s"', self.job_class.__name__)
        for shadow in shadows[1:]:
            if shadow.job_class != self.job_class:
                raise TypeError(f"shadows don't match: {shadow} has a different job class to the first shadow, "
                                f"{shadow.job_class} != {self.job_class}")

        if not self.queues:
            self.queues = shadows[0].queues
            for shadow in shadows[1:]:
                if shadow.queues != self.queues:
                    raise TypeError(f'{shadow} has a different list of queues to the first shadow: '
                                    f'{shadow.queues} != {self.queues}')

        self._shadow_lookup = {w.name: w for w in shadows}

        work_logger.info('Running worker with %s shadow%s listening to %d queues',
                         len(self._shadow_lookup), '' if len(self._shadow_lookup) == 1 else 's', len(self.queues))
        work_logger.info('shadows: %s | queues: %s', self.shadow_names, ', '.join(self.queues))

        self.drain = self.drain_class(
            redis_pool=await self.get_redis_pool(),
            max_concurrent_tasks=self.max_concurrent_tasks,
            shutdown_delay=self.shutdown_delay - 1,
            timeout_seconds=self.timeout_seconds,
            burst_mode=self._burst_mode,
        )
        self.start = timestamp()
        try:
            await self.work()
        finally:
            t = (timestamp() - self.start) if self.start else 0
            work_logger.info('shutting down worker after %0.3fs ◆ %d jobs done ◆ %d failed ◆ %d timed out',
                             t, self.jobs_complete, self.jobs_failed, self.jobs_timed_out)
            if not self.reusable:
                await self.close()
            if self.drain.task_exception:
                work_logger.error('Found task exception "%s"', self.drain.task_exception)
                raise self.drain.task_exception

    async def work(self):
        """
        Pop job definitions from the lists associated with the queues and perform the jobs.

        Also regularly runs ``record_health``.
        """
        redis_queues, queue_lookup = self.get_redis_queues()
        original_redis_queues = list(redis_queues)
        async with self.drain:
            await self.heart_beat(original_redis_queues, queue_lookup)
            async for raw_queue, raw_data in self.drain.iter(*redis_queues):
                if raw_queue is not None:
                    job = self.job_class(raw_data, queue_name=queue_lookup[raw_queue], raw_queue=raw_queue)
                    shadow = self._shadow_lookup.get(job.class_name)
                    re_enqueue = shadow and getattr(shadow, 're_enqueue_jobs', False)
                    work_logger.debug('scheduling job %r, re-enqueue: %r', job, re_enqueue)
                    self.drain.add(self.run_job, job, re_enqueue)
                await self.heart_beat(original_redis_queues, queue_lookup)

    async def heart_beat(self, redis_queues, queue_lookup):
        await self.record_health(redis_queues, queue_lookup)
        for shadow in self._shadow_lookup.values():
            await shadow.run_cron()

    async def record_health(self, redis_queues, queue_lookup):
        now_ts = timestamp()
        if (now_ts - self.last_health_check) < self.health_check_interval:
            return
        self.last_health_check = now_ts
        pending_tasks = sum(not t.done() for t in self.drain.pending_tasks)
        info = (
            f'{datetime.now():%b-%d %H:%M:%S} j_complete={self.jobs_complete} j_failed={self.jobs_failed} '
            f'j_timedout={self.jobs_timed_out} j_ongoing={pending_tasks}'
        )
        for redis_queue in redis_queues:
            info += ' q_{}={}'.format(queue_lookup[redis_queue], await self.drain.redis.llen(redis_queue))
        await self.drain.redis.setex(self.health_check_key, self.health_check_interval + 1, info.encode())
        log_suffix = info[info.index('j_complete='):]
        if self.repeat_health_check_logs or log_suffix != self._last_health_check_log:
            jobs_logger.info('recording health: %s', info)
            self._last_health_check_log = log_suffix

    async def _check_health(self):
        r = 1
        async with await self.get_redis_conn() as redis:
            data = await redis.get(self.health_check_key)
            if not data:
                work_logger.warning('Health check failed: no health check sentinel value found')
            else:
                work_logger.info('Health check successful: %s', data.decode())
                r = 0
        # only need to close redis not deal with queues etc., hence super close
        await super().close()
        return r

    @classmethod
    def check_health(cls, **kwargs):
        """
        Run a health check on the worker and return the appropriate exit code.

        :return: 0 if successful, 1 if not
        """
        self = cls(**kwargs)
        return self.loop.run_until_complete(self._check_health())
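
    # Usage sketch: suitable for a deployment liveness probe; ``Worker`` is the
    # application's BaseWorker subclass and any kwargs are forwarded to it.
    #
    #     sys.exit(Worker.check_health())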

    async def run_job(self, j: Job):
        try:
            shadow = self._shadow_lookup[j.class_name]
        except KeyError:
            return self.handle_prepare_exc(f'Job Error: unable to find shadow for {j!r}')
        try:
            func = getattr(shadow, j.func_name)
        except AttributeError:
            return self.handle_prepare_exc(f'Job Error: shadow class "{shadow.name}" has no function "{j.func_name}"')

        try:
            func = func.direct
        except AttributeError:
            # allow for cases where enqueue_job is called manually
            pass

        started_at = timestamp()
        self.log_job_start(started_at, j)
        try:
            result = await func(*j.args, **j.kwargs)
        except StopJob as e:
            self.handle_stop_job(started_at, e, j)
            return 0
        except BaseException as e:
            self.handle_execute_exc(started_at, e, j)
            return 1
        else:
            self.log_job_result(started_at, result, j)
            return 0

    def log_job_start(self, started_at: float, j: Job):
        if jobs_logger.isEnabledFor(logging.INFO):
            job_str = j.to_string(self.log_curtail)
            jobs_logger.info('%-4s queued%7.3fs → %s', j.queue, started_at - j.queued_at, job_str)

    def log_job_result(self, started_at: float, result, j: Job):
        if not jobs_logger.isEnabledFor(logging.INFO):
            return
        job_time = timestamp() - started_at
        sr = '' if result is None else truncate(repr(result), self.log_curtail)
        jobs_logger.info('%-4s ran in%7.3fs ← %s%s', j.queue, job_time, j.short_ref(), sr)

    def handle_prepare_exc(self, msg: str):
        self.drain.jobs_failed += 1
        jobs_logger.error(msg)
        # exit with zero so we don't increment jobs_failed twice
        return 0

    @classmethod
    def handle_stop_job(cls, started_at: float, exc: StopJob, j: Job):
        if exc.warning:
            msg, logger = '%-4s ran in%7.3fs ■ %s ● Stopped Warning %s', jobs_logger.warning
        else:
            msg, logger = '%-4s ran in%7.3fs ■ %s ● Stopped %s', jobs_logger.info
        logger(msg, j.queue, timestamp() - started_at, j.short_ref(), exc)

    @classmethod
    def handle_execute_exc(cls, started_at: float, exc: BaseException, j: Job):
        exc_type = exc.__class__.__name__
        jobs_logger.exception('%-4s ran in%7.3fs ! %s: %s', j.queue, timestamp() - started_at, j, exc_type)

    async def close(self):
        if not self._closed:
            if self._shadow_lookup:
                await asyncio.gather(*[s.close(True) for s in self._shadow_lookup.values()], loop=self.loop)
            await super().close()
            self._closed = True

    def handle_proxy_signal(self, signum, frame):
        self.running = False
        work_logger.info('pid=%d, got signal proxied from main process, stopping...', os.getpid())
        signal.signal(signal.SIGINT, self.handle_sig_force)
        signal.signal(signal.SIGTERM, self.handle_sig_force)
        signal.signal(signal.SIGALRM, self.handle_sig_force)
        signal.alarm(self.shutdown_delay)
        raise HandledExit()

    def handle_sig(self, signum, frame):
        self.running = False
        work_logger.info('pid=%d, got signal: %s, stopping...', os.getpid(), Signals(signum).name)
        signal.signal(SIG_PROXY, signal.SIG_IGN)
        signal.signal(signal.SIGINT, self.handle_sig_force)
        signal.signal(signal.SIGTERM, self.handle_sig_force)
        signal.signal(signal.SIGALRM, self.handle_sig_force)
        signal.alarm(self.shutdown_delay)
        raise HandledExit()

    def handle_sig_force(self, signum, frame):
        work_logger.warning('pid=%d, got signal: %s again, forcing exit', os.getpid(), Signals(signum).name)
        raise ImmediateExit('force exit')

    jobs_complete = property(lambda self: self.drain.jobs_complete)
    jobs_failed = property(lambda self: self.drain.jobs_failed)
    jobs_timed_out = property(lambda self: self.drain.jobs_timed_out)

    @property
    def running(self):
        return self.drain and self.drain.running

    @running.setter
    def running(self, v):
        if self.drain:
            self.drain.running = v


def import_string(file_path, attr_name):
    """
    Import an attribute/class from a python module. Raise ``ImportError`` if the import fails.

    Approximately stolen from django.

    :param file_path: path to python module
    :param attr_name: attribute to get from the module
    :return: attribute
    """
    module_path = file_path.replace('.py', '').replace('/', '.')
    p = os.getcwd()
    sys.path = [p] + sys.path
    module = import_module(module_path)
    reload(module)

    try:
        attr = getattr(module, attr_name)
    except AttributeError as e:
        raise ImportError('Module "%s" does not define a "%s" attribute/class' % (module_path, attr_name)) from e
    return attr
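
# Usage sketch: load a worker class from a file path, as the arq cli does;
# 'demo.py' and 'Worker' are placeholder names.
#
#     worker_cls = import_string('demo.py', 'Worker')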


def start_worker(worker_path: str, worker_class: str, burst: bool, loop: asyncio.AbstractEventLoop=None):
    """
    Run from within the subprocess to load the worker class and execute jobs.

    :param worker_path: full path to the python file containing the worker definition
    :param worker_class: name of the worker class to be loaded and used
    :param burst: whether or not to run in burst mode
    :param loop: asyncio loop to use, or None
    """
    worker_cls = import_string(worker_path, worker_class)
    work_logger.info('Starting "%s" on pid=%d', worker_cls.__name__, os.getpid())
    worker = worker_cls(burst=burst, loop=loop)
    try:
        worker.run_until_complete(log_redis_version=True)
    except HandledExit:
        work_logger.debug('worker exited with well handled exception')
    except Exception as e:
        work_logger.exception('Worker exiting after an unhandled error: %s', e.__class__.__name__)
        # could raise here instead of sys.exit but that causes the traceback to be printed twice,
        # if it's needed "raise_exc" would need to be added as a new argument to the function
        sys.exit(1)
    finally:
        worker.loop.run_until_complete(worker.close())
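
# Usage sketch: start_worker is normally run inside the child process created by
# RunWorkerProcess, but it can be called directly for a one-off burst run
# ('demo.py' and 'Worker' are placeholder names):
#
#     start_worker('demo.py', 'Worker', burst=True)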


class RunWorkerProcess:
    """
    Responsible for starting a process to run the worker, monitoring it and possibly killing it.
    """
    def __init__(self, worker_path, worker_class, burst=False):
        signal.signal(signal.SIGINT, self.handle_sig)
        signal.signal(signal.SIGTERM, self.handle_sig)
        self.process = None
        self.run_worker(worker_path, worker_class, burst)

    def run_worker(self, worker_path, worker_class, burst):
        name = 'WorkProcess'
        ctrl_logger.info('starting work process "%s"', name)
        self.process = Process(target=start_worker, args=(worker_path, worker_class, burst), name=name)
        self.process.start()
        self.process.join()
        if self.process.exitcode == 0:
            ctrl_logger.info('worker process exited ok')
            return
        ctrl_logger.critical('worker process %s exited badly with exit code %s',
                             self.process.pid, self.process.exitcode)
        sys.exit(3)
        # could restart worker here, but better to leave it up to the real manager eg. docker restart: always

    def handle_sig(self, signum, frame):
        signal.signal(signal.SIGINT, self.handle_sig_force)
        signal.signal(signal.SIGTERM, self.handle_sig_force)
        ctrl_logger.info('got signal: %s, waiting for worker pid=%s to finish...',
                         Signals(signum).name, self.process and self.process.pid)
        # sleep to make sure worker.handle_sig above has executed if it's going to and detached handle_proxy_signal
        time.sleep(0.01)
        if self.process and self.process.is_alive():
            ctrl_logger.debug("sending custom shutdown signal to worker in case it didn't receive the signal")
            os.kill(self.process.pid, SIG_PROXY)

    def handle_sig_force(self, signum, frame):
        ctrl_logger.warning('got signal: %s again, forcing exit', Signals(signum).name)
        if self.process and self.process.is_alive():
            ctrl_logger.error('sending worker %d SIGTERM', self.process.pid)
            os.kill(self.process.pid, signal.SIGTERM)
        raise ImmediateExit('force exit')
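
# Usage sketch: supervise a worker subprocess from a small launcher script; this mirrors
# what the arq cli does ('demo.py' and 'Worker' are placeholder names).
#
#     if __name__ == '__main__':
#         RunWorkerProcess('demo.py', 'Worker', burst=False)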