diff --git a/glance/api/v2/images.py b/glance/api/v2/images.py index 9f79ead516..d464c7998c 100644 --- a/glance/api/v2/images.py +++ b/glance/api/v2/images.py @@ -94,7 +94,7 @@ class ImagesController(object): executor_factory = self.gateway.get_task_executor_factory(req.context) task_repo = self.gateway.get_task_repo(req.context) - task_input = {} + task_input = {'image_id': image_id} try: import_task = task_factory.new_task(task_type='api_image_import', diff --git a/glance/async/flows/api_image_import.py b/glance/async/flows/api_image_import.py index e5036f69ee..91b753f914 100644 --- a/glance/async/flows/api_image_import.py +++ b/glance/async/flows/api_image_import.py @@ -124,15 +124,16 @@ class _VerifyStaging(task.Task): class _ImportToStore(task.Task): - def __init__(self, task_id, task_type, image_repo, uri): + def __init__(self, task_id, task_type, image_repo, uri, image_id): self.task_id = task_id self.task_type = task_type self.image_repo = image_repo self.uri = uri + self.image_id = image_id super(_ImportToStore, self).__init__( name='%s-ImportToStore-%s' % (task_type, task_id)) - def execute(self, image_id, file_path=None): + def execute(self, file_path=None): """Bringing the imported image to back end store :param image_id: Glance Image ID @@ -191,7 +192,7 @@ class _ImportToStore(task.Task): # # Lets get to it and identify the different scenarios in the # implementation - image = self.image_repo.get(image_id) + image = self.image_repo.get(self.image_id) image.status = 'importing' self.image_repo.save(image) @@ -244,19 +245,20 @@ class _ImportToStore(task.Task): class _SaveImage(task.Task): - def __init__(self, task_id, task_type, image_repo): + def __init__(self, task_id, task_type, image_repo, image_id): self.task_id = task_id self.task_type = task_type self.image_repo = image_repo + self.image_id = image_id super(_SaveImage, self).__init__( name='%s-SaveImage-%s' % (task_type, task_id)) - def execute(self, image_id): + def execute(self): """Transition image status to active :param image_id: Glance Image ID """ - new_image = self.image_repo.get(image_id) + new_image = self.image_repo.get(self.image_id) if new_image.status == 'saving': # NOTE(flaper87): THIS IS WRONG! # we should be doing atomic updates to avoid @@ -268,14 +270,15 @@ class _SaveImage(task.Task): class _CompleteTask(task.Task): - def __init__(self, task_id, task_type, task_repo): + def __init__(self, task_id, task_type, task_repo, image_id): self.task_id = task_id self.task_type = task_type self.task_repo = task_repo + self.image_id = image_id super(_CompleteTask, self).__init__( name='%s-CompleteTask-%s' % (task_type, task_id)) - def execute(self, image_id): + def execute(self): """Finishing the task flow :param image_id: Glance Image ID @@ -284,7 +287,7 @@ class _CompleteTask(task.Task): if task is None: return try: - task.succeed({'image_id': image_id}) + task.succeed({'image_id': self.image_id}) except Exception as e: # Note: The message string contains Error in it to indicate # in the task.message that it's a error message for the user. @@ -326,12 +329,12 @@ def get_flow(**kwargs): if not uri: separator = '' - if not CONF.node_staging_uri.endsWith('/'): + if not CONF.node_staging_uri.endswith('/'): separator = '/' uri = separator.join((CONF.node_staging_uri, str(image_id))) - flow = lf.flow(task_type, retry=retry.AlwaysRevert()) - flow.add(_VerifyStaging(task_id, task_type, uri)) + flow = lf.Flow(task_type, retry=retry.AlwaysRevert()) + flow.add(_VerifyStaging(task_id, task_type, task_repo, uri)) # TODO(jokke): For the pluggable tasks like image verification or # image conversion we need to implement the plugin logic here. @@ -340,7 +343,7 @@ def get_flow(**kwargs): task_type, image_repo, uri, - rebind_args={'image_id': image_id}) + image_id) flow.add(import_to_store) delete_task = lf.Flow(task_type).add(_DeleteFromFS(task_id, task_type)) @@ -349,13 +352,13 @@ def get_flow(**kwargs): save_task = _SaveImage(task_id, task_type, image_repo, - rebind_args={'image_id': image_id}) + image_id) flow.add(save_task) complete_task = _CompleteTask(task_id, task_type, task_repo, - rebind_args={'image_id': image_id}) + image_id) flow.add(complete_task) return flow diff --git a/glance/async/taskflow_executor.py b/glance/async/taskflow_executor.py index ab252fbd4c..6098ac342a 100644 --- a/glance/async/taskflow_executor.py +++ b/glance/async/taskflow_executor.py @@ -126,6 +126,8 @@ class TaskExecutor(glance.async.TaskExecutor): uri = script_utils.validate_location_uri( task_input.get('import_from')) kwds['uri'] = uri + if task.type == 'api_image_import': + kwds['image_id'] = task_input['image_id'] return driver.DriverManager('glance.flows', task.type, invoke_on_load=True, invoke_kwds=kwds).driver diff --git a/glance/common/scripts/utils.py b/glance/common/scripts/utils.py index f88d21015a..f7c95a4af6 100644 --- a/glance/common/scripts/utils.py +++ b/glance/common/scripts/utils.py @@ -56,8 +56,11 @@ def unpack_task_input(task): task_input = task.task_input if task_type == 'api_image_import': - if 'import_method' not in task_input: - msg = _("Input does not contain 'import_method'") + if not task_input: + msg = _("Input to api_image_import task is empty.") + raise exception.Invalid(msg) + if 'image_id' not in task_input: + msg = _("Missing required 'image_id' field") raise exception.Invalid(msg) else: for key in ["import_from", "import_from_format", "image_properties"]: diff --git a/setup.cfg b/setup.cfg index dbdd4f66b8..4230d952c4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -53,6 +53,7 @@ glance.database.metadata_backend = sqlalchemy = glance.db.sqlalchemy.metadata glance.flows = + api_image_import = glance.async.flows.api_image_import:get_flow import = glance.async.flows.base_import:get_flow glance.flows.import =