summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.openstack.org>2018-12-19 14:15:35 +0000
committerGerrit Code Review <review@openstack.org>2018-12-19 14:15:35 +0000
commitfdd65ed5e985f206e19342d97494f0868798d0e6 (patch)
tree9f8507510dccd066edccdeaff026a06d37c5d764
parent230aa8947716b20c5ea8ea6d4a5b30093dcbce26 (diff)
parent55f897c61330475b9fc63b9e2829d49e87a3b274 (diff)
Merge "Document the threadgroup module"
-rw-r--r--oslo_service/threadgroup.py206
1 files changed, 201 insertions, 5 deletions
diff --git a/oslo_service/threadgroup.py b/oslo_service/threadgroup.py
index f64c10f..e57efb5 100644
--- a/oslo_service/threadgroup.py
+++ b/oslo_service/threadgroup.py
@@ -52,32 +52,75 @@ class Thread(object):
52 return self._ident 52 return self._ident
53 53
54 def stop(self): 54 def stop(self):
55 """Kill the thread by raising GreenletExit within it."""
55 self.thread.kill() 56 self.thread.kill()
56 57
57 def wait(self): 58 def wait(self):
59 """Block until the thread completes and return the result."""
58 return self.thread.wait() 60 return self.thread.wait()
59 61
60 def link(self, func, *args, **kwargs): 62 def link(self, func, *args, **kwargs):
63 """Schedule a function to be run upon completion of the thread."""
61 self.thread.link(func, *args, **kwargs) 64 self.thread.link(func, *args, **kwargs)
62 65
63 def cancel(self, *throw_args): 66 def cancel(self, *throw_args):
67 """Prevent the thread from starting if it has not already done so.
68
69 :param throw_args: the `exc_info` data to raise from :func:`wait`.
70 """
64 self.thread.cancel(*throw_args) 71 self.thread.cancel(*throw_args)
65 72
66 73
67class ThreadGroup(object): 74class ThreadGroup(object):
68 """The point of the ThreadGroup class is to: 75 """A group of greenthreads and timers.
76
77 The point of the ThreadGroup class is to:
69 78
70 * keep track of timers and greenthreads (making it easier to stop them 79 * keep track of timers and greenthreads (making it easier to stop them
71 when need be). 80 when need be).
72 * provide an easy API to add timers. 81 * provide an easy API to add timers.
82
83 .. note::
84 The API is inconsistent, confusing, and not orthogonal. The same verbs
85 often mean different things when applied to timers and threads,
86 respectively. Read the documentation carefully.
73 """ 87 """
88
74 def __init__(self, thread_pool_size=10): 89 def __init__(self, thread_pool_size=10):
90 """Create a ThreadGroup with a pool of greenthreads.
91
92 :param thread_pool_size: the maximum number of threads allowed to run
93 concurrently.
94 """
75 self.pool = greenpool.GreenPool(thread_pool_size) 95 self.pool = greenpool.GreenPool(thread_pool_size)
76 self.threads = [] 96 self.threads = []
77 self.timers = [] 97 self.timers = []
78 98
79 def add_dynamic_timer(self, callback, initial_delay=None, 99 def add_dynamic_timer(self, callback, initial_delay=None,
80 periodic_interval_max=None, *args, **kwargs): 100 periodic_interval_max=None, *args, **kwargs):
101 """Add a timer that controls its own period dynamically.
102
103 The period of each iteration of the timer is controlled by the return
104 value of the callback function on the previous iteration.
105
106 .. warning::
107 Passing arguments to the callback function is deprecated. Use the
108 :func:`add_dynamic_timer_args` method to pass arguments for the
109 callback function.
110
111 :param callback: The callback function to run when the timer is
112 triggered.
113 :param initial_delay: The delay in seconds before first triggering the
114 timer. If not set, the timer is liable to be
115 scheduled immediately.
116 :param periodic_interval_max: The maximum interval in seconds to allow
117 the callback function to request. If
118 provided, this is also used as the
119 default delay if None is returned by the
120 callback function.
121 :returns: an :class:`oslo_service.loopingcall.DynamicLoopingCall`
122 instance
123 """
81 if args or kwargs: 124 if args or kwargs:
82 warnings.warn("Calling add_dynamic_timer() with arguments to the " 125 warnings.warn("Calling add_dynamic_timer() with arguments to the "
83 "callback function is deprecated. Use " 126 "callback function is deprecated. Use "
@@ -91,6 +134,29 @@ class ThreadGroup(object):
91 def add_dynamic_timer_args(self, callback, args=None, kwargs=None, 134 def add_dynamic_timer_args(self, callback, args=None, kwargs=None,
92 initial_delay=None, periodic_interval_max=None, 135 initial_delay=None, periodic_interval_max=None,
93 stop_on_exception=True): 136 stop_on_exception=True):
137 """Add a timer that controls its own period dynamically.
138
139 The period of each iteration of the timer is controlled by the return
140 value of the callback function on the previous iteration.
141
142 :param callback: The callback function to run when the timer is
143 triggered.
144 :param args: A list of positional args to the callback function.
145 :param kwargs: A dict of keyword args to the callback function.
146 :param initial_delay: The delay in seconds before first triggering the
147 timer. If not set, the timer is liable to be
148 scheduled immediately.
149 :param periodic_interval_max: The maximum interval in seconds to allow
150 the callback function to request. If
151 provided, this is also used as the
152 default delay if None is returned by the
153 callback function.
154 :param stop_on_exception: Pass ``False`` to have the timer continue
155 running even if the callback function raises
156 an exception.
157 :returns: an :class:`oslo_service.loopingcall.DynamicLoopingCall`
158 instance
159 """
94 args = args or [] 160 args = args or []
95 kwargs = kwargs or {} 161 kwargs = kwargs or {}
96 timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs) 162 timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
@@ -102,6 +168,23 @@ class ThreadGroup(object):
102 168
103 def add_timer(self, interval, callback, initial_delay=None, 169 def add_timer(self, interval, callback, initial_delay=None,
104 *args, **kwargs): 170 *args, **kwargs):
171 """Add a timer with a fixed period.
172
173 .. warning::
174 Passing arguments to the callback function is deprecated. Use the
175 :func:`add_timer_args` method to pass arguments for the callback
176 function.
177
178 :param interval: The minimum period in seconds between calls to the
179 callback function.
180 :param callback: The callback function to run when the timer is
181 triggered.
182 :param initial_delay: The delay in seconds before first triggering the
183 timer. If not set, the timer is liable to be
184 scheduled immediately.
185 :returns: an :class:`oslo_service.loopingcall.FixedIntervalLoopingCall`
186 instance
187 """
105 if args or kwargs: 188 if args or kwargs:
106 warnings.warn("Calling add_timer() with arguments to the callback " 189 warnings.warn("Calling add_timer() with arguments to the callback "
107 "function is deprecated. Use add_timer_args() " 190 "function is deprecated. Use add_timer_args() "
@@ -112,6 +195,23 @@ class ThreadGroup(object):
112 195
113 def add_timer_args(self, interval, callback, args=None, kwargs=None, 196 def add_timer_args(self, interval, callback, args=None, kwargs=None,
114 initial_delay=None, stop_on_exception=True): 197 initial_delay=None, stop_on_exception=True):
198 """Add a timer with a fixed period.
199
200 :param interval: The minimum period in seconds between calls to the
201 callback function.
202 :param callback: The callback function to run when the timer is
203 triggered.
204 :param args: A list of positional args to the callback function.
205 :param kwargs: A dict of keyword args to the callback function.
206 :param initial_delay: The delay in seconds before first triggering the
207 timer. If not set, the timer is liable to be
208 scheduled immediately.
209 :param stop_on_exception: Pass ``False`` to have the timer continue
210 running even if the callback function raises
211 an exception.
212 :returns: an :class:`oslo_service.loopingcall.FixedIntervalLoopingCall`
213 instance
214 """
115 args = args or [] 215 args = args or []
116 kwargs = kwargs or {} 216 kwargs = kwargs or {}
117 pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs) 217 pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
@@ -122,6 +222,17 @@ class ThreadGroup(object):
122 return pulse 222 return pulse
123 223
124 def add_thread(self, callback, *args, **kwargs): 224 def add_thread(self, callback, *args, **kwargs):
225 """Spawn a new thread.
226
227 This call will block until capacity is available in the thread pool.
228 After that, it returns immediately (i.e. *before* the new thread is
229 scheduled).
230
231 :param callback: the function to run in the new thread.
232 :param args: positional arguments to the callback function.
233 :param kwargs: keyword arguments to the callback function.
234 :returns: a :class:`Thread` object
235 """
125 gt = self.pool.spawn(callback, *args, **kwargs) 236 gt = self.pool.spawn(callback, *args, **kwargs)
126 th = Thread(gt, self, link=False) 237 th = Thread(gt, self, link=False)
127 self.threads.append(th) 238 self.threads.append(th)
@@ -129,9 +240,19 @@ class ThreadGroup(object):
129 return th 240 return th
130 241
131 def thread_done(self, thread): 242 def thread_done(self, thread):
243 """Remove a completed thread from the group.
244
245 This method is automatically called on completion of a thread in the
246 group, and should not be called explicitly.
247 """
132 self.threads.remove(thread) 248 self.threads.remove(thread)
133 249
134 def timer_done(self, timer): 250 def timer_done(self, timer):
251 """Remove a timer from the group.
252
253 :param timer: The timer object returned from :func:`add_timer` or its
254 analogues.
255 """
135 self.timers.remove(timer) 256 self.timers.remove(timer)
136 257
137 def _perform_action_on_threads(self, action_func, on_error_func): 258 def _perform_action_on_threads(self, action_func, on_error_func):
@@ -156,6 +277,18 @@ class ThreadGroup(object):
156 lambda x: LOG.exception('Error stopping thread.')) 277 lambda x: LOG.exception('Error stopping thread.'))
157 278
158 def stop_timers(self, wait=False): 279 def stop_timers(self, wait=False):
280 """Stop all timers in the group and remove them from the group.
281
282 No new invocations of timers will be triggered after they are stopped,
283 but calls that are in progress will not be interrupted.
284
285 To wait for in-progress calls to complete, pass ``wait=True`` - calling
286 :func:`wait` will not have the desired effect as the timers will have
287 already been removed from the group.
288
289 :param wait: If true, block until all timers have been stopped before
290 returning.
291 """
159 for timer in self.timers: 292 for timer in self.timers:
160 timer.stop() 293 timer.stop()
161 if wait: 294 if wait:
@@ -163,11 +296,25 @@ class ThreadGroup(object):
163 self.timers = [] 296 self.timers = []
164 297
165 def stop(self, graceful=False): 298 def stop(self, graceful=False):
166 """stop function has the option of graceful=True/False. 299 """Stop all timers and threads in the group.
300
301 No new invocations of timers will be triggered after they are stopped,
302 but calls that are in progress will not be interrupted.
303
304 If ``graceful`` is false, kill all threads immediately by raising
305 GreenletExit. Note that in this case, this method will **not** block
306 until all threads and running timer callbacks have actually exited. To
307 guarantee that all threads have exited, call :func:`wait`.
167 308
168 * In case of graceful=True, wait for all threads to be finished. 309 If ``graceful`` is true, do not kill threads. Block until all threads
169 Never kill threads. 310 and running timer callbacks have completed. This is equivalent to
170 * In case of graceful=False, kill threads immediately. 311 calling :func:`stop_timers` with ``wait=True`` followed by
312 :func:`wait`.
313
314 :param graceful: If true, block until all timers have stopped and all
315 threads completed; never kill threads. Otherwise,
316 kill threads immediately and return immediately even
317 if there are timer callbacks still running.
171 """ 318 """
172 self.stop_timers(wait=graceful) 319 self.stop_timers(wait=graceful)
173 if graceful: 320 if graceful:
@@ -195,6 +342,25 @@ class ThreadGroup(object):
195 lambda x: LOG.exception('Error waiting on thread.')) 342 lambda x: LOG.exception('Error waiting on thread.'))
196 343
197 def wait(self): 344 def wait(self):
345 """Block until all timers and threads in the group are complete.
346
347 .. note::
348 Before calling this method, any timers should be stopped first by
349 calling :func:`stop_timers`, :func:`stop`, or :func:`cancel` with a
350 ``timeout`` argument. Otherwise this will block forever.
351
352 .. note::
353 Calling :func:`stop_timers` removes the timers from the group, so a
354 subsequent call to this method will not wait for any in-progress
355 timer calls to complete.
356
357 Any exceptions raised by the threads will be logged but suppressed.
358
359 .. note::
360 This call guarantees only that the threads themselves have
361 completed, **not** that any cleanup functions added via
362 :func:`Thread.link` have completed.
363 """
198 self._wait_timers() 364 self._wait_timers()
199 self._wait_threads() 365 self._wait_threads()
200 366
@@ -209,6 +375,36 @@ class ThreadGroup(object):
209 return False 375 return False
210 376
211 def cancel(self, *throw_args, **kwargs): 377 def cancel(self, *throw_args, **kwargs):
378 """Cancel unstarted threads in the group, and optionally stop the rest.
379
380 If called without the ``timeout`` argument, this does **not** stop any
381 running threads, but prevents any threads in the group that have not
382 yet started from running, then returns immediately. Timers are not
383 affected.
384
385 If the 'timeout' argument is supplied, then it serves as a grace period
386 to allow running threads to finish. After the timeout, any threads in
387 the group that are still running will be killed by raising GreenletExit
388 in them, and all timers will be stopped (so that they are not
389 retriggered - timer calls that are in progress will not be
390 interrupted). This method will **not** block until all threads have
391 actually exited, nor that all in-progress timer calls have completed.
392 To guarantee that all threads have exited, call :func:`wait`. If all
393 threads complete before the timeout expires, timers will be left
394 running; there is no way to then stop those timers, so for consistent
395 behaviour :func`stop_timers` should be called before calling this
396 method.
397
398 :param throw_args: the `exc_info` data to raise from
399 :func:`Thread.wait` for any of the unstarted
400 threads. (Though note that :func:`ThreadGroup.wait`
401 suppresses exceptions.)
402 :param timeout: time to wait for running threads to complete before
403 calling stop(). If not supplied, threads that are
404 already running continue to completion.
405 :param wait_time: length of time in seconds to sleep between checks of
406 whether any threads are still alive. (Default 1s.)
407 """
212 self._perform_action_on_threads( 408 self._perform_action_on_threads(
213 lambda x: x.cancel(*throw_args), 409 lambda x: x.cancel(*throw_args),
214 lambda x: LOG.exception('Error canceling thread.')) 410 lambda x: LOG.exception('Error canceling thread.'))