Merge "NSX|V3: FWaaS-v1 support"

This commit is contained in:
Jenkins 2017-06-17 06:20:10 +00:00 committed by Gerrit Code Review
commit e82d0fe5c0
11 changed files with 674 additions and 58 deletions

View File

@ -171,3 +171,20 @@ Enable trunk service and configure following flags in ``local.conf``::
# Trunk plugin NSXv3 driver config
ENABLED_SERVICES+=,q-trunk
Q_SERVICE_PLUGIN_CLASSES=trunk
FWAAS (V1) Driver:
~~~~~~~~~~~~~
Add neutron-fwaas repo as an external repository and configure following flags in ``local.conf``::
[[local|localrc]]
enable_plugin neutron-fwaas https://git.openstack.org/openstack/neutron-fwaas
ENABLED_SERVICES+=,q-fwaas
[[post-config|$NEUTRON_CONF]]
[DEFAULT]
service_plugins = neutron_fwaas.services.firewall.fwaas_plugin.FirewallPlugin
[fwaas]
enabled = True
driver = vmware_nsxv3_edge

View File

@ -38,6 +38,7 @@ neutron.core_plugins =
vmware_dvs = vmware_nsx.plugin:NsxDvsPlugin
firewall_drivers =
vmware_nsxv_edge = vmware_nsx.services.fwaas.nsx_v.edge_fwaas_driver:EdgeFwaasDriver
vmware_nsxv3_edge = vmware_nsx.services.fwaas.nsx_v3.edge_fwaas_driver:EdgeFwaasV3Driver
neutron.service_plugins =
vmware_nsxv_qos = vmware_nsx.services.qos.nsx_v.plugin:NsxVQosPlugin
neutron.qos.notification_drivers =

View File

@ -97,6 +97,7 @@ from vmware_nsx.extensions import providersecuritygroup as provider_sg
from vmware_nsx.extensions import securitygrouplogging as sg_logging
from vmware_nsx.plugins.nsx_v3 import availability_zones as nsx_az
from vmware_nsx.plugins.nsx_v3 import utils as v3_utils
from vmware_nsx.services.fwaas.nsx_v3 import fwaas_callbacks
from vmware_nsx.services.qos.common import utils as qos_com_utils
from vmware_nsx.services.qos.nsx_v3 import driver as qos_driver
from vmware_nsx.services.trunk.nsx_v3 import driver as trunk_driver
@ -227,6 +228,9 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
# init profiles on nsx backend
self._init_nsx_profiles()
# Init the FWaaS support
self._init_fwaas()
# Include exclude NSGroup
LOG.debug("Initializing NSX v3 Excluded Port NSGroup")
self._excluded_port_nsgroup = None
@ -247,6 +251,10 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
# Register NSXv3 trunk driver to support trunk extensions
self.trunk_driver = trunk_driver.NsxV3TrunkDriver.create(self)
def _init_fwaas(self):
# Bind FWaaS callbacks to the driver
self.fwaas_callbacks = fwaas_callbacks.Nsxv3FwaasCallbacks(self.nsxlib)
def init_availability_zones(self):
# availability zones are supported only with native dhcp
# if not - the default az will be loaded and used internally only
@ -2776,7 +2784,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
self._routerlib.add_router_link_port(nsx_router_id, new_tier0_uuid,
tags=tags)
if add_snat_rules:
self._routerlib.add_gw_snat_rule(nsx_router_id, newaddr)
self._routerlib.add_gw_snat_rule(nsx_router_id, newaddr,
bypass_firewall=False)
if bgp_announce:
# TODO(berlin): bgp announce on new tier0 router
pass
@ -3230,7 +3239,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
router_id)
self._routerlib.add_fip_nat_rules(
nsx_router_id, new_fip['floating_ip_address'],
new_fip['fixed_ip_address'])
new_fip['fixed_ip_address'],
bypass_firewall=False)
except nsx_lib_exc.ManagerError:
with excutils.save_and_reraise_exception():
self.delete_floatingip(context, new_fip['id'])
@ -3304,7 +3314,8 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
router_id)
self._routerlib.add_fip_nat_rules(
nsx_router_id, new_fip['floating_ip_address'],
new_fip['fixed_ip_address'])
new_fip['fixed_ip_address'],
bypass_firewall=False)
except nsx_lib_exc.ManagerError:
with excutils.save_and_reraise_exception():
super(NsxV3Plugin, self).update_floatingip(

View File

@ -0,0 +1,82 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from neutron.agent.l3 import router_info
from neutron.common import config as neutron_config # noqa
from neutron_fwaas.services.firewall.agents.l3reference \
import firewall_l3_agent
from neutron_lib import context as n_context
from neutron_lib.plugins import directory
LOG = logging.getLogger(__name__)
class NsxFwaasCallbacks(firewall_l3_agent.L3WithFWaaS):
"""Common NSX RPC callbacks for Firewall As A Service - V1."""
def __init__(self):
# The super code needs a configuration object with the neutron host
# and an agent_mode, which our driver doesn't use.
neutron_conf = cfg.CONF
neutron_conf.agent_mode = 'nsx'
super(NsxFwaasCallbacks, self).__init__(conf=neutron_conf)
@property
def core_plugin(self):
return directory.get_plugin()
# Override functions using the agent_api that is not used by our plugin
def _get_router_ids_for_fw(self, context, fw, to_delete=False):
"""Return the router_ids either from fw dict or tenant routers."""
if self._has_router_insertion_fields(fw):
# it is a new version of plugin
return (fw['del-router-ids'] if to_delete
else fw['add-router-ids'])
else:
return [router['id'] for router in
self._get_routers_in_project(context, fw['tenant_id'])]
def _get_routers_in_project(self, context, project_id):
return self.core_plugin.get_routers(
context,
filters={'project_id': [project_id]})
def _router_dict_to_obj(self, r):
# The callbacks expect a router-info object
return router_info.RouterInfo(
None, r['id'], router=r,
agent_conf=None,
interface_driver=None,
use_ipv6=False)
def _get_router_info_list_for_tenant(self, router_ids, tenant_id):
"""Returns the list of router info objects on which to apply the fw."""
context = n_context.get_admin_context()
tenant_routers = self._get_routers_in_project(context, tenant_id)
return [self._router_dict_to_obj(ri) for ri in tenant_routers
if ri['id'] in router_ids]
def should_apply_firewall_to_router(self, context, router_id):
"""Return True if the FWaaS rules should be added to this router."""
if not self.fwaas_enabled:
return False
if not self._get_router_firewall_id(context.elevated(), router_id):
# No FWaas Firewall was assigned to this router
return False
return True

View File

@ -13,78 +13,29 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from neutron.agent.l3 import router_info
from neutron.common import config as neutron_config # noqa
from neutron_fwaas.db.firewall import firewall_db # noqa
from neutron_fwaas.db.firewall import firewall_router_insertion_db \
as fw_r_ins_db
from neutron_fwaas.services.firewall.agents.l3reference \
import firewall_l3_agent
from neutron_lib import context as n_context
from neutron_lib.plugins import directory
from vmware_nsx.services.fwaas.common import fwaas_callbacks as com_callbacks
LOG = logging.getLogger(__name__)
class NsxvFwaasCallbacks(firewall_l3_agent.L3WithFWaaS):
class NsxvFwaasCallbacks(com_callbacks.NsxFwaasCallbacks):
"""NSX-V RPC callbacks for Firewall As A Service - V1."""
def __init__(self):
# The super code needs a configuration object with the neutron host
# and an agent_mode, hich our driver doesn't use.
neutron_conf = cfg.CONF
neutron_conf.agent_mode = 'nsx'
super(NsxvFwaasCallbacks, self).__init__(conf=neutron_conf)
@property
def core_plugin(self):
return directory.get_plugin()
# Override functions using the agent_api that is not used by our plugin
def _get_router_ids_for_fw(self, context, fw, to_delete=False):
"""Return the router_ids either from fw dict or tenant routers."""
if self._has_router_insertion_fields(fw):
# it is a new version of plugin
return (fw['del-router-ids'] if to_delete
else fw['add-router-ids'])
else:
return [router['id'] for router in
self._get_routers_in_project(context, fw['tenant_id'])]
def _get_routers_in_project(self, context, project_id):
return self.core_plugin.get_routers(
context,
filters={'tenant_id': [project_id]})
def _router_dict_to_obj(self, r):
# The callbacks expect a router-info object
return router_info.RouterInfo(
None, r['id'], router=r,
agent_conf=None,
interface_driver=None,
use_ipv6=False)
def _get_router_info_list_for_tenant(self, router_ids, tenant_id):
"""Returns the list of router info objects on which to apply the fw."""
context = n_context.get_admin_context()
tenant_routers = self._get_routers_in_project(context, tenant_id)
return [self._router_dict_to_obj(ri) for ri in tenant_routers
if ri['id'] in router_ids]
def should_apply_firewall_to_router(self, context, router, router_id):
"""Return True if the FWaaS rules should be added to this router."""
if not self.fwaas_enabled:
return False
ctx_elevated = context.elevated()
if not self._get_router_firewall_id(ctx_elevated, router_id):
# No FWaas Firewall was assigned to this router
if not super(NsxvFwaasCallbacks, self).should_apply_firewall_to_router(
context, router_id):
return False
# get all the relevant router info
# ("router" does not have all the fields)
ctx_elevated = context.elevated()
router_data = self.core_plugin.get_router(ctx_elevated, router['id'])
if not router_data:
LOG.error("Couldn't read router %s data", router['id'])

View File

@ -0,0 +1,284 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from neutron_fwaas.common import exceptions
from neutron_fwaas.services.firewall.drivers import fwaas_base
from neutron_lib.api.definitions import constants as fwaas_consts
from neutron_lib import context as n_context
from neutron_lib.plugins import directory
from oslo_log import helpers as log_helpers
from oslo_log import log as logging
from vmware_nsx.db import db as nsx_db
from vmware_nsxlib.v3 import nsx_constants as consts
LOG = logging.getLogger(__name__)
FWAAS_DRIVER_NAME = 'Fwaas NSX-V3 driver'
RULE_NAME_PREFIX = 'Fwaas-'
DEFAULT_RULE_NAME = 'Default LR Layer3 Rule'
class EdgeFwaasV3Driver(fwaas_base.FwaasDriverBase):
"""NSX-V3 driver for Firewall As A Service - V1."""
def __init__(self):
LOG.debug("Loading FWaaS NsxV3Driver.")
super(EdgeFwaasV3Driver, self).__init__()
@property
def nsxlib(self):
return directory.get_plugin().nsxlib
@property
def nsx_firewall(self):
return self.nsxlib.firewall_section
@property
def nsx_router(self):
return self.nsxlib.logical_router
def should_apply_firewall_to_router(self, router_data):
"""Return True if the firewall rules should be added the router
Right now the driver supports for all routers.
"""
return True
@staticmethod
def _translate_action(fwaas_action, fwaas_rule_id):
"""Translate FWaaS action to NSX action"""
if fwaas_action == fwaas_consts.FWAAS_ALLOW:
return consts.FW_ACTION_ALLOW
if fwaas_action == fwaas_consts.FWAAS_DENY:
return consts.FW_ACTION_DROP
if fwaas_action == fwaas_consts.FWAAS_REJECT:
# reject is not supported by the nsx router firewall
LOG.warning("Reject action is not supported by the NSX backend "
"for router firewall. Using %(action)s instead for "
"rule %(id)s",
{'action': consts.FW_ACTION_DROP,
'id': fwaas_rule_id})
return consts.FW_ACTION_DROP
# Unexpected action
LOG.error("Unsupported FWAAS action %(action)s for rule %(id)s", {
'action': fwaas_action, 'id': fwaas_rule_id})
raise exceptions.FirewallInternalDriverError(
driver=FWAAS_DRIVER_NAME)
def _translate_cidr(self, cidr):
return self.nsx_firewall.get_ip_cidr_reference(
cidr,
consts.IPV6 if netaddr.valid_ipv6(cidr) else consts.IPV4)
def _translate_addresses(self, cidrs):
return [self._translate_cidr(ip) for ip in cidrs]
@staticmethod
def _translate_protocol(fwaas_protocol):
"""Translate FWaaS L4 protocol to NSX protocol"""
if fwaas_protocol.lower() == 'tcp':
return consts.TCP
if fwaas_protocol.lower() == 'udp':
return consts.UDP
if fwaas_protocol.lower() == 'icmp':
# This will cover icmpv6 too, when adding the rule.
return consts.ICMPV4
def _translate_services(self, fwaas_rule):
l4_protocol = self._translate_protocol(fwaas_rule['protocol'])
if l4_protocol in [consts.TCP, consts.UDP]:
source_ports = []
destination_ports = []
if fwaas_rule.get('source_port'):
source_ports = [fwaas_rule['source_port']]
if fwaas_rule.get('destination_port'):
destination_ports = [fwaas_rule['destination_port']]
return [self.nsx_firewall.get_nsservice(
consts.L4_PORT_SET_NSSERVICE,
l4_protocol=l4_protocol,
source_ports=source_ports,
destination_ports=destination_ports)]
elif l4_protocol == consts.ICMPV4:
# Add both icmp v4 & v6 services
return [
self.nsx_firewall.get_nsservice(
consts.ICMP_TYPE_NSSERVICE,
protocol=consts.ICMPV4),
self.nsx_firewall.get_nsservice(
consts.ICMP_TYPE_NSSERVICE,
protocol=consts.ICMPV6),
]
def _translate_rules(self, fwaas_rules):
translated_rules = []
for rule in fwaas_rules:
nsx_rule = {}
if not rule['enabled']:
# skip disabled rules
continue
# Make sure the rule has a name, and it starts with the prefix
# (backend max name length is 255)
if rule.get('name'):
name = RULE_NAME_PREFIX + rule['name']
else:
name = RULE_NAME_PREFIX + rule['id']
nsx_rule['display_name'] = name[:255]
if rule.get('description'):
nsx_rule['notes'] = rule['description']
nsx_rule['action'] = self._translate_action(
rule['action'], rule['id'])
if rule.get('destination_ip_address'):
nsx_rule['destinations'] = self._translate_addresses(
[rule['destination_ip_address']])
if rule.get('source_ip_address'):
nsx_rule['sources'] = self._translate_addresses(
[rule['source_ip_address']])
if rule.get('protocol', 'any') != 'any':
nsx_rule['services'] = self._translate_services(rule)
translated_rules.append(nsx_rule)
return translated_rules
def _create_or_update_firewall(self, agent_mode, apply_list, firewall):
# admin state down means default block rule firewall
if not firewall['admin_state_up']:
self.apply_default_policy(agent_mode, apply_list, firewall)
return
context = n_context.get_admin_context()
rules = self._translate_rules(firewall['firewall_rule_list'])
# update each router using the core plugin code
self._update_backend_routers(context, apply_list, rules=rules)
@log_helpers.log_method_call
def create_firewall(self, agent_mode, apply_list, firewall):
"""Create the Firewall with a given policy. """
self._create_or_update_firewall(agent_mode, apply_list, firewall)
@log_helpers.log_method_call
def update_firewall(self, agent_mode, apply_list, firewall):
"""Remove previous policy and apply the new policy."""
self._create_or_update_firewall(agent_mode, apply_list, firewall)
@log_helpers.log_method_call
def delete_firewall(self, agent_mode, apply_list, firewall):
"""Delete firewall.
Removes rules created by this instance from the backend firewall
And add the default allow rule.
"""
context = n_context.get_admin_context()
self._update_backend_routers(context, apply_list, delete_fw=True)
@log_helpers.log_method_call
def apply_default_policy(self, agent_mode, apply_list, firewall):
"""Apply the default policy (deny all).
The backend firewall always has this policy (=deny all) as default,
so we only need to delete the current rules.
"""
context = n_context.get_admin_context()
self._update_backend_routers(context, apply_list, rules=[])
def _update_backend_routers(self, context, apply_list, rules=None,
delete_fw=False):
# update each router using the core plugin code
for router_info in apply_list:
# Skip unsupported routers
if not self.should_apply_firewall_to_router(router_info.router):
continue
router_id = router_info.router_id
# update the routers firewall
if delete_fw:
self._delete_nsx_router_firewall(context, router_id)
else:
self._update_nsx_router_firewall(context, router_id, rules)
def _get_backend_router_and_fw_section(self, context, router_id):
# find the backend router id in the DB
nsx_router_id = nsx_db.get_nsx_router_id(context.session, router_id)
if nsx_router_id is None:
LOG.error("Didn't find nsx router for router %s", router_id)
raise exceptions.FirewallInternalDriverError(
driver=FWAAS_DRIVER_NAME)
# get the FW section id of the backend router
try:
section_id = self.nsx_router.get_firewall_section_id(
nsx_router_id)
except Exception as e:
LOG.error("Failed to find router firewall section for router "
"%(id)s: %(e)s", {'id': router_id, 'e': e})
raise exceptions.FirewallInternalDriverError(
driver=FWAAS_DRIVER_NAME)
if section_id is None:
LOG.error("Failed to find router firewall section for router "
"%(id)s.", {'id': router_id})
raise exceptions.FirewallInternalDriverError(
driver=FWAAS_DRIVER_NAME)
return nsx_router_id, section_id
def _delete_nsx_router_firewall(self, context, router_id):
"""Reset the router firewall back to it's default"""
# find the backend router and its firewall section
nsx_router_id, section_id = self._get_backend_router_and_fw_section(
context, router_id)
# Add default allow all rule
old_default_rule = self.nsx_firewall.get_default_rule(
section_id)
allow_all = {
'display_name': DEFAULT_RULE_NAME,
'action': consts.FW_ACTION_ALLOW,
'is_default': True,
'id': old_default_rule['id'] if old_default_rule else 0}
# Update the backend firewall section with the rules
self.nsx_firewall.update(section_id, rules=[allow_all])
def _update_nsx_router_firewall(self, context, router_id, rules):
"""Update the backend router firewall section
Adding all relevant north-south rules from the FWaaS firewall
and the default drop all rule
Since those rules do no depend on the router gateway/interfaces/ips
there is no need to call this method on each router update.
Just when the firewall changes.
"""
# find the backend router and its firewall section
nsx_router_id, section_id = self._get_backend_router_and_fw_section(
context, router_id)
# Add default drop all rule at the end
old_default_rule = self.nsx_firewall.get_default_rule(
section_id)
drop_all = {
'display_name': DEFAULT_RULE_NAME,
'action': consts.FW_ACTION_DROP,
'is_default': True,
'id': old_default_rule['id'] if old_default_rule else 0}
# Update the backend firewall section with the rules
self.nsx_firewall.update(section_id, rules=rules + [drop_all])

View File

@ -0,0 +1,60 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from vmware_nsx.common import exceptions as nsx_exc
from vmware_nsx.services.fwaas.common import fwaas_callbacks as com_callbacks
from vmware_nsxlib.v3 import nsx_constants as consts
LOG = logging.getLogger(__name__)
class Nsxv3FwaasCallbacks(com_callbacks.NsxFwaasCallbacks):
"""NSX-V3 RPC callbacks for Firewall As A Service - V1."""
def __init__(self, nsxlib):
super(Nsxv3FwaasCallbacks, self).__init__()
# Verify that the nsx backend supports FWaaS
if self.fwaas_enabled:
self.verify_backend_version(nsxlib)
def verify_backend_version(self, nsxlib):
if not nsxlib.feature_supported(consts.FEATURE_ROUTER_FIREWALL):
# router firewall is not supported
msg = (_("FWaaS is not supported by the NSX backend (version %s): "
"Router firewall is not supported") %
self.nsxlib.get_version())
raise nsx_exc.NsxPluginException(err_msg=msg)
def should_apply_firewall_to_router(self, context, router_id):
"""Return True if the FWaaS rules should be added to this router."""
if not super(Nsxv3FwaasCallbacks,
self).should_apply_firewall_to_router(context,
router_id):
return False
# get all the relevant router info
ctx_elevated = context.elevated()
router_data = self.core_plugin.get_router(ctx_elevated, router_id)
if not router_data:
LOG.error("Couldn't read router %s data", router_id)
return False
# Check if the FWaaS driver supports this router
if not self.fwaas_driver.should_apply_firewall_to_router(router_data):
return False
return True

View File

@ -0,0 +1,200 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import mock
from vmware_nsx.services.fwaas.nsx_v3 import edge_fwaas_driver
from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_v3_plugin
from vmware_nsxlib.v3 import nsx_constants as consts
FAKE_FW_ID = 'fake_fw_uuid'
FAKE_ROUTER_ID = 'fake_rtr_uuid'
MOCK_NSX_ID = 'nsx_router_id'
MOCK_DEFAULT_RULE_ID = 'nsx_default_rule_id'
MOCK_SECTION_ID = 'sec_id'
DEFAULT_RULE = {'is_default': True,
'display_name': edge_fwaas_driver.DEFAULT_RULE_NAME,
'id': MOCK_DEFAULT_RULE_ID,
'action': consts.FW_ACTION_DROP}
class Nsxv3FwaasTestCase(test_v3_plugin.NsxV3PluginTestCaseMixin):
def setUp(self):
super(Nsxv3FwaasTestCase, self).setUp()
self.firewall = edge_fwaas_driver.EdgeFwaasV3Driver()
# Start some nsxlib/DB mocks
mock.patch(
"vmware_nsxlib.v3.core_resources.NsxLibLogicalRouter."
"get_firewall_section_id",
return_value=MOCK_SECTION_ID).start()
mock.patch(
"vmware_nsxlib.v3.security.NsxLibFirewallSection."
"get_default_rule",
return_value={'id': MOCK_DEFAULT_RULE_ID}).start()
mock.patch(
"vmware_nsx.db.db.get_nsx_router_id",
return_value=MOCK_NSX_ID).start()
def _default_rule(self, drop=True):
rule = DEFAULT_RULE
if drop:
rule['action'] = consts.FW_ACTION_DROP
else:
rule['action'] = consts.FW_ACTION_ALLOW
return rule
def _fake_rules_v4(self):
rule1 = {'enabled': True,
'action': 'allow',
'ip_version': 4,
'protocol': 'tcp',
'destination_port': '80',
'source_ip_address': '10.24.4.2',
'id': 'fake-fw-rule1',
'description': 'first rule'}
rule2 = {'enabled': True,
'action': 'reject',
'ip_version': 4,
'protocol': 'tcp',
'destination_port': '22',
'id': 'fake-fw-rule2'}
rule3 = {'enabled': True,
'action': 'deny',
'ip_version': 4,
'protocol': 'icmp',
'id': 'fake-fw-rule3'}
return [rule1, rule2, rule3]
def _fake_translated_rules(self):
# The expected translation of the rules in _fake_rules_v4
service1 = {'l4_protocol': 'TCP',
'resource_type': 'L4PortSetNSService',
'destination_ports': ['80'],
'source_ports': []}
rule1 = {'action': 'ALLOW',
'services': [{'service': service1}],
'sources': [{'target_id': '10.24.4.2',
'target_type': 'IPv4Address'}],
'display_name': 'Fwaas-fake-fw-rule1',
'notes': 'first rule'}
service2 = {'l4_protocol': 'TCP',
'resource_type': 'L4PortSetNSService',
'destination_ports': ['22'],
'source_ports': []}
rule2 = {'action': 'DROP', # Reject is replaced with deny
'services': [{'service': service2}],
'display_name': 'Fwaas-fake-fw-rule2'}
service3_1 = {'resource_type': 'ICMPTypeNSService',
'protocol': 'ICMPv4'}
service3_2 = {'resource_type': 'ICMPTypeNSService',
'protocol': 'ICMPv6'}
rule3 = {'action': 'DROP',
# icmp is translated to icmp v4 & v6
'services': [{'service': service3_1},
{'service': service3_2}],
'display_name': 'Fwaas-fake-fw-rule3'}
return [rule1, rule2, rule3]
def _fake_firewall_no_rule(self):
rule_list = []
fw_inst = {'id': FAKE_FW_ID,
'admin_state_up': True,
'tenant_id': 'tenant-uuid',
'firewall_rule_list': rule_list}
return fw_inst
def _fake_firewall(self, rule_list):
_rule_list = copy.deepcopy(rule_list)
for rule in _rule_list:
rule['position'] = str(_rule_list.index(rule))
fw_inst = {'id': FAKE_FW_ID,
'admin_state_up': True,
'tenant_id': 'tenant-uuid',
'firewall_rule_list': _rule_list}
return fw_inst
def _fake_firewall_with_admin_down(self, rule_list):
fw_inst = {'id': FAKE_FW_ID,
'admin_state_up': False,
'tenant_id': 'tenant-uuid',
'firewall_rule_list': rule_list}
return fw_inst
def _fake_apply_list(self, router_count=1):
apply_list = []
while router_count > 0:
router_inst = {'id': FAKE_ROUTER_ID}
router_info_inst = mock.Mock()
router_info_inst.router = router_inst
apply_list.append(router_info_inst)
router_count -= 1
return apply_list
def _setup_firewall_with_rules(self, func, router_count=1):
apply_list = self._fake_apply_list(router_count=router_count)
rule_list = self._fake_rules_v4()
firewall = self._fake_firewall(rule_list)
with mock.patch("vmware_nsxlib.v3.security.NsxLibFirewallSection."
"update") as update_fw:
func('nsx', apply_list, firewall)
self.assertEqual(router_count, update_fw.call_count)
update_fw.assert_called_with(
MOCK_SECTION_ID,
rules=self._fake_translated_rules() + [self._default_rule()])
def test_create_firewall_no_rules(self):
apply_list = self._fake_apply_list()
firewall = self._fake_firewall_no_rule()
with mock.patch("vmware_nsxlib.v3.security.NsxLibFirewallSection."
"update") as update_fw:
self.firewall.create_firewall('nsx', apply_list, firewall)
update_fw.assert_called_once_with(
MOCK_SECTION_ID,
rules=[self._default_rule()])
def test_create_firewall_with_rules(self):
self._setup_firewall_with_rules(self.firewall.create_firewall)
def test_create_firewall_with_rules_two_routers(self):
self._setup_firewall_with_rules(self.firewall.create_firewall,
router_count=2)
def test_update_firewall_with_rules(self):
self._setup_firewall_with_rules(self.firewall.update_firewall)
def test_delete_firewall(self):
apply_list = self._fake_apply_list()
firewall = self._fake_firewall_no_rule()
with mock.patch("vmware_nsxlib.v3.security.NsxLibFirewallSection."
"update") as update_fw:
self.firewall.delete_firewall('nsx', apply_list, firewall)
update_fw.assert_called_once_with(
MOCK_SECTION_ID,
rules=[self._default_rule(drop=False)])
def test_create_firewall_with_admin_down(self):
apply_list = self._fake_apply_list()
rule_list = self._fake_rules_v4()
firewall = self._fake_firewall_with_admin_down(rule_list)
with mock.patch("vmware_nsxlib.v3.security.NsxLibFirewallSection."
"update") as update_fw:
self.firewall.create_firewall('nsx', apply_list, firewall)
update_fw.assert_called_once_with(
MOCK_SECTION_ID,
rules=[self._default_rule()])

View File

@ -148,6 +148,11 @@ def _mock_nsx_backend_calls():
"vmware_nsxlib.v3.resources.LogicalDhcpServer.create_binding",
side_effect=_return_id_key).start()
mock.patch(
"vmware_nsxlib.v3.core_resources.NsxLibLogicalRouter."
"get_firewall_section_id",
side_effect=_return_id_key).start()
mock.patch(
"vmware_nsxlib.v3.NsxLib.get_version",
return_value='1.1.0').start()
@ -562,6 +567,11 @@ class L3NatTest(test_l3_plugin.L3BaseForIntTests, NsxV3PluginTestCaseMixin):
cfg.CONF.set_default('max_routes', 3)
self.addCleanup(restore_l3_attribute_map, self._l3_attribute_map_bk)
ext_mgr = ext_mgr or TestL3ExtensionManager()
mock_nsx_version = mock.patch.object(nsx_plugin.utils,
'is_nsx_version_2_0_0',
new=lambda v: True)
mock_nsx_version.start()
super(L3NatTest, self).setUp(
plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins)
self.plugin_instance = directory.get_plugin()