Merge "Fixup event transaction semantics for ML2 bulk ops"
This commit is contained in:
commit
0a90dc3ee0
|
@ -670,6 +670,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
objects = []
|
||||
collection = "%ss" % resource
|
||||
items = request_items[collection]
|
||||
obj_before_create = getattr(self, '_before_create_%s' % resource)
|
||||
for item in items:
|
||||
obj_before_create(context, item)
|
||||
with db_api.context_manager.writer.using(context):
|
||||
obj_creator = getattr(self, '_create_%s_db' % resource)
|
||||
for item in items:
|
||||
|
@ -687,22 +690,23 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
"the %(resource)s:%(item)s"),
|
||||
{'resource': resource, 'item': item})
|
||||
|
||||
postcommit_op = getattr(self.mechanism_manager,
|
||||
'create_%s_postcommit' % resource)
|
||||
postcommit_op = getattr(self, '_after_create_%s' % resource)
|
||||
for obj in objects:
|
||||
try:
|
||||
postcommit_op(obj['mech_context'])
|
||||
except ml2_exc.MechanismDriverError:
|
||||
postcommit_op(context, obj['result'], obj['mech_context'])
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
resource_ids = [res['result']['id'] for res in objects]
|
||||
LOG.exception(_LE("mechanism_manager.create_%(res)s"
|
||||
"_postcommit failed for %(res)s: "
|
||||
LOG.exception(_LE("ML2 _after_create_%(res)s "
|
||||
"failed for %(res)s: "
|
||||
"'%(failed_id)s'. Deleting "
|
||||
"%(res)ss %(resource_ids)s"),
|
||||
{'res': resource,
|
||||
'failed_id': obj['result']['id'],
|
||||
'resource_ids': ', '.join(resource_ids)})
|
||||
self._delete_objects(context, resource, objects)
|
||||
# _after_handler will have deleted the object that threw
|
||||
to_delete = [o for o in objects if o != obj]
|
||||
self._delete_objects(context, resource, to_delete)
|
||||
return objects
|
||||
|
||||
def _get_network_mtu(self, network):
|
||||
|
@ -744,11 +748,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
mtus.append(mtu)
|
||||
return min(mtus) if mtus else 0
|
||||
|
||||
def _before_create_network(self, context, network):
|
||||
net_data = network[attributes.NETWORK]
|
||||
registry.notify(resources.NETWORK, events.BEFORE_CREATE, self,
|
||||
context=context, network=net_data)
|
||||
|
||||
def _create_network_db(self, context, network):
|
||||
net_data = network[attributes.NETWORK]
|
||||
tenant_id = net_data['tenant_id']
|
||||
registry.notify(resources.NETWORK, events.BEFORE_CREATE, self,
|
||||
context=context, network=net_data)
|
||||
with db_api.context_manager.writer.using(context):
|
||||
net_db = self.create_network_db(context, network)
|
||||
result = self._make_network_dict(net_db, process_extensions=False,
|
||||
|
@ -787,7 +794,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
@utils.transaction_guard
|
||||
@db_api.retry_if_session_inactive()
|
||||
def create_network(self, context, network):
|
||||
self._before_create_network(context, network)
|
||||
result, mech_context = self._create_network_db(context, network)
|
||||
return self._after_create_network(context, result, mech_context)
|
||||
|
||||
def _after_create_network(self, context, result, mech_context):
|
||||
kwargs = {'context': context, 'network': result}
|
||||
registry.notify(resources.NETWORK, events.AFTER_CREATE, self, **kwargs)
|
||||
try:
|
||||
|
@ -935,6 +946,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
" failed"))
|
||||
self.notifier.network_delete(context, network['id'])
|
||||
|
||||
def _before_create_subnet(self, context, subnet):
|
||||
# TODO(kevinbenton): BEFORE notification should be added here
|
||||
pass
|
||||
|
||||
def _create_subnet_db(self, context, subnet):
|
||||
with db_api.context_manager.writer.using(context):
|
||||
result, net_db, ipam_sub = self._create_subnet_precommit(
|
||||
|
@ -948,6 +963,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
result, network)
|
||||
self.mechanism_manager.create_subnet_precommit(mech_context)
|
||||
|
||||
# TODO(kevinbenton): move this to '_after_subnet_create'
|
||||
# db base plugin post commit ops
|
||||
self._create_subnet_postcommit(context, result, net_db, ipam_sub)
|
||||
|
||||
|
@ -956,7 +972,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
@utils.transaction_guard
|
||||
@db_api.retry_if_session_inactive()
|
||||
def create_subnet(self, context, subnet):
|
||||
self._before_create_subnet(context, subnet)
|
||||
result, mech_context = self._create_subnet_db(context, subnet)
|
||||
return self._after_create_subnet(context, result, mech_context)
|
||||
|
||||
def _after_create_subnet(self, context, result, mech_context):
|
||||
kwargs = {'context': context, 'subnet': result}
|
||||
registry.notify(resources.SUBNET, events.AFTER_CREATE, self, **kwargs)
|
||||
try:
|
||||
|
@ -1070,7 +1090,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
context, port['id'], resources.PORT,
|
||||
provisioning_blocks.DHCP_ENTITY)
|
||||
|
||||
def _create_port_db(self, context, port):
|
||||
def _before_create_port(self, context, port):
|
||||
attrs = port[attributes.PORT]
|
||||
if not attrs.get('status'):
|
||||
attrs['status'] = const.PORT_STATUS_DOWN
|
||||
|
@ -1080,6 +1100,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
# NOTE(kevinbenton): triggered outside of transaction since it
|
||||
# emits 'AFTER' events if it creates.
|
||||
self._ensure_default_security_group(context, attrs['tenant_id'])
|
||||
|
||||
def _create_port_db(self, context, port):
|
||||
attrs = port[attributes.PORT]
|
||||
with db_api.context_manager.writer.using(context):
|
||||
dhcp_opts = attrs.get(edo_ext.EXTRADHCPOPTS, [])
|
||||
port_db = self.create_port_db(context, port)
|
||||
|
@ -1114,7 +1137,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
@utils.transaction_guard
|
||||
@db_api.retry_if_session_inactive()
|
||||
def create_port(self, context, port):
|
||||
self._before_create_port(context, port)
|
||||
result, mech_context = self._create_port_db(context, port)
|
||||
return self._after_create_port(context, result, mech_context)
|
||||
|
||||
def _after_create_port(self, context, result, mech_context):
|
||||
# notify any plugin that is interested in port create events
|
||||
kwargs = {'context': context, 'port': result}
|
||||
registry.notify(resources.PORT, events.AFTER_CREATE, self, **kwargs)
|
||||
|
@ -1140,26 +1167,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
@db_api.retry_if_session_inactive()
|
||||
def create_port_bulk(self, context, ports):
|
||||
objects = self._create_bulk_ml2(attributes.PORT, context, ports)
|
||||
|
||||
for obj in objects:
|
||||
attrs = obj['attributes']
|
||||
if attrs and attrs.get(portbindings.HOST_ID):
|
||||
kwargs = {'context': context, 'port': obj['result']}
|
||||
registry.notify(
|
||||
resources.PORT, events.AFTER_CREATE, self, **kwargs)
|
||||
|
||||
try:
|
||||
for obj in objects:
|
||||
obj['bound_context'] = self._bind_port_if_needed(
|
||||
obj['mech_context'])
|
||||
return [obj['bound_context'].current for obj in objects]
|
||||
except ml2_exc.MechanismDriverError:
|
||||
with excutils.save_and_reraise_exception():
|
||||
resource_ids = [res['result']['id'] for res in objects]
|
||||
LOG.error(_LE("_bind_port_if_needed failed. "
|
||||
"Deleting all ports from create bulk '%s'"),
|
||||
resource_ids)
|
||||
self._delete_objects(context, attributes.PORT, objects)
|
||||
return [obj['result'] for obj in objects]
|
||||
|
||||
# TODO(yalei) - will be simplified after security group and address pair be
|
||||
# converted to ext driver too.
|
||||
|
|
|
@ -277,6 +277,24 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
|
|||
self.assertEqual(n['network']['id'],
|
||||
kwargs['network']['id'])
|
||||
|
||||
def test_bulk_network_before_and_after_events_outside_of_txn(self):
|
||||
# capture session states during each before and after event
|
||||
before = []
|
||||
after = []
|
||||
b_func = lambda *a, **k: before.append(k['context'].session.is_active)
|
||||
a_func = lambda *a, **k: after.append(k['context'].session.is_active)
|
||||
registry.subscribe(b_func, resources.NETWORK, events.BEFORE_CREATE)
|
||||
registry.subscribe(a_func, resources.NETWORK, events.AFTER_CREATE)
|
||||
data = [{'tenant_id': self._tenant_id}] * 4
|
||||
self._create_bulk_from_list(
|
||||
self.fmt, 'network', data, context=context.get_admin_context())
|
||||
# ensure events captured
|
||||
self.assertTrue(before)
|
||||
self.assertTrue(after)
|
||||
# ensure session was closed for all
|
||||
self.assertFalse(any(before))
|
||||
self.assertFalse(any(after))
|
||||
|
||||
def _create_and_verify_networks(self, networks):
|
||||
for net_idx, net in enumerate(networks):
|
||||
# create
|
||||
|
@ -727,6 +745,27 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
|
|||
self._delete('ports', p['port']['id'])
|
||||
self.assertFalse(self.tx_open)
|
||||
|
||||
def test_bulk_ports_before_and_after_events_outside_of_txn(self):
|
||||
with self.network() as n:
|
||||
pass
|
||||
# capture session states during each before and after event
|
||||
before = []
|
||||
after = []
|
||||
b_func = lambda *a, **k: before.append(k['context'].session.is_active)
|
||||
a_func = lambda *a, **k: after.append(k['context'].session.is_active)
|
||||
registry.subscribe(b_func, resources.PORT, events.BEFORE_CREATE)
|
||||
registry.subscribe(a_func, resources.PORT, events.AFTER_CREATE)
|
||||
data = [{'tenant_id': self._tenant_id,
|
||||
'network_id': n['network']['id']}] * 4
|
||||
self._create_bulk_from_list(
|
||||
self.fmt, 'port', data, context=context.get_admin_context())
|
||||
# ensure events captured
|
||||
self.assertTrue(before)
|
||||
self.assertTrue(after)
|
||||
# ensure session was closed for all
|
||||
self.assertFalse(any(before))
|
||||
self.assertFalse(any(after))
|
||||
|
||||
def test_create_router_port_and_fail_create_postcommit(self):
|
||||
|
||||
with mock.patch.object(managers.MechanismManager,
|
||||
|
|
Loading…
Reference in New Issue