# Source code for celery.concurrency.thread
"""Thread execution pool."""
from concurrent.futures import ThreadPoolExecutor, wait
from .base import BasePool, apply_target
# Names exported by a star-import of this module: only the pool class.
__all__ = ('TaskPool',)
class ApplyResult:
    """Result handle wrapping the future of a submitted call.

    Instance attributes:
        f: the wrapped future object.
        get: the future's own ``result`` method, so ``get(timeout=None)``
            blocks for the outcome exactly as ``f.result`` does.
    """

    def __init__(self, future):
        """Store *future* and expose its ``result`` method as ``get``."""
        self.f = future
        self.get = future.result

    def wait(self, timeout=None):
        """Block until the future is done or *timeout* seconds have passed.

        The outcome of the underlying ``concurrent.futures.wait`` call is
        discarded, so this always returns ``None``; use ``get`` to obtain
        the actual result.
        """
        # Inside a method body ``wait`` resolves to the module-level
        # ``concurrent.futures.wait`` import, not to this method.
        wait((self.f,), timeout=timeout)
class TaskPool(BasePool):
    """Thread Task Pool.

    Executes submitted callables on a
    ``concurrent.futures.ThreadPoolExecutor`` whose worker count is taken
    from ``self.limit``.
    """

    body_can_be_buffer = True
    signal_safe = False

    def __init__(self, *args, **kwargs):
        """Initialise the base pool, then create the thread executor.

        All positional and keyword arguments are forwarded unchanged to
        the base class initialiser.
        """
        super().__init__(*args, **kwargs)
        # NOTE(review): ``self.limit`` is presumably set by the base
        # initialiser above -- confirm against ``BasePool``.
        self.executor = ThreadPoolExecutor(max_workers=self.limit)

    def on_stop(self):
        """Shut down the executor, then run the base class stop hook."""
        # ``shutdown()`` is called with its defaults, i.e. it waits for
        # pending futures to finish before returning.
        self.executor.shutdown()
        super().on_stop()

    def on_apply(self, target, args=None, kwargs=None, callback=None,
                 accept_callback=None, **_):
        """Submit *target* to the executor and return a result handle.

        The call is routed through ``apply_target`` together with *args*,
        *kwargs*, *callback* and *accept_callback*.  Any additional
        keyword options are accepted and ignored.

        Returns:
            An ``ApplyResult`` wrapping the future returned by the
            executor.
        """
        f = self.executor.submit(apply_target, target, args, kwargs,
                                 callback, accept_callback)
        return ApplyResult(f)

    def _get_info(self):
        """Return a dict with the configured limit and live thread count."""
        return {
            'max-concurrency': self.limit,
            'threads': len(self.executor._threads)
            # TODO use a public api to retrieve the current number of threads
            # in the executor when available. (Currently not available).
        }