Refresh monasca transform docs
* Refreshed monasca transform README.md
* Added new documentation on generic aggregation components
* Added new documentation on creating a new aggregation pipeline
* Added new documentation for internal data formats used by monasca transform, namely the record store data format and the instance usage data format.

Change-Id: Id4cfb422ace2c59103c658e58e4ffb0d61a303f5
Story: 2001765
Task: 12167
This commit is contained in:
parent
c8aa020432
commit
3feaf7400e
81
README.md
@@ -3,10 +3,85 @@ Team and repository tags

[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html)

<!-- Change things from this point on -->

<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->

- [Monasca Transform](#monasca-transform)
  - [Use Cases handled by Monasca Transform](#use-cases-handled-by-monasca-transform)
  - [Operation](#operation)
  - [Architecture](#architecture)
  - [To set up the development environment](#to-set-up-the-development-environment)
  - [Generic aggregation components](#generic-aggregation-components)
  - [Create a new aggregation pipeline example](#create-a-new-aggregation-pipeline-example)
  - [Original proposal and blueprint](#original-proposal-and-blueprint)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

# Monasca Transform

monasca-transform is a data driven aggregation engine which collects, groups and aggregates existing
individual Monasca metrics according to business requirements and publishes new transformed
(derived) metrics to the Monasca Kafka queue.

* Since the new transformed metrics are published like any other metric in Monasca, alarms can be
  set and triggered on the transformed metrics.

* Monasca Transform uses [Apache Spark](http://spark.apache.org) to aggregate data. [Apache
  Spark](http://spark.apache.org) is a highly scalable, fast, in-memory, fault tolerant and
  parallel data processing framework. All monasca-transform components are implemented in Python
  and use Spark's [PySpark Python API](http://spark.apache.org/docs/latest/api/python/index.html)
  to interact with Spark.

* Monasca Transform does transformation and aggregation of incoming metrics in two phases.

  * In the first phase, the Spark streaming application retrieves data from Kafka at a
    configurable *stream interval* (the default *stream_interval* is 10 minutes) and writes the
    data aggregated over the *stream interval* to the *metrics_pre_hourly* topic in Kafka.

  * In the second phase, which is kicked off every hour, all metrics in the *metrics_pre_hourly*
    topic in Kafka are aggregated again, this time over a larger interval of an hour. These hourly
    aggregated metrics are published to the *metrics* topic in Kafka.
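The two-phase flow can be sketched in plain Python (hypothetical in-memory data; the real pipeline runs on Spark streaming against Kafka topics):

```python
# Phase 1: each ~10-minute stream interval produces an aggregated record
# that monasca-transform would publish to the metrics_pre_hourly topic.
stream_batches = [
    [0.034, 0.041, 0.029],  # metric values seen in one stream interval
    [0.051, 0.038],
    [0.044, 0.036, 0.040],
]
pre_hourly_records = [max(batch) for batch in stream_batches]

# Phase 2: once an hour, the pre hourly processor aggregates the
# intermediate records again and publishes to the metrics topic.
hourly_value = max(pre_hourly_records)
print(pre_hourly_records)  # [0.041, 0.051, 0.044]
print(hourly_value)        # 0.051
```

Here `max` stands in for whichever aggregation operation the transform spec configures.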

## Use Cases handled by Monasca Transform ##
Please refer to the **Problem Description** section on the [Monasca/Transform
wiki](https://wiki.openstack.org/wiki/Monasca/Transform)

## Operation ##
Please refer to the **How Monasca Transform Operates** section on the [Monasca/Transform
wiki](https://wiki.openstack.org/wiki/Monasca/Transform)

## Architecture ##
Please refer to the **Architecture** and **Logical processing data flow** sections on the
[Monasca/Transform wiki](https://wiki.openstack.org/wiki/Monasca/Transform)
## To set up the development environment ##
monasca-transform uses [DevStack](https://docs.openstack.org/devstack/latest/) as a common dev
environment. See the [README.md](devstack/README.md) in the devstack directory for details on how
to include monasca-transform in a DevStack deployment.

## Generic aggregation components ##

Monasca Transform uses a set of generic aggregation components which can be assembled into an
aggregation pipeline.

Please refer to the [generic aggregation components](docs/generic-aggregation-components.md)
document for the list of available generic aggregation components.

## Create a new aggregation pipeline example ##

Generic aggregation components make it easy to build new aggregation pipelines for different Monasca
metrics.

The [create a new aggregation pipeline](docs/create-new-aggregation-pipeline.md) example shows how to
create the *pre_transform_specs* and *transform_specs* for an aggregation pipeline for a new set
of Monasca metrics, while leveraging the existing set of generic aggregation components.

## Original proposal and blueprint ##

Original proposal:
[Monasca/Transform-proposal](https://wiki.openstack.org/wiki/Monasca/Transform-proposal)

Blueprint: [monasca-transform
blueprint](https://blueprints.launchpad.net/monasca/+spec/monasca-transform)

@@ -0,0 +1,336 @@
Team and repository tags
========================

[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html)

<!-- Change things from this point on -->
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->

- [Create a new aggregation pipeline](#create-a-new-aggregation-pipeline)
  - [Using existing generic aggregation components](#using-existing-generic-aggregation-components)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

# Create a new aggregation pipeline

Monasca Transform allows you to create new aggregations by creating a *pre_transform_spec* and a
*transform_spec* for any set of Monasca metrics. This page walks through the steps to create a new
aggregation pipeline and to test the pipeline in your DevStack environment.

A prerequisite for following the steps on this page is that you have already created a DevStack
development environment for Monasca Transform, following the instructions in
[devstack/README.md](devstack/README.md).

## Using existing generic aggregation components ##

Most use cases fall into this category: you should be able to create a new aggregation for a new
set of metrics using the existing set of generic aggregation components.

Let's consider a use case where we want to find out

* the maximum time monasca-agent takes to submit metrics over a period of an hour, across all hosts

* the maximum time monasca-agent takes to submit metrics over a period of an hour, per host.

We know that monasca-agent on each host generates a small number of
[monasca-agent metrics](https://github.com/openstack/monasca-agent/blob/master/docs/Plugins.md).

The metric we are interested in is

* **"monasca.collection_time_sec"**: the amount of time that the collector took for this collection run

**Steps:**

* **Step 1**: Identify the Monasca metric to be aggregated from the Kafka topic

```
/opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics | grep "monasca.collection_time_sec"

{"metric":{"timestamp":1523323485360.6650390625,"name":"monasca.collection_time_sec",
"dimensions":{"hostname":"devstack","component":"monasca-agent",
"service":"monitoring"},"value":0.0340659618, "value_meta":null},
"meta":{"region":"RegionOne","tenantId":"d6bece1bbeff47c1b8734cd4e544dc02"},
"creation_time":1523323489}
```

Note: "hostname" is available as a dimension, which we will use to find the maximum collection time for each host.
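To confirm which fields and dimensions are available for grouping, the example message above can be parsed with a few lines of Python (the JSON string is copied from the sample consumer output):

```python
import json

message = ('{"metric":{"timestamp":1523323485360.6650390625,'
           '"name":"monasca.collection_time_sec",'
           '"dimensions":{"hostname":"devstack","component":"monasca-agent",'
           '"service":"monitoring"},"value":0.0340659618,"value_meta":null},'
           '"meta":{"region":"RegionOne","tenantId":"d6bece1bbeff47c1b8734cd4e544dc02"},'
           '"creation_time":1523323489}')

metric = json.loads(message)
# "hostname" is the dimension we will group by in the per-host aggregation.
print(metric["metric"]["name"])                    # monasca.collection_time_sec
print(metric["metric"]["dimensions"]["hostname"])  # devstack
print(metric["metric"]["value"])
```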

* **Step 2**: Create a **pre_transform_spec**

"pre_transform_spec" drives the pre-processing of a Monasca metric into the record store format.
Look for an existing example in
"/monasca-transform-source/monasca_transform/data_driven_specs/pre_transform_specs/pre_transform_specs.json"

**pre_transform_spec**
```
{
    "event_processing_params":{
        "set_default_zone_to":"1",
        "set_default_geolocation_to":"1",
        "set_default_region_to":"W"
    },
    "event_type":"monasca.collection_time_sec",                                  <-- EDITED
    "metric_id_list":["monasca_collection_host"],                                <-- EDITED
    "required_raw_fields_list":["creation_time", "metric.dimensions.hostname"],  <-- EDITED
    "service_id":"host_metrics"
}
```
Let's look at all the fields that were edited (marked as `<-- EDITED` above):

**event_type**: set to "monasca.collection_time_sec". These are the metrics we want to
transform/aggregate.

**metric_id_list**: set to ["monasca_collection_host"]. This is a transformation spec
identifier. During pre-processing, the record generator generates additional "record_store" data
for each item in this list. (To be renamed to transform_spec_list)

**required_raw_fields_list**: set to ["creation_time", "metric.dimensions.hostname"].
This lists the fields in the incoming metrics that are required. During validation, pre-processing
eliminates metrics which are missing any of the required fields.

**service_id**: set to "host_metrics".
This identifies the source service these metrics belong to. (To be removed)

**Note:** "metric_id" is a misnomer; it is not really a metric identifier but rather an identifier
for a transformation spec. It will be changed to transform_spec_id in the future. Also,
"service_id" should be set by the source that is generating the metric; it will be removed in
the future. (Please see Story [2001815](https://storyboard.openstack.org/#!/story/2001815))
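The effect of "required_raw_fields_list" can be illustrated with a small validation sketch (hypothetical helper names; the real validation happens during Spark pre-processing):

```python
def has_field(metric, dotted_path):
    """Walk a dotted path such as 'metric.dimensions.hostname' through nested dicts."""
    node = metric
    for key in dotted_path.split("."):
        if not isinstance(node, dict) or key not in node or node[key] is None:
            return False
        node = node[key]
    return True

def validate(metrics, required_raw_fields_list):
    """Keep only metrics that carry every required field; others are eliminated."""
    return [m for m in metrics
            if all(has_field(m, f) for f in required_raw_fields_list)]

required = ["creation_time", "metric.dimensions.hostname"]
good = {"creation_time": 1523323489,
        "metric": {"dimensions": {"hostname": "devstack"}, "value": 0.03}}
bad = {"creation_time": 1523323489,
       "metric": {"dimensions": {}, "value": 0.05}}  # no hostname -> dropped

print(len(validate([good, bad], required)))  # 1
```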

* **Step 3**: Create a "transform_spec" to find the maximum metric value for each host

"transform_spec" drives the aggregation of the record store data created during pre-processing
into the final aggregated metric. Look for an existing example in
"/monasca-transform-source/monasca_transform/data_driven_specs/transform_specs/transform_specs.json"

**transform_spec**
```
{
    "aggregation_params_map":{

        "aggregation_pipeline":{
            "source":"streaming",
            "usage":"fetch_quantity",                                          <-- EDITED
            "setters":["set_aggregated_metric_name","set_aggregated_period"],  <-- EDITED
            "insert":["insert_data_pre_hourly"]                                <-- EDITED
        },

        "aggregated_metric_name":"monasca.collection_time_sec_host_agg",       <-- EDITED
        "aggregation_period":"hourly",                                         <-- EDITED
        "aggregation_group_by_list": ["host"],
        "usage_fetch_operation": "max",                                        <-- EDITED
        "filter_by_list": [],
        "dimension_list":["aggregation_period","host"],                        <-- EDITED

        "pre_hourly_operation":"max",
        "pre_hourly_group_by_list":["default"]},

    "metric_group":"monasca_collection_host",                                  <-- EDITED
    "metric_id":"monasca_collection_host"                                      <-- EDITED
}
```
Let's look at all the fields that were edited (marked as `<-- EDITED` above):

aggregation pipeline fields

* **usage**: set to "fetch_quantity", to use the "fetch_quantity" generic aggregation component.
  This component takes "aggregation_group_by_list", "usage_fetch_operation" and "filter_by_list"
  as parameters.
  * **aggregation_group_by_list**: set to ["host"], since we want to find the monasca-agent
    collection time for each host.
  * **usage_fetch_operation**: set to "max", since we want to find the maximum value of the
    monasca-agent collection time.
  * **filter_by_list**: set to [], since we don't want to filter out/ignore any metrics (based on,
    say, a particular host or set of hosts).

* **setters**: set to ["set_aggregated_metric_name","set_aggregated_period"]. These components set
  the aggregated metric name and aggregation period in the final aggregated metric.
  * **set_aggregated_metric_name** sets the final aggregated metric name. This setter component
    takes "aggregated_metric_name" as a parameter.
    * **aggregated_metric_name**: set to "monasca.collection_time_sec_host_agg"
  * **set_aggregated_period** sets the final aggregated metric period. This setter component takes
    "aggregation_period" as a parameter.
    * **aggregation_period**: set to "hourly"

* **insert**: set to ["insert_data_pre_hourly"]. These components are responsible for transforming
  instance usage data records to the final metric format and writing the data to a Kafka topic.
  * **insert_data_pre_hourly** writes the data to the "metrics_pre_hourly" Kafka topic, which gets
    processed by the pre hourly processor every hour.

pre hourly processor fields

* **pre_hourly_operation**: set to "max", to find the hourly maximum value from records that were
  written to the "metrics_pre_hourly" topic

* **pre_hourly_group_by_list**: set to ["default"]

transformation spec identifier fields

* **metric_group**: set to "monasca_collection_host". Group identifier for this transformation
  spec.

* **metric_id**: set to "monasca_collection_host". Identifier for this transformation spec.

**Note:** "metric_group" and "metric_id" are misnomers; they are not really metric identifiers but
rather identifiers for the transformation spec. They will be changed to "transform_group" and
"transform_spec_id" in the future. (Please see Story
[2001815](https://storyboard.openstack.org/#!/story/2001815))
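The combination of "fetch_quantity", "aggregation_group_by_list" and "usage_fetch_operation" can be sketched in plain Python (toy record-store rows; the real component operates on Spark DataFrames):

```python
# Toy record-store rows: (host, event_quantity)
records = [
    ("devstack", 0.034),
    ("devstack", 0.041),
    ("node-2", 0.029),
    ("node-2", 0.051),
]

def fetch_quantity(records, group_by, operation):
    """Group rows and apply the fetch operation per group.
    With an empty group_by list, every row falls into a single group."""
    groups = {}
    for host, quantity in records:
        key = host if group_by else "all"
        groups.setdefault(key, []).append(quantity)
    op = {"max": max, "min": min, "sum": sum}[operation]
    return {key: op(values) for key, values in groups.items()}

per_host = fetch_quantity(records, ["host"], "max")
across_all = fetch_quantity(records, [], "max")
print(per_host)    # {'devstack': 0.041, 'node-2': 0.051}
print(across_all)  # {'all': 0.051}
```

The empty group-by case corresponds to the "across all hosts" aggregation configured in Step 4.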

* **Step 4**: Create a "transform_spec" to find the maximum metric value across all hosts

Now let's create another transformation spec to find the maximum metric value across all hosts.

**transform_spec**
```
{
    "aggregation_params_map":{

        "aggregation_pipeline":{
            "source":"streaming",
            "usage":"fetch_quantity",                                          <-- EDITED
            "setters":["set_aggregated_metric_name","set_aggregated_period"],  <-- EDITED
            "insert":["insert_data_pre_hourly"]                                <-- EDITED
        },

        "aggregated_metric_name":"monasca.collection_time_sec_all_agg",        <-- EDITED
        "aggregation_period":"hourly",                                         <-- EDITED
        "aggregation_group_by_list": [],
        "usage_fetch_operation": "max",                                        <-- EDITED
        "filter_by_list": [],
        "dimension_list":["aggregation_period"],                               <-- EDITED

        "pre_hourly_operation":"max",
        "pre_hourly_group_by_list":["default"]},

    "metric_group":"monasca_collection_all",                                   <-- EDITED
    "metric_id":"monasca_collection_all"                                       <-- EDITED
}
```

The transformation spec above is almost identical to the transformation spec created in **Step 3**,
with a few changes:

**aggregation_group_by_list** is set to [], i.e. an empty list, since we want to find the maximum
value across all hosts (considering all the incoming metric data).

**aggregated_metric_name** is set to "monasca.collection_time_sec_all_agg".

**metric_group** is set to "monasca_collection_all", since we need a new transformation spec
group identifier.

**metric_id** is set to "monasca_collection_all", since we need a new transformation spec
identifier.

* **Step 5**: Update the "pre_transform_spec" with the new transformation spec identifier

In **Step 4** we created a new transformation spec with a new "metric_id", namely
"monasca_collection_all". We now have to update the "pre_transform_spec" that we created in
**Step 2** with the new "metric_id", by adding it to the "metric_id_list".

**pre_transform_spec**
```
{
    "event_processing_params":{
        "set_default_zone_to":"1",
        "set_default_geolocation_to":"1",
        "set_default_region_to":"W"
    },
    "event_type":"monasca.collection_time_sec",
    "metric_id_list":["monasca_collection_host", "monasca_collection_all"],    <-- EDITED
    "required_raw_fields_list":["creation_time", "metric.dimensions.hostname"],
    "service_id":"host_metrics"
}
```
Thus we were able to very easily attach an additional transformation/aggregation pipeline to the
same incoming Monasca metric.

* **Step 6**: Update "pre_transform_spec" and "transform_spec"

* Edit
"/monasca-transform-source/monasca_transform/data_driven_specs/pre_transform_specs/pre_transform_specs.json"
and add the following line.

```
{"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},"event_type":"monasca.collection_time_sec","metric_id_list":["monasca_collection_host","monasca_collection_all"],"required_raw_fields_list":["creation_time"],"service_id":"host_metrics"}
```

**Note:** Each line does not end with a comma (the file is not one big JSON document).

* Edit
"/monasca-transform-source/monasca_transform/data_driven_specs/transform_specs/transform_specs.json"
and add the following lines.

```
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["insert_data_pre_hourly"]},"aggregated_metric_name":"monasca.collection_time_sec_host_agg","aggregation_period":"hourly","aggregation_group_by_list":["host"],"usage_fetch_operation":"max","filter_by_list":[],"dimension_list":["aggregation_period","host"],"pre_hourly_operation":"max","pre_hourly_group_by_list":["default"]},"metric_group":"monasca_collection_host","metric_id":"monasca_collection_host"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["insert_data_pre_hourly"]},"aggregated_metric_name":"monasca.collection_time_sec_all_agg","aggregation_period":"hourly","aggregation_group_by_list":[],"usage_fetch_operation":"max","filter_by_list":[],"dimension_list":["aggregation_period"],"pre_hourly_operation":"max","pre_hourly_group_by_list":["default"]},"metric_group":"monasca_collection_all","metric_id":"monasca_collection_all"}
```
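Because each spec sits on its own line, the spec files are newline-delimited JSON. A quick way to check that an edited file still parses is to load it line by line (a plain-Python sketch; monasca-transform itself loads the specs via Spark):

```python
import json

def load_specs(text):
    """Parse a newline-delimited JSON spec file: one complete document per line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

# A shortened, hypothetical spec line for illustration.
sample = ('{"event_type":"monasca.collection_time_sec",'
          '"metric_id_list":["monasca_collection_host","monasca_collection_all"]}')

specs = load_specs(sample)
print(specs[0]["metric_id_list"])  # ['monasca_collection_host', 'monasca_collection_all']
```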

* Run the "refresh_monasca_transform.sh" script as documented in the devstack
[README](devstack/README.md) to refresh the specs in the database.

```
vagrant@devstack:~$ cd /opt/stack/monasca-transform
vagrant@devstack:/opt/stack/monasca-transform$ tools/vagrant/refresh_monasca_transform.sh
```

If successful, you should see this message.

```
***********************************************
*                                             *
*  SUCCESS!! refresh monasca transform done.  *
*                                             *
***********************************************
```
* **Step 7**: Verify the results

To verify that new aggregated metrics are being produced, you can look at the "metrics_pre_hourly"
topic in Kafka. By default, monasca-transform fires off a batch every 10 minutes, so you should
see metrics in the intermediate "instance_usage" data format being published to that topic every
10 minutes.
```
/opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics_pre_hourly

{"usage_hour":"06","geolocation":"NA","record_count":40.0,"app":"NA","deployment":"NA","resource_uuid":"NA",
"pod_name":"NA","usage_minute":"NA","service_group":"NA","lastrecord_timestamp_string":"2018-04-11 06:29:49",
"user_id":"NA","zone":"NA","namespace":"NA","usage_date":"2018-04-11","daemon_set":"NA","processing_meta":{
"event_type":"NA","metric_id":"monasca_collection_all"},
"firstrecord_timestamp_unix":1523427604.208577,"project_id":"NA","lastrecord_timestamp_unix":1523428189.718174,
"aggregation_period":"hourly","host":"NA","container_name":"NA","interface":"NA",
"aggregated_metric_name":"monasca.collection_time_sec_all_agg","tenant_id":"NA","region":"NA",
"firstrecord_timestamp_string":"2018-04-11 06:20:04","service_id":"NA","quantity":0.0687000751}

{"usage_hour":"06","geolocation":"NA","record_count":40.0,"app":"NA","deployment":"NA","resource_uuid":"NA",
"pod_name":"NA","usage_minute":"NA","service_group":"NA","lastrecord_timestamp_string":"2018-04-11 06:29:49",
"user_id":"NA","zone":"NA","namespace":"NA","usage_date":"2018-04-11","daemon_set":"NA","processing_meta":{
"event_type":"NA","metric_id":"monasca_collection_host"},"firstrecord_timestamp_unix":1523427604.208577,
"project_id":"NA","lastrecord_timestamp_unix":1523428189.718174,"aggregation_period":"hourly",
"host":"devstack","container_name":"NA","interface":"NA",
"aggregated_metric_name":"monasca.collection_time_sec_host_agg","tenant_id":"NA","region":"NA",
"firstrecord_timestamp_string":"2018-04-11 06:20:04","service_id":"NA","quantity":0.0687000751}
```

Similarly, to verify that the final aggregated metrics are being published by the pre hourly
processor, you can look at the "metrics" topic in Kafka. By default the pre hourly processor
(which processes metrics from the "metrics_pre_hourly" topic) runs 10 minutes past the top of
the hour.
```
/opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics | grep "_agg"

{"metric":{"timestamp":1523459468616,"value_meta":{"firstrecord_timestamp_string":"2018-04-11 14:00:13",
"lastrecord_timestamp_string":"2018-04-11 14:59:46","record_count":239.0},"name":"monasca.collection_time_sec_host_agg",
"value":0.1182248592,"dimensions":{"aggregation_period":"hourly","host":"devstack"}},
"meta":{"region":"useast","tenantId":"df89c3db21954b08b0516b4b60b8baff"},"creation_time":1523459468}

{"metric":{"timestamp":1523455872740,"value_meta":{"firstrecord_timestamp_string":"2018-04-11 13:00:10",
"lastrecord_timestamp_string":"2018-04-11 13:59:58","record_count":240.0},"name":"monasca.collection_time_sec_all_agg",
"value":0.0898442268,"dimensions":{"aggregation_period":"hourly"}},
"meta":{"region":"useast","tenantId":"df89c3db21954b08b0516b4b60b8baff"},"creation_time":1523455872}
```

As you can see, monasca-transform created two new aggregated metrics named
"monasca.collection_time_sec_host_agg" and "monasca.collection_time_sec_all_agg". The "value_meta"
section has three fields: "firstrecord_timestamp_string", "lastrecord_timestamp_string" and
"record_count". These fields are for informational purposes only. They show the timestamp of the
first metric, the timestamp of the last metric, and the number of metrics that went into the
calculation of the aggregated metric.
@@ -0,0 +1,129 @@

Team and repository tags
========================

[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html)

<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->

- [Monasca Transform Data Formats](#monasca-transform-data-formats)
  - [Record Store Data Format](#record-store-data-format)
  - [Instance Usage Data Format](#instance-usage-data-format)
  - [References](#references)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

# Monasca Transform Data Formats

There are two data formats used by monasca transform. The following sections describe the schema
(Spark DataFrame [1] schema) for the two formats.

Note: These are internal formats used by Monasca Transform when aggregating data. If you are a user
who wants to create a new aggregation pipeline using the existing framework, you don't need to know
or care about these two formats.

As a developer, if you want to write new aggregation components, then you might have to know how to
enhance the record store data format or instance usage data format with additional fields that you
may need, or how to write new aggregation components that aggregate data from the additional fields.

**Source Metric**

This is an example Monasca metric. A Monasca metric is transformed into the `record_store` data
format and later transformed/aggregated using re-usable generic aggregation components to derive
the `instance_usage` data format.

Example of a Monasca metric:

```
{"metric":{"timestamp":1523323485360.6650390625,
           "name":"monasca.collection_time_sec",
           "dimensions":{"hostname":"devstack",
                         "component":"monasca-agent",
                         "service":"monitoring"},
           "value":0.0340659618,
           "value_meta":null},
 "meta":{"region":"RegionOne","tenantId":"d6bece1bbeff47c1b8734cd4e544dc02"},
 "creation_time":1523323489}
```

## Record Store Data Format ##

Data Frame Schema:

| Column Name | Column Data Type | Description |
| ----------- | ---------------- | ----------- |
| event_quantity | `pyspark.sql.types.DoubleType` | mapped to `metric.value` |
| event_timestamp_unix | `pyspark.sql.types.DoubleType` | calculated as `metric.timestamp`/`1000` from the source metric |
| event_timestamp_string | `pyspark.sql.types.StringType` | mapped to `metric.timestamp` from the source metric |
| event_type | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `metric.name` from the source metric |
| event_quantity_name | `pyspark.sql.types.StringType` | mapped to `metric.name` from the source metric |
| event_status | `pyspark.sql.types.StringType` | placeholder for the future. Currently mapped to `metric.dimensions.state` from the source metric |
| event_version | `pyspark.sql.types.StringType` | placeholder for the future. Set to "1.0" |
| record_type | `pyspark.sql.types.StringType` | placeholder for the future. Set to "metrics" |
| resource_uuid | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.instanceId` or `metric.dimensions.resource_id` from the source metric |
| tenant_id | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.tenant_id` or `metric.dimensions.tenantid` or `metric.dimensions.project_id` |
| user_id | `pyspark.sql.types.StringType` | mapped to `meta.userId` |
| region | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `meta.region`, defaults to `event_processing_params.set_default_region_to` (`pre_transform_spec`) |
| zone | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `meta.zone`, defaults to `event_processing_params.set_default_zone_to` (`pre_transform_spec`) |
| host | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.hostname` or `metric.value_meta.host` |
| project_id | `pyspark.sql.types.StringType` | mapped to the metric tenant_id |
| service_group | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `service_id` in `pre_transform_spec` |
| service_id | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `service_id` in `pre_transform_spec` |
| event_date | `pyspark.sql.types.StringType` | "YYYY-mm-dd". Extracted from `metric.timestamp` |
| event_hour | `pyspark.sql.types.StringType` | "HH". Extracted from `metric.timestamp` |
| event_minute | `pyspark.sql.types.StringType` | "MM". Extracted from `metric.timestamp` |
| event_second | `pyspark.sql.types.StringType` | "SS". Extracted from `metric.timestamp` |
| metric_group | `pyspark.sql.types.StringType` | identifier for the transform spec group |
| metric_id | `pyspark.sql.types.StringType` | identifier for the transform spec |
| namespace | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.namespace` |
| pod_name | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.pod_name` |
| app | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.app` |
| container_name | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.container_name` |
| interface | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.interface` |
| deployment | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.deployment` |
| daemon_set | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.daemon_set` |
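Following the mappings in the table, converting the example source metric into a few of the record store fields might look like this (a simplified plain-Python sketch for illustration; the real conversion builds a Spark DataFrame row with all columns):

```python
import datetime

source = {"metric": {"timestamp": 1523323485360.6650390625,
                     "name": "monasca.collection_time_sec",
                     "dimensions": {"hostname": "devstack"},
                     "value": 0.0340659618},
          "meta": {"region": "RegionOne"},
          "creation_time": 1523323489}

# event_timestamp_unix is metric.timestamp / 1000 (milliseconds -> seconds).
ts_unix = source["metric"]["timestamp"] / 1000
when = datetime.datetime.fromtimestamp(ts_unix, tz=datetime.timezone.utc)

record = {
    "event_quantity": source["metric"]["value"],          # metric.value
    "event_timestamp_unix": ts_unix,
    "event_quantity_name": source["metric"]["name"],      # metric.name
    "host": source["metric"]["dimensions"]["hostname"],   # metric.dimensions.hostname
    "event_date": when.strftime("%Y-%m-%d"),
    "event_hour": when.strftime("%H"),
}
print(record["event_date"], record["event_hour"])  # 2018-04-10 01
```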

## Instance Usage Data Format ##

Data Frame Schema:

| Column Name | Column Data Type | Description |
| ----------- | ---------------- | ----------- |
| tenant_id | `pyspark.sql.types.StringType` | project_id, defaults to `NA` |
| user_id | `pyspark.sql.types.StringType` | user_id, defaults to `NA` |
| resource_uuid | `pyspark.sql.types.StringType` | resource_id, defaults to `NA` |
| geolocation | `pyspark.sql.types.StringType` | placeholder for the future, defaults to `NA` |
| region | `pyspark.sql.types.StringType` | placeholder for the future, defaults to `NA` |
| zone | `pyspark.sql.types.StringType` | placeholder for the future, defaults to `NA` |
| host | `pyspark.sql.types.StringType` | compute hostname, defaults to `NA` |
| project_id | `pyspark.sql.types.StringType` | project_id, defaults to `NA` |
| aggregated_metric_name | `pyspark.sql.types.StringType` | aggregated metric name, defaults to `NA` |
| firstrecord_timestamp_string | `pyspark.sql.types.StringType` | timestamp of the first metric used to derive this aggregated metric |
| lastrecord_timestamp_string | `pyspark.sql.types.StringType` | timestamp of the last metric used to derive this aggregated metric |
| service_group | `pyspark.sql.types.StringType` | placeholder for the future, defaults to `NA` |
| service_id | `pyspark.sql.types.StringType` | placeholder for the future, defaults to `NA` |
| usage_date | `pyspark.sql.types.StringType` | "YYYY-mm-dd" date |
| usage_hour | `pyspark.sql.types.StringType` | "HH" hour |
| usage_minute | `pyspark.sql.types.StringType` | "MM" minute |
| aggregation_period | `pyspark.sql.types.StringType` | "hourly" or "minutely" |
| namespace | `pyspark.sql.types.StringType` | |
| pod_name | `pyspark.sql.types.StringType` | |
| app | `pyspark.sql.types.StringType` | |
| container_name | `pyspark.sql.types.StringType` | |
| interface | `pyspark.sql.types.StringType` | |
| deployment | `pyspark.sql.types.StringType` | |
| daemon_set | `pyspark.sql.types.StringType` | |
| firstrecord_timestamp_unix | `pyspark.sql.types.DoubleType` | epoch timestamp of the first metric used to derive this aggregated metric |
| lastrecord_timestamp_unix | `pyspark.sql.types.DoubleType` | epoch timestamp of the last metric used to derive this aggregated metric |
| quantity | `pyspark.sql.types.DoubleType` | aggregated metric quantity |
| record_count | `pyspark.sql.types.DoubleType` | number of source metrics that were used to derive this aggregated metric. For informational purposes only. |
| processing_meta | `pyspark.sql.types.MapType(pyspark.sql.types.StringType, pyspark.sql.types.StringType, True)` | key-value pairs to store additional information, to aid processing |
|
||||
## References

[1] [Spark SQL, DataFrames and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)

[2] [Spark DataTypes](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.types.DataType)
Team and repository tags
========================

[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html)

<!-- Change things from this point on -->
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->

- [Monasca Transform Generic Aggregation Components](#monasca-transform-generic-aggregation-components)
- [Introduction](#introduction)
  - [1: Conversion of incoming metrics to record store data format](#1-conversion-of-incoming-metrics-to-record-store-data-format)
    - [Pre Transform Spec](#pre-transform-spec)
  - [2: Data aggregation using generic aggregation components](#2-data-aggregation-using-generic-aggregation-components)
    - [Transform Specs](#transform-specs)
      - [aggregation_params_map](#aggregation_params_map)
        - [aggregation_pipeline](#aggregation_pipeline)
        - [Other parameters](#other-parameters)
    - [metric_group and metric_id](#metric_group-and-metric_id)
  - [Generic Aggregation Components](#generic-aggregation-components)
    - [Usage Components](#usage-components)
      - [fetch_quantity](#fetch_quantity)
      - [fetch_quantity_util](#fetch_quantity_util)
      - [calculate_rate](#calculate_rate)
    - [Setter Components](#setter-components)
      - [set_aggregated_metric_name](#set_aggregated_metric_name)
      - [set_aggregated_period](#set_aggregated_period)
      - [rollup_quantity](#rollup_quantity)
    - [Insert Components](#insert-components)
      - [insert_data](#insert_data)
      - [insert_data_pre_hourly](#insert_data_pre_hourly)
  - [Processors](#processors)
    - [pre_hourly_processor](#pre_hourly_processor)
- [Putting it all together](#putting-it-all-together)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->
# Monasca Transform Generic Aggregation Components

# Introduction

Monasca Transform uses the standard ETL (Extract-Transform-Load) design pattern to aggregate
Monasca metrics, and uses a data/configuration driven mechanism to drive processing. It
accomplishes data aggregation in two distinct steps, each driven by an external configuration
specification, namely the *pre_transform_spec* and the *transform_spec*.

## 1: Conversion of incoming metrics to record store data format ##

In the first step, the incoming metrics are converted into a canonical data format called record
store data, using the *pre_transform_spec*.

This logical processing data flow is explained in more detail in [Monasca/Transform wiki: Logical
processing data flow section: Conversion to record store
format](https://wiki.openstack.org/wiki/Monasca/Transform#Logical_processing_data_flow) and
includes the following operations:

* identifying metrics that are required (in other words, filtering out unwanted metrics)

* validation and extraction of essential data in the metric

* generating multiple records for incoming metrics if they are to be aggregated in multiple ways,
  and finally

* conversion of the incoming metrics to the canonical record store data format. Please refer to the
  record store section in [Data Formats](data_formats.md) for more information on the record store
  format.

### Pre Transform Spec ###

Example *pre_transform_spec* for a metric:

```
{
    "event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},
    "event_type":"cpu.total_logical_cores",
    "metric_id_list":["cpu_total_all","cpu_total_host","cpu_util_all","cpu_util_host"],
    "required_raw_fields_list":["creation_time"],
    "service_id":"host_metrics"
}
```

*List of fields*

| field name | values | description |
| ---------- | ------ | ----------- |
| event_processing_params | Set default field values `set_default_zone_to`, `set_default_geolocation_to`, `set_default_region_to` | Set default values for certain fields in the record store data |
| event_type | Name of the metric | identifies the metric that needs to be aggregated |
| metric_id_list | List of `metric_id`s | List of identifiers, should match `metric_id` in the transform specs. This is used by the record generation step to generate multiple records if this metric is to be aggregated in multiple ways |
| required_raw_fields_list | List of `field` | List of fields (in JSON dotted notation) that are required in the incoming metric, used for validating the incoming metric |
| service_id | service identifier | Identifies the service to which this metric belongs. Note: this field is not yet used |
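
The `required_raw_fields_list` validation can be illustrated with a short sketch (a simplified
stand-in, not the project's actual validation code; the dotted path `meta.creation_time` and the
sample metric layout here are hypothetical): each dotted path is walked into the incoming metric
dictionary, and the metric is rejected if any required field is missing.

```python
def has_dotted_field(metric, dotted_path):
    """Return True if the dotted path (e.g. "meta.creation_time") exists in the metric dict."""
    node = metric
    for key in dotted_path.split("."):
        if not isinstance(node, dict) or key not in node:
            return False
        node = node[key]
    return True


def validate_metric(metric, required_raw_fields_list):
    """Keep the metric only if every required raw field is present."""
    return all(has_dotted_field(metric, field) for field in required_raw_fields_list)


# Hypothetical incoming metric layout, for illustration only.
metric = {"metric": {"name": "cpu.total_logical_cores", "value": 8},
          "meta": {"creation_time": 1523000000}}
validate_metric(metric, ["meta.creation_time"])  # True
validate_metric(metric, ["meta.deleted_at"])     # False
```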

## 2: Data aggregation using generic aggregation components ##

In the second step, the canonical record store data is aggregated using the *transform_spec*. Each
*transform_spec* defines a series of generic aggregation components, which are specified in the
`aggregation_params_map.aggregation_pipeline` section. (See the *transform_spec* example below.)

Any parameters used by the generic aggregation components are also specified in the
`aggregation_params_map` section. (See *Other parameters*, e.g. `aggregated_metric_name`,
`aggregation_period`, `aggregation_group_by_list` etc. in the *transform_spec* example below.)

### Transform Specs ###

Example *transform_spec* for a metric:

```
{"aggregation_params_map":{
       "aggregation_pipeline":{
           "source":"streaming",
           "usage":"fetch_quantity",
           "setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],
           "insert":["prepare_data","insert_data_pre_hourly"]
       },
       "aggregated_metric_name":"cpu.total_logical_cores_agg",
       "aggregation_period":"hourly",
       "aggregation_group_by_list": ["host", "metric_id", "tenant_id"],
       "usage_fetch_operation": "avg",
       "filter_by_list": [],
       "setter_rollup_group_by_list": [],
       "setter_rollup_operation": "sum",
       "dimension_list":["aggregation_period","host","project_id"],
       "pre_hourly_operation":"avg",
       "pre_hourly_group_by_list":["default"]
},
"metric_group":"cpu_total_all",
"metric_id":"cpu_total_all"
}
```

#### aggregation_params_map ####

This section specifies the *aggregation_pipeline* and *Other parameters* (used by the generic
aggregation components in the *aggregation_pipeline*).

##### aggregation_pipeline #####

Specifies the generic aggregation components that should be used to process incoming metrics.

Note: generic aggregation components are re-usable and can be used to build different aggregation
pipelines as required.

*List of fields*

| field name | values | description |
| ---------- | ------ | ----------- |
| source | ```streaming``` | source is ```streaming```. In the future this can be used to specify a component which can fetch data directly from the monasca datastore |
| usage | ```fetch_quantity```, ```fetch_quantity_util```, ```calculate_rate``` | [Usage Components](https://github.com/openstack/monasca-transform/tree/master/monasca_transform/component/usage) |
| setters | ```pre_hourly_calculate_rate```, ```rollup_quantity```, ```set_aggregated_metric_name```, ```set_aggregated_period``` | [Setter Components](https://github.com/openstack/monasca-transform/tree/master/monasca_transform/component/setter) |
| insert | ```insert_data```, ```insert_data_pre_hourly``` | [Insert Components](https://github.com/openstack/monasca-transform/tree/master/monasca_transform/component/insert) |

##### Other parameters #####

Specifies parameters that the generic aggregation components use to process and aggregate data.

*List of Other parameters*

| Parameter Name | Values | Description | Used by |
| -------------- | ------ | ----------- | ------- |
| aggregated_metric_name | e.g. "cpu.total_logical_cores_agg" | Name of the aggregated metric | [set_aggregated_metric_name](#set_aggregated_metric_name) |
| aggregation_period | "hourly", "minutely" or "secondly" | Period over which to aggregate data | [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate), [set_aggregated_period](#set_aggregated_period), [rollup_quantity](#rollup_quantity) |
| aggregation_group_by_list | e.g. "project_id", "hostname" | Group `record_store` data with these columns | [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate) |
| usage_fetch_operation | e.g. "sum" | After the data is grouped by `aggregation_group_by_list`, perform this operation to find the aggregated metric value | [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate) |
| filter_by_list | Filter regex | Filter data using a regex on a `record_store` column value | [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate) |
| setter_rollup_group_by_list | e.g. "project_id" | Group by this set of fields | [rollup_quantity](#rollup_quantity) |
| setter_rollup_operation | e.g. "avg" | After data is grouped by `setter_rollup_group_by_list`, perform this operation to find the aggregated metric value | [rollup_quantity](#rollup_quantity) |
| dimension_list | e.g. "aggregation_period", "host", "project_id" | List of fields which specify dimensions in the aggregated metric | [insert_data](#insert_data), [insert_data_pre_hourly](#insert_data_pre_hourly) |
| pre_hourly_group_by_list | e.g. "default" | List of `instance usage data` fields to group by when aggregating data | [pre_hourly_processor](#pre_hourly_processor) |
| pre_hourly_operation | e.g. "avg" | When aggregating data published to `metrics_pre_hourly` every hour, perform this operation to find the hourly aggregated metric value | [pre_hourly_processor](#pre_hourly_processor) |

### metric_group and metric_id ###

Specifies a metric or list of metrics from the record store data which will be processed by this
*transform_spec*. Note: this can be a single metric or a group of metrics that will be combined to
produce the final aggregated metric.

*List of fields*

| field name | values | description |
| ---------- | ------ | ----------- |
| metric_group | unique transform spec group identifier | group identifier for this transform spec e.g. "cpu_total_all" |
| metric_id | unique transform spec identifier | identifier for this transform spec e.g. "cpu_total_all" |

**Note:** "metric_id" is a misnomer: it is not really a metric group or metric identifier, but
rather an identifier for the transformation spec. This will be changed to "transform_spec_id" in
the future.

## Generic Aggregation Components ##

*List of Generic Aggregation Components*

### Usage Components ###

All usage components implement a method

```
def usage(transform_context, record_store_df):
    ..
    ..
    return instance_usage_df
```

#### fetch_quantity ####

This component groups record store records by `aggregation_group_by_list`, sorts within each group
by the timestamp field, and finds the usage based on `usage_fetch_operation`. Optionally this
component also takes `filter_by_list` to include or exclude certain records from the usage
calculation.

*Other parameters*

* **aggregation_group_by_list**

  List of fields to group by.

  Possible values: any set of fields in record store data.

  Example:

  ```
  "aggregation_group_by_list": ["tenant_id"]
  ```

* **usage_fetch_operation**

  Operation to be performed on the grouped data set.

  *Possible values:* "sum", "max", "min", "avg", "latest", "oldest"

* **aggregation_period**

  Period to aggregate by.

  *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.

  Example:

  ```
  "aggregation_period": "hourly"
  ```

* **filter_by_list**

  Filter (include or exclude) record store data as specified.

  Example:

  ```
  "filter_by_list": [{"field_to_filter": "hostname",
                      "filter_expression": "comp-(\d)+",
                      "filter_operation": "include"}]
  ```

  OR

  ```
  "filter_by_list": [{"field_to_filter": "hostname",
                      "filter_expression": "controller-(\d)+",
                      "filter_operation": "exclude"}]
  ```
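
The include/exclude semantics of `filter_by_list` can be sketched in plain Python (an illustrative
simplification, not the project's Spark implementation): a record is kept when it matches an
"include" expression, and dropped when it matches an "exclude" expression.

```python
import re


def apply_filter(records, field_to_filter, filter_expression, filter_operation):
    """Keep or drop records whose field matches the regex, per the filter operation."""
    pattern = re.compile(filter_expression)
    if filter_operation == "include":
        return [r for r in records if pattern.match(str(r.get(field_to_filter, "")))]
    # "exclude": keep only records that do NOT match the expression
    return [r for r in records if not pattern.match(str(r.get(field_to_filter, "")))]


records = [{"hostname": "comp-1"}, {"hostname": "controller-1"}]
apply_filter(records, "hostname", r"comp-(\d)+", "include")        # keeps only comp-1
apply_filter(records, "hostname", r"controller-(\d)+", "exclude")  # also keeps only comp-1
```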

#### fetch_quantity_util ####

This component finds the utilized quantity based on *total_quantity* and *idle_perc*, using the
following calculation

```
utilized_quantity = (100 - idle_perc) * total_quantity / 100
```

where,

* **total_quantity** data, identified by the `usage_fetch_util_quantity_event_type` parameter, and

* **idle_perc** data, identified by the `usage_fetch_util_idle_perc_event_type` parameter

This component initially groups record store records by `aggregation_group_by_list` and
`event_type`, sorts within each group by the timestamp field, and calculates the `total_quantity`
and `idle_perc` values based on `usage_fetch_operation`. `utilized_quantity` is then calculated
using the formula given above.
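
The calculation above is a one-liner; the sketch below simply transcribes it so it can be checked
with sample numbers:

```python
def utilized_quantity(total_quantity, idle_perc):
    """utilized_quantity = (100 - idle_perc) * total_quantity / 100"""
    return (100 - idle_perc) * total_quantity / 100


# 8 logical cores that are 75% idle are 25% utilized, i.e. 2 cores in use.
utilized_quantity(8, 75.0)  # 2.0
```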

*Other parameters*

* **aggregation_group_by_list**

  List of fields to group by.

  Possible values: any set of fields in record store data.

  Example:

  ```
  "aggregation_group_by_list": ["tenant_id"]
  ```

* **usage_fetch_operation**

  Operation to be performed on the grouped data set.

  *Possible values:* "sum", "max", "min", "avg", "latest", "oldest"

* **aggregation_period**

  Period to aggregate by.

  *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.

  Example:

  ```
  "aggregation_period": "hourly"
  ```

* **filter_by_list**

  Filter (include or exclude) record store data as specified.

  Example:

  ```
  "filter_by_list": [{"field_to_filter": "hostname",
                      "filter_expression": "comp-(\d)+",
                      "filter_operation": "include"}]
  ```

  OR

  ```
  "filter_by_list": [{"field_to_filter": "hostname",
                      "filter_expression": "controller-(\d)+",
                      "filter_operation": "exclude"}]
  ```

* **usage_fetch_util_quantity_event_type**

  Event type (metric name) identifying the data which will be used to calculate `total_quantity`.

  *Possible values:* metric name

  Example:

  ```
  "usage_fetch_util_quantity_event_type": "cpu.total_logical_cores"
  ```

* **usage_fetch_util_idle_perc_event_type**

  Event type (metric name) identifying the data which will be used to calculate `idle_perc`.

  *Possible values:* metric name

  Example:

  ```
  "usage_fetch_util_idle_perc_event_type": "cpu.idle_perc"
  ```

#### calculate_rate ####

This component finds the rate of change of a quantity (in percent) over a time period, using the
following calculation

```
rate_of_change (in percent) = ((oldest_quantity - latest_quantity)/oldest_quantity) * 100
```

where,

* **oldest_quantity**: oldest (or earliest) `average` quantity, if there are multiple quantities in
  a group for a given time period.

* **latest_quantity**: latest `average` quantity, if there are multiple quantities in a group for a
  given time period.
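
The rate formula above can be checked with a few lines of Python (a direct transcription, for
illustration only):

```python
def rate_of_change_percent(oldest_quantity, latest_quantity):
    """rate_of_change (in percent) = ((oldest - latest) / oldest) * 100"""
    return (oldest_quantity - latest_quantity) / oldest_quantity * 100


# A quantity that falls from 200 to 150 over the period yields a 25 percent change.
rate_of_change_percent(200.0, 150.0)  # 25.0
```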

*Other parameters*

* **aggregation_group_by_list**

  List of fields to group by.

  Possible values: any set of fields in record store data.

  Example:

  ```
  "aggregation_group_by_list": ["tenant_id"]
  ```

* **usage_fetch_operation**

  Operation to be performed on the grouped data set.

  *Possible values:* "sum", "max", "min", "avg", "latest", "oldest"

* **aggregation_period**

  Period to aggregate by.

  *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.

  Example:

  ```
  "aggregation_period": "hourly"
  ```

* **filter_by_list**

  Filter (include or exclude) record store data as specified.

  Example:

  ```
  "filter_by_list": [{"field_to_filter": "hostname",
                      "filter_expression": "comp-(\d)+",
                      "filter_operation": "include"}]
  ```

  OR

  ```
  "filter_by_list": [{"field_to_filter": "hostname",
                      "filter_expression": "controller-(\d)+",
                      "filter_operation": "exclude"}]
  ```

### Setter Components ###

All setter components implement a method

```
def setter(transform_context, instance_usage_df):
    ..
    ..
    return instance_usage_df
```

#### set_aggregated_metric_name ####

This component sets the final aggregated metric name, by setting the `aggregated_metric_name`
field in the `instance_usage` data.

*Other parameters*

* **aggregated_metric_name**

  Name of the aggregated metric being generated.

  *Possible values:* any aggregated metric name. The convention is to end the metric name with
  "_agg".

  Example:

  ```
  "aggregated_metric_name": "cpu.total_logical_cores_agg"
  ```

#### set_aggregated_period ####

This component sets the final aggregation period, by setting the `aggregation_period` field in the
`instance_usage` data.

*Other parameters*

* **aggregation_period**

  Period of the aggregated metric being generated.

  *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.

  Example:

  ```
  "aggregation_period": "hourly"
  ```

**Note:** if you are publishing metrics to the *metrics_pre_hourly* kafka topic using the
`insert_data_pre_hourly` component (see the *insert_data_pre_hourly* component below),
`aggregation_period` will have to be set to `hourly`, since by default all data in the
*metrics_pre_hourly* topic gets aggregated every hour by the `Pre Hourly Processor` (see the
`Processors` section below).

#### rollup_quantity ####

This component groups `instance_usage` records by `setter_rollup_group_by_list`, sorts within each
group by the timestamp field, and finds the usage based on `setter_rollup_operation`.

*Other parameters*

* **setter_rollup_group_by_list**

  List of fields to group by.

  Possible values: any set of fields in record store data.

  Example:

  ```
  "setter_rollup_group_by_list": ["tenant_id"]
  ```

* **setter_rollup_operation**

  Operation to be performed on the grouped data set.

  *Possible values:* "sum", "max", "min", "avg"

  Example:

  ```
  "setter_rollup_operation": "avg"
  ```

* **aggregation_period**

  Period to aggregate by.

  *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.

  Example:

  ```
  "aggregation_period": "hourly"
  ```
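
The rollup step can be sketched in plain Python (an illustrative simplification; the real component
operates on Spark DataFrames): records are grouped by the rollup fields and the rollup operation is
applied to each group's quantities.

```python
from collections import defaultdict


def rollup(records, group_by, operation):
    """Group records by the given fields and roll up their 'quantity' values."""
    ops = {"sum": sum, "max": max, "min": min,
           "avg": lambda vals: sum(vals) / len(vals)}
    groups = defaultdict(list)
    for record in records:
        key = tuple(record[field] for field in group_by)
        groups[key].append(record["quantity"])
    return {key: ops[operation](vals) for key, vals in groups.items()}


records = [{"tenant_id": "t1", "quantity": 2.0},
           {"tenant_id": "t1", "quantity": 4.0},
           {"tenant_id": "t2", "quantity": 8.0}]
rollup(records, ["tenant_id"], "avg")  # {("t1",): 3.0, ("t2",): 8.0}
```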

### Insert Components ###

All insert components implement a method

```
def insert(transform_context, instance_usage_df):
    ..
    ..
    return instance_usage_df
```

#### insert_data ####

This component converts `instance_usage` data into the monasca metric format and writes the metric
to the `metrics` topic in kafka.

*Other parameters*

* **dimension_list**

  List of fields in `instance_usage` data that should be converted to monasca metric dimensions.

  *Possible values:* any fields in `instance_usage` data

  Example:

  ```
  "dimension_list": ["aggregation_period", "host", "project_id"]
  ```

#### insert_data_pre_hourly ####

This component converts `instance_usage` data into the monasca metric format and writes the metric
to the `metrics_pre_hourly` topic in kafka.

*Other parameters*

* **dimension_list**

  List of fields in `instance_usage` data that should be converted to monasca metric dimensions.

  *Possible values:* any fields in `instance_usage` data

  Example:

  ```
  "dimension_list": ["aggregation_period", "host", "project_id"]
  ```
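
The dimension mapping can be sketched as follows (a simplified illustration only; the exact metric
envelope fields used by the project may differ, so the `metric`/`name`/`value` layout here is an
assumption): the fields named in `dimension_list` are copied from the `instance_usage` record into
the metric's dimensions.

```python
def to_metric(instance_usage, dimension_list):
    """Build a monasca-style metric dict, copying selected fields as dimensions."""
    return {
        "metric": {
            "name": instance_usage["aggregated_metric_name"],
            "value": instance_usage["quantity"],
            # every field in dimension_list becomes a dimension key/value pair
            "dimensions": {field: instance_usage[field] for field in dimension_list},
        }
    }


usage = {"aggregated_metric_name": "cpu.total_logical_cores_agg",
         "quantity": 24.0, "aggregation_period": "hourly",
         "host": "comp-1", "project_id": "p1"}
to_metric(usage, ["aggregation_period", "host", "project_id"])
```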

## Processors ##

Processors are special components that process data from a kafka topic at a desired time interval.
These are different from generic aggregation components, since they process data from a specific
kafka topic.

All processor components implement the following methods

```
def get_app_name(self):
    [...]
    return app_name

def is_time_to_run(self, current_time):
    if current_time > last_invoked + 1:
        return True
    else:
        return False

def run_processor(self, time):
    # do work...
```
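
For illustration, the interface above might be filled in like this minimal, runnable skeleton
(hypothetical class and interval; the real processors schedule against kafka offsets): the
processor runs only when enough time has elapsed since its last invocation.

```python
class MinutelyProcessor:
    """Toy processor that wants to run at most once every 60 seconds."""

    def __init__(self, interval_seconds=60):
        self.interval_seconds = interval_seconds
        self.last_invoked = float("-inf")  # never run yet

    def get_app_name(self):
        return "minutely_processor"

    def is_time_to_run(self, current_time):
        # run only if a full interval has passed since the last invocation
        return current_time > self.last_invoked + self.interval_seconds

    def run_processor(self, current_time):
        self.last_invoked = current_time
        # a real processor would read and aggregate a kafka topic here


p = MinutelyProcessor()
p.is_time_to_run(30.0)   # True (never run before)
p.run_processor(30.0)
p.is_time_to_run(60.0)   # False (only 30 seconds elapsed)
p.is_time_to_run(100.0)  # True (more than 60 seconds elapsed)
```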

### pre_hourly_processor ###

The Pre Hourly Processor runs every hour and aggregates `instance_usage` data published to the
`metrics_pre_hourly` topic.

By default, the Pre Hourly Processor is set to run 10 minutes after the top of the hour, and
processes data from the previous hour. `instance_usage` data is grouped by
`pre_hourly_group_by_list`.

*Other parameters*

* **pre_hourly_group_by_list**

  List of fields to group by.

  Possible values: any set of fields in `instance_usage` data, or `default`

  Note: setting to `default` will group `instance_usage` data by `tenant_id`, `user_id`,
  `resource_uuid`, `geolocation`, `region`, `zone`, `host`, `project_id`,
  `aggregated_metric_name`, `aggregation_period`

  Example:

  ```
  "pre_hourly_group_by_list": ["tenant_id"]
  ```

  OR

  ```
  "pre_hourly_group_by_list": ["default"]
  ```

* **pre_hourly_operation**

  Operation to be performed on the grouped data set.

  *Possible values:* "sum", "max", "min", "avg", "rate"

  Example:

  ```
  "pre_hourly_operation": "avg"
  ```

# Putting it all together

Please refer to the [Create a new aggregation pipeline](create-new-aggregation-pipeline.md)
document to create a new aggregation pipeline.