Add the checkpoint copy API for Karbor

Change-Id: I203733f9e810e16bcaba8e3ac7d11d06e4347372
Implements: blueprint support-copy-the-checkpoint-api
This commit is contained in:
chenying 2017-11-10 15:22:10 +08:00
parent 10de69e32d
commit cf1fb3610d
15 changed files with 485 additions and 4 deletions

126
karbor/api/v1/copies.py Normal file
View File

@ -0,0 +1,126 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""The copy api."""
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import uuidutils
from webob import exc
from karbor.api import common
from karbor.api.openstack import wsgi
from karbor import exception
from karbor.i18n import _
from karbor import objects
from karbor.policies import copies as copy_policy
from karbor.services.protection import api as protection_api
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class CopiesViewBuilder(common.ViewBuilder):
"""Model a server API response as a python dictionary."""
def detail(self, request, copy):
"""Detailed view of a single copy."""
copy_ref = {
'copy': {
'project_id': copy.get('project_id'),
'provider_id': copy.get('provider_id'),
'plan_id': copy.get('plan_id'),
'checkpoint_id': copy.get('checkpoint_id'),
'parameters': copy.get('parameters'),
}
}
return copy_ref
class CopiesController(wsgi.Controller):
"""The copy API controller for the OpenStack API."""
_view_builder_class = CopiesViewBuilder
def __init__(self):
self.protection_api = protection_api.API()
super(CopiesController, self).__init__()
def create(self, req, provider_id, body):
"""Creates a new copy."""
if not self.is_valid_body(body, 'copy'):
raise exc.HTTPUnprocessableEntity()
LOG.debug('Create copy request body: %s', body)
context = req.environ['karbor.context']
context.can(copy_policy.CREATE_POLICY)
copy = body['copy']
plan_id = copy.get("plan_id", None)
if not uuidutils.is_uuid_like(plan_id):
msg = _("Invalid plan id provided.")
raise exception.InvalidInput(reason=msg)
if not uuidutils.is_uuid_like(provider_id):
msg = _("Invalid provider id provided.")
raise exception.InvalidInput(reason=msg)
parameters = copy.get("parameters", None)
if parameters:
if not isinstance(parameters, dict):
msg = _("The parameters must be a dict when creating"
" a copy.")
raise exception.InvalidInput(reason=msg)
try:
plan = objects.Plan.get_by_id(context, plan_id)
except exception.PlanNotFound as error:
raise exc.HTTPNotFound(explanation=error.msg)
if provider_id != plan.provider_id:
msg = _("The provider id is not the same as the value "
"in the plan.")
raise exception.InvalidInput(reason=msg)
filters = {'plan_id': plan_id}
checkpoints = self.protection_api.list_checkpoints(
context, provider_id, marker=None, limit=None,
sort_keys=None, sort_dirs=None, filters=filters, offset=None)
if not checkpoints:
msg = _("The plan has not been protected.")
raise exception.InvalidInput(reason=msg)
plan.parameters.update(parameters)
try:
checkpoint_copy = self.protection_api.copy(context, plan)
except Exception:
LOG.exception("Failed to create checkpoint copies.")
raise
copy = {
'project_id': context.project_id,
'provider_id': plan.provider_id,
'plan_id': plan.id,
'checkpoint_id': checkpoint_copy,
'parameters': parameters
}
retval = self._view_builder.detail(req, copy)
return retval
def create_resource():
return wsgi.Resource(CopiesController())

View File

@ -13,6 +13,7 @@
from oslo_service import wsgi as base_wsgi
from karbor.api.openstack import ProjectMapper
from karbor.api.v1 import copies
from karbor.api.v1 import operation_logs
from karbor.api.v1 import plans
from karbor.api.v1 import protectables
@ -43,6 +44,7 @@ class APIRouter(base_wsgi.Router):
service_resources = services.create_resource()
quota_resources = quotas.create_resource()
quota_class_resources = quota_classes.create_resource()
copy_resources = copies.create_resource()
mapper.resource("plan", "plans",
controller=plans_resources,
@ -132,4 +134,10 @@ class APIRouter(base_wsgi.Router):
controller=quota_class_resources,
collection={},
member={'action': 'POST'})
mapper.connect("copy",
"/{project_id}/providers/{provider_id}/checkpoints/"
"action",
controller=copy_resources,
action='create',
conditions={"method": ['POST']})
super(APIRouter, self).__init__(mapper)

View File

@ -16,11 +16,13 @@ OPERATION_TYPES = (
OPERATION_RESTORE,
OPERATION_DELETE,
OPERATION_VERIFY,
OPERATION_COPY,
) = (
'protect',
'restore',
'delete',
'verify'
'verify',
'copy'
)
@ -66,6 +68,9 @@ PLAN_STATUS_STARTED = 'started'
CHECKPOINT_STATUS_ERROR = 'error'
CHECKPOINT_STATUS_PROTECTING = 'protecting'
CHECKPOINT_STATUS_WAIT_COPYING = 'wait_copying'
CHECKPOINT_STATUS_COPYING = 'copying'
CHECKPOINT_STATUS_COPY_FINISHED = 'finished'
CHECKPOINT_STATUS_AVAILABLE = 'available'
CHECKPOINT_STATUS_DELETING = 'deleting'
CHECKPOINT_STATUS_DELETED = 'deleted'

View File

@ -15,6 +15,7 @@
import itertools
from karbor.policies import base
from karbor.policies import copies
from karbor.policies import operation_logs
from karbor.policies import plans
from karbor.policies import protectables
@ -42,4 +43,5 @@ def list_rules():
services.list_rules(),
quotas.list_rules(),
quota_classes.list_rules(),
copies.list_rules(),
)

39
karbor/policies/copies.py Normal file
View File

@ -0,0 +1,39 @@
# Copyright (c) 2017 Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
from karbor.policies import base
CREATE_POLICY = 'copy:create'
copies_policies = [
policy.DocumentedRuleDefault(
name=CREATE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
description='Create a copy.',
operations=[
{
'method': 'POST',
'path': '/{project_id}/providers/{provider_id}/'
'checkpoints/action'
}
]),
]
def list_rules():
return copies_policies

View File

@ -34,6 +34,9 @@ class API(base.Base):
return self.protection_rpcapi.protect(context, plan,
checkpoint_properties)
def copy(self, context, plan):
return self.protection_rpcapi.copy(context, plan)
def delete(self, context, provider_id, checkpoint_id):
return self.protection_rpcapi.delete(
context,

View File

@ -73,6 +73,10 @@ class Checkpoint(object):
# TODO(saggi): check for valid values and transitions
return self._md_cache["status"]
@property
def extra_info(self):
return self._md_cache["extra_info"]
@property
def project_id(self):
return self._md_cache["project_id"]
@ -100,6 +104,10 @@ class Checkpoint(object):
def status(self, value):
self._md_cache["status"] = value
@extra_info.setter
def extra_info(self, value):
self._md_cache["extra_info"] = value
@resource_graph.setter
def resource_graph(self, resource_graph):
serialized_resource_graph = graph.serialize_resource_graph(
@ -170,14 +178,18 @@ class Checkpoint(object):
provider_id = plan.get("provider_id")
project_id = plan.get("project_id")
extra_info = None
checkpoint_status = constants.CHECKPOINT_STATUS_PROTECTING
if checkpoint_properties:
extra_info = checkpoint_properties.get("extra_info", None)
status = checkpoint_properties.get("status", None)
if status:
checkpoint_status = status
checkpoint_section.update_object(
key=_INDEX_FILE_NAME,
value={
"version": cls.VERSION,
"id": checkpoint_id,
"status": constants.CHECKPOINT_STATUS_PROTECTING,
"status": checkpoint_status,
"owner_id": owner_id,
"provider_id": provider_id,
"project_id": project_id,

View File

@ -0,0 +1,191 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from karbor.common import constants
from karbor import exception
from karbor.resource import Resource
from karbor.services.protection.flows import utils
from karbor.services.protection import resource_flow
from oslo_log import log as logging
from oslo_utils import timeutils
from oslo_serialization import jsonutils
from taskflow import task
LOG = logging.getLogger(__name__)
class InitiateCopyTask(task.Task):
def execute(self, context, checkpoint, checkpoint_copy, operation_log,
*args, **kwargs):
LOG.debug("Initiate copy checkpoint_id: %s", checkpoint_copy.id)
checkpoint_copy.status = constants.CHECKPOINT_STATUS_COPYING
checkpoint_copy.commit()
update_fields = {"status": checkpoint_copy.status}
utils.update_operation_log(context, operation_log, update_fields)
def revert(self, context, checkpoint, checkpoint_copy, operation_log,
*args, **kwargs):
LOG.debug("Failed to copy checkpoint_id: %s", checkpoint_copy.id)
checkpoint_copy.status = constants.CHECKPOINT_STATUS_ERROR
checkpoint_copy.commit()
update_fields = {
"status": checkpoint_copy.status,
"ended_at": timeutils.utcnow()
}
utils.update_operation_log(context, operation_log, update_fields)
class CompleteCopyTask(task.Task):
def execute(self, context, checkpoint, checkpoint_copy, operation_log):
LOG.debug("Complete copy checkpoint_id: %s", checkpoint_copy.id)
checkpoint_copy.status = constants.CHECKPOINT_STATUS_AVAILABLE
if checkpoint_copy.extra_info:
extra_info = jsonutils.loads(checkpoint_copy.extra_info)
extra_info['copy_status'] = \
constants.CHECKPOINT_STATUS_COPY_FINISHED
else:
extra_info = {
'copy_status': constants.CHECKPOINT_STATUS_COPY_FINISHED}
checkpoint_copy.extra_info = jsonutils.dumps(extra_info)
checkpoint_copy.commit()
update_fields = {
"status": checkpoint_copy.status,
"ended_at": timeutils.utcnow()
}
utils.update_operation_log(context, operation_log, update_fields)
def get_flow(context, protectable_registry, workflow_engine, plan, provider,
checkpoint, checkpoint_copy):
resources = set(Resource(**item) for item in plan.get("resources"))
resource_graph = protectable_registry.build_graph(context,
resources)
checkpoint_copy.resource_graph = resource_graph
checkpoint_copy.commit()
operation_log = utils.create_operation_log(context, checkpoint_copy,
constants.OPERATION_COPY)
flow_name = "Copy_" + plan.get('id')+checkpoint.id
copy_flow = workflow_engine.build_flow(flow_name, 'linear')
plugins = provider.load_plugins()
parameters = {}
parameters.update(plan.get('parameters', {}))
parameters['checkpoint'] = checkpoint
parameters['checkpoint_copy'] = checkpoint_copy
parameters['operation_log'] = operation_log
resources_task_flow = resource_flow.build_resource_flow(
operation_type=constants.OPERATION_COPY,
context=context,
workflow_engine=workflow_engine,
resource_graph=resource_graph,
plugins=plugins,
parameters=parameters,
)
store_dict = {'context': context,
'checkpoint': checkpoint,
'checkpoint_copy': checkpoint_copy,
'operation_log': operation_log
}
workflow_engine.add_tasks(
copy_flow,
InitiateCopyTask(name='InitiateCopyTask_'+checkpoint_copy.id,
inject=store_dict),
resources_task_flow,
CompleteCopyTask(name='CompleteCopyTask_'+checkpoint_copy.id,
inject=store_dict),
)
return copy_flow
def get_flows(context, protectable_registry, workflow_engine, plan, provider,
checkpoints, checkpoint_collection):
checkpoints_protect_copy = prepare_create_flows(
context, plan, checkpoints, checkpoint_collection)
copy_flows = create_flows(
context, protectable_registry, workflow_engine, plan, provider,
checkpoints_protect_copy, checkpoint_collection)
return copy_flows, checkpoints_protect_copy
def prepare_create_flows(context, plan, checkpoints, checkpoint_collection):
LOG.debug("Creating checkpoint copy for plan. plan: %s", plan.id)
checkpoints_protect_copy = []
for checkpoint in checkpoints:
extra_info = checkpoint.get("extra_info", None)
copy_status = None
if extra_info:
extra_info = jsonutils.loads(extra_info)
copy_status = extra_info.get('copy_status', None)
if (checkpoint.get("status") !=
constants.CHECKPOINT_STATUS_AVAILABLE) or (
copy_status ==
constants.CHECKPOINT_STATUS_COPY_FINISHED):
continue
checkpoint_dict = {
'project_id': context.project_id,
'status': constants.CHECKPOINT_STATUS_WAIT_COPYING,
'provider_id': checkpoint.get("provider_id"),
"protection_plan": checkpoint.get("protection_plan"),
"extra_info": {}
}
checkpoint_copy = checkpoint_collection.create(plan,
checkpoint_dict)
checkpoint_protect_copy = {
'checkpoint_protect_id': checkpoint.get("id"),
'checkpoint_copy_id': checkpoint_copy.id
}
checkpoints_protect_copy.append(checkpoint_protect_copy)
LOG.debug("The protect and copy checkpoints . checkpoints_copy: %s",
checkpoints_protect_copy)
return checkpoints_protect_copy
def create_flows(context, protectable_registry, workflow_engine,
plan, provider, checkpoints_protect_copy,
checkpoint_collection):
LOG.debug("Creating flows for the plan. checkpoints: %s",
checkpoints_protect_copy)
flow_name = "Copy_flows" + plan.get('id')
copy_flows = workflow_engine.build_flow(flow_name, 'linear')
for checkpoint_protect_copy in checkpoints_protect_copy:
checkpoint_protect_id = checkpoint_protect_copy.get(
"checkpoint_protect_id")
checkpoint_copy_id = checkpoint_protect_copy.get(
"checkpoint_copy_id")
checkpoint_protect = checkpoint_collection.get(checkpoint_protect_id)
checkpoint_copy = checkpoint_collection.get(checkpoint_copy_id)
try:
copy_flow = get_flow(
context,
protectable_registry,
workflow_engine,
plan,
provider,
checkpoint_protect,
checkpoint_copy,
)
except Exception as e:
LOG.exception("Failed to create copy flow, checkpoint: %s",
checkpoint_protect_id)
raise exception.FlowError(
flow="copy",
error=e.msg if hasattr(e, 'msg') else 'Internal error')
workflow_engine.add_tasks(copy_flows, copy_flow)
flows_engine = workflow_engine.get_engine(copy_flows, store={
'context': context
})
LOG.debug("Creating flows for the plan. copy_flows: %s", copy_flows)
return flows_engine

View File

@ -22,7 +22,7 @@ from oslo_utils import timeutils
LOG = logging.getLogger(__name__)
def create_operation_log(context, checkpoint):
def create_operation_log(context, checkpoint, operation_type=None):
checkpoint_dict = checkpoint.to_dict()
extra_info = checkpoint_dict.get('extra_info', None)
scheduled_operation_id = None
@ -41,7 +41,9 @@ def create_operation_log(context, checkpoint):
provider_id = protection_plan.get("provider_id")
operation_log_properties = {
'project_id': checkpoint_dict['project_id'],
'operation_type': constants.OPERATION_PROTECT,
'operation_type': (
constants.OPERATION_PROTECT if operation_type is None
else operation_type),
'checkpoint_id': checkpoint_dict['id'],
'plan_id': plan_id,
'provider_id': provider_id,

View File

@ -17,6 +17,7 @@ from oslo_utils import importutils
from karbor.common import constants
from karbor import exception
from karbor.services.protection.flows import copy as flow_copy
from karbor.services.protection.flows import delete as flow_delete
from karbor.services.protection.flows import protect as flow_protect
from karbor.services.protection.flows import restore as flow_restore
@ -90,6 +91,20 @@ class Worker(object):
checkpoint,
provider,
)
elif operation_type == constants.OPERATION_COPY:
plan = kwargs.get('plan', None)
protectable_registry = kwargs.get('protectable_registry', None)
checkpoint_collection = kwargs.get('checkpoint_collection', None)
flow, checkpoint_copy = flow_copy.get_flows(
context,
protectable_registry,
self.workflow_engine,
plan,
provider,
checkpoint,
checkpoint_collection,
)
return flow, checkpoint_copy
else:
raise exception.InvalidParameterValue(
err='unknown operation type %s' % operation_type

View File

@ -130,6 +130,53 @@ class ProtectionManager(manager.Manager):
self._spawn(self.worker.run_flow, flow)
return checkpoint.id
@messaging.expected_exceptions(exception.InvalidPlan,
exception.ProviderNotFound,
exception.FlowError)
def copy(self, context, plan):
"""create copy of checkpoint for the given plan
:param plan: Define that protection plan should be done
"""
LOG.info("Starting protection service:copy action.")
LOG.debug("Creating the checkpoint copy for the plan: %s", plan)
if not plan:
raise exception.InvalidPlan(
reason=_('The protection plan is None'))
provider_id = plan.get('provider_id', None)
plan_id = plan.get('id', None)
provider = self.provider_registry.show_provider(provider_id)
checkpoints = None
checkpoint_collection = provider.get_checkpoint_collection()
try:
checkpoints = self.list_checkpoints(context, provider_id,
filters={'plan_id': plan_id})
except Exception as e:
LOG.exception("Failed to get checkpoints for the plan: %s",
plan_id)
exc = exception.FlowError(flow="copy",
error="Failed to get checkpoints")
six.raise_from(exc, e)
try:
flow, checkpoint_copy = self.worker.get_flow(
context=context,
protectable_registry=self.protectable_registry,
operation_type=constants.OPERATION_COPY,
plan=plan,
provider=provider,
checkpoint=checkpoints,
checkpoint_collection=checkpoint_collection)
except Exception as e:
LOG.exception("Failed to create copy flow, plan: %s",
plan_id)
raise exception.FlowError(
flow="copy",
error=e.msg if hasattr(e, 'msg') else 'Internal error')
self._spawn(self.worker.run_flow, flow)
return checkpoint_copy
@messaging.expected_exceptions(exception.ProviderNotFound,
exception.CheckpointNotFound,
exception.CheckpointNotAvailable,

View File

@ -90,6 +90,13 @@ class ProtectionPlugin(object):
"""
raise NotImplementedError
def get_copy_operation(self, resource):
"""Returns the copy Operation for this resource
:returns: Operation for the resource
"""
raise NotImplementedError
def get_delete_operation(self, resource):
"""Returns the delete Operation for this resource

View File

@ -42,6 +42,7 @@ ResourceHooks = namedtuple('ResourceHooks', [
OPERATION_EXTRA_ARGS = {
constants.OPERATION_RESTORE: ['restore', 'new_resources'],
constants.OPERATION_VERIFY: ['verify', 'new_resources'],
constants.OPERATION_COPY: ['checkpoint', 'checkpoint_copy'],
}
@ -99,6 +100,14 @@ class ResourceFlowGraphWalkerListener(graph.GraphWalkerListener):
'parameters': parameters,
'resource': resource,
}
if self.operation_type == constants.OPERATION_COPY:
injects['checkpoint'] = self.parameters.get(
'checkpoint')
injects['checkpoint_copy'] = self.parameters.get(
'checkpoint_copy')
injects['operation_log'] = self.parameters.get(
'operation_log')
requires = OPERATION_EXTRA_ARGS.get(self.operation_type, [])
requires.append('operation_log')
task = self.workflow_engine.create_task(method,

View File

@ -67,6 +67,13 @@ class ProtectionAPI(object):
plan=plan,
checkpoint_properties=checkpoint_properties)
def copy(self, ctxt, plan=None):
cctxt = self.client.prepare(version='1.0')
return cctxt.call(
ctxt,
'copy',
plan=plan)
def delete(self, ctxt, provider_id, checkpoint_id):
cctxt = self.client.prepare(version='1.0')
return cctxt.call(

View File

@ -86,6 +86,8 @@ class ResourceFlowTest(base.TestCase):
elif operation == constants.OPERATION_VERIFY:
kwargs['new_resources'] = {}
kwargs['verify'] = None
elif operation == constants.OPERATION_COPY:
kwargs['checkpoint_copy'] = None
self._walk_operation(mock_protection, operation, **kwargs)
@mock.patch('karbor.tests.unit.protection.fakes.FakeProtectionPlugin')
@ -105,6 +107,8 @@ class ResourceFlowTest(base.TestCase):
elif operation == constants.OPERATION_VERIFY:
kwargs['new_resources'] = {}
kwargs['verify'] = None
elif operation == constants.OPERATION_COPY:
kwargs['checkpoint_copy'] = None
self._walk_operation(mock_protection, operation, **kwargs)
self.assertEqual(mock_operation.on_prepare_begin.call_count,
@ -133,6 +137,8 @@ class ResourceFlowTest(base.TestCase):
self.assertEqual(v, result[k])
for operation in constants.OPERATION_TYPES:
if operation == constants.OPERATION_COPY:
continue
fake_operation = fakes.FakeOperation()
get_operation_attr = 'get_{}_operation'.format(operation)
getattr(
@ -153,6 +159,8 @@ class ResourceFlowTest(base.TestCase):
elif operation == constants.OPERATION_VERIFY:
kwargs['new_resources'] = {}
kwargs['verify'] = None
elif operation == constants.OPERATION_COPY:
kwargs['checkpoint_copy'] = None
self._walk_operation(mock_protection, operation,
parameters=parameters, **kwargs)