Use threads to process target function

This patch takes advantages of threads to make privsep daemon process
concurrently.

Change-Id: Ib20b27d5ea07bd0af61891c7d8c0d352a393aa21
This commit is contained in:
TommyLike 2018-08-20 17:24:31 +08:00 committed by Sean McGinnis
parent b033057372
commit f368430f13
No known key found for this signature in database
GPG Key ID: CE7EE4BFAF8D70C8
4 changed files with 85 additions and 23 deletions

View File

@ -43,6 +43,7 @@ The privsep daemon exits when the communication channel is closed,
'''
from concurrent import futures
import enum
import errno
import io
@ -64,6 +65,7 @@ import eventlet
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
import six
from oslo_privsep._i18n import _
from oslo_privsep import capabilities
@ -352,6 +354,9 @@ class Daemon(object):
self.user = context.conf.user
self.group = context.conf.group
self.caps = set(context.conf.capabilities)
self.thread_pool = futures.ThreadPoolExecutor(
context.conf.thread_pool_size)
self.communication_error = None
def run(self):
"""Run request loop. Sets up environment, then calls loop()"""
@ -413,22 +418,75 @@ class Daemon(object):
'inh': fmt_caps(inh),
})
def _process_cmd(self, cmd, *args):
def _process_cmd(self, msgid, cmd, *args):
"""Executes the requested command in an execution thread.
This executes a call within a thread executor and returns the results
of the execution.
:param msgid: The message identifier.
:param cmd: The `Message` type indicating the command type.
:param args: The function, args, and kwargs if a Message.CALL type.
:return: A tuple of the return status, optional call output, and
optional error information.
"""
if cmd == Message.PING:
return (Message.PONG.value,)
elif cmd == Message.CALL:
try:
if cmd != Message.CALL:
raise ProtocolError(_('Unknown privsep cmd: %s') % cmd)
# Extract the callable and arguments
name, f_args, f_kwargs = args
func = importutils.import_class(name)
if not self.context.is_entrypoint(func):
msg = _('Invalid privsep function: %s not exported') % name
raise NameError(msg)
ret = func(*f_args, **f_kwargs)
return (Message.RET.value, ret)
except Exception as e:
LOG.debug(
'privsep: Exception during request[%(msgid)s]: '
'%(err)s', {'msgid': msgid, 'err': e}, exc_info=True)
cls = e.__class__
cls_name = '%s.%s' % (cls.__module__, cls.__name__)
return (Message.ERR.value, cls_name, e.args)
raise ProtocolError(_('Unknown privsep cmd: %s') % cmd)
def _create_done_callback(self, msgid):
"""Creates a future callback to receive command execution results.
:param msgid: The message identifier.
:return: A future reply callback.
"""
channel = self.channel
def _call_back(result):
"""Future execution callback.
:param result: The `future` execution and its results.
"""
try:
reply = result.result()
LOG.debug('privsep: reply[%(msgid)s]: %(reply)s',
{'msgid': msgid, 'reply': reply})
channel.send((msgid, reply))
except IOError:
self.communication_error = sys.exc_info()
except Exception as e:
LOG.debug(
'privsep: Exception during request[%(msgid)s]: '
'%(err)s', {'msgid': msgid, 'err': e}, exc_info=True)
cls = e.__class__
cls_name = '%s.%s' % (cls.__module__, cls.__name__)
reply = (Message.ERR.value, cls_name, e.args)
try:
channel.send((msgid, reply))
except IOError:
self.communication_error = sys.exc_info()
return _call_back
def loop(self):
"""Main body of daemon request loop"""
@ -439,27 +497,16 @@ class Daemon(object):
self.context.set_client_mode(False)
for msgid, msg in self.channel:
LOG.debug('privsep: request[%(msgid)s]: %(req)s',
{'msgid': msgid, 'req': msg})
try:
reply = self._process_cmd(*msg)
except Exception as e:
LOG.debug(
'privsep: Exception during request[%(msgid)s]: %(err)s',
{'msgid': msgid, 'err': e}, exc_info=True)
cls = e.__class__
cls_name = '%s.%s' % (cls.__module__, cls.__name__)
reply = (Message.ERR.value, cls_name, e.args)
try:
LOG.debug('privsep: reply[%(msgid)s]: %(reply)s',
{'msgid': msgid, 'reply': reply})
self.channel.send((msgid, reply))
except IOError as e:
if e.errno == errno.EPIPE:
error = self.communication_error
if error:
if error[1].errno == errno.EPIPE:
# Write stream closed, exit loop
break
raise
six.reraise(error)
# Submit the command for execution
future = self.thread_pool.submit(self._process_cmd, msgid, *msg)
future.add_done_callback(self._create_done_callback(msgid))
LOG.debug('Socket closed, shutting down privsep daemon')

View File

@ -16,6 +16,7 @@
import enum
import functools
import logging
import multiprocessing
import shlex
import sys
@ -48,6 +49,12 @@ OPTS = [
type=types.List(CapNameOrInt), default=[],
help=_('List of Linux capabilities retained by the privsep '
'daemon.')),
cfg.IntOpt('thread_pool_size',
min=1,
help=_("The number of threads available for privsep to "
"concurrently run processes. Defaults to the number of "
"CPU cores in the system."),
default=multiprocessing.cpu_count()),
cfg.StrOpt('helper_command',
help=_('Command to invoke to start the privsep daemon if '
'not using the "fork" method. '

View File

@ -149,6 +149,7 @@ class DaemonTest(base.BaseTestCase):
context = mock.NonCallableMock()
context.conf.user = 42
context.conf.group = 84
context.conf.thread_pool_size = 10
context.conf.capabilities = [
capabilities.CAP_SYS_ADMIN, capabilities.CAP_NET_ADMIN]

View File

@ -0,0 +1,7 @@
---
features:
- |
Privsep now uses multithreading to allow concurrency in executing
privileged commands. The number of concurrent threads defaults to the
available CPU cores, but can be adjusted by the new ``thread_pool_size``
config option.