Add MongoDB engine.

This change implements the first storage backend for
ceilometer using MongoDB and makes it the default
(instead of 'log').

- require pymongo and ming for tests
- clean up entry point registration using textwrap
- sketch in a simple data dump tool (tools/show_data.py)

Change-Id: I0e3763749e88a71bc57b263ea79fff8d065c03ce
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
This commit is contained in:
Doug Hellmann 2012-06-29 15:17:06 -04:00
parent 022ecffe56
commit 4ee4a6895f
8 changed files with 836 additions and 5 deletions

View File

@ -29,7 +29,7 @@ STORAGE_ENGINE_NAMESPACE = 'ceilometer.storage'
STORAGE_OPTS = [
cfg.StrOpt('metering_storage_engine',
default='log',
default='mongodb',
help='The name of the storage engine to use',
),
]

View File

@ -0,0 +1,319 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""MongoDB storage backend
"""
import datetime
from ceilometer import log
from ceilometer.openstack.common import cfg
from ceilometer.storage import base
import bson.code
import pymongo
LOG = log.getLogger(__name__)
class MongoDBStorage(base.StorageEngine):
    """Storage engine that keeps the metering data in a MongoDB database.

    Collections:

    - user
      - { _id: user id
          source: [ array of source ids reporting for the user ]
        }
    - project
      - { _id: project id
          source: [ array of source ids reporting for the project ]
        }
    - meter
      - the raw incoming data
    - resource
      - the metadata for resources
      - { _id: uuid of resource,
          metadata: metadata dictionaries
          timestamp: datetime of last update
          user_id: uuid
          project_id: uuid
          meter: [ array of {counter_name: string, counter_type: string} ]
        }
    """

    # Configuration options understood by this engine.
    OPTIONS = [
        cfg.StrOpt(
            'mongodb_dbname',
            default='ceilometer',
            help='Database name',
        ),
        cfg.StrOpt(
            'mongodb_host',
            default='localhost',
            help='hostname or IP of server running MongoDB',
        ),
        cfg.IntOpt(
            'mongodb_port',
            default=27017,
            help='port number where MongoDB is running',
        ),
    ]

    def register_opts(self, conf):
        """Register the options in ``OPTIONS`` on the given configuration.

        :param conf: configuration object exposing ``register_opts()``
        """
        engine_options = self.OPTIONS
        conf.register_opts(engine_options)

    def get_connection(self, conf):
        """Build a :class:`Connection` from the configuration settings.

        :param conf: configuration object passed through to ``Connection``
        :returns: a new ``Connection`` instance
        """
        connection = Connection(conf)
        return connection
def make_query_from_filter(event_filter, require_meter=True):
    """Return a MongoDB query dictionary based on the settings in the filter.

    :param event_filter: EventFilter instance (any object exposing the
                         ``user``, ``project``, ``meter``, ``start``,
                         ``end``, ``resource`` and ``source`` attributes)
    :param require_meter: If true and the filter does not have a meter,
                          raise an error.
    :returns: dictionary suitable for passing to ``find()`` as a query
    :raises RuntimeError: if neither user nor project is set, or if a
                          meter is required and missing
    """
    q = {}

    # The user takes precedence over the project when both are given.
    if event_filter.user:
        q['user_id'] = event_filter.user
    elif event_filter.project:
        q['project_id'] = event_filter.project
    else:
        # NOTE(dhellmann): The EventFilter class should have detected
        # this case already, but just in case someone passes something
        # that isn't actually an EventFilter instance...
        raise RuntimeError('One of "user" or "project" is required')

    if event_filter.meter:
        q['counter_name'] = event_filter.meter
    elif require_meter:
        raise RuntimeError('Missing required meter specifier')

    # Collect both bounds in a single sub-document.  Assigning
    # q['timestamp'] once per bound would let the end bound overwrite
    # the start bound when both are given.
    ts_range = {}
    if event_filter.start:
        ts_range['$gte'] = event_filter.start
    if event_filter.end:
        ts_range['$lt'] = event_filter.end
    if ts_range:
        q['timestamp'] = ts_range

    if event_filter.resource:
        q['resource_id'] = event_filter.resource
    if event_filter.source:
        q['source'] = event_filter.source

    return q
class Connection(base.Connection):
    """MongoDB connection.

    Holds the pymongo connection in ``self.conn`` and the database
    named by ``conf.mongodb_dbname`` in ``self.db``.
    """

    # JavaScript function for doing map-reduce to get a counter volume
    # total.
    MAP_COUNTER_VOLUME = bson.code.Code("""
function() {
emit(this.resource_id, this.counter_volume);
}
""")

    # JavaScript function for doing map-reduce to get a counter
    # duration total.
    MAP_COUNTER_DURATION = bson.code.Code("""
function() {
emit(this.resource_id, this.counter_duration);
}
""")

    # JavaScript function for doing map-reduce to get a maximum value
    # from a range.  (from
    # http://cookbook.mongodb.org/patterns/finding_max_and_min/)
    REDUCE_MAX = bson.code.Code("""
function (key, values) {
return Math.max.apply(Math, values);
}
""")

    # JavaScript function for doing map-reduce to get a sum.
    REDUCE_SUM = bson.code.Code("""
function (key, values) {
var total = 0;
for (var i = 0; i < values.length; i++) {
total += values[i];
}
return total;
}
""")

    def __init__(self, conf):
        """Connect to the server and select the configured database.

        :param conf: configuration object providing ``mongodb_host``,
                     ``mongodb_port`` and ``mongodb_dbname``
        """
        LOG.info('connecting to MongoDB on %s:%s',
                 conf.mongodb_host, conf.mongodb_port)
        self.conn = self._get_connection(conf)
        self.db = getattr(self.conn, conf.mongodb_dbname)

    def _get_connection(self, conf):
        """Return a connection to the database.

        .. note::

          The tests use a subclass to override this and return an
          in-memory connection.
        """
        return pymongo.Connection(conf.mongodb_host,
                                  conf.mongodb_port,
                                  safe=True,
                                  )

    def record_metering_data(self, data):
        """Write the data to the backend storage system.

        :param data: a dictionary such as returned by
                     ceilometer.meter.meter_message_from_counter
        """
        # Make sure we know about the user and project
        self.db.user.update(
            {'_id': data['user_id']},
            {'$addToSet': {'source': data['source'],
                           },
             },
            upsert=True,
        )
        self.db.project.update(
            {'_id': data['project_id']},
            {'$addToSet': {'source': data['source'],
                           },
             },
            upsert=True,
        )

        # Record the updated resource metadata
        timestamp = datetime.datetime.utcnow()
        self.db.resource.update(
            {'_id': data['resource_id']},
            {'$set': {'project_id': data['project_id'],
                      'user_id': data['user_id'],
                      # Current metadata being used and when it was
                      # last updated.
                      'timestamp': timestamp,
                      'metadata': data['resource_metadata'],
                      },
             '$addToSet': {'meter': {'counter_name': data['counter_name'],
                                     'counter_type': data['counter_type'],
                                     },
                           },
             },
            upsert=True,
        )

        # Record the raw data for the event
        self.db.meter.insert(data)

    def get_users(self, source=None):
        """Return an iterable of user id strings.

        :param source: Optional source filter.
        """
        if source is None:
            return self.db.user.distinct('_id')
        # Apply the filter through a cursor and take the distinct ids
        # from the matching documents only.
        return self.db.user.find({'source': source}).distinct('_id')

    def get_projects(self, source=None):
        """Return an iterable of project id strings.

        :param source: Optional source filter.
        """
        if source is None:
            return self.db.project.distinct('_id')
        # Apply the filter through a cursor and take the distinct ids
        # from the matching documents only.
        return self.db.project.find({'source': source}).distinct('_id')

    def get_resources(self, user=None, project=None, source=None):
        """Return an iterable of dictionaries containing resource information.

        { 'resource_id': UUID of the resource,
          'project_id': UUID of project owning the resource,
          'user_id': UUID of user owning the resource,
          'timestamp': UTC datetime of last update to the resource,
          'metadata': most current metadata for the resource,
          'meter': list of the meters reporting data for the resource,
          }

        :param user: Optional resource owner.
        :param project: Optional resource owner.
        :param source: Optional source filter.
        """
        q = {}
        if user is not None:
            q['user_id'] = user
        if project is not None:
            q['project_id'] = project
        if source is not None:
            # NOTE(review): record_metering_data() does not write a
            # 'source' field to resource documents, so this filter
            # looks like it can never match -- confirm and either
            # store the source or drop the parameter.
            q['source'] = source

        for resource in self.db.resource.find(q):
            # Copy the document so the stored '_id' key can be exposed
            # to the caller as 'resource_id'.
            r = dict(resource)
            r['resource_id'] = r.pop('_id')
            yield r

    def get_raw_events(self, event_filter):
        """Return an iterable of event data.

        :param event_filter: EventFilter instance; a meter is optional.
        """
        q = make_query_from_filter(event_filter, require_meter=False)
        return self.db.meter.find(q)

    def _map_reduce_by_resource(self, event_filter, map_func, reduce_func):
        """Run an inline map-reduce over the meter collection.

        :param event_filter: EventFilter instance; a meter is required.
        :param map_func: JavaScript map function
        :param reduce_func: JavaScript reduce function
        :returns: generator of {'resource_id': ..., 'value': ...} dicts
        """
        q = make_query_from_filter(event_filter)
        results = self.db.meter.map_reduce(map_func,
                                           reduce_func,
                                           {'inline': 1},
                                           query=q,
                                           )
        return ({'resource_id': r['_id'], 'value': r['value']}
                for r in results['results'])

    def get_volume_sum(self, event_filter):
        """Return the sum of the volume field for the events
        described by the query parameters.
        """
        return self._map_reduce_by_resource(event_filter,
                                            self.MAP_COUNTER_VOLUME,
                                            self.REDUCE_SUM)

    def get_volume_max(self, event_filter):
        """Return the maximum of the volume field for the events
        described by the query parameters.
        """
        return self._map_reduce_by_resource(event_filter,
                                            self.MAP_COUNTER_VOLUME,
                                            self.REDUCE_MAX)

    def get_duration_sum(self, event_filter):
        """Return the sum of time for the events described by the
        query parameters.
        """
        # NOTE(review): this reduces with REDUCE_MAX although the
        # docstring says "sum" -- presumably counter_duration is
        # cumulative; confirm before changing either one.
        return self._map_reduce_by_resource(event_filter,
                                            self.MAP_COUNTER_DURATION,
                                            self.REDUCE_MAX)

View File

@ -17,6 +17,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import textwrap
import setuptools
setuptools.setup(
@ -31,7 +33,7 @@ setuptools.setup(
test_suite='nose.collector',
scripts=['bin/ceilometer-agent', 'bin/ceilometer-collector'],
py_modules=[],
entry_points="""
entry_points=textwrap.dedent("""
[ceilometer.collector.compute]
instance = ceilometer.compute.notifications:InstanceNotifications
@ -42,5 +44,6 @@ setuptools.setup(
[ceilometer.storage]
log = ceilometer.storage.impl_log:LogStorage
""",
mongodb = ceilometer.storage.impl_mongodb:MongoDBStorage
"""),
)

View File

@ -0,0 +1,369 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/storage/impl_mongodb.py
.. note::
(dhellmann) These tests have some dependencies which cannot be
installed in the CI environment right now.
Ming is necessary to provide the Mongo-in-memory implementation
of MongoDB. The original source for Ming is at
http://sourceforge.net/project/merciless but there does not seem to
be a way to point to a "zipball" of the latest HEAD there, and we
need features present only in that version. I forked the project to
github to make it easier to install, and put the URL into the
test-requires file. Then I ended up making some changes to it so it
would be compatible with PyMongo's API.
https://github.com/dreamhost/Ming/zipball/master#egg=Ming
In order to run the tests that use map-reduce with MIM, some
additional system-level packages are required::
apt-get install nspr-config
apt-get install libnspr4-dev
apt-get install pkg-config
pip install python-spidermonkey
To run the tests *without* mim, set the environment variable
CEILOMETER_TEST_LIVE=1 before running tox.
"""
import datetime
import logging
import os
import unittest
from ming import mim
import mox
from nose.plugins import skip
from ceilometer import counter
from ceilometer import meter
from ceilometer import storage
from ceilometer.storage import impl_mongodb
LOG = logging.getLogger(__name__)
class Connection(impl_mongodb.Connection):
    """Connection used by the tests.

    Talks to a real mongod only when CEILOMETER_TEST_LIVE is set to a
    non-zero integer; otherwise a Mongo-in-memory connection is used.
    """

    def _get_connection(self, conf):
        """Return a live or in-memory connection.

        :param conf: configuration object passed to the parent class
                     when a live connection is requested
        """
        # The choice is driven only by the environment variable; there
        # is no automatic fallback when a live connection fails.
        self.force_mongo = bool(int(os.environ.get('CEILOMETER_TEST_LIVE', 0)))
        if self.force_mongo:
            try:
                return super(Connection, self)._get_connection(conf)
            except Exception:
                LOG.debug('Unable to connect to mongod')
                raise
        LOG.debug('CEILOMETER_TEST_LIVE is not set, using MIM')
        return mim.Connection()
class MongoDBEngineTestBase(unittest.TestCase):
    """Base class that loads a small, fixed set of metering events.

    After ``setUp`` the database holds four events: two for
    ``user-id``/``project-id`` (``resource-id`` and
    ``resource-id-alternate``) and one each for the ``-2`` and ``-3``
    users, projects and resources.
    """

    def setUp(self):
        super(MongoDBEngineTestBase, self).setUp()

        # Fake configuration pointing at a scratch database.
        conf = mox.Mox().CreateMockAnything()
        conf.metering_storage_engine = 'mongodb'
        conf.mongodb_host = 'localhost'
        conf.mongodb_port = 27017
        conf.mongodb_dbname = 'testdb'
        self.conf = conf

        # Start every test from an empty database.
        self.conn = Connection(conf)
        self.conn.conn.drop_database(conf.mongodb_dbname)
        self.db = self.conn.conn[conf.mongodb_dbname]
        self.conn.db = self.db

        # First event for the primary user.
        self.counter = counter.Counter(
            'test', 'instance', 'cumulative', 1,
            'user-id', 'project-id', 'resource-id',
            timestamp=datetime.datetime(2012, 7, 2, 10, 40),
            duration=0,
            resource_metadata={
                'display_name': 'test-server',
                'tag': 'self.counter',
            }
        )
        self.msg = meter.meter_message_from_counter(self.counter)
        self.conn.record_metering_data(self.msg)

        # Second event: same owner, different resource, one minute later.
        self.counter2 = counter.Counter(
            'test', 'instance', 'cumulative', 1,
            'user-id', 'project-id', 'resource-id-alternate',
            timestamp=datetime.datetime(2012, 7, 2, 10, 41),
            duration=0,
            resource_metadata={
                'display_name': 'test-server',
                'tag': 'self.counter2',
            }
        )
        self.msg2 = meter.meter_message_from_counter(self.counter2)
        self.conn.record_metering_data(self.msg2)

        # Events owned by other users and projects.
        for suffix in (2, 3):
            other = counter.Counter(
                'test', 'instance', 'cumulative', 1,
                'user-id-%s' % suffix,
                'project-id-%s' % suffix,
                'resource-id-%s' % suffix,
                timestamp=datetime.datetime(2012, 7, 2, 10, 40 + suffix),
                duration=0,
                resource_metadata={
                    'display_name': 'test-server',
                    'tag': 'counter-%s' % suffix,
                }
            )
            other_msg = meter.meter_message_from_counter(other)
            self.conn.record_metering_data(other_msg)
class UserTest(MongoDBEngineTestBase):
    """Checks on the ``user`` collection and ``get_users()``."""

    def test_new_user(self):
        assert self.db.user.find_one({'_id': 'user-id'}) is not None

    def test_new_user_source(self):
        doc = self.db.user.find_one({'_id': 'user-id'})
        assert 'source' in doc
        assert doc['source'] == ['test']

    def test_get_users(self):
        expected = set(['user-id', 'user-id-2', 'user-id-3'])
        found = set(self.conn.get_users())
        assert found == expected
class ProjectTest(MongoDBEngineTestBase):
    """Checks on the ``project`` collection and ``get_projects()``."""

    def test_new_project(self):
        assert self.db.project.find_one({'_id': 'project-id'}) is not None

    def test_new_project_source(self):
        doc = self.db.project.find_one({'_id': 'project-id'})
        assert 'source' in doc
        assert doc['source'] == ['test']

    def test_get_projects(self):
        found = set(self.conn.get_projects())
        assert found == set(['project-id', 'project-id-2', 'project-id-3'])
class ResourceTest(MongoDBEngineTestBase):
    """Checks on the ``resource`` collection and ``get_resources()``."""

    def _stored_resource(self):
        # Raw document for the primary test resource.
        return self.db.resource.find_one({'_id': 'resource-id'})

    def test_new_resource(self):
        assert self._stored_resource() is not None

    def test_new_resource_project(self):
        doc = self._stored_resource()
        assert 'project_id' in doc
        assert doc['project_id'] == 'project-id'

    def test_new_resource_user(self):
        doc = self._stored_resource()
        assert 'user_id' in doc
        assert doc['user_id'] == 'user-id'

    def test_new_resource_meter(self):
        doc = self._stored_resource()
        assert 'meter' in doc
        expected = [{'counter_name': 'instance',
                     'counter_type': 'cumulative',
                     }]
        assert doc['meter'] == expected

    def test_new_resource_metadata(self):
        assert 'metadata' in self._stored_resource()

    def test_get_resources(self):
        resources = list(self.conn.get_resources())
        assert len(resources) == 4
        matches = [r for r in resources
                   if r['resource_id'] == 'resource-id']
        assert matches, 'Never found resource-id'
        # Only the first match is inspected.
        found = matches[0]
        assert found['resource_id'] == 'resource-id'
        assert found['project_id'] == 'project-id'
        assert found['user_id'] == 'user-id'
        assert 'metadata' in found
        assert found['meter'] == [{'counter_name': 'instance',
                                   'counter_type': 'cumulative',
                                   }]

    def test_get_resources_by_user(self):
        resources = list(self.conn.get_resources(user='user-id'))
        assert len(resources) == 2
        found_ids = set(r['resource_id'] for r in resources)
        assert found_ids == set(['resource-id', 'resource-id-alternate'])

    def test_get_resources_by_project(self):
        resources = list(self.conn.get_resources(project='project-id'))
        assert len(resources) == 2
        found_ids = set(r['resource_id'] for r in resources)
        assert found_ids == set(['resource-id', 'resource-id-alternate'])
class MeterTest(MongoDBEngineTestBase):
    """Checks on the ``meter`` collection and ``get_raw_events()``."""

    def _events(self, **filter_args):
        # Run get_raw_events() with an EventFilter built from the
        # keyword arguments and materialize the result.
        event_filter = storage.EventFilter(**filter_args)
        return list(self.conn.get_raw_events(event_filter))

    def test_new_meter(self):
        assert self.db.meter.find_one() is not None

    def test_get_raw_events_by_user(self):
        events = self._events(user='user-id')
        assert len(events) == 2
        known = [self.msg, self.msg2]
        for event in events:
            assert event in known

    def test_get_raw_events_by_project(self):
        events = self._events(project='project-id')
        assert events
        known = [self.msg, self.msg2]
        for event in events:
            assert event in known

    def test_get_raw_events_by_resource(self):
        events = self._events(user='user-id', resource='resource-id')
        assert events
        first = events[0]
        assert first is not None
        assert first == self.msg

    def test_get_raw_events_by_start_time(self):
        boundary = datetime.datetime(2012, 7, 2, 10, 41)
        events = self._events(user='user-id', start=boundary)
        assert len(events) == 1
        expected = datetime.datetime(2012, 7, 2, 10, 41)
        assert events[0]['timestamp'] == expected

    def test_get_raw_events_by_end_time(self):
        boundary = datetime.datetime(2012, 7, 2, 10, 41)
        events = self._events(user='user-id', end=boundary)
        assert len(events) == 1
        expected = datetime.datetime(2012, 7, 2, 10, 40)
        assert events[0]['timestamp'] == expected

    def test_get_raw_events_by_meter(self):
        events = self._events(user='user-id', meter='no-such-meter')
        assert not events

    def test_get_raw_events_by_meter2(self):
        events = self._events(user='user-id', meter='instance')
        assert events
class SumTest(MongoDBEngineTestBase):
    """Checks on ``get_volume_sum()``."""

    def setUp(self):
        super(SumTest, self).setUp()
        # NOTE(dhellmann): mim requires spidermonkey to implement the
        # map-reduce functions, so if we can't import it then just
        # skip these tests unless we aren't using mim.
        try:
            import spidermonkey  # noqa -- only probing for availability
        except Exception:
            # Any failure to load the module counts as "not available",
            # but KeyboardInterrupt/SystemExit are no longer swallowed.
            if isinstance(self.conn.conn, mim.Connection):
                raise skip.SkipTest('requires spidermonkey')

    def _volume_sums(self, **filter_args):
        """Return {resource_id: value} from get_volume_sum().

        Fails the test when the query produces no results.
        """
        f = storage.EventFilter(**filter_args)
        results = list(self.conn.get_volume_sum(f))
        assert results
        return dict((r['resource_id'], r['value'])
                    for r in results)

    def test_by_user(self):
        counts = self._volume_sums(user='user-id', meter='instance')
        assert counts['resource-id'] == 1
        assert counts['resource-id-alternate'] == 1
        assert set(counts.keys()) == set(['resource-id',
                                          'resource-id-alternate'])

    def test_by_project(self):
        counts = self._volume_sums(project='project-id', meter='instance')
        assert counts['resource-id'] == 1
        assert counts['resource-id-alternate'] == 1
        assert set(counts.keys()) == set(['resource-id',
                                          'resource-id-alternate'])

    def test_one_resource(self):
        counts = self._volume_sums(user='user-id',
                                   meter='instance',
                                   resource='resource-id')
        assert counts['resource-id'] == 1
        assert set(counts.keys()) == set(['resource-id'])
def test_make_query_without_user_or_project():
    """A filter with neither user nor project must raise RuntimeError."""
    f = storage.EventFilter(user='set')
    f.user = None  # to bypass the check in the filter class
    try:
        impl_mongodb.make_query_from_filter(f)
    except RuntimeError as err:
        assert '"user"' in str(err)
        assert '"project"' in str(err)
    else:
        # Without this branch the test would pass silently when no
        # exception is raised at all.
        assert False, 'RuntimeError was not raised'

View File

@ -1,5 +1,5 @@
#https://github.com/openstack/nova/zipball/master#egg=nova
# Work-around for packaging issue in nova:
# Work-around for packaging issue in nova (https://bugs.launchpad.net/openstack-ci/+bug/1019423)
http://nova.openstack.org/tarballs/nova-2012.2~f2~20120629.14648.tar.gz
webob
kombu

132
tools/show_data.py Executable file
View File

@ -0,0 +1,132 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network (DreamHost)
#
# Author: Doug Hellmann <doug.hellmann@dreamhost.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from ceilometer import storage
from ceilometer.openstack.common import cfg
def show_users(db, args):
    """Print every user id known to the storage backend, sorted.

    :param db: storage connection providing ``get_users()``
    :param args: extra command line arguments (unused)
    """
    for u in sorted(db.get_users()):
        # Call form with one argument prints the same on Python 2 and 3.
        print(u)
def show_resources(db, args):
    """Print each user's resources with their metadata and meter totals.

    :param db: storage connection providing ``get_users()``,
               ``get_resources()``, ``get_volume_max()`` and
               ``get_volume_sum()``
    :param args: user ids to report on; all known users when empty
    """
    users = args if args else sorted(db.get_users())
    for u in users:
        print(u)
        for resource in db.get_resources(user=u):
            print(' %(resource_id)s %(timestamp)s' % resource)
            # items() and the next() builtin work on Python 2 and 3,
            # unlike iteritems() and the .next() method.
            for k, v in sorted(resource['metadata'].items()):
                print(' %-10s : %s' % (k, v))
            for meter in resource['meter']:
                # FIXME(dhellmann): Need a way to tell whether to use
                # max() or sum() by meter name without hard-coding.
                if meter['counter_name'] in ['cpu', 'disk']:
                    get_totals = db.get_volume_max
                else:
                    get_totals = db.get_volume_sum
                totals = get_totals(storage.EventFilter(
                    user=u,
                    meter=meter['counter_name'],
                    resource=resource['resource_id'],
                ))
                # Only the first result is reported; iter() lets this
                # accept any iterable, not just a generator.
                print(' %s (%s): %s' %
                      (meter['counter_name'], meter['counter_type'],
                       next(iter(totals))['value']))
def show_total_resources(db, args):
    """Print per-resource totals of a fixed set of meters for each user.

    :param db: storage connection providing ``get_users()``,
               ``get_volume_max()`` and ``get_volume_sum()``
    :param args: user ids to report on; all known users when empty
    """
    users = args if args else sorted(db.get_users())
    for u in users:
        print(u)
        for meter in ['disk', 'cpu', 'instance']:
            # 'cpu' and 'disk' are reported as a maximum, everything
            # else as a sum.
            if meter in ['cpu', 'disk']:
                get_total = db.get_volume_max
            else:
                get_total = db.get_volume_sum
            total = get_total(storage.EventFilter(
                user=u,
                meter=meter,
            ))
            for t in total:
                # Explicit format reproduces the space-separated output
                # of the Python 2 print statement on both versions.
                print('%s %s %s %s' %
                      (' ', meter, t['resource_id'], t['value']))
def show_raw(db, args):
    """Print the raw events of every resource of every known user.

    :param db: storage connection providing ``get_users()``,
               ``get_resources()`` and ``get_raw_events()``
    :param args: extra command line arguments (unused)
    """
    fmt = ' %(timestamp)s %(counter_name)10s %(counter_volume)s'
    for u in sorted(db.get_users()):
        print(u)
        for resource in db.get_resources(user=u):
            # Explicit format reproduces the space-separated output of
            # the Python 2 print statement on both versions.
            print('%s %s' % (' ', resource['resource_id']))
            events = db.get_raw_events(storage.EventFilter(
                user=u,
                resource=resource['resource_id'],
            ))
            for event in events:
                print(fmt % event)
def show_help(db, args):
    """Print the names of the available commands, sorted.

    :param db: storage connection (unused)
    :param args: extra command line arguments (unused)
    """
    print('COMMANDS:')
    for name in sorted(COMMANDS):
        print(name)
def show_projects(db, args):
    """Print every project id known to the storage backend, sorted.

    :param db: storage connection providing ``get_projects()``
    :param args: extra command line arguments (unused)
    """
    for project_id in sorted(db.get_projects()):
        # Call form with one argument prints the same on Python 2 and 3.
        print(project_id)
# Maps the command name given on the command line to its handler.
# Every handler takes (db, args).
COMMANDS = {
    'users': show_users,
    'projects': show_projects,
    'help': show_help,
    'resources': show_resources,
    'total_resources': show_total_resources,
    'raw': show_raw,
}


def main(argv):
    """Parse the command line and run the requested command.

    :param argv: full argument vector, program name first
    :returns: 1 when the command is unknown, otherwise None
    """
    extra_args = cfg.CONF(
        # Honor the argv passed in rather than reading sys.argv again.
        argv[1:],
        # NOTE(dhellmann): Read the configuration file(s) for the
        # ceilometer collector by default.
        default_config_files=['/etc/ceilometer-collector.conf'],
    )
    storage.register_opts(cfg.CONF)
    db = storage.get_connection(cfg.CONF)
    command = extra_args[0] if extra_args else 'help'
    handler = COMMANDS.get(command)
    if handler is None:
        # Report the problem and list the valid commands instead of
        # dying with a bare KeyError traceback.
        sys.stderr.write('Unknown command: %s\n' % command)
        show_help(db, [])
        return 1
    handler(db, extra_args[1:])


if __name__ == '__main__':
    sys.exit(main(sys.argv))

View File

@ -3,3 +3,11 @@ coverage
pep8>=1.0
mox
glance>=2011.3.1
# NOTE(dhellmann): Ming is necessary to provide the Mongo-in-memory
# implementation of MongoDB. The original source for Ming is at
# http://sourceforge.net/project/merciless but there does not seem to
# be a way to point to a "zipball" of the latest HEAD there, and we
# need features present only in that version. I forked the project to
# github to make it easier to install, then ended up making some
# changes to it so it would be compatible with PyMongo's API.
https://github.com/dreamhost/Ming/zipball/master#egg=Ming

View File

@ -14,7 +14,7 @@ commands = {toxinidir}/run_tests.sh
sitepackages = True
[testenv:py27]
commands = {toxinidir}/run_tests.sh --with-coverage --cover-erase --cover-package=ceilometer --cover-inclusive
commands = {toxinidir}/run_tests.sh --with-coverage --cover-erase --cover-package=ceilometer --cover-inclusive []
[testenv:pep8]
deps = pep8==1.1