diff --git a/doc/source/user/guides/logging.rst b/doc/source/user/guides/logging.rst index 6c8a27eee..6eb4da4a5 100644 --- a/doc/source/user/guides/logging.rst +++ b/doc/source/user/guides/logging.rst @@ -65,6 +65,14 @@ openstack.config Issues pertaining to configuration are logged to the ``openstack.config`` logger. +openstack.task_manager + `openstacksdk` uses a Task Manager to perform remote calls. The + ``openstack.task_manager`` logger emits messages at the start and end + of each Task announcing what it is going to run and then what it ran and + how long it took. Logging ``openstack.task_manager`` is a good way to + get a trace of external actions `openstacksdk` is taking without full + `HTTP Tracing`_. + openstack.iterate_timeout When `openstacksdk` needs to poll a resource, it does so in a loop that waits between iterations and ultimately times out. The diff --git a/lower-constraints.txt b/lower-constraints.txt index 99965df90..6eb10512e 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -13,7 +13,7 @@ jmespath==0.9.0 jsonpatch==1.16 jsonpointer==1.13 jsonschema==2.6.0 -keystoneauth1==3.13.0 +keystoneauth1==3.11.0 linecache2==1.0.0 mock==2.0.0 mox3==0.20.0 diff --git a/openstack/_adapter.py b/openstack/_adapter.py index 5f8d720eb..6f9f341e3 100644 --- a/openstack/_adapter.py +++ b/openstack/_adapter.py @@ -14,6 +14,7 @@ ''' Wrapper around keystoneauth Adapter to wrap calls in TaskManager ''' +import functools try: import simplejson JSONDecodeError = simplejson.scanner.JSONDecodeError @@ -24,6 +25,7 @@ from six.moves import urllib from keystoneauth1 import adapter from openstack import exceptions +from openstack import task_manager as _task_manager def _extract_name(url, service_type=None): @@ -107,22 +109,43 @@ def _json_response(response, result_key=None, error_message=None): class OpenStackSDKAdapter(adapter.Adapter): - """Wrapper around keystoneauth1.adapter.Adapter.""" + """Wrapper around keystoneauth1.adapter.Adapter. + + Uses task_manager to run tasks rather than executing them directly. + This allows using the nodepool MultiThreaded Rate Limiting TaskManager. + """ def __init__( self, session=None, + task_manager=None, + rate_limit=None, concurrency=None, *args, **kwargs): super(OpenStackSDKAdapter, self).__init__( session=session, *args, **kwargs) + if not task_manager: + task_manager = _task_manager.TaskManager( + name=self.service_type, + rate=rate_limit, + workers=concurrency) + task_manager.start() + + self.task_manager = task_manager def request( self, url, method, error_message=None, raise_exc=False, connect_retries=1, *args, **kwargs): - response = super(OpenStackSDKAdapter, self).request( - url, method, - connect_retries=connect_retries, raise_exc=False, + name_parts = _extract_name(url, self.service_type) + name = '.'.join([self.service_type, method] + name_parts) + + request_method = functools.partial( + super(OpenStackSDKAdapter, self).request, url, method) + + ret = self.task_manager.submit_function( + request_method, run_async=True, name=name, + connect_retries=connect_retries, raise_exc=raise_exc, + tag=self.service_type, **kwargs) - return response + return ret.result() def _version_matches(self, version): api_version = self.get_api_major_version() diff --git a/openstack/cloud/openstackcloud.py b/openstack/cloud/openstackcloud.py index 43c7753a2..450ef4d9a 100755 --- a/openstack/cloud/openstackcloud.py +++ b/openstack/cloud/openstackcloud.py @@ -452,6 +452,7 @@ class _OpenStackCloudMixin(_normalize.Normalizer): version=config_major) adapter = _adapter.ShadeAdapter( session=self.session, + task_manager=self.task_manager, service_type=self.config.get_service_type(service_type), service_name=self.config.get_service_name(service_type), interface=self.config.get_interface(service_type), @@ -464,6 +465,7 @@ class _OpenStackCloudMixin(_normalize.Normalizer): adapter = _adapter.ShadeAdapter( session=self.session, + task_manager=self.task_manager, service_type=self.config.get_service_type(service_type), service_name=self.config.get_service_name(service_type), interface=self.config.get_interface(service_type), @@ -502,6 +504,7 @@ class _OpenStackCloudMixin(_normalize.Normalizer): self, service_type, api_version=None, endpoint_override=None): return _adapter.ShadeAdapter( session=self.session, + task_manager=self.task_manager, service_type=self.config.get_service_type(service_type), service_name=self.config.get_service_name(service_type), interface=self.config.get_interface(service_type), diff --git a/openstack/config/cloud_region.py b/openstack/config/cloud_region.py index e96cabe32..0d763d96c 100644 --- a/openstack/config/cloud_region.py +++ b/openstack/config/cloud_region.py @@ -492,6 +492,7 @@ class CloudRegion(object): region_name=self.region_name, ) network_endpoint = network_adapter.get_endpoint() + network_adapter.task_manager.stop() if not network_endpoint.rstrip().rsplit('/')[-1] == 'v2.0': if not network_endpoint.endswith('/'): network_endpoint += '/' diff --git a/openstack/connection.py b/openstack/connection.py index f208bf4be..27974fa9d 100644 --- a/openstack/connection.py +++ b/openstack/connection.py @@ -168,6 +168,7 @@ from openstack import config as _config from openstack.config import cloud_region from openstack import exceptions from openstack import service_description +from openstack import task_manager as _task_manager __all__ = [ 'from_config', @@ -258,8 +259,10 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta, filtering instead of making list calls and filtering client-side. Default false. :param task_manager: - Ignored. Exists for backwards compat during transition. Rate limit - parameters should be passed directly to the `rate_limit` parameter. + Task Manager to handle the execution of remote REST calls. + Defaults to None which causes a direct-action Task Manager to be + used. + :type manager: :class:`~openstack.task_manager.TaskManager` :param rate_limit: Client-side rate limit, expressed in calls per second. The parameter can either be a single float, or it can be a dict with @@ -283,7 +286,6 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta, app_name=app_name, app_version=app_version, load_yaml_config=False, load_envvars=False, - rate_limit=rate_limit, **kwargs) else: self.config = _config.get_cloud_region( @@ -291,9 +293,18 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta, app_name=app_name, app_version=app_version, load_yaml_config=cloud is not None, load_envvars=cloud is not None, - rate_limit=rate_limit, **kwargs) + if task_manager: + # If a TaskManager was passed in, don't start it, assume it's + # under the control of the calling context. + self.task_manager = task_manager + else: + self.task_manager = _task_manager.TaskManager( + self.config.full_name, + rate=rate_limit) + self.task_manager.start() + self._session = None self._proxies = {} self.use_direct_get = use_direct_get @@ -360,8 +371,7 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta, def close(self): """Release any resources held open.""" - if self.__pool_executor: - self.__pool_executor.shutdown() + self.task_manager.stop() def __enter__(self): return self diff --git a/openstack/service_description.py b/openstack/service_description.py index 206affce8..bab08562b 100644 --- a/openstack/service_description.py +++ b/openstack/service_description.py @@ -108,6 +108,7 @@ class ServiceDescription(object): proxy_obj = config.get_session_client( self.service_type, constructor=proxy_class, + task_manager=instance.task_manager, ) else: warnings.warn( @@ -128,6 +129,7 @@ class ServiceDescription(object): proxy_obj = config.get_session_client( self.service_type, constructor=proxy_class, + task_manager=instance.task_manager, ) else: warnings.warn( @@ -163,6 +165,7 @@ class ServiceDescription(object): proxy_obj = config.get_session_client( self.service_type, constructor=proxy_class, + task_manager=instance.task_manager, ) return proxy_obj @@ -196,12 +199,14 @@ class ServiceDescription(object): service_type=self.service_type), category=exceptions.UnsupportedServiceVersion) return temp_adapter + temp_adapter.task_manager.stop() proxy_class = self.supported_versions.get(str(found_version[0])) if not proxy_class: proxy_class = proxy.Proxy return config.get_session_client( self.service_type, constructor=proxy_class, + task_manager=instance.task_manager, allow_version_hack=True, **version_kwargs ) diff --git a/openstack/tests/unit/base.py b/openstack/tests/unit/base.py index 7d508ecd6..cee34d786 100644 --- a/openstack/tests/unit/base.py +++ b/openstack/tests/unit/base.py @@ -438,6 +438,7 @@ class TestCase(base.TestCase): cloud=test_cloud, validate=True, **kwargs) self.cloud = openstack.connection.Connection( config=self.cloud_config, strict=self.strict_cloud) + self.addCleanup(self.cloud.task_manager.stop) def get_cinder_discovery_mock_dict( self, diff --git a/openstack/tests/unit/cloud/test_task_manager.py b/openstack/tests/unit/cloud/test_task_manager.py new file mode 100644 index 000000000..89fc2dfce --- /dev/null +++ b/openstack/tests/unit/cloud/test_task_manager.py @@ -0,0 +1,227 @@ +# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import concurrent.futures +import fixtures +import mock +import threading +import time + +from six.moves import queue + +from openstack import task_manager +from openstack.tests.unit import base + + +class TestException(Exception): + pass + + +class TaskTest(task_manager.Task): + def main(self): + raise TestException("This is a test exception") + + +class TaskTestGenerator(task_manager.Task): + def main(self): + yield 1 + + +class TaskTestInt(task_manager.Task): + def main(self): + return int(1) + + +class TaskTestFloat(task_manager.Task): + def main(self): + return float(2.0) + + +class TaskTestStr(task_manager.Task): + def main(self): + return "test" + + +class TaskTestBool(task_manager.Task): + def main(self): + return True + + +class TaskTestSet(task_manager.Task): + def main(self): + return set([1, 2]) + + +class TestRateTransforms(base.TestCase): + + def test_rate_parameter_scalar(self): + manager = task_manager.TaskManager(name='test', rate=0.1234) + self.assertEqual(1 / 0.1234, manager._get_wait('compute')) + self.assertEqual(1 / 0.1234, manager._get_wait(None)) + + def test_rate_parameter_dict(self): + manager = task_manager.TaskManager( + name='test', + rate={ + 'compute': 20, + 'network': 10, + }) + self.assertEqual(1.0 / 20, manager._get_wait('compute')) + self.assertEqual(1.0 / 10, manager._get_wait('network')) + self.assertIsNone(manager._get_wait('object-store')) + + +class TestTaskManager(base.TestCase): + + def setUp(self): + super(TestTaskManager, self).setUp() + self.manager = task_manager.TaskManager(name='test') + self.manager.start() + + def test_wait_re_raise(self): + """Test that Exceptions thrown in a Task is reraised correctly + + This test is aimed to six.reraise(), called in Task::wait(). + Specifically, we test if we get the same behaviour with all the + configured interpreters (e.g. py27, p35, ...) + """ + self.assertRaises(TestException, self.manager.submit_task, TaskTest()) + + def test_dont_munchify_int(self): + ret = self.manager.submit_task(TaskTestInt()) + self.assertIsInstance(ret, int) + + def test_dont_munchify_float(self): + ret = self.manager.submit_task(TaskTestFloat()) + self.assertIsInstance(ret, float) + + def test_dont_munchify_str(self): + ret = self.manager.submit_task(TaskTestStr()) + self.assertIsInstance(ret, str) + + def test_dont_munchify_bool(self): + ret = self.manager.submit_task(TaskTestBool()) + self.assertIsInstance(ret, bool) + + def test_dont_munchify_set(self): + ret = self.manager.submit_task(TaskTestSet()) + self.assertIsInstance(ret, set) + + @mock.patch.object(concurrent.futures.ThreadPoolExecutor, 'submit') + def test_async(self, mock_submit): + + self.manager.submit_function(set, run_async=True) + self.assertTrue(mock_submit.called) + + @mock.patch.object(task_manager.TaskManager, 'post_run_task') + @mock.patch.object(task_manager.TaskManager, 'pre_run_task') + def test_pre_post_calls(self, mock_pre, mock_post): + self.manager.submit_function(lambda: None) + mock_pre.assert_called_once() + mock_post.assert_called_once() + + @mock.patch.object(task_manager.TaskManager, 'post_run_task') + @mock.patch.object(task_manager.TaskManager, 'pre_run_task') + def test_validate_timing(self, mock_pre, mock_post): + # Note the unit test setup has mocked out time.sleep() and + # done a * 0.0001, and the test should be under the 5 + # second timeout. Thus with below, we should see at + # *least* a 1 second pause running the task. + self.manager.submit_function(lambda: time.sleep(10000)) + + mock_pre.assert_called_once() + mock_post.assert_called_once() + + args, kwargs = mock_post.call_args_list[0] + self.assertTrue(args[0] > 1.0) + + +class ThreadingTaskManager(task_manager.TaskManager): + """A subclass of TaskManager which exercises the thread-shifting + exception handling behavior.""" + + def __init__(self, *args, **kw): + super(ThreadingTaskManager, self).__init__( + *args, **kw) + self.queue = queue.Queue() + self._running = True + self._thread = threading.Thread(name=self.name, target=self.run) + self._thread.daemon = True + self.failed = False + + def start(self): + self._thread.start() + + def stop(self): + self._running = False + self.queue.put(None) + + def join(self): + self._thread.join() + + def run(self): + # No exception should ever cause this method to hit its + # exception handler. + try: + while True: + task = self.queue.get() + if not task: + if not self._running: + break + continue + self.run_task(task) + self.queue.task_done() + except Exception: + self.failed = True + raise + + def submit_task(self, task, raw=False): + # An important part of the exception-shifting feature is that + # this method should raise the exception. + self.queue.put(task) + return task.wait() + + +class ThreadingTaskManagerFixture(fixtures.Fixture): + def _setUp(self): + self.manager = ThreadingTaskManager(name='threading test') + self.manager.start() + self.addCleanup(self._cleanup) + + def _cleanup(self): + self.manager.stop() + self.manager.join() + + +class TestThreadingTaskManager(base.TestCase): + + def setUp(self): + super(TestThreadingTaskManager, self).setUp() + f = self.useFixture(ThreadingTaskManagerFixture()) + self.manager = f.manager + + def test_wait_re_raise(self): + """Test that Exceptions thrown in a Task is reraised correctly + + This test is aimed to six.reraise(), called in Task::wait(). + Specifically, we test if we get the same behaviour with all the + configured interpreters (e.g. py27, p35, ...) + """ + self.assertRaises(TestException, self.manager.submit_task, TaskTest()) + # Stop the manager and join the run thread to ensure the + # exception handler has run. + self.manager.stop() + self.manager.join() + self.assertFalse(self.manager.failed) diff --git a/openstack/tests/unit/test_connection.py b/openstack/tests/unit/test_connection.py index 163275fd0..f27ce67b9 100644 --- a/openstack/tests/unit/test_connection.py +++ b/openstack/tests/unit/test_connection.py @@ -91,6 +91,22 @@ class TestConnection(base.TestCase): self.assertEqual(mock_session, conn.session) self.assertEqual('auth.example.com', conn.config.name) + def test_task_manager_rate_scalar(self): + conn = connection.Connection(cloud='sample-cloud', rate_limit=20) + self.assertEqual(1.0 / 20, conn.task_manager._get_wait('object-store')) + self.assertEqual(1.0 / 20, conn.task_manager._get_wait(None)) + + def test_task_manager_rate_dict(self): + conn = connection.Connection( + cloud='sample-cloud', + rate_limit={ + 'compute': 20, + 'network': 10, + }) + self.assertEqual(1.0 / 20, conn.task_manager._get_wait('compute')) + self.assertEqual(1.0 / 10, conn.task_manager._get_wait('network')) + self.assertIsNone(conn.task_manager._get_wait('object-store')) + def test_create_session(self): conn = connection.Connection(cloud='sample-cloud') self.assertIsNotNone(conn) diff --git a/requirements.txt b/requirements.txt index 6181b5544..78f758519 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ requestsexceptions>=1.2.0 # Apache-2.0 jsonpatch!=1.20,>=1.16 # BSD six>=1.10.0 # MIT os-service-types>=1.2.0 # Apache-2.0 -keystoneauth1>=3.13.0 # Apache-2.0 +keystoneauth1>=3.11.0 # Apache-2.0 munch>=2.1.0 # MIT decorator>=3.4.0 # BSD