From aebd27f3b7cbbb6277440de029e773eb781dec9e Mon Sep 17 00:00:00 2001 From: Eugene Nikanorov Date: Mon, 12 Oct 2015 13:59:01 +0400 Subject: [PATCH] Resync L3, DHCP and OVS/LB agents upon revival In big and busy clusters there could be a condition when rabbitmq clustering mechanism synchronizes queues and during this period agents connected to that instance of rabbitmq can't communicate with the server and server considers them dead moving resources away. After agent become active again, it needs to cleanup state entries and synchronize its state with neutron-server. The solution is to make agents aware of their state from neutron-server point of view. This is done by changing state reports from cast to call that would return agent's status. When agent was dead and becomes alive, it would receive special AGENT_REVIVED status indicating that it should refresh its local data which it would not do otherwise. Conflicts: neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py neutron/tests/unit/agent/dhcp/test_agent.py neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py Closes-Bug: #1505166 Change-Id: Id28248f4f75821fbacf46e2c44e40f27f59172a9 (cherry picked from commit 3b6bd917e4b968a47a5aacb7f590143fc83816d9) --- neutron/agent/dhcp/agent.py | 9 ++++++--- neutron/agent/l3/agent.py | 11 +++++++---- neutron/common/constants.py | 8 ++++++++ neutron/db/agents_db.py | 17 +++++++++++++++-- .../agent/linuxbridge_neutron_agent.py | 19 +++++++++++++++---- .../openvswitch/agent/ovs_neutron_agent.py | 17 ++++++++++++----- neutron/tests/unit/agent/dhcp/test_agent.py | 14 ++++++++++++++ neutron/tests/unit/agent/l3/test_agent.py | 17 +++++++++++++++-- .../agent/test_linuxbridge_neutron_agent.py | 7 +++++++ .../agent/test_ovs_neutron_agent.py | 16 ++++++++-------- 10 files changed, 107 insertions(+), 28 deletions(-) diff --git a/neutron/agent/dhcp/agent.py b/neutron/agent/dhcp/agent.py index af0ccf5c7d2..ca3cb26cd76 100644 --- a/neutron/agent/dhcp/agent.py +++ b/neutron/agent/dhcp/agent.py @@ -557,7 +557,6 @@ class DhcpAgentWithStateReport(DhcpAgent): 'start_flag': True, 'agent_type': constants.AGENT_TYPE_DHCP} report_interval = self.conf.AGENT.report_interval - self.use_call = True if report_interval: self.heartbeat = loopingcall.FixedIntervalLoopingCall( self._report_state) @@ -568,8 +567,12 @@ class DhcpAgentWithStateReport(DhcpAgent): self.agent_state.get('configurations').update( self.cache.get_state()) ctx = context.get_admin_context_without_session() - self.state_rpc.report_state(ctx, self.agent_state, self.use_call) - self.use_call = False + agent_status = self.state_rpc.report_state( + ctx, self.agent_state, True) + if agent_status == constants.AGENT_REVIVED: + LOG.info(_LI("Agent has just been revived. " + "Scheduling full sync")) + self.schedule_resync("Agent has just been revived") except AttributeError: # This means the server does not support report_state LOG.warn(_LW("Neutron server does not support state report." diff --git a/neutron/agent/l3/agent.py b/neutron/agent/l3/agent.py index ec931e8a3dc..7098a1786e9 100644 --- a/neutron/agent/l3/agent.py +++ b/neutron/agent/l3/agent.py @@ -613,7 +613,6 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, class L3NATAgentWithStateReport(L3NATAgent): def __init__(self, host, conf=None): - self.use_call = True super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf) self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN) self.agent_state = { @@ -659,10 +658,14 @@ class L3NATAgentWithStateReport(L3NATAgent): configurations['interfaces'] = num_interfaces configurations['floating_ips'] = num_floating_ips try: - self.state_rpc.report_state(self.context, self.agent_state, - self.use_call) + agent_status = self.state_rpc.report_state(self.context, + self.agent_state, + True) + if agent_status == l3_constants.AGENT_REVIVED: + LOG.info(_LI('Agent has just been revived. ' + 'Doing a full sync.')) + self.fullsync = True self.agent_state.pop('start_flag', None) - self.use_call = False except AttributeError: # This means the server does not support report_state LOG.warn(_LW("Neutron server does not support state report." diff --git a/neutron/common/constants.py b/neutron/common/constants.py index a5f34ee9d2d..5ac94c8b4c1 100644 --- a/neutron/common/constants.py +++ b/neutron/common/constants.py @@ -200,3 +200,11 @@ ROUTER_MARK_MASK = "0xffff" # Time format ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' + +# Agent states as detected by server, used to reply on agent's state report +# agent has just been registered +AGENT_NEW = 'new' +# agent is alive +AGENT_ALIVE = 'alive' +# agent has just returned to alive after being dead +AGENT_REVIVED = 'revived' diff --git a/neutron/db/agents_db.py b/neutron/db/agents_db.py index 9417d5e3c37..7d8622b2957 100644 --- a/neutron/db/agents_db.py +++ b/neutron/db/agents_db.py @@ -219,6 +219,12 @@ class AgentDbMixin(ext_agent.AgentPluginBase): 'delta': delta}) def _create_or_update_agent(self, context, agent_state): + """Registers new agent in the database or updates existing. + + Returns agent status from server point of view: alive, new or revived. + It could be used by agent to do some sync with the server if needed. + """ + status = constants.AGENT_ALIVE with context.session.begin(subtransactions=True): res_keys = ['agent_type', 'binary', 'host', 'topic'] res = dict((k, agent_state[k]) for k in res_keys) @@ -230,6 +236,8 @@ class AgentDbMixin(ext_agent.AgentPluginBase): try: agent_db = self._get_agent_by_type_and_host( context, agent_state['agent_type'], agent_state['host']) + if not agent_db.is_active: + status = constants.AGENT_REVIVED res['heartbeat_timestamp'] = current_time if agent_state.get('start_flag'): res['started_at'] = current_time @@ -246,7 +254,9 @@ class AgentDbMixin(ext_agent.AgentPluginBase): greenthread.sleep(0) context.session.add(agent_db) self._log_heartbeat(agent_state, agent_db, configurations_dict) + status = constants.AGENT_NEW greenthread.sleep(0) + return status def create_or_update_agent(self, context, agent): """Create or update agent according to report.""" @@ -286,7 +296,10 @@ class AgentExtRpcCallback(object): self.plugin = plugin def report_state(self, context, **kwargs): - """Report state from agent to server.""" + """Report state from agent to server. + + Returns - agent's status: AGENT_NEW, AGENT_REVIVED, AGENT_ALIVE + """ time = kwargs['time'] time = timeutils.parse_strtime(time) agent_state = kwargs['agent_state']['agent_state'] @@ -301,7 +314,7 @@ class AgentExtRpcCallback(object): return if not self.plugin: self.plugin = manager.NeutronManager.get_plugin() - self.plugin.create_or_update_agent(context, agent_state) + return self.plugin.create_or_update_agent(context, agent_state) def _check_clock_sync_on_agent_start(self, agent_state, agent_time): """Checks if the server and the agent times are in sync. diff --git a/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py index 350f322fb03..a6377e22993 100644 --- a/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -871,6 +871,8 @@ class LinuxBridgeNeutronAgentRPC(service.Service): # stores received port_updates for processing by the main loop self.updated_devices = set() + # flag to do a sync after revival + self.fullsync = False self.context = context.get_admin_context_without_session() self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN) self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN) @@ -892,8 +894,13 @@ class LinuxBridgeNeutronAgentRPC(service.Service): try: devices = len(self.br_mgr.get_tap_devices()) self.agent_state.get('configurations')['devices'] = devices - self.state_rpc.report_state(self.context, - self.agent_state) + agent_status = self.state_rpc.report_state(self.context, + self.agent_state, + True) + if agent_status == constants.AGENT_REVIVED: + LOG.info(_LI('Agent has just been revived. ' + 'Doing a full sync.')) + self.fullsync = True self.agent_state.pop('start_flag', None) except Exception: LOG.exception(_LE("Failed reporting state!")) @@ -1096,11 +1103,15 @@ class LinuxBridgeNeutronAgentRPC(service.Service): while True: start = time.time() - device_info = self.scan_devices(previous=device_info, sync=sync) + if self.fullsync: + sync = True + self.fullsync = False if sync: LOG.info(_LI("Agent out of sync with plugin!")) - sync = False + + device_info = self.scan_devices(previous=device_info, sync=sync) + sync = False if (self._device_info_has_changes(device_info) or self.sg_agent.firewall_refresh_needed()): diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index 4b91e349fd4..6eb196a1e36 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -182,6 +182,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, super(OVSNeutronAgent, self).__init__() self.conf = conf or cfg.CONF + self.fullsync = True # init bridge classes with configured datapath type. self.br_int_cls, self.br_phys_cls, self.br_tun_cls = ( functools.partial(bridge_classes[b], @@ -192,7 +193,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.veth_mtu = veth_mtu self.available_local_vlans = set(moves.range(p_const.MIN_VLAN_TAG, p_const.MAX_VLAN_TAG)) - self.use_call = True self.tunnel_types = tunnel_types or [] self.l2_pop = l2_population # TODO(ethuleau): Change ARP responder so it's not dependent on the @@ -326,9 +326,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.dvr_agent.in_distributed_mode()) try: - self.state_rpc.report_state(self.context, - self.agent_state, - self.use_call) + agent_status = self.state_rpc.report_state(self.context, + self.agent_state, + True) + if agent_status == n_const.AGENT_REVIVED: + LOG.info(_LI('Agent has just been revived. ' + 'Doing a full sync.')) + self.fullsync = True self.use_call = False self.agent_state.pop('start_flag', None) except Exception: @@ -1662,7 +1666,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, if not polling_manager: polling_manager = polling.get_polling_manager( minimize_polling=False) - sync = True ports = set() updated_ports_copy = set() @@ -1672,6 +1675,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, consecutive_resyncs = 0 need_clean_stale_flow = True while self._check_and_handle_signal(): + if self.fullsync: + LOG.info(_LI("rpc_loop doing a full sync.")) + sync = True + self.fullsync = False port_info = {} ancillary_port_info = {} start = time.time() diff --git a/neutron/tests/unit/agent/dhcp/test_agent.py b/neutron/tests/unit/agent/dhcp/test_agent.py index 3df63de42c3..cc5cf876d01 100644 --- a/neutron/tests/unit/agent/dhcp/test_agent.py +++ b/neutron/tests/unit/agent/dhcp/test_agent.py @@ -425,6 +425,20 @@ class TestDhcpAgent(base.BaseTestCase): dhcp.periodic_resync() spawn.assert_called_once_with(dhcp._periodic_resync_helper) + def test_report_state_revival_logic(self): + dhcp = dhcp_agent.DhcpAgentWithStateReport(HOSTNAME) + with mock.patch.object(dhcp.state_rpc, + 'report_state') as report_state,\ + mock.patch.object(dhcp, "run"): + report_state.return_value = const.AGENT_ALIVE + dhcp._report_state() + self.assertEqual(dhcp.needs_resync_reasons, {}) + + report_state.return_value = const.AGENT_REVIVED + dhcp._report_state() + self.assertEqual(dhcp.needs_resync_reasons[None], + ['Agent has just been revived']) + def test_periodic_resync_helper(self): with mock.patch.object(dhcp_agent.eventlet, 'sleep') as sleep: dhcp = dhcp_agent.DhcpAgent(HOSTNAME) diff --git a/neutron/tests/unit/agent/l3/test_agent.py b/neutron/tests/unit/agent/l3/test_agent.py index dccf2cc8fd4..68b5ed806a7 100644 --- a/neutron/tests/unit/agent/l3/test_agent.py +++ b/neutron/tests/unit/agent/l3/test_agent.py @@ -215,13 +215,26 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): conf=self.conf) self.assertEqual(agent.agent_state['start_flag'], True) - use_call_arg = agent.use_call agent.after_start() report_state.assert_called_once_with(agent.context, agent.agent_state, - use_call_arg) + True) self.assertTrue(agent.agent_state.get('start_flag') is None) + def test_report_state_revival_logic(self): + with mock.patch.object(agent_rpc.PluginReportStateAPI, + 'report_state') as report_state: + agent = l3_agent.L3NATAgentWithStateReport(host=HOSTNAME, + conf=self.conf) + report_state.return_value = l3_constants.AGENT_REVIVED + agent._report_state() + self.assertTrue(agent.fullsync) + + agent.fullsync = False + report_state.return_value = l3_constants.AGENT_ALIVE + agent._report_state() + self.assertFalse(agent.fullsync) + def test_periodic_sync_routers_task_call_clean_stale_namespaces(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) self.plugin_api.get_routers.return_value = [] diff --git a/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py index fbc6c57c276..0c25dc0dfba 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py @@ -366,6 +366,13 @@ class TestLinuxBridgeAgent(base.BaseTestCase): self.agent.stop() self.assertFalse(mock_set_rpc.called) + def test_report_state_revived(self): + with mock.patch.object(self.agent.state_rpc, + "report_state") as report_st: + report_st.return_value = constants.AGENT_REVIVED + self.agent._report_state() + self.assertTrue(self.agent.fullsync) + class TestLinuxBridgeManager(base.BaseTestCase): def setUp(self): diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index 07d4e955e66..fe4c1fb8d68 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -145,9 +145,6 @@ class TestOvsNeutronAgent(object): return_value=[]): self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(), **kwargs) - # set back to true because initial report state will succeed due - # to mocked out RPC calls - self.agent.use_call = True self.agent.tun_br = self.br_tun_cls(br_name='br-tun') self.agent.sg_agent = mock.Mock() @@ -709,14 +706,13 @@ class TestOvsNeutronAgent(object): report_st.assert_called_with(self.agent.context, self.agent.agent_state, True) self.assertNotIn("start_flag", self.agent.agent_state) - self.assertFalse(self.agent.use_call) self.assertEqual( self.agent.agent_state["configurations"]["devices"], self.agent.int_br_device_count ) self.agent._report_state() report_st.assert_called_with(self.agent.context, - self.agent.agent_state, False) + self.agent.agent_state, True) def test_report_state_fail(self): with mock.patch.object(self.agent.state_rpc, @@ -729,6 +725,13 @@ class TestOvsNeutronAgent(object): report_st.assert_called_with(self.agent.context, self.agent.agent_state, True) + def test_report_state_revived(self): + with mock.patch.object(self.agent.state_rpc, + "report_state") as report_st: + report_st.return_value = n_const.AGENT_REVIVED + self.agent._report_state() + self.assertTrue(self.agent.fullsync) + def test_port_update(self): port = {"id": TEST_PORT_ID1, "network_id": TEST_NETWORK_ID1, @@ -1775,9 +1778,6 @@ class TestOvsDvrNeutronAgent(object): return_value=[]): self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(), **kwargs) - # set back to true because initial report state will succeed due - # to mocked out RPC calls - self.agent.use_call = True self.agent.tun_br = self.br_tun_cls(br_name='br-tun') self.agent.sg_agent = mock.Mock()