diff --git a/cloudkitty/orchestrator.py b/cloudkitty/orchestrator.py index e3eefdff..509ab454 100644 --- a/cloudkitty/orchestrator.py +++ b/cloudkitty/orchestrator.py @@ -55,6 +55,9 @@ CONF.register_opts(orchestrator_opts, group='orchestrator') FETCHERS_NAMESPACE = 'cloudkitty.tenant.fetchers' PROCESSORS_NAMESPACE = 'cloudkitty.rating.processors' +PERIOD = CONF.collect.period +WAIT_TIME = CONF.collect.wait_periods * CONF.collect.period + class RatingEndpoint(object): target = oslo_messaging.Target(namespace='rating', @@ -154,13 +157,10 @@ class Worker(BaseWorker): self._collector = collector self._storage = storage - self._period = CONF.collect.period - self._wait_time = CONF.collect.wait_periods * self._period - super(Worker, self).__init__(tenant_id) def _collect(self, service, start_timestamp): - next_timestamp = start_timestamp + self._period + next_timestamp = start_timestamp + PERIOD raw_data = self._collector.retrieve(service, start_timestamp, next_timestamp, @@ -172,15 +172,7 @@ class Worker(BaseWorker): def check_state(self): timestamp = self._storage.get_state(self._tenant_id) - if not timestamp: - month_start = ck_utils.get_month_start() - return ck_utils.dt2ts(month_start) - - now = ck_utils.utcnow_ts() - next_timestamp = timestamp + self._period - if next_timestamp + self._wait_time < now: - return next_timestamp - return 0 + return ck_utils.check_time_state(timestamp, PERIOD, WAIT_TIME) def run(self): while True: @@ -202,7 +194,7 @@ class Worker(BaseWorker): raise collector.NoDataCollected('', service) except collector.NoDataCollected: begin = timestamp - end = begin + self._period + end = begin + PERIOD for processor in self._processors: processor.obj.nodata(begin, end) self._storage.nodata(begin, end, self._tenant_id) @@ -260,16 +252,7 @@ class Orchestrator(object): def _check_state(self, tenant_id): timestamp = self.storage.get_state(tenant_id) - if not timestamp: - month_start = ck_utils.get_month_start() - return ck_utils.dt2ts(month_start) - - now = ck_utils.utcnow_ts() - next_timestamp = timestamp + CONF.collect.period - wait_time = CONF.collect.wait_periods * CONF.collect.period - if next_timestamp + wait_time < now: - return next_timestamp - return 0 + return ck_utils.check_time_state(timestamp, PERIOD, WAIT_TIME) def process_messages(self): # TODO(sheeprine): Code kept to handle threading and asynchronous @@ -299,7 +282,7 @@ class Orchestrator(object): # being processed eventlet.sleep(1) # FIXME(sheeprine): We may cause a drift here - eventlet.sleep(CONF.collect.period) + eventlet.sleep(PERIOD) def terminate(self): self.coord.stop() diff --git a/cloudkitty/utils.py b/cloudkitty/utils.py index 55483527..c48715d0 100644 --- a/cloudkitty/utils.py +++ b/cloudkitty/utils.py @@ -179,3 +179,15 @@ def refresh_stevedore(namespace=None): del cache[namespace] else: cache.clear() + + +def check_time_state(timestamp=None, period=0, wait_time=0): + if not timestamp: + month_start = get_month_start() + return dt2ts(month_start) + + now = utcnow_ts() + next_timestamp = timestamp + period + if next_timestamp + wait_time < now: + return next_timestamp + return 0