Removing probes observers and callback method. Using ZeroMQ instead (Drivers directly emit their data on the bus).

Add the method start_zmq_server to the driver_manager module, to set up the endpoint where subscribers can listen probe values.
This commit is contained in:
François Rossigneux 2012-12-01 16:48:28 +01:00
parent a1d2802a05
commit c046dd0880
6 changed files with 21 additions and 18 deletions

View File

@ -11,6 +11,7 @@ if __name__ == "__main__":
cfg.CONF(sys.argv[1:], project='kwapi', default_config_files=['/etc/kwapi/drivers.conf'])
log.setup('kwapi')
driver_manager.start_zmq_server(cfg.CONF)
driver_manager.load_all_drivers(cfg.CONF)
driver_manager.check_drivers_alive(cfg.CONF)

View File

@ -66,7 +66,7 @@ class Collector:
return False
def clean(self, conf, periodic):
"""Removes probes from database if they didn't send new values over the last timeout period (seconds).
"""Removes probes from database if they didn't send new values over the last period (seconds).
If periodic, this method is executed automatically after the timeout interval.
"""

View File

@ -2,6 +2,8 @@
from threading import Thread, Event
import zmq
from kwapi.openstack.common import log
LOG = log.getLogger(__name__)
@ -17,6 +19,8 @@ class Driver(Thread):
self.kwargs = kwargs
self.probe_observers = []
self.stop_request = Event()
self.publisher = zmq.Context.instance().socket(zmq.PUB)
self.publisher.connect('inproc://drivers')
def run(self):
"""Run the driver thread. Needs to be implemented in a derived class."""
@ -31,10 +35,9 @@ class Driver(Thread):
"""Returns true if a stop request is pending."""
return self.stop_request.is_set()
def update_value(self, probe_id, value):
"""Calls the callback method of all observers, with the following arguments: probe_id, value."""
for notify_new_value in self.probe_observers :
notify_new_value(probe_id, value)
def send_value(self, probe_id, value):
"""Sends a message via ZeroMQ with the following format: probe_id, value."""
self.publisher.send(probe_id + ':' + str(value))
def subscribe(self, observer):
"""Appends the observer (callback method) to the observers list."""

View File

@ -25,11 +25,6 @@ driver_manager_opts = [
cfg.CONF.register_opts(driver_manager_opts)
context = zmq.Context()
publisher = context.socket(zmq.PUB)
#TODO Read conf
publisher.bind('ipc:///tmp/kwapi')
threads = []
def load_all_drivers(conf):
@ -58,7 +53,6 @@ def load_driver(class_name, probe_ids, kwargs):
except Exception as exception:
LOG.error('Exception occurred while initializing %s(%s, %s): %s' % (class_name, probe_ids, kwargs, exception))
else:
probeObject.subscribe(send_value)
probeObject.start()
return probeObject
@ -79,6 +73,16 @@ def check_drivers_alive(conf):
timer.daemon = True
timer.start()
def start_zmq_server(conf):
"""Forwards probe values to the probes_endpoint defined in conf."""
context = zmq.Context.instance()
frontend = context.socket(zmq.SUB)
frontend.bind('inproc://drivers')
frontend.setsockopt(zmq.SUBSCRIBE, '')
backend = context.socket(zmq.PUB)
backend.bind(conf.probes_endpoint)
thread.start_new_thread(zmq.device, (zmq.FORWARDER, frontend, backend))
def signal_handler(signum, frame):
"""Intercepts TERM signal and properly terminates probe threads."""
if signum is signal.SIGTERM:
@ -88,8 +92,3 @@ def terminate():
"""Terminates driver threads"""
for thread in threads:
thread.join()
publisher.close()
def send_value(probe_id, value):
"""Sends a message via ZeroMQ, with the following format: "probe_id:value"."""
publisher.send(probe_id + ':' + str(value))

View File

@ -25,5 +25,5 @@ class Dummy(Driver):
while not self.stop_request_pending():
for probe_id in self.probe_ids:
value = randrange(self.min_value, self.max_value)
self.update_value(probe_id, value)
self.send_value(probe_id, value)
time.sleep(1)

View File

@ -45,7 +45,7 @@ class Wattsup(Driver):
self.serial.close()
self.stop()
value = self.extract_watts(packet)
self.update_value(self.probe_ids[0], value)
self.send_value(self.probe_ids[0], value)
def get_packet(self):
"""Returns the next packet sent by the wattmeter."""