From 47fdee443680c04a85d741a2f7efd31df4bfcff5 Mon Sep 17 00:00:00 2001 From: Kevin Benton Date: Mon, 17 Apr 2017 18:46:49 -0700 Subject: [PATCH] Fixup event transaction semantics for ML2 bulk ops This splits out code in ML2 for ports, subnets, and networks into clear before, during, and after functions corresponding to the state of the transaction they are called within. This allows the bulk creation to correctly call chunks of code outside of DB transactions even when doing many creations at once. Change-Id: Id6fe0f5066358be954a6ca14dd49c36755897e31 Closes-Bug: #1683550 --- neutron/plugins/ml2/plugin.py | 68 +++++++++++-------- neutron/tests/unit/plugins/ml2/test_plugin.py | 39 +++++++++++ 2 files changed, 77 insertions(+), 30 deletions(-) diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 30fb4f5e2ec..71d563eef13 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -670,6 +670,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, objects = [] collection = "%ss" % resource items = request_items[collection] + obj_before_create = getattr(self, '_before_create_%s' % resource) + for item in items: + obj_before_create(context, item) with db_api.context_manager.writer.using(context): obj_creator = getattr(self, '_create_%s_db' % resource) for item in items: @@ -687,22 +690,23 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, "the %(resource)s:%(item)s"), {'resource': resource, 'item': item}) - postcommit_op = getattr(self.mechanism_manager, - 'create_%s_postcommit' % resource) + postcommit_op = getattr(self, '_after_create_%s' % resource) for obj in objects: try: - postcommit_op(obj['mech_context']) - except ml2_exc.MechanismDriverError: + postcommit_op(context, obj['result'], obj['mech_context']) + except Exception: with excutils.save_and_reraise_exception(): resource_ids = [res['result']['id'] for res in objects] - LOG.exception(_LE("mechanism_manager.create_%(res)s" - "_postcommit failed for %(res)s: " + LOG.exception(_LE("ML2 _after_create_%(res)s " + "failed for %(res)s: " "'%(failed_id)s'. Deleting " "%(res)ss %(resource_ids)s"), {'res': resource, 'failed_id': obj['result']['id'], 'resource_ids': ', '.join(resource_ids)}) - self._delete_objects(context, resource, objects) + # _after_handler will have deleted the object that threw + to_delete = [o for o in objects if o != obj] + self._delete_objects(context, resource, to_delete) return objects def _get_network_mtu(self, network): @@ -744,11 +748,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, mtus.append(mtu) return min(mtus) if mtus else 0 + def _before_create_network(self, context, network): + net_data = network[attributes.NETWORK] + registry.notify(resources.NETWORK, events.BEFORE_CREATE, self, + context=context, network=net_data) + def _create_network_db(self, context, network): net_data = network[attributes.NETWORK] tenant_id = net_data['tenant_id'] - registry.notify(resources.NETWORK, events.BEFORE_CREATE, self, - context=context, network=net_data) with db_api.context_manager.writer.using(context): net_db = self.create_network_db(context, network) result = self._make_network_dict(net_db, process_extensions=False, @@ -785,7 +792,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, @utils.transaction_guard @db_api.retry_if_session_inactive() def create_network(self, context, network): + self._before_create_network(context, network) result, mech_context = self._create_network_db(context, network) + return self._after_create_network(context, result, mech_context) + + def _after_create_network(self, context, result, mech_context): kwargs = {'context': context, 'network': result} registry.notify(resources.NETWORK, events.AFTER_CREATE, self, **kwargs) try: @@ -915,6 +926,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, " failed")) self.notifier.network_delete(context, network['id']) + def _before_create_subnet(self, context, subnet): + # TODO(kevinbenton): BEFORE notification should be added here + pass + def _create_subnet_db(self, context, subnet): with db_api.context_manager.writer.using(context): result, net_db, ipam_sub = self._create_subnet_precommit( @@ -928,6 +943,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, result, network) self.mechanism_manager.create_subnet_precommit(mech_context) + # TODO(kevinbenton): move this to '_after_subnet_create' # db base plugin post commit ops self._create_subnet_postcommit(context, result, net_db, ipam_sub) @@ -936,7 +952,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, @utils.transaction_guard @db_api.retry_if_session_inactive() def create_subnet(self, context, subnet): + self._before_create_subnet(context, subnet) result, mech_context = self._create_subnet_db(context, subnet) + return self._after_create_subnet(context, result, mech_context) + + def _after_create_subnet(self, context, result, mech_context): kwargs = {'context': context, 'subnet': result} registry.notify(resources.SUBNET, events.AFTER_CREATE, self, **kwargs) try: @@ -1050,7 +1070,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, context, port['id'], resources.PORT, provisioning_blocks.DHCP_ENTITY) - def _create_port_db(self, context, port): + def _before_create_port(self, context, port): attrs = port[attributes.PORT] if not attrs.get('status'): attrs['status'] = const.PORT_STATUS_DOWN @@ -1060,6 +1080,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, # NOTE(kevinbenton): triggered outside of transaction since it # emits 'AFTER' events if it creates. self._ensure_default_security_group(context, attrs['tenant_id']) + + def _create_port_db(self, context, port): + attrs = port[attributes.PORT] with db_api.context_manager.writer.using(context): dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, []) port_db = self.create_port_db(context, port) @@ -1094,7 +1117,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, @utils.transaction_guard @db_api.retry_if_session_inactive() def create_port(self, context, port): + self._before_create_port(context, port) result, mech_context = self._create_port_db(context, port) + return self._after_create_port(context, result, mech_context) + + def _after_create_port(self, context, result, mech_context): # notify any plugin that is interested in port create events kwargs = {'context': context, 'port': result} registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs) @@ -1120,26 +1147,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, @db_api.retry_if_session_inactive() def create_port_bulk(self, context, ports): objects = self._create_bulk_ml2(attributes.PORT, context, ports) - - for obj in objects: - attrs = obj['attributes'] - if attrs and attrs.get(portbindings.HOST_ID): - kwargs = {'context': context, 'port': obj['result']} - registry.notify( - resources.PORT, events.AFTER_CREATE, self, **kwargs) - - try: - for obj in objects: - obj['bound_context'] = self._bind_port_if_needed( - obj['mech_context']) - return [obj['bound_context'].current for obj in objects] - except ml2_exc.MechanismDriverError: - with excutils.save_and_reraise_exception(): - resource_ids = [res['result']['id'] for res in objects] - LOG.error(_LE("_bind_port_if_needed failed. " - "Deleting all ports from create bulk '%s'"), - resource_ids) - self._delete_objects(context, attributes.PORT, objects) + return [obj['result'] for obj in objects] # TODO(yalei) - will be simplified after security group and address pair be # converted to ext driver too. diff --git a/neutron/tests/unit/plugins/ml2/test_plugin.py b/neutron/tests/unit/plugins/ml2/test_plugin.py index 7923fbbf76f..2d4155c66a4 100644 --- a/neutron/tests/unit/plugins/ml2/test_plugin.py +++ b/neutron/tests/unit/plugins/ml2/test_plugin.py @@ -241,6 +241,24 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2, self.assertEqual(n['network']['id'], kwargs['network']['id']) + def test_bulk_network_before_and_after_events_outside_of_txn(self): + # capture session states during each before and after event + before = [] + after = [] + b_func = lambda *a, **k: before.append(k['context'].session.is_active) + a_func = lambda *a, **k: after.append(k['context'].session.is_active) + registry.subscribe(b_func, resources.NETWORK, events.BEFORE_CREATE) + registry.subscribe(a_func, resources.NETWORK, events.AFTER_CREATE) + data = [{'tenant_id': self._tenant_id}] * 4 + self._create_bulk_from_list( + self.fmt, 'network', data, context=context.get_admin_context()) + # ensure events captured + self.assertTrue(before) + self.assertTrue(after) + # ensure session was closed for all + self.assertFalse(any(before)) + self.assertFalse(any(after)) + def _create_and_verify_networks(self, networks): for net_idx, net in enumerate(networks): # create @@ -691,6 +709,27 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase): self._delete('ports', p['port']['id']) self.assertFalse(self.tx_open) + def test_bulk_ports_before_and_after_events_outside_of_txn(self): + with self.network() as n: + pass + # capture session states during each before and after event + before = [] + after = [] + b_func = lambda *a, **k: before.append(k['context'].session.is_active) + a_func = lambda *a, **k: after.append(k['context'].session.is_active) + registry.subscribe(b_func, resources.PORT, events.BEFORE_CREATE) + registry.subscribe(a_func, resources.PORT, events.AFTER_CREATE) + data = [{'tenant_id': self._tenant_id, + 'network_id': n['network']['id']}] * 4 + self._create_bulk_from_list( + self.fmt, 'port', data, context=context.get_admin_context()) + # ensure events captured + self.assertTrue(before) + self.assertTrue(after) + # ensure session was closed for all + self.assertFalse(any(before)) + self.assertFalse(any(after)) + def test_create_router_port_and_fail_create_postcommit(self): with mock.patch.object(managers.MechanismManager,