ZeroMQ max sockets and mutexes.
Increase the ZeroMQ max sockets limit. Add mutexes around critical sections. Change-Id: Ic898cd58b3978eb6cf20bfded5683e9e835fa512
This commit is contained in:
parent
e384035d0b
commit
863085890f
|
@ -63,6 +63,8 @@ class Driver(Thread):
|
|||
"""Asks the driver thread to terminate."""
|
||||
self.stop_request.set()
|
||||
super(Driver, self).join()
|
||||
LOG.info('Unloading driver %s(probe_ids=%s, kwargs=%s)'
|
||||
% (self.__class__.__name__, self.probe_ids, self.kwargs))
|
||||
|
||||
def stop_request_pending(self):
|
||||
"""Returns true if a stop request is pending."""
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
import ast
|
||||
import signal
|
||||
import thread
|
||||
from threading import Timer
|
||||
from threading import Lock, Timer
|
||||
|
||||
from oslo.config import cfg
|
||||
import zmq
|
||||
|
@ -40,6 +40,7 @@ driver_manager_opts = [
|
|||
cfg.CONF.register_opts(driver_manager_opts)
|
||||
|
||||
threads = []
|
||||
lock = Lock()
|
||||
|
||||
|
||||
def load_all_drivers():
|
||||
|
@ -53,9 +54,11 @@ def load_all_drivers():
|
|||
kwargs = {}
|
||||
if 'parameters' in entries.keys():
|
||||
kwargs = ast.literal_eval(entries['parameters'][0])
|
||||
lock.acquire()
|
||||
driver_thread = load_driver(class_name, probe_ids, kwargs)
|
||||
if driver_thread is not None:
|
||||
threads.append(driver_thread)
|
||||
lock.release()
|
||||
|
||||
|
||||
def load_driver(class_name, probe_ids, kwargs):
|
||||
|
@ -81,29 +84,32 @@ def check_drivers_alive():
|
|||
This method is executed automatically at the given interval.
|
||||
|
||||
"""
|
||||
LOG.info('Checks driver threads')
|
||||
for index, driver_thread in enumerate(threads):
|
||||
if not driver_thread.is_alive():
|
||||
LOG.warning('%s(probe_ids=%s, kwargs=%s) is crashed'
|
||||
% (driver_thread.__class__.__name__,
|
||||
driver_thread.probe_ids, driver_thread.kwargs))
|
||||
new_thread = load_driver(driver_thread.__class__.__name__,
|
||||
driver_thread.probe_ids,
|
||||
driver_thread.kwargs
|
||||
)
|
||||
if new_thread is not None:
|
||||
threads[index] = new_thread
|
||||
|
||||
# Schedule periodic execution of this function
|
||||
if cfg.CONF.check_drivers_interval > 0:
|
||||
timer = Timer(cfg.CONF.check_drivers_interval, check_drivers_alive)
|
||||
timer.daemon = True
|
||||
timer.start()
|
||||
if lock.acquire(False):
|
||||
LOG.info('Checks driver threads')
|
||||
for index, driver_thread in enumerate(threads):
|
||||
if not driver_thread.is_alive():
|
||||
LOG.warning('%s(probe_ids=%s, kwargs=%s) is crashed'
|
||||
% (driver_thread.__class__.__name__,
|
||||
driver_thread.probe_ids, driver_thread.kwargs))
|
||||
new_thread = load_driver(driver_thread.__class__.__name__,
|
||||
driver_thread.probe_ids,
|
||||
driver_thread.kwargs
|
||||
)
|
||||
if new_thread is not None:
|
||||
threads[index] = new_thread
|
||||
# Schedule periodic execution of this function
|
||||
if cfg.CONF.check_drivers_interval > 0:
|
||||
timer = Timer(cfg.CONF.check_drivers_interval,
|
||||
check_drivers_alive)
|
||||
timer.daemon = True
|
||||
timer.start()
|
||||
lock.release()
|
||||
|
||||
|
||||
def start_zmq_server():
|
||||
"""Forwards probe values to the probes_endpoint."""
|
||||
context = zmq.Context.instance()
|
||||
context.set(zmq.MAX_SOCKETS, 100000)
|
||||
frontend = context.socket(zmq.SUB)
|
||||
frontend.bind('inproc://drivers')
|
||||
frontend.setsockopt(zmq.SUBSCRIBE, '')
|
||||
|
@ -120,5 +126,6 @@ def signal_handler(signum, frame):
|
|||
|
||||
def terminate():
|
||||
"""Terminates driver threads."""
|
||||
lock.acquire()
|
||||
for driver in threads:
|
||||
thread.start_new_thread(driver.join, ())
|
||||
|
|
|
@ -46,7 +46,13 @@ def make_app():
|
|||
|
||||
@app.before_request
|
||||
def attach_config():
|
||||
flask.request.database = collector.database
|
||||
flask.request.collector = collector
|
||||
collector.lock.acquire()
|
||||
|
||||
@app.after_request
|
||||
def unlock(response):
|
||||
collector.lock.release()
|
||||
return response
|
||||
|
||||
# Install the middleware wrapper
|
||||
if cfg.CONF.acl_enabled:
|
||||
|
|
|
@ -80,25 +80,31 @@ class Collector:
|
|||
"""Initializes an empty database and start listening the endpoint."""
|
||||
LOG.info('Starting Collector')
|
||||
self.database = {}
|
||||
self.lock = threading.Lock()
|
||||
thread = threading.Thread(target=self.listen)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
|
||||
def add(self, probe, watts):
|
||||
"""Creates (or updates) consumption data for this probe."""
|
||||
self.lock.acquire()
|
||||
if probe in self.database.keys():
|
||||
self.database[probe].add(watts)
|
||||
else:
|
||||
record = Record(timestamp=time.time(), kwh=0.0, watts=watts)
|
||||
self.database[probe] = record
|
||||
self.lock.release()
|
||||
|
||||
def remove(self, probe):
|
||||
"""Removes this probe from database."""
|
||||
if probe in self.database.keys():
|
||||
self.lock.acquire()
|
||||
try:
|
||||
del self.database[probe]
|
||||
return True
|
||||
else:
|
||||
except KeyError:
|
||||
return False
|
||||
finally:
|
||||
self.lock.release()
|
||||
|
||||
def clean(self):
|
||||
"""Removes probes from database if they didn't send new values over
|
||||
|
@ -108,11 +114,13 @@ class Collector:
|
|||
"""
|
||||
LOG.info('Cleaning collector')
|
||||
# Cleaning
|
||||
self.lock.acquire()
|
||||
for probe in self.database.keys():
|
||||
if time.time() - self.database[probe]['timestamp'] > \
|
||||
cfg.CONF.cleaning_interval:
|
||||
LOG.info('Removing data of probe %s' % probe)
|
||||
self.remove(probe)
|
||||
self.lock.release()
|
||||
|
||||
# Schedule periodic execution of this function
|
||||
if cfg.CONF.cleaning_interval > 0:
|
||||
|
|
|
@ -31,7 +31,7 @@ def welcome():
|
|||
def list_probes_ids():
|
||||
"""Returns all known probes IDs."""
|
||||
message = {}
|
||||
message['probe_ids'] = flask.request.database.keys()
|
||||
message['probe_ids'] = flask.request.collector.database.keys()
|
||||
return flask.jsonify(message)
|
||||
|
||||
|
||||
|
@ -39,7 +39,7 @@ def list_probes_ids():
|
|||
def list_probes():
|
||||
"""Returns all information about all known probes."""
|
||||
message = {}
|
||||
message['probes'] = flask.request.database
|
||||
message['probes'] = flask.request.collector.database
|
||||
return flask.jsonify(message)
|
||||
|
||||
|
||||
|
@ -48,7 +48,7 @@ def probe_info(probe):
|
|||
"""Returns all information about this probe (id, timestamp, kWh, W)."""
|
||||
message = {}
|
||||
try:
|
||||
message[probe] = flask.request.database[probe]
|
||||
message[probe] = flask.request.collector.database[probe]
|
||||
except KeyError:
|
||||
flask.abort(404)
|
||||
return flask.jsonify(message)
|
||||
|
@ -59,7 +59,10 @@ def probe_value(probe, meter):
|
|||
"""Returns the probe meter value."""
|
||||
message = {}
|
||||
try:
|
||||
message[probe] = {meter: flask.request.database[probe][meter]}
|
||||
message[probe] = \
|
||||
{
|
||||
meter: flask.request.collector.database[probe][meter]
|
||||
}
|
||||
except KeyError:
|
||||
flask.abort(404)
|
||||
return flask.jsonify(message)
|
||||
|
|
|
@ -21,6 +21,7 @@ import errno
|
|||
import itertools
|
||||
import json
|
||||
import os
|
||||
from threading import Lock
|
||||
import time
|
||||
import uuid
|
||||
|
||||
|
@ -80,6 +81,7 @@ colors = ['#EA644A', '#EC9D48', '#ECD748', '#54EC48', '#48C4EC', '#7648EC',
|
|||
'#DE48EC', '#8A8187']
|
||||
probes = set()
|
||||
probe_colors = {}
|
||||
lock = Lock()
|
||||
|
||||
|
||||
def create_dirs():
|
||||
|
@ -292,7 +294,9 @@ def listen():
|
|||
LOG.error('Malformed message (missing required key)')
|
||||
else:
|
||||
if not probe in probes:
|
||||
probes.add(probe)
|
||||
color_seq = itertools.cycle(colors)
|
||||
lock.acquire()
|
||||
probes.add(probe)
|
||||
for probe in sorted(probes):
|
||||
probe_colors[probe] = color_seq.next()
|
||||
lock.release()
|
||||
|
|
Loading…
Reference in New Issue