Source code for arq.cron

import asyncio
import dataclasses
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, Union

from .typing import WEEKDAYS, OptionType, SecondsTimedelta, WeekdayOptionType, WorkerCoroutine
from .utils import import_string, to_seconds


@dataclass
class Options:
    month: OptionType
    day: OptionType
    weekday: WeekdayOptionType
    hour: OptionType
    minute: OptionType
    second: OptionType
    microsecond: int


def next_cron(
    previous_dt: datetime,
    *,
    month: OptionType = None,
    day: OptionType = None,
    weekday: WeekdayOptionType = None,
    hour: OptionType = None,
    minute: OptionType = None,
    second: OptionType = 0,
    microsecond: int = 123_456,
) -> datetime:
    """
    Find the next datetime matching the given parameters.
    """
    dt = previous_dt + timedelta(seconds=1)
    if isinstance(weekday, str):
        weekday = WEEKDAYS.index(weekday.lower())
    options = Options(
        month=month, day=day, weekday=weekday, hour=hour, minute=minute, second=second, microsecond=microsecond
    )

    while True:
        next_dt = _get_next_dt(dt, options)
        # print(dt, next_dt)
        if next_dt is None:
            return dt
        dt = next_dt


def _get_next_dt(dt_: datetime, options: Options) -> Optional[datetime]:  # noqa: C901
    for field, v in dataclasses.asdict(options).items():
        if v is None:
            continue
        if field == 'weekday':
            next_v = dt_.weekday()
        else:
            next_v = getattr(dt_, field)
        if isinstance(v, int):
            mismatch = next_v != v
        elif isinstance(v, (set, list, tuple)):
            mismatch = next_v not in v
        else:
            raise RuntimeError(v)
        # print(field, v, next_v, mismatch)
        if mismatch:
            micro = max(dt_.microsecond - options.microsecond, 0)
            if field == 'month':
                if dt_.month == 12:
                    return datetime(dt_.year + 1, 1, 1, tzinfo=dt_.tzinfo)
                else:
                    return datetime(dt_.year, dt_.month + 1, 1, tzinfo=dt_.tzinfo)
            elif field in ('day', 'weekday'):
                return (
                    dt_
                    + timedelta(days=1)
                    - timedelta(hours=dt_.hour, minutes=dt_.minute, seconds=dt_.second, microseconds=micro)
                )
            elif field == 'hour':
                return dt_ + timedelta(hours=1) - timedelta(minutes=dt_.minute, seconds=dt_.second, microseconds=micro)
            elif field == 'minute':
                return dt_ + timedelta(minutes=1) - timedelta(seconds=dt_.second, microseconds=micro)
            elif field == 'second':
                return dt_ + timedelta(seconds=1) - timedelta(microseconds=micro)
            else:
                if field != 'microsecond':
                    raise RuntimeError(field)
                return dt_ + timedelta(microseconds=options.microsecond - dt_.microsecond)
    return None


@dataclass
class CronJob:
    name: str
    coroutine: WorkerCoroutine
    month: OptionType
    day: OptionType
    weekday: WeekdayOptionType
    hour: OptionType
    minute: OptionType
    second: OptionType
    microsecond: int
    run_at_startup: bool
    unique: bool
    job_id: Optional[str]
    timeout_s: Optional[float]
    keep_result_s: Optional[float]
    keep_result_forever: Optional[bool]
    max_tries: Optional[int]
    next_run: Optional[datetime] = None

    def calculate_next(self, prev_run: datetime) -> None:
        self.next_run = next_cron(
            prev_run,
            month=self.month,
            day=self.day,
            weekday=self.weekday,
            hour=self.hour,
            minute=self.minute,
            second=self.second,
            microsecond=self.microsecond,
        )

    def __repr__(self) -> str:
        return '<CronJob {}>'.format(' '.join(f'{k}={v}' for k, v in self.__dict__.items()))


[docs]def cron( coroutine: Union[str, WorkerCoroutine], *, name: Optional[str] = None, month: OptionType = None, day: OptionType = None, weekday: WeekdayOptionType = None, hour: OptionType = None, minute: OptionType = None, second: OptionType = 0, microsecond: int = 123_456, run_at_startup: bool = False, unique: bool = True, job_id: Optional[str] = None, timeout: Optional[SecondsTimedelta] = None, keep_result: Optional[float] = 0, keep_result_forever: Optional[bool] = False, max_tries: Optional[int] = 1, ) -> CronJob: """ Create a cron job, eg. it should be executed at specific times. Workers will enqueue this job at or just after the set times. If ``unique`` is true (the default) the job will only be run once even if multiple workers are running. :param coroutine: coroutine function to run :param name: name of the job, if None, the name of the coroutine is used :param month: month(s) to run the job on, 1 - 12 :param day: day(s) to run the job on, 1 - 31 :param weekday: week day(s) to run the job on, 0 - 6 or mon - sun :param hour: hour(s) to run the job on, 0 - 23 :param minute: minute(s) to run the job on, 0 - 59 :param second: second(s) to run the job on, 0 - 59 :param microsecond: microsecond(s) to run the job on, defaults to 123456 as the world is busier at the top of a second, 0 - 1e6 :param run_at_startup: whether to run as worker starts :param unique: whether the job should only be executed once at each time (useful if you have multiple workers) :param job_id: ID of the job, can be used to enforce job uniqueness, spanning multiple cron schedules :param timeout: job timeout :param keep_result: how long to keep the result for :param keep_result_forever: whether to keep results forever :param max_tries: maximum number of tries for the job """ if isinstance(coroutine, str): name = name or 'cron:' + coroutine coroutine_: WorkerCoroutine = import_string(coroutine) else: coroutine_ = coroutine if not asyncio.iscoroutinefunction(coroutine_): raise RuntimeError(f'{coroutine_} is not a coroutine function') timeout = to_seconds(timeout) keep_result = to_seconds(keep_result) return CronJob( name or 'cron:' + coroutine_.__qualname__, coroutine_, month, day, weekday, hour, minute, second, microsecond, run_at_startup, unique, job_id, timeout, keep_result, keep_result_forever, max_tries, )