mongodb: switch to retrying, stop using global conf
Change-Id: I79aac151c60c880699abba2a7f54cb89f189ad88
This commit is contained in:
parent
5c995c3924
commit
a6db438577
|
@ -33,7 +33,10 @@ class Connection(pymongo_base.Connection):
|
|||
# We need that otherwise we overflow the MongoDB instance with new
|
||||
# connection since we instantiate a Pymongo client each time someone
|
||||
# requires a new storage connection.
|
||||
self.conn = self.CONNECTION_POOL.connect(url)
|
||||
self.conn = self.CONNECTION_POOL.connect(
|
||||
url,
|
||||
conf.database.max_retries,
|
||||
conf.database.retry_interval)
|
||||
|
||||
# Require MongoDB 2.4 to use $setOnInsert
|
||||
if self.conn.server_info()['versionArray'] < [2, 4]:
|
||||
|
|
|
@ -18,14 +18,13 @@
|
|||
"""Common functions for MongoDB backend
|
||||
"""
|
||||
|
||||
import time
|
||||
import weakref
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_utils import netutils
|
||||
import pymongo
|
||||
import pymongo.errors
|
||||
import retrying
|
||||
import six
|
||||
|
||||
from panko.i18n import _, _LI
|
||||
|
@ -121,7 +120,7 @@ class ConnectionPool(object):
|
|||
def __init__(self):
|
||||
self._pool = {}
|
||||
|
||||
def connect(self, url):
|
||||
def connect(self, url, max_retries, retry_interval):
|
||||
connection_options = pymongo.uri_parser.parse_uri(url)
|
||||
del connection_options['database']
|
||||
del connection_options['username']
|
||||
|
@ -137,54 +136,26 @@ class ConnectionPool(object):
|
|||
log_data = {'db': splitted_url.scheme,
|
||||
'nodelist': connection_options['nodelist']}
|
||||
LOG.info(_LI('Connecting to %(db)s on %(nodelist)s') % log_data)
|
||||
client = self._mongo_connect(url)
|
||||
self._pool[pool_key] = weakref.ref(client)
|
||||
return client
|
||||
|
||||
@staticmethod
|
||||
def _mongo_connect(url):
|
||||
try:
|
||||
return MongoProxy(pymongo.MongoClient(url))
|
||||
client = MongoProxy(pymongo.MongoClient(url),
|
||||
max_retries, retry_interval)
|
||||
except pymongo.errors.ConnectionFailure as e:
|
||||
LOG.warning(_('Unable to connect to the database server: '
|
||||
'%(errmsg)s.') % {'errmsg': e})
|
||||
raise
|
||||
self._pool[pool_key] = weakref.ref(client)
|
||||
return client
|
||||
|
||||
|
||||
def safe_mongo_call(call):
|
||||
def closure(*args, **kwargs):
|
||||
# NOTE(idegtiarov) options max_retries and retry_interval have been
|
||||
# registered in storage.__init__ in oslo_db.options.set_defaults
|
||||
# default values for both options are 10.
|
||||
max_retries = cfg.CONF.database.max_retries
|
||||
retry_interval = cfg.CONF.database.retry_interval
|
||||
attempts = 0
|
||||
while True:
|
||||
try:
|
||||
return call(*args, **kwargs)
|
||||
except pymongo.errors.AutoReconnect as err:
|
||||
if 0 <= max_retries <= attempts:
|
||||
LOG.error(_('Unable to reconnect to the primary mongodb '
|
||||
'after %(retries)d retries. Giving up.') %
|
||||
{'retries': max_retries})
|
||||
raise
|
||||
LOG.warning(_('Unable to reconnect to the primary '
|
||||
'mongodb: %(errmsg)s. Trying again in '
|
||||
'%(retry_interval)d seconds.') %
|
||||
{'errmsg': err, 'retry_interval': retry_interval})
|
||||
attempts += 1
|
||||
time.sleep(retry_interval)
|
||||
return closure
|
||||
def _safe_mongo_call(max_retries, retry_interval):
|
||||
return retrying.retry(
|
||||
retry_on_exception=lambda e: isinstance(
|
||||
e, pymongo.errors.AutoReconnect),
|
||||
wait_fixed=retry_interval * 1000,
|
||||
stop_max_attempt_number=max_retries if max_retries >= 0 else None
|
||||
)
|
||||
|
||||
|
||||
class MongoConn(object):
|
||||
def __init__(self, method):
|
||||
self.method = method
|
||||
|
||||
@safe_mongo_call
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.method(*args, **kwargs)
|
||||
|
||||
MONGO_METHODS = set([typ for typ in dir(pymongo.collection.Collection)
|
||||
if not typ.startswith('_')])
|
||||
MONGO_METHODS.update(set([typ for typ in dir(pymongo.MongoClient)
|
||||
|
@ -194,8 +165,12 @@ MONGO_METHODS.update(set([typ for typ in dir(pymongo)
|
|||
|
||||
|
||||
class MongoProxy(object):
|
||||
def __init__(self, conn):
|
||||
def __init__(self, conn, max_retries, retry_interval):
|
||||
self.conn = conn
|
||||
self.max_retries = max_retries
|
||||
self.retry_interval = retry_interval
|
||||
self._recreate_index = _safe_mongo_call(
|
||||
self.max_retries, self.retry_interval)(self._recreate_index)
|
||||
|
||||
def __getitem__(self, item):
|
||||
"""Create and return proxy around the method in the connection.
|
||||
|
@ -208,7 +183,9 @@ class MongoProxy(object):
|
|||
# We need this modifying method to return a CursorProxy object so that
|
||||
# we can handle the Cursor next function to catch the AutoReconnect
|
||||
# exception.
|
||||
return CursorProxy(self.conn.find(*args, **kwargs))
|
||||
return CursorProxy(self.conn.find(*args, **kwargs),
|
||||
self.max_retries,
|
||||
self.retry_interval)
|
||||
|
||||
def create_index(self, keys, name=None, *args, **kwargs):
|
||||
try:
|
||||
|
@ -218,7 +195,6 @@ class MongoProxy(object):
|
|||
LOG.info(_LI("Index %s will be recreate.") % name)
|
||||
self._recreate_index(keys, name, *args, **kwargs)
|
||||
|
||||
@safe_mongo_call
|
||||
def _recreate_index(self, keys, name, *args, **kwargs):
|
||||
self.conn.drop_index(name)
|
||||
self.conn.create_index(keys, name=name, *args, **kwargs)
|
||||
|
@ -233,22 +209,25 @@ class MongoProxy(object):
|
|||
if item in ('name', 'database'):
|
||||
return getattr(self.conn, item)
|
||||
if item in MONGO_METHODS:
|
||||
return MongoConn(getattr(self.conn, item))
|
||||
return MongoProxy(getattr(self.conn, item))
|
||||
return _safe_mongo_call(
|
||||
self.max_retries, self.retry_interval
|
||||
)(getattr(self.conn, item))
|
||||
return MongoProxy(getattr(self.conn, item),
|
||||
self.max_retries, self.retry_interval)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.conn(*args, **kwargs)
|
||||
|
||||
|
||||
class CursorProxy(pymongo.cursor.Cursor):
|
||||
def __init__(self, cursor):
|
||||
def __init__(self, cursor, max_retry, retry_interval):
|
||||
self.cursor = cursor
|
||||
self.next = _safe_mongo_call(max_retry, retry_interval)(self._next)
|
||||
|
||||
def __getitem__(self, item):
|
||||
return self.cursor[item]
|
||||
|
||||
@safe_mongo_call
|
||||
def next(self):
|
||||
def _next(self):
|
||||
"""Wrap Cursor next method.
|
||||
|
||||
This method will be executed before each Cursor next method call.
|
||||
|
|
Loading…
Reference in New Issue