Merge "Create notifications for dependent ports"

This commit is contained in:
Zuul 2017-12-13 00:06:35 +00:00 committed by Gerrit Code Review
commit 449dd1f125
4 changed files with 80 additions and 72 deletions

View File

@ -39,6 +39,7 @@ from neutron.extensions import portbindings
from neutron.plugins.common import constants as pconst
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2 import models
from neutron_lib.api.definitions import provider_net as provider
from neutron_lib import constants as n_constants
from neutron_lib import context as nctx
from neutron_lib import exceptions as n_exceptions
@ -327,6 +328,37 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
if not self.aim.get(aim_ctx, ap):
self.aim.create(aim_ctx, ap)
def _is_dependent_port_change(self, port):
# For now, we are only handling changes to DHCP
# ports and LBaaS ports. This can be expanded in
# the future to handle other changes, if needed
return bool(port and any(port['device_owner'].startswith(x) for x in
(n_constants.DEVICE_OWNER_DHCP,
n_constants.DEVICE_OWNER_LOADBALANCERV2)))
def _notify_if_dependent_port_change(self, context, port):
# Under some scenarios, opflex agents might not get
# triggers to update EP files, even though parameters
# contained in the files have changed (e.g. metadata
# route when using HA DHCP). We ensure that the EP files
# get updated by triggering port update notifications
# to the agents, which tells them to go get all their EP
# files for the ports they own on that network.
if self._is_dependent_port_change(port):
plugin_context = context._plugin_context
net = self.plugin.get_network(plugin_context, port['network_id'])
# Notifications are only needed to opflex networks, which will
# have opflex agents
if not net or not self._is_opflex_type(net[provider.NETWORK_TYPE]):
return
filters = {'network_id': [net['id']]}
ports_to_update = self.plugin.get_ports(plugin_context, filters)
# Exclude ports that triggered this notification
affected_port_ids = [p['id'] for p in ports_to_update
if not self._is_dependent_port_change(p)]
self._notify_port_update_bulk(plugin_context, affected_port_ids)
def create_network_precommit(self, context):
current = context.current
LOG.debug("APIC AIM MD creating network: %s", current)
@ -1411,6 +1443,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
context.bottom_bound_segment,
context.host
)
# REVISIT: it may be possible to move this to the precommit
self._notify_if_dependent_port_change(context, port)
def delete_port_precommit(self, context):
port = context.current
@ -1557,6 +1591,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
context.bottom_bound_segment,
context.host
)
# REVISIT: it may be possible to move this to the precommit
self._notify_if_dependent_port_change(context, port)
def create_floatingip(self, context, current):
if current['port_id']:

View File

@ -13,9 +13,7 @@
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import api as db_api
from neutron.extensions import portbindings
from neutron.plugins.ml2 import rpc as ml2_rpc
from neutron_lib import constants
from opflexagent import rpc as o_rpc
from oslo_log import log
@ -106,32 +104,6 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
LOG.exception(e)
return None
# REVISIT: this should exist in the mechanism driver, and should
# be addressed by any patch that refactors this RPC class
def notify_filtered_ports_per_network(self, context, **kwargs):
LOG.debug("APIC AIM handling get_ports_for_network for: %s", kwargs)
try:
host_id = kwargs.get('host')
network_id = kwargs.get('network')
core_plugin = self._core_plugin
filters = {'network_id': [network_id]}
ports_to_update = core_plugin.get_ports(context, filters)
# Exclude DHCP and LBaaS ports -- these are possible triggers
# for the network notifaction that resulted in the agents calling
# this RPC. Updates for those ports and will be handled from their
# own port notifications, if needed.
for p in ports_to_update:
if (p.get(portbindings.HOST_ID) == host_id) and not (
p['device_owner'].startswith(
constants.DEVICE_OWNER_DHCP) or
p['device_owner'].startswith(
constants.DEVICE_OWNER_LOADBALANCERV2)):
self._send_port_update_notification(context, p['id'])
except Exception as e:
LOG.error("An exception has occurred while requesting ports "
"for network %s", kwargs.get('network'))
LOG.exception(e)
# Child class needs to support:
# - self._send_port_update_notification(context, port)
def ip_address_owner_update(self, context, **kwargs):

View File

@ -301,9 +301,10 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
self.fmt)
return self.deserialize(self.fmt, req.get_response(self.api))
def _bind_dhcp_port_to_host(self, port_id, host):
def _bind_other_port_to_host(self, port_id, host,
owner=n_constants.DEVICE_OWNER_DHCP):
data = {'port': {'binding:host_id': host,
'device_owner': 'network:dhcp',
'device_owner': owner,
'device_id': 'someid'}}
# Create EP with bound port
req = self.new_update_request('ports', data, port_id,
@ -3703,6 +3704,45 @@ class TestPortBinding(ApicAimTestCase):
port['binding:vif_details'])
self.assertEqual(n_constants.PORT_STATUS_ACTIVE, port['status'])
def test_bind_dependent_port_triggers_port_notify(self):
# Test that a port update to a DHCP or LBaaS port triggers a
# port update notification to a compute port on the same net,
# but not to a compute port on a different net
self._register_agent('host1', AGENT_CONF_OPFLEX)
self._register_agent('host2', AGENT_CONF_OPFLEX)
net1 = self._make_network(self.fmt, 'net1', True)
self._make_subnet(self.fmt, net1, '10.0.1.1', '10.0.1.0/24')
net2 = self._make_network(self.fmt, 'net2', True)
self._make_subnet(self.fmt, net1, '20.0.1.1', '20.0.1.0/24')
p1 = self._make_port(self.fmt, net1['network']['id'])['port']
p1 = self._bind_port_to_host(p1['id'], 'host1')['port']
self.assertEqual(net1['network']['id'], p1['network_id'])
self.assertEqual('ovs', p1['binding:vif_type'])
self.assertEqual({'port_filter': False, 'ovs_hybrid_plug': False},
p1['binding:vif_details'])
p2 = self._make_port(self.fmt, net2['network']['id'])['port']
p2 = self._bind_port_to_host(p2['id'], 'host2')['port']
self.assertEqual(net2['network']['id'], p2['network_id'])
self.assertEqual('ovs', p2['binding:vif_type'])
self.assertEqual({'port_filter': False, 'ovs_hybrid_plug': False},
p2['binding:vif_details'])
with mock.patch.object(self.driver,
'_notify_port_update_bulk') as notify:
p3 = self._make_port(self.fmt, net1['network']['id'],
device_owner=n_constants.DEVICE_OWNER_DHCP)['port']
p3 = self._bind_other_port_to_host(p3['id'], 'host1',
owner=n_constants.DEVICE_OWNER_DHCP)['port']
mock_calls = [mock.call(mock.ANY, p1['id']),
mock.call(mock.ANY, p1['id'])]
notify.has_calls(mock_calls)
notify.reset_mock()
p4 = self._make_port(self.fmt, net1['network']['id'],
device_owner=n_constants.DEVICE_OWNER_LOADBALANCERV2)['port']
p4 = self._bind_other_port_to_host(p4['id'], 'host1',
owner=n_constants.DEVICE_OWNER_LOADBALANCERV2)['port']
notify.has_calls(mock_calls)
# TODO(rkukura): Add tests for opflex, local and unsupported
# network_type values.
@ -3800,7 +3840,7 @@ class TestPortBindingDvs(ApicAimTestCase):
self.driver.dvs_notifier.reset_mock()
p2 = self._make_port(self.fmt, net['network']['id'])['port']
self.assertEqual(net['network']['id'], p2['network_id'])
newp2 = self._bind_dhcp_port_to_host(p2['id'], 'h1')
newp2 = self._bind_other_port_to_host(p2['id'], 'h1')
# Called on the network's tenant
vif_det = newp2['port']['binding:vif_details']
self.assertIsNone(vif_det.get('dvs_port_group_name', None))
@ -3836,7 +3876,7 @@ class TestPortBindingDvs(ApicAimTestCase):
p2 = self._make_port(self.fmt, net['network']['id'])['port']
self.assertEqual(net['network']['id'], p2['network_id'])
# Bind port to trigger path binding
newp2 = self._bind_dhcp_port_to_host(p2['id'], 'h1')
newp2 = self._bind_other_port_to_host(p2['id'], 'h1')
# Called on the network's tenant
vif_det = newp2['port']['binding:vif_details']
self.assertIsNone(vif_det.get('dvs_port_group_name', None))

View File

@ -3226,46 +3226,6 @@ class TestPolicyTarget(AIMBaseTestCase):
def test_get_gbp_details_no_pt_no_as_unrouted(self):
self._do_test_gbp_details_no_pt(use_as=False, routed=False)
def test_notify_filtered_ports_per_network(self, pre_vrf=None):
l3p = self.create_l3_policy(name='myl3')['l3_policy']
l2p = self.create_l2_policy(name='myl2',
l3_policy_id=l3p['id'])['l2_policy']
ptg = self.create_policy_target_group(
name="ptg1", l2_policy_id=l2p['id'])['policy_target_group']
net_id = l2p['network_id']
pt1 = self.create_policy_target(name="pt1",
policy_target_group_id=ptg['id'])['policy_target']
pt2 = self.create_policy_target(name="pt2",
policy_target_group_id=ptg['id'])['policy_target']
pt3 = self.create_policy_target(name="pt3",
policy_target_group_id=ptg['id'])['policy_target']
with mock.patch.object(self.driver,
'_send_port_update_notification') as notify:
ctx = self._neutron_admin_context
self._bind_port_to_host(pt1['port_id'], 'h1')
self.driver.notify_filtered_ports_per_network(ctx,
host='h1', network=net_id)
notify.assert_called_with(mock.ANY, pt1['port_id'])
notify.reset_mock()
self._bind_other_port_to_host(pt2['port_id'], 'h2')
self.driver.notify_filtered_ports_per_network(ctx,
host='h2', network=net_id)
notify.assert_not_called()
self._bind_other_port_to_host(pt3['port_id'], 'h3',
n_constants.DEVICE_OWNER_LOADBALANCERV2)
self.driver.notify_filtered_ports_per_network(ctx,
host='h3', network=net_id)
notify.assert_not_called()
# test non-existing host
self.driver.notify_filtered_ports_per_network(ctx,
host='h4', network=net_id)
notify.assert_not_called()
# test non-existing network
self.driver.notify_filtered_ports_per_network(ctx,
host='h3', network='foo')
notify.assert_not_called()
def test_ip_address_owner_update(self):
l3p = self.create_l3_policy(name='myl3')['l3_policy']
l2p = self.create_l2_policy(name='myl2',