Merge "Fix api_image_import tasks stuck in 'pending'"
This commit is contained in:
commit
5d2ce20a9e
|
@ -94,7 +94,7 @@ class ImagesController(object):
|
|||
executor_factory = self.gateway.get_task_executor_factory(req.context)
|
||||
task_repo = self.gateway.get_task_repo(req.context)
|
||||
|
||||
task_input = {}
|
||||
task_input = {'image_id': image_id}
|
||||
|
||||
try:
|
||||
import_task = task_factory.new_task(task_type='api_image_import',
|
||||
|
|
|
@ -124,15 +124,16 @@ class _VerifyStaging(task.Task):
|
|||
|
||||
class _ImportToStore(task.Task):
|
||||
|
||||
def __init__(self, task_id, task_type, image_repo, uri):
|
||||
def __init__(self, task_id, task_type, image_repo, uri, image_id):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.image_repo = image_repo
|
||||
self.uri = uri
|
||||
self.image_id = image_id
|
||||
super(_ImportToStore, self).__init__(
|
||||
name='%s-ImportToStore-%s' % (task_type, task_id))
|
||||
|
||||
def execute(self, image_id, file_path=None):
|
||||
def execute(self, file_path=None):
|
||||
"""Bringing the imported image to back end store
|
||||
|
||||
:param image_id: Glance Image ID
|
||||
|
@ -191,7 +192,7 @@ class _ImportToStore(task.Task):
|
|||
#
|
||||
# Lets get to it and identify the different scenarios in the
|
||||
# implementation
|
||||
image = self.image_repo.get(image_id)
|
||||
image = self.image_repo.get(self.image_id)
|
||||
image.status = 'importing'
|
||||
self.image_repo.save(image)
|
||||
|
||||
|
@ -244,19 +245,20 @@ class _ImportToStore(task.Task):
|
|||
|
||||
class _SaveImage(task.Task):
|
||||
|
||||
def __init__(self, task_id, task_type, image_repo):
|
||||
def __init__(self, task_id, task_type, image_repo, image_id):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.image_repo = image_repo
|
||||
self.image_id = image_id
|
||||
super(_SaveImage, self).__init__(
|
||||
name='%s-SaveImage-%s' % (task_type, task_id))
|
||||
|
||||
def execute(self, image_id):
|
||||
def execute(self):
|
||||
"""Transition image status to active
|
||||
|
||||
:param image_id: Glance Image ID
|
||||
"""
|
||||
new_image = self.image_repo.get(image_id)
|
||||
new_image = self.image_repo.get(self.image_id)
|
||||
if new_image.status == 'saving':
|
||||
# NOTE(flaper87): THIS IS WRONG!
|
||||
# we should be doing atomic updates to avoid
|
||||
|
@ -268,14 +270,15 @@ class _SaveImage(task.Task):
|
|||
|
||||
class _CompleteTask(task.Task):
|
||||
|
||||
def __init__(self, task_id, task_type, task_repo):
|
||||
def __init__(self, task_id, task_type, task_repo, image_id):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.task_repo = task_repo
|
||||
self.image_id = image_id
|
||||
super(_CompleteTask, self).__init__(
|
||||
name='%s-CompleteTask-%s' % (task_type, task_id))
|
||||
|
||||
def execute(self, image_id):
|
||||
def execute(self):
|
||||
"""Finishing the task flow
|
||||
|
||||
:param image_id: Glance Image ID
|
||||
|
@ -284,7 +287,7 @@ class _CompleteTask(task.Task):
|
|||
if task is None:
|
||||
return
|
||||
try:
|
||||
task.succeed({'image_id': image_id})
|
||||
task.succeed({'image_id': self.image_id})
|
||||
except Exception as e:
|
||||
# Note: The message string contains Error in it to indicate
|
||||
# in the task.message that it's a error message for the user.
|
||||
|
@ -326,12 +329,12 @@ def get_flow(**kwargs):
|
|||
|
||||
if not uri:
|
||||
separator = ''
|
||||
if not CONF.node_staging_uri.endsWith('/'):
|
||||
if not CONF.node_staging_uri.endswith('/'):
|
||||
separator = '/'
|
||||
uri = separator.join((CONF.node_staging_uri, str(image_id)))
|
||||
|
||||
flow = lf.flow(task_type, retry=retry.AlwaysRevert())
|
||||
flow.add(_VerifyStaging(task_id, task_type, uri))
|
||||
flow = lf.Flow(task_type, retry=retry.AlwaysRevert())
|
||||
flow.add(_VerifyStaging(task_id, task_type, task_repo, uri))
|
||||
|
||||
# TODO(jokke): For the pluggable tasks like image verification or
|
||||
# image conversion we need to implement the plugin logic here.
|
||||
|
@ -340,7 +343,7 @@ def get_flow(**kwargs):
|
|||
task_type,
|
||||
image_repo,
|
||||
uri,
|
||||
rebind_args={'image_id': image_id})
|
||||
image_id)
|
||||
flow.add(import_to_store)
|
||||
|
||||
delete_task = lf.Flow(task_type).add(_DeleteFromFS(task_id, task_type))
|
||||
|
@ -349,13 +352,13 @@ def get_flow(**kwargs):
|
|||
save_task = _SaveImage(task_id,
|
||||
task_type,
|
||||
image_repo,
|
||||
rebind_args={'image_id': image_id})
|
||||
image_id)
|
||||
flow.add(save_task)
|
||||
|
||||
complete_task = _CompleteTask(task_id,
|
||||
task_type,
|
||||
task_repo,
|
||||
rebind_args={'image_id': image_id})
|
||||
image_id)
|
||||
flow.add(complete_task)
|
||||
|
||||
return flow
|
||||
|
|
|
@ -126,6 +126,8 @@ class TaskExecutor(glance.async.TaskExecutor):
|
|||
uri = script_utils.validate_location_uri(
|
||||
task_input.get('import_from'))
|
||||
kwds['uri'] = uri
|
||||
if task.type == 'api_image_import':
|
||||
kwds['image_id'] = task_input['image_id']
|
||||
return driver.DriverManager('glance.flows', task.type,
|
||||
invoke_on_load=True,
|
||||
invoke_kwds=kwds).driver
|
||||
|
|
|
@ -56,8 +56,11 @@ def unpack_task_input(task):
|
|||
task_input = task.task_input
|
||||
|
||||
if task_type == 'api_image_import':
|
||||
if 'import_method' not in task_input:
|
||||
msg = _("Input does not contain 'import_method'")
|
||||
if not task_input:
|
||||
msg = _("Input to api_image_import task is empty.")
|
||||
raise exception.Invalid(msg)
|
||||
if 'image_id' not in task_input:
|
||||
msg = _("Missing required 'image_id' field")
|
||||
raise exception.Invalid(msg)
|
||||
else:
|
||||
for key in ["import_from", "import_from_format", "image_properties"]:
|
||||
|
|
Loading…
Reference in New Issue