From 0b625424a25095acaf499a7060eb7e4121874683 Mon Sep 17 00:00:00 2001 From: Monty Taylor Date: Thu, 1 Nov 2018 10:53:56 -0500 Subject: [PATCH] Shift swift segment async code out of adapter Rather than having a run_async parameter to adapter which impacts what the request method returns, do the concurrent.futures work in the segment upload code itself. This is one of the steps towards shifting to the keystoneauth-based rate limiting. Change-Id: I0ed6cd566f4c450fedbe14712fc8f3712d736f6a --- openstack/_adapter.py | 18 +++------- openstack/cloud/openstackcloud.py | 55 +++++++++++++++++++++++++++---- 2 files changed, 53 insertions(+), 20 deletions(-) diff --git a/openstack/_adapter.py b/openstack/_adapter.py index bb301b187..6f9f341e3 100644 --- a/openstack/_adapter.py +++ b/openstack/_adapter.py @@ -132,7 +132,7 @@ class OpenStackSDKAdapter(adapter.Adapter): self.task_manager = task_manager def request( - self, url, method, run_async=False, error_message=None, + self, url, method, error_message=None, raise_exc=False, connect_retries=1, *args, **kwargs): name_parts = _extract_name(url, self.service_type) name = '.'.join([self.service_type, method] + name_parts) @@ -145,10 +145,7 @@ class OpenStackSDKAdapter(adapter.Adapter): connect_retries=connect_retries, raise_exc=raise_exc, tag=self.service_type, **kwargs) - if run_async: - return ret - else: - return ret.result() + return ret.result() def _version_matches(self, version): api_version = self.get_api_major_version() @@ -160,11 +157,6 @@ class OpenStackSDKAdapter(adapter.Adapter): class ShadeAdapter(OpenStackSDKAdapter): """Wrapper for shade methods that expect json unpacking.""" - def request(self, url, method, - run_async=False, error_message=None, **kwargs): - response = super(ShadeAdapter, self).request( - url, method, run_async=run_async, **kwargs) - if run_async: - return response - else: - return _json_response(response, error_message=error_message) + def request(self, url, method, error_message=None, **kwargs): + response = super(ShadeAdapter, self).request(url, method, **kwargs) + return _json_response(response, error_message=error_message) diff --git a/openstack/cloud/openstackcloud.py b/openstack/cloud/openstackcloud.py index 37aaa74c5..9444aef09 100755 --- a/openstack/cloud/openstackcloud.py +++ b/openstack/cloud/openstackcloud.py @@ -12,6 +12,7 @@ import base64 import collections +import concurrent.futures import copy import datetime import functools @@ -51,7 +52,6 @@ from openstack.cloud import meta from openstack.cloud import _utils import openstack.config import openstack.config.defaults -from openstack import task_manager from openstack import utils # Rackspace returns this for intermittent import errors @@ -259,6 +259,8 @@ class _OpenStackCloudMixin(_normalize.Normalizer): self._container_cache = dict() self._file_hash_cache = dict() + self.__pool_executor = None + self._raw_clients = {} self._local_ipv6 = ( @@ -7828,6 +7830,42 @@ class _OpenStackCloudMixin(_normalize.Normalizer): if entry['path'] == '/{name}'.format(name=name): entry['etag'] = result.headers['Etag'] + @property + def _pool_executor(self): + if not self.__pool_executor: + # TODO(mordred) Make this configurable - and probably use Futurist + # instead of concurrent.futures so that people using Eventlet will + # be happier. + self.__pool_executor = concurrent.futures.ThreadPoolExecutor( + max_workers=5) + return self.__pool_executor + + def _wait_for_futures(self, futures, raise_on_error=True): + '''Collect results or failures from a list of running future tasks.''' + + results = [] + retries = [] + + # Check on each result as its thread finishes + for completed in concurrent.futures.as_completed(futures): + try: + result = completed.result() + exceptions.raise_from_response(result) + results.append(result) + except (keystoneauth1.exceptions.RetriableConnectionFailure, + exceptions.HttpException) as e: + error_text = "Exception processing async task: {}".format( + str(e)) + if raise_on_error: + self.log.exception(error_text) + raise + else: + self.log.debug(error_text) + # If we get an exception, put the result into a list so we + # can try again + retries.append(completed.result()) + return results, retries + def _upload_large_object( self, endpoint, filename, headers, file_size, segment_size, use_slo): @@ -7851,8 +7889,10 @@ class _OpenStackCloudMixin(_normalize.Normalizer): # Schedule the segments for upload for name, segment in segments.items(): # Async call to put - schedules execution and returns a future - segment_future = self._object_store_client.put( - name, headers=headers, data=segment, run_async=True) + segment_future = self._pool_executor.submit( + self.object_store.put, + name, headers=headers, data=segment, + raise_exc=False) segment_futures.append(segment_future) # TODO(mordred) Collect etags from results to add to this manifest # dict. Then sort the list of dicts by path. @@ -7861,7 +7901,7 @@ class _OpenStackCloudMixin(_normalize.Normalizer): size_bytes=segment.length)) # Try once and collect failed results to retry - segment_results, retry_results = task_manager.wait_for_futures( + segment_results, retry_results = self._wait_for_futures( segment_futures, raise_on_error=False) self._add_etag_to_manifest(segment_results, manifest) @@ -7872,14 +7912,15 @@ class _OpenStackCloudMixin(_normalize.Normalizer): segment = segments[name] segment.seek(0) # Async call to put - schedules execution and returns a future - segment_future = self._object_store_client.put( - name, headers=headers, data=segment, run_async=True) + segment_future = self._pool_executor.submit( + self.object_store.put, + name, headers=headers, data=segment) # TODO(mordred) Collect etags from results to add to this manifest # dict. Then sort the list of dicts by path. retry_futures.append(segment_future) # If any segments fail the second time, just throw the error - segment_results, retry_results = task_manager.wait_for_futures( + segment_results, retry_results = self._wait_for_futures( retry_futures, raise_on_error=True) self._add_etag_to_manifest(segment_results, manifest)