Merge "s3: use a single bucket for aggregate storage"

This commit is contained in:
Jenkins 2017-04-21 08:43:05 +00:00 committed by Gerrit Code Review
commit 4c46b69067
2 changed files with 55 additions and 44 deletions

View File

@ -59,41 +59,45 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
self.s3, self._region_name, self._bucket_prefix = (
s3.get_connection(conf)
)
self._bucket_name = '%s-aggregates' % self._bucket_prefix
def _bucket_name(self, metric):
return '%s-%s' % (self._bucket_prefix, str(metric.id))
@staticmethod
def _object_name(split_key, aggregation, granularity, version=3):
name = '%s_%s_%s' % (split_key, aggregation, granularity)
return name + '_v%s' % version if version else name
def _create_metric(self, metric):
def upgrade(self, index):
super(S3Storage, self).upgrade(index)
try:
s3.create_bucket(self.s3, self._bucket_name(metric),
self._region_name)
s3.create_bucket(self.s3, self._bucket_name, self._region_name)
except botocore.exceptions.ClientError as e:
if e.response['Error'].get('Code') != "BucketAlreadyExists":
raise
# raise storage.MetricAlreadyExists(metric)
@staticmethod
def _object_name(split_key, aggregation, granularity, version=3):
name = '%s_%s_%s' % (aggregation, granularity, split_key)
return name + '_v%s' % version if version else name
@staticmethod
def _prefix(metric):
return str(metric.id) + '/'
def _create_metric(self, metric):
pass
def _store_metric_measures(self, metric, timestamp_key, aggregation,
granularity, data, offset=0, version=3):
self.s3.put_object(
Bucket=self._bucket_name(metric),
Key=self._object_name(
Bucket=self._bucket_name,
Key=self._prefix(metric) + self._object_name(
timestamp_key, aggregation, granularity, version),
Body=data)
def _delete_metric_measures(self, metric, timestamp_key, aggregation,
granularity, version=3):
self.s3.delete_object(
Bucket=self._bucket_name(metric),
Key=self._object_name(
Bucket=self._bucket_name,
Key=self._prefix(metric) + self._object_name(
timestamp_key, aggregation, granularity, version))
def _delete_metric(self, metric):
bucket = self._bucket_name(metric)
bucket = self._bucket_name
response = {}
while response.get('IsTruncated', True):
if 'NextContinuationToken' in response:
@ -104,39 +108,38 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
kwargs = {}
try:
response = self.s3.list_objects_v2(
Bucket=bucket, **kwargs)
Bucket=bucket, Prefix=self._prefix(metric), **kwargs)
except botocore.exceptions.ClientError as e:
if e.response['Error'].get('Code') == "NoSuchBucket":
if e.response['Error'].get('Code') == "NoSuchKey":
# Maybe it never has been created (no measure)
return
raise
s3.bulk_delete(self.s3, bucket,
[c['Key'] for c in response.get('Contents', ())])
try:
self.s3.delete_bucket(Bucket=bucket)
except botocore.exceptions.ClientError as e:
if e.response['Error'].get('Code') != "NoSuchBucket":
raise
def _get_measures(self, metric, timestamp_key, aggregation, granularity,
version=3):
try:
response = self.s3.get_object(
Bucket=self._bucket_name(metric),
Key=self._object_name(
Bucket=self._bucket_name,
Key=self._prefix(metric) + self._object_name(
timestamp_key, aggregation, granularity, version))
except botocore.exceptions.ClientError as e:
code = e.response['Error'].get('Code')
if code == "NoSuchBucket":
raise storage.MetricDoesNotExist(metric)
elif code == "NoSuchKey":
if e.response['Error'].get('Code') == 'NoSuchKey':
try:
response = self.s3.list_objects_v2(
Bucket=self._bucket_name, Prefix=self._prefix(metric))
except botocore.exceptions.ClientError as e:
if e.response['Error'].get('Code') == 'NoSuchKey':
raise storage.MetricDoesNotExist(metric)
raise
raise storage.AggregationDoesNotExist(metric, aggregation)
raise
return response['Body'].read()
def _list_split_keys_for_metric(self, metric, aggregation, granularity,
version=3):
bucket = self._bucket_name(metric)
bucket = self._bucket_name
keys = set()
response = {}
while response.get('IsTruncated', True):
@ -149,41 +152,41 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
try:
response = self.s3.list_objects_v2(
Bucket=bucket,
Prefix=self._prefix(metric) + '%s_%s' % (aggregation,
granularity),
**kwargs)
except botocore.exceptions.ClientError as e:
if e.response['Error'].get('Code') == "NoSuchBucket":
if e.response['Error'].get('Code') == "NoSuchKey":
raise storage.MetricDoesNotExist(metric)
raise
for f in response.get('Contents', ()):
try:
meta = f['Key'].split('_')
if (aggregation == meta[1]
and granularity == float(meta[2])
and self._version_check(f['Key'], version)):
keys.add(meta[0])
if (self._version_check(f['Key'], version)):
keys.add(meta[2])
except (ValueError, IndexError):
# Might be "none", or any other file. Be resilient.
continue
return keys
@staticmethod
def _build_unaggregated_timeserie_path(version):
return 'none' + ("_v%s" % version if version else "")
def _build_unaggregated_timeserie_path(metric, version):
return S3Storage._prefix(metric) + 'none' + ("_v%s" % version
if version else "")
def _get_unaggregated_timeserie(self, metric, version=3):
try:
response = self.s3.get_object(
Bucket=self._bucket_name(metric),
Key=self._build_unaggregated_timeserie_path(version))
Bucket=self._bucket_name,
Key=self._build_unaggregated_timeserie_path(metric, version))
except botocore.exceptions.ClientError as e:
if e.response['Error'].get('Code') in ("NoSuchBucket",
"NoSuchKey"):
if e.response['Error'].get('Code') == "NoSuchKey":
raise storage.MetricDoesNotExist(metric)
raise
return response['Body'].read()
def _store_unaggregated_timeserie(self, metric, data, version=3):
self.s3.put_object(
Bucket=self._bucket_name(metric),
Key=self._build_unaggregated_timeserie_path(version),
Bucket=self._bucket_name,
Key=self._build_unaggregated_timeserie_path(metric, version),
Body=data)

View File

@ -0,0 +1,8 @@
---
fixes:
- |
Previously, s3 storage driver stored aggregates in a bucket per metric.
This would quickly run into bucket limit set by s3. s3 storage driver is
fixed so it stores all aggregates for all metrics in a single bucket.
Buckets previously created by Gnocchi will need to be deleted as they will
no longer be handled.