Shift swift segment async code out of adapter

Rather than having a run_async parameter to adapter which impacts what
the request method returns, do the concurrent.futures work in the
segment upload code itself. This is one of the steps towards shifting to
the keystoneauth-based rate limiting.

Change-Id: I0ed6cd566f4c450fedbe14712fc8f3712d736f6a
This commit is contained in:
Monty Taylor 2018-11-01 10:53:56 -05:00
parent 999ff0eb7f
commit 0b625424a2
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
2 changed files with 53 additions and 20 deletions

View File

@@ -132,7 +132,7 @@ class OpenStackSDKAdapter(adapter.Adapter):
self.task_manager = task_manager
def request(
self, url, method, run_async=False, error_message=None,
self, url, method, error_message=None,
raise_exc=False, connect_retries=1, *args, **kwargs):
name_parts = _extract_name(url, self.service_type)
name = '.'.join([self.service_type, method] + name_parts)
@@ -145,10 +145,7 @@ class OpenStackSDKAdapter(adapter.Adapter):
connect_retries=connect_retries, raise_exc=raise_exc,
tag=self.service_type,
**kwargs)
if run_async:
return ret
else:
return ret.result()
return ret.result()
def _version_matches(self, version):
api_version = self.get_api_major_version()
@@ -160,11 +157,6 @@ class OpenStackSDKAdapter(adapter.Adapter):
class ShadeAdapter(OpenStackSDKAdapter):
"""Wrapper for shade methods that expect json unpacking."""
def request(self, url, method,
run_async=False, error_message=None, **kwargs):
response = super(ShadeAdapter, self).request(
url, method, run_async=run_async, **kwargs)
if run_async:
return response
else:
return _json_response(response, error_message=error_message)
def request(self, url, method, error_message=None, **kwargs):
    """Issue a request and unpack the response for shade-style callers.

    Thin wrapper over OpenStackSDKAdapter.request for callers that want
    the processed payload rather than the raw response object.

    :param url: Request URL, passed through to the parent adapter.
    :param method: HTTP method name (GET, PUT, ...), passed through.
    :param error_message: Optional message forwarded to _json_response
        for use when the response indicates an error.
    :returns: Whatever _json_response produces for the response —
        presumably the JSON-decoded body; confirm against _json_response.
    """
    response = super(ShadeAdapter, self).request(url, method, **kwargs)
    return _json_response(response, error_message=error_message)

View File

@@ -12,6 +12,7 @@
import base64
import collections
import concurrent.futures
import copy
import datetime
import functools
@@ -51,7 +52,6 @@ from openstack.cloud import meta
from openstack.cloud import _utils
import openstack.config
import openstack.config.defaults
from openstack import task_manager
from openstack import utils
# Rackspace returns this for intermittent import errors
@@ -259,6 +259,8 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
self._container_cache = dict()
self._file_hash_cache = dict()
self.__pool_executor = None
self._raw_clients = {}
self._local_ipv6 = (
@@ -7828,6 +7830,42 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
if entry['path'] == '/{name}'.format(name=name):
entry['etag'] = result.headers['Etag']
@property
def _pool_executor(self):
    """Lazily create and cache the thread pool for async segment uploads.

    The executor is built on first access with a fixed worker count and
    the same instance is returned on every subsequent access.
    """
    executor = self.__pool_executor
    if executor is None:
        # TODO(mordred) Make this configurable - and probably use Futurist
        # instead of concurrent.futures so that people using Eventlet will
        # be happier.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
        self.__pool_executor = executor
    return executor
def _wait_for_futures(self, futures, raise_on_error=True):
    """Collect results or failures from a list of running future tasks.

    :param futures: concurrent.futures futures, each expected to resolve
        to an HTTP response object (the tasks are submitted with
        raise_exc=False, so error statuses come back as responses).
    :param raise_on_error: When True (the default), log and re-raise the
        first retriable/HTTP error encountered. When False, log it at
        debug level and collect the failed response for a retry pass.
    :returns: A two-tuple (results, retries): results is the list of
        successful responses; retries is the list of error responses the
        caller may submit again.
    :raises: keystoneauth1.exceptions.RetriableConnectionFailure or
        exceptions.HttpException when raise_on_error is True.
    """
    results = []
    retries = []

    # Check on each result as its thread finishes
    for completed in concurrent.futures.as_completed(futures):
        # Capture the response once. The previous implementation called
        # completed.result() a second time inside the except block;
        # Future.result() re-raises the task's exception, so a future
        # whose callable itself raised (e.g. a connection failure) would
        # crash here even with raise_on_error=False.
        response = None
        try:
            response = completed.result()
            exceptions.raise_from_response(response)
            results.append(response)
        except (keystoneauth1.exceptions.RetriableConnectionFailure,
                exceptions.HttpException) as e:
            error_text = "Exception processing async task: {}".format(
                str(e))
            if raise_on_error:
                self.log.exception(error_text)
                raise
            self.log.debug(error_text)
            # If we got a response back, put it into the retry list so
            # the caller can try the task again. If the future raised
            # before producing a response there is nothing to retry with.
            if response is not None:
                retries.append(response)
    return results, retries
def _upload_large_object(
self, endpoint, filename,
headers, file_size, segment_size, use_slo):
@@ -7851,8 +7889,10 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
# Schedule the segments for upload
for name, segment in segments.items():
# Async call to put - schedules execution and returns a future
segment_future = self._object_store_client.put(
name, headers=headers, data=segment, run_async=True)
segment_future = self._pool_executor.submit(
self.object_store.put,
name, headers=headers, data=segment,
raise_exc=False)
segment_futures.append(segment_future)
# TODO(mordred) Collect etags from results to add to this manifest
# dict. Then sort the list of dicts by path.
@@ -7861,7 +7901,7 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
size_bytes=segment.length))
# Try once and collect failed results to retry
segment_results, retry_results = task_manager.wait_for_futures(
segment_results, retry_results = self._wait_for_futures(
segment_futures, raise_on_error=False)
self._add_etag_to_manifest(segment_results, manifest)
@@ -7872,14 +7912,15 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
segment = segments[name]
segment.seek(0)
# Async call to put - schedules execution and returns a future
segment_future = self._object_store_client.put(
name, headers=headers, data=segment, run_async=True)
segment_future = self._pool_executor.submit(
self.object_store.put,
name, headers=headers, data=segment)
# TODO(mordred) Collect etags from results to add to this manifest
# dict. Then sort the list of dicts by path.
retry_futures.append(segment_future)
# If any segments fail the second time, just throw the error
segment_results, retry_results = task_manager.wait_for_futures(
segment_results, retry_results = self._wait_for_futures(
retry_futures, raise_on_error=True)
self._add_etag_to_manifest(segment_results, manifest)