Merge "Generalise the logic of resource auto rescheduling"
This commit is contained in:
commit
5ceac04b7c
|
@ -21,6 +21,7 @@ import debtcollector
|
|||
from neutron_lib import constants
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
from oslo_service import loopingcall
|
||||
from oslo_utils import timeutils
|
||||
import sqlalchemy as sa
|
||||
|
@ -207,6 +208,63 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
|
|||
seconds=agent_dead_limit)
|
||||
return cutoff
|
||||
|
||||
def reschedule_resources_from_down_agents(self, agent_type,
                                          get_down_bindings,
                                          agent_id_attr,
                                          resource_id_attr,
                                          resource_name,
                                          reschedule_resource,
                                          rescheduling_failed):
    """Reschedule resources away from dead agents of the given type.

    Generic driver for agent-failure rescheduling: waits out the grace
    period, fetches bindings whose agent has stopped heartbeating, skips
    agents observed alive again in the meantime, and hands each orphaned
    resource to ``reschedule_resource``.

    :param agent_type: agent type label passed to wait_down_agents()
    :param get_down_bindings: callable(context, agent_dead_limit) that
        returns the bindings whose agent missed its heartbeat
    :param agent_id_attr: name of the binding attribute holding the
        agent id
    :param resource_id_attr: name of the binding attribute holding the
        resource id
    :param resource_name: human-readable resource name used in log
        messages
    :param reschedule_resource: callable(context, resource_id) that
        moves one resource to a live agent
    :param rescheduling_failed: exception type raised by
        ``reschedule_resource`` on a per-resource failure
    """
    agent_dead_limit = self.agent_dead_limit_seconds()
    self.wait_down_agents(agent_type, agent_dead_limit)

    context = ncontext.get_admin_context()
    try:
        down_bindings = get_down_bindings(context, agent_dead_limit)

        # Agents seen alive again during this pass; their remaining
        # bindings are skipped without re-reading the agent record.
        revived_agents = set()
        for binding in down_bindings:
            dead_agent_id = getattr(binding, agent_id_attr)
            resource_id = getattr(binding, resource_id_attr)
            if dead_agent_id in revived_agents:
                continue

            # we need new context to make sure we use different DB
            # transaction - otherwise we may fetch same agent record
            # each time due to REPEATABLE_READ isolation level
            context = ncontext.get_admin_context()
            agent = self._get_agent(context, dead_agent_id)
            if agent.is_active:
                revived_agents.add(dead_agent_id)
                continue

            LOG.warning(_LW(
                "Rescheduling %(resource_name)s %(resource)s from agent "
                "%(agent)s because the agent did not report to the server "
                "in the last %(dead_time)s seconds."),
                {'resource_name': resource_name,
                 'resource': resource_id,
                 'agent': dead_agent_id,
                 'dead_time': agent_dead_limit})
            try:
                reschedule_resource(context, resource_id)
            except (rescheduling_failed, oslo_messaging.RemoteError):
                # Catch individual rescheduling errors here
                # so one broken one doesn't stop the iteration.
                LOG.exception(_LE("Failed to reschedule %(resource_name)s "
                                  "%(resource)s"),
                              {'resource_name': resource_name,
                               'resource': resource_id})
    except Exception:
        # we want to be thorough and catch whatever is raised
        # to avoid loop abortion
        LOG.exception(_LE("Exception encountered during %(resource_name)s "
                          "rescheduling."),
                      {'resource_name': resource_name})
|
||||
|
||||
|
||||
class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
|
||||
.DhcpAgentSchedulerPluginBase,
|
||||
|
|
|
@ -27,10 +27,9 @@ from sqlalchemy import orm
|
|||
from sqlalchemy.orm import joinedload
|
||||
from sqlalchemy import sql
|
||||
|
||||
from neutron._i18n import _, _LE, _LI, _LW
|
||||
from neutron._i18n import _, _LI
|
||||
from neutron.common import constants as n_const
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron import context as n_ctx
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import agentschedulers_db
|
||||
from neutron.db import l3_attrs_db
|
||||
|
@ -103,59 +102,27 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
|
|||
|
||||
def reschedule_routers_from_down_agents(self):
    """Reschedule routers from down l3 agents if admin state is up."""
    # Delegate to the generic agent-failure rescheduling driver; the
    # L3-specific pieces (binding query, attribute names, reschedule
    # call and failure exception) are supplied as parameters.
    self.reschedule_resources_from_down_agents(
        agent_type='L3',
        get_down_bindings=self.get_down_router_bindings,
        agent_id_attr='l3_agent_id',
        resource_id_attr='router_id',
        resource_name='router',
        reschedule_resource=self.reschedule_router,
        rescheduling_failed=l3agentscheduler.RouterReschedulingFailed)
|
||||
def get_down_router_bindings(self, context, agent_dead_limit):
    """Return bindings of routers hosted by L3 agents past the dead limit.

    :param context: DB session context used for the query
    :param agent_dead_limit: seconds without a heartbeat after which an
        agent is considered dead
    :returns: query of RouterL3AgentBinding rows whose agent is
        admin-up but has not reported since the cutoff, restricted to
        non-HA routers
    """
    cutoff = self.get_cutoff_time(agent_dead_limit)
    query = context.session.query(RouterL3AgentBinding)
    query = query.join(agents_db.Agent)
    query = query.filter(agents_db.Agent.heartbeat_timestamp < cutoff,
                         agents_db.Agent.admin_state_up)
    query = query.outerjoin(
        l3_attrs_db.RouterExtraAttributes,
        l3_attrs_db.RouterExtraAttributes.router_id ==
        RouterL3AgentBinding.router_id)
    # Only legacy routers (ha == False) and routers without extra
    # attributes (ha == NULL from the outer join) are candidates;
    # HA routers fail over to their standby instead.
    return query.filter(
        sa.or_(l3_attrs_db.RouterExtraAttributes.ha == sql.false(),
               l3_attrs_db.RouterExtraAttributes.ha == sql.null()))
|
||||
|
||||
def _get_agent_mode(self, agent_db):
|
||||
agent_conf = self.get_configuration_dict(agent_db)
|
||||
|
|
Loading…
Reference in New Issue