Bug fixes (loading/unloading drivers, and cleaning API).
Change-Id: Ib1d334c1d34742b1aabbfe24776d730cc84e548a
This commit is contained in:
parent
8171e22445
commit
085c7646eb
|
@ -17,7 +17,6 @@
|
|||
|
||||
import sys
|
||||
import signal
|
||||
import thread
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
|
@ -30,7 +29,7 @@ if __name__ == "__main__":
|
|||
default_config_files=['/etc/kwapi/drivers.conf'])
|
||||
log.setup('kwapi')
|
||||
|
||||
thread.start_new_thread(driver_manager.start_zmq_server, ())
|
||||
driver_manager.start_zmq_server()
|
||||
driver_manager.load_all_drivers()
|
||||
driver_manager.check_drivers_alive()
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
import ast
|
||||
import signal
|
||||
import thread
|
||||
from threading import Lock, Timer
|
||||
from threading import Lock, Timer, Thread
|
||||
|
||||
from oslo.config import cfg
|
||||
import zmq
|
||||
|
@ -107,13 +107,18 @@ def check_drivers_alive():
|
|||
|
||||
|
||||
def start_zmq_server():
|
||||
"""Forwards probe values to the probes_endpoint."""
|
||||
"""Binds the sockets."""
|
||||
context = zmq.Context.instance()
|
||||
context.set(zmq.MAX_SOCKETS, 100000)
|
||||
frontend = context.socket(zmq.XPUB)
|
||||
frontend.bind(cfg.CONF.probes_endpoint)
|
||||
backend = context.socket(zmq.XSUB)
|
||||
backend.bind('inproc://drivers')
|
||||
thread.start_new_thread(run_zmq_forwarder, (frontend, backend))
|
||||
|
||||
|
||||
def run_zmq_forwarder(frontend, backend):
|
||||
"""Forwards probe values to the probes_endpoint."""
|
||||
poll = zmq.Poller()
|
||||
poll.register(frontend, zmq.POLLIN)
|
||||
poll.register(backend, zmq.POLLIN)
|
||||
|
@ -136,5 +141,10 @@ def signal_handler(signum, frame):
|
|||
def terminate():
|
||||
"""Terminates driver threads."""
|
||||
lock.acquire()
|
||||
join_threads = []
|
||||
for driver in threads:
|
||||
thread.start_new_thread(driver.join, ())
|
||||
join_thread = Thread(target=driver.join)
|
||||
join_thread.start()
|
||||
join_threads.append(join_thread)
|
||||
for join_thread in join_threads:
|
||||
join_thread.join()
|
||||
|
|
|
@ -114,13 +114,11 @@ class Collector:
|
|||
"""
|
||||
LOG.info('Cleaning collector')
|
||||
# Cleaning
|
||||
self.lock.acquire()
|
||||
for probe in self.database.keys():
|
||||
if time.time() - self.database[probe]['timestamp'] > \
|
||||
cfg.CONF.cleaning_interval:
|
||||
LOG.info('Removing data of probe %s' % probe)
|
||||
self.remove(probe)
|
||||
self.lock.release()
|
||||
|
||||
# Schedule periodic execution of this function
|
||||
if cfg.CONF.cleaning_interval > 0:
|
||||
|
|
Loading…
Reference in New Issue