Merge "configurable sacks framework"

This commit is contained in:
Jenkins 2017-05-18 19:03:23 +00:00 committed by Gerrit Code Review
commit 735c59bd69
12 changed files with 148 additions and 24 deletions

View File

@ -163,6 +163,35 @@ process is continuously increasing, you will need to (maybe temporarily)
increase the number of `gnocchi-metricd` daemons. You can run any number of
metricd daemon on any number of servers.
How to scale measure processing
===============================
Measurement data pushed to Gnocchi is divided into sacks for better
distribution. Incoming metrics are pushed to specific sacks and each sack
is assigned to one or more `gnocchi-metricd` daemons for processing.
How many sacks do we need to create
-----------------------------------
The number of sacks enabled should be set based on the number of active
metrics the system will capture. Additionally, the number of sacks should
be higher than the total number of active `gnocchi-metricd` workers.
In general, use the following equation to determine the appropriate `sacks`
value to set:
.. math::

   sacks = \frac{\text{number of active metrics}}{300}
If the estimated number of metrics is the absolute maximum, divide the value
by 500 instead. If the estimated number of active metrics is conservative and
expected to grow, divide the value by 100 instead to accommodate growth.
How to monitor Gnocchi
======================

View File

@ -53,6 +53,9 @@ def upgrade():
help="Skip storage upgrade."),
cfg.BoolOpt("skip-archive-policies-creation", default=False,
help="Skip default archive policies creation."),
cfg.IntOpt("num-storage-sacks", default=128,
help="Initial number of storage sacks to create."),
])
conf = service.prepare_service(conf=conf)
index = indexer.get_driver(conf)
@ -63,7 +66,7 @@ def upgrade():
if not conf.skip_storage:
s = storage.get_driver(conf)
LOG.info("Upgrading storage %s", s)
s.upgrade(index)
s.upgrade(index, conf.num_storage_sacks)
if (not conf.skip_archive_policies_creation
and not index.list_archive_policies()

View File

@ -162,8 +162,8 @@ class StorageDriver(object):
def stop():
pass
def upgrade(self, index, num_sacks):
    """Upgrade the storage driver.

    Delegates to the incoming-measure driver so it can create its
    ``num_sacks`` sack partitions.

    :param index: the indexer driver
    :param num_sacks: number of incoming sacks to create
    """
    self.incoming.upgrade(index, num_sacks)
def process_background_tasks(self, index, metrics, sync=False):
"""Process background tasks for this storage.

View File

@ -32,11 +32,40 @@ _NUM_WORKERS = utils.get_default_workers()
class CarbonaraBasedStorage(incoming.StorageDriver):
MEASURE_PREFIX = "measure"
SACK_PREFIX = "incoming-%s"
SACK_PREFIX = "incoming"
CFG_PREFIX = 'gnocchi-config'
CFG_SACKS = 'sacks'
_MEASURE_SERIAL_FORMAT = "Qd"
_MEASURE_SERIAL_LEN = struct.calcsize(_MEASURE_SERIAL_FORMAT)
NUM_SACKS = 8
@property
def NUM_SACKS(self):
    """Number of sacks configured in storage.

    Read lazily from the storage backend via ``get_storage_sacks()`` and
    cached on the instance; raises (after logging) if the sack count was
    never initialised (i.e. gnocchi-upgrade was not run).
    """
    if not hasattr(self, '_num_sacks'):
        try:
            # backends may return str/bytes; normalise to int
            self._num_sacks = int(self.get_storage_sacks())
        except Exception as e:
            LOG.error('Unable to detect the number of storage sacks. '
                      'Ensure gnocchi-upgrade has been executed: %s', e)
            raise
    return self._num_sacks
def get_sack_prefix(self, num_sacks=None):
    """Return the sack name template, e.g. ``incoming8-%s``.

    :param num_sacks: sack count to embed in the prefix; defaults to the
                      configured ``NUM_SACKS`` when not given.
    """
    sacks = num_sacks if num_sacks else self.NUM_SACKS
    return self.SACK_PREFIX + str(sacks) + '-%s'
def upgrade(self, index, num_sacks):
    """Upgrade incoming storage, initialising the sack count once.

    The sack count is only written the first time; an already-upgraded
    storage keeps its existing value.

    :param index: the indexer driver
    :param num_sacks: number of sacks to create on first upgrade
    """
    super(CarbonaraBasedStorage, self).upgrade(index)
    if not self.get_storage_sacks():
        self.set_storage_settings(num_sacks)
def get_storage_sacks(self):
    """Return the number of sacks in storage. None if not set."""
    # NOTE(review): declared as instance methods (not @staticmethod) so the
    # base signatures match every driver override, which all take ``self``.
    raise NotImplementedError

def set_storage_settings(self, num_sacks):
    """Persist the number of sacks to use in storage."""
    raise NotImplementedError
def _unserialize_measures(self, measure_id, data):
nb_measures = len(data) // self._MEASURE_SERIAL_LEN
@ -93,4 +122,4 @@ class CarbonaraBasedStorage(incoming.StorageDriver):
return metric_id.int % self.NUM_SACKS
def get_sack_name(self, sack):
    """Return the storage name (container/path) for a sack index."""
    return self.get_sack_prefix() % sack

View File

@ -15,6 +15,7 @@ from collections import defaultdict
import contextlib
import datetime
import functools
import json
import uuid
import six
@ -57,6 +58,17 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
ceph.close_rados_connection(self.rados, self.ioctx)
super(CephStorage, self).stop()
def get_storage_sacks(self):
    """Return the number of sacks in storage. None if not set."""
    try:
        # the sack count is stored as JSON in a dedicated rados object
        return json.loads(
            self.ioctx.read(self.CFG_PREFIX).decode())[self.CFG_SACKS]
    except rados.ObjectNotFound:
        return None
def set_storage_settings(self, num_sacks):
    """Persist the sack count as JSON in a dedicated rados object."""
    self.ioctx.write_full(self.CFG_PREFIX,
                          json.dumps({self.CFG_SACKS: num_sacks}).encode())
def add_measures_batch(self, metrics_and_measures):
names_by_sack = defaultdict(list)
for metric, measures in six.iteritems(metrics_and_measures):

View File

@ -14,6 +14,7 @@
import contextlib
import datetime
import errno
import json
import os
import tempfile
import uuid
@ -30,11 +31,26 @@ class FileStorage(_carbonara.CarbonaraBasedStorage):
self.basepath = conf.file_basepath
self.basepath_tmp = os.path.join(self.basepath, 'tmp')
def upgrade(self, index, num_sacks):
    """Upgrade file storage, ensuring the tmp directory exists.

    :param index: the indexer driver
    :param num_sacks: number of sacks to create on first upgrade
    """
    super(FileStorage, self).upgrade(index, num_sacks)
    utils.ensure_paths([self.basepath_tmp])
def get_storage_sacks(self):
    """Return the number of sacks in storage. None if not set."""
    cfg_path = os.path.join(self.basepath_tmp, self.CFG_PREFIX)
    try:
        with open(cfg_path, 'r') as f:
            return json.load(f)[self.CFG_SACKS]
    except IOError as e:
        # a missing config file just means the storage was never upgraded
        if e.errno == errno.ENOENT:
            return None
        raise
def set_storage_settings(self, num_sacks):
    """Persist the sack count and create one directory per sack."""
    data = {self.CFG_SACKS: num_sacks}
    with open(os.path.join(self.basepath_tmp, self.CFG_PREFIX), 'w') as f:
        json.dump(data, f)
    # create the incoming directory for every sack up front
    utils.ensure_paths([self._sack_path(i)
                        for i in six.moves.range(self.NUM_SACKS)])
def _sack_path(self, sack):
return os.path.join(self.basepath, self.get_sack_name(sack))

View File

@ -28,6 +28,12 @@ class RedisStorage(_carbonara.CarbonaraBasedStorage):
super(RedisStorage, self).__init__(conf)
self._client = redis.get_client(conf)
def get_storage_sacks(self):
    """Return the number of sacks in storage. None if not set.

    NOTE(review): redis ``hget`` returns bytes/str (or None); the caller
    (base ``NUM_SACKS`` property) converts it with ``int()``.
    """
    return self._client.hget(self.CFG_PREFIX, self.CFG_SACKS)
def set_storage_settings(self, num_sacks):
    """Persist the sack count in the redis config hash."""
    self._client.hset(self.CFG_PREFIX, self.CFG_SACKS, num_sacks)
def _build_measure_path(self, metric_id):
return redis.SEP.join([
self.get_sack_name(self.sack_for_metric(metric_id)),

View File

@ -16,6 +16,7 @@
from collections import defaultdict
import contextlib
import datetime
import json
import uuid
import six
@ -29,9 +30,6 @@ botocore = s3.botocore
class S3Storage(_carbonara.CarbonaraBasedStorage):
# NOTE(gordc): override to follow s3 partitioning logic
SACK_PREFIX = '%s/'
def __init__(self, conf):
super(S3Storage, self).__init__(conf)
self.s3, self._region_name, self._bucket_prefix = (
@ -42,8 +40,26 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
self._bucket_prefix + "-" + self.MEASURE_PREFIX
)
def upgrade(self, indexer):
super(S3Storage, self).upgrade(indexer)
def get_storage_sacks(self):
    """Return the number of sacks in storage. None if not set."""
    try:
        response = self.s3.get_object(Bucket=self._bucket_name_measures,
                                      Key=self.CFG_PREFIX)
    except botocore.exceptions.ClientError as e:
        if e.response['Error'].get('Code') == "NoSuchKey":
            return None
        # re-raise anything other than a missing config object; swallowing
        # e.g. access errors would be misreported as "not upgraded"
        raise
    return json.loads(response['Body'].read().decode())[self.CFG_SACKS]
def set_storage_settings(self, num_sacks):
    """Persist the sack count as a JSON object in the measure bucket."""
    data = {self.CFG_SACKS: num_sacks}
    self.s3.put_object(Bucket=self._bucket_name_measures,
                       Key=self.CFG_PREFIX,
                       Body=json.dumps(data).encode())
def get_sack_prefix(self, num_sacks=None):
    """Return the sack name template, e.g. ``%s-8/``.

    NOTE(gordc): override to follow s3 partitioning logic — the random
    part leads so S3 distributes keys across partitions.

    :param num_sacks: sack count to embed; defaults to ``NUM_SACKS``.
    """
    return '%s-' + ('%s/' % (num_sacks if num_sacks else self.NUM_SACKS))
def upgrade(self, indexer, num_sacks):
    """Upgrade S3 storage: create the measure bucket, then settings.

    :param indexer: the indexer driver
    :param num_sacks: number of sacks to create on first upgrade
    """
    try:
        s3.create_bucket(self.s3, self._bucket_name_measures,
                         self._region_name)
    # NOTE(review): the except clause fell between diff hunks in the
    # rendering; reconstructed from the codes tested below — confirm.
    except botocore.exceptions.ClientError as e:
        if e.response['Error'].get('Code') not in (
                "BucketAlreadyExists", "BucketAlreadyOwnedByYou"
        ):
            raise
    # need to create bucket first to store storage settings object
    super(S3Storage, self).upgrade(indexer, num_sacks)
def _store_new_measures(self, metric, data):
now = datetime.datetime.utcnow().strftime("_%Y%m%d_%H:%M:%S")
@ -77,8 +95,9 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
**kwargs)
# FIXME(gordc): this can be streamlined if not details
for c in response.get('Contents', ()):
__, metric, metric_file = c['Key'].split("/", 2)
metric_details[metric] += 1
if c['Key'] != self.CFG_PREFIX:
__, metric, metric_file = c['Key'].split("/", 2)
metric_details[metric] += 1
return (len(metric_details), sum(metric_details.values()),
metric_details if details else None)

View File

@ -14,6 +14,7 @@
from collections import defaultdict
import contextlib
import datetime
import json
import uuid
import six
@ -30,8 +31,18 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage):
super(SwiftStorage, self).__init__(conf)
self.swift = swift.get_connection(conf)
def upgrade(self, index):
super(SwiftStorage, self).upgrade(index)
def get_storage_sacks(self):
    """Return the number of sacks in storage. None if not set."""
    try:
        __, data = self.swift.get_object(self.CFG_PREFIX, self.CFG_PREFIX)
    except swclient.ClientException as e:
        if e.http_status == 404:
            # config object missing: storage was never upgraded
            return None
        # re-raise other swift errors rather than silently returning None
        raise
    return json.loads(data)[self.CFG_SACKS]
def set_storage_settings(self, num_sacks):
    """Persist the sack count and create one container per sack."""
    self.swift.put_container(self.CFG_PREFIX)
    self.swift.put_object(self.CFG_PREFIX, self.CFG_PREFIX,
                          json.dumps({self.CFG_SACKS: num_sacks}))
    for i in six.moves.range(self.NUM_SACKS):
        self.swift.put_container(self.get_sack_name(i))

View File

@ -75,8 +75,8 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
else:
self._consistency_stop = None
def upgrade(self, index):
super(S3Storage, self).upgrade(index)
def upgrade(self, index, num_sacks):
super(S3Storage, self).upgrade(index, num_sacks)
try:
s3.create_bucket(self.s3, self._bucket_name, self._region_name)
except botocore.exceptions.ClientError as e:

View File

@ -325,10 +325,9 @@ class TestCase(base.BaseTestCase):
if self.conf.storage.driver == 'redis':
# Create one prefix per test
self.storage.STORAGE_PREFIX = str(uuid.uuid4())
self.storage.incoming.SACK_PREFIX = (
str(uuid.uuid4()) + self.storage.incoming.SACK_PREFIX)
self.storage.incoming.SACK_PREFIX = str(uuid.uuid4())
self.storage.upgrade(self.index)
self.storage.upgrade(self.index, 128)
def tearDown(self):
self.index.disconnect()

View File

@ -134,7 +134,7 @@ class ConfigFixture(fixture.GabbiFixture):
self.index = index
s = storage.get_driver(conf)
s.upgrade(index)
s.upgrade(index, 128)
LOAD_APP_KWARGS = {
'storage': s,