[log]: Add driver api and rpc stuff for logging

This patch adds driver api and rpc stuff for logging extension as
below:
- Provides create, update and delete log api for log drivers
- Reserves a rpc notification api for log drivers if a log driver
requires rpc.
- Reserves a rpc listener for listening callback from log drivers.
- Also provides db_api: get_logs_bound_port, get_logs_bound_sg and
  get_sg_log_info_for_port and get_sg_log_info_for_log_resources.

Change-Id: I7d50356dd1da49af6faaaa8969b6ae9041f81063
Partially-implements: blueprint security-group-logging
Related-Bug: #1468366
This commit is contained in:
Nguyen Phuong An 2016-11-09 17:02:48 +07:00
parent 921cfa9cdb
commit a253231522
15 changed files with 1023 additions and 2 deletions

View File

@ -27,6 +27,7 @@ from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.conf.services import logging as log_cfg
from neutron import manager
from neutron.services.logapi.rpc import agent as agent_rpc
log_cfg.register_log_driver_opts()
@ -85,10 +86,10 @@ class LoggingExtension(agent_extension.AgentExtension):
def initialize(self, connection, driver_type):
"""Initialize agent extension."""
self.resource_rpc = None
int_br = self.agent_api.request_int_br()
self.log_driver = manager.NeutronManager.load_class_for_provider(
LOGGING_DRIVERS_NAMESPACE, driver_type)(int_br)
self.resource_rpc = agent_rpc.LoggingApiStub()
self._register_rpc_consumers(connection)
self.log_driver.initialize(self.resource_rpc)

View File

@ -21,3 +21,32 @@ LOGGING_PLUGIN = 'logging-plugin'
# supported logging types
SECURITY_GROUP = 'security_group'
RPC_NAMESPACE_LOGGING = 'logging-plugin'
# String literal for identifying log resource
LOGGING = 'log'
# Method names for Logging Driver
PRECOMMIT_POSTFIX = '_precommit'
CREATE_LOG = 'create_log'
CREATE_LOG_PRECOMMIT = CREATE_LOG + PRECOMMIT_POSTFIX
UPDATE_LOG = 'update_log'
UPDATE_LOG_PRECOMMIT = UPDATE_LOG + PRECOMMIT_POSTFIX
DELETE_LOG = 'delete_log'
DELETE_LOG_PRECOMMIT = DELETE_LOG + PRECOMMIT_POSTFIX
# Tell to agent when resources related log_objects update
RESOURCE_UPDATE = 'resource_update'
LOG_CALL_METHODS = (
CREATE_LOG,
CREATE_LOG_PRECOMMIT,
UPDATE_LOG,
UPDATE_LOG_PRECOMMIT,
DELETE_LOG,
DELETE_LOG_PRECOMMIT,
RESOURCE_UPDATE
)
DIRECTION_IP_PREFIX = {'ingress': 'source_ip_prefix',
'egress': 'dest_ip_prefix'}

View File

@ -0,0 +1,258 @@
# Copyright (c) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from sqlalchemy.orm import exc as orm_exc
from neutron.db import api as db_api
from neutron.db.models import securitygroup as sg_db
from neutron.objects.logapi import logging_resource as log_object
from neutron.objects import ports as port_objects
from neutron.objects import securitygroup as sg_object
from neutron.services.logapi.common import constants
from neutron.services.logapi.common import validators
LOG = logging.getLogger(__name__)
def _get_ports_attached_to_sg(context, sg_id):
"""Return a list of ports attached to a security group"""
with db_api.context_manager.reader.using(context):
ports = context.session.query(
sg_db.SecurityGroupPortBinding.port_id).filter(
sg_db.SecurityGroupPortBinding.security_group_id ==
sg_id).all()
return [port for (port,) in ports]
def _get_ports_filter_in_tenant(context, tenant_id):
"""Return a list of ports filter under a tenant"""
try:
sg_id = sg_db.SecurityGroupPortBinding.security_group_id
with db_api.context_manager.reader.using(context):
ports = context.session.query(
sg_db.SecurityGroupPortBinding.port_id).join(
sg_db.SecurityGroup, sg_db.SecurityGroup.id == sg_id).filter(
sg_db.SecurityGroup.tenant_id == tenant_id).all()
return list({port for (port,) in ports})
except orm_exc.NoResultFound:
return []
def _get_sgs_attached_to_port(context, port_id):
"""Return a list of security groups are associated to a port"""
with db_api.context_manager.reader.using(context):
sg_ids = context.session.query(
sg_db.SecurityGroupPortBinding.security_group_id).filter(
sg_db.SecurityGroupPortBinding.port_id == port_id).all()
return [sg_id for (sg_id, ) in sg_ids]
def _get_ports_being_logged(context, sg_log):
"""Return a list of ports being logged for a log_resource"""
target_id = sg_log['target_id']
resource_id = sg_log['resource_id']
# if 'target_id' (port_id) is specified in a log_resource
if target_id is not None:
port_ids = [target_id]
# if 'resource_id' (sg_id) is specified in a log_resource
elif resource_id is not None:
port_ids = _get_ports_attached_to_sg(context, resource_id)
# both 'resource_id' and 'target_id' aren't specified in a log_resource
else:
port_ids = _get_ports_filter_in_tenant(context, sg_log['project_id'])
# list of validated ports's being logged
validated_port_ids = []
ports = port_objects.Port.get_objects(context, id=port_ids)
for port in ports:
if validators.validate_log_type_for_port('security_group', port):
validated_port_ids.append(port.id)
else:
msg = ("Logging type %(log_type)s is not supported on "
"port %(port_id)s." %
{'log_type': 'security_group', 'port_id': port.id})
LOG.warning(msg)
return validated_port_ids
def _get_sg_ids_log_for_port(context, sg_log, port_id):
"""Return a list of security group ids being logged for a port"""
sg_ids = _get_sgs_attached_to_port(context, port_id)
resource_id = sg_log['resource_id']
# if resource_id is not specified
if not resource_id:
return sg_ids
# if resource_id is specified and belong a set of sgs are
# associated to port
if resource_id in sg_ids:
return [resource_id]
return []
def _create_sg_rule_dict(rule_in_db):
"""Return a dict of a security group rule"""
direction = rule_in_db['direction']
rule_dict = {
'direction': direction,
'ethertype': rule_in_db['ethertype']}
rule_dict.update({
key: rule_in_db[key]
for key in ('protocol', 'port_range_min', 'port_range_max',
'remote_group_id') if rule_in_db[key] is not None})
remote_ip_prefix = rule_in_db['remote_ip_prefix']
if remote_ip_prefix is not None:
direction_ip_prefix = constants.DIRECTION_IP_PREFIX[direction]
rule_dict[direction_ip_prefix] = remote_ip_prefix
rule_dict['security_group_id'] = rule_in_db['security_group_id']
return rule_dict
def _get_sg_rules(context, sg_log, port_id):
"""Return a list of sg_rules log for a port being logged"""
sg_ids = _get_sg_ids_log_for_port(context, sg_log, port_id)
if not sg_ids:
return []
filters = {'security_group_id': sg_ids}
rules_in_db = sg_object.SecurityGroupRule.get_objects(context, **filters)
return [_create_sg_rule_dict(rule_in_db) for rule_in_db in rules_in_db]
def _get_port_log_dict(context, port_id, sg_log):
return {
'port_id': port_id,
'security_group_rules': _get_sg_rules(context, sg_log, port_id)
}
def _make_log_dict(context, sg_log, port_ids_log):
return {
'id': sg_log['id'],
'ports_log': [_get_port_log_dict(context, port_id, sg_log)
for port_id in port_ids_log],
'event': sg_log['event'],
'project_id': sg_log['project_id']
}
def get_logs_bound_port(context, port_id):
"""Return a list of log_resources bound to a port"""
port = port_objects.Port.get_object(context, id=port_id)
project_id = port['project_id']
logs = log_object.Log.get_objects(
context, project_id=project_id, enabled=True)
is_bound = lambda log: (log.resource_id in port.security_group_ids
or log.target_id == port.id
or (not log.target_id and not log.resource_id))
return [log for log in logs if is_bound(log)]
def get_logs_bound_sg(context, sg_id):
"""Return a list of log_resources bound to a security group"""
project_id = context.tenant_id
log_objs = log_object.Log.get_objects(
context, project_id=project_id, enabled=True)
log_resources = []
for log_obj in log_objs:
if log_obj.resource_id == sg_id:
log_resources.append(log_obj)
elif log_obj.target_id:
port = port_objects.Port.get_object(
context, id=log_obj.target_id)
if sg_id in port.security_group_ids:
log_resources.append(log_obj)
elif not log_obj.resource_id and not log_obj.target_id:
log_resources.append(log_obj)
return log_resources
def get_sg_log_info_for_port(context, port_id):
"""Return a list of security groups log info for a port
This method provides a list of security groups log info for a port.
The list has format as below:
[
{'id': xxx,
'ports_log': [{'port_id': u'xxx',
'security_group_rules': [{
'direction': u'egress',
'ethertype': u'IPv6',
'security_group_id': u'xxx'},
{...}]
}]
'event': u'ALL',
'project_id': u'xxx'
},
...
]
:param context: current running context information
:param port_id: port ID which needed to get security groups log info
"""
sg_logs = get_logs_bound_port(context, port_id)
return [_make_log_dict(context, sg_log, [port_id])
for sg_log in sg_logs]
def get_sg_log_info_for_log_resources(context, log_resources):
"""Return a list of security groups log info for list of log_resources
This method provides a list of security groups log info for list of
log_resources. The list has format as below:
[
{'id': xxx,
'ports_log': [{'port_id': u'xxx',
'security_group_rules': [{
'direction': u'egress',
'ethertype': u'IPv6',
'security_group_id': u'xxx'},
{...}]
}, ...]
'event': u'ALL',
'project_id': u'xxx'
},
...
]
:param context: current running context information
:param log_resources: list of log_resources, which needed to get
security groups log info
"""
logs_info = []
for sg_log in log_resources:
port_ids = _get_ports_being_logged(context, sg_log)
logs_info.append(_make_log_dict(context, sg_log, port_ids))
return logs_info

View File

@ -42,3 +42,8 @@ class InvalidResourceConstraint(n_exc.InvalidInput):
message = _("Invalid resource constraint between resource "
"(%(resource)s %(resource_id)s) and target resource "
"(%(target_resource)s %(target_id)s).")
class LogapiDriverException(n_exc.NeutronException):
"""A log api driver Exception"""
message = _("Driver exception: %(exception_msg)s")

View File

@ -77,3 +77,75 @@ class DriverBase(object):
{'log_type': log_type,
'driver_name': self.name})
return supported
def create_log(self, context, log_obj):
"""Create a log_obj invocation.
This method can be implemented by the specific driver subclass
to update the backend where necessary with a specific log object.
:param context: current running context information
:param log_obj: a log objects being created
"""
def create_log_precommit(self, context, log_obj):
"""Create a log_obj precommit.
This method can be implemented by the specific driver subclass
to handle the precommit event of a log_object that is being created.
:param context: current running context information
:param log_obj: a log object being created
"""
def update_log(self, context, log_obj):
"""Update a log_obj invocation.
This method can be implemented by the specific driver subclass
to update the backend where necessary with a specific log object.
:param context: current running context information
:param log_obj: a log object being updated
"""
def update_log_precommit(self, context, log_obj):
"""Update a log_obj precommit.
This method can be implemented by the specific driver subclass
to handle update precommit event of a log_object that is being updated.
:param context: current running context information
:param log_obj: a log_object being updated.
"""
def delete_log(self, context, log_obj):
"""Delete a log_obj invocation.
This method can be implemented by the specific driver subclass
to delete the backend where necessary with a specific log object.
:param context: current running context information
:param log_obj: a log_object being deleted
"""
def delete_log_precommit(self, context, log_obj):
"""Delete a log_obj precommit.
This method can be implemented by the specific driver subclass
to handle delete precommit event of a log_object that is being deleted.
:param context: current running context information
:param log_obj: a log_object being deleted
"""
def resource_update(self, context, log_objs):
"""Tell the agent when resources related to log_objects are
being updated
:param context: current running context information
:param log_objs: a list of log_objects, whose related resources are
being updated.
"""

View File

@ -15,19 +15,41 @@
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from oslo_log import log as logging
from neutron.common import exceptions
from neutron.services.logapi.common import constants as log_const
from neutron.services.logapi.common import db_api
from neutron.services.logapi.common import exceptions as log_exc
from neutron.services.logapi.rpc import server as server_rpc
LOG = logging.getLogger(__name__)
def _get_param(args, kwargs, name, index):
try:
return kwargs[name]
except KeyError:
try:
return args[index]
except IndexError:
msg = "Missing parameter %s" % name
raise log_exc.LogapiDriverException(exception_msg=msg)
@registry.has_registry_receivers
class LoggingServiceDriverManager(object):
def __init__(self):
self._drivers = set()
self.rpc_required = False
registry.publish(log_const.LOGGING_PLUGIN, events.AFTER_INIT, self)
if self.rpc_required:
self._start_rpc_listeners()
self.logging_rpc = server_rpc.LoggingApiNotification()
@property
def drivers(self):
return self._drivers
@ -38,6 +60,11 @@ class LoggingServiceDriverManager(object):
This method is called from drivers on INIT event.
"""
self._drivers.add(driver)
self.rpc_required |= driver.requires_rpc
def _start_rpc_listeners(self):
self._skeleton = server_rpc.LoggingApiSkeleton()
return self._skeleton.conn.consume_in_threads()
@property
def supported_logging_types(self):
@ -51,3 +78,52 @@ class LoggingServiceDriverManager(object):
LOG.debug("Supported logging types (logging types supported "
"by at least one loaded log_driver): %s", log_types)
return log_types
def call(self, method_name, *args, **kwargs):
"""Helper method for calling a method across all extension drivers."""
exc_list = []
for driver in self._drivers:
try:
getattr(driver, method_name)(*args, **kwargs)
except Exception as exc:
exception_msg = ("Extension driver '%(name)s' failed in "
"%(method)s")
exception_data = {'name': driver.name, 'method': method_name}
LOG.exception(exception_msg, exception_data)
exc_list.append(exc)
if exc_list:
raise exceptions.DriverCallError(exc_list=exc_list)
if self.rpc_required:
context = _get_param(args, kwargs, 'context', index=0)
log_obj = _get_param(args, kwargs, 'log_obj', index=1)
try:
rpc_method = getattr(self.logging_rpc, method_name)
except AttributeError:
LOG.error("Method %s is not implemented in logging RPC",
method_name)
return
rpc_method(context, log_obj)
@registry.receives(resources.SECURITY_GROUP_RULE,
[events.AFTER_CREATE, events.AFTER_DELETE])
def _handle_sg_rule_callback(self, resource, event, trigger, **kwargs):
"""Handle sg_rule create/delete events
This method handles sg_rule events, if sg_rule bound by log_resources,
it should tell to agent to update log_drivers.
"""
context = kwargs['context']
sg_rules = kwargs.get('security_group_rule')
if sg_rules:
sg_id = sg_rules.get('security_group_id')
else:
sg_id = kwargs.get('security_group_id')
log_resources = db_api.get_logs_bound_sg(context, sg_id)
if log_resources:
self.call(
log_const.RESOURCE_UPDATE, context, log_resources)

View File

@ -18,6 +18,7 @@ from neutron.db import db_base_plugin_common
from neutron.extensions import logging as log_ext
from neutron.objects import base as base_obj
from neutron.objects.logapi import logging_resource as log_object
from neutron.services.logapi.common import constants as log_const
from neutron.services.logapi.common import exceptions as log_exc
from neutron.services.logapi.common import validators
from neutron.services.logapi.drivers import manager as driver_mgr
@ -77,6 +78,12 @@ class LoggingPlugin(log_ext.LoggingPluginBase):
log_data.pop('tenant_id', None)
log_obj = log_object.Log(context=context, **log_data)
log_obj.create()
if log_obj.enabled:
self.driver_manager.call(
log_const.CREATE_LOG_PRECOMMIT, context, log_obj)
if log_obj.enabled:
self.driver_manager.call(
log_const.CREATE_LOG, context, log_obj)
return log_obj
@db_base_plugin_common.convert_result_to_dict
@ -87,6 +94,13 @@ class LoggingPlugin(log_ext.LoggingPluginBase):
log_obj = log_object.Log(context, id=log_id)
log_obj.update_fields(log_data, reset_changes=True)
log_obj.update()
need_notify = 'enabled' in log_data
if need_notify:
self.driver_manager.call(
log_const.UPDATE_LOG_PRECOMMIT, context, log_obj)
if need_notify:
self.driver_manager.call(
log_const.UPDATE_LOG, context, log_obj)
return log_obj
def delete_log(self, context, log_id):
@ -94,6 +108,10 @@ class LoggingPlugin(log_ext.LoggingPluginBase):
with db_api.context_manager.writer.using(context):
log_obj = self._get_log(context, log_id)
log_obj.delete()
self.driver_manager.call(
log_const.DELETE_LOG_PRECOMMIT, context, log_obj)
self.driver_manager.call(
log_const.DELETE_LOG, context, log_obj)
def get_loggable_resources(self, context, filters=None, fields=None,
sorts=None, limit=None,

View File

View File

@ -0,0 +1,45 @@
# Copyright (C) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import helpers as log_helpers
import oslo_messaging
from neutron.common import rpc as n_rpc
from neutron.services.logapi.common import constants as log_const
class LoggingApiStub(object):
"""Stub proxy code for agent->server communication."""
def __init__(self):
target = oslo_messaging.Target(
topic=log_const.LOGGING_PLUGIN,
version='1.0',
namespace=log_const.RPC_NAMESPACE_LOGGING)
self.rpc_client = n_rpc.get_client(target)
@log_helpers.log_method_call
def get_sg_log_info_for_port(self, context, port_id):
"""Return list of sg_log info for a port"""
cctxt = self.rpc_client.prepare()
return cctxt.call(context, 'get_sg_log_info_for_port', port_id=port_id)
@log_helpers.log_method_call
def get_sg_log_info_for_log_resources(self, context, log_resources):
"""Return list of sg_log info for list of log_resources"""
cctxt = self.rpc_client.prepare()
return cctxt.call(context, 'get_sg_log_info_for_log_resources',
log_resources=log_resources)

View File

@ -0,0 +1,69 @@
# Copyright (C) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import helpers as log_helpers
import oslo_messaging
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import rpc as n_rpc
from neutron.services.logapi.common import constants as log_const
from neutron.services.logapi.common import db_api
class LoggingApiSkeleton(object):
"""Skeleton proxy code for agent->server communication."""
# History
# 1.0 Initial version
target = oslo_messaging.Target(
version='1.0', namespace=log_const.RPC_NAMESPACE_LOGGING)
def __init__(self):
self.conn = n_rpc.create_connection()
self.conn.create_consumer(log_const.LOGGING_PLUGIN, [self],
fanout=False)
@log_helpers.log_method_call
def get_sg_log_info_for_port(self, context, port_id):
return db_api.get_sg_log_info_for_port(context, port_id)
@log_helpers.log_method_call
def get_sg_log_info_for_log_resources(self, context, log_resources):
return db_api.get_sg_log_info_for_log_resources(context, log_resources)
class LoggingApiNotification(object):
def __init__(self):
self.notification_api = resources_rpc.ResourcesPushRpcApi()
@log_helpers.log_method_call
def create_log(self, context, log_obj):
self.notification_api.push(context, [log_obj], events.CREATED)
@log_helpers.log_method_call
def update_log(self, context, log_obj):
self.notification_api.push(context, [log_obj], events.UPDATED)
@log_helpers.log_method_call
def delete_log(self, context, log_obj):
self.notification_api.push(context, [log_obj], events.DELETED)
@log_helpers.log_method_call
def resource_update(self, context, log_objs):
"""Tell to agent when resources related to log_objects updated"""
self.notification_api.push(context, log_objs, events.UPDATED)

View File

@ -0,0 +1,266 @@
# Copyright (c) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib import constants as const
from neutron_lib import context
from oslo_utils import uuidutils
from neutron.common import utils
from neutron.objects.logapi import logging_resource as log_object
from neutron.services.logapi.common import db_api
from neutron.services.logapi.common import validators
from neutron.services.logapi.rpc import server as server_rpc
from neutron.tests.unit.extensions import test_securitygroup as test_sg
def _create_log(tenant_id, resource_id=None,
target_id=None, event='ALL', enabled=True,):
log_data = {
'id': uuidutils.generate_uuid(),
'name': 'test',
'resource_type': 'security_group',
'project_id': tenant_id,
'event': event,
'enabled': enabled}
if resource_id:
log_data['resource_id'] = resource_id
if target_id:
log_data['target_id'] = target_id
return log_object.Log(**log_data)
class LoggingDBApiTestCase(test_sg.SecurityGroupDBTestCase):
def setUp(self):
super(LoggingDBApiTestCase, self).setUp()
self.context = context.get_admin_context()
self.sg_id, self.port_id, self.tenant_id = self._create_sg_and_port()
def _create_sg_and_port(self):
with self.network() as network, \
self.subnet(network), \
self.security_group() as sg:
sg_id = sg['security_group']['id']
tenant_id = sg['security_group']['tenant_id']
res = self._create_port(
self.fmt, network['network']['id'],
security_groups=[sg_id])
ports_rest = self.deserialize(self.fmt, res)
port_id = ports_rest['port']['id']
return sg_id, port_id, tenant_id
def test_get_logs_bound_port(self):
log = _create_log(target_id=self.port_id, tenant_id=self.tenant_id)
with mock.patch.object(log_object.Log, 'get_objects',
return_value=[log]):
self.assertEqual(
[log], db_api.get_logs_bound_port(self.context, self.port_id))
def test_get_logs_not_bound_port(self):
fake_sg_id = uuidutils.generate_uuid()
log = _create_log(resource_id=fake_sg_id, tenant_id=self.tenant_id)
with mock.patch.object(log_object.Log, 'get_objects',
return_value=[log]):
self.assertEqual(
[], db_api.get_logs_bound_port(self.context, self.port_id))
def test_get_logs_bound_sg(self):
log = _create_log(resource_id=self.sg_id, tenant_id=self.tenant_id)
with mock.patch.object(log_object.Log, 'get_objects',
return_value=[log]):
self.assertEqual(
[log], db_api.get_logs_bound_sg(self.context, self.sg_id))
def test_get_logs_not_bound_sg(self):
with self.network() as network, \
self.subnet(network), \
self.security_group() as sg:
sg2_id = sg['security_group']['id']
res = self._create_port(
self.fmt, network['network']['id'],
security_groups=[sg2_id])
port2_id = self.deserialize(self.fmt, res)['port']['id']
log = _create_log(target_id=port2_id, tenant_id=self.tenant_id)
with mock.patch.object(log_object.Log, 'get_objects',
return_value=[log]):
self.assertEqual(
[], db_api.get_logs_bound_sg(self.context, self.sg_id))
def test__get_ports_being_logged(self):
log1 = _create_log(target_id=self.port_id,
tenant_id=self.tenant_id)
log2 = _create_log(resource_id=self.sg_id,
tenant_id=self.tenant_id)
log3 = _create_log(target_id=self.port_id,
resource_id=self.tenant_id,
tenant_id=self.tenant_id)
log4 = _create_log(tenant_id=self.tenant_id)
with mock.patch.object(
validators, 'validate_log_type_for_port', return_value=True):
ports_log1 = db_api._get_ports_being_logged(self.context, log1)
ports_log2 = db_api._get_ports_being_logged(self.context, log2)
ports_log3 = db_api._get_ports_being_logged(self.context, log3)
ports_log4 = db_api._get_ports_being_logged(self.context, log4)
self.assertEqual([self.port_id], ports_log1)
self.assertEqual([self.port_id], ports_log2)
self.assertEqual([self.port_id], ports_log3)
self.assertEqual([self.port_id], ports_log4)
def test__get_ports_being_logged_not_supported_log_type(self):
log = _create_log(tenant_id=self.tenant_id)
with mock.patch.object(
validators, 'validate_log_type_for_port', return_value=False):
ports_log = db_api._get_ports_being_logged(self.context, log)
self.assertEqual([], ports_log)
class LoggingRpcCallbackTestCase(test_sg.SecurityGroupDBTestCase):
def setUp(self):
super(LoggingRpcCallbackTestCase, self).setUp()
self.context = context.get_admin_context()
self.rpc_callback = server_rpc.LoggingApiSkeleton()
def test_get_sg_log_info_for_create_or_update_log(self):
with self.network() as network, \
self.subnet(network), \
self.security_group() as sg:
sg_id = sg['security_group']['id']
tenant_id = sg['security_group']['tenant_id']
rule1 = self._build_security_group_rule(
sg_id,
'ingress', const.PROTO_NAME_TCP, '22', '22',
)
rule2 = self._build_security_group_rule(
sg_id,
'egress', const.PROTO_NAME_TCP,
remote_ip_prefix='10.0.0.1',
)
rules = {
'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
self._create_security_group_rule(self.fmt, rules)
res = self._create_port(
self.fmt, network['network']['id'],
security_groups=[sg_id])
ports_rest = self.deserialize(self.fmt, res)
port_id = ports_rest['port']['id']
log = _create_log(resource_id=sg_id, tenant_id=tenant_id)
with mock.patch.object(validators, 'validate_log_type_for_port',
return_value=True):
ports_log = (
self.rpc_callback.get_sg_log_info_for_log_resources(
self.context, log_resources=[log])
)
expected = [{
'event': log.event,
'id': log.id,
'ports_log': [{
'port_id': port_id,
'security_group_rules': [
{'direction': 'egress',
'ethertype': u'IPv4',
'security_group_id': sg_id},
{'direction': 'egress',
'ethertype': u'IPv6',
'security_group_id': sg_id},
{'direction': 'ingress',
'ethertype': u'IPv4',
'port_range_max': 22,
'port_range_min': 22,
'protocol': u'tcp',
'security_group_id': sg_id},
{'direction': 'egress',
'ethertype': u'IPv4',
'protocol': u'tcp',
'dest_ip_prefix':
utils.AuthenticIPNetwork('10.0.0.1/32'),
'security_group_id': sg_id}]
}],
'project_id': tenant_id
}]
self.assertEqual(expected, ports_log)
self._delete('ports', port_id)
def test_get_sg_log_info_for_port_added_event(self):
with self.network() as network, \
self.subnet(network), \
self.security_group() as sg:
sg_id = sg['security_group']['id']
tenant_id = sg['security_group']['tenant_id']
rule1 = self._build_security_group_rule(
sg_id,
'ingress', const.PROTO_NAME_TCP, '11', '13',
remote_ip_prefix='10.0.0.1',
)
rule2 = self._build_security_group_rule(
sg_id,
'egress', const.PROTO_NAME_ICMP,
)
rules = {
'security_group_rules': [rule1['security_group_rule'],
rule2['security_group_rule']]}
self._create_security_group_rule(self.fmt, rules)
res = self._create_port(
self.fmt, network['network']['id'],
security_groups=[sg_id],
tenant_id=tenant_id
)
ports_rest = self.deserialize(self.fmt, res)
port_id = ports_rest['port']['id']
log = _create_log(tenant_id=tenant_id)
with mock.patch.object(
log_object.Log, 'get_objects', return_value=[log]):
with mock.patch.object(
validators, 'validate_log_type_for_port',
return_value=True):
ports_log = (
self.rpc_callback.get_sg_log_info_for_port(
self.context, port_id=port_id)
)
expected = [{
'event': log.event,
'id': log.id,
'ports_log': [{
'port_id': port_id,
'security_group_rules': [
{'direction': 'egress',
'ethertype': u'IPv4',
'security_group_id': sg_id},
{'direction': 'egress',
'ethertype': u'IPv6',
'security_group_id': sg_id},
{'direction': 'ingress',
'ethertype': u'IPv4',
'port_range_max': 13,
'port_range_min': 11,
'protocol': u'tcp',
'source_ip_prefix':
utils.AuthenticIPNetwork('10.0.0.1/32'),
'security_group_id': sg_id},
{'direction': 'egress',
'ethertype': u'IPv4',
'protocol': u'icmp',
'security_group_id': sg_id}]
}],
'project_id': tenant_id
}]
self.assertEqual(expected, ports_log)
self._delete('ports', port_id)

View File

@ -13,11 +13,34 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.common import exceptions
from neutron.services.logapi.common import constants as log_const
from neutron.services.logapi.common import exceptions as log_exc
from neutron.services.logapi.drivers import base as log_driver_base
from neutron.services.logapi.drivers import manager as driver_mgr
from neutron.tests.unit.services.logapi import base
class TestGetParameter(base.BaseLogTestCase):
def test__get_param_missing_parameter(self):
kwargs = {'context': mock.sentinel.context}
self.assertRaises(log_exc.LogapiDriverException,
driver_mgr._get_param,
args=[], kwargs=kwargs,
name='log_obj', index=1)
self.assertRaises(log_exc.LogapiDriverException,
driver_mgr._get_param,
args=[mock.sentinel.context], kwargs={},
name='log_obj', index=1)
self.assertRaises(log_exc.LogapiDriverException,
driver_mgr._get_param,
args=[], kwargs={'log_obj': mock.sentinel.log_obj},
name='context', index=0)
class TestLogDriversManagerBase(base.BaseLogTestCase):
def setUp(self):
@ -77,3 +100,29 @@ class TestLogDriversManagerLoggingTypes(TestLogDriversManagerBase):
})
self.assertEqual(set(['security_group', 'firewall']),
driver_manager.supported_logging_types)
class TestLogDriversCalls(TestLogDriversManagerBase):
"""Test log driver calls"""
def setUp(self):
super(TestLogDriversCalls, self).setUp()
self.driver_manager = self._create_manager_with_drivers(
{'driver-A': {'is_loaded': True}})
def test_implemented_call_methods(self):
for method in log_const.LOG_CALL_METHODS:
with mock.patch.object(log_driver_base.DriverBase, method) as \
method_fnc:
context = mock.sentinel.context
log_obj = mock.sentinel.log_obj
self.driver_manager.call(
method, context=context, log_objs=[log_obj])
method_fnc.assert_called_once_with(
context=context, log_objs=[log_obj])
def test_not_implemented_call_methods(self):
context = mock.sentinel.context
log_obj = mock.sentinel.log_obj
self.assertRaises(exceptions.DriverCallError, self.driver_manager.call,
'wrong_method', context=context, log_objs=[log_obj])

View File

@ -0,0 +1,93 @@
# Copyright (c) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
import oslo_messaging
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.handlers import resources_rpc
from neutron.services.logapi.common import constants as log_const
from neutron.services.logapi.rpc import server as server_rpc
from neutron.tests import base
class LoggingApiNotificationTestCase(base.BaseTestCase):
def setUp(self):
super(LoggingApiNotificationTestCase, self).setUp()
self.test_obj = server_rpc.LoggingApiNotification()
def test___init__(self):
self.assertIsInstance(self.test_obj.notification_api,
resources_rpc.ResourcesPushRpcApi)
@mock.patch("neutron.api.rpc.handlers.resources_rpc.ResourcesPushRpcApi."
"push")
def test_create_log(self, mocked_push):
m_context = mock.Mock()
m_log_resource = mock.Mock()
self.test_obj.create_log(m_context, m_log_resource)
mocked_push.assert_called_with(m_context, [m_log_resource],
events.CREATED)
@mock.patch("neutron.api.rpc.handlers.resources_rpc.ResourcesPushRpcApi."
"push")
def test_update_log(self, mocked_push):
m_context = mock.Mock()
m_log_resource = mock.Mock()
self.test_obj.update_log(m_context, m_log_resource)
mocked_push.assert_called_with(m_context, [m_log_resource],
events.UPDATED)
@mock.patch("neutron.api.rpc.handlers.resources_rpc.ResourcesPushRpcApi."
"push")
def test_delete_log(self, mocked_push):
m_context = mock.Mock()
m_log_resource = mock.Mock()
self.test_obj.delete_log(m_context, m_log_resource)
mocked_push.assert_called_with(m_context, [m_log_resource],
events.DELETED)
class LoggingApiSkeletonTestCase(base.BaseTestCase):
@mock.patch("neutron.common.rpc.get_server")
def test___init__(self, mocked_get_server):
test_obj = server_rpc.LoggingApiSkeleton()
_target = oslo_messaging.Target(
topic=log_const.LOGGING_PLUGIN,
server=cfg.CONF.host,
fanout=False)
mocked_get_server.assert_called_with(_target, [test_obj])
@mock.patch("neutron.services.logapi.common.db_api."
"get_sg_log_info_for_port")
def test_get_sg_log_info_for_port(self, mock_callback):
test_obj = server_rpc.LoggingApiSkeleton()
m_context = mock.Mock()
port_id = '123'
test_obj.get_sg_log_info_for_port(m_context, port_id=port_id)
mock_callback.assert_called_with(m_context, port_id)
@mock.patch("neutron.services.logapi.common.db_api."
"get_sg_log_info_for_log_resources")
def test_get_sg_log_info_for_log_resources(self, mock_callback):
test_obj = server_rpc.LoggingApiSkeleton()
m_context = mock.Mock()
log_resources = [mock.Mock()]
test_obj.get_sg_log_info_for_log_resources(m_context,
log_resources=log_resources)
mock_callback.assert_called_with(m_context, log_resources)

View File

@ -58,7 +58,7 @@ class TestLoggingPlugin(base.BaseLogTestCase):
mock.patch('neutron.services.logapi.drivers.manager.'
'LoggingServiceDriverManager.supported_logging_types',
new_callable=log_types).start()
self.ctxt = context.Context('fake_user', 'fake_tenant')
self.ctxt = context.Context('admin', 'fake_tenant')
mock.patch.object(self.ctxt.session, 'refresh').start()
mock.patch.object(self.ctxt.session, 'expunge').start()
@ -111,6 +111,13 @@ class TestLoggingPlugin(base.BaseLogTestCase):
context=self.ctxt, **log['log'])
self.assertTrue(new_log.create.called)
calls = [
mock.call.call('create_log_precommit',
self.ctxt, new_log),
mock.call.call('create_log', self.ctxt, new_log)
]
self.log_plugin.driver_manager.assert_has_calls(calls)
def test_create_log_without_sg_resource(self):
log = {'log': {'resource_type': 'security_group',
'enabled': True,
@ -129,6 +136,13 @@ class TestLoggingPlugin(base.BaseLogTestCase):
context=self.ctxt, **log['log'])
self.assertTrue(new_log.create.called)
calls = [
mock.call.call('create_log_precommit',
self.ctxt, new_log),
mock.call.call('create_log', self.ctxt, new_log)
]
self.log_plugin.driver_manager.assert_has_calls(calls)
def test_create_log_without_parent_resource(self):
log = {'log': {'resource_type': 'security_group',
'enabled': True,
@ -144,6 +158,12 @@ class TestLoggingPlugin(base.BaseLogTestCase):
**log['log'])
self.assertTrue(new_log.create.called)
calls = [
mock.call.call('create_log_precommit', self.ctxt, new_log),
mock.call.call('create_log', self.ctxt, new_log)
]
self.log_plugin.driver_manager.assert_has_calls(calls)
def test_create_log_without_target(self):
log = {'log': {'resource_type': 'security_group',
'enabled': True, }}
@ -157,6 +177,12 @@ class TestLoggingPlugin(base.BaseLogTestCase):
**log['log'])
self.assertTrue(new_log.create.called)
calls = [
mock.call.call('create_log_precommit', self.ctxt, new_log),
mock.call.call('create_log', self.ctxt, new_log)
]
self.log_plugin.driver_manager.assert_has_calls(calls)
def test_create_log_nonexistent_sg_resource(self):
log = {'log': {'resource_type': 'security_group',
'enabled': True,
@ -212,6 +238,7 @@ class TestLoggingPlugin(base.BaseLogTestCase):
init_log_mock.assert_called_once_with(
context=self.ctxt, **log_data['log'])
self.assertTrue(new_log.create.called)
self.log_plugin.driver_manager.call.assert_not_called()
def test_create_log_with_unsupported_logging_type(self):
log = {'log': {'resource_type': 'fake_type',
@ -254,6 +281,12 @@ class TestLoggingPlugin(base.BaseLogTestCase):
reset_changes=True)
self.assertTrue(new_log.update.called)
calls = [
mock.call.call('update_log_precommit', self.ctxt, new_log),
mock.call.call('update_log', self.ctxt, new_log)
]
self.log_plugin.driver_manager.assert_has_calls(calls)
def test_update_log_none_enabled(self):
log_data = {'log': {}}
new_log = mock.Mock()
@ -267,6 +300,7 @@ class TestLoggingPlugin(base.BaseLogTestCase):
new_log.update_fields.assert_called_once_with(log_data['log'],
reset_changes=True)
self.assertTrue(new_log.update.called)
self.log_plugin.driver_manager.call.assert_not_called()
def test_delete_log(self):
delete_log = mock.Mock()
@ -278,6 +312,12 @@ class TestLoggingPlugin(base.BaseLogTestCase):
id=delete_log.id)
self.assertTrue(delete_log.delete.called)
calls = [
mock.call.call('delete_log_precommit', self.ctxt, delete_log),
mock.call.call('delete_log', self.ctxt, delete_log)
]
self.log_plugin.driver_manager.assert_has_calls(calls)
def test_delete_nonexistent_log(self):
with mock.patch.object(log_object.Log, 'get_object',
return_value=None):