Listening refactoring in plugins.
Change-Id: I9e7aeda9637f6139fed7039e7111c44e8e5783dc
This commit is contained in:
parent
825f5d9cce
commit
7c25fab119
|
@ -0,0 +1,62 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Author: François Rossigneux <francois.rossigneux@inria.fr>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
|
||||
from oslo.config import cfg
|
||||
import zmq
|
||||
|
||||
from kwapi import security
|
||||
from kwapi.openstack.common import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def listen(function):
|
||||
"""Subscribes to ZeroMQ messages, and adds received measurements to the
|
||||
database. Messages are dictionaries dumped in JSON format.
|
||||
|
||||
"""
|
||||
LOG.info('Listening to %s' % cfg.CONF.probes_endpoint)
|
||||
|
||||
context = zmq.Context.instance()
|
||||
subscriber = context.socket(zmq.SUB)
|
||||
if not cfg.CONF.watch_probe:
|
||||
subscriber.setsockopt(zmq.SUBSCRIBE, '')
|
||||
else:
|
||||
for probe in cfg.CONF.watch_probe:
|
||||
subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.')
|
||||
for endpoint in cfg.CONF.probes_endpoint:
|
||||
subscriber.connect(endpoint)
|
||||
|
||||
while True:
|
||||
[probe, message] = subscriber.recv_multipart()
|
||||
measurements = json.loads(message)
|
||||
if not isinstance(measurements, dict):
|
||||
LOG.error('Bad message type (not a dict)')
|
||||
elif cfg.CONF.signature_checking and \
|
||||
not security.verify_signature(measurements,
|
||||
cfg.CONF.driver_metering_secret):
|
||||
LOG.error('Bad message signature')
|
||||
else:
|
||||
try:
|
||||
probe = measurements['probe_id'].encode('utf-8')
|
||||
function(probe, float(measurements['w']))
|
||||
except (TypeError, ValueError):
|
||||
LOG.error('Malformed power consumption data: %s'
|
||||
% measurements['w'])
|
||||
except KeyError:
|
||||
LOG.error('Malformed message (missing required key)')
|
|
@ -17,11 +17,13 @@
|
|||
"""Set up the API server application instance."""
|
||||
|
||||
import sys
|
||||
import thread
|
||||
|
||||
import flask
|
||||
from oslo.config import cfg
|
||||
|
||||
from kwapi.openstack.common import log
|
||||
from kwapi.plugins import listen
|
||||
import acl
|
||||
from collector import Collector
|
||||
import v1
|
||||
|
@ -49,6 +51,8 @@ def make_app():
|
|||
collector = Collector()
|
||||
collector.clean()
|
||||
|
||||
thread.start_new_thread(listen, (collector.add,))
|
||||
|
||||
@app.before_request
|
||||
def attach_config():
|
||||
flask.request.collector = collector
|
||||
|
|
|
@ -14,15 +14,12 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo.config import cfg
|
||||
import zmq
|
||||
|
||||
from kwapi.openstack.common import log
|
||||
from kwapi import security
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
@ -81,9 +78,6 @@ class Collector:
|
|||
LOG.info('Starting Collector')
|
||||
self.database = {}
|
||||
self.lock = threading.Lock()
|
||||
thread = threading.Thread(target=self.listen)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
|
||||
def add(self, probe, watts):
|
||||
"""Creates (or updates) consumption data for this probe."""
|
||||
|
@ -125,40 +119,3 @@ class Collector:
|
|||
timer = threading.Timer(cfg.CONF.cleaning_interval, self.clean)
|
||||
timer.daemon = True
|
||||
timer.start()
|
||||
|
||||
def listen(self):
|
||||
"""Subscribes to ZeroMQ messages, and adds received measurements to the
|
||||
database. Messages are dictionaries dumped in JSON format.
|
||||
|
||||
"""
|
||||
LOG.info('API listening to %s' % cfg.CONF.probes_endpoint)
|
||||
|
||||
context = zmq.Context.instance()
|
||||
subscriber = context.socket(zmq.SUB)
|
||||
if not cfg.CONF.watch_probe:
|
||||
subscriber.setsockopt(zmq.SUBSCRIBE, '')
|
||||
else:
|
||||
for probe in cfg.CONF.watch_probe:
|
||||
subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.')
|
||||
for endpoint in cfg.CONF.probes_endpoint:
|
||||
subscriber.connect(endpoint)
|
||||
|
||||
while True:
|
||||
[probe, message] = subscriber.recv_multipart()
|
||||
measurements = json.loads(message)
|
||||
if not isinstance(measurements, dict):
|
||||
LOG.error('Bad message type (not a dict)')
|
||||
elif cfg.CONF.signature_checking and \
|
||||
not security.verify_signature(
|
||||
measurements,
|
||||
cfg.CONF.driver_metering_secret):
|
||||
LOG.error('Bad message signature')
|
||||
else:
|
||||
try:
|
||||
self.add(measurements['probe_id'],
|
||||
float(measurements['w']))
|
||||
except (TypeError, ValueError):
|
||||
LOG.error('Malformed power consumption data: %s'
|
||||
% measurements['w'])
|
||||
except KeyError:
|
||||
LOG.error('Malformed message (missing required key)')
|
||||
|
|
|
@ -23,6 +23,7 @@ import flask
|
|||
from oslo.config import cfg
|
||||
|
||||
from kwapi.openstack.common import log
|
||||
from kwapi.plugins import listen
|
||||
import rrd
|
||||
import v1
|
||||
|
||||
|
@ -43,7 +44,8 @@ def make_app():
|
|||
app = flask.Flask(__name__)
|
||||
app.register_blueprint(v1.blueprint)
|
||||
|
||||
thread.start_new_thread(rrd.listen, ())
|
||||
thread.start_new_thread(listen, (rrd.update_rrd,))
|
||||
rrd.create_dirs()
|
||||
|
||||
@app.before_request
|
||||
def attach_config():
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
import collections
|
||||
import colorsys
|
||||
import errno
|
||||
import json
|
||||
import os
|
||||
from threading import Lock
|
||||
import struct
|
||||
|
@ -28,10 +27,8 @@ import uuid
|
|||
|
||||
from oslo.config import cfg
|
||||
import rrdtool
|
||||
import zmq
|
||||
|
||||
from kwapi.openstack.common import log
|
||||
from kwapi import security
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
@ -165,6 +162,14 @@ def create_rrd_file(filename):
|
|||
|
||||
def update_rrd(probe, watts):
|
||||
"""Updates RRD file associated with this probe."""
|
||||
if not probe in probes:
|
||||
color_seq = color_generator(len(probes)+1)
|
||||
lock.acquire()
|
||||
probes.add(probe)
|
||||
for probe in sorted(probes, reverse=True):
|
||||
probe_colors[probe] = color_seq.next()
|
||||
lock.release()
|
||||
|
||||
filename = cfg.CONF.rrd_dir + '/' + \
|
||||
str(uuid.uuid5(uuid.NAMESPACE_DNS, str(probe))) + '.rrd'
|
||||
if not os.path.isfile(filename):
|
||||
|
@ -283,50 +288,3 @@ def build_graph(scale, probe=None):
|
|||
else:
|
||||
LOG.info('Retrieve PNG summary graph from cache')
|
||||
return png_file
|
||||
|
||||
|
||||
def listen():
|
||||
"""Subscribes to ZeroMQ messages, and adds received measurements to the
|
||||
database. Messages are dictionaries dumped in JSON format.
|
||||
|
||||
"""
|
||||
LOG.info('RRD listening to %s' % cfg.CONF.probes_endpoint)
|
||||
|
||||
create_dirs()
|
||||
|
||||
context = zmq.Context.instance()
|
||||
subscriber = context.socket(zmq.SUB)
|
||||
if not cfg.CONF.watch_probe:
|
||||
subscriber.setsockopt(zmq.SUBSCRIBE, '')
|
||||
else:
|
||||
for probe in cfg.CONF.watch_probe:
|
||||
subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.')
|
||||
for endpoint in cfg.CONF.probes_endpoint:
|
||||
subscriber.connect(endpoint)
|
||||
|
||||
while True:
|
||||
[probe, message] = subscriber.recv_multipart()
|
||||
measurements = json.loads(message)
|
||||
if not isinstance(measurements, dict):
|
||||
LOG.error('Bad message type (not a dict)')
|
||||
elif cfg.CONF.signature_checking and \
|
||||
not security.verify_signature(measurements,
|
||||
cfg.CONF.driver_metering_secret):
|
||||
LOG.error('Bad message signature')
|
||||
else:
|
||||
try:
|
||||
probe = measurements['probe_id'].encode('utf-8')
|
||||
update_rrd(probe, float(measurements['w']))
|
||||
except (TypeError, ValueError):
|
||||
LOG.error('Malformed power consumption data: %s'
|
||||
% measurements['w'])
|
||||
except KeyError:
|
||||
LOG.error('Malformed message (missing required key)')
|
||||
else:
|
||||
if not probe in probes:
|
||||
color_seq = color_generator(len(probes)+1)
|
||||
lock.acquire()
|
||||
probes.add(probe)
|
||||
for probe in sorted(probes, reverse=True):
|
||||
probe_colors[probe] = color_seq.next()
|
||||
lock.release()
|
||||
|
|
Loading…
Reference in New Issue