Deprecate wrappertask decorator

Now that we only support Python 3, use the 'yield from' keyword instead
of the @wrappertask decorator for calling co-routines from a co-routine.
This should greatly simplify backtraces when calling nested co-routines.

Change-Id: If9beaff74cf4facbc4aa4b30f31a3a087bdcad8a
This commit is contained in:
Zane Bitter 2019-12-17 16:05:08 -05:00
parent 0e2174fb3e
commit 2c58017a14
6 changed files with 64 additions and 91 deletions

View File

@ -1005,7 +1005,6 @@ class Resource(status.ResourceStatus):
action action
) )
@scheduler.wrappertask
def _do_action(self, action, pre_func=None, resource_data=None): def _do_action(self, action, pre_func=None, resource_data=None):
"""Perform a transition to a new state via a specified action. """Perform a transition to a new state via a specified action.
@ -1028,7 +1027,7 @@ class Resource(status.ResourceStatus):
pre_func() pre_func()
handler_args = [resource_data] if resource_data is not None else [] handler_args = [resource_data] if resource_data is not None else []
yield self.action_handler_task(action, args=handler_args) yield from self.action_handler_task(action, args=handler_args)
def _update_stored_properties(self): def _update_stored_properties(self):
old_props = self._stored_properties_data old_props = self._stored_properties_data
@ -1189,7 +1188,6 @@ class Resource(status.ResourceStatus):
message="%s" % error_message) message="%s" % error_message)
raise raise
@scheduler.wrappertask
def create(self): def create(self):
"""Create the resource. """Create the resource.
@ -1203,18 +1201,18 @@ class Resource(status.ResourceStatus):
raise exception.ResourceFailure(exc, self, action) raise exception.ResourceFailure(exc, self, action)
if self.external_id is not None: if self.external_id is not None:
yield self._do_action(self.ADOPT, yield from self._do_action(self.ADOPT,
resource_data={ resource_data={
'resource_id': self.external_id}) 'resource_id': self.external_id})
yield self.check() yield from self.check()
return return
# This method can be called when we replace a resource, too. In that # This method can be called when we replace a resource, too. In that
# case, a hook has already been dealt with in `Resource.update` so we # case, a hook has already been dealt with in `Resource.update` so we
# shouldn't do it here again: # shouldn't do it here again:
if self.stack.action == self.stack.CREATE: if self.stack.action == self.stack.CREATE:
yield self._break_if_required( yield from self._break_if_required(self.CREATE,
self.CREATE, environment.HOOK_PRE_CREATE) environment.HOOK_PRE_CREATE)
LOG.info('creating %s', self) LOG.info('creating %s', self)
@ -1239,13 +1237,13 @@ class Resource(status.ResourceStatus):
delay = timeutils.retry_backoff_delay(count[action], delay = timeutils.retry_backoff_delay(count[action],
jitter_max=2.0) jitter_max=2.0)
waiter = scheduler.TaskRunner(self.pause) waiter = scheduler.TaskRunner(self.pause)
yield waiter.as_task(timeout=delay) yield from waiter.as_task(timeout=delay)
elif action == self.CREATE: elif action == self.CREATE:
# Only validate properties in first create call. # Only validate properties in first create call.
pre_func = self.properties.validate pre_func = self.properties.validate
try: try:
yield self._do_action(action, pre_func) yield from self._do_action(action, pre_func)
if action == self.CREATE: if action == self.CREATE:
first_failure = None first_failure = None
break break
@ -1282,8 +1280,8 @@ class Resource(status.ResourceStatus):
raise first_failure raise first_failure
if self.stack.action == self.stack.CREATE: if self.stack.action == self.stack.CREATE:
yield self._break_if_required( yield from self._break_if_required(self.CREATE,
self.CREATE, environment.HOOK_POST_CREATE) environment.HOOK_POST_CREATE)
@staticmethod @staticmethod
def pause(): def pause():
@ -1312,7 +1310,7 @@ class Resource(status.ResourceStatus):
adopt. adopt.
""" """
self._update_stored_properties() self._update_stored_properties()
return self._do_action(self.ADOPT, resource_data=resource_data) yield from self._do_action(self.ADOPT, resource_data=resource_data)
def handle_adopt(self, resource_data=None): def handle_adopt(self, resource_data=None):
resource_id, data, metadata = self._get_resource_info(resource_data) resource_id, data, metadata = self._get_resource_info(resource_data)
@ -1606,7 +1604,6 @@ class Resource(status.ResourceStatus):
elif new_template_id is not None: elif new_template_id is not None:
self.store(lock=lock) self.store(lock=lock)
@scheduler.wrappertask
def update(self, after, before=None, prev_resource=None, def update(self, after, before=None, prev_resource=None,
new_template_id=None, new_requires=None): new_template_id=None, new_requires=None):
"""Return a task to update the resource. """Return a task to update the resource.
@ -1634,8 +1631,8 @@ class Resource(status.ResourceStatus):
after_props, before_props = self._prepare_update_props(after, before) after_props, before_props = self._prepare_update_props(after, before)
yield self._break_if_required( yield from self._break_if_required(self.UPDATE,
self.UPDATE, environment.HOOK_PRE_UPDATE) environment.HOOK_PRE_UPDATE)
try: try:
registry = self.stack.env.registry registry = self.stack.env.registry
@ -1695,9 +1692,9 @@ class Resource(status.ResourceStatus):
if new_template_id is not None: if new_template_id is not None:
self.current_template_id = new_template_id self.current_template_id = new_template_id
yield self.action_handler_task(action, yield from self.action_handler_task(action,
args=[after, tmpl_diff, args=[after, tmpl_diff,
prop_diff]) prop_diff])
except UpdateReplace: except UpdateReplace:
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
self.current_template_id = self.old_template_id self.current_template_id = self.old_template_id
@ -1710,8 +1707,8 @@ class Resource(status.ResourceStatus):
if new_requires is not None: if new_requires is not None:
self.requires = new_requires self.requires = new_requires
yield self._break_if_required( yield from self._break_if_required(self.UPDATE,
self.UPDATE, environment.HOOK_POST_UPDATE) environment.HOOK_POST_UPDATE)
def prepare_for_replace(self): def prepare_for_replace(self):
"""Prepare resource for replacing. """Prepare resource for replacing.
@ -1731,7 +1728,6 @@ class Resource(status.ResourceStatus):
""" """
pass pass
@scheduler.wrappertask
def check(self): def check(self):
"""Checks that the physical resource is in its expected state. """Checks that the physical resource is in its expected state.
@ -1753,7 +1749,7 @@ class Resource(status.ResourceStatus):
raise failure raise failure
with self.frozen_properties(): with self.frozen_properties():
yield self._do_action(action) yield from self._do_action(action)
else: else:
if self.state == (self.INIT, self.COMPLETE): if self.state == (self.INIT, self.COMPLETE):
# No need to store status; better to leave the resource in # No need to store status; better to leave the resource in
@ -1778,7 +1774,6 @@ class Resource(status.ResourceStatus):
if invalid_checks: if invalid_checks:
raise exception.Error('; '.join(invalid_checks)) raise exception.Error('; '.join(invalid_checks))
@scheduler.wrappertask
def suspend(self): def suspend(self):
"""Return a task to suspend the resource. """Return a task to suspend the resource.
@ -1798,9 +1793,8 @@ class Resource(status.ResourceStatus):
LOG.info('suspending %s', self) LOG.info('suspending %s', self)
with self.frozen_properties(): with self.frozen_properties():
yield self._do_action(action) yield from self._do_action(action)
@scheduler.wrappertask
def resume(self): def resume(self):
"""Return a task to resume the resource. """Return a task to resume the resource.
@ -1820,18 +1814,16 @@ class Resource(status.ResourceStatus):
LOG.info('resuming %s', self) LOG.info('resuming %s', self)
with self.frozen_properties(): with self.frozen_properties():
yield self._do_action(action) yield from self._do_action(action)
@scheduler.wrappertask
def snapshot(self): def snapshot(self):
"""Snapshot the resource and return the created data, if any.""" """Snapshot the resource and return the created data, if any."""
LOG.info('snapshotting %s', self) LOG.info('snapshotting %s', self)
with self.frozen_properties(): with self.frozen_properties():
yield self._do_action(self.SNAPSHOT) yield from self._do_action(self.SNAPSHOT)
@scheduler.wrappertask
def delete_snapshot(self, data): def delete_snapshot(self, data):
yield self.action_handler_task('delete_snapshot', args=[data]) yield from self.action_handler_task('delete_snapshot', args=[data])
def physical_resource_name(self): def physical_resource_name(self):
if self.id is None or self.action == self.INIT: if self.id is None or self.action == self.INIT:
@ -1981,7 +1973,6 @@ class Resource(status.ResourceStatus):
return self.resource_id return self.resource_id
return None return None
@scheduler.wrappertask
def delete(self): def delete(self):
"""A task to delete the resource. """A task to delete the resource.
@ -2009,8 +2000,8 @@ class Resource(status.ResourceStatus):
# case, a hook has already been dealt with in `Resource.update` so we # case, a hook has already been dealt with in `Resource.update` so we
# shouldn't do it here again: # shouldn't do it here again:
if self.stack.action == self.stack.DELETE: if self.stack.action == self.stack.DELETE:
yield self._break_if_required( yield from self._break_if_required(self.DELETE,
self.DELETE, environment.HOOK_PRE_DELETE) environment.HOOK_PRE_DELETE)
LOG.info('deleting %s', self) LOG.info('deleting %s', self)
@ -2045,20 +2036,19 @@ class Resource(status.ResourceStatus):
delay = timeutils.retry_backoff_delay(count, delay = timeutils.retry_backoff_delay(count,
jitter_max=2.0) jitter_max=2.0)
waiter = scheduler.TaskRunner(self.pause) waiter = scheduler.TaskRunner(self.pause)
yield waiter.as_task(timeout=delay) yield from waiter.as_task(timeout=delay)
with excutils.exception_filter(should_retry): with excutils.exception_filter(should_retry):
yield self.action_handler_task(action, yield from self.action_handler_task(action,
*action_args) *action_args)
break break
if self.stack.action == self.stack.DELETE: if self.stack.action == self.stack.DELETE:
yield self._break_if_required( yield from self._break_if_required(self.DELETE,
self.DELETE, environment.HOOK_POST_DELETE) environment.HOOK_POST_DELETE)
@scheduler.wrappertask
def destroy(self): def destroy(self):
"""A task to delete the resource and remove it from the database.""" """A task to delete the resource and remove it from the database."""
yield self.delete() yield from self.delete()
if self.id is None: if self.id is None:
return return

View File

@ -15,6 +15,7 @@ import functools
import sys import sys
import types import types
import debtcollector
import eventlet import eventlet
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import encodeutils from oslo_utils import encodeutils
@ -293,7 +294,11 @@ class TaskRunner(object):
return self.__nonzero__() return self.__nonzero__()
def wrappertask(task): # noqa: C901 @debtcollector.removals.remove(message="Use the Python 3 'yield from' keyword "
"in place of 'yield', instead of "
"decorating with @wrappertask.",
stacklevel=1)
def wrappertask(task):
"""Decorator for a task that needs to drive a subtask. """Decorator for a task that needs to drive a subtask.
This is essentially a replacement for the Python 3-only "yield from" This is essentially a replacement for the Python 3-only "yield from"
@ -311,6 +316,9 @@ def wrappertask(task): # noqa: C901
@functools.wraps(task) @functools.wraps(task)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
# This could be simplified by using 'yield from' for the parent loop
# as well, but not without adding yet another frame to the stack
# for the subtasks.
parent = task(*args, **kwargs) parent = task(*args, **kwargs)
try: try:
@ -321,28 +329,7 @@ def wrappertask(task): # noqa: C901
while True: while True:
try: try:
if isinstance(subtask, types.GeneratorType): if isinstance(subtask, types.GeneratorType):
subtask_running = True yield from subtask
try:
step = next(subtask)
except StopIteration:
subtask_running = False
while subtask_running:
try:
yield step
except GeneratorExit:
subtask.close()
raise
except: # noqa
try:
step = subtask.throw(*sys.exc_info())
except StopIteration:
subtask_running = False
else:
try:
step = next(subtask)
except StopIteration:
subtask_running = False
else: else:
yield subtask yield subtask
except GeneratorExit: except GeneratorExit:

View File

@ -1155,7 +1155,6 @@ class Stack(collections.Mapping):
return {'resource_data': data['resources'].get(resource.name)} return {'resource_data': data['resources'].get(resource.name)}
@scheduler.wrappertask
def stack_task(self, action, reverse=False, post_func=None, def stack_task(self, action, reverse=False, post_func=None,
aggregate_exceptions=False, pre_completion_func=None, aggregate_exceptions=False, pre_completion_func=None,
notify=None): notify=None):
@ -1204,12 +1203,11 @@ class Stack(collections.Mapping):
lambda x: {}) lambda x: {})
@functools.wraps(getattr(resource.Resource, action_method)) @functools.wraps(getattr(resource.Resource, action_method))
@scheduler.wrappertask
def resource_action(r): def resource_action(r):
# Find e.g resource.create and call it # Find e.g resource.create and call it
handle = getattr(r, action_method) handle = getattr(r, action_method)
yield handle(**handle_kwargs(r)) yield from handle(**handle_kwargs(r))
if action == self.CREATE: if action == self.CREATE:
stk_defn.update_resource_data(self.defn, r.name, r.node_data()) stk_defn.update_resource_data(self.defn, r.name, r.node_data())
@ -1225,7 +1223,7 @@ class Stack(collections.Mapping):
aggregate_exceptions=aggregate_exceptions) aggregate_exceptions=aggregate_exceptions)
try: try:
yield action_task() yield from action_task()
except scheduler.Timeout: except scheduler.Timeout:
stack_status = self.FAILED stack_status = self.FAILED
reason = '%s timed out' % action.title() reason = '%s timed out' % action.title()
@ -1591,7 +1589,6 @@ class Stack(collections.Mapping):
self.state_set(self.action, self.FAILED, str(reason)) self.state_set(self.action, self.FAILED, str(reason))
@scheduler.wrappertask
def update_task(self, newstack, action=UPDATE, def update_task(self, newstack, action=UPDATE,
msg_queue=None, notify=None): msg_queue=None, notify=None):
if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE): if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE):
@ -1674,8 +1671,8 @@ class Stack(collections.Mapping):
check_message = functools.partial(self._check_for_message, check_message = functools.partial(self._check_for_message,
msg_queue) msg_queue)
try: try:
yield updater.as_task(timeout=self.timeout_secs(), yield from updater.as_task(timeout=self.timeout_secs(),
progress_callback=check_message) progress_callback=check_message)
finally: finally:
self.reset_dependencies() self.reset_dependencies()
@ -1691,7 +1688,7 @@ class Stack(collections.Mapping):
# so we roll back to the original state # so we roll back to the original state
should_rollback = self._update_exception_handler(e, action) should_rollback = self._update_exception_handler(e, action)
if should_rollback: if should_rollback:
yield self.update_task(oldstack, action=self.ROLLBACK) yield from self.update_task(oldstack, action=self.ROLLBACK)
except BaseException as e: except BaseException as e:
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
self._update_exception_handler(e, action) self._update_exception_handler(e, action)

View File

@ -46,7 +46,6 @@ class StackUpdate(object):
else: else:
return '%s Update' % str(self.existing_stack) return '%s Update' % str(self.existing_stack)
@scheduler.wrappertask
def __call__(self): def __call__(self):
"""Return a co-routine that updates the stack.""" """Return a co-routine that updates the stack."""
@ -64,10 +63,10 @@ class StackUpdate(object):
error_wait_time=get_error_wait_time) error_wait_time=get_error_wait_time)
if not self.rollback: if not self.rollback:
yield cleanup_prev() yield from cleanup_prev()
try: try:
yield updater() yield from updater()
finally: finally:
self.previous_stack.reset_dependencies() self.previous_stack.reset_dependencies()
@ -77,12 +76,11 @@ class StackUpdate(object):
else: else:
return self._process_existing_resource_update(res) return self._process_existing_resource_update(res)
@scheduler.wrappertask
def _remove_backup_resource(self, prev_res): def _remove_backup_resource(self, prev_res):
if prev_res.state not in ((prev_res.INIT, prev_res.COMPLETE), if prev_res.state not in ((prev_res.INIT, prev_res.COMPLETE),
(prev_res.DELETE, prev_res.COMPLETE)): (prev_res.DELETE, prev_res.COMPLETE)):
LOG.debug("Deleting backup resource %s", prev_res.name) LOG.debug("Deleting backup resource %s", prev_res.name)
yield prev_res.destroy() yield from prev_res.destroy()
@staticmethod @staticmethod
def _exchange_stacks(existing_res, prev_res): def _exchange_stacks(existing_res, prev_res):
@ -92,7 +90,6 @@ class StackUpdate(object):
prev_stack.add_resource(existing_res) prev_stack.add_resource(existing_res)
existing_stack.add_resource(prev_res) existing_stack.add_resource(prev_res)
@scheduler.wrappertask
def _create_resource(self, new_res): def _create_resource(self, new_res):
res_name = new_res.name res_name = new_res.name
@ -111,7 +108,7 @@ class StackUpdate(object):
return return
LOG.debug("Deleting backup Resource %s", res_name) LOG.debug("Deleting backup Resource %s", res_name)
yield prev_res.destroy() yield from prev_res.destroy()
# Back up existing resource # Back up existing resource
if res_name in self.existing_stack: if res_name in self.existing_stack:
@ -132,7 +129,7 @@ class StackUpdate(object):
self.previous_stack.t.add_resource(new_res.t) self.previous_stack.t.add_resource(new_res.t)
self.previous_stack.t.store(self.previous_stack.context) self.previous_stack.t.store(self.previous_stack.context)
yield new_res.create() yield from new_res.create()
self._update_resource_data(new_res) self._update_resource_data(new_res)
@ -161,7 +158,6 @@ class StackUpdate(object):
stk_defn.update_resource_data(self.new_stack.defn, stk_defn.update_resource_data(self.new_stack.defn,
resource.name, node_data) resource.name, node_data)
@scheduler.wrappertask
def _process_new_resource_update(self, new_res): def _process_new_resource_update(self, new_res):
res_name = new_res.name res_name = new_res.name
@ -170,9 +166,9 @@ class StackUpdate(object):
is_substituted = existing_res.check_is_substituted(type(new_res)) is_substituted = existing_res.check_is_substituted(type(new_res))
if type(existing_res) is type(new_res) or is_substituted: if type(existing_res) is type(new_res) or is_substituted:
try: try:
yield self._update_in_place(existing_res, yield from self._update_in_place(existing_res,
new_res, new_res,
is_substituted) is_substituted)
except resource.UpdateReplace: except resource.UpdateReplace:
pass pass
else: else:
@ -196,7 +192,7 @@ class StackUpdate(object):
else: else:
self._check_replace_restricted(new_res) self._check_replace_restricted(new_res)
yield self._create_resource(new_res) yield from self._create_resource(new_res)
def _update_in_place(self, existing_res, new_res, is_substituted=False): def _update_in_place(self, existing_res, new_res, is_substituted=False):
existing_snippet = self.existing_snippets[existing_res.name] existing_snippet = self.existing_snippets[existing_res.name]
@ -215,15 +211,15 @@ class StackUpdate(object):
existing_res.stack.resources[existing_res.name] = substitute existing_res.stack.resources[existing_res.name] = substitute
existing_res = substitute existing_res = substitute
existing_res.converge = self.new_stack.converge existing_res.converge = self.new_stack.converge
return existing_res.update(new_snippet, existing_snippet, yield from existing_res.update(new_snippet, existing_snippet,
prev_resource=prev_res) prev_resource=prev_res)
@scheduler.wrappertask
def _process_existing_resource_update(self, existing_res): def _process_existing_resource_update(self, existing_res):
res_name = existing_res.name res_name = existing_res.name
if res_name in self.previous_stack: if res_name in self.previous_stack:
yield self._remove_backup_resource(self.previous_stack[res_name]) backup_res = self.previous_stack[res_name]
yield from self._remove_backup_resource(backup_res)
if res_name in self.new_stack: if res_name in self.new_stack:
new_res = self.new_stack[res_name] new_res = self.new_stack[res_name]
@ -232,7 +228,7 @@ class StackUpdate(object):
return return
if existing_res.stack is not self.previous_stack: if existing_res.stack is not self.previous_stack:
yield existing_res.destroy() yield from existing_res.destroy()
if res_name not in self.new_stack: if res_name not in self.new_stack:
self.existing_stack.remove_resource(res_name) self.existing_stack.remove_resource(res_name)

View File

@ -2110,6 +2110,7 @@ class StackUpdateTest(common.HeatTestCase):
def update(self, after, before=None, prev_resource=None): def update(self, after, before=None, prev_resource=None):
ResourceTypeB.count_b += 1 ResourceTypeB.count_b += 1
yield
resource._register_class('ResourceTypeB', ResourceTypeB) resource._register_class('ResourceTypeB', ResourceTypeB)
@ -2124,6 +2125,7 @@ class StackUpdateTest(common.HeatTestCase):
def update(self, after, before=None, prev_resource=None): def update(self, after, before=None, prev_resource=None):
ResourceTypeA.count_a += 1 ResourceTypeA.count_a += 1
yield
resource._register_class('ResourceTypeA', ResourceTypeA) resource._register_class('ResourceTypeA', ResourceTypeA)

View File

@ -6,6 +6,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
Babel!=2.4.0,>=2.3.4 # BSD Babel!=2.4.0,>=2.3.4 # BSD
croniter>=0.3.4 # MIT License croniter>=0.3.4 # MIT License
cryptography>=2.1 # BSD/Apache-2.0 cryptography>=2.1 # BSD/Apache-2.0
debtcollector>=1.19.0 # Apache-2.0
eventlet!=0.18.3,!=0.20.1,!=0.21.0,!=0.23.0,!=0.25.0,>=0.18.2 # MIT eventlet!=0.18.3,!=0.20.1,!=0.21.0,!=0.23.0,!=0.25.0,>=0.18.2 # MIT
keystoneauth1>=3.18.0 # Apache-2.0 keystoneauth1>=3.18.0 # Apache-2.0
keystonemiddleware>=4.17.0 # Apache-2.0 keystonemiddleware>=4.17.0 # Apache-2.0