From 1585f351e060a6aafcfaa7d47098284e0fbc62f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Rossigneux?= Date: Wed, 12 Dec 2012 16:24:41 +0100 Subject: [PATCH] RRD plugin. Remove "conf" arguments (use cfg.CONF instead). --- AUTHORS | 16 ++++ bin/kwapi-api | 12 ++- bin/kwapi-drivers | 6 +- bin/kwapi-rrd | 22 +++++ etc/kwapi/rrd.conf | 20 +++++ kwapi/drivers/driver_manager.py | 22 ++--- kwapi/plugins/api/app.py | 16 ++-- kwapi/plugins/api/collector.py | 40 ++++----- kwapi/plugins/rrd/__init__.py | 0 kwapi/plugins/rrd/app.py | 36 ++++++++ kwapi/plugins/rrd/rrd.py | 111 +++++++++++++++++++++++++ kwapi/plugins/rrd/templates/index.html | 6 ++ kwapi/plugins/rrd/v1.py | 30 +++++++ setup.py | 8 +- 14 files changed, 293 insertions(+), 52 deletions(-) create mode 100644 AUTHORS create mode 100755 bin/kwapi-rrd create mode 100644 etc/kwapi/rrd.conf create mode 100644 kwapi/plugins/rrd/__init__.py create mode 100644 kwapi/plugins/rrd/app.py create mode 100644 kwapi/plugins/rrd/rrd.py create mode 100644 kwapi/plugins/rrd/templates/index.html create mode 100644 kwapi/plugins/rrd/v1.py diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 0000000..bb934ce --- /dev/null +++ b/AUTHORS @@ -0,0 +1,16 @@ +PROJECT COORDINATOR: +François Rossigneux + +MAIN DEVELOPERS: +François Rossigneux + +CONTRIBUTORS: +Jean-Patrick Gelas +Laurent Lefèvre + +PLUGINS MAINTAINERS: +RRD graph builder : Jean-Patrick Gelas +WTT file logging : Jean-Patrick Gelas + + +FORMER MAINTAINER: diff --git a/bin/kwapi-api b/bin/kwapi-api index d021668..af2bb23 100755 --- a/bin/kwapi-api +++ b/bin/kwapi-api @@ -4,11 +4,19 @@ import sys from kwapi.plugins.api import app -from kwapi.openstack.common import log, cfg +from kwapi.openstack.common import cfg, log + +app_opts = [ + cfg.IntOpt('api_port', + required=True, + ), + ] + +cfg.CONF.register_opts(app_opts) if __name__ == '__main__': cfg.CONF(sys.argv[1:], project='kwapi', default_config_files=['/etc/kwapi/api.conf']) log.setup('kwapi') - root = app.make_app(cfg.CONF) + root = app.make_app() root.run(host='0.0.0.0', port=cfg.CONF.api_port) diff --git a/bin/kwapi-drivers b/bin/kwapi-drivers index dbb8d80..36377f1 100755 --- a/bin/kwapi-drivers +++ b/bin/kwapi-drivers @@ -11,9 +11,9 @@ if __name__ == "__main__": cfg.CONF(sys.argv[1:], project='kwapi', default_config_files=['/etc/kwapi/drivers.conf']) log.setup('kwapi') - driver_manager.start_zmq_server(cfg.CONF) - driver_manager.load_all_drivers(cfg.CONF) - driver_manager.check_drivers_alive(cfg.CONF) + driver_manager.start_zmq_server() + driver_manager.load_all_drivers() + driver_manager.check_drivers_alive() signal.signal(signal.SIGTERM, driver_manager.signal_handler) try: diff --git a/bin/kwapi-rrd b/bin/kwapi-rrd new file mode 100755 index 0000000..676e82b --- /dev/null +++ b/bin/kwapi-rrd @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import sys + +from kwapi.plugins.rrd import app +from kwapi.openstack.common import cfg, log + +app_opts = [ + cfg.IntOpt('rrd_port', + required=True, + ), + ] + +cfg.CONF.register_opts(app_opts) + +if __name__ == '__main__': + cfg.CONF(sys.argv[1:], project='kwapi', default_config_files=['/etc/kwapi/rrd.conf']) + log.setup('kwapi') + + root = app.make_app() + root.run(host='0.0.0.0', port=cfg.CONF.rrd_port) diff --git a/etc/kwapi/rrd.conf b/etc/kwapi/rrd.conf new file mode 100644 index 0000000..c8f6a45 --- /dev/null +++ b/etc/kwapi/rrd.conf @@ -0,0 +1,20 @@ +# Kwapi config file + +[DEFAULT] + +# Communication +rrd_port = 8080 +probes_endpoint = ipc:///tmp/kwapi + +# Storage +rrd_dir = /tmp/kwapi-rrd + +# Timers +cleaning_interval = 300 +rebuild_graphs_interval = 5 + +# Billing +kwh_price = 0.125 + +# Log files +log_file = /tmp/kwapi-api.log diff --git a/kwapi/drivers/driver_manager.py b/kwapi/drivers/driver_manager.py index 53f295e..19a9669 100644 --- a/kwapi/drivers/driver_manager.py +++ b/kwapi/drivers/driver_manager.py @@ -15,10 +15,10 @@ from kwapi.openstack.common import cfg, log LOG = log.getLogger(__name__) driver_manager_opts = [ - cfg.StrOpt('probes_endpoint', + cfg.IntOpt('check_drivers_interval', required=True, ), - cfg.IntOpt('check_drivers_interval', + cfg.StrOpt('probes_endpoint', required=True, ), ] @@ -27,8 +27,8 @@ cfg.CONF.register_opts(driver_manager_opts) threads = [] -def load_all_drivers(conf): - """Loads all drivers from config.""" +def load_all_drivers(): + """Loads all drivers from config file.""" parser = cfg.ConfigParser(cfg.CONF.config_file[0], {}) parser.parse() for section, entries in parser.sections.iteritems(): @@ -56,7 +56,7 @@ def load_driver(class_name, probe_ids, kwargs): probeObject.start() return probeObject -def check_drivers_alive(conf): +def check_drivers_alive(): """Checks all drivers and reloads those that crashed. This method is executed automatically at the given interval. @@ -68,19 +68,21 @@ def check_drivers_alive(conf): new_thread = load_driver(thread.__class__.__name__, thread.probe_ids, thread.kwargs) if new_thread is not None: threads[index] = new_thread - if conf.check_drivers_interval > 0: - timer = Timer(conf.check_drivers_interval, check_drivers_alive, [conf]) + + # Schedule periodic execution of this function + if cfg.CONF.check_drivers_interval > 0: + timer = Timer(cfg.CONF.check_drivers_interval, check_drivers_alive) timer.daemon = True timer.start() -def start_zmq_server(conf): - """Forwards probe values to the probes_endpoint defined in conf.""" +def start_zmq_server(): + """Forwards probe values to the probes_endpoint.""" context = zmq.Context.instance() frontend = context.socket(zmq.SUB) frontend.bind('inproc://drivers') frontend.setsockopt(zmq.SUBSCRIBE, '') backend = context.socket(zmq.PUB) - backend.bind(conf.probes_endpoint) + backend.bind(cfg.CONF.probes_endpoint) thread.start_new_thread(zmq.device, (zmq.FORWARDER, frontend, backend)) def signal_handler(signum, frame): diff --git a/kwapi/plugins/api/app.py b/kwapi/plugins/api/app.py index 3d092ff..2a6e4c7 100644 --- a/kwapi/plugins/api/app.py +++ b/kwapi/plugins/api/app.py @@ -15,31 +15,25 @@ app_opts = [ cfg.BoolOpt('acl_enabled', required=True, ), - cfg.IntOpt('api_port', - required=True, - ), - cfg.StrOpt('api_metering_secret', - required=True, - ), ] cfg.CONF.register_opts(app_opts) -def make_app(conf): +def make_app(): """Instantiates Flask app, attaches collector database, installs acl.""" LOG.info('Starting API') - app = flask.Flask('kwapi.api') + app = flask.Flask(__name__) app.register_blueprint(v1.blueprint, url_prefix='/v1') - collector = Collector(cfg.CONF) - collector.clean(cfg.CONF, periodic=True) + collector = Collector() + collector.clean() @app.before_request def attach_config(): flask.request.database = collector.database # Install the middleware wrapper - if conf.acl_enabled: + if cfg.CONF.acl_enabled: return acl.install(app) return app diff --git a/kwapi/plugins/api/collector.py b/kwapi/plugins/api/collector.py index 67ea5ea..2e490f7 100644 --- a/kwapi/plugins/api/collector.py +++ b/kwapi/plugins/api/collector.py @@ -11,12 +11,12 @@ from kwapi.openstack.common import cfg, log LOG = log.getLogger(__name__) collector_opts = [ - cfg.MultiStrOpt('probes_endpoint', + cfg.IntOpt('cleaning_interval', required=True, ), - cfg.IntOpt('cleaning_interval', - required=True, - ), + cfg.MultiStrOpt('probes_endpoint', + required=True, + ), ] cfg.CONF.register_opts(collector_opts) @@ -42,11 +42,11 @@ class Record(dict): class Collector: """Collector gradually fills its database with received values from wattmeter drivers.""" - def __init__(self, conf): + def __init__(self): """Initializes an empty database and start listening the endpoint.""" LOG.info('Starting Collector') self.database = {} - thread = threading.Thread(target=self.listen, args=[conf]) + thread = threading.Thread(target=self.listen) thread.daemon = True thread.start() @@ -66,41 +66,35 @@ class Collector: else: return False - def clean(self, conf, periodic): + def clean(self): """Removes probes from database if they didn't send new values over the last period (seconds). If periodic, this method is executed automatically after the timeout interval. """ LOG.info('Cleaning collector') - # Cleaning + # Cleaning for probe in self.database.keys(): - if time.time() - self.database[probe]['timestamp'] > conf.cleaning_interval: + if time.time() - self.database[probe]['timestamp'] > cfg.CONF.cleaning_interval: LOG.info('Removing data of probe %s' % probe) self.remove(probe) - # Cancel next execution of this function - try: - self.timer.cancel() - except AttributeError: - pass - # Schedule periodic execution of this function - if periodic: - self.timer = threading.Timer(conf.cleaning_interval, self.clean, [conf, True]) - self.timer.daemon = True - self.timer.start() + if cfg.CONF.cleaning_interval > 0: + timer = threading.Timer(cfg.CONF.cleaning_interval, self.clean) + timer.daemon = True + timer.start() - def listen(self, conf): + def listen(self): """Subscribes to ZeroMQ messages, and adds received measurements to the database. Messages are dictionaries dumped in JSON format. """ - LOG.info('Collector listenig to %s' % conf.probes_endpoint) + LOG.info('Collector listenig to %s' % cfg.CONF.probes_endpoint) - context = zmq.Context() + context = zmq.Context.instance() subscriber = context.socket(zmq.SUB) subscriber.setsockopt(zmq.SUBSCRIBE, '') - for endpoint in conf.probes_endpoint: + for endpoint in cfg.CONF.probes_endpoint: subscriber.connect(endpoint) while True: diff --git a/kwapi/plugins/rrd/__init__.py b/kwapi/plugins/rrd/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/kwapi/plugins/rrd/app.py b/kwapi/plugins/rrd/app.py new file mode 100644 index 0000000..9ad1c19 --- /dev/null +++ b/kwapi/plugins/rrd/app.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- + +"""Set up the RRD server application instance.""" + +import thread + +import flask + +from kwapi.openstack.common import cfg, log +#from collector import Collector +import rrd +import v1 + +LOG = log.getLogger(__name__) + +app_opts = [ + cfg.FloatOpt('kwh_price', + required=False, + ), + ] + +cfg.CONF.register_opts(app_opts) + +def make_app(): + """Instantiates Flask app, attaches collector database. """ + LOG.info('Starting RRD') + app = flask.Flask(__name__) + app.register_blueprint(v1.blueprint) + + thread.start_new_thread(rrd.listen, ()) + thread.start_new_thread(rrd.build_rrd_graphs, ()) + + @app.before_request + def attach_config(): + flask.request.rrd_files = rrd.rrd_files + return app diff --git a/kwapi/plugins/rrd/rrd.py b/kwapi/plugins/rrd/rrd.py new file mode 100644 index 0000000..9da7081 --- /dev/null +++ b/kwapi/plugins/rrd/rrd.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- + +"""Define functions to build and update rrd database and graph.""" + +import json +import os +import time +import threading +import uuid + +import rrdtool +import zmq + +from kwapi.openstack.common import cfg, log + +LOG = log.getLogger(__name__) + +rrd_opts = [ + cfg.IntOpt('rebuild_graphs_interval', + required=True, + ), + cfg.MultiStrOpt('probes_endpoint', + required=True, + ), + cfg.StrOpt('rrd_dir', + required=True, + ), + ] + +cfg.CONF.register_opts(rrd_opts) + +rrd_files = {} + +def create_rrd_file(filename): + """Creates a RRD file.""" + if not os.path.isdir(cfg.CONF.rrd_dir): + try: + os.makedirs(cfg.CONF.rrd_dir) + except OSError as exception: + if exception.errno != errno.EEXIST: + raise + if not os.path.exists(filename): + rrdtool.create(filename, + '--step', '60', + '--start', '0', + 'DS:watt:GAUGE:600:0:U', + 'RRA:AVERAGE:0.5:1:60', + ) + +def update_rrd_file(probe, watt): + """Updates the RRD file. Filename is based on probe name.""" + filename = cfg.CONF.rrd_dir + '/' + str(uuid.uuid5(uuid.NAMESPACE_DNS, str(probe))) + '.rrd' + if filename in rrd_files.values(): + updatestr = str(u'%s:%s' % (time.time(), watt )) + try: + ret = rrdtool.update(filename, updatestr) + except rrdtool.error, e: + LOG.error('Error updating RRD: %s' % e) + else: + create_rrd_file(filename) + rrd_files[probe] = filename + +def build_rrd_graphs(): + """Builds PNG graphs from RRD files. + If periodic, this method is executed automatically after the timeout interval. + + """ + LOG.info('Build PNG graphs from RRD files') + for probe, rrd_file in rrd_files.iteritems(): + png_file = os.path.dirname(rrd_file) + '/' + os.path.basename(rrd_file).replace('.rrd', '.png') + rrdtool.graph(png_file, + '--start', '-%i' % 3600, + '--end', 'now', + '--imgformat', 'PNG', + 'DEF:watt=%s:watt:AVERAGE' % rrd_file, + '--title', "Last hour", + '--vertical-label', 'Watts', + '--lower-limit', '0', + '--rigid', + 'AREA:watt#0000FF22:' + str(probe), + 'LINE:watt#0000FFAA:', + 'GPRINT:watt:LAST:Last measure\: %3.1lf W') + + if cfg.CONF.rebuild_graphs_interval > 0: + timer = threading.Timer(cfg.CONF.rebuild_graphs_interval, build_rrd_graphs) + timer.daemon = True + timer.start() + +def listen(): + """Subscribes to ZeroMQ messages, and adds received measurements to the database. + Messages are dictionaries dumped in JSON format. + + """ + LOG.info('RRD listenig to %s' % cfg.CONF.probes_endpoint) + + context = zmq.Context.instance() + subscriber = context.socket(zmq.SUB) + subscriber.setsockopt(zmq.SUBSCRIBE, '') + for endpoint in cfg.CONF.probes_endpoint: + subscriber.connect(endpoint) + + while True: + message = subscriber.recv() + measurements = json.loads(message) + if not isinstance(measurements, dict): + LOG.error('Bad message type (not a dict)') + else: + try: + update_rrd_file(measurements['probe_id'], float(measurements['w'])) + except KeyError: + LOG.error('Malformed message (missing required key)') diff --git a/kwapi/plugins/rrd/templates/index.html b/kwapi/plugins/rrd/templates/index.html new file mode 100644 index 0000000..cf68c1c --- /dev/null +++ b/kwapi/plugins/rrd/templates/index.html @@ -0,0 +1,6 @@ + +Kwapi monitoring +

Power Consumption Monitoring

+{% for probe in probes %} +Graph {{ probe }} +{% endfor %} diff --git a/kwapi/plugins/rrd/v1.py b/kwapi/plugins/rrd/v1.py new file mode 100644 index 0000000..39c2ebe --- /dev/null +++ b/kwapi/plugins/rrd/v1.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- + +"""This blueprint defines all URLs and answers.""" + +import os + +import flask +from jinja2 import TemplateNotFound + +from kwapi.openstack.common import cfg + +blueprint = flask.Blueprint('v1', __name__) + +@blueprint.route('/') +def welcome(): + """Shows specified page.""" + try: + return flask.render_template('index.html', probes=flask.request.rrd_files.keys()) + except TemplateNotFound: + flask.abort(404) + +@blueprint.route('//') +def chart(probe): + """Sends chart.""" + try: + rrd_file = flask.request.rrd_files[probe] + png_file = os.path.dirname(rrd_file) + '/' + os.path.basename(rrd_file).replace('.rrd', '.png') + return flask.send_file(png_file) + except: + flask.abort(404) diff --git a/setup.py b/setup.py index d4997ce..d3bfefd 100755 --- a/setup.py +++ b/setup.py @@ -30,12 +30,14 @@ setuptools.setup( ], packages=setuptools.find_packages(), + package_data={'kwapi.plugins.rrd': ['templates/*.html']}, scripts=['bin/kwapi-api', - 'bin/kwapi-drivers'], + 'bin/kwapi-drivers', + 'bin/kwapi-rrd'], - data_files=[('/etc/kwapi', ['etc/kwapi/api.conf', 'etc/kwapi/drivers.conf'])], + data_files=[('/etc/kwapi', ['etc/kwapi/api.conf', 'etc/kwapi/drivers.conf', 'etc/kwapi/rrd.conf'])], - install_requires=['flask', 'pyserial', 'python-keystoneclient', 'pyzmq'] + install_requires=['flask', 'pyserial', 'python-keystoneclient', 'pyzmq', 'py-rrdtool'] )