Merge "Added 'shared' attribute for firewall resources"

This commit is contained in:
Jenkins 2017-04-10 06:48:30 +00:00 committed by Gerrit Code Review
commit 462511d09a
6 changed files with 178 additions and 144 deletions

View File

@ -42,7 +42,7 @@ class HasDescription(object):
class FirewallRuleV2(model_base.BASEV2, model_base.HasId, HasName,
HasDescription, model_base.HasProject):
__tablename__ = "firewall_rules_v2"
public = sa.Column(sa.Boolean)
shared = sa.Column(sa.Boolean)
protocol = sa.Column(sa.String(40))
ip_version = sa.Column(sa.Integer)
source_ip_address = sa.Column(sa.String(46))
@ -65,7 +65,6 @@ class FirewallGroup(model_base.BASEV2, model_base.HasId, HasName,
cascade='all, delete'))
name = sa.Column(sa.String(255))
description = sa.Column(sa.String(1024))
public = sa.Column(sa.Boolean)
ingress_firewall_policy_id = sa.Column(sa.String(36),
sa.ForeignKey(
'firewall_policies_v2.id'))
@ -74,6 +73,7 @@ class FirewallGroup(model_base.BASEV2, model_base.HasId, HasName,
'firewall_policies_v2.id'))
admin_state_up = sa.Column(sa.Boolean)
status = sa.Column(sa.String(16))
shared = sa.Column(sa.Boolean)
class FirewallGroupPortAssociation(model_base.BASEV2):
@ -109,7 +109,6 @@ class FirewallPolicy(model_base.BASEV2, model_base.HasId, HasName,
__tablename__ = 'firewall_policies_v2'
name = sa.Column(sa.String(255))
description = sa.Column(sa.String(1024))
public = sa.Column(sa.Boolean)
rule_count = sa.Column(sa.Integer)
audited = sa.Column(sa.Boolean)
rule_associations = orm.relationship(
@ -117,6 +116,7 @@ class FirewallPolicy(model_base.BASEV2, model_base.HasId, HasName,
backref=orm.backref('firewall_policies_v2', cascade='all, delete'),
order_by='FirewallPolicyRuleAssociation.position',
collection_class=ordering_list('position', count_from=1))
shared = sa.Column(sa.Boolean)
class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
@ -197,7 +197,6 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
'tenant_id': firewall_rule['tenant_id'],
'name': firewall_rule['name'],
'description': firewall_rule['description'],
'public': firewall_rule['public'],
'protocol': firewall_rule['protocol'],
'ip_version': firewall_rule['ip_version'],
'source_ip_address': firewall_rule['source_ip_address'],
@ -206,7 +205,8 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
'source_port': src_port_range,
'destination_port': dst_port_range,
'action': firewall_rule['action'],
'enabled': firewall_rule['enabled']}
'enabled': firewall_rule['enabled'],
'shared': firewall_rule['shared']}
return self._fields(res, fields)
def _make_firewall_policy_dict(self, firewall_policy, fields=None):
@ -217,9 +217,9 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
'tenant_id': firewall_policy['tenant_id'],
'name': firewall_policy['name'],
'description': firewall_policy['description'],
'public': firewall_policy['public'],
'audited': firewall_policy['audited'],
'firewall_rules': fw_rules}
'firewall_rules': fw_rules,
'shared': firewall_policy['shared']}
return self._fields(res, fields)
def _make_firewall_group_dict(self, firewall_group, fields=None):
@ -230,14 +230,14 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
'tenant_id': firewall_group['tenant_id'],
'name': firewall_group['name'],
'description': firewall_group['description'],
'public': firewall_group['public'],
'ingress_firewall_policy_id':
firewall_group['ingress_firewall_policy_id'],
'egress_firewall_policy_id':
firewall_group['egress_firewall_policy_id'],
'admin_state_up': firewall_group['admin_state_up'],
'ports': fwg_ports,
'status': firewall_group['status']}
'status': firewall_group['status'],
'shared': firewall_group['shared']}
return self._fields(res, fields)
def _get_policy_ordered_rules(self, context, policy_id):
@ -265,7 +265,7 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
return firewall_group
def _check_firewall_rule_conflict(self, fwr_db, fwp_db):
if not fwr_db['public']:
if not fwr_db['shared']:
if fwr_db['tenant_id'] != fwp_db['tenant_id']:
raise fw_ext.FirewallRuleConflict(
firewall_rule_id=fwr_db['id'],
@ -342,7 +342,6 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
tenant_id=fwr['tenant_id'],
name=fwr['name'],
description=fwr['description'],
public=fwr['public'],
protocol=fwr['protocol'],
ip_version=fwr['ip_version'],
source_ip_address=fwr['source_ip_address'],
@ -352,7 +351,8 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
destination_port_range_min=dst_port_min,
destination_port_range_max=dst_port_max,
action=fwr['action'],
enabled=fwr['enabled'])
enabled=fwr['enabled'],
shared=fwr['shared'])
context.session.add(fwr_db)
return self._make_firewall_rule_dict(fwr_db)
@ -524,32 +524,32 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
# Bail as soon as we find an invalid rule.
raise fw_ext.FirewallRuleNotFound(
firewall_rule_id=fwrule_id)
if 'public' in fwp:
if fwp['public'] and not rules_dict[fwrule_id]['public']:
if 'shared' in fwp:
if fwp['shared'] and not rules_dict[fwrule_id]['shared']:
raise fw_ext.FirewallRuleSharingConflict(
firewall_rule_id=fwrule_id,
firewall_policy_id=fwp_db['id'])
elif fwp_db['public'] and not rules_dict[fwrule_id]['public']:
elif fwp_db['shared'] and not rules_dict[fwrule_id]['shared']:
raise fw_ext.FirewallRuleSharingConflict(
firewall_rule_id=fwrule_id,
firewall_policy_id=fwp_db['id'])
else:
# the policy is not public, the rule and policy should be in
# the same project if the rule is not public.
if not rules_dict[fwrule_id]['public']:
if (rules_dict[fwrule_id]['tenant_id'] !=
fwp_db['tenant_id']):
# the policy is not shared, the rule and policy should be in
# the same project if the rule is not shared.
if not rules_dict[fwrule_id]['shared']:
if (rules_dict[fwrule_id]['tenant_id'] != fwp_db[
'tenant_id']):
raise fw_ext.FirewallRuleConflict(
firewall_rule_id=fwrule_id,
tenant_id=rules_dict[fwrule_id]['tenant_id'])
def _check_if_rules_public_for_policy_public(self, context, fwp_db, fwp):
if fwp['public']:
def _check_if_rules_shared_for_policy_shared(self, context, fwp_db, fwp):
if fwp['shared']:
rules_in_db = fwp_db.rule_associations
for entry in rules_in_db:
fwr_db = self._get_firewall_rule(context,
entry.firewall_rule_id)
if not fwr_db['public']:
if not fwp_db['shared']:
raise fw_ext.FirewallPolicySharingConflict(
firewall_rule_id=fwr_db['id'],
firewall_policy_id=fwp_db['id'])
@ -626,8 +626,8 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
tenant_id=fwp['tenant_id'],
name=fwp['name'],
description=fwp['description'],
public=fwp['public'],
audited=fwp['audited'])
audited=fwp['audited'],
shared=fwp['shared'])
context.session.add(fwp_db)
self._set_rules_for_policy(context, fwp_db, fwp)
return self._make_firewall_policy_dict(fwp_db)
@ -637,13 +637,13 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
fwp = firewall_policy['firewall_policy']
with context.session.begin(subtransactions=True):
fwp_db = self._get_firewall_policy(context, id)
if not fwp.get('public', True):
# an update is setting public to False, make sure associated
if not fwp.get('shared', True):
# an update is setting shared to False, make sure associated
# firewall groups are in the same project.
self._check_fwgs_associated_with_policy_in_same_project(
context, id, fwp_db['tenant_id'])
if 'public' in fwp and 'firewall_rules' not in fwp:
self._check_if_rules_public_for_policy_public(
if 'shared' in fwp and 'firewall_rules' not in fwp:
self._check_if_rules_shared_for_policy_shared(
context, fwp_db, fwp)
if 'firewall_rules' in fwp:
self._set_rules_for_policy(context, fwp_db, fwp)
@ -685,7 +685,7 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
fwp_id = fwg['ingress_firewall_policy_id']
if fwp_id is not None:
fwp = self._get_firewall_policy(context, fwp_id)
if fwg_tenant_id != fwp['tenant_id'] and not fwp['public']:
if fwg_tenant_id != fwp['tenant_id'] and not fwp['shared']:
raise fw_ext.FirewallPolicyConflict(
firewall_policy_id=fwp_id)
@ -693,7 +693,7 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
fwp_id = fwg['egress_firewall_policy_id']
if fwp_id is not None:
fwp = self._get_firewall_policy(context, fwp_id)
if fwg_tenant_id != fwp['tenant_id'] and not fwp['public']:
if fwg_tenant_id != fwp['tenant_id'] and not fwp['shared']:
raise fw_ext.FirewallPolicyConflict(
firewall_policy_id=fwp_id)
return
@ -754,11 +754,11 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
tenant_id=fwg['tenant_id'],
name=fwg['name'],
description=fwg['description'],
public=fwg['public'],
status=status,
ingress_firewall_policy_id=fwg['ingress_firewall_policy_id'],
egress_firewall_policy_id=fwg['egress_firewall_policy_id'],
admin_state_up=fwg['admin_state_up'])
admin_state_up=fwg['admin_state_up'],
shared=fwg['shared'])
context.session.add(fwg_db)
self._set_ports_for_firewall_group(context, fwg_db, fwg)
return self._make_firewall_group_dict(fwg_db)

View File

@ -1 +1 @@
f83a0b2964d0
fd38cd995cc0

View File

@ -0,0 +1,37 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""change shared attribute for firewall resource
Revision ID: fd38cd995cc0
Revises: f83a0b2964d0
Create Date: 2017-03-31 14:22:21.063392
"""
# revision identifiers, used by Alembic.
revision = 'fd38cd995cc0'
down_revision = 'f83a0b2964d0'
depends_on = ('d6a12e637e28',)
from alembic import op
import sqlalchemy as sa
def upgrade():
op.alter_column('firewall_rules_v2', 'public', new_column_name='shared',
existing_type=sa.Boolean)
op.alter_column('firewall_groups_v2', 'public', new_column_name='shared',
existing_type=sa.Boolean)
op.alter_column('firewall_policies_v2', 'public', new_column_name='shared',
existing_type=sa.Boolean)

View File

@ -72,36 +72,36 @@ class FirewallPolicyConflict(nexception.Conflict):
"""FWaaS exception for firewall policy
Occurs when admin policy tries to use another tenant's policy that
is not public.
is not shared.
"""
message = _("Operation cannot be performed since Firewall Policy "
"%(firewall_policy_id)s is not public and does not belong to "
"%(firewall_policy_id)s is not shared and does not belong to "
"your tenant.")
class FirewallRuleSharingConflict(nexception.Conflict):
"""FWaaS exception for firewall rules
This exception will be raised when a public policy is created or
updated with rules that are not public.
This exception will be raised when a shared policy is created or
updated with rules that are not shared.
"""
message = _("Operation cannot be performed since Firewall Policy "
"%(firewall_policy_id)s is public but Firewall Rule "
"%(firewall_rule_id)s is not public")
"%(firewall_policy_id)s is shared but Firewall Rule "
"%(firewall_rule_id)s is not shared.")
class FirewallPolicySharingConflict(nexception.Conflict):
"""FWaaS exception for firewall policy
When a policy is public without sharing its associated rules,
When a policy is 'shared' without sharing its associated rules,
this exception will be raised.
"""
message = _("Operation cannot be performed. Before sharing Firewall "
"Policy %(firewall_policy_id)s, share associated Firewall "
"Rule %(firewall_rule_id)s")
"Rule %(firewall_rule_id)s.")
class FirewallRuleNotFound(nexception.NotFound):
@ -147,7 +147,7 @@ class FirewallRuleInfoMissing(nexception.InvalidInput):
class FirewallIpAddressConflict(nexception.InvalidInput):
message = _("Invalid input - IP addresses do not agree with IP Version")
message = _("Invalid input - IP addresses do not agree with IP Version.")
class FirewallInternalDriverError(nexception.NeutronException):
@ -164,12 +164,12 @@ class FirewallRuleConflict(nexception.Conflict):
"""Firewall rule conflict exception.
Occurs when admin policy tries to use another tenant's rule that is
not public
not shared
"""
message = _("Operation cannot be performed since Firewall Rule "
"%(firewall_rule_id)s is not public and belongs to "
"another tenant %(tenant_id)s")
"%(firewall_rule_id)s is not shared and belongs to "
"another tenant %(tenant_id)s.")
class FirewallRuleAlreadyAssociated(nexception.Conflict):
@ -181,7 +181,7 @@ class FirewallRuleAlreadyAssociated(nexception.Conflict):
message = _("Operation cannot be performed since Firewall Rule "
"%(firewall_rule_id)s is already associated with Firewall"
"Policy %(firewall_policy_id)s")
"Policy %(firewall_policy_id)s.")
RESOURCE_ATTRIBUTE_MAP = {
@ -204,7 +204,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'firewall_policy_id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid_or_none': None},
'is_visible': True},
'public': {'allow_post': True, 'allow_put': True,
'shared': {'allow_post': True, 'allow_put': True,
'default': False, 'is_visible': True,
'convert_to': converters.convert_to_boolean,
'required_by_policy': True, 'enforce_policy': True},
@ -260,7 +260,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'convert_to': converters.convert_to_boolean},
'status': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'public': {'allow_post': True, 'allow_put': True, 'default': False,
'shared': {'allow_post': True, 'allow_put': True, 'default': False,
'convert_to': converters.convert_to_boolean,
'is_visible': True, 'required_by_policy': True,
'enforce_policy': True},
@ -301,7 +301,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'validate': {'type:string':
nl_db_constants.DESCRIPTION_FIELD_SIZE},
'is_visible': True, 'default': ''},
'public': {'allow_post': True, 'allow_put': True, 'default': False,
'shared': {'allow_post': True, 'allow_put': True, 'default': False,
'convert_to': converters.convert_to_boolean,
'is_visible': True, 'required_by_policy': True,
'enforce_policy': True},

View File

@ -43,7 +43,6 @@ FWAAS_PLUGIN = 'neutron_fwaas.services.firewall.fwaas_plugin_v2'
DELETEFW_PATH = FWAAS_PLUGIN + '.FirewallAgentApi.delete_firewall_group'
extensions_path = ':'.join(extensions.__path__)
DESCRIPTION = 'default description'
PUBLIC = True
PROTOCOL = 'tcp'
IP_VERSION = 4
SOURCE_IP_ADDRESS_RAW = '1.1.1.1'
@ -54,6 +53,7 @@ ACTION = 'allow'
AUDITED = True
ENABLED = True
ADMIN_STATE_UP = True
SHARED = True
class FakeAgentApi(fwaas_plugin_v2.FirewallCallbacks):
@ -132,7 +132,6 @@ class FirewallPluginV2DbTestCase(base.NeutronDbPluginV2TestCase):
attrs = {'name': name,
'tenant_id': self._tenant_id,
'project_id': self._tenant_id,
'public': PUBLIC,
'protocol': PROTOCOL,
'ip_version': IP_VERSION,
'source_ip_address': SOURCE_IP_ADDRESS_RAW,
@ -140,7 +139,8 @@ class FirewallPluginV2DbTestCase(base.NeutronDbPluginV2TestCase):
'source_port': SOURCE_PORT,
'destination_port': DESTINATION_PORT,
'action': ACTION,
'enabled': ENABLED}
'enabled': ENABLED,
'shared': SHARED}
return attrs
def _get_test_firewall_policy_attrs(self, name='firewall_policy1',
@ -149,9 +149,9 @@ class FirewallPluginV2DbTestCase(base.NeutronDbPluginV2TestCase):
'description': DESCRIPTION,
'tenant_id': self._tenant_id,
'project_id': self._tenant_id,
'public': PUBLIC,
'firewall_rules': [],
'audited': audited}
'audited': audited,
'shared': SHARED}
return attrs
def _get_test_firewall_group_attrs(self, name='firewall_1',
@ -164,7 +164,7 @@ class FirewallPluginV2DbTestCase(base.NeutronDbPluginV2TestCase):
return attrs
def _create_firewall_policy(self, fmt, name, description, public,
def _create_firewall_policy(self, fmt, name, description, shared,
firewall_rules, audited,
expected_res_status=None, **kwargs):
tenant_id = kwargs.get('tenant_id', self._tenant_id)
@ -172,9 +172,9 @@ class FirewallPluginV2DbTestCase(base.NeutronDbPluginV2TestCase):
'description': description,
'tenant_id': tenant_id,
'project_id': tenant_id,
'public': public,
'firewall_rules': firewall_rules,
'audited': audited}}
'audited': audited,
'shared': shared}}
fw_policy_req = self.new_create_request('firewall_policies', data, fmt)
fw_policy_res = fw_policy_req.get_response(self.ext_api)
@ -190,16 +190,15 @@ class FirewallPluginV2DbTestCase(base.NeutronDbPluginV2TestCase):
@contextlib.contextmanager
def firewall_policy(self, fmt=None, name='firewall_policy1',
description=DESCRIPTION, public=True,
description=DESCRIPTION, shared=SHARED,
firewall_rules=None, audited=True,
do_delete=True, **kwargs):
if firewall_rules is None:
firewall_rules = []
if not fmt:
fmt = self.fmt
res = self._create_firewall_policy(fmt, name, description, public,
firewall_rules, audited,
**kwargs)
res = self._create_firewall_policy(fmt, name, description, shared,
firewall_rules, audited, **kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
firewall_policy = self.deserialize(fmt or self.fmt, res)
@ -208,7 +207,7 @@ class FirewallPluginV2DbTestCase(base.NeutronDbPluginV2TestCase):
self._delete('firewall_policies',
firewall_policy['firewall_policy']['id'])
def _create_firewall_rule(self, fmt, name, public, protocol,
def _create_firewall_rule(self, fmt, name, shared, protocol,
ip_version, source_ip_address,
destination_ip_address, source_port,
destination_port, action, enabled,
@ -217,7 +216,6 @@ class FirewallPluginV2DbTestCase(base.NeutronDbPluginV2TestCase):
data = {'firewall_rule': {'name': name,
'tenant_id': tenant_id,
'project_id': tenant_id,
'public': public,
'protocol': protocol,
'ip_version': ip_version,
'source_ip_address': source_ip_address,
@ -226,7 +224,8 @@ class FirewallPluginV2DbTestCase(base.NeutronDbPluginV2TestCase):
'source_port': source_port,
'destination_port': destination_port,
'action': action,
'enabled': enabled}}
'enabled': enabled,
'shared': shared}}
fw_rule_req = self.new_create_request('firewall_rules', data, fmt)
fw_rule_res = fw_rule_req.get_response(self.ext_api)
@ -237,7 +236,7 @@ class FirewallPluginV2DbTestCase(base.NeutronDbPluginV2TestCase):
@contextlib.contextmanager
def firewall_rule(self, fmt=None, name='firewall_rule1',
public=PUBLIC, protocol=PROTOCOL, ip_version=IP_VERSION,
shared=SHARED, protocol=PROTOCOL, ip_version=IP_VERSION,
source_ip_address=SOURCE_IP_ADDRESS_RAW,
destination_ip_address=DESTINATION_IP_ADDRESS_RAW,
source_port=SOURCE_PORT,
@ -246,7 +245,7 @@ class FirewallPluginV2DbTestCase(base.NeutronDbPluginV2TestCase):
do_delete=True, **kwargs):
if not fmt:
fmt = self.fmt
res = self._create_firewall_rule(fmt, name, public, protocol,
res = self._create_firewall_rule(fmt, name, shared, protocol,
ip_version, source_ip_address,
destination_ip_address,
source_port, destination_port,
@ -270,9 +269,8 @@ class FirewallPluginV2DbTestCase(base.NeutronDbPluginV2TestCase):
if default_policy:
res = self._create_firewall_policy(fmt, 'fwp',
description=DESCRIPTION,
public=True,
shared=SHARED,
firewall_rules=[],
tenant_id=tenant_id,
audited=AUDITED)
firewall_policy = self.deserialize(fmt or self.fmt, res)
fwp_id = firewall_policy["firewall_policy"]["id"]
@ -375,9 +373,9 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
name = "firewall_policy1"
attrs = self._get_test_firewall_policy_attrs(name)
with self.firewall_policy(name=name, public=PUBLIC,
firewall_rules=None,
audited=AUDITED) as firewall_policy:
with self.firewall_policy(name=name, shared=SHARED,
firewall_rules=None, audited=AUDITED
) as firewall_policy:
for k, v in six.iteritems(attrs):
self.assertEqual(v, firewall_policy['firewall_policy'][k])
@ -391,18 +389,18 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
fr = [fwr1, fwr2, fwr3]
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
attrs['firewall_rules'] = fw_rule_ids
with self.firewall_policy(name=name, public=PUBLIC,
with self.firewall_policy(name=name, shared=SHARED,
firewall_rules=fw_rule_ids,
audited=AUDITED) as fwp:
for k, v in six.iteritems(attrs):
self.assertEqual(v, fwp['firewall_policy'][k])
def test_create_admin_firewall_policy_with_other_tenant_rules(self):
with self.firewall_rule(public=False) as fr:
with self.firewall_rule(shared=False) as fr:
fw_rule_ids = [fr['firewall_rule']['id']]
res = self._create_firewall_policy(None, 'firewall_policy1',
description=DESCRIPTION,
public=PUBLIC,
shared=SHARED,
firewall_rules=fw_rule_ids,
audited=AUDITED,
tenant_id='admin-tenant')
@ -412,27 +410,28 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
with self.firewall_rule() as fwr:
fw_rule_ids = [fwr['firewall_rule']['id']]
with self.firewall_policy(firewall_rules=fw_rule_ids):
with self.firewall_policy(firewall_rules=fw_rule_ids,
public=PUBLIC) as fwp2:
with self.firewall_policy(shared=SHARED,
firewall_rules=fw_rule_ids) as fwp2:
self.assertEqual(
fwr['firewall_rule']['id'],
fwp2['firewall_policy']['firewall_rules'][0])
def test_create_public_firewall_policy_with_nonpublic_rule(self):
with self.firewall_rule(public=False) as fwr:
def test_create_shared_firewall_policy_with_nonshared_rule(self):
with self.firewall_rule(shared=False) as fwr:
fw_rule_ids = [fwr['firewall_rule']['id']]
res = self._create_firewall_policy(
None, 'firewall_policy1', description=DESCRIPTION, public=True,
firewall_rules=fw_rule_ids, audited=AUDITED)
res = self._create_firewall_policy(None, 'firewall_policy1',
description=DESCRIPTION,
shared=SHARED,
firewall_rules=fw_rule_ids,
audited=AUDITED)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_show_firewall_policy(self):
name = "firewall_policy1"
attrs = self._get_test_firewall_policy_attrs(name)
with self.firewall_policy(name=name, public=PUBLIC,
firewall_rules=None,
audited=AUDITED) as fwp:
with self.firewall_policy(name=name, shared=SHARED,
firewall_rules=None, audited=AUDITED) as fwp:
req = self.new_show_request('firewall_policies',
fwp['firewall_policy']['id'],
fmt=self.fmt)
@ -453,8 +452,7 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
name = "new_firewall_policy1"
attrs = self._get_test_firewall_policy_attrs(name, audited=False)
with self.firewall_policy(public=PUBLIC,
firewall_rules=None,
with self.firewall_policy(shared=SHARED, firewall_rules=None,
audited=AUDITED) as fwp:
data = {'firewall_policy': {'name': name}}
req = self.new_update_request('firewall_policies', data,
@ -464,8 +462,7 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
self.assertEqual(v, res['firewall_policy'][k])
def _test_update_firewall_policy(self, with_audited):
with self.firewall_policy(name='firewall_policy1',
description='fwp',
with self.firewall_policy(name='firewall_policy1', description='fwp',
audited=AUDITED) as fwp:
attrs = self._get_test_firewall_policy_attrs(audited=with_audited)
data = {'firewall_policy':
@ -613,11 +610,11 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
for k, v in six.iteritems(attrs):
self.assertEqual(v, res['firewall_policy'][k])
def test_update_public_firewall_policy_with_nonpublic_rule(self):
with self.firewall_rule(name='fwr1', public=False) as fr:
def test_update_shared_firewall_policy_with_nonshared_rule(self):
with self.firewall_rule(name='fwr1', shared=False) as fr:
with self.firewall_policy() as fwp:
fw_rule_ids = [fr['firewall_rule']['id']]
# update public policy with nonpublic rule
# update shared policy with nonshared rule
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
req = self.new_update_request('firewall_policies', data,
@ -625,36 +622,36 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_update_firewall_policy_with_public_attr_nonpublic_rule(self):
with self.firewall_rule(name='fwr1', public=False) as fr:
with self.firewall_policy(public=False) as fwp:
def test_update_firewall_policy_with_shared_attr_nonshared_rule(self):
with self.firewall_rule(name='fwr1', shared=False) as fr:
with self.firewall_policy(shared=False) as fwp:
fw_rule_ids = [fr['firewall_rule']['id']]
# update public policy with public attr and nonpublic rule
data = {'firewall_policy': {'public': True,
# update shared policy with shared attr and nonshared rule
data = {'firewall_policy': {'shared': SHARED,
'firewall_rules': fw_rule_ids}}
req = self.new_update_request('firewall_policies', data,
fwp['firewall_policy']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_update_firewall_policy_with_public_attr_exist_unshare_rule(self):
with self.firewall_rule(name='fwr1', public=False) as fr:
def test_update_firewall_policy_with_shared_attr_exist_unshare_rule(self):
with self.firewall_rule(name='fwr1', shared=False) as fr:
fw_rule_ids = [fr['firewall_rule']['id']]
with self.firewall_policy(public=False,
with self.firewall_policy(shared=False,
firewall_rules=fw_rule_ids) as fwp:
# update policy with public attr
data = {'firewall_policy': {'public': True}}
# update policy with shared attr
data = {'firewall_policy': {'shared': SHARED}}
req = self.new_update_request('firewall_policies', data,
fwp['firewall_policy']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_update_firewall_policy_assoc_with_other_tenant_firewall(self):
with self.firewall_policy(public=True, tenant_id='tenant1') as fwp:
with self.firewall_policy(shared=SHARED, tenant_id='tenant1') as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall_group(ingress_firewall_policy_id=fwp_id,
egress_firewall_policy_id=fwp_id):
data = {'firewall_policy': {'public': False}}
data = {'firewall_policy': {'shared': False}}
req = self.new_update_request('firewall_policies', data,
fwp['firewall_policy']['id'])
res = req.get_response(self.ext_api)
@ -1012,11 +1009,10 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
@testtools.skip('bug/1614680')
def test_update_firewall_rule_associated_with_other_tenant_policy(self):
with self.firewall_rule(public=True, tenant_id='tenant1') as fwr:
with self.firewall_rule(shared=SHARED, tenant_id='tenant1') as fwr:
fwr_id = [fwr['firewall_rule']['id']]
with self.firewall_policy(public=False,
firewall_rules=fwr_id):
data = {'firewall_rule': {'public': False}}
with self.firewall_policy(shared=False, firewall_rules=fwr_id):
data = {'firewall_rule': {'shared': False}}
req = self.new_update_request('firewall_rules', data,
fwr['firewall_rule']['id'])
res = req.get_response(self.ext_api)
@ -1107,7 +1103,7 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
fmt = self.fmt
fwg_name = "firewall1"
description = "my_firewall1"
with self.firewall_policy(public=False, tenant_id='tenant2') as fwp:
with self.firewall_policy(shared=False, tenant_id='tenant2') as fwp:
fwp_id = fwp['firewall_policy']['id']
ctx = context.Context('not_admin', 'tenant1')
self._create_firewall_group(fmt, fwg_name,
@ -1121,7 +1117,7 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
fmt = self.fmt
fwg_name = "firewall1"
description = "my_firewall1"
with self.firewall_policy(public=False, tenant_id='tenant2') as fwp:
with self.firewall_policy(shared=False, tenant_id='tenant2') as fwp:
fwp_id = fwp['firewall_policy']['id']
ctx = context.get_admin_context()
self._create_firewall_group(fmt, fwg_name,
@ -1130,8 +1126,8 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
context=ctx,
expected_res_status=409)
def test_create_firewall_group_with_admin_and_fwp_is_public(self):
fwg_name = "fw_with_public_fwp"
def test_create_firewall_group_with_admin_and_fwp_is_shared(self):
fwg_name = "fw_with_shared_fwp"
with self.firewall_policy(tenant_id="tenantX") as fwp:
fwp_id = fwp['firewall_policy']['id']
ctx = context.get_admin_context()
@ -1217,10 +1213,10 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
def test_update_firewall_group_with_fwp(self):
ctx = context.Context('not_admin', 'tenant1')
with self.firewall_policy(
name='p1', tenant_id='tenant1', public=False) as fwp1, \
self.firewall_policy(
name='p2', tenant_id='tenant1', public=False) as fwp2, \
with self.firewall_policy(name='p1', tenant_id='tenant1',
shared=False) as fwp1, \
self.firewall_policy(name='p2', tenant_id='tenant1',
shared=False) as fwp2, \
self.firewall_group(
ingress_firewall_policy_id=fwp1['firewall_policy']['id'],
egress_firewall_policy_id=fwp2['firewall_policy']['id'],
@ -1234,10 +1230,12 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
self.assertEqual(200, res.status_int)
@testtools.skip('bug/1614680')
def test_update_firewall_group_with_public_fwp(self):
def test_update_firewall_group_with_shared_fwp(self):
ctx = context.Context('not_admin', 'tenant1')
with self.firewall_policy(name='p1', tenant_id='tenant1', public=True) as fwp1, \
self.firewall_policy(name='p2', tenant_id='tenant2', public=True) as fwp2, \
with self.firewall_policy(name='p1', tenant_id='tenant1',
shared=True) as fwp1, \
self.firewall_policy(name='p2', tenant_id='tenant2',
shared=True) as fwp2, \
self.firewall_group(
ingress_firewall_policy_id=fwp1['firewall_policy']['id'],
egress_firewall_policy_id=fwp1['firewall_policy']['id'],
@ -1253,8 +1251,8 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
def test_update_firewall_group_with_admin_and_fwp_different_tenant(self):
ctx = context.get_admin_context()
with self.firewall_policy() as fwp1, \
self.firewall_policy(
tenant_id='tenant2', public=False) as fwp2, \
self.firewall_policy(tenant_id='tenant2',
shared=False) as fwp2, \
self.firewall_group(
ingress_firewall_policy_id=fwp1['firewall_policy']['id'],
egress_firewall_policy_id=fwp1['firewall_policy']['id'],
@ -1270,7 +1268,7 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
def test_update_firewall_group_fwp_not_found_on_different_tenant(self):
with self.firewall_policy(name='fwp1', tenant_id='tenant1',
do_delete=False) as fwp1, \
self.firewall_policy(name='fwp2', public=False,
self.firewall_policy(name='fwp2', shared=False,
tenant_id='tenant2') as fwp2:
fwps = [fwp1, fwp2]
@ -1383,7 +1381,7 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
fwp_id = fwp['firewall_policy']['id']
msg = "Operation cannot be performed since Firewall Rule " \
"{0} is already associated with FirewallPolicy " \
"{1}".format(fwr_id, fwp_id)
"{1}.".format(fwr_id, fwp_id)
result = self._rule_action(
'insert', fwp_id, fwr_id,
insert_before=None,
@ -1446,7 +1444,7 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
expected_body=None)
def test_insert_rule_for_policy_of_other_tenant(self):
with self.firewall_rule(tenant_id='tenant-2', public=False) as fwr:
with self.firewall_rule(tenant_id='tenant-2', shared=False) as fwr:
fwr_id = fwr['firewall_rule']['id']
with self.firewall_policy(name='firewall_policy') as fwp:
fwp_id = fwp['firewall_policy']['id']
@ -1615,8 +1613,7 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
self.assertEqual('firewall_Rule1', res['firewall_rule']['name'])
def test_show_firewall_policy_by_name(self):
with self.firewall_policy(
name='firewall_Policy1') as fw_policy:
with self.firewall_policy(name='firewall_Policy1') as fw_policy:
res = self._show('firewall_policies',
fw_policy['firewall_policy']['id'])
self.assertEqual(

View File

@ -50,7 +50,6 @@ class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
project_id = _uuid()
data = {'firewall_rule': {'description': 'descr_firewall_rule1',
'name': 'rule1',
'public': False,
'protocol': 'tcp',
'ip_version': 4,
'source_ip_address': '192.168.0.1',
@ -59,7 +58,8 @@ class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
'destination_port': dst_port,
'action': 'allow',
'enabled': True,
'tenant_id': project_id}}
'tenant_id': project_id,
'shared': False}}
expected_ret_val = copy.copy(data['firewall_rule'])
expected_ret_val['source_port'] = str(src_port)
expected_ret_val['destination_port'] = str(dst_port)
@ -87,7 +87,6 @@ class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
def test_create_firewall_rule_invalid_long_name(self):
data = {'firewall_rule': {'description': 'descr_firewall_rule1',
'name': _long_name,
'public': False,
'protocol': 'tcp',
'ip_version': 4,
'source_ip_address': '192.168.0.1',
@ -96,7 +95,8 @@ class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
'destination_port': 1,
'action': 'allow',
'enabled': True,
'tenant_id': _uuid()}}
'tenant_id': _uuid(),
'shared': False}}
res = self.api.post(_get_path('fwaas/firewall_rules', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
@ -106,7 +106,6 @@ class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
def test_create_firewall_rule_invalid_long_description(self):
data = {'firewall_rule': {'description': _long_description,
'name': 'rule1',
'public': False,
'protocol': 'tcp',
'ip_version': 4,
'source_ip_address': '192.168.0.1',
@ -115,7 +114,8 @@ class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
'destination_port': 1,
'action': 'allow',
'enabled': True,
'tenant_id': _uuid()}}
'tenant_id': _uuid(),
'shared': False}}
res = self.api.post(_get_path('fwaas/firewall_rules', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
@ -126,7 +126,6 @@ class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
def test_create_firewall_rule_invalid_long_tenant_id(self):
data = {'firewall_rule': {'description': 'desc',
'name': 'rule1',
'public': False,
'protocol': 'tcp',
'ip_version': 4,
'source_ip_address': '192.168.0.1',
@ -135,7 +134,8 @@ class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
'destination_port': 1,
'action': 'allow',
'enabled': True,
'tenant_id': _long_tenant}}
'tenant_id': _long_tenant,
'shared': False}}
res = self.api.post(_get_path('fwaas/firewall_rules', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
@ -206,10 +206,10 @@ class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
project_id = _uuid()
data = {'firewall_policy': {'description': 'descr_firewall_policy1',
'name': 'new_fw_policy1',
'public': False,
'firewall_rules': [_uuid(), _uuid()],
'audited': False,
'tenant_id': project_id}}
'tenant_id': project_id,
'shared': False}}
return_value = copy.copy(data['firewall_policy'])
return_value.update({'id': policy_id})
@ -228,10 +228,10 @@ class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
def test_create_firewall_policy_invalid_long_name(self):
data = {'firewall_policy': {'description': 'descr_firewall_policy1',
'name': _long_name,
'public': False,
'firewall_rules': [_uuid(), _uuid()],
'audited': False,
'tenant_id': _uuid()}}
'tenant_id': _uuid(),
'shared': False}}
res = self.api.post(_get_path('fwaas/firewall_policies',
fmt=self.fmt),
self.serialize(data),
@ -242,10 +242,10 @@ class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
def test_create_firewall_policy_invalid_long_description(self):
data = {'firewall_policy': {'description': _long_description,
'name': 'new_fw_policy1',
'public': False,
'firewall_rules': [_uuid(), _uuid()],
'audited': False,
'tenant_id': _uuid()}}
'tenant_id': _uuid(),
'shared': False}}
res = self.api.post(_get_path('fwaas/firewall_policies',
fmt=self.fmt),
self.serialize(data),
@ -257,10 +257,10 @@ class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
def test_create_firewall_policy_invalid_long_tenant_id(self):
data = {'firewall_policy': {'description': 'desc',
'name': 'new_fw_policy1',
'public': False,
'firewall_rules': [_uuid(), _uuid()],
'audited': False,
'tenant_id': _long_tenant}}
'tenant_id': _long_tenant,
'shared': False}}
res = self.api.post(_get_path('fwaas/firewall_policies',
fmt=self.fmt),
self.serialize(data),
@ -399,11 +399,11 @@ class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
data = {'firewall_group': {'description': 'fake_description',
'name': 'fake_name',
'tenant_id': 'fake-tenant_id',
'public': False,
'ingress_firewall_policy_id': None,
'egress_firewall_policy_id': None,
'admin_state_up': True,
'ports': []}}
'ports': [],
'shared': False}}
data['firewall_group'].update(target)
res = self.api.post(_get_path('fwaas/firewall_groups',
fmt=self.fmt),