Merge "OSprofiler with Jaeger Tracing as backend"
This commit is contained in:
commit
eb6376c052
|
@ -1,6 +1,8 @@
|
|||
coverage===4.0
|
||||
ddt===1.0.1
|
||||
elasticsearch===2.0.0
|
||||
futures===3.0.0
|
||||
jaeger-client==3.8.0
|
||||
mock===2.0.0
|
||||
netaddr===0.7.18
|
||||
openstackdocstheme===1.18.1
|
||||
|
|
|
@ -18,8 +18,10 @@ import hashlib
|
|||
import hmac
|
||||
import json
|
||||
import os
|
||||
import uuid
|
||||
|
||||
from oslo_utils import secretutils
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
|
||||
|
||||
|
@ -147,3 +149,13 @@ def import_modules_from_package(package):
|
|||
new_package = ".".join(root.split(os.sep)).split("....")[1]
|
||||
module_name = "%s.%s" % (new_package, filename[:-3])
|
||||
__import__(module_name)
|
||||
|
||||
|
||||
def shorten_id(span_id):
|
||||
"""Convert from uuid4 to 64 bit id for OpenTracing"""
|
||||
try:
|
||||
short_id = uuid.UUID(span_id).int & (1 << 64) - 1
|
||||
except ValueError:
|
||||
# Return a new short id for this
|
||||
short_id = shorten_id(uuidutils.generate_uuid())
|
||||
return short_id
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from osprofiler.drivers import base # noqa
|
||||
from osprofiler.drivers import elasticsearch_driver # noqa
|
||||
from osprofiler.drivers import jaeger # noqa
|
||||
from osprofiler.drivers import loginsight # noqa
|
||||
from osprofiler.drivers import messaging # noqa
|
||||
from osprofiler.drivers import mongodb # noqa
|
||||
|
|
|
@ -0,0 +1,147 @@
|
|||
# Copyright 2018 Fujitsu Ltd.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import datetime
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_serialization import jsonutils
|
||||
import six.moves.urllib.parse as parser
|
||||
|
||||
from osprofiler import _utils as utils
|
||||
from osprofiler.drivers import base
|
||||
from osprofiler import exc
|
||||
|
||||
|
||||
class Jaeger(base.Driver):
|
||||
def __init__(self, connection_str, project=None, service=None, host=None,
|
||||
conf=cfg.CONF, **kwargs):
|
||||
"""Jaeger driver for OSProfiler."""
|
||||
|
||||
super(Jaeger, self).__init__(connection_str, project=project,
|
||||
service=service, host=host,
|
||||
conf=conf, **kwargs)
|
||||
try:
|
||||
import jaeger_client
|
||||
self.jaeger_client = jaeger_client
|
||||
except ImportError:
|
||||
raise exc.CommandError(
|
||||
"To use OSProfiler with Uber Jaeger tracer, "
|
||||
"you have to install `jaeger-client` manually. "
|
||||
"Install with pip:\n `pip install jaeger-client`."
|
||||
)
|
||||
|
||||
parsed_url = parser.urlparse(connection_str)
|
||||
cfg = {
|
||||
"local_agent": {
|
||||
"reporting_host": parsed_url.hostname,
|
||||
"reporting_port": parsed_url.port,
|
||||
}
|
||||
}
|
||||
|
||||
# Initialize tracer for each profiler
|
||||
service_name = "{}-{}".format(project, service)
|
||||
config = jaeger_client.Config(cfg, service_name=service_name)
|
||||
self.tracer = config.initialize_tracer()
|
||||
|
||||
self.spans = collections.deque()
|
||||
|
||||
@classmethod
|
||||
def get_name(cls):
|
||||
return "jaeger"
|
||||
|
||||
def notify(self, payload):
|
||||
if payload["name"].endswith("start"):
|
||||
timestamp = datetime.datetime.strptime(payload["timestamp"],
|
||||
"%Y-%m-%dT%H:%M:%S.%f")
|
||||
epoch = datetime.datetime.utcfromtimestamp(0)
|
||||
start_time = (timestamp - epoch).total_seconds()
|
||||
|
||||
# Create parent span
|
||||
child_of = self.jaeger_client.SpanContext(
|
||||
trace_id=utils.shorten_id(payload["base_id"]),
|
||||
span_id=utils.shorten_id(payload["parent_id"]),
|
||||
parent_id=None,
|
||||
flags=self.jaeger_client.span.SAMPLED_FLAG
|
||||
)
|
||||
|
||||
# Create Jaeger Tracing span
|
||||
span = self.tracer.start_span(
|
||||
operation_name=payload["name"].rstrip("-start"),
|
||||
child_of=child_of,
|
||||
tags=self.create_span_tags(payload),
|
||||
start_time=start_time
|
||||
)
|
||||
|
||||
# Replace Jaeger Tracing span_id (random id) to OSProfiler span_id
|
||||
span.context.span_id = utils.shorten_id(payload["trace_id"])
|
||||
self.spans.append(span)
|
||||
else:
|
||||
span = self.spans.pop()
|
||||
|
||||
# Store result of db call and function call
|
||||
for call in ("db", "function"):
|
||||
if payload.get("info", {}).get(call) is not None:
|
||||
span.set_tag("result", payload["info"][call]["result"])
|
||||
|
||||
# Span error tag and log
|
||||
if payload["info"].get("etype") is not None:
|
||||
span.set_tag("error", True)
|
||||
span.log_kv({"error.kind": payload["info"]["etype"]})
|
||||
span.log_kv({"message": payload["info"]["message"]})
|
||||
|
||||
span.finish(finish_time=time.time())
|
||||
|
||||
def get_report(self, base_id):
|
||||
"""Please use Jaeger Tracing UI for this task."""
|
||||
return self._parse_results()
|
||||
|
||||
def list_traces(self, fields=None):
|
||||
"""Please use Jaeger Tracing UI for this task."""
|
||||
return []
|
||||
|
||||
def list_error_traces(self):
|
||||
"""Please use Jaeger Tracing UI for this task."""
|
||||
return []
|
||||
|
||||
def create_span_tags(self, payload):
|
||||
"""Create tags for OpenTracing span.
|
||||
|
||||
:param info: Information from OSProfiler trace.
|
||||
:returns tags: A dictionary contains standard tags
|
||||
from OpenTracing sematic conventions,
|
||||
and some other custom tags related to http, db calls.
|
||||
"""
|
||||
tags = {}
|
||||
info = payload["info"]
|
||||
|
||||
if info.get("db"):
|
||||
# DB calls
|
||||
tags["db.statement"] = info["db"]["statement"]
|
||||
tags["db.params"] = jsonutils.dumps(info["db"]["params"])
|
||||
elif info.get("request"):
|
||||
# WSGI call
|
||||
tags["http.path"] = info["request"]["path"]
|
||||
tags["http.query"] = info["request"]["query"]
|
||||
tags["http.method"] = info["request"]["method"]
|
||||
tags["http.scheme"] = info["request"]["scheme"]
|
||||
elif info.get("function"):
|
||||
# RPC, function calls
|
||||
tags["args"] = info["function"]["args"]
|
||||
tags["kwargs"] = info["function"]["kwargs"]
|
||||
tags["name"] = info["function"]["name"]
|
||||
|
||||
return tags
|
|
@ -87,10 +87,11 @@ sets the notifier to oslo_messaging.
|
|||
|
||||
Examples of possible values:
|
||||
|
||||
* messaging://: use oslo_messaging driver for sending notifications.
|
||||
* mongodb://127.0.0.1:27017 : use mongodb driver for sending notifications.
|
||||
* elasticsearch://127.0.0.1:9200 : use elasticsearch driver for sending
|
||||
notifications.
|
||||
* messaging:// - use oslo_messaging driver for sending spans.
|
||||
* redis://127.0.0.1:6379 - use redis driver for sending spans.
|
||||
* mongodb://127.0.0.1:27017 - use mongodb driver for sending spans.
|
||||
* elasticsearch://127.0.0.1:9200 - use elasticsearch driver for sending spans.
|
||||
* jaeger://127.0.0.1:6831 - use jaeger tracing as driver for sending spans.
|
||||
""")
|
||||
|
||||
_es_doc_type_opt = cfg.StrOpt(
|
||||
|
|
|
@ -23,6 +23,7 @@ import threading
|
|||
from oslo_utils import reflection
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from osprofiler import _utils as utils
|
||||
from osprofiler import notifier
|
||||
|
||||
|
||||
|
@ -343,7 +344,10 @@ class Trace(object):
|
|||
|
||||
def __exit__(self, etype, value, traceback):
|
||||
if etype:
|
||||
info = {"etype": reflection.get_class_name(etype)}
|
||||
info = {
|
||||
"etype": reflection.get_class_name(etype),
|
||||
"message": value.args[0] if value.args else None
|
||||
}
|
||||
stop(info=info)
|
||||
else:
|
||||
stop()
|
||||
|
@ -359,6 +363,14 @@ class _Profiler(object):
|
|||
self._name = collections.deque()
|
||||
self._host = socket.gethostname()
|
||||
|
||||
def get_shorten_id(self, uuid_id):
|
||||
"""Return shorten id of a uuid that will be used in OpenTracing drivers
|
||||
|
||||
:param uuid_id: A string of uuid that was generated by uuidutils
|
||||
:returns: A shorter 64-bit long id
|
||||
"""
|
||||
return format(utils.shorten_id(uuid_id), "x")
|
||||
|
||||
def get_base_id(self):
|
||||
"""Return base id of a trace.
|
||||
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
# Copyright 2018 Fujitsu Ltd.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from osprofiler.drivers import jaeger
|
||||
from osprofiler.tests import test
|
||||
|
||||
|
||||
class JaegerTestCase(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(JaegerTestCase, self).setUp()
|
||||
self.payload_start = {
|
||||
"name": "api-start",
|
||||
"base_id": "4e3e0ec6-2938-40b1-8504-09eb1d4b0dee",
|
||||
"trace_id": "1c089ea8-28fe-4f3d-8c00-f6daa2bc32f1",
|
||||
"parent_id": "e2715537-3d1c-4f0c-b3af-87355dc5fc5b",
|
||||
"timestamp": "2018-05-03T04:31:51.781381",
|
||||
"info": {
|
||||
"host": "test"
|
||||
}
|
||||
}
|
||||
|
||||
self.payload_stop = {
|
||||
"name": "api-stop",
|
||||
"base_id": "4e3e0ec6-2938-40b1-8504-09eb1d4b0dee",
|
||||
"trace_id": "1c089ea8-28fe-4f3d-8c00-f6daa2bc32f1",
|
||||
"parent_id": "e2715537-3d1c-4f0c-b3af-87355dc5fc5b",
|
||||
"timestamp": "2018-05-03T04:31:51.781381",
|
||||
"info": {
|
||||
"host": "test",
|
||||
"function": {
|
||||
"result": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.driver = jaeger.Jaeger("jaeger://127.0.0.1:6831",
|
||||
project="nova", service="api")
|
||||
|
||||
@mock.patch("osprofiler._utils.shorten_id")
|
||||
def test_notify_start(self, mock_shorten_id):
|
||||
self.driver.notify(self.payload_start)
|
||||
calls = [
|
||||
mock.call(self.payload_start["base_id"]),
|
||||
mock.call(self.payload_start["parent_id"]),
|
||||
mock.call(self.payload_start["trace_id"])
|
||||
]
|
||||
mock_shorten_id.assert_has_calls(calls, any_order=True)
|
||||
|
||||
@mock.patch("jaeger_client.span.Span")
|
||||
@mock.patch("time.time")
|
||||
def test_notify_stop(self, mock_time, mock_span):
|
||||
fake_time = 1525416065.5958152
|
||||
mock_time.return_value = fake_time
|
||||
|
||||
span = mock_span()
|
||||
self.driver.spans.append(mock_span())
|
||||
|
||||
self.driver.notify(self.payload_stop)
|
||||
|
||||
mock_time.assert_called_once()
|
||||
mock_time.reset_mock()
|
||||
|
||||
span.finish.assert_called_once_with(finish_time=fake_time)
|
|
@ -62,6 +62,13 @@ class ProfilerGlobMethodsTestCase(test.TestCase):
|
|||
|
||||
class ProfilerTestCase(test.TestCase):
|
||||
|
||||
def test_profiler_get_shorten_id(self):
|
||||
uuid_id = "4e3e0ec6-2938-40b1-8504-09eb1d4b0dee"
|
||||
prof = profiler._Profiler("secret", base_id="1", parent_id="2")
|
||||
result = prof.get_shorten_id(uuid_id)
|
||||
expected = "850409eb1d4b0dee"
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
def test_profiler_get_base_id(self):
|
||||
prof = profiler._Profiler("secret", base_id="1", parent_id="2")
|
||||
self.assertEqual(prof.get_base_id(), "1")
|
||||
|
@ -167,7 +174,10 @@ class WithTraceTestCase(test.TestCase):
|
|||
|
||||
self.assertRaises(ValueError, foo)
|
||||
mock_start.assert_called_once_with("foo", info=None)
|
||||
mock_stop.assert_called_once_with(info={"etype": "ValueError"})
|
||||
mock_stop.assert_called_once_with(info={
|
||||
"etype": "ValueError",
|
||||
"message": "bar"
|
||||
})
|
||||
|
||||
|
||||
@profiler.trace("function", info={"info": "some_info"})
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
|
||||
|
@ -111,6 +112,29 @@ class UtilsTestCase(test.TestCase):
|
|||
|
||||
self.assertIsNone(utils.signed_unpack(data, hmac_data, hmac))
|
||||
|
||||
def test_shorten_id_with_valid_uuid(self):
|
||||
valid_id = "4e3e0ec6-2938-40b1-8504-09eb1d4b0dee"
|
||||
|
||||
uuid_obj = uuid.UUID(valid_id)
|
||||
|
||||
with mock.patch("uuid.UUID") as mock_uuid:
|
||||
mock_uuid.return_value = uuid_obj
|
||||
|
||||
result = utils.shorten_id(valid_id)
|
||||
expected = 9584796812364680686
|
||||
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
@mock.patch("oslo_utils.uuidutils.generate_uuid")
|
||||
def test_shorten_id_with_invalid_uuid(self, mock_gen_uuid):
|
||||
invalid_id = "invalid"
|
||||
mock_gen_uuid.return_value = "1c089ea8-28fe-4f3d-8c00-f6daa2bc32f1"
|
||||
|
||||
result = utils.shorten_id(invalid_id)
|
||||
expected = 10088334584203457265
|
||||
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
def test_itersubclasses(self):
|
||||
|
||||
class A(object):
|
||||
|
|
|
@ -22,3 +22,7 @@ redis # MIT
|
|||
|
||||
# Build release notes
|
||||
reno # Apache-2.0
|
||||
|
||||
# For Jaeger Tracing
|
||||
jaeger-client # Apache-2.0
|
||||
futures;python_version=='2.7' or python_version=='2.6' # PSF
|
||||
|
|
Loading…
Reference in New Issue