Create notifications for dependent ports

Changes to certain neutron resources, such as DHCP or LBaaS ports,
require notiications be generated to ports who depend on those
changes (e.g. if a DHCP agent port state changes, the host route
to the metadata agent needs updating). This patch checks if:
   o  an update or delete to a DHCP or LBaaS port occurs
   o  the port belongs to an opflex type network

If both of the above conditions are met, the plugin generates port
update notifications to all the ports on that network, so they can
update their parameters using the request_endpoint_details_list RPC.

Change-Id: Ida893b1c331c4778a1276de54a379627d54b7d23
This commit is contained in:
Thomas Bachman 2017-11-13 19:14:57 +00:00
parent c31162c9b1
commit 0c18b43893
4 changed files with 80 additions and 72 deletions

View File

@ -39,6 +39,7 @@ from neutron.extensions import portbindings
from neutron.plugins.common import constants as pconst
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2 import models
from neutron_lib.api.definitions import provider_net as provider
from neutron_lib import constants as n_constants
from neutron_lib import context as nctx
from neutron_lib import exceptions as n_exceptions
@ -327,6 +328,37 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
if not self.aim.get(aim_ctx, ap):
self.aim.create(aim_ctx, ap)
def _is_dependent_port_change(self, port):
# For now, we are only handling changes to DHCP
# ports and LBaaS ports. This can be expanded in
# the future to handle other changes, if needed
return bool(port and any(port['device_owner'].startswith(x) for x in
(n_constants.DEVICE_OWNER_DHCP,
n_constants.DEVICE_OWNER_LOADBALANCERV2)))
def _notify_if_dependent_port_change(self, context, port):
# Under some scenarios, opflex agents might not get
# triggers to update EP files, even though parameters
# contained in the files have changed (e.g. metadata
# route when using HA DHCP). We ensure that the EP files
# get updated by triggering port update notifications
# to the agents, which tells them to go get all their EP
# files for the ports they own on that network.
if self._is_dependent_port_change(port):
plugin_context = context._plugin_context
net = self.plugin.get_network(plugin_context, port['network_id'])
# Notifications are only needed to opflex networks, which will
# have opflex agents
if not net or not self._is_opflex_type(net[provider.NETWORK_TYPE]):
return
filters = {'network_id': [net['id']]}
ports_to_update = self.plugin.get_ports(plugin_context, filters)
# Exclude ports that triggered this notification
affected_port_ids = [p['id'] for p in ports_to_update
if not self._is_dependent_port_change(p)]
self._notify_port_update_bulk(plugin_context, affected_port_ids)
def create_network_precommit(self, context):
current = context.current
LOG.debug("APIC AIM MD creating network: %s", current)
@ -1411,6 +1443,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
context.bottom_bound_segment,
context.host
)
# REVISIT: it may be possible to move this to the precommit
self._notify_if_dependent_port_change(context, port)
def delete_port_precommit(self, context):
port = context.current
@ -1557,6 +1591,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
context.bottom_bound_segment,
context.host
)
# REVISIT: it may be possible to move this to the precommit
self._notify_if_dependent_port_change(context, port)
def create_floatingip(self, context, current):
if current['port_id']:

View File

@ -13,9 +13,7 @@
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import api as db_api
from neutron.extensions import portbindings
from neutron.plugins.ml2 import rpc as ml2_rpc
from neutron_lib import constants
from opflexagent import rpc as o_rpc
from oslo_log import log
@ -106,32 +104,6 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
LOG.exception(e)
return None
# REVISIT: this should exist in the mechanism driver, and should
# be addressed by any patch that refactors this RPC class
def notify_filtered_ports_per_network(self, context, **kwargs):
LOG.debug("APIC AIM handling get_ports_for_network for: %s", kwargs)
try:
host_id = kwargs.get('host')
network_id = kwargs.get('network')
core_plugin = self._core_plugin
filters = {'network_id': [network_id]}
ports_to_update = core_plugin.get_ports(context, filters)
# Exclude DHCP and LBaaS ports -- these are possible triggers
# for the network notifaction that resulted in the agents calling
# this RPC. Updates for those ports and will be handled from their
# own port notifications, if needed.
for p in ports_to_update:
if (p.get(portbindings.HOST_ID) == host_id) and not (
p['device_owner'].startswith(
constants.DEVICE_OWNER_DHCP) or
p['device_owner'].startswith(
constants.DEVICE_OWNER_LOADBALANCERV2)):
self._send_port_update_notification(context, p['id'])
except Exception as e:
LOG.error("An exception has occurred while requesting ports "
"for network %s", kwargs.get('network'))
LOG.exception(e)
# Child class needs to support:
# - self._send_port_update_notification(context, port)
def ip_address_owner_update(self, context, **kwargs):

View File

@ -301,9 +301,10 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
self.fmt)
return self.deserialize(self.fmt, req.get_response(self.api))
def _bind_dhcp_port_to_host(self, port_id, host):
def _bind_other_port_to_host(self, port_id, host,
owner=n_constants.DEVICE_OWNER_DHCP):
data = {'port': {'binding:host_id': host,
'device_owner': 'network:dhcp',
'device_owner': owner,
'device_id': 'someid'}}
# Create EP with bound port
req = self.new_update_request('ports', data, port_id,
@ -3703,6 +3704,45 @@ class TestPortBinding(ApicAimTestCase):
port['binding:vif_details'])
self.assertEqual(n_constants.PORT_STATUS_ACTIVE, port['status'])
def test_bind_dependent_port_triggers_port_notify(self):
# Test that a port update to a DHCP or LBaaS port triggers a
# port update notification to a compute port on the same net,
# but not to a compute port on a different net
self._register_agent('host1', AGENT_CONF_OPFLEX)
self._register_agent('host2', AGENT_CONF_OPFLEX)
net1 = self._make_network(self.fmt, 'net1', True)
self._make_subnet(self.fmt, net1, '10.0.1.1', '10.0.1.0/24')
net2 = self._make_network(self.fmt, 'net2', True)
self._make_subnet(self.fmt, net1, '20.0.1.1', '20.0.1.0/24')
p1 = self._make_port(self.fmt, net1['network']['id'])['port']
p1 = self._bind_port_to_host(p1['id'], 'host1')['port']
self.assertEqual(net1['network']['id'], p1['network_id'])
self.assertEqual('ovs', p1['binding:vif_type'])
self.assertEqual({'port_filter': False, 'ovs_hybrid_plug': False},
p1['binding:vif_details'])
p2 = self._make_port(self.fmt, net2['network']['id'])['port']
p2 = self._bind_port_to_host(p2['id'], 'host2')['port']
self.assertEqual(net2['network']['id'], p2['network_id'])
self.assertEqual('ovs', p2['binding:vif_type'])
self.assertEqual({'port_filter': False, 'ovs_hybrid_plug': False},
p2['binding:vif_details'])
with mock.patch.object(self.driver,
'_notify_port_update_bulk') as notify:
p3 = self._make_port(self.fmt, net1['network']['id'],
device_owner=n_constants.DEVICE_OWNER_DHCP)['port']
p3 = self._bind_other_port_to_host(p3['id'], 'host1',
owner=n_constants.DEVICE_OWNER_DHCP)['port']
mock_calls = [mock.call(mock.ANY, p1['id']),
mock.call(mock.ANY, p1['id'])]
notify.has_calls(mock_calls)
notify.reset_mock()
p4 = self._make_port(self.fmt, net1['network']['id'],
device_owner=n_constants.DEVICE_OWNER_LOADBALANCERV2)['port']
p4 = self._bind_other_port_to_host(p4['id'], 'host1',
owner=n_constants.DEVICE_OWNER_LOADBALANCERV2)['port']
notify.has_calls(mock_calls)
# TODO(rkukura): Add tests for opflex, local and unsupported
# network_type values.
@ -3800,7 +3840,7 @@ class TestPortBindingDvs(ApicAimTestCase):
self.driver.dvs_notifier.reset_mock()
p2 = self._make_port(self.fmt, net['network']['id'])['port']
self.assertEqual(net['network']['id'], p2['network_id'])
newp2 = self._bind_dhcp_port_to_host(p2['id'], 'h1')
newp2 = self._bind_other_port_to_host(p2['id'], 'h1')
# Called on the network's tenant
vif_det = newp2['port']['binding:vif_details']
self.assertIsNone(vif_det.get('dvs_port_group_name', None))
@ -3836,7 +3876,7 @@ class TestPortBindingDvs(ApicAimTestCase):
p2 = self._make_port(self.fmt, net['network']['id'])['port']
self.assertEqual(net['network']['id'], p2['network_id'])
# Bind port to trigger path binding
newp2 = self._bind_dhcp_port_to_host(p2['id'], 'h1')
newp2 = self._bind_other_port_to_host(p2['id'], 'h1')
# Called on the network's tenant
vif_det = newp2['port']['binding:vif_details']
self.assertIsNone(vif_det.get('dvs_port_group_name', None))

View File

@ -3226,46 +3226,6 @@ class TestPolicyTarget(AIMBaseTestCase):
def test_get_gbp_details_no_pt_no_as_unrouted(self):
self._do_test_gbp_details_no_pt(use_as=False, routed=False)
def test_notify_filtered_ports_per_network(self, pre_vrf=None):
l3p = self.create_l3_policy(name='myl3')['l3_policy']
l2p = self.create_l2_policy(name='myl2',
l3_policy_id=l3p['id'])['l2_policy']
ptg = self.create_policy_target_group(
name="ptg1", l2_policy_id=l2p['id'])['policy_target_group']
net_id = l2p['network_id']
pt1 = self.create_policy_target(name="pt1",
policy_target_group_id=ptg['id'])['policy_target']
pt2 = self.create_policy_target(name="pt2",
policy_target_group_id=ptg['id'])['policy_target']
pt3 = self.create_policy_target(name="pt3",
policy_target_group_id=ptg['id'])['policy_target']
with mock.patch.object(self.driver,
'_send_port_update_notification') as notify:
ctx = self._neutron_admin_context
self._bind_port_to_host(pt1['port_id'], 'h1')
self.driver.notify_filtered_ports_per_network(ctx,
host='h1', network=net_id)
notify.assert_called_with(mock.ANY, pt1['port_id'])
notify.reset_mock()
self._bind_other_port_to_host(pt2['port_id'], 'h2')
self.driver.notify_filtered_ports_per_network(ctx,
host='h2', network=net_id)
notify.assert_not_called()
self._bind_other_port_to_host(pt3['port_id'], 'h3',
n_constants.DEVICE_OWNER_LOADBALANCERV2)
self.driver.notify_filtered_ports_per_network(ctx,
host='h3', network=net_id)
notify.assert_not_called()
# test non-existing host
self.driver.notify_filtered_ports_per_network(ctx,
host='h4', network=net_id)
notify.assert_not_called()
# test non-existing network
self.driver.notify_filtered_ports_per_network(ctx,
host='h3', network='foo')
notify.assert_not_called()
def test_ip_address_owner_update(self):
l3p = self.create_l3_policy(name='myl3')['l3_policy']
l2p = self.create_l2_policy(name='myl2',