Merge "Don't load many times the same publisher"

This commit is contained in:
Jenkins 2017-01-30 21:57:13 +00:00 committed by Gerrit Code Review
commit 8b511b05f5
3 changed files with 30 additions and 10 deletions

View File

@ -59,6 +59,7 @@ OPTS = [
),
]
LOG = log.getLogger(__name__)
@ -360,7 +361,7 @@ class Sink(object):
passed data directly from the sink which are published unchanged.
"""
def __init__(self, conf, cfg, transformer_manager):
def __init__(self, conf, cfg, transformer_manager, publisher_manager):
self.conf = conf
self.cfg = cfg
@ -380,9 +381,9 @@ class Sink(object):
if '://' not in p:
# Support old format without URL
p = p + "://"
try:
self.publishers.append(publisher.get_publisher(
self.conf, p, self.NAMESPACE))
self.publishers.append(publisher_manager.get(p))
except Exception:
LOG.error(_LE("Unable to load publisher %s"), p,
exc_info=True)
@ -415,7 +416,7 @@ class Sink(object):
class EventSink(Sink):
NAMESPACE = 'ceilometer.event.publisher'
PUBLISHER_PURPOSE = 'event'
def publish_events(self, events):
if events:
@ -439,7 +440,7 @@ class EventSink(Sink):
class SampleSink(Sink):
NAMESPACE = 'ceilometer.publisher'
PUBLISHER_PURPOSE = 'sample'
def _transform_sample(self, start, sample):
try:
@ -610,11 +611,13 @@ class SamplePipeline(Pipeline):
self.sink.publish_samples(supported)
SAMPLE_TYPE = {'pipeline': SamplePipeline,
SAMPLE_TYPE = {'name': 'sample',
'pipeline': SamplePipeline,
'source': SampleSource,
'sink': SampleSink}
EVENT_TYPE = {'pipeline': EventPipeline,
EVENT_TYPE = {'name': 'event',
'pipeline': EventPipeline,
'source': EventSource,
'sink': EventSink}
@ -670,6 +673,21 @@ class ConfigManagerBase(object):
return False
class PublisherManager(object):
def __init__(self, conf, purpose):
self._loaded_publishers = {}
self._conf = conf
self._purpose = purpose
def get(self, url):
if url not in self._loaded_publishers:
p = publisher.get_publisher(
self._conf, url,
'ceilometer.%s.publisher' % self._purpose)
self._loaded_publishers[url] = p
return self._loaded_publishers[url]
class PipelineManager(ConfigManagerBase):
"""Pipeline Manager
@ -754,6 +772,7 @@ class PipelineManager(ConfigManagerBase):
raise PipelineException("Both sources & sinks are required",
cfg)
LOG.info(_LI('detected decoupled pipeline config format'))
publisher_manager = PublisherManager(self.conf, p_type['name'])
unique_names = set()
sources = []
@ -776,7 +795,8 @@ class PipelineManager(ConfigManagerBase):
else:
unique_names.add(name)
sinks[s['name']] = p_type['sink'](self.conf, s,
transformer_manager)
transformer_manager,
publisher_manager)
unique_names.clear()
for source in sources:

View File

@ -22,7 +22,7 @@ import six
from stevedore import driver
def get_publisher(conf, url, namespace='ceilometer.publisher'):
def get_publisher(conf, url, namespace):
"""Get publisher driver and load it.
:param URL: URL for the publisher

View File

@ -226,7 +226,7 @@ ceilometer.transformer =
aggregator = ceilometer.transformer.conversions:AggregatorTransformer
arithmetic = ceilometer.transformer.arithmetic:ArithmeticTransformer
ceilometer.publisher =
ceilometer.sample.publisher =
test = ceilometer.publisher.test:TestPublisher
notifier = ceilometer.publisher.messaging:SampleNotifierPublisher
udp = ceilometer.publisher.udp:UDPPublisher