ZeroMQ efficiency improvements.

Multi-part messages: the first is the key.
Allow to select the probes to watch.
Hierarchized names are possible.

Change-Id: Id99487b8831037d5d139edda9d5d4a75459e2d13
This commit is contained in:
François Rossigneux 2013-06-07 21:51:00 +02:00
parent 4e938179c9
commit 9188e2db8c
4 changed files with 25 additions and 6 deletions

View File

@ -73,7 +73,12 @@ class Driver(Thread):
measurements['probe_id'] = probe_id
if cfg.CONF.enable_signing:
security.append_signature(measurements, cfg.CONF.metering_secret)
self.publisher.send(json.dumps(measurements))
self.publisher.send_multipart(
[
probe_id + '.',
json.dumps(measurements)
]
)
def subscribe(self, observer):
"""Appends the observer (callback method) to the observers list."""

View File

@ -36,6 +36,9 @@ collector_opts = [
cfg.MultiStrOpt('probes_endpoint',
required=True,
),
cfg.MultiStrOpt('watch_probe',
required=False,
),
cfg.StrOpt('driver_metering_secret',
required=True,
),
@ -126,12 +129,16 @@ class Collector:
context = zmq.Context.instance()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.SUBSCRIBE, '')
if not cfg.CONF.watch_probe:
subscriber.setsockopt(zmq.SUBSCRIBE, '')
else:
for probe in cfg.CONF.watch_probe:
subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.')
for endpoint in cfg.CONF.probes_endpoint:
subscriber.connect(endpoint)
while True:
message = subscriber.recv()
[probe, message] = subscriber.recv_multipart()
measurements = json.loads(message)
if not isinstance(measurements, dict):
LOG.error('Bad message type (not a dict)')

View File

@ -46,6 +46,9 @@ rrd_opts = [
cfg.MultiStrOpt('probes_endpoint',
required=True,
),
cfg.MultiStrOpt('watch_probe',
required=False,
),
cfg.StrOpt('driver_metering_secret',
required=True,
),
@ -261,12 +264,16 @@ def listen():
context = zmq.Context.instance()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.SUBSCRIBE, '')
if not cfg.CONF.watch_probe:
subscriber.setsockopt(zmq.SUBSCRIBE, '')
else:
for probe in cfg.CONF.watch_probe:
subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.')
for endpoint in cfg.CONF.probes_endpoint:
subscriber.connect(endpoint)
while True:
message = subscriber.recv()
[probe, message] = subscriber.recv_multipart()
measurements = json.loads(message)
if not isinstance(measurements, dict):
LOG.error('Bad message type (not a dict)')

View File

@ -1,9 +1,9 @@
flask
iso8601
oslo.config
pyserial
pysnmp
python-keystoneclient
pyzmq
#python-rrdtool
webob
http://tarballs.openstack.org/oslo-config/oslo-config-2013.1b4.tar.gz#egg=oslo-config