diff --git a/agent.conf.example b/agent.conf.example index 55b53789..6198ee2b 100644 --- a/agent.conf.example +++ b/agent.conf.example @@ -41,8 +41,8 @@ use_mount: no # Change port the Agent is listening to # listen_port: 17123 -# Additional directory to look for Datadog checks -# additional_checksd: /etc/dd-agent/checks.d/ +# Additional directory to look for checks +# additional_checksd: /etc/mon-agent/checks.d/ # Allow non-local traffic to this Agent # This is required when using this Agent as a proxy for other Agents @@ -52,25 +52,24 @@ use_mount: no # non_local_traffic: no # ========================================================================== # -# DogStatsd configuration # +# MonStatsd configuration # # ========================================================================== # -# DogStatsd is a small server that aggregates your custom app metrics. For -# usage information, check out http://api.datadoghq.com +# MonStatsd is a small server that aggregates your custom app metrics. # Make sure your client is sending to the same port. -dogstatsd_port : 8125 +monstatsd_port : 8125 -## The dogstatsd flush period. -# dogstatsd_interval : 10 +## The monstatsd flush period. +# monstatsd_interval : 10 ## If 'yes', counters and rates will be normalized to 1 second (that is divided -## by the dogstatsd_interval) before being sent to the server. Defaults to 'yes' -# dogstatsd_normalize : yes +## by the monstatsd_interval) before being sent to the server. Defaults to 'yes' +# monstatsd_normalize : yes -# If you want to forward every packet received by the dogstatsd server +# If you want to forward every packet received by the monstatsd server # to another statsd server, uncomment these lines. -# WARNING: Make sure that forwarded packets are regular statsd packets and not "dogstatsd" packets, +# WARNING: Make sure that forwarded packets are regular statsd packets and not "monstatsd" packets, # as your other statsd server might not be able to handle them. # statsd_forward_host: address_of_own_statsd_server # statsd_forward_port: 8125 @@ -85,7 +84,7 @@ dogstatsd_port : 8125 # Some infrastrucures have many constantly changing virtual devices (e.g. folks # running constantly churning linux containers) whose metrics aren't -# interesting for datadog. To filter out a particular pattern of devices +# interesting. To filter out a particular pattern of devices # from collection, configure a regex here: # device_blacklist_re: .*\/dev\/mapper\/lxc-box.* diff --git a/monagent/collector/jmxfetch.py b/monagent/collector/jmxfetch.py index 84bc24fd..4347d904 100644 --- a/monagent/collector/jmxfetch.py +++ b/monagent/collector/jmxfetch.py @@ -12,7 +12,6 @@ import subprocess import tempfile import time -# datadog from monagent.common.util import PidFile, get_os log = logging.getLogger(__name__) @@ -289,7 +288,7 @@ class JMXFetch(object): @classmethod def start(cls, confd_path, agentConfig, logging_config, path_to_java, java_run_opts, default_check_frequency, jmx_checks, command, reporter=None): - statsd_port = agentConfig.get('dogstatsd_port', "8125") + statsd_port = agentConfig.get('monstatsd_port', "8125") if reporter is None: reporter = "statsd:%s" % str(statsd_port) diff --git a/monagent/common/aggregator.py b/monagent/common/aggregator.py index 7e37b64c..be96e56c 100644 --- a/monagent/common/aggregator.py +++ b/monagent/common/aggregator.py @@ -1,4 +1,4 @@ -""" Aggregation classes used by the collector and dogstatsd to batch messages sent to the forwarder. +""" Aggregation classes used by the collector and monstatsd to batch messages sent to the forwarder. """ import logging from time import time diff --git a/monagent/common/check_status.py b/monagent/common/check_status.py index 8b690094..4ffe90e7 100644 --- a/monagent/common/check_status.py +++ b/monagent/common/check_status.py @@ -512,9 +512,9 @@ class CollectorStatus(AgentStatus): return status_info -class DogstatsdStatus(AgentStatus): +class MonstatsdStatus(AgentStatus): - NAME = 'Dogstatsd' + NAME = 'Monstatsd' def __init__(self, flush_count=0, packet_count=0, packets_per_second=0, metric_count=0, event_count=0): AgentStatus.__init__(self) diff --git a/monagent/common/config.py b/monagent/common/config.py index 02b568e4..bb616c02 100644 --- a/monagent/common/config.py +++ b/monagent/common/config.py @@ -23,7 +23,7 @@ except ImportError: from yaml import Loader # project -from util import get_os, Platform +from util import get_os from monagent.collector.jmxfetch import JMXFetch, JMX_COLLECT_COMMAND # CONSTANTS @@ -188,10 +188,10 @@ def get_config(parse_args=True, cfg_path=None, options=None): # General config agent_config = { 'check_freq': DEFAULT_CHECK_FREQUENCY, - 'dogstatsd_interval': DEFAULT_STATSD_FREQUENCY, - 'dogstatsd_agregator_bucket_size': DEFAULT_STATSD_BUCKET_SIZE, - 'dogstatsd_normalize': 'yes', - 'dogstatsd_port': 8125, + 'monstatsd_interval': DEFAULT_STATSD_FREQUENCY, + 'monstatsd_agregator_bucket_size': DEFAULT_STATSD_BUCKET_SIZE, + 'monstatsd_normalize': 'yes', + 'monstatsd_port': 8125, 'forwarder_url': 'http://localhost:17123', 'hostname': None, 'listen_port': None, @@ -200,8 +200,8 @@ def get_config(parse_args=True, cfg_path=None, options=None): 'additional_checksd': '/etc/mon-agent/checks.d/', } - dogstatsd_interval = DEFAULT_STATSD_FREQUENCY - dogstatsd_agregator_bucket_size = DEFAULT_STATSD_BUCKET_SIZE + monstatsd_interval = DEFAULT_STATSD_FREQUENCY + monstatsd_agregator_bucket_size = DEFAULT_STATSD_BUCKET_SIZE # Config handling try: @@ -254,14 +254,14 @@ def get_config(parse_args=True, cfg_path=None, options=None): if config.get('Main', 'watchdog').lower() in ('no', 'false'): agent_config['watchdog'] = False - # Dogstatsd config - dogstatsd_defaults = { - 'dogstatsd_port': 8125, - 'dogstatsd_interval': dogstatsd_interval, - 'dogstatsd_agregator_bucket_size': dogstatsd_agregator_bucket_size, - 'dogstatsd_normalize': 'yes', + # monstatsd config + monstatsd_defaults = { + 'monstatsd_port': 8125, + 'monstatsd_interval': monstatsd_interval, + 'monstatsd_agregator_bucket_size': monstatsd_agregator_bucket_size, + 'monstatsd_normalize': 'yes', } - for key, value in dogstatsd_defaults.iteritems(): + for key, value in monstatsd_defaults.iteritems(): if config.has_option('Main', key): agent_config[key] = config.get('Main', key) else: @@ -274,7 +274,7 @@ def get_config(parse_args=True, cfg_path=None, options=None): agent_config['statsd_forward_port'] = int(config.get('Main', 'statsd_forward_port')) # normalize 'yes'/'no' to boolean - dogstatsd_defaults['dogstatsd_normalize'] = _is_affirmative(dogstatsd_defaults['dogstatsd_normalize']) + monstatsd_defaults['monstatsd_normalize'] = _is_affirmative(monstatsd_defaults['monstatsd_normalize']) # Optional config # FIXME not the prettiest code ever... @@ -365,7 +365,7 @@ def set_win32_cert_path(): def get_proxy(agent_config, use_system_settings=False): proxy_settings = {} - # First we read the proxy configuration from datadog.conf + # First we read the proxy configuration from agent.conf proxy_host = agent_config.get('proxy_host', None) if proxy_host is not None and not use_system_settings: proxy_settings['host'] = proxy_host @@ -381,7 +381,7 @@ def get_proxy(agent_config, use_system_settings=False): log.debug("Proxy Settings: %s:%s@%s:%s" % (proxy_settings['user'], "*****", proxy_settings['host'], proxy_settings['port'])) return proxy_settings - # If no proxy configuration was specified in datadog.conf + # If no proxy configuration was specified in agent.conf # We try to read it from the system settings try: import urllib @@ -582,7 +582,7 @@ def load_check_directory(agent_config): if not check_config: continue d = [ - "Configuring %s in datadog.conf is deprecated." % (check_name), + "Configuring %s in agent.conf is deprecated." % (check_name), "Please use conf.d. In a future release, support for the", "old style of configuration will be dropped.", ] @@ -659,10 +659,10 @@ def get_logging_config(cfg_path=None): if system_os != 'windows': logging_config = { 'log_level': None, - 'collector_log_file': '/var/log/datadog/collector.log', - 'forwarder_log_file': '/var/log/datadog/forwarder.log', - 'dogstatsd_log_file': '/var/log/datadog/dogstatsd.log', - 'jmxfetch_log_file': '/var/log/datadog/jmxfetch.log', + 'collector_log_file': '/var/log/mon-agent/collector.log', + 'forwarder_log_file': '/var/log/mon-agent/forwarder.log', + 'monstatsd_log_file': '/var/log/mon-agent/monstatsd.log', + 'jmxfetch_log_file': '/var/log/mon-agent/jmxfetch.log', 'log_to_event_viewer': False, 'log_to_syslog': True, 'syslog_host': None, @@ -692,7 +692,7 @@ def get_logging_config(cfg_path=None): config_example_file = "https://github.com/DataDog/dd-agent/blob/master/datadog.conf.example" sys.stderr.write("""Python logging config is no longer supported and will be ignored. - To configure logging, update the logging portion of 'datadog.conf' to match: + To configure logging, update the logging portion of 'agent.conf' to match: '%s'. """ % config_example_file) @@ -740,7 +740,6 @@ def get_logging_config(cfg_path=None): return logging_config - def initialize_logging(logger_name): global windows_file_handler_added try: diff --git a/monagent/monstatsd/__init__.py b/monagent/monstatsd/__init__.py old mode 100644 new mode 100755 index 69bb1c24..63f44342 --- a/monagent/monstatsd/__init__.py +++ b/monagent/monstatsd/__init__.py @@ -1 +1,412 @@ -__author__ = 'kuhlmant' +#!/usr/bin/env python +""" +A Python Statsd implementation with dimensions added +""" + +# set up logging before importing any other components +from monagent.common.config import initialize_logging +initialize_logging('monstatsd') + +import os +os.umask(022) + +# stdlib +import httplib as http_client +import json +import logging +import optparse +import re +import select +import signal +import socket +import sys +from time import time +import threading +from urllib import urlencode + +# project +from monagent.common.aggregator import MetricsBucketAggregator +from monagent.common.check_status import MonstatsdStatus +from monagent.common.config import get_config +from monagent.common.daemon import Daemon, AgentSupervisor +from monagent.common.util import PidFile, get_hostname, plural, get_uuid, chunks + +log = logging.getLogger('monstatsd') + + +WATCHDOG_TIMEOUT = 120 +UDP_SOCKET_TIMEOUT = 5 +# Since we call flush more often than the metrics aggregation interval, we should +# log a bunch of flushes in a row every so often. +FLUSH_LOGGING_PERIOD = 70 +FLUSH_LOGGING_INITIAL = 10 +FLUSH_LOGGING_COUNT = 5 +EVENT_CHUNK_SIZE = 50 + + +def serialize_metrics(metrics): + return json.dumps({"series": metrics}) + + +def serialize_event(event): + return json.dumps(event) + + +class Reporter(threading.Thread): + """ + The reporter periodically sends the aggregated metrics to the + server. + """ + + def __init__(self, interval, metrics_aggregator, api_host, use_watchdog=False, event_chunk_size=None): + threading.Thread.__init__(self) + self.interval = int(interval) + self.finished = threading.Event() + self.metrics_aggregator = metrics_aggregator + self.flush_count = 0 + self.log_count = 0 + + self.watchdog = None + if use_watchdog: + from monagent.common.util import Watchdog + self.watchdog = Watchdog(WATCHDOG_TIMEOUT) + + self.api_host = api_host + self.event_chunk_size = event_chunk_size or EVENT_CHUNK_SIZE + + self.http_conn_cls = http_client.HTTPSConnection + + match = re.match('^(https?)://(.*)', api_host) + + if match: + self.api_host = match.group(2) + if match.group(1) == 'http': + self.http_conn_cls = http_client.HTTPConnection + + def stop(self): + log.info("Stopping reporter") + self.finished.set() + + def run(self): + + log.info("Reporting to %s every %ss" % (self.api_host, self.interval)) + log.debug("Watchdog enabled: %s" % bool(self.watchdog)) + + # Persist a start-up message. + MonstatsdStatus().persist() + + while not self.finished.isSet(): # Use camel case isSet for 2.4 support. + self.finished.wait(self.interval) + self.flush() + if self.watchdog: + self.watchdog.reset() + + # Clean up the status messages. + log.debug("Stopped reporter") + MonstatsdStatus.remove_latest_status() + + def flush(self): + try: + self.flush_count += 1 + self.log_count += 1 + packets_per_second = self.metrics_aggregator.packets_per_second(self.interval) + packet_count = self.metrics_aggregator.total_count + + metrics = self.metrics_aggregator.flush() + count = len(metrics) + if self.flush_count % FLUSH_LOGGING_PERIOD == 0: + self.log_count = 0 + if count: + self.submit(metrics) + + events = self.metrics_aggregator.flush_events() + event_count = len(events) + if event_count: + self.submit_events(events) + + should_log = self.flush_count <= FLUSH_LOGGING_INITIAL or self.log_count <= FLUSH_LOGGING_COUNT + log_func = log.info + if not should_log: + log_func = log.debug + log_func("Flush #%s: flushed %s metric%s and %s event%s" % (self.flush_count, count, plural(count), event_count, plural(event_count))) + if self.flush_count == FLUSH_LOGGING_INITIAL: + log.info("First flushes done, %s flushes will be logged every %s flushes." % (FLUSH_LOGGING_COUNT, FLUSH_LOGGING_PERIOD)) + + # Persist a status message. + packet_count = self.metrics_aggregator.total_count + MonstatsdStatus( + flush_count=self.flush_count, + packet_count=packet_count, + packets_per_second=packets_per_second, + metric_count=count, + event_count=event_count, + ).persist() + + except Exception, e: + log.exception("Error flushing metrics") + + def submit(self, metrics): + # Copy and pasted from dogapi, because it's a bit of a pain to distribute python + # dependencies with the agent. + body = serialize_metrics(metrics) + headers = {'Content-Type':'application/json'} + method = 'POST' + + params = {} + url = '/api/v1/series?%s' % urlencode(params) + + start_time = time() + status = None + conn = self.http_conn_cls(self.api_host) + try: + conn.request(method, url, body, headers) + + #FIXME: add timeout handling code here + + response = conn.getresponse() + status = response.status + response.close() + finally: + conn.close() + duration = round((time() - start_time) * 1000.0, 4) + log.debug("%s %s %s%s (%sms)" % ( + status, method, self.api_host, url, duration)) + return duration + + def submit_events(self, events): + headers = {'Content-Type':'application/json'} + method = 'POST' + + events_len = len(events) + event_chunk_size = self.event_chunk_size + + for chunk in chunks(events, event_chunk_size): + payload = { + 'events': { + 'api': chunk + }, + 'uuid': get_uuid(), + 'hostname': get_hostname() + } + params = {} + url = '/intake?%s' % urlencode(params) + + status = None + conn = self.http_conn_cls(self.api_host) + try: + start_time = time() + conn.request(method, url, json.dumps(payload), headers) + + response = conn.getresponse() + status = response.status + response.close() + duration = round((time() - start_time) * 1000.0, 4) + log.debug("%s %s %s%s (%sms)" % ( + status, method, self.api_host, url, duration)) + + finally: + conn.close() + + +class Server(object): + """ + A statsd udp server. + """ + + def __init__(self, metrics_aggregator, host, port, forward_to_host=None, forward_to_port=None): + self.host = host + self.port = int(port) + self.address = (self.host, self.port) + self.metrics_aggregator = metrics_aggregator + self.buffer_size = 1024 * 8 + + self.running = False + + self.should_forward = forward_to_host is not None + + self.forward_udp_sock = None + # In case we want to forward every packet received to another statsd server + if self.should_forward: + if forward_to_port is None: + forward_to_port = 8125 + + log.info("External statsd forwarding enabled. All packets received will be forwarded to %s:%s" % (forward_to_host, forward_to_port)) + try: + self.forward_udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.forward_udp_sock.connect((forward_to_host, forward_to_port)) + except Exception, e: + log.exception("Error while setting up connection to external statsd server") + + def start(self): + """ Run the server. """ + # Bind to the UDP socket. + # IPv4 only + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.socket.setblocking(0) + try: + self.socket.bind(self.address) + except socket.gaierror: + if self.address[0] == 'localhost': + log.warning("Warning localhost seems undefined in your host file, using 127.0.0.1 instead") + self.address = ('127.0.0.1', self.address[1]) + self.socket.bind(self.address) + + log.info('Listening on host & port: %s' % str(self.address)) + + # Inline variables for quick look-up. + buffer_size = self.buffer_size + #todo dogstatsd is the only thing using this method of the aggregator, is there a more standard way to do it? + aggregator_submit = self.metrics_aggregator.submit_packets + sock = [self.socket] + socket_recv = self.socket.recv + select_select = select.select + select_error = select.error + timeout = UDP_SOCKET_TIMEOUT + should_forward = self.should_forward + forward_udp_sock = self.forward_udp_sock + + # Run our select loop. + self.running = True + while self.running: + try: + ready = select_select(sock, [], [], timeout) + if ready[0]: + message = socket_recv(buffer_size) + aggregator_submit(message) + + if should_forward: + forward_udp_sock.send(message) + except select_error, se: + # Ignore interrupted system calls from sigterm. + errno = se[0] + if errno != 4: + raise + except (KeyboardInterrupt, SystemExit): + break + except Exception, e: + log.exception('Error receiving datagram') + + def stop(self): + self.running = False + + +class Monstatsd(Daemon): + """ This class is the monstatsd daemon. """ + + def __init__(self, pid_file, server, reporter, autorestart): + Daemon.__init__(self, pid_file, autorestart=autorestart) + self.server = server + self.reporter = reporter + + + def _handle_sigterm(self, signum, frame): + log.debug("Caught sigterm. Stopping run loop.") + self.server.stop() + + def run(self): + # Gracefully exit on sigterm. + signal.signal(signal.SIGTERM, self._handle_sigterm) + + # Handle Keyboard Interrupt + signal.signal(signal.SIGINT, self._handle_sigterm) + + # Start the reporting thread before accepting data + self.reporter.start() + + try: + try: + self.server.start() + except Exception, e: + log.exception('Error starting server') + raise e + finally: + # The server will block until it's done. Once we're here, shutdown + # the reporting thread. + self.reporter.stop() + self.reporter.join() + log.info("Monstatsd is stopped") + # Restart if asked to restart + if self.autorestart: + sys.exit(AgentSupervisor.RESTART_EXIT_STATUS) + + def info(self): + logging.getLogger().setLevel(logging.ERROR) + return MonstatsdStatus.print_latest_status() + + +def init(config_path=None, use_watchdog=False): + """Configure the server and the reporting thread. + """ + c = get_config(parse_args=False, cfg_path=config_path) + log.debug("Configuration monstatsd") + + port = c['monstatsd_port'] + interval = int(c['monstatsd_interval']) + aggregator_interval = int(c['monstatsd_agregator_bucket_size']) + non_local_traffic = c['non_local_traffic'] + forward_to_host = c.get('statsd_forward_host') + forward_to_port = c.get('statsd_forward_port') + event_chunk_size = c.get('event_chunk_size') + + target = c['forwarder_url'] + + hostname = get_hostname(c) + + # Create the aggregator (which is the point of communication between the + # server and reporting threads. + assert 0 < interval + + aggregator = MetricsBucketAggregator(hostname, aggregator_interval, recent_point_threshold=c.get('recent_point_threshold', None)) + + # Start the reporting thread. + reporter = Reporter(interval, aggregator, target, use_watchdog, event_chunk_size) + + # Start the server on an IPv4 stack + # Default to loopback + server_host = 'localhost' + # If specified, bind to all addressses + if non_local_traffic: + server_host = '' + + server = Server(aggregator, server_host, port, forward_to_host=forward_to_host, forward_to_port=forward_to_port) + + return reporter, server, c + +def main(config_path=None): + """ The main entry point for the unix version of monstatsd. """ + parser = optparse.OptionParser("%prog [start|stop|restart|status]") + opts, args = parser.parse_args() + + reporter, server, cnf = init(config_path, use_watchdog=True) + pid_file = PidFile('monstatsd') + daemon = Monstatsd(pid_file.get_path(), server, reporter, + cnf.get('autorestart', False)) + + # If no args were passed in, run the server in the foreground. + if not args: + daemon.run() + return 0 + + # Otherwise, we're process the deamon command. + else: + command = args[0] + + if command == 'start': + daemon.start() + elif command == 'stop': + daemon.stop() + elif command == 'restart': + daemon.restart() + elif command == 'status': + daemon.status() + elif command == 'info': + return daemon.info() + else: + sys.stderr.write("Unknown command: %s\n\n" % command) + parser.print_help() + return 1 + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/monagent/monstatsd/dogstatsd.py b/monagent/monstatsd/dogstatsd.py deleted file mode 100755 index a827d8f2..00000000 --- a/monagent/monstatsd/dogstatsd.py +++ /dev/null @@ -1,412 +0,0 @@ -#!/usr/bin/env python -""" -A Python Statsd implementation with some datadog special sauce. -""" - -# set up logging before importing any other components -from monagent.common.config import initialize_logging -initialize_logging('dogstatsd') - -import os -os.umask(022) - -# stdlib -import httplib as http_client -import json -import logging -import optparse -import re -import select -import signal -import socket -import sys -from time import time -import threading -from urllib import urlencode - -# project -from monagent.common.aggregator import MetricsBucketAggregator -from common.check_status import DogstatsdStatus -from monagent.common.config import get_config -from common.daemon import Daemon, AgentSupervisor -from common.util import PidFile, get_hostname, plural, get_uuid, chunks - -log = logging.getLogger('dogstatsd') - - -WATCHDOG_TIMEOUT = 120 -UDP_SOCKET_TIMEOUT = 5 -# Since we call flush more often than the metrics aggregation interval, we should -# log a bunch of flushes in a row every so often. -FLUSH_LOGGING_PERIOD = 70 -FLUSH_LOGGING_INITIAL = 10 -FLUSH_LOGGING_COUNT = 5 -EVENT_CHUNK_SIZE = 50 - - -def serialize_metrics(metrics): - return json.dumps({"series": metrics}) - - -def serialize_event(event): - return json.dumps(event) - - -class Reporter(threading.Thread): - """ - The reporter periodically sends the aggregated metrics to the - server. - """ - - def __init__(self, interval, metrics_aggregator, api_host, use_watchdog=False, event_chunk_size=None): - threading.Thread.__init__(self) - self.interval = int(interval) - self.finished = threading.Event() - self.metrics_aggregator = metrics_aggregator - self.flush_count = 0 - self.log_count = 0 - - self.watchdog = None - if use_watchdog: - from common.util import Watchdog - self.watchdog = Watchdog(WATCHDOG_TIMEOUT) - - self.api_host = api_host - self.event_chunk_size = event_chunk_size or EVENT_CHUNK_SIZE - - self.http_conn_cls = http_client.HTTPSConnection - - match = re.match('^(https?)://(.*)', api_host) - - if match: - self.api_host = match.group(2) - if match.group(1) == 'http': - self.http_conn_cls = http_client.HTTPConnection - - def stop(self): - log.info("Stopping reporter") - self.finished.set() - - def run(self): - - log.info("Reporting to %s every %ss" % (self.api_host, self.interval)) - log.debug("Watchdog enabled: %s" % bool(self.watchdog)) - - # Persist a start-up message. - DogstatsdStatus().persist() - - while not self.finished.isSet(): # Use camel case isSet for 2.4 support. - self.finished.wait(self.interval) - self.flush() - if self.watchdog: - self.watchdog.reset() - - # Clean up the status messages. - log.debug("Stopped reporter") - DogstatsdStatus.remove_latest_status() - - def flush(self): - try: - self.flush_count += 1 - self.log_count += 1 - packets_per_second = self.metrics_aggregator.packets_per_second(self.interval) - packet_count = self.metrics_aggregator.total_count - - metrics = self.metrics_aggregator.flush() - count = len(metrics) - if self.flush_count % FLUSH_LOGGING_PERIOD == 0: - self.log_count = 0 - if count: - self.submit(metrics) - - events = self.metrics_aggregator.flush_events() - event_count = len(events) - if event_count: - self.submit_events(events) - - should_log = self.flush_count <= FLUSH_LOGGING_INITIAL or self.log_count <= FLUSH_LOGGING_COUNT - log_func = log.info - if not should_log: - log_func = log.debug - log_func("Flush #%s: flushed %s metric%s and %s event%s" % (self.flush_count, count, plural(count), event_count, plural(event_count))) - if self.flush_count == FLUSH_LOGGING_INITIAL: - log.info("First flushes done, %s flushes will be logged every %s flushes." % (FLUSH_LOGGING_COUNT, FLUSH_LOGGING_PERIOD)) - - # Persist a status message. - packet_count = self.metrics_aggregator.total_count - DogstatsdStatus( - flush_count=self.flush_count, - packet_count=packet_count, - packets_per_second=packets_per_second, - metric_count=count, - event_count=event_count, - ).persist() - - except Exception, e: - log.exception("Error flushing metrics") - - def submit(self, metrics): - # Copy and pasted from dogapi, because it's a bit of a pain to distribute python - # dependencies with the agent. - body = serialize_metrics(metrics) - headers = {'Content-Type':'application/json'} - method = 'POST' - - params = {} - url = '/api/v1/series?%s' % urlencode(params) - - start_time = time() - status = None - conn = self.http_conn_cls(self.api_host) - try: - conn.request(method, url, body, headers) - - #FIXME: add timeout handling code here - - response = conn.getresponse() - status = response.status - response.close() - finally: - conn.close() - duration = round((time() - start_time) * 1000.0, 4) - log.debug("%s %s %s%s (%sms)" % ( - status, method, self.api_host, url, duration)) - return duration - - def submit_events(self, events): - headers = {'Content-Type':'application/json'} - method = 'POST' - - events_len = len(events) - event_chunk_size = self.event_chunk_size - - for chunk in chunks(events, event_chunk_size): - payload = { - 'events': { - 'api': chunk - }, - 'uuid': get_uuid(), - 'hostname': get_hostname() - } - params = {} - url = '/intake?%s' % urlencode(params) - - status = None - conn = self.http_conn_cls(self.api_host) - try: - start_time = time() - conn.request(method, url, json.dumps(payload), headers) - - response = conn.getresponse() - status = response.status - response.close() - duration = round((time() - start_time) * 1000.0, 4) - log.debug("%s %s %s%s (%sms)" % ( - status, method, self.api_host, url, duration)) - - finally: - conn.close() - - -class Server(object): - """ - A statsd udp server. - """ - - def __init__(self, metrics_aggregator, host, port, forward_to_host=None, forward_to_port=None): - self.host = host - self.port = int(port) - self.address = (self.host, self.port) - self.metrics_aggregator = metrics_aggregator - self.buffer_size = 1024 * 8 - - self.running = False - - self.should_forward = forward_to_host is not None - - self.forward_udp_sock = None - # In case we want to forward every packet received to another statsd server - if self.should_forward: - if forward_to_port is None: - forward_to_port = 8125 - - log.info("External statsd forwarding enabled. All packets received will be forwarded to %s:%s" % (forward_to_host, forward_to_port)) - try: - self.forward_udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.forward_udp_sock.connect((forward_to_host, forward_to_port)) - except Exception, e: - log.exception("Error while setting up connection to external statsd server") - - def start(self): - """ Run the server. """ - # Bind to the UDP socket. - # IPv4 only - self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.socket.setblocking(0) - try: - self.socket.bind(self.address) - except socket.gaierror: - if self.address[0] == 'localhost': - log.warning("Warning localhost seems undefined in your host file, using 127.0.0.1 instead") - self.address = ('127.0.0.1', self.address[1]) - self.socket.bind(self.address) - - log.info('Listening on host & port: %s' % str(self.address)) - - # Inline variables for quick look-up. - buffer_size = self.buffer_size - #todo dogstatsd is the only thing using this method of the aggregator, is there a more standard way to do it? - aggregator_submit = self.metrics_aggregator.submit_packets - sock = [self.socket] - socket_recv = self.socket.recv - select_select = select.select - select_error = select.error - timeout = UDP_SOCKET_TIMEOUT - should_forward = self.should_forward - forward_udp_sock = self.forward_udp_sock - - # Run our select loop. - self.running = True - while self.running: - try: - ready = select_select(sock, [], [], timeout) - if ready[0]: - message = socket_recv(buffer_size) - aggregator_submit(message) - - if should_forward: - forward_udp_sock.send(message) - except select_error, se: - # Ignore interrupted system calls from sigterm. - errno = se[0] - if errno != 4: - raise - except (KeyboardInterrupt, SystemExit): - break - except Exception, e: - log.exception('Error receiving datagram') - - def stop(self): - self.running = False - - -class Dogstatsd(Daemon): - """ This class is the dogstatsd daemon. """ - - def __init__(self, pid_file, server, reporter, autorestart): - Daemon.__init__(self, pid_file, autorestart=autorestart) - self.server = server - self.reporter = reporter - - - def _handle_sigterm(self, signum, frame): - log.debug("Caught sigterm. Stopping run loop.") - self.server.stop() - - def run(self): - # Gracefully exit on sigterm. - signal.signal(signal.SIGTERM, self._handle_sigterm) - - # Handle Keyboard Interrupt - signal.signal(signal.SIGINT, self._handle_sigterm) - - # Start the reporting thread before accepting data - self.reporter.start() - - try: - try: - self.server.start() - except Exception, e: - log.exception('Error starting server') - raise e - finally: - # The server will block until it's done. Once we're here, shutdown - # the reporting thread. - self.reporter.stop() - self.reporter.join() - log.info("Dogstatsd is stopped") - # Restart if asked to restart - if self.autorestart: - sys.exit(AgentSupervisor.RESTART_EXIT_STATUS) - - def info(self): - logging.getLogger().setLevel(logging.ERROR) - return DogstatsdStatus.print_latest_status() - - -def init(config_path=None, use_watchdog=False): - """Configure the server and the reporting thread. - """ - c = get_config(parse_args=False, cfg_path=config_path) - log.debug("Configuration dogstatsd") - - port = c['dogstatsd_port'] - interval = int(c['dogstatsd_interval']) - aggregator_interval = int(c['dogstatsd_agregator_bucket_size']) - non_local_traffic = c['non_local_traffic'] - forward_to_host = c.get('statsd_forward_host') - forward_to_port = c.get('statsd_forward_port') - event_chunk_size = c.get('event_chunk_size') - - target = c['forwarder_url'] - - hostname = get_hostname(c) - - # Create the aggregator (which is the point of communication between the - # server and reporting threads. - assert 0 < interval - - aggregator = MetricsBucketAggregator(hostname, aggregator_interval, recent_point_threshold=c.get('recent_point_threshold', None)) - - # Start the reporting thread. - reporter = Reporter(interval, aggregator, target, use_watchdog, event_chunk_size) - - # Start the server on an IPv4 stack - # Default to loopback - server_host = 'localhost' - # If specified, bind to all addressses - if non_local_traffic: - server_host = '' - - server = Server(aggregator, server_host, port, forward_to_host=forward_to_host, forward_to_port=forward_to_port) - - return reporter, server, c - -def main(config_path=None): - """ The main entry point for the unix version of dogstatsd. """ - parser = optparse.OptionParser("%prog [start|stop|restart|status]") - opts, args = parser.parse_args() - - reporter, server, cnf = init(config_path, use_watchdog=True) - pid_file = PidFile('dogstatsd') - daemon = Dogstatsd(pid_file.get_path(), server, reporter, - cnf.get('autorestart', False)) - - # If no args were passed in, run the server in the foreground. - if not args: - daemon.run() - return 0 - - # Otherwise, we're process the deamon command. - else: - command = args[0] - - if command == 'start': - daemon.start() - elif command == 'stop': - daemon.stop() - elif command == 'restart': - daemon.restart() - elif command == 'status': - daemon.status() - elif command == 'info': - return daemon.info() - else: - sys.stderr.write("Unknown command: %s\n\n" % command) - parser.print_help() - return 1 - return 0 - - -if __name__ == '__main__': - sys.exit(main()) diff --git a/monagent/win32/agent.py b/monagent/win32/agent.py index 0efa983b..42813db9 100644 --- a/monagent/win32/agent.py +++ b/monagent/win32/agent.py @@ -174,13 +174,13 @@ class DogstatsdProcess(multiprocessing.Process): self.is_enabled = True def run(self): - log.debug("Windows Service - Starting Dogstatsd server") + log.debug("Windows Service - Starting Monstatsd server") self.reporter, self.server, _ = dogstatsd.init() self.reporter.start() self.server.start() def stop(self): - log.debug("Windows Service - Stopping Dogstatsd server") + log.debug("Windows Service - Stopping Monstatsd server") self.server.stop() self.reporter.stop() self.reporter.join() diff --git a/packaging/Makefile b/packaging/Makefile index 65c81d3a..6c6bd91d 100644 --- a/packaging/Makefile +++ b/packaging/Makefile @@ -53,10 +53,10 @@ install_full: source # Install the source to usr/share cp -r $(ROOT)/* $(BUILD)/usr/share/mon/agent/ # Install the common executables. - ln -sf ../share/mon/agent/monagent/monstatsd/dogstatsd.py $(BUILD)/usr/bin/dogstatsd + ln -sf ../share/mon/agent/monagent/monstatsd/__init__.py $(BUILD)/usr/bin/monstatsd ln -sf ../share/mon/agent/monagent/forwarder/__init__.py $(BUILD)/usr/bin/mon-forwarder ln -sf ../share/mon/agent/monagent/collector/daemon.py $(BUILD)/usr/bin/mon-collector - chmod 755 $(BUILD)/usr/bin/dogstatsd + chmod 755 $(BUILD)/usr/bin/monstatsd chmod 755 $(BUILD)/usr/bin/mon-forwarder chmod 755 $(BUILD)/usr/bin/mon-collector diff --git a/packaging/mon-agent-deb/mon-agent.init b/packaging/mon-agent-deb/mon-agent.init index 7ff40857..66df14b4 100755 --- a/packaging/mon-agent-deb/mon-agent.init +++ b/packaging/mon-agent-deb/mon-agent.init @@ -16,7 +16,7 @@ export PYTHONPATH=$PYTHONPATH:/usr/share/mon/agent/ AGENTPATH="/usr/bin/mon-collector" AGENTCONF="/etc/mon-agent/agent.conf" -DOGSTATSDPATH="/usr/bin/dogstatsd" +MONSTATSDPATH="/usr/bin/monstatsd" AGENTUSER="mon-agent" FORWARDERPATH="/usr/bin/mon-forwarder" NAME="mon-agent" @@ -125,11 +125,11 @@ case "$1" in # (right now only mon-agent supports additional flags) su $AGENTUSER -c "$AGENTPATH info $@" COLLECTOR_RETURN=$? - su $AGENTUSER -c "$DOGSTATSDPATH info" - DOGSTATSD_RETURN=$? + su $AGENTUSER -c "$MONSTATSDPATH info" + MONSTATSD_RETURN=$? su $AGENTUSER -c "$FORWARDERPATH info" FORWARDER_RETURN=$? - exit $(($COLLECTOR_RETURN+$DOGSTATSD_RETURN+$FORWARDER_RETURN)) + exit $(($COLLECTOR_RETURN+$MONSTATSD_RETURN+$FORWARDER_RETURN)) ;; status) diff --git a/packaging/mon-agent-deb/postinst b/packaging/mon-agent-deb/postinst index 4c4c281f..87f8f95f 100644 --- a/packaging/mon-agent-deb/postinst +++ b/packaging/mon-agent-deb/postinst @@ -11,7 +11,7 @@ case "$1" in chown -R mon-agent:root /etc/mon-agent chown -R mon-agent:root /var/log/mon-agent chown -R root:root /usr/share/mon/agent - chown -h root:root /usr/bin/dogstatsd + chown -h root:root /usr/bin/monstatsd chown -h root:root /usr/bin/mon-collector chown -h root:root /usr/bin/mon-forwarder diff --git a/packaging/mon-agent-deb/supervisor.conf b/packaging/mon-agent-deb/supervisor.conf index 993776a3..fe3ef298 100644 --- a/packaging/mon-agent-deb/supervisor.conf +++ b/packaging/mon-agent-deb/supervisor.conf @@ -35,8 +35,8 @@ startsecs=3 priority=998 user=mon-agent -[program:dogstatsd] -command=/usr/bin/dogstatsd +[program:monstatsd] +command=/usr/bin/monstatsd stdout_logfile=NONE stderr_logfile=NONE startsecs=3 @@ -44,4 +44,4 @@ priority=998 user=mon-agent [group:mon-agent] -programs=forwarder,collector,dogstatsd +programs=forwarder,collector,monstatsd diff --git a/tests/test_dogstatsd.py b/tests/test_monstatsd.py similarity index 98% rename from tests/test_dogstatsd.py rename to tests/test_monstatsd.py index e390992d..f39fbab7 100644 --- a/tests/test_dogstatsd.py +++ b/tests/test_monstatsd.py @@ -6,10 +6,10 @@ import unittest import nose.tools as nt from monagent.common.aggregator import MetricsAggregator -from monstatsd import dogstatsd +from monagent import monstatsd -class TestUnitDogStatsd(unittest.TestCase): +class TestUnitMonStatsd(unittest.TestCase): @staticmethod def sort_metrics(metrics): @@ -94,8 +94,8 @@ class TestUnitDogStatsd(unittest.TestCase): import json from monagent.common.aggregator import api_formatter - dogstatsd.json = json - serialized = dogstatsd.serialize_metrics([api_formatter("foo", 12, 1, ('tag',), 'host')]) + monstatsd.json = json + serialized = monstatsd.serialize_metrics([api_formatter("foo", 12, 1, ('tag',), 'host')]) assert '"tags": ["tag"]' in serialized def test_counter(self): @@ -364,12 +364,12 @@ class TestUnitDogStatsd(unittest.TestCase): stats = MetricsAggregator('myhost') for i in xrange(10): stats.submit_packets('metric:10|c') - stats.send_packet_count('datadog.dogstatsd.packet.count') + stats.send_packet_count('monstatsd.packet.count') metrics = self.sort_metrics(stats.flush()) nt.assert_equals(2, len(metrics)) first, second = metrics - nt.assert_equal(first['metric'], 'datadog.dogstatsd.packet.count') + nt.assert_equal(first['metric'], 'monstatsd.packet.count') nt.assert_equal(first['points'][0][1], 10) def test_histogram_counter(self):