From 09e706b210513968cac2bad78fe81ee9cbb5b5be Mon Sep 17 00:00:00 2001 From: Kevin Benton Date: Fri, 6 Jun 2014 14:15:46 -0700 Subject: [PATCH] Big Switch: Lock consistency table for REST calls Adds a new class to hold an SQL table lock throughout a REST call to prevent servers in an HA deployment from using a stale consistency hash value. Also passes the current context down to the server manager for every request since the consistency hash will need to be read from the database instead of from a variable on the server pool object. Closes-Bug: #1328331 Change-Id: I5f8402c076d7732742c0ae4d9b9d6833d42a0b7b --- .../plugins/bigswitch/db/consistency_db.py | 62 +++++++++++++------ neutron/plugins/bigswitch/plugin.py | 25 ++++++++ neutron/plugins/bigswitch/servermanager.py | 40 +++++++----- .../unit/bigswitch/test_servermanager.py | 3 +- 4 files changed, 97 insertions(+), 33 deletions(-) diff --git a/neutron/plugins/bigswitch/db/consistency_db.py b/neutron/plugins/bigswitch/db/consistency_db.py index cd89a2690..4d1a1db79 100644 --- a/neutron/plugins/bigswitch/db/consistency_db.py +++ b/neutron/plugins/bigswitch/db/consistency_db.py @@ -14,20 +14,22 @@ # under the License. import sqlalchemy as sa +from neutron.common import exceptions from neutron.db import api as db from neutron.db import model_base from neutron.openstack.common import log as logging LOG = logging.getLogger(__name__) -''' -A simple table to store the latest consistency hash -received from a server in case neutron gets restarted. -''' + +class MultipleReadForUpdateCalls(exceptions.NeutronException): + message = _("Only one read_for_update call may be made at a time.") class ConsistencyHash(model_base.BASEV2): ''' + A simple table to store the latest consistency hash + received from a server. For now we only support one global state so the hash_id will always be '1' ''' @@ -37,20 +39,44 @@ class ConsistencyHash(model_base.BASEV2): hash = sa.Column(sa.String(255), nullable=False) -def get_consistency_hash(hash_id='1'): - session = db.get_session() - with session.begin(subtransactions=True): - query = session.query(ConsistencyHash) - res = query.filter_by(hash_id=hash_id).first() - if not res: - return False - return res.hash +class HashHandler(object): + ''' + A wrapper object to keep track of the session and hold the SQL + lock between the read and the update to prevent other servers + from reading the hash during a transaction. + ''' + def __init__(self, context=None, hash_id='1'): + self.hash_id = hash_id + self.session = db.get_session() if not context else context.session + self.hash_db_obj = None + self.transaction = None + def read_for_update(self): + if self.transaction: + raise MultipleReadForUpdateCalls() + self.transaction = self.session.begin(subtransactions=True) + # Lock for update here to prevent another server from reading the hash + # while this one is in the middle of a transaction. + # This may not lock the SQL table in MySQL Galera deployments + # but that's okay because the worst case is a double-sync + res = (self.session.query(ConsistencyHash). + filter_by(hash_id=self.hash_id). + with_lockmode('update').first()) + if not res: + return '' + self.hash_db_obj = res + return res.hash -def put_consistency_hash(hash, hash_id='1'): - session = db.get_session() - with session.begin(subtransactions=True): - conhash = ConsistencyHash(hash_id=hash_id, hash=hash) - session.merge(conhash) + def put_hash(self, hash): + hash = hash or '' + if not self.transaction: + self.transaction = self.session.begin(subtransactions=True) + if self.hash_db_obj is not None: + self.hash_db_obj.hash = hash + else: + conhash = ConsistencyHash(hash_id=self.hash_id, hash=hash) + self.session.merge(conhash) + self.transaction.commit() + self.transaction = None LOG.debug(_("Consistency hash for group %(hash_id)s updated " - "to %(hash)s"), {'hash_id': hash_id, 'hash': hash}) + "to %(hash)s"), {'hash_id': self.hash_id, 'hash': hash}) diff --git a/neutron/plugins/bigswitch/plugin.py b/neutron/plugins/bigswitch/plugin.py index 9249f5d6b..44ea5e4fa 100644 --- a/neutron/plugins/bigswitch/plugin.py +++ b/neutron/plugins/bigswitch/plugin.py @@ -45,6 +45,7 @@ on port-attach) on an additional PUT to do a bulk dump of all persistent data. """ import copy +import functools import httplib import re @@ -448,6 +449,14 @@ class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2, raise exceptions.PortNotFound(port_id=port_id) +def put_context_in_serverpool(f): + @functools.wraps(f) + def wrapper(self, context, *args, **kwargs): + self.servers.set_context(context) + return f(self, context, *args, **kwargs) + return wrapper + + class NeutronRestProxyV2(NeutronRestProxyV2Base, addr_pair_db.AllowedAddressPairsMixin, extradhcpopt_db.ExtraDhcpOptMixin, @@ -514,6 +523,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # Consume from all consumers in a thread self.conn.consume_in_thread() + @put_context_in_serverpool def create_network(self, context, network): """Create a network. @@ -557,6 +567,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # return created network return new_net + @put_context_in_serverpool def update_network(self, context, net_id, network): """Updates the properties of a particular Virtual Network. @@ -596,6 +607,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # NOTE(kevinbenton): workaround for eventlet/mysql deadlock @utils.synchronized('bsn-port-barrier') + @put_context_in_serverpool def delete_network(self, context, net_id): """Delete a network. :param context: neutron api request context @@ -618,6 +630,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, self._send_delete_network(orig_net, context) return ret_val + @put_context_in_serverpool def create_port(self, context, port): """Create a port, which is a connection point of a device (e.g., a VM NIC) to attach to a L2 Neutron network. @@ -708,6 +721,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, self._extend_port_dict_binding(context, port) return [self._fields(port, fields) for port in ports] + @put_context_in_serverpool def update_port(self, context, port_id, port): """Update values of a port. @@ -784,6 +798,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # NOTE(kevinbenton): workaround for eventlet/mysql deadlock @utils.synchronized('bsn-port-barrier') + @put_context_in_serverpool def delete_port(self, context, port_id, l3_port_check=True): """Delete a port. :param context: neutron api request context @@ -809,6 +824,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, self._delete_port(context, port_id) self.servers.rest_delete_port(tenid, port['network_id'], port_id) + @put_context_in_serverpool def create_subnet(self, context, subnet): LOG.debug(_("NeutronRestProxyV2: create_subnet() called")) @@ -825,6 +841,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, self._send_update_network(orig_net, context) return new_subnet + @put_context_in_serverpool def update_subnet(self, context, id, subnet): LOG.debug(_("NeutronRestProxyV2: update_subnet() called")) @@ -843,6 +860,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # NOTE(kevinbenton): workaround for eventlet/mysql deadlock @utils.synchronized('bsn-port-barrier') + @put_context_in_serverpool def delete_subnet(self, context, id): LOG.debug(_("NeutronRestProxyV2: delete_subnet() called")) orig_subnet = super(NeutronRestProxyV2, self).get_subnet(context, id) @@ -881,6 +899,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, return tenantset return defaultset + @put_context_in_serverpool def create_router(self, context, router): LOG.debug(_("NeutronRestProxyV2: create_router() called")) @@ -902,6 +921,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # return created router return new_router + @put_context_in_serverpool def update_router(self, context, router_id, router): LOG.debug(_("NeutronRestProxyV2.update_router() called")) @@ -925,6 +945,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # NOTE(kevinbenton): workaround for eventlet/mysql deadlock. # delete_router ends up calling _delete_port instead of delete_port. @utils.synchronized('bsn-port-barrier') + @put_context_in_serverpool def delete_router(self, context, router_id): LOG.debug(_("NeutronRestProxyV2: delete_router() called")) @@ -1015,6 +1036,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, interface_id) return del_ret + @put_context_in_serverpool def create_floatingip(self, context, floatingip): LOG.debug(_("NeutronRestProxyV2: create_floatingip() called")) @@ -1038,6 +1060,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # return created floating IP return new_fl_ip + @put_context_in_serverpool def update_floatingip(self, context, id, floatingip): LOG.debug(_("NeutronRestProxyV2: update_floatingip() called")) @@ -1054,6 +1077,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, self._send_floatingip_update(context) return new_fl_ip + @put_context_in_serverpool def delete_floatingip(self, context, id): LOG.debug(_("NeutronRestProxyV2: delete_floatingip() called")) @@ -1078,6 +1102,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, # overriding method from l3_db as original method calls # self.delete_floatingip() which in turn calls self.delete_port() which # is locked with 'bsn-port-barrier' + @put_context_in_serverpool def delete_disassociated_floatingips(self, context, network_id): query = self._model_query(context, l3_db.FloatingIP) query = query.filter_by(floating_network_id=network_id, diff --git a/neutron/plugins/bigswitch/servermanager.py b/neutron/plugins/bigswitch/servermanager.py index c39c4e9f0..8ee64778f 100644 --- a/neutron/plugins/bigswitch/servermanager.py +++ b/neutron/plugins/bigswitch/servermanager.py @@ -39,6 +39,7 @@ import socket import ssl import eventlet +import eventlet.corolocal from oslo.config import cfg from neutron.common import exceptions @@ -121,7 +122,7 @@ class ServerProxy(object): return self.capabilities def rest_call(self, action, resource, data='', headers={}, timeout=False, - reconnect=False): + reconnect=False, hash_handler=None): uri = self.base_uri + resource body = json.dumps(data) if not headers: @@ -131,7 +132,12 @@ class ServerProxy(object): headers['NeutronProxy-Agent'] = self.name headers['Instance-ID'] = self.neutron_id headers['Orchestration-Service-ID'] = ORCHESTRATION_SERVICE_ID - headers[HASH_MATCH_HEADER] = self.mypool.consistency_hash or '' + if hash_handler: + # this will be excluded on calls that don't need hashes + # (e.g. topology sync, capability checks) + headers[HASH_MATCH_HEADER] = hash_handler.read_for_update() + else: + hash_handler = cdb.HashHandler() if 'keep-alive' in self.capabilities: headers['Connection'] = 'keep-alive' else: @@ -178,9 +184,7 @@ class ServerProxy(object): try: self.currentconn.request(action, uri, body, headers) response = self.currentconn.getresponse() - newhash = response.getheader(HASH_MATCH_HEADER) - if newhash: - self._put_consistency_hash(newhash) + hash_handler.put_hash(response.getheader(HASH_MATCH_HEADER)) respstr = response.read() respdata = respstr if response.status in self.success_codes: @@ -216,10 +220,6 @@ class ServerProxy(object): 'data': ret[3]}) return ret - def _put_consistency_hash(self, newhash): - self.mypool.consistency_hash = newhash - cdb.put_consistency_hash(newhash) - class ServerPool(object): @@ -235,6 +235,7 @@ class ServerPool(object): self.neutron_id = cfg.CONF.RESTPROXY.neutron_id self.base_uri = base_uri self.name = name + self.contexts = {} self.timeout = cfg.CONF.RESTPROXY.server_timeout self.always_reconnect = not cfg.CONF.RESTPROXY.cache_connections default_port = 8000 @@ -246,10 +247,6 @@ class ServerPool(object): self.get_topo_function = None self.get_topo_function_args = {} - # Hash to send to backend with request as expected previous - # state to verify consistency. - self.consistency_hash = cdb.get_consistency_hash() - if not servers: raise cfg.Error(_('Servers not defined. Aborting server manager.')) servers = [s if len(s.rsplit(':', 1)) == 2 @@ -268,6 +265,19 @@ class ServerPool(object): cfg.CONF.RESTPROXY.consistency_interval) LOG.debug(_("ServerPool: initialization done")) + def set_context(self, context): + # this context needs to be local to the greenthread + # so concurrent requests don't use the wrong context + self.contexts[eventlet.corolocal.get_ident()] = context + + def pop_context(self): + # Don't store these contexts after use. They should only + # last for one request. + try: + return self.contexts.pop(eventlet.corolocal.get_ident()) + except KeyError: + return None + def get_capabilities(self): # lookup on first try try: @@ -394,12 +404,14 @@ class ServerPool(object): @utils.synchronized('bsn-rest-call') def rest_call(self, action, resource, data, headers, ignore_codes, timeout=False): + hash_handler = cdb.HashHandler(context=self.pop_context()) good_first = sorted(self.servers, key=lambda x: x.failed) first_response = None for active_server in good_first: ret = active_server.rest_call(action, resource, data, headers, timeout, - reconnect=self.always_reconnect) + reconnect=self.always_reconnect, + hash_handler=hash_handler) # If inconsistent, do a full synchronization if ret[0] == httplib.CONFLICT: if not self.get_topo_function: diff --git a/neutron/tests/unit/bigswitch/test_servermanager.py b/neutron/tests/unit/bigswitch/test_servermanager.py index 3a54e315b..a6883d36b 100644 --- a/neutron/tests/unit/bigswitch/test_servermanager.py +++ b/neutron/tests/unit/bigswitch/test_servermanager.py @@ -352,7 +352,8 @@ class ServerManagerTests(test_rp.BigSwitchProxyPluginV2TestCase): # making a call should trigger a conflict sync pl.servers.rest_call('GET', '/', '', None, []) srestmock.assert_has_calls([ - mock.call('GET', '/', '', None, False, reconnect=True), + mock.call('GET', '/', '', None, False, reconnect=True, + hash_handler=mock.ANY), mock.call('PUT', '/topology', {'routers': [], 'networks': []}, timeout=None)