Glance Image Introspection

This patch adds a first step towards image introspection. It adds a
set of tasks to correctly import an image from an external source (the
old copy-from behavior) and then introspect the image.

The implementation keeps these two steps separate by treating the
import process as a top-level task and the introspection step as an
optional task that may or may not succeed.

The implementation uses stevedore to load both the import and the
optional flows, and chains them using linear and unordered flows where
needed.
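
For reference, a condensed sketch of that loading pattern (matching
glance/async/flows/base_import.py and glance/async/taskflow_executor.py
in this diff; `kwargs` stands for the task_id/task_type/repository
arguments passed around):

    from stevedore import driver, extension

    # The top-level import flow is loaded as a driver from the
    # 'glance.flows' namespace (see the setup.cfg hunk below).
    flow = driver.DriverManager('glance.flows', 'import',
                                invoke_on_load=True,
                                invoke_kwds=kwargs).driver

    # Optional subflows - e.g. introspection - come from the
    # 'glance.flows.import' namespace and are chained into the flow.
    for ext in extension.ExtensionManager('glance.flows.import',
                                          invoke_on_load=True,
                                          invoke_kwds=kwargs).extensions:
        flow.add(ext.obj)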

DocImpact

Co-Authored-By: Erno Kuvaja <jokke@hp.com>

Partially-implements blueprint: new-upload-workflow
Partially-implements blueprint: introspection-of-images
Change-Id: I9e7505d4e84aabf9d71e03c360514ac824ea84de
Flavio Percoco 2015-02-12 16:16:20 +01:00
parent 8a2be34e7e
commit 5dd82abb74
18 changed files with 1179 additions and 54 deletions

View File

@@ -460,18 +460,35 @@ revocation_cache_time = 10
# The default value for task_executor is taskflow.
# task_executor = taskflow
# Work dir for asynchronous task operations. The directory set here
# will be used to operate over images - normally before they are
# imported in the destination store. When providing a work dir, make
# sure enough space is provided for concurrent tasks to run
# efficiently without running out of space. A rough estimation can be
# done by multiplying the number of `max_workers` - or the number of
# workers running - by an average image size (e.g. 500MB). The image
# size estimation should be done based on the average size in your
# deployment. Note that depending on the tasks running you may need to
# multiply this number by some factor depending on what the task does.
# For example, you may want to double the available size if image
# conversion is enabled. All this being said, remember these are just
# estimations; base them on the worst case scenario and be prepared to
# act in case they were wrong.
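# A rough, hypothetical example: with `max_workers = 10`, an average
# image size of 500MB and image conversion enabled, provision about
# 10 * 500MB * 2 = 10GB for this directory.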
# work_dir=None
# Specifies the maximum number of eventlet threads which can be spun up by
# the eventlet based task executor to perform execution of Glance tasks.
# DEPRECATED: Use [taskflow_executor]/max_workers instead.
# eventlet_executor_pool_size = 1000
[taskflow_executor]
# The mode in which the engine will run. Can be 'serial' or 'parallel'.
# The mode in which the engine will run. Can be 'default', 'serial',
# 'parallel' or 'worker-based'.
#engine_mode = serial
# The number of parallel activities executed at the same time by
# the engine. The value can be greater than one when the engine mode is
# 'parallel', otherwise this value will be ignored.
# 'parallel' or 'worker-based', otherwise this value will be ignored.
#max_workers = 10
[glance_store]

View File

View File

@@ -0,0 +1,431 @@
# Copyright 2015 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import glance_store as store_api
from glance_store import backend
from oslo_config import cfg
import six
from stevedore import extension
from taskflow.patterns import linear_flow as lf
from taskflow import retry
from taskflow import task
from glance.common import exception
from glance.common.scripts.image_import import main as image_import
from glance.common.scripts import utils as script_utils
from glance.common import utils as common_utils
from glance import i18n
LOG = logging.getLogger(__name__)
_ = i18n._
_LE = i18n._LE
_LI = i18n._LI
CONF = cfg.CONF
class _CreateImage(task.Task):
default_provides = 'image_id'
def __init__(self, task_id, task_type, task_repo, image_repo,
image_factory):
self.task_id = task_id
self.task_type = task_type
self.task_repo = task_repo
self.image_repo = image_repo
self.image_factory = image_factory
super(_CreateImage, self).__init__(
name='%s-CreateImage-%s' % (task_type, task_id))
def execute(self):
task = script_utils.get_task(self.task_repo, self.task_id)
if task is None:
return
task_input = script_utils.unpack_task_input(task)
image = image_import.create_image(
self.image_repo, self.image_factory,
task_input.get('image_properties'), self.task_id)
LOG.debug("Task %(task_id)s created image %(image_id)s" %
{'task_id': task.task_id, 'image_id': image.image_id})
return image.image_id
def revert(self, *args, **kwargs):
# TODO(flaper87): Define the revert rules for images on failures.
# Deleting the image may not be what we want since users could upload
# the image data in a separate step. However, it really depends on
# when the failure happened. I guess we should check if data has been
# written, although at that point failures are (should be) unexpected,
# at least image-workflow wise.
pass
class _ImportToFS(task.Task):
default_provides = 'file_path'
def __init__(self, task_id, task_type, task_repo, uri):
self.task_id = task_id
self.task_type = task_type
self.task_repo = task_repo
self.uri = uri
super(_ImportToFS, self).__init__(
name='%s-ImportToFS-%s' % (task_type, task_id))
if CONF.task.work_dir is None:
msg = (_("%(task_id)s of %(task_type)s not configured "
"properly. Missing work dir: %(work_dir)s") %
{'task_id': self.task_id,
'task_type': self.task_type,
'work_dir': CONF.task.work_dir})
raise exception.BadTaskConfiguration(msg)
self.store = self._build_store()
def _build_store(self):
# NOTE(flaper87): Due to the nice glance_store api (#sarcasm), we're
# forced to build our own config object, register the required options
# (and by required I mean *ALL* of them, even the ones we don't want),
# and create our own store instance by calling a private function.
# This is certainly unfortunate but it's the best we can do until the
# glance_store refactor is done. A good thing is that glance_store is
# under our team's management and it gates on Glance so changes to
# this API will (should?) break task's tests.
conf = cfg.ConfigOpts()
backend.register_opts(conf)
conf.set_override('filesystem_store_datadir',
CONF.task.work_dir,
group='glance_store')
# NOTE(flaper87): Do not even try to judge me for this... :(
# With the glance_store refactor, this code will change, until
# that happens, we don't have a better option and this is the
# least worst one, IMHO.
store = backend._load_store(conf, 'file')
if store is None:
msg = (_("%(task_id)s of %(task_type)s not configured "
"properly. Could not load the filesystem store") %
{'task_id': self.task_id, 'task_type': self.task_type})
raise exception.BadTaskConfiguration(msg)
store.configure()
return store
def execute(self, image_id):
"""Create temp file into store and return path to it
:param image_id: Glance Image ID
"""
# NOTE(flaper87): We've decided to use a separate `work_dir` for
# this task - and tasks coming after this one - as a way to expect
# users to configure a local store for pre-import works on the image
# to happen.
#
# While using any path should be "technically" fine, it's not what
# we recommend as the best solution. For more details on this, please
# refer to the comment in the `_ImportToStore.execute` method.
data = script_utils.get_image_data_iter(self.uri)
# NOTE(jokke): Using .tasks_import to ease debugging. The file name
# is specific so we know exactly where it's coming from.
tmp_id = "%s.tasks_import" % image_id
path = self.store.add(tmp_id, data, 0, context=None)[0]
return path
def revert(self, image_id, result=None, **kwargs):
# NOTE(flaper87): If result is None, it probably
# means this task failed. Otherwise, we would have
# a result from its execution.
if result is None:
return
if os.path.exists(result.split("file://")[-1]):
store_api.delete_from_backend(result)
class _DeleteFromFS(task.Task):
def __init__(self, task_id, task_type):
self.task_id = task_id
self.task_type = task_type
super(_DeleteFromFS, self).__init__(
name='%s-DeleteFromFS-%s' % (task_type, task_id))
def execute(self, file_path):
"""Remove file from the backend
:param file_path: path to the file being deleted
"""
store_api.delete_from_backend(file_path)
class _ImportToStore(task.Task):
def __init__(self, task_id, task_type, image_repo, uri):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
self.uri = uri
super(_ImportToStore, self).__init__(
name='%s-ImportToStore-%s' % (task_type, task_id))
def execute(self, image_id, file_path=None):
"""Bringing the introspected image to back end store
:param image_id: Glance Image ID
:param file_path: path to the image file
"""
# NOTE(flaper87): There are a couple of interesting bits in the
# interaction between this task and the `_ImportToFS` one. I'll try
# to cover them in this comment.
#
# NOTE(flaper87):
# `_ImportToFS` downloads the image to a dedicated `work_dir` which
# needs to be configured in advance (please refer to the config option
# docs for more info). The motivation behind this is also explained in
# the `_ImportToFS.execute` method.
#
# Due to the fact that we have an `_ImportToFS` task which downloads
# the image data already, we need to be as smart as we can in this task
# to avoid downloading the data several times and reducing the copy or
# write times. There are several scenarios where the interaction
# between this task and `_ImportToFS` could be improved. All these
# scenarios assume the `_ImportToFS` task has been executed before
# and/or in a more abstract scenario, that `file_path` is being
# provided.
#
# Scenario 1: FS Store is Remote, introspection enabled,
# conversion disabled
#
# In this scenario, the user would benefit from having the scratch path
# being the same path as the fs store. Only one write would happen and
# an extra read will happen in order to introspect the image. Note that
# this read is just for the image headers and not the entire file.
#
# Scenario 2: FS Store is remote, introspection enabled,
# conversion enabled
#
# In this scenario, the user would benefit from having a *local* store
# into which the image can be converted. This will require downloading
# the image locally, converting it and then copying the converted image
# to the remote store.
#
# Scenario 3: FS Store is local, introspection enabled,
# conversion disabled
# Scenario 4: FS Store is local, introspection enabled,
# conversion enabled
#
        # In both these scenarios the user shouldn't care whether the
        # FS store path and the work dir are the same, and would
        # probably benefit, performance wise, from the scratch path
        # and the FS store being the same. Space wise, regardless of
        # the scenario, the user will have to account for it in
        # advance.
        #
        # Let's get to it and identify the different scenarios in the
        # implementation.
image = self.image_repo.get(image_id)
image.status = 'saving'
self.image_repo.save(image)
# NOTE(flaper87): Let's dance... and fall
#
        # Unfortunately, because of the way our domain layers work and
        # the checks done in the FS store, we can't simply rename the
        # file and set the location. To do that, we'd have to duplicate
        # the logic of each and every one of the domain factories
        # (quota, location, etc.) and we'd also need to hack the FS
        # store to prevent it from raising a "duplicate path" error.
        # I'd rather have this task copy the image bits one more time
        # than duplicate all that logic.
        #
        # Since I don't think this should be the definitive solution,
        # I'm leaving the code below as a reference for what should
        # happen here once the FS store and domain code are able to
        # handle this case.
#
# if file_path is None:
# image_import.set_image_data(image, self.uri, None)
# return
        # NOTE(flaper87): Don't assume the image was stored in the
        # work_dir. Consider the case where this path was provided by
        # another task. Also, let's try to neither assume things nor
        # create "logic" dependencies between this task and `_ImportToFS`
#
# base_path = os.path.dirname(file_path.split("file://")[-1])
        # NOTE(flaper87): Hopefully just scenarios #3 and #4. I say
        # hopefully because nothing prevents the user from using the
        # same FS store path as a work dir
#
# image_path = os.path.join(base_path, image_id)
#
# if (base_path == CONF.glance_store.filesystem_store_datadir or
# base_path in CONF.glance_store.filesystem_store_datadirs):
# os.rename(file_path, image_path)
#
# image_import.set_image_data(image, image_path, None)
image_import.set_image_data(image, file_path or self.uri, None)
class _SaveImage(task.Task):
def __init__(self, task_id, task_type, image_repo):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
super(_SaveImage, self).__init__(
name='%s-SaveImage-%s' % (task_type, task_id))
def execute(self, image_id):
"""Transition image status to active
:param image_id: Glance Image ID
"""
new_image = self.image_repo.get(image_id)
if new_image.status == 'saving':
# NOTE(flaper87): THIS IS WRONG!
# we should be doing atomic updates to avoid
# race conditions. This happens in other places
# too.
new_image.status = 'active'
self.image_repo.save(new_image)
class _CompleteTask(task.Task):
def __init__(self, task_id, task_type, task_repo):
self.task_id = task_id
self.task_type = task_type
self.task_repo = task_repo
super(_CompleteTask, self).__init__(
name='%s-CompleteTask-%s' % (task_type, task_id))
def execute(self, image_id):
"""Finishing the task flow
:param image_id: Glance Image ID
"""
task = script_utils.get_task(self.task_repo, self.task_id)
if task is None:
return
try:
task.succeed({'image_id': image_id})
except Exception as e:
            # Note: The message string contains "Error" in it to
            # indicate in task.message that it's an error message for
            # the user.
# TODO(nikhil): need to bring back save_and_reraise_exception when
# necessary
err_msg = ("Error: " + six.text_type(type(e)) + ': ' +
common_utils.exception_to_str(e))
            log_msg = err_msg + (_LE(" Task ID %s") % task.task_id)
LOG.exception(log_msg)
task.fail(err_msg)
finally:
self.task_repo.save(task)
LOG.info(_LI("%(task_id)s of %(task_type)s completed") %
{'task_id': self.task_id, 'task_type': self.task_type})
def _get_import_flows(**kwargs):
extensions = extension.ExtensionManager('glance.flows.import',
invoke_on_load=True,
invoke_kwds=kwargs)
for ext in extensions.extensions:
yield ext.obj
def get_flow(**kwargs):
"""Return task flow
:param task_id: Task ID
:param task_type: Type of the task
:param task_repo: Task repo
:param image_repo: Image repository used
:param image_factory: Glance Image Factory
:param uri: uri for the image file
"""
task_id = kwargs.get('task_id')
task_type = kwargs.get('task_type')
task_repo = kwargs.get('task_repo')
image_repo = kwargs.get('image_repo')
image_factory = kwargs.get('image_factory')
uri = kwargs.get('uri')
flow = lf.Flow(task_type, retry=retry.AlwaysRevert()).add(
_CreateImage(task_id, task_type, task_repo, image_repo, image_factory))
import_to_store = _ImportToStore(task_id, task_type, image_repo, uri)
try:
# NOTE(flaper87): ImportToLocal and DeleteFromLocal shouldn't be here.
# Ideally, we should have the different import flows doing this for us
# and this function should clean up duplicated tasks. For example, say
# 2 flows need to have a local copy of the image - ImportToLocal - in
        # order to be able to complete the task - i.e. Introspect. In that
# case, the introspect.get_flow call should add both, ImportToLocal and
# DeleteFromLocal, to the flow and this function will reduce the
# duplicated calls to those tasks by creating a linear flow that
# ensures those are called before the other tasks. For now, I'm
# keeping them here, though.
limbo = lf.Flow(task_type).add(_ImportToFS(task_id,
task_type,
task_repo,
uri))
for subflow in _get_import_flows(**kwargs):
limbo.add(subflow)
        # NOTE(flaper87): `limbo` contains 1 hard-coded task
        # (_ImportToFS); if it doesn't contain more than 1 task, it
        # means that no import subflow has been registered.
if len(limbo) > 1:
flow.add(limbo)
        # NOTE(flaper87): Until this implementation gets smarter,
        # make sure ImportToStore is called *after* the imported
        # flow stages. If not, the image will be set to the saving
        # state, invalidating tasks like Introspection or Convert.
flow.add(import_to_store)
        # NOTE(flaper87): Since this is an "optional" task but required
        # when `limbo` is executed, we're adding it in its own subflow
        # to isolate it from the rest of the flow.
delete_flow = lf.Flow(task_type).add(_DeleteFromFS(task_id,
task_type))
flow.add(delete_flow)
else:
flow.add(import_to_store)
except exception.BadTaskConfiguration:
# NOTE(flaper87): If something goes wrong with the load of
# import tasks, make sure we go on.
flow.add(import_to_store)
flow.add(
_SaveImage(task_id, task_type, image_repo),
_CompleteTask(task_id, task_type, task_repo)
)
return flow
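
Put together, for an import task with at least one optional subflow
registered (e.g. introspection), the linear flow built above runs in
roughly this order (a sketch; the names are the task classes defined in
this file, plus _Introspect from introspect.py below):

    _CreateImage
      -> _ImportToFS -> <optional subflows, e.g. _Introspect>
      -> _ImportToStore
      -> _DeleteFromFS
      -> _SaveImage
      -> _CompleteTask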

View File

@@ -0,0 +1,89 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
from oslo_concurrency import processutils as putils
from oslo_utils import excutils
from taskflow.patterns import linear_flow as lf
from glance.async import utils
from glance import i18n
_LE = i18n._LE
_LI = i18n._LI
LOG = logging.getLogger(__name__)
class _Introspect(utils.OptionalTask):
"""Taskflow to pull the embedded metadata out of image file"""
def __init__(self, task_id, task_type, image_repo):
self.task_id = task_id
self.task_type = task_type
self.image_repo = image_repo
super(_Introspect, self).__init__(
name='%s-Introspect-%s' % (task_type, task_id))
def execute(self, image_id, file_path):
"""Does the actual introspection
:param image_id: Glance image ID
:param file_path: Path to the file being introspected
"""
try:
stdout, stderr = putils.trycmd('qemu-img', 'info',
'--output=json', file_path,
log_errors=putils.LOG_ALL_ERRORS)
except OSError as exc:
# NOTE(flaper87): errno == 2 means the executable file
# was not found. For now, log an error and move forward
# until we have a better way to enable/disable optional
# tasks.
if exc.errno != 2:
with excutils.save_and_reraise_exception():
msg = (_LE('Failed to execute introspection '
'%(task_id)s: %(exc)s') %
{'task_id': self.task_id, 'exc': exc.message})
LOG.error(msg)
return
if stderr:
raise RuntimeError(stderr)
metadata = json.loads(stdout)
new_image = self.image_repo.get(image_id)
new_image.virtual_size = metadata.get('virtual-size', 0)
new_image.disk_format = metadata.get('format')
self.image_repo.save(new_image)
LOG.debug("%(task_id)s: Introspection successful: %(file)s" %
{'task_id': self.task_id, 'file': file_path})
return new_image
def get_flow(**kwargs):
task_id = kwargs.get('task_id')
task_type = kwargs.get('task_type')
image_repo = kwargs.get('image_repo')
LOG.debug("Flow: %(task_type)s with ID %(id)s on %(repo)s" %
{'task_type': task_type, 'id': task_id, 'repo': image_repo})
return lf.Flow(task_type).add(
_Introspect(task_id, task_type, image_repo),
)
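
Other optional flows can follow the same shape. A minimal, hypothetical
sketch (module, class and entry-point names invented for illustration;
it would be registered under the `glance.flows.import` namespace in
setup.cfg, just like introspect above):

    from taskflow.patterns import linear_flow as lf

    from glance.async import utils


    class _MyCheck(utils.OptionalTask):
        def __init__(self, task_id, task_type, image_repo):
            self.task_id = task_id
            self.task_type = task_type
            self.image_repo = image_repo
            super(_MyCheck, self).__init__(
                name='%s-MyCheck-%s' % (task_type, task_id))

        def execute(self, image_id, file_path):
            # Inspect the file at `file_path`; any failure here is
            # logged as a warning by OptionalTask and the flow goes on.
            pass


    def get_flow(**kwargs):
        return lf.Flow(kwargs.get('task_type')).add(
            _MyCheck(kwargs.get('task_id'),
                     kwargs.get('task_type'),
                     kwargs.get('image_repo')),
        )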

View File

@@ -17,20 +17,18 @@ import contextlib
from oslo_config import cfg
from oslo_utils import excutils
from stevedore import driver
from taskflow import engines
from taskflow.listeners import logging as llistener
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import futures
from taskflow.utils import eventlet_utils
import glance.async
import glance.common.scripts as scripts
from glance.common.scripts import utils as script_utils
from glance import i18n
import glance.openstack.common.log as logging
_ = i18n._
_LI = i18n._LI
_LE = i18n._LE
LOG = logging.getLogger(__name__)
@@ -56,23 +54,6 @@ CONF = cfg.CONF
CONF.register_opts(taskflow_executor_opts, group='taskflow_executor')
class _Task(task.Task):
def __init__(self, task_id, task_type, context, task_repo,
image_repo, image_factory):
super(_Task, self).__init__(name='%s-%s' % (task_type, task_id))
self.task_id = task_id
self.task_type = task_type
self.context = context
self.task_repo = task_repo
self.image_repo = image_repo
self.image_factory = image_factory
def execute(self):
scripts.run_task(self.task_id, self.task_type, self.context,
self.task_repo, self.image_repo, self.image_factory)
class TaskExecutor(glance.async.TaskExecutor):
def __init__(self, context, task_repo, image_repo, image_factory):
@@ -101,15 +82,43 @@ class TaskExecutor(glance.async.TaskExecutor):
else:
yield futures.ThreadPoolExecutor(max_workers=max_workers)
def _get_flow(self, task):
try:
task_input = script_utils.unpack_task_input(task)
uri = script_utils.validate_location_uri(
task_input.get('import_from'))
kwds = {
'uri': uri,
'task_id': task.task_id,
'task_type': task.type,
'context': self.context,
'task_repo': self.task_repo,
'image_repo': self.image_repo,
'image_factory': self.image_factory
}
return driver.DriverManager('glance.flows', task.type,
invoke_on_load=True,
invoke_kwds=kwds).driver
except RuntimeError:
raise NotImplementedError()
def _run(self, task_id, task_type):
LOG.info(_LI('Taskflow executor picked up the execution of task ID '
'%(task_id)s of task type '
'%(task_type)s') % {'task_id': task_id,
'task_type': task_type})
flow = lf.Flow(task_type).add(
_Task(task_id, task_type, self.context, self.task_repo,
self.image_repo, self.image_factory)
)
LOG.debug('Taskflow executor picked up the execution of task ID '
'%(task_id)s of task type '
'%(task_type)s' % {'task_id': task_id,
'task_type': task_type})
task = script_utils.get_task(self.task_repo, task_id)
if task is None:
            # NOTE: This happens if the task is not found in the
            # database. In such cases there is no way to update the
            # task status, so it's ignored here.
return
flow = self._get_flow(task)
try:
with self._executor() as executor:
engine = engines.load(flow, self.engine_conf,

glance/async/utils.py (new file, 66 lines)
View File

@@ -0,0 +1,66 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import task
from glance import i18n
import glance.openstack.common.log as logging
LOG = logging.getLogger(__name__)
_LW = i18n._LW
class OptionalTask(task.Task):
def __init__(self, *args, **kwargs):
super(OptionalTask, self).__init__(*args, **kwargs)
self.execute = self._catch_all(self.execute)
def _catch_all(self, func):
# NOTE(flaper87): Read this comment before calling the MI6
# Here's the thing, there's no nice way to define "optional"
# tasks. That is, tasks whose failure shouldn't affect the execution
        # of the flow. The only current "sane" way to do this is by catching
        # everything and logging. This seems harmless from a taskflow
# perspective but it is not. There are some issues related to this
# "workaround":
#
        # - The task's state will shamelessly lie to us, saying the task
        # succeeded.
        #
        # - No revert procedure will be triggered, which means optional
        # tasks, for now, mustn't cause any side effects because they
        # won't be able to clean them up. If these tasks depend on other
        # tasks that do cause side effects, a task that cleans those side
        # effects up must be registered as well. For example, _ImportToFS,
        # _MyDumbTask, _DeleteFromFS.
        #
        # - Ideally, optional tasks shouldn't `provide` new values unless
        # they are part of an optional flow. Due to the decoration of the
        # execute method, these tasks will need to define the provided
        # values at class level using `default_provides`.
        #
        # The taskflow team is working on improving this and on something
        # that will provide the ability to define optional tasks. For now,
        # lie to ourselves we must.
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as exc:
msg = (_LW("An optional task has failed, "
"the failure was: %s") %
exc.message)
LOG.warn(msg)
return wrapper
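
As the comment above notes, a subclass that provides values must declare
them at class level; a minimal, hypothetical sketch:

    class _MyOptionalTask(OptionalTask):
        # `execute` is wrapped by _catch_all, so provided values must
        # be declared here via `default_provides` rather than passed
        # to Task.__init__ as `provides`.
        default_provides = 'some_value'

        def execute(self):
            return 42  # becomes None if the task fails and is caught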

View File

@@ -70,6 +70,26 @@ task_opts = [
default='taskflow',
help=_("Specifies which task executor to be used to run the "
"task scripts.")),
    cfg.StrOpt('work_dir',
               default=None,
               help=_('Work dir for asynchronous task operations. '
                      'The directory set here will be used to operate over '
                      'images - normally before they are imported in the '
                      'destination store. When providing a work dir, make '
                      'sure enough space is provided for concurrent tasks '
                      'to run efficiently without running out of space. A '
                      'rough estimation can be done by multiplying the '
                      'number of `max_workers` - or the number of workers '
                      'running - by an average image size (e.g. 500MB). '
                      'The image size estimation should be done based on '
                      'the average size in your deployment. Note that '
                      'depending on the tasks running you may need to '
                      'multiply this number by some factor depending on '
                      'what the task does. For example, you may want to '
                      'double the available size if image conversion is '
                      'enabled. All this being said, remember these are '
                      'just estimations; base them on the worst case '
                      'scenario and be prepared to act in case they were '
                      'wrong.')),
]
manage_opts = [
cfg.BoolOpt('db_enforce_mysql_charset',

View File

@@ -324,6 +324,10 @@ class TaskException(GlanceException):
message = _("An unknown task exception occurred")
class BadTaskConfiguration(GlanceException):
message = _("Task was not configured properly")
class TaskNotFound(TaskException, NotFound):
message = _("Task with the given id %(task_id)s was not found")

View File

@@ -111,10 +111,26 @@ def validate_location_uri(location):
def get_image_data_iter(uri):
"""The scripts are expected to support only over non-local locations of
data. Note the absence of file:// for security reasons, see LP bug #942118.
If the above constraint is violated, task should fail.
"""Returns iterable object either for local file or uri
:param uri: uri (remote or local) to the datasource we want to iterate
Validation/sanitization of the uri is expected to happen before we get
here.
"""
# NOTE: Current script supports http location. Other locations
# types are to be supported as the script evolve.
# NOTE(flaper87): This is safe because the input uri is already
# verified before the task is created.
if uri.startswith("file://"):
uri = uri.split("file://")[-1]
        # NOTE(flaper87): The caller of this function expects to have
        # an iterable object. File objects in Python are iterable,
        # therefore we're returning the file object as is.
        # The file descriptor will eventually be cleaned up by the
        # garbage collector once its ref-count drops to 0, that is,
        # when no references to this file remain.
        #
        # We're not using StringIO or other tools here to avoid
        # reading everything into memory. Some images may be quite
        # large.
return open(uri, "r")
return urllib2.urlopen(uri)
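
For illustration, this is roughly how `_ImportToFS` (above) consumes the
returned object; both branches yield a file-like, iterable object that
can be streamed into a store without reading the whole image into
memory:

    data = script_utils.get_image_data_iter(uri)  # file:// or http(s)
    path = store.add("%s.tasks_import" % image_id, data, 0,
                     context=None)[0]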

View File

@@ -30,6 +30,7 @@ import glance.openstack.common.log as logging
_ = i18n._
_LE = i18n._LE
_LW = i18n._LW
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('task_executor', 'glance.common.config', group='task')
@@ -453,6 +454,7 @@ class TaskFactory(object):
class TaskExecutorFactory(object):
eventlet_deprecation_warned = False
def __init__(self, task_repo, image_repo, image_factory):
self.task_repo = task_repo
@@ -467,6 +469,13 @@ class TaskExecutorFactory(object):
# executor.
task_executor = CONF.task.task_executor
if task_executor == 'eventlet':
# NOTE(jokke): Making sure we do not log the deprecation
# warning 1000 times or anything crazy like that.
if not TaskExecutorFactory.eventlet_deprecation_warned:
msg = _LW("The `eventlet` executor has been deprecated. "
"Use `taskflow` instead.")
LOG.warn(msg)
TaskExecutorFactory.eventlet_deprecation_warned = True
task_executor = 'taskflow'
executor_cls = ('glance.async.%s_executor.'

View File

@@ -0,0 +1,308 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import os
import urllib2
import glance_store
from oslo_config import cfg
from six.moves import cStringIO
from taskflow import task
import glance.async.flows.base_import as import_flow
from glance.async import taskflow_executor
from glance.common.scripts.image_import import main as image_import
from glance.common.scripts import utils as script_utils
from glance.common import utils
from glance import domain
from glance import gateway
import glance.tests.utils as test_utils
CONF = cfg.CONF
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
class _ErrorTask(task.Task):
def execute(self):
raise RuntimeError()
class TestImportTask(test_utils.BaseTestCase):
def setUp(self):
super(TestImportTask, self).setUp()
glance_store.register_opts(CONF)
self.config(default_store='file',
stores=['file', 'http'],
filesystem_store_datadir=self.test_dir,
group="glance_store")
glance_store.create_stores(CONF)
self.work_dir = os.path.join(self.test_dir, 'work_dir')
utils.safe_mkdirs(self.work_dir)
self.config(work_dir=self.work_dir, group='task')
self.context = mock.MagicMock()
self.img_repo = mock.MagicMock()
self.task_repo = mock.MagicMock()
self.gateway = gateway.Gateway()
self.task_factory = domain.TaskFactory()
self.img_factory = self.gateway.get_image_factory(self.context)
self.image = self.img_factory.new_image(image_id=UUID1,
disk_format='qcow2',
container_format='bare')
task_input = {
"import_from": "http://cloud.foo/image.qcow2",
"import_from_format": "qcow2",
"image_properties": {'disk_format': 'qcow2',
'container_format': 'bare'}
}
task_ttl = CONF.task.task_time_to_live
self.task_type = 'import'
self.task = self.task_factory.new_task(self.task_type, TENANT1,
task_time_to_live=task_ttl,
task_input=task_input)
def test_import_flow(self):
self.config(engine_mode='serial',
group='taskflow_executor')
img_factory = mock.MagicMock()
executor = taskflow_executor.TaskExecutor(
self.context,
self.task_repo,
self.img_repo,
img_factory)
self.task_repo.get.return_value = self.task
def create_image(*args, **kwargs):
kwargs['image_id'] = UUID1
return self.img_factory.new_image(*args, **kwargs)
self.img_repo.get.return_value = self.image
img_factory.new_image.side_effect = create_image
with mock.patch.object(script_utils, 'get_image_data_iter') as dmock:
dmock.return_value = cStringIO("TEST_IMAGE")
executor.begin_processing(self.task.task_id)
image_path = os.path.join(self.test_dir, self.image.image_id)
tmp_image_path = os.path.join(self.work_dir,
"%s.tasks_import" % image_path)
self.assertFalse(os.path.exists(tmp_image_path))
self.assertTrue(os.path.exists(image_path))
def test_import_flow_missing_work_dir(self):
self.config(engine_mode='serial', group='taskflow_executor')
self.config(work_dir=None, group='task')
img_factory = mock.MagicMock()
executor = taskflow_executor.TaskExecutor(
self.context,
self.task_repo,
self.img_repo,
img_factory)
self.task_repo.get.return_value = self.task
def create_image(*args, **kwargs):
kwargs['image_id'] = UUID1
return self.img_factory.new_image(*args, **kwargs)
self.img_repo.get.return_value = self.image
img_factory.new_image.side_effect = create_image
with mock.patch.object(script_utils, 'get_image_data_iter') as dmock:
dmock.return_value = cStringIO("TEST_IMAGE")
with mock.patch.object(import_flow._ImportToFS, 'execute') as emk:
executor.begin_processing(self.task.task_id)
self.assertFalse(emk.called)
image_path = os.path.join(self.test_dir, self.image.image_id)
tmp_image_path = os.path.join(self.work_dir,
"%s.tasks_import" % image_path)
self.assertFalse(os.path.exists(tmp_image_path))
self.assertTrue(os.path.exists(image_path))
def test_import_flow_revert(self):
self.config(engine_mode='serial',
group='taskflow_executor')
img_factory = mock.MagicMock()
executor = taskflow_executor.TaskExecutor(
self.context,
self.task_repo,
self.img_repo,
img_factory)
self.task_repo.get.return_value = self.task
def create_image(*args, **kwargs):
kwargs['image_id'] = UUID1
return self.img_factory.new_image(*args, **kwargs)
self.img_repo.get.return_value = self.image
img_factory.new_image.side_effect = create_image
with mock.patch.object(script_utils, 'get_image_data_iter') as dmock:
dmock.return_value = cStringIO("TEST_IMAGE")
with mock.patch.object(import_flow, "_get_import_flows") as imock:
imock.return_value = (x for x in [_ErrorTask()])
self.assertRaises(RuntimeError,
executor.begin_processing, self.task.task_id)
image_path = os.path.join(self.test_dir, self.image.image_id)
tmp_image_path = os.path.join(self.work_dir,
"%s.tasks_import" % image_path)
self.assertFalse(os.path.exists(tmp_image_path))
# NOTE(flaper87): Eventually, we want this to be assertTrue.
# The current issue is there's no way to tell taskflow to
# continue on failures. That is, revert the subflow but keep
# executing the parent flow. Under discussion/development.
self.assertFalse(os.path.exists(image_path))
def test_import_flow_no_import_flows(self):
self.config(engine_mode='serial',
group='taskflow_executor')
img_factory = mock.MagicMock()
executor = taskflow_executor.TaskExecutor(
self.context,
self.task_repo,
self.img_repo,
img_factory)
self.task_repo.get.return_value = self.task
def create_image(*args, **kwargs):
kwargs['image_id'] = UUID1
return self.img_factory.new_image(*args, **kwargs)
self.img_repo.get.return_value = self.image
img_factory.new_image.side_effect = create_image
with mock.patch.object(urllib2, 'urlopen') as umock:
content = "TEST_IMAGE"
umock.return_value = cStringIO(content)
with mock.patch.object(import_flow, "_get_import_flows") as imock:
imock.return_value = (x for x in [])
executor.begin_processing(self.task.task_id)
image_path = os.path.join(self.test_dir, self.image.image_id)
tmp_image_path = os.path.join(self.work_dir,
"%s.tasks_import" % image_path)
self.assertFalse(os.path.exists(tmp_image_path))
self.assertTrue(os.path.exists(image_path))
umock.assert_called_once()
with open(image_path) as ifile:
self.assertEqual(content, ifile.read())
def test_create_image(self):
image_create = import_flow._CreateImage(self.task.task_id,
self.task_type,
self.task_repo,
self.img_repo,
self.img_factory)
self.task_repo.get.return_value = self.task
with mock.patch.object(image_import, 'create_image') as ci_mock:
ci_mock.return_value = mock.Mock()
image_create.execute()
ci_mock.assert_called_once_with(self.img_repo,
self.img_factory,
{'container_format': 'bare',
'disk_format': 'qcow2'},
self.task.task_id)
def test_save_image(self):
save_image = import_flow._SaveImage(self.task.task_id,
self.task_type,
self.img_repo)
with mock.patch.object(self.img_repo, 'get') as get_mock:
image_id = mock.sentinel.image_id
image = mock.MagicMock(image_id=image_id, status='saving')
get_mock.return_value = image
with mock.patch.object(self.img_repo, 'save') as save_mock:
save_image.execute(image.image_id)
get_mock.assert_called_once_with(image_id)
save_mock.assert_called_once_with(image)
self.assertEqual('active', image.status)
def test_import_to_fs(self):
import_fs = import_flow._ImportToFS(self.task.task_id,
self.task_type,
self.task_repo,
'http://example.com/image.qcow2')
with mock.patch.object(script_utils, 'get_image_data_iter') as dmock:
dmock.return_value = "test"
image_id = UUID1
path = import_fs.execute(image_id)
reader, size = glance_store.get_from_backend(path)
self.assertEqual(4, size)
self.assertEqual(dmock.return_value, "".join(reader))
image_path = os.path.join(self.work_dir, image_id)
tmp_image_path = os.path.join(self.work_dir,
"%s.tasks_import" % image_path)
self.assertTrue(os.path.exists(tmp_image_path))
def test_delete_from_fs(self):
delete_fs = import_flow._DeleteFromFS(self.task.task_id,
self.task_type)
data = "test"
store = glance_store.get_store_from_scheme('file')
path = glance_store.store_add_to_backend(mock.sentinel.image_id, data,
mock.sentinel.image_size,
store, context=None)[0]
path_wo_scheme = path.split("file://")[1]
self.assertTrue(os.path.exists(path_wo_scheme))
delete_fs.execute(path)
self.assertFalse(os.path.exists(path_wo_scheme))
def test_complete_task(self):
complete_task = import_flow._CompleteTask(self.task.task_id,
self.task_type,
self.task_repo)
image_id = mock.sentinel.image_id
image = mock.MagicMock(image_id=image_id)
self.task_repo.get.return_value = self.task
with mock.patch.object(self.task, 'succeed') as succeed:
complete_task.execute(image.image_id)
succeed.assert_called_once_with({'image_id': image_id})

View File

@@ -0,0 +1,111 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import glance_store
from oslo_concurrency import processutils
from oslo_config import cfg
from glance.async.flows import introspect
from glance import domain
import glance.tests.utils as test_utils
CONF = cfg.CONF
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
class TestImportTask(test_utils.BaseTestCase):
def setUp(self):
super(TestImportTask, self).setUp()
self.task_factory = domain.TaskFactory()
task_input = {
"import_from": "http://cloud.foo/image.qcow2",
"import_from_format": "qcow2",
"image_properties": mock.sentinel.image_properties
}
task_ttl = CONF.task.task_time_to_live
self.task_type = 'import'
self.task = self.task_factory.new_task(self.task_type, TENANT1,
task_time_to_live=task_ttl,
task_input=task_input)
self.context = mock.Mock()
self.img_repo = mock.Mock()
self.task_repo = mock.Mock()
self.img_factory = mock.Mock()
glance_store.register_opts(CONF)
self.config(default_store='file',
stores=['file', 'http'],
filesystem_store_datadir=self.test_dir,
group="glance_store")
glance_store.create_stores(CONF)
def test_introspect_success(self):
image_create = introspect._Introspect(self.task.task_id,
self.task_type,
self.img_repo)
self.task_repo.get.return_value = self.task
image_id = mock.sentinel.image_id
image = mock.MagicMock(image_id=image_id)
self.img_repo.get.return_value = image
with mock.patch.object(processutils, 'execute') as exc_mock:
result = json.dumps({
"virtual-size": 10737418240,
"filename": "/tmp/image.qcow2",
"cluster-size": 65536,
"format": "qcow2",
"actual-size": 373030912,
"format-specific": {
"type": "qcow2",
"data": {
"compat": "0.10"
}
},
"dirty-flag": False
})
exc_mock.return_value = (result, None)
image_create.execute(image, '/test/path.qcow2')
self.assertEqual(10737418240, image.virtual_size)
def test_introspect_no_image(self):
image_create = introspect._Introspect(self.task.task_id,
self.task_type,
self.img_repo)
self.task_repo.get.return_value = self.task
image_id = mock.sentinel.image_id
image = mock.MagicMock(image_id=image_id, virtual_size=None)
self.img_repo.get.return_value = image
# NOTE(flaper87): Don't mock, test the error.
with mock.patch.object(processutils, 'execute') as exc_mock:
exc_mock.return_value = (None, "some error")
# NOTE(flaper87): Pls, read the `OptionalTask._catch_all`
# docs to know why this is commented.
# self.assertRaises(RuntimeError,
# image_create.execute,
# image, '/test/path.qcow2')
image_create.execute(image, '/test/path.qcow2')
self.assertIsNone(image.virtual_size)

View File

@@ -15,18 +15,53 @@
import mock
import glance_store
from oslo.config import cfg
from taskflow import engines
from glance.async import taskflow_executor
from glance import domain
import glance.tests.utils as test_utils
CONF = cfg.CONF
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
class TestTaskExecutor(test_utils.BaseTestCase):
def setUp(self):
super(TestTaskExecutor, self).setUp()
glance_store.register_opts(CONF)
self.config(default_store='file',
stores=['file', 'http'],
filesystem_store_datadir=self.test_dir,
group="glance_store")
glance_store.create_stores(CONF)
self.config(engine_mode='serial',
group='taskflow_executor')
self.context = mock.Mock()
self.task_repo = mock.Mock()
self.image_repo = mock.Mock()
self.image_factory = mock.Mock()
task_input = {
"import_from": "http://cloud.foo/image.qcow2",
"import_from_format": "qcow2",
"image_properties": {'disk_format': 'qcow2',
'container_format': 'bare'}
}
task_ttl = CONF.task.task_time_to_live
self.task_type = 'import'
self.task_factory = domain.TaskFactory()
self.task = self.task_factory.new_task(self.task_type, TENANT1,
task_time_to_live=task_ttl,
task_input=task_input)
self.executor = taskflow_executor.TaskExecutor(
self.context,
self.task_repo,
@@ -34,14 +69,12 @@ class TestTaskExecutor(test_utils.BaseTestCase):
self.image_factory)
def test_begin_processing(self):
task_id = mock.ANY
task = mock.Mock()
task.type = mock.ANY
with mock.patch.object(taskflow_executor.TaskExecutor,
'_run') as mock_run:
self.task_repo.get.return_value = task
self.executor.begin_processing(task_id)
with mock.patch.object(engines, 'load') as load_mock:
engine = mock.Mock()
load_mock.return_value = engine
self.task_repo.get.return_value = self.task
self.executor.begin_processing(self.task.task_id)
# assert the call
mock_run.assert_called_once_with(task_id, task.type)
load_mock.assert_called_once()
engine.assert_called_once()

View File

@@ -584,7 +584,7 @@ class TestTaskRepo(test_utils.BaseTestCase):
self.task_factory = glance.domain.TaskFactory()
self.fake_task_input = ('{"import_from": '
'"swift://cloud.foo/account/mycontainer/path"'
',"image_from_format": "qcow2"}')
',"import_from_format": "qcow2"}')
self._create_tasks()
def _create_tasks(self):

View File

@@ -140,6 +140,7 @@ class OptsTestCase(utils.BaseTestCase):
'disk_formats',
'task_time_to_live',
'task_executor',
'work_dir',
'store_type_preference',
'flavor',
'config_file',

View File

@@ -297,7 +297,8 @@ class TestTasksController(test_utils.BaseTestCase):
"type": "import",
"input": {
"import_from": "swift://cloud.foo/myaccount/mycontainer/path",
"image_from_format": "qcow2"
"import_from_format": "qcow2",
"image_properties": {}
}
}
new_task = mock.Mock()
@@ -316,17 +317,21 @@
        mock_get_task_executor_factory.new_task_executor.assert_called_once()
mock_get_task_factory.new_task.run.assert_called_once()
def test_notifications_on_create(self):
@mock.patch.object(glance.gateway.Gateway, 'get_task_factory')
def test_notifications_on_create(self, mock_get_task_factory):
request = unit_test_utils.get_fake_request()
new_task = mock.MagicMock(type='import')
mock_get_task_factory.new_task.return_value = new_task
new_task.run.return_value = mock.ANY
task = {"type": "import", "input": {
"import_from": "swift://cloud.foo/myaccount/mycontainer/path",
"image_from_format": "qcow2"}
"import_from": "http://cloud.foo/myaccount/mycontainer/path",
"import_from_format": "qcow2",
"image_properties": {}
}
}
task = self.controller.create(request, task=task)
self.assertEqual('import', task.type)
self.assertEqual({
"import_from": "swift://cloud.foo/myaccount/mycontainer/path",
"image_from_format": "qcow2"}, task.task_input)
output_logs = [nlog for nlog in self.notifier.get_logs()
if nlog['event_type'] == 'task.create']
self.assertEqual(1, len(output_logs))

View File

@@ -47,6 +47,12 @@ glance.database.migration_backend =
glance.database.metadata_backend =
sqlalchemy = glance.db.sqlalchemy.metadata
glance.flows =
import = glance.async.flows.base_import:get_flow
glance.flows.import =
introspect = glance.async.flows.introspect:get_flow
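# A hypothetical third-party optional flow would register the same way,
# e.g.: convert = myproject.flows.convert:get_flow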
[build_sphinx]
all_files = 1
build-dir = doc/build