Replace mongo aggregation with plain ol' map-reduce

Fixes bug 1262571

Previously, the mongodb storage driver ran an aggregation pipeline
over the meter collection in order to construct a list of resources
adorned with first & last sample timestamps etc.

However, the mongodb aggregation framework performs sorting in memory,
in this case operating over a potentially very large collection.
It is also hardcoded to abort any sort in an aggregation pipeline
that would consume more than 10% of physical memory, which is what
was observed in this case.
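
For illustration, a minimal standalone sketch of the kind of pipeline
involved (condensed from the removed code below; the connection URL,
database name and the exact sort key are assumptions for the example,
not part of this change):

    import pymongo

    client = pymongo.MongoClient('mongodb://localhost:27017')  # assumed URL
    db = client.ceilometer  # assumed database name

    # The $sort stage is evaluated in memory over every matching sample
    # before $group can pick its "$first" values per resource, which is
    # what trips mongodb's 10% physical-memory limit on large collections.
    resources = db.meter.aggregate([
        {"$match": {}},  # stands in for q, the query built from the filters
        {"$sort": {"timestamp": pymongo.DESCENDING}},  # illustrative sort key
        {"$group": {
            "_id": "$resource_id",
            "user_id": {"$first": "$user_id"},
            "project_id": {"$first": "$project_id"},
            "source": {"$first": "$source"},
            "first_sample_timestamp": {"$min": "$timestamp"},
            "last_sample_timestamp": {"$max": "$timestamp"},
            "metadata": {"$first": "$resource_metadata"},
        }},
    ])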

Now, we avoid the aggregation framework altogether and instead
use an equivalent map-reduce.
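
A rough standalone sketch of the replacement pattern (mirroring the
added code below, using pymongo's map_reduce() of that era, since
removed in pymongo 4.0; the connection details, empty query and the
final print loop are assumptions for the example):

    import uuid

    import bson.code
    import pymongo

    client = pymongo.MongoClient('mongodb://localhost:27017')  # assumed URL
    db = client.ceilometer  # assumed database name
    db.meter.create_index('resource_id')  # mapReduce's sort option needs an index

    # Emit one value per sample, carrying the timestamp twice so the reduce
    # step can track both the earliest and the latest sample per resource.
    mapper = bson.code.Code("""
        function () {
            emit(this.resource_id,
                 {first_timestamp: this.timestamp,
                  last_timestamp: this.timestamp,
                  metadata: this.resource_metadata});
        }""")

    # Merge per-sample values, keeping the earliest first_timestamp and the
    # latest last_timestamp (with the metadata of that latest sample).
    reducer = bson.code.Code("""
        function (key, values) {
            var merged = values[0];
            values.forEach(function (value) {
                if (value.first_timestamp < merged.first_timestamp) {
                    merged.first_timestamp = value.first_timestamp;
                } else if (value.last_timestamp >= merged.last_timestamp) {
                    merged.last_timestamp = value.last_timestamp;
                    merged.metadata = value.metadata;
                }
            });
            return merged;
        }""")

    # Write into a uniquely named collection so the results can be re-sorted
    # afterwards; inline map-reduce output cannot be post-sorted.
    out = 'resource_list_%s' % uuid.uuid4()
    db.meter.map_reduce(mapper, reducer, out=out,
                        sort={'resource_id': 1}, query={})
    try:
        for doc in db[out].find():
            print(doc['_id'], doc['value'])
    finally:
        db[out].drop()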

Change-Id: Ibef4a95acada411af385ff75ccb36c5724068b59
Eoghan Glynn 2014-01-09 16:30:10 +00:00
parent 31d4c0ced4
commit ba6641afac
2 changed files with 60 additions and 30 deletions


@@ -24,6 +24,7 @@ import calendar
import copy
import json
import operator
import uuid
import weakref
import bson.code
@@ -302,6 +303,39 @@ class Connection(base.Connection):
    SORT_OPERATION_MAPPING = {'desc': (pymongo.DESCENDING, '$lt'),
                              'asc': (pymongo.ASCENDING, '$gt')}

    MAP_RESOURCES = bson.code.Code("""
    function () {
        emit(this.resource_id,
             {user_id: this.user_id,
              project_id: this.project_id,
              source: this.source,
              first_timestamp: this.timestamp,
              last_timestamp: this.timestamp,
              metadata: this.resource_metadata})
    }""")

    REDUCE_RESOURCES = bson.code.Code("""
    function (key, values) {
        var merge = {user_id: values[0].user_id,
                     project_id: values[0].project_id,
                     source: values[0].source,
                     first_timestamp: values[0].first_timestamp,
                     last_timestamp: values[0].last_timestamp,
                     metadata: values[0].metadata}
        values.forEach(function(value) {
            if (merge.first_timestamp - value.first_timestamp > 0) {
                merge.first_timestamp = value.first_timestamp;
                merge.user_id = value.user_id;
                merge.project_id = value.project_id;
                merge.source = value.source;
            } else if (merge.last_timestamp - value.last_timestamp <= 0) {
                merge.last_timestamp = value.last_timestamp;
                merge.metadata = value.metadata;
            }
        });
        return merge;
    }""")

    def __init__(self, conf):
        url = conf.database.connection
@@ -311,7 +345,7 @@ class Connection(base.Connection):
        # requires a new storage connection.
        self.conn = self.CONNECTION_POOL.connect(url)

        # Require MongoDB 2.2 to use aggregate() and TTL
        # Require MongoDB 2.2 to use TTL
        if self.conn.server_info()['versionArray'] < [2, 2]:
            raise storage.StorageBadVersion("Need at least MongoDB 2.2")
@@ -635,33 +669,29 @@ class Connection(base.Connection):
        sort_keys = base._handle_sort_key('resource')
        sort_instructions = self._build_sort_instructions(sort_keys)[0]

        aggregate = self.db.meter.aggregate([
            {"$match": q},
            {"$sort": dict(sort_instructions)},
            {"$group": {
                "_id": "$resource_id",
                "user_id": {"$first": "$user_id"},
                "project_id": {"$first": "$project_id"},
                "source": {"$first": "$source"},
                "first_sample_timestamp": {"$min": "$timestamp"},
                "last_sample_timestamp": {"$max": "$timestamp"},
                "metadata": {"$first": "$resource_metadata"},
                "meters_name": {"$push": "$counter_name"},
                "meters_type": {"$push": "$counter_type"},
                "meters_unit": {"$push": "$counter_unit"},
            }},
        ])
        # use a unique collection name for the results collection,
        # as result post-sorting (as opposed to reduce pre-sorting)
        # is not possible on an inline M-R
        out = 'resource_list_%s' % uuid.uuid4()
        self.db.meter.map_reduce(self.MAP_RESOURCES,
                                 self.REDUCE_RESOURCES,
                                 out=out,
                                 sort={'resource_id': 1},
                                 query=q)
        for result in aggregate['result']:
            yield models.Resource(
                resource_id=result['_id'],
                user_id=result['user_id'],
                project_id=result['project_id'],
                first_sample_timestamp=result['first_sample_timestamp'],
                last_sample_timestamp=result['last_sample_timestamp'],
                source=result['source'],
                metadata=result['metadata'],
            )
        try:
            for r in self.db[out].find(sort=sort_instructions):
                resource = r['value']
                yield models.Resource(
                    resource_id=r['_id'],
                    user_id=resource['user_id'],
                    project_id=resource['project_id'],
                    first_sample_timestamp=resource['first_timestamp'],
                    last_sample_timestamp=resource['last_timestamp'],
                    source=resource['source'],
                    metadata=resource['metadata'])
        finally:
            self.db[out].drop()

    def get_meters(self, user=None, project=None, resource=None, source=None,
                   metaquery={}, pagination=None):


@@ -130,9 +130,9 @@ class TestListResources(FunctionalTest,
    def test_instance_multiple_samples(self):
        timestamps = [
            datetime.datetime(2012, 7, 2, 10, 40),
            datetime.datetime(2012, 7, 2, 10, 41),
            datetime.datetime(2012, 7, 2, 10, 42),
            datetime.datetime(2012, 7, 2, 10, 40),
        ]
        for timestamp in timestamps:
            datapoint = sample.Sample(
@@ -145,7 +145,7 @@ class TestListResources(FunctionalTest,
                'resource-id',
                timestamp=timestamp,
                resource_metadata={'display_name': 'test-server',
                                   'tag': 'self.sample',
                                   'tag': 'self.sample-%s' % timestamp,
                                   },
                source='test',
            )
@@ -157,7 +157,7 @@ class TestListResources(FunctionalTest,
        data = self.get_json('/resources')
        self.assertEqual(1, len(data))
        self._verify_sample_timestamps(data[0], timestamps[0], timestamps[-1])
        self._verify_sample_timestamps(data[0], timestamps[-1], timestamps[1])

    def test_instances_one(self):
        sample1 = sample.Sample(