Retire stackforge/tomograph

Monty Taylor 2015-10-17 16:05:02 -04:00
parent def962c912
commit e0b80e8964
46 changed files with 7 additions and 3247 deletions


@@ -1,4 +0,0 @@
[gerrit]
host=review.openstack.org
port=29418
project=stackforge/tomograph.git


@@ -1,209 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
THIRD-PARTY DEPENDENCIES
========================
Convenience copies of some third-party dependencies are distributed with
Apache Cassandra as Java jar files in lib/. Licensing information for
these files can be found in the lib/licenses directory.

README.md

@@ -1,123 +0,0 @@
tomograph
=========
A library to help distributed applications send trace information to
metrics backends like [Zipkin][zipkin] and [Statsd][statsd].
Data Model
----------
A request to a distributed application is modeled as a trace. Each
trace consists of a set of spans, and a span is a set of notes.
Each span's extent is defined by its first and last notes. Any number
of additional notes can be added in between -- for example in a
handler for ERROR-level logging.
The tomograph data model is basically the Dapper/Zipkin data model.
For translation to statsd, we emit the length of the span as a timer
metric, and each note gets emitted individually as a counter metric.
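The trace/span/note hierarchy described above can be sketched roughly as follows. These are hypothetical illustrative types, not tomograph's internal classes:

```python
from dataclasses import dataclass, field
import time

@dataclass
class Note:
    value: str
    time: float

@dataclass
class Span:
    name: str
    notes: list = field(default_factory=list)

    def add_note(self, value):
        self.notes.append(Note(value, time.time()))

    def duration(self):
        # A span's extent is defined by its first and last notes.
        return self.notes[-1].time - self.notes[0].time

@dataclass
class Trace:
    spans: list = field(default_factory=list)

span = Span('a query')
span.add_note('start')
span.add_note('something happened')
span.add_note('end')
trace = Trace(spans=[span])

# For the statsd backend: the span's duration maps to a timer metric,
# and each note maps to an individual counter metric.
```

The Zipkin backend keeps the full structure; the statsd backend flattens it into timers and counters as described above.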
For example, here is a basic client/server interaction. It is one
trace, with two spans, each with two notes -- their beginning and end:
![zipkin client server](https://raw.github.com/timjr/tomograph/master/doc/screenshots/client-server-zipkin.png)
This is the same data as it would be viewed using the statsd
backend with Graphite:
![graphite client server](https://raw.github.com/timjr/tomograph/master/doc/screenshots/client-server-graphite.png)
Tracing Your Application
------------------------
There are a few basic ways to add tracing to your application. The
lowest level one is to call start, stop, and annotate yourself:
    import tomograph

    tomograph.start('my service', 'a query', '127.0.0.1', 80)
    (...)
    tomograph.annotate('something happened')
    tomograph.tag('key', 'value')
    (...)
    tomograph.stop('a query')
Each start/stop pair defines a span. Spans can be arbitrarily nested
using this interface as long as they stay on a single thread: tomograph
keeps the current span stack in thread-local storage.
When continuing a trace from one thread to another, you must grab the
trace token from tomograph and pass it:
    token = tomograph.get_trace_info()
    (...)
    tomograph.start('my service', 'a query', '127.0.0.1', 80, token)
    (...)
That will enable tomograph to connect all of the spans into one trace.
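The mechanism behind this can be sketched as a per-thread span stack plus a token that hands the current span to another thread. This is a minimal stand-in sketch, not tomograph's implementation; the `Tracer` class and its attribute names are illustrative only:

```python
import threading

class Tracer:
    def __init__(self):
        self._local = threading.local()

    def _stack(self):
        # Each thread sees its own span stack.
        if not hasattr(self._local, 'stack'):
            self._local.stack = []
        return self._local.stack

    def start(self, name, parent=None):
        stack = self._stack()
        if parent is None and stack:
            parent = stack[-1]  # nest under the enclosing span
        span = {'name': name, 'parent': parent}
        stack.append(span)
        return span

    def stop(self, name):
        span = self._stack().pop()
        assert span['name'] == name
        return span

    def get_trace_info(self):
        # The token is just a handle on the current span, usable as a
        # parent from any other thread.
        return self._stack()[-1]

tracer = Tracer()
tracer.start('request')
token = tracer.get_trace_info()

result = {}
def worker():
    # A new thread starts with an empty stack, so the token must be
    # passed explicitly to keep the spans in one trace.
    span = tracer.start('db query', parent=token)
    result['parent'] = span['parent']['name']
    tracer.stop('db query')

t = threading.Thread(target=worker)
t.start()
t.join()
tracer.stop('request')
```

Without the token, the worker thread's span would begin a fresh, disconnected trace.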
Helpers
-------
There are some slightly higher level interfaces to help you add
tracing. For HTTP, add_trace_info_header() will add an X-Trace-Info
header to a dict on the client side, and start_http() will consume
that header on the server side:
    def traced_http_client(url, body, headers):
        tomograph.start('client', 'http request', socket.gethostname(), 0)
        tomograph.add_trace_info_header(headers)
        http_request(url, body, headers)
        tomograph.stop('http request')

    def traced_http_server(request):
        tomograph.start_http('server', 'http response', request)
        (...)
        tomograph.stop('http response')
There's no need to call start and stop yourself -- you can use the
@tomograph.traced decorator:
    @tomograph.traced('My Server', 'myfunc')
    def myfunc(yadda):
        dosomething()
For WSGI pipelines, there's the class tomograph.Middleware that will
consume the X-Trace-Info header. It can be added to a paste pipeline
like so:
    [pipeline:foo]
    pipeline = tomo foo bar baz...

    [filter:tomo]
    paste.filter_factory = tomograph:Middleware.factory
    service_name = glance-registry
If you use [SQL Alchemy][sql alchemy] in your application, there are
some event listeners available that will trace SQL statement
execution:
    _ENGINE = sqlalchemy.create_engine(FLAGS.sql_connection, **engine_args)
    sqlalchemy.event.listen(_ENGINE, 'before_execute', tomograph.before_execute('my app'))
    sqlalchemy.event.listen(_ENGINE, 'after_execute', tomograph.after_execute('my app'))
    sqlalchemy.event.listen(_ENGINE, 'dbapi_error', tomograph.dbapi_error('my app'))
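Each factory takes a service name and returns the callback SQLAlchemy expects for that event. A plausible shape for such a factory pair, as a self-contained sketch (this is illustrative only, not tomograph's actual implementation):

```python
import time

def make_listeners(service_name, emit):
    """Return (before_execute, after_execute) callbacks that time each
    statement and report it via emit(service, statement, duration)."""
    pending = {}

    # Signatures follow SQLAlchemy's before_execute / after_execute events.
    def before_execute(conn, clauseelement, multiparams, params):
        pending[id(clauseelement)] = time.time()

    def after_execute(conn, clauseelement, multiparams, params, result):
        started = pending.pop(id(clauseelement), None)
        if started is not None:
            emit(service_name, str(clauseelement), time.time() - started)

    return before_execute, after_execute

# Exercise the callbacks directly with a fake "statement" object,
# instead of wiring them to a real engine:
emitted = []
before, after = make_listeners('my app', lambda *a: emitted.append(a))
stmt = object()
before(None, stmt, (), {})
after(None, stmt, (), {}, None)
```

In real use the two callbacks would be registered with `sqlalchemy.event.listen` as shown above, so every statement execution is bracketed by a span.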
Screenshots
-----------
Here is a slightly more involved example -- a glance image list
command in [Openstack][openstack]. It uses SQL statement tracing and
the tomograph middleware:
![zipkin glance image list](https://raw.github.com/timjr/tomograph/master/doc/screenshots/zipkin-glance-image-list.png)
[openstack]: http://www.openstack.org/
[statsd]: https://github.com/etsy/statsd
[zipkin]: http://twitter.github.com/zipkin/
[sql alchemy]: http://www.sqlalchemy.org/

README.rst

@@ -0,0 +1,7 @@
This project is no longer maintained.
The contents of this repository are still available in the Git source code
management system. To see the contents of this repository before it reached
its end of life, please check out the previous commit with
"git checkout HEAD^1".


@@ -1,74 +0,0 @@
Tracing Openstack with Tomograph
================================
1. Install OpenStack using your preferred method.
2. Git clone tomograph:

    git clone git@github.com:timjr/tomograph.git
    cd tomograph
    sudo python setup.py develop

3. Apply the tomograph patches to OpenStack:

    cd nova; patch -p1 < tomograph/doc/openstack-patches/nova-stable-folsom.patch
    cd keystone; patch -p1 < tomograph/doc/openstack-patches/keystone-stable-folsom.patch
    cd glance; patch -p1 < tomograph/doc/openstack-patches/glance-stable-folsom.patch
    cd glance-client; patch -p1 < tomograph/doc/openstack-patches/glance-client-stable-folsom.patch
4. Modify the paste config for glance-registry to include the tomograph middleware:

    # in glance-registry-paste.ini:
    [pipeline:glance-registry]
    pipeline = tomo unauthenticated-context registryapp

    [pipeline:glance-registry-keystone]
    pipeline = tomo authtoken context registryapp

    [filter:tomo]
    paste.filter_factory = tomograph:Middleware.factory
    service_name = glance-registry
5. Restart OpenStack and boot a VM. You should see log messages from the tomograph logging backend:

    2013-04-18 02:02:08,797 INFO tomograph.backends.log Span(trace_id=5731049070570866, parent_id=None, ...
Viewing Traces in Zipkin
========================
1. Set up Cassandra (something like the following):

    wget http://mirror.metrocast.net/apache/cassandra/1.2.3/apache-cassandra-1.2.3-bin.tar.gz
    tar xvzf apache-cassandra-1.2.3-bin.tar.gz
    sudo mkdir /var/lib/cassandra
    sudo chmod a+rw /var/lib/cassandra
    sudo mkdir /var/log/cassandra
    sudo chmod a+rw /var/log/cassandra
    apache-cassandra-1.2.3/bin/cassandra &> cassandra-out
2. Get Zipkin and set up its schema:

    git clone git://github.com/twitter/zipkin.git
    apache-cassandra-1.2.3/bin/cassandra-cli -host localhost -port 9160 -f zipkin/zipkin-cassandra/src/schema/cassandra-schema.txt
3. Start the Zipkin components. Note: wait until the build for each component finishes before starting the next one, because sbt does not handle multiple builds running in the same directory well. We use setsid instead of nohup because sbt tries to frob the terminal; otherwise it receives a SIGTTOU and stops:

    cd zipkin
    setsid bin/collector &> collector-out
    setsid bin/query &> query-out
    setsid bin/web &> web-out
4. Restart OpenStack.
5. Boot a VM.
6. View the trace:

    visit http://localhost:8080
    select rpcrun_instance from the service menu
    make sure the time is set to now or later
    click "Find Traces"
    click on the rpcrun_instance trace


@@ -1,30 +0,0 @@
diff --git a/glanceclient/common/http.py b/glanceclient/common/http.py
index 7146ace..52cc409 100644
--- a/glanceclient/common/http.py
+++ b/glanceclient/common/http.py
@@ -20,6 +20,7 @@ import posixpath
import socket
import StringIO
import struct
+import tomograph
import urlparse
try:
@@ -154,6 +155,9 @@ class HTTPClient(object):
if self.auth_token:
kwargs['headers'].setdefault('X-Auth-Token', self.auth_token)
+ tomograph.start('glanceclient', 'http', socket.gethostname(), 0)
+ tomograph.add_trace_info_header(kwargs['headers'])
+
self.log_curl_request(method, url, kwargs)
conn = self.get_connection()
@@ -201,6 +205,7 @@ class HTTPClient(object):
else:
self.log_http_response(resp)
+ tomograph.stop('http')
if 400 <= resp.status < 600:
LOG.error("Request returned failure status.")
raise exc.from_response(resp, body_str)


@@ -1,135 +0,0 @@
diff --git a/glance/api/middleware/tomo.py b/glance/api/middleware/tomo.py
new file mode 100644
index 0000000..c4814bf
--- /dev/null
+++ b/glance/api/middleware/tomo.py
@@ -0,0 +1,16 @@
+from glance.common import wsgi
+
+import tomograph
+
+class Tomo(wsgi.Middleware):
+
+ def __init__(self, app):
+ super(Tomo, self).__init__(app)
+
+ def process_request(self, req):
+ """Try to find a version first in the accept header, then the URL"""
+ tomograph.start_http('glanceregistry', 'WSGI', req)
+
+ def process_response(self, resp):
+ tomograph.stop('WSGI')
+ return resp
diff --git a/glance/api/middleware/version_negotiation.py b/glance/api/middleware/version_negotiation.py
index 74cc3bf..fa54253 100644
--- a/glance/api/middleware/version_negotiation.py
+++ b/glance/api/middleware/version_negotiation.py
@@ -26,6 +26,8 @@ from glance.common import wsgi
import glance.openstack.common.log as logging
from glance.openstack.common import cfg
+import tomograph
+
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@@ -39,6 +41,8 @@ class VersionNegotiationFilter(wsgi.Middleware):
def process_request(self, req):
"""Try to find a version first in the accept header, then the URL"""
+ tomograph.start_http('glanceapi', 'WSGI', req)
+
msg = _("Determining version of request: %(method)s %(path)s"
" Accept: %(accept)s")
args = {'method': req.method, 'path': req.path, 'accept': req.accept}
@@ -71,6 +75,10 @@ class VersionNegotiationFilter(wsgi.Middleware):
LOG.debug('new uri %s' % req.path_info)
return None
+ def process_response(self, resp):
+ tomograph.stop('WSGI')
+ return resp
+
def _match_version_string(self, subject):
"""
Given a string, tries to match a major and/or
diff --git a/glance/common/client.py b/glance/common/client.py
index 88dbda7..5e03a15 100644
--- a/glance/common/client.py
+++ b/glance/common/client.py
@@ -28,6 +28,8 @@ import re
import select
import urllib
import urlparse
+import socket
+import tomograph
try:
from eventlet.green import socket, ssl
@@ -496,6 +498,9 @@ class BaseClient(object):
connection_type = self.get_connection_type()
headers = headers or {}
+ tomograph.start('registryclient', 'http', socket.gethostname(), 0)
+ tomograph.add_trace_info_header(headers)
+
if 'x-auth-token' not in headers and self.auth_tok:
headers['x-auth-token'] = self.auth_tok
@@ -557,6 +562,7 @@ class BaseClient(object):
def _retry(res):
return res.getheader('Retry-After')
+ tomograph.stop('http')
status_code = self.get_status_code(res)
if status_code in self.OK_RESPONSE_CODES:
return res
diff --git a/glance/common/wsgi.py b/glance/common/wsgi.py
index d324861..e8bd023 100644
--- a/glance/common/wsgi.py
+++ b/glance/common/wsgi.py
@@ -29,6 +29,7 @@ import os
import signal
import sys
import time
+import tomograph
import eventlet
from eventlet.green import socket, ssl
@@ -365,6 +366,14 @@ class Debug(Middleware):
print
+class Tomo(Middleware):
+ def process_request(self, req):
+ tomograph.start_http('glance', 'WSGI', req)
+
+ def process_response(self, req):
+ tomograph.stop('WSGI')
+
+
class Router(object):
"""
WSGI middleware that maps incoming requests to WSGI apps.
diff --git a/glance/db/sqlalchemy/api.py b/glance/db/sqlalchemy/api.py
index 779a434..1e8b824 100644
--- a/glance/db/sqlalchemy/api.py
+++ b/glance/db/sqlalchemy/api.py
@@ -36,6 +36,7 @@ from glance.openstack.common import cfg
import glance.openstack.common.log as os_logging
from glance.openstack.common import timeutils
+import tomograph
_ENGINE = None
_MAKER = None
@@ -100,6 +101,9 @@ def configure_db():
try:
_ENGINE = sqlalchemy.create_engine(sql_connection, **engine_args)
+ sqlalchemy.event.listen(_ENGINE, 'before_execute', tomograph.before_execute('glance'))
+ sqlalchemy.event.listen(_ENGINE, 'after_execute', tomograph.after_execute('glance'))
+
if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener)


@@ -1,77 +0,0 @@
diff --git a/keystone/common/sql/core.py b/keystone/common/sql/core.py
index e9b780a..4dcf4dc 100644
--- a/keystone/common/sql/core.py
+++ b/keystone/common/sql/core.py
@@ -28,6 +28,7 @@ from keystone.common import logging
from keystone import config
from keystone.openstack.common import jsonutils
+import tomograph
CONF = config.CONF
@@ -137,6 +138,9 @@ class Base(object):
"""Return a SQLAlchemy session."""
if self._MAKER is None or self._ENGINE is None:
self._ENGINE = self.get_engine()
+ sqlalchemy.event.listen(self._ENGINE, 'before_execute', tomograph.before_execute('keystone'))
+ sqlalchemy.event.listen(self._ENGINE, 'after_execute', tomograph.after_execute('keystone'))
+
self._MAKER = self.get_maker(self._ENGINE,
autocommit,
expire_on_commit)
diff --git a/keystone/contrib/stats/core.py b/keystone/contrib/stats/core.py
index a479ee3..efa9dc1 100644
--- a/keystone/contrib/stats/core.py
+++ b/keystone/contrib/stats/core.py
@@ -23,6 +23,7 @@ from keystone.common import logging
from keystone.common import manager
from keystone.common import wsgi
+import tomograph
CONF = config.CONF
LOG = logging.getLogger(__name__)
@@ -141,9 +142,11 @@ class StatsMiddleware(wsgi.Middleware):
def process_request(self, request):
"""Monitor incoming request attributes."""
+ tomograph.start_http('keystone', 'WSGI', request)
self.capture_stats(request.host, request, self.request_attributes)
def process_response(self, request, response):
"""Monitor outgoing response attributes."""
+ tomograph.stop('WSGI')
self.capture_stats(request.host, response, self.response_attributes)
return response
diff --git a/keystone/middleware/auth_token.py b/keystone/middleware/auth_token.py
index ddadf9f..3ee86cd 100644
--- a/keystone/middleware/auth_token.py
+++ b/keystone/middleware/auth_token.py
@@ -108,6 +108,8 @@ from keystone.common import cms
from keystone.common import utils
from keystone.openstack.common import timeutils
+import tomograph
+
CONF = None
try:
from openstack.common import cfg
@@ -398,6 +400,8 @@ class AuthProtocol(object):
while True:
try:
+ headers = kwargs.setdefault('headers', {})
+ tomograph.add_trace_info_header(headers)
conn.request(method, path, **kwargs)
response = conn.getresponse()
body = response.read()
@@ -437,6 +441,8 @@ class AuthProtocol(object):
if additional_headers:
kwargs['headers'].update(additional_headers)
+ tomograph.add_trace_info_header(kwargs['headers'])
+
if body:
kwargs['body'] = jsonutils.dumps(body)


@@ -1,600 +0,0 @@
diff --git a/nova/api/ec2/__init__.py b/nova/api/ec2/__init__.py
index 2ae685c..7863db2 100644
--- a/nova/api/ec2/__init__.py
+++ b/nova/api/ec2/__init__.py
@@ -42,6 +42,7 @@ from nova.openstack.common import timeutils
from nova import utils
from nova import wsgi
+import tomograph
LOG = logging.getLogger(__name__)
@@ -95,6 +96,7 @@ class FaultWrapper(wsgi.Middleware):
"""Calls the middleware stack, captures any exceptions into faults."""
@webob.dec.wsgify(RequestClass=wsgi.Request)
+ @tomograph.traced('FaultWrapper', 'middleware')
def __call__(self, req):
try:
return req.get_response(self.application)
@@ -107,6 +109,7 @@ class RequestLogging(wsgi.Middleware):
"""Access-Log akin logging for all EC2 API requests."""
@webob.dec.wsgify(RequestClass=wsgi.Request)
+ @tomograph.traced('RequestLogging', 'middleware')
def __call__(self, req):
start = timeutils.utcnow()
rv = req.get_response(self.application)
@@ -169,6 +172,7 @@ class Lockout(wsgi.Middleware):
super(Lockout, self).__init__(application)
@webob.dec.wsgify(RequestClass=wsgi.Request)
+ @tomograph.traced('Lockout', 'middleware')
def __call__(self, req):
access_key = str(req.params['AWSAccessKeyId'])
failures_key = "authfailures-%s" % access_key
@@ -197,6 +201,7 @@ class EC2KeystoneAuth(wsgi.Middleware):
"""Authenticate an EC2 request with keystone and convert to context."""
@webob.dec.wsgify(RequestClass=wsgi.Request)
+ @tomograph.traced('EC2KeystoneAuth', 'middleware')
def __call__(self, req):
request_id = context.generate_request_id()
signature = req.params.get('Signature')
@@ -225,8 +230,11 @@ class EC2KeystoneAuth(wsgi.Middleware):
creds = {'ec2Credentials': cred_dict}
else:
creds = {'auth': {'OS-KSEC2:ec2Credentials': cred_dict}}
+
creds_json = jsonutils.dumps(creds)
+
headers = {'Content-Type': 'application/json'}
+ tomograph.add_trace_info_header(headers)
o = urlparse.urlparse(FLAGS.keystone_ec2_url)
if o.scheme == "http":
@@ -282,6 +290,7 @@ class NoAuth(wsgi.Middleware):
"""Add user:project as 'nova.context' to WSGI environ."""
@webob.dec.wsgify(RequestClass=wsgi.Request)
+ @tomograph.traced('NoAuth', 'middleware')
def __call__(self, req):
if 'AWSAccessKeyId' not in req.params:
raise webob.exc.HTTPBadRequest()
@@ -306,6 +315,7 @@ class Requestify(wsgi.Middleware):
self.controller = importutils.import_object(controller)
@webob.dec.wsgify(RequestClass=wsgi.Request)
+ @tomograph.traced('Requestify', 'middleware')
def __call__(self, req):
non_args = ['Action', 'Signature', 'AWSAccessKeyId', 'SignatureMethod',
'SignatureVersion', 'Version', 'Timestamp']
@@ -394,6 +404,7 @@ class Authorizer(wsgi.Middleware):
}
@webob.dec.wsgify(RequestClass=wsgi.Request)
+ @tomograph.traced('Authorizer', 'middleware')
def __call__(self, req):
context = req.environ['nova.context']
controller = req.environ['ec2.request'].controller.__class__.__name__
@@ -448,6 +459,7 @@ class Validator(wsgi.Middleware):
super(Validator, self).__init__(application)
@webob.dec.wsgify(RequestClass=wsgi.Request)
+ @tomograph.traced('Validator', 'middleware')
def __call__(self, req):
if validator.validate(req.environ['ec2.request'].args,
validator.DEFAULT_VALIDATOR):
@@ -466,6 +478,7 @@ class Executor(wsgi.Application):
"""
@webob.dec.wsgify(RequestClass=wsgi.Request)
+ @tomograph.traced('Executor', 'application')
def __call__(self, req):
context = req.environ['nova.context']
request_id = context.request_id
diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py
index 6afb05a..e2fc7f8 100644
--- a/nova/api/ec2/cloud.py
+++ b/nova/api/ec2/cloud.py
@@ -43,6 +43,7 @@ from nova import quota
from nova import utils
from nova import volume
+import tomograph
FLAGS = flags.FLAGS
diff --git a/nova/api/openstack/__init__.py b/nova/api/openstack/__init__.py
index ac7021f..f391eb8 100644
--- a/nova/api/openstack/__init__.py
+++ b/nova/api/openstack/__init__.py
@@ -23,6 +23,7 @@ WSGI middleware for OpenStack API controllers.
import routes
import webob.dec
import webob.exc
+import tomograph
from nova.api.openstack import wsgi
from nova.openstack.common import log as logging
diff --git a/nova/api/openstack/wsgi.py b/nova/api/openstack/wsgi.py
index e440889..f4554fa 100644
--- a/nova/api/openstack/wsgi.py
+++ b/nova/api/openstack/wsgi.py
@@ -30,6 +30,8 @@ from nova.openstack.common import log as logging
from nova import utils
from nova import wsgi
+import tomograph
+
XMLNS_V10 = 'http://docs.rackspacecloud.com/servers/api/v1.0'
XMLNS_V11 = 'http://docs.openstack.org/compute/api/v1.1'
diff --git a/nova/compute/instance_types.py b/nova/compute/instance_types.py
index 6869672..e1052be 100644
--- a/nova/compute/instance_types.py
+++ b/nova/compute/instance_types.py
@@ -102,7 +102,8 @@ def destroy(name):
LOG.exception(_('Instance type %s not found for deletion') % name)
raise exception.InstanceTypeNotFoundByName(instance_type_name=name)
-
+import tomograph
+@tomograph.traced('get_all_types', 'foo')
def get_all_types(ctxt=None, inactive=False, filters=None):
"""Get all non-deleted instance_types.
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index bbb71dd..9c92cce 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -2650,6 +2650,13 @@ class ComputeManager(manager.SchedulerDependentManager):
usage['bw_in'], usage['bw_out'],
last_refreshed=refreshed)
+ @manager.periodic_task(fast_task=True)
+ def _driver_metrics(self, context):
+ capabilities = self.driver.get_host_stats(refresh=True)
+ capabilities['host_ip'] = FLAGS.my_ip
+ capabilities['num_instances'] = self.driver.get_num_instances()
+ LOG.audit("driver_metrics", extra=capabilities)
+
@manager.periodic_task
def _report_driver_status(self, context):
curr_time = time.time()
diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py
index afec290..744a9c1 100644
--- a/nova/compute/rpcapi.py
+++ b/nova/compute/rpcapi.py
@@ -24,7 +24,7 @@ from nova.openstack.common import jsonutils
from nova.openstack.common import rpc
import nova.openstack.common.rpc.proxy
-
+import tomograph
FLAGS = flags.FLAGS
@@ -501,8 +501,9 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
def terminate_instance(self, ctxt, instance):
instance_p = jsonutils.to_primitive(instance)
- self.cast(ctxt, self.make_msg('terminate_instance',
- instance=instance_p),
+ msg = self.make_msg('terminate_instance', instance=instance_p)
+ msg['trace_info'] = tomograph.get_trace_info()
+ self.cast(ctxt, msg,
topic=_compute_topic(self.topic, ctxt, None, instance))
def unpause_instance(self, ctxt, instance):
diff --git a/nova/db/sqlalchemy/session.py b/nova/db/sqlalchemy/session.py
index 6e754be..92ea58b 100644
--- a/nova/db/sqlalchemy/session.py
+++ b/nova/db/sqlalchemy/session.py
@@ -25,6 +25,7 @@ from sqlalchemy.exc import DisconnectionError, OperationalError
import sqlalchemy.interfaces
import sqlalchemy.orm
from sqlalchemy.pool import NullPool, StaticPool
+import tomograph
import nova.exception
import nova.flags as flags
@@ -122,6 +123,8 @@ def get_engine():
_ENGINE = sqlalchemy.create_engine(FLAGS.sql_connection, **engine_args)
+ sqlalchemy.event.listen(_ENGINE, 'before_execute', tomograph.before_execute('nova'))
+ sqlalchemy.event.listen(_ENGINE, 'after_execute', tomograph.after_execute('nova'))
if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener)
elif "sqlite" in connection_dict.drivername:
@@ -158,6 +161,7 @@ def get_engine():
if (remaining != 'infinite' and remaining == 0) or \
not is_db_connection_error(e.args[0]):
raise
+
return _ENGINE
@@ -207,3 +211,5 @@ def debug_mysql_do_query():
# return the new _do_query method
return _do_query
+
+
diff --git a/nova/image/s3.py b/nova/image/s3.py
index 80f9448..1597864 100644
--- a/nova/image/s3.py
+++ b/nova/image/s3.py
@@ -38,6 +38,7 @@ from nova.openstack.common import cfg
from nova.openstack.common import log as logging
from nova import utils
+#import tomograph
LOG = logging.getLogger(__name__)
@@ -72,6 +73,7 @@ class S3ImageService(object):
self.service = service or glance.get_default_image_service()
self.service.__init__(*args, **kwargs)
+ #@tomograph.traced('uuidxlate1', 'foo')
def _translate_uuids_to_ids(self, context, images):
return [self._translate_uuid_to_id(context, img) for img in images]
@@ -136,6 +138,7 @@ class S3ImageService(object):
image = self.service.update(context, image_uuid, metadata, data)
return self._translate_uuid_to_id(context, image)
+ #@tomograph.traced('s3details', 'foo')
def detail(self, context, **kwargs):
#NOTE(bcwaldon): sort asc to make sure we assign lower ids
# to older images
diff --git a/nova/manager.py b/nova/manager.py
index 275d98b..9a5efb1 100644
--- a/nova/manager.py
+++ b/nova/manager.py
@@ -84,6 +84,7 @@ def periodic_task(*args, **kwargs):
def decorator(f):
f._periodic_task = True
f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
+ f._fast_task = kwargs.pop('fast_task', False)
return f
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
@@ -115,8 +116,10 @@ class ManagerMeta(type):
# parent's toes.
try:
cls._periodic_tasks = cls._periodic_tasks[:]
+ cls._fast_tasks = cls._fast_tasks[:]
except AttributeError:
cls._periodic_tasks = []
+ cls._fast_tasks = []
try:
cls._ticks_to_skip = cls._ticks_to_skip.copy()
@@ -127,8 +130,11 @@ class ManagerMeta(type):
if getattr(value, '_periodic_task', False):
task = value
name = task.__name__
- cls._periodic_tasks.append((name, task))
- cls._ticks_to_skip[name] = task._ticks_between_runs
+ if getattr(value, '_fast_task', False):
+ cls._fast_tasks.append((name, task))
+ else:
+ cls._periodic_tasks.append((name, task))
+ cls._ticks_to_skip[name] = task._ticks_between_runs
class Manager(base.Base):
@@ -156,6 +162,16 @@ class Manager(base.Base):
'''
return rpc_dispatcher.RpcDispatcher([self])
+ def fast_tasks(self, context):
+ for task_name, task in self._fast_tasks:
+ full_task_name = '.'.join([self.__class__.__name__, task_name])
+ #LOG.debug(_("Running fast task %(full_task_name)s"), locals())
+ try:
+ task(self, context)
+ except Exception as e:
+ LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
+ locals())
+
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
for task_name, task in self._periodic_tasks:
diff --git a/nova/network/manager.py b/nova/network/manager.py
index 6a51f05..d59a883 100644
--- a/nova/network/manager.py
+++ b/nova/network/manager.py
@@ -1,5 +1,5 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
+# $Id$
# Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
diff --git a/nova/openstack/common/rpc/__init__.py b/nova/openstack/common/rpc/__init__.py
index 0f82c47..d08c154 100644
--- a/nova/openstack/common/rpc/__init__.py
+++ b/nova/openstack/common/rpc/__init__.py
@@ -27,7 +27,7 @@ For some wrappers that add message versioning to rpc, see:
from nova.openstack.common import cfg
from nova.openstack.common import importutils
-
+import tomograph
rpc_opts = [
cfg.StrOpt('rpc_backend',
@@ -105,6 +105,7 @@ def call(context, topic, msg, timeout=None):
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
+ msg['trace_info'] = tomograph.get_trace_info()
return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
@@ -123,6 +124,10 @@ def cast(context, topic, msg):
:returns: None
"""
+ try:
+ msg['trace_info'] = tomograph.get_trace_info()
+ except:
+ pass
return _get_impl().cast(cfg.CONF, context, topic, msg)
@@ -144,6 +149,10 @@ def fanout_cast(context, topic, msg):
:returns: None
"""
+ try:
+ msg['trace_info'] = tomograph.get_trace_info()
+ except:
+ pass
return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
@@ -174,6 +183,7 @@ def multicall(context, topic, msg, timeout=None):
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
+ msg['trace_info'] = tomograph.get_trace_info()
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
@@ -215,6 +225,10 @@ def cast_to_server(context, server_params, topic, msg):
:returns: None
"""
+ try:
+ msg['trace_info'] = tomograph.get_trace_info()
+ except:
+ pass
return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
msg)
@@ -231,6 +245,10 @@ def fanout_cast_to_server(context, server_params, topic, msg):
:returns: None
"""
+ try:
+ msg['trace_info'] = tomograph.get_trace_info()
+ except:
+ pass
return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
topic, msg)
diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py
index a884084..9fcff67 100644
--- a/nova/openstack/common/rpc/amqp.py
+++ b/nova/openstack/common/rpc/amqp.py
@@ -40,6 +40,8 @@ from nova.openstack.common.gettextutils import _
from nova.openstack.common import local
from nova.openstack.common.rpc import common as rpc_common
+import socket
+import tomograph
LOG = logging.getLogger(__name__)
@@ -255,14 +257,15 @@ class ProxyCallback(object):
method = message_data.get('method')
args = message_data.get('args', {})
version = message_data.get('version', None)
+ trace_info = message_data.get('trace_info')
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
- self.pool.spawn_n(self._process_data, ctxt, version, method, args)
+ self.pool.spawn_n(self._process_data, ctxt, version, trace_info, method, args)
- def _process_data(self, ctxt, version, method, args):
+ def _process_data(self, ctxt, version, trace_info, method, args):
"""Process a message in a new thread.
If the proxy object we have has a dispatch method
@@ -271,6 +274,10 @@ class ProxyCallback(object):
the old behavior of magically calling the specified method on the
proxy we have here.
"""
+ moo = method
+ if isinstance(moo, unicode):
+ moo = moo.encode('ascii', 'ignore')
+ tomograph.start("rpc" + moo, 'proxy', socket.gethostname(), 1000, trace_info)
ctxt.update_store()
try:
rval = self.proxy.dispatch(ctxt, version, method, **args)
@@ -286,6 +293,7 @@ class ProxyCallback(object):
LOG.exception('Exception during message handling')
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
+ tomograph.stop('proxy')
class MulticallWaiter(object):
@@ -301,6 +309,7 @@ class MulticallWaiter(object):
def done(self):
if self._done:
return
+ #tomograph.stop('rpc')
self._done = True
self._iterator.close()
self._iterator = None
diff --git a/nova/openstack/common/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py
index a077552..b0f48e1 100644
--- a/nova/openstack/common/rpc/proxy.py
+++ b/nova/openstack/common/rpc/proxy.py
@@ -21,6 +21,8 @@ For more information about rpc API version numbers, see:
rpc/dispatcher.py
"""
+import socket
+import tomograph
from nova.openstack.common import rpc
@@ -77,6 +79,7 @@ class RpcProxy(object):
:returns: The return value from the remote method.
"""
self._set_version(msg, version)
+ msg['trace_info'] = tomograph.get_trace_info()
return rpc.call(context, self._get_topic(topic), msg, timeout)
def multicall(self, context, msg, topic=None, version=None, timeout=None):
@@ -95,6 +98,7 @@ class RpcProxy(object):
from the remote method as they arrive.
"""
self._set_version(msg, version)
+ msg['trace_info'] = tomograph.get_trace_info()
return rpc.multicall(context, self._get_topic(topic), msg, timeout)
def cast(self, context, msg, topic=None, version=None):
@@ -110,6 +114,10 @@ class RpcProxy(object):
remote method.
"""
self._set_version(msg, version)
+ try:
+ msg['trace_info'] = tomograph.get_trace_info()
+ except:
+ pass
rpc.cast(context, self._get_topic(topic), msg)
def fanout_cast(self, context, msg, topic=None, version=None):
@@ -125,6 +133,10 @@ class RpcProxy(object):
from the remote method.
"""
self._set_version(msg, version)
+ try:
+ msg['trace_info'] = tomograph.get_trace_info()
+ except:
+ pass
rpc.fanout_cast(context, self._get_topic(topic), msg)
def cast_to_server(self, context, server_params, msg, topic=None,
@@ -143,6 +155,10 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
+ try:
+ msg['trace_info'] = tomograph.get_trace_info()
+ except:
+ pass
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
@@ -161,5 +177,9 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
+ try:
+ msg['trace_info'] = tomograph.get_trace_info()
+ except:
+ pass
rpc.fanout_cast_to_server(context, server_params,
self._get_topic(topic), msg)
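The rpc hunks above all repeat one pattern: the caller attaches its current trace context to the outgoing message (`msg['trace_info'] = tomograph.get_trace_info()`), and the consumer rebuilds a span from that context before dispatching. A minimal, self-contained sketch of that propagation idea — not nova's or tomograph's real API, just the shape of it:

```python
import threading

# Thread-local slot standing in for a per-request trace store.
_local = threading.local()

def get_trace_info():
    """Return the current span, for piggybacking on an outgoing message."""
    return getattr(_local, 'trace', None)

def start(name, trace_info=None):
    """Open a span; if the message carried trace_info, continue that trace."""
    parent = trace_info if trace_info is not None else get_trace_info()
    _local.trace = {'name': name, 'parent': parent}
    return _local.trace

def cast(msg):
    """Mimic the patched rpc.cast: attach trace_info, then hand off."""
    msg['trace_info'] = get_trace_info()
    return _consume(msg)

def _consume(msg):
    # Consumer side: rebuild the caller's trace before dispatching the method.
    return start('rpc.' + msg['method'], msg.get('trace_info'))

start('api-request')
span = cast({'method': 'terminate_instance'})
```

This is why the patch threads `trace_info` through `_process_data` in amqp.py: the consumer runs in a fresh greenthread with no ambient context, so the parent span must travel inside the message itself.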
diff --git a/nova/service.py b/nova/service.py
index 6f350c6..9ff1815 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -24,11 +24,13 @@ import inspect
import os
import random
import signal
+import socket
import sys
import time
import eventlet
import greenlet
+import tomograph
from nova.common import eventlet_backdoor
from nova import context
@@ -443,6 +445,10 @@ class Service(object):
initial_delay=initial_delay)
self.timers.append(periodic)
+ fast_periodic = utils.LoopingCall(self.fast_tasks)
+ fast_periodic.start(interval=5)
+ self.timers.append(fast_periodic)
+
def _create_service_ref(self, context):
zone = FLAGS.node_availability_zone
service_ref = db.service_create(context,
@@ -527,6 +533,10 @@ class Service(object):
ctxt = context.get_admin_context()
self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
+ def fast_tasks(self):
+ ctxt = context.get_admin_context()
+ self.manager.fast_tasks(ctxt)
+
def report_state(self):
"""Update the state of this service in the datastore."""
ctxt = context.get_admin_context()
@@ -572,11 +582,12 @@ class WSGIService(object):
"""
self.name = name
+ LOG.error('wsgi server named {0} coming online...'.format(name))
self.manager = self._get_manager()
self.loader = loader or wsgi.Loader()
- self.app = self.loader.load_app(name)
self.host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0")
self.port = getattr(FLAGS, '%s_listen_port' % name, 0)
+ self.app = tomograph.tracewrap(self.loader.load_app(name), self.name, "WSGI", self.host, self.port)
self.workers = getattr(FLAGS, '%s_workers' % name, None)
self.server = wsgi.Server(name,
self.app,
diff --git a/nova/wsgi.py b/nova/wsgi.py
index afb5303..07a3b37 100644
--- a/nova/wsgi.py
+++ b/nova/wsgi.py
@@ -34,6 +34,7 @@ from nova import exception
from nova import flags
from nova.openstack.common import log as logging
+import tomograph
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
@@ -238,6 +239,7 @@ class Middleware(Application):
"""Do whatever you'd like to the response."""
return response
+ @tomograph.traced(None, 'middleware')
@webob.dec.wsgify(RequestClass=Request)
def __call__(self, req):
response = self.process_request(req)
@@ -373,3 +375,4 @@ class Loader(object):
except LookupError as err:
LOG.error(err)
raise exception.PasteAppNotFound(name=name, path=self.config_path)
+

Binary file not shown (before: 112 KiB)
Binary file not shown (before: 71 KiB)
Binary file not shown (before: 166 KiB)
@@ -1,75 +0,0 @@
import os
import sys
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
sys.path.insert(0, os.path.abspath('../../'))
sys.path.insert(0, os.path.abspath('../'))
sys.path.insert(0, os.path.abspath('./'))
sys.path.insert(0, os.path.abspath('.'))
from tomograph import version as tomograph_version
# Suppress warnings for docs that aren't used yet
#unused_docs = [
#]
# -- General configuration -----------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = [
'sphinx.ext.intersphinx',
]
intersphinx_mapping = {
'sphinx': ('http://sphinx.pocoo.org', None)
}
# The suffix of source filenames.
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = 'TOMOGRAPH'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
release = tomograph_version.version_string()
version = tomograph_version.canonical_version_string()
# Set the default Pygments syntax
highlight_language = 'python'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
exclude_patterns = []
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
show_authors = False
# -- Options for HTML output ---------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'default'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
html_theme_options = {
"bodyfont": "Arial, sans-serif",
"headfont": "Arial, sans-serif"
}
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = 'img/tomograph-tiny.png'

@@ -1,11 +0,0 @@
.. _index:
=====================
Tomograph
=====================
.. rubric:: A library to help distributed applications send trace information to
metrics backends like [Zipkin][zipkin] and [Statsd][statsd].
----

@@ -1,3 +0,0 @@
webob
statsd

@@ -1,9 +0,0 @@
# Install bounded pep8/pyflakes first, then let flake8 install
pep8==1.4.5
pyflakes>=0.7.2,<0.7.4
flake8==2.0
hacking>=0.5.6,<0.8
nose
nose-exclude
sphinx>=1.1.2

@@ -1,5 +0,0 @@
webob
statsd
eventlet
thrift

@@ -1,39 +0,0 @@
[metadata]
name = tomograph
summary = Tiny tims tracing tomograph
description-file =
README.md
author = Tomograph Developers
author-email = timjr@yahoo-inc.com
classifier =
    Development Status :: 3 - Alpha
Environment :: OpenStack
Intended Audience :: Information Technology
Intended Audience :: Developers
License :: OSI Approved :: Apache Software License
Operating System :: POSIX :: Linux
Programming Language :: Python
Programming Language :: Python :: 2
Programming Language :: Python :: 2.6
Programming Language :: Python :: 2.7
Programming Language :: Python :: 3.3
[global]
setup-hooks =
pbr.hooks.setup_hook
[files]
packages =
tomograph
[nosetests]
cover-erase = true
verbosity = 2
[build_sphinx]
source-dir = doc/source
build-dir = doc/build
all_files = 1
[upload_sphinx]
upload-dir = docs/build/html

@@ -1,23 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT
import setuptools
setuptools.setup(
setup_requires=['pbr'],
pbr=True)

@@ -1,11 +0,0 @@
# Install bounded pep8/pyflakes first, then let flake8 install
pep8==1.4.5
pyflakes>=0.7.2,<0.7.4
flake8==2.0
hacking>=0.5.6,<0.8
nose
nose-exclude
openstack.nose_plugin>=0.7
pylint==0.25.2
sphinx>=1.1.2

@@ -1,38 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
import tomograph
import sys
import time
@tomograph.traced('test server', 'server response', port=80)
def server(latency):
tomograph.annotate('this is an annotation')
time.sleep(latency)
tomograph.tag('this is double', 1.1)
tomograph.tag('this is a string', 'foo')
tomograph.tag('this is an int', 42)
@tomograph.traced('test client', 'client request')
def client(client_overhead, server_latency):
time.sleep(client_overhead)
server(server_latency)
if __name__ == '__main__':
if len(sys.argv) > 1:
tomograph.config.set_backends(sys.argv[1:])
client(0.1, 2.4)

@@ -1,40 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
import sys
import time
import tomograph
@tomograph.traced('test server', 'server response', port=80)
def server(latency):
time.sleep(latency)
@tomograph.traced('test client', 'client request')
def client(client_overhead, server_latency):
time.sleep(client_overhead)
server(server_latency)
def clientloop():
for i in xrange(10000):
client(0, 0)
if __name__ == '__main__':
if len(sys.argv) > 1:
tomograph.config.set_backends(sys.argv[1:])
#cProfile.run('clientloop()', 'tomo-bench')
clientloop()

@@ -1,43 +0,0 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
### Initialize logging in case it hasn't been done. We need two
### versions of this, one for the eventlet logging module and one for
### the non-eventlet one...
from __future__ import absolute_import
import logging
import sys
import eventlet
eventlet_logging = eventlet.import_patched('logging')
eventlet_sys = eventlet.import_patched('sys')
def _initLogging(logging, sys):
"""Set up some default stuff, in case nobody configured logging yet."""
logger = logging.getLogger('tomograph')
if logger.level == logging.NOTSET:
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s '
'%(name)s %(message)s'))
logger.addHandler(handler)
_initLogging(logging, sys)
_initLogging(eventlet_logging, eventlet_sys)
from tomograph.tomograph import *

@@ -1,10 +0,0 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.

@@ -1,19 +0,0 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
import logging
import sys
logger = logging.getLogger(__name__)
def send(span):
logger.info(span)

@@ -1,13 +0,0 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
from statsd import *

@@ -1,57 +0,0 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
import eventlet
from tomograph import cache
from tomograph import config
logging = eventlet.import_patched('logging')
socket = eventlet.import_patched('socket')
threading = eventlet.import_patched('threading')
logger = logging.getLogger(__name__)
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
hostname_cache = cache.Cache(socket.gethostbyname)
lock = threading.Lock()
def send(span):
def statsd_send(name, value, units):
stat = (str(name).replace(' ', '-') + ':' + str(int(value)) +
'|' + str(units))
with lock:
try:
udp_socket.sendto(stat,
(hostname_cache.get(config.statsd_host),
config.statsd_port))
except Exception:
if config.debug:
logger.warning("Error sending metric to statsd.",
exc_info=True)
def server_name(note):
address = note.address.replace('.', '-')
return note.service_name + ' ' + address + ' ' + str(note.port)
# the timing stat:
delta = span.notes[-1].time - span.notes[0].time
deltams = delta * 1000
time_stat_name = server_name(span.notes[0]) + '.' + span.name
statsd_send(time_stat_name, deltams, 'ms')
# a count stat for each note
for note in span.notes:
stat_name = server_name(note) + '.' + span.name + '.' + str(note.value)
statsd_send(stat_name, 1, 'c')
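The backend above encodes every metric as a plain `name:value|units` UDP datagram — the statsd wire format — after dashing spaces out of the name. A self-contained sketch of just that encoding step, lifted from the `statsd_send` closure above:

```python
def encode_stat(name, value, units):
    """Render one statsd datagram: 'name:value|units', with spaces dashed.

    Mirrors the string-building in the deleted backend: the value is
    truncated to an int, so sub-millisecond precision is dropped.
    """
    return (str(name).replace(' ', '-') + ':' + str(int(value)) +
            '|' + str(units))
```

For a span named `server response` that took 12.7 ms, this yields `server-response:12|ms`; each note additionally produces a `|c` counter datagram, which is why the backend sends one timing stat plus one count per note.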

@@ -1,12 +0,0 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
from zipkin import *

@@ -1 +0,0 @@
__all__ = ['ttypes', 'constants', 'scribe']

@@ -1,11 +0,0 @@
#
# Autogenerated by Thrift Compiler (0.9.0)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
# options string: py:new_style=true
#
from thrift.Thrift import TType, TMessageType, TException, TApplicationException
from ttypes import *

@@ -1,88 +0,0 @@
#!/usr/bin/env python
#
# Autogenerated by Thrift Compiler (0.9.0)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
# options string: py:new_style=true
#
import sys
import pprint
from urlparse import urlparse
from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.transport import THttpClient
from thrift.protocol import TBinaryProtocol
import scribe
from ttypes import *
if len(sys.argv) <= 1 or sys.argv[1] == '--help':
print ''
print 'Usage: ' + sys.argv[0] + ' [-h host[:port]] [-u url] [-f[ramed]] function [arg1 [arg2...]]'
print ''
print 'Functions:'
print ' ResultCode Log( messages)'
print ''
sys.exit(0)
pp = pprint.PrettyPrinter(indent = 2)
host = 'localhost'
port = 9090
uri = ''
framed = False
http = False
argi = 1
if sys.argv[argi] == '-h':
parts = sys.argv[argi+1].split(':')
host = parts[0]
if len(parts) > 1:
port = int(parts[1])
argi += 2
if sys.argv[argi] == '-u':
url = urlparse(sys.argv[argi+1])
parts = url[1].split(':')
host = parts[0]
if len(parts) > 1:
port = int(parts[1])
else:
port = 80
uri = url[2]
if url[4]:
uri += '?%s' % url[4]
http = True
argi += 2
if sys.argv[argi] == '-f' or sys.argv[argi] == '-framed':
framed = True
argi += 1
cmd = sys.argv[argi]
args = sys.argv[argi+1:]
if http:
transport = THttpClient.THttpClient(host, port, uri)
else:
socket = TSocket.TSocket(host, port)
if framed:
transport = TTransport.TFramedTransport(socket)
else:
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = scribe.Client(protocol)
transport.open()
if cmd == 'Log':
if len(args) != 1:
print 'Log requires 1 args'
sys.exit(1)
pp.pprint(client.Log(eval(args[0]),))
else:
print 'Unrecognized method %s' % cmd
sys.exit(1)
transport.close()

@@ -1,228 +0,0 @@
#
# Autogenerated by Thrift Compiler (0.9.0)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
# options string: py:new_style=true
#
from thrift.Thrift import TType, TMessageType, TException, TApplicationException
from ttypes import *
from thrift.Thrift import TProcessor
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol, TProtocol
try:
from thrift.protocol import fastbinary
except:
fastbinary = None
class Iface(object):
def Log(self, messages):
"""
Parameters:
- messages
"""
pass
class Client(Iface):
def __init__(self, iprot, oprot=None):
self._iprot = self._oprot = iprot
if oprot is not None:
self._oprot = oprot
self._seqid = 0
def Log(self, messages):
"""
Parameters:
- messages
"""
self.send_Log(messages)
return self.recv_Log()
def send_Log(self, messages):
self._oprot.writeMessageBegin('Log', TMessageType.CALL, self._seqid)
args = Log_args()
args.messages = messages
args.write(self._oprot)
self._oprot.writeMessageEnd()
self._oprot.trans.flush()
def recv_Log(self, ):
(fname, mtype, rseqid) = self._iprot.readMessageBegin()
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
x.read(self._iprot)
self._iprot.readMessageEnd()
raise x
result = Log_result()
result.read(self._iprot)
self._iprot.readMessageEnd()
if result.success is not None:
return result.success
raise TApplicationException(TApplicationException.MISSING_RESULT, "Log failed: unknown result");
class Processor(Iface, TProcessor):
def __init__(self, handler):
self._handler = handler
self._processMap = {}
self._processMap["Log"] = Processor.process_Log
def process(self, iprot, oprot):
(name, type, seqid) = iprot.readMessageBegin()
if name not in self._processMap:
iprot.skip(TType.STRUCT)
iprot.readMessageEnd()
x = TApplicationException(TApplicationException.UNKNOWN_METHOD, 'Unknown function %s' % (name))
oprot.writeMessageBegin(name, TMessageType.EXCEPTION, seqid)
x.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
return
else:
self._processMap[name](self, seqid, iprot, oprot)
return True
def process_Log(self, seqid, iprot, oprot):
args = Log_args()
args.read(iprot)
iprot.readMessageEnd()
result = Log_result()
result.success = self._handler.Log(args.messages)
oprot.writeMessageBegin("Log", TMessageType.REPLY, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
# HELPER FUNCTIONS AND STRUCTURES
class Log_args(object):
"""
Attributes:
- messages
"""
thrift_spec = (
None, # 0
(1, TType.LIST, 'messages', (TType.STRUCT,(LogEntry, LogEntry.thrift_spec)), None, ), # 1
)
def __init__(self, messages=None,):
self.messages = messages
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
return
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.LIST:
self.messages = []
(_etype3, _size0) = iprot.readListBegin()
for _i4 in xrange(_size0):
_elem5 = LogEntry()
_elem5.read(iprot)
self.messages.append(_elem5)
iprot.readListEnd()
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('Log_args')
if self.messages is not None:
oprot.writeFieldBegin('messages', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.messages))
for iter6 in self.messages:
iter6.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
return
def __repr__(self):
L = ['%s=%r' % (key, value)
for key, value in self.__dict__.iteritems()]
return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
def __eq__(self, other):
return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
def __ne__(self, other):
return not (self == other)
class Log_result(object):
"""
Attributes:
- success
"""
thrift_spec = (
(0, TType.I32, 'success', None, None, ), # 0
)
def __init__(self, success=None,):
self.success = success
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
return
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 0:
if ftype == TType.I32:
self.success = iprot.readI32();
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('Log_result')
if self.success is not None:
oprot.writeFieldBegin('success', TType.I32, 0)
oprot.writeI32(self.success)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
return
def __repr__(self):
L = ['%s=%r' % (key, value)
for key, value in self.__dict__.iteritems()]
return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
def __eq__(self, other):
return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
def __ne__(self, other):
return not (self == other)

View File

@ -1,104 +0,0 @@
#
# Autogenerated by Thrift Compiler (0.9.0)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
# options string: py:new_style=true
#
from thrift.Thrift import TType, TMessageType, TException, TApplicationException
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol, TProtocol
try:
from thrift.protocol import fastbinary
except:
fastbinary = None
class ResultCode(object):
OK = 0
TRY_LATER = 1
_VALUES_TO_NAMES = {
0: "OK",
1: "TRY_LATER",
}
_NAMES_TO_VALUES = {
"OK": 0,
"TRY_LATER": 1,
}
class LogEntry(object):
"""
Attributes:
- category
- message
"""
thrift_spec = (
None, # 0
(1, TType.STRING, 'category', None, None, ), # 1
(2, TType.STRING, 'message', None, None, ), # 2
)
def __init__(self, category=None, message=None,):
self.category = category
self.message = message
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
return
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.STRING:
self.category = iprot.readString();
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.STRING:
self.message = iprot.readString();
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('LogEntry')
if self.category is not None:
oprot.writeFieldBegin('category', TType.STRING, 1)
oprot.writeString(self.category)
oprot.writeFieldEnd()
if self.message is not None:
oprot.writeFieldBegin('message', TType.STRING, 2)
oprot.writeString(self.message)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
return
def __repr__(self):
L = ['%s=%r' % (key, value)
for key, value in self.__dict__.iteritems()]
return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
def __eq__(self, other):
return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
def __ne__(self, other):
return not (self == other)

View File

@ -1 +0,0 @@
__all__ = ['ttypes', 'constants']

View File

@ -1,15 +0,0 @@
#
# Autogenerated by Thrift Compiler (0.9.0)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
# options string: py:new_style=true
#
from thrift.Thrift import TType, TMessageType, TException, TApplicationException
from ttypes import *
CLIENT_SEND = "cs"
CLIENT_RECV = "cr"
SERVER_SEND = "ss"
SERVER_RECV = "sr"

View File

@ -1,477 +0,0 @@
#
# Autogenerated by Thrift Compiler (0.9.0)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
# options string: py:new_style=true
#
from thrift.Thrift import TType, TMessageType, TException, TApplicationException
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol, TProtocol
try:
from thrift.protocol import fastbinary
except:
fastbinary = None
class AnnotationType(object):
BOOL = 0
BYTES = 1
I16 = 2
I32 = 3
I64 = 4
DOUBLE = 5
STRING = 6
_VALUES_TO_NAMES = {
0: "BOOL",
1: "BYTES",
2: "I16",
3: "I32",
4: "I64",
5: "DOUBLE",
6: "STRING",
}
_NAMES_TO_VALUES = {
"BOOL": 0,
"BYTES": 1,
"I16": 2,
"I32": 3,
"I64": 4,
"DOUBLE": 5,
"STRING": 6,
}
class Endpoint(object):
"""
Attributes:
- ipv4
- port
- service_name
"""
thrift_spec = (
None, # 0
(1, TType.I32, 'ipv4', None, None, ), # 1
(2, TType.I16, 'port', None, None, ), # 2
(3, TType.STRING, 'service_name', None, None, ), # 3
)
def __init__(self, ipv4=None, port=None, service_name=None,):
self.ipv4 = ipv4
self.port = port
self.service_name = service_name
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
return
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.I32:
self.ipv4 = iprot.readI32();
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.I16:
self.port = iprot.readI16();
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.STRING:
self.service_name = iprot.readString();
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('Endpoint')
if self.ipv4 is not None:
oprot.writeFieldBegin('ipv4', TType.I32, 1)
oprot.writeI32(self.ipv4)
oprot.writeFieldEnd()
if self.port is not None:
oprot.writeFieldBegin('port', TType.I16, 2)
oprot.writeI16(self.port)
oprot.writeFieldEnd()
if self.service_name is not None:
oprot.writeFieldBegin('service_name', TType.STRING, 3)
oprot.writeString(self.service_name)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
return
def __repr__(self):
L = ['%s=%r' % (key, value)
for key, value in self.__dict__.iteritems()]
return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
def __eq__(self, other):
return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
def __ne__(self, other):
return not (self == other)
class Annotation(object):
"""
Attributes:
- timestamp
- value
- host
- duration
"""
thrift_spec = (
None, # 0
(1, TType.I64, 'timestamp', None, None, ), # 1
(2, TType.STRING, 'value', None, None, ), # 2
(3, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 3
(4, TType.I32, 'duration', None, None, ), # 4
)
def __init__(self, timestamp=None, value=None, host=None, duration=None,):
self.timestamp = timestamp
self.value = value
self.host = host
self.duration = duration
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
return
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.I64:
self.timestamp = iprot.readI64();
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.STRING:
self.value = iprot.readString();
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.STRUCT:
self.host = Endpoint()
self.host.read(iprot)
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.I32:
self.duration = iprot.readI32();
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('Annotation')
if self.timestamp is not None:
oprot.writeFieldBegin('timestamp', TType.I64, 1)
oprot.writeI64(self.timestamp)
oprot.writeFieldEnd()
if self.value is not None:
oprot.writeFieldBegin('value', TType.STRING, 2)
oprot.writeString(self.value)
oprot.writeFieldEnd()
if self.host is not None:
oprot.writeFieldBegin('host', TType.STRUCT, 3)
self.host.write(oprot)
oprot.writeFieldEnd()
if self.duration is not None:
oprot.writeFieldBegin('duration', TType.I32, 4)
oprot.writeI32(self.duration)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
return
def __repr__(self):
L = ['%s=%r' % (key, value)
for key, value in self.__dict__.iteritems()]
return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
def __eq__(self, other):
return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
def __ne__(self, other):
return not (self == other)
class BinaryAnnotation(object):
"""
Attributes:
- key
- value
- annotation_type
- host
"""
thrift_spec = (
None, # 0
(1, TType.STRING, 'key', None, None, ), # 1
(2, TType.STRING, 'value', None, None, ), # 2
(3, TType.I32, 'annotation_type', None, None, ), # 3
(4, TType.STRUCT, 'host', (Endpoint, Endpoint.thrift_spec), None, ), # 4
)
def __init__(self, key=None, value=None, annotation_type=None, host=None,):
self.key = key
self.value = value
self.annotation_type = annotation_type
self.host = host
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
return
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.STRING:
self.key = iprot.readString();
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.STRING:
self.value = iprot.readString();
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.I32:
self.annotation_type = iprot.readI32();
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.STRUCT:
self.host = Endpoint()
self.host.read(iprot)
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('BinaryAnnotation')
if self.key is not None:
oprot.writeFieldBegin('key', TType.STRING, 1)
oprot.writeString(self.key)
oprot.writeFieldEnd()
if self.value is not None:
oprot.writeFieldBegin('value', TType.STRING, 2)
oprot.writeString(self.value)
oprot.writeFieldEnd()
if self.annotation_type is not None:
oprot.writeFieldBegin('annotation_type', TType.I32, 3)
oprot.writeI32(self.annotation_type)
oprot.writeFieldEnd()
if self.host is not None:
oprot.writeFieldBegin('host', TType.STRUCT, 4)
self.host.write(oprot)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
return
def __repr__(self):
L = ['%s=%r' % (key, value)
for key, value in self.__dict__.iteritems()]
return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
def __eq__(self, other):
return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
def __ne__(self, other):
return not (self == other)
class Span(object):
"""
Attributes:
- trace_id
- name
- id
- parent_id
- annotations
- binary_annotations
- debug
"""
thrift_spec = (
None, # 0
(1, TType.I64, 'trace_id', None, None, ), # 1
None, # 2
(3, TType.STRING, 'name', None, None, ), # 3
(4, TType.I64, 'id', None, None, ), # 4
(5, TType.I64, 'parent_id', None, None, ), # 5
(6, TType.LIST, 'annotations', (TType.STRUCT,(Annotation, Annotation.thrift_spec)), None, ), # 6
None, # 7
(8, TType.LIST, 'binary_annotations', (TType.STRUCT,(BinaryAnnotation, BinaryAnnotation.thrift_spec)), None, ), # 8
(9, TType.BOOL, 'debug', None, False, ), # 9
)
def __init__(self, trace_id=None, name=None, id=None, parent_id=None, annotations=None, binary_annotations=None, debug=thrift_spec[9][4],):
self.trace_id = trace_id
self.name = name
self.id = id
self.parent_id = parent_id
self.annotations = annotations
self.binary_annotations = binary_annotations
self.debug = debug
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
return
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.I64:
self.trace_id = iprot.readI64();
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.STRING:
self.name = iprot.readString();
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.I64:
self.id = iprot.readI64();
else:
iprot.skip(ftype)
elif fid == 5:
if ftype == TType.I64:
self.parent_id = iprot.readI64();
else:
iprot.skip(ftype)
elif fid == 6:
if ftype == TType.LIST:
self.annotations = []
(_etype3, _size0) = iprot.readListBegin()
for _i4 in xrange(_size0):
_elem5 = Annotation()
_elem5.read(iprot)
self.annotations.append(_elem5)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 8:
if ftype == TType.LIST:
self.binary_annotations = []
(_etype9, _size6) = iprot.readListBegin()
for _i10 in xrange(_size6):
_elem11 = BinaryAnnotation()
_elem11.read(iprot)
self.binary_annotations.append(_elem11)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 9:
if ftype == TType.BOOL:
self.debug = iprot.readBool();
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('Span')
if self.trace_id is not None:
oprot.writeFieldBegin('trace_id', TType.I64, 1)
oprot.writeI64(self.trace_id)
oprot.writeFieldEnd()
if self.name is not None:
oprot.writeFieldBegin('name', TType.STRING, 3)
oprot.writeString(self.name)
oprot.writeFieldEnd()
if self.id is not None:
oprot.writeFieldBegin('id', TType.I64, 4)
oprot.writeI64(self.id)
oprot.writeFieldEnd()
if self.parent_id is not None:
oprot.writeFieldBegin('parent_id', TType.I64, 5)
oprot.writeI64(self.parent_id)
oprot.writeFieldEnd()
if self.annotations is not None:
oprot.writeFieldBegin('annotations', TType.LIST, 6)
oprot.writeListBegin(TType.STRUCT, len(self.annotations))
for iter12 in self.annotations:
iter12.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.binary_annotations is not None:
oprot.writeFieldBegin('binary_annotations', TType.LIST, 8)
oprot.writeListBegin(TType.STRUCT, len(self.binary_annotations))
for iter13 in self.binary_annotations:
iter13.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.debug is not None:
oprot.writeFieldBegin('debug', TType.BOOL, 9)
oprot.writeBool(self.debug)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
return
def __repr__(self):
L = ['%s=%r' % (key, value)
for key, value in self.__dict__.iteritems()]
return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
def __eq__(self, other):
return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
def __ne__(self, other):
return not (self == other)

View File

@ -1,118 +0,0 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
import eventlet
socket = eventlet.import_patched('socket')
time = eventlet.import_patched('time')
scribe = eventlet.import_patched('tomograph.backends.zipkin.'
'generated.scribe.scribe')
TTransport = eventlet.import_patched('thrift.transport.TTransport')
TSocket = eventlet.import_patched('thrift.transport.TSocket')
collections = eventlet.import_patched('collections')
traceback = eventlet.import_patched('traceback')
threading = eventlet.import_patched('threading')
from thrift.protocol import TBinaryProtocol
class ScribeSender(object):
def __init__(self, host='127.0.0.1', port=1463, debug=False,
target_write_size=1000, max_write_interval=1.0,
socket_timeout=5.0, max_queue_length=50000, must_yield=True):
self.dropped = 0
self._remote_host = host
self._remote_port = port
self._socket_timeout = socket_timeout
self._hostname = socket.gethostname()
self._max_queue_length = max_queue_length
self._max_write_interval = max_write_interval
self._target_write_size = target_write_size
self._debug = debug
self._log_buffer = collections.deque()
self._last_write = 0
self._must_yield = must_yield
self._lock = threading.Lock()
eventlet.spawn_after(max_write_interval, self.flush)
def close(self):
self.flush()
def send(self, category, msg):
"""Send one record to scribe."""
log_entry = scribe.LogEntry(category=category, message=msg)
self._log_buffer.append(log_entry)
self._dropMsgs()
now = time.time()
if len(self._log_buffer) >= self._target_write_size or \
now - self._last_write > self._max_write_interval:
self._last_write = now
eventlet.spawn_n(self.flush)
# We can't do a "looping call" or other permanent thread
# kind of thing because openstack creates new hubs. The
# spawn_after here (and in __init__) give us a simple way
# to ensure stuff doesn't sit in the buffer forever just
# because the app isn't logging stuff.
eventlet.spawn_after(self._max_write_interval, self.flush)
if self._must_yield:
# prevent the flushers from starving
eventlet.sleep()
def flush(self):
dropped = 0
try:
self._lock.acquire()
buf = []
while self._log_buffer:
buf.append(self._log_buffer.popleft())
if buf:
if self._debug:
print("ScribeSender: flushing {0} msgs".format(len(buf)))
try:
client = self._getClient()
result = client.Log(messages=buf)
if result == scribe.ResultCode.TRY_LATER:
dropped += len(buf)
except Exception:
if self._debug:
print("ScribeSender: caught exception writing "
"log message:")
traceback.print_exc()
dropped += len(buf)
finally:
self._lock.release()
self.dropped += dropped
if self._debug and dropped:
print("ScribeSender: dropped {0} messages due to a "
"communication problem.".format(dropped))
def _dropMsgs(self):
dropped = 0
while len(self._log_buffer) > self._max_queue_length:
self._log_buffer.popleft()
dropped += 1
self.dropped += dropped
if self._debug and dropped:
print("ScribeSender: dropped {0} messages due to the queue "
"length limit.".format(dropped))
def _getClient(self):
# We can't just keep a connection because the app might fork
# and we'll be left with the parent process's connection.
# More robust to just open one for each flush.
sock = TSocket.TSocket(host=self._remote_host, port=self._remote_port)
sock.setTimeout(self._socket_timeout * 1000)
transport = TTransport.TFramedTransport(sock)
transport.open()
protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
return scribe.Client(protocol)
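The batching policy in `send()` above (flush once the buffer reaches `target_write_size`, or once `max_write_interval` has elapsed since the last write) can be sketched in isolation. The names and values below are illustrative, not part of the original sender:

```python
import collections

# Illustrative sketch of the ScribeSender flush decision: flush when the
# buffer is large enough, or when too much time has passed since the last
# successful write.
TARGET_WRITE_SIZE = 3
MAX_WRITE_INTERVAL = 1.0

buf = collections.deque()
last_write = 100.0  # pretend timestamp of the previous flush


def should_flush(now):
    return (len(buf) >= TARGET_WRITE_SIZE
            or now - last_write > MAX_WRITE_INTERVAL)


buf.extend(['msg1', 'msg2'])
assert not should_flush(100.5)   # small buffer, interval not yet exceeded
buf.append('msg3')
assert should_flush(100.5)       # size threshold reached
buf.clear()
assert should_flush(101.5)       # interval exceeded, even with a small buffer
```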

View File

@ -1,114 +0,0 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
from __future__ import print_function
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket
from thrift.transport import TTransport
from tomograph.backends.zipkin.generated.scribe import scribe
from tomograph.backends.zipkin import sender
from tomograph.backends.zipkin import zipkin_thrift
from tomograph import cache
from tomograph import config
import StringIO
import atexit
import base64
import random
import socket
import struct
import sys
import time
import traceback
scribe_config = {
'host': config.zipkin_host,
'port': config.zipkin_port,
'socket_timeout': config.zipkin_socket_timeout,
'target_write_size': config.zipkin_target_write_size,
'max_queue_length': config.zipkin_max_queue_length,
'must_yield': config.zipkin_must_yield,
'max_write_interval': config.zipkin_max_write_interval,
'debug': config.zipkin_debug_scribe_sender,
}
scribe_sender = sender.ScribeSender(**scribe_config)
atexit.register(scribe_sender.close)
hostname_cache = cache.Cache(socket.gethostbyname)
def send(span):
def endpoint(note):
try:
ip = hostname_cache.get(note.address)
except Exception:
print('host resolution error: %s' % traceback.format_exc(),
file=sys.stderr)
ip = '0.0.0.0'
return zipkin_thrift.Endpoint(ipv4=ip_to_i32(ip),
port=port_to_i16(note.port),
service_name=note.service_name)
def annotation(note):
return zipkin_thrift.Annotation(timestamp=int(note.time * 1e6),
value=note.value,
duration=note.duration,
host=endpoint(note))
def binary_annotation(dimension):
if isinstance(dimension.value, str):
tag_type = zipkin_thrift.AnnotationType.STRING
val = dimension.value
elif isinstance(dimension.value, float):
tag_type = zipkin_thrift.AnnotationType.DOUBLE
val = struct.pack('>d', dimension.value)
elif isinstance(dimension.value, int):
tag_type = zipkin_thrift.AnnotationType.I64
val = struct.pack('>q', dimension.value)
else:
raise RuntimeError("unsupported tag type")
return zipkin_thrift.BinaryAnnotation(key=dimension.key,
value=val,
annotation_type=tag_type,
host=endpoint(dimension))
binary_annotations = [binary_annotation(d) for d in span.dimensions]
zspan = zipkin_thrift.Span(trace_id=span.trace_id,
id=span.id,
name=span.name,
parent_id=span.parent_id,
annotations=[annotation(n) for n in span.notes],
binary_annotations=binary_annotations)
out = StringIO.StringIO()
#raw = TBinaryProtocol.TBinaryProtocolAccelerated(out)
raw = TBinaryProtocol.TBinaryProtocol(out)
try:
zspan.write(raw)
except OverflowError:
traceback.print_exc()
scribe_sender.send('zipkin', base64.b64encode(out.getvalue()))
def ip_to_i32(ip_str):
"""convert an ip address from a string to a signed 32-bit number"""
return struct.unpack('!i', socket.inet_aton(ip_str))[0]
def port_to_i16(port_num):
"""Convert a port number to a signed 16-bit int."""
if port_num >= 2 ** 15:
port_num -= 2 ** 16
return port_num
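The two helpers above round-trip through Python's stdlib `socket` and `struct` modules; a standalone sketch mirroring them:

```python
import socket
import struct


def ip_to_i32(ip_str):
    """Convert a dotted-quad IP string to a signed 32-bit number."""
    return struct.unpack('!i', socket.inet_aton(ip_str))[0]


def port_to_i16(port_num):
    """Convert a port number to a signed 16-bit int (two's-complement wrap)."""
    if port_num >= 2 ** 15:
        port_num -= 2 ** 16
    return port_num


assert ip_to_i32('127.0.0.1') == 0x7f000001
assert ip_to_i32('255.255.255.255') == -1    # high bit set wraps negative
assert port_to_i16(1463) == 1463
assert port_to_i16(50000) == -15536          # 50000 - 65536
```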

View File

@ -1,12 +0,0 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
from generated.zipkinCore.constants import *

View File

@ -1,31 +0,0 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
import threading
class Cache(object):
def __init__(self, thunk, size_limit=1000):
self._map = {}
self._thunk = thunk
self._size_limit = size_limit
self._lock = threading.Lock()
def get(self, k):
with self._lock:
if k in self._map:
return self._map[k]
else:
while len(self._map) >= self._size_limit:
self._map.popitem()
v = self._thunk(k)
self._map[k] = v
return v
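The `Cache` above is a lock-protected memo table that evicts arbitrary entries once `size_limit` is reached (it backs the hostname-resolution cache in the zipkin backend). A minimal usage sketch, reproducing the class so the example stands alone:

```python
import threading


class Cache(object):
    """Bounded memoization cache, as in tomograph.cache."""
    def __init__(self, thunk, size_limit=1000):
        self._map = {}
        self._thunk = thunk
        self._size_limit = size_limit
        self._lock = threading.Lock()

    def get(self, k):
        with self._lock:
            if k in self._map:
                return self._map[k]
            # evict arbitrary entries until there is room
            while len(self._map) >= self._size_limit:
                self._map.popitem()
            v = self._thunk(k)
            self._map[k] = v
            return v


calls = []
c = Cache(lambda k: calls.append(k) or k * 2, size_limit=2)
assert c.get(3) == 6
assert c.get(3) == 6     # second lookup is served from the cache
assert calls == [3]      # the thunk ran only once
```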

View File

@ -1,62 +0,0 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
import logging
logger = logging.getLogger(__name__)
enabled_backends = ['tomograph.backends.zipkin',
'tomograph.backends.statsd',
'tomograph.backends.log']
backend_modules = []
zipkin_host = '127.0.0.1'
zipkin_port = 9410
statsd_host = '127.0.0.1'
statsd_port = 8125
zipkin_socket_timeout = 5.0
zipkin_max_queue_length = 50000
zipkin_target_write_size = 1000
zipkin_max_write_interval = 1
zipkin_must_yield = True
zipkin_debug_scribe_sender = False
debug = False
db_tracing_enabled = True
db_trace_as_spans = False
def set_backends(backends):
"""Set the list of enabled backends. Backend name should be the full
module name of the backend. All backends must support a send(span) method.
"""
global enabled_backends
global backend_modules
enabled_backends = backends[:]
backend_modules = []
for backend in enabled_backends:
try:
logger.info('loading backend {0}'.format(backend))
module = __import__(backend)
for submodule in backend.split('.')[1:]:
module = getattr(module, submodule)
backend_modules.append(module)
except (ImportError, AttributeError, ValueError) as err:
raise RuntimeError('Could not load tomograph backend '
'{0}: {1}'.format(backend, err))
def get_backends():
if not backend_modules:
set_backends(enabled_backends)
return backend_modules
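`set_backends` resolves a dotted module path by importing the top-level package and then walking attributes, because `__import__('a.b.c')` returns the package `a`. The same pattern in isolation, demonstrated against a stdlib module:

```python
def load_dotted(path):
    # __import__ on a dotted path returns the top-level package,
    # so walk the remaining components with getattr to reach the leaf.
    module = __import__(path)
    for submodule in path.split('.')[1:]:
        module = getattr(module, submodule)
    return module


decoder = load_dotted('json.decoder')
assert decoder.__name__ == 'json.decoder'
assert hasattr(decoder, 'JSONDecoder')
```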

View File

@ -1,232 +0,0 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
from __future__ import absolute_import
import base64
import logging
import pickle
import random
import socket
import sys
import time
from eventlet import corolocal
from tomograph import config
from tomograph import types
import webob.dec
span_stack = corolocal.local()
def start(service_name, name, address, port, trace_info=None):
parent_id = None
if tracing_started():
trace_id = span_stack.trace_id
parent_id = cur_span().id
else:
if trace_info is None:
trace_id = span_stack.trace_id = getId()
else:
trace_id = span_stack.trace_id = trace_info[0]
parent_id = trace_info[1]
span_stack.spans = []
span = types.Span(trace_id, parent_id, getId(), name, [], [])
span_stack.spans.append(span)
annotate('start', service_name, address, port)
def tracing_started():
return hasattr(span_stack, 'trace_id')
def cur_span():
if not tracing_started():
start('orphan', 'orphan', '127.0.0.1', '1')
return span_stack.spans[-1]
def get_trace_info():
if tracing_started():
return (span_stack.trace_id, cur_span().id)
else:
return None
def stop(name):
annotate('stop')
span = span_stack.spans.pop()
assert span.name == name, ('start span name {0} not equal '
'to end span name {1}'.format(span.name, name))
if not span_stack.spans:
del(span_stack.trace_id)
for backend in config.get_backends():
backend.send(span)
def annotate(value, service_name=None, address=None, port=None, duration=None):
"""Add an annotation at a particular point in time (with an optional
duration).
"""
# attempt to default some values
if service_name is None:
service_name = cur_span().notes[0].service_name
if address is None:
address = cur_span().notes[0].address
if port is None:
port = cur_span().notes[0].port
if duration is None:
duration = 0
note = types.Note(time.time(), str(value), service_name, address,
int(port), int(duration))
cur_span().notes.append(note)
def tag(key, value, service_name=None, address=None, port=None):
"""Add a key/value tag to the current span. Values can be int,
float, or string.
"""
assert (isinstance(value, str) or isinstance(value, int)
or isinstance(value, float))
if service_name is None:
service_name = cur_span().notes[0].service_name
if address is None:
address = cur_span().notes[0].address
if port is None:
port = cur_span().notes[0].port
tag = types.Tag(str(key), value, service_name, address, port)
cur_span().dimensions.append(tag)
def getId():
return random.randrange(sys.maxint >> 10)
## wrapper/decorators
def tracewrap(func, service_name, name, host='0.0.0.0', port=0):
if host == '0.0.0.0':
host = socket.gethostname()
def trace_and_call(*args, **kwargs):
if service_name is None and len(args) > 0 \
and isinstance(args[0], object):
s = args[0].__class__.__name__
else:
s = service_name
start(s, name, host, port)
ret = func(*args, **kwargs)
stop(name)
return ret
return trace_and_call
def traced(service_name, name, host='0.0.0.0', port=0):
def t1(func):
return tracewrap(func, service_name, name, host, port)
return t1
## sqlalchemy event listeners
def before_execute(name):
def handler(conn, clauseelement, multiparams, params):
if not config.db_tracing_enabled:
return
h = str(conn.connection.connection)
a = h.find("'")
b = h.find("'", a + 1)
if b > a:
h = h[a + 1:b]
else:
h = 'unknown'
port = conn.connection.connection.port
#print >>sys.stderr, 'connection is {0}:{1}'.format(h, port)
#print >>sys.stderr, 'sql statement is {0}'.format(clauseelement)
if config.db_trace_as_spans:
start(str(name) + 'db client', 'execute', h, port)
annotate(clauseelement)
return handler
def after_execute(name):
# name isn't used, at least not yet...
def handler(conn, clauseelement, multiparams, params, result):
if not config.db_tracing_enabled:
return
annotate(clauseelement)
# fix up the duration on the annotation for the sql query
start_time = cur_span().notes[0].time
last_note = cur_span().notes.pop()
cur_span().notes.append(types.Note(last_note.time, last_note.value,
last_note.service_name,
last_note.address,
last_note.port,
time.time() - start_time))
if config.db_trace_as_spans:
stop('execute')
return handler
def dbapi_error(name):
    # name isn't used here either; kept for symmetry with the other hooks
    def handler(conn, cursor, statement, parameters, context, exception):
        if not config.db_tracing_enabled:
            return
        annotate('database exception {0}'.format(exception))
        # only close the span if before_execute actually opened one
        if config.db_trace_as_spans:
            stop('execute')
    return handler
## http helpers
def start_http(service_name, name, request):
    trace_info_enc = request.headers.get('X-Trace-Info')
    # request.host may or may not carry an explicit port; partition
    # yields an empty port rather than raising when it is absent
    host, _, port = request.host.partition(':')
    if trace_info_enc:
        trace_info = pickle.loads(base64.b64decode(trace_info_enc))
    else:
        trace_info = None
    start(service_name, name, host, port, trace_info)
def add_trace_info_header(headers):
trace_info = get_trace_info()
if trace_info:
headers['X-Trace-Info'] = base64.b64encode(pickle.dumps(trace_info))
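Together, `start_http` and `add_trace_info_header` pass trace context between services by pickling it and base64-encoding it into an `X-Trace-Info` header. The round trip reduces to plain stdlib calls (the sample trace tuple is illustrative, not tomograph's actual trace-info shape):

```python
import base64
import pickle

def encode_trace_info(trace_info):
    # serialize, then armor the bytes for use as an HTTP header value
    return base64.b64encode(pickle.dumps(trace_info))

def decode_trace_info(header_value):
    return pickle.loads(base64.b64decode(header_value))

trace_info = (1234, 5678)  # e.g. (trace_id, parent_span_id), illustrative
headers = {'X-Trace-Info': encode_trace_info(trace_info)}
assert decode_trace_info(headers['X-Trace-Info']) == trace_info
```

Note that unpickling data received over the network is only safe when the peer is trusted, which these services assume of each other.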
## WSGI middleware
class Middleware(object):
"""WSGI Middleware that enables tomograph tracing for an application."""
def __init__(self, application, service_name='Server', name='WSGI'):
self.application = application
self.service_name = service_name
self.name = name
@classmethod
def factory(cls, global_conf, **local_conf):
def filter(app):
return cls(app, **local_conf)
return filter
@webob.dec.wsgify
def __call__(self, req):
start_http(self.service_name, self.name, req)
response = req.get_response(self.application)
stop(self.name)
return response
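The class above leans on `webob` for request/response plumbing; the same wrap-the-app shape in plain WSGI terms, with stub `start_http`/`stop` functions standing in for tomograph's (stubs and the sample app are illustrative):

```python
calls = []

def start_http(service_name, name, environ):
    calls.append(('start', service_name, name))

def stop(name):
    calls.append(('stop', name))

class Middleware(object):
    """Plain-WSGI sketch: open a span before the app runs, close it after."""

    def __init__(self, application, service_name='Server', name='WSGI'):
        self.application = application
        self.service_name = service_name
        self.name = name

    def __call__(self, environ, start_response):
        start_http(self.service_name, self.name, environ)
        try:
            return self.application(environ, start_response)
        finally:
            stop(self.name)

def app(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'hello']

wrapped = Middleware(app)
body = wrapped({}, lambda status, headers: None)
# body is [b'hello']; calls records the start/stop pair around the app
```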


@ -1,18 +0,0 @@
# Copyright (c) 2012 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
import collections
Span = collections.namedtuple('Span', 'trace_id parent_id id name notes'
' dimensions')
Note = collections.namedtuple('Note', 'time value service_name'
' address port duration')
Tag = collections.namedtuple('Tag', 'key value service_name address port')
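These namedtuples are plain immutable records: `Span` carries identity and nesting, `Note` is a timestamped annotation, and `Tag` a key/value dimension. A quick sketch of constructing and updating them (the field values are made up; the shapes match the definitions above):

```python
import collections

Span = collections.namedtuple('Span', 'trace_id parent_id id name notes'
                              ' dimensions')
Note = collections.namedtuple('Note', 'time value service_name'
                              ' address port duration')

note = Note(time=1.0, value='execute', service_name='db client',
            address='db1', port=3306, duration=None)
span = Span(trace_id=1, parent_id=None, id=2, name='query',
            notes=[note], dimensions=[])

# namedtuples are immutable; _replace returns an updated copy, which is
# one way the duration fix-up in after_execute could be expressed
finished = note._replace(duration=0.25)
```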


@ -1,25 +0,0 @@
# Copyright (c) 2013 Yahoo! Inc. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0 Unless required by
# applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and
# limitations under the License. See accompanying LICENSE file.
TOMOGRAPH_VERSION = ['2013', '1', None]
YEAR, COUNT, REVISION = TOMOGRAPH_VERSION
FINAL = False # May never be final ;)
def canonical_version_string():
return '.'.join(filter(None, TOMOGRAPH_VERSION))
def version_string():
if FINAL:
return canonical_version_string()
else:
return '%s-dev' % (canonical_version_string())
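`canonical_version_string` relies on `filter(None, ...)` dropping the unset `REVISION`, so `['2013', '1', None]` joins to `'2013.1'`. The same logic in isolation:

```python
TOMOGRAPH_VERSION = ['2013', '1', None]
FINAL = False

def canonical_version_string():
    # filter(None, ...) drops falsy entries, so the None revision vanishes
    return '.'.join(filter(None, TOMOGRAPH_VERSION))

def version_string():
    if FINAL:
        return canonical_version_string()
    return '%s-dev' % canonical_version_string()

print(version_string())  # 2013.1-dev
```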

tox.ini

@ -1,50 +0,0 @@
[tox]
minversion = 1.6
skipsdist = True
envlist = py26,py27,py33,pep8
[testenv]
usedevelop = True
install_command = pip install {opts} {packages}
setenv = VIRTUAL_ENV={envdir}
LANG=en_US.UTF-8
LANGUAGE=en_US:en
LC_ALL=C
NOSE_WITH_OPENSTACK=1
NOSE_OPENSTACK_COLOR=1
NOSE_OPENSTACK_RED=0.05
NOSE_OPENSTACK_YELLOW=0.025
NOSE_OPENSTACK_SHOW_ELAPSED=1
NOSE_OPENSTACK_STDOUT=1
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = nosetests {posargs}
[testenv:py33]
deps = -r{toxinidir}/py33-requirements.txt
-r{toxinidir}/py33-test-requirements.txt
commands = true
[tox:jenkins]
downloadcache = ~/cache/pip
[testenv:pep8]
commands =
flake8 {posargs}
[testenv:pylint]
setenv = VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/requirements.txt
pylint==0.26.0
commands = pylint
[testenv:cover]
setenv = NOSE_WITH_COVERAGE=1
[testenv:venv]
commands = {posargs}
[flake8]
ignore = H202,H402,F401,F403,H303
builtins = _
exclude = .venv,.tox,dist,doc,*egg,.git,build,tools,./tomograph/backends/zipkin/generated