Source code for arq.jobs

"""
:mod:`jobs`
===========

Defines the ``Job`` class and descendants which deal with encoding and decoding job data.
"""
import base64
import os
from datetime import datetime

import msgpack

from .utils import DEFAULT_CURTAIL, from_unix_ms, timestamp, to_unix_ms_tz, truncate

__all__ = ['JobSerialisationError', 'Job', 'DatetimeJob']


class ArqError(Exception):
    pass


class JobSerialisationError(ArqError):
    pass


def gen_random() -> str:
    """
    generate a lowercase alpha-numeric random string of length 24.

    Should have more randomness for its size thank uuid
    """
    return base64.b32encode(os.urandom(10))[:16].decode().lower()


# "device control one" should be fairly unique as a dict key and only one byte
DEVICE_CONTROL_ONE = '\x11'


[docs]class Job: """ Main Job class responsible for encoding and decoding jobs as they go into and come out of redis. """ __slots__ = 'id', 'queue', 'queued_at', 'class_name', 'func_name', 'args', 'kwargs', 'raw_queue', 'raw_data' def __init__(self, *, class_name: str, func_name: str, args: tuple, kwargs: dict, job_id: str=None, queue: str=None, raw_queue: str=None, queued_at: int=0, raw_data: bytes=None) -> None: """ Create a new job instance. :param class_name: name (see :attr:`arq.main.Actor.name`) of the actor class where the job is defined :param func_name: name of the function be called :param args: arguments to pass to the function :param kwargs: key word arguments to pass to the function :param job_id: id to use for the job, leave blank to generate a uuid """ self.class_name = class_name self.func_name = func_name self.args = args self.kwargs = kwargs self.id = self.generate_id(job_id) self.queue = queue self.raw_queue = raw_queue self.queued_at = queued_at self.raw_data = raw_data
[docs] @classmethod def decode(cls, raw_data: bytes, *, queue_name: str=None, raw_queue: str=None) -> 'Job': """ Create a job instance by decoding a job definition eg. from redis. :param raw_data: data to decode, as created by :meth:`arq.jobs.Job.encode` :param queue_name: name of the queue the job was dequeued from :param raw_queue: raw name of the queue the job was taken from """ queued_at, class_name, func_name, args, kwargs, job_id = cls.decode_raw(raw_data) return cls( class_name=class_name, func_name=func_name, args=args, kwargs=kwargs, job_id=job_id, queue=queue_name, raw_queue=raw_queue, queued_at=queued_at / 1000, raw_data=raw_data, )
[docs] def encode(self): """ Create a byte string suitable for pushing into redis which contains all required information about a job to be performed. :param job_id: id to use for the job, leave blank to generate a uuid :param queued_at: time in ms unix time when the job was queue, if None now is used :param class_name: name (see :attr:`arq.main.Actor.name`) of the actor class where the job is defined :param func_name: name of the function be called :param args: arguments to pass to the function :param kwargs: key word arguments to pass to the function """ self.queued_at = self.queued_at or int(timestamp() * 1000) try: return self.encode_raw([self.queued_at, self.class_name, self.func_name, self.args, self.kwargs, self.id]) except TypeError as e: raise JobSerialisationError(str(e)) from e
@classmethod def generate_id(cls, given_id): return given_id or gen_random()
[docs] @classmethod def msgpack_encoder(cls, obj): """ The default msgpack encoder, adds support for encoding sets. """ if isinstance(obj, set): return {DEVICE_CONTROL_ONE: list(obj)} else: return obj
@classmethod def msgpack_object_hook(cls, obj): if len(obj) == 1 and DEVICE_CONTROL_ONE in obj: return set(obj[DEVICE_CONTROL_ONE]) return obj @classmethod def encode_raw(cls, data) -> bytes: return msgpack.packb(data, default=cls.msgpack_encoder, use_bin_type=True) @classmethod def decode_raw(cls, data: bytes): return msgpack.unpackb(data, object_hook=cls.msgpack_object_hook, raw=False) def to_string(self, args_curtail=DEFAULT_CURTAIL): arguments = '' if self.args: arguments = ', '.join(map(str, self.args)) if self.kwargs: if arguments: arguments += ', ' arguments += ', '.join(f'{k}={v!r}' for k, v in sorted(self.kwargs.items())) return '{s.id:.6} {s.class_name}.{s.func_name}({args})'.format(s=self, args=truncate(arguments, args_curtail)) def short_ref(self): return '{s.id:.6} {s.class_name}.{s.func_name}'.format(s=self) def __str__(self): return self.to_string() def __repr__(self): return f'<Job {self} on {self.queue}>'
DEVICE_CONTROL_TWO = '\x12' TIMEZONE = 'O'
[docs]class DatetimeJob(Job): """ Alternative Job which copes with datetimes. None timezone naïve dates are supported but the returned datetimes will use a :mod:`datetime.timezone` class to define the timezone regardless of the timezone class originally used on the datetime object (eg. ``pytz``). """
[docs] @classmethod def msgpack_encoder(cls, obj): if isinstance(obj, datetime): ts, tz = to_unix_ms_tz(obj) result = {DEVICE_CONTROL_TWO: ts} if tz is not None: result[TIMEZONE] = tz return result else: return super().msgpack_encoder(obj)
@classmethod def msgpack_object_hook(cls, obj): if len(obj) <= 2 and DEVICE_CONTROL_TWO in obj: return from_unix_ms(obj[DEVICE_CONTROL_TWO], utcoffset=obj.get(TIMEZONE)) else: return super().msgpack_object_hook(obj)