ZeroMQ max sockets and mutexes.

Increase the ZeroMQ max sockets limit.
Add mutexes around critical sections.

Change-Id: Ic898cd58b3978eb6cf20bfded5683e9e835fa512
This commit is contained in:
François Rossigneux 2013-06-12 14:59:21 +02:00
parent e384035d0b
commit 863085890f
6 changed files with 57 additions and 27 deletions

View File

@ -63,6 +63,8 @@ class Driver(Thread):
"""Asks the driver thread to terminate."""
self.stop_request.set()
super(Driver, self).join()
LOG.info('Unloading driver %s(probe_ids=%s, kwargs=%s)'
% (self.__class__.__name__, self.probe_ids, self.kwargs))
def stop_request_pending(self):
"""Returns true if a stop request is pending."""

View File

@ -19,7 +19,7 @@
import ast
import signal
import thread
from threading import Timer
from threading import Lock, Timer
from oslo.config import cfg
import zmq
@ -40,6 +40,7 @@ driver_manager_opts = [
cfg.CONF.register_opts(driver_manager_opts)
threads = []
lock = Lock()
def load_all_drivers():
@ -53,9 +54,11 @@ def load_all_drivers():
kwargs = {}
if 'parameters' in entries.keys():
kwargs = ast.literal_eval(entries['parameters'][0])
lock.acquire()
driver_thread = load_driver(class_name, probe_ids, kwargs)
if driver_thread is not None:
threads.append(driver_thread)
lock.release()
def load_driver(class_name, probe_ids, kwargs):
@ -81,29 +84,32 @@ def check_drivers_alive():
This method is executed automatically at the given interval.
"""
LOG.info('Checks driver threads')
for index, driver_thread in enumerate(threads):
if not driver_thread.is_alive():
LOG.warning('%s(probe_ids=%s, kwargs=%s) is crashed'
% (driver_thread.__class__.__name__,
driver_thread.probe_ids, driver_thread.kwargs))
new_thread = load_driver(driver_thread.__class__.__name__,
driver_thread.probe_ids,
driver_thread.kwargs
)
if new_thread is not None:
threads[index] = new_thread
# Schedule periodic execution of this function
if cfg.CONF.check_drivers_interval > 0:
timer = Timer(cfg.CONF.check_drivers_interval, check_drivers_alive)
timer.daemon = True
timer.start()
if lock.acquire(False):
LOG.info('Checks driver threads')
for index, driver_thread in enumerate(threads):
if not driver_thread.is_alive():
LOG.warning('%s(probe_ids=%s, kwargs=%s) is crashed'
% (driver_thread.__class__.__name__,
driver_thread.probe_ids, driver_thread.kwargs))
new_thread = load_driver(driver_thread.__class__.__name__,
driver_thread.probe_ids,
driver_thread.kwargs
)
if new_thread is not None:
threads[index] = new_thread
# Schedule periodic execution of this function
if cfg.CONF.check_drivers_interval > 0:
timer = Timer(cfg.CONF.check_drivers_interval,
check_drivers_alive)
timer.daemon = True
timer.start()
lock.release()
def start_zmq_server():
"""Forwards probe values to the probes_endpoint."""
context = zmq.Context.instance()
context.set(zmq.MAX_SOCKETS, 100000)
frontend = context.socket(zmq.SUB)
frontend.bind('inproc://drivers')
frontend.setsockopt(zmq.SUBSCRIBE, '')
@ -120,5 +126,6 @@ def signal_handler(signum, frame):
def terminate():
"""Terminates driver threads."""
lock.acquire()
for driver in threads:
thread.start_new_thread(driver.join, ())

View File

@ -46,7 +46,13 @@ def make_app():
@app.before_request
def attach_config():
flask.request.database = collector.database
flask.request.collector = collector
collector.lock.acquire()
@app.after_request
def unlock(response):
collector.lock.release()
return response
# Install the middleware wrapper
if cfg.CONF.acl_enabled:

View File

@ -80,25 +80,31 @@ class Collector:
"""Initializes an empty database and start listening the endpoint."""
LOG.info('Starting Collector')
self.database = {}
self.lock = threading.Lock()
thread = threading.Thread(target=self.listen)
thread.daemon = True
thread.start()
def add(self, probe, watts):
"""Creates (or updates) consumption data for this probe."""
self.lock.acquire()
if probe in self.database.keys():
self.database[probe].add(watts)
else:
record = Record(timestamp=time.time(), kwh=0.0, watts=watts)
self.database[probe] = record
self.lock.release()
def remove(self, probe):
"""Removes this probe from database."""
if probe in self.database.keys():
self.lock.acquire()
try:
del self.database[probe]
return True
else:
except KeyError:
return False
finally:
self.lock.release()
def clean(self):
"""Removes probes from database if they didn't send new values over
@ -108,11 +114,13 @@ class Collector:
"""
LOG.info('Cleaning collector')
# Cleaning
self.lock.acquire()
for probe in self.database.keys():
if time.time() - self.database[probe]['timestamp'] > \
cfg.CONF.cleaning_interval:
LOG.info('Removing data of probe %s' % probe)
self.remove(probe)
self.lock.release()
# Schedule periodic execution of this function
if cfg.CONF.cleaning_interval > 0:

View File

@ -31,7 +31,7 @@ def welcome():
def list_probes_ids():
"""Returns all known probes IDs."""
message = {}
message['probe_ids'] = flask.request.database.keys()
message['probe_ids'] = flask.request.collector.database.keys()
return flask.jsonify(message)
@ -39,7 +39,7 @@ def list_probes_ids():
def list_probes():
"""Returns all information about all known probes."""
message = {}
message['probes'] = flask.request.database
message['probes'] = flask.request.collector.database
return flask.jsonify(message)
@ -48,7 +48,7 @@ def probe_info(probe):
"""Returns all information about this probe (id, timestamp, kWh, W)."""
message = {}
try:
message[probe] = flask.request.database[probe]
message[probe] = flask.request.collector.database[probe]
except KeyError:
flask.abort(404)
return flask.jsonify(message)
@ -59,7 +59,10 @@ def probe_value(probe, meter):
"""Returns the probe meter value."""
message = {}
try:
message[probe] = {meter: flask.request.database[probe][meter]}
message[probe] = \
{
meter: flask.request.collector.database[probe][meter]
}
except KeyError:
flask.abort(404)
return flask.jsonify(message)

View File

@ -21,6 +21,7 @@ import errno
import itertools
import json
import os
from threading import Lock
import time
import uuid
@ -80,6 +81,7 @@ colors = ['#EA644A', '#EC9D48', '#ECD748', '#54EC48', '#48C4EC', '#7648EC',
'#DE48EC', '#8A8187']
probes = set()
probe_colors = {}
lock = Lock()
def create_dirs():
@ -292,7 +294,9 @@ def listen():
LOG.error('Malformed message (missing required key)')
else:
if not probe in probes:
probes.add(probe)
color_seq = itertools.cycle(colors)
lock.acquire()
probes.add(probe)
for probe in sorted(probes):
probe_colors[probe] = color_seq.next()
lock.release()