Source code for celery.backends.couchbase
"""Couchbase result store backend."""
from kombu.utils.url import _parse_url
from celery.exceptions import ImproperlyConfigured
from .base import KeyValueStoreBackend
try:
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster, ClusterOptions
from couchbase_core._libcouchbase import FMT_AUTO
except ImportError:
Cluster = PasswordAuthenticator = ClusterOptions = None
__all__ = ('CouchbaseBackend',)
[docs]class CouchbaseBackend(KeyValueStoreBackend):
"""Couchbase backend.
Raises:
celery.exceptions.ImproperlyConfigured:
if module :pypi:`couchbase` is not available.
"""
bucket = 'default'
host = 'localhost'
port = 8091
username = None
password = None
quiet = False
supports_autoexpire = True
timeout = 2.5
# Use str as couchbase key not bytes
key_t = str
def __init__(self, url=None, *args, **kwargs):
kwargs.setdefault('expires_type', int)
super().__init__(*args, **kwargs)
self.url = url
if Cluster is None:
raise ImproperlyConfigured(
'You need to install the couchbase library to use the '
'Couchbase backend.',
)
uhost = uport = uname = upass = ubucket = None
if url:
_, uhost, uport, uname, upass, ubucket, _ = _parse_url(url)
ubucket = ubucket.strip('/') if ubucket else None
config = self.app.conf.get('couchbase_backend_settings', None)
if config is not None:
if not isinstance(config, dict):
raise ImproperlyConfigured(
'Couchbase backend settings should be grouped in a dict',
)
else:
config = {}
self.host = uhost or config.get('host', self.host)
self.port = int(uport or config.get('port', self.port))
self.bucket = ubucket or config.get('bucket', self.bucket)
self.username = uname or config.get('username', self.username)
self.password = upass or config.get('password', self.password)
self._connection = None
def _get_connection(self):
"""Connect to the Couchbase server."""
if self._connection is None:
if self.host and self.port:
uri = f"couchbase://{self.host}:{self.port}"
else:
uri = f"couchbase://{self.host}"
if self.username and self.password:
opt = PasswordAuthenticator(self.username, self.password)
else:
opt = None
cluster = Cluster(uri, opt)
bucket = cluster.bucket(self.bucket)
self._connection = bucket.default_collection()
return self._connection
@property
def connection(self):
return self._get_connection()
[docs] def get(self, key):
return self.connection.get(key).content
[docs] def set(self, key, value):
self.connection.upsert(key, value, ttl=self.expires, format=FMT_AUTO)
[docs] def mget(self, keys):
return self.connection.get_multi(keys)
[docs] def delete(self, key):
self.connection.remove(key)