Adding RabbitMQ detection and setup

Change-Id: I2ed2aa0f199026adb5fee672fe77fab231f2d549
This commit is contained in:
gary-hessler 2014-09-15 09:20:49 -06:00
parent 669fbc7d30
commit 70aa0d713c
8 changed files with 329 additions and 75 deletions

View File

@@ -711,7 +711,70 @@ The Kafka checks return the following metrics:
## RabbitMQ Checks
TBD
This section describes the RabbitMQ check that can be performed by the Agent. The RabbitMQ check gathers metrics on nodes, exchanges, and queues from the RabbitMQ server. The check requires a configuration file called rabbitmq.yaml to be present in the Agent's conf.d configuration directory, and that file must list the names of the exchanges and queues you are interested in monitoring.
NOTE: The Agent RabbitMQ plugin requires the RabbitMQ Management Plugin to be installed. The management plugin is included in the RabbitMQ distribution. To enable it, use the rabbitmq-plugins command:
```
rabbitmq-plugins enable rabbitmq_management
```
Sample config:
```
init_config:
instances:
- exchanges: [nova, cinder, ceilometer, glance, keystone, neutron, heat]
nodes: [rabbit@devstack]
queues: [conductor]
rabbitmq_api_url: http://localhost:15672/api
rabbitmq_user: guest
rabbitmq_pass: guest
```
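For reference, the check does nothing more exotic than read JSON from the management API over HTTP. A minimal standalone probe, sketched here under the assumption of the default guest:guest credentials and the localhost API URL from the sample above, would be:
```
import base64
import json
import urllib2

# Hedged sketch only: fetch queue statistics the same way the check does.
# The URL and the guest:guest credentials are assumptions taken from the
# sample config, not part of the agent code.
request = urllib2.Request('http://localhost:15672/api/queues')
request.add_header('Authorization',
                   'Basic %s' % base64.b64encode('guest:guest'))
for queue in json.loads(urllib2.urlopen(request).read()):
    print queue['name'], queue.get('messages', 0)
```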
If you want the monasca-setup program to detect and auto-configure the plugin for you, you must create the file /root/.rabbitmq.cnf with the information needed to build the configuration YAML before running the setup program. It should look like this:
```
[client]
user=guest
password=pass
nodes=rabbit@devstack
queues=conductor
exchanges=nova,cinder,ceilometer,glance,keystone,neutron,heat
```
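The file is INI-style, so you can sanity-check it with Python's ConfigParser before running the setup program; note that the setup plugin's own parser expects no spaces around the equals signs. This is an illustration only, not how monasca-setup itself reads the file:
```
import ConfigParser  # Python 2 module name

# Hedged sketch: read back the same fields monasca-setup looks for.
parser = ConfigParser.RawConfigParser()
parser.read('/root/.rabbitmq.cnf')
user = parser.get('client', 'user')
password = parser.get('client', 'password')
queues = [q.strip() for q in parser.get('client', 'queues').split(',')]
exchanges = [e.strip() for e in parser.get('client', 'exchanges').split(',')]
nodes = [n.strip() for n in parser.get('client', 'nodes').split(',')]
print user, queues, exchanges, nodes
```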
The RabbitMQ checks return the following metrics:
| Metric Name | Dimensions | Check Type |
| ----------- | ---------- | --------- |
| rabbitmq.node.fd_used | hostname, node, service=rabbitmq | Node |
| rabbitmq.node.sockets_used | hostname, node, service=rabbitmq | Node |
| rabbitmq.node.run_queue | hostname, node, service=rabbitmq | Node |
| rabbitmq.node.mem_used | hostname, node, service=rabbitmq | Node |
| rabbitmq.exchange.messages.received_count | hostname, exchange, vhost, type, service=rabbitmq | Exchange |
| rabbitmq.exchange.messages.received_rate | hostname, exchange, vhost, type, service=rabbitmq | Exchange |
| rabbitmq.exchange.messages.published_count | hostname, exchange, vhost, type, service=rabbitmq | Exchange |
| rabbitmq.exchange.messages.published_rate | hostname, exchange, vhost, type, service=rabbitmq | Exchange |
| rabbitmq.queue.consumers | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.memory | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.active_consumers | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.rate | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.ready | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.ready_rate | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.publish_count | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.publish_rate | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.deliver_count | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.deliver_rate | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.redeliver_count | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.redeliver_rate | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.unacknowledged | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.unacknowledged_rate | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.deliver_get_count | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.deliver_get_rate | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.ack_count | hostname, queue, vhost, service=rabbitmq | Queue |
| rabbitmq.queue.messages.ack_rate | hostname, queue, vhost, service=rabbitmq | Queue |
## OpenStack Monitoring
The `monasca-setup` script, when run on a system that is running OpenStack services, configures the Agent to send the following list of metrics:
@@ -880,7 +943,7 @@ The following ceilometer processes are monitored, if they exist when the monasca
# Statsd
The Monasca Agent ships with a Statsd daemon implementation called monasca-statsd. A statsd client can be used to send metrics to the Forwarder via the Statsd daemon.
monascastatsd will accept metrics submitted by functions in either the standard statsd Python client library, or the monasca-agent's [python-monasca-statsd Python client library](https://github.com/hpcloud-mon/python-monascastatsd). The advantage of using the python-monasca-statsd library is that it is possible to specify dimensions on submitted metrics. Dimensions are not handled by the standard statsd client.
monascastatsd will accept metrics submitted by functions in either the standard statsd Python client library, or the monasca-agent's [monasca-statsd Python client library](https://github.com/stackforge/monasca-statsd). The advantage of using the python-monasca-statsd library is that it is possible to specify dimensions on submitted metrics. Dimensions are not handled by the standard statsd client.
Statsd metrics are not bundled along with the metrics gathered by the Collector, but are flushed to the agent Forwarder on a separate schedule (every 10 seconds by default, rather than 15 seconds for Collector metrics).
@@ -894,9 +957,9 @@ statsd.timing('pipeline', 2468.34) # Pipeline took 2468.34 ms to execute
statsd.gauge('gaugething', 3.14159265) # 'gauge' would be the preferred metric type for Monitoring
```
The [python-monasca-statsd](https://github.com/hpcloud-mon/python-monascastatsd) library provides a Python-based implementation of a statsd client but also adds the ability to attach dimensions to the statsd metrics for the client.
The [monasca-statsd](https://github.com/stackforge/monasca-statsd) library provides a Python-based implementation of a statsd client but also adds the ability to attach dimensions to the statsd metrics for the client.
Here are some examples of how code can be instrumented using calls to python-monascastatsd.
Here are some examples of how code can be instrumented using calls to monasca-statsd.
```
* Import the module once it's installed.
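# What follows is a hedged sketch; the exact monascastatsd class and
# method names are assumptions based on the library's README, not part
# of this commit.
import monascastatsd

conn = monascastatsd.Connection(host='localhost', port=8125)
client = monascastatsd.Client(name='example', connection=conn)

# Unlike plain statsd, dimensions can be attached per metric.
counter = client.get_counter('page.views')
counter.increment(dimensions={'service': 'web', 'path': '/checkout'})
```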

View File

@@ -1,7 +1,24 @@
init_config:
instances:
# for every instance a 'rabbitmq_api_url' must be provided, pointing to the api
# NOTE: The rabbitmq management plugin must be enabled for this check to work!
# To enable the management plugin:
# sudo rabbitmq-plugins enable rabbitmq_management
# OUTPUT:
# The following plugins have been enabled:
# mochiweb
# webmachine
# rabbitmq_web_dispatch
# amqp_client
# rabbitmq_management_agent
# rabbitmq_management
# Plugin configuration has changed. Restart RabbitMQ for changes to take effect.
# To restart the rabbitmq-server
# sudo service rabbitmq-server restart
# OUTPUT:
# * Restarting message broker rabbitmq-server
#
# For every instance a 'rabbitmq_api_url' must be provided, pointing to the api
# url of the RabbitMQ Management Plugin (http://www.rabbitmq.com/management.html)
# optional: 'rabbitmq_user' (default: guest) and 'rabbitmq_pass' (default: guest)
#

View File

@@ -164,7 +164,7 @@ class Collector(object):
metrics_list.append(monagent.common.metrics.Measurement(name,
timestamp,
value,
{'service': 'monasca', 'component': 'collector'}))
{'component': 'collector'}))
emitter_statuses = self._emit(metrics_list)
self.emit_duration = timer.step()

View File

@@ -8,35 +8,71 @@ from monagent.collector.checks import AgentCheck
EVENT_TYPE = SOURCE_TYPE_NAME = 'rabbitmq'
QUEUE_TYPE = 'queues'
EXCHANGE_TYPE = 'exchanges'
NODE_TYPE = 'nodes'
MAX_DETAILED_QUEUES = 200
MAX_DETAILED_NODES = 100
MAX_DETAILED_QUEUES = 150
MAX_DETAILED_EXCHANGES = 100
MAX_DETAILED_NODES = 50
# Post an event in the stream when the number of queues or nodes to
# collect is above 90% of the limit
ALERT_THRESHOLD = 0.9
QUEUE_ATTRIBUTES = ['active_consumers',
                    'consumers',
                    'memory',
                    'messages',
                    'messages_ready',
                    'messages_unacknowledged']
NODE_ATTRIBUTES = ['fd_used',
                   'mem_used',
                   'run_queue',
                   'sockets_used']
ATTRIBUTES = {QUEUE_TYPE: QUEUE_ATTRIBUTES, NODE_TYPE: NODE_ATTRIBUTES}
QUEUE_ATTRIBUTES = [
    # Path, Name
    ('active_consumers', 'active_consumers'),
    ('consumers', 'consumers'),
    ('memory', 'memory'),
    ('messages', 'messages'),
    ('messages_details/rate', 'messages.rate'),
    ('messages_ready', 'messages.ready'),
    ('messages_ready_details/rate', 'messages.ready_rate'),
    ('messages_unacknowledged', 'messages.unacknowledged'),
    ('messages_unacknowledged_details/rate', 'messages.unacknowledged_rate'),
    ('message_stats/ack', 'messages.ack_count'),
    ('message_stats/ack_details/rate', 'messages.ack_rate'),
    ('message_stats/deliver', 'messages.deliver_count'),
    ('message_stats/deliver_details/rate', 'messages.deliver_rate'),
    ('message_stats/deliver_get', 'messages.deliver_get_count'),
    ('message_stats/deliver_get_details/rate', 'messages.deliver_get_rate'),
    ('message_stats/publish', 'messages.publish_count'),
    ('message_stats/publish_details/rate', 'messages.publish_rate'),
    ('message_stats/redeliver', 'messages.redeliver_count'),
    ('message_stats/redeliver_details/rate', 'messages.redeliver_rate')]
EXCHANGE_ATTRIBUTES = [('message_stats/publish_out', 'messages.published_count'),
                       ('message_stats/publish_out_details/rate', 'messages.published_rate'),
                       ('message_stats/publish_in', 'messages.received_count'),
                       ('message_stats/publish_in_details/rate', 'messages.received_rate')]
NODE_ATTRIBUTES = [
    ('fd_used', 'fd_used'),
    ('mem_used', 'mem_used'),
    ('run_queue', 'run_queue'),
    ('sockets_used', 'sockets_used')]
ATTRIBUTES = {QUEUE_TYPE: QUEUE_ATTRIBUTES,
              EXCHANGE_TYPE: EXCHANGE_ATTRIBUTES,
              NODE_TYPE: NODE_ATTRIBUTES}
DIMENSIONS_MAP = {
    QUEUE_TYPE: {'node': 'node',
                 'name': 'queue',
                 'vhost': 'vhost',
                 'policy': 'policy'},
    NODE_TYPE: {'name': 'node'}
    'queues': {'name': 'queue',
               'vhost': 'vhost',
               'policy': 'policy'},
    'exchanges': {'name': 'exchange',
                  'vhost': 'vhost',
                  'type': 'type'},
    'nodes': {'name': 'node'}
}
METRIC_SUFFIX = {QUEUE_TYPE: "queue", NODE_TYPE: "node"}
METRIC_SUFFIX = {QUEUE_TYPE: "queue", EXCHANGE_TYPE: "exchange", NODE_TYPE: "node"}
class RabbitMQ(AgentCheck):
@@ -62,17 +98,20 @@ class RabbitMQ(AgentCheck):
base_url += '/'
username = instance.get('rabbitmq_user', 'guest')
password = instance.get('rabbitmq_pass', 'guest')
dimensions = instance.get('dimensions', {})
# Limit of queues/nodes to collect metrics from
max_detailed = {
QUEUE_TYPE: int(instance.get('max_detailed_queues', MAX_DETAILED_QUEUES)),
EXCHANGE_TYPE: int(instance.get('max_detailed_exchanges', MAX_DETAILED_EXCHANGES)),
NODE_TYPE: int(instance.get('max_detailed_nodes', MAX_DETAILED_NODES)),
}
# List of queues/nodes to collect metrics from
specified = {
QUEUE_TYPE: instance.get('queues', []),
NODE_TYPE: instance.get('nodes', []),
EXCHANGE_TYPE: instance.get('exchanges', []),
NODE_TYPE: instance.get('nodes', [])
}
for object_type, specified_objects in specified.iteritems():
@@ -86,13 +125,29 @@ class RabbitMQ(AgentCheck):
opener = urllib2.build_opener(auth_handler)
urllib2.install_opener(opener)
return base_url, max_detailed, specified
return base_url, max_detailed, specified, dimensions
def check(self, instance):
base_url, max_detailed, specified = self._get_config(instance)
self.get_stats(
instance, base_url, QUEUE_TYPE, max_detailed[QUEUE_TYPE], specified[QUEUE_TYPE])
self.get_stats(instance, base_url, NODE_TYPE, max_detailed[NODE_TYPE], specified[NODE_TYPE])
base_url, max_detailed, specified, dimensions = self._get_config(instance)
self.get_stats(instance,
base_url,
QUEUE_TYPE,
max_detailed[QUEUE_TYPE],
list(specified[QUEUE_TYPE]),
dimensions.copy())
self.get_stats(instance,
base_url,
NODE_TYPE,
max_detailed[NODE_TYPE],
list(specified[NODE_TYPE]),
dimensions.copy())
self.get_stats(instance,
base_url,
EXCHANGE_TYPE,
max_detailed[EXCHANGE_TYPE],
specified[EXCHANGE_TYPE],
dimensions.copy())
@staticmethod
def _get_data(url):
@@ -104,15 +159,14 @@ class RabbitMQ(AgentCheck):
raise Exception('Cannot parse JSON response from API url: %s %s' % (url, str(e)))
return data
def get_stats(self, instance, base_url, object_type, max_detailed, specified_list):
def get_stats(self, instance, base_url, object_type, max_detailed, specified_list, dimensions):
"""instance: the check instance
base_url: the url of the rabbitmq management api (e.g. http://localhost:15672/api)
object_type: either QUEUE_TYPE or NODE_TYPE
object_type: either QUEUE_TYPE, EXCHANGE_TYPE or NODE_TYPE
max_detailed: the limit of objects to collect for this type
specified_list: a list of specified queues or nodes (specified in the yaml file)
"""
data = self._get_data(urlparse.urljoin(base_url, object_type))
# Make a copy of this list as we will remove items from it at each iteration
specified_items = list(specified_list)
@@ -130,25 +184,17 @@ class RabbitMQ(AgentCheck):
raise Exception("The maximum number of %s you can specify is %d." %
(object_type, max_detailed))
# a list of queues/nodes is specified. We process only those
# If a list of exchanges/queues/nodes is specified,
# we process only those.
if specified_items is not None and len(specified_items) > 0:
if object_type == NODE_TYPE:
for data_line in data:
name = data_line.get("name")
if name in specified_items:
self._get_metrics(data_line, object_type)
specified_items.remove(name)
else: # object_type == QUEUE_TYPE
for data_line in data:
name = data_line.get("name")
absolute_name = '%s/%s' % (data_line.get("vhost"), name)
if name in specified_items:
self._get_metrics(data_line, object_type)
specified_items.remove(name)
elif absolute_name in specified_items:
self._get_metrics(data_line, object_type)
specified_items.remove(absolute_name)
for data_line in data:
name = data_line.get("name")
if name not in specified_items:
if object_type == QUEUE_TYPE:
name = '%s/%s' % (data_line.get("vhost"), name)
if name in specified_items:
self._get_metrics(data_line, object_type, dimensions)
specified_items.remove(name)
# No queues/nodes are specified. We will process every queue/node if it's under the limit
else:
@@ -159,30 +205,38 @@ class RabbitMQ(AgentCheck):
if len(data) > max_detailed:
# Display a warning in the info page
self.warning(
"Too many queues to fetch. You must choose the %s you are interested in by editing the rabbitmq.yaml configuration file or get in touch with Datadog Support" %
"Too many queues to fetch. You must choose the %s you are interested in by editing the rabbitmq.yaml configuration file" %
object_type)
for data_line in data[:max_detailed]:
# We truncate the list of nodes/queues if it's above the limit
self._get_metrics(data_line, object_type)
self._get_metrics(data_line, object_type, dimensions)
def _get_metrics(self, data, object_type):
dimensions = {}
dimensions_list = DIMENSIONS_MAP[object_type]
def _get_metrics(self, data, object_type, dimensions):
dimensions_list = DIMENSIONS_MAP[object_type].copy()
for d in dimensions_list.iterkeys():
dim = data.get(d, None)
if dim is not None:
dimensions['rabbitmq_%s' % dimensions_list[d]] = dim
if dim not in [None, ""]:
dimensions[dimensions_list[d]] = dim
if not "service" in dimensions:
dimensions['service'] = 'rabbitmq'
for attribute in ATTRIBUTES[object_type]:
value = data.get(attribute, None)
if value is not None:
try:
self.gauge('rabbitmq.%s.%s' % (METRIC_SUFFIX[object_type], attribute), float(
value), dimensions=dimensions)
except ValueError:
self.log.debug("Caught ValueError for %s %s = %s with dimensions: %s" % (
METRIC_SUFFIX[object_type], attribute, value, dimensions))
for attribute, metric_name in ATTRIBUTES[object_type]:
# Walk down through the data path, e.g. foo/bar => d['foo']['bar']
root = data
keys = attribute.split('/')
for path in keys[:-1]:
root = root.get(path, {})
value = root.get(keys[-1], None)
if value is None:
value = 0.0
try:
self.log.debug("Collected data for %s: metric name: %s: value: %f dimensions: %s" % (object_type, metric_name, float(value), str(dimensions)))
self.gauge('rabbitmq.%s.%s' % (METRIC_SUFFIX[object_type], metric_name), float(value), dimensions=dimensions.copy())
except ValueError:
self.log.debug("Caught ValueError for %s %s = %s with dimensions: %s" % (
METRIC_SUFFIX[object_type], attribute, value, dimensions))
def alert(self, base_url, max_detailed, size, object_type):
key = "%s%s" % (base_url, object_type)
@@ -195,7 +249,7 @@ class RabbitMQ(AgentCheck):
title = "RabbitMQ integration is approaching the limit on the number of %s that can be collected from on %s" % (
object_type, self.hostname)
msg = """%s %s are present. The limit is %s.
Please get in touch with Datadog support to increase the limit.""" % (size, object_type, max_detailed)
Please get in touch with Monasca development to increase the limit.""" % (size, object_type, max_detailed)
event = {
"timestamp": int(time.time()),

View File

@@ -285,12 +285,14 @@ class MetricsAggregator(Aggregator):
device_name=None, timestamp=None, sample_rate=1):
# Avoid calling extra functions to dedupe dimensions if there are none
if dimensions is None:
dimensions = {}
context = (name, tuple(dimensions.items()), hostname, device_name)
new_dimensions = {}
else:
new_dimensions = dimensions.copy()
context = (name, tuple(new_dimensions.items()), hostname, device_name)
if context not in self.metrics:
metric_class = self.metric_type_to_class[mtype]
self.metrics[context] = metric_class(self.formatter, name, dimensions,
self.metrics[context] = metric_class(self.formatter, name, new_dimensions,
hostname or self.hostname, device_name)
cur_time = time()
if timestamp is not None and cur_time - int(timestamp) > self.recent_point_threshold:

View File

@@ -98,5 +98,5 @@ def dropwizard_health_check(name, url):
'instances': [{'name': name,
'url': url,
'timeout': 1,
'include_content': True}]}
'include_content': False}]}
return config

View File

@@ -0,0 +1,116 @@
import logging
import urllib2
import monsetup.agent_config
import monsetup.detection
log = logging.getLogger(__name__)
rabbit_conf = '/root/.rabbitmq.cnf'
rabbitmq_api_url = 'http://localhost:15672/api'
class RabbitMQ(monsetup.detection.Plugin):
"""Detect RabbitMQ daemons and setup configuration to monitor them.
This plugin needs user/pass info for the RabbitMQ management API; this is
best placed in /root/.rabbitmq.cnf. You can also specify the queues, exchanges
and RabbitMQ nodes that you want to monitor, in a format such as
[client]
user=guest
password=guest
nodes=rabbit@localhost, rabbit2@domain
queues=conductor
exchanges=nova, cinder, neutron
"""
def _detect(self):
"""Run detection, set self.available True if the service is detected.
"""
if monsetup.detection.find_process_cmdline('rabbitmq-server') is not None:
self.available = True
def build_config(self):
"""Build the config as a Plugins object and return.
"""
config = monsetup.agent_config.Plugins()
# First watch the process
config.merge(monsetup.detection.watch_process(['rabbitmq-server']))
log.info("\tWatching the rabbitmq-server process.")
# Attempt to query the management API, using credentials from a
# configured /root/.rabbitmq.cnf or falling back to guest:guest
if self.dependencies_installed():
log.info(
"\tUsing client credentials from {:s}".format(rabbit_conf))
# Read the rabbitmq config file to extract the needed variables.
client_section = False
rabbit_user = 'guest'
rabbit_pass = 'guest'
# Default to empty strings so a sparse config file cannot cause a
# NameError when the instance config is built below.
rabbit_exchanges = rabbit_queues = rabbit_nodes = ''
try:
    with open(rabbit_conf, "r") as conf_file:
        for row in conf_file:
            if "[client]" in row:
                client_section = True
                continue
            if client_section:
                if "user=" in row:
                    rabbit_user = row.split("=")[1].strip()
                if "password=" in row:
                    rabbit_pass = row.split("=")[1].strip()
                if "exchanges=" in row:
                    rabbit_exchanges = row.split("=")[1].strip()
                if "queues=" in row:
                    rabbit_queues = row.split("=")[1].strip()
                if "nodes=" in row:
                    rabbit_nodes = row.split("=")[1].strip()
except IOError:
    log.warn("\tI/O error reading {:s}".format(rabbit_conf))
    log.warn("\tWill try to set up the RabbitMQ plugin using default credentials guest:guest")
url = rabbitmq_api_url + '/aliveness-test/%2F'
password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None,
rabbitmq_api_url,
rabbit_user,
rabbit_pass)
handler = urllib2.HTTPBasicAuthHandler(password_mgr)
opener = urllib2.build_opener(handler)
response = None
try:
request = opener.open(url)
response = request.read()
request.close()
if '{"status":"ok"}' in response:
config['rabbitmq'] = {'init_config': None, 'instances':
[{'rabbitmq_api_url': rabbitmq_api_url,
'rabbitmq_user': rabbit_user,
'rabbitmq_pass': rabbit_pass,
'queues': [x.strip() for x in rabbit_queues.split(',')],
'exchanges': [x.strip() for x in rabbit_exchanges.split(',')],
'nodes': [x.strip() for x in rabbit_nodes.split(',')]}]}
else:
log.warn('Unable to access the RabbitMQ admin URL;' +
' the RabbitMQ plugin is not configured.' +
' Please correct and re-run monasca-setup.')
except urllib2.HTTPError, e:
log.error('Error code %s received when accessing %s' % (e.code, url) +
' RabbitMQ plugin is not configured.')
else:
log.error('\tThe RabbitMQ management console is not installed or unavailable.' +
' RabbitMQ plugin is not configured.')
return config
def dependencies_installed(self):
# ensure the rabbit management api is available
try:
urllib2.urlopen(rabbitmq_api_url).read()
except urllib2.URLError:
return False
return True
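When detection succeeds, build_config returns a Plugins object whose rabbitmq entry mirrors the rabbitmq.yaml sample shown in the documentation above; for example (values illustrative, taken from the sample .rabbitmq.cnf):
```
{'rabbitmq': {'init_config': None,
              'instances': [{'rabbitmq_api_url': 'http://localhost:15672/api',
                             'rabbitmq_user': 'guest',
                             'rabbitmq_pass': 'guest',
                             'queues': ['conductor'],
                             'exchanges': ['nova', 'cinder', 'ceilometer'],
                             'nodes': ['rabbit@devstack']}]}}
```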

View File

@@ -16,6 +16,7 @@ import agent_config
from detection.plugins import kafka
from detection.plugins import mon
from detection.plugins import mysql
from detection.plugins import rabbitmq
from detection.plugins import network
from detection.plugins import zookeeper
from detection.plugins import nova
@@ -29,8 +30,9 @@ from service import sysv
# List of all detection plugins to run
DETECTION_PLUGINS = [kafka.Kafka, mon.MonAPI, mon.MonPersister, mon.MonThresh, mysql.MySQL,
network.Network, nova.Nova, cinder.Cinder, swift.Swift, glance.Glance,
ceilometer.Ceilometer, neutron.Neutron, keystone.Keystone, zookeeper.Zookeeper]
rabbitmq.RabbitMQ, network.Network, nova.Nova, cinder.Cinder, swift.Swift,
glance.Glance, ceilometer.Ceilometer, neutron.Neutron, keystone.Keystone,
zookeeper.Zookeeper]
# Map OS to service type
OS_SERVICE_MAP = {'Linux': sysv.SysV}