Fix api_image_import tasks stuck in 'pending'

The immediate cause was that the api_image_import flow was not
listed in the entrypoints in setup.cfg, so stevedore couldn't find
it.  After the entrypoint was added, the task was going to failure
because it couldn't find the image_id.  So this patch has the
import controller add the image_id to the task_input dict.  Next
the flows were having trouble finding the image_id, so I just
passed it to them.  There's probably a more elegant way to have
taskflow handle this, so this patch could use a refactoring.  On
the plus side, the interoperable image import process does appear
to work with this patch.

NOTE: if you want to test this in devstack, use the stable/ocata
devstack but with the master branch of glance.  The tasks engine
doesn't appear to be running under the Pike devstack configuration.
(Use defect 1712463 if you have an idea what might be the problem.)

Change-Id: Ic68c17f4cb671eb664e2de331787b55fe9878a27
Closes-bug: #1712462
(cherry picked from commit 4366a7493c)
This commit is contained in:
Brian Rosmaita 2017-08-22 23:33:04 -04:00
parent ced2a1a7fd
commit 982016670f
5 changed files with 27 additions and 18 deletions

View File

@ -94,7 +94,7 @@ class ImagesController(object):
executor_factory = self.gateway.get_task_executor_factory(req.context)
task_repo = self.gateway.get_task_repo(req.context)
task_input = {}
task_input = {'image_id': image_id}
try:
import_task = task_factory.new_task(task_type='api_image_import',

View File

@ -124,15 +124,16 @@ class _VerifyStaging(task.Task):
class _ImportToStore(task.Task):
def __init__(self, task_id, task_type, image_repo, uri):
def __init__(self, task_id, task_type, image_repo, uri, image_id):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.uri = uri
self.image_id = image_id
super(_ImportToStore, self).__init__(
name='%s-ImportToStore-%s' % (task_type, task_id))
def execute(self, image_id, file_path=None):
def execute(self, file_path=None):
"""Bringing the imported image to back end store
:param image_id: Glance Image ID
@ -191,7 +192,7 @@ class _ImportToStore(task.Task):
#
# Lets get to it and identify the different scenarios in the
# implementation
image = self.image_repo.get(image_id)
image = self.image_repo.get(self.image_id)
image.status = 'importing'
self.image_repo.save(image)
@ -244,19 +245,20 @@ class _ImportToStore(task.Task):
class _SaveImage(task.Task):
def __init__(self, task_id, task_type, image_repo):
def __init__(self, task_id, task_type, image_repo, image_id):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.image_id = image_id
super(_SaveImage, self).__init__(
name='%s-SaveImage-%s' % (task_type, task_id))
def execute(self, image_id):
def execute(self):
"""Transition image status to active
:param image_id: Glance Image ID
"""
new_image = self.image_repo.get(image_id)
new_image = self.image_repo.get(self.image_id)
if new_image.status == 'saving':
# NOTE(flaper87): THIS IS WRONG!
# we should be doing atomic updates to avoid
@ -268,14 +270,15 @@ class _SaveImage(task.Task):
class _CompleteTask(task.Task):
def __init__(self, task_id, task_type, task_repo):
def __init__(self, task_id, task_type, task_repo, image_id):
self.task_id = task_id
self.task_type = task_type
self.task_repo = task_repo
self.image_id = image_id
super(_CompleteTask, self).__init__(
name='%s-CompleteTask-%s' % (task_type, task_id))
def execute(self, image_id):
def execute(self):
"""Finishing the task flow
:param image_id: Glance Image ID
@ -284,7 +287,7 @@ class _CompleteTask(task.Task):
if task is None:
return
try:
task.succeed({'image_id': image_id})
task.succeed({'image_id': self.image_id})
except Exception as e:
# Note: The message string contains Error in it to indicate
# in the task.message that it's a error message for the user.
@ -326,12 +329,12 @@ def get_flow(**kwargs):
if not uri:
separator = ''
if not CONF.node_staging_uri.endsWith('/'):
if not CONF.node_staging_uri.endswith('/'):
separator = '/'
uri = separator.join((CONF.node_staging_uri, str(image_id)))
flow = lf.flow(task_type, retry=retry.AlwaysRevert())
flow.add(_VerifyStaging(task_id, task_type, uri))
flow = lf.Flow(task_type, retry=retry.AlwaysRevert())
flow.add(_VerifyStaging(task_id, task_type, task_repo, uri))
# TODO(jokke): For the pluggable tasks like image verification or
# image conversion we need to implement the plugin logic here.
@ -340,7 +343,7 @@ def get_flow(**kwargs):
task_type,
image_repo,
uri,
rebind_args={'image_id': image_id})
image_id)
flow.add(import_to_store)
delete_task = lf.Flow(task_type).add(_DeleteFromFS(task_id, task_type))
@ -349,13 +352,13 @@ def get_flow(**kwargs):
save_task = _SaveImage(task_id,
task_type,
image_repo,
rebind_args={'image_id': image_id})
image_id)
flow.add(save_task)
complete_task = _CompleteTask(task_id,
task_type,
task_repo,
rebind_args={'image_id': image_id})
image_id)
flow.add(complete_task)
return flow

View File

@ -126,6 +126,8 @@ class TaskExecutor(glance.async.TaskExecutor):
uri = script_utils.validate_location_uri(
task_input.get('import_from'))
kwds['uri'] = uri
if task.type == 'api_image_import':
kwds['image_id'] = task_input['image_id']
return driver.DriverManager('glance.flows', task.type,
invoke_on_load=True,
invoke_kwds=kwds).driver

View File

@ -56,8 +56,11 @@ def unpack_task_input(task):
task_input = task.task_input
if task_type == 'api_image_import':
if 'import_method' not in task_input:
msg = _("Input does not contain 'import_method'")
if not task_input:
msg = _("Input to api_image_import task is empty.")
raise exception.Invalid(msg)
if 'image_id' not in task_input:
msg = _("Missing required 'image_id' field")
raise exception.Invalid(msg)
else:
for key in ["import_from", "import_from_format", "image_properties"]:

View File

@ -53,6 +53,7 @@ glance.database.metadata_backend =
sqlalchemy = glance.db.sqlalchemy.metadata
glance.flows =
api_image_import = glance.async.flows.api_image_import:get_flow
import = glance.async.flows.base_import:get_flow
glance.flows.import =