From 2d819d12b7abb40595ec2eba9b52ddc263bf1b7c Mon Sep 17 00:00:00 2001 From: Swann Croiset Date: Thu, 16 Feb 2017 14:54:27 +0100 Subject: [PATCH] Use changes-since parameter when polling Nova server details Change-Id: Ia629286ab2c08fc3cb16a489142be79568590c34 --- .../files/collectd/collectd_base.py | 29 ++++++++++++---- .../files/collectd/collectd_openstack.py | 24 +++++++++++--- .../files/collectd/openstack_nova.py | 33 +++++++++++++------ 3 files changed, 64 insertions(+), 22 deletions(-) diff --git a/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py b/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py index 792327424..31adabbca 100644 --- a/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py +++ b/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py @@ -282,14 +282,19 @@ class AsyncPoller(threading.Thread): polling_function: a function to execute periodically interval: the interval in second name: (optional) the name of the thread + reset_on_read: (default False) if True, all results returned by the + polling_function() are accumulated until they are + read. """ - def __init__(self, collectd, polling_function, interval, name=None): + def __init__(self, collectd, polling_function, interval, name=None, + reset_on_read=False): super(AsyncPoller, self).__init__(name=name) self.collectd = collectd self.polling_function = polling_function self.interval = interval - self._results = None + self._results = [] + self._reset_on_read = reset_on_read def run(self): self.collectd.info('Starting thread {}'.format(self.name)) @@ -297,8 +302,7 @@ class AsyncPoller(threading.Thread): try: started_at = time.time() - self._results = self.polling_function() - + self.results = self.polling_function() tosleep = self.interval - (time.time() - started_at) if tosleep > 0: time.sleep(tosleep) @@ -310,9 +314,20 @@ class AsyncPoller(threading.Thread): ) except Exception as e: - self._results = None + self.results = [] self.collectd.error('{} fails: {}'.format(self.name, e)) time.sleep(10) - def get_results(self): - return self._results + @property + def results(self): + r = self._results + if self._reset_on_read: + self._results = [] + return r + + @results.setter + def results(self, value): + if self._reset_on_read: + self._results.extend(value) + else: + self._results = value diff --git a/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py b/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py index 1aeeb3664..2da86703f 100644 --- a/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py +++ b/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py @@ -166,6 +166,8 @@ class CollectdPlugin(base.Base): self._threads = {} self.pagination_limit = None self.polling_interval = 60 + self._last_run = None + self.changes_since = False def _build_url(self, service, resource): s = (self.get_service(service) or {}) @@ -251,7 +253,7 @@ class CollectdPlugin(base.Base): url = self._build_url(service, resource) if not url: return - self.logger.info("GET '%s'" % url) + self.logger.info('GET({}) {}'.format(url, params)) return self.os_client.make_request('get', url, params=params) @property @@ -287,7 +289,7 @@ class CollectdPlugin(base.Base): self.max_retries) def get_objects(self, project, object_name, api_version='', - params=None, detail=False): + params=None, detail=False, since=False): """ Return a list of OpenStack objects The API version is not always included in the URL endpoint @@ -295,6 +297,7 @@ class CollectdPlugin(base.Base): api_version parameter to specify which version should be used. """ + self.changes_since = since if params is None: params = {} @@ -316,9 +319,18 @@ class CollectdPlugin(base.Base): _objects = [] _opts = {} _opts.update(opts) + + if self.changes_since and self._last_run: + _opts['changes-since'] = self._last_run.isoformat() + + # Keep track of the initial request time + last_run = datetime.datetime.now(tz=dateutil.tz.tzutc()) + has_failure = False + while True: r = self.get(project, resource, params=_opts) if not r or object_name not in r.json(): + has_failure = True if r is None: err = '' else: @@ -354,6 +366,9 @@ class CollectdPlugin(base.Base): _opts['marker'] = bulk_objs[-1]['id'] + if not has_failure: + self._last_run = last_run + return _objects poller_id = '{}:{}'.format(project, resource) @@ -361,7 +376,7 @@ class CollectdPlugin(base.Base): t = base.AsyncPoller(self.collectd, openstack_api_poller, self.polling_interval, - poller_id) + poller_id, self.changes_since) t.start() self._threads[poller_id] = t @@ -372,8 +387,7 @@ class CollectdPlugin(base.Base): del self._threads[poller_id] return [] - results = t.get_results() - return [] if results is None else results + return t.results def count_objects_group_by(self, list_object, diff --git a/deployment_scripts/puppet/modules/lma_collector/files/collectd/openstack_nova.py b/deployment_scripts/puppet/modules/lma_collector/files/collectd/openstack_nova.py index 8cdd57d08..48bbf0a60 100644 --- a/deployment_scripts/puppet/modules/lma_collector/files/collectd/openstack_nova.py +++ b/deployment_scripts/puppet/modules/lma_collector/files/collectd/openstack_nova.py @@ -17,6 +17,8 @@ import collectd import collectd_openstack as openstack +from itertools import groupby + PLUGIN_NAME = 'nova' INTERVAL = openstack.INTERVAL @@ -32,22 +34,33 @@ class NovaInstanceStatsPlugin(openstack.CollectdPlugin): self.plugin = PLUGIN_NAME self.interval = INTERVAL self.pagination_limit = 500 + self._cache = {} def itermetrics(self): - servers_details = self.get_objects('nova', 'servers', - params={'all_tenants': 1}, - detail=True) + server_details = self.get_objects('nova', 'servers', + params={'all_tenants': 1}, + detail=True, since=True) - def groupby(d): - return d.get('status', 'unknown').lower() + for server in server_details: + _id = server.get('id') + status = server.get('status', 'unknown').lower() + if status == 'deleted': + try: + self.logger.debug( + 'remove deleted instance {} from cache'.format(_id)) + del self._cache[_id] + except KeyError: + self.logger.warning( + 'cannot find instance in cache {}'.format(_id)) + else: + self._cache[_id] = status - status = self.count_objects_group_by(servers_details, - group_by_func=groupby) - for s, nb in status.iteritems(): + servers = sorted(self._cache.values()) + for status, g in groupby(servers): yield { 'plugin_instance': 'instances', - 'values': nb, - 'type_instance': s, + 'values': len(list(g)), + 'type_instance': status, }