Update task message during import

This updates the task.message field with details about the copy,
which will also update the updated_at field on the task. This will
facilitate some rudimentary liveness checking from outside the task
thread. Note that it also checks the state of our task, and will
abort if it has been pushed out of 'processing' state externally.
This will be used in the following patch to faciliate import lock
busting behavior.

Change-Id: I8667c17813f6e701db98595b0b30df9e7b275294
This commit is contained in:
Dan Smith 2020-07-27 15:05:58 -07:00
parent e49f23c04d
commit 77d9cfa66e
3 changed files with 98 additions and 12 deletions

View File

@ -394,10 +394,11 @@ class _VerifyStaging(task.Task):
class _ImportToStore(task.Task):
def __init__(self, task_id, task_type, action_wrapper, uri,
def __init__(self, task_id, task_type, task_repo, action_wrapper, uri,
backend, all_stores_must_succeed, set_active):
self.task_id = task_id
self.task_type = task_type
self.task_repo = task_repo
self.action_wrapper = action_wrapper
self.uri = uri
self.backend = backend
@ -490,6 +491,20 @@ class _ImportToStore(task.Task):
'copied': total_bytes // units.Mi})
self.last_status = timeutils.now()
task = script_utils.get_task(self.task_repo, self.task_id)
if task is None:
LOG.error(
'Status callback for task %(task)s found no task object!',
{'task': self.task_id})
raise exception.TaskNotFound(self.task_id)
if task.status != 'processing':
LOG.error('Task %(task)s expected "processing" status, '
'but found "%(status)s"; aborting.')
raise exception.TaskAbortedError()
task.message = _('Copied %i MiB') % (total_bytes // units.Mi)
self.task_repo.save(task)
def revert(self, result, **kwargs):
"""
Remove location from image in case of failure
@ -637,6 +652,7 @@ def get_flow(**kwargs):
import_task = lf.Flow(task_name)
import_to_store = _ImportToStore(task_id,
task_name,
task_repo,
action_wrapper,
file_uri,
store,

View File

@ -359,6 +359,10 @@ class ImportTaskError(TaskException, Invalid):
message = _("An import task exception occurred")
class TaskAbortedError(ImportTaskError):
message = _("Task was aborted externally")
class DuplicateLocation(Duplicate):
message = _("The location %(location)s already exists")

View File

@ -17,9 +17,10 @@ from unittest import mock
from glance_store import exceptions as store_exceptions
from oslo_config import cfg
from oslo_utils import units
import glance.async_.flows.api_image_import as import_flow
from glance.common.exception import ImportTaskError
from glance.common import exception
from glance.common.scripts.image_import import main as image_import
from glance import context
from glance import gateway
@ -112,9 +113,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
def test_execute(self):
wrapper = mock.MagicMock()
action = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper.__enter__.return_value = action
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper,
task_repo, wrapper,
"http://url",
"store1", False,
True)
@ -132,9 +134,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
image = mock.MagicMock()
img_repo = mock.MagicMock()
img_repo.get.return_value = image
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper,
task_repo, wrapper,
"http://url",
"store1", False,
True)
@ -151,9 +154,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
image = mock.MagicMock()
img_repo = mock.MagicMock()
img_repo.get.return_value = image
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper,
task_repo, wrapper,
"http://url",
"store1", False,
True)
@ -170,9 +174,10 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
image = mock.MagicMock()
img_repo = mock.MagicMock()
img_repo.get.return_value = image
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper,
task_repo, wrapper,
"http://url",
None, False,
True)
@ -189,9 +194,11 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.now')
def test_status_callback_limits_rate(self, mock_now, mock_log):
img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
task_repo.get.return_value.status = 'processing'
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper,
task_repo, wrapper,
"http://url",
None, False,
True)
@ -240,22 +247,26 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
def test_raises_when_image_deleted(self):
img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper, "http://url",
task_repo, wrapper,
"http://url",
"store1", False,
True)
image = self.img_factory.new_image(image_id=UUID1)
image.status = "deleted"
img_repo.get.return_value = image
self.assertRaises(ImportTaskError, image_import.execute)
self.assertRaises(exception.ImportTaskError, image_import.execute)
@mock.patch("glance.async_.flows.api_image_import.image_import")
def test_remove_store_from_property(self, mock_import):
img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper, "http://url",
task_repo, wrapper,
"http://url",
"store1", True,
True)
extra_properties = {"os_glance_importing_to_stores": "store1,store2"}
@ -269,9 +280,11 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
@mock.patch("glance.async_.flows.api_image_import.image_import")
def test_raises_when_all_stores_must_succeed(self, mock_import):
img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper, "http://url",
task_repo, wrapper,
"http://url",
"store1", True,
True)
image = self.img_factory.new_image(image_id=UUID1)
@ -285,9 +298,11 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
@mock.patch("glance.async_.flows.api_image_import.image_import")
def test_doesnt_raise_when_not_all_stores_must_succeed(self, mock_import):
img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper, "http://url",
task_repo, wrapper,
"http://url",
"store1", False,
True)
image = self.img_factory.new_image(image_id=UUID1)
@ -302,6 +317,57 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
except cursive_exception.SignatureVerificationError:
self.fail("Exception shouldn't be raised")
@mock.patch('glance.common.scripts.utils.get_task')
def test_status_callback_updates_task_message(self, mock_get):
task_repo = mock.MagicMock()
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, mock.MagicMock(),
"http://url",
"store1", False,
True)
task = mock.MagicMock()
task.status = 'processing'
mock_get.return_value = task
action = mock.MagicMock()
image_import._status_callback(action, 128, 256 * units.Mi)
mock_get.assert_called_once_with(task_repo, TASK_ID1)
task_repo.save.assert_called_once_with(task)
self.assertEqual(_('Copied %i MiB' % 256), task.message)
@mock.patch('glance.common.scripts.utils.get_task')
def test_status_aborts_missing_task(self, mock_get):
task_repo = mock.MagicMock()
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, mock.MagicMock(),
"http://url",
"store1", False,
True)
mock_get.return_value = None
action = mock.MagicMock()
self.assertRaises(exception.TaskNotFound,
image_import._status_callback,
action, 128, 256 * units.Mi)
mock_get.assert_called_once_with(task_repo, TASK_ID1)
task_repo.save.assert_not_called()
@mock.patch('glance.common.scripts.utils.get_task')
def test_status_aborts_invalid_task_state(self, mock_get):
task_repo = mock.MagicMock()
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, mock.MagicMock(),
"http://url",
"store1", False,
True)
task = mock.MagicMock()
task.status = 'failed'
mock_get.return_value = task
action = mock.MagicMock()
self.assertRaises(exception.TaskAbortedError,
image_import._status_callback,
action, 128, 256 * units.Mi)
mock_get.assert_called_once_with(task_repo, TASK_ID1)
task_repo.save.assert_not_called()
class TestDeleteFromFS(test_utils.BaseTestCase):
def test_delete_with_backends_deletes(self):