arq

pypi license

Current Version: v0.15

Job queues and RPC in python with asyncio, redis and msgpack.

arq was conceived as a simple, modern and performant successor to rq.

Why use arq?

non-blocking
arq is built using python 3’s asyncio allowing non-blocking job enqueuing and execution. Multiple jobs (potentially hundreds) can be run simultaneously using a pool of asyncio Tasks.
pre-forked
The worker starts two processes and uses the subprocess to execute all jobs, there’s no overhead in forking a process for each job.
fast
Asyncio, pre-forking and use of msgpack for job encoding make arq around 7x faster (see benchmarks) than rq for short jobs with no io. With io that might increase to around 40x faster. (TODO)
elegant
arq uses a novel approach to variable scope with the @concurrent decorator being applied to bound methods of Actor classes which hold the connection pool. This works well with aiohttp, allows for easier testing and avoids extended head scratching over how variables like connections are defined (is this attached to the request? or thread local? or truly global? where am I, hell, what does global even mean?).
small
and easy to reason with - currently arq is only about 700 lines, that won’t change significantly.

Dependencies

Required before pip install:

  • Python 3.6.0+ asyncio is used throughout with new style async/await syntax and async yield.
  • Redis Redis lists are used to communication between the front end and worker, redis can also be used to store job results.

Installed as dependencies by pip:

  • msgpack is used for its simplicity and performance to encode and decode job information.
  • aioredis is used as the non-block asyncio interface to redis.
  • click is used for the CLI interface “arq”.

Install

Just:

pip install arq

Terminology

The old computer science proverb/joke goes:

There are only two challenges in computer science: cache invalidation, naming things and the n + 1 problem.

arq tries to avoid confusion over what’s named what by using generally accepted terminology as much as possible, however a few terms (like “actors” and “shadows”) are not so standard and bear describing:

An Actor is a class with some concurrent methods, you can define and use multiple actors. Actors hold a reference to a redis pool for enqueuing jobs and are generally singletons.

The Worker is the class which is responsible for running jobs for one or more actors. Workers should inherit from BaseWorker, your application will generally have just one worker.

Actors are therefore used in two distinctly different modes:

  • default mode where you initialise, then use and abuse the actor including calling concurrent methods and thereby enqueuing jobs
  • shadow mode where the actor was initialised by the worker in order to perform jobs enqueued by the actor in default (or even shadow) mode.

It’s possible to check what mode an actor is in by checking the is_shadow variable.

Usage

Usage is best described by example.

Simple Usage

import asyncio
from aiohttp import ClientSession
from arq import Actor, BaseWorker, concurrent


class Downloader(Actor):
    async def startup(self):
        self.session = ClientSession(loop=self.loop)

    @concurrent
    async def download_content(self, url):
        async with self.session.get(url) as response:
            content = await response.read()
            print(f'{url}: {content.decode():.80}...')
        return len(content)

    async def shutdown(self):
        self.session.close()


class Worker(BaseWorker):
    shadows = [Downloader]


async def download_lots():
    d = Downloader()
    for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'):
        await d.download_content(url)
    await d.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(download_lots())

(This script is complete, it should run “as is” both to enqueue jobs and run them)

To enqueue the jobs, simply run the script:

python demo.py

To execute the jobs, either after running demo.py or before/during:

arq demo.py

For details on the arq CLI:

arq --help

Startup & Shutdown coroutines

The startup and shutdown coroutines are provided as a convenient way to run logic as actors start and finish, however it’s important to not that these methods are not called by default when actors are initialised or closed. They are however called when the actor was started and closed on the worker, eg. in “shadow” mode, see above. In other words: if you need these coroutines to be called when using an actor in your code; that’s your responsibility.

For example, in the above example there’s no need for self.session when using the actor in “default” mode, eg. called with python demo.py, so neither startup or shutdown are called.

Usage with aiohttp

Assuming you have Downloader already defined as per above.

from aiohttp import web


async def start_job(request):
    data = await request.post()
    # this will enqueue the download_content job
    await request.app['downloader'].download_content(data['url'])
    raise web.HTTPFound(location='/wherever/')


async def shutdown(app):
    await app['downloader'].close()


def create_app():
    app = web.Application()
    ...
    app.router.add_post('/start-job/', start_job)
    app['downloader'] = Downloader()
    # use aiohttp's on_shutdown trigger to close downloader
    app.on_shutdown.append(shutdown)
    return app


if __name__ == '__main__':
    app = create_app()
    web.run_app(app, port=8000)

(Won’t run as Downloader is not defined)

For a full example arq usage with aiohttp and docker see the demo app.

Health checks

arq will automatically record some info about it’s current state in redis every health_check_interval seconds, see arq.worker.BaseWorker.health_check_interval. That key/value will expire after health_check_interval + 1 seconds so you can be sure if the variable exists arq is alive and kicking (technically you can be sure it was alive and kicking health_check_interval seconds ago).

You can run a health check with the CLI (assuming you’re using the above example):

arq --check demo.py

The command will output the value of the health check if found; then exit 0 if the key was found and 1 if it was not.

A health check value takes the following form:

Feb-20_11:02:40 j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=0 q_dft=0 q_low=0

Where the items have the following meaning:

  • j_complete the number of jobs completed
  • j_failed the number of jobs which have failed eg. raised an exception
  • j_timedout the number of jobs which have timed out, eg. exceeded arq.worker.BaseWorker.timeout_seconds and been cancelled
  • j_ongoing the number of jobs currently being performed
  • q_* the number of pending jobs in each queue

Cron Jobs

Functions can be scheduled to be run periodically at specific times

from arq import Actor, cron


class FooBar(Actor):

    @cron(hour={9, 12, 18}, minute=12)
    async def foo(self):
        print('run foo job at 9.12am, 12.12pm and 6.12pm')

See arq.main.cron() for details on the available arguments and how how cron works.

Usage roughly shadows cron except None is equivalent on * in crontab. As per the example sets can be used to run at multiple of the given unit.

Note that second defaults to 0 so you don’t in inadvertently run jobs every second and microsecond defaults to 123456 so you don’t inadvertently run jobs every microsecond and so arq avoids enqueuing jobs at the top of a second when the world is generally slightly busier.

Multiple Queues

Functions can be assigned to different queues, by default arq defines three queues: HIGH_QUEUE, DEFAULT_QUEUE and LOW_QUEUE which are prioritised by the worker in that order.

from arq import Actor, concurrent


class RegistrationEmail(Actor):
    @concurrent
    async def email_standard_user(self, user_id):
        send_user_email(user_id)

    @concurrent(Actor.HIGH_QUEUE)
    async def email_premium_user(self, user_id):
        send_user_email(user_id)

(Just a snippet, won’t run “as is”)

Direct Enqueuing

Functions can we enqueued directly whether or no they’re decorated with @concurrent.

from arq import Actor


class FooBar(Actor):
    async def foo(self, a, b, c):
        print(a + b + c)


async def main():
    foobar = FooBar()
    await foobar.enqueue_job('foo', 1, 2, c=48, queue=Actor.LOW_QUEUE)
    await foobar.enqueue_job('foo', 1, 2, c=48)  # this will be queued in DEFAULT_QUEUE
    await foobar.close()

(This script is almost complete except for loop.run_until_complete(main()) as above to run main, you would also need to define a worker to run the jobs)

See arq.main.Actor.enqueue_job() for more details.

Worker Customisation

Workers can be customised in numerous ways, this is preferred to command line arguments as it’s easier to document and record.

from arq import BaseWorker


class Worker(BaseWorker):
    # execute jobs from both Downloader and FooBar above
    shadows = [Downloader, FooBar]

    # allow lots and lots of jobs to run simultaniously, default 50
    max_concurrent_tasks = 500

    # force the worker to close quickly after a termination signal is received, default 6
    shutdown_delay = 2

    # jobs may not take more than 10 seconds, default 60
    timeout_seconds = 10

    # number of seconds between health checks, default 60
    health_check_interval = 30

    def logging_config(self, verbose):
        conf = super().logging_config(verbose)
        # alter logging setup to set arq.jobs level to WARNING
        conf['loggers']['arq.jobs']['level'] = 'WARNING'
        return conf

(This script is more-or-less complete, provided Downloader and FooBar are defined and imported it should run “as is”)

See arq.worker.BaseWorker() for more customisation options.

For more information on logging see arq.logs.default_log_config().

API Reference

main

Defines the main Actor class and @concurrent decorator for using arq from within your code.

Also defines the @cron decorator for declaring cron job functions.

class arq.main.Actor(*args, worker=None, concurrency_enabled=True, **kwargs)[source]

All classes which wish to use arq should inherit from Actor.

Actor defines three default queues: HIGH_QUEUE, DEFAULT_QUEUE and LOW_QUEUE which are processed in that order of priority by the worker.

Actors operate in two modes: normal mode when you initialise them and use them, and “shadow mode” where the actor is initialised by the worker and used to execute jobs.

Parameters:
  • worker – reference to the worker which is managing this actor in shadow mode
  • concurrency_enabledFor testing only if set to False methods are called directly not queued
  • kwargs – other keyword arguments, see arq.utils.RedisMixin for all available options
HIGH_QUEUE = 'high'

highest priority queue, this can be overwritten by changing arq.main.Actor.queues

DEFAULT_QUEUE = 'dft'

default queue, this is a special value as it’s used in arq.main.Actor.enqueue_job()

LOW_QUEUE = 'low'

lowest priority queue, can be overwritten

QUEUE_PREFIX = b'arq:q:'

prefix prepended to all queue names to create the list keys in redis

job_class

alias of arq.jobs.Job

re_enqueue_jobs = False

Whether or not to re-queue jobs if the worker quits before the job has time to finish.

queues = ('high', 'dft', 'low')

queues the actor can enqueue jobs in, order is important, the first queue is highest priority

name = None

if not None this name is used instead of the class name when encoding and referencing jobs, if None the class’s name is used

close(shutdown=False)[source]

Close down the actor, eg. close the associated redis pool, optionally also calling shutdown.

Parameters:shutdown – whether or not to also call the shutdown coroutine, you probably only want to set this to True it you called startup previously
enqueue_job(func_name: str, *args, queue: str = None, **kwargs) → arq.jobs.Job[source]

Enqueue a job by pushing the encoded job information into the redis list specified by the queue.

Alternatively if concurrency is disabled the job will be encoded, then decoded and ran. Disabled concurrency (set via disable_concurrency init keyword argument) is designed for use in testing, hence the job is encoded then decoded to keep tests as close as possible to production.

Parameters:
  • func_name – name of the function executing the job
  • args – arguments to pass to the function
  • queue – name of the queue to use, if None DEFAULT_QUEUE is used.
  • kwargs – key word arguments to pass to the function
shutdown()[source]

Override to gracefully close or delete any objects you setup in startup

startup()[source]

Override to setup objects you’ll need while running the worker, eg. sessions and database connections

arq.main.concurrent(func_or_queue)[source]

Decorator which defines a functions as concurrent, eg. it should be executed on the worker.

If you wish to call the function directly you can access the original function at <func>.direct.

The decorator can optionally be used with one argument: the queue to use by default for the job.

arq.main.cron(*, month: Union[None, set, int] = None, day: Union[None, set, int] = None, weekday: Union[None, set, int, str] = None, hour: Union[None, set, int] = None, minute: Union[None, set, int] = None, second: Union[None, set, int] = 0, microsecond: int = 123456, dft_queue=None, run_at_startup=False, unique=True)[source]

Decorator which defines a functions as a cron job, eg. it should be executed at specific times.

Workers will enqueue this job at or just after the set times. If unique is true (the default) the job will only be enqueued once even if multiple workers are running.

If you wish to call the function directly you can access the original function at <func>.direct.

Parameters:
  • month – month(s) to run the job on, 1 - 12
  • day – day(s) to run the job on, 1 - 31
  • weekday – week day(s) to run the job on, 0 - 6 or mon - sun
  • hour – hour(s) to run the job on, 0 - 23
  • minute – minute(s) to run the job on, 0 - 59
  • second – second(s) to run the job on, 0 - 59
  • microsecond – microsecond(s) to run the job on, defaults to 123456 as the world is busier at the top of a second, 0 - 1e6
  • dft_queue – default queue to use
  • run_at_startup – whether to run as worker starts
  • unique – whether the job should be only be executed once at each time

worker

Responsible for executing jobs on the worker.

exception arq.worker.StopJob(reason: str = '', warning: bool = False)[source]
class arq.worker.BaseWorker(*, burst: bool = False, shadows: list = None, queues: list = None, timeout_seconds: int = None, **kwargs)[source]

Base class for Workers to inherit from.

Parameters:
  • burst – if true the worker will close as soon as no new jobs are found in the queue lists
  • shadows – list of arq.main.Actor classes for the worker to run, overrides shadows already defined in the class definition
  • queues – list of queue names for the worker to listen on, if None queues is taken from the shadows
  • timeout_seconds – maximum duration of a job, after that the job will be cancelled by the event loop
  • re_queue – Whether or not to re-queue jobs if the worker quits before the job has time to finish.
  • kwargs – other keyword arguments, see arq.utils.RedisMixin for all available options
max_concurrent_tasks = 50

maximum number of jobs which can be execute at the same time by the event loop

shutdown_delay = 6

number of seconds after a termination signal (SIGINT or SIGTERM) is received to force quit the worker

health_check_interval = 60

number of seconds between calls to health_check

repeat_health_check_logs = False

Whether or not to skip log messages from health checks where nothing has changed

drain_class

alias of arq.drain.Drain

timeout_seconds = 60

default maximum duration of a job

classmethod logging_config(verbose) → dict[source]

Override to customise the logging setup for the arq worker. By default just uses arq.logs.default_log_config().

Parameters:verbose – verbose flag from cli, by default log level is INFO if false and DEBUG if true
Returns:dict suitable for logging.config.dictConfig
shadows = None

shadow classes, a list of Actor classes for the Worker to run

classmethod check_health(**kwargs)[source]

Run a health check on the worker return the appropriate exit code.

Returns:0 if successful, 1 if not
close()[source]

Close the pool and wait for all connections to close.

run(log_redis_version=False)[source]

Main entry point for the the worker which initialises shadows, checks they look ok then runs work to perform jobs.

shadow_factory() → list[source]

Initialise list of shadows and return them.

Override to customise the way shadows are initialised.

shadow_kwargs()[source]

Prepare the keyword arguments for initialising all shadows.

Override to customise the kwargs used to initialise shadows.

work()[source]

Pop job definitions from the lists associated with the queues and perform the jobs.

Also regularly runs record_health.

arq.worker.import_string(file_path, attr_name)[source]

Import attribute/class from from a python module. Raise ImportError if the import failed. Approximately stolen from django.

Parameters:
  • file_path – path to python module
  • attr_name – attribute to get from module
Returns:

attribute

class arq.worker.RunWorkerProcess(worker_path, worker_class, burst=False)[source]

Responsible for starting a process to run the worker, monitoring it and possibly killing it.

drain

Drain class used by arq.worker.BaseWorker and reusable elsewhere.

class arq.drain.Drain(*, redis: aioredis.commands.Redis, max_concurrent_tasks: int = 50, shutdown_delay: float = 6, timeout_seconds: int = 60, burst_mode: bool = True, raise_task_exception: bool = False, semaphore_timeout: float = 60)[source]

Drains popping jobs from redis lists and managing a set of tasks with a limited size to execute those jobs.

Parameters:
  • redis – redis pool to get connection from to pop items from list, also used to optionally re-enqueue pending jobs on termination
  • max_concurrent_tasks – maximum number of jobs which can be execute at the same time by the event loop
  • shutdown_delay – number of seconds to wait for tasks to finish
  • timeout_seconds – maximum duration of a job, after that the job will be cancelled by the event loop
  • burst_mode – break the iter loop as soon as no more jobs are available by adding an sentinel quit queue
  • raise_task_exception – whether or not to raise an exception which occurs in a processed task
add(coro, job, re_enqueue=False)[source]

Start job and add it to the pending tasks set. :param coro: coroutine to execute the job :param job: job object, instance of arq.jobs.Job or similar :param re_enqueue: whether or not to re-enqueue the job on finish if the job won’t finish in time.

finish(timeout=None)[source]

Cancel all pending tasks and optionally re-enqueue jobs which haven’t finished after the timeout.

Parameters:timeout – how long to wait for tasks to finish, defaults to shutdown_delay
iter(*raw_queues, pop_timeout=1)[source]

blpop jobs from redis queues and yield them. Waits for the number of tasks to drop below max_concurrent_tasks before popping.

Parameters:
  • raw_queues – tuple of bytes defining queue(s) to pop from.
  • pop_timeout – how long to wait on each blpop before yielding None.
Yields:

tuple (raw_queue_name, raw_data) or (None, None) if all jobs are empty

jobs

Defines the Job class and descendants which deal with encoding and decoding job data.

class arq.jobs.Job(*, class_name: str, func_name: str, args: tuple, kwargs: dict, job_id: str = None, queue: str = None, raw_queue: str = None, queued_at: int = 0, raw_data: bytes = None)[source]

Main Job class responsible for encoding and decoding jobs as they go into and come out of redis.

Create a new job instance.

Parameters:
  • class_name – name (see arq.main.Actor.name) of the actor class where the job is defined
  • func_name – name of the function be called
  • args – arguments to pass to the function
  • kwargs – key word arguments to pass to the function
  • job_id – id to use for the job, leave blank to generate a uuid
classmethod decode(raw_data: bytes, *, queue_name: str = None, raw_queue: str = None) → arq.jobs.Job[source]

Create a job instance by decoding a job definition eg. from redis.

Parameters:
  • raw_data – data to decode, as created by arq.jobs.Job.encode()
  • queue_name – name of the queue the job was dequeued from
  • raw_queue – raw name of the queue the job was taken from
encode()[source]

Create a byte string suitable for pushing into redis which contains all required information about a job to be performed.

Parameters:
  • job_id – id to use for the job, leave blank to generate a uuid
  • queued_at – time in ms unix time when the job was queue, if None now is used
  • class_name – name (see arq.main.Actor.name) of the actor class where the job is defined
  • func_name – name of the function be called
  • args – arguments to pass to the function
  • kwargs – key word arguments to pass to the function
classmethod msgpack_encoder(obj)[source]

The default msgpack encoder, adds support for encoding sets.

class arq.jobs.DatetimeJob(*, class_name: str, func_name: str, args: tuple, kwargs: dict, job_id: str = None, queue: str = None, raw_queue: str = None, queued_at: int = 0, raw_data: bytes = None)[source]

Alternative Job which copes with datetimes. None timezone naïve dates are supported but the returned datetimes will use a datetime.timezone class to define the timezone regardless of the timezone class originally used on the datetime object (eg. pytz).

Create a new job instance.

Parameters:
  • class_name – name (see arq.main.Actor.name) of the actor class where the job is defined
  • func_name – name of the function be called
  • args – arguments to pass to the function
  • kwargs – key word arguments to pass to the function
  • job_id – id to use for the job, leave blank to generate a uuid
classmethod msgpack_encoder(obj)[source]

The default msgpack encoder, adds support for encoding sets.

logs

class arq.logs.ColourHandler(stream=None)[source]

Coloured log handler. Levels: debug: white, info: green, warning: yellow, else: red.

Date times (anything before the first colon) is magenta.

Initialize the handler.

If stream is not specified, sys.stderr is used.

emit(record)[source]

Emit a record.

If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an ‘encoding’ attribute, it is used to determine how to do the output to the stream.

arq.logs.default_log_config(verbose: bool) → dict[source]

Setup default config. for dictConfig.

Parameters:verbose – level: DEBUG if True, INFO if False
Returns:dict suitable for logging.config.dictConfig

utils

Utilises for running arq used by other modules.

class arq.utils.RedisSettings(host='localhost', port=6379, database=0, password=None, conn_timeout=1, conn_retries=5, conn_retry_delay=1)[source]

No-Op class used to hold redis connection redis_settings.

Parameters:
  • host – redis host
  • port – redis port
  • database – redis database id
  • password – password for redis connection
class arq.utils.RedisMixin(*, loop: asyncio.events.AbstractEventLoop = None, redis_settings: arq.utils.RedisSettings = None, existing_redis: aioredis.commands.Redis = None)[source]

Mixin used to fined a redis pool and access it.

Parameters:
  • loop – asyncio loop to use for the redis pool
  • redis_settings – connection settings to use for the pool
  • existing_redis – existing pool, if set no new pool is created, instead this one is used
close()[source]

Close the pool and wait for all connections to close.

get_redis() → aioredis.commands.Redis[source]

Get the redis pool, if a pool is already initialised it’s returned, else one is crated.

arq.utils.create_tz(utcoffset=0) → datetime.timezone[source]

Create a python datetime.timezone with a given utc offset.

Parameters:utcoffset – utc offset in seconds, if 0 timezone.utc is returned.
arq.utils.timestamp() → float[source]

This should be exactly the same as time.time(), we use this approach for consistency with other methods and possibly greater accuracy. :return: now in unix time, eg. seconds since 1970

arq.utils.timestamp() → float[source]

This should be exactly the same as time.time(), we use this approach for consistency with other methods and possibly greater accuracy. :return: now in unix time, eg. seconds since 1970

arq.utils.to_unix_ms_tz(dt: datetime.datetime) → Tuple[int, Optional[int]][source]

convert a datetime to number of milliseconds since 1970 and calculate timezone offset :param dt: datetime to evaluate :return: tuple - (unix time in milliseconds, utc offset in seconds)

arq.utils.to_unix_ms(dt: datetime.datetime) → int[source]

convert a datetime to number of milliseconds since 1970 :param dt: datetime to evaluate :return: unix time in milliseconds

arq.utils.from_unix_ms(ms: int, utcoffset: int = None) → datetime.datetime[source]

convert int to a datetime.

Parameters:
  • ms – number of milliseconds since 1970
  • utcoffset – if set a timezone i added to the datime based on the offset in seconds.
Returns:

datetime - including timezone if utcoffset is not None, else timezone naïve

arq.utils.gen_random(length: int = 20) → bytes[source]

Create a random string.

Parameters:length – length of string to created, default 20
arq.utils.truncate(s: str, length: int = 80) → str[source]

Truncate a string and add an ellipsis (three dots) to the end if it was too long

Parameters:
  • s – string to possibly truncate
  • length – length to truncate the string to

testing

Utils for testing arq.

See arq’s own tests for examples of usage.

class arq.testing.RaiseWorker(*, burst: bool = False, shadows: list = None, queues: list = None, timeout_seconds: int = None, **kwargs)[source]

Worker which raises exceptions rather than logging them. Useful for testing.

Parameters:
  • burst – if true the worker will close as soon as no new jobs are found in the queue lists
  • shadows – list of arq.main.Actor classes for the worker to run, overrides shadows already defined in the class definition
  • queues – list of queue names for the worker to listen on, if None queues is taken from the shadows
  • timeout_seconds – maximum duration of a job, after that the job will be cancelled by the event loop
  • re_queue – Whether or not to re-queue jobs if the worker quits before the job has time to finish.
  • kwargs – other keyword arguments, see arq.utils.RedisMixin for all available options
class arq.testing.MockRedis(*, loop=None, data=None)[source]

Very simple mock of aioredis > Redis which allows jobs to be enqueued and executed without redis.

class arq.testing.MockRedisMixin(*, loop: asyncio.events.AbstractEventLoop = None, redis_settings: arq.utils.RedisSettings = None, existing_redis: aioredis.commands.Redis = None)[source]

Dependent of RedisMixin which uses MockRedis rather than real redis to enqueue jobs.

Parameters:
  • loop – asyncio loop to use for the redis pool
  • redis_settings – connection settings to use for the pool
  • existing_redis – existing pool, if set no new pool is created, instead this one is used
class arq.testing.MockRedisWorker(*, burst: bool = False, shadows: list = None, queues: list = None, timeout_seconds: int = None, **kwargs)[source]

Dependent of Base Worker which executes jobs from MockRedis rather than real redis.

Parameters:
  • burst – if true the worker will close as soon as no new jobs are found in the queue lists
  • shadows – list of arq.main.Actor classes for the worker to run, overrides shadows already defined in the class definition
  • queues – list of queue names for the worker to listen on, if None queues is taken from the shadows
  • timeout_seconds – maximum duration of a job, after that the job will be cancelled by the event loop
  • re_queue – Whether or not to re-queue jobs if the worker quits before the job has time to finish.
  • kwargs – other keyword arguments, see arq.utils.RedisMixin for all available options

History

v0.15.0 (2018-11-15)

  • update dependencies
  • reconfigure Job, return a job instance when enqueuing tasks #93
  • tweaks to docs #106

v0.14.0 (2018-05-28)

  • package updates, particularly compatibility for msgpack 0.5.6

v0.13.0 (2017-11-27)

  • Braking Change: integration with aioredis >= 1.0, basic usage hasn’t changed but look at aioredis’s migration docs for changes in redis API #76

v0.12.0 (2017-11-16)

  • better signal handling, support uvloop #73
  • drain pending tasks and drain task cancellation #74
  • add aiohttp and docker demo /demo #75

v0.11.0 (2017-08-25)

  • extract create_pool_lenient from RedixMixin
  • improve redis connection traceback

v0.10.4 (2017-08-22)

  • RedisSettings repr method
  • add create_connection_timeout to connection pool

v0.10.3 (2017-08-19)

  • fix bug with RedisMixin.get_redis_pool creating multiple queues
  • tweak drain logs

v0.10.2 (2017-08-17)

  • only save job on task in drain if re-enqueuing
  • add semaphore timeout to drains
  • add key count to log_redis_info

v0.10.1 (2017-08-16)

  • correct format of log_redis_info

v0.10.0 (2017-08-16)

  • log redis version when starting worker, fix #64
  • log “connection success” when connecting to redis after connection failures, fix #67
  • add job ids, for now they’re just used in logging, fix #53

v0.9.0 (2017-06-23)

  • allow set encoding in msgpack for jobs #49
  • cron tasks allowing scheduling of functions in the future #50
  • Breaking change: switch to_unix_ms to just return the timestamp int, add to_unix_ms_tz to return tz offset too

v0.8.1 (2017-06-05)

  • uprev setup requires
  • correct setup arguments

v0.8.0 (2017-06-05)

  • add async-timeout dependency
  • use async-timeout around shadow_factory
  • change logger name for control process log messages
  • use Semaphore rather than asyncio.wait(...return_when=asyncio.FIRST_COMPLETED) for improved performance
  • improve log display
  • add timeout and retry logic to RedisMixin.create_redis_pool

v0.7.0 (2017-06-01)

  • implementing reusable Drain which takes tasks from a redis list and allows them to be execute asynchronously.
  • Drain uses python 3.6 async yield, therefore python 3.5 is no longer supported.
  • prevent repeated identical health check log messages

v0.6.1 (2017-05-06)

  • mypy at last passing, #30
  • adding trove classifiers, #29

v0.6.0 (2017-04-14)

  • add StopJob exception for cleaning ending jobs, #21
  • add flushdb to MockRedis, #23
  • allow configurable length job logging via log_curtail on Worker, #28

v0.5.2 (2017-02-25)

  • add shadow_kwargs method to BaseWorker to make customising actors easier.

v0.5.1 (2017-02-25)

  • reimplement worker reuse as it turned out to be useful in tests.

v0.5.0 (2017-02-20)

  • use gather rather than wait for startup and shutdown so exceptions propagate.
  • add --check option to confirm arq worker is running.

v0.4.1 (2017-02-11)

  • fix issue with Concurrent class binding with multiple actor instances.

v0.4.0 (2017-02-10)

  • improving naming of log handlers and formatters
  • upgrade numerous packages, nothing significant
  • add startup and shutdown methods to actors
  • switch @concurrent to return a Concurrent instance so the direct method is accessible via <func>.direct

v0.3.2 (2017-01-24)

  • improved solution for preventing new jobs starting when the worker is about to stop
  • switch SIGRTMIN > SIGUSR1 to work with mac

v0.3.1 (2017-01-20)

  • fix main process signal handling so the worker shuts down when just the main process receives a signal
  • re-enqueue un-started jobs popped from the queue if the worker is about to exit

v0.3.0 (2017-01-19)

  • rename settings class to RedisSettings and simplify significantly

v0.2.0 (2016-12-09)

  • add concurrency_enabled argument to aid in testing
  • fix conflict with unitest.mock

v0.1.0 (2016-12-06)

  • prevent logs disabling other logs

v0.0.6 (2016-08-14)

  • first proper release

Indices and tables