Refactored statsd code to be modular
The monasca-statsd package was in one monolithic class in a single file. Made it more modular and broke it into classes for the different metrics types, connection and added a Client class for easy management. Change-Id: If4f204f656cc8f8603b5e96e0e734983b3aadcd6
This commit is contained in:
parent
79e3609be8
commit
f9eba6c7cf
62
README.md
62
README.md
|
@ -1,7 +1,5 @@
|
|||
import monascastatsd.monasca_statsdmonasca-statsd
|
||||
================
|
||||
|
||||
A Monasca-Statsd Python client.
|
||||
================
|
||||
|
||||
Quick Start Guide
|
||||
-----------------
|
||||
|
@ -18,35 +16,60 @@ Then start instrumenting your code:
|
|||
|
||||
```
|
||||
# Import the module.
|
||||
import monascastatsd.monasca_statsd
|
||||
import monascastatsd as mstatsd
|
||||
|
||||
mstatsd = monascastatsd.monasca_statsd.MonascaStatsd()
|
||||
# Create the connection
|
||||
conn = mstatsd.Connection(host='localhost', port=8125)
|
||||
|
||||
# Optionally, configure the host and port if you're running Statsd on a
|
||||
# non-standard port.
|
||||
mstatsd.connect('localhost', 8125)
|
||||
# Create the client with optional dimensions
|
||||
client = mstatsd.Client(connection=conn, dimensions={'env': 'test'})
|
||||
|
||||
# Increment a counter.
|
||||
mstatsd.increment('page.views')
|
||||
NOTE: You can also create a client without specifying the connection and it will create the client with the default connection information for the monasca-agent statsd processor daemon which uses host='localhost' and port=8125.
|
||||
|
||||
client = mstatsd.Client(dimensions={'env': 'test'})
|
||||
|
||||
# Increment and decrement a counter.
|
||||
counter = client.get_counter(name='page.views')
|
||||
|
||||
counter.increment()
|
||||
counter += 3
|
||||
|
||||
counter.decrement()
|
||||
counter -= 3
|
||||
|
||||
# Record a gauge 50% of the time.
|
||||
mstatsd.gauge('users.online', 123, sample_rate=0.5)
|
||||
gauge = client.get_gauge('gauge', dimensions={'env': 'test'})
|
||||
|
||||
gauge.send('metric', 123.4, sample_rate=0.5)
|
||||
|
||||
# Sample a histogram.
|
||||
mstatsd.histogram('file.upload.size', 1234)
|
||||
histogram = client.get_histogram('histogram', dimensions={'test': 'True'})
|
||||
|
||||
histogram.send('metric', 123.4, dimensions={'color': 'red'})
|
||||
|
||||
# Time a function call.
|
||||
@mstatsd.timed('page.render')
|
||||
timer = client.get_timer()
|
||||
|
||||
@timer.timed('page.render')
|
||||
def render_page():
|
||||
# Render things ...
|
||||
pass
|
||||
|
||||
# Add a dimension to a metric.
|
||||
mstatsd.histogram('query.time', 10, dimensions = {'version': '1.0', 'environment': 'dev'})
|
||||
# Time a block of code.
|
||||
timer = client.get_timer()
|
||||
|
||||
with timer.time('t'):
|
||||
# Do stuff
|
||||
time.sleep(2)
|
||||
|
||||
# Add dimensions to any metric.
|
||||
histogram = client.get_histogram('my_hist')
|
||||
histogram.send('query.time', 10, dimensions = {'version': '1.0', 'environment': 'dev'})
|
||||
```
|
||||
Documentation
|
||||
Repository
|
||||
-------------
|
||||
|
||||
Read the full API docs
|
||||
The monasca-statsd code is located here:
|
||||
[here](https://github.com/stackforge/monasca-statsd).
|
||||
|
||||
Feedback
|
||||
|
@ -55,11 +78,6 @@ Feedback
|
|||
To suggest a feature, report a bug, or general discussion, head over
|
||||
[here](https://bugs.launchpad.net/monasca).
|
||||
|
||||
Change Log
|
||||
----------
|
||||
- 1.0.0
|
||||
- Initial version of the code
|
||||
|
||||
|
||||
License
|
||||
-------
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
from client import Client
|
||||
from connection import Connection
|
||||
from counter import Counter
|
||||
from gauge import Gauge
|
||||
from histogram import Histogram
|
||||
from metricbase import MetricBase
|
||||
from set import Set
|
||||
from timer import Timer
|
|
@ -0,0 +1,124 @@
|
|||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Monasca-Statsd is a Python client for Statsd that adds dimensions.
|
||||
"""
|
||||
from monascastatsd.connection import Connection
|
||||
from monascastatsd.counter import Counter
|
||||
from monascastatsd.gauge import Gauge
|
||||
from monascastatsd.histogram import Histogram
|
||||
from monascastatsd.set import Set
|
||||
from monascastatsd.timer import Timer
|
||||
|
||||
|
||||
class Client(object):
|
||||
|
||||
def __init__(self, name=None, connection=None, max_buffer_size=50, dimensions=None):
|
||||
"""Initialize a Client object.
|
||||
|
||||
>>> monascastatsd = MonascaStatsd()
|
||||
|
||||
:name: the name for this client. Everything sent by this client
|
||||
will be prefixed by name
|
||||
:param host: the host of the MonascaStatsd server.
|
||||
:param port: the port of the MonascaStatsd server.
|
||||
:param max_buffer_size: Maximum number of metric to buffer before
|
||||
sending to the server if sending metrics in batch
|
||||
"""
|
||||
if connection is None:
|
||||
self.connection = Connection(host='localhost',
|
||||
port=8125,
|
||||
max_buffer_size=50)
|
||||
else:
|
||||
self.connection = connection
|
||||
self._dimensions = dimensions
|
||||
self._name = name
|
||||
|
||||
def get_counter(self, name, connection=None, dimensions=None):
|
||||
"""Gets a Counter object.
|
||||
|
||||
"""
|
||||
if connection is None:
|
||||
connection = self.connection
|
||||
return Counter(name=self._update_name(name),
|
||||
connection=connection,
|
||||
dimensions=self._update_dimensions(dimensions))
|
||||
|
||||
def get_gauge(self, name=None, connection=None, dimensions=None):
|
||||
"""Gets a Gauge object.
|
||||
|
||||
"""
|
||||
if connection is None:
|
||||
connection = self.connection
|
||||
return Gauge(name=self._update_name(name),
|
||||
connection=connection,
|
||||
dimensions=self._update_dimensions(dimensions))
|
||||
|
||||
def get_histogram(self, name=None, connection=None, dimensions=None):
|
||||
"""Gets a Histogram object.
|
||||
|
||||
"""
|
||||
if connection is None:
|
||||
connection = self.connection
|
||||
return Histogram(name=self._update_name(name),
|
||||
connection=connection,
|
||||
dimensions=self._update_dimensions(dimensions))
|
||||
|
||||
def get_set(self, name=None, connection=None, dimensions=None):
|
||||
"""Gets a Set object.
|
||||
|
||||
"""
|
||||
if connection is None:
|
||||
connection = self.connection
|
||||
return Set(name=self._update_name(name),
|
||||
connection=connection,
|
||||
dimensions=self._update_dimensions(dimensions))
|
||||
|
||||
def get_timer(self, name=None, connection=None, dimensions=None):
|
||||
"""Gets a Timer object.
|
||||
|
||||
"""
|
||||
if connection is None:
|
||||
connection = self.connection
|
||||
return Timer(name=self._update_name(name),
|
||||
connection=connection,
|
||||
dimensions=self._update_dimensions(dimensions))
|
||||
|
||||
def _update_name(self, name):
|
||||
"""Update the metric name with the client
|
||||
|
||||
name that was passed in on instantiation.
|
||||
"""
|
||||
if self._name:
|
||||
metric = self._name
|
||||
if name:
|
||||
metric = metric + "." + name
|
||||
else:
|
||||
metric = name
|
||||
return metric
|
||||
|
||||
def _update_dimensions(self, dimensions):
|
||||
"""Update the dimensions list with the default
|
||||
|
||||
dimensions that were passed in on instantiation.
|
||||
"""
|
||||
if self._dimensions:
|
||||
new_dimensions = self._dimensions.copy()
|
||||
else:
|
||||
new_dimensions = {}
|
||||
if dimensions:
|
||||
new_dimensions.update(dimensions)
|
||||
|
||||
return new_dimensions
|
|
@ -0,0 +1,97 @@
|
|||
import logging
|
||||
import random
|
||||
import socket
|
||||
|
||||
try:
|
||||
import itertools
|
||||
except ImportError:
|
||||
imap = map
|
||||
|
||||
logging.basicConfig()
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Connection(object):
|
||||
|
||||
def __init__(self, host='localhost', port=8125, max_buffer_size=50):
|
||||
"""Initialize a Connection object.
|
||||
|
||||
>>> monascastatsd = MonascaStatsd()
|
||||
|
||||
:name: the name for this client. Everything sent by this client
|
||||
will be prefixed by name
|
||||
:param host: the host of the MonascaStatsd server.
|
||||
:param port: the port of the MonascaStatsd server.
|
||||
:param max_buffer_size: Maximum number of metric to buffer before
|
||||
sending to the server if sending metrics in batch
|
||||
"""
|
||||
self._host = None
|
||||
self._port = None
|
||||
self.socket = None
|
||||
self.max_buffer_size = max_buffer_size
|
||||
self._send = self._send_to_server
|
||||
self.connect(host, port)
|
||||
self.encoding = 'utf-8'
|
||||
|
||||
def __enter__(self):
|
||||
self.open_buffer(self.max_buffer_size)
|
||||
return self
|
||||
|
||||
def __exit__(self, the_type, value, traceback):
|
||||
self.close_buffer()
|
||||
|
||||
def open_buffer(self, max_buffer_size=50):
|
||||
"""Open a buffer to send a batch of metrics in one packet.
|
||||
|
||||
"""
|
||||
self.max_buffer_size = max_buffer_size
|
||||
self.buffer = []
|
||||
self._send = self._send_to_buffer
|
||||
|
||||
def close_buffer(self):
|
||||
"""Flush the buffer and switch back to single metric packets.
|
||||
|
||||
"""
|
||||
self._send = self._send_to_server
|
||||
self._flush_buffer()
|
||||
|
||||
def connect(self, host, port):
|
||||
"""Connect to the monascastatsd server on the given host and port.
|
||||
|
||||
"""
|
||||
self._host = host
|
||||
self._port = int(port)
|
||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.socket.connect((self._host, self._port))
|
||||
|
||||
def report(self, metric, metric_type, value, dimensions, sample_rate):
|
||||
"""Use this connection to report metrics.
|
||||
|
||||
"""
|
||||
if sample_rate != 1 and random.random() > sample_rate:
|
||||
return
|
||||
|
||||
payload = [metric, ":", value, "|", metric_type]
|
||||
if sample_rate != 1:
|
||||
payload.extend(["|@", sample_rate])
|
||||
if dimensions:
|
||||
payload.extend(["|#"])
|
||||
payload.append(dimensions)
|
||||
|
||||
encoded = "".join(itertools.imap(str, payload))
|
||||
self._send(encoded)
|
||||
|
||||
def _send_to_server(self, packet):
|
||||
try:
|
||||
self.socket.send(packet.encode(self.encoding))
|
||||
except socket.error:
|
||||
log.exception("Error submitting metric")
|
||||
|
||||
def _send_to_buffer(self, packet):
|
||||
self.buffer.append(packet)
|
||||
if len(self.buffer) >= self.max_buffer_size:
|
||||
self._flush_buffer()
|
||||
|
||||
def _flush_buffer(self):
|
||||
self._send_to_server("\n".join(self.buffer))
|
||||
self.buffer = []
|
|
@ -0,0 +1,55 @@
|
|||
from metricbase import MetricBase
|
||||
|
||||
|
||||
class Counter(MetricBase):
|
||||
|
||||
def __init__(self, name, connection, dimensions=None):
|
||||
super(self.__class__, self).__init__(name=name,
|
||||
connection=connection,
|
||||
dimensions=dimensions)
|
||||
|
||||
def increment(self, value=1, dimensions=None, sample_rate=1):
|
||||
"""Increment a counter, optionally setting a value, dimensions
|
||||
|
||||
and a sample rate.
|
||||
|
||||
>>> monascastatsd.increment()
|
||||
>>> monascastatsd.increment(12)
|
||||
"""
|
||||
self._connection.report(metric=self._name,
|
||||
metric_type='c',
|
||||
value=value,
|
||||
dimensions=self.update_dimensions(dimensions),
|
||||
sample_rate=sample_rate)
|
||||
|
||||
def decrement(self, value=1, dimensions=None, sample_rate=1):
|
||||
"""Decrement a counter, optionally setting a value, dimensions and a
|
||||
|
||||
sample rate.
|
||||
|
||||
>>> monascastatsd.decrement()
|
||||
>>> monascastatsd.decrement(2)
|
||||
"""
|
||||
self._connection.report(metric=self._name,
|
||||
metric_type='c',
|
||||
value=-value,
|
||||
dimensions=self.update_dimensions(dimensions),
|
||||
sample_rate=sample_rate)
|
||||
|
||||
def __add__(self, value):
|
||||
"""Increment the counter with `value`
|
||||
|
||||
:keyword value: The value to add to the counter
|
||||
:type value: int
|
||||
"""
|
||||
self.increment(value=value)
|
||||
return self
|
||||
|
||||
def __sub__(self, value):
|
||||
"""Decrement the counter with `value`
|
||||
|
||||
:keyword value: The value to remove from the counter
|
||||
:type value: int
|
||||
"""
|
||||
self.decrement(value=value)
|
||||
return self
|
|
@ -0,0 +1,24 @@
|
|||
from metricbase import MetricBase
|
||||
|
||||
|
||||
class Gauge(MetricBase):
|
||||
|
||||
def __init__(self, connection, name=None, dimensions=None):
|
||||
super(self.__class__, self).__init__(name=name,
|
||||
connection=connection,
|
||||
dimensions=dimensions)
|
||||
|
||||
def send(self, name, value, dimensions=None, sample_rate=1):
|
||||
"""Record the value of a gauge, optionally setting a list of
|
||||
|
||||
dimensions and a sample rate.
|
||||
|
||||
>>> monascastatsd.gauge('users.online', 123)
|
||||
>>> monascastatsd.gauge('active.connections', 1001,
|
||||
>>> dimensions={"protocol": "http"})
|
||||
"""
|
||||
self._connection.report(metric=self.update_name(name),
|
||||
metric_type='g',
|
||||
value=value,
|
||||
dimensions=self.update_dimensions(dimensions),
|
||||
sample_rate=sample_rate)
|
|
@ -0,0 +1,24 @@
|
|||
from metricbase import MetricBase
|
||||
|
||||
|
||||
class Histogram(MetricBase):
|
||||
|
||||
def __init__(self, connection, name=None, dimensions=None):
|
||||
super(self.__class__, self).__init__(name=name,
|
||||
connection=connection,
|
||||
dimensions=dimensions)
|
||||
|
||||
def send(self, name, value, dimensions=None, sample_rate=1):
|
||||
"""Sample a histogram value, optionally setting dimensions and a
|
||||
|
||||
sample rate.
|
||||
|
||||
>>> monascastatsd.histogram('uploaded.file.size', 1445)
|
||||
>>> monascastatsd.histogram('album.photo.count', 26,
|
||||
>>> dimensions={"gender": "female"})
|
||||
"""
|
||||
self._connection.report(metric=self.update_name(name),
|
||||
metric_type='h',
|
||||
value=value,
|
||||
dimensions=self.update_dimensions(dimensions),
|
||||
sample_rate=sample_rate)
|
|
@ -0,0 +1,36 @@
|
|||
class MetricBase(object):
|
||||
"""Base class for all metric types.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, name, connection, dimensions):
|
||||
self._name = name
|
||||
self._connection = connection
|
||||
self._dimensions = dimensions
|
||||
|
||||
def update_dimensions(self, dimensions):
|
||||
"""Update the dimensions list with the default
|
||||
|
||||
dimensions that were passed in on instantiation.
|
||||
"""
|
||||
if self._dimensions:
|
||||
new_dimensions = self._dimensions.copy()
|
||||
else:
|
||||
new_dimensions = {}
|
||||
if dimensions:
|
||||
new_dimensions.update(dimensions)
|
||||
|
||||
return new_dimensions
|
||||
|
||||
def update_name(self, name):
|
||||
"""Update the metric name with the metric
|
||||
|
||||
base name that was passed in on instantiation.
|
||||
"""
|
||||
if self._name:
|
||||
metric = self._name
|
||||
if name:
|
||||
metric = metric + "." + name
|
||||
else:
|
||||
metric = name
|
||||
return metric
|
|
@ -1,207 +0,0 @@
|
|||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Monasca-Statsd is a Python client for Statsd that adds dimensions.
|
||||
"""
|
||||
|
||||
import functools
|
||||
import logging
|
||||
import random
|
||||
import socket
|
||||
import time
|
||||
|
||||
try:
|
||||
import itertools
|
||||
except ImportError:
|
||||
imap = map
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MonascaStatsd(object):
|
||||
|
||||
def __init__(self, host='localhost', port=8125, max_buffer_size=50):
|
||||
"""Initialize a MonascaStatsd object.
|
||||
|
||||
>>> monascastatsd = MonascaStatsd()
|
||||
|
||||
:param host: the host of the MonascaStatsd server.
|
||||
:param port: the port of the MonascaStatsd server.
|
||||
:param max_buffer_size: Maximum number of metric to buffer before
|
||||
sending to the server if sending metrics in batch
|
||||
"""
|
||||
self._host = None
|
||||
self._port = None
|
||||
self.socket = None
|
||||
self.max_buffer_size = max_buffer_size
|
||||
self._send = self._send_to_server
|
||||
self.connect(host, port)
|
||||
self.encoding = 'utf-8'
|
||||
|
||||
def __enter__(self):
|
||||
self.open_buffer(self.max_buffer_size)
|
||||
return self
|
||||
|
||||
def __exit__(self, the_type, value, traceback):
|
||||
self.close_buffer()
|
||||
|
||||
def open_buffer(self, max_buffer_size=50):
|
||||
'''Open a buffer to send a batch of metrics in one packet
|
||||
|
||||
You can also use this as a context manager.
|
||||
|
||||
>>> with DogStatsd() as batch:
|
||||
>>> batch.gauge('users.online', 123)
|
||||
>>> batch.gauge('active.connections', 1001)
|
||||
|
||||
'''
|
||||
self.max_buffer_size = max_buffer_size
|
||||
self.buffer = []
|
||||
self._send = self._send_to_buffer
|
||||
|
||||
def close_buffer(self):
|
||||
'''Flush the buffer and switch back to single metric packets.'''
|
||||
self._send = self._send_to_server
|
||||
self._flush_buffer()
|
||||
|
||||
def connect(self, host, port):
|
||||
"""Connect to the monascastatsd server on the given host and port."""
|
||||
self._host = host
|
||||
self._port = int(port)
|
||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.socket.connect((self._host, self._port))
|
||||
|
||||
def gauge(self, metric, value, dimensions=None, sample_rate=1):
|
||||
"""Record the value of a gauge, optionally setting a list of
|
||||
|
||||
dimensions and a sample rate.
|
||||
|
||||
>>> monascastatsd.gauge('users.online', 123)
|
||||
>>> monascastatsd.gauge('active.connections', 1001,
|
||||
>>> dimensions={"protocol": "http"})
|
||||
"""
|
||||
return self._report(metric, 'g', value, dimensions, sample_rate)
|
||||
|
||||
def increment(self, metric, value=1, dimensions=None, sample_rate=1):
|
||||
"""Increment a counter, optionally setting a value, dimensions
|
||||
|
||||
and a sample rate.
|
||||
|
||||
>>> monascastatsd.increment('page.views')
|
||||
>>> monascastatsd.increment('files.transferred', 124)
|
||||
"""
|
||||
self._report(metric, 'c', value, dimensions, sample_rate)
|
||||
|
||||
def decrement(self, metric, value=1, dimensions=None, sample_rate=1):
|
||||
"""Decrement a counter, optionally setting a value, dimensions and a
|
||||
|
||||
sample rate.
|
||||
|
||||
>>> monascastatsd.decrement('files.remaining')
|
||||
>>> monascastatsd.decrement('active.connections', 2)
|
||||
"""
|
||||
self._report(metric, 'c', -value, dimensions, sample_rate)
|
||||
|
||||
def histogram(self, metric, value, dimensions=None, sample_rate=1):
|
||||
"""Sample a histogram value, optionally setting dimensions and a
|
||||
|
||||
sample rate.
|
||||
|
||||
>>> monascastatsd.histogram('uploaded.file.size', 1445)
|
||||
>>> monascastatsd.histogram('album.photo.count', 26,
|
||||
>>> dimensions={"gender": "female"})
|
||||
"""
|
||||
self._report(metric, 'h', value, dimensions, sample_rate)
|
||||
|
||||
def timing(self, metric, value, dimensions=None, sample_rate=1):
|
||||
"""Record a timing, optionally setting dimensions and a sample rate.
|
||||
|
||||
>>> monascastatsd.timing("query.response.time", 1234)
|
||||
"""
|
||||
self._report(metric, 'ms', value, dimensions, sample_rate)
|
||||
|
||||
def timed(self, metric, dimensions=None, sample_rate=1):
|
||||
"""A decorator that will measure the distribution of a function's
|
||||
|
||||
run time. Optionally specify a list of tag or a sample rate.
|
||||
::
|
||||
|
||||
@monascastatsd.timed('user.query.time', sample_rate=0.5)
|
||||
def get_user(user_id):
|
||||
# Do what you need to ...
|
||||
pass
|
||||
|
||||
# Is equivalent to ...
|
||||
start = time.time()
|
||||
try:
|
||||
get_user(user_id)
|
||||
finally:
|
||||
monascastatsd.timing('user.query.time', time.time() - start)
|
||||
"""
|
||||
def wrapper(func):
|
||||
@functools.wraps(func)
|
||||
def wrapped(*args, **kwargs):
|
||||
start = time.time()
|
||||
result = func(*args, **kwargs)
|
||||
self.timing(metric,
|
||||
time.time() - start,
|
||||
dimensions=dimensions,
|
||||
sample_rate=sample_rate)
|
||||
return result
|
||||
wrapped.__name__ = func.__name__
|
||||
wrapped.__doc__ = func.__doc__
|
||||
wrapped.__dict__.update(func.__dict__)
|
||||
return wrapped
|
||||
return wrapper
|
||||
|
||||
def set(self, metric, value, dimensions=None, sample_rate=1):
|
||||
"""Sample a set value.
|
||||
|
||||
>>> monascastatsd.set('visitors.uniques', 999)
|
||||
"""
|
||||
|
||||
self._report(metric, 's', value, dimensions, sample_rate)
|
||||
|
||||
def _report(self, metric, metric_type, value, dimensions, sample_rate):
|
||||
if sample_rate != 1 and random.random() > sample_rate:
|
||||
return
|
||||
|
||||
payload = [metric, ":", value, "|", metric_type]
|
||||
if sample_rate != 1:
|
||||
payload.extend(["|@", sample_rate])
|
||||
if dimensions:
|
||||
payload.extend(["|#"])
|
||||
payload.append(dimensions)
|
||||
|
||||
encoded = "".join(itertools.imap(str, payload))
|
||||
self._send(encoded)
|
||||
|
||||
def _send_to_server(self, packet):
|
||||
try:
|
||||
self.socket.send(packet.encode(self.encoding))
|
||||
except socket.error:
|
||||
log.exception("Error submitting metric")
|
||||
|
||||
def _send_to_buffer(self, packet):
|
||||
self.buffer.append(packet)
|
||||
if len(self.buffer) >= self.max_buffer_size:
|
||||
self._flush_buffer()
|
||||
|
||||
def _flush_buffer(self):
|
||||
self._send_to_server("\n".join(self.buffer))
|
||||
self.buffer = []
|
||||
|
||||
monascastatsd = MonascaStatsd()
|
|
@ -0,0 +1,21 @@
|
|||
from metricbase import MetricBase
|
||||
|
||||
|
||||
class Set(MetricBase):
|
||||
|
||||
def __init__(self, connection, name=None, dimensions=None):
|
||||
super(self.__class__, self).__init__(name=name,
|
||||
connection=connection,
|
||||
dimensions=dimensions)
|
||||
|
||||
def send(self, name, value, dimensions=None, sample_rate=1):
|
||||
"""Sample a set value.
|
||||
|
||||
>>> monascastatsd.set('visitors.uniques', 999)
|
||||
"""
|
||||
|
||||
self._connection.report(metric=self.update_name(name),
|
||||
metric_type='s',
|
||||
value=value,
|
||||
dimensions=self.update_dimensions(dimensions),
|
||||
sample_rate=sample_rate)
|
|
@ -0,0 +1,74 @@
|
|||
import contextlib
|
||||
import functools
|
||||
import time
|
||||
|
||||
from metricbase import MetricBase
|
||||
|
||||
|
||||
class Timer(MetricBase):
|
||||
|
||||
def __init__(self, connection, name=None, dimensions=None):
|
||||
super(self.__class__, self).__init__(name=name,
|
||||
connection=connection,
|
||||
dimensions=dimensions)
|
||||
|
||||
def timing(self, name, value, dimensions=None, sample_rate=1):
|
||||
"""Record a timing, optionally setting dimensions and a sample rate.
|
||||
|
||||
>>> monascastatsd.timing("query.response.time", 1234)
|
||||
"""
|
||||
self._connection.report(metric=self.update_name(name),
|
||||
metric_type='ms',
|
||||
value=value,
|
||||
dimensions=self.update_dimensions(dimensions),
|
||||
sample_rate=sample_rate)
|
||||
|
||||
def timed(self, name, dimensions=None, sample_rate=1):
|
||||
"""A decorator that will measure the distribution of a function's
|
||||
|
||||
run time. Optionally specify a list of tag or a sample rate.
|
||||
::
|
||||
|
||||
@monascastatsd.timed('user.query.time', sample_rate=0.5)
|
||||
def get_user(user_id):
|
||||
# Do what you need to ...
|
||||
pass
|
||||
|
||||
# Is equivalent to ...
|
||||
start = time.time()
|
||||
try:
|
||||
get_user(user_id)
|
||||
finally:
|
||||
monascastatsd.timing('user.query.time', time.time() - start)
|
||||
"""
|
||||
def wrapper(func):
|
||||
@functools.wraps(func)
|
||||
def wrapped(*args, **kwargs):
|
||||
start = time.time()
|
||||
result = func(*args, **kwargs)
|
||||
self.timing(name,
|
||||
time.time() - start,
|
||||
dimensions=dimensions,
|
||||
sample_rate=sample_rate)
|
||||
return result
|
||||
wrapped.__name__ = func.__name__
|
||||
wrapped.__doc__ = func.__doc__
|
||||
wrapped.__dict__.update(func.__dict__)
|
||||
return wrapped
|
||||
return wrapper
|
||||
|
||||
@contextlib.contextmanager
|
||||
def time(self, name, dimensions=None, sample_rate=1):
|
||||
"""Time a block of code, optionally setting dimensions and a sample rate.
|
||||
|
||||
try:
|
||||
with monascastatsd.time("query.response.time"):
|
||||
Do something...
|
||||
except Exception:
|
||||
Log something...
|
||||
"""
|
||||
|
||||
start_time = time.time()
|
||||
yield
|
||||
end_time = time.time()
|
||||
self.timing(name, end_time - start_time, dimensions, sample_rate)
|
|
@ -18,7 +18,7 @@ import ConfigParser
|
|||
import random
|
||||
import time
|
||||
|
||||
import monascastatsd.monasca_statsd
|
||||
import monascastatsd as mstatsd
|
||||
|
||||
import statsd
|
||||
|
||||
|
@ -43,30 +43,42 @@ class MonascaStatsdGenerator(object):
|
|||
def send_messages(self):
|
||||
'''Main processing for sending messages.'''
|
||||
try:
|
||||
mstatsd = monascastatsd.monasca_statsd.MonascaStatsd()
|
||||
mstatsd.connect(self.host, self.port)
|
||||
conn = mstatsd.Connection(host=self.host, port=self.port)
|
||||
self.client = mstatsd.Client(name='statsd-generator', connection=conn)
|
||||
for index in range(1, self.num_of_iterations + 1):
|
||||
print("Starting iteration " + str(index) +
|
||||
" of " + str(self.num_of_iterations))
|
||||
mstatsd.increment('Teraflops', 5)
|
||||
mstatsd.gauge('NumOfTeraflops',
|
||||
random.uniform(1.0, 10.0),
|
||||
dimensions={'Origin': 'Dev',
|
||||
'Environment': 'Test'})
|
||||
mstatsd.histogram('file.upload.size',
|
||||
random.randrange(1, 100),
|
||||
dimensions={'Version': '1.0'})
|
||||
counter = self.client.get_counter('teraflops')
|
||||
counter.increment(5)
|
||||
gauge = self.client.get_gauge()
|
||||
gauge.send('num_of_teraflops',
|
||||
random.uniform(1.0, 10.0),
|
||||
dimensions={'origin': 'dev',
|
||||
'environment': 'test'})
|
||||
histogram = self.client.get_histogram('hist')
|
||||
histogram.send('file.upload.size',
|
||||
random.randrange(1, 100),
|
||||
dimensions={'version': '1.0'})
|
||||
set = self.client.get_set('hist')
|
||||
set.send('load_time',
|
||||
random.randrange(1, 100),
|
||||
dimensions={'page_name': 'mypage.html'})
|
||||
|
||||
@mstatsd.timed('config_db_time',
|
||||
dimensions={'db_name': 'mydb'})
|
||||
timer = self.client.get_timer('timer')
|
||||
|
||||
@timer.timed('config_db_time',
|
||||
dimensions={'db_name': 'mydb'})
|
||||
def time_db():
|
||||
time.sleep(0.5)
|
||||
time.sleep(0.2)
|
||||
time_db()
|
||||
|
||||
with timer.time('time_block'):
|
||||
time.sleep(0.3)
|
||||
|
||||
# Send some regular statsd messages
|
||||
counter = statsd.Counter('statsd_generator')
|
||||
counter = statsd.Counter('statsd_counter')
|
||||
counter += 1
|
||||
gauge = statsd.Gauge('statsd_generator')
|
||||
gauge = statsd.Gauge('statsd_gauge')
|
||||
gauge.send('cpu_percent',
|
||||
random.uniform(1.0, 100.0))
|
||||
print("Completed iteration " + str(index) +
|
||||
|
|
|
@ -21,7 +21,7 @@ import socket
|
|||
import time
|
||||
import unittest
|
||||
|
||||
import monascastatsd.monasca_statsd
|
||||
import monascastatsd as mstatsd
|
||||
|
||||
|
||||
class FakeSocket(object):
|
||||
|
@ -50,105 +50,147 @@ class BrokenSocket(FakeSocket):
|
|||
raise socket.error("Socket error")
|
||||
|
||||
|
||||
class TestMonStatsd(unittest.TestCase):
|
||||
class TestMonascaStatsd(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.monascastatsd = monascastatsd.monasca_statsd.MonascaStatsd()
|
||||
self.monascastatsd.socket = FakeSocket()
|
||||
conn = mstatsd.Connection()
|
||||
conn.socket = FakeSocket()
|
||||
self.client = mstatsd.Client(connection=conn, dimensions={'env': 'test'})
|
||||
|
||||
def recv(self):
|
||||
return self.monascastatsd.socket.recv()
|
||||
|
||||
def test_set(self):
|
||||
self.monascastatsd.set('set', 123)
|
||||
assert self.recv() == 'set:123|s'
|
||||
|
||||
def test_gauge(self):
|
||||
self.monascastatsd.gauge('gauge', 123.4)
|
||||
assert self.recv() == 'gauge:123.4|g'
|
||||
def recv(self, metric_obj):
|
||||
return metric_obj._connection.socket.recv()
|
||||
|
||||
def test_counter(self):
|
||||
self.monascastatsd.increment('page.views')
|
||||
self.assertEqual('page.views:1|c', self.recv())
|
||||
counter = self.client.get_counter(name='page.views')
|
||||
|
||||
self.monascastatsd.increment('page.views', 11)
|
||||
self.assertEqual('page.views:11|c', self.recv())
|
||||
counter.increment()
|
||||
self.assertEqual("page.views:1|c|#{'env': 'test'}",
|
||||
self.recv(counter))
|
||||
|
||||
self.monascastatsd.decrement('page.views')
|
||||
self.assertEqual('page.views:-1|c', self.recv())
|
||||
counter += 1
|
||||
self.assertEqual("page.views:1|c|#{'env': 'test'}",
|
||||
self.recv(counter))
|
||||
|
||||
self.monascastatsd.decrement('page.views', 12)
|
||||
self.assertEqual('page.views:-12|c', self.recv())
|
||||
counter.increment(11)
|
||||
self.assertEqual("page.views:11|c|#{'env': 'test'}",
|
||||
self.recv(counter))
|
||||
|
||||
def test_histogram(self):
|
||||
self.monascastatsd.histogram('histo', 123.4)
|
||||
self.assertEqual('histo:123.4|h', self.recv())
|
||||
counter += 11
|
||||
self.assertEqual("page.views:11|c|#{'env': 'test'}",
|
||||
self.recv(counter))
|
||||
|
||||
def test_gauge_with_dimensions(self):
|
||||
self.monascastatsd.gauge('gt', 123.4,
|
||||
dimensions={'country': 'china',
|
||||
'age': 45,
|
||||
'color': 'blue'})
|
||||
self.assertEqual("gt:123.4|g|#{" +
|
||||
"'color': 'blue', " +
|
||||
"'country': 'china', " +
|
||||
"'age': 45}",
|
||||
self.recv())
|
||||
counter.decrement()
|
||||
self.assertEqual("page.views:-1|c|#{'env': 'test'}",
|
||||
self.recv(counter))
|
||||
|
||||
counter -= 1
|
||||
self.assertEqual("page.views:-1|c|#{'env': 'test'}",
|
||||
self.recv(counter))
|
||||
|
||||
counter.decrement(12)
|
||||
self.assertEqual("page.views:-12|c|#{'env': 'test'}",
|
||||
self.recv(counter))
|
||||
|
||||
counter -= 12
|
||||
self.assertEqual("page.views:-12|c|#{'env': 'test'}",
|
||||
self.recv(counter))
|
||||
|
||||
def test_counter_with_dimensions(self):
|
||||
self.monascastatsd.increment('ct',
|
||||
dimensions={'country': 'canada',
|
||||
'color': 'red'})
|
||||
self.assertEqual("ct:1|c|#{'color': 'red', 'country': 'canada'}",
|
||||
self.recv())
|
||||
counter = self.client.get_counter('counter_with_dims',
|
||||
dimensions={'date': '10/24', 'time': '23:00'})
|
||||
|
||||
counter.increment(dimensions={'country': 'canada', 'color': 'red'})
|
||||
self.assertEqual("counter_with_dims:1|c|#{'date': '10/24', 'color': 'red', " +
|
||||
"'country': 'canada', 'env': 'test', 'time': '23:00'}",
|
||||
self.recv(counter))
|
||||
|
||||
counter += 1
|
||||
self.assertEqual("counter_with_dims:1|c|#{'date': '10/24', 'env': 'test', 'time': '23:00'}",
|
||||
self.recv(counter))
|
||||
|
||||
def test_set(self):
|
||||
set = self.client.get_set('set')
|
||||
set.send('metric', 123)
|
||||
assert self.recv(set) == "set.metric:123|s|#{'env': 'test'}"
|
||||
|
||||
def test_gauge(self):
|
||||
gauge = self.client.get_gauge('gauge')
|
||||
gauge.send('metric', 123.4)
|
||||
assert self.recv(gauge) == "gauge.metric:123.4|g|#{'env': 'test'}"
|
||||
|
||||
def test_histogram(self):
|
||||
histogram = self.client.get_histogram('histogram')
|
||||
|
||||
histogram.send('metric', 123.4)
|
||||
self.assertEqual("histogram.metric:123.4|h|#{'env': 'test'}", self.recv(histogram))
|
||||
|
||||
def test_gauge_with_dimensions(self):
|
||||
gauge = self.client.get_gauge('gauge')
|
||||
gauge.send('gt', 123.4,
|
||||
dimensions={'country': 'china',
|
||||
'age': 45,
|
||||
'color': 'blue'})
|
||||
self.assertEqual("gauge.gt:123.4|g|#{" +
|
||||
"'color': 'blue', " +
|
||||
"'country': 'china', " +
|
||||
"'age': 45, " +
|
||||
"'env': 'test'}",
|
||||
self.recv(gauge))
|
||||
|
||||
def test_histogram_with_dimensions(self):
|
||||
self.monascastatsd.histogram('h', 1, dimensions={'color': 'red'})
|
||||
self.assertEqual("h:1|h|#{'color': 'red'}", self.recv())
|
||||
histogram = self.client.get_histogram('my_hist')
|
||||
histogram.send('h', 1, dimensions={'color': 'red'})
|
||||
self.assertEqual("my_hist.h:1|h|#{'color': 'red', 'env': 'test'}", self.recv(histogram))
|
||||
|
||||
def test_sample_rate(self):
|
||||
self.monascastatsd.increment('c', sample_rate=0)
|
||||
assert not self.recv()
|
||||
counter = self.client.get_counter('sampled_counter')
|
||||
counter.increment(sample_rate=0)
|
||||
assert not self.recv(counter)
|
||||
for _ in range(10000):
|
||||
self.monascastatsd.increment('sampled_counter', sample_rate=0.3)
|
||||
counter.increment(sample_rate=0.3)
|
||||
self.assert_almost_equal(3000,
|
||||
len(self.monascastatsd.socket.payloads),
|
||||
len(self.client.connection.socket.payloads),
|
||||
150)
|
||||
self.assertEqual('sampled_counter:1|c|@0.3', self.recv())
|
||||
self.assertEqual("sampled_counter:1|c|@0.3|#{'env': 'test'}", self.recv(counter))
|
||||
|
||||
def test_samples_with_dimensions(self):
|
||||
gauge = self.client.get_gauge()
|
||||
for _ in range(100):
|
||||
self.monascastatsd.gauge('gst',
|
||||
23,
|
||||
dimensions={'status': 'sampled'},
|
||||
sample_rate=0.9)
|
||||
gauge.send('gst',
|
||||
23,
|
||||
dimensions={'status': 'sampled'},
|
||||
sample_rate=0.9)
|
||||
|
||||
def test_samples_with_dimensions(self):
|
||||
for _ in range(100):
|
||||
self.monascastatsd.gauge('gst',
|
||||
23,
|
||||
dimensions={'status': 'sampled'},
|
||||
sample_rate=0.9)
|
||||
gauge.send('gst',
|
||||
23,
|
||||
dimensions={'status': 'sampled'},
|
||||
sample_rate=0.9)
|
||||
self.assertEqual('gst:23|g|@0.9|#status:sampled')
|
||||
|
||||
def test_timing(self):
|
||||
self.monascastatsd.timing('t', 123)
|
||||
self.assertEqual('t:123|ms', self.recv())
|
||||
timer = self.client.get_timer()
|
||||
timer.timing('t', 123)
|
||||
self.assertEqual("t:123|ms|#{'env': 'test'}", self.recv(timer))
|
||||
|
||||
@staticmethod
|
||||
def assert_almost_equal(a, b, delta):
|
||||
assert 0 <= abs(a - b) <= delta, "%s - %s not within %s" % (a,
|
||||
b,
|
||||
delta)
|
||||
def test_time(self):
|
||||
timer = self.client.get_timer()
|
||||
with timer.time('t'):
|
||||
time.sleep(2)
|
||||
packet = self.recv(timer)
|
||||
name_value, type_, dimensions = packet.split('|')
|
||||
name, value = name_value.split(':')
|
||||
|
||||
def test_socket_error(self):
|
||||
self.monascastatsd.socket = BrokenSocket()
|
||||
self.monascastatsd.gauge('no error', 1)
|
||||
assert True, 'success'
|
||||
self.assertEqual('ms', type_)
|
||||
self.assertEqual('t', name)
|
||||
self.assert_almost_equal(2.0, float(value), 0.1)
|
||||
self.assertEqual("{'env': 'test'}", dimensions.lstrip('#'))
|
||||
|
||||
def test_timed(self):
|
||||
timer = self.client.get_timer()
|
||||
|
||||
@self.monascastatsd.timed('timed.test')
|
||||
@timer.timed('timed.test')
|
||||
def func(a, b, c=1, d=1):
|
||||
"""docstring."""
|
||||
time.sleep(0.5)
|
||||
|
@ -161,42 +203,61 @@ class TestMonStatsd(unittest.TestCase):
|
|||
# Assert it handles args and kwargs correctly.
|
||||
self.assertEqual(result, (1, 2, 1, 3))
|
||||
|
||||
packet = self.recv()
|
||||
name_value, type_ = packet.split('|')
|
||||
packet = self.recv(timer)
|
||||
name_value, type_, dimensions = packet.split('|')
|
||||
name, value = name_value.split(':')
|
||||
|
||||
self.assertEqual('ms', type_)
|
||||
self.assertEqual('timed.test', name)
|
||||
self.assert_almost_equal(0.5, float(value), 0.1)
|
||||
self.assertEqual("{'env': 'test'}", dimensions.lstrip('#'))
|
||||
|
||||
def test_socket_error(self):
|
||||
self.client.connection.socket = BrokenSocket()
|
||||
self.client.get_gauge().send('no error', 1)
|
||||
assert True, 'success'
|
||||
self.client.connection.socket = FakeSocket()
|
||||
|
||||
def test_batched(self):
|
||||
self.monascastatsd.open_buffer()
|
||||
self.monascastatsd.gauge('page.views', 123)
|
||||
self.monascastatsd.timing('timer', 123)
|
||||
self.monascastatsd.close_buffer()
|
||||
self.client.connection.open_buffer()
|
||||
gauge = self.client.get_gauge('site')
|
||||
gauge.send('views', 123)
|
||||
timer = self.client.get_timer('site')
|
||||
timer.timing('timer', 123)
|
||||
self.client.connection.close_buffer()
|
||||
|
||||
self.assertEqual('page.views:123|g\ntimer:123|ms', self.recv())
|
||||
self.assertEqual("site.views:123|g|#{'env': 'test'}\nsite.timer:123|ms|#{'env': 'test'}",
|
||||
self.recv(gauge))
|
||||
|
||||
def test_context_manager(self):
|
||||
fake_socket = FakeSocket()
|
||||
with monascastatsd.monasca_statsd.MonascaStatsd() as statsd:
|
||||
statsd.socket = fake_socket
|
||||
statsd.gauge('page.views', 123)
|
||||
statsd.timing('timer', 123)
|
||||
with mstatsd.Connection() as conn:
|
||||
conn.socket = fake_socket
|
||||
client = mstatsd.Client(name='ContextTester', connection=conn)
|
||||
client.get_gauge('page').send('views', 123)
|
||||
client.get_timer('page').timing('timer', 12)
|
||||
|
||||
self.assertEqual('page.views:123|g\ntimer:123|ms', fake_socket.recv())
|
||||
self.assertEqual('ContextTester.page.views:123|g\nContextTester.page.timer:12|ms',
|
||||
fake_socket.recv())
|
||||
|
||||
def test_batched_buffer_autoflush(self):
|
||||
fake_socket = FakeSocket()
|
||||
with monascastatsd.monasca_statsd.MonascaStatsd() as statsd:
|
||||
statsd.socket = fake_socket
|
||||
with mstatsd.Connection() as conn:
|
||||
conn.socket = fake_socket
|
||||
client = mstatsd.Client(name='BufferedTester', connection=conn)
|
||||
counter = client.get_counter('mycounter')
|
||||
for _ in range(51):
|
||||
statsd.increment('mycounter')
|
||||
self.assertEqual('\n'.join(['mycounter:1|c' for _ in range(50)]),
|
||||
counter.increment()
|
||||
self.assertEqual('\n'.join(['BufferedTester.mycounter:1|c' for _ in range(50)]),
|
||||
fake_socket.recv())
|
||||
|
||||
self.assertEqual('mycounter:1|c', fake_socket.recv())
|
||||
self.assertEqual('BufferedTester.mycounter:1|c', fake_socket.recv())
|
||||
|
||||
@staticmethod
|
||||
def assert_almost_equal(a, b, delta):
|
||||
assert 0 <= abs(a - b) <= delta, "%s - %s not within %s" % (a,
|
||||
b,
|
||||
delta)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Loading…
Reference in New Issue