Merge "Lock in DHCP agent based on network_id" into stable/newton

This commit is contained in:
Jenkins 2017-05-05 23:49:42 +00:00 committed by Gerrit Code Review
commit 9c08fc4076
2 changed files with 75 additions and 22 deletions

View File

@ -19,11 +19,13 @@ import os
import eventlet
from neutron_lib import constants
from neutron_lib import exceptions
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_service import loopingcall
from oslo_utils import importutils
import six
from neutron._i18n import _, _LE, _LI, _LW
from neutron.agent.linux import dhcp
@ -38,6 +40,31 @@ from neutron import context
from neutron import manager
LOG = logging.getLogger(__name__)
_SYNC_STATE_LOCK = lockutils.ReaderWriterLock()
def _sync_lock(f):
"""Decorator to block all operations for a global sync call."""
@six.wraps(f)
def wrapped(*args, **kwargs):
with _SYNC_STATE_LOCK.write_lock():
return f(*args, **kwargs)
return wrapped
def _wait_if_syncing(f):
"""Decorator to wait if any sync operations are in progress."""
@six.wraps(f)
def wrapped(*args, **kwargs):
with _SYNC_STATE_LOCK.read_lock():
return f(*args, **kwargs)
return wrapped
def _net_lock(network_id):
"""Returns a context manager lock based on network_id."""
lock_name = 'dhcp-agent-network-lock-%s' % network_id
return lockutils.lock(lock_name, utils.SYNCHRONIZED_PREFIX)
class DhcpAgent(manager.Manager):
@ -144,7 +171,7 @@ class DhcpAgent(manager.Manager):
"""
self.needs_resync_reasons[network_id].append(reason)
@utils.synchronized('dhcp-agent')
@_sync_lock
def sync_state(self, networks=None):
"""Sync the local DHCP state with Neutron. If no networks are passed,
or 'None' is one of the networks, sync all of the networks.
@ -327,52 +354,64 @@ class DhcpAgent(manager.Manager):
# Update the metadata proxy after the dhcp driver has been updated
self.update_isolated_metadata_proxy(network)
@utils.synchronized('dhcp-agent')
@_wait_if_syncing
def network_create_end(self, context, payload):
"""Handle the network.create.end notification event."""
network_id = payload['network']['id']
self.enable_dhcp_helper(network_id)
with _net_lock(network_id):
self.enable_dhcp_helper(network_id)
@utils.synchronized('dhcp-agent')
@_wait_if_syncing
def network_update_end(self, context, payload):
"""Handle the network.update.end notification event."""
network_id = payload['network']['id']
if payload['network']['admin_state_up']:
self.enable_dhcp_helper(network_id)
else:
self.disable_dhcp_helper(network_id)
with _net_lock(network_id):
if payload['network']['admin_state_up']:
self.enable_dhcp_helper(network_id)
else:
self.disable_dhcp_helper(network_id)
@utils.synchronized('dhcp-agent')
@_wait_if_syncing
def network_delete_end(self, context, payload):
"""Handle the network.delete.end notification event."""
self.disable_dhcp_helper(payload['network_id'])
network_id = payload['network_id']
with _net_lock(network_id):
self.disable_dhcp_helper(network_id)
@utils.synchronized('dhcp-agent')
@_wait_if_syncing
def subnet_update_end(self, context, payload):
"""Handle the subnet.update.end notification event."""
network_id = payload['subnet']['network_id']
self.refresh_dhcp_helper(network_id)
with _net_lock(network_id):
self.refresh_dhcp_helper(network_id)
# Use the update handler for the subnet create event.
subnet_create_end = subnet_update_end
@utils.synchronized('dhcp-agent')
@_wait_if_syncing
def subnet_delete_end(self, context, payload):
"""Handle the subnet.delete.end notification event."""
subnet_id = payload['subnet_id']
network = self.cache.get_network_by_subnet_id(subnet_id)
if network:
if not network:
return
with _net_lock(network.id):
network = self.cache.get_network_by_subnet_id(subnet_id)
if not network:
return
self.refresh_dhcp_helper(network.id)
@utils.synchronized('dhcp-agent')
@_wait_if_syncing
def port_update_end(self, context, payload):
"""Handle the port.update.end notification event."""
updated_port = dhcp.DictModel(payload['port'])
if self.cache.is_port_message_stale(payload['port']):
LOG.debug("Discarding stale port update: %s", updated_port)
return
network = self.cache.get_network_by_id(updated_port.network_id)
if network:
with _net_lock(updated_port.network_id):
if self.cache.is_port_message_stale(payload['port']):
LOG.debug("Discarding stale port update: %s", updated_port)
return
network = self.cache.get_network_by_id(updated_port.network_id)
if not network:
return
LOG.info(_LI("Trigger reload_allocations for port %s"),
updated_port)
driver_action = 'reload_allocations'
@ -409,12 +448,17 @@ class DhcpAgent(manager.Manager):
# Use the update handler for the port create event.
port_create_end = port_update_end
@utils.synchronized('dhcp-agent')
@_wait_if_syncing
def port_delete_end(self, context, payload):
"""Handle the port.delete.end notification event."""
port = self.cache.get_port_by_id(payload['port_id'])
self.cache.deleted_ports.add(payload['port_id'])
if port:
if not port:
return
with _net_lock(port.network_id):
port = self.cache.get_port_by_id(payload['port_id'])
if not port:
return
network = self.cache.get_network_by_id(port.network_id)
self.cache.remove_port(port)
if self._is_port_on_this_agent(port):

View File

@ -1033,6 +1033,14 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.call_driver.assert_called_once_with('reload_allocations',
fake_network)
def test_port_update_end_grabs_lock(self):
payload = dict(port=fake_port2)
self.cache.get_network_by_id.return_value = None
self.cache.get_port_by_id.return_value = fake_port2
with mock.patch('neutron.agent.dhcp.agent._net_lock') as nl:
self.dhcp.port_update_end(None, payload)
nl.assert_called_once_with(fake_port2.network_id)
def test_port_update_change_ip_on_port(self):
payload = dict(port=fake_port1)
self.cache.get_network_by_id.return_value = fake_network
@ -1101,6 +1109,7 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.cache.assert_has_calls(
[mock.call.get_port_by_id(fake_port2.id),
mock.call.deleted_ports.add(fake_port2.id),
mock.call.get_port_by_id(fake_port2.id),
mock.call.get_network_by_id(fake_network.id),
mock.call.remove_port(fake_port2)])
self.call_driver.assert_has_calls(