Merge "Add scenario tests to taas tempest plugin"

This commit is contained in:
Zuul 2019-12-25 12:25:02 +00:00 committed by Gerrit Code Review
commit 824bbc5318
4 changed files with 469 additions and 66 deletions

View File

@ -101,6 +101,21 @@
devstack_localrc:
TEMPEST_PLUGINS: /opt/stack/tap-as-a-service-tempest-plugin
NETWORK_API_EXTENSIONS: "{{ (network_api_extensions_common + network_api_extensions_tempest) | join(',') }}"
DOWNLOAD_DEFAULT_IMAGES: false
IMAGE_URLS: "http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-i386-disk.img,https://cloud-images.ubuntu.com/releases/bionic/release/ubuntu-18.04-server-cloudimg-amd64.img"
DEFAULT_IMAGE_NAME: cirros-0.3.4-i386-disk
ADVANCED_IMAGE_NAME: ubuntu-18.04-server-cloudimg-amd64
BUILD_TIMEOUT: 784
devstack_local_conf:
test-config:
$TEMPEST_CONFIG:
taas_plugin_options:
advanced_image_ref: ubuntu-18.04-server-cloudimg-amd64
advanced_image_ssh_user: ubuntu
provider_physical_network: public
provider_segmentation_id: 100
image_feature_enabled:
api_v2: true
devstack_plugins:
neutron: git://opendev.org/openstack/neutron.git
tap-as-a-service: git://opendev.org/x/tap-as-a-service.git

View File

@ -34,4 +34,14 @@ TaaSPluginGroup = [
default='',
help='Comma separated list of VLANs to be mirrored '
'for a Tap-Flow.'),
cfg.StrOpt('advanced_image_ref',
default='',
help='Valid advanced image uuid to be used in tests. '
'Must contain tcpdump preinstalled.'),
cfg.StrOpt('advanced_image_ssh_user',
default='ubuntu',
help='Name of ssh user to use with advanced image in tests.'),
cfg.StrOpt('advanced_image_flavor_ref',
default='m1.medium',
help='Valid flavor to use with advanced image in tests.'),
]

View File

@ -38,7 +38,7 @@ LOG = log.getLogger(__name__)
class ScenarioTest(tempest.test.BaseTestCase):
"""Base class for scenario tests. Uses tempest own clients. """
credentials = ['primary']
credentials = ['primary', 'admin']
@classmethod
def setup_clients(cls):
@ -51,10 +51,14 @@ class ScenarioTest(tempest.test.BaseTestCase):
cls.ports_client = cls.os_primary.ports_client
cls.routers_client = cls.os_primary.routers_client
cls.subnets_client = cls.os_primary.subnets_client
cls.flavor_client = cls.os_primary.flavors_client
cls.floating_ips_client = cls.os_primary.floating_ips_client
cls.security_groups_client = cls.os_primary.security_groups_client
cls.security_group_rules_client = (
cls.os_primary.security_group_rules_client)
cls.admin_networks_client = cls.os_admin.networks_client
cls.admin_subnets_client = cls.os_admin.subnets_client
cls.admin_routers_client = cls.os_admin.routers_client
# ## Test functions library
#
@ -76,13 +80,14 @@ class ScenarioTest(tempest.test.BaseTestCase):
client.delete_port, port['id'])
return port
def create_keypair(self, client=None):
@classmethod
def create_keypair(cls, client=None):
if not client:
client = self.keypairs_client
name = data_utils.rand_name(self.__class__.__name__)
client = cls.keypairs_client
name = data_utils.rand_name(cls.__class__.__name__)
# We don't need to create a keypair by pubkey in scenario
body = client.create_keypair(name=name)
self.addCleanup(client.delete_keypair, name)
cls.addClassResourceCleanup(client.delete_keypair, name)
return body['keypair']
def create_server(self, name=None, image_id=None, flavor=None,
@ -353,12 +358,13 @@ class NetworkScenarioTest(ScenarioTest):
if not CONF.service_available.neutron:
raise cls.skipException('Neutron not available')
def _create_network(self, networks_client=None,
@classmethod
def _create_network(cls, networks_client=None,
tenant_id=None,
namestart='network-smoke-',
port_security_enabled=True):
if not networks_client:
networks_client = self.networks_client
networks_client = cls.networks_client
if not tenant_id:
tenant_id = networks_client.tenant_id
name = data_utils.rand_name(namestart)
@ -370,13 +376,14 @@ class NetworkScenarioTest(ScenarioTest):
result = networks_client.create_network(**network_kwargs)
network = result['network']
self.assertEqual(network['name'], name)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
networks_client.delete_network,
network['id'])
assert network['name'] == name
cls.addClassResourceCleanup(test_utils.call_and_ignore_notfound_exc,
networks_client.delete_network,
network['id'])
return network
def _create_subnet(self, network, subnets_client=None,
@classmethod
def _create_subnet(cls, network, subnets_client=None,
routers_client=None, namestart='subnet-smoke',
**kwargs):
"""Create a subnet for the given network
@ -384,9 +391,9 @@ class NetworkScenarioTest(ScenarioTest):
within the cidr block configured for tenant networks.
"""
if not subnets_client:
subnets_client = self.subnets_client
subnets_client = cls.subnets_client
if not routers_client:
routers_client = self.routers_client
routers_client = cls.routers_client
def cidr_in_use(cidr, tenant_id):
"""Check cidr existence
@ -394,7 +401,7 @@ class NetworkScenarioTest(ScenarioTest):
:returns: True if subnet with cidr already exist in tenant
False else
"""
cidr_in_use = self.os_admin.subnets_client.list_subnets(
cidr_in_use = cls.os_admin.subnets_client.list_subnets(
tenant_id=tenant_id, cidr=cidr)['subnets']
return len(cidr_in_use) != 0
@ -432,13 +439,14 @@ class NetworkScenarioTest(ScenarioTest):
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
self.assertIsNotNone(result, 'Unable to allocate tenant network')
assert result is not None, 'Unable to allocate tenant network'
subnet = result['subnet']
self.assertEqual(subnet['cidr'], str_cidr)
assert subnet['cidr'] == str_cidr
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
subnets_client.delete_subnet, subnet['id'])
cls.addClassResourceCleanup(
test_utils.call_and_ignore_notfound_exc,
subnets_client.delete_subnet, subnet['id'])
return subnet
@ -593,31 +601,33 @@ class NetworkScenarioTest(ScenarioTest):
CONF.validation.ping_timeout,
1)
def _create_security_group(self, security_group_rules_client=None,
@classmethod
def _create_security_group(cls, security_group_rules_client=None,
tenant_id=None,
namestart='secgroup-smoke',
security_groups_client=None):
if security_group_rules_client is None:
security_group_rules_client = self.security_group_rules_client
security_group_rules_client = cls.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
security_groups_client = cls.security_groups_client
if tenant_id is None:
tenant_id = security_groups_client.tenant_id
secgroup = self._create_empty_security_group(
secgroup = cls._create_empty_security_group(
namestart=namestart, client=security_groups_client,
tenant_id=tenant_id)
# Add rules to the security group
rules = self._create_loginable_secgroup_rule(
rules = cls._create_loginable_secgroup_rule(
security_group_rules_client=security_group_rules_client,
secgroup=secgroup,
security_groups_client=security_groups_client)
for rule in rules:
self.assertEqual(tenant_id, rule['tenant_id'])
self.assertEqual(secgroup['id'], rule['security_group_id'])
assert tenant_id == rule['tenant_id']
assert secgroup['id'] == rule['security_group_id']
return secgroup
def _create_empty_security_group(self, client=None, tenant_id=None,
@classmethod
def _create_empty_security_group(cls, client=None, tenant_id=None,
namestart='secgroup-smoke'):
"""Create a security group without rules.
@ -629,7 +639,7 @@ class NetworkScenarioTest(ScenarioTest):
:returns: the created security group
"""
if client is None:
client = self.security_groups_client
client = cls.security_groups_client
if not tenant_id:
tenant_id = client.tenant_id
sg_name = data_utils.rand_name(namestart)
@ -640,21 +650,23 @@ class NetworkScenarioTest(ScenarioTest):
result = client.create_security_group(**sg_dict)
secgroup = result['security_group']
self.assertEqual(secgroup['name'], sg_name)
self.assertEqual(tenant_id, secgroup['tenant_id'])
self.assertEqual(secgroup['description'], sg_desc)
assert secgroup['name'] == sg_name
assert tenant_id == secgroup['tenant_id']
assert secgroup['description'] == sg_desc
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_security_group, secgroup['id'])
cls.addClassResourceCleanup(
test_utils.call_and_ignore_notfound_exc,
client.delete_security_group, secgroup['id'])
return secgroup
def _default_security_group(self, client=None, tenant_id=None):
@classmethod
def _default_security_group(cls, client=None, tenant_id=None):
"""Get default secgroup for given tenant_id.
:returns: default secgroup for given tenant
"""
if client is None:
client = self.security_groups_client
client = cls.security_groups_client
if not tenant_id:
tenant_id = client.tenant_id
sgs = [
@ -662,10 +674,11 @@ class NetworkScenarioTest(ScenarioTest):
if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
]
msg = "No default security group for tenant %s." % (tenant_id)
self.assertGreater(len(sgs), 0, msg)
assert len(sgs) > 0, msg
return sgs[0]
def _create_security_group_rule(self, secgroup=None,
@classmethod
def _create_security_group_rule(cls, secgroup=None,
sec_group_rules_client=None,
tenant_id=None,
security_groups_client=None, **kwargs):
@ -687,13 +700,13 @@ class NetworkScenarioTest(ScenarioTest):
}
"""
if sec_group_rules_client is None:
sec_group_rules_client = self.security_group_rules_client
sec_group_rules_client = cls.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
security_groups_client = cls.security_groups_client
if not tenant_id:
tenant_id = security_groups_client.tenant_id
if secgroup is None:
secgroup = self._default_security_group(
secgroup = cls._default_security_group(
client=security_groups_client, tenant_id=tenant_id)
ruleset = dict(security_group_id=secgroup['id'],
@ -703,12 +716,13 @@ class NetworkScenarioTest(ScenarioTest):
sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
sg_rule = sg_rule['security_group_rule']
self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
assert secgroup['tenant_id'] == sg_rule['tenant_id']
assert secgroup['id'] == sg_rule['security_group_id']
return sg_rule
def _create_loginable_secgroup_rule(self, security_group_rules_client=None,
@classmethod
def _create_loginable_secgroup_rule(cls, security_group_rules_client=None,
secgroup=None,
security_groups_client=None):
"""Create loginable security group rule
@ -721,9 +735,9 @@ class NetworkScenarioTest(ScenarioTest):
"""
if security_group_rules_client is None:
security_group_rules_client = self.security_group_rules_client
security_group_rules_client = cls.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
security_groups_client = cls.security_groups_client
rules = []
rulesets = [
dict(
@ -747,7 +761,7 @@ class NetworkScenarioTest(ScenarioTest):
for r_direction in ['ingress', 'egress']:
ruleset['direction'] = r_direction
try:
sg_rule = self._create_security_group_rule(
sg_rule = cls._create_security_group_rule(
sec_group_rules_client=sec_group_rules_client,
secgroup=secgroup,
security_groups_client=security_groups_client,
@ -758,12 +772,13 @@ class NetworkScenarioTest(ScenarioTest):
if msg not in ex._error_string:
raise ex
else:
self.assertEqual(r_direction, sg_rule['direction'])
assert r_direction == sg_rule['direction']
rules.append(sg_rule)
return rules
def _get_router(self, client=None, tenant_id=None):
@classmethod
def _get_router(cls, client=None, tenant_id=None):
"""Retrieve a router for the given tenant id.
If a public router has been configured, it will be returned.
@ -773,7 +788,7 @@ class NetworkScenarioTest(ScenarioTest):
routes traffic to the public network.
"""
if not client:
client = self.routers_client
client = cls.routers_client
if not tenant_id:
tenant_id = client.tenant_id
router_id = CONF.network.public_router_id
@ -782,7 +797,7 @@ class NetworkScenarioTest(ScenarioTest):
body = client.show_router(router_id)
return body['router']
elif network_id:
router = self._create_router(client, tenant_id)
router = cls._create_router(client, tenant_id)
kwargs = {'external_gateway_info': dict(network_id=network_id)}
router = client.update_router(router['id'], **kwargs)['router']
return router
@ -790,10 +805,11 @@ class NetworkScenarioTest(ScenarioTest):
raise Exception("Neither of 'public_router_id' or "
"'public_network_id' has been defined.")
def _create_router(self, client=None, tenant_id=None,
@classmethod
def _create_router(cls, client=None, tenant_id=None,
namestart='router-smoke'):
if not client:
client = self.routers_client
client = cls.routers_client
if not tenant_id:
tenant_id = client.tenant_id
name = data_utils.rand_name(namestart)
@ -801,10 +817,10 @@ class NetworkScenarioTest(ScenarioTest):
admin_state_up=True,
tenant_id=tenant_id)
router = result['router']
self.assertEqual(router['name'], name)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_router,
router['id'])
assert router['name'] == name
cls.addClassResourceCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_router,
router['id'])
return router
def _update_router_admin_state(self, router, admin_state_up):
@ -813,7 +829,8 @@ class NetworkScenarioTest(ScenarioTest):
router['id'], **kwargs)['router']
self.assertEqual(admin_state_up, router['admin_state_up'])
def create_networks(self, networks_client=None,
@classmethod
def create_networks(cls, networks_client=None,
routers_client=None, subnets_client=None,
tenant_id=None, dns_nameservers=None,
port_security_enabled=True):
@ -835,33 +852,149 @@ class NetworkScenarioTest(ScenarioTest):
if not CONF.compute.fixed_network_name:
m = 'fixed_network_name must be specified in config'
raise lib_exc.InvalidConfiguration(m)
network = self._get_network_by_name(
network = cls._get_network_by_name(
CONF.compute.fixed_network_name)
router = None
subnet = None
else:
network = self._create_network(
network = cls._create_network(
networks_client=networks_client,
tenant_id=tenant_id,
port_security_enabled=port_security_enabled)
router = self._get_router(client=routers_client,
tenant_id=tenant_id)
router = cls._get_router(client=routers_client,
tenant_id=tenant_id)
subnet_kwargs = dict(network=network,
subnets_client=subnets_client,
routers_client=routers_client)
# use explicit check because empty list is a valid option
if dns_nameservers is not None:
subnet_kwargs['dns_nameservers'] = dns_nameservers
subnet = self._create_subnet(**subnet_kwargs)
subnet = cls._create_subnet(**subnet_kwargs)
if not routers_client:
routers_client = self.routers_client
routers_client = cls.routers_client
router_id = router['id']
routers_client.add_router_interface(router_id,
subnet_id=subnet['id'])
# save a cleanup job to remove this association between
# router and subnet
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
routers_client.remove_router_interface, router_id,
subnet_id=subnet['id'])
cls.addClassResourceCleanup(
test_utils.call_and_ignore_notfound_exc,
routers_client.remove_router_interface, router_id,
subnet_id=subnet['id'])
return network, subnet, router
def _create_server_and_wait(self, source_port, image_id=None,
flavor_id=None):
image_id = image_id if image_id else CONF.compute.image_ref
flavor_id = flavor_id if flavor_id else CONF.compute.flavor_ref
vm = self.create_server(
flavor=flavor_id,
image_id=image_id,
key_name=self.keypair['name'],
wait_until='ACTIVE',
networks=[{'port': source_port['id']}])
return vm
def _create_server_with_floatingip(self, use_taas_cloud_image=False,
provider_net=False, **kwargs):
net_id = self.network['id']
flavor = CONF.compute.flavor_ref
image = CONF.compute.image_ref
if use_taas_cloud_image:
image = self._get_real_image_id(
CONF.taas_plugin_options.advanced_image_ref)
flavor = self._get_real_flavor_id(
CONF.taas_plugin_options.advanced_image_flavor_ref)
if provider_net:
net_id = self.provider_network['id']
port = self._create_port(
net_id, security_groups=[self.secgroup['id']], **kwargs)
self._create_server_and_wait(port, image, flavor)
fip = self.create_floating_ip(
port, port_id=port['id'],
external_network_id=CONF.network.public_network_id)
return port, fip
def _run_in_background(self, sshclient, cmd):
runInBg = "nohup %s 2>&1 &" % cmd
sshclient.exec_command(runInBg)
def _setup_provider_network(self):
net = self._create_provider_network()
self._create_provider_subnet(net["id"])
return net
def _create_provider_network(self):
network_kwargs = {
"admin_state_up": True,
"shared": True,
"provider:network_type": "vlan",
"provider:physical_network":
CONF.taas_plugin_options.provider_physical_network,
}
segmentation_id = CONF.taas_plugin_options.provider_segmentation_id
if segmentation_id and segmentation_id == "0":
network_kwargs['provider:network_type'] = 'flat'
elif segmentation_id:
network_kwargs['provider:segmentation_id'] = segmentation_id
network = self.admin_networks_client.create_network(
**network_kwargs)['network']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.admin_networks_client.delete_network,
network['id'])
return network
def _create_provider_subnet(self, net_id):
subnet = dict(
network_id=net_id,
cidr="172.25.100.0/24",
ip_version=4,
)
result = self.admin_subnets_client.create_subnet(**subnet)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.admin_subnets_client.delete_subnet, result['subnet']['id'])
self.admin_routers_client.add_router_interface(
self.router['id'], subnet_id=result['subnet']['id'])
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.admin_routers_client.remove_router_interface,
self.router['id'], subnet_id=result['subnet']['id'])
def _get_real_image_id(self, image_id):
try:
id = self.image_client.show_image(
image_id=image_id)["id"]
except AssertionError:
images = self.image_client.list_images(
params={"name": image_id})["images"]
if len(images) == 0:
self.skip_checks()
id = images[0]["id"]
return id
def _get_real_flavor_id(self, flavor_id):
id = None
try:
id = self.flavor_client.show_flavor(
flavor_id=flavor_id)["flavor"]["id"]
except AssertionError:
flavors = self.flavor_client.list_flavors()["flavors"]
for f in flavors:
if f["name"] == flavor_id:
id = f["id"]
if id is None:
self.skip_checks()
return id

View File

@ -0,0 +1,245 @@
# Copyright (c) 2019 AT&T
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from contextlib import contextmanager
from oslo_log import log
import testtools
from tempest.common import utils
from tempest import config
from tempest.lib.common.utils.linux import remote_client
from tempest.lib import decorators
from neutron_taas_tempest_plugin.tests.scenario import base
CONF = config.CONF
LOG = log.getLogger(__name__)
class TestTaaSTrafficScenarios(base.TaaSScenarioTest):
@classmethod
def setup_clients(cls):
super(TestTaaSTrafficScenarios, cls).setup_clients()
if CONF.image_feature_enabled.api_v1:
cls.image_client = cls.os_primary.image_client
elif CONF.image_feature_enabled.api_v2:
cls.image_client = cls.os_primary.image_client_v2
else:
raise cls.skipException(
'Either api_v1 or api_v2 must be True in '
'[image-feature-enabled].')
@classmethod
@utils.requires_ext(extension="router", service="network")
def resource_setup(cls):
super(TestTaaSTrafficScenarios, cls).resource_setup()
for ext in ['taas']:
if not utils.is_extension_enabled(ext, 'network'):
msg = "%s Extension not enabled." % ext
raise cls.skipException(msg)
cls.network, cls.subnet, cls.router = cls.create_networks()
cls.provider_network = None
cls.keypair = cls.create_keypair()
cls.secgroup = cls._create_security_group()
@contextmanager
def _setup_topology(self, taas=True, use_taas_cloud_image=False,
provider_net=False):
"""Setup topology for the test
+------------+
| monitor vm |
+-----+------+
|
+-----v---+
+--+ network <--+
| +----^----+ |
| | |
| +----+-+ +---+--+
| | vm 1 | | vm 2 |
| +------+ +------+
|
| +--------+
+--> router |
+-----+--+
|
+-----v------+
| public net |
+------------+
"""
if provider_net:
if CONF.taas_plugin_options.provider_physical_network:
self.provider_network = self._setup_provider_network()
else:
msg = "provider_physical_network not provided"
raise self.skipException(msg)
self.mon_port, mon_fip = self._create_server_with_floatingip(
use_taas_cloud_image=use_taas_cloud_image,
provider_net=provider_net)
self.left_port, left_fip = self._create_server_with_floatingip(
provider_net=provider_net)
self.right_port, right_fip = self._create_server_with_floatingip(
provider_net=provider_net)
if taas:
LOG.debug("Create TAAS service")
tap_service = self.create_tap_service(port_id=self.mon_port['id'])
self.create_tap_flow(tap_service_id=tap_service['id'],
direction='BOTH',
source_port=self.left_port['id'])
self.create_tap_flow(tap_service_id=tap_service['id'],
direction='BOTH',
source_port=self.right_port['id'])
user = CONF.validation.image_ssh_user
if use_taas_cloud_image:
user = CONF.taas_plugin_options.advanced_image_ssh_user
self.monitor_client = remote_client.RemoteClient(
mon_fip['floating_ip_address'], user,
pkey=self.keypair['private_key'])
self.left_client = remote_client.RemoteClient(
left_fip['floating_ip_address'], CONF.validation.image_ssh_user,
pkey=self.keypair['private_key'])
self.right_client = remote_client.RemoteClient(
right_fip['floating_ip_address'], CONF.validation.image_ssh_user,
pkey=self.keypair['private_key'])
yield
def _check_icmp_traffic(self):
log_location = "/tmp/tcpdumplog"
right_ip = self.right_port['fixed_ips'][0]['ip_address']
left_ip = self.left_port['fixed_ips'][0]['ip_address']
# Run tcpdump in background
self._run_in_background(self.monitor_client,
"sudo tcpdump -n -nn > %s" % log_location)
# Ensure tcpdump is up and running
psax = self.monitor_client.exec_command("ps -ax")
self.assertTrue("tcpdump" in psax)
# Run traffic from left_vm to right_vm
self.left_client.exec_command("ping -c 50 %s" % right_ip)
# Collect tcpdump results
output = self.monitor_client.exec_command("cat %s" % log_location)
self.assertTrue(len(output) > 0)
looking_for = ["IP %s > %s: ICMP echo request" % (left_ip, right_ip),
"IP %s > %s: ICMP echo reply" % (right_ip, left_ip)]
results = []
for tcpdump_line in looking_for:
results.append(tcpdump_line in output)
return all(results)
def _test_taas_connectivity(self, use_provider_net=False):
"""Ensure TAAS doesn't break connectivity
This test creates TAAS service between two servers and checks that
it doesn't break basic connectivity between them.
"""
# Check uninterrupted traffic between VMs
with self._setup_topology(provider_net=use_provider_net):
# Left to right
self._check_remote_connectivity(
self.left_client,
self.right_port['fixed_ips'][0]['ip_address'])
# Right to left
self._check_remote_connectivity(
self.right_client,
self.left_port['fixed_ips'][0]['ip_address'])
# TAAS vm to right
self._check_remote_connectivity(
self.monitor_client,
self.right_port['fixed_ips'][0]['ip_address'])
# TAAS vm to left
self._check_remote_connectivity(
self.monitor_client,
self.left_port['fixed_ips'][0]['ip_address'])
@decorators.idempotent_id('ff414b7d-e81c-47f2-b6c8-53bc2f1e9b00')
@decorators.attr(type='slow')
@utils.services('compute', 'network')
def test_taas_provider_network_connectivity(self):
self._test_taas_connectivity(use_provider_net=True)
@decorators.idempotent_id('e3c52e91-7abf-4dfd-8687-f7c071cdd333')
@decorators.attr(type='slow')
@utils.services('compute', 'network')
def test_taas_network_connectivity(self):
self._test_taas_connectivity(use_provider_net=False)
@decorators.idempotent_id('fcb15ca3-ef61-11e9-9792-f45c89c47e11')
@testtools.skipUnless(CONF.taas_plugin_options.advanced_image_ref,
'Cloud image not found.')
@decorators.attr(type='slow')
@utils.services('compute', 'network')
def test_taas_forwarded_traffic_positive(self):
"""Check that TAAS forwards traffic as expected"""
with self._setup_topology(use_taas_cloud_image=True):
# Check that traffic was forwarded to TAAS service
self.assertTrue(self._check_icmp_traffic())
@decorators.idempotent_id('6c54d9c5-075a-4a1f-bbe6-12c3c9abf1e2')
@testtools.skipUnless(CONF.taas_plugin_options.advanced_image_ref,
'Cloud image not found.')
@decorators.attr(type='slow')
@utils.services('compute', 'network')
def test_taas_forwarded_traffic_negative(self):
"""Check that TAAS doesn't forward traffic"""
with self._setup_topology(taas=False, use_taas_cloud_image=True):
# Check that traffic was NOT forwarded to TAAS service
self.assertFalse(self._check_icmp_traffic())
@decorators.idempotent_id('fcb15ca3-ef61-11e9-9792-f45c89c47e12')
@testtools.skipUnless(CONF.taas_plugin_options.advanced_image_ref,
'Cloud image not found.')
@decorators.attr(type='slow')
@utils.services('compute', 'network')
def test_taas_forwarded_traffic_provider_net_positive(self):
"""Check that TAAS forwards traffic as expected in provider network"""
with self._setup_topology(use_taas_cloud_image=True,
provider_net=True):
# Check that traffic was forwarded to TAAS service
self.assertTrue(self._check_icmp_traffic())
@decorators.idempotent_id('6c54d9c5-075a-4a1f-bbe6-12c3c9abf1e3')
@testtools.skipUnless(CONF.taas_plugin_options.advanced_image_ref,
'Cloud image not found.')
@decorators.attr(type='slow')
@utils.services('compute', 'network')
def test_taas_forwarded_traffic_provider_net_negative(self):
"""Check that TAAS doesn't forward traffic in provider network"""
with self._setup_topology(taas=False, use_taas_cloud_image=True,
provider_net=True):
# Check that traffic was NOT forwarded to TAAS service
self.assertFalse(self._check_icmp_traffic())