From 4fa3fe8cbedc17d9482df718300c1ac7fc4c5a58 Mon Sep 17 00:00:00 2001
From: Swann Croiset
Date: Wed, 1 Feb 2017 13:53:23 +0100
Subject: [PATCH] Poll OpenStack resources in background

The collectd plugin spawns threads responsible for polling APIs.

Change-Id: I6d32862835fc86ac81c6e15fcde83cc113627a88
---
 .../files/collectd/collectd_base.py      | 45 ++++++++++
 .../files/collectd/collectd_openstack.py | 87 +++++++++++++------
 2 files changed, 105 insertions(+), 27 deletions(-)

diff --git a/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py b/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py
index c59149ff8..792327424 100644
--- a/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py
+++ b/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py
@@ -17,6 +17,7 @@ from functools import wraps
 import json
 import signal
 import subprocess
+import threading
 import time
 import traceback
 
@@ -271,3 +272,47 @@ class CephBase(Base):
             if node.key == "Cluster":
                 self.cluster = node.values[0]
         self.plugin_instance = self.cluster
+
+
+class AsyncPoller(threading.Thread):
+    """Execute an independent thread to execute a function periodically
+
+    Args:
+        collectd: used for logging
+        polling_function: a function to execute periodically
+        interval: the interval in seconds
+        name: (optional) the name of the thread
+    """
+
+    def __init__(self, collectd, polling_function, interval, name=None):
+        super(AsyncPoller, self).__init__(name=name)
+        self.collectd = collectd
+        self.polling_function = polling_function
+        self.interval = interval
+        self._results = None
+
+    def run(self):
+        self.collectd.info('Starting thread {}'.format(self.name))
+        while True:
+            try:
+                started_at = time.time()
+
+                self._results = self.polling_function()
+
+                tosleep = self.interval - (time.time() - started_at)
+                if tosleep > 0:
+                    time.sleep(tosleep)
+                else:
+                    self.collectd.warning(
+                        'Polling took more than {}s for {}'.format(
+                            self.interval, self.name
+                        )
+                    )
+
+            except Exception as e:
+                self._results = None
+                self.collectd.error('{} fails: {}'.format(self.name, e))
+                time.sleep(10)
+
+    def get_results(self):
+        return self._results
diff --git a/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py b/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py
index a4ec4decb..a8fa999b7 100644
--- a/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py
+++ b/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py
@@ -163,7 +163,9 @@ class CollectdPlugin(base.Base):
         self.max_retries = 2
         self.os_client = None
         self.extra_config = {}
+        self._threads = {}
         self.pagination_limit = None
+        self.polling_interval = 60
 
     def _build_url(self, service, resource):
         s = (self.get_service(service) or {})
@@ -278,6 +280,9 @@ class CollectdPlugin(base.Base):
                 keystone_url = node.values[0]
             elif node.key == 'PaginationLimit':
                 self.pagination_limit = int(node.values[0])
+            elif node.key == 'PollingInterval':
+                self.polling_interval = int(node.values[0])
+
         self.os_client = OSClient(username, password, tenant_name,
                                   keystone_url, self.timeout, self.logger,
                                   self.max_retries)
@@ -302,46 +307,74 @@ class CollectdPlugin(base.Base):
         if detail:
             resource = '{}/detail'.format(resource)
 
-        url = self._build_url(project, resource)
-        if not url:
-            return
-
         opts = {}
         if self.pagination_limit:
             opts['limit'] = self.pagination_limit
 
         opts.update(params)
-        objs = []
-        while True:
-            r = self.os_client.make_request('get', url, params=opts)
-            if not r or object_name not in r.json():
-                self.logger.warning('Could not find %s %s' % (project,
-                                                              object_name))
-                return objs
+
+        def openstack_api_poller():
+            _objects = []
+            _opts = {}
+            _opts.update(opts)
+            while True:
+                r = self.get(project, resource, params=_opts)
+                if not r or object_name not in r.json():
+                    if r is None:
+                        err = ''
+                    else:
+                        err = r.text
+                    self.collectd.warning('Could not find {}: {} {}'.format(
+                        project, object_name, err
+                    ))
+                    # Avoid providing incomplete data by resetting current
+                    # set.
+                    _objects = []
+                    break
 
-            resp = r.json()
-            bulk_objs = resp.get(object_name)
+                resp = r.json()
+                bulk_objs = resp.get(object_name)
+                if not bulk_objs:
+                    # empty list
+                    break
 
-            if not bulk_objs:
-                break
+                _objects.extend(bulk_objs)
 
-            objs.extend(bulk_objs)
+                links = resp.get('{}_links'.format(object_name))
+                if links is None or self.pagination_limit is None:
+                    # Either the pagination is not supported or there is
+                    # no more data
+                    # In both cases, we got at this stage all the data we
+                    # can have.
+                    break
 
-            links = resp.get('{}_links'.format(object_name))
-            if links is None or self.pagination_limit is None:
-                # Either the pagination is not supported or there is no more
-                # data
-                break
+                # if there is no 'next' link in the response, all data has
+                # been read.
+                if len([i for i in links if i.get('rel') == 'next']) == 0:
+                    break
 
-            # if there is no 'next' link in the response, all data has been
-            # read.
-            if len([i for i in links if i.get('rel') == 'next']) == 0:
-                break
+                _opts['marker'] = bulk_objs[-1]['id']
 
-            opts['marker'] = bulk_objs[-1]['id']
+            return _objects
 
-        return objs
+        poller_id = '{}:{}'.format(project, resource)
+        if poller_id not in self._threads:
+            t = base.AsyncPoller(self.collectd,
+                                 openstack_api_poller,
+                                 self.polling_interval,
+                                 poller_id)
+            t.start()
+            self._threads[poller_id] = t
+
+        t = self._threads[poller_id]
+        if not t.is_alive():
+            self.logger.warning("Unexpected end of the thread {}".format(
+                t.name))
+            del self._threads[poller_id]
+            return []
+
+        results = t.get_results()
+        return [] if results is None else results
 
     def count_objects_group_by(self, list_object,