From 498e69c4267ee19b0a201bda5424a3901f668151 Mon Sep 17 00:00:00 2001 From: Dawid Deja Date: Fri, 13 Jan 2017 12:20:23 +0100 Subject: [PATCH] Make kombu driver work in multi-thread manner Implements blueprint mistral-kombu-driver-multi-thread Change-Id: I08f81c7c3de47320a76dc13d5edf2186d3ef2de0 --- .../engine/rpc_backend/kombu/kombu_server.py | 32 ++++++++++++++++++- .../rpc_backend/kombu/test_kombu_server.py | 27 ++++++++++++++++ setup.cfg | 4 +++ 3 files changed, 62 insertions(+), 1 deletion(-) diff --git a/mistral/engine/rpc_backend/kombu/kombu_server.py b/mistral/engine/rpc_backend/kombu/kombu_server.py index 90b364a20..2eba33eca 100644 --- a/mistral/engine/rpc_backend/kombu/kombu_server.py +++ b/mistral/engine/rpc_backend/kombu/kombu_server.py @@ -16,7 +16,9 @@ import socket import threading import kombu +from oslo_config import cfg from oslo_log import log as logging +from stevedore import driver from mistral import context as auth_ctx from mistral.engine.rpc_backend import base as rpc_base @@ -25,6 +27,13 @@ from mistral import exceptions as exc LOG = logging.getLogger(__name__) +CONF = cfg.CONF +_pool_opts = [ + cfg.IntOpt('executor_thread_pool_size', + default=64, + deprecated_name="rpc_thread_pool_size", + help='Size of executor thread pool.'), +] class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): @@ -32,7 +41,9 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): super(KombuRPCServer, self).__init__(conf) self._register_mistral_serialization() + CONF.register_opts(_pool_opts) + self._executor_threads = CONF.executor_thread_pool_size self.exchange = conf.get('exchange', '') self.user_id = conf.get('user_id', 'guest') self.password = conf.get('password', 'guest') @@ -49,6 +60,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): self._running = threading.Event() self._stopped = threading.Event() self.endpoints = [] + self._worker = None @property def is_running(self): @@ -57,6 +69,8 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): def run(self, executor='blocking'): """Start the server.""" + self._prepare_worker(executor) + self.conn = self._make_connection( self.host, self.port, @@ -83,7 +97,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): ) with conn.Consumer( queues=queue, - callbacks=[self._on_message_safe], + callbacks=[self._process_message], ) as consumer: consumer.qos(prefetch_count=1) @@ -115,6 +129,10 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): def wait(self): self._stopped.wait() + try: + self._worker.shutdown(wait=True) + except AttributeError as e: + LOG.warning("Cannot stop worker in graceful way: %s" % e) def _get_rpc_method(self, method_name): for endpoint in self.endpoints: @@ -192,3 +210,15 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): def register_endpoint(self, endpoint): self.endpoints.append(endpoint) + + def _process_message(self, request, message): + self._worker.submit(self._on_message_safe, request, message) + + def _prepare_worker(self, executor='blocking'): + mgr = driver.DriverManager('kombu_driver.executors', executor) + + executor_opts = {} + if executor == 'threading': + executor_opts['max_workers'] = self._executor_threads + + self._worker = mgr.driver(**executor_opts) diff --git a/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_server.py b/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_server.py index 8c3a189a0..ce0c057f3 100644 --- a/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_server.py +++ b/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_server.py @@ -19,6 +19,7 @@ from mistral.tests.unit.engine.rpc_backend.kombu import fake_kombu import mock import socket +from stevedore import driver with mock.patch.dict('sys.modules', kombu=fake_kombu): from mistral.engine.rpc_backend.kombu import kombu_server @@ -260,3 +261,29 @@ class KombuServerTestCase(base.KombuTestCase): reply_to, correlation_id ) + + @mock.patch('stevedore.driver.DriverManager') + def test__prepare_worker(self, driver_manager_mock): + worker_mock = mock.MagicMock() + mgr_mock = mock.MagicMock() + mgr_mock.driver.return_value = worker_mock + + def side_effect(*args, **kwargs): + return mgr_mock + + driver_manager_mock.side_effect = side_effect + + self.server._prepare_worker('blocking') + + self.assertEqual(self.server._worker, worker_mock) + + @mock.patch('stevedore.driver.DriverManager') + def test__prepare_worker_no_valid_executor(self, driver_manager_mock): + + driver_manager_mock.side_effect = driver.NoMatches() + + self.assertRaises( + driver.NoMatches, + self.server._prepare_worker, + 'non_valid_executor' + ) diff --git a/setup.cfg b/setup.cfg index d8b99072c..d96feb3e9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -84,3 +84,7 @@ mistral.expression.evaluators = mistral.auth = keystone = mistral.auth.keystone:KeystoneAuthHandler keycloak-oidc = mistral.auth.keycloak:KeycloakAuthHandler + +kombu_driver.executors = + blocking = futurist:SynchronousExecutor + threading = futurist:ThreadPoolExecutor