L3 agent: paginate sync routers task

When thousands of routers are attached to thousands of networks, a
sync_routers request might take a long time and lead to a timeout on the
agent side, so the agent initiates another resync. This may lead to an
endless loop that overloads the server and leaves the agent unable to sync
its state.

This patch makes the L3 agent first check how many routers are assigned to
it and then fetch the routers in chunks.
The initial chunk size is set to 256 but may be decreased dynamically if
timeouts occur while waiting for a response from the server.

This approach reduces the load on the server side and speeds up resync on
the agent side, because processing starts right after the first chunk is
received.

Closes-Bug: #1516260
Change-Id: Id675910c2a0b862bfb9e6f4fdaf3cd9fe337e52f
This commit is contained in:
Oleg Bondarev 2015-10-13 12:45:59 +03:00
parent 904cdc723b
commit 0e97feb0f3
5 changed files with 87 additions and 34 deletions

View File

@ -64,6 +64,11 @@ NS_PREFIX = namespaces.NS_PREFIX
INTERNAL_DEV_PREFIX = namespaces.INTERNAL_DEV_PREFIX
EXTERNAL_DEV_PREFIX = namespaces.EXTERNAL_DEV_PREFIX
# Number of routers to fetch from server at a time on resync.
# Needed to reduce load on server side and to speed up resync on agent side.
# MAX is the chunk size the agent starts with and the upper bound it grows
# back to after successful syncs.
SYNC_ROUTERS_MAX_CHUNK_SIZE = 256
# MIN is the lower bound the chunk size is halved down to when the server
# times out; it is also the step by which the size grows after a successful
# sync.
SYNC_ROUTERS_MIN_CHUNK_SIZE = 32
class L3PluginApi(object):
"""Agent side of the l3 agent RPC API.
@ -83,6 +88,7 @@ class L3PluginApi(object):
1.7 - DVR support: new L3 plugin methods added.
- delete_agent_gateway_port
1.8 - Added address scope information
1.9 - Added get_router_ids
"""
def __init__(self, topic, host):
@ -96,6 +102,11 @@ class L3PluginApi(object):
return cctxt.call(context, 'sync_routers', host=self.host,
router_ids=router_ids)
def get_router_ids(self, context):
    """Ask the server over RPC for the ids of routers scheduled to this host.

    Uses version 1.9 of the RPC API and passes ``self.host`` as the host.
    """
    versioned_client = self.client.prepare(version='1.9')
    return versioned_client.call(
        context, 'get_router_ids', host=self.host)
def get_external_network_id(self, context):
"""Make a remote process call to retrieve the external network id.
@ -188,6 +199,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
self.context = n_context.get_admin_context_without_session()
self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
self.fullsync = True
self.sync_routers_chunk_size = SYNC_ROUTERS_MAX_CHUNK_SIZE
# Get the list of service plugins from Neutron Server
# This is the first place where we contact neutron-server on startup
@ -532,45 +544,68 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
def fetch_and_sync_all_routers(self, context, ns_manager):
    """Fetch the routers for this agent in chunks and queue them for sync.

    The router ids come from ``self.conf.router_id`` when it is set,
    otherwise from ``plugin_rpc.get_router_ids``. Router details are then
    requested ``self.sync_routers_chunk_size`` ids at a time; each returned
    router is reported to ``ns_manager`` and queued as a
    ``PRIORITY_SYNC_ROUTERS_TASK`` update as soon as its chunk arrives.
    Routers present in ``self.router_info`` but not returned by the server
    are queued with the ``DELETE_ROUTER`` action.

    The chunk size is halved (never below SYNC_ROUTERS_MIN_CHUNK_SIZE) when
    the server times out, and grown by SYNC_ROUTERS_MIN_CHUNK_SIZE (never
    above SYNC_ROUTERS_MAX_CHUNK_SIZE) after a successful sync.

    :param context: request context passed to the plugin RPC calls
    :param ns_manager: object whose ``keep_router`` / ``keep_ext_net`` are
        called for every router and external network seen
    :raises oslo_messaging.MessagingTimeout: re-raised after the chunk size
        has been adjusted and the failure logged
    :raises n_exc.AbortSyncRouters: on any other MessagingException
    """
    prev_router_ids = set(self.router_info)
    curr_router_ids = set()
    timestamp = timeutils.utcnow()
    # Read the chunk size once so every slice in this pass uses the same
    # step.
    chunk_size = self.sync_routers_chunk_size

    try:
        router_ids = ([self.conf.router_id] if self.conf.router_id else
                      self.plugin_rpc.get_router_ids(context))
        # fetch routers by chunks to reduce the load on server and to
        # start router processing earlier
        for i in range(0, len(router_ids), chunk_size):
            routers = self.plugin_rpc.get_routers(
                context, router_ids[i:i + chunk_size])
            LOG.debug('Processing :%r', routers)
            for r in routers:
                curr_router_ids.add(r['id'])
                ns_manager.keep_router(r['id'])
                if r.get('distributed'):
                    # need to keep fip namespaces as well
                    ext_net_id = (r['external_gateway_info'] or {}).get(
                        'network_id')
                    if ext_net_id:
                        ns_manager.keep_ext_net(ext_net_id)
                update = queue.RouterUpdate(
                    r['id'],
                    queue.PRIORITY_SYNC_ROUTERS_TASK,
                    router=r,
                    timestamp=timestamp)
                self._queue.add(update)
    except oslo_messaging.MessagingTimeout:
        if self.sync_routers_chunk_size > SYNC_ROUTERS_MIN_CHUNK_SIZE:
            # Floor division keeps the chunk size an int. With "/" it
            # becomes a float on Python 3, and range() and slicing then
            # raise TypeError on the next sync.
            self.sync_routers_chunk_size = max(
                self.sync_routers_chunk_size // 2,
                SYNC_ROUTERS_MIN_CHUNK_SIZE)
            LOG.error(_LE('Server failed to return info for routers in '
                          'required time, decreasing chunk size to: %s'),
                      self.sync_routers_chunk_size)
        else:
            LOG.error(_LE('Server failed to return info for routers in '
                          'required time even with min chunk size: %s. '
                          'It might be under very high load or '
                          'just inoperable'),
                      self.sync_routers_chunk_size)
        raise
    except oslo_messaging.MessagingException:
        LOG.exception(_LE("Failed synchronizing routers due to RPC error"))
        raise n_exc.AbortSyncRouters()

    self.fullsync = False
    LOG.debug("periodic_sync_routers_task successfully completed")
    # adjust chunk size after successful sync
    if self.sync_routers_chunk_size < SYNC_ROUTERS_MAX_CHUNK_SIZE:
        self.sync_routers_chunk_size = min(
            self.sync_routers_chunk_size + SYNC_ROUTERS_MIN_CHUNK_SIZE,
            SYNC_ROUTERS_MAX_CHUNK_SIZE)

    # Delete routers that have disappeared since the last sync
    for router_id in prev_router_ids - curr_router_ids:
        ns_manager.keep_router(router_id)
        update = queue.RouterUpdate(router_id,
                                    queue.PRIORITY_SYNC_ROUTERS_TASK,
                                    timestamp=timestamp,
                                    action=queue.DELETE_ROUTER)
        self._queue.add(update)
def after_start(self):
# Note: the FWaaS' vArmourL3NATAgent is a subclass of L3NATAgent. It

View File

@ -46,7 +46,8 @@ class L3RpcCallback(object):
# 1.6 Added process_prefix_update to support IPv6 Prefix Delegation
# 1.7 Added method delete_agent_gateway_port for DVR Routers
# 1.8 Added address scope information
target = oslo_messaging.Target(version='1.8')
# 1.9 Added get_router_ids
target = oslo_messaging.Target(version='1.9')
@property
def plugin(self):
@ -61,6 +62,10 @@ class L3RpcCallback(object):
plugin_constants.L3_ROUTER_NAT]
return self._l3plugin
def get_router_ids(self, context, host):
    """Return the IDs of routers scheduled to the l3 agent on <host>.

    Delegates to the L3 plugin's ``list_router_ids_on_host``.
    """
    l3_plugin = self.l3plugin
    return l3_plugin.list_router_ids_on_host(context, host)
@db_api.retry_db_errors
def sync_routers(self, context, **kwargs):
"""Sync routers according to filters to a specific agent.

View File

@ -370,8 +370,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
return self.get_sync_data(context, router_ids=router_ids, active=True)
def list_active_sync_routers_on_active_l3_agent(
self, context, host, router_ids):
def list_router_ids_on_host(self, context, host, router_ids=None):
agent = self._get_agent_by_type_and_host(
context, constants.AGENT_TYPE_L3, host)
if not agentschedulers_db.services_available(agent.admin_state_up):
@ -383,8 +382,15 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
if router_ids:
query = query.filter(
RouterL3AgentBinding.router_id.in_(router_ids))
router_ids = [item[0] for item in query]
return [item[0] for item in query]
def list_active_sync_routers_on_active_l3_agent(
self, context, host, router_ids):
router_ids = self.list_router_ids_on_host(context, host, router_ids)
if router_ids:
agent = self._get_agent_by_type_and_host(
context, constants.AGENT_TYPE_L3, host)
return self._get_active_l3_agent_routers_sync_data(context, host,
agent,
router_ids)

View File

@ -90,6 +90,10 @@ class L3AgentTestCase(framework.L3AgentTestFramework):
deleted_routers_info.append(ri)
ns_names_to_retrieve.add(ri.ns_name)
mocked_get_router_ids = self.mock_plugin_api.get_router_ids
mocked_get_router_ids.return_value = [r['id'] for r in
routers_to_keep +
routers_deleted_during_resync]
mocked_get_routers = self.mock_plugin_api.get_routers
mocked_get_routers.return_value = (routers_to_keep +
routers_deleted_during_resync)

View File

@ -200,6 +200,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
def test_periodic_sync_routers_task_raise_exception(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self.plugin_api.get_router_ids.return_value = ['fake_id']
self.plugin_api.get_routers.side_effect = ValueError
self.assertRaises(ValueError,
agent.periodic_sync_routers_task,
@ -247,6 +248,8 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
stale_router_ids = [_uuid(), _uuid()]
active_routers = [{'id': _uuid()}, {'id': _uuid()}]
self.plugin_api.get_router_ids.return_value = [r['id'] for r
in active_routers]
self.plugin_api.get_routers.return_value = active_routers
namespace_list = [namespaces.NS_PREFIX + r_id
for r_id in stale_router_ids]