From cf1fb3610d8b1c2929431897f99cf573e83f38ac Mon Sep 17 00:00:00 2001 From: chenying Date: Fri, 10 Nov 2017 15:22:10 +0800 Subject: [PATCH] Add the checkpoint copy API for Karbor Change-Id: I203733f9e810e16bcaba8e3ac7d11d06e4347372 Implements: blueprint support-copy-the-checkpoint-api --- karbor/api/v1/copies.py | 126 ++++++++++++ karbor/api/v1/router.py | 8 + karbor/common/constants.py | 7 +- karbor/policies/__init__.py | 2 + karbor/policies/copies.py | 39 ++++ karbor/services/protection/api.py | 3 + karbor/services/protection/checkpoint.py | 14 +- karbor/services/protection/flows/copy.py | 191 ++++++++++++++++++ karbor/services/protection/flows/utils.py | 6 +- karbor/services/protection/flows/worker.py | 15 ++ karbor/services/protection/manager.py | 47 +++++ .../services/protection/protection_plugin.py | 7 + karbor/services/protection/resource_flow.py | 9 + karbor/services/protection/rpcapi.py | 7 + .../unit/protection/test_resource_flow.py | 8 + 15 files changed, 485 insertions(+), 4 deletions(-) create mode 100644 karbor/api/v1/copies.py create mode 100644 karbor/policies/copies.py create mode 100644 karbor/services/protection/flows/copy.py diff --git a/karbor/api/v1/copies.py b/karbor/api/v1/copies.py new file mode 100644 index 00000000..2f11b3a1 --- /dev/null +++ b/karbor/api/v1/copies.py @@ -0,0 +1,126 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""The copy api.""" + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import uuidutils + +from webob import exc + +from karbor.api import common +from karbor.api.openstack import wsgi +from karbor import exception +from karbor.i18n import _ + +from karbor import objects +from karbor.policies import copies as copy_policy +from karbor.services.protection import api as protection_api + +CONF = cfg.CONF + +LOG = logging.getLogger(__name__) + + +class CopiesViewBuilder(common.ViewBuilder): + """Model a server API response as a python dictionary.""" + + def detail(self, request, copy): + """Detailed view of a single copy.""" + copy_ref = { + 'copy': { + 'project_id': copy.get('project_id'), + 'provider_id': copy.get('provider_id'), + 'plan_id': copy.get('plan_id'), + 'checkpoint_id': copy.get('checkpoint_id'), + 'parameters': copy.get('parameters'), + } + } + return copy_ref + + +class CopiesController(wsgi.Controller): + """The copy API controller for the OpenStack API.""" + + _view_builder_class = CopiesViewBuilder + + def __init__(self): + self.protection_api = protection_api.API() + super(CopiesController, self).__init__() + + def create(self, req, provider_id, body): + """Creates a new copy.""" + if not self.is_valid_body(body, 'copy'): + raise exc.HTTPUnprocessableEntity() + + LOG.debug('Create copy request body: %s', body) + context = req.environ['karbor.context'] + context.can(copy_policy.CREATE_POLICY) + copy = body['copy'] + plan_id = copy.get("plan_id", None) + + if not uuidutils.is_uuid_like(plan_id): + msg = _("Invalid plan id provided.") + raise exception.InvalidInput(reason=msg) + + if not uuidutils.is_uuid_like(provider_id): + msg = _("Invalid provider id provided.") + raise exception.InvalidInput(reason=msg) + + parameters = copy.get("parameters", None) + if parameters: + if not isinstance(parameters, dict): + msg = _("The parameters must be a dict when creating" + " a copy.") + raise exception.InvalidInput(reason=msg) + + try: + plan = objects.Plan.get_by_id(context, plan_id) + except exception.PlanNotFound as error: + raise exc.HTTPNotFound(explanation=error.msg) + + if provider_id != plan.provider_id: + msg = _("The provider id is not the same as the value " + "in the plan.") + raise exception.InvalidInput(reason=msg) + + filters = {'plan_id': plan_id} + checkpoints = self.protection_api.list_checkpoints( + context, provider_id, marker=None, limit=None, + sort_keys=None, sort_dirs=None, filters=filters, offset=None) + + if not checkpoints: + msg = _("The plan has not been protected.") + raise exception.InvalidInput(reason=msg) + + plan.parameters.update(parameters) + try: + checkpoint_copy = self.protection_api.copy(context, plan) + except Exception: + LOG.exception("Failed to create checkpoint copies.") + raise + + copy = { + 'project_id': context.project_id, + 'provider_id': plan.provider_id, + 'plan_id': plan.id, + 'checkpoint_id': checkpoint_copy, + 'parameters': parameters + } + + retval = self._view_builder.detail(req, copy) + return retval + + +def create_resource(): + return wsgi.Resource(CopiesController()) diff --git a/karbor/api/v1/router.py b/karbor/api/v1/router.py index 32e5fe7f..a39159d6 100644 --- a/karbor/api/v1/router.py +++ b/karbor/api/v1/router.py @@ -13,6 +13,7 @@ from oslo_service import wsgi as base_wsgi from karbor.api.openstack import ProjectMapper +from karbor.api.v1 import copies from karbor.api.v1 import operation_logs from karbor.api.v1 import plans from karbor.api.v1 import protectables @@ -43,6 +44,7 @@ class APIRouter(base_wsgi.Router): service_resources = services.create_resource() quota_resources = quotas.create_resource() quota_class_resources = quota_classes.create_resource() + copy_resources = copies.create_resource() mapper.resource("plan", "plans", controller=plans_resources, @@ -132,4 +134,10 @@ class APIRouter(base_wsgi.Router): controller=quota_class_resources, collection={}, member={'action': 'POST'}) + mapper.connect("copy", + "/{project_id}/providers/{provider_id}/checkpoints/" + "action", + controller=copy_resources, + action='create', + conditions={"method": ['POST']}) super(APIRouter, self).__init__(mapper) diff --git a/karbor/common/constants.py b/karbor/common/constants.py index 3c9b997a..c7136bb7 100644 --- a/karbor/common/constants.py +++ b/karbor/common/constants.py @@ -16,11 +16,13 @@ OPERATION_TYPES = ( OPERATION_RESTORE, OPERATION_DELETE, OPERATION_VERIFY, + OPERATION_COPY, ) = ( 'protect', 'restore', 'delete', - 'verify' + 'verify', + 'copy' ) @@ -66,6 +68,9 @@ PLAN_STATUS_STARTED = 'started' CHECKPOINT_STATUS_ERROR = 'error' CHECKPOINT_STATUS_PROTECTING = 'protecting' +CHECKPOINT_STATUS_WAIT_COPYING = 'wait_copying' +CHECKPOINT_STATUS_COPYING = 'copying' +CHECKPOINT_STATUS_COPY_FINISHED = 'finished' CHECKPOINT_STATUS_AVAILABLE = 'available' CHECKPOINT_STATUS_DELETING = 'deleting' CHECKPOINT_STATUS_DELETED = 'deleted' diff --git a/karbor/policies/__init__.py b/karbor/policies/__init__.py index 36f710dd..e19f287a 100644 --- a/karbor/policies/__init__.py +++ b/karbor/policies/__init__.py @@ -15,6 +15,7 @@ import itertools from karbor.policies import base +from karbor.policies import copies from karbor.policies import operation_logs from karbor.policies import plans from karbor.policies import protectables @@ -42,4 +43,5 @@ def list_rules(): services.list_rules(), quotas.list_rules(), quota_classes.list_rules(), + copies.list_rules(), ) diff --git a/karbor/policies/copies.py b/karbor/policies/copies.py new file mode 100644 index 00000000..1be24c79 --- /dev/null +++ b/karbor/policies/copies.py @@ -0,0 +1,39 @@ +# Copyright (c) 2017 Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_policy import policy + +from karbor.policies import base + + +CREATE_POLICY = 'copy:create' + +copies_policies = [ + policy.DocumentedRuleDefault( + name=CREATE_POLICY, + check_str=base.RULE_ADMIN_OR_OWNER, + description='Create a copy.', + operations=[ + { + 'method': 'POST', + 'path': '/{project_id}/providers/{provider_id}/' + 'checkpoints/action' + } + ]), +] + + +def list_rules(): + return copies_policies diff --git a/karbor/services/protection/api.py b/karbor/services/protection/api.py index 9fc6d4e2..369d067e 100644 --- a/karbor/services/protection/api.py +++ b/karbor/services/protection/api.py @@ -34,6 +34,9 @@ class API(base.Base): return self.protection_rpcapi.protect(context, plan, checkpoint_properties) + def copy(self, context, plan): + return self.protection_rpcapi.copy(context, plan) + def delete(self, context, provider_id, checkpoint_id): return self.protection_rpcapi.delete( context, diff --git a/karbor/services/protection/checkpoint.py b/karbor/services/protection/checkpoint.py index f6005d29..e4e5873c 100644 --- a/karbor/services/protection/checkpoint.py +++ b/karbor/services/protection/checkpoint.py @@ -73,6 +73,10 @@ class Checkpoint(object): # TODO(saggi): check for valid values and transitions return self._md_cache["status"] + @property + def extra_info(self): + return self._md_cache["extra_info"] + @property def project_id(self): return self._md_cache["project_id"] @@ -100,6 +104,10 @@ class Checkpoint(object): def status(self, value): self._md_cache["status"] = value + @extra_info.setter + def extra_info(self, value): + self._md_cache["extra_info"] = value + @resource_graph.setter def resource_graph(self, resource_graph): serialized_resource_graph = graph.serialize_resource_graph( @@ -170,14 +178,18 @@ class Checkpoint(object): provider_id = plan.get("provider_id") project_id = plan.get("project_id") extra_info = None + checkpoint_status = constants.CHECKPOINT_STATUS_PROTECTING if checkpoint_properties: extra_info = checkpoint_properties.get("extra_info", None) + status = checkpoint_properties.get("status", None) + if status: + checkpoint_status = status checkpoint_section.update_object( key=_INDEX_FILE_NAME, value={ "version": cls.VERSION, "id": checkpoint_id, - "status": constants.CHECKPOINT_STATUS_PROTECTING, + "status": checkpoint_status, "owner_id": owner_id, "provider_id": provider_id, "project_id": project_id, diff --git a/karbor/services/protection/flows/copy.py b/karbor/services/protection/flows/copy.py new file mode 100644 index 00000000..96a00563 --- /dev/null +++ b/karbor/services/protection/flows/copy.py @@ -0,0 +1,191 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from karbor.common import constants +from karbor import exception +from karbor.resource import Resource +from karbor.services.protection.flows import utils +from karbor.services.protection import resource_flow +from oslo_log import log as logging +from oslo_utils import timeutils + +from oslo_serialization import jsonutils + +from taskflow import task + +LOG = logging.getLogger(__name__) + + +class InitiateCopyTask(task.Task): + def execute(self, context, checkpoint, checkpoint_copy, operation_log, + *args, **kwargs): + LOG.debug("Initiate copy checkpoint_id: %s", checkpoint_copy.id) + checkpoint_copy.status = constants.CHECKPOINT_STATUS_COPYING + checkpoint_copy.commit() + update_fields = {"status": checkpoint_copy.status} + utils.update_operation_log(context, operation_log, update_fields) + + def revert(self, context, checkpoint, checkpoint_copy, operation_log, + *args, **kwargs): + LOG.debug("Failed to copy checkpoint_id: %s", checkpoint_copy.id) + checkpoint_copy.status = constants.CHECKPOINT_STATUS_ERROR + checkpoint_copy.commit() + update_fields = { + "status": checkpoint_copy.status, + "ended_at": timeutils.utcnow() + } + utils.update_operation_log(context, operation_log, update_fields) + + +class CompleteCopyTask(task.Task): + def execute(self, context, checkpoint, checkpoint_copy, operation_log): + LOG.debug("Complete copy checkpoint_id: %s", checkpoint_copy.id) + checkpoint_copy.status = constants.CHECKPOINT_STATUS_AVAILABLE + if checkpoint_copy.extra_info: + extra_info = jsonutils.loads(checkpoint_copy.extra_info) + extra_info['copy_status'] = \ + constants.CHECKPOINT_STATUS_COPY_FINISHED + else: + extra_info = { + 'copy_status': constants.CHECKPOINT_STATUS_COPY_FINISHED} + checkpoint_copy.extra_info = jsonutils.dumps(extra_info) + checkpoint_copy.commit() + update_fields = { + "status": checkpoint_copy.status, + "ended_at": timeutils.utcnow() + } + utils.update_operation_log(context, operation_log, update_fields) + + +def get_flow(context, protectable_registry, workflow_engine, plan, provider, + checkpoint, checkpoint_copy): + resources = set(Resource(**item) for item in plan.get("resources")) + resource_graph = protectable_registry.build_graph(context, + resources) + checkpoint_copy.resource_graph = resource_graph + checkpoint_copy.commit() + operation_log = utils.create_operation_log(context, checkpoint_copy, + constants.OPERATION_COPY) + flow_name = "Copy_" + plan.get('id')+checkpoint.id + copy_flow = workflow_engine.build_flow(flow_name, 'linear') + plugins = provider.load_plugins() + parameters = {} + parameters.update(plan.get('parameters', {})) + parameters['checkpoint'] = checkpoint + parameters['checkpoint_copy'] = checkpoint_copy + parameters['operation_log'] = operation_log + resources_task_flow = resource_flow.build_resource_flow( + operation_type=constants.OPERATION_COPY, + context=context, + workflow_engine=workflow_engine, + resource_graph=resource_graph, + plugins=plugins, + parameters=parameters, + ) + store_dict = {'context': context, + 'checkpoint': checkpoint, + 'checkpoint_copy': checkpoint_copy, + 'operation_log': operation_log + } + workflow_engine.add_tasks( + copy_flow, + InitiateCopyTask(name='InitiateCopyTask_'+checkpoint_copy.id, + inject=store_dict), + resources_task_flow, + CompleteCopyTask(name='CompleteCopyTask_'+checkpoint_copy.id, + inject=store_dict), + ) + return copy_flow + + +def get_flows(context, protectable_registry, workflow_engine, plan, provider, + checkpoints, checkpoint_collection): + checkpoints_protect_copy = prepare_create_flows( + context, plan, checkpoints, checkpoint_collection) + + copy_flows = create_flows( + context, protectable_registry, workflow_engine, plan, provider, + checkpoints_protect_copy, checkpoint_collection) + + return copy_flows, checkpoints_protect_copy + + +def prepare_create_flows(context, plan, checkpoints, checkpoint_collection): + LOG.debug("Creating checkpoint copy for plan. plan: %s", plan.id) + checkpoints_protect_copy = [] + for checkpoint in checkpoints: + extra_info = checkpoint.get("extra_info", None) + copy_status = None + if extra_info: + extra_info = jsonutils.loads(extra_info) + copy_status = extra_info.get('copy_status', None) + if (checkpoint.get("status") != + constants.CHECKPOINT_STATUS_AVAILABLE) or ( + copy_status == + constants.CHECKPOINT_STATUS_COPY_FINISHED): + continue + checkpoint_dict = { + 'project_id': context.project_id, + 'status': constants.CHECKPOINT_STATUS_WAIT_COPYING, + 'provider_id': checkpoint.get("provider_id"), + "protection_plan": checkpoint.get("protection_plan"), + "extra_info": {} + } + checkpoint_copy = checkpoint_collection.create(plan, + checkpoint_dict) + checkpoint_protect_copy = { + 'checkpoint_protect_id': checkpoint.get("id"), + 'checkpoint_copy_id': checkpoint_copy.id + } + checkpoints_protect_copy.append(checkpoint_protect_copy) + LOG.debug("The protect and copy checkpoints . checkpoints_copy: %s", + checkpoints_protect_copy) + return checkpoints_protect_copy + + +def create_flows(context, protectable_registry, workflow_engine, + plan, provider, checkpoints_protect_copy, + checkpoint_collection): + LOG.debug("Creating flows for the plan. checkpoints: %s", + checkpoints_protect_copy) + flow_name = "Copy_flows" + plan.get('id') + copy_flows = workflow_engine.build_flow(flow_name, 'linear') + for checkpoint_protect_copy in checkpoints_protect_copy: + checkpoint_protect_id = checkpoint_protect_copy.get( + "checkpoint_protect_id") + checkpoint_copy_id = checkpoint_protect_copy.get( + "checkpoint_copy_id") + checkpoint_protect = checkpoint_collection.get(checkpoint_protect_id) + checkpoint_copy = checkpoint_collection.get(checkpoint_copy_id) + try: + copy_flow = get_flow( + context, + protectable_registry, + workflow_engine, + plan, + provider, + checkpoint_protect, + checkpoint_copy, + ) + except Exception as e: + LOG.exception("Failed to create copy flow, checkpoint: %s", + checkpoint_protect_id) + raise exception.FlowError( + flow="copy", + error=e.msg if hasattr(e, 'msg') else 'Internal error') + workflow_engine.add_tasks(copy_flows, copy_flow) + flows_engine = workflow_engine.get_engine(copy_flows, store={ + 'context': context + }) + LOG.debug("Creating flows for the plan. copy_flows: %s", copy_flows) + + return flows_engine diff --git a/karbor/services/protection/flows/utils.py b/karbor/services/protection/flows/utils.py index 55cf2b2c..b6dc0ced 100644 --- a/karbor/services/protection/flows/utils.py +++ b/karbor/services/protection/flows/utils.py @@ -22,7 +22,7 @@ from oslo_utils import timeutils LOG = logging.getLogger(__name__) -def create_operation_log(context, checkpoint): +def create_operation_log(context, checkpoint, operation_type=None): checkpoint_dict = checkpoint.to_dict() extra_info = checkpoint_dict.get('extra_info', None) scheduled_operation_id = None @@ -41,7 +41,9 @@ def create_operation_log(context, checkpoint): provider_id = protection_plan.get("provider_id") operation_log_properties = { 'project_id': checkpoint_dict['project_id'], - 'operation_type': constants.OPERATION_PROTECT, + 'operation_type': ( + constants.OPERATION_PROTECT if operation_type is None + else operation_type), 'checkpoint_id': checkpoint_dict['id'], 'plan_id': plan_id, 'provider_id': provider_id, diff --git a/karbor/services/protection/flows/worker.py b/karbor/services/protection/flows/worker.py index 18c117be..58822947 100644 --- a/karbor/services/protection/flows/worker.py +++ b/karbor/services/protection/flows/worker.py @@ -17,6 +17,7 @@ from oslo_utils import importutils from karbor.common import constants from karbor import exception +from karbor.services.protection.flows import copy as flow_copy from karbor.services.protection.flows import delete as flow_delete from karbor.services.protection.flows import protect as flow_protect from karbor.services.protection.flows import restore as flow_restore @@ -90,6 +91,20 @@ class Worker(object): checkpoint, provider, ) + elif operation_type == constants.OPERATION_COPY: + plan = kwargs.get('plan', None) + protectable_registry = kwargs.get('protectable_registry', None) + checkpoint_collection = kwargs.get('checkpoint_collection', None) + flow, checkpoint_copy = flow_copy.get_flows( + context, + protectable_registry, + self.workflow_engine, + plan, + provider, + checkpoint, + checkpoint_collection, + ) + return flow, checkpoint_copy else: raise exception.InvalidParameterValue( err='unknown operation type %s' % operation_type diff --git a/karbor/services/protection/manager.py b/karbor/services/protection/manager.py index c76c696c..e32ab106 100644 --- a/karbor/services/protection/manager.py +++ b/karbor/services/protection/manager.py @@ -130,6 +130,53 @@ class ProtectionManager(manager.Manager): self._spawn(self.worker.run_flow, flow) return checkpoint.id + @messaging.expected_exceptions(exception.InvalidPlan, + exception.ProviderNotFound, + exception.FlowError) + def copy(self, context, plan): + """create copy of checkpoint for the given plan + + :param plan: Define that protection plan should be done + """ + + LOG.info("Starting protection service:copy action.") + LOG.debug("Creating the checkpoint copy for the plan: %s", plan) + + if not plan: + raise exception.InvalidPlan( + reason=_('The protection plan is None')) + provider_id = plan.get('provider_id', None) + plan_id = plan.get('id', None) + provider = self.provider_registry.show_provider(provider_id) + checkpoints = None + checkpoint_collection = provider.get_checkpoint_collection() + try: + checkpoints = self.list_checkpoints(context, provider_id, + filters={'plan_id': plan_id}) + except Exception as e: + LOG.exception("Failed to get checkpoints for the plan: %s", + plan_id) + exc = exception.FlowError(flow="copy", + error="Failed to get checkpoints") + six.raise_from(exc, e) + try: + flow, checkpoint_copy = self.worker.get_flow( + context=context, + protectable_registry=self.protectable_registry, + operation_type=constants.OPERATION_COPY, + plan=plan, + provider=provider, + checkpoint=checkpoints, + checkpoint_collection=checkpoint_collection) + except Exception as e: + LOG.exception("Failed to create copy flow, plan: %s", + plan_id) + raise exception.FlowError( + flow="copy", + error=e.msg if hasattr(e, 'msg') else 'Internal error') + self._spawn(self.worker.run_flow, flow) + return checkpoint_copy + @messaging.expected_exceptions(exception.ProviderNotFound, exception.CheckpointNotFound, exception.CheckpointNotAvailable, diff --git a/karbor/services/protection/protection_plugin.py b/karbor/services/protection/protection_plugin.py index 4e386656..075941ff 100644 --- a/karbor/services/protection/protection_plugin.py +++ b/karbor/services/protection/protection_plugin.py @@ -90,6 +90,13 @@ class ProtectionPlugin(object): """ raise NotImplementedError + def get_copy_operation(self, resource): + """Returns the copy Operation for this resource + + :returns: Operation for the resource + """ + raise NotImplementedError + def get_delete_operation(self, resource): """Returns the delete Operation for this resource diff --git a/karbor/services/protection/resource_flow.py b/karbor/services/protection/resource_flow.py index b9cd0aa2..e6166773 100644 --- a/karbor/services/protection/resource_flow.py +++ b/karbor/services/protection/resource_flow.py @@ -42,6 +42,7 @@ ResourceHooks = namedtuple('ResourceHooks', [ OPERATION_EXTRA_ARGS = { constants.OPERATION_RESTORE: ['restore', 'new_resources'], constants.OPERATION_VERIFY: ['verify', 'new_resources'], + constants.OPERATION_COPY: ['checkpoint', 'checkpoint_copy'], } @@ -99,6 +100,14 @@ class ResourceFlowGraphWalkerListener(graph.GraphWalkerListener): 'parameters': parameters, 'resource': resource, } + if self.operation_type == constants.OPERATION_COPY: + injects['checkpoint'] = self.parameters.get( + 'checkpoint') + injects['checkpoint_copy'] = self.parameters.get( + 'checkpoint_copy') + injects['operation_log'] = self.parameters.get( + 'operation_log') + requires = OPERATION_EXTRA_ARGS.get(self.operation_type, []) requires.append('operation_log') task = self.workflow_engine.create_task(method, diff --git a/karbor/services/protection/rpcapi.py b/karbor/services/protection/rpcapi.py index 37a4d57a..8e3de813 100644 --- a/karbor/services/protection/rpcapi.py +++ b/karbor/services/protection/rpcapi.py @@ -67,6 +67,13 @@ class ProtectionAPI(object): plan=plan, checkpoint_properties=checkpoint_properties) + def copy(self, ctxt, plan=None): + cctxt = self.client.prepare(version='1.0') + return cctxt.call( + ctxt, + 'copy', + plan=plan) + def delete(self, ctxt, provider_id, checkpoint_id): cctxt = self.client.prepare(version='1.0') return cctxt.call( diff --git a/karbor/tests/unit/protection/test_resource_flow.py b/karbor/tests/unit/protection/test_resource_flow.py index 9edf5ccf..ba26b043 100644 --- a/karbor/tests/unit/protection/test_resource_flow.py +++ b/karbor/tests/unit/protection/test_resource_flow.py @@ -86,6 +86,8 @@ class ResourceFlowTest(base.TestCase): elif operation == constants.OPERATION_VERIFY: kwargs['new_resources'] = {} kwargs['verify'] = None + elif operation == constants.OPERATION_COPY: + kwargs['checkpoint_copy'] = None self._walk_operation(mock_protection, operation, **kwargs) @mock.patch('karbor.tests.unit.protection.fakes.FakeProtectionPlugin') @@ -105,6 +107,8 @@ class ResourceFlowTest(base.TestCase): elif operation == constants.OPERATION_VERIFY: kwargs['new_resources'] = {} kwargs['verify'] = None + elif operation == constants.OPERATION_COPY: + kwargs['checkpoint_copy'] = None self._walk_operation(mock_protection, operation, **kwargs) self.assertEqual(mock_operation.on_prepare_begin.call_count, @@ -133,6 +137,8 @@ class ResourceFlowTest(base.TestCase): self.assertEqual(v, result[k]) for operation in constants.OPERATION_TYPES: + if operation == constants.OPERATION_COPY: + continue fake_operation = fakes.FakeOperation() get_operation_attr = 'get_{}_operation'.format(operation) getattr( @@ -153,6 +159,8 @@ class ResourceFlowTest(base.TestCase): elif operation == constants.OPERATION_VERIFY: kwargs['new_resources'] = {} kwargs['verify'] = None + elif operation == constants.OPERATION_COPY: + kwargs['checkpoint_copy'] = None self._walk_operation(mock_protection, operation, parameters=parameters, **kwargs)