arq¶
Current Version: v0.15
Job queues and RPC in python with asyncio, redis and msgpack.
arq was conceived as a simple, modern and performant successor to rq.
Why use arq?
- non-blocking: arq is built using python 3’s asyncio, allowing non-blocking job enqueuing and execution. Multiple jobs (potentially hundreds) can be run simultaneously using a pool of asyncio Tasks.
- pre-forked: the worker starts two processes and uses the subprocess to execute all jobs, so there’s no overhead in forking a process for each job.
- fast: asyncio, pre-forking and the use of msgpack for job encoding make arq around 7x faster (see benchmarks) than rq for short jobs with no io. With io that might increase to around 40x faster. (TODO)
- elegant: arq uses a novel approach to variable scope with the @concurrent decorator being applied to bound methods of Actor classes which hold the connection pool. This works well with aiohttp, allows for easier testing and avoids extended head scratching over how variables like connections are defined (is this attached to the request? or thread local? or truly global? where am I, hell, what does global even mean?).
- small: and easy to reason with - currently arq is only about 700 lines; that won’t change significantly.
Dependencies¶
Required before pip install:

- Python 3.6.0+: asyncio is used throughout with new style async/await syntax and async yield.
- Redis: redis lists are used to communicate between the front end and worker; redis can also be used to store job results.
Installed as dependencies by pip:
Terminology¶
The old computer science proverb/joke goes:
There are only two challenges in computer science: cache invalidation, naming things and the n + 1 problem.
arq tries to avoid confusion over what’s named what by using generally accepted terminology as much as possible, however a few terms (like “actors” and “shadows”) are not so standard and bear describing:
An Actor is a class with some concurrent methods, you can define and use multiple actors. Actors hold a reference to a redis pool for enqueuing jobs and are generally singletons.
The Worker is the class which is responsible for running jobs for one or more actors. Workers should inherit from BaseWorker; your application will generally have just one worker.
Actors are therefore used in two distinctly different modes:
- default mode where you initialise, then use and abuse the actor including calling concurrent methods and thereby enqueuing jobs
- shadow mode where the actor was initialised by the worker in order to perform jobs enqueued by the actor in default (or even shadow) mode.
It’s possible to check what mode an actor is in by checking the is_shadow variable.
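For example, a minimal sketch of checking the mode (the Reporter class is purely illustrative and assumes is_shadow is set on every actor instance):

import asyncio

from arq import Actor


class Reporter(Actor):
    pass


async def main():
    r = Reporter()
    # an actor you instantiate yourself runs in "default" mode,
    # one created by the worker to execute jobs runs in "shadow" mode
    print('shadow mode' if r.is_shadow else 'default mode')
    await r.close()


asyncio.get_event_loop().run_until_complete(main())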
Usage¶
Usage is best described by example.
Simple Usage¶
import asyncio
from aiohttp import ClientSession
from arq import Actor, BaseWorker, concurrent


class Downloader(Actor):
    async def startup(self):
        self.session = ClientSession(loop=self.loop)

    @concurrent
    async def download_content(self, url):
        async with self.session.get(url) as response:
            content = await response.read()
            print(f'{url}: {content.decode():.80}...')
        return len(content)

    async def shutdown(self):
        self.session.close()


class Worker(BaseWorker):
    shadows = [Downloader]


async def download_lots():
    d = Downloader()
    for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'):
        await d.download_content(url)
    await d.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(download_lots())
(This script is complete, it should run “as is” both to enqueue jobs and run them)
To enqueue the jobs, simply run the script:
python demo.py
To execute the jobs, either after running demo.py
or before/during:
arq demo.py
For details on the arq CLI:
arq --help
Startup & Shutdown coroutines¶
The startup and shutdown coroutines are provided as a convenient way to run logic as actors start and finish, however it’s important to note that these methods are not called by default when actors are initialised or closed. They are, however, called when the actor is started and closed on the worker, eg. in “shadow” mode, see above.

In other words: if you need these coroutines to be called when using an actor in your code, that’s your responsibility.

For example, in the example above there’s no need for self.session when using the actor in “default” mode, eg. when called with python demo.py, so neither startup nor shutdown are called.
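If you do want to run a job function directly in “default” mode (via .direct, which bypasses the queue, see the @concurrent reference below), you have to run the coroutines yourself. A minimal sketch, assuming the Downloader actor from the example above:

import asyncio

from demo import Downloader  # hypothetical module containing the actor defined above


async def run_directly():
    d = Downloader()
    # startup/shutdown are not called automatically in "default" mode,
    # so call them explicitly before using anything they set up
    await d.startup()
    try:
        # .direct runs the original coroutine here instead of enqueuing it
        await d.download_content.direct('https://example.com')
    finally:
        await d.shutdown()
        await d.close()


asyncio.get_event_loop().run_until_complete(run_directly())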
Usage with aiohttp¶
Assuming you have Downloader
already defined as per above.
from aiohttp import web


async def start_job(request):
    data = await request.post()
    # this will enqueue the download_content job
    await request.app['downloader'].download_content(data['url'])
    raise web.HTTPFound(location='/wherever/')


async def shutdown(app):
    await app['downloader'].close()


def create_app():
    app = web.Application()
    ...
    app.router.add_post('/start-job/', start_job)
    app['downloader'] = Downloader()
    # use aiohttp's on_shutdown trigger to close downloader
    app.on_shutdown.append(shutdown)
    return app


if __name__ == '__main__':
    app = create_app()
    web.run_app(app, port=8000)
(Won’t run as Downloader is not defined)
For a full example arq usage with aiohttp and docker see the demo app.
Health checks¶
arq will automatically record some info about its current state in redis every health_check_interval seconds, see arq.worker.BaseWorker.health_check_interval. That key/value will expire after health_check_interval + 1 seconds, so you can be sure that if the key exists arq is alive and kicking (technically you can only be sure it was alive and kicking health_check_interval seconds ago).
You can run a health check with the CLI (assuming you’re using the above example):
arq --check demo.py
The command will output the value of the health check if found; it will then exit 0 if the key was found and 1 if it was not.
A health check value takes the following form:
Feb-20_11:02:40 j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=0 q_dft=0 q_low=0
Where the items have the following meaning:
- j_complete: the number of jobs completed
- j_failed: the number of jobs which have failed, eg. raised an exception
- j_timedout: the number of jobs which have timed out, eg. exceeded arq.worker.BaseWorker.timeout_seconds and been cancelled
- j_ongoing: the number of jobs currently being performed
- q_*: the number of pending jobs in each queue
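The same check can also be run from Python via the check_health classmethod described in the API reference below. A minimal sketch, assuming the Worker class from the example above and that check_health can be called synchronously (as the CLI does):

from demo import Worker  # hypothetical module containing the Worker defined above

# 0 if the health-check key was found in redis, 1 if it was not
exit_code = Worker.check_health()
print('healthy' if exit_code == 0 else 'unhealthy')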
Cron Jobs¶
Functions can be scheduled to be run periodically at specific times
from arq import Actor, cron


class FooBar(Actor):
    @cron(hour={9, 12, 18}, minute=12)
    async def foo(self):
        print('run foo job at 9.12am, 12.12pm and 6.12pm')
See arq.main.cron() for details on the available arguments and how cron works.

Usage roughly shadows cron except that None is equivalent to * in crontab. As per the example, sets can be used to run at multiples of the given unit.

Note that second defaults to 0 so you don’t inadvertently run jobs every second, and microsecond defaults to 123456 so you don’t inadvertently run jobs every microsecond and so arq avoids enqueuing jobs at the top of a second when the world is generally slightly busier.
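As a further illustration (a sketch, not taken from the original docs), weekday names and run_at_startup can be combined:

from arq import Actor, cron


class Reports(Actor):
    # run at 07:30 every Monday, and also once when the worker starts
    @cron(weekday='mon', hour=7, minute=30, run_at_startup=True)
    async def weekly_report(self):
        print('generating weekly report')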
Multiple Queues¶
Functions can be assigned to different queues; by default arq defines three queues: HIGH_QUEUE, DEFAULT_QUEUE and LOW_QUEUE which are prioritised by the worker in that order.
from arq import Actor, concurrent


class RegistrationEmail(Actor):
    @concurrent
    async def email_standard_user(self, user_id):
        send_user_email(user_id)

    @concurrent(Actor.HIGH_QUEUE)
    async def email_premium_user(self, user_id):
        send_user_email(user_id)
(Just a snippet, won’t run “as is”)
Direct Enqueuing¶
Functions can be enqueued directly whether or not they’re decorated with @concurrent.
from arq import Actor


class FooBar(Actor):
    async def foo(self, a, b, c):
        print(a + b + c)


async def main():
    foobar = FooBar()
    await foobar.enqueue_job('foo', 1, 2, c=48, queue=Actor.LOW_QUEUE)
    await foobar.enqueue_job('foo', 1, 2, c=48)  # this will be queued in DEFAULT_QUEUE
    await foobar.close()
(This script is almost complete except for loop.run_until_complete(main()) as above to run main; you would also need to define a worker to run the jobs)
See arq.main.Actor.enqueue_job()
for more details.
Worker Customisation¶
Workers can be customised in numerous ways, this is preferred to command line arguments as it’s easier to document and record.
from arq import BaseWorker


class Worker(BaseWorker):
    # execute jobs from both Downloader and FooBar above
    shadows = [Downloader, FooBar]

    # allow lots and lots of jobs to run simultaneously, default 50
    max_concurrent_tasks = 500

    # force the worker to close quickly after a termination signal is received, default 6
    shutdown_delay = 2

    # jobs may not take more than 10 seconds, default 60
    timeout_seconds = 10

    # number of seconds between health checks, default 60
    health_check_interval = 30

    def logging_config(self, verbose):
        conf = super().logging_config(verbose)
        # alter logging setup to set arq.jobs level to WARNING
        conf['loggers']['arq.jobs']['level'] = 'WARNING'
        return conf
(This script is more-or-less complete; provided Downloader and FooBar are defined and imported it should run “as is”)
See arq.worker.BaseWorker()
for more customisation options.
For more information on logging see arq.logs.default_log_config()
.
API Reference¶
main¶
Defines the main Actor class and the @concurrent decorator for using arq from within your code. Also defines the @cron decorator for declaring cron job functions.
class arq.main.Actor(*args, worker=None, concurrency_enabled=True, **kwargs)

All classes which wish to use arq should inherit from Actor.

Actor defines three default queues: HIGH_QUEUE, DEFAULT_QUEUE and LOW_QUEUE which are processed in that order of priority by the worker.

Actors operate in two modes: normal mode when you initialise them and use them, and “shadow mode” where the actor is initialised by the worker and used to execute jobs.

Parameters:
- worker: reference to the worker which is managing this actor in shadow mode
- concurrency_enabled: for testing only, if set to False methods are called directly rather than queued
- kwargs: other keyword arguments, see arq.utils.RedisMixin for all available options
Actor.HIGH_QUEUE = 'high'
highest priority queue, this can be overwritten by changing arq.main.Actor.queues

Actor.DEFAULT_QUEUE = 'dft'
default queue, this is a special value as it’s used in arq.main.Actor.enqueue_job()

Actor.LOW_QUEUE = 'low'
lowest priority queue, can be overwritten

Actor.QUEUE_PREFIX = b'arq:q:'
prefix prepended to all queue names to create the list keys in redis

Actor.job_class
alias of arq.jobs.Job

Actor.re_enqueue_jobs = False
whether or not to re-enqueue jobs if the worker quits before the job has time to finish

Actor.queues = ('high', 'dft', 'low')
queues the actor can enqueue jobs in, order is important, the first queue is highest priority

Actor.name = None
if not None this name is used instead of the class name when encoding and referencing jobs; if None the class’s name is used
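These attributes are typically overridden on a subclass. A brief sketch (the class name and values here are illustrative only):

from arq import Actor


class EmailActor(Actor):
    # used instead of the class name when encoding and referencing jobs
    name = 'emails'
    # ask the worker to re-enqueue jobs interrupted by shutdown
    re_enqueue_jobs = True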
Actor.close(shutdown=False)
Close down the actor, eg. close the associated redis pool, optionally also calling shutdown.

Parameters:
- shutdown: whether or not to also call the shutdown coroutine, you probably only want to set this to True if you called startup previously
Actor.enqueue_job(func_name: str, *args, queue: str = None, **kwargs) → arq.jobs.Job
Enqueue a job by pushing the encoded job information into the redis list specified by the queue.

Alternatively, if concurrency is disabled the job will be encoded, then decoded and run directly. Disabled concurrency (set via the concurrency_enabled init keyword argument) is designed for use in testing; the job is still encoded then decoded to keep tests as close as possible to production.

Parameters:
- func_name: name of the function executing the job
- args: arguments to pass to the function
- queue: name of the queue to use, if None DEFAULT_QUEUE is used
- kwargs: keyword arguments to pass to the function
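As a rough illustration of disabled concurrency in a test (a sketch assuming the Downloader actor from the usage section above):

import asyncio


async def test_download_inline():
    # with concurrency disabled, jobs run inline instead of being pushed
    # to redis, but are still encoded and decoded to stay close to production
    d = Downloader(concurrency_enabled=False)
    await d.startup()
    assert await d.download_content('https://example.com') > 0
    await d.close(shutdown=True)


asyncio.get_event_loop().run_until_complete(test_download_inline())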
arq.main.concurrent(func_or_queue)
Decorator which defines a function as concurrent, eg. it should be executed on the worker.

If you wish to call the function directly you can access the original function at <func>.direct.

The decorator can optionally be used with one argument: the queue to use by default for the job.
arq.main.cron(*, month: Union[None, set, int] = None, day: Union[None, set, int] = None, weekday: Union[None, set, int, str] = None, hour: Union[None, set, int] = None, minute: Union[None, set, int] = None, second: Union[None, set, int] = 0, microsecond: int = 123456, dft_queue=None, run_at_startup=False, unique=True)
Decorator which defines a function as a cron job, eg. it should be executed at specific times.

Workers will enqueue this job at or just after the set times. If unique is true (the default) the job will only be enqueued once even if multiple workers are running.

If you wish to call the function directly you can access the original function at <func>.direct.

Parameters:
- month: month(s) to run the job on, 1 - 12
- day: day(s) to run the job on, 1 - 31
- weekday: week day(s) to run the job on, 0 - 6 or mon - sun
- hour: hour(s) to run the job on, 0 - 23
- minute: minute(s) to run the job on, 0 - 59
- second: second(s) to run the job on, 0 - 59
- microsecond: microsecond(s) to run the job on, defaults to 123456 as the world is busier at the top of a second, 0 - 1e6
- dft_queue: default queue to use
- run_at_startup: whether to run as the worker starts
- unique: whether the job should only be executed once at each time
worker¶
Responsible for executing jobs on the worker.
class arq.worker.BaseWorker(*, burst: bool = False, shadows: list = None, queues: list = None, timeout_seconds: int = None, **kwargs)

Base class for Workers to inherit from.

Parameters:
- burst: if true the worker will close as soon as no new jobs are found in the queue lists
- shadows: list of arq.main.Actor classes for the worker to run, overrides shadows already defined in the class definition
- queues: list of queue names for the worker to listen on, if None queues is taken from the shadows
- timeout_seconds: maximum duration of a job, after that the job will be cancelled by the event loop
- re_queue: whether or not to re-queue jobs if the worker quits before the job has time to finish
- kwargs: other keyword arguments, see arq.utils.RedisMixin for all available options
BaseWorker.max_concurrent_tasks = 50
maximum number of jobs which can be executed at the same time by the event loop

BaseWorker.shutdown_delay = 6
number of seconds after a termination signal (SIGINT or SIGTERM) is received to force quit the worker

BaseWorker.health_check_interval = 60
number of seconds between calls to health_check

BaseWorker.repeat_health_check_logs = False
whether or not to skip log messages from health checks where nothing has changed

BaseWorker.drain_class
alias of arq.drain.Drain

BaseWorker.timeout_seconds = 60
default maximum duration of a job

classmethod BaseWorker.logging_config(verbose) → dict
Override to customise the logging setup for the arq worker. By default just uses arq.logs.default_log_config().

Parameters:
- verbose: verbose flag from the cli, by default log level is INFO if false and DEBUG if true

Returns: dict suitable for logging.config.dictConfig

BaseWorker.shadows = None
shadow classes, a list of Actor classes for the Worker to run

classmethod BaseWorker.check_health(**kwargs)
Run a health check on the worker and return the appropriate exit code.

Returns: 0 if successful, 1 if not

BaseWorker.run(log_redis_version=False)
Main entry point for the worker which initialises shadows, checks they look ok then runs work to perform jobs.

BaseWorker.shadow_factory() → list
Initialise the list of shadows and return them.

Override to customise the way shadows are initialised.
drain¶
Drain class used by arq.worker.BaseWorker and reusable elsewhere.
class arq.drain.Drain(*, redis: aioredis.commands.Redis, max_concurrent_tasks: int = 50, shutdown_delay: float = 6, timeout_seconds: int = 60, burst_mode: bool = True, raise_task_exception: bool = False, semaphore_timeout: float = 60)

A Drain pops jobs from redis lists and manages a set of tasks with a limited size to execute those jobs.

Parameters:
- redis: redis pool to get connections from to pop items from the lists, also used to optionally re-enqueue pending jobs on termination
- max_concurrent_tasks: maximum number of jobs which can be executed at the same time by the event loop
- shutdown_delay: number of seconds to wait for tasks to finish
- timeout_seconds: maximum duration of a job, after that the job will be cancelled by the event loop
- burst_mode: break the iter loop as soon as no more jobs are available by adding a sentinel quit queue
- raise_task_exception: whether or not to raise an exception which occurs in a processed task

Drain.add(coro, job, re_enqueue=False)
Start a job and add it to the pending tasks set.

Parameters:
- coro: coroutine to execute the job
- job: job object, instance of arq.jobs.Job or similar
- re_enqueue: whether or not to re-enqueue the job on finish if the job won’t finish in time

Drain.finish(timeout=None)
Cancel all pending tasks and optionally re-enqueue jobs which haven’t finished after the timeout.

Parameters:
- timeout: how long to wait for tasks to finish, defaults to shutdown_delay

Drain.iter(*raw_queues, pop_timeout=1)
blpop jobs from redis queues and yield them. Waits for the number of tasks to drop below max_concurrent_tasks before popping.

Parameters:
- raw_queues: tuple of bytes defining the queue(s) to pop from
- pop_timeout: how long to wait on each blpop before yielding None

Yields: tuple (raw_queue_name, raw_data) or (None, None) if all queues are empty
jobs¶
Defines the Job class and descendants which deal with encoding and decoding job data.
class arq.jobs.Job(*, class_name: str, func_name: str, args: tuple, kwargs: dict, job_id: str = None, queue: str = None, raw_queue: str = None, queued_at: int = 0, raw_data: bytes = None)

Main Job class responsible for encoding and decoding jobs as they go into and come out of redis.

Create a new job instance.

Parameters:
- class_name: name (see arq.main.Actor.name) of the actor class where the job is defined
- func_name: name of the function to be called
- args: arguments to pass to the function
- kwargs: keyword arguments to pass to the function
- job_id: id to use for the job, leave blank to generate a uuid

classmethod Job.decode(raw_data: bytes, *, queue_name: str = None, raw_queue: str = None) → arq.jobs.Job
Create a job instance by decoding a job definition, eg. from redis.

Parameters:
- raw_data: data to decode, as created by arq.jobs.Job.encode()
- queue_name: name of the queue the job was dequeued from
- raw_queue: raw name of the queue the job was taken from

Job.encode()
Create a byte string suitable for pushing into redis which contains all required information about a job to be performed.

Parameters:
- job_id: id to use for the job, leave blank to generate a uuid
- queued_at: time in ms unix time when the job was queued, if None now is used
- class_name: name (see arq.main.Actor.name) of the actor class where the job is defined
- func_name: name of the function to be called
- args: arguments to pass to the function
- kwargs: keyword arguments to pass to the function
class arq.jobs.DatetimeJob(*, class_name: str, func_name: str, args: tuple, kwargs: dict, job_id: str = None, queue: str = None, raw_queue: str = None, queued_at: int = 0, raw_data: bytes = None)

Alternative Job class which copes with datetimes. Non-timezone-naïve (ie. timezone-aware) datetimes are supported, but the returned datetimes will use a datetime.timezone class to define the timezone regardless of the timezone class originally used on the datetime object (eg. pytz).

Create a new job instance.

Parameters:
- class_name: name (see arq.main.Actor.name) of the actor class where the job is defined
- func_name: name of the function to be called
- args: arguments to pass to the function
- kwargs: keyword arguments to pass to the function
- job_id: id to use for the job, leave blank to generate a uuid
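A brief sketch of opting into DatetimeJob on an actor (the Scheduler class is illustrative; job_class is the attribute documented on Actor above):

from datetime import datetime

from arq import Actor, concurrent
from arq.jobs import DatetimeJob


class Scheduler(Actor):
    # encode and decode job arguments with datetime support
    job_class = DatetimeJob

    @concurrent
    async def remind(self, due: datetime):
        print(f'reminder due at {due.isoformat()}')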
logs¶
class arq.logs.ColourHandler(stream=None)

Coloured log handler. Levels: debug: white, info: green, warning: yellow, else: red. Datetimes (anything before the first colon) are magenta.

Initialize the handler. If stream is not specified, sys.stderr is used.

ColourHandler.emit(record)
Emit a record.

If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an ‘encoding’ attribute, it is used to determine how to do the output to the stream.
utils¶
Utilities for running arq, used by other modules.
class arq.utils.RedisSettings(host='localhost', port=6379, database=0, password=None, conn_timeout=1, conn_retries=5, conn_retry_delay=1)

No-op class used to hold redis connection settings.

Parameters:
- host: redis host
- port: redis port
- database: redis database id
- password: password for redis connection
class arq.utils.RedisMixin(*, loop: asyncio.events.AbstractEventLoop = None, redis_settings: arq.utils.RedisSettings = None, existing_redis: aioredis.commands.Redis = None)

Mixin used to find a redis pool and access it.

Parameters:
- loop: asyncio loop to use for the redis pool
- redis_settings: connection settings to use for the pool
- existing_redis: existing pool, if set no new pool is created, instead this one is used
arq.utils.create_tz(utcoffset=0) → datetime.timezone
Create a python datetime.timezone with a given utc offset.

Parameters:
- utcoffset: utc offset in seconds, if 0 timezone.utc is returned
arq.utils.timestamp() → float
This should be exactly the same as time.time(); we use this approach for consistency with other methods and possibly greater accuracy.

Returns: now in unix time, eg. seconds since 1970
arq.utils.to_unix_ms_tz(dt: datetime.datetime) → Tuple[int, Optional[int]]
Convert a datetime to the number of milliseconds since 1970 and calculate the timezone offset.

Parameters:
- dt: datetime to evaluate

Returns: tuple of (unix time in milliseconds, utc offset in seconds)
arq.utils.to_unix_ms(dt: datetime.datetime) → int
Convert a datetime to the number of milliseconds since 1970.

Parameters:
- dt: datetime to evaluate

Returns: unix time in milliseconds
arq.utils.from_unix_ms(ms: int, utcoffset: int = None) → datetime.datetime
Convert an int to a datetime.

Parameters:
- ms: number of milliseconds since 1970
- utcoffset: if set, a timezone is added to the datetime based on the offset in seconds

Returns: datetime, including a timezone if utcoffset is not None, else timezone naïve
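A small illustrative round trip using these helpers (the values are examples only):

from datetime import datetime

from arq.utils import create_tz, from_unix_ms, to_unix_ms

dt = datetime(2018, 11, 15, 12, 0, tzinfo=create_tz(3600))  # noon at UTC+1
ms = to_unix_ms(dt)                     # milliseconds since 1970
dt2 = from_unix_ms(ms, utcoffset=3600)  # back to an aware datetime at UTC+1
print(ms, dt2.isoformat())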
testing¶
Utils for testing arq.
See arq’s own tests for examples of usage.
class arq.testing.RaiseWorker(*, burst: bool = False, shadows: list = None, queues: list = None, timeout_seconds: int = None, **kwargs)

Worker which raises exceptions rather than logging them. Useful for testing.

Parameters:
- burst: if true the worker will close as soon as no new jobs are found in the queue lists
- shadows: list of arq.main.Actor classes for the worker to run, overrides shadows already defined in the class definition
- queues: list of queue names for the worker to listen on, if None queues is taken from the shadows
- timeout_seconds: maximum duration of a job, after that the job will be cancelled by the event loop
- re_queue: whether or not to re-queue jobs if the worker quits before the job has time to finish
- kwargs: other keyword arguments, see arq.utils.RedisMixin for all available options
class arq.testing.MockRedis(*, loop=None, data=None)

Very simple mock of aioredis > Redis which allows jobs to be enqueued and executed without redis.
class arq.testing.MockRedisMixin(*, loop: asyncio.events.AbstractEventLoop = None, redis_settings: arq.utils.RedisSettings = None, existing_redis: aioredis.commands.Redis = None)

Descendant of RedisMixin which uses MockRedis rather than real redis to enqueue jobs.

Parameters:
- loop: asyncio loop to use for the redis pool
- redis_settings: connection settings to use for the pool
- existing_redis: existing pool, if set no new pool is created, instead this one is used
class arq.testing.MockRedisWorker(*, burst: bool = False, shadows: list = None, queues: list = None, timeout_seconds: int = None, **kwargs)

Descendant of BaseWorker which executes jobs from MockRedis rather than real redis.

Parameters:
- burst: if true the worker will close as soon as no new jobs are found in the queue lists
- shadows: list of arq.main.Actor classes for the worker to run, overrides shadows already defined in the class definition
- queues: list of queue names for the worker to listen on, if None queues is taken from the shadows
- timeout_seconds: maximum duration of a job, after that the job will be cancelled by the event loop
- re_queue: whether or not to re-queue jobs if the worker quits before the job has time to finish
- kwargs: other keyword arguments, see arq.utils.RedisMixin for all available options
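A rough test sketch using RaiseWorker so that job exceptions surface in the test itself. The wiring below is an assumption based on the documented burst/shadows/run API; it presumes the Downloader actor from the usage section, a local redis server, and that run() is a coroutine. See arq’s own tests for the canonical patterns:

import asyncio

from arq.testing import RaiseWorker

from demo import Downloader  # hypothetical module containing the actor defined above


async def run_jobs_for_test():
    d = Downloader()
    await d.download_content('https://example.com')  # enqueue a job
    await d.close()

    # burst=True: the worker exits as soon as the queues are empty;
    # RaiseWorker re-raises job exceptions instead of only logging them
    worker = RaiseWorker(burst=True, shadows=[Downloader])
    await worker.run()
    await worker.close()


asyncio.get_event_loop().run_until_complete(run_jobs_for_test())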
History¶
v0.15.0 (2018-11-15)¶
- update dependencies
- reconfigure Job, return a job instance when enqueuing tasks #93
- tweaks to docs #106
v0.14.0 (2018-05-28)¶
- package updates, particularly compatibility for msgpack 0.5.6
v0.13.0 (2017-11-27)¶
- Breaking Change: integration with aioredis >= 1.0, basic usage hasn’t changed but look at aioredis’s migration docs for changes in redis API #76
v0.12.0 (2017-11-16)¶
- better signal handling, support uvloop #73
- drain pending tasks and drain task cancellation #74
- add aiohttp and docker demo /demo #75
v0.11.0 (2017-08-25)¶
- extract create_pool_lenient from RedisMixin
- improve redis connection traceback
v0.10.4 (2017-08-22)¶
- RedisSettings repr method
- add create_connection_timeout to connection pool
v0.10.3 (2017-08-19)¶
- fix bug with RedisMixin.get_redis_pool creating multiple queues
- tweak drain logs
v0.10.2 (2017-08-17)¶
- only save job on task in drain if re-enqueuing
- add semaphore timeout to drains
- add key count to log_redis_info
v0.10.1 (2017-08-16)¶
- correct format of log_redis_info
v0.10.0 (2017-08-16)¶
- log redis version when starting worker, fix #64
- log “connection success” when connecting to redis after connection failures, fix #67
- add job ids, for now they’re just used in logging, fix #53
v0.9.0 (2017-06-23)¶
- allow set encoding in msgpack for jobs #49
- cron tasks allowing scheduling of functions in the future #50
- Breaking change: switch to_unix_ms to just return the timestamp int, add to_unix_ms_tz to return tz offset too
v0.8.1 (2017-06-05)¶
- uprev setup requires
- correct setup arguments
v0.8.0 (2017-06-05)¶
- add async-timeout dependency
- use async-timeout around shadow_factory
- change logger name for control process log messages
- use Semaphore rather than asyncio.wait(...return_when=asyncio.FIRST_COMPLETED) for improved performance
- improve log display
- add timeout and retry logic to RedisMixin.create_redis_pool
v0.7.0 (2017-06-01)¶
- implement reusable Drain which takes tasks from a redis list and allows them to be executed asynchronously
- Drain uses python 3.6 async yield, therefore python 3.5 is no longer supported
- prevent repeated identical health check log messages
v0.6.1 (2017-05-06)¶
- mypy at last passing, #30
- adding trove classifiers, #29
v0.6.0 (2017-04-14)¶
- add StopJob exception for cleanly ending jobs, #21
- add flushdb to MockRedis, #23
- allow configurable length job logging via log_curtail on Worker, #28
v0.5.2 (2017-02-25)¶
- add shadow_kwargs method to BaseWorker to make customising actors easier.
v0.5.1 (2017-02-25)¶
- reimplement worker reuse as it turned out to be useful in tests.
v0.5.0 (2017-02-20)¶
- use gather rather than wait for startup and shutdown so exceptions propagate.
- add --check option to confirm arq worker is running.
v0.4.1 (2017-02-11)¶
- fix issue with Concurrent class binding with multiple actor instances.
v0.4.0 (2017-02-10)¶
- improving naming of log handlers and formatters
- upgrade numerous packages, nothing significant
- add startup and shutdown methods to actors
- switch @concurrent to return a Concurrent instance so the direct method is accessible via <func>.direct
v0.3.2 (2017-01-24)¶
- improved solution for preventing new jobs starting when the worker is about to stop
- switch SIGRTMIN > SIGUSR1 to work with mac
v0.3.1 (2017-01-20)¶
- fix main process signal handling so the worker shuts down when just the main process receives a signal
- re-enqueue un-started jobs popped from the queue if the worker is about to exit
v0.3.0 (2017-01-19)¶
- rename settings class to RedisSettings and simplify significantly
v0.2.0 (2016-12-09)¶
- add concurrency_enabled argument to aid in testing
- fix conflict with unittest.mock
v0.1.0 (2016-12-06)¶
- prevent logs disabling other logs
v0.0.6 (2016-08-14)¶
- first proper release