Add opentelemetry publisher base on http

Opentelemetry is a standard data protocol of observability.
This patch add a new publisher about 'opentelemetry', it use
http to send data in json format to opentelemetry collector.
In the future, we can try to send data to opentelemetry by gRPC.

Change-Id: Ifa20d29d27e35d98999bf0e7ae519ebcedf811aa
This commit is contained in:
minruigao 2023-12-14 16:48:03 +08:00
parent 7d471b92d5
commit c972cecb37
3 changed files with 337 additions and 0 deletions

View File

@ -0,0 +1,152 @@
#
# Copyright 2024 cmss, inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import time
from oslo_log import log
from oslo_utils import timeutils
from ceilometer.publisher import http
from ceilometer import sample as smp
LOG = log.getLogger(__name__)
class OpentelemetryHttpPublisher(http.HttpPublisher):
"""Publish metering data to Opentelemetry Collector endpoint
This dispatcher inherits from all options of the http dispatcher.
To use this publisher for samples, add the following section to the
/etc/ceilometer/pipeline.yaml file or simply add it to an existing
pipeline::
- name: meter_file
meters:
- "*"
publishers:
- opentelemetryhttp://opentelemetry-http-ip:4318/v1/metrics
"""
HEADERS = {'Content-type': 'application/json'}
@staticmethod
def get_attribute_model(key, value):
return {
"key": key,
"value": {
"string_value": value
}
}
def get_attributes_model(self, sample):
attributes = []
resource_id_attr = self.get_attribute_model("resource_id",
sample.resource_id)
user_id_attr = self.get_attribute_model("user_id", sample.user_id)
project_id_attr = self.get_attribute_model("project_id",
sample.project_id)
attributes.append(resource_id_attr)
attributes.append(user_id_attr)
attributes.append(project_id_attr)
return attributes
@staticmethod
def get_metrics_model(sample, data_points):
name = sample.name.replace(".", "_")
desc = str(sample.name) + " unit:" + sample.unit
unit = sample.unit
metrics = dict()
metric_type = None
if sample.type == smp.TYPE_CUMULATIVE:
metric_type = "counter"
else:
metric_type = "gauge"
metrics.update({
"name": name,
"description": desc,
"unit": unit,
metric_type: {"data_points": data_points}
})
return metrics
@staticmethod
def get_data_points_model(timestamp, attributes, volume):
data_points = dict()
struct_time = timeutils.parse_isotime(timestamp).timetuple()
unix_time = int(time.mktime(struct_time))
data_points.update({
'attributes': attributes,
"start_time_unix_nano": unix_time,
"time_unix_nano": unix_time,
"as_double": volume,
"flags": 0
})
return data_points
def get_data_model(self, sample, data_points):
metrics = [self.get_metrics_model(sample, data_points)]
data = {
"resource_metrics": [{
"scope_metrics": [{
"scope": {
"name": "ceilometer",
"version": "v1"
},
"metrics": metrics
}]
}]
}
return data
def get_data_points(self, sample):
# attributes contain basic metadata
attributes = self.get_attributes_model(sample)
try:
return [self.get_data_points_model(
sample.timestamp, attributes, sample.volume)]
except Exception as e:
LOG.warning("Get data point error, %s" % e)
return []
def get_opentelemetry_model(self, sample):
data_points = self.get_data_points(sample)
if data_points:
data = self.get_data_model(sample, data_points)
return data
else:
return None
def publish_samples(self, samples):
"""Send a metering message for publishing
:param samples: Samples from pipeline after transformation
"""
if not samples:
LOG.warning('Data samples is empty!')
return
for s in samples:
data = self.get_opentelemetry_model(s)
if data:
self._do_post(json.dumps(data))
@staticmethod
def publish_events(events):
raise NotImplementedError

View File

@ -0,0 +1,184 @@
#
# Copyright 2024 cmss, inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/publisher/opentelemetry.py"""
import datetime
import json
import time
from unittest import mock
import uuid
from oslo_utils import timeutils
from oslotest import base
import requests
from urllib import parse as urlparse
from ceilometer.publisher import opentelemetry_http
from ceilometer import sample
from ceilometer import service
class TestOpentelemetryHttpPublisher(base.BaseTestCase):
resource_id = str(uuid.uuid4())
format_time = datetime.datetime.utcnow().isoformat()
sample_data = [
sample.Sample(
name='alpha',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id=resource_id,
timestamp=format_time,
resource_metadata={'name': 'TestPublish'},
),
sample.Sample(
name='beta',
type=sample.TYPE_DELTA,
unit='',
volume=3,
user_id='test',
project_id='test',
resource_id=resource_id,
timestamp=format_time,
resource_metadata={'name': 'TestPublish'},
),
sample.Sample(
name='gamma',
type=sample.TYPE_GAUGE,
unit='',
volume=5,
user_id='test',
project_id='test',
resource_id=resource_id,
timestamp=format_time,
resource_metadata={'name': 'TestPublish'},
),
sample.Sample(
name='delta.epsilon',
type=sample.TYPE_GAUGE,
unit='',
volume=7,
user_id='test',
project_id='test',
resource_id=resource_id,
timestamp=format_time,
resource_metadata={'name': 'TestPublish'},
),
]
@staticmethod
def _make_fake_json(sample, format_time):
struct_time = timeutils.parse_isotime(format_time).timetuple()
unix_time = int(time.mktime(struct_time))
if sample.type == "cumulative":
metric_type = "counter"
else:
metric_type = "gauge"
return {"resource_metrics": [{
"scope_metrics": [{
"scope": {
"name": "ceilometer",
"version": "v1"
},
"metrics": [{
"name": sample.name.replace(".", "_"),
"description": sample.name + " unit:",
"unit": "",
metric_type: {
"data_points": [{
"attributes": [{
"key": "resource_id",
"value": {
"string_value": sample.resource_id
}
}, {
"key": "user_id",
"value": {
"string_value": "test"
}
}, {
"key": "project_id",
"value": {
"string_value": "test"
}
}],
"start_time_unix_nano": unix_time,
"time_unix_nano": unix_time,
"as_double": sample.volume,
"flags": 0
}]}}]}]}]}
def setUp(self):
super(TestOpentelemetryHttpPublisher, self).setUp()
self.CONF = service.prepare_service([], [])
def test_post_samples(self):
"""Test publisher post."""
parsed_url = urlparse.urlparse(
'opentelemetryhttp://localhost:4318/v1/metrics')
publisher = opentelemetry_http.OpentelemetryHttpPublisher(
self.CONF, parsed_url)
res = requests.Response()
res.status_code = 200
with mock.patch.object(requests.Session, 'post',
return_value=res) as m_req:
publisher.publish_samples(self.sample_data)
datas = []
for s in self.sample_data:
datas.append(self._make_fake_json(s, self.format_time))
expected = []
for d in datas:
expected.append(mock.call('http://localhost:4318/v1/metrics',
auth=None,
cert=None,
data=json.dumps(d),
headers={'Content-type':
'application/json'},
timeout=5,
verify=True))
self.assertEqual(expected, m_req.mock_calls)
def test_post_samples_ssl(self):
"""Test publisher post."""
parsed_url = urlparse.urlparse(
'opentelemetryhttp://localhost:4318/v1/metrics?ssl=1')
publisher = opentelemetry_http.OpentelemetryHttpPublisher(
self.CONF, parsed_url)
res = requests.Response()
res.status_code = 200
with mock.patch.object(requests.Session, 'post',
return_value=res) as m_req:
publisher.publish_samples(self.sample_data)
datas = []
for s in self.sample_data:
datas.append(self._make_fake_json(s, self.format_time))
expected = []
for d in datas:
expected.append(mock.call('https://localhost:4318/v1/metrics',
auth=None,
cert=None,
data=json.dumps(d),
headers={'Content-type':
'application/json'},
timeout=5,
verify=True))
self.assertEqual(expected, m_req.mock_calls)

View File

@ -187,6 +187,7 @@ ceilometer.sample.publisher =
https = ceilometer.publisher.http:HttpPublisher
gnocchi = ceilometer.publisher.gnocchi:GnocchiPublisher
zaqar = ceilometer.publisher.zaqar:ZaqarPublisher
opentelemetryhttp = ceilometer.publisher.opentelemetry_http:OpentelemetryHttpPublisher
ceilometer.event.publisher =
test = ceilometer.publisher.test:TestPublisher