Routed networks IPv4 inventory in Nova GRP

Using Nova's generic resource pools (GRP) API, publish routed networks
IPv4 inventory. This inventory is then used by the Nova scheduler to
place instances based on the availability of IPv4 addresses in
routed networks segments.

Change-Id: Ib6b00c4889d6a34765844ce46280819dff0108c5
Partially-Implements: blueprint routed-networks
This commit is contained in:
Miguel Lavalle 2016-08-22 08:45:25 -05:00
parent ebe62dcd33
commit 5cbdd10b21
6 changed files with 932 additions and 10 deletions

View File

@ -26,6 +26,7 @@ ROUTER_INTERFACE = 'router_interface'
SECURITY_GROUP = 'security_group'
SECURITY_GROUP_RULE = 'security_group_rule'
SEGMENT = 'segment'
SEGMENT_HOST_MAPPING = 'segment_host_mapping'
SUBNET = 'subnet'
SUBNETS = 'subnets'
SUBNET_GATEWAY = 'subnet_gateway'

View File

@ -41,6 +41,10 @@ class NetworkQosBindingNotFound(e.NotFound):
"could not be found.")
class PlacementEndpointNotFound(e.NotFound):
message = _("Placement API endpoint not found")
class PlacementResourceProviderNotFound(e.NotFound):
message = _("Placement resource provider not found %(resource_provider)s.")

View File

@ -275,6 +275,9 @@ def _update_segment_host_mapping_for_agent(resource, event, trigger,
segment['id'] for segment in segments
if check_segment_for_agent(segment, agent)}
update_segment_host_mapping(context, host, current_segment_ids)
registry.notify(resources.SEGMENT_HOST_MAPPING, events.AFTER_CREATE,
plugin, context=context, host=host,
current_segment_ids=current_segment_ids)
def _add_segment_host_mapping_for_segment(resource, event, trigger,

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
from keystoneauth1 import exceptions as ks_exc
from keystoneauth1 import loading as ks_loading
from keystoneauth1 import session
@ -27,6 +29,16 @@ LOG = logging.getLogger(__name__)
PLACEMENT_API_WITH_AGGREGATES = 'placement 1.1'
def check_placement_api_available(f):
@functools.wraps(f)
def wrapper(self, *a, **k):
try:
return f(self, *a, **k)
except ks_exc.EndpointNotFound:
raise n_exc.PlacementEndpointNotFound()
return wrapper
class PlacementAPIClient(object):
"""Client class for placement ReST API."""
@ -55,6 +67,7 @@ class PlacementAPIClient(object):
return self._client.delete(url, endpoint_filter=self.ks_filter,
**kwargs)
@check_placement_api_available
def create_resource_provider(self, resource_provider):
"""Create a resource provider.
@ -64,6 +77,7 @@ class PlacementAPIClient(object):
url = '/resource_providers'
self._post(url, resource_provider)
@check_placement_api_available
def delete_resource_provider(self, resource_provider_uuid):
"""Delete a resource provider.
@ -73,6 +87,7 @@ class PlacementAPIClient(object):
url = '/resource_providers/%s' % resource_provider_uuid
self._delete(url)
@check_placement_api_available
def create_inventory(self, resource_provider_uuid, inventory):
"""Create an inventory.
@ -86,6 +101,7 @@ class PlacementAPIClient(object):
url = '/resource_providers/%s/inventories' % resource_provider_uuid
self._post(url, inventory)
@check_placement_api_available
def get_inventory(self, resource_provider_uuid, resource_class):
"""Get resource provider inventory.
@ -112,6 +128,7 @@ class PlacementAPIClient(object):
else:
raise
@check_placement_api_available
def update_inventory(self, resource_provider_uuid, inventory,
resource_class):
"""Update an inventory.
@ -134,6 +151,7 @@ class PlacementAPIClient(object):
resource_provider=resource_provider_uuid,
resource_class=resource_class)
@check_placement_api_available
def associate_aggregates(self, resource_provider_uuid, aggregates):
"""Associate a list of aggregates with a resource provider.
@ -147,6 +165,7 @@ class PlacementAPIClient(object):
headers={'openstack-api-version':
PLACEMENT_API_WITH_AGGREGATES})
@check_placement_api_available
def list_aggregates(self, resource_provider_uuid):
"""List resource provider aggregates.

View File

@ -14,20 +14,37 @@
# License for the specific language governing permissions and limitations
# under the License.
from keystoneauth1 import loading as ks_loading
import netaddr
from neutron_lib import constants
from neutron_lib.plugins import directory
from novaclient import client as nova_client
from novaclient import exceptions as nova_exc
from oslo_config import cfg
from oslo_log import log
from neutron._i18n import _
from neutron._i18n import _, _LE, _LI
from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import exceptions as n_exc
from neutron.db import common_db_mixin
from neutron.db import models_v2
from neutron.extensions import ip_allocation
from neutron.extensions import l2_adjacency
from neutron.extensions import segment
from neutron.notifiers import batch_notifier
from neutron.services.segments import db
from neutron.services.segments import exceptions
from neutron.services.segments import placement_client
LOG = log.getLogger(__name__)
NOVA_API_VERSION = '2.41'
IPV4_RESOURCE_CLASS = 'IPV4_ADDRESS'
SEGMENT_NAME_STUB = 'Neutron segment id %s'
MAX_INVENTORY_UPDATE_RETRIES = 10
def _extend_network_dict_binding(plugin, network_res, network_db):
@ -67,6 +84,7 @@ class Plugin(db.SegmentDbMixin, segment.SegmentPluginBase):
attributes.SUBNETS, [_extend_subnet_dict_binding])
common_db_mixin.CommonDbMixin.register_dict_extend_funcs(
attributes.PORTS, [_extend_port_dict_binding])
self.nova_updater = NovaSegmentNotifier()
registry.subscribe(
self._prevent_segment_delete_with_subnet_associated,
@ -90,3 +108,297 @@ class Plugin(db.SegmentDbMixin, segment.SegmentPluginBase):
reason = _("The segment is still associated with subnet(s) "
"%s") % ", ".join(subnet_ids)
raise exceptions.SegmentInUse(segment_id=segment_id, reason=reason)
class Event(object):
def __init__(self, method, segment_ids, total=None, reserved=None,
segment_host_mappings=None, host=None):
self.method = method
if isinstance(segment_ids, set):
self.segment_ids = segment_ids
else:
self.segment_id = segment_ids
self.total = total
self.reserved = reserved
self.segment_host_mappings = segment_host_mappings
self.host = host
class NovaSegmentNotifier(object):
def __init__(self):
self.p_client, self.n_client = self._get_clients()
self.batch_notifier = batch_notifier.BatchNotifier(
cfg.CONF.send_events_interval, self._send_notifications)
registry.subscribe(self._notify_subnet_created, resources.SUBNET,
events.AFTER_CREATE)
registry.subscribe(self._notify_subnet_updated, resources.SUBNET,
events.AFTER_UPDATE)
registry.subscribe(self._notify_subnet_deleted, resources.SUBNET,
events.AFTER_DELETE)
registry.subscribe(self._notify_host_addition_to_aggregate,
resources.SEGMENT_HOST_MAPPING, events.AFTER_CREATE)
registry.subscribe(self._notify_port_created_or_deleted,
resources.PORT, events.AFTER_CREATE)
registry.subscribe(self._notify_port_updated, resources.PORT,
events.AFTER_UPDATE)
registry.subscribe(self._notify_port_created_or_deleted,
resources.PORT, events.AFTER_DELETE)
def _get_clients(self):
p_client = placement_client.PlacementAPIClient()
n_auth = ks_loading.load_auth_from_conf_options(cfg.CONF, 'nova')
n_session = ks_loading.load_session_from_conf_options(
cfg.CONF,
'nova',
auth=n_auth)
extensions = [
ext for ext in nova_client.discover_extensions(NOVA_API_VERSION)
if ext.name == "server_external_events"]
n_client = nova_client.Client(
NOVA_API_VERSION,
session=n_session,
region_name=cfg.CONF.nova.region_name,
endpoint_type=cfg.CONF.nova.endpoint_type,
extensions=extensions)
return p_client, n_client
def _send_notifications(self, batched_events):
for event in batched_events:
try:
event.method(event)
except n_exc.PlacementEndpointNotFound:
LOG.debug('Placement API was not found when trying to '
'update routed networks IPv4 inventories')
return
def _notify_subnet_created(self, resource, event, trigger, context,
subnet, **kwargs):
segment_id = subnet.get('segment_id')
if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4:
return
total, reserved = self._calculate_inventory_total_and_reserved(subnet)
if total:
query = context.session.query(
db.SegmentHostMapping).filter_by(segment_id=segment_id)
self.batch_notifier.queue_event(Event(
self._create_or_update_nova_inventory, segment_id, total=total,
reserved=reserved, segment_host_mappings=query.all()))
def _create_or_update_nova_inventory(self, event):
try:
self._update_nova_inventory(event)
except n_exc.PlacementResourceProviderNotFound:
self._create_nova_inventory(event.segment_id, event.total,
event.reserved,
event.segment_host_mappings)
def _update_nova_inventory(self, event):
for count in range(MAX_INVENTORY_UPDATE_RETRIES):
ipv4_inventory = self.p_client.get_inventory(event.segment_id,
IPV4_RESOURCE_CLASS)
if event.total:
ipv4_inventory['total'] += event.total
if event.reserved:
ipv4_inventory['reserved'] += event.reserved
try:
self.p_client.update_inventory(event.segment_id,
ipv4_inventory,
IPV4_RESOURCE_CLASS)
return
except n_exc.PlacementInventoryUpdateConflict:
LOG.debug('Re-trying to update Nova IPv4 inventory for '
'routed network segment: %s', event.segment_id)
LOG.error(_LE('Failed to update Nova IPv4 inventory for routed '
'network segment: %s'), event.segment_id)
def _create_nova_inventory(self, segment_id, total, reserved,
segment_host_mappings):
name = SEGMENT_NAME_STUB % segment_id
resource_provider = {'name': name, 'uuid': segment_id}
self.p_client.create_resource_provider(resource_provider)
aggregate = self.n_client.aggregates.create(name, None)
self.p_client.associate_aggregates(segment_id, [aggregate.uuid])
for mapping in segment_host_mappings:
self.n_client.aggregates.add_host(aggregate.id, mapping['host'])
ipv4_inventory = {'total': total, 'reserved': reserved,
'min_unit': 1, 'max_unit': 1, 'step_size': 1,
'allocation_ratio': 1.0,
'resource_class': IPV4_RESOURCE_CLASS}
self.p_client.create_inventory(segment_id, ipv4_inventory)
def _calculate_inventory_total_and_reserved(self, subnet):
total = 0
reserved = 0
allocation_pools = subnet.get('allocation_pools') or []
for pool in allocation_pools:
total += int(netaddr.IPAddress(pool['end']) -
netaddr.IPAddress(pool['start'])) + 1
if total:
if subnet['gateway_ip']:
total += 1
reserved += 1
if subnet['enable_dhcp']:
reserved += 1
return total, reserved
def _notify_subnet_updated(self, resource, event, trigger, context,
subnet, original_subnet, **kwargs):
segment_id = subnet.get('segment_id')
if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4:
return
filters = {'segment_id': [segment_id],
'ip_version': [constants.IP_VERSION_4]}
if not subnet['allocation_pools']:
plugin = directory.get_plugin()
alloc_pools = [s['allocation_pools'] for s in
plugin.get_subnets(context, filters=filters)]
if not any(alloc_pools):
self.batch_notifier.queue_event(Event(
self._delete_nova_inventory, segment_id))
return
original_total, original_reserved = (
self._calculate_inventory_total_and_reserved(original_subnet))
updated_total, updated_reserved = (
self._calculate_inventory_total_and_reserved(subnet))
total = updated_total - original_total
reserved = updated_reserved - original_reserved
if total or reserved:
segment_host_mappings = None
if not original_subnet['allocation_pools']:
segment_host_mappings = context.session.query(
db.SegmentHostMapping).filter_by(
segment_id=segment_id).all()
self.batch_notifier.queue_event(Event(
self._create_or_update_nova_inventory, segment_id, total=total,
reserved=reserved,
segment_host_mappings=segment_host_mappings))
def _notify_subnet_deleted(self, resource, event, trigger, context,
subnet, **kwargs):
segment_id = subnet.get('segment_id')
if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4:
return
total, reserved = self._calculate_inventory_total_and_reserved(subnet)
if total:
filters = {'segment_id': [segment_id], 'ip_version': [4]}
plugin = directory.get_plugin()
if plugin.get_subnets_count(context, filters=filters) > 0:
self.batch_notifier.queue_event(Event(
self._update_nova_inventory, segment_id, total=-total,
reserved=-reserved))
else:
self.batch_notifier.queue_event(Event(
self._delete_nova_inventory, segment_id))
def _get_aggregate_id(self, segment_id):
aggregate_uuid = self.p_client.list_aggregates(
segment_id)['aggregates'][0]
aggregates = self.n_client.aggregates.list()
for aggregate in aggregates:
if aggregate.uuid == aggregate_uuid:
return aggregate.id
def _delete_nova_inventory(self, event):
aggregate_id = self._get_aggregate_id(event.segment_id)
aggregate = self.n_client.aggregates.get_details(
aggregate_id)
for host in aggregate.hosts:
self.n_client.aggregates.remove_host(aggregate_id,
host)
self.n_client.aggregates.delete(aggregate_id)
self.p_client.delete_resource_provider(event.segment_id)
def _notify_host_addition_to_aggregate(self, resource, event, trigger,
context, host, current_segment_ids,
**kwargs):
query = context.session.query(models_v2.Subnet).filter(
models_v2.Subnet.segment_id.in_(current_segment_ids))
segment_ids = {subnet['segment_id'] for subnet in query}
self.batch_notifier.queue_event(Event(self._add_host_to_aggregate,
segment_ids, host=host))
def _add_host_to_aggregate(self, event):
for segment_id in event.segment_ids:
try:
aggregate_id = self._get_aggregate_id(segment_id)
except n_exc.PlacementAggregateNotFound:
LOG.info(_LI('When adding host %(host)s, aggregate not found '
'for routed network segment %(segment_id)s'),
{'host': event.host, 'segment_id': segment_id})
continue
try:
self.n_client.aggregates.add_host(aggregate_id, event.host)
except nova_exc.Conflict:
LOG.info(_LI('Host %(host)s already exists in aggregate for '
'routed network segment %(segment_id)s'),
{'host': event.host, 'segment_id': segment_id})
def _notify_port_created_or_deleted(self, resource, event, trigger,
context, port, **kwargs):
if not self._does_port_require_nova_inventory_update(port):
return
ipv4_subnets_number, segment_id = (
self._get_ipv4_subnets_number_and_segment_id(port, context))
if segment_id:
if event == events.AFTER_DELETE:
ipv4_subnets_number = -ipv4_subnets_number
self.batch_notifier.queue_event(Event(self._update_nova_inventory,
segment_id, reserved=ipv4_subnets_number))
def _notify_port_updated(self, resource, event, trigger, context,
**kwargs):
port = kwargs.get('port')
original_port = kwargs.get('original_port')
does_original_port_require_nova_inventory_update = (
self._does_port_require_nova_inventory_update(original_port))
does_port_require_nova_inventory_update = (
self._does_port_require_nova_inventory_update(port))
if not (does_original_port_require_nova_inventory_update or
does_port_require_nova_inventory_update):
return
original_port_ipv4_subnets_number, segment_id = (
self._get_ipv4_subnets_number_and_segment_id(original_port,
context))
if not segment_id:
return
port_ipv4_subnets_number = len(self._get_ipv4_subnet_ids(port))
if not does_original_port_require_nova_inventory_update:
original_port_ipv4_subnets_number = 0
if not does_port_require_nova_inventory_update:
port_ipv4_subnets_number = 0
update = port_ipv4_subnets_number - original_port_ipv4_subnets_number
if update:
self.batch_notifier.queue_event(Event(self._update_nova_inventory,
segment_id, reserved=update))
def _get_ipv4_subnets_number_and_segment_id(self, port, context):
ipv4_subnet_ids = self._get_ipv4_subnet_ids(port)
if not ipv4_subnet_ids:
return 0, None
segment_id = context.session.query(
models_v2.Subnet).filter_by(id=ipv4_subnet_ids[0]).one()[
'segment_id']
if not segment_id:
return 0, None
return len(ipv4_subnet_ids), segment_id
def _does_port_require_nova_inventory_update(self, port):
device_owner = port.get('device_owner')
if (device_owner.startswith(constants.DEVICE_OWNER_COMPUTE_PREFIX) or
device_owner == constants.DEVICE_OWNER_DHCP):
return False
return True
def _get_ipv4_subnet_ids(self, port):
ipv4_subnet_ids = []
for ip in port.get('fixed_ips', []):
if netaddr.IPAddress(
ip['ip_address']).version == constants.IP_VERSION_4:
ipv4_subnet_ids.append(ip['subnet_id'])
return ipv4_subnet_ids

View File

@ -12,11 +12,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from keystoneauth1 import exceptions as ks_exc
import mock
import netaddr
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import directory
from novaclient import exceptions as nova_exc
from oslo_config import cfg
from oslo_utils import uuidutils
import webob.exc
@ -44,6 +48,7 @@ from neutron.plugins.ml2 import config
from neutron.services.segments import db
from neutron.services.segments import exceptions as segment_exc
from neutron.services.segments import placement_client
from neutron.services.segments import plugin as seg_plugin
from neutron.tests import base
from neutron.tests.common import helpers
from neutron.tests.unit.db import test_db_base_plugin_v2
@ -796,7 +801,8 @@ class TestMl2HostSegmentMappingAgentServerSynch(HostSegmentMappingTestCase):
mock_function.assert_not_called()
class TestSegmentAwareIpam(SegmentTestCase):
class SegmentAwareIpamTestCase(SegmentTestCase):
def _setup_host_mappings(self, mappings=()):
ctx = context.get_admin_context()
for segment_id, host in mappings:
@ -808,6 +814,13 @@ class TestSegmentAwareIpam(SegmentTestCase):
cidr='2001:db8:0:0::/64',
physnet='physnet'):
"""Creates one network with one segment and one subnet"""
network, segment = self._create_test_network_and_segment(network,
physnet)
subnet = self._create_test_subnet_with_segment(network, segment, cidr)
return network, segment, subnet
def _create_test_network_and_segment(self, network=None,
physnet='physnet'):
if not network:
with self.network() as network:
pass
@ -816,15 +829,29 @@ class TestSegmentAwareIpam(SegmentTestCase):
network_id=network['network']['id'],
physical_network=physnet,
network_type=p_constants.TYPE_VLAN)
return network, segment
def _create_test_subnet_with_segment(self, network, segment,
cidr='2001:db8:0:0::/64',
allocation_pools=None):
ip_version = netaddr.IPNetwork(cidr).version if cidr else None
with self.subnet(network=network,
segment_id=segment['segment']['id'],
ip_version=ip_version,
cidr=cidr) as subnet:
cidr=cidr,
allocation_pools=allocation_pools) as subnet:
self._validate_l2_adjacency(network['network']['id'],
is_adjacent=False)
return network, segment, subnet
return subnet
def _validate_l2_adjacency(self, network_id, is_adjacent):
request = self.new_show_request('networks', network_id)
response = self.deserialize(self.fmt, request.get_response(self.api))
self.assertEqual(is_adjacent,
response['network'][l2_adjacency.L2_ADJACENCY])
class TestSegmentAwareIpam(SegmentAwareIpamTestCase):
def _create_test_segments_with_subnets(self, num):
"""Creates one network with num segments and num subnets"""
@ -1101,12 +1128,6 @@ class TestSegmentAwareIpam(SegmentTestCase):
self.assertEqual(webob.exc.HTTPOk.code, response.status_int)
self._assert_one_ip_in_subnet(response, subnet['subnet']['cidr'])
def _validate_l2_adjacency(self, network_id, is_adjacent):
request = self.new_show_request('networks', network_id)
response = self.deserialize(self.fmt, request.get_response(self.api))
self.assertEqual(is_adjacent,
response['network'][l2_adjacency.L2_ADJACENCY])
def _validate_deferred_ip_allocation(self, port_id):
request = self.new_show_request('ports', port_id)
response = self.deserialize(self.fmt, request.get_response(self.api))
@ -1394,6 +1415,562 @@ class TestSegmentAwareIpamML2(TestSegmentAwareIpam):
super(TestSegmentAwareIpamML2, self).setUp(plugin='ml2')
class TestNovaSegmentNotifier(SegmentAwareIpamTestCase):
_mechanism_drivers = ['openvswitch', 'logger']
def setUp(self):
config.cfg.CONF.set_override('mechanism_drivers',
self._mechanism_drivers,
group='ml2')
config.cfg.CONF.set_override('network_vlan_ranges',
['physnet:200:209', 'physnet0:200:209',
'physnet1:200:209', 'physnet2:200:209'],
group='ml2_type_vlan')
super(TestNovaSegmentNotifier, self).setUp(plugin='ml2')
self.segments_plugin = directory.get_plugin(ext_segment.SEGMENTS)
nova_updater = self.segments_plugin.nova_updater
nova_updater.p_client = mock.MagicMock()
self.mock_p_client = nova_updater.p_client
nova_updater.n_client = mock.MagicMock()
self.mock_n_client = nova_updater.n_client
self.batch_notifier = nova_updater.batch_notifier
self.batch_notifier._waiting_to_send = True
def _calculate_inventory_total_and_reserved(self, subnet):
total = 0
reserved = 0
allocation_pools = subnet.get('allocation_pools') or []
for pool in allocation_pools:
total += int(netaddr.IPAddress(pool['end']) -
netaddr.IPAddress(pool['start'])) + 1
if total:
if subnet['gateway_ip']:
total += 1
reserved += 1
if subnet['enable_dhcp']:
reserved += 1
return total, reserved
def _assert_inventory_creation(self, segment_id, aggregate, subnet):
self.batch_notifier._notify()
self.mock_p_client.get_inventory.assert_called_with(
segment_id, seg_plugin.IPV4_RESOURCE_CLASS)
self.mock_p_client.update_inventory.assert_not_called()
name = seg_plugin.SEGMENT_NAME_STUB % segment_id
resource_provider = {'name': name, 'uuid': segment_id}
self.mock_p_client.create_resource_provider.assert_called_with(
resource_provider)
self.mock_n_client.aggregates.create.assert_called_with(name, None)
self.mock_p_client.associate_aggregates.assert_called_with(
segment_id, [aggregate.uuid])
self.mock_n_client.aggregates.add_host.assert_called_with(aggregate.id,
'fakehost')
total, reserved = self._calculate_inventory_total_and_reserved(
subnet['subnet'])
inventory, _ = self._get_inventory(total, reserved)
self.mock_p_client.create_inventory.assert_called_with(
segment_id, inventory)
self.assertEqual(
inventory['total'],
self.mock_p_client.create_inventory.call_args[0][1]['total'])
self.assertEqual(
inventory['reserved'],
self.mock_p_client.create_inventory.call_args[0][1]['reserved'])
self.mock_p_client.reset_mock()
self.mock_p_client.get_inventory.side_effect = None
self.mock_n_client.reset_mock()
def _test_first_subnet_association_with_segment(self, cidr='10.0.0.0/24',
allocation_pools=None):
network, segment = self._create_test_network_and_segment()
segment_id = segment['segment']['id']
self._setup_host_mappings([(segment_id, 'fakehost')])
self.mock_p_client.get_inventory.side_effect = (
neutron_exc.PlacementResourceProviderNotFound(
resource_provider=segment_id,
resource_class=seg_plugin.IPV4_RESOURCE_CLASS))
aggregate = mock.MagicMock()
aggregate.uuid = uuidutils.generate_uuid()
aggregate.id = 1
self.mock_n_client.aggregates.create.return_value = aggregate
subnet = self._create_test_subnet_with_segment(
network, segment, cidr=cidr, allocation_pools=allocation_pools)
self._assert_inventory_creation(segment_id, aggregate, subnet)
return network, segment, subnet
def test_first_subnet_association_with_segment(self):
self._test_first_subnet_association_with_segment()
def _assert_inventory_update(self, segment_id, inventory, subnet=None,
original_subnet=None):
self.batch_notifier._notify()
self.mock_p_client.get_inventory.assert_called_with(
segment_id, seg_plugin.IPV4_RESOURCE_CLASS)
original_total = original_reserved = total = reserved = 0
if original_subnet:
original_total, original_reserved = (
self._calculate_inventory_total_and_reserved(original_subnet))
if subnet:
total, reserved = self._calculate_inventory_total_and_reserved(
subnet)
inventory['total'] += total - original_total
inventory['reserved'] += reserved - original_reserved
self.mock_p_client.update_inventory.assert_called_with(segment_id,
inventory, seg_plugin.IPV4_RESOURCE_CLASS)
self.assertEqual(
inventory['total'],
self.mock_p_client.update_inventory.call_args[0][1]['total'])
self.assertEqual(
inventory['reserved'],
self.mock_p_client.update_inventory.call_args[0][1]['reserved'])
self.mock_p_client.reset_mock()
self.mock_n_client.reset_mock()
def _get_inventory(self, total, reserved):
inventory = {'total': total, 'reserved': reserved, 'min_unit': 1,
'max_unit': 1, 'step_size': 1, 'allocation_ratio': 1.0,
'resource_class': seg_plugin.IPV4_RESOURCE_CLASS}
return inventory, copy.deepcopy(inventory)
def _test_second_subnet_association_with_segment(self):
network, segment, first_subnet = (
self._test_first_subnet_association_with_segment())
segment_id = segment['segment']['id']
# Associate an IPv6 subnet with the segment
self._create_test_subnet_with_segment(network, segment)
first_total, first_reserved = (
self._calculate_inventory_total_and_reserved(
first_subnet['subnet']))
inventory, original_inventory = self._get_inventory(first_total,
first_reserved)
self.mock_p_client.get_inventory.return_value = inventory
second_subnet = self._create_test_subnet_with_segment(
network, segment, cidr='10.0.1.0/24')
self._assert_inventory_update(segment_id, original_inventory,
subnet=second_subnet['subnet'])
return segment_id, first_subnet, second_subnet
def test_second_subnet_association_with_segment(self):
self._test_second_subnet_association_with_segment()
def test_delete_last_ipv4_subnet(self):
network, segment, subnet = (
self._test_first_subnet_association_with_segment())
# Associate an IPv6 subnet with the segment
self._create_test_subnet_with_segment(network, segment)
segment_id = segment['segment']['id']
aggregate = mock.MagicMock()
aggregate.uuid = uuidutils.generate_uuid()
aggregate.id = 1
aggregate.hosts = ['fakehost1']
self.mock_p_client.list_aggregates.return_value = {
'aggregates': [aggregate.uuid]}
self.mock_n_client.aggregates.list.return_value = [aggregate]
self.mock_n_client.aggregates.get_details.return_value = aggregate
self._delete('subnets', subnet['subnet']['id'])
self.batch_notifier._notify()
self._assert_inventory_delete(segment_id, aggregate)
def _assert_inventory_delete(self, segment_id, aggregate):
self.mock_p_client.list_aggregates.assert_called_with(segment_id)
self.assertEqual(1, self.mock_n_client.aggregates.list.call_count)
self.mock_n_client.aggregates.get_details.assert_called_with(
aggregate.id)
calls = [mock.call(aggregate.id, host) for host in aggregate.hosts]
self.mock_n_client.aggregates.remove_host.assert_has_calls(calls)
self.mock_n_client.aggregates.delete.assert_called_with(aggregate.id)
self.mock_p_client.delete_resource_provider.assert_called_with(
segment_id)
self.mock_p_client.reset_mock()
self.mock_n_client.reset_mock()
def test_delete_ipv4_subnet(self):
segment_id, first_subnet, second_subnet = (
self._test_second_subnet_association_with_segment())
first_total, first_reserved = (
self._calculate_inventory_total_and_reserved(
first_subnet['subnet']))
second_total, second_reserved = (
self._calculate_inventory_total_and_reserved(
second_subnet['subnet']))
inventory, original_inventory = self._get_inventory(
first_total + second_total, first_reserved + second_reserved)
self.mock_p_client.get_inventory.return_value = inventory
self._delete('subnets', first_subnet['subnet']['id'])
self._assert_inventory_update(segment_id, original_inventory,
original_subnet=first_subnet['subnet'])
def _test_update_ipv4_subnet_allocation_pools(self, allocation_pools,
new_allocation_pools):
network, segment, original_subnet = (
self._test_first_subnet_association_with_segment(
cidr='10.0.0.0/24', allocation_pools=allocation_pools))
segment_id = segment['segment']['id']
self.mock_p_client.reset_mock()
self.mock_n_client.reset_mock()
total, reserved = self._calculate_inventory_total_and_reserved(
original_subnet['subnet'])
inventory, original_inventory = self._get_inventory(total, reserved)
self.mock_p_client.get_inventory.return_value = inventory
subnet_data = {'subnet': {'allocation_pools': new_allocation_pools}}
subnet_req = self.new_update_request('subnets',
subnet_data,
original_subnet['subnet']['id'])
subnet = self.deserialize(self.fmt, subnet_req.get_response(self.api))
self._assert_inventory_update(
segment_id, original_inventory, subnet=subnet['subnet'],
original_subnet=original_subnet['subnet'])
def test_update_ipv4_subnet_expand_allocation_pool(self):
self._test_update_ipv4_subnet_allocation_pools(
[{'start': '10.0.0.2', 'end': '10.0.0.100'}],
[{'start': '10.0.0.2', 'end': '10.0.0.254'}])
def test_update_ipv4_subnet_add_allocation_pool(self):
self._test_update_ipv4_subnet_allocation_pools(
[{'start': '10.0.0.2', 'end': '10.0.0.100'}],
[{'start': '10.0.0.2', 'end': '10.0.0.100'},
{'start': '10.0.0.200', 'end': '10.0.0.254'}])
def test_update_ipv4_subnet_contract_allocation_pool(self):
self._test_update_ipv4_subnet_allocation_pools(
[{'start': '10.0.0.2', 'end': '10.0.0.254'}],
[{'start': '10.0.0.2', 'end': '10.0.0.100'}])
def test_update_ipv4_subnet_remove_allocation_pool(self):
self._test_update_ipv4_subnet_allocation_pools(
[{'start': '10.0.0.2', 'end': '10.0.0.100'},
{'start': '10.0.0.200', 'end': '10.0.0.254'}],
[{'start': '10.0.0.2', 'end': '10.0.0.100'}])
def _test_update_ipv4_subnet_delete_allocation_pools(self):
segment_id, first_subnet, second_subnet = (
self._test_second_subnet_association_with_segment())
first_total, first_reserved = (
self._calculate_inventory_total_and_reserved(
first_subnet['subnet']))
second_total, second_reserved = (
self._calculate_inventory_total_and_reserved(
second_subnet['subnet']))
inventory, original_inventory = self._get_inventory(
first_total + second_total, first_reserved + second_reserved)
self.mock_p_client.get_inventory.return_value = inventory
subnet_data = {'subnet': {'allocation_pools': []}}
subnet_req = self.new_update_request('subnets',
subnet_data,
first_subnet['subnet']['id'])
subnet_req.get_response(self.api)
self._assert_inventory_update(segment_id, original_inventory,
original_subnet=first_subnet['subnet'])
return segment_id, second_subnet
def test_update_ipv4_subnet_delete_allocation_pools(self):
self._test_update_ipv4_subnet_delete_allocation_pools()
def test_update_ipv4_subnet_delete_restore_last_allocation_pool(self):
segment_id, subnet = (
self._test_update_ipv4_subnet_delete_allocation_pools())
self.mock_p_client.reset_mock()
self.mock_n_client.reset_mock()
allocation_pools = subnet['subnet']['allocation_pools']
aggregate = mock.MagicMock()
aggregate.uuid = uuidutils.generate_uuid()
aggregate.id = 1
aggregate.hosts = ['fakehost1']
self.mock_p_client.list_aggregates.return_value = {
'aggregates': [aggregate.uuid]}
self.mock_n_client.aggregates.list.return_value = [aggregate]
self.mock_n_client.aggregates.get_details.return_value = aggregate
subnet_data = {'subnet': {'allocation_pools': []}}
self._update('subnets', subnet['subnet']['id'], subnet_data)
self.batch_notifier._notify()
self._assert_inventory_delete(segment_id, aggregate)
self.mock_p_client.get_inventory.side_effect = (
neutron_exc.PlacementResourceProviderNotFound(
resource_provider=segment_id,
resource_class=seg_plugin.IPV4_RESOURCE_CLASS))
aggregate.hosts = []
self.mock_n_client.aggregates.create.return_value = aggregate
subnet_data = {'subnet': {'allocation_pools': allocation_pools}}
subnet = self._update('subnets', subnet['subnet']['id'], subnet_data)
self._assert_inventory_creation(segment_id, aggregate, subnet)
def test_add_host_to_segment_aggregate(self):
db.subscribe()
network, segment, first_subnet = (
self._test_first_subnet_association_with_segment())
segment_id = segment['segment']['id']
aggregate = mock.MagicMock()
aggregate.uuid = uuidutils.generate_uuid()
aggregate.id = 1
aggregate.hosts = ['fakehost1']
self.mock_p_client.list_aggregates.return_value = {
'aggregates': [aggregate.uuid]}
self.mock_n_client.aggregates.list.return_value = [aggregate]
host = 'otherfakehost'
helpers.register_ovs_agent(host=host,
bridge_mappings={'physnet': 'br-eth-1'},
plugin=self.plugin, start_flag=True)
self.batch_notifier._notify()
self.mock_p_client.list_aggregates.assert_called_with(segment_id)
self.assertEqual(1, self.mock_n_client.aggregates.list.call_count)
self.mock_n_client.aggregates.add_host.assert_called_with(aggregate.id,
host)
def test_add_host_to_non_existent_segment_aggregate(self):
db.subscribe()
network, segment, first_subnet = (
self._test_first_subnet_association_with_segment())
with mock.patch.object(seg_plugin.LOG, 'info') as log:
segment_id = segment['segment']['id']
aggregate = mock.MagicMock()
aggregate.uuid = uuidutils.generate_uuid()
aggregate.id = 1
aggregate.hosts = ['fakehost1']
self.mock_p_client.list_aggregates.side_effect = (
neutron_exc.PlacementAggregateNotFound(
resource_provider=segment_id))
self.mock_n_client.aggregates.list.return_value = [aggregate]
host = 'otherfakehost'
helpers.register_ovs_agent(host=host,
bridge_mappings={'physnet': 'br-eth-1'},
plugin=self.plugin, start_flag=True)
self.batch_notifier._notify()
self.mock_p_client.list_aggregates.assert_called_with(segment_id)
self.assertTrue(log.called)
self.mock_n_client.aggregates.add_host.assert_not_called()
def test_add_host_segment_aggregate_conflict(self):
db.subscribe()
network, segment, first_subnet = (
self._test_first_subnet_association_with_segment())
with mock.patch.object(seg_plugin.LOG, 'info') as log:
segment_id = segment['segment']['id']
aggregate = mock.MagicMock()
aggregate.uuid = uuidutils.generate_uuid()
aggregate.id = 1
aggregate.hosts = ['fakehost1']
self.mock_p_client.list_aggregates.return_value = {
'aggregates': [aggregate.uuid]}
self.mock_n_client.aggregates.add_host.side_effect = (
nova_exc.Conflict(nova_exc.Conflict.http_status))
self.mock_n_client.aggregates.list.return_value = [aggregate]
host = 'otherfakehost'
helpers.register_ovs_agent(host=host,
bridge_mappings={'physnet': 'br-eth-1'},
plugin=self.plugin, start_flag=True)
self.batch_notifier._notify()
self.mock_p_client.list_aggregates.assert_called_with(segment_id)
self.mock_n_client.aggregates.add_host.assert_called_with(
aggregate.id, host)
self.assertTrue(log.called)
def _assert_inventory_update_port(self, segment_id, inventory,
num_fixed_ips):
inventory['reserved'] += num_fixed_ips
self.mock_p_client.get_inventory.assert_called_with(
segment_id, seg_plugin.IPV4_RESOURCE_CLASS)
self.mock_p_client.update_inventory.assert_called_with(segment_id,
inventory, seg_plugin.IPV4_RESOURCE_CLASS)
self.assertEqual(
inventory['total'],
self.mock_p_client.update_inventory.call_args[0][1]['total'])
self.assertEqual(
inventory['reserved'],
self.mock_p_client.update_inventory.call_args[0][1]['reserved'])
self.mock_p_client.reset_mock()
self.mock_n_client.reset_mock()
def _create_test_port(self, network_id, tenant_id, subnet, **kwargs):
port = self._make_port(self.fmt, network_id, tenant_id=tenant_id,
arg_list=(portbindings.HOST_ID,), **kwargs)
self.batch_notifier._notify()
return port
def _test_create_port(self, **kwargs):
network, segment, subnet = (
self._test_first_subnet_association_with_segment())
total, reserved = self._calculate_inventory_total_and_reserved(
subnet['subnet'])
inventory, original_inventory = self._get_inventory(total, reserved)
self.mock_p_client.get_inventory.return_value = inventory
port = self._create_test_port(network['network']['id'],
network['network']['tenant_id'], subnet,
**kwargs)
return segment['segment']['id'], original_inventory, port
def test_create_bound_port(self):
kwargs = {portbindings.HOST_ID: 'fakehost'}
segment_id, original_inventory, _ = self._test_create_port(**kwargs)
self._assert_inventory_update_port(segment_id, original_inventory, 1)
def test_create_bound_port_compute_owned(self):
kwargs = {portbindings.HOST_ID: 'fakehost',
'device_owner': constants.DEVICE_OWNER_COMPUTE_PREFIX}
self._test_create_port(**kwargs)
self.mock_p_client.get_inventory.assert_not_called()
self.mock_p_client.update_inventory.assert_not_called()
def test_create_bound_port_dhcp_owned(self):
kwargs = {portbindings.HOST_ID: 'fakehost',
'device_owner': constants.DEVICE_OWNER_DHCP}
self._test_create_port(**kwargs)
self.mock_p_client.get_inventory.assert_not_called()
self.mock_p_client.update_inventory.assert_not_called()
def test_create_unbound_port(self):
self._test_create_port()
self.mock_p_client.get_inventory.assert_not_called()
self.mock_p_client.update_inventory.assert_not_called()
def test_delete_bound_port(self):
kwargs = {portbindings.HOST_ID: 'fakehost'}
segment_id, before_create_inventory, port = self._test_create_port(
**kwargs)
self.mock_p_client.reset_mock()
inventory, original_inventory = self._get_inventory(
before_create_inventory['total'],
before_create_inventory['reserved'] + 1)
self.mock_p_client.get_inventory.return_value = inventory
self._delete('ports', port['port']['id'])
self.batch_notifier._notify()
self._assert_inventory_update_port(segment_id, original_inventory, -1)
def _create_port_for_update_test(self, num_fixed_ips=1, dhcp_owned=False,
compute_owned=False):
segment_id, first_subnet, second_subnet = (
self._test_second_subnet_association_with_segment())
first_total, first_reserved = (
self._calculate_inventory_total_and_reserved(
first_subnet['subnet']))
second_total, second_reserved = (
self._calculate_inventory_total_and_reserved(
second_subnet['subnet']))
inventory, original_inventory = self._get_inventory(
first_total + second_total, first_reserved + second_reserved)
self.mock_p_client.get_inventory.return_value = inventory
kwargs = {portbindings.HOST_ID: 'fakehost',
'fixed_ips': [{'subnet_id': first_subnet['subnet']['id']}]}
created_fixed_ips = num_fixed_ips
if num_fixed_ips > 1:
kwargs['fixed_ips'].append(
{'subnet_id': second_subnet['subnet']['id']})
if dhcp_owned:
kwargs['device_owner'] = constants.DEVICE_OWNER_DHCP
if compute_owned:
kwargs['device_owner'] = constants.DEVICE_OWNER_COMPUTE_PREFIX
port = self._create_test_port(first_subnet['subnet']['network_id'],
first_subnet['subnet']['tenant_id'],
first_subnet, **kwargs)
if dhcp_owned or compute_owned:
self.mock_p_client.get_inventory.assert_not_called()
self.mock_p_client.update_inventory.assert_not_called()
else:
self._assert_inventory_update_port(segment_id, original_inventory,
created_fixed_ips)
return first_subnet, second_subnet, port
def _port_update(self, first_subnet, second_subnet, fixed_ips_subnets,
port, reserved_increment_before=1,
reserved_increment_after=1, dhcp_owned=False,
compute_owned=False):
first_total, first_reserved = (
self._calculate_inventory_total_and_reserved(
first_subnet['subnet']))
second_total, second_reserved = (
self._calculate_inventory_total_and_reserved(
second_subnet['subnet']))
inventory, original_inventory = self._get_inventory(
first_total + second_total,
first_reserved + second_reserved + reserved_increment_before)
self.mock_p_client.get_inventory.return_value = inventory
port_data = {'port': {'device_owner': ''}}
if fixed_ips_subnets:
port_data['port']['fixed_ips'] = []
for subnet in fixed_ips_subnets:
port_data['port']['fixed_ips'].append(
{'subnet_id': subnet['subnet']['id']})
if dhcp_owned:
port_data['port']['device_owner'] = constants.DEVICE_OWNER_DHCP
if compute_owned:
port_data['port']['device_owner'] = (
constants.DEVICE_OWNER_COMPUTE_PREFIX)
self._update('ports', port['port']['id'], port_data)
self.batch_notifier._notify()
self._assert_inventory_update_port(
first_subnet['subnet']['segment_id'], original_inventory,
reserved_increment_after)
def test_update_port_add_fixed_ip(self):
first_subnet, second_subnet, port = self._create_port_for_update_test()
self._port_update(first_subnet, second_subnet,
[first_subnet, second_subnet], port)
def test_update_port_remove_fixed_ip(self):
first_subnet, second_subnet, port = self._create_port_for_update_test(
num_fixed_ips=2)
self._port_update(first_subnet, second_subnet,
[first_subnet], port, reserved_increment_before=2,
reserved_increment_after=-1)
def test_update_port_change_to_dhcp_owned(self):
first_subnet, second_subnet, port = self._create_port_for_update_test()
self._port_update(first_subnet, second_subnet, [], port,
reserved_increment_after=-1, dhcp_owned=True)
def test_update_port_change_to_no_dhcp_owned(self):
first_subnet, second_subnet, port = self._create_port_for_update_test(
dhcp_owned=True)
self._port_update(first_subnet, second_subnet, [], port,
reserved_increment_before=0,
reserved_increment_after=1)
def test_update_port_change_to_compute_owned(self):
first_subnet, second_subnet, port = self._create_port_for_update_test()
self._port_update(first_subnet, second_subnet, [], port,
reserved_increment_after=-1, compute_owned=True)
def test_update_port_change_to_no_compute_owned(self):
first_subnet, second_subnet, port = self._create_port_for_update_test(
compute_owned=True)
self._port_update(first_subnet, second_subnet, [], port,
reserved_increment_before=0,
reserved_increment_after=1)
def test_placement_api_inventory_update_conflict(self):
with mock.patch.object(seg_plugin.LOG, 'debug') as log_debug:
with mock.patch.object(seg_plugin.LOG, 'error') as log_error:
event = seg_plugin.Event(mock.ANY, mock.ANY, total=1,
reserved=0)
inventory, original_inventory = self._get_inventory(100, 2)
self.mock_p_client.get_inventory.return_value = inventory
self.mock_p_client.update_inventory.side_effect = (
neutron_exc.PlacementInventoryUpdateConflict(
resource_provider=mock.ANY,
resource_class=seg_plugin.IPV4_RESOURCE_CLASS))
self.segments_plugin.nova_updater._update_nova_inventory(event)
self.assertEqual(seg_plugin.MAX_INVENTORY_UPDATE_RETRIES,
self.mock_p_client.get_inventory.call_count)
self.assertEqual(
seg_plugin.MAX_INVENTORY_UPDATE_RETRIES,
self.mock_p_client.update_inventory.call_count)
self.assertEqual(
seg_plugin.MAX_INVENTORY_UPDATE_RETRIES,
log_debug.call_count)
self.assertTrue(log_error.called)
def test_placement_api_not_available(self):
with mock.patch.object(seg_plugin.LOG, 'debug') as log:
event = seg_plugin.Event(
self.segments_plugin.nova_updater._update_nova_inventory,
mock.ANY, total=1, reserved=0)
self.mock_p_client.get_inventory.side_effect = (
neutron_exc.PlacementEndpointNotFound())
self.segments_plugin.nova_updater._send_notifications([event])
self.assertTrue(log.called)
class TestDhcpAgentSegmentScheduling(HostSegmentMappingTestCase):
_mechanism_drivers = ['openvswitch', 'logger']
@ -1613,3 +2190,9 @@ class PlacementAPIClientTestCase(base.DietTestCase):
self.mock_request.side_effect = ks_exc.NotFound
self.assertRaises(neutron_exc.PlacementAggregateNotFound,
self.client.list_aggregates, rp_uuid)
def test_placement_api_not_found(self):
rp_uuid = uuidutils.generate_uuid()
self.mock_request.side_effect = ks_exc.EndpointNotFound
self.assertRaises(neutron_exc.PlacementEndpointNotFound,
self.client.list_aggregates, rp_uuid)