Collect request stats

This is a thing that nodepool has been doing for ages. With the upcoming
changes to remove the task manager, the mechanism it has been using to
put activity in the right place isn't going to be available anymore. But
also, people using openstacksdk from within a service might also want to
be able to do the same logging.

This improves upon the old method as well, as it uses the history in the
response object to get and report on all of the calls made as part of a
request. This will catch things that do auto retries.

While we're in there, add support for reporting to prometheus instead.
The prometheus support does not read from config, and does not run an
http service, since openstacksdk is a library. It is expected that
an application that uses openstacksdk and wants request stats collected
will pass a prometheus_client.CollectorRegistry to collector_registry.

Change-Id: I7218179dd5f0c068a52a4704b2ce1a0942fdc0d1
This commit is contained in:
Monty Taylor 2018-11-01 12:54:43 -05:00
parent f9b0911166
commit c8b96cddd3
8 changed files with 424 additions and 5 deletions

View File

@ -24,6 +24,7 @@ os-client-config==1.28.0
os-service-types==1.2.0
oslotest==3.2.0
pbr==2.0.0
prometheus-client==0.4.2
Pygments==2.2.0
python-mimeparse==1.6.0
python-subunit==1.0.0
@ -32,6 +33,7 @@ requests==2.18.0
requests-mock==1.2.0
requestsexceptions==1.2.0
six==1.10.0
statsd==3.3.0
stestr==1.0.0
stevedore==1.20.0
testrepository==0.0.18

View File

@ -451,6 +451,10 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
interface=self.config.get_interface(service_type),
endpoint_override=self.config.get_endpoint(service_type),
region_name=self.config.region_name,
statsd_prefix=self.config.get_statsd_prefix(),
statsd_client=self.config.get_statsd_client(),
prometheus_counter=self.config.get_prometheus_counter(),
prometheus_histogram=self.config.get_prometheus_histogram(),
min_version=request_min_version,
max_version=request_max_version)
if adapter.get_endpoint():

View File

@ -21,6 +21,15 @@ from keystoneauth1 import session as ks_session
import os_service_types
import requestsexceptions
from six.moves import urllib
try:
import statsd
except ImportError:
statsd = None
try:
import prometheus_client
except ImportError:
prometheus_client = None
from openstack import version as openstack_version
from openstack import _log
@ -96,7 +105,9 @@ class CloudRegion(object):
discovery_cache=None, extra_config=None,
cache_expiration_time=0, cache_expirations=None,
cache_path=None, cache_class='dogpile.cache.null',
cache_arguments=None, password_callback=None):
cache_arguments=None, password_callback=None,
statsd_host=None, statsd_port=None, statsd_prefix=None,
collector_registry=None):
self._name = name
self.region_name = region_name
self.config = _util.normalize_keys(config)
@ -116,6 +127,11 @@ class CloudRegion(object):
self._cache_class = cache_class
self._cache_arguments = cache_arguments
self._password_callback = password_callback
self._statsd_host = statsd_host
self._statsd_port = statsd_port
self._statsd_prefix = statsd_prefix
self._statsd_client = None
self._collector_registry = collector_registry
self._service_type_manager = os_service_types.ServiceTypes()
@ -471,6 +487,11 @@ class CloudRegion(object):
self.get_connect_retries(service_type))
kwargs.setdefault('status_code_retries',
self.get_status_code_retries(service_type))
kwargs.setdefault('statsd_prefix', self.get_statsd_prefix())
kwargs.setdefault('statsd_client', self.get_statsd_client())
kwargs.setdefault('prometheus_counter', self.get_prometheus_counter())
kwargs.setdefault(
'prometheus_histogram', self.get_prometheus_histogram())
endpoint_override = self.get_endpoint(service_type)
version = version_request.version
min_api_version = (
@ -746,3 +767,61 @@ class CloudRegion(object):
def get_concurrency(self, service_type=None):
return self._get_service_config(
'concurrency', service_type=service_type)
def get_statsd_client(self):
if not statsd:
return None
statsd_args = {}
if self._statsd_host:
statsd_args['host'] = self._statsd_host
if self._statsd_port:
statsd_args['port'] = self._statsd_port
if statsd_args:
return statsd.StatsClient(**statsd_args)
else:
return None
def get_statsd_prefix(self):
return self._statsd_prefix or 'openstack.api'
def get_prometheus_registry(self):
if not self._collector_registry and prometheus_client:
self._collector_registry = prometheus_client.REGISTRY
return self._collector_registry
def get_prometheus_histogram(self):
registry = self.get_prometheus_registry()
if not registry or not prometheus_client:
return
# We have to hide a reference to the histogram on the registry
# object, because it's collectors must be singletons for a given
# registry but register at creation time.
hist = getattr(registry, '_openstacksdk_histogram', None)
if not hist:
hist = prometheus_client.Histogram(
'openstack_http_response_time',
'Time taken for an http response to an OpenStack service',
labelnames=[
'method', 'endpoint', 'service_type', 'status_code'
],
registry=registry,
)
registry._openstacksdk_histogram = hist
return hist
def get_prometheus_counter(self):
registry = self.get_prometheus_registry()
if not registry or not prometheus_client:
return
counter = getattr(registry, '_openstacksdk_counter', None)
if not counter:
counter = prometheus_client.Counter(
'openstack_http_requests',
'Number of HTTP requests made to an OpenStack service',
labelnames=[
'method', 'endpoint', 'service_type', 'status_code'
],
registry=registry,
)
registry._openstacksdk_counter = counter
return counter

View File

@ -140,7 +140,9 @@ class OpenStackConfig(object):
envvar_prefix=None, secure_files=None,
pw_func=None, session_constructor=None,
app_name=None, app_version=None,
load_yaml_config=True, load_envvars=True):
load_yaml_config=True, load_envvars=True,
statsd_host=None, statsd_port=None,
statsd_prefix=None):
self.log = _log.setup_logging('openstack.config')
self._session_constructor = session_constructor
self._app_name = app_name
@ -276,6 +278,21 @@ class OpenStackConfig(object):
self._cache_expirations = cache_settings.get(
'expiration', self._cache_expirations)
if load_yaml_config:
statsd_config = self.cloud_config.get('statsd', {})
statsd_host = statsd_host or statsd_config.get('host')
statsd_port = statsd_port or statsd_config.get('port')
statsd_prefix = statsd_prefix or statsd_config.get('prefix')
if load_envvars:
statsd_host = statsd_host or os.environ.get('STATSD_HOST')
statsd_port = statsd_port or os.environ.get('STATSD_PORT')
statsd_prefix = statsd_prefix or os.environ.get('STATSD_PREFIX')
self._statsd_host = statsd_host
self._statsd_port = statsd_port
self._statsd_prefix = statsd_prefix
# Flag location to hold the peeked value of an argparse timeout value
self._argv_timeout = False
@ -1091,6 +1108,9 @@ class OpenStackConfig(object):
cache_class=self._cache_class,
cache_arguments=self._cache_arguments,
password_callback=self._pw_callback,
statsd_host=self._statsd_host,
statsd_port=self._statsd_port,
statsd_prefix=self._statsd_prefix,
)
# TODO(mordred) Backwards compat for OSC transition
get_one_cloud = get_one

View File

@ -59,7 +59,9 @@ def _extract_name(url, service_type=None):
# Strip leading version piece so that
# GET /v2.0/networks
# returns ['networks']
if url_parts[0] in ('v1', 'v2', 'v2.0'):
if (url_parts[0]
and url_parts[0][0] == 'v'
and url_parts[0][1] and url_parts[0][1].isdigit()):
url_parts = url_parts[1:]
name_parts = []
# Pull out every other URL portion - so that
@ -118,12 +120,21 @@ class Proxy(adapter.Adapter):
``<service-type>_status_code_retries``.
"""
def __init__(self, *args, **kwargs):
def __init__(
self,
session,
statsd_client=None, statsd_prefix=None,
prometheus_counter=None, prometheus_histogram=None,
*args, **kwargs):
# NOTE(dtantsur): keystoneauth defaults retriable_status_codes to None,
# override it with a class-level value.
kwargs.setdefault('retriable_status_codes',
self.retriable_status_codes)
super(Proxy, self).__init__(*args, **kwargs)
super(Proxy, self).__init__(session=session, *args, **kwargs)
self._statsd_client = statsd_client
self._statsd_prefix = statsd_prefix
self._prometheus_counter = prometheus_counter
self._prometheus_histogram = prometheus_histogram
def request(
self, url, method, error_message=None,
@ -132,8 +143,36 @@ class Proxy(adapter.Adapter):
url, method,
connect_retries=connect_retries, raise_exc=False,
**kwargs)
for h in response.history:
self._report_stats(h)
self._report_stats(response)
return response
def _report_stats(self, response):
if self._statsd_client:
self._report_stats_statsd(response)
if self._prometheus_counter and self._prometheus_histogram:
self._report_stats_prometheus(response)
def _report_stats_statsd(self, response):
name_parts = _extract_name(response.request.url, self.service_type)
key = '.'.join(
[self._statsd_prefix, self.service_type, response.request.method]
+ name_parts)
self._statsd_client.timing(key, int(response.elapsed.seconds * 1000))
self._statsd_client.incr(key)
def _report_stats_prometheus(self, response):
labels = dict(
method=response.request.method,
endpoint=response.request.url,
service_type=self.service_type,
status_code=response.status_code,
)
self._prometheus_counter.labels(**labels).inc()
self._prometheus_histogram.labels(**labels).observe(
response.elapsed.seconds)
def _version_matches(self, version):
api_version = self.get_api_major_version()
if api_version:

View File

@ -0,0 +1,268 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# Copyright 2014 OpenStack Foundation
# Copyright 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import os
import pprint
import threading
import time
import select
import socket
import fixtures
import prometheus_client
import testtools.content
from openstack.tests.unit import base
class StatsdFixture(fixtures.Fixture):
def _setUp(self):
self.running = True
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(('', 0))
self.port = self.sock.getsockname()[1]
self.wake_read, self.wake_write = os.pipe()
self.stats = []
self.thread.start()
self.addCleanup(self._cleanup)
def run(self):
while self.running:
poll = select.poll()
poll.register(self.sock, select.POLLIN)
poll.register(self.wake_read, select.POLLIN)
ret = poll.poll()
for (fd, event) in ret:
if fd == self.sock.fileno():
data = self.sock.recvfrom(1024)
if not data:
return
self.stats.append(data[0])
if fd == self.wake_read:
return
def _cleanup(self):
self.running = False
os.write(self.wake_write, b'1\n')
self.thread.join()
class TestStats(base.TestCase):
def setUp(self):
self.statsd = StatsdFixture()
self.useFixture(self.statsd)
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
# see: https://github.com/jsocol/pystatsd/issues/61
self.useFixture(
fixtures.EnvironmentVariable('STATSD_HOST', '127.0.0.1'))
self.useFixture(
fixtures.EnvironmentVariable('STATSD_PORT', str(self.statsd.port)))
self.add_info_on_exception('statsd_content', self.statsd.stats)
# Set up the above things before the super setup so that we have the
# environment variables set when the Connection is created.
super(TestStats, self).setUp()
self._registry = prometheus_client.CollectorRegistry()
self.cloud.config._collector_registry = self._registry
self.addOnException(self._add_prometheus_samples)
def _add_prometheus_samples(self, exc_info):
samples = []
for metric in self._registry.collect():
for s in metric.samples:
samples.append(s)
self.addDetail(
'prometheus_samples',
testtools.content.text_content(pprint.pformat(samples)))
def assert_reported_stat(self, key, value=None, kind=None):
"""Check statsd output
Check statsd return values. A ``value`` should specify a
``kind``, however a ``kind`` may be specified without a
``value`` for a generic match. Leave both empy to just check
for key presence.
:arg str key: The statsd key
:arg str value: The expected value of the metric ``key``
:arg str kind: The expected type of the metric ``key`` For example
- ``c`` counter
- ``g`` gauge
- ``ms`` timing
- ``s`` set
"""
self.assertIsNotNone(self.statsd)
if value:
self.assertNotEqual(kind, None)
start = time.time()
while time.time() < (start + 1):
# Note our fake statsd just queues up results in a queue.
# We just keep going through them until we find one that
# matches, or fail out. If statsd pipelines are used,
# large single packets are sent with stats separated by
# newlines; thus we first flatten the stats out into
# single entries.
stats = itertools.chain.from_iterable(
[s.decode('utf-8').split('\n') for s in self.statsd.stats])
for stat in stats:
k, v = stat.split(':')
if key == k:
if kind is None:
# key with no qualifiers is found
return True
s_value, s_kind = v.split('|')
# if no kind match, look for other keys
if kind != s_kind:
continue
if value:
# special-case value|ms because statsd can turn
# timing results into float of indeterminate
# length, hence foiling string matching.
if kind == 'ms':
if float(value) == float(s_value):
return True
if value == s_value:
return True
# otherwise keep looking for other matches
continue
# this key matches
return True
time.sleep(0.1)
raise Exception("Key %s not found in reported stats" % key)
def assert_prometheus_stat(self, name, value, labels=None):
sample_value = self._registry.get_sample_value(name, labels)
self.assertEqual(sample_value, value)
def test_list_projects(self):
mock_uri = self.get_mock_url(
service_type='identity', interface='admin', resource='projects',
base_url_append='v3')
self.register_uris([
dict(method='GET', uri=mock_uri, status_code=200,
json={'projects': []})])
self.cloud.list_projects()
self.assert_calls()
self.assert_reported_stat(
'openstack.api.identity.GET.projects', value='1', kind='c')
self.assert_prometheus_stat(
'openstack_http_requests_total', 1, dict(
service_type='identity',
endpoint=mock_uri,
method='GET',
status_code='200'))
def test_projects(self):
mock_uri = self.get_mock_url(
service_type='identity', interface='admin', resource='projects',
base_url_append='v3')
self.register_uris([
dict(method='GET', uri=mock_uri, status_code=200,
json={'projects': []})])
list(self.cloud.identity.projects())
self.assert_calls()
self.assert_reported_stat(
'openstack.api.identity.GET.projects', value='1', kind='c')
self.assert_prometheus_stat(
'openstack_http_requests_total', 1, dict(
service_type='identity',
endpoint=mock_uri,
method='GET',
status_code='200'))
def test_servers(self):
mock_uri = 'https://compute.example.com/v2.1/servers/detail'
self.register_uris([
dict(method='GET', uri=mock_uri, status_code=200,
json={'servers': []})])
list(self.cloud.compute.servers())
self.assert_calls()
self.assert_reported_stat(
'openstack.api.compute.GET.servers.detail', value='1', kind='c')
self.assert_prometheus_stat(
'openstack_http_requests_total', 1, dict(
service_type='compute',
endpoint=mock_uri,
method='GET',
status_code='200'))
def test_servers_no_detail(self):
mock_uri = 'https://compute.example.com/v2.1/servers'
self.register_uris([
dict(method='GET', uri=mock_uri, status_code=200,
json={'servers': []})])
self.cloud.compute.get('/servers')
self.assert_calls()
self.assert_reported_stat(
'openstack.api.compute.GET.servers', value='1', kind='c')
self.assert_prometheus_stat(
'openstack_http_requests_total', 1, dict(
service_type='compute',
endpoint=mock_uri,
method='GET',
status_code='200'))
class TestNoStats(base.TestCase):
def setUp(self):
super(TestNoStats, self).setUp()
self.statsd = StatsdFixture()
self.useFixture(self.statsd)
def test_no_stats(self):
mock_uri = self.get_mock_url(
service_type='identity', interface='admin', resource='projects',
base_url_append='v3')
self.register_uris([
dict(method='GET', uri=mock_uri, status_code=200,
json={'projects': []})])
self.cloud.identity._statsd_client = None
list(self.cloud.identity.projects())
self.assert_calls()
self.assertEqual([], self.statsd.stats)

View File

@ -0,0 +1,5 @@
---
features:
- |
Added support for collecting and reporting stats on calls made to
statsd and prometheus.

View File

@ -8,9 +8,11 @@ extras>=1.0.0 # MIT
fixtures>=3.0.0 # Apache-2.0/BSD
jsonschema<3.0.0,>=2.6.0 # MIT
mock>=2.0.0 # BSD
prometheus-client>=0.4.2 # Apache-2.0
python-subunit>=1.0.0 # Apache-2.0/BSD
oslotest>=3.2.0 # Apache-2.0
requests-mock>=1.2.0 # Apache-2.0
statsd>=3.3.0
stestr>=1.0.0 # Apache-2.0
testrepository>=0.0.18 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD