"""Timer scheduling Python callbacks."""
import heapq
import sys
from collections import namedtuple
from datetime import datetime
from functools import total_ordering
from weakref import proxy as weakrefproxy
from time import monotonic
from vine.utils import wraps
from kombu.log import get_logger
from time import time as _time
try:
from pytz import utc
except ImportError: # pragma: no cover
utc = None
__all__ = ('Entry', 'Timer', 'to_timestamp')
logger = get_logger(__name__)
DEFAULT_MAX_INTERVAL = 2
EPOCH = datetime.utcfromtimestamp(0).replace(tzinfo=utc)
IS_PYPY = hasattr(sys, 'pypy_version_info')
scheduled = namedtuple('scheduled', ('eta', 'priority', 'entry'))
[docs]def to_timestamp(d, default_timezone=utc, time=monotonic):
"""Convert datetime to timestamp.
If d' is already a timestamp, then that will be used.
"""
if isinstance(d, datetime):
if d.tzinfo is None:
d = d.replace(tzinfo=default_timezone)
diff = _time() - time()
return max((d - EPOCH).total_seconds() - diff, 0)
return d
[docs]@total_ordering
class Entry:
"""Schedule Entry."""
if not IS_PYPY: # pragma: no cover
__slots__ = (
'fun', 'args', 'kwargs', 'tref', 'canceled',
'_last_run', '__weakref__',
)
def __init__(self, fun, args=None, kwargs=None):
self.fun = fun
self.args = args or []
self.kwargs = kwargs or {}
self.tref = weakrefproxy(self)
self._last_run = None
self.canceled = False
def __call__(self):
return self.fun(*self.args, **self.kwargs)
[docs] def cancel(self):
try:
self.tref.canceled = True
except ReferenceError: # pragma: no cover
pass
def __repr__(self):
return '<TimerEntry: {}(*{!r}, **{!r})'.format(
self.fun.__name__, self.args, self.kwargs)
# must not use hash() to order entries
def __lt__(self, other):
return id(self) < id(other)
@property
def cancelled(self):
return self.canceled
@cancelled.setter
def cancelled(self, value):
self.canceled = value
class Timer:
"""Async timer implementation."""
Entry = Entry
on_error = None
def __init__(self, max_interval=None, on_error=None, **kwargs):
self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
self.on_error = on_error or self.on_error
self._queue = []
def __enter__(self):
return self
def __exit__(self, *exc_info):
self.stop()
def call_at(self, eta, fun, args=(), kwargs=None, priority=0):
kwargs = {} if not kwargs else kwargs
return self.enter_at(self.Entry(fun, args, kwargs), eta, priority)
def call_after(self, secs, fun, args=(), kwargs=None, priority=0):
kwargs = {} if not kwargs else kwargs
return self.enter_after(secs, self.Entry(fun, args, kwargs), priority)
def call_repeatedly(self, secs, fun, args=(), kwargs=None, priority=0):
kwargs = {} if not kwargs else kwargs
tref = self.Entry(fun, args, kwargs)
@wraps(fun)
def _reschedules(*args, **kwargs):
last, now = tref._last_run, monotonic()
lsince = (now - tref._last_run) if last else secs
try:
if lsince and lsince >= secs:
tref._last_run = now
return fun(*args, **kwargs)
finally:
if not tref.canceled:
last = tref._last_run
next = secs - (now - last) if last else secs
self.enter_after(next, tref, priority)
tref.fun = _reschedules
tref._last_run = None
return self.enter_after(secs, tref, priority)
def enter_at(self, entry, eta=None, priority=0, time=monotonic):
"""Enter function into the scheduler.
Arguments:
entry (~kombu.asynchronous.timer.Entry): Item to enter.
eta (datetime.datetime): Scheduled time.
priority (int): Unused.
"""
if eta is None:
eta = time()
if isinstance(eta, datetime):
try:
eta = to_timestamp(eta)
except Exception as exc:
if not self.handle_error(exc):
raise
return
return self._enter(eta, priority, entry)
def enter_after(self, secs, entry, priority=0, time=monotonic):
return self.enter_at(entry, time() + float(secs), priority)
def _enter(self, eta, priority, entry, push=heapq.heappush):
push(self._queue, scheduled(eta, priority, entry))
return entry
def apply_entry(self, entry):
try:
entry()
except Exception as exc:
if not self.handle_error(exc):
logger.error('Error in timer: %r', exc, exc_info=True)
def handle_error(self, exc_info):
if self.on_error:
self.on_error(exc_info)
return True
def stop(self):
pass
def __iter__(self, min=min, nowfun=monotonic,
pop=heapq.heappop, push=heapq.heappush):
"""Iterate over schedule.
This iterator yields a tuple of ``(wait_seconds, entry)``,
where if entry is :const:`None` the caller should wait
for ``wait_seconds`` until it polls the schedule again.
"""
max_interval = self.max_interval
queue = self._queue
while 1:
if queue:
eventA = queue[0]
now, eta = nowfun(), eventA[0]
if now < eta:
yield min(eta - now, max_interval), None
else:
eventB = pop(queue)
if eventB is eventA:
entry = eventA[2]
if not entry.canceled:
yield None, entry
continue
else:
push(queue, eventB)
else:
yield None, None
def clear(self):
self._queue[:] = [] # atomic, without creating a new list.
def cancel(self, tref):
tref.cancel()
def __len__(self):
return len(self._queue)
def __nonzero__(self):
return True
@property
def queue(self, _pop=heapq.heappop):
"""Snapshot of underlying datastructure."""
events = list(self._queue)
return [_pop(v) for v in [events] * len(events)]
@property
def schedule(self):
return self