Don't use Tempest internal methods

All changed methods are clearly internal methods of Tempest, and the
Tempest commit 64e6b4457c748f74bfb4fbf3860ab65b65ae9beb has removed
the internal methods and now the gate issue happens.
This patch makes this project to use better ones instead.

Change-Id: I736a0099510a4ad6d1cb7ed4c2510c961de95b15
Related-Bug: #1667824
This commit is contained in:
Ken'ichi Ohmichi 2017-02-24 15:56:59 -08:00 committed by Ken'ichi Ohmichi
parent 0bded420c5
commit 88937db141
2 changed files with 22 additions and 19 deletions

View File

@ -95,23 +95,23 @@ class ScenarioPolicyBase(manager.NetworkScenarioTest):
checking the result of list_[networks,routers,subnets].
"""
seen_nets = self._list_networks()
seen_names = [n['name'] for n in seen_nets]
seen_ids = [n['id'] for n in seen_nets]
seen_nets = self.admin_manager.networks_client.list_networks()
seen_names = [n['name'] for n in seen_nets['networks']]
seen_ids = [n['id'] for n in seen_nets['networks']]
self.assertIn(self.network['name'], seen_names)
self.assertIn(self.network['id'], seen_ids)
if self.subnet:
seen_subnets = self._list_subnets()
seen_net_ids = [n['network_id'] for n in seen_subnets]
seen_subnet_ids = [n['id'] for n in seen_subnets]
seen_subnets = self.admin_manager.subnets_client.list_subnets()
seen_net_ids = [n['network_id'] for n in seen_subnets['subnets']]
seen_subnet_ids = [n['id'] for n in seen_subnets['subnets']]
self.assertIn(self.network['id'], seen_net_ids)
self.assertIn(self.subnet['id'], seen_subnet_ids)
if self.router:
seen_routers = self._list_routers()
seen_router_ids = [n['id'] for n in seen_routers]
seen_router_names = [n['name'] for n in seen_routers]
seen_routers = self.admin_manager.routers_client.list_routers()
seen_router_ids = [n['id'] for n in seen_routers['routers']]
seen_router_names = [n['name'] for n in seen_routers['routers']]
self.assertIn(self.router['name'],
seen_router_names)
self.assertIn(self.router['id'],
@ -191,7 +191,8 @@ class ScenarioPolicyBase(manager.NetworkScenarioTest):
private_key=private_key)
old_nic_list = self._get_server_nics(ssh_client)
# get a port from a list of one item
port_list = self._list_ports(device_id=server['id'])
port_list = self.admin_manager.ports_client.list_ports(
device_id=server['id'])['ports']
self.assertEqual(1, len(port_list))
old_port = port_list[0]
_, interface = self.interface_client.create_interface(
@ -204,9 +205,9 @@ class ScenarioPolicyBase(manager.NetworkScenarioTest):
server['id'], interface['port_id'])
def check_ports():
self.new_port_list = [port for port in
self._list_ports(device_id=server['id'])
if port != old_port]
ports = self.admin_manager.ports_client.list_ports(
device_id=server['id'])['ports']
self.new_port_list = [port for port in ports if port != old_port]
return len(self.new_port_list) == 1
if not test.call_until_true(check_ports, CONF.network.build_timeout,
@ -246,9 +247,10 @@ class ScenarioPolicyBase(manager.NetworkScenarioTest):
floating_ip, server = self.floating_ip_tuple
# get internal ports' ips:
# get all network ports in the new network
internal_ips = (p['fixed_ips'][0]['ip_address'] for p in
self._list_ports(tenant_id=server['tenant_id'],
network_id=network.id)
ports = self.admin_manager.ports_client.list_ports(
tenant_id=server['tenant_id'], network_id=network.id)['ports']
internal_ips = (p['fixed_ips'][0]['ip_address'] for p in ports
if p['device_owner'].startswith('network'))
self._check_server_connectivity(floating_ip, internal_ips)
@ -260,8 +262,8 @@ class ScenarioPolicyBase(manager.NetworkScenarioTest):
LOG.info(msg)
return
subnet = self._list_subnets(
network_id=CONF.network.public_network_id)
subnet = self.admin_manager.subnets_client.list_subnets(
network_id=CONF.network.public_network_id)['subnets']
self.assertEqual(1, len(subnet), "Found %d subnets" % len(subnet))
external_ips = [subnet[0]['gateway_ip']]

View File

@ -138,7 +138,8 @@ class TestPolicyBasicOps(manager_congress.ScenarioPolicyBase):
'classification', rule_id)
# Find the ports of on this server
ports = self._list_ports(device_id=self.servers[0]['id'])
ports = self.admin_manager.ports_client.list_ports(
device_id=self.servers[0]['id'])['ports']
def check_data():
results = self.admin_manager.congress_client.list_policy_rows(