265 lines
9.1 KiB
Python
265 lines
9.1 KiB
Python
#
|
|
# Copyright Ericsson AB 2013. All rights reserved
|
|
#
|
|
# Authors: Ildiko Vancsa <ildiko.vancsa@ericsson.com>
|
|
# Balazs Gibizer <balazs.gibizer@ericsson.com>
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
"""Common functions for MongoDB backend
|
|
"""
|
|
|
|
import time
|
|
import weakref
|
|
|
|
from oslo_config import cfg
|
|
from oslo_log import log
|
|
from oslo_utils import netutils
|
|
import pymongo
|
|
import pymongo.errors
|
|
import six
|
|
|
|
from panko.i18n import _, _LI
|
|
|
|
ERROR_INDEX_WITH_DIFFERENT_SPEC_ALREADY_EXISTS = 86
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
EVENT_TRAIT_TYPES = {'none': 0, 'string': 1, 'integer': 2, 'float': 3,
|
|
'datetime': 4}
|
|
OP_SIGN = {'lt': '$lt', 'le': '$lte', 'ne': '$ne', 'gt': '$gt', 'ge': '$gte'}
|
|
|
|
MINIMUM_COMPATIBLE_MONGODB_VERSION = [2, 4]
|
|
COMPLETE_AGGREGATE_COMPATIBLE_VERSION = [2, 6]
|
|
|
|
|
|
def make_timestamp_range(start, end,
|
|
start_timestamp_op=None, end_timestamp_op=None):
|
|
|
|
"""Create the query document to find timestamps within that range.
|
|
|
|
This is done by given two possible datetimes and their operations.
|
|
By default, using $gte for the lower bound and $lt for the upper bound.
|
|
"""
|
|
ts_range = {}
|
|
|
|
if start:
|
|
if start_timestamp_op == 'gt':
|
|
start_timestamp_op = '$gt'
|
|
else:
|
|
start_timestamp_op = '$gte'
|
|
ts_range[start_timestamp_op] = start
|
|
|
|
if end:
|
|
if end_timestamp_op == 'le':
|
|
end_timestamp_op = '$lte'
|
|
else:
|
|
end_timestamp_op = '$lt'
|
|
ts_range[end_timestamp_op] = end
|
|
return ts_range
|
|
|
|
|
|
def make_events_query_from_filter(event_filter):
|
|
"""Return start and stop row for filtering and a query.
|
|
|
|
Query is based on the selected parameter.
|
|
|
|
:param event_filter: storage.EventFilter object.
|
|
"""
|
|
query = {}
|
|
q_list = []
|
|
ts_range = make_timestamp_range(event_filter.start_timestamp,
|
|
event_filter.end_timestamp)
|
|
if ts_range:
|
|
q_list.append({'timestamp': ts_range})
|
|
if event_filter.event_type:
|
|
q_list.append({'event_type': event_filter.event_type})
|
|
if event_filter.message_id:
|
|
q_list.append({'_id': event_filter.message_id})
|
|
|
|
if event_filter.traits_filter:
|
|
for trait_filter in event_filter.traits_filter:
|
|
op = trait_filter.pop('op', 'eq')
|
|
dict_query = {}
|
|
for k, v in six.iteritems(trait_filter):
|
|
if v is not None:
|
|
# All parameters in EventFilter['traits'] are optional, so
|
|
# we need to check if they are in the query or no.
|
|
if k == 'key':
|
|
dict_query.setdefault('trait_name', v)
|
|
elif k in ['string', 'integer', 'datetime', 'float']:
|
|
dict_query.setdefault('trait_type',
|
|
EVENT_TRAIT_TYPES[k])
|
|
dict_query.setdefault('trait_value',
|
|
v if op == 'eq'
|
|
else {OP_SIGN[op]: v})
|
|
dict_query = {'$elemMatch': dict_query}
|
|
q_list.append({'traits': dict_query})
|
|
if event_filter.admin_proj:
|
|
q_list.append({'$or': [
|
|
{'traits': {'$not': {'$elemMatch': {'trait_name': 'project_id'}}}},
|
|
{'traits': {
|
|
'$elemMatch': {'trait_name': 'project_id',
|
|
'trait_value': event_filter.admin_proj}}}]})
|
|
if q_list:
|
|
query = {'$and': q_list}
|
|
|
|
return query
|
|
|
|
|
|
class ConnectionPool(object):
|
|
|
|
def __init__(self):
|
|
self._pool = {}
|
|
|
|
def connect(self, url):
|
|
connection_options = pymongo.uri_parser.parse_uri(url)
|
|
del connection_options['database']
|
|
del connection_options['username']
|
|
del connection_options['password']
|
|
del connection_options['collection']
|
|
pool_key = tuple(connection_options)
|
|
|
|
if pool_key in self._pool:
|
|
client = self._pool.get(pool_key)()
|
|
if client:
|
|
return client
|
|
splitted_url = netutils.urlsplit(url)
|
|
log_data = {'db': splitted_url.scheme,
|
|
'nodelist': connection_options['nodelist']}
|
|
LOG.info(_LI('Connecting to %(db)s on %(nodelist)s') % log_data)
|
|
client = self._mongo_connect(url)
|
|
self._pool[pool_key] = weakref.ref(client)
|
|
return client
|
|
|
|
@staticmethod
|
|
def _mongo_connect(url):
|
|
try:
|
|
return MongoProxy(pymongo.MongoClient(url))
|
|
except pymongo.errors.ConnectionFailure as e:
|
|
LOG.warning(_('Unable to connect to the database server: '
|
|
'%(errmsg)s.') % {'errmsg': e})
|
|
raise
|
|
|
|
|
|
def safe_mongo_call(call):
|
|
def closure(*args, **kwargs):
|
|
# NOTE(idegtiarov) options max_retries and retry_interval have been
|
|
# registered in storage.__init__ in oslo_db.options.set_defaults
|
|
# default values for both options are 10.
|
|
max_retries = cfg.CONF.database.max_retries
|
|
retry_interval = cfg.CONF.database.retry_interval
|
|
attempts = 0
|
|
while True:
|
|
try:
|
|
return call(*args, **kwargs)
|
|
except pymongo.errors.AutoReconnect as err:
|
|
if 0 <= max_retries <= attempts:
|
|
LOG.error(_('Unable to reconnect to the primary mongodb '
|
|
'after %(retries)d retries. Giving up.') %
|
|
{'retries': max_retries})
|
|
raise
|
|
LOG.warning(_('Unable to reconnect to the primary '
|
|
'mongodb: %(errmsg)s. Trying again in '
|
|
'%(retry_interval)d seconds.') %
|
|
{'errmsg': err, 'retry_interval': retry_interval})
|
|
attempts += 1
|
|
time.sleep(retry_interval)
|
|
return closure
|
|
|
|
|
|
class MongoConn(object):
|
|
def __init__(self, method):
|
|
self.method = method
|
|
|
|
@safe_mongo_call
|
|
def __call__(self, *args, **kwargs):
|
|
return self.method(*args, **kwargs)
|
|
|
|
MONGO_METHODS = set([typ for typ in dir(pymongo.collection.Collection)
|
|
if not typ.startswith('_')])
|
|
MONGO_METHODS.update(set([typ for typ in dir(pymongo.MongoClient)
|
|
if not typ.startswith('_')]))
|
|
MONGO_METHODS.update(set([typ for typ in dir(pymongo)
|
|
if not typ.startswith('_')]))
|
|
|
|
|
|
class MongoProxy(object):
|
|
def __init__(self, conn):
|
|
self.conn = conn
|
|
|
|
def __getitem__(self, item):
|
|
"""Create and return proxy around the method in the connection.
|
|
|
|
:param item: name of the connection
|
|
"""
|
|
return MongoProxy(self.conn[item])
|
|
|
|
def find(self, *args, **kwargs):
|
|
# We need this modifying method to return a CursorProxy object so that
|
|
# we can handle the Cursor next function to catch the AutoReconnect
|
|
# exception.
|
|
return CursorProxy(self.conn.find(*args, **kwargs))
|
|
|
|
def create_index(self, keys, name=None, *args, **kwargs):
|
|
try:
|
|
self.conn.create_index(keys, name=name, *args, **kwargs)
|
|
except pymongo.errors.OperationFailure as e:
|
|
if e.code is ERROR_INDEX_WITH_DIFFERENT_SPEC_ALREADY_EXISTS:
|
|
LOG.info(_LI("Index %s will be recreate.") % name)
|
|
self._recreate_index(keys, name, *args, **kwargs)
|
|
|
|
@safe_mongo_call
|
|
def _recreate_index(self, keys, name, *args, **kwargs):
|
|
self.conn.drop_index(name)
|
|
self.conn.create_index(keys, name=name, *args, **kwargs)
|
|
|
|
def __getattr__(self, item):
|
|
"""Wrap MongoDB connection.
|
|
|
|
If item is the name of an executable method, for example find or
|
|
insert, wrap this method in the MongoConn.
|
|
Else wrap getting attribute with MongoProxy.
|
|
"""
|
|
if item in ('name', 'database'):
|
|
return getattr(self.conn, item)
|
|
if item in MONGO_METHODS:
|
|
return MongoConn(getattr(self.conn, item))
|
|
return MongoProxy(getattr(self.conn, item))
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
return self.conn(*args, **kwargs)
|
|
|
|
|
|
class CursorProxy(pymongo.cursor.Cursor):
|
|
def __init__(self, cursor):
|
|
self.cursor = cursor
|
|
|
|
def __getitem__(self, item):
|
|
return self.cursor[item]
|
|
|
|
@safe_mongo_call
|
|
def next(self):
|
|
"""Wrap Cursor next method.
|
|
|
|
This method will be executed before each Cursor next method call.
|
|
"""
|
|
try:
|
|
save_cursor = self.cursor.clone()
|
|
return self.cursor.next()
|
|
except pymongo.errors.AutoReconnect:
|
|
self.cursor = save_cursor
|
|
raise
|
|
|
|
def __getattr__(self, item):
|
|
return getattr(self.cursor, item)
|