Add http publisher

This patch adds a http publisher so that notification agent can be
configured to send samples or events directly to a http endpoint.

Change-Id: Ic1b2cabb49bcdbb229a3ae65982e3b3e263407f3
This commit is contained in:
Tong Li 2016-03-18 11:21:02 -04:00 committed by litong01
parent 04d91aad63
commit c634b5f0d1
3 changed files with 311 additions and 0 deletions

139
ceilometer/publisher/http.py Executable file
View File

@ -0,0 +1,139 @@
#
# Copyright 2016 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from oslo_serialization import jsonutils
import requests
from requests import adapters
from six.moves.urllib import parse as urlparse
from ceilometer.i18n import _LE
from ceilometer import publisher
LOG = log.getLogger(__name__)
class HttpPublisher(publisher.PublisherBase):
"""Publisher metering data to a http endpoint
The publisher which records metering data into a http endpoint. The
endpoint should be configured in ceilometer pipeline configuration file.
If the timeout and/or retry_count are not specified, the default timeout
and retry_count will be set to 1000 and 2 respectively.
To use this publisher for samples, add the following section to the
/etc/ceilometer/publisher.yaml file or simply add it to an existing
pipeline::
- name: meter_file
interval: 600
counters:
- "*"
transformers:
publishers:
- http://host:80/path?timeout=1&max_retries=2
To use this publisher for events, the raw message needs to be present in
the event. To enable that, ceilometer.conf file will need to have a
section like the following:
[event]
store_raw = info
Then in the event_pipeline.yaml file, you can use the publisher in one of
the sinks like the following:
- name: event_sink
transformers:
publishers:
- http://host:80/path?timeout=1&max_retries=2
Http end point is required for this publisher to work properly.
"""
def __init__(self, parsed_url):
super(HttpPublisher, self).__init__(parsed_url)
self.target = parsed_url.geturl()
if not parsed_url.hostname:
raise ValueError('The hostname of an endpoint for '
'HttpPublisher is required')
# non-numeric port from the url string will cause a ValueError
# exception when the port is read. Do a read to make sure the port
# is valid, if not, ValueError will be thrown.
parsed_url.port
self.headers = {'Content-type': 'application/json'}
# Handling other configuration options in the query string
if parsed_url.query:
params = urlparse.parse_qs(parsed_url.query)
self.timeout = self._get_param(params, 'timeout', 1)
self.max_retries = self._get_param(params, 'max_retries', 2)
else:
self.timeout = 1
self.max_retries = 2
LOG.debug('HttpPublisher for endpoint %s is initialized!' %
self.target)
@staticmethod
def _get_param(params, name, default_value):
try:
return int(params.get(name)[-1])
except (ValueError, TypeError):
LOG.debug('Default value %(value)s is used for %(name)s' %
{'value': default_value, 'name': name})
return default_value
def _do_post(self, data):
if not data:
LOG.debug('Data set is empty!')
return
session = requests.Session()
session.mount(self.target,
adapters.HTTPAdapter(max_retries=self.max_retries))
content = ','.join([jsonutils.dumps(item) for item in data])
content = '[' + content + ']'
LOG.debug('Data to be posted by HttpPublisher: %s' % content)
res = session.post(self.target, data=content, headers=self.headers,
timeout=self.timeout)
if res.status_code >= 300:
LOG.error(_LE('Data post failed with status code %s') %
res.status_code)
def publish_samples(self, context, samples):
"""Send a metering message for publishing
:param context: Execution context from the service or RPC call
:param samples: Samples from pipeline after transformation
"""
data = [sample.as_dict() for sample in samples]
self._do_post(data)
def publish_events(self, context, events):
"""Send an event message for publishing
:param context: Execution context from the service or RPC call
:param events: events from pipeline after transformation
"""
data = [evt.as_dict()['raw']['payload'] for evt in events
if evt.as_dict().get('raw', {}).get('payload')]
self._do_post(data)

View File

@ -0,0 +1,170 @@
#
# Copyright 2016 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/publisher/http.py
"""
import datetime
import mock
from oslotest import base
from requests import Session
from six.moves.urllib import parse as urlparse
import uuid
from ceilometer.event.storage import models as event
from ceilometer.publisher import http
from ceilometer import sample
class TestHttpPublisher(base.BaseTestCase):
resource_id = str(uuid.uuid4())
sample_data = [
sample.Sample(
name='alpha',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id=resource_id,
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
sample.Sample(
name='beta',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id=resource_id,
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
sample.Sample(
name='gamma',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id=resource_id,
timestamp=datetime.datetime.now().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
]
event_data = [event.Event(
message_id=str(uuid.uuid4()), event_type='event_%d' % i,
generated=datetime.datetime.utcnow().isoformat(),
traits=[], raw={'payload': {'some': 'aa'}}) for i in range(0, 2)]
empty_event_data = [event.Event(
message_id=str(uuid.uuid4()), event_type='event_%d' % i,
generated=datetime.datetime.utcnow().isoformat(),
traits=[], raw={'payload': {}}) for i in range(0, 2)]
def test_http_publisher_config(self):
"""Test publisher config parameters."""
# invalid hostname, the given url, results in an empty hostname
parsed_url = urlparse.urlparse('http:/aaa.bb/path')
self.assertRaises(ValueError, http.HttpPublisher,
parsed_url)
# invalid port
parsed_url = urlparse.urlparse('http://aaa:bb/path')
self.assertRaises(ValueError, http.HttpPublisher,
parsed_url)
parsed_url = urlparse.urlparse('http://localhost:90/path1')
publisher = http.HttpPublisher(parsed_url)
# By default, timeout and retry_count should be set to 1000 and 2
# respectively
self.assertEqual(1, publisher.timeout)
self.assertEqual(2, publisher.max_retries)
parsed_url = urlparse.urlparse('http://localhost:90/path1?'
'timeout=19&max_retries=4')
publisher = http.HttpPublisher(parsed_url)
self.assertEqual(19, publisher.timeout)
self.assertEqual(4, publisher.max_retries)
parsed_url = urlparse.urlparse('http://localhost:90/path1?'
'timeout=19')
publisher = http.HttpPublisher(parsed_url)
self.assertEqual(19, publisher.timeout)
self.assertEqual(2, publisher.max_retries)
parsed_url = urlparse.urlparse('http://localhost:90/path1?'
'max_retries=6')
publisher = http.HttpPublisher(parsed_url)
self.assertEqual(1, publisher.timeout)
self.assertEqual(6, publisher.max_retries)
@mock.patch('ceilometer.publisher.http.LOG')
def test_http_post_samples(self, thelog):
"""Test publisher post."""
parsed_url = urlparse.urlparse('http://localhost:90/path1')
publisher = http.HttpPublisher(parsed_url)
res = mock.Mock()
res.status_code = 200
with mock.patch.object(Session, 'post', return_value=res) as m_req:
publisher.publish_samples(None, self.sample_data)
self.assertEqual(1, m_req.call_count)
self.assertFalse(thelog.error.called)
res.status_code = 401
with mock.patch.object(Session, 'post', return_value=res) as m_req:
publisher.publish_samples(None, self.sample_data)
self.assertEqual(1, m_req.call_count)
self.assertTrue(thelog.error.called)
@mock.patch('ceilometer.publisher.http.LOG')
def test_http_post_events(self, thelog):
"""Test publisher post."""
parsed_url = urlparse.urlparse('http://localhost:90/path1')
publisher = http.HttpPublisher(parsed_url)
res = mock.Mock()
res.status_code = 200
with mock.patch.object(Session, 'post', return_value=res) as m_req:
publisher.publish_events(None, self.event_data)
self.assertEqual(1, m_req.call_count)
self.assertFalse(thelog.error.called)
res.status_code = 401
with mock.patch.object(Session, 'post', return_value=res) as m_req:
publisher.publish_samples(None, self.event_data)
self.assertEqual(1, m_req.call_count)
self.assertTrue(thelog.error.called)
@mock.patch('ceilometer.publisher.http.LOG')
def test_http_post_empty_data(self, thelog):
parsed_url = urlparse.urlparse('http://localhost:90/path1')
publisher = http.HttpPublisher(parsed_url)
res = mock.Mock()
res.status_code = 200
with mock.patch.object(Session, 'post', return_value=res) as m_req:
publisher.publish_events(None, self.empty_event_data)
self.assertEqual(0, m_req.call_count)
self.assertTrue(thelog.debug.called)

2
setup.cfg Normal file → Executable file
View File

@ -225,12 +225,14 @@ ceilometer.publisher =
file = ceilometer.publisher.file:FilePublisher
direct = ceilometer.publisher.direct:DirectPublisher
kafka = ceilometer.publisher.kafka_broker:KafkaBrokerPublisher
http = ceilometer.publisher.http:HttpPublisher
ceilometer.event.publisher =
test = ceilometer.publisher.test:TestPublisher
direct = ceilometer.publisher.direct:DirectPublisher
notifier = ceilometer.publisher.messaging:EventNotifierPublisher
kafka = ceilometer.publisher.kafka_broker:KafkaBrokerPublisher
http = ceilometer.publisher.http:HttpPublisher
ceilometer.event.trait_plugin =
split = ceilometer.event.trait_plugins:SplitterTraitPlugin