monasca-agent/monagent/monstatsd/udp.py

202 lines
7.1 KiB
Python

import errno
import logging
import select
import socket

from monagent.common.config import initialize_logging
# Runs at import time with the 'monstatsd' name; presumably configures the
# project's log handlers for this component (see monagent.common.config).
initialize_logging('monstatsd')
# Module-wide logger used by Server below.
log = logging.getLogger('monstatsd')
# Timeout, in seconds, handed to select() in Server.start so the receive loop
# wakes up periodically and re-checks its running flag.
UDP_SOCKET_TIMEOUT = 5
class Server(object):
    """
    A statsd udp server.

    Receives datagrams on an IPv4 UDP socket, parses every newline-separated
    packet as either an event (packets starting with ``_e``) or a metric and
    hands the result to the aggregator. Each datagram can optionally be
    forwarded unchanged to another statsd server.
    """

    def __init__(self, aggregator, host, port, forward_to_host=None, forward_to_port=None):
        """Store the listening address and optionally set up forwarding.

        :param aggregator: object that receives parsed data through
            ``submit_metric(...)`` / ``event(**kwargs)`` and whose ``count``
            and ``event_count`` attributes are incremented per packet.
        :param host: host name or address to bind to.
        :param port: UDP port to bind to (anything ``int()`` accepts).
        :param forward_to_host: when not None, every received datagram is
            also sent to this host.
        :param forward_to_port: port of the forwarding target; defaults to
            8125 when forwarding is enabled and no port is given.
        """
        self.host = host
        self.port = int(port)
        self.address = (self.host, self.port)
        self.aggregator = aggregator
        self.buffer_size = 1024 * 8
        self.running = False
        self.should_forward = forward_to_host is not None
        # Stays None unless forwarding was requested AND could be set up.
        self.forward_udp_sock = None

        # In case we want to forward every packet received to another statsd server
        if self.should_forward:
            if forward_to_port is None:
                forward_to_port = 8125

            log.info(
                "External statsd forwarding enabled. "
                "All packets received will be forwarded to %s:%s",
                forward_to_host, forward_to_port)
            forward_sock = None
            try:
                forward_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                # Like the listening port, the forwarding port may arrive as a
                # string, but connect() requires an integer.
                forward_sock.connect((forward_to_host, int(forward_to_port)))
            except Exception:
                # Best effort: the server still runs, just without forwarding.
                log.exception("Error while setting up connection to external statsd server")
                if forward_sock is not None:
                    forward_sock.close()
            else:
                self.forward_udp_sock = forward_sock

    @staticmethod
    def _parse_event_packet(packet):
        """Parse an event packet into keyword arguments for the aggregator.

        Event syntax::

            _e{<title length>,<text length>}:<title>|<text>|<meta>|<meta>...

        Recognised meta fields, selected by their first character:
        ``t:`` alert_type, ``k:`` aggregation_key, ``s:`` source_type_name,
        ``d:`` date_happened (integer), ``p:`` priority, ``h:`` hostname and
        ``#a,b,...`` dimensions (stored as a sorted list). Unknown fields are
        ignored.

        :param packet: a single event packet, without trailing newline.
        :returns: dict with ``title`` and ``text`` plus one key per meta
            field present in the packet.
        :raises Exception: if the packet cannot be parsed.
        """
        name_and_metadata = packet.split(':', 1)
        if len(name_and_metadata) != 2:
            # Plain (non-u'') format string so that formatting a non-ASCII
            # byte string packet cannot itself fail with a decode error.
            raise Exception('Unparseable event packet: %s' % packet)

        # Meta fields that are copied verbatim (minus the "x:" prefix).
        plain_fields = {
            u't': 'alert_type',
            u'k': 'aggregation_key',
            u's': 'source_type_name',
            u'p': 'priority',
            u'h': 'hostname',
        }

        try:
            name, metadata = name_and_metadata
            # Decode byte strings explicitly as UTF-8; relying on the default
            # codec fails on any non-ASCII title or text.
            if isinstance(metadata, bytes):
                metadata = metadata.decode('utf-8', 'replace')

            # name looks like "_e{5,4}": strip "_e{" and the closing "}".
            title_length, text_length = name.split(',')
            title_length = int(title_length[3:])
            text_length = int(text_length[:-1])

            text_start = title_length + 1  # skip the '|' after the title
            text_end = text_start + text_length
            event = {
                'title': metadata[:title_length],
                'text': metadata[text_start:text_end].replace('\\n', '\n'),
            }

            # The remainder starts with the '|' that follows the text, so the
            # first element of the split is always empty and is dropped.
            meta = metadata[text_end:]
            for m in meta.split('|')[1:]:
                kind = m[0]
                if kind in plain_fields:
                    event[plain_fields[kind]] = m[2:]
                elif kind == u'd':
                    event['date_happened'] = int(m[2:])
                elif kind == u'#':
                    event['dimensions'] = sorted(m[1:].split(u','))
            return event
        except (IndexError, ValueError):
            # IndexError: empty meta field; ValueError: non-numeric lengths
            # or timestamp, or a malformed "_e{..}" header.
            raise Exception('Unparseable event packet: %s' % packet)

    @staticmethod
    def _parse_metric_packet(packet):
        """Parse a metric packet of the form ``name:value|type[|@rate][|#dims]``.

        :param packet: a single metric packet, without trailing newline.
        :returns: tuple ``(name, value, metric_type, dimensions, sample_rate)``
            where ``value`` is an int or float (the raw string for type
            ``s``), ``dimensions`` is a dict or None and ``sample_rate``
            defaults to 1.
        :raises Exception: if the packet is malformed, the value is not a
            number or the sample rate lies outside [0, 1].
        :raises ValueError: if the sample rate is not a number or a dimension
            has no ``key:value`` separator.
        """
        name_and_metadata = packet.split(':', 1)
        if len(name_and_metadata) != 2:
            raise Exception('Unparseable metric packet: %s' % packet)

        name = name_and_metadata[0]
        metadata = name_and_metadata[1].split('|')
        if len(metadata) < 2:
            raise Exception('Unparseable metric packet: %s' % packet)

        # Submit the metric
        raw_value = metadata[0]
        metric_type = metadata[1]

        if metric_type == 's':
            # Type 's' values are passed through as the raw string.
            value = raw_value
        else:
            # Try to cast as an int first to avoid precision issues, then as a
            # float.
            try:
                value = int(raw_value)
            except ValueError:
                try:
                    value = float(raw_value)
                except ValueError:
                    # Otherwise, raise an error saying it must be a number
                    raise Exception('Metric value must be a number: %s, %s' % (name, raw_value))

        # Parse the optional values - sample rate & dimensions.
        sample_rate = 1
        dimensions = None
        for m in metadata[2:]:
            if not m:
                # e.g. a trailing '|': report it like any other bad packet
                # instead of failing with a bare IndexError.
                raise Exception('Unparseable metric packet: %s' % packet)
            if m[0] == '@':
                # Parse the sample rate
                sample_rate = float(m[1:])
                # Explicit check rather than assert: asserts vanish under -O.
                if not 0 <= sample_rate <= 1:
                    raise Exception(
                        'Sample rate must be between 0 and 1: %s, %s' % (name, m[1:]))
            elif m[0] == '#':
                # Split on the first ':' only so values may contain colons.
                pairs = (dim.split(':', 1) for dim in m[1:].split(','))
                dimensions = {key.strip(): val.strip() for key, val in pairs}

        return name, value, metric_type, dimensions, sample_rate

    def submit_packets(self, packets):
        """Parse a datagram and submit every packet it contains.

        :param packets: datagram payload; packets are separated by newlines
            and blank lines are skipped.
        :raises Exception: propagated from the parsers for a malformed
            packet; the packets after it in the same datagram are not
            processed.
        """
        for packet in packets.split("\n"):
            if not packet.strip():
                continue

            if packet.startswith('_e'):
                event = self._parse_event_packet(packet)
                # todo it seems like this count should be done in the event method
                self.aggregator.event_count += 1
                self.aggregator.event(**event)
            else:
                # todo it seems like this count should be done in the submit_metric method
                # Note: incremented before parsing, so unparseable metric
                # packets are counted as well.
                self.aggregator.count += 1
                name, value, mtype, dimensions, sample_rate = self._parse_metric_packet(packet)
                self.aggregator.submit_metric(
                    name, value, mtype, dimensions=dimensions, sample_rate=sample_rate)

    def start(self):
        """ Run the server.

        Binds the listening socket and processes datagrams until ``stop()``
        is called or the process is interrupted. The socket is closed when
        the loop ends.

        :raises socket.gaierror: if the host cannot be resolved (except for
            'localhost', which falls back to 127.0.0.1).
        :raises select.error: for any select failure other than an
            interrupted system call.
        """
        # Bind to the UDP socket.
        # IPv4 only
        open_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            open_socket.setblocking(0)

            try:
                open_socket.bind(self.address)
            except socket.gaierror:
                if self.address[0] != 'localhost':
                    # Do not pretend to listen on a socket that was never bound.
                    raise
                log.warning(
                    "Warning localhost seems undefined in your host file, using 127.0.0.1 instead")
                self.address = ('127.0.0.1', self.address[1])
                open_socket.bind(self.address)

            log.info('Listening on host & port: %s', str(self.address))

            # Inline variables for quick look-up.
            buffer_size = self.buffer_size
            sock = [open_socket]
            socket_recv = open_socket.recv
            select_select = select.select
            select_error = select.error
            timeout = UDP_SOCKET_TIMEOUT
            # Forward only when requested and the forwarding socket is usable.
            should_forward = self.should_forward and self.forward_udp_sock is not None
            forward_udp_sock = self.forward_udp_sock

            # Run our select loop.
            self.running = True
            while self.running:
                try:
                    ready = select_select(sock, [], [], timeout)
                    if ready[0]:
                        message = socket_recv(buffer_size)
                        self.submit_packets(message)

                        if should_forward:
                            forward_udp_sock.send(message)
                except select_error as se:
                    # Ignore interrupted system calls from sigterm.
                    if se.args[0] != errno.EINTR:
                        raise
                except (KeyboardInterrupt, SystemExit):
                    break
                except Exception:
                    # A bad datagram must not take the server down.
                    log.exception('Error receiving datagram')
        finally:
            open_socket.close()

    def stop(self):
        """Ask the loop in ``start()`` to exit.

        The loop re-checks the flag after the current select() call returns,
        i.e. at the latest after UDP_SOCKET_TIMEOUT seconds.
        """
        self.running = False