Merge "Retry to connect database when DB2 or mongodb is restarted"

This commit is contained in:
Jenkins 2014-12-08 10:13:21 +00:00 committed by Gerrit Code Review
commit 69702250de
2 changed files with 40 additions and 30 deletions

View File

@ -253,13 +253,12 @@ class ConnectionPool(object):
try:
if cfg.CONF.database.mongodb_replica_set:
client = MongoProxy(
Prefection(
pymongo.MongoReplicaSetClient(
url,
replicaSet=cfg.CONF.database.mongodb_replica_set)))
pymongo.MongoReplicaSetClient(
url,
replicaSet=cfg.CONF.database.mongodb_replica_set))
else:
client = MongoProxy(
Prefection(pymongo.MongoClient(url, safe=True)))
pymongo.MongoClient(url, safe=True))
return client
except pymongo.errors.ConnectionFailure as e:
LOG.warn(_('Unable to connect to the database server: '
@ -439,6 +438,12 @@ class MongoProxy(object):
"""
return MongoProxy(self.conn[item])
def find(self, *args, **kwargs):
# We need this modifying method to return a CursorProxy object so that
# we can handle the Cursor next function to catch the AutoReconnect
# exception.
return CursorProxy(self.conn.find(*args, **kwargs))
def __getattr__(self, item):
"""Wrap MongoDB connection.
@ -456,20 +461,25 @@ class MongoProxy(object):
return self.conn(*args, **kwargs)
class Prefection(pymongo.collection.Collection):
def __init__(self, conn):
self.conn = conn
class CursorProxy(pymongo.cursor.Cursor):
def __init__(self, cursor):
self.cursor = cursor
def find(self, *args, **kwargs):
# We need this modifying method to check a connection for MongoDB
# in context of MongoProxy approach. Initially 'find' returns Cursor
# object and doesn't connect to db while Cursor is not used.
found = self.find(*args, **kwargs)
def __getitem__(self, item):
return self.cursor[item]
@safe_mongo_call
def next(self):
"""Wrap Cursor next method.
This method will be executed before each Cursor next method call.
"""
try:
found[0]
except IndexError:
pass
return found
save_cursor = self.cursor.clone()
return self.cursor.next()
except pymongo.errors.AutoReconnect:
self.cursor = save_cursor
raise
def __getattr__(self, item):
return getattr(self.conn, item)
return getattr(self.cursor, item)

View File

@ -3505,11 +3505,11 @@ class MongoAutoReconnectTest(DBTestBase,
self.CONF.set_override('retry_interval', 0, group='database')
def test_mongo_client(self):
if self.CONF.database.mongodb_replica_set:
self.assertIsInstance(self.conn.conn.conn.conn,
if cfg.CONF.database.mongodb_replica_set:
self.assertIsInstance(self.conn.conn.conn,
pymongo.MongoReplicaSetClient)
else:
self.assertIsInstance(self.conn.conn.conn.conn,
self.assertIsInstance(self.conn.conn.conn,
pymongo.MongoClient)
@staticmethod
@ -3521,16 +3521,16 @@ class MongoAutoReconnectTest(DBTestBase,
return method(*args, **kwargs)
return side_effect
def test_mongo_find(self):
def test_mongo_cursor_next(self):
expected_first_sample_timestamp = datetime.datetime(2012, 7, 2, 10, 39)
raise_exc = [False, True]
method = self.conn.db.resource.find
with mock.patch('pymongo.collection.Collection.find',
mock.Mock()) as mock_find:
mock_find.side_effect = self.create_side_effect(method, raise_exc)
mock_find.__name__ = 'find'
resources = list(self.conn.get_resources())
self.assertEqual(9, len(resources))
method = self.conn.db.resource.find().cursor.next
with mock.patch('pymongo.cursor.Cursor.next',
mock.Mock()) as mock_next:
mock_next.side_effect = self.create_side_effect(method, raise_exc)
resource = self.conn.db.resource.find().next()
self.assertEqual(expected_first_sample_timestamp,
resource['first_sample_timestamp'])
def test_mongo_insert(self):
raise_exc = [False, True]