Add option to remove networks from dead DHCP agents

Networks are removed from dead DHCP agents after a configurable
timeout; unhosted networks can then be picked up by live DHCP agents.

The feature is added for all plugins that support DHCP scheduling.

DocImpact
Change-Id: I6ab03b1642f54aa1d5de8844d95c758f1cb273f1
Closes-Bug: #1386794
Eugene Nikanorov 2014-10-24 17:39:01 +04:00
parent 12637036d6
commit 0db17ea6ec
17 changed files with 276 additions and 35 deletions

View File

@@ -173,6 +173,11 @@ lock_path = $state_path/lock
# admin_state_up set to True to alive agents.
# allow_automatic_l3agent_failover = False
# Allow automatic removal of networks from dead DHCP agents with
# admin_state_up set to True.
# Networks could then be rescheduled if network_auto_schedule is True
# allow_automatic_dhcp_failover = True
# Number of DHCP agents scheduled to host a network. This enables redundant
# DHCP agents for configured networks.
# dhcp_agents_per_network = 1

View File

@@ -13,7 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import random
import time
from oslo.config import cfg
from oslo.utils import timeutils
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
@@ -21,11 +26,14 @@ from sqlalchemy.orm import joinedload
from neutron.common import constants
from neutron.common import utils
from neutron import context as ncontext
from neutron.db import agents_db
from neutron.db import model_base
from neutron.extensions import agent as ext_agent
from neutron.extensions import dhcpagentscheduler
from neutron.i18n import _LE, _LI, _LW
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
LOG = logging.getLogger(__name__)
@@ -37,6 +45,9 @@ AGENTS_SCHEDULER_OPTS = [
help=_('Driver to use for scheduling network to DHCP agent')),
cfg.BoolOpt('network_auto_schedule', default=True,
help=_('Allow auto scheduling networks to DHCP agent.')),
cfg.BoolOpt('allow_automatic_dhcp_failover', default=True,
help=_('Automatically remove networks from offline DHCP '
'agents.')),
cfg.IntOpt('dhcp_agents_per_network', default=1,
help=_('Number of DHCP agents scheduled to host a network.')),
]
@@ -95,6 +106,39 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
original_agent['host'])
return result
def setup_agent_status_check(self, function):
self.periodic_agent_loop = loopingcall.FixedIntervalLoopingCall(
function)
# TODO(enikanorov): make interval configurable rather than computed
interval = max(cfg.CONF.agent_down_time / 2, 1)
# add a random initial delay to allow agents to check in after the
# neutron server first starts. The randomness offsets multiple servers
initial_delay = random.randint(interval, interval * 2)
self.periodic_agent_loop.start(interval=interval,
initial_delay=initial_delay)
def agent_dead_limit_seconds(self):
return cfg.CONF.agent_down_time * 2
def wait_down_agents(self, agent_type, agent_dead_limit):
"""Gives chance for agents to send a heartbeat."""
# check for an abrupt clock change since last check. if a change is
# detected, sleep for a while to let the agents check in.
tdelta = timeutils.utcnow() - getattr(self, '_clock_jump_canary',
timeutils.utcnow())
if timeutils.total_seconds(tdelta) > cfg.CONF.agent_down_time:
LOG.warn(_LW("Time since last %s agent reschedule check has "
"exceeded the interval between checks. Waiting "
"before check to allow agents to send a heartbeat "
"in case there was a clock adjustment."), agent_type)
time.sleep(agent_dead_limit)
self._clock_jump_canary = timeutils.utcnow()
def get_cutoff_time(self, agent_dead_limit):
cutoff = timeutils.utcnow() - datetime.timedelta(
seconds=agent_dead_limit)
return cutoff
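
For illustration, a minimal sketch of the timing math these helpers imply, assuming the default agent_down_time of 75 seconds (the numbers are illustrative, not taken from the patch):

import datetime

agent_down_time = 75                            # cfg.CONF.agent_down_time
check_interval = max(agent_down_time / 2, 1)    # periodic check runs roughly every 37 s
agent_dead_limit = agent_down_time * 2          # 150 s without a heartbeat
cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=agent_dead_limit)
# agents whose heartbeat_timestamp is older than cutoff are treated as dead;
# the loop itself starts after a random initial delay in [interval, 2 * interval]
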
class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
.DhcpAgentSchedulerPluginBase,
@@ -104,6 +148,128 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
network_scheduler = None
def start_periodic_dhcp_agent_status_check(self):
if not cfg.CONF.allow_automatic_dhcp_failover:
LOG.info(_LI("Skipping periodic DHCP agent status check because "
"automatic network rescheduling is disabled."))
return
self.setup_agent_status_check(self.remove_networks_from_down_agents)
def _agent_starting_up(self, context, agent):
"""Check if agent was just started.
Method returns True if the agent is in its 'starting up' period.
The return value depends on the number of networks assigned to the agent.
It doesn't look at the latest heartbeat timestamp, as it is assumed
that this method is only called for agents that are considered dead.
"""
agent_dead_limit = datetime.timedelta(
seconds=self.agent_dead_limit_seconds())
network_count = (context.session.query(NetworkDhcpAgentBinding).
filter_by(dhcp_agent_id=agent['id']).count())
# the number of networks assigned to the agent affects the amount of
# time we give it to start up. Tests show that it's more or less safe
# to assume that a DHCP agent processes each network in less than
# 2 seconds. So, give it this additional time for each of the networks.
additional_time = datetime.timedelta(seconds=2 * network_count)
LOG.debug("Checking if agent starts up and giving it additional %s",
additional_time)
agent_expected_up = (agent['started_at'] + agent_dead_limit +
additional_time)
return agent_expected_up > timeutils.utcnow()
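
A rough worked example of the grace period computed above (figures are illustrative, assuming an agent_dead_limit of 150 seconds):

import datetime

agent_dead_limit = datetime.timedelta(seconds=150)
network_count = 30                                  # networks bound to the agent
additional_time = datetime.timedelta(seconds=2 * network_count)   # 60 s
grace_period = agent_dead_limit + additional_time   # 210 s in total
# the agent counts as "starting up" until started_at + grace_period,
# so its networks are not removed during that window
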
def _schedule_network(self, context, network_id, dhcp_notifier):
LOG.info(_LI("Scheduling unhosted network %s"), network_id)
try:
# TODO(enikanorov): have to issue redundant db query
# to satisfy scheduling interface
network = self.get_network(context, network_id)
agents = self.schedule_network(context, network)
if not agents:
LOG.info(_LI("Failed to schedule network %s, "
"no eligible agents or it might be "
"already scheduled by another server"),
network_id)
return
if not dhcp_notifier:
return
for agent in agents:
LOG.info(_LI("Adding network %(net)s to agent "
"%(agent)%s on host %(host)s"),
{'net': network_id,
'agent': agent.id,
'host': agent.host})
dhcp_notifier.network_added_to_agent(
context, network_id, agent.host)
except Exception:
# catch any exception raised during scheduling so that if
# _schedule_network is invoked in a loop it can continue
# with the remaining networks in any case
LOG.exception(_LE("Failed to schedule network %s"), network_id)
def _filter_bindings(self, context, bindings):
"""Skip bindings for which the agent is dead, but starting up."""
# to save few db calls: store already checked agents in dict
# id -> is_agent_starting_up
checked_agents = {}
for binding in bindings:
agent_id = binding.dhcp_agent['id']
if agent_id not in checked_agents:
if self._agent_starting_up(context, binding.dhcp_agent):
# When an agent starts and has many networks to process, it may
# fail to send state reports within the defined interval. The
# server would then consider it dead and try to remove
# networks from it.
checked_agents[agent_id] = True
LOG.debug("Agent %s is starting up, skipping", agent_id)
else:
checked_agents[agent_id] = False
if not checked_agents[agent_id]:
yield binding
def remove_networks_from_down_agents(self):
"""Remove networks from down DHCP agents if admin state is up.
Reschedule them if so configured.
"""
agent_dead_limit = self.agent_dead_limit_seconds()
self.wait_down_agents('DHCP', agent_dead_limit)
cutoff = self.get_cutoff_time(agent_dead_limit)
context = ncontext.get_admin_context()
down_bindings = (
context.session.query(NetworkDhcpAgentBinding).
join(agents_db.Agent).
filter(agents_db.Agent.heartbeat_timestamp < cutoff,
agents_db.Agent.admin_state_up))
dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP)
for binding in self._filter_bindings(context, down_bindings):
LOG.warn(_LW("Removing network %(network)s from agent %(agent)s "
"because the agent did not report to the server in "
"the last %(dead_time)s seconds."),
{'network': binding.network_id,
'agent': binding.dhcp_agent_id,
'dead_time': agent_dead_limit})
try:
self.remove_network_from_dhcp_agent(context,
binding.dhcp_agent_id,
binding.network_id)
except dhcpagentscheduler.NetworkNotHostedByDhcpAgent:
# measures against concurrent operation
LOG.debug("Network %(net)s already removed from DHCP agent "
"%s(agent)",
{'net': binding.network_id,
'agent': binding.dhcp_agent_id})
# still continue and allow concurrent scheduling attempt
if cfg.CONF.network_auto_schedule:
self._schedule_network(
context, binding.network_id, dhcp_notifier)
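
For context, a plugin opts into this periodic check with a single call during initialization, as the one-line plugin changes further below show. A minimal sketch of that wiring, with a hypothetical plugin class and Juno-era import paths that should be treated as assumptions:

from oslo.config import cfg

from neutron.db import agentschedulers_db
from neutron.db import db_base_plugin_v2
from neutron.openstack.common import importutils


class ExamplePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
                      agentschedulers_db.DhcpAgentSchedulerDbMixin):
    def __init__(self):
        super(ExamplePluginV2, self).__init__()
        self.network_scheduler = importutils.import_object(
            cfg.CONF.network_scheduler_driver)
        # start the background task that unbinds networks from dead DHCP
        # agents; it is a no-op when allow_automatic_dhcp_failover is False
        self.start_periodic_dhcp_agent_status_check()
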
def get_dhcp_agents_hosting_networks(
self, context, network_ids, active=None):
if not network_ids:

View File

@@ -12,14 +12,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import random
import time
from oslo.config import cfg
from oslo.db import exception as db_exc
from oslo import messaging
from oslo.utils import timeutils
import sqlalchemy as sa
from sqlalchemy import func
from sqlalchemy import or_
@@ -39,7 +34,6 @@ from neutron.extensions import l3agentscheduler
from neutron.i18n import _LE, _LI, _LW
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
LOG = logging.getLogger(__name__)
@@ -79,41 +73,22 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
router_scheduler = None
def start_periodic_agent_status_check(self):
def start_periodic_l3_agent_status_check(self):
if not cfg.CONF.allow_automatic_l3agent_failover:
LOG.info(_LI("Skipping period L3 agent status check because "
"automatic router rescheduling is disabled."))
return
self.periodic_agent_loop = loopingcall.FixedIntervalLoopingCall(
self.setup_agent_status_check(
self.reschedule_routers_from_down_agents)
interval = max(cfg.CONF.agent_down_time / 2, 1)
# add random initial delay to allow agents to check in after the
# neutron server first starts. random to offset multiple servers
self.periodic_agent_loop.start(interval=interval,
initial_delay=random.randint(interval, interval * 2))
def reschedule_routers_from_down_agents(self):
"""Reschedule routers from down l3 agents if admin state is up."""
# give agents extra time to handle transient failures
agent_dead_limit = cfg.CONF.agent_down_time * 2
# check for an abrupt clock change since last check. if a change is
# detected, sleep for a while to let the agents check in.
tdelta = timeutils.utcnow() - getattr(self, '_clock_jump_canary',
timeutils.utcnow())
if timeutils.total_seconds(tdelta) > cfg.CONF.agent_down_time:
LOG.warn(_LW("Time since last L3 agent reschedule check has "
"exceeded the interval between checks. Waiting "
"before check to allow agents to send a heartbeat "
"in case there was a clock adjustment."))
time.sleep(agent_dead_limit)
self._clock_jump_canary = timeutils.utcnow()
agent_dead_limit = self.agent_dead_limit_seconds()
self.wait_down_agents('L3', agent_dead_limit)
cutoff = self.get_cutoff_time(agent_dead_limit)
context = n_ctx.get_admin_context()
cutoff = timeutils.utcnow() - datetime.timedelta(
seconds=agent_dead_limit)
down_bindings = (
context.session.query(RouterL3AgentBinding).
join(agents_db.Agent).

View File

@@ -500,6 +500,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
if cfg.CONF.RESTPROXY.sync_data:
self._send_all_data()
self.start_periodic_dhcp_agent_status_check()
LOG.debug("NeutronRestProxyV2: initialization done")
def _setup_rpc(self):

View File

@@ -239,6 +239,7 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
cfg.CONF.router_scheduler_driver
)
self.brocade_init()
self.start_periodic_dhcp_agent_status_check()
def brocade_init(self):
"""Brocade specific initialization."""

View File

@@ -102,6 +102,7 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.network_scheduler = importutils.import_object(
q_conf.CONF.network_scheduler_driver
)
self.start_periodic_dhcp_agent_status_check()
def _setup_rpc(self):
# RPC support

View File

@@ -134,6 +134,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
cfg.CONF.network_scheduler_driver
)
self.start_periodic_dhcp_agent_status_check()
LOG.info(_LI("Modular L2 Plugin initialization complete"))
def _setup_rpc(self):

View File

@@ -145,6 +145,7 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
'default': self.deactivate_port,
}
}
self.start_periodic_dhcp_agent_status_check()
def setup_rpc(self):
self.service_topics = {svc_constants.CORE: topics.PLUGIN,

View File

@@ -136,6 +136,7 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
cfg.CONF.network_scheduler_driver)
self.router_scheduler = importutils.import_object(
cfg.CONF.router_scheduler_driver)
self.start_periodic_dhcp_agent_status_check()
def oneconvergence_init(self):
"""Initialize the connections and set the log levels for the plugin."""

View File

@@ -180,6 +180,7 @@ class NsxPluginV2(addr_pair_db.AllowedAddressPairsMixin,
self.nsx_sync_opts.min_sync_req_delay,
self.nsx_sync_opts.min_chunk_size,
self.nsx_sync_opts.max_random_sync_delay)
self.start_periodic_dhcp_agent_status_check()
def _ensure_default_network_gateway(self):
if self._is_default_net_gw_in_sync:

View File

@@ -54,7 +54,7 @@ class L3RouterPlugin(common_db_mixin.CommonDbMixin,
self.setup_rpc()
self.router_scheduler = importutils.import_object(
cfg.CONF.router_scheduler_driver)
self.start_periodic_agent_status_check()
self.start_periodic_l3_agent_status_check()
super(L3RouterPlugin, self).__init__()
def setup_rpc(self):

View File

@@ -57,6 +57,10 @@ class BigSwitchTestBase(object):
cfg.CONF.set_override('add_meta_server_route', False, 'RESTPROXY')
def setup_patches(self):
self.dhcp_periodic_p = mock.patch(
'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
'start_periodic_dhcp_agent_status_check')
self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
self.plugin_notifier_p = mock.patch(NOTIFIER)
# prevent any greenthreads from spawning
self.spawn_p = mock.patch(SPAWN, new=lambda *args, **kwargs: None)

View File

@@ -237,7 +237,7 @@ class OvsAgentSchedulerTestCaseBase(test_l3_plugin.L3NatTestCaseMixin,
self.patched_l3_notify = self.l3_notify_p.start()
self.l3_periodic_p = mock.patch('neutron.db.l3_agentschedulers_db.'
'L3AgentSchedulerDbMixin.'
'start_periodic_agent_status_check')
'start_periodic_l3_agent_status_check')
self.patched_l3_periodic = self.l3_periodic_p.start()
self.dhcp_notify_p = mock.patch(
'neutron.extensions.dhcpagentscheduler.notify')

View File

@@ -119,6 +119,11 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase,
self.net_create_status = 'ACTIVE'
self.port_create_status = 'ACTIVE'
self.dhcp_periodic_p = mock.patch(
'neutron.db.agentschedulers_db.DhcpAgentSchedulerDbMixin.'
'start_periodic_dhcp_agent_status_check')
self.patched_dhcp_periodic = self.dhcp_periodic_p.start()
def _is_native_bulk_supported():
plugin_obj = manager.NeutronManager.get_plugin()
native_bulk_attr_name = ("_%s__native_bulk_support"

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import datetime
import mock
@@ -24,7 +25,7 @@ from neutron.common import constants
from neutron.common import topics
from neutron import context
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import agentschedulers_db as sched_db
from neutron.db import models_v2
from neutron.scheduler import dhcp_agent_scheduler
from neutron.tests.unit import testlib_api
@@ -68,6 +69,7 @@ class TestDhcpSchedulerBaseTestCase(testlib_api.SqlTestCase):
old_time = agent['heartbeat_timestamp']
hour_old = old_time - datetime.timedelta(hours=1)
agent['heartbeat_timestamp'] = hour_old
agent['started_at'] = hour_old
self._save_agents(dhcp_agents)
return dhcp_agents
@@ -80,7 +82,7 @@ class TestDhcpSchedulerBaseTestCase(testlib_api.SqlTestCase):
scheduler = dhcp_agent_scheduler.ChanceScheduler()
scheduler._schedule_bind_network(self.ctx, agents, network_id)
results = self.ctx.session.query(
agentschedulers_db.NetworkDhcpAgentBinding).filter_by(
sched_db.NetworkDhcpAgentBinding).filter_by(
network_id=network_id).all()
self.assertEqual(len(agents), len(results))
for result in results:
@@ -189,5 +191,70 @@ class TestAutoScheduleNetworks(TestDhcpSchedulerBaseTestCase):
plugin, self.ctx, host)
self.assertEqual(expected_result, observed_ret_value)
hosted_agents = self.ctx.session.query(
agentschedulers_db.NetworkDhcpAgentBinding).all()
sched_db.NetworkDhcpAgentBinding).all()
self.assertEqual(expected_hosted_agents, len(hosted_agents))
class TestNetworksFailover(TestDhcpSchedulerBaseTestCase,
sched_db.DhcpAgentSchedulerDbMixin):
def test_reschedule_network_from_down_agent(self):
plugin = mock.MagicMock()
plugin.get_subnets.return_value = [{"network_id": self.network_id,
"enable_dhcp": True}]
agents = self._create_and_set_agents_down(['host-a', 'host-b'], 1)
self._test_schedule_bind_network([agents[0]], self.network_id)
self._save_networks(["foo-network-2"])
self._test_schedule_bind_network([agents[1]], "foo-network-2")
with contextlib.nested(
mock.patch.object(self, 'remove_network_from_dhcp_agent'),
mock.patch.object(self, 'schedule_network',
return_value=[agents[1]]),
mock.patch.object(self, 'get_network', create=True,
return_value={'id': self.network_id})
) as (rn, sch, getn):
notifier = mock.MagicMock()
self.agent_notifiers[constants.AGENT_TYPE_DHCP] = notifier
self.remove_networks_from_down_agents()
rn.assert_called_with(mock.ANY, agents[0].id, self.network_id)
sch.assert_called_with(mock.ANY, {'id': self.network_id})
notifier.network_added_to_agent.assert_called_with(
mock.ANY, self.network_id, agents[1].host)
def test_reschedule_network_from_down_agent_failed(self):
plugin = mock.MagicMock()
plugin.get_subnets.return_value = [{"network_id": self.network_id,
"enable_dhcp": True}]
agents = self._create_and_set_agents_down(['host-a'], 1)
self._test_schedule_bind_network([agents[0]], self.network_id)
with contextlib.nested(
mock.patch.object(self, 'remove_network_from_dhcp_agent'),
mock.patch.object(self, 'schedule_network',
return_value=None),
mock.patch.object(self, 'get_network', create=True,
return_value={'id': self.network_id})
) as (rn, sch, getn):
notifier = mock.MagicMock()
self.agent_notifiers[constants.AGENT_TYPE_DHCP] = notifier
self.remove_networks_from_down_agents()
rn.assert_called_with(mock.ANY, agents[0].id, self.network_id)
sch.assert_called_with(mock.ANY, {'id': self.network_id})
self.assertFalse(notifier.network_added_to_agent.called)
def test_filter_bindings(self):
bindings = [
sched_db.NetworkDhcpAgentBinding(network_id='foo1',
dhcp_agent={'id': 'id1'}),
sched_db.NetworkDhcpAgentBinding(network_id='foo2',
dhcp_agent={'id': 'id1'}),
sched_db.NetworkDhcpAgentBinding(network_id='foo3',
dhcp_agent={'id': 'id2'}),
sched_db.NetworkDhcpAgentBinding(network_id='foo4',
dhcp_agent={'id': 'id2'})]
with mock.patch.object(self, '_agent_starting_up',
side_effect=[True, False]):
res = [b for b in self._filter_bindings(None, bindings)]
# _agent_starting_up is consulted once per agent (id1 and id2); only
# the bindings of the agent that is not starting up remain
self.assertEqual(2, len(res))
res_ids = [b.network_id for b in res]
self.assertIn('foo3', res_ids)
self.assertIn('foo4', res_ids)

View File

@@ -80,6 +80,10 @@ class ConfigurationTest(base.BaseTestCase):
# Avoid runs of the synchronizer looping call
patch_sync = mock.patch.object(sync, '_start_loopingcall')
patch_sync.start()
dhcp_periodic_p = mock.patch('neutron.db.agentschedulers_db.'
'DhcpAgentSchedulerDbMixin.'
'start_periodic_dhcp_agent_status_check')
dhcp_periodic_p.start()
def _assert_required_options(self, cluster):
self.assertEqual(cluster.nsx_controllers, ['fake_1:443', 'fake_2:443'])
@@ -230,6 +234,10 @@ class OldNVPConfigurationTest(base.BaseTestCase):
# Avoid runs of the synchronizer looping call
patch_sync = mock.patch.object(sync, '_start_loopingcall')
patch_sync.start()
dhcp_periodic_p = mock.patch('neutron.db.agentschedulers_db.'
'DhcpAgentSchedulerDbMixin.'
'start_periodic_dhcp_agent_status_check')
dhcp_periodic_p.start()
def _assert_required_options(self, cluster):
self.assertEqual(cluster.nsx_controllers, ['fake_1:443', 'fake_2:443'])

View File

@@ -292,6 +292,10 @@ class SyncTestCase(testlib_api.SqlTestCase):
'--config-file', vmware.get_fake_conf('nsx.ini.test')]
self.config_parse(args=args)
cfg.CONF.set_override('allow_overlapping_ips', True)
dhcp_periodic_p = mock.patch('neutron.db.agentschedulers_db.'
'DhcpAgentSchedulerDbMixin.'
'start_periodic_dhcp_agent_status_check')
dhcp_periodic_p.start()
self._plugin = plugin.NsxPlugin()
# Mock neutron manager plugin load functions to speed up tests
mock_nm_get_plugin = mock.patch('neutron.manager.NeutronManager.'