summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2017-02-09 12:37:57 +0000
committerGerrit Code Review <review@openstack.org>2017-02-09 12:37:57 +0000
commite805c9fe585d8c1f26892f406774768064e53d48 (patch)
tree442dde74398a1c45151137b80a07bc20eef0be9a
parent6067e8ba6d319460b880f610badd2f223a52989e (diff)
parent4fa3fe8cbedc17d9482df718300c1ac7fc4c5a58 (diff)
Merge "Poll OpenStack resources in background"
-rw-r--r--deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py45
-rw-r--r--deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py101
2 files changed, 112 insertions, 34 deletions
diff --git a/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py b/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py
index c59149f..7923274 100644
--- a/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py
+++ b/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_base.py
@@ -17,6 +17,7 @@ from functools import wraps
17import json 17import json
18import signal 18import signal
19import subprocess 19import subprocess
20import threading
20import time 21import time
21import traceback 22import traceback
22 23
@@ -271,3 +272,47 @@ class CephBase(Base):
271 if node.key == "Cluster": 272 if node.key == "Cluster":
272 self.cluster = node.values[0] 273 self.cluster = node.values[0]
273 self.plugin_instance = self.cluster 274 self.plugin_instance = self.cluster
275
276
277class AsyncPoller(threading.Thread):
278 """Execute an independant thread to execute a function periodically
279
280 Args:
281 collectd: used for logging
282 polling_function: a function to execute periodically
283 interval: the interval in second
284 name: (optional) the name of the thread
285 """
286
287 def __init__(self, collectd, polling_function, interval, name=None):
288 super(AsyncPoller, self).__init__(name=name)
289 self.collectd = collectd
290 self.polling_function = polling_function
291 self.interval = interval
292 self._results = None
293
294 def run(self):
295 self.collectd.info('Starting thread {}'.format(self.name))
296 while True:
297 try:
298 started_at = time.time()
299
300 self._results = self.polling_function()
301
302 tosleep = self.interval - (time.time() - started_at)
303 if tosleep > 0:
304 time.sleep(tosleep)
305 else:
306 self.collectd.warning(
307 'Polling took more than {}s for {}'.format(
308 self.interval, self.name
309 )
310 )
311
312 except Exception as e:
313 self._results = None
314 self.collectd.error('{} fails: {}'.format(self.name, e))
315 time.sleep(10)
316
317 def get_results(self):
318 return self._results
diff --git a/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py b/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py
index a4ec4de..a8fa999 100644
--- a/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py
+++ b/deployment_scripts/puppet/modules/lma_collector/files/collectd/collectd_openstack.py
@@ -163,7 +163,9 @@ class CollectdPlugin(base.Base):
163 self.max_retries = 2 163 self.max_retries = 2
164 self.os_client = None 164 self.os_client = None
165 self.extra_config = {} 165 self.extra_config = {}
166 self._threads = {}
166 self.pagination_limit = None 167 self.pagination_limit = None
168 self.polling_interval = 60
167 169
168 def _build_url(self, service, resource): 170 def _build_url(self, service, resource):
169 s = (self.get_service(service) or {}) 171 s = (self.get_service(service) or {})
@@ -278,6 +280,9 @@ class CollectdPlugin(base.Base):
278 keystone_url = node.values[0] 280 keystone_url = node.values[0]
279 elif node.key == 'PaginationLimit': 281 elif node.key == 'PaginationLimit':
280 self.pagination_limit = int(node.values[0]) 282 self.pagination_limit = int(node.values[0])
283 elif node.key == 'PollingInterval':
284 self.polling_interval = int(node.values[0])
285
281 self.os_client = OSClient(username, password, tenant_name, 286 self.os_client = OSClient(username, password, tenant_name,
282 keystone_url, self.timeout, self.logger, 287 keystone_url, self.timeout, self.logger,
283 self.max_retries) 288 self.max_retries)
@@ -302,46 +307,74 @@ class CollectdPlugin(base.Base):
302 if detail: 307 if detail:
303 resource = '{}/detail'.format(resource) 308 resource = '{}/detail'.format(resource)
304 309
305 url = self._build_url(project, resource)
306 if not url:
307 return
308
309 opts = {} 310 opts = {}
310 if self.pagination_limit: 311 if self.pagination_limit:
311 opts['limit'] = self.pagination_limit 312 opts['limit'] = self.pagination_limit
312 313
313 opts.update(params) 314 opts.update(params)
314 objs = []
315
316 while True:
317 r = self.os_client.make_request('get', url, params=opts)
318 if not r or object_name not in r.json():
319 self.logger.warning('Could not find %s %s' % (project,
320 object_name))
321 return objs
322
323 resp = r.json()
324 bulk_objs = resp.get(object_name)
325 315
326 if not bulk_objs: 316 def openstack_api_poller():
327 break 317 _objects = []
328 318 _opts = {}
329 objs.extend(bulk_objs) 319 _opts.update(opts)
330 320 while True:
331 links = resp.get('{}_links'.format(object_name)) 321 r = self.get(project, resource, params=_opts)
332 if links is None or self.pagination_limit is None: 322 if not r or object_name not in r.json():
333 # Either the pagination is not supported or there is no more 323 if r is None:
334 # data 324 err = ''
335 break 325 else:
336 326 err = r.text
337 # if there is no 'next' link in the response, all data has been 327 self.collectd.warning('Could not find {}: {} {}'.format(
338 # read. 328 project, object_name, err
339 if len([i for i in links if i.get('rel') == 'next']) == 0: 329 ))
340 break 330 # Avoid to provide incomplete data by reseting current
341 331 # set.
342 opts['marker'] = bulk_objs[-1]['id'] 332 _objects = []
343 333 break
344 return objs 334
335 resp = r.json()
336 bulk_objs = resp.get(object_name)
337 if not bulk_objs:
338 # emtpy list
339 break
340
341 _objects.extend(bulk_objs)
342
343 links = resp.get('{}_links'.format(object_name))
344 if links is None or self.pagination_limit is None:
345 # Either the pagination is not supported or there is
346 # no more data
347 # In both cases, we got at this stage all the data we
348 # can have.
349 break
350
351 # if there is no 'next' link in the response, all data has
352 # been read.
353 if len([i for i in links if i.get('rel') == 'next']) == 0:
354 break
355
356 _opts['marker'] = bulk_objs[-1]['id']
357
358 return _objects
359
360 poller_id = '{}:{}'.format(project, resource)
361 if poller_id not in self._threads:
362 t = base.AsyncPoller(self.collectd,
363 openstack_api_poller,
364 self.polling_interval,
365 poller_id)
366 t.start()
367 self._threads[poller_id] = t
368
369 t = self._threads[poller_id]
370 if not t.is_alive():
371 self.logger.warning("Unexpected end of the thread {}".format(
372 t.name))
373 del self._threads[poller_id]
374 return []
375
376 results = t.get_results()
377 return [] if results is None else results
345 378
346 def count_objects_group_by(self, 379 def count_objects_group_by(self,
347 list_object, 380 list_object,