Merge "Fixup event transaction semantics for ML2 bulk ops"

This commit is contained in:
Jenkins 2017-04-20 10:56:43 +00:00 committed by Gerrit Code Review
commit 0a90dc3ee0
2 changed files with 77 additions and 30 deletions

View File

@ -670,6 +670,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
objects = []
collection = "%ss" % resource
items = request_items[collection]
obj_before_create = getattr(self, '_before_create_%s' % resource)
for item in items:
obj_before_create(context, item)
with db_api.context_manager.writer.using(context):
obj_creator = getattr(self, '_create_%s_db' % resource)
for item in items:
@ -687,22 +690,23 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
"the %(resource)s:%(item)s"),
{'resource': resource, 'item': item})
postcommit_op = getattr(self.mechanism_manager,
'create_%s_postcommit' % resource)
postcommit_op = getattr(self, '_after_create_%s' % resource)
for obj in objects:
try:
postcommit_op(obj['mech_context'])
except ml2_exc.MechanismDriverError:
postcommit_op(context, obj['result'], obj['mech_context'])
except Exception:
with excutils.save_and_reraise_exception():
resource_ids = [res['result']['id'] for res in objects]
LOG.exception(_LE("mechanism_manager.create_%(res)s"
"_postcommit failed for %(res)s: "
LOG.exception(_LE("ML2 _after_create_%(res)s "
"failed for %(res)s: "
"'%(failed_id)s'. Deleting "
"%(res)ss %(resource_ids)s"),
{'res': resource,
'failed_id': obj['result']['id'],
'resource_ids': ', '.join(resource_ids)})
self._delete_objects(context, resource, objects)
# _after_handler will have deleted the object that threw
to_delete = [o for o in objects if o != obj]
self._delete_objects(context, resource, to_delete)
return objects
def _get_network_mtu(self, network):
@ -744,11 +748,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
mtus.append(mtu)
return min(mtus) if mtus else 0
def _before_create_network(self, context, network):
net_data = network[attributes.NETWORK]
registry.notify(resources.NETWORK, events.BEFORE_CREATE, self,
context=context, network=net_data)
def _create_network_db(self, context, network):
net_data = network[attributes.NETWORK]
tenant_id = net_data['tenant_id']
registry.notify(resources.NETWORK, events.BEFORE_CREATE, self,
context=context, network=net_data)
with db_api.context_manager.writer.using(context):
net_db = self.create_network_db(context, network)
result = self._make_network_dict(net_db, process_extensions=False,
@ -787,7 +794,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def create_network(self, context, network):
self._before_create_network(context, network)
result, mech_context = self._create_network_db(context, network)
return self._after_create_network(context, result, mech_context)
def _after_create_network(self, context, result, mech_context):
kwargs = {'context': context, 'network': result}
registry.notify(resources.NETWORK, events.AFTER_CREATE, self, **kwargs)
try:
@ -935,6 +946,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
" failed"))
self.notifier.network_delete(context, network['id'])
def _before_create_subnet(self, context, subnet):
# TODO(kevinbenton): BEFORE notification should be added here
pass
def _create_subnet_db(self, context, subnet):
with db_api.context_manager.writer.using(context):
result, net_db, ipam_sub = self._create_subnet_precommit(
@ -948,6 +963,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
result, network)
self.mechanism_manager.create_subnet_precommit(mech_context)
# TODO(kevinbenton): move this to '_after_subnet_create'
# db base plugin post commit ops
self._create_subnet_postcommit(context, result, net_db, ipam_sub)
@ -956,7 +972,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def create_subnet(self, context, subnet):
self._before_create_subnet(context, subnet)
result, mech_context = self._create_subnet_db(context, subnet)
return self._after_create_subnet(context, result, mech_context)
def _after_create_subnet(self, context, result, mech_context):
kwargs = {'context': context, 'subnet': result}
registry.notify(resources.SUBNET, events.AFTER_CREATE, self, **kwargs)
try:
@ -1070,7 +1090,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
context, port['id'], resources.PORT,
provisioning_blocks.DHCP_ENTITY)
def _create_port_db(self, context, port):
def _before_create_port(self, context, port):
attrs = port[attributes.PORT]
if not attrs.get('status'):
attrs['status'] = const.PORT_STATUS_DOWN
@ -1080,6 +1100,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# NOTE(kevinbenton): triggered outside of transaction since it
# emits 'AFTER' events if it creates.
self._ensure_default_security_group(context, attrs['tenant_id'])
def _create_port_db(self, context, port):
attrs = port[attributes.PORT]
with db_api.context_manager.writer.using(context):
dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, [])
port_db = self.create_port_db(context, port)
@ -1114,7 +1137,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def create_port(self, context, port):
self._before_create_port(context, port)
result, mech_context = self._create_port_db(context, port)
return self._after_create_port(context, result, mech_context)
def _after_create_port(self, context, result, mech_context):
# notify any plugin that is interested in port create events
kwargs = {'context': context, 'port': result}
registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs)
@ -1140,26 +1167,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@db_api.retry_if_session_inactive()
def create_port_bulk(self, context, ports):
objects = self._create_bulk_ml2(attributes.PORT, context, ports)
for obj in objects:
attrs = obj['attributes']
if attrs and attrs.get(portbindings.HOST_ID):
kwargs = {'context': context, 'port': obj['result']}
registry.notify(
resources.PORT, events.AFTER_CREATE, self, **kwargs)
try:
for obj in objects:
obj['bound_context'] = self._bind_port_if_needed(
obj['mech_context'])
return [obj['bound_context'].current for obj in objects]
except ml2_exc.MechanismDriverError:
with excutils.save_and_reraise_exception():
resource_ids = [res['result']['id'] for res in objects]
LOG.error(_LE("_bind_port_if_needed failed. "
"Deleting all ports from create bulk '%s'"),
resource_ids)
self._delete_objects(context, attributes.PORT, objects)
return [obj['result'] for obj in objects]
# TODO(yalei) - will be simplified after security group and address pair be
# converted to ext driver too.

View File

@ -277,6 +277,24 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
self.assertEqual(n['network']['id'],
kwargs['network']['id'])
def test_bulk_network_before_and_after_events_outside_of_txn(self):
# capture session states during each before and after event
before = []
after = []
b_func = lambda *a, **k: before.append(k['context'].session.is_active)
a_func = lambda *a, **k: after.append(k['context'].session.is_active)
registry.subscribe(b_func, resources.NETWORK, events.BEFORE_CREATE)
registry.subscribe(a_func, resources.NETWORK, events.AFTER_CREATE)
data = [{'tenant_id': self._tenant_id}] * 4
self._create_bulk_from_list(
self.fmt, 'network', data, context=context.get_admin_context())
# ensure events captured
self.assertTrue(before)
self.assertTrue(after)
# ensure session was closed for all
self.assertFalse(any(before))
self.assertFalse(any(after))
def _create_and_verify_networks(self, networks):
for net_idx, net in enumerate(networks):
# create
@ -727,6 +745,27 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
self._delete('ports', p['port']['id'])
self.assertFalse(self.tx_open)
def test_bulk_ports_before_and_after_events_outside_of_txn(self):
with self.network() as n:
pass
# capture session states during each before and after event
before = []
after = []
b_func = lambda *a, **k: before.append(k['context'].session.is_active)
a_func = lambda *a, **k: after.append(k['context'].session.is_active)
registry.subscribe(b_func, resources.PORT, events.BEFORE_CREATE)
registry.subscribe(a_func, resources.PORT, events.AFTER_CREATE)
data = [{'tenant_id': self._tenant_id,
'network_id': n['network']['id']}] * 4
self._create_bulk_from_list(
self.fmt, 'port', data, context=context.get_admin_context())
# ensure events captured
self.assertTrue(before)
self.assertTrue(after)
# ensure session was closed for all
self.assertFalse(any(before))
self.assertFalse(any(after))
def test_create_router_port_and_fail_create_postcommit(self):
with mock.patch.object(managers.MechanismManager,