Merge "Shift swift segment async code out of adapter"

This commit is contained in:
Zuul 2018-11-07 20:20:47 +00:00 committed by Gerrit Code Review
commit 4e2385e695
2 changed files with 53 additions and 20 deletions

View File

@ -132,7 +132,7 @@ class OpenStackSDKAdapter(adapter.Adapter):
self.task_manager = task_manager
def request(
self, url, method, run_async=False, error_message=None,
self, url, method, error_message=None,
raise_exc=False, connect_retries=1, *args, **kwargs):
name_parts = _extract_name(url, self.service_type)
name = '.'.join([self.service_type, method] + name_parts)
@ -145,10 +145,7 @@ class OpenStackSDKAdapter(adapter.Adapter):
connect_retries=connect_retries, raise_exc=raise_exc,
tag=self.service_type,
**kwargs)
if run_async:
return ret
else:
return ret.result()
return ret.result()
def _version_matches(self, version):
api_version = self.get_api_major_version()
@ -160,11 +157,6 @@ class OpenStackSDKAdapter(adapter.Adapter):
class ShadeAdapter(OpenStackSDKAdapter):
"""Wrapper for shade methods that expect json unpacking."""
def request(self, url, method,
run_async=False, error_message=None, **kwargs):
response = super(ShadeAdapter, self).request(
url, method, run_async=run_async, **kwargs)
if run_async:
return response
else:
return _json_response(response, error_message=error_message)
def request(self, url, method, error_message=None, **kwargs):
response = super(ShadeAdapter, self).request(url, method, **kwargs)
return _json_response(response, error_message=error_message)

View File

@ -12,6 +12,7 @@
import base64
import collections
import concurrent.futures
import copy
import datetime
import functools
@ -51,7 +52,6 @@ from openstack.cloud import meta
from openstack.cloud import _utils
import openstack.config
import openstack.config.defaults
from openstack import task_manager
from openstack import utils
# Rackspace returns this for intermittent import errors
@ -259,6 +259,8 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
self._container_cache = dict()
self._file_hash_cache = dict()
self.__pool_executor = None
self._raw_clients = {}
self._local_ipv6 = (
@ -7828,6 +7830,42 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
if entry['path'] == '/{name}'.format(name=name):
entry['etag'] = result.headers['Etag']
@property
def _pool_executor(self):
if not self.__pool_executor:
# TODO(mordred) Make this configurable - and probably use Futurist
# instead of concurrent.futures so that people using Eventlet will
# be happier.
self.__pool_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=5)
return self.__pool_executor
def _wait_for_futures(self, futures, raise_on_error=True):
'''Collect results or failures from a list of running future tasks.'''
results = []
retries = []
# Check on each result as its thread finishes
for completed in concurrent.futures.as_completed(futures):
try:
result = completed.result()
exceptions.raise_from_response(result)
results.append(result)
except (keystoneauth1.exceptions.RetriableConnectionFailure,
exceptions.HttpException) as e:
error_text = "Exception processing async task: {}".format(
str(e))
if raise_on_error:
self.log.exception(error_text)
raise
else:
self.log.debug(error_text)
# If we get an exception, put the result into a list so we
# can try again
retries.append(completed.result())
return results, retries
def _upload_large_object(
self, endpoint, filename,
headers, file_size, segment_size, use_slo):
@ -7851,8 +7889,10 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
# Schedule the segments for upload
for name, segment in segments.items():
# Async call to put - schedules execution and returns a future
segment_future = self._object_store_client.put(
name, headers=headers, data=segment, run_async=True)
segment_future = self._pool_executor.submit(
self.object_store.put,
name, headers=headers, data=segment,
raise_exc=False)
segment_futures.append(segment_future)
# TODO(mordred) Collect etags from results to add to this manifest
# dict. Then sort the list of dicts by path.
@ -7861,7 +7901,7 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
size_bytes=segment.length))
# Try once and collect failed results to retry
segment_results, retry_results = task_manager.wait_for_futures(
segment_results, retry_results = self._wait_for_futures(
segment_futures, raise_on_error=False)
self._add_etag_to_manifest(segment_results, manifest)
@ -7872,14 +7912,15 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
segment = segments[name]
segment.seek(0)
# Async call to put - schedules execution and returns a future
segment_future = self._object_store_client.put(
name, headers=headers, data=segment, run_async=True)
segment_future = self._pool_executor.submit(
self.object_store.put,
name, headers=headers, data=segment)
# TODO(mordred) Collect etags from results to add to this manifest
# dict. Then sort the list of dicts by path.
retry_futures.append(segment_future)
# If any segments fail the second time, just throw the error
segment_results, retry_results = task_manager.wait_for_futures(
segment_results, retry_results = self._wait_for_futures(
retry_futures, raise_on_error=True)
self._add_etag_to_manifest(segment_results, manifest)