diff --git a/kwapi/drivers/driver.py b/kwapi/drivers/driver.py index e5aced6..89e8d8e 100644 --- a/kwapi/drivers/driver.py +++ b/kwapi/drivers/driver.py @@ -63,6 +63,8 @@ class Driver(Thread): """Asks the driver thread to terminate.""" self.stop_request.set() super(Driver, self).join() + LOG.info('Unloading driver %s(probe_ids=%s, kwargs=%s)' + % (self.__class__.__name__, self.probe_ids, self.kwargs)) def stop_request_pending(self): """Returns true if a stop request is pending.""" diff --git a/kwapi/drivers/driver_manager.py b/kwapi/drivers/driver_manager.py index e768543..b80776d 100644 --- a/kwapi/drivers/driver_manager.py +++ b/kwapi/drivers/driver_manager.py @@ -19,7 +19,7 @@ import ast import signal import thread -from threading import Timer +from threading import Lock, Timer from oslo.config import cfg import zmq @@ -40,6 +40,7 @@ driver_manager_opts = [ cfg.CONF.register_opts(driver_manager_opts) threads = [] +lock = Lock() def load_all_drivers(): @@ -53,9 +54,11 @@ def load_all_drivers(): kwargs = {} if 'parameters' in entries.keys(): kwargs = ast.literal_eval(entries['parameters'][0]) + lock.acquire() driver_thread = load_driver(class_name, probe_ids, kwargs) if driver_thread is not None: threads.append(driver_thread) + lock.release() def load_driver(class_name, probe_ids, kwargs): @@ -81,29 +84,32 @@ def check_drivers_alive(): This method is executed automatically at the given interval. """ - LOG.info('Checks driver threads') - for index, driver_thread in enumerate(threads): - if not driver_thread.is_alive(): - LOG.warning('%s(probe_ids=%s, kwargs=%s) is crashed' - % (driver_thread.__class__.__name__, - driver_thread.probe_ids, driver_thread.kwargs)) - new_thread = load_driver(driver_thread.__class__.__name__, - driver_thread.probe_ids, - driver_thread.kwargs - ) - if new_thread is not None: - threads[index] = new_thread - - # Schedule periodic execution of this function - if cfg.CONF.check_drivers_interval > 0: - timer = Timer(cfg.CONF.check_drivers_interval, check_drivers_alive) - timer.daemon = True - timer.start() + if lock.acquire(False): + LOG.info('Checks driver threads') + for index, driver_thread in enumerate(threads): + if not driver_thread.is_alive(): + LOG.warning('%s(probe_ids=%s, kwargs=%s) is crashed' + % (driver_thread.__class__.__name__, + driver_thread.probe_ids, driver_thread.kwargs)) + new_thread = load_driver(driver_thread.__class__.__name__, + driver_thread.probe_ids, + driver_thread.kwargs + ) + if new_thread is not None: + threads[index] = new_thread + # Schedule periodic execution of this function + if cfg.CONF.check_drivers_interval > 0: + timer = Timer(cfg.CONF.check_drivers_interval, + check_drivers_alive) + timer.daemon = True + timer.start() + lock.release() def start_zmq_server(): """Forwards probe values to the probes_endpoint.""" context = zmq.Context.instance() + context.set(zmq.MAX_SOCKETS, 100000) frontend = context.socket(zmq.SUB) frontend.bind('inproc://drivers') frontend.setsockopt(zmq.SUBSCRIBE, '') @@ -120,5 +126,6 @@ def signal_handler(signum, frame): def terminate(): """Terminates driver threads.""" + lock.acquire() for driver in threads: thread.start_new_thread(driver.join, ()) diff --git a/kwapi/plugins/api/app.py b/kwapi/plugins/api/app.py index e47333a..346e235 100644 --- a/kwapi/plugins/api/app.py +++ b/kwapi/plugins/api/app.py @@ -46,7 +46,13 @@ def make_app(): @app.before_request def attach_config(): - flask.request.database = collector.database + flask.request.collector = collector + collector.lock.acquire() + + @app.after_request + def unlock(response): + collector.lock.release() + return response # Install the middleware wrapper if cfg.CONF.acl_enabled: diff --git a/kwapi/plugins/api/collector.py b/kwapi/plugins/api/collector.py index 27a735d..5fe40eb 100644 --- a/kwapi/plugins/api/collector.py +++ b/kwapi/plugins/api/collector.py @@ -80,25 +80,31 @@ class Collector: """Initializes an empty database and start listening the endpoint.""" LOG.info('Starting Collector') self.database = {} + self.lock = threading.Lock() thread = threading.Thread(target=self.listen) thread.daemon = True thread.start() def add(self, probe, watts): """Creates (or updates) consumption data for this probe.""" + self.lock.acquire() if probe in self.database.keys(): self.database[probe].add(watts) else: record = Record(timestamp=time.time(), kwh=0.0, watts=watts) self.database[probe] = record + self.lock.release() def remove(self, probe): """Removes this probe from database.""" - if probe in self.database.keys(): + self.lock.acquire() + try: del self.database[probe] return True - else: + except KeyError: return False + finally: + self.lock.release() def clean(self): """Removes probes from database if they didn't send new values over @@ -108,11 +114,13 @@ class Collector: """ LOG.info('Cleaning collector') # Cleaning + self.lock.acquire() for probe in self.database.keys(): if time.time() - self.database[probe]['timestamp'] > \ cfg.CONF.cleaning_interval: LOG.info('Removing data of probe %s' % probe) self.remove(probe) + self.lock.release() # Schedule periodic execution of this function if cfg.CONF.cleaning_interval > 0: diff --git a/kwapi/plugins/api/v1.py b/kwapi/plugins/api/v1.py index 56f594f..9556206 100644 --- a/kwapi/plugins/api/v1.py +++ b/kwapi/plugins/api/v1.py @@ -31,7 +31,7 @@ def welcome(): def list_probes_ids(): """Returns all known probes IDs.""" message = {} - message['probe_ids'] = flask.request.database.keys() + message['probe_ids'] = flask.request.collector.database.keys() return flask.jsonify(message) @@ -39,7 +39,7 @@ def list_probes_ids(): def list_probes(): """Returns all information about all known probes.""" message = {} - message['probes'] = flask.request.database + message['probes'] = flask.request.collector.database return flask.jsonify(message) @@ -48,7 +48,7 @@ def probe_info(probe): """Returns all information about this probe (id, timestamp, kWh, W).""" message = {} try: - message[probe] = flask.request.database[probe] + message[probe] = flask.request.collector.database[probe] except KeyError: flask.abort(404) return flask.jsonify(message) @@ -59,7 +59,10 @@ def probe_value(probe, meter): """Returns the probe meter value.""" message = {} try: - message[probe] = {meter: flask.request.database[probe][meter]} + message[probe] = \ + { + meter: flask.request.collector.database[probe][meter] + } except KeyError: flask.abort(404) return flask.jsonify(message) diff --git a/kwapi/plugins/rrd/rrd.py b/kwapi/plugins/rrd/rrd.py index 7189beb..9cf2247 100644 --- a/kwapi/plugins/rrd/rrd.py +++ b/kwapi/plugins/rrd/rrd.py @@ -21,6 +21,7 @@ import errno import itertools import json import os +from threading import Lock import time import uuid @@ -80,6 +81,7 @@ colors = ['#EA644A', '#EC9D48', '#ECD748', '#54EC48', '#48C4EC', '#7648EC', '#DE48EC', '#8A8187'] probes = set() probe_colors = {} +lock = Lock() def create_dirs(): @@ -292,7 +294,9 @@ def listen(): LOG.error('Malformed message (missing required key)') else: if not probe in probes: - probes.add(probe) color_seq = itertools.cycle(colors) + lock.acquire() + probes.add(probe) for probe in sorted(probes): probe_colors[probe] = color_seq.next() + lock.release()