From 3f6e349d0853a9746d0d744bc3eb0b2baa1ddff9 Mon Sep 17 00:00:00 2001 From: Dan Smith Date: Tue, 28 Jul 2020 09:02:13 -0700 Subject: [PATCH] Implement time-limited import locking This attempts to provide a time-based import lock that is dependent on the task actually making progress. While the task is copying data, the task message is updated, which in turn touches the task updated_at time. The API will break any lock after 30 minutes of no activity on a stalled or dead task. The import taskflow will check to see if it has lost the lock at any point, and/or if its task status has changed and abort if so. The logic in more detail: 1. API locks the image by task-id before we start the task thread, but before we return 2. Import thread will check the task-id lock on the image every time it tries to modify the image, and if it has changed, will abort 3. The data pipeline will heartbeat the task every minute by updating the task.message (bonus: we get some status) 4. If the data pipeline heartbeat ever finds the task state to be changed from the expected 'processing' it will abort 5. On task revert or completion, we drop the task-id lock from the image 6. If something ever gets stuck or dies, the heartbeating will stop 7. If the API gets a request for an import where the lock is held, it will grab the task by id (in the lock) and check the state and age. If the age is sufficiently old (no heartbeating) and the state is either 'processing' or terminal, it will mark the task as failed, steal the lock, and proceed. Lots of logging throughout any time we encounter unexpected situations. Closes-Bug: #1884596 Change-Id: Icb3c1d27e9a514d96fca7c1d824fd2183f69d8b3 --- glance/api/v2/images.py | 105 ++++++- glance/async_/flows/api_image_import.py | 121 +++++++- .../v2/test_images_import_locking.py | 268 ++++++++++++++++++ .../async_/flows/test_api_image_import.py | 200 +++++++++++-- glance/tests/unit/async_/test_async.py | 24 +- glance/tests/unit/v2/test_images_resource.py | 161 ++++++++++- 6 files changed, 828 insertions(+), 51 deletions(-) create mode 100644 glance/tests/functional/v2/test_images_import_locking.py diff --git a/glance/api/v2/images.py b/glance/api/v2/images.py index 3058f5ff68..d7847ddda8 100644 --- a/glance/api/v2/images.py +++ b/glance/api/v2/images.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime import hashlib import os import re @@ -25,6 +26,7 @@ from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils as json from oslo_utils import encodeutils +from oslo_utils import timeutils as oslo_timeutils import six from six.moves import http_client as http import six.moves.urllib.parse as urlparse @@ -98,6 +100,90 @@ class ImagesController(object): return image + def _bust_import_lock(self, admin_image_repo, admin_task_repo, + image, task, task_id): + if task: + # FIXME(danms): It would be good if we had a 'canceled' or + # 'aborted' status here. + try: + task.fail('Expired lock preempted') + admin_task_repo.save(task) + except exception.InvalidTaskStatusTransition: + # NOTE(danms): This may happen if we try to fail a + # task that is in a terminal state, but where the lock + # was never dropped from the image. We will log the + # image, task, and status below so we can just ignore + # here. + pass + + try: + admin_image_repo.delete_property_atomic( + image, 'os_glance_import_task', task_id) + except exception.NotFound: + LOG.warning('Image %(image)s has stale import task %(task)s ' + 'but we lost the race to remove it.', + {'image': image.image_id, + 'task': task_id}) + # We probably lost the race to expire the old lock, but + # act like it is not yet expired to avoid a retry loop. + raise exception.Conflict('Image has active task') + + LOG.warning('Image %(image)s has stale import task %(task)s ' + 'in status %(status)s from %(owner)s; removed lock ' + 'because it had expired.', + {'image': image.image_id, + 'task': task_id, + 'status': task and task.status or 'missing', + 'owner': task and task.owner or 'unknown owner'}) + + def _enforce_import_lock(self, req, image): + admin_context = req.context.elevated() + admin_image_repo = self.gateway.get_repo(admin_context) + admin_task_repo = self.gateway.get_task_repo(admin_context) + other_task = image.extra_properties['os_glance_import_task'] + + expiry = datetime.timedelta(minutes=60) + bustable_states = ('pending', 'processing', 'success', 'failure') + + try: + task = admin_task_repo.get(other_task) + except exception.NotFound: + # NOTE(danms): This could happen if we failed to do an import + # a long time ago, and the task record has since been culled from + # the database, but the task id is still in the lock field. + LOG.warning('Image %(image)s has non-existent import ' + 'task %(task)s; considering it stale', + {'image': image.image_id, + 'task': other_task}) + task = None + age = 0 + else: + age = oslo_timeutils.utcnow() - task.updated_at + if task.status == 'pending': + # NOTE(danms): Tasks in pending state could be queued, + # blocked or otherwise right-about-to-get-going, so we + # double the expiry time for safety. We will report + # time remaining below, so this is not too obscure. + expiry *= 2 + + if not task or (task.status in bustable_states and age >= expiry): + self._bust_import_lock(admin_image_repo, admin_task_repo, + image, task, other_task) + return + + if task.status in bustable_states: + LOG.warning('Image %(image)s has active import task %(task)s in ' + 'status %(status)s; lock remains valid for %(expire)i ' + 'more seconds', + {'image': image.image_id, + 'task': task.task_id, + 'status': task.status, + 'expire': (expiry - age).total_seconds()}) + else: + LOG.debug('Image %(image)s has import task %(task)s in status ' + '%(status)s and does not qualify for expiry.') + raise exception.Conflict('Image has active task') + @utils.mutating def import_image(self, req, image_id, body): image_repo = self.gateway.get_repo(req.context) @@ -142,6 +228,11 @@ class ImagesController(object): raise webob.exc.HTTPForbidden( explanation=_("Operation not permitted")) + if 'os_glance_import_task' in image.extra_properties: + # NOTE(danms): This will raise exception.Conflict if the + # lock is present and valid, or return if absent or invalid. + self._enforce_import_lock(req, image) + stores = [None] if CONF.enabled_backends: try: @@ -214,6 +305,17 @@ class ImagesController(object): import_task = task_factory.new_task(task_type='api_image_import', owner=req.context.owner, task_input=task_input) + + # NOTE(danms): Try to grab the lock for this task + try: + image_repo.set_property_atomic(image, + 'os_glance_import_task', + import_task.task_id) + except exception.Duplicate: + msg = (_("New operation on image '%s' is not permitted as " + "prior operation is still in progress") % image_id) + raise exception.Conflict(msg) + task_repo.add(import_task) task_executor = executor_factory.new_task_executor(req.context) pool = common.get_thread_pool("tasks_pool") @@ -734,7 +836,8 @@ class RequestDeserializer(wsgi.JSONRequestDeserializer): 'size', 'virtual_size', 'direct_url', 'self', 'file', 'schema', 'id', 'os_hash_algo', 'os_hash_value') - _reserved_properties = ('location', 'deleted', 'deleted_at') + _reserved_properties = ('location', 'deleted', 'deleted_at', + 'os_glance_import_task') _base_properties = ('checksum', 'created_at', 'container_format', 'disk_format', 'id', 'min_disk', 'min_ram', 'name', 'size', 'virtual_size', 'status', 'tags', 'owner', diff --git a/glance/async_/flows/api_image_import.py b/glance/async_/flows/api_image_import.py index a846d0aed9..ea7bdf6fbd 100644 --- a/glance/async_/flows/api_image_import.py +++ b/glance/async_/flows/api_image_import.py @@ -101,13 +101,16 @@ class ImportActionWrapper(object): :param image-id: The ID of the image we should be altering """ - def __init__(self, image_repo, image_id): + def __init__(self, image_repo, image_id, task_id): self._image_repo = image_repo self._image_id = image_id + self._task_id = task_id def __enter__(self): self._image = self._image_repo.get(self._image_id) self._image_previous_status = self._image.status + self._assert_task_lock(self._image) + return _ImportActions(self._image) def __exit__(self, type, value, traceback): @@ -123,6 +126,43 @@ class ImportActionWrapper(object): 'new_status': self._image.status}) self._image_repo.save(self._image, self._image_previous_status) + @property + def image_id(self): + return self._image_id + + def drop_lock_for_task(self): + """Delete the import lock for our task. + + This is an atomic operation and thus does not require a context + for the image save. Note that after calling this method, no + further actions will be allowed on the image. + + :raises: NotFound if the image was not locked by the expected task. + """ + image = self._image_repo.get(self._image_id) + self._image_repo.delete_property_atomic(image, + 'os_glance_import_task', + self._task_id) + + def _assert_task_lock(self, image): + task_lock = image.extra_properties.get('os_glance_import_task') + if task_lock != self._task_id: + LOG.error('Image %(image)s import task %(task)s attempted to ' + 'take action on image, but other task %(other)s holds ' + 'the lock; Aborting.', + {'image': self._image_id, + 'task': self._task_id, + 'other': task_lock}) + raise exception.TaskAbortedError() + + def assert_task_lock(self): + """Assert that we own the task lock on the image. + + :raises: TaskAbortedError if we do not + """ + image = self._image_repo.get(self._image_id) + self._assert_task_lock(image) + class _ImportActions(object): """Actions available for being performed on an image during import. @@ -319,6 +359,41 @@ class _DeleteFromFS(task.Task): 'fn': file_path}) +class _ImageLock(task.Task): + def __init__(self, task_id, task_type, action_wrapper): + self.task_id = task_id + self.task_type = task_type + self.action_wrapper = action_wrapper + super(_ImageLock, self).__init__( + name='%s-ImageLock-%s' % (task_type, task_id)) + + def execute(self): + self.action_wrapper.assert_task_lock() + LOG.debug('Image %(image)s import task %(task)s lock confirmed', + {'image': self.action_wrapper.image_id, + 'task': self.task_id}) + + def revert(self, result, **kwargs): + """Drop our claim on the image. + + If we have failed, we need to drop our import_task lock on the image + so that something else can have a try. Note that we may have been + preempted so we should only drop *our* lock. + """ + try: + self.action_wrapper.drop_lock_for_task() + except exception.NotFound: + LOG.warning('Image %(image)s import task %(task)s lost its ' + 'lock during execution!', + {'image': self.action_wrapper.image_id, + 'task': self.task_id}) + else: + LOG.debug('Image %(image)s import task %(task)s dropped ' + 'its lock after failure', + {'image': self.action_wrapper.image_id, + 'task': self.task_id}) + + class _VerifyStaging(task.Task): # NOTE(jokke): This could be also for example "staging_path" but to @@ -548,24 +623,17 @@ class _VerifyImageState(task.Task): class _CompleteTask(task.Task): - def __init__(self, task_id, task_type, task_repo, image_id): + def __init__(self, task_id, task_type, task_repo, action_wrapper): self.task_id = task_id self.task_type = task_type self.task_repo = task_repo - self.image_id = image_id + self.action_wrapper = action_wrapper super(_CompleteTask, self).__init__( name='%s-CompleteTask-%s' % (task_type, task_id)) - def execute(self): - """Finishing the task flow - - :param image_id: Glance Image ID - """ - task = script_utils.get_task(self.task_repo, self.task_id) - if task is None: - return + def _finish_task(self, task): try: - task.succeed({'image_id': self.image_id}) + task.succeed({'image_id': self.action_wrapper.image_id}) except Exception as e: # Note: The message string contains Error in it to indicate # in the task.message that it's a error message for the user. @@ -584,6 +652,28 @@ class _CompleteTask(task.Task): finally: self.task_repo.save(task) + def _drop_lock(self): + try: + self.action_wrapper.drop_lock_for_task() + except exception.NotFound: + # NOTE(danms): This would be really bad, but there is probably + # not much point in reverting all the way back if we got this + # far. Log the carnage for forensics. + LOG.error('Image %(image)s import task %(task)s did not hold the ' + 'lock upon completion!', + {'image': self.action_wrapper.image_id, + 'task': self.task_id}) + + def execute(self): + """Finishing the task flow + + :param image_id: Glance Image ID + """ + task = script_utils.get_task(self.task_repo, self.task_id) + if task is not None: + self._finish_task(task) + self._drop_lock() + LOG.info(_LI("%(task_id)s of %(task_type)s completed"), {'task_id': self.task_id, 'task_type': self.task_type}) @@ -616,7 +706,8 @@ def get_flow(**kwargs): # Instantiate an action wrapper with the admin repo if we got one, # otherwise with the regular repo. - action_wrapper = ImportActionWrapper(admin_repo or image_repo, image_id) + action_wrapper = ImportActionWrapper(admin_repo or image_repo, image_id, + task_id) if not uri and import_method in ['glance-direct', 'copy-image']: if CONF.enabled_backends: @@ -627,6 +718,8 @@ def get_flow(**kwargs): flow = lf.Flow(task_type, retry=retry.AlwaysRevert()) + flow.add(_ImageLock(task_id, task_type, action_wrapper)) + if import_method in ['web-download', 'copy-image']: internal_plugin = internal_plugins.get_import_plugin(**kwargs) flow.add(internal_plugin) @@ -678,7 +771,7 @@ def get_flow(**kwargs): complete_task = _CompleteTask(task_id, task_type, task_repo, - image_id) + action_wrapper) flow.add(complete_task) with action_wrapper as action: diff --git a/glance/tests/functional/v2/test_images_import_locking.py b/glance/tests/functional/v2/test_images_import_locking.py new file mode 100644 index 0000000000..3409739789 --- /dev/null +++ b/glance/tests/functional/v2/test_images_import_locking.py @@ -0,0 +1,268 @@ +# Copyright 2020 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import os +from testtools import content as ttc +import textwrap +import time +from unittest import mock +import uuid + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_serialization import jsonutils +from oslo_utils import fixture as time_fixture +import webob + +from glance.common import config +from glance.common import wsgi +import glance.db.sqlalchemy.api +from glance.tests import utils as test_utils +import glance_store + +LOG = logging.getLogger(__name__) +TENANT1 = str(uuid.uuid4()) +CONF = cfg.CONF + + +class SynchronousAPIBase(test_utils.BaseTestCase): + """A test base class that provides synchronous calling into the API + without starting a separate server, and with a simple paste + pipeline. Configured with multi-store and a real database. + """ + + @mock.patch('oslo_db.sqlalchemy.enginefacade.writer.get_engine') + def setup_database(self, mock_get_engine): + db_file = 'sqlite:///%s/test-%s.db' % (self.test_dir, + uuid.uuid4()) + self.config(connection=db_file, group='database') + + # NOTE(danms): Make sure that we clear the current global + # database configuration, provision a temporary database file, + # and run migrations with our configuration to define the + # schema there. + glance.db.sqlalchemy.api.clear_db_env() + engine = glance.db.sqlalchemy.api.get_engine() + mock_get_engine.return_value = engine + with mock.patch('logging.config'): + # NOTE(danms): The alembic config in the env module will break our + # BaseTestCase logging setup. So mock that out to prevent it while + # we db_sync. + test_utils.db_sync(engine=engine) + + def setup_simple_paste(self): + self.paste_config = os.path.join(self.test_dir, 'glance-api-paste.ini') + with open(self.paste_config, 'w') as f: + f.write(textwrap.dedent(""" + [filter:context] + paste.filter_factory = glance.api.middleware.context:\ + ContextMiddleware.factory + [filter:fakeauth] + paste.filter_factory = glance.tests.utils:\ + FakeAuthMiddleware.factory + [pipeline:glance-api] + pipeline = context rootapp + [composite:rootapp] + paste.composite_factory = glance.api:root_app_factory + /v2: apiv2app + [app:apiv2app] + paste.app_factory = glance.api.v2.router:API.factory + """)) + + def _store_dir(self, store): + return os.path.join(self.test_dir, store) + + def setup_stores(self): + self.config(enabled_backends={'store1': 'file', 'store2': 'file'}) + glance_store.register_store_opts(CONF, + reserved_stores=wsgi.RESERVED_STORES) + self.config(default_backend='store1', + group='glance_store') + self.config(filesystem_store_datadir=self._store_dir('store1'), + group='store1') + self.config(filesystem_store_datadir=self._store_dir('store2'), + group='store2') + self.config(filesystem_store_datadir=self._store_dir('staging'), + group='os_glance_staging_store') + + glance_store.create_multi_stores(CONF, + reserved_stores=wsgi.RESERVED_STORES) + glance_store.verify_store() + + def setUp(self): + super(SynchronousAPIBase, self).setUp() + + self.setup_database() + self.setup_simple_paste() + self.setup_stores() + + def start_server(self): + config.set_config_defaults() + self.api = config.load_paste_app('glance-api', + conf_file=self.paste_config) + + def _headers(self, custom_headers=None): + base_headers = { + 'X-Identity-Status': 'Confirmed', + 'X-Auth-Token': '932c5c84-02ac-4fe5-a9ba-620af0e2bb96', + 'X-User-Id': 'f9a41d13-0c13-47e9-bee2-ce4e8bfe958e', + 'X-Tenant-Id': TENANT1, + 'Content-Type': 'application/json', + 'X-Roles': 'admin', + } + base_headers.update(custom_headers or {}) + return base_headers + + def api_get(self, url, headers=None): + headers = self._headers(headers) + req = webob.Request.blank(url, method='GET', + headers=headers) + return self.api(req) + + def api_post(self, url, data=None, json=None, headers=None): + headers = self._headers(headers) + req = webob.Request.blank(url, method='POST', + headers=headers) + if json and not data: + data = jsonutils.dumps(json).encode() + headers['Content-Type'] = 'application/json' + if data: + req.body = data + LOG.debug(req.as_bytes()) + return self.api(req) + + def api_put(self, url, data=None, json=None, headers=None): + headers = self._headers(headers) + req = webob.Request.blank(url, method='PUT', + headers=headers) + if json and not data: + data = jsonutils.dumps(json).encode() + if data: + req.body = data + return self.api(req) + + +class TestImageImportLocking(SynchronousAPIBase): + def _import_copy(self, image_id, stores): + """Do an import of image_id to the given stores.""" + body = {'method': {'name': 'copy-image'}, + 'stores': stores, + 'all_stores': False} + + return self.api_post( + '/v2/images/%s/import' % image_id, + json=body) + + def _create_and_import(self, stores=[]): + """Create an image, stage data, and import into the given stores. + + :returns: image_id + """ + resp = self.api_post('/v2/images', + json={'name': 'foo', + 'container_format': 'bare', + 'disk_format': 'raw'}) + image = jsonutils.loads(resp.text) + + resp = self.api_put( + '/v2/images/%s/stage' % image['id'], + headers={'Content-Type': 'application/octet-stream'}, + data=b'IMAGEDATA') + self.assertEqual(204, resp.status_code) + + body = {'method': {'name': 'glance-direct'}} + if stores: + body['stores'] = stores + + resp = self.api_post( + '/v2/images/%s/import' % image['id'], + json=body) + + self.assertEqual(202, resp.status_code) + + # Make sure it goes active + for i in range(0, 10): + image = self.api_get('/v2/images/%s' % image['id']).json + if not image.get('os_glance_import_task'): + break + self.addDetail('Create-Import task id', + ttc.text_content(image['os_glance_import_task'])) + time.sleep(1) + + self.assertEqual('active', image['status']) + + return image['id'] + + def _test_import_copy(self, warp_time=False): + self.start_server() + state = {} + + # Create and import an image with no pipeline stall + image_id = self._create_and_import(stores=['store1']) + + # Set up a fake data pipeline that will stall until we are ready + # to unblock it + def slow_fake_set_data(data_iter, backend=None, set_active=True): + while True: + state['running'] = True + time.sleep(0.1) + + # Constrain oslo timeutils time so we can manipulate it + tf = time_fixture.TimeFixture() + self.useFixture(tf) + + # Turn on the delayed data pipeline and start a copy-image + # import which will hang out for a while + with mock.patch('glance.domain.proxy.Image.set_data') as mock_sd: + mock_sd.side_effect = slow_fake_set_data + + resp = self._import_copy(image_id, ['store2']) + self.addDetail('First import response', + ttc.text_content(str(resp))) + self.assertEqual(202, resp.status_code) + + # Wait to make sure the data stream gets started + for i in range(0, 10): + if state: + break + time.sleep(0.1) + + # Make sure the first import got to the point where the + # hanging loop will hold it in processing state + self.assertTrue(state.get('running', False), + 'slow_fake_set_data() never ran') + + # If we're warping time, then advance the clock by two hours + if warp_time: + tf.advance_time_delta(datetime.timedelta(hours=2)) + + # Try a second copy-image import. If we are warping time, + # expect the lock to be busted. If not, then we should get + # a 409 Conflict. + resp = self._import_copy(image_id, ['store2']) + + self.addDetail('Second import response', + ttc.text_content(str(resp))) + if warp_time: + self.assertEqual(202, resp.status_code) + else: + self.assertEqual(409, resp.status_code) + + def test_import_copy_locked(self): + self._test_import_copy(warp_time=False) + + def test_import_copy_bust_lock(self): + self._test_import_copy(warp_time=True) diff --git a/glance/tests/unit/async_/flows/test_api_image_import.py b/glance/tests/unit/async_/flows/test_api_image_import.py index 74b70c4f46..e3eb97d396 100644 --- a/glance/tests/unit/async_/flows/test_api_image_import.py +++ b/glance/tests/unit/async_/flows/test_api_image_import.py @@ -63,6 +63,8 @@ class TestApiImageImportTask(test_utils.BaseTestCase): self.mock_task_repo = mock.MagicMock() self.mock_image_repo = mock.MagicMock() + self.mock_image_repo.get.return_value.extra_properties = { + 'os_glance_import_task': TASK_ID1} @mock.patch('glance.async_.flows.api_image_import._VerifyStaging.__init__') @mock.patch('taskflow.patterns.linear_flow.Flow.add') @@ -102,6 +104,61 @@ class TestApiImageImportTask(test_utils.BaseTestCase): import_req=self.gd_task_input['import_req']) +class TestImageLock(test_utils.BaseTestCase): + def setUp(self): + super(TestImageLock, self).setUp() + self.img_repo = mock.MagicMock() + + @mock.patch('glance.async_.flows.api_image_import.LOG') + def test_execute_confirms_lock(self, mock_log): + self.img_repo.get.return_value.extra_properties = { + 'os_glance_import_task': TASK_ID1} + wrapper = import_flow.ImportActionWrapper(self.img_repo, IMAGE_ID1, + TASK_ID1) + imagelock = import_flow._ImageLock(TASK_ID1, TASK_TYPE, wrapper) + imagelock.execute() + mock_log.debug.assert_called_once_with('Image %(image)s import task ' + '%(task)s lock confirmed', + {'image': IMAGE_ID1, + 'task': TASK_ID1}) + + @mock.patch('glance.async_.flows.api_image_import.LOG') + def test_execute_confirms_lock_not_held(self, mock_log): + wrapper = import_flow.ImportActionWrapper(self.img_repo, IMAGE_ID1, + TASK_ID1) + imagelock = import_flow._ImageLock(TASK_ID1, TASK_TYPE, wrapper) + self.assertRaises(exception.TaskAbortedError, + imagelock.execute) + + @mock.patch('glance.async_.flows.api_image_import.LOG') + def test_revert_drops_lock(self, mock_log): + wrapper = import_flow.ImportActionWrapper(self.img_repo, IMAGE_ID1, + TASK_ID1) + imagelock = import_flow._ImageLock(TASK_ID1, TASK_TYPE, wrapper) + with mock.patch.object(wrapper, 'drop_lock_for_task') as mock_drop: + imagelock.revert(None) + mock_drop.assert_called_once_with() + mock_log.debug.assert_called_once_with('Image %(image)s import task ' + '%(task)s dropped its lock ' + 'after failure', + {'image': IMAGE_ID1, + 'task': TASK_ID1}) + + @mock.patch('glance.async_.flows.api_image_import.LOG') + def test_revert_drops_lock_missing(self, mock_log): + wrapper = import_flow.ImportActionWrapper(self.img_repo, IMAGE_ID1, + TASK_ID1) + imagelock = import_flow._ImageLock(TASK_ID1, TASK_TYPE, wrapper) + with mock.patch.object(wrapper, 'drop_lock_for_task') as mock_drop: + mock_drop.side_effect = exception.NotFound() + imagelock.revert(None) + mock_log.warning.assert_called_once_with('Image %(image)s import task ' + '%(task)s lost its lock ' + 'during execution!', + {'image': IMAGE_ID1, + 'task': TASK_ID1}) + + class TestImportToStoreTask(test_utils.BaseTestCase): def setUp(self): @@ -137,7 +194,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase): img_repo = mock.MagicMock() img_repo.get.return_value = image task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", @@ -157,7 +215,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase): img_repo = mock.MagicMock() img_repo.get.return_value = image task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", @@ -177,7 +236,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase): img_repo = mock.MagicMock() img_repo.get.return_value = image task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", @@ -198,7 +258,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase): img_repo = mock.MagicMock() task_repo = mock.MagicMock() task_repo.get.return_value.status = 'processing' - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", @@ -250,7 +311,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase): def test_raises_when_image_deleted(self): img_repo = mock.MagicMock() task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", @@ -265,13 +327,15 @@ class TestImportToStoreTask(test_utils.BaseTestCase): def test_remove_store_from_property(self, mock_import): img_repo = mock.MagicMock() task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", "store1", True, True) - extra_properties = {"os_glance_importing_to_stores": "store1,store2"} + extra_properties = {"os_glance_importing_to_stores": "store1,store2", + "os_glance_import_task": TASK_ID1} image = self.img_factory.new_image(image_id=UUID1, extra_properties=extra_properties) img_repo.get.return_value = image @@ -282,13 +346,15 @@ class TestImportToStoreTask(test_utils.BaseTestCase): def test_revert_updates_status_keys(self): img_repo = mock.MagicMock() task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", "store1", True, True) - extra_properties = {"os_glance_importing_to_stores": "store1,store2"} + extra_properties = {"os_glance_importing_to_stores": "store1,store2", + "os_glance_import_task": TASK_ID1} image = self.img_factory.new_image(image_id=UUID1, extra_properties=extra_properties) img_repo.get.return_value = image @@ -313,13 +379,16 @@ class TestImportToStoreTask(test_utils.BaseTestCase): def test_raises_when_all_stores_must_succeed(self, mock_import): img_repo = mock.MagicMock() task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", "store1", True, True) - image = self.img_factory.new_image(image_id=UUID1) + extra_properties = {'os_glance_import_task': TASK_ID1} + image = self.img_factory.new_image(image_id=UUID1, + extra_properties=extra_properties) img_repo.get.return_value = image mock_import.set_image_data.side_effect = \ cursive_exception.SignatureVerificationError( @@ -331,13 +400,16 @@ class TestImportToStoreTask(test_utils.BaseTestCase): def test_doesnt_raise_when_not_all_stores_must_succeed(self, mock_import): img_repo = mock.MagicMock() task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", "store1", False, True) - image = self.img_factory.new_image(image_id=UUID1) + extra_properties = {'os_glance_import_task': TASK_ID1} + image = self.img_factory.new_image(image_id=UUID1, + extra_properties=extra_properties) img_repo.get.return_value = image mock_import.set_image_data.side_effect = \ cursive_exception.SignatureVerificationError( @@ -473,7 +545,7 @@ class TestImportCopyImageTask(test_utils.BaseTestCase): fake_img = mock.MagicMock() fake_img.id = IMAGE_ID1 fake_img.status = 'active' - fake_img.extra_properties = {} + fake_img.extra_properties = {'os_glance_import_task': TASK_ID1} admin_repo.get.return_value = fake_img import_flow.get_flow(task_id=TASK_ID1, @@ -494,10 +566,13 @@ class TestImportCopyImageTask(test_utils.BaseTestCase): class TestVerifyImageStateTask(test_utils.BaseTestCase): def test_verify_active_status(self): - fake_img = mock.MagicMock(status='active') + fake_img = mock.MagicMock(status='active', + extra_properties={ + 'os_glance_import_task': TASK_ID1}) mock_repo = mock.MagicMock() mock_repo.get.return_value = fake_img - wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) task = import_flow._VerifyImageState(TASK_ID1, TASK_TYPE, wrapper, 'anything!') @@ -531,7 +606,10 @@ class TestVerifyImageStateTask(test_utils.BaseTestCase): class TestImportActionWrapper(test_utils.BaseTestCase): def test_wrapper_success(self): mock_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1) + mock_repo.get.return_value.extra_properties = { + 'os_glance_import_task': TASK_ID1} + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) with wrapper as action: self.assertIsInstance(action, import_flow._ImportActions) mock_repo.get.assert_called_once_with(IMAGE_ID1) @@ -541,7 +619,10 @@ class TestImportActionWrapper(test_utils.BaseTestCase): def test_wrapper_failure(self): mock_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1) + mock_repo.get.return_value.extra_properties = { + 'os_glance_import_task': TASK_ID1} + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) class SpecificError(Exception): pass @@ -561,7 +642,9 @@ class TestImportActionWrapper(test_utils.BaseTestCase): def test_wrapper_logs_status(self, mock_log): mock_repo = mock.MagicMock() mock_image = mock_repo.get.return_value - wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1) + mock_image.extra_properties = {'os_glance_import_task': TASK_ID1} + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) mock_image.status = 'foo' with wrapper as action: @@ -575,6 +658,61 @@ class TestImportActionWrapper(test_utils.BaseTestCase): 'new_status': 'bar'}) self.assertEqual('bar', mock_image.status) + def test_image_id_property(self): + mock_repo = mock.MagicMock() + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) + self.assertEqual(IMAGE_ID1, wrapper.image_id) + + def test_drop_lock_for_task(self): + mock_repo = mock.MagicMock() + mock_repo.get.return_value.extra_properties = { + 'os_glance_import_task': TASK_ID1} + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) + wrapper.drop_lock_for_task() + mock_repo.delete_property_atomic.assert_called_once_with( + mock_repo.get.return_value, 'os_glance_import_task', TASK_ID1) + + def test_assert_task_lock(self): + mock_repo = mock.MagicMock() + mock_repo.get.return_value.extra_properties = { + 'os_glance_import_task': TASK_ID1} + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) + wrapper.assert_task_lock() + + # Try again with a different task ID and it should fail + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + 'foo') + self.assertRaises(exception.TaskAbortedError, + wrapper.assert_task_lock) + + def _grab_image(self, wrapper): + with wrapper: + pass + + @mock.patch.object(import_flow, 'LOG') + def test_check_task_lock(self, mock_log): + mock_repo = mock.MagicMock() + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) + image = mock.MagicMock(image_id=IMAGE_ID1) + image.extra_properties = {'os_glance_import_task': TASK_ID1} + mock_repo.get.return_value = image + self._grab_image(wrapper) + mock_log.error.assert_not_called() + + image.extra_properties['os_glance_import_task'] = 'somethingelse' + self.assertRaises(exception.TaskAbortedError, + self._grab_image, wrapper) + mock_log.error.assert_called_once_with( + 'Image %(image)s import task %(task)s attempted to take action on ' + 'image, but other task %(other)s holds the lock; Aborting.', + {'image': image.image_id, + 'task': TASK_ID1, + 'other': 'somethingelse'}) + class TestImportActions(test_utils.BaseTestCase): def setUp(self): @@ -767,30 +905,48 @@ class TestCompleteTask(test_utils.BaseTestCase): super(TestCompleteTask, self).setUp() self.task_repo = mock.MagicMock() self.task = mock.MagicMock() + self.wrapper = mock.MagicMock(image_id=IMAGE_ID1) def test_execute(self, mock_get_task): complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE, - self.task_repo, IMAGE_ID1) + self.task_repo, self.wrapper) mock_get_task.return_value = self.task complete.execute() mock_get_task.assert_called_once_with(self.task_repo, TASK_ID1) self.task.succeed.assert_called_once_with({'image_id': IMAGE_ID1}) self.task_repo.save.assert_called_once_with(self.task) + self.wrapper.drop_lock_for_task.assert_called_once_with() def test_execute_no_task(self, mock_get_task): mock_get_task.return_value = None complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE, - self.task_repo, IMAGE_ID1) + self.task_repo, self.wrapper) complete.execute() self.task_repo.save.assert_not_called() + self.wrapper.drop_lock_for_task.assert_called_once_with() def test_execute_succeed_fails(self, mock_get_task): mock_get_task.return_value = self.task self.task.succeed.side_effect = Exception('testing') complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE, - self.task_repo, IMAGE_ID1) + self.task_repo, self.wrapper) complete.execute() self.task.fail.assert_called_once_with( _('Error: : testing')) self.task_repo.save.assert_called_once_with(self.task) + self.wrapper.drop_lock_for_task.assert_called_once_with() + + def test_execute_drop_lock_fails(self, mock_get_task): + mock_get_task.return_value = self.task + self.wrapper.drop_lock_for_task.side_effect = exception.NotFound() + complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE, + self.task_repo, self.wrapper) + with mock.patch('glance.async_.flows.api_image_import.LOG') as m_log: + complete.execute() + m_log.error.assert_called_once_with('Image %(image)s import task ' + '%(task)s did not hold the ' + 'lock upon completion!', + {'image': IMAGE_ID1, + 'task': TASK_ID1}) + self.task.succeed.assert_called_once_with({'image_id': IMAGE_ID1}) diff --git a/glance/tests/unit/async_/test_async.py b/glance/tests/unit/async_/test_async.py index 2ec190e8eb..1574a65ce2 100644 --- a/glance/tests/unit/async_/test_async.py +++ b/glance/tests/unit/async_/test_async.py @@ -88,13 +88,15 @@ class TestImportTaskFlow(test_utils.BaseTestCase): def _get_flow(self, import_req=None): inputs = { - 'task_id': mock.MagicMock(), + 'task_id': mock.sentinel.task_id, 'task_type': mock.MagicMock(), 'task_repo': mock.MagicMock(), 'image_repo': mock.MagicMock(), 'image_id': mock.MagicMock(), 'import_req': import_req or mock.MagicMock() } + inputs['image_repo'].get.return_value = mock.MagicMock( + extra_properties={'os_glance_import_task': mock.sentinel.task_id}) flow = api_image_import.get_flow(**inputs) return flow @@ -116,8 +118,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase): flow = self._get_flow() flow_comp = self._get_flow_tasks(flow) - # assert flow has 5 tasks - self.assertEqual(5, len(flow_comp)) + # assert flow has 6 tasks + self.assertEqual(6, len(flow_comp)) for c in self.base_flow: self.assertIn(c, flow_comp) @@ -135,8 +137,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase): flow = self._get_flow(import_req=import_req) flow_comp = self._get_flow_tasks(flow) - # assert flow has 6 tasks - self.assertEqual(6, len(flow_comp)) + # assert flow has 7 tasks + self.assertEqual(7, len(flow_comp)) for c in self.base_flow: self.assertIn(c, flow_comp) self.assertIn('WebDownload', flow_comp) @@ -157,8 +159,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase): flow = self._get_flow(import_req=import_req) flow_comp = self._get_flow_tasks(flow) - # assert flow has 6 tasks - self.assertEqual(6, len(flow_comp)) + # assert flow has 7 tasks + self.assertEqual(7, len(flow_comp)) for c in self.base_flow: self.assertIn(c, flow_comp) self.assertIn('CopyImage', flow_comp) @@ -174,8 +176,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase): flow = self._get_flow() flow_comp = self._get_flow_tasks(flow) - # assert flow has 8 tasks (base_flow + plugins) - self.assertEqual(8, len(flow_comp)) + # assert flow has 9 tasks (base_flow + plugins) + self.assertEqual(9, len(flow_comp)) for c in self.base_flow: self.assertIn(c, flow_comp) for c in self.import_plugins: @@ -202,8 +204,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase): flow = self._get_flow(import_req=import_req) flow_comp = self._get_flow_tasks(flow) - # assert flow has 6 tasks - self.assertEqual(6, len(flow_comp)) + # assert flow has 7 tasks + self.assertEqual(7, len(flow_comp)) for c in self.base_flow: self.assertIn(c, flow_comp) self.assertIn('CopyImage', flow_comp) diff --git a/glance/tests/unit/v2/test_images_resource.py b/glance/tests/unit/v2/test_images_resource.py index 5d664ceb60..c4869a579a 100644 --- a/glance/tests/unit/v2/test_images_resource.py +++ b/glance/tests/unit/v2/test_images_resource.py @@ -23,6 +23,7 @@ from castellan.common import exception as castellan_exception import glance_store as store from oslo_config import cfg from oslo_serialization import jsonutils +from oslo_utils import fixture import six from six.moves import http_client as http # NOTE(jokke): simplified transition to py3, behaves like py2 xrange @@ -39,6 +40,7 @@ import glance.schema from glance.tests.unit import base from glance.tests.unit.keymgr import fake as fake_keymgr import glance.tests.unit.utils as unit_test_utils +from glance.tests.unit.v2 import test_tasks_resource import glance.tests.utils as test_utils CONF = cfg.CONF @@ -745,8 +747,9 @@ class TestImagesController(base.IsolatedUnitTest): self.controller.import_image, request, UUID4, {'method': {'name': 'glance-direct'}}) + @mock.patch('glance.db.simple.api.image_set_property_atomic') @mock.patch('glance.api.common.get_thread_pool') - def test_image_import_raises_bad_request(self, mock_gpt): + def test_image_import_raises_bad_request(self, mock_gpt, mock_spa): request = unit_test_utils.get_fake_request() with mock.patch.object( glance.api.authorization.ImageRepoProxy, 'get') as mock_get: @@ -2955,19 +2958,25 @@ class TestImagesController(base.IsolatedUnitTest): pos = self.controller._get_locations_op_pos('1', None, True) self.assertIsNone(pos) + @mock.patch('glance.db.simple.api.image_set_property_atomic') @mock.patch.object(glance.api.authorization.TaskFactoryProxy, 'new_task') @mock.patch.object(glance.domain.TaskExecutorFactory, 'new_task_executor') @mock.patch('glance.api.common.get_thread_pool') - def test_image_import(self, mock_gtp, mock_nte, mock_nt): + def test_image_import(self, mock_gtp, mock_nte, mock_nt, mock_spa): request = unit_test_utils.get_fake_request() + image = FakeImage(status='uploading') with mock.patch.object( glance.api.authorization.ImageRepoProxy, 'get') as mock_get: - mock_get.return_value = FakeImage(status='uploading') + mock_get.return_value = image output = self.controller.import_image( request, UUID4, {'method': {'name': 'glance-direct'}}) self.assertEqual(UUID4, output) + # Make sure we set the lock on the image + mock_spa.assert_called_once_with(UUID4, 'os_glance_import_task', + mock_nt.return_value.task_id) + # Make sure we grabbed a thread pool, and that we asked it # to spawn the task's run method with it. mock_gtp.assert_called_once_with('tasks_pool') @@ -2989,12 +2998,14 @@ class TestImagesController(base.IsolatedUnitTest): # a task mock_new_task.assert_not_called() + @mock.patch('glance.db.simple.api.image_set_property_atomic') @mock.patch('glance.context.RequestContext.elevated') @mock.patch.object(glance.domain.TaskFactory, 'new_task') @mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get') def test_image_import_copy_allowed_by_policy(self, mock_get, mock_new_task, mock_elevated, + mock_spa, allowed=True): # NOTE(danms): FakeImage is owned by utils.TENANT1. Try to do the # import as TENANT2, but with a policy exception @@ -3031,6 +3042,150 @@ class TestImagesController(base.IsolatedUnitTest): self.test_image_import_copy_allowed_by_policy, allowed=False) + @mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get') + def test_image_import_locked(self, mock_get): + task = test_tasks_resource._db_fixture(test_tasks_resource.UUID1, + status='pending') + self.db.task_create(None, task) + image = FakeImage(status='uploading') + # Image is locked with a valid task that has not aged out, so + # the lock will not be busted. + image.extra_properties['os_glance_import_task'] = task['id'] + mock_get.return_value = image + + request = unit_test_utils.get_fake_request(tenant=TENANT1) + req_body = {'method': {'name': 'glance-direct'}} + + exc = self.assertRaises(webob.exc.HTTPConflict, + self.controller.import_image, + request, UUID1, req_body) + self.assertEqual('Image has active task', str(exc)) + + @mock.patch('glance.db.simple.api.image_set_property_atomic') + @mock.patch('glance.db.simple.api.image_delete_property_atomic') + @mock.patch.object(glance.api.authorization.TaskFactoryProxy, 'new_task') + @mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get') + def test_image_import_locked_by_reaped_task(self, mock_get, mock_nt, + mock_dpi, mock_spi): + image = FakeImage(status='uploading') + # Image is locked by some other task that TaskRepo will not find + image.extra_properties['os_glance_import_task'] = 'missing' + mock_get.return_value = image + + request = unit_test_utils.get_fake_request(tenant=TENANT1) + req_body = {'method': {'name': 'glance-direct'}} + + mock_nt.return_value.task_id = 'mytask' + self.controller.import_image(request, UUID1, req_body) + + # We should have atomically deleted the missing task lock + mock_dpi.assert_called_once_with(image.id, 'os_glance_import_task', + 'missing') + # We should have atomically grabbed the lock with our task id + mock_spi.assert_called_once_with(image.id, 'os_glance_import_task', + 'mytask') + + @mock.patch('glance.db.simple.api.image_set_property_atomic') + @mock.patch('glance.db.simple.api.image_delete_property_atomic') + @mock.patch.object(glance.api.authorization.TaskFactoryProxy, 'new_task') + @mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get') + def test_image_import_locked_by_bustable_task(self, mock_get, mock_nt, + mock_dpi, mock_spi, + task_status='processing'): + task = test_tasks_resource._db_fixture( + test_tasks_resource.UUID1, + status=task_status) + self.db.task_create(None, task) + image = FakeImage(status='uploading') + # Image is locked by a task in 'processing' state + image.extra_properties['os_glance_import_task'] = task['id'] + mock_get.return_value = image + + request = unit_test_utils.get_fake_request(tenant=TENANT1) + req_body = {'method': {'name': 'glance-direct'}} + + # Task has only been running for ten minutes + time_fixture = fixture.TimeFixture(task['updated_at'] + + datetime.timedelta(minutes=10)) + self.useFixture(time_fixture) + + mock_nt.return_value.task_id = 'mytask' + + # Task holds the lock, API refuses to bust it + self.assertRaises(webob.exc.HTTPConflict, + self.controller.import_image, + request, UUID1, req_body) + mock_dpi.assert_not_called() + mock_spi.assert_not_called() + mock_nt.assert_not_called() + + # Fast forward to 90 minutes from now + time_fixture.advance_time_delta(datetime.timedelta(minutes=90)) + self.controller.import_image(request, UUID1, req_body) + + # API deleted the other task's lock and locked it for us + mock_dpi.assert_called_once_with(image.id, 'os_glance_import_task', + task['id']) + mock_spi.assert_called_once_with(image.id, 'os_glance_import_task', + 'mytask') + + def test_image_import_locked_by_bustable_terminal_task_failure(self): + # Make sure we don't fail with a task status transition error + self.test_image_import_locked_by_bustable_task(task_status='failure') + + def test_image_import_locked_by_bustable_terminal_task_success(self): + # Make sure we don't fail with a task status transition error + self.test_image_import_locked_by_bustable_task(task_status='success') + + def test_bust_import_lock_race_to_delete(self): + image_repo = mock.MagicMock() + task_repo = mock.MagicMock() + image = mock.MagicMock() + task = mock.MagicMock(id='foo') + # Simulate a race where we tried to bust a specific lock and + # someone else already had, and/or re-locked it + image_repo.delete_property_atomic.side_effect = exception.NotFound + self.assertRaises(exception.Conflict, + self.controller._bust_import_lock, + image_repo, task_repo, + image, task, task.id) + + def test_enforce_lock_log_not_bustable(self, task_status='processing'): + task = test_tasks_resource._db_fixture( + test_tasks_resource.UUID1, + status=task_status) + self.db.task_create(None, task) + request = unit_test_utils.get_fake_request(tenant=TENANT1) + image = FakeImage() + image.extra_properties['os_glance_import_task'] = task['id'] + + # Freeze time to make this repeatable + time_fixture = fixture.TimeFixture(task['updated_at'] + + datetime.timedelta(minutes=55)) + self.useFixture(time_fixture) + + expected_expire = 300 + if task_status == 'pending': + # NOTE(danms): Tasks in 'pending' get double the expiry time, + # so we'd be expecting an extra hour here. + expected_expire += 3600 + + with mock.patch.object(glance.api.v2.images, 'LOG') as mock_log: + self.assertRaises(exception.Conflict, + self.controller._enforce_import_lock, + request, image) + mock_log.warning.assert_called_once_with( + 'Image %(image)s has active import task %(task)s in ' + 'status %(status)s; lock remains valid for %(expire)i ' + 'more seconds', + {'image': image.id, + 'task': task['id'], + 'status': task_status, + 'expire': expected_expire}) + + def test_enforce_lock_pending_takes_longer(self): + self.test_enforce_lock_log_not_bustable(task_status='pending') + def test_delete_encryption_key_no_encryption_key(self): request = unit_test_utils.get_fake_request() fake_encryption_key = self.controller._key_manager.store(