Merge "s3: use a single bucket for aggregate storage"
This commit is contained in:
commit
4c46b69067
|
@ -59,41 +59,45 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
|
|||
self.s3, self._region_name, self._bucket_prefix = (
|
||||
s3.get_connection(conf)
|
||||
)
|
||||
self._bucket_name = '%s-aggregates' % self._bucket_prefix
|
||||
|
||||
def _bucket_name(self, metric):
|
||||
return '%s-%s' % (self._bucket_prefix, str(metric.id))
|
||||
|
||||
@staticmethod
|
||||
def _object_name(split_key, aggregation, granularity, version=3):
|
||||
name = '%s_%s_%s' % (split_key, aggregation, granularity)
|
||||
return name + '_v%s' % version if version else name
|
||||
|
||||
def _create_metric(self, metric):
|
||||
def upgrade(self, index):
|
||||
super(S3Storage, self).upgrade(index)
|
||||
try:
|
||||
s3.create_bucket(self.s3, self._bucket_name(metric),
|
||||
self._region_name)
|
||||
s3.create_bucket(self.s3, self._bucket_name, self._region_name)
|
||||
except botocore.exceptions.ClientError as e:
|
||||
if e.response['Error'].get('Code') != "BucketAlreadyExists":
|
||||
raise
|
||||
# raise storage.MetricAlreadyExists(metric)
|
||||
|
||||
@staticmethod
|
||||
def _object_name(split_key, aggregation, granularity, version=3):
|
||||
name = '%s_%s_%s' % (aggregation, granularity, split_key)
|
||||
return name + '_v%s' % version if version else name
|
||||
|
||||
@staticmethod
|
||||
def _prefix(metric):
|
||||
return str(metric.id) + '/'
|
||||
|
||||
def _create_metric(self, metric):
|
||||
pass
|
||||
|
||||
def _store_metric_measures(self, metric, timestamp_key, aggregation,
|
||||
granularity, data, offset=0, version=3):
|
||||
self.s3.put_object(
|
||||
Bucket=self._bucket_name(metric),
|
||||
Key=self._object_name(
|
||||
Bucket=self._bucket_name,
|
||||
Key=self._prefix(metric) + self._object_name(
|
||||
timestamp_key, aggregation, granularity, version),
|
||||
Body=data)
|
||||
|
||||
def _delete_metric_measures(self, metric, timestamp_key, aggregation,
|
||||
granularity, version=3):
|
||||
self.s3.delete_object(
|
||||
Bucket=self._bucket_name(metric),
|
||||
Key=self._object_name(
|
||||
Bucket=self._bucket_name,
|
||||
Key=self._prefix(metric) + self._object_name(
|
||||
timestamp_key, aggregation, granularity, version))
|
||||
|
||||
def _delete_metric(self, metric):
|
||||
bucket = self._bucket_name(metric)
|
||||
bucket = self._bucket_name
|
||||
response = {}
|
||||
while response.get('IsTruncated', True):
|
||||
if 'NextContinuationToken' in response:
|
||||
|
@ -104,39 +108,38 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
|
|||
kwargs = {}
|
||||
try:
|
||||
response = self.s3.list_objects_v2(
|
||||
Bucket=bucket, **kwargs)
|
||||
Bucket=bucket, Prefix=self._prefix(metric), **kwargs)
|
||||
except botocore.exceptions.ClientError as e:
|
||||
if e.response['Error'].get('Code') == "NoSuchBucket":
|
||||
if e.response['Error'].get('Code') == "NoSuchKey":
|
||||
# Maybe it never has been created (no measure)
|
||||
return
|
||||
raise
|
||||
s3.bulk_delete(self.s3, bucket,
|
||||
[c['Key'] for c in response.get('Contents', ())])
|
||||
try:
|
||||
self.s3.delete_bucket(Bucket=bucket)
|
||||
except botocore.exceptions.ClientError as e:
|
||||
if e.response['Error'].get('Code') != "NoSuchBucket":
|
||||
raise
|
||||
|
||||
def _get_measures(self, metric, timestamp_key, aggregation, granularity,
|
||||
version=3):
|
||||
try:
|
||||
response = self.s3.get_object(
|
||||
Bucket=self._bucket_name(metric),
|
||||
Key=self._object_name(
|
||||
Bucket=self._bucket_name,
|
||||
Key=self._prefix(metric) + self._object_name(
|
||||
timestamp_key, aggregation, granularity, version))
|
||||
except botocore.exceptions.ClientError as e:
|
||||
code = e.response['Error'].get('Code')
|
||||
if code == "NoSuchBucket":
|
||||
raise storage.MetricDoesNotExist(metric)
|
||||
elif code == "NoSuchKey":
|
||||
if e.response['Error'].get('Code') == 'NoSuchKey':
|
||||
try:
|
||||
response = self.s3.list_objects_v2(
|
||||
Bucket=self._bucket_name, Prefix=self._prefix(metric))
|
||||
except botocore.exceptions.ClientError as e:
|
||||
if e.response['Error'].get('Code') == 'NoSuchKey':
|
||||
raise storage.MetricDoesNotExist(metric)
|
||||
raise
|
||||
raise storage.AggregationDoesNotExist(metric, aggregation)
|
||||
raise
|
||||
return response['Body'].read()
|
||||
|
||||
def _list_split_keys_for_metric(self, metric, aggregation, granularity,
|
||||
version=3):
|
||||
bucket = self._bucket_name(metric)
|
||||
bucket = self._bucket_name
|
||||
keys = set()
|
||||
response = {}
|
||||
while response.get('IsTruncated', True):
|
||||
|
@ -149,41 +152,41 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
|
|||
try:
|
||||
response = self.s3.list_objects_v2(
|
||||
Bucket=bucket,
|
||||
Prefix=self._prefix(metric) + '%s_%s' % (aggregation,
|
||||
granularity),
|
||||
**kwargs)
|
||||
except botocore.exceptions.ClientError as e:
|
||||
if e.response['Error'].get('Code') == "NoSuchBucket":
|
||||
if e.response['Error'].get('Code') == "NoSuchKey":
|
||||
raise storage.MetricDoesNotExist(metric)
|
||||
raise
|
||||
for f in response.get('Contents', ()):
|
||||
try:
|
||||
meta = f['Key'].split('_')
|
||||
if (aggregation == meta[1]
|
||||
and granularity == float(meta[2])
|
||||
and self._version_check(f['Key'], version)):
|
||||
keys.add(meta[0])
|
||||
if (self._version_check(f['Key'], version)):
|
||||
keys.add(meta[2])
|
||||
except (ValueError, IndexError):
|
||||
# Might be "none", or any other file. Be resilient.
|
||||
continue
|
||||
return keys
|
||||
|
||||
@staticmethod
|
||||
def _build_unaggregated_timeserie_path(version):
|
||||
return 'none' + ("_v%s" % version if version else "")
|
||||
def _build_unaggregated_timeserie_path(metric, version):
|
||||
return S3Storage._prefix(metric) + 'none' + ("_v%s" % version
|
||||
if version else "")
|
||||
|
||||
def _get_unaggregated_timeserie(self, metric, version=3):
|
||||
try:
|
||||
response = self.s3.get_object(
|
||||
Bucket=self._bucket_name(metric),
|
||||
Key=self._build_unaggregated_timeserie_path(version))
|
||||
Bucket=self._bucket_name,
|
||||
Key=self._build_unaggregated_timeserie_path(metric, version))
|
||||
except botocore.exceptions.ClientError as e:
|
||||
if e.response['Error'].get('Code') in ("NoSuchBucket",
|
||||
"NoSuchKey"):
|
||||
if e.response['Error'].get('Code') == "NoSuchKey":
|
||||
raise storage.MetricDoesNotExist(metric)
|
||||
raise
|
||||
return response['Body'].read()
|
||||
|
||||
def _store_unaggregated_timeserie(self, metric, data, version=3):
|
||||
self.s3.put_object(
|
||||
Bucket=self._bucket_name(metric),
|
||||
Key=self._build_unaggregated_timeserie_path(version),
|
||||
Bucket=self._bucket_name,
|
||||
Key=self._build_unaggregated_timeserie_path(metric, version),
|
||||
Body=data)
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
---
|
||||
fixes:
|
||||
- |
|
||||
Previously, s3 storage driver stored aggregates in a bucket per metric.
|
||||
This would quickly run into bucket limit set by s3. s3 storage driver is
|
||||
fixed so it stores all aggregates for all metrics in a single bucket.
|
||||
Buckets previously created by Gnocchi will need to be deleted as they will
|
||||
no longer be handled.
|
Loading…
Reference in New Issue