Implement time-limited import locking

This attempts to provide a time-based import lock that is dependent
on the task actually making progress. While the task is copying
data, the task message is updated, which in turn touches the task
updated_at time. The API will break any lock after 30 minutes of
no activity on a stalled or dead task. The import taskflow will
check to see if it has lost the lock at any point, and/or if its
task status has changed and abort if so.

The logic in more detail:

1. API locks the image by task-id before we start the task thread, but
   before we return
2. Import thread will check the task-id lock on the image every time it
   tries to modify the image, and if it has changed, will abort
3. The data pipeline will heartbeat the task every minute by updating
   the task.message (bonus: we get some status)
4. If the data pipeline heartbeat ever finds the task state to be changed
   from the expected 'processing' it will abort
5. On task revert or completion, we drop the task-id lock from the image
6. If something ever gets stuck or dies, the heartbeating will stop
7. If the API gets a request for an import where the lock is held, it
   will grab the task by id (in the lock) and check the state and age.
   If the age is sufficiently old (no heartbeating) and the state is
   either 'processing' or terminal, it will mark the task as failed,
   steal the lock, and proceed.

Lots of logging throughout any time we encounter unexpected situations.

Conflicts:
 - Changes due to policy check being missing from ussuri

Closes-Bug: #1884596
Change-Id: Icb3c1d27e9a514d96fca7c1d824fd2183f69d8b3
(cherry picked from commit 3f6e349d08)
This commit is contained in:
Dan Smith 2020-07-28 09:02:13 -07:00
parent 5998933acf
commit 055e5e7902
6 changed files with 824 additions and 63 deletions

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import hashlib
import os
import re
@ -25,6 +26,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils as json
from oslo_utils import encodeutils
from oslo_utils import timeutils as oslo_timeutils
import six
from six.moves import http_client as http
import six.moves.urllib.parse as urlparse
@ -98,6 +100,90 @@ class ImagesController(object):
return image
def _bust_import_lock(self, admin_image_repo, admin_task_repo,
image, task, task_id):
if task:
# FIXME(danms): It would be good if we had a 'canceled' or
# 'aborted' status here.
try:
task.fail('Expired lock preempted')
admin_task_repo.save(task)
except exception.InvalidTaskStatusTransition:
# NOTE(danms): This may happen if we try to fail a
# task that is in a terminal state, but where the lock
# was never dropped from the image. We will log the
# image, task, and status below so we can just ignore
# here.
pass
try:
admin_image_repo.delete_property_atomic(
image, 'os_glance_import_task', task_id)
except exception.NotFound:
LOG.warning('Image %(image)s has stale import task %(task)s '
'but we lost the race to remove it.',
{'image': image.image_id,
'task': task_id})
# We probably lost the race to expire the old lock, but
# act like it is not yet expired to avoid a retry loop.
raise exception.Conflict('Image has active task')
LOG.warning('Image %(image)s has stale import task %(task)s '
'in status %(status)s from %(owner)s; removed lock '
'because it had expired.',
{'image': image.image_id,
'task': task_id,
'status': task and task.status or 'missing',
'owner': task and task.owner or 'unknown owner'})
def _enforce_import_lock(self, req, image):
admin_context = req.context.elevated()
admin_image_repo = self.gateway.get_repo(admin_context)
admin_task_repo = self.gateway.get_task_repo(admin_context)
other_task = image.extra_properties['os_glance_import_task']
expiry = datetime.timedelta(minutes=60)
bustable_states = ('pending', 'processing', 'success', 'failure')
try:
task = admin_task_repo.get(other_task)
except exception.NotFound:
# NOTE(danms): This could happen if we failed to do an import
# a long time ago, and the task record has since been culled from
# the database, but the task id is still in the lock field.
LOG.warning('Image %(image)s has non-existent import '
'task %(task)s; considering it stale',
{'image': image.image_id,
'task': other_task})
task = None
age = 0
else:
age = oslo_timeutils.utcnow() - task.updated_at
if task.status == 'pending':
# NOTE(danms): Tasks in pending state could be queued,
# blocked or otherwise right-about-to-get-going, so we
# double the expiry time for safety. We will report
# time remaining below, so this is not too obscure.
expiry *= 2
if not task or (task.status in bustable_states and age >= expiry):
self._bust_import_lock(admin_image_repo, admin_task_repo,
image, task, other_task)
return
if task.status in bustable_states:
LOG.warning('Image %(image)s has active import task %(task)s in '
'status %(status)s; lock remains valid for %(expire)i '
'more seconds',
{'image': image.image_id,
'task': task.task_id,
'status': task.status,
'expire': (expiry - age).total_seconds()})
else:
LOG.debug('Image %(image)s has import task %(task)s in status '
'%(status)s and does not qualify for expiry.')
raise exception.Conflict('Image has active task')
@utils.mutating
def import_image(self, req, image_id, body):
image_repo = self.gateway.get_repo(req.context)
@ -136,6 +222,11 @@ class ImagesController(object):
raise webob.exc.HTTPForbidden(
explanation=_("Operation not permitted"))
if 'os_glance_import_task' in image.extra_properties:
# NOTE(danms): This will raise exception.Conflict if the
# lock is present and valid, or return if absent or invalid.
self._enforce_import_lock(req, image)
stores = [None]
if CONF.enabled_backends:
try:
@ -195,6 +286,17 @@ class ImagesController(object):
import_task = task_factory.new_task(task_type='api_image_import',
owner=req.context.owner,
task_input=task_input)
# NOTE(danms): Try to grab the lock for this task
try:
image_repo.set_property_atomic(image,
'os_glance_import_task',
import_task.task_id)
except exception.Duplicate:
msg = (_("New operation on image '%s' is not permitted as "
"prior operation is still in progress") % image_id)
raise exception.Conflict(msg)
task_repo.add(import_task)
task_executor = executor_factory.new_task_executor(req.context)
pool = common.get_thread_pool("tasks_eventlet_pool")
@ -715,7 +817,8 @@ class RequestDeserializer(wsgi.JSONRequestDeserializer):
'size', 'virtual_size', 'direct_url', 'self',
'file', 'schema', 'id', 'os_hash_algo',
'os_hash_value')
_reserved_properties = ('location', 'deleted', 'deleted_at')
_reserved_properties = ('location', 'deleted', 'deleted_at',
'os_glance_import_task')
_base_properties = ('checksum', 'created_at', 'container_format',
'disk_format', 'id', 'min_disk', 'min_ram', 'name',
'size', 'virtual_size', 'status', 'tags', 'owner',

View File

@ -101,13 +101,16 @@ class ImportActionWrapper(object):
:param image-id: The ID of the image we should be altering
"""
def __init__(self, image_repo, image_id):
def __init__(self, image_repo, image_id, task_id):
self._image_repo = image_repo
self._image_id = image_id
self._task_id = task_id
def __enter__(self):
self._image = self._image_repo.get(self._image_id)
self._image_previous_status = self._image.status
self._assert_task_lock(self._image)
return _ImportActions(self._image)
def __exit__(self, type, value, traceback):
@ -123,6 +126,43 @@ class ImportActionWrapper(object):
'new_status': self._image.status})
self._image_repo.save(self._image, self._image_previous_status)
@property
def image_id(self):
return self._image_id
def drop_lock_for_task(self):
"""Delete the import lock for our task.
This is an atomic operation and thus does not require a context
for the image save. Note that after calling this method, no
further actions will be allowed on the image.
:raises: NotFound if the image was not locked by the expected task.
"""
image = self._image_repo.get(self._image_id)
self._image_repo.delete_property_atomic(image,
'os_glance_import_task',
self._task_id)
def _assert_task_lock(self, image):
task_lock = image.extra_properties.get('os_glance_import_task')
if task_lock != self._task_id:
LOG.error('Image %(image)s import task %(task)s attempted to '
'take action on image, but other task %(other)s holds '
'the lock; Aborting.',
{'image': self._image_id,
'task': self._task_id,
'other': task_lock})
raise exception.TaskAbortedError()
def assert_task_lock(self):
"""Assert that we own the task lock on the image.
:raises: TaskAbortedError if we do not
"""
image = self._image_repo.get(self._image_id)
self._assert_task_lock(image)
class _ImportActions(object):
"""Actions available for being performed on an image during import.
@ -319,6 +359,41 @@ class _DeleteFromFS(task.Task):
'fn': file_path})
class _ImageLock(task.Task):
def __init__(self, task_id, task_type, action_wrapper):
self.task_id = task_id
self.task_type = task_type
self.action_wrapper = action_wrapper
super(_ImageLock, self).__init__(
name='%s-ImageLock-%s' % (task_type, task_id))
def execute(self):
self.action_wrapper.assert_task_lock()
LOG.debug('Image %(image)s import task %(task)s lock confirmed',
{'image': self.action_wrapper.image_id,
'task': self.task_id})
def revert(self, result, **kwargs):
"""Drop our claim on the image.
If we have failed, we need to drop our import_task lock on the image
so that something else can have a try. Note that we may have been
preempted so we should only drop *our* lock.
"""
try:
self.action_wrapper.drop_lock_for_task()
except exception.NotFound:
LOG.warning('Image %(image)s import task %(task)s lost its '
'lock during execution!',
{'image': self.action_wrapper.image_id,
'task': self.task_id})
else:
LOG.debug('Image %(image)s import task %(task)s dropped '
'its lock after failure',
{'image': self.action_wrapper.image_id,
'task': self.task_id})
class _VerifyStaging(task.Task):
# NOTE(jokke): This could be also for example "staging_path" but to
@ -548,24 +623,17 @@ class _VerifyImageState(task.Task):
class _CompleteTask(task.Task):
def __init__(self, task_id, task_type, task_repo, image_id):
def __init__(self, task_id, task_type, task_repo, action_wrapper):
self.task_id = task_id
self.task_type = task_type
self.task_repo = task_repo
self.image_id = image_id
self.action_wrapper = action_wrapper
super(_CompleteTask, self).__init__(
name='%s-CompleteTask-%s' % (task_type, task_id))
def execute(self):
"""Finishing the task flow
:param image_id: Glance Image ID
"""
task = script_utils.get_task(self.task_repo, self.task_id)
if task is None:
return
def _finish_task(self, task):
try:
task.succeed({'image_id': self.image_id})
task.succeed({'image_id': self.action_wrapper.image_id})
except Exception as e:
# Note: The message string contains Error in it to indicate
# in the task.message that it's a error message for the user.
@ -584,6 +652,28 @@ class _CompleteTask(task.Task):
finally:
self.task_repo.save(task)
def _drop_lock(self):
try:
self.action_wrapper.drop_lock_for_task()
except exception.NotFound:
# NOTE(danms): This would be really bad, but there is probably
# not much point in reverting all the way back if we got this
# far. Log the carnage for forensics.
LOG.error('Image %(image)s import task %(task)s did not hold the '
'lock upon completion!',
{'image': self.action_wrapper.image_id,
'task': self.task_id})
def execute(self):
"""Finishing the task flow
:param image_id: Glance Image ID
"""
task = script_utils.get_task(self.task_repo, self.task_id)
if task is not None:
self._finish_task(task)
self._drop_lock()
LOG.info(_LI("%(task_id)s of %(task_type)s completed"),
{'task_id': self.task_id, 'task_type': self.task_type})
@ -616,7 +706,8 @@ def get_flow(**kwargs):
# Instantiate an action wrapper with the admin repo if we got one,
# otherwise with the regular repo.
action_wrapper = ImportActionWrapper(admin_repo or image_repo, image_id)
action_wrapper = ImportActionWrapper(admin_repo or image_repo, image_id,
task_id)
if not uri and import_method in ['glance-direct', 'copy-image']:
if CONF.enabled_backends:
@ -627,6 +718,8 @@ def get_flow(**kwargs):
flow = lf.Flow(task_type, retry=retry.AlwaysRevert())
flow.add(_ImageLock(task_id, task_type, action_wrapper))
if import_method in ['web-download', 'copy-image']:
internal_plugin = internal_plugins.get_import_plugin(**kwargs)
flow.add(internal_plugin)
@ -678,7 +771,7 @@ def get_flow(**kwargs):
complete_task = _CompleteTask(task_id,
task_type,
task_repo,
image_id)
action_wrapper)
flow.add(complete_task)
with action_wrapper as action:

View File

@ -0,0 +1,268 @@
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import os
from testtools import content as ttc
import textwrap
import time
from unittest import mock
import uuid
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import fixture as time_fixture
import webob
from glance.common import config
from glance.common import wsgi
import glance.db.sqlalchemy.api
from glance.tests import utils as test_utils
import glance_store
LOG = logging.getLogger(__name__)
TENANT1 = str(uuid.uuid4())
CONF = cfg.CONF
class SynchronousAPIBase(test_utils.BaseTestCase):
"""A test base class that provides synchronous calling into the API
without starting a separate server, and with a simple paste
pipeline. Configured with multi-store and a real database.
"""
@mock.patch('oslo_db.sqlalchemy.enginefacade.writer.get_engine')
def setup_database(self, mock_get_engine):
db_file = 'sqlite:///%s/test-%s.db' % (self.test_dir,
uuid.uuid4())
self.config(connection=db_file, group='database')
# NOTE(danms): Make sure that we clear the current global
# database configuration, provision a temporary database file,
# and run migrations with our configuration to define the
# schema there.
glance.db.sqlalchemy.api.clear_db_env()
engine = glance.db.sqlalchemy.api.get_engine()
mock_get_engine.return_value = engine
with mock.patch('logging.config'):
# NOTE(danms): The alembic config in the env module will break our
# BaseTestCase logging setup. So mock that out to prevent it while
# we db_sync.
test_utils.db_sync(engine=engine)
def setup_simple_paste(self):
self.paste_config = os.path.join(self.test_dir, 'glance-api-paste.ini')
with open(self.paste_config, 'w') as f:
f.write(textwrap.dedent("""
[filter:context]
paste.filter_factory = glance.api.middleware.context:\
ContextMiddleware.factory
[filter:fakeauth]
paste.filter_factory = glance.tests.utils:\
FakeAuthMiddleware.factory
[pipeline:glance-api]
pipeline = context rootapp
[composite:rootapp]
paste.composite_factory = glance.api:root_app_factory
/v2: apiv2app
[app:apiv2app]
paste.app_factory = glance.api.v2.router:API.factory
"""))
def _store_dir(self, store):
return os.path.join(self.test_dir, store)
def setup_stores(self):
self.config(enabled_backends={'store1': 'file', 'store2': 'file'})
glance_store.register_store_opts(CONF,
reserved_stores=wsgi.RESERVED_STORES)
self.config(default_backend='store1',
group='glance_store')
self.config(filesystem_store_datadir=self._store_dir('store1'),
group='store1')
self.config(filesystem_store_datadir=self._store_dir('store2'),
group='store2')
self.config(filesystem_store_datadir=self._store_dir('staging'),
group='os_glance_staging_store')
glance_store.create_multi_stores(CONF,
reserved_stores=wsgi.RESERVED_STORES)
glance_store.verify_store()
def setUp(self):
super(SynchronousAPIBase, self).setUp()
self.setup_database()
self.setup_simple_paste()
self.setup_stores()
def start_server(self):
config.set_config_defaults()
self.api = config.load_paste_app('glance-api',
conf_file=self.paste_config)
def _headers(self, custom_headers=None):
base_headers = {
'X-Identity-Status': 'Confirmed',
'X-Auth-Token': '932c5c84-02ac-4fe5-a9ba-620af0e2bb96',
'X-User-Id': 'f9a41d13-0c13-47e9-bee2-ce4e8bfe958e',
'X-Tenant-Id': TENANT1,
'Content-Type': 'application/json',
'X-Roles': 'admin',
}
base_headers.update(custom_headers or {})
return base_headers
def api_get(self, url, headers=None):
headers = self._headers(headers)
req = webob.Request.blank(url, method='GET',
headers=headers)
return self.api(req)
def api_post(self, url, data=None, json=None, headers=None):
headers = self._headers(headers)
req = webob.Request.blank(url, method='POST',
headers=headers)
if json and not data:
data = jsonutils.dumps(json).encode()
headers['Content-Type'] = 'application/json'
if data:
req.body = data
LOG.debug(req.as_bytes())
return self.api(req)
def api_put(self, url, data=None, json=None, headers=None):
headers = self._headers(headers)
req = webob.Request.blank(url, method='PUT',
headers=headers)
if json and not data:
data = jsonutils.dumps(json).encode()
if data:
req.body = data
return self.api(req)
class TestImageImportLocking(SynchronousAPIBase):
def _import_copy(self, image_id, stores):
"""Do an import of image_id to the given stores."""
body = {'method': {'name': 'copy-image'},
'stores': stores,
'all_stores': False}
return self.api_post(
'/v2/images/%s/import' % image_id,
json=body)
def _create_and_import(self, stores=[]):
"""Create an image, stage data, and import into the given stores.
:returns: image_id
"""
resp = self.api_post('/v2/images',
json={'name': 'foo',
'container_format': 'bare',
'disk_format': 'raw'})
image = jsonutils.loads(resp.text)
resp = self.api_put(
'/v2/images/%s/stage' % image['id'],
headers={'Content-Type': 'application/octet-stream'},
data=b'IMAGEDATA')
self.assertEqual(204, resp.status_code)
body = {'method': {'name': 'glance-direct'}}
if stores:
body['stores'] = stores
resp = self.api_post(
'/v2/images/%s/import' % image['id'],
json=body)
self.assertEqual(202, resp.status_code)
# Make sure it goes active
for i in range(0, 10):
image = self.api_get('/v2/images/%s' % image['id']).json
if not image.get('os_glance_import_task'):
break
self.addDetail('Create-Import task id',
ttc.text_content(image['os_glance_import_task']))
time.sleep(1)
self.assertEqual('active', image['status'])
return image['id']
def _test_import_copy(self, warp_time=False):
self.start_server()
state = {}
# Create and import an image with no pipeline stall
image_id = self._create_and_import(stores=['store1'])
# Set up a fake data pipeline that will stall until we are ready
# to unblock it
def slow_fake_set_data(data_iter, backend=None, set_active=True):
while True:
state['running'] = True
time.sleep(0.1)
# Constrain oslo timeutils time so we can manipulate it
tf = time_fixture.TimeFixture()
self.useFixture(tf)
# Turn on the delayed data pipeline and start a copy-image
# import which will hang out for a while
with mock.patch('glance.domain.proxy.Image.set_data') as mock_sd:
mock_sd.side_effect = slow_fake_set_data
resp = self._import_copy(image_id, ['store2'])
self.addDetail('First import response',
ttc.text_content(str(resp)))
self.assertEqual(202, resp.status_code)
# Wait to make sure the data stream gets started
for i in range(0, 10):
if state:
break
time.sleep(0.1)
# Make sure the first import got to the point where the
# hanging loop will hold it in processing state
self.assertTrue(state.get('running', False),
'slow_fake_set_data() never ran')
# If we're warping time, then advance the clock by two hours
if warp_time:
tf.advance_time_delta(datetime.timedelta(hours=2))
# Try a second copy-image import. If we are warping time,
# expect the lock to be busted. If not, then we should get
# a 409 Conflict.
resp = self._import_copy(image_id, ['store2'])
self.addDetail('Second import response',
ttc.text_content(str(resp)))
if warp_time:
self.assertEqual(202, resp.status_code)
else:
self.assertEqual(409, resp.status_code)
def test_import_copy_locked(self):
self._test_import_copy(warp_time=False)
def test_import_copy_bust_lock(self):
self._test_import_copy(warp_time=True)

View File

@ -63,6 +63,8 @@ class TestApiImageImportTask(test_utils.BaseTestCase):
self.mock_task_repo = mock.MagicMock()
self.mock_image_repo = mock.MagicMock()
self.mock_image_repo.get.return_value.extra_properties = {
'os_glance_import_task': TASK_ID1}
@mock.patch('glance.async_.flows.api_image_import._VerifyStaging.__init__')
@mock.patch('taskflow.patterns.linear_flow.Flow.add')
@ -102,6 +104,61 @@ class TestApiImageImportTask(test_utils.BaseTestCase):
import_req=self.gd_task_input['import_req'])
class TestImageLock(test_utils.BaseTestCase):
def setUp(self):
super(TestImageLock, self).setUp()
self.img_repo = mock.MagicMock()
@mock.patch('glance.async_.flows.api_image_import.LOG')
def test_execute_confirms_lock(self, mock_log):
self.img_repo.get.return_value.extra_properties = {
'os_glance_import_task': TASK_ID1}
wrapper = import_flow.ImportActionWrapper(self.img_repo, IMAGE_ID1,
TASK_ID1)
imagelock = import_flow._ImageLock(TASK_ID1, TASK_TYPE, wrapper)
imagelock.execute()
mock_log.debug.assert_called_once_with('Image %(image)s import task '
'%(task)s lock confirmed',
{'image': IMAGE_ID1,
'task': TASK_ID1})
@mock.patch('glance.async_.flows.api_image_import.LOG')
def test_execute_confirms_lock_not_held(self, mock_log):
wrapper = import_flow.ImportActionWrapper(self.img_repo, IMAGE_ID1,
TASK_ID1)
imagelock = import_flow._ImageLock(TASK_ID1, TASK_TYPE, wrapper)
self.assertRaises(exception.TaskAbortedError,
imagelock.execute)
@mock.patch('glance.async_.flows.api_image_import.LOG')
def test_revert_drops_lock(self, mock_log):
wrapper = import_flow.ImportActionWrapper(self.img_repo, IMAGE_ID1,
TASK_ID1)
imagelock = import_flow._ImageLock(TASK_ID1, TASK_TYPE, wrapper)
with mock.patch.object(wrapper, 'drop_lock_for_task') as mock_drop:
imagelock.revert(None)
mock_drop.assert_called_once_with()
mock_log.debug.assert_called_once_with('Image %(image)s import task '
'%(task)s dropped its lock '
'after failure',
{'image': IMAGE_ID1,
'task': TASK_ID1})
@mock.patch('glance.async_.flows.api_image_import.LOG')
def test_revert_drops_lock_missing(self, mock_log):
wrapper = import_flow.ImportActionWrapper(self.img_repo, IMAGE_ID1,
TASK_ID1)
imagelock = import_flow._ImageLock(TASK_ID1, TASK_TYPE, wrapper)
with mock.patch.object(wrapper, 'drop_lock_for_task') as mock_drop:
mock_drop.side_effect = exception.NotFound()
imagelock.revert(None)
mock_log.warning.assert_called_once_with('Image %(image)s import task '
'%(task)s lost its lock '
'during execution!',
{'image': IMAGE_ID1,
'task': TASK_ID1})
class TestImportToStoreTask(test_utils.BaseTestCase):
def setUp(self):
@ -137,7 +194,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
img_repo = mock.MagicMock()
img_repo.get.return_value = image
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1,
TASK_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, wrapper,
"http://url",
@ -157,7 +215,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
img_repo = mock.MagicMock()
img_repo.get.return_value = image
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1,
TASK_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, wrapper,
"http://url",
@ -177,7 +236,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
img_repo = mock.MagicMock()
img_repo.get.return_value = image
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1,
TASK_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, wrapper,
"http://url",
@ -198,7 +258,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
task_repo.get.return_value.status = 'processing'
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1,
TASK_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, wrapper,
"http://url",
@ -250,7 +311,8 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
def test_raises_when_image_deleted(self):
img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1,
TASK_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, wrapper,
"http://url",
@ -265,13 +327,15 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
def test_remove_store_from_property(self, mock_import):
img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1,
TASK_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, wrapper,
"http://url",
"store1", True,
True)
extra_properties = {"os_glance_importing_to_stores": "store1,store2"}
extra_properties = {"os_glance_importing_to_stores": "store1,store2",
"os_glance_import_task": TASK_ID1}
image = self.img_factory.new_image(image_id=UUID1,
extra_properties=extra_properties)
img_repo.get.return_value = image
@ -282,13 +346,15 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
def test_revert_updates_status_keys(self):
img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1,
TASK_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, wrapper,
"http://url",
"store1", True,
True)
extra_properties = {"os_glance_importing_to_stores": "store1,store2"}
extra_properties = {"os_glance_importing_to_stores": "store1,store2",
"os_glance_import_task": TASK_ID1}
image = self.img_factory.new_image(image_id=UUID1,
extra_properties=extra_properties)
img_repo.get.return_value = image
@ -313,13 +379,16 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
def test_raises_when_all_stores_must_succeed(self, mock_import):
img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1,
TASK_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, wrapper,
"http://url",
"store1", True,
True)
image = self.img_factory.new_image(image_id=UUID1)
extra_properties = {'os_glance_import_task': TASK_ID1}
image = self.img_factory.new_image(image_id=UUID1,
extra_properties=extra_properties)
img_repo.get.return_value = image
mock_import.set_image_data.side_effect = \
cursive_exception.SignatureVerificationError(
@ -331,13 +400,16 @@ class TestImportToStoreTask(test_utils.BaseTestCase):
def test_doesnt_raise_when_not_all_stores_must_succeed(self, mock_import):
img_repo = mock.MagicMock()
task_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1)
wrapper = import_flow.ImportActionWrapper(img_repo, IMAGE_ID1,
TASK_ID1)
image_import = import_flow._ImportToStore(TASK_ID1, TASK_TYPE,
task_repo, wrapper,
"http://url",
"store1", False,
True)
image = self.img_factory.new_image(image_id=UUID1)
extra_properties = {'os_glance_import_task': TASK_ID1}
image = self.img_factory.new_image(image_id=UUID1,
extra_properties=extra_properties)
img_repo.get.return_value = image
mock_import.set_image_data.side_effect = \
cursive_exception.SignatureVerificationError(
@ -455,10 +527,13 @@ class TestDeleteFromFS(test_utils.BaseTestCase):
class TestVerifyImageStateTask(test_utils.BaseTestCase):
def test_verify_active_status(self):
fake_img = mock.MagicMock(status='active')
fake_img = mock.MagicMock(status='active',
extra_properties={
'os_glance_import_task': TASK_ID1})
mock_repo = mock.MagicMock()
mock_repo.get.return_value = fake_img
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1)
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1,
TASK_ID1)
task = import_flow._VerifyImageState(TASK_ID1, TASK_TYPE,
wrapper, 'anything!')
@ -509,7 +584,7 @@ class TestImportCopyImageTask(test_utils.BaseTestCase):
fake_img = mock.MagicMock()
fake_img.id = IMAGE_ID1
fake_img.status = 'active'
fake_img.extra_properties = {}
fake_img.extra_properties = {'os_glance_import_task': TASK_ID1}
admin_repo.get.return_value = fake_img
import_flow.get_flow(task_id=TASK_ID1,
@ -531,7 +606,10 @@ class TestImportCopyImageTask(test_utils.BaseTestCase):
class TestImportActionWrapper(test_utils.BaseTestCase):
def test_wrapper_success(self):
mock_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1)
mock_repo.get.return_value.extra_properties = {
'os_glance_import_task': TASK_ID1}
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1,
TASK_ID1)
with wrapper as action:
self.assertIsInstance(action, import_flow._ImportActions)
mock_repo.get.assert_called_once_with(IMAGE_ID1)
@ -541,7 +619,10 @@ class TestImportActionWrapper(test_utils.BaseTestCase):
def test_wrapper_failure(self):
mock_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1)
mock_repo.get.return_value.extra_properties = {
'os_glance_import_task': TASK_ID1}
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1,
TASK_ID1)
class SpecificError(Exception):
pass
@ -561,7 +642,9 @@ class TestImportActionWrapper(test_utils.BaseTestCase):
def test_wrapper_logs_status(self, mock_log):
mock_repo = mock.MagicMock()
mock_image = mock_repo.get.return_value
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1)
mock_image.extra_properties = {'os_glance_import_task': TASK_ID1}
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1,
TASK_ID1)
mock_image.status = 'foo'
with wrapper as action:
@ -575,6 +658,61 @@ class TestImportActionWrapper(test_utils.BaseTestCase):
'new_status': 'bar'})
self.assertEqual('bar', mock_image.status)
def test_image_id_property(self):
mock_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1,
TASK_ID1)
self.assertEqual(IMAGE_ID1, wrapper.image_id)
def test_drop_lock_for_task(self):
mock_repo = mock.MagicMock()
mock_repo.get.return_value.extra_properties = {
'os_glance_import_task': TASK_ID1}
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1,
TASK_ID1)
wrapper.drop_lock_for_task()
mock_repo.delete_property_atomic.assert_called_once_with(
mock_repo.get.return_value, 'os_glance_import_task', TASK_ID1)
def test_assert_task_lock(self):
mock_repo = mock.MagicMock()
mock_repo.get.return_value.extra_properties = {
'os_glance_import_task': TASK_ID1}
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1,
TASK_ID1)
wrapper.assert_task_lock()
# Try again with a different task ID and it should fail
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1,
'foo')
self.assertRaises(exception.TaskAbortedError,
wrapper.assert_task_lock)
def _grab_image(self, wrapper):
with wrapper:
pass
@mock.patch.object(import_flow, 'LOG')
def test_check_task_lock(self, mock_log):
mock_repo = mock.MagicMock()
wrapper = import_flow.ImportActionWrapper(mock_repo, IMAGE_ID1,
TASK_ID1)
image = mock.MagicMock(image_id=IMAGE_ID1)
image.extra_properties = {'os_glance_import_task': TASK_ID1}
mock_repo.get.return_value = image
self._grab_image(wrapper)
mock_log.error.assert_not_called()
image.extra_properties['os_glance_import_task'] = 'somethingelse'
self.assertRaises(exception.TaskAbortedError,
self._grab_image, wrapper)
mock_log.error.assert_called_once_with(
'Image %(image)s import task %(task)s attempted to take action on '
'image, but other task %(other)s holds the lock; Aborting.',
{'image': image.image_id,
'task': TASK_ID1,
'other': 'somethingelse'})
class TestImportActions(test_utils.BaseTestCase):
def setUp(self):
@ -767,30 +905,48 @@ class TestCompleteTask(test_utils.BaseTestCase):
super(TestCompleteTask, self).setUp()
self.task_repo = mock.MagicMock()
self.task = mock.MagicMock()
self.wrapper = mock.MagicMock(image_id=IMAGE_ID1)
def test_execute(self, mock_get_task):
complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE,
self.task_repo, IMAGE_ID1)
self.task_repo, self.wrapper)
mock_get_task.return_value = self.task
complete.execute()
mock_get_task.assert_called_once_with(self.task_repo,
TASK_ID1)
self.task.succeed.assert_called_once_with({'image_id': IMAGE_ID1})
self.task_repo.save.assert_called_once_with(self.task)
self.wrapper.drop_lock_for_task.assert_called_once_with()
def test_execute_no_task(self, mock_get_task):
mock_get_task.return_value = None
complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE,
self.task_repo, IMAGE_ID1)
self.task_repo, self.wrapper)
complete.execute()
self.task_repo.save.assert_not_called()
self.wrapper.drop_lock_for_task.assert_called_once_with()
def test_execute_succeed_fails(self, mock_get_task):
mock_get_task.return_value = self.task
self.task.succeed.side_effect = Exception('testing')
complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE,
self.task_repo, IMAGE_ID1)
self.task_repo, self.wrapper)
complete.execute()
self.task.fail.assert_called_once_with(
_('Error: <class \'Exception\'>: testing'))
self.task_repo.save.assert_called_once_with(self.task)
self.wrapper.drop_lock_for_task.assert_called_once_with()
def test_execute_drop_lock_fails(self, mock_get_task):
mock_get_task.return_value = self.task
self.wrapper.drop_lock_for_task.side_effect = exception.NotFound()
complete = import_flow._CompleteTask(TASK_ID1, TASK_TYPE,
self.task_repo, self.wrapper)
with mock.patch('glance.async_.flows.api_image_import.LOG') as m_log:
complete.execute()
m_log.error.assert_called_once_with('Image %(image)s import task '
'%(task)s did not hold the '
'lock upon completion!',
{'image': IMAGE_ID1,
'task': TASK_ID1})
self.task.succeed.assert_called_once_with({'image_id': IMAGE_ID1})

View File

@ -78,13 +78,15 @@ class TestImportTaskFlow(test_utils.BaseTestCase):
def _get_flow(self, import_req=None):
inputs = {
'task_id': mock.MagicMock(),
'task_id': mock.sentinel.task_id,
'task_type': mock.MagicMock(),
'task_repo': mock.MagicMock(),
'image_repo': mock.MagicMock(),
'image_id': mock.MagicMock(),
'import_req': import_req or mock.MagicMock()
}
inputs['image_repo'].get.return_value = mock.MagicMock(
extra_properties={'os_glance_import_task': mock.sentinel.task_id})
flow = api_image_import.get_flow(**inputs)
return flow
@ -106,8 +108,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase):
flow = self._get_flow()
flow_comp = self._get_flow_tasks(flow)
# assert flow has 5 tasks
self.assertEqual(5, len(flow_comp))
# assert flow has 6 tasks
self.assertEqual(6, len(flow_comp))
for c in self.base_flow:
self.assertIn(c, flow_comp)
@ -125,8 +127,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase):
flow = self._get_flow(import_req=import_req)
flow_comp = self._get_flow_tasks(flow)
# assert flow has 6 tasks
self.assertEqual(6, len(flow_comp))
# assert flow has 7 tasks
self.assertEqual(7, len(flow_comp))
for c in self.base_flow:
self.assertIn(c, flow_comp)
self.assertIn('WebDownload', flow_comp)
@ -147,8 +149,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase):
flow = self._get_flow(import_req=import_req)
flow_comp = self._get_flow_tasks(flow)
# assert flow has 6 tasks
self.assertEqual(6, len(flow_comp))
# assert flow has 7 tasks
self.assertEqual(7, len(flow_comp))
for c in self.base_flow:
self.assertIn(c, flow_comp)
self.assertIn('CopyImage', flow_comp)
@ -164,8 +166,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase):
flow = self._get_flow()
flow_comp = self._get_flow_tasks(flow)
# assert flow has 8 tasks (base_flow + plugins)
self.assertEqual(8, len(flow_comp))
# assert flow has 9 tasks (base_flow + plugins)
self.assertEqual(9, len(flow_comp))
for c in self.base_flow:
self.assertIn(c, flow_comp)
for c in self.import_plugins:
@ -192,8 +194,8 @@ class TestImportTaskFlow(test_utils.BaseTestCase):
flow = self._get_flow(import_req=import_req)
flow_comp = self._get_flow_tasks(flow)
# assert flow has 6 tasks
self.assertEqual(6, len(flow_comp))
# assert flow has 7 tasks
self.assertEqual(7, len(flow_comp))
for c in self.base_flow:
self.assertIn(c, flow_comp)
self.assertIn('CopyImage', flow_comp)

View File

@ -24,6 +24,7 @@ from castellan.common import exception as castellan_exception
import glance_store as store
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import fixture
import six
from six.moves import http_client as http
# NOTE(jokke): simplified transition to py3, behaves like py2 xrange
@ -40,6 +41,7 @@ import glance.schema
from glance.tests.unit import base
from glance.tests.unit.keymgr import fake as fake_keymgr
import glance.tests.unit.utils as unit_test_utils
from glance.tests.unit.v2 import test_tasks_resource
import glance.tests.utils as test_utils
CONF = cfg.CONF
@ -746,7 +748,8 @@ class TestImagesController(base.IsolatedUnitTest):
self.controller.import_image, request, UUID4,
{'method': {'name': 'glance-direct'}})
def test_image_import_raises_bad_request(self):
@mock.patch('glance.db.simple.api.image_set_property_atomic')
def test_image_import_raises_bad_request(self, mock_spa):
request = unit_test_utils.get_fake_request()
with mock.patch.object(
glance.api.authorization.ImageRepoProxy, 'get') as mock_get:
@ -2955,30 +2958,166 @@ class TestImagesController(base.IsolatedUnitTest):
pos = self.controller._get_locations_op_pos('1', None, True)
self.assertIsNone(pos)
def test_image_import(self):
@mock.patch('glance.db.simple.api.image_set_property_atomic')
@mock.patch.object(glance.domain.TaskFactory, 'new_task')
def test_image_import(self, mock_nt, mock_spa):
request = unit_test_utils.get_fake_request()
image = FakeImage(status='uploading')
with mock.patch.object(
glance.api.authorization.ImageRepoProxy, 'get') as mock_get:
mock_get.return_value = FakeImage(status='uploading')
mock_get.return_value = image
output = self.controller.import_image(
request, UUID4, {'method': {'name': 'glance-direct'}})
self.assertEqual(UUID4, output)
@mock.patch.object(glance.domain.TaskFactory, 'new_task')
# Make sure we set the lock on the image
mock_spa.assert_called_once_with(UUID4, 'os_glance_import_task',
mock_nt.return_value.task_id)
@mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get')
def test_image_import_not_allowed(self, mock_get, mock_new_task):
# NOTE(danms): FakeImage is owned by utils.TENANT1. Try to do the
# import as TENANT2 and we should get an HTTPForbidden
request = unit_test_utils.get_fake_request(tenant=TENANT2)
mock_get.return_value = FakeImage(status='uploading')
self.assertRaises(webob.exc.HTTPForbidden,
def test_image_import_locked(self, mock_get):
task = test_tasks_resource._db_fixture(test_tasks_resource.UUID1,
status='pending')
self.db.task_create(None, task)
image = FakeImage(status='uploading')
# Image is locked with a valid task that has not aged out, so
# the lock will not be busted.
image.extra_properties['os_glance_import_task'] = task['id']
mock_get.return_value = image
request = unit_test_utils.get_fake_request(tenant=TENANT1)
req_body = {'method': {'name': 'glance-direct'}}
exc = self.assertRaises(webob.exc.HTTPConflict,
self.controller.import_image,
request, UUID1, req_body)
self.assertEqual('Image has active task', str(exc))
@mock.patch('glance.db.simple.api.image_set_property_atomic')
@mock.patch('glance.db.simple.api.image_delete_property_atomic')
@mock.patch.object(glance.api.authorization.TaskFactoryProxy, 'new_task')
@mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get')
def test_image_import_locked_by_reaped_task(self, mock_get, mock_nt,
mock_dpi, mock_spi):
image = FakeImage(status='uploading')
# Image is locked by some other task that TaskRepo will not find
image.extra_properties['os_glance_import_task'] = 'missing'
mock_get.return_value = image
request = unit_test_utils.get_fake_request(tenant=TENANT1)
req_body = {'method': {'name': 'glance-direct'}}
mock_nt.return_value.task_id = 'mytask'
self.controller.import_image(request, UUID1, req_body)
# We should have atomically deleted the missing task lock
mock_dpi.assert_called_once_with(image.id, 'os_glance_import_task',
'missing')
# We should have atomically grabbed the lock with our task id
mock_spi.assert_called_once_with(image.id, 'os_glance_import_task',
'mytask')
@mock.patch('glance.db.simple.api.image_set_property_atomic')
@mock.patch('glance.db.simple.api.image_delete_property_atomic')
@mock.patch.object(glance.api.authorization.TaskFactoryProxy, 'new_task')
@mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get')
def test_image_import_locked_by_bustable_task(self, mock_get, mock_nt,
mock_dpi, mock_spi,
task_status='processing'):
task = test_tasks_resource._db_fixture(
test_tasks_resource.UUID1,
status=task_status)
self.db.task_create(None, task)
image = FakeImage(status='uploading')
# Image is locked by a task in 'processing' state
image.extra_properties['os_glance_import_task'] = task['id']
mock_get.return_value = image
request = unit_test_utils.get_fake_request(tenant=TENANT1)
req_body = {'method': {'name': 'glance-direct'}}
# Task has only been running for ten minutes
time_fixture = fixture.TimeFixture(task['updated_at'] +
datetime.timedelta(minutes=10))
self.useFixture(time_fixture)
mock_nt.return_value.task_id = 'mytask'
# Task holds the lock, API refuses to bust it
self.assertRaises(webob.exc.HTTPConflict,
self.controller.import_image,
request, UUID4, {'method': {'name':
'glance-direct'}})
# NOTE(danms): Make sure we failed early and never even created
# a task
mock_new_task.assert_not_called()
request, UUID1, req_body)
mock_dpi.assert_not_called()
mock_spi.assert_not_called()
mock_nt.assert_not_called()
# Fast forward to 90 minutes from now
time_fixture.advance_time_delta(datetime.timedelta(minutes=90))
self.controller.import_image(request, UUID1, req_body)
# API deleted the other task's lock and locked it for us
mock_dpi.assert_called_once_with(image.id, 'os_glance_import_task',
task['id'])
mock_spi.assert_called_once_with(image.id, 'os_glance_import_task',
'mytask')
def test_image_import_locked_by_bustable_terminal_task_failure(self):
# Make sure we don't fail with a task status transition error
self.test_image_import_locked_by_bustable_task(task_status='failure')
def test_image_import_locked_by_bustable_terminal_task_success(self):
# Make sure we don't fail with a task status transition error
self.test_image_import_locked_by_bustable_task(task_status='success')
def test_bust_import_lock_race_to_delete(self):
image_repo = mock.MagicMock()
task_repo = mock.MagicMock()
image = mock.MagicMock()
task = mock.MagicMock(id='foo')
# Simulate a race where we tried to bust a specific lock and
# someone else already had, and/or re-locked it
image_repo.delete_property_atomic.side_effect = exception.NotFound
self.assertRaises(exception.Conflict,
self.controller._bust_import_lock,
image_repo, task_repo,
image, task, task.id)
def test_enforce_lock_log_not_bustable(self, task_status='processing'):
task = test_tasks_resource._db_fixture(
test_tasks_resource.UUID1,
status=task_status)
self.db.task_create(None, task)
request = unit_test_utils.get_fake_request(tenant=TENANT1)
image = FakeImage()
image.extra_properties['os_glance_import_task'] = task['id']
# Freeze time to make this repeatable
time_fixture = fixture.TimeFixture(task['updated_at'] +
datetime.timedelta(minutes=55))
self.useFixture(time_fixture)
expected_expire = 300
if task_status == 'pending':
# NOTE(danms): Tasks in 'pending' get double the expiry time,
# so we'd be expecting an extra hour here.
expected_expire += 3600
with mock.patch.object(glance.api.v2.images, 'LOG') as mock_log:
self.assertRaises(exception.Conflict,
self.controller._enforce_import_lock,
request, image)
mock_log.warning.assert_called_once_with(
'Image %(image)s has active import task %(task)s in '
'status %(status)s; lock remains valid for %(expire)i '
'more seconds',
{'image': image.id,
'task': task['id'],
'status': task_status,
'expire': expected_expire})
def test_enforce_lock_pending_takes_longer(self):
self.test_enforce_lock_log_not_bustable(task_status='pending')
def test_delete_encryption_key_no_encryption_key(self):
request = unit_test_utils.get_fake_request()