Move SG and FIP API wrapper to api.neutron

We no longer need to have SG and FIP API wrapper in api.network
as we only supports a single network back-end.

Completes blueprint drop-nova-network

Change-Id: I4e59d897508b497a3cd2ae2fda93b30b786610dc
This commit is contained in:
Akihiro Motoki 2017-05-22 08:31:56 +00:00
parent b88461882f
commit 9067ae8b0f
33 changed files with 906 additions and 1035 deletions

View File

@ -19,132 +19,10 @@ introduced to abstract the differences between them for seamless consumption by
different dashboard implementations.
"""
from horizon.utils.memoized import memoized
from openstack_dashboard.api import base
from openstack_dashboard.api import neutron
class NetworkClient(object):
def __init__(self, request):
# TODO(amotoki): neutron check needs to be dropped.
# The network API wrapper can depend on neutron.
neutron_enabled = base.is_service_enabled(request, 'network')
if neutron_enabled:
self.floating_ips = neutron.FloatingIpManager(request)
else:
self.floating_ips = None
if (neutron_enabled and
neutron.is_extension_supported(request, 'security-group')):
self.secgroups = neutron.SecurityGroupManager(request)
else:
self.secgroups = None
@property
def enabled(self):
return self.floating_ips is not None
def floating_ip_pools_list(request):
return NetworkClient(request).floating_ips.list_pools()
@memoized
def tenant_floating_ip_list(request, all_tenants=False):
return NetworkClient(request).floating_ips.list(all_tenants=all_tenants)
def tenant_floating_ip_get(request, floating_ip_id):
return NetworkClient(request).floating_ips.get(floating_ip_id)
def tenant_floating_ip_allocate(request, pool=None, tenant_id=None, **params):
return NetworkClient(request).floating_ips.allocate(pool,
tenant_id,
**params)
def tenant_floating_ip_release(request, floating_ip_id):
return NetworkClient(request).floating_ips.release(floating_ip_id)
def floating_ip_associate(request, floating_ip_id, port_id):
return NetworkClient(request).floating_ips.associate(floating_ip_id,
port_id)
def floating_ip_disassociate(request, floating_ip_id):
return NetworkClient(request).floating_ips.disassociate(floating_ip_id)
def floating_ip_target_list(request):
return NetworkClient(request).floating_ips.list_targets()
def floating_ip_target_get_by_instance(request, instance_id, cache=None):
return NetworkClient(request).floating_ips.get_target_id_by_instance(
instance_id, cache)
def floating_ip_target_list_by_instance(request, instance_id, cache=None):
floating_ips = NetworkClient(request).floating_ips
return floating_ips.list_target_id_by_instance(instance_id, cache)
def floating_ip_simple_associate_supported(request):
return NetworkClient(request).floating_ips.is_simple_associate_supported()
def floating_ip_supported(request):
nwc = NetworkClient(request)
return nwc.enabled and nwc.floating_ips.is_supported()
@memoized
def security_group_list(request):
return NetworkClient(request).secgroups.list()
def security_group_get(request, sg_id):
return NetworkClient(request).secgroups.get(sg_id)
def security_group_create(request, name, desc):
return NetworkClient(request).secgroups.create(name, desc)
def security_group_delete(request, sg_id):
return NetworkClient(request).secgroups.delete(sg_id)
def security_group_update(request, sg_id, name, desc):
return NetworkClient(request).secgroups.update(sg_id, name, desc)
def security_group_rule_create(request, parent_group_id,
direction, ethertype,
ip_protocol, from_port, to_port,
cidr, group_id):
return NetworkClient(request).secgroups.rule_create(
parent_group_id, direction, ethertype, ip_protocol,
from_port, to_port, cidr, group_id)
def security_group_rule_delete(request, sgr_id):
return NetworkClient(request).secgroups.rule_delete(sgr_id)
def server_security_groups(request, instance_id):
return NetworkClient(request).secgroups.list_by_instance(instance_id)
def server_update_security_groups(request, instance_id,
new_security_group_ids):
return NetworkClient(request).secgroups.update_instance_security_group(
instance_id, new_security_group_ids)
def servers_update_addresses(request, servers, all_tenants=False):
"""Retrieve servers networking information from Neutron if enabled.

View File

@ -1246,6 +1246,101 @@ def provider_list(request):
return providers['service_providers']
def floating_ip_pools_list(request):
return FloatingIpManager(request).list_pools()
@memoized
def tenant_floating_ip_list(request, all_tenants=False):
return FloatingIpManager(request).list(all_tenants=all_tenants)
def tenant_floating_ip_get(request, floating_ip_id):
return FloatingIpManager(request).get(floating_ip_id)
def tenant_floating_ip_allocate(request, pool=None, tenant_id=None, **params):
return FloatingIpManager(request).allocate(pool, tenant_id, **params)
def tenant_floating_ip_release(request, floating_ip_id):
return FloatingIpManager(request).release(floating_ip_id)
def floating_ip_associate(request, floating_ip_id, port_id):
return FloatingIpManager(request).associate(floating_ip_id, port_id)
def floating_ip_disassociate(request, floating_ip_id):
return FloatingIpManager(request).disassociate(floating_ip_id)
def floating_ip_target_list(request):
return FloatingIpManager(request).list_targets()
def floating_ip_target_get_by_instance(request, instance_id, cache=None):
return FloatingIpManager(request).get_target_id_by_instance(
instance_id, cache)
def floating_ip_target_list_by_instance(request, instance_id, cache=None):
return FloatingIpManager(request).list_target_id_by_instance(
instance_id, cache)
def floating_ip_simple_associate_supported(request):
return FloatingIpManager(request).is_simple_associate_supported()
def floating_ip_supported(request):
return FloatingIpManager(request).is_supported()
@memoized
def security_group_list(request):
return SecurityGroupManager(request).list()
def security_group_get(request, sg_id):
return SecurityGroupManager(request).get(sg_id)
def security_group_create(request, name, desc):
return SecurityGroupManager(request).create(name, desc)
def security_group_delete(request, sg_id):
return SecurityGroupManager(request).delete(sg_id)
def security_group_update(request, sg_id, name, desc):
return SecurityGroupManager(request).update(sg_id, name, desc)
def security_group_rule_create(request, parent_group_id,
direction, ethertype,
ip_protocol, from_port, to_port,
cidr, group_id):
return SecurityGroupManager(request).rule_create(
parent_group_id, direction, ethertype, ip_protocol,
from_port, to_port, cidr, group_id)
def security_group_rule_delete(request, sgr_id):
return SecurityGroupManager(request).rule_delete(sgr_id)
def server_security_groups(request, instance_id):
return SecurityGroupManager(request).list_by_instance(instance_id)
def server_update_security_groups(request, instance_id,
new_security_group_ids):
return SecurityGroupManager(request).update_instance_security_group(
instance_id, new_security_group_ids)
# TODO(pkarikh) need to uncomment when osprofiler will have no
# issues with unicode in:
# openstack_dashboard/test/test_data/nova_data.py#L470 data

View File

@ -42,7 +42,7 @@ class SecurityGroups(generic.View):
http://localhost/api/network/securitygroups
"""
security_groups = api.network.security_group_list(request)
security_groups = api.neutron.security_group_list(request)
return {'items': [sg.to_dict() for sg in security_groups]}
@ -63,7 +63,7 @@ class FloatingIP(generic.View):
:return: JSON representation of the new floating IP address
"""
pool = request.DATA['pool_id']
result = api.network.tenant_floating_ip_allocate(request, pool)
result = api.neutron.tenant_floating_ip_allocate(request, pool)
return result.to_dict()
@rest_utils.ajax(data_required=True)
@ -77,9 +77,9 @@ class FloatingIP(generic.View):
address = request.DATA['address_id']
port = request.DATA.get('port_id')
if port is None:
api.network.floating_ip_disassociate(request, address)
api.neutron.floating_ip_disassociate(request, address)
else:
api.network.floating_ip_associate(request, address, port)
api.neutron.floating_ip_associate(request, address, port)
@urls.register
@ -98,7 +98,7 @@ class FloatingIPs(generic.View):
Example:
http://localhost/api/network/floatingips
"""
result = api.network.tenant_floating_ip_list(request)
result = api.neutron.tenant_floating_ip_list(request)
return {'items': [ip.to_dict() for ip in result]}
@ -118,5 +118,5 @@ class FloatingIPPools(generic.View):
Example:
http://localhost/api/network/floatingippools
"""
result = api.network.floating_ip_pools_list(request)
result = api.neutron.floating_ip_pools_list(request)
return {'items': [p.to_dict() for p in result]}

View File

@ -233,7 +233,7 @@ class SecurityGroups(generic.View):
Example GET:
http://localhost/api/nova/servers/abcd/security-groups/
"""
groups = api.network.server_security_groups(request, server_id)
groups = api.neutron.server_security_groups(request, server_id)
return {'items': [s.to_dict() for s in groups]}

View File

@ -208,8 +208,8 @@ class AggregatesViewTests(test.BaseAdminViewTests):
'availability_zone_list',
'tenant_absolute_limits',),
api.cinder: ('tenant_absolute_limits',),
api.neutron: ('is_extension_supported',),
api.network: ('tenant_floating_ip_list',
api.neutron: ('is_extension_supported',
'tenant_floating_ip_list',
'security_group_list'),
api.keystone: ('tenant_list',)})
def test_panel_not_available(self):
@ -220,9 +220,9 @@ class AggregatesViewTests(test.BaseAdminViewTests):
api.neutron.\
is_extension_supported(IsA(http.HttpRequest), 'security-group'). \
MultipleTimes().AndReturn(True)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.keystone.tenant_list(IsA(http.HttpRequest)) \
.AndReturn(self.tenants.list())

View File

@ -50,7 +50,7 @@ class AdminFloatingIpAllocate(forms.SelfHandlingForm):
param['floating_ip_address'] = data['floating_ip_address']
subnet = api.neutron.subnet_get(request, data['pool'])
param['subnet_id'] = subnet.id
fip = api.network.tenant_floating_ip_allocate(
fip = api.neutron.tenant_floating_ip_allocate(
request,
pool=subnet.network_id,
tenant_id=data['tenant'],

View File

@ -61,7 +61,7 @@ class AdminSimpleDisassociateIP(project_tables.DisassociateIP):
def single(self, table, request, obj_id):
try:
fip = table.get_object_by_id(filters.get_int_or_uuid(obj_id))
api.network.floating_ip_disassociate(request, fip.id)
api.neutron.floating_ip_disassociate(request, fip.id)
LOG.info('Disassociating Floating IP "%s".', obj_id)
messages.success(request,
_('Successfully disassociated Floating IP: %s')

View File

@ -25,16 +25,16 @@ INDEX_TEMPLATE = 'horizon/common/_data_table_view.html'
class AdminFloatingIpViewTest(test.BaseAdminViewTests):
@test.create_stubs({api.network: ('tenant_floating_ip_list', ),
api.nova: ('server_list', ),
@test.create_stubs({api.nova: ('server_list', ),
api.keystone: ('tenant_list', ),
api.neutron: ('network_list', )})
api.neutron: ('network_list',
'tenant_floating_ip_list',)})
def test_index(self):
# Use neutron test data
fips = self.floating_ips.list()
servers = self.servers.list()
tenants = self.tenants.list()
api.network.tenant_floating_ip_list(IsA(http.HttpRequest),
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest),
all_tenants=True).AndReturn(fips)
api.nova.server_list(IsA(http.HttpRequest), all_tenants=True) \
.AndReturn([servers, False])
@ -58,12 +58,12 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
row_actions = floating_ips_table.get_row_actions(floating_ips[1])
self.assertEqual(len(row_actions), 2)
@test.create_stubs({api.network: ('tenant_floating_ip_get', ),
api.neutron: ('network_get', )})
@test.create_stubs({api.neutron: ('tenant_floating_ip_get',
'network_get', )})
def test_floating_ip_detail_get(self):
fip = self.floating_ips.first()
network = self.networks.first()
api.network.tenant_floating_ip_get(
api.neutron.tenant_floating_ip_get(
IsA(http.HttpRequest), fip.id).AndReturn(fip)
api.neutron.network_get(
IsA(http.HttpRequest), fip.pool).AndReturn(network)
@ -75,11 +75,11 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
'admin/floating_ips/detail.html')
self.assertEqual(res.context['floating_ip'].ip, fip.ip)
@test.create_stubs({api.network: ('tenant_floating_ip_get',)})
@test.create_stubs({api.neutron: ('tenant_floating_ip_get',)})
def test_floating_ip_detail_exception(self):
fip = self.floating_ips.first()
# Only supported by neutron, so raise a neutron exception
api.network.tenant_floating_ip_get(
api.neutron.tenant_floating_ip_get(
IsA(http.HttpRequest),
fip.id).AndRaise(self.exceptions.neutron)
@ -90,18 +90,18 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api.network: ('tenant_floating_ip_list', )})
@test.create_stubs({api.neutron: ('tenant_floating_ip_list', )})
def test_index_no_floating_ips(self):
api.network.tenant_floating_ip_list(IsA(http.HttpRequest),
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest),
all_tenants=True).AndReturn([])
self.mox.ReplayAll()
res = self.client.get(INDEX_URL)
self.assertTemplateUsed(res, INDEX_TEMPLATE)
@test.create_stubs({api.network: ('tenant_floating_ip_list', )})
@test.create_stubs({api.neutron: ('tenant_floating_ip_list', )})
def test_index_error(self):
api.network.tenant_floating_ip_list(IsA(http.HttpRequest),
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest),
all_tenants=True) \
.AndRaise(self.exceptions.neutron)
self.mox.ReplayAll()
@ -154,8 +154,8 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
res = self.client.post(url, form_data)
self.assertContains(res, "Invalid version for IP address")
@test.create_stubs({api.network: ('tenant_floating_ip_allocate',),
api.neutron: ('network_list', 'subnet_get'),
@test.create_stubs({api.neutron: ('tenant_floating_ip_allocate',
'network_list', 'subnet_get'),
api.keystone: ('tenant_list',)})
def test_admin_allocate_post(self):
tenant = self.tenants.first()
@ -171,7 +171,7 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
search_opts = {'router:external': True}
api.neutron.network_list(IsA(http.HttpRequest), **search_opts) \
.AndReturn([pool])
api.network.tenant_floating_ip_allocate(
api.neutron.tenant_floating_ip_allocate(
IsA(http.HttpRequest),
pool=pool.id,
tenant_id=tenant.id).AndReturn(floating_ip)
@ -183,18 +183,18 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
res = self.client.post(url, form_data)
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api.network: ('tenant_floating_ip_list',
'floating_ip_disassociate'),
@test.create_stubs({api.neutron: ('tenant_floating_ip_list',
'floating_ip_disassociate',
'network_list'),
api.nova: ('server_list', ),
api.keystone: ('tenant_list', ),
api.neutron: ('network_list', )})
api.keystone: ('tenant_list', )})
def test_admin_disassociate_floatingip(self):
# Use neutron test data
fips = self.floating_ips.list()
floating_ip = self.floating_ips.list()[1]
servers = self.servers.list()
tenants = self.tenants.list()
api.network.tenant_floating_ip_list(IsA(http.HttpRequest),
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest),
all_tenants=True).AndReturn(fips)
api.nova.server_list(IsA(http.HttpRequest), all_tenants=True) \
.AndReturn([servers, False])
@ -203,7 +203,7 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
params = {"router:external": True}
api.neutron.network_list(IsA(http.HttpRequest), **params) \
.AndReturn(self.networks.list())
api.network.floating_ip_disassociate(IsA(http.HttpRequest),
api.neutron.floating_ip_disassociate(IsA(http.HttpRequest),
floating_ip.id)
self.mox.ReplayAll()
@ -214,17 +214,17 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
self.assertNoFormErrors(res)
@test.create_stubs({api.network: ('tenant_floating_ip_list', ),
@test.create_stubs({api.neutron: ('tenant_floating_ip_list',
'network_list'),
api.nova: ('server_list', ),
api.keystone: ('tenant_list', ),
api.neutron: ('network_list', )})
api.keystone: ('tenant_list', )})
def test_admin_delete_floatingip(self):
# Use neutron test data
fips = self.floating_ips.list()
floating_ip = self.floating_ips.list()[1]
servers = self.servers.list()
tenants = self.tenants.list()
api.network.tenant_floating_ip_list(IsA(http.HttpRequest),
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest),
all_tenants=True).AndReturn(fips)
api.nova.server_list(IsA(http.HttpRequest), all_tenants=True) \
.AndReturn([servers, False])
@ -243,16 +243,16 @@ class AdminFloatingIpViewTest(test.BaseAdminViewTests):
self.assertNoFormErrors(res)
@test.create_stubs({api.network: ('tenant_floating_ip_list', ),
@test.create_stubs({api.neutron: ('tenant_floating_ip_list',
'network_list'),
api.nova: ('server_list', ),
api.keystone: ('tenant_list', ),
api.neutron: ('network_list', )})
api.keystone: ('tenant_list', )})
def test_floating_ip_table_actions(self):
# Use neutron test data
fips = self.floating_ips.list()
servers = self.servers.list()
tenants = self.tenants.list()
api.network.tenant_floating_ip_list(IsA(http.HttpRequest),
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest),
all_tenants=True).AndReturn(fips)
api.nova.server_list(IsA(http.HttpRequest), all_tenants=True) \
.AndReturn([servers, False])

View File

@ -65,7 +65,7 @@ class IndexView(tables.DataTableView):
def get_data(self):
floating_ips = []
try:
floating_ips = api.network.tenant_floating_ip_list(
floating_ips = api.neutron.tenant_floating_ip_list(
self.request,
all_tenants=True)
except Exception:
@ -104,7 +104,7 @@ class DetailView(views.HorizonTemplateView):
page_title = _("Floating IP Details")
def _get_corresponding_data(self, resource, resource_id):
function_dict = {"floating IP": api.network.tenant_floating_ip_get,
function_dict = {"floating IP": api.neutron.tenant_floating_ip_get,
"instance": api.nova.server_get,
"network": api.neutron.network_get,
"router": api.neutron.router_get}

View File

@ -44,9 +44,9 @@ class UsageViewTests(test.BaseAdminViewTests):
self.mox.StubOutWithMock(api.nova, 'extension_supported')
self.mox.StubOutWithMock(api.keystone, 'tenant_list')
self.mox.StubOutWithMock(api.neutron, 'is_extension_supported')
self.mox.StubOutWithMock(api.network, 'floating_ip_supported')
self.mox.StubOutWithMock(api.network, 'tenant_floating_ip_list')
self.mox.StubOutWithMock(api.network, 'security_group_list')
self.mox.StubOutWithMock(api.neutron, 'floating_ip_supported')
self.mox.StubOutWithMock(api.neutron, 'tenant_floating_ip_list')
self.mox.StubOutWithMock(api.neutron, 'security_group_list')
self.mox.StubOutWithMock(api.cinder, 'tenant_absolute_limits')
api.nova.extension_supported(
@ -103,11 +103,11 @@ class UsageViewTests(test.BaseAdminViewTests):
.AndReturn(self.limits['absolute'])
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'security-group').AndReturn(True)
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.AndReturn(True)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.cinder.tenant_absolute_limits(IsA(http.HttpRequest)) \
.AndReturn(self.cinder_limits['absolute'])
@ -199,11 +199,11 @@ class UsageViewTests(test.BaseAdminViewTests):
.AndReturn(self.limits['absolute'])
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'security-group').AndReturn(True)
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.AndReturn(True)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.cinder.tenant_absolute_limits(IsA(http.HttpRequest)) \
.AndReturn(self.cinder_limits['absolute'])

View File

@ -1566,19 +1566,19 @@ class UsageViewTests(test.BaseAdminViewTests):
def _stub_neutron_api_calls(self, neutron_sg_enabled=True):
self.mox.StubOutWithMock(api.neutron, 'is_extension_supported')
self.mox.StubOutWithMock(api.network, 'floating_ip_supported')
self.mox.StubOutWithMock(api.network, 'tenant_floating_ip_list')
self.mox.StubOutWithMock(api.neutron, 'floating_ip_supported')
self.mox.StubOutWithMock(api.neutron, 'tenant_floating_ip_list')
if neutron_sg_enabled:
self.mox.StubOutWithMock(api.network, 'security_group_list')
self.mox.StubOutWithMock(api.neutron, 'security_group_list')
api.neutron.is_extension_supported(
IsA(http.HttpRequest),
'security-group').AndReturn(neutron_sg_enabled)
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.AndReturn(True)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
if neutron_sg_enabled:
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
def test_usage_csv(self):

View File

@ -45,7 +45,7 @@ class FloatingIpAllocate(forms.SelfHandlingForm):
self.api_error(error_message)
return False
fip = api.network.tenant_floating_ip_allocate(request,
fip = api.neutron.tenant_floating_ip_allocate(request,
pool=data['pool'])
messages.success(request,
_('Allocated Floating IP %(ip)s.')

View File

@ -90,7 +90,7 @@ class ReleaseIPs(tables.BatchAction):
return policy.check(policy_rules, request)
def action(self, request, obj_id):
api.network.tenant_floating_ip_release(request, obj_id)
api.neutron.tenant_floating_ip_release(request, obj_id)
class AssociateIP(tables.LinkAction):
@ -124,7 +124,7 @@ class DisassociateIP(tables.Action):
def single(self, table, request, obj_id):
try:
fip = table.get_object_by_id(filters.get_int_or_uuid(obj_id))
api.network.floating_ip_disassociate(request, fip.id)
api.neutron.floating_ip_disassociate(request, fip.id)
LOG.info('Disassociating Floating IP "%s".', obj_id)
messages.success(request,
_('Successfully disassociated Floating IP: %s')

View File

@ -37,12 +37,12 @@ NAMESPACE = "horizon:project:floating_ips"
class FloatingIpViewTests(test.TestCase):
@test.create_stubs({api.network: ('floating_ip_target_list',
@test.create_stubs({api.neutron: ('floating_ip_target_list',
'tenant_floating_ip_list',)})
def test_associate(self):
api.network.floating_ip_target_list(IsA(http.HttpRequest)) \
api.neutron.floating_ip_target_list(IsA(http.HttpRequest)) \
.AndReturn(self._get_fip_targets())
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
self.mox.ReplayAll()
@ -54,18 +54,18 @@ class FloatingIpViewTests(test.TestCase):
# Verify that our "associated" floating IP isn't in the choices list.
self.assertNotIn(self.floating_ips.first(), choices)
@test.create_stubs({api.network: ('floating_ip_target_list',
@test.create_stubs({api.neutron: ('floating_ip_target_list',
'floating_ip_target_get_by_instance',
'tenant_floating_ip_list',)})
def test_associate_with_instance_id(self):
targets = self._get_fip_targets()
target = targets[0]
api.network.floating_ip_target_list(IsA(http.HttpRequest)) \
api.neutron.floating_ip_target_list(IsA(http.HttpRequest)) \
.AndReturn(targets)
api.network.floating_ip_target_get_by_instance(
api.neutron.floating_ip_target_get_by_instance(
IsA(http.HttpRequest), target.instance_id, targets) \
.AndReturn(target.id)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
self.mox.ReplayAll()
@ -100,16 +100,16 @@ class FloatingIpViewTests(test.TestCase):
def _get_target_id(port):
return '%s_%s' % (port.id, port.fixed_ips[0]['ip_address'])
@test.create_stubs({api.network: ('floating_ip_target_list',
@test.create_stubs({api.neutron: ('floating_ip_target_list',
'tenant_floating_ip_list',)})
def test_associate_with_port_id(self):
compute_port = self._get_compute_ports()[0]
associated_fips = [fip.id for fip in self.floating_ips.list()
if fip.port_id]
api.network.floating_ip_target_list(IsA(http.HttpRequest)) \
api.neutron.floating_ip_target_list(IsA(http.HttpRequest)) \
.AndReturn(self._get_fip_targets())
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
self.mox.ReplayAll()
@ -123,7 +123,7 @@ class FloatingIpViewTests(test.TestCase):
# Verify that our "associated" floating IP isn't in the choices list.
self.assertFalse(set(associated_fips) & set(choices.keys()))
@test.create_stubs({api.network: ('floating_ip_associate',
@test.create_stubs({api.neutron: ('floating_ip_associate',
'floating_ip_target_list',
'tenant_floating_ip_list',)})
def test_associate_post(self):
@ -132,11 +132,11 @@ class FloatingIpViewTests(test.TestCase):
compute_port = self._get_compute_ports()[0]
port_target_id = self._get_target_id(compute_port)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
api.network.floating_ip_target_list(IsA(http.HttpRequest)) \
api.neutron.floating_ip_target_list(IsA(http.HttpRequest)) \
.AndReturn(self._get_fip_targets())
api.network.floating_ip_associate(IsA(http.HttpRequest),
api.neutron.floating_ip_associate(IsA(http.HttpRequest),
floating_ip.id,
port_target_id)
self.mox.ReplayAll()
@ -147,7 +147,7 @@ class FloatingIpViewTests(test.TestCase):
res = self.client.post(url, form_data)
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api.network: ('floating_ip_associate',
@test.create_stubs({api.neutron: ('floating_ip_associate',
'floating_ip_target_list',
'tenant_floating_ip_list',)})
def test_associate_post_with_redirect(self):
@ -156,11 +156,11 @@ class FloatingIpViewTests(test.TestCase):
compute_port = self._get_compute_ports()[0]
port_target_id = self._get_target_id(compute_port)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
api.network.floating_ip_target_list(IsA(http.HttpRequest)) \
api.neutron.floating_ip_target_list(IsA(http.HttpRequest)) \
.AndReturn(self._get_fip_targets())
api.network.floating_ip_associate(IsA(http.HttpRequest),
api.neutron.floating_ip_associate(IsA(http.HttpRequest),
floating_ip.id,
port_target_id)
self.mox.ReplayAll()
@ -172,7 +172,7 @@ class FloatingIpViewTests(test.TestCase):
res = self.client.post(url, form_data)
self.assertRedirectsNoFollow(res, next)
@test.create_stubs({api.network: ('floating_ip_associate',
@test.create_stubs({api.neutron: ('floating_ip_associate',
'floating_ip_target_list',
'tenant_floating_ip_list',)})
def test_associate_post_with_exception(self):
@ -181,11 +181,11 @@ class FloatingIpViewTests(test.TestCase):
compute_port = self._get_compute_ports()[0]
port_target_id = self._get_target_id(compute_port)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
api.network.floating_ip_target_list(IsA(http.HttpRequest)) \
api.neutron.floating_ip_target_list(IsA(http.HttpRequest)) \
.AndReturn(self._get_fip_targets())
api.network.floating_ip_associate(IsA(http.HttpRequest),
api.neutron.floating_ip_associate(IsA(http.HttpRequest),
floating_ip.id,
port_target_id) \
.AndRaise(self.exceptions.nova)
@ -198,21 +198,21 @@ class FloatingIpViewTests(test.TestCase):
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api.nova: ('server_list',),
api.network: ('floating_ip_disassociate',
api.neutron: ('floating_ip_disassociate',
'floating_ip_pools_list',
'tenant_floating_ip_get',
'tenant_floating_ip_list',),
api.neutron: ('is_extension_supported',)})
'tenant_floating_ip_list',
'is_extension_supported',)})
def test_disassociate_post(self):
floating_ip = self.floating_ips.first()
api.nova.server_list(IsA(http.HttpRequest), detailed=False) \
.AndReturn([self.servers.list(), False])
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'subnet_allocation')\
.AndReturn(True)
api.network.floating_ip_disassociate(IsA(http.HttpRequest),
api.neutron.floating_ip_pools_list(IsA(http.HttpRequest)) \
.AndReturn(self.pools.list())
api.neutron.floating_ip_disassociate(IsA(http.HttpRequest),
floating_ip.id)
self.mox.ReplayAll()
@ -222,22 +222,21 @@ class FloatingIpViewTests(test.TestCase):
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api.nova: ('server_list',),
api.network: ('floating_ip_disassociate',
api.neutron: ('floating_ip_disassociate',
'floating_ip_pools_list',
'tenant_floating_ip_get',
'tenant_floating_ip_list',),
api.neutron: ('is_extension_supported',)})
'tenant_floating_ip_list',
'is_extension_supported',)})
def test_disassociate_post_with_exception(self):
floating_ip = self.floating_ips.first()
api.nova.server_list(IsA(http.HttpRequest), detailed=False) \
.AndReturn([self.servers.list(), False])
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'subnet_allocation')\
.AndReturn(True)
api.network.floating_ip_disassociate(IsA(http.HttpRequest),
api.neutron.floating_ip_pools_list(IsA(http.HttpRequest)) \
.AndReturn(self.pools.list())
api.neutron.floating_ip_disassociate(IsA(http.HttpRequest),
floating_ip.id) \
.AndRaise(self.exceptions.nova)
self.mox.ReplayAll()
@ -246,7 +245,7 @@ class FloatingIpViewTests(test.TestCase):
res = self.client.post(INDEX_URL, {"action": action})
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api.network: ('tenant_floating_ip_list',
@test.create_stubs({api.neutron: ('tenant_floating_ip_list',
'floating_ip_pools_list',),
api.nova: ('server_list',),
quotas: ('tenant_quota_usages',),
@ -257,10 +256,10 @@ class FloatingIpViewTests(test.TestCase):
quota_data = self.quota_usages.first()
quota_data['floating_ips']['available'] = 10
api.network.tenant_floating_ip_list(
api.neutron.tenant_floating_ip_list(
IsA(http.HttpRequest)) \
.AndReturn(floating_ips)
api.network.floating_ip_pools_list(
api.neutron.floating_ip_pools_list(
IsA(http.HttpRequest)) \
.AndReturn(floating_pools)
api.nova.server_list(
@ -284,7 +283,7 @@ class FloatingIpViewTests(test.TestCase):
url = 'horizon:project:floating_ips:allocate'
self.assertEqual(url, allocate_action.url)
@test.create_stubs({api.network: ('tenant_floating_ip_list',
@test.create_stubs({api.neutron: ('tenant_floating_ip_list',
'floating_ip_pools_list',),
api.nova: ('server_list',),
quotas: ('tenant_quota_usages',),
@ -295,10 +294,10 @@ class FloatingIpViewTests(test.TestCase):
quota_data = self.quota_usages.first()
quota_data['floating_ips']['available'] = 0
api.network.tenant_floating_ip_list(
api.neutron.tenant_floating_ip_list(
IsA(http.HttpRequest)) \
.AndReturn(floating_ips)
api.network.floating_ip_pools_list(
api.neutron.floating_ip_pools_list(
IsA(http.HttpRequest)) \
.AndReturn(floating_pools)
api.nova.server_list(
@ -321,11 +320,11 @@ class FloatingIpViewTests(test.TestCase):
@test.create_stubs({api.nova: ('tenant_quota_get', 'flavor_list',
'server_list'),
api.network: ('floating_ip_pools_list',
api.neutron: ('floating_ip_pools_list',
'floating_ip_supported',
'security_group_list',
'tenant_floating_ip_list'),
api.neutron: ('is_extension_supported',
'tenant_floating_ip_list',
'is_extension_supported',
'is_router_enabled',
'tenant_quota_get',
'network_list',
@ -368,13 +367,13 @@ class FloatingIpViewTests(test.TestCase):
api.neutron.network_list(IsA(http.HttpRequest),
tenant_id=self.tenant.id) \
.AndReturn(self.networks.list())
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.AndReturn(True)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(self.floating_ips.list())
api.network.floating_ip_pools_list(IsA(http.HttpRequest)) \
api.neutron.floating_ip_pools_list(IsA(http.HttpRequest)) \
.AndReturn(self.pools.list())
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
self.mox.ReplayAll()

View File

@ -68,7 +68,7 @@ class AllocateView(forms.ModalFormView):
def get_initial(self):
try:
pools = api.network.floating_ip_pools_list(self.request)
pools = api.neutron.floating_ip_pools_list(self.request)
except neutron_exc.ConnectionFailed:
pools = []
exceptions.handle(self.request)
@ -88,7 +88,7 @@ class IndexView(tables.DataTableView):
def get_data(self):
try:
floating_ips = api.network.tenant_floating_ip_list(self.request)
floating_ips = api.neutron.tenant_floating_ip_list(self.request)
except neutron_exc.ConnectionFailed:
floating_ips = []
exceptions.handle(self.request)
@ -99,7 +99,7 @@ class IndexView(tables.DataTableView):
try:
floating_ip_pools = \
api.network.floating_ip_pools_list(self.request)
api.neutron.floating_ip_pools_list(self.request)
except neutron_exc.ConnectionFailed:
floating_ip_pools = []
exceptions.handle(self.request)

View File

@ -54,7 +54,7 @@ class AssociateIPAction(workflows.Action):
q_port_id = self.request.GET.get('port_id')
if q_instance_id:
targets = self._get_target_list()
target_id = api.network.floating_ip_target_get_by_instance(
target_id = api.neutron.floating_ip_target_get_by_instance(
self.request, q_instance_id, targets)
self.initial['instance_id'] = target_id
elif q_port_id:
@ -69,7 +69,7 @@ class AssociateIPAction(workflows.Action):
ips = []
redirect = reverse('horizon:project:floating_ips:index')
try:
ips = api.network.tenant_floating_ip_list(self.request)
ips = api.neutron.tenant_floating_ip_list(self.request)
except neutron_exc.ConnectionFailed:
exceptions.handle(self.request, redirect=redirect)
except Exception:
@ -88,7 +88,7 @@ class AssociateIPAction(workflows.Action):
def _get_target_list(self):
targets = []
try:
targets = api.network.floating_ip_target_list(self.request)
targets = api.neutron.floating_ip_target_list(self.request)
except Exception:
redirect = reverse('horizon:project:floating_ips:index')
exceptions.handle(self.request,
@ -145,7 +145,7 @@ class IPAssociationWorkflow(workflows.Workflow):
def handle(self, request, data):
try:
api.network.floating_ip_associate(request,
api.neutron.floating_ip_associate(request,
data['ip_id'],
data['instance_id'])
except neutron_exc.Conflict:

View File

@ -633,9 +633,9 @@ class AssociateIP(policy.PolicyTargetMixin, tables.LinkAction):
def allowed(self, request, instance):
if not api.base.is_service_enabled(request, 'network'):
return False
if not api.network.floating_ip_supported(request):
if not api.neutron.floating_ip_supported(request):
return False
if api.network.floating_ip_simple_associate_supported(request):
if api.neutron.floating_ip_simple_associate_supported(request):
return False
if instance.status == "ERROR":
return False
@ -669,7 +669,7 @@ class SimpleDisassociateIP(policy.PolicyTargetMixin, tables.Action):
def allowed(self, request, instance):
if not api.base.is_service_enabled(request, 'network'):
return False
if not api.network.floating_ip_supported(request):
if not api.neutron.floating_ip_supported(request):
return False
if not conf.HORIZON_CONFIG["simple_ip_management"]:
return False
@ -683,18 +683,18 @@ class SimpleDisassociateIP(policy.PolicyTargetMixin, tables.Action):
try:
# target_id is port_id for Neutron and instance_id for Nova Network
# (Neutron API wrapper returns a 'portid_fixedip' string)
targets = api.network.floating_ip_target_list_by_instance(
targets = api.neutron.floating_ip_target_list_by_instance(
request, instance_id)
target_ids = [t.split('_')[0] for t in targets]
fips = [fip for fip in api.network.tenant_floating_ip_list(request)
fips = [fip for fip in api.neutron.tenant_floating_ip_list(request)
if fip.port_id in target_ids]
# Removing multiple floating IPs at once doesn't work, so this pops
# off the first one.
if fips:
fip = fips.pop()
api.network.floating_ip_disassociate(request, fip.id)
api.neutron.floating_ip_disassociate(request, fip.id)
messages.success(request,
_("Successfully disassociated "
"floating IP: %s") % fip.ip)

View File

@ -100,7 +100,7 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
.AndReturn(self.flavors.list())
api.nova.keypair_list(IsA(http.HttpRequest)) \
.AndReturn(self.keypairs.list())
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())
@ -123,9 +123,11 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
'is_feature_available',
),
api.glance: ('image_list_detailed',),
api.network: (
api.neutron: (
'floating_ip_simple_associate_supported',
'floating_ip_supported',
),
api.network: (
'servers_update_addresses',
),
})
@ -148,9 +150,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.network.servers_update_addresses(IsA(http.HttpRequest), servers)
api.nova.tenant_absolute_limits(IsA(http.HttpRequest), reserved=True) \
.MultipleTimes().AndReturn(self.limits['absolute'])
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True)
api.network.floating_ip_simple_associate_supported(
api.neutron.floating_ip_simple_associate_supported(
IsA(http.HttpRequest)).MultipleTimes().AndReturn(True)
self.mox.ReplayAll()
@ -197,9 +199,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
'tenant_absolute_limits', 'extension_supported',
'is_feature_available',),
api.glance: ('image_list_detailed',),
api.network: ('floating_ip_simple_associate_supported',
'floating_ip_supported',
'servers_update_addresses',),
api.neutron: ('floating_ip_simple_associate_supported',
'floating_ip_supported',),
api.network: ('servers_update_addresses',),
})
def test_index_flavor_list_exception(self):
servers = self.servers.list()
@ -225,9 +227,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
AndReturn(full_flavors[server.flavor["id"]])
api.nova.tenant_absolute_limits(IsA(http.HttpRequest), reserved=True) \
.MultipleTimes().AndReturn(self.limits['absolute'])
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True)
api.network.floating_ip_simple_associate_supported(
api.neutron.floating_ip_simple_associate_supported(
IsA(http.HttpRequest)).MultipleTimes().AndReturn(True)
self.mox.ReplayAll()
@ -243,9 +245,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.nova: ('flavor_list', 'server_list', 'tenant_absolute_limits',
'extension_supported', 'is_feature_available',),
api.glance: ('image_list_detailed',),
api.network: ('floating_ip_simple_associate_supported',
'floating_ip_supported',
'servers_update_addresses',),
api.neutron: ('floating_ip_simple_associate_supported',
'floating_ip_supported',),
api.network: ('servers_update_addresses',),
})
def test_index_with_instance_booted_from_volume(self):
volume_server = self.servers.first()
@ -271,9 +273,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.network.servers_update_addresses(IsA(http.HttpRequest), servers)
api.nova.tenant_absolute_limits(IsA(http.HttpRequest), reserved=True) \
.MultipleTimes().AndReturn(self.limits['absolute'])
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True)
api.network.floating_ip_simple_associate_supported(
api.neutron.floating_ip_simple_associate_supported(
IsA(http.HttpRequest)).MultipleTimes().AndReturn(True)
self.mox.ReplayAll()
@ -988,12 +990,12 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
"extension_supported",
'is_feature_available',
),
api.network: (
api.neutron: (
"server_security_groups",
"servers_update_addresses",
"floating_ip_simple_associate_supported",
"floating_ip_supported"
)
),
api.network: ('servers_update_addresses',),
})
def _get_instance_details(self, server, qs=None,
flavor_return=None, volumes_return=None,
@ -1024,11 +1026,11 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
else:
api.nova.flavor_get(IsA(http.HttpRequest), server.flavor['id']) \
.AndReturn(flavor_return)
api.network.server_security_groups(IsA(http.HttpRequest), server.id) \
api.neutron.server_security_groups(IsA(http.HttpRequest), server.id) \
.AndReturn(security_groups_return)
api.network.floating_ip_simple_associate_supported(
api.neutron.floating_ip_simple_associate_supported(
IsA(http.HttpRequest)).MultipleTimes().AndReturn(True)
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True)
api.nova.extension_supported('AdminActions', IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True)
@ -1400,9 +1402,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.nova: ('flavor_list', 'server_list', 'tenant_absolute_limits',
'extension_supported', 'is_feature_available',),
api.glance: ('image_list_detailed',),
api.network: ('floating_ip_simple_associate_supported',
'floating_ip_supported',
'servers_update_addresses',),
api.neutron: ('floating_ip_simple_associate_supported',
'floating_ip_supported',),
api.network: ('servers_update_addresses',),
})
def _test_instances_index_retrieve_password_action(self):
servers = self.servers.list()
@ -1423,9 +1425,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.network.servers_update_addresses(IsA(http.HttpRequest), servers)
api.nova.tenant_absolute_limits(IsA(http.HttpRequest), reserved=True) \
.MultipleTimes().AndReturn(self.limits['absolute'])
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True)
api.network.floating_ip_simple_associate_supported(
api.neutron.floating_ip_simple_associate_supported(
IsA(http.HttpRequest)).MultipleTimes().AndReturn(True)
self.mox.ReplayAll()
@ -1470,7 +1472,7 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
instance_update_get_stubs = {
api.nova: ('server_get',),
api.network: ('security_group_list',
api.neutron: ('security_group_list',
'server_security_groups',)}
@helpers.create_stubs(instance_update_get_stubs)
@ -1478,9 +1480,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
server = self.servers.first()
api.nova.server_get(IsA(http.HttpRequest), server.id).AndReturn(server)
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn([])
api.network.server_security_groups(IsA(http.HttpRequest),
api.neutron.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn([])
self.mox.ReplayAll()
@ -1517,7 +1519,7 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
instance_update_post_stubs = {
api.nova: ('server_get', 'server_update'),
api.network: ('security_group_list',
api.neutron: ('security_group_list',
'server_security_groups',
'server_update_security_groups')}
@ -1530,15 +1532,15 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
wanted_groups = [secgroups[1].id, secgroups[2].id]
api.nova.server_get(IsA(http.HttpRequest), server.id).AndReturn(server)
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(secgroups)
api.network.server_security_groups(IsA(http.HttpRequest),
api.neutron.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn(server_groups)
api.nova.server_update(IsA(http.HttpRequest),
server.id,
server.name).AndReturn(server)
api.network.server_update_security_groups(IsA(http.HttpRequest),
api.neutron.server_update_security_groups(IsA(http.HttpRequest),
server.id,
wanted_groups)
@ -1553,14 +1555,14 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
server = self.servers.first()
api.nova.server_get(IsA(http.HttpRequest), server.id).AndReturn(server)
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn([])
api.network.server_security_groups(IsA(http.HttpRequest),
api.neutron.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn([])
api.nova.server_update(IsA(http.HttpRequest), server.id, server.name) \
.AndRaise(self.exceptions.nova)
api.network.server_update_security_groups(
api.neutron.server_update_security_groups(
IsA(http.HttpRequest), server.id, [])
self.mox.ReplayAll()
@ -1573,15 +1575,15 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
server = self.servers.first()
api.nova.server_get(IsA(http.HttpRequest), server.id).AndReturn(server)
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn([])
api.network.server_security_groups(IsA(http.HttpRequest),
api.neutron.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn([])
api.nova.server_update(IsA(http.HttpRequest),
server.id,
server.name).AndReturn(server)
api.network.server_update_security_groups(
api.neutron.server_update_security_groups(
IsA(http.HttpRequest),
server.id, []).AndRaise(self.exceptions.nova)
@ -1596,11 +1598,11 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
'keypair_list',
'server_group_list',
'availability_zone_list',),
api.network: ('security_group_list',),
cinder: ('volume_snapshot_list',
'volume_list',),
api.neutron: ('network_list',
'port_list'),
'port_list',
'security_group_list',),
api.glance: ('image_list_detailed',),
quotas: ('tenant_limit_usages',)})
def test_launch_instance_get(self,
@ -1832,11 +1834,11 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
'keypair_list',
'server_group_list',
'availability_zone_list',),
api.network: ('security_group_list',),
cinder: ('volume_snapshot_list',
'volume_list',),
api.neutron: ('network_list',
'port_list'),
'port_list',
'security_group_list',),
api.glance: ('image_list_detailed',),
quotas: ('tenant_limit_usages',)})
def test_launch_instance_get_bootable_volumes(self,
@ -1925,7 +1927,8 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
@helpers.create_stubs({api.glance: ('image_list_detailed',),
api.neutron: ('network_list',
'port_create',
'port_list'),
'port_list',
'security_group_list',),
api.nova: ('extension_supported',
'is_feature_available',
'flavor_list',
@ -1933,7 +1936,6 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
'availability_zone_list',
'server_group_list',
'server_create',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
quotas: ('tenant_quota_usages',)})
@ -2037,14 +2039,14 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
@helpers.create_stubs({api.glance: ('image_list_detailed',),
api.neutron: ('network_list',
'port_create',
'port_list'),
'port_list',
'security_group_list',),
api.nova: ('extension_supported',
'is_feature_available',
'flavor_list',
'keypair_list',
'availability_zone_list',
'server_create',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
quotas: ('tenant_quota_usages',)})
@ -2159,7 +2161,8 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
@helpers.create_stubs({api.glance: ('image_list_detailed',),
api.neutron: ('network_list',
'port_create',
'port_list'),
'port_list',
'security_group_list',),
api.nova: ('server_create',
'extension_supported',
'is_feature_available',
@ -2167,7 +2170,6 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
'keypair_list',
'availability_zone_list',
'server_group_list',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
quotas: ('tenant_quota_usages',)})
@ -2260,13 +2262,13 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
@helpers.create_stubs({api.glance: ('image_list_detailed',),
api.neutron: ('network_list',
'port_list'),
'port_list',
'security_group_list',),
api.nova: ('extension_supported',
'is_feature_available',
'flavor_list',
'keypair_list',
'availability_zone_list'),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
quotas: ('tenant_quota_usages',
@ -2337,7 +2339,8 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.glance: ('image_list_detailed',),
api.neutron: ('network_list',
'port_create',
'port_list'),
'port_list',
'security_group_list',),
api.nova: ('extension_supported',
'is_feature_available',
'flavor_list',
@ -2345,7 +2348,6 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
'availability_zone_list',
'server_group_list',
'server_create',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
quotas: ('tenant_quota_usages',)})
@ -2463,7 +2465,8 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.glance: ('image_list_detailed',),
api.neutron: ('network_list',
'port_create',
'port_list'),
'port_list',
'security_group_list',),
api.nova: ('extension_supported',
'is_feature_available',
'flavor_list',
@ -2471,7 +2474,6 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
'availability_zone_list',
'server_create',
'tenant_absolute_limits'),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',
'tenant_absolute_limits'),
@ -2532,10 +2534,10 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
@helpers.create_stubs({api.glance: ('image_list_detailed',),
api.neutron: ('network_list',
'port_list'),
'port_list',
'security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
api.network: ('security_group_list',),
api.nova: ('extension_supported',
'is_feature_available',
'flavor_list',
@ -2571,7 +2573,7 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
.AndRaise(self.exceptions.nova)
api.nova.keypair_list(IsA(http.HttpRequest)) \
.AndReturn(self.keypairs.list())
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())
@ -2591,7 +2593,8 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.neutron: ('network_list',
'port_create',
'port_delete',
'port_list'),
'port_list',
'security_group_list',),
api.nova: ('extension_supported',
'is_feature_available',
'flavor_list',
@ -2599,7 +2602,6 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
'availability_zone_list',
'server_group_list',
'server_create',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
quotas: ('tenant_quota_usages',)})
@ -2629,7 +2631,7 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
.AndReturn(volumes)
api.nova.flavor_list(IgnoreArg()).AndReturn(self.flavors.list())
api.nova.keypair_list(IgnoreArg()).AndReturn(self.keypairs.list())
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())
@ -2699,13 +2701,13 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
@helpers.create_stubs({api.glance: ('image_list_detailed',),
api.neutron: ('network_list',
'port_list'),
'port_list',
'security_group_list',),
api.nova: ('extension_supported',
'is_feature_available',
'flavor_list',
'keypair_list',
'availability_zone_list',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
quotas: ('tenant_limit_usages',
@ -2778,14 +2780,14 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
@helpers.create_stubs({api.glance: ('image_list_detailed',),
api.neutron: ('network_list',
'port_list'),
'port_list',
'security_group_list',),
api.nova: ('extension_supported',
'is_feature_available',
'flavor_list',
'keypair_list',
'server_group_list',
'availability_zone_list',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
quotas: ('tenant_quota_usages',
@ -2884,13 +2886,13 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
@helpers.create_stubs({api.glance: ('image_list_detailed',),
api.neutron: ('network_list',
'port_list'),
'port_list',
'security_group_list',),
api.nova: ('extension_supported',
'is_feature_available',
'flavor_list',
'keypair_list',
'availability_zone_list',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
quotas: ('tenant_quota_usages',
@ -2982,13 +2984,13 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
@helpers.create_stubs({api.glance: ('image_list_detailed',),
api.neutron: ('network_list',
'port_list'),
'port_list',
'security_group_list',),
api.nova: ('extension_supported',
'is_feature_available',
'flavor_list',
'keypair_list',
'availability_zone_list',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
quotas: ('tenant_quota_usages',
@ -3013,7 +3015,7 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
IsA(http.HttpRequest)).AndReturn(self.flavors.list())
api.nova.keypair_list(
IsA(http.HttpRequest)).AndReturn(self.keypairs.list())
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(self.security_groups.list())
api.nova.availability_zone_list(
IsA(http.HttpRequest)).AndReturn(self.availability_zones.list())
@ -3126,14 +3128,14 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
@helpers.create_stubs({api.glance: ('image_list_detailed',),
api.neutron: ('network_list',
'port_list'),
'port_list',
'security_group_list',),
api.nova: ('extension_supported',
'is_feature_available',
'flavor_list',
'keypair_list',
'server_group_list',
'availability_zone_list',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
quotas: ('tenant_quota_usages',
@ -3160,7 +3162,7 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
.AndReturn(True)
api.nova.keypair_list(IsA(http.HttpRequest)) \
.AndReturn(self.keypairs.list())
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())
@ -3244,9 +3246,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.nova: ('flavor_list', 'server_list', 'tenant_absolute_limits',
'extension_supported', 'is_feature_available',),
api.glance: ('image_list_detailed',),
api.network: ('floating_ip_simple_associate_supported',
'floating_ip_supported',
'servers_update_addresses',),
api.neutron: ('floating_ip_simple_associate_supported',
'floating_ip_supported',),
api.network: ('servers_update_addresses',),
})
def test_launch_button_attributes(self):
servers = self.servers.list()
@ -3270,9 +3272,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.network.servers_update_addresses(IsA(http.HttpRequest), servers)
api.nova.tenant_absolute_limits(IsA(http.HttpRequest), reserved=True) \
.MultipleTimes().AndReturn(limits)
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True)
api.network.floating_ip_simple_associate_supported(
api.neutron.floating_ip_simple_associate_supported(
IsA(http.HttpRequest)).MultipleTimes().AndReturn(True)
self.mox.ReplayAll()
@ -3293,9 +3295,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.nova: ('flavor_list', 'server_list', 'tenant_absolute_limits',
'extension_supported', 'is_feature_available',),
api.glance: ('image_list_detailed',),
api.network: ('floating_ip_simple_associate_supported',
'floating_ip_supported',
'servers_update_addresses',),
api.neutron: ('floating_ip_simple_associate_supported',
'floating_ip_supported',),
api.network: ('servers_update_addresses',),
})
def test_launch_button_disabled_when_quota_exceeded(self):
servers = self.servers.list()
@ -3319,9 +3321,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.network.servers_update_addresses(IsA(http.HttpRequest), servers)
api.nova.tenant_absolute_limits(IsA(http.HttpRequest), reserved=True) \
.MultipleTimes().AndReturn(limits)
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True)
api.network.floating_ip_simple_associate_supported(
api.neutron.floating_ip_simple_associate_supported(
IsA(http.HttpRequest)).MultipleTimes().AndReturn(True)
self.mox.ReplayAll()
@ -3339,7 +3341,8 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
@helpers.create_stubs({api.glance: ('image_list_detailed',),
api.neutron: ('network_list',
'port_list'),
'port_list',
'security_group_list',),
api.nova: ('extension_supported',
'is_feature_available',
'flavor_list',
@ -3348,7 +3351,6 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
'server_group_list',
'tenant_absolute_limits',
'server_create',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
quotas: ('tenant_quota_usages',)})
@ -3445,9 +3447,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.nova: ('flavor_list', 'server_list', 'tenant_absolute_limits',
'extension_supported', 'is_feature_available',),
api.glance: ('image_list_detailed',),
api.network: ('floating_ip_simple_associate_supported',
'floating_ip_supported',
'servers_update_addresses',),
api.neutron: ('floating_ip_simple_associate_supported',
'floating_ip_supported',),
api.network: ('servers_update_addresses',),
})
def test_index_options_after_migrate(self):
servers = self.servers.list()
@ -3471,9 +3473,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.network.servers_update_addresses(IsA(http.HttpRequest), servers)
api.nova.tenant_absolute_limits(IsA(http.HttpRequest), reserved=True) \
.MultipleTimes().AndReturn(self.limits['absolute'])
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True)
api.network.floating_ip_simple_associate_supported(
api.neutron.floating_ip_simple_associate_supported(
IsA(http.HttpRequest)).MultipleTimes().AndReturn(True)
self.mox.ReplayAll()
@ -3487,11 +3489,11 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
'flavor_list',
'keypair_list',
'availability_zone_list'),
api.network: ('security_group_list',),
cinder: ('volume_snapshot_list',
'volume_list',),
api.neutron: ('network_list',
'port_list'),
'port_list',
'security_group_list',),
api.glance: ('image_list_detailed',),
quotas: ('tenant_limit_usages',)})
def test_select_default_keypair_if_only_one(self):
@ -3537,13 +3539,15 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
def test_select_default_keypair_if_only_one_glance_v1(self):
self.test_select_default_keypair_if_only_one()
@helpers.create_stubs({api.network: ('floating_ip_target_list_by_instance',
'tenant_floating_ip_list',
'floating_ip_disassociate',
'servers_update_addresses',),
api.glance: ('image_list_detailed',),
api.nova: ('server_list',
'flavor_list')})
@helpers.create_stubs({
api.neutron: ('floating_ip_target_list_by_instance',
'tenant_floating_ip_list',
'floating_ip_disassociate',),
api.network: ('servers_update_addresses',),
api.glance: ('image_list_detailed',),
api.nova: ('server_list',
'flavor_list'),
})
def test_disassociate_floating_ip(self):
servers = self.servers.list()
server = servers[0]
@ -3557,12 +3561,12 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.nova.flavor_list(IgnoreArg()).AndReturn(self.flavors.list())
api.glance.image_list_detailed(IgnoreArg()) \
.AndReturn((self.images.list(), False, False))
api.network.floating_ip_target_list_by_instance(
api.neutron.floating_ip_target_list_by_instance(
IsA(http.HttpRequest),
server.id).AndReturn([server.id, ])
api.network.tenant_floating_ip_list(
api.neutron.tenant_floating_ip_list(
IsA(http.HttpRequest)).AndReturn([fip])
api.network.floating_ip_disassociate(
api.neutron.floating_ip_disassociate(
IsA(http.HttpRequest), fip.id)
self.mox.ReplayAll()
@ -3911,9 +3915,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.nova: ('flavor_list', 'server_list', 'tenant_absolute_limits',
'extension_supported', 'is_feature_available',),
api.glance: ('image_list_detailed',),
api.network: ('floating_ip_simple_associate_supported',
'floating_ip_supported',
'servers_update_addresses',),
api.neutron: ('floating_ip_simple_associate_supported',
'floating_ip_supported',),
api.network: ('servers_update_addresses',),
})
def test_index_form_action_with_pagination(self):
"""The form action on the next page should have marker
@ -3948,9 +3952,9 @@ class InstanceTests(helpers.ResetImageAPIVersionMixin, helpers.TestCase):
api.nova.tenant_absolute_limits(IsA(http.HttpRequest), reserved=True) \
.MultipleTimes().AndReturn(self.limits['absolute'])
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True)
api.network.floating_ip_simple_associate_supported(
api.neutron.floating_ip_simple_associate_supported(
IsA(http.HttpRequest)).MultipleTimes().AndReturn(True)
self.mox.ReplayAll()
@ -4067,8 +4071,7 @@ class InstanceAjaxTests(helpers.TestCase):
"flavor_get",
"extension_supported",
"is_feature_available"),
api.network: ('servers_update_addresses',),
api.neutron: ("is_extension_supported",)})
api.network: ('servers_update_addresses',)})
def test_row_update(self):
server = self.servers.first()
instance_id = server.id
@ -4083,13 +4086,12 @@ class InstanceAjaxTests(helpers.TestCase):
).MultipleTimes().AndReturn(True)
api.nova.extension_supported('Shelve', IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True)
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'security-group')\
.MultipleTimes().AndReturn(True)
api.nova.server_get(IsA(http.HttpRequest), instance_id)\
.AndReturn(server)
api.nova.flavor_get(IsA(http.HttpRequest), flavor_id)\
.AndReturn(full_flavors[flavor_id])
api.network.servers_update_addresses(IsA(http.HttpRequest), server) \
.AndReturn(None)
self.mox.ReplayAll()
@ -4105,7 +4107,6 @@ class InstanceAjaxTests(helpers.TestCase):
"flavor_get",
'is_feature_available',
"extension_supported"),
api.neutron: ("is_extension_supported",),
api.network: ('servers_update_addresses',)})
def test_row_update_instance_error(self):
server = self.servers.first()
@ -4132,13 +4133,12 @@ class InstanceAjaxTests(helpers.TestCase):
).MultipleTimes().AndReturn(True)
api.nova.extension_supported('Shelve', IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True)
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'security-group')\
.MultipleTimes().AndReturn(True)
api.nova.server_get(IsA(http.HttpRequest), instance_id)\
.AndReturn(server)
api.nova.flavor_get(IsA(http.HttpRequest), flavor_id)\
.AndReturn(full_flavors[flavor_id])
api.network.servers_update_addresses(IsA(http.HttpRequest), server) \
.AndReturn(None)
self.mox.ReplayAll()
@ -4164,8 +4164,7 @@ class InstanceAjaxTests(helpers.TestCase):
"flavor_get",
'is_feature_available',
"extension_supported"),
api.neutron: ("is_extension_supported",
"servers_update_addresses",)})
api.network: ('servers_update_addresses',)})
def test_row_update_flavor_not_found(self):
server = self.servers.first()
instance_id = server.id
@ -4178,13 +4177,12 @@ class InstanceAjaxTests(helpers.TestCase):
).MultipleTimes().AndReturn(True)
api.nova.extension_supported('Shelve', IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(True)
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'security-group')\
.MultipleTimes().AndReturn(True)
api.nova.server_get(IsA(http.HttpRequest), instance_id)\
.AndReturn(server)
api.nova.flavor_get(IsA(http.HttpRequest), server.flavor["id"])\
.AndRaise(self.exceptions.nova)
api.network.servers_update_addresses(IsA(http.HttpRequest), server) \
.AndReturn(None)
self.mox.ReplayAll()

View File

@ -375,7 +375,7 @@ class DetailView(tabs.TabView):
exceptions.handle(self.request, msg, ignore=True)
try:
instance.security_groups = api.network.server_security_groups(
instance.security_groups = api.neutron.server_security_groups(
self.request, instance_id)
except Exception:
msg = _('Unable to retrieve security groups for instance '

View File

@ -580,7 +580,7 @@ class SetAccessControlsAction(workflows.Action):
def populate_groups_choices(self, request, context):
try:
groups = api.network.security_group_list(request)
groups = api.neutron.security_group_list(request)
security_group_list = [(sg.id, sg.name) for sg in groups]
except Exception:
exceptions.handle(request,

View File

@ -48,14 +48,14 @@ class UpdateInstanceSecurityGroupsAction(workflows.MembershipAction):
# Get list of available security groups
all_groups = []
try:
all_groups = api.network.security_group_list(request)
all_groups = api.neutron.security_group_list(request)
except Exception:
exceptions.handle(request, err_msg)
groups_list = [(group.id, group.name) for group in all_groups]
instance_groups = []
try:
instance_groups = api.network.server_security_groups(request,
instance_groups = api.neutron.server_security_groups(request,
instance_id)
except Exception:
exceptions.handle(request, err_msg)
@ -69,7 +69,7 @@ class UpdateInstanceSecurityGroupsAction(workflows.MembershipAction):
instance_id = data['instance_id']
wanted_groups = map(filters.get_int_or_uuid, data['wanted_groups'])
try:
api.network.server_update_security_groups(request, instance_id,
api.neutron.server_update_security_groups(request, instance_id,
wanted_groups)
except Exception as e:
exceptions.handle(request, str(e))

View File

@ -67,20 +67,20 @@ class UsageViewTests(test.TestCase):
api.cinder.tenant_absolute_limits(IsA(http.HttpRequest)) \
.AndReturn(self.cinder_limits['absolute'])
@test.create_stubs({api.neutron: ('is_extension_supported',),
api.network: ('floating_ip_supported',
@test.create_stubs({api.neutron: ('is_extension_supported',
'floating_ip_supported',
'tenant_floating_ip_list',
'security_group_list')})
def _stub_neutron_api_calls(self, neutron_sg_enabled=True):
api.neutron.is_extension_supported(
IsA(http.HttpRequest),
'security-group').AndReturn(neutron_sg_enabled)
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.AndReturn(True)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
if neutron_sg_enabled:
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
def _nova_stu_enabled(self, exception=False, overview_days_range=1):
@ -248,8 +248,8 @@ class UsageViewTests(test.TestCase):
self._test_usage_with_neutron(neutron_fip_enabled=False)
@test.create_stubs({api.neutron: ('tenant_quota_get',
'is_extension_supported'),
api.network: ('floating_ip_supported',
'is_extension_supported',
'floating_ip_supported',
'tenant_floating_ip_list',
'security_group_list')})
def _test_usage_with_neutron_prepare(self):
@ -264,13 +264,13 @@ class UsageViewTests(test.TestCase):
api.neutron.is_extension_supported(
IsA(http.HttpRequest),
'security-group').AndReturn(neutron_sg_enabled)
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.AndReturn(neutron_fip_enabled)
if neutron_fip_enabled:
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
if neutron_sg_enabled:
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.neutron.tenant_quota_get(IsA(http.HttpRequest), self.tenant.id) \
.AndReturn(self.neutron_quotas.first())

View File

@ -78,7 +78,7 @@ class CreateGroup(GroupBase):
error_message = _('Unable to create security group: %s')
def _call_network_api(self, request, data):
return api.network.security_group_create(request,
return api.neutron.security_group_create(request,
data['name'],
data['description'])
@ -90,7 +90,7 @@ class UpdateGroup(GroupBase):
id = forms.CharField(widget=forms.HiddenInput())
def _call_network_api(self, request, data):
return api.network.security_group_update(request,
return api.neutron.security_group_update(request,
data['id'],
data['name'],
data['description'])
@ -415,7 +415,7 @@ class AddRule(forms.SelfHandlingForm):
redirect = reverse("horizon:project:security_groups:detail",
args=[data['id']])
try:
rule = api.network.security_group_rule_create(
rule = api.neutron.security_group_rule_create(
request,
filters.get_int_or_uuid(data['id']),
data['direction'],

View File

@ -52,7 +52,7 @@ class DeleteGroup(policy.PolicyTargetMixin, tables.DeleteAction):
return security_group.name != 'default'
def delete(self, request, obj_id):
api.network.security_group_delete(request, obj_id)
api.neutron.security_group_delete(request, obj_id)
class CreateGroup(tables.LinkAction):
@ -149,7 +149,7 @@ class DeleteRule(tables.DeleteAction):
)
def delete(self, request, obj_id):
api.network.security_group_rule_delete(request, obj_id)
api.neutron.security_group_rule_delete(request, obj_id)
def get_success_url(self, request):
sg_id = self.table.kwargs['security_group_id']

View File

@ -65,14 +65,14 @@ class SecurityGroupsViewTests(test.TestCase):
self.edit_url = reverse(SG_ADD_RULE_VIEW, args=[sec_group.id])
self.update_url = reverse(SG_UPDATE_VIEW, args=[sec_group.id])
@test.create_stubs({api.network: ('security_group_list',),
@test.create_stubs({api.neutron: ('security_group_list',),
quotas: ('tenant_quota_usages',)})
def test_index(self):
sec_groups = self.security_groups.list()
quota_data = self.quota_usages.first()
quota_data['security_groups']['available'] = 10
api.network.security_group_list(IsA(http.HttpRequest)) \
api.neutron.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(sec_groups)
quotas.tenant_quota_usages(IsA(http.HttpRequest)).MultipleTimes() \
.AndReturn(quota_data)
@ -96,14 +96,14 @@ class SecurityGroupsViewTests(test.TestCase):
all([sec_groups_from_ctx[i].name <= sec_groups_from_ctx[i + 1].name
for i in range(len(sec_groups_from_ctx) - 1)]))
@test.create_stubs({api.network: ('security_group_list',),
@test.create_stubs({api.neutron: ('security_group_list',),
quotas: ('tenant_quota_usages',)})
def test_create_button_attributes(self):
sec_groups = self.security_groups.list()
quota_data = self.quota_usages.first()
quota_data['security_groups']['available'] = 10
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)) \
.AndReturn(sec_groups)
quotas.tenant_quota_usages(
@ -128,7 +128,7 @@ class SecurityGroupsViewTests(test.TestCase):
url = 'horizon:project:security_groups:create'
self.assertEqual(url, create_action.url)
@test.create_stubs({api.network: ('security_group_list',),
@test.create_stubs({api.neutron: ('security_group_list',),
quotas: ('tenant_quota_usages',)})
def _test_create_button_disabled_when_quota_exceeded(self,
network_enabled):
@ -136,7 +136,7 @@ class SecurityGroupsViewTests(test.TestCase):
quota_data = self.quota_usages.first()
quota_data['security_groups']['available'] = 0
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)) \
.AndReturn(sec_groups)
quotas.tenant_quota_usages(
@ -161,14 +161,14 @@ class SecurityGroupsViewTests(test.TestCase):
def test_create_button_disabled_when_quota_exceeded_neutron_enabled(self):
self._test_create_button_disabled_when_quota_exceeded(True)
@test.create_stubs({api.network: ('security_group_rule_create',
@test.create_stubs({api.neutron: ('security_group_rule_create',
'security_group_list')})
def _add_security_group_rule_fixture(self, **kwargs):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_rule_create(
api.neutron.security_group_rule_create(
IsA(http.HttpRequest),
kwargs.get('sec_group', sec_group.id),
kwargs.get('ingress', 'ingress'),
@ -178,14 +178,14 @@ class SecurityGroupsViewTests(test.TestCase):
kwargs.get('to_port', int(rule.to_port)),
kwargs.get('cidr', rule.ip_range['cidr']),
kwargs.get('security_group', u'%s' % sec_group.id)).AndReturn(rule)
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
return sec_group, rule
@test.create_stubs({api.network: ('security_group_get',)})
@test.create_stubs({api.neutron: ('security_group_get',)})
def test_update_security_groups_get(self):
sec_group = self.security_groups.first()
api.network.security_group_get(IsA(http.HttpRequest),
api.neutron.security_group_get(IsA(http.HttpRequest),
sec_group.id).AndReturn(sec_group)
self.mox.ReplayAll()
res = self.client.get(self.update_url)
@ -193,7 +193,7 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertEqual(res.context['security_group'].name,
sec_group.name)
@test.create_stubs({api.network: ('security_group_update',
@test.create_stubs({api.neutron: ('security_group_update',
'security_group_get')})
def test_update_security_groups_post(self):
"""Ensure that we can change a group name.
@ -204,12 +204,12 @@ class SecurityGroupsViewTests(test.TestCase):
"""
sec_group = self.security_groups.first()
sec_group.name = "@new name"
api.network.security_group_update(
api.neutron.security_group_update(
IsA(http.HttpRequest),
str(sec_group.id),
sec_group.name,
sec_group.description).AndReturn(sec_group)
api.network.security_group_get(
api.neutron.security_group_get(
IsA(http.HttpRequest), sec_group.id).AndReturn(sec_group)
self.mox.ReplayAll()
form_data = {'method': 'UpdateGroup',
@ -238,9 +238,9 @@ class SecurityGroupsViewTests(test.TestCase):
sec_group.name = '@group name-\xe3\x82\xb3'
self._create_security_group(sec_group)
@test.create_stubs({api.network: ('security_group_create',)})
@test.create_stubs({api.neutron: ('security_group_create',)})
def _create_security_group(self, sec_group):
api.network.security_group_create(
api.neutron.security_group_create(
IsA(http.HttpRequest),
sec_group.name,
sec_group.description).AndReturn(sec_group)
@ -252,10 +252,10 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.post(SG_CREATE_URL, form_data)
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api.network: ('security_group_create',)})
@test.create_stubs({api.neutron: ('security_group_create',)})
def test_create_security_groups_post_exception(self):
sec_group = self.security_groups.first()
api.network.security_group_create(
api.neutron.security_group_create(
IsA(http.HttpRequest),
sec_group.name,
sec_group.description).AndRaise(self.exceptions.nova)
@ -268,21 +268,21 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertMessageCount(error=1)
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api.network: ('security_group_get',)})
@test.create_stubs({api.neutron: ('security_group_get',)})
def test_detail_get(self):
sec_group = self.security_groups.first()
api.network.security_group_get(IsA(http.HttpRequest),
api.neutron.security_group_get(IsA(http.HttpRequest),
sec_group.id).AndReturn(sec_group)
self.mox.ReplayAll()
res = self.client.get(self.detail_url)
self.assertTemplateUsed(res, SG_DETAIL_TEMPLATE)
@test.create_stubs({api.network: ('security_group_get',)})
@test.create_stubs({api.neutron: ('security_group_get',)})
def test_detail_get_exception(self):
sec_group = self.security_groups.first()
api.network.security_group_get(
api.neutron.security_group_get(
IsA(http.HttpRequest),
sec_group.id).AndRaise(self.exceptions.nova)
self.mox.ReplayAll()
@ -371,14 +371,14 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_create',
@test.create_stubs({api.neutron: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_cidr_with_template(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_rule_create(IsA(http.HttpRequest),
api.neutron.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id,
'ingress', 'IPv4',
rule.ip_protocol,
@ -386,7 +386,7 @@ class SecurityGroupsViewTests(test.TestCase):
int(rule.to_port),
rule.ip_range['cidr'],
None).AndReturn(rule)
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -405,14 +405,14 @@ class SecurityGroupsViewTests(test.TestCase):
return rule
raise Exception("No matches found.")
@test.create_stubs({api.network: ('security_group_rule_create',
@test.create_stubs({api.neutron: ('security_group_rule_create',
'security_group_list',)})
def test_detail_add_rule_self_as_source_group(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self._get_source_group_rule()
api.network.security_group_rule_create(
api.neutron.security_group_rule_create(
IsA(http.HttpRequest),
sec_group.id,
'ingress',
@ -423,7 +423,7 @@ class SecurityGroupsViewTests(test.TestCase):
int(rule.to_port),
None,
u'%s' % sec_group.id).AndReturn(rule)
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -438,14 +438,14 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_create',
@test.create_stubs({api.neutron: ('security_group_rule_create',
'security_group_list',)})
def test_detail_add_rule_self_as_source_group_with_template(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self._get_source_group_rule()
api.network.security_group_rule_create(
api.neutron.security_group_rule_create(
IsA(http.HttpRequest),
sec_group.id,
'ingress',
@ -456,7 +456,7 @@ class SecurityGroupsViewTests(test.TestCase):
int(rule.to_port),
None,
u'%s' % sec_group.id).AndReturn(rule)
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -470,16 +470,16 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_list',)})
@test.create_stubs({api.neutron: ('security_group_list',)})
def test_detail_invalid_port(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
if django.VERSION >= (1, 9):
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -495,18 +495,18 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertNoMessages()
self.assertContains(res, "The specified port is invalid")
@test.create_stubs({api.network: ('security_group_list',)})
@test.create_stubs({api.neutron: ('security_group_list',)})
def test_detail_invalid_port_range(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
for i in range(3):
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
if django.VERSION >= (1, 9):
for i in range(3):
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -549,7 +549,7 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertContains(res, cgi.escape('"to" port number is invalid',
quote=True))
@test.create_stubs({api.network: ('security_group_get',
@test.create_stubs({api.neutron: ('security_group_get',
'security_group_list')})
def test_detail_invalid_icmp_rule(self):
sec_group = self.security_groups.first()
@ -562,7 +562,7 @@ class SecurityGroupsViewTests(test.TestCase):
call_post *= 2
for i in range(call_post):
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -628,14 +628,14 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertContains(
res, "ICMP code is provided but ICMP type is missing.")
@test.create_stubs({api.network: ('security_group_rule_create',
@test.create_stubs({api.neutron: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_exception(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_rule_create(
api.neutron.security_group_rule_create(
IsA(http.HttpRequest),
sec_group.id, 'ingress', 'IPv4',
rule.ip_protocol,
@ -643,7 +643,7 @@ class SecurityGroupsViewTests(test.TestCase):
int(rule.to_port),
rule.ip_range['cidr'],
None).AndRaise(self.exceptions.nova)
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -657,14 +657,14 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_create',
@test.create_stubs({api.neutron: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_duplicated(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_rule_create(
api.neutron.security_group_rule_create(
IsA(http.HttpRequest),
sec_group.id, 'ingress', 'IPv4',
rule.ip_protocol,
@ -672,7 +672,7 @@ class SecurityGroupsViewTests(test.TestCase):
int(rule.to_port),
rule.ip_range['cidr'],
None).AndRaise(exceptions.Conflict)
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -687,12 +687,12 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_delete',)})
@test.create_stubs({api.neutron: ('security_group_rule_delete',)})
def test_detail_delete_rule(self):
sec_group = self.security_groups.first()
rule = self.security_group_rules.first()
api.network.security_group_rule_delete(IsA(http.HttpRequest), rule.id)
api.neutron.security_group_rule_delete(IsA(http.HttpRequest), rule.id)
self.mox.ReplayAll()
form_data = {"action": "rules__delete__%s" % rule.id}
@ -703,12 +703,12 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertEqual(strip_absolute_base(handled['location']),
self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_delete',)})
@test.create_stubs({api.neutron: ('security_group_rule_delete',)})
def test_detail_delete_rule_exception(self):
sec_group = self.security_groups.first()
rule = self.security_group_rules.first()
api.network.security_group_rule_delete(
api.neutron.security_group_rule_delete(
IsA(http.HttpRequest),
rule.id).AndRaise(self.exceptions.nova)
self.mox.ReplayAll()
@ -722,11 +722,11 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertEqual(strip_absolute_base(handled['location']),
self.detail_url)
@test.create_stubs({api.network: ('security_group_delete',)})
@test.create_stubs({api.neutron: ('security_group_delete',)})
def test_delete_group(self):
sec_group = self.security_groups.get(name="other_group")
api.network.security_group_delete(IsA(http.HttpRequest), sec_group.id)
api.neutron.security_group_delete(IsA(http.HttpRequest), sec_group.id)
self.mox.ReplayAll()
form_data = {"action": "security_groups__delete__%s" % sec_group.id}
@ -736,11 +736,11 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertEqual(strip_absolute_base(handled['location']),
INDEX_URL)
@test.create_stubs({api.network: ('security_group_delete',)})
@test.create_stubs({api.neutron: ('security_group_delete',)})
def test_delete_group_exception(self):
sec_group = self.security_groups.get(name="other_group")
api.network.security_group_delete(
api.neutron.security_group_delete(
IsA(http.HttpRequest),
sec_group.id).AndRaise(self.exceptions.nova)
@ -754,18 +754,18 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertEqual(strip_absolute_base(handled['location']),
INDEX_URL)
@test.create_stubs({api.network: ('security_group_rule_create',
@test.create_stubs({api.neutron: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_custom_protocol(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_rule_create(IsA(http.HttpRequest),
api.neutron.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id, 'ingress', 'IPv6',
37, None, None, 'fe80::/48',
None).AndReturn(rule)
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -780,18 +780,18 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_create',
@test.create_stubs({api.neutron: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_egress(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_rule_create(IsA(http.HttpRequest),
api.neutron.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id, 'egress', 'IPv4',
'udp', 80, 80, '10.1.1.0/24',
None).AndReturn(rule)
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -806,21 +806,21 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_create',
@test.create_stubs({api.neutron: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_egress_with_all_tcp(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.list()[3]
api.network.security_group_rule_create(IsA(http.HttpRequest),
api.neutron.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id, 'egress', 'IPv4',
rule.ip_protocol,
int(rule.from_port),
int(rule.to_port),
rule.ip_range['cidr'],
None).AndReturn(rule)
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -834,14 +834,14 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_create',
@test.create_stubs({api.neutron: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_source_group_with_direction_ethertype(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self._get_source_group_rule()
api.network.security_group_rule_create(
api.neutron.security_group_rule_create(
IsA(http.HttpRequest),
sec_group.id,
'egress',
@ -852,7 +852,7 @@ class SecurityGroupsViewTests(test.TestCase):
int(rule.to_port),
None,
u'%s' % sec_group.id).AndReturn(rule)
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -871,7 +871,7 @@ class SecurityGroupsViewTests(test.TestCase):
@test.update_settings(
OPENSTACK_NEUTRON_NETWORK={'enable_ipv6': False})
@test.create_stubs({api.network: ('security_group_rule_create',
@test.create_stubs({api.neutron: ('security_group_rule_create',
'security_group_list')})
def test_add_rule_ethertype_with_ipv6_disabled(self):
@ -894,7 +894,7 @@ class SecurityGroupsViewTests(test.TestCase):
@test.update_settings(
OPENSTACK_NEUTRON_NETWORK={'enable_ipv6': False})
@test.create_stubs({api.network: ('security_group_list',)})
@test.create_stubs({api.neutron: ('security_group_list',)})
def test_add_rule_cidr_with_ipv6_disabled(self):
sec_group = self.security_groups.first()
@ -913,16 +913,16 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertFormError(res, 'form', 'cidr',
'Invalid version for IP address')
@test.create_stubs({api.network: ('security_group_list',)})
@test.create_stubs({api.neutron: ('security_group_list',)})
def test_detail_add_rule_invalid_port(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
if django.VERSION >= (1, 9):
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -938,21 +938,21 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertNoMessages()
self.assertContains(res, "Not a valid port number")
@test.create_stubs({api.network: ('security_group_rule_create',
@test.create_stubs({api.neutron: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_ingress_tcp_without_port(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.list()[3]
api.network.security_group_rule_create(IsA(http.HttpRequest),
api.neutron.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id, 'ingress', 'IPv4',
'tcp',
None,
None,
rule.ip_range['cidr'],
None).AndReturn(rule)
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -965,21 +965,21 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_create',
@test.create_stubs({api.neutron: ('security_group_rule_create',
'security_group_list')})
def test_detail_add_rule_custom_without_protocol(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.list()[3]
api.network.security_group_rule_create(IsA(http.HttpRequest),
api.neutron.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id, 'ingress', 'IPv4',
None,
None,
None,
rule.ip_range['cidr'],
None).AndReturn(rule)
api.network.security_group_list(
api.neutron.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()

View File

@ -48,7 +48,7 @@ class DetailView(tables.DataTableView):
def _get_data(self):
sg_id = filters.get_int_or_uuid(self.kwargs['security_group_id'])
try:
return api.network.security_group_get(self.request, sg_id)
return api.neutron.security_group_get(self.request, sg_id)
except Exception:
redirect = reverse('horizon:project:security_groups:index')
exceptions.handle(self.request,
@ -82,7 +82,7 @@ class UpdateView(forms.ModalFormView):
def get_object(self):
sg_id = filters.get_int_or_uuid(self.kwargs['security_group_id'])
try:
return api.network.security_group_get(self.request, sg_id)
return api.neutron.security_group_get(self.request, sg_id)
except Exception:
msg = _('Unable to retrieve security group.')
url = reverse('horizon:project:security_groups:index')
@ -131,7 +131,7 @@ class AddRuleView(forms.ModalFormView):
kwargs = super(AddRuleView, self).get_form_kwargs()
try:
groups = api.network.security_group_list(self.request)
groups = api.neutron.security_group_list(self.request)
except Exception:
groups = []
exceptions.handle(self.request,
@ -167,7 +167,7 @@ class IndexView(tables.DataTableView):
def get_data(self):
try:
security_groups = api.network.security_group_list(self.request)
security_groups = api.neutron.security_group_list(self.request)
except neutron_exc.ConnectionFailed:
security_groups = []
exceptions.handle(self.request)

View File

@ -20,7 +20,7 @@ from openstack_dashboard.test import helpers as test
class RestNetworkApiSecurityGroupTests(test.TestCase):
@mock.patch.object(network.api, 'network')
@mock.patch.object(network.api, 'neutron')
def test_security_group_detailed(self, client):
request = self.mock_rest_request()
client.security_group_list.return_value = [
@ -36,7 +36,7 @@ class RestNetworkApiSecurityGroupTests(test.TestCase):
class RestNetworkApiFloatingIpTests(test.TestCase):
@mock.patch.object(network.api, 'network')
@mock.patch.object(network.api, 'neutron')
def test_floating_ip_list(self, client):
request = self.mock_rest_request()
client.tenant_floating_ip_list.return_value = ([
@ -50,7 +50,7 @@ class RestNetworkApiFloatingIpTests(test.TestCase):
{'items': [{'ip': '1.2.3.4'}, {'ip': '2.3.4.5'}]})
client.tenant_floating_ip_list.assert_called_once_with(request)
@mock.patch.object(network.api, 'network')
@mock.patch.object(network.api, 'neutron')
def test_floating_ip_pool_list(self, client):
request = self.mock_rest_request()
client.floating_ip_pools_list.return_value = ([
@ -64,7 +64,7 @@ class RestNetworkApiFloatingIpTests(test.TestCase):
{'items': [{'name': '1'}, {'name': '2'}]})
client.floating_ip_pools_list.assert_called_once_with(request)
@mock.patch.object(network.api, 'network')
@mock.patch.object(network.api, 'neutron')
def test_allocate_floating_ip(self, client):
request = self.mock_rest_request(
body='{"pool_id": "pool"}'
@ -80,7 +80,7 @@ class RestNetworkApiFloatingIpTests(test.TestCase):
client.tenant_floating_ip_allocate.assert_called_once_with(request,
'pool')
@mock.patch.object(network.api, 'network')
@mock.patch.object(network.api, 'neutron')
def test_associate_floating_ip(self, client):
request = self.mock_rest_request(
body='{"address_id": "address", "port_id": "port"}'
@ -92,7 +92,7 @@ class RestNetworkApiFloatingIpTests(test.TestCase):
'address',
'port')
@mock.patch.object(network.api, 'network')
@mock.patch.object(network.api, 'neutron')
def test_disassociate_floating_ip(self, client):
request = self.mock_rest_request(
body='{"address_id": "address"}'

View File

@ -13,65 +13,16 @@
# under the License.
import collections
import copy
from django import http
from django.test.utils import override_settings
from mox3.mox import IsA
import six
from oslo_utils import uuidutils
from openstack_dashboard import api
from openstack_dashboard.test import helpers as test
class NetworkClientTestCase(test.APITestCase):
def test_networkclient_no_neutron(self):
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
.AndReturn(False)
self.mox.ReplayAll()
nc = api.network.NetworkClient(self.request)
self.assertIsNone(nc.floating_ips)
self.assertIsNone(nc.secgroups)
@test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_networkclient_neutron(self):
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
.AndReturn(True)
self.neutronclient = self.stub_neutronclient()
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'security-group').AndReturn(True)
self.mox.ReplayAll()
nc = api.network.NetworkClient(self.request)
self.assertIsInstance(nc.floating_ips, api.neutron.FloatingIpManager)
self.assertIsInstance(nc.secgroups, api.neutron.SecurityGroupManager)
@test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_networkclient_neutron_without_security_group(self):
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
.AndReturn(True)
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'security-group').AndReturn(False)
self.mox.ReplayAll()
nc = api.network.NetworkClient(self.request)
self.assertIsInstance(nc.floating_ips, api.neutron.FloatingIpManager)
self.assertIsNone(nc.secgroups)
class NetworkApiNeutronTestBase(test.APITestCase):
def setUp(self):
super(NetworkApiNeutronTestBase, self).setUp()
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
.AndReturn(True)
self.qclient = self.stub_neutronclient()
@ -179,487 +130,3 @@ class NetworkApiNeutronTests(NetworkApiNeutronTestBase):
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_router': False})
def test_servers_update_addresses_router_disabled(self):
self._test_servers_update_addresses(router_enabled=False)
class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
def setUp(self):
super(NetworkApiNeutronSecurityGroupTests, self).setUp()
self.qclient.list_extensions() \
.AndReturn({'extensions': self.api_extensions.list()})
self.sg_dict = dict([(sg['id'], sg['name']) for sg
in self.api_security_groups.list()])
def _cmp_sg_rule(self, exprule, retrule):
self.assertEqual(exprule['id'], retrule.id)
self.assertEqual(exprule['security_group_id'],
retrule.parent_group_id)
self.assertEqual(exprule['direction'],
retrule.direction)
self.assertEqual(exprule['ethertype'],
retrule.ethertype)
self.assertEqual(exprule['port_range_min'],
retrule.from_port)
self.assertEqual(exprule['port_range_max'],
retrule.to_port,)
if (exprule['remote_ip_prefix'] is None and
exprule['remote_group_id'] is None):
expcidr = ('::/0' if exprule['ethertype'] == 'IPv6'
else '0.0.0.0/0')
else:
expcidr = exprule['remote_ip_prefix']
self.assertEqual(expcidr, retrule.ip_range.get('cidr'))
self.assertEqual(self.sg_dict.get(exprule['remote_group_id']),
retrule.group.get('name'))
def _cmp_sg(self, exp_sg, ret_sg):
self.assertEqual(exp_sg['id'], ret_sg.id)
self.assertEqual(exp_sg['name'], ret_sg.name)
exp_rules = exp_sg['security_group_rules']
self.assertEqual(len(exp_rules), len(ret_sg.rules))
for (exprule, retrule) in six.moves.zip(exp_rules, ret_sg.rules):
self._cmp_sg_rule(exprule, retrule)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_list(self):
sgs = self.api_security_groups.list()
tenant_id = self.request.user.tenant_id
# api.neutron.is_extension_supported(self.request, 'security-group').\
# AndReturn(True)
# use deepcopy to ensure self.api_security_groups is not modified.
self.qclient.list_security_groups(tenant_id=tenant_id) \
.AndReturn({'security_groups': copy.deepcopy(sgs)})
self.mox.ReplayAll()
rets = api.network.security_group_list(self.request)
self.assertEqual(len(sgs), len(rets))
for (exp, ret) in six.moves.zip(sgs, rets):
self._cmp_sg(exp, ret)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_get(self):
secgroup = self.api_security_groups.first()
sg_ids = set([secgroup['id']] +
[rule['remote_group_id'] for rule
in secgroup['security_group_rules']
if rule['remote_group_id']])
related_sgs = [sg for sg in self.api_security_groups.list()
if sg['id'] in sg_ids]
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
# use deepcopy to ensure self.api_security_groups is not modified.
self.qclient.show_security_group(secgroup['id']) \
.AndReturn({'security_group': copy.deepcopy(secgroup)})
self.qclient.list_security_groups(id=sg_ids, fields=['id', 'name']) \
.AndReturn({'security_groups': related_sgs})
self.mox.ReplayAll()
ret = api.network.security_group_get(self.request, secgroup['id'])
self._cmp_sg(secgroup, ret)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_create(self):
secgroup = self.api_security_groups.list()[1]
body = {'security_group':
{'name': secgroup['name'],
'description': secgroup['description'],
'tenant_id': self.request.user.project_id}}
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.create_security_group(body) \
.AndReturn({'security_group': copy.deepcopy(secgroup)})
self.mox.ReplayAll()
ret = api.network.security_group_create(self.request, secgroup['name'],
secgroup['description'])
self._cmp_sg(secgroup, ret)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_update(self):
secgroup = self.api_security_groups.list()[1]
secgroup = copy.deepcopy(secgroup)
secgroup['name'] = 'newname'
secgroup['description'] = 'new description'
body = {'security_group':
{'name': secgroup['name'],
'description': secgroup['description']}}
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.update_security_group(secgroup['id'], body) \
.AndReturn({'security_group': secgroup})
self.mox.ReplayAll()
ret = api.network.security_group_update(self.request,
secgroup['id'],
secgroup['name'],
secgroup['description'])
self._cmp_sg(secgroup, ret)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_delete(self):
secgroup = self.api_security_groups.first()
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.delete_security_group(secgroup['id'])
self.mox.ReplayAll()
api.network.security_group_delete(self.request, secgroup['id'])
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_rule_create(self):
sg_rule = [r for r in self.api_security_group_rules.list()
if r['protocol'] == 'tcp' and r['remote_ip_prefix']][0]
sg_id = sg_rule['security_group_id']
secgroup = [sg for sg in self.api_security_groups.list()
if sg['id'] == sg_id][0]
post_rule = copy.deepcopy(sg_rule)
del post_rule['id']
del post_rule['tenant_id']
post_body = {'security_group_rule': post_rule}
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.create_security_group_rule(post_body) \
.AndReturn({'security_group_rule': copy.deepcopy(sg_rule)})
self.qclient.list_security_groups(id=set([sg_id]),
fields=['id', 'name']) \
.AndReturn({'security_groups': [copy.deepcopy(secgroup)]})
self.mox.ReplayAll()
ret = api.network.security_group_rule_create(
self.request, sg_rule['security_group_id'],
sg_rule['direction'], sg_rule['ethertype'], sg_rule['protocol'],
sg_rule['port_range_min'], sg_rule['port_range_max'],
sg_rule['remote_ip_prefix'], sg_rule['remote_group_id'])
self._cmp_sg_rule(sg_rule, ret)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_rule_delete(self):
sg_rule = self.api_security_group_rules.first()
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.delete_security_group_rule(sg_rule['id'])
self.mox.ReplayAll()
api.network.security_group_rule_delete(self.request, sg_rule['id'])
def _get_instance(self, cur_sg_ids):
instance_port = [p for p in self.api_ports.list()
if p['device_owner'].startswith('compute:')][0]
instance_id = instance_port['device_id']
# Emulate an instance with two ports
instance_ports = []
for _i in range(2):
p = copy.deepcopy(instance_port)
p['id'] = uuidutils.generate_uuid()
p['security_groups'] = cur_sg_ids
instance_ports.append(p)
return (instance_id, instance_ports)
def test_server_security_groups(self):
cur_sg_ids = [sg['id'] for sg in self.api_security_groups.list()[:2]]
instance_id, instance_ports = self._get_instance(cur_sg_ids)
self.qclient.list_ports(device_id=instance_id) \
.AndReturn({'ports': instance_ports})
secgroups = copy.deepcopy(self.api_security_groups.list())
self.qclient.list_security_groups(id=set(cur_sg_ids)) \
.AndReturn({'security_groups': secgroups})
self.mox.ReplayAll()
api.network.server_security_groups(self.request, instance_id)
def test_server_update_security_groups(self):
cur_sg_ids = [self.api_security_groups.first()['id']]
new_sg_ids = [sg['id'] for sg in self.api_security_groups.list()[:2]]
instance_id, instance_ports = self._get_instance(cur_sg_ids)
self.qclient.list_ports(device_id=instance_id) \
.AndReturn({'ports': instance_ports})
for p in instance_ports:
body = {'port': {'security_groups': new_sg_ids}}
self.qclient.update_port(p['id'], body=body).AndReturn({'port': p})
self.mox.ReplayAll()
api.network.server_update_security_groups(
self.request, instance_id, new_sg_ids)
class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
def setUp(self):
super(NetworkApiNeutronFloatingIpTests, self).setUp()
self.qclient.list_extensions() \
.AndReturn({'extensions': self.api_extensions.list()})
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_router': True})
def test_floating_ip_supported(self):
self.mox.ReplayAll()
self.assertTrue(api.network.floating_ip_supported(self.request))
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_router': False})
def test_floating_ip_supported_false(self):
self.mox.ReplayAll()
self.assertFalse(api.network.floating_ip_supported(self.request))
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_floating_ip_pools_list(self):
search_opts = {'router:external': True}
ext_nets = [n for n in self.api_networks.list()
if n['router:external']]
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.list_networks(**search_opts) \
.AndReturn({'networks': ext_nets})
self.mox.ReplayAll()
rets = api.network.floating_ip_pools_list(self.request)
for attr in ['id', 'name']:
self.assertEqual([p[attr] for p in ext_nets],
[getattr(p, attr) for p in rets])
def test_floating_ip_list(self):
fips = self.api_floating_ips.list()
filters = {'tenant_id': self.request.user.tenant_id}
self.qclient.list_floatingips(**filters) \
.AndReturn({'floatingips': fips})
self.qclient.list_ports(**filters) \
.AndReturn({'ports': self.api_ports.list()})
self.mox.ReplayAll()
rets = api.network.tenant_floating_ip_list(self.request)
assoc_port = self.api_ports.list()[1]
self.assertEqual(len(fips), len(rets))
for ret, exp in zip(rets, fips):
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'port_id']:
self.assertEqual(exp[attr], getattr(ret, attr))
if exp['port_id']:
dev_id = assoc_port['device_id'] if exp['port_id'] else None
self.assertEqual(dev_id, ret.instance_id)
self.assertEqual('compute', ret.instance_type)
else:
self.assertIsNone(ret.instance_id)
self.assertIsNone(ret.instance_type)
def test_floating_ip_list_all_tenants(self):
fips = self.api_floating_ips.list()
self.qclient.list_floatingips().AndReturn({'floatingips': fips})
self.qclient.list_ports().AndReturn({'ports': self.api_ports.list()})
self.mox.ReplayAll()
# all_tenants option for floating IP list is api.neutron specific,
# so we call api.neutron.FloatingIpManager directly and
# actually we don't need NetworkClient in this test.
# setUp() in the base class sets up mox to expect
# api.base.is_service_enabled() is called and we need to call
# NetworkClient even if we don't use it so that mox.VerifyAll
# doesn't complain it.
api.network.NetworkClient(self.request)
fip_manager = api.neutron.FloatingIpManager(self.request)
rets = fip_manager.list(all_tenants=True)
assoc_port = self.api_ports.list()[1]
self.assertEqual(len(fips), len(rets))
for ret, exp in zip(rets, fips):
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'port_id']:
self.assertEqual(getattr(ret, attr), exp[attr])
if exp['port_id']:
dev_id = assoc_port['device_id'] if exp['port_id'] else None
self.assertEqual(dev_id, ret.instance_id)
self.assertEqual('compute', ret.instance_type)
else:
self.assertIsNone(ret.instance_id)
self.assertIsNone(ret.instance_type)
def _test_floating_ip_get_associated(self, assoc_port, exp_instance_type):
fip = self.api_floating_ips.list()[1]
self.qclient.show_floatingip(fip['id']).AndReturn({'floatingip': fip})
self.qclient.show_port(assoc_port['id']) \
.AndReturn({'port': assoc_port})
self.mox.ReplayAll()
ret = api.network.tenant_floating_ip_get(self.request, fip['id'])
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'port_id']:
self.assertEqual(fip[attr], getattr(ret, attr))
self.assertEqual(assoc_port['device_id'], ret.instance_id)
self.assertEqual(exp_instance_type, ret.instance_type)
def test_floating_ip_get_associated(self):
assoc_port = self.api_ports.list()[1]
self._test_floating_ip_get_associated(assoc_port, 'compute')
def test_floating_ip_get_associated_with_loadbalancer_vip(self):
assoc_port = copy.deepcopy(self.api_ports.list()[1])
assoc_port['device_owner'] = 'neutron:LOADBALANCER'
assoc_port['device_id'] = uuidutils.generate_uuid()
assoc_port['name'] = 'vip-' + uuidutils.generate_uuid()
self._test_floating_ip_get_associated(assoc_port, 'loadbalancer')
def test_floating_ip_get_unassociated(self):
fip = self.api_floating_ips.list()[0]
self.qclient.show_floatingip(fip['id']).AndReturn({'floatingip': fip})
self.mox.ReplayAll()
ret = api.network.tenant_floating_ip_get(self.request, fip['id'])
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'port_id']:
self.assertEqual(fip[attr], getattr(ret, attr))
self.assertIsNone(ret.instance_id)
self.assertIsNone(ret.instance_type)
def test_floating_ip_allocate(self):
ext_nets = [n for n in self.api_networks.list()
if n['router:external']]
ext_net = ext_nets[0]
fip = self.api_floating_ips.first()
self.qclient.create_floatingip(
{'floatingip': {'floating_network_id': ext_net['id'],
'tenant_id': self.request.user.project_id}}) \
.AndReturn({'floatingip': fip})
self.mox.ReplayAll()
ret = api.network.tenant_floating_ip_allocate(self.request,
ext_net['id'])
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'port_id']:
self.assertEqual(fip[attr], getattr(ret, attr))
self.assertIsNone(ret.instance_id)
self.assertIsNone(ret.instance_type)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_floating_ip_release(self):
fip = self.api_floating_ips.first()
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.delete_floatingip(fip['id'])
self.mox.ReplayAll()
api.network.tenant_floating_ip_release(self.request, fip['id'])
def test_floating_ip_associate(self):
fip = self.api_floating_ips.list()[1]
assoc_port = self.api_ports.list()[1]
ip_address = assoc_port['fixed_ips'][0]['ip_address']
target_id = '%s_%s' % (assoc_port['id'], ip_address)
params = {'port_id': assoc_port['id'],
'fixed_ip_address': ip_address}
self.qclient.update_floatingip(fip['id'],
{'floatingip': params})
self.mox.ReplayAll()
api.network.floating_ip_associate(self.request, fip['id'], target_id)
def test_floating_ip_disassociate(self):
fip = self.api_floating_ips.list()[1]
self.qclient.update_floatingip(fip['id'],
{'floatingip': {'port_id': None}})
self.mox.ReplayAll()
api.network.floating_ip_disassociate(self.request, fip['id'])
def _get_target_id(self, port):
param = {'id': port['id'],
'addr': port['fixed_ips'][0]['ip_address']}
return '%(id)s_%(addr)s' % param
def _get_target_name(self, port):
param = {'svrid': port['device_id'],
'addr': port['fixed_ips'][0]['ip_address']}
return 'server_%(svrid)s: %(addr)s' % param
def _subs_from_port(self, port):
return [ip['subnet_id'] for ip in port['fixed_ips']]
@override_settings(
OPENSTACK_NEUTRON_NETWORK={
'enable_fip_topology_check': True,
}
)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_floating_ip_target_list(self):
ports = self.api_ports.list()
# Port on the first subnet is connected to a router
# attached to external network in neutron_data.
subnet_id = self.subnets.first().id
shared_nets = [n for n in self.api_networks.list() if n['shared']]
shared_subnet_ids = [s for n in shared_nets for s in n['subnets']]
target_ports = [
(self._get_target_id(p), self._get_target_name(p)) for p in ports
if (not p['device_owner'].startswith('network:') and
(subnet_id in self._subs_from_port(p) or
(set(shared_subnet_ids) & set(self._subs_from_port(p)))))
]
filters = {'tenant_id': self.request.user.tenant_id}
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.list_ports(**filters).AndReturn({'ports': ports})
servers = self.servers.list()
novaclient = self.stub_novaclient()
novaclient.servers = self.mox.CreateMockAnything()
search_opts = {'project_id': self.request.user.tenant_id}
novaclient.servers.list(False, search_opts).AndReturn(servers)
search_opts = {'router:external': True}
ext_nets = [n for n in self.api_networks.list()
if n['router:external']]
self.qclient.list_networks(**search_opts) \
.AndReturn({'networks': ext_nets})
self.qclient.list_routers().AndReturn({'routers':
self.api_routers.list()})
self.qclient.list_networks(shared=True).AndReturn({'networks':
shared_nets})
shared_subs = [s for s in self.api_subnets.list()
if s['id'] in shared_subnet_ids]
self.qclient.list_subnets().AndReturn({'subnets': shared_subs})
self.mox.ReplayAll()
rets = api.network.floating_ip_target_list(self.request)
self.assertEqual(len(target_ports), len(rets))
for ret, exp in zip(rets, target_ports):
self.assertEqual(exp[0], ret.id)
self.assertEqual(exp[1], ret.name)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_floating_ip_target_get_by_instance(self):
ports = self.api_ports.list()
candidates = [p for p in ports if p['device_id'] == '1']
search_opts = {'device_id': '1'}
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.list_ports(**search_opts).AndReturn({'ports': candidates})
self.mox.ReplayAll()
ret = api.network.floating_ip_target_get_by_instance(self.request, '1')
self.assertEqual(self._get_target_id(candidates[0]), ret)
def test_target_floating_ip_port_by_instance(self):
ports = self.api_ports.list()
candidates = [p for p in ports if p['device_id'] == '1']
search_opts = {'device_id': '1'}
self.qclient.list_ports(**search_opts).AndReturn({'ports': candidates})
self.mox.ReplayAll()
ret = api.network.floating_ip_target_list_by_instance(self.request,
'1')
self.assertEqual(self._get_target_id(candidates[0]), ret[0])
self.assertEqual(len(candidates), len(ret))
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_floating_ip_target_get_by_instance_with_preloaded_target(self):
target_list = [{'name': 'name11', 'id': 'id11', 'instance_id': 'vm1'},
{'name': 'name21', 'id': 'id21', 'instance_id': 'vm2'},
{'name': 'name22', 'id': 'id22', 'instance_id': 'vm2'}]
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.mox.ReplayAll()
ret = api.network.floating_ip_target_get_by_instance(
self.request, 'vm2', target_list)
self.assertEqual('id21', ret)
# @test.create_stubs({api.neutron: ('is_extension_supported', )})
def test_target_floating_ip_port_by_instance_with_preloaded_target(self):
target_list = [{'name': 'name11', 'id': 'id11', 'instance_id': 'vm1'},
{'name': 'name21', 'id': 'id21', 'instance_id': 'vm2'},
{'name': 'name22', 'id': 'id22', 'instance_id': 'vm2'}]
# api.neutron.is_extension_supported(self.request, 'security-group').\
# AndReturn(True)
self.mox.ReplayAll()
ret = api.network.floating_ip_target_list_by_instance(
self.request, 'vm2', target_list)
self.assertEqual(['id21', 'id22'], ret)

View File

@ -14,19 +14,24 @@
import copy
from mox3.mox import IsA
from neutronclient.common import exceptions as neutron_exc
from oslo_utils import uuidutils
import six
from django import http
from django.test.utils import override_settings
from oslo_utils import uuidutils
from neutronclient.common import exceptions as neutron_exc
from openstack_dashboard import api
from openstack_dashboard import policy
from openstack_dashboard.test import helpers as test
class NeutronApiTestBase(test.APITestCase):
def setUp(self):
super(NeutronApiTestBase, self).setUp()
self.qclient = self.stub_neutronclient()
class NeutronApiTests(test.APITestCase):
def test_network_list(self):
networks = {'networks': self.api_networks.list()}
@ -696,3 +701,433 @@ class NeutronApiTests(test.APITestCase):
request=self.request)
self.assertEqual(10, len(ret_val))
self.assertEqual(port_ids, [p.id for p in ret_val])
class NeutronApiSecurityGroupTests(NeutronApiTestBase):
def setUp(self):
super(NeutronApiSecurityGroupTests, self).setUp()
self.sg_dict = dict([(sg['id'], sg['name']) for sg
in self.api_security_groups.list()])
def _cmp_sg_rule(self, exprule, retrule):
self.assertEqual(exprule['id'], retrule.id)
self.assertEqual(exprule['security_group_id'],
retrule.parent_group_id)
self.assertEqual(exprule['direction'],
retrule.direction)
self.assertEqual(exprule['ethertype'],
retrule.ethertype)
self.assertEqual(exprule['port_range_min'],
retrule.from_port)
self.assertEqual(exprule['port_range_max'],
retrule.to_port,)
if (exprule['remote_ip_prefix'] is None and
exprule['remote_group_id'] is None):
expcidr = ('::/0' if exprule['ethertype'] == 'IPv6'
else '0.0.0.0/0')
else:
expcidr = exprule['remote_ip_prefix']
self.assertEqual(expcidr, retrule.ip_range.get('cidr'))
self.assertEqual(self.sg_dict.get(exprule['remote_group_id']),
retrule.group.get('name'))
def _cmp_sg(self, exp_sg, ret_sg):
self.assertEqual(exp_sg['id'], ret_sg.id)
self.assertEqual(exp_sg['name'], ret_sg.name)
exp_rules = exp_sg['security_group_rules']
self.assertEqual(len(exp_rules), len(ret_sg.rules))
for (exprule, retrule) in six.moves.zip(exp_rules, ret_sg.rules):
self._cmp_sg_rule(exprule, retrule)
def test_security_group_list(self):
sgs = self.api_security_groups.list()
tenant_id = self.request.user.tenant_id
# use deepcopy to ensure self.api_security_groups is not modified.
self.qclient.list_security_groups(tenant_id=tenant_id) \
.AndReturn({'security_groups': copy.deepcopy(sgs)})
self.mox.ReplayAll()
rets = api.neutron.security_group_list(self.request)
self.assertEqual(len(sgs), len(rets))
for (exp, ret) in six.moves.zip(sgs, rets):
self._cmp_sg(exp, ret)
def test_security_group_get(self):
secgroup = self.api_security_groups.first()
sg_ids = set([secgroup['id']] +
[rule['remote_group_id'] for rule
in secgroup['security_group_rules']
if rule['remote_group_id']])
related_sgs = [sg for sg in self.api_security_groups.list()
if sg['id'] in sg_ids]
# use deepcopy to ensure self.api_security_groups is not modified.
self.qclient.show_security_group(secgroup['id']) \
.AndReturn({'security_group': copy.deepcopy(secgroup)})
self.qclient.list_security_groups(id=sg_ids, fields=['id', 'name']) \
.AndReturn({'security_groups': related_sgs})
self.mox.ReplayAll()
ret = api.neutron.security_group_get(self.request, secgroup['id'])
self._cmp_sg(secgroup, ret)
def test_security_group_create(self):
secgroup = self.api_security_groups.list()[1]
body = {'security_group':
{'name': secgroup['name'],
'description': secgroup['description'],
'tenant_id': self.request.user.project_id}}
self.qclient.create_security_group(body) \
.AndReturn({'security_group': copy.deepcopy(secgroup)})
self.mox.ReplayAll()
ret = api.neutron.security_group_create(self.request, secgroup['name'],
secgroup['description'])
self._cmp_sg(secgroup, ret)
def test_security_group_update(self):
secgroup = self.api_security_groups.list()[1]
secgroup = copy.deepcopy(secgroup)
secgroup['name'] = 'newname'
secgroup['description'] = 'new description'
body = {'security_group':
{'name': secgroup['name'],
'description': secgroup['description']}}
self.qclient.update_security_group(secgroup['id'], body) \
.AndReturn({'security_group': secgroup})
self.mox.ReplayAll()
ret = api.neutron.security_group_update(self.request,
secgroup['id'],
secgroup['name'],
secgroup['description'])
self._cmp_sg(secgroup, ret)
def test_security_group_delete(self):
secgroup = self.api_security_groups.first()
self.qclient.delete_security_group(secgroup['id'])
self.mox.ReplayAll()
api.neutron.security_group_delete(self.request, secgroup['id'])
def test_security_group_rule_create(self):
sg_rule = [r for r in self.api_security_group_rules.list()
if r['protocol'] == 'tcp' and r['remote_ip_prefix']][0]
sg_id = sg_rule['security_group_id']
secgroup = [sg for sg in self.api_security_groups.list()
if sg['id'] == sg_id][0]
post_rule = copy.deepcopy(sg_rule)
del post_rule['id']
del post_rule['tenant_id']
post_body = {'security_group_rule': post_rule}
self.qclient.create_security_group_rule(post_body) \
.AndReturn({'security_group_rule': copy.deepcopy(sg_rule)})
self.qclient.list_security_groups(id=set([sg_id]),
fields=['id', 'name']) \
.AndReturn({'security_groups': [copy.deepcopy(secgroup)]})
self.mox.ReplayAll()
ret = api.neutron.security_group_rule_create(
self.request, sg_rule['security_group_id'],
sg_rule['direction'], sg_rule['ethertype'], sg_rule['protocol'],
sg_rule['port_range_min'], sg_rule['port_range_max'],
sg_rule['remote_ip_prefix'], sg_rule['remote_group_id'])
self._cmp_sg_rule(sg_rule, ret)
def test_security_group_rule_delete(self):
sg_rule = self.api_security_group_rules.first()
self.qclient.delete_security_group_rule(sg_rule['id'])
self.mox.ReplayAll()
api.neutron.security_group_rule_delete(self.request, sg_rule['id'])
def _get_instance(self, cur_sg_ids):
instance_port = [p for p in self.api_ports.list()
if p['device_owner'].startswith('compute:')][0]
instance_id = instance_port['device_id']
# Emulate an instance with two ports
instance_ports = []
for _i in range(2):
p = copy.deepcopy(instance_port)
p['id'] = uuidutils.generate_uuid()
p['security_groups'] = cur_sg_ids
instance_ports.append(p)
return (instance_id, instance_ports)
def test_server_security_groups(self):
cur_sg_ids = [sg['id'] for sg in self.api_security_groups.list()[:2]]
instance_id, instance_ports = self._get_instance(cur_sg_ids)
self.qclient.list_ports(device_id=instance_id) \
.AndReturn({'ports': instance_ports})
secgroups = copy.deepcopy(self.api_security_groups.list())
self.qclient.list_security_groups(id=set(cur_sg_ids)) \
.AndReturn({'security_groups': secgroups})
self.mox.ReplayAll()
api.neutron.server_security_groups(self.request, instance_id)
def test_server_update_security_groups(self):
cur_sg_ids = [self.api_security_groups.first()['id']]
new_sg_ids = [sg['id'] for sg in self.api_security_groups.list()[:2]]
instance_id, instance_ports = self._get_instance(cur_sg_ids)
self.qclient.list_ports(device_id=instance_id) \
.AndReturn({'ports': instance_ports})
for p in instance_ports:
body = {'port': {'security_groups': new_sg_ids}}
self.qclient.update_port(p['id'], body=body).AndReturn({'port': p})
self.mox.ReplayAll()
api.neutron.server_update_security_groups(
self.request, instance_id, new_sg_ids)
class NeutronApiFloatingIpTests(NeutronApiTestBase):
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_router': True})
def test_floating_ip_supported(self):
self.mox.ReplayAll()
self.assertTrue(api.neutron.floating_ip_supported(self.request))
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_router': False})
def test_floating_ip_supported_false(self):
self.mox.ReplayAll()
self.assertFalse(api.neutron.floating_ip_supported(self.request))
def test_floating_ip_pools_list(self):
search_opts = {'router:external': True}
ext_nets = [n for n in self.api_networks.list()
if n['router:external']]
self.qclient.list_networks(**search_opts) \
.AndReturn({'networks': ext_nets})
self.mox.ReplayAll()
rets = api.neutron.floating_ip_pools_list(self.request)
for attr in ['id', 'name']:
self.assertEqual([p[attr] for p in ext_nets],
[getattr(p, attr) for p in rets])
def test_floating_ip_list(self):
fips = self.api_floating_ips.list()
filters = {'tenant_id': self.request.user.tenant_id}
self.qclient.list_floatingips(**filters) \
.AndReturn({'floatingips': fips})
self.qclient.list_ports(**filters) \
.AndReturn({'ports': self.api_ports.list()})
self.mox.ReplayAll()
rets = api.neutron.tenant_floating_ip_list(self.request)
assoc_port = self.api_ports.list()[1]
self.assertEqual(len(fips), len(rets))
for ret, exp in zip(rets, fips):
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'port_id']:
self.assertEqual(exp[attr], getattr(ret, attr))
if exp['port_id']:
dev_id = assoc_port['device_id'] if exp['port_id'] else None
self.assertEqual(dev_id, ret.instance_id)
self.assertEqual('compute', ret.instance_type)
else:
self.assertIsNone(ret.instance_id)
self.assertIsNone(ret.instance_type)
def test_floating_ip_list_all_tenants(self):
fips = self.api_floating_ips.list()
self.qclient.list_floatingips().AndReturn({'floatingips': fips})
self.qclient.list_ports().AndReturn({'ports': self.api_ports.list()})
self.mox.ReplayAll()
fip_manager = api.neutron.FloatingIpManager(self.request)
rets = fip_manager.list(all_tenants=True)
assoc_port = self.api_ports.list()[1]
self.assertEqual(len(fips), len(rets))
for ret, exp in zip(rets, fips):
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'port_id']:
self.assertEqual(getattr(ret, attr), exp[attr])
if exp['port_id']:
dev_id = assoc_port['device_id'] if exp['port_id'] else None
self.assertEqual(dev_id, ret.instance_id)
self.assertEqual('compute', ret.instance_type)
else:
self.assertIsNone(ret.instance_id)
self.assertIsNone(ret.instance_type)
def _test_floating_ip_get_associated(self, assoc_port, exp_instance_type):
fip = self.api_floating_ips.list()[1]
self.qclient.show_floatingip(fip['id']).AndReturn({'floatingip': fip})
self.qclient.show_port(assoc_port['id']) \
.AndReturn({'port': assoc_port})
self.mox.ReplayAll()
ret = api.neutron.tenant_floating_ip_get(self.request, fip['id'])
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'port_id']:
self.assertEqual(fip[attr], getattr(ret, attr))
self.assertEqual(assoc_port['device_id'], ret.instance_id)
self.assertEqual(exp_instance_type, ret.instance_type)
def test_floating_ip_get_associated(self):
assoc_port = self.api_ports.list()[1]
self._test_floating_ip_get_associated(assoc_port, 'compute')
def test_floating_ip_get_associated_with_loadbalancer_vip(self):
assoc_port = copy.deepcopy(self.api_ports.list()[1])
assoc_port['device_owner'] = 'neutron:LOADBALANCER'
assoc_port['device_id'] = uuidutils.generate_uuid()
assoc_port['name'] = 'vip-' + uuidutils.generate_uuid()
self._test_floating_ip_get_associated(assoc_port, 'loadbalancer')
def test_floating_ip_get_unassociated(self):
fip = self.api_floating_ips.list()[0]
self.qclient.show_floatingip(fip['id']).AndReturn({'floatingip': fip})
self.mox.ReplayAll()
ret = api.neutron.tenant_floating_ip_get(self.request, fip['id'])
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'port_id']:
self.assertEqual(fip[attr], getattr(ret, attr))
self.assertIsNone(ret.instance_id)
self.assertIsNone(ret.instance_type)
def test_floating_ip_allocate(self):
ext_nets = [n for n in self.api_networks.list()
if n['router:external']]
ext_net = ext_nets[0]
fip = self.api_floating_ips.first()
self.qclient.create_floatingip(
{'floatingip': {'floating_network_id': ext_net['id'],
'tenant_id': self.request.user.project_id}}) \
.AndReturn({'floatingip': fip})
self.mox.ReplayAll()
ret = api.neutron.tenant_floating_ip_allocate(self.request,
ext_net['id'])
for attr in ['id', 'ip', 'pool', 'fixed_ip', 'port_id']:
self.assertEqual(fip[attr], getattr(ret, attr))
self.assertIsNone(ret.instance_id)
self.assertIsNone(ret.instance_type)
def test_floating_ip_release(self):
fip = self.api_floating_ips.first()
self.qclient.delete_floatingip(fip['id'])
self.mox.ReplayAll()
api.neutron.tenant_floating_ip_release(self.request, fip['id'])
def test_floating_ip_associate(self):
fip = self.api_floating_ips.list()[1]
assoc_port = self.api_ports.list()[1]
ip_address = assoc_port['fixed_ips'][0]['ip_address']
target_id = '%s_%s' % (assoc_port['id'], ip_address)
params = {'port_id': assoc_port['id'],
'fixed_ip_address': ip_address}
self.qclient.update_floatingip(fip['id'],
{'floatingip': params})
self.mox.ReplayAll()
api.neutron.floating_ip_associate(self.request, fip['id'], target_id)
def test_floating_ip_disassociate(self):
fip = self.api_floating_ips.list()[1]
self.qclient.update_floatingip(fip['id'],
{'floatingip': {'port_id': None}})
self.mox.ReplayAll()
api.neutron.floating_ip_disassociate(self.request, fip['id'])
def _get_target_id(self, port):
param = {'id': port['id'],
'addr': port['fixed_ips'][0]['ip_address']}
return '%(id)s_%(addr)s' % param
def _get_target_name(self, port):
param = {'svrid': port['device_id'],
'addr': port['fixed_ips'][0]['ip_address']}
return 'server_%(svrid)s: %(addr)s' % param
def _subs_from_port(self, port):
return [ip['subnet_id'] for ip in port['fixed_ips']]
@override_settings(
OPENSTACK_NEUTRON_NETWORK={
'enable_fip_topology_check': True,
}
)
def test_floating_ip_target_list(self):
ports = self.api_ports.list()
# Port on the first subnet is connected to a router
# attached to external network in neutron_data.
subnet_id = self.subnets.first().id
shared_nets = [n for n in self.api_networks.list() if n['shared']]
shared_subnet_ids = [s for n in shared_nets for s in n['subnets']]
target_ports = [
(self._get_target_id(p), self._get_target_name(p)) for p in ports
if (not p['device_owner'].startswith('network:') and
(subnet_id in self._subs_from_port(p) or
(set(shared_subnet_ids) & set(self._subs_from_port(p)))))
]
filters = {'tenant_id': self.request.user.tenant_id}
self.qclient.list_ports(**filters).AndReturn({'ports': ports})
servers = self.servers.list()
novaclient = self.stub_novaclient()
novaclient.servers = self.mox.CreateMockAnything()
search_opts = {'project_id': self.request.user.tenant_id}
novaclient.servers.list(False, search_opts).AndReturn(servers)
search_opts = {'router:external': True}
ext_nets = [n for n in self.api_networks.list()
if n['router:external']]
self.qclient.list_networks(**search_opts) \
.AndReturn({'networks': ext_nets})
self.qclient.list_routers().AndReturn({'routers':
self.api_routers.list()})
self.qclient.list_networks(shared=True).AndReturn({'networks':
shared_nets})
shared_subs = [s for s in self.api_subnets.list()
if s['id'] in shared_subnet_ids]
self.qclient.list_subnets().AndReturn({'subnets': shared_subs})
self.mox.ReplayAll()
rets = api.neutron.floating_ip_target_list(self.request)
self.assertEqual(len(target_ports), len(rets))
for ret, exp in zip(rets, target_ports):
self.assertEqual(exp[0], ret.id)
self.assertEqual(exp[1], ret.name)
def test_floating_ip_target_get_by_instance(self):
ports = self.api_ports.list()
candidates = [p for p in ports if p['device_id'] == '1']
search_opts = {'device_id': '1'}
self.qclient.list_ports(**search_opts).AndReturn({'ports': candidates})
self.mox.ReplayAll()
ret = api.neutron.floating_ip_target_get_by_instance(self.request, '1')
self.assertEqual(self._get_target_id(candidates[0]), ret)
def test_target_floating_ip_port_by_instance(self):
ports = self.api_ports.list()
candidates = [p for p in ports if p['device_id'] == '1']
search_opts = {'device_id': '1'}
self.qclient.list_ports(**search_opts).AndReturn({'ports': candidates})
self.mox.ReplayAll()
ret = api.neutron.floating_ip_target_list_by_instance(self.request,
'1')
self.assertEqual(self._get_target_id(candidates[0]), ret[0])
self.assertEqual(len(candidates), len(ret))
def test_floating_ip_target_get_by_instance_with_preloaded_target(self):
target_list = [{'name': 'name11', 'id': 'id11', 'instance_id': 'vm1'},
{'name': 'name21', 'id': 'id21', 'instance_id': 'vm2'},
{'name': 'name22', 'id': 'id22', 'instance_id': 'vm2'}]
self.mox.ReplayAll()
ret = api.neutron.floating_ip_target_get_by_instance(
self.request, 'vm2', target_list)
self.assertEqual('id21', ret)
def test_target_floating_ip_port_by_instance_with_preloaded_target(self):
target_list = [{'name': 'name11', 'id': 'id11', 'instance_id': 'vm1'},
{'name': 'name21', 'id': 'id21', 'instance_id': 'vm2'},
{'name': 'name22', 'id': 'id22', 'instance_id': 'vm2'}]
self.mox.ReplayAll()
ret = api.neutron.floating_ip_target_list_by_instance(
self.request, 'vm2', target_list)
self.assertEqual(['id21', 'id22'], ret)

View File

@ -115,7 +115,7 @@ class NovaRestTestCase(test.TestCase):
#
# Security Groups
#
@mock.patch.object(nova.api, 'network')
@mock.patch.object(nova.api, 'neutron')
def test_securitygroups_list(self, nc):
request = self.mock_rest_request()
nc.server_security_groups.return_value = [

View File

@ -73,7 +73,7 @@ class QuotaTests(test.APITestCase):
@test.create_stubs({api.nova: ('server_list',
'flavor_list',
'tenant_quota_get',),
api.network: ('tenant_floating_ip_list',
api.neutron: ('tenant_floating_ip_list',
'floating_ip_supported'),
api.base: ('is_service_enabled',),
cinder: ('volume_list', 'volume_snapshot_list',
@ -95,9 +95,9 @@ class QuotaTests(test.APITestCase):
if s.tenant_id == self.request.user.tenant_id]
api.nova.flavor_list(IsA(http.HttpRequest)) \
.AndReturn(self.flavors.list())
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.AndReturn(True)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
search_opts = {'tenant_id': self.request.user.tenant_id}
api.nova.server_list(IsA(http.HttpRequest),
@ -159,7 +159,7 @@ class QuotaTests(test.APITestCase):
@test.create_stubs({api.nova: ('server_list',
'flavor_list',
'tenant_quota_get',),
api.network: ('tenant_floating_ip_list',
api.neutron: ('tenant_floating_ip_list',
'floating_ip_supported'),
api.base: ('is_service_enabled',),
api.cinder: ('is_volume_service_enabled',)})
@ -178,9 +178,9 @@ class QuotaTests(test.APITestCase):
.AndReturn(self.flavors.list())
api.nova.tenant_quota_get(IsA(http.HttpRequest), '1') \
.AndReturn(self.quotas.first())
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.AndReturn(True)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
search_opts = {'tenant_id': self.request.user.tenant_id}
api.nova.server_list(IsA(http.HttpRequest), search_opts=search_opts) \
@ -202,7 +202,7 @@ class QuotaTests(test.APITestCase):
@test.create_stubs({api.nova: ('server_list',
'flavor_list',
'tenant_quota_get',),
api.network: ('tenant_floating_ip_list',
api.neutron: ('tenant_floating_ip_list',
'floating_ip_supported'),
api.base: ('is_service_enabled',),
api.cinder: ('is_volume_service_enabled',)})
@ -218,9 +218,9 @@ class QuotaTests(test.APITestCase):
.AndReturn(self.flavors.list())
api.nova.tenant_quota_get(IsA(http.HttpRequest), '1') \
.AndReturn(self.quotas.first())
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.AndReturn(True)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn([])
search_opts = {'tenant_id': self.request.user.tenant_id}
api.nova.server_list(IsA(http.HttpRequest), search_opts=search_opts) \
@ -243,7 +243,7 @@ class QuotaTests(test.APITestCase):
@test.create_stubs({api.nova: ('server_list',
'flavor_list',
'tenant_quota_get',),
api.network: ('tenant_floating_ip_list',
api.neutron: ('tenant_floating_ip_list',
'floating_ip_supported'),
api.base: ('is_service_enabled',),
cinder: ('volume_list', 'volume_snapshot_list',
@ -266,9 +266,9 @@ class QuotaTests(test.APITestCase):
.AndReturn(self.flavors.list())
api.nova.tenant_quota_get(IsA(http.HttpRequest), '1') \
.AndReturn(inf_quota)
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.AndReturn(True)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
api.neutron.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
search_opts = {'tenant_id': self.request.user.tenant_id}
api.nova.server_list(IsA(http.HttpRequest), search_opts=search_opts) \
@ -295,7 +295,7 @@ class QuotaTests(test.APITestCase):
@test.create_stubs({api.nova: ('server_list',
'flavor_list',
'tenant_quota_get',),
api.network: ('tenant_floating_ip_list',
api.neutron: ('tenant_floating_ip_list',
'floating_ip_supported'),
api.base: ('is_service_enabled',),
cinder: ('volume_list', 'volume_snapshot_list',
@ -316,7 +316,7 @@ class QuotaTests(test.APITestCase):
.AndReturn(self.flavors.list())
api.nova.tenant_quota_get(IsA(http.HttpRequest), '1') \
.AndReturn(self.quotas.first())
api.network.floating_ip_supported(IsA(http.HttpRequest)) \
api.neutron.floating_ip_supported(IsA(http.HttpRequest)) \
.AndReturn(False)
search_opts = {'tenant_id': self.request.user.tenant_id}
api.nova.server_list(IsA(http.HttpRequest), search_opts=search_opts) \

View File

@ -114,12 +114,12 @@ class BaseUsage(object):
def _get_neutron_usage(self, limits, resource_name):
resource_map = {
'floatingip': {
'api': api.network.tenant_floating_ip_list,
'api': api.neutron.tenant_floating_ip_list,
'limit_name': 'totalFloatingIpsUsed',
'message': _('Unable to retrieve floating IP addresses.')
},
'security_group': {
'api': api.network.security_group_list,
'api': api.neutron.security_group_list,
'limit_name': 'totalSecurityGroupsUsed',
'message': _('Unable to retrieve security groups.')
}
@ -160,7 +160,7 @@ class BaseUsage(object):
neutron_sg_used = (
api.neutron.is_extension_supported(self.request,
'security-group'))
if api.network.floating_ip_supported(self.request):
if api.neutron.floating_ip_supported(self.request):
self._get_neutron_usage(self.limits, 'floatingip')
if neutron_sg_used:
self._get_neutron_usage(self.limits, 'security_group')

View File

@ -21,7 +21,6 @@ from horizon.utils.memoized import memoized
from openstack_dashboard.api import base
from openstack_dashboard.api import cinder
from openstack_dashboard.api import network
from openstack_dashboard.api import neutron
from openstack_dashboard.api import nova
from openstack_dashboard.contrib.developer.profiler import api as profiler
@ -348,15 +347,15 @@ def _get_tenant_network_usages(request, usages, disabled_quotas, tenant_id):
if {'floatingip', 'floating_ips'} & enabled_quotas:
floating_ips = []
try:
if network.floating_ip_supported(request):
floating_ips = network.tenant_floating_ip_list(request)
if neutron.floating_ip_supported(request):
floating_ips = neutron.tenant_floating_ip_list(request)
except Exception:
pass
usages.tally('floating_ips', len(floating_ips))
if 'security_group' not in disabled_quotas:
security_groups = []
security_groups = network.security_group_list(request)
security_groups = neutron.security_group_list(request)
usages.tally('security_groups', len(security_groups))
if 'network' not in disabled_quotas: