storage: Add redis driver

This change adds a redis driver and uses it by default in devstack.
Because reading/writting from/to disks is too slow in our testing
environment.

Change-Id: If617260a9d8e38dc9ba9311c832be333346dd41e
This commit is contained in:
Mehdi Abaakouk 2017-02-27 23:05:35 +01:00
parent cbbcc987eb
commit eb1d64782a
14 changed files with 388 additions and 7 deletions

View File

@ -6,3 +6,4 @@ build-essential [platform:dpkg]
libffi-dev [platform:dpkg]
librados-dev [platform:dpkg]
ceph [platform:dpkg]
redis-server [platform:dpkg]

View File

@ -250,6 +250,9 @@ function configure_gnocchi {
elif [[ "$GNOCCHI_STORAGE_BACKEND" = 'file' ]] ; then
iniset $GNOCCHI_CONF storage driver file
iniset $GNOCCHI_CONF storage file_basepath $GNOCCHI_DATA_DIR/
elif [[ "$GNOCCHI_STORAGE_BACKEND" = 'redis' ]] ; then
iniset $GNOCCHI_CONF storage driver redis
iniset $GNOCCHI_CONF storage redis_url $GNOCCHI_REDIS_URL
else
echo "ERROR: could not configure storage driver"
exit 1
@ -353,7 +356,7 @@ function preinstall_gnocchi {
# install_gnocchi() - Collect source and prepare
function install_gnocchi {
if [ "${GNOCCHI_COORDINATOR_URL%%:*}" == "redis" ]; then
if [[ "$GNOCCHI_STORAGE_BACKEND" = 'redis' ]] || [[ "${GNOCCHI_COORDINATOR_URL%%:*}" == "redis" ]]; then
_gnocchi_install_redis
fi

View File

@ -44,14 +44,17 @@ GNOCCHI_STATSD_RESOURCE_ID=${GNOCCHI_STATSD_RESOURCE_ID:-$(uuidgen)}
GNOCCHI_STATSD_USER_ID=${GNOCCHI_STATSD_USER_ID:-$(uuidgen)}
GNOCCHI_STATSD_PROJECT_ID=${GNOCCHI_STATSD_PROJECT_ID:-$(uuidgen)}
# ceph gnocchi info
# Ceph gnocchi info
GNOCCHI_CEPH_USER=${GNOCCHI_CEPH_USER:-gnocchi}
GNOCCHI_CEPH_POOL=${GNOCCHI_CEPH_POOL:-gnocchi}
GNOCCHI_CEPH_POOL_PG=${GNOCCHI_CEPH_POOL_PG:-8}
GNOCCHI_CEPH_POOL_PGP=${GNOCCHI_CEPH_POOL_PGP:-8}
# Redis gnocchi info
GNOCCHI_REDIS_URL=${GNOCCHI_REDIS_URL:-redis://localhost:6379}
# Gnocchi backend
GNOCCHI_STORAGE_BACKEND=${GNOCCHI_STORAGE_BACKEND:-file}
GNOCCHI_STORAGE_BACKEND=${GNOCCHI_STORAGE_BACKEND:-redis}
# Grafana settings
GRAFANA_RPM_PKG=${GRAFANA_RPM_PKG:-https://grafanarel.s3.amazonaws.com/builds/grafana-3.0.4-1464167696.x86_64.rpm}

View File

@ -44,6 +44,7 @@ Gnocchi currently offers different storage drivers:
* `Ceph`_ (preferred)
* `OpenStack Swift`_
* `S3`_
* `Redis`_
The drivers are based on an intermediate library, named *Carbonara*, which
handles the time series manipulation, since none of these storage technologies
@ -63,6 +64,7 @@ the recommended driver.
.. _OpenStack Swift: http://docs.openstack.org/developer/swift/
.. _Ceph: https://ceph.com
.. _`S3`: https://aws.amazon.com/s3/
.. _`Redis`: https://redis.io
Available index back-ends
~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -31,6 +31,7 @@ The list of variants available is:
* ceph_recommended_lib provides Ceph (>=0.80) storage support
* ceph_alternative_lib provides Ceph (>=10.1.0) storage support
* file provides file driver support
* redis provides Redis storage support
* doc documentation building support
* test unit and functional tests support
@ -117,6 +118,9 @@ options you want to change and configure:
| storage.s3_* | Configuration options to access S3 |
| | if you use the S3 storage driver. |
+---------------------+---------------------------------------------------+
| storage.redis_* | Configuration options to access Redis |
| | if you use the Redis storage driver. |
+---------------------+---------------------------------------------------+
Configuring authentication
-----------------------------

View File

@ -26,6 +26,7 @@ import gnocchi.indexer
import gnocchi.storage
import gnocchi.storage.ceph
import gnocchi.storage.file
import gnocchi.storage.redis
import gnocchi.storage.s3
import gnocchi.storage.swift
@ -48,6 +49,7 @@ _STORAGE_OPTS = list(itertools.chain(gnocchi.storage.OPTS,
gnocchi.storage.ceph.OPTS,
gnocchi.storage.file.OPTS,
gnocchi.storage.swift.OPTS,
gnocchi.storage.redis.OPTS,
gnocchi.storage.s3.OPTS))

View File

@ -0,0 +1,127 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from six.moves.urllib import parse
from oslo_utils import strutils
try:
import redis
from redis import sentinel
except ImportError:
redis = None
sentinel = None
CLIENT_ARGS = frozenset([
'db',
'encoding',
'retry_on_timeout',
'socket_keepalive',
'socket_timeout',
'ssl',
'ssl_certfile',
'ssl_keyfile',
'sentinel',
'sentinel_fallback',
])
"""
Keys that we allow to proxy from the coordinator configuration into the
redis client (used to configure the redis client internals so that
it works as you expect/want it to).
See: http://redis-py.readthedocs.org/en/latest/#redis.Redis
See: https://github.com/andymccurdy/redis-py/blob/2.10.3/redis/client.py
"""
#: Client arguments that are expected/allowed to be lists.
CLIENT_LIST_ARGS = frozenset([
'sentinel_fallback',
])
#: Client arguments that are expected to be boolean convertible.
CLIENT_BOOL_ARGS = frozenset([
'retry_on_timeout',
'ssl',
])
#: Client arguments that are expected to be int convertible.
CLIENT_INT_ARGS = frozenset([
'db',
'socket_keepalive',
'socket_timeout',
])
#: Default socket timeout to use when none is provided.
CLIENT_DEFAULT_SOCKET_TO = 30
def get_client(conf):
if redis is None:
raise RuntimeError("python-redis unavailable")
parsed_url = parse.urlparse(conf.redis_url)
options = parse.parse_qs(parsed_url.query)
kwargs = {}
if parsed_url.hostname:
kwargs['host'] = parsed_url.hostname
if parsed_url.port:
kwargs['port'] = parsed_url.port
else:
if not parsed_url.path:
raise ValueError("Expected socket path in parsed urls path")
kwargs['unix_socket_path'] = parsed_url.path
if parsed_url.password:
kwargs['password'] = parsed_url.password
for a in CLIENT_ARGS:
if a not in options:
continue
if a in CLIENT_BOOL_ARGS:
v = strutils.bool_from_string(options[a][-1])
elif a in CLIENT_LIST_ARGS:
v = options[a][-1]
elif a in CLIENT_INT_ARGS:
v = int(options[a][-1])
else:
v = options[a][-1]
kwargs[a] = v
if 'socket_timeout' not in kwargs:
kwargs['socket_timeout'] = CLIENT_DEFAULT_SOCKET_TO
# Ask the sentinel for the current master if there is a
# sentinel arg.
if 'sentinel' in kwargs:
sentinel_hosts = [
tuple(fallback.split(':'))
for fallback in kwargs.get('sentinel_fallback', [])
]
sentinel_hosts.insert(0, (kwargs['host'], kwargs['port']))
sentinel_server = sentinel.Sentinel(
sentinel_hosts,
socket_timeout=kwargs['socket_timeout'])
sentinel_name = kwargs['sentinel']
del kwargs['sentinel']
if 'sentinel_fallback' in kwargs:
del kwargs['sentinel_fallback']
master_client = sentinel_server.master_for(sentinel_name, **kwargs)
# The master_client is a redis.StrictRedis using a
# Sentinel managed connection pool.
return master_client
return redis.StrictRedis(**kwargs)

View File

@ -0,0 +1,87 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import datetime
import os
import uuid
import six
from gnocchi.storage.common import redis
from gnocchi.storage.incoming import _carbonara
class RedisStorage(_carbonara.CarbonaraBasedStorage):
STORAGE_PREFIX = "incoming"
def __init__(self, conf):
super(RedisStorage, self).__init__(conf)
self._client = redis.get_client(conf)
def _build_measure_path(self, metric_id, random_id=None):
path = os.path.join(self.STORAGE_PREFIX, six.text_type(metric_id))
if random_id:
if random_id is True:
now = datetime.datetime.utcnow().strftime("_%Y%m%d_%H:%M:%S")
random_id = six.text_type(uuid.uuid4()) + now
return os.path.join(path, random_id)
return path
def _store_new_measures(self, metric, data):
path = self._build_measure_path(metric.id, True)
self._client.set(path.encode("utf8"), data)
def _build_report(self, details):
match = os.path.join(self.STORAGE_PREFIX, "*")
metric_details = {}
for key in self._client.scan_iter(match=match.encode('utf8')):
metric = key.decode('utf8').split(os.path.sep)[1]
count = metric_details.setdefault(metric, 0)
count += 1
return (len(metric_details.keys()), sum(metric_details.values()),
metric_details if details else None)
def list_metric_with_measures_to_process(self, size, part, full=False):
match = os.path.join(self.STORAGE_PREFIX, "*")
keys = self._client.scan_iter(match=match.encode('utf8'))
measures = set([k.decode('utf8').split(os.path.sep)[1] for k in keys])
if full:
return measures
return set(list(measures)[size * part:size * (part + 1)])
def _list_measures_container_for_metric_id(self, metric_id):
match = os.path.join(self._build_measure_path(metric_id), "*")
return list(self._client.scan_iter(match=match.encode("utf8")))
def delete_unprocessed_measures_for_metric_id(self, metric_id):
keys = self._list_measures_container_for_metric_id(metric_id)
if keys:
self._client.delete(*keys)
@contextlib.contextmanager
def process_measure_for_metric(self, metric):
keys = self._list_measures_container_for_metric_id(metric.id)
measures = []
for k in keys:
data = self._client.get(k)
sp_key = k.decode('utf8').split("/")[-1]
measures.extend(self._unserialize_measures(sp_key, data))
yield measures
if keys:
self._client.delete(*keys)

129
gnocchi/storage/redis.py Normal file
View File

@ -0,0 +1,129 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_config import cfg
from gnocchi import storage
from gnocchi.storage import _carbonara
from gnocchi.storage.common import redis
OPTS = [
cfg.StrOpt('redis_url',
default='redis://localhost:6379/',
help='Redis URL'),
]
class RedisStorage(_carbonara.CarbonaraBasedStorage):
WRITE_FULL = True
STORAGE_PREFIX = "timeseries"
def __init__(self, conf, incoming):
super(RedisStorage, self).__init__(conf, incoming)
self._client = redis.get_client(conf)
def _build_metric_dir(self, metric):
return os.path.join(self.STORAGE_PREFIX, str(metric.id))
def _build_unaggregated_timeserie_path(self, metric, version=3):
return os.path.join(
self._build_metric_dir(metric),
'none' + ("_v%s" % version if version else ""))
def _build_metric_path(self, metric, aggregation):
return os.path.join(self._build_metric_dir(metric),
"agg_" + aggregation)
def _build_metric_path_for_split(self, metric, aggregation,
timestamp_key, granularity, version=3):
path = os.path.join(self._build_metric_path(metric, aggregation),
timestamp_key + "_" + str(granularity))
return path + '_v%s' % version if version else path
def _create_metric(self, metric):
path = self._build_metric_dir(metric)
ret = self._client.set(path.encode("utf-8"), "created", nx=True)
if ret is None:
raise storage.MetricAlreadyExists(metric)
def _store_unaggregated_timeserie(self, metric, data, version=3):
path = self._build_unaggregated_timeserie_path(metric, version)
self._client.set(path.encode("utf8"), data)
def _get_unaggregated_timeserie(self, metric, version=3):
path = self._build_unaggregated_timeserie_path(metric, version)
data = self._client.get(path.encode("utf8"))
if data is None:
raise storage.MetricDoesNotExist(metric)
return data
def _delete_unaggregated_timeserie(self, metric, version=3):
path = self._build_unaggregated_timeserie_path(metric, version)
data = self._client.get(path.encode("utf8"))
if data is None:
raise storage.MetricDoesNotExist(metric)
self._client.delete(path.encode("utf8"))
def _list_split_keys_for_metric(self, metric, aggregation, granularity,
version=None):
path = self._build_metric_dir(metric)
if self._client.get(path.encode("utf8")) is None:
raise storage.MetricDoesNotExist(metric)
match = os.path.join(self._build_metric_path(metric, aggregation),
"*")
split_keys = set()
for key in self._client.scan_iter(match=match.encode("utf8")):
key = key.decode("utf8")
key = key.split(os.path.sep)[-1]
meta = key.split("_")
if meta[1] == str(granularity) and self._version_check(key,
version):
split_keys.add(meta[0])
return split_keys
def _delete_metric_measures(self, metric, timestamp_key, aggregation,
granularity, version=3):
path = self._build_metric_path_for_split(
metric, aggregation, timestamp_key, granularity, version)
self._client.delete(path.encode("utf8"))
def _store_metric_measures(self, metric, timestamp_key, aggregation,
granularity, data, offset=None, version=3):
path = self._build_metric_path_for_split(metric, aggregation,
timestamp_key, granularity,
version)
self._client.set(path.encode("utf8"), data)
def _delete_metric(self, metric):
path = self._build_metric_dir(metric)
self._client.delete(path.encode("utf8"))
# Carbonara API
def _get_measures(self, metric, timestamp_key, aggregation, granularity,
version=3):
path = self._build_metric_path_for_split(
metric, aggregation, timestamp_key, granularity, version)
data = self._client.get(path.encode("utf8"))
if data is None:
fpath = self._build_metric_dir(metric)
if self._client.get(fpath.encode("utf8")) is None:
raise storage.MetricDoesNotExist(metric)
raise storage.AggregationDoesNotExist(metric, aggregation)
return data

View File

@ -17,6 +17,7 @@ import functools
import json
import os
import subprocess
import threading
import uuid
import fixtures
@ -178,6 +179,9 @@ class FakeSwiftClient(object):
@six.add_metaclass(SkipNotImplementedMeta)
class TestCase(base.BaseTestCase):
REDIS_DB_INDEX = 0
REDIS_DB_LOCK = threading.Lock()
ARCHIVE_POLICIES = {
'no_granularity_match': archive_policy.ArchivePolicy(
"no_granularity_match",
@ -310,6 +314,11 @@ class TestCase(base.BaseTestCase):
"storage")
self.storage = storage.get_driver(self.conf)
if self.conf.storage.driver == 'redis':
# Create one prefix per test
self.storage.STORAGE_PREFIX = str(uuid.uuid4())
self.storage.incoming.STORAGE_PREFIX = str(uuid.uuid4())
# NOTE(jd) Do not upgrade the storage. We don't really need the storage
# upgrade for now, and the code that upgrade from pre-1.3
# (TimeSerieArchive) uses a lot of parallel lock, which makes tooz

View File

@ -0,0 +1,5 @@
---
features:
- |
A Redis driver has been introduced for storing incoming measures and
computed timeseries.

View File

@ -8,8 +8,8 @@ do
for indexer in ${GNOCCHI_TEST_INDEXER_DRIVERS}
do
case $GNOCCHI_TEST_STORAGE_DRIVER in
ceph)
pifpaf run ceph -- pifpaf -g GNOCCHI_INDEXER_URL run $indexer -- ./tools/pretty_tox.sh $*
ceph|redis)
pifpaf run $GNOCCHI_TEST_STORAGE_DRIVER -- pifpaf -g GNOCCHI_INDEXER_URL run $indexer -- ./tools/pretty_tox.sh $*
;;
s3)
if ! which s3rver >/dev/null 2>&1

View File

@ -40,6 +40,11 @@ s3 =
msgpack-python
lz4
tooz>=1.38
redis =
redis>=2.10.0 # MIT
msgpack-python
lz4
tooz>=1.38
swift =
python-swiftclient>=3.1.0
msgpack-python
@ -106,12 +111,14 @@ gnocchi.storage =
ceph = gnocchi.storage.ceph:CephStorage
file = gnocchi.storage.file:FileStorage
s3 = gnocchi.storage.s3:S3Storage
redis = gnocchi.storage.redis:RedisStorage
gnocchi.incoming =
ceph = gnocchi.storage.incoming.ceph:CephStorage
file = gnocchi.storage.incoming.file:FileStorage
swift = gnocchi.storage.incoming.swift:SwiftStorage
s3 = gnocchi.storage.incoming.s3:S3Storage
redis = gnocchi.storage.incoming.redis:RedisStorage
gnocchi.indexer =
mysql = gnocchi.indexer.sqlalchemy:SQLAlchemyIndexer

View File

@ -9,19 +9,21 @@ passenv = LANG OS_DEBUG OS_TEST_TIMEOUT OS_STDOUT_CAPTURE OS_STDERR_CAPTURE OS_L
setenv =
GNOCCHI_TEST_STORAGE_DRIVER=file
GNOCCHI_TEST_INDEXER_DRIVER=postgresql
GNOCCHI_TEST_STORAGE_DRIVERS=file swift ceph s3
GNOCCHI_TEST_STORAGE_DRIVERS=file swift ceph s3 redis
GNOCCHI_TEST_INDEXER_DRIVERS=postgresql mysql
file: GNOCCHI_TEST_STORAGE_DRIVERS=file
swift: GNOCCHI_TEST_STORAGE_DRIVERS=swift
ceph: GNOCCHI_TEST_STORAGE_DRIVERS=ceph
redis: GNOCCHI_TEST_STORAGE_DRIVERS=redis
s3: GNOCCHI_TEST_STORAGE_DRIVERS=s3
postgresql: GNOCCHI_TEST_INDEXER_DRIVERS=postgresql
mysql: GNOCCHI_TEST_INDEXER_DRIVERS=mysql
GNOCCHI_STORAGE_DEPS=file,swift,s3,ceph,ceph_recommended_lib
GNOCCHI_STORAGE_DEPS=file,swift,s3,ceph,ceph_recommended_lib,redis
ceph: GNOCCHI_STORAGE_DEPS=ceph,ceph_recommended_lib
swift: GNOCCHI_STORAGE_DEPS=swift
file: GNOCCHI_STORAGE_DEPS=file
redis: GNOCCHI_STORAGE_DEPS=redis
s3: GNOCCHI_STORAGE_DEPS=s3
deps = .[test]
postgresql: .[postgresql,{env:GNOCCHI_STORAGE_DEPS}]