Prioritize port create and update ready messages

The DHCP agent prioritizes RPC messages based on the
priority field send from neutron-server, but then groups
them all in the same dhcp_ready_ports set when sending
them back to the server to clear the provisioning block(s).

Priority should be given to new and changed ports, since
those are most likely to be associated with new instances
which can fail to boot if they are not handled quickly when
the agent is very busy, for example, right after it was
restarted.

Conflicts:
    neutron/tests/unit/agent/dhcp/test_agent.py

Change-Id: Ib5074abadd7189bb4bdd5e46c677f1bfb071221e
Closes-bug: #1864675
(cherry picked from commit 113dfac608)
This commit is contained in:
Brian Haley 2020-02-25 15:01:47 -05:00
parent 2d0adf4a05
commit e2f01c65d9
2 changed files with 79 additions and 16 deletions

View File

@ -87,6 +87,7 @@ class DhcpAgent(manager.Manager):
super(DhcpAgent, self).__init__(host=host)
self.needs_resync_reasons = collections.defaultdict(list)
self.dhcp_ready_ports = set()
self.dhcp_prio_ready_ports = set()
self.conf = conf or cfg.CONF
# If 'resync_throttle' is configured more than 'resync_interval' by
# mistake, raise exception and log with message.
@ -242,23 +243,29 @@ class DhcpAgent(manager.Manager):
def _dhcp_ready_ports_loop(self):
"""Notifies the server of any ports that had reservations setup."""
while True:
# this is just watching a set so we can do it really frequently
# this is just watching sets so we can do it really frequently
eventlet.sleep(0.1)
if self.dhcp_ready_ports:
ports_to_send = set()
for port_count in range(min(len(self.dhcp_ready_ports),
DHCP_READY_PORTS_SYNC_MAX)):
ports_to_send.add(self.dhcp_ready_ports.pop())
prio_ports_to_send = set()
ports_to_send = set()
for port_count in range(min(len(self.dhcp_prio_ready_ports) +
len(self.dhcp_ready_ports),
DHCP_READY_PORTS_SYNC_MAX)):
if self.dhcp_prio_ready_ports:
prio_ports_to_send.add(self.dhcp_prio_ready_ports.pop())
continue
ports_to_send.add(self.dhcp_ready_ports.pop())
if prio_ports_to_send or ports_to_send:
try:
self.plugin_rpc.dhcp_ready_on_ports(ports_to_send)
self.plugin_rpc.dhcp_ready_on_ports(prio_ports_to_send |
ports_to_send)
LOG.info("DHCP configuration for ports %s is completed",
ports_to_send)
prio_ports_to_send | ports_to_send)
continue
except Exception:
LOG.exception("Failure notifying DHCP server of "
"ready DHCP ports. Will retry on next "
"iteration.")
self.dhcp_prio_ready_ports |= prio_ports_to_send
self.dhcp_ready_ports |= ports_to_send
def start_ready_ports_loop(self):
@ -540,9 +547,10 @@ class DhcpAgent(manager.Manager):
network = self.cache.get_network_by_id(updated_port.network_id)
if not network:
return
self.reload_allocations(updated_port, network)
# treat update as a create event
self.reload_allocations(updated_port, network, prio=True)
def reload_allocations(self, port, network):
def reload_allocations(self, port, network, prio=False):
LOG.info("Trigger reload_allocations for port %s", port)
driver_action = 'reload_allocations'
if self._is_port_on_this_agent(port):
@ -568,7 +576,10 @@ class DhcpAgent(manager.Manager):
driver_action = 'restart'
self.cache.put_port(port)
self.call_driver(driver_action, network)
self.dhcp_ready_ports.add(port.id)
if prio:
self.dhcp_prio_ready_ports.add(port.id)
else:
self.dhcp_ready_ports.add(port.id)
self.update_isolated_metadata_proxy(network)
def _is_port_on_this_agent(self, port):
@ -605,7 +616,7 @@ class DhcpAgent(manager.Manager):
"DHCP cache is out of sync",
created_port.network_id)
return
self.reload_allocations(created_port, network)
self.reload_allocations(created_port, network, prio=True)
def port_delete_end(self, context, payload):
"""Handle the port.delete.end notification event."""

View File

@ -494,6 +494,55 @@ class TestDhcpAgent(base.BaseTestCase):
ready.call_args_list[1][0][0])
self.assertEqual(set(range(port_count)), ports_ready)
def test_dhcp_ready_ports_loop_with_limit_ports_per_call_prio(self):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
sync_max = dhcp_agent.DHCP_READY_PORTS_SYNC_MAX
port_count = 4
# port set ranges must be unique to differentiate results
dhcp.dhcp_prio_ready_ports = set(range(sync_max))
dhcp.dhcp_ready_ports = set(range(sync_max, sync_max + port_count))
with mock.patch.object(dhcp.plugin_rpc,
'dhcp_ready_on_ports') as ready:
# exit after 1 iteration
with mock.patch.object(dhcp_agent.eventlet, 'sleep',
side_effect=[0, RuntimeError]):
with testtools.ExpectedException(RuntimeError):
dhcp._dhcp_ready_ports_loop()
# only priority ports should have been processed
self.assertEqual(set(), dhcp.dhcp_prio_ready_ports)
self.assertEqual(set(range(sync_max, sync_max + port_count)),
dhcp.dhcp_ready_ports)
# one call is expected, with DHCP_READY_PORTS_SYNC_MAX ports
self.assertEqual(1, ready.call_count)
self.assertEqual(sync_max, len(ready.call_args_list[0][0][0]))
# priority ports need to be ready
ports_ready = ready.call_args_list[0][0][0]
self.assertEqual(set(range(sync_max)), ports_ready)
# add some priority ports, to make sure they are processed
dhcp.dhcp_prio_ready_ports = set(range(port_count))
with mock.patch.object(dhcp.plugin_rpc,
'dhcp_ready_on_ports') as ready:
# exit after 1 iteration
with mock.patch.object(dhcp_agent.eventlet, 'sleep',
side_effect=[0, RuntimeError]):
with testtools.ExpectedException(RuntimeError):
dhcp._dhcp_ready_ports_loop()
# all ports should have been processed
self.assertEqual(set(), dhcp.dhcp_prio_ready_ports)
self.assertEqual(set(), dhcp.dhcp_ready_ports)
# one call is expected, with (port_count * 2) ports
self.assertEqual(1, ready.call_count)
self.assertEqual(port_count * 2, len(ready.call_args_list[0][0][0]))
# all ports need to be ready
ports_ready = ready.call_args_list[0][0][0]
all_ports = (set(range(port_count)) |
set(range(sync_max, sync_max + port_count)))
self.assertEqual(all_ports, ports_ready)
def test_dhcp_ready_ports_updates_after_enable_dhcp(self):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
self.assertEqual(set(), dhcp.dhcp_ready_ports)
@ -1165,7 +1214,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.dhcp.port_update_end(None, payload)
self.dhcp._process_resource_update()
self.reload_allocations.assert_called_once_with(fake_port2,
fake_network)
fake_network,
prio=True)
def test_reload_allocations(self):
self.cache.get_port_by_id.return_value = fake_port2
@ -1186,7 +1236,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.dhcp.port_create_end(None, payload)
self.dhcp._process_resource_update()
self.reload_allocations.assert_called_once_with(fake_port2,
fake_network)
fake_network,
prio=True)
def test_port_create_end_no_resync_if_same_port_already_in_cache(self):
self.reload_allocations_p = mock.patch.object(self.dhcp,
@ -1200,7 +1251,8 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
self.dhcp.port_create_end(None, payload)
self.dhcp._process_resource_update()
self.reload_allocations.assert_called_once_with(fake_port2,
new_fake_network)
new_fake_network,
prio=True)
self.schedule_resync.assert_not_called()
def test_port_update_change_ip_on_port(self):