swift: push to multiple sacks
enable swift to push to multiple sacks and handle them accordingly Change-Id: I0550961960805505d85919f30bfa1677dc390345
This commit is contained in:
parent
d5059fbef7
commit
85b41d699a
|
@ -30,61 +30,65 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage):
|
||||||
super(SwiftStorage, self).__init__(conf)
|
super(SwiftStorage, self).__init__(conf)
|
||||||
self.swift = swift.get_connection(conf)
|
self.swift = swift.get_connection(conf)
|
||||||
|
|
||||||
def upgrade(self, indexer):
|
def upgrade(self, index):
|
||||||
super(SwiftStorage, self).upgrade(indexer)
|
super(SwiftStorage, self).upgrade(index)
|
||||||
self.swift.put_container(self.MEASURE_PREFIX)
|
for i in six.moves.range(self.NUM_SACKS):
|
||||||
|
self.swift.put_container(self.get_sack_name(i))
|
||||||
|
|
||||||
def _store_new_measures(self, metric, data):
|
def _store_new_measures(self, metric, data):
|
||||||
now = datetime.datetime.utcnow().strftime("_%Y%m%d_%H:%M:%S")
|
now = datetime.datetime.utcnow().strftime("_%Y%m%d_%H:%M:%S")
|
||||||
self.swift.put_object(
|
self.swift.put_object(
|
||||||
self.MEASURE_PREFIX,
|
self.get_sack_name(self.sack_for_metric(metric.id)),
|
||||||
six.text_type(metric.id) + "/" + six.text_type(uuid.uuid4()) + now,
|
six.text_type(metric.id) + "/" + six.text_type(uuid.uuid4()) + now,
|
||||||
data)
|
data)
|
||||||
|
|
||||||
def _build_report(self, details):
|
def _build_report(self, details):
|
||||||
metric_details = defaultdict(int)
|
metric_details = defaultdict(int)
|
||||||
if details:
|
nb_metrics = 0
|
||||||
headers, files = self.swift.get_container(self.MEASURE_PREFIX,
|
measures = 0
|
||||||
full_listing=True)
|
for i in six.moves.range(self.NUM_SACKS):
|
||||||
for f in files:
|
if details:
|
||||||
metric, __ = f['name'].split("/", 1)
|
headers, files = self.swift.get_container(
|
||||||
metric_details[metric] += 1
|
self.get_sack_name(i), full_listing=True)
|
||||||
nb_metrics = len(metric_details)
|
for f in files:
|
||||||
else:
|
metric, __ = f['name'].split("/", 1)
|
||||||
headers, files = self.swift.get_container(self.MEASURE_PREFIX,
|
metric_details[metric] += 1
|
||||||
delimiter='/',
|
else:
|
||||||
full_listing=True)
|
headers, files = self.swift.get_container(
|
||||||
nb_metrics = len(files)
|
self.get_sack_name(i), delimiter='/', full_listing=True)
|
||||||
measures = int(headers.get('x-container-object-count'))
|
nb_metrics += len(files)
|
||||||
return nb_metrics, measures, metric_details if details else None
|
measures += int(headers.get('x-container-object-count'))
|
||||||
|
return (nb_metrics or len(metric_details), measures,
|
||||||
|
metric_details if details else None)
|
||||||
|
|
||||||
def list_metric_with_measures_to_process(self, sack):
|
def list_metric_with_measures_to_process(self, sack):
|
||||||
headers, files = self.swift.get_container(self.MEASURE_PREFIX,
|
headers, files = self.swift.get_container(
|
||||||
delimiter='/',
|
self.get_sack_name(sack), delimiter='/', full_listing=True)
|
||||||
full_listing=True)
|
|
||||||
return set(f['subdir'][:-1] for f in files if 'subdir' in f)
|
return set(f['subdir'][:-1] for f in files if 'subdir' in f)
|
||||||
|
|
||||||
def _list_measure_files_for_metric_id(self, metric_id):
|
def _list_measure_files_for_metric_id(self, sack, metric_id):
|
||||||
headers, files = self.swift.get_container(
|
headers, files = self.swift.get_container(
|
||||||
self.MEASURE_PREFIX, path=six.text_type(metric_id),
|
self.get_sack_name(sack), path=six.text_type(metric_id),
|
||||||
full_listing=True)
|
full_listing=True)
|
||||||
return files
|
return files
|
||||||
|
|
||||||
def delete_unprocessed_measures_for_metric_id(self, metric_id):
|
def delete_unprocessed_measures_for_metric_id(self, metric_id):
|
||||||
files = self._list_measure_files_for_metric_id(metric_id)
|
sack = self.sack_for_metric(metric_id)
|
||||||
swift.bulk_delete(self.swift, self.MEASURE_PREFIX, files)
|
files = self._list_measure_files_for_metric_id(sack, metric_id)
|
||||||
|
swift.bulk_delete(self.swift, self.get_sack_name(sack), files)
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def process_measure_for_metric(self, metric):
|
def process_measure_for_metric(self, metric):
|
||||||
files = self._list_measure_files_for_metric_id(metric.id)
|
sack = self.sack_for_metric(metric.id)
|
||||||
|
sack_name = self.get_sack_name(sack)
|
||||||
|
files = self._list_measure_files_for_metric_id(sack, metric.id)
|
||||||
|
|
||||||
measures = []
|
measures = []
|
||||||
for f in files:
|
for f in files:
|
||||||
headers, data = self.swift.get_object(
|
headers, data = self.swift.get_object(sack_name, f['name'])
|
||||||
self.MEASURE_PREFIX, f['name'])
|
|
||||||
measures.extend(self._unserialize_measures(f['name'], data))
|
measures.extend(self._unserialize_measures(f['name'], data))
|
||||||
|
|
||||||
yield measures
|
yield measures
|
||||||
|
|
||||||
# Now clean objects
|
# Now clean objects
|
||||||
swift.bulk_delete(self.swift, self.MEASURE_PREFIX, files)
|
swift.bulk_delete(self.swift, sack_name, files)
|
||||||
|
|
Loading…
Reference in New Issue