Source code for arq.jobs

import asyncio
import logging
import pickle
import warnings
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, Optional, Tuple

from redis.asyncio import Redis

from .constants import abort_jobs_ss, default_queue_name, in_progress_key_prefix, job_key_prefix, result_key_prefix
from .utils import ms_to_datetime, poll, timestamp_ms

# module-level logger used by the (de)serialization helpers in this module
logger = logging.getLogger('arq.jobs')

# signature of a custom serializer: receives the job/result dict and returns its encoded bytes
Serializer = Callable[[Dict[str, Any]], bytes]
# signature of a custom deserializer: receives encoded bytes and returns the job/result dict
Deserializer = Callable[[bytes], Dict[str, Any]]


class ResultNotFound(RuntimeError):
    """
    Raised when a job result is requested but no result is stored and the job is not in the queue.
    """


class JobStatus(str, Enum):
    """
    Enum of job statuses.

    Every member is also a ``str``, so it compares equal to its plain string value.
    """

    #: job is in the queue, time it should be run not yet reached
    deferred = 'deferred'

    #: job is in the queue, time it should run has been reached
    queued = 'queued'

    #: job is in progress
    in_progress = 'in_progress'

    #: job is complete, result is available
    complete = 'complete'

    #: job not found in any way
    not_found = 'not_found'
@dataclass
class JobDef:
    """
    Description of a job: which function to call, with which arguments, and its queue metadata.
    """

    # name of the function to run
    function: str
    # positional arguments for the function
    args: Tuple[Any, ...]
    # keyword arguments for the function
    kwargs: Dict[str, Any]
    # attempt counter for the job
    job_try: int
    # when the job was enqueued
    enqueue_time: datetime
    # score of the job in the queue, ``None`` when unknown / not queued
    score: Optional[int]

    def __post_init__(self) -> None:
        # a score supplied as a float is truncated to an int so the attribute matches its annotation
        raw_score = self.score
        if isinstance(raw_score, float):
            self.score = int(raw_score)


@dataclass
class JobResult(JobDef):
    """
    A :class:`JobDef` extended with the outcome of running the job.
    """

    # whether the job finished successfully
    success: bool
    # the job's return value (or the stored failure value when ``success`` is false)
    result: Any
    # when execution started
    start_time: datetime
    # when execution finished
    finish_time: datetime
    # name of the queue the job ran on
    queue_name: str
    # id of the job, if known
    job_id: Optional[str] = None
class Job:
    """
    Holds data about, and a reference to, a job.
    """

    __slots__ = 'job_id', '_redis', '_queue_name', '_deserializer'

    def __init__(
        self,
        job_id: str,
        redis: 'Redis[bytes]',
        _queue_name: str = default_queue_name,
        _deserializer: Optional[Deserializer] = None,
    ):
        """
        :param job_id: id of the job this object refers to
        :param redis: redis connection used for every lookup
        :param _queue_name: name of the sorted set the job is queued in
        :param _deserializer: custom deserializer for stored jobs/results, pickle is used when ``None``
        """
        self.job_id = job_id
        self._redis = redis
        self._queue_name = _queue_name
        self._deserializer = _deserializer

    async def result(
        self, timeout: Optional[float] = None, *, poll_delay: float = 0.5, pole_delay: Optional[float] = None
    ) -> Any:
        """
        Get the result of the job or, if the job raised an exception, reraise it.

        This function waits for the result if it's not yet available and the job is
        present in the queue. Otherwise ``ResultNotFound`` is raised.

        :param timeout: maximum time to wait for the job result before raising ``TimeoutError``,
            will wait forever on None
        :param poll_delay: how often to poll redis for the job result
        :param pole_delay: deprecated, use poll_delay instead
        :return: the value returned by the job
        :raises ResultNotFound: if there is no stored result and the job is not in the queue
        :raises SerializationError: if the job failed and the stored result is not an exception
        :raises asyncio.TimeoutError: if ``timeout`` is exceeded before a result is available
        """
        if pole_delay is not None:
            warnings.warn(
                '"pole_delay" is deprecated, use the correct spelling "poll_delay" instead', DeprecationWarning
            )
            poll_delay = pole_delay

        async for delay in poll(poll_delay):
            # Read the result and the queue score in one transaction: issuing them as two separate
            # commands would let a job finish between the reads and be misreported as missing.
            async with self._redis.pipeline(transaction=True) as tr:
                tr.get(result_key_prefix + self.job_id)  # type: ignore[unused-coroutine]
                tr.zscore(self._queue_name, self.job_id)  # type: ignore[unused-coroutine]
                v, s = await tr.execute()

            if v:
                info = deserialize_result(v, deserializer=self._deserializer)
                if info.success:
                    return info.result
                elif isinstance(info.result, (Exception, asyncio.CancelledError)):
                    # CancelledError is listed explicitly because it does not inherit from
                    # Exception on python 3.8+
                    raise info.result
                else:
                    raise SerializationError(info.result)
            elif s is None:
                raise ResultNotFound(
                    'Not waiting for job result because the job is not in queue. '
                    'Is the worker function configured to keep result?'
                )

            if timeout is not None and delay > timeout:
                raise asyncio.TimeoutError()

    async def info(self) -> Optional[JobDef]:
        """
        All information on a job, including its result if it's available, does not wait for the result.

        :return: a ``JobResult`` if a result is stored, otherwise a ``JobDef`` if the job definition
            is stored, otherwise ``None``
        """
        info: Optional[JobDef] = await self.result_info()
        if not info:
            # no result stored, fall back to the job definition
            v = await self._redis.get(job_key_prefix + self.job_id)
            if v:
                info = deserialize_job(v, deserializer=self._deserializer)
        if info:
            # the deserializers always set score=None, fill it in from the queue
            s = await self._redis.zscore(self._queue_name, self.job_id)
            info.score = None if s is None else int(s)
        return info

    async def result_info(self) -> Optional[JobResult]:
        """
        Information about the job result if available, does not wait for the result. Does not raise an exception
        even if the job raised one.

        :return: the stored ``JobResult`` or ``None`` if no result is stored
        """
        v = await self._redis.get(result_key_prefix + self.job_id)
        if v:
            return deserialize_result(v, deserializer=self._deserializer)
        else:
            return None

    async def status(self) -> JobStatus:
        """
        Status of the job.

        :return: ``complete`` if a result key exists, else ``in_progress`` if an in-progress key exists,
            else ``deferred``/``queued`` depending on whether the queue score is in the future,
            else ``not_found``
        """
        async with self._redis.pipeline(transaction=True) as tr:
            tr.exists(result_key_prefix + self.job_id)  # type: ignore[unused-coroutine]
            tr.exists(in_progress_key_prefix + self.job_id)  # type: ignore[unused-coroutine]
            tr.zscore(self._queue_name, self.job_id)  # type: ignore[unused-coroutine]
            is_complete, is_in_progress, score = await tr.execute()

        if is_complete:
            return JobStatus.complete
        elif is_in_progress:
            return JobStatus.in_progress
        elif score is not None:
            # compare against None rather than truthiness: a score of 0 still means the job is in
            # the queue, consistent with the ``is None`` checks in result() and info()
            return JobStatus.deferred if score > timestamp_ms() else JobStatus.queued
        else:
            return JobStatus.not_found

    async def abort(self, *, timeout: Optional[float] = None, poll_delay: float = 0.5) -> bool:
        """
        Abort the job.

        :param timeout: maximum time to wait for the job result before raising ``TimeoutError``,
            will wait forever on None
        :param poll_delay: how often to poll redis for the job result
        :return: True if the job aborted properly, False otherwise
        :raises asyncio.TimeoutError: if ``timeout`` is exceeded while waiting for the job result
        """
        job_info = await self.info()
        if job_info and job_info.score is not None and job_info.score > timestamp_ms():
            # the job is deferred: re-add it with score 1 so its run time is already in the past
            # (presumably so a worker picks it up promptly and sees the abort request - confirm)
            async with self._redis.pipeline(transaction=True) as tr:
                tr.zrem(self._queue_name, self.job_id)  # type: ignore[unused-coroutine]
                tr.zadd(self._queue_name, {self.job_id: 1})  # type: ignore[unused-coroutine]
                await tr.execute()

        # register the abort request, scored by the time it was made
        await self._redis.zadd(abort_jobs_ss, {self.job_id: timestamp_ms()})

        try:
            await self.result(timeout=timeout, poll_delay=poll_delay)
        except asyncio.CancelledError:
            return True
        except ResultNotFound:
            # We do not know if the job was cancelled or not
            return False
        else:
            # the job produced a normal result, so it was not aborted
            return False

    def __repr__(self) -> str:
        return f'<arq job {self.job_id}>'
class SerializationError(RuntimeError):
    """
    Raised when a job cannot be serialized.
    """


class DeserializationError(SerializationError):
    """
    Raised when a stored job or job result cannot be deserialized.
    """


def serialize_job(
    function_name: str,
    args: Tuple[Any, ...],
    kwargs: Dict[str, Any],
    job_try: Optional[int],
    enqueue_time_ms: int,
    *,
    serializer: Optional[Serializer] = None,
) -> bytes:
    """
    Encode a job definition as bytes.

    :param function_name: name of the function to run
    :param args: positional arguments for the function
    :param kwargs: keyword arguments for the function
    :param job_try: attempt counter to store with the job
    :param enqueue_time_ms: enqueue time in milliseconds
    :param serializer: custom serializer, ``pickle.dumps`` is used when ``None``
    :raises SerializationError: if the serializer raises any exception
    """
    dumps = pickle.dumps if serializer is None else serializer
    payload = {'t': job_try, 'f': function_name, 'a': args, 'k': kwargs, 'et': enqueue_time_ms}
    try:
        return dumps(payload)
    except Exception as e:
        raise SerializationError(f'unable to serialize job "{function_name}"') from e


def serialize_result(
    function: str,
    args: Tuple[Any, ...],
    kwargs: Dict[str, Any],
    job_try: int,
    enqueue_time_ms: int,
    success: bool,
    result: Any,
    start_ms: int,
    finished_ms: int,
    ref: str,
    queue_name: str,
    *,
    serializer: Optional[Serializer] = None,
) -> Optional[bytes]:
    """
    Encode a job result as bytes, never raising.

    If the result cannot be serialized, it is replaced by a fixed error string and marked as
    unsuccessful, then serialization is retried once.

    :param ref: reference to the job, only used in log messages
    :param serializer: custom serializer, ``pickle.dumps`` is used when ``None``
    :return: the encoded result, or ``None`` if even the fallback payload could not be serialized
    """
    dumps = pickle.dumps if serializer is None else serializer
    payload = {
        't': job_try,
        'f': function,
        'a': args,
        'k': kwargs,
        'et': enqueue_time_ms,
        's': success,
        'r': result,
        'st': start_ms,
        'ft': finished_ms,
        'q': queue_name,
    }
    try:
        return dumps(payload)
    except Exception:
        logger.warning('error serializing result of %s', ref, exc_info=True)

        # use string in case serialization fails again
        payload['r'] = 'unable to serialize result'
        payload['s'] = False
        try:
            return dumps(payload)
        except Exception:
            logger.critical('error serializing result of %s even after replacing result', ref, exc_info=True)
    return None


def deserialize_job(r: bytes, *, deserializer: Optional[Deserializer] = None) -> JobDef:
    """
    Decode a stored job into a ``JobDef`` (its ``score`` is always ``None``).

    :param r: encoded job
    :param deserializer: custom deserializer, ``pickle.loads`` is used when ``None``
    :raises DeserializationError: if decoding fails or an expected key is missing
    """
    # NOTE: pickle must only ever be fed trusted data
    loads = pickle.loads if deserializer is None else deserializer
    try:
        decoded = loads(r)
        return JobDef(
            function=decoded['f'],
            args=decoded['a'],
            kwargs=decoded['k'],
            job_try=decoded['t'],
            enqueue_time=ms_to_datetime(decoded['et']),
            score=None,
        )
    except Exception as e:
        raise DeserializationError('unable to deserialize job') from e


def deserialize_job_raw(
    r: bytes, *, deserializer: Optional[Deserializer] = None
) -> Tuple[str, Tuple[Any, ...], Dict[str, Any], int, int]:
    """
    Decode a stored job into a plain tuple without building a ``JobDef``.

    :param r: encoded job
    :param deserializer: custom deserializer, ``pickle.loads`` is used when ``None``
    :return: ``(function, args, kwargs, job_try, enqueue_time_ms)``
    :raises DeserializationError: if decoding fails or an expected key is missing
    """
    # NOTE: pickle must only ever be fed trusted data
    loads = pickle.loads if deserializer is None else deserializer
    try:
        decoded = loads(r)
        function, args, kwargs = decoded['f'], decoded['a'], decoded['k']
        job_try, enqueue_time_ms = decoded['t'], decoded['et']
        return function, args, kwargs, job_try, enqueue_time_ms
    except Exception as e:
        raise DeserializationError('unable to deserialize job') from e


def deserialize_result(r: bytes, *, deserializer: Optional[Deserializer] = None) -> JobResult:
    """
    Decode a stored job result into a ``JobResult`` (its ``score`` is always ``None``).

    A missing queue name is reported as ``'<unknown>'``.

    :param r: encoded job result
    :param deserializer: custom deserializer, ``pickle.loads`` is used when ``None``
    :raises DeserializationError: if decoding fails or an expected key is missing
    """
    # NOTE: pickle must only ever be fed trusted data
    loads = pickle.loads if deserializer is None else deserializer
    try:
        decoded = loads(r)
        return JobResult(
            job_try=decoded['t'],
            function=decoded['f'],
            args=decoded['a'],
            kwargs=decoded['k'],
            enqueue_time=ms_to_datetime(decoded['et']),
            score=None,
            success=decoded['s'],
            result=decoded['r'],
            start_time=ms_to_datetime(decoded['st']),
            finish_time=ms_to_datetime(decoded['ft']),
            queue_name=decoded.get('q', '<unknown>'),
        )
    except Exception as e:
        raise DeserializationError('unable to deserialize job result') from e