Add tests for _ImportToStore.execute()

There are some existing tests for the body of this task, but none for the
meat of the task when things go right.

Change-Id: I6fd0505a265c8ffb03bf8c3c0f2d4a1176485b51
This commit is contained in:
Dan Smith 2020-07-28 09:24:46 -07:00
parent 2e11ecb15e
commit 94a672c7cd
1 changed files with 73 additions and 0 deletions

View File

@ -109,6 +109,79 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
overwrite=False)
self.img_factory = self.gateway.get_image_factory(self.context)
def test_execute(self):
wrapper = mock.MagicMock()
action = mock.MagicMock()
wrapper.__enter__.return_value = action
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper,
"http://url",
"store1", False,
True)
# Assert file_path is honored
with mock.patch.object(image_import, '_execute') as mock_execute:
image_import.execute(mock.sentinel.path)
mock_execute.assert_called_once_with(action, mock.sentinel.path)
# Assert file_path is optional
with mock.patch.object(image_import, '_execute') as mock_execute:
image_import.execute()
mock_execute.assert_called_once_with(action, None)
def test_execute_body_with_store(self):
image = mock.MagicMock()
img_repo = mock.MagicMock()
img_repo.get.return_value = image
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper,
"http://url",
"store1", False,
True)
action = mock.MagicMock()
image_import._execute(action, mock.sentinel.path)
action.set_image_data.assert_called_once_with(
mock.sentinel.path,
TASK_ID1, backend='store1',
set_active=True)
action.remove_importing_stores(['store1'])
def test_execute_body_with_store_no_path(self):
image = mock.MagicMock()
img_repo = mock.MagicMock()
img_repo.get.return_value = image
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper,
"http://url",
"store1", False,
True)
action = mock.MagicMock()
image_import._execute(action, None)
action.set_image_data.assert_called_once_with(
'http://url',
TASK_ID1, backend='store1',
set_active=True)
action.remove_importing_stores(['store1'])
def test_execute_body_without_store(self):
image = mock.MagicMock()
img_repo = mock.MagicMock()
img_repo.get.return_value = image
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
wrapper,
"http://url",
None, False,
True)
action = mock.MagicMock()
image_import._execute(action, mock.sentinel.path)
action.set_image_data.assert_called_once_with(
mock.sentinel.path,
TASK_ID1, backend=None,
set_active=True)
action.remove_importing_stores.assert_not_called()
def test_raises_when_image_deleted(self):
img_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)