[log] Generic RPC stuffs for logging in server side

This patch intends to make generic RPC stuffs for logging in server side.
So logging can be extended to be used by other resources like firewall.

Partial-Bug: #1720727
Change-Id: I6f93a71c2ae725e3b97d8abc5f57093894792116
This commit is contained in:
Cao Xuan Hoang 2018-01-16 17:22:59 +07:00 committed by Kim Bao Long
parent d00a1558a5
commit a4ffcab4ca
8 changed files with 224 additions and 91 deletions

View File

@ -24,6 +24,9 @@ SECURITY_GROUP = 'security_group'
RPC_NAMESPACE_LOGGING = 'logging-plugin'
# Define for rpc_method_key
LOG_RESOURCE = 'log_resource'
# String literal for identifying log resource
LOGGING = 'log'

View File

@ -18,6 +18,7 @@ from neutron_lib.callbacks import registry
from oslo_log import log as logging
from neutron.services.logapi.common import constants as log_const
from neutron.services.logapi.rpc import server as server_rpc
LOG = logging.getLogger(__name__)
@ -53,6 +54,9 @@ class DriverBase(object):
# trigger is the LoggingServiceDriverManager
trigger.register_driver(self)
def register_rpc_methods(self, resource_type, rpc_methods):
server_rpc.register_rpc_methods(resource_type, rpc_methods)
def is_loaded(self):
"""True if the driver is active for the Neutron Server.

View File

@ -14,9 +14,12 @@
# under the License.
from neutron_lib.api.definitions import portbindings
from neutron_lib.callbacks import resources
from oslo_log import log as logging
from neutron.services.logapi.common import constants as log_const
from neutron.services.logapi.drivers import base
from neutron.services.logapi.rpc import server as server_rpc
LOG = logging.getLogger(__name__)
@ -43,4 +46,13 @@ def register():
global DRIVER
if not DRIVER:
DRIVER = OVSDriver.create()
# Register RPC methods
if DRIVER.requires_rpc:
rpc_methods = [
{resources.PORT: server_rpc.get_sg_log_info_for_port},
{log_const.LOG_RESOURCE:
server_rpc.get_sg_log_info_for_log_resources}
]
DRIVER.register_rpc_methods(log_const.SECURITY_GROUP, rpc_methods)
LOG.debug('Open vSwitch logging driver registered')

View File

@ -274,10 +274,14 @@ class OVSFirewallLoggingDriver(log_ext.LoggingDriver):
# try to clean port flows log for port updated/create event
self._cleanup_port_flows_log(port_id)
logs_info = self.resource_rpc.get_sg_log_info_for_port(
context, port_id=port_id)
context,
resource_type=log_const.SECURITY_GROUP,
port_id=port_id)
elif log_resources:
logs_info = self.resource_rpc.get_sg_log_info_for_log_resources(
context, log_resources=log_resources)
context,
resource_type=log_const.SECURITY_GROUP,
log_resources=log_resources)
for log_info in logs_info:
log_id = log_info['id']

View File

@ -32,14 +32,18 @@ class LoggingApiStub(object):
self.rpc_client = n_rpc.get_client(target)
@log_helpers.log_method_call
def get_sg_log_info_for_port(self, context, port_id):
def get_sg_log_info_for_port(self, context, resource_type, port_id):
"""Return list of sg_log info for a port"""
cctxt = self.rpc_client.prepare()
return cctxt.call(context, 'get_sg_log_info_for_port', port_id=port_id)
return cctxt.call(context, 'get_sg_log_info_for_port',
resource_type=resource_type,
port_id=port_id)
@log_helpers.log_method_call
def get_sg_log_info_for_log_resources(self, context, log_resources):
def get_sg_log_info_for_log_resources(self, context,
resource_type, log_resources):
"""Return list of sg_log info for list of log_resources"""
cctxt = self.rpc_client.prepare()
return cctxt.call(context, 'get_sg_log_info_for_log_resources',
resource_type=resource_type,
log_resources=log_resources)

View File

@ -13,7 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.callbacks import resources as r_const
from oslo_log import helpers as log_helpers
from oslo_log import log as logging
import oslo_messaging
from neutron.api.rpc.callbacks import events
@ -22,15 +24,57 @@ from neutron.common import rpc as n_rpc
from neutron.services.logapi.common import constants as log_const
from neutron.services.logapi.common import db_api
LOG = logging.getLogger(__name__)
# RPC methods mapping
RPC_RESOURCES_METHOD_MAP = {}
# This function must be called when a log_driver is registered.
def register_rpc_methods(resource_type, rpc_methods):
"""Register RPC methods.
:param resource_type: string and must be a valid resource type.
:param rpc_methods: list of RPC methods to be registered.
This param would look like:
[
{'PORT': get_sg_log_info_for_port},
{'LOG_RESOURCE': get_sg_log_info_for_log_resources}
]
"""
if resource_type not in RPC_RESOURCES_METHOD_MAP:
RPC_RESOURCES_METHOD_MAP[resource_type] = rpc_methods
def get_rpc_method(resource_type, rpc_method_key):
if resource_type not in RPC_RESOURCES_METHOD_MAP:
raise NotImplementedError()
for rpc_method in RPC_RESOURCES_METHOD_MAP[resource_type]:
if rpc_method_key in rpc_method.keys():
return list(rpc_method.values())[0]
raise NotImplementedError()
def get_sg_log_info_for_port(context, port_id):
return db_api.get_sg_log_info_for_port(context, port_id)
def get_sg_log_info_for_log_resources(context, log_resources):
return db_api.get_sg_log_info_for_log_resources(context, log_resources)
class LoggingApiSkeleton(object):
"""Skeleton proxy code for agent->server communication."""
# History
# 1.0 Initial version
# 1.1 Introduce resource_type as a keyword in order to extend
# support for other resources
target = oslo_messaging.Target(
version='1.0', namespace=log_const.RPC_NAMESPACE_LOGGING)
version='1.1', namespace=log_const.RPC_NAMESPACE_LOGGING)
def __init__(self):
self.conn = n_rpc.Connection()
@ -38,12 +82,21 @@ class LoggingApiSkeleton(object):
fanout=False)
@log_helpers.log_method_call
def get_sg_log_info_for_port(self, context, port_id):
return db_api.get_sg_log_info_for_port(context, port_id)
def get_sg_log_info_for_port(self, context, port_id, **kwargs):
resource_type = kwargs.get('resource_type', log_const.SECURITY_GROUP)
LOG.debug("Logging agent requests log info "
"for port with resource type %s", resource_type)
rpc_method = get_rpc_method(resource_type, r_const.PORT)
return rpc_method(context, port_id)
@log_helpers.log_method_call
def get_sg_log_info_for_log_resources(self, context, log_resources):
return db_api.get_sg_log_info_for_log_resources(context, log_resources)
def get_sg_log_info_for_log_resources(self, context,
log_resources, **kwargs):
resource_type = kwargs.get('resource_type', log_const.SECURITY_GROUP)
LOG.debug("Logging agent requests log info "
"for log resources with resource type %s", resource_type)
rpc_method = get_rpc_method(resource_type, log_const.LOG_RESOURCE)
return rpc_method(context, log_resources)
class LoggingApiNotification(object):

View File

@ -20,6 +20,7 @@ from oslo_utils import uuidutils
from neutron.common import utils
from neutron.objects.logapi import logging_resource as log_object
from neutron.services.logapi.common import constants as log_const
from neutron.services.logapi.common import db_api
from neutron.services.logapi.common import validators
from neutron.services.logapi.rpc import server as server_rpc
@ -162,41 +163,49 @@ class LoggingRpcCallbackTestCase(test_sg.SecurityGroupDBTestCase):
ports_rest = self.deserialize(self.fmt, res)
port_id = ports_rest['port']['id']
log = _create_log(resource_id=sg_id, tenant_id=tenant_id)
with mock.patch.object(validators, 'validate_log_type_for_port',
return_value=True):
ports_log = (
self.rpc_callback.get_sg_log_info_for_log_resources(
self.context, log_resources=[log])
)
expected = [{
'event': log.event,
'id': log.id,
'ports_log': [{
'port_id': port_id,
'security_group_rules': [
{'direction': 'egress',
'ethertype': u'IPv4',
'security_group_id': sg_id},
{'direction': 'egress',
'ethertype': u'IPv6',
'security_group_id': sg_id},
{'direction': 'ingress',
'ethertype': u'IPv4',
'port_range_max': 22,
'port_range_min': 22,
'protocol': u'tcp',
'security_group_id': sg_id},
{'direction': 'egress',
'ethertype': u'IPv4',
'protocol': u'tcp',
'dest_ip_prefix':
utils.AuthenticIPNetwork('10.0.0.1/32'),
'security_group_id': sg_id}]
}],
'project_id': tenant_id
}]
self.assertEqual(expected, ports_log)
self._delete('ports', port_id)
with mock.patch.object(
server_rpc,
'get_rpc_method',
return_value=server_rpc.get_sg_log_info_for_log_resources
):
with mock.patch.object(validators,
'validate_log_type_for_port',
return_value=True):
ports_log = (
self.rpc_callback.get_sg_log_info_for_log_resources(
self.context,
resource_type=log_const.SECURITY_GROUP,
log_resources=[log])
)
expected = [{
'event': log.event,
'id': log.id,
'ports_log': [{
'port_id': port_id,
'security_group_rules': [
{'direction': 'egress',
'ethertype': u'IPv4',
'security_group_id': sg_id},
{'direction': 'egress',
'ethertype': u'IPv6',
'security_group_id': sg_id},
{'direction': 'ingress',
'ethertype': u'IPv4',
'port_range_max': 22,
'port_range_min': 22,
'protocol': u'tcp',
'security_group_id': sg_id},
{'direction': 'egress',
'ethertype': u'IPv4',
'protocol': u'tcp',
'dest_ip_prefix':
utils.AuthenticIPNetwork('10.0.0.1/32'),
'security_group_id': sg_id}]
}],
'project_id': tenant_id
}]
self.assertEqual(expected, ports_log)
self._delete('ports', port_id)
def test_get_sg_log_info_for_port_added_event(self):
with self.network() as network, \
@ -228,39 +237,48 @@ class LoggingRpcCallbackTestCase(test_sg.SecurityGroupDBTestCase):
with mock.patch.object(
log_object.Log, 'get_objects', return_value=[log]):
with mock.patch.object(
validators, 'validate_log_type_for_port',
return_value=True):
ports_log = (
self.rpc_callback.get_sg_log_info_for_port(
self.context, port_id=port_id)
)
expected = [{
'event': log.event,
'id': log.id,
'ports_log': [{
'port_id': port_id,
'security_group_rules': [
{'direction': 'egress',
'ethertype': u'IPv4',
'security_group_id': sg_id},
{'direction': 'egress',
'ethertype': u'IPv6',
'security_group_id': sg_id},
{'direction': 'ingress',
'ethertype': u'IPv4',
'port_range_max': 13,
'port_range_min': 11,
'protocol': u'tcp',
'source_ip_prefix':
utils.AuthenticIPNetwork('10.0.0.1/32'),
'security_group_id': sg_id},
{'direction': 'egress',
'ethertype': u'IPv4',
'protocol': u'icmp',
'security_group_id': sg_id}]
}],
'project_id': tenant_id
}]
server_rpc,
'get_rpc_method',
return_value=server_rpc.get_sg_log_info_for_port
):
with mock.patch.object(
validators,
'validate_log_type_for_port',
return_value=True):
ports_log = (
self.rpc_callback.get_sg_log_info_for_port(
self.context,
resource_type=log_const.SECURITY_GROUP,
port_id=port_id)
)
expected = [{
'event': log.event,
'id': log.id,
'ports_log': [{
'port_id': port_id,
'security_group_rules': [
{'direction': 'egress',
'ethertype': u'IPv4',
'security_group_id': sg_id},
{'direction': 'egress',
'ethertype': u'IPv6',
'security_group_id': sg_id},
{'direction': 'ingress',
'ethertype': u'IPv4',
'port_range_max': 13,
'port_range_min': 11,
'protocol': u'tcp',
'source_ip_prefix':
utils.AuthenticIPNetwork(
'10.0.0.1/32'),
'security_group_id': sg_id},
{'direction': 'egress',
'ethertype': u'IPv4',
'protocol': u'icmp',
'security_group_id': sg_id}]
}],
'project_id': tenant_id
}]
self.assertEqual(expected, ports_log)
self._delete('ports', port_id)
self.assertEqual(expected, ports_log)
self._delete('ports', port_id)

View File

@ -62,6 +62,26 @@ class LoggingApiNotificationTestCase(base.BaseTestCase):
events.DELETED)
class TestRegisterValidateRPCMethods(base.BaseTestCase):
def test_register_rpc_methods_method(self):
resource_type = 'security_group'
method = [{'fake_key1': 'fake_method1'},
{'fake_key2': 'fake_method2'}]
expected = {resource_type: method}
server_rpc.RPC_RESOURCES_METHOD_MAP.clear()
server_rpc.register_rpc_methods(resource_type, method)
self.assertEqual(expected, server_rpc.RPC_RESOURCES_METHOD_MAP)
def test_get_rpc_method(self):
resource_type = 'security_group'
method = [{'fake_key1': 'fake_method1'},
{'fake_key2': 'fake_method2'}]
server_rpc.RPC_RESOURCES_METHOD_MAP = {resource_type: method}
actual = server_rpc.get_rpc_method('security_group', 'fake_key1')
self.assertEqual('fake_method1', actual)
class LoggingApiSkeletonTestCase(base.BaseTestCase):
@mock.patch("neutron.common.rpc.get_server")
@ -76,18 +96,33 @@ class LoggingApiSkeletonTestCase(base.BaseTestCase):
@mock.patch("neutron.services.logapi.common.db_api."
"get_sg_log_info_for_port")
def test_get_sg_log_info_for_port(self, mock_callback):
test_obj = server_rpc.LoggingApiSkeleton()
m_context = mock.Mock()
port_id = '123'
test_obj.get_sg_log_info_for_port(m_context, port_id=port_id)
mock_callback.assert_called_with(m_context, port_id)
with mock.patch.object(
server_rpc,
'get_rpc_method',
return_value=server_rpc.get_sg_log_info_for_port
):
test_obj = server_rpc.LoggingApiSkeleton()
m_context = mock.Mock()
port_id = '123'
test_obj.get_sg_log_info_for_port(
m_context,
resource_type=log_const.SECURITY_GROUP,
port_id=port_id)
mock_callback.assert_called_with(m_context, port_id)
@mock.patch("neutron.services.logapi.common.db_api."
"get_sg_log_info_for_log_resources")
def test_get_sg_log_info_for_log_resources(self, mock_callback):
test_obj = server_rpc.LoggingApiSkeleton()
m_context = mock.Mock()
log_resources = [mock.Mock()]
test_obj.get_sg_log_info_for_log_resources(m_context,
log_resources=log_resources)
mock_callback.assert_called_with(m_context, log_resources)
with mock.patch.object(
server_rpc,
'get_rpc_method',
return_value=server_rpc.get_sg_log_info_for_log_resources
):
test_obj = server_rpc.LoggingApiSkeleton()
m_context = mock.Mock()
log_resources = [mock.Mock()]
test_obj.get_sg_log_info_for_log_resources(
m_context,
resource_type=log_const.SECURITY_GROUP,
log_resources=log_resources)
mock_callback.assert_called_with(m_context, log_resources)