Merge "ceph: async write new measures"
This commit is contained in:
commit
121e5cc3a7
|
@ -79,6 +79,7 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
|
|||
|
||||
def add_measures_batch(self, metrics_and_measures):
|
||||
names_by_sack = defaultdict(list)
|
||||
ops = []
|
||||
for metric, measures in six.iteritems(metrics_and_measures):
|
||||
name = "_".join((
|
||||
self.MEASURE_PREFIX,
|
||||
|
@ -88,8 +89,12 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
|
|||
sack = self.get_sack_name(self.sack_for_metric(metric.id))
|
||||
names_by_sack[sack].append(name)
|
||||
data = self._encode_measures(measures)
|
||||
self.ioctx.write_full(name, data)
|
||||
ops.append(self.ioctx.aio_write_full(name, data))
|
||||
while ops:
|
||||
op = ops.pop()
|
||||
op.wait_for_complete()
|
||||
|
||||
ops = []
|
||||
for sack, names in names_by_sack.items():
|
||||
with rados.WriteOpCtx() as op:
|
||||
# NOTE(sileht): list all objects in a pool is too slow with
|
||||
|
@ -99,8 +104,11 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
|
|||
# omap the list of objects to process (not xattr because
|
||||
# it doesn't # allow to configure the locking behavior)
|
||||
self.ioctx.set_omap(op, tuple(names), (b"",) * len(names))
|
||||
self.ioctx.operate_write_op(op, sack,
|
||||
flags=self.OMAP_WRITE_FLAGS)
|
||||
ops.append(self.ioctx.operate_aio_write_op(
|
||||
op, sack, flags=self.OMAP_WRITE_FLAGS))
|
||||
while ops:
|
||||
op = ops.pop()
|
||||
op.wait_for_complete()
|
||||
|
||||
def _build_report(self, details):
|
||||
metrics = set()
|
||||
|
|
Loading…
Reference in New Issue