Make entities (Resource, User, Project) able to store lists

When one Resource contains several meters we should store all of them.
The same is true about User, Project and Resource regarding sources.
To make this possible, we need to change the way we store data in HBase.

Now Resource, User and Project may contain several sources. Besides, Resource may contain several meters.
To store all of them we use ColumnFamily f and columns m_meters and s_sources.
All meters are stored as a JSON-encoded list.
E.g.
resource_1: {f: {meters: [meter1, meter2, meter3]} }
The same for users and projects:
user_1: {f: {sources: [source_1, source_2]} }

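We cannot update these lists safely: adding an item means reading the whole list,
appending to it and writing it back, so concurrent writers can overwrite each other's changes.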

In this commit m_meters and s_sources are changed to m_{unique_meter_structure} and s_{unique_source_name}.
So we are getting rid of the 'lists' and can use the safe put() method.
All meters in Resource and sources in User, Project, Resource are
stored with stub value=1 to make filtering simpler.

New ColumnFamilies are not introduced here because a CF is a 'namespace' and makes sense only when
there are a lot of requests for just that CF. But that's not the case for Ceilometer: each request may
contain an _id field or a metadata filter. It's better to store all this info in one CF.

Closes bug 1288284

Change-Id: I5814202e3d59fd29f96c8734e445367f766e6a4a
This commit is contained in:
Nadya Privalova 2014-04-02 17:44:04 +04:00
parent a404a2b151
commit b938f10c32
2 changed files with 154 additions and 119 deletions

View File

@ -48,43 +48,64 @@ class HBaseStorage(base.StorageEngine):
Collections:
- user
- { _id: user id
s_source_name: each source reported for user is stored with prefix s_
the value of each entry is '1'
sources: this field contains the first source reported for user.
This data is not used but stored for simplification of impl
}
- row_key: user_id
- Column Families:
f: contains all sources with 's' prefix
- project
- { _id: project id
s_source_name: the same as for users
sources: the same as for users
}
- meter
- {_id_reverted_ts: row key is constructed in this way for efficient
filtering
parsed_info_from_incoming_data: e.g. counter_name, counter_type
resource_metadata: raw metadata for corresponding resource
r_metadata_name: flattened metadata for corresponding resource
message: raw incoming data
recorded_at: when the sample has been recorded
source: source for the sample
}
- row_key: project_id
- Column Families:
f: contains all sources with 's' prefix
- meter (describes sample actually)
- row-key: consists of reversed timestamp, meter and an md5 of
user+resource+project for purposes of uniqueness
- Column Families:
f: contains the following qualifiers:
-counter_name : <name of counter>
-counter_type : <type of counter>
-counter_unit : <unit of counter>
-counter_volume : <volume of counter>
-message: <raw incoming data>
-message_id: <id of message>
-message_signature: <signature of message>
-resource_metadata: raw metadata for corresponding resource
of the meter
-project_id: <id of project>
-resource_id: <id of resource>
-user_id: <id of user>
-recorded_at: <datetime when sample has been recorded (utc.now)>
-flattened metadata with prefix r_metadata. e.g.
f:r_metadata.display_name or f:r_metadata.tag
-rts: <reversed timestamp of entry>
-timestamp: <meter's timestamp (came from message)>
-source for meter with prefix 's'
- resource
- the metadata for resources
- { _id: uuid of resource,
metadata: raw metadata dictionaries
r_metadata: flattened metadata fir quick filtering
timestamp: datetime of last update
user_id: uuid
project_id: uuid
meter: [ array of {counter_name: string, counter_type: string} ]
source: source of resource
}
- row_key: uuid of resource
- Column Families:
f: contains the following qualifiers:
-resource_metadata: raw metadata for corresponding resource
-project_id: <id of project>
-resource_id: <id of resource>
-user_id: <id of user>
-flattened metadata with prefix r_metadata. e.g.
f:r_metadata.display_name or f:r_metadata.tag
-sources for all corresponding meters with prefix 's'
-all meters for this resource in format
"%s!%s!%s+%s" % (counter_name, counter_type, counter_unit,
source)
- alarm
- the raw incoming alarm data
- row_key: uuid of alarm
- Column Families:
f: contains the raw incoming alarm data
- alarm_h
- raw incoming alarm_history data. Timestamp becomes now()
if not determined
- row_key: uuid of alarm + "_" + reversed timestamp
- Column Families:
f: raw incoming alarm_history data. Timestamp becomes now()
if not determined
"""
@staticmethod
@ -143,10 +164,10 @@ class Connection(base.Connection):
def upgrade(self):
with self.conn_pool.connection() as conn:
conn.create_table(self.PROJECT_TABLE, {'f': dict()})
conn.create_table(self.USER_TABLE, {'f': dict()})
conn.create_table(self.RESOURCE_TABLE, {'f': dict()})
conn.create_table(self.METER_TABLE, {'f': dict()})
conn.create_table(self.PROJECT_TABLE, {'f': dict(max_versions=1)})
conn.create_table(self.USER_TABLE, {'f': dict(max_versions=1)})
conn.create_table(self.RESOURCE_TABLE, {'f': dict(max_versions=1)})
conn.create_table(self.METER_TABLE, {'f': dict(max_versions=1)})
conn.create_table(self.ALARM_TABLE, {'f': dict()})
conn.create_table(self.ALARM_HISTORY_TABLE, {'f': dict()})
@ -284,35 +305,28 @@ class Connection(base.Connection):
resource_table = conn.table(self.RESOURCE_TABLE)
meter_table = conn.table(self.METER_TABLE)
# Make sure we know about the user and project
if data['user_id']:
self._update_sources(user_table, data['user_id'],
data['source'])
self._update_sources(project_table, data['project_id'],
data['source'])
user_table.put(data['user_id'], serialize_entry(
**{'source': data['source']}))
project_table.put(data['project_id'], serialize_entry(
**{'source': data['source']})
)
# Get metadata from user's data
resource_metadata = data.get('resource_metadata', {})
# Determine the name of new meter
new_meter = _format_meter_reference(
data['counter_name'], data['counter_type'],
data['counter_unit'])
flatten_result, sources, meters, metadata = \
deserialize_entry(resource_table.row(data['resource_id']))
# Update if resource has new information
if (data['source'] not in sources) or (
new_meter not in meters) or (
metadata != resource_metadata):
resource_table.put(data['resource_id'],
serialize_entry(
**{'sources': [data['source']],
'meters': [new_meter],
'metadata': resource_metadata,
'resource_id': data['resource_id'],
'project_id': data['project_id'],
'user_id': data['user_id']}))
data['counter_unit'], data['source'])
#TODO(nprivalova): try not to store resource_id
resource = serialize_entry(**{
'source': data['source'], 'meter': new_meter,
'resource_metadata': resource_metadata,
'resource_id': data['resource_id'],
'project_id': data['project_id'], 'user_id': data['user_id']})
resource_table.put(data['resource_id'], resource)
#TODO(nprivalova): improve uniqueness
# Rowkey consists of reversed timestamp, meter and an md5 of
# user+resource+project for purposes of uniqueness
m = hashlib.md5()
@ -323,19 +337,13 @@ class Connection(base.Connection):
# alphabetically.
rts = reverse_timestamp(data['timestamp'])
row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
record = serialize_entry(data, **{'metadata': resource_metadata,
record = serialize_entry(data, **{'source': data['source'],
'rts': rts,
'message': data,
'recorded_at': timeutils.utcnow(
)})
meter_table.put(row, record)
def _update_sources(self, table, id, source):
user, sources, _, _ = deserialize_entry(table.row(id))
if source not in sources:
sources.append(source)
table.put(id, serialize_entry(user, **{'sources': sources}))
def get_users(self, source=None):
"""Return an iterable of user id strings.
@ -343,7 +351,7 @@ class Connection(base.Connection):
"""
with self.conn_pool.connection() as conn:
user_table = conn.table(self.USER_TABLE)
LOG.debug(_("source: %s") % source)
LOG.debug(_("Query User table: source=%s") % source)
scan_args = {}
if source:
scan_args['columns'] = ['f:s_%s' % source]
@ -356,7 +364,7 @@ class Connection(base.Connection):
"""
with self.conn_pool.connection() as conn:
project_table = conn.table(self.PROJECT_TABLE)
LOG.debug(_("source: %s") % source)
LOG.debug(_("Query Project table: source=%s") % source)
scan_args = {}
if source:
scan_args['columns'] = ['f:s_%s' % source]
@ -389,7 +397,6 @@ class Connection(base.Connection):
resource=resource, source=source, metaquery=metaquery)
q, start_row, stop_row = make_sample_query_from_filter(
sample_filter, require_meter=False)
with self.conn_pool.connection() as conn:
meter_table = conn.table(self.METER_TABLE)
LOG.debug(_("Query Meter table: %s") % q)
@ -406,8 +413,9 @@ class Connection(base.Connection):
meters = sorted(d_meters, key=_resource_id_from_record_tuple)
for resource_id, r_meters in itertools.groupby(
meters, key=_resource_id_from_record_tuple):
# We need deserialized entry(data[0]) and metadata(data[3])
meter_rows = [(data[0], data[3]) for data in sorted(
# We need deserialized entry(data[0]), sources (data[1]) and
# metadata(data[3])
meter_rows = [(data[0], data[1], data[3]) for data in sorted(
r_meters, key=_timestamp_from_record_tuple)]
latest_data = meter_rows[-1]
min_ts = meter_rows[0][0]['timestamp']
@ -417,9 +425,9 @@ class Connection(base.Connection):
first_sample_timestamp=min_ts,
last_sample_timestamp=max_ts,
project_id=latest_data[0]['project_id'],
source=latest_data[0]['source'],
source=latest_data[1][0],
user_id=latest_data[0]['user_id'],
metadata=latest_data[1],
metadata=latest_data[2],
)
def get_meters(self, user=None, project=None, resource=None, source=None,
@ -444,23 +452,29 @@ class Connection(base.Connection):
LOG.debug(_("Query Resource table: %s") % q)
gen = resource_table.scan(filter=q)
# We need result set to be sure that user doesn't receive several
# same meters. Please see bug
# https://bugs.launchpad.net/ceilometer/+bug/1301371
result = set()
for ignored, data in gen:
flatten_result, s, m, md = deserialize_entry(data)
if not m:
continue
# Meter table may have only one "meter" and "source". That's
# why only first lists element is get in this method
name, type, unit = m[0].split("!")
yield models.Meter(
name=name,
type=type,
unit=unit,
resource_id=flatten_result['resource_id'],
project_id=flatten_result['project_id'],
source=s[0] if s else None,
user_id=flatten_result['user_id'],
)
flatten_result, s, meters, md = deserialize_entry(data)
for m in meters:
meter_raw, m_source = m.split("+")
name, type, unit = meter_raw.split('!')
meter_dict = {'name': name,
'type': type,
'unit': unit,
'resource_id': flatten_result['resource_id'],
'project_id': flatten_result['project_id'],
'user_id': flatten_result['user_id']}
frozen_meter = frozenset(meter_dict.items())
if frozen_meter in result:
continue
result.add(frozen_meter)
meter_dict.update({'source':
m_source if m_source else None})
yield models.Meter(**meter_dict)
def get_samples(self, sample_filter, limit=None):
"""Return an iterable of models.Sample instances.
@ -609,7 +623,10 @@ class MTable(object):
return ((k, self.row(k)) for k in keys)
def put(self, key, data):
self._rows[key] = data
if key not in self._rows:
self._rows[key] = data
else:
self._rows[key].update(data)
def delete(self, key):
del self._rows[key]
@ -799,9 +816,14 @@ def make_query(metaquery=None, **kwargs):
# found in table.
for key, value in kwargs.items():
if value is not None:
q.append("SingleColumnValueFilter "
"('f', '%s', =, 'binary:%s', true, true)" %
(key, dump(value)))
if key == 'source':
q.append("SingleColumnValueFilter "
"('f', 's_%s', =, 'binary:%s', true, true)" %
(value, dump('1')))
else:
q.append("SingleColumnValueFilter "
"('f', '%s', =, 'binary:%s', true, true)" %
(key, dump(value)))
res_q = None
if len(q):
res_q = " AND ".join(q)
@ -869,10 +891,10 @@ def _make_general_rowkey_scan(rts_start=None, rts_end=None, some_id=None):
return start_row, end_row
def _format_meter_reference(counter_name, counter_type, counter_unit):
def _format_meter_reference(counter_name, counter_type, counter_unit, source):
"""Format reference to meter data.
"""
return "%s!%s!%s" % (counter_name, counter_type, counter_unit)
return "%s!%s!%s+%s" % (counter_name, counter_type, counter_unit, source)
def _timestamp_from_record_tuple(record):
@ -906,14 +928,14 @@ def deserialize_entry(entry, get_raw_meta=True):
for k, v in entry.items():
if k.startswith('f:s_'):
sources.append(k[4:])
elif k.startswith('f:m_'):
meters.append(k[4:])
elif k.startswith('f:r_metadata.'):
metadata_flattened[k[len('f:r_metadata.'):]] = load(v)
elif k.startswith('f:m_'):
meters.append(k[4:])
else:
flatten_result[k[2:]] = load(v)
if get_raw_meta:
metadata = flatten_result.get('metadata', {})
metadata = flatten_result.get('resource_metadata', {})
else:
metadata = metadata_flattened
@ -931,28 +953,24 @@ def serialize_entry(data={}, **kwargs):
result = {}
for k, v in entry_dict.items():
if k == 'sources':
# user and project tables may contain several sources and meters
# that's why we store it separately as pairs "source/meter name:1".
# Resource and meter table contain only one and it's possible
# to store pairs like "source/meter:source name/meter name". But to
# keep things simple it's possible to store all variants in all
# tables because it doesn't break logic and overhead is not too big
for source in v:
result['f:s_%s' % source] = dump('1')
if v:
result['f:source'] = dump(v[0])
elif k == 'meters':
for meter in v:
result['f:m_%s' % meter] = dump('1')
elif k == 'metadata':
if k == 'source':
# user, project and resource tables may contain several sources.
# Besides, resource table may contain several meters.
# To make insertion safe we need to store all meters and sources in
# a separate cell. For this purpose s_ and m_ prefixes are
# introduced.
result['f:s_%s' % v] = dump('1')
elif k == 'meter':
result['f:m_%s' % v] = dump('1')
elif k == 'resource_metadata':
# keep raw metadata as well as flattened to provide
# capability with API v2. It will be flattened in another
# way on API level. But we need flattened too for quick filtering.
flattened_meta = dump_metadata(v)
for k, m in flattened_meta.items():
result['f:r_metadata.' + k] = dump(m)
result['f:metadata'] = dump(v)
result['f:resource_metadata'] = dump(v)
else:
result['f:' + k] = dump(v)
return result

View File

@ -132,6 +132,22 @@ class TestListMeters(FunctionalTest,
'util': 0.75,
'is_public': False},
source='test_source'),
sample.Sample(
'meter.test.new',
'cumulative',
'',
1,
'user-id',
'project-id',
'resource-id',
timestamp=datetime.datetime(2012, 7, 2, 10, 40),
resource_metadata={'display_name': 'test-server',
'tag': 'self.sample3',
'size': 0,
'util': 0.75,
'is_public': False},
source='test_source'),
sample.Sample(
'meter.mine',
'gauge',
@ -160,13 +176,13 @@ class TestListMeters(FunctionalTest,
def test_list_meters(self):
data = self.get_json('/meters')
self.assertEqual(4, len(data))
self.assertEqual(5, len(data))
self.assertEqual(set(['resource-id',
'resource-id2',
'resource-id3',
'resource-id4']),
set(r['resource_id'] for r in data))
self.assertEqual(set(['meter.test', 'meter.mine']),
self.assertEqual(set(['meter.test', 'meter.mine', 'meter.test.new']),
set(r['name'] for r in data))
self.assertEqual(set(['test_source', 'test_source1']),
set(r['source'] for r in data))
@ -187,7 +203,7 @@ class TestListMeters(FunctionalTest,
def test_list_samples(self):
data = self.get_json('/samples')
self.assertEqual(5, len(data))
self.assertEqual(6, len(data))
def test_query_samples_with_invalid_field_name_and_non_eq_operator(self):
resp = self.get_json('/samples',
@ -459,7 +475,7 @@ class TestListMeters(FunctionalTest,
'value': 'resource-id',
}])
nids = set(r['name'] for r in data)
self.assertEqual(set(['meter.test']), nids)
self.assertEqual(set(['meter.test', 'meter.test.new']), nids)
sids = set(r['source'] for r in data)
self.assertEqual(set(['test_source']), sids)
@ -540,7 +556,8 @@ class TestListMeters(FunctionalTest,
self.assertEqual(set(['user-id']), uids)
nids = set(r['name'] for r in data)
self.assertEqual(set(['meter.mine', 'meter.test']), nids)
self.assertEqual(set(['meter.mine', 'meter.test', 'meter.test.new']),
nids)
rids = set(r['resource_id'] for r in data)
self.assertEqual(set(['resource-id', 'resource-id2']), rids)