From 9188e2db8c8fdb5e2b21b7df5da60746001a9dfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Rossigneux?= Date: Fri, 7 Jun 2013 21:51:00 +0200 Subject: [PATCH] ZeroMQ efficiency improvements. Multi-part messages: the first is the key. Allow to select the probes to watch. Hierarchized names are possible. Change-Id: Id99487b8831037d5d139edda9d5d4a75459e2d13 --- kwapi/drivers/driver.py | 7 ++++++- kwapi/plugins/api/collector.py | 11 +++++++++-- kwapi/plugins/rrd/rrd.py | 11 +++++++++-- pip-requires | 2 +- 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/kwapi/drivers/driver.py b/kwapi/drivers/driver.py index 00acd30..e5aced6 100644 --- a/kwapi/drivers/driver.py +++ b/kwapi/drivers/driver.py @@ -73,7 +73,12 @@ class Driver(Thread): measurements['probe_id'] = probe_id if cfg.CONF.enable_signing: security.append_signature(measurements, cfg.CONF.metering_secret) - self.publisher.send(json.dumps(measurements)) + self.publisher.send_multipart( + [ + probe_id + '.', + json.dumps(measurements) + ] + ) def subscribe(self, observer): """Appends the observer (callback method) to the observers list.""" diff --git a/kwapi/plugins/api/collector.py b/kwapi/plugins/api/collector.py index 829bf32..27a735d 100644 --- a/kwapi/plugins/api/collector.py +++ b/kwapi/plugins/api/collector.py @@ -36,6 +36,9 @@ collector_opts = [ cfg.MultiStrOpt('probes_endpoint', required=True, ), + cfg.MultiStrOpt('watch_probe', + required=False, + ), cfg.StrOpt('driver_metering_secret', required=True, ), @@ -126,12 +129,16 @@ class Collector: context = zmq.Context.instance() subscriber = context.socket(zmq.SUB) - subscriber.setsockopt(zmq.SUBSCRIBE, '') + if not cfg.CONF.watch_probe: + subscriber.setsockopt(zmq.SUBSCRIBE, '') + else: + for probe in cfg.CONF.watch_probe: + subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.') for endpoint in cfg.CONF.probes_endpoint: subscriber.connect(endpoint) while True: - message = subscriber.recv() + [probe, message] = subscriber.recv_multipart() measurements = json.loads(message) if not isinstance(measurements, dict): LOG.error('Bad message type (not a dict)') diff --git a/kwapi/plugins/rrd/rrd.py b/kwapi/plugins/rrd/rrd.py index 720cd32..2bae86c 100644 --- a/kwapi/plugins/rrd/rrd.py +++ b/kwapi/plugins/rrd/rrd.py @@ -46,6 +46,9 @@ rrd_opts = [ cfg.MultiStrOpt('probes_endpoint', required=True, ), + cfg.MultiStrOpt('watch_probe', + required=False, + ), cfg.StrOpt('driver_metering_secret', required=True, ), @@ -261,12 +264,16 @@ def listen(): context = zmq.Context.instance() subscriber = context.socket(zmq.SUB) - subscriber.setsockopt(zmq.SUBSCRIBE, '') + if not cfg.CONF.watch_probe: + subscriber.setsockopt(zmq.SUBSCRIBE, '') + else: + for probe in cfg.CONF.watch_probe: + subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.') for endpoint in cfg.CONF.probes_endpoint: subscriber.connect(endpoint) while True: - message = subscriber.recv() + [probe, message] = subscriber.recv_multipart() measurements = json.loads(message) if not isinstance(measurements, dict): LOG.error('Bad message type (not a dict)') diff --git a/pip-requires b/pip-requires index c46ff5d..c352c05 100644 --- a/pip-requires +++ b/pip-requires @@ -1,9 +1,9 @@ flask iso8601 +oslo.config pyserial pysnmp python-keystoneclient pyzmq #python-rrdtool webob -http://tarballs.openstack.org/oslo-config/oslo-config-2013.1b4.tar.gz#egg=oslo-config