Mark task as failed in case of flow failure
When a taskflow flow fails at an intermediate task before complete task, the glance task is in the processing state indefinitely. This patch marks the task as failed. In addition, it also fixes the importToFS task to properly analyze the taskflow result object. Closes-Bug: #1459015 Change-Id: Icd325c87bac5a48ea177cfa883a173c3c490feb2
This commit is contained in:
parent
62a46593b9
commit
ef636718af
|
@ -24,6 +24,7 @@ from stevedore import named
|
|||
from taskflow.patterns import linear_flow as lf
|
||||
from taskflow import retry
|
||||
from taskflow import task
|
||||
from taskflow.types import failure
|
||||
|
||||
from glance.common import exception
|
||||
from glance.common.scripts.image_import import main as image_import
|
||||
|
@ -148,11 +149,11 @@ class _ImportToFS(task.Task):
|
|||
path = self.store.add(image_id, data, 0, context=None)[0]
|
||||
return path
|
||||
|
||||
def revert(self, image_id, result=None, **kwargs):
|
||||
# NOTE(flaper87): If result is None, it probably
|
||||
# means this task failed. Otherwise, we would have
|
||||
# a result from its execution.
|
||||
if result is None:
|
||||
def revert(self, image_id, result, **kwargs):
|
||||
if isinstance(result, failure.Failure):
|
||||
LOG.exception(_LE('Task: %(task_id)s failed to import image '
|
||||
'%(image_id)s to the filesystem.') %
|
||||
{'task_id': self.task_id, 'image_id': image_id})
|
||||
return
|
||||
|
||||
if os.path.exists(result.split("file://")[-1]):
|
||||
|
|
|
@ -129,3 +129,7 @@ class TaskExecutor(glance.async.TaskExecutor):
|
|||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_LE('Failed to execute task %(task_id)s: %(exc)s') %
|
||||
{'task_id': task_id, 'exc': exc.message})
|
||||
# TODO(sabari): Check for specific exceptions and update the
|
||||
# task failure message.
|
||||
task.fail(_('Task failed due to Internal Error'))
|
||||
self.task_repo.save(task)
|
||||
|
|
|
@ -21,6 +21,7 @@ import glance_store
|
|||
from oslo_config import cfg
|
||||
from six.moves import cStringIO
|
||||
from taskflow import task
|
||||
from taskflow.types import failure
|
||||
|
||||
import glance.async.flows.base_import as import_flow
|
||||
from glance.async import taskflow_executor
|
||||
|
@ -151,6 +152,44 @@ class TestImportTask(test_utils.BaseTestCase):
|
|||
self.assertFalse(os.path.exists(tmp_image_path))
|
||||
self.assertTrue(os.path.exists(image_path))
|
||||
|
||||
def test_import_flow_revert_import_to_fs(self):
|
||||
self.config(engine_mode='serial', group='taskflow_executor')
|
||||
|
||||
img_factory = mock.MagicMock()
|
||||
|
||||
executor = taskflow_executor.TaskExecutor(
|
||||
self.context,
|
||||
self.task_repo,
|
||||
self.img_repo,
|
||||
img_factory)
|
||||
|
||||
self.task_repo.get.return_value = self.task
|
||||
|
||||
def create_image(*args, **kwargs):
|
||||
kwargs['image_id'] = UUID1
|
||||
return self.img_factory.new_image(*args, **kwargs)
|
||||
|
||||
self.img_repo.get.return_value = self.image
|
||||
img_factory.new_image.side_effect = create_image
|
||||
|
||||
with mock.patch.object(script_utils, 'get_image_data_iter') as dmock:
|
||||
dmock.side_effect = RuntimeError
|
||||
|
||||
with mock.patch.object(import_flow._ImportToFS, 'revert') as rmock:
|
||||
self.assertRaises(RuntimeError,
|
||||
executor.begin_processing, self.task.task_id)
|
||||
self.assertTrue(rmock.called)
|
||||
self.assertIsInstance(rmock.call_args[1]['result'],
|
||||
failure.Failure)
|
||||
|
||||
image_path = os.path.join(self.test_dir, self.image.image_id)
|
||||
tmp_image_path = os.path.join(self.work_dir,
|
||||
"%s.tasks_import" % image_path)
|
||||
self.assertFalse(os.path.exists(tmp_image_path))
|
||||
# Note(sabari): The image should not have been uploaded to
|
||||
# the store as the flow failed before ImportToStore Task.
|
||||
self.assertFalse(os.path.exists(image_path))
|
||||
|
||||
def test_import_flow_revert(self):
|
||||
self.config(engine_mode='serial',
|
||||
group='taskflow_executor')
|
||||
|
|
|
@ -78,3 +78,14 @@ class TestTaskExecutor(test_utils.BaseTestCase):
|
|||
# assert the call
|
||||
load_mock.assert_called_once()
|
||||
engine.assert_called_once()
|
||||
|
||||
def test_task_fail(self):
|
||||
with mock.patch.object(engines, 'load') as load_mock:
|
||||
engine = mock.Mock()
|
||||
load_mock.return_value = engine
|
||||
engine.run.side_effect = RuntimeError
|
||||
self.task_repo.get.return_value = self.task
|
||||
self.assertRaises(RuntimeError, self.executor.begin_processing,
|
||||
self.task.task_id)
|
||||
self.assertEqual('failure', self.task.status)
|
||||
self.task_repo.save.assert_called_with(self.task)
|
||||
|
|
Loading…
Reference in New Issue