Merge "Shift swift segment async code out of adapter"
This commit is contained in:
commit
4e2385e695
|
@ -132,7 +132,7 @@ class OpenStackSDKAdapter(adapter.Adapter):
|
|||
self.task_manager = task_manager
|
||||
|
||||
def request(
|
||||
self, url, method, run_async=False, error_message=None,
|
||||
self, url, method, error_message=None,
|
||||
raise_exc=False, connect_retries=1, *args, **kwargs):
|
||||
name_parts = _extract_name(url, self.service_type)
|
||||
name = '.'.join([self.service_type, method] + name_parts)
|
||||
|
@ -145,10 +145,7 @@ class OpenStackSDKAdapter(adapter.Adapter):
|
|||
connect_retries=connect_retries, raise_exc=raise_exc,
|
||||
tag=self.service_type,
|
||||
**kwargs)
|
||||
if run_async:
|
||||
return ret
|
||||
else:
|
||||
return ret.result()
|
||||
return ret.result()
|
||||
|
||||
def _version_matches(self, version):
|
||||
api_version = self.get_api_major_version()
|
||||
|
@ -160,11 +157,6 @@ class OpenStackSDKAdapter(adapter.Adapter):
|
|||
class ShadeAdapter(OpenStackSDKAdapter):
|
||||
"""Wrapper for shade methods that expect json unpacking."""
|
||||
|
||||
def request(self, url, method,
|
||||
run_async=False, error_message=None, **kwargs):
|
||||
response = super(ShadeAdapter, self).request(
|
||||
url, method, run_async=run_async, **kwargs)
|
||||
if run_async:
|
||||
return response
|
||||
else:
|
||||
return _json_response(response, error_message=error_message)
|
||||
def request(self, url, method, error_message=None, **kwargs):
|
||||
response = super(ShadeAdapter, self).request(url, method, **kwargs)
|
||||
return _json_response(response, error_message=error_message)
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
|
||||
import base64
|
||||
import collections
|
||||
import concurrent.futures
|
||||
import copy
|
||||
import datetime
|
||||
import functools
|
||||
|
@ -51,7 +52,6 @@ from openstack.cloud import meta
|
|||
from openstack.cloud import _utils
|
||||
import openstack.config
|
||||
import openstack.config.defaults
|
||||
from openstack import task_manager
|
||||
from openstack import utils
|
||||
|
||||
# Rackspace returns this for intermittent import errors
|
||||
|
@ -259,6 +259,8 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
|
|||
self._container_cache = dict()
|
||||
self._file_hash_cache = dict()
|
||||
|
||||
self.__pool_executor = None
|
||||
|
||||
self._raw_clients = {}
|
||||
|
||||
self._local_ipv6 = (
|
||||
|
@ -7828,6 +7830,42 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
|
|||
if entry['path'] == '/{name}'.format(name=name):
|
||||
entry['etag'] = result.headers['Etag']
|
||||
|
||||
@property
|
||||
def _pool_executor(self):
|
||||
if not self.__pool_executor:
|
||||
# TODO(mordred) Make this configurable - and probably use Futurist
|
||||
# instead of concurrent.futures so that people using Eventlet will
|
||||
# be happier.
|
||||
self.__pool_executor = concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=5)
|
||||
return self.__pool_executor
|
||||
|
||||
def _wait_for_futures(self, futures, raise_on_error=True):
|
||||
'''Collect results or failures from a list of running future tasks.'''
|
||||
|
||||
results = []
|
||||
retries = []
|
||||
|
||||
# Check on each result as its thread finishes
|
||||
for completed in concurrent.futures.as_completed(futures):
|
||||
try:
|
||||
result = completed.result()
|
||||
exceptions.raise_from_response(result)
|
||||
results.append(result)
|
||||
except (keystoneauth1.exceptions.RetriableConnectionFailure,
|
||||
exceptions.HttpException) as e:
|
||||
error_text = "Exception processing async task: {}".format(
|
||||
str(e))
|
||||
if raise_on_error:
|
||||
self.log.exception(error_text)
|
||||
raise
|
||||
else:
|
||||
self.log.debug(error_text)
|
||||
# If we get an exception, put the result into a list so we
|
||||
# can try again
|
||||
retries.append(completed.result())
|
||||
return results, retries
|
||||
|
||||
def _upload_large_object(
|
||||
self, endpoint, filename,
|
||||
headers, file_size, segment_size, use_slo):
|
||||
|
@ -7851,8 +7889,10 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
|
|||
# Schedule the segments for upload
|
||||
for name, segment in segments.items():
|
||||
# Async call to put - schedules execution and returns a future
|
||||
segment_future = self._object_store_client.put(
|
||||
name, headers=headers, data=segment, run_async=True)
|
||||
segment_future = self._pool_executor.submit(
|
||||
self.object_store.put,
|
||||
name, headers=headers, data=segment,
|
||||
raise_exc=False)
|
||||
segment_futures.append(segment_future)
|
||||
# TODO(mordred) Collect etags from results to add to this manifest
|
||||
# dict. Then sort the list of dicts by path.
|
||||
|
@ -7861,7 +7901,7 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
|
|||
size_bytes=segment.length))
|
||||
|
||||
# Try once and collect failed results to retry
|
||||
segment_results, retry_results = task_manager.wait_for_futures(
|
||||
segment_results, retry_results = self._wait_for_futures(
|
||||
segment_futures, raise_on_error=False)
|
||||
|
||||
self._add_etag_to_manifest(segment_results, manifest)
|
||||
|
@ -7872,14 +7912,15 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
|
|||
segment = segments[name]
|
||||
segment.seek(0)
|
||||
# Async call to put - schedules execution and returns a future
|
||||
segment_future = self._object_store_client.put(
|
||||
name, headers=headers, data=segment, run_async=True)
|
||||
segment_future = self._pool_executor.submit(
|
||||
self.object_store.put,
|
||||
name, headers=headers, data=segment)
|
||||
# TODO(mordred) Collect etags from results to add to this manifest
|
||||
# dict. Then sort the list of dicts by path.
|
||||
retry_futures.append(segment_future)
|
||||
|
||||
# If any segments fail the second time, just throw the error
|
||||
segment_results, retry_results = task_manager.wait_for_futures(
|
||||
segment_results, retry_results = self._wait_for_futures(
|
||||
retry_futures, raise_on_error=True)
|
||||
|
||||
self._add_etag_to_manifest(segment_results, manifest)
|
||||
|
|
Loading…
Reference in New Issue