Resync L3, DHCP and OVS/LB agents upon revival
In big and busy clusters there could be a condition when
rabbitmq clustering mechanism synchronizes queues and during
this period agents connected to that instance of rabbitmq
can't communicate with the server and server considers them
dead moving resources away. After agent become active again,
it needs to cleanup state entries and synchronize its state
with neutron-server.
The solution is to make agents aware of their state from
neutron-server point of view. This is done by changing state
reports from cast to call that would return agent's status.
When agent was dead and becomes alive, it would receive special
AGENT_REVIVED status indicating that it should refresh its
local data which it would not do otherwise.
Conflicts:
neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py
neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
neutron/tests/unit/agent/dhcp/test_agent.py
neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/test_linuxbridge_neutron_agent.py
neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py
Closes-Bug: #1505166
Change-Id: Id28248f4f75821fbacf46e2c44e40f27f59172a9
(cherry picked from commit 3b6bd917e4
)
This commit is contained in:
parent
30b53c9cc5
commit
aebd27f3b7
|
@ -557,7 +557,6 @@ class DhcpAgentWithStateReport(DhcpAgent):
|
|||
'start_flag': True,
|
||||
'agent_type': constants.AGENT_TYPE_DHCP}
|
||||
report_interval = self.conf.AGENT.report_interval
|
||||
self.use_call = True
|
||||
if report_interval:
|
||||
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
|
||||
self._report_state)
|
||||
|
@ -568,8 +567,12 @@ class DhcpAgentWithStateReport(DhcpAgent):
|
|||
self.agent_state.get('configurations').update(
|
||||
self.cache.get_state())
|
||||
ctx = context.get_admin_context_without_session()
|
||||
self.state_rpc.report_state(ctx, self.agent_state, self.use_call)
|
||||
self.use_call = False
|
||||
agent_status = self.state_rpc.report_state(
|
||||
ctx, self.agent_state, True)
|
||||
if agent_status == constants.AGENT_REVIVED:
|
||||
LOG.info(_LI("Agent has just been revived. "
|
||||
"Scheduling full sync"))
|
||||
self.schedule_resync("Agent has just been revived")
|
||||
except AttributeError:
|
||||
# This means the server does not support report_state
|
||||
LOG.warn(_LW("Neutron server does not support state report."
|
||||
|
|
|
@ -613,7 +613,6 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
|
|||
class L3NATAgentWithStateReport(L3NATAgent):
|
||||
|
||||
def __init__(self, host, conf=None):
|
||||
self.use_call = True
|
||||
super(L3NATAgentWithStateReport, self).__init__(host=host, conf=conf)
|
||||
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
|
||||
self.agent_state = {
|
||||
|
@ -659,10 +658,14 @@ class L3NATAgentWithStateReport(L3NATAgent):
|
|||
configurations['interfaces'] = num_interfaces
|
||||
configurations['floating_ips'] = num_floating_ips
|
||||
try:
|
||||
self.state_rpc.report_state(self.context, self.agent_state,
|
||||
self.use_call)
|
||||
agent_status = self.state_rpc.report_state(self.context,
|
||||
self.agent_state,
|
||||
True)
|
||||
if agent_status == l3_constants.AGENT_REVIVED:
|
||||
LOG.info(_LI('Agent has just been revived. '
|
||||
'Doing a full sync.'))
|
||||
self.fullsync = True
|
||||
self.agent_state.pop('start_flag', None)
|
||||
self.use_call = False
|
||||
except AttributeError:
|
||||
# This means the server does not support report_state
|
||||
LOG.warn(_LW("Neutron server does not support state report."
|
||||
|
|
|
@ -200,3 +200,11 @@ ROUTER_MARK_MASK = "0xffff"
|
|||
|
||||
# Time format
|
||||
ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
|
||||
|
||||
# Agent states as detected by server, used to reply on agent's state report
|
||||
# agent has just been registered
|
||||
AGENT_NEW = 'new'
|
||||
# agent is alive
|
||||
AGENT_ALIVE = 'alive'
|
||||
# agent has just returned to alive after being dead
|
||||
AGENT_REVIVED = 'revived'
|
||||
|
|
|
@ -219,6 +219,12 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
|
|||
'delta': delta})
|
||||
|
||||
def _create_or_update_agent(self, context, agent_state):
|
||||
"""Registers new agent in the database or updates existing.
|
||||
|
||||
Returns agent status from server point of view: alive, new or revived.
|
||||
It could be used by agent to do some sync with the server if needed.
|
||||
"""
|
||||
status = constants.AGENT_ALIVE
|
||||
with context.session.begin(subtransactions=True):
|
||||
res_keys = ['agent_type', 'binary', 'host', 'topic']
|
||||
res = dict((k, agent_state[k]) for k in res_keys)
|
||||
|
@ -230,6 +236,8 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
|
|||
try:
|
||||
agent_db = self._get_agent_by_type_and_host(
|
||||
context, agent_state['agent_type'], agent_state['host'])
|
||||
if not agent_db.is_active:
|
||||
status = constants.AGENT_REVIVED
|
||||
res['heartbeat_timestamp'] = current_time
|
||||
if agent_state.get('start_flag'):
|
||||
res['started_at'] = current_time
|
||||
|
@ -246,7 +254,9 @@ class AgentDbMixin(ext_agent.AgentPluginBase):
|
|||
greenthread.sleep(0)
|
||||
context.session.add(agent_db)
|
||||
self._log_heartbeat(agent_state, agent_db, configurations_dict)
|
||||
status = constants.AGENT_NEW
|
||||
greenthread.sleep(0)
|
||||
return status
|
||||
|
||||
def create_or_update_agent(self, context, agent):
|
||||
"""Create or update agent according to report."""
|
||||
|
@ -286,7 +296,10 @@ class AgentExtRpcCallback(object):
|
|||
self.plugin = plugin
|
||||
|
||||
def report_state(self, context, **kwargs):
|
||||
"""Report state from agent to server."""
|
||||
"""Report state from agent to server.
|
||||
|
||||
Returns - agent's status: AGENT_NEW, AGENT_REVIVED, AGENT_ALIVE
|
||||
"""
|
||||
time = kwargs['time']
|
||||
time = timeutils.parse_strtime(time)
|
||||
agent_state = kwargs['agent_state']['agent_state']
|
||||
|
@ -301,7 +314,7 @@ class AgentExtRpcCallback(object):
|
|||
return
|
||||
if not self.plugin:
|
||||
self.plugin = manager.NeutronManager.get_plugin()
|
||||
self.plugin.create_or_update_agent(context, agent_state)
|
||||
return self.plugin.create_or_update_agent(context, agent_state)
|
||||
|
||||
def _check_clock_sync_on_agent_start(self, agent_state, agent_time):
|
||||
"""Checks if the server and the agent times are in sync.
|
||||
|
|
|
@ -871,6 +871,8 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
|
|||
|
||||
# stores received port_updates for processing by the main loop
|
||||
self.updated_devices = set()
|
||||
# flag to do a sync after revival
|
||||
self.fullsync = False
|
||||
self.context = context.get_admin_context_without_session()
|
||||
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
|
||||
|
@ -892,8 +894,13 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
|
|||
try:
|
||||
devices = len(self.br_mgr.get_tap_devices())
|
||||
self.agent_state.get('configurations')['devices'] = devices
|
||||
self.state_rpc.report_state(self.context,
|
||||
self.agent_state)
|
||||
agent_status = self.state_rpc.report_state(self.context,
|
||||
self.agent_state,
|
||||
True)
|
||||
if agent_status == constants.AGENT_REVIVED:
|
||||
LOG.info(_LI('Agent has just been revived. '
|
||||
'Doing a full sync.'))
|
||||
self.fullsync = True
|
||||
self.agent_state.pop('start_flag', None)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed reporting state!"))
|
||||
|
@ -1096,11 +1103,15 @@ class LinuxBridgeNeutronAgentRPC(service.Service):
|
|||
while True:
|
||||
start = time.time()
|
||||
|
||||
device_info = self.scan_devices(previous=device_info, sync=sync)
|
||||
if self.fullsync:
|
||||
sync = True
|
||||
self.fullsync = False
|
||||
|
||||
if sync:
|
||||
LOG.info(_LI("Agent out of sync with plugin!"))
|
||||
sync = False
|
||||
|
||||
device_info = self.scan_devices(previous=device_info, sync=sync)
|
||||
sync = False
|
||||
|
||||
if (self._device_info_has_changes(device_info)
|
||||
or self.sg_agent.firewall_refresh_needed()):
|
||||
|
|
|
@ -182,6 +182,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
super(OVSNeutronAgent, self).__init__()
|
||||
self.conf = conf or cfg.CONF
|
||||
|
||||
self.fullsync = True
|
||||
# init bridge classes with configured datapath type.
|
||||
self.br_int_cls, self.br_phys_cls, self.br_tun_cls = (
|
||||
functools.partial(bridge_classes[b],
|
||||
|
@ -192,7 +193,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
self.veth_mtu = veth_mtu
|
||||
self.available_local_vlans = set(moves.range(p_const.MIN_VLAN_TAG,
|
||||
p_const.MAX_VLAN_TAG))
|
||||
self.use_call = True
|
||||
self.tunnel_types = tunnel_types or []
|
||||
self.l2_pop = l2_population
|
||||
# TODO(ethuleau): Change ARP responder so it's not dependent on the
|
||||
|
@ -326,9 +326,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
self.dvr_agent.in_distributed_mode())
|
||||
|
||||
try:
|
||||
self.state_rpc.report_state(self.context,
|
||||
self.agent_state,
|
||||
self.use_call)
|
||||
agent_status = self.state_rpc.report_state(self.context,
|
||||
self.agent_state,
|
||||
True)
|
||||
if agent_status == n_const.AGENT_REVIVED:
|
||||
LOG.info(_LI('Agent has just been revived. '
|
||||
'Doing a full sync.'))
|
||||
self.fullsync = True
|
||||
self.use_call = False
|
||||
self.agent_state.pop('start_flag', None)
|
||||
except Exception:
|
||||
|
@ -1662,7 +1666,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
if not polling_manager:
|
||||
polling_manager = polling.get_polling_manager(
|
||||
minimize_polling=False)
|
||||
|
||||
sync = True
|
||||
ports = set()
|
||||
updated_ports_copy = set()
|
||||
|
@ -1672,6 +1675,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
consecutive_resyncs = 0
|
||||
need_clean_stale_flow = True
|
||||
while self._check_and_handle_signal():
|
||||
if self.fullsync:
|
||||
LOG.info(_LI("rpc_loop doing a full sync."))
|
||||
sync = True
|
||||
self.fullsync = False
|
||||
port_info = {}
|
||||
ancillary_port_info = {}
|
||||
start = time.time()
|
||||
|
|
|
@ -425,6 +425,20 @@ class TestDhcpAgent(base.BaseTestCase):
|
|||
dhcp.periodic_resync()
|
||||
spawn.assert_called_once_with(dhcp._periodic_resync_helper)
|
||||
|
||||
def test_report_state_revival_logic(self):
|
||||
dhcp = dhcp_agent.DhcpAgentWithStateReport(HOSTNAME)
|
||||
with mock.patch.object(dhcp.state_rpc,
|
||||
'report_state') as report_state,\
|
||||
mock.patch.object(dhcp, "run"):
|
||||
report_state.return_value = const.AGENT_ALIVE
|
||||
dhcp._report_state()
|
||||
self.assertEqual(dhcp.needs_resync_reasons, {})
|
||||
|
||||
report_state.return_value = const.AGENT_REVIVED
|
||||
dhcp._report_state()
|
||||
self.assertEqual(dhcp.needs_resync_reasons[None],
|
||||
['Agent has just been revived'])
|
||||
|
||||
def test_periodic_resync_helper(self):
|
||||
with mock.patch.object(dhcp_agent.eventlet, 'sleep') as sleep:
|
||||
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
|
||||
|
|
|
@ -215,13 +215,26 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
|||
conf=self.conf)
|
||||
|
||||
self.assertEqual(agent.agent_state['start_flag'], True)
|
||||
use_call_arg = agent.use_call
|
||||
agent.after_start()
|
||||
report_state.assert_called_once_with(agent.context,
|
||||
agent.agent_state,
|
||||
use_call_arg)
|
||||
True)
|
||||
self.assertTrue(agent.agent_state.get('start_flag') is None)
|
||||
|
||||
def test_report_state_revival_logic(self):
|
||||
with mock.patch.object(agent_rpc.PluginReportStateAPI,
|
||||
'report_state') as report_state:
|
||||
agent = l3_agent.L3NATAgentWithStateReport(host=HOSTNAME,
|
||||
conf=self.conf)
|
||||
report_state.return_value = l3_constants.AGENT_REVIVED
|
||||
agent._report_state()
|
||||
self.assertTrue(agent.fullsync)
|
||||
|
||||
agent.fullsync = False
|
||||
report_state.return_value = l3_constants.AGENT_ALIVE
|
||||
agent._report_state()
|
||||
self.assertFalse(agent.fullsync)
|
||||
|
||||
def test_periodic_sync_routers_task_call_clean_stale_namespaces(self):
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
self.plugin_api.get_routers.return_value = []
|
||||
|
|
|
@ -366,6 +366,13 @@ class TestLinuxBridgeAgent(base.BaseTestCase):
|
|||
self.agent.stop()
|
||||
self.assertFalse(mock_set_rpc.called)
|
||||
|
||||
def test_report_state_revived(self):
|
||||
with mock.patch.object(self.agent.state_rpc,
|
||||
"report_state") as report_st:
|
||||
report_st.return_value = constants.AGENT_REVIVED
|
||||
self.agent._report_state()
|
||||
self.assertTrue(self.agent.fullsync)
|
||||
|
||||
|
||||
class TestLinuxBridgeManager(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
|
|
|
@ -145,9 +145,6 @@ class TestOvsNeutronAgent(object):
|
|||
return_value=[]):
|
||||
self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(),
|
||||
**kwargs)
|
||||
# set back to true because initial report state will succeed due
|
||||
# to mocked out RPC calls
|
||||
self.agent.use_call = True
|
||||
self.agent.tun_br = self.br_tun_cls(br_name='br-tun')
|
||||
self.agent.sg_agent = mock.Mock()
|
||||
|
||||
|
@ -709,14 +706,13 @@ class TestOvsNeutronAgent(object):
|
|||
report_st.assert_called_with(self.agent.context,
|
||||
self.agent.agent_state, True)
|
||||
self.assertNotIn("start_flag", self.agent.agent_state)
|
||||
self.assertFalse(self.agent.use_call)
|
||||
self.assertEqual(
|
||||
self.agent.agent_state["configurations"]["devices"],
|
||||
self.agent.int_br_device_count
|
||||
)
|
||||
self.agent._report_state()
|
||||
report_st.assert_called_with(self.agent.context,
|
||||
self.agent.agent_state, False)
|
||||
self.agent.agent_state, True)
|
||||
|
||||
def test_report_state_fail(self):
|
||||
with mock.patch.object(self.agent.state_rpc,
|
||||
|
@ -729,6 +725,13 @@ class TestOvsNeutronAgent(object):
|
|||
report_st.assert_called_with(self.agent.context,
|
||||
self.agent.agent_state, True)
|
||||
|
||||
def test_report_state_revived(self):
|
||||
with mock.patch.object(self.agent.state_rpc,
|
||||
"report_state") as report_st:
|
||||
report_st.return_value = n_const.AGENT_REVIVED
|
||||
self.agent._report_state()
|
||||
self.assertTrue(self.agent.fullsync)
|
||||
|
||||
def test_port_update(self):
|
||||
port = {"id": TEST_PORT_ID1,
|
||||
"network_id": TEST_NETWORK_ID1,
|
||||
|
@ -1775,9 +1778,6 @@ class TestOvsDvrNeutronAgent(object):
|
|||
return_value=[]):
|
||||
self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(),
|
||||
**kwargs)
|
||||
# set back to true because initial report state will succeed due
|
||||
# to mocked out RPC calls
|
||||
self.agent.use_call = True
|
||||
self.agent.tun_br = self.br_tun_cls(br_name='br-tun')
|
||||
self.agent.sg_agent = mock.Mock()
|
||||
|
||||
|
|
Loading…
Reference in New Issue