diff --git a/heat/engine/resource.py b/heat/engine/resource.py index c7bacb3b5d..e2f51e5c5c 100644 --- a/heat/engine/resource.py +++ b/heat/engine/resource.py @@ -1005,7 +1005,6 @@ class Resource(status.ResourceStatus): action ) - @scheduler.wrappertask def _do_action(self, action, pre_func=None, resource_data=None): """Perform a transition to a new state via a specified action. @@ -1028,7 +1027,7 @@ class Resource(status.ResourceStatus): pre_func() handler_args = [resource_data] if resource_data is not None else [] - yield self.action_handler_task(action, args=handler_args) + yield from self.action_handler_task(action, args=handler_args) def _update_stored_properties(self): old_props = self._stored_properties_data @@ -1189,7 +1188,6 @@ class Resource(status.ResourceStatus): message="%s" % error_message) raise - @scheduler.wrappertask def create(self): """Create the resource. @@ -1203,18 +1201,18 @@ class Resource(status.ResourceStatus): raise exception.ResourceFailure(exc, self, action) if self.external_id is not None: - yield self._do_action(self.ADOPT, - resource_data={ - 'resource_id': self.external_id}) - yield self.check() + yield from self._do_action(self.ADOPT, + resource_data={ + 'resource_id': self.external_id}) + yield from self.check() return # This method can be called when we replace a resource, too. In that # case, a hook has already been dealt with in `Resource.update` so we # shouldn't do it here again: if self.stack.action == self.stack.CREATE: - yield self._break_if_required( - self.CREATE, environment.HOOK_PRE_CREATE) + yield from self._break_if_required(self.CREATE, + environment.HOOK_PRE_CREATE) LOG.info('creating %s', self) @@ -1239,13 +1237,13 @@ class Resource(status.ResourceStatus): delay = timeutils.retry_backoff_delay(count[action], jitter_max=2.0) waiter = scheduler.TaskRunner(self.pause) - yield waiter.as_task(timeout=delay) + yield from waiter.as_task(timeout=delay) elif action == self.CREATE: # Only validate properties in first create call. pre_func = self.properties.validate try: - yield self._do_action(action, pre_func) + yield from self._do_action(action, pre_func) if action == self.CREATE: first_failure = None break @@ -1282,8 +1280,8 @@ class Resource(status.ResourceStatus): raise first_failure if self.stack.action == self.stack.CREATE: - yield self._break_if_required( - self.CREATE, environment.HOOK_POST_CREATE) + yield from self._break_if_required(self.CREATE, + environment.HOOK_POST_CREATE) @staticmethod def pause(): @@ -1312,7 +1310,7 @@ class Resource(status.ResourceStatus): adopt. """ self._update_stored_properties() - return self._do_action(self.ADOPT, resource_data=resource_data) + yield from self._do_action(self.ADOPT, resource_data=resource_data) def handle_adopt(self, resource_data=None): resource_id, data, metadata = self._get_resource_info(resource_data) @@ -1606,7 +1604,6 @@ class Resource(status.ResourceStatus): elif new_template_id is not None: self.store(lock=lock) - @scheduler.wrappertask def update(self, after, before=None, prev_resource=None, new_template_id=None, new_requires=None): """Return a task to update the resource. @@ -1634,8 +1631,8 @@ class Resource(status.ResourceStatus): after_props, before_props = self._prepare_update_props(after, before) - yield self._break_if_required( - self.UPDATE, environment.HOOK_PRE_UPDATE) + yield from self._break_if_required(self.UPDATE, + environment.HOOK_PRE_UPDATE) try: registry = self.stack.env.registry @@ -1695,9 +1692,9 @@ class Resource(status.ResourceStatus): if new_template_id is not None: self.current_template_id = new_template_id - yield self.action_handler_task(action, - args=[after, tmpl_diff, - prop_diff]) + yield from self.action_handler_task(action, + args=[after, tmpl_diff, + prop_diff]) except UpdateReplace: with excutils.save_and_reraise_exception(): self.current_template_id = self.old_template_id @@ -1710,8 +1707,8 @@ class Resource(status.ResourceStatus): if new_requires is not None: self.requires = new_requires - yield self._break_if_required( - self.UPDATE, environment.HOOK_POST_UPDATE) + yield from self._break_if_required(self.UPDATE, + environment.HOOK_POST_UPDATE) def prepare_for_replace(self): """Prepare resource for replacing. @@ -1731,7 +1728,6 @@ class Resource(status.ResourceStatus): """ pass - @scheduler.wrappertask def check(self): """Checks that the physical resource is in its expected state. @@ -1753,7 +1749,7 @@ class Resource(status.ResourceStatus): raise failure with self.frozen_properties(): - yield self._do_action(action) + yield from self._do_action(action) else: if self.state == (self.INIT, self.COMPLETE): # No need to store status; better to leave the resource in @@ -1778,7 +1774,6 @@ class Resource(status.ResourceStatus): if invalid_checks: raise exception.Error('; '.join(invalid_checks)) - @scheduler.wrappertask def suspend(self): """Return a task to suspend the resource. @@ -1798,9 +1793,8 @@ class Resource(status.ResourceStatus): LOG.info('suspending %s', self) with self.frozen_properties(): - yield self._do_action(action) + yield from self._do_action(action) - @scheduler.wrappertask def resume(self): """Return a task to resume the resource. @@ -1820,18 +1814,16 @@ class Resource(status.ResourceStatus): LOG.info('resuming %s', self) with self.frozen_properties(): - yield self._do_action(action) + yield from self._do_action(action) - @scheduler.wrappertask def snapshot(self): """Snapshot the resource and return the created data, if any.""" LOG.info('snapshotting %s', self) with self.frozen_properties(): - yield self._do_action(self.SNAPSHOT) + yield from self._do_action(self.SNAPSHOT) - @scheduler.wrappertask def delete_snapshot(self, data): - yield self.action_handler_task('delete_snapshot', args=[data]) + yield from self.action_handler_task('delete_snapshot', args=[data]) def physical_resource_name(self): if self.id is None or self.action == self.INIT: @@ -1981,7 +1973,6 @@ class Resource(status.ResourceStatus): return self.resource_id return None - @scheduler.wrappertask def delete(self): """A task to delete the resource. @@ -2009,8 +2000,8 @@ class Resource(status.ResourceStatus): # case, a hook has already been dealt with in `Resource.update` so we # shouldn't do it here again: if self.stack.action == self.stack.DELETE: - yield self._break_if_required( - self.DELETE, environment.HOOK_PRE_DELETE) + yield from self._break_if_required(self.DELETE, + environment.HOOK_PRE_DELETE) LOG.info('deleting %s', self) @@ -2045,20 +2036,19 @@ class Resource(status.ResourceStatus): delay = timeutils.retry_backoff_delay(count, jitter_max=2.0) waiter = scheduler.TaskRunner(self.pause) - yield waiter.as_task(timeout=delay) + yield from waiter.as_task(timeout=delay) with excutils.exception_filter(should_retry): - yield self.action_handler_task(action, - *action_args) + yield from self.action_handler_task(action, + *action_args) break if self.stack.action == self.stack.DELETE: - yield self._break_if_required( - self.DELETE, environment.HOOK_POST_DELETE) + yield from self._break_if_required(self.DELETE, + environment.HOOK_POST_DELETE) - @scheduler.wrappertask def destroy(self): """A task to delete the resource and remove it from the database.""" - yield self.delete() + yield from self.delete() if self.id is None: return diff --git a/heat/engine/scheduler.py b/heat/engine/scheduler.py index 48104d6287..1e97343b13 100644 --- a/heat/engine/scheduler.py +++ b/heat/engine/scheduler.py @@ -15,6 +15,7 @@ import functools import sys import types +import debtcollector import eventlet from oslo_log import log as logging from oslo_utils import encodeutils @@ -293,7 +294,11 @@ class TaskRunner(object): return self.__nonzero__() -def wrappertask(task): # noqa: C901 +@debtcollector.removals.remove(message="Use the Python 3 'yield from' keyword " + "in place of 'yield', instead of " + "decorating with @wrappertask.", + stacklevel=1) +def wrappertask(task): """Decorator for a task that needs to drive a subtask. This is essentially a replacement for the Python 3-only "yield from" @@ -311,6 +316,9 @@ def wrappertask(task): # noqa: C901 @functools.wraps(task) def wrapper(*args, **kwargs): + # This could be simplified by using 'yield from' for the parent loop + # as well, but not without adding yet another frame to the stack + # for the subtasks. parent = task(*args, **kwargs) try: @@ -321,28 +329,7 @@ def wrappertask(task): # noqa: C901 while True: try: if isinstance(subtask, types.GeneratorType): - subtask_running = True - try: - step = next(subtask) - except StopIteration: - subtask_running = False - - while subtask_running: - try: - yield step - except GeneratorExit: - subtask.close() - raise - except: # noqa - try: - step = subtask.throw(*sys.exc_info()) - except StopIteration: - subtask_running = False - else: - try: - step = next(subtask) - except StopIteration: - subtask_running = False + yield from subtask else: yield subtask except GeneratorExit: diff --git a/heat/engine/stack.py b/heat/engine/stack.py index e9a124a02e..be3323b8bc 100644 --- a/heat/engine/stack.py +++ b/heat/engine/stack.py @@ -1155,7 +1155,6 @@ class Stack(collections.Mapping): return {'resource_data': data['resources'].get(resource.name)} - @scheduler.wrappertask def stack_task(self, action, reverse=False, post_func=None, aggregate_exceptions=False, pre_completion_func=None, notify=None): @@ -1204,12 +1203,11 @@ class Stack(collections.Mapping): lambda x: {}) @functools.wraps(getattr(resource.Resource, action_method)) - @scheduler.wrappertask def resource_action(r): # Find e.g resource.create and call it handle = getattr(r, action_method) - yield handle(**handle_kwargs(r)) + yield from handle(**handle_kwargs(r)) if action == self.CREATE: stk_defn.update_resource_data(self.defn, r.name, r.node_data()) @@ -1225,7 +1223,7 @@ class Stack(collections.Mapping): aggregate_exceptions=aggregate_exceptions) try: - yield action_task() + yield from action_task() except scheduler.Timeout: stack_status = self.FAILED reason = '%s timed out' % action.title() @@ -1591,7 +1589,6 @@ class Stack(collections.Mapping): self.state_set(self.action, self.FAILED, str(reason)) - @scheduler.wrappertask def update_task(self, newstack, action=UPDATE, msg_queue=None, notify=None): if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE): @@ -1674,8 +1671,8 @@ class Stack(collections.Mapping): check_message = functools.partial(self._check_for_message, msg_queue) try: - yield updater.as_task(timeout=self.timeout_secs(), - progress_callback=check_message) + yield from updater.as_task(timeout=self.timeout_secs(), + progress_callback=check_message) finally: self.reset_dependencies() @@ -1691,7 +1688,7 @@ class Stack(collections.Mapping): # so we roll back to the original state should_rollback = self._update_exception_handler(e, action) if should_rollback: - yield self.update_task(oldstack, action=self.ROLLBACK) + yield from self.update_task(oldstack, action=self.ROLLBACK) except BaseException as e: with excutils.save_and_reraise_exception(): self._update_exception_handler(e, action) diff --git a/heat/engine/update.py b/heat/engine/update.py index c7be465023..913f49a887 100644 --- a/heat/engine/update.py +++ b/heat/engine/update.py @@ -45,7 +45,6 @@ class StackUpdate(object): else: return '%s Update' % str(self.existing_stack) - @scheduler.wrappertask def __call__(self): """Return a co-routine that updates the stack.""" @@ -63,10 +62,10 @@ class StackUpdate(object): error_wait_time=get_error_wait_time) if not self.rollback: - yield cleanup_prev() + yield from cleanup_prev() try: - yield updater() + yield from updater() finally: self.previous_stack.reset_dependencies() @@ -76,12 +75,11 @@ class StackUpdate(object): else: return self._process_existing_resource_update(res) - @scheduler.wrappertask def _remove_backup_resource(self, prev_res): if prev_res.state not in ((prev_res.INIT, prev_res.COMPLETE), (prev_res.DELETE, prev_res.COMPLETE)): LOG.debug("Deleting backup resource %s", prev_res.name) - yield prev_res.destroy() + yield from prev_res.destroy() @staticmethod def _exchange_stacks(existing_res, prev_res): @@ -91,7 +89,6 @@ class StackUpdate(object): prev_stack.add_resource(existing_res) existing_stack.add_resource(prev_res) - @scheduler.wrappertask def _create_resource(self, new_res): res_name = new_res.name @@ -110,7 +107,7 @@ class StackUpdate(object): return LOG.debug("Deleting backup Resource %s", res_name) - yield prev_res.destroy() + yield from prev_res.destroy() # Back up existing resource if res_name in self.existing_stack: @@ -131,7 +128,7 @@ class StackUpdate(object): self.previous_stack.t.add_resource(new_res.t) self.previous_stack.t.store(self.previous_stack.context) - yield new_res.create() + yield from new_res.create() self._update_resource_data(new_res) @@ -160,7 +157,6 @@ class StackUpdate(object): stk_defn.update_resource_data(self.new_stack.defn, resource.name, node_data) - @scheduler.wrappertask def _process_new_resource_update(self, new_res): res_name = new_res.name @@ -169,9 +165,9 @@ class StackUpdate(object): is_substituted = existing_res.check_is_substituted(type(new_res)) if type(existing_res) is type(new_res) or is_substituted: try: - yield self._update_in_place(existing_res, - new_res, - is_substituted) + yield from self._update_in_place(existing_res, + new_res, + is_substituted) except resource.UpdateReplace: pass else: @@ -195,7 +191,7 @@ class StackUpdate(object): else: self._check_replace_restricted(new_res) - yield self._create_resource(new_res) + yield from self._create_resource(new_res) def _update_in_place(self, existing_res, new_res, is_substituted=False): existing_snippet = self.existing_snippets[existing_res.name] @@ -214,15 +210,15 @@ class StackUpdate(object): existing_res.stack.resources[existing_res.name] = substitute existing_res = substitute existing_res.converge = self.new_stack.converge - return existing_res.update(new_snippet, existing_snippet, - prev_resource=prev_res) + yield from existing_res.update(new_snippet, existing_snippet, + prev_resource=prev_res) - @scheduler.wrappertask def _process_existing_resource_update(self, existing_res): res_name = existing_res.name if res_name in self.previous_stack: - yield self._remove_backup_resource(self.previous_stack[res_name]) + backup_res = self.previous_stack[res_name] + yield from self._remove_backup_resource(backup_res) if res_name in self.new_stack: new_res = self.new_stack[res_name] @@ -231,7 +227,7 @@ class StackUpdate(object): return if existing_res.stack is not self.previous_stack: - yield existing_res.destroy() + yield from existing_res.destroy() if res_name not in self.new_stack: self.existing_stack.remove_resource(res_name) diff --git a/heat/tests/test_stack_update.py b/heat/tests/test_stack_update.py index 90a6698815..71e97c9814 100644 --- a/heat/tests/test_stack_update.py +++ b/heat/tests/test_stack_update.py @@ -2110,6 +2110,7 @@ class StackUpdateTest(common.HeatTestCase): def update(self, after, before=None, prev_resource=None): ResourceTypeB.count_b += 1 + yield resource._register_class('ResourceTypeB', ResourceTypeB) @@ -2124,6 +2125,7 @@ class StackUpdateTest(common.HeatTestCase): def update(self, after, before=None, prev_resource=None): ResourceTypeA.count_a += 1 + yield resource._register_class('ResourceTypeA', ResourceTypeA) diff --git a/requirements.txt b/requirements.txt index ad414aa173..8cc73d0f44 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 Babel!=2.4.0,>=2.3.4 # BSD croniter>=0.3.4 # MIT License cryptography>=2.1 # BSD/Apache-2.0 +debtcollector>=1.19.0 # Apache-2.0 eventlet!=0.18.3,!=0.20.1,!=0.21.0,!=0.23.0,!=0.25.0,>=0.18.2 # MIT keystoneauth1>=3.18.0 # Apache-2.0 keystonemiddleware>=4.17.0 # Apache-2.0