Big Switch: Lock consistency table for REST calls

Adds a new class to hold an SQL table lock throughout
a REST call to prevent servers in an HA deployment from
using a stale consistency hash value.

Also passes the current context down to the server manager
for every request since the consistency hash will need to
be read from the database instead of from a variable on the
server pool object.

Closes-Bug: #1328331
Change-Id: I5f8402c076d7732742c0ae4d9b9d6833d42a0b7b
This commit is contained in:
Kevin Benton 2014-06-06 14:15:46 -07:00
parent 24718e6f17
commit 09e706b210
4 changed files with 97 additions and 33 deletions

View File

@@ -14,20 +14,22 @@
# under the License.
import sqlalchemy as sa
from neutron.common import exceptions
from neutron.db import api as db
from neutron.db import model_base
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
'''
A simple table to store the latest consistency hash
received from a server in case neutron gets restarted.
'''
class MultipleReadForUpdateCalls(exceptions.NeutronException):
    """Raised when read_for_update is called again on a HashHandler
    that already has an open transaction from a previous call."""
    message = _("Only one read_for_update call may be made at a time.")
class ConsistencyHash(model_base.BASEV2):
'''
A simple table to store the latest consistency hash
received from a server.
For now we only support one global state so the
hash_id will always be '1'
'''
@@ -37,20 +39,44 @@ class ConsistencyHash(model_base.BASEV2):
hash = sa.Column(sa.String(255), nullable=False)
def get_consistency_hash(hash_id='1'):
    """Return the stored consistency hash for *hash_id*, or False if no
    row exists yet."""
    session = db.get_session()
    with session.begin(subtransactions=True):
        record = (session.query(ConsistencyHash)
                  .filter_by(hash_id=hash_id)
                  .first())
    return record.hash if record else False
class HashHandler(object):
    '''
    A wrapper object to keep track of the session and hold the SQL
    lock between the read and the update to prevent other servers
    from reading the hash during a transaction.
    '''
    def __init__(self, context=None, hash_id='1'):
        # Identifier of the consistency-hash row to operate on; callers
        # in this file only ever use the single global row '1'.
        self.hash_id = hash_id
        # Reuse the request context's session when one is provided so the
        # row lock participates in the caller's transaction; otherwise a
        # fresh session is obtained.
        self.session = db.get_session() if not context else context.session
        # Row object cached by read_for_update() so put_hash() can update
        # it in place while the lock is held.
        self.hash_db_obj = None
        # Transaction kept open between read_for_update() and put_hash().
        self.transaction = None
def read_for_update(self):
    """Open a transaction and read the hash row under a row lock.

    Returns the stored hash string, or '' when no row exists yet.
    The transaction remains open until put_hash() commits it.
    Raises MultipleReadForUpdateCalls if a transaction is already open.
    """
    if self.transaction:
        raise MultipleReadForUpdateCalls()
    self.transaction = self.session.begin(subtransactions=True)
    # SELECT ... FOR UPDATE keeps another server from reading the hash
    # while this transaction is open. MySQL Galera deployments may not
    # honor the lock, but the worst case there is only a double-sync.
    query = self.session.query(ConsistencyHash)
    query = query.filter_by(hash_id=self.hash_id)
    row = query.with_lockmode('update').first()
    if not row:
        return ''
    self.hash_db_obj = row
    return row.hash
def put_consistency_hash(hash, hash_id='1'):
    """Persist *hash* as the consistency hash for *hash_id*.

    Uses merge() so an existing row is updated and a missing row is
    inserted.
    """
    session = db.get_session()
    with session.begin(subtransactions=True):
        session.merge(ConsistencyHash(hash_id=hash_id, hash=hash))
def put_hash(self, hash):
    # Store *hash*, commit the open transaction, and release the row lock.
    # May be called without a prior read_for_update(); in that case a
    # transaction is opened just for this write.
    hash = hash or ''
    if not self.transaction:
        self.transaction = self.session.begin(subtransactions=True)
    if self.hash_db_obj is not None:
        # Row was already fetched (and locked) by read_for_update();
        # update it in place inside the same transaction.
        self.hash_db_obj.hash = hash
    else:
        conhash = ConsistencyHash(hash_id=self.hash_id, hash=hash)
        self.session.merge(conhash)
    self.transaction.commit()
    self.transaction = None
    # NOTE(review): the two continuation lines below are pre/post diff
    # residue from the extraction; only the self.hash_id variant is valid
    # inside this method — confirm against the upstream change.
    LOG.debug(_("Consistency hash for group %(hash_id)s updated "
                "to %(hash)s"), {'hash_id': hash_id, 'hash': hash})
                "to %(hash)s"), {'hash_id': self.hash_id, 'hash': hash})

View File

@@ -45,6 +45,7 @@ on port-attach) on an additional PUT to do a bulk dump of all persistent data.
"""
import copy
import functools
import httplib
import re
@ -448,6 +449,14 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
raise exceptions.PortNotFound(port_id=port_id)
def put_context_in_serverpool(f):
    """Decorator for plugin methods: stash the request context in
    self.servers before delegating to the wrapped method.

    This makes the context (and its DB session) available to the server
    pool for the duration of the REST call.
    """
    @functools.wraps(f)
    def _with_context(self, context, *args, **kwargs):
        self.servers.set_context(context)
        return f(self, context, *args, **kwargs)
    return _with_context
class NeutronRestProxyV2(NeutronRestProxyV2Base,
addr_pair_db.AllowedAddressPairsMixin,
extradhcpopt_db.ExtraDhcpOptMixin,
@ -514,6 +523,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
# Consume from all consumers in a thread
self.conn.consume_in_thread()
@put_context_in_serverpool
def create_network(self, context, network):
"""Create a network.
@ -557,6 +567,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
# return created network
return new_net
@put_context_in_serverpool
def update_network(self, context, net_id, network):
"""Updates the properties of a particular Virtual Network.
@ -596,6 +607,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
# NOTE(kevinbenton): workaround for eventlet/mysql deadlock
@utils.synchronized('bsn-port-barrier')
@put_context_in_serverpool
def delete_network(self, context, net_id):
"""Delete a network.
:param context: neutron api request context
@ -618,6 +630,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
self._send_delete_network(orig_net, context)
return ret_val
@put_context_in_serverpool
def create_port(self, context, port):
"""Create a port, which is a connection point of a device
(e.g., a VM NIC) to attach to a L2 Neutron network.
@ -708,6 +721,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
self._extend_port_dict_binding(context, port)
return [self._fields(port, fields) for port in ports]
@put_context_in_serverpool
def update_port(self, context, port_id, port):
"""Update values of a port.
@ -784,6 +798,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
# NOTE(kevinbenton): workaround for eventlet/mysql deadlock
@utils.synchronized('bsn-port-barrier')
@put_context_in_serverpool
def delete_port(self, context, port_id, l3_port_check=True):
"""Delete a port.
:param context: neutron api request context
@ -809,6 +824,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
self._delete_port(context, port_id)
self.servers.rest_delete_port(tenid, port['network_id'], port_id)
@put_context_in_serverpool
def create_subnet(self, context, subnet):
LOG.debug(_("NeutronRestProxyV2: create_subnet() called"))
@ -825,6 +841,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
self._send_update_network(orig_net, context)
return new_subnet
@put_context_in_serverpool
def update_subnet(self, context, id, subnet):
LOG.debug(_("NeutronRestProxyV2: update_subnet() called"))
@ -843,6 +860,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
# NOTE(kevinbenton): workaround for eventlet/mysql deadlock
@utils.synchronized('bsn-port-barrier')
@put_context_in_serverpool
def delete_subnet(self, context, id):
LOG.debug(_("NeutronRestProxyV2: delete_subnet() called"))
orig_subnet = super(NeutronRestProxyV2, self).get_subnet(context, id)
@ -881,6 +899,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
return tenantset
return defaultset
@put_context_in_serverpool
def create_router(self, context, router):
LOG.debug(_("NeutronRestProxyV2: create_router() called"))
@ -902,6 +921,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
# return created router
return new_router
@put_context_in_serverpool
def update_router(self, context, router_id, router):
LOG.debug(_("NeutronRestProxyV2.update_router() called"))
@ -925,6 +945,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
# NOTE(kevinbenton): workaround for eventlet/mysql deadlock.
# delete_router ends up calling _delete_port instead of delete_port.
@utils.synchronized('bsn-port-barrier')
@put_context_in_serverpool
def delete_router(self, context, router_id):
LOG.debug(_("NeutronRestProxyV2: delete_router() called"))
@ -1015,6 +1036,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
interface_id)
return del_ret
@put_context_in_serverpool
def create_floatingip(self, context, floatingip):
LOG.debug(_("NeutronRestProxyV2: create_floatingip() called"))
@ -1038,6 +1060,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
# return created floating IP
return new_fl_ip
@put_context_in_serverpool
def update_floatingip(self, context, id, floatingip):
LOG.debug(_("NeutronRestProxyV2: update_floatingip() called"))
@ -1054,6 +1077,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
self._send_floatingip_update(context)
return new_fl_ip
@put_context_in_serverpool
def delete_floatingip(self, context, id):
LOG.debug(_("NeutronRestProxyV2: delete_floatingip() called"))
@ -1078,6 +1102,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
# overriding method from l3_db as original method calls
# self.delete_floatingip() which in turn calls self.delete_port() which
# is locked with 'bsn-port-barrier'
@put_context_in_serverpool
def delete_disassociated_floatingips(self, context, network_id):
query = self._model_query(context, l3_db.FloatingIP)
query = query.filter_by(floating_network_id=network_id,

View File

@@ -39,6 +39,7 @@ import socket
import ssl
import eventlet
import eventlet.corolocal
from oslo.config import cfg
from neutron.common import exceptions
@ -121,7 +122,7 @@ class ServerProxy(object):
return self.capabilities
def rest_call(self, action, resource, data='', headers={}, timeout=False,
reconnect=False):
reconnect=False, hash_handler=None):
uri = self.base_uri + resource
body = json.dumps(data)
if not headers:
@ -131,7 +132,12 @@ class ServerProxy(object):
headers['NeutronProxy-Agent'] = self.name
headers['Instance-ID'] = self.neutron_id
headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID
headers[HASH_MATCH_HEADER] = self.mypool.consistency_hash or ''
if hash_handler:
# this will be excluded on calls that don't need hashes
# (e.g. topology sync, capability checks)
headers[HASH_MATCH_HEADER] = hash_handler.read_for_update()
else:
hash_handler = cdb.HashHandler()
if 'keep-alive' in self.capabilities:
headers['Connection'] = 'keep-alive'
else:
@ -178,9 +184,7 @@ class ServerProxy(object):
try:
self.currentconn.request(action, uri, body, headers)
response = self.currentconn.getresponse()
newhash = response.getheader(HASH_MATCH_HEADER)
if newhash:
self._put_consistency_hash(newhash)
hash_handler.put_hash(response.getheader(HASH_MATCH_HEADER))
respstr = response.read()
respdata = respstr
if response.status in self.success_codes:
@ -216,10 +220,6 @@ class ServerProxy(object):
'data': ret[3]})
return ret
def _put_consistency_hash(self, newhash):
self.mypool.consistency_hash = newhash
cdb.put_consistency_hash(newhash)
class ServerPool(object):
@ -235,6 +235,7 @@ class ServerPool(object):
self.neutron_id = cfg.CONF.RESTPROXY.neutron_id
self.base_uri = base_uri
self.name = name
self.contexts = {}
self.timeout = cfg.CONF.RESTPROXY.server_timeout
self.always_reconnect = not cfg.CONF.RESTPROXY.cache_connections
default_port = 8000
@ -246,10 +247,6 @@ class ServerPool(object):
self.get_topo_function = None
self.get_topo_function_args = {}
# Hash to send to backend with request as expected previous
# state to verify consistency.
self.consistency_hash = cdb.get_consistency_hash()
if not servers:
raise cfg.Error(_('Servers not defined. Aborting server manager.'))
servers = [s if len(s.rsplit(':', 1)) == 2
@ -268,6 +265,19 @@ class ServerPool(object):
cfg.CONF.RESTPROXY.consistency_interval)
LOG.debug(_("ServerPool: initialization done"))
def set_context(self, context):
    """Bind *context* to the current greenthread.

    Contexts are keyed by greenthread id so concurrent requests never
    see each other's context.
    """
    ident = eventlet.corolocal.get_ident()
    self.contexts[ident] = context
def pop_context(self):
    """Remove and return this greenthread's stored context, or None.

    Contexts are single-use: discarding them after one request keeps
    the table from growing across requests.
    """
    ident = eventlet.corolocal.get_ident()
    return self.contexts.pop(ident, None)
def get_capabilities(self):
# lookup on first try
try:
@ -394,12 +404,14 @@ class ServerPool(object):
@utils.synchronized('bsn-rest-call')
def rest_call(self, action, resource, data, headers, ignore_codes,
timeout=False):
hash_handler = cdb.HashHandler(context=self.pop_context())
good_first = sorted(self.servers, key=lambda x: x.failed)
first_response = None
for active_server in good_first:
ret = active_server.rest_call(action, resource, data, headers,
timeout,
reconnect=self.always_reconnect)
reconnect=self.always_reconnect,
hash_handler=hash_handler)
# If inconsistent, do a full synchronization
if ret[0] == httplib.CONFLICT:
if not self.get_topo_function:

View File

@@ -352,7 +352,8 @@ class ServerManagerTests(test_rp.BigSwitchProxyPluginV2TestCase):
# making a call should trigger a conflict sync
pl.servers.rest_call('GET', '/', '', None, [])
srestmock.assert_has_calls([
mock.call('GET', '/', '', None, False, reconnect=True),
mock.call('GET', '/', '', None, False, reconnect=True,
hash_handler=mock.ANY),
mock.call('PUT', '/topology',
{'routers': [], 'networks': []},
timeout=None)