Merge "Fix api_image_import tasks stuck in 'pending'"

This commit is contained in:
Jenkins 2017-08-23 21:09:09 +00:00 committed by Gerrit Code Review
commit 5d2ce20a9e
5 changed files with 27 additions and 18 deletions

View File

@ -94,7 +94,7 @@ class ImagesController(object):
executor_factory = self.gateway.get_task_executor_factory(req.context)
task_repo = self.gateway.get_task_repo(req.context)
task_input = {}
task_input = {'image_id': image_id}
try:
import_task = task_factory.new_task(task_type='api_image_import',

View File

@ -124,15 +124,16 @@ class _VerifyStaging(task.Task):
class _ImportToStore(task.Task):
def __init__(self, task_id, task_type, image_repo, uri):
def __init__(self, task_id, task_type, image_repo, uri, image_id):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.uri = uri
self.image_id = image_id
super(_ImportToStore, self).__init__(
name='%s-ImportToStore-%s' % (task_type, task_id))
def execute(self, image_id, file_path=None):
def execute(self, file_path=None):
"""Bringing the imported image to back end store
:param image_id: Glance Image ID
@ -191,7 +192,7 @@ class _ImportToStore(task.Task):
#
# Lets get to it and identify the different scenarios in the
# implementation
image = self.image_repo.get(image_id)
image = self.image_repo.get(self.image_id)
image.status = 'importing'
self.image_repo.save(image)
@ -244,19 +245,20 @@ class _ImportToStore(task.Task):
class _SaveImage(task.Task):
def __init__(self, task_id, task_type, image_repo):
def __init__(self, task_id, task_type, image_repo, image_id):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.image_id = image_id
super(_SaveImage, self).__init__(
name='%s-SaveImage-%s' % (task_type, task_id))
def execute(self, image_id):
def execute(self):
"""Transition image status to active
:param image_id: Glance Image ID
"""
new_image = self.image_repo.get(image_id)
new_image = self.image_repo.get(self.image_id)
if new_image.status == 'saving':
# NOTE(flaper87): THIS IS WRONG!
# we should be doing atomic updates to avoid
@ -268,14 +270,15 @@ class _SaveImage(task.Task):
class _CompleteTask(task.Task):
def __init__(self, task_id, task_type, task_repo):
def __init__(self, task_id, task_type, task_repo, image_id):
self.task_id = task_id
self.task_type = task_type
self.task_repo = task_repo
self.image_id = image_id
super(_CompleteTask, self).__init__(
name='%s-CompleteTask-%s' % (task_type, task_id))
def execute(self, image_id):
def execute(self):
"""Finishing the task flow
:param image_id: Glance Image ID
@ -284,7 +287,7 @@ class _CompleteTask(task.Task):
if task is None:
return
try:
task.succeed({'image_id': image_id})
task.succeed({'image_id': self.image_id})
except Exception as e:
# Note: The message string contains Error in it to indicate
# in the task.message that it's a error message for the user.
@ -326,12 +329,12 @@ def get_flow(**kwargs):
if not uri:
separator = ''
if not CONF.node_staging_uri.endsWith('/'):
if not CONF.node_staging_uri.endswith('/'):
separator = '/'
uri = separator.join((CONF.node_staging_uri, str(image_id)))
flow = lf.flow(task_type, retry=retry.AlwaysRevert())
flow.add(_VerifyStaging(task_id, task_type, uri))
flow = lf.Flow(task_type, retry=retry.AlwaysRevert())
flow.add(_VerifyStaging(task_id, task_type, task_repo, uri))
# TODO(jokke): For the pluggable tasks like image verification or
# image conversion we need to implement the plugin logic here.
@ -340,7 +343,7 @@ def get_flow(**kwargs):
task_type,
image_repo,
uri,
rebind_args={'image_id': image_id})
image_id)
flow.add(import_to_store)
delete_task = lf.Flow(task_type).add(_DeleteFromFS(task_id, task_type))
@ -349,13 +352,13 @@ def get_flow(**kwargs):
save_task = _SaveImage(task_id,
task_type,
image_repo,
rebind_args={'image_id': image_id})
image_id)
flow.add(save_task)
complete_task = _CompleteTask(task_id,
task_type,
task_repo,
rebind_args={'image_id': image_id})
image_id)
flow.add(complete_task)
return flow

View File

@ -126,6 +126,8 @@ class TaskExecutor(glance.async.TaskExecutor):
uri = script_utils.validate_location_uri(
task_input.get('import_from'))
kwds['uri'] = uri
if task.type == 'api_image_import':
kwds['image_id'] = task_input['image_id']
return driver.DriverManager('glance.flows', task.type,
invoke_on_load=True,
invoke_kwds=kwds).driver

View File

@ -56,8 +56,11 @@ def unpack_task_input(task):
task_input = task.task_input
if task_type == 'api_image_import':
if 'import_method' not in task_input:
msg = _("Input does not contain 'import_method'")
if not task_input:
msg = _("Input to api_image_import task is empty.")
raise exception.Invalid(msg)
if 'image_id' not in task_input:
msg = _("Missing required 'image_id' field")
raise exception.Invalid(msg)
else:
for key in ["import_from", "import_from_format", "image_properties"]:

View File

@ -53,6 +53,7 @@ glance.database.metadata_backend =
sqlalchemy = glance.db.sqlalchemy.metadata
glance.flows =
api_image_import = glance.async.flows.api_image_import:get_flow
import = glance.async.flows.base_import:get_flow
glance.flows.import =