Fix pep8 new warnings and pip install error
This patch is a squash of two fix that are needed together to have a
successful zuul run.
1. pep8 fix (cherry-picked from master:
87eb1b15be
)
A recent change in pep/pycodingchecks introduced new warnings as part of
the pep8 target that causes pep8 to fail now.
This patch fixes code that issued warnings W503,E731,E266,E402
In addition, old tests which were not ran anyway, and are no longer
relevant (test_firewall) were deleted.
Conflicts:
neutron_fwaas/tests/tempest_plugin/tests/scenario/manager.py
2. Avoid parameterless pip install
build-openstack-sphinx-docs jobs fail on all stable branches due to the
new pip version 10 introduces some previously Warning as Error: in case
of calling "pip install" without any package name, the command fails.
tox_install.sh is called during docs job without any package passed to
pip.
Change-Id: If893cf2bc136d09e09344a81fd996ac89479a135
This commit is contained in:
parent
43f56b794b
commit
58a62382e9
|
@ -20,13 +20,12 @@ Revises: start_neutron_fwaas
|
|||
Create Date: 2015-02-10 17:17:47.846764
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '4202e3047e47'
|
||||
down_revision = 'start_neutron_fwaas'
|
||||
|
||||
from alembic import op
|
||||
|
||||
TABLES = ['firewall_rules', 'firewalls', 'firewall_policies']
|
||||
|
||||
|
||||
|
|
|
@ -21,14 +21,14 @@ Create Date: 2015-02-06 17:02:24.279337
|
|||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '540142f314f4'
|
||||
down_revision = '4202e3047e47'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.engine import reflection
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '540142f314f4'
|
||||
down_revision = '4202e3047e47'
|
||||
|
||||
SQL_STATEMENT = (
|
||||
"insert into firewall_router_associations "
|
||||
"select "
|
||||
|
|
|
@ -21,15 +21,14 @@ Create Date: 2015-02-02 13:11:55.184112
|
|||
|
||||
"""
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '796c68dffbb'
|
||||
down_revision = '540142f314f4'
|
||||
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade(active_plugins=None, options=None):
|
||||
|
||||
op.create_table('cisco_firewall_associations',
|
||||
|
|
|
@ -19,14 +19,14 @@ Create Date: 2017-03-31 14:22:21.063392
|
|||
|
||||
"""
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'fd38cd995cc0'
|
||||
down_revision = 'f83a0b2964d0'
|
||||
depends_on = ('d6a12e637e28',)
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.alter_column('firewall_rules_v2', 'public', new_column_name='shared',
|
||||
|
|
|
@ -21,10 +21,6 @@ Create Date: 2017-01-26 23:47:42.795504
|
|||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '876782258a43'
|
||||
down_revision = 'd6a12e637e28'
|
||||
|
||||
from alembic import op
|
||||
from neutron_lib.db import constants as db_constants
|
||||
from neutron_lib import exceptions
|
||||
|
@ -34,6 +30,10 @@ from neutron_fwaas._i18n import _
|
|||
from neutron_fwaas.common import fwaas_constants as const
|
||||
from neutron_fwaas.common import resources
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '876782258a43'
|
||||
down_revision = 'd6a12e637e28'
|
||||
|
||||
|
||||
class DuplicateDefaultFirewallGroup(exceptions.Conflict):
|
||||
message = _("Duplicate Firewall group found named '%s'. "
|
||||
|
|
|
@ -21,16 +21,16 @@ Create Date: 2017-11-08 15:55:40.990272
|
|||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'f24e0d5e5bff'
|
||||
down_revision = '876782258a43'
|
||||
|
||||
from alembic import op
|
||||
from neutron_lib import exceptions
|
||||
import sqlalchemy as sa
|
||||
|
||||
from neutron._i18n import _
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'f24e0d5e5bff'
|
||||
down_revision = '876782258a43'
|
||||
|
||||
|
||||
fwg_port_association = sa.Table(
|
||||
'firewall_group_port_associations_v2', sa.MetaData(),
|
||||
|
|
|
@ -363,9 +363,9 @@ class IptablesFwaasDriver(fwaas_base.FwaasDriverBase):
|
|||
|
||||
# iptables adds '-m protocol' when any source
|
||||
# or destination port number is specified
|
||||
if not((rule.get('source_port') is None)
|
||||
and (rule.get('destination_port') is None)):
|
||||
args += self._match_arg(rule.get('protocol'))
|
||||
if (rule.get('source_port') is not None or
|
||||
rule.get('destination_port') is not None):
|
||||
args += self._match_arg(rule.get('protocol'))
|
||||
|
||||
args += self._port_arg('sport',
|
||||
rule.get('protocol'),
|
||||
|
@ -414,8 +414,7 @@ class IptablesFwaasDriver(fwaas_base.FwaasDriverBase):
|
|||
return args
|
||||
|
||||
def _port_arg(self, direction, protocol, port):
|
||||
if (protocol not in ['udp', 'tcp']
|
||||
or port is None):
|
||||
if protocol not in ['udp', 'tcp'] or port is None:
|
||||
return []
|
||||
|
||||
args = ['--%s' % direction, '%s' % port]
|
||||
|
|
|
@ -394,8 +394,8 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
|
|||
|
||||
# iptables adds '-m protocol' when any source
|
||||
# or destination port number is specified
|
||||
if not((rule.get('source_port') is None)
|
||||
and (rule.get('destination_port') is None)):
|
||||
if (rule.get('source_port') is not None or
|
||||
rule.get('destination_port') is not None):
|
||||
args += self._match_arg(rule.get('protocol'))
|
||||
|
||||
args += self._port_arg('sport',
|
||||
|
@ -445,8 +445,7 @@ class IptablesFwaasDriver(fwaas_base_v2.FwaasDriverBase):
|
|||
return args
|
||||
|
||||
def _port_arg(self, direction, protocol, port):
|
||||
if (protocol not in ['udp', 'tcp']
|
||||
or port is None):
|
||||
if protocol not in ['udp', 'tcp'] or port is None:
|
||||
return []
|
||||
|
||||
args = ['--%s' % direction, '%s' % port]
|
||||
|
|
|
@ -244,8 +244,8 @@ class FirewallPluginV2(
|
|||
raise f_exc.FirewallGroupPortInvalidProject(
|
||||
port_id=port_id, project_id=port_db['tenant_id'])
|
||||
device_owner = port_db.get('device_owner', '')
|
||||
if (device_owner not in [nl_constants.DEVICE_OWNER_ROUTER_INTF]
|
||||
and not device_owner.startswith(
|
||||
if (device_owner not in [nl_constants.DEVICE_OWNER_ROUTER_INTF] and
|
||||
not device_owner.startswith(
|
||||
nl_constants.DEVICE_OWNER_COMPUTE_PREFIX)):
|
||||
raise f_exc.FirewallGroupPortInvalid(port_id=port_id)
|
||||
if (device_owner.startswith(
|
||||
|
|
|
@ -28,8 +28,8 @@ class BaseTestCase(n_base.BaseTestCase):
|
|||
class NeutronDbPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase):
|
||||
|
||||
def setup_config(self):
|
||||
## Copied from neutron's test_db_base_plugin_v2 because they
|
||||
## don't allow to specify args
|
||||
# Copied from neutron's test_db_base_plugin_v2 because they
|
||||
# don't allow to specify args
|
||||
|
||||
# Create the default configurations
|
||||
args = ['--config-file', n_base.etcdir('neutron.conf')]
|
||||
|
@ -37,7 +37,7 @@ class NeutronDbPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase):
|
|||
for config_file in test_lib.test_config.get('config_files', []):
|
||||
args.extend(['--config-file', config_file])
|
||||
|
||||
## our own stuff
|
||||
# our own stuff
|
||||
dirpath = os.path.join(os.path.dirname(__file__),
|
||||
'./../../etc/neutron/policy.d')
|
||||
args.extend(['--config-dir', dirpath])
|
||||
|
|
|
@ -87,7 +87,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
|
|||
cls.volumes_client = cls.os_primary.volumes_client
|
||||
cls.snapshots_client = cls.os_primary.snapshots_client
|
||||
|
||||
# ## Test functions library
|
||||
# Test functions library
|
||||
#
|
||||
# The create_[resource] functions only return body and discard the
|
||||
# resp part which is not used in scenario tests
|
||||
|
@ -680,8 +680,9 @@ class ScenarioTest(tempest.test.BaseTestCase):
|
|||
addresses = (server['addresses'][network['name']]
|
||||
if network else [])
|
||||
for address in addresses:
|
||||
if (address['version'] == CONF.validation.ip_version_for_ssh
|
||||
and address['OS-EXT-IPS:type'] == 'fixed'):
|
||||
if (address['version'] ==
|
||||
CONF.validation.ip_version_for_ssh and
|
||||
address['OS-EXT-IPS:type'] == 'fixed'):
|
||||
return address['addr']
|
||||
raise exceptions.ServerUnreachable(server_id=server['id'])
|
||||
else:
|
||||
|
@ -813,8 +814,8 @@ class NetworkScenarioTest(ScenarioTest):
|
|||
port_map = [(p["id"], fxip["ip_address"])
|
||||
for p in ports
|
||||
for fxip in p["fixed_ips"]
|
||||
if netutils.is_valid_ipv4(fxip["ip_address"])
|
||||
and p['status'] in p_status]
|
||||
if (netutils.is_valid_ipv4(fxip["ip_address"]) and
|
||||
p['status'] in p_status)]
|
||||
inactive = [p for p in ports if p['status'] != 'ACTIVE']
|
||||
if inactive:
|
||||
LOG.warning("Instance has ports that are not ACTIVE: %s", inactive)
|
||||
|
|
|
@ -1,675 +0,0 @@
|
|||
# Copyright 2013 Big Switch Networks, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
import mock
|
||||
from neutron.tests import base
|
||||
from neutron.tests.unit.api.v2 import test_base as test_api_v2
|
||||
from neutron.tests.unit.extensions import base as test_api_v2_extension
|
||||
from neutron_lib.db import constants as db_const
|
||||
from neutron_lib.exceptions import firewall_v1 as f_exc
|
||||
from oslo_utils import uuidutils
|
||||
from webob import exc
|
||||
import webtest
|
||||
|
||||
from neutron_fwaas.common import fwaas_constants
|
||||
from neutron_fwaas.extensions import firewall
|
||||
|
||||
_uuid = uuidutils.generate_uuid
|
||||
_get_path = test_api_v2._get_path
|
||||
_long_name = 'x' * (db_const.NAME_FIELD_SIZE + 1)
|
||||
_long_description = 'y' * (db_const.DESCRIPTION_FIELD_SIZE + 1)
|
||||
|
||||
|
||||
class FirewallExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
|
||||
fmt = 'json'
|
||||
|
||||
def setUp(self):
|
||||
super(FirewallExtensionTestCase, self).setUp()
|
||||
plural_mappings = {'firewall_policy': 'firewall_policies'}
|
||||
self._setUpExtension(
|
||||
'neutron_fwaas.extensions.firewall.FirewallPluginBase',
|
||||
fwaas_constants.FIREWALL, firewall.RESOURCE_ATTRIBUTE_MAP,
|
||||
firewall.Firewall, 'fw', plural_mappings=plural_mappings)
|
||||
|
||||
def test_create_firewall(self):
|
||||
fw_id = _uuid()
|
||||
project_id = _uuid()
|
||||
data = {'firewall': {'description': 'descr_firewall1',
|
||||
'name': 'firewall1',
|
||||
'admin_state_up': True,
|
||||
'firewall_policy_id': _uuid(),
|
||||
'shared': False,
|
||||
'project_id': project_id,
|
||||
'tenant_id': project_id}}
|
||||
return_value = copy.copy(data['firewall'])
|
||||
return_value.update({'id': fw_id})
|
||||
# since 'shared' is hidden
|
||||
del return_value['shared']
|
||||
|
||||
instance = self.plugin.return_value
|
||||
instance.create_firewall.return_value = return_value
|
||||
res = self.api.post(_get_path('fw/firewalls', fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/%s' % self.fmt)
|
||||
instance.create_firewall.assert_called_with(mock.ANY,
|
||||
firewall=data)
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('firewall', res)
|
||||
self.assertEqual(return_value, res['firewall'])
|
||||
|
||||
def test_create_firewall_invalid_long_name(self):
|
||||
project_id = _uuid()
|
||||
data = {'firewall': {'description': 'descr_firewall1',
|
||||
'name': _long_name,
|
||||
'admin_state_up': True,
|
||||
'firewall_policy_id': _uuid(),
|
||||
'shared': False,
|
||||
'project_id': project_id,
|
||||
'tenant_id': project_id}}
|
||||
res = self.api.post(_get_path('fw/firewalls', fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/%s' % self.fmt,
|
||||
status=exc.HTTPBadRequest.code)
|
||||
self.assertIn('Invalid input for name', res.body.decode('utf-8'))
|
||||
|
||||
def test_create_firewall_invalid_long_description(self):
|
||||
project_id = _uuid()
|
||||
data = {'firewall': {'description': _long_description,
|
||||
'name': 'firewall1',
|
||||
'admin_state_up': True,
|
||||
'firewall_policy_id': _uuid(),
|
||||
'shared': False,
|
||||
'project_id': project_id,
|
||||
'tenant_id': project_id}}
|
||||
res = self.api.post(_get_path('fw/firewalls', fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/%s' % self.fmt,
|
||||
status=exc.HTTPBadRequest.code)
|
||||
self.assertIn('Invalid input for description',
|
||||
res.body.decode('utf-8'))
|
||||
|
||||
def test_firewall_list(self):
|
||||
fw_id = _uuid()
|
||||
return_value = [{'tenant_id': _uuid(),
|
||||
'id': fw_id}]
|
||||
|
||||
instance = self.plugin.return_value
|
||||
instance.get_firewalls.return_value = return_value
|
||||
|
||||
res = self.api.get(_get_path('fw/firewalls', fmt=self.fmt))
|
||||
|
||||
instance.get_firewalls.assert_called_with(mock.ANY,
|
||||
fields=mock.ANY,
|
||||
filters=mock.ANY)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
|
||||
def test_firewall_get(self):
|
||||
fw_id = _uuid()
|
||||
return_value = {'tenant_id': _uuid(),
|
||||
'id': fw_id}
|
||||
|
||||
instance = self.plugin.return_value
|
||||
instance.get_firewall.return_value = return_value
|
||||
|
||||
res = self.api.get(_get_path('fw/firewalls',
|
||||
id=fw_id, fmt=self.fmt))
|
||||
|
||||
instance.get_firewall.assert_called_with(mock.ANY,
|
||||
fw_id,
|
||||
fields=mock.ANY)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('firewall', res)
|
||||
self.assertEqual(return_value, res['firewall'])
|
||||
|
||||
def test_firewall_update(self):
|
||||
fw_id = _uuid()
|
||||
update_data = {'firewall': {'name': 'new_name'}}
|
||||
return_value = {'tenant_id': _uuid(),
|
||||
'id': fw_id}
|
||||
|
||||
instance = self.plugin.return_value
|
||||
instance.update_firewall.return_value = return_value
|
||||
|
||||
res = self.api.put(_get_path('fw/firewalls', id=fw_id,
|
||||
fmt=self.fmt),
|
||||
self.serialize(update_data))
|
||||
|
||||
instance.update_firewall.assert_called_with(mock.ANY, fw_id,
|
||||
firewall=update_data)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('firewall', res)
|
||||
self.assertEqual(return_value, res['firewall'])
|
||||
|
||||
def test_firewall_delete(self):
|
||||
self._test_entity_delete('firewall')
|
||||
|
||||
def _test_create_firewall_rule(self, src_port, dst_port):
|
||||
rule_id = _uuid()
|
||||
project_id = _uuid()
|
||||
data = {'firewall_rule': {'description': 'descr_firewall_rule1',
|
||||
'name': 'rule1',
|
||||
'shared': False,
|
||||
'protocol': 'tcp',
|
||||
'ip_version': 4,
|
||||
'source_ip_address': '192.168.0.1',
|
||||
'destination_ip_address': '127.0.0.1',
|
||||
'source_port': src_port,
|
||||
'destination_port': dst_port,
|
||||
'action': 'allow',
|
||||
'enabled': True,
|
||||
'project_id': project_id,
|
||||
'tenant_id': project_id}}
|
||||
expected_ret_val = copy.copy(data['firewall_rule'])
|
||||
expected_ret_val['source_port'] = str(src_port)
|
||||
expected_ret_val['destination_port'] = str(dst_port)
|
||||
expected_call_args = copy.copy(expected_ret_val)
|
||||
expected_ret_val['id'] = rule_id
|
||||
instance = self.plugin.return_value
|
||||
instance.create_firewall_rule.return_value = expected_ret_val
|
||||
res = self.api.post(_get_path('fw/firewall_rules', fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/%s' % self.fmt)
|
||||
instance.create_firewall_rule.assert_called_with(
|
||||
mock.ANY,
|
||||
firewall_rule={'firewall_rule': expected_call_args})
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('firewall_rule', res)
|
||||
self.assertEqual(expected_ret_val, res['firewall_rule'])
|
||||
|
||||
def test_create_firewall_rule_with_integer_ports(self):
|
||||
self._test_create_firewall_rule(1, 10)
|
||||
|
||||
def test_create_firewall_rule_with_string_ports(self):
|
||||
self._test_create_firewall_rule('1', '10')
|
||||
|
||||
def test_create_firewall_rule_with_port_range(self):
|
||||
self._test_create_firewall_rule('1:20', '30:40')
|
||||
|
||||
def test_create_firewall_rule_invalid_long_name(self):
|
||||
project_id = _uuid()
|
||||
data = {'firewall_rule': {'description': 'descr_firewall_rule1',
|
||||
'name': _long_name,
|
||||
'shared': False,
|
||||
'protocol': 'tcp',
|
||||
'ip_version': 4,
|
||||
'source_ip_address': '192.168.0.1',
|
||||
'destination_ip_address': '127.0.0.1',
|
||||
'source_port': 1,
|
||||
'destination_port': 1,
|
||||
'action': 'allow',
|
||||
'enabled': True,
|
||||
'project_id': project_id,
|
||||
'tenant_id': project_id}}
|
||||
res = self.api.post(_get_path('fw/firewall_rules', fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/%s' % self.fmt,
|
||||
status=exc.HTTPBadRequest.code)
|
||||
self.assertIn('Invalid input for name', res.body.decode('utf-8'))
|
||||
|
||||
def test_create_firewall_rule_invalid_long_description(self):
|
||||
project_id = _uuid()
|
||||
data = {'firewall_rule': {'description': _long_description,
|
||||
'name': 'rule1',
|
||||
'shared': False,
|
||||
'protocol': 'tcp',
|
||||
'ip_version': 4,
|
||||
'source_ip_address': '192.168.0.1',
|
||||
'destination_ip_address': '127.0.0.1',
|
||||
'source_port': 1,
|
||||
'destination_port': 1,
|
||||
'action': 'allow',
|
||||
'enabled': True,
|
||||
'project_id': project_id,
|
||||
'tenant_id': project_id}}
|
||||
res = self.api.post(_get_path('fw/firewall_rules', fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/%s' % self.fmt,
|
||||
status=exc.HTTPBadRequest.code)
|
||||
self.assertIn('Invalid input for description',
|
||||
res.body.decode('utf-8'))
|
||||
|
||||
def test_firewall_rule_list(self):
|
||||
rule_id = _uuid()
|
||||
return_value = [{'tenant_id': _uuid(),
|
||||
'id': rule_id}]
|
||||
|
||||
instance = self.plugin.return_value
|
||||
instance.get_firewall_rules.return_value = return_value
|
||||
|
||||
res = self.api.get(_get_path('fw/firewall_rules', fmt=self.fmt))
|
||||
|
||||
instance.get_firewall_rules.assert_called_with(mock.ANY,
|
||||
fields=mock.ANY,
|
||||
filters=mock.ANY)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
|
||||
def test_firewall_rule_get(self):
|
||||
rule_id = _uuid()
|
||||
return_value = {'tenant_id': _uuid(),
|
||||
'id': rule_id}
|
||||
|
||||
instance = self.plugin.return_value
|
||||
instance.get_firewall_rule.return_value = return_value
|
||||
|
||||
res = self.api.get(_get_path('fw/firewall_rules',
|
||||
id=rule_id, fmt=self.fmt))
|
||||
|
||||
instance.get_firewall_rule.assert_called_with(mock.ANY,
|
||||
rule_id,
|
||||
fields=mock.ANY)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('firewall_rule', res)
|
||||
self.assertEqual(return_value, res['firewall_rule'])
|
||||
|
||||
def test_firewall_rule_update(self):
|
||||
rule_id = _uuid()
|
||||
update_data = {'firewall_rule': {'action': 'deny'}}
|
||||
return_value = {'tenant_id': _uuid(),
|
||||
'id': rule_id}
|
||||
|
||||
instance = self.plugin.return_value
|
||||
instance.update_firewall_rule.return_value = return_value
|
||||
|
||||
res = self.api.put(_get_path('fw/firewall_rules', id=rule_id,
|
||||
fmt=self.fmt),
|
||||
self.serialize(update_data))
|
||||
|
||||
instance.update_firewall_rule.assert_called_with(
|
||||
mock.ANY,
|
||||
rule_id,
|
||||
firewall_rule=update_data)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('firewall_rule', res)
|
||||
self.assertEqual(return_value, res['firewall_rule'])
|
||||
|
||||
def test_firewall_rule_delete(self):
|
||||
self._test_entity_delete('firewall_rule')
|
||||
|
||||
def test_create_firewall_policy(self):
|
||||
policy_id = _uuid()
|
||||
project_id = _uuid()
|
||||
data = {'firewall_policy': {'description': 'descr_firewall_policy1',
|
||||
'name': 'new_fw_policy1',
|
||||
'shared': False,
|
||||
'firewall_rules': [_uuid(), _uuid()],
|
||||
'audited': False,
|
||||
'project_id': project_id,
|
||||
'tenant_id': project_id}}
|
||||
return_value = copy.copy(data['firewall_policy'])
|
||||
return_value.update({'id': policy_id})
|
||||
|
||||
instance = self.plugin.return_value
|
||||
instance.create_firewall_policy.return_value = return_value
|
||||
res = self.api.post(_get_path('fw/firewall_policies',
|
||||
fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/%s' % self.fmt)
|
||||
instance.create_firewall_policy.assert_called_with(
|
||||
mock.ANY,
|
||||
firewall_policy=data)
|
||||
self.assertEqual(exc.HTTPCreated.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('firewall_policy', res)
|
||||
self.assertEqual(return_value, res['firewall_policy'])
|
||||
|
||||
def test_create_firewall_policy_invalid_long_name(self):
|
||||
project_id = _uuid()
|
||||
data = {'firewall_policy': {'description': 'descr_firewall_policy1',
|
||||
'name': _long_name,
|
||||
'shared': False,
|
||||
'firewall_rules': [_uuid(), _uuid()],
|
||||
'audited': False,
|
||||
'project_id': project_id,
|
||||
'tenant_id': project_id}}
|
||||
res = self.api.post(_get_path('fw/firewall_policies',
|
||||
fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/%s' % self.fmt,
|
||||
status=exc.HTTPBadRequest.code)
|
||||
self.assertIn('Invalid input for name', res.body.decode('utf-8'))
|
||||
|
||||
def test_create_firewall_policy_invalid_long_description(self):
|
||||
project_id = _uuid()
|
||||
data = {'firewall_policy': {'description': _long_description,
|
||||
'name': 'new_fw_policy1',
|
||||
'shared': False,
|
||||
'firewall_rules': [_uuid(), _uuid()],
|
||||
'audited': False,
|
||||
'project_id': project_id,
|
||||
'tenant_id': project_id}}
|
||||
res = self.api.post(_get_path('fw/firewall_policies',
|
||||
fmt=self.fmt),
|
||||
self.serialize(data),
|
||||
content_type='application/%s' % self.fmt,
|
||||
status=exc.HTTPBadRequest.code)
|
||||
self.assertIn('Invalid input for description',
|
||||
res.body.decode('utf-8'))
|
||||
|
||||
def test_firewall_policy_list(self):
|
||||
policy_id = _uuid()
|
||||
return_value = [{'tenant_id': _uuid(),
|
||||
'id': policy_id}]
|
||||
|
||||
instance = self.plugin.return_value
|
||||
instance.get_firewall_policies.return_value = return_value
|
||||
|
||||
res = self.api.get(_get_path('fw/firewall_policies',
|
||||
fmt=self.fmt))
|
||||
|
||||
instance.get_firewall_policies.assert_called_with(mock.ANY,
|
||||
fields=mock.ANY,
|
||||
filters=mock.ANY)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
|
||||
def test_firewall_policy_get(self):
|
||||
policy_id = _uuid()
|
||||
return_value = {'tenant_id': _uuid(),
|
||||
'id': policy_id}
|
||||
|
||||
instance = self.plugin.return_value
|
||||
instance.get_firewall_policy.return_value = return_value
|
||||
|
||||
res = self.api.get(_get_path('fw/firewall_policies',
|
||||
id=policy_id, fmt=self.fmt))
|
||||
|
||||
instance.get_firewall_policy.assert_called_with(mock.ANY,
|
||||
policy_id,
|
||||
fields=mock.ANY)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('firewall_policy', res)
|
||||
self.assertEqual(return_value, res['firewall_policy'])
|
||||
|
||||
def test_firewall_policy_update(self):
|
||||
policy_id = _uuid()
|
||||
update_data = {'firewall_policy': {'audited': True}}
|
||||
return_value = {'tenant_id': _uuid(),
|
||||
'id': policy_id}
|
||||
|
||||
instance = self.plugin.return_value
|
||||
instance.update_firewall_policy.return_value = return_value
|
||||
|
||||
res = self.api.put(_get_path('fw/firewall_policies',
|
||||
id=policy_id,
|
||||
fmt=self.fmt),
|
||||
self.serialize(update_data))
|
||||
|
||||
instance.update_firewall_policy.assert_called_with(
|
||||
mock.ANY,
|
||||
policy_id,
|
||||
firewall_policy=update_data)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertIn('firewall_policy', res)
|
||||
self.assertEqual(return_value, res['firewall_policy'])
|
||||
|
||||
def test_firewall_policy_update_malformed_rules(self):
|
||||
# emulating client request when no rule uuids are provided for
|
||||
# --firewall_rules parameter
|
||||
update_data = {'firewall_policy': {'firewall_rules': True}}
|
||||
# have to check for generic AppError
|
||||
self.assertRaises(
|
||||
webtest.AppError,
|
||||
self.api.put,
|
||||
_get_path('fw/firewall_policies', id=_uuid(), fmt=self.fmt),
|
||||
self.serialize(update_data))
|
||||
|
||||
def test_firewall_policy_delete(self):
|
||||
self._test_entity_delete('firewall_policy')
|
||||
|
||||
def test_firewall_policy_insert_rule(self):
|
||||
firewall_policy_id = _uuid()
|
||||
firewall_rule_id = _uuid()
|
||||
ref_firewall_rule_id = _uuid()
|
||||
|
||||
insert_data = {'firewall_rule_id': firewall_rule_id,
|
||||
'insert_before': ref_firewall_rule_id,
|
||||
'insert_after': None}
|
||||
return_value = {'firewall_policy':
|
||||
{'tenant_id': _uuid(),
|
||||
'id': firewall_policy_id,
|
||||
'firewall_rules': [ref_firewall_rule_id,
|
||||
firewall_rule_id]}}
|
||||
|
||||
instance = self.plugin.return_value
|
||||
instance.insert_rule.return_value = return_value
|
||||
|
||||
path = _get_path('fw/firewall_policies', id=firewall_policy_id,
|
||||
action="insert_rule",
|
||||
fmt=self.fmt)
|
||||
res = self.api.put(path, self.serialize(insert_data))
|
||||
instance.insert_rule.assert_called_with(mock.ANY, firewall_policy_id,
|
||||
insert_data)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertEqual(return_value, res)
|
||||
|
||||
def test_firewall_policy_remove_rule(self):
|
||||
firewall_policy_id = _uuid()
|
||||
firewall_rule_id = _uuid()
|
||||
|
||||
remove_data = {'firewall_rule_id': firewall_rule_id}
|
||||
return_value = {'firewall_policy':
|
||||
{'tenant_id': _uuid(),
|
||||
'id': firewall_policy_id,
|
||||
'firewall_rules': []}}
|
||||
|
||||
instance = self.plugin.return_value
|
||||
instance.remove_rule.return_value = return_value
|
||||
|
||||
path = _get_path('fw/firewall_policies', id=firewall_policy_id,
|
||||
action="remove_rule",
|
||||
fmt=self.fmt)
|
||||
res = self.api.put(path, self.serialize(remove_data))
|
||||
instance.remove_rule.assert_called_with(mock.ANY, firewall_policy_id,
|
||||
remove_data)
|
||||
self.assertEqual(exc.HTTPOk.code, res.status_int)
|
||||
res = self.deserialize(res)
|
||||
self.assertEqual(return_value, res)
|
||||
|
||||
|
||||
class TestFirewallAttributeValidators(base.BaseTestCase):
|
||||
|
||||
def test_validate_port_range(self):
|
||||
msg = firewall._validate_port_range(None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = firewall._validate_port_range('10')
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = firewall._validate_port_range(10)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = firewall._validate_port_range(-1)
|
||||
self.assertEqual("Invalid port '-1'", msg)
|
||||
|
||||
msg = firewall._validate_port_range('66000')
|
||||
self.assertEqual("Invalid port '66000'", msg)
|
||||
|
||||
msg = firewall._validate_port_range('10:20')
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = firewall._validate_port_range('1:65535')
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = firewall._validate_port_range('0:65535')
|
||||
self.assertEqual("Invalid port '0'", msg)
|
||||
|
||||
msg = firewall._validate_port_range('1:65536')
|
||||
self.assertEqual("Invalid port '65536'", msg)
|
||||
|
||||
msg = firewall._validate_port_range('abc:efg')
|
||||
self.assertEqual("Port 'abc' is not a valid number", msg)
|
||||
|
||||
msg = firewall._validate_port_range('1:efg')
|
||||
self.assertEqual("Port 'efg' is not a valid number", msg)
|
||||
|
||||
msg = firewall._validate_port_range('-1:10')
|
||||
self.assertEqual("Invalid port '-1'", msg)
|
||||
|
||||
msg = firewall._validate_port_range('66000:10')
|
||||
self.assertEqual("Invalid port '66000'", msg)
|
||||
|
||||
msg = firewall._validate_port_range('10:66000')
|
||||
self.assertEqual("Invalid port '66000'", msg)
|
||||
|
||||
msg = firewall._validate_port_range('1:-10')
|
||||
self.assertEqual("Invalid port '-10'", msg)
|
||||
|
||||
def test_validate_ip_or_subnet_or_none(self):
|
||||
msg = firewall._validate_ip_or_subnet_or_none(None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = firewall._validate_ip_or_subnet_or_none('1.1.1.1')
|
||||
self.assertIsNone(msg)
|
||||
|
||||
msg = firewall._validate_ip_or_subnet_or_none('1.1.1.0/24')
|
||||
self.assertIsNone(msg)
|
||||
|
||||
ip_addr = '1111.1.1.1'
|
||||
msg = firewall._validate_ip_or_subnet_or_none(ip_addr)
|
||||
self.assertEqual(
|
||||
("'%s' is not a valid IP address and "
|
||||
"'%s' is not a valid IP subnet")
|
||||
% (ip_addr,
|
||||
ip_addr),
|
||||
msg)
|
||||
|
||||
ip_addr = '1.1.1.1 has whitespace'
|
||||
msg = firewall._validate_ip_or_subnet_or_none(ip_addr)
|
||||
self.assertEqual(
|
||||
("'%s' is not a valid IP address and "
|
||||
"'%s' is not a valid IP subnet") % (ip_addr,
|
||||
ip_addr),
|
||||
msg)
|
||||
|
||||
ip_addr = '111.1.1.1\twhitespace'
|
||||
msg = firewall._validate_ip_or_subnet_or_none(ip_addr)
|
||||
self.assertEqual(
|
||||
("'%s' is not a valid IP address and "
|
||||
"'%s' is not a valid IP subnet") % (ip_addr,
|
||||
ip_addr),
|
||||
msg)
|
||||
|
||||
ip_addr = '111.1.1.1\nwhitespace'
|
||||
msg = firewall._validate_ip_or_subnet_or_none(ip_addr)
|
||||
self.assertEqual(
|
||||
("'%s' is not a valid IP address and "
|
||||
"'%s' is not a valid IP subnet") % (ip_addr,
|
||||
ip_addr),
|
||||
msg)
|
||||
|
||||
# Valid - IPv4
|
||||
cidr = "10.0.2.0/24"
|
||||
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Valid - IPv6 without final octets
|
||||
cidr = "fe80::/24"
|
||||
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Valid - IPv6 with final octets
|
||||
cidr = "fe80::0/24"
|
||||
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
cidr = "fe80::"
|
||||
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Invalid - IPv6 with final octets, missing mask
|
||||
cidr = "fe80::0"
|
||||
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
|
||||
self.assertIsNone(msg)
|
||||
|
||||
# Invalid - Address format error
|
||||
cidr = 'invalid'
|
||||
msg = firewall._validate_ip_or_subnet_or_none(cidr, None)
|
||||
self.assertEqual(
|
||||
("'%s' is not a valid IP address and "
|
||||
"'%s' is not a valid IP subnet") % (cidr,
|
||||
cidr),
|
||||
msg)
|
||||
|
||||
|
||||
class TestFirewallConvertProtocols(base.BaseTestCase):
|
||||
|
||||
def test_convert_protocol_string_integer(self):
|
||||
res = firewall.convert_protocol("0")
|
||||
self.assertEqual(0, res)
|
||||
res = firewall.convert_protocol("255")
|
||||
self.assertEqual(255, res)
|
||||
|
||||
def test_convert_protocol_digit(self):
|
||||
res = firewall.convert_protocol(0)
|
||||
self.assertEqual(0, res)
|
||||
res = firewall.convert_protocol(255)
|
||||
self.assertEqual(255, res)
|
||||
|
||||
def test_convert_protocol_another_types(self):
|
||||
res = lambda: firewall.convert_protocol(['abc'])
|
||||
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
|
||||
res = lambda: firewall.convert_protocol({1: 'foo'})
|
||||
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
|
||||
res = lambda: firewall.convert_protocol((1, 100))
|
||||
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
|
||||
res = lambda: firewall.convert_protocol(object)
|
||||
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
|
||||
|
||||
def test_convert_protocol_invalid_digit(self):
|
||||
res = lambda: firewall.convert_protocol("-1")
|
||||
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
|
||||
|
||||
res = lambda: firewall.convert_protocol("256")
|
||||
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
|
||||
|
||||
def test_convert_protocol_name(self):
|
||||
res = firewall.convert_protocol("tcp")
|
||||
self.assertEqual("tcp", res)
|
||||
|
||||
res = firewall.convert_protocol("UDP")
|
||||
self.assertEqual("udp", res)
|
||||
|
||||
res = firewall.convert_protocol("Icmp")
|
||||
self.assertEqual("icmp", res)
|
||||
|
||||
def test_convert_protocol_invalid_name(self):
|
||||
res = lambda: firewall.convert_protocol("foo")
|
||||
self.assertRaises(f_exc.FirewallRuleInvalidProtocol, res)
|
||||
|
||||
|
||||
class TestConvertActionToCaseInsensitive(base.BaseTestCase):
|
||||
def test_convert_action_to_case_insensitive_none(self):
|
||||
res = firewall.convert_action_to_case_insensitive(None)
|
||||
self.assertIsNone(res)
|
||||
|
||||
def test_convert_action_to_case_insensitive_value(self):
|
||||
res = firewall.convert_action_to_case_insensitive("foo")
|
||||
self.assertEqual("foo", res)
|
||||
|
||||
res = firewall.convert_action_to_case_insensitive("Bar")
|
||||
self.assertEqual("bar", res)
|
||||
|
||||
res = firewall.convert_action_to_case_insensitive("BAZ")
|
||||
self.assertEqual("baz", res)
|
|
@ -24,6 +24,8 @@ set -x
|
|||
install_cmd="pip install -c$1"
|
||||
shift
|
||||
|
||||
PARAMS="$*"
|
||||
|
||||
# The devstack based functional tests have neutron checked out in
|
||||
# $NEUTRON_DIR on the test systems - with the change to test in it.
|
||||
# Use this directory if it exists, so that this script installs the
|
||||
|
@ -66,5 +68,8 @@ else
|
|||
$install_cmd -U -e $SRC_DIR
|
||||
fi
|
||||
|
||||
$install_cmd -U $*
|
||||
if [ -n "${PARAMS}" ]; then
|
||||
$install_cmd -U ${PARAMS}
|
||||
fi
|
||||
|
||||
exit $?
|
||||
|
|
Loading…
Reference in New Issue