Add RPC for network ports

This adds an RPC to request notifications for ports on
a network. This will be used by a subsequent patch to
trigger port update notifications whenever resources
unrelated to a port change (e.g. let an instance's port
get notified if a HA DHCP port on a given network goes
down).

Change-Id: Ib316028dc9cfd8dd368787e7a25058af49f3e928
This commit is contained in:
Thomas Bachman 2017-12-05 16:25:48 +00:00
parent ed7dbd8e93
commit 7ac47391fd
2 changed files with 72 additions and 3 deletions

View File

@ -13,7 +13,9 @@
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import api as db_api
from neutron.extensions import portbindings
from neutron.plugins.ml2 import rpc as ml2_rpc
from neutron_lib import constants
from opflexagent import rpc as o_rpc
from oslo_log import log
@ -104,6 +106,32 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
LOG.exception(e)
return None
# REVISIT: this should exist in the mechanism driver, and should
# be addressed by any patch that refactors this RPC class
def notify_filtered_ports_per_network(self, context, **kwargs):
LOG.debug("APIC AIM handling get_ports_for_network for: %s", kwargs)
try:
host_id = kwargs.get('host')
network_id = kwargs.get('network')
core_plugin = self._core_plugin
filters = {'network_id': [network_id]}
ports_to_update = core_plugin.get_ports(context, filters)
# Exclude DHCP and LBaaS ports -- these are possible triggers
# for the network notifaction that resulted in the agents calling
# this RPC. Updates for those ports and will be handled from their
# own port notifications, if needed.
for p in ports_to_update:
if (p.get(portbindings.HOST_ID) == host_id) and not (
p['device_owner'].startswith(
constants.DEVICE_OWNER_DHCP) or
p['device_owner'].startswith(
constants.DEVICE_OWNER_LOADBALANCERV2)):
self._send_port_update_notification(context, p['id'])
except Exception as e:
LOG.error("An exception has occurred while requesting ports "
"for network %s", kwargs.get('network'))
LOG.exception(e)
# Child class needs to support:
# - self._send_port_update_notification(context, port)
def ip_address_owner_update(self, context, **kwargs):

View File

@ -221,9 +221,10 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
return super(AIMBaseTestCase, self)._bind_port_to_host(
port_id, host, data=data)
def _bind_dhcp_port_to_host(self, port_id, host):
def _bind_other_port_to_host(self, port_id, host,
owner=n_constants.DEVICE_OWNER_DHCP):
data = {'port': {'binding:host_id': host,
'device_owner': 'network:dhcp',
'device_owner': owner,
'device_id': 'someid'}}
return super(AIMBaseTestCase, self)._bind_port_to_host(
port_id, host, data=data)
@ -3225,6 +3226,46 @@ class TestPolicyTarget(AIMBaseTestCase):
def test_get_gbp_details_no_pt_no_as_unrouted(self):
self._do_test_gbp_details_no_pt(use_as=False, routed=False)
def test_notify_filtered_ports_per_network(self, pre_vrf=None):
l3p = self.create_l3_policy(name='myl3')['l3_policy']
l2p = self.create_l2_policy(name='myl2',
l3_policy_id=l3p['id'])['l2_policy']
ptg = self.create_policy_target_group(
name="ptg1", l2_policy_id=l2p['id'])['policy_target_group']
net_id = l2p['network_id']
pt1 = self.create_policy_target(name="pt1",
policy_target_group_id=ptg['id'])['policy_target']
pt2 = self.create_policy_target(name="pt2",
policy_target_group_id=ptg['id'])['policy_target']
pt3 = self.create_policy_target(name="pt3",
policy_target_group_id=ptg['id'])['policy_target']
with mock.patch.object(self.driver,
'_send_port_update_notification') as notify:
ctx = self._neutron_admin_context
self._bind_port_to_host(pt1['port_id'], 'h1')
self.driver.notify_filtered_ports_per_network(ctx,
host='h1', network=net_id)
notify.assert_called_with(mock.ANY, pt1['port_id'])
notify.reset_mock()
self._bind_other_port_to_host(pt2['port_id'], 'h2')
self.driver.notify_filtered_ports_per_network(ctx,
host='h2', network=net_id)
notify.assert_not_called()
self._bind_other_port_to_host(pt3['port_id'], 'h3',
n_constants.DEVICE_OWNER_LOADBALANCERV2)
self.driver.notify_filtered_ports_per_network(ctx,
host='h3', network=net_id)
notify.assert_not_called()
# test non-existing host
self.driver.notify_filtered_ports_per_network(ctx,
host='h4', network=net_id)
notify.assert_not_called()
# test non-existing network
self.driver.notify_filtered_ports_per_network(ctx,
host='h3', network='foo')
notify.assert_not_called()
def test_ip_address_owner_update(self):
l3p = self.create_l3_policy(name='myl3')['l3_policy']
l2p = self.create_l2_policy(name='myl2',
@ -3436,7 +3477,7 @@ class TestPolicyTargetDvs(AIMBaseTestCase):
self.agent_conf = AGENT_CONF
pt2 = self.create_policy_target(
policy_target_group_id=ptg['id'])['policy_target']
newp2 = self._bind_dhcp_port_to_host(pt2['port_id'], 'h1')
newp2 = self._bind_other_port_to_host(pt2['port_id'], 'h1')
vif_details = newp2['port']['binding:vif_details']
self.assertIsNone(vif_details.get('dvs_port_group_name'))
port_key = newp2['port']['binding:vif_details'].get('dvs_port_key')