Remove the task manager

The underlying openstacksdk library is changing how its task manager
works, so stop trying to send it a task manager. The feature exists
basically just for nodepool and is being expanded in openstacksdk to be
more usable by everyone. The likelihood that anyone other than nodepool
is using it is very low.

Change-Id: I04be3937589a805a5f9686c91a78933eebcfa022
Monty Taylor 2018-09-25 07:17:39 -05:00
parent d0da0b0323
commit 28e95889a0
5 changed files with 6 additions and 565 deletions


@@ -39,13 +39,6 @@ Most of the logging is set up to log to the root `shade` logger. There are
additional sub-loggers that are used at times, primarily so that a user can
decide to turn on or off a specific type of logging. They are listed below.
shade.task_manager
`shade` uses a Task Manager to perform remote calls. The `shade.task_manager`
logger emits messages at the start and end of each Task announcing what
it is going to run and then what it ran and how long it took. Logging
`shade.task_manager` is a good way to get a trace of external actions shade
is taking without full `HTTP Tracing`_.
shade.request_ids
The `shade.request_ids` logger emits a log line at the end of each HTTP
interaction with the OpenStack Request ID associated with the interaction.
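
For reference, enabling the (now-removed) `shade.task_manager` logger worked like any standard Python logger; a minimal sketch, with the handler choice being illustrative:

import logging
import sys

# Emit the per-task trace without turning on full HTTP tracing.
logger = logging.getLogger('shade.task_manager')
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.DEBUG)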


@@ -0,0 +1,6 @@
---
upgrade:
- |
    The ``manager`` parameter is no longer meaningful. This should have no
    impact, as the only known consumer of the feature is nodepool, which
    no longer uses shade.
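
In practice the change for callers is just to stop passing the argument; a minimal sketch, assuming a clouds.yaml entry named ``mycloud``:

import shade

# Previously a TaskManager could be handed in via manager=...;
# that argument is now ignored, so simply omit it.
cloud = shade.openstack_cloud(cloud='mycloud')
print(cloud.list_servers())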


@@ -11,7 +11,6 @@
# limitations under the License.
import base64
import collections
import copy
import datetime
import functools
@@ -35,7 +34,6 @@ import os
from openstack.cloud import _utils
from openstack.config import loader
from openstack import connection
from openstack import task_manager
from openstack import utils
from shade import exc
@@ -135,7 +133,6 @@ class OpenStackCloud(
super(OpenStackCloud, self).__init__(
config=cloud_config,
strict=strict,
task_manager=manager,
app_name=app_name,
app_version=app_version,
use_direct_get=use_direct_get,
@@ -7270,118 +7267,6 @@ class OpenStackCloud(
endpoint, filename, headers,
file_size, segment_size, use_slo)
    def _upload_object(self, endpoint, filename, headers):
        # Open in binary mode so non-text object data uploads intact.
        return self._object_store_client.put(
            endpoint, headers=headers, data=open(filename, 'rb'))
def _get_file_segments(self, endpoint, filename, file_size, segment_size):
# Use an ordered dict here so that testing can replicate things
segments = collections.OrderedDict()
for (index, offset) in enumerate(range(0, file_size, segment_size)):
remaining = file_size - (index * segment_size)
segment = _utils.FileSegment(
filename, offset,
segment_size if segment_size < remaining else remaining)
name = '{endpoint}/{index:0>6}'.format(
endpoint=endpoint, index=index)
segments[name] = segment
return segments
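
To illustrate the arithmetic in `_get_file_segments`: a sketch (values hypothetical) of the names and sizes produced for a 10-byte file with a 4-byte segment size:

import collections

file_size, segment_size = 10, 4
endpoint = 'container/object'  # hypothetical Swift path

segments = collections.OrderedDict()
for index, offset in enumerate(range(0, file_size, segment_size)):
    remaining = file_size - (index * segment_size)
    length = segment_size if segment_size < remaining else remaining
    name = '{endpoint}/{index:0>6}'.format(endpoint=endpoint, index=index)
    segments[name] = (offset, length)

# segments == {'container/object/000000': (0, 4),
#              'container/object/000001': (4, 4),
#              'container/object/000002': (8, 2)}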
def _object_name_from_url(self, url):
'''Get container_name/object_name from the full URL called.
Remove the Swift endpoint from the front of the URL, and remove
        the leading / that will be left behind.'''
endpoint = self._object_store_client.get_endpoint()
object_name = url.replace(endpoint, '')
if object_name.startswith('/'):
object_name = object_name[1:]
return object_name
def _add_etag_to_manifest(self, segment_results, manifest):
for result in segment_results:
if 'Etag' not in result.headers:
continue
name = self._object_name_from_url(result.url)
for entry in manifest:
if entry['path'] == '/{name}'.format(name=name):
entry['etag'] = result.headers['Etag']
def _upload_large_object(
self, endpoint, filename,
headers, file_size, segment_size, use_slo):
# If the object is big, we need to break it up into segments that
# are no larger than segment_size, upload each of them individually
# and then upload a manifest object. The segments can be uploaded in
# parallel, so we'll use the async feature of the TaskManager.
segment_futures = []
segment_results = []
retry_results = []
retry_futures = []
manifest = []
# Get an OrderedDict with keys being the swift location for the
# segment, the value a FileSegment file-like object that is a
# slice of the data for the segment.
segments = self._get_file_segments(
endpoint, filename, file_size, segment_size)
# Schedule the segments for upload
for name, segment in segments.items():
# Async call to put - schedules execution and returns a future
segment_future = self._object_store_client.put(
name, headers=headers, data=segment, run_async=True)
segment_futures.append(segment_future)
# TODO(mordred) Collect etags from results to add to this manifest
# dict. Then sort the list of dicts by path.
manifest.append(dict(
path='/{name}'.format(name=name),
size_bytes=segment.length))
# Try once and collect failed results to retry
segment_results, retry_results = task_manager.wait_for_futures(
segment_futures, raise_on_error=False)
self._add_etag_to_manifest(segment_results, manifest)
for result in retry_results:
# Grab the FileSegment for the failed upload so we can retry
name = self._object_name_from_url(result.url)
segment = segments[name]
segment.seek(0)
# Async call to put - schedules execution and returns a future
segment_future = self._object_store_client.put(
name, headers=headers, data=segment, run_async=True)
# TODO(mordred) Collect etags from results to add to this manifest
# dict. Then sort the list of dicts by path.
retry_futures.append(segment_future)
# If any segments fail the second time, just throw the error
segment_results, retry_results = task_manager.wait_for_futures(
retry_futures, raise_on_error=True)
self._add_etag_to_manifest(segment_results, manifest)
if use_slo:
return self._finish_large_object_slo(endpoint, headers, manifest)
else:
return self._finish_large_object_dlo(endpoint, headers)
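
The schedule-once-retry-once pattern above can also be expressed with plain `concurrent.futures`, which is roughly what the TaskManager's async feature wraps; a sketch with illustrative names and data:

import concurrent.futures

segments = {'container/object/000000': b'abcd',
            'container/object/000001': b'ef'}  # illustrative

def upload(name, data):
    # Stand-in for the real object-store PUT; illustrative only.
    return name

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(upload, name, data): name
               for name, data in segments.items()}
    retries = []
    for done in concurrent.futures.as_completed(futures):
        try:
            done.result()
        except Exception:
            retries.append(futures[done])
    # Retry each failed segment once; a second failure propagates.
    for name in retries:
        upload(name, segments[name])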
def _finish_large_object_slo(self, endpoint, headers, manifest):
# TODO(mordred) send an etag of the manifest, which is the md5sum
# of the concatenation of the etags of the results
headers = headers.copy()
return self._object_store_client.put(
endpoint,
params={'multipart-manifest': 'put'},
headers=headers, data=json.dumps(manifest))
def _finish_large_object_dlo(self, endpoint, headers):
headers = headers.copy()
headers['X-Object-Manifest'] = endpoint
return self._object_store_client.put(endpoint, headers=headers)
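
For context on the two finishers: a static large object is completed by uploading a JSON manifest with the `multipart-manifest=put` query parameter, while a dynamic large object only needs the `X-Object-Manifest` header. A sketch of the manifest shape assembled above (paths and etag values illustrative):

# One entry per uploaded segment, ordered by path.
manifest = [
    {'path': '/container/object/000000',
     'size_bytes': 4,
     'etag': '0cc175b9c0f1b6a831c399e269772661'},
    {'path': '/container/object/000001',
     'size_bytes': 2,
     'etag': '4124bc0a9335c27f086f24ba207a4912'},
]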
def update_object(self, container, name, metadata=None, **headers):
"""Update the metadata of an object


@@ -1,334 +0,0 @@
# Copyright (C) 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import concurrent.futures
import sys
import threading
import time
import types
import keystoneauth1.exceptions
import six
from shade import _log
from shade import exc
from shade import meta
def _is_listlike(obj):
# NOTE(Shrews): Since the client API might decide to subclass one
# of these result types, we use isinstance() here instead of type().
return (
isinstance(obj, list) or
isinstance(obj, types.GeneratorType))
def _is_objlike(obj):
# NOTE(Shrews): Since the client API might decide to subclass one
# of these result types, we use isinstance() here instead of type().
return (
not isinstance(obj, bool) and
not isinstance(obj, int) and
not isinstance(obj, float) and
not isinstance(obj, six.string_types) and
not isinstance(obj, set) and
not isinstance(obj, tuple))
@six.add_metaclass(abc.ABCMeta)
class BaseTask(object):
"""Represent a task to be performed on an OpenStack Cloud.
Some consumers need to inject things like rate-limiting or auditing
around each external REST interaction. Task provides an interface
to encapsulate each such interaction. Also, although shade itself
operates normally in a single-threaded direct action manner, consuming
programs may provide a multi-threaded TaskManager themselves. For that
reason, Task uses threading events to ensure appropriate wait conditions.
These should be a no-op in single-threaded applications.
A consumer is expected to overload the main method.
:param dict kw: Any args that are expected to be passed to something in
the main payload at execution time.
"""
def __init__(self, **kw):
self._exception = None
self._traceback = None
self._result = None
self._response = None
self._finished = threading.Event()
self.run_async = False
self.args = kw
self.name = type(self).__name__
@abc.abstractmethod
def main(self, client):
""" Override this method with the actual workload to be performed """
def done(self, result):
self._result = result
self._finished.set()
def exception(self, e, tb):
self._exception = e
self._traceback = tb
self._finished.set()
def wait(self, raw=False):
self._finished.wait()
if self._exception:
six.reraise(type(self._exception), self._exception,
self._traceback)
return self._result
def run(self, client):
self._client = client
try:
# Retry one time if we get a retriable connection failure
try:
# Keep time for connection retrying logging
start = time.time()
self.done(self.main(client))
except keystoneauth1.exceptions.RetriableConnectionFailure as e:
end = time.time()
dt = end - start
if client.region_name:
client.log.debug(str(e))
client.log.debug(
"Connection failure on %(cloud)s:%(region)s"
" for %(name)s after %(secs)s seconds, retrying",
{'cloud': client.name,
'region': client.region_name,
'secs': dt,
'name': self.name})
else:
client.log.debug(
"Connection failure on %(cloud)s for %(name)s after"
" %(secs)s seconds, retrying",
{'cloud': client.name, 'name': self.name, 'secs': dt})
self.done(self.main(client))
except Exception:
raise
except Exception as e:
self.exception(e, sys.exc_info()[2])
class Task(BaseTask):
""" Shade specific additions to the BaseTask Interface. """
def wait(self, raw=False):
super(Task, self).wait()
if raw:
# Do NOT convert the result.
return self._result
if _is_listlike(self._result):
return meta.obj_list_to_munch(self._result)
elif _is_objlike(self._result):
return meta.obj_to_munch(self._result)
else:
return self._result
class RequestTask(BaseTask):
""" Extensions to the Shade Tasks to handle raw requests """
# It's totally legit for calls to not return things
result_key = None
# keystoneauth1 throws keystoneauth1.exceptions.http.HttpError on !200
def done(self, result):
self._response = result
try:
result_json = self._response.json()
except Exception as e:
result_json = self._response.text
self._client.log.debug(
'Could not decode json in response: %(e)s', {'e': str(e)})
self._client.log.debug(result_json)
if self.result_key:
self._result = result_json[self.result_key]
else:
self._result = result_json
self._request_id = self._response.headers.get('x-openstack-request-id')
self._finished.set()
def wait(self, raw=False):
super(RequestTask, self).wait()
if raw:
# Do NOT convert the result.
return self._result
if _is_listlike(self._result):
return meta.obj_list_to_munch(
self._result, request_id=self._request_id)
elif _is_objlike(self._result):
return meta.obj_to_munch(self._result, request_id=self._request_id)
return self._result
def _result_filter_cb(result):
return result
def generate_task_class(method, name, result_filter_cb):
if name is None:
if callable(method):
name = method.__name__
else:
name = method
class RunTask(Task):
def __init__(self, **kw):
super(RunTask, self).__init__(**kw)
self.name = name
self._method = method
def wait(self, raw=False):
super(RunTask, self).wait()
if raw:
# Do NOT convert the result.
return self._result
return result_filter_cb(self._result)
def main(self, client):
if callable(self._method):
return method(**self.args)
else:
meth = getattr(client, self._method)
return meth(**self.args)
return RunTask
class TaskManager(object):
log = _log.setup_logging('shade.task_manager')
def __init__(
self, client, name, result_filter_cb=None, workers=5, **kwargs):
self.name = name
self._client = client
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=workers)
if not result_filter_cb:
self._result_filter_cb = _result_filter_cb
else:
self._result_filter_cb = result_filter_cb
def set_client(self, client):
self._client = client
def stop(self):
""" This is a direct action passthrough TaskManager """
self._executor.shutdown(wait=True)
def run(self):
""" This is a direct action passthrough TaskManager """
pass
def submit_task(self, task, raw=False):
"""Submit and execute the given task.
:param task: The task to execute.
:param bool raw: If True, return the raw result as received from the
underlying client call.
"""
return self.run_task(task=task, raw=raw)
def _run_task_async(self, task, raw=False):
self.log.debug(
"Manager %s submitting task %s", self.name, task.name)
return self._executor.submit(self._run_task, task, raw=raw)
def run_task(self, task, raw=False):
if hasattr(task, 'run_async') and task.run_async:
return self._run_task_async(task, raw=raw)
else:
return self._run_task(task, raw=raw)
def _run_task(self, task, raw=False):
self.log.debug(
"Manager %s running task %s", self.name, task.name)
start = time.time()
task.run(self._client)
end = time.time()
dt = end - start
self.log.debug(
"Manager %s ran task %s in %ss", self.name, task.name, dt)
self.post_run_task(dt, task)
return task.wait(raw)
    def post_run_task(self, elapsed_time, task):
pass
# Backwards compatibility
submitTask = submit_task
def submit_function(
self, method, name=None, result_filter_cb=None, **kwargs):
""" Allows submitting an arbitrary method for work.
:param method: Method to run in the TaskManager. Can be either the
name of a method to find on self.client, or a callable.
"""
if not result_filter_cb:
result_filter_cb = self._result_filter_cb
task_class = generate_task_class(method, name, result_filter_cb)
        # NOTE: ThreadPoolExecutor has no submit_task; route through our
        # own submit_task so the generated task actually runs.
        return self.submit_task(task_class(**kwargs))
def wait_for_futures(futures, raise_on_error=True, log=None):
'''Collect results or failures from a list of running future tasks.'''
results = []
retries = []
# Check on each result as its thread finishes
for completed in concurrent.futures.as_completed(futures):
try:
result = completed.result()
# We have to do this here because munch_response doesn't
# get called on async job results
exc.raise_from_response(result)
results.append(result)
except (keystoneauth1.exceptions.RetriableConnectionFailure,
exc.OpenStackCloudException) as e:
if log:
log.debug(
"Exception processing async task: {e}".format(
e=str(e)),
exc_info=True)
# If we get an exception, put the result into a list so we
# can try again
if raise_on_error:
raise
else:
retries.append(result)
return results, retries
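
The BaseTask docstring above mentions consumers injecting rate limiting around each REST interaction; a minimal sketch of such a subclass, not part of shade itself, using the `post_run_task` hook (the `rate` parameter is hypothetical):

import time

from shade import task_manager

class RateLimitingTaskManager(task_manager.TaskManager):
    def __init__(self, client, name, rate=2.0, **kwargs):
        super(RateLimitingTaskManager, self).__init__(
            client=client, name=name, **kwargs)
        self.delay = 1.0 / rate  # minimum seconds per task

    def post_run_task(self, elapsed_time, task):
        # Sleep off whatever part of the per-task budget is left over.
        time.sleep(max(0.0, self.delay - elapsed_time))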


@@ -1,109 +0,0 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import concurrent.futures
import mock
from shade import task_manager
from shade.tests.unit import base
class TestException(Exception):
pass
class TaskTest(task_manager.Task):
def main(self, client):
raise TestException("This is a test exception")
class TaskTestGenerator(task_manager.Task):
def main(self, client):
yield 1
class TaskTestInt(task_manager.Task):
def main(self, client):
return int(1)
class TaskTestFloat(task_manager.Task):
def main(self, client):
return float(2.0)
class TaskTestStr(task_manager.Task):
def main(self, client):
return "test"
class TaskTestBool(task_manager.Task):
def main(self, client):
return True
class TaskTestSet(task_manager.Task):
def main(self, client):
return set([1, 2])
class TaskTestAsync(task_manager.Task):
def __init__(self):
        super(TaskTestAsync, self).__init__()
self.run_async = True
def main(self, client):
pass
class TestTaskManager(base.RequestsMockTestCase):
def setUp(self):
super(TestTaskManager, self).setUp()
self.manager = task_manager.TaskManager(name='test', client=self)
def test_wait_re_raise(self):
"""Test that Exceptions thrown in a Task is reraised correctly
This test is aimed to six.reraise(), called in Task::wait().
Specifically, we test if we get the same behaviour with all the
configured interpreters (e.g. py27, p34, pypy, ...)
"""
self.assertRaises(TestException, self.manager.submit_task, TaskTest())
def test_dont_munchify_int(self):
ret = self.manager.submit_task(TaskTestInt())
self.assertIsInstance(ret, int)
def test_dont_munchify_float(self):
ret = self.manager.submit_task(TaskTestFloat())
self.assertIsInstance(ret, float)
def test_dont_munchify_str(self):
ret = self.manager.submit_task(TaskTestStr())
self.assertIsInstance(ret, str)
def test_dont_munchify_bool(self):
ret = self.manager.submit_task(TaskTestBool())
self.assertIsInstance(ret, bool)
def test_dont_munchify_set(self):
ret = self.manager.submit_task(TaskTestSet())
self.assertIsInstance(ret, set)
@mock.patch.object(concurrent.futures.ThreadPoolExecutor, 'submit')
def test_async(self, mock_submit):
self.manager.submit_task(TaskTestAsync())
self.assertTrue(mock_submit.called)
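
For completeness, the removed interface end to end: a minimal sketch of defining and submitting a custom Task, assuming a clouds.yaml entry named ``mycloud``:

import shade
from shade import task_manager

class ListServersTask(task_manager.Task):
    def main(self, client):
        # client is whatever the TaskManager was constructed with.
        return client.list_servers()

cloud = shade.openstack_cloud(cloud='mycloud')
manager = task_manager.TaskManager(name='demo', client=cloud)
servers = manager.submit_task(ListServersTask())  # munchified list
manager.stop()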