summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-06-20 17:09:29 +0000
committerGerrit Code Review <review@openstack.org>2015-06-20 17:09:29 +0000
commitf2c522e6e383c26540ac925ceb8175e0585c2d8d (patch)
tree4637192e3f498421f545e8a74abb6057b8c655d4
parentfe30446dd1795e13d9f8b37c2b385945e8a7f574 (diff)
parent826ce57ab93ce821f79dc154313e9d2bd489f449 (diff)
Merge "Remove unused files from oslo-incubator"
-rw-r--r--manila/openstack/common/service.py509
-rw-r--r--manila/openstack/common/systemd.py105
-rw-r--r--manila/openstack/common/threadgroup.py149
-rw-r--r--openstack-common.conf3
4 files changed, 0 insertions, 766 deletions
diff --git a/manila/openstack/common/service.py b/manila/openstack/common/service.py
deleted file mode 100644
index 6257abf..0000000
--- a/manila/openstack/common/service.py
+++ /dev/null
@@ -1,509 +0,0 @@
1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# Copyright 2011 Justin Santa Barbara
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
18"""Generic Node base class for all workers that run on hosts."""
19
20import errno
21import logging
22import os
23import random
24import signal
25import sys
26import time
27
28try:
29 # Importing just the symbol here because the io module does not
30 # exist in Python 2.6.
31 from io import UnsupportedOperation # noqa
32except ImportError:
33 # Python 2.6
34 UnsupportedOperation = None
35
36import eventlet
37from eventlet import event
38from oslo_config import cfg
39
40from manila.openstack.common import eventlet_backdoor
41from manila.openstack.common._i18n import _LE, _LI, _LW
42from manila.openstack.common import systemd
43from manila.openstack.common import threadgroup
44
45
46CONF = cfg.CONF
47LOG = logging.getLogger(__name__)
48
49
50def _sighup_supported():
51 return hasattr(signal, 'SIGHUP')
52
53
54def _is_daemon():
55 # The process group for a foreground process will match the
56 # process group of the controlling terminal. If those values do
57 # not match, or ioctl() fails on the stdout file handle, we assume
58 # the process is running in the background as a daemon.
59 # http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
60 try:
61 is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
62 except OSError as err:
63 if err.errno == errno.ENOTTY:
64 # Assume we are a daemon because there is no terminal.
65 is_daemon = True
66 else:
67 raise
68 except UnsupportedOperation:
69 # Could not get the fileno for stdout, so we must be a daemon.
70 is_daemon = True
71 return is_daemon
72
73
74def _is_sighup_and_daemon(signo):
75 if not (_sighup_supported() and signo == signal.SIGHUP):
76 # Avoid checking if we are a daemon, because the signal isn't
77 # SIGHUP.
78 return False
79 return _is_daemon()
80
81
82def _signo_to_signame(signo):
83 signals = {signal.SIGTERM: 'SIGTERM',
84 signal.SIGINT: 'SIGINT'}
85 if _sighup_supported():
86 signals[signal.SIGHUP] = 'SIGHUP'
87 return signals[signo]
88
89
90def _set_signals_handler(handler):
91 signal.signal(signal.SIGTERM, handler)
92 signal.signal(signal.SIGINT, handler)
93 if _sighup_supported():
94 signal.signal(signal.SIGHUP, handler)
95
96
97class Launcher(object):
98 """Launch one or more services and wait for them to complete."""
99
100 def __init__(self):
101 """Initialize the service launcher.
102
103 :returns: None
104
105 """
106 self.services = Services()
107 self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
108
109 def launch_service(self, service):
110 """Load and start the given service.
111
112 :param service: The service you would like to start.
113 :returns: None
114
115 """
116 service.backdoor_port = self.backdoor_port
117 self.services.add(service)
118
119 def stop(self):
120 """Stop all services which are currently running.
121
122 :returns: None
123
124 """
125 self.services.stop()
126
127 def wait(self):
128 """Waits until all services have been stopped, and then returns.
129
130 :returns: None
131
132 """
133 self.services.wait()
134
135 def restart(self):
136 """Reload config files and restart service.
137
138 :returns: None
139
140 """
141 cfg.CONF.reload_config_files()
142 self.services.restart()
143
144
145class SignalExit(SystemExit):
146 def __init__(self, signo, exccode=1):
147 super(SignalExit, self).__init__(exccode)
148 self.signo = signo
149
150
151class ServiceLauncher(Launcher):
152 def _handle_signal(self, signo, frame):
153 # Allow the process to be killed again and die from natural causes
154 _set_signals_handler(signal.SIG_DFL)
155 raise SignalExit(signo)
156
157 def handle_signal(self):
158 _set_signals_handler(self._handle_signal)
159
160 def _wait_for_exit_or_signal(self, ready_callback=None):
161 status = None
162 signo = 0
163
164 LOG.debug('Full set of CONF:')
165 CONF.log_opt_values(LOG, logging.DEBUG)
166
167 try:
168 if ready_callback:
169 ready_callback()
170 super(ServiceLauncher, self).wait()
171 except SignalExit as exc:
172 signame = _signo_to_signame(exc.signo)
173 LOG.info(_LI('Caught %s, exiting'), signame)
174 status = exc.code
175 signo = exc.signo
176 except SystemExit as exc:
177 status = exc.code
178 finally:
179 self.stop()
180
181 return status, signo
182
183 def wait(self, ready_callback=None):
184 systemd.notify_once()
185 while True:
186 self.handle_signal()
187 status, signo = self._wait_for_exit_or_signal(ready_callback)
188 if not _is_sighup_and_daemon(signo):
189 return status
190 self.restart()
191
192
193class ServiceWrapper(object):
194 def __init__(self, service, workers):
195 self.service = service
196 self.workers = workers
197 self.children = set()
198 self.forktimes = []
199
200
201class ProcessLauncher(object):
202 _signal_handlers_set = set()
203
204 @classmethod
205 def _handle_class_signals(cls, *args, **kwargs):
206 for handler in cls._signal_handlers_set:
207 handler(*args, **kwargs)
208
209 def __init__(self):
210 """Constructor."""
211
212 self.children = {}
213 self.sigcaught = None
214 self.running = True
215 rfd, self.writepipe = os.pipe()
216 self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
217 self.handle_signal()
218
219 def handle_signal(self):
220 self._signal_handlers_set.add(self._handle_signal)
221 _set_signals_handler(self._handle_class_signals)
222
223 def _handle_signal(self, signo, frame):
224 self.sigcaught = signo
225 self.running = False
226
227 # Allow the process to be killed again and die from natural causes
228 _set_signals_handler(signal.SIG_DFL)
229
230 def _pipe_watcher(self):
231 # This will block until the write end is closed when the parent
232 # dies unexpectedly
233 self.readpipe.read()
234
235 LOG.info(_LI('Parent process has died unexpectedly, exiting'))
236
237 sys.exit(1)
238
239 def _child_process_handle_signal(self):
240 # Setup child signal handlers differently
241 def _sigterm(*args):
242 signal.signal(signal.SIGTERM, signal.SIG_DFL)
243 raise SignalExit(signal.SIGTERM)
244
245 def _sighup(*args):
246 signal.signal(signal.SIGHUP, signal.SIG_DFL)
247 raise SignalExit(signal.SIGHUP)
248
249 signal.signal(signal.SIGTERM, _sigterm)
250 if _sighup_supported():
251 signal.signal(signal.SIGHUP, _sighup)
252 # Block SIGINT and let the parent send us a SIGTERM
253 signal.signal(signal.SIGINT, signal.SIG_IGN)
254
255 def _child_wait_for_exit_or_signal(self, launcher):
256 status = 0
257 signo = 0
258
259 # NOTE(johannes): All exceptions are caught to ensure this
260 # doesn't fallback into the loop spawning children. It would
261 # be bad for a child to spawn more children.
262 try:
263 launcher.wait()
264 except SignalExit as exc:
265 signame = _signo_to_signame(exc.signo)
266 LOG.info(_LI('Child caught %s, exiting'), signame)
267 status = exc.code
268 signo = exc.signo
269 except SystemExit as exc:
270 status = exc.code
271 except BaseException:
272 LOG.exception(_LE('Unhandled exception'))
273 status = 2
274 finally:
275 launcher.stop()
276
277 return status, signo
278
279 def _child_process(self, service):
280 self._child_process_handle_signal()
281
282 # Reopen the eventlet hub to make sure we don't share an epoll
283 # fd with parent and/or siblings, which would be bad
284 eventlet.hubs.use_hub()
285
286 # Close write to ensure only parent has it open
287 os.close(self.writepipe)
288 # Create greenthread to watch for parent to close pipe
289 eventlet.spawn_n(self._pipe_watcher)
290
291 # Reseed random number generator
292 random.seed()
293
294 launcher = Launcher()
295 launcher.launch_service(service)
296 return launcher
297
298 def _start_child(self, wrap):
299 if len(wrap.forktimes) > wrap.workers:
300 # Limit ourselves to one process a second (over the period of
301 # number of workers * 1 second). This will allow workers to
302 # start up quickly but ensure we don't fork off children that
303 # die instantly too quickly.
304 if time.time() - wrap.forktimes[0] < wrap.workers:
305 LOG.info(_LI('Forking too fast, sleeping'))
306 time.sleep(1)
307
308 wrap.forktimes.pop(0)
309
310 wrap.forktimes.append(time.time())
311
312 pid = os.fork()
313 if pid == 0:
314 launcher = self._child_process(wrap.service)
315 while True:
316 self._child_process_handle_signal()
317 status, signo = self._child_wait_for_exit_or_signal(launcher)
318 if not _is_sighup_and_daemon(signo):
319 break
320 launcher.restart()
321
322 os._exit(status)
323
324 LOG.info(_LI('Started child %d'), pid)
325
326 wrap.children.add(pid)
327 self.children[pid] = wrap
328
329 return pid
330
331 def launch_service(self, service, workers=1):
332 wrap = ServiceWrapper(service, workers)
333
334 LOG.info(_LI('Starting %d workers'), wrap.workers)
335 while self.running and len(wrap.children) < wrap.workers:
336 self._start_child(wrap)
337
338 def _wait_child(self):
339 try:
340 # Block while any of child processes have exited
341 pid, status = os.waitpid(0, 0)
342 if not pid:
343 return None
344 except OSError as exc:
345 if exc.errno not in (errno.EINTR, errno.ECHILD):
346 raise
347 return None
348
349 if os.WIFSIGNALED(status):
350 sig = os.WTERMSIG(status)
351 LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'),
352 dict(pid=pid, sig=sig))
353 else:
354 code = os.WEXITSTATUS(status)
355 LOG.info(_LI('Child %(pid)s exited with status %(code)d'),
356 dict(pid=pid, code=code))
357
358 if pid not in self.children:
359 LOG.warning(_LW('pid %d not in child list'), pid)
360 return None
361
362 wrap = self.children.pop(pid)
363 wrap.children.remove(pid)
364 return wrap
365
366 def _respawn_children(self):
367 while self.running:
368 wrap = self._wait_child()
369 if not wrap:
370 continue
371 while self.running and len(wrap.children) < wrap.workers:
372 self._start_child(wrap)
373
374 def wait(self):
375 """Loop waiting on children to die and respawning as necessary."""
376
377 systemd.notify_once()
378 LOG.debug('Full set of CONF:')
379 CONF.log_opt_values(LOG, logging.DEBUG)
380
381 try:
382 while True:
383 self.handle_signal()
384 self._respawn_children()
385 # No signal means that stop was called. Don't clean up here.
386 if not self.sigcaught:
387 return
388
389 signame = _signo_to_signame(self.sigcaught)
390 LOG.info(_LI('Caught %s, stopping children'), signame)
391 if not _is_sighup_and_daemon(self.sigcaught):
392 break
393
394 cfg.CONF.reload_config_files()
395 for service in set(
396 [wrap.service for wrap in self.children.values()]):
397 service.reset()
398
399 for pid in self.children:
400 os.kill(pid, signal.SIGHUP)
401
402 self.running = True
403 self.sigcaught = None
404 except eventlet.greenlet.GreenletExit:
405 LOG.info(_LI("Wait called after thread killed. Cleaning up."))
406
407 self.stop()
408
409 def stop(self):
410 """Terminate child processes and wait on each."""
411 self.running = False
412 for pid in self.children:
413 try:
414 os.kill(pid, signal.SIGTERM)
415 except OSError as exc:
416 if exc.errno != errno.ESRCH:
417 raise
418
419 # Wait for children to die
420 if self.children:
421 LOG.info(_LI('Waiting on %d children to exit'), len(self.children))
422 while self.children:
423 self._wait_child()
424
425
426class Service(object):
427 """Service object for binaries running on hosts."""
428
429 def __init__(self, threads=1000):
430 self.tg = threadgroup.ThreadGroup(threads)
431
432 # signal that the service is done shutting itself down:
433 self._done = event.Event()
434
435 def reset(self):
436 # NOTE(Fengqian): docs for Event.reset() recommend against using it
437 self._done = event.Event()
438
439 def start(self):
440 pass
441
442 def stop(self, graceful=False):
443 self.tg.stop(graceful)
444 self.tg.wait()
445 # Signal that service cleanup is done:
446 if not self._done.ready():
447 self._done.send()
448
449 def wait(self):
450 self._done.wait()
451
452
453class Services(object):
454
455 def __init__(self):
456 self.services = []
457 self.tg = threadgroup.ThreadGroup()
458 self.done = event.Event()
459
460 def add(self, service):
461 self.services.append(service)
462 self.tg.add_thread(self.run_service, service, self.done)
463
464 def stop(self):
465 # wait for graceful shutdown of services:
466 for service in self.services:
467 service.stop()
468 service.wait()
469
470 # Each service has performed cleanup, now signal that the run_service
471 # wrapper threads can now die:
472 if not self.done.ready():
473 self.done.send()
474
475 # reap threads:
476 self.tg.stop()
477
478 def wait(self):
479 self.tg.wait()
480
481 def restart(self):
482 self.stop()
483 self.done = event.Event()
484 for restart_service in self.services:
485 restart_service.reset()
486 self.tg.add_thread(self.run_service, restart_service, self.done)
487
488 @staticmethod
489 def run_service(service, done):
490 """Service start wrapper.
491
492 :param service: service to run
493 :param done: event to wait on until a shutdown is triggered
494 :returns: None
495
496 """
497 service.start()
498 done.wait()
499
500
501def launch(service, workers=1):
502 if workers is None or workers == 1:
503 launcher = ServiceLauncher()
504 launcher.launch_service(service)
505 else:
506 launcher = ProcessLauncher()
507 launcher.launch_service(service, workers=workers)
508
509 return launcher
diff --git a/manila/openstack/common/systemd.py b/manila/openstack/common/systemd.py
deleted file mode 100644
index 36243b3..0000000
--- a/manila/openstack/common/systemd.py
+++ /dev/null
@@ -1,105 +0,0 @@
1# Copyright 2012-2014 Red Hat, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15"""
16Helper module for systemd service readiness notification.
17"""
18
19import logging
20import os
21import socket
22import sys
23
24
25LOG = logging.getLogger(__name__)
26
27
28def _abstractify(socket_name):
29 if socket_name.startswith('@'):
30 # abstract namespace socket
31 socket_name = '\0%s' % socket_name[1:]
32 return socket_name
33
34
35def _sd_notify(unset_env, msg):
36 notify_socket = os.getenv('NOTIFY_SOCKET')
37 if notify_socket:
38 sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
39 try:
40 sock.connect(_abstractify(notify_socket))
41 sock.sendall(msg)
42 if unset_env:
43 del os.environ['NOTIFY_SOCKET']
44 except EnvironmentError:
45 LOG.debug("Systemd notification failed", exc_info=True)
46 finally:
47 sock.close()
48
49
50def notify():
51 """Send notification to Systemd that service is ready.
52
53 For details see
54 http://www.freedesktop.org/software/systemd/man/sd_notify.html
55 """
56 _sd_notify(False, 'READY=1')
57
58
59def notify_once():
60 """Send notification once to Systemd that service is ready.
61
62 Systemd sets NOTIFY_SOCKET environment variable with the name of the
63 socket listening for notifications from services.
64 This method removes the NOTIFY_SOCKET environment variable to ensure
65 notification is sent only once.
66 """
67 _sd_notify(True, 'READY=1')
68
69
70def onready(notify_socket, timeout):
71 """Wait for systemd style notification on the socket.
72
73 :param notify_socket: local socket address
74 :type notify_socket: string
75 :param timeout: socket timeout
76 :type timeout: float
77 :returns: 0 service ready
78 1 service not ready
79 2 timeout occurred
80 """
81 sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
82 sock.settimeout(timeout)
83 sock.bind(_abstractify(notify_socket))
84 try:
85 msg = sock.recv(512)
86 except socket.timeout:
87 return 2
88 finally:
89 sock.close()
90 if 'READY=1' in msg:
91 return 0
92 else:
93 return 1
94
95
96if __name__ == '__main__':
97 # simple CLI for testing
98 if len(sys.argv) == 1:
99 notify()
100 elif len(sys.argv) >= 2:
101 timeout = float(sys.argv[1])
102 notify_socket = os.getenv('NOTIFY_SOCKET')
103 if notify_socket:
104 retval = onready(notify_socket, timeout)
105 sys.exit(retval)
diff --git a/manila/openstack/common/threadgroup.py b/manila/openstack/common/threadgroup.py
deleted file mode 100644
index 05e95d8..0000000
--- a/manila/openstack/common/threadgroup.py
+++ /dev/null
@@ -1,149 +0,0 @@
1# Copyright 2012 Red Hat, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14import logging
15import threading
16
17import eventlet
18from eventlet import greenpool
19
20from manila.openstack.common import loopingcall
21
22
23LOG = logging.getLogger(__name__)
24
25
26def _thread_done(gt, *args, **kwargs):
27 """Callback function to be passed to GreenThread.link() when we spawn()
28 Calls the :class:`ThreadGroup` to notify if.
29
30 """
31 kwargs['group'].thread_done(kwargs['thread'])
32
33
34class Thread(object):
35 """Wrapper around a greenthread, that holds a reference to the
36 :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
37 it has done so it can be removed from the threads list.
38 """
39 def __init__(self, thread, group):
40 self.thread = thread
41 self.thread.link(_thread_done, group=group, thread=self)
42
43 def stop(self):
44 self.thread.kill()
45
46 def wait(self):
47 return self.thread.wait()
48
49 def link(self, func, *args, **kwargs):
50 self.thread.link(func, *args, **kwargs)
51
52
53class ThreadGroup(object):
54 """The point of the ThreadGroup class is to:
55
56 * keep track of timers and greenthreads (making it easier to stop them
57 when need be).
58 * provide an easy API to add timers.
59 """
60 def __init__(self, thread_pool_size=10):
61 self.pool = greenpool.GreenPool(thread_pool_size)
62 self.threads = []
63 self.timers = []
64
65 def add_dynamic_timer(self, callback, initial_delay=None,
66 periodic_interval_max=None, *args, **kwargs):
67 timer = loopingcall.DynamicLoopingCall(callback, *args, **kwargs)
68 timer.start(initial_delay=initial_delay,
69 periodic_interval_max=periodic_interval_max)
70 self.timers.append(timer)
71
72 def add_timer(self, interval, callback, initial_delay=None,
73 *args, **kwargs):
74 pulse = loopingcall.FixedIntervalLoopingCall(callback, *args, **kwargs)
75 pulse.start(interval=interval,
76 initial_delay=initial_delay)
77 self.timers.append(pulse)
78
79 def add_thread(self, callback, *args, **kwargs):
80 gt = self.pool.spawn(callback, *args, **kwargs)
81 th = Thread(gt, self)
82 self.threads.append(th)
83 return th
84
85 def thread_done(self, thread):
86 self.threads.remove(thread)
87
88 def _stop_threads(self):
89 current = threading.current_thread()
90
91 # Iterate over a copy of self.threads so thread_done doesn't
92 # modify the list while we're iterating
93 for x in self.threads[:]:
94 if x is current:
95 # don't kill the current thread.
96 continue
97 try:
98 x.stop()
99 except eventlet.greenlet.GreenletExit:
100 pass
101 except Exception as ex:
102 LOG.exception(ex)
103
104 def stop_timers(self):
105 for x in self.timers:
106 try:
107 x.stop()
108 except Exception as ex:
109 LOG.exception(ex)
110 self.timers = []
111
112 def stop(self, graceful=False):
113 """stop function has the option of graceful=True/False.
114
115 * In case of graceful=True, wait for all threads to be finished.
116 Never kill threads.
117 * In case of graceful=False, kill threads immediately.
118 """
119 self.stop_timers()
120 if graceful:
121 # In case of graceful=True, wait for all threads to be
122 # finished, never kill threads
123 self.wait()
124 else:
125 # In case of graceful=False(Default), kill threads
126 # immediately
127 self._stop_threads()
128
129 def wait(self):
130 for x in self.timers:
131 try:
132 x.wait()
133 except eventlet.greenlet.GreenletExit:
134 pass
135 except Exception as ex:
136 LOG.exception(ex)
137 current = threading.current_thread()
138
139 # Iterate over a copy of self.threads so thread_done doesn't
140 # modify the list while we're iterating
141 for x in self.threads[:]:
142 if x is current:
143 continue
144 try:
145 x.wait()
146 except eventlet.greenlet.GreenletExit:
147 pass
148 except Exception as ex:
149 LOG.exception(ex)
diff --git a/openstack-common.conf b/openstack-common.conf
index 31a2e7d..5f6f042 100644
--- a/openstack-common.conf
+++ b/openstack-common.conf
@@ -6,9 +6,6 @@ module=loopingcall
6module=scheduler 6module=scheduler
7module=scheduler.filters 7module=scheduler.filters
8module=scheduler.weights 8module=scheduler.weights
9module=service
10module=systemd
11module=threadgroup
12 9
13# The list of scripts to copy from oslo common code 10# The list of scripts to copy from oslo common code
14script = tools/colorizer.py 11script = tools/colorizer.py