arq¶
Current Version: v0.26.3
Job queues and RPC in python with asyncio and redis.
arq was conceived as a simple, modern and performant successor to rq.
Warning
In v0.16 arq was COMPLETELY REWRITTEN to use an entirely different approach to registering workers,
enqueueing jobs and processing jobs. You will need to either keep using v0.15 or entirely rewrite your arq
integration to use v0.16.
See here for old docs.
Why use arq?
- non-blocking
- arq is built using python 3’s asyncio, allowing non-blocking job enqueuing and execution. Multiple jobs (potentially hundreds) can be run simultaneously using a pool of asyncio Tasks.
- powerful-features
- Deferred execution, easy retrying of jobs, and pessimistic execution (see below) mean arq is great for critical jobs that must be completed. 
- fast
- Asyncio and no forking make arq around 7x faster than rq for short jobs with no io. With io that might increase to around 40x faster. (TODO) 
- elegant
- I’m a long time contributor to and user of rq; arq is designed to be simpler, clearer and more powerful. 
- small
- and easy to reason with - currently arq is only about 700 lines, that won’t change significantly. 
Install¶
Just:
pip install arq
Redesigned to be less elegant?¶
The approach used in arq v0.16 of enqueueing jobs by name rather than “just calling a function” and knowing it
will be called on the worker (as used in arq <= v0.15, rq, celery et al.) might seem less elegant,
but it’s for good reason.
This approach means your frontend (calling the worker) doesn’t need access to the worker code, meaning better code separation and possibly smaller images etc.
Usage¶
Warning
Jobs may be called more than once!
arq v0.16 has what I’m calling “pessimistic execution”: jobs aren’t removed from the queue until they’ve either succeeded or failed. If the worker shuts down, the job will be cancelled immediately and will remain in the queue to be run again when the worker starts up again (or run by another worker which is still running).
(This differs from other similar libraries like arq <= v0.15, rq, celery et al. where jobs generally don’t get
rerun when a worker shuts down. This in turn requires complex logic to try and let jobs finish before
shutting down (I wrote the HerokuWorker for rq), however this never really works unless either: all jobs take
less than 6 seconds or your worker never shuts down when a job is running (impossible).)
All arq jobs should therefore be designed to cope with being called repeatedly if they’re cancelled, eg. use database transactions, idempotency keys or redis to mark when an API request or similar has succeeded to avoid making it twice.
In summary: sometimes exactly once can be hard or impossible, arq favours multiple times over zero times.
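One way to make a job safe to run more than once is to record a marker once the side effect has succeeded and to check for it at the start of every run. The sketch below is only an illustration of that idea: the in-memory dict stands in for redis or a database table, and `send_invoice`, `completed`, `sent` and the key format are hypothetical names, not part of arq.

```python
import asyncio

# Stand-in for a durable store (redis, a database table, ...).
# In real code this must survive worker restarts.
completed: dict = {}
sent: list = []  # records the side effects actually performed


async def send_invoice(ctx, invoice_id: str) -> str:
    key = f'invoice-sent:{invoice_id}'  # hypothetical idempotency key
    if completed.get(key):
        # A previous run already finished the side effect, so skip it.
        return 'skipped'
    sent.append(invoice_id)  # the side effect (e.g. an API request)
    completed[key] = True  # mark success only after the side effect
    return 'sent'


async def demo() -> list:
    # Simulate arq running the same job twice, e.g. after a worker restart.
    return [await send_invoice({}, 'inv-1'), await send_invoice({}, 'inv-1')]


results = asyncio.run(demo())
print(results, sent)
```

A cancellation that lands between the side effect and the marker can still cause a repeat, which is why a database transaction or an idempotency key understood by the remote API tends to be more robust than a separate marker.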
Simple Usage¶
import asyncio
from httpx import AsyncClient
from arq import create_pool
from arq.connections import RedisSettings
# Here you can configure the Redis connection.
# The default is to connect to localhost:6379, no password.
REDIS_SETTINGS = RedisSettings()
async def download_content(ctx, url):
    session: AsyncClient = ctx['session']
    response = await session.get(url)
    print(f'{url}: {response.text:.80}...')
    return len(response.text)
async def startup(ctx):
    ctx['session'] = AsyncClient()
async def shutdown(ctx):
    await ctx['session'].aclose()
async def main():
    redis = await create_pool(REDIS_SETTINGS)
    for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'):
        await redis.enqueue_job('download_content', url)
# WorkerSettings defines the settings to use when creating the worker.
# It's used by the arq CLI.
# redis_settings might be omitted here if using the default settings
# For a list of all available settings, see https://arq-docs.helpmanual.io/#arq.worker.Worker
class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown
    redis_settings = REDIS_SETTINGS
if __name__ == '__main__':
    asyncio.run(main())
(This script is complete, it should run “as is” both to enqueue jobs and run them)
To enqueue the jobs, simply run the script:
python demo.py
To execute the jobs, either after running demo.py or before/during:
arq demo.WorkerSettings
Append --burst to stop the worker once all jobs have finished. See arq.worker.Worker for more available
properties of WorkerSettings.
You can also watch for changes and reload the worker when the source changes:
arq demo.WorkerSettings --watch path/to/src
This requires watchfiles to be installed (pip install watchfiles).
For details on the arq CLI:
arq --help
Startup & Shutdown coroutines¶
The on_startup and on_shutdown coroutines are provided as a convenient way to run logic as the worker
starts and finishes, see arq.worker.Worker.
For instance, in the example above session is created once when the worker starts up and is then used in subsequent
jobs.
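The same lifecycle can be imitated without a worker or redis, which may be handy when unit testing job functions. This is a sketch of the calling order only, not arq's implementation; `FakeSession` and `fetch` are hypothetical names made up for the illustration.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for an HTTP client such as httpx.AsyncClient."""

    def __init__(self):
        self.closed = False

    async def get(self, url):
        return f'content of {url}'

    async def aclose(self):
        self.closed = True


async def startup(ctx):
    ctx['session'] = FakeSession()  # created once, shared by every job


async def shutdown(ctx):
    await ctx['session'].aclose()


async def fetch(ctx, url):
    return await ctx['session'].get(url)


async def demo():
    ctx = {}
    await startup(ctx)
    try:
        # Both calls reuse the same ctx['session'] object.
        results = [await fetch(ctx, u) for u in ('a', 'b')]
    finally:
        await shutdown(ctx)
    return results, ctx['session'].closed


results, closed = asyncio.run(demo())
print(results, closed)
```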
Deferring Jobs¶
By default, when a job is enqueued it will run as soon as possible (provided a worker is running). However
you can schedule jobs to run in the future, either by a given duration (_defer_by) or
at a particular time (_defer_until), see arq.connections.ArqRedis.enqueue_job().
import asyncio
from datetime import datetime, timedelta
from arq import create_pool
from arq.connections import RedisSettings
async def the_task(ctx):
    print('this is the task, delay since enqueueing:', datetime.now() - ctx['enqueue_time'])
async def main():
    redis = await create_pool(RedisSettings())
    # deferred by 10 seconds
    await redis.enqueue_job('the_task', _defer_by=10)
    # deferred by 1 minute
    await redis.enqueue_job('the_task', _defer_by=timedelta(minutes=1))
    # deferred until jan 28th 2032, you'll be waiting a long time for this...
    await redis.enqueue_job('the_task', _defer_until=datetime(2032, 1, 28))
class WorkerSettings:
    functions = [the_task]
if __name__ == '__main__':
    asyncio.run(main())
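How the two options translate into a start time can be sketched with plain datetime arithmetic. The helper below is an illustration only, not arq's code: `run_at` is a hypothetical name, and letting `defer_until` take precedence when both are given is an assumption of this sketch.

```python
from datetime import datetime, timedelta
from typing import Optional, Union


def run_at(
    enqueued: datetime,
    defer_by: Union[None, int, float, timedelta] = None,
    defer_until: Optional[datetime] = None,
) -> datetime:
    """Earliest time a job may start; illustration only, not arq's code."""
    if defer_until is not None:
        return defer_until
    if defer_by is None:
        return enqueued  # run as soon as possible
    if not isinstance(defer_by, timedelta):
        defer_by = timedelta(seconds=defer_by)  # plain numbers are seconds
    return enqueued + defer_by


now = datetime(2032, 1, 1, 12, 0, 0)
in_ten_seconds = run_at(now, defer_by=10)
in_one_minute = run_at(now, defer_by=timedelta(minutes=1))
on_the_28th = run_at(now, defer_until=datetime(2032, 1, 28))
print(in_ten_seconds, in_one_minute, on_the_28th)
```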
Job Uniqueness¶
Sometimes you want a job to only be run once at a time (eg. a backup) or once for a given parameter (eg. generating invoices for a particular company).
arq supports this via custom job ids, see arq.connections.ArqRedis.enqueue_job(). It guarantees
that a job with a particular ID cannot be enqueued again until its execution has finished and its result has cleared. To control when a finished job’s result clears, you can use the keep_result setting on your worker, see arq.worker.func().
import asyncio
from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job
async def the_task(ctx):
    print('running the task with id', ctx['job_id'])
async def main():
    redis = await create_pool(RedisSettings())
    # no id, random id will be generated
    job1 = await redis.enqueue_job('the_task')
    print(job1)
    """
    >  <arq job 99edfef86ccf4145b2f64ee160fa3297>
    """
    # random id again, again the job will be enqueued and a job will be returned
    job2 = await redis.enqueue_job('the_task')
    print(job2)
    """
    >  <arq job 7d2163c056e54b62a4d8404921094f05>
    """
    # custom job id, job will be enqueued
    job3 = await redis.enqueue_job('the_task', _job_id='foobar')
    print(job3)
    """
    >  <arq job foobar>
    """
    # same custom job id, job will not be enqueued and enqueue_job will return None
    job4 = await redis.enqueue_job('the_task', _job_id='foobar')
    print(job4)
    """
    >  None
    """
    # you can retrieve jobs by using arq.jobs.Job
    await redis.enqueue_job('the_task', _job_id='my_job')
    job5 = Job(job_id='my_job', redis=redis)
    print(job5)
    """
    >  <arq job my_job>
    """
class WorkerSettings:
    functions = [the_task]
if __name__ == '__main__':
    asyncio.run(main())
The check of job_id uniqueness in the queue is performed using a redis transaction so you can be certain jobs
with the same id won’t be enqueued twice (or overwritten) even if they’re enqueued at exactly the same time.
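For the "once for a given parameter" case, one option is to derive the job id from the parameters that define uniqueness. A minimal sketch, assuming a hypothetical `generate_invoices` job and a made-up id format:

```python
def invoice_job_id(company_id: int, month: str) -> str:
    """Build a deterministic job id from the parameters that define uniqueness."""
    return f'generate_invoices:{company_id}:{month}'


job_id = invoice_job_id(42, '2024-05')
print(job_id)

# Hypothetical usage, inside a coroutine with an ArqRedis pool called `redis`:
#   await redis.enqueue_job('generate_invoices', 42, '2024-05', _job_id=job_id)
# A second call with the same parameters would produce the same id, so
# enqueue_job would return None while the first job or its result still exists.
```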
Job Results¶
You can access job information, status and job results using the arq.jobs.Job instance returned from
arq.connections.ArqRedis.enqueue_job().
import asyncio
from arq import create_pool
from arq.connections import RedisSettings
# requires `pip install devtools`, used for pretty printing of job info
from devtools import debug
async def the_task(ctx):
    print('running the task')
    return 42
async def main():
    redis = await create_pool(RedisSettings())
    job = await redis.enqueue_job('the_task')
    # get the job's id
    print(job.job_id)
    """
    >  68362958a244465b9be909db4b7b5ab4 (or whatever)
    """
    # get information about the job, will include results if the job has finished, but
    # doesn't await the job's result
    debug(await job.info())
    """
    >   docs/examples/job_results.py:23 main
    JobDef(
        function='the_task',
        args=(),
        kwargs={},
        job_try=None,
        enqueue_time=datetime.datetime(2019, 4, 23, 13, 58, 56, 781000),
        score=1556027936781
    ) (JobDef)
    """
    # get the Job's status
    print(await job.status())
    """
    >  JobStatus.queued
    """
    # poll redis for the job result, if the job raised an exception,
    # it will be raised here
    # (You'll need the worker running at the same time to get a result here)
    print(await job.result(timeout=5))
    """
    >  42
    """
class WorkerSettings:
    functions = [the_task]
if __name__ == '__main__':
    asyncio.run(main())
Retrying jobs and cancellation¶
As described above, when an arq worker shuts down, any ongoing jobs are cancelled immediately
(via vanilla task.cancel(), so a CancelledError will be raised). You can see this by running a slow job
(eg. add await asyncio.sleep(5)) and hitting Ctrl+C once it’s started.
You’ll get something like:
➤  arq slow_job.WorkerSettings
12:42:38: Starting worker for 1 functions: the_task
12:42:38: redis_version=4.0.9 mem_usage=904.50K clients_connected=4 db_keys=3
12:42:38:  10.23s → c3dd4acc171541b9ac10b1d791750cde:the_task() delayed=10.23s
^C12:42:40: shutdown on SIGINT ◆ 0 jobs complete ◆ 0 failed ◆ 0 retries ◆ 1 ongoing to cancel
12:42:40:   1.16s ↻ c3dd4acc171541b9ac10b1d791750cde:the_task cancelled, will be run again
➤  arq slow_job.WorkerSettings
12:42:50: Starting worker for 1 functions: the_task
12:42:50: redis_version=4.0.9 mem_usage=904.61K clients_connected=4 db_keys=4
12:42:50:  21.78s → c3dd4acc171541b9ac10b1d791750cde:the_task() try=2 delayed=21.78s
12:42:55:   5.00s ← c3dd4acc171541b9ac10b1d791750cde:the_task ●
^C12:42:57: shutdown on SIGINT ◆ 1 jobs complete ◆ 0 failed ◆ 0 retries ◆ 0 ongoing to cancel
You can also retry jobs by raising the arq.worker.Retry exception from within a job,
optionally with a duration to defer rerunning the jobs by:
import asyncio
from httpx import AsyncClient
from arq import create_pool, Retry
from arq.connections import RedisSettings
async def download_content(ctx, url):
    session: AsyncClient = ctx['session']
    response = await session.get(url)
    if response.status_code != 200:
        # retry the job with increasing back-off
        # delays will be 5s, 10s, 15s, 20s
        # after max_tries (default 5) the job will permanently fail
        raise Retry(defer=ctx['job_try'] * 5)
    return len(response.text)
async def startup(ctx):
    ctx['session'] = AsyncClient()
async def shutdown(ctx):
    await ctx['session'].aclose()
async def main():
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('download_content', 'https://httpbin.org/status/503')
class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown
if __name__ == '__main__':
    asyncio.run(main())
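The back-off in the retry example is linear in the try number. The sketch below computes those delays and, as a hypothetical alternative that is not part of arq, a capped exponential schedule; both function names and the `base` and `cap` parameters are made up for illustration.

```python
def linear_backoff(job_try: int, step: float = 5) -> float:
    """Delay used in the retry example: 5s, 10s, 15s, 20s for tries 1 to 4."""
    return job_try * step


def capped_exponential_backoff(job_try: int, base: float = 2, cap: float = 60) -> float:
    """Hypothetical alternative: 2s, 4s, 8s, ... but never more than `cap`."""
    return min(cap, base ** job_try)


linear = [linear_backoff(t) for t in range(1, 5)]
exponential = [capped_exponential_backoff(t) for t in range(1, 8)]
print(linear)
print(exponential)
```

Inside a job, either helper could feed the exception directly, for example `raise Retry(defer=capped_exponential_backoff(ctx['job_try']))`.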
To abort a job, call arq.jobs.Job.abort(). (Note that for arq.jobs.Job.abort() to
have any effect, you need to set allow_abort_jobs to True on the worker; it is off by default for performance reasons.
allow_abort_jobs=True may become the default in future.)
arq.jobs.Job.abort() will abort a job if it’s already running or prevent it being run if it’s currently
in the queue.
import asyncio
from arq import create_pool
from arq.connections import RedisSettings
async def do_stuff(ctx):
    print('doing stuff...')
    await asyncio.sleep(10)
    return 'stuff done'
async def main():
    redis = await create_pool(RedisSettings())
    job = await redis.enqueue_job('do_stuff')
    await asyncio.sleep(1)
    await job.abort()
class WorkerSettings:
    functions = [do_stuff]
    allow_abort_jobs = True
if __name__ == '__main__':
    asyncio.run(main())
Health checks¶
arq will automatically record some info about its current state in redis every health_check_interval seconds.
That key/value will expire after health_check_interval + 1 seconds so you can be sure if the variable exists arq
is alive and kicking (technically you can be sure it was alive and kicking health_check_interval seconds ago).
You can run a health check with the CLI (assuming you’re using the first example above):
arq --check demo.WorkerSettings
The command will output the value of the health check if found;
then exit 0 if the key was found and 1 if it was not.
A health check value takes the following form:
Mar-01 17:41:22 j_complete=0 j_failed=0 j_retried=0 j_ongoing=0 queued=0
Where the items have the following meaning:
- j_complete – the number of jobs completed
- j_failed – the number of jobs which have failed, eg. raised an exception
- j_ongoing – the number of jobs currently being performed
- j_retried – the number of job retries run
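If a monitoring script needs the individual counters, the value can be split on whitespace and `=`. This is a sketch that assumes the exact format shown above, which may differ between versions; `parse_health_check` is a hypothetical helper, not something arq provides.

```python
def parse_health_check(value: str) -> dict:
    """Split a health check string into its timestamp and integer counters."""
    date, time, *pairs = value.split()
    counters = {}
    for pair in pairs:
        name, _, number = pair.partition('=')
        counters[name] = int(number)
    return {'timestamp': f'{date} {time}', **counters}


info = parse_health_check(
    'Mar-01 17:41:22 j_complete=0 j_failed=0 j_retried=0 j_ongoing=0 queued=0'
)
print(info)
```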
Cron Jobs¶
Functions can be scheduled to be run periodically at specific times. See arq.cron.cron().
from arq import cron
async def run_regularly(ctx):
    print('run foo job at 9.12am, 12.12pm and 6.12pm')
class WorkerSettings:
    cron_jobs = [
        cron(run_regularly, hour={9, 12, 18}, minute=12)
    ]
Usage roughly shadows cron, except that None is equivalent to * in crontab.
As in the example, sets can be used to run at multiple values of the given unit.
Note that second defaults to 0 so you don’t inadvertently run jobs every second, and microsecond
defaults to 123456 so you don’t inadvertently run jobs every microsecond and so arq avoids enqueuing jobs
at the top of a second when the world is generally slightly busier.
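The matching rules described here (None matches anything, an int matches one value, a set matches any member) can be sketched as a small predicate. This illustrates the semantics only and is not arq's scheduler; `field_matches` and `should_run` are hypothetical names, and only hour, minute and second are checked.

```python
from datetime import datetime
from typing import Set, Union

Spec = Union[None, int, Set[int]]


def field_matches(value: int, spec: Spec) -> bool:
    """None matches anything (like * in crontab); a set matches any member."""
    if spec is None:
        return True
    if isinstance(spec, int):
        return value == spec
    return value in spec


def should_run(dt: datetime, hour: Spec = None, minute: Spec = None, second: Spec = 0) -> bool:
    """Illustration only: checks hour, minute and second."""
    return (
        field_matches(dt.hour, hour)
        and field_matches(dt.minute, minute)
        and field_matches(dt.second, second)
    )


at_9_12 = should_run(datetime(2024, 1, 1, 9, 12, 0), hour={9, 12, 18}, minute=12)
at_10_12 = should_run(datetime(2024, 1, 1, 10, 12, 0), hour={9, 12, 18}, minute=12)
# second defaults to 0, so 09:12:30 does not match
at_9_12_30 = should_run(datetime(2024, 1, 1, 9, 12, 30), hour={9, 12, 18}, minute=12)
print(at_9_12, at_10_12, at_9_12_30)
```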
Synchronous Jobs¶
Functions that can block the loop for extended periods should be run in an executor like
concurrent.futures.ThreadPoolExecutor or concurrent.futures.ProcessPoolExecutor using
loop.run_in_executor as shown below.
import time
import functools
import asyncio
from concurrent import futures
def sync_task(t):
    return time.sleep(t)
async def the_task(ctx, t):
    blocking = functools.partial(sync_task, t)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(ctx['pool'], blocking)
async def startup(ctx):
    ctx['pool'] = futures.ProcessPoolExecutor()
class WorkerSettings:
    functions = [the_task]
    on_startup = startup
Custom job serializers¶
By default, arq will use the built-in pickle module to serialize and deserialize jobs. If you wish to
use alternative serialization methods, you can do so by specifying them when creating the connection pool
and the worker settings. A serializer function takes a Python object and returns a binary representation
encoded in a bytes object. A deserializer function, on the other hand, creates Python objects out of
a bytes sequence.
Warning
It is essential that the serialization functions used by arq.connections.create_pool() and
arq.worker.Worker are the same, otherwise jobs created by the former cannot be executed by the
latter. This also applies when you update your serialization functions: you need to ensure that your new
functions are backward compatible with the old jobs, or that there are no jobs with the older serialization
scheme in the queue.
Here is an example with MsgPack, an efficient binary serialization format that may enable significant memory improvements over pickle:
import asyncio
import msgpack  # installable with "pip install msgpack"
from arq import create_pool
from arq.connections import RedisSettings
async def the_task(ctx):
    return 42
async def main():
    redis = await create_pool(
        RedisSettings(),
        job_serializer=msgpack.packb,
        job_deserializer=lambda b: msgpack.unpackb(b, raw=False),
    )
    await redis.enqueue_job('the_task')
class WorkerSettings:
    functions = [the_task]
    job_serializer = msgpack.packb
    # refer to MsgPack's documentation as to why raw=False is required
    job_deserializer = lambda b: msgpack.unpackb(b, raw=False)
if __name__ == '__main__':
    asyncio.run(main())
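Any pair of functions with the shapes described above should work. As a further sketch, here is a JSON-based pair built only on the standard library. `json_serializer` and `json_deserializer` are hypothetical names, and the payload is a made-up example rather than arq's real job dict; the sketch assumes every argument and result is JSON-compatible (tuples come back as lists, and datetimes are not supported without extra handling).

```python
import json
from typing import Any, Dict


def json_serializer(job: Dict[str, Any]) -> bytes:
    """Serialize a job dict to UTF-8 encoded JSON bytes."""
    return json.dumps(job).encode('utf-8')


def json_deserializer(data: bytes) -> Dict[str, Any]:
    """Inverse of json_serializer."""
    return json.loads(data.decode('utf-8'))


# Round-trip check with a hypothetical payload; arq's real job dict differs.
payload = {'function': 'the_task', 'args': [1, 'two'], 'kwargs': {'flag': True}}
encoded = json_serializer(payload)
restored = json_deserializer(encoded)
print(restored == payload)
```

As with the MsgPack example, the same pair would need to be passed as job_serializer and job_deserializer to both create_pool and the worker settings.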
Reference¶
- class arq.connections.RedisSettings(host: Union[str, List[Tuple[str, int]]] = 'localhost', port: int = 6379, unix_socket_path: Optional[str] = None, database: int = 0, username: Optional[str] = None, password: Optional[str] = None, ssl: bool = False, ssl_keyfile: Optional[str] = None, ssl_certfile: Optional[str] = None, ssl_cert_reqs: str = 'required', ssl_ca_certs: Optional[str] = None, ssl_ca_data: Optional[str] = None, ssl_check_hostname: bool = False, conn_timeout: int = 1, conn_retries: int = 5, conn_retry_delay: int = 1, max_connections: Optional[int] = None, sentinel: bool = False, sentinel_master: str = 'mymaster', retry_on_timeout: bool = False, retry_on_error: Optional[List[Exception]] = None, retry: Optional[Retry] = None)[source]¶
- No-Op class used to hold redis connection settings.
- Used by arq.connections.create_pool() and arq.worker.Worker.
- class arq.connections.ArqRedis(pool_or_conn: Optional[ConnectionPool] = None, job_serializer: Optional[Callable[[Dict[str, Any]], bytes]] = None, job_deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None, default_queue_name: str = 'arq:queue', expires_extra_ms: int = 86400000, **kwargs: Any)[source]¶
- Thin subclass of redis.asyncio.Redis which adds arq.connections.ArqRedis.enqueue_job().
- Parameters:
- redis_settings – an instance of arq.connections.RedisSettings.
- job_serializer – a function that serializes Python objects to bytes, defaults to pickle.dumps 
- job_deserializer – a function that deserializes bytes into Python objects, defaults to pickle.loads 
- default_queue_name – the default queue name to use, defaults to arq:queue.
- expires_extra_ms – the default length of time from when a job is expected to start after which the job expires, defaults to 1 day in ms. 
- kwargs – keyword arguments directly passed to redis.asyncio.Redis.
 
 - Initialize a new Redis client. To specify a retry policy for specific errors, first set retry_on_error to a list of the error/s to retry on, then set retry to a valid Retry object. To retry on TimeoutError, retry_on_timeout can also be set to True.
 - async enqueue_job(function: str, *args: Any, _job_id: Optional[str] = None, _queue_name: Optional[str] = None, _defer_until: Optional[datetime] = None, _defer_by: Union[None, int, float, timedelta] = None, _expires: Union[None, int, float, timedelta] = None, _job_try: Optional[int] = None, **kwargs: Any) Optional[Job][source]¶
- Enqueue a job.
- Parameters:
- function – Name of the function to call 
- args – args to pass to the function 
- _job_id – ID of the job, can be used to enforce job uniqueness 
- _queue_name – queue of the job, can be used to create job in different queue 
- _defer_until – datetime at which to run the job 
- _defer_by – duration to wait before running the job 
- _expires – do not start or retry a job after this duration; defaults to 24 hours plus deferring time, if any 
- _job_try – useful when re-enqueueing jobs within a job 
- kwargs – any keyword arguments to pass to the function 
 
- Returns:
- arq.jobs.Job instance or None if a job with this ID already exists
 
 
- async arq.connections.create_pool(settings_: Optional[RedisSettings] = None, *, retry: int = 0, job_serializer: Optional[Callable[[Dict[str, Any]], bytes]] = None, job_deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None, default_queue_name: str = 'arq:queue', expires_extra_ms: int = 86400000) ArqRedis[source]¶
- Create a new redis pool, retrying up to conn_retries times if the connection fails.
- Returns an arq.connections.ArqRedis instance, thus allowing job enqueuing.
- arq.worker.func(coroutine: Union[str, Function, WorkerCoroutine], *, name: Optional[str] = None, keep_result: Optional[SecondsTimedelta] = None, timeout: Optional[SecondsTimedelta] = None, keep_result_forever: Optional[bool] = None, max_tries: Optional[int] = None) Function[source]¶
- Wrapper for a job function which lets you configure more settings.
- Parameters:
- coroutine – coroutine function to call, can be a string to import 
- name – name for function, if None, coroutine.__qualname__ is used
- keep_result – duration to keep the result for, if 0 the result is not kept 
- keep_result_forever – whether to keep results forever, if None use Worker default, wins over keep_result
- timeout – maximum time the job should take 
- max_tries – maximum number of tries allowed for the function, use 1 to prevent retrying 
 
 
- exception arq.worker.Retry(defer: Optional[SecondsTimedelta] = None)[source]¶
- Special exception to retry the job (if max_tries hasn’t been reached).
- Parameters:
- defer – duration to wait before rerunning the job 
 
- class arq.worker.Worker(functions: Sequence[Union[Function, WorkerCoroutine]] = (), *, queue_name: Optional[str] = 'arq:queue', cron_jobs: Optional[Sequence[CronJob]] = None, redis_settings: Optional[RedisSettings] = None, redis_pool: Optional[ArqRedis] = None, burst: bool = False, on_startup: Optional[StartupShutdown] = None, on_shutdown: Optional[StartupShutdown] = None, on_job_start: Optional[StartupShutdown] = None, on_job_end: Optional[StartupShutdown] = None, after_job_end: Optional[StartupShutdown] = None, handle_signals: bool = True, job_completion_wait: int = 0, max_jobs: int = 10, job_timeout: SecondsTimedelta = 300, keep_result: SecondsTimedelta = 3600, keep_result_forever: bool = False, poll_delay: SecondsTimedelta = 0.5, queue_read_limit: Optional[int] = None, max_tries: int = 5, health_check_interval: SecondsTimedelta = 3600, health_check_key: Optional[str] = None, ctx: Optional[Dict[Any, Any]] = None, retry_jobs: bool = True, allow_abort_jobs: bool = False, max_burst_jobs: int = -1, job_serializer: Optional[Callable[[Dict[str, Any]], bytes]] = None, job_deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None, expires_extra_ms: int = 86400000, timezone: Optional[timezone] = None, log_results: bool = True)[source]¶
- Main class for running jobs.
- Parameters:
- functions – list of functions to register, can either be raw coroutine functions or the result of arq.worker.func().
- queue_name – queue name to get jobs from 
- cron_jobs – list of cron jobs to run, use arq.cron.cron() to create them
- redis_settings – settings for creating a redis connection 
- redis_pool – existing redis pool, generally None 
- burst – whether to stop the worker once all jobs have been run 
- on_startup – coroutine function to run at startup 
- on_shutdown – coroutine function to run at shutdown 
- on_job_start – coroutine function to run on job start 
- on_job_end – coroutine function to run on job end 
- after_job_end – coroutine function to run after job has ended and results have been recorded 
- handle_signals – default true, register signal handlers, set to false when running inside other async framework 
- job_completion_wait – time to wait before cancelling tasks after a signal. Useful together with terminationGracePeriodSeconds in kubernetes, when you want to make the pod complete jobs before shutting down. The worker will not pick new tasks while waiting for shut down.
- max_jobs – maximum number of jobs to run at a time 
- job_timeout – default job timeout (max run time) 
- keep_result – default duration to keep job results for 
- keep_result_forever – whether to keep results forever 
- poll_delay – duration between polling the queue for new jobs 
- queue_read_limit – the maximum number of jobs to pull from the queue each time it’s polled. By default it equals max_jobs * 5, or 100; whichever is higher.
- max_tries – default maximum number of times to retry a job 
- health_check_interval – how often to set the health check key 
- health_check_key – redis key under which health check is set 
- ctx – dictionary to hold extra user defined state 
- retry_jobs – whether to retry jobs on Retry or CancelledError or not 
- allow_abort_jobs – whether to abort jobs on a call to arq.jobs.Job.abort()
- max_burst_jobs – the maximum number of jobs to process in burst mode (disabled with negative values) 
- job_serializer – a function that serializes Python objects to bytes, defaults to pickle.dumps 
- job_deserializer – a function that deserializes bytes into Python objects, defaults to pickle.loads 
- expires_extra_ms – the default length of time from when a job is expected to start after which the job expires, defaults to 1 day in ms. 
- timezone – timezone used for evaluation of cron schedules, defaults to system timezone 
- log_results – when set to true (default) results for successful jobs will be logged 
 
 - async async_run() None[source]¶
- Asynchronously run the worker, does not close connections. Useful when testing. 
 - async run_check(retry_jobs: Optional[bool] = None, max_burst_jobs: Optional[int] = None) int[source]¶
- Run arq.worker.Worker.async_run(), check for failed jobs and raise arq.worker.FailedJobs if any jobs have failed.
- Returns:
- number of completed jobs 
 
 - async start_jobs(job_ids: List[bytes]) None[source]¶
- For each job id, get the job definition, check it’s not running and start it in a task 
 - handle_sig_wait_for_completion(signum: Signals) None[source]¶
- Alternative signal handler that allows tasks to complete within a given time before shutting down the worker. The time can be configured using job_completion_wait. The worker will stop picking up jobs once the signal has been received. 
 
- arq.cron.cron(coroutine: Union[str, WorkerCoroutine], *, name: Optional[str] = None, month: Union[None, Set[int], int] = None, day: Union[None, Set[int], int] = None, weekday: Union[None, Set[int], int, Literal['mon', 'tues', 'wed', 'thurs', 'fri', 'sat', 'sun']] = None, hour: Union[None, Set[int], int] = None, minute: Union[None, Set[int], int] = None, second: Union[None, Set[int], int] = 0, microsecond: int = 123456, run_at_startup: bool = False, unique: bool = True, job_id: Optional[str] = None, timeout: Optional[Union[int, float, timedelta]] = None, keep_result: Optional[float] = 0, keep_result_forever: Optional[bool] = False, max_tries: Optional[int] = 1) CronJob[source]¶
- Create a cron job, i.e. a job that should be executed at specific times.
- Workers will enqueue this job at or just after the set times. If unique is true (the default) the job will only be run once even if multiple workers are running.
- Parameters:
- coroutine – coroutine function to run 
- name – name of the job, if None, the name of the coroutine is used 
- month – month(s) to run the job on, 1 - 12 
- day – day(s) to run the job on, 1 - 31 
- weekday – week day(s) to run the job on, 0 - 6 or mon - sun 
- hour – hour(s) to run the job on, 0 - 23 
- minute – minute(s) to run the job on, 0 - 59 
- second – second(s) to run the job on, 0 - 59 
- microsecond – microsecond(s) to run the job on, defaults to 123456 as the world is busier at the top of a second, 0 - 1e6 
- run_at_startup – whether to run as worker starts 
- unique – whether the job should only be executed once at each time (useful if you have multiple workers) 
- job_id – ID of the job, can be used to enforce job uniqueness, spanning multiple cron schedules 
- timeout – job timeout 
- keep_result – how long to keep the result for 
- keep_result_forever – whether to keep results forever 
- max_tries – maximum number of tries for the job 
 
 
- class arq.jobs.JobStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
- Enum of job statuses.
 - deferred = 'deferred'¶
- job is in the queue, time it should be run not yet reached 
 - queued = 'queued'¶
- job is in the queue, time it should run has been reached 
 - in_progress = 'in_progress'¶
- job is in progress 
 - complete = 'complete'¶
- job is complete, result is available 
 - not_found = 'not_found'¶
- job not found in any way 
 
- class arq.jobs.Job(job_id: str, redis: Redis[bytes], _queue_name: str = 'arq:queue', _deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None)[source]¶
- Holds a reference to a job.
 - async result(timeout: Optional[float] = None, *, poll_delay: float = 0.5, pole_delay: Optional[float] = None) Any[source]¶
- Get the result of the job or, if the job raised an exception, reraise it.
- This function waits for the result if it’s not yet available and the job is present in the queue. Otherwise ResultNotFound is raised.
- Parameters:
- timeout – maximum time to wait for the job result before raising TimeoutError, will wait forever on None
- poll_delay – how often to poll redis for the job result 
- pole_delay – deprecated, use poll_delay instead 
 
 
 - async info() Optional[JobDef][source]¶
- All information on a job, including its result if it’s available, does not wait for the result. 
 - async result_info() Optional[JobResult][source]¶
- Information about the job result if available, does not wait for the result. Does not raise an exception even if the job raised one. 
 - async abort(*, timeout: Optional[float] = None, poll_delay: float = 0.5) bool[source]¶
- Abort the job.
- Parameters:
- timeout – maximum time to wait for the job result before raising TimeoutError, will wait forever on None
- poll_delay – how often to poll redis for the job result 
 
- Returns:
- True if the job aborted properly, False otherwise 
 
 
History¶
v0.26.3 (2025-01-06)¶
- Fix negative expires_ms and avoid worker freezing while using cron by @Matvey-Kuk in #479 
- Fix race condition on task retry by @RB387 in #487 
v0.26.1 (2023-08-29)¶
- Uses testcontainers to provide a redis instance for the unit tests by @chrisguidry in #470
- Bump redis version from <5 to <6 by @Wh1isper in #460
- Bump idna from 3.6 to 3.7 in /requirements by @dependabot in #444
- Bump jinja2 from 3.1.3 to 3.1.4 in /requirements by @dependabot in #452
- Bump requests from 2.31.0 to 2.32.0 in /requirements by @dependabot in #461
- Bump urllib3 from 2.2.1 to 2.2.2 in /requirements by @dependabot in #464
- Bump certifi from 2024.2.2 to 2024.7.4 in /requirements by @dependabot in #468
v0.26.0 (2023-05-01)¶
No changes since v0.26.0b1.
v0.26.0b1 (2023-04-01)¶
- Prevent worker getting stuck in terminating state by @JonasKs in #370 
- Fix redis pipeline created and not used by @iamlikeme in #374 
- Bump certifi from 2022.6.15 to 2022.12.7 in /requirements by @dependabot in #373
- Use instance’s default queue for queued_jobs default by @phy1729 in #368
- Docs: Add details about reusing a unique job id by @ross-nordstrom in #391 
- Delete setup.py in #398
- Adding a job counter to address Semaphore issues by @rm-21 in #408
- docs: add documentation on how to retrieve running jobs by @JonasKs in #377 
- feat: add job_id to JobDef, closing #376 by @JonasKs in #378
- chore: update dependencies, fixing tests by @JonasKs in #382 
- refactor: refactor all asserts into raise <exception>, close #371 by @JonasKs in #379
- Fix: timezone info occasionally removed from cron job execution time by @iamlikeme in #383 
- 3.12 support, drop 3.7, uprev dependencies by @samuelcolvin in #439 
- Extend RedisSettings to include redis Retry Helper settings by @mernmic in #387
- Allow to connect to Redis using a Unix socket URL… by @drygdryg in #392 
- Allow infinite retry by @vvmruder in #396 
- Allow max_connections to be set in RedisSettings by @danbox in #406
- Improve RedisSettings explanation in main_demo.py by @RamonGiovane in #422
- uprev to v0.26.0b1 by @samuelcolvin in #440 
v0.25 (2022-12-02)¶
- Allow to opt-out from logging results by @iamlikeme in #352 
- Add timezone support for cron jobs by @iamlikeme in #354 
- connections: fix pipeline usage for exists command by @utkarshgupta137 in #366 
- Fix race condition causing incorrect status not found by @iamlikeme in #362 
- Adds after_job_end hook by @AngellusMortis in #355
- Raise ResultNotFound when Job.result() finds no job and no result by @iamlikeme in #364
- use 3.11 for testing #367
- Signal handler to wait for task completion before shutting down by @JonasKs in #345 
v0.24 (2022-09-05)¶
- Allow customisation of timezone in logs, #281 
- Add the username option to RedisSettings, #299
- Change primary branch name to main, 40c8803
- Add --custom-log-dict CLI option, #294
- Fix error in case of pytz not being installed, #318 
- Support and test python 3.11, #327 
- Improve docs for parameter _expires in enqueue_job, #313
- Fix redis ssl support, #323 
- Fix recursion while waiting for redis connection, #311 
- switch from watchgod to watchfiles, #332 
- Simplify dependencies, drop pydantic as a dependency, #334
- Allow use of unix_socket_path in RedisSettings, #336
- Allow user to configure a default job expiry-extra length, #303 
- Remove transaction around info command to support Redis 6.2.3, #338
- Switch from setup.py to pyproject.toml, #341
- Support abort for deferred jobs, #307
v0.23 (2022-08-23)¶
No changes from v0.23a1.
v0.23a1 (2022-03-09)¶
- Fix jobs timeout by @kiriusm2 in #248 
- Update index.rst by @Kludex in #266
- Improve some docs wording by @johtso in #285 
- fix error when cron jobs were terminated by @tobymao in #273
- add on_job_start and on_job_end hooks by @tobymao in #274
- Update argument docstring definition by @sondrelg in #278 
- fix tests and uprev test dependencies, #288 
- Add link to WorkerSettings in documentation by @JonasKs in #279 
- Allow setting job_id on cron jobs by @JonasKs in #293
- Fix docs typo by @johtso in #296 
- support aioredis v2 by @Yolley in #259 
- support python 3.10, #298 
v0.22 (2021-09-02)¶
- fix package importing in example, #261, thanks @cdpath 
- restrict aioredis to <2.0.0 (soon we’ll support aioredis>=2.0.0), #258, thanks @PaxPrz
- auto setting version on release, 759fe03 
v0.21 (2021-07-06)¶
- CI improvements #243 
- fix log_redis_info #255
v0.20 (2021-04-26)¶
- Added queue_name attribute to JobResult, #198
- set job_deserializer, job_serializer and default_queue_name on worker pools to better support nested jobs, #203, #215 and #218
- Allow job results to be kept indefinitely, #205
- refactor cron jobs to prevent duplicate jobs, #200
- correctly handle CancelledError in python 3.8+, #213
- allow jobs to be aborted, #212 
- deprecate pole_delay and use correct spelling poll_delay, #242
- docs improvements, #207 and #232 
v0.19.1 (2020-10-26)¶
- fix timestamp issue in _defer_until without timezone offset, #182 
- add option to disable signal handler registration from running inside other frameworks, #183 
- add default_queue_name to create_redis_pool and ArqRedis, #191
- Worker can retrieve the queue_name from the connection pool, if present
- fix potential race condition when starting jobs, #194 
- support python 3.9 and pydantic 1.7, #214 
v0.19.0 (2020-04-24)¶
- Python 3.8 support, #178 
- fix concurrency with multiple workers, #180 
- full mypy coverage, #181 
v0.18.4 (2019-12-19)¶
- Add py.typed file to tell mypy the package has type hints, #163
- Added ssl option to RedisSettings, #165
v0.18.3 (2019-11-13)¶
- Include queue_name for the job object in response to enqueue_job, #160
v0.18.2 (2019-11-01)¶
- Fix cron scheduling on a specific queue, by @dmvass and @Tinche 
v0.18.1 (2019-10-28)¶
- add support for Redis Sentinel, fix #132
- fix Worker.abort_job invalid expire time error, by @dmvass
v0.18 (2019-08-30)¶
- fix usage of max_burst_jobs, improve coverage, fix #152
- stop lots of WatchVariableError errors in log, #153
v0.17.1 (2019-08-21)¶
- deal better with failed job deserialization, #149 by @samuelcolvin 
- fix run_check(max_burst_jobs=...) when a job fails, #150 by @samuelcolvin
v0.17 (2019-08-11)¶
- add worker.queue_read_limit, fix #141, by @rubik
- custom serializers, eg. to use msgpack rather than pickle, #143 by @rubik 
- add ArqRedis.queued_jobs utility method for getting queued jobs while testing, fix #145 by @samuelcolvin
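The custom serializers entry above is about replacing the default pickle encoding with another format. As a rough illustration only, the sketch below shows what such a pair of callables might look like. It assumes a serializer takes a job dictionary and returns bytes, and a deserializer does the reverse. It uses JSON from the standard library instead of msgpack so that it runs without extra dependencies, and the function names and the sample job fields are hypothetical.

```python
import json
from typing import Any, Dict


def json_serializer(job: Dict[str, Any]) -> bytes:
    """Encode a job dictionary as UTF-8 JSON bytes (hypothetical helper)."""
    return json.dumps(job, separators=(",", ":"), sort_keys=True).encode("utf-8")


def json_deserializer(raw: bytes) -> Dict[str, Any]:
    """Decode bytes produced by json_serializer back into a dictionary."""
    return json.loads(raw.decode("utf-8"))


# Round-trip a made-up job payload to check that the pair is symmetric.
job = {"function": "download_content", "args": ["https://example.com"], "try": 1}
encoded = json_serializer(job)
decoded = json_deserializer(encoded)
```

Unlike pickle, JSON only round-trips plain types (tuples come back as lists, for example), which is one reason a binary format such as msgpack is often preferred for this role.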
v0.16.1 (2019-08-02)¶
- prevent duplicate job_id when job result exists, fix #137
- add “don’t retry mode” via worker.retry_jobs = False, fix #139
- add worker.max_burst_jobs
v0.16 (2019-07-30)¶
- improved error when a job is aborted (eg. function not found) 
v0.16.0b3 (2019-05-14)¶
- fix semaphore on worker with many expired jobs 
v0.16.0b2 (2019-05-14)¶
- add support for different queues, #127 thanks @tsutsarin 
v0.16.0b1 (2019-04-23)¶
- use dicts for pickling not tuples, better handling of pickling errors, #123 
v0.16.0a5 (2019-04-22)¶
- use pipeline in enqueue_job
- catch any error when pickling job result 
- add support for python 3.6 
v0.16.0a4 (2019-03-15)¶
- add Worker.run_check, fix #115
v0.16.0a3 (2019-03-12)¶
- fix Worker with custom redis settings
v0.16.0a2 (2019-03-06)¶
- add job_try argument to enqueue_job, #113
- adding --watch mode to the worker (requires watchgod), #114
- allow ctx when creating Worker
- add all_job_results to ArqRedis
- fix python path when starting worker 
v0.16.0a1 (2019-03-05)¶
- Breaking Change: COMPLETE REWRITE!!! see docs for details, #110 
v0.15.0 (2018-11-15)¶
- update dependencies 
- reconfigure Job, return a job instance when enqueuing tasks #93
- tweaks to docs #106 
v0.14.0 (2018-05-28)¶
- package updates, particularly compatibility for msgpack 0.5.6
v0.13.0 (2017-11-27)¶
- Breaking Change: integration with aioredis >= 1.0, basic usage hasn’t changed but look at aioredis’s migration docs for changes in redis API #76 
v0.12.0 (2017-11-16)¶
- better signal handling, support uvloop #73
- drain pending tasks and drain task cancellation #74 
- add aiohttp and docker demo /demo #75
v0.11.0 (2017-08-25)¶
- extract create_pool_lenient from RedisMixin
- improve redis connection traceback 
v0.10.4 (2017-08-22)¶
- RedisSettings repr method
- add create_connection_timeout to connection pool
v0.10.3 (2017-08-19)¶
- fix bug with RedisMixin.get_redis_pool creating multiple queues
- tweak drain logs 
v0.10.2 (2017-08-17)¶
- only save job on task in drain if re-enqueuing 
- add semaphore timeout to drains 
- add key count to log_redis_info
v0.10.1 (2017-08-16)¶
- correct format of log_redis_info
v0.10.0 (2017-08-16)¶
- log redis version when starting worker, fix #64 
- log “connection success” when connecting to redis after connection failures, fix #67 
- add job ids, for now they’re just used in logging, fix #53 
v0.9.0 (2017-06-23)¶
- allow set encoding in msgpack for jobs #49 
- cron tasks allowing scheduling of functions in the future #50 
- Breaking change: switch to_unix_ms to just return the timestamp int, add to_unix_ms_tz to return tz offset too
v0.8.1 (2017-06-05)¶
- uprev setup requires 
- correct setup arguments 
v0.8.0 (2017-06-05)¶
- add async-timeout dependency
- use async-timeout around shadow_factory
- change logger name for control process log messages 
- use Semaphore rather than asyncio.wait(...return_when=asyncio.FIRST_COMPLETED) for improved performance
- improve log display 
- add timeout and retry logic to RedisMixin.create_redis_pool
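The Semaphore entry in this release refers to a general asyncio technique: capping the number of jobs in flight with a semaphore instead of repeatedly calling asyncio.wait and rebuilding the pending set. The following is a minimal sketch of that pattern using only the standard library. It is not arq's actual implementation, and run_bounded, the doubling job and the stats dictionary are hypothetical names used for illustration.

```python
import asyncio
from typing import Any, Dict, List


async def run_bounded(values: List[int], max_concurrent: int) -> Dict[str, Any]:
    """Run one job per value, never more than max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)
    stats = {"running": 0, "peak": 0}  # bookkeeping for the demonstration only

    async def job(value: int) -> int:
        # Each job waits here until one of the max_concurrent slots is free.
        async with semaphore:
            stats["running"] += 1
            stats["peak"] = max(stats["peak"], stats["running"])
            await asyncio.sleep(0.01)  # stand-in for real work
            stats["running"] -= 1
            return value * 2

    # gather preserves the order of the inputs in its result list.
    results = await asyncio.gather(*(job(v) for v in values))
    return {"results": results, "peak": stats["peak"]}


outcome = asyncio.run(run_bounded(list(range(10)), max_concurrent=3))
```

All ten jobs are scheduled up front, but the semaphore keeps the number running inside the guarded section at or below the limit, so no polling loop is needed to decide when the next job may start.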
v0.7.0 (2017-06-01)¶
- implementing reusable Drain which takes tasks from a redis list and allows them to be executed asynchronously.
- Drain uses python 3.6 async yield, therefore python 3.5 is no longer supported.
- prevent repeated identical health check log messages 
v0.6.1 (2017-05-06)¶
- mypy at last passing, #30 
- adding trove classifiers, #29 
v0.6.0 (2017-04-14)¶
- add StopJob exception for cleanly ending jobs, #21
- add flushdb to MockRedis, #23
- allow configurable length job logging via log_curtail on Worker, #28
v0.5.2 (2017-02-25)¶
- add shadow_kwargs method to BaseWorker to make customising actors easier.
v0.5.1 (2017-02-25)¶
- reimplement worker reuse as it turned out to be useful in tests. 
v0.5.0 (2017-02-20)¶
- use gather rather than wait for startup and shutdown so exceptions propagate.
- add --check option to confirm arq worker is running.
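The gather entry in this release relies on a difference in how the two asyncio helpers report failures: asyncio.wait returns normally and leaves any exception stored on the finished task, while asyncio.gather re-raises the first exception to the caller. The sketch below illustrates that difference with the standard library only. The coroutine names are hypothetical and this is not code taken from arq.

```python
import asyncio


async def failing_startup() -> None:
    raise RuntimeError("startup failed")


async def ok_startup() -> str:
    await asyncio.sleep(0)
    return "ready"


async def with_wait() -> bool:
    tasks = [asyncio.ensure_future(failing_startup()), asyncio.ensure_future(ok_startup())]
    done, _pending = await asyncio.wait(tasks)
    # wait() did not raise; the failure only shows up when each task is inspected.
    return any(task.exception() is not None for task in done)


async def with_gather() -> str:
    try:
        await asyncio.gather(failing_startup(), ok_startup())
    except RuntimeError as exc:
        # gather() propagated the exception straight to the caller.
        return str(exc)
    return "no error"


error_found_by_inspection = asyncio.run(with_wait())
error_raised_by_gather = asyncio.run(with_gather())
```

With wait, a caller that forgets to inspect the tasks would carry on as if startup had succeeded, which is the situation the change to gather avoids.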
v0.4.1 (2017-02-11)¶
- fix issue with Concurrent class binding with multiple actor instances.
v0.4.0 (2017-02-10)¶
- improving naming of log handlers and formatters 
- upgrade numerous packages, nothing significant 
- add startup and shutdown methods to actors
- switch @concurrent to return a Concurrent instance so the direct method is accessible via <func>.direct
v0.3.2 (2017-01-24)¶
- improved solution for preventing new jobs starting when the worker is about to stop 
- switch from SIGRTMIN to SIGUSR1 to work with mac
v0.3.1 (2017-01-20)¶
- fix main process signal handling so the worker shuts down when just the main process receives a signal 
- re-enqueue un-started jobs popped from the queue if the worker is about to exit 
v0.3.0 (2017-01-19)¶
- rename settings class to RedisSettings and simplify significantly
v0.2.0 (2016-12-09)¶
- add concurrency_enabled argument to aid in testing
- fix conflict with unittest.mock
v0.1.0 (2016-12-06)¶
- prevent logs disabling other logs 
v0.0.6 (2016-08-14)¶
- first proper release