Merge "Create notifications for dependent ports"
This commit is contained in:
commit
449dd1f125
|
@ -39,6 +39,7 @@ from neutron.extensions import portbindings
|
|||
from neutron.plugins.common import constants as pconst
|
||||
from neutron.plugins.ml2 import driver_api as api
|
||||
from neutron.plugins.ml2 import models
|
||||
from neutron_lib.api.definitions import provider_net as provider
|
||||
from neutron_lib import constants as n_constants
|
||||
from neutron_lib import context as nctx
|
||||
from neutron_lib import exceptions as n_exceptions
|
||||
|
@ -327,6 +328,37 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
|
|||
if not self.aim.get(aim_ctx, ap):
|
||||
self.aim.create(aim_ctx, ap)
|
||||
|
||||
def _is_dependent_port_change(self, port):
|
||||
# For now, we are only handling changes to DHCP
|
||||
# ports and LBaaS ports. This can be expanded in
|
||||
# the future to handle other changes, if needed
|
||||
return bool(port and any(port['device_owner'].startswith(x) for x in
|
||||
(n_constants.DEVICE_OWNER_DHCP,
|
||||
n_constants.DEVICE_OWNER_LOADBALANCERV2)))
|
||||
|
||||
def _notify_if_dependent_port_change(self, context, port):
|
||||
# Under some scenarios, opflex agents might not get
|
||||
# triggers to update EP files, even though parameters
|
||||
# contained in the files have changed (e.g. metadata
|
||||
# route when using HA DHCP). We ensure that the EP files
|
||||
# get updated by triggering port update notifications
|
||||
# to the agents, which tells them to go get all their EP
|
||||
# files for the ports they own on that network.
|
||||
if self._is_dependent_port_change(port):
|
||||
plugin_context = context._plugin_context
|
||||
net = self.plugin.get_network(plugin_context, port['network_id'])
|
||||
# Notifications are only needed to opflex networks, which will
|
||||
# have opflex agents
|
||||
if not net or not self._is_opflex_type(net[provider.NETWORK_TYPE]):
|
||||
return
|
||||
|
||||
filters = {'network_id': [net['id']]}
|
||||
ports_to_update = self.plugin.get_ports(plugin_context, filters)
|
||||
# Exclude ports that triggered this notification
|
||||
affected_port_ids = [p['id'] for p in ports_to_update
|
||||
if not self._is_dependent_port_change(p)]
|
||||
self._notify_port_update_bulk(plugin_context, affected_port_ids)
|
||||
|
||||
def create_network_precommit(self, context):
|
||||
current = context.current
|
||||
LOG.debug("APIC AIM MD creating network: %s", current)
|
||||
|
@ -1411,6 +1443,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
|
|||
context.bottom_bound_segment,
|
||||
context.host
|
||||
)
|
||||
# REVISIT: it may be possible to move this to the precommit
|
||||
self._notify_if_dependent_port_change(context, port)
|
||||
|
||||
def delete_port_precommit(self, context):
|
||||
port = context.current
|
||||
|
@ -1557,6 +1591,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
|
|||
context.bottom_bound_segment,
|
||||
context.host
|
||||
)
|
||||
# REVISIT: it may be possible to move this to the precommit
|
||||
self._notify_if_dependent_port_change(context, port)
|
||||
|
||||
def create_floatingip(self, context, current):
|
||||
if current['port_id']:
|
||||
|
|
|
@ -13,9 +13,7 @@
|
|||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.db import api as db_api
|
||||
from neutron.extensions import portbindings
|
||||
from neutron.plugins.ml2 import rpc as ml2_rpc
|
||||
from neutron_lib import constants
|
||||
from opflexagent import rpc as o_rpc
|
||||
from oslo_log import log
|
||||
|
||||
|
@ -106,32 +104,6 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
|
|||
LOG.exception(e)
|
||||
return None
|
||||
|
||||
# REVISIT: this should exist in the mechanism driver, and should
|
||||
# be addressed by any patch that refactors this RPC class
|
||||
def notify_filtered_ports_per_network(self, context, **kwargs):
|
||||
LOG.debug("APIC AIM handling get_ports_for_network for: %s", kwargs)
|
||||
try:
|
||||
host_id = kwargs.get('host')
|
||||
network_id = kwargs.get('network')
|
||||
core_plugin = self._core_plugin
|
||||
filters = {'network_id': [network_id]}
|
||||
ports_to_update = core_plugin.get_ports(context, filters)
|
||||
# Exclude DHCP and LBaaS ports -- these are possible triggers
|
||||
# for the network notifaction that resulted in the agents calling
|
||||
# this RPC. Updates for those ports and will be handled from their
|
||||
# own port notifications, if needed.
|
||||
for p in ports_to_update:
|
||||
if (p.get(portbindings.HOST_ID) == host_id) and not (
|
||||
p['device_owner'].startswith(
|
||||
constants.DEVICE_OWNER_DHCP) or
|
||||
p['device_owner'].startswith(
|
||||
constants.DEVICE_OWNER_LOADBALANCERV2)):
|
||||
self._send_port_update_notification(context, p['id'])
|
||||
except Exception as e:
|
||||
LOG.error("An exception has occurred while requesting ports "
|
||||
"for network %s", kwargs.get('network'))
|
||||
LOG.exception(e)
|
||||
|
||||
# Child class needs to support:
|
||||
# - self._send_port_update_notification(context, port)
|
||||
def ip_address_owner_update(self, context, **kwargs):
|
||||
|
|
|
@ -301,9 +301,10 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
|
|||
self.fmt)
|
||||
return self.deserialize(self.fmt, req.get_response(self.api))
|
||||
|
||||
def _bind_dhcp_port_to_host(self, port_id, host):
|
||||
def _bind_other_port_to_host(self, port_id, host,
|
||||
owner=n_constants.DEVICE_OWNER_DHCP):
|
||||
data = {'port': {'binding:host_id': host,
|
||||
'device_owner': 'network:dhcp',
|
||||
'device_owner': owner,
|
||||
'device_id': 'someid'}}
|
||||
# Create EP with bound port
|
||||
req = self.new_update_request('ports', data, port_id,
|
||||
|
@ -3703,6 +3704,45 @@ class TestPortBinding(ApicAimTestCase):
|
|||
port['binding:vif_details'])
|
||||
self.assertEqual(n_constants.PORT_STATUS_ACTIVE, port['status'])
|
||||
|
||||
def test_bind_dependent_port_triggers_port_notify(self):
|
||||
# Test that a port update to a DHCP or LBaaS port triggers a
|
||||
# port update notification to a compute port on the same net,
|
||||
# but not to a compute port on a different net
|
||||
self._register_agent('host1', AGENT_CONF_OPFLEX)
|
||||
self._register_agent('host2', AGENT_CONF_OPFLEX)
|
||||
net1 = self._make_network(self.fmt, 'net1', True)
|
||||
self._make_subnet(self.fmt, net1, '10.0.1.1', '10.0.1.0/24')
|
||||
net2 = self._make_network(self.fmt, 'net2', True)
|
||||
self._make_subnet(self.fmt, net1, '20.0.1.1', '20.0.1.0/24')
|
||||
|
||||
p1 = self._make_port(self.fmt, net1['network']['id'])['port']
|
||||
p1 = self._bind_port_to_host(p1['id'], 'host1')['port']
|
||||
self.assertEqual(net1['network']['id'], p1['network_id'])
|
||||
self.assertEqual('ovs', p1['binding:vif_type'])
|
||||
self.assertEqual({'port_filter': False, 'ovs_hybrid_plug': False},
|
||||
p1['binding:vif_details'])
|
||||
p2 = self._make_port(self.fmt, net2['network']['id'])['port']
|
||||
p2 = self._bind_port_to_host(p2['id'], 'host2')['port']
|
||||
self.assertEqual(net2['network']['id'], p2['network_id'])
|
||||
self.assertEqual('ovs', p2['binding:vif_type'])
|
||||
self.assertEqual({'port_filter': False, 'ovs_hybrid_plug': False},
|
||||
p2['binding:vif_details'])
|
||||
with mock.patch.object(self.driver,
|
||||
'_notify_port_update_bulk') as notify:
|
||||
p3 = self._make_port(self.fmt, net1['network']['id'],
|
||||
device_owner=n_constants.DEVICE_OWNER_DHCP)['port']
|
||||
p3 = self._bind_other_port_to_host(p3['id'], 'host1',
|
||||
owner=n_constants.DEVICE_OWNER_DHCP)['port']
|
||||
mock_calls = [mock.call(mock.ANY, p1['id']),
|
||||
mock.call(mock.ANY, p1['id'])]
|
||||
notify.has_calls(mock_calls)
|
||||
notify.reset_mock()
|
||||
p4 = self._make_port(self.fmt, net1['network']['id'],
|
||||
device_owner=n_constants.DEVICE_OWNER_LOADBALANCERV2)['port']
|
||||
p4 = self._bind_other_port_to_host(p4['id'], 'host1',
|
||||
owner=n_constants.DEVICE_OWNER_LOADBALANCERV2)['port']
|
||||
notify.has_calls(mock_calls)
|
||||
|
||||
# TODO(rkukura): Add tests for opflex, local and unsupported
|
||||
# network_type values.
|
||||
|
||||
|
@ -3800,7 +3840,7 @@ class TestPortBindingDvs(ApicAimTestCase):
|
|||
self.driver.dvs_notifier.reset_mock()
|
||||
p2 = self._make_port(self.fmt, net['network']['id'])['port']
|
||||
self.assertEqual(net['network']['id'], p2['network_id'])
|
||||
newp2 = self._bind_dhcp_port_to_host(p2['id'], 'h1')
|
||||
newp2 = self._bind_other_port_to_host(p2['id'], 'h1')
|
||||
# Called on the network's tenant
|
||||
vif_det = newp2['port']['binding:vif_details']
|
||||
self.assertIsNone(vif_det.get('dvs_port_group_name', None))
|
||||
|
@ -3836,7 +3876,7 @@ class TestPortBindingDvs(ApicAimTestCase):
|
|||
p2 = self._make_port(self.fmt, net['network']['id'])['port']
|
||||
self.assertEqual(net['network']['id'], p2['network_id'])
|
||||
# Bind port to trigger path binding
|
||||
newp2 = self._bind_dhcp_port_to_host(p2['id'], 'h1')
|
||||
newp2 = self._bind_other_port_to_host(p2['id'], 'h1')
|
||||
# Called on the network's tenant
|
||||
vif_det = newp2['port']['binding:vif_details']
|
||||
self.assertIsNone(vif_det.get('dvs_port_group_name', None))
|
||||
|
|
|
@ -3226,46 +3226,6 @@ class TestPolicyTarget(AIMBaseTestCase):
|
|||
def test_get_gbp_details_no_pt_no_as_unrouted(self):
|
||||
self._do_test_gbp_details_no_pt(use_as=False, routed=False)
|
||||
|
||||
def test_notify_filtered_ports_per_network(self, pre_vrf=None):
|
||||
l3p = self.create_l3_policy(name='myl3')['l3_policy']
|
||||
l2p = self.create_l2_policy(name='myl2',
|
||||
l3_policy_id=l3p['id'])['l2_policy']
|
||||
ptg = self.create_policy_target_group(
|
||||
name="ptg1", l2_policy_id=l2p['id'])['policy_target_group']
|
||||
net_id = l2p['network_id']
|
||||
|
||||
pt1 = self.create_policy_target(name="pt1",
|
||||
policy_target_group_id=ptg['id'])['policy_target']
|
||||
pt2 = self.create_policy_target(name="pt2",
|
||||
policy_target_group_id=ptg['id'])['policy_target']
|
||||
pt3 = self.create_policy_target(name="pt3",
|
||||
policy_target_group_id=ptg['id'])['policy_target']
|
||||
with mock.patch.object(self.driver,
|
||||
'_send_port_update_notification') as notify:
|
||||
ctx = self._neutron_admin_context
|
||||
self._bind_port_to_host(pt1['port_id'], 'h1')
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h1', network=net_id)
|
||||
notify.assert_called_with(mock.ANY, pt1['port_id'])
|
||||
notify.reset_mock()
|
||||
self._bind_other_port_to_host(pt2['port_id'], 'h2')
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h2', network=net_id)
|
||||
notify.assert_not_called()
|
||||
self._bind_other_port_to_host(pt3['port_id'], 'h3',
|
||||
n_constants.DEVICE_OWNER_LOADBALANCERV2)
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h3', network=net_id)
|
||||
notify.assert_not_called()
|
||||
# test non-existing host
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h4', network=net_id)
|
||||
notify.assert_not_called()
|
||||
# test non-existing network
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h3', network='foo')
|
||||
notify.assert_not_called()
|
||||
|
||||
def test_ip_address_owner_update(self):
|
||||
l3p = self.create_l3_policy(name='myl3')['l3_policy']
|
||||
l2p = self.create_l2_policy(name='myl2',
|
||||
|
|
Loading…
Reference in New Issue