s3: push to multiple sacks

enable s3 to push to multiple sacks and handle them accordingly

Change-Id: If68b4ef0c172f824b86b120092feb936a1c2fda2
This commit is contained in:
gord chung 2017-04-05 16:10:23 +00:00
parent 85b41d699a
commit 25cc9d7a03
1 changed files with 17 additions and 10 deletions

View File

@ -29,6 +29,9 @@ botocore = s3.botocore
class S3Storage(_carbonara.CarbonaraBasedStorage):
# NOTE(gordc): override to follow s3 partitioning logic
SACK_PREFIX = '%s/'
def __init__(self, conf):
super(S3Storage, self).__init__(conf)
self.s3, self._region_name, self._bucket_prefix = (
@ -54,10 +57,9 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
now = datetime.datetime.utcnow().strftime("_%Y%m%d_%H:%M:%S")
self.s3.put_object(
Bucket=self._bucket_name_measures,
Key=(six.text_type(metric.id)
+ "/"
+ six.text_type(uuid.uuid4())
+ now),
Key=(self.get_sack_name(self.sack_for_metric(metric.id))
+ six.text_type(metric.id) + "/"
+ six.text_type(uuid.uuid4()) + now),
Body=data)
def _build_report(self, details):
@ -73,8 +75,9 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
response = self.s3.list_objects_v2(
Bucket=self._bucket_name_measures,
**kwargs)
# FIXME(gordc): this can be streamlined if not details
for c in response.get('Contents', ()):
metric, metric_file = c['Key'].split("/", 1)
__, metric, metric_file = c['Key'].split("/", 2)
metric_details[metric] += 1
return (len(metric_details), sum(metric_details.values()),
metric_details if details else None)
@ -93,14 +96,15 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
kwargs = {}
response = self.s3.list_objects_v2(
Bucket=self._bucket_name_measures,
Prefix=self.get_sack_name(sack),
Delimiter="/",
MaxKeys=limit,
**kwargs)
for p in response.get('CommonPrefixes', ()):
metrics.add(p['Prefix'].rstrip('/'))
metrics.add(p['Prefix'].split('/', 2)[1])
return metrics
def _list_measure_files_for_metric_id(self, metric_id):
def _list_measure_files_for_metric_id(self, sack, metric_id):
files = set()
response = {}
while response.get('IsTruncated', True):
@ -112,7 +116,8 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
kwargs = {}
response = self.s3.list_objects_v2(
Bucket=self._bucket_name_measures,
Prefix=six.text_type(metric_id) + "/",
Prefix=(self.get_sack_name(sack)
+ six.text_type(metric_id) + "/"),
**kwargs)
for c in response.get('Contents', ()):
@ -121,12 +126,14 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
return files
def delete_unprocessed_measures_for_metric_id(self, metric_id):
files = self._list_measure_files_for_metric_id(metric_id)
sack = self.sack_for_metric(metric_id)
files = self._list_measure_files_for_metric_id(sack, metric_id)
s3.bulk_delete(self.s3, self._bucket_name_measures, files)
@contextlib.contextmanager
def process_measure_for_metric(self, metric):
files = self._list_measure_files_for_metric_id(metric.id)
sack = self.sack_for_metric(metric.id)
files = self._list_measure_files_for_metric_id(sack, metric.id)
measures = []
for f in files: