Source code for celery.events.event

"""Creating events, and event exchange definition."""
import time
from copy import copy

from kombu import Exchange

# Names exported by ``from <this module> import *``.
__all__ = (
    'Event', 'event_exchange', 'get_exchange', 'group_from',
)

#: Name given to :data:`event_exchange`, and the default ``name``
#: argument of :func:`get_exchange`.
EVENT_EXCHANGE_NAME = 'celeryev'
#: Exchange used to send events on.
#: Note: Use :func:`get_exchange` instead, as the type of
#: exchange will vary depending on the broker connection.
event_exchange = Exchange(EVENT_EXCHANGE_NAME, type='topic')


def Event(type, _fields=None, __dict__=dict, __now__=time.time, **fields):
    """Create an event.

    Notes:
        An event is simply a dictionary: the only required field is ``type``.
        A ``timestamp`` field will be set to the current time if not provided.

    Arguments:
        type (str): Event type name (e.g. ``'task-sent'``).  Always written
            to the ``type`` key, replacing any ``type`` found in ``_fields``.
        _fields (Mapping): Optional base fields.  When non-empty it is
            combined with ``fields`` into a new mapping and is not
            modified itself.
        __dict__ (Callable): Factory called as ``__dict__(_fields, **fields)``
            to build the event when ``_fields`` is given.
        __now__ (Callable): Zero-argument callable providing the value for
            a missing ``timestamp``.
        **fields: Additional event fields; on key clashes these take
            precedence over ``_fields``.

    Returns:
        dict: The event mapping, containing at least ``type`` and
        ``timestamp``.
    """
    # Without ``_fields`` the keyword dict is reused directly: it is already
    # a fresh dict owned by this call, so no copy is needed.
    event = __dict__(_fields, **fields) if _fields else fields
    # Only stamp the event when the caller did not supply a timestamp.
    if 'timestamp' not in event:
        event['timestamp'] = __now__()
    event['type'] = type
    return event
def group_from(type):
    """Get the group part of an event type name.

    The group is everything before the first ``-``; a name without a
    dash is returned unchanged.

    Arguments:
        type (str): Event type name, e.g. ``'task-sent'``.

    Returns:
        str: The leading group component of ``type``.

    Example:
        >>> group_from('task-sent')
        'task'

        >>> group_from('custom-my-event')
        'custom'
    """
    # maxsplit=1: only the first dash matters, the remainder is discarded.
    return type.split('-', 1)[0]
def get_exchange(conn, name=EVENT_EXCHANGE_NAME):
    """Get exchange used for sending events.

    Arguments:
        conn (kombu.Connection): Connection used for sending/receiving
            events.
        name (str): Name of the exchange. Default is ``celeryev``.

    Returns:
        A copy of :data:`event_exchange`, adjusted for ``conn`` and ``name``.

    Note:
        The event type changes if Redis is used as the transport
        (from topic -> fanout).
    """
    # Work on a shallow copy so the attribute assignments below are made
    # on the copy rather than on the module-level ``event_exchange``.
    ex = copy(event_exchange)
    if conn.transport.driver_type == 'redis':
        # quick hack for Issue #436
        ex.type = 'fanout'
    if name != ex.name:
        ex.name = name
    return ex
Back to Top