[log] Generic RPC stuffs for logging in server side
This patch intends to make generic RPC stuffs for logging in server side. So logging can be extended to be used by other resources like firewall. Partial-Bug: #1720727 Change-Id: I6f93a71c2ae725e3b97d8abc5f57093894792116
This commit is contained in:
parent
d00a1558a5
commit
a4ffcab4ca
|
@ -24,6 +24,9 @@ SECURITY_GROUP = 'security_group'
|
|||
|
||||
RPC_NAMESPACE_LOGGING = 'logging-plugin'
|
||||
|
||||
# Define for rpc_method_key
|
||||
LOG_RESOURCE = 'log_resource'
|
||||
|
||||
# String literal for identifying log resource
|
||||
LOGGING = 'log'
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ from neutron_lib.callbacks import registry
|
|||
from oslo_log import log as logging
|
||||
|
||||
from neutron.services.logapi.common import constants as log_const
|
||||
from neutron.services.logapi.rpc import server as server_rpc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -53,6 +54,9 @@ class DriverBase(object):
|
|||
# trigger is the LoggingServiceDriverManager
|
||||
trigger.register_driver(self)
|
||||
|
||||
def register_rpc_methods(self, resource_type, rpc_methods):
|
||||
server_rpc.register_rpc_methods(resource_type, rpc_methods)
|
||||
|
||||
def is_loaded(self):
|
||||
"""True if the driver is active for the Neutron Server.
|
||||
|
||||
|
|
|
@ -14,9 +14,12 @@
|
|||
# under the License.
|
||||
|
||||
from neutron_lib.api.definitions import portbindings
|
||||
from neutron_lib.callbacks import resources
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron.services.logapi.common import constants as log_const
|
||||
from neutron.services.logapi.drivers import base
|
||||
from neutron.services.logapi.rpc import server as server_rpc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -43,4 +46,13 @@ def register():
|
|||
global DRIVER
|
||||
if not DRIVER:
|
||||
DRIVER = OVSDriver.create()
|
||||
|
||||
# Register RPC methods
|
||||
if DRIVER.requires_rpc:
|
||||
rpc_methods = [
|
||||
{resources.PORT: server_rpc.get_sg_log_info_for_port},
|
||||
{log_const.LOG_RESOURCE:
|
||||
server_rpc.get_sg_log_info_for_log_resources}
|
||||
]
|
||||
DRIVER.register_rpc_methods(log_const.SECURITY_GROUP, rpc_methods)
|
||||
LOG.debug('Open vSwitch logging driver registered')
|
||||
|
|
|
@ -274,10 +274,14 @@ class OVSFirewallLoggingDriver(log_ext.LoggingDriver):
|
|||
# try to clean port flows log for port updated/create event
|
||||
self._cleanup_port_flows_log(port_id)
|
||||
logs_info = self.resource_rpc.get_sg_log_info_for_port(
|
||||
context, port_id=port_id)
|
||||
context,
|
||||
resource_type=log_const.SECURITY_GROUP,
|
||||
port_id=port_id)
|
||||
elif log_resources:
|
||||
logs_info = self.resource_rpc.get_sg_log_info_for_log_resources(
|
||||
context, log_resources=log_resources)
|
||||
context,
|
||||
resource_type=log_const.SECURITY_GROUP,
|
||||
log_resources=log_resources)
|
||||
|
||||
for log_info in logs_info:
|
||||
log_id = log_info['id']
|
||||
|
|
|
@ -32,14 +32,18 @@ class LoggingApiStub(object):
|
|||
self.rpc_client = n_rpc.get_client(target)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def get_sg_log_info_for_port(self, context, port_id):
|
||||
def get_sg_log_info_for_port(self, context, resource_type, port_id):
|
||||
"""Return list of sg_log info for a port"""
|
||||
cctxt = self.rpc_client.prepare()
|
||||
return cctxt.call(context, 'get_sg_log_info_for_port', port_id=port_id)
|
||||
return cctxt.call(context, 'get_sg_log_info_for_port',
|
||||
resource_type=resource_type,
|
||||
port_id=port_id)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def get_sg_log_info_for_log_resources(self, context, log_resources):
|
||||
def get_sg_log_info_for_log_resources(self, context,
|
||||
resource_type, log_resources):
|
||||
"""Return list of sg_log info for list of log_resources"""
|
||||
cctxt = self.rpc_client.prepare()
|
||||
return cctxt.call(context, 'get_sg_log_info_for_log_resources',
|
||||
resource_type=resource_type,
|
||||
log_resources=log_resources)
|
||||
|
|
|
@ -13,7 +13,9 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutron_lib.callbacks import resources as r_const
|
||||
from oslo_log import helpers as log_helpers
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
|
||||
from neutron.api.rpc.callbacks import events
|
||||
|
@ -22,15 +24,57 @@ from neutron.common import rpc as n_rpc
|
|||
from neutron.services.logapi.common import constants as log_const
|
||||
from neutron.services.logapi.common import db_api
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# RPC methods mapping
|
||||
RPC_RESOURCES_METHOD_MAP = {}
|
||||
|
||||
|
||||
# This function must be called when a log_driver is registered.
|
||||
def register_rpc_methods(resource_type, rpc_methods):
|
||||
"""Register RPC methods.
|
||||
|
||||
:param resource_type: string and must be a valid resource type.
|
||||
:param rpc_methods: list of RPC methods to be registered.
|
||||
This param would look like:
|
||||
[
|
||||
{'PORT': get_sg_log_info_for_port},
|
||||
{'LOG_RESOURCE': get_sg_log_info_for_log_resources}
|
||||
]
|
||||
"""
|
||||
if resource_type not in RPC_RESOURCES_METHOD_MAP:
|
||||
RPC_RESOURCES_METHOD_MAP[resource_type] = rpc_methods
|
||||
|
||||
|
||||
def get_rpc_method(resource_type, rpc_method_key):
|
||||
if resource_type not in RPC_RESOURCES_METHOD_MAP:
|
||||
raise NotImplementedError()
|
||||
|
||||
for rpc_method in RPC_RESOURCES_METHOD_MAP[resource_type]:
|
||||
if rpc_method_key in rpc_method.keys():
|
||||
return list(rpc_method.values())[0]
|
||||
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def get_sg_log_info_for_port(context, port_id):
|
||||
return db_api.get_sg_log_info_for_port(context, port_id)
|
||||
|
||||
|
||||
def get_sg_log_info_for_log_resources(context, log_resources):
|
||||
return db_api.get_sg_log_info_for_log_resources(context, log_resources)
|
||||
|
||||
|
||||
class LoggingApiSkeleton(object):
|
||||
"""Skeleton proxy code for agent->server communication."""
|
||||
|
||||
# History
|
||||
# 1.0 Initial version
|
||||
# 1.1 Introduce resource_type as a keyword in order to extend
|
||||
# support for other resources
|
||||
|
||||
target = oslo_messaging.Target(
|
||||
version='1.0', namespace=log_const.RPC_NAMESPACE_LOGGING)
|
||||
version='1.1', namespace=log_const.RPC_NAMESPACE_LOGGING)
|
||||
|
||||
def __init__(self):
|
||||
self.conn = n_rpc.Connection()
|
||||
|
@ -38,12 +82,21 @@ class LoggingApiSkeleton(object):
|
|||
fanout=False)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def get_sg_log_info_for_port(self, context, port_id):
|
||||
return db_api.get_sg_log_info_for_port(context, port_id)
|
||||
def get_sg_log_info_for_port(self, context, port_id, **kwargs):
|
||||
resource_type = kwargs.get('resource_type', log_const.SECURITY_GROUP)
|
||||
LOG.debug("Logging agent requests log info "
|
||||
"for port with resource type %s", resource_type)
|
||||
rpc_method = get_rpc_method(resource_type, r_const.PORT)
|
||||
return rpc_method(context, port_id)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def get_sg_log_info_for_log_resources(self, context, log_resources):
|
||||
return db_api.get_sg_log_info_for_log_resources(context, log_resources)
|
||||
def get_sg_log_info_for_log_resources(self, context,
|
||||
log_resources, **kwargs):
|
||||
resource_type = kwargs.get('resource_type', log_const.SECURITY_GROUP)
|
||||
LOG.debug("Logging agent requests log info "
|
||||
"for log resources with resource type %s", resource_type)
|
||||
rpc_method = get_rpc_method(resource_type, log_const.LOG_RESOURCE)
|
||||
return rpc_method(context, log_resources)
|
||||
|
||||
|
||||
class LoggingApiNotification(object):
|
||||
|
|
|
@ -20,6 +20,7 @@ from oslo_utils import uuidutils
|
|||
|
||||
from neutron.common import utils
|
||||
from neutron.objects.logapi import logging_resource as log_object
|
||||
from neutron.services.logapi.common import constants as log_const
|
||||
from neutron.services.logapi.common import db_api
|
||||
from neutron.services.logapi.common import validators
|
||||
from neutron.services.logapi.rpc import server as server_rpc
|
||||
|
@ -162,41 +163,49 @@ class LoggingRpcCallbackTestCase(test_sg.SecurityGroupDBTestCase):
|
|||
ports_rest = self.deserialize(self.fmt, res)
|
||||
port_id = ports_rest['port']['id']
|
||||
log = _create_log(resource_id=sg_id, tenant_id=tenant_id)
|
||||
with mock.patch.object(validators, 'validate_log_type_for_port',
|
||||
return_value=True):
|
||||
ports_log = (
|
||||
self.rpc_callback.get_sg_log_info_for_log_resources(
|
||||
self.context, log_resources=[log])
|
||||
)
|
||||
expected = [{
|
||||
'event': log.event,
|
||||
'id': log.id,
|
||||
'ports_log': [{
|
||||
'port_id': port_id,
|
||||
'security_group_rules': [
|
||||
{'direction': 'egress',
|
||||
'ethertype': u'IPv4',
|
||||
'security_group_id': sg_id},
|
||||
{'direction': 'egress',
|
||||
'ethertype': u'IPv6',
|
||||
'security_group_id': sg_id},
|
||||
{'direction': 'ingress',
|
||||
'ethertype': u'IPv4',
|
||||
'port_range_max': 22,
|
||||
'port_range_min': 22,
|
||||
'protocol': u'tcp',
|
||||
'security_group_id': sg_id},
|
||||
{'direction': 'egress',
|
||||
'ethertype': u'IPv4',
|
||||
'protocol': u'tcp',
|
||||
'dest_ip_prefix':
|
||||
utils.AuthenticIPNetwork('10.0.0.1/32'),
|
||||
'security_group_id': sg_id}]
|
||||
}],
|
||||
'project_id': tenant_id
|
||||
}]
|
||||
self.assertEqual(expected, ports_log)
|
||||
self._delete('ports', port_id)
|
||||
with mock.patch.object(
|
||||
server_rpc,
|
||||
'get_rpc_method',
|
||||
return_value=server_rpc.get_sg_log_info_for_log_resources
|
||||
):
|
||||
with mock.patch.object(validators,
|
||||
'validate_log_type_for_port',
|
||||
return_value=True):
|
||||
ports_log = (
|
||||
self.rpc_callback.get_sg_log_info_for_log_resources(
|
||||
self.context,
|
||||
resource_type=log_const.SECURITY_GROUP,
|
||||
log_resources=[log])
|
||||
)
|
||||
expected = [{
|
||||
'event': log.event,
|
||||
'id': log.id,
|
||||
'ports_log': [{
|
||||
'port_id': port_id,
|
||||
'security_group_rules': [
|
||||
{'direction': 'egress',
|
||||
'ethertype': u'IPv4',
|
||||
'security_group_id': sg_id},
|
||||
{'direction': 'egress',
|
||||
'ethertype': u'IPv6',
|
||||
'security_group_id': sg_id},
|
||||
{'direction': 'ingress',
|
||||
'ethertype': u'IPv4',
|
||||
'port_range_max': 22,
|
||||
'port_range_min': 22,
|
||||
'protocol': u'tcp',
|
||||
'security_group_id': sg_id},
|
||||
{'direction': 'egress',
|
||||
'ethertype': u'IPv4',
|
||||
'protocol': u'tcp',
|
||||
'dest_ip_prefix':
|
||||
utils.AuthenticIPNetwork('10.0.0.1/32'),
|
||||
'security_group_id': sg_id}]
|
||||
}],
|
||||
'project_id': tenant_id
|
||||
}]
|
||||
self.assertEqual(expected, ports_log)
|
||||
self._delete('ports', port_id)
|
||||
|
||||
def test_get_sg_log_info_for_port_added_event(self):
|
||||
with self.network() as network, \
|
||||
|
@ -228,39 +237,48 @@ class LoggingRpcCallbackTestCase(test_sg.SecurityGroupDBTestCase):
|
|||
with mock.patch.object(
|
||||
log_object.Log, 'get_objects', return_value=[log]):
|
||||
with mock.patch.object(
|
||||
validators, 'validate_log_type_for_port',
|
||||
return_value=True):
|
||||
ports_log = (
|
||||
self.rpc_callback.get_sg_log_info_for_port(
|
||||
self.context, port_id=port_id)
|
||||
)
|
||||
expected = [{
|
||||
'event': log.event,
|
||||
'id': log.id,
|
||||
'ports_log': [{
|
||||
'port_id': port_id,
|
||||
'security_group_rules': [
|
||||
{'direction': 'egress',
|
||||
'ethertype': u'IPv4',
|
||||
'security_group_id': sg_id},
|
||||
{'direction': 'egress',
|
||||
'ethertype': u'IPv6',
|
||||
'security_group_id': sg_id},
|
||||
{'direction': 'ingress',
|
||||
'ethertype': u'IPv4',
|
||||
'port_range_max': 13,
|
||||
'port_range_min': 11,
|
||||
'protocol': u'tcp',
|
||||
'source_ip_prefix':
|
||||
utils.AuthenticIPNetwork('10.0.0.1/32'),
|
||||
'security_group_id': sg_id},
|
||||
{'direction': 'egress',
|
||||
'ethertype': u'IPv4',
|
||||
'protocol': u'icmp',
|
||||
'security_group_id': sg_id}]
|
||||
}],
|
||||
'project_id': tenant_id
|
||||
}]
|
||||
server_rpc,
|
||||
'get_rpc_method',
|
||||
return_value=server_rpc.get_sg_log_info_for_port
|
||||
):
|
||||
with mock.patch.object(
|
||||
validators,
|
||||
'validate_log_type_for_port',
|
||||
return_value=True):
|
||||
ports_log = (
|
||||
self.rpc_callback.get_sg_log_info_for_port(
|
||||
self.context,
|
||||
resource_type=log_const.SECURITY_GROUP,
|
||||
port_id=port_id)
|
||||
)
|
||||
expected = [{
|
||||
'event': log.event,
|
||||
'id': log.id,
|
||||
'ports_log': [{
|
||||
'port_id': port_id,
|
||||
'security_group_rules': [
|
||||
{'direction': 'egress',
|
||||
'ethertype': u'IPv4',
|
||||
'security_group_id': sg_id},
|
||||
{'direction': 'egress',
|
||||
'ethertype': u'IPv6',
|
||||
'security_group_id': sg_id},
|
||||
{'direction': 'ingress',
|
||||
'ethertype': u'IPv4',
|
||||
'port_range_max': 13,
|
||||
'port_range_min': 11,
|
||||
'protocol': u'tcp',
|
||||
'source_ip_prefix':
|
||||
utils.AuthenticIPNetwork(
|
||||
'10.0.0.1/32'),
|
||||
'security_group_id': sg_id},
|
||||
{'direction': 'egress',
|
||||
'ethertype': u'IPv4',
|
||||
'protocol': u'icmp',
|
||||
'security_group_id': sg_id}]
|
||||
}],
|
||||
'project_id': tenant_id
|
||||
}]
|
||||
|
||||
self.assertEqual(expected, ports_log)
|
||||
self._delete('ports', port_id)
|
||||
self.assertEqual(expected, ports_log)
|
||||
self._delete('ports', port_id)
|
||||
|
|
|
@ -62,6 +62,26 @@ class LoggingApiNotificationTestCase(base.BaseTestCase):
|
|||
events.DELETED)
|
||||
|
||||
|
||||
class TestRegisterValidateRPCMethods(base.BaseTestCase):
|
||||
|
||||
def test_register_rpc_methods_method(self):
|
||||
resource_type = 'security_group'
|
||||
method = [{'fake_key1': 'fake_method1'},
|
||||
{'fake_key2': 'fake_method2'}]
|
||||
expected = {resource_type: method}
|
||||
server_rpc.RPC_RESOURCES_METHOD_MAP.clear()
|
||||
server_rpc.register_rpc_methods(resource_type, method)
|
||||
self.assertEqual(expected, server_rpc.RPC_RESOURCES_METHOD_MAP)
|
||||
|
||||
def test_get_rpc_method(self):
|
||||
resource_type = 'security_group'
|
||||
method = [{'fake_key1': 'fake_method1'},
|
||||
{'fake_key2': 'fake_method2'}]
|
||||
server_rpc.RPC_RESOURCES_METHOD_MAP = {resource_type: method}
|
||||
actual = server_rpc.get_rpc_method('security_group', 'fake_key1')
|
||||
self.assertEqual('fake_method1', actual)
|
||||
|
||||
|
||||
class LoggingApiSkeletonTestCase(base.BaseTestCase):
|
||||
|
||||
@mock.patch("neutron.common.rpc.get_server")
|
||||
|
@ -76,18 +96,33 @@ class LoggingApiSkeletonTestCase(base.BaseTestCase):
|
|||
@mock.patch("neutron.services.logapi.common.db_api."
|
||||
"get_sg_log_info_for_port")
|
||||
def test_get_sg_log_info_for_port(self, mock_callback):
|
||||
test_obj = server_rpc.LoggingApiSkeleton()
|
||||
m_context = mock.Mock()
|
||||
port_id = '123'
|
||||
test_obj.get_sg_log_info_for_port(m_context, port_id=port_id)
|
||||
mock_callback.assert_called_with(m_context, port_id)
|
||||
with mock.patch.object(
|
||||
server_rpc,
|
||||
'get_rpc_method',
|
||||
return_value=server_rpc.get_sg_log_info_for_port
|
||||
):
|
||||
test_obj = server_rpc.LoggingApiSkeleton()
|
||||
m_context = mock.Mock()
|
||||
port_id = '123'
|
||||
test_obj.get_sg_log_info_for_port(
|
||||
m_context,
|
||||
resource_type=log_const.SECURITY_GROUP,
|
||||
port_id=port_id)
|
||||
mock_callback.assert_called_with(m_context, port_id)
|
||||
|
||||
@mock.patch("neutron.services.logapi.common.db_api."
|
||||
"get_sg_log_info_for_log_resources")
|
||||
def test_get_sg_log_info_for_log_resources(self, mock_callback):
|
||||
test_obj = server_rpc.LoggingApiSkeleton()
|
||||
m_context = mock.Mock()
|
||||
log_resources = [mock.Mock()]
|
||||
test_obj.get_sg_log_info_for_log_resources(m_context,
|
||||
log_resources=log_resources)
|
||||
mock_callback.assert_called_with(m_context, log_resources)
|
||||
with mock.patch.object(
|
||||
server_rpc,
|
||||
'get_rpc_method',
|
||||
return_value=server_rpc.get_sg_log_info_for_log_resources
|
||||
):
|
||||
test_obj = server_rpc.LoggingApiSkeleton()
|
||||
m_context = mock.Mock()
|
||||
log_resources = [mock.Mock()]
|
||||
test_obj.get_sg_log_info_for_log_resources(
|
||||
m_context,
|
||||
resource_type=log_const.SECURITY_GROUP,
|
||||
log_resources=log_resources)
|
||||
mock_callback.assert_called_with(m_context, log_resources)
|
||||
|
|
Loading…
Reference in New Issue