Use pipelines for stats keys

Pipelines buffer stats and then send them out in more reasonable sized
chunks, helping to avoid small UDP packets going missing in a flood of
stats.  Use this in stats.py.

This needs a slight change to the assertedStats handler to extract the
combined stats.  This function is ported from Zuul where we updated to
handle pipeline stats (Id4f6f5a6cd66581a81299ed5c67a5c49c95c9b52) so
it is not really new code.

Change-Id: I3f68450c7164d1cf0f1f57f9a31e5dca2f72bc43
This commit is contained in:
Ian Wienand 2018-07-25 11:13:47 +10:00
parent 67824d8e64
commit cd9aa75640
3 changed files with 65 additions and 16 deletions

View File

@@ -79,9 +79,11 @@ class StatsReporter(object):
keys.append('nodepool.launch.requestor.%s.%s' %
(requestor, subkey))
pipeline = self._statsd.pipeline()
for key in keys:
self._statsd.timing(key, dt)
self._statsd.incr(key)
pipeline.timing(key, dt)
pipeline.incr(key)
pipeline.send()
def updateNodeStats(self, zk_conn, provider):
'''
@@ -123,11 +125,13 @@ class StatsReporter(object):
else:
states[key] = 1
pipeline = self._statsd.pipeline()
for key, count in states.items():
self._statsd.gauge(key, count)
pipeline.gauge(key, count)
# nodepool.provider.PROVIDER.max_servers
key = 'nodepool.provider.%s.max_servers' % provider.name
max_servers = sum([p.max_servers for p in provider.pools.values()
if p.max_servers])
self._statsd.gauge(key, max_servers)
pipeline.gauge(key, max_servers)
pipeline.send()

View File

@@ -16,6 +16,7 @@
"""Common utilities used in testing"""
import glob
import itertools
import logging
import os
import random
@@ -235,19 +236,63 @@ class BaseTestCase(testtools.TestCase):
time.sleep(0.1)
def assertReportedStat(self, key, value=None, kind=None):
    """Check statsd output

    Check statsd return values.  A ``value`` should specify a
    ``kind``, however a ``kind`` may be specified without a
    ``value`` for a generic match.  Leave both empty to just check
    for key presence.

    :arg str key: The statsd key
    :arg str value: The expected value of the metric ``key``
    :arg str kind: The expected type of the metric ``key``.  For example

      - ``c`` counter
      - ``g`` gauge
      - ``ms`` timing
      - ``s`` set

    :returns: True once a matching stat has been reported
    :raises Exception: if no matching stat is reported within the
        5-second polling window
    """
    if value:
        # A value without a kind is ambiguous ("1" could be a gauge
        # or a counter), so require the kind alongside it.
        self.assertNotEqual(kind, None)

    start = time.time()
    while time.time() < (start + 5):
        # Note our fake statsd just queues up results in a queue.
        # We just keep going through them until we find one that
        # matches, or fail out.  If statsd pipelines are used,
        # large single packets are sent with stats separated by
        # newlines; thus we first flatten the stats out into
        # single entries.
        stats = itertools.chain.from_iterable(
            [s.decode('utf-8').split('\n') for s in self.statsd.stats])
        for stat in stats:
            k, v = stat.split(':')
            if key == k:
                if kind is None:
                    # key with no qualifiers is found
                    return True

                s_value, s_kind = v.split('|')

                # if no kind match, look for other keys
                if kind != s_kind:
                    continue

                if value:
                    # special-case value|ms because statsd can turn
                    # timing results into float of indeterminate
                    # length, hence foiling string matching.
                    if kind == 'ms':
                        if float(value) == float(s_value):
                            return True
                    if value == s_value:
                        return True
                    # otherwise keep looking for other matches
                    continue

                # this key matches
                return True
        time.sleep(0.1)

    raise Exception("Key %s not found in reported stats" % key)

View File

@@ -79,8 +79,8 @@ class TestLauncher(tests.DBTestCase):
)
self.zk.deleteNodeRequest(req)
self.waitForNodeRequestLockDeletion(req.id)
self.assertReportedStat('nodepool.nodes.ready', '1|g')
self.assertReportedStat('nodepool.nodes.building', '0|g')
self.assertReportedStat('nodepool.nodes.ready', value='1', kind='g')
self.assertReportedStat('nodepool.nodes.building', value='0', kind='g')
def test_node_assignment_order(self):
"""Test that nodes are assigned in the order requested"""