Source code for arq.testing


Utils for testing arq.

See arq's own tests for examples of usage.
import asyncio
import logging
from contextlib import contextmanager
from datetime import datetime, timedelta

from .utils import RedisMixin, timestamp
from .worker import BaseWorker

logger = logging.getLogger('arq.mock')

[docs]class RaiseWorker(BaseWorker): """ Worker which raises exceptions rather than logging them. Useful for testing. """ @classmethod def handle_execute_exc(cls, started_at, exc, j): raise exc def handle_prepare_exc(self, msg): raise RuntimeError(msg)
@contextmanager def redis_context_manager(r): yield r
[docs]class MockRedis: """ Very simple mock of aioredis > Redis which allows jobs to be enqueued and executed without redis. """ def __init__(self, *, loop=None, data=None): self.loop = loop or asyncio.get_event_loop() = {} if data is None else data self._expiry = {} self._pool_or_conn = type('MockConnection', (), {'_loop': self.loop})'initialising MockRedis, data id: %s', None if data is None else id(data)) def __await__(self): return redis_context_manager(self) yield # pragma: no cover async def rpush(self, list_name, data):'rpushing %s to %s', data, list_name)[list_name] = self._get(list_name, []) + [data] async def blpop(self, *list_names, timeout=0): assert isinstance(timeout, int) start = timestamp() if timeout > 0 else None'blpop from %s, timeout=%d', list_names, timeout) while True: v = await self.lpop(*list_names) if v: return v t = timestamp() - start if start and t > timeout:'blpop timed out %0.3fs', t) return'blpop waiting for data %0.3fs', t) await asyncio.sleep(0.5, loop=self.loop) async def lpop(self, *list_names): for list_name in list_names: data_list = self._get(list_name) if data_list is None: continue assert isinstance(data_list, list) if data_list: d = data_list.pop(0)'lpop %s from %s', d, list_name) return list_name, d'lpop nothing found in lists %s', list_names) def _get(self, key, default=None): expires = self._expiry.get(key, None) if expires and expires < return default return, default) async def llen(self, list_name): return len(self._get(list_name, [])) async def flushdb(self): = {} async def set(self, key, value):[key] = value async def setex(self, key, expires, value):[key] = value self._expiry[key] = + timedelta(seconds=expires) async def expire(self, key, expires): self._expiry[key] = + timedelta(seconds=expires) async def get(self, key): return self._get(key) async def getset(self, key, value): old_value = self._get(key) await self.set(key, value) return old_value def close(self): pass async def wait_closed(self): pass
[docs]class MockRedisMixin(RedisMixin): """ Dependent of RedisMixin which uses MockRedis rather than real redis to enqueue jobs. """ async def create_redis_pool(self): return self.redis or MockRedis(loop=self.loop) @property def mock_data(self): self.redis = self.redis or MockRedis(loop=self.loop) return @mock_data.setter def mock_data(self, data): self.redis = self.redis or MockRedis(loop=self.loop) = data
[docs]class MockRedisWorker(MockRedisMixin, BaseWorker): """ Dependent of Base Worker which executes jobs from MockRedis rather than real redis. """