Add operation_log API for karbor

This is a new API. After this API is merged, I will submit another
patch to karborclient and a fullstack test for it.

Change-Id: Idf67d5fdc5c2c4bb960212dae4c18294109a6fbf
blueprint: operation-log-api
This commit is contained in:
chenying 2017-06-28 18:02:28 +08:00
parent a7b2eb6432
commit 919333722a
13 changed files with 523 additions and 28 deletions

View File

@ -35,5 +35,8 @@
"scheduled_operation:create": "",
"scheduled_operation:delete": "rule:admin_or_owner",
"scheduled_operation:get": "rule:admin_or_owner",
"scheduled_operation:list": ""
"scheduled_operation:list": "",
"operation_log:get": "rule:admin_or_owner",
"operation_log:get_all": "rule:admin_or_owner"
}

View File

@ -0,0 +1,243 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""The operation_logs api."""
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import uuidutils
from webob import exc
import karbor
from karbor.api import common
from karbor.api.openstack import wsgi
from karbor import exception
from karbor.i18n import _
from karbor import objects
from karbor.objects import base as objects_base
import karbor.policy
from karbor.services.operationengine import api as operationengine_api
from karbor.services.protection import api as protection_api
from karbor import utils
import six
query_operation_log_filters_opt = cfg.ListOpt(
'query_operation_log_filters',
default=['checkpoint_id', 'plan_id', 'restore_id', 'status'],
help="Operation log filter options which "
"non-admin user could use to "
"query operation_logs. Default values "
"are: ['checkpoint_id', 'plan_id', 'restore_id', 'status']")
CONF = cfg.CONF
CONF.register_opt(query_operation_log_filters_opt)
LOG = logging.getLogger(__name__)
def check_policy(context, action, target_obj=None):
    """Enforce the operation_log policy rule for ``action``.

    Builds an enforcement target from the request context merged with
    ``target_obj`` (a KarborObject or a plain dict), then delegates to
    karbor.policy.enforce, which raises on denial.
    """
    target = dict(project_id=context.project_id, user_id=context.user_id)
    if isinstance(target_obj, objects_base.KarborObject):
        # A KarborObject must be flattened to a dict before merging.
        target.update(target_obj.obj_to_primitive() or {})
    else:
        target.update(target_obj or {})
    karbor.policy.enforce(context, 'operation_log:%s' % action, target)
class OperationLogViewBuilder(common.ViewBuilder):
    """Map operation_log objects to API response dictionaries."""

    _collection_name = "operation_logs"

    def detail(self, request, operation_log):
        """Detailed view of a single operation_log."""
        keys = ('id', 'operation_type', 'checkpoint_id', 'plan_id',
                'provider_id', 'restore_id', 'scheduled_operation_id',
                'status', 'started_at', 'ended_at', 'error_info',
                'extra_info')
        return {
            'operation_log': {key: operation_log.get(key) for key in keys}
        }

    def detail_list(self, request, operation_logs,
                    operation_log_count=None):
        """Detailed view of a list of operation_logs."""
        return self._list_view(self.detail, request, operation_logs,
                               operation_log_count,
                               self._collection_name)

    def _list_view(self, func, request, operation_logs,
                   operation_log_count,
                   coll_name=_collection_name):
        """Assemble the list view plus optional pagination links."""
        items = [func(request, entry)['operation_log']
                 for entry in operation_logs]
        response = {'operation_logs': items}
        links = self._get_collection_links(
            request, operation_logs, coll_name, operation_log_count)
        if links:
            response['operation_logs_links'] = links
        return response
class OperationLogsController(wsgi.Controller):
    """The operation_log API controller for the OpenStack API."""

    _view_builder_class = OperationLogViewBuilder

    def __init__(self):
        self.operationengine_api = operationengine_api.API()
        self.protection_api = protection_api.API()
        super(OperationLogsController, self).__init__()

    def show(self, req, id):
        """Return data about the given OperationLogs.

        :param req: the incoming WSGI request
        :param id: UUID of the operation_log to show
        :raises webob.exc.HTTPBadRequest: if ``id`` is not UUID-like
        :raises webob.exc.HTTPNotFound: if the log does not exist or the
            caller is not authorized to read it
        """
        context = req.environ['karbor.context']
        LOG.info("Show operation log with id: %s", id, context=context)
        # Reject obviously malformed ids before touching the database.
        if not uuidutils.is_uuid_like(id):
            msg = _("Invalid operation log id provided.")
            raise exc.HTTPBadRequest(explanation=msg)
        try:
            operation_log = self._operation_log_get(context, id)
        except exception.OperationLogFound as error:
            # NOTE: despite its name, OperationLogFound is the project's
            # "not found" exception for operation logs.
            raise exc.HTTPNotFound(explanation=error.msg)
        LOG.info("Show operation log request issued successfully.")
        return self._view_builder.detail(req, operation_log)

    def index(self, req):
        """Returns a list of operation_logs.

        Supports marker/limit/offset pagination, sorting, and filtering;
        non-admin callers are limited to the filter keys configured in
        ``CONF.query_operation_log_filters``.
        """
        context = req.environ['karbor.context']
        LOG.info("Show operation log list", context=context)
        params = req.params.copy()
        marker, limit, offset = common.get_pagination_params(params)
        sort_keys, sort_dirs = common.get_sort_params(params)
        # Whatever query parameters remain after the pagination/sort
        # extraction are treated as filters.
        filters = params
        utils.remove_invalid_filter_options(
            context,
            filters,
            self._get_operation_log_filter_options())
        utils.check_filters(filters)
        operation_logs = self._get_all(context, marker, limit,
                                       sort_keys=sort_keys,
                                       sort_dirs=sort_dirs,
                                       filters=filters,
                                       offset=offset)
        retval_operation_logs = self._view_builder.detail_list(
            req, operation_logs)
        LOG.info("Show operation_log list request issued "
                 "successfully.")
        return retval_operation_logs

    def _get_all(self, context, marker=None, limit=None, sort_keys=None,
                 sort_dirs=None, filters=None, offset=None):
        # Fetch operation logs, scoped to the caller's project unless an
        # admin explicitly asked for all tenants.
        check_policy(context, 'get_all')
        if filters is None:
            filters = {}
        all_tenants = utils.get_bool_param('all_tenants', filters)
        try:
            if limit is not None:
                limit = int(limit)
                # NOTE: limit == 0 passes this check even though the
                # message says "positive".
                if limit < 0:
                    msg = _('limit param must be positive')
                    raise exception.InvalidInput(reason=msg)
        except ValueError:
            msg = _('limit param must be an integer')
            raise exception.InvalidInput(reason=msg)
        if filters:
            LOG.debug("Searching by: %s.", six.text_type(filters))
        if context.is_admin and all_tenants:
            # Need to remove all_tenants to pass the filtering below.
            del filters['all_tenants']
            operation_logs = objects.OperationLogList.get_all(
                context, marker, limit,
                sort_keys=sort_keys,
                sort_dirs=sort_dirs,
                filters=filters,
                offset=offset)
        else:
            operation_logs = objects.OperationLogList.get_all_by_project(
                context, context.project_id, marker, limit,
                sort_keys=sort_keys, sort_dirs=sort_dirs, filters=filters,
                offset=offset)
        LOG.info("Get all operation_logs completed successfully.")
        return operation_logs

    def _get_operation_log_filter_options(self):
        """Return operation_log search options allowed by non-admin."""
        return CONF.query_operation_log_filters

    def _operation_log_get(self, context, operation_log_id):
        # Look up a single operation log and enforce the 'get' policy.
        # Authorization failures are surfaced as "not found" so callers
        # cannot probe for other tenants' log ids.
        if not uuidutils.is_uuid_like(operation_log_id):
            msg = _("Invalid operation_log id provided.")
            raise exc.HTTPBadRequest(explanation=msg)
        operation_log = objects.OperationLog.get_by_id(
            context, operation_log_id)
        try:
            check_policy(context, 'get', operation_log)
        except exception.PolicyNotAuthorized:
            raise exception.OperationLogFound(
                operation_log_id=operation_log_id)
        LOG.info("Operation log info retrieved successfully.")
        return operation_log
def create_resource():
    """Create the WSGI resource wrapping the operation_log controller."""
    return wsgi.Resource(OperationLogsController())

View File

@ -13,6 +13,7 @@
from oslo_service import wsgi as base_wsgi
from karbor.api.openstack import ProjectMapper
from karbor.api.v1 import operation_logs
from karbor.api.v1 import plans
from karbor.api.v1 import protectables
from karbor.api.v1 import providers
@ -33,6 +34,7 @@ class APIRouter(base_wsgi.Router):
providers_resources = providers.create_resource()
trigger_resources = triggers.create_resource()
scheduled_operation_resources = scheduled_operations.create_resource()
operation_log_resources = operation_logs.create_resource()
mapper.resource("plan", "plans",
controller=plans_resources,
@ -92,4 +94,8 @@ class APIRouter(base_wsgi.Router):
controller=scheduled_operation_resources,
collection={},
member={'action': 'POST'})
mapper.resource("operation_log", "operation_logs",
controller=operation_log_resources,
collection={},
member={})
super(APIRouter, self).__init__(mapper)

View File

@ -14,15 +14,11 @@
OPERATION_TYPES = (
OPERATION_PROTECT,
OPERATION_RESTORE,
OPERATION_DELETE,
OPERATION_START,
OPERATION_SUSPEND,
OPERATION_DELETE
) = (
'protect',
'restore',
'delete',
'start',
'suspend',
'delete'
)

View File

@ -258,6 +258,10 @@ class PlanNotFound(NotFound):
message = _("Plan %(plan_id)s could not be found.")
class OperationLogFound(NotFound):
    # NOTE(review): despite the "Found" in the name, this is raised when
    # an operation log could NOT be found (it subclasses NotFound).
    # Renaming would break existing callers, so only flagging it here.
    message = _("Operation log %(operation_log_id)s could not be found.")
class RestoreNotFound(NotFound):
message = _("Restore %(restore_id)s could not be found.")

View File

@ -11,29 +11,44 @@
# under the License.
from karbor.common import constants
from karbor.services.protection.flows import utils
from karbor.services.protection import resource_flow
from oslo_log import log as logging
from oslo_utils import timeutils
from taskflow import task
LOG = logging.getLogger(__name__)
class InitiateDeleteTask(task.Task):
def execute(self, checkpoint, *args, **kwargs):
def execute(self, context, checkpoint, operation_log, *args, **kwargs):
LOG.debug("Initiate delete checkpoint_id: %s", checkpoint.id)
checkpoint.status = constants.CHECKPOINT_STATUS_DELETING
checkpoint.commit()
update_fields = {"status": checkpoint.status}
utils.update_operation_log(context, operation_log, update_fields)
def revert(self, checkpoint, *args, **kwargs):
def revert(self, context, checkpoint, operation_log, *args, **kwargs):
LOG.debug("Failed to delete checkpoint_id: %s", checkpoint.id)
checkpoint.status = constants.CHECKPOINT_STATUS_ERROR_DELETING
checkpoint.commit()
update_fields = {
"status": checkpoint.status,
"ended_at": timeutils.utcnow()
}
utils.update_operation_log(context, operation_log, update_fields)
class CompleteDeleteTask(task.Task):
def execute(self, checkpoint):
def execute(self, context, checkpoint, operation_log):
LOG.debug("Complete delete checkpoint_id: %s", checkpoint.id)
checkpoint.delete()
update_fields = {
"status": checkpoint.status,
"ended_at": timeutils.utcnow()
}
utils.update_operation_log(context, operation_log, update_fields)
def get_flow(context, workflow_engine, checkpoint, provider):
@ -41,6 +56,7 @@ def get_flow(context, workflow_engine, checkpoint, provider):
flow_name = "Delete_Checkpoint_" + checkpoint.id
delete_flow = workflow_engine.build_flow(flow_name, 'linear')
resource_graph = checkpoint.resource_graph
operation_log = utils.create_operation_log(context, checkpoint)
plugins = provider.load_plugins()
resources_task_flow = resource_flow.build_resource_flow(
operation_type=constants.OPERATION_DELETE,
@ -56,6 +72,11 @@ def get_flow(context, workflow_engine, checkpoint, provider):
resources_task_flow,
CompleteDeleteTask(),
)
flow_engine = workflow_engine.get_engine(delete_flow,
store={'checkpoint': checkpoint})
flow_engine = workflow_engine.get_engine(
delete_flow,
store={
'context': context,
'checkpoint': checkpoint,
'operation_log': operation_log}
)
return flow_engine

View File

@ -12,30 +12,45 @@
from karbor.common import constants
from karbor.resource import Resource
from karbor.services.protection.flows import utils
from karbor.services.protection import resource_flow
from oslo_log import log as logging
from oslo_utils import timeutils
from taskflow import task
LOG = logging.getLogger(__name__)
class InitiateProtectTask(task.Task):
def execute(self, checkpoint, *args, **kwargs):
def execute(self, context, checkpoint, operation_log, *args, **kwargs):
LOG.debug("Initiate protect checkpoint_id: %s", checkpoint.id)
checkpoint.status = constants.CHECKPOINT_STATUS_PROTECTING
checkpoint.commit()
update_fields = {"status": checkpoint.status}
utils.update_operation_log(context, operation_log, update_fields)
def revert(self, checkpoint, *args, **kwargs):
def revert(self, context, checkpoint, operation_log, *args, **kwargs):
LOG.debug("Failed to protect checkpoint_id: %s", checkpoint.id)
checkpoint.status = constants.CHECKPOINT_STATUS_ERROR
checkpoint.commit()
update_fields = {
"status": checkpoint.status,
"ended_at": timeutils.utcnow()
}
utils.update_operation_log(context, operation_log, update_fields)
class CompleteProtectTask(task.Task):
def execute(self, checkpoint):
def execute(self, context, checkpoint, operation_log):
LOG.debug("Complete protect checkpoint_id: %s", checkpoint.id)
checkpoint.status = constants.CHECKPOINT_STATUS_AVAILABLE
checkpoint.commit()
update_fields = {
"status": checkpoint.status,
"ended_at": timeutils.utcnow()
}
utils.update_operation_log(context, operation_log, update_fields)
def get_flow(context, protectable_registry, workflow_engine, plan, provider,
@ -45,16 +60,18 @@ def get_flow(context, protectable_registry, workflow_engine, plan, provider,
resources)
checkpoint.resource_graph = resource_graph
checkpoint.commit()
operation_log = utils.create_operation_log(context, checkpoint)
flow_name = "Protect_" + plan.get('id')
protection_flow = workflow_engine.build_flow(flow_name, 'linear')
plugins = provider.load_plugins()
parameters = plan.get('parameters')
resources_task_flow = resource_flow.build_resource_flow(
operation_type=constants.OPERATION_PROTECT,
context=context,
workflow_engine=workflow_engine,
resource_graph=resource_graph,
plugins=plugins,
parameters=plan.get('parameters'),
parameters=parameters,
)
workflow_engine.add_tasks(
protection_flow,
@ -63,6 +80,8 @@ def get_flow(context, protectable_registry, workflow_engine, plan, provider,
CompleteProtectTask(),
)
flow_engine = workflow_engine.get_engine(protection_flow, store={
'checkpoint': checkpoint
'context': context,
'checkpoint': checkpoint,
'operation_log': operation_log
})
return flow_engine

View File

@ -14,10 +14,13 @@
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import timeutils
from oslo_utils import uuidutils
from karbor.common import constants
from karbor.services.protection import client_factory
from karbor.services.protection.flows import utils
from karbor.services.protection import resource_flow
from karbor.services.protection import restore_heat
from taskflow import task
@ -35,22 +38,34 @@ LOG = logging.getLogger(__name__)
class InitiateRestoreTask(task.Task):
def execute(self, restore, *args, **kwargs):
def execute(self, context, restore, operation_log, *args, **kwargs):
LOG.debug("Initiate restore restore_id: %s", restore.id)
restore['status'] = constants.RESTORE_STATUS_IN_PROGRESS
restore.save()
update_fields = {"status": restore.status}
utils.update_operation_log(context, operation_log, update_fields)
def revert(self, restore, *args, **kwargs):
def revert(self, context, restore, operation_log, *args, **kwargs):
LOG.debug("Failed to restore restore_id: %s", restore.id)
restore['status'] = constants.RESTORE_STATUS_FAILURE
restore.save()
update_fields = {
"status": restore.status,
"ended_at": timeutils.utcnow()
}
utils.update_operation_log(context, operation_log, update_fields)
class CompleteRestoreTask(task.Task):
def execute(self, restore, *args, **kwargs):
def execute(self, context, restore, operation_log, *args, **kwargs):
LOG.debug("Complete restore restore_id: %s", restore.id)
restore['status'] = constants.RESTORE_STATUS_SUCCESS
restore.save()
update_fields = {
"status": restore.status,
"ended_at": timeutils.utcnow()
}
utils.update_operation_log(context, operation_log, update_fields)
class CreateHeatTask(task.Task):
@ -159,6 +174,7 @@ def get_flow(context, workflow_engine, checkpoint, provider, restore,
heat_conf["password"] = restore_auth["password"]
resource_graph = checkpoint.resource_graph
operation_log = utils.create_operation_log_restore(context, restore)
parameters = restore.parameters
flow_name = "Restore_" + checkpoint.id
restore_flow = workflow_engine.build_flow(flow_name, 'linear')
@ -181,7 +197,11 @@ def get_flow(context, workflow_engine, checkpoint, provider, restore,
SyncRestoreStatusTask(),
CompleteRestoreTask()
)
flow_engine = workflow_engine.get_engine(restore_flow,
store={'checkpoint': checkpoint,
'restore': restore})
flow_engine = workflow_engine.get_engine(
restore_flow,
store={
'context': context,
'checkpoint': checkpoint,
'restore': restore,
'operation_log': operation_log})
return flow_engine

View File

@ -0,0 +1,95 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from karbor.common import constants
from karbor import exception
from karbor.i18n import _
from karbor import objects
from karbor.objects import base as objects_base
from oslo_log import log as logging
from oslo_utils import timeutils
LOG = logging.getLogger(__name__)
def create_operation_log(context, checkpoint):
    """Create and persist an operation log for a protect operation.

    Derives the log fields from ``checkpoint``, creates an OperationLog
    object, and returns it; any creation error is logged and re-raised.
    """
    checkpoint_dict = checkpoint.to_dict()

    # The scheduled operation id, when present, is tucked inside the
    # checkpoint's extra_info under the 'create_by' key.
    scheduled_operation_id = None
    extra_info = checkpoint_dict.get('extra_info', None)
    if extra_info:
        create_by = extra_info.get('create_by', None)
        if create_by:
            scheduled_operation_id = create_by.get(
                'scheduled_operation_id', None)

    plan = checkpoint_dict['protection_plan']
    plan_id = plan.get("id") if plan else None
    provider_id = plan.get("provider_id") if plan else None

    operation_log_properties = {
        'project_id': checkpoint_dict['project_id'],
        'operation_type': constants.OPERATION_PROTECT,
        'checkpoint_id': checkpoint_dict['id'],
        'plan_id': plan_id,
        'provider_id': provider_id,
        'scheduled_operation_id': scheduled_operation_id,
        'status': checkpoint_dict['status'],
        'started_at': timeutils.utcnow()
    }
    try:
        operation_log = objects.OperationLog(context=context,
                                             **operation_log_properties)
        operation_log.create()
        return operation_log
    except Exception:
        LOG.error('Error creating operation log. checkpoint: %s',
                  checkpoint.id)
        raise
def update_operation_log(context, operation_log, fields):
    """Apply ``fields`` to ``operation_log`` and save it.

    :raises exception.InvalidInput: if ``operation_log`` is not a
        KarborObject instance.
    """
    if isinstance(operation_log, objects_base.KarborObject):
        try:
            operation_log.update(fields)
            operation_log.save()
        except Exception:
            LOG.error('Error update operation log. operation_log: %s',
                      operation_log.id)
            raise
    else:
        msg = _("The parameter must be a object of "
                "KarborObject class.")
        raise exception.InvalidInput(reason=msg)
def create_operation_log_restore(context, restore):
    """Create and persist an operation log for a restore operation.

    Derives the log fields from ``restore``, creates an OperationLog
    object, and returns it; any creation error is logged and re-raised.

    :param context: the karbor request context
    :param restore: the restore object the log describes
    :returns: the created OperationLog object
    """
    operation_log_properties = {
        'project_id': restore.get('project_id'),
        'operation_type': constants.OPERATION_RESTORE,
        'checkpoint_id': restore.get('checkpoint_id'),
        'plan_id': restore.get('plan_id', None),
        'provider_id': restore.get('provider_id'),
        'restore_id': restore.get('id'),
        'status': restore.get('status'),
        'started_at': timeutils.utcnow()
    }
    try:
        operation_log = objects.OperationLog(context=context,
                                             **operation_log_properties)
        operation_log.create()
        return operation_log
    except Exception:
        # Fixed copy-paste bug: the original message labelled the value
        # "checkpoint" even though it logs the restore id.
        LOG.error('Error creating operation log. restore: %s',
                  restore.id)
        raise

View File

@ -98,7 +98,8 @@ class ResourceFlowGraphWalkerListener(graph.GraphWalkerListener):
'parameters': parameters,
'resource': resource,
}
requires = OPERATION_EXTRA_ARGS.get(self.operation_type)
requires = OPERATION_EXTRA_ARGS.get(self.operation_type, [])
requires.append('operation_log')
task = self.workflow_engine.create_task(method,
name=task_name,
inject=injects,

View File

@ -0,0 +1,77 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from webob import exc
from karbor.api.v1 import operation_logs
from karbor import context
from karbor.tests import base
from karbor.tests.unit.api import fakes
CONF = cfg.CONF
class OperationLogTest(base.TestCase):
    """Unit tests for the operation_logs v1 API controller."""

    def setUp(self):
        super(OperationLogTest, self).setUp()
        self.controller = operation_logs.OperationLogsController()
        self.ctxt = context.RequestContext('demo', 'fakeproject', True)

    @mock.patch(
        'karbor.api.v1.operation_logs.'
        'OperationLogsController._get_all')
    def test_operation_log_list_detail(self, mock_get_all):
        # Fixed typo: mock parameters were named "moak_*".
        req = fakes.HTTPRequest.blank('/v1/operation_logs')
        self.controller.index(req)
        self.assertTrue(mock_get_all.called)

    @mock.patch(
        'karbor.api.v1.operation_logs.'
        'OperationLogsController._get_all')
    def test_operation_log_index_limit_offset(self, mock_get_all):
        """index() validates the limit/offset pagination parameters."""
        req = fakes.HTTPRequest.blank(
            '/v1/operation_logs?limit=2&offset=1')
        self.controller.index(req)
        self.assertTrue(mock_get_all.called)
        # A negative limit is rejected.
        req = fakes.HTTPRequest.blank('/v1/operation_logs?limit=-1&offset=1')
        self.assertRaises(exc.HTTPBadRequest,
                          self.controller.index,
                          req)
        # A non-integer limit is rejected.
        req = fakes.HTTPRequest.blank('/v1/operation_logs?limit=a&offset=1')
        self.assertRaises(exc.HTTPBadRequest,
                          self.controller.index,
                          req)
        # An absurdly large offset is rejected.
        url = '/v1/operation_logs?limit=2&offset=43543564546567575'
        req = fakes.HTTPRequest.blank(url)
        self.assertRaises(exc.HTTPBadRequest,
                          self.controller.index,
                          req)

    @mock.patch(
        'karbor.api.v1.operation_logs.'
        'OperationLogsController._operation_log_get')
    def test_operation_log_show(self, mock_get):
        """show() delegates to _operation_log_get for a valid UUID."""
        req = fakes.HTTPRequest.blank('/v1/operation_logs')
        self.controller.show(req, '2a9ce1f3-cc1a-4516-9435-0ebb13caa398')
        self.assertTrue(mock_get.called)

    def test_operation_log_show_invalid(self):
        """show() rejects a non-UUID id with HTTPBadRequest."""
        # Renamed from test_operation_log_show_Invalid for PEP 8
        # snake_case; the test_ prefix keeps it discoverable.
        req = fakes.HTTPRequest.blank('/v1/operation_logs/1')
        self.assertRaises(
            exc.HTTPBadRequest, self.controller.show,
            req, "1")

View File

@ -17,6 +17,7 @@ import oslo_messaging
from karbor import exception
from karbor.resource import Resource
from karbor.services.protection.flows import utils
from karbor.services.protection.flows import worker as flow_manager
from karbor.services.protection import manager
from karbor.services.protection import protectable_registry
@ -120,8 +121,11 @@ class ProtectionServiceTest(base.TestCase):
'name': 'name654', 'extra_info': None}],
result)
@mock.patch.object(utils, 'update_operation_log')
@mock.patch.object(utils, 'create_operation_log')
@mock.patch.object(provider.ProviderRegistry, 'show_provider')
def test_protect(self, mock_provider):
def test_protect(self, mock_provider, mock_operation_log_create,
mock_operation_log_update):
mock_provider.return_value = fakes.FakeProvider()
self.pro_manager.protect(None, fakes.fake_protection_plan())

View File

@ -67,7 +67,8 @@ class ResourceFlowTest(base.TestCase):
parameters)
store = {
'checkpoint': checkpoint
'checkpoint': checkpoint,
'operation_log': None
}
store.update(kwargs)
@ -144,9 +145,14 @@ class ResourceFlowTest(base.TestCase):
template = restore_heat.HeatTemplate()
args['kwargs'] = {
'heat_template': template,
'restore': None
'restore': None,
'operation_log': None
}
kwargs.update(args['kwargs'])
else:
args['kwargs'] = {
'operation_log': None
}
kwargs.update(args['kwargs'])
self._walk_operation(mock_protection, operation,
parameters=parameters, **kwargs)