browbeat/browbeat-containers/collectd-openstack/files/collectd_ceph_storage.py

336 lines
14 KiB
Python

#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Collectd python plugin to read ceph storage stats from ceph command line for
an OpenStack Cloud.
"""
import collectd
import json
import os
import subprocess
import time
import traceback
class CollectdCephStorage(object):
def __init__(self):
self.ceph_cluster = None
self.ceph_rados_bench = False
self.ceph_rados_bench_interval = 60
self.ceph_mon_stats = False
self.ceph_mon_stats_interval = 10
self.ceph_osd_stats = False
self.ceph_osd_stats_interval = 10
self.ceph_pg_stats = False
self.ceph_pg_stats_interval = 10
self.ceph_pool_stats = False
self.ceph_pool_stats_interval = 10
def configure_callback(self, config):
for node in config.children:
val = str(node.values[0])
if node.key == 'CephRadosBench':
self.ceph_rados_bench = val in ['True', 'true']
elif node.key == 'CephMONStats':
self.ceph_mon_stats = val in ['True', 'true']
elif node.key == 'CephOSDStats':
self.ceph_osd_stats = val in ['True', 'true']
elif node.key == 'CephPGStats':
self.ceph_pg_stats = val in ['True', 'true']
elif node.key == 'CephPoolStats':
self.ceph_pool_stats = val in ['True', 'true']
elif node.key == 'CephCluster':
self.ceph_cluster = val
elif node.key == 'CephRadosBenchInterval':
self.ceph_rados_bench_interval = int(float(val))
elif node.key == 'CephMONStatsInterval':
self.ceph_mon_stats_interval = int(float(val))
elif node.key == 'CephOSDStatsInterval':
self.ceph_osd_stats_interval = int(float(val))
elif node.key == 'CephPGStatsInterval':
self.ceph_pg_stats_interval = int(float(val))
elif node.key == 'CephPoolStatsInterval':
self.ceph_pool_stats_interval = int(float(val))
else:
collectd.warning(
'collectd-ceph-storage: Unknown config key: {}'
.format(node.key))
if not self.ceph_cluster:
collectd.warning('collectd-ceph-storage: CephCluster Undefined')
if self.ceph_rados_bench:
collectd.info('Registered Ceph Rados Bench')
collectd.register_read(
self.read_ceph_rados_bench,
self.ceph_rados_bench_interval, name='ceph-rados-bench')
if self.ceph_mon_stats:
collectd.info('Registered Ceph Mon')
collectd.register_read(
self.read_ceph_mon, self.ceph_mon_stats_interval,
name='ceph-monitor')
if self.ceph_osd_stats:
collectd.info('Registered Ceph OSD')
collectd.register_read(
self.read_ceph_osd, self.ceph_osd_stats_interval,
name='ceph-osd')
if self.ceph_pg_stats:
collectd.info('Registered Ceph PG')
collectd.register_read(
self.read_ceph_pg, self.ceph_pg_stats_interval, name='ceph-pg')
if self.ceph_pool_stats:
collectd.info('Registered Ceph Pool')
collectd.register_read(
self.read_ceph_pool, self.ceph_pool_stats_interval,
name='ceph-pool')
def dispatch_value(self, plugin_instance, type_instance, value, interval):
metric = collectd.Values()
metric.plugin = 'collectd-ceph-storage'
metric.interval = interval
metric.type = 'gauge'
metric.plugin_instance = plugin_instance
metric.type_instance = type_instance
metric.values = [value]
metric.dispatch()
def read_ceph_rados_bench(self):
"""Runs "rados bench" and collects latencies reported."""
rados_bench_ran, output = self.run_command(
['timeout', '30s', 'rados', '-p', 'rbd', 'bench', '10',
'write', '-t', '1', '-b', '65536', '2>/dev/null', '|',
'grep', '-i', 'latency', '|', 'awk',
'\'{print 1000*$3}\''], False)
if rados_bench_ran:
results = output.split('\n')
self.dispatch_value(
'cluster', 'avg_latency', results[0],
self.ceph_rados_bench_interval)
self.dispatch_value(
'cluster', 'stddev_latency', results[1],
self.ceph_rados_bench_interval)
self.dispatch_value(
'cluster', 'max_latency', results[2],
self.ceph_rados_bench_interval)
self.dispatch_value(
'cluster', 'min_latency', results[3],
self.ceph_rados_bench_interval)
def read_ceph_mon(self):
"""Reads stats from "ceph mon dump" command."""
mon_dump_ran, output = self.run_command(
['ceph', 'mon', 'dump', '-f', 'json', '--cluster',
self.ceph_cluster])
if mon_dump_ran:
json_data = json.loads(output)
self.dispatch_value(
'mon', 'number', len(json_data['mons']),
self.ceph_mon_stats_interval)
self.dispatch_value(
'mon', 'quorum', len(json_data['quorum']),
self.ceph_mon_stats_interval)
def read_ceph_osd(self):
"""Reads stats from "ceph osd dump" command."""
osd_dump_ran, output = self.run_command(
['ceph', 'osd', 'dump', '-f', 'json', '--cluster',
self.ceph_cluster])
if osd_dump_ran:
json_data = json.loads(output)
self.dispatch_value(
'pool', 'number', len(json_data['pools']),
self.ceph_osd_stats_interval)
for pool in json_data['pools']:
pool_name = 'pool-{}'.format(pool['pool_name'])
self.dispatch_value(
pool_name, 'size', pool['size'],
self.ceph_osd_stats_interval)
self.dispatch_value(
pool_name, 'pg_num', pool['pg_num'],
self.ceph_osd_stats_interval)
self.dispatch_value(
pool_name, 'pgp_num', pool['pg_placement_num'],
self.ceph_osd_stats_interval)
osds_up = 0
osds_down = 0
osds_in = 0
osds_out = 0
for osd in json_data['osds']:
if osd['up'] == 1:
osds_up += 1
else:
osds_down += 1
if osd['in'] == 1:
osds_in += 1
else:
osds_out += 1
self.dispatch_value(
'osd', 'up', osds_up, self.ceph_osd_stats_interval)
self.dispatch_value(
'osd', 'down', osds_down, self.ceph_osd_stats_interval)
self.dispatch_value(
'osd', 'in', osds_in, self.ceph_osd_stats_interval)
self.dispatch_value(
'osd', 'out', osds_out, self.ceph_osd_stats_interval)
def read_ceph_pg(self):
"""Reads stats from "ceph pg dump" command."""
pg_dump_ran, output = self.run_command(
['ceph', 'pg', 'dump', '-f', 'json', '--cluster',
self.ceph_cluster])
if pg_dump_ran:
json_data = json.loads(output)
pgs = {}
for pg in json_data['pg_map']['pg_stats']:
for state in pg['state'].split('+'):
if state not in pgs:
pgs[state] = 0
pgs[state] += 1
for state in pgs:
self.dispatch_value(
'pg', state, pgs[state], self.ceph_pg_stats_interval)
for osd in json_data['pg_map']['osd_stats']:
osd_id = 'osd-{}'.format(osd['osd'])
self.dispatch_value(
osd_id, 'kb_used', osd['kb_used'],
self.ceph_pg_stats_interval)
self.dispatch_value(
osd_id, 'kb_total', osd['kb'], self.ceph_pg_stats_interval)
self.dispatch_value(
osd_id, 'snap_trim_queue_len', osd['snap_trim_queue_len'],
self.ceph_pg_stats_interval)
self.dispatch_value(
osd_id, 'num_snap_trimming', osd['num_snap_trimming'],
self.ceph_pg_stats_interval)
if 'fs_perf_stat' in osd:
self.dispatch_value(
osd_id, 'apply_latency_ms',
osd['fs_perf_stat']['apply_latency_ms'],
self.ceph_pg_stats_interval)
self.dispatch_value(
osd_id, 'commit_latency_ms',
osd['fs_perf_stat']['commit_latency_ms'],
self.ceph_pg_stats_interval)
elif 'perf_stat' in osd:
self.dispatch_value(
osd_id, 'apply_latency_ms',
osd['perf_stat']['apply_latency_ms'],
self.ceph_pg_stats_interval)
self.dispatch_value(
osd_id, 'commit_latency_ms',
osd['perf_stat']['commit_latency_ms'],
self.ceph_pg_stats_interval)
def read_ceph_pool(self):
"""Reads stats from "ceph osd pool" and "ceph df" commands."""
stats_ran, stats_output = self.run_command(
['ceph', 'osd', 'pool', 'stats', '-f', 'json'])
df_ran, df_output = self.run_command(['ceph', 'df', '-f', 'json'])
if stats_ran:
json_stats_data = json.loads(stats_output)
for pool in json_stats_data:
pool_key = 'pool-{}'.format(pool['pool_name'])
for stat in (
'read_bytes_sec', 'write_bytes_sec', 'read_op_per_sec',
'write_op_per_sec'):
value = 0
if stat in pool['client_io_rate']:
value = pool['client_io_rate'][stat]
self.dispatch_value(
pool_key, stat, value, self.ceph_pool_stats_interval)
if df_ran:
json_df_data = json.loads(df_output)
for pool in json_df_data['pools']:
pool_key = 'pool-{}'.format(pool['name'])
for stat in ('bytes_used', 'kb_used', 'objects'):
value = pool['stats'][stat] if stat in pool['stats'] else 0
self.dispatch_value(
pool_key, stat, value, self.ceph_pool_stats_interval)
if 'total_bytes' in json_df_data['stats']:
# ceph 0.84+
self.dispatch_value(
'cluster', 'total_space',
int(json_df_data['stats']['total_bytes']),
self.ceph_pool_stats_interval)
self.dispatch_value(
'cluster', 'total_used',
int(json_df_data['stats']['total_used_bytes']),
self.ceph_pool_stats_interval)
self.dispatch_value(
'cluster', 'total_avail',
int(json_df_data['stats']['total_avail_bytes']),
self.ceph_pool_stats_interval)
else:
# ceph < 0.84
self.dispatch_value(
'cluster', 'total_space',
int(json_df_data['stats']['total_space']) * 1024.0,
self.ceph_pool_stats_interval)
self.dispatch_value(
'cluster', 'total_used',
int(json_df_data['stats']['total_used']) * 1024.0,
self.ceph_pool_stats_interval)
self.dispatch_value(
'cluster', 'total_avail',
int(json_df_data['stats']['total_avail']) * 1024.0,
self.ceph_pool_stats_interval)
def run_command(self, command, check_output=True):
"""Run a command for this collectd plugin. Returns a tuple with command
success and output or False and None for output.
"""
output = None
try:
if check_output:
output = subprocess.check_output(command)
else:
process = subprocess.Popen(' '.join(command), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
output = stdout.decode('utf-8')
except Exception as exc:
collectd.error(
'collectd-ceph-storage: {} exception: {}'.format(command, exc))
collectd.error(
'collectd-ceph-storage: {} traceback: {}'
.format(command, traceback.format_exc()))
return False, None
if output is None:
collectd.error(
'collectd-ceph-storage: failed to {}: output is None'
.format(command))
return False, None
return True, output
collectd_ceph_storage = CollectdCephStorage()
collectd.register_config(collectd_ceph_storage.configure_callback)