use payloads for PORT and FLOATING_IP

This patch switches over to callback payloads for PORT
and FLOATING_IP PRECOMMIT_DELETE events.

Change-Id: I2b3dd3ac70bcdd51125650f0a997859316ff644a
This commit is contained in:
Nurmatov Mamatisa 2021-07-12 17:59:06 +03:00
parent 6266c293dc
commit e7c61d3eba
4 changed files with 71 additions and 51 deletions

View File

@ -1517,11 +1517,12 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
@registry.receives(resources.PORT, [events.PRECOMMIT_DELETE]) @registry.receives(resources.PORT, [events.PRECOMMIT_DELETE])
def _precommit_delete_port_callback( def _precommit_delete_port_callback(
self, resource, event, trigger, **kwargs): self, resource, event, trigger, payload):
if (kwargs['port']['device_owner'] == port = payload.latest_state
if (port['device_owner'] ==
constants.DEVICE_OWNER_FLOATINGIP): constants.DEVICE_OWNER_FLOATINGIP):
registry.notify(resources.FLOATING_IP, events.PRECOMMIT_DELETE, registry.publish(resources.FLOATING_IP, events.PRECOMMIT_DELETE,
self, **kwargs) self, payload)
def _delete_floatingip(self, context, id): def _delete_floatingip(self, context, id):
floatingip = self._get_floatingip(context, id) floatingip = self._get_floatingip(context, id)

View File

@ -1953,34 +1953,39 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
network = self.get_network(context, port['network_id']) network = self.get_network(context, port['network_id'])
bound_mech_contexts = [] bound_mech_contexts = []
kwargs = {
'context': context,
'id': id,
'network': network,
'port': port,
'port_db': port_db,
'bindings': binding,
}
device_owner = port['device_owner'] device_owner = port['device_owner']
metadata = {'network': network,
'port_db': port_db,
'bindings': binding}
if device_owner == const.DEVICE_OWNER_DVR_INTERFACE: if device_owner == const.DEVICE_OWNER_DVR_INTERFACE:
bindings = db.get_distributed_port_bindings(context, bindings = db.get_distributed_port_bindings(context,
id) id)
for bind in bindings: for bind in bindings:
levels = db.get_binding_level_objs(context, id, bind.host) levels = db.get_binding_level_objs(context, id, bind.host)
kwargs['bind'] = bind metadata['bind'] = bind
kwargs['levels'] = levels metadata['levels'] = levels
registry.notify(resources.PORT, events.PRECOMMIT_DELETE, registry.publish(resources.PORT,
self, **kwargs) events.PRECOMMIT_DELETE,
self,
payload=events.DBEventPayload(
context,
resource_id=id,
metadata=metadata,
states=(port,)))
mech_context = driver_context.PortContext( mech_context = driver_context.PortContext(
self, context, port, network, bind, levels) self, context, port, network, bind, levels)
self.mechanism_manager.delete_port_precommit(mech_context) self.mechanism_manager.delete_port_precommit(mech_context)
bound_mech_contexts.append(mech_context) bound_mech_contexts.append(mech_context)
else: else:
levels = db.get_binding_level_objs(context, id, binding.host) levels = db.get_binding_level_objs(context, id, binding.host)
kwargs['bind'] = None metadata['bind'] = None
kwargs['levels'] = levels metadata['levels'] = levels
registry.notify(resources.PORT, events.PRECOMMIT_DELETE, registry.publish(resources.PORT, events.PRECOMMIT_DELETE, self,
self, **kwargs) payload=events.DBEventPayload(
context,
resource_id=id,
metadata=metadata,
states=(port,)))
mech_context = driver_context.PortContext( mech_context = driver_context.PortContext(
self, context, port, network, binding, levels) self, context, port, network, binding, levels)
self.mechanism_manager.delete_port_precommit(mech_context) self.mechanism_manager.delete_port_precommit(mech_context)

View File

@ -147,8 +147,16 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
if l3_dvr_db.is_distributed_router(router): if l3_dvr_db.is_distributed_router(router):
raise pf_exc.PortHasPortForwarding(port_id=port_id) raise pf_exc.PortHasPortForwarding(port_id=port_id)
@registry.receives(resources.FLOATING_IP, [events.PRECOMMIT_UPDATE, @registry.receives(resources.FLOATING_IP, [events.PRECOMMIT_DELETE])
events.PRECOMMIT_DELETE]) def _check_floatingip_request_precommit_delete(
self, resource, event, trigger, payload):
# TODO(isabek): refactor back into 1 method when FIP code is moved
# to event payloads
return self._check_floatingip_request(resource, event, trigger,
payload.context,
port=payload.latest_state)
@registry.receives(resources.FLOATING_IP, [events.PRECOMMIT_UPDATE])
def _check_floatingip_request(self, resource, event, trigger, context, def _check_floatingip_request(self, resource, event, trigger, context,
**kwargs): **kwargs):
# We only support the "free" floatingip to be associated with # We only support the "free" floatingip to be associated with
@ -178,23 +186,20 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
if exist_pf_resources: if exist_pf_resources:
raise pf_exc.FipInUseByPortForwarding(id=floatingip_id) raise pf_exc.FipInUseByPortForwarding(id=floatingip_id)
@registry.receives(resources.PORT, [events.AFTER_UPDATE]) @registry.receives(resources.PORT, [events.AFTER_UPDATE,
def _process_updated_port_request(self, resource, event, trigger, events.PRECOMMIT_DELETE])
def _process_port_request_handler(self, resource, event, trigger,
payload): payload):
# TODO(isabek): refactor back into 1 method when all code is moved
# to event payloads
return self._process_port_request(resource, event, trigger,
payload.context,
port=payload.latest_state)
@registry.receives(resources.PORT, [events.PRECOMMIT_DELETE]) return self._process_port_request(event, payload.context,
payload.latest_state)
@db_api.retry_if_session_inactive() @db_api.retry_if_session_inactive()
def _process_port_request(self, resource, event, trigger, context, def _process_port_request(self, event, context, port):
**kwargs):
# Deleting floatingip will receive port resource with precommit_delete # Deleting floatingip will receive port resource with precommit_delete
# event, so just return, then check the request in # event, so just return, then check the request in
# _check_floatingip_request callback. # _check_floatingip_request callback.
if kwargs['port']['device_owner'].startswith( if port['device_owner'].startswith(
lib_consts.DEVICE_OWNER_FLOATINGIP): lib_consts.DEVICE_OWNER_FLOATINGIP):
return return
@ -204,8 +209,8 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
# port forwarding resources need to be deleted for port's AFTER_UPDATE # port forwarding resources need to be deleted for port's AFTER_UPDATE
# event. Or get all affected ip addresses for port's PRECOMMIT_DELETE # event. Or get all affected ip addresses for port's PRECOMMIT_DELETE
# event. # event.
port_id = kwargs['port']['id'] port_id = port['id']
update_fixed_ips = kwargs['port']['fixed_ips'] update_fixed_ips = port['fixed_ips']
update_ip_set = set() update_ip_set = set()
for update_fixed_ip in update_fixed_ips: for update_fixed_ip in update_fixed_ips:
if (netaddr.IPNetwork(update_fixed_ip.get('ip_address')).version == if (netaddr.IPNetwork(update_fixed_ip.get('ip_address')).version ==

View File

@ -2086,27 +2086,18 @@ class TestMl2DvrPortsV2(TestMl2PortsV2):
if floating_ip: if floating_ip:
router_ids.add(ns_to_delete['router_id']) router_ids.add(ns_to_delete['router_id'])
with self.port() as port,\ with self.port() as port, \
mock.patch.object(registry, 'notify') as notify, \
mock.patch.object(registry, 'publish') as publish, \ mock.patch.object(registry, 'publish') as publish, \
mock.patch.object(self.l3plugin, mock.patch.object(self.l3plugin,
'disassociate_floatingips', 'disassociate_floatingips',
return_value=router_ids): return_value=router_ids):
port_id = port['port']['id'] port_id = port['port']['id']
self.plugin.delete_port(self.context, port_id) self.plugin.delete_port(self.context, port_id)
self.assertEqual(1, notify.call_count) self.assertEqual(3, publish.call_count)
self.assertEqual(2, publish.call_count)
# needed for a full match in the assertion below # needed for a full match in the assertion below
port['port']['extra_dhcp_opts'] = [] port['port']['extra_dhcp_opts'] = []
port['port']['standard_attr_id'] = mock.ANY port['port']['standard_attr_id'] = mock.ANY
expected = [mock.call(resources.PORT, events.PRECOMMIT_DELETE,
mock.ANY, network=mock.ANY, bind=mock.ANY,
port=port['port'], port_db=mock.ANY,
context=self.context, levels=mock.ANY,
id=mock.ANY, bindings=mock.ANY)]
notify.assert_has_calls(expected)
expected = [mock.call(resources.PORT, events.BEFORE_DELETE, expected = [mock.call(resources.PORT, events.BEFORE_DELETE,
mock.ANY, payload=mock.ANY)] mock.ANY, payload=mock.ANY)]
publish.assert_has_calls(expected) publish.assert_has_calls(expected)
@ -2115,12 +2106,23 @@ class TestMl2DvrPortsV2(TestMl2PortsV2):
self.assertEqual(port_id, payload.resource_id) self.assertEqual(port_id, payload.resource_id)
self.assertTrue(payload.metadata['port_check']) self.assertTrue(payload.metadata['port_check'])
expected = [mock.call(resources.PORT, events.AFTER_DELETE, expected = [mock.call(resources.PORT, events.PRECOMMIT_DELETE,
mock.ANY, payload=mock.ANY)] mock.ANY, payload=mock.ANY)]
publish.assert_has_calls(expected) publish.assert_has_calls(expected)
payload = publish.call_args_list[1][1]['payload'] payload = publish.call_args_list[1][1]['payload']
self.assertEqual(port_id, payload.resource_id) self.assertEqual(port_id, payload.resource_id)
self.assertEqual(port['port'], payload.latest_state)
self.assertTrue(payload.metadata['network'])
self.assertTrue(payload.metadata['port_db'])
self.assertTrue(payload.metadata['bindings'])
expected = [mock.call(resources.PORT, events.AFTER_DELETE,
mock.ANY, payload=mock.ANY)]
publish.assert_has_calls(expected)
payload = publish.call_args_list[2][1]['payload']
self.assertEqual(port_id, payload.resource_id)
def test_delete_port_with_floatingip_notifies_l3_plugin(self): def test_delete_port_with_floatingip_notifies_l3_plugin(self):
self.test_delete_port_notifies_l3_plugin(floating_ip=True) self.test_delete_port_notifies_l3_plugin(floating_ip=True)
@ -2130,14 +2132,21 @@ class TestMl2DvrPortsV2(TestMl2PortsV2):
with self.port(device_owner='network:floatingip') as port: with self.port(device_owner='network:floatingip') as port:
try: try:
registry.subscribe(fake_method, resources.FLOATING_IP, registry.subscribe(fake_method, resources.FLOATING_IP,
events.PRECOMMIT_DELETE) events.PRECOMMIT_DELETE)
port_id = port['port']['id'] port_id = port['port']['id']
self.plugin.delete_port(self.context, port_id) self.plugin.delete_port(self.context, port_id)
fake_method.assert_called_once_with( fake_method.assert_called_once_with(
resources.FLOATING_IP, events.PRECOMMIT_DELETE, mock.ANY, resources.FLOATING_IP, events.PRECOMMIT_DELETE, mock.ANY,
bind=mock.ANY, bindings=mock.ANY, context=mock.ANY, payload=mock.ANY)
id=mock.ANY, levels=mock.ANY, network=mock.ANY, payload = fake_method.call_args_list[0][1]['payload']
port=mock.ANY, port_db=mock.ANY) self.assertEqual(port_id, payload.resource_id)
port_dict = payload.latest_state
port_dict.pop('standard_attr_id')
self.assertEqual(port['port'], port_dict)
self.assertTrue(payload.metadata['network'])
self.assertTrue(payload.metadata['port_db'])
self.assertTrue(payload.metadata['bindings'])
finally: finally:
registry.unsubscribe(fake_method, resources.FLOATING_IP, registry.unsubscribe(fake_method, resources.FLOATING_IP,
events.PRECOMMIT_DELETE) events.PRECOMMIT_DELETE)