diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index 25be26059d..424819c366 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -48,43 +48,64 @@ class HBaseStorage(base.StorageEngine): Collections: - user - - { _id: user id - s_source_name: each source reported for user is stored with prefix s_ - the value of each entry is '1' - sources: this field contains the first source reported for user. - This data is not used but stored for simplification of impl - } + - row_key: user_id + - Column Families: + f: contains all sources with 's' prefix + - project - - { _id: project id - s_source_name: the same as for users - sources: the same as for users - } - - meter - - {_id_reverted_ts: row key is constructed in this way for efficient - filtering - parsed_info_from_incoming_data: e.g. counter_name, counter_type - resource_metadata: raw metadata for corresponding resource - r_metadata_name: flattened metadata for corresponding resource - message: raw incoming data - recorded_at: when the sample has been recorded - source: source for the sample - } + - row_key: project_id + - Column Families: + f: contains all sources with 's' prefix + + - meter (describes sample actually) + - row-key: consists of reversed timestamp, meter and an md5 of + user+resource+project for purposes of uniqueness + - Column Families: + f: contains the following qualifiers: + -counter_name : + -counter_type : + -counter_unit : + -counter_volume : + -message: + -message_id: + -message_signature: + -resource_metadata: raw metadata for corresponding resource + of the meter + -project_id: + -resource_id: + -user_id: + -recorded_at: + -flattened metadata with prefix r_metadata. e.g. + f:r_metadata.display_name or f:r_metadata.tag + -rts: + -timestamp: + -source for meter with prefix 's' + - resource - - the metadata for resources - - { _id: uuid of resource, - metadata: raw metadata dictionaries - r_metadata: flattened metadata fir quick filtering - timestamp: datetime of last update - user_id: uuid - project_id: uuid - meter: [ array of {counter_name: string, counter_type: string} ] - source: source of resource - } + - row_key: uuid of resource + - Column Families: + f: contains the following qualifiers: + -resource_metadata: raw metadata for corresponding resource + -project_id: + -resource_id: + -user_id: + -flattened metadata with prefix r_metadata. e.g. + f:r_metadata.display_name or f:r_metadata.tag + -sources for all corresponding meters with prefix 's' + -all meters for this resource in format + "%s!%s!%s+%s" % (counter_name, counter_type, counter_unit, + source) + - alarm - - the raw incoming alarm data + - row_key: uuid of alarm + - Column Families: + f: contains the raw incoming alarm data + - alarm_h - - raw incoming alarm_history data. Timestamp becomes now() - if not determined + - row_key: uuid of alarm + "_" + reversed timestamp + - Column Families: + f: raw incoming alarm_history data. Timestamp becomes now() + if not determined """ @staticmethod @@ -143,10 +164,10 @@ class Connection(base.Connection): def upgrade(self): with self.conn_pool.connection() as conn: - conn.create_table(self.PROJECT_TABLE, {'f': dict()}) - conn.create_table(self.USER_TABLE, {'f': dict()}) - conn.create_table(self.RESOURCE_TABLE, {'f': dict()}) - conn.create_table(self.METER_TABLE, {'f': dict()}) + conn.create_table(self.PROJECT_TABLE, {'f': dict(max_versions=1)}) + conn.create_table(self.USER_TABLE, {'f': dict(max_versions=1)}) + conn.create_table(self.RESOURCE_TABLE, {'f': dict(max_versions=1)}) + conn.create_table(self.METER_TABLE, {'f': dict(max_versions=1)}) conn.create_table(self.ALARM_TABLE, {'f': dict()}) conn.create_table(self.ALARM_HISTORY_TABLE, {'f': dict()}) @@ -284,35 +305,28 @@ class Connection(base.Connection): resource_table = conn.table(self.RESOURCE_TABLE) meter_table = conn.table(self.METER_TABLE) - # Make sure we know about the user and project if data['user_id']: - self._update_sources(user_table, data['user_id'], - data['source']) - self._update_sources(project_table, data['project_id'], - data['source']) + user_table.put(data['user_id'], serialize_entry( + **{'source': data['source']})) + + project_table.put(data['project_id'], serialize_entry( + **{'source': data['source']}) + ) - # Get metadata from user's data resource_metadata = data.get('resource_metadata', {}) # Determine the name of new meter new_meter = _format_meter_reference( data['counter_name'], data['counter_type'], - data['counter_unit']) - flatten_result, sources, meters, metadata = \ - deserialize_entry(resource_table.row(data['resource_id'])) - - # Update if resource has new information - if (data['source'] not in sources) or ( - new_meter not in meters) or ( - metadata != resource_metadata): - resource_table.put(data['resource_id'], - serialize_entry( - **{'sources': [data['source']], - 'meters': [new_meter], - 'metadata': resource_metadata, - 'resource_id': data['resource_id'], - 'project_id': data['project_id'], - 'user_id': data['user_id']})) + data['counter_unit'], data['source']) + #TODO(nprivalova): try not to store resource_id + resource = serialize_entry(**{ + 'source': data['source'], 'meter': new_meter, + 'resource_metadata': resource_metadata, + 'resource_id': data['resource_id'], + 'project_id': data['project_id'], 'user_id': data['user_id']}) + resource_table.put(data['resource_id'], resource) + #TODO(nprivalova): improve uniqueness # Rowkey consists of reversed timestamp, meter and an md5 of # user+resource+project for purposes of uniqueness m = hashlib.md5() @@ -323,19 +337,13 @@ class Connection(base.Connection): # alphabetically. rts = reverse_timestamp(data['timestamp']) row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest()) - record = serialize_entry(data, **{'metadata': resource_metadata, + record = serialize_entry(data, **{'source': data['source'], 'rts': rts, 'message': data, 'recorded_at': timeutils.utcnow( )}) meter_table.put(row, record) - def _update_sources(self, table, id, source): - user, sources, _, _ = deserialize_entry(table.row(id)) - if source not in sources: - sources.append(source) - table.put(id, serialize_entry(user, **{'sources': sources})) - def get_users(self, source=None): """Return an iterable of user id strings. @@ -343,7 +351,7 @@ class Connection(base.Connection): """ with self.conn_pool.connection() as conn: user_table = conn.table(self.USER_TABLE) - LOG.debug(_("source: %s") % source) + LOG.debug(_("Query User table: source=%s") % source) scan_args = {} if source: scan_args['columns'] = ['f:s_%s' % source] @@ -356,7 +364,7 @@ class Connection(base.Connection): """ with self.conn_pool.connection() as conn: project_table = conn.table(self.PROJECT_TABLE) - LOG.debug(_("source: %s") % source) + LOG.debug(_("Query Project table: source=%s") % source) scan_args = {} if source: scan_args['columns'] = ['f:s_%s' % source] @@ -389,7 +397,6 @@ class Connection(base.Connection): resource=resource, source=source, metaquery=metaquery) q, start_row, stop_row = make_sample_query_from_filter( sample_filter, require_meter=False) - with self.conn_pool.connection() as conn: meter_table = conn.table(self.METER_TABLE) LOG.debug(_("Query Meter table: %s") % q) @@ -406,8 +413,9 @@ class Connection(base.Connection): meters = sorted(d_meters, key=_resource_id_from_record_tuple) for resource_id, r_meters in itertools.groupby( meters, key=_resource_id_from_record_tuple): - # We need deserialized entry(data[0]) and metadata(data[3]) - meter_rows = [(data[0], data[3]) for data in sorted( + # We need deserialized entry(data[0]), sources (data[1]) and + # metadata(data[3]) + meter_rows = [(data[0], data[1], data[3]) for data in sorted( r_meters, key=_timestamp_from_record_tuple)] latest_data = meter_rows[-1] min_ts = meter_rows[0][0]['timestamp'] @@ -417,9 +425,9 @@ class Connection(base.Connection): first_sample_timestamp=min_ts, last_sample_timestamp=max_ts, project_id=latest_data[0]['project_id'], - source=latest_data[0]['source'], + source=latest_data[1][0], user_id=latest_data[0]['user_id'], - metadata=latest_data[1], + metadata=latest_data[2], ) def get_meters(self, user=None, project=None, resource=None, source=None, @@ -444,23 +452,29 @@ class Connection(base.Connection): LOG.debug(_("Query Resource table: %s") % q) gen = resource_table.scan(filter=q) - + # We need result set to be sure that user doesn't receive several + # same meters. Please see bug + # https://bugs.launchpad.net/ceilometer/+bug/1301371 + result = set() for ignored, data in gen: - flatten_result, s, m, md = deserialize_entry(data) - if not m: - continue - # Meter table may have only one "meter" and "source". That's - # why only first lists element is get in this method - name, type, unit = m[0].split("!") - yield models.Meter( - name=name, - type=type, - unit=unit, - resource_id=flatten_result['resource_id'], - project_id=flatten_result['project_id'], - source=s[0] if s else None, - user_id=flatten_result['user_id'], - ) + flatten_result, s, meters, md = deserialize_entry(data) + for m in meters: + meter_raw, m_source = m.split("+") + name, type, unit = meter_raw.split('!') + meter_dict = {'name': name, + 'type': type, + 'unit': unit, + 'resource_id': flatten_result['resource_id'], + 'project_id': flatten_result['project_id'], + 'user_id': flatten_result['user_id']} + frozen_meter = frozenset(meter_dict.items()) + if frozen_meter in result: + continue + result.add(frozen_meter) + meter_dict.update({'source': + m_source if m_source else None}) + + yield models.Meter(**meter_dict) def get_samples(self, sample_filter, limit=None): """Return an iterable of models.Sample instances. @@ -609,7 +623,10 @@ class MTable(object): return ((k, self.row(k)) for k in keys) def put(self, key, data): - self._rows[key] = data + if key not in self._rows: + self._rows[key] = data + else: + self._rows[key].update(data) def delete(self, key): del self._rows[key] @@ -799,9 +816,14 @@ def make_query(metaquery=None, **kwargs): # found in table. for key, value in kwargs.items(): if value is not None: - q.append("SingleColumnValueFilter " - "('f', '%s', =, 'binary:%s', true, true)" % - (key, dump(value))) + if key == 'source': + q.append("SingleColumnValueFilter " + "('f', 's_%s', =, 'binary:%s', true, true)" % + (value, dump('1'))) + else: + q.append("SingleColumnValueFilter " + "('f', '%s', =, 'binary:%s', true, true)" % + (key, dump(value))) res_q = None if len(q): res_q = " AND ".join(q) @@ -869,10 +891,10 @@ def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None): return start_row, end_row -def _format_meter_reference(counter_name, counter_type, counter_unit): +def _format_meter_reference(counter_name, counter_type, counter_unit, source): """Format reference to meter data. """ - return "%s!%s!%s" % (counter_name, counter_type, counter_unit) + return "%s!%s!%s+%s" % (counter_name, counter_type, counter_unit, source) def _timestamp_from_record_tuple(record): @@ -906,14 +928,14 @@ def deserialize_entry(entry, get_raw_meta=True): for k, v in entry.items(): if k.startswith('f:s_'): sources.append(k[4:]) - elif k.startswith('f:m_'): - meters.append(k[4:]) elif k.startswith('f:r_metadata.'): metadata_flattened[k[len('f:r_metadata.'):]] = load(v) + elif k.startswith('f:m_'): + meters.append(k[4:]) else: flatten_result[k[2:]] = load(v) if get_raw_meta: - metadata = flatten_result.get('metadata', {}) + metadata = flatten_result.get('resource_metadata', {}) else: metadata = metadata_flattened @@ -931,28 +953,24 @@ def serialize_entry(data={}, **kwargs): result = {} for k, v in entry_dict.items(): - if k == 'sources': - # user and project tables may contain several sources and meters - # that's why we store it separately as pairs "source/meter name:1". - # Resource and meter table contain only one and it's possible - # to store pairs like "source/meter:source name/meter name". But to - # keep things simple it's possible to store all variants in all - # tables because it doesn't break logic and overhead is not too big - for source in v: - result['f:s_%s' % source] = dump('1') - if v: - result['f:source'] = dump(v[0]) - elif k == 'meters': - for meter in v: - result['f:m_%s' % meter] = dump('1') - elif k == 'metadata': + if k == 'source': + # user, project and resource tables may contain several sources. + # Besides, resource table may contain several meters. + # To make insertion safe we need to store all meters and sources in + # a separate cell. For this purpose s_ and m_ prefixes are + # introduced. + result['f:s_%s' % v] = dump('1') + + elif k == 'meter': + result['f:m_%s' % v] = dump('1') + elif k == 'resource_metadata': # keep raw metadata as well as flattened to provide # capability with API v2. It will be flattened in another # way on API level. But we need flattened too for quick filtering. flattened_meta = dump_metadata(v) for k, m in flattened_meta.items(): result['f:r_metadata.' + k] = dump(m) - result['f:metadata'] = dump(v) + result['f:resource_metadata'] = dump(v) else: result['f:' + k] = dump(v) return result diff --git a/ceilometer/tests/api/v2/test_list_meters_scenarios.py b/ceilometer/tests/api/v2/test_list_meters_scenarios.py index e8075c8a2c..e41cd4bfc7 100644 --- a/ceilometer/tests/api/v2/test_list_meters_scenarios.py +++ b/ceilometer/tests/api/v2/test_list_meters_scenarios.py @@ -132,6 +132,22 @@ class TestListMeters(FunctionalTest, 'util': 0.75, 'is_public': False}, source='test_source'), + sample.Sample( + 'meter.test.new', + 'cumulative', + '', + 1, + 'user-id', + 'project-id', + 'resource-id', + timestamp=datetime.datetime(2012, 7, 2, 10, 40), + resource_metadata={'display_name': 'test-server', + 'tag': 'self.sample3', + 'size': 0, + 'util': 0.75, + 'is_public': False}, + source='test_source'), + sample.Sample( 'meter.mine', 'gauge', @@ -160,13 +176,13 @@ class TestListMeters(FunctionalTest, def test_list_meters(self): data = self.get_json('/meters') - self.assertEqual(4, len(data)) + self.assertEqual(5, len(data)) self.assertEqual(set(['resource-id', 'resource-id2', 'resource-id3', 'resource-id4']), set(r['resource_id'] for r in data)) - self.assertEqual(set(['meter.test', 'meter.mine']), + self.assertEqual(set(['meter.test', 'meter.mine', 'meter.test.new']), set(r['name'] for r in data)) self.assertEqual(set(['test_source', 'test_source1']), set(r['source'] for r in data)) @@ -187,7 +203,7 @@ class TestListMeters(FunctionalTest, def test_list_samples(self): data = self.get_json('/samples') - self.assertEqual(5, len(data)) + self.assertEqual(6, len(data)) def test_query_samples_with_invalid_field_name_and_non_eq_operator(self): resp = self.get_json('/samples', @@ -459,7 +475,7 @@ class TestListMeters(FunctionalTest, 'value': 'resource-id', }]) nids = set(r['name'] for r in data) - self.assertEqual(set(['meter.test']), nids) + self.assertEqual(set(['meter.test', 'meter.test.new']), nids) sids = set(r['source'] for r in data) self.assertEqual(set(['test_source']), sids) @@ -540,7 +556,8 @@ class TestListMeters(FunctionalTest, self.assertEqual(set(['user-id']), uids) nids = set(r['name'] for r in data) - self.assertEqual(set(['meter.mine', 'meter.test']), nids) + self.assertEqual(set(['meter.mine', 'meter.test', 'meter.test.new']), + nids) rids = set(r['resource_id'] for r in data) self.assertEqual(set(['resource-id', 'resource-id2']), rids)