Merge "publisher: clean out context usage"

commit ee8f3f69c3
Author: Jenkins
Date: 2016-04-05 06:52:43 +00:00
Committed-by: Gerrit Code Review
28 changed files with 219 additions and 256 deletions

View File

@ -18,7 +18,6 @@
import abc
import collections
from oslo_context import context
from oslo_log import log
import oslo_messaging
import six
@ -103,19 +102,18 @@ class NotificationBase(PluginBase):
try:
notification = messaging.convert_to_old_notification_format(
priority, notification)
self.to_samples_and_publish(context.get_admin_context(),
notification)
self.to_samples_and_publish(notification)
except Exception:
LOG.error(_LE('Fail to process notification'), exc_info=True)
def to_samples_and_publish(self, context, notification):
def to_samples_and_publish(self, notification):
"""Return samples produced by *process_notification*.
Samples produced for the given notification.
:param context: Execution context from the service or RPC call
:param notification: The notification to process.
"""
with self.manager.publisher(context) as p:
with self.manager.publisher() as p:
p(list(self.process_notification(notification)))
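
This first hunk sets the pattern for the whole change: the context.get_admin_context() lookup disappears and the notification flows straight into publishing. A self-contained sketch of the new chain, with stand-ins for the real manager and plugin classes:

    import contextlib

    class FakeManager(object):
        """Stand-in for the pipeline manager; publisher() now takes no
        arguments."""
        def __init__(self):
            self.published = []

        @contextlib.contextmanager
        def publisher(self):
            yield self.published.extend

    class EchoPlugin(object):
        def __init__(self, manager):
            self.manager = manager

        def process_notification(self, notification):
            yield notification['payload']

        def to_samples_and_publish(self, notification):
            # No context argument anywhere in the chain any more.
            with self.manager.publisher() as p:
                p(list(self.process_notification(notification)))

    manager = FakeManager()
    EchoPlugin(manager).to_samples_and_publish({'payload': 'cpu'})
    assert manager.published == ['cpu']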

View File

@ -20,7 +20,6 @@ import logging
import sys
from oslo_config import cfg
from oslo_context import context
from oslo_utils import timeutils
from stevedore import extension
@ -80,7 +79,7 @@ def send_sample():
pipeline_manager = pipeline.setup_pipeline(
extension.ExtensionManager('ceilometer.transformer'))
with pipeline_manager.publisher(context.get_admin_context()) as p:
with pipeline_manager.publisher() as p:
p([sample.Sample(
name=cfg.CONF.sample_name,
type=cfg.CONF.sample_type,

View File

@ -15,7 +15,6 @@
import logging
from oslo_config import cfg
from oslo_context import context
import oslo_messaging
from stevedore import extension
@ -30,7 +29,6 @@ class EventsNotificationEndpoint(object):
def __init__(self, manager):
super(EventsNotificationEndpoint, self).__init__()
LOG.debug('Loading event definitions')
self.ctxt = context.get_admin_context()
self.event_converter = event_converter.setup_events(
extension.ExtensionManager(
namespace='ceilometer.event.trait_plugin'))
@ -61,7 +59,7 @@ class EventsNotificationEndpoint(object):
try:
event = self.event_converter.to_event(notification)
if event is not None:
with self.manager.publisher(self.ctxt) as p:
with self.manager.publisher() as p:
p(event)
except Exception:
if not cfg.CONF.notification.ack_on_event_error:
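
The endpoint still consults cfg.CONF.notification.ack_on_event_error to decide whether a failed event is acknowledged or requeued; only the publisher context is gone. A toy version of that decision, with the oslo.messaging result constants simplified to strings:

    def handle(event, publish, ack_on_error=True):
        """Toy ack/requeue decision; 'ack' and 'requeue' stand in for
        oslo_messaging.NotificationResult.HANDLED / REQUEUE."""
        try:
            publish(event)
        except Exception:
            if not ack_on_error:
                return 'requeue'
        return 'ack'

    assert handle('e', lambda e: None) == 'ack'

    def failing(event):
        raise RuntimeError('publisher down')

    assert handle('e', failing, ack_on_error=False) == 'requeue'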

View File

@ -16,7 +16,6 @@ import itertools
import threading
from oslo_config import cfg
from oslo_context import context
from oslo_log import log
import oslo_messaging
from stevedore import extension
@ -156,7 +155,6 @@ class NotificationService(service_base.BaseService):
self.transport = messaging.get_transport()
if cfg.CONF.notification.workload_partitioning:
self.ctxt = context.get_admin_context()
self.group_id = self.NOTIFICATION_NAMESPACE
self.partition_coordinator = coordination.PartitionCoordinator()
self.partition_coordinator.start()
@ -285,7 +283,7 @@ class NotificationService(service_base.BaseService):
listener = messaging.get_batch_notification_listener(
transport,
[oslo_messaging.Target(topic=topic)],
[pipe_endpoint(self.ctxt, pipe)],
[pipe_endpoint(pipe)],
batch_size=cfg.CONF.notification.batch_size,
batch_timeout=cfg.CONF.notification.batch_timeout)
listener.start()

View File

@ -80,8 +80,8 @@ class PipelineException(Exception):
@six.add_metaclass(abc.ABCMeta)
class PipelineEndpoint(object):
def __init__(self, context, pipeline):
self.publish_context = PublishContext(context, [pipeline])
def __init__(self, pipeline):
self.publish_context = PublishContext([pipeline])
@abc.abstractmethod
def sample(self, messages):
@ -149,7 +149,7 @@ class _PipelineTransportManager(object):
def add_transporter(self, transporter):
self.transporters.append(transporter)
def publisher(self, context):
def publisher(self):
serializer = self.serializer
hash_grouping = self.hash_grouping
transporters = self.transporters
@ -171,7 +171,7 @@ class _PipelineTransportManager(object):
grouping_keys)
% len(notifiers))
notifier = notifiers[key]
notifier.sample(context.to_dict(),
notifier.sample({},
event_type=event_type,
payload=[serialized_data])
return p
@ -204,10 +204,9 @@ class EventPipelineTransportManager(_PipelineTransportManager):
class PublishContext(object):
def __init__(self, context, pipelines=None):
def __init__(self, pipelines=None):
pipelines = pipelines or []
self.pipelines = set(pipelines)
self.context = context
def add_pipelines(self, pipelines):
self.pipelines.update(pipelines)
@ -215,12 +214,12 @@ class PublishContext(object):
def __enter__(self):
def p(data):
for p in self.pipelines:
p.publish_data(self.context, data)
p.publish_data(data)
return p
def __exit__(self, exc_type, exc_value, traceback):
for p in self.pipelines:
p.flush(self.context)
p.flush()
class Source(object):
@ -419,11 +418,11 @@ class EventSink(Sink):
NAMESPACE = 'ceilometer.event.publisher'
def publish_events(self, ctxt, events):
def publish_events(self, events):
if events:
for p in self.publishers:
try:
p.publish_events(ctxt, events)
p.publish_events(events)
except Exception:
LOG.exception(_("Pipeline %(pipeline)s: %(status)s"
" after error from publisher %(pub)s") %
@ -433,19 +432,19 @@ class EventSink(Sink):
if not self.multi_publish:
raise
def flush(self, ctxt):
@staticmethod
def flush():
"""Flush data after all events have been injected to pipeline."""
pass
class SampleSink(Sink):
NAMESPACE = 'ceilometer.publisher'
def _transform_sample(self, start, ctxt, sample):
def _transform_sample(self, start, sample):
try:
for transformer in self.transformers[start:]:
sample = transformer.handle_sample(ctxt, sample)
sample = transformer.handle_sample(sample)
if not sample:
LOG.debug(
"Pipeline %(pipeline)s: Sample dropped by "
@ -462,13 +461,12 @@ class SampleSink(Sink):
'smp': sample}))
LOG.exception(err)
def _publish_samples(self, start, ctxt, samples):
def _publish_samples(self, start, samples):
"""Push samples into pipeline for publishing.
:param start: Index of the first transformer the sample is injected
into. This mainly matters for flush() invocations, where a
transformer may emit samples.
:param ctxt: Execution context from the manager or service.
:param samples: Sample list.
"""
@ -483,30 +481,30 @@ class SampleSink(Sink):
"%(smp)s from %(trans)s transformer", {'pipeline': self,
'smp': sample,
'trans': start})
sample = self._transform_sample(start, ctxt, sample)
sample = self._transform_sample(start, sample)
if sample:
transformed_samples.append(sample)
if transformed_samples:
for p in self.publishers:
try:
p.publish_samples(ctxt, transformed_samples)
p.publish_samples(transformed_samples)
except Exception:
LOG.exception(_(
"Pipeline %(pipeline)s: Continue after error "
"from publisher %(pub)s") % ({'pipeline': self,
'pub': p}))
def publish_samples(self, ctxt, samples):
self._publish_samples(0, ctxt, samples)
def publish_samples(self, samples):
self._publish_samples(0, samples)
def flush(self, ctxt):
def flush(self):
"""Flush data after all samples have been injected to pipeline."""
for (i, transformer) in enumerate(self.transformers):
try:
self._publish_samples(i + 1, ctxt,
list(transformer.flush(ctxt)))
self._publish_samples(i + 1,
list(transformer.flush()))
except Exception as err:
LOG.warning(_(
"Pipeline %(pipeline)s: Error flushing "
@ -528,15 +526,15 @@ class Pipeline(object):
return (self.source.name if self.source.name == self.sink.name
else '%s:%s' % (self.source.name, self.sink.name))
def flush(self, ctxt):
self.sink.flush(ctxt)
def flush(self):
self.sink.flush()
@property
def publishers(self):
return self.sink.publishers
@abc.abstractmethod
def publish_data(self, ctxt, data):
def publish_data(self, data):
"""Publish data from pipeline."""
@ -551,12 +549,12 @@ class EventPipeline(Pipeline):
def support_event(self, event_type):
return self.source.support_event(event_type)
def publish_data(self, ctxt, events):
def publish_data(self, events):
if not isinstance(events, list):
events = [events]
supported = [e for e in events
if self.source.support_event(e.event_type)]
self.sink.publish_events(ctxt, supported)
self.sink.publish_events(supported)
class SamplePipeline(Pipeline):
@ -605,12 +603,12 @@ class SamplePipeline(Pipeline):
return False
return True
def publish_data(self, ctxt, samples):
def publish_data(self, samples):
if not isinstance(samples, list):
samples = [samples]
supported = [s for s in samples if self.source.support_meter(s.name)
and self._validate_volume(s)]
self.sink.publish_samples(ctxt, supported)
self.sink.publish_samples(supported)
SAMPLE_TYPE = {'pipeline': SamplePipeline,
@ -741,12 +739,12 @@ class PipelineManager(object):
self.pipelines.append(pipe)
unique_names.clear()
def publisher(self, context):
def publisher(self):
"""Build a new Publisher for these manager pipelines.
:param context: The context.
"""
return PublishContext(context, self.pipelines)
return PublishContext(self.pipelines)
class PollingManager(object):
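
The heart of the change: PublishContext is built from pipelines alone, and publisher() takes no arguments. Entering the context returns a callable that fans data out to every pipeline; leaving it flushes them. A runnable sketch of that shape (FakePipeline is illustrative, not ceilometer code):

    class FakePipeline(object):
        def __init__(self):
            self.published = []

        def publish_data(self, data):
            self.published.extend(data)

        def flush(self):
            pass

    class PublishContext(object):
        def __init__(self, pipelines=None):
            self.pipelines = set(pipelines or [])

        def __enter__(self):
            def p(data):
                for pipe in self.pipelines:
                    pipe.publish_data(data)
            return p

        def __exit__(self, exc_type, exc_value, traceback):
            for pipe in self.pipelines:
                pipe.flush()

    pipe = FakePipeline()
    with PublishContext([pipe]) as p:
        p(['sample-1', 'sample-2'])
    assert pipe.published == ['sample-1', 'sample-2']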

View File

@ -40,9 +40,9 @@ class PublisherBase(object):
pass
@abc.abstractmethod
def publish_samples(self, context, samples):
def publish_samples(self, samples):
"""Publish samples into final conduit."""
@abc.abstractmethod
def publish_events(self, context, events):
def publish_events(self, events):
"""Publish events into final conduit."""

View File

@ -35,7 +35,7 @@ class DirectPublisher(publisher.PublisherBase):
self.meter_conn = dispatcher.meter_conn
self.event_conn = dispatcher.event_conn
def publish_samples(self, context, samples):
def publish_samples(self, samples):
if not isinstance(samples, list):
samples = [samples]
@ -52,7 +52,7 @@ class DirectPublisher(publisher.PublisherBase):
meter['timestamp'] = timeutils.normalize_time(ts)
self.meter_conn.record_metering_data(meter)
def publish_events(self, context, events):
def publish_events(self, events):
if not isinstance(events, list):
events = [events]
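
The direct publisher keeps its guard coercing a single sample into a list; only the leading context parameter goes away. The same guard in isolation, with an in-memory store standing in for the database connection:

    class MemoryPublisher(object):
        """Illustrative stand-in; stores instead of writing to a DB."""
        def __init__(self):
            self.store = []

        def publish_samples(self, samples):
            if not isinstance(samples, list):
                samples = [samples]
            self.store.extend(samples)

    pub = MemoryPublisher()
    pub.publish_samples('one')
    pub.publish_samples(['two', 'three'])
    assert pub.store == ['one', 'two', 'three']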

View File

@ -87,20 +87,18 @@ class FilePublisher(publisher.PublisherBase):
rfh.setLevel(logging.INFO)
self.publisher_logger.addHandler(rfh)
def publish_samples(self, context, samples):
def publish_samples(self, samples):
"""Send a metering message for publishing
:param context: Execution context from the service or RPC call
:param samples: Samples from pipeline after transformation
"""
if self.publisher_logger:
for sample in samples:
self.publisher_logger.info(sample.as_dict())
def publish_events(self, context, events):
def publish_events(self, events):
"""Send an event message for publishing
:param context: Execution context from the service or RPC call
:param events: events from pipeline after transformation
"""
raise ceilometer.NotImplementedError

View File

@ -119,19 +119,17 @@ class HttpPublisher(publisher.PublisherBase):
LOG.error(_LE('Data post failed with status code %s') %
res.status_code)
def publish_samples(self, context, samples):
def publish_samples(self, samples):
"""Send a metering message for publishing
:param context: Execution context from the service or RPC call
:param samples: Samples from pipeline after transformation
"""
data = [sample.as_dict() for sample in samples]
self._do_post(data)
def publish_events(self, context, events):
def publish_events(self, events):
"""Send an event message for publishing
:param context: Execution context from the service or RPC call
:param events: events from pipeline after transformation
"""
data = [evt.as_dict()['raw']['payload'] for evt in events

View File

@ -84,7 +84,7 @@ class KafkaBrokerPublisher(messaging.MessagingPublisher):
raise messaging.DeliveryFailure('Kafka Client is not available, '
'please restart Kafka client')
def _send(self, context, event_type, data):
def _send(self, event_type, data):
self._ensure_connection()
# TODO(sileht): don't split the payload into multiple network
# message ... but how to do that without breaking consuming

View File

@ -98,10 +98,9 @@ class MessagingPublisher(publisher.PublisherBase):
self.retry = 1 if self.policy in ['queue', 'drop'] else None
def publish_samples(self, context, samples):
def publish_samples(self, samples):
"""Publish samples on RPC.
:param context: Execution context from the service or RPC call.
:param samples: Samples from pipeline after transformation.
"""
@ -112,7 +111,7 @@ class MessagingPublisher(publisher.PublisherBase):
for sample in samples
]
topic = cfg.CONF.publisher_notifier.metering_topic
self.local_queue.append((context, topic, meters))
self.local_queue.append((topic, meters))
if self.per_meter_topic:
for meter_name, meter_list in itertools.groupby(
@ -122,7 +121,7 @@ class MessagingPublisher(publisher.PublisherBase):
topic_name = topic + '.' + meter_name
LOG.debug('Publishing %(m)d samples on %(n)s',
{'m': len(meter_list), 'n': topic_name})
self.local_queue.append((context, topic_name, meter_list))
self.local_queue.append((topic_name, meter_list))
self.flush()
@ -150,11 +149,11 @@ class MessagingPublisher(publisher.PublisherBase):
def _process_queue(self, queue, policy):
current_retry = 0
while queue:
context, topic, data = queue[0]
topic, data = queue[0]
try:
self._send(context, topic, data)
self._send(topic, data)
except DeliveryFailure:
data = sum([len(m) for __, __, m in queue])
data = sum([len(m) for __, m in queue])
if policy == 'queue':
LOG.warning(_("Failed to publish %d datapoints, queue "
"them"), data)
@ -172,21 +171,20 @@ class MessagingPublisher(publisher.PublisherBase):
queue.pop(0)
return []
def publish_events(self, context, events):
def publish_events(self, events):
"""Send an event message for publishing
:param context: Execution context from the service or RPC call
:param events: events from pipeline after transformation
"""
ev_list = [utils.message_from_event(
event, cfg.CONF.publisher.telemetry_secret) for event in events]
topic = cfg.CONF.publisher_notifier.event_topic
self.local_queue.append((context, topic, ev_list))
self.local_queue.append((topic, ev_list))
self.flush()
@abc.abstractmethod
def _send(self, context, topic, meters):
def _send(self, topic, meters):
"""Send the meters to the messaging topic."""
@ -203,9 +201,9 @@ class NotifierPublisher(MessagingPublisher):
retry=self.retry
)
def _send(self, context, event_type, data):
def _send(self, event_type, data):
try:
self.notifier.sample(context.to_dict(), event_type=event_type,
self.notifier.sample({}, event_type=event_type,
payload=data)
except oslo_messaging.MessageDeliveryFailure as e:
raise_delivery_failure(e)
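
Two details of this file are worth pulling out. Queued entries shrink from (context, topic, data) triples to (topic, data) pairs, which is why every unpacking site loses a slot; and oslo.messaging's Notifier.sample still expects a ctxt dict as its first positional argument, so the publisher now passes an empty dict instead of a serialized request context. Both shown with stand-ins:

    # Queue entries after the change: (topic, data) pairs.
    queue = [('metering', [{'counter_name': 'cpu'}])]
    topic, data = queue[0]
    assert topic == 'metering'

    class FakeNotifier(object):
        """Stand-in for oslo_messaging.Notifier."""
        def __init__(self):
            self.calls = []

        def sample(self, ctxt, event_type, payload):
            self.calls.append((ctxt, event_type, payload))

    notifier = FakeNotifier()
    notifier.sample({}, event_type=topic, payload=data)
    assert notifier.calls[0][0] == {}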

View File

@ -26,19 +26,17 @@ class TestPublisher(publisher.PublisherBase):
self.events = []
self.calls = 0
def publish_samples(self, context, samples):
def publish_samples(self, samples):
"""Send a metering message for publishing
:param context: Execution context from the service or RPC call
:param samples: Samples from pipeline after transformation
"""
self.samples.extend(samples)
self.calls += 1
def publish_events(self, context, events):
def publish_events(self, events):
"""Send an event message for publishing
:param context: Execution context from the service or RPC call
:param events: events from pipeline after transformation
"""
self.events.extend(events)
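
The fake publisher keeps counting calls, just with one argument fewer. Exercising it directly under the new signature:

    class TestPublisher(object):
        def __init__(self):
            self.samples = []
            self.events = []
            self.calls = 0

        def publish_samples(self, samples):
            self.samples.extend(samples)
            self.calls += 1

        def publish_events(self, events):
            self.events.extend(events)

    pub = TestPublisher()
    pub.publish_samples(['a'])
    pub.publish_samples(['b', 'c'])
    assert pub.calls == 2 and len(pub.samples) == 3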

View File

@ -45,10 +45,9 @@ class UDPPublisher(publisher.PublisherBase):
self.socket = socket.socket(addr_family,
socket.SOCK_DGRAM)
def publish_samples(self, context, samples):
def publish_samples(self, samples):
"""Send a metering message for publishing
:param context: Execution context from the service or RPC call
:param samples: Samples from pipeline after transformation
"""
@ -67,10 +66,9 @@ class UDPPublisher(publisher.PublisherBase):
LOG.warning(_("Unable to send sample over UDP"))
LOG.exception(e)
def publish_events(self, context, events):
def publish_events(self, events):
"""Send an event message for publishing
:param context: Execution context from the service or RPC call
:param events: events from pipeline after transformation
"""
raise ceilometer.NotImplementedError

View File

@ -72,8 +72,7 @@ class TestDirectPublisher(tests_db.TestBase):
group='database')
parsed_url = netutils.urlsplit('direct://')
publisher = direct.DirectPublisher(parsed_url)
publisher.publish_samples(None,
self.test_data)
publisher.publish_samples(self.test_data)
meters = list(self.conn.get_meters(resource=self.resource_id))
names = sorted([meter.name for meter in meters])
@ -92,7 +91,7 @@ class TestEventDirectPublisher(tests_db.TestBase):
def test_direct_publisher(self):
parsed_url = netutils.urlsplit('direct://')
publisher = direct.DirectPublisher(parsed_url)
publisher.publish_events(None, self.test_data)
publisher.publish_events(self.test_data)
e_types = list(self.event_conn.get_event_types())
self.assertEqual(5, len(e_types))

View File

@ -23,7 +23,6 @@ import datetime
import traceback
import mock
from oslo_context import context
from oslo_utils import timeutils
from oslotest import base
from oslotest import mockpatch
@ -81,10 +80,10 @@ class BasePipelineTestCase(base.BaseTestCase):
return fake_drivers[url](url)
class PublisherClassException(publisher.PublisherBase):
def publish_samples(self, ctxt, samples):
def publish_samples(self, samples):
raise Exception()
def publish_events(self, ctxt, events):
def publish_events(self, events):
raise Exception()
class TransformerClass(transformer.TransformerBase):
@ -95,10 +94,11 @@ class BasePipelineTestCase(base.BaseTestCase):
self.__class__.samples = []
self.append_name = append_name
def flush(self, ctxt):
@staticmethod
def flush():
return []
def handle_sample(self, ctxt, counter):
def handle_sample(self, counter):
self.__class__.samples.append(counter)
newname = getattr(counter, 'name') + self.append_name
return sample.Sample(
@ -120,14 +120,14 @@ class BasePipelineTestCase(base.BaseTestCase):
def __init__(self):
self.__class__.samples = []
def handle_sample(self, ctxt, counter):
def handle_sample(self, counter):
self.__class__.samples.append(counter)
class TransformerClassException(object):
grouping_keys = ['resource_id']
@staticmethod
def handle_sample(ctxt, counter):
def handle_sample(counter):
raise Exception()
def setUp(self):
@ -268,7 +268,7 @@ class BasePipelineTestCase(base.BaseTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -284,7 +284,7 @@ class BasePipelineTestCase(base.BaseTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -302,7 +302,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata=self.test_counter.resource_metadata,
)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
self.assertEqual(2, len(publisher.samples))
@ -329,7 +329,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata=self.test_counter.resource_metadata,
)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([test_s])
LOG.warning.assert_called_once_with(
@ -362,7 +362,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata=self.test_counter.resource_metadata,
)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([test_s])
LOG.warning.assert_called_once_with(
@ -381,7 +381,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -393,7 +393,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -413,7 +413,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.samples))
@ -425,7 +425,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('counters', counter_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -480,7 +480,7 @@ class BasePipelineTestCase(base.BaseTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
self.test_counter = sample.Sample(
@ -495,7 +495,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata=self.test_counter.resource_metadata,
)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -518,7 +518,7 @@ class BasePipelineTestCase(base.BaseTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
self.test_counter = sample.Sample(
@ -533,7 +533,7 @@ class BasePipelineTestCase(base.BaseTestCase):
resource_metadata=self.test_counter.resource_metadata,
)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -550,7 +550,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('transformers', None)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.samples))
@ -561,7 +561,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('transformers', [])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.samples))
@ -583,7 +583,7 @@ class BasePipelineTestCase(base.BaseTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -617,7 +617,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
self.assertEqual(2, len(self.TransformerClass.samples))
@ -655,7 +655,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('transformers', transformer_cfg)
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -673,7 +673,7 @@ class BasePipelineTestCase(base.BaseTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -690,7 +690,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('publishers', ['except://', 'new://'])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
new_publisher = pipeline_manager.pipelines[0].publishers[1]
@ -702,7 +702,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('counters', ['a', 'b'])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter,
sample.Sample(
name='b',
@ -743,17 +743,17 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(None, self.test_counter)
pipe.publish_data(self.test_counter)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(publisher.samples))
pipe.flush(None)
pipe.flush()
self.assertEqual(0, len(publisher.samples))
pipe.publish_data(None, self.test_counter)
pipe.flush(None)
pipe.publish_data(self.test_counter)
pipe.flush()
self.assertEqual(0, len(publisher.samples))
for i in range(CACHE_SIZE - 2):
pipe.publish_data(None, self.test_counter)
pipe.flush(None)
pipe.publish_data(self.test_counter)
pipe.flush()
self.assertEqual(CACHE_SIZE, len(publisher.samples))
self.assertEqual('a_update_new', getattr(publisher.samples[0], 'name'))
@ -778,7 +778,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('counters', ['a', 'b'])
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter,
sample.Sample(
name='b',
@ -795,7 +795,7 @@ class BasePipelineTestCase(base.BaseTestCase):
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(publisher.samples))
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
self.assertEqual(CACHE_SIZE, len(publisher.samples))
@ -815,9 +815,9 @@ class BasePipelineTestCase(base.BaseTestCase):
pipe = pipeline_manager.pipelines[0]
publisher = pipe.publishers[0]
pipe.publish_data(None, self.test_counter)
pipe.publish_data(self.test_counter)
self.assertEqual(0, len(publisher.samples))
pipe.flush(None)
pipe.flush()
self.assertEqual(1, len(publisher.samples))
self.assertEqual('a_update',
getattr(publisher.samples[0], 'name'))
@ -855,10 +855,10 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(None, counters)
pipe.publish_data(counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.samples))
pipe.flush(None)
pipe.flush()
self.assertEqual(1, len(publisher.samples))
cpu_mins = publisher.samples[-1]
self.assertEqual('cpu_mins', getattr(cpu_mins, 'name'))
@ -909,7 +909,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(None, counters)
pipe.publish_data(counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(2, len(publisher.samples))
core_temp = publisher.samples[0]
@ -999,10 +999,10 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(None, counters)
pipe.publish_data(counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(2, len(publisher.samples))
pipe.flush(None)
pipe.flush()
self.assertEqual(2, len(publisher.samples))
cpu_util = publisher.samples[0]
self.assertEqual('cpu_util', getattr(cpu_util, 'name'))
@ -1084,10 +1084,10 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(None, counters)
pipe.publish_data(counters)
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(publisher.samples))
pipe.flush(None)
pipe.flush()
self.assertEqual(0, len(publisher.samples))
@mock.patch('ceilometer.transformer.conversions.LOG')
@ -1151,10 +1151,10 @@ class BasePipelineTestCase(base.BaseTestCase):
),
]
pipe.publish_data(None, counters)
pipe.publish_data(counters)
publisher = pipe.publishers[0]
self.assertEqual(1, len(publisher.samples))
pipe.flush(None)
pipe.flush()
self.assertEqual(1, len(publisher.samples))
cpu_util_sample = publisher.samples[0]
@ -1201,10 +1201,10 @@ class BasePipelineTestCase(base.BaseTestCase):
)
counters.append(s)
pipe.publish_data(None, counters)
pipe.publish_data(counters)
publisher = pipe.publishers[0]
self.assertEqual(2, len(publisher.samples))
pipe.flush(None)
pipe.flush()
self.assertEqual(2, len(publisher.samples))
bps = publisher.samples[0]
self.assertEqual('%s.rate' % meters[0], getattr(bps, 'name'))
@ -1330,8 +1330,8 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(None, counters)
pipe.flush(None)
pipe.publish_data(counters)
pipe.flush()
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(expected_length, len(publisher.samples))
return sorted(publisher.samples, key=lambda s: s.volume)
@ -1366,8 +1366,8 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(None, counters)
pipe.flush(None)
pipe.publish_data(counters)
pipe.flush()
publisher = pipeline_manager.pipelines[0].publishers[0]
actual = sorted(s.volume for s in publisher.samples)
self.assertEqual([2.0, 3.0, 6.0], actual)
@ -1562,13 +1562,13 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(None, [counters[0]])
pipe.flush(None)
pipe.publish_data([counters[0]])
pipe.flush()
publisher = pipe.publishers[0]
self.assertEqual(0, len(publisher.samples))
pipe.publish_data(None, [counters[1]])
pipe.flush(None)
pipe.publish_data([counters[1]])
pipe.flush()
publisher = pipe.publishers[0]
self.assertEqual(2, len(publisher.samples))
@ -1600,13 +1600,13 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(None, counters)
pipe.flush(None)
pipe.publish_data(counters)
pipe.flush()
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(publisher.samples))
timeutils.advance_time_seconds(120)
pipe.flush(None)
pipe.flush()
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.samples))
@ -1648,13 +1648,13 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(None, [counters[0]])
pipe.flush(None)
pipe.publish_data([counters[0]])
pipe.flush()
publisher = pipe.publishers[0]
self.assertEqual(0, len(publisher.samples))
pipe.publish_data(None, [counters[1]])
pipe.flush(None)
pipe.publish_data([counters[1]])
pipe.flush()
publisher = pipe.publishers[0]
self.assertEqual(1, len(publisher.samples))
@ -1680,20 +1680,19 @@ class BasePipelineTestCase(base.BaseTestCase):
counter.volume = offset
counter.type = sample.TYPE_CUMULATIVE
counter.unit = 'ns'
aggregator.handle_sample(context.get_admin_context(), counter)
aggregator.handle_sample(counter)
if offset == 1:
test_time = counter_time
counter_time = counter_time + datetime.timedelta(0, 1)
aggregated_counters = aggregator.flush(context.get_admin_context())
aggregated_counters = aggregator.flush()
self.assertEqual(len(aggregated_counters), 1)
self.assertEqual(aggregated_counters[0].timestamp,
timeutils.isotime(test_time))
rate_of_change_transformer.handle_sample(context.get_admin_context(),
aggregated_counters[0])
rate_of_change_transformer.handle_sample(aggregated_counters[0])
for offset in range(2):
counter = copy.copy(self.test_counter)
@ -1702,20 +1701,20 @@ class BasePipelineTestCase(base.BaseTestCase):
counter.volume = 2
counter.type = sample.TYPE_CUMULATIVE
counter.unit = 'ns'
aggregator.handle_sample(context.get_admin_context(), counter)
aggregator.handle_sample(counter)
if offset == 0:
test_time = counter_time
counter_time = counter_time + datetime.timedelta(0, 1)
aggregated_counters = aggregator.flush(context.get_admin_context())
aggregated_counters = aggregator.flush()
self.assertEqual(len(aggregated_counters), 2)
for counter in aggregated_counters:
if counter.resource_id == resource_id[0]:
rateOfChange = rate_of_change_transformer.handle_sample(
context.get_admin_context(), counter)
counter)
self.assertEqual(counter.timestamp,
timeutils.isotime(test_time))
@ -1796,8 +1795,8 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
for s in counters:
pipe.publish_data(None, s)
pipe.flush(None)
pipe.publish_data(s)
pipe.flush()
publisher = pipeline_manager.pipelines[0].publishers[0]
expected_len = len(test_resources) * len(expected)
self.assertEqual(expected_len, len(publisher.samples))
@ -1916,19 +1915,19 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(None, [counter])
pipe.publish_data([counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(publisher.samples))
pipe.flush(None)
pipe.flush()
self.assertEqual(1, len(publisher.samples))
self.assertEqual(1026.0, publisher.samples[0].volume)
pipe.flush(None)
pipe.flush()
self.assertEqual(1, len(publisher.samples))
counter.volume = 2048.0
pipe.publish_data(None, [counter])
pipe.flush(None)
pipe.publish_data([counter])
pipe.flush()
self.assertEqual(2, len(publisher.samples))
self.assertEqual(2050.0, publisher.samples[1].volume)
@ -1946,7 +1945,7 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
timeutils.advance_time_seconds(200)
pipe = pipeline_manager.pipelines[0]
pipe.flush(None)
pipe.flush()
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(0, len(publisher.samples))
@ -1967,8 +1966,8 @@ class BasePipelineTestCase(base.BaseTestCase):
self.transformer_manager)
pipe = pipeline_manager.pipelines[0]
pipe.publish_data(None, data)
pipe.flush(None)
pipe.publish_data(data)
pipe.flush()
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(expected, len(publisher.samples))
return publisher.samples
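
Every edit in this test module is the same mechanical move: publisher(None), publish_data(None, ...), flush(None) and handle_sample(ctxt, ...) all lose their leading argument. The pattern condensed into one runnable check (FakePipelineManager is a stand-in):

    import contextlib

    class FakePipelineManager(object):
        def __init__(self):
            self.received = []

        @contextlib.contextmanager
        def publisher(self):  # was: publisher(self, context)
            yield self.received.extend

    mgr = FakePipelineManager()
    with mgr.publisher() as p:
        p(['counter-a'])
    assert mgr.received == ['counter-a']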

View File

@ -57,5 +57,4 @@ class NotificationBaseTestCase(base.BaseTestCase):
'payload': {'foo': 'bar'},
'message_id': '3577a84f-29ec-4904-9566-12c52289c2e8'
}
plugin.to_samples_and_publish.assert_called_with(mock.ANY,
notification)
plugin.to_samples_and_publish.assert_called_with(notification)

View File

@ -72,8 +72,7 @@ class TestFilePublisher(base.BaseTestCase):
parsed_url = netutils.urlsplit('file://%s?max_bytes=50&backup_count=3'
% name)
publisher = file.FilePublisher(parsed_url)
publisher.publish_samples(None,
self.test_data)
publisher.publish_samples(self.test_data)
handler = publisher.publisher_logger.handlers[0]
self.assertIsInstance(handler,
@ -90,8 +89,7 @@ class TestFilePublisher(base.BaseTestCase):
name = '%s/log_file_plain' % tempdir
parsed_url = netutils.urlsplit('file://%s' % name)
publisher = file.FilePublisher(parsed_url)
publisher.publish_samples(None,
self.test_data)
publisher.publish_samples(self.test_data)
handler = publisher.publisher_logger.handlers[0]
self.assertIsInstance(handler,
@ -114,7 +112,6 @@ class TestFilePublisher(base.BaseTestCase):
'file://%s/log_file_bad'
'?max_bytes=yus&backup_count=5y' % tempdir)
publisher = file.FilePublisher(parsed_url)
publisher.publish_samples(None,
self.test_data)
publisher.publish_samples(self.test_data)
self.assertIsNone(publisher.publisher_logger)

View File

@ -123,14 +123,14 @@ class TestHttpPublisher(base.BaseTestCase):
res = mock.Mock()
res.status_code = 200
with mock.patch.object(Session, 'post', return_value=res) as m_req:
publisher.publish_samples(None, self.sample_data)
publisher.publish_samples(self.sample_data)
self.assertEqual(1, m_req.call_count)
self.assertFalse(thelog.error.called)
res.status_code = 401
with mock.patch.object(Session, 'post', return_value=res) as m_req:
publisher.publish_samples(None, self.sample_data)
publisher.publish_samples(self.sample_data)
self.assertEqual(1, m_req.call_count)
self.assertTrue(thelog.error.called)
@ -144,14 +144,14 @@ class TestHttpPublisher(base.BaseTestCase):
res = mock.Mock()
res.status_code = 200
with mock.patch.object(Session, 'post', return_value=res) as m_req:
publisher.publish_events(None, self.event_data)
publisher.publish_events(self.event_data)
self.assertEqual(1, m_req.call_count)
self.assertFalse(thelog.error.called)
res.status_code = 401
with mock.patch.object(Session, 'post', return_value=res) as m_req:
publisher.publish_samples(None, self.event_data)
publisher.publish_samples(self.event_data)
self.assertEqual(1, m_req.call_count)
self.assertTrue(thelog.error.called)
@ -164,7 +164,7 @@ class TestHttpPublisher(base.BaseTestCase):
res = mock.Mock()
res.status_code = 200
with mock.patch.object(Session, 'post', return_value=res) as m_req:
publisher.publish_events(None, self.empty_event_data)
publisher.publish_events(self.empty_event_data)
self.assertEqual(0, m_req.call_count)
self.assertTrue(thelog.debug.called)

View File

@ -102,7 +102,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
'kafka://127.0.0.1:9092?topic=ceilometer'))
with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_samples(mock.MagicMock(), self.test_data)
publisher.publish_samples(self.test_data)
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
@ -111,7 +111,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
netutils.urlsplit('kafka://127.0.0.1:9092'))
with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_samples(mock.MagicMock(), self.test_data)
publisher.publish_samples(self.test_data)
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
@ -132,7 +132,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
fake_producer.send_messages.side_effect = TypeError
self.assertRaises(msg_publisher.DeliveryFailure,
publisher.publish_samples,
mock.MagicMock(), self.test_data)
self.test_data)
self.assertEqual(100, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
@ -142,7 +142,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = Exception("test")
publisher.publish_samples(mock.MagicMock(), self.test_data)
publisher.publish_samples(self.test_data)
self.assertEqual(1, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
@ -152,7 +152,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = Exception("test")
publisher.publish_samples(mock.MagicMock(), self.test_data)
publisher.publish_samples(self.test_data)
self.assertEqual(1, len(fake_producer.send_messages.mock_calls))
self.assertEqual(1, len(publisher.local_queue))
@ -166,13 +166,13 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
for i in range(0, 2000):
for s in self.test_data:
s.name = 'test-%d' % i
publisher.publish_samples(mock.MagicMock(), self.test_data)
publisher.publish_samples(self.test_data)
self.assertEqual(1024, len(publisher.local_queue))
self.assertEqual('test-976',
publisher.local_queue[0][2][0]['counter_name'])
publisher.local_queue[0][1][0]['counter_name'])
self.assertEqual('test-1999',
publisher.local_queue[1023][2][0]['counter_name'])
publisher.local_queue[1023][1][0]['counter_name'])
def test_publish_to_host_from_down_to_up_with_queue(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
@ -183,14 +183,14 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
for i in range(0, 16):
for s in self.test_data:
s.name = 'test-%d' % i
publisher.publish_samples(mock.MagicMock(), self.test_data)
publisher.publish_samples(self.test_data)
self.assertEqual(16, len(publisher.local_queue))
fake_producer.send_messages.side_effect = None
for s in self.test_data:
s.name = 'test-%d' % 16
publisher.publish_samples(mock.MagicMock(), self.test_data)
publisher.publish_samples(self.test_data)
self.assertEqual(0, len(publisher.local_queue))
def test_publish_event_with_default_policy(self):
@ -198,13 +198,13 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
netutils.urlsplit('kafka://127.0.0.1:9092?topic=ceilometer'))
with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_events(mock.MagicMock(), self.test_event_data)
publisher.publish_events(self.test_event_data)
self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
with mock.patch.object(publisher, '_producer') as fake_producer:
fake_producer.send_messages.side_effect = Exception("test")
self.assertRaises(msg_publisher.DeliveryFailure,
publisher.publish_events,
mock.MagicMock(), self.test_event_data)
self.test_event_data)
self.assertEqual(100, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
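
The assertions above move from local_queue[i][2] to local_queue[i][1] because each queued entry lost its leading context element, shifting the payload one position left:

    old_entry = ('ctxt', 'metering', [{'counter_name': 'test-0'}])
    new_entry = ('metering', [{'counter_name': 'test-0'}])
    assert old_entry[2] == new_entry[1]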

View File

@ -151,12 +151,12 @@ class TestPublisherPolicy(TestPublisher):
self.assertRaises(
msg_publisher.DeliveryFailure,
getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data)
self.test_data)
self.assertTrue(mylog.info.called)
self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with(
mock.ANY, self.topic, mock.ANY)
self.topic, mock.ANY)
@mock.patch('ceilometer.publisher.messaging.LOG')
def test_published_with_policy_block(self, mylog):
@ -168,11 +168,11 @@ class TestPublisherPolicy(TestPublisher):
self.assertRaises(
msg_publisher.DeliveryFailure,
getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data)
self.test_data)
self.assertTrue(mylog.info.called)
self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with(
mock.ANY, self.topic, mock.ANY)
self.topic, mock.ANY)
@mock.patch('ceilometer.publisher.messaging.LOG')
def test_published_with_policy_incorrect(self, mylog):
@ -184,12 +184,12 @@ class TestPublisherPolicy(TestPublisher):
self.assertRaises(
msg_publisher.DeliveryFailure,
getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data)
self.test_data)
self.assertTrue(mylog.warning.called)
self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with(
mock.ANY, self.topic, mock.ANY)
self.topic, mock.ANY)
@mock.patch('ceilometer.publisher.messaging.LOG', mock.Mock())
@ -201,11 +201,10 @@ class TestPublisherPolicyReactions(TestPublisher):
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
getattr(publisher, self.pub_func)(mock.MagicMock(),
self.test_data)
getattr(publisher, self.pub_func)(self.test_data)
self.assertEqual(0, len(publisher.local_queue))
fake_send.assert_called_once_with(
mock.ANY, self.topic, mock.ANY)
self.topic, mock.ANY)
def test_published_with_policy_queue_and_rpc_down(self):
publisher = self.publisher_cls(
@ -214,11 +213,10 @@ class TestPublisherPolicyReactions(TestPublisher):
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
getattr(publisher, self.pub_func)(mock.MagicMock(),
self.test_data)
getattr(publisher, self.pub_func)(self.test_data)
self.assertEqual(1, len(publisher.local_queue))
fake_send.assert_called_once_with(
mock.ANY, self.topic, mock.ANY)
self.topic, mock.ANY)
def test_published_with_policy_queue_and_rpc_down_up(self):
self.rpc_unreachable = True
@ -228,21 +226,19 @@ class TestPublisherPolicyReactions(TestPublisher):
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
getattr(publisher, self.pub_func)(mock.MagicMock(),
self.test_data)
getattr(publisher, self.pub_func)(self.test_data)
self.assertEqual(1, len(publisher.local_queue))
fake_send.side_effect = mock.MagicMock()
getattr(publisher, self.pub_func)(mock.MagicMock(),
self.test_data)
getattr(publisher, self.pub_func)(self.test_data)
self.assertEqual(0, len(publisher.local_queue))
topic = self.topic
expected = [mock.call(mock.ANY, topic, mock.ANY),
mock.call(mock.ANY, topic, mock.ANY),
mock.call(mock.ANY, topic, mock.ANY)]
expected = [mock.call(topic, mock.ANY),
mock.call(topic, mock.ANY),
mock.call(topic, mock.ANY)]
self.assertEqual(expected, fake_send.mock_calls)
def test_published_with_policy_sized_queue_and_rpc_down(self):
@ -255,21 +251,20 @@ class TestPublisherPolicyReactions(TestPublisher):
for i in range(0, 5):
for s in self.test_data:
setattr(s, self.attr, 'test-%d' % i)
getattr(publisher, self.pub_func)(mock.MagicMock(),
self.test_data)
getattr(publisher, self.pub_func)(self.test_data)
self.assertEqual(3, len(publisher.local_queue))
self.assertEqual(
'test-2',
publisher.local_queue[0][2][0][self.attr]
publisher.local_queue[0][1][0][self.attr]
)
self.assertEqual(
'test-3',
publisher.local_queue[1][2][0][self.attr]
publisher.local_queue[1][1][0][self.attr]
)
self.assertEqual(
'test-4',
publisher.local_queue[2][2][0][self.attr]
publisher.local_queue[2][1][0][self.attr]
)
def test_published_with_policy_default_sized_queue_and_rpc_down(self):
@ -282,15 +277,14 @@ class TestPublisherPolicyReactions(TestPublisher):
for i in range(0, 2000):
for s in self.test_data:
setattr(s, self.attr, 'test-%d' % i)
getattr(publisher, self.pub_func)(mock.MagicMock(),
self.test_data)
getattr(publisher, self.pub_func)(self.test_data)
self.assertEqual(1024, len(publisher.local_queue))
self.assertEqual(
'test-976',
publisher.local_queue[0][2][0][self.attr]
publisher.local_queue[0][1][0][self.attr]
)
self.assertEqual(
'test-1999',
publisher.local_queue[1023][2][0][self.attr]
publisher.local_queue[1023][1][0][self.attr]
)
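
A toy version of the sized-queue policy the last two tests exercise: once the queue exceeds its capacity, the oldest entries are dropped, which is why the surviving names start at test-2 (and at test-976 for the default 1024-entry queue):

    def queue_put(queue, entry, capacity):
        queue.append(entry)
        if len(queue) > capacity:
            # Drop the oldest entries from the front.
            del queue[:len(queue) - capacity]

    q = []
    for i in range(5):
        queue_put(q, ('topic', [{'name': 'test-%d' % i}]), capacity=3)
    assert [e[1][0]['name'] for e in q] == ['test-2', 'test-3', 'test-4']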

View File

@ -133,8 +133,7 @@ class TestUDPPublisher(base.BaseTestCase):
self._make_fake_socket(self.data_sent)):
publisher = udp.UDPPublisher(
netutils.urlsplit('udp://somehost'))
publisher.publish_samples(None,
self.test_data)
publisher.publish_samples(self.test_data)
self.assertEqual(5, len(self.data_sent))
@ -172,5 +171,4 @@ class TestUDPPublisher(base.BaseTestCase):
self._make_broken_socket):
publisher = udp.UDPPublisher(
netutils.urlsplit('udp://localhost'))
publisher.publish_samples(None,
self.test_data)
publisher.publish_samples(self.test_data)

View File

@ -139,7 +139,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
self.test_counter = sample.Sample(
@ -154,7 +154,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
resource_metadata=self.test_counter.resource_metadata,
)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
self.assertEqual(2, len(pipeline_manager.pipelines))
@ -182,7 +182,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
self.test_counter = sample.Sample(
@ -197,7 +197,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
resource_metadata=self.test_counter.resource_metadata,
)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_counter])
self.assertEqual(2, len(pipeline_manager.pipelines))

View File

@ -37,10 +37,10 @@ class EventPipelineTestCase(base.BaseTestCase):
return fake_drivers[url](url)
class PublisherClassException(publisher.PublisherBase):
def publish_samples(self, ctxt, samples):
def publish_samples(self, samples):
pass
def publish_events(self, ctxt, events):
def publish_events(self, events):
raise Exception()
def setUp(self):
@ -199,13 +199,13 @@ class EventPipelineTestCase(base.BaseTestCase):
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.events))
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_event2])
self.assertEqual(2, len(publisher.events))
@ -218,7 +218,7 @@ class EventPipelineTestCase(base.BaseTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -231,7 +231,7 @@ class EventPipelineTestCase(base.BaseTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -252,7 +252,7 @@ class EventPipelineTestCase(base.BaseTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.events))
@ -264,7 +264,7 @@ class EventPipelineTestCase(base.BaseTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -324,7 +324,7 @@ class EventPipelineTestCase(base.BaseTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_event, self.test_event2])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -342,7 +342,7 @@ class EventPipelineTestCase(base.BaseTestCase):
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[0]
@ -358,7 +358,7 @@ class EventPipelineTestCase(base.BaseTestCase):
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
self.transformer_manager,
self.p_type)
with pipeline_manager.publisher(None) as p:
with pipeline_manager.publisher() as p:
p([self.test_event])
publisher = pipeline_manager.pipelines[0].publishers[1]
@ -401,7 +401,7 @@ class EventPipelineTestCase(base.BaseTestCase):
self.transformer_manager,
self.p_type)
event_pipeline_endpoint = pipeline.EventPipelineEndpoint(
mock.Mock(), pipeline_manager.pipelines[0])
pipeline_manager.pipelines[0])
fake_publisher.publish_events.side_effect = Exception
ret = event_pipeline_endpoint.sample([

View File

@ -14,7 +14,6 @@
# under the License.
import copy
from oslo_context import context
from oslo_utils import timeutils
from oslotest import base
@ -94,7 +93,7 @@ class AggregatorTransformerTestCase(base.BaseTestCase):
retention_time="300")
self._insert_sample_data(aggregator)
samples = aggregator.flush(context.get_admin_context())
samples = aggregator.flush()
self.assertEqual([], samples)
@ -102,7 +101,7 @@ class AggregatorTransformerTestCase(base.BaseTestCase):
aggregator = conversions.AggregatorTransformer(size="100")
self._insert_sample_data(aggregator)
samples = aggregator.flush(context.get_admin_context())
samples = aggregator.flush()
self.assertEqual(100, len(samples))
@ -111,5 +110,5 @@ class AggregatorTransformerTestCase(base.BaseTestCase):
sample = copy.copy(self.SAMPLE)
sample.resource_id = sample.resource_id + str(self._sample_offset)
sample.timestamp = timeutils.isotime()
aggregator.handle_sample(context.get_admin_context(), sample)
aggregator.handle_sample(sample)
self._sample_offset += 1

View File

@ -36,10 +36,9 @@ class TransformerBase(object):
super(TransformerBase, self).__init__()
@abc.abstractmethod
def handle_sample(self, context, sample):
def handle_sample(self, sample):
"""Transform a sample.
:param context: Passed from the data collector.
:param sample: A sample.
"""
@ -47,11 +46,9 @@ class TransformerBase(object):
def grouping_keys(self):
"""Keys used to group transformer."""
def flush(self, context):
"""Flush samples cached previously.
:param context: Passed from the data collector.
"""
@staticmethod
def flush():
"""Flush samples cached previously."""
return []
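
A transformer against the new base shape: handle_sample takes only the sample, and flush is a no-argument static method by default. A toy doubling transformer (invented for illustration) with a quick check:

    class DoubleTransformer(object):
        """Illustrative transformer shaped like the new TransformerBase."""
        grouping_keys = ['resource_id']

        def handle_sample(self, sample):
            return sample * 2

        @staticmethod
        def flush():
            return []

    t = DoubleTransformer()
    assert t.handle_sample(21) == 42
    assert t.flush() == []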

View File

@ -30,13 +30,13 @@ class TransformerAccumulator(transformer.TransformerBase):
self.size = size
super(TransformerAccumulator, self).__init__(**kwargs)
def handle_sample(self, context, sample):
def handle_sample(self, sample):
if self.size >= 1:
self.samples.append(sample)
else:
return sample
def flush(self, context):
def flush(self):
if len(self.samples) >= self.size:
x = self.samples
self.samples = []
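
The accumulator's flush also goes argument-free; its behavior, batching until a size threshold is reached, in a self-contained form (Accumulator is modeled on, not identical to, the class above):

    class Accumulator(object):
        def __init__(self, size):
            self.size = size
            self.samples = []

        def handle_sample(self, sample):
            self.samples.append(sample)

        def flush(self):
            if len(self.samples) >= self.size:
                batch, self.samples = self.samples, []
                return batch
            return []

    acc = Accumulator(size=2)
    acc.handle_sample('s1')
    assert acc.flush() == []
    acc.handle_sample('s2')
    assert acc.flush() == ['s1', 's2']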

View File

@ -95,11 +95,11 @@ class ArithmeticTransformer(transformer.TransformerBase):
LOG.warning(_('Unable to evaluate expression %(expr)s: %(exc)s'),
{'expr': self.expr, 'exc': e})
def handle_sample(self, context, _sample):
def handle_sample(self, _sample):
self._update_cache(_sample)
self.latest_timestamp = _sample.timestamp
def flush(self, context):
def flush(self):
new_samples = []
cache_clean_list = []
if not self.misconfigured:

View File

@ -72,7 +72,7 @@ class DeltaTransformer(BaseConversionTransformer):
self.growth_only = growth_only
self.cache = {}
def handle_sample(self, context, s):
def handle_sample(self, s):
"""Handle a sample, converting if necessary."""
key = s.name + s.resource_id
prev = self.cache.get(key)
@ -159,7 +159,7 @@ class ScalingTransformer(BaseConversionTransformer):
resource_metadata=s.resource_metadata
)
def handle_sample(self, context, s):
def handle_sample(self, s):
"""Handle a sample, converting if necessary."""
LOG.debug('handling sample %s', s)
if self.source.get('unit', s.unit) == s.unit:
@ -181,7 +181,7 @@ class RateOfChangeTransformer(ScalingTransformer):
self.cache = {}
self.scale = self.scale or '1'
def handle_sample(self, context, s):
def handle_sample(self, s):
"""Handle a sample, converting if necessary."""
LOG.debug('handling sample %s', s)
key = s.name + s.resource_id
@ -293,7 +293,7 @@ class AggregatorTransformer(ScalingTransformer):
# NOTE(sileht): it assumes, a meter always have the same unit/type
return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
def handle_sample(self, context, sample_):
def handle_sample(self, sample_):
if not self.initial_timestamp:
self.initial_timestamp = timeutils.parse_isotime(sample_.timestamp)
@ -317,7 +317,7 @@ class AggregatorTransformer(ScalingTransformer):
setattr(self.samples[key], field,
getattr(sample_, field))
def flush(self, context):
def flush(self):
if not self.initial_timestamp:
return []
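
For orientation, the shape of the rate-of-change computation whose signature changes above, reduced to a toy that caches the previous reading per key and emits a delta over elapsed time (scaling and metadata handling omitted):

    class RateOfChange(object):
        def __init__(self):
            self.cache = {}

        def handle_sample(self, key, value, timestamp):
            prev = self.cache.get(key)
            self.cache[key] = (value, timestamp)
            if prev is None:
                # First reading for this key: nothing to compare against.
                return None
            prev_value, prev_ts = prev
            elapsed = timestamp - prev_ts
            return (value - prev_value) / elapsed if elapsed else 0.0

    roc = RateOfChange()
    assert roc.handle_sample('cpu', 10.0, 0.0) is None
    assert roc.handle_sample('cpu', 30.0, 10.0) == 2.0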