Merge "Introduce NeutronService helper"

This commit is contained in:
Zuul 2020-05-25 14:25:57 +00:00 committed by Gerrit Code Review
commit 918a2c4b02
14 changed files with 3481 additions and 945 deletions

View File

@ -0,0 +1,57 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from rally.common import logging
from rally.common import utils
LOG = logging.getLogger(__name__)
_IPv4_START_CIDR = "10.2.0.0/24"
_IPv6_START_CIDR = "dead:beaf::/64"
_IPv4_CIDR_INCR = utils.RAMInt()
_IPv6_CIDR_INCR = utils.RAMInt()
def get_ip_version(ip):
return netaddr.IPNetwork(ip).version
def generate_cidr(ip_version=None, start_cidr=None):
"""Generate next CIDR for network or subnet, without IP overlapping.
This is process and thread safe, because `cidr_incr' points to
value stored directly in RAM. This guarantees that CIDRs will be
serial and unique even under hard multiprocessing/threading load.
:param ip_version: version of IP to take default value for start_cidr
:param start_cidr: start CIDR str
"""
if start_cidr is None:
if ip_version == 6:
start_cidr = _IPv6_START_CIDR
else:
start_cidr = _IPv4_START_CIDR
ip_version = get_ip_version(start_cidr)
if ip_version == 4:
cidr = str(netaddr.IPNetwork(start_cidr).next(next(_IPv4_CIDR_INCR)))
else:
cidr = str(netaddr.IPNetwork(start_cidr).next(next(_IPv6_CIDR_INCR)))
LOG.debug("CIDR generated: %s" % cidr)
return ip_version, cidr

File diff suppressed because it is too large Load Diff

View File

@ -14,26 +14,22 @@
# under the License.
import abc
import itertools
import netaddr
from neutronclient.common import exceptions as neutron_exceptions
from rally.common import cfg
from rally.common import logging
from rally.common import utils
from rally import exceptions
from rally_openstack.common import consts
from rally_openstack.common.services.network import net_utils
from rally_openstack.common.services.network import neutron
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
cidr_incr = utils.RAMInt()
ipv6_cidr_incr = utils.RAMInt()
def generate_cidr(start_cidr="10.2.0.0/24"):
"""Generate next CIDR for network or subnet, without IP overlapping.
@ -44,11 +40,7 @@ def generate_cidr(start_cidr="10.2.0.0/24"):
:param start_cidr: start CIDR str
:returns: next available CIDR str
"""
if netaddr.IPNetwork(start_cidr).version == 4:
cidr = str(netaddr.IPNetwork(start_cidr).next(next(cidr_incr)))
else:
cidr = str(netaddr.IPNetwork(start_cidr).next(next(ipv6_cidr_incr)))
LOG.debug("CIDR generated: %s" % cidr)
ip_version, cidr = net_utils.generate_cidr(start_cidr=start_cidr)
return cidr
@ -69,7 +61,7 @@ class NetworkWrapper(object, metaclass=abc.ABCMeta):
START_IPV6_CIDR = "dead:beaf::/64"
SERVICE_IMPL = None
def __init__(self, clients, owner, config=None):
def __init__(self, clients, owner, config=None, atomics=None):
"""Returns available network wrapper instance.
:param clients: rally.plugins.openstack.osclients.Clients instance
@ -124,10 +116,22 @@ class NeutronWrapper(NetworkWrapper):
LB_METHOD = "ROUND_ROBIN"
LB_PROTOCOL = "HTTP"
def __init__(self, *args, **kwargs):
super(NeutronWrapper, self).__init__(*args, **kwargs)
class _SingleClientWrapper(object):
def neutron(_self):
return self.client
self.neutron = neutron.NeutronService(
clients=_SingleClientWrapper(),
name_generator=self.owner.generate_random_name,
atomic_inst=getattr(self.owner, "_atomic_actions", [])
)
@property
def external_networks(self):
return self.client.list_networks(**{
"router:external": True})["networks"]
return self.neutron.list_networks(router_external=True)
@property
def ext_gw_mode_enabled(self):
@ -135,25 +139,30 @@ class NeutronWrapper(NetworkWrapper):
Without this extension, we can't pass the enable_snat parameter.
"""
return any(e["alias"] == "ext-gw-mode"
for e in self.client.list_extensions()["extensions"])
return self.neutron.supports_extension("ext-gw-mode", silent=True)
def get_network(self, net_id=None, name=None):
net = None
try:
if net_id:
net = self.client.show_network(net_id)["network"]
net = self.neutron.get_network(net_id)
else:
for net in self.client.list_networks(name=name)["networks"]:
break
networks = self.neutron.list_networks(name=name)
if networks:
net = networks[0]
except neutron_exceptions.NeutronClientException:
pass
if net:
return {"id": net["id"],
"name": net["name"],
"tenant_id": net["tenant_id"],
"tenant_id": net.get("tenant_id",
net.get("project_id", None)),
"status": net["status"],
"external": net["router:external"],
"subnets": net["subnets"],
"external": net.get("router:external", False),
"subnets": net.get("subnets", []),
"router_id": None}
except (TypeError, neutron_exceptions.NeutronClientException):
else:
raise NetworkWrapperException(
"Network not found: %s" % (name or net_id))
@ -164,14 +173,12 @@ class NeutronWrapper(NetworkWrapper):
:param **kwargs: POST /v2.0/routers request options
:returns: neutron router dict
"""
kwargs["name"] = self.owner.generate_random_name()
kwargs.pop("name", None)
if "tenant_id" in kwargs and "project_id" not in kwargs:
kwargs["project_id"] = kwargs.pop("tenant_id")
if external and "external_gateway_info" not in kwargs:
for net in self.external_networks:
kwargs["external_gateway_info"] = {"network_id": net["id"]}
if self.ext_gw_mode_enabled:
kwargs["external_gateway_info"]["enable_snat"] = True
return self.client.create_router({"router": kwargs})["router"]
return self.neutron.create_router(
discover_external_gw=external, **kwargs)
def create_v1_pool(self, tenant_id, subnet_id, **kwargs):
"""Create LB Pool (v1).
@ -194,9 +201,10 @@ class NeutronWrapper(NetworkWrapper):
def _generate_cidr(self, ip_version=4):
# TODO(amaretskiy): Generate CIDRs unique for network, not cluster
return generate_cidr(
ip_version, cidr = net_utils.generate_cidr(
start_cidr=self.start_cidr if ip_version == 4
else self.start_ipv6_cidr)
return cidr
def _create_network_infrastructure(self, tenant_id, **kwargs):
"""Create network.
@ -218,51 +226,34 @@ class NeutronWrapper(NetworkWrapper):
See above for recognized keyword args.
:returns: dict, network data
"""
network_args = {"network": kwargs.get("network_create_args", {})}
network_args["network"].update({
"tenant_id": tenant_id,
"name": self.owner.generate_random_name()})
network = self.client.create_network(network_args)["network"]
network_args = dict(kwargs.get("network_create_args", {}))
network_args["project_id"] = tenant_id
router = None
router_args = dict(kwargs.get("router_create_args", {}))
add_router = kwargs.get("add_router", False)
if router_args or add_router:
router_args["external"] = (
router_args.get("external", False) or add_router)
router_args["tenant_id"] = tenant_id
router = self.create_router(**router_args)
if not (router_args or add_router):
router_args = None
else:
router_args["project_id"] = tenant_id
router_args["discover_external_gw"] = router_args.pop(
"external", False) or add_router
subnet_create_args = {"project_id": tenant_id}
if "dns_nameservers" in kwargs:
subnet_create_args["dns_nameservers"] = kwargs["dns_nameservers"]
dualstack = kwargs.get("dualstack", False)
subnets = []
subnets_num = kwargs.get("subnets_num", 0)
ip_versions = itertools.cycle(
[self.SUBNET_IP_VERSION, self.SUBNET_IPV6_VERSION]
if dualstack else [self.SUBNET_IP_VERSION])
for i in range(subnets_num):
ip_version = next(ip_versions)
subnet_args = {
"subnet": {
"tenant_id": tenant_id,
"network_id": network["id"],
"name": self.owner.generate_random_name(),
"ip_version": ip_version,
"cidr": self._generate_cidr(ip_version),
"enable_dhcp": True,
"dns_nameservers": (
kwargs.get("dns_nameservers", ["8.8.8.8", "8.8.4.4"])
if ip_version == 4
else kwargs.get("dns_nameservers",
["dead:beaf::1", "dead:beaf::2"]))
}
}
subnet = self.client.create_subnet(subnet_args)["subnet"]
subnets.append(subnet)
if router:
self.client.add_interface_router(router["id"],
{"subnet_id": subnet["id"]})
net_topo = self.neutron.create_network_topology(
network_create_args=network_args,
router_create_args=router_args,
subnet_create_args=subnet_create_args,
subnets_dualstack=kwargs.get("dualstack", False),
subnets_count=kwargs.get("subnets_num", 0)
)
network = net_topo["network"]
subnets = net_topo["subnets"]
if net_topo["routers"]:
router = net_topo["routers"][0]
else:
router = None
return {
"network": {
@ -309,50 +300,33 @@ class NeutronWrapper(NetworkWrapper):
self.client.delete_pool(pool_id)
def delete_network(self, network):
"""Delete network
if network["router_id"]:
self.client.remove_gateway_router(network["router_id"])
:param network: network object returned by create_network method
"""
for port in self.client.list_ports(network_id=network["id"])["ports"]:
if port["device_owner"] in (
"network:router_interface",
"network:router_interface_distributed",
"network:ha_router_replicated_interface",
"network:router_gateway"):
try:
self.client.remove_interface_router(
port["device_id"], {"port_id": port["id"]})
except (neutron_exceptions.BadRequest,
neutron_exceptions.NotFound):
# Some neutron plugins don't use router as
# the device ID. Also, some plugin doesn't allow
# to update the ha rotuer interface as there is
# an internal logic to update the interface/data model
# instead.
pass
else:
try:
self.client.delete_port(port["id"])
except neutron_exceptions.PortNotFoundClient:
# port is auto-removed
pass
router = {"id": network["router_id"]} if network["router_id"] else None
# delete_network_topology uses only IDs, but let's transmit as much as
# possible info
topo = {
"network": {
"id": network["id"],
"name": network["name"],
"status": network["status"],
"subnets": network["subnets"],
"router:external": network["external"]
},
"subnets": [{"id": s} for s in network["subnets"]],
"routers": [router] if router else []
}
for subnet in self.client.list_subnets(
network_id=network["id"])["subnets"]:
self._delete_subnet(subnet["id"])
responce = self.client.delete_network(network["id"])
if network["router_id"]:
self.client.delete_router(network["router_id"])
return responce
self.neutron.delete_network_topology(topo)
def _delete_subnet(self, subnet_id):
self.client.delete_subnet(subnet_id)
self.neutron.delete_subnet(subnet_id)
def list_networks(self):
return self.client.list_networks()["networks"]
return self.neutron.list_networks()
def create_port(self, network_id, **kwargs):
"""Create neutron port.
@ -361,9 +335,7 @@ class NeutronWrapper(NetworkWrapper):
:param **kwargs: POST /v2.0/ports request options
:returns: neutron port dict
"""
kwargs["network_id"] = network_id
kwargs["name"] = self.owner.generate_random_name()
return self.client.create_port({"port": kwargs})["port"]
return self.neutron.create_port(network_id=network_id, **kwargs)
def create_floating_ip(self, ext_network=None,
tenant_id=None, port_id=None, **kwargs):
@ -377,34 +349,13 @@ class NeutronWrapper(NetworkWrapper):
"""
if not tenant_id:
raise ValueError("Missed tenant_id")
if type(ext_network) is dict:
net_id = ext_network["id"]
elif ext_network:
ext_net = self.get_network(name=ext_network)
if not ext_net["external"]:
raise NetworkWrapperException("Network is not external: %s"
% ext_network)
net_id = ext_net["id"]
else:
ext_networks = self.external_networks
if not ext_networks:
raise NetworkWrapperException(
"Failed to allocate floating IP: "
"no external networks found")
net_id = ext_networks[0]["id"]
kwargs = {"floatingip": {"floating_network_id": net_id,
"tenant_id": tenant_id}}
if not CONF.openstack.pre_newton_neutron:
descr = self.owner.generate_random_name()
kwargs["floatingip"]["description"] = descr
if port_id:
kwargs["floatingip"]["port_id"] = port_id
fip = self.client.create_floatingip(kwargs)["floatingip"]
try:
fip = self.neutron.create_floatingip(
floating_network=ext_network, project_id=tenant_id,
port_id=port_id)
except (exceptions.NotFoundException,
exceptions.GetResourceFailure) as e:
raise NetworkWrapperException(str(e)) from None
return {"id": fip["id"], "ip": fip["floating_ip_address"]}
def delete_floating_ip(self, fip_id, **kwargs):
@ -413,7 +364,7 @@ class NeutronWrapper(NetworkWrapper):
:param fip_id: int floating IP id
:param **kwargs: for compatibility, not used here
"""
self.client.delete_floatingip(fip_id)
self.neutron.delete_floatingip(fip_id)
def supports_extension(self, extension):
"""Check whether a neutron extension is supported
@ -422,11 +373,12 @@ class NeutronWrapper(NetworkWrapper):
:returns: result tuple
:rtype: (bool, string)
"""
extensions = self.client.list_extensions().get("extensions", [])
if any(ext.get("alias") == extension for ext in extensions):
return True, ""
try:
self.neutron.supports_extension(extension)
except exceptions.NotFoundException as e:
return False, str(e)
return False, "Neutron driver does not support %s" % extension
return True, ""
def wrap(clients, owner, config=None):

View File

@ -20,6 +20,7 @@ from rally.task import utils as task_utils
from rally_openstack.common.services.identity import identity
from rally_openstack.common.services.image import glance_v2
from rally_openstack.common.services.image import image
from rally_openstack.common.services.network import neutron
from rally_openstack.task.cleanup import base
@ -201,13 +202,11 @@ _neutron_order = get_order(300)
@base.resource(service=None, resource=None, admin_required=True)
class NeutronMixin(SynchronizedDeletion, base.ResourceManager):
# Neutron has the best client ever, so we need to override everything
def supports_extension(self, extension):
exts = self._manager().list_extensions().get("extensions", [])
if any(ext.get("alias") == extension for ext in exts):
return True
return False
@property
def _neutron(self):
return neutron.NeutronService(
self._admin_required and self.admin or self.user)
def _manager(self):
client = self._admin_required and self.admin or self.user
@ -220,7 +219,10 @@ class NeutronMixin(SynchronizedDeletion, base.ResourceManager):
return self.raw_resource["name"]
def delete(self):
delete_method = getattr(self._manager(), "delete_%s" % self._resource)
key = "delete_%s" % self._resource
delete_method = getattr(
self._neutron, key, getattr(self._manager(), key)
)
delete_method(self.id())
@property
@ -242,7 +244,7 @@ class NeutronMixin(SynchronizedDeletion, base.ResourceManager):
class NeutronLbaasV1Mixin(NeutronMixin):
def list(self):
if self.supports_extension("lbaas"):
if self._neutron.supports_extension("lbaas", silent=True):
return super(NeutronLbaasV1Mixin, self).list()
return []
@ -268,7 +270,7 @@ class NeutronV1Pool(NeutronLbaasV1Mixin):
class NeutronLbaasV2Mixin(NeutronMixin):
def list(self):
if self.supports_extension("lbaasv2"):
if self._neutron.supports_extension("lbaasv2", silent=True):
return super(NeutronLbaasV2Mixin, self).list()
return []
@ -373,7 +375,7 @@ class OctaviaHealthMonitors(OctaviaMixIn):
admin_required=True, perform_for_admin_only=True)
class NeutronBgpvpn(NeutronMixin):
def list(self):
if self.supports_extension("bgpvpn"):
if self._neutron.supports_extension("bgpvpn", silent=True):
return self._manager().list_bgpvpns()["bgpvpns"]
return []
@ -414,20 +416,23 @@ class NeutronPort(NeutronMixin):
# NOTE(andreykurilin): port is the kind of resource that can be created
# automatically. In this case it doesn't have name field which matches
# our resource name templates.
ROUTER_INTERFACE_OWNERS = ("network:router_interface",
"network:router_interface_distributed",
"network:ha_router_replicated_interface")
ROUTER_GATEWAY_OWNER = "network:router_gateway"
def __init__(self, *args, **kwargs):
super(NeutronPort, self).__init__(*args, **kwargs)
self._cache = {}
@property
def ROUTER_INTERFACE_OWNERS(self):
return self._neutron.ROUTER_INTERFACE_OWNERS
@property
def ROUTER_GATEWAY_OWNER(self):
return self._neutron.ROUTER_GATEWAY_OWNER
def _get_resources(self, resource):
if resource not in self._cache:
resources = getattr(self._manager(), "list_%s" % resource)()
self._cache[resource] = [r for r in resources[resource]
resources = getattr(self._neutron, "list_%s" % resource)()
self._cache[resource] = [r for r in resources
if r["tenant_id"] == self.tenant_uuid]
return self._cache[resource]
@ -455,23 +460,11 @@ class NeutronPort(NeutronMixin):
self.raw_resource.get("name", ""))
def delete(self):
device_owner = self.raw_resource["device_owner"]
if (device_owner in self.ROUTER_INTERFACE_OWNERS
or device_owner == self.ROUTER_GATEWAY_OWNER):
if device_owner == self.ROUTER_GATEWAY_OWNER:
self._manager().remove_gateway_router(
self.raw_resource["device_id"])
self._manager().remove_interface_router(
self.raw_resource["device_id"], {"port_id": self.id()})
else:
from neutronclient.common import exceptions as neutron_exceptions
try:
self._manager().delete_port(self.id())
except neutron_exceptions.PortNotFoundClient:
# Port can be already auto-deleted, skip silently
LOG.debug("Port %s was not deleted. Skip silently because "
"port can be already auto-deleted." % self.id())
found = self._neutron.delete_port(self.raw_resource)
if not found:
# Port can be already auto-deleted, skip silently
LOG.debug(f"Port {self.id()} was not deleted. Skip silently "
f"because port can be already auto-deleted.")
@base.resource("neutron", "subnet", order=next(_neutron_order),

View File

@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
import random
from rally.common import cfg
@ -22,7 +21,7 @@ from rally import exceptions
from rally.task import atomic
from rally.task import utils
from rally_openstack.common.wrappers import network as network_wrapper
from rally_openstack.common.services.network import neutron
from rally_openstack.task import scenario
@ -32,7 +31,20 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class NeutronScenario(scenario.OpenStackScenario):
class NeutronBaseScenario(scenario.OpenStackScenario):
"""Base class for Neutron scenarios with basic atomic actions."""
def __init__(self, *args, **kwargs):
super(NeutronBaseScenario, self).__init__(*args, **kwargs)
if hasattr(self, "_clients"):
self.neutron = neutron.NeutronService(
clients=self._clients,
name_generator=self.generate_random_name,
atomic_inst=self.atomic_actions()
)
class NeutronScenario(NeutronBaseScenario):
"""Base class for Neutron scenarios with basic atomic actions."""
# TODO(rkiran): modify in case LBaaS-v2 requires
@ -51,12 +63,11 @@ class NeutronScenario(scenario.OpenStackScenario):
:param kwargs: dict, network options
:returns: str, Neutron network-id
"""
networks = self._list_networks()
for net in networks:
if (net["name"] == network) or (net["id"] == network):
return net["id"]
raise exceptions.NotFoundException(
message="Network %s not found." % network)
try:
return self.neutron.find_network(network)["id"]
except exceptions.GetResourceFailure:
raise exceptions.NotFoundException(
message="Network %s not found." % network)
@property
def _ext_gw_mode_enabled(self):
@ -64,39 +75,32 @@ class NeutronScenario(scenario.OpenStackScenario):
Without this extension, we can't pass the enable_snat parameter.
"""
return any(
e["alias"] == "ext-gw-mode"
for e in self.clients("neutron").list_extensions()["extensions"])
return self.neutron.supports_extension("ext-gw-mode", silent=True)
@atomic.action_timer("neutron.create_network")
def _create_network(self, network_create_args):
"""Create neutron network.
:param network_create_args: dict, POST /v2.0/networks request options
:returns: neutron network dict
"""
network_create_args["name"] = self.generate_random_name()
return self.clients("neutron").create_network(
{"network": network_create_args})
network_create_args.pop("name", None)
return {"network": self.neutron.create_network(**network_create_args)}
@atomic.action_timer("neutron.list_networks")
def _list_networks(self, **kwargs):
"""Return user networks list.
:param kwargs: network list options
"""
return self.clients("neutron").list_networks(**kwargs)["networks"]
return self.neutron.list_networks(**kwargs)
@atomic.action_timer("neutron.list_agents")
def _list_agents(self, **kwargs):
"""Fetches agents.
:param kwargs: neutron agent list options
:returns: user agents list
"""
return self.clients("neutron").list_agents(**kwargs)["agents"]
return self.neutron.list_agents(**kwargs)
@atomic.action_timer("neutron.update_network")
def _update_network(self, network, network_update_args):
"""Update the network.
@ -107,11 +111,9 @@ class NeutronScenario(scenario.OpenStackScenario):
:returns: updated neutron network dict
"""
network_update_args["name"] = self.generate_random_name()
body = {"network": network_update_args}
return self.clients("neutron").update_network(
network["network"]["id"], body)
return {"network": self.neutron.update_network(
network["network"]["id"], **network_update_args)}
@atomic.action_timer("neutron.show_network")
def _show_network(self, network, **kwargs):
"""show network details.
@ -119,18 +121,16 @@ class NeutronScenario(scenario.OpenStackScenario):
:param kwargs: dict, POST /v2.0/networks show options
:returns: details of the network
"""
return self.clients("neutron").show_network(
network["network"]["id"], **kwargs)
network = self.neutron.get_network(network["network"]["id"], **kwargs)
return {"network": network}
@atomic.action_timer("neutron.delete_network")
def _delete_network(self, network):
"""Delete neutron network.
:param network: Network object
"""
self.clients("neutron").delete_network(network["id"])
self.neutron.delete_network(network["id"])
@atomic.action_timer("neutron.create_subnet")
def _create_subnet(self, network, subnet_create_args, start_cidr=None):
"""Create neutron subnet.
@ -138,27 +138,17 @@ class NeutronScenario(scenario.OpenStackScenario):
:param subnet_create_args: POST /v2.0/subnets request options
:returns: neutron subnet dict
"""
network_id = network["network"]["id"]
if not subnet_create_args.get("cidr"):
start_cidr = start_cidr or "10.2.0.0/24"
subnet_create_args["cidr"] = (
network_wrapper.generate_cidr(start_cidr=start_cidr))
subnet_create_args.pop("name", None)
subnet_create_args["network_id"] = network["network"]["id"]
subnet_create_args["start_cidr"] = start_cidr
subnet_create_args["network_id"] = network_id
subnet_create_args["name"] = self.generate_random_name()
subnet_create_args["ip_version"] = netaddr.IPNetwork(
subnet_create_args["cidr"]).version
return {"subnet": self.neutron.create_subnet(**subnet_create_args)}
return self.clients("neutron").create_subnet(
{"subnet": subnet_create_args})
@atomic.action_timer("neutron.list_subnets")
def _list_subnets(self):
"""Returns user subnetworks list."""
return self.clients("neutron").list_subnets()["subnets"]
return self.neutron.list_subnets()
@atomic.action_timer("neutron.show_subnet")
def _show_subnet(self, subnet, **kwargs):
"""show subnet details.
@ -166,10 +156,8 @@ class NeutronScenario(scenario.OpenStackScenario):
:param kwargs: Optional additional arguments for subnet show
:returns: details of the subnet
"""
return self.clients("neutron").show_subnet(subnet["subnet"]["id"],
**kwargs)
return {"subnet": self.neutron.get_subnet(subnet["subnet"]["id"])}
@atomic.action_timer("neutron.update_subnet")
def _update_subnet(self, subnet, subnet_update_args):
"""Update the neutron subnet.
@ -180,46 +168,35 @@ class NeutronScenario(scenario.OpenStackScenario):
:returns: updated neutron subnet dict
"""
subnet_update_args["name"] = self.generate_random_name()
body = {"subnet": subnet_update_args}
return self.clients("neutron").update_subnet(
subnet["subnet"]["id"], body)
return {"subnet": self.neutron.update_subnet(
subnet["subnet"]["id"], **subnet_update_args)}
@atomic.action_timer("neutron.delete_subnet")
def _delete_subnet(self, subnet):
"""Delete neutron subnet
:param subnet: Subnet object
"""
self.clients("neutron").delete_subnet(subnet["subnet"]["id"])
self.neutron.delete_subnet(subnet["subnet"]["id"])
@atomic.action_timer("neutron.create_router")
def _create_router(self, router_create_args, external_gw=False):
"""Create neutron router.
:param router_create_args: POST /v2.0/routers request options
:returns: neutron router dict
"""
router_create_args["name"] = self.generate_random_name()
router_create_args.pop("name", None)
if ("tenant_id" in router_create_args
and "project_id" not in router_create_args):
router_create_args["project_id"] = router_create_args.pop(
"tenant_id")
if external_gw:
for network in self._list_networks():
if network.get("router:external"):
external_network = network
gw_info = {"network_id": external_network["id"]}
if self._ext_gw_mode_enabled:
gw_info["enable_snat"] = True
router_create_args.setdefault("external_gateway_info",
gw_info)
return {"router": self.neutron.create_router(
discover_external_gw=external_gw, **router_create_args)}
return self.clients("neutron").create_router(
{"router": router_create_args})
@atomic.action_timer("neutron.list_routers")
def _list_routers(self):
"""Returns user routers list."""
return self.clients("neutron").list_routers()["routers"]
return self.neutron.list_routers()
@atomic.action_timer("neutron.show_router")
def _show_router(self, router, **kwargs):
"""Show information of a given router.
@ -227,18 +204,16 @@ class NeutronScenario(scenario.OpenStackScenario):
:kwargs: dict, POST /v2.0/routers show options
:return: details of the router
"""
return self.clients("neutron").show_router(
router["router"]["id"], **kwargs)
return {"router": self.neutron.get_router(
router["router"]["id"], **kwargs)}
@atomic.action_timer("neutron.delete_router")
def _delete_router(self, router):
"""Delete neutron router
:param router: Router object
"""
self.clients("neutron").delete_router(router["router"]["id"])
self.neutron.delete_router(router["router"]["id"])
@atomic.action_timer("neutron.update_router")
def _update_router(self, router, router_update_args):
"""Update the neutron router.
@ -249,11 +224,9 @@ class NeutronScenario(scenario.OpenStackScenario):
:returns: updated neutron router dict
"""
router_update_args["name"] = self.generate_random_name()
body = {"router": router_update_args}
return self.clients("neutron").update_router(
router["router"]["id"], body)
return {"router": self.neutron.update_router(
router["router"]["id"], **router_update_args)}
@atomic.action_timer("neutron.create_port")
def _create_port(self, network, port_create_args):
"""Create neutron port.
@ -261,16 +234,13 @@ class NeutronScenario(scenario.OpenStackScenario):
:param port_create_args: POST /v2.0/ports request options
:returns: neutron port dict
"""
port_create_args["network_id"] = network["network"]["id"]
port_create_args["name"] = self.generate_random_name()
return self.clients("neutron").create_port({"port": port_create_args})
return {"port": self.neutron.create_port(
network_id=network["network"]["id"], **port_create_args)}
@atomic.action_timer("neutron.list_ports")
def _list_ports(self):
"""Return user ports list."""
return self.clients("neutron").list_ports()["ports"]
return self.neutron.list_ports()
@atomic.action_timer("neutron.show_port")
def _show_port(self, port, **params):
"""Return user port details.
@ -278,9 +248,8 @@ class NeutronScenario(scenario.OpenStackScenario):
:param params: neutron port show options
:returns: neutron port dict
"""
return self.clients("neutron").show_port(port["port"]["id"], **params)
return {"port": self.neutron.get_port(port["port"]["id"], **params)}
@atomic.action_timer("neutron.update_port")
def _update_port(self, port, port_update_args):
"""Update the neutron port.
@ -291,16 +260,15 @@ class NeutronScenario(scenario.OpenStackScenario):
:returns: updated neutron port dict
"""
port_update_args["name"] = self.generate_random_name()
body = {"port": port_update_args}
return self.clients("neutron").update_port(port["port"]["id"], body)
return {"port": self.neutron.update_port(port["port"]["id"],
**port_update_args)}
@atomic.action_timer("neutron.delete_port")
def _delete_port(self, port):
"""Delete neutron port.
:param port: Port object
"""
self.clients("neutron").delete_port(port["port"]["id"])
self.neutron.delete_port(port["port"]["id"])
@logging.log_deprecated_args(
"network_create_args is deprecated; use the network context instead",
@ -356,10 +324,16 @@ class NeutronScenario(scenario.OpenStackScenario):
:parm subnet_cidr_start: str, start value for subnets CIDR
:returns: tuple of result network and subnets list
"""
network = self._create_network(network_create_args or {})
subnets = self._create_subnets(network, subnet_create_args,
subnet_cidr_start, subnets_per_network)
return network, subnets
subnet_create_args = dict(subnet_create_args or {})
subnet_create_args["start_cidr"] = subnet_cidr_start
net_topo = self.neutron.create_network_topology(
network_create_args=(network_create_args or {}),
subnet_create_args=subnet_create_args,
subnets_count=subnets_per_network
)
subnets = [{"subnet": s} for s in net_topo["subnets"]]
return {"network": net_topo["network"]}, subnets
def _create_network_structure(self, network_create_args=None,
subnet_create_args=None,
@ -375,41 +349,39 @@ class NeutronScenario(scenario.OpenStackScenario):
:param router_create_args: dict, POST /v2.0/routers request options
:returns: tuple of (network, subnets, routers)
"""
network = self._create_network(network_create_args or {})
subnets = self._create_subnets(network, subnet_create_args,
subnet_cidr_start,
subnets_per_network)
routers = []
for subnet in subnets:
router = self._create_router(router_create_args or {})
self._add_interface_router(subnet["subnet"],
router["router"])
routers.append(router)
subnet_create_args = dict(subnet_create_args or {})
subnet_create_args["start_cidr"] = subnet_cidr_start
return (network, subnets, routers)
net_topo = self.neutron.create_network_topology(
network_create_args=(network_create_args or {}),
router_create_args=(router_create_args or {}),
router_per_subnet=True,
subnet_create_args=subnet_create_args,
subnets_count=subnets_per_network
)
return ({"network": net_topo["network"]},
[{"subnet": s} for s in net_topo["subnets"]],
[{"router": r} for r in net_topo["routers"]])
@atomic.action_timer("neutron.add_interface_router")
def _add_interface_router(self, subnet, router):
"""Connect subnet to router.
:param subnet: dict, neutron subnet
:param router: dict, neutron router
"""
self.clients("neutron").add_interface_router(
router["id"], {"subnet_id": subnet["id"]})
self.neutron.add_interface_to_router(router_id=router["id"],
subnet_id=subnet["id"])
@atomic.action_timer("neutron.remove_interface_router")
def _remove_interface_router(self, subnet, router):
"""Remove subnet from router
:param subnet: dict, neutron subnet
:param router: dict, neutron router
"""
self.clients("neutron").remove_interface_router(
router["id"], {"subnet_id": subnet["id"]})
self.neutron.remove_interface_from_router(
router_id=router["id"], subnet_id=subnet["id"])
@atomic.action_timer("neutron.add_gateway_router")
def _add_gateway_router(self, router, ext_net, enable_snat=None):
"""Set the external network gateway for a router.
@ -417,21 +389,18 @@ class NeutronScenario(scenario.OpenStackScenario):
:param ext_net: external network for the gateway
:param enable_snat: True if enable snat, None to avoid update
"""
gw_info = {"network_id": ext_net["network"]["id"]}
if enable_snat is not None:
if self._ext_gw_mode_enabled:
gw_info["enable_snat"] = enable_snat
self.clients("neutron").add_gateway_router(
router["router"]["id"], gw_info)
self.neutron.add_gateway_to_router(
router_id=router["router"]["id"],
network_id=ext_net["network"]["id"],
enable_snat=enable_snat
)
@atomic.action_timer("neutron.remove_gateway_router")
def _remove_gateway_router(self, router):
"""Removes an external network gateway from the specified router.
:param router: dict, neutron router
"""
self.clients("neutron").remove_gateway_router(
router["router"]["id"])
self.neutron.remove_gateway_from_router(router["router"]["id"])
@atomic.action_timer("neutron.create_pool")
def _create_lb_pool(self, subnet_id, **pool_create_args):
@ -533,7 +502,6 @@ class NeutronScenario(scenario.OpenStackScenario):
body = {"vip": vip_update_args}
return self.clients("neutron").update_vip(vip["vip"]["id"], body)
@atomic.action_timer("neutron.create_floating_ip")
def _create_floatingip(self, floating_network, **floating_ip_args):
"""Create floating IP with floating_network.
@ -541,40 +509,21 @@ class NeutronScenario(scenario.OpenStackScenario):
:param floating_ip_args: dict, POST /floatingips create options
:returns: dict, neutron floating IP
"""
from neutronclient.common import exceptions as ne
floating_network_id = self._get_network_id(
floating_network)
args = {"floating_network_id": floating_network_id}
if not CONF.openstack.pre_newton_neutron:
args["description"] = self.generate_random_name()
args.update(floating_ip_args)
try:
return self.clients("neutron").create_floatingip(
{"floatingip": args})
except ne.BadRequest as e:
error = "%s" % e
if "Unrecognized attribute" in error and "'description'" in error:
LOG.info("It looks like you have Neutron API of pre-Newton "
"OpenStack release. Setting "
"openstack.pre_newton_neutron option via Rally "
"configuration should fix an issue.")
raise
return {"floatingip": self.neutron.create_floatingip(
floating_network=floating_network, **floating_ip_args)}
@atomic.action_timer("neutron.list_floating_ips")
def _list_floating_ips(self, **kwargs):
"""Return floating IPs list."""
return self.clients("neutron").list_floatingips(**kwargs)
return {"floatingips": self.neutron.list_floatingips(**kwargs)}
@atomic.action_timer("neutron.delete_floating_ip")
def _delete_floating_ip(self, floating_ip):
"""Delete floating IP.
:param dict, floating IP object
"""
return self.clients("neutron").delete_floatingip(floating_ip["id"])
return self.neutron.delete_floatingip(floating_ip["id"])
@atomic.action_timer("neutron.associate_floating_ip")
def _associate_floating_ip(self, floatingip, port):
"""Associate floating IP with port.
@ -582,20 +531,18 @@ class NeutronScenario(scenario.OpenStackScenario):
:param port: port dict
:returns: updated floating IP dict
"""
return self.clients("neutron").update_floatingip(
floatingip["id"],
{"floatingip": {"port_id": port["id"]}})["floatingip"]
return self.neutron.associate_floatingip(
port_id=port["id"],
floatingip_id=floatingip["id"])
@atomic.action_timer("neutron.dissociate_floating_ip")
def _dissociate_floating_ip(self, floatingip):
"""Dissociate floating IP from ports.
:param floatingip: floating IP dict
:returns: updated floating IP dict
"""
return self.clients("neutron").update_floatingip(
floatingip["id"],
{"floatingip": {"port_id": None}})["floatingip"]
return self.neutron.dissociate_floatingip(
floatingip_id=floatingip["id"])
@atomic.action_timer("neutron.create_healthmonitor")
def _create_v1_healthmonitor(self, **healthmonitor_create_args):
@ -648,7 +595,6 @@ class NeutronScenario(scenario.OpenStackScenario):
return self.clients("neutron").update_health_monitor(
healthmonitor["health_monitor"]["id"], body)
@atomic.action_timer("neutron.create_security_group")
def _create_security_group(self, **security_group_create_args):
"""Create Neutron security-group.
@ -657,24 +603,21 @@ class NeutronScenario(scenario.OpenStackScenario):
:returns: dict, neutron security-group
"""
security_group_create_args["name"] = self.generate_random_name()
return self.clients("neutron").create_security_group(
{"security_group": security_group_create_args})
return {"security_group": self.neutron.create_security_group(
**security_group_create_args)}
@atomic.action_timer("neutron.delete_security_group")
def _delete_security_group(self, security_group):
"""Delete Neutron security group.
:param security_group: dict, neutron security_group
"""
return self.clients("neutron").delete_security_group(
return self.neutron.delete_security_group(
security_group["security_group"]["id"])
@atomic.action_timer("neutron.list_security_groups")
def _list_security_groups(self, **kwargs):
"""Return list of Neutron security groups."""
return self.clients("neutron").list_security_groups(**kwargs)
return {"security_groups": self.neutron.list_security_groups(**kwargs)}
@atomic.action_timer("neutron.show_security_group")
def _show_security_group(self, security_group, **kwargs):
"""Show security group details.
@ -682,10 +625,9 @@ class NeutronScenario(scenario.OpenStackScenario):
:param kwargs: Optional additional arguments for security_group show
:returns: security_group details
"""
return self.clients("neutron").show_security_group(
security_group["security_group"]["id"], **kwargs)
return {"security_group": self.neutron.get_security_group(
security_group["security_group"]["id"], **kwargs)}
@atomic.action_timer("neutron.update_security_group")
def _update_security_group(self, security_group,
**security_group_update_args):
"""Update Neutron security-group.
@ -696,9 +638,9 @@ class NeutronScenario(scenario.OpenStackScenario):
:returns: dict, updated neutron security-group
"""
security_group_update_args["name"] = self.generate_random_name()
body = {"security_group": security_group_update_args}
return self.clients("neutron").update_security_group(
security_group["security_group"]["id"], body)
return {"security_group": self.neutron.update_security_group(
security_group["security_group"]["id"],
**security_group_update_args)}
def update_loadbalancer_resource(self, lb):
try:
@ -857,7 +799,6 @@ class NeutronScenario(scenario.OpenStackScenario):
return self.clients("neutron").list_bgpvpn_router_assocs(
bgpvpn["bgpvpn"]["id"], **kwargs)
@atomic.action_timer("neutron.create_security_group_rule")
def _create_security_group_rule(self, security_group_id,
**security_group_rule_args):
"""Create Neutron security-group-rule.
@ -867,25 +808,19 @@ class NeutronScenario(scenario.OpenStackScenario):
/v2.0/security-group-rules request options
:returns: dict, neutron security-group-rule
"""
security_group_rule_args["security_group_id"] = security_group_id
if "direction" not in security_group_rule_args:
security_group_rule_args["direction"] = "ingress"
if "protocol" not in security_group_rule_args:
security_group_rule_args["protocol"] = "tcp"
return {"security_group_rule": self.neutron.create_security_group_rule(
security_group_id, **security_group_rule_args
)}
return self.clients("neutron").create_security_group_rule(
{"security_group_rule": security_group_rule_args})
@atomic.action_timer("neutron.list_security_group_rules")
def _list_security_group_rules(self, **kwargs):
"""List all security group rules.
:param kwargs: Optional additional arguments for roles list
:return: list of security group rules
"""
return self.clients("neutron").list_security_group_rules(**kwargs)
result = self.neutron.list_security_group_rules(**kwargs)
return {"security_group_rules": result}
@atomic.action_timer("neutron.show_security_group_rule")
def _show_security_group_rule(self, security_group_rule, **kwargs):
"""Show information of a given security group rule.
@ -893,17 +828,15 @@ class NeutronScenario(scenario.OpenStackScenario):
:param kwargs: Optional additional arguments for roles list
:return: details of security group rule
"""
return self.clients("neutron").show_security_group_rule(
security_group_rule, **kwargs)
return {"security_group_rule": self.neutron.get_security_group_rule(
security_group_rule, **kwargs)}
@atomic.action_timer("neutron.delete_security_group_rule")
def _delete_security_group_rule(self, security_group_rule):
"""Delete a given security group rule.
:param security_group_rule: id of security group rule
"""
self.clients("neutron").delete_security_group_rule(
security_group_rule)
self.neutron.delete_security_group_rule(security_group_rule)
@atomic.action_timer("neutron.delete_trunk")
def _delete_trunk(self, trunk_port):
@ -918,10 +851,6 @@ class NeutronScenario(scenario.OpenStackScenario):
def _list_trunks(self, **kwargs):
return self.clients("neutron").list_trunks(**kwargs)["trunks"]
@atomic.action_timer("neutron.list_ports_by_device_id")
def _list_ports_by_device_id(self, device_id):
return self.clients("neutron").list_ports(device_id=device_id)
@atomic.action_timer("neutron.list_subports_by_trunk")
def _list_subports_by_trunk(self, trunk_id):
return self.clients("neutron").trunk_get_subports(trunk_id)
@ -930,3 +859,6 @@ class NeutronScenario(scenario.OpenStackScenario):
def _add_subports_to_trunk(self, trunk_id, subports):
return self.clients("neutron").trunk_add_subports(
trunk_id, {"sub_ports": subports})
def _list_ports_by_device_id(self, device_id):
return self.neutron.list_ports(device_id=device_id)

View File

@ -23,13 +23,15 @@ from rally.task import utils
from rally_openstack.common.services.image import image as image_service
from rally_openstack.task import scenario
from rally_openstack.task.scenarios.cinder import utils as cinder_utils
from rally_openstack.task.scenarios.neutron import utils as neutron_utils
CONF = cfg.CONF
LOG = logging.getLogger(__file__)
class NovaScenario(scenario.OpenStackScenario):
class NovaScenario(neutron_utils.NeutronBaseScenario,
scenario.OpenStackScenario):
"""Base class for Nova scenarios with basic atomic actions."""
@atomic.action_timer("nova.list_servers")
@ -633,37 +635,18 @@ class NovaScenario(scenario.OpenStackScenario):
:param fixed_address: The fixedIP address the FloatingIP is to be
associated with (optional)
"""
with atomic.ActionTimer(self, "neutron.list_ports"):
ports = self.clients("neutron").list_ports(device_id=server.id)
port = ports["ports"][0]
if isinstance(address, dict):
floating_ip = self.neutron.associate_floatingip(
device_id=server.id, fixed_ip_address=fixed_address,
floatingip_id=address["id"])
else:
floating_ip = self.neutron.associate_floatingip(
device_id=server.id, fixed_ip_address=fixed_address,
floating_ip_address=address)
fip = address
if not isinstance(address, dict):
LOG.warning(
"The argument 'address' of "
"NovaScenario._associate_floating_ip method accepts a "
"dict-like representation of floating ip. Transmitting a "
"string with just an IP is deprecated.")
with atomic.ActionTimer(self, "neutron.list_floating_ips"):
all_fips = self.clients("neutron").list_floatingips(
tenant_id=self.context["tenant"]["id"])
filtered_fip = [f for f in all_fips["floatingips"]
if f["floating_ip_address"] == address]
if not filtered_fip:
raise exceptions.NotFoundException(
"There is no floating ip with '%s' address." % address)
fip = filtered_fip[0]
# the first case: fip object is returned from network wrapper
# the second case: from neutronclient directly
fip_ip = fip.get("ip", fip.get("floating_ip_address", None))
fip_update_dict = {"port_id": port["id"]}
if fixed_address:
fip_update_dict["fixed_ip_address"] = fixed_address
self.clients("neutron").update_floatingip(
fip["id"], {"floatingip": fip_update_dict}
)
utils.wait_for(server,
is_ready=self.check_ip_address(fip_ip),
is_ready=self.check_ip_address(
floating_ip["floating_ip_address"]),
update_resource=utils.get_from_manager())
# Update server data
server.addresses = server.manager.get(server.id).addresses
@ -675,32 +658,19 @@ class NovaScenario(scenario.OpenStackScenario):
:param server: The :class:`Server` to add an IP to.
:param address: The dict-like representation of FloatingIP to remove
"""
fip = address
if not isinstance(fip, dict):
LOG.warning(
"The argument 'address' of "
"NovaScenario._dissociate_floating_ip method accepts a "
"dict-like representation of floating ip. Transmitting a "
"string with just an IP is deprecated.")
with atomic.ActionTimer(self, "neutron.list_floating_ips"):
all_fips = self.clients("neutron").list_floatingips(
tenant_id=self.context["tenant"]["id"]
)
filtered_fip = [f for f in all_fips["floatingips"]
if f["floating_ip_address"] == address]
if not filtered_fip:
raise exceptions.NotFoundException(
"There is no floating ip with '%s' address." % address)
fip = filtered_fip[0]
self.clients("neutron").update_floatingip(
fip["id"], {"floatingip": {"port_id": None}}
)
# the first case: fip object is returned from network wrapper
# the second case: from neutronclient directly
fip_ip = fip.get("ip", fip.get("floating_ip_address", None))
if isinstance(address, dict):
floating_ip = self.neutron.dissociate_floatingip(
floatingip_id=address["id"]
)
else:
floating_ip = self.neutron.dissociate_floatingip(
floating_ip_address=address
)
utils.wait_for(
server,
is_ready=self.check_ip_address(fip_ip, must_exist=False),
is_ready=self.check_ip_address(
floating_ip["floating_ip_address"], must_exist=False),
update_resource=utils.get_from_manager()
)
# Update server data

View File

@ -0,0 +1,42 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from rally_openstack.common.services.network import net_utils
from tests.unit import test
PATH = "rally_openstack.common.services.network.net_utils"
class FunctionsTestCase(test.TestCase):
def test_generate_cidr(self):
with mock.patch("%s._IPv4_CIDR_INCR" % PATH, iter(range(1, 4))):
self.assertEqual((4, "10.2.1.0/24"), net_utils.generate_cidr())
self.assertEqual((4, "10.2.2.0/24"), net_utils.generate_cidr())
self.assertEqual((4, "10.2.3.0/24"), net_utils.generate_cidr())
with mock.patch("%s._IPv4_CIDR_INCR" % PATH, iter(range(1, 4))):
start_cidr = "1.1.0.0/26"
self.assertEqual(
(4, "1.1.0.64/26"),
net_utils.generate_cidr(start_cidr=start_cidr))
self.assertEqual(
(4, "1.1.0.128/26"),
net_utils.generate_cidr(start_cidr=start_cidr))
self.assertEqual(
(4, "1.1.0.192/26"),
net_utils.generate_cidr(start_cidr=start_cidr))

File diff suppressed because it is too large Load Diff

View File

@ -35,84 +35,87 @@ class Owner(utils.RandomNameGeneratorMixin):
@ddt.ddt
class NeutronWrapperTestCase(test.TestCase):
def setUp(self):
super(NeutronWrapperTestCase, self).setUp()
self.owner = Owner()
self.owner.generate_random_name = mock.Mock()
super(NeutronWrapperTestCase, self).setUp()
def get_wrapper(self, *skip_cidrs, **kwargs):
return network.NeutronWrapper(mock.Mock(), self.owner, config=kwargs)
self.wrapper = network.NeutronWrapper(mock.MagicMock(),
self.owner,
config={})
self._nc = self.wrapper.neutron.client
def test_SUBNET_IP_VERSION(self):
self.assertEqual(4, network.NeutronWrapper.SUBNET_IP_VERSION)
@mock.patch("rally_openstack.common.wrappers.network.generate_cidr")
@mock.patch(
"rally_openstack.common.services.network.net_utils.generate_cidr")
def test__generate_cidr(self, mock_generate_cidr):
cidrs = iter(range(5))
mock_generate_cidr.side_effect = (
lambda start_cidr: start_cidr + next(cidrs)
)
service = self.get_wrapper(start_cidr=3)
self.assertEqual(3, service._generate_cidr())
self.assertEqual(4, service._generate_cidr())
self.assertEqual(5, service._generate_cidr())
self.assertEqual(6, service._generate_cidr())
self.assertEqual(7, service._generate_cidr())
self.assertEqual([mock.call(start_cidr=3)] * 5,
mock_generate_cidr.mock_calls)
def fake_gen_cidr(ip_version=None, start_cidr=None):
return 4, 3 + next(cidrs)
mock_generate_cidr.side_effect = fake_gen_cidr
self.assertEqual(3, self.wrapper._generate_cidr())
self.assertEqual(4, self.wrapper._generate_cidr())
self.assertEqual(5, self.wrapper._generate_cidr())
self.assertEqual(6, self.wrapper._generate_cidr())
self.assertEqual(7, self.wrapper._generate_cidr())
self.assertEqual([mock.call(start_cidr=self.wrapper.start_cidr)] * 5,
mock_generate_cidr.call_args_list)
def test_external_networks(self):
wrap = self.get_wrapper()
wrap.client.list_networks.return_value = {"networks": "foo_networks"}
self.assertEqual("foo_networks", wrap.external_networks)
wrap.client.list_networks.assert_called_once_with(
self._nc.list_networks.return_value = {"networks": "foo_networks"}
self.assertEqual("foo_networks", self.wrapper.external_networks)
self._nc.list_networks.assert_called_once_with(
**{"router:external": True})
def test_get_network(self):
wrap = self.get_wrapper()
neutron_net = {"id": "foo_id",
"name": self.owner.generate_random_name.return_value,
"name": "foo_name",
"tenant_id": "foo_tenant",
"status": "foo_status",
"router:external": "foo_external",
"subnets": "foo_subnets"}
expected_net = {"id": "foo_id",
"name": self.owner.generate_random_name.return_value,
"name": "foo_name",
"tenant_id": "foo_tenant",
"status": "foo_status",
"external": "foo_external",
"router_id": None,
"subnets": "foo_subnets"}
wrap.client.show_network.return_value = {"network": neutron_net}
net = wrap.get_network(net_id="foo_id")
self._nc.show_network.return_value = {"network": neutron_net}
net = self.wrapper.get_network(net_id="foo_id")
self.assertEqual(expected_net, net)
wrap.client.show_network.assert_called_once_with("foo_id")
self._nc.show_network.assert_called_once_with("foo_id")
wrap.client.show_network.side_effect = (
self._nc.show_network.side_effect = (
neutron_exceptions.NeutronClientException)
self.assertRaises(network.NetworkWrapperException, wrap.get_network,
self.assertRaises(network.NetworkWrapperException,
self.wrapper.get_network,
net_id="foo_id")
wrap.client.list_networks.return_value = {"networks": [neutron_net]}
net = wrap.get_network(name="foo_name")
self._nc.list_networks.return_value = {"networks": [neutron_net]}
net = self.wrapper.get_network(name="foo_name")
self.assertEqual(expected_net, net)
wrap.client.list_networks.assert_called_once_with(name="foo_name")
self._nc.list_networks.assert_called_once_with(name="foo_name")
wrap.client.list_networks.return_value = {"networks": []}
self.assertRaises(network.NetworkWrapperException, wrap.get_network,
self._nc.list_networks.return_value = {"networks": []}
self.assertRaises(network.NetworkWrapperException,
self.wrapper.get_network,
name="foo_name")
def test_create_v1_pool(self):
subnet = "subnet_id"
tenant = "foo_tenant"
service = self.get_wrapper()
expected_pool = {"pool": {
"id": "pool_id",
"name": self.owner.generate_random_name.return_value,
"subnet_id": subnet,
"tenant_id": tenant}}
service.client.create_pool.return_value = expected_pool
resultant_pool = service.create_v1_pool(tenant, subnet)
service.client.create_pool.assert_called_once_with({
self.wrapper.client.create_pool.return_value = expected_pool
resultant_pool = self.wrapper.create_v1_pool(tenant, subnet)
self.wrapper.client.create_pool.assert_called_once_with({
"pool": {"lb_method": "ROUND_ROBIN",
"subnet_id": subnet,
"tenant_id": tenant,
@ -121,13 +124,12 @@ class NeutronWrapperTestCase(test.TestCase):
self.assertEqual(expected_pool, resultant_pool)
def test_create_network(self):
service = self.get_wrapper()
service.client.create_network.return_value = {
self._nc.create_network.return_value = {
"network": {"id": "foo_id",
"name": self.owner.generate_random_name.return_value,
"status": "foo_status"}}
net = service.create_network("foo_tenant")
service.client.create_network.assert_called_once_with({
net = self.wrapper.create_network("foo_tenant")
self._nc.create_network.assert_called_once_with({
"network": {"tenant_id": "foo_tenant",
"name": self.owner.generate_random_name.return_value}})
self.assertEqual({"id": "foo_id",
@ -140,22 +142,18 @@ class NeutronWrapperTestCase(test.TestCase):
def test_create_network_with_subnets(self):
subnets_num = 4
service = self.get_wrapper()
subnets_cidrs = iter(range(subnets_num))
subnets_ids = iter(range(subnets_num))
service._generate_cidr = mock.Mock(
side_effect=lambda v: "cidr-%d" % next(subnets_cidrs))
service.client.create_subnet = mock.Mock(
side_effect=lambda i: {
"subnet": {"id": "subnet-%d" % next(subnets_ids)}})
service.client.create_network.return_value = {
self._nc.create_subnet.side_effect = lambda i: {
"subnet": {"id": "subnet-%d" % next(subnets_ids)}}
self._nc.create_network.return_value = {
"network": {"id": "foo_id",
"name": self.owner.generate_random_name.return_value,
"status": "foo_status"}}
net = service.create_network("foo_tenant", subnets_num=subnets_num)
net = self.wrapper.create_network("foo_tenant",
subnets_num=subnets_num)
service.client.create_network.assert_called_once_with({
self._nc.create_network.assert_called_once_with({
"network": {"tenant_id": "foo_tenant",
"name": self.owner.generate_random_name.return_value}})
self.assertEqual({"id": "foo_id",
@ -167,25 +165,24 @@ class NeutronWrapperTestCase(test.TestCase):
"subnets": ["subnet-%d" % i
for i in range(subnets_num)]}, net)
self.assertEqual(
service.client.create_subnet.mock_calls,
[mock.call({"subnet":
{"name": self.owner.generate_random_name.return_value,
"enable_dhcp": True,
"network_id": "foo_id",
"tenant_id": "foo_tenant",
"ip_version": service.SUBNET_IP_VERSION,
"ip_version": self.wrapper.SUBNET_IP_VERSION,
"dns_nameservers": ["8.8.8.8", "8.8.4.4"],
"cidr": "cidr-%d" % i}})
for i in range(subnets_num)])
"cidr": mock.ANY}})
for i in range(subnets_num)],
self.wrapper.client.create_subnet.call_args_list
)
def test_create_network_with_router(self):
service = self.get_wrapper()
service.create_router = mock.Mock(return_value={"id": "foo_router"})
service.client.create_network.return_value = {
self._nc.create_router.return_value = {"router": {"id": "foo_router"}}
self._nc.create_network.return_value = {
"network": {"id": "foo_id",
"name": self.owner.generate_random_name.return_value,
"status": "foo_status"}}
net = service.create_network("foo_tenant", add_router=True)
net = self.wrapper.create_network("foo_tenant", add_router=True)
self.assertEqual({"id": "foo_id",
"name": self.owner.generate_random_name.return_value,
"status": "foo_status",
@ -193,23 +190,25 @@ class NeutronWrapperTestCase(test.TestCase):
"tenant_id": "foo_tenant",
"router_id": "foo_router",
"subnets": []}, net)
service.create_router.assert_called_once_with(external=True,
tenant_id="foo_tenant")
self._nc.create_router.assert_called_once_with({
"router": {
"name": self.owner.generate_random_name(),
"tenant_id": "foo_tenant"
}
})
def test_create_network_with_router_and_subnets(self):
subnets_num = 4
service = self.get_wrapper()
service._generate_cidr = mock.Mock(return_value="foo_cidr")
service.create_router = mock.Mock(return_value={"id": "foo_router"})
service.client.create_subnet = mock.Mock(
return_value={"subnet": {"id": "foo_subnet"}})
service.client.create_network.return_value = {
self.wrapper._generate_cidr = mock.Mock(return_value="foo_cidr")
self._nc.create_router.return_value = {"router": {"id": "foo_router"}}
self._nc.create_subnet.return_value = {"subnet": {"id": "foo_subnet"}}
self._nc.create_network.return_value = {
"network": {"id": "foo_id",
"name": self.owner.generate_random_name.return_value,
"status": "foo_status"}}
net = service.create_network("foo_tenant", add_router=True,
subnets_num=subnets_num,
dns_nameservers=["foo_nameservers"])
net = self.wrapper.create_network(
"foo_tenant", add_router=True, subnets_num=subnets_num,
dns_nameservers=["foo_nameservers"])
self.assertEqual({"id": "foo_id",
"name": self.owner.generate_random_name.return_value,
"status": "foo_status",
@ -217,76 +216,70 @@ class NeutronWrapperTestCase(test.TestCase):
"tenant_id": "foo_tenant",
"router_id": "foo_router",
"subnets": ["foo_subnet"] * subnets_num}, net)
service.create_router.assert_called_once_with(external=True,
tenant_id="foo_tenant")
self._nc.create_router.assert_called_once_with(
{"router": {"name": self.owner.generate_random_name.return_value,
"tenant_id": "foo_tenant"}})
self.assertEqual(
service.client.create_subnet.mock_calls,
[mock.call({"subnet":
{"name": self.owner.generate_random_name.return_value,
"enable_dhcp": True,
"network_id": "foo_id",
"tenant_id": "foo_tenant",
"ip_version": service.SUBNET_IP_VERSION,
"dns_nameservers": ["foo_nameservers"],
"cidr": "foo_cidr"}})] * subnets_num)
self.assertEqual(service.client.add_interface_router.mock_calls,
[
mock.call(
{"subnet": {
"name": self.owner.generate_random_name.return_value,
"network_id": "foo_id",
"tenant_id": "foo_tenant",
"ip_version": self.wrapper.SUBNET_IP_VERSION,
"dns_nameservers": ["foo_nameservers"],
"cidr": mock.ANY
}}
)
] * subnets_num,
self._nc.create_subnet.call_args_list,
)
self.assertEqual(self._nc.add_interface_router.call_args_list,
[mock.call("foo_router", {"subnet_id": "foo_subnet"})
for i in range(subnets_num)])
@mock.patch("rally_openstack.common.wrappers.network.NeutronWrapper"
".supports_extension", return_value=(False, ""))
def test_delete_network(self, mock_neutron_wrapper_supports_extension):
service = self.get_wrapper()
service.client.list_ports.return_value = {"ports": []}
service.client.list_subnets.return_value = {"subnets": []}
service.client.delete_network.return_value = "foo_deleted"
result = service.delete_network({"id": "foo_id", "router_id": None,
"subnets": []})
self.assertEqual("foo_deleted", result)
self.assertEqual([], service.client.remove_gateway_router.mock_calls)
self.assertEqual(
[], service.client.remove_interface_router.mock_calls)
self.assertEqual([], service.client.delete_router.mock_calls)
self.assertEqual([], service.client.delete_subnet.mock_calls)
service.client.delete_network.assert_called_once_with("foo_id")
def test_delete_v1_pool(self):
service = self.get_wrapper()
pool = {"pool": {"id": "pool-id"}}
service.delete_v1_pool(pool["pool"]["id"])
service.client.delete_pool.assert_called_once_with("pool-id")
self.wrapper.delete_v1_pool(pool["pool"]["id"])
self.wrapper.client.delete_pool.assert_called_once_with("pool-id")
@mock.patch("rally_openstack.common.wrappers.network.NeutronWrapper"
".supports_extension", return_value=(True, ""))
def test_delete_network_with_dhcp_and_router_and_ports_and_subnets(
self, mock_neutron_wrapper_supports_extension):
def test_delete_network(self):
self._nc.list_ports.return_value = {"ports": []}
self._nc.list_subnets.return_value = {"subnets": []}
self._nc.delete_network.return_value = "foo_deleted"
self.wrapper.delete_network(
{"id": "foo_id", "router_id": None, "subnets": [], "name": "x",
"status": "y", "external": False})
self.assertFalse(self._nc.remove_gateway_router.called)
self.assertFalse(self._nc.remove_interface_router.called)
self.assertFalse(self._nc.client.delete_router.called)
self.assertFalse(self._nc.client.delete_subnet.called)
self._nc.delete_network.assert_called_once_with("foo_id")
def test_delete_network_with_router_and_ports_and_subnets(self):
service = self.get_wrapper()
agents = ["foo_agent", "bar_agent"]
subnets = ["foo_subnet", "bar_subnet"]
ports = [{"id": "foo_port", "device_owner": "network:router_interface",
"device_id": "rounttter"},
{"id": "bar_port", "device_owner": "network:dhcp"}]
service.client.list_dhcp_agent_hosting_networks.return_value = (
{"agents": [{"id": agent_id} for agent_id in agents]})
service.client.list_ports.return_value = ({"ports": ports})
service.client.list_subnets.return_value = (
self._nc.list_ports.return_value = ({"ports": ports})
self._nc.list_subnets.return_value = (
{"subnets": [{"id": id_} for id_ in subnets]})
service.client.delete_network.return_value = "foo_deleted"
result = service.delete_network(
self.wrapper.delete_network(
{"id": "foo_id", "router_id": "foo_router", "subnets": subnets,
"lb_pools": []})
"lb_pools": [], "name": "foo", "status": "x", "external": False})
self.assertEqual("foo_deleted", result)
self.assertEqual(service.client.remove_gateway_router.mock_calls,
self.assertEqual(self._nc.remove_gateway_router.mock_calls,
[mock.call("foo_router")])
service.client.delete_port.assert_called_once_with(ports[1]["id"])
service.client.remove_interface_router.assert_called_once_with(
self._nc.delete_port.assert_called_once_with(ports[1]["id"])
self._nc.remove_interface_router.assert_called_once_with(
ports[0]["device_id"], {"port_id": ports[0]["id"]})
self.assertEqual(service.client.delete_subnet.mock_calls,
[mock.call(subnet_id) for subnet_id in subnets])
service.client.delete_network.assert_called_once_with("foo_id")
self.assertEqual(
[mock.call(subnet_id) for subnet_id in subnets],
self._nc.delete_subnet.call_args_list
)
self._nc.delete_network.assert_called_once_with("foo_id")
@ddt.data({"exception_type": neutron_exceptions.NotFound,
"should_raise": False},
@ -295,193 +288,153 @@ class NeutronWrapperTestCase(test.TestCase):
{"exception_type": KeyError,
"should_raise": True})
@ddt.unpack
@mock.patch("rally_openstack.common.wrappers.network.NeutronWrapper"
".supports_extension", return_value=(True, ""))
def test_delete_network_with_router_throw_exception(
self, mock_neutron_wrapper_supports_extension, exception_type,
should_raise):
self, exception_type, should_raise):
# Ensure cleanup context still move forward even
# remove_interface_router throw NotFound/BadRequest exception
service = self.get_wrapper()
service.client.remove_interface_router.side_effect = exception_type
agents = ["foo_agent", "bar_agent"]
self._nc.remove_interface_router.side_effect = exception_type
subnets = ["foo_subnet", "bar_subnet"]
ports = [{"id": "foo_port", "device_owner": "network:router_interface",
"device_id": "rounttter"},
{"id": "bar_port", "device_owner": "network:dhcp"}]
service.client.list_dhcp_agent_hosting_networks.return_value = (
{"agents": [{"id": agent_id} for agent_id in agents]})
service.client.list_ports.return_value = ({"ports": ports})
service.client.delete_network.return_value = "foo_deleted"
service.client.list_subnets.return_value = {"subnets": [
self._nc.list_ports.return_value = {"ports": ports}
self._nc.list_subnets.return_value = {"subnets": [
{"id": id_} for id_ in subnets]}
if should_raise:
self.assertRaises(exception_type, service.delete_network,
{"id": "foo_id", "router_id": "foo_router",
"subnets": subnets, "lb_pools": []})
self.assertNotEqual(service.client.delete_subnet.mock_calls,
[mock.call(subnet_id) for subnet_id in
subnets])
self.assertFalse(service.client.delete_network.called)
self.assertRaises(
exception_type, self.wrapper.delete_network,
{"id": "foo_id", "name": "foo", "router_id": "foo_router",
"subnets": subnets, "lb_pools": [], "status": "xxx",
"external": False})
self.assertFalse(self._nc.delete_subnet.called)
self.assertFalse(self._nc.delete_network.called)
else:
result = service.delete_network(
{"id": "foo_id", "router_id": "foo_router", "subnets": subnets,
"lb_pools": []})
self.wrapper.delete_network(
{"id": "foo_id", "name": "foo", "status": "xxx",
"router_id": "foo_router", "subnets": subnets,
"lb_pools": [], "external": False})
self.assertEqual("foo_deleted", result)
service.client.delete_port.assert_called_once_with(ports[1]["id"])
service.client.remove_interface_router.assert_called_once_with(
self._nc.delete_port.assert_called_once_with(ports[1]["id"])
self._nc.remove_interface_router.assert_called_once_with(
ports[0]["device_id"], {"port_id": ports[0]["id"]})
self.assertEqual(service.client.delete_subnet.mock_calls,
[mock.call(subnet_id) for subnet_id in subnets])
service.client.delete_network.assert_called_once_with("foo_id")
self.assertEqual(
[mock.call(subnet_id) for subnet_id in subnets],
self._nc.delete_subnet.call_args_list
)
self._nc.delete_network.assert_called_once_with("foo_id")
self.assertEqual(service.client.remove_gateway_router.mock_calls,
[mock.call("foo_router")])
self._nc.remove_gateway_router.assert_called_once_with(
"foo_router")
def test_list_networks(self):
service = self.get_wrapper()
service.client.list_networks.return_value = {"networks": "foo_nets"}
self.assertEqual("foo_nets", service.list_networks())
service.client.list_networks.assert_called_once_with()
self._nc.list_networks.return_value = {"networks": "foo_nets"}
self.assertEqual("foo_nets", self.wrapper.list_networks())
self._nc.list_networks.assert_called_once_with()
@mock.patch(SVC + "NeutronWrapper.external_networks")
def test_create_floating_ip(self, mock_neutron_wrapper_external_networks):
wrap = self.get_wrapper()
wrap.create_port = mock.Mock(return_value={"id": "port_id"})
wrap.client.create_floatingip = mock.Mock(
return_value={"floatingip": {"id": "fip_id",
"floating_ip_address": "fip_ip"}})
def test_create_floating_ip(self):
self._nc.create_port.return_value = {"port": {"id": "port_id"}}
self._nc.create_floatingip.return_value = {
"floatingip": {"id": "fip_id", "floating_ip_address": "fip_ip"}}
self.assertRaises(ValueError, wrap.create_floating_ip)
self.assertRaises(ValueError, self.wrapper.create_floating_ip)
mock_neutron_wrapper_external_networks.__get__ = lambda *args: []
self._nc.list_networks.return_value = {"networks": []}
self.assertRaises(network.NetworkWrapperException,
wrap.create_floating_ip, tenant_id="foo_tenant")
self.wrapper.create_floating_ip,
tenant_id="foo_tenant")
mock_neutron_wrapper_external_networks.__get__ = (
lambda *args: [{"id": "ext_id"}]
)
fip = wrap.create_floating_ip(tenant_id="foo_tenant",
port_id="port_id")
self._nc.list_networks.return_value = {"networks": [{"id": "ext_id"}]}
fip = self.wrapper.create_floating_ip(
tenant_id="foo_tenant", port_id="port_id")
self.assertEqual({"id": "fip_id", "ip": "fip_ip"}, fip)
wrap.get_network = mock.Mock(
return_value={"id": "foo_net", "external": True})
wrap.create_floating_ip(tenant_id="foo_tenant", ext_network="ext_net",
port_id="port_id")
self._nc.list_networks.return_value = {"networks": [
{"id": "ext_net_id", "name": "ext_net", "router:external": True}]}
self.wrapper.create_floating_ip(
tenant_id="foo_tenant", ext_network="ext_net", port_id="port_id")
wrap.get_network = mock.Mock(
return_value={"id": "foo_net", "external": False})
wrap.create_floating_ip(tenant_id="foo_tenant", port_id="port_id")
self.assertRaises(network.NetworkWrapperException,
wrap.create_floating_ip, tenant_id="foo_tenant",
ext_network="ext_net")
self.assertRaises(
network.NetworkWrapperException,
self.wrapper.create_floating_ip, tenant_id="foo_tenant",
ext_network="ext_net_2")
def test_delete_floating_ip(self):
wrap = self.get_wrapper()
wrap.delete_floating_ip("fip_id")
wrap.delete_floating_ip("fip_id", ignored_kwarg="bar")
self.wrapper.delete_floating_ip("fip_id")
self.wrapper.delete_floating_ip("fip_id", ignored_kwarg="bar")
self.assertEqual([mock.call("fip_id")] * 2,
wrap.client.delete_floatingip.mock_calls)
self._nc.delete_floatingip.call_args_list)
@mock.patch(SVC + "NeutronWrapper.external_networks")
def test_create_router(self, mock_neutron_wrapper_external_networks):
wrap = self.get_wrapper()
wrap.client.create_router.return_value = {"router": "foo_router"}
wrap.client.list_extensions.return_value = {
def test_create_router(self):
self._nc.create_router.return_value = {"router": "foo_router"}
self._nc.list_extensions.return_value = {
"extensions": [{"alias": "ext-gw-mode"}]}
mock_neutron_wrapper_external_networks.__get__ = (
lambda *args: [{"id": "ext_id"}]
)
self._nc.list_networks.return_value = {"networks": [{"id": "ext_id"}]}
router = wrap.create_router()
wrap.client.create_router.assert_called_once_with(
router = self.wrapper.create_router()
self._nc.create_router.assert_called_once_with(
{"router": {"name": self.owner.generate_random_name.return_value}})
self.assertEqual("foo_router", router)
router = wrap.create_router(external=True, foo="bar")
wrap.client.create_router.assert_called_with(
self.wrapper.create_router(external=True, flavor_id="bar")
self._nc.create_router.assert_called_with(
{"router": {"name": self.owner.generate_random_name.return_value,
"external_gateway_info": {
"network_id": "ext_id",
"enable_snat": True},
"foo": "bar"}})
"flavor_id": "bar"}})
@mock.patch(SVC + "NeutronWrapper.external_networks")
def test_create_router_without_ext_gw_mode_extension(
self, mock_neutron_wrapper_external_networks):
wrap = self.get_wrapper()
wrap.client.create_router.return_value = {"router": "foo_router"}
wrap.client.list_extensions.return_value = {"extensions": []}
mock_neutron_wrapper_external_networks.__get__ = (
lambda *args: [{"id": "ext_id"}]
)
def test_create_router_without_ext_gw_mode_extension(self):
self._nc.create_router.return_value = {"router": "foo_router"}
self._nc.list_extensions.return_value = {"extensions": []}
self._nc.list_networks.return_value = {"networks": [{"id": "ext_id"}]}
router = wrap.create_router()
wrap.client.create_router.assert_called_once_with(
router = self.wrapper.create_router()
self._nc.create_router.assert_called_once_with(
{"router": {"name": self.owner.generate_random_name.return_value}})
self.assertEqual(router, "foo_router")
router = wrap.create_router(external=True, foo="bar")
wrap.client.create_router.assert_called_with(
self.wrapper.create_router(external=True, flavor_id="bar")
self._nc.create_router.assert_called_with(
{"router": {"name": self.owner.generate_random_name.return_value,
"external_gateway_info": {"network_id": "ext_id"},
"foo": "bar"}})
"flavor_id": "bar"}})
def test_create_port(self):
wrap = self.get_wrapper()
wrap.client.create_port.return_value = {"port": "foo_port"}
self._nc.create_port.return_value = {"port": "foo_port"}
port = wrap.create_port("foo_net")
wrap.client.create_port.assert_called_once_with(
port = self.wrapper.create_port("foo_net")
self._nc.create_port.assert_called_once_with(
{"port": {"network_id": "foo_net",
"name": self.owner.generate_random_name.return_value}})
self.assertEqual("foo_port", port)
port = wrap.create_port("foo_net", foo="bar")
wrap.client.create_port.assert_called_with(
port = self.wrapper.create_port("foo_net", foo="bar")
self.wrapper.client.create_port.assert_called_with(
{"port": {"network_id": "foo_net",
"name": self.owner.generate_random_name.return_value,
"foo": "bar"}})
def test_supports_extension(self):
wrap = self.get_wrapper()
wrap.client.list_extensions.return_value = (
self._nc.list_extensions.return_value = (
{"extensions": [{"alias": "extension"}]})
self.assertTrue(wrap.supports_extension("extension")[0])
self.assertTrue(self.wrapper.supports_extension("extension")[0])
wrap.client.list_extensions.return_value = (
self.wrapper.neutron._cached_supported_extensions = None
self._nc.list_extensions.return_value = (
{"extensions": [{"alias": "extension"}]})
self.assertFalse(wrap.supports_extension("dummy-group")[0])
self.assertFalse(self.wrapper.supports_extension("dummy-group")[0])
wrap.client.list_extensions.return_value = {}
self.assertFalse(wrap.supports_extension("extension")[0])
self.wrapper.neutron._cached_supported_extensions = None
self._nc.list_extensions.return_value = {"extensions": []}
self.assertFalse(self.wrapper.supports_extension("extension")[0])
class FunctionsTestCase(test.TestCase):
def test_generate_cidr(self):
with mock.patch("rally_openstack.common.wrappers.network.cidr_incr",
iter(range(1, 4))):
self.assertEqual("10.2.1.0/24", network.generate_cidr())
self.assertEqual("10.2.2.0/24", network.generate_cidr())
self.assertEqual("10.2.3.0/24", network.generate_cidr())
with mock.patch("rally_openstack.common.wrappers.network.cidr_incr",
iter(range(1, 4))):
start_cidr = "1.1.0.0/26"
self.assertEqual("1.1.0.64/26", network.generate_cidr(start_cidr))
self.assertEqual("1.1.0.128/26", network.generate_cidr(start_cidr))
self.assertEqual("1.1.0.192/26", network.generate_cidr(start_cidr))
def test_wrap(self):
mock_clients = mock.Mock()
mock_clients.nova().networks.list.return_value = []
config = {"fakearg": "fake"}
owner = Owner()

View File

@ -149,16 +149,6 @@ class NeutronMixinTestCase(test.TestCase):
neut.user = mock.MagicMock()
self.assertEqual(neut.user.neutron.return_value, neut._manager())
@mock.patch("%s.NeutronMixin._manager" % BASE)
def test_supports_extension(self, mock__manager):
mock__manager().list_extensions.return_value = {
"extensions": [{"alias": "foo"}, {"alias": "bar"}]
}
neut = self.get_neutron_mixin()
self.assertTrue(neut.supports_extension("foo"))
self.assertTrue(neut.supports_extension("bar"))
self.assertFalse(neut.supports_extension("foobar"))
def test_id(self):
neut = self.get_neutron_mixin()
neut.raw_resource = {"id": "test"}
@ -200,11 +190,12 @@ class NeutronLbaasV1MixinTestCase(test.TestCase):
def get_neutron_lbaasv1_mixin(self, extensions=None):
if extensions is None:
extensions = []
neut = resources.NeutronLbaasV1Mixin()
user = mock.MagicMock()
neut = resources.NeutronLbaasV1Mixin(user=user)
neut._service = "neutron"
neut._resource = "some_resource"
neut._manager = mock.Mock()
neut._manager().list_extensions.return_value = {
user.neutron.return_value.list_extensions.return_value = {
"extensions": [{"alias": ext} for ext in extensions]
}
return neut
@ -234,11 +225,13 @@ class NeutronLbaasV2MixinTestCase(test.TestCase):
def get_neutron_lbaasv2_mixin(self, extensions=None):
if extensions is None:
extensions = []
neut = resources.NeutronLbaasV2Mixin()
user = mock.MagicMock()
neut = resources.NeutronLbaasV2Mixin(user=user)
neut._service = "neutron"
neut._resource = "some_resource"
neut._manager = mock.Mock()
neut._manager().list_extensions.return_value = {
user.neutron.return_value.list_extensions.return_value = {
"extensions": [{"alias": ext} for ext in extensions]
}
return neut
@ -310,7 +303,8 @@ class NeutronBgpvpnTestCase(test.TestCase):
admin = mock.Mock()
neut = resources.NeutronBgpvpn(admin=admin)
neut._manager = mock.Mock()
neut._manager().list_extensions.return_value = {
nc = admin.neutron.return_value
nc.list_extensions.return_value = {
"extensions": [{"alias": ext} for ext in extensions]
}
return neut

File diff suppressed because it is too large Load Diff

View File

@ -534,8 +534,10 @@ class NovaScenarioTestCase(test.ScenarioTestCase):
"nova.get_console_url_server")
def test__associate_floating_ip(self):
nova_scenario = utils.NovaScenario(context=self.context)
neutronclient = nova_scenario.clients("neutron")
clients = mock.MagicMock()
nova_scenario = utils.NovaScenario(context=self.context,
clients=clients)
neutronclient = clients.neutron.return_value
neutronclient.list_ports.return_value = {"ports": [{"id": "p1"},
{"id": "p2"}]}
@ -568,8 +570,10 @@ class NovaScenarioTestCase(test.ScenarioTestCase):
"nova.associate_floating_ip", count=2)
def test__associate_floating_ip_deprecated_behavior(self):
nova_scenario = utils.NovaScenario(context=self.context)
neutronclient = nova_scenario.clients("neutron")
clients = mock.MagicMock()
nova_scenario = utils.NovaScenario(context=self.context,
clients=clients)
neutronclient = clients.neutron.return_value
neutronclient.list_ports.return_value = {"ports": [{"id": "p1"},
{"id": "p2"}]}
@ -587,7 +591,7 @@ class NovaScenarioTestCase(test.ScenarioTestCase):
)
neutronclient.list_floatingips.assert_called_once_with(
tenant_id="fake_tenant")
floating_ip_address=fip_ip)
# it is an old behavior. let's check that it was not called
self.assertFalse(self.server.add_floating_ip.called)
@ -596,8 +600,10 @@ class NovaScenarioTestCase(test.ScenarioTestCase):
"nova.associate_floating_ip")
def test__dissociate_floating_ip(self):
nova_scenario = utils.NovaScenario(context=self.context)
neutronclient = nova_scenario.clients("neutron")
clients = mock.MagicMock()
nova_scenario = utils.NovaScenario(context=self.context,
clients=clients)
neutronclient = clients.neutron.return_value
fip_ip = "172.168.0.1"
fip_id = "some"
@ -628,8 +634,10 @@ class NovaScenarioTestCase(test.ScenarioTestCase):
"nova.dissociate_floating_ip", count=2)
def test__disassociate_floating_ip_deprecated_behavior(self):
nova_scenario = utils.NovaScenario(context=self.context)
neutronclient = nova_scenario.clients("neutron")
clients = mock.MagicMock()
nova_scenario = utils.NovaScenario(context=self.context,
clients=clients)
neutronclient = clients.neutron.return_value
fip_id = "fip1"
fip_ip = "172.168.0.1"
@ -645,7 +653,7 @@ class NovaScenarioTestCase(test.ScenarioTestCase):
)
neutronclient.list_floatingips.assert_called_once_with(
tenant_id="fake_tenant")
floating_ip_address=fip_ip)
self._test_atomic_action_timer(nova_scenario.atomic_actions(),
"nova.dissociate_floating_ip")