New feature: Add zipkin tracing to eventlet

Zipkin is a distributed tracing framework developed at Twitter.
Such tracing is useful for both developers and operators to
understand the behavior of complex distributed systems
and find performance bottlenecks.

This patch provides a Zipkin-compliant tracing facility for
WSGI applications that use eventlet.

Signed-off-by: Yuichi Bando <bando.yuichi@lab.ntt.co.jp>

Original commit modified for PEP-8 fixes.
https://github.com/eventlet/eventlet/pull/218
Yuichi Bando 2015-03-26 14:08:26 +09:00 committed by Sergey Shepelev
parent 86607d3820
commit 654a271b82
18 changed files with 1139 additions and 0 deletions

eventlet/zipkin/README.rst

@@ -0,0 +1,135 @@
eventlet.zipkin
===============
`Zipkin <http://twitter.github.io/zipkin/>`_ is a distributed tracing system developed at Twitter.
This package provides a Zipkin-compliant tracing facility for WSGI
applications that use eventlet.

Why use it?
-----------

From http://twitter.github.io/zipkin/:

"Collecting traces helps developers gain deeper knowledge about how
certain requests perform in a distributed system. Let's say we're having
problems with user requests timing out. We can look up traced requests
that timed out and display it in the web UI. We'll be able to quickly
find the service responsible for adding the unexpected response time. If
the service has been annotated adequately we can also find out where in
that service the issue is happening."

Screenshot
----------
Zipkin web UI screenshots taken by applying this module to
`OpenStack swift <https://github.com/openstack/swift>`_ are in ``example/``.

Requirement
-----------
eventlet.zipkin needs the `python scribe client <https://pypi.python.org/pypi/facebook-scribe/>`_
and `thrift <https://thrift.apache.org/>`_ (>=0.9),
because the zipkin collector speaks the `scribe <https://github.com/facebookarchive/scribe>`_ protocol.
The command below installs both the scribe client and thrift.

Install facebook-scribe:

::

    pip install facebook-scribe

**Python**: ``2.6``, ``2.7`` (because the current Python Thrift release doesn't
support Python 3)

How to use
----------
Add tracing facility to your application
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Apply the monkey patch before you start the wsgi server.

.. code:: python

    # Add only 2 lines to your code
    from eventlet.zipkin import patcher
    patcher.enable_trace_patch()

    # existing code
    from eventlet import wsgi
    wsgi.server(sock, app)

You can pass some parameters to ``enable_trace_patch()`` (see the example after this list):

* host: Scribe daemon IP address (default: '127.0.0.1')
* port: Scribe daemon port (default: 9410)
* trace_app_log: A Boolean indicating if the tracer will also record application log messages. This facility assumes that your application uses the Python standard logging library. (default: False)
* sampling_rate: A Float value (0.0~1.0) that indicates the tracing frequency. If you specify 1.0, all requests are traced and sent to the Zipkin collector. If you specify 0.1, only 1/10 of requests are traced. (default: 1.0)
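
A minimal sketch of calling ``enable_trace_patch()`` with explicit parameters;
the collector address and sampling rate shown here are illustrative values for
your own deployment, not required settings:

.. code:: python

    from eventlet.zipkin import patcher

    # Ship spans to a scribe/zipkin collector at a placeholder address and
    # trace roughly 20% of incoming requests.
    patcher.enable_trace_patch(host='192.0.2.10', port=9410,
                               trace_app_log=False,
                               sampling_rate=0.2)
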
(Option) Annotation API
~~~~~~~~~~~~~~~~~~~~~~~
If you want to record additional information,
you can use the API below from anywhere in your code.

.. code:: python

    from eventlet.zipkin import api

    api.put_annotation('Cache miss for %s' % request)
    api.put_key_value('key', 'value')

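Both calls also accept an optional ``endpoint`` argument. Below is a hedged
sketch using the endpoint builder shipped in ``eventlet.zipkin.api``; the
address, port, and service name are made-up values:

.. code:: python

    from eventlet.zipkin import api

    # Attach explicit host information to an annotation (placeholder values)
    ep = api.ZipkinDataBuilder.build_endpoint(ipv4='127.0.0.1', port=8080,
                                              service_name='my-wsgi-app')
    api.put_annotation('backend lookup finished', endpoint=ep)
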
Zipkin simple setup
-------------------
::

    $ git clone https://github.com/twitter/zipkin.git
    $ cd zipkin

    # Open 3 terminals
    (terminal1) $ bin/collector
    (terminal2) $ bin/query
    (terminal3) $ bin/web

Access http://localhost:8080 from your browser.

(Option) fluentd
----------------
If you want to buffer the tracing data for performance,
`fluentd scribe plugin <http://docs.fluentd.org/articles/in_scribe>`_ is available.
Since ``out_scribe plugin`` extends `Buffer Plugin <http://docs.fluentd.org/articles/buffer-plugin-overview>`_ ,
you can customize buffering parameters in the manner of fluentd.
Scribe plugin is included in td-agent by default.
Sample: ``/etc/td-agent/td-agent.conf``

::

    # in_scribe
    <source>
      type scribe
      port 9999
    </source>

    # out_scribe
    <match zipkin.**>
      type scribe
      host Zipkin_collector_IP
      port 9410
      flush_interval 60s
      buffer_chunk_limit 256m
    </match>

| Also, you need to specify ``patcher.enable_trace_patch(port=9999)`` so that trace data goes to in_scribe (see the sketch below).
| In this case, trace data flows as below.
| Your application => Local fluentd in_scribe (9999) => Local fluentd out_scribe <buffering> =====> Remote zipkin collector (9410)
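
A minimal sketch of pointing the tracer at the local ``in_scribe`` port from
the sample configuration above:

.. code:: python

    from eventlet.zipkin import patcher

    # Spans go to the local fluentd in_scribe (port 9999 in the sample above),
    # which buffers and forwards them to the remote zipkin collector (9410).
    patcher.enable_trace_patch(port=9999)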

@@ -0,0 +1,8 @@
_thrift
========
* This directory is auto-generated by the Thrift Compiler using
  https://github.com/twitter/zipkin/blob/master/zipkin-thrift/src/main/thrift/com/twitter/zipkin/zipkinCore.thrift
* Do not modify this directory.

@@ -0,0 +1,55 @@
# Copyright 2012 Twitter Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
namespace java com.twitter.zipkin.gen
namespace rb Zipkin
//************** Collection related structs **************
// these are the annotations we always expect to find in a span
const string CLIENT_SEND = "cs"
const string CLIENT_RECV = "cr"
const string SERVER_SEND = "ss"
const string SERVER_RECV = "sr"
// this represents a host and port in a network
struct Endpoint {
1: i32 ipv4,
2: i16 port // beware that this will give us negative ports. some conversion needed
3: string service_name // which service did this operation happen on?
}
// some event took place, either one by the framework or by the user
struct Annotation {
1: i64 timestamp // microseconds from epoch
2: string value // what happened at the timestamp?
3: optional Endpoint host // host this happened on
}
enum AnnotationType { BOOL, BYTES, I16, I32, I64, DOUBLE, STRING }
struct BinaryAnnotation {
1: string key,
2: binary value,
3: AnnotationType annotation_type,
4: optional Endpoint host
}
struct Span {
1: i64 trace_id // unique trace id, use for all spans in trace
3: string name, // span name, rpc method for example
4: i64 id, // unique span id, only used for this span
5: optional i64 parent_id, // parent span id
6: list<Annotation> annotations, // list of all annotations/events that occured
8: list<BinaryAnnotation> binary_annotations // any binary annotations
}

@@ -0,0 +1 @@
__all__ = ['ttypes', 'constants']

@@ -0,0 +1,14 @@
#
# Autogenerated by Thrift Compiler (0.8.0)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
#
from thrift.Thrift import TType, TMessageType, TException
from ttypes import *
CLIENT_SEND = "cs"
CLIENT_RECV = "cr"
SERVER_SEND = "ss"
SERVER_RECV = "sr"

@@ -0,0 +1,452 @@
#
# Autogenerated by Thrift Compiler (0.8.0)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
#
from thrift.Thrift import TType, TMessageType, TException
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol, TProtocol
try:
from thrift.protocol import fastbinary
except:
fastbinary = None
class AnnotationType:
BOOL = 0
BYTES = 1
I16 = 2
I32 = 3
I64 = 4
DOUBLE = 5
STRING = 6
_VALUES_TO_NAMES = {
0: "BOOL",
1: "BYTES",
2: "I16",
3: "I32",
4: "I64",
5: "DOUBLE",
6: "STRING",
}
_NAMES_TO_VALUES = {
"BOOL": 0,
"BYTES": 1,
"I16": 2,
"I32": 3,
"I64": 4,
"DOUBLE": 5,
"STRING": 6,
}
class Endpoint:
"""
Attributes:
- ipv4
- port
- service_name
"""
thrift_spec = (
None, # 0
(1, TType.I32, 'ipv4', None, None, ), # 1
(2, TType.I16, 'port', None, None, ), # 2
(3, TType.STRING, 'service_name', None, None, ), # 3
)
def __init__(self, ipv4=None, port=None, service_name=None,):
self.ipv4 = ipv4
self.port = port
self.service_name = service_name
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
return
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.I32:
self.ipv4 = iprot.readI32();
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.I16:
self.port = iprot.readI16();
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.STRING:
self.service_name = iprot.readString();
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('Endpoint')
if self.ipv4 is not None:
oprot.writeFieldBegin('ipv4', TType.I32, 1)
oprot.writeI32(self.ipv4)
oprot.writeFieldEnd()
if self.port is not None:
oprot.writeFieldBegin('port', TType.I16, 2)
oprot.writeI16(self.port)
oprot.writeFieldEnd()
if self.service_name is not None:
oprot.writeFieldBegin('service_name', TType.STRING, 3)
oprot.writeString(self.service_name)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
return
def __repr__(self):
L = ['%s=%r' % (key, value)
for key, value in self.__dict__.iteritems()]
return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
def __eq__(self, other):
return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
def __ne__(self, other):
return not (self == other)
class Annotation:
"""
Attributes:
- timestamp
- value
- host
"""
thrift_spec = (
None, # 0
(1, TType.I64, 'timestamp', None, None, ), # 1
(2, TType.STRING, 'value', None, None, ), # 2
(3, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 3
)
def __init__(self, timestamp=None, value=None, host=None,):
self.timestamp = timestamp
self.value = value
self.host = host
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
return
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.I64:
self.timestamp = iprot.readI64();
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.STRING:
self.value = iprot.readString();
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.STRUCT:
self.host = Endpoint()
self.host.read(iprot)
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('Annotation')
if self.timestamp is not None:
oprot.writeFieldBegin('timestamp', TType.I64, 1)
oprot.writeI64(self.timestamp)
oprot.writeFieldEnd()
if self.value is not None:
oprot.writeFieldBegin('value', TType.STRING, 2)
oprot.writeString(self.value)
oprot.writeFieldEnd()
if self.host is not None:
oprot.writeFieldBegin('host', TType.STRUCT, 3)
self.host.write(oprot)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
return
def __repr__(self):
L = ['%s=%r' % (key, value)
for key, value in self.__dict__.iteritems()]
return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
def __eq__(self, other):
return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
def __ne__(self, other):
return not (self == other)
class BinaryAnnotation:
"""
Attributes:
- key
- value
- annotation_type
- host
"""
thrift_spec = (
None, # 0
(1, TType.STRING, 'key', None, None, ), # 1
(2, TType.STRING, 'value', None, None, ), # 2
(3, TType.I32, 'annotation_type', None, None, ), # 3
(4, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 4
)
def __init__(self, key=None, value=None, annotation_type=None, host=None,):
self.key = key
self.value = value
self.annotation_type = annotation_type
self.host = host
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
return
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.STRING:
self.key = iprot.readString();
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.STRING:
self.value = iprot.readString();
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.I32:
self.annotation_type = iprot.readI32();
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.STRUCT:
self.host = Endpoint()
self.host.read(iprot)
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('BinaryAnnotation')
if self.key is not None:
oprot.writeFieldBegin('key', TType.STRING, 1)
oprot.writeString(self.key)
oprot.writeFieldEnd()
if self.value is not None:
oprot.writeFieldBegin('value', TType.STRING, 2)
oprot.writeString(self.value)
oprot.writeFieldEnd()
if self.annotation_type is not None:
oprot.writeFieldBegin('annotation_type', TType.I32, 3)
oprot.writeI32(self.annotation_type)
oprot.writeFieldEnd()
if self.host is not None:
oprot.writeFieldBegin('host', TType.STRUCT, 4)
self.host.write(oprot)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
return
def __repr__(self):
L = ['%s=%r' % (key, value)
for key, value in self.__dict__.iteritems()]
return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
def __eq__(self, other):
return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
def __ne__(self, other):
return not (self == other)
class Span:
"""
Attributes:
- trace_id
- name
- id
- parent_id
- annotations
- binary_annotations
"""
thrift_spec = (
None, # 0
(1, TType.I64, 'trace_id', None, None, ), # 1
None, # 2
(3, TType.STRING, 'name', None, None, ), # 3
(4, TType.I64, 'id', None, None, ), # 4
(5, TType.I64, 'parent_id', None, None, ), # 5
(6, TType.LIST, 'annotations', (TType.STRUCT,(Annotation, Annotation.thrift_spec)), None, ), # 6
None, # 7
(8, TType.LIST, 'binary_annotations', (TType.STRUCT,(BinaryAnnotation, BinaryAnnotation.thrift_spec)), None, ), # 8
)
def __init__(self, trace_id=None, name=None, id=None, parent_id=None, annotations=None, binary_annotations=None,):
self.trace_id = trace_id
self.name = name
self.id = id
self.parent_id = parent_id
self.annotations = annotations
self.binary_annotations = binary_annotations
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
return
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.I64:
self.trace_id = iprot.readI64();
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.STRING:
self.name = iprot.readString();
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.I64:
self.id = iprot.readI64();
else:
iprot.skip(ftype)
elif fid == 5:
if ftype == TType.I64:
self.parent_id = iprot.readI64();
else:
iprot.skip(ftype)
elif fid == 6:
if ftype == TType.LIST:
self.annotations = []
(_etype3, _size0) = iprot.readListBegin()
for _i4 in xrange(_size0):
_elem5 = Annotation()
_elem5.read(iprot)
self.annotations.append(_elem5)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 8:
if ftype == TType.LIST:
self.binary_annotations = []
(_etype9, _size6) = iprot.readListBegin()
for _i10 in xrange(_size6):
_elem11 = BinaryAnnotation()
_elem11.read(iprot)
self.binary_annotations.append(_elem11)
iprot.readListEnd()
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('Span')
if self.trace_id is not None:
oprot.writeFieldBegin('trace_id', TType.I64, 1)
oprot.writeI64(self.trace_id)
oprot.writeFieldEnd()
if self.name is not None:
oprot.writeFieldBegin('name', TType.STRING, 3)
oprot.writeString(self.name)
oprot.writeFieldEnd()
if self.id is not None:
oprot.writeFieldBegin('id', TType.I64, 4)
oprot.writeI64(self.id)
oprot.writeFieldEnd()
if self.parent_id is not None:
oprot.writeFieldBegin('parent_id', TType.I64, 5)
oprot.writeI64(self.parent_id)
oprot.writeFieldEnd()
if self.annotations is not None:
oprot.writeFieldBegin('annotations', TType.LIST, 6)
oprot.writeListBegin(TType.STRUCT, len(self.annotations))
for iter12 in self.annotations:
iter12.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.binary_annotations is not None:
oprot.writeFieldBegin('binary_annotations', TType.LIST, 8)
oprot.writeListBegin(TType.STRUCT, len(self.binary_annotations))
for iter13 in self.binary_annotations:
iter13.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
return
def __repr__(self):
L = ['%s=%r' % (key, value)
for key, value in self.__dict__.iteritems()]
return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
def __eq__(self, other):
return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
def __ne__(self, other):
return not (self == other)

eventlet/zipkin/api.py

@@ -0,0 +1,186 @@
import os
import sys
import time
import struct
import socket
import random
from eventlet.green import threading
from eventlet.zipkin._thrift.zipkinCore import ttypes
from eventlet.zipkin._thrift.zipkinCore.constants import SERVER_SEND
client = None
_tls = threading.local() # thread local storage
def put_annotation(msg, endpoint=None):
""" This is annotation API.
    You can add your own annotation from anywhere in your code.
Annotation is recorded with timestamp automatically.
e.g.) put_annotation('cache hit for %s' % request)
:param msg: String message
:param endpoint: host info
"""
if is_sample():
a = ZipkinDataBuilder.build_annotation(msg, endpoint)
trace_data = get_trace_data()
trace_data.add_annotation(a)
def put_key_value(key, value, endpoint=None):
""" This is binary annotation API.
    You can add your own key-value extra information from anywhere in your code.
Key-value doesn't have a time component.
e.g.) put_key_value('http.uri', '/hoge/index.html')
:param key: String
:param value: String
:param endpoint: host info
"""
if is_sample():
b = ZipkinDataBuilder.build_binary_annotation(key, value, endpoint)
trace_data = get_trace_data()
trace_data.add_binary_annotation(b)
def is_tracing():
""" Return whether the current thread is tracking or not """
return hasattr(_tls, 'trace_data')
def is_sample():
""" Return whether it should record trace information
for the request or not
"""
return is_tracing() and _tls.trace_data.sampled
def get_trace_data():
if is_tracing():
return _tls.trace_data
def set_trace_data(trace_data):
_tls.trace_data = trace_data
def init_trace_data():
if is_tracing():
del _tls.trace_data
def _uniq_id():
"""
Create a random 64-bit signed integer appropriate
for use as trace and span IDs.
XXX: By experimentation zipkin has trouble recording traces with ids
larger than (2 ** 56) - 1
"""
return random.randint(0, (2 ** 56) - 1)
def generate_trace_id():
return _uniq_id()
def generate_span_id():
return _uniq_id()
class TraceData(object):
END_ANNOTATION = SERVER_SEND
def __init__(self, name, trace_id, span_id, parent_id, sampled, endpoint):
"""
:param name: RPC name (String)
:param trace_id: int
:param span_id: int
:param parent_id: int or None
:param sampled: lets the downstream servers know
if I should record trace data for the request (bool)
:param endpoint: zipkin._thrift.zipkinCore.ttypes.EndPoint
"""
self.name = name
self.trace_id = trace_id
self.span_id = span_id
self.parent_id = parent_id
self.sampled = sampled
self.endpoint = endpoint
self.annotations = []
self.bannotations = []
self._done = False
def add_annotation(self, annotation):
if annotation.host is None:
annotation.host = self.endpoint
if not self._done:
self.annotations.append(annotation)
if annotation.value == self.END_ANNOTATION:
self.flush()
def add_binary_annotation(self, bannotation):
if bannotation.host is None:
bannotation.host = self.endpoint
if not self._done:
self.bannotations.append(bannotation)
def flush(self):
span = ZipkinDataBuilder.build_span(name=self.name,
trace_id=self.trace_id,
span_id=self.span_id,
parent_id=self.parent_id,
annotations=self.annotations,
bannotations=self.bannotations)
client.send_to_collector(span)
self.annotations = []
self.bannotations = []
self._done = True
class ZipkinDataBuilder:
@staticmethod
def build_span(name, trace_id, span_id, parent_id,
annotations, bannotations):
return ttypes.Span(
name=name,
trace_id=trace_id,
id=span_id,
parent_id=parent_id,
annotations=annotations,
binary_annotations=bannotations
)
@staticmethod
def build_annotation(value, endpoint=None):
if isinstance(value, unicode):
value = value.encode('utf-8')
return ttypes.Annotation(time.time() * 1000 * 1000,
str(value), endpoint)
@staticmethod
def build_binary_annotation(key, value, endpoint=None):
annotation_type = ttypes.AnnotationType.STRING
return ttypes.BinaryAnnotation(key, value, annotation_type, endpoint)
@staticmethod
def build_endpoint(ipv4=None, port=None, service_name=None):
if ipv4 is not None:
ipv4 = ZipkinDataBuilder._ipv4_to_int(ipv4)
if service_name is None:
service_name = ZipkinDataBuilder._get_script_name()
return ttypes.Endpoint(
ipv4=ipv4,
port=port,
service_name=service_name
)
@staticmethod
def _ipv4_to_int(ipv4):
return struct.unpack('!i', socket.inet_aton(ipv4))[0]
@staticmethod
def _get_script_name():
return os.path.basename(sys.argv[0])

eventlet/zipkin/client.py

@@ -0,0 +1,56 @@
import base64
import warnings
from scribe import scribe
from thrift.transport import TTransport, TSocket
from thrift.protocol import TBinaryProtocol
from eventlet import GreenPile
CATEGORY = 'zipkin'
class ZipkinClient(object):
def __init__(self, host='127.0.0.1', port=9410):
"""
        :param host: zipkin collector IP address (default '127.0.0.1')
:param port: zipkin collector port (default 9410)
"""
self.host = host
self.port = port
self.pile = GreenPile(1)
self._connect()
def _connect(self):
socket = TSocket.TSocket(self.host, self.port)
self.transport = TTransport.TFramedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(self.transport,
False, False)
self.scribe_client = scribe.Client(protocol)
try:
self.transport.open()
except TTransport.TTransportException as e:
warnings.warn(e.message)
def _build_message(self, thrift_obj):
trans = TTransport.TMemoryBuffer()
protocol = TBinaryProtocol.TBinaryProtocolAccelerated(trans=trans)
thrift_obj.write(protocol)
return base64.b64encode(trans.getvalue())
def send_to_collector(self, span):
self.pile.spawn(self._send, span)
def _send(self, span):
log_entry = scribe.LogEntry(CATEGORY, self._build_message(span))
try:
self.scribe_client.Log([log_entry])
except Exception as e:
msg = 'ZipkinClient send error %s' % str(e)
warnings.warn(msg)
self._connect()
def close(self):
self.transport.close()

eventlet/zipkin/example/ex1.png (new executable binary file, 52 KiB, not shown)

eventlet/zipkin/example/ex2.png (new executable binary file, 40 KiB, not shown)

eventlet/zipkin/example/ex3.png (new executable binary file, 72 KiB, not shown)

@@ -0,0 +1,33 @@
from eventlet import greenthread
from eventlet.zipkin import api
__original_init__ = greenthread.GreenThread.__init__
__original_main__ = greenthread.GreenThread.main
def _patched__init(self, parent):
# parent thread saves current TraceData from tls to self
if api.is_tracing():
self.trace_data = api.get_trace_data()
__original_init__(self, parent)
def _patched_main(self, function, args, kwargs):
# child thread inherits TraceData
if hasattr(self, 'trace_data'):
api.set_trace_data(self.trace_data)
__original_main__(self, function, args, kwargs)
def patch():
greenthread.GreenThread.__init__ = _patched__init
greenthread.GreenThread.main = _patched_main
def unpatch():
greenthread.GreenThread.__init__ = __original_init__
greenthread.GreenThread.main = __original_main__

eventlet/zipkin/http.py

@@ -0,0 +1,61 @@
import warnings
from eventlet.support import six
from eventlet.green import httplib
from eventlet.zipkin import api
# see https://twitter.github.io/zipkin/Instrumenting.html
HDR_TRACE_ID = 'X-B3-TraceId'
HDR_SPAN_ID = 'X-B3-SpanId'
HDR_PARENT_SPAN_ID = 'X-B3-ParentSpanId'
HDR_SAMPLED = 'X-B3-Sampled'
if six.PY2:
__org_endheaders__ = httplib.HTTPConnection.endheaders
__org_begin__ = httplib.HTTPResponse.begin
def _patched_endheaders(self):
if api.is_tracing():
trace_data = api.get_trace_data()
new_span_id = api.generate_span_id()
self.putheader(HDR_TRACE_ID, hex_str(trace_data.trace_id))
self.putheader(HDR_SPAN_ID, hex_str(new_span_id))
self.putheader(HDR_PARENT_SPAN_ID, hex_str(trace_data.span_id))
self.putheader(HDR_SAMPLED, int(trace_data.sampled))
api.put_annotation('Client Send')
__org_endheaders__(self)
def _patched_begin(self):
__org_begin__(self)
if api.is_tracing():
api.put_annotation('Client Recv (%s)' % self.status)
def patch():
if six.PY2:
httplib.HTTPConnection.endheaders = _patched_endheaders
httplib.HTTPResponse.begin = _patched_begin
if six.PY3:
warnings.warn("Since current Python thrift release \
doesn't support Python 3, eventlet.zipkin.http \
doesn't also support Python 3 (http.client)")
def unpatch():
if six.PY2:
httplib.HTTPConnection.endheaders = __org_endheaders__
httplib.HTTPResponse.begin = __org_begin__
if six.PY3:
pass
def hex_str(n):
"""
Thrift uses a binary representation of trace and span ids
HTTP headers use a hexadecimal representation of the same
"""
return '%0.16x' % (n,)

eventlet/zipkin/log.py

@@ -0,0 +1,19 @@
import logging
from eventlet.zipkin import api
__original_handle__ = logging.Logger.handle
def _patched_handle(self, record):
__original_handle__(self, record)
api.put_annotation(record.getMessage())
def patch():
logging.Logger.handle = _patched_handle
def unpatch():
logging.Logger.handle = __original_handle__

@@ -0,0 +1,41 @@
from eventlet.zipkin import http
from eventlet.zipkin import wsgi
from eventlet.zipkin import greenthread
from eventlet.zipkin import log
from eventlet.zipkin import api
from eventlet.zipkin.client import ZipkinClient
def enable_trace_patch(host='127.0.0.1', port=9410,
trace_app_log=False, sampling_rate=1.0):
""" Apply monkey patch to trace your WSGI application.
:param host: Scribe daemon IP address (default: '127.0.0.1')
:param port: Scribe daemon port (default: 9410)
    :param trace_app_log: A Boolean indicating if the tracer will also
        record application log messages. This facility assumes that
        your application uses the Python standard logging library.
        (default: False)
    :param sampling_rate: A Float value (0.0~1.0) that indicates
        the tracing frequency. If you specify 1.0, all requests
        are traced (and sent to the Zipkin collector).
        If you specify 0.1, only 1/10 of requests are traced. (default: 1.0)
"""
api.client = ZipkinClient(host, port)
# monkey patch for adding tracing facility
wsgi.patch(sampling_rate)
http.patch()
greenthread.patch()
# monkey patch for capturing application log
if trace_app_log:
log.patch()
def disable_trace_patch():
http.unpatch()
wsgi.unpatch()
greenthread.unpatch()
log.unpatch()
api.client.close()

eventlet/zipkin/wsgi.py

@@ -0,0 +1,78 @@
import random
from eventlet import wsgi
from eventlet.zipkin import api
from eventlet.zipkin._thrift.zipkinCore.constants import \
SERVER_RECV, SERVER_SEND
from eventlet.zipkin.http import \
HDR_TRACE_ID, HDR_SPAN_ID, HDR_PARENT_SPAN_ID, HDR_SAMPLED
_sampler = None
__original_handle_one_response__ = wsgi.HttpProtocol.handle_one_response
def _patched_handle_one_response(self):
api.init_trace_data()
trace_id = int_or_none(self.headers.getheader(HDR_TRACE_ID))
span_id = int_or_none(self.headers.getheader(HDR_SPAN_ID))
parent_id = int_or_none(self.headers.getheader(HDR_PARENT_SPAN_ID))
sampled = bool_or_none(self.headers.getheader(HDR_SAMPLED))
if trace_id is None: # front-end server
trace_id = span_id = api.generate_trace_id()
parent_id = None
sampled = _sampler.sampling()
ip, port = self.request.getsockname()[:2]
ep = api.ZipkinDataBuilder.build_endpoint(ip, port)
trace_data = api.TraceData(name=self.command,
trace_id=trace_id,
span_id=span_id,
parent_id=parent_id,
sampled=sampled,
endpoint=ep)
api.set_trace_data(trace_data)
api.put_annotation(SERVER_RECV)
api.put_key_value('http.uri', self.path)
__original_handle_one_response__(self)
if api.is_sample():
api.put_annotation(SERVER_SEND)
class Sampler(object):
def __init__(self, sampling_rate):
self.sampling_rate = sampling_rate
def sampling(self):
# avoid generating unneeded random numbers
if self.sampling_rate == 1.0:
return True
r = random.random()
if r < self.sampling_rate:
return True
return False
def int_or_none(val):
if val is None:
return None
return int(val, 16)
def bool_or_none(val):
if val == '1':
return True
if val == '0':
return False
return None
def patch(sampling_rate):
global _sampler
_sampler = Sampler(sampling_rate)
wsgi.HttpProtocol.handle_one_response = _patched_handle_one_response
def unpatch():
wsgi.HttpProtocol.handle_one_response = __original_handle_one_response__