summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.openstack.org>2018-12-12 15:35:40 +0000
committerGerrit Code Review <review@openstack.org>2018-12-12 15:35:40 +0000
commit7b31740c09b89de34eb7f964c082f84794819923 (patch)
tree3e7a7ba443bb96647b6a42933d6b32617ff2e83f
parent32da499c04a5b70a67583ec000d36ab28f17eeb5 (diff)
parentf368430f13183c3d09d68c80184d854449da2e87 (diff)
Merge "Use threads to process target function"
-rw-r--r--oslo_privsep/daemon.py93
-rw-r--r--oslo_privsep/priv_context.py7
-rw-r--r--oslo_privsep/tests/test_daemon.py1
-rw-r--r--releasenotes/notes/add_thread_pool_size-a54e6f27ab019f96.yaml7
4 files changed, 85 insertions, 23 deletions
diff --git a/oslo_privsep/daemon.py b/oslo_privsep/daemon.py
index 38e7cb9..94b9a7c 100644
--- a/oslo_privsep/daemon.py
+++ b/oslo_privsep/daemon.py
@@ -43,6 +43,7 @@ The privsep daemon exits when the communication channel is closed,
43 43
44''' 44'''
45 45
46from concurrent import futures
46import enum 47import enum
47import errno 48import errno
48import io 49import io
@@ -64,6 +65,7 @@ import eventlet
64from oslo_config import cfg 65from oslo_config import cfg
65from oslo_log import log as logging 66from oslo_log import log as logging
66from oslo_utils import importutils 67from oslo_utils import importutils
68import six
67 69
68from oslo_privsep._i18n import _ 70from oslo_privsep._i18n import _
69from oslo_privsep import capabilities 71from oslo_privsep import capabilities
@@ -352,6 +354,9 @@ class Daemon(object):
352 self.user = context.conf.user 354 self.user = context.conf.user
353 self.group = context.conf.group 355 self.group = context.conf.group
354 self.caps = set(context.conf.capabilities) 356 self.caps = set(context.conf.capabilities)
357 self.thread_pool = futures.ThreadPoolExecutor(
358 context.conf.thread_pool_size)
359 self.communication_error = None
355 360
356 def run(self): 361 def run(self):
357 """Run request loop. Sets up environment, then calls loop()""" 362 """Run request loop. Sets up environment, then calls loop()"""
@@ -413,22 +418,75 @@ class Daemon(object):
413 'inh': fmt_caps(inh), 418 'inh': fmt_caps(inh),
414 }) 419 })
415 420
416 def _process_cmd(self, cmd, *args): 421 def _process_cmd(self, msgid, cmd, *args):
422 """Executes the requested command in an execution thread.
423
424 This executes a call within a thread executor and returns the results
425 of the execution.
426
427 :param msgid: The message identifier.
428 :param cmd: The `Message` type indicating the command type.
429 :param args: The function, args, and kwargs if a Message.CALL type.
430 :return: A tuple of the return status, optional call output, and
431 optional error information.
432 """
417 if cmd == Message.PING: 433 if cmd == Message.PING:
418 return (Message.PONG.value,) 434 return (Message.PONG.value,)
419 435
420 elif cmd == Message.CALL: 436 try:
437 if cmd != Message.CALL:
438 raise ProtocolError(_('Unknown privsep cmd: %s') % cmd)
439
440 # Extract the callable and arguments
421 name, f_args, f_kwargs = args 441 name, f_args, f_kwargs = args
422 func = importutils.import_class(name) 442 func = importutils.import_class(name)
423
424 if not self.context.is_entrypoint(func): 443 if not self.context.is_entrypoint(func):
425 msg = _('Invalid privsep function: %s not exported') % name 444 msg = _('Invalid privsep function: %s not exported') % name
426 raise NameError(msg) 445 raise NameError(msg)
427 446
428 ret = func(*f_args, **f_kwargs) 447 ret = func(*f_args, **f_kwargs)
429 return (Message.RET.value, ret) 448 return (Message.RET.value, ret)
449 except Exception as e:
450 LOG.debug(
451 'privsep: Exception during request[%(msgid)s]: '
452 '%(err)s', {'msgid': msgid, 'err': e}, exc_info=True)
453 cls = e.__class__
454 cls_name = '%s.%s' % (cls.__module__, cls.__name__)
455 return (Message.ERR.value, cls_name, e.args)
456
457 def _create_done_callback(self, msgid):
458 """Creates a future callback to receive command execution results.
459
460 :param msgid: The message identifier.
461 :return: A future reply callback.
462 """
463 channel = self.channel
464
465 def _call_back(result):
466 """Future execution callback.
467
468 :param result: The `future` execution and its results.
469 """
470 try:
471 reply = result.result()
472 LOG.debug('privsep: reply[%(msgid)s]: %(reply)s',
473 {'msgid': msgid, 'reply': reply})
474 channel.send((msgid, reply))
475 except IOError:
476 self.communication_error = sys.exc_info()
477 except Exception as e:
478 LOG.debug(
479 'privsep: Exception during request[%(msgid)s]: '
480 '%(err)s', {'msgid': msgid, 'err': e}, exc_info=True)
481 cls = e.__class__
482 cls_name = '%s.%s' % (cls.__module__, cls.__name__)
483 reply = (Message.ERR.value, cls_name, e.args)
484 try:
485 channel.send((msgid, reply))
486 except IOError:
487 self.communication_error = sys.exc_info()
430 488
431 raise ProtocolError(_('Unknown privsep cmd: %s') % cmd) 489 return _call_back
432 490
433 def loop(self): 491 def loop(self):
434 """Main body of daemon request loop""" 492 """Main body of daemon request loop"""
@@ -439,27 +497,16 @@ class Daemon(object):
439 self.context.set_client_mode(False) 497 self.context.set_client_mode(False)
440 498
441 for msgid, msg in self.channel: 499 for msgid, msg in self.channel:
442 LOG.debug('privsep: request[%(msgid)s]: %(req)s', 500 error = self.communication_error
443 {'msgid': msgid, 'req': msg}) 501 if error:
444 try: 502 if error[1].errno == errno.EPIPE:
445 reply = self._process_cmd(*msg)
446 except Exception as e:
447 LOG.debug(
448 'privsep: Exception during request[%(msgid)s]: %(err)s',
449 {'msgid': msgid, 'err': e}, exc_info=True)
450 cls = e.__class__
451 cls_name = '%s.%s' % (cls.__module__, cls.__name__)
452 reply = (Message.ERR.value, cls_name, e.args)
453
454 try:
455 LOG.debug('privsep: reply[%(msgid)s]: %(reply)s',
456 {'msgid': msgid, 'reply': reply})
457 self.channel.send((msgid, reply))
458 except IOError as e:
459 if e.errno == errno.EPIPE:
460 # Write stream closed, exit loop 503 # Write stream closed, exit loop
461 break 504 break
462 raise 505 six.reraise(error)
506
507 # Submit the command for execution
508 future = self.thread_pool.submit(self._process_cmd, msgid, *msg)
509 future.add_done_callback(self._create_done_callback(msgid))
463 510
464 LOG.debug('Socket closed, shutting down privsep daemon') 511 LOG.debug('Socket closed, shutting down privsep daemon')
465 512
diff --git a/oslo_privsep/priv_context.py b/oslo_privsep/priv_context.py
index 1036701..1b44861 100644
--- a/oslo_privsep/priv_context.py
+++ b/oslo_privsep/priv_context.py
@@ -16,6 +16,7 @@
16import enum 16import enum
17import functools 17import functools
18import logging 18import logging
19import multiprocessing
19import shlex 20import shlex
20import sys 21import sys
21 22
@@ -48,6 +49,12 @@ OPTS = [
48 type=types.List(CapNameOrInt), default=[], 49 type=types.List(CapNameOrInt), default=[],
49 help=_('List of Linux capabilities retained by the privsep ' 50 help=_('List of Linux capabilities retained by the privsep '
50 'daemon.')), 51 'daemon.')),
52 cfg.IntOpt('thread_pool_size',
53 min=1,
54 help=_("The number of threads available for privsep to "
55 "concurrently run processes. Defaults to the number of "
56 "CPU cores in the system."),
57 default=multiprocessing.cpu_count()),
51 cfg.StrOpt('helper_command', 58 cfg.StrOpt('helper_command',
52 help=_('Command to invoke to start the privsep daemon if ' 59 help=_('Command to invoke to start the privsep daemon if '
53 'not using the "fork" method. ' 60 'not using the "fork" method. '
diff --git a/oslo_privsep/tests/test_daemon.py b/oslo_privsep/tests/test_daemon.py
index 2147b10..c5cf29a 100644
--- a/oslo_privsep/tests/test_daemon.py
+++ b/oslo_privsep/tests/test_daemon.py
@@ -149,6 +149,7 @@ class DaemonTest(base.BaseTestCase):
149 context = mock.NonCallableMock() 149 context = mock.NonCallableMock()
150 context.conf.user = 42 150 context.conf.user = 42
151 context.conf.group = 84 151 context.conf.group = 84
152 context.conf.thread_pool_size = 10
152 context.conf.capabilities = [ 153 context.conf.capabilities = [
153 capabilities.CAP_SYS_ADMIN, capabilities.CAP_NET_ADMIN] 154 capabilities.CAP_SYS_ADMIN, capabilities.CAP_NET_ADMIN]
154 155
diff --git a/releasenotes/notes/add_thread_pool_size-a54e6f27ab019f96.yaml b/releasenotes/notes/add_thread_pool_size-a54e6f27ab019f96.yaml
new file mode 100644
index 0000000..92fa9db
--- /dev/null
+++ b/releasenotes/notes/add_thread_pool_size-a54e6f27ab019f96.yaml
@@ -0,0 +1,7 @@
1---
2features:
3 - |
4 Privsep now uses multithreading to allow concurrency in executing
5 privileged commands. The number of concurrent threads defaults to the
6 available CPU cores, but can be adjusted by the new ``thread_pool_size``
7 config option.