[log]: implement logging agent extension

This patch introduces generic logging agent extension following
the spec [1].

[1] https://specs.openstack.org/openstack/neutron-specs/specs/pike/logging-API-for-security-group-rules.html

Co-Authored-By: Yushiro FURUKAWA <y.furukawa_2@jp.fujitsu.com>

Change-Id: I1a59367cf23060fb1a0cd9bab6772b22da15c9f0
Partially-implements: blueprint security-group-logging
Related-Bug: #1468366
This commit is contained in:
Nguyen Phuong An 2016-11-09 17:02:48 +07:00
parent b11b5c5c43
commit bb8954a228
7 changed files with 315 additions and 1 deletions

View File

@ -0,0 +1,39 @@
# Copyright 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_config import cfg
from neutron._i18n import _
log_driver_opts = [
cfg.IntOpt(
'rate_limit',
default=100,
min=100,
help=_('Maximum packets logging per second.')),
cfg.IntOpt(
'burst_limit',
default=25,
min=25,
help=_('Maximum number of packets per rate_limit.')),
cfg.StrOpt(
'local_output_log_base',
help=_('Output logfile path on agent side, default syslog file.')),
]
def register_log_driver_opts(cfg=cfg.CONF):
cfg.register_opts(log_driver_opts, 'network_log')

View File

@ -43,6 +43,7 @@ import neutron.conf.plugins.ml2.drivers.mech_sriov.agent_common
import neutron.conf.plugins.ml2.drivers.ovs_conf
import neutron.conf.quota
import neutron.conf.service
import neutron.conf.services.logging
import neutron.conf.services.metering_agent
import neutron.conf.wsgi
import neutron.db.agents_db
@ -184,7 +185,9 @@ def list_linux_bridge_opts():
AGENT_EXT_MANAGER_OPTS)
),
('securitygroup',
neutron.conf.agent.securitygroups_rpc.security_group_opts)
neutron.conf.agent.securitygroups_rpc.security_group_opts),
('network_log',
neutron.conf.services.logging.log_driver_opts)
]
@ -269,6 +272,8 @@ def list_ovs_opts():
),
('securitygroup',
neutron.conf.agent.securitygroups_rpc.security_group_opts),
('network_log',
neutron.conf.services.logging.log_driver_opts)
]

View File

@ -0,0 +1,138 @@
# Copyright (c) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import contextlib
from neutron_lib import constants
from oslo_concurrency import lockutils
import six
from neutron.agent import agent_extension
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.conf.services import logging as log_cfg
from neutron import manager
log_cfg.register_log_driver_opts()
LOGGING_DRIVERS_NAMESPACE = 'neutron.services.logapi.drivers'
@six.add_metaclass(abc.ABCMeta)
class LoggingDriver(object):
"""Defines abstract interface for logging driver"""
# specific logging types are supported
SUPPORTED_LOGGING_TYPES = None
@abc.abstractmethod
def initialize(self, resource_rpc, **kwargs):
"""Perform logging driver initialization.
"""
@abc.abstractmethod
def start_logging(self, context, **kwargs):
"""Enable logging
:param context: rpc context
:param kwargs: log_resources data or port_id
"""
@abc.abstractmethod
def stop_logging(self, context, **kwargs):
"""Disable logging
:param context: rpc context
:param kwargs: log_resources data or port_id
"""
def defer_apply_on(self):
"""Defer application of logging rule."""
pass
def defer_apply_off(self):
"""Turn off deferral of rules and apply the logging rules now."""
pass
@contextlib.contextmanager
def defer_apply(self):
"""Defer apply context."""
self.defer_apply_on()
try:
yield
finally:
self.defer_apply_off()
class LoggingExtension(agent_extension.AgentExtension):
SUPPORTED_RESOURCE_TYPES = [resources.LOGGING_RESOURCE]
def initialize(self, connection, driver_type):
"""Initialize agent extension."""
self.resource_rpc = None
int_br = self.agent_api.request_int_br()
self.log_driver = manager.NeutronManager.load_class_for_provider(
LOGGING_DRIVERS_NAMESPACE, driver_type)(int_br)
self._register_rpc_consumers(connection)
self.log_driver.initialize(self.resource_rpc)
def consume_api(self, agent_api):
self.agent_api = agent_api
def _register_rpc_consumers(self, connection):
endpoints = [resources_rpc.ResourcesPushRpcCallback()]
for resource_type in self.SUPPORTED_RESOURCE_TYPES:
registry.register(self._handle_notification, resource_type)
topic = resources_rpc.resource_type_versioned_topic(resource_type)
connection.create_consumer(topic, endpoints, fanout=True)
@lockutils.synchronized('log-port')
def _handle_notification(self, context, resource_type,
log_resources, event_type):
with self.log_driver.defer_apply():
if event_type == events.UPDATED:
self._update_logging(context, log_resources)
elif event_type == events.CREATED:
self.log_driver.start_logging(
context, log_resources=log_resources)
elif event_type == events.DELETED:
self.log_driver.stop_logging(
context, log_resources=log_resources)
@lockutils.synchronized('log-port')
def handle_port(self, context, port):
if port['device_owner'].startswith(
constants.DEVICE_OWNER_COMPUTE_PREFIX):
self.log_driver.start_logging(context, port_id=port['port_id'])
def delete_port(self, context, port):
self.log_driver.stop_logging(context, port_id=port['port_id'])
def _update_logging(self, context, log_resources):
enables = []
disables = []
for log_resource in log_resources:
if log_resource.enabled:
enables.append(log_resource)
else:
disables.append(log_resource)
if enables:
self.log_driver.start_logging(context, log_resources=enables)
if disables:
self.log_driver.stop_logging(context, log_resources=disables)

View File

@ -0,0 +1,131 @@
# Copyright (C) 2017 Fujitsu Limited
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib import context
from oslo_utils import uuidutils
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent import (
ovs_agent_extension_api as ovs_ext_api)
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl import (
ovs_bridge)
from neutron.services.logapi.agent import log_extension as log_ext
from neutron.tests import base
class FakeLogDriver(log_ext.LoggingDriver):
SUPPORTED_LOGGING_TYPES = ['security_group']
def initialize(self, resource_rpc, **kwargs):
pass
def start_logging(self, context, **kwargs):
pass
def stop_logging(self, context, **kwargs):
pass
class LoggingExtensionBaseTestCase(base.BaseTestCase):
def setUp(self):
super(LoggingExtensionBaseTestCase, self).setUp()
conn_patcher = mock.patch(
'neutron.agent.ovsdb.native.connection.Connection.start')
conn_patcher.start()
self.agent_ext = log_ext.LoggingExtension()
self.context = context.get_admin_context()
self.connection = mock.Mock()
agent_api = ovs_ext_api.OVSAgentExtensionAPI(
ovs_bridge.OVSAgentBridge('br-int'),
ovs_bridge.OVSAgentBridge('br-tun'))
self.agent_ext.consume_api(agent_api)
mock.patch(
'neutron.manager.NeutronManager.load_class_for_provider').start()
class LoggingExtensionTestCase(LoggingExtensionBaseTestCase):
def setUp(self):
super(LoggingExtensionTestCase, self).setUp()
self.agent_ext.initialize(
self.connection, constants.EXTENSION_DRIVER_TYPE)
self.log_driver = mock.Mock()
log_driver_object = FakeLogDriver()
self.log_driver.defer_apply.side_effect = log_driver_object.defer_apply
self.agent_ext.log_driver = self.log_driver
def _create_test_port_dict(self, device_owner):
return {'port_id': uuidutils.generate_uuid(),
'device_owner': device_owner}
def test__handle_notification_passes_update_events_enabled_log(self):
log_obj = mock.Mock()
log_obj.enabled = True
self.agent_ext._handle_notification(
self.context, 'log', [log_obj], events.UPDATED)
self.assertTrue(self.log_driver.start_logging.called)
def test__handle_notification_passes_update_events_disabled_log(self):
log_obj = mock.Mock()
log_obj.enabled = False
self.agent_ext._handle_notification(
self.context, 'log', [log_obj], events.UPDATED)
self.assertTrue(self.log_driver.stop_logging.called)
def test__handle_notification_passes_create_events(self):
log_obj = mock.Mock()
self.agent_ext._handle_notification(
self.context, 'log', [log_obj], events.CREATED)
self.assertTrue(self.log_driver.start_logging.called)
def test__handle_notification_passes_delete_events(self):
log_obj = mock.Mock()
self.agent_ext._handle_notification(
self.context, 'log', [log_obj], events.DELETED)
self.assertTrue(self.log_driver.stop_logging.called)
def test_handle_port_vm(self):
port = self._create_test_port_dict(device_owner='compute:nova')
self.agent_ext.handle_port(self.context, port)
self.assertTrue(self.log_driver.start_logging.called)
def test_handle_not_port_vm(self):
port = self._create_test_port_dict(
device_owner='network:router_interface')
self.agent_ext.handle_port(self.context, port)
self.assertFalse(self.log_driver.start_logging.called)
class LoggingExtensionInitializeTestCase(LoggingExtensionBaseTestCase):
@mock.patch.object(registry, 'register')
@mock.patch.object(resources_rpc, 'ResourcesPushRpcCallback')
def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock):
self.agent_ext.initialize(
self.connection, constants.EXTENSION_DRIVER_TYPE)
self.connection.create_consumer.assert_has_calls(
[mock.call(
resources_rpc.resource_type_versioned_topic(resource_type),
[rpc_mock()],
fanout=True)
for resource_type in self.agent_ext.SUPPORTED_RESOURCE_TYPES]
)
subscribe_mock.assert_called_with(mock.ANY, resources.LOGGING_RESOURCE)

View File

@ -114,6 +114,7 @@ neutron.ipam_drivers =
neutron.agent.l2.extensions =
qos = neutron.agent.l2.extensions.qos:QosAgentExtension
fdb = neutron.agent.l2.extensions.fdb_population:FdbPopulationAgentExtension
log = neutron.services.logapi.agent.log_extension:LoggingExtension
neutron.qos.agent_drivers =
ovs = neutron.plugins.ml2.drivers.openvswitch.agent.extension_drivers.qos_driver:QosOVSAgentDriver
sriov = neutron.plugins.ml2.drivers.mech_sriov.agent.extension_drivers.qos_driver:QosSRIOVAgentDriver