Remove RPC notification from transaction in create/update port

Removing notifications to the L3 agent from within the transaction in
create_port and update_port eliminates many lock wait timeouts in the
dvr check queue job and in scale testing locally.

Since this patch leaves context unused in _process_port_binding, the
argument is removed from the method.

Closes-Bug: #1371732

Change-Id: Ibd86611ad3e7eff085d769bdff777a5870f30c58
This commit is contained in:
Carl Baldwin 2014-09-19 17:37:17 +00:00
parent 6090d4d777
commit 411836b541
2 changed files with 90 additions and 12 deletions

View File

@ -159,7 +159,27 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# TODO(rkukura): Implement filtering.
return nets
def _process_port_binding(self, mech_context, context, attrs):
def _notify_l3_agent_new_port(self, context, port):
if not port:
return
# Whenever a DVR serviceable port comes up on a
# node, it has to be communicated to the L3 Plugin
# and agent for creating the respective namespaces.
if (utils.is_dvr_serviced(port['device_owner'])):
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
if (utils.is_extension_supported(
l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)):
l3plugin.dvr_update_router_addvm(context, port)
def _get_host_port_if_changed(self, mech_context, attrs):
binding = mech_context._binding
host = attrs and attrs.get(portbindings.HOST_ID)
if (attributes.is_attr_set(host) and binding.host != host):
return mech_context.current
def _process_port_binding(self, mech_context, attrs):
binding = mech_context._binding
port = mech_context.current
changes = False
@ -169,15 +189,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
binding.host != host):
binding.host = host
changes = True
# Whenever a DVR serviceable port comes up on a
# node, it has to be communicated to the L3 Plugin
# and agent for creating the respective namespaces.
if (utils.is_dvr_serviced(port['device_owner'])):
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
if (utils.is_extension_supported(
l3plugin, const.L3_DISTRIBUTED_EXT_ALIAS)):
l3plugin.dvr_update_router_addvm(context, port)
vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
if (attributes.is_attr_set(vnic_type) and
@ -770,7 +781,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
binding = db.add_port_binding(session, result['id'])
mech_context = driver_context.PortContext(self, context, result,
network, binding)
self._process_port_binding(mech_context, context, attrs)
new_host_port = self._get_host_port_if_changed(mech_context, attrs)
self._process_port_binding(mech_context, attrs)
result[addr_pair.ADDRESS_PAIRS] = (
self._process_create_allowed_address_pairs(
@ -780,6 +792,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
dhcp_opts)
self.mechanism_manager.create_port_precommit(mech_context)
# Notification must be sent after the above transaction is complete
self._notify_l3_agent_new_port(context, new_host_port)
try:
self.mechanism_manager.create_port_postcommit(mech_context)
except ml2_exc.MechanismDriverError:
@ -834,10 +849,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
mech_context = driver_context.PortContext(
self, context, updated_port, network, binding,
original_port=original_port)
new_host_port = self._get_host_port_if_changed(mech_context, attrs)
need_port_update_notify |= self._process_port_binding(
mech_context, context, attrs)
mech_context, attrs)
self.mechanism_manager.update_port_precommit(mech_context)
# Notification must be sent after the above transaction is complete
self._notify_l3_agent_new_port(context, new_host_port)
# TODO(apech) - handle errors raised by update_port, potentially
# by re-calling update_port with the previous attributes. For
# now the error is propogated to the caller, which is expected to

View File

@ -23,6 +23,7 @@ from neutron.common import constants
from neutron.common import exceptions as exc
from neutron.common import utils
from neutron import context
from neutron.db import db_base_plugin_v2 as base_plugin
from neutron.extensions import multiprovidernet as mpnet
from neutron.extensions import portbindings
from neutron.extensions import providernet as pnet
@ -36,6 +37,7 @@ from neutron.plugins.ml2 import driver_context
from neutron.plugins.ml2.drivers import type_vlan
from neutron.plugins.ml2 import models
from neutron.plugins.ml2 import plugin as ml2_plugin
from neutron.tests import base
from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit.ml2.drivers import mechanism_logger as mech_logger
from neutron.tests.unit.ml2.drivers import mechanism_test as mech_test
@ -940,3 +942,60 @@ class TestFaultyMechansimDriver(Ml2PluginV2FaultyDriverTestCase):
self.assertEqual(new_name, port['port']['name'])
self._delete('ports', port['port']['id'])
class TestMl2PluginCreateUpdatePort(base.BaseTestCase):
def setUp(self):
super(TestMl2PluginCreateUpdatePort, self).setUp()
self.context = mock.MagicMock()
def _ensure_transaction_is_closed(self):
transaction = self.context.session.begin(subtransactions=True)
enter = transaction.__enter__.call_count
exit = transaction.__exit__.call_count
self.assertEqual(enter, exit)
def _create_plugin_for_create_update_port(self, new_host_port):
plugin = ml2_plugin.Ml2Plugin()
plugin.extension_manager = mock.Mock()
plugin.type_manager = mock.Mock()
plugin.mechanism_manager = mock.Mock()
plugin.notifier = mock.Mock()
plugin._get_host_port_if_changed = mock.Mock(
return_value=new_host_port)
plugin._notify_l3_agent_new_port = mock.Mock()
plugin._notify_l3_agent_new_port.side_effect = (
lambda c, p: self._ensure_transaction_is_closed())
return plugin
def test_create_port_rpc_outside_transaction(self):
with contextlib.nested(
mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
mock.patch.object(base_plugin.NeutronDbPluginV2, 'create_port'),
) as (init, super_create_port):
init.return_value = None
new_host_port = mock.Mock()
plugin = self._create_plugin_for_create_update_port(new_host_port)
plugin.create_port(self.context, mock.MagicMock())
plugin._notify_l3_agent_new_port.assert_called_once_with(
self.context, new_host_port)
def test_update_port_rpc_outside_transaction(self):
with contextlib.nested(
mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'),
) as (init, super_update_port):
init.return_value = None
new_host_port = mock.Mock()
plugin = self._create_plugin_for_create_update_port(new_host_port)
plugin.update_port(self.context, 'fake_id', mock.MagicMock())
plugin._notify_l3_agent_new_port.assert_called_once_with(
self.context, new_host_port)