Ensure l3 agent receives notification about added router
Currently router_added (and other) notifications are sent
to agents with an RPC cast() method which does not ensure that
the message is actually delivered to the recipient.
If the message is lost (due to instability of messaging system
in some failover scenarios for example) neither server nor agent
will be aware of that and router will be "lost" till next agent
resync. Resync will only happen in case of errors on agent side
or restart.
The fix makes server use call() to notify agents about added routers
thus ensuring no routers will be lost.
This also unifies reschedule_router() method to avoid code duplication
between legacy and dvr agent schedulers.
Closes-Bug: #1482630
Related-Bug: #1404743
Change-Id: Id08764ba837d8f47a28649d081a5876797fe369e
(cherry picked from commit 30b121dfa4)
This commit is contained in:
parent
5bb0b87e47
commit
1eb40b49f3
|
@ -38,13 +38,15 @@ class L3AgentNotifyAPI(object):
|
|||
target = oslo_messaging.Target(topic=topic, version='1.0')
|
||||
self.client = n_rpc.get_client(target)
|
||||
|
||||
def _notification_host(self, context, method, payload, host):
|
||||
def _notification_host(self, context, method, payload, host,
|
||||
use_call=False,):
|
||||
"""Notify the agent that is hosting the router."""
|
||||
LOG.debug('Notify agent at %(host)s the message '
|
||||
'%(method)s', {'host': host,
|
||||
'method': method})
|
||||
cctxt = self.client.prepare(server=host)
|
||||
cctxt.cast(context, method, payload=payload)
|
||||
rpc_method = cctxt.call if use_call else cctxt.cast
|
||||
rpc_method(context, method, payload=payload)
|
||||
|
||||
def _agent_notification(self, context, method, router_ids, operation,
|
||||
shuffle_agents):
|
||||
|
@ -157,5 +159,9 @@ class L3AgentNotifyAPI(object):
|
|||
{'router_id': router_id}, host)
|
||||
|
||||
def router_added_to_agent(self, context, router_ids, host):
|
||||
# need to use call here as we want to be sure agent received
|
||||
# notification and router will not be "lost". However using call()
|
||||
# itself is not a guarantee, calling code should handle exceptions and
|
||||
# retry
|
||||
self._notification_host(context, 'router_added_to_agent',
|
||||
router_ids, host)
|
||||
router_ids, host, use_call=True)
|
||||
|
|
|
@ -55,6 +55,10 @@ L3_AGENTS_SCHEDULER_OPTS = [
|
|||
|
||||
cfg.CONF.register_opts(L3_AGENTS_SCHEDULER_OPTS)
|
||||
|
||||
# default messaging timeout is 60 sec, so 2 here is chosen to not block API
|
||||
# call for more than 2 minutes
|
||||
AGENT_NOTIFY_MAX_ATTEMPTS = 2
|
||||
|
||||
|
||||
class RouterL3AgentBinding(model_base.BASEV2):
|
||||
"""Represents binding between neutron routers and L3 agents."""
|
||||
|
@ -270,7 +274,7 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
|||
query.delete()
|
||||
|
||||
def reschedule_router(self, context, router_id, candidates=None):
|
||||
"""Reschedule router to a new l3 agent
|
||||
"""Reschedule router to (a) new l3 agent(s)
|
||||
|
||||
Remove the router from the agent(s) currently hosting it and
|
||||
schedule it again
|
||||
|
@ -281,19 +285,45 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
|||
for agent in cur_agents:
|
||||
self._unbind_router(context, router_id, agent['id'])
|
||||
|
||||
new_agent = self.schedule_router(context, router_id,
|
||||
candidates=candidates)
|
||||
if not new_agent:
|
||||
self.schedule_router(context, router_id, candidates=candidates)
|
||||
new_agents = self.list_l3_agents_hosting_router(
|
||||
context, router_id)['agents']
|
||||
if not new_agents:
|
||||
raise l3agentscheduler.RouterReschedulingFailed(
|
||||
router_id=router_id)
|
||||
|
||||
self._notify_agents_router_rescheduled(context, router_id,
|
||||
cur_agents, new_agents)
|
||||
|
||||
def _notify_agents_router_rescheduled(self, context, router_id,
|
||||
old_agents, new_agents):
|
||||
l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
|
||||
if l3_notifier:
|
||||
for agent in cur_agents:
|
||||
l3_notifier.router_removed_from_agent(
|
||||
context, router_id, agent['host'])
|
||||
l3_notifier.router_added_to_agent(
|
||||
context, [router_id], new_agent.host)
|
||||
if not l3_notifier:
|
||||
return
|
||||
|
||||
old_hosts = [agent['host'] for agent in old_agents]
|
||||
new_hosts = [agent['host'] for agent in new_agents]
|
||||
for host in set(old_hosts) - set(new_hosts):
|
||||
l3_notifier.router_removed_from_agent(
|
||||
context, router_id, host)
|
||||
|
||||
for agent in new_agents:
|
||||
# Need to make sure agents are notified or unschedule otherwise
|
||||
for attempt in range(AGENT_NOTIFY_MAX_ATTEMPTS):
|
||||
try:
|
||||
l3_notifier.router_added_to_agent(
|
||||
context, [router_id], agent['host'])
|
||||
break
|
||||
except oslo_messaging.MessagingException:
|
||||
LOG.warning(_LW('Failed to notify L3 agent on host '
|
||||
'%(host)s about added router. Attempt '
|
||||
'%(attempt)d out of %(max_attempts)d'),
|
||||
{'host': agent['host'], 'attempt': attempt + 1,
|
||||
'max_attempts': AGENT_NOTIFY_MAX_ATTEMPTS})
|
||||
else:
|
||||
self._unbind_router(context, router_id, agent['id'])
|
||||
raise l3agentscheduler.RouterReschedulingFailed(
|
||||
router_id=router_id)
|
||||
|
||||
def list_routers_on_l3_agent(self, context, agent_id):
|
||||
query = context.session.query(RouterL3AgentBinding.router_id)
|
||||
|
|
|
@ -386,42 +386,12 @@ class L3_DVRsch_db_mixin(l3agent_sch_db.L3AgentSchedulerDbMixin):
|
|||
context, router_id, chosen_agent)
|
||||
return chosen_agent
|
||||
|
||||
def reschedule_router(self, context, router_id, candidates=None):
|
||||
"""Reschedule router to new l3 agents
|
||||
|
||||
Remove the router from l3 agents currently hosting it and
|
||||
schedule it again
|
||||
"""
|
||||
def _unbind_router(self, context, router_id, agent_id):
|
||||
router = self.get_router(context, router_id)
|
||||
is_distributed = router.get('distributed', False)
|
||||
if not is_distributed:
|
||||
return super(L3_DVRsch_db_mixin, self).reschedule_router(
|
||||
context, router_id, candidates)
|
||||
|
||||
old_agents = self.list_l3_agents_hosting_router(
|
||||
context, router_id)['agents']
|
||||
with context.session.begin(subtransactions=True):
|
||||
for agent in old_agents:
|
||||
self._unbind_router(context, router_id, agent['id'])
|
||||
self.unbind_snat_servicenode(context, router_id)
|
||||
|
||||
self.schedule_router(context, router_id, candidates=candidates)
|
||||
new_agents = self.list_l3_agents_hosting_router(
|
||||
context, router_id)['agents']
|
||||
if not new_agents:
|
||||
raise l3agentscheduler.RouterReschedulingFailed(
|
||||
router_id=router_id)
|
||||
|
||||
l3_notifier = self.agent_notifiers.get(n_const.AGENT_TYPE_L3)
|
||||
if l3_notifier:
|
||||
old_hosts = [agent['host'] for agent in old_agents]
|
||||
new_hosts = [agent['host'] for agent in new_agents]
|
||||
for host in set(old_hosts) - set(new_hosts):
|
||||
l3_notifier.router_removed_from_agent(
|
||||
context, router_id, host)
|
||||
for host in new_hosts:
|
||||
l3_notifier.router_added_to_agent(
|
||||
context, [router_id], host)
|
||||
super(L3_DVRsch_db_mixin, self)._unbind_router(context, router_id,
|
||||
agent_id)
|
||||
if router.get('distributed', False):
|
||||
self.unbind_snat(context, router_id, agent_id)
|
||||
|
||||
def _get_active_l3_agent_routers_sync_data(self, context, host, agent,
|
||||
router_ids):
|
||||
|
|
|
@ -229,6 +229,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin,
|
|||
service_plugins = {'l3_plugin_name': self.l3_plugin}
|
||||
else:
|
||||
service_plugins = None
|
||||
mock.patch('neutron.common.rpc.get_client').start()
|
||||
super(OvsAgentSchedulerTestCaseBase, self).setUp(
|
||||
self.plugin_str, service_plugins=service_plugins)
|
||||
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
|
||||
|
@ -773,6 +774,49 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
|
|||
self.assertIn(dvr_agent['host'],
|
||||
[a['host'] for a in agents['agents']])
|
||||
|
||||
def test_router_reschedule_succeeded_after_failed_notification(self):
|
||||
l3_plugin = (manager.NeutronManager.get_service_plugins()
|
||||
[service_constants.L3_ROUTER_NAT])
|
||||
l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3]
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_agent_states()
|
||||
with self.router() as router:
|
||||
# schedule the router to host A
|
||||
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
with mock.patch.object(
|
||||
l3_notifier, 'router_added_to_agent') as notification_mock:
|
||||
notification_mock.side_effect = [
|
||||
oslo_messaging.MessagingTimeout, None]
|
||||
self._take_down_agent_and_run_reschedule(L3_HOSTA)
|
||||
self.assertEqual(
|
||||
2, l3_notifier.router_added_to_agent.call_count)
|
||||
# make sure router was rescheduled even when first attempt
|
||||
# failed to notify l3 agent
|
||||
l3_agents = self._list_l3_agents_hosting_router(
|
||||
router['router']['id'])['agents']
|
||||
self.assertEqual(1, len(l3_agents))
|
||||
self.assertEqual(L3_HOSTB, l3_agents[0]['host'])
|
||||
|
||||
def test_router_reschedule_failed_notification_all_attempts(self):
|
||||
l3_plugin = (manager.NeutronManager.get_service_plugins()
|
||||
[service_constants.L3_ROUTER_NAT])
|
||||
l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3]
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
self._register_agent_states()
|
||||
with self.router() as router:
|
||||
# schedule the router to host A
|
||||
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
|
||||
with mock.patch.object(
|
||||
l3_notifier, 'router_added_to_agent') as notification_mock:
|
||||
notification_mock.side_effect = oslo_messaging.MessagingTimeout
|
||||
self._take_down_agent_and_run_reschedule(L3_HOSTA)
|
||||
self.assertEqual(
|
||||
l3_agentschedulers_db.AGENT_NOTIFY_MAX_ATTEMPTS,
|
||||
l3_notifier.router_added_to_agent.call_count)
|
||||
l3_agents = self._list_l3_agents_hosting_router(
|
||||
router['router']['id'])['agents']
|
||||
self.assertEqual(0, len(l3_agents))
|
||||
|
||||
def test_router_auto_schedule_with_invalid_router(self):
|
||||
with self.router() as router:
|
||||
l3_rpc_cb = l3_rpc.L3RpcCallback()
|
||||
|
@ -1494,7 +1538,7 @@ class OvsL3AgentNotifierTestCase(test_l3.L3NatTestCaseMixin,
|
|||
l3_notifier.client,
|
||||
'prepare',
|
||||
return_value=l3_notifier.client) as mock_prepare,\
|
||||
mock.patch.object(l3_notifier.client, 'cast') as mock_cast,\
|
||||
mock.patch.object(l3_notifier.client, 'call') as mock_call,\
|
||||
self.router() as router1:
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
|
@ -1503,7 +1547,7 @@ class OvsL3AgentNotifierTestCase(test_l3.L3NatTestCaseMixin,
|
|||
router1['router']['id'])
|
||||
routers = [router1['router']['id']]
|
||||
mock_prepare.assert_called_with(server='hosta')
|
||||
mock_cast.assert_called_with(
|
||||
mock_call.assert_called_with(
|
||||
mock.ANY, 'router_added_to_agent', payload=routers)
|
||||
notifications = fake_notifier.NOTIFICATIONS
|
||||
expected_event_type = 'l3_agent.router.add'
|
||||
|
@ -1518,6 +1562,7 @@ class OvsL3AgentNotifierTestCase(test_l3.L3NatTestCaseMixin,
|
|||
'prepare',
|
||||
return_value=l3_notifier.client) as mock_prepare,\
|
||||
mock.patch.object(l3_notifier.client, 'cast') as mock_cast,\
|
||||
mock.patch.object(l3_notifier.client, 'call'),\
|
||||
self.router() as router1:
|
||||
self._register_agent_states()
|
||||
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
|
||||
|
|
|
@ -1662,6 +1662,7 @@ class L3HATestCaseMixin(testlib_api.SqlTestCase,
|
|||
super(L3HATestCaseMixin, self).setUp()
|
||||
|
||||
self.adminContext = n_context.get_admin_context()
|
||||
mock.patch('neutron.common.rpc.get_client').start()
|
||||
self.plugin = L3HAPlugin()
|
||||
|
||||
self.setup_coreplugin('neutron.plugins.ml2.plugin.Ml2Plugin')
|
||||
|
|
Loading…
Reference in New Issue