Freescale FWaaS Plugin code final decomposition.

This removes all the artifacts specific to Freescale FWaaS Plugin code
from the neutron-fwaas tree.

The code is maintained at github repository.

Setup option "fsl_firewall" for this plugin are removed in
change: I194a4da49058724766b7fde7343f85d19a75fe8c

UpgradeImpact

Change-Id: I5d419671cf3ce3dc10020949bc14bb9d8031da3f
Closes-Bug: #1519223
This commit is contained in:
TrinathSomanchi 2015-11-24 13:22:54 +05:30
parent 1cbc6f47b4
commit 69bc97741f
5 changed files with 0 additions and 621 deletions

View File

@ -1,11 +0,0 @@
Freescale Firewall as a Service Plugin
* For more information, refer to:
https://wiki.openstack.org/wiki/Freescale_Firewall_as_a_Service_Plugin
* For Information on Freescale CI, refer to:
https://wiki.openstack.org/wiki/ThirdPartySystems/Freescale_CI
* Freescale CI contact:
- fslosci@freescale.com
- trinath.somanchi@freescale.com

View File

@ -1,272 +0,0 @@
# Copyright 2015 Freescale, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron.common import rpc
from neutron.common import topics
from neutron.i18n import _LE
from neutron.plugins.common import constants as const
from neutron.plugins.ml2.drivers.freescale import config
from oslo_log import log as logging
from oslo_utils import excutils
from sqlalchemy.orm import exc
from neutron_fwaas.db.firewall import firewall_db
from neutron_fwaas.services.firewall import fwaas_plugin
LOG = logging.getLogger(__name__)
class FirewallCallbacks(fwaas_plugin.FirewallCallbacks):
"""Callbacks to handle CRD notifications to amqp."""
RPC_API_VERSION = '1.0'
def __init__(self, plugin):
self.plugin = plugin
self._client = self.plugin._client
def get_firewalls_for_tenant(self, context, **kwargs):
"""Get all Firewalls and rules for a tenant from CRD.
For all the firewalls created, check CRD for config_mode.
If it is Network Node, prepare the list.
Other config modes are handled by CRD internally.
"""
fw_list = []
for fw in self.plugin.get_firewalls(context):
fw_id = fw['id']
# get the firewall details from CRD service.
crd_fw_details = self._client.show_firewall(fw_id)
config_mode = crd_fw_details['firewall']['config_mode']
# get those FWs with config mode NetworkNode (NN) or None
if config_mode in ('NN', None):
fw_list.append(self.plugin._make_firewall_dict_with_rules(
context, fw_id))
return fw_list
class FirewallPlugin(firewall_db.Firewall_db_mixin):
"""Implementation of the Freescale Firewall Service Plugin.
This class manages the workflow of FWaaS request/response.
Existing Firewall database is used.
"""
supported_extension_aliases = ["fwaas"]
def __init__(self):
"""Do the initialization for the firewall service plugin here."""
self._client = config.get_crdclient()
self.endpoints = [FirewallCallbacks(self)]
self.conn = rpc.create_connection()
self.conn.create_consumer(
topics.FIREWALL_PLUGIN, self.endpoints, fanout=False)
self.conn.consume_in_threads()
def _update_firewall_status(self, context, firewall_id):
status_update = {"firewall": {"status": const.PENDING_UPDATE}}
super(FirewallPlugin, self).update_firewall(context, firewall_id,
status_update)
try:
self._client.update_firewall(firewall_id, status_update)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Failed to update firewall status (%s)."),
firewall_id)
def _update_firewall_policy(self, context, firewall_policy_id):
firewall_policy = self.get_firewall_policy(context, firewall_policy_id)
if firewall_policy:
for firewall_id in firewall_policy['firewall_list']:
self._update_firewall_status(context, firewall_id)
# Firewall Management
def create_firewall(self, context, firewall):
"""Create Firewall.
'PENDING' status updates are handled by CRD by posting messages
to AMQP (topics.FIREWALL_PLUGIN) that Firewall consumes to
update its status.
"""
firewall['firewall']['status'] = const.PENDING_CREATE
fw = super(FirewallPlugin, self).create_firewall(context, firewall)
try:
crd_firewall = {'firewall': fw}
self._client.create_firewall(crd_firewall)
except Exception:
with excutils.save_and_reraise_exception():
fw_id = fw['firewall']['id']
LOG.error(_LE("Failed to create firewall (%s)."),
fw_id)
super(FirewallPlugin, self).delete_firewall(context, fw_id)
return fw
def update_firewall(self, context, fw_id, firewall=None):
firewall['firewall']['status'] = const.PENDING_UPDATE
fw = super(FirewallPlugin,
self).update_firewall(context, fw_id, firewall)
try:
crd_firewall = {'firewall': fw}
self._client.update_firewall(fw_id, crd_firewall)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Failed to update firewall (%s)."), fw_id)
# TODO(trinaths):do rollback on error
return fw
def delete_db_firewall_object(self, context, fw_id):
firewall = self.get_firewall(context, fw_id)
if firewall['status'] in [const.PENDING_DELETE]:
try:
super(FirewallPlugin, self).delete_firewall(context, fw_id)
except exc.NoResultFound:
LOG.error(_LE("Delete Firewall (%s) DB object failed."),
fw_id)
def delete_firewall(self, context, fw_id):
status_update = {"firewall": {"status": const.PENDING_DELETE}}
super(FirewallPlugin, self).update_firewall(context, fw_id,
status_update)
try:
self._client.delete_firewall(fw_id)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Failed to delete firewall (%s)."), fw_id)
# TODO(trinaths):do rollback on error
# Firewall Policy Management
def create_firewall_policy(self, context, firewall_policy):
fw_policy = super(FirewallPlugin, self).create_firewall_policy(
context,
firewall_policy)
fw_policy.pop('firewall_list')
try:
crd_firewall_policy = {'firewall_policy': fw_policy}
self._client.create_firewall_policy(crd_firewall_policy)
except Exception:
with excutils.save_and_reraise_exception():
fwp_id = fw_policy['firewall_policy']['id']
LOG.error(_LE("Failed to create firewall policy (%s)."),
fwp_id)
super(FirewallPlugin, self).delete_firewall_policy(context,
fwp_id)
return fw_policy
def update_firewall_policy(self, context, fp_id, firewall_policy):
fw_policy = super(FirewallPlugin,
self).update_firewall_policy(context, fp_id,
firewall_policy)
self._update_firewall_policy(context, fp_id)
fw_policy.pop('firewall_list')
try:
crd_firewall_policy = {'firewall_policy': fw_policy}
self._client.update_firewall_policy(fp_id, crd_firewall_policy)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Update firewall policy failed (%s)."), fp_id)
# TODO(trinaths):do rollback on error
return fw_policy
def delete_firewall_policy(self, context, fp_id):
super(FirewallPlugin, self).delete_firewall_policy(context, fp_id)
try:
self._client.delete_firewall_policy(fp_id)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Delete Firewall Policy (%s) failed."),
fp_id)
# TODO(trinaths):do rollback on error
# Firewall Rule management
def create_firewall_rule(self, context, firewall_rule):
fw_rule = super(FirewallPlugin,
self).create_firewall_rule(context, firewall_rule)
try:
crd_firewall_rule = {'firewall_rule': fw_rule}
self._client.create_firewall_rule(crd_firewall_rule)
except Exception:
with excutils.save_and_reraise_exception():
fwr_id = fw_rule['firewall_rule']['id']
LOG.error(_LE("Failed to create firewall rule (%s)."),
fwr_id)
super(FirewallPlugin, self).delete_firewall_rule(context,
fwr_id)
return fw_rule
def update_firewall_rule(self, context, fr_id, firewall_rule):
fw_rule = super(FirewallPlugin,
self).update_firewall_rule(context, fr_id,
firewall_rule)
if fw_rule['firewall_policy_id']:
self._update_firewall_policy(
context,
fw_rule['firewall_policy_id'])
try:
crd_firewall_rule = {'firewall_rule': fw_rule}
self._client.update_firewall_rule(fr_id, crd_firewall_rule)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Failed to update firewall rule (%s)."), fr_id)
# TODO(trinaths):do rollback on error
return fw_rule
def delete_firewall_rule(self, context, fr_id):
fw_rule = self.get_firewall_rule(context, fr_id)
super(FirewallPlugin, self).delete_firewall_rule(context, fr_id)
if fw_rule['firewall_policy_id']:
self._update_firewall_policy(context,
fw_rule['firewall_policy_id'])
try:
self._client.delete_firewall_rule(fr_id)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Failed to delete firewall rule (%s)."),
fr_id)
# TODO(trinaths):do rollback on error
def insert_rule(self, context, rid, rule_info):
rule = super(FirewallPlugin,
self).insert_rule(context, rid, rule_info)
try:
self._client.firewall_policy_insert_rule(rid, rule_info)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Failed to insert rule %(rule)s into "
"firewall policy %(fwpid)s."),
{'rule': rule_info,
'fwpid': rid})
super(FirewallPlugin, self).remove_rule(context, rid,
rule_info)
self._update_firewall_policy(context, rid)
return rule
def remove_rule(self, context, rid, rule_info):
rule = super(FirewallPlugin,
self).remove_rule(context, rid, rule_info)
try:
self._client.firewall_policy_remove_rule(rid, rule_info)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Failed to remove rule %(rule)s from "
"firewall policy %(fwpid)s."),
{'rule': rule_info,
'fwpid': rid})
self._update_firewall_policy(context, rid)
return rule

View File

@ -1,338 +0,0 @@
# Copyright 2015 Freescale, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron import context
from neutron import manager
from webob import exc
from neutron.plugins.common import constants as const
from neutron_fwaas.tests.unit.db.firewall import (
test_firewall_db as test_db_firewall)
"""Unit testing for Freescale FWaaS Plugin."""
PLUGIN = ("neutron_fwaas.services.firewall.freescale"
".fwaas_plugin.FirewallPlugin")
class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
def setUp(self):
mock.patch('neutronclient.v2_0.client.Client').start()
super(TestFirewallCallbacks, self).setUp(fw_plugin=PLUGIN)
n_mgr = manager.NeutronManager
self.plugin = n_mgr.get_service_plugins()[const.FIREWALL]
self.callbacks = self.plugin.endpoints[0]
self.ctx = context.get_admin_context()
def test_get_firewalls_for_tenant(self):
tenant_id = 'test-tenant'
with self.firewall_rule(name='fwr1', tenant_id=tenant_id,
do_delete=False) as fr:
with self.firewall_policy(tenant_id=tenant_id,
do_delete=False) as fwp:
fwp_id = fwp['firewall_policy']['id']
fw_id = fr['firewall_rule']['id']
data = {'firewall_policy':
{'firewall_rules': [fw_id]}}
self.plugin.update_firewall_policy(self.ctx, fwp_id, data)
admin_state = test_db_firewall.ADMIN_STATE_UP
with self.firewall(firewall_policy_id=fwp_id,
tenant_id=tenant_id,
do_delete=False,
admin_state_up=admin_state) as fw:
self.callbacks.get_firewalls_for_tenant(self.ctx,
host='dummy')
fw_id = fw['firewall']['id']
fw['firewall']['config_mode'] = "NN"
self.plugin._client.show_firewall.assert_called_once_with(
fw_id)
self.plugin.delete_firewall(self.ctx, fw_id)
self.callbacks.firewall_deleted(self.ctx, fw_id)
self.plugin.delete_firewall_policy(self.ctx, fwp_id)
self.plugin.delete_firewall_rule(self.ctx, fr['firewall_rule']['id'])
class TestFreescaleFirewallPlugin(test_db_firewall.TestFirewallDBPlugin):
def setUp(self):
mock.patch('neutronclient.v2_0.client.Client').start()
super(TestFreescaleFirewallPlugin, self).setUp(fw_plugin=PLUGIN)
self.plugin = manager.NeutronManager.get_service_plugins()['FIREWALL']
self.callbacks = self.plugin.endpoints[0]
self.clnt = self.plugin._client
self.ctx = context.get_admin_context()
def test_create_firewall_with_admin_and_fwp_is_shared(self):
fw_name = "fw_with_shared_fwp"
with self.firewall_policy(do_delete=False, tenant_id="tenantX") as fwp:
fwp_id = fwp['firewall_policy']['id']
ctx = context.get_admin_context()
target_tenant = 'tenant1'
with self.firewall(name=fw_name,
firewall_policy_id=fwp_id,
tenant_id=target_tenant,
context=ctx,
do_delete=False,
admin_state_up=True) as fw:
self.assertEqual(target_tenant, fw['firewall']['tenant_id'])
fw_id = fw['firewall']['id']
self.plugin.delete_firewall(self.ctx, fw_id)
self.clnt.delete_firewall.assert_called_once_with(fw_id)
self.callbacks.firewall_deleted(self.ctx, fw_id)
def test_create_update_delete_firewall_rule(self):
"""Testing create, update and delete firewall rule."""
ctx = context.get_admin_context()
clnt = self.plugin._client
with self.firewall_rule(do_delete=False) as fwr:
fwr_id = fwr['firewall_rule']['id']
# Create Firewall Rule
crd_rule = {'firewall_rule': fwr}
clnt.create_firewall_rule.assert_called_once_with(fwr)
# Update Firewall Rule
data = {'firewall_rule': {'name': 'new_rule_name',
'source_port': '10:20',
'destination_port': '30:40'}}
fw_rule = self.plugin.update_firewall_rule(ctx, fwr_id, data)
crd_rule = {'firewall_rule': fw_rule}
clnt.update_firewall_rule.assert_called_once_with(fwr_id, crd_rule)
# Delete Firewall Rule
self.plugin.delete_firewall_rule(ctx, fwr_id)
clnt.delete_firewall_rule.assert_called_once_with(fwr_id)
def test_create_update_delete_firewall_policy(self):
"""Testing create, update and delete firewall policy."""
with self.firewall_policy(do_delete=False) as fwp:
fwp_id = fwp['firewall_policy']['id']
# Create Firewall Policy
crd_policy = {'firewall_policy': fwp}
self.clnt.create_firewall_policy.assert_called_once_with(fwp)
# Update Firewall Policy
data = {'firewall_policy': {'name': 'updated-name'}}
fwp = self.plugin.update_firewall_policy(self.ctx, fwp_id, data)
crd_policy = {'firewall_policy': fwp}
self.clnt.update_firewall_policy.assert_called_once_with(
fwp_id,
crd_policy)
# Delete Firewall Policy
self.plugin.delete_firewall_policy(self.ctx, fwp_id)
self.clnt.delete_firewall_policy.assert_called_once_with(fwp_id)
def test_create_firewall(self):
name = "firewall-fake"
expected_attrs = self._get_test_firewall_attrs(name)
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
expected_attrs['firewall_policy_id'] = fwp_id
with self.firewall(name=name,
firewall_policy_id=fwp_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
do_delete=False) as actual_firewall:
fw_id = actual_firewall['firewall']['id']
self.assertDictSupersetOf(expected_attrs,
actual_firewall['firewall'])
self.plugin.delete_firewall(self.ctx, fw_id)
self.clnt.delete_firewall.assert_called_once_with(fw_id)
self.callbacks.firewall_deleted(self.ctx, fw_id)
def test_show_firewall(self):
name = "firewall1"
expected_attrs = self._get_test_firewall_attrs(name)
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
expected_attrs['firewall_policy_id'] = fwp_id
with self.firewall(name=name,
firewall_policy_id=fwp_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
do_delete=False) as actual_firewall:
fw_id = actual_firewall['firewall']['id']
req = self.new_show_request('firewalls', fw_id,
fmt=self.fmt)
actual_fw = self.deserialize(self.fmt,
req.get_response(self.ext_api))
self.assertDictSupersetOf(expected_attrs,
actual_fw['firewall'])
self.plugin.delete_firewall(self.ctx, fw_id)
self.clnt.delete_firewall.assert_called_once_with(fw_id)
self.callbacks.firewall_deleted(self.ctx, fw_id)
def test_update_firewall(self):
name = "new_firewall1"
expected_attrs = self._get_test_firewall_attrs(name)
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
expected_attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
do_delete=False) as firewall:
fw_id = firewall['firewall']['id']
self.callbacks.set_firewall_status(self.ctx, fw_id,
const.ACTIVE)
data = {'firewall': {'name': name}}
req = self.new_update_request('firewalls', data, fw_id)
actual_fw = self.deserialize(self.fmt,
req.get_response(self.ext_api))
expected_attrs = self._replace_firewall_status(expected_attrs,
const.PENDING_CREATE,
const.PENDING_UPDATE)
self.assertDictSupersetOf(expected_attrs,
actual_fw['firewall'])
self.plugin.delete_firewall(self.ctx, fw_id)
self.clnt.delete_firewall.assert_called_once_with(fw_id)
self.callbacks.firewall_deleted(self.ctx, fw_id)
def test_update_firewall_with_fwp(self):
with self.firewall_policy() as fwp1, \
self.firewall_policy(shared=False, do_delete=False) as fwp2, \
self.firewall(firewall_policy_id=fwp1['firewall_policy']['id'],
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
do_delete=False) as firewall:
fw_id = firewall['firewall']['id']
fwp2_id = fwp2['firewall_policy']['id']
self.callbacks.set_firewall_status(self.ctx, fw_id, const.ACTIVE)
data = {'firewall': {'firewall_policy_id': fwp2_id}}
req = self.new_update_request('firewalls', data, fw_id)
res = req.get_response(self.ext_api)
self.assertEqual(200, res.status_int)
def test_update_firewall_with_shared_fwp(self):
with self.firewall_policy() as fwp1, \
self.firewall_policy(tenant_id='tenant2',
do_delete=False) as fwp2, \
self.firewall(firewall_policy_id=fwp1['firewall_policy']['id'],
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
do_delete=False) as firewall:
fw_id = firewall['firewall']['id']
fwp2_id = fwp2['firewall_policy']['id']
self.callbacks.set_firewall_status(self.ctx, fw_id, const.ACTIVE)
data = {'firewall': {'firewall_policy_id': fwp2_id}}
req = self.new_update_request('firewalls', data, fw_id)
res = req.get_response(self.ext_api)
self.assertEqual(200, res.status_int)
def test_update_firewall_with_admin_and_fwp_different_tenant(self):
with self.firewall_policy(do_delete=False) as fwp1, \
self.firewall_policy(tenant_id='tenant2', shared=False,
do_delete=False) as fwp2, \
self.firewall(firewall_policy_id=fwp1['firewall_policy']['id'],
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
do_delete=False) as firewall:
fw_id = firewall['firewall']['id']
fwp2_id = fwp2['firewall_policy']['id']
self.callbacks.set_firewall_status(self.ctx, fw_id, const.ACTIVE)
data = {'firewall': {'firewall_policy_id': fwp2_id}}
req = self.new_update_request('firewalls', data, fw_id)
res = req.get_response(self.ext_api)
self.assertEqual(409, res.status_int)
def test_list_firewalls(self):
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(name='fw1', firewall_policy_id=fwp_id,
description='fw') as fw1, \
self.firewall(name='fw2', firewall_policy_id=fwp_id,
description='fw') as fw2, \
self.firewall(name='fw3', firewall_policy_id=fwp_id,
description='fw') as fw3:
fwalls = [fw1, fw2, fw3]
self._test_list_resources('firewall', fwalls,
query_params='description=fw')
for fw in fwalls:
fw_id = fw['firewall']['id']
self.plugin.delete_firewall(self.ctx, fw_id)
self.callbacks.firewall_deleted(self.ctx, fw_id)
def test_delete_firewall_policy_with_firewall_association(self):
attrs = self._get_test_firewall_attrs()
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
do_delete=False)as fw:
fw_id = fw['firewall']['id']
req = self.new_delete_request('firewall_policies', fwp_id)
res = req.get_response(self.ext_api)
self.assertEqual(exc.HTTPConflict.code, res.status_int)
self.plugin.delete_firewall(self.ctx, fw_id)
self.clnt.delete_firewall.assert_called_once_with(fw_id)
self.callbacks.firewall_deleted(self.ctx, fw_id)
def test_update_firewall_policy_assoc_with_other_tenant_firewall(self):
with self.firewall_policy(shared=True, tenant_id='tenant1') as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
do_delete=False) as fw:
fw_id = fw['firewall']['id']
data = {'firewall_policy': {'shared': False}}
req = self.new_update_request('firewall_policies', data,
fwp['firewall_policy']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(exc.HTTPConflict.code, res.status_int)
self.plugin.delete_firewall(self.ctx, fw_id)
self.clnt.delete_firewall.assert_called_once_with(fw_id)
self.callbacks.firewall_deleted(self.ctx, fw_id)
def test_delete_firewall(self):
attrs = self._get_test_firewall_attrs()
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
do_delete=False) as firewall:
fw_id = firewall['firewall']['id']
attrs = self._replace_firewall_status(attrs,
const.PENDING_CREATE,
const.PENDING_DELETE)
req = self.new_delete_request('firewalls', fw_id)
res = req.get_response(self.ext_api)
self.assertEqual(exc.HTTPNoContent.code, res.status_int)
self.clnt.delete_firewall.assert_called_once_with(fw_id)
self.plugin.endpoints[0].firewall_deleted(self.ctx, fw_id)
def test_insert_remove_rule(self):
"""Testing Insert and Remove rule operations."""
status_update = {"firewall": {"status": 'PENDING_UPDATE'}}
with self.firewall_rule(name='fake_rule',
do_delete=False) as fr1:
fr_id = fr1['firewall_rule']['id']
with self.firewall_policy(do_delete=False) as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
do_delete=False) as fw:
fw_id = fw['firewall']['id']
# Insert Rule
rule_info = {'firewall_rule_id': fr_id}
self.plugin.insert_rule(self.ctx, fwp_id, rule_info)
fp_insert_rule = self.clnt.firewall_policy_insert_rule
fp_insert_rule.assert_called_once_with(fwp_id, rule_info)
self.clnt.update_firewall.assert_called_once_with(
fw_id,
status_update)
# Remove Rule
rule_info = {'firewall_rule_id': fr_id}
self.plugin.remove_rule(self.ctx, fwp_id, rule_info)
fp_remove_rule = self.clnt.firewall_policy_remove_rule
fp_remove_rule.assert_called_once_with(fwp_id, rule_info)
self.clnt.update_firewall.assert_called_with(fw_id,
status_update)
def test_create_firewall_with_dvr(self):
"""Skip DVR Testing."""
self.skipTest("DVR not supported")