Source code for celery.backends.mongodb

"""MongoDB result store backend."""
from datetime import datetime, timedelta

from kombu.exceptions import EncodeError
from kombu.utils.objects import cached_property
from kombu.utils.url import maybe_sanitize_url, urlparse

from celery import states
from celery.exceptions import ImproperlyConfigured

from .base import BaseBackend

# pymongo is treated as an optional dependency here: when it cannot be
# imported the name is bound to ``None`` so this module still imports.
try:
    import pymongo
except ImportError:  # pragma: no cover
    pymongo = None   # noqa

if pymongo:
    # ``Binary`` is taken from ``bson.binary`` when that import works and
    # from ``pymongo.binary`` otherwise.
    try:
        from bson.binary import Binary
    except ImportError:                     # pragma: no cover
        from pymongo.binary import Binary  # noqa
    from pymongo.errors import InvalidDocument  # noqa
else:                                       # pragma: no cover
    Binary = None                           # noqa

    # Stand-in exception type so that code referring to ``InvalidDocument``
    # (e.g. in an ``except`` clause) stays valid when pymongo is missing.
    class InvalidDocument(Exception):       # noqa
        pass

# Public API of this module.
__all__ = ('MongoBackend',)

# Names of serializers treated as producing binary payloads.
BINARY_CODECS = frozenset(['pickle', 'msgpack'])


class MongoBackend(BaseBackend):
    """MongoDB result backend.

    Connection parameters are gathered, in increasing order of precedence,
    from the class-level defaults, the backend URL (when one was given) and
    the ``mongodb_backend_settings`` entry of the app configuration.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`pymongo` is not available.
    """

    # Host list (or URI) handed straight to MongoClient; filled in from the
    # backend URL or from the ``mongo_host`` setting.
    mongo_host = None
    # Fallback host/port used only when ``mongo_host`` is empty.
    host = 'localhost'
    port = 27017
    user = None
    password = None
    database_name = 'celery'
    taskmeta_collection = 'celery_taskmeta'
    groupmeta_collection = 'celery_groupmeta'
    max_pool_size = 10
    # Extra keyword arguments for MongoClient; replaced by a per-instance
    # dict in ``__init__``.
    options = None

    supports_autoexpire = False

    # Lazily created MongoClient, see ``_get_connection``.
    _connection = None

    def __init__(self, app=None, **kwargs):
        """Collect connection settings from the URL and the app config.

        Arguments:
            app: Celery application; forwarded to the base backend.
            **kwargs: Forwarded unchanged to the base backend.

        Raises:
            celery.exceptions.ImproperlyConfigured: if :pypi:`pymongo` is
                not installed, or if ``mongodb_backend_settings`` is set
                but is not a dict.
        """
        self.options = {}

        super().__init__(app, **kwargs)

        if not pymongo:
            raise ImproperlyConfigured(
                'You need to install the pymongo library to use the '
                'MongoDB backend.')

        # Set option defaults
        for key, value in self._prepare_client_options().items():
            self.options.setdefault(key, value)

        # update conf with mongo uri data, only if uri was given
        if self.url:
            self.url = self._ensure_mongodb_uri_compliance(self.url)

            uri_data = pymongo.uri_parser.parse_uri(self.url)
            # build the hosts list to create a mongo connection
            hostslist = [
                f'{x[0]}:{x[1]}' for x in uri_data['nodelist']
            ]
            self.user = uri_data['username']
            self.password = uri_data['password']
            self.mongo_host = hostslist
            if uri_data['database']:
                # if no database is provided in the uri, use default
                self.database_name = uri_data['database']

            self.options.update(uri_data['options'])

        # update conf with specific settings
        config = self.app.conf.get('mongodb_backend_settings')
        if config is not None:
            if not isinstance(config, dict):
                raise ImproperlyConfigured(
                    'MongoDB backend settings should be grouped in a dict')
            config = dict(config)  # don't modify original

            if 'host' in config or 'port' in config:
                # these should take over uri conf
                self.mongo_host = None

            self.host = config.pop('host', self.host)
            self.port = config.pop('port', self.port)
            self.mongo_host = config.pop('mongo_host', self.mongo_host)
            self.user = config.pop('user', self.user)
            self.password = config.pop('password', self.password)
            self.database_name = config.pop('database', self.database_name)
            self.taskmeta_collection = config.pop(
                'taskmeta_collection', self.taskmeta_collection,
            )
            self.groupmeta_collection = config.pop(
                'groupmeta_collection', self.groupmeta_collection,
            )

            # Whatever is left after popping the known keys is passed on
            # as a client option.
            self.options.update(config.pop('options', {}))
            self.options.update(config)

    @staticmethod
    def _ensure_mongodb_uri_compliance(url):
        """Return *url* adjusted so that it uses a ``mongodb`` scheme.

        A scheme that does not start with ``mongodb`` gets ``mongodb+``
        prepended, and a bare ``mongodb://`` gets ``localhost`` appended.
        """
        parsed_url = urlparse(url)
        if not parsed_url.scheme.startswith('mongodb'):
            url = f'mongodb+{url}'

        if url == 'mongodb://':
            url += 'localhost'

        return url

    def _prepare_client_options(self):
        """Return the default client options for the installed pymongo."""
        if pymongo.version_tuple >= (3,):
            return {'maxPoolSize': self.max_pool_size}
        else:  # pragma: no cover
            return {'max_pool_size': self.max_pool_size,
                    'auto_start_request': False}

    def _get_connection(self):
        """Connect to the MongoDB server.

        The client is created on first use and cached on the instance.
        """
        if self._connection is None:
            from pymongo import MongoClient

            host = self.mongo_host
            if not host:
                # The first pymongo.Connection() argument (host) can be
                # a list of ['host:port'] elements or a mongodb connection
                # URI. If this is the case, don't use self.port
                # but let pymongo get the port(s) from the URI instead.
                # This enables the use of replica sets and sharding.
                # See pymongo.Connection() for more info.
                host = self.host
                # Recognise both URI schemes so that a ``mongodb+srv://``
                # host is not wrapped in a second ``mongodb://`` prefix.
                if isinstance(host, str) and not host.startswith(
                        ('mongodb://', 'mongodb+srv://')):
                    host = f'mongodb://{host}:{self.port}'
            # don't change self.options
            conf = dict(self.options)
            conf['host'] = host
            if self.user:
                conf['username'] = self.user
            if self.password:
                conf['password'] = self.password

            self._connection = MongoClient(**conf)

        return self._connection

    def encode(self, data):
        """Encode *data* for storage.

        With the ``bson`` serializer the data is returned untouched because
        MongoDB handles serialization itself.  Payloads produced by one of
        the ``BINARY_CODECS`` serializers are wrapped in ``Binary``.
        """
        if self.serializer == 'bson':
            # mongodb handles serialization
            return data
        payload = super().encode(data)

        # serializer which are in a unsupported format (pickle/binary)
        if self.serializer in BINARY_CODECS:
            payload = Binary(payload)
        return payload

    def decode(self, data):
        """Decode *data* read from storage (no-op for ``bson``)."""
        if self.serializer == 'bson':
            return data
        return super().decode(data)

    def _store_result(self, task_id, result, state,
                      traceback=None, request=None, **kwargs):
        """Store return value and state of an executed task.

        Raises:
            kombu.exceptions.EncodeError: if pymongo rejects the document
                with ``InvalidDocument``.
        """
        meta = self._get_result_meta(result=self.encode(result), state=state,
                                     traceback=traceback, request=request)
        # Add the _id for mongodb
        meta['_id'] = task_id

        try:
            self.collection.replace_one({'_id': task_id}, meta, upsert=True)
        except InvalidDocument as exc:
            # Chain the original error so the pymongo cause stays visible.
            raise EncodeError(exc) from exc

        return result

    def _get_task_meta_for(self, task_id):
        """Get task meta-data for a task by id.

        Returns a ``PENDING`` placeholder when no document is stored for
        *task_id*.
        """
        obj = self.collection.find_one({'_id': task_id})
        if obj:
            return self.meta_from_decoded({
                'task_id': obj['_id'],
                'status': obj['status'],
                'result': self.decode(obj['result']),
                'date_done': obj['date_done'],
                'traceback': obj['traceback'],
                'children': obj['children'],
            })
        return {'status': states.PENDING, 'result': None}

    def _save_group(self, group_id, result):
        """Save the group result.

        Only the ids of the member results are stored.
        """
        meta = {
            '_id': group_id,
            'result': self.encode([i.id for i in result]),
            'date_done': datetime.utcnow(),
        }
        self.group_collection.replace_one({'_id': group_id}, meta, upsert=True)
        return result

    def _restore_group(self, group_id):
        """Get the result for a group by id.

        Returns ``None`` (implicitly) when the group is not stored.
        """
        obj = self.group_collection.find_one({'_id': group_id})
        if obj:
            return {
                'task_id': obj['_id'],
                'date_done': obj['date_done'],
                'result': [
                    self.app.AsyncResult(task)
                    for task in self.decode(obj['result'])
                ],
            }

    def _delete_group(self, group_id):
        """Delete a group by id."""
        self.group_collection.delete_one({'_id': group_id})

    def _forget(self, task_id):
        """Remove result from MongoDB.

        No exception handling is done here, so any error raised by
        ``delete_one`` propagates to the caller.
        """
        self.collection.delete_one({'_id': task_id})

    def cleanup(self):
        """Delete expired meta-data."""
        if not self.expires:
            return

        # Compute the cutoff once so both collections use the same instant.
        cutoff = self.app.now() - self.expires_delta
        self.collection.delete_many({'date_done': {'$lt': cutoff}})
        self.group_collection.delete_many({'date_done': {'$lt': cutoff}})

    def __reduce__(self, args=(), kwargs=None):
        """Pickle support: carry ``expires`` and ``url`` along."""
        kwargs = {} if not kwargs else kwargs
        return super().__reduce__(
            args, dict(kwargs, expires=self.expires, url=self.url))

    def _get_database(self):
        """Return the configured database from the client connection.

        Raises:
            celery.exceptions.ImproperlyConfigured: if explicit
                authentication (PyMongo < 4 only) is rejected.
        """
        conn = self._get_connection()
        db = conn[self.database_name]
        # ``Database.authenticate`` was removed in PyMongo 4; calling it
        # there fails.  The credentials are already handed to MongoClient
        # in ``_get_connection``, so the explicit call is only kept for
        # older PyMongo releases.
        if (self.user and self.password
                and pymongo.version_tuple < (4,)):
            source = self.options.get(
                'authsource',
                self.database_name or 'admin'
            )
            if not db.authenticate(self.user, self.password, source=source):
                raise ImproperlyConfigured(
                    'Invalid MongoDB username or password.')
        return db

    @cached_property
    def database(self):
        """Get database from MongoDB connection.

        performs authentication if necessary.
        """
        return self._get_database()

    @cached_property
    def collection(self):
        """Get the meta-data task collection."""
        collection = self.database[self.taskmeta_collection]

        # Ensure an index on date_done is there, if not process the index
        # in the background.  Once completed cleanup will be much faster
        collection.create_index('date_done', background=True)
        return collection

    @cached_property
    def group_collection(self):
        """Get the meta-data group collection."""
        collection = self.database[self.groupmeta_collection]

        # Ensure an index on date_done is there, if not process the index
        # in the background.  Once completed cleanup will be much faster
        collection.create_index('date_done', background=True)
        return collection

    @cached_property
    def expires_delta(self):
        """Return ``self.expires`` (seconds) as a ``timedelta``."""
        return timedelta(seconds=self.expires)

    def as_uri(self, include_password=False):
        """Return the backend as an URI.

        Arguments:
            include_password (bool): Password censored if disabled.
        """
        if not self.url:
            return 'mongodb://'
        if include_password:
            return self.url

        if ',' not in self.url:
            return maybe_sanitize_url(self.url)

        # Multi-host URL: only the part before the first comma is passed
        # through the sanitizer; the remaining host list is appended as-is.
        uri1, remainder = self.url.split(',', 1)
        return ','.join([maybe_sanitize_url(uri1), remainder])
Back to Top