NSX|v: refactor FWaaS driver
The NSX-V FWaaS driver updated the backend router firewall using a different code than the plugin uses when the router is updated. This causes code duplication, and ,ultiple bugs. Now the driver will call the plugin in order to recreate all firewall rules. Change-Id: I3651cfc0ceafc81b28747476f98a82e30e41a2af
This commit is contained in:
parent
5e5b75c5ea
commit
41fb262771
|
@ -3593,18 +3593,47 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
|||
if router_id:
|
||||
self._update_edge_router(context, router_id)
|
||||
|
||||
def _update_subnets_and_dnat_firewall(self, context, router,
|
||||
def _update_subnets_and_dnat_firewall(self, context, router_db,
|
||||
router_id=None):
|
||||
# Note(asarfaty): If changing the order or names of rules here,
|
||||
# please take care of fwaas _get_other_backend_rules too.
|
||||
fw_rules = []
|
||||
"""Update the router edge firewall with all the relevant rules.
|
||||
|
||||
router_db is the neutron router structure
|
||||
router_id is the id of the actual router that will be updated on
|
||||
the NSX (in case of distributed router it can be plr or tlr)
|
||||
"""
|
||||
if not router_id:
|
||||
router_id = router['id']
|
||||
router_id = router_db['id']
|
||||
|
||||
# Add fw rules if FWaaS is enabled
|
||||
# in case of a distributed-router:
|
||||
# router['id'] is the id of the neutron router (=tlr)
|
||||
# and router_id is the plr/tlr (the one that is being updated)
|
||||
fwaas_rules = None
|
||||
if (self.fwaas_callbacks.should_apply_firewall_to_router(
|
||||
context, router_db, router_id)):
|
||||
fwaas_rules = self.fwaas_callbacks.get_fwaas_rules_for_router(
|
||||
context, router_db['id'])
|
||||
|
||||
self.update_router_firewall(context, router_id, router_db,
|
||||
fwaas_rules=fwaas_rules)
|
||||
|
||||
def update_router_firewall(self, context, router_id, router_db,
|
||||
fwaas_rules=None):
|
||||
"""Recreate all rules in the router edge firewall
|
||||
|
||||
router_db is the neutron router structure
|
||||
router_id is the id of the actual router that will be updated on
|
||||
the NSX (in case of distributed router it can be plr or tlr)
|
||||
if fwaas_rules is not none - this router is attached to a firewall
|
||||
"""
|
||||
fw_rules = []
|
||||
router_with_firewall = True if fwaas_rules is not None else False
|
||||
neutron_id = router_db['id']
|
||||
|
||||
# Add FW rule to open subnets firewall flows and static routes
|
||||
# relative flows
|
||||
subnet_cidrs = self._find_router_subnets_cidrs(context, router['id'])
|
||||
routes = self._get_extra_routes_by_router_id(context, router_id)
|
||||
subnet_cidrs = self._find_router_subnets_cidrs(context, neutron_id)
|
||||
routes = self._get_extra_routes_by_router_id(context, neutron_id)
|
||||
subnet_cidrs.extend([route['destination'] for route in routes])
|
||||
if subnet_cidrs:
|
||||
subnet_fw_rule = {
|
||||
|
@ -3619,17 +3648,13 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
|||
if self.metadata_proxy_handler:
|
||||
fw_rules += nsx_v_md_proxy.get_router_fw_rules()
|
||||
|
||||
# Add fw rules if FWaaS is enabled
|
||||
router_with_firewall = False
|
||||
if (self.fwaas_callbacks.should_apply_firewall_to_router(
|
||||
context, router, router_id)):
|
||||
fw_rules.extend(self.fwaas_callbacks.get_fwaas_rules_for_router(
|
||||
context, router['id']))
|
||||
router_with_firewall = True
|
||||
# Add FWaaS rules
|
||||
if router_with_firewall and fwaas_rules:
|
||||
fw_rules += fwaas_rules
|
||||
|
||||
if not router_with_firewall:
|
||||
# Add FW rule to open dnat firewall flows
|
||||
_, dnat_rules = self._get_nat_rules(context, router)
|
||||
_, dnat_rules = self._get_nat_rules(context, router_db)
|
||||
dnat_cidrs = [rule['dst'] for rule in dnat_rules]
|
||||
if dnat_cidrs:
|
||||
dnat_fw_rule = {
|
||||
|
@ -3641,7 +3666,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
|
|||
|
||||
# Add no-snat rules
|
||||
nosnat_fw_rules = self._get_nosnat_subnets_fw_rules(
|
||||
context, router)
|
||||
context, router_db)
|
||||
fw_rules.extend(nosnat_fw_rules)
|
||||
|
||||
# Get the load balancer rules in case they are refreshed
|
||||
|
|
|
@ -22,11 +22,7 @@ from neutron_fwaas.common import exceptions
|
|||
from neutron_fwaas.services.firewall.drivers import fwaas_base
|
||||
|
||||
from vmware_nsx.common import locking
|
||||
from vmware_nsx.plugins.nsx_v.vshield.common import (
|
||||
exceptions as vcns_exc)
|
||||
from vmware_nsx.plugins.nsx_v.vshield import edge_firewall_driver
|
||||
from vmware_nsx.plugins.nsx_v.vshield import edge_utils
|
||||
from vmware_nsx.plugins.nsx_v.vshield import vcns_driver
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
FWAAS_DRIVER_NAME = 'Fwaas NSX-V driver'
|
||||
|
@ -36,14 +32,17 @@ RULE_NAME_PREFIX = 'Fwaas-'
|
|||
class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
||||
"""NSX-V driver for Firewall As A Service - V1."""
|
||||
|
||||
@property
|
||||
def core_plugin(self):
|
||||
return directory.get_plugin()
|
||||
|
||||
@property
|
||||
def edge_manager(self):
|
||||
return directory.get_plugin().edge_manager
|
||||
return self.core_plugin.edge_manager
|
||||
|
||||
def __init__(self):
|
||||
LOG.debug("Loading FWaaS NsxVDriver.")
|
||||
super(EdgeFwaasDriver, self).__init__()
|
||||
self._nsxv = vcns_driver.VcnsDriver(None)
|
||||
|
||||
def should_apply_firewall_to_router(self, router_data):
|
||||
"""Return True if the firewall rules should be added the router
|
||||
|
@ -81,7 +80,7 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
|||
# Get edges for all the routers in the apply list.
|
||||
# note that shared routers are currently not supported
|
||||
edge_manager = self.edge_manager
|
||||
edges = []
|
||||
edges_map = {}
|
||||
for router_info in apply_list:
|
||||
|
||||
# No FWaaS rules needed if there is no external gateway
|
||||
|
@ -102,8 +101,9 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
|||
# look for the edge id in the DB
|
||||
edge_id = edge_utils.get_router_edge_id(context, lookup_id)
|
||||
if edge_id:
|
||||
edges.append(edge_id)
|
||||
return edges
|
||||
edges_map[router_id] = {'edge_id': edge_id,
|
||||
'lookup_id': lookup_id}
|
||||
return edges_map
|
||||
|
||||
def _translate_rules(self, fwaas_rules):
|
||||
translated_rules = []
|
||||
|
@ -128,91 +128,29 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
|||
|
||||
return translated_rules
|
||||
|
||||
def _is_allow_external_rule(self, rule):
|
||||
rule_name = rule.get('name', '')
|
||||
if rule_name == edge_firewall_driver.FWAAS_ALLOW_EXT_RULE_NAME:
|
||||
return True
|
||||
# For older routers, the allow-external rule didn't have a name
|
||||
# TODO(asarfaty): delete this in the future
|
||||
if (not rule_name and
|
||||
rule.get('action') == edge_firewall_driver.FWAAS_ALLOW and
|
||||
rule.get('destination_vnic_groups', []) == ['external']):
|
||||
return True
|
||||
return False
|
||||
def _set_rules_on_router_edge(self, context, router_id, neutron_id,
|
||||
edge_id, fw_id, translated_rules,
|
||||
delete_fw=False):
|
||||
"""Recreate router edge firewall rules
|
||||
|
||||
def _get_other_backend_rules(self, context, edge_id):
|
||||
"""Get a list of current backend rules from other applications
|
||||
|
||||
Those rules should stay on the backend firewall, when updating the
|
||||
Using the plugin code to recreate all the rules with the additional
|
||||
FWaaS rules.
|
||||
|
||||
Return it as 2 separate lists of rules, which should go before/after
|
||||
the fwaas rules.
|
||||
router_id is the is of the router about to be updated
|
||||
(in case of distributed router - the plr)
|
||||
neutron_id is the neutron router id
|
||||
"""
|
||||
try:
|
||||
backend_fw = self._nsxv.get_firewall(context, edge_id)
|
||||
backend_rules = backend_fw['firewall_rule_list']
|
||||
except vcns_exc.VcnsApiException:
|
||||
# Need to create a new one
|
||||
backend_rules = []
|
||||
# remove old FWaaS rules from the rules list.
|
||||
# also delete the allow-external rule, if it is there.
|
||||
# If necessary - we will add it again later
|
||||
before_rules = []
|
||||
after_rules = []
|
||||
go_after = False
|
||||
for rule_item in backend_rules:
|
||||
rule = rule_item['firewall_rule']
|
||||
rule_name = rule.get('name', '')
|
||||
fwaas_rule = rule_name.startswith(RULE_NAME_PREFIX)
|
||||
if fwaas_rule:
|
||||
# reached the fwaas part, the rest of the rules should be
|
||||
# in the 'after' list
|
||||
go_after = True
|
||||
if (rule_name == edge_firewall_driver.DNAT_RULE_NAME or
|
||||
rule_name == edge_firewall_driver.NO_SNAT_RULE_NAME):
|
||||
# passed the fwaas part, the rest of the rules should be
|
||||
# in the 'after' list
|
||||
go_after = True
|
||||
if (not fwaas_rule and
|
||||
not self._is_allow_external_rule(rule)):
|
||||
if go_after:
|
||||
after_rules.append(rule)
|
||||
else:
|
||||
before_rules.append(rule)
|
||||
|
||||
return before_rules, after_rules
|
||||
|
||||
def _set_rules_on_edge(self, context, edge_id, fw_id, translated_rules,
|
||||
allow_external=False):
|
||||
"""delete old FWaaS rules from the Edge, and add new ones
|
||||
|
||||
Note that the edge might have other FW rules like NAT or LBaas
|
||||
that should remain there.
|
||||
|
||||
allow_external is usually False because it shouldn't exist with a
|
||||
firewall. It should only be True when the firewall is being deleted.
|
||||
"""
|
||||
# Get the existing backend rules which do not belong to FWaaS
|
||||
backend_rules, after_rules = self._get_other_backend_rules(
|
||||
context, edge_id)
|
||||
|
||||
# add new FWaaS rules at the correct location by their original order
|
||||
backend_rules.extend(translated_rules)
|
||||
backend_rules.extend(after_rules)
|
||||
|
||||
# update the backend
|
||||
router_db = self.core_plugin._get_router(context, neutron_id)
|
||||
try:
|
||||
with locking.LockManager.get_lock(str(edge_id)):
|
||||
self._nsxv.update_firewall(
|
||||
edge_id,
|
||||
{'firewall_rule_list': backend_rules},
|
||||
context,
|
||||
allow_external=allow_external)
|
||||
self.core_plugin.update_router_firewall(
|
||||
context, router_id, router_db,
|
||||
fwaas_rules=translated_rules)
|
||||
except Exception as e:
|
||||
# catch known library exceptions and raise Fwaas generic exception
|
||||
LOG.error("Failed to update backend firewall %(fw)s: "
|
||||
"%(e)s", {'e': e, 'fw': fw_id})
|
||||
LOG.error("Failed to update firewall %(fw)s on edge %(edge_id)s: "
|
||||
"%(e)s", {'e': e, 'fw': fw_id, 'edge_id': edge_id})
|
||||
raise exceptions.FirewallInternalDriverError(
|
||||
driver=FWAAS_DRIVER_NAME)
|
||||
|
||||
|
@ -222,20 +160,27 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
|||
self.apply_default_policy(agent_mode, apply_list, firewall)
|
||||
return
|
||||
|
||||
# get router-edge mapping
|
||||
context = n_context.get_admin_context()
|
||||
|
||||
# Find out the relevant edges
|
||||
router_edges = self._get_routers_edges(context, apply_list)
|
||||
if not router_edges:
|
||||
LOG.warning("Cannot apply the firewall to any of the routers %s",
|
||||
apply_list)
|
||||
edges_map = self._get_routers_edges(context, apply_list)
|
||||
if not edges_map:
|
||||
routers = [r.router_id for r in apply_list]
|
||||
LOG.warning("Cannot apply the firewall %(fw)s to any of the "
|
||||
"routers %(rtrs)s",
|
||||
{'fw': firewall['id'], 'rtrs': routers})
|
||||
return
|
||||
|
||||
# Translate the FWaaS rules
|
||||
rules = self._translate_rules(firewall['firewall_rule_list'])
|
||||
# update each edge
|
||||
for edge_id in router_edges:
|
||||
self._set_rules_on_edge(
|
||||
context, edge_id, firewall['id'], rules)
|
||||
|
||||
# update each relevant edge with the new rules
|
||||
for router_info in apply_list:
|
||||
neutron_id = router_info.router_id
|
||||
info = edges_map.get(neutron_id)
|
||||
if info:
|
||||
self._set_rules_on_router_edge(
|
||||
context, info['lookup_id'], neutron_id, info['edge_id'],
|
||||
firewall['id'], rules)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def create_firewall(self, agent_mode, apply_list, firewall):
|
||||
|
@ -248,13 +193,22 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
|||
self._create_or_update_firewall(agent_mode, apply_list, firewall)
|
||||
|
||||
def _delete_firewall_or_set_default_policy(self, apply_list, firewall,
|
||||
allow_external):
|
||||
delete_fw=False):
|
||||
# get router-edge mapping
|
||||
context = n_context.get_admin_context()
|
||||
router_edges = self._get_routers_edges(context, apply_list)
|
||||
if router_edges:
|
||||
for edge_id in router_edges:
|
||||
self._set_rules_on_edge(context, edge_id, firewall['id'], [],
|
||||
allow_external=allow_external)
|
||||
edges_map = self._get_routers_edges(context, apply_list)
|
||||
|
||||
# if the firewall is deleted, rules should be None
|
||||
rules = None if delete_fw else []
|
||||
|
||||
# Go over all routers and update them on backend
|
||||
for router_info in apply_list:
|
||||
neutron_id = router_info.router_id
|
||||
info = edges_map.get(neutron_id)
|
||||
if info:
|
||||
self._set_rules_on_router_edge(
|
||||
context, info['lookup_id'], neutron_id, info['edge_id'],
|
||||
firewall['id'], rules, delete_fw=delete_fw)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def delete_firewall(self, agent_mode, apply_list, firewall):
|
||||
|
@ -264,7 +218,7 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
|||
And add the default allow-external rule.
|
||||
"""
|
||||
self._delete_firewall_or_set_default_policy(apply_list, firewall,
|
||||
allow_external=True)
|
||||
delete_fw=True)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def apply_default_policy(self, agent_mode, apply_list, firewall):
|
||||
|
@ -274,7 +228,7 @@ class EdgeFwaasDriver(fwaas_base.FwaasDriverBase):
|
|||
so we only need to delete the current rules.
|
||||
"""
|
||||
self._delete_firewall_or_set_default_policy(apply_list, firewall,
|
||||
allow_external=False)
|
||||
delete_fw=False)
|
||||
|
||||
def get_firewall_translated_rules(self, firewall):
|
||||
if firewall['admin_state_up']:
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
import copy
|
||||
import mock
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron_fwaas.common import exceptions
|
||||
|
||||
|
@ -28,8 +29,6 @@ class NsxvFwaasTestCase(test_v_plugin.NsxVPluginV2TestCase):
|
|||
def setUp(self):
|
||||
super(NsxvFwaasTestCase, self).setUp()
|
||||
self.firewall = edge_fwaas_driver.EdgeFwaasDriver()
|
||||
self.firewall._get_routers_edges = mock.Mock()
|
||||
self.firewall._get_routers_edges.return_value = ['edge-1']
|
||||
|
||||
def _fake_rules_v4(self):
|
||||
rule1 = {'enabled': True,
|
||||
|
@ -81,40 +80,60 @@ class NsxvFwaasTestCase(test_v_plugin.NsxVPluginV2TestCase):
|
|||
def _fake_apply_list(self, router_count=1):
|
||||
apply_list = []
|
||||
while router_count > 0:
|
||||
router_inst = {}
|
||||
rtr_id = uuidutils.generate_uuid()
|
||||
router_inst = {'id': rtr_id}
|
||||
router_info_inst = mock.Mock()
|
||||
router_info_inst.router = router_inst
|
||||
router_info_inst.router_id = rtr_id
|
||||
apply_list.append(router_info_inst)
|
||||
router_count -= 1
|
||||
return apply_list
|
||||
|
||||
def _get_fake_mapping(self, apply_list):
|
||||
router_edge_map = {}
|
||||
for router_info in apply_list:
|
||||
router_edge_map[router_info.router_id] = {
|
||||
'edge_id': 'edge-1',
|
||||
'lookup_id': router_info.router_id}
|
||||
return router_edge_map
|
||||
|
||||
def _setup_firewall_with_rules(self, func, router_count=1):
|
||||
apply_list = self._fake_apply_list(router_count=router_count)
|
||||
rule_list = self._fake_rules_v4()
|
||||
firewall = self._fake_firewall(rule_list)
|
||||
edges = ['edge-1'] * router_count
|
||||
with mock.patch.object(self.firewall._nsxv,
|
||||
"update_firewall") as update_fw,\
|
||||
edges = self._get_fake_mapping(apply_list)
|
||||
|
||||
with mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"update_router_firewall") as update_fw,\
|
||||
mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"_get_router"),\
|
||||
mock.patch.object(self.firewall,
|
||||
"_get_routers_edges", return_value=edges):
|
||||
func('nsx', apply_list, firewall)
|
||||
self.assertEqual(router_count, update_fw.call_count)
|
||||
backend_rules = update_fw.call_args[0][1]['firewall_rule_list']
|
||||
# Validate the args of the last call
|
||||
self.assertEqual(apply_list[-1].router_id,
|
||||
update_fw.call_args[0][1])
|
||||
backend_rules = update_fw.call_args[1]['fwaas_rules']
|
||||
self.assertEqual(len(rule_list), len(backend_rules))
|
||||
allow_ext = update_fw.call_args[1]['allow_external']
|
||||
self.assertEqual(False, allow_ext)
|
||||
|
||||
def test_create_firewall_no_rules(self):
|
||||
apply_list = self._fake_apply_list()
|
||||
firewall = self._fake_firewall_no_rule()
|
||||
with mock.patch.object(self.firewall._nsxv,
|
||||
"update_firewall") as update_fw:
|
||||
edges = self._get_fake_mapping(apply_list)
|
||||
with mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"update_router_firewall") as update_fw,\
|
||||
mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"_get_router"),\
|
||||
mock.patch.object(self.firewall,
|
||||
"_get_routers_edges", return_value=edges):
|
||||
self.firewall.create_firewall('nsx', apply_list, firewall)
|
||||
self.assertEqual(1, update_fw.call_count)
|
||||
backend_rules = update_fw.call_args[0][1]['firewall_rule_list']
|
||||
self.assertEqual(0, len(backend_rules))
|
||||
allow_ext = update_fw.call_args[1]['allow_external']
|
||||
self.assertEqual(False, allow_ext)
|
||||
# Validate the args of the last call
|
||||
self.assertEqual(apply_list[0].router_id,
|
||||
update_fw.call_args[0][1])
|
||||
backend_rules = update_fw.call_args[1]['fwaas_rules']
|
||||
self.assertEqual([], backend_rules)
|
||||
|
||||
def test_create_firewall_with_rules(self):
|
||||
self._setup_firewall_with_rules(self.firewall.create_firewall)
|
||||
|
@ -126,63 +145,42 @@ class NsxvFwaasTestCase(test_v_plugin.NsxVPluginV2TestCase):
|
|||
def test_update_firewall_with_rules(self):
|
||||
self._setup_firewall_with_rules(self.firewall.update_firewall)
|
||||
|
||||
def test_create_firewall_with_existing_backend_rules(self):
|
||||
apply_list = self._fake_apply_list()
|
||||
rule_list = self._fake_rules_v4()
|
||||
firewall = self._fake_firewall(rule_list)
|
||||
edge_id = 'edge-1'
|
||||
# backend fw already has 3 rules.
|
||||
# The fwaas rule should come in the middle, and replace the fwaas rule
|
||||
existing_rules = [{'name': 'Subnet Rule',
|
||||
'action': 'accept',
|
||||
'enabled': True},
|
||||
{'name': 'Fwaas-1',
|
||||
'action': 'deny',
|
||||
'enabled': True},
|
||||
{'name': 'DNAT Rule',
|
||||
'action': 'accept',
|
||||
'enabled': True}]
|
||||
for rule in existing_rules:
|
||||
self.firewall._nsxv.vcns.add_firewall_rule(edge_id, rule)
|
||||
|
||||
with mock.patch.object(self.firewall._nsxv,
|
||||
"update_firewall") as update_fw,\
|
||||
mock.patch.object(self.firewall,
|
||||
"_get_routers_edges", return_value=[edge_id]):
|
||||
self.firewall.create_firewall('nsx', apply_list, firewall)
|
||||
self.assertEqual(1, update_fw.call_count)
|
||||
backend_rules = update_fw.call_args[0][1]['firewall_rule_list']
|
||||
self.assertEqual(len(rule_list) + len(existing_rules) - 1,
|
||||
len(backend_rules))
|
||||
self.assertEqual('Subnet Rule', backend_rules[0]['name'])
|
||||
self.assertEqual('DNAT Rule', backend_rules[-1]['name'])
|
||||
allow_ext = update_fw.call_args[1]['allow_external']
|
||||
self.assertEqual(False, allow_ext)
|
||||
|
||||
def test_delete_firewall(self):
|
||||
apply_list = self._fake_apply_list()
|
||||
firewall = self._fake_firewall_no_rule()
|
||||
with mock.patch.object(self.firewall._nsxv,
|
||||
"update_firewall") as update_fw:
|
||||
edges = self._get_fake_mapping(apply_list)
|
||||
with mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"update_router_firewall") as update_fw,\
|
||||
mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"_get_router"),\
|
||||
mock.patch.object(self.firewall,
|
||||
"_get_routers_edges", return_value=edges):
|
||||
self.firewall.delete_firewall('nsx', apply_list, firewall)
|
||||
self.assertEqual(1, update_fw.call_count)
|
||||
backend_rules = update_fw.call_args[0][1]['firewall_rule_list']
|
||||
self.assertEqual(0, len(backend_rules))
|
||||
allow_ext = update_fw.call_args[1]['allow_external']
|
||||
self.assertEqual(True, allow_ext)
|
||||
# Validate the args of the last call
|
||||
self.assertEqual(apply_list[0].router_id,
|
||||
update_fw.call_args[0][1])
|
||||
backend_rules = update_fw.call_args[1]['fwaas_rules']
|
||||
self.assertIsNone(backend_rules)
|
||||
|
||||
def test_create_firewall_with_admin_down(self):
|
||||
apply_list = self._fake_apply_list()
|
||||
rule_list = self._fake_rules_v4()
|
||||
firewall = self._fake_firewall_with_admin_down(rule_list)
|
||||
with mock.patch.object(self.firewall._nsxv,
|
||||
"update_firewall") as update_fw:
|
||||
edges = self._get_fake_mapping(apply_list)
|
||||
with mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"update_router_firewall") as update_fw,\
|
||||
mock.patch("vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2."
|
||||
"_get_router"),\
|
||||
mock.patch.object(self.firewall,
|
||||
"_get_routers_edges", return_value=edges):
|
||||
self.firewall.create_firewall('nsx', apply_list, firewall)
|
||||
self.assertEqual(1, update_fw.call_count)
|
||||
backend_rules = update_fw.call_args[0][1]['firewall_rule_list']
|
||||
self.assertEqual(0, len(backend_rules))
|
||||
allow_ext = update_fw.call_args[1]['allow_external']
|
||||
self.assertEqual(False, allow_ext)
|
||||
# Validate the args of the last call
|
||||
self.assertEqual(apply_list[0].router_id,
|
||||
update_fw.call_args[0][1])
|
||||
backend_rules = update_fw.call_args[1]['fwaas_rules']
|
||||
self.assertEqual([], backend_rules)
|
||||
|
||||
def test_should_apply_firewall_to_router(self):
|
||||
router = {'id': 'fake_id',
|
||||
|
|
Loading…
Reference in New Issue