configurable sacks framework
this adds framework to configure sack size without corruption. - default to 128 sacks - add note on how to calculate how many sacks to set actual ability to change value done in subsequent patch. Change-Id: I389c1a7ca9b3fe39b3716782e85073796ad26333
This commit is contained in:
parent
b21b878f36
commit
1bacea9416
|
@ -133,6 +133,35 @@ process is continuously increasing, you will need to (maybe temporarily)
|
|||
increase the number of `gnocchi-metricd` daemons. You can run any number of
|
||||
metricd daemon on any number of servers.
|
||||
|
||||
How to scale measure processing
|
||||
===============================
|
||||
|
||||
Measurement data pushed to Gnocchi is divided into sacks for better
|
||||
distribution. The number of partitions is controlled by the `sacks` option
|
||||
under the `[incoming]` section. This value should be set based on the
|
||||
number of active metrics the system will capture. Additionally, the number of
|
||||
`sacks`, should be higher than the total number of active metricd workers.
|
||||
distribution. Incoming metrics are pushed to specific sacks and each sack
|
||||
is assigned to one or more `gnocchi-metricd` daemons for processing.
|
||||
|
||||
How many sacks do we need to create
|
||||
-----------------------------------
|
||||
|
||||
This number of sacks enabled should be set based on the number of active
|
||||
metrics the system will capture. Additionally, the number of sacks, should
|
||||
be higher than the total number of active `gnocchi-metricd` workers.
|
||||
|
||||
In general, use the following equation to determine the appropriate `sacks`
|
||||
value to set:
|
||||
|
||||
.. math::
|
||||
|
||||
sacks value = number of **active** metrics / 300
|
||||
|
||||
If the estimated number of metrics is the absolute maximum, divide the value
|
||||
by 500 instead. If the estimated number of active metrics is conservative and
|
||||
expected to grow, divide the value by 100 instead to accommodate growth.
|
||||
|
||||
How to monitor Gnocchi
|
||||
======================
|
||||
|
||||
|
|
|
@ -53,6 +53,9 @@ def upgrade():
|
|||
help="Skip storage upgrade."),
|
||||
cfg.BoolOpt("skip-archive-policies-creation", default=False,
|
||||
help="Skip default archive policies creation."),
|
||||
cfg.IntOpt("num-storage-sacks", default=128,
|
||||
help="Initial number of storage sacks to create."),
|
||||
|
||||
])
|
||||
conf = service.prepare_service(conf=conf)
|
||||
index = indexer.get_driver(conf)
|
||||
|
@ -63,7 +66,7 @@ def upgrade():
|
|||
if not conf.skip_storage:
|
||||
s = storage.get_driver(conf)
|
||||
LOG.info("Upgrading storage %s", s)
|
||||
s.upgrade(index)
|
||||
s.upgrade(index, conf.num_storage_sacks)
|
||||
|
||||
if (not conf.skip_archive_policies_creation
|
||||
and not index.list_archive_policies()
|
||||
|
|
|
@ -162,8 +162,8 @@ class StorageDriver(object):
|
|||
def stop():
|
||||
pass
|
||||
|
||||
def upgrade(self, index):
|
||||
self.incoming.upgrade(index)
|
||||
def upgrade(self, index, num_sacks):
|
||||
self.incoming.upgrade(index, num_sacks)
|
||||
|
||||
def process_background_tasks(self, index, metrics, sync=False):
|
||||
"""Process background tasks for this storage.
|
||||
|
|
|
@ -32,11 +32,40 @@ _NUM_WORKERS = utils.get_default_workers()
|
|||
|
||||
class CarbonaraBasedStorage(incoming.StorageDriver):
|
||||
MEASURE_PREFIX = "measure"
|
||||
SACK_PREFIX = "incoming-%s"
|
||||
SACK_PREFIX = "incoming"
|
||||
CFG_PREFIX = 'gnocchi-config'
|
||||
CFG_SACKS = 'sacks'
|
||||
_MEASURE_SERIAL_FORMAT = "Qd"
|
||||
_MEASURE_SERIAL_LEN = struct.calcsize(_MEASURE_SERIAL_FORMAT)
|
||||
|
||||
NUM_SACKS = 8
|
||||
@property
|
||||
def NUM_SACKS(self):
|
||||
if not hasattr(self, '_num_sacks'):
|
||||
try:
|
||||
self._num_sacks = int(self.get_storage_sacks())
|
||||
except Exception as e:
|
||||
LOG.error('Unable to detect the number of storage sacks. '
|
||||
'Ensure gnocchi-upgrade has been executed: %s', e)
|
||||
raise
|
||||
return self._num_sacks
|
||||
|
||||
def get_sack_prefix(self, num_sacks=None):
|
||||
sacks = num_sacks if num_sacks else self.NUM_SACKS
|
||||
return self.SACK_PREFIX + str(sacks) + '-%s'
|
||||
|
||||
def upgrade(self, index, num_sacks):
|
||||
super(CarbonaraBasedStorage, self).upgrade(index)
|
||||
if not self.get_storage_sacks():
|
||||
self.set_storage_settings(num_sacks)
|
||||
|
||||
@staticmethod
|
||||
def get_storage_sacks():
|
||||
"""Return the number of sacks in storage. None if not set."""
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def set_storage_settings(num_sacks):
|
||||
raise NotImplementedError
|
||||
|
||||
def _unserialize_measures(self, measure_id, data):
|
||||
nb_measures = len(data) // self._MEASURE_SERIAL_LEN
|
||||
|
@ -93,4 +122,4 @@ class CarbonaraBasedStorage(incoming.StorageDriver):
|
|||
return metric_id.int % self.NUM_SACKS
|
||||
|
||||
def get_sack_name(self, sack):
|
||||
return self.SACK_PREFIX % sack
|
||||
return self.get_sack_prefix() % sack
|
||||
|
|
|
@ -15,6 +15,7 @@ from collections import defaultdict
|
|||
import contextlib
|
||||
import datetime
|
||||
import functools
|
||||
import json
|
||||
import uuid
|
||||
|
||||
import six
|
||||
|
@ -57,6 +58,17 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
|
|||
ceph.close_rados_connection(self.rados, self.ioctx)
|
||||
super(CephStorage, self).stop()
|
||||
|
||||
def get_storage_sacks(self):
|
||||
try:
|
||||
return json.loads(
|
||||
self.ioctx.read(self.CFG_PREFIX).decode())[self.CFG_SACKS]
|
||||
except rados.ObjectNotFound:
|
||||
return
|
||||
|
||||
def set_storage_settings(self, num_sacks):
|
||||
self.ioctx.write_full(self.CFG_PREFIX,
|
||||
json.dumps({self.CFG_SACKS: num_sacks}).encode())
|
||||
|
||||
def add_measures_batch(self, metrics_and_measures):
|
||||
names_by_sack = defaultdict(list)
|
||||
for metric, measures in six.iteritems(metrics_and_measures):
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
import contextlib
|
||||
import datetime
|
||||
import errno
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import uuid
|
||||
|
@ -30,11 +31,26 @@ class FileStorage(_carbonara.CarbonaraBasedStorage):
|
|||
self.basepath = conf.file_basepath
|
||||
self.basepath_tmp = os.path.join(self.basepath, 'tmp')
|
||||
|
||||
def upgrade(self, index):
|
||||
super(FileStorage, self).upgrade(index)
|
||||
def upgrade(self, index, num_sacks):
|
||||
super(FileStorage, self).upgrade(index, num_sacks)
|
||||
utils.ensure_paths([self.basepath_tmp])
|
||||
|
||||
def get_storage_sacks(self):
|
||||
try:
|
||||
with open(os.path.join(self.basepath_tmp, self.CFG_PREFIX),
|
||||
'r') as f:
|
||||
return json.load(f)[self.CFG_SACKS]
|
||||
except IOError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
return
|
||||
raise
|
||||
|
||||
def set_storage_settings(self, num_sacks):
|
||||
data = {self.CFG_SACKS: num_sacks}
|
||||
with open(os.path.join(self.basepath_tmp, self.CFG_PREFIX), 'w') as f:
|
||||
json.dump(data, f)
|
||||
utils.ensure_paths([self._sack_path(i)
|
||||
for i in six.moves.range(self.NUM_SACKS)])
|
||||
utils.ensure_paths([self.basepath_tmp])
|
||||
|
||||
def _sack_path(self, sack):
|
||||
return os.path.join(self.basepath, self.get_sack_name(sack))
|
||||
|
|
|
@ -28,6 +28,12 @@ class RedisStorage(_carbonara.CarbonaraBasedStorage):
|
|||
super(RedisStorage, self).__init__(conf)
|
||||
self._client = redis.get_client(conf)
|
||||
|
||||
def get_storage_sacks(self):
|
||||
return self._client.hget(self.CFG_PREFIX, self.CFG_SACKS)
|
||||
|
||||
def set_storage_settings(self, num_sacks):
|
||||
self._client.hset(self.CFG_PREFIX, self.CFG_SACKS, num_sacks)
|
||||
|
||||
def _build_measure_path(self, metric_id):
|
||||
return redis.SEP.join([
|
||||
self.get_sack_name(self.sack_for_metric(metric_id)),
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
from collections import defaultdict
|
||||
import contextlib
|
||||
import datetime
|
||||
import json
|
||||
import uuid
|
||||
|
||||
import six
|
||||
|
@ -29,9 +30,6 @@ botocore = s3.botocore
|
|||
|
||||
class S3Storage(_carbonara.CarbonaraBasedStorage):
|
||||
|
||||
# NOTE(gordc): override to follow s3 partitioning logic
|
||||
SACK_PREFIX = '%s/'
|
||||
|
||||
def __init__(self, conf):
|
||||
super(S3Storage, self).__init__(conf)
|
||||
self.s3, self._region_name, self._bucket_prefix = (
|
||||
|
@ -42,8 +40,26 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
|
|||
self._bucket_prefix + "-" + self.MEASURE_PREFIX
|
||||
)
|
||||
|
||||
def upgrade(self, indexer):
|
||||
super(S3Storage, self).upgrade(indexer)
|
||||
def get_storage_sacks(self):
|
||||
try:
|
||||
response = self.s3.get_object(Bucket=self._bucket_name_measures,
|
||||
Key=self.CFG_PREFIX)
|
||||
return json.loads(response['Body'].read().decode())[self.CFG_SACKS]
|
||||
except botocore.exceptions.ClientError as e:
|
||||
if e.response['Error'].get('Code') == "NoSuchKey":
|
||||
return
|
||||
|
||||
def set_storage_settings(self, num_sacks):
|
||||
data = {self.CFG_SACKS: num_sacks}
|
||||
self.s3.put_object(Bucket=self._bucket_name_measures,
|
||||
Key=self.CFG_PREFIX,
|
||||
Body=json.dumps(data).encode())
|
||||
|
||||
def get_sack_prefix(self, num_sacks=None):
|
||||
# NOTE(gordc): override to follow s3 partitioning logic
|
||||
return '%s-' + ('%s/' % (num_sacks if num_sacks else self.NUM_SACKS))
|
||||
|
||||
def upgrade(self, indexer, num_sacks):
|
||||
try:
|
||||
s3.create_bucket(self.s3, self._bucket_name_measures,
|
||||
self._region_name)
|
||||
|
@ -52,6 +68,8 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
|
|||
"BucketAlreadyExists", "BucketAlreadyOwnedByYou"
|
||||
):
|
||||
raise
|
||||
# need to create bucket first to store storage settings object
|
||||
super(S3Storage, self).upgrade(indexer, num_sacks)
|
||||
|
||||
def _store_new_measures(self, metric, data):
|
||||
now = datetime.datetime.utcnow().strftime("_%Y%m%d_%H:%M:%S")
|
||||
|
@ -77,8 +95,9 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
|
|||
**kwargs)
|
||||
# FIXME(gordc): this can be streamlined if not details
|
||||
for c in response.get('Contents', ()):
|
||||
__, metric, metric_file = c['Key'].split("/", 2)
|
||||
metric_details[metric] += 1
|
||||
if c['Key'] != self.CFG_PREFIX:
|
||||
__, metric, metric_file = c['Key'].split("/", 2)
|
||||
metric_details[metric] += 1
|
||||
return (len(metric_details), sum(metric_details.values()),
|
||||
metric_details if details else None)
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
from collections import defaultdict
|
||||
import contextlib
|
||||
import datetime
|
||||
import json
|
||||
import uuid
|
||||
|
||||
import six
|
||||
|
@ -30,8 +31,18 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage):
|
|||
super(SwiftStorage, self).__init__(conf)
|
||||
self.swift = swift.get_connection(conf)
|
||||
|
||||
def upgrade(self, index):
|
||||
super(SwiftStorage, self).upgrade(index)
|
||||
def get_storage_sacks(self):
|
||||
try:
|
||||
__, data = self.swift.get_object(self.CFG_PREFIX, self.CFG_PREFIX)
|
||||
return json.loads(data)[self.CFG_SACKS]
|
||||
except swclient.ClientException as e:
|
||||
if e.http_status == 404:
|
||||
return
|
||||
|
||||
def set_storage_settings(self, num_sacks):
|
||||
self.swift.put_container(self.CFG_PREFIX)
|
||||
self.swift.put_object(self.CFG_PREFIX, self.CFG_PREFIX,
|
||||
json.dumps({self.CFG_SACKS: num_sacks}))
|
||||
for i in six.moves.range(self.NUM_SACKS):
|
||||
self.swift.put_container(self.get_sack_name(i))
|
||||
|
||||
|
|
|
@ -75,8 +75,8 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
|
|||
else:
|
||||
self._consistency_stop = None
|
||||
|
||||
def upgrade(self, index):
|
||||
super(S3Storage, self).upgrade(index)
|
||||
def upgrade(self, index, num_sacks):
|
||||
super(S3Storage, self).upgrade(index, num_sacks)
|
||||
try:
|
||||
s3.create_bucket(self.s3, self._bucket_name, self._region_name)
|
||||
except botocore.exceptions.ClientError as e:
|
||||
|
|
|
@ -325,10 +325,9 @@ class TestCase(base.BaseTestCase):
|
|||
if self.conf.storage.driver == 'redis':
|
||||
# Create one prefix per test
|
||||
self.storage.STORAGE_PREFIX = str(uuid.uuid4())
|
||||
self.storage.incoming.SACK_PREFIX = (
|
||||
str(uuid.uuid4()) + self.storage.incoming.SACK_PREFIX)
|
||||
self.storage.incoming.SACK_PREFIX = str(uuid.uuid4())
|
||||
|
||||
self.storage.upgrade(self.index)
|
||||
self.storage.upgrade(self.index, 128)
|
||||
|
||||
def tearDown(self):
|
||||
self.index.disconnect()
|
||||
|
|
|
@ -134,7 +134,7 @@ class ConfigFixture(fixture.GabbiFixture):
|
|||
self.index = index
|
||||
|
||||
s = storage.get_driver(conf)
|
||||
s.upgrade(index)
|
||||
s.upgrade(index, 128)
|
||||
|
||||
LOAD_APP_KWARGS = {
|
||||
'storage': s,
|
||||
|
|
Loading…
Reference in New Issue