From a25323152290bc1ec061ecebc24a832ee9ca1277 Mon Sep 17 00:00:00 2001 From: Nguyen Phuong An Date: Wed, 9 Nov 2016 17:02:48 +0700 Subject: [PATCH] [log]: Add driver api and rpc stuff for logging This patch adds driver api and rpc stuff for logging extension as below: - Provides create, update and delete log api for log drivers - Reserves a rpc notification api for log drivers if a log driver requires rpc. - Reserves a rpc listener for listening callback from log drivers. - Also provides db_api: get_logs_bound_port, get_logs_bound_sg and get_sg_log_info_for_port and get_sg_log_info_for_log_resources. Change-Id: I7d50356dd1da49af6faaaa8969b6ae9041f81063 Partially-implements: blueprint security-group-logging Related-Bug: #1468366 --- .../services/logapi/agent/log_extension.py | 3 +- neutron/services/logapi/common/constants.py | 29 ++ neutron/services/logapi/common/db_api.py | 258 +++++++++++++++++ neutron/services/logapi/common/exceptions.py | 5 + neutron/services/logapi/drivers/base.py | 72 +++++ neutron/services/logapi/drivers/manager.py | 76 +++++ neutron/services/logapi/logging_plugin.py | 18 ++ neutron/services/logapi/rpc/__init__.py | 0 neutron/services/logapi/rpc/agent.py | 45 +++ neutron/services/logapi/rpc/server.py | 69 +++++ .../services/logapi/common/test_db_api.py | 266 ++++++++++++++++++ .../services/logapi/drivers/test_manager.py | 49 ++++ .../unit/services/logapi/rpc/__init__.py | 0 .../unit/services/logapi/rpc/test_server.py | 93 ++++++ .../services/logapi/test_logging_plugin.py | 42 ++- 15 files changed, 1023 insertions(+), 2 deletions(-) create mode 100644 neutron/services/logapi/common/db_api.py create mode 100644 neutron/services/logapi/rpc/__init__.py create mode 100644 neutron/services/logapi/rpc/agent.py create mode 100644 neutron/services/logapi/rpc/server.py create mode 100644 neutron/tests/unit/services/logapi/common/test_db_api.py create mode 100644 neutron/tests/unit/services/logapi/rpc/__init__.py create mode 100644 neutron/tests/unit/services/logapi/rpc/test_server.py diff --git a/neutron/services/logapi/agent/log_extension.py b/neutron/services/logapi/agent/log_extension.py index 832296a9a83..b41e6ef4e87 100644 --- a/neutron/services/logapi/agent/log_extension.py +++ b/neutron/services/logapi/agent/log_extension.py @@ -27,6 +27,7 @@ from neutron.api.rpc.callbacks import resources from neutron.api.rpc.handlers import resources_rpc from neutron.conf.services import logging as log_cfg from neutron import manager +from neutron.services.logapi.rpc import agent as agent_rpc log_cfg.register_log_driver_opts() @@ -85,10 +86,10 @@ class LoggingExtension(agent_extension.AgentExtension): def initialize(self, connection, driver_type): """Initialize agent extension.""" - self.resource_rpc = None int_br = self.agent_api.request_int_br() self.log_driver = manager.NeutronManager.load_class_for_provider( LOGGING_DRIVERS_NAMESPACE, driver_type)(int_br) + self.resource_rpc = agent_rpc.LoggingApiStub() self._register_rpc_consumers(connection) self.log_driver.initialize(self.resource_rpc) diff --git a/neutron/services/logapi/common/constants.py b/neutron/services/logapi/common/constants.py index aab3ae287e5..bb50817b245 100644 --- a/neutron/services/logapi/common/constants.py +++ b/neutron/services/logapi/common/constants.py @@ -21,3 +21,32 @@ LOGGING_PLUGIN = 'logging-plugin' # supported logging types SECURITY_GROUP = 'security_group' + +RPC_NAMESPACE_LOGGING = 'logging-plugin' + +# String literal for identifying log resource +LOGGING = 'log' + +# Method names for Logging Driver +PRECOMMIT_POSTFIX = '_precommit' +CREATE_LOG = 'create_log' +CREATE_LOG_PRECOMMIT = CREATE_LOG + PRECOMMIT_POSTFIX +UPDATE_LOG = 'update_log' +UPDATE_LOG_PRECOMMIT = UPDATE_LOG + PRECOMMIT_POSTFIX +DELETE_LOG = 'delete_log' +DELETE_LOG_PRECOMMIT = DELETE_LOG + PRECOMMIT_POSTFIX +# Tell to agent when resources related log_objects update +RESOURCE_UPDATE = 'resource_update' + +LOG_CALL_METHODS = ( + CREATE_LOG, + CREATE_LOG_PRECOMMIT, + UPDATE_LOG, + UPDATE_LOG_PRECOMMIT, + DELETE_LOG, + DELETE_LOG_PRECOMMIT, + RESOURCE_UPDATE +) + +DIRECTION_IP_PREFIX = {'ingress': 'source_ip_prefix', + 'egress': 'dest_ip_prefix'} diff --git a/neutron/services/logapi/common/db_api.py b/neutron/services/logapi/common/db_api.py new file mode 100644 index 00000000000..9cc877c4814 --- /dev/null +++ b/neutron/services/logapi/common/db_api.py @@ -0,0 +1,258 @@ +# Copyright (c) 2017 Fujitsu Limited +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging +from sqlalchemy.orm import exc as orm_exc + +from neutron.db import api as db_api +from neutron.db.models import securitygroup as sg_db +from neutron.objects.logapi import logging_resource as log_object +from neutron.objects import ports as port_objects +from neutron.objects import securitygroup as sg_object +from neutron.services.logapi.common import constants +from neutron.services.logapi.common import validators + +LOG = logging.getLogger(__name__) + + +def _get_ports_attached_to_sg(context, sg_id): + """Return a list of ports attached to a security group""" + + with db_api.context_manager.reader.using(context): + ports = context.session.query( + sg_db.SecurityGroupPortBinding.port_id).filter( + sg_db.SecurityGroupPortBinding.security_group_id == + sg_id).all() + return [port for (port,) in ports] + + +def _get_ports_filter_in_tenant(context, tenant_id): + """Return a list of ports filter under a tenant""" + + try: + sg_id = sg_db.SecurityGroupPortBinding.security_group_id + with db_api.context_manager.reader.using(context): + ports = context.session.query( + sg_db.SecurityGroupPortBinding.port_id).join( + sg_db.SecurityGroup, sg_db.SecurityGroup.id == sg_id).filter( + sg_db.SecurityGroup.tenant_id == tenant_id).all() + return list({port for (port,) in ports}) + except orm_exc.NoResultFound: + return [] + + +def _get_sgs_attached_to_port(context, port_id): + """Return a list of security groups are associated to a port""" + + with db_api.context_manager.reader.using(context): + sg_ids = context.session.query( + sg_db.SecurityGroupPortBinding.security_group_id).filter( + sg_db.SecurityGroupPortBinding.port_id == port_id).all() + return [sg_id for (sg_id, ) in sg_ids] + + +def _get_ports_being_logged(context, sg_log): + """Return a list of ports being logged for a log_resource""" + + target_id = sg_log['target_id'] + resource_id = sg_log['resource_id'] + + # if 'target_id' (port_id) is specified in a log_resource + if target_id is not None: + port_ids = [target_id] + # if 'resource_id' (sg_id) is specified in a log_resource + elif resource_id is not None: + port_ids = _get_ports_attached_to_sg(context, resource_id) + # both 'resource_id' and 'target_id' aren't specified in a log_resource + else: + port_ids = _get_ports_filter_in_tenant(context, sg_log['project_id']) + + # list of validated ports's being logged + validated_port_ids = [] + ports = port_objects.Port.get_objects(context, id=port_ids) + for port in ports: + if validators.validate_log_type_for_port('security_group', port): + validated_port_ids.append(port.id) + else: + msg = ("Logging type %(log_type)s is not supported on " + "port %(port_id)s." % + {'log_type': 'security_group', 'port_id': port.id}) + LOG.warning(msg) + + return validated_port_ids + + +def _get_sg_ids_log_for_port(context, sg_log, port_id): + """Return a list of security group ids being logged for a port""" + + sg_ids = _get_sgs_attached_to_port(context, port_id) + resource_id = sg_log['resource_id'] + + # if resource_id is not specified + if not resource_id: + return sg_ids + + # if resource_id is specified and belong a set of sgs are + # associated to port + if resource_id in sg_ids: + return [resource_id] + + return [] + + +def _create_sg_rule_dict(rule_in_db): + """Return a dict of a security group rule""" + direction = rule_in_db['direction'] + rule_dict = { + 'direction': direction, + 'ethertype': rule_in_db['ethertype']} + + rule_dict.update({ + key: rule_in_db[key] + for key in ('protocol', 'port_range_min', 'port_range_max', + 'remote_group_id') if rule_in_db[key] is not None}) + + remote_ip_prefix = rule_in_db['remote_ip_prefix'] + if remote_ip_prefix is not None: + direction_ip_prefix = constants.DIRECTION_IP_PREFIX[direction] + rule_dict[direction_ip_prefix] = remote_ip_prefix + + rule_dict['security_group_id'] = rule_in_db['security_group_id'] + + return rule_dict + + +def _get_sg_rules(context, sg_log, port_id): + """Return a list of sg_rules log for a port being logged""" + + sg_ids = _get_sg_ids_log_for_port(context, sg_log, port_id) + if not sg_ids: + return [] + filters = {'security_group_id': sg_ids} + rules_in_db = sg_object.SecurityGroupRule.get_objects(context, **filters) + return [_create_sg_rule_dict(rule_in_db) for rule_in_db in rules_in_db] + + +def _get_port_log_dict(context, port_id, sg_log): + return { + 'port_id': port_id, + 'security_group_rules': _get_sg_rules(context, sg_log, port_id) + } + + +def _make_log_dict(context, sg_log, port_ids_log): + return { + 'id': sg_log['id'], + 'ports_log': [_get_port_log_dict(context, port_id, sg_log) + for port_id in port_ids_log], + 'event': sg_log['event'], + 'project_id': sg_log['project_id'] + } + + +def get_logs_bound_port(context, port_id): + """Return a list of log_resources bound to a port""" + + port = port_objects.Port.get_object(context, id=port_id) + project_id = port['project_id'] + logs = log_object.Log.get_objects( + context, project_id=project_id, enabled=True) + is_bound = lambda log: (log.resource_id in port.security_group_ids + or log.target_id == port.id + or (not log.target_id and not log.resource_id)) + return [log for log in logs if is_bound(log)] + + +def get_logs_bound_sg(context, sg_id): + """Return a list of log_resources bound to a security group""" + + project_id = context.tenant_id + log_objs = log_object.Log.get_objects( + context, project_id=project_id, enabled=True) + log_resources = [] + for log_obj in log_objs: + if log_obj.resource_id == sg_id: + log_resources.append(log_obj) + elif log_obj.target_id: + port = port_objects.Port.get_object( + context, id=log_obj.target_id) + if sg_id in port.security_group_ids: + log_resources.append(log_obj) + elif not log_obj.resource_id and not log_obj.target_id: + log_resources.append(log_obj) + return log_resources + + +def get_sg_log_info_for_port(context, port_id): + """Return a list of security groups log info for a port + + This method provides a list of security groups log info for a port. + The list has format as below: + + [ + {'id': xxx, + 'ports_log': [{'port_id': u'xxx', + 'security_group_rules': [{ + 'direction': u'egress', + 'ethertype': u'IPv6', + 'security_group_id': u'xxx'}, + {...}] + }] + 'event': u'ALL', + 'project_id': u'xxx' + }, + ... + ] + :param context: current running context information + :param port_id: port ID which needed to get security groups log info + + """ + + sg_logs = get_logs_bound_port(context, port_id) + return [_make_log_dict(context, sg_log, [port_id]) + for sg_log in sg_logs] + + +def get_sg_log_info_for_log_resources(context, log_resources): + """Return a list of security groups log info for list of log_resources + + This method provides a list of security groups log info for list of + log_resources. The list has format as below: + + [ + {'id': xxx, + 'ports_log': [{'port_id': u'xxx', + 'security_group_rules': [{ + 'direction': u'egress', + 'ethertype': u'IPv6', + 'security_group_id': u'xxx'}, + {...}] + }, ...] + 'event': u'ALL', + 'project_id': u'xxx' + }, + ... + ] + :param context: current running context information + :param log_resources: list of log_resources, which needed to get + security groups log info + + """ + + logs_info = [] + for sg_log in log_resources: + port_ids = _get_ports_being_logged(context, sg_log) + logs_info.append(_make_log_dict(context, sg_log, port_ids)) + return logs_info diff --git a/neutron/services/logapi/common/exceptions.py b/neutron/services/logapi/common/exceptions.py index f9ff5f40a9c..85565de2c66 100644 --- a/neutron/services/logapi/common/exceptions.py +++ b/neutron/services/logapi/common/exceptions.py @@ -42,3 +42,8 @@ class InvalidResourceConstraint(n_exc.InvalidInput): message = _("Invalid resource constraint between resource " "(%(resource)s %(resource_id)s) and target resource " "(%(target_resource)s %(target_id)s).") + + +class LogapiDriverException(n_exc.NeutronException): + """A log api driver Exception""" + message = _("Driver exception: %(exception_msg)s") diff --git a/neutron/services/logapi/drivers/base.py b/neutron/services/logapi/drivers/base.py index 4e05996e982..d7e0b2cc3ab 100644 --- a/neutron/services/logapi/drivers/base.py +++ b/neutron/services/logapi/drivers/base.py @@ -77,3 +77,75 @@ class DriverBase(object): {'log_type': log_type, 'driver_name': self.name}) return supported + + def create_log(self, context, log_obj): + """Create a log_obj invocation. + + This method can be implemented by the specific driver subclass + to update the backend where necessary with a specific log object. + + :param context: current running context information + :param log_obj: a log objects being created + + """ + + def create_log_precommit(self, context, log_obj): + """Create a log_obj precommit. + + This method can be implemented by the specific driver subclass + to handle the precommit event of a log_object that is being created. + + :param context: current running context information + :param log_obj: a log object being created + """ + + def update_log(self, context, log_obj): + """Update a log_obj invocation. + + This method can be implemented by the specific driver subclass + to update the backend where necessary with a specific log object. + + :param context: current running context information + :param log_obj: a log object being updated + + """ + + def update_log_precommit(self, context, log_obj): + """Update a log_obj precommit. + + This method can be implemented by the specific driver subclass + to handle update precommit event of a log_object that is being updated. + + :param context: current running context information + :param log_obj: a log_object being updated. + """ + + def delete_log(self, context, log_obj): + """Delete a log_obj invocation. + + This method can be implemented by the specific driver subclass + to delete the backend where necessary with a specific log object. + + :param context: current running context information + :param log_obj: a log_object being deleted + + """ + + def delete_log_precommit(self, context, log_obj): + """Delete a log_obj precommit. + + This method can be implemented by the specific driver subclass + to handle delete precommit event of a log_object that is being deleted. + + :param context: current running context information + :param log_obj: a log_object being deleted + """ + + def resource_update(self, context, log_objs): + """Tell the agent when resources related to log_objects are + being updated + + :param context: current running context information + :param log_objs: a list of log_objects, whose related resources are + being updated. + """ diff --git a/neutron/services/logapi/drivers/manager.py b/neutron/services/logapi/drivers/manager.py index 3165707cf40..fdcfc97e771 100644 --- a/neutron/services/logapi/drivers/manager.py +++ b/neutron/services/logapi/drivers/manager.py @@ -15,19 +15,41 @@ from neutron_lib.callbacks import events from neutron_lib.callbacks import registry +from neutron_lib.callbacks import resources from oslo_log import log as logging +from neutron.common import exceptions from neutron.services.logapi.common import constants as log_const +from neutron.services.logapi.common import db_api +from neutron.services.logapi.common import exceptions as log_exc +from neutron.services.logapi.rpc import server as server_rpc LOG = logging.getLogger(__name__) +def _get_param(args, kwargs, name, index): + try: + return kwargs[name] + except KeyError: + try: + return args[index] + except IndexError: + msg = "Missing parameter %s" % name + raise log_exc.LogapiDriverException(exception_msg=msg) + + +@registry.has_registry_receivers class LoggingServiceDriverManager(object): def __init__(self): self._drivers = set() + self.rpc_required = False registry.publish(log_const.LOGGING_PLUGIN, events.AFTER_INIT, self) + if self.rpc_required: + self._start_rpc_listeners() + self.logging_rpc = server_rpc.LoggingApiNotification() + @property def drivers(self): return self._drivers @@ -38,6 +60,11 @@ class LoggingServiceDriverManager(object): This method is called from drivers on INIT event. """ self._drivers.add(driver) + self.rpc_required |= driver.requires_rpc + + def _start_rpc_listeners(self): + self._skeleton = server_rpc.LoggingApiSkeleton() + return self._skeleton.conn.consume_in_threads() @property def supported_logging_types(self): @@ -51,3 +78,52 @@ class LoggingServiceDriverManager(object): LOG.debug("Supported logging types (logging types supported " "by at least one loaded log_driver): %s", log_types) return log_types + + def call(self, method_name, *args, **kwargs): + """Helper method for calling a method across all extension drivers.""" + exc_list = [] + for driver in self._drivers: + try: + getattr(driver, method_name)(*args, **kwargs) + except Exception as exc: + exception_msg = ("Extension driver '%(name)s' failed in " + "%(method)s") + exception_data = {'name': driver.name, 'method': method_name} + LOG.exception(exception_msg, exception_data) + exc_list.append(exc) + + if exc_list: + raise exceptions.DriverCallError(exc_list=exc_list) + + if self.rpc_required: + context = _get_param(args, kwargs, 'context', index=0) + log_obj = _get_param(args, kwargs, 'log_obj', index=1) + + try: + rpc_method = getattr(self.logging_rpc, method_name) + except AttributeError: + LOG.error("Method %s is not implemented in logging RPC", + method_name) + return + rpc_method(context, log_obj) + + @registry.receives(resources.SECURITY_GROUP_RULE, + [events.AFTER_CREATE, events.AFTER_DELETE]) + def _handle_sg_rule_callback(self, resource, event, trigger, **kwargs): + """Handle sg_rule create/delete events + + This method handles sg_rule events, if sg_rule bound by log_resources, + it should tell to agent to update log_drivers. + + """ + context = kwargs['context'] + sg_rules = kwargs.get('security_group_rule') + if sg_rules: + sg_id = sg_rules.get('security_group_id') + else: + sg_id = kwargs.get('security_group_id') + + log_resources = db_api.get_logs_bound_sg(context, sg_id) + if log_resources: + self.call( + log_const.RESOURCE_UPDATE, context, log_resources) diff --git a/neutron/services/logapi/logging_plugin.py b/neutron/services/logapi/logging_plugin.py index a7ee2f8e30b..2368ac866eb 100644 --- a/neutron/services/logapi/logging_plugin.py +++ b/neutron/services/logapi/logging_plugin.py @@ -18,6 +18,7 @@ from neutron.db import db_base_plugin_common from neutron.extensions import logging as log_ext from neutron.objects import base as base_obj from neutron.objects.logapi import logging_resource as log_object +from neutron.services.logapi.common import constants as log_const from neutron.services.logapi.common import exceptions as log_exc from neutron.services.logapi.common import validators from neutron.services.logapi.drivers import manager as driver_mgr @@ -77,6 +78,12 @@ class LoggingPlugin(log_ext.LoggingPluginBase): log_data.pop('tenant_id', None) log_obj = log_object.Log(context=context, **log_data) log_obj.create() + if log_obj.enabled: + self.driver_manager.call( + log_const.CREATE_LOG_PRECOMMIT, context, log_obj) + if log_obj.enabled: + self.driver_manager.call( + log_const.CREATE_LOG, context, log_obj) return log_obj @db_base_plugin_common.convert_result_to_dict @@ -87,6 +94,13 @@ class LoggingPlugin(log_ext.LoggingPluginBase): log_obj = log_object.Log(context, id=log_id) log_obj.update_fields(log_data, reset_changes=True) log_obj.update() + need_notify = 'enabled' in log_data + if need_notify: + self.driver_manager.call( + log_const.UPDATE_LOG_PRECOMMIT, context, log_obj) + if need_notify: + self.driver_manager.call( + log_const.UPDATE_LOG, context, log_obj) return log_obj def delete_log(self, context, log_id): @@ -94,6 +108,10 @@ class LoggingPlugin(log_ext.LoggingPluginBase): with db_api.context_manager.writer.using(context): log_obj = self._get_log(context, log_id) log_obj.delete() + self.driver_manager.call( + log_const.DELETE_LOG_PRECOMMIT, context, log_obj) + self.driver_manager.call( + log_const.DELETE_LOG, context, log_obj) def get_loggable_resources(self, context, filters=None, fields=None, sorts=None, limit=None, diff --git a/neutron/services/logapi/rpc/__init__.py b/neutron/services/logapi/rpc/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/services/logapi/rpc/agent.py b/neutron/services/logapi/rpc/agent.py new file mode 100644 index 00000000000..5a4b8f6beb4 --- /dev/null +++ b/neutron/services/logapi/rpc/agent.py @@ -0,0 +1,45 @@ +# Copyright (C) 2017 Fujitsu Limited +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import helpers as log_helpers +import oslo_messaging + +from neutron.common import rpc as n_rpc +from neutron.services.logapi.common import constants as log_const + + +class LoggingApiStub(object): + """Stub proxy code for agent->server communication.""" + + def __init__(self): + + target = oslo_messaging.Target( + topic=log_const.LOGGING_PLUGIN, + version='1.0', + namespace=log_const.RPC_NAMESPACE_LOGGING) + self.rpc_client = n_rpc.get_client(target) + + @log_helpers.log_method_call + def get_sg_log_info_for_port(self, context, port_id): + """Return list of sg_log info for a port""" + cctxt = self.rpc_client.prepare() + return cctxt.call(context, 'get_sg_log_info_for_port', port_id=port_id) + + @log_helpers.log_method_call + def get_sg_log_info_for_log_resources(self, context, log_resources): + """Return list of sg_log info for list of log_resources""" + cctxt = self.rpc_client.prepare() + return cctxt.call(context, 'get_sg_log_info_for_log_resources', + log_resources=log_resources) diff --git a/neutron/services/logapi/rpc/server.py b/neutron/services/logapi/rpc/server.py new file mode 100644 index 00000000000..3de4b07a062 --- /dev/null +++ b/neutron/services/logapi/rpc/server.py @@ -0,0 +1,69 @@ +# Copyright (C) 2017 Fujitsu Limited +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import helpers as log_helpers +import oslo_messaging + +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.handlers import resources_rpc +from neutron.common import rpc as n_rpc +from neutron.services.logapi.common import constants as log_const +from neutron.services.logapi.common import db_api + + +class LoggingApiSkeleton(object): + """Skeleton proxy code for agent->server communication.""" + + # History + # 1.0 Initial version + + target = oslo_messaging.Target( + version='1.0', namespace=log_const.RPC_NAMESPACE_LOGGING) + + def __init__(self): + self.conn = n_rpc.create_connection() + self.conn.create_consumer(log_const.LOGGING_PLUGIN, [self], + fanout=False) + + @log_helpers.log_method_call + def get_sg_log_info_for_port(self, context, port_id): + return db_api.get_sg_log_info_for_port(context, port_id) + + @log_helpers.log_method_call + def get_sg_log_info_for_log_resources(self, context, log_resources): + return db_api.get_sg_log_info_for_log_resources(context, log_resources) + + +class LoggingApiNotification(object): + + def __init__(self): + self.notification_api = resources_rpc.ResourcesPushRpcApi() + + @log_helpers.log_method_call + def create_log(self, context, log_obj): + self.notification_api.push(context, [log_obj], events.CREATED) + + @log_helpers.log_method_call + def update_log(self, context, log_obj): + self.notification_api.push(context, [log_obj], events.UPDATED) + + @log_helpers.log_method_call + def delete_log(self, context, log_obj): + self.notification_api.push(context, [log_obj], events.DELETED) + + @log_helpers.log_method_call + def resource_update(self, context, log_objs): + """Tell to agent when resources related to log_objects updated""" + self.notification_api.push(context, log_objs, events.UPDATED) diff --git a/neutron/tests/unit/services/logapi/common/test_db_api.py b/neutron/tests/unit/services/logapi/common/test_db_api.py new file mode 100644 index 00000000000..06cd2f7d13f --- /dev/null +++ b/neutron/tests/unit/services/logapi/common/test_db_api.py @@ -0,0 +1,266 @@ +# Copyright (c) 2017 Fujitsu Limited +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from neutron_lib import constants as const +from neutron_lib import context +from oslo_utils import uuidutils + +from neutron.common import utils +from neutron.objects.logapi import logging_resource as log_object +from neutron.services.logapi.common import db_api +from neutron.services.logapi.common import validators +from neutron.services.logapi.rpc import server as server_rpc +from neutron.tests.unit.extensions import test_securitygroup as test_sg + + +def _create_log(tenant_id, resource_id=None, + target_id=None, event='ALL', enabled=True,): + + log_data = { + 'id': uuidutils.generate_uuid(), + 'name': 'test', + 'resource_type': 'security_group', + 'project_id': tenant_id, + 'event': event, + 'enabled': enabled} + if resource_id: + log_data['resource_id'] = resource_id + if target_id: + log_data['target_id'] = target_id + return log_object.Log(**log_data) + + +class LoggingDBApiTestCase(test_sg.SecurityGroupDBTestCase): + + def setUp(self): + super(LoggingDBApiTestCase, self).setUp() + self.context = context.get_admin_context() + self.sg_id, self.port_id, self.tenant_id = self._create_sg_and_port() + + def _create_sg_and_port(self): + with self.network() as network, \ + self.subnet(network), \ + self.security_group() as sg: + sg_id = sg['security_group']['id'] + tenant_id = sg['security_group']['tenant_id'] + + res = self._create_port( + self.fmt, network['network']['id'], + security_groups=[sg_id]) + ports_rest = self.deserialize(self.fmt, res) + port_id = ports_rest['port']['id'] + return sg_id, port_id, tenant_id + + def test_get_logs_bound_port(self): + log = _create_log(target_id=self.port_id, tenant_id=self.tenant_id) + with mock.patch.object(log_object.Log, 'get_objects', + return_value=[log]): + self.assertEqual( + [log], db_api.get_logs_bound_port(self.context, self.port_id)) + + def test_get_logs_not_bound_port(self): + fake_sg_id = uuidutils.generate_uuid() + log = _create_log(resource_id=fake_sg_id, tenant_id=self.tenant_id) + with mock.patch.object(log_object.Log, 'get_objects', + return_value=[log]): + self.assertEqual( + [], db_api.get_logs_bound_port(self.context, self.port_id)) + + def test_get_logs_bound_sg(self): + log = _create_log(resource_id=self.sg_id, tenant_id=self.tenant_id) + with mock.patch.object(log_object.Log, 'get_objects', + return_value=[log]): + self.assertEqual( + [log], db_api.get_logs_bound_sg(self.context, self.sg_id)) + + def test_get_logs_not_bound_sg(self): + with self.network() as network, \ + self.subnet(network), \ + self.security_group() as sg: + sg2_id = sg['security_group']['id'] + res = self._create_port( + self.fmt, network['network']['id'], + security_groups=[sg2_id]) + port2_id = self.deserialize(self.fmt, res)['port']['id'] + log = _create_log(target_id=port2_id, tenant_id=self.tenant_id) + with mock.patch.object(log_object.Log, 'get_objects', + return_value=[log]): + self.assertEqual( + [], db_api.get_logs_bound_sg(self.context, self.sg_id)) + + def test__get_ports_being_logged(self): + log1 = _create_log(target_id=self.port_id, + tenant_id=self.tenant_id) + log2 = _create_log(resource_id=self.sg_id, + tenant_id=self.tenant_id) + log3 = _create_log(target_id=self.port_id, + resource_id=self.tenant_id, + tenant_id=self.tenant_id) + log4 = _create_log(tenant_id=self.tenant_id) + with mock.patch.object( + validators, 'validate_log_type_for_port', return_value=True): + ports_log1 = db_api._get_ports_being_logged(self.context, log1) + ports_log2 = db_api._get_ports_being_logged(self.context, log2) + ports_log3 = db_api._get_ports_being_logged(self.context, log3) + ports_log4 = db_api._get_ports_being_logged(self.context, log4) + + self.assertEqual([self.port_id], ports_log1) + self.assertEqual([self.port_id], ports_log2) + self.assertEqual([self.port_id], ports_log3) + self.assertEqual([self.port_id], ports_log4) + + def test__get_ports_being_logged_not_supported_log_type(self): + log = _create_log(tenant_id=self.tenant_id) + with mock.patch.object( + validators, 'validate_log_type_for_port', return_value=False): + ports_log = db_api._get_ports_being_logged(self.context, log) + self.assertEqual([], ports_log) + + +class LoggingRpcCallbackTestCase(test_sg.SecurityGroupDBTestCase): + + def setUp(self): + super(LoggingRpcCallbackTestCase, self).setUp() + self.context = context.get_admin_context() + self.rpc_callback = server_rpc.LoggingApiSkeleton() + + def test_get_sg_log_info_for_create_or_update_log(self): + with self.network() as network, \ + self.subnet(network), \ + self.security_group() as sg: + sg_id = sg['security_group']['id'] + tenant_id = sg['security_group']['tenant_id'] + rule1 = self._build_security_group_rule( + sg_id, + 'ingress', const.PROTO_NAME_TCP, '22', '22', + ) + rule2 = self._build_security_group_rule( + sg_id, + 'egress', const.PROTO_NAME_TCP, + remote_ip_prefix='10.0.0.1', + ) + rules = { + 'security_group_rules': [rule1['security_group_rule'], + rule2['security_group_rule']]} + self._create_security_group_rule(self.fmt, rules) + res = self._create_port( + self.fmt, network['network']['id'], + security_groups=[sg_id]) + ports_rest = self.deserialize(self.fmt, res) + port_id = ports_rest['port']['id'] + log = _create_log(resource_id=sg_id, tenant_id=tenant_id) + with mock.patch.object(validators, 'validate_log_type_for_port', + return_value=True): + ports_log = ( + self.rpc_callback.get_sg_log_info_for_log_resources( + self.context, log_resources=[log]) + ) + expected = [{ + 'event': log.event, + 'id': log.id, + 'ports_log': [{ + 'port_id': port_id, + 'security_group_rules': [ + {'direction': 'egress', + 'ethertype': u'IPv4', + 'security_group_id': sg_id}, + {'direction': 'egress', + 'ethertype': u'IPv6', + 'security_group_id': sg_id}, + {'direction': 'ingress', + 'ethertype': u'IPv4', + 'port_range_max': 22, + 'port_range_min': 22, + 'protocol': u'tcp', + 'security_group_id': sg_id}, + {'direction': 'egress', + 'ethertype': u'IPv4', + 'protocol': u'tcp', + 'dest_ip_prefix': + utils.AuthenticIPNetwork('10.0.0.1/32'), + 'security_group_id': sg_id}] + }], + 'project_id': tenant_id + }] + self.assertEqual(expected, ports_log) + self._delete('ports', port_id) + + def test_get_sg_log_info_for_port_added_event(self): + with self.network() as network, \ + self.subnet(network), \ + self.security_group() as sg: + sg_id = sg['security_group']['id'] + tenant_id = sg['security_group']['tenant_id'] + rule1 = self._build_security_group_rule( + sg_id, + 'ingress', const.PROTO_NAME_TCP, '11', '13', + remote_ip_prefix='10.0.0.1', + ) + rule2 = self._build_security_group_rule( + sg_id, + 'egress', const.PROTO_NAME_ICMP, + ) + rules = { + 'security_group_rules': [rule1['security_group_rule'], + rule2['security_group_rule']]} + self._create_security_group_rule(self.fmt, rules) + res = self._create_port( + self.fmt, network['network']['id'], + security_groups=[sg_id], + tenant_id=tenant_id + ) + ports_rest = self.deserialize(self.fmt, res) + port_id = ports_rest['port']['id'] + log = _create_log(tenant_id=tenant_id) + with mock.patch.object( + log_object.Log, 'get_objects', return_value=[log]): + with mock.patch.object( + validators, 'validate_log_type_for_port', + return_value=True): + ports_log = ( + self.rpc_callback.get_sg_log_info_for_port( + self.context, port_id=port_id) + ) + expected = [{ + 'event': log.event, + 'id': log.id, + 'ports_log': [{ + 'port_id': port_id, + 'security_group_rules': [ + {'direction': 'egress', + 'ethertype': u'IPv4', + 'security_group_id': sg_id}, + {'direction': 'egress', + 'ethertype': u'IPv6', + 'security_group_id': sg_id}, + {'direction': 'ingress', + 'ethertype': u'IPv4', + 'port_range_max': 13, + 'port_range_min': 11, + 'protocol': u'tcp', + 'source_ip_prefix': + utils.AuthenticIPNetwork('10.0.0.1/32'), + 'security_group_id': sg_id}, + {'direction': 'egress', + 'ethertype': u'IPv4', + 'protocol': u'icmp', + 'security_group_id': sg_id}] + }], + 'project_id': tenant_id + }] + + self.assertEqual(expected, ports_log) + self._delete('ports', port_id) diff --git a/neutron/tests/unit/services/logapi/drivers/test_manager.py b/neutron/tests/unit/services/logapi/drivers/test_manager.py index f643dd8d157..eae7f801980 100644 --- a/neutron/tests/unit/services/logapi/drivers/test_manager.py +++ b/neutron/tests/unit/services/logapi/drivers/test_manager.py @@ -13,11 +13,34 @@ # License for the specific language governing permissions and limitations # under the License. +import mock + +from neutron.common import exceptions +from neutron.services.logapi.common import constants as log_const +from neutron.services.logapi.common import exceptions as log_exc from neutron.services.logapi.drivers import base as log_driver_base from neutron.services.logapi.drivers import manager as driver_mgr from neutron.tests.unit.services.logapi import base +class TestGetParameter(base.BaseLogTestCase): + + def test__get_param_missing_parameter(self): + kwargs = {'context': mock.sentinel.context} + self.assertRaises(log_exc.LogapiDriverException, + driver_mgr._get_param, + args=[], kwargs=kwargs, + name='log_obj', index=1) + self.assertRaises(log_exc.LogapiDriverException, + driver_mgr._get_param, + args=[mock.sentinel.context], kwargs={}, + name='log_obj', index=1) + self.assertRaises(log_exc.LogapiDriverException, + driver_mgr._get_param, + args=[], kwargs={'log_obj': mock.sentinel.log_obj}, + name='context', index=0) + + class TestLogDriversManagerBase(base.BaseLogTestCase): def setUp(self): @@ -77,3 +100,29 @@ class TestLogDriversManagerLoggingTypes(TestLogDriversManagerBase): }) self.assertEqual(set(['security_group', 'firewall']), driver_manager.supported_logging_types) + + +class TestLogDriversCalls(TestLogDriversManagerBase): + """Test log driver calls""" + + def setUp(self): + super(TestLogDriversCalls, self).setUp() + self.driver_manager = self._create_manager_with_drivers( + {'driver-A': {'is_loaded': True}}) + + def test_implemented_call_methods(self): + for method in log_const.LOG_CALL_METHODS: + with mock.patch.object(log_driver_base.DriverBase, method) as \ + method_fnc: + context = mock.sentinel.context + log_obj = mock.sentinel.log_obj + self.driver_manager.call( + method, context=context, log_objs=[log_obj]) + method_fnc.assert_called_once_with( + context=context, log_objs=[log_obj]) + + def test_not_implemented_call_methods(self): + context = mock.sentinel.context + log_obj = mock.sentinel.log_obj + self.assertRaises(exceptions.DriverCallError, self.driver_manager.call, + 'wrong_method', context=context, log_objs=[log_obj]) diff --git a/neutron/tests/unit/services/logapi/rpc/__init__.py b/neutron/tests/unit/services/logapi/rpc/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/tests/unit/services/logapi/rpc/test_server.py b/neutron/tests/unit/services/logapi/rpc/test_server.py new file mode 100644 index 00000000000..4d71a5d7115 --- /dev/null +++ b/neutron/tests/unit/services/logapi/rpc/test_server.py @@ -0,0 +1,93 @@ +# Copyright (c) 2017 Fujitsu Limited +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from oslo_config import cfg +import oslo_messaging + +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.handlers import resources_rpc +from neutron.services.logapi.common import constants as log_const +from neutron.services.logapi.rpc import server as server_rpc +from neutron.tests import base + + +class LoggingApiNotificationTestCase(base.BaseTestCase): + + def setUp(self): + super(LoggingApiNotificationTestCase, self).setUp() + self.test_obj = server_rpc.LoggingApiNotification() + + def test___init__(self): + self.assertIsInstance(self.test_obj.notification_api, + resources_rpc.ResourcesPushRpcApi) + + @mock.patch("neutron.api.rpc.handlers.resources_rpc.ResourcesPushRpcApi." + "push") + def test_create_log(self, mocked_push): + m_context = mock.Mock() + m_log_resource = mock.Mock() + self.test_obj.create_log(m_context, m_log_resource) + mocked_push.assert_called_with(m_context, [m_log_resource], + events.CREATED) + + @mock.patch("neutron.api.rpc.handlers.resources_rpc.ResourcesPushRpcApi." + "push") + def test_update_log(self, mocked_push): + m_context = mock.Mock() + m_log_resource = mock.Mock() + self.test_obj.update_log(m_context, m_log_resource) + mocked_push.assert_called_with(m_context, [m_log_resource], + events.UPDATED) + + @mock.patch("neutron.api.rpc.handlers.resources_rpc.ResourcesPushRpcApi." + "push") + def test_delete_log(self, mocked_push): + m_context = mock.Mock() + m_log_resource = mock.Mock() + self.test_obj.delete_log(m_context, m_log_resource) + mocked_push.assert_called_with(m_context, [m_log_resource], + events.DELETED) + + +class LoggingApiSkeletonTestCase(base.BaseTestCase): + + @mock.patch("neutron.common.rpc.get_server") + def test___init__(self, mocked_get_server): + test_obj = server_rpc.LoggingApiSkeleton() + _target = oslo_messaging.Target( + topic=log_const.LOGGING_PLUGIN, + server=cfg.CONF.host, + fanout=False) + mocked_get_server.assert_called_with(_target, [test_obj]) + + @mock.patch("neutron.services.logapi.common.db_api." + "get_sg_log_info_for_port") + def test_get_sg_log_info_for_port(self, mock_callback): + test_obj = server_rpc.LoggingApiSkeleton() + m_context = mock.Mock() + port_id = '123' + test_obj.get_sg_log_info_for_port(m_context, port_id=port_id) + mock_callback.assert_called_with(m_context, port_id) + + @mock.patch("neutron.services.logapi.common.db_api." + "get_sg_log_info_for_log_resources") + def test_get_sg_log_info_for_log_resources(self, mock_callback): + test_obj = server_rpc.LoggingApiSkeleton() + m_context = mock.Mock() + log_resources = [mock.Mock()] + test_obj.get_sg_log_info_for_log_resources(m_context, + log_resources=log_resources) + mock_callback.assert_called_with(m_context, log_resources) diff --git a/neutron/tests/unit/services/logapi/test_logging_plugin.py b/neutron/tests/unit/services/logapi/test_logging_plugin.py index 7a73b225ed3..94d1121d157 100644 --- a/neutron/tests/unit/services/logapi/test_logging_plugin.py +++ b/neutron/tests/unit/services/logapi/test_logging_plugin.py @@ -58,7 +58,7 @@ class TestLoggingPlugin(base.BaseLogTestCase): mock.patch('neutron.services.logapi.drivers.manager.' 'LoggingServiceDriverManager.supported_logging_types', new_callable=log_types).start() - self.ctxt = context.Context('fake_user', 'fake_tenant') + self.ctxt = context.Context('admin', 'fake_tenant') mock.patch.object(self.ctxt.session, 'refresh').start() mock.patch.object(self.ctxt.session, 'expunge').start() @@ -111,6 +111,13 @@ class TestLoggingPlugin(base.BaseLogTestCase): context=self.ctxt, **log['log']) self.assertTrue(new_log.create.called) + calls = [ + mock.call.call('create_log_precommit', + self.ctxt, new_log), + mock.call.call('create_log', self.ctxt, new_log) + ] + self.log_plugin.driver_manager.assert_has_calls(calls) + def test_create_log_without_sg_resource(self): log = {'log': {'resource_type': 'security_group', 'enabled': True, @@ -129,6 +136,13 @@ class TestLoggingPlugin(base.BaseLogTestCase): context=self.ctxt, **log['log']) self.assertTrue(new_log.create.called) + calls = [ + mock.call.call('create_log_precommit', + self.ctxt, new_log), + mock.call.call('create_log', self.ctxt, new_log) + ] + self.log_plugin.driver_manager.assert_has_calls(calls) + def test_create_log_without_parent_resource(self): log = {'log': {'resource_type': 'security_group', 'enabled': True, @@ -144,6 +158,12 @@ class TestLoggingPlugin(base.BaseLogTestCase): **log['log']) self.assertTrue(new_log.create.called) + calls = [ + mock.call.call('create_log_precommit', self.ctxt, new_log), + mock.call.call('create_log', self.ctxt, new_log) + ] + self.log_plugin.driver_manager.assert_has_calls(calls) + def test_create_log_without_target(self): log = {'log': {'resource_type': 'security_group', 'enabled': True, }} @@ -157,6 +177,12 @@ class TestLoggingPlugin(base.BaseLogTestCase): **log['log']) self.assertTrue(new_log.create.called) + calls = [ + mock.call.call('create_log_precommit', self.ctxt, new_log), + mock.call.call('create_log', self.ctxt, new_log) + ] + self.log_plugin.driver_manager.assert_has_calls(calls) + def test_create_log_nonexistent_sg_resource(self): log = {'log': {'resource_type': 'security_group', 'enabled': True, @@ -212,6 +238,7 @@ class TestLoggingPlugin(base.BaseLogTestCase): init_log_mock.assert_called_once_with( context=self.ctxt, **log_data['log']) self.assertTrue(new_log.create.called) + self.log_plugin.driver_manager.call.assert_not_called() def test_create_log_with_unsupported_logging_type(self): log = {'log': {'resource_type': 'fake_type', @@ -254,6 +281,12 @@ class TestLoggingPlugin(base.BaseLogTestCase): reset_changes=True) self.assertTrue(new_log.update.called) + calls = [ + mock.call.call('update_log_precommit', self.ctxt, new_log), + mock.call.call('update_log', self.ctxt, new_log) + ] + self.log_plugin.driver_manager.assert_has_calls(calls) + def test_update_log_none_enabled(self): log_data = {'log': {}} new_log = mock.Mock() @@ -267,6 +300,7 @@ class TestLoggingPlugin(base.BaseLogTestCase): new_log.update_fields.assert_called_once_with(log_data['log'], reset_changes=True) self.assertTrue(new_log.update.called) + self.log_plugin.driver_manager.call.assert_not_called() def test_delete_log(self): delete_log = mock.Mock() @@ -278,6 +312,12 @@ class TestLoggingPlugin(base.BaseLogTestCase): id=delete_log.id) self.assertTrue(delete_log.delete.called) + calls = [ + mock.call.call('delete_log_precommit', self.ctxt, delete_log), + mock.call.call('delete_log', self.ctxt, delete_log) + ] + self.log_plugin.driver_manager.assert_has_calls(calls) + def test_delete_nonexistent_log(self): with mock.patch.object(log_object.Log, 'get_object', return_value=None):