Code Sync from neutron project to newly created neutron-tempest-plugin

* The following commit sync the code from following hash:
  start_hash: 7279aa35851110a4933a10b58b2758a2bc3933a3
  end_hash: 6e911a49a9e630878f4c46f61fde3964be550880

Change-Id: I371aa4d5f043f695df04b98b0f485c8f0548f2b3
This commit is contained in:
Chandan Kumar 2017-11-15 19:41:01 +05:30
parent db9cc26f02
commit c125fd1479
41 changed files with 1114 additions and 362 deletions

View File

@ -0,0 +1,53 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from tempest.lib import decorators
import testtools
from neutron_tempest_plugin.api import base
from neutron_tempest_plugin import config
AZ_SUPPORTED_AGENTS = [constants.AGENT_TYPE_DHCP, constants.AGENT_TYPE_L3]
CONF = config.CONF
class AgentAvailabilityZoneTestCase(base.BaseAdminNetworkTest):
required_extensions = ['agent', 'availability_zone']
@classmethod
def resource_setup(cls):
super(AgentAvailabilityZoneTestCase, cls).resource_setup()
body = cls.admin_client.list_agents()
agents = body['agents']
agents_type = [agent.get('agent_type') for agent in agents]
for az_agent in AZ_SUPPORTED_AGENTS:
if az_agent in agents_type:
return
msg = 'availability_zone supported agent not found.'
raise cls.skipException(msg)
@decorators.idempotent_id('3ffa661e-cfcc-417d-8b63-1c5ec4a22e54')
@testtools.skipUnless(CONF.neutron_plugin_options.agent_availability_zone,
"Need a single availability_zone assumption.")
def test_agents_availability_zone(self):
"""
Test list agents availability_zone, only L3 and DHCP agent support
availability_zone, default availability_zone is "nova".
"""
body = self.admin_client.list_agents()
agents = body['agents']
for agent in agents:
if agent.get('agent_type') in AZ_SUPPORTED_AGENTS:
self.assertEqual(
CONF.neutron_plugin_options.agent_availability_zone,
agent.get('availability_zone'))

View File

@ -30,7 +30,7 @@ class ExternalNetworksRBACTestJSON(base.BaseAdminNetworkTest):
cls.client2 = cls.os_alt.network_client
def _create_network(self, external=True):
post_body = {'name': data_utils.rand_name('network-')}
post_body = {'name': data_utils.rand_name('network')}
if external:
post_body['router:external'] = external
body = self.admin_client.create_network(**post_body)
@ -49,7 +49,7 @@ class ExternalNetworksRBACTestJSON(base.BaseAdminNetworkTest):
networks_list = [n['id'] for n in body['networks']]
self.assertIn(net['id'], networks_list)
r = self.client2.create_router(
data_utils.rand_name('router-'),
data_utils.rand_name('router'),
external_gateway_info={'network_id': net['id']})['router']
self.addCleanup(self.admin_client.delete_router, r['id'])
@ -92,7 +92,7 @@ class ExternalNetworksRBACTestJSON(base.BaseAdminNetworkTest):
object_id=net_id, action='access_as_external',
target_tenant='*')['rbac_policies'][0]
r = self.client2.create_router(
data_utils.rand_name('router-'),
data_utils.rand_name('router'),
external_gateway_info={'network_id': net_id})['router']
self.addCleanup(self.admin_client.delete_router, r['id'])
# changing wildcard to specific tenant should be okay since its the
@ -139,7 +139,7 @@ class ExternalNetworksRBACTestJSON(base.BaseAdminNetworkTest):
action='access_as_external',
target_tenant=self.client2.tenant_id)
r = self.client2.create_router(
data_utils.rand_name('router-'),
data_utils.rand_name('router'),
external_gateway_info={'network_id': net['id']})['router']
self.addCleanup(self.admin_client.delete_router, r['id'])
@ -152,7 +152,7 @@ class ExternalNetworksRBACTestJSON(base.BaseAdminNetworkTest):
action='access_as_external',
target_tenant='*')['rbac_policy']
r = self.client2.create_router(
data_utils.rand_name('router-'),
data_utils.rand_name('router'),
external_gateway_info={'network_id': net['id']})['router']
# delete should fail because the wildcard is required for the tenant's
# access
@ -181,7 +181,7 @@ class ExternalNetworksRBACTestJSON(base.BaseAdminNetworkTest):
# there are no policies allowing it
with testtools.ExpectedException(lib_exc.NotFound):
self.client2.create_router(
data_utils.rand_name('router-'),
data_utils.rand_name('router'),
external_gateway_info={'network_id': net['id']})
@decorators.idempotent_id('7041cec7-d8fe-4c78-9b04-b51b2fd49dc9')

View File

@ -36,7 +36,7 @@ class FloatingIPAdminTestJSON(base.BaseAdminNetworkTest):
cls.alt_client = cls.os_alt.network_client
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
cls.router = cls.create_router(data_utils.rand_name('router-'),
cls.router = cls.create_router(data_utils.rand_name('router'),
external_network_id=cls.ext_net_id)
cls.create_router_interface(cls.router['id'], cls.subnet['id'])
cls.port = cls.create_port(cls.network)
@ -47,12 +47,7 @@ class FloatingIPAdminTestJSON(base.BaseAdminNetworkTest):
body = self.client.create_floatingip(
floating_network_id=self.ext_net_id)
floating_ip = body['floatingip']
test_project = data_utils.rand_name('test_project_')
test_description = data_utils.rand_name('desc_')
project = self.identity_admin_client.create_project(
name=test_project, description=test_description)['project']
project_id = project['id']
self.addCleanup(self.identity_admin_client.delete_project, project_id)
project_id = self.create_project()['id']
port = self.admin_client.create_port(network_id=self.network['id'],
project_id=project_id)

View File

@ -12,9 +12,9 @@
import testtools
from oslo_utils import uuidutils
from tempest.common import utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron_tempest_plugin.api import base
from neutron_tempest_plugin import config
@ -23,7 +23,7 @@ from neutron_tempest_plugin import config
class NetworksTestAdmin(base.BaseAdminNetworkTest):
@decorators.idempotent_id('d3c76044-d067-4cb0-ae47-8cdd875c7f67')
@test.requires_ext(extension="project-id", service="network")
@utils.requires_ext(extension="project-id", service="network")
def test_admin_create_network_keystone_v3(self):
project_id = self.client.tenant_id # non-admin
@ -41,7 +41,7 @@ class NetworksTestAdmin(base.BaseAdminNetworkTest):
self.assertEqual(project_id, lookup_net['tenant_id'])
@decorators.idempotent_id('8d21aaca-4364-4eb9-8b79-44b4fff6373b')
@test.requires_ext(extension="project-id", service="network")
@utils.requires_ext(extension="project-id", service="network")
def test_admin_create_network_keystone_v3_and_tenant(self):
project_id = self.client.tenant_id # non-admin
@ -59,7 +59,7 @@ class NetworksTestAdmin(base.BaseAdminNetworkTest):
self.assertEqual(project_id, lookup_net['tenant_id'])
@decorators.idempotent_id('08b92179-669d-45ee-8233-ef6611190809')
@test.requires_ext(extension="project-id", service="network")
@utils.requires_ext(extension="project-id", service="network")
def test_admin_create_network_keystone_v3_and_other_tenant(self):
project_id = self.client.tenant_id # non-admin
other_tenant = uuidutils.generate_uuid()
@ -76,7 +76,7 @@ class NetworksTestAdmin(base.BaseAdminNetworkTest):
@testtools.skipUnless("vxlan" in config.CONF.neutron_plugin_options.
available_type_drivers,
'VXLAN type_driver is not enabled')
@test.requires_ext(extension="provider", service="network")
@utils.requires_ext(extension="provider", service="network")
def test_create_tenant_network_vxlan(self):
network = self.admin_client.create_network(
**{"provider:network_type": "vxlan"})['network']

View File

@ -14,10 +14,9 @@
# under the License.
import six
from tempest.lib.common.utils import data_utils
from tempest.common import utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron_tempest_plugin.api import base
from neutron_tempest_plugin import config
@ -33,17 +32,6 @@ class QuotasTestBase(base.BaseAdminNetworkTest):
def resource_setup(cls):
super(QuotasTestBase, cls).resource_setup()
def _create_tenant(self):
# Add a tenant to conduct the test
test_tenant = data_utils.rand_name('test_tenant_')
test_description = data_utils.rand_name('desc_')
project = self.identity_admin_client.create_project(
name=test_tenant,
description=test_description)['project']
self.addCleanup(
self.identity_admin_client.delete_project, project['id'])
return project
def _setup_quotas(self, project_id, **new_quotas):
# Change quotas for tenant
quota_set = self.admin_client.update_quotas(project_id,
@ -96,7 +84,7 @@ class QuotasTest(QuotasTestBase):
@decorators.attr(type='gate')
@decorators.idempotent_id('2390f766-836d-40ef-9aeb-e810d78207fb')
def test_quotas(self):
tenant_id = self._create_tenant()['id']
tenant_id = self.create_project()['id']
new_quotas = {'network': 0, 'security_group': 0}
# Change quotas for tenant
@ -127,9 +115,9 @@ class QuotasTest(QuotasTestBase):
@decorators.idempotent_id('e974b5ba-090a-452c-a578-f9710151d9fc')
@decorators.attr(type='gate')
@test.requires_ext(extension="quota_details", service="network")
@utils.requires_ext(extension="quota_details", service="network")
def test_detail_quotas(self):
tenant_id = self._create_tenant()['id']
tenant_id = self.create_project()['id']
new_quotas = {'network': {'used': 1, 'limit': 2, 'reserved': 0},
'port': {'used': 1, 'limit': 2, 'reserved': 0}}

View File

@ -10,10 +10,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import utils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron_tempest_plugin.api.admin import test_quotas
from neutron_tempest_plugin import config
@ -26,7 +26,7 @@ class QuotasAdminNegativeTestJSON(test_quotas.QuotasTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('952f9b24-9156-4bdc-90f3-682a3d4302f0')
def test_create_network_when_quotas_is_full(self):
tenant_id = self._create_tenant()['id']
tenant_id = self.create_project()['id']
new_quotas = {'network': 1}
self._setup_quotas(tenant_id, **new_quotas)
@ -40,7 +40,7 @@ class QuotasAdminNegativeTestJSON(test_quotas.QuotasTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('0b7f99e3-9f77-45ce-9a89-b39a184de618')
def test_create_subnet_when_quotas_is_full(self):
tenant_id = self._create_tenant()['id']
tenant_id = self.create_project()['id']
new_quotas = {'subnet': 1}
self._setup_quotas(tenant_id, **new_quotas)
@ -62,7 +62,7 @@ class QuotasAdminNegativeTestJSON(test_quotas.QuotasTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('fe20d9f9-346c-4a20-bbfa-d9ca390f4dc6')
def test_create_port_when_quotas_is_full(self):
tenant_id = self._create_tenant()['id']
tenant_id = self.create_project()['id']
new_quotas = {'port': 1}
self._setup_quotas(tenant_id, **new_quotas)
@ -88,13 +88,13 @@ class QuotasAdminNegativeTestJSON(test_quotas.QuotasTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('bb1e9c3c-7e6f-41f1-b579-63dbc655ecb7')
@test.requires_ext(extension="router", service="network")
@utils.requires_ext(extension="router", service="network")
def test_create_router_when_quotas_is_full(self):
tenant_id = self._create_tenant()['id']
tenant_id = self.create_project()['id']
new_quotas = {'router': 1}
self._setup_quotas(tenant_id, **new_quotas)
name = data_utils.rand_name('test_router_')
name = data_utils.rand_name('test_router')
router_args = {'tenant_id': tenant_id}
router = self.admin_client.create_router(
name, True, **router_args)['router']
@ -106,9 +106,9 @@ class QuotasAdminNegativeTestJSON(test_quotas.QuotasTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('5c924ff7-b7a9-474f-92a3-dbe0f976ec13')
@test.requires_ext(extension="security-group", service="network")
@utils.requires_ext(extension="security-group", service="network")
def test_create_security_group_when_quotas_is_full(self):
tenant_id = self._create_tenant()['id']
tenant_id = self.create_project()['id']
sg_args = {'tenant_id': tenant_id}
# avoid a number that is made by default
sg_list = self.admin_client.list_security_groups(
@ -127,9 +127,9 @@ class QuotasAdminNegativeTestJSON(test_quotas.QuotasTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('b7143480-6118-4ed4-be38-1b6f15f30d05')
@test.requires_ext(extension="security-group", service="network")
@utils.requires_ext(extension="security-group", service="network")
def test_create_security_group_rule_when_quotas_is_full(self):
tenant_id = self._create_tenant()['id']
tenant_id = self.create_project()['id']
sg_args = {'tenant_id': tenant_id}
sg = self.admin_client.create_security_group(
@ -159,9 +159,9 @@ class QuotasAdminNegativeTestJSON(test_quotas.QuotasTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('d00fe5bb-9db8-4e1a-9c31-490f52897e6f')
@test.requires_ext(extension="router", service="network")
@utils.requires_ext(extension="router", service="network")
def test_create_floatingip_when_quotas_is_full(self):
tenant_id = self._create_tenant()['id']
tenant_id = self.create_project()['id']
new_quotas = {'floatingip': 1}
self._setup_quotas(tenant_id, **new_quotas)

View File

@ -0,0 +1,43 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib import decorators
from neutron_tempest_plugin.api import base_security_groups as base
class SecGroupAdminTest(base.BaseSecGroupTest):
required_extensions = ['security-group']
credentials = ['primary', 'admin']
@classmethod
def setup_clients(cls):
super(SecGroupAdminTest, cls).setup_clients()
cls.admin_client = cls.os_admin.network_client
cls.identity_admin_client = cls.os_admin.projects_client
@decorators.idempotent_id('44f1e1c4-af10-4aa0-972f-87c1c8fa25cc')
def test_security_group_recreated_on_port_update(self):
network = self.create_network()
self.create_subnet(network)
port = self.create_port(network, security_groups=[])
for sg in self.client.list_security_groups()['security_groups']:
if sg['name'] == 'default':
self.admin_client.delete_security_group(sg['id'])
self.update_port(port, name='update')
names = [
sg['name']
for sg in self.client.list_security_groups()['security_groups']
]
self.assertIn('default', names)

View File

@ -15,10 +15,10 @@
# under the License.
from oslo_utils import uuidutils
from tempest.common import utils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
import testtools
from neutron_tempest_plugin.api import base
@ -186,7 +186,7 @@ class RBACSharedNetworksTest(base.BaseAdminNetworkTest):
def _make_admin_net_and_subnet_shared_to_tenant_id(self, tenant_id):
net = self.admin_client.create_network(
name=data_utils.rand_name('test-network-'))['network']
name=data_utils.rand_name('test-network'))['network']
self.addCleanup(self.admin_client.delete_network, net['id'])
subnet = self.create_subnet(net, client=self.admin_client)
# network is shared to first unprivileged client by default
@ -384,13 +384,13 @@ class RBACSharedNetworksTest(base.BaseAdminNetworkTest):
action='access_as_shared', target_tenant=self.client.tenant_id)
@decorators.idempotent_id('c5f8f785-ce8d-4430-af7e-a236205862fb')
@test.requires_ext(extension="quotas", service="network")
@utils.requires_ext(extension="quotas", service="network")
def test_rbac_policy_quota(self):
quota = self.client.show_quotas(self.client.tenant_id)['quota']
max_policies = quota['rbac_policy']
self.assertGreater(max_policies, 0)
net = self.client.create_network(
name=data_utils.rand_name('test-network-'))['network']
name=data_utils.rand_name('test-network'))['network']
self.addCleanup(self.client.delete_network, net['id'])
with testtools.ExpectedException(lib_exc.Conflict):
for i in range(0, max_policies + 1):
@ -426,7 +426,7 @@ class RBACSharedNetworksTest(base.BaseAdminNetworkTest):
target_tenant=self.client2.tenant_id)
self.client.delete_port(port['id'])
@test.requires_ext(extension="standard-attr-revisions", service="network")
@utils.requires_ext(extension="standard-attr-revisions", service="network")
@decorators.idempotent_id('86c3529b-1231-40de-1234-89664291a4cb')
def test_rbac_bumps_network_revision(self):
resp = self._make_admin_net_and_subnet_shared_to_tenant_id(

View File

@ -10,11 +10,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron_tempest_plugin.api import base
from neutron_tempest_plugin import config
class TagTestJSON(base.BaseAdminNetworkTest):
@ -100,7 +101,7 @@ class TagSubnetTestJSON(TagTestJSON):
@decorators.attr(type='smoke')
@decorators.idempotent_id('2805aabf-a94c-4e70-a0b2-9814f06beb03')
@test.requires_ext(extension="tag-ext", service="network")
@utils.requires_ext(extension="tag-ext", service="network")
def test_subnet_tags(self):
self._test_tag_operations()
@ -116,7 +117,7 @@ class TagPortTestJSON(TagTestJSON):
@decorators.attr(type='smoke')
@decorators.idempotent_id('c7c44f2c-edb0-4ebd-a386-d37cec155c34')
@test.requires_ext(extension="tag-ext", service="network")
@utils.requires_ext(extension="tag-ext", service="network")
def test_port_tags(self):
self._test_tag_operations()
@ -125,6 +126,7 @@ class TagSubnetPoolTestJSON(TagTestJSON):
resource = 'subnetpools'
@classmethod
@utils.requires_ext(extension="subnet_allocation", service="network")
def _create_resource(cls):
subnetpool = cls.create_subnetpool('subnetpool', default_prefixlen=24,
prefixes=['10.0.0.0/8'])
@ -132,7 +134,7 @@ class TagSubnetPoolTestJSON(TagTestJSON):
@decorators.attr(type='smoke')
@decorators.idempotent_id('bdc1c24b-c0b5-4835-953c-8f67dc11edfe')
@test.requires_ext(extension="tag-ext", service="network")
@utils.requires_ext(extension="tag-ext", service="network")
def test_subnetpool_tags(self):
self._test_tag_operations()
@ -141,18 +143,90 @@ class TagRouterTestJSON(TagTestJSON):
resource = 'routers'
@classmethod
@utils.requires_ext(extension="router", service="network")
def _create_resource(cls):
router = cls.create_router(router_name='test')
return router['id']
@decorators.attr(type='smoke')
@decorators.idempotent_id('b898ff92-dc33-4232-8ab9-2c6158c80d28')
@test.requires_ext(extension="router", service="network")
@test.requires_ext(extension="tag-ext", service="network")
@utils.requires_ext(extension="tag-ext", service="network")
def test_router_tags(self):
self._test_tag_operations()
class TagSecGroupTestJSON(TagTestJSON):
resource = 'security-groups'
@classmethod
@utils.requires_ext(extension="security-group", service="network")
def _create_resource(cls):
sec_group = cls.create_security_group(name='test')
return sec_group['id']
@decorators.attr(type='smoke')
@decorators.idempotent_id('0f1a78eb-c5be-42cf-919d-2ce3621a51c2')
@utils.requires_ext(extension="standard-attr-tag", service="network")
def test_security_group_tags(self):
self._test_tag_operations()
class TagFloatingIpTestJSON(TagTestJSON):
resource = 'floatingips'
@classmethod
@utils.requires_ext(extension="router", service="network")
def _create_resource(cls):
cls.ext_net_id = config.CONF.network.public_network_id
floatingip = cls.create_floatingip(cls.ext_net_id)
return floatingip['id']
@decorators.attr(type='smoke')
@decorators.idempotent_id('53f6c2bf-e272-4e9e-b9a9-b165eb7be807')
@utils.requires_ext(extension="standard-attr-tag", service="network")
def test_floatingip_tags(self):
self._test_tag_operations()
class TagQosPolicyTestJSON(TagTestJSON):
resource = 'policies'
@classmethod
@utils.requires_ext(extension="qos", service="network")
def _create_resource(cls):
qos_policy = cls.create_qos_policy(name='test-policy', shared=True)
return qos_policy['id']
@decorators.attr(type='smoke')
@decorators.idempotent_id('e9bac15e-c8bc-4317-8295-4bf1d8d522b8')
@utils.requires_ext(extension="standard-attr-tag", service="network")
def test_qos_policy_tags(self):
self._test_tag_operations()
class TagTrunkTestJSON(TagTestJSON):
resource = 'trunks'
@classmethod
@utils.requires_ext(extension="trunk", service="network")
def _create_resource(cls):
network = cls.create_network()
parent_port = cls.create_port(network)
trunk = cls.client.create_trunk(parent_port['id'], None)
return trunk['trunk']['id']
@classmethod
def resource_cleanup(cls):
cls.client.delete_trunk(cls.res_id)
super(TagTrunkTestJSON, cls).resource_cleanup()
@decorators.attr(type='smoke')
@decorators.idempotent_id('4c63708b-c4c3-407c-8101-7a9593882f5f')
@utils.requires_ext(extension="standard-attr-tag", service="network")
def test_trunk_tags(self):
self._test_tag_operations()
class TagFilterTestJSON(base.BaseAdminNetworkTest):
credentials = ['primary', 'alt', 'admin']
required_extensions = ['tag']
@ -161,18 +235,16 @@ class TagFilterTestJSON(base.BaseAdminNetworkTest):
def resource_setup(cls):
super(TagFilterTestJSON, cls).resource_setup()
res1_id = cls._create_resource('tag-res1')
res2_id = cls._create_resource('tag-res2')
res3_id = cls._create_resource('tag-res3')
res4_id = cls._create_resource('tag-res4')
# tag-res5: a resource without tags
cls._create_resource('tag-res5')
cls.res_ids = []
for i in range(5):
cls.res_ids.append(cls._create_resource())
cls.client.update_tags(cls.resource, res1_id, ['red'])
cls.client.update_tags(cls.resource, res2_id, ['red', 'blue'])
cls.client.update_tags(cls.resource, res3_id,
cls.client.update_tags(cls.resource, cls.res_ids[0], ['red'])
cls.client.update_tags(cls.resource, cls.res_ids[1], ['red', 'blue'])
cls.client.update_tags(cls.resource, cls.res_ids[2],
['red', 'blue', 'green'])
cls.client.update_tags(cls.resource, res4_id, ['green'])
cls.client.update_tags(cls.resource, cls.res_ids[3], ['green'])
# 5th resource: no tags
@classmethod
def setup_clients(cls):
@ -180,57 +252,58 @@ class TagFilterTestJSON(base.BaseAdminNetworkTest):
cls.client = cls.os_alt.network_client
def _assertEqualResources(self, expected, res):
actual = [n['name'] for n in res if n['name'].startswith('tag-res')]
expected = [self.res_ids[i] for i in expected]
actual = [n['id'] for n in res if n['id'] in self.res_ids]
self.assertEqual(set(expected), set(actual))
def _test_filter_tags(self):
# tags single
filters = {'tags': 'red'}
res = self._list_resource(filters)
self._assertEqualResources(['tag-res1', 'tag-res2', 'tag-res3'], res)
self._assertEqualResources([0, 1, 2], res)
# tags multi
filters = {'tags': 'red,blue'}
res = self._list_resource(filters)
self._assertEqualResources(['tag-res2', 'tag-res3'], res)
self._assertEqualResources([1, 2], res)
# tags-any single
filters = {'tags-any': 'blue'}
res = self._list_resource(filters)
self._assertEqualResources(['tag-res2', 'tag-res3'], res)
self._assertEqualResources([1, 2], res)
# tags-any multi
filters = {'tags-any': 'red,blue'}
res = self._list_resource(filters)
self._assertEqualResources(['tag-res1', 'tag-res2', 'tag-res3'], res)
self._assertEqualResources([0, 1, 2], res)
# not-tags single
filters = {'not-tags': 'red'}
res = self._list_resource(filters)
self._assertEqualResources(['tag-res4', 'tag-res5'], res)
self._assertEqualResources([3, 4], res)
# not-tags multi
filters = {'not-tags': 'red,blue'}
res = self._list_resource(filters)
self._assertEqualResources(['tag-res1', 'tag-res4', 'tag-res5'], res)
self._assertEqualResources([0, 3, 4], res)
# not-tags-any single
filters = {'not-tags-any': 'blue'}
res = self._list_resource(filters)
self._assertEqualResources(['tag-res1', 'tag-res4', 'tag-res5'], res)
self._assertEqualResources([0, 3, 4], res)
# not-tags-any multi
filters = {'not-tags-any': 'red,blue'}
res = self._list_resource(filters)
self._assertEqualResources(['tag-res4', 'tag-res5'], res)
self._assertEqualResources([3, 4], res)
class TagFilterNetworkTestJSON(TagFilterTestJSON):
resource = 'networks'
@classmethod
def _create_resource(cls, name):
res = cls.create_network(network_name=name)
def _create_resource(cls):
res = cls.create_network()
return res['id']
def _list_resource(self, filters):
@ -247,9 +320,9 @@ class TagFilterSubnetTestJSON(TagFilterTestJSON):
resource = 'subnets'
@classmethod
def _create_resource(cls, name):
def _create_resource(cls):
network = cls.create_network()
res = cls.create_subnet(network, name=name)
res = cls.create_subnet(network)
return res['id']
def _list_resource(self, filters):
@ -258,7 +331,7 @@ class TagFilterSubnetTestJSON(TagFilterTestJSON):
@decorators.attr(type='smoke')
@decorators.idempotent_id('dd8f9ba7-bcf6-496f-bead-714bd3daac10')
@test.requires_ext(extension="tag-ext", service="network")
@utils.requires_ext(extension="tag-ext", service="network")
def test_filter_subnet_tags(self):
self._test_filter_tags()
@ -267,9 +340,9 @@ class TagFilterPortTestJSON(TagFilterTestJSON):
resource = 'ports'
@classmethod
def _create_resource(cls, name):
def _create_resource(cls):
network = cls.create_network()
res = cls.create_port(network, name=name)
res = cls.create_port(network)
return res['id']
def _list_resource(self, filters):
@ -278,7 +351,7 @@ class TagFilterPortTestJSON(TagFilterTestJSON):
@decorators.attr(type='smoke')
@decorators.idempotent_id('09c036b8-c8d0-4bee-b776-7f4601512898')
@test.requires_ext(extension="tag-ext", service="network")
@utils.requires_ext(extension="tag-ext", service="network")
def test_filter_port_tags(self):
self._test_filter_tags()
@ -287,8 +360,9 @@ class TagFilterSubnetpoolTestJSON(TagFilterTestJSON):
resource = 'subnetpools'
@classmethod
def _create_resource(cls, name):
res = cls.create_subnetpool(name, default_prefixlen=24,
@utils.requires_ext(extension="subnet_allocation", service="network")
def _create_resource(cls):
res = cls.create_subnetpool('subnetpool', default_prefixlen=24,
prefixes=['10.0.0.0/8'])
return res['id']
@ -298,7 +372,7 @@ class TagFilterSubnetpoolTestJSON(TagFilterTestJSON):
@decorators.attr(type='smoke')
@decorators.idempotent_id('16ae7ad2-55c2-4821-9195-bfd04ab245b7')
@test.requires_ext(extension="tag-ext", service="network")
@utils.requires_ext(extension="tag-ext", service="network")
def test_filter_subnetpool_tags(self):
self._test_filter_tags()
@ -307,8 +381,9 @@ class TagFilterRouterTestJSON(TagFilterTestJSON):
resource = 'routers'
@classmethod
def _create_resource(cls, name):
res = cls.create_router(router_name=name)
@utils.requires_ext(extension="router", service="network")
def _create_resource(cls):
res = cls.create_router(router_name='test')
return res['id']
def _list_resource(self, filters):
@ -317,11 +392,101 @@ class TagFilterRouterTestJSON(TagFilterTestJSON):
@decorators.attr(type='smoke')
@decorators.idempotent_id('cdd3f3ea-073d-4435-a6cb-826a4064193d')
@test.requires_ext(extension="tag-ext", service="network")
@utils.requires_ext(extension="tag-ext", service="network")
def test_filter_router_tags(self):
self._test_filter_tags()
class TagFilterSecGroupTestJSON(TagFilterTestJSON):
resource = 'security-groups'
@classmethod
@utils.requires_ext(extension="security-group", service="network")
def _create_resource(cls):
sec_group = cls.create_security_group(name='test')
return sec_group['id']
def _list_resource(self, filters):
res = self.client.list_security_groups(**filters)
resource_key = self.resource.replace('-', '_')
return res[resource_key]
@decorators.attr(type='smoke')
@decorators.idempotent_id('d4d1d681-0116-4800-9725-16cb88f8171a')
@utils.requires_ext(extension="standard-attr-tag", service="network")
def test_filter_security_group_tags(self):
self._test_filter_tags()
class TagFilterFloatingIpTestJSON(TagFilterTestJSON):
resource = 'floatingips'
@classmethod
@utils.requires_ext(extension="router", service="network")
def _create_resource(cls):
cls.ext_net_id = config.CONF.network.public_network_id
floatingip = cls.create_floatingip(cls.ext_net_id)
return floatingip['id']
def _list_resource(self, filters):
res = self.client.list_floatingips(**filters)
return res[self.resource]
@decorators.attr(type='smoke')
@decorators.idempotent_id('01f00afc-dbec-432a-bfee-2a1f0510e7a8')
@utils.requires_ext(extension="standard-attr-tag", service="network")
def test_filter_floatingip_tags(self):
self._test_filter_tags()
class TagFilterQosPolicyTestJSON(TagFilterTestJSON):
resource = 'policies'
@classmethod
@utils.requires_ext(extension="qos", service="network")
def _create_resource(cls):
qos_policy = cls.create_qos_policy(name='test-policy', shared=True)
return qos_policy['id']
def _list_resource(self, filters):
res = self.client.list_qos_policies(**filters)
return res[self.resource]
@decorators.attr(type='smoke')
@decorators.idempotent_id('c2f9a6ae-2529-4cb9-a44b-b16f8ba27832')
@utils.requires_ext(extension="standard-attr-tag", service="network")
def test_filter_qos_policy_tags(self):
self._test_filter_tags()
class TagFilterTrunkTestJSON(TagFilterTestJSON):
resource = 'trunks'
@classmethod
@utils.requires_ext(extension="trunk", service="network")
def _create_resource(cls):
network = cls.create_network()
parent_port = cls.create_port(network)
trunk = cls.client.create_trunk(parent_port['id'], None)
return trunk['trunk']['id']
@classmethod
def resource_cleanup(cls):
for res_id in cls.res_ids:
cls.client.delete_trunk(res_id)
super(TagFilterTrunkTestJSON, cls).resource_cleanup()
def _list_resource(self, filters):
res = self.client.list_trunks(**filters)
return res[self.resource]
@decorators.attr(type='smoke')
@decorators.idempotent_id('3fb3ca3a-8e3a-4565-ba73-16413d445e25')
@utils.requires_ext(extension="standard-attr-tag", service="network")
def test_filter_trunk_tags(self):
self._test_filter_tags()
class UpdateTagsTest(base.BaseAdminNetworkTest):
required_extensions = ['tag']

View File

@ -17,6 +17,8 @@ import functools
import math
import netaddr
from neutron_lib import constants as const
from tempest.common import utils as tutils
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
from tempest import test
@ -77,7 +79,7 @@ class BaseNetworkTest(test.BaseTestCase):
if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
raise cls.skipException("IPv6 Tests are disabled.")
for req_ext in getattr(cls, 'required_extensions', []):
if not test.is_extension_enabled(req_ext, 'network'):
if not tutils.is_extension_enabled(req_ext, 'network'):
msg = "%s extension not enabled." % req_ext
raise cls.skipException(msg)
@ -115,6 +117,7 @@ class BaseNetworkTest(test.BaseTestCase):
cls.subnetpools = []
cls.admin_subnetpools = []
cls.security_groups = []
cls.projects = []
@classmethod
def resource_cleanup(cls):
@ -191,6 +194,11 @@ class BaseNetworkTest(test.BaseTestCase):
cls.admin_client.delete_address_scope,
address_scope['id'])
for project in cls.projects:
cls._try_delete_resource(
cls.identity_admin_client.delete_project,
project['id'])
# Clean up QoS rules
for qos_rule in cls.qos_rules:
cls._try_delete_resource(cls.admin_client.delete_qos_rule,
@ -395,7 +403,7 @@ class BaseNetworkTest(test.BaseTestCase):
@classmethod
def create_qos_bandwidth_limit_rule(cls, policy_id, max_kbps,
max_burst_kbps,
direction=constants.EGRESS_DIRECTION):
direction=const.EGRESS_DIRECTION):
"""Wrapper utility that returns a test QoS bandwidth limit rule."""
body = cls.admin_client.create_bandwidth_limit_rule(
policy_id, max_kbps, max_burst_kbps, direction)
@ -406,7 +414,8 @@ class BaseNetworkTest(test.BaseTestCase):
@classmethod
def delete_router(cls, router):
body = cls.client.list_router_interfaces(router['id'])
interfaces = body['ports']
interfaces = [port for port in body['ports']
if port['device_owner'] in const.ROUTER_INTERFACE_OWNERS]
for i in interfaces:
try:
cls.client.remove_router_interface_with_subnet_id(
@ -435,6 +444,22 @@ class BaseNetworkTest(test.BaseTestCase):
cls.subnetpools.append(body['subnetpool'])
return body['subnetpool']
@classmethod
def create_project(cls, name=None, description=None):
test_project = name or data_utils.rand_name('test_project_')
test_description = description or data_utils.rand_name('desc_')
project = cls.identity_admin_client.create_project(
name=test_project,
description=test_description)['project']
cls.projects.append(project)
return project
@classmethod
def create_security_group(cls, name, **kwargs):
body = cls.client.create_security_group(name=name, **kwargs)
cls.security_groups.append(body['security_group'])
return body['security_group']
class BaseAdminNetworkTest(BaseNetworkTest):
@ -542,7 +567,7 @@ def require_qos_rule_type(rule_type):
def _require_sorting(f):
@functools.wraps(f)
def inner(self, *args, **kwargs):
if not test.is_extension_enabled("sorting", "network"):
if not tutils.is_extension_enabled("sorting", "network"):
self.skipTest('Sorting feature is required')
return f(self, *args, **kwargs)
return inner
@ -551,7 +576,7 @@ def _require_sorting(f):
def _require_pagination(f):
@functools.wraps(f)
def inner(self, *args, **kwargs):
if not test.is_extension_enabled("pagination", "network"):
if not tutils.is_extension_enabled("pagination", "network"):
self.skipTest('Pagination feature is required')
return f(self, *args, **kwargs)
return inner

View File

@ -13,11 +13,25 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from tempest.lib.common.utils import data_utils
from neutron_tempest_plugin.api import base
V4_PROTOCOL_NAMES = set(key for key in constants.IP_PROTOCOL_MAP if
'v6' not in key)
V4_PROTOCOL_INTS = set(v for k, v in constants.IP_PROTOCOL_MAP.items()
if 'v6' not in k)
V6_PROTOCOL_LEGACY = set([constants.PROTO_NAME_IPV6_ICMP_LEGACY])
V6_PROTOCOL_NAMES = (
set(key for key in constants.IP_PROTOCOL_MAP if 'v6' in key) -
V6_PROTOCOL_LEGACY
)
V6_PROTOCOL_INTS = set(v for k, v in constants.IP_PROTOCOL_MAP.items() if
'v6' in k)
class BaseSecGroupTest(base.BaseNetworkTest):
def _create_security_group(self, **kwargs):
@ -39,3 +53,41 @@ class BaseSecGroupTest(base.BaseNetworkTest):
for secgroup in list_body['security_groups']:
secgroup_list.append(secgroup['id'])
self.assertNotIn(secgroup_id, secgroup_list)
def _create_security_group_rule(self, **kwargs):
rule_create_body = self.client.create_security_group_rule(**kwargs)
# List rules and verify created rule is in response
rule_list_body = (
self.client.list_security_group_rules())
rule_list = [rule['id']
for rule in rule_list_body['security_group_rules']]
self.assertIn(rule_create_body['security_group_rule']['id'],
rule_list)
self.addCleanup(self._delete_security_group_rule,
rule_create_body['security_group_rule']['id'])
return rule_create_body
def _show_security_group_rule(self, **kwargs):
show_rule_body = self.client.show_security_group_rule(kwargs['id'])
for key, value in kwargs.items():
self.assertEqual(value,
show_rule_body['security_group_rule'][key],
"%s does not match." % key)
def _delete_security_group_rule(self, secgroup_rule_id):
self.client.delete_security_group_rule(secgroup_rule_id)
rule_list_body = self.client.list_security_group_rules()
rule_list = [rule['id']
for rule in rule_list_body['security_group_rules']]
self.assertNotIn(secgroup_rule_id, rule_list)
def _test_create_show_delete_security_group_rule(self, **kwargs):
# The security group rule is deleted by the cleanup call in
# _create_security_group_rule.
rule_create_body = (
self._create_security_group_rule(**kwargs)['security_group_rule'])
self._show_security_group_rule(
id=rule_create_body['id'],
protocol=rule_create_body['protocol'],
direction=rule_create_body['direction'],
ethertype=rule_create_body['ethertype'])

View File

@ -12,10 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import utils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron_tempest_plugin.api import base
@ -78,7 +78,7 @@ class AddressScopeTest(AddressScopeTestBase):
self.assertFalse(returned_address_scope['shared'])
@decorators.idempotent_id('bbd57364-6d57-48e4-b0f1-8b9a998f5e06')
@test.requires_ext(extension="project-id", service="network")
@utils.requires_ext(extension="project-id", service="network")
def test_show_address_scope_project_id(self):
address_scope = self._create_address_scope(ip_version=4)
body = self.client.show_address_scope(address_scope['id'])

View File

@ -33,6 +33,7 @@ class NetworksTestDHCPv6(base.BaseNetworkTest):
@classmethod
def skip_checks(cls):
super(NetworksTestDHCPv6, cls).skip_checks()
msg = None
if not CONF.network_feature_enabled.ipv6:
msg = "IPv6 is not enabled"

View File

@ -14,9 +14,9 @@
# under the License.
import ddt
from tempest.common import utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron_tempest_plugin.api import base
from neutron_tempest_plugin.api import base_security_groups as base_security
@ -30,7 +30,7 @@ class PortSecTest(base_security.BaseSecGroupTest,
base.BaseNetworkTest):
@decorators.idempotent_id('7c338ddf-e64e-4118-bd33-e49a1f2f1495')
@test.requires_ext(extension='port-security', service='network')
@utils.requires_ext(extension='port-security', service='network')
def test_port_sec_default_value(self):
# Default port-sec value is True, and the attr of the port will inherit
# from the port-sec of the network when it not be specified in API
@ -41,7 +41,7 @@ class PortSecTest(base_security.BaseSecGroupTest,
self.assertTrue(port['port_security_enabled'])
@decorators.idempotent_id('e60eafd2-31de-4c38-8106-55447d033b57')
@test.requires_ext(extension='port-security', service='network')
@utils.requires_ext(extension='port-security', service='network')
@ddt.unpack
@ddt.data({'port_sec_net': False, 'port_sec_port': True, 'expected': True},
{'port_sec_net': True, 'port_sec_port': False,
@ -55,7 +55,7 @@ class PortSecTest(base_security.BaseSecGroupTest,
self.assertEqual(port['port_security_enabled'], expected)
@decorators.idempotent_id('fe7c27b9-f320-4daf-b977-b1547c43daf6')
@test.requires_ext(extension='port-security', service='network')
@utils.requires_ext(extension='port-security', service='network')
def test_create_port_sec_with_security_group(self):
network = self.create_network(port_security_enabled=True)
self.create_subnet(network)
@ -71,7 +71,7 @@ class PortSecTest(base_security.BaseSecGroupTest,
@decorators.attr(type='negative')
@decorators.idempotent_id('ff11226c-a5ff-4ad4-8480-0840e36e47a9')
@test.requires_ext(extension='port-security', service='network')
@utils.requires_ext(extension='port-security', service='network')
def test_port_sec_update_port_failed(self):
network = self.create_network()
self.create_subnet(network)
@ -98,7 +98,7 @@ class PortSecTest(base_security.BaseSecGroupTest,
self.update_port(port, security_groups=[])
@decorators.idempotent_id('05642059-1bfc-4581-9bc9-aaa5db08dd60')
@test.requires_ext(extension='port-security', service='network')
@utils.requires_ext(extension='port-security', service='network')
def test_port_sec_update_pass(self):
network = self.create_network()
self.create_subnet(network)
@ -122,7 +122,7 @@ class PortSecTest(base_security.BaseSecGroupTest,
self.assertFalse(port['port_security_enabled'])
@decorators.idempotent_id('2df6114b-b8c3-48a1-96e8-47f08159d35c')
@test.requires_ext(extension='port-security', service='network')
@utils.requires_ext(extension='port-security', service='network')
def test_delete_with_port_sec(self):
network = self.create_network(port_security_enabled=True)
port = self.create_port(network=network,
@ -135,9 +135,9 @@ class PortSecTest(base_security.BaseSecGroupTest,
@decorators.attr(type='negative')
@decorators.idempotent_id('ed93e453-3f8d-495e-8e7e-b0e268c2ebd9')
@test.requires_ext(extension='port-security', service='network')
@test.requires_ext(extension='allowed-address-pairs', service='network')
def test_allow_address_pairs(self):
@utils.requires_ext(extension='port-security', service='network')
@utils.requires_ext(extension='allowed-address-pairs', service='network')
def test_allowed_address_pairs(self):
network = self.create_network()
self.create_subnet(network)
port = self.create_port(network=network, port_security_enabled=False)

View File

@ -10,8 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import utils
from tempest.lib import decorators
from tempest import test
from neutron_tempest_plugin.api import base
@ -22,7 +22,7 @@ class ExtensionsTest(base.BaseNetworkTest):
body = self.client.list_extensions()
extensions = {ext_['alias'] for ext_ in body['extensions']}
self.assertNotEmpty(extensions, "Extension list returned is empty")
ext_enabled = test.is_extension_enabled(ext, "network")
ext_enabled = utils.is_extension_enabled(ext, "network")
if ext_enabled:
self.assertIn(ext, extensions)
else:

View File

@ -13,9 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import utils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest import test
from neutron_tempest_plugin.api import base
from neutron_tempest_plugin import config
@ -35,7 +35,7 @@ class FloatingIPTestJSON(base.BaseNetworkTest):
# Create network, subnet, router and add interface
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
cls.router = cls.create_router(data_utils.rand_name('router-'),
cls.router = cls.create_router(data_utils.rand_name('router'),
external_network_id=cls.ext_net_id)
cls.create_router_interface(cls.router['id'], cls.subnet['id'])
cls.port = list()
@ -58,7 +58,7 @@ class FloatingIPTestJSON(base.BaseNetworkTest):
self.assertFalse(body['port_id'])
@decorators.idempotent_id('c72c1c0c-2193-4aca-eeee-b1442641ffff')
@test.requires_ext(extension="standard-attr-description",
@utils.requires_ext(extension="standard-attr-description",
service="network")
def test_create_update_floatingip_description(self):
body = self.client.create_floatingip(
@ -80,7 +80,7 @@ class FloatingIPTestJSON(base.BaseNetworkTest):
self.assertEqual('d2', body['floatingip']['description'])
@decorators.idempotent_id('fd7161e1-2167-4686-a6ff-0f3df08001bb')
@test.requires_ext(extension="standard-attr-description",
@utils.requires_ext(extension="standard-attr-description",
service="network")
def test_floatingip_update_extra_attributes_port_id_not_changed(self):
port_id = self.ports[1]['id']

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import metering as metering_apidef
from neutron_lib.db import constants as db_const
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
@ -31,7 +32,7 @@ class MeteringTestJSON(base.BaseAdminNetworkTest):
List, Show, Create, Delete Metering labels rules
"""
required_extensions = ['metering']
required_extensions = [metering_apidef.ALIAS]
@classmethod
def resource_setup(cls):
@ -72,7 +73,7 @@ class MeteringTestJSON(base.BaseAdminNetworkTest):
@decorators.idempotent_id('ec8e15ff-95d0-433b-b8a6-b466bddb1e50')
def test_create_delete_metering_label_with_filters(self):
# Creates a label
name = data_utils.rand_name('metering-label-')
name = data_utils.rand_name('metering-label')
description = "label created by tempest"
body = self.admin_client.create_metering_label(name=name,
description=description)

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import metering as metering_apidef
from neutron_lib.db import constants as db_const
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
@ -23,7 +24,7 @@ LONG_NAME_NG = 'x' * (db_const.NAME_FIELD_SIZE + 1)
class MeteringNegativeTestJSON(base.BaseAdminNetworkTest):
required_extensions = ['metering']
required_extensions = [metering_apidef.ALIAS]
@decorators.attr(type='negative')
@decorators.idempotent_id('8b3f7c84-9d37-4771-8681-bfd2c07f3c2d')

View File

@ -15,11 +15,11 @@
import netaddr
from tempest.common import utils
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron_tempest_plugin.api import base
from neutron_tempest_plugin import config
@ -50,7 +50,7 @@ class NetworksIpAvailabilityTest(base.BaseAdminNetworkTest):
"""
@classmethod
@test.requires_ext(extension="network-ip-availability", service="network")
@utils.requires_ext(extension="network-ip-availability", service="network")
def skip_checks(cls):
super(NetworksIpAvailabilityTest, cls).skip_checks()
@ -111,7 +111,7 @@ class NetworksIpAvailabilityIPv4Test(NetworksIpAvailabilityTest):
@decorators.idempotent_id('0f33cc8c-1bf6-47d1-9ce1-010618240599')
def test_admin_network_availability_before_subnet(self):
net_name = data_utils.rand_name('network-')
net_name = data_utils.rand_name('network')
network = self.create_network(network_name=net_name)
self.addCleanup(self.client.delete_network, network['id'])
net_availability = self.admin_client.list_network_ip_availabilities()
@ -119,7 +119,7 @@ class NetworksIpAvailabilityIPv4Test(NetworksIpAvailabilityTest):
@decorators.idempotent_id('3aecd3b2-16ed-4b87-a54a-91d7b3c2986b')
def test_net_ip_availability_after_subnet_and_ports(self):
net_name = data_utils.rand_name('network-')
net_name = data_utils.rand_name('network')
network = self.create_network(network_name=net_name)
self.addCleanup(self.client.delete_network, network['id'])
subnet, prefix = self._create_subnet(network, self._ip_version)
@ -138,7 +138,7 @@ class NetworksIpAvailabilityIPv4Test(NetworksIpAvailabilityTest):
@decorators.idempotent_id('9f11254d-757b-492e-b14b-f52144e4ee7b')
def test_net_ip_availability_after_port_delete(self):
net_name = data_utils.rand_name('network-')
net_name = data_utils.rand_name('network')
network = self.create_network(network_name=net_name)
self.addCleanup(self.client.delete_network, network['id'])
subnet, prefix = self._create_subnet(network, self._ip_version)

View File

@ -13,8 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import utils
from tempest.lib import decorators
from tempest import test
import testtools
from neutron_tempest_plugin.api import base
@ -45,17 +45,17 @@ class NetworksTestJSON(base.BaseNetworkTest):
body = self.client.show_network(self.network['id'])
network = body['network']
fields = ['id', 'name']
if test.is_extension_enabled('net-mtu', 'network'):
if utils.is_extension_enabled('net-mtu', 'network'):
fields.append('mtu')
for key in fields:
self.assertEqual(network[key], self.network[key])
project_id = self.client.tenant_id
self.assertEqual(project_id, network['tenant_id'])
if test.is_extension_enabled('project-id', 'network'):
if utils.is_extension_enabled('project-id', 'network'):
self.assertEqual(project_id, network['project_id'])
@decorators.idempotent_id('26f2b7a5-2cd1-4f3a-b11f-ad259b099b11')
@test.requires_ext(extension="project-id", service="network")
@utils.requires_ext(extension="project-id", service="network")
def test_show_network_fields_keystone_v3(self):
def _check_show_network_fields(fields, expect_project_id,
@ -74,7 +74,7 @@ class NetworksTestJSON(base.BaseNetworkTest):
_check_show_network_fields(['project_id', 'tenant_id'], True, True)
@decorators.idempotent_id('0cc0552f-afaf-4231-b7a7-c2a1774616da')
@test.requires_ext(extension="project-id", service="network")
@utils.requires_ext(extension="project-id", service="network")
def test_create_network_keystone_v3(self):
project_id = self.client.tenant_id
@ -95,7 +95,7 @@ class NetworksTestJSON(base.BaseNetworkTest):
self.assertEqual(project_id, new_net['tenant_id'])
@decorators.idempotent_id('94e2a44c-3367-4253-8c2a-22deaf59e96c')
@test.requires_ext(extension="dns-integration",
@utils.requires_ext(extension="dns-integration",
service="network")
def test_create_update_network_dns_domain(self):
domain1 = 'test.org.'
@ -111,7 +111,7 @@ class NetworksTestJSON(base.BaseNetworkTest):
self.assertEqual(domain2, body['dns_domain'])
@decorators.idempotent_id('a23186b9-aa6f-4b08-b877-35ca3b9cd54c')
@test.requires_ext(extension="project-id", service="network")
@utils.requires_ext(extension="project-id", service="network")
def test_list_networks_fields_keystone_v3(self):
def _check_list_networks_fields(fields, expect_project_id,
expect_tenant_id):

View File

@ -13,8 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import utils
from tempest.lib import decorators
from tempest import test
from neutron_tempest_plugin.api import base
@ -44,7 +44,7 @@ class PortsTestJSON(base.BaseNetworkTest):
self.assertEqual(ip, dns_assignment['ip_address'])
@decorators.idempotent_id('c72c1c0c-2193-4aca-bbb4-b1442640bbbb')
@test.requires_ext(extension="standard-attr-description",
@utils.requires_ext(extension="standard-attr-description",
service="network")
def test_create_update_port_description(self):
body = self.create_port(self.network,
@ -59,7 +59,7 @@ class PortsTestJSON(base.BaseNetworkTest):
self.assertEqual('d2', body['description'])
@decorators.idempotent_id('539fbefe-fb36-48aa-9a53-8c5fbd44e492')
@test.requires_ext(extension="dns-integration",
@utils.requires_ext(extension="dns-integration",
service="network")
def test_create_update_port_with_dns_name(self):
# NOTE(manjeets) dns_domain is set to openstackgate.local
@ -80,7 +80,7 @@ class PortsTestJSON(base.BaseNetworkTest):
self._confirm_dns_assignment(body)
@decorators.idempotent_id('435e89df-a8bb-4b41-801a-9f20d362d777')
@test.requires_ext(extension="dns-integration",
@utils.requires_ext(extension="dns-integration",
service="network")
def test_create_update_port_with_no_dns_name(self):
self.create_subnet(self.network)
@ -92,7 +92,7 @@ class PortsTestJSON(base.BaseNetworkTest):
self._confirm_dns_assignment(port_body['port'])
@decorators.idempotent_id('dfe8cc79-18d9-4ae8-acef-3ec6bb719aa7')
@test.requires_ext(extension="dns-domain-ports",
@utils.requires_ext(extension="dns-domain-ports",
service="network")
def test_create_update_port_with_dns_domain(self):
self.create_subnet(self.network)

View File

@ -12,16 +12,17 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import qos as qos_apidef
from neutron_lib.services.qos import constants as qos_consts
from tempest.common import utils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import test
import testscenarios
import testtools
from neutron_tempest_plugin.api import base
from neutron_tempest_plugin.common import qos_consts
load_tests = testscenarios.load_tests_apply_scenarios
@ -29,7 +30,7 @@ load_tests = testscenarios.load_tests_apply_scenarios
class QosTestJSON(base.BaseAdminNetworkTest):
required_extensions = ['qos']
required_extensions = [qos_apidef.ALIAS]
@staticmethod
def _get_driver_details(rule_type_details, driver_name):
@ -37,17 +38,6 @@ class QosTestJSON(base.BaseAdminNetworkTest):
if driver['name'] == driver_name:
return driver
def _create_project(self):
# Add a project to conduct the test
test_project = data_utils.rand_name('test_project_')
test_description = data_utils.rand_name('desc_')
project = self.identity_admin_client.create_project(
name=test_project,
description=test_description)['project']
self.addCleanup(
self.identity_admin_client.delete_project, project['id'])
return project
@decorators.idempotent_id('108fbdf7-3463-4e47-9871-d07f3dcf5bbb')
def test_create_policy(self):
policy = self.create_qos_policy(name='test-policy',
@ -67,7 +57,7 @@ class QosTestJSON(base.BaseAdminNetworkTest):
self.assertIn(policy['id'], policies_ids)
@decorators.idempotent_id('606a48e2-5403-4052-b40f-4d54b855af76')
@test.requires_ext(extension="project-id", service="network")
@utils.requires_ext(extension="project-id", service="network")
def test_show_policy_has_project_id(self):
policy = self.create_qos_policy(name='test-policy', shared=False)
body = self.admin_client.show_qos_policy(policy['id'])
@ -371,7 +361,7 @@ class QosTestJSON(base.BaseAdminNetworkTest):
@decorators.idempotent_id('18d94f22-b9d5-4390-af12-d30a0cfc4cd3')
def test_default_policy_creating_network_without_policy(self):
project_id = self._create_project()['id']
project_id = self.create_project()['id']
policy = self.create_qos_policy(name='test-policy',
tenant_id=project_id,
is_default=True)
@ -383,7 +373,7 @@ class QosTestJSON(base.BaseAdminNetworkTest):
@decorators.idempotent_id('807cce45-38e5-482d-94db-36e1796aba73')
def test_default_policy_creating_network_with_policy(self):
project_id = self._create_project()['id']
project_id = self.create_project()['id']
self.create_qos_policy(name='test-policy',
tenant_id=project_id,
is_default=True)
@ -400,7 +390,7 @@ class QosTestJSON(base.BaseAdminNetworkTest):
class QosBandwidthLimitRuleTestJSON(base.BaseAdminNetworkTest):
direction = None
required_extensions = ['qos']
required_extensions = [qos_apidef.ALIAS]
@classmethod
@base.require_qos_rule_type(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)
@ -598,7 +588,7 @@ class RbacSharedQosPoliciesTest(base.BaseAdminNetworkTest):
force_tenant_isolation = True
credentials = ['primary', 'alt', 'admin']
required_extensions = ['qos']
required_extensions = [qos_apidef.ALIAS]
@classmethod
def resource_setup(cls):
@ -852,7 +842,7 @@ class QosDscpMarkingRuleTestJSON(base.BaseAdminNetworkTest):
VALID_DSCP_MARK1 = 56
VALID_DSCP_MARK2 = 48
required_extensions = ['qos']
required_extensions = [qos_apidef.ALIAS]
@classmethod
@base.require_qos_rule_type(qos_consts.RULE_TYPE_DSCP_MARKING)
@ -986,7 +976,7 @@ class QosMinimumBandwidthRuleTestJSON(base.BaseAdminNetworkTest):
DIRECTION_INGRESS = "ingress"
RULE_NAME = qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH + "_rule"
RULES_NAME = RULE_NAME + "s"
required_extensions = ['qos']
required_extensions = [qos_apidef.ALIAS]
@classmethod
@base.require_qos_rule_type(qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH)
@ -1148,7 +1138,7 @@ class QosSearchCriteriaTest(base.BaseSearchCriteriaTest,
list_kwargs = {'description': 'search-criteria-test'}
list_as_admin = True
required_extensions = ['qos']
required_extensions = [qos_apidef.ALIAS]
@classmethod
def resource_setup(cls):

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import qos as qos_apidef
from neutron_lib.db import constants as db_const
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
@ -23,7 +24,7 @@ LONG_TENANT_ID_NG = 'z' * (db_const.PROJECT_ID_FIELD_SIZE + 1)
class QosNegativeTestJSON(base.BaseAdminNetworkTest):
required_extensions = ['qos']
required_extensions = [qos_apidef.ALIAS]
@decorators.attr(type='negative')
@decorators.idempotent_id('b9dce555-d3b3-11e5-950a-54ee757c77da')

View File

@ -12,9 +12,9 @@
import netaddr
from tempest.common import utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import test
from neutron_tempest_plugin.api import base
from neutron_tempest_plugin.api import base_security_groups as bsg
@ -109,7 +109,7 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
updated['network']['revision_number'])
@decorators.idempotent_id('6c256f71-c929-4200-b3dc-4e1843506be5')
@test.requires_ext(extension="security-group", service="network")
@utils.requires_ext(extension="security-group", service="network")
def test_update_sg_group_bumps_revision(self):
sg, name = self._create_security_group()
self.assertIn('revision_number', sg['security_group'])
@ -119,7 +119,7 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
sg['security_group']['revision_number'])
@decorators.idempotent_id('6489632f-8550-4453-a674-c98849742967')
@test.requires_ext(extension="security-group", service="network")
@utils.requires_ext(extension="security-group", service="network")
def test_update_port_sg_binding_bumps_revision(self):
net = self.create_network()
self.addCleanup(self.client.delete_network, net['id'])
@ -136,7 +136,7 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
updated['port']['revision_number'])
@decorators.idempotent_id('29c7ab2b-d1d8-425d-8cec-fcf632960f22')
@test.requires_ext(extension="security-group", service="network")
@utils.requires_ext(extension="security-group", service="network")
def test_update_sg_rule_bumps_sg_revision(self):
sg, name = self._create_security_group()
rule = self.client.create_security_group_rule(
@ -153,7 +153,7 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
updated['security_group']['revision_number'])
@decorators.idempotent_id('db70c285-0365-4fac-9f55-2a0ad8cf55a8')
@test.requires_ext(extension="allowed-address-pairs", service="network")
@utils.requires_ext(extension="allowed-address-pairs", service="network")
def test_update_allowed_address_pairs_bumps_revision(self):
net = self.create_network()
self.addCleanup(self.client.delete_network, net['id'])
@ -169,7 +169,7 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
updated['port']['revision_number'])
@decorators.idempotent_id('a21ec3b4-3569-4b77-bf29-4177edaa2df5')
@test.requires_ext(extension="extra_dhcp_opt", service="network")
@utils.requires_ext(extension="extra_dhcp_opt", service="network")
def test_update_extra_dhcp_opt_bumps_revision(self):
net = self.create_network()
self.addCleanup(self.client.delete_network, net['id'])
@ -186,7 +186,7 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
updated['port']['revision_number'])
@decorators.idempotent_id('40ba648f-f374-4c29-a5b7-489dd5a38a4e')
@test.requires_ext(extension="dns-integration", service="network")
@utils.requires_ext(extension="dns-integration", service="network")
def test_update_dns_domain_bumps_revision(self):
net = self.create_network(dns_domain='example.test.')
self.addCleanup(self.client.delete_network, net['id'])
@ -205,8 +205,8 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
updated['port']['revision_number'])
@decorators.idempotent_id('8482324f-cf59-4d73-b98e-d37119255300')
@test.requires_ext(extension="router", service="network")
@test.requires_ext(extension="extraroute", service="network")
@utils.requires_ext(extension="router", service="network")
@utils.requires_ext(extension="extraroute", service="network")
def test_update_router_extra_routes_bumps_revision(self):
net = self.create_network()
self.addCleanup(self.client.delete_network, net['id'])
@ -230,7 +230,7 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
updated['router']['revision_number'])
@decorators.idempotent_id('6bd18702-e25a-4b4b-8c0c-680113533511')
@test.requires_ext(extension="subnet-service-types", service="network")
@utils.requires_ext(extension="subnet-service-types", service="network")
def test_update_subnet_service_types_bumps_revisions(self):
net = self.create_network()
self.addCleanup(self.client.delete_network, net['id'])
@ -246,7 +246,7 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
updated['subnet']['revision_number'])
@decorators.idempotent_id('9c83105c-9973-45ff-9ca2-e66d64700abe')
@test.requires_ext(extension="port-security", service="network")
@utils.requires_ext(extension="port-security", service="network")
def test_update_port_security_bumps_revisions(self):
net = self.create_network(port_security_enabled=False)
self.addCleanup(self.client.delete_network, net['id'])
@ -270,7 +270,7 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
updated['port']['revision_number'])
@decorators.idempotent_id('68d5ac3a-11a1-4847-8e2e-5843c043d89b')
@test.requires_ext(extension="binding", service="network")
@utils.requires_ext(extension="binding", service="network")
def test_portbinding_bumps_revision(self):
net = self.create_network()
self.addCleanup(self.client.delete_network, net['id'])
@ -284,7 +284,7 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
port['revision_number'])
@decorators.idempotent_id('4a37bde9-1975-47e0-9b8c-2c9ca36415b0')
@test.requires_ext(extension="router", service="network")
@utils.requires_ext(extension="router", service="network")
def test_update_router_bumps_revision(self):
net = self.create_network()
self.addCleanup(self.client.delete_network, net['id'])
@ -307,8 +307,8 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
router['revision_number'])
@decorators.idempotent_id('9de71ebc-f5df-4cd0-80bc-60299fce3ce9')
@test.requires_ext(extension="router", service="network")
@test.requires_ext(extension="standard-attr-description",
@utils.requires_ext(extension="router", service="network")
@utils.requires_ext(extension="standard-attr-description",
service="network")
def test_update_floatingip_bumps_revision(self):
ext_id = config.CONF.network.public_network_id
@ -339,8 +339,8 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
self.client.update_floatingip(b2['floatingip']['id'], port_id=None)
@decorators.idempotent_id('afb6486c-41b5-483e-a500-3c506f4deb49')
@test.requires_ext(extension="router", service="network")
@test.requires_ext(extension="l3-ha", service="network")
@utils.requires_ext(extension="router", service="network")
@utils.requires_ext(extension="l3-ha", service="network")
def test_update_router_extra_attributes_bumps_revision(self):
# updates from CVR to CVR-HA are supported on every release,
# but only the admin can forcibly create a non-HA router
@ -360,10 +360,10 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
router['revision_number'])
@decorators.idempotent_id('90743b00-b0e2-40e4-9524-1c884fe3ef23')
@test.requires_ext(extension="external-net", service="network")
@test.requires_ext(extension="auto-allocated-topology", service="network")
@test.requires_ext(extension="subnet_allocation", service="network")
@test.requires_ext(extension="router", service="network")
@utils.requires_ext(extension="external-net", service="network")
@utils.requires_ext(extension="auto-allocated-topology", service="network")
@utils.requires_ext(extension="subnet_allocation", service="network")
@utils.requires_ext(extension="router", service="network")
def test_update_external_network_bumps_revision(self):
net = self.create_network()
self.addCleanup(self.client.delete_network, net['id'])
@ -374,7 +374,7 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
net['revision_number'])
@decorators.idempotent_id('5af6450a-0f61-49c3-b628-38db77c7b856')
@test.requires_ext(extension="qos", service="network")
@utils.requires_ext(extension="qos", service="network")
def test_update_qos_port_policy_binding_bumps_revision(self):
policy = self.create_qos_policy(name='port-policy', shared=False)
net = self.create_network()
@ -387,7 +387,7 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
port['revision_number'])
@decorators.idempotent_id('817da343-c6e4-445c-9519-a621f124dfbe')
@test.requires_ext(extension="qos", service="network")
@utils.requires_ext(extension="qos", service="network")
def test_update_qos_network_policy_binding_bumps_revision(self):
policy = self.create_qos_policy(name='network-policy', shared=False)
network = self.create_network()

View File

@ -14,9 +14,9 @@
# under the License.
import netaddr
from tempest.common import utils as tutils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest import test
from neutron_tempest_plugin.api import base
from neutron_tempest_plugin.api import base_routers
@ -39,7 +39,7 @@ class RoutersTest(base_routers.BaseRouterTest):
config.safe_get_config_value('network', 'project_network_v6_cidr'))
@decorators.idempotent_id('c72c1c0c-2193-4aca-eeee-b1442640eeee')
@test.requires_ext(extension="standard-attr-description",
@tutils.requires_ext(extension="standard-attr-description",
service="network")
def test_create_update_router_description(self):
body = self.create_router(description='d1', router_name='test')
@ -52,7 +52,7 @@ class RoutersTest(base_routers.BaseRouterTest):
self.assertEqual('d2', body['description'])
@decorators.idempotent_id('847257cc-6afd-4154-b8fb-af49f5670ce8')
@test.requires_ext(extension='ext-gw-mode', service='network')
@tutils.requires_ext(extension='ext-gw-mode', service='network')
def test_create_router_with_default_snat_value(self):
# Create a router with default snat rule
name = data_utils.rand_name('router')
@ -63,7 +63,7 @@ class RoutersTest(base_routers.BaseRouterTest):
'enable_snat': True})
@decorators.idempotent_id('ea74068d-09e9-4fd7-8995-9b6a1ace920f')
@test.requires_ext(extension='ext-gw-mode', service='network')
@tutils.requires_ext(extension='ext-gw-mode', service='network')
def test_create_router_with_snat_explicit(self):
name = data_utils.rand_name('snat-router')
# Create a router enabling snat attributes
@ -100,14 +100,15 @@ class RoutersTest(base_routers.BaseRouterTest):
self.assertGreaterEqual(len(fixed_ips), 1)
public_net_body = self.admin_client.show_network(
CONF.network.public_network_id)
public_subnet_id = public_net_body['network']['subnets'][0]
self.assertIn(public_subnet_id,
[x['subnet_id'] for x in fixed_ips])
public_subnet_ids = public_net_body['network']['subnets']
for fixed_ip in fixed_ips:
self.assertIn(fixed_ip['subnet_id'],
public_subnet_ids)
@decorators.idempotent_id('b386c111-3b21-466d-880c-5e72b01e1a33')
@test.requires_ext(extension='ext-gw-mode', service='network')
@tutils.requires_ext(extension='ext-gw-mode', service='network')
def test_update_router_set_gateway_with_snat_explicit(self):
router = self._create_router(data_utils.rand_name('router-'))
router = self._create_router(data_utils.rand_name('router'))
self.admin_client.update_router_with_snat_gw_info(
router['id'],
external_gateway_info={
@ -120,9 +121,9 @@ class RoutersTest(base_routers.BaseRouterTest):
self._verify_gateway_port(router['id'])
@decorators.idempotent_id('96536bc7-8262-4fb2-9967-5c46940fa279')
@test.requires_ext(extension='ext-gw-mode', service='network')
@tutils.requires_ext(extension='ext-gw-mode', service='network')
def test_update_router_set_gateway_without_snat(self):
router = self._create_router(data_utils.rand_name('router-'))
router = self._create_router(data_utils.rand_name('router'))
self.admin_client.update_router_with_snat_gw_info(
router['id'],
external_gateway_info={
@ -135,10 +136,10 @@ class RoutersTest(base_routers.BaseRouterTest):
self._verify_gateway_port(router['id'])
@decorators.idempotent_id('f2faf994-97f4-410b-a831-9bc977b64374')
@test.requires_ext(extension='ext-gw-mode', service='network')
@tutils.requires_ext(extension='ext-gw-mode', service='network')
def test_update_router_reset_gateway_without_snat(self):
router = self._create_router(
data_utils.rand_name('router-'),
data_utils.rand_name('router'),
external_network_id=CONF.network.public_network_id)
self.admin_client.update_router_with_snat_gw_info(
router['id'],
@ -156,14 +157,14 @@ class RoutersTest(base_routers.BaseRouterTest):
network = self.create_network()
subnet = self.create_subnet(network)
# Add router interface with subnet id
router = self._create_router(data_utils.rand_name('router-'), True)
router = self._create_router(data_utils.rand_name('router'), True)
intf = self.create_router_interface(router['id'], subnet['id'])
status_active = lambda: self.client.show_port(
intf['port_id'])['port']['status'] == 'ACTIVE'
utils.wait_until_true(status_active, exception=AssertionError)
@decorators.idempotent_id('c86ac3a8-50bd-4b00-a6b8-62af84a0765c')
@test.requires_ext(extension='extraroute', service='network')
@tutils.requires_ext(extension='extraroute', service='network')
def test_update_extra_route(self):
self.network = self.create_network()
self.name = self.network['name']

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
@ -34,7 +35,7 @@ class SecGroupTest(base.BaseSecGroupTest):
secgroup_list.append(secgroup['id'])
self.assertIn(group_create_body['security_group']['id'], secgroup_list)
# Update the security group
new_name = data_utils.rand_name('security-')
new_name = data_utils.rand_name('security')
new_description = data_utils.rand_name('security-description')
update_body = self.client.update_security_group(
group_create_body['security_group']['id'],
@ -64,3 +65,60 @@ class SecGroupTest(base.BaseSecGroupTest):
secgrp['id'])
self.assertIn(secgrp['name'], sec_nm)
self.assertIsNotNone(secgrp['id'])
class SecGroupProtocolTest(base.BaseSecGroupTest):
@decorators.idempotent_id('282e3681-aa6e-42a7-b05c-c341aa1e3cdf')
def test_create_show_delete_security_group_rule_names(self):
group_create_body, _ = self._create_security_group()
for protocol in base.V4_PROTOCOL_NAMES:
self._test_create_show_delete_security_group_rule(
security_group_id=group_create_body['security_group']['id'],
protocol=protocol,
direction=constants.INGRESS_DIRECTION,
ethertype=self.ethertype)
@decorators.idempotent_id('66e47f1f-20b6-4417-8839-3cc671c7afa3')
def test_create_show_delete_security_group_rule_integers(self):
group_create_body, _ = self._create_security_group()
for protocol in base.V4_PROTOCOL_INTS:
self._test_create_show_delete_security_group_rule(
security_group_id=group_create_body['security_group']['id'],
protocol=protocol,
direction=constants.INGRESS_DIRECTION,
ethertype=self.ethertype)
class SecGroupProtocolIPv6Test(SecGroupProtocolTest):
_ip_version = constants.IP_VERSION_6
@decorators.idempotent_id('1f7cc9f5-e0d5-487c-8384-3d74060ab530')
def test_create_security_group_rule_with_ipv6_protocol_names(self):
group_create_body, _ = self._create_security_group()
for protocol in base.V6_PROTOCOL_NAMES:
self._test_create_show_delete_security_group_rule(
security_group_id=group_create_body['security_group']['id'],
protocol=protocol,
direction=constants.INGRESS_DIRECTION,
ethertype=self.ethertype)
@decorators.idempotent_id('c7d17b41-3b4e-4add-bb3b-6af59baaaffa')
def test_create_security_group_rule_with_ipv6_protocol_legacy_names(self):
group_create_body, _ = self._create_security_group()
for protocol in base.V6_PROTOCOL_LEGACY:
self._test_create_show_delete_security_group_rule(
security_group_id=group_create_body['security_group']['id'],
protocol=protocol,
direction=constants.INGRESS_DIRECTION,
ethertype=self.ethertype)
@decorators.idempotent_id('bcfce0b7-bc96-40ae-9b08-3f6774ee0260')
def test_create_security_group_rule_with_ipv6_protocol_integers(self):
group_create_body, _ = self._create_security_group()
for protocol in base.V6_PROTOCOL_INTS:
self._test_create_show_delete_security_group_rule(
security_group_id=group_create_body['security_group']['id'],
protocol=protocol,
direction=constants.INGRESS_DIRECTION,
ethertype=self.ethertype)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from neutron_lib.db import constants as db_const
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
@ -26,6 +27,11 @@ class NegativeSecGroupTest(base.BaseSecGroupTest):
required_extensions = ['security-group']
@classmethod
def resource_setup(cls):
super(NegativeSecGroupTest, cls).resource_setup()
cls.network = cls.create_network()
@decorators.attr(type='negative')
@decorators.idempotent_id('594edfa8-9a5b-438e-9344-49aece337d49')
def test_create_security_group_with_too_long_name(self):
@ -66,6 +72,47 @@ class NegativeSecGroupTest(base.BaseSecGroupTest):
self.client.update_security_group,
sg['id'], name=True)
@decorators.attr(type='negative')
@decorators.idempotent_id('3200b1a8-d73b-48e9-b03f-e891a4abe2d3')
def test_delete_in_use_sec_group(self):
sgroup = self.os_primary.network_client.create_security_group(
name='sgroup')
self.security_groups.append(sgroup['security_group'])
port = self.client.create_port(
network_id=self.network['id'],
security_groups=[sgroup['security_group']['id']])
self.ports.append(port['port'])
self.assertRaises(lib_exc.Conflict,
self.os_primary.network_client.delete_security_group,
security_group_id=sgroup['security_group']['id'])
class NegativeSecGroupIPv6Test(NegativeSecGroupTest):
_ip_version = 6
_ip_version = constants.IP_VERSION_6
class NegativeSecGroupProtocolTest(base.BaseSecGroupTest):
def _test_create_security_group_rule_with_bad_protocols(self, protocols):
group_create_body, _ = self._create_security_group()
# bad protocols can include v6 protocols because self.ethertype is v4
for protocol in protocols:
self.assertRaises(
lib_exc.BadRequest,
self.client.create_security_group_rule,
security_group_id=group_create_body['security_group']['id'],
protocol=protocol, direction=constants.INGRESS_DIRECTION,
ethertype=self.ethertype)
@decorators.attr(type=['negative'])
@decorators.idempotent_id('cccbb0f3-c273-43ed-b3fc-1efc48833810')
def test_create_security_group_rule_with_ipv6_protocol_names(self):
self._test_create_security_group_rule_with_bad_protocols(
base.V6_PROTOCOL_NAMES)
@decorators.attr(type=['negative'])
@decorators.idempotent_id('8aa636bd-7060-4fdf-b722-cdae28e2f1ef')
def test_create_security_group_rule_with_ipv6_protocol_integers(self):
self._test_create_security_group_rule_with_bad_protocols(
base.V6_PROTOCOL_INTS)

View File

@ -13,9 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import utils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest import test
from neutron_tempest_plugin.api import base
@ -105,7 +105,7 @@ class SubnetPoolsTest(SubnetPoolsTestBase):
"Created subnetpool name should be in the list")
@decorators.idempotent_id('c72c1c0c-2193-4aca-ddd4-b1442640bbbb')
@test.requires_ext(extension="standard-attr-description",
@utils.requires_ext(extension="standard-attr-description",
service="network")
def test_create_update_subnetpool_description(self):
body = self._create_subnetpool(description='d1')
@ -135,7 +135,7 @@ class SubnetPoolsTest(SubnetPoolsTestBase):
self.assertFalse(subnetpool['shared'])
@decorators.idempotent_id('5bf9f1e2-efc8-4195-acf3-d12b2bd68dd3')
@test.requires_ext(extension="project-id", service="network")
@utils.requires_ext(extension="project-id", service="network")
def test_show_subnetpool_has_project_id(self):
subnetpool = self._create_subnetpool()
body = self.client.show_subnetpool(subnetpool['id'])
@ -259,7 +259,7 @@ class SubnetPoolsTest(SubnetPoolsTestBase):
self.assertTrue(cidr.endswith(str(self.max_prefixlen)))
@decorators.idempotent_id('49b44c64-1619-4b29-b527-ffc3c3115dc4')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_create_subnetpool_associate_address_scope(self):
address_scope = self.create_address_scope(
name=data_utils.rand_name('smoke-address-scope'),
@ -271,7 +271,7 @@ class SubnetPoolsTest(SubnetPoolsTestBase):
body['subnetpool']['address_scope_id'])
@decorators.idempotent_id('910b6393-db24-4f6f-87dc-b36892ad6c8c')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_update_subnetpool_associate_address_scope(self):
address_scope = self.create_address_scope(
name=data_utils.rand_name('smoke-address-scope'),
@ -287,7 +287,7 @@ class SubnetPoolsTest(SubnetPoolsTestBase):
body['subnetpool']['address_scope_id'])
@decorators.idempotent_id('18302e80-46a3-4563-82ac-ccd1dd57f652')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_update_subnetpool_associate_another_address_scope(self):
address_scope = self.create_address_scope(
name=data_utils.rand_name('smoke-address-scope'),
@ -308,7 +308,7 @@ class SubnetPoolsTest(SubnetPoolsTestBase):
body['subnetpool']['address_scope_id'])
@decorators.idempotent_id('f8970048-e41b-42d6-934b-a1297b07706a')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_update_subnetpool_disassociate_address_scope(self):
address_scope = self.create_address_scope(
name=data_utils.rand_name('smoke-address-scope'),
@ -325,7 +325,7 @@ class SubnetPoolsTest(SubnetPoolsTestBase):
self.assertIsNone(body['subnetpool']['address_scope_id'])
@decorators.idempotent_id('4c6963c2-f54c-4347-b288-75d18421c4c4')
@test.requires_ext(extension='default-subnetpools', service='network')
@utils.requires_ext(extension='default-subnetpools', service='network')
def test_tenant_create_non_default_subnetpool(self):
"""
Test creates a subnetpool, the "is_default" attribute is False.
@ -334,69 +334,6 @@ class SubnetPoolsTest(SubnetPoolsTestBase):
self.assertFalse(created_subnetpool['is_default'])
class DefaultSubnetPoolsTest(SubnetPoolsTestBase):
def setUp(self):
self.addCleanup(self.resource_cleanup)
super(DefaultSubnetPoolsTest, self).setUp()
@classmethod
def resource_setup(cls):
super(DefaultSubnetPoolsTest, cls).resource_setup()
body = cls.admin_client.list_subnetpools()
subnetpools = body['subnetpools']
for subnetpool in subnetpools:
if subnetpool.get('is_default'):
msg = 'Default subnetpool already exists. Only one is allowed.'
raise cls.skipException(msg)
@decorators.idempotent_id('cb839106-6184-4332-b292-5d07c074de4f')
@test.requires_ext(extension='default-subnetpools', service='network')
def test_admin_create_default_subnetpool(self):
"""
Test uses administrative credentials to create a default subnetpool,
using the is_default=True.
"""
created_subnetpool = self._create_subnetpool(is_admin=True,
is_default=True)
self.assertTrue(created_subnetpool['is_default'])
@decorators.idempotent_id('9e79730c-29b6-44a4-9504-bf3c7cedc56c')
@test.requires_ext(extension='default-subnetpools', service='network')
def test_convert_subnetpool_to_default_subnetpool(self):
"""
Test creates a subnetpool, which is non default subnetpool.
Then it will update to a default subnetpool, by setting "is_default"
attribute to True.
"""
created_subnetpool = self._create_subnetpool()
subnetpool_id = created_subnetpool['id']
self.assertFalse(created_subnetpool['is_default'])
subnetpool_data = {'is_default': True}
self.admin_client.update_subnetpool(subnetpool_id,
**subnetpool_data)
show_body = self.client.show_subnetpool(subnetpool_id)
self.assertTrue(show_body['subnetpool']['is_default'])
@decorators.idempotent_id('39687561-7a37-47b8-91ce-f9143ae26969')
@test.requires_ext(extension='default-subnetpools', service='network')
def test_convert_default_subnetpool_to_non_default(self):
"""
Test uses administrative credentials to create a default subnetpool,
using the is_default=True.
Then it will update "is_default" attribute to False.
"""
created_subnetpool = self._create_subnetpool(is_admin=True,
is_default=True)
subnetpool_id = created_subnetpool['id']
self.assertTrue(created_subnetpool['is_default'])
subnetpool_data = {'is_default': False}
self.admin_client.update_subnetpool(subnetpool_id,
**subnetpool_data)
show_body = self.admin_client.show_subnetpool(subnetpool_id)
self.assertFalse(show_body['subnetpool']['is_default'])
class SubnetPoolsTestV6(SubnetPoolsTest):
min_prefixlen = '48'

View File

@ -15,10 +15,10 @@
import netaddr
from oslo_utils import uuidutils
from tempest.common import utils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron_tempest_plugin.api import test_subnetpools
@ -61,7 +61,7 @@ class SubnetPoolsNegativeTestJSON(test_subnetpools.SubnetPoolsTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('6ae09d8f-95be-40ed-b1cf-8b850d45bab5')
@test.requires_ext(extension='default-subnetpools', service='network')
@utils.requires_ext(extension='default-subnetpools', service='network')
def test_tenant_create_default_subnetpool(self):
# 'default' subnetpool can only be created by admin.
self.assertRaises(lib_exc.Forbidden, self._create_subnetpool,
@ -113,21 +113,21 @@ class SubnetPoolsNegativeTestJSON(test_subnetpools.SubnetPoolsTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('9589e332-638e-476e-81bd-013d964aa3cb')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_create_subnetpool_associate_invalid_address_scope(self):
self.assertRaises(lib_exc.BadRequest, self._create_subnetpool,
address_scope_id='foo-addr-scope')
@decorators.attr(type='negative')
@decorators.idempotent_id('3b6c5942-485d-4964-a560-55608af020b5')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_create_subnetpool_associate_non_exist_address_scope(self):
self.assertRaises(lib_exc.NotFound, self._create_subnetpool,
address_scope_id=uuidutils.generate_uuid())
@decorators.attr(type='negative')
@decorators.idempotent_id('2dfb4269-8657-485a-a053-b022e911456e')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_create_subnetpool_associate_address_scope_prefix_intersect(self):
address_scope = self.create_address_scope(
name=data_utils.rand_name('smoke-address-scope'),
@ -143,7 +143,7 @@ class SubnetPoolsNegativeTestJSON(test_subnetpools.SubnetPoolsTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('83a19a13-5384-42e2-b579-43fc69c80914')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_create_sp_associate_address_scope_multiple_prefix_intersect(self):
address_scope = self.create_address_scope(
name=data_utils.rand_name('smoke-address-scope'),
@ -161,7 +161,7 @@ class SubnetPoolsNegativeTestJSON(test_subnetpools.SubnetPoolsTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('f06d8e7b-908b-4e94-b570-8156be6a4bf1')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_create_subnetpool_associate_address_scope_of_other_owner(self):
address_scope = self.create_address_scope(
name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
@ -171,7 +171,7 @@ class SubnetPoolsNegativeTestJSON(test_subnetpools.SubnetPoolsTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('3396ec6c-cb80-4ebe-b897-84e904580bdf')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_tenant_create_subnetpool_associate_shared_address_scope(self):
address_scope = self.create_address_scope(
name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
@ -181,7 +181,7 @@ class SubnetPoolsNegativeTestJSON(test_subnetpools.SubnetPoolsTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('6d3d9ad5-32d4-4d63-aa00-8c62f73e2881')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_update_subnetpool_associate_address_scope_of_other_owner(self):
address_scope = self.create_address_scope(
name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
@ -214,7 +214,7 @@ class SubnetPoolsNegativeTestJSON(test_subnetpools.SubnetPoolsTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('96006292-7214-40e0-a471-153fb76e6b31')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_update_subnetpool_prefix_intersect(self):
pool_1_prefix = [u'20.0.0.0/18']
pool_2_prefix = [u'20.10.0.0/24']
@ -224,7 +224,7 @@ class SubnetPoolsNegativeTestJSON(test_subnetpools.SubnetPoolsTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('4d3f8a79-c530-4e59-9acf-6c05968adbfe')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_update_subnetpool_multiple_prefix_intersect(self):
pool_1_prefixes = [u'20.0.0.0/18', u'30.0.0.0/18']
pool_2_prefixes = [u'20.10.0.0/24', u'40.0.0.0/18', '50.0.0.0/18']
@ -235,7 +235,7 @@ class SubnetPoolsNegativeTestJSON(test_subnetpools.SubnetPoolsTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('7438e49e-1351-45d8-937b-892059fb97f5')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_tenant_update_sp_prefix_associated_with_shared_addr_scope(self):
address_scope = self.create_address_scope(
name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
@ -266,7 +266,7 @@ class SubnetPoolsNegativeTestJSON(test_subnetpools.SubnetPoolsTestBase):
@decorators.attr(type='negative')
@decorators.idempotent_id('648fee7d-a909-4ced-bad3-3a169444c0a8')
@test.requires_ext(extension='address-scope', service='network')
@utils.requires_ext(extension='address-scope', service='network')
def test_update_subnetpool_associate_address_scope_wrong_ip_version(self):
address_scope = self.create_address_scope(
name=data_utils.rand_name('smoke-address-scope'),

View File

@ -42,7 +42,7 @@ class TestTimeStamp(base.BaseAdminNetworkTest):
cls._subnetpool_data = {'min_prefixlen': '29', 'prefixes': prefixes}
def _create_subnetpool(self, is_admin=False, **kwargs):
name = data_utils.rand_name('subnetpool-')
name = data_utils.rand_name('subnetpool')
subnetpool_data = copy.deepcopy(self._subnetpool_data)
for key in subnetpool_data.keys():
kwargs[key] = subnetpool_data[key]

View File

@ -12,11 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import utils
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron_tempest_plugin.api import base
from neutron_tempest_plugin import config
@ -95,7 +95,7 @@ class TrunkTestJSON(TrunkTestJSONBase):
self.assertRaises(lib_exc.NotFound, self._show_trunk, trunk_id)
@decorators.idempotent_id('8d83a6ca-662d-45b8-8062-d513077296aa')
@test.requires_ext(extension="project-id", service="network")
@utils.requires_ext(extension="project-id", service="network")
def test_show_trunk_has_project_id(self):
trunk = self._create_trunk_with_network_and_parent(None)
body = self._show_trunk(trunk['trunk']['id'])
@ -228,7 +228,7 @@ class TrunkTestInheritJSONBase(TrunkTestJSONBase):
def create_provider_network(self):
foo_net = config.CONF.neutron_plugin_options.provider_vlans[0]
post_body = {'network_name': data_utils.rand_name('vlan-net-'),
post_body = {'network_name': data_utils.rand_name('vlan-net'),
'provider:network_type': 'vlan',
'provider:physical_network': foo_net}
return self.create_shared_network(**post_body)
@ -276,11 +276,11 @@ class TrunkTestMtusJSONBase(TrunkTestJSONBase):
super(TrunkTestMtusJSONBase, self).setUp()
# VXLAN autocomputed MTU (1450) is smaller than that of GRE (1458)
vxlan_kwargs = {'network_name': data_utils.rand_name('vxlan-net-'),
vxlan_kwargs = {'network_name': data_utils.rand_name('vxlan-net'),
'provider:network_type': 'vxlan'}
self.smaller_mtu_net = self.create_shared_network(**vxlan_kwargs)
gre_kwargs = {'network_name': data_utils.rand_name('gre-net-'),
gre_kwargs = {'network_name': data_utils.rand_name('gre-net'),
'provider:network_type': 'gre'}
self.larger_mtu_net = self.create_shared_network(**gre_kwargs)

View File

@ -1,56 +0,0 @@
# Copyright (c) 2015 Red Hat Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth_limit'
RULE_TYPE_DSCP_MARKING = 'dscp_marking'
RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum_bandwidth'
VALID_RULE_TYPES = [RULE_TYPE_BANDWIDTH_LIMIT,
RULE_TYPE_DSCP_MARKING,
RULE_TYPE_MINIMUM_BANDWIDTH,
]
# Names of rules' attributes
MAX_KBPS = "max_kbps"
MAX_BURST = "max_burst_kbps"
MIN_KBPS = "min_kbps"
DIRECTION = "direction"
DSCP_MARK = "dscp_mark"
QOS_POLICY_ID = 'qos_policy_id'
QOS_PLUGIN = 'qos_plugin'
# NOTE(slaweq): Value used to calculate burst value for egress bandwidth limit
# if burst is not given by user. In such case burst value will be calculated
# as 80% of bw_limit to ensure that at least limits for TCP traffic will work
# fine.
DEFAULT_BURST_RATE = 0.8
# Method names for QoSDriver
PRECOMMIT_POSTFIX = '_precommit'
CREATE_POLICY = 'create_policy'
CREATE_POLICY_PRECOMMIT = CREATE_POLICY + PRECOMMIT_POSTFIX
UPDATE_POLICY = 'update_policy'
UPDATE_POLICY_PRECOMMIT = UPDATE_POLICY + PRECOMMIT_POSTFIX
DELETE_POLICY = 'delete_policy'
DELETE_POLICY_PRECOMMIT = DELETE_POLICY + PRECOMMIT_POSTFIX
QOS_CALL_METHODS = (
CREATE_POLICY,
CREATE_POLICY_PRECOMMIT,
UPDATE_POLICY,
UPDATE_POLICY_PRECOMMIT,
DELETE_POLICY,
DELETE_POLICY_PRECOMMIT, )

View File

@ -34,6 +34,10 @@ NeutronPluginOptions = [
default=False,
help='Image that supports features that cirros does not, like'
' Ubuntu or CentOS supporting advanced features'),
cfg.StrOpt('agent_availability_zone',
help='The availability zone for all agents in the deployment. '
'Configure this only when the single value is used by '
'all agents in the deployment.'),
]
# TODO(amuller): Redo configuration options registration as part of the planned

View File

@ -12,6 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import subprocess
import netaddr
from oslo_log import log
@ -173,7 +174,7 @@ class BaseTempestTestCase(base_api.BaseNetworkTest):
LOG.debug("Created subnet %s", self.subnet['id'])
secgroup = self.os_primary.network_client.create_security_group(
name=data_utils.rand_name('secgroup-'))
name=data_utils.rand_name('secgroup'))
LOG.debug("Created security group %s",
secgroup['security_group']['name'])
self.security_groups.append(secgroup['security_group'])
@ -276,3 +277,41 @@ class BaseTempestTestCase(base_api.BaseNetworkTest):
nic=None, mtu=None, fragmentation=True):
self.assertTrue(self._check_remote_connectivity(
source, dest, should_succeed, nic, mtu, fragmentation))
def ping_ip_address(self, ip_address, should_succeed=True,
ping_timeout=None, mtu=None):
# the code is taken from tempest/scenario/manager.py in tempest git
timeout = ping_timeout or CONF.validation.ping_timeout
cmd = ['ping', '-c1', '-w1']
if mtu:
cmd += [
# don't fragment
'-M', 'do',
# ping receives just the size of ICMP payload
'-s', str(net_utils.get_ping_payload_size(mtu, 4))
]
cmd.append(ip_address)
def ping():
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.communicate()
return (proc.returncode == 0) == should_succeed
caller = test_utils.find_test_caller()
LOG.debug('%(caller)s begins to ping %(ip)s in %(timeout)s sec and the'
' expected result is %(should_succeed)s', {
'caller': caller, 'ip': ip_address, 'timeout': timeout,
'should_succeed':
'reachable' if should_succeed else 'unreachable'
})
result = test_utils.call_until_true(ping, timeout, 1)
LOG.debug('%(caller)s finishes ping %(ip)s in %(timeout)s sec and the '
'ping result is %(result)s', {
'caller': caller, 'ip': ip_address, 'timeout': timeout,
'result': 'expected' if result else 'unexpected'
})
return result

View File

@ -12,8 +12,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import utils
from tempest.lib import decorators
from tempest import test
from neutron_lib import constants
from neutron_tempest_plugin import config
@ -49,7 +49,7 @@ class NetworkDvrTest(base.BaseTempestTestCase, NetworkTestMixin):
force_tenant_isolation = False
@classmethod
@test.requires_ext(extension="dvr", service="network")
@utils.requires_ext(extension="dvr", service="network")
def skip_checks(cls):
super(NetworkDvrTest, cls).skip_checks()

View File

@ -14,10 +14,10 @@
# under the License.
import netaddr
from tempest.common import utils
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest import test
import testscenarios
from testscenarios.scenarios import multiply_scenarios
@ -37,7 +37,7 @@ class FloatingIpTestCasesMixin(object):
credentials = ['primary', 'admin']
@classmethod
@test.requires_ext(extension="router", service="network")
@utils.requires_ext(extension="router", service="network")
def resource_setup(cls):
super(FloatingIpTestCasesMixin, cls).resource_setup()
cls.network = cls.create_network()
@ -47,7 +47,7 @@ class FloatingIpTestCasesMixin(object):
cls.keypair = cls.create_keypair()
cls.secgroup = cls.os_primary.network_client.create_security_group(
name=data_utils.rand_name('secgroup-'))['security_group']
name=data_utils.rand_name('secgroup'))['security_group']
cls.security_groups.append(cls.secgroup)
cls.create_loginable_secgroup_rule(secgroup_id=cls.secgroup['id'])
cls.create_pingable_secgroup_rule(secgroup_id=cls.secgroup['id'])

View File

@ -13,9 +13,14 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib import decorators
from tempest import test
import functools
from neutron_lib.api.definitions import portbindings as pb
from neutron_lib import constants as const
from tempest.common import utils
from tempest.lib import decorators
from neutron_tempest_plugin.common import utils as common_utils
from neutron_tempest_plugin.scenario import base
from neutron_tempest_plugin.scenario import test_dvr
@ -26,8 +31,8 @@ class NetworkMigrationTestBase(base.BaseTempestTestCase,
force_tenant_isolation = False
@classmethod
@test.requires_ext(extension="dvr", service="network")
@test.requires_ext(extension="l3-ha", service="network")
@utils.requires_ext(extension="dvr", service="network")
@utils.requires_ext(extension="l3-ha", service="network")
def skip_checks(cls):
super(NetworkMigrationTestBase, cls).skip_checks()
@ -36,12 +41,77 @@ class NetworkMigrationTestBase(base.BaseTempestTestCase,
self.assertEqual(is_dvr, router['router']['distributed'])
self.assertEqual(is_ha, router['router']['ha'])
def _wait_until_port_deleted(self, router_id, device_owner):
common_utils.wait_until_true(
functools.partial(
self._is_port_deleted,
router_id,
device_owner),
timeout=300, sleep=5)
def _is_port_deleted(self, router_id, device_owner):
ports = self.os_admin.network_client.list_ports(
device_id=router_id,
device_owner=device_owner)
return not ports.get('ports')
def _wait_until_port_ready(self, router_id, device_owner):
common_utils.wait_until_true(
functools.partial(
self._is_port_active,
router_id,
device_owner),
timeout=300, sleep=5)
def _is_port_active(self, router_id, device_owner):
ports = self.os_admin.network_client.list_ports(
device_id=router_id,
device_owner=device_owner,
status=const.ACTIVE).get('ports')
if ports:
if ports[0][pb.VIF_TYPE] not in [pb.VIF_TYPE_UNBOUND,
pb.VIF_TYPE_BINDING_FAILED]:
return True
return False
def _wait_until_router_ports_ready(self, router_id, dvr, ha):
if dvr:
self._wait_until_port_ready(
router_id, const.DEVICE_OWNER_DVR_INTERFACE)
if ha:
self._wait_until_port_ready(
router_id, const.DEVICE_OWNER_ROUTER_HA_INTF)
if dvr:
self._wait_until_port_ready(
router_id, const.DEVICE_OWNER_ROUTER_SNAT)
else:
self._wait_until_port_ready(
router_id, const.DEVICE_OWNER_HA_REPLICATED_INT)
self._wait_until_port_ready(
router_id, const.DEVICE_OWNER_ROUTER_GW)
def _wait_until_router_ports_migrated(
self, router_id, before_dvr, before_ha, after_dvr, after_ha):
if before_ha and not after_ha:
self._wait_until_port_deleted(
router_id, const.DEVICE_OWNER_ROUTER_HA_INTF)
self._wait_until_port_deleted(
router_id, const.DEVICE_OWNER_HA_REPLICATED_INT)
if before_dvr and not after_dvr:
self._wait_until_port_deleted(
router_id, const.DEVICE_OWNER_DVR_INTERFACE)
self._wait_until_port_deleted(
router_id, const.DEVICE_OWNER_ROUTER_SNAT)
self._wait_until_router_ports_ready(router_id, after_dvr, after_ha)
def _test_migration(self, before_dvr, before_ha, after_dvr, after_ha):
router = self.create_router_by_client(
distributed=before_dvr, ha=before_ha,
tenant_id=self.client.tenant_id, is_admin=True)
self.setup_network_and_server(router=router)
self._wait_until_router_ports_ready(
router['id'], before_dvr, before_ha)
self._check_connectivity()
self.os_admin.network_client.update_router(
@ -52,6 +122,9 @@ class NetworkMigrationTestBase(base.BaseTempestTestCase,
self.os_admin.network_client.update_router(
router_id=router['id'], admin_state_up=True)
self._wait_until_router_ports_migrated(
router['id'], before_dvr, before_ha, after_dvr, after_ha)
self._check_connectivity()

View File

@ -0,0 +1,134 @@
# Copyright 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from neutron_lib.api.definitions import provider_net
from tempest.common import utils
from tempest.common import waiters
from tempest.lib import decorators
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin import config
from neutron_tempest_plugin.scenario import base
from neutron_tempest_plugin.scenario import constants
CONF = config.CONF
class NetworkMtuBaseTest(base.BaseTempestTestCase):
credentials = ['primary', 'admin']
servers = []
networks = []
@classmethod
def skip_checks(cls):
super(NetworkMtuBaseTest, cls).skip_checks()
if ("vxlan" not in
config.CONF.neutron_plugin_options.available_type_drivers
or "gre" not in
config.CONF.neutron_plugin_options.available_type_drivers):
raise cls.skipException("GRE or VXLAN type_driver is not enabled")
@classmethod
@utils.requires_ext(extension=provider_net.ALIAS, service="network")
def resource_setup(cls):
super(NetworkMtuBaseTest, cls).resource_setup()
# setup basic topology for servers we can log into it
cls.router = cls.create_router_by_client()
cls.keypair = cls.create_keypair()
cls.secgroup = cls.os_primary.network_client.create_security_group(
name='secgroup_mtu')
cls.security_groups.append(cls.secgroup['security_group'])
cls.create_loginable_secgroup_rule(
secgroup_id=cls.secgroup['security_group']['id'])
cls.create_pingable_secgroup_rule(
secgroup_id=cls.secgroup['security_group']['id'])
def _create_setup(self):
self.admin_client = self.os_admin.network_client
net_kwargs = {'tenant_id': self.client.tenant_id}
for sub, net_type in (
('10.100.0.0/16', 'vxlan'), ('10.200.0.0/16', 'gre')):
net_kwargs['name'] = '-'.join([net_type, 'net'])
net_kwargs['provider:network_type'] = net_type
network = self.admin_client.create_network(**net_kwargs)[
'network']
self.networks.append(network)
self.addCleanup(self.admin_client.delete_network, network['id'])
cidr = netaddr.IPNetwork(sub)
subnet = self.create_subnet(network, cidr=cidr)
self.create_router_interface(self.router['id'], subnet['id'])
self.addCleanup(self.client.remove_router_interface_with_subnet_id,
self.router['id'], subnet['id'])
# check that MTUs are different for 2 networks
self.assertNotEqual(self.networks[0]['mtu'], self.networks[1]['mtu'])
self.networks.sort(key=lambda net: net['mtu'])
server1, fip1 = self.create_pingable_vm(self.networks[0])
server_ssh_client1 = ssh.Client(
self.floating_ips[0]['floating_ip_address'],
CONF.validation.image_ssh_user,
pkey=self.keypair['private_key'])
server2, fip2 = self.create_pingable_vm(self.networks[1])
server_ssh_client2 = ssh.Client(
self.floating_ips[0]['floating_ip_address'],
CONF.validation.image_ssh_user,
pkey=self.keypair['private_key'])
for fip in (fip1, fip2):
self.check_connectivity(fip['floating_ip_address'],
CONF.validation.image_ssh_user,
self.keypair['private_key'])
return server_ssh_client1, fip1, server_ssh_client2, fip2
def create_pingable_vm(self, net):
server = self.create_server(
flavor_ref=CONF.compute.flavor_ref,
image_ref=CONF.compute.image_ref,
key_name=self.keypair['name'],
networks=[{'uuid': net['id']}],
security_groups=[{'name': self.secgroup[
'security_group']['name']}])
waiters.wait_for_server_status(
self.os_primary.servers_client, server['server']['id'],
constants.SERVER_STATUS_ACTIVE)
port = self.client.list_ports(
network_id=net['id'], device_id=server['server']['id'])['ports'][0]
fip = self.create_and_associate_floatingip(port['id'])
return server, fip
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a273d9d344')
def test_connectivity_min_max_mtu(self):
server_ssh_client, _, _, fip2 = self._create_setup()
# ping with min mtu of 2 networks succeeds even when
# fragmentation is disabled
self.check_remote_connectivity(
server_ssh_client, fip2['fixed_ip_address'],
mtu=self.networks[0]['mtu'], fragmentation=False)
# ping with the size above min mtu of 2 networks
# fails when fragmentation is disabled
self.check_remote_connectivity(
server_ssh_client, fip2['fixed_ip_address'], should_succeed=False,
mtu=self.networks[0]['mtu'] + 1, fragmentation=False)
# ping with max mtu of 2 networks succeeds when
# fragmentation is enabled
self.check_remote_connectivity(
server_ssh_client, fip2['fixed_ip_address'],
mtu=self.networks[1]['mtu'])
# ping with max mtu of 2 networks fails when fragmentation is disabled
self.check_remote_connectivity(
server_ssh_client, fip2['fixed_ip_address'], should_succeed=False,
mtu=self.networks[1]['mtu'], fragmentation=False)

View File

@ -16,13 +16,13 @@ import errno
import socket
import time
from neutron_lib.services.qos import constants as qos_consts
from oslo_log import log as logging
from tempest.common import utils as tutils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import test
from neutron_tempest_plugin.api import base as base_api
from neutron_tempest_plugin.common import qos_consts
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin.common import utils
from neutron_tempest_plugin import config
@ -80,7 +80,7 @@ class QoSTest(base.BaseTempestTestCase):
FILE_PATH = "/tmp/img"
@classmethod
@test.requires_ext(extension="qos", service="network")
@tutils.requires_ext(extension="qos", service="network")
@base_api.require_qos_rule_type(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)
def resource_setup(cls):
super(QoSTest, cls).resource_setup()
@ -154,9 +154,9 @@ class QoSTest(base.BaseTempestTestCase):
CONF.validation.image_ssh_user,
pkey=self.keypair['private_key'])
policy = self.os_admin.network_client.create_qos_policy(
name='test-policy',
description='test-qos-policy',
shared=True)
name='test-policy',
description='test-qos-policy',
shared=True)
policy_id = policy['policy']['id']
self.os_admin.network_client.create_bandwidth_limit_rule(
policy_id, max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,

View File

@ -0,0 +1,200 @@
# Copyright 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin import config
from neutron_tempest_plugin.scenario import base
from neutron_tempest_plugin.scenario import constants as const
CONF = config.CONF
class NetworkDefaultSecGroupTest(base.BaseTempestTestCase):
credentials = ['primary', 'admin']
required_extensions = ['router', 'security-group']
@classmethod
def resource_setup(cls):
super(NetworkDefaultSecGroupTest, cls).resource_setup()
# setup basic topology for servers we can log into it
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
router = cls.create_router_by_client()
cls.create_router_interface(router['id'], cls.subnet['id'])
cls.keypair = cls.create_keypair()
def create_vm_testing_sec_grp(self, num_servers=2, security_groups=None):
servers, fips, server_ssh_clients = ([], [], [])
for i in range(num_servers):
servers.append(self.create_server(
flavor_ref=CONF.compute.flavor_ref,
image_ref=CONF.compute.image_ref,
key_name=self.keypair['name'],
networks=[{'uuid': self.network['id']}],
security_groups=security_groups))
for i, server in enumerate(servers):
waiters.wait_for_server_status(
self.os_primary.servers_client, server['server']['id'],
const.SERVER_STATUS_ACTIVE)
port = self.client.list_ports(
network_id=self.network['id'], device_id=server['server'][
'id'])['ports'][0]
fips.append(self.create_and_associate_floatingip(port['id']))
server_ssh_clients.append(ssh.Client(
fips[i]['floating_ip_address'], CONF.validation.image_ssh_user,
pkey=self.keypair['private_key']))
return server_ssh_clients, fips, servers
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d764')
def test_default_sec_grp_scenarios(self):
server_ssh_clients, fips, _ = self.create_vm_testing_sec_grp()
# Check ssh connectivity when you add sec group rule, enabling ssh
self.create_loginable_secgroup_rule(
self.os_primary.network_client.list_security_groups()[
'security_groups'][0]['id']
)
self.check_connectivity(fips[0]['floating_ip_address'],
CONF.validation.image_ssh_user,
self.keypair['private_key'])
# make sure ICMP connectivity still fails as only ssh rule was added
self.ping_ip_address(fips[0]['floating_ip_address'],
should_succeed=False)
# Check ICMP connectivity between VMs without specific rule for that
# It should work though the rule is not configured
self.check_remote_connectivity(
server_ssh_clients[0], fips[1]['fixed_ip_address'])
# Check ICMP connectivity from VM to external network
subnets = self.os_admin.network_client.list_subnets(
network_id=CONF.network.public_network_id)['subnets']
ext_net_ip = None
for subnet in subnets:
if subnet['ip_version'] == 4:
ext_net_ip = subnet['gateway_ip']
break
self.assertTrue(ext_net_ip)
self.check_remote_connectivity(server_ssh_clients[0], ext_net_ip)
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d864')
def test_protocol_number_rule(self):
# protocol number is added instead of str in security rule creation
server_ssh_clients, fips, _ = self.create_vm_testing_sec_grp(
num_servers=1)
self.ping_ip_address(fips[0]['floating_ip_address'],
should_succeed=False)
rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
'direction': constants.INGRESS_DIRECTION,
'remote_ip_prefix': '0.0.0.0/0'}]
secgroup_id = self.os_primary.network_client.list_security_groups()[
'security_groups'][0]['id']
self.create_secgroup_rules(rule_list, secgroup_id=secgroup_id)
self.ping_ip_address(fips[0]['floating_ip_address'])
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d964')
def test_two_sec_groups(self):
# add 2 sec groups to VM and test rules of both are working
ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
icmp_secgrp_name = data_utils.rand_name('icmp_secgrp')
ssh_secgrp = self.os_primary.network_client.create_security_group(
name=ssh_secgrp_name)
self.create_loginable_secgroup_rule(
secgroup_id=ssh_secgrp['security_group']['id'])
icmp_secgrp = self.os_primary.network_client.create_security_group(
name=icmp_secgrp_name)
self.create_pingable_secgroup_rule(
secgroup_id=icmp_secgrp['security_group']['id'])
for sec_grp in (ssh_secgrp, icmp_secgrp):
self.security_groups.append(sec_grp['security_group'])
security_groups_list = [{'name': ssh_secgrp_name},
{'name': icmp_secgrp_name}]
server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
num_servers=1, security_groups=security_groups_list)
# make sure ssh connectivity works
self.check_connectivity(fips[0]['floating_ip_address'],
CONF.validation.image_ssh_user,
self.keypair['private_key'])
# make sure ICMP connectivity works
self.ping_ip_address(fips[0]['floating_ip_address'],
should_succeed=True)
ports = self.client.list_ports(device_id=servers[0]['server']['id'])
port_id = ports['ports'][0]['id']
# update port with ssh security group only
self.os_primary.network_client.update_port(
port_id, security_groups=[ssh_secgrp['security_group']['id']])
# make sure ssh connectivity works
self.check_connectivity(fips[0]['floating_ip_address'],
CONF.validation.image_ssh_user,
self.keypair['private_key'])
# make sure ICMP connectivity doesn't work
self.ping_ip_address(fips[0]['floating_ip_address'],
should_succeed=False)
# update port with ssh and ICMP security groups
self.os_primary.network_client.update_port(
port_id, security_groups=[
icmp_secgrp['security_group']['id'],
ssh_secgrp['security_group']['id']])
# make sure ssh connectivity works after update
self.check_connectivity(fips[0]['floating_ip_address'],
CONF.validation.image_ssh_user,
self.keypair['private_key'])
# make sure ICMP connectivity works after update
self.ping_ip_address(fips[0]['floating_ip_address'])
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d664')
def test_ip_prefix(self):
# Add specific remote prefix to VMs and check connectivity
ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
icmp_secgrp_name = data_utils.rand_name('icmp_secgrp_with_cidr')
cidr = self.subnet['cidr']
ssh_secgrp = self.os_primary.network_client.create_security_group(
name=ssh_secgrp_name)
self.create_loginable_secgroup_rule(
secgroup_id=ssh_secgrp['security_group']['id'])
rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
'direction': constants.INGRESS_DIRECTION,
'remote_ip_prefix': cidr}]
icmp_secgrp = self.os_primary.network_client.create_security_group(
name=icmp_secgrp_name)
self.create_secgroup_rules(
rule_list, secgroup_id=icmp_secgrp['security_group']['id'])
for sec_grp in (ssh_secgrp, icmp_secgrp):
self.security_groups.append(sec_grp['security_group'])
security_groups_list = [{'name': ssh_secgrp_name},
{'name': icmp_secgrp_name}]
server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
security_groups=security_groups_list)
# make sure ssh connectivity works
self.check_connectivity(fips[0]['floating_ip_address'],
CONF.validation.image_ssh_user,
self.keypair['private_key'])
# make sure ICMP connectivity works
self.check_remote_connectivity(server_ssh_clients[0], fips[1][
'fixed_ip_address'])

View File

@ -14,10 +14,10 @@
import netaddr
from oslo_log import log as logging
from tempest.common import utils as tutils
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest import test
import testtools
from neutron_tempest_plugin.common import ssh
@ -47,7 +47,7 @@ class TrunkTest(base.BaseTempestTestCase):
force_tenant_isolation = False
@classmethod
@test.requires_ext(extension="trunk", service="network")
@tutils.requires_ext(extension="trunk", service="network")
def resource_setup(cls):
super(TrunkTest, cls).resource_setup()
# setup basic topology for servers we can log into
@ -57,7 +57,7 @@ class TrunkTest(base.BaseTempestTestCase):
cls.create_router_interface(router['id'], cls.subnet['id'])
cls.keypair = cls.create_keypair()
cls.secgroup = cls.os_primary.network_client.create_security_group(
name=data_utils.rand_name('secgroup-'))
name=data_utils.rand_name('secgroup'))
cls.security_groups.append(cls.secgroup['security_group'])
cls.create_loginable_secgroup_rule(
secgroup_id=cls.secgroup['security_group']['id'])