diff --git a/neutron_taas_tempest_plugin/tests/scenario/manager.py b/neutron_taas_tempest_plugin/tests/scenario/manager.py index 02b481c..f28ef9b 100644 --- a/neutron_taas_tempest_plugin/tests/scenario/manager.py +++ b/neutron_taas_tempest_plugin/tests/scenario/manager.py @@ -14,28 +14,25 @@ # License for the specific language governing permissions and limitations # under the License. -import subprocess - import netaddr from oslo_log import log from oslo_utils import netutils from tempest.common import compute from tempest.common.utils.linux import remote_client -from tempest.common.utils import net_utils from tempest.common import waiters from tempest import config from tempest.lib.common.utils import data_utils from tempest.lib.common.utils import test_utils from tempest.lib import exceptions as lib_exc -import tempest.test +import tempest.scenario.manager CONF = config.CONF LOG = log.getLogger(__name__) -class ScenarioTest(tempest.test.BaseTestCase): +class ScenarioTest(tempest.scenario.manager.ScenarioTest): """Base class for scenario tests. Uses tempest own clients. """ credentials = ['primary', 'admin'] @@ -65,21 +62,6 @@ class ScenarioTest(tempest.test.BaseTestCase): # The create_[resource] functions only return body and discard the # resp part which is not used in scenario tests - def _create_port(self, network_id, client=None, namestart='port-quotatest', - **kwargs): - if not client: - client = self.ports_client - name = data_utils.rand_name(namestart) - result = client.create_port( - name=name, - network_id=network_id, - **kwargs) - self.assertIsNotNone(result, 'Unable to allocate port') - port = result['port'] - self.addCleanup(test_utils.call_and_ignore_notfound_exc, - client.delete_port, port['id']) - return port - @classmethod def create_keypair(cls, client=None): if not client: @@ -162,9 +144,10 @@ class ScenarioTest(tempest.test.BaseTestCase): for net in networks: net_id = net.get('uuid', net.get('id')) if 'port' not in net: - port = self._create_port(network_id=net_id, - client=clients.ports_client, - **create_port_body) + port = self.create_port(network_id=net_id, + client=clients.ports_client, + namestart='port-quotatest', + **create_port_body) ports.append({'port': port['id']}) else: ports.append({'port': net['port']}) @@ -249,43 +232,6 @@ class ScenarioTest(tempest.test.BaseTestCase): if not isinstance(exc, lib_exc.SSHTimeout): LOG.debug('Network information on a devstack host') - def ping_ip_address(self, ip_address, should_succeed=True, - ping_timeout=None, mtu=None): - timeout = ping_timeout or CONF.validation.ping_timeout - cmd = ['ping', '-c1', '-w1'] - - if mtu: - cmd += [ - # don't fragment - '-M', 'do', - # ping receives just the size of ICMP payload - '-s', str(net_utils.get_ping_payload_size(mtu, 4)) - ] - cmd.append(ip_address) - - def ping(): - proc = subprocess.Popen(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - proc.communicate() - - return (proc.returncode == 0) == should_succeed - - caller = test_utils.find_test_caller() - LOG.debug('%(caller)s begins to ping %(ip)s in %(timeout)s sec and the' - ' expected result is %(should_succeed)s', { - 'caller': caller, 'ip': ip_address, 'timeout': timeout, - 'should_succeed': - 'reachable' if should_succeed else 'unreachable' - }) - result = test_utils.call_until_true(ping, timeout, 1) - LOG.debug('%(caller)s finishes ping %(ip)s in %(timeout)s sec and the ' - 'ping result is %(result)s', { - 'caller': caller, 'ip': ip_address, 'timeout': timeout, - 'result': 'expected' if result else 'unexpected' - }) - return result - def check_vm_connectivity(self, ip_address, username=None, private_key=None, @@ -912,8 +858,10 @@ class NetworkScenarioTest(ScenarioTest): if provider_net: net_id = self.provider_network['id'] - port = self._create_port( - net_id, security_groups=[self.secgroup['id']], **kwargs) + port = self.create_port(net_id, + security_groups=[self.secgroup['id']], + namestart='port-quotatest', + **kwargs) self._create_server_and_wait(port, image, flavor) fip = self.create_floating_ip(