Merge "Add functional test for FWaaS L3 logging driver"

This commit is contained in:
Zuul 2018-10-19 20:02:51 +00:00 committed by Gerrit Code Review
commit 61a3b1f70e
7 changed files with 572 additions and 0 deletions

View File

@ -20,5 +20,10 @@ case $VENV in
source $FWAAS_PATH/tools/configure_for_fwaas_func_testing.sh
configure_host_for_func_testing
if is_ubuntu || is_suse; then
install_package libnetfilter-log1
elif is_fedora; then
install_package libnetfilter-log
fi
;;
esac

View File

@ -0,0 +1,567 @@
# Copyright (c) 2018 Fujitsu Limited.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import mock
from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api
from neutron.agent.linux import utils as linux_utils
from neutron.common import constants as n_const
from neutron.services.logapi.common import constants as log_const
from neutron.tests.functional.agent.l3 import framework
from neutron_lib import context as neutron_context
from oslo_config import cfg
from oslo_log import log as logging
from neutron_fwaas.services.logapi.agents.drivers.iptables import log
LOG = logging.getLogger(__name__)
FAKE_LOG_ID = 'fake_log_id'
FAKE_PROJECT_ID = 'fake_project_id'
FAKE_RESOURCE_TYPE = 'firewall_group'
# Default chain name
ACCEPTED_CHAIN = 'accepted'
DROPPED_CHAIN = 'dropped'
REJECTED_CHAIN = 'rejected'
ACCEPT = 'ACCEPT'
DROP = 'DROP'
REJECT = 'REJECT'
ALL = 'ALL'
CHAIN_NAME_POSTFIX_MAP = {
ACCEPT: ACCEPTED_CHAIN,
DROP: DROPPED_CHAIN,
REJECT: REJECTED_CHAIN
}
FWAAS_V2_LOG_OPTS = [
cfg.StrOpt('extensions', default=['fwaas_v2', 'fwaas_v2_log']),
]
AGENT_MODE_OPTS = [
cfg.StrOpt('agent_mode', default='legacy',
choices=['legacy', 'dvr', 'dvr_snat', 'dvr_no_external']),
]
class FWLoggingTestBase(framework.L3AgentTestFramework):
def setUp(self):
super(FWLoggingTestBase, self).setUp()
self.conf.register_opts(FWAAS_V2_LOG_OPTS, 'fwaas')
self.conf.register_opts(AGENT_MODE_OPTS, group='DEFAULT')
self._set_agent_mode(self.conf)
self.if_prefix = 'qr-'
self.context = neutron_context.get_admin_context()
self.context.tenant_id = FAKE_PROJECT_ID
self.resource_rpc = mock.patch(
'neutron.services.logapi.rpc.agent.LoggingApiStub').start()
# Initialize logging driver
self.log_driver = self._initialize_iptables_log()
# Prepare router_info
self._prepare_router_info(n_ports=2)
def _prepare_router_info(self, n_ports=0):
router_data = self.generate_router_info(enable_ha=False,
num_internal_ports=n_ports)
self.router_info = self.manage_router(self.agent, router_data)
self.log_driver.agent_api._router_info = {
self.router_info.router_id: self.router_info
}
def _initialize_iptables_log(self):
self.agent_api = l3_ext_api.L3AgentExtensionAPI({})
log_driver = log.IptablesLoggingDriver(self.agent_api)
log_driver.initialize(self.resource_rpc)
log_driver.conf = self.conf
return log_driver
def _refresh_logging_config(self, ipt_mgr):
# Reset configuration for the next testing EVENT
self.log_driver.ipt_mgr_list.clear()
self.log_driver.fwg_port_logs.clear()
self.log_driver.prefixes_table.clear()
self.log_driver.cleanup_table.clear()
self.log_driver.nflog_proc_map.clear()
self.log_driver.unused_port_ids.clear()
# Empty default chains
self._empty_default_chains_v4v6(ipt_mgr=ipt_mgr)
def _set_agent_mode(self, cfg, agent_mode='legacy'):
cfg.agent_mode = agent_mode
def _config_default_chains_v4v6(self, ipt_mgr):
# Config default chains in iptables and ip6tables
for action, chain in CHAIN_NAME_POSTFIX_MAP.items():
v4rules_in_chain = \
ipt_mgr.get_chain("filter", chain, ip_version=4)
if not v4rules_in_chain:
ipt_mgr.ipv4['filter'].add_chain(chain)
ipt_mgr.ipv4['filter'].add_rule(chain, '-j %s' % action)
v6rules_in_chain = \
ipt_mgr.get_chain("filter", chain, ip_version=6)
if not v6rules_in_chain:
ipt_mgr.ipv6['filter'].add_chain(chain)
ipt_mgr.ipv6['filter'].add_rule(chain, '-j %s' % action)
def _empty_default_chains_v4v6(self, ipt_mgr):
# Empty default chains in iptables and ip6tables
for action, chain in CHAIN_NAME_POSTFIX_MAP.items():
ipt_mgr.ipv4['filter'].empty_chain(chain=chain)
ipt_mgr.ipv6['filter'].empty_chain(chain=chain)
def _fake_log_resource(self, tenant_id, resource_id=None,
target_id=None, event='ALL', enabled=True):
log_resource = {
'id': FAKE_LOG_ID,
'name': 'fake_log_name',
'resource_type': FAKE_RESOURCE_TYPE,
'project_id': tenant_id,
'event': event,
'enabled': True}
if resource_id:
log_resource['resource_id'] = resource_id
if target_id:
log_resource['target_id'] = target_id
if not enabled:
log_resource['enabled'] = enabled
return log_resource
def _fake_log_info(self, log_id, port_ids, event='ALL'):
return {
'event': event,
'id': log_id,
'project_id': FAKE_PROJECT_ID,
'ports_log': port_ids
}
def _get_expected_nflog_rule(self, wrap_name, if_prefix, logs_info):
# Generate an expected NFLOG rules from given log_info
rules = set()
limit = 'limit --limit %s/sec --limit-burst %s' % \
(self.log_driver.rate_limit, self.log_driver.burst_limit)
accept_chain = wrap_name + '-' + ACCEPTED_CHAIN
drop_chain = wrap_name + '-' + DROPPED_CHAIN
reject_chain = wrap_name + '-' + REJECTED_CHAIN
for log_info in logs_info:
event = log_info['event']
ports_log = log_info['ports_log']
for port_id in ports_log:
device = (if_prefix + port_id)[:n_const.LINUX_DEV_LEN]
if event in [ACCEPT, ALL]:
# Generate iptables rules for ACCEPT action
prefix = self._get_log_prefix(port_id, ACCEPT)
rules.add('-A %s -i %s -m %s -j NFLOG --nflog-prefix %s'
% (accept_chain, device, limit, prefix.id))
rules.add('-A %s -o %s -m %s -j NFLOG --nflog-prefix %s'
% (accept_chain, device, limit, prefix.id))
if event in [DROP, ALL]:
# Generate iptables rules for DROP action
prefix = self._get_log_prefix(port_id, DROP)
rules.add('-A %s -i %s -m %s -j NFLOG --nflog-prefix %s'
% (drop_chain, device, limit, prefix.id))
rules.add('-A %s -o %s -m %s -j NFLOG --nflog-prefix %s'
% (drop_chain, device, limit, prefix.id))
# Generate iptables rules for REJECT action
rules.add('-A %s -i %s -m %s -j NFLOG --nflog-prefix %s'
% (reject_chain, device, limit, prefix.id))
rules.add('-A %s -o %s -m %s -j NFLOG --nflog-prefix %s'
% (reject_chain, device, limit, prefix.id))
return rules
def _get_log_prefix(self, port_id, action):
prefix_table = self.log_driver.prefixes_table
if port_id in prefix_table:
for prefix in prefix_table[port_id]:
if prefix.action == action:
return prefix
return None
def _get_nflog_entries(self, namespace, table='iptables', chain_name=None):
# Get NFLOG entries from iptables and ip6tables
exec_cmd = ['ip', 'netns', 'exec', namespace, table, '-S']
if chain_name:
exec_cmd += [chain_name]
while True:
try:
output = linux_utils.execute(exec_cmd,
run_as_root=True,
check_exit_code=True,
extra_ok_codes=[1])
nflog_rules = [rule for rule in output.splitlines()
if 'NFLOG' in rule]
return nflog_rules
except RuntimeError:
time.sleep(1)
def assert_logging_results(self, ipt_mgr, log_info):
# Comparing between expected NFLOG rules and NFLOG rules from iptables
v4_rules = v6_rules = self._get_expected_nflog_rule(
ipt_mgr.wrap_name, self.if_prefix, log_info)
v4_actual = self._get_nflog_entries(ipt_mgr.namespace,
table='iptables')
v6_actual = self._get_nflog_entries(ipt_mgr.namespace,
table='ip6tables')
self.assertEqual(sorted(v4_rules), sorted(v4_actual))
self.assertEqual(sorted(v6_rules), sorted(v6_actual))
def run_start_logging(self, ipt_mgr, log_info, **kwargs):
# Run start logging function with a give log_info
router_info = kwargs.get('router_info')
log_resources = kwargs.get('log_resources')
self._config_default_chains_v4v6(ipt_mgr)
if router_info:
with mock.patch.object(self.resource_rpc,
'get_sg_log_info_for_port',
return_value=log_info):
self.log_driver.start_logging(self.context,
router_info=router_info)
elif log_resources:
with mock.patch.object(self.resource_rpc,
'get_sg_log_info_for_log_resources',
return_value=log_info):
self.log_driver.start_logging(self.context,
log_resources=log_resources)
class FWLoggingTestCase(FWLoggingTestBase):
def test_start_logging_when_l3_starting(self):
# Get router information
ipt_mgr = self.router_info.iptables_manager
port_ids = [port['id'] for port in self.router_info.internal_ports]
for event in log_const.LOG_EVENTS:
# Test start_logging with single log resource
f_log_info = [
self._fake_log_info(log_id=FAKE_LOG_ID,
port_ids=port_ids,
event=event)
]
self.run_start_logging(ipt_mgr,
log_info=f_log_info,
router_info=self.router_info)
# Test start_logging with multiple log resources
f_log_info = [
self._fake_log_info(log_id='fake_log_id_1',
port_ids=[port_ids[0]],
event=event),
self._fake_log_info(log_id='fake_log_id_2',
port_ids=[port_ids[1]],
event=event)
]
self.run_start_logging(ipt_mgr,
log_info=f_log_info,
router_info=self.router_info)
self.assert_logging_results(ipt_mgr, f_log_info)
self._refresh_logging_config(ipt_mgr=ipt_mgr)
def test_start_logging_when_create_log(self):
# Get router information
ipt_mgr = self.router_info.iptables_manager
port_ids = [port['id'] for port in self.router_info.internal_ports]
for event in log_const.LOG_EVENTS:
log_resources = [self._fake_log_resource(FAKE_PROJECT_ID,
event=event)]
f_log_info = [
self._fake_log_info(log_id=FAKE_LOG_ID,
port_ids=port_ids,
event=event)
]
self.run_start_logging(ipt_mgr,
log_info=f_log_info,
log_resources=log_resources)
self.assert_logging_results(ipt_mgr, f_log_info)
self._refresh_logging_config(ipt_mgr=ipt_mgr)
def test_start_logging_when_add_router_port(self):
ipt_mgr = self.router_info.iptables_manager
for event in log_const.LOG_EVENTS:
port_ids = [port['id'] for port in self.router_info.internal_ports]
# Making log_info when there is only one port
added_port_id = port_ids.pop()
initial_log_info = [
self._fake_log_info(log_id=FAKE_LOG_ID,
port_ids=port_ids,
event=event)
]
# Make log_info with new adding port
port_ids.append(added_port_id)
add_port_log_info = [
self._fake_log_info(log_id=FAKE_LOG_ID,
port_ids=port_ids,
event=event)
]
self._config_default_chains_v4v6(ipt_mgr)
with mock.patch.object(self.resource_rpc,
'get_sg_log_info_for_port',
side_effect=[initial_log_info,
add_port_log_info]):
# Start logging with a single port as normal to get initial
# NFLOG rules into iptables
self.log_driver.start_logging(self.context,
router_info=self.router_info)
# Start logging with the new port
self.log_driver.start_logging(self.context,
router_info=self.router_info)
self.assert_logging_results(ipt_mgr, add_port_log_info)
self._refresh_logging_config(ipt_mgr=ipt_mgr)
def test_start_logging_when_remove_port(self):
ipt_mgr = self.router_info.iptables_manager
for event in log_const.LOG_EVENTS:
port_ids = [port['id'] for port in self.router_info.internal_ports]
# Making log_info when there are two ports on router
initial_log_info = [
self._fake_log_info(log_id=FAKE_LOG_ID,
port_ids=port_ids,
event=event)
]
# Make log_info when a port is removed from router
port_ids.pop()
remove_port_log_info = [
self._fake_log_info(log_id=FAKE_LOG_ID,
port_ids=port_ids,
event=event)
]
self._config_default_chains_v4v6(ipt_mgr)
with mock.patch.object(self.resource_rpc,
'get_sg_log_info_for_port',
side_effect=[initial_log_info,
remove_port_log_info]):
# Start logging with a single port as normal to get initial
# NFLOG rules into iptables
self.log_driver.start_logging(self.context,
router_info=self.router_info)
# Start logging with the new port
self.log_driver.start_logging(self.context,
router_info=self.router_info)
self.assert_logging_results(ipt_mgr, remove_port_log_info)
self._refresh_logging_config(ipt_mgr=ipt_mgr)
def test_start_logging_when_attach_port_to_fwg(self):
ipt_mgr = self.router_info.iptables_manager
for event in log_const.LOG_EVENTS:
port_ids = [port['id'] for port in self.router_info.internal_ports]
# Making log_info when there is only one port
attached_port_id = port_ids.pop()
initial_log_info = [
self._fake_log_info(log_id=FAKE_LOG_ID,
port_ids=port_ids,
event=event)
]
# Make log_info with a new port that attached to fwg
port_ids.append(attached_port_id)
attached_log_info = [
self._fake_log_info(log_id=FAKE_LOG_ID,
port_ids=port_ids,
event=event)
]
log_resources = [
self._fake_log_resource(FAKE_PROJECT_ID,
resource_id=attached_port_id,
event=event)
]
self._config_default_chains_v4v6(ipt_mgr)
with mock.patch.object(self.resource_rpc,
'get_sg_log_info_for_port',
return_value=initial_log_info):
with mock.patch.object(self.resource_rpc,
'get_sg_log_info_for_log_resources',
return_value=attached_log_info):
# Start logging with a single port as normal to get initial
# NFLOG rules into iptables
self.log_driver.start_logging(self.context,
router_info=self.router_info)
# Start logging with the new port attach to fwg
self.log_driver.start_logging(self.context,
log_resources=log_resources)
self.assert_logging_results(ipt_mgr, attached_log_info)
self._refresh_logging_config(ipt_mgr=ipt_mgr)
def test_start_logging_when_detach_port_from_fwg(self):
ipt_mgr = self.router_info.iptables_manager
for event in log_const.LOG_EVENTS:
port_ids = [port['id'] for port in self.router_info.internal_ports]
# Making log_info when there are two ports attached to fwg
initial_log_info = [
self._fake_log_info(log_id=FAKE_LOG_ID,
port_ids=port_ids,
event=event)
]
# Make log_info when a port is detached from fwg
detached_port_id = port_ids.pop()
detached_log_info = [
self._fake_log_info(log_id=FAKE_LOG_ID,
port_ids=port_ids,
event=event)
]
log_resources = [
self._fake_log_resource(FAKE_PROJECT_ID,
resource_id=detached_port_id,
event=event)
]
self._config_default_chains_v4v6(ipt_mgr)
with mock.patch.object(self.resource_rpc,
'get_sg_log_info_for_port',
return_value=initial_log_info):
with mock.patch.object(self.resource_rpc,
'get_sg_log_info_for_log_resources',
return_value=detached_log_info):
# Start logging with a single port as normal to get initial
# NFLOG rules into iptables
self.log_driver.start_logging(self.context,
router_info=self.router_info)
# Start logging with the new port attach to fwg
self.log_driver.start_logging(self.context,
log_resources=log_resources)
self.assert_logging_results(ipt_mgr, detached_log_info)
self._refresh_logging_config(ipt_mgr=ipt_mgr)
def test_start_logging_when_enable_router(self):
ipt_mgr = self.router_info.iptables_manager
port_ids = [port['id'] for port in self.router_info.internal_ports]
for event in log_const.LOG_EVENTS:
# Log info to initialize NFLOG rules
f_log_info = [
self._fake_log_info(log_id=FAKE_LOG_ID,
port_ids=port_ids,
event=ALL)
]
# Initialize NFLOG rules with start_logging
self.run_start_logging(ipt_mgr,
log_info=f_log_info,
router_info=self.router_info)
# Fake disable router by running stop_logging with router_info
self.log_driver.stop_logging(
self.context, router_info=self.router_info.router_id)
# Fake enable router by running start_logging with router_info
self.log_driver.start_logging(self.context,
router_info=self.router_info)
self.assert_logging_results(ipt_mgr, f_log_info)
self._refresh_logging_config(ipt_mgr=ipt_mgr)
def test_stop_logging_when_delete_log(self):
ipt_mgr = self.router_info.iptables_manager
for event in log_const.LOG_EVENTS:
port_ids = [port['id'] for port in self.router_info.internal_ports]
# Initialize log_info to start logging
log_info_1 = self._fake_log_info(log_id='fake_log_id_1',
port_ids=port_ids,
event=event)
log_info_2 = self._fake_log_info(log_id='fake_log_id_2',
port_ids=[port_ids[0]],
event=event)
initial_log_info = [
log_info_1,
log_info_2
]
self._config_default_chains_v4v6(ipt_mgr)
with mock.patch.object(self.resource_rpc,
'get_sg_log_info_for_port',
return_value=initial_log_info):
# Start logging to get initial NFLOG rules
self.log_driver.start_logging(self.context,
router_info=self.router_info)
# Stop logging by deleting fake_log_id_1
deleted_log_1 = [{'id': 'fake_log_id_1'}]
self.log_driver.stop_logging(self.context,
log_resources=deleted_log_1)
self.assert_logging_results(ipt_mgr, [log_info_2])
# Stop logging by deleting fake_log_id_2
deleted_log_2 = [{'id': 'fake_log_id_2'}]
self.log_driver.stop_logging(self.context,
log_resources=deleted_log_2)
self.assert_logging_results(ipt_mgr, [])
self._refresh_logging_config(ipt_mgr=ipt_mgr)
def test_stop_logging_when_delete_log_with_event_combination(self):
ipt_mgr = self.router_info.iptables_manager
port_ids = [port['id'] for port in self.router_info.internal_ports]
# Initial log_info to start logging
log_info_1 = self._fake_log_info(log_id='accept_log_id',
port_ids=port_ids,
event=ACCEPT)
log_info_2 = self._fake_log_info(log_id='all_log_id',
port_ids=[port_ids[0]],
event=ALL)
initial_log_info = [
log_info_1,
log_info_2
]
self._config_default_chains_v4v6(ipt_mgr)
with mock.patch.object(self.resource_rpc,
'get_sg_log_info_for_port',
return_value=initial_log_info):
# Start logging to get initial NFLOG rules
self.log_driver.start_logging(self.context,
router_info=self.router_info)
# Stop logging by deleting accept_log_id
accepted_log = [{'id': 'accept_log_id'}]
self.log_driver.stop_logging(self.context,
log_resources=accepted_log)
self.assert_logging_results(ipt_mgr, [log_info_2])
# Stop logging by deleting all_log_id
all_log = [{'id': 'all_log_id'}]
self.log_driver.stop_logging(self.context,
log_resources=all_log)
self.assert_logging_results(ipt_mgr, [])