Mark agents db mixin operations retriable
Marks the CRUD operations in the agents DB mixin as retriable. Change-Id: I1310d85daa29906b18d083e33a11a8b1463c0ad7 Partial-Bug: #1612798
This commit is contained in:
parent
ad13bdfa27
commit
a74cd2f779
|
@ -20,7 +20,6 @@ from neutron_lib.api import converters
|
|||
from neutron_lib import constants
|
||||
from neutron_lib.db import model_base
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
from oslo_serialization import jsonutils
|
||||
|
@ -144,6 +143,7 @@ class AgentAvailabilityZoneMixin(az_ext.AvailabilityZonePluginBase):
|
|||
result[key] = agent.admin_state_up or result.get(key, False)
|
||||
return result
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def get_availability_zones(self, context, filters=None, fields=None,
|
||||
sorts=None, limit=None, marker=None,
|
||||
page_reverse=False):
|
||||
|
@ -156,6 +156,7 @@ class AgentAvailabilityZoneMixin(az_ext.AvailabilityZonePluginBase):
|
|||
for k, v in six.iteritems(self._list_availability_zones(
|
||||
context, filters))]
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def validate_availability_zones(self, context, resource_type,
|
||||
availability_zones):
|
||||
"""Verify that the availability zones exist."""
|
||||
|
@ -186,6 +187,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
|||
raise ext_agent.AgentNotFound(id=id)
|
||||
return agent
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def get_enabled_agent_on_host(self, context, agent_type, host):
|
||||
"""Return agent of agent_type for the specified host."""
|
||||
query = context.session.query(Agent)
|
||||
|
@ -254,6 +256,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
|||
res['availability_zone'] = agent['availability_zone']
|
||||
return self._fields(res, fields)
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def delete_agent(self, context, id):
|
||||
agent = self._get_agent(context, id)
|
||||
registry.notify(resources.AGENT, events.BEFORE_DELETE, self,
|
||||
|
@ -261,6 +264,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
|||
with context.session.begin(subtransactions=True):
|
||||
context.session.delete(agent)
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def update_agent(self, context, id, agent):
|
||||
agent_data = agent['agent']
|
||||
with context.session.begin(subtransactions=True):
|
||||
|
@ -268,10 +272,12 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
|||
agent.update(agent_data)
|
||||
return self._make_agent_dict(agent)
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def get_agents_db(self, context, filters=None):
|
||||
query = self._get_collection_query(context, Agent, filters=filters)
|
||||
return query.all()
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def get_agents(self, context, filters=None, fields=None):
|
||||
agents = self._get_collection(context, Agent,
|
||||
self._make_agent_dict,
|
||||
|
@ -282,6 +288,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
|||
agents = [agent for agent in agents if agent['alive'] == alive]
|
||||
return agents
|
||||
|
||||
@db_api.retry_db_errors
|
||||
def agent_health_check(self):
|
||||
"""Scan agents and log if some are considered dead."""
|
||||
agents = self.get_agents(context.get_admin_context(),
|
||||
|
@ -315,10 +322,12 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
|||
raise ext_agent.MultipleAgentFoundByTypeHost(agent_type=agent_type,
|
||||
host=host)
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def get_agent(self, context, id, fields=None):
|
||||
agent = self._get_agent(context, id)
|
||||
return self._make_agent_dict(agent, fields)
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def filter_hosts_with_network_access(
|
||||
self, context, network_id, candidate_hosts):
|
||||
"""Filter hosts with access to network_id.
|
||||
|
@ -341,10 +350,12 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
|||
'uuid': state.get('uuid'),
|
||||
'delta': delta})
|
||||
|
||||
def _create_or_update_agent(self, context, agent_state):
|
||||
@db_api.retry_if_session_inactive()
|
||||
def create_or_update_agent(self, context, agent_state):
|
||||
"""Registers new agent in the database or updates existing.
|
||||
|
||||
Returns agent status from server point of view: alive, new or revived.
|
||||
Returns tuple of agent status and state.
|
||||
Status is from server point of view: alive, new or revived.
|
||||
It could be used by agent to do some sync with the server if needed.
|
||||
"""
|
||||
status = n_const.AGENT_ALIVE
|
||||
|
@ -397,26 +408,7 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
|||
registry.notify(resources.AGENT, event_type, self, context=context,
|
||||
host=agent_state['host'], plugin=self,
|
||||
agent=agent_state)
|
||||
return status
|
||||
|
||||
def create_or_update_agent(self, context, agent):
|
||||
"""Create or update agent according to report."""
|
||||
try:
|
||||
return self._create_or_update_agent(context, agent)
|
||||
except db_exc.DBDuplicateEntry:
|
||||
# It might happen that two or more concurrent transactions
|
||||
# are trying to insert new rows having the same value of
|
||||
# (agent_type, host) pair at the same time (if there has
|
||||
# been no such entry in the table and multiple agent status
|
||||
# updates are being processed at the moment). In this case
|
||||
# having a unique constraint on (agent_type, host) columns
|
||||
# guarantees that only one transaction will succeed and
|
||||
# insert a new agent entry, others will fail and be rolled
|
||||
# back. That means we must retry them one more time: no
|
||||
# INSERTs will be issued, because
|
||||
# _get_agent_by_type_and_host() will return the existing
|
||||
# agent entry, which will be updated multiple times
|
||||
return self._create_or_update_agent(context, agent)
|
||||
return status, agent_state
|
||||
|
||||
def _get_agents_considered_for_versions(self):
|
||||
up_agents = self.get_agents(context.get_admin_context(),
|
||||
|
@ -469,7 +461,7 @@ class AgentExtRpcCallback(object):
|
|||
# Initialize RPC api directed to other neutron-servers
|
||||
self.server_versions_rpc = resources_rpc.ResourcesPushToServersRpcApi()
|
||||
|
||||
@db_api.retry_db_errors
|
||||
@db_api.retry_if_session_inactive()
|
||||
def report_state(self, context, **kwargs):
|
||||
"""Report state from agent to server.
|
||||
|
||||
|
@ -489,7 +481,8 @@ class AgentExtRpcCallback(object):
|
|||
return
|
||||
if not self.plugin:
|
||||
self.plugin = manager.NeutronManager.get_plugin()
|
||||
agent_status = self.plugin.create_or_update_agent(context, agent_state)
|
||||
agent_status, agent_state = self.plugin.create_or_update_agent(
|
||||
context, agent_state)
|
||||
self._update_local_agent_resource_versions(context, agent_state)
|
||||
return agent_status
|
||||
|
||||
|
|
Loading…
Reference in New Issue