From ef636718af0fb200390510c2cb48fe8b9ece6c7e Mon Sep 17 00:00:00 2001 From: Sabari Kumar Murugesan Date: Mon, 18 May 2015 17:56:59 -0700 Subject: [PATCH] Mark task as failed in case of flow failure When a taskflow flow fails at an intermediate task before complete task, the glance task is in the processing state indefinitely. This patch marks the task as failed. In addition, it also fixes the importToFS task to properly analyze the taskflow result object. Closes-Bug: #1459015 Change-Id: Icd325c87bac5a48ea177cfa883a173c3c490feb2 --- glance/async/flows/base_import.py | 11 +++--- glance/async/taskflow_executor.py | 4 ++ glance/tests/unit/async/flows/test_import.py | 39 +++++++++++++++++++ .../unit/async/test_taskflow_executor.py | 11 ++++++ 4 files changed, 60 insertions(+), 5 deletions(-) diff --git a/glance/async/flows/base_import.py b/glance/async/flows/base_import.py index 1e9f112aa9..c622f2285b 100644 --- a/glance/async/flows/base_import.py +++ b/glance/async/flows/base_import.py @@ -24,6 +24,7 @@ from stevedore import named from taskflow.patterns import linear_flow as lf from taskflow import retry from taskflow import task +from taskflow.types import failure from glance.common import exception from glance.common.scripts.image_import import main as image_import @@ -148,11 +149,11 @@ class _ImportToFS(task.Task): path = self.store.add(image_id, data, 0, context=None)[0] return path - def revert(self, image_id, result=None, **kwargs): - # NOTE(flaper87): If result is None, it probably - # means this task failed. Otherwise, we would have - # a result from its execution. - if result is None: + def revert(self, image_id, result, **kwargs): + if isinstance(result, failure.Failure): + LOG.exception(_LE('Task: %(task_id)s failed to import image ' + '%(image_id)s to the filesystem.') % + {'task_id': self.task_id, 'image_id': image_id}) return if os.path.exists(result.split("file://")[-1]): diff --git a/glance/async/taskflow_executor.py b/glance/async/taskflow_executor.py index 84bdc5fc49..0419786b99 100644 --- a/glance/async/taskflow_executor.py +++ b/glance/async/taskflow_executor.py @@ -129,3 +129,7 @@ class TaskExecutor(glance.async.TaskExecutor): with excutils.save_and_reraise_exception(): LOG.error(_LE('Failed to execute task %(task_id)s: %(exc)s') % {'task_id': task_id, 'exc': exc.message}) + # TODO(sabari): Check for specific exceptions and update the + # task failure message. + task.fail(_('Task failed due to Internal Error')) + self.task_repo.save(task) diff --git a/glance/tests/unit/async/flows/test_import.py b/glance/tests/unit/async/flows/test_import.py index 7acd599d4d..25396c2c0b 100644 --- a/glance/tests/unit/async/flows/test_import.py +++ b/glance/tests/unit/async/flows/test_import.py @@ -21,6 +21,7 @@ import glance_store from oslo_config import cfg from six.moves import cStringIO from taskflow import task +from taskflow.types import failure import glance.async.flows.base_import as import_flow from glance.async import taskflow_executor @@ -151,6 +152,44 @@ class TestImportTask(test_utils.BaseTestCase): self.assertFalse(os.path.exists(tmp_image_path)) self.assertTrue(os.path.exists(image_path)) + def test_import_flow_revert_import_to_fs(self): + self.config(engine_mode='serial', group='taskflow_executor') + + img_factory = mock.MagicMock() + + executor = taskflow_executor.TaskExecutor( + self.context, + self.task_repo, + self.img_repo, + img_factory) + + self.task_repo.get.return_value = self.task + + def create_image(*args, **kwargs): + kwargs['image_id'] = UUID1 + return self.img_factory.new_image(*args, **kwargs) + + self.img_repo.get.return_value = self.image + img_factory.new_image.side_effect = create_image + + with mock.patch.object(script_utils, 'get_image_data_iter') as dmock: + dmock.side_effect = RuntimeError + + with mock.patch.object(import_flow._ImportToFS, 'revert') as rmock: + self.assertRaises(RuntimeError, + executor.begin_processing, self.task.task_id) + self.assertTrue(rmock.called) + self.assertIsInstance(rmock.call_args[1]['result'], + failure.Failure) + + image_path = os.path.join(self.test_dir, self.image.image_id) + tmp_image_path = os.path.join(self.work_dir, + "%s.tasks_import" % image_path) + self.assertFalse(os.path.exists(tmp_image_path)) + # Note(sabari): The image should not have been uploaded to + # the store as the flow failed before ImportToStore Task. + self.assertFalse(os.path.exists(image_path)) + def test_import_flow_revert(self): self.config(engine_mode='serial', group='taskflow_executor') diff --git a/glance/tests/unit/async/test_taskflow_executor.py b/glance/tests/unit/async/test_taskflow_executor.py index 403d870e63..d9d07aa28c 100644 --- a/glance/tests/unit/async/test_taskflow_executor.py +++ b/glance/tests/unit/async/test_taskflow_executor.py @@ -78,3 +78,14 @@ class TestTaskExecutor(test_utils.BaseTestCase): # assert the call load_mock.assert_called_once() engine.assert_called_once() + + def test_task_fail(self): + with mock.patch.object(engines, 'load') as load_mock: + engine = mock.Mock() + load_mock.return_value = engine + engine.run.side_effect = RuntimeError + self.task_repo.get.return_value = self.task + self.assertRaises(RuntimeError, self.executor.begin_processing, + self.task.task_id) + self.assertEqual('failure', self.task.status) + self.task_repo.save.assert_called_with(self.task)