ceph: store measures in omap values

ceph sucks at handling small objects, and POSTs to the ceph incoming
driver always create small objects.

this changes the logic to store new measures as omap values instead of
separate objects. omap entries live in the OSD key/value store
(leveldb/rocksdb), so writes effectively go to 'memory' instead of disk.
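
the core write path looks roughly like the following minimal
python-rados sketch; the pool name and helper function are
illustrative, not the driver's actual code, and payloads are
assumed to be bytes:

    import rados

    cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
    cluster.connect()
    ioctx = cluster.open_ioctx("gnocchi")  # illustrative pool name

    def store_measures(ioctx, sack_name, names, payloads):
        # one write op sets all omap entries on the sack object at once;
        # the values land in the OSD key/value store (leveldb/rocksdb)
        # instead of becoming individual objects on disk
        with rados.WriteOpCtx() as op:
            ioctx.set_omap(op, tuple(names), tuple(payloads))
            ioctx.operate_write_op(op, sack_name)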

this does not change the durability guarantees: we already store the
object keys in omap, so if omap fails we lose the link to the measures
regardless of whether they live on disk or not.

using a local 20-OSD ceph cluster with 18 metricd workers, this is:
- ~2x faster than aio_write patch to POST
- ~2x faster than aio_write patch to process
- ~3x faster than no aio_write patch to POST
- ~3x faster than no aio_write patch to process
- ~3.5x faster than 3.1 to POST
- considerably faster than 3.1 to process (no idea why)

Change-Id: I4bae365955fdbafe4ad837596490774c42bc5251
gord chung 2017-05-15 21:25:36 +00:00 committed by gordon chung
parent 63fa106d78
commit 3332206969
1 changed file with 31 additions and 65 deletions


@@ -14,7 +14,6 @@
 from collections import defaultdict
 import contextlib
 import datetime
-import functools
 import json
 import uuid
@@ -78,8 +77,7 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
         pass
 
     def add_measures_batch(self, metrics_and_measures):
-        names_by_sack = defaultdict(list)
-        ops = []
+        data_by_sack = defaultdict(lambda: defaultdict(list))
         for metric, measures in six.iteritems(metrics_and_measures):
             name = "_".join((
                 self.MEASURE_PREFIX,
@@ -87,15 +85,12 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
                 str(uuid.uuid4()),
                 datetime.datetime.utcnow().strftime("%Y%m%d_%H:%M:%S")))
             sack = self.get_sack_name(self.sack_for_metric(metric.id))
-            names_by_sack[sack].append(name)
-            data = self._encode_measures(measures)
-            ops.append(self.ioctx.aio_write_full(name, data))
-        while ops:
-            op = ops.pop()
-            op.wait_for_complete()
+            data_by_sack[sack]['names'].append(name)
+            data_by_sack[sack]['measures'].append(
+                self._encode_measures(measures))
 
         ops = []
-        for sack, names in names_by_sack.items():
+        for sack, data in data_by_sack.items():
             with rados.WriteOpCtx() as op:
                 # NOTE(sileht): list all objects in a pool is too slow with
                 # many objects (2min for 20000 objects in 50osds cluster),
@@ -103,7 +98,8 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
                 # So we create an object MEASURE_PREFIX, that have as
                 # omap the list of objects to process (not xattr because
                 # it doesn't allow to configure the locking behavior)
-                self.ioctx.set_omap(op, tuple(names), (b"",) * len(names))
+                self.ioctx.set_omap(op, tuple(data['names']),
+                                    tuple(data['measures']))
                 ops.append(self.ioctx.operate_aio_write_op(
                     op, sack, flags=self.OMAP_WRITE_FLAGS))
         while ops:
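
the hunk above batches one asynchronous write op per sack and then
drains the completions. a hedged sketch of that pattern with plain
python-rados (names assumed, write flags omitted):

    def flush_sacks(ioctx, data_by_sack):
        # queue one asynchronous write op per sack...
        completions = []
        for sack, data in data_by_sack.items():
            with rados.WriteOpCtx() as op:
                ioctx.set_omap(op, tuple(data['names']),
                               tuple(data['measures']))
                completions.append(ioctx.operate_aio_write_op(op, sack))
        # ...then wait on all of them, so the sacks are written in parallel
        while completions:
            completions.pop().wait_for_complete()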
@@ -117,7 +113,7 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
         for i in six.moves.range(self.NUM_SACKS):
             marker = ""
             while True:
-                names = list(self._list_object_names_to_process(
+                names = list(self._list_keys_to_process(
                     i, marker=marker, limit=self.Q_LIMIT))
                 if names and names[0] < marker:
                     raise _carbonara.ReportGenerationError("Unable to cleanly "
@@ -135,8 +131,7 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
         return len(metrics), count, metric_details if details else None
 
-    def _list_object_names_to_process(self, sack, prefix="", marker="",
-                                      limit=-1):
+    def _list_keys_to_process(self, sack, prefix="", marker="", limit=-1):
         with rados.ReadOpCtx() as op:
             omaps, ret = self.ioctx.get_omap_vals(op, marker, prefix, limit)
             try:
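
_list_keys_to_process pages through the sack's omap using a
start-after marker, a key prefix filter, and a max count. roughly
(a sketch assuming an ioctx and sack object name, without the
driver's exact error handling):

    def list_keys(ioctx, sack_name, prefix="", marker="", limit=-1):
        with rados.ReadOpCtx() as op:
            # start after `marker`, keep only keys beginning with
            # `prefix`, return at most `limit` entries (-1 = no limit)
            omaps, ret = ioctx.get_omap_vals(op, marker, prefix, limit)
            ioctx.operate_read_op(op, sack_name)
            # the iterator is only usable once the read op has executed
            return [k for k, _v in omaps]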
@@ -162,7 +157,7 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
         names = set()
         marker = ""
         while True:
-            obj_names = list(self._list_object_names_to_process(
+            obj_names = list(self._list_keys_to_process(
                 sack, marker=marker, limit=self.Q_LIMIT))
             names.update(name.split("_")[1] for name in obj_names)
             if len(obj_names) < self.Q_LIMIT:
@@ -173,87 +168,58 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
     def delete_unprocessed_measures_for_metric_id(self, metric_id):
         sack = self.sack_for_metric(metric_id)
-        object_prefix = self.MEASURE_PREFIX + "_" + str(metric_id)
-        object_names = tuple(self._list_object_names_to_process(
-            sack, object_prefix))
+        key_prefix = self.MEASURE_PREFIX + "_" + str(metric_id)
+        keys = tuple(self._list_keys_to_process(sack, key_prefix))
 
-        if not object_names:
+        if not keys:
             return
 
-        for op in list(map(self.ioctx.aio_remove, object_names)):
-            op.wait_for_complete_and_cb()
-
         # Now clean objects and omap
         with rados.WriteOpCtx() as op:
             # NOTE(sileht): come on Ceph, no return code
             # for this operation ?!!
-            self.ioctx.remove_omap_keys(op, object_names)
+            self.ioctx.remove_omap_keys(op, keys)
             self.ioctx.operate_write_op(op, self.get_sack_name(sack),
                                         flags=self.OMAP_WRITE_FLAGS)
 
     def has_unprocessed(self, metric):
         sack = self.sack_for_metric(metric.id)
         object_prefix = self.MEASURE_PREFIX + "_" + str(metric.id)
-        return bool(self._list_object_names_to_process(sack, object_prefix))
+        return bool(self._list_keys_to_process(sack, object_prefix))
 
     @contextlib.contextmanager
     def process_measure_for_metric(self, metric):
         sack = self.sack_for_metric(metric.id)
-        object_prefix = self.MEASURE_PREFIX + "_" + str(metric.id)
-        object_names = tuple(self._list_object_names_to_process(
-            sack, object_prefix))
+        key_prefix = self.MEASURE_PREFIX + "_" + str(metric.id)
 
         measures = []
-        ops = []
-        bufsize = 8192  # Same sa rados_read one
-        tmp_measures = {}
-
-        def add_to_measures(name, comp, data):
-            # Check that the measure file has not been deleted while still
-            # listed in the OMAP this can happen after a crash
-            ret = comp.get_return_value()
+        processed_keys = []
+        with rados.ReadOpCtx() as op:
+            omaps, ret = self.ioctx.get_omap_vals(op, "", key_prefix, -1)
+            self.ioctx.operate_read_op(op, self.get_sack_name(sack),
+                                       flag=self.OMAP_READ_FLAGS)
+            # NOTE(sileht): after reading the libradospy, I'm
+            # not sure that ret will have the correct value
+            # get_omap_vals transforms the C int to python int
+            # before operate_read_op is called, I dunno if the int
+            # content is copied during this transformation or if
+            # this is a pointer to the C int, I think it's copied...
             try:
                 ceph.errno_to_exception(ret)
             except rados.ObjectNotFound:
                 # Object has been deleted, so this is just a stalled entry
                 # in the OMAP listing, ignore
                 return
 
-            if name in tmp_measures:
-                tmp_measures[name] += data
-            else:
-                tmp_measures[name] = data
-            if len(data) < bufsize:
-                measures.extend(self._unserialize_measures(name,
-                                                           tmp_measures[name]))
-                del tmp_measures[name]
-            else:
-                ops.append(self.ioctx.aio_read(
-                    name, bufsize, len(tmp_measures[name]),
-                    functools.partial(add_to_measures, name)
-                ))
-
-        for name in object_names:
-            ops.append(self.ioctx.aio_read(
-                name, bufsize, 0,
-                functools.partial(add_to_measures, name)
-            ))
-
-        while ops:
-            op = ops.pop()
-            op.wait_for_complete_and_cb()
+            for k, v in omaps:
+                measures.extend(self._unserialize_measures(k, v))
+                processed_keys.append(k)
 
         yield measures
 
-        # First delete all objects
-        for op in list(map(self.ioctx.aio_remove, object_names)):
-            op.wait_for_complete_and_cb()
-
         # Now clean omap
         with rados.WriteOpCtx() as op:
             # NOTE(sileht): come on Ceph, no return code
             # for this operation ?!!
-            self.ioctx.remove_omap_keys(op, object_names)
+            self.ioctx.remove_omap_keys(op, tuple(processed_keys))
            self.ioctx.operate_write_op(op, self.get_sack_name(sack),
                                        flags=self.OMAP_WRITE_FLAGS)
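
taken together, the new processing cycle reads pending measures
straight out of omap, hands them to the caller, then removes the
processed keys. a condensed sketch (unserialize is a hypothetical
stand-in for the driver's _unserialize_measures):

    def process_sack(ioctx, sack_name, key_prefix, unserialize):
        measures, processed = [], []
        with rados.ReadOpCtx() as op:
            # read every pending omap entry for this metric prefix
            omaps, ret = ioctx.get_omap_vals(op, "", key_prefix, -1)
            ioctx.operate_read_op(op, sack_name)
            for key, value in omaps:
                measures.extend(unserialize(key, value))
                processed.append(key)
        # ... caller aggregates `measures` here ...
        with rados.WriteOpCtx() as op:
            # drop only the keys actually consumed in this pass
            ioctx.remove_omap_keys(op, tuple(processed))
            ioctx.operate_write_op(op, sack_name)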