Fix api_image_import tasks stuck in 'pending'
The immediate cause was that the api_image_import flow was not
listed in the entrypoints in setup.cfg, so stevedore couldn't find
it. After the entrypoint was added, the task was going to failure
because it couldn't find the image_id. So this patch has the
import controller add the image_id to the task_input dict. Next
the flows were having trouble finding the image_id, so I just
passed it to them. There's probably a more elegant way to have
taskflow handle this, so this patch could use a refactoring. On
the plus side, the interoperable image import process does appear
to work with this patch.
NOTE: if you want to test this in devstack, use the stable/ocata
devstack but with the master branch of glance. The tasks engine
doesn't appear to be running under the Pike devstack configuration.
(Use defect 1712463 if you have an idea what might be the problem.)
Change-Id: Ic68c17f4cb671eb664e2de331787b55fe9878a27
Closes-bug: #1712462
(cherry picked from commit 4366a7493c
)
This commit is contained in:
parent
ced2a1a7fd
commit
982016670f
|
@ -94,7 +94,7 @@ class ImagesController(object):
|
|||
executor_factory = self.gateway.get_task_executor_factory(req.context)
|
||||
task_repo = self.gateway.get_task_repo(req.context)
|
||||
|
||||
task_input = {}
|
||||
task_input = {'image_id': image_id}
|
||||
|
||||
try:
|
||||
import_task = task_factory.new_task(task_type='api_image_import',
|
||||
|
|
|
@ -124,15 +124,16 @@ class _VerifyStaging(task.Task):
|
|||
|
||||
class _ImportToStore(task.Task):
|
||||
|
||||
def __init__(self, task_id, task_type, image_repo, uri):
|
||||
def __init__(self, task_id, task_type, image_repo, uri, image_id):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.image_repo = image_repo
|
||||
self.uri = uri
|
||||
self.image_id = image_id
|
||||
super(_ImportToStore, self).__init__(
|
||||
name='%s-ImportToStore-%s' % (task_type, task_id))
|
||||
|
||||
def execute(self, image_id, file_path=None):
|
||||
def execute(self, file_path=None):
|
||||
"""Bringing the imported image to back end store
|
||||
|
||||
:param image_id: Glance Image ID
|
||||
|
@ -191,7 +192,7 @@ class _ImportToStore(task.Task):
|
|||
#
|
||||
# Lets get to it and identify the different scenarios in the
|
||||
# implementation
|
||||
image = self.image_repo.get(image_id)
|
||||
image = self.image_repo.get(self.image_id)
|
||||
image.status = 'importing'
|
||||
self.image_repo.save(image)
|
||||
|
||||
|
@ -244,19 +245,20 @@ class _ImportToStore(task.Task):
|
|||
|
||||
class _SaveImage(task.Task):
|
||||
|
||||
def __init__(self, task_id, task_type, image_repo):
|
||||
def __init__(self, task_id, task_type, image_repo, image_id):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.image_repo = image_repo
|
||||
self.image_id = image_id
|
||||
super(_SaveImage, self).__init__(
|
||||
name='%s-SaveImage-%s' % (task_type, task_id))
|
||||
|
||||
def execute(self, image_id):
|
||||
def execute(self):
|
||||
"""Transition image status to active
|
||||
|
||||
:param image_id: Glance Image ID
|
||||
"""
|
||||
new_image = self.image_repo.get(image_id)
|
||||
new_image = self.image_repo.get(self.image_id)
|
||||
if new_image.status == 'saving':
|
||||
# NOTE(flaper87): THIS IS WRONG!
|
||||
# we should be doing atomic updates to avoid
|
||||
|
@ -268,14 +270,15 @@ class _SaveImage(task.Task):
|
|||
|
||||
class _CompleteTask(task.Task):
|
||||
|
||||
def __init__(self, task_id, task_type, task_repo):
|
||||
def __init__(self, task_id, task_type, task_repo, image_id):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.task_repo = task_repo
|
||||
self.image_id = image_id
|
||||
super(_CompleteTask, self).__init__(
|
||||
name='%s-CompleteTask-%s' % (task_type, task_id))
|
||||
|
||||
def execute(self, image_id):
|
||||
def execute(self):
|
||||
"""Finishing the task flow
|
||||
|
||||
:param image_id: Glance Image ID
|
||||
|
@ -284,7 +287,7 @@ class _CompleteTask(task.Task):
|
|||
if task is None:
|
||||
return
|
||||
try:
|
||||
task.succeed({'image_id': image_id})
|
||||
task.succeed({'image_id': self.image_id})
|
||||
except Exception as e:
|
||||
# Note: The message string contains Error in it to indicate
|
||||
# in the task.message that it's a error message for the user.
|
||||
|
@ -326,12 +329,12 @@ def get_flow(**kwargs):
|
|||
|
||||
if not uri:
|
||||
separator = ''
|
||||
if not CONF.node_staging_uri.endsWith('/'):
|
||||
if not CONF.node_staging_uri.endswith('/'):
|
||||
separator = '/'
|
||||
uri = separator.join((CONF.node_staging_uri, str(image_id)))
|
||||
|
||||
flow = lf.flow(task_type, retry=retry.AlwaysRevert())
|
||||
flow.add(_VerifyStaging(task_id, task_type, uri))
|
||||
flow = lf.Flow(task_type, retry=retry.AlwaysRevert())
|
||||
flow.add(_VerifyStaging(task_id, task_type, task_repo, uri))
|
||||
|
||||
# TODO(jokke): For the pluggable tasks like image verification or
|
||||
# image conversion we need to implement the plugin logic here.
|
||||
|
@ -340,7 +343,7 @@ def get_flow(**kwargs):
|
|||
task_type,
|
||||
image_repo,
|
||||
uri,
|
||||
rebind_args={'image_id': image_id})
|
||||
image_id)
|
||||
flow.add(import_to_store)
|
||||
|
||||
delete_task = lf.Flow(task_type).add(_DeleteFromFS(task_id, task_type))
|
||||
|
@ -349,13 +352,13 @@ def get_flow(**kwargs):
|
|||
save_task = _SaveImage(task_id,
|
||||
task_type,
|
||||
image_repo,
|
||||
rebind_args={'image_id': image_id})
|
||||
image_id)
|
||||
flow.add(save_task)
|
||||
|
||||
complete_task = _CompleteTask(task_id,
|
||||
task_type,
|
||||
task_repo,
|
||||
rebind_args={'image_id': image_id})
|
||||
image_id)
|
||||
flow.add(complete_task)
|
||||
|
||||
return flow
|
||||
|
|
|
@ -126,6 +126,8 @@ class TaskExecutor(glance.async.TaskExecutor):
|
|||
uri = script_utils.validate_location_uri(
|
||||
task_input.get('import_from'))
|
||||
kwds['uri'] = uri
|
||||
if task.type == 'api_image_import':
|
||||
kwds['image_id'] = task_input['image_id']
|
||||
return driver.DriverManager('glance.flows', task.type,
|
||||
invoke_on_load=True,
|
||||
invoke_kwds=kwds).driver
|
||||
|
|
|
@ -56,8 +56,11 @@ def unpack_task_input(task):
|
|||
task_input = task.task_input
|
||||
|
||||
if task_type == 'api_image_import':
|
||||
if 'import_method' not in task_input:
|
||||
msg = _("Input does not contain 'import_method'")
|
||||
if not task_input:
|
||||
msg = _("Input to api_image_import task is empty.")
|
||||
raise exception.Invalid(msg)
|
||||
if 'image_id' not in task_input:
|
||||
msg = _("Missing required 'image_id' field")
|
||||
raise exception.Invalid(msg)
|
||||
else:
|
||||
for key in ["import_from", "import_from_format", "image_properties"]:
|
||||
|
|
Loading…
Reference in New Issue