Merge "Fix pep8 new warnings and pip install error" into stable/queens

This commit is contained in:
Zuul 2018-04-30 18:01:31 +00:00 committed by Gerrit Code Review
commit 718cb9d13c
13 changed files with 43 additions and 716 deletions

View File

@ -20,13 +20,12 @@ Revises: start_neutron_fwaas
Create Date: 2015-02-10 17:17:47.846764
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = '4202e3047e47'
down_revision = 'start_neutron_fwaas'
from alembic import op
TABLES = ['firewall_rules', 'firewalls', 'firewall_policies']

View File

@ -21,14 +21,14 @@ Create Date: 2015-02-06 17:02:24.279337
"""
# revision identifiers, used by Alembic.
revision = '540142f314f4'
down_revision = '4202e3047e47'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.engine import reflection
# revision identifiers, used by Alembic.
revision = '540142f314f4'
down_revision = '4202e3047e47'
SQL_STATEMENT = (
"insert into firewall_router_associations "
"select "

View File

@ -21,15 +21,14 @@ Create Date: 2015-02-02 13:11:55.184112
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '796c68dffbb'
down_revision = '540142f314f4'
from alembic import op
import sqlalchemy as sa
def upgrade(active_plugins=None, options=None):
op.create_table('cisco_firewall_associations',

View File

@ -19,14 +19,14 @@ Create Date: 2017-03-31 14:22:21.063392
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'fd38cd995cc0'
down_revision = 'f83a0b2964d0'
depends_on = ('d6a12e637e28',)
from alembic import op
import sqlalchemy as sa
def upgrade():
op.alter_column('firewall_rules_v2', 'public', new_column_name='shared',

View File

@ -21,10 +21,6 @@ Create Date: 2017-01-26 23:47:42.795504
"""
# revision identifiers, used by Alembic.
revision = '876782258a43'
down_revision = 'd6a12e637e28'
from alembic import op
from neutron_lib.db import constants as db_constants
from neutron_lib import exceptions
@ -34,6 +30,10 @@ from neutron_fwaas._i18n import _
from neutron_fwaas.common import fwaas_constants as const
from neutron_fwaas.common import resources
# revision identifiers, used by Alembic.
revision = '876782258a43'
down_revision = 'd6a12e637e28'
class DuplicateDefaultFirewallGroup(exceptions.Conflict):
message = _("Duplicate Firewall group found named '%s'. "

View File

@ -21,16 +21,16 @@ Create Date: 2017-11-08 15:55:40.990272
"""
# revision identifiers, used by Alembic.
revision = 'f24e0d5e5bff'
down_revision = '876782258a43'
from alembic import op
from neutron_lib import exceptions
import sqlalchemy as sa
from neutron._i18n import _
# revision identifiers, used by Alembic.
revision = 'f24e0d5e5bff'
down_revision = '876782258a43'
fwg_port_association = sa.Table(
'firewall_group_port_associations_v2', sa.MetaData(),

View File

@ -363,9 +363,9 @@ class IptablesFwaasDriver(fwaas_base.FwaasDriverBase):
# iptables adds '-m protocol' when any source
# or destination port number is specified
if not((rule.get('source_port') is None)
and (rule.get('destination_port') is None)):
args += self._match_arg(rule.get('protocol'))
if (rule.get('source_port') is not None or
rule.get('destination_port') is not None):
args += self._match_arg(rule.get('protocol'))
args += self._port_arg('sport',
rule.get('protocol'),
@ -414,8 +414,7 @@ class IptablesFwaasDriver(fwaas_base.FwaasDriverBase):
return args
def _port_arg(self, direction, protocol, port):
if (protocol not in ['udp', 'tcp']
or port is None):
if protocol not in ['udp', 'tcp'] or port is None:
return []
args = ['--%s' % direction, '%s' % port]

View File

@ -394,8 +394,8 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
# iptables adds '-m protocol' when any source
# or destination port number is specified
if not((rule.get('source_port') is None)
and (rule.get('destination_port') is None)):
if (rule.get('source_port') is not None or
rule.get('destination_port') is not None):
args += self._match_arg(rule.get('protocol'))
args += self._port_arg('sport',
@ -445,8 +445,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
return args
def _port_arg(self, direction, protocol, port):
if (protocol not in ['udp', 'tcp']
or port is None):
if protocol not in ['udp', 'tcp'] or port is None:
return []
args = ['--%s' % direction, '%s' % port]

View File

@ -244,8 +244,8 @@ class FirewallPluginV2(
raise f_exc.FirewallGroupPortInvalidProject(
port_id=port_id, project_id=port_db['tenant_id'])
device_owner = port_db.get('device_owner', '')
if (device_owner not in [nl_constants.DEVICE_OWNER_ROUTER_INTF]
and not device_owner.startswith(
if (device_owner not in [nl_constants.DEVICE_OWNER_ROUTER_INTF] and
not device_owner.startswith(
nl_constants.DEVICE_OWNER_COMPUTE_PREFIX)):
raise f_exc.FirewallGroupPortInvalid(port_id=port_id)
if (device_owner.startswith(

View File

@ -28,8 +28,8 @@ class BaseTestCase(n_base.BaseTestCase):
class NeutronDbPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase):
def setup_config(self):
## Copied from neutron's test_db_base_plugin_v2 because they
## don't allow to specify args
# Copied from neutron's test_db_base_plugin_v2 because they
# don't allow to specify args
# Create the default configurations
args = ['--config-file', n_base.etcdir('neutron.conf')]
@ -37,7 +37,7 @@ class NeutronDbPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase):
for config_file in test_lib.test_config.get('config_files', []):
args.extend(['--config-file', config_file])
## our own stuff
# our own stuff
dirpath = os.path.join(os.path.dirname(__file__),
'etc/neutron/policy.d')
args.extend(['--config-dir', dirpath])

View File

@ -87,7 +87,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
cls.volumes_client = cls.os_primary.volumes_client
cls.snapshots_client = cls.os_primary.snapshots_client
# ## Test functions library
# Test functions library
#
# The create_[resource] functions only return body and discard the
# resp part which is not used in scenario tests
@ -680,8 +680,9 @@ class ScenarioTest(tempest.test.BaseTestCase):
addresses = (server['addresses'][network['name']]
if network else [])
for address in addresses:
if (address['version'] == CONF.validation.ip_version_for_ssh
and address['OS-EXT-IPS:type'] == 'fixed'):
if (address['version'] ==
CONF.validation.ip_version_for_ssh and
address['OS-EXT-IPS:type'] == 'fixed'):
return address['addr']
raise exceptions.ServerUnreachable(server_id=server['id'])
else:
@ -813,8 +814,8 @@ class NetworkScenarioTest(ScenarioTest):
port_map = [(p["id"], fxip["ip_address"])
for p in ports
for fxip in p["fixed_ips"]
if netutils.is_valid_ipv4(fxip["ip_address"])
and p['status'] in p_status]
if (netutils.is_valid_ipv4(fxip["ip_address"]) and
p['status'] in p_status)]
inactive = [p for p in ports if p['status'] != 'ACTIVE']
if inactive:
LOG.warning("Instance has ports that are not ACTIVE: %s", inactive)

View File

@ -1,675 +0,0 @@
# Copyright 2013 Big Switch Networks, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import mock
from neutron.tests import base
from neutron.tests.unit.api.v2 import test_base as test_api_v2
from neutron.tests.unit.extensions import base as test_api_v2_extension
from neutron_lib.db import constants as db_const
from neutron_lib.exceptions import firewall_v1 as f_exc
from oslo_utils import uuidutils
from webob import exc
import webtest
from neutron_fwaas.common import fwaas_constants
from neutron_fwaas.extensions import firewall
_uuid = uuidutils.generate_uuid
_get_path = test_api_v2._get_path
_long_name = 'x' * (db_const.NAME_FIELD_SIZE + 1)
_long_description = 'y' * (db_const.DESCRIPTION_FIELD_SIZE + 1)
class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
fmt = 'json'
def setUp(self):
super(FirewallExtensionTestCase, self).setUp()
plural_mappings = {'firewall_policy': 'firewall_policies'}
self._setUpExtension(
'neutron_fwaas.extensions.firewall.FirewallPluginBase',
fwaas_constants.FIREWALL, firewall.RESOURCE_ATTRIBUTE_MAP,
firewall.Firewall, 'fw', plural_mappings=plural_mappings)
def test_create_firewall(self):
fw_id = _uuid()
project_id = _uuid()
data = {'firewall': {'description': 'descr_firewall1',
'name': 'firewall1',
'admin_state_up': True,
'firewall_policy_id': _uuid(),
'shared': False,
'project_id': project_id,
'tenant_id': project_id}}
return_value = copy.copy(data['firewall'])
return_value.update({'id': fw_id})
# since 'shared' is hidden
del return_value['shared']
instance = self.plugin.return_value
instance.create_firewall.return_value = return_value
res = self.api.post(_get_path('fw/firewalls', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt)
instance.create_firewall.assert_called_with(mock.ANY,
firewall=data)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall', res)
self.assertEqual(return_value, res['firewall'])
def test_create_firewall_invalid_long_name(self):
project_id = _uuid()
data = {'firewall': {'description': 'descr_firewall1',
'name': _long_name,
'admin_state_up': True,
'firewall_policy_id': _uuid(),
'shared': False,
'project_id': project_id,
'tenant_id': project_id}}
res = self.api.post(_get_path('fw/firewalls', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
status=exc.HTTPBadRequest.code)
self.assertIn('Invalid input for name', res.body.decode('utf-8'))
def test_create_firewall_invalid_long_description(self):
project_id = _uuid()
data = {'firewall': {'description': _long_description,
'name': 'firewall1',
'admin_state_up': True,
'firewall_policy_id': _uuid(),
'shared': False,
'project_id': project_id,
'tenant_id': project_id}}
res = self.api.post(_get_path('fw/firewalls', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
status=exc.HTTPBadRequest.code)
self.assertIn('Invalid input for description',
res.body.decode('utf-8'))
def test_firewall_list(self):
fw_id = _uuid()
return_value = [{'tenant_id': _uuid(),
'id': fw_id}]
instance = self.plugin.return_value
instance.get_firewalls.return_value = return_value
res = self.api.get(_get_path('fw/firewalls', fmt=self.fmt))
instance.get_firewalls.assert_called_with(mock.ANY,
fields=mock.ANY,
filters=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
def test_firewall_get(self):
fw_id = _uuid()
return_value = {'tenant_id': _uuid(),
'id': fw_id}
instance = self.plugin.return_value
instance.get_firewall.return_value = return_value
res = self.api.get(_get_path('fw/firewalls',
id=fw_id, fmt=self.fmt))
instance.get_firewall.assert_called_with(mock.ANY,
fw_id,
fields=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall', res)
self.assertEqual(return_value, res['firewall'])
def test_firewall_update(self):
fw_id = _uuid()
update_data = {'firewall': {'name': 'new_name'}}
return_value = {'tenant_id': _uuid(),
'id': fw_id}
instance = self.plugin.return_value
instance.update_firewall.return_value = return_value
res = self.api.put(_get_path('fw/firewalls', id=fw_id,
fmt=self.fmt),
self.serialize(update_data))
instance.update_firewall.assert_called_with(mock.ANY, fw_id,
firewall=update_data)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall', res)
self.assertEqual(return_value, res['firewall'])
def test_firewall_delete(self):
self._test_entity_delete('firewall')
def _test_create_firewall_rule(self, src_port, dst_port):
rule_id = _uuid()
project_id = _uuid()
data = {'firewall_rule': {'description': 'descr_firewall_rule1',
'name': 'rule1',
'shared': False,
'protocol': 'tcp',
'ip_version': 4,
'source_ip_address': '192.168.0.1',
'destination_ip_address': '127.0.0.1',
'source_port': src_port,
'destination_port': dst_port,
'action': 'allow',
'enabled': True,
'project_id': project_id,
'tenant_id': project_id}}
expected_ret_val = copy.copy(data['firewall_rule'])
expected_ret_val['source_port'] = str(src_port)
expected_ret_val['destination_port'] = str(dst_port)
expected_call_args = copy.copy(expected_ret_val)
expected_ret_val['id'] = rule_id
instance = self.plugin.return_value
instance.create_firewall_rule.return_value = expected_ret_val
res = self.api.post(_get_path('fw/firewall_rules', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt)
instance.create_firewall_rule.assert_called_with(
mock.ANY,
firewall_rule={'firewall_rule': expected_call_args})
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall_rule', res)
self.assertEqual(expected_ret_val, res['firewall_rule'])
def test_create_firewall_rule_with_integer_ports(self):
self._test_create_firewall_rule(1, 10)
def test_create_firewall_rule_with_string_ports(self):
self._test_create_firewall_rule('1', '10')
def test_create_firewall_rule_with_port_range(self):
self._test_create_firewall_rule('1:20', '30:40')
def test_create_firewall_rule_invalid_long_name(self):
project_id = _uuid()
data = {'firewall_rule': {'description': 'descr_firewall_rule1',
'name': _long_name,
'shared': False,
'protocol': 'tcp',
'ip_version': 4,
'source_ip_address': '192.168.0.1',
'destination_ip_address': '127.0.0.1',
'source_port': 1,
'destination_port': 1,
'action': 'allow',
'enabled': True,
'project_id': project_id,
'tenant_id': project_id}}
res = self.api.post(_get_path('fw/firewall_rules', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
status=exc.HTTPBadRequest.code)
self.assertIn('Invalid input for name', res.body.decode('utf-8'))
def test_create_firewall_rule_invalid_long_description(self):
project_id = _uuid()
data = {'firewall_rule': {'description': _long_description,
'name': 'rule1',
'shared': False,
'protocol': 'tcp',
'ip_version': 4,
'source_ip_address': '192.168.0.1',
'destination_ip_address': '127.0.0.1',
'source_port': 1,
'destination_port': 1,
'action': 'allow',
'enabled': True,
'project_id': project_id,
'tenant_id': project_id}}
res = self.api.post(_get_path('fw/firewall_rules', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
status=exc.HTTPBadRequest.code)
self.assertIn('Invalid input for description',
res.body.decode('utf-8'))
def test_firewall_rule_list(self):
rule_id = _uuid()
return_value = [{'tenant_id': _uuid(),
'id': rule_id}]
instance = self.plugin.return_value
instance.get_firewall_rules.return_value = return_value
res = self.api.get(_get_path('fw/firewall_rules', fmt=self.fmt))
instance.get_firewall_rules.assert_called_with(mock.ANY,
fields=mock.ANY,
filters=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
def test_firewall_rule_get(self):
rule_id = _uuid()
return_value = {'tenant_id': _uuid(),
'id': rule_id}
instance = self.plugin.return_value
instance.get_firewall_rule.return_value = return_value
res = self.api.get(_get_path('fw/firewall_rules',
id=rule_id, fmt=self.fmt))
instance.get_firewall_rule.assert_called_with(mock.ANY,
rule_id,
fields=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall_rule', res)
self.assertEqual(return_value, res['firewall_rule'])
def test_firewall_rule_update(self):
rule_id = _uuid()
update_data = {'firewall_rule': {'action': 'deny'}}
return_value = {'tenant_id': _uuid(),
'id': rule_id}
instance = self.plugin.return_value
instance.update_firewall_rule.return_value = return_value
res = self.api.put(_get_path('fw/firewall_rules', id=rule_id,
fmt=self.fmt),
self.serialize(update_data))
instance.update_firewall_rule.assert_called_with(
mock.ANY,
rule_id,
firewall_rule=update_data)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall_rule', res)
self.assertEqual(return_value, res['firewall_rule'])
def test_firewall_rule_delete(self):
self._test_entity_delete('firewall_rule')
def test_create_firewall_policy(self):
policy_id = _uuid()
project_id = _uuid()
data = {'firewall_policy': {'description': 'descr_firewall_policy1',
'name': 'new_fw_policy1',
'shared': False,
'firewall_rules': [_uuid(), _uuid()],
'audited': False,
'project_id': project_id,
'tenant_id': project_id}}
return_value = copy.copy(data['firewall_policy'])
return_value.update({'id': policy_id})
instance = self.plugin.return_value
instance.create_firewall_policy.return_value = return_value
res = self.api.post(_get_path('fw/firewall_policies',
fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt)
instance.create_firewall_policy.assert_called_with(
mock.ANY,
firewall_policy=data)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall_policy', res)
self.assertEqual(return_value, res['firewall_policy'])
def test_create_firewall_policy_invalid_long_name(self):
project_id = _uuid()
data = {'firewall_policy': {'description': 'descr_firewall_policy1',
'name': _long_name,
'shared': False,
'firewall_rules': [_uuid(), _uuid()],
'audited': False,
'project_id': project_id,
'tenant_id': project_id}}
res = self.api.post(_get_path('fw/firewall_policies',
fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
status=exc.HTTPBadRequest.code)
self.assertIn('Invalid input for name', res.body.decode('utf-8'))
def test_create_firewall_policy_invalid_long_description(self):
project_id = _uuid()
data = {'firewall_policy': {'description': _long_description,
'name': 'new_fw_policy1',
'shared': False,
'firewall_rules': [_uuid(), _uuid()],
'audited': False,
'project_id': project_id,
'tenant_id': project_id}}
res = self.api.post(_get_path('fw/firewall_policies',
fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
status=exc.HTTPBadRequest.code)
self.assertIn('Invalid input for description',
res.body.decode('utf-8'))
def test_firewall_policy_list(self):
policy_id = _uuid()
return_value = [{'tenant_id': _uuid(),
'id': policy_id}]
instance = self.plugin.return_value
instance.get_firewall_policies.return_value = return_value
res = self.api.get(_get_path('fw/firewall_policies',
fmt=self.fmt))
instance.get_firewall_policies.assert_called_with(mock.ANY,
fields=mock.ANY,
filters=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
def test_firewall_policy_get(self):
policy_id = _uuid()
return_value = {'tenant_id': _uuid(),
'id': policy_id}
instance = self.plugin.return_value
instance.get_firewall_policy.return_value = return_value
res = self.api.get(_get_path('fw/firewall_policies',
id=policy_id, fmt=self.fmt))
instance.get_firewall_policy.assert_called_with(mock.ANY,
policy_id,
fields=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall_policy', res)
self.assertEqual(return_value, res['firewall_policy'])
def test_firewall_policy_update(self):
policy_id = _uuid()
update_data = {'firewall_policy': {'audited': True}}
return_value = {'tenant_id': _uuid(),
'id': policy_id}
instance = self.plugin.return_value
instance.update_firewall_policy.return_value = return_value
res = self.api.put(_get_path('fw/firewall_policies',
id=policy_id,
fmt=self.fmt),
self.serialize(update_data))
instance.update_firewall_policy.assert_called_with(
mock.ANY,
policy_id,
firewall_policy=update_data)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall_policy', res)
self.assertEqual(return_value, res['firewall_policy'])
def test_firewall_policy_update_malformed_rules(self):
# emulating client request when no rule uuids are provided for
# --firewall_rules parameter
update_data = {'firewall_policy': {'firewall_rules': True}}
# have to check for generic AppError
self.assertRaises(
webtest.AppError,
self.api.put,
_get_path('fw/firewall_policies', id=_uuid(), fmt=self.fmt),
self.serialize(update_data))
def test_firewall_policy_delete(self):
self._test_entity_delete('firewall_policy')
def test_firewall_policy_insert_rule(self):
firewall_policy_id = _uuid()
firewall_rule_id = _uuid()
ref_firewall_rule_id = _uuid()
insert_data = {'firewall_rule_id': firewall_rule_id,
'insert_before': ref_firewall_rule_id,
'insert_after': None}
return_value = {'firewall_policy':
{'tenant_id': _uuid(),
'id': firewall_policy_id,
'firewall_rules': [ref_firewall_rule_id,
firewall_rule_id]}}
instance = self.plugin.return_value
instance.insert_rule.return_value = return_value
path = _get_path('fw/firewall_policies', id=firewall_policy_id,
action="insert_rule",
fmt=self.fmt)
res = self.api.put(path, self.serialize(insert_data))
instance.insert_rule.assert_called_with(mock.ANY, firewall_policy_id,
insert_data)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertEqual(return_value, res)
def test_firewall_policy_remove_rule(self):
firewall_policy_id = _uuid()
firewall_rule_id = _uuid()
remove_data = {'firewall_rule_id': firewall_rule_id}
return_value = {'firewall_policy':
{'tenant_id': _uuid(),
'id': firewall_policy_id,
'firewall_rules': []}}
instance = self.plugin.return_value
instance.remove_rule.return_value = return_value
path = _get_path('fw/firewall_policies', id=firewall_policy_id,
action="remove_rule",
fmt=self.fmt)
res = self.api.put(path, self.serialize(remove_data))
instance.remove_rule.assert_called_with(mock.ANY, firewall_policy_id,
remove_data)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertEqual(return_value, res)
class TestFirewallAttributeValidators(base.BaseTestCase):
def test_validate_port_range(self):
msg = firewall._validate_port_range(None)
self.assertIsNone(msg)
msg = firewall._validate_port_range('10')
self.assertIsNone(msg)
msg = firewall._validate_port_range(10)
self.assertIsNone(msg)
msg = firewall._validate_port_range(-1)
self.assertEqual("Invalid port '-1'", msg)
msg = firewall._validate_port_range('66000')
self.assertEqual("Invalid port '66000'", msg)
msg = firewall._validate_port_range('10:20')
self.assertIsNone(msg)
msg = firewall._validate_port_range('1:65535')
self.assertIsNone(msg)
msg = firewall._validate_port_range('0:65535')
self.assertEqual("Invalid port '0'", msg)
msg = firewall._validate_port_range('1:65536')
self.assertEqual("Invalid port '65536'", msg)
msg = firewall._validate_port_range('abc:efg')
self.assertEqual("Port 'abc' is not a valid number", msg)
msg = firewall._validate_port_range('1:efg')
self.assertEqual("Port 'efg' is not a valid number", msg)
msg = firewall._validate_port_range('-1:10')
self.assertEqual("Invalid port '-1'", msg)
msg = firewall._validate_port_range('66000:10')
self.assertEqual("Invalid port '66000'", msg)
msg = firewall._validate_port_range('10:66000')
self.assertEqual("Invalid port '66000'", msg)
msg = firewall._validate_port_range('1:-10')
self.assertEqual("Invalid port '-10'", msg)
def test_validate_ip_or_subnet_or_none(self):
msg = firewall._validate_ip_or_subnet_or_none(None)
self.assertIsNone(msg)
msg = firewall._validate_ip_or_subnet_or_none('1.1.1.1')
self.assertIsNone(msg)
msg = firewall._validate_ip_or_subnet_or_none('1.1.1.0/24')
self.assertIsNone(msg)
ip_addr = '1111.1.1.1'
msg = firewall._validate_ip_or_subnet_or_none(ip_addr)
self.assertEqual(
("'%s' is not a valid IP address and "
"'%s' is not a valid IP subnet")
% (ip_addr,
ip_addr),
msg)
ip_addr = '1.1.1.1 has whitespace'
msg = firewall._validate_ip_or_subnet_or_none(ip_addr)
self.assertEqual(
("'%s' is not a valid IP address and "
"'%s' is not a valid IP subnet") % (ip_addr,
ip_addr),
msg)
ip_addr = '111.1.1.1\twhitespace'
msg = firewall._validate_ip_or_subnet_or_none(ip_addr)
self.assertEqual(
("'%s' is not a valid IP address and "
"'%s' is not a valid IP subnet") % (ip_addr,
ip_addr),
msg)
ip_addr = '111.1.1.1\nwhitespace'
msg = firewall._validate_ip_or_subnet_or_none(ip_addr)
self.assertEqual(
("'%s' is not a valid IP address and "
"'%s' is not a valid IP subnet") % (ip_addr,
ip_addr),
msg)
# Valid - IPv4
cidr = "10.0.2.0/24"
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
self.assertIsNone(msg)
# Valid - IPv6 without final octets
cidr = "fe80::/24"
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
self.assertIsNone(msg)
# Valid - IPv6 with final octets
cidr = "fe80::0/24"
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
self.assertIsNone(msg)
cidr = "fe80::"
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
self.assertIsNone(msg)
# Invalid - IPv6 with final octets, missing mask
cidr = "fe80::0"
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
self.assertIsNone(msg)
# Invalid - Address format error
cidr = 'invalid'
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
self.assertEqual(
("'%s' is not a valid IP address and "
"'%s' is not a valid IP subnet") % (cidr,
cidr),
msg)
class TestFirewallConvertProtocols(base.BaseTestCase):
def test_convert_protocol_string_integer(self):
res = firewall.convert_protocol("0")
self.assertEqual(0, res)
res = firewall.convert_protocol("255")
self.assertEqual(255, res)
def test_convert_protocol_digit(self):
res = firewall.convert_protocol(0)
self.assertEqual(0, res)
res = firewall.convert_protocol(255)
self.assertEqual(255, res)
def test_convert_protocol_another_types(self):
res = lambda: firewall.convert_protocol(['abc'])
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
res = lambda: firewall.convert_protocol({1: 'foo'})
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
res = lambda: firewall.convert_protocol((1, 100))
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
res = lambda: firewall.convert_protocol(object)
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
def test_convert_protocol_invalid_digit(self):
res = lambda: firewall.convert_protocol("-1")
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
res = lambda: firewall.convert_protocol("256")
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
def test_convert_protocol_name(self):
res = firewall.convert_protocol("tcp")
self.assertEqual("tcp", res)
res = firewall.convert_protocol("UDP")
self.assertEqual("udp", res)
res = firewall.convert_protocol("Icmp")
self.assertEqual("icmp", res)
def test_convert_protocol_invalid_name(self):
res = lambda: firewall.convert_protocol("foo")
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
class TestConvertActionToCaseInsensitive(base.BaseTestCase):
def test_convert_action_to_case_insensitive_none(self):
res = firewall.convert_action_to_case_insensitive(None)
self.assertIsNone(res)
def test_convert_action_to_case_insensitive_value(self):
res = firewall.convert_action_to_case_insensitive("foo")
self.assertEqual("foo", res)
res = firewall.convert_action_to_case_insensitive("Bar")
self.assertEqual("bar", res)
res = firewall.convert_action_to_case_insensitive("BAZ")
self.assertEqual("baz", res)

View File

@ -24,6 +24,8 @@ set -x
install_cmd="pip install -c$1"
shift
PARAMS="$*"
# The devstack based functional tests have neutron checked out in
# $NEUTRON_DIR on the test systems - with the change to test in it.
# Use this directory if it exists, so that this script installs the
@ -66,5 +68,8 @@ else
$install_cmd -U -e $SRC_DIR
fi
$install_cmd -U $*
if [ -n "${PARAMS}" ]; then
$install_cmd -U ${PARAMS}
fi
exit $?