diff --git a/neutron/agent/dhcp/agent.py b/neutron/agent/dhcp/agent.py index af0ccf5c7d2..ca3cb26cd76 100644 --- a/neutron/agent/dhcp/agent.py +++ b/neutron/agent/dhcp/agent.py @@ -557,7 +557,6 @@ class DhcpAgentWithStateReport(DhcpAgent): 'start_flag': True, 'agent_type': constants.AGENT_TYPE_DHCP} report_interval = self.conf.AGENT.report_interval - self.use_call = True if report_interval: self.heartbeat = loopingcall.FixedIntervalLoopingCall( self._report_state) @@ -568,8 +567,12 @@ class DhcpAgentWithStateReport(DhcpAgent): self.agent_state.get('configurations').update( self.cache.get_state()) ctx = context.get_admin_context_without_session() - self.state_rpc.report_state(ctx, self.agent_state, self.use_call) - self.use_call = False + agent_status = self.state_rpc.report_state( + ctx, self.agent_state, True) + if agent_status == constants.AGENT_REVIVED: + LOG.info(_LI("Agent has just been revived. " + "Scheduling full sync")) + self.schedule_resync("Agent has just been revived") except AttributeError: # This means the server does not support report_state LOG.warn(_LW("Neutron server does not support state report." diff --git a/neutron/agent/l3/agent.py b/neutron/agent/l3/agent.py index ec931e8a3dc..7098a1786e9 100644 --- a/neutron/agent/l3/agent.py +++ b/neutron/agent/l3/agent.py @@ -613,7 +613,6 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, class L3NATAgentWithStateReport(L3NATAgent): def __init__(self, host, conf=None): - self.use_call = True super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf) self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN) self.agent_state = { @@ -659,10 +658,14 @@ class L3NATAgentWithStateReport(L3NATAgent): configurations['interfaces'] = num_interfaces configurations['floating_ips'] = num_floating_ips try: - self.state_rpc.report_state(self.context, self.agent_state, - self.use_call) + agent_status = self.state_rpc.report_state(self.context, + self.agent_state, + True) + if agent_status == l3_constants.AGENT_REVIVED: + LOG.info(_LI('Agent has just been revived. ' + 'Doing a full sync.')) + self.fullsync = True self.agent_state.pop('start_flag', None) - self.use_call = False except AttributeError: # This means the server does not support report_state LOG.warn(_LW("Neutron server does not support state report." diff --git a/neutron/common/constants.py b/neutron/common/constants.py index a5f34ee9d2d..5ac94c8b4c1 100644 --- a/neutron/common/constants.py +++ b/neutron/common/constants.py @@ -200,3 +200,11 @@ ROUTER_MARK_MASK = "0xffff" # Time format ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' + +# Agent states as detected by server, used to reply on agent's state report +# agent has just been registered +AGENT_NEW = 'new' +# agent is alive +AGENT_ALIVE = 'alive' +# agent has just returned to alive after being dead +AGENT_REVIVED = 'revived' diff --git a/neutron/db/agents_db.py b/neutron/db/agents_db.py index 9417d5e3c37..7d8622b2957 100644 --- a/neutron/db/agents_db.py +++ b/neutron/db/agents_db.py @@ -219,6 +219,12 @@ class AgentDbMixin(ext_agent.AgentPluginBase): 'delta': delta}) def _create_or_update_agent(self, context, agent_state): + """Registers new agent in the database or updates existing. + + Returns agent status from server point of view: alive, new or revived. + It could be used by agent to do some sync with the server if needed. + """ + status = constants.AGENT_ALIVE with context.session.begin(subtransactions=True): res_keys = ['agent_type', 'binary', 'host', 'topic'] res = dict((k, agent_state[k]) for k in res_keys) @@ -230,6 +236,8 @@ class AgentDbMixin(ext_agent.AgentPluginBase): try: agent_db = self._get_agent_by_type_and_host( context, agent_state['agent_type'], agent_state['host']) + if not agent_db.is_active: + status = constants.AGENT_REVIVED res['heartbeat_timestamp'] = current_time if agent_state.get('start_flag'): res['started_at'] = current_time @@ -246,7 +254,9 @@ class AgentDbMixin(ext_agent.AgentPluginBase): greenthread.sleep(0) context.session.add(agent_db) self._log_heartbeat(agent_state, agent_db, configurations_dict) + status = constants.AGENT_NEW greenthread.sleep(0) + return status def create_or_update_agent(self, context, agent): """Create or update agent according to report.""" @@ -286,7 +296,10 @@ class AgentExtRpcCallback(object): self.plugin = plugin def report_state(self, context, **kwargs): - """Report state from agent to server.""" + """Report state from agent to server. + + Returns - agent's status: AGENT_NEW, AGENT_REVIVED, AGENT_ALIVE + """ time = kwargs['time'] time = timeutils.parse_strtime(time) agent_state = kwargs['agent_state']['agent_state'] @@ -301,7 +314,7 @@ class AgentExtRpcCallback(object): return if not self.plugin: self.plugin = manager.NeutronManager.get_plugin() - self.plugin.create_or_update_agent(context, agent_state) + return self.plugin.create_or_update_agent(context, agent_state) def _check_clock_sync_on_agent_start(self, agent_state, agent_time): """Checks if the server and the agent times are in sync. diff --git a/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py index 350f322fb03..a6377e22993 100644 --- a/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -871,6 +871,8 @@ class LinuxBridgeNeutronAgentRPC(service.Service): # stores received port_updates for processing by the main loop self.updated_devices = set() + # flag to do a sync after revival + self.fullsync = False self.context = context.get_admin_context_without_session() self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN) self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN) @@ -892,8 +894,13 @@ class LinuxBridgeNeutronAgentRPC(service.Service): try: devices = len(self.br_mgr.get_tap_devices()) self.agent_state.get('configurations')['devices'] = devices - self.state_rpc.report_state(self.context, - self.agent_state) + agent_status = self.state_rpc.report_state(self.context, + self.agent_state, + True) + if agent_status == constants.AGENT_REVIVED: + LOG.info(_LI('Agent has just been revived. ' + 'Doing a full sync.')) + self.fullsync = True self.agent_state.pop('start_flag', None) except Exception: LOG.exception(_LE("Failed reporting state!")) @@ -1096,11 +1103,15 @@ class LinuxBridgeNeutronAgentRPC(service.Service): while True: start = time.time() - device_info = self.scan_devices(previous=device_info, sync=sync) + if self.fullsync: + sync = True + self.fullsync = False if sync: LOG.info(_LI("Agent out of sync with plugin!")) - sync = False + + device_info = self.scan_devices(previous=device_info, sync=sync) + sync = False if (self._device_info_has_changes(device_info) or self.sg_agent.firewall_refresh_needed()): diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index 4b91e349fd4..6eb196a1e36 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -182,6 +182,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, super(OVSNeutronAgent, self).__init__() self.conf = conf or cfg.CONF + self.fullsync = True # init bridge classes with configured datapath type. self.br_int_cls, self.br_phys_cls, self.br_tun_cls = ( functools.partial(bridge_classes[b], @@ -192,7 +193,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.veth_mtu = veth_mtu self.available_local_vlans = set(moves.range(p_const.MIN_VLAN_TAG, p_const.MAX_VLAN_TAG)) - self.use_call = True self.tunnel_types = tunnel_types or [] self.l2_pop = l2_population # TODO(ethuleau): Change ARP responder so it's not dependent on the @@ -326,9 +326,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.dvr_agent.in_distributed_mode()) try: - self.state_rpc.report_state(self.context, - self.agent_state, - self.use_call) + agent_status = self.state_rpc.report_state(self.context, + self.agent_state, + True) + if agent_status == n_const.AGENT_REVIVED: + LOG.info(_LI('Agent has just been revived. ' + 'Doing a full sync.')) + self.fullsync = True self.use_call = False self.agent_state.pop('start_flag', None) except Exception: @@ -1662,7 +1666,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, if not polling_manager: polling_manager = polling.get_polling_manager( minimize_polling=False) - sync = True ports = set() updated_ports_copy = set() @@ -1672,6 +1675,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, consecutive_resyncs = 0 need_clean_stale_flow = True while self._check_and_handle_signal(): + if self.fullsync: + LOG.info(_LI("rpc_loop doing a full sync.")) + sync = True + self.fullsync = False port_info = {} ancillary_port_info = {} start = time.time() diff --git a/neutron/tests/unit/agent/dhcp/test_agent.py b/neutron/tests/unit/agent/dhcp/test_agent.py index 3df63de42c3..cc5cf876d01 100644 --- a/neutron/tests/unit/agent/dhcp/test_agent.py +++ b/neutron/tests/unit/agent/dhcp/test_agent.py @@ -425,6 +425,20 @@ class TestDhcpAgent(base.BaseTestCase): dhcp.periodic_resync() spawn.assert_called_once_with(dhcp._periodic_resync_helper) + def test_report_state_revival_logic(self): + dhcp = dhcp_agent.DhcpAgentWithStateReport(HOSTNAME) + with mock.patch.object(dhcp.state_rpc, + 'report_state') as report_state,\ + mock.patch.object(dhcp, "run"): + report_state.return_value = const.AGENT_ALIVE + dhcp._report_state() + self.assertEqual(dhcp.needs_resync_reasons, {}) + + report_state.return_value = const.AGENT_REVIVED + dhcp._report_state() + self.assertEqual(dhcp.needs_resync_reasons[None], + ['Agent has just been revived']) + def test_periodic_resync_helper(self): with mock.patch.object(dhcp_agent.eventlet, 'sleep') as sleep: dhcp = dhcp_agent.DhcpAgent(HOSTNAME) diff --git a/neutron/tests/unit/agent/l3/test_agent.py b/neutron/tests/unit/agent/l3/test_agent.py index 837325f6e77..98e4d1e459f 100644 --- a/neutron/tests/unit/agent/l3/test_agent.py +++ b/neutron/tests/unit/agent/l3/test_agent.py @@ -215,13 +215,26 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): conf=self.conf) self.assertEqual(agent.agent_state['start_flag'], True) - use_call_arg = agent.use_call agent.after_start() report_state.assert_called_once_with(agent.context, agent.agent_state, - use_call_arg) + True) self.assertTrue(agent.agent_state.get('start_flag') is None) + def test_report_state_revival_logic(self): + with mock.patch.object(agent_rpc.PluginReportStateAPI, + 'report_state') as report_state: + agent = l3_agent.L3NATAgentWithStateReport(host=HOSTNAME, + conf=self.conf) + report_state.return_value = l3_constants.AGENT_REVIVED + agent._report_state() + self.assertTrue(agent.fullsync) + + agent.fullsync = False + report_state.return_value = l3_constants.AGENT_ALIVE + agent._report_state() + self.assertFalse(agent.fullsync) + def test_periodic_sync_routers_task_call_clean_stale_namespaces(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) self.plugin_api.get_routers.return_value = [] diff --git a/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py index fbc6c57c276..0c25dc0dfba 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py @@ -366,6 +366,13 @@ class TestLinuxBridgeAgent(base.BaseTestCase): self.agent.stop() self.assertFalse(mock_set_rpc.called) + def test_report_state_revived(self): + with mock.patch.object(self.agent.state_rpc, + "report_state") as report_st: + report_st.return_value = constants.AGENT_REVIVED + self.agent._report_state() + self.assertTrue(self.agent.fullsync) + class TestLinuxBridgeManager(base.BaseTestCase): def setUp(self): diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index 07d4e955e66..fe4c1fb8d68 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -145,9 +145,6 @@ class TestOvsNeutronAgent(object): return_value=[]): self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(), **kwargs) - # set back to true because initial report state will succeed due - # to mocked out RPC calls - self.agent.use_call = True self.agent.tun_br = self.br_tun_cls(br_name='br-tun') self.agent.sg_agent = mock.Mock() @@ -709,14 +706,13 @@ class TestOvsNeutronAgent(object): report_st.assert_called_with(self.agent.context, self.agent.agent_state, True) self.assertNotIn("start_flag", self.agent.agent_state) - self.assertFalse(self.agent.use_call) self.assertEqual( self.agent.agent_state["configurations"]["devices"], self.agent.int_br_device_count ) self.agent._report_state() report_st.assert_called_with(self.agent.context, - self.agent.agent_state, False) + self.agent.agent_state, True) def test_report_state_fail(self): with mock.patch.object(self.agent.state_rpc, @@ -729,6 +725,13 @@ class TestOvsNeutronAgent(object): report_st.assert_called_with(self.agent.context, self.agent.agent_state, True) + def test_report_state_revived(self): + with mock.patch.object(self.agent.state_rpc, + "report_state") as report_st: + report_st.return_value = n_const.AGENT_REVIVED + self.agent._report_state() + self.assertTrue(self.agent.fullsync) + def test_port_update(self): port = {"id": TEST_PORT_ID1, "network_id": TEST_NETWORK_ID1, @@ -1775,9 +1778,6 @@ class TestOvsDvrNeutronAgent(object): return_value=[]): self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(), **kwargs) - # set back to true because initial report state will succeed due - # to mocked out RPC calls - self.agent.use_call = True self.agent.tun_br = self.br_tun_cls(br_name='br-tun') self.agent.sg_agent = mock.Mock()