Replace the usage of 'admin_manager' with 'os_admin'
Alias 'admin_manager' has been moved to 'os_admin' in version Pike, and it will be removed in version Queens. This patch is to replace the usage of 'admin_manager' with 'os_admin' which mainly used in Tempest scenario tests. Change-Id: I2ee802a2dc20eb4f065ecccec127f817b1d6743c
This commit is contained in:
parent
543ed08825
commit
5870ff1ada
|
@ -741,7 +741,7 @@ class NetworkScenarioTest(ScenarioTest):
|
|||
:returns: True if subnet with cidr already exist in tenant
|
||||
False else
|
||||
"""
|
||||
cidr_in_use = self.admin_manager.subnets_client.list_subnets(
|
||||
cidr_in_use = self.os_admin.subnets_client.list_subnets(
|
||||
tenant_id=tenant_id, cidr=cidr)['subnets']
|
||||
return len(cidr_in_use) != 0
|
||||
|
||||
|
@ -790,7 +790,7 @@ class NetworkScenarioTest(ScenarioTest):
|
|||
return subnet
|
||||
|
||||
def _get_server_port_id_and_ip4(self, server, ip_addr=None):
|
||||
ports = self.admin_manager.ports_client.list_ports(
|
||||
ports = self.os_admin.ports_client.list_ports(
|
||||
device_id=server['id'], fixed_ip=ip_addr)['ports']
|
||||
# A port can have more than one IP address in some cases.
|
||||
# If the network is dual-stack (IPv4 + IPv6), this port is associated
|
||||
|
@ -820,7 +820,7 @@ class NetworkScenarioTest(ScenarioTest):
|
|||
return port_map[0]
|
||||
|
||||
def _get_network_by_name(self, network_name):
|
||||
net = self.admin_manager.networks_client.list_networks(
|
||||
net = self.os_admin.networks_client.list_networks(
|
||||
name=network_name)['networks']
|
||||
self.assertNotEqual(len(net), 0,
|
||||
"Unable to get network by name: %s" % network_name)
|
||||
|
|
|
@ -36,8 +36,8 @@ class TestAggregatesBasicOps(manager.ScenarioTest):
|
|||
def setup_clients(cls):
|
||||
super(TestAggregatesBasicOps, cls).setup_clients()
|
||||
# Use admin client by default
|
||||
cls.aggregates_client = cls.admin_manager.aggregates_client
|
||||
cls.hosts_client = cls.admin_manager.hosts_client
|
||||
cls.aggregates_client = cls.os_admin.aggregates_client
|
||||
cls.hosts_client = cls.os_admin.hosts_client
|
||||
|
||||
def _create_aggregate(self, **kwargs):
|
||||
aggregate = (self.aggregates_client.create_aggregate(**kwargs)
|
||||
|
|
|
@ -126,21 +126,21 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
|
|||
via checking the result of list_[networks,routers,subnets]
|
||||
"""
|
||||
|
||||
seen_nets = self.admin_manager.networks_client.list_networks()
|
||||
seen_nets = self.os_admin.networks_client.list_networks()
|
||||
seen_names = [n['name'] for n in seen_nets['networks']]
|
||||
seen_ids = [n['id'] for n in seen_nets['networks']]
|
||||
self.assertIn(self.network['name'], seen_names)
|
||||
self.assertIn(self.network['id'], seen_ids)
|
||||
|
||||
if self.subnet:
|
||||
seen_subnets = self.admin_manager.subnets_client.list_subnets()
|
||||
seen_subnets = self.os_admin.subnets_client.list_subnets()
|
||||
seen_net_ids = [n['network_id'] for n in seen_subnets['subnets']]
|
||||
seen_subnet_ids = [n['id'] for n in seen_subnets['subnets']]
|
||||
self.assertIn(self.network['id'], seen_net_ids)
|
||||
self.assertIn(self.subnet['id'], seen_subnet_ids)
|
||||
|
||||
if self.router:
|
||||
seen_routers = self.admin_manager.routers_client.list_routers()
|
||||
seen_routers = self.os_admin.routers_client.list_routers()
|
||||
seen_router_ids = [n['id'] for n in seen_routers['routers']]
|
||||
seen_router_names = [n['name'] for n in seen_routers['routers']]
|
||||
self.assertIn(self.router['name'],
|
||||
|
@ -241,7 +241,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
|
|||
ip_address, private_key=private_key)
|
||||
old_nic_list = self._get_server_nics(ssh_client)
|
||||
# get a port from a list of one item
|
||||
port_list = self.admin_manager.ports_client.list_ports(
|
||||
port_list = self.os_admin.ports_client.list_ports(
|
||||
device_id=server['id'])['ports']
|
||||
self.assertEqual(1, len(port_list))
|
||||
old_port = port_list[0]
|
||||
|
@ -257,7 +257,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
|
|||
def check_ports():
|
||||
self.new_port_list = [
|
||||
port for port in
|
||||
self.admin_manager.ports_client.list_ports(
|
||||
self.os_admin.ports_client.list_ports(
|
||||
device_id=server['id'])['ports']
|
||||
if port['id'] != old_port['id']
|
||||
]
|
||||
|
@ -309,7 +309,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
|
|||
# get all network ports in the new network
|
||||
internal_ips = (
|
||||
p['fixed_ips'][0]['ip_address'] for p in
|
||||
self.admin_manager.ports_client.list_ports(
|
||||
self.os_admin.ports_client.list_ports(
|
||||
tenant_id=server['tenant_id'],
|
||||
network_id=network['id'])['ports']
|
||||
if p['device_owner'].startswith('network')
|
||||
|
@ -330,7 +330,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
|
|||
# which is always IPv4, so we must only test connectivity to
|
||||
# external IPv4 IPs if the external network is dualstack.
|
||||
v4_subnets = [
|
||||
s for s in self.admin_manager.subnets_client.list_subnets(
|
||||
s for s in self.os_admin.subnets_client.list_subnets(
|
||||
network_id=CONF.network.public_network_id)['subnets']
|
||||
if s['ip_version'] == 4
|
||||
]
|
||||
|
@ -630,7 +630,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
|
|||
self._setup_network_and_servers()
|
||||
floating_ip, server = self.floating_ip_tuple
|
||||
server_id = server['id']
|
||||
port_id = self.admin_manager.ports_client.list_ports(
|
||||
port_id = self.os_admin.ports_client.list_ports(
|
||||
device_id=server_id)['ports'][0]['id']
|
||||
server_pip = server['addresses'][self.network['name']][0]['addr']
|
||||
|
||||
|
@ -685,7 +685,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
|
|||
'Server should have been created from a '
|
||||
'pre-existing port.')
|
||||
# Assert the port is bound to the server.
|
||||
port_list = self.admin_manager.ports_client.list_ports(
|
||||
port_list = self.os_admin.ports_client.list_ports(
|
||||
device_id=server['id'], network_id=self.network['id'])['ports']
|
||||
self.assertEqual(1, len(port_list),
|
||||
'There should only be one port created for '
|
||||
|
@ -704,7 +704,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
|
|||
# Boot another server with the same port to make sure nothing was
|
||||
# left around that could cause issues.
|
||||
server = self._create_server(self.network, port['id'])
|
||||
port_list = self.admin_manager.ports_client.list_ports(
|
||||
port_list = self.os_admin.ports_client.list_ports(
|
||||
device_id=server['id'], network_id=self.network['id'])['ports']
|
||||
self.assertEqual(1, len(port_list),
|
||||
'There should only be one port created for '
|
||||
|
@ -729,23 +729,23 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
|
|||
# TODO(yfried): refactor this test to be used for other agents (dhcp)
|
||||
# as well
|
||||
|
||||
list_hosts = (self.admin_manager.routers_client.
|
||||
list_hosts = (self.os_admin.routers_client.
|
||||
list_l3_agents_hosting_router)
|
||||
schedule_router = (self.admin_manager.network_agents_client.
|
||||
schedule_router = (self.os_admin.network_agents_client.
|
||||
create_router_on_l3_agent)
|
||||
unschedule_router = (self.admin_manager.network_agents_client.
|
||||
unschedule_router = (self.os_admin.network_agents_client.
|
||||
delete_router_from_l3_agent)
|
||||
|
||||
agent_list_alive = set(
|
||||
a["id"] for a in
|
||||
self.admin_manager.network_agents_client.list_agents(
|
||||
self.os_admin.network_agents_client.list_agents(
|
||||
agent_type="L3 agent")['agents'] if a["alive"] is True
|
||||
)
|
||||
self._setup_network_and_servers()
|
||||
|
||||
# NOTE(kevinbenton): we have to use the admin credentials to check
|
||||
# for the distributed flag because self.router only has a project view.
|
||||
admin = self.admin_manager.routers_client.show_router(
|
||||
admin = self.os_admin.routers_client.show_router(
|
||||
self.router['id'])
|
||||
if admin['router'].get('distributed', False):
|
||||
msg = "Rescheduling test does not apply to distributed routers."
|
||||
|
@ -823,7 +823,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
|
|||
self._create_new_network()
|
||||
self._hotplug_server()
|
||||
fip, server = self.floating_ip_tuple
|
||||
new_ports = self.admin_manager.ports_client.list_ports(
|
||||
new_ports = self.os_admin.ports_client.list_ports(
|
||||
device_id=server["id"], network_id=self.new_net["id"])['ports']
|
||||
spoof_port = new_ports[0]
|
||||
private_key = self._get_server_key(server)
|
||||
|
|
|
@ -144,7 +144,7 @@ class TestGettingAddress(manager.NetworkScenarioTest):
|
|||
"""
|
||||
ports = [
|
||||
p["mac_address"] for p in
|
||||
self.admin_manager.ports_client.list_ports(
|
||||
self.os_admin.ports_client.list_ports(
|
||||
device_id=sid, network_id=network_id)['ports']
|
||||
]
|
||||
|
||||
|
|
|
@ -221,7 +221,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
|
|||
# Checks that we see the newly created network/subnet/router via
|
||||
# checking the result of list_[networks,routers,subnets]
|
||||
# Check that (router, subnet) couple exist in port_list
|
||||
seen_nets = self.admin_manager.networks_client.list_networks()
|
||||
seen_nets = self.os_admin.networks_client.list_networks()
|
||||
seen_names = [n['name'] for n in seen_nets['networks']]
|
||||
seen_ids = [n['id'] for n in seen_nets['networks']]
|
||||
|
||||
|
@ -230,13 +230,13 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
|
|||
|
||||
seen_subnets = [
|
||||
(n['id'], n['cidr'], n['network_id']) for n in
|
||||
self.admin_manager.subnets_client.list_subnets()['subnets']
|
||||
self.os_admin.subnets_client.list_subnets()['subnets']
|
||||
]
|
||||
mysubnet = (tenant.subnet['id'], tenant.subnet['cidr'],
|
||||
tenant.network['id'])
|
||||
self.assertIn(mysubnet, seen_subnets)
|
||||
|
||||
seen_routers = self.admin_manager.routers_client.list_routers()
|
||||
seen_routers = self.os_admin.routers_client.list_routers()
|
||||
seen_router_ids = [n['id'] for n in seen_routers['routers']]
|
||||
seen_router_names = [n['name'] for n in seen_routers['routers']]
|
||||
|
||||
|
@ -246,7 +246,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
|
|||
myport = (tenant.router['id'], tenant.subnet['id'])
|
||||
router_ports = [
|
||||
(i['device_id'], f['subnet_id'])
|
||||
for i in self.admin_manager.ports_client.list_ports(
|
||||
for i in self.os_admin.ports_client.list_ports(
|
||||
device_id=tenant.router['id'])['ports']
|
||||
if net_info.is_router_interface_port(i)
|
||||
for f in i['fixed_ips']
|
||||
|
@ -279,7 +279,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
|
|||
|
||||
# Verify servers are on different compute nodes
|
||||
if self.multi_node:
|
||||
adm_get_server = self.admin_manager.servers_client.show_server
|
||||
adm_get_server = self.os_admin.servers_client.show_server
|
||||
new_host = adm_get_server(server["id"])["server"][
|
||||
"OS-EXT-SRV-ATTR:host"]
|
||||
host_list = [adm_get_server(s)["server"]["OS-EXT-SRV-ATTR:host"]
|
||||
|
@ -447,7 +447,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
|
|||
mac_addr = mac_addr.strip().lower()
|
||||
# Get the fixed_ips and mac_address fields of all ports. Select
|
||||
# only those two columns to reduce the size of the response.
|
||||
port_list = self.admin_manager.ports_client.list_ports(
|
||||
port_list = self.os_admin.ports_client.list_ports(
|
||||
fields=['fixed_ips', 'mac_address'])['ports']
|
||||
port_detail_list = [
|
||||
(port['fixed_ips'][0]['subnet_id'],
|
||||
|
@ -541,7 +541,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
|
|||
dest=self._get_server_ip(server),
|
||||
should_succeed=False)
|
||||
server_id = server['id']
|
||||
port_id = self.admin_manager.ports_client.list_ports(
|
||||
port_id = self.os_admin.ports_client.list_ports(
|
||||
device_id=server_id)['ports'][0]['id']
|
||||
|
||||
# update port with new security group and check connectivity
|
||||
|
@ -605,7 +605,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
|
|||
|
||||
access_point_ssh = self._connect_to_access_point(new_tenant)
|
||||
server_id = server['id']
|
||||
port_id = self.admin_manager.ports_client.list_ports(
|
||||
port_id = self.os_admin.ports_client.list_ports(
|
||||
device_id=server_id)['ports'][0]['id']
|
||||
|
||||
# Flip the port's port security and check connectivity
|
||||
|
@ -647,7 +647,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
|
|||
sec_groups = []
|
||||
server = self._create_server(name, tenant, sec_groups)
|
||||
server_id = server['id']
|
||||
ports = self.admin_manager.ports_client.list_ports(
|
||||
ports = self.os_admin.ports_client.list_ports(
|
||||
device_id=server_id)['ports']
|
||||
self.assertEqual(1, len(ports))
|
||||
for port in ports:
|
||||
|
|
|
@ -39,10 +39,10 @@ class TestServerMultinode(manager.ScenarioTest):
|
|||
def setup_clients(cls):
|
||||
super(TestServerMultinode, cls).setup_clients()
|
||||
# Use admin client by default
|
||||
cls.manager = cls.admin_manager
|
||||
cls.manager = cls.os_admin
|
||||
# this is needed so that we can use the availability_zone:host
|
||||
# scheduler hint, which is admin_only by default
|
||||
cls.servers_client = cls.admin_manager.servers_client
|
||||
cls.servers_client = cls.os_admin.servers_client
|
||||
|
||||
@decorators.idempotent_id('9cecbe35-b9d4-48da-a37e-7ce70aa43d30')
|
||||
@decorators.attr(type='smoke')
|
||||
|
|
Loading…
Reference in New Issue