From c8b96cddd3d65b9b79788d93e72fe499f07ffae0 Mon Sep 17 00:00:00 2001 From: Monty Taylor Date: Thu, 1 Nov 2018 12:54:43 -0500 Subject: [PATCH] Collect request stats This is a thing that nodepool has been doing for ages. With the upcoming changes to remove the task manager, the mechanism it has been using to put activity in the right place isn't going to be available anymore. But also, people using openstacksdk from within a service might also want to be able to do the same logging. This improves upon the old method as well, as it uses the history in the response object to get and report on all of the calls made as part of a request. This will catch things that do auto retries. While we're in there, add support for reporting to prometheus instead. The prometheus support does not read from config, and does not run an http service, since openstacksdk is a library. It is expected that an application that uses openstacksdk and wants request stats collected will pass a prometheus_client.CollectorRegistry to collector_registry. Change-Id: I7218179dd5f0c068a52a4704b2ce1a0942fdc0d1 --- lower-constraints.txt | 2 + openstack/cloud/openstackcloud.py | 4 + openstack/config/cloud_region.py | 81 +++++- openstack/config/loader.py | 22 +- openstack/proxy.py | 45 ++- openstack/tests/unit/test_stats.py | 268 ++++++++++++++++++ .../notes/request-stats-9d70480bebbdb4d6.yaml | 5 + test-requirements.txt | 2 + 8 files changed, 424 insertions(+), 5 deletions(-) create mode 100644 openstack/tests/unit/test_stats.py create mode 100644 releasenotes/notes/request-stats-9d70480bebbdb4d6.yaml diff --git a/lower-constraints.txt b/lower-constraints.txt index 3de103693..93758a7e4 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -24,6 +24,7 @@ os-client-config==1.28.0 os-service-types==1.2.0 oslotest==3.2.0 pbr==2.0.0 +prometheus-client==0.4.2 Pygments==2.2.0 python-mimeparse==1.6.0 python-subunit==1.0.0 @@ -32,6 +33,7 @@ requests==2.18.0 requests-mock==1.2.0 requestsexceptions==1.2.0 six==1.10.0 +statsd==3.3.0 stestr==1.0.0 stevedore==1.20.0 testrepository==0.0.18 diff --git a/openstack/cloud/openstackcloud.py b/openstack/cloud/openstackcloud.py index 54e0939d6..027fe0373 100755 --- a/openstack/cloud/openstackcloud.py +++ b/openstack/cloud/openstackcloud.py @@ -451,6 +451,10 @@ class _OpenStackCloudMixin(_normalize.Normalizer): interface=self.config.get_interface(service_type), endpoint_override=self.config.get_endpoint(service_type), region_name=self.config.region_name, + statsd_prefix=self.config.get_statsd_prefix(), + statsd_client=self.config.get_statsd_client(), + prometheus_counter=self.config.get_prometheus_counter(), + prometheus_histogram=self.config.get_prometheus_histogram(), min_version=request_min_version, max_version=request_max_version) if adapter.get_endpoint(): diff --git a/openstack/config/cloud_region.py b/openstack/config/cloud_region.py index 50c13bc04..8bb208346 100644 --- a/openstack/config/cloud_region.py +++ b/openstack/config/cloud_region.py @@ -21,6 +21,15 @@ from keystoneauth1 import session as ks_session import os_service_types import requestsexceptions from six.moves import urllib +try: + import statsd +except ImportError: + statsd = None +try: + import prometheus_client +except ImportError: + prometheus_client = None + from openstack import version as openstack_version from openstack import _log @@ -96,7 +105,9 @@ class CloudRegion(object): discovery_cache=None, extra_config=None, cache_expiration_time=0, cache_expirations=None, cache_path=None, cache_class='dogpile.cache.null', - cache_arguments=None, password_callback=None): + cache_arguments=None, password_callback=None, + statsd_host=None, statsd_port=None, statsd_prefix=None, + collector_registry=None): self._name = name self.region_name = region_name self.config = _util.normalize_keys(config) @@ -116,6 +127,11 @@ class CloudRegion(object): self._cache_class = cache_class self._cache_arguments = cache_arguments self._password_callback = password_callback + self._statsd_host = statsd_host + self._statsd_port = statsd_port + self._statsd_prefix = statsd_prefix + self._statsd_client = None + self._collector_registry = collector_registry self._service_type_manager = os_service_types.ServiceTypes() @@ -471,6 +487,11 @@ class CloudRegion(object): self.get_connect_retries(service_type)) kwargs.setdefault('status_code_retries', self.get_status_code_retries(service_type)) + kwargs.setdefault('statsd_prefix', self.get_statsd_prefix()) + kwargs.setdefault('statsd_client', self.get_statsd_client()) + kwargs.setdefault('prometheus_counter', self.get_prometheus_counter()) + kwargs.setdefault( + 'prometheus_histogram', self.get_prometheus_histogram()) endpoint_override = self.get_endpoint(service_type) version = version_request.version min_api_version = ( @@ -746,3 +767,61 @@ class CloudRegion(object): def get_concurrency(self, service_type=None): return self._get_service_config( 'concurrency', service_type=service_type) + + def get_statsd_client(self): + if not statsd: + return None + statsd_args = {} + if self._statsd_host: + statsd_args['host'] = self._statsd_host + if self._statsd_port: + statsd_args['port'] = self._statsd_port + if statsd_args: + return statsd.StatsClient(**statsd_args) + else: + return None + + def get_statsd_prefix(self): + return self._statsd_prefix or 'openstack.api' + + def get_prometheus_registry(self): + if not self._collector_registry and prometheus_client: + self._collector_registry = prometheus_client.REGISTRY + return self._collector_registry + + def get_prometheus_histogram(self): + registry = self.get_prometheus_registry() + if not registry or not prometheus_client: + return + # We have to hide a reference to the histogram on the registry + # object, because it's collectors must be singletons for a given + # registry but register at creation time. + hist = getattr(registry, '_openstacksdk_histogram', None) + if not hist: + hist = prometheus_client.Histogram( + 'openstack_http_response_time', + 'Time taken for an http response to an OpenStack service', + labelnames=[ + 'method', 'endpoint', 'service_type', 'status_code' + ], + registry=registry, + ) + registry._openstacksdk_histogram = hist + return hist + + def get_prometheus_counter(self): + registry = self.get_prometheus_registry() + if not registry or not prometheus_client: + return + counter = getattr(registry, '_openstacksdk_counter', None) + if not counter: + counter = prometheus_client.Counter( + 'openstack_http_requests', + 'Number of HTTP requests made to an OpenStack service', + labelnames=[ + 'method', 'endpoint', 'service_type', 'status_code' + ], + registry=registry, + ) + registry._openstacksdk_counter = counter + return counter diff --git a/openstack/config/loader.py b/openstack/config/loader.py index 1b5540d24..42f830fa1 100644 --- a/openstack/config/loader.py +++ b/openstack/config/loader.py @@ -140,7 +140,9 @@ class OpenStackConfig(object): envvar_prefix=None, secure_files=None, pw_func=None, session_constructor=None, app_name=None, app_version=None, - load_yaml_config=True, load_envvars=True): + load_yaml_config=True, load_envvars=True, + statsd_host=None, statsd_port=None, + statsd_prefix=None): self.log = _log.setup_logging('openstack.config') self._session_constructor = session_constructor self._app_name = app_name @@ -276,6 +278,21 @@ class OpenStackConfig(object): self._cache_expirations = cache_settings.get( 'expiration', self._cache_expirations) + if load_yaml_config: + statsd_config = self.cloud_config.get('statsd', {}) + statsd_host = statsd_host or statsd_config.get('host') + statsd_port = statsd_port or statsd_config.get('port') + statsd_prefix = statsd_prefix or statsd_config.get('prefix') + + if load_envvars: + statsd_host = statsd_host or os.environ.get('STATSD_HOST') + statsd_port = statsd_port or os.environ.get('STATSD_PORT') + statsd_prefix = statsd_prefix or os.environ.get('STATSD_PREFIX') + + self._statsd_host = statsd_host + self._statsd_port = statsd_port + self._statsd_prefix = statsd_prefix + # Flag location to hold the peeked value of an argparse timeout value self._argv_timeout = False @@ -1091,6 +1108,9 @@ class OpenStackConfig(object): cache_class=self._cache_class, cache_arguments=self._cache_arguments, password_callback=self._pw_callback, + statsd_host=self._statsd_host, + statsd_port=self._statsd_port, + statsd_prefix=self._statsd_prefix, ) # TODO(mordred) Backwards compat for OSC transition get_one_cloud = get_one diff --git a/openstack/proxy.py b/openstack/proxy.py index 5bc23f85e..8144e771f 100644 --- a/openstack/proxy.py +++ b/openstack/proxy.py @@ -59,7 +59,9 @@ def _extract_name(url, service_type=None): # Strip leading version piece so that # GET /v2.0/networks # returns ['networks'] - if url_parts[0] in ('v1', 'v2', 'v2.0'): + if (url_parts[0] + and url_parts[0][0] == 'v' + and url_parts[0][1] and url_parts[0][1].isdigit()): url_parts = url_parts[1:] name_parts = [] # Pull out every other URL portion - so that @@ -118,12 +120,21 @@ class Proxy(adapter.Adapter): ``_status_code_retries``. """ - def __init__(self, *args, **kwargs): + def __init__( + self, + session, + statsd_client=None, statsd_prefix=None, + prometheus_counter=None, prometheus_histogram=None, + *args, **kwargs): # NOTE(dtantsur): keystoneauth defaults retriable_status_codes to None, # override it with a class-level value. kwargs.setdefault('retriable_status_codes', self.retriable_status_codes) - super(Proxy, self).__init__(*args, **kwargs) + super(Proxy, self).__init__(session=session, *args, **kwargs) + self._statsd_client = statsd_client + self._statsd_prefix = statsd_prefix + self._prometheus_counter = prometheus_counter + self._prometheus_histogram = prometheus_histogram def request( self, url, method, error_message=None, @@ -132,8 +143,36 @@ class Proxy(adapter.Adapter): url, method, connect_retries=connect_retries, raise_exc=False, **kwargs) + for h in response.history: + self._report_stats(h) + self._report_stats(response) return response + def _report_stats(self, response): + if self._statsd_client: + self._report_stats_statsd(response) + if self._prometheus_counter and self._prometheus_histogram: + self._report_stats_prometheus(response) + + def _report_stats_statsd(self, response): + name_parts = _extract_name(response.request.url, self.service_type) + key = '.'.join( + [self._statsd_prefix, self.service_type, response.request.method] + + name_parts) + self._statsd_client.timing(key, int(response.elapsed.seconds * 1000)) + self._statsd_client.incr(key) + + def _report_stats_prometheus(self, response): + labels = dict( + method=response.request.method, + endpoint=response.request.url, + service_type=self.service_type, + status_code=response.status_code, + ) + self._prometheus_counter.labels(**labels).inc() + self._prometheus_histogram.labels(**labels).observe( + response.elapsed.seconds) + def _version_matches(self, version): api_version = self.get_api_major_version() if api_version: diff --git a/openstack/tests/unit/test_stats.py b/openstack/tests/unit/test_stats.py new file mode 100644 index 000000000..d8c88c299 --- /dev/null +++ b/openstack/tests/unit/test_stats.py @@ -0,0 +1,268 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# Copyright 2014 OpenStack Foundation +# Copyright 2018 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import itertools +import os +import pprint +import threading +import time +import select +import socket + +import fixtures +import prometheus_client +import testtools.content + +from openstack.tests.unit import base + + +class StatsdFixture(fixtures.Fixture): + def _setUp(self): + self.running = True + self.thread = threading.Thread(target=self.run) + self.thread.daemon = True + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sock.bind(('', 0)) + self.port = self.sock.getsockname()[1] + self.wake_read, self.wake_write = os.pipe() + self.stats = [] + self.thread.start() + self.addCleanup(self._cleanup) + + def run(self): + while self.running: + poll = select.poll() + poll.register(self.sock, select.POLLIN) + poll.register(self.wake_read, select.POLLIN) + ret = poll.poll() + for (fd, event) in ret: + if fd == self.sock.fileno(): + data = self.sock.recvfrom(1024) + if not data: + return + self.stats.append(data[0]) + if fd == self.wake_read: + return + + def _cleanup(self): + self.running = False + os.write(self.wake_write, b'1\n') + self.thread.join() + + +class TestStats(base.TestCase): + + def setUp(self): + self.statsd = StatsdFixture() + self.useFixture(self.statsd) + # note, use 127.0.0.1 rather than localhost to avoid getting ipv6 + # see: https://github.com/jsocol/pystatsd/issues/61 + self.useFixture( + fixtures.EnvironmentVariable('STATSD_HOST', '127.0.0.1')) + self.useFixture( + fixtures.EnvironmentVariable('STATSD_PORT', str(self.statsd.port))) + + self.add_info_on_exception('statsd_content', self.statsd.stats) + # Set up the above things before the super setup so that we have the + # environment variables set when the Connection is created. + super(TestStats, self).setUp() + + self._registry = prometheus_client.CollectorRegistry() + self.cloud.config._collector_registry = self._registry + self.addOnException(self._add_prometheus_samples) + + def _add_prometheus_samples(self, exc_info): + samples = [] + for metric in self._registry.collect(): + for s in metric.samples: + samples.append(s) + self.addDetail( + 'prometheus_samples', + testtools.content.text_content(pprint.pformat(samples))) + + def assert_reported_stat(self, key, value=None, kind=None): + """Check statsd output + + Check statsd return values. A ``value`` should specify a + ``kind``, however a ``kind`` may be specified without a + ``value`` for a generic match. Leave both empy to just check + for key presence. + + :arg str key: The statsd key + :arg str value: The expected value of the metric ``key`` + :arg str kind: The expected type of the metric ``key`` For example + + - ``c`` counter + - ``g`` gauge + - ``ms`` timing + - ``s`` set + """ + + self.assertIsNotNone(self.statsd) + + if value: + self.assertNotEqual(kind, None) + + start = time.time() + while time.time() < (start + 1): + # Note our fake statsd just queues up results in a queue. + # We just keep going through them until we find one that + # matches, or fail out. If statsd pipelines are used, + # large single packets are sent with stats separated by + # newlines; thus we first flatten the stats out into + # single entries. + stats = itertools.chain.from_iterable( + [s.decode('utf-8').split('\n') for s in self.statsd.stats]) + for stat in stats: + k, v = stat.split(':') + if key == k: + if kind is None: + # key with no qualifiers is found + return True + + s_value, s_kind = v.split('|') + + # if no kind match, look for other keys + if kind != s_kind: + continue + + if value: + # special-case value|ms because statsd can turn + # timing results into float of indeterminate + # length, hence foiling string matching. + if kind == 'ms': + if float(value) == float(s_value): + return True + if value == s_value: + return True + # otherwise keep looking for other matches + continue + + # this key matches + return True + time.sleep(0.1) + + raise Exception("Key %s not found in reported stats" % key) + + def assert_prometheus_stat(self, name, value, labels=None): + sample_value = self._registry.get_sample_value(name, labels) + self.assertEqual(sample_value, value) + + def test_list_projects(self): + + mock_uri = self.get_mock_url( + service_type='identity', interface='admin', resource='projects', + base_url_append='v3') + + self.register_uris([ + dict(method='GET', uri=mock_uri, status_code=200, + json={'projects': []})]) + + self.cloud.list_projects() + self.assert_calls() + + self.assert_reported_stat( + 'openstack.api.identity.GET.projects', value='1', kind='c') + self.assert_prometheus_stat( + 'openstack_http_requests_total', 1, dict( + service_type='identity', + endpoint=mock_uri, + method='GET', + status_code='200')) + + def test_projects(self): + mock_uri = self.get_mock_url( + service_type='identity', interface='admin', resource='projects', + base_url_append='v3') + + self.register_uris([ + dict(method='GET', uri=mock_uri, status_code=200, + json={'projects': []})]) + + list(self.cloud.identity.projects()) + self.assert_calls() + + self.assert_reported_stat( + 'openstack.api.identity.GET.projects', value='1', kind='c') + self.assert_prometheus_stat( + 'openstack_http_requests_total', 1, dict( + service_type='identity', + endpoint=mock_uri, + method='GET', + status_code='200')) + + def test_servers(self): + + mock_uri = 'https://compute.example.com/v2.1/servers/detail' + + self.register_uris([ + dict(method='GET', uri=mock_uri, status_code=200, + json={'servers': []})]) + + list(self.cloud.compute.servers()) + self.assert_calls() + + self.assert_reported_stat( + 'openstack.api.compute.GET.servers.detail', value='1', kind='c') + self.assert_prometheus_stat( + 'openstack_http_requests_total', 1, dict( + service_type='compute', + endpoint=mock_uri, + method='GET', + status_code='200')) + + def test_servers_no_detail(self): + + mock_uri = 'https://compute.example.com/v2.1/servers' + + self.register_uris([ + dict(method='GET', uri=mock_uri, status_code=200, + json={'servers': []})]) + + self.cloud.compute.get('/servers') + self.assert_calls() + + self.assert_reported_stat( + 'openstack.api.compute.GET.servers', value='1', kind='c') + self.assert_prometheus_stat( + 'openstack_http_requests_total', 1, dict( + service_type='compute', + endpoint=mock_uri, + method='GET', + status_code='200')) + + +class TestNoStats(base.TestCase): + + def setUp(self): + super(TestNoStats, self).setUp() + self.statsd = StatsdFixture() + self.useFixture(self.statsd) + + def test_no_stats(self): + + mock_uri = self.get_mock_url( + service_type='identity', interface='admin', resource='projects', + base_url_append='v3') + + self.register_uris([ + dict(method='GET', uri=mock_uri, status_code=200, + json={'projects': []})]) + + self.cloud.identity._statsd_client = None + list(self.cloud.identity.projects()) + self.assert_calls() + self.assertEqual([], self.statsd.stats) diff --git a/releasenotes/notes/request-stats-9d70480bebbdb4d6.yaml b/releasenotes/notes/request-stats-9d70480bebbdb4d6.yaml new file mode 100644 index 000000000..04748cae6 --- /dev/null +++ b/releasenotes/notes/request-stats-9d70480bebbdb4d6.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Added support for collecting and reporting stats on calls made to + statsd and prometheus. diff --git a/test-requirements.txt b/test-requirements.txt index bbc69b07f..82ccae1a3 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -8,9 +8,11 @@ extras>=1.0.0 # MIT fixtures>=3.0.0 # Apache-2.0/BSD jsonschema<3.0.0,>=2.6.0 # MIT mock>=2.0.0 # BSD +prometheus-client>=0.4.2 # Apache-2.0 python-subunit>=1.0.0 # Apache-2.0/BSD oslotest>=3.2.0 # Apache-2.0 requests-mock>=1.2.0 # Apache-2.0 +statsd>=3.3.0 stestr>=1.0.0 # Apache-2.0 testrepository>=0.0.18 # Apache-2.0/BSD testscenarios>=0.4 # Apache-2.0/BSD