Listening refactoring in plugins.

Change-Id: I9e7aeda9637f6139fed7039e7111c44e8e5783dc
This commit is contained in:
François Rossigneux 2014-02-13 16:23:33 +01:00
parent 825f5d9cce
commit 7c25fab119
5 changed files with 77 additions and 94 deletions

View File

@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
#
# Author: François Rossigneux <francois.rossigneux@inria.fr>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from oslo.config import cfg
import zmq
from kwapi import security
from kwapi.openstack.common import log
LOG = log.getLogger(__name__)
def listen(function):
"""Subscribes to ZeroMQ messages, and adds received measurements to the
database. Messages are dictionaries dumped in JSON format.
"""
LOG.info('Listening to %s' % cfg.CONF.probes_endpoint)
context = zmq.Context.instance()
subscriber = context.socket(zmq.SUB)
if not cfg.CONF.watch_probe:
subscriber.setsockopt(zmq.SUBSCRIBE, '')
else:
for probe in cfg.CONF.watch_probe:
subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.')
for endpoint in cfg.CONF.probes_endpoint:
subscriber.connect(endpoint)
while True:
[probe, message] = subscriber.recv_multipart()
measurements = json.loads(message)
if not isinstance(measurements, dict):
LOG.error('Bad message type (not a dict)')
elif cfg.CONF.signature_checking and \
not security.verify_signature(measurements,
cfg.CONF.driver_metering_secret):
LOG.error('Bad message signature')
else:
try:
probe = measurements['probe_id'].encode('utf-8')
function(probe, float(measurements['w']))
except (TypeError, ValueError):
LOG.error('Malformed power consumption data: %s'
% measurements['w'])
except KeyError:
LOG.error('Malformed message (missing required key)')

View File

@ -17,11 +17,13 @@
"""Set up the API server application instance."""
import sys
import thread
import flask
from oslo.config import cfg
from kwapi.openstack.common import log
from kwapi.plugins import listen
import acl
from collector import Collector
import v1
@ -49,6 +51,8 @@ def make_app():
collector = Collector()
collector.clean()
thread.start_new_thread(listen, (collector.add,))
@app.before_request
def attach_config():
flask.request.collector = collector

View File

@ -14,15 +14,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import threading
import time
from oslo.config import cfg
import zmq
from kwapi.openstack.common import log
from kwapi import security
LOG = log.getLogger(__name__)
@ -81,9 +78,6 @@ class Collector:
LOG.info('Starting Collector')
self.database = {}
self.lock = threading.Lock()
thread = threading.Thread(target=self.listen)
thread.daemon = True
thread.start()
def add(self, probe, watts):
"""Creates (or updates) consumption data for this probe."""
@ -125,40 +119,3 @@ class Collector:
timer = threading.Timer(cfg.CONF.cleaning_interval, self.clean)
timer.daemon = True
timer.start()
def listen(self):
"""Subscribes to ZeroMQ messages, and adds received measurements to the
database. Messages are dictionaries dumped in JSON format.
"""
LOG.info('API listening to %s' % cfg.CONF.probes_endpoint)
context = zmq.Context.instance()
subscriber = context.socket(zmq.SUB)
if not cfg.CONF.watch_probe:
subscriber.setsockopt(zmq.SUBSCRIBE, '')
else:
for probe in cfg.CONF.watch_probe:
subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.')
for endpoint in cfg.CONF.probes_endpoint:
subscriber.connect(endpoint)
while True:
[probe, message] = subscriber.recv_multipart()
measurements = json.loads(message)
if not isinstance(measurements, dict):
LOG.error('Bad message type (not a dict)')
elif cfg.CONF.signature_checking and \
not security.verify_signature(
measurements,
cfg.CONF.driver_metering_secret):
LOG.error('Bad message signature')
else:
try:
self.add(measurements['probe_id'],
float(measurements['w']))
except (TypeError, ValueError):
LOG.error('Malformed power consumption data: %s'
% measurements['w'])
except KeyError:
LOG.error('Malformed message (missing required key)')

View File

@ -23,6 +23,7 @@ import flask
from oslo.config import cfg
from kwapi.openstack.common import log
from kwapi.plugins import listen
import rrd
import v1
@ -43,7 +44,8 @@ def make_app():
app = flask.Flask(__name__)
app.register_blueprint(v1.blueprint)
thread.start_new_thread(rrd.listen, ())
thread.start_new_thread(listen, (rrd.update_rrd,))
rrd.create_dirs()
@app.before_request
def attach_config():

View File

@ -19,7 +19,6 @@
import collections
import colorsys
import errno
import json
import os
from threading import Lock
import struct
@ -28,10 +27,8 @@ import uuid
from oslo.config import cfg
import rrdtool
import zmq
from kwapi.openstack.common import log
from kwapi import security
LOG = log.getLogger(__name__)
@ -165,6 +162,14 @@ def create_rrd_file(filename):
def update_rrd(probe, watts):
"""Updates RRD file associated with this probe."""
if not probe in probes:
color_seq = color_generator(len(probes)+1)
lock.acquire()
probes.add(probe)
for probe in sorted(probes, reverse=True):
probe_colors[probe] = color_seq.next()
lock.release()
filename = cfg.CONF.rrd_dir + '/' + \
str(uuid.uuid5(uuid.NAMESPACE_DNS, str(probe))) + '.rrd'
if not os.path.isfile(filename):
@ -283,50 +288,3 @@ def build_graph(scale, probe=None):
else:
LOG.info('Retrieve PNG summary graph from cache')
return png_file
def listen():
"""Subscribes to ZeroMQ messages, and adds received measurements to the
database. Messages are dictionaries dumped in JSON format.
"""
LOG.info('RRD listening to %s' % cfg.CONF.probes_endpoint)
create_dirs()
context = zmq.Context.instance()
subscriber = context.socket(zmq.SUB)
if not cfg.CONF.watch_probe:
subscriber.setsockopt(zmq.SUBSCRIBE, '')
else:
for probe in cfg.CONF.watch_probe:
subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.')
for endpoint in cfg.CONF.probes_endpoint:
subscriber.connect(endpoint)
while True:
[probe, message] = subscriber.recv_multipart()
measurements = json.loads(message)
if not isinstance(measurements, dict):
LOG.error('Bad message type (not a dict)')
elif cfg.CONF.signature_checking and \
not security.verify_signature(measurements,
cfg.CONF.driver_metering_secret):
LOG.error('Bad message signature')
else:
try:
probe = measurements['probe_id'].encode('utf-8')
update_rrd(probe, float(measurements['w']))
except (TypeError, ValueError):
LOG.error('Malformed power consumption data: %s'
% measurements['w'])
except KeyError:
LOG.error('Malformed message (missing required key)')
else:
if not probe in probes:
color_seq = color_generator(len(probes)+1)
lock.acquire()
probes.add(probe)
for probe in sorted(probes, reverse=True):
probe_colors[probe] = color_seq.next()
lock.release()