diff --git a/ceilometer/publisher/opentelemetry_http.py b/ceilometer/publisher/opentelemetry_http.py new file mode 100644 index 0000000000..4a6697231c --- /dev/null +++ b/ceilometer/publisher/opentelemetry_http.py @@ -0,0 +1,152 @@ +# +# Copyright 2024 cmss, inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import time + +from oslo_log import log +from oslo_utils import timeutils + +from ceilometer.publisher import http +from ceilometer import sample as smp + +LOG = log.getLogger(__name__) + + +class OpentelemetryHttpPublisher(http.HttpPublisher): + """Publish metering data to Opentelemetry Collector endpoint + + This dispatcher inherits from all options of the http dispatcher. + + To use this publisher for samples, add the following section to the + /etc/ceilometer/pipeline.yaml file or simply add it to an existing + pipeline:: + + - name: meter_file + meters: + - "*" + publishers: + - opentelemetryhttp://opentelemetry-http-ip:4318/v1/metrics + + """ + + HEADERS = {'Content-type': 'application/json'} + + @staticmethod + def get_attribute_model(key, value): + return { + "key": key, + "value": { + "string_value": value + } + } + + def get_attributes_model(self, sample): + attributes = [] + resource_id_attr = self.get_attribute_model("resource_id", + sample.resource_id) + user_id_attr = self.get_attribute_model("user_id", sample.user_id) + project_id_attr = self.get_attribute_model("project_id", + sample.project_id) + + attributes.append(resource_id_attr) + attributes.append(user_id_attr) + attributes.append(project_id_attr) + + return attributes + + @staticmethod + def get_metrics_model(sample, data_points): + name = sample.name.replace(".", "_") + desc = str(sample.name) + " unit:" + sample.unit + unit = sample.unit + metrics = dict() + metric_type = None + if sample.type == smp.TYPE_CUMULATIVE: + metric_type = "counter" + else: + metric_type = "gauge" + metrics.update({ + "name": name, + "description": desc, + "unit": unit, + metric_type: {"data_points": data_points} + }) + return metrics + + @staticmethod + def get_data_points_model(timestamp, attributes, volume): + data_points = dict() + struct_time = timeutils.parse_isotime(timestamp).timetuple() + unix_time = int(time.mktime(struct_time)) + data_points.update({ + 'attributes': attributes, + "start_time_unix_nano": unix_time, + "time_unix_nano": unix_time, + "as_double": volume, + "flags": 0 + }) + return data_points + + def get_data_model(self, sample, data_points): + metrics = [self.get_metrics_model(sample, data_points)] + data = { + "resource_metrics": [{ + "scope_metrics": [{ + "scope": { + "name": "ceilometer", + "version": "v1" + }, + "metrics": metrics + }] + }] + } + return data + + def get_data_points(self, sample): + # attributes contain basic metadata + attributes = self.get_attributes_model(sample) + try: + return [self.get_data_points_model( + sample.timestamp, attributes, sample.volume)] + except Exception as e: + LOG.warning("Get data point error, %s" % e) + return [] + + def get_opentelemetry_model(self, sample): + data_points = self.get_data_points(sample) + if data_points: + data = self.get_data_model(sample, data_points) + return data + else: + return None + + def publish_samples(self, samples): + """Send a metering message for publishing + + :param samples: Samples from pipeline after transformation + """ + if not samples: + LOG.warning('Data samples is empty!') + return + + for s in samples: + data = self.get_opentelemetry_model(s) + if data: + self._do_post(json.dumps(data)) + + @staticmethod + def publish_events(events): + raise NotImplementedError diff --git a/ceilometer/tests/unit/publisher/test_opentelemetry_http.py b/ceilometer/tests/unit/publisher/test_opentelemetry_http.py new file mode 100644 index 0000000000..d02a1bf68e --- /dev/null +++ b/ceilometer/tests/unit/publisher/test_opentelemetry_http.py @@ -0,0 +1,184 @@ +# +# Copyright 2024 cmss, inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for ceilometer/publisher/opentelemetry.py""" + +import datetime +import json +import time +from unittest import mock +import uuid + +from oslo_utils import timeutils +from oslotest import base +import requests +from urllib import parse as urlparse + +from ceilometer.publisher import opentelemetry_http +from ceilometer import sample +from ceilometer import service + + +class TestOpentelemetryHttpPublisher(base.BaseTestCase): + + resource_id = str(uuid.uuid4()) + format_time = datetime.datetime.utcnow().isoformat() + sample_data = [ + sample.Sample( + name='alpha', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id=resource_id, + timestamp=format_time, + resource_metadata={'name': 'TestPublish'}, + ), + sample.Sample( + name='beta', + type=sample.TYPE_DELTA, + unit='', + volume=3, + user_id='test', + project_id='test', + resource_id=resource_id, + timestamp=format_time, + resource_metadata={'name': 'TestPublish'}, + ), + sample.Sample( + name='gamma', + type=sample.TYPE_GAUGE, + unit='', + volume=5, + user_id='test', + project_id='test', + resource_id=resource_id, + timestamp=format_time, + resource_metadata={'name': 'TestPublish'}, + ), + sample.Sample( + name='delta.epsilon', + type=sample.TYPE_GAUGE, + unit='', + volume=7, + user_id='test', + project_id='test', + resource_id=resource_id, + timestamp=format_time, + resource_metadata={'name': 'TestPublish'}, + ), + ] + + @staticmethod + def _make_fake_json(sample, format_time): + struct_time = timeutils.parse_isotime(format_time).timetuple() + unix_time = int(time.mktime(struct_time)) + if sample.type == "cumulative": + metric_type = "counter" + else: + metric_type = "gauge" + return {"resource_metrics": [{ + "scope_metrics": [{ + "scope": { + "name": "ceilometer", + "version": "v1" + }, + "metrics": [{ + "name": sample.name.replace(".", "_"), + "description": sample.name + " unit:", + "unit": "", + metric_type: { + "data_points": [{ + "attributes": [{ + "key": "resource_id", + "value": { + "string_value": sample.resource_id + } + }, { + "key": "user_id", + "value": { + "string_value": "test" + } + }, { + "key": "project_id", + "value": { + "string_value": "test" + } + }], + "start_time_unix_nano": unix_time, + "time_unix_nano": unix_time, + "as_double": sample.volume, + "flags": 0 + }]}}]}]}]} + + def setUp(self): + super(TestOpentelemetryHttpPublisher, self).setUp() + self.CONF = service.prepare_service([], []) + + def test_post_samples(self): + """Test publisher post.""" + parsed_url = urlparse.urlparse( + 'opentelemetryhttp://localhost:4318/v1/metrics') + publisher = opentelemetry_http.OpentelemetryHttpPublisher( + self.CONF, parsed_url) + + res = requests.Response() + res.status_code = 200 + with mock.patch.object(requests.Session, 'post', + return_value=res) as m_req: + publisher.publish_samples(self.sample_data) + + datas = [] + for s in self.sample_data: + datas.append(self._make_fake_json(s, self.format_time)) + expected = [] + for d in datas: + expected.append(mock.call('http://localhost:4318/v1/metrics', + auth=None, + cert=None, + data=json.dumps(d), + headers={'Content-type': + 'application/json'}, + timeout=5, + verify=True)) + self.assertEqual(expected, m_req.mock_calls) + + def test_post_samples_ssl(self): + """Test publisher post.""" + parsed_url = urlparse.urlparse( + 'opentelemetryhttp://localhost:4318/v1/metrics?ssl=1') + publisher = opentelemetry_http.OpentelemetryHttpPublisher( + self.CONF, parsed_url) + + res = requests.Response() + res.status_code = 200 + with mock.patch.object(requests.Session, 'post', + return_value=res) as m_req: + publisher.publish_samples(self.sample_data) + + datas = [] + for s in self.sample_data: + datas.append(self._make_fake_json(s, self.format_time)) + expected = [] + for d in datas: + expected.append(mock.call('https://localhost:4318/v1/metrics', + auth=None, + cert=None, + data=json.dumps(d), + headers={'Content-type': + 'application/json'}, + timeout=5, + verify=True)) + self.assertEqual(expected, m_req.mock_calls) diff --git a/setup.cfg b/setup.cfg index 884e22fbe6..2c5890916b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -187,6 +187,7 @@ ceilometer.sample.publisher = https = ceilometer.publisher.http:HttpPublisher gnocchi = ceilometer.publisher.gnocchi:GnocchiPublisher zaqar = ceilometer.publisher.zaqar:ZaqarPublisher + opentelemetryhttp = ceilometer.publisher.opentelemetry_http:OpentelemetryHttpPublisher ceilometer.event.publisher = test = ceilometer.publisher.test:TestPublisher