Merge "Collect request stats"
This commit is contained in:
commit
4c49b6432c
|
@ -24,6 +24,7 @@ os-client-config==1.28.0
|
|||
os-service-types==1.2.0
|
||||
oslotest==3.2.0
|
||||
pbr==2.0.0
|
||||
prometheus-client==0.4.2
|
||||
Pygments==2.2.0
|
||||
python-mimeparse==1.6.0
|
||||
python-subunit==1.0.0
|
||||
|
@ -32,6 +33,7 @@ requests==2.18.0
|
|||
requests-mock==1.2.0
|
||||
requestsexceptions==1.2.0
|
||||
six==1.10.0
|
||||
statsd==3.3.0
|
||||
stestr==1.0.0
|
||||
stevedore==1.20.0
|
||||
testrepository==0.0.18
|
||||
|
|
|
@ -451,6 +451,10 @@ class _OpenStackCloudMixin(_normalize.Normalizer):
|
|||
interface=self.config.get_interface(service_type),
|
||||
endpoint_override=self.config.get_endpoint(service_type),
|
||||
region_name=self.config.region_name,
|
||||
statsd_prefix=self.config.get_statsd_prefix(),
|
||||
statsd_client=self.config.get_statsd_client(),
|
||||
prometheus_counter=self.config.get_prometheus_counter(),
|
||||
prometheus_histogram=self.config.get_prometheus_histogram(),
|
||||
min_version=request_min_version,
|
||||
max_version=request_max_version)
|
||||
if adapter.get_endpoint():
|
||||
|
|
|
@ -21,6 +21,15 @@ from keystoneauth1 import session as ks_session
|
|||
import os_service_types
|
||||
import requestsexceptions
|
||||
from six.moves import urllib
|
||||
try:
|
||||
import statsd
|
||||
except ImportError:
|
||||
statsd = None
|
||||
try:
|
||||
import prometheus_client
|
||||
except ImportError:
|
||||
prometheus_client = None
|
||||
|
||||
|
||||
from openstack import version as openstack_version
|
||||
from openstack import _log
|
||||
|
@ -96,7 +105,9 @@ class CloudRegion(object):
|
|||
discovery_cache=None, extra_config=None,
|
||||
cache_expiration_time=0, cache_expirations=None,
|
||||
cache_path=None, cache_class='dogpile.cache.null',
|
||||
cache_arguments=None, password_callback=None):
|
||||
cache_arguments=None, password_callback=None,
|
||||
statsd_host=None, statsd_port=None, statsd_prefix=None,
|
||||
collector_registry=None):
|
||||
self._name = name
|
||||
self.region_name = region_name
|
||||
self.config = _util.normalize_keys(config)
|
||||
|
@ -116,6 +127,11 @@ class CloudRegion(object):
|
|||
self._cache_class = cache_class
|
||||
self._cache_arguments = cache_arguments
|
||||
self._password_callback = password_callback
|
||||
self._statsd_host = statsd_host
|
||||
self._statsd_port = statsd_port
|
||||
self._statsd_prefix = statsd_prefix
|
||||
self._statsd_client = None
|
||||
self._collector_registry = collector_registry
|
||||
|
||||
self._service_type_manager = os_service_types.ServiceTypes()
|
||||
|
||||
|
@ -471,6 +487,11 @@ class CloudRegion(object):
|
|||
self.get_connect_retries(service_type))
|
||||
kwargs.setdefault('status_code_retries',
|
||||
self.get_status_code_retries(service_type))
|
||||
kwargs.setdefault('statsd_prefix', self.get_statsd_prefix())
|
||||
kwargs.setdefault('statsd_client', self.get_statsd_client())
|
||||
kwargs.setdefault('prometheus_counter', self.get_prometheus_counter())
|
||||
kwargs.setdefault(
|
||||
'prometheus_histogram', self.get_prometheus_histogram())
|
||||
endpoint_override = self.get_endpoint(service_type)
|
||||
version = version_request.version
|
||||
min_api_version = (
|
||||
|
@ -746,3 +767,61 @@ class CloudRegion(object):
|
|||
def get_concurrency(self, service_type=None):
|
||||
return self._get_service_config(
|
||||
'concurrency', service_type=service_type)
|
||||
|
||||
def get_statsd_client(self):
    """Return the statsd client for this region, or ``None``.

    ``None`` is returned when the optional ``statsd`` library could not
    be imported, or when neither a statsd host nor a statsd port has
    been configured.

    The client is built on first use and kept in ``self._statsd_client``
    so that repeated calls hand back the same object instead of
    constructing a new ``StatsClient`` every time.

    :returns: A ``statsd.StatsClient`` or ``None``.
    """
    if not statsd:
        return None
    if self._statsd_client is not None:
        return self._statsd_client

    # Only pass through the settings that were actually configured so
    # that the statsd library's own defaults apply to the rest.
    statsd_args = {}
    if self._statsd_host:
        statsd_args['host'] = self._statsd_host
    if self._statsd_port:
        statsd_args['port'] = self._statsd_port
    if not statsd_args:
        return None

    self._statsd_client = statsd.StatsClient(**statsd_args)
    return self._statsd_client
|
||||
|
||||
def get_statsd_prefix(self):
    """Return the statsd key prefix, ``openstack.api`` when unset."""
    if self._statsd_prefix:
        return self._statsd_prefix
    return 'openstack.api'
|
||||
|
||||
def get_prometheus_registry(self):
    """Return the prometheus collector registry for this region.

    When no registry was supplied and the optional ``prometheus_client``
    library is importable, its ``REGISTRY`` is adopted and remembered on
    this object. The result may still be ``None`` when neither is
    available.
    """
    registry_missing = not self._collector_registry
    if registry_missing and prometheus_client:
        self._collector_registry = prometheus_client.REGISTRY
    return self._collector_registry
|
||||
|
||||
def get_prometheus_histogram(self):
    """Return the shared response-time histogram, or ``None``.

    ``None`` is returned when there is no collector registry or the
    optional ``prometheus_client`` library is not importable.
    """
    registry = self.get_prometheus_registry()
    if not (registry and prometheus_client):
        return None

    # Collectors must be singletons for a given registry, yet they
    # register themselves at creation time. The histogram is therefore
    # stashed on the registry object itself and reused on later calls.
    existing = getattr(registry, '_openstacksdk_histogram', None)
    if existing:
        return existing

    label_names = ['method', 'endpoint', 'service_type', 'status_code']
    created = prometheus_client.Histogram(
        'openstack_http_response_time',
        'Time taken for an http response to an OpenStack service',
        labelnames=label_names,
        registry=registry,
    )
    registry._openstacksdk_histogram = created
    return created
|
||||
|
||||
def get_prometheus_counter(self):
    """Return the shared HTTP request counter, or ``None``.

    ``None`` is returned when there is no collector registry or the
    optional ``prometheus_client`` library is not importable.
    """
    registry = self.get_prometheus_registry()
    if not (registry and prometheus_client):
        return None

    # The counter is kept as an attribute of the registry so that only
    # one is ever created (and thus registered) per registry.
    existing = getattr(registry, '_openstacksdk_counter', None)
    if existing:
        return existing

    label_names = ['method', 'endpoint', 'service_type', 'status_code']
    created = prometheus_client.Counter(
        'openstack_http_requests',
        'Number of HTTP requests made to an OpenStack service',
        labelnames=label_names,
        registry=registry,
    )
    registry._openstacksdk_counter = created
    return created
|
||||
|
|
|
@ -140,7 +140,9 @@ class OpenStackConfig(object):
|
|||
envvar_prefix=None, secure_files=None,
|
||||
pw_func=None, session_constructor=None,
|
||||
app_name=None, app_version=None,
|
||||
load_yaml_config=True, load_envvars=True):
|
||||
load_yaml_config=True, load_envvars=True,
|
||||
statsd_host=None, statsd_port=None,
|
||||
statsd_prefix=None):
|
||||
self.log = _log.setup_logging('openstack.config')
|
||||
self._session_constructor = session_constructor
|
||||
self._app_name = app_name
|
||||
|
@ -276,6 +278,21 @@ class OpenStackConfig(object):
|
|||
self._cache_expirations = cache_settings.get(
|
||||
'expiration', self._cache_expirations)
|
||||
|
||||
if load_yaml_config:
|
||||
statsd_config = self.cloud_config.get('statsd', {})
|
||||
statsd_host = statsd_host or statsd_config.get('host')
|
||||
statsd_port = statsd_port or statsd_config.get('port')
|
||||
statsd_prefix = statsd_prefix or statsd_config.get('prefix')
|
||||
|
||||
if load_envvars:
|
||||
statsd_host = statsd_host or os.environ.get('STATSD_HOST')
|
||||
statsd_port = statsd_port or os.environ.get('STATSD_PORT')
|
||||
statsd_prefix = statsd_prefix or os.environ.get('STATSD_PREFIX')
|
||||
|
||||
self._statsd_host = statsd_host
|
||||
self._statsd_port = statsd_port
|
||||
self._statsd_prefix = statsd_prefix
|
||||
|
||||
# Flag location to hold the peeked value of an argparse timeout value
|
||||
self._argv_timeout = False
|
||||
|
||||
|
@ -1091,6 +1108,9 @@ class OpenStackConfig(object):
|
|||
cache_class=self._cache_class,
|
||||
cache_arguments=self._cache_arguments,
|
||||
password_callback=self._pw_callback,
|
||||
statsd_host=self._statsd_host,
|
||||
statsd_port=self._statsd_port,
|
||||
statsd_prefix=self._statsd_prefix,
|
||||
)
|
||||
# TODO(mordred) Backwards compat for OSC transition
|
||||
get_one_cloud = get_one
|
||||
|
|
|
@ -59,7 +59,9 @@ def _extract_name(url, service_type=None):
|
|||
# Strip leading version piece so that
|
||||
# GET /v2.0/networks
|
||||
# returns ['networks']
|
||||
if url_parts[0] in ('v1', 'v2', 'v2.0'):
|
||||
if (url_parts[0]
|
||||
and url_parts[0][0] == 'v'
|
||||
and url_parts[0][1] and url_parts[0][1].isdigit()):
|
||||
url_parts = url_parts[1:]
|
||||
name_parts = []
|
||||
# Pull out every other URL portion - so that
|
||||
|
@ -118,12 +120,21 @@ class Proxy(adapter.Adapter):
|
|||
``<service-type>_status_code_retries``.
|
||||
"""
|
||||
|
||||
def __init__(
        self,
        session,
        statsd_client=None, statsd_prefix=None,
        prometheus_counter=None, prometheus_histogram=None,
        *args, **kwargs):
    """Create the proxy and remember where request stats are reported.

    :param session: Session handed on to the parent adapter.
    :param statsd_client: Client used to emit statsd metrics, or
        ``None`` to disable statsd reporting.
    :param statsd_prefix: Leading component of every statsd key.
    :param prometheus_counter: Counter incremented for each response, or
        ``None``.
    :param prometheus_histogram: Histogram fed with each response time,
        or ``None``.
    :param args: Further positional arguments for the parent adapter.
    :param kwargs: Further keyword arguments for the parent adapter.
    """
    # NOTE(dtantsur): keystoneauth defaults retriable_status_codes to None,
    # override it with a class-level value.
    kwargs.setdefault('retriable_status_codes',
                      self.retriable_status_codes)
    # Pass session positionally: combining ``session=session`` with
    # ``*args`` makes the first extra positional argument collide with
    # the session parameter and raise TypeError.
    super(Proxy, self).__init__(session, *args, **kwargs)
    self._statsd_client = statsd_client
    self._statsd_prefix = statsd_prefix
    self._prometheus_counter = prometheus_counter
    self._prometheus_histogram = prometheus_histogram
|
||||
|
||||
def request(
|
||||
self, url, method, error_message=None,
|
||||
|
@ -132,8 +143,36 @@ class Proxy(adapter.Adapter):
|
|||
url, method,
|
||||
connect_retries=connect_retries, raise_exc=False,
|
||||
**kwargs)
|
||||
for h in response.history:
|
||||
self._report_stats(h)
|
||||
self._report_stats(response)
|
||||
return response
|
||||
|
||||
def _report_stats(self, response):
|
||||
if self._statsd_client:
|
||||
self._report_stats_statsd(response)
|
||||
if self._prometheus_counter and self._prometheus_histogram:
|
||||
self._report_stats_prometheus(response)
|
||||
|
||||
def _report_stats_statsd(self, response):
    """Send a timing and a counter increment for ``response`` to statsd.

    The key is the statsd prefix, the service type and the HTTP method,
    followed by the name parts extracted from the request URL, all
    joined with dots.

    :param response: Response object exposing ``request.url``,
        ``request.method`` and ``elapsed`` (a ``timedelta``).
    """
    name_parts = _extract_name(response.request.url, self.service_type)
    key = '.'.join(
        [self._statsd_prefix, self.service_type, response.request.method]
        + name_parts)
    # timedelta.seconds is only the whole-second component, which is 0
    # for every sub-second request; total_seconds() keeps the fraction.
    elapsed_ms = int(response.elapsed.total_seconds() * 1000)
    self._statsd_client.timing(key, elapsed_ms)
    self._statsd_client.incr(key)
|
||||
|
||||
def _report_stats_prometheus(self, response):
|
||||
labels = dict(
|
||||
method=response.request.method,
|
||||
endpoint=response.request.url,
|
||||
service_type=self.service_type,
|
||||
status_code=response.status_code,
|
||||
)
|
||||
self._prometheus_counter.labels(**labels).inc()
|
||||
self._prometheus_histogram.labels(**labels).observe(
|
||||
response.elapsed.seconds)
|
||||
|
||||
def _version_matches(self, version):
|
||||
api_version = self.get_api_major_version()
|
||||
if api_version:
|
||||
|
|
|
@ -0,0 +1,268 @@
|
|||
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# Copyright 2018 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
import os
|
||||
import pprint
|
||||
import threading
|
||||
import time
|
||||
import select
|
||||
import socket
|
||||
|
||||
import fixtures
|
||||
import prometheus_client
|
||||
import testtools.content
|
||||
|
||||
from openstack.tests.unit import base
|
||||
|
||||
|
||||
class StatsdFixture(fixtures.Fixture):
    """Fake statsd server that records every UDP datagram it receives.

    The fixture binds a UDP socket to an ephemeral port (exposed as
    ``self.port``) and collects the raw payload of each datagram in
    ``self.stats`` from a background thread.
    """

    def _setUp(self):
        self.running = True
        self.thread = threading.Thread(target=self.run)
        self.thread.daemon = True
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(('', 0))
        self.port = self.sock.getsockname()[1]
        # The pipe exists only to wake the listener thread out of poll()
        # when the fixture is torn down.
        self.wake_read, self.wake_write = os.pipe()
        self.stats = []
        self.thread.start()
        self.addCleanup(self._cleanup)

    def run(self):
        """Collect datagrams until the wake pipe becomes readable."""
        # The set of watched descriptors never changes, so register
        # them once instead of on every loop iteration.
        poll = select.poll()
        poll.register(self.sock, select.POLLIN)
        poll.register(self.wake_read, select.POLLIN)
        while self.running:
            for fd, _event in poll.poll():
                if fd == self.sock.fileno():
                    # recvfrom returns a (payload, address) pair; only
                    # the payload is of interest.
                    payload, _address = self.sock.recvfrom(1024)
                    self.stats.append(payload)
                if fd == self.wake_read:
                    return

    def _cleanup(self):
        """Stop the listener thread and release the socket and pipe."""
        self.running = False
        os.write(self.wake_write, b'1\n')
        self.thread.join()
        # Close the descriptors only after the thread has exited so it
        # never polls a closed descriptor.
        self.sock.close()
        os.close(self.wake_read)
        os.close(self.wake_write)
|
||||
|
||||
|
||||
class TestStats(base.TestCase):
    """Check that API calls are reported to statsd and prometheus."""

    def setUp(self):
        self.statsd = StatsdFixture()
        self.useFixture(self.statsd)
        # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
        # see: https://github.com/jsocol/pystatsd/issues/61
        self.useFixture(
            fixtures.EnvironmentVariable('STATSD_HOST', '127.0.0.1'))
        self.useFixture(
            fixtures.EnvironmentVariable('STATSD_PORT', str(self.statsd.port)))

        self.add_info_on_exception('statsd_content', self.statsd.stats)
        # Set up the above things before the super setup so that we have the
        # environment variables set when the Connection is created.
        super(TestStats, self).setUp()

        self._registry = prometheus_client.CollectorRegistry()
        self.cloud.config._collector_registry = self._registry
        self.addOnException(self._add_prometheus_samples)

    def _add_prometheus_samples(self, exc_info):
        """Attach all collected prometheus samples to the test details."""
        samples = [
            sample
            for metric in self._registry.collect()
            for sample in metric.samples
        ]
        self.addDetail(
            'prometheus_samples',
            testtools.content.text_content(pprint.pformat(samples)))

    def assert_reported_stat(self, key, value=None, kind=None):
        """Check statsd output

        Check statsd return values.  A ``value`` should specify a
        ``kind``, however a ``kind`` may be specified without a
        ``value`` for a generic match.  Leave both empty to just check
        for key presence.

        The collected stats are re-checked for up to one second before
        giving up.

        :arg str key: The statsd key
        :arg str value: The expected value of the metric ``key``
        :arg str kind: The expected type of the metric ``key``  For example

          - ``c`` counter
          - ``g`` gauge
          - ``ms`` timing
          - ``s`` set

        :raises Exception: If no matching stat shows up in time.
        """

        self.assertIsNotNone(self.statsd)

        if value:
            self.assertNotEqual(kind, None)

        start = time.time()
        while time.time() < (start + 1):
            # Note our fake statsd just queues up results in a queue.
            # We just keep going through them until we find one that
            # matches, or fail out.  If statsd pipelines are used,
            # large single packets are sent with stats separated by
            # newlines; thus we first flatten the stats out into
            # single entries.
            stats = itertools.chain.from_iterable(
                [s.decode('utf-8').split('\n') for s in self.statsd.stats])
            for stat in stats:
                # partition() tolerates blank entries (for example from
                # a trailing newline), which a two-way unpack of
                # split(':') would turn into a ValueError.
                k, separator, v = stat.partition(':')
                if not separator or key != k:
                    continue

                if kind is None:
                    # key with no qualifiers is found
                    return True

                # A value may carry extra fields after the kind (such
                # as a sample rate), so only the first two are used.
                fields = v.split('|')
                s_value = fields[0]
                s_kind = fields[1] if len(fields) > 1 else None

                # if no kind match, look for other keys
                if kind != s_kind:
                    continue

                if value:
                    # special-case value|ms because statsd can turn
                    # timing results into float of indeterminate
                    # length, hence foiling string matching.
                    if kind == 'ms':
                        if float(value) == float(s_value):
                            return True
                    if value == s_value:
                        return True
                    # otherwise keep looking for other matches
                    continue

                # this key matches
                return True
            time.sleep(0.1)

        raise Exception("Key %s not found in reported stats" % key)

    def assert_prometheus_stat(self, name, value, labels=None):
        """Assert that the named prometheus sample has the given value."""
        sample_value = self._registry.get_sample_value(name, labels)
        self.assertEqual(sample_value, value)

    def test_list_projects(self):

        mock_uri = self.get_mock_url(
            service_type='identity', interface='admin', resource='projects',
            base_url_append='v3')

        self.register_uris([
            dict(method='GET', uri=mock_uri, status_code=200,
                 json={'projects': []})])

        self.cloud.list_projects()
        self.assert_calls()

        self.assert_reported_stat(
            'openstack.api.identity.GET.projects', value='1', kind='c')
        self.assert_prometheus_stat(
            'openstack_http_requests_total', 1, dict(
                service_type='identity',
                endpoint=mock_uri,
                method='GET',
                status_code='200'))

    def test_projects(self):
        mock_uri = self.get_mock_url(
            service_type='identity', interface='admin', resource='projects',
            base_url_append='v3')

        self.register_uris([
            dict(method='GET', uri=mock_uri, status_code=200,
                 json={'projects': []})])

        list(self.cloud.identity.projects())
        self.assert_calls()

        self.assert_reported_stat(
            'openstack.api.identity.GET.projects', value='1', kind='c')
        self.assert_prometheus_stat(
            'openstack_http_requests_total', 1, dict(
                service_type='identity',
                endpoint=mock_uri,
                method='GET',
                status_code='200'))

    def test_servers(self):

        mock_uri = 'https://compute.example.com/v2.1/servers/detail'

        self.register_uris([
            dict(method='GET', uri=mock_uri, status_code=200,
                 json={'servers': []})])

        list(self.cloud.compute.servers())
        self.assert_calls()

        self.assert_reported_stat(
            'openstack.api.compute.GET.servers.detail', value='1', kind='c')
        self.assert_prometheus_stat(
            'openstack_http_requests_total', 1, dict(
                service_type='compute',
                endpoint=mock_uri,
                method='GET',
                status_code='200'))

    def test_servers_no_detail(self):

        mock_uri = 'https://compute.example.com/v2.1/servers'

        self.register_uris([
            dict(method='GET', uri=mock_uri, status_code=200,
                 json={'servers': []})])

        self.cloud.compute.get('/servers')
        self.assert_calls()

        self.assert_reported_stat(
            'openstack.api.compute.GET.servers', value='1', kind='c')
        self.assert_prometheus_stat(
            'openstack_http_requests_total', 1, dict(
                service_type='compute',
                endpoint=mock_uri,
                method='GET',
                status_code='200'))
|
||||
|
||||
|
||||
class TestNoStats(base.TestCase):
    """Check that nothing reaches statsd when the proxy has no client."""

    def setUp(self):
        super(TestNoStats, self).setUp()
        self.statsd = StatsdFixture()
        self.useFixture(self.statsd)

    def test_no_stats(self):
        projects_url = self.get_mock_url(
            service_type='identity', interface='admin', resource='projects',
            base_url_append='v3')
        projects_response = dict(
            method='GET', uri=projects_url, status_code=200,
            json={'projects': []})
        self.register_uris([projects_response])

        # Drop the statsd client from the identity proxy before making
        # the request.
        self.cloud.identity._statsd_client = None
        list(self.cloud.identity.projects())
        self.assert_calls()
        self.assertEqual([], self.statsd.stats)
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
Added support for collecting stats on the API calls made to OpenStack
|
||||
services and reporting them to statsd and prometheus.
|
|
@ -8,9 +8,11 @@ extras>=1.0.0 # MIT
|
|||
fixtures>=3.0.0 # Apache-2.0/BSD
|
||||
jsonschema<3.0.0,>=2.6.0 # MIT
|
||||
mock>=2.0.0 # BSD
|
||||
prometheus-client>=0.4.2 # Apache-2.0
|
||||
python-subunit>=1.0.0 # Apache-2.0/BSD
|
||||
oslotest>=3.2.0 # Apache-2.0
|
||||
requests-mock>=1.2.0 # Apache-2.0
|
||||
statsd>=3.3.0
|
||||
stestr>=1.0.0 # Apache-2.0
|
||||
testrepository>=0.0.18 # Apache-2.0/BSD
|
||||
testscenarios>=0.4 # Apache-2.0/BSD
|
||||
|
|
Loading…
Reference in New Issue