Agent side push notifications for address groups

Adds agent side code to enable the OVS agent to receive address groups
from the push notifications cache.

Change-Id: I1f27eccb2a69c553631fdc12d34e9025925844c5
Partial-Bug: #1592028
This commit is contained in:
Miguel Lavalle 2020-10-20 17:37:22 -05:00
parent 4937928652
commit 25a694c098
4 changed files with 74 additions and 2 deletions

View File

@ -208,7 +208,8 @@ class CacheBackedPluginApi(PluginApi):
resources.SECURITYGROUP,
resources.SECURITYGROUPRULE,
resources.NETWORK,
resources.SUBNET]
resources.SUBNET,
resources.ADDRESSGROUP]
def __init__(self, *args, **kwargs):
super(CacheBackedPluginApi, self).__init__(*args, **kwargs)

View File

@ -212,6 +212,16 @@ class SecurityGroupAgentRpc(object):
else:
self.refresh_firewall(devices)
def address_group_updated(self, address_group_id):
LOG.info("Address group updated %r", address_group_id)
# TODO(mlavalle) A follow up patch in the address groups implementation
# series will add more code here
def address_group_deleted(self, address_group_id):
LOG.info("Address group deleted %r", address_group_id)
# TODO(mlavalle) A follow up patch in the address groups implementation
# series will add more code here
def remove_devices_filter(self, device_ids):
if not device_ids:
return

View File

@ -228,6 +228,12 @@ class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin):
'Port', events.AFTER_DELETE)
registry.subscribe(self._handle_sg_member_update,
'Port', events.AFTER_UPDATE)
self._register_legacy_ag_notification_callbacks(sg_agent)
def _register_legacy_ag_notification_callbacks(self, sg_agent):
for event in (events.AFTER_UPDATE, events.AFTER_DELETE):
registry.subscribe(self._handle_address_group_event,
resources.ADDRESSGROUP, event)
def security_group_info_for_devices(self, context, devices):
ports = self._get_devices_info(context, devices)
@ -240,6 +246,14 @@ class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin):
# error.
raise NotImplementedError()
def get_address_group_details(self, address_group_id):
ag_obj = self.rcache.get_resource_by_id(resources.ADDRESSGROUP,
address_group_id)
if not ag_obj:
LOG.debug("Address group %s does not exist in cache.",
address_group_id)
return ag_obj
def _add_child_sg_rules(self, rtype, event, trigger, context, updated,
**kwargs):
# whenever we receive a full security group, add all child rules
@ -292,6 +306,13 @@ class SecurityGroupServerAPIShim(sg_rpc_base.SecurityGroupInfoAPIMixin):
if sgs:
self._sg_agent.security_groups_member_updated(sgs)
def _handle_address_group_event(self, rtype, event, trigger, context,
resource_id, **kwargs):
if event == events.AFTER_UPDATE:
self._sg_agent.address_group_updated(resource_id)
else:
self._sg_agent.address_group_deleted(resource_id)
def _get_devices_info(self, context, devices):
# NOTE(kevinbenton): this format is required by the sg code, it is
# defined in get_port_from_device and mimics

View File

@ -22,6 +22,7 @@ from neutron.agent import resource_cache
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron import objects
from neutron.objects import address_group
from neutron.objects.port.extensions import port_security as psec
from neutron.objects import ports
from neutron.objects import securitygroup
@ -70,7 +71,7 @@ class SecurityGroupServerAPIShimTestCase(base.BaseTestCase):
super(SecurityGroupServerAPIShimTestCase, self).setUp()
objects.register_objects()
resource_types = [resources.PORT, resources.SECURITYGROUP,
resources.SECURITYGROUPRULE]
resources.SECURITYGROUPRULE, resources.ADDRESSGROUP]
self.rcache = resource_cache.RemoteResourceCache(resource_types)
# prevent any server lookup attempts
mock.patch.object(self.rcache, '_flood_cache_for_query').start()
@ -93,6 +94,26 @@ class SecurityGroupServerAPIShimTestCase(base.BaseTestCase):
self.rcache.record_resource_update(self.ctx, 'Port', p)
return p
def _make_address_group_ovo(self):
id = uuidutils.generate_uuid()
address_associations = [
address_group.AddressAssociation(
self.ctx,
address=netaddr.IPNetwork('10.0.0.1/32'),
address_group_id=id),
address_group.AddressAssociation(
self.ctx,
address=netaddr.IPNetwork('2001:db8::/32'),
address_group_id=id)
]
ag = address_group.AddressGroup(self.ctx, id=id,
name='an-address-group',
description='An address group',
addresses=address_associations)
self.rcache.record_resource_update(self.ctx, resources.ADDRESSGROUP,
ag)
return ag
@mock.patch.object(securitygroup.SecurityGroup, 'is_shared_with_tenant',
return_value=False)
def _make_security_group_ovo(self, *args, **kwargs):
@ -172,3 +193,22 @@ class SecurityGroupServerAPIShimTestCase(base.BaseTestCase):
self.rcache.record_resource_delete(self.ctx, 'Port', p1.id)
self.sg_agent.security_groups_member_updated.assert_called_with(
{s1.id})
def test_get_address_group_details(self):
ag = self._make_address_group_ovo()
retrieved_ag = self.shim.get_address_group_details(ag.id)
self.assertEqual(ag.id, retrieved_ag.id)
self.assertEqual(ag.name, retrieved_ag.name)
self.assertEqual(ag.description, retrieved_ag.description)
self.assertEqual(ag.addresses[0].address,
retrieved_ag.addresses[0].address)
self.assertEqual(ag.addresses[1].address,
retrieved_ag.addresses[1].address)
def test_address_group_update_events(self):
ag = self._make_address_group_ovo()
self.sg_agent.address_group_updated.assert_called_with(ag.id)
self.sg_agent.address_group_updated.reset_mock()
self.rcache.record_resource_delete(self.ctx, resources.ADDRESSGROUP,
ag.id)
self.sg_agent.address_group_deleted.assert_called_with(ag.id)