RRD plugin.

Remove "conf" arguments (use cfg.CONF instead).
This commit is contained in:
François Rossigneux 2012-12-12 16:24:41 +01:00
parent 260a1134d7
commit 1585f351e0
14 changed files with 293 additions and 52 deletions

16
AUTHORS Normal file
View File

@ -0,0 +1,16 @@
PROJECT COORDINATOR:
François Rossigneux <francois.rossigneux@inria.fr>
MAIN DEVELOPERS:
François Rossigneux <francois.rossigneux@inria.fr>
CONTRIBUTORS:
Jean-Patrick Gelas <jpgelas@ens-lyon.fr>
Laurent Lefèvre <laurent.lefevre@inria.fr>
PLUGINS MAINTAINERS:
RRD graph builder : Jean-Patrick Gelas <jpgelas@ens-lyon.fr>
WTT file logging : Jean-Patrick Gelas <jpgelas@ens-lyon.fr>
FORMER MAINTAINER:

View File

@ -4,11 +4,19 @@
import sys import sys
from kwapi.plugins.api import app from kwapi.plugins.api import app
from kwapi.openstack.common import log, cfg from kwapi.openstack.common import cfg, log
app_opts = [
cfg.IntOpt('api_port',
required=True,
),
]
cfg.CONF.register_opts(app_opts)
if __name__ == '__main__': if __name__ == '__main__':
cfg.CONF(sys.argv[1:], project='kwapi', default_config_files=['/etc/kwapi/api.conf']) cfg.CONF(sys.argv[1:], project='kwapi', default_config_files=['/etc/kwapi/api.conf'])
log.setup('kwapi') log.setup('kwapi')
root = app.make_app(cfg.CONF) root = app.make_app()
root.run(host='0.0.0.0', port=cfg.CONF.api_port) root.run(host='0.0.0.0', port=cfg.CONF.api_port)

View File

@ -11,9 +11,9 @@ if __name__ == "__main__":
cfg.CONF(sys.argv[1:], project='kwapi', default_config_files=['/etc/kwapi/drivers.conf']) cfg.CONF(sys.argv[1:], project='kwapi', default_config_files=['/etc/kwapi/drivers.conf'])
log.setup('kwapi') log.setup('kwapi')
driver_manager.start_zmq_server(cfg.CONF) driver_manager.start_zmq_server()
driver_manager.load_all_drivers(cfg.CONF) driver_manager.load_all_drivers()
driver_manager.check_drivers_alive(cfg.CONF) driver_manager.check_drivers_alive()
signal.signal(signal.SIGTERM, driver_manager.signal_handler) signal.signal(signal.SIGTERM, driver_manager.signal_handler)
try: try:

22
bin/kwapi-rrd Executable file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
from kwapi.plugins.rrd import app
from kwapi.openstack.common import cfg, log
app_opts = [
cfg.IntOpt('rrd_port',
required=True,
),
]
cfg.CONF.register_opts(app_opts)
if __name__ == '__main__':
cfg.CONF(sys.argv[1:], project='kwapi', default_config_files=['/etc/kwapi/rrd.conf'])
log.setup('kwapi')
root = app.make_app()
root.run(host='0.0.0.0', port=cfg.CONF.rrd_port)

20
etc/kwapi/rrd.conf Normal file
View File

@ -0,0 +1,20 @@
# Kwapi config file
[DEFAULT]
# Communication
rrd_port = 8080
probes_endpoint = ipc:///tmp/kwapi
# Storage
rrd_dir = /tmp/kwapi-rrd
# Timers
cleaning_interval = 300
rebuild_graphs_interval = 5
# Billing
kwh_price = 0.125
# Log files
log_file = /tmp/kwapi-api.log

View File

@ -15,10 +15,10 @@ from kwapi.openstack.common import cfg, log
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
driver_manager_opts = [ driver_manager_opts = [
cfg.StrOpt('probes_endpoint', cfg.IntOpt('check_drivers_interval',
required=True, required=True,
), ),
cfg.IntOpt('check_drivers_interval', cfg.StrOpt('probes_endpoint',
required=True, required=True,
), ),
] ]
@ -27,8 +27,8 @@ cfg.CONF.register_opts(driver_manager_opts)
threads = [] threads = []
def load_all_drivers(conf): def load_all_drivers():
"""Loads all drivers from config.""" """Loads all drivers from config file."""
parser = cfg.ConfigParser(cfg.CONF.config_file[0], {}) parser = cfg.ConfigParser(cfg.CONF.config_file[0], {})
parser.parse() parser.parse()
for section, entries in parser.sections.iteritems(): for section, entries in parser.sections.iteritems():
@ -56,7 +56,7 @@ def load_driver(class_name, probe_ids, kwargs):
probeObject.start() probeObject.start()
return probeObject return probeObject
def check_drivers_alive(conf): def check_drivers_alive():
"""Checks all drivers and reloads those that crashed. """Checks all drivers and reloads those that crashed.
This method is executed automatically at the given interval. This method is executed automatically at the given interval.
@ -68,19 +68,21 @@ def check_drivers_alive(conf):
new_thread = load_driver(thread.__class__.__name__, thread.probe_ids, thread.kwargs) new_thread = load_driver(thread.__class__.__name__, thread.probe_ids, thread.kwargs)
if new_thread is not None: if new_thread is not None:
threads[index] = new_thread threads[index] = new_thread
if conf.check_drivers_interval > 0:
timer = Timer(conf.check_drivers_interval, check_drivers_alive, [conf]) # Schedule periodic execution of this function
if cfg.CONF.check_drivers_interval > 0:
timer = Timer(cfg.CONF.check_drivers_interval, check_drivers_alive)
timer.daemon = True timer.daemon = True
timer.start() timer.start()
def start_zmq_server(conf): def start_zmq_server():
"""Forwards probe values to the probes_endpoint defined in conf.""" """Forwards probe values to the probes_endpoint."""
context = zmq.Context.instance() context = zmq.Context.instance()
frontend = context.socket(zmq.SUB) frontend = context.socket(zmq.SUB)
frontend.bind('inproc://drivers') frontend.bind('inproc://drivers')
frontend.setsockopt(zmq.SUBSCRIBE, '') frontend.setsockopt(zmq.SUBSCRIBE, '')
backend = context.socket(zmq.PUB) backend = context.socket(zmq.PUB)
backend.bind(conf.probes_endpoint) backend.bind(cfg.CONF.probes_endpoint)
thread.start_new_thread(zmq.device, (zmq.FORWARDER, frontend, backend)) thread.start_new_thread(zmq.device, (zmq.FORWARDER, frontend, backend))
def signal_handler(signum, frame): def signal_handler(signum, frame):

View File

@ -15,31 +15,25 @@ app_opts = [
cfg.BoolOpt('acl_enabled', cfg.BoolOpt('acl_enabled',
required=True, required=True,
), ),
cfg.IntOpt('api_port',
required=True,
),
cfg.StrOpt('api_metering_secret',
required=True,
),
] ]
cfg.CONF.register_opts(app_opts) cfg.CONF.register_opts(app_opts)
def make_app(conf): def make_app():
"""Instantiates Flask app, attaches collector database, installs acl.""" """Instantiates Flask app, attaches collector database, installs acl."""
LOG.info('Starting API') LOG.info('Starting API')
app = flask.Flask('kwapi.api') app = flask.Flask(__name__)
app.register_blueprint(v1.blueprint, url_prefix='/v1') app.register_blueprint(v1.blueprint, url_prefix='/v1')
collector = Collector(cfg.CONF) collector = Collector()
collector.clean(cfg.CONF, periodic=True) collector.clean()
@app.before_request @app.before_request
def attach_config(): def attach_config():
flask.request.database = collector.database flask.request.database = collector.database
# Install the middleware wrapper # Install the middleware wrapper
if conf.acl_enabled: if cfg.CONF.acl_enabled:
return acl.install(app) return acl.install(app)
return app return app

View File

@ -11,12 +11,12 @@ from kwapi.openstack.common import cfg, log
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
collector_opts = [ collector_opts = [
cfg.MultiStrOpt('probes_endpoint', cfg.IntOpt('cleaning_interval',
required=True, required=True,
), ),
cfg.IntOpt('cleaning_interval', cfg.MultiStrOpt('probes_endpoint',
required=True, required=True,
), ),
] ]
cfg.CONF.register_opts(collector_opts) cfg.CONF.register_opts(collector_opts)
@ -42,11 +42,11 @@ class Record(dict):
class Collector: class Collector:
"""Collector gradually fills its database with received values from wattmeter drivers.""" """Collector gradually fills its database with received values from wattmeter drivers."""
def __init__(self, conf): def __init__(self):
"""Initializes an empty database and start listening the endpoint.""" """Initializes an empty database and start listening the endpoint."""
LOG.info('Starting Collector') LOG.info('Starting Collector')
self.database = {} self.database = {}
thread = threading.Thread(target=self.listen, args=[conf]) thread = threading.Thread(target=self.listen)
thread.daemon = True thread.daemon = True
thread.start() thread.start()
@ -66,41 +66,35 @@ class Collector:
else: else:
return False return False
def clean(self, conf, periodic): def clean(self):
"""Removes probes from database if they didn't send new values over the last period (seconds). """Removes probes from database if they didn't send new values over the last period (seconds).
If periodic, this method is executed automatically after the timeout interval. If periodic, this method is executed automatically after the timeout interval.
""" """
LOG.info('Cleaning collector') LOG.info('Cleaning collector')
# Cleaning # Cleaning
for probe in self.database.keys(): for probe in self.database.keys():
if time.time() - self.database[probe]['timestamp'] > conf.cleaning_interval: if time.time() - self.database[probe]['timestamp'] > cfg.CONF.cleaning_interval:
LOG.info('Removing data of probe %s' % probe) LOG.info('Removing data of probe %s' % probe)
self.remove(probe) self.remove(probe)
# Cancel next execution of this function
try:
self.timer.cancel()
except AttributeError:
pass
# Schedule periodic execution of this function # Schedule periodic execution of this function
if periodic: if cfg.CONF.cleaning_interval > 0:
self.timer = threading.Timer(conf.cleaning_interval, self.clean, [conf, True]) timer = threading.Timer(cfg.CONF.cleaning_interval, self.clean)
self.timer.daemon = True timer.daemon = True
self.timer.start() timer.start()
def listen(self, conf): def listen(self):
"""Subscribes to ZeroMQ messages, and adds received measurements to the database. """Subscribes to ZeroMQ messages, and adds received measurements to the database.
Messages are dictionaries dumped in JSON format. Messages are dictionaries dumped in JSON format.
""" """
LOG.info('Collector listenig to %s' % conf.probes_endpoint) LOG.info('Collector listenig to %s' % cfg.CONF.probes_endpoint)
context = zmq.Context() context = zmq.Context.instance()
subscriber = context.socket(zmq.SUB) subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.SUBSCRIBE, '') subscriber.setsockopt(zmq.SUBSCRIBE, '')
for endpoint in conf.probes_endpoint: for endpoint in cfg.CONF.probes_endpoint:
subscriber.connect(endpoint) subscriber.connect(endpoint)
while True: while True:

View File

36
kwapi/plugins/rrd/app.py Normal file
View File

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
"""Set up the RRD server application instance."""
import thread
import flask
from kwapi.openstack.common import cfg, log
#from collector import Collector
import rrd
import v1
LOG = log.getLogger(__name__)
app_opts = [
cfg.FloatOpt('kwh_price',
required=False,
),
]
cfg.CONF.register_opts(app_opts)
def make_app():
"""Instantiates Flask app, attaches collector database. """
LOG.info('Starting RRD')
app = flask.Flask(__name__)
app.register_blueprint(v1.blueprint)
thread.start_new_thread(rrd.listen, ())
thread.start_new_thread(rrd.build_rrd_graphs, ())
@app.before_request
def attach_config():
flask.request.rrd_files = rrd.rrd_files
return app

111
kwapi/plugins/rrd/rrd.py Normal file
View File

@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
"""Define functions to build and update rrd database and graph."""
import json
import os
import time
import threading
import uuid
import rrdtool
import zmq
from kwapi.openstack.common import cfg, log
LOG = log.getLogger(__name__)
rrd_opts = [
cfg.IntOpt('rebuild_graphs_interval',
required=True,
),
cfg.MultiStrOpt('probes_endpoint',
required=True,
),
cfg.StrOpt('rrd_dir',
required=True,
),
]
cfg.CONF.register_opts(rrd_opts)
rrd_files = {}
def create_rrd_file(filename):
"""Creates a RRD file."""
if not os.path.isdir(cfg.CONF.rrd_dir):
try:
os.makedirs(cfg.CONF.rrd_dir)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise
if not os.path.exists(filename):
rrdtool.create(filename,
'--step', '60',
'--start', '0',
'DS:watt:GAUGE:600:0:U',
'RRA:AVERAGE:0.5:1:60',
)
def update_rrd_file(probe, watt):
"""Updates the RRD file. Filename is based on probe name."""
filename = cfg.CONF.rrd_dir + '/' + str(uuid.uuid5(uuid.NAMESPACE_DNS, str(probe))) + '.rrd'
if filename in rrd_files.values():
updatestr = str(u'%s:%s' % (time.time(), watt ))
try:
ret = rrdtool.update(filename, updatestr)
except rrdtool.error, e:
LOG.error('Error updating RRD: %s' % e)
else:
create_rrd_file(filename)
rrd_files[probe] = filename
def build_rrd_graphs():
"""Builds PNG graphs from RRD files.
If periodic, this method is executed automatically after the timeout interval.
"""
LOG.info('Build PNG graphs from RRD files')
for probe, rrd_file in rrd_files.iteritems():
png_file = os.path.dirname(rrd_file) + '/' + os.path.basename(rrd_file).replace('.rrd', '.png')
rrdtool.graph(png_file,
'--start', '-%i' % 3600,
'--end', 'now',
'--imgformat', 'PNG',
'DEF:watt=%s:watt:AVERAGE' % rrd_file,
'--title', "Last hour",
'--vertical-label', 'Watts',
'--lower-limit', '0',
'--rigid',
'AREA:watt#0000FF22:' + str(probe),
'LINE:watt#0000FFAA:',
'GPRINT:watt:LAST:Last measure\: %3.1lf W')
if cfg.CONF.rebuild_graphs_interval > 0:
timer = threading.Timer(cfg.CONF.rebuild_graphs_interval, build_rrd_graphs)
timer.daemon = True
timer.start()
def listen():
"""Subscribes to ZeroMQ messages, and adds received measurements to the database.
Messages are dictionaries dumped in JSON format.
"""
LOG.info('RRD listenig to %s' % cfg.CONF.probes_endpoint)
context = zmq.Context.instance()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.SUBSCRIBE, '')
for endpoint in cfg.CONF.probes_endpoint:
subscriber.connect(endpoint)
while True:
message = subscriber.recv()
measurements = json.loads(message)
if not isinstance(measurements, dict):
LOG.error('Bad message type (not a dict)')
else:
try:
update_rrd_file(measurements['probe_id'], float(measurements['w']))
except KeyError:
LOG.error('Malformed message (missing required key)')

View File

@ -0,0 +1,6 @@
<!doctype html>
<title>Kwapi monitoring</title>
<h1>Power Consumption Monitoring</h1>
{% for probe in probes %}
<img src="{{ probe }}" alt="Graph {{ probe }}"/>
{% endfor %}

30
kwapi/plugins/rrd/v1.py Normal file
View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
"""This blueprint defines all URLs and answers."""
import os
import flask
from jinja2 import TemplateNotFound
from kwapi.openstack.common import cfg
blueprint = flask.Blueprint('v1', __name__)
@blueprint.route('/')
def welcome():
"""Shows specified page."""
try:
return flask.render_template('index.html', probes=flask.request.rrd_files.keys())
except TemplateNotFound:
flask.abort(404)
@blueprint.route('/<probe>/')
def chart(probe):
"""Sends chart."""
try:
rrd_file = flask.request.rrd_files[probe]
png_file = os.path.dirname(rrd_file) + '/' + os.path.basename(rrd_file).replace('.rrd', '.png')
return flask.send_file(png_file)
except:
flask.abort(404)

View File

@ -30,12 +30,14 @@ setuptools.setup(
], ],
packages=setuptools.find_packages(), packages=setuptools.find_packages(),
package_data={'kwapi.plugins.rrd': ['templates/*.html']},
scripts=['bin/kwapi-api', scripts=['bin/kwapi-api',
'bin/kwapi-drivers'], 'bin/kwapi-drivers',
'bin/kwapi-rrd'],
data_files=[('/etc/kwapi', ['etc/kwapi/api.conf', 'etc/kwapi/drivers.conf'])], data_files=[('/etc/kwapi', ['etc/kwapi/api.conf', 'etc/kwapi/drivers.conf', 'etc/kwapi/rrd.conf'])],
install_requires=['flask', 'pyserial', 'python-keystoneclient', 'pyzmq'] install_requires=['flask', 'pyserial', 'python-keystoneclient', 'pyzmq', 'py-rrdtool']
) )