diff --git a/glance/api/v2/images.py b/glance/api/v2/images.py index 3058f5ff68..d7847ddda8 100644 --- a/glance/api/v2/images.py +++ b/glance/api/v2/images.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime import hashlib import os import re @@ -25,6 +26,7 @@ from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils as json from oslo_utils import encodeutils +from oslo_utils import timeutils as oslo_timeutils import six from six.moves import http_client as http import six.moves.urllib.parse as urlparse @@ -98,6 +100,90 @@ class ImagesController(object): return image + def _bust_import_lock(self, admin_image_repo, admin_task_repo, + image, task, task_id): + if task: + # FIXME(danms): It would be good if we had a 'canceled' or + # 'aborted' status here. + try: + task.fail('Expired lock preempted') + admin_task_repo.save(task) + except exception.InvalidTaskStatusTransition: + # NOTE(danms): This may happen if we try to fail a + # task that is in a terminal state, but where the lock + # was never dropped from the image. We will log the + # image, task, and status below so we can just ignore + # here. + pass + + try: + admin_image_repo.delete_property_atomic( + image, 'os_glance_import_task', task_id) + except exception.NotFound: + LOG.warning('Image %(image)s has stale import task %(task)s ' + 'but we lost the race to remove it.', + {'image': image.image_id, + 'task': task_id}) + # We probably lost the race to expire the old lock, but + # act like it is not yet expired to avoid a retry loop. + raise exception.Conflict('Image has active task') + + LOG.warning('Image %(image)s has stale import task %(task)s ' + 'in status %(status)s from %(owner)s; removed lock ' + 'because it had expired.', + {'image': image.image_id, + 'task': task_id, + 'status': task and task.status or 'missing', + 'owner': task and task.owner or 'unknown owner'}) + + def _enforce_import_lock(self, req, image): + admin_context = req.context.elevated() + admin_image_repo = self.gateway.get_repo(admin_context) + admin_task_repo = self.gateway.get_task_repo(admin_context) + other_task = image.extra_properties['os_glance_import_task'] + + expiry = datetime.timedelta(minutes=60) + bustable_states = ('pending', 'processing', 'success', 'failure') + + try: + task = admin_task_repo.get(other_task) + except exception.NotFound: + # NOTE(danms): This could happen if we failed to do an import + # a long time ago, and the task record has since been culled from + # the database, but the task id is still in the lock field. + LOG.warning('Image %(image)s has non-existent import ' + 'task %(task)s; considering it stale', + {'image': image.image_id, + 'task': other_task}) + task = None + age = 0 + else: + age = oslo_timeutils.utcnow() - task.updated_at + if task.status == 'pending': + # NOTE(danms): Tasks in pending state could be queued, + # blocked or otherwise right-about-to-get-going, so we + # double the expiry time for safety. We will report + # time remaining below, so this is not too obscure. + expiry *= 2 + + if not task or (task.status in bustable_states and age >= expiry): + self._bust_import_lock(admin_image_repo, admin_task_repo, + image, task, other_task) + return + + if task.status in bustable_states: + LOG.warning('Image %(image)s has active import task %(task)s in ' + 'status %(status)s; lock remains valid for %(expire)i ' + 'more seconds', + {'image': image.image_id, + 'task': task.task_id, + 'status': task.status, + 'expire': (expiry - age).total_seconds()}) + else: + LOG.debug('Image %(image)s has import task %(task)s in status ' + '%(status)s and does not qualify for expiry.') + raise exception.Conflict('Image has active task') + @utils.mutating def import_image(self, req, image_id, body): image_repo = self.gateway.get_repo(req.context) @@ -142,6 +228,11 @@ class ImagesController(object): raise webob.exc.HTTPForbidden( explanation=_("Operation not permitted")) + if 'os_glance_import_task' in image.extra_properties: + # NOTE(danms): This will raise exception.Conflict if the + # lock is present and valid, or return if absent or invalid. + self._enforce_import_lock(req, image) + stores = [None] if CONF.enabled_backends: try: @@ -214,6 +305,17 @@ class ImagesController(object): import_task = task_factory.new_task(task_type='api_image_import', owner=req.context.owner, task_input=task_input) + + # NOTE(danms): Try to grab the lock for this task + try: + image_repo.set_property_atomic(image, + 'os_glance_import_task', + import_task.task_id) + except exception.Duplicate: + msg = (_("New operation on image '%s' is not permitted as " + "prior operation is still in progress") % image_id) + raise exception.Conflict(msg) + task_repo.add(import_task) task_executor = executor_factory.new_task_executor(req.context) pool = common.get_thread_pool("tasks_pool") @@ -734,7 +836,8 @@ class RequestDeserializer(wsgi.JSONRequestDeserializer): 'size', 'virtual_size', 'direct_url', 'self', 'file', 'schema', 'id', 'os_hash_algo', 'os_hash_value') - _reserved_properties = ('location', 'deleted', 'deleted_at') + _reserved_properties = ('location', 'deleted', 'deleted_at', + 'os_glance_import_task') _base_properties = ('checksum', 'created_at', 'container_format', 'disk_format', 'id', 'min_disk', 'min_ram', 'name', 'size', 'virtual_size', 'status', 'tags', 'owner', diff --git a/glance/async_/flows/api_image_import.py b/glance/async_/flows/api_image_import.py index a846d0aed9..ea7bdf6fbd 100644 --- a/glance/async_/flows/api_image_import.py +++ b/glance/async_/flows/api_image_import.py @@ -101,13 +101,16 @@ class ImportActionWrapper(object): :param image-id: The ID of the image we should be altering """ - def __init__(self, image_repo, image_id): + def __init__(self, image_repo, image_id, task_id): self._image_repo = image_repo self._image_id = image_id + self._task_id = task_id def __enter__(self): self._image = self._image_repo.get(self._image_id) self._image_previous_status = self._image.status + self._assert_task_lock(self._image) + return _ImportActions(self._image) def __exit__(self, type, value, traceback): @@ -123,6 +126,43 @@ class ImportActionWrapper(object): 'new_status': self._image.status}) self._image_repo.save(self._image, self._image_previous_status) + @property + def image_id(self): + return self._image_id + + def drop_lock_for_task(self): + """Delete the import lock for our task. + + This is an atomic operation and thus does not require a context + for the image save. Note that after calling this method, no + further actions will be allowed on the image. + + :raises: NotFound if the image was not locked by the expected task. + """ + image = self._image_repo.get(self._image_id) + self._image_repo.delete_property_atomic(image, + 'os_glance_import_task', + self._task_id) + + def _assert_task_lock(self, image): + task_lock = image.extra_properties.get('os_glance_import_task') + if task_lock != self._task_id: + LOG.error('Image %(image)s import task %(task)s attempted to ' + 'take action on image, but other task %(other)s holds ' + 'the lock; Aborting.', + {'image': self._image_id, + 'task': self._task_id, + 'other': task_lock}) + raise exception.TaskAbortedError() + + def assert_task_lock(self): + """Assert that we own the task lock on the image. + + :raises: TaskAbortedError if we do not + """ + image = self._image_repo.get(self._image_id) + self._assert_task_lock(image) + class _ImportActions(object): """Actions available for being performed on an image during import. @@ -319,6 +359,41 @@ class _DeleteFromFS(task.Task): 'fn': file_path}) +class _ImageLock(task.Task): + def __init__(self, task_id, task_type, action_wrapper): + self.task_id = task_id + self.task_type = task_type + self.action_wrapper = action_wrapper + super(_ImageLock, self).__init__( + name='%s-ImageLock-%s' % (task_type, task_id)) + + def execute(self): + self.action_wrapper.assert_task_lock() + LOG.debug('Image %(image)s import task %(task)s lock confirmed', + {'image': self.action_wrapper.image_id, + 'task': self.task_id}) + + def revert(self, result, **kwargs): + """Drop our claim on the image. + + If we have failed, we need to drop our import_task lock on the image + so that something else can have a try. Note that we may have been + preempted so we should only drop *our* lock. + """ + try: + self.action_wrapper.drop_lock_for_task() + except exception.NotFound: + LOG.warning('Image %(image)s import task %(task)s lost its ' + 'lock during execution!', + {'image': self.action_wrapper.image_id, + 'task': self.task_id}) + else: + LOG.debug('Image %(image)s import task %(task)s dropped ' + 'its lock after failure', + {'image': self.action_wrapper.image_id, + 'task': self.task_id}) + + class _VerifyStaging(task.Task): # NOTE(jokke): This could be also for example "staging_path" but to @@ -548,24 +623,17 @@ class _VerifyImageState(task.Task): class _CompleteTask(task.Task): - def __init__(self, task_id, task_type, task_repo, image_id): + def __init__(self, task_id, task_type, task_repo, action_wrapper): self.task_id = task_id self.task_type = task_type self.task_repo = task_repo - self.image_id = image_id + self.action_wrapper = action_wrapper super(_CompleteTask, self).__init__( name='%s-CompleteTask-%s' % (task_type, task_id)) - def execute(self): - """Finishing the task flow - - :param image_id: Glance Image ID - """ - task = script_utils.get_task(self.task_repo, self.task_id) - if task is None: - return + def _finish_task(self, task): try: - task.succeed({'image_id': self.image_id}) + task.succeed({'image_id': self.action_wrapper.image_id}) except Exception as e: # Note: The message string contains Error in it to indicate # in the task.message that it's a error message for the user. @@ -584,6 +652,28 @@ class _CompleteTask(task.Task): finally: self.task_repo.save(task) + def _drop_lock(self): + try: + self.action_wrapper.drop_lock_for_task() + except exception.NotFound: + # NOTE(danms): This would be really bad, but there is probably + # not much point in reverting all the way back if we got this + # far. Log the carnage for forensics. + LOG.error('Image %(image)s import task %(task)s did not hold the ' + 'lock upon completion!', + {'image': self.action_wrapper.image_id, + 'task': self.task_id}) + + def execute(self): + """Finishing the task flow + + :param image_id: Glance Image ID + """ + task = script_utils.get_task(self.task_repo, self.task_id) + if task is not None: + self._finish_task(task) + self._drop_lock() + LOG.info(_LI("%(task_id)s of %(task_type)s completed"), {'task_id': self.task_id, 'task_type': self.task_type}) @@ -616,7 +706,8 @@ def get_flow(**kwargs): # Instantiate an action wrapper with the admin repo if we got one, # otherwise with the regular repo. - action_wrapper = ImportActionWrapper(admin_repo or image_repo, image_id) + action_wrapper = ImportActionWrapper(admin_repo or image_repo, image_id, + task_id) if not uri and import_method in ['glance-direct', 'copy-image']: if CONF.enabled_backends: @@ -627,6 +718,8 @@ def get_flow(**kwargs): flow = lf.Flow(task_type, retry=retry.AlwaysRevert()) + flow.add(_ImageLock(task_id, task_type, action_wrapper)) + if import_method in ['web-download', 'copy-image']: internal_plugin = internal_plugins.get_import_plugin(**kwargs) flow.add(internal_plugin) @@ -678,7 +771,7 @@ def get_flow(**kwargs): complete_task = _CompleteTask(task_id, task_type, task_repo, - image_id) + action_wrapper) flow.add(complete_task) with action_wrapper as action: diff --git a/glance/tests/functional/v2/test_images_import_locking.py b/glance/tests/functional/v2/test_images_import_locking.py new file mode 100644 index 0000000000..3409739789 --- /dev/null +++ b/glance/tests/functional/v2/test_images_import_locking.py @@ -0,0 +1,268 @@ +# Copyright 2020 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import os +from testtools import content as ttc +import textwrap +import time +from unittest import mock +import uuid + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_serialization import jsonutils +from oslo_utils import fixture as time_fixture +import webob + +from glance.common import config +from glance.common import wsgi +import glance.db.sqlalchemy.api +from glance.tests import utils as test_utils +import glance_store + +LOG = logging.getLogger(__name__) +TENANT1 = str(uuid.uuid4()) +CONF = cfg.CONF + + +class SynchronousAPIBase(test_utils.BaseTestCase): + """A test base class that provides synchronous calling into the API + without starting a separate server, and with a simple paste + pipeline. Configured with multi-store and a real database. + """ + + @mock.patch('oslo_db.sqlalchemy.enginefacade.writer.get_engine') + def setup_database(self, mock_get_engine): + db_file = 'sqlite:///%s/test-%s.db' % (self.test_dir, + uuid.uuid4()) + self.config(connection=db_file, group='database') + + # NOTE(danms): Make sure that we clear the current global + # database configuration, provision a temporary database file, + # and run migrations with our configuration to define the + # schema there. + glance.db.sqlalchemy.api.clear_db_env() + engine = glance.db.sqlalchemy.api.get_engine() + mock_get_engine.return_value = engine + with mock.patch('logging.config'): + # NOTE(danms): The alembic config in the env module will break our + # BaseTestCase logging setup. So mock that out to prevent it while + # we db_sync. + test_utils.db_sync(engine=engine) + + def setup_simple_paste(self): + self.paste_config = os.path.join(self.test_dir, 'glance-api-paste.ini') + with open(self.paste_config, 'w') as f: + f.write(textwrap.dedent(""" + [filter:context] + paste.filter_factory = glance.api.middleware.context:\ + ContextMiddleware.factory + [filter:fakeauth] + paste.filter_factory = glance.tests.utils:\ + FakeAuthMiddleware.factory + [pipeline:glance-api] + pipeline = context rootapp + [composite:rootapp] + paste.composite_factory = glance.api:root_app_factory + /v2: apiv2app + [app:apiv2app] + paste.app_factory = glance.api.v2.router:API.factory + """)) + + def _store_dir(self, store): + return os.path.join(self.test_dir, store) + + def setup_stores(self): + self.config(enabled_backends={'store1': 'file', 'store2': 'file'}) + glance_store.register_store_opts(CONF, + reserved_stores=wsgi.RESERVED_STORES) + self.config(default_backend='store1', + group='glance_store') + self.config(filesystem_store_datadir=self._store_dir('store1'), + group='store1') + self.config(filesystem_store_datadir=self._store_dir('store2'), + group='store2') + self.config(filesystem_store_datadir=self._store_dir('staging'), + group='os_glance_staging_store') + + glance_store.create_multi_stores(CONF, + reserved_stores=wsgi.RESERVED_STORES) + glance_store.verify_store() + + def setUp(self): + super(SynchronousAPIBase, self).setUp() + + self.setup_database() + self.setup_simple_paste() + self.setup_stores() + + def start_server(self): + config.set_config_defaults() + self.api = config.load_paste_app('glance-api', + conf_file=self.paste_config) + + def _headers(self, custom_headers=None): + base_headers = { + 'X-Identity-Status': 'Confirmed', + 'X-Auth-Token': '932c5c84-02ac-4fe5-a9ba-620af0e2bb96', + 'X-User-Id': 'f9a41d13-0c13-47e9-bee2-ce4e8bfe958e', + 'X-Tenant-Id': TENANT1, + 'Content-Type': 'application/json', + 'X-Roles': 'admin', + } + base_headers.update(custom_headers or {}) + return base_headers + + def api_get(self, url, headers=None): + headers = self._headers(headers) + req = webob.Request.blank(url, method='GET', + headers=headers) + return self.api(req) + + def api_post(self, url, data=None, json=None, headers=None): + headers = self._headers(headers) + req = webob.Request.blank(url, method='POST', + headers=headers) + if json and not data: + data = jsonutils.dumps(json).encode() + headers['Content-Type'] = 'application/json' + if data: + req.body = data + LOG.debug(req.as_bytes()) + return self.api(req) + + def api_put(self, url, data=None, json=None, headers=None): + headers = self._headers(headers) + req = webob.Request.blank(url, method='PUT', + headers=headers) + if json and not data: + data = jsonutils.dumps(json).encode() + if data: + req.body = data + return self.api(req) + + +class TestImageImportLocking(SynchronousAPIBase): + def _import_copy(self, image_id, stores): + """Do an import of image_id to the given stores.""" + body = {'method': {'name': 'copy-image'}, + 'stores': stores, + 'all_stores': False} + + return self.api_post( + '/v2/images/%s/import' % image_id, + json=body) + + def _create_and_import(self, stores=[]): + """Create an image, stage data, and import into the given stores. + + :returns: image_id + """ + resp = self.api_post('/v2/images', + json={'name': 'foo', + 'container_format': 'bare', + 'disk_format': 'raw'}) + image = jsonutils.loads(resp.text) + + resp = self.api_put( + '/v2/images/%s/stage' % image['id'], + headers={'Content-Type': 'application/octet-stream'}, + data=b'IMAGEDATA') + self.assertEqual(204, resp.status_code) + + body = {'method': {'name': 'glance-direct'}} + if stores: + body['stores'] = stores + + resp = self.api_post( + '/v2/images/%s/import' % image['id'], + json=body) + + self.assertEqual(202, resp.status_code) + + # Make sure it goes active + for i in range(0, 10): + image = self.api_get('/v2/images/%s' % image['id']).json + if not image.get('os_glance_import_task'): + break + self.addDetail('Create-Import task id', + ttc.text_content(image['os_glance_import_task'])) + time.sleep(1) + + self.assertEqual('active', image['status']) + + return image['id'] + + def _test_import_copy(self, warp_time=False): + self.start_server() + state = {} + + # Create and import an image with no pipeline stall + image_id = self._create_and_import(stores=['store1']) + + # Set up a fake data pipeline that will stall until we are ready + # to unblock it + def slow_fake_set_data(data_iter, backend=None, set_active=True): + while True: + state['running'] = True + time.sleep(0.1) + + # Constrain oslo timeutils time so we can manipulate it + tf = time_fixture.TimeFixture() + self.useFixture(tf) + + # Turn on the delayed data pipeline and start a copy-image + # import which will hang out for a while + with mock.patch('glance.domain.proxy.Image.set_data') as mock_sd: + mock_sd.side_effect = slow_fake_set_data + + resp = self._import_copy(image_id, ['store2']) + self.addDetail('First import response', + ttc.text_content(str(resp))) + self.assertEqual(202, resp.status_code) + + # Wait to make sure the data stream gets started + for i in range(0, 10): + if state: + break + time.sleep(0.1) + + # Make sure the first import got to the point where the + # hanging loop will hold it in processing state + self.assertTrue(state.get('running', False), + 'slow_fake_set_data() never ran') + + # If we're warping time, then advance the clock by two hours + if warp_time: + tf.advance_time_delta(datetime.timedelta(hours=2)) + + # Try a second copy-image import. If we are warping time, + # expect the lock to be busted. If not, then we should get + # a 409 Conflict. + resp = self._import_copy(image_id, ['store2']) + + self.addDetail('Second import response', + ttc.text_content(str(resp))) + if warp_time: + self.assertEqual(202, resp.status_code) + else: + self.assertEqual(409, resp.status_code) + + def test_import_copy_locked(self): + self._test_import_copy(warp_time=False) + + def test_import_copy_bust_lock(self): + self._test_import_copy(warp_time=True) diff --git a/glance/tests/unit/async_/flows/test_api_image_import.py b/glance/tests/unit/async_/flows/test_api_image_import.py index 74b70c4f46..e3eb97d396 100644 --- a/glance/tests/unit/async_/flows/test_api_image_import.py +++ b/glance/tests/unit/async_/flows/test_api_image_import.py @@ -63,6 +63,8 @@ class TestApiImageImportTask(test_utils.BaseTestCase): self.mock_task_repo = mock.MagicMock() self.mock_image_repo = mock.MagicMock() + self.mock_image_repo.get.return_value.extra_properties = { + 'os_glance_import_task': TASK_ID1} @mock.patch('glance.async_.flows.api_image_import._VerifyStaging.__init__') @mock.patch('taskflow.patterns.linear_flow.Flow.add') @@ -102,6 +104,61 @@ class TestApiImageImportTask(test_utils.BaseTestCase): import_req=self.gd_task_input['import_req']) +class TestImageLock(test_utils.BaseTestCase): + def setUp(self): + super(TestImageLock, self).setUp() + self.img_repo = mock.MagicMock() + + @mock.patch('glance.async_.flows.api_image_import.LOG') + def test_execute_confirms_lock(self, mock_log): + self.img_repo.get.return_value.extra_properties = { + 'os_glance_import_task': TASK_ID1} + wrapper = import_flow.ImportActionWrapper(self.img_repo, IMAGE_ID1, + TASK_ID1) + imagelock = import_flow._ImageLock(TASK_ID1, TASK_TYPE, wrapper) + imagelock.execute() + mock_log.debug.assert_called_once_with('Image %(image)s import task ' + '%(task)s lock confirmed', + {'image': IMAGE_ID1, + 'task': TASK_ID1}) + + @mock.patch('glance.async_.flows.api_image_import.LOG') + def test_execute_confirms_lock_not_held(self, mock_log): + wrapper = import_flow.ImportActionWrapper(self.img_repo, IMAGE_ID1, + TASK_ID1) + imagelock = import_flow._ImageLock(TASK_ID1, TASK_TYPE, wrapper) + self.assertRaises(exception.TaskAbortedError, + imagelock.execute) + + @mock.patch('glance.async_.flows.api_image_import.LOG') + def test_revert_drops_lock(self, mock_log): + wrapper = import_flow.ImportActionWrapper(self.img_repo, IMAGE_ID1, + TASK_ID1) + imagelock = import_flow._ImageLock(TASK_ID1, TASK_TYPE, wrapper) + with mock.patch.object(wrapper, 'drop_lock_for_task') as mock_drop: + imagelock.revert(None) + mock_drop.assert_called_once_with() + mock_log.debug.assert_called_once_with('Image %(image)s import task ' + '%(task)s dropped its lock ' + 'after failure', + {'image': IMAGE_ID1, + 'task': TASK_ID1}) + + @mock.patch('glance.async_.flows.api_image_import.LOG') + def test_revert_drops_lock_missing(self, mock_log): + wrapper = import_flow.ImportActionWrapper(self.img_repo, IMAGE_ID1, + TASK_ID1) + imagelock = import_flow._ImageLock(TASK_ID1, TASK_TYPE, wrapper) + with mock.patch.object(wrapper, 'drop_lock_for_task') as mock_drop: + mock_drop.side_effect = exception.NotFound() + imagelock.revert(None) + mock_log.warning.assert_called_once_with('Image %(image)s import task ' + '%(task)s lost its lock ' + 'during execution!', + {'image': IMAGE_ID1, + 'task': TASK_ID1}) + + class TestImportToStoreTask(test_utils.BaseTestCase): def setUp(self): @@ -137,7 +194,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase): img_repo = mock.MagicMock() img_repo.get.return_value = image task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", @@ -157,7 +215,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase): img_repo = mock.MagicMock() img_repo.get.return_value = image task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", @@ -177,7 +236,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase): img_repo = mock.MagicMock() img_repo.get.return_value = image task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", @@ -198,7 +258,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase): img_repo = mock.MagicMock() task_repo = mock.MagicMock() task_repo.get.return_value.status = 'processing' - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", @@ -250,7 +311,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase): def test_raises_when_image_deleted(self): img_repo = mock.MagicMock() task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", @@ -265,13 +327,15 @@ class TestImportToStoreTask(test_utils.BaseTestCase): def test_remove_store_from_property(self, mock_import): img_repo = mock.MagicMock() task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", "store1", True, True) - extra_properties = {"os_glance_importing_to_stores": "store1,store2"} + extra_properties = {"os_glance_importing_to_stores": "store1,store2", + "os_glance_import_task": TASK_ID1} image = self.img_factory.new_image(image_id=UUID1, extra_properties=extra_properties) img_repo.get.return_value = image @@ -282,13 +346,15 @@ class TestImportToStoreTask(test_utils.BaseTestCase): def test_revert_updates_status_keys(self): img_repo = mock.MagicMock() task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", "store1", True, True) - extra_properties = {"os_glance_importing_to_stores": "store1,store2"} + extra_properties = {"os_glance_importing_to_stores": "store1,store2", + "os_glance_import_task": TASK_ID1} image = self.img_factory.new_image(image_id=UUID1, extra_properties=extra_properties) img_repo.get.return_value = image @@ -313,13 +379,16 @@ class TestImportToStoreTask(test_utils.BaseTestCase): def test_raises_when_all_stores_must_succeed(self, mock_import): img_repo = mock.MagicMock() task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", "store1", True, True) - image = self.img_factory.new_image(image_id=UUID1) + extra_properties = {'os_glance_import_task': TASK_ID1} + image = self.img_factory.new_image(image_id=UUID1, + extra_properties=extra_properties) img_repo.get.return_value = image mock_import.set_image_data.side_effect = \ cursive_exception.SignatureVerificationError( @@ -331,13 +400,16 @@ class TestImportToStoreTask(test_utils.BaseTestCase): def test_doesnt_raise_when_not_all_stores_must_succeed(self, mock_import): img_repo = mock.MagicMock() task_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1, + TASK_ID1) image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE, task_repo, wrapper, "http://url", "store1", False, True) - image = self.img_factory.new_image(image_id=UUID1) + extra_properties = {'os_glance_import_task': TASK_ID1} + image = self.img_factory.new_image(image_id=UUID1, + extra_properties=extra_properties) img_repo.get.return_value = image mock_import.set_image_data.side_effect = \ cursive_exception.SignatureVerificationError( @@ -473,7 +545,7 @@ class TestImportCopyImageTask(test_utils.BaseTestCase): fake_img = mock.MagicMock() fake_img.id = IMAGE_ID1 fake_img.status = 'active' - fake_img.extra_properties = {} + fake_img.extra_properties = {'os_glance_import_task': TASK_ID1} admin_repo.get.return_value = fake_img import_flow.get_flow(task_id=TASK_ID1, @@ -494,10 +566,13 @@ class TestImportCopyImageTask(test_utils.BaseTestCase): class TestVerifyImageStateTask(test_utils.BaseTestCase): def test_verify_active_status(self): - fake_img = mock.MagicMock(status='active') + fake_img = mock.MagicMock(status='active', + extra_properties={ + 'os_glance_import_task': TASK_ID1}) mock_repo = mock.MagicMock() mock_repo.get.return_value = fake_img - wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1) + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) task = import_flow._VerifyImageState(TASK_ID1, TASK_TYPE, wrapper, 'anything!') @@ -531,7 +606,10 @@ class TestVerifyImageStateTask(test_utils.BaseTestCase): class TestImportActionWrapper(test_utils.BaseTestCase): def test_wrapper_success(self): mock_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1) + mock_repo.get.return_value.extra_properties = { + 'os_glance_import_task': TASK_ID1} + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) with wrapper as action: self.assertIsInstance(action, import_flow._ImportActions) mock_repo.get.assert_called_once_with(IMAGE_ID1) @@ -541,7 +619,10 @@ class TestImportActionWrapper(test_utils.BaseTestCase): def test_wrapper_failure(self): mock_repo = mock.MagicMock() - wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1) + mock_repo.get.return_value.extra_properties = { + 'os_glance_import_task': TASK_ID1} + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) class SpecificError(Exception): pass @@ -561,7 +642,9 @@ class TestImportActionWrapper(test_utils.BaseTestCase): def test_wrapper_logs_status(self, mock_log): mock_repo = mock.MagicMock() mock_image = mock_repo.get.return_value - wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1) + mock_image.extra_properties = {'os_glance_import_task': TASK_ID1} + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) mock_image.status = 'foo' with wrapper as action: @@ -575,6 +658,61 @@ class TestImportActionWrapper(test_utils.BaseTestCase): 'new_status': 'bar'}) self.assertEqual('bar', mock_image.status) + def test_image_id_property(self): + mock_repo = mock.MagicMock() + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) + self.assertEqual(IMAGE_ID1, wrapper.image_id) + + def test_drop_lock_for_task(self): + mock_repo = mock.MagicMock() + mock_repo.get.return_value.extra_properties = { + 'os_glance_import_task': TASK_ID1} + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) + wrapper.drop_lock_for_task() + mock_repo.delete_property_atomic.assert_called_once_with( + mock_repo.get.return_value, 'os_glance_import_task', TASK_ID1) + + def test_assert_task_lock(self): + mock_repo = mock.MagicMock() + mock_repo.get.return_value.extra_properties = { + 'os_glance_import_task': TASK_ID1} + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) + wrapper.assert_task_lock() + + # Try again with a different task ID and it should fail + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + 'foo') + self.assertRaises(exception.TaskAbortedError, + wrapper.assert_task_lock) + + def _grab_image(self, wrapper): + with wrapper: + pass + + @mock.patch.object(import_flow, 'LOG') + def test_check_task_lock(self, mock_log): + mock_repo = mock.MagicMock() + wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1, + TASK_ID1) + image = mock.MagicMock(image_id=IMAGE_ID1) + image.extra_properties = {'os_glance_import_task': TASK_ID1} + mock_repo.get.return_value = image + self._grab_image(wrapper) + mock_log.error.assert_not_called() + + image.extra_properties['os_glance_import_task'] = 'somethingelse' + self.assertRaises(exception.TaskAbortedError, + self._grab_image, wrapper) + mock_log.error.assert_called_once_with( + 'Image %(image)s import task %(task)s attempted to take action on ' + 'image, but other task %(other)s holds the lock; Aborting.', + {'image': image.image_id, + 'task': TASK_ID1, + 'other': 'somethingelse'}) + class TestImportActions(test_utils.BaseTestCase): def setUp(self): @@ -767,30 +905,48 @@ class TestCompleteTask(test_utils.BaseTestCase): super(TestCompleteTask, self).setUp() self.task_repo = mock.MagicMock() self.task = mock.MagicMock() + self.wrapper = mock.MagicMock(image_id=IMAGE_ID1) def test_execute(self, mock_get_task): complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE, - self.task_repo, IMAGE_ID1) + self.task_repo, self.wrapper) mock_get_task.return_value = self.task complete.execute() mock_get_task.assert_called_once_with(self.task_repo, TASK_ID1) self.task.succeed.assert_called_once_with({'image_id': IMAGE_ID1}) self.task_repo.save.assert_called_once_with(self.task) + self.wrapper.drop_lock_for_task.assert_called_once_with() def test_execute_no_task(self, mock_get_task): mock_get_task.return_value = None complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE, - self.task_repo, IMAGE_ID1) + self.task_repo, self.wrapper) complete.execute() self.task_repo.save.assert_not_called() + self.wrapper.drop_lock_for_task.assert_called_once_with() def test_execute_succeed_fails(self, mock_get_task): mock_get_task.return_value = self.task self.task.succeed.side_effect = Exception('testing') complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE, - self.task_repo, IMAGE_ID1) + self.task_repo, self.wrapper) complete.execute() self.task.fail.assert_called_once_with( _('Error: : testing')) self.task_repo.save.assert_called_once_with(self.task) + self.wrapper.drop_lock_for_task.assert_called_once_with() + + def test_execute_drop_lock_fails(self, mock_get_task): + mock_get_task.return_value = self.task + self.wrapper.drop_lock_for_task.side_effect = exception.NotFound() + complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE, + self.task_repo, self.wrapper) + with mock.patch('glance.async_.flows.api_image_import.LOG') as m_log: + complete.execute() + m_log.error.assert_called_once_with('Image %(image)s import task ' + '%(task)s did not hold the ' + 'lock upon completion!', + {'image': IMAGE_ID1, + 'task': TASK_ID1}) + self.task.succeed.assert_called_once_with({'image_id': IMAGE_ID1}) diff --git a/glance/tests/unit/async_/test_async.py b/glance/tests/unit/async_/test_async.py index 2ec190e8eb..1574a65ce2 100644 --- a/glance/tests/unit/async_/test_async.py +++ b/glance/tests/unit/async_/test_async.py @@ -88,13 +88,15 @@ class TestImportTaskFlow(test_utils.BaseTestCase): def _get_flow(self, import_req=None): inputs = { - 'task_id': mock.MagicMock(), + 'task_id': mock.sentinel.task_id, 'task_type': mock.MagicMock(), 'task_repo': mock.MagicMock(), 'image_repo': mock.MagicMock(), 'image_id': mock.MagicMock(), 'import_req': import_req or mock.MagicMock() } + inputs['image_repo'].get.return_value = mock.MagicMock( + extra_properties={'os_glance_import_task': mock.sentinel.task_id}) flow = api_image_import.get_flow(**inputs) return flow @@ -116,8 +118,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase): flow = self._get_flow() flow_comp = self._get_flow_tasks(flow) - # assert flow has 5 tasks - self.assertEqual(5, len(flow_comp)) + # assert flow has 6 tasks + self.assertEqual(6, len(flow_comp)) for c in self.base_flow: self.assertIn(c, flow_comp) @@ -135,8 +137,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase): flow = self._get_flow(import_req=import_req) flow_comp = self._get_flow_tasks(flow) - # assert flow has 6 tasks - self.assertEqual(6, len(flow_comp)) + # assert flow has 7 tasks + self.assertEqual(7, len(flow_comp)) for c in self.base_flow: self.assertIn(c, flow_comp) self.assertIn('WebDownload', flow_comp) @@ -157,8 +159,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase): flow = self._get_flow(import_req=import_req) flow_comp = self._get_flow_tasks(flow) - # assert flow has 6 tasks - self.assertEqual(6, len(flow_comp)) + # assert flow has 7 tasks + self.assertEqual(7, len(flow_comp)) for c in self.base_flow: self.assertIn(c, flow_comp) self.assertIn('CopyImage', flow_comp) @@ -174,8 +176,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase): flow = self._get_flow() flow_comp = self._get_flow_tasks(flow) - # assert flow has 8 tasks (base_flow + plugins) - self.assertEqual(8, len(flow_comp)) + # assert flow has 9 tasks (base_flow + plugins) + self.assertEqual(9, len(flow_comp)) for c in self.base_flow: self.assertIn(c, flow_comp) for c in self.import_plugins: @@ -202,8 +204,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase): flow = self._get_flow(import_req=import_req) flow_comp = self._get_flow_tasks(flow) - # assert flow has 6 tasks - self.assertEqual(6, len(flow_comp)) + # assert flow has 7 tasks + self.assertEqual(7, len(flow_comp)) for c in self.base_flow: self.assertIn(c, flow_comp) self.assertIn('CopyImage', flow_comp) diff --git a/glance/tests/unit/v2/test_images_resource.py b/glance/tests/unit/v2/test_images_resource.py index 5d664ceb60..c4869a579a 100644 --- a/glance/tests/unit/v2/test_images_resource.py +++ b/glance/tests/unit/v2/test_images_resource.py @@ -23,6 +23,7 @@ from castellan.common import exception as castellan_exception import glance_store as store from oslo_config import cfg from oslo_serialization import jsonutils +from oslo_utils import fixture import six from six.moves import http_client as http # NOTE(jokke): simplified transition to py3, behaves like py2 xrange @@ -39,6 +40,7 @@ import glance.schema from glance.tests.unit import base from glance.tests.unit.keymgr import fake as fake_keymgr import glance.tests.unit.utils as unit_test_utils +from glance.tests.unit.v2 import test_tasks_resource import glance.tests.utils as test_utils CONF = cfg.CONF @@ -745,8 +747,9 @@ class TestImagesController(base.IsolatedUnitTest): self.controller.import_image, request, UUID4, {'method': {'name': 'glance-direct'}}) + @mock.patch('glance.db.simple.api.image_set_property_atomic') @mock.patch('glance.api.common.get_thread_pool') - def test_image_import_raises_bad_request(self, mock_gpt): + def test_image_import_raises_bad_request(self, mock_gpt, mock_spa): request = unit_test_utils.get_fake_request() with mock.patch.object( glance.api.authorization.ImageRepoProxy, 'get') as mock_get: @@ -2955,19 +2958,25 @@ class TestImagesController(base.IsolatedUnitTest): pos = self.controller._get_locations_op_pos('1', None, True) self.assertIsNone(pos) + @mock.patch('glance.db.simple.api.image_set_property_atomic') @mock.patch.object(glance.api.authorization.TaskFactoryProxy, 'new_task') @mock.patch.object(glance.domain.TaskExecutorFactory, 'new_task_executor') @mock.patch('glance.api.common.get_thread_pool') - def test_image_import(self, mock_gtp, mock_nte, mock_nt): + def test_image_import(self, mock_gtp, mock_nte, mock_nt, mock_spa): request = unit_test_utils.get_fake_request() + image = FakeImage(status='uploading') with mock.patch.object( glance.api.authorization.ImageRepoProxy, 'get') as mock_get: - mock_get.return_value = FakeImage(status='uploading') + mock_get.return_value = image output = self.controller.import_image( request, UUID4, {'method': {'name': 'glance-direct'}}) self.assertEqual(UUID4, output) + # Make sure we set the lock on the image + mock_spa.assert_called_once_with(UUID4, 'os_glance_import_task', + mock_nt.return_value.task_id) + # Make sure we grabbed a thread pool, and that we asked it # to spawn the task's run method with it. mock_gtp.assert_called_once_with('tasks_pool') @@ -2989,12 +2998,14 @@ class TestImagesController(base.IsolatedUnitTest): # a task mock_new_task.assert_not_called() + @mock.patch('glance.db.simple.api.image_set_property_atomic') @mock.patch('glance.context.RequestContext.elevated') @mock.patch.object(glance.domain.TaskFactory, 'new_task') @mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get') def test_image_import_copy_allowed_by_policy(self, mock_get, mock_new_task, mock_elevated, + mock_spa, allowed=True): # NOTE(danms): FakeImage is owned by utils.TENANT1. Try to do the # import as TENANT2, but with a policy exception @@ -3031,6 +3042,150 @@ class TestImagesController(base.IsolatedUnitTest): self.test_image_import_copy_allowed_by_policy, allowed=False) + @mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get') + def test_image_import_locked(self, mock_get): + task = test_tasks_resource._db_fixture(test_tasks_resource.UUID1, + status='pending') + self.db.task_create(None, task) + image = FakeImage(status='uploading') + # Image is locked with a valid task that has not aged out, so + # the lock will not be busted. + image.extra_properties['os_glance_import_task'] = task['id'] + mock_get.return_value = image + + request = unit_test_utils.get_fake_request(tenant=TENANT1) + req_body = {'method': {'name': 'glance-direct'}} + + exc = self.assertRaises(webob.exc.HTTPConflict, + self.controller.import_image, + request, UUID1, req_body) + self.assertEqual('Image has active task', str(exc)) + + @mock.patch('glance.db.simple.api.image_set_property_atomic') + @mock.patch('glance.db.simple.api.image_delete_property_atomic') + @mock.patch.object(glance.api.authorization.TaskFactoryProxy, 'new_task') + @mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get') + def test_image_import_locked_by_reaped_task(self, mock_get, mock_nt, + mock_dpi, mock_spi): + image = FakeImage(status='uploading') + # Image is locked by some other task that TaskRepo will not find + image.extra_properties['os_glance_import_task'] = 'missing' + mock_get.return_value = image + + request = unit_test_utils.get_fake_request(tenant=TENANT1) + req_body = {'method': {'name': 'glance-direct'}} + + mock_nt.return_value.task_id = 'mytask' + self.controller.import_image(request, UUID1, req_body) + + # We should have atomically deleted the missing task lock + mock_dpi.assert_called_once_with(image.id, 'os_glance_import_task', + 'missing') + # We should have atomically grabbed the lock with our task id + mock_spi.assert_called_once_with(image.id, 'os_glance_import_task', + 'mytask') + + @mock.patch('glance.db.simple.api.image_set_property_atomic') + @mock.patch('glance.db.simple.api.image_delete_property_atomic') + @mock.patch.object(glance.api.authorization.TaskFactoryProxy, 'new_task') + @mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get') + def test_image_import_locked_by_bustable_task(self, mock_get, mock_nt, + mock_dpi, mock_spi, + task_status='processing'): + task = test_tasks_resource._db_fixture( + test_tasks_resource.UUID1, + status=task_status) + self.db.task_create(None, task) + image = FakeImage(status='uploading') + # Image is locked by a task in 'processing' state + image.extra_properties['os_glance_import_task'] = task['id'] + mock_get.return_value = image + + request = unit_test_utils.get_fake_request(tenant=TENANT1) + req_body = {'method': {'name': 'glance-direct'}} + + # Task has only been running for ten minutes + time_fixture = fixture.TimeFixture(task['updated_at'] + + datetime.timedelta(minutes=10)) + self.useFixture(time_fixture) + + mock_nt.return_value.task_id = 'mytask' + + # Task holds the lock, API refuses to bust it + self.assertRaises(webob.exc.HTTPConflict, + self.controller.import_image, + request, UUID1, req_body) + mock_dpi.assert_not_called() + mock_spi.assert_not_called() + mock_nt.assert_not_called() + + # Fast forward to 90 minutes from now + time_fixture.advance_time_delta(datetime.timedelta(minutes=90)) + self.controller.import_image(request, UUID1, req_body) + + # API deleted the other task's lock and locked it for us + mock_dpi.assert_called_once_with(image.id, 'os_glance_import_task', + task['id']) + mock_spi.assert_called_once_with(image.id, 'os_glance_import_task', + 'mytask') + + def test_image_import_locked_by_bustable_terminal_task_failure(self): + # Make sure we don't fail with a task status transition error + self.test_image_import_locked_by_bustable_task(task_status='failure') + + def test_image_import_locked_by_bustable_terminal_task_success(self): + # Make sure we don't fail with a task status transition error + self.test_image_import_locked_by_bustable_task(task_status='success') + + def test_bust_import_lock_race_to_delete(self): + image_repo = mock.MagicMock() + task_repo = mock.MagicMock() + image = mock.MagicMock() + task = mock.MagicMock(id='foo') + # Simulate a race where we tried to bust a specific lock and + # someone else already had, and/or re-locked it + image_repo.delete_property_atomic.side_effect = exception.NotFound + self.assertRaises(exception.Conflict, + self.controller._bust_import_lock, + image_repo, task_repo, + image, task, task.id) + + def test_enforce_lock_log_not_bustable(self, task_status='processing'): + task = test_tasks_resource._db_fixture( + test_tasks_resource.UUID1, + status=task_status) + self.db.task_create(None, task) + request = unit_test_utils.get_fake_request(tenant=TENANT1) + image = FakeImage() + image.extra_properties['os_glance_import_task'] = task['id'] + + # Freeze time to make this repeatable + time_fixture = fixture.TimeFixture(task['updated_at'] + + datetime.timedelta(minutes=55)) + self.useFixture(time_fixture) + + expected_expire = 300 + if task_status == 'pending': + # NOTE(danms): Tasks in 'pending' get double the expiry time, + # so we'd be expecting an extra hour here. + expected_expire += 3600 + + with mock.patch.object(glance.api.v2.images, 'LOG') as mock_log: + self.assertRaises(exception.Conflict, + self.controller._enforce_import_lock, + request, image) + mock_log.warning.assert_called_once_with( + 'Image %(image)s has active import task %(task)s in ' + 'status %(status)s; lock remains valid for %(expire)i ' + 'more seconds', + {'image': image.id, + 'task': task['id'], + 'status': task_status, + 'expire': expected_expire}) + + def test_enforce_lock_pending_takes_longer(self): + self.test_enforce_lock_log_not_bustable(task_status='pending') + def test_delete_encryption_key_no_encryption_key(self): request = unit_test_utils.get_fake_request() fake_encryption_key = self.controller._key_manager.store(