Fix pep8 new warnings and pip install error

This patch is a squash of two fix that are needed together to have a
successful zuul run.

1. pep8 fix (cherry-picked from master:
   87eb1b15be )

A recent change in pep/pycodingchecks introduced new warnings as part of
the pep8 target that causes pep8 to fail now.

This patch fixes code that issued warnings W503,E731,E266,E402

In addition, old tests which were not ran anyway, and are no longer
relevant (test_firewall) were deleted.

Conflicts:
	neutron_fwaas/tests/tempest_plugin/tests/scenario/manager.py

2. Avoid parameterless pip install

build-openstack-sphinx-docs jobs fail on all stable branches due to the
new pip version 10 introduces some previously Warning as Error: in case
of calling "pip install" without any package name, the command fails.
tox_install.sh is called during docs job without any package passed to
pip.

This patch also disables E402 hacking check because releasenotes/* violates it
(the issue is not present in Queens+ where
Ib19f7ea4ea136180f38bc78389f51b6b5d179ab8 is merged).

Conflicts:
      neutron_fwaas/db/migration/alembic_migrations/versions/queens/expand/876782258a43_create_default_firewall_groups_table.py
      neutron_fwaas/db/migration/alembic_migrations/versions/queens/expand/f24e0d5e5bff_uniq_firewallgroupportassociation0port.py
      neutron_fwaas/services/firewall/fwaas_plugin_v2.py

Change-Id: If893cf2bc136d09e09344a81fd996ac89479a135
(cherry picked from commit 58a62382e9)
This commit is contained in:
Adit Sarfaty 2018-04-11 11:04:09 +03:00 committed by Ihar Hrachyshka
parent d02659fc5d
commit 1d884eab72
11 changed files with 35 additions and 707 deletions

View File

@ -20,13 +20,12 @@ Revises: start_neutron_fwaas
Create Date: 2015-02-10 17:17:47.846764
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = '4202e3047e47'
down_revision = 'start_neutron_fwaas'
from alembic import op
TABLES = ['firewall_rules', 'firewalls', 'firewall_policies']

View File

@ -21,14 +21,14 @@ Create Date: 2015-02-06 17:02:24.279337
"""
# revision identifiers, used by Alembic.
revision = '540142f314f4'
down_revision = '4202e3047e47'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.engine import reflection
# revision identifiers, used by Alembic.
revision = '540142f314f4'
down_revision = '4202e3047e47'
SQL_STATEMENT = (
"insert into firewall_router_associations "
"select "

View File

@ -21,15 +21,14 @@ Create Date: 2015-02-02 13:11:55.184112
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '796c68dffbb'
down_revision = '540142f314f4'
from alembic import op
import sqlalchemy as sa
def upgrade(active_plugins=None, options=None):
op.create_table('cisco_firewall_associations',

View File

@ -19,14 +19,14 @@ Create Date: 2017-03-31 14:22:21.063392
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'fd38cd995cc0'
down_revision = 'f83a0b2964d0'
depends_on = ('d6a12e637e28',)
from alembic import op
import sqlalchemy as sa
def upgrade():
op.alter_column('firewall_rules_v2', 'public', new_column_name='shared',

View File

@ -379,9 +379,9 @@ class IptablesFwaasDriver(fwaas_base.FwaasDriverBase):
# iptables adds '-m protocol' when any source
# or destination port number is specified
if not((rule.get('source_port') is None)
and (rule.get('destination_port') is None)):
args += self._match_arg(rule.get('protocol'))
if (rule.get('source_port') is not None or
rule.get('destination_port') is not None):
args += self._match_arg(rule.get('protocol'))
args += self._port_arg('sport',
rule.get('protocol'),
@ -430,8 +430,7 @@ class IptablesFwaasDriver(fwaas_base.FwaasDriverBase):
return args
def _port_arg(self, direction, protocol, port):
if (protocol not in ['udp', 'tcp']
or port is None):
if protocol not in ['udp', 'tcp'] or port is None:
return []
args = ['--%s' % direction, '%s' % port]

View File

@ -431,8 +431,8 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
# iptables adds '-m protocol' when any source
# or destination port number is specified
if not((rule.get('source_port') is None)
and (rule.get('destination_port') is None)):
if (rule.get('source_port') is not None or
rule.get('destination_port') is not None):
args += self._match_arg(rule.get('protocol'))
args += self._port_arg('sport',
@ -482,8 +482,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
return args
def _port_arg(self, direction, protocol, port):
if (protocol not in ['udp', 'tcp']
or port is None):
if protocol not in ['udp', 'tcp'] or port is None:
return []
args = ['--%s' % direction, '%s' % port]

View File

@ -28,8 +28,8 @@ class BaseTestCase(n_base.BaseTestCase):
class NeutronDbPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase):
def setup_config(self):
## Copied from neutron's test_db_base_plugin_v2 because they
## don't allow to specify args
# Copied from neutron's test_db_base_plugin_v2 because they
# don't allow to specify args
# Create the default configurations
args = ['--config-file', n_base.etcdir('neutron.conf')]
@ -37,7 +37,7 @@ class NeutronDbPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase):
for config_file in test_lib.test_config.get('config_files', []):
args.extend(['--config-file', config_file])
## our own stuff
# our own stuff
dirpath = os.path.join(os.path.dirname(__file__),
'etc/neutron/policy.d')
args.extend(['--config-dir', dirpath])

View File

@ -87,7 +87,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
cls.volumes_client = cls.os_primary.volumes_client
cls.snapshots_client = cls.os_primary.snapshots_client
# ## Test functions library
# Test functions library
#
# The create_[resource] functions only return body and discard the
# resp part which is not used in scenario tests
@ -680,8 +680,9 @@ class ScenarioTest(tempest.test.BaseTestCase):
addresses = (server['addresses'][network['name']]
if network else [])
for address in addresses:
if (address['version'] == CONF.validation.ip_version_for_ssh
and address['OS-EXT-IPS:type'] == 'fixed'):
if (address['version'] ==
CONF.validation.ip_version_for_ssh and
address['OS-EXT-IPS:type'] == 'fixed'):
return address['addr']
raise exceptions.ServerUnreachable(server_id=server['id'])
else:
@ -813,8 +814,8 @@ class NetworkScenarioTest(ScenarioTest):
port_map = [(p["id"], fxip["ip_address"])
for p in ports
for fxip in p["fixed_ips"]
if netutils.is_valid_ipv4(fxip["ip_address"])
and p['status'] in p_status]
if (netutils.is_valid_ipv4(fxip["ip_address"]) and
p['status'] in p_status)]
inactive = [p for p in ports if p['status'] != 'ACTIVE']
if inactive:
LOG.warning("Instance has ports that are not ACTIVE: %s", inactive)

View File

@ -1,675 +0,0 @@
# Copyright 2013 Big Switch Networks, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import mock
from neutron.tests import base
from neutron.tests.unit.api.v2 import test_base as test_api_v2
from neutron.tests.unit.extensions import base as test_api_v2_extension
from neutron_lib.db import constants as db_const
from neutron_lib.exceptions import firewall_v1 as f_exc
from oslo_utils import uuidutils
from webob import exc
import webtest
from neutron_fwaas.common import fwaas_constants
from neutron_fwaas.extensions import firewall
_uuid = uuidutils.generate_uuid
_get_path = test_api_v2._get_path
_long_name = 'x' * (db_const.NAME_FIELD_SIZE + 1)
_long_description = 'y' * (db_const.DESCRIPTION_FIELD_SIZE + 1)
class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
fmt = 'json'
def setUp(self):
super(FirewallExtensionTestCase, self).setUp()
plural_mappings = {'firewall_policy': 'firewall_policies'}
self._setUpExtension(
'neutron_fwaas.extensions.firewall.FirewallPluginBase',
fwaas_constants.FIREWALL, firewall.RESOURCE_ATTRIBUTE_MAP,
firewall.Firewall, 'fw', plural_mappings=plural_mappings)
def test_create_firewall(self):
fw_id = _uuid()
project_id = _uuid()
data = {'firewall': {'description': 'descr_firewall1',
'name': 'firewall1',
'admin_state_up': True,
'firewall_policy_id': _uuid(),
'shared': False,
'project_id': project_id,
'tenant_id': project_id}}
return_value = copy.copy(data['firewall'])
return_value.update({'id': fw_id})
# since 'shared' is hidden
del return_value['shared']
instance = self.plugin.return_value
instance.create_firewall.return_value = return_value
res = self.api.post(_get_path('fw/firewalls', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt)
instance.create_firewall.assert_called_with(mock.ANY,
firewall=data)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall', res)
self.assertEqual(return_value, res['firewall'])
def test_create_firewall_invalid_long_name(self):
project_id = _uuid()
data = {'firewall': {'description': 'descr_firewall1',
'name': _long_name,
'admin_state_up': True,
'firewall_policy_id': _uuid(),
'shared': False,
'project_id': project_id,
'tenant_id': project_id}}
res = self.api.post(_get_path('fw/firewalls', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
status=exc.HTTPBadRequest.code)
self.assertIn('Invalid input for name', res.body.decode('utf-8'))
def test_create_firewall_invalid_long_description(self):
project_id = _uuid()
data = {'firewall': {'description': _long_description,
'name': 'firewall1',
'admin_state_up': True,
'firewall_policy_id': _uuid(),
'shared': False,
'project_id': project_id,
'tenant_id': project_id}}
res = self.api.post(_get_path('fw/firewalls', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
status=exc.HTTPBadRequest.code)
self.assertIn('Invalid input for description',
res.body.decode('utf-8'))
def test_firewall_list(self):
fw_id = _uuid()
return_value = [{'tenant_id': _uuid(),
'id': fw_id}]
instance = self.plugin.return_value
instance.get_firewalls.return_value = return_value
res = self.api.get(_get_path('fw/firewalls', fmt=self.fmt))
instance.get_firewalls.assert_called_with(mock.ANY,
fields=mock.ANY,
filters=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
def test_firewall_get(self):
fw_id = _uuid()
return_value = {'tenant_id': _uuid(),
'id': fw_id}
instance = self.plugin.return_value
instance.get_firewall.return_value = return_value
res = self.api.get(_get_path('fw/firewalls',
id=fw_id, fmt=self.fmt))
instance.get_firewall.assert_called_with(mock.ANY,
fw_id,
fields=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall', res)
self.assertEqual(return_value, res['firewall'])
def test_firewall_update(self):
fw_id = _uuid()
update_data = {'firewall': {'name': 'new_name'}}
return_value = {'tenant_id': _uuid(),
'id': fw_id}
instance = self.plugin.return_value
instance.update_firewall.return_value = return_value
res = self.api.put(_get_path('fw/firewalls', id=fw_id,
fmt=self.fmt),
self.serialize(update_data))
instance.update_firewall.assert_called_with(mock.ANY, fw_id,
firewall=update_data)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall', res)
self.assertEqual(return_value, res['firewall'])
def test_firewall_delete(self):
self._test_entity_delete('firewall')
def _test_create_firewall_rule(self, src_port, dst_port):
rule_id = _uuid()
project_id = _uuid()
data = {'firewall_rule': {'description': 'descr_firewall_rule1',
'name': 'rule1',
'shared': False,
'protocol': 'tcp',
'ip_version': 4,
'source_ip_address': '192.168.0.1',
'destination_ip_address': '127.0.0.1',
'source_port': src_port,
'destination_port': dst_port,
'action': 'allow',
'enabled': True,
'project_id': project_id,
'tenant_id': project_id}}
expected_ret_val = copy.copy(data['firewall_rule'])
expected_ret_val['source_port'] = str(src_port)
expected_ret_val['destination_port'] = str(dst_port)
expected_call_args = copy.copy(expected_ret_val)
expected_ret_val['id'] = rule_id
instance = self.plugin.return_value
instance.create_firewall_rule.return_value = expected_ret_val
res = self.api.post(_get_path('fw/firewall_rules', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt)
instance.create_firewall_rule.assert_called_with(
mock.ANY,
firewall_rule={'firewall_rule': expected_call_args})
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall_rule', res)
self.assertEqual(expected_ret_val, res['firewall_rule'])
def test_create_firewall_rule_with_integer_ports(self):
self._test_create_firewall_rule(1, 10)
def test_create_firewall_rule_with_string_ports(self):
self._test_create_firewall_rule('1', '10')
def test_create_firewall_rule_with_port_range(self):
self._test_create_firewall_rule('1:20', '30:40')
def test_create_firewall_rule_invalid_long_name(self):
project_id = _uuid()
data = {'firewall_rule': {'description': 'descr_firewall_rule1',
'name': _long_name,
'shared': False,
'protocol': 'tcp',
'ip_version': 4,
'source_ip_address': '192.168.0.1',
'destination_ip_address': '127.0.0.1',
'source_port': 1,
'destination_port': 1,
'action': 'allow',
'enabled': True,
'project_id': project_id,
'tenant_id': project_id}}
res = self.api.post(_get_path('fw/firewall_rules', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
status=exc.HTTPBadRequest.code)
self.assertIn('Invalid input for name', res.body.decode('utf-8'))
def test_create_firewall_rule_invalid_long_description(self):
project_id = _uuid()
data = {'firewall_rule': {'description': _long_description,
'name': 'rule1',
'shared': False,
'protocol': 'tcp',
'ip_version': 4,
'source_ip_address': '192.168.0.1',
'destination_ip_address': '127.0.0.1',
'source_port': 1,
'destination_port': 1,
'action': 'allow',
'enabled': True,
'project_id': project_id,
'tenant_id': project_id}}
res = self.api.post(_get_path('fw/firewall_rules', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
status=exc.HTTPBadRequest.code)
self.assertIn('Invalid input for description',
res.body.decode('utf-8'))
def test_firewall_rule_list(self):
rule_id = _uuid()
return_value = [{'tenant_id': _uuid(),
'id': rule_id}]
instance = self.plugin.return_value
instance.get_firewall_rules.return_value = return_value
res = self.api.get(_get_path('fw/firewall_rules', fmt=self.fmt))
instance.get_firewall_rules.assert_called_with(mock.ANY,
fields=mock.ANY,
filters=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
def test_firewall_rule_get(self):
rule_id = _uuid()
return_value = {'tenant_id': _uuid(),
'id': rule_id}
instance = self.plugin.return_value
instance.get_firewall_rule.return_value = return_value
res = self.api.get(_get_path('fw/firewall_rules',
id=rule_id, fmt=self.fmt))
instance.get_firewall_rule.assert_called_with(mock.ANY,
rule_id,
fields=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall_rule', res)
self.assertEqual(return_value, res['firewall_rule'])
def test_firewall_rule_update(self):
rule_id = _uuid()
update_data = {'firewall_rule': {'action': 'deny'}}
return_value = {'tenant_id': _uuid(),
'id': rule_id}
instance = self.plugin.return_value
instance.update_firewall_rule.return_value = return_value
res = self.api.put(_get_path('fw/firewall_rules', id=rule_id,
fmt=self.fmt),
self.serialize(update_data))
instance.update_firewall_rule.assert_called_with(
mock.ANY,
rule_id,
firewall_rule=update_data)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall_rule', res)
self.assertEqual(return_value, res['firewall_rule'])
def test_firewall_rule_delete(self):
self._test_entity_delete('firewall_rule')
def test_create_firewall_policy(self):
policy_id = _uuid()
project_id = _uuid()
data = {'firewall_policy': {'description': 'descr_firewall_policy1',
'name': 'new_fw_policy1',
'shared': False,
'firewall_rules': [_uuid(), _uuid()],
'audited': False,
'project_id': project_id,
'tenant_id': project_id}}
return_value = copy.copy(data['firewall_policy'])
return_value.update({'id': policy_id})
instance = self.plugin.return_value
instance.create_firewall_policy.return_value = return_value
res = self.api.post(_get_path('fw/firewall_policies',
fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt)
instance.create_firewall_policy.assert_called_with(
mock.ANY,
firewall_policy=data)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall_policy', res)
self.assertEqual(return_value, res['firewall_policy'])
def test_create_firewall_policy_invalid_long_name(self):
project_id = _uuid()
data = {'firewall_policy': {'description': 'descr_firewall_policy1',
'name': _long_name,
'shared': False,
'firewall_rules': [_uuid(), _uuid()],
'audited': False,
'project_id': project_id,
'tenant_id': project_id}}
res = self.api.post(_get_path('fw/firewall_policies',
fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
status=exc.HTTPBadRequest.code)
self.assertIn('Invalid input for name', res.body.decode('utf-8'))
def test_create_firewall_policy_invalid_long_description(self):
project_id = _uuid()
data = {'firewall_policy': {'description': _long_description,
'name': 'new_fw_policy1',
'shared': False,
'firewall_rules': [_uuid(), _uuid()],
'audited': False,
'project_id': project_id,
'tenant_id': project_id}}
res = self.api.post(_get_path('fw/firewall_policies',
fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
status=exc.HTTPBadRequest.code)
self.assertIn('Invalid input for description',
res.body.decode('utf-8'))
def test_firewall_policy_list(self):
policy_id = _uuid()
return_value = [{'tenant_id': _uuid(),
'id': policy_id}]
instance = self.plugin.return_value
instance.get_firewall_policies.return_value = return_value
res = self.api.get(_get_path('fw/firewall_policies',
fmt=self.fmt))
instance.get_firewall_policies.assert_called_with(mock.ANY,
fields=mock.ANY,
filters=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
def test_firewall_policy_get(self):
policy_id = _uuid()
return_value = {'tenant_id': _uuid(),
'id': policy_id}
instance = self.plugin.return_value
instance.get_firewall_policy.return_value = return_value
res = self.api.get(_get_path('fw/firewall_policies',
id=policy_id, fmt=self.fmt))
instance.get_firewall_policy.assert_called_with(mock.ANY,
policy_id,
fields=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall_policy', res)
self.assertEqual(return_value, res['firewall_policy'])
def test_firewall_policy_update(self):
policy_id = _uuid()
update_data = {'firewall_policy': {'audited': True}}
return_value = {'tenant_id': _uuid(),
'id': policy_id}
instance = self.plugin.return_value
instance.update_firewall_policy.return_value = return_value
res = self.api.put(_get_path('fw/firewall_policies',
id=policy_id,
fmt=self.fmt),
self.serialize(update_data))
instance.update_firewall_policy.assert_called_with(
mock.ANY,
policy_id,
firewall_policy=update_data)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('firewall_policy', res)
self.assertEqual(return_value, res['firewall_policy'])
def test_firewall_policy_update_malformed_rules(self):
# emulating client request when no rule uuids are provided for
# --firewall_rules parameter
update_data = {'firewall_policy': {'firewall_rules': True}}
# have to check for generic AppError
self.assertRaises(
webtest.AppError,
self.api.put,
_get_path('fw/firewall_policies', id=_uuid(), fmt=self.fmt),
self.serialize(update_data))
def test_firewall_policy_delete(self):
self._test_entity_delete('firewall_policy')
def test_firewall_policy_insert_rule(self):
firewall_policy_id = _uuid()
firewall_rule_id = _uuid()
ref_firewall_rule_id = _uuid()
insert_data = {'firewall_rule_id': firewall_rule_id,
'insert_before': ref_firewall_rule_id,
'insert_after': None}
return_value = {'firewall_policy':
{'tenant_id': _uuid(),
'id': firewall_policy_id,
'firewall_rules': [ref_firewall_rule_id,
firewall_rule_id]}}
instance = self.plugin.return_value
instance.insert_rule.return_value = return_value
path = _get_path('fw/firewall_policies', id=firewall_policy_id,
action="insert_rule",
fmt=self.fmt)
res = self.api.put(path, self.serialize(insert_data))
instance.insert_rule.assert_called_with(mock.ANY, firewall_policy_id,
insert_data)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertEqual(return_value, res)
def test_firewall_policy_remove_rule(self):
firewall_policy_id = _uuid()
firewall_rule_id = _uuid()
remove_data = {'firewall_rule_id': firewall_rule_id}
return_value = {'firewall_policy':
{'tenant_id': _uuid(),
'id': firewall_policy_id,
'firewall_rules': []}}
instance = self.plugin.return_value
instance.remove_rule.return_value = return_value
path = _get_path('fw/firewall_policies', id=firewall_policy_id,
action="remove_rule",
fmt=self.fmt)
res = self.api.put(path, self.serialize(remove_data))
instance.remove_rule.assert_called_with(mock.ANY, firewall_policy_id,
remove_data)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertEqual(return_value, res)
class TestFirewallAttributeValidators(base.BaseTestCase):
def test_validate_port_range(self):
msg = firewall._validate_port_range(None)
self.assertIsNone(msg)
msg = firewall._validate_port_range('10')
self.assertIsNone(msg)
msg = firewall._validate_port_range(10)
self.assertIsNone(msg)
msg = firewall._validate_port_range(-1)
self.assertEqual("Invalid port '-1'", msg)
msg = firewall._validate_port_range('66000')
self.assertEqual("Invalid port '66000'", msg)
msg = firewall._validate_port_range('10:20')
self.assertIsNone(msg)
msg = firewall._validate_port_range('1:65535')
self.assertIsNone(msg)
msg = firewall._validate_port_range('0:65535')
self.assertEqual("Invalid port '0'", msg)
msg = firewall._validate_port_range('1:65536')
self.assertEqual("Invalid port '65536'", msg)
msg = firewall._validate_port_range('abc:efg')
self.assertEqual("Port 'abc' is not a valid number", msg)
msg = firewall._validate_port_range('1:efg')
self.assertEqual("Port 'efg' is not a valid number", msg)
msg = firewall._validate_port_range('-1:10')
self.assertEqual("Invalid port '-1'", msg)
msg = firewall._validate_port_range('66000:10')
self.assertEqual("Invalid port '66000'", msg)
msg = firewall._validate_port_range('10:66000')
self.assertEqual("Invalid port '66000'", msg)
msg = firewall._validate_port_range('1:-10')
self.assertEqual("Invalid port '-10'", msg)
def test_validate_ip_or_subnet_or_none(self):
msg = firewall._validate_ip_or_subnet_or_none(None)
self.assertIsNone(msg)
msg = firewall._validate_ip_or_subnet_or_none('1.1.1.1')
self.assertIsNone(msg)
msg = firewall._validate_ip_or_subnet_or_none('1.1.1.0/24')
self.assertIsNone(msg)
ip_addr = '1111.1.1.1'
msg = firewall._validate_ip_or_subnet_or_none(ip_addr)
self.assertEqual(
("'%s' is not a valid IP address and "
"'%s' is not a valid IP subnet")
% (ip_addr,
ip_addr),
msg)
ip_addr = '1.1.1.1 has whitespace'
msg = firewall._validate_ip_or_subnet_or_none(ip_addr)
self.assertEqual(
("'%s' is not a valid IP address and "
"'%s' is not a valid IP subnet") % (ip_addr,
ip_addr),
msg)
ip_addr = '111.1.1.1\twhitespace'
msg = firewall._validate_ip_or_subnet_or_none(ip_addr)
self.assertEqual(
("'%s' is not a valid IP address and "
"'%s' is not a valid IP subnet") % (ip_addr,
ip_addr),
msg)
ip_addr = '111.1.1.1\nwhitespace'
msg = firewall._validate_ip_or_subnet_or_none(ip_addr)
self.assertEqual(
("'%s' is not a valid IP address and "
"'%s' is not a valid IP subnet") % (ip_addr,
ip_addr),
msg)
# Valid - IPv4
cidr = "10.0.2.0/24"
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
self.assertIsNone(msg)
# Valid - IPv6 without final octets
cidr = "fe80::/24"
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
self.assertIsNone(msg)
# Valid - IPv6 with final octets
cidr = "fe80::0/24"
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
self.assertIsNone(msg)
cidr = "fe80::"
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
self.assertIsNone(msg)
# Invalid - IPv6 with final octets, missing mask
cidr = "fe80::0"
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
self.assertIsNone(msg)
# Invalid - Address format error
cidr = 'invalid'
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
self.assertEqual(
("'%s' is not a valid IP address and "
"'%s' is not a valid IP subnet") % (cidr,
cidr),
msg)
class TestFirewallConvertProtocols(base.BaseTestCase):
def test_convert_protocol_string_integer(self):
res = firewall.convert_protocol("0")
self.assertEqual(0, res)
res = firewall.convert_protocol("255")
self.assertEqual(255, res)
def test_convert_protocol_digit(self):
res = firewall.convert_protocol(0)
self.assertEqual(0, res)
res = firewall.convert_protocol(255)
self.assertEqual(255, res)
def test_convert_protocol_another_types(self):
res = lambda: firewall.convert_protocol(['abc'])
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
res = lambda: firewall.convert_protocol({1: 'foo'})
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
res = lambda: firewall.convert_protocol((1, 100))
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
res = lambda: firewall.convert_protocol(object)
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
def test_convert_protocol_invalid_digit(self):
res = lambda: firewall.convert_protocol("-1")
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
res = lambda: firewall.convert_protocol("256")
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
def test_convert_protocol_name(self):
res = firewall.convert_protocol("tcp")
self.assertEqual("tcp", res)
res = firewall.convert_protocol("UDP")
self.assertEqual("udp", res)
res = firewall.convert_protocol("Icmp")
self.assertEqual("icmp", res)
def test_convert_protocol_invalid_name(self):
res = lambda: firewall.convert_protocol("foo")
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
class TestConvertActionToCaseInsensitive(base.BaseTestCase):
def test_convert_action_to_case_insensitive_none(self):
res = firewall.convert_action_to_case_insensitive(None)
self.assertIsNone(res)
def test_convert_action_to_case_insensitive_value(self):
res = firewall.convert_action_to_case_insensitive("foo")
self.assertEqual("foo", res)
res = firewall.convert_action_to_case_insensitive("Bar")
self.assertEqual("bar", res)
res = firewall.convert_action_to_case_insensitive("BAZ")
self.assertEqual("baz", res)

View File

@ -24,6 +24,8 @@ set -x
install_cmd="pip install -c$1"
shift
PARAMS="$*"
# The devstack based functional tests have neutron checked out in
# $NEUTRON_DIR on the test systems - with the change to test in it.
# Use this directory if it exists, so that this script installs the
@ -66,5 +68,8 @@ else
$install_cmd -U -e $SRC_DIR
fi
$install_cmd -U $*
if [ -n "${PARAMS}" ]; then
$install_cmd -U ${PARAMS}
fi
exit $?

View File

@ -86,13 +86,14 @@ commands = python setup.py build_sphinx
# E128 continuation line under-indented for visual indent
# E129 visually indented line with same indent as next logical line
# E265 block comment should start with '# '
# E402 module level import not at top of file
# H404 multi line docstring should start with a summary
# H405 multi line docstring summary not separated with an empty line
# TODO(dougwig) -- uncomment this to test for remaining linkages
# N530 direct neutron imports not allowed
# TODO(ihrachys) -- reenable N537 when new neutron-lib release is available
# N537 Log messages should not be translated
ignore = E125,E126,E128,E129,E265,H404,H405,N530,N537
ignore = E125,E126,E128,E129,E265,E402,H404,H405,N530,N537
enable-extensions=H904
show-source = true
exclude = .venv,.git,.tox,dist,doc,*lib/python*,.tmp,*egg,build,tools,.ropeproject,rally-scenarios