arq¶
Current Version: v0.15
Job queues and RPC in python with asyncio, redis and msgpack.
arq was conceived as a simple, modern and performant successor to rq.
Why use arq?
- non-blocking
- arq is built using python 3’s asyncio allowing
non-blocking job enqueuing and execution. Multiple jobs (potentially hundreds) can be run simultaneously
using a pool of asyncio Tasks.
- pre-forked
- The worker starts two processes and uses the subprocess to execute all jobs, so there’s no overhead in forking a process for each job.
- fast
- Asyncio, pre-forking and use of msgpack for job encoding make arq around 7x faster (see benchmarks) than rq for short jobs with no io. With io that might increase to around 40x faster. (TODO)
- elegant
- arq uses a novel approach to variable scope with the @concurrent decorator being applied to bound methods of Actor classes which hold the connection pool. This works well with aiohttp, allows for easier testing and avoids extended head scratching over how variables like connections are defined (is this attached to the request? or thread local? or truly global? where am I, hell, what does global even mean?).
- small
- and easy to reason with - currently arq is only about 700 lines, that won’t change significantly.
Dependencies¶
Required before pip install:
- Python 3.6.0+ asyncio is used throughout with new style async/await syntax and async yield.
- Redis Redis lists are used to communicate between the front end and worker, redis can also be used to store job results.
Installed as dependencies by pip:
Terminology¶
The old computer science proverb/joke goes:
There are only two challenges in computer science: cache invalidation, naming things and the n + 1 problem.
arq tries to avoid confusion over what’s named what by using generally accepted terminology as much as possible, however a few terms (like “actors” and “shadows”) are not so standard and bear describing:
An Actor is a class with some concurrent methods, you can define and use multiple actors. Actors hold a reference to a redis pool for enqueuing jobs and are generally singletons.
The Worker is the class which is responsible for running jobs for one or more actors. Workers should inherit
from BaseWorker, your application will generally have just one worker.
Actors are therefore used in two distinctly different modes:
- default mode where you initialise, then use and abuse the actor including calling concurrent methods and thereby enqueuing jobs
- shadow mode where the actor was initialised by the worker in order to perform jobs enqueued by the actor in default (or even shadow) mode.
It’s possible to check what mode an actor is in by checking the is_shadow variable.
Usage¶
Usage is best described by example.
Simple Usage¶
import asyncio
from aiohttp import ClientSession
from arq import Actor, BaseWorker, concurrent
class Downloader(Actor):
    async def startup(self):
        self.session = ClientSession(loop=self.loop)
    @concurrent
    async def download_content(self, url):
        async with self.session.get(url) as response:
            content = await response.read()
            print(f'{url}: {content.decode():.80}...')
        return len(content)
    async def shutdown(self):
        await self.session.close()
class Worker(BaseWorker):
    shadows = [Downloader]
async def download_lots():
    d = Downloader()
    for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'):
        await d.download_content(url)
    await d.close()
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(download_lots())
(This script is complete, it should run “as is” both to enqueue jobs and run them)
To enqueue the jobs, simply run the script:
python demo.py
To execute the jobs, either after running demo.py or before/during:
arq demo.py
For details on the arq CLI:
arq --help
Startup & Shutdown coroutines¶
The startup and shutdown coroutines are provided as a convenient way to run logic as actors start and finish,
however it’s important to note that these methods are not called by default when actors are initialised or closed.
They are however called when the actor is started and closed on the worker, ie. in “shadow” mode, see above.
In other words: if you need these coroutines to be called when using an actor in your code, that’s your responsibility.
For example, in the above example there’s no need for self.session when using the actor in “default” mode,
ie. called with python demo.py, so neither startup nor shutdown is called.
Usage with aiohttp¶
Assuming you have Downloader already defined as per above.
from aiohttp import web
async def start_job(request):
    data = await request.post()
    # this will enqueue the download_content job
    await request.app['downloader'].download_content(data['url'])
    raise web.HTTPFound(location='/wherever/')
async def shutdown(app):
    await app['downloader'].close()
def create_app():
    app = web.Application()
    ...
    app.router.add_post('/start-job/', start_job)
    app['downloader'] = Downloader()
    # use aiohttp's on_shutdown trigger to close downloader
    app.on_shutdown.append(shutdown)
    return app
if __name__ == '__main__':
    app = create_app()
    web.run_app(app, port=8000)
(Won’t run as Downloader is not defined)
For a full example of arq usage with aiohttp and docker see the demo app.
Health checks¶
arq will automatically record some info about its current state in redis every health_check_interval seconds,
see arq.worker.BaseWorker.health_check_interval. That key/value will expire after health_check_interval + 1
seconds so you can be sure that if the key exists arq is alive and kicking (technically you can be sure it
was alive and kicking health_check_interval seconds ago).
You can run a health check with the CLI (assuming you’re using the above example):
arq --check demo.py
The command will output the value of the health check if found;
then exit 0 if the key was found and 1 if it was not.
A health check value takes the following form:
Feb-20_11:02:40 j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=0 q_dft=0 q_low=0
Where the items have the following meaning:
- j_complete the number of jobs completed
- j_failed the number of jobs which have failed, eg. raised an exception
- j_timedout the number of jobs which have timed out, ie. exceeded arq.worker.BaseWorker.timeout_seconds and been cancelled
- j_ongoing the number of jobs currently being performed
- q_* the number of pending jobs in each queue
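As a rough illustration, a health check value in the form shown above can be split into its timestamp and integer counters. The parse_health_check helper below is hypothetical and not part of arq; it assumes the value is whitespace separated, with the timestamp first and key=value counters after it.

```python
def parse_health_check(value: str) -> dict:
    """Split a health check string into its timestamp and integer counters."""
    # assumption: first token is the timestamp, the rest are key=value pairs
    timestamp, *pairs = value.split()
    counters = {}
    for pair in pairs:
        key, _, number = pair.partition('=')
        counters[key] = int(number)
    return {'timestamp': timestamp, 'counters': counters}


example = (
    'Feb-20_11:02:40 j_complete=0 j_failed=0 j_timedout=0 '
    'j_ongoing=0 q_high=0 q_dft=0 q_low=0'
)
parsed = parse_health_check(example)
print(parsed['timestamp'])
print(parsed['counters'])
```

A monitoring script could use something like this to alert when j_failed or j_timedout grows between checks.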
Cron Jobs¶
Functions can be scheduled to be run periodically at specific times:
from arq import Actor, cron
class FooBar(Actor):
    @cron(hour={9, 12, 18}, minute=12)
    async def foo(self):
        print('run foo job at 9.12am, 12.12pm and 6.12pm')
See arq.main.cron() for details on the available arguments and how cron works.
Usage roughly shadows cron except None is equivalent to * in crontab.
As per the example, sets can be used to run at multiple values of the given unit.
Note that second defaults to 0 so you don’t inadvertently run jobs every second, and microsecond
defaults to 123456 so you don’t inadvertently run jobs every microsecond and so arq avoids enqueuing jobs
at the top of a second when the world is generally slightly busier.
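The matching rule described above (None acts as a wildcard, a set matches any of its members, an int must match exactly) can be sketched with the standard library alone. This is an illustration of the rule, not arq’s implementation; unit_matches and cron_matches are hypothetical names and only hour, minute and second are covered.

```python
from datetime import datetime


def unit_matches(spec, value):
    """None behaves like * in crontab, a set matches any member, an int must be equal."""
    if spec is None:
        return True
    if isinstance(spec, set):
        return value in spec
    return spec == value


def cron_matches(dt, *, hour=None, minute=None, second=0):
    """Check whether dt falls on a scheduled time; second defaults to 0 as in the text."""
    return (
        unit_matches(hour, dt.hour)
        and unit_matches(minute, dt.minute)
        and unit_matches(second, dt.second)
    )


# mirrors @cron(hour={9, 12, 18}, minute=12) from the example above
print(cron_matches(datetime(2018, 11, 15, 12, 12, 0), hour={9, 12, 18}, minute=12))  # True
print(cron_matches(datetime(2018, 11, 15, 13, 12, 0), hour={9, 12, 18}, minute=12))  # False
```

Because second defaults to 0 rather than None, a spec that only sets hour and minute matches once per minute instead of sixty times.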
Multiple Queues¶
Functions can be assigned to different queues, by default arq defines three queues:
HIGH_QUEUE, DEFAULT_QUEUE and LOW_QUEUE which are prioritised by the worker in that order.
from arq import Actor, concurrent
class RegistrationEmail(Actor):
    @concurrent
    async def email_standard_user(self, user_id):
        send_user_email(user_id)
    @concurrent(Actor.HIGH_QUEUE)
    async def email_premium_user(self, user_id):
        send_user_email(user_id)
(Just a snippet, won’t run “as is”)
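To make the priority ordering concrete, here is a minimal sketch that uses in-memory deques as a stand-in for redis lists. The queue names and the b'arq:q:' prefix are taken from the API reference; raw_queue_name and pop_next are hypothetical helpers, and the real worker pops from redis rather than from a dict.

```python
from collections import deque

QUEUE_PREFIX = b'arq:q:'          # documented prefix for the redis list keys
QUEUES = ('high', 'dft', 'low')   # documented default queues, highest priority first


def raw_queue_name(queue: str) -> bytes:
    """Build the list key for a queue name."""
    return QUEUE_PREFIX + queue.encode()


def pop_next(lists: dict):
    """Return (key, job) from the first non-empty list in priority order, else (None, None)."""
    for queue in QUEUES:
        key = raw_queue_name(queue)
        pending = lists.get(key)
        if pending:
            return key, pending.popleft()
    return None, None


lists = {
    raw_queue_name('dft'): deque(['email_standard_user(1)']),
    raw_queue_name('high'): deque(['email_premium_user(2)']),
}
print(pop_next(lists))  # the job on the high queue comes out first
print(pop_next(lists))  # then the job on the default queue
print(pop_next(lists))  # nothing left
```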
Direct Enqueuing¶
Functions can be enqueued directly whether or not they’re decorated with @concurrent.
from arq import Actor
class FooBar(Actor):
    async def foo(self, a, b, c):
        print(a + b + c)
async def main():
    foobar = FooBar()
    await foobar.enqueue_job('foo', 1, 2, c=48, queue=Actor.LOW_QUEUE)
    await foobar.enqueue_job('foo', 1, 2, c=48)  # this will be queued in DEFAULT_QUEUE
    await foobar.close()
(This script is almost complete except for loop.run_until_complete(main()) as above to run main,
you would also need to define a worker to run the jobs)
See arq.main.Actor.enqueue_job() for more details.
Worker Customisation¶
Workers can be customised in numerous ways, this is preferred to command line arguments as it’s easier to document and record.
from arq import BaseWorker
class Worker(BaseWorker):
    # execute jobs from both Downloader and FooBar above
    shadows = [Downloader, FooBar]
    # allow lots and lots of jobs to run simultaneously, default 50
    max_concurrent_tasks = 500
    # force the worker to close quickly after a termination signal is received, default 6
    shutdown_delay = 2
    # jobs may not take more than 10 seconds, default 60
    timeout_seconds = 10
    # number of seconds between health checks, default 60
    health_check_interval = 30
    @classmethod
    def logging_config(cls, verbose):
        conf = super().logging_config(verbose)
        # alter logging setup to set arq.jobs level to WARNING
        conf['loggers']['arq.jobs']['level'] = 'WARNING'
        return conf
(This script is more-or-less complete,
provided Downloader and FooBar are defined and imported it should run “as is”)
See arq.worker.BaseWorker() for more customisation options.
For more information on logging see arq.logs.default_log_config().
API Reference¶
main¶
Defines the main Actor class and @concurrent decorator for using arq from within your code.
Also defines the @cron decorator for declaring cron job functions.
- 
class arq.main.Actor(*args, worker=None, concurrency_enabled=True, **kwargs)[source]¶
- All classes which wish to use arq should inherit from Actor. - Actor defines three default queues: HIGH_QUEUE, DEFAULT_QUEUE and LOW_QUEUE which are processed in that order of priority by the worker. - Actors operate in two modes: normal mode when you initialise them and use them, and “shadow mode” where the actor is initialised by the worker and used to execute jobs. - Parameters: - worker – reference to the worker which is managing this actor in shadow mode
- concurrency_enabled – For testing only if set to False methods are called directly not queued
- kwargs – other keyword arguments, see arq.utils.RedisMixin for all available options
 - 
HIGH_QUEUE= 'high'¶
- highest priority queue, this can be overwritten by changing arq.main.Actor.queues
 - 
DEFAULT_QUEUE= 'dft'¶
- default queue, this is a special value as it’s used in arq.main.Actor.enqueue_job()
 - 
LOW_QUEUE= 'low'¶
- lowest priority queue, can be overwritten 
 - 
QUEUE_PREFIX= b'arq:q:'¶
- prefix prepended to all queue names to create the list keys in redis 
 - 
job_class¶
- alias of arq.jobs.Job
 - 
re_enqueue_jobs= False¶
- Whether or not to re-queue jobs if the worker quits before the job has time to finish. 
 - 
queues= ('high', 'dft', 'low')¶
- queues the actor can enqueue jobs in, order is important, the first queue is highest priority 
 - 
name= None¶
- if not None this name is used instead of the class name when encoding and referencing jobs, if None the class’s name is used 
 - 
close(shutdown=False)[source]¶
- Close down the actor, ie. close the associated redis pool, optionally also calling shutdown. - Parameters: - shutdown – whether or not to also call the shutdown coroutine, you probably only want to set this to True if you called startup previously
 - 
enqueue_job(func_name: str, *args, queue: str = None, **kwargs) → arq.jobs.Job[source]¶
- Enqueue a job by pushing the encoded job information into the redis list specified by the queue. - Alternatively if concurrency is disabled the job will be encoded, then decoded and run. Disabled concurrency (set via the concurrency_enabled init keyword argument) is designed for use in testing, hence the job is encoded then decoded to keep tests as close as possible to production. - Parameters: - func_name – name of the function executing the job
- args – arguments to pass to the function
- queue – name of the queue to use, if None DEFAULT_QUEUE is used.
- kwargs – key word arguments to pass to the function
 
 
- 
arq.main.concurrent(func_or_queue)[source]¶
- Decorator which defines a function as concurrent, ie. it should be executed on the worker. - If you wish to call the function directly you can access the original function at <func>.direct. - The decorator can optionally be used with one argument: the queue to use by default for the job. 
- 
arq.main.cron(*, month: Union[None, set, int] = None, day: Union[None, set, int] = None, weekday: Union[None, set, int, str] = None, hour: Union[None, set, int] = None, minute: Union[None, set, int] = None, second: Union[None, set, int] = 0, microsecond: int = 123456, dft_queue=None, run_at_startup=False, unique=True)[source]¶
- Decorator which defines a function as a cron job, ie. it should be executed at specific times. - Workers will enqueue this job at or just after the set times. If unique is true (the default) the job will only be enqueued once even if multiple workers are running. - If you wish to call the function directly you can access the original function at <func>.direct. - Parameters: - month – month(s) to run the job on, 1 - 12
- day – day(s) to run the job on, 1 - 31
- weekday – week day(s) to run the job on, 0 - 6 or mon - sun
- hour – hour(s) to run the job on, 0 - 23
- minute – minute(s) to run the job on, 0 - 59
- second – second(s) to run the job on, 0 - 59
- microsecond – microsecond(s) to run the job on, defaults to 123456 as the world is busier at the top of a second, 0 - 1e6
- dft_queue – default queue to use
- run_at_startup – whether to run as worker starts
- unique – whether the job should only be executed once at each time
 
worker¶
Responsible for executing jobs on the worker.
- 
class arq.worker.BaseWorker(*, burst: bool = False, shadows: list = None, queues: list = None, timeout_seconds: int = None, **kwargs)[source]¶
- Base class for Workers to inherit from. - Parameters: - burst – if true the worker will close as soon as no new jobs are found in the queue lists
- shadows – list of arq.main.Actor classes for the worker to run, overrides shadows already defined in the class definition
- queues – list of queue names for the worker to listen on, if None queues is taken from the shadows
- timeout_seconds – maximum duration of a job, after that the job will be cancelled by the event loop
- re_queue – Whether or not to re-queue jobs if the worker quits before the job has time to finish.
- kwargs – other keyword arguments, see arq.utils.RedisMixin for all available options
 - 
max_concurrent_tasks= 50¶
- maximum number of jobs which can be executed at the same time by the event loop 
 - 
shutdown_delay= 6¶
- number of seconds after a termination signal (SIGINT or SIGTERM) is received to force quit the worker 
 - 
health_check_interval= 60¶
- number of seconds between calls to health_check 
 - 
repeat_health_check_logs= False¶
- Whether or not to skip log messages from health checks where nothing has changed 
 - 
drain_class¶
- alias of arq.drain.Drain
 - 
timeout_seconds= 60¶
- default maximum duration of a job 
 - 
classmethod logging_config(verbose) → dict[source]¶
- Override to customise the logging setup for the arq worker. By default just uses arq.logs.default_log_config(). - Parameters: - verbose – verbose flag from cli, by default log level is INFO if false and DEBUG if true - Returns: - dict suitable for logging.config.dictConfig
 - 
shadows= None¶
- shadow classes, a list of Actor classes for the Worker to run 
 - 
classmethod check_health(**kwargs)[source]¶
- Run a health check on the worker and return the appropriate exit code. - Returns: - 0 if successful, 1 if not 
 - 
run(log_redis_version=False)[source]¶
- Main entry point for the worker which initialises shadows, checks they look ok then runs work to perform jobs.
 - 
shadow_factory() → list[source]¶
- Initialise list of shadows and return them. - Override to customise the way shadows are initialised. 
 
drain¶
Drain class used by arq.worker.BaseWorker and reusable elsewhere.
- 
class arq.drain.Drain(*, redis: aioredis.commands.Redis, max_concurrent_tasks: int = 50, shutdown_delay: float = 6, timeout_seconds: int = 60, burst_mode: bool = True, raise_task_exception: bool = False, semaphore_timeout: float = 60)[source]¶
- Drain pops jobs from redis lists and manages a size-limited set of tasks to execute those jobs. - Parameters: - redis – redis pool to get connection from to pop items from list, also used to optionally re-enqueue pending jobs on termination
- max_concurrent_tasks – maximum number of jobs which can be executed at the same time by the event loop
- shutdown_delay – number of seconds to wait for tasks to finish
- timeout_seconds – maximum duration of a job, after that the job will be cancelled by the event loop
- burst_mode – break the iter loop as soon as no more jobs are available by adding a sentinel quit queue
- raise_task_exception – whether or not to raise an exception which occurs in a processed task
 - 
add(coro, job, re_enqueue=False)[source]¶
- Start job and add it to the pending tasks set. - Parameters: - coro – coroutine to execute the job
- job – job object, instance of arq.jobs.Job or similar
- re_enqueue – whether or not to re-enqueue the job on finish if the job won’t finish in time.
 - 
finish(timeout=None)[source]¶
- Cancel all pending tasks and optionally re-enqueue jobs which haven’t finished after the timeout. - Parameters: - timeout – how long to wait for tasks to finish, defaults to shutdown_delay
 - 
iter(*raw_queues, pop_timeout=1)[source]¶
- blpop jobs from redis queues and yield them. Waits for the number of tasks to drop below max_concurrent_tasks before popping. - Parameters: - raw_queues – tuple of bytes defining queue(s) to pop from.
- pop_timeout – how long to wait on each blpop before yielding None.
 - Yields: - tuple - (raw_queue_name, raw_data) or (None, None) if all queues are empty
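The two limits described for Drain, a cap on simultaneous jobs and a per-job timeout, can be sketched with plain asyncio. This is not arq’s Drain: run_limited and job are hypothetical names, the sketch only borrows the max_concurrent_tasks and timeout_seconds parameter names, and it assumes Python 3.7+ for asyncio.run.

```python
import asyncio


async def run_limited(coros, max_concurrent_tasks=50, timeout_seconds=60):
    """Run coroutines with at most max_concurrent_tasks in flight, cancelling slow ones."""
    semaphore = asyncio.Semaphore(max_concurrent_tasks)
    stats = {'running': 0, 'peak': 0, 'timed_out': 0}

    async def run_one(coro):
        async with semaphore:  # blocks while the task set is full
            stats['running'] += 1
            stats['peak'] = max(stats['peak'], stats['running'])
            try:
                # wait_for cancels the job if it exceeds the timeout
                return await asyncio.wait_for(coro, timeout_seconds)
            except asyncio.TimeoutError:
                stats['timed_out'] += 1
                return None
            finally:
                stats['running'] -= 1

    results = await asyncio.gather(*(run_one(c) for c in coros))
    return results, stats


async def job(n, delay=0.01):
    """Stand-in for a real job: sleep briefly, then return a value."""
    await asyncio.sleep(delay)
    return n * 2


async def main():
    jobs = [job(n) for n in range(10)]
    return await run_limited(jobs, max_concurrent_tasks=3, timeout_seconds=1)


results, stats = asyncio.run(main())
print(results, stats['peak'])
```

The changelog notes that arq switched to a Semaphore in v0.8.0 for performance; the sketch follows the same general idea without claiming to match the details.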
 
jobs¶
Defines the Job class and descendants which deal with encoding and decoding job data.
- 
class arq.jobs.Job(*, class_name: str, func_name: str, args: tuple, kwargs: dict, job_id: str = None, queue: str = None, raw_queue: str = None, queued_at: int = 0, raw_data: bytes = None)[source]¶
- Main Job class responsible for encoding and decoding jobs as they go into and come out of redis. - Create a new job instance. - Parameters: - class_name – name (see arq.main.Actor.name) of the actor class where the job is defined
- func_name – name of the function to be called
- args – arguments to pass to the function
- kwargs – key word arguments to pass to the function
- job_id – id to use for the job, leave blank to generate a uuid
 - 
classmethod decode(raw_data: bytes, *, queue_name: str = None, raw_queue: str = None) → arq.jobs.Job[source]¶
- Create a job instance by decoding a job definition eg. from redis. - Parameters: - raw_data – data to decode, as created by arq.jobs.Job.encode()
- queue_name – name of the queue the job was dequeued from
- raw_queue – raw name of the queue the job was taken from
 
 - 
encode()[source]¶
- Create a byte string suitable for pushing into redis which contains all required information about a job to be performed. - Parameters: - job_id – id to use for the job, leave blank to generate a uuid
- queued_at – time in ms unix time when the job was queued, if None now is used
- class_name – name (see arq.main.Actor.name) of the actor class where the job is defined
- func_name – name of the function to be called
- args – arguments to pass to the function
- kwargs – key word arguments to pass to the function
 
 
- 
class arq.jobs.DatetimeJob(*, class_name: str, func_name: str, args: tuple, kwargs: dict, job_id: str = None, queue: str = None, raw_queue: str = None, queued_at: int = 0, raw_data: bytes = None)[source]¶
- Alternative Job which copes with datetimes. Timezone-aware (non-naïve) datetimes are supported, but the returned datetimes will use a datetime.timezone class to define the timezone regardless of the timezone class originally used on the datetime object (eg. pytz). - Create a new job instance. - Parameters: - class_name – name (see arq.main.Actor.name) of the actor class where the job is defined
- func_name – name of the function to be called
- args – arguments to pass to the function
- kwargs – key word arguments to pass to the function
- job_id – id to use for the job, leave blank to generate a uuid
 
logs¶
- 
class arq.logs.ColourHandler(stream=None)[source]¶
- Coloured log handler. Levels: debug: white, info: green, warning: yellow, else: red. - Date times (anything before the first colon) are magenta. - Initialize the handler. - If stream is not specified, sys.stderr is used. - 
emit(record)[source]¶
- Emit a record. - If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an ‘encoding’ attribute, it is used to determine how to do the output to the stream. 
 
- 
utils¶
Utilities for running arq used by other modules.
- 
class arq.utils.RedisSettings(host='localhost', port=6379, database=0, password=None, conn_timeout=1, conn_retries=5, conn_retry_delay=1)[source]¶
- No-Op class used to hold redis connection settings. - Parameters: - host – redis host
- port – redis port
- database – redis database id
- password – password for redis connection
 
- 
class arq.utils.RedisMixin(*, loop: asyncio.events.AbstractEventLoop = None, redis_settings: arq.utils.RedisSettings = None, existing_redis: aioredis.commands.Redis = None)[source]¶
- Mixin used to define a redis pool and access it. - Parameters: - loop – asyncio loop to use for the redis pool
- redis_settings – connection settings to use for the pool
- existing_redis – existing pool, if set no new pool is created, instead this one is used
 
- 
arq.utils.create_tz(utcoffset=0) → datetime.timezone[source]¶
- Create a python datetime.timezone with a given utc offset. - Parameters: - utcoffset – utc offset in seconds, if 0 timezone.utc is returned. 
- 
arq.utils.timestamp() → float[source]¶
- This should be exactly the same as time.time(), we use this approach for consistency with other methods and possibly greater accuracy. - Returns: - now in unix time, ie. seconds since 1970 
- 
arq.utils.to_unix_ms_tz(dt: datetime.datetime) → Tuple[int, Optional[int]][source]¶
- convert a datetime to the number of milliseconds since 1970 and calculate the timezone offset. - Parameters: - dt – datetime to evaluate
 - Returns: - tuple - (unix time in milliseconds, utc offset in seconds) 
- 
arq.utils.to_unix_ms(dt: datetime.datetime) → int[source]¶
- convert a datetime to the number of milliseconds since 1970. - Parameters: - dt – datetime to evaluate
 - Returns: - unix time in milliseconds 
- 
arq.utils.from_unix_ms(ms: int, utcoffset: int = None) → datetime.datetime[source]¶
- convert int to a datetime. - Parameters: - ms – number of milliseconds since 1970
- utcoffset – if set a timezone is added to the datetime based on the offset in seconds.
 - Returns: - datetime - including timezone if utcoffset is not None, else timezone naïve 
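The millisecond conversions documented above can be approximated with the standard library. The functions below are an illustrative reimplementation, not arq’s source: the _sketch names are hypothetical, naive datetimes are assumed to be UTC, and the handling of utcoffset is one reasonable interpretation of the description.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1)


def to_unix_ms_sketch(dt: datetime) -> int:
    """Milliseconds since 1970; naive datetimes are assumed to be UTC here."""
    if dt.tzinfo is not None:
        # normalise aware datetimes to naive UTC before subtracting the epoch
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return (dt - EPOCH) // timedelta(milliseconds=1)


def from_unix_ms_sketch(ms: int, utcoffset: int = None) -> datetime:
    """Inverse conversion; returns an aware datetime when utcoffset (seconds) is given."""
    dt = EPOCH + timedelta(milliseconds=ms)
    if utcoffset is None:
        return dt  # timezone naive
    tz = timezone(timedelta(seconds=utcoffset))
    return dt.replace(tzinfo=timezone.utc).astimezone(tz)


ms = to_unix_ms_sketch(datetime(2018, 11, 15, 12, 0))
print(ms)
print(from_unix_ms_sketch(ms))
print(from_unix_ms_sketch(ms, utcoffset=3600))
```

Storing an integer timestamp plus a separate offset keeps the encoded job compact while still allowing an aware datetime to be rebuilt on the worker.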
testing¶
Utils for testing arq.
See arq’s own tests for examples of usage.
- 
class arq.testing.RaiseWorker(*, burst: bool = False, shadows: list = None, queues: list = None, timeout_seconds: int = None, **kwargs)[source]¶
- Worker which raises exceptions rather than logging them. Useful for testing. - Parameters: - burst – if true the worker will close as soon as no new jobs are found in the queue lists
- shadows – list of arq.main.Actor classes for the worker to run, overrides shadows already defined in the class definition
- queues – list of queue names for the worker to listen on, if None queues is taken from the shadows
- timeout_seconds – maximum duration of a job, after that the job will be cancelled by the event loop
- re_queue – Whether or not to re-queue jobs if the worker quits before the job has time to finish.
- kwargs – other keyword arguments, see arq.utils.RedisMixin for all available options
 
- 
class arq.testing.MockRedis(*, loop=None, data=None)[source]¶
- Very simple mock of aioredis > Redis which allows jobs to be enqueued and executed without redis. 
- 
class arq.testing.MockRedisMixin(*, loop: asyncio.events.AbstractEventLoop = None, redis_settings: arq.utils.RedisSettings = None, existing_redis: aioredis.commands.Redis = None)[source]¶
- Descendant of RedisMixin which uses MockRedis rather than real redis to enqueue jobs. - Parameters: - loop – asyncio loop to use for the redis pool
- redis_settings – connection settings to use for the pool
- existing_redis – existing pool, if set no new pool is created, instead this one is used
 
- 
class arq.testing.MockRedisWorker(*, burst: bool = False, shadows: list = None, queues: list = None, timeout_seconds: int = None, **kwargs)[source]¶
- Descendant of BaseWorker which executes jobs from MockRedis rather than real redis. - Parameters: - burst – if true the worker will close as soon as no new jobs are found in the queue lists
- shadows – list of arq.main.Actor classes for the worker to run, overrides shadows already defined in the class definition
- queues – list of queue names for the worker to listen on, if None queues is taken from the shadows
- timeout_seconds – maximum duration of a job, after that the job will be cancelled by the event loop
- re_queue – Whether or not to re-queue jobs if the worker quits before the job has time to finish.
- kwargs – other keyword arguments, see arq.utils.RedisMixin for all available options
 
History¶
v0.15.0 (2018-11-15)¶
- update dependencies
- reconfigure Job, return a job instance when enqueuing tasks #93
- tweaks to docs #106
v0.14.0 (2018-05-28)¶
- package updates, particularly compatibility for msgpack 0.5.6
v0.13.0 (2017-11-27)¶
- Breaking change: integration with aioredis >= 1.0, basic usage hasn’t changed but look at aioredis’s migration docs for changes in redis API #76
v0.12.0 (2017-11-16)¶
- better signal handling, support uvloop #73
- drain pending tasks and drain task cancellation #74
- add aiohttp and docker demo /demo #75
v0.11.0 (2017-08-25)¶
- extract create_pool_lenient from RedisMixin
- improve redis connection traceback
v0.10.4 (2017-08-22)¶
- RedisSettings repr method
- add create_connection_timeout to connection pool
v0.10.3 (2017-08-19)¶
- fix bug with RedisMixin.get_redis_pool creating multiple queues
- tweak drain logs
v0.10.2 (2017-08-17)¶
- only save job on task in drain if re-enqueuing
- add semaphore timeout to drains
- add key count to log_redis_info
v0.10.1 (2017-08-16)¶
- correct format of log_redis_info
v0.10.0 (2017-08-16)¶
- log redis version when starting worker, fix #64
- log “connection success” when connecting to redis after connection failures, fix #67
- add job ids, for now they’re just used in logging, fix #53
v0.9.0 (2017-06-23)¶
- allow set encoding in msgpack for jobs #49
- cron tasks allowing scheduling of functions in the future #50
- Breaking change: switch to_unix_ms to just return the timestamp int, add to_unix_ms_tz to return tz offset too
v0.8.1 (2017-06-05)¶
- uprev setup requires
- correct setup arguments
v0.8.0 (2017-06-05)¶
- add async-timeout dependency
- use async-timeout around shadow_factory
- change logger name for control process log messages
- use Semaphore rather than asyncio.wait(...return_when=asyncio.FIRST_COMPLETED) for improved performance
- improve log display
- add timeout and retry logic to RedisMixin.create_redis_pool
v0.7.0 (2017-06-01)¶
- implementing reusable Drain which takes tasks from a redis list and allows them to be executed asynchronously.
- Drain uses python 3.6 async yield, therefore python 3.5 is no longer supported.
- prevent repeated identical health check log messages
v0.6.1 (2017-05-06)¶
- mypy at last passing, #30
- adding trove classifiers, #29
v0.6.0 (2017-04-14)¶
- add StopJob exception for cleanly ending jobs, #21
- add flushdb to MockRedis, #23
- allow configurable length job logging via log_curtail on Worker, #28
v0.5.2 (2017-02-25)¶
- add shadow_kwargs method to BaseWorker to make customising actors easier.
v0.5.1 (2017-02-25)¶
- reimplement worker reuse as it turned out to be useful in tests.
v0.5.0 (2017-02-20)¶
- use gather rather than wait for startup and shutdown so exceptions propagate.
- add --check option to confirm arq worker is running.
v0.4.1 (2017-02-11)¶
- fix issue with Concurrent class binding with multiple actor instances.
v0.4.0 (2017-02-10)¶
- improving naming of log handlers and formatters
- upgrade numerous packages, nothing significant
- add startup and shutdown methods to actors
- switch @concurrent to return a Concurrent instance so the direct method is accessible via <func>.direct
v0.3.2 (2017-01-24)¶
- improved solution for preventing new jobs starting when the worker is about to stop
- switch SIGRTMIN > SIGUSR1 to work with mac
v0.3.1 (2017-01-20)¶
- fix main process signal handling so the worker shuts down when just the main process receives a signal
- re-enqueue un-started jobs popped from the queue if the worker is about to exit
v0.3.0 (2017-01-19)¶
- rename settings class to RedisSettings and simplify significantly
v0.2.0 (2016-12-09)¶
- add concurrency_enabled argument to aid in testing
- fix conflict with unittest.mock
v0.1.0 (2016-12-06)¶
- prevent logs disabling other logs
v0.0.6 (2016-08-14)¶
- first proper release