Merge "Refresh monasca transform docs"

This commit is contained in:
Zuul 2018-04-24 11:30:06 +00:00 committed by Gerrit Code Review
commit 0d318c0d44
4 changed files with 1175 additions and 3 deletions


@ -3,10 +3,85 @@ Team and repository tags
[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html)
<!-- Change things from this point on -->
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
- [Monasca Transform](#monasca-transform)
- [Use Cases handled by Monasca Transform](#use-cases-handled-by-monasca-transform)
- [Operation](#operation)
- [Architecture](#architecture)
- [To set up the development environment](#to-set-up-the-development-environment)
- [Generic aggregation components](#generic-aggregation-components)
- [Create a new aggregation pipeline example](#create-a-new-aggregation-pipeline-example)
- [Original proposal and blueprint](#original-proposal-and-blueprint)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
# Monasca Transform
monasca-transform is a data-driven aggregation engine which collects, groups and aggregates existing
individual Monasca metrics according to business requirements and publishes new transformed
(derived) metrics to the Monasca Kafka queue.
* Since the new transformed metrics are published like any other metric in Monasca, alarms can be
set and triggered on the transformed metrics.
* Monasca Transform uses [Apache Spark](http://spark.apache.org) to aggregate data. [Apache
Spark](http://spark.apache.org) is a highly scalable, fast, in-memory, fault tolerant and
parallel data processing framework. All monasca-transform components are implemented in Python
and use Spark's [PySpark Python API](http://spark.apache.org/docs/latest/api/python/index.html)
to interact with Spark.
* Monasca Transform transforms and aggregates incoming metrics in two phases (see the sketch
below).
* In the first phase, the Spark Streaming application retrieves data from Kafka at a
configurable *stream interval* (the default *stream_interval* is 10 minutes) and writes the data
aggregated over the *stream interval* to the *metrics_pre_hourly* topic in Kafka.
* In the second phase, which is kicked off every hour, all metrics in the *metrics_pre_hourly*
topic in Kafka are aggregated again, this time over a larger interval of an hour. These hourly
aggregated metrics are published to the *metrics* topic in Kafka.
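The following minimal sketch (plain Python, not monasca-transform code) illustrates the two-phase idea; the metric shape, the `max` operation and the helper names are made up for illustration only.
```
from collections import defaultdict

def aggregate_stream_interval(batch):
    """Phase 1: aggregate raw metric values that arrived during one
    10 minute stream interval, keyed by (metric name, hostname)."""
    grouped = defaultdict(list)
    for metric in batch:
        key = (metric["name"], metric["dimensions"]["hostname"])
        grouped[key].append(metric["value"])
    # in the real pipeline one intermediate record per group is written
    # to the metrics_pre_hourly topic
    return {key: max(values) for key, values in grouped.items()}

def aggregate_hourly(intermediate_results):
    """Phase 2: every hour, combine the intermediate results of the six
    10 minute intervals into one hourly aggregated value per group."""
    hourly = defaultdict(list)
    for result in intermediate_results:
        for key, value in result.items():
            hourly[key].append(value)
    # the hourly aggregated metrics are published to the metrics topic
    return {key: max(values) for key, values in hourly.items()}
```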
## Use Cases handled by Monasca Transform ##
Please refer to the **Problem Description** section on the
[Monasca/Transform wiki](https://wiki.openstack.org/wiki/Monasca/Transform).
## Operation ##
Please refer to the **How Monasca Transform Operates** section on the
[Monasca/Transform wiki](https://wiki.openstack.org/wiki/Monasca/Transform).
## Architecture ##
Please refer to the **Architecture** and **Logical processing data flow** sections on the
[Monasca/Transform wiki](https://wiki.openstack.org/wiki/Monasca/Transform).
## To set up the development environment ##
monasca-transform uses [DevStack](https://docs.openstack.org/devstack/latest/) as a common dev
environment. See the [README.md](devstack/README.md) in the devstack directory for details on how
to include monasca-transform in a DevStack deployment.
## Generic aggregation components ##
Monasca Transform uses a set of generic aggregation components which can be assembled into an
aggregation pipeline.
Please refer to the [generic aggregation components](docs/generic-aggregation-components.md) document
for the list of available generic aggregation components.
## Create a new aggregation pipeline example ##
Generic aggregation components make it easy to build new aggregation pipelines for different Monasca
metrics.
The [create a new aggregation pipeline](docs/create-new-aggregation-pipeline.md) example shows how to
write *pre_transform_specs* and *transform_specs* to build an aggregation pipeline for a new set
of Monasca metrics, while leveraging the existing set of generic aggregation components.
## Original proposal and blueprint ##
Original proposal:
[Monasca/Transform-proposal](https://wiki.openstack.org/wiki/Monasca/Transform-proposal)
Blueprint: [monasca-transform
blueprint](https://blueprints.launchpad.net/monasca/+spec/monasca-transform)


@ -0,0 +1,336 @@
Team and repository tags
========================
[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html)
<!-- Change things from this point on -->
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
- [Create a new aggregation pipeline](#create-a-new-aggregation-pipeline)
- [Using existing generic aggregation components](#using-existing-generic-aggregation-components)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
# Create a new aggregation pipeline
Monasca Transform allows you to create a new aggregation pipeline by creating a *pre_transform_spec*
and *transform_spec* for any set of Monasca metrics. This page walks through the steps to create a new
aggregation pipeline and test the pipeline in your DevStack environment.
A prerequisite for following the steps on this page is that you have already created a DevStack
development environment for Monasca Transform, following the instructions in
[devstack/README.md](devstack/README.md).
## Using existing generic aggregation components ##
Most use cases will fall into this category: you should be able to create a new
aggregation for a new set of metrics using the existing set of generic aggregation components.
Let's consider a use case where we want to find out
* Maximum time monasca-agent takes to submit metrics over a period of an hour across all hosts
* Maximum time monasca-agent takes to submit metrics over a period of an hour per host
We know that monasca-agent on each host generates a small number of
[monasca-agent metrics](https://github.com/openstack/monasca-agent/blob/master/docs/Plugins.md).
The metric we are interested in is
* **"monasca.collection_time_sec"**: Amount of time that the collector took for this collection run
**Steps:**
* **Step 1**: Identify the monasca metric to be aggregated from the Kafka topic
```
/opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics | grep "monasca.collection_time_sec"
{"metric":{"timestamp":1523323485360.6650390625,"name":"monasca.collection_time_sec",
"dimensions":{"hostname":"devstack","component":"monasca-agent",
"service":"monitoring"},"value":0.0340659618, "value_meta":null},
"meta":{"region":"RegionOne","tenantId":"d6bece1bbeff47c1b8734cd4e544dc02"},
"creation_time":1523323489}
```
Note: "hostname" is available as a dimension, which we will use to find maximum collection time for each host.
* **Step 2**: Create a **pre_transform_spec**
"pre_transform_spec" drives the pre-processing of monasca metric to record store format. Look
for existing example in
"/monasca-transform-source/monasca_transform/data_driven_specs/pre_transform_specs/pre_transform_specs.json"
**pre_transform_spec**
```
{
"event_processing_params":{
"set_default_zone_to":"1",
"set_default_geolocation_to":"1",
"set_default_region_to":"W"
},
"event_type":"monasca.collection_time_sec", <-- EDITED
"metric_id_list":["monasca_collection_host"], <-- EDITED
"required_raw_fields_list":["creation_time", "metric.dimensions.hostname"], <--EDITED
"service_id":"host_metrics"
}
```
Let's look at all the fields that were edited (marked as `<-- EDITED` above):
**event_type**: set to "monasca.collection_time_sec". These are the metrics we want to
transform/aggregate.
**metric_id_list**: set to ["monasca_collection_host"]. This is a transformation spec
identifier. During pre-processing, the record generator generates additional "record_store" data for
each item in this list. (To be renamed to transform_spec_list)
**required_raw_fields_list**: set to ["creation_time", "metric.dimensions.hostname"].
This lists the fields in the incoming metrics that are required. During validation, pre-processing
eliminates metrics that are missing any of the required fields (see the illustrative sketch after
this step).
**service_id**: set to "host_metrics".
This identifies the source service these metrics belong to. (To be removed)
**Note:** "metric_id" is a misnomer; it is not really a metric identifier but rather an identifier
for a transformation spec. This will be changed to transform_spec_id in the future. Also,
"service_id" should be set by the source that is generating the metric. This will be removed in
the future. (Please see Story [2001815](https://storyboard.openstack.org/#!/story/2001815))
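The following hypothetical helper illustrates how the dotted-notation fields in
"required_raw_fields_list" could be used to validate an incoming metric during pre-processing;
it is a sketch for illustration only, not the actual monasca-transform validation code.
```
from functools import reduce

def get_dotted_field(metric, dotted_path):
    """Resolve a dotted path such as "metric.dimensions.hostname"
    against an incoming metric envelope (a nested dict)."""
    return reduce(lambda value, key: value.get(key) if isinstance(value, dict) else None,
                  dotted_path.split("."), metric)

def is_valid(metric, required_raw_fields_list):
    """Return True only if every required field is present and non-empty."""
    return all(get_dotted_field(metric, field) not in (None, "")
               for field in required_raw_fields_list)

# example: the monasca.collection_time_sec metric from Step 1
envelope = {
    "metric": {"name": "monasca.collection_time_sec",
               "dimensions": {"hostname": "devstack"}},
    "creation_time": 1523323489,
}
print(is_valid(envelope, ["creation_time", "metric.dimensions.hostname"]))  # True
```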
* **Step 3**: Create a "transform_spec" to find maximum metric value for each host
"transform_spec" drives the aggregation of record store data created during pre-processing
to final aggregated metric. Look for existing example in
"/monasca-transform-source/monasca_transform/data_driven_specs/transform_specs/transform_specs.json"
**transform_spec**
```
{
"aggregation_params_map":{
"aggregation_pipeline":{
"source":"streaming",
"usage":"fetch_quantity", <-- EDITED
"setters":["set_aggregated_metric_name","set_aggregated_period"], <-- EDITED
"insert":["insert_data_pre_hourly"] <-- EDITED
},
"aggregated_metric_name":"monasca.collection_time_sec_host_agg", <-- EDITED
"aggregation_period":"hourly", <-- EDITED
"aggregation_group_by_list": ["host"],
"usage_fetch_operation": "max", <-- EDITED
"filter_by_list": [],
"dimension_list":["aggregation_period","host"], <-- EDITED
"pre_hourly_operation":"max",
"pre_hourly_group_by_list":["default"]},
"metric_group":"monasca_collection_host", <-- EDITED
"metric_id":"monasca_collection_host" <-- EDITED
}
```
Let's look at all the fields that were edited (marked as `<-- EDITED` above):
aggregation pipeline fields
* **usage**: set to "fetch_quantity", to use the "fetch_quantity" generic aggregation component. This
component takes "aggregation_group_by_list", "usage_fetch_operation" and "filter_by_list" as
parameters.
* **aggregation_group_by_list** set to ["host"], since we want to find the monasca-agent
collection time for each host.
* **usage_fetch_operation** set to "max", since we want to find the maximum value of the
monasca-agent collection time.
* **filter_by_list** set to [], since we don't want to filter out/ignore any metrics (based on,
say, a particular host or set of hosts).
* **setters**: set to ["set_aggregated_metric_name","set_aggregated_period"]. These components set
the aggregated metric name and aggregation period in the final aggregated metric.
* **set_aggregated_metric_name** sets the final aggregated metric name. This setter component takes
"aggregated_metric_name" as a parameter.
* **aggregated_metric_name**: set to "monasca.collection_time_sec_host_agg"
* **set_aggregated_period** sets the final aggregated metric period. This setter component takes
"aggregation_period" as a parameter.
* **aggregation_period**: set to "hourly"
* **insert**: set to ["insert_data_pre_hourly"]. These components are responsible for
transforming instance usage data records to the final metric format and writing the data to a Kafka
topic.
* **insert_data_pre_hourly** writes the data to the "metrics_pre_hourly" Kafka topic, which gets
processed by the pre hourly processor every hour.
pre hourly processor fields
* **pre_hourly_operation** set to "max", to find the hourly maximum value from the records that
were written to the "metrics_pre_hourly" topic.
* **pre_hourly_group_by_list** set to ["default"]
transformation spec identifier fields
* **metric_group** set to "monasca_collection_host". Group identifier for this transformation
spec.
* **metric_id** set to "monasca_collection_host". Identifier for this transformation spec.
**Note:** "metric_group" and "metric_id" are misnomers; they are not really metric identifiers but
rather identifiers for the transformation spec. These will be changed to "transform_group" and
"transform_spec_id" in the future. (Please see Story
[2001815](https://storyboard.openstack.org/#!/story/2001815))
* **Step 4**: Create a "transform_spec" to find maximum metric value across all hosts
Now let's create another transformation spec to find maximum metric value across all hosts.
**transform_spec**
```
{
"aggregation_params_map":{
"aggregation_pipeline":{
"source":"streaming",
"usage":"fetch_quantity", <-- EDITED
"setters":["set_aggregated_metric_name","set_aggregated_period"], <-- EDITED
"insert":["insert_data_pre_hourly"] <-- EDITED
},
"aggregated_metric_name":"monasca.collection_time_sec_all_agg", <-- EDITED
"aggregation_period":"hourly", <-- EDITED
"aggregation_group_by_list": [],
"usage_fetch_operation": "max", <-- EDITED
"filter_by_list": [],
"dimension_list":["aggregation_period"], <-- EDITED
"pre_hourly_operation":"max",
"pre_hourly_group_by_list":["default"]},
"metric_group":"monasca_collection_all", <-- EDITED
"metric_id":"monasca_collection_all" <-- EDITED
}
```
The transformation spec above is almost identical to the transformation spec created in **Step 3**,
with a few changes:
**aggregation_group_by_list** is set to [], i.e. an empty list, since we want to find the maximum
value across all hosts (considering all the incoming metric data).
**aggregated_metric_name** is set to "monasca.collection_time_sec_all_agg".
**metric_group** is set to "monasca_collection_all", since we need a new transformation spec
group identifier.
**metric_id** is set to "monasca_collection_all", since we need a new transformation spec
identifier.
* **Step 5**: Update "pre_transform_spec" with new transformation spec identifier
In **Step 4** we created a new transformation spec with a new "metric_id", namely
"monasca_collection_all". We now have to update the "pre_transform_spec" that we
created in **Step 2** with the new "metric_id" by adding it to the "metric_id_list".
**pre_transform_spec**
```
{
"event_processing_params":{
"set_default_zone_to":"1",
"set_default_geolocation_to":"1",
"set_default_region_to":"W"
},
"event_type":"monasca.collection_time_sec",
"metric_id_list":["monasca_collection_host", "monasca_collection_all"], <-- EDITED
"required_raw_fields_list":["creation_time", "metric.dimensions.hostname"],
"service_id":"host_metrics"
}
```
Thus we were able to add an additional transformation/aggregation pipeline for the same incoming
Monasca metric very easily.
* **Step 6**: Update "pre_transform_spec" and "transform_spec"
* Edit
"/monasca-transform-source/monasca_transform/data_driven_specs/pre_transform_specs/pre_transform_specs.json"
and add the following line.
```
{"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},"event_type":"monasca.collection_time_sec","metric_id_list":["monasca_collection_host","monasca_collection_all"],"required_raw_fields_list":["creation_time"],"service_id":"host_metrics"}
```
**Note:** Each line does not end with a comma (the file is not one big JSON document).
* Edit
"/monasca-transform-source/monasca_transform/data_driven_specs/transform_specs/transform_specs.json"
and add the following lines.
```
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["insert_data_pre_hourly"]},"aggregated_metric_name":"monasca.collection_time_sec_host_agg","aggregation_period":"hourly","aggregation_group_by_list":["host"],"usage_fetch_operation":"max","filter_by_list":[],"dimension_list":["aggregation_period","host"],"pre_hourly_operation":"max","pre_hourly_group_by_list":["default"]},"metric_group":"monasca_collection_host","metric_id":"monasca_collection_host"}
{"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["insert_data_pre_hourly"]},"aggregated_metric_name":"monasca.collection_time_sec_all_agg","aggregation_period":"hourly","aggregation_group_by_list":[],"usage_fetch_operation":"max","filter_by_list":[],"dimension_list":["aggregation_period"],"pre_hourly_operation":"max","pre_hourly_group_by_list":["default"]},"metric_group":"monasca_collection_all","metric_id":"monasca_collection_all"}
```
* Run "refresh_monasca_transform.sh" script as documented in devstack
[README](devstack/README.md) to refresh the specs in the database.
```
vagrant@devstack:~$ cd /opt/stack/monasca-transform
vagrant@devstack:/opt/stack/monasca-transform$ tools/vagrant/refresh_monasca_transform.sh
```
If successful, you should see this message.
```
***********************************************
* *
* SUCCESS!! refresh monasca transform done. *
* *
***********************************************
```
* **Step 7**: Verifying results
To verify that new aggregated metrics are being produced, you can look at the "metrics_pre_hourly"
topic in Kafka. By default, monasca-transform fires off a batch every 10 minutes, so you should
see metrics in the intermediate "instance_usage" data format being published to that topic every 10
minutes.
```
/opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics_pre_hourly
{"usage_hour":"06","geolocation":"NA","record_count":40.0,"app":"NA","deployment":"NA","resource_uuid":"NA",
"pod_name":"NA","usage_minute":"NA","service_group":"NA","lastrecord_timestamp_string":"2018-04-1106:29:49",
"user_id":"NA","zone":"NA","namespace":"NA","usage_date":"2018-04-11","daemon_set":"NA","processing_meta":{
"event_type":"NA","metric_id":"monasca_collection_all"},
"firstrecord_timestamp_unix":1523427604.208577,"project_id":"NA","lastrecord_timestamp_unix":1523428189.718174,
"aggregation_period":"hourly","host":"NA","container_name":"NA","interface":"NA",
"aggregated_metric_name":"monasca.collection_time_sec_all_agg","tenant_id":"NA","region":"NA",
"firstrecord_timestamp_string":"2018-04-11 06:20:04","service_id":"NA","quantity":0.0687000751}
{"usage_hour":"06","geolocation":"NA","record_count":40.0,"app":"NA","deployment":"NA","resource_uuid":"NA",
"pod_name":"NA","usage_minute":"NA","service_group":"NA","lastrecord_timestamp_string":"2018-04-11 06:29:49",
"user_id":"NA","zone":"NA","namespace":"NA","usage_date":"2018-04-11","daemon_set":"NA","processing_meta":{
"event_type":"NA","metric_id":"monasca_collection_host"},"firstrecord_timestamp_unix":1523427604.208577,
"project_id":"NA","lastrecord_timestamp_unix":1523428189.718174,"aggregation_period":"hourly",
"host":"devstack","container_name":"NA","interface":"NA",
"aggregated_metric_name":"monasca.collection_time_sec_host_agg","tenant_id":"NA","region":"NA",
"firstrecord_timestamp_string":"2018-04-11 06:20:04","service_id":"NA","quantity":0.0687000751}
```
Similarly, to verify that final aggregated metrics are being published by the pre hourly processor,
you can look at the "metrics" topic in Kafka. By default, the pre hourly processor (which processes
metrics from the "metrics_pre_hourly" topic) runs 10 minutes past the top of the hour.
```
/opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics | grep "_agg"
{"metric":{"timestamp":1523459468616,"value_meta":{"firstrecord_timestamp_string":"2018-04-11 14:00:13",
"lastrecord_timestamp_string":"2018-04-11 14:59:46","record_count":239.0},"name":"monasca.collection_time_sec_host_agg",
"value":0.1182248592,"dimensions":{"aggregation_period":"hourly","host":"devstack"}},
"meta":{"region":"useast","tenantId":"df89c3db21954b08b0516b4b60b8baff"},"creation_time":1523459468}
{"metric":{"timestamp":1523455872740,"value_meta":{"firstrecord_timestamp_string":"2018-04-11 13:00:10",
"lastrecord_timestamp_string":"2018-04-11 13:59:58","record_count":240.0},"name":"monasca.collection_time_sec_all_agg",
"value":0.0898442268,"dimensions":{"aggregation_period":"hourly"}},
"meta":{"region":"useast","tenantId":"df89c3db21954b08b0516b4b60b8baff"},"creation_time":1523455872}
```
As you can see, monasca-transform created two new aggregated metrics named
"monasca.collection_time_sec_host_agg" and "monasca.collection_time_sec_all_agg". The "value_meta"
section has three fields: "firstrecord_timestamp_string", "lastrecord_timestamp_string" and
"record_count". These fields are for informational purposes only; they show the timestamp of the
first metric, the timestamp of the last metric and the number of metrics that went into the
calculation of the aggregated metric.

docs/data_formats.md

@ -0,0 +1,129 @@
Team and repository tags
========================
[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html)
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
- [Monasca Transform Data Formats](#monasca-transform-data-formats)
- [Record Store Data Format](#record-store-data-format)
- [Instance Usage Data Format](#instance-usage-data-format)
- [References](#references)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
# Monasca Transform Data Formats
There are two data formats used by monasca-transform. The following sections describe the schema
(Spark DataFrame [1] schema) for the two formats.
Note: These are internal formats used by Monasca Transform when aggregating data. If you are a user
who wants to create a new aggregation pipeline using the existing framework, you don't need to know or
care about these two formats.
As a developer, if you want to write new aggregation components, you might have to know how to
enhance the record store data format or instance usage data format with additional fields that you
may need, or how to write new aggregation components that aggregate data from the additional fields.
**Source Metric**
A Monasca metric is transformed into the `record_store` data format and
later transformed/aggregated using re-usable generic aggregation components to derive the
`instance_usage` data format.
Example of a Monasca metric:
```
{"metric":{"timestamp":1523323485360.6650390625,
"name":"monasca.collection_time_sec",
"dimensions":{"hostname":"devstack",
"component":"monasca-agent",
"service":"monitoring"},
"value":0.0340659618,
"value_meta":null},
"meta":{"region":"RegionOne","tenantId":"d6bece1bbeff47c1b8734cd4e544dc02"},
"creation_time":1523323489}
```
## Record Store Data Format ##
Data Frame Schema:
| Column Name | Column Data Type | Description |
| ----------- | ---------------- | ----------- |
| event_quantity | `pyspark.sql.types.DoubleType` | mapped to `metric.value`|
| event_timestamp_unix | `pyspark.sql.types.DoubleType` | calculated as `metric.timestamp`/`1000` from source metric|
| event_timestamp_string | `pyspark.sql.types.StringType` | mapped to `metric.timestamp` from the source metric|
| event_type | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `metric.name` from source metric|
| event_quantity_name | `pyspark.sql.types.StringType` | mapped to `metric.name` from source metric|
| event_status | `pyspark.sql.types.StringType` | placeholder for the future. Currently mapped to `metric.dimensions.state` from the source metric |
| event_version | `pyspark.sql.types.StringType` | placeholder for the future. Set to "1.0" |
| record_type | `pyspark.sql.types.StringType` | placeholder for the future. Set to "metrics" |
| resource_uuid | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.instanceId` or `metric.dimensions.resource_id` from source metric |
| tenant_id | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.tenant_id` or `metric.dimensions.tenantid` or `metric.dimensions.project_id` |
| user_id | `pyspark.sql.types.StringType` | mapped to `meta.userId` |
| region | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `meta.region`, defaults to `event_processing_params.set_default_region_to` (`pre_transform_spec`) |
| zone | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `meta.zone`, defaults to `event_processing_params.set_default_zone_to` (`pre_transform_spec`) |
| host | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.hostname` or `metric.value_meta.host` |
| project_id | `pyspark.sql.types.StringType` | mapped to metric tenant_id |
| service_group | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `service_id` in `pre_transform_spec` |
| service_id | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `service_id` in `pre_transform_spec` |
| event_date | `pyspark.sql.types.StringType` | "YYYY-mm-dd". Extracted from `metric.timestamp` |
| event_hour | `pyspark.sql.types.StringType` | "HH". Extracted from `metric.timestamp` |
| event_minute | `pyspark.sql.types.StringType` | "MM". Extracted from `metric.timestamp` |
| event_second | `pyspark.sql.types.StringType` | "SS". Extracted from `metric.timestamp` |
| metric_group | `pyspark.sql.types.StringType` | identifier for transform spec group |
| metric_id | `pyspark.sql.types.StringType` | identifier for transform spec |
| namespace | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.namespace` |
| pod_name | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.pod_name` |
| app | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.app` |
| container_name | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.container_name`|
| interface | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.interface` |
| deployment | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.deployment` |
| daemon_set | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.daemon_set` |
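For reference, a record store DataFrame schema along the lines of the table above could be declared with PySpark types as in the sketch below. This is only an illustration (the variable name and the nullable flags are assumptions); the authoritative schema is defined in the monasca-transform source.
```
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# illustrative record store schema, columns as listed in the table above
record_store_schema = StructType([
    StructField("event_quantity", DoubleType(), True),
    StructField("event_timestamp_unix", DoubleType(), True),
    StructField("event_timestamp_string", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("event_quantity_name", StringType(), True),
    StructField("event_status", StringType(), True),
    StructField("event_version", StringType(), True),
    StructField("record_type", StringType(), True),
    StructField("resource_uuid", StringType(), True),
    StructField("tenant_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("region", StringType(), True),
    StructField("zone", StringType(), True),
    StructField("host", StringType(), True),
    StructField("project_id", StringType(), True),
    StructField("service_group", StringType(), True),
    StructField("service_id", StringType(), True),
    StructField("event_date", StringType(), True),
    StructField("event_hour", StringType(), True),
    StructField("event_minute", StringType(), True),
    StructField("event_second", StringType(), True),
    StructField("metric_group", StringType(), True),
    StructField("metric_id", StringType(), True),
    StructField("namespace", StringType(), True),
    StructField("pod_name", StringType(), True),
    StructField("app", StringType(), True),
    StructField("container_name", StringType(), True),
    StructField("interface", StringType(), True),
    StructField("deployment", StringType(), True),
    StructField("daemon_set", StringType(), True),
])
```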
## Instance Usage Data Format ##
Data Frame Schema:
| Column Name | Column Data Type | Description |
| ----------- | ---------------- | ----------- |
| tenant_id | `pyspark.sql.types.StringType` | project_id, defaults to `NA` |
| user_id | `pyspark.sql.types.StringType` | user_id, defaults to `NA`|
| resource_uuid | `pyspark.sql.types.StringType` | resource_id, defaults to `NA`|
| geolocation | `pyspark.sql.types.StringType` | placeholder for future, defaults to `NA`|
| region | `pyspark.sql.types.StringType` | placeholder for future, defaults to `NA`|
| zone | `pyspark.sql.types.StringType` | placeholder for future, defaults to `NA`|
| host | `pyspark.sql.types.StringType` | compute hostname, defaults to `NA`|
| project_id | `pyspark.sql.types.StringType` | project_id, defaults to `NA`|
| aggregated_metric_name | `pyspark.sql.types.StringType` | aggregated metric name, defaults to `NA`|
| firstrecord_timestamp_string | `pyspark.sql.types.StringType` | timestamp of the first metric used to derive this aggregated metric|
| lastrecord_timestamp_string | `pyspark.sql.types.StringType` | timestamp of the last metric used to derive this aggregated metric|
| service_group | `pyspark.sql.types.StringType` | placeholder for the future, defaults to `NA`|
| service_id | `pyspark.sql.types.StringType` | placeholder for the future, defaults to `NA`|
| usage_date | `pyspark.sql.types.StringType` | "YYYY-mm-dd" date|
| usage_hour | `pyspark.sql.types.StringType` | "HH" hour|
| usage_minute | `pyspark.sql.types.StringType` | "MM" minute|
| aggregation_period | `pyspark.sql.types.StringType` | "hourly" or "minutely" |
| namespace | `pyspark.sql.types.StringType` | |
| pod_name | `pyspark.sql.types.StringType` | |
| app | `pyspark.sql.types.StringType` | |
| container_name | `pyspark.sql.types.StringType` | |
| interface | `pyspark.sql.types.StringType` | |
| deployment | `pyspark.sql.types.StringType` | |
| daemon_set | `pyspark.sql.types.StringType` | |
| firstrecord_timestamp_unix | `pyspark.sql.types.DoubleType` | epoch timestamp of the first metric used to derive this aggregated metric |
| lastrecord_timestamp_unix | `pyspark.sql.types.DoubleType` | epoch timestamp of the last metric used to derive this aggregated metric |
| quantity | `pyspark.sql.types.DoubleType` | aggregated metric quantity |
| record_count | `pyspark.sql.types.DoubleType` | number of source metrics that were used to derive this aggregated metric. For informational purposes only. |
| processing_meta | `pyspark.sql.types.MapType(pyspark.sql.types.StringType, pyspark.sql.types.StringType, True)` | Key-Value pairs to store additional information, to aid processing|
## References
[1] [Spark SQL, DataFrames and Datasets
Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)
[2] [Spark
DataTypes](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.types.DataType)


@ -0,0 +1,632 @@
Team and repository tags
========================
[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html)
<!-- Change things from this point on -->
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
- [Monasca Transform Generic Aggregation Components](#monasca-transform-generic-aggregation-components)
- [Introduction](#introduction)
- [1: Conversion of incoming metrics to record store data format](#1-conversion-of-incoming-metrics-to-record-store-data-format)
- [Pre Transform Spec](#pre-transform-spec)
- [2: Data aggregation using generic aggregation components](#2-data-aggregation-using-generic-aggregation-components)
- [Transform Specs](#transform-specs)
- [aggregation_params_map](#aggregation_params_map)
- [aggregation_pipeline](#aggregation_pipeline)
- [Other parameters](#other-parameters)
- [metric_group and metric_id](#metric_group-and-metric_id)
- [Generic Aggregation Components](#generic-aggregation-components)
- [Usage Components](#usage-components)
- [fetch_quantity](#fetch_quantity)
- [fetch_quantity_util](#fetch_quantity_util)
- [calculate_rate](#calculate_rate)
- [Setter Components](#setter-components)
- [set_aggregated_metric_name](#set_aggregated_metric_name)
- [set_aggregated_period](#set_aggregated_period)
- [rollup_quantity](#rollup_quantity)
- [Insert Components](#insert-components)
- [insert_data](#insert_data)
- [insert_data_pre_hourly](#insert_data_pre_hourly)
- [Processors](#processors)
- [pre_hourly_processor](#pre_hourly_processor)
- [Putting it all together](#putting-it-all-together)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
# Monasca Transform Generic Aggregation Components
# Introduction
Monasca Transform uses the standard ETL (Extract-Transform-Load) design pattern to aggregate Monasca
metrics and uses a data/configuration driven mechanism to drive processing. It accomplishes
data aggregation in two distinct steps, each driven by an external configuration specification,
namely *pre_transform_spec* and *transform_spec*.
## 1: Conversion of incoming metrics to record store data format ##
In the first step, the incoming metrics are converted into a canonical data format called record
store data, using *pre_transform_spec*.
This logical processing data flow is explained in more detail in [Monasca/Transform wiki: Logical
processing data flow section: Conversion to record store
format](https://wiki.openstack.org/wiki/Monasca/Transform#Logical_processing_data_flow) and includes
the following operations:
* identifying metrics that are required (in other words, filtering out unwanted metrics)
* validation and extraction of essential data in the metric
* generating multiple records for an incoming metric if it is to be aggregated in multiple ways
(see the sketch after this list), and finally
* conversion of the incoming metrics to the canonical record store data format. Please refer to the
record store section in [Data Formats](data_formats.md) for more information on the record store
format.
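The following hypothetical sketch illustrates the record generation step mentioned above: one record
store record is produced per entry in the matching *pre_transform_spec*'s `metric_id_list`, so that
the same incoming metric can be aggregated in multiple ways. The field subset and helper name are
made up for illustration; this is not the actual implementation.
```
def generate_record_store_records(metric, pre_transform_spec):
    """Produce one record store record per metric_id in the spec's
    metric_id_list, so the same metric can be aggregated in multiple ways."""
    records = []
    for metric_id in pre_transform_spec["metric_id_list"]:
        records.append({
            "event_type": metric["metric"]["name"],
            "event_quantity": metric["metric"]["value"],
            "host": metric["metric"]["dimensions"].get("hostname"),
            "metric_id": metric_id,
        })
    return records
```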
### Pre Transform Spec ###
Example *pre_transform_spec* for a metric:
```
{
"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},
"event_type":"cpu.total_logical_cores",
"metric_id_list":["cpu_total_all","cpu_total_host","cpu_util_all","cpu_util_host"],
"required_raw_fields_list":["creation_time"],
"service_id":"host_metrics"
}
```
*List of fields*
| field name | values | description |
| ---------- | ------ | ----------- |
| event_processing_params | Set default field values `set_default_zone_to`, `set_default_geolocation_to`, `set_default_region_to`| Set default values for certain fields in the record store data |
| event_type | Name of the metric | identifies metric that needs to be aggregated |
| metric_id_list | List of `metric_id`'s | List of identifiers, should match `metric_id` in transform specs. This is used by record generation step to generate multiple records if this metric is to be aggregated in multiple ways|
| required_raw_fields_list | List of `field` | List of fields (use JSON dotted notation) that are required in the incoming metric, used for validating incoming metric |
| service_id | service identifier | Identifies the service to which this metric belongs. Note: this field is not yet used |
## 2: Data aggregation using generic aggregation components ##
In the second step, the canonical record store data is aggregated using *transform_spec*. Each
*transform_spec* defines a series of generic aggregation components, which are specified in the
`aggregation_params_map.aggregation_pipeline` section. (See the *transform_spec* example below.)
Any parameters used by the generic aggregation components are also specified in the
`aggregation_params_map` section (see *Other parameters*, e.g. `aggregated_metric_name`, `aggregation_period`,
`aggregation_group_by_list`, etc. in the *transform_spec* example below).
### Transform Specs ###
Example *transform_spec* for a metric:
```
{"aggregation_params_map":{
"aggregation_pipeline":{
"source":"streaming",
"usage":"fetch_quantity",
"setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],
"insert":["prepare_data","insert_data_pre_hourly"]
},
"aggregated_metric_name":"cpu.total_logical_cores_agg",
"aggregation_period":"hourly",
"aggregation_group_by_list": ["host", "metric_id", "tenant_id"],
"usage_fetch_operation": "avg",
"filter_by_list": [],
"setter_rollup_group_by_list": [],
"setter_rollup_operation": "sum",
"dimension_list":["aggregation_period","host","project_id"],
"pre_hourly_operation":"avg",
"pre_hourly_group_by_list":["default"]
},
"metric_group":"cpu_total_all",
"metric_id":"cpu_total_all"
}
```
#### aggregation_params_map ####
This section specifies the *aggregation_pipeline* and *Other parameters* (used by the generic aggregation
components in the *aggregation_pipeline*).
##### aggregation_pipeline #####
Specifies generic aggregation components that should be used to process incoming metrics.
Note: generic aggregation components are re-usable and can be used to build different aggregation
pipelines as required.
*List of fields*
| field name | values | description |
| ---------- | ------ | ----------- |
| source | ```streaming``` | source is ```streaming```. In the future this can be used to specify a component which can fetch data directly from monasca datastore |
| usage | ```fetch_quantity```, ```fetch_quantity_util```, ```calculate_rate``` | [Usage Components](https://github.com/openstack/monasca-transform/tree/master/monasca_transform/component/usage)|
| setters | ```pre_hourly_calculate_rate```, ```rollup_quantity```, ```set_aggregated_metric_name```, ```set_aggregated_period``` | [Setter Components](https://github.com/openstack/monasca-transform/tree/master/monasca_transform/component/setter)|
| insert | ```insert_data```, ```insert_data_pre_hourly``` | [Insert Components](https://github.com/openstack/monasca-transform/tree/master/monasca_transform/component/insert)|
##### Other parameters #####
Specifies parameters that generic aggregation components use to process and aggregate data.
*List of Other parameters*
| Parameter Name | Values | Description | Used by |
| -------------- | ------ | ----------- | ------- |
| aggregated_metric_name| e.g. "cpu.total_logical_cores_agg" | Name of the aggregated metric | [set_aggregated_metric_name](#set_aggregated_metric_name) |
| aggregation_period |"hourly", "minutely" or "secondly" | Period over which to aggregate data. | [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate), [set_aggregated_period](#set_aggregated_period), [rollup_quantity](#rollup_quantity) |
| aggregation_group_by_list | e.g. "project_id", "hostname" | Group `record_store` data with these columns | [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate) |
| usage_fetch_operation | e.g "sum" | After the data is grouped by `aggregation_group_by_list`, perform this operation to find the aggregated metric value | [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate) |
| filter_by_list | Filter regex | Filter data using regex on a `record_store` column value| [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate) |
| setter_rollup_group_by_list | e.g. "project_id" | Group by these set of fields | [rollup_quantity](#rollup_quantity) |
| setter_rollup_operation | e.g. "avg" | After data is grouped by `setter_rollup_group_by_list`, perform this operation to find the aggregated metric value | [rollup_quantity](#rollup_quantity) |
| dimension_list | e.g. "aggregation_period", "host", "project_id" | List of fields which specify dimensions in aggregated metric | [insert_data](#insert_data), [insert_data_pre_hourly](#insert_data_pre_hourly)|
| pre_hourly_group_by_list | e.g. "default" | List of `instance usage data` fields to do a group by operation to aggregate data | [pre_hourly_processor](#pre_hourly_processor) |
| pre_hourly_operation | e.g. "avg" | When aggregating data published to `metrics_pre_hourly` every hour, perform this operation to find hourly aggregated metric value | [pre_hourly_processor](#pre_hourly_processor) |
### metric_group and metric_id ###
Specifies a metric or list of metrics from the record store data, which will be processed by this
*transform_spec*. Note: This can be a single metric or a group of metrics that will be combined to
produce the final aggregated metric.
*List of fields*
| field name | values | description |
| ---------- | ------ | ----------- |
| metric_group | unique transform spec group identifier | group identifier for this transform spec e.g. "cpu_total_all" |
| metric_id | unique transform spec identifier | identifier for this transform spec e.g. "cpu_total_all" |
**Note:** "metric_id" is a misnomer, it is not really a metric group/or metric identifier but rather
identifier for transformation spec. This will be changed to "transform_spec_id" in the future.
## Generic Aggregation Components ##
*List of Generic Aggregation Components*
### Usage Components ###
All usage components implement a method
```
def usage(transform_context, record_store_df):
..
..
return instance_usage_df
```
#### fetch_quantity ####
This component groups record store records by `aggregation_group_by_list`, sorts within each
group by the timestamp field, and finds the usage based on `usage_fetch_operation`. Optionally this
component also takes `filter_by_list` to include or exclude certain records from the usage
calculation. (A minimal PySpark sketch of this behavior appears at the end of this section.)
*Other parameters*
* **aggregation_group_by_list**
List of fields to group by.
Possible values: any set of fields in record store data.
Example:
```
"aggregation_group_by_list": ["tenant_id"]
```
* **usage_fetch_operation**
Operation to be performed on grouped data set.
*Possible values:* "sum", "max", "min", "avg", "latest", "oldest"
* **aggregation_period**
Period to aggregate by.
*Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.
Example:
```
"aggregation_period": "hourly"
```
* **filter_by_list**
Filter (include or exclude) record store data as specified.
Example:
```
filter_by_list": "[{"field_to_filter": "hostname",
"filter_expression": "comp-(\d)+",
"filter_operation": "include"}]
```
OR
```
filter_by_list": "[{"field_to_filter": "hostname",
"filter_expression": "controller-(\d)+",
"filter_operation": "exclude"}]
```
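For illustration, with `aggregation_group_by_list` set to `["tenant_id"]` and `usage_fetch_operation` set to `"max"`, the behavior of this component is conceptually similar to the PySpark sketch below. This is not the actual component code; the application name and sample rows are made up.
```
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("fetch_quantity_sketch").getOrCreate()

# a couple of record store rows for a single group (tenant-1)
record_store_df = spark.createDataFrame(
    [("tenant-1", 0.034, 1523323489.0),
     ("tenant-1", 0.051, 1523324089.0)],
    ["tenant_id", "event_quantity", "event_timestamp_unix"])

# group the record store records and fetch the usage quantity per group
instance_usage_df = (record_store_df
                     .groupBy("tenant_id")
                     .agg(F.max("event_quantity").alias("quantity")))
instance_usage_df.show()
```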
#### fetch_quantity_util ####
This component finds the utilized quantity based on *total_quantity* and *idle_perc* using the
following calculation (a short worked example appears at the end of this section):
```
utilized_quantity = (100 - idle_perc) * total_quantity / 100
```
where,
* **total_quantity** data, identified by `usage_fetch_util_quantity_event_type` parameter and
* **idle_perc** data, identified by `usage_fetch_util_idle_perc_event_type` parameter
This component initially groups record store records by `aggregation_group_by_list` and
`event_type`, sorts within each group by the timestamp field, and calculates the `total_quantity` and
`idle_perc` values based on `usage_fetch_operation`. `utilized_quantity` is then calculated
using the formula given above.
*Other parameters*
* **aggregation_group_by_list**
List of fields to group by.
Possible values: any set of fields in record store data.
Example:
```
"aggregation_group_by_list": ["tenant_id"]
```
* **usage_fetch_operation**
Operation to be performed on grouped data set
*Possible values:* "sum", "max", "min", "avg", "latest", "oldest"
* **aggregation_period**
Period to aggregate by.
*Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.
Example:
```
"aggregation_period": "hourly"
```
* **filter_by_list**
Filter (include or exclude) record store data as specified
Example:
```
filter_by_list": "[{"field_to_filter": "hostname",
"filter_expression": "comp-(\d)+",
"filter_operation": "include"}]
```
OR
```
filter_by_list": "[{"field_to_filter": "hostname",
"filter_expression": "controller-(\d)+",
"filter_operation": "exclude"}]
```
* **usage_fetch_util_quantity_event_type**
event type (metric name) to identify data which will be used to calculate `total_quantity`
*Possible values:* metric name
Example:
```
"usage_fetch_util_quantity_event_type": "cpu.total_logical_cores"
```
* **usage_fetch_util_idle_perc_event_type**
event type (metric name) to identify data which will be used to calculate `idle_perc`
*Possible values:* metric name
Example:
```
"usage_fetch_util_idle_perc_event_type": "cpu.idle_perc"
```
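A quick worked example of this calculation (plain Python, with made-up numbers):
```
def utilized_quantity(total_quantity, idle_perc):
    # utilized_quantity = (100 - idle_perc) * total_quantity / 100
    return (100 - idle_perc) * total_quantity / 100

# e.g. 8 total logical cores at 75 percent idle -> 2.0 utilized cores
print(utilized_quantity(total_quantity=8, idle_perc=75))  # 2.0
```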
#### calculate_rate ####
This component finds the rate of change of quantity (in percent) over a time period using the
following calculation (a short worked example appears at the end of this section):
```
rate_of_change (in percent) = ((oldest_quantity - latest_quantity)/oldest_quantity) * 100
```
where,
* **oldest_quantity**: oldest (or earliest) `average` quantity if there are multiple quantities in a
group for a given time period.
* **latest_quantity**: latest `average` quantity if there are multiple quantities in a group
for a given time period.
*Other parameters*
* **aggregation_group_by_list**
List of fields to group by.
Possible values: any set of fields in record store data.
Example:
```
"aggregation_group_by_list": ["tenant_id"]
```
* **usage_fetch_operation**
Operation to be performed on grouped data set
*Possible values:* "sum", "max", "min", "avg", "latest", "oldest"
* **aggregation_period**
Period to aggregate by.
*Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.
Example:
```
"aggregation_period": "hourly"
```
* **filter_by_list**
Filter (include or exclude) record store data as specified
Example:
```
filter_by_list": "[{"field_to_filter": "hostname",
"filter_expression": "comp-(\d)+",
"filter_operation": "include"}]
```
OR
```
filter_by_list": "[{"field_to_filter": "hostname",
"filter_expression": "controller-(\d)+",
"filter_operation": "exclude"}]
```
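A quick worked example of the rate calculation (plain Python, with made-up numbers):
```
def rate_of_change_percent(oldest_quantity, latest_quantity):
    # rate_of_change (in percent) = ((oldest - latest) / oldest) * 100
    return ((oldest_quantity - latest_quantity) / oldest_quantity) * 100

# e.g. the average quantity drops from 200 (oldest) to 150 (latest) -> 25.0 percent
print(rate_of_change_percent(oldest_quantity=200, latest_quantity=150))  # 25.0
```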
### Setter Components ###
All setter components implement a method
```
def setter(transform_context, instance_usage_df):
..
..
return instance_usage_df
```
#### set_aggregated_metric_name ####
This component sets the final aggregated metric name by setting the `aggregated_metric_name` field in
the `instance_usage` data.
*Other parameters*
* **aggregated_metric_name**
Name of the aggregated metric being generated.
*Possible values:* any aggregated metric name. Convention is to end the metric name
with "_agg".
Example:
```
"aggregated_metric_name":"cpu.total_logical_cores_agg"
```
#### set_aggregated_period ####
This component sets the final aggregated metric period by setting the `aggregation_period` field in
the `instance_usage` data.
*Other parameters*
* **aggregation_period**
Period of the aggregated metric being generated.
*Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.
Example:
```
"aggregation_period": "hourly"
```
**Note:** If you are publishing metrics to the *metrics_pre_hourly* Kafka topic using the
`insert_data_pre_hourly` component (see the *insert_data_pre_hourly* component below),
`aggregation_period` will have to be set to `hourly`, since by default all data in the
*metrics_pre_hourly* topic gets aggregated every hour by the `Pre Hourly Processor` (see the
`Processors` section below).
#### rollup_quantity ####
This component groups `instance_usage` records by `setter_rollup_group_by_list`, sorts within each
group by the timestamp field, and finds the usage based on `setter_rollup_operation`.
*Other parameters*
* **setter_rollup_group_by_list**
List of fields to group by.
Possible values: any set of fields in `instance_usage` data.
Example:
```
"setter_rollup_group_by_list": ["tenant_id"]
```
* **setter_rollup_operation**
Operation to be performed on the grouped data set.
*Possible values:* "sum", "max", "min", "avg"
Example:
```
"setter_fetch_operation": "avg"
```
* **aggregation_period**
Period to aggregate by.
*Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.
Example:
```
"aggregation_period": "hourly"
```
### Insert Components ###
All insert components implement a method
```
def insert(transform_context, instance_usage_df):
..
..
return instance_usage_df
```
#### insert_data ####
This component converts `instance_usage` data into the Monasca metric format and writes the metric to
the `metrics` topic in Kafka.
*Other parameters*
* **dimension_list**
List of fields in `instance_usage` data that should be converted to monasca metric dimensions.
*Possible values:* any fields in `instance_usage` data
Example:
```
"dimension_list":["aggregation_period","host","project_id"]
```
#### insert_data_pre_hourly ####
This component converts `instance_usage` data into the Monasca metric format and writes the metric to
the `metrics_pre_hourly` topic in Kafka.
*Other parameters*
* **dimension_list**
List of fields in `instance_usage` data that should be converted to monasca metric dimensions.
*Possible values:* any fields in `instance_usage` data
Example:
```
"dimension_list":["aggregation_period","host","project_id"]
```
## Processors ##
Processors are special components that process data from a Kafka topic at a desired time
interval. These are different from generic aggregation components since they process data from a
specific Kafka topic.
All processor components implement the following methods
```
def get_app_name(self):
    [...]
    return app_name

def is_time_to_run(self, current_time):
    if current_time > last_invoked + 1:
        return True
    else:
        return False

def run_processor(self, time):
    # do work...
```
### pre_hourly_processor ###
The Pre Hourly Processor runs every hour and aggregates `instance_usage` data published to the
`metrics_pre_hourly` topic.
By default, the Pre Hourly Processor is set to run 10 minutes after the top of the hour and processes
data from the previous hour (see the illustrative sketch at the end of this section). `instance_usage`
data is grouped by `pre_hourly_group_by_list`.
*Other parameters*
* **pre_hourly_group_by_list**
List of fields to group by.
Possible values: any set of fields in `instance_usage` data, or `default`.
Note: setting to `default` will group `instance_usage` data by `tenant_id`, `user_id`,
`resource_uuid`, `geolocation`, `region`, `zone`, `host`, `project_id`,
`aggregated_metric_name`, `aggregation_period`
Example:
```
"pre_hourly_group_by_list": ["tenant_id"]
```
OR
```
"pre_hourly_group_by_list": ["default"]
```
* **pre_hourly_operation**
Operation to be performed on grouped data set.
*Possible values:* "sum", "max", "min", "avg", "rate"
Example:
```
"pre_hourly_operation": "avg"
```
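For illustration, an `is_time_to_run` check that implements this schedule could look like the sketch below. This is a hypothetical implementation (the class name is made up), not the actual Pre Hourly Processor code.
```
import datetime

class PreHourlyProcessorSketch(object):

    def __init__(self):
        self.last_run_hour = None

    def is_time_to_run(self, current_time):
        """Return True once per hour, 10 minutes past the top of the hour."""
        this_hour = current_time.replace(minute=0, second=0, microsecond=0)
        if current_time.minute >= 10 and self.last_run_hour != this_hour:
            self.last_run_hour = this_hour
            return True
        return False

processor = PreHourlyProcessorSketch()
print(processor.is_time_to_run(datetime.datetime(2018, 4, 11, 14, 10)))  # True
print(processor.is_time_to_run(datetime.datetime(2018, 4, 11, 14, 20)))  # False
```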
# Putting it all together
Please refer to the [Create a new aggregation pipeline](create-new-aggregation-pipeline.md) document
to create a new aggregation pipeline.