diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py new file mode 100644 index 0000000000..db17bf7aa8 --- /dev/null +++ b/ceilometer/storage/impl_hbase.py @@ -0,0 +1,661 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2012, 2013 Dell Inc. +# +# Author: Stas Maksimov +# Author: Shengjie Min +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Openstack Ceilometer HBase storage backend + +.. note:: + This driver is designed to enable Ceilometer store its data in HBase. + The implementation is using HBase Thrift interface so it's necessary to have + the HBase Thrift server installed and started: + (https://ccp.cloudera.com/display/CDHDOC/HBase+Installation) + + This driver has been tested against HBase 0.92.1/CDH 4.1.1, + HBase 0.94.4/HDP 1.2 and HBase 0.94.5/Apache. + Versions earlier than 0.92.1 are not supported due to feature + incompatibility. + + Due to limitations of HBase the driver implements its own data aggregations + which may harm its performance. It is likely that the performance could be + improved if co-processors were used, however at the moment the co-processor + support is not exposed through Thrift API. + + The following four tables are expected to exist in HBase: + create 'project', {NAME=>'f'} + create 'user', {NAME=>'f'} + create 'resource', {NAME=>'f'} + create 'meter', {NAME=>'f'} + + The driver is using HappyBase which is a wrapper library used to interact + with HBase via Thrift protocol: + http://happybase.readthedocs.org/en/latest/index.html# + +""" + +from urlparse import urlparse +import json +import hashlib +import copy +import datetime +import happybase +from collections import defaultdict + +from oslo.config import cfg + +from ceilometer.openstack.common import log, timeutils +from ceilometer.storage import base + +LOG = log.getLogger(__name__) + + +class HBaseStorage(base.StorageEngine): + """Put the data into a HBase database + + Collections: + + - user + - { _id: user id + source: [ array of source ids reporting for the user ] + } + - project + - { _id: project id + source: [ array of source ids reporting for the project ] + } + - meter + - the raw incoming data + - resource + - the metadata for resources + - { _id: uuid of resource, + metadata: metadata dictionaries + timestamp: datetime of last update + user_id: uuid + project_id: uuid + meter: [ array of {counter_name: string, counter_type: string} ] + } + """ + + OPTIONS = [ + cfg.StrOpt('table_prefix', + default='', + help='Database table prefix', + ), + ] + + def register_opts(self, conf): + """Register any configuration options used by this engine. + """ + conf.register_opts(self.OPTIONS) + + @staticmethod + def get_connection(conf): + """Return a Connection instance based on the configuration settings. + """ + return Connection(conf) + + +class Connection(base.Connection): + """HBase connection. + """ + + def __init__(self, conf): + ''' + Hbase Connection Initialization + ''' + opts = self._parse_connection_url(conf.database_connection) + opts['table_prefix'] = conf.table_prefix + self.conn = self._get_connection(opts) + self.conn.open() + self.project = self.conn.table('project') + self.user = self.conn.table('user') + self.resource = self.conn.table('resource') + self.meter = self.conn.table('meter') + + def upgrade(self, version=None): + pass + + def clear(self): + pass + + @staticmethod + def _get_connection(conf): + """Return a connection to the database. + + .. note:: + + The tests use a subclass to override this and return an + in-memory connection. + """ + LOG.debug('connecting to HBase on %s:%s', conf['host'], conf['port']) + return happybase.Connection(host=conf['host'], port=conf['port'], + table_prefix=conf['table_prefix']) + + @staticmethod + def _parse_connection_url(url): + """Parse connection parameters from a database url. + + .. note:: + + HBase Thrift does not support authentication and there is no + database name, so we are not looking for these in the url. + """ + opts = {} + result = urlparse(url) + opts['dbtype'] = result.scheme + if ':' in result.netloc: + opts['host'], port = result.netloc.split(':') + else: + opts['host'] = result.netloc + port = 9090 + opts['port'] = port and int(port) or 9090 + return opts + + def record_metering_data(self, data): + """Write the data to the backend storage system. + + :param data: a dictionary such as returned by + ceilometer.meter.meter_message_from_counter + """ + # Make sure we know about the user and project + if data['user_id']: + user = self.user.row(data['user_id']) + sources = _load_hbase_list(user, 's') + # Update if source is new + if data['source'] not in sources: + user['f:s_%s' % data['source']] = "1" + self.user.put(data['user_id'], user) + + project = self.project.row(data['project_id']) + sources = _load_hbase_list(project, 's') + # Update if source is new + if data['source'] not in sources: + project['f:s_%s' % data['source']] = "1" + self.project.put(data['project_id'], project) + + # Record the updated resource metadata. + received_timestamp = timeutils.utcnow() + + resource = self.resource.row(data['resource_id']) + new_meter = "%s!%s!%s" % ( + data['counter_name'], data['counter_type'], data['counter_unit']) + new_resource = {'f:resource_id': data['resource_id'], + 'f:project_id': data['project_id'], + 'f:user_id': data['user_id'], + 'f:timestamp': timeutils.strtime(data['timestamp']), + 'f:received_timestamp': timeutils.strtime( + received_timestamp), + 'f:metadata': json.dumps(data['resource_metadata']), + 'f:source': data["source"], + 'f:m_%s' % new_meter: "1", + } + # Update if resource has new information + if new_resource != resource: + meters = _load_hbase_list(resource, 'm') + if new_meter not in meters: + new_resource['f:m_%s' % new_meter] = "1" + + self.resource.put(data['resource_id'], new_resource) + + # Rowkey consists of reversed timestamp, meter and an md5 of + # user+resource+project for purposes of uniqueness + m = hashlib.md5() + m.update("%s%s%s" % (data['user_id'], data['resource_id'], + data['project_id'])) + + # We use reverse timestamps in rowkeys as they are sorted + # alphabetically. + rts = reverse_timestamp(data['timestamp']) + row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest()) + + # Convert timestamp to string as json.dumps won't + ts = timeutils.strtime(data['timestamp']) + + record = {'f:timestamp': ts, + 'f:counter_name': data['counter_name'], + 'f:counter_type': data['counter_type'], + 'f:counter_volume': str(data['counter_volume']), + 'f:counter_unit': data['counter_unit'], + # TODO(shengjie) consider using QualifierFilter + # keep dimensions as column qualifier for quicker look up + # TODO(shengjie) extra dimensions need to be added as CQ + 'f:user_id': data['user_id'], + 'f:project_id': data['project_id'], + 'f:resource_id': data['resource_id'], + 'f:source': data['source'], + # add in reversed_ts here for time range scan + 'f:rts': str(rts) + } + # Don't want to be changing the original data object + data = copy.copy(data) + data['timestamp'] = ts + # Save original event + record['f:message'] = json.dumps(data) + self.meter.put(row, record) + + def get_users(self, source=None): + """Return an iterable of user id strings. + + :param source: Optional source filter. + """ + LOG.debug("source: %s" % source) + scan_args = {} + if source: + scan_args['columns'] = ['f:s_%s' % source] + return sorted(key for key, ignored in self.user.scan(**scan_args)) + + def get_projects(self, source=None): + """Return an iterable of project id strings. + + :param source: Optional source filter. + """ + LOG.debug("source: %s" % source) + scan_args = {} + if source: + scan_args['columns'] = ['f:s_%s' % source] + return (key for key, ignored in self.project.scan(**scan_args)) + + def get_resources(self, user=None, project=None, source=None, + start_timestamp=None, end_timestamp=None, + metaquery={}): + """Return an iterable of dictionaries containing resource information. + + :type end_timestamp: object + { 'resource_id': UUID of the resource, + 'project_id': UUID of project owning the resource, + 'user_id': UUID of user owning the resource, + 'timestamp': UTC datetime of last update to the resource, + 'metadata': most current metadata for the resource, + 'meter': list of the meters reporting data for the resource, + } + + :param user: Optional ID for user that owns the resource. + :param project: Optional ID for project that owns the resource. + :param source: Optional source filter. + :param start_timestamp: Optional modified timestamp start range. + :param end_timestamp: Optional modified timestamp end range. + """ + q, start_row, end_row = make_query(user=user, + project=project, + source=source, + start=start_timestamp, + end=end_timestamp, + require_meter=False) + LOG.debug("q: %s" % q) + # TODO implement metaquery support + if len(metaquery) > 0: + raise NotImplementedError('metaquery not implemented') + + resource_ids = {} + if start_timestamp or end_timestamp: + # Look for resources matching the above criteria and with + # events in the time range we care about, then change the + # resource query to return just those resources by id. + g = self.meter.scan(filter=q, row_start=start_row, + row_stop=end_row) + for ignored, data in g: + resource_ids[data['f:resource_id']] = data['f:resource_id'] + + q = make_query(user=user, project=project, source=source, + query_only=True, require_meter=False) + LOG.debug("q: %s" % q) + for resource_id, data in self.resource.scan(filter=q): + if not resource_ids or resource_id in resource_ids: + r = {'resource_id': resource_id, + 'metadata': json.loads(data['f:metadata']), + 'project_id': data['f:project_id'], + 'received_timestamp': data['f:received_timestamp'], + 'source': data['f:source'], + 'timestamp': + timeutils.parse_strtime(data['f:timestamp']), + 'user_id': data['f:user_id'], + 'meter': []} + + for m in data: + if m.startswith('f:m_'): + name, type, unit = m[4:].split("!") + r['meter'].append({"counter_name": name, + "counter_type": type, + "counter_unit": unit}) + + yield r + + def get_meters(self, user=None, project=None, resource=None, source=None, + metaquery={}): + """Return an iterable of dictionaries containing meter information. + + { 'name': name of the meter, + 'type': type of the meter (guage, counter), + 'unit': unit of the meter, + 'resource_id': UUID of the resource, + 'project_id': UUID of project owning the resource, + 'user_id': UUID of user owning the resource, + } + + :param user: Optional ID for user that owns the resource. + :param project: Optional ID for project that owns the resource. + :param resource: Optional resource filter. + :param source: Optional source filter. + :param metaquery: Optional dict with metadata to match on. + """ + q, ignored, ignored = make_query(user=user, project=project, + resource=resource, source=source, + require_meter=False) + LOG.debug("q: %s" % q) + # TODO implement metaquery support + if len(metaquery) > 0: + raise NotImplementedError('metaquery not implemented') + + gen = self.resource.scan(filter=q) + + for ignored, data in gen: + # Meter columns are stored like this: + # "m_{counter_name}|{counter_type}|{counter_unit}" => "1" + # where 'm' is a prefix (m for meter), value is always set to 1 + meter = None + for m in data: + if m.startswith('f:m_'): + meter = m + break + if meter is None: + continue + name, type, unit = meter[4:].split("!") + m = {'name': name, + 'type': type, + 'unit': unit, + 'resource_id': data['f:resource_id'], + 'project_id': data['f:project_id'], + 'user_id': data['f:user_id'], + } + yield m + + def get_raw_events(self, event_filter): + """Return an iterable of raw event data as created by + :func:`ceilometer.meter.meter_message_from_counter`. + """ + q, start, stop = make_query_from_filter(event_filter, + require_meter=False) + LOG.debug("q: %s" % q) + + gen = self.meter.scan(filter=q, row_start=start, row_stop=stop) + meters = [] + for ignored, meter in gen: + meter = json.loads(meter['f:message']) + meter['timestamp'] = timeutils.parse_strtime(meter['timestamp']) + meters.append(meter) + return meters + + def _update_meter_stats(self, stat, meter): + """Do the stats calculation on a requested time bucket in stats dict + + :param stats: dict where aggregated stats are kept + :param index: time bucket index in stats + :param meter: meter record as returned from HBase + :param start_time: query start time + :param period: length of the time bucket + """ + vol = int(meter['f:counter_volume']) + ts = timeutils.parse_strtime(meter['f:timestamp']) + stat['min'] = min(vol, stat['min'] or vol) + stat['max'] = max(vol, stat['max']) + stat['sum'] = vol + (stat['sum'] or 0) + stat['count'] += 1 + stat['avg'] = (stat['sum'] / float(stat['count'])) + stat['duration_start'] = min(ts, stat['duration_start'] or ts) + stat['duration_end'] = max(ts, stat['duration_end'] or ts) + stat['duration'] = \ + timeutils.delta_seconds(stat['duration_start'], + stat['duration_end']) + + def get_meter_statistics(self, event_filter, period=None): + """Return a dictionary containing meter statistics. + described by the query parameters. + + The filter must have a meter value set. + + { 'min': + 'max': + 'avg': + 'sum': + 'count': + 'period': + 'period_start': + 'period_end': + 'duration': + 'duration_start': + 'duration_end': + } + + .. note:: + + Due to HBase limitations the aggregations are implemented + in the driver itself, therefore this method will be quite slow + because of all the Thrift traffic it is going to create. + """ + q, start, stop = make_query_from_filter(event_filter) + + meters = list(meter for (ignored, meter) in + self.meter.scan(filter=q, + row_start=start, + row_stop=stop) + ) + + start_time = event_filter.start \ + or timeutils.parse_strtime(meters[-1]['f:timestamp']) + end_time = event_filter.end \ + or timeutils.parse_strtime(meters[0]['f:timestamp']) + + results = [] + + if not period: + period = 0 + period_start = start_time + period_end = end_time + + # As our HBase meters are stored as newest-first, we need to iterate + # in the reverse order + for meter in meters[::-1]: + ts = timeutils.parse_strtime(meter['f:timestamp']) + if period: + offset = int(timeutils.delta_seconds( + start_time, ts) / period) * period + period_start = start_time + datetime.timedelta(0, offset) + + if not len(results) or not results[-1]['period_start'] == \ + period_start: + if period: + period_end = period_start + datetime.timedelta( + 0, period) + results.append({'count': 0, + 'min': 0, + 'max': 0, + 'avg': 0, + 'sum': 0, + 'period': period, + 'period_start': period_start, + 'period_end': period_end, + 'duration': None, + 'duration_start': None, + 'duration_end': None, + }) + self._update_meter_stats(results[-1], meter) + return list(results) + + def get_volume_sum(self, event_filter): + """Return the sum of the volume field for the events + described by the query parameters. + """ + q, start, stop = make_query_from_filter(event_filter) + LOG.debug("q: %s" % q) + gen = self.meter.scan(filter=q, row_start=start, row_stop=stop) + results = defaultdict(int) + for ignored, meter in gen: + results[meter['f:resource_id']] \ + += int(meter['f:counter_volume']) + + return ({'resource_id': k, 'value': v} + for (k, v) in results.iteritems()) + + def get_volume_max(self, event_filter): + """Return the maximum of the volume field for the events + described by the query parameters. + """ + + q, start, stop = make_query_from_filter(event_filter) + LOG.debug("q: %s" % q) + gen = self.meter.scan(filter=q, row_start=start, row_stop=stop) + results = defaultdict(int) + for ignored, meter in gen: + results[meter['f:resource_id']] = \ + max(results[meter['f:resource_id']], + int(meter['f:counter_volume'])) + return ({'resource_id': k, 'value': v} + for (k, v) in results.iteritems()) + + def get_event_interval(self, event_filter): + """Return the min and max timestamps from events, + using the event_filter to limit the events seen. + + ( datetime.datetime(), datetime.datetime() ) + """ + q, start, stop = make_query_from_filter(event_filter) + LOG.debug("q: %s" % q) + gen = self.meter.scan(filter=q, row_start=start, row_stop=stop) + a_min = None + a_max = None + for ignored, meter in gen: + timestamp = timeutils.parse_strtime(meter['f:timestamp']) + if a_min is None: + a_min = timestamp + else: + if timestamp < a_min: + a_min = timestamp + if a_max is None: + a_max = timestamp + else: + if timestamp > a_max: + a_max = timestamp + + return a_min, a_max + + +################################################# +# Here be various HBase helpers +def reverse_timestamp(dt): + """Reverse timestamp so that newer timestamps are represented by smaller + numbers than older ones. + + Reverse timestamps is a technique used in HBase rowkey design. When period + queries are required the HBase rowkeys must include timestamps, but as + rowkeys in HBase are ordered lexicographically, the timestamps must be + reversed. + """ + epoch = datetime.datetime(1970, 1, 1) + td = dt - epoch + ts = (td.microseconds + + (td.seconds + td.days * 24 * 3600) * 100000) / 100000 + return 0x7fffffffffffffff - ts + + +def make_query(user=None, project=None, meter=None, + resource=None, source=None, start=None, end=None, + require_meter=True, query_only=False): + """Return a filter query based on the selected parameters. + :param user: Optional user-id + :param project: Optional project-id + :param meter: Optional counter-name + :param resource: Optional resource-id + :param source: Optional source-id + :param start: Optional start timestamp + :param end: Optional end timestamp + :param require_meter: If true and the filter does not have a meter, + raise an error. + :param query_only: If true only returns the filter query, + otherwise also returns start and stop rowkeys + """ + q = [] + + if user: + q.append("SingleColumnValueFilter ('f', 'user_id', =, 'binary:%s')" + % user) + if project: + q.append("SingleColumnValueFilter ('f', 'project_id', =, 'binary:%s')" + % project) + if resource: + q.append("SingleColumnValueFilter ('f', 'resource_id', =, 'binary:%s')" + % resource) + if source: + q.append("SingleColumnValueFilter " + "('f', 'source', =, 'binary:%s')" % source) + # when start_time and end_time is provided, + # if it's filtered by meter, + # rowkey will be used in the query; + # if it's non meter filter query(eg. project_id, user_id etc), + # SingleColumnValueFilter against rts will be appended to the query + # query other tables should have no start and end passed in + stopRow, startRow = "", "" + rts_start = str(reverse_timestamp(start) + 1) if start else "" + rts_end = str(reverse_timestamp(end) + 1) if end else "" + + if meter: + # if it's meter filter without start and end, + # startRow = meter while stopRow = meter + MAX_BYTE + if not rts_start: + rts_start = chr(127) + stopRow = "%s_%s" % (meter, rts_start) + startRow = "%s_%s" % (meter, rts_end) + elif require_meter: + raise RuntimeError('Missing required meter specifier') + else: + if rts_start: + q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" % + rts_start) + if rts_end: + q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" % + rts_end) + + query_filter = None + if len(q): + query_filter = " AND ".join(q) + if query_only: + return query_filter + else: + return query_filter, startRow, stopRow + + +def make_query_from_filter(event_filter, require_meter=True): + """Return a query dictionary based on the settings in the filter. + + :param filter: EventFilter instance + :param require_meter: If true and the filter does not have a meter, + raise an error. + """ + if event_filter.metaquery is not None and len(event_filter.metaquery) > 0: + raise NotImplementedError('metaquery not implemented') + + return make_query(event_filter.user, event_filter.project, + event_filter.meter, event_filter.resource, + event_filter.source, event_filter.start, + event_filter.end, require_meter) + + +def _load_hbase_list(d, prefix): + """Deserialise dict stored as HBase column family + """ + ret = [] + prefix = 'f:%s_' % prefix + for key in (k for k in d if k.startswith(prefix)): + ret.append(key[len(prefix):]) + return ret diff --git a/setup.py b/setup.py index 2954d548be..727f366416 100755 --- a/setup.py +++ b/setup.py @@ -129,6 +129,7 @@ setuptools.setup( postgresql = ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage sqlite = ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage test = ceilometer.storage.impl_test:TestDBStorage + hbase = ceilometer.storage.impl_hbase:HBaseStorage [ceilometer.compute.virt] libvirt = ceilometer.compute.virt.libvirt.inspector:LibvirtInspector diff --git a/tests/storage/test_impl_hbase.py b/tests/storage/test_impl_hbase.py new file mode 100644 index 0000000000..e83e8dfb99 --- /dev/null +++ b/tests/storage/test_impl_hbase.py @@ -0,0 +1,291 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2012, 2013 Dell Inc. +# +# Author: Stas Maksimov +# Author: Shengjie Min +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for ceilometer/storage/impl_hbase.py + +.. note:: + To run the tests using in-memory mocked HappyBase API, + set the environment variable CEILOMETER_TEST_LIVE=0 (this is the default + value) + + In order to run the tests against real HBase server set the environment + variable CEILOMETER_TEST_LIVE=1 and set HBASE_URL below to + point to that HBase instance before running the tests. Make sure the Thrift + server is running on that server. + +""" + +from time import sleep +import logging + +import os +import copy +import re + +from oslo.config import cfg + +from tests.storage import base +from ceilometer.storage import impl_hbase + +from ceilometer.storage.impl_hbase import _load_hbase_list + +LOG = logging.getLogger(__name__) + +CEILOMETER_TEST_LIVE = bool(int(os.environ.get('CEILOMETER_TEST_LIVE', 0))) + +# Export this variable before running tests against real HBase +# e.g. export CEILOMETER_TEST_HBASE_URL = hbase://192.168.1.100:9090 +CEILOMETER_TEST_HBASE_URL = os.environ.get('CEILOMETER_TEST_HBASE_URL') +if CEILOMETER_TEST_LIVE: + if not CEILOMETER_TEST_HBASE_URL: + raise RuntimeError("CEILOMETER_TEST_LIVE is on, but " + "CEILOMETER_TEST_HBASE_URL is not defined") +PROJECT_TABLE = "project" +USER_TABLE = "user" +RESOURCE_TABLE = "resource" +METER_TABLE = "meter" + +TABLES = [PROJECT_TABLE, USER_TABLE, RESOURCE_TABLE, METER_TABLE] + + +class TestConnection(impl_hbase.Connection): + + def __init__(self, conf): + if CEILOMETER_TEST_LIVE: + super(TestConnection, self).__init__(conf) + else: + self.conn = MConnection() + self.project = self.conn.table('project') + self.user = self.conn.table('user') + self.resource = self.conn.table('resource') + self.meter = self.conn.table('meter') + + def create_schema(self): + LOG.debug('Creating HBase schema...') + self.conn.create_table(PROJECT_TABLE, {'f': dict()}) + self.conn.create_table(USER_TABLE, {'f': dict()}) + self.conn.create_table(RESOURCE_TABLE, {'f': dict()}) + self.conn.create_table(METER_TABLE, {'f': dict()}) + # Real HBase needs some time to propagate create_table changes + if CEILOMETER_TEST_LIVE: + sleep(10) + + def drop_schema(self): + LOG.debug('Dropping HBase schema...') + for table in TABLES: + try: + self.conn.disable_table(table) + except: + None + try: + self.conn.delete_table(table) + except: + None + # Real HBase needs some time to propagate delete_table changes + if CEILOMETER_TEST_LIVE: + sleep(10) + + +class HBaseEngine(base.DBEngineBase): + + def get_connection(self): + self.conf = cfg.CONF + + self.conf.database_connection = CEILOMETER_TEST_HBASE_URL + # use prefix so we don't affect any existing tables + self.conf.table_prefix = 't' + + self.conn = TestConnection(self.conf) + + self.conn.drop_schema() + self.conn.create_schema() + + self.conn.upgrade() + return self.conn + + def clean_up(self): + pass + + def get_sources_by_project_id(self, id): + project = self.conn.project.row(id) + return _load_hbase_list(project, 's') + + def get_sources_by_user_id(self, id): + user = self.conn.user.row(id) + return _load_hbase_list(user, 's') + + +class HBaseEngineTestBase(base.DBTestBase): + + def get_engine(cls): + return HBaseEngine() + + +class UserTest(base.UserTest, HBaseEngineTestBase): + pass + + +class ProjectTest(base.ProjectTest, HBaseEngineTestBase): + pass + + +class ResourceTest(base.ResourceTest, HBaseEngineTestBase): + pass + + +class MeterTest(base.MeterTest, HBaseEngineTestBase): + pass + + +class RawEventTest(base.RawEventTest, HBaseEngineTestBase): + pass + + +class TestGetEventInterval(base.TestGetEventInterval, + HBaseEngineTestBase): + pass + + +class SumTest(base.SumTest, HBaseEngineTestBase): + pass + + +class MaxProjectTest(base.MaxProjectTest, HBaseEngineTestBase): + pass + + +class MaxResourceTest(base.MaxResourceTest, HBaseEngineTestBase): + pass + + +class StatisticsTest(base.StatisticsTest, HBaseEngineTestBase): + pass + + +############### +# This is a very crude version of "in-memory HBase", which implements just +# enough functionality of HappyBase API to support testing of our driver. +# +class MTable(): + """HappyBase.Table mock + """ + def __init__(self, name, families): + self.name = name + self.families = families + self.rows = {} + + def row(self, key): + return self.rows[key] if key in self.rows else {} + + def put(self, key, data): + self.rows[key] = data + + def scan(self, filter=None, columns=[], row_start=None, row_stop=None): + sorted_keys = sorted(self.rows) + # copy data into a sorted dict + rows = {} + for row in sorted_keys: + if row_start: + if row < row_start: + continue + if row_stop: + if row > row_stop: + break + rows[row] = copy.copy(self.rows[row]) + if columns: + ret = {} + for row in rows.keys(): + data = rows[row] + for key in data: + # if all(key in columns for key in data): + if key in columns: + ret[row] = data + rows = ret + elif filter: + # TODO: we should really parse this properly, but at the moment we + # are only going to support AND here + filters = filter.split('AND') + for f in filters: + # Extract filter name and its arguments + g = re.search("(.*)\((.*),?\)", f) + fname = g.group(1).strip() + fargs = [s.strip().replace('\'', '').replace('\"', '') + for s in g.group(2).split(',')] + m = getattr(self, fname) + if callable(m): + # overwrite rows for filtering to take effect + # in case of multiple filters + rows = m(fargs, rows) + else: + raise NotImplementedError("%s filter is not implemented, " + "you may want to add it!") + for k in sorted(rows): + yield k, rows[k] + + def SingleColumnValueFilter(self, args, rows): + """This method is called from scan() when 'SingleColumnValueFilter' + is found in the 'filter' argument + """ + op = args[2] + column = "%s:%s" % (args[0], args[1]) + value = args[3] + if value.startswith('binary:'): + value = value[7:] + r = {} + for row in rows: + data = rows[row] + + if op == '=': + if column in data and data[column] == value: + r[row] = data + elif op == '<=': + if column in data and data[column] <= value: + r[row] = data + elif op == '>=': + if column in data and data[column] >= value: + r[row] = data + else: + raise NotImplementedError("In-memory " + "SingleColumnValueFilter " + "doesn't support the %s operation " + "yet" % op) + return r + + +class MConnection(): + """HappyBase.Connection mock + """ + def __init__(self): + self.tables = {} + + def open(self): + LOG.debug("Opening in-memory HBase connection") + return + + def create_table(self, n, families={}): + if n in self.tables: + return self.tables[n] + t = MTable(n, families) + self.tables[n] = t + return t + + def delete_table(self, name, use_prefix=True): + self.tables.remove(self.tables[name]) + + def table(self, name): + return self.create_table(name) diff --git a/tools/pip-requires b/tools/pip-requires index 07345200bd..1393a4806c 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -22,3 +22,4 @@ extras wsme>=0.5b1 pyyaml http://tarballs.openstack.org/oslo-config/oslo-config-2013.1b4.tar.gz#egg=oslo-config +happybase>=0.4