diff --git a/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py b/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py index 41d3f3a5944..6875e277854 100644 --- a/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py +++ b/neutron/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py @@ -112,12 +112,15 @@ class DhcpAgentNotifyAPI(object): self.uses_native_notifications[resource] = {'create': False, 'update': False, 'delete': False} - registry.subscribe(self._native_event_send_dhcp_notification, - resource, events.AFTER_CREATE) - registry.subscribe(self._native_event_send_dhcp_notification, - resource, events.AFTER_UPDATE) - registry.subscribe(self._native_event_send_dhcp_notification, - resource, events.AFTER_DELETE) + callback = self._native_event_send_dhcp_notification + + # TODO(boden): remove shim below once all events use payloads + if resource == resources.NETWORK: + callback = self._native_event_send_dhcp_notification_payload + + registry.subscribe(callback, resource, events.AFTER_CREATE) + registry.subscribe(callback, resource, events.AFTER_UPDATE) + registry.subscribe(callback, resource, events.AFTER_DELETE) @property def plugin(self): @@ -283,6 +286,26 @@ class DhcpAgentNotifyAPI(object): 'fixed_ips': kwargs['port']['fixed_ips']}, kwargs['port']['network_id']) + def _native_event_send_dhcp_notification_payload( + self, resource, event, trigger, payload=None): + + # TODO(boden): collapse the native event methods back into one + + action = event.replace('after_', '') + # we unsubscribe the _send_dhcp_notification method now that we know + # the loaded core plugin emits native resource events + if resource not in self._unsubscribed_resources: + self.uses_native_notifications[resource][action] = True + if all(self.uses_native_notifications[resource].values()): + # only unsubscribe the API level listener if we are + # receiving all event types for this resource + self._unsubscribed_resources.append(resource) + registry.unsubscribe_by_resource(self._send_dhcp_notification, + resource) + method_name = '.'.join((resource, action, 'end')) + data = {resource: payload.latest_state} + self.notify(payload.context, data, method_name) + def _native_event_send_dhcp_notification(self, resource, event, trigger, context, **kwargs): action = event.replace('after_', '') diff --git a/neutron/db/db_base_plugin_v2.py b/neutron/db/db_base_plugin_v2.py index 7d86853599a..41f9b83e203 100644 --- a/neutron/db/db_base_plugin_v2.py +++ b/neutron/db/db_base_plugin_v2.py @@ -506,17 +506,19 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, with db_api.CONTEXT_WRITER.using(context): network_db = self._get_network(context, id) network = self._make_network_dict(network_db, context=context) - registry.notify(resources.NETWORK, events.PRECOMMIT_DELETE, - self, context=context, network_id=id, - network=network) + registry.publish(resources.NETWORK, events.PRECOMMIT_DELETE, + self, payload=events.DBEventPayload( + context, resource_id=id, + states=(network,))) # We expire network_db here because precommit deletion # might have left the relationship stale, for example, # if we deleted a segment. context.session.expire(network_db) network_db = self._get_network(context, id) context.session.delete(network_db) - registry.notify(resources.NETWORK, events.AFTER_DELETE, - self, context=context, network=network) + registry.publish(resources.NETWORK, events.AFTER_DELETE, + self, payload=events.DBEventPayload( + context, resource_id=id, states=(network,))) @db_api.retry_if_session_inactive() def get_network(self, context, id, fields=None): diff --git a/neutron/db/l3_dvr_db.py b/neutron/db/l3_dvr_db.py index 551b1816740..d036e93a42c 100644 --- a/neutron/db/l3_dvr_db.py +++ b/neutron/db/l3_dvr_db.py @@ -417,12 +417,13 @@ class DVRResourceOperationHandler(object): @registry.receives(resources.NETWORK, [events.AFTER_DELETE]) def delete_fip_namespaces_for_ext_net(self, rtype, event, trigger, - context, network, **kwargs): + payload=None): + network = payload.latest_state if network.get(extnet_apidef.EXTERNAL): # Send the information to all the L3 Agent hosts # to clean up the fip namespace as it is no longer required. self.l3plugin.l3_rpc_notifier.delete_fipnamespace_for_ext_net( - context, network['id']) + payload.context, payload.resource_id) def _get_ports_for_allowed_address_pair_ip(self, context, network_id, fixed_ip): diff --git a/neutron/db/securitygroups_db.py b/neutron/db/securitygroups_db.py index 5c6b29914b7..dd51624d136 100644 --- a/neutron/db/securitygroups_db.py +++ b/neutron/db/securitygroups_db.py @@ -890,9 +890,8 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase, @registry.receives(resources.PORT, [events.BEFORE_CREATE, events.BEFORE_UPDATE]) - @registry.receives(resources.NETWORK, [events.BEFORE_CREATE]) - def _ensure_default_security_group_handler(self, resource, event, trigger, - context, **kwargs): + def _ensure_default_security_group_handler_port( + self, resource, event, trigger, context, **kwargs): if event == events.BEFORE_UPDATE: tenant_id = kwargs['original_' + resource]['tenant_id'] else: @@ -900,6 +899,15 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase, if tenant_id: self._ensure_default_security_group(context, tenant_id) + @registry.receives(resources.NETWORK, [events.BEFORE_CREATE]) + def _ensure_default_security_group_handler_net( + self, resource, event, trigger, payload=None): + + # TODO(boden): refactor into single callback method + project_id = payload.latest_state['tenant_id'] + if project_id: + self._ensure_default_security_group(payload.context, project_id) + def _ensure_default_security_group(self, context, tenant_id): """Create a default security group if one doesn't exist. diff --git a/neutron/plugins/ml2/ovo_rpc.py b/neutron/plugins/ml2/ovo_rpc.py index 45cb126e9bc..ff639f79a71 100644 --- a/neutron/plugins/ml2/ovo_rpc.py +++ b/neutron/plugins/ml2/ovo_rpc.py @@ -37,6 +37,8 @@ LOG = logging.getLogger(__name__) class _ObjectChangeHandler(object): + _PAYLOAD_RESOURCES = (resources.NETWORK,) + def __init__(self, resource, object_class, resource_push_api): self._resource = resource self._obj_class = object_class @@ -51,7 +53,12 @@ class _ObjectChangeHandler(object): self._semantic_warned = False for event in (events.AFTER_CREATE, events.AFTER_UPDATE, events.AFTER_DELETE): - registry.subscribe(self.handle_event, resource, event) + handler = self.handle_event + + # TODO(boden): remove shim below once all events use payloads + if resource in self._PAYLOAD_RESOURCES: + handler = self.handle_payload_event + registry.subscribe(handler, resource, event) def wait(self): """Waits for all outstanding events to be dispatched.""" @@ -79,6 +86,18 @@ class _ObjectChangeHandler(object): self._semantic_warned = True return True + def handle_payload_event(self, resource, event, + trigger, payload=None): + if self._is_session_semantic_violated( + payload.context, resource, event): + return + resource_id = payload.resource_id + # we preserve the context so we can trace a receive on the agent back + # to the server-side event that triggered it + self._resources_to_push[resource_id] = payload.context.to_dict() + # spawn worker so we don't block main AFTER_UPDATE thread + self.fts.append(self._worker_pool.submit(self.dispatch_events)) + def handle_event(self, resource, event, trigger, context, *args, **kwargs): """Callback handler for resource change that pushes change to RPC. diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 20735174ac5..76e29072269 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -1042,8 +1042,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, def _before_create_network(self, context, network): net_data = network[net_def.RESOURCE_NAME] - registry.notify(resources.NETWORK, events.BEFORE_CREATE, self, - context=context, network=net_data) + registry.publish(resources.NETWORK, events.BEFORE_CREATE, self, + payload=events.DBEventPayload( + context, desired_state=net_data)) def _create_network_db(self, context, network): net_data = network[net_def.RESOURCE_NAME] @@ -1080,8 +1081,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, net_data[az_def.AZ_HINTS]) net_db[az_def.AZ_HINTS] = az_hints result[az_def.AZ_HINTS] = az_hints - registry.notify(resources.NETWORK, events.PRECOMMIT_CREATE, self, - context=context, request=net_data, network=result) + registry.publish(resources.NETWORK, events.PRECOMMIT_CREATE, self, + payload=events.DBEventPayload( + context, states=(result,), + resource_id=result['id'], + request_body=net_data)) resource_extend.apply_funcs('networks', result, net_db) mech_context = driver_context.NetworkContext(self, context, @@ -1097,8 +1101,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, return self._after_create_network(context, result, mech_context) def _after_create_network(self, context, result, mech_context): - kwargs = {'context': context, 'network': result} - registry.notify(resources.NETWORK, events.AFTER_CREATE, self, **kwargs) + registry.publish(resources.NETWORK, events.AFTER_CREATE, self, + payload=events.DBEventPayload( + context, states=(result,), + resource_id=result['id'])) try: self.mechanism_manager.create_network_postcommit(mech_context) except ml2_exc.MechanismDriverError: @@ -1166,9 +1172,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, # by re-calling update_network with the previous attributes. For # now the error is propagated to the caller, which is expected to # either undo/retry the operation or delete the resource. - kwargs = {'context': context, 'network': updated_network, - 'original_network': original_network} - registry.notify(resources.NETWORK, events.AFTER_UPDATE, self, **kwargs) + registry.publish(resources.NETWORK, events.AFTER_UPDATE, self, + payload=events.DBEventPayload( + context, + states=(original_network, updated_network,), + resource_id=updated_network['id'])) self.mechanism_manager.update_network_postcommit(mech_context) if need_network_update_notify: self.notifier.network_update(context, updated_network) @@ -1227,31 +1235,33 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, @registry.receives(resources.NETWORK, [events.PRECOMMIT_DELETE], priority=0) def _network_delete_precommit_handler(self, rtype, event, trigger, - context, network_id, **kwargs): - network = (kwargs.get('network') or - self.get_network(context, network_id)) + payload=None): + context = payload.context + network_id = payload.resource_id + network = payload.latest_state if payload.states else \ + self.get_network(context, network_id) mech_context = driver_context.NetworkContext(self, context, network) # TODO(kevinbenton): move this mech context into something like # a 'delete context' so it's not polluting the real context object - setattr(context, '_mech_context', mech_context) + setattr(payload.context, '_mech_context', mech_context) self.mechanism_manager.delete_network_precommit( mech_context) @registry.receives(resources.NETWORK, [events.AFTER_DELETE]) def _network_delete_after_delete_handler(self, rtype, event, trigger, - context, network, **kwargs): + payload=None): try: self.mechanism_manager.delete_network_postcommit( - context._mech_context) + payload.context._mech_context) except ml2_exc.MechanismDriverError: # TODO(apech) - One or more mechanism driver failed to # delete the network. Ideally we'd notify the caller of # the fact that an error occurred. LOG.error("mechanism_manager.delete_network_postcommit" " failed") - self.notifier.network_delete(context, network['id']) + self.notifier.network_delete(payload.context, payload.resource_id) def _before_create_subnet(self, context, subnet): subnet_data = subnet[subnet_def.RESOURCE_NAME] diff --git a/neutron/services/auto_allocate/db.py b/neutron/services/auto_allocate/db.py index 600ca0f5fbe..c51777f1473 100644 --- a/neutron/services/auto_allocate/db.py +++ b/neutron/services/auto_allocate/db.py @@ -41,20 +41,13 @@ CHECK_REQUIREMENTS = 'dry-run' def _ensure_external_network_default_value_callback( - resource, event, trigger, **kwargs): + resource, event, trigger, payload=None): """Ensure the is_default db field matches the create/update request.""" - # TODO(boden): remove shim once all callbacks use payloads - if 'payload' in kwargs: - _request = kwargs['payload'].request_body - _context = kwargs['payload'].context - _network = kwargs['payload'].desired_state - _orig = kwargs['payload'].states[0] - else: - _request = kwargs['request'] - _context = kwargs['context'] - _network = kwargs['network'] - _orig = kwargs.get('original_network') + _request = payload.request_body + _context = payload.context + _network = payload.desired_state or payload.latest_state + _orig = payload.states[0] @db_api.retry_if_session_inactive() def _do_ensure_external_network_default_value_callback( diff --git a/neutron/services/qos/qos_plugin.py b/neutron/services/qos/qos_plugin.py index 7bf892a560a..deeafe43b54 100644 --- a/neutron/services/qos/qos_plugin.py +++ b/neutron/services/qos/qos_plugin.py @@ -350,9 +350,9 @@ class QoSPlugin(qos.QoSPluginBase): self.validate_policy_for_port(context, policy, updated_port) def _validate_create_network_callback(self, resource, event, trigger, - **kwargs): - context = kwargs['context'] - network_id = kwargs['network']['id'] + payload=None): + context = payload.context + network_id = payload.resource_id network = network_object.Network.get_object(context, id=network_id) policy_id = network.qos_policy_id diff --git a/neutron/services/segments/db.py b/neutron/services/segments/db.py index 7406c5b16f0..b1c02e2fd1e 100644 --- a/neutron/services/segments/db.py +++ b/neutron/services/segments/db.py @@ -29,7 +29,6 @@ from oslo_db import exception as db_exc from oslo_log import helpers as log_helpers from oslo_utils import uuidutils -from neutron.common import utils as common_utils from neutron.db import segments_db as db from neutron.extensions import segment as extension from neutron import manager @@ -344,8 +343,9 @@ def _add_segment_host_mapping_for_segment(resource, event, trigger, def _delete_segments_for_network(resource, event, trigger, - context, network_id, **kwargs): - admin_ctx = common_utils.get_elevated_context(context) + payload=None, **kwargs): + network_id = payload.resource_id + admin_ctx = payload.context.elevated() global segments_plugin if not segments_plugin: segments_plugin = manager.NeutronManager.load_class_for_provider( diff --git a/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py b/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py index 0b7650a7c7e..6a81753e2e8 100644 --- a/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py +++ b/neutron/tests/unit/api/rpc/agentnotifiers/test_dhcp_rpc_agent_api.py @@ -263,7 +263,7 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase): def test__native_notification_unsubscribes(self): self.assertFalse(self.notifier._unsubscribed_resources) - for res in (resources.PORT, resources.NETWORK, resources.SUBNET): + for res in (resources.PORT, resources.SUBNET): self.notifier._unsubscribed_resources = [] kwargs = {res: {}} registry.notify(res, events.AFTER_CREATE, self, @@ -281,6 +281,23 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase): context=mock.Mock(), **kwargs) self.assertEqual([res], self.notifier._unsubscribed_resources) + for res in [resources.NETWORK]: + self.notifier._unsubscribed_resources = [] + registry.publish(res, events.AFTER_CREATE, self, + payload=events.DBEventPayload(mock.Mock())) + # don't unsubscribe until all three types are observed + self.assertEqual([], self.notifier._unsubscribed_resources) + registry.publish(res, events.AFTER_UPDATE, self, + payload=events.DBEventPayload(mock.Mock())) + self.assertEqual([], self.notifier._unsubscribed_resources) + registry.publish(res, events.AFTER_DELETE, self, + payload=events.DBEventPayload(mock.Mock())) + self.assertEqual([res], self.notifier._unsubscribed_resources) + # after first time, no further unsubscribing should happen + registry.publish(res, events.AFTER_CREATE, self, + payload=events.DBEventPayload(mock.Mock())) + self.assertEqual([res], self.notifier._unsubscribed_resources) + def test__only_status_changed(self): p1 = {'id': 1, 'status': 'DOWN', 'updated_at': '10:00:00', 'revision_number': 1} diff --git a/neutron/tests/unit/extensions/test_segment.py b/neutron/tests/unit/extensions/test_segment.py index 5ad5c731efb..4ed6d229ec0 100644 --- a/neutron/tests/unit/extensions/test_segment.py +++ b/neutron/tests/unit/extensions/test_segment.py @@ -420,9 +420,7 @@ class TestSegment(SegmentTestCase): dsn.assert_called_with(resources.NETWORK, events.PRECOMMIT_DELETE, mock.ANY, - context=mock.ANY, - network_id=mock.ANY, - network=mock.ANY) + payload=mock.ANY) class TestSegmentML2(SegmentTestCase): diff --git a/neutron/tests/unit/plugins/ml2/test_plugin.py b/neutron/tests/unit/plugins/ml2/test_plugin.py index acb5642c750..7311655e5f0 100644 --- a/neutron/tests/unit/plugins/ml2/test_plugin.py +++ b/neutron/tests/unit/plugins/ml2/test_plugin.py @@ -239,10 +239,10 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2, with self.network() as n: after_create.assert_called_once_with( resources.NETWORK, events.AFTER_CREATE, mock.ANY, - context=mock.ANY, network=mock.ANY) - kwargs = after_create.mock_calls[0][2] + payload=mock.ANY) + payload = after_create.mock_calls[0][2]['payload'] self.assertEqual(n['network']['id'], - kwargs['network']['id']) + payload.resource_id) def test_network_precommit_create_callback(self): precommit_create = mock.Mock() @@ -251,7 +251,7 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2, with self.network(): precommit_create.assert_called_once_with( resources.NETWORK, events.PRECOMMIT_CREATE, mock.ANY, - context=mock.ANY, network=mock.ANY, request=mock.ANY) + payload=mock.ANY) def test_network_precommit_create_callback_aborts(self): precommit_create = mock.Mock() @@ -289,11 +289,11 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2, self.deserialize(self.fmt, req.get_response(self.api)) after_update.assert_called_once_with( resources.NETWORK, events.AFTER_UPDATE, mock.ANY, - context=mock.ANY, network=mock.ANY, original_network=mock.ANY) - kwargs = after_update.mock_calls[0][2] + payload=mock.ANY) + payload = after_update.mock_calls[0][2]['payload'] self.assertEqual(n['network']['name'], - kwargs['original_network']['name']) - self.assertEqual('updated', kwargs['network']['name']) + payload.states[0]['name']) + self.assertEqual('updated', payload.latest_state['name']) def test_network_after_delete_callback(self): after_delete = mock.Mock() @@ -304,10 +304,10 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2, req.get_response(self.api) after_delete.assert_called_once_with( resources.NETWORK, events.AFTER_DELETE, mock.ANY, - context=mock.ANY, network=mock.ANY) - kwargs = after_delete.mock_calls[0][2] + payload=mock.ANY) + payload = after_delete.mock_calls[0][2]['payload'] self.assertEqual(n['network']['id'], - kwargs['network']['id']) + payload.resource_id) def test_create_port_obj_bulk(self): cfg.CONF.set_override('base_mac', "12:34:56:00") @@ -350,8 +350,13 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2, # capture session states during each before and after event before = [] after = [] - b_func = lambda *a, **k: before.append(k['context'].session.is_active) - a_func = lambda *a, **k: after.append(k['context'].session.is_active) + + def b_func(r, c, v, payload=None): + before.append(payload.context.session.is_active) + + def a_func(r, c, v, payload=None): + after.append(payload.context.session.is_active) + registry.subscribe(b_func, resources.NETWORK, events.BEFORE_CREATE) registry.subscribe(a_func, resources.NETWORK, events.AFTER_CREATE) data = [{'tenant_id': self._tenant_id}] * 4 diff --git a/neutron/tests/unit/services/auto_allocate/test_db.py b/neutron/tests/unit/services/auto_allocate/test_db.py index 6859f4c6da3..af27e139c7b 100644 --- a/neutron/tests/unit/services/auto_allocate/test_db.py +++ b/neutron/tests/unit/services/auto_allocate/test_db.py @@ -14,6 +14,7 @@ from unittest import mock from neutron_lib.api.definitions import constants as api_const +from neutron_lib.callbacks import events from neutron_lib import constants from neutron_lib import context from neutron_lib import exceptions as n_exc @@ -65,7 +66,10 @@ class AutoAllocateTestCase(testlib_api.SqlTestCase): return_value=network_mock ) as get_external_net: db._ensure_external_network_default_value_callback( - "NETWORK", "precommit_update", "test_plugin", **kwargs) + "NETWORK", "precommit_update", "test_plugin", + payload=events.DBEventPayload( + self.ctx, request_body=kwargs['request'], + states=(kwargs['original_network'], kwargs['network']))) get_external_nets.assert_called_once_with( self.ctx, _pager=mock.ANY, is_default=True) get_external_net.assert_called_once_with( @@ -94,7 +98,10 @@ class AutoAllocateTestCase(testlib_api.SqlTestCase): return_value=network_mock ) as get_external_net: db._ensure_external_network_default_value_callback( - "NETWORK", "precommit_update", "test_plugin", **kwargs) + "NETWORK", "precommit_update", "test_plugin", + payload=events.DBEventPayload( + self.ctx, request_body=kwargs['request'], + states=(kwargs['network'],))) get_external_nets.assert_not_called() get_external_net.assert_not_called() network_mock.update.assert_not_called() @@ -125,7 +132,10 @@ class AutoAllocateTestCase(testlib_api.SqlTestCase): return_value=network_mock ) as get_external_net: db._ensure_external_network_default_value_callback( - "NETWORK", "precommit_update", "test_plugin", **kwargs) + "NETWORK", "precommit_update", "test_plugin", + payload=events.DBEventPayload( + self.ctx, request_body=kwargs['request'], + states=(kwargs['original_network'], kwargs['network']))) get_external_nets.assert_called_once_with( self.ctx, _pager=mock.ANY, is_default=True) get_external_net.assert_not_called() @@ -158,7 +168,10 @@ class AutoAllocateTestCase(testlib_api.SqlTestCase): ) as get_external_net: self.assertRaises(exceptions.DefaultExternalNetworkExists, db._ensure_external_network_default_value_callback, - "NETWORK", "precommit_update", "test_plugin", **kwargs) + "NETWORK", "precommit_update", "test_plugin", + payload=events.DBEventPayload( + self.ctx, request_body=kwargs['request'], + states=(kwargs['original_network'], kwargs['network']))) get_external_nets.assert_called_once_with( self.ctx, _pager=mock.ANY, is_default=True) get_external_net.assert_not_called() diff --git a/neutron/tests/unit/services/qos/test_qos_plugin.py b/neutron/tests/unit/services/qos/test_qos_plugin.py index b254f952686..ccbafc6c9ea 100644 --- a/neutron/tests/unit/services/qos/test_qos_plugin.py +++ b/neutron/tests/unit/services/qos/test_qos_plugin.py @@ -1274,8 +1274,9 @@ class TestQosPluginDB(base.BaseQosTestCase): 'validate_policy_for_network') \ as mock_validate_policy: self.qos_plugin._validate_create_network_callback( - 'NETWORK', 'precommit_create', 'test_plugin', **kwargs) - + "NETWORK", "precommit_create", "test_plugin", + payload=events.DBEventPayload( + self.context, resource_id=kwargs['network']['id'],)) qos_policy = None if network_qos: qos_policy = net_qos_obj