ZeroMQ efficiency improvements.
Multi-part messages: the first is the key. Allow to select the probes to watch. Hierarchized names are possible. Change-Id: Id99487b8831037d5d139edda9d5d4a75459e2d13
This commit is contained in:
parent
4e938179c9
commit
9188e2db8c
|
@ -73,7 +73,12 @@ class Driver(Thread):
|
|||
measurements['probe_id'] = probe_id
|
||||
if cfg.CONF.enable_signing:
|
||||
security.append_signature(measurements, cfg.CONF.metering_secret)
|
||||
self.publisher.send(json.dumps(measurements))
|
||||
self.publisher.send_multipart(
|
||||
[
|
||||
probe_id + '.',
|
||||
json.dumps(measurements)
|
||||
]
|
||||
)
|
||||
|
||||
def subscribe(self, observer):
|
||||
"""Appends the observer (callback method) to the observers list."""
|
||||
|
|
|
@ -36,6 +36,9 @@ collector_opts = [
|
|||
cfg.MultiStrOpt('probes_endpoint',
|
||||
required=True,
|
||||
),
|
||||
cfg.MultiStrOpt('watch_probe',
|
||||
required=False,
|
||||
),
|
||||
cfg.StrOpt('driver_metering_secret',
|
||||
required=True,
|
||||
),
|
||||
|
@ -126,12 +129,16 @@ class Collector:
|
|||
|
||||
context = zmq.Context.instance()
|
||||
subscriber = context.socket(zmq.SUB)
|
||||
subscriber.setsockopt(zmq.SUBSCRIBE, '')
|
||||
if not cfg.CONF.watch_probe:
|
||||
subscriber.setsockopt(zmq.SUBSCRIBE, '')
|
||||
else:
|
||||
for probe in cfg.CONF.watch_probe:
|
||||
subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.')
|
||||
for endpoint in cfg.CONF.probes_endpoint:
|
||||
subscriber.connect(endpoint)
|
||||
|
||||
while True:
|
||||
message = subscriber.recv()
|
||||
[probe, message] = subscriber.recv_multipart()
|
||||
measurements = json.loads(message)
|
||||
if not isinstance(measurements, dict):
|
||||
LOG.error('Bad message type (not a dict)')
|
||||
|
|
|
@ -46,6 +46,9 @@ rrd_opts = [
|
|||
cfg.MultiStrOpt('probes_endpoint',
|
||||
required=True,
|
||||
),
|
||||
cfg.MultiStrOpt('watch_probe',
|
||||
required=False,
|
||||
),
|
||||
cfg.StrOpt('driver_metering_secret',
|
||||
required=True,
|
||||
),
|
||||
|
@ -261,12 +264,16 @@ def listen():
|
|||
|
||||
context = zmq.Context.instance()
|
||||
subscriber = context.socket(zmq.SUB)
|
||||
subscriber.setsockopt(zmq.SUBSCRIBE, '')
|
||||
if not cfg.CONF.watch_probe:
|
||||
subscriber.setsockopt(zmq.SUBSCRIBE, '')
|
||||
else:
|
||||
for probe in cfg.CONF.watch_probe:
|
||||
subscriber.setsockopt(zmq.SUBSCRIBE, probe + '.')
|
||||
for endpoint in cfg.CONF.probes_endpoint:
|
||||
subscriber.connect(endpoint)
|
||||
|
||||
while True:
|
||||
message = subscriber.recv()
|
||||
[probe, message] = subscriber.recv_multipart()
|
||||
measurements = json.loads(message)
|
||||
if not isinstance(measurements, dict):
|
||||
LOG.error('Bad message type (not a dict)')
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
flask
|
||||
iso8601
|
||||
oslo.config
|
||||
pyserial
|
||||
pysnmp
|
||||
python-keystoneclient
|
||||
pyzmq
|
||||
#python-rrdtool
|
||||
webob
|
||||
http://tarballs.openstack.org/oslo-config/oslo-config-2013.1b4.tar.gz#egg=oslo-config
|
||||
|
|
Loading…
Reference in New Issue