1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
# -*- coding: utf-8 -*-
"""
celery.concurrency.base
~~~~~~~~~~~~~~~~~~~~~~~
TaskPool interface.
"""
from __future__ import absolute_import
import logging
import os
import time
from kombu.utils.encoding import safe_repr
from celery.utils import timer2
from celery.utils.log import get_logger
# Module-level logger obtained for the 'celery.concurrency' name; BasePool
# uses it for its debug output.
logger = get_logger('celery.concurrency')
def apply_target(target, args=(), kwargs={}, callback=None,
                 accept_callback=None, pid=None, **_):
    """Call ``target`` and hand its return value to ``callback``.

    :param target: callable to execute.
    :param args: positional arguments passed to ``target``.
    :param kwargs: keyword arguments passed to ``target``.  The default
        dict is only unpacked with ``**`` and is never mutated here.
    :keyword callback: called with the return value of ``target``.  When
        it is ``None`` the return value is discarded.
    :keyword accept_callback: when truthy, called as
        ``accept_callback(pid, timestamp)`` before ``target`` is executed.
    :keyword pid: process id reported to ``accept_callback``; when falsy
        the value of :func:`os.getpid` is used instead.

    Any other keyword arguments are accepted and ignored.  Exceptions
    raised by ``target`` or by either callback propagate to the caller.
    """
    if accept_callback:
        accept_callback(pid or os.getpid(), time.time())
    result = target(*args, **kwargs)
    # ``callback`` defaults to None; calling it unconditionally would raise
    # TypeError *after* the target has already run.
    if callback is not None:
        callback(result)
class BasePool(object):
    """Base class defining the interface shared by task pools.

    The ``on_*`` hooks defined here are no-ops meant to be overridden.
    The public :meth:`start`, :meth:`stop`, :meth:`terminate`,
    :meth:`close` and :meth:`apply_async` methods call those hooks and
    record the pool state in ``_state``.
    """

    #: values stored in ``_state`` by start/close/stop/terminate.
    RUN = 0x1
    CLOSE = 0x2
    TERMINATE = 0x3

    #: timer class exposed by this pool type.
    Timer = timer2.Timer

    #: set to true if the pool can be shutdown from within
    #: a signal handler.
    signal_safe = True

    #: set to true if pool supports rate limits.
    #: (this is here for gevent, which currently does not implement
    #: the necessary timers).
    rlimit_safe = True

    #: set to true if pool requires the use of a mediator
    #: thread (e.g. if applying new items can block the current thread).
    requires_mediator = False

    #: set to true if pool uses greenlets.
    is_green = False

    #: current state (one of RUN/CLOSE/TERMINATE); None until the pool
    #: has been started, closed, stopped or terminated.
    _state = None
    _pool = None

    #: only used by multiprocessing pool
    uses_semaphore = False

    def __init__(self, limit=None, putlocks=True,
                 forking_enable=True, callbacks_propagate=(), **options):
        """Store the pool configuration.

        :keyword limit: stored as ``limit`` and reported by
            :attr:`num_processes`.
        :keyword putlocks: forwarded to :meth:`on_apply` as ``waitforslot``.
        :keyword forking_enable: stored as ``forking_enable``.
        :keyword callbacks_propagate: forwarded to :meth:`on_apply`.

        Remaining keyword arguments are kept in ``self.options``.
        """
        self.limit = limit
        self.putlocks = putlocks
        self.options = options
        self.forking_enable = forking_enable
        self.callbacks_propagate = callbacks_propagate
        # Evaluated once here, so later changes to the logger level are
        # not picked up by apply_async.
        self._does_debug = logger.isEnabledFor(logging.DEBUG)

    def on_start(self):
        """Hook called by :meth:`start`; does nothing by default."""
        pass

    def did_start_ok(self):
        """Return True; the base implementation always reports success."""
        return True

    def on_stop(self):
        """Hook called by :meth:`stop`; does nothing by default."""
        pass

    def on_apply(self, *args, **kwargs):
        """Hook called by :meth:`apply_async`; does nothing by default."""
        pass

    def on_terminate(self):
        """Hook called by :meth:`terminate`; does nothing by default."""
        pass

    def on_soft_timeout(self, job):
        """Soft timeout hook; does nothing by default."""
        pass

    def on_hard_timeout(self, job):
        """Hard timeout hook; does nothing by default."""
        pass

    def maybe_handle_result(self, *args):
        """Does nothing by default."""
        pass

    def maintain_pool(self, *args, **kwargs):
        """Does nothing by default."""
        pass

    def terminate_job(self, pid):
        """Not supported by the base class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError(
            '%s does not implement terminate_job' % (self.__class__, ))

    def restart(self):
        """Not supported by the base class.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError(
            '%s does not implement restart' % (self.__class__, ))

    def stop(self):
        """Call :meth:`on_stop`, then mark the pool as TERMINATE."""
        self.on_stop()
        self._state = self.TERMINATE

    def terminate(self):
        """Mark the pool as TERMINATE, then call :meth:`on_terminate`."""
        self._state = self.TERMINATE
        self.on_terminate()

    def start(self):
        """Call :meth:`on_start`, then mark the pool as RUN."""
        self.on_start()
        self._state = self.RUN

    def close(self):
        """Mark the pool as CLOSE, then call :meth:`on_close`."""
        self._state = self.CLOSE
        self.on_close()

    def on_close(self):
        """Hook called by :meth:`close`; does nothing by default."""
        pass

    def init_callbacks(self, **kwargs):
        """Does nothing by default."""
        pass

    def apply_async(self, target, args=None, kwargs=None, **options):
        """Equivalent of the :func:`apply` built-in function.

        Callbacks should optimally return as soon as possible since
        otherwise the thread which handles the result will get blocked.

        :param target: object forwarded to :meth:`on_apply`.
        :param args: positional arguments; a new empty list when omitted.
        :param kwargs: keyword arguments; a new empty dict when omitted.

        Extra keyword options are forwarded to :meth:`on_apply` together
        with ``waitforslot`` and ``callbacks_propagate``.  Returns whatever
        :meth:`on_apply` returns.
        """
        # Fresh containers per call: a shared mutable default would leak
        # state between calls if an ``on_apply`` override mutated it.
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        if self._does_debug:
            logger.debug('TaskPool: Apply %s (args:%s kwargs:%s)',
                         target, safe_repr(args), safe_repr(kwargs))

        return self.on_apply(target, args, kwargs,
                             waitforslot=self.putlocks,
                             callbacks_propagate=self.callbacks_propagate,
                             **options)

    def _get_info(self):
        """Return the mapping exposed by :attr:`info`; empty by default."""
        return {}

    @property
    def info(self):
        """Result of :meth:`_get_info`."""
        return self._get_info()

    @property
    def active(self):
        """True while the pool state is RUN."""
        return self._state == self.RUN

    @property
    def num_processes(self):
        """The configured ``limit``."""
        return self.limit

    @property
    def readers(self):
        """Empty mapping by default."""
        return {}

    @property
    def writers(self):
        """Empty mapping by default."""
        return {}

    @property
    def timers(self):
        """Empty mapping by default."""
        return {}
|