diff --git a/README.md b/README.md
index 66154a3..5de1d51 100644
--- a/README.md
+++ b/README.md
@@ -3,10 +3,85 @@ Team and repository tags

[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html)

- [Monasca Transform](#monasca-transform)
  - [Use Cases handled by Monasca Transform](#use-cases-handled-by-monasca-transform)
  - [Operation](#operation)
  - [Architecture](#architecture)
  - [To set up the development environment](#to-set-up-the-development-environment)
  - [Generic aggregation components](#generic-aggregation-components)
  - [Create a new aggregation pipeline example](#create-a-new-aggregation-pipeline-example)
  - [Original proposal and blueprint](#original-proposal-and-blueprint)

# Monasca Transform

-##To set up the development environment

monasca-transform is a data-driven aggregation engine which collects, groups and aggregates existing
individual Monasca metrics according to business requirements and publishes new transformed
(derived) metrics to the Monasca Kafka queue.

-The monasca-transform dev environment uses devstack so see the README in the devstack directory.

 * Since the new transformed metrics are published like any other metric in Monasca, alarms can be
   set and triggered on the transformed metrics.

 * Monasca Transform uses [Apache Spark](http://spark.apache.org) to aggregate data. [Apache
   Spark](http://spark.apache.org) is a highly scalable, fast, in-memory, fault tolerant and
   parallel data processing framework. All monasca-transform components are implemented in Python
   and use Spark's [PySpark Python API](http://spark.apache.org/docs/latest/api/python/index.html)
   to interact with Spark.

 * Monasca Transform transforms and aggregates incoming metrics in two phases.

   * In the first phase, a Spark streaming application retrieves data from Kafka at a
     configurable *stream interval* (the default *stream_interval* is 10 minutes) and writes the
     data aggregated over the *stream interval* to the *metrics_pre_hourly* topic in Kafka.

   * In the second phase, which is kicked off every hour, all metrics in the *metrics_pre_hourly*
     topic in Kafka are aggregated again, this time over the larger interval of an hour. These
     hourly aggregated metrics are published to the *metrics* topic in Kafka.

## Use Cases handled by Monasca Transform ##
Please refer to the **Problem Description** section on the [Monasca/Transform
wiki](https://wiki.openstack.org/wiki/Monasca/Transform).

## Operation ##
Please refer to the **How Monasca Transform Operates** section on the [Monasca/Transform
wiki](https://wiki.openstack.org/wiki/Monasca/Transform).

## Architecture ##
Please refer to the **Architecture** and **Logical processing data flow** sections on the
[Monasca/Transform wiki](https://wiki.openstack.org/wiki/Monasca/Transform).

## To set up the development environment ##
monasca-transform uses [DevStack](https://docs.openstack.org/devstack/latest/) as a common dev
environment. See the [README.md](devstack/README.md) in the devstack directory for details on how
to include monasca-transform in a DevStack deployment.

## Generic aggregation components ##

Monasca Transform uses a set of generic aggregation components which can be assembled into an
aggregation pipeline.

Please refer to the [generic aggregation components](docs/generic-aggregation-components.md)
document for a list of the available generic aggregation components.
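Once an aggregation pipeline is in place, the transformed metrics are published to the same Kafka
`metrics` topic as any other Monasca metric, so any Kafka client can watch for them. The following
is a minimal sketch, assuming the `kafka-python` client is installed and a broker is reachable on
`localhost:9092` (both are assumptions about your deployment, not project dependencies); it simply
prints metrics whose names end with the `_agg` suffix used by convention in these docs.

```python
import json

from kafka import KafkaConsumer  # assumes the kafka-python client is installed

# Watch the Monasca "metrics" topic and print any transformed metrics.
# By the convention used in this repository's docs, their names end in "_agg".
consumer = KafkaConsumer(
    'metrics',
    bootstrap_servers='localhost:9092',  # adjust to your Kafka broker
    value_deserializer=lambda m: json.loads(m.decode('utf-8')))

for message in consumer:
    metric = message.value.get('metric', {})
    if metric.get('name', '').endswith('_agg'):
        print(metric['name'], metric['value'], metric.get('dimensions'))
```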
+ +## Create a new aggregation pipeline example ## + +Generic aggregation components make it easy to build new aggregation pipelines for different Monasca +metrics. + +This create a [new aggregation pipeline](docs/create-new-aggregation-pipeline.md) example shows how to +create *pre_transform_specs* and *transform_specs* to create an aggregation pipeline for a new set +of Monasca metrics, while leveraging existing set of generic aggregation components. + + +## Original proposal and blueprint ## + +Original proposal: +[Monasca/Transform-proposal](https://wiki.openstack.org/wiki/Monasca/Transform-proposal) + +Blueprint: [monasca-transform +blueprint](https://blueprints.launchpad.net/monasca/+spec/monasca-transform) diff --git a/docs/create-new-aggregation-pipeline.md b/docs/create-new-aggregation-pipeline.md new file mode 100644 index 0000000..4ed2b7b --- /dev/null +++ b/docs/create-new-aggregation-pipeline.md @@ -0,0 +1,336 @@ +Team and repository tags +======================== + +[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html) + + + + + + +- [Create a new aggregation pipeline](#create-a-new-aggregation-pipeline) + - [Using existing generic aggregation components](#using-existing-generic-aggregation-components) + + + + + +# Create a new aggregation pipeline + +Monasca Transform allows you to create new aggregation by creating *pre_transform_spec* and +*transform_spec* for any set of Monasca metrics. This page gives you steps on how to create a new +aggregation pipeline and test the pipeline in your DevStack environment. + +Pre-requisite for following steps on this page is that you have already created a devstack +development environment for Monasca Transform, following instructions in +[devstack/README.md](devstack/README.md) + + +## Using existing generic aggregation components ## + +Most of the use cases will fall into this category where you should be able to create new +aggregation for new set of metrics using existing set of generic aggregation components. + +Let's consider a use case where we want to find out + +* Maximum time monasca-agent takes to submit metrics over a period of an hour across all hosts + +* Maximum time monasca-agent takes to submit metrics over period of a hour per host. + +We know that monasca-agent on each host generates a small number of +[monasca-agent metrics](https://github.com/openstack/monasca-agent/blob/master/docs/Plugins.md). + +The metric we are interested in is + +* **"monasca.collection_time_sec"**: Amount of time that the collector took for this collection run + +**Steps:** + + * **Step 1**: Identify the monasca metric to be aggregated from the Kafka topic + ``` + /opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics | grep "monasca.collection_time_sec" + + {"metric":{"timestamp":1523323485360.6650390625,"name":"monasca.collection_time_sec", + "dimensions":{"hostname":"devstack","component":"monasca-agent", + "service":"monitoring"},"value":0.0340659618, "value_meta":null}, + "meta":{"region":"RegionOne","tenantId":"d6bece1bbeff47c1b8734cd4e544dc02"}, + "creation_time":1523323489} + ``` + Note: "hostname" is available as a dimension, which we will use to find maximum collection time for each host. + + * **Step 2**: Create a **pre_transform_spec** + + "pre_transform_spec" drives the pre-processing of monasca metric to record store format. 
Look + for existing example in + "/monasca-transform-source/monasca_transform/data_driven_specs/pre_transform_specs/pre_transform_specs.json" + + **pre_transform_spec** + ``` + { + "event_processing_params":{ + "set_default_zone_to":"1", + "set_default_geolocation_to":"1", + "set_default_region_to":"W" + }, + "event_type":"monasca.collection_time_sec", <-- EDITED + "metric_id_list":["monasca_collection_host"], <-- EDITED + "required_raw_fields_list":["creation_time", "metric.dimensions.hostname"], <--EDITED + "service_id":"host_metrics" + } + ``` + Lets look at all the fields that were edited (Marked as `<-- EDITED` above): + + **event_type**: set to "monasca.collection_time_sec". These are the metrics we want to + transform/aggregate. + + **metric_id_list**: set to ['monasca_collection_host']. This is a transformation spec + identifier. During pre-processing record generator generates additional "record_store" data for + each item in this list. (To be renamed to transform_spec_list) + + **required_raw_fields_list**: set to ["creation_time", "metric.dimensions.hostname"] + This should list fields in the incoming metrics that are required. Pre-processing will + eliminate or remove metrics which have missing required fields, during validation. + + **service_id**: set to "host_metrics" + This identifies the source service these metrics belong to. (To be removed) + + **Note:** "metric_id" is a misnomer, it is not really a metric identifier but rather identifier + for transformation spec. This will be changed to transform_spec_id in the future. Also + "service_id" should be set by the source that is generating the metric. This will be removed in + the future. (Please see Story [2001815](https://storyboard.openstack.org/#!/story/2001815)) + + * **Step 3**: Create a "transform_spec" to find maximum metric value for each host + + "transform_spec" drives the aggregation of record store data created during pre-processing + to final aggregated metric. Look for existing example in + "/monasca-transform-source/monasca_transform/data_driven_specs/transform_specs/transform_specs.json" + + **transform_spec** + ``` + { + "aggregation_params_map":{ + + "aggregation_pipeline":{ + "source":"streaming", + "usage":"fetch_quantity", <-- EDITED + "setters":["set_aggregated_metric_name","set_aggregated_period"], <-- EDITED + "insert":["insert_data_pre_hourly"] <-- EDITED + }, + + "aggregated_metric_name":"monasca.collection_time_sec_host_agg", <-- EDITED + "aggregation_period":"hourly", <-- EDITED + "aggregation_group_by_list": ["host"], + "usage_fetch_operation": "max", <-- EDITED + "filter_by_list": [], + "dimension_list":["aggregation_period","host"], <-- EDITED + + "pre_hourly_operation":"max", + "pre_hourly_group_by_list":["default"]}, + + "metric_group":"monasca_collection_host", <-- EDITED + "metric_id":"monasca_collection_host" <-- EDITED + } + ``` + Lets look at all the fields that were edited (Marked as `<-- EDITED` above): + + aggregation pipeline fields + + * **usage**: set to "fetch_quantity" Use "fetch_quantity" generic aggregation component. This + component takes a "aggregation_group_by_list", "usage_fetch_operation" and "filter_by_list" as + parameters. + * **aggregation_group_by_list** set to ["host"]. Since we want to find monasca agent + collection time for each host. + * **usage_fetch_operation** set to "max". Since we want to find maximum value for + monasca agent collection time. + * **filter_by_list** set to []. 
Since we dont want filter out/ignore any metrics (based on + say particular host or set of hosts) + + * **setters**: set to ["set_aggregated_metric_name","set_aggregated_period"] These components set + aggregated metric name and aggregation period in final aggregated metric. + * **set_aggregated_metric_name** sets final aggregated metric name. This setter component takes + "aggregated_metric_name" as a parameter. + * **aggregated_metric_name**: set to "monasca.collection_time_sec_host_agg" + * **set_aggregated_period** sets final aggregated metric period. This setter component takes + "aggregation_period" as a parameter. + * **aggregation_period**: set to "hourly" + + * **insert**: set to ["insert_data_pre_hourly"]. These components are responsible for + transforming instance usage data records to final metric format and writing the data to kafka + topic. + * **insert_data_pre_hourly** writes the to "metrics_pre_hourly" kafka topic, which gets + processed by the pre hourly processor every hour. + + pre hourly processor fields + + * **pre_hourly_operation** set to "max" + Find the hourly maximum value from records that were written to "metrics_pre_hourly" topic + + * **pre_hourly_group_by_list** set to ["default"] + + transformation spec identifier fields + + * **metric_group** set to "monasca_collection_host". Group identifier for this transformation + spec + + * **metric_id** set to "monasca_collection_host". Identifier for this transformation spec. + + **Note:** metric_group" and "metric_id" are misnomers, it is not really a metric identifier but + rather identifier for transformation spec. This will be changed to "transform_group" and + "transform_spec_id" in the future. (Please see Story + [2001815](https://storyboard.openstack.org/#!/story/2001815)) + + * **Step 4**: Create a "transform_spec" to find maximum metric value across all hosts + + Now let's create another transformation spec to find maximum metric value across all hosts. + + **transform_spec** + ``` + { + "aggregation_params_map":{ + + "aggregation_pipeline":{ + "source":"streaming", + "usage":"fetch_quantity", <-- EDITED + "setters":["set_aggregated_metric_name","set_aggregated_period"], <-- EDITED + "insert":["insert_data_pre_hourly"] <-- EDITED + }, + + "aggregated_metric_name":"monasca.collection_time_sec_all_agg", <-- EDITED + "aggregation_period":"hourly", <-- EDITED + "aggregation_group_by_list": [], + "usage_fetch_operation": "max", <-- EDITED + "filter_by_list": [], + "dimension_list":["aggregation_period"], <-- EDITED + + "pre_hourly_operation":"max", + "pre_hourly_group_by_list":["default"]}, + + "metric_group":"monasca_collection_all", <-- EDITED + "metric_id":"monasca_collection_all" <-- EDITED + } + ``` + + The transformation spec above is almost identical to transformation spec created in **Step 3** + with a few additional changes. + + **aggregation_group_by_list** is set to [] i.e. empty list, since we want to find maximum value + across all hosts (consider all the incoming metric data). + + **aggregated_metric_name** is set to "monasca.collection_time_sec_all_agg". + + **metric_group** is set to "monasca_collection_all", since we need a new transfomation spec + group identifier. + + **metric_id** is set to "monasca_collection_all", since we need a new transformation spec + identifier. + + * **Step 5**: Update "pre_transform_spec" with new transformation spec identifier + + In **Step 4** we created a new transformation spec, with new "metric_id", namely + "monasca_collection_all". 
We will have to now update the "pre_transform_spec" that we + created in **Step 2** with new "metric_id" by adding it to the "metric_id_list" + + **pre_transform_spec** + ``` + { + "event_processing_params":{ + "set_default_zone_to":"1", + "set_default_geolocation_to":"1", + "set_default_region_to":"W" + }, + "event_type":"monasca.collection_time_sec", + "metric_id_list":["monasca_collection_host", "monasca_collection_all"], <-- EDITED + "required_raw_fields_list":["creation_time", "metric.dimensions.hostname"], + "service_id":"host_metrics" + } + ``` + Thus we were able to add additional transformation or aggregation pipeline to the same incoming + monasca metric very easily. + + * **Step 6**: Update "pre_transform_spec" and "transform_spec" + + * Edit + "/monasca-transform-source/monasca_transform/data_driven_specs/pre_transform_specs/pre_transform_specs.json" + and add following line. + + ``` + {"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},"event_type":"monasca.collection_time_sec","metric_id_list":["monasca_collection_host","monasca_collection_all"],"required_raw_fields_list":["creation_time"],"service_id":"host_metrics"} + ``` + + **Note:** Each line does not end with a comma (the file is not one big json document). + + * Edit + "/monasca-transform-source/monasca_transform/data_driven_specs/transform_specs/transform_specs.json" + and add following lines. + + ``` + {"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["insert_data_pre_hourly"]},"aggregated_metric_name":"monasca.collection_time_sec_host_agg","aggregation_period":"hourly","aggregation_group_by_list":["host"],"usage_fetch_operation":"max","filter_by_list":[],"dimension_list":["aggregation_period","host"],"pre_hourly_operation":"max","pre_hourly_group_by_list":["default"]},"metric_group":"monasca_collection_host","metric_id":"monasca_collection_host"} + {"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["insert_data_pre_hourly"]},"aggregated_metric_name":"monasca.collection_time_sec_all_agg","aggregation_period":"hourly","aggregation_group_by_list":[],"usage_fetch_operation":"max","filter_by_list":[],"dimension_list":["aggregation_period"],"pre_hourly_operation":"max","pre_hourly_group_by_list":["default"]},"metric_group":"monasca_collection_all","metric_id":"monasca_collection_all"} + ``` + + * Run "refresh_monasca_transform.sh" script as documented in devstack + [README](devstack/README.md) to refresh the specs in the database. + ``` + vagrant@devstack:~$ cd /opt/stack/monasca-transform + vagrant@devstack:/opt/stack/monasca-transform$ tools/vagrant/refresh_monasca_transform.sh + ``` + + If successful, you should see this message. + ``` + *********************************************** + * * + * SUCCESS!! refresh monasca transform done. * + * * + *********************************************** + ``` + * **Step 7**: Verifying results + + To verify if new aggregated metrics are being produced you can look at the "metrics_pre_hourly" + topic in kafka. By default, monasca-transform fires of a batch every 10 minutes so you should + see metrics in intermediate "instance_usage" data format being published to that topic every 10 + minutes. 
+ ``` + /opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics_pre_hourly + + {"usage_hour":"06","geolocation":"NA","record_count":40.0,"app":"NA","deployment":"NA","resource_uuid":"NA", + "pod_name":"NA","usage_minute":"NA","service_group":"NA","lastrecord_timestamp_string":"2018-04-1106:29:49", + "user_id":"NA","zone":"NA","namespace":"NA","usage_date":"2018-04-11","daemon_set":"NA","processing_meta":{ + "event_type":"NA","metric_id":"monasca_collection_all"}, + "firstrecord_timestamp_unix":1523427604.208577,"project_id":"NA","lastrecord_timestamp_unix":1523428189.718174, + "aggregation_period":"hourly","host":"NA","container_name":"NA","interface":"NA", + "aggregated_metric_name":"monasca.collection_time_sec_all_agg","tenant_id":"NA","region":"NA", + "firstrecord_timestamp_string":"2018-04-11 06:20:04","service_id":"NA","quantity":0.0687000751} + + {"usage_hour":"06","geolocation":"NA","record_count":40.0,"app":"NA","deployment":"NA","resource_uuid":"NA", + "pod_name":"NA","usage_minute":"NA","service_group":"NA","lastrecord_timestamp_string":"2018-04-11 06:29:49", + "user_id":"NA","zone":"NA","namespace":"NA","usage_date":"2018-04-11","daemon_set":"NA","processing_meta":{ + "event_type":"NA","metric_id":"monasca_collection_host"},"firstrecord_timestamp_unix":1523427604.208577, + "project_id":"NA","lastrecord_timestamp_unix":1523428189.718174,"aggregation_period":"hourly", + "host":"devstack","container_name":"NA","interface":"NA", + "aggregated_metric_name":"monasca.collection_time_sec_host_agg","tenant_id":"NA","region":"NA", + "firstrecord_timestamp_string":"2018-04-11 06:20:04","service_id":"NA","quantity":0.0687000751} + ``` + + Similarly, to verify if final aggregated metrics are being published by pre hourly processor, + you can look at "metrics" topic in kafka. By default pre hourly processor (which processes + metrics from "metrics_pre_hourly" topic) runs 10 minutes past the top of the hour. + ``` + /opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics | grep "_agg" + + {"metric":{"timestamp":1523459468616,"value_meta":{"firstrecord_timestamp_string":"2018-04-11 14:00:13", + "lastrecord_timestamp_string":"2018-04-11 14:59:46","record_count":239.0},"name":"monasca.collection_time_sec_host_agg", + "value":0.1182248592,"dimensions":{"aggregation_period":"hourly","host":"devstack"}}, + "meta":{"region":"useast","tenantId":"df89c3db21954b08b0516b4b60b8baff"},"creation_time":1523459468} + + {"metric":{"timestamp":1523455872740,"value_meta":{"firstrecord_timestamp_string":"2018-04-11 13:00:10", + "lastrecord_timestamp_string":"2018-04-11 13:59:58","record_count":240.0},"name":"monasca.collection_time_sec_all_agg", + "value":0.0898442268,"dimensions":{"aggregation_period":"hourly"}}, + "meta":{"region":"useast","tenantId":"df89c3db21954b08b0516b4b60b8baff"},"creation_time":1523455872} + ``` + + As you can see monaca-transform created two new aggregated metrics with name + "monasca.collection_time_sec_host_agg" and "monasca.collection_time_sec_all_agg". "value_meta" + section has three fields "firstrecord_timestamp" and "lastrecord_timestamp" and + "record_count". These fields are for informational purposes only. It shows timestamp of the first metric, + timestamp of the last metric and number of metrics that went into the calculation of the aggregated + metric. 
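If you prefer to script the verification in **Step 7** instead of watching the Kafka console
consumer, the following is a minimal sketch that does the same check from Python. It assumes the
`kafka-python` client is installed and a Kafka broker is reachable on `localhost:9092`; both are
assumptions about your DevStack setup, not part of monasca-transform itself.

```python
import json

from kafka import KafkaConsumer  # assumes the kafka-python client is installed

AGG_NAMES = {'monasca.collection_time_sec_host_agg',
             'monasca.collection_time_sec_all_agg'}

consumer = KafkaConsumer(
    'metrics',
    bootstrap_servers='localhost:9092',  # adjust to your Kafka broker
    value_deserializer=lambda m: json.loads(m.decode('utf-8')))

# Print the two new aggregated metrics along with the informational
# value_meta fields described above.
for message in consumer:
    metric = message.value.get('metric', {})
    if metric.get('name') in AGG_NAMES:
        meta = metric.get('value_meta') or {}
        print(metric['name'],
              'value=%s' % metric.get('value'),
              'record_count=%s' % meta.get('record_count'),
              'first=%s' % meta.get('firstrecord_timestamp_string'),
              'last=%s' % meta.get('lastrecord_timestamp_string'))
```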
diff --git a/docs/data_formats.md b/docs/data_formats.md new file mode 100644 index 0000000..d611030 --- /dev/null +++ b/docs/data_formats.md @@ -0,0 +1,129 @@ +Team and repository tags +======================== + +[![Team and repositorytags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html) + + + + + +- [Monasca Transform Data Formats](#monasca-transform-data-formats) + - [Record Store Data Format](#record-store-data-format) + - [Instance Usage Data Format](#instance-usage-data-format) + - [References](#references) + + + +# Monasca Transform Data Formats + +There are two data formats used by monasca transform. The following sections describes the schema +(Spark's DataFrame[1] schema) for the two formats. + +Note: These are internal formats used by Monasca Transform when aggregating data. If you are a user +who wants to create new aggregation pipeline using an existing framework, you don't need to know or +care about these two formats. + +As a developer, if you want to write new aggregation components then you might have to know how to +enhance the record store data format or instance usage data format with additional fields that you +may need or to write new aggregation components that aggregate data from the additional fields. + +**Source Metric** + +This is an example monasca metric. Monasca metric is transformed into `record_store` data format and +later transformed/aggregated using re-usable generic aggregation components, to derive +'instance_usage` data format. + +Example of a monasca metric: + +``` +{"metric":{"timestamp":1523323485360.6650390625, + "name":"monasca.collection_time_sec", + "dimensions":{"hostname":"devstack", + "component":"monasca-agent", + "service":"monitoring"}, + "value":0.0340659618, + "value_meta":null}, + "meta":{"region":"RegionOne","tenantId":"d6bece1bbeff47c1b8734cd4e544dc02"}, + "creation_time":1523323489} +``` + +## Record Store Data Format ## + +Data Frame Schema: + +| Column Name | Column Data Type | Description | +| ----------- | ---------------- | ----------- | +| event_quantity | `pyspark.sql.types.DoubleType` | mapped to `metric.value`| +| event_timestamp_unix | `pyspark.sql.types.DoubleType` | calculated as `metric.timestamp`/`1000` from source metric| +| event_timestamp_string | `pyspark.sql.types.StringType` | mapped to `metric.timestamp` from the source metric| +| event_type | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `metric.name` from source metric| +| event_quantity_name | `pyspark.sql.types.StringType` | mapped to `metric.name` from source metric| +| event_status | `pyspark.sql.types.StringType` | placeholder for the future. Currently mapped to `metric.dimensions.state` from the source metric | +| event_version | `pyspark.sql.types.StringType` | placeholder for the future. Set to "1.0" | +| record_type | `pyspark.sql.types.StringType` | placeholder for the future. Set to "metrics" | +| resource_uuid | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.instanceId` or `metric.dimensions.resource_id` from source metric | +| tenant_id | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.tenant_id` or `metric.dimensions.tenantid` or `metric.dimensions.project_id` | +| user_id | `pyspark.sql.types.StringType` | mapped to `meta.userId` | +| region | `pyspark.sql.types.StringType` | placeholder of the future. 
mapped to `meta.region`, defaults to `event_processing_params.set_default_region_to` (`pre_transform_spec`) | +| zone | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `meta.zone`, defaults to `event_processing_params.set_default_zone_to` (`pre_transform_spec`) | +| host | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.hostname` or `metric.value_meta.host` | +| project_id | `pyspark.sql.types.StringType` | mapped to metric tenant_id | +| service_group | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `service_id` in `pre_transform_spec` | +| service_id | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `service_id` in `pre_transform_spec` | +| event_date | `pyspark.sql.types.StringType` | "YYYY-mm-dd". Extracted from `metric.timestamp` | +| event_hour | `pyspark.sql.types.StringType` | "HH". Extracted from `metric.timestamp` | +| event_minute | `pyspark.sql.types.StringType` | "MM". Extracted from `metric.timestamp` | +| event_second | `pyspark.sql.types.StringType` | "SS". Extracted from `metric.timestamp` | +| metric_group | `pyspark.sql.types.StringType` | identifier for transform spec group | +| metric_id | `pyspark.sql.types.StringType` | identifier for transform spec | +| namespace | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.namespace` | +| pod_name | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.pod_name` | +| app | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.app` | +| container_name | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.container_name`| +| interface | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.interface` | +| deployment | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.deployment` | +| daemon_set | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.daemon_set` | + +## Instance Usage Data Format ## + +Data Frame Schema: + +| Column Name | Column Data Type | Description | +| ----------- | ---------------- | ----------- | +| tenant_id | `pyspark.sql.types.StringType` | project_id, defaults to `NA` | +| user_id | `pyspark.sql.types.StringType` | user_id, defaults to `NA`| +| resource_uuid | `pyspark.sql.types.StringType` | resource_id, defaults to `NA`| +| geolocation | `pyspark.sql.types.StringType` | placeholder for future, defaults to `NA`| +| region | `pyspark.sql.types.StringType` | placeholder for future, defaults to `NA`| +| zone | `pyspark.sql.types.StringType` | placeholder for future, defaults to `NA`| +| host | `pyspark.sql.types.StringType` | compute hostname, defaults to `NA`| +| project_id | `pyspark.sql.types.StringType` | project_id, defaults to `NA`| +| aggregated_metric_name | `pyspark.sql.types.StringType` | aggregated metric name, defaults to `NA`| +| firstrecord_timestamp_string | `pyspark.sql.types.StringType` | timestamp of the first metric used to derive this aggregated metric| +| lastrecord_timestamp_string | `pyspark.sql.types.StringType` | timestamp of the last metric used to derive this aggregated metric| +| service_group | `pyspark.sql.types.StringType` | placeholder for the future, defaults to `NA`| +| service_id | `pyspark.sql.types.StringType` | placeholder for the future, defaults to `NA`| +| usage_date | `pyspark.sql.types.StringType` | "YYYY-mm-dd" date| +| usage_hour | `pyspark.sql.types.StringType` | "HH" hour| +| usage_minute | `pyspark.sql.types.StringType` | "MM" minute| +| aggregation_period | `pyspark.sql.types.StringType` | "hourly" or 
"minutely" | +| namespace | `pyspark.sql.types.StringType` | | +| pod_name | `pyspark.sql.types.StringType` | | +| app | `pyspark.sql.types.StringType` | | +| container_name | `pyspark.sql.types.StringType` | | +| interface | `pyspark.sql.types.StringType` | | +| deployment | `pyspark.sql.types.StringType` | | +| daemon_set | `pyspark.sql.types.StringType` | | +| firstrecord_timestamp_unix | `pyspark.sql.types.DoubleType` | epoch timestamp of the first metric used to derive this aggregated metric | +| lastrecord_timestamp_unix | `pyspark.sql.types.DoubleType` | epoch timestamp of the first metric used to derive this aggregated metric | +| quantity | `pyspark.sql.types.DoubleType` | aggregated metric quantity | +| record_count | `pyspark.sql.types.DoubleType` | number of source metrics that were used to derive this aggregated metric. For informational purposes only. | +| processing_meta | `pyspark.sql.types.MapType(pyspark.sql.types.StringType, pyspark.sql.types.StringType, True)` | Key-Value pairs to store additional information, to aid processing| + +## References + +[1] [Spark SQL, DataFrames and Datasets +Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) + +[2] [Spark +DataTypes](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.types.DataType) diff --git a/docs/generic-aggregation-components.md b/docs/generic-aggregation-components.md new file mode 100644 index 0000000..f9cb919 --- /dev/null +++ b/docs/generic-aggregation-components.md @@ -0,0 +1,632 @@ +Team and repository tags +======================== + +[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html) + + + + +- [Monasca Transform Generic Aggregation Components](#monasca-transform-generic-aggregation-components) + +- [Monasca Transform Generic Aggregation Components](#monasca-transform-generic-aggregation-components) +- [Introduction](#introduction) + - [1: Conversion of incoming metrics to record store data format](#1-conversion-of-incoming-metrics-to-record-store-data-format) + - [Pre Transform Spec](#pre-transform-spec) + - [2: Data aggregation using generic aggregation components](#2-data-aggregation-using-generic-aggregation-components) + - [Transform Specs](#transform-specs) + - [aggregation_params_map](#aggregation_params_map) + - [aggregation_pipeline](#aggregation_pipeline) + - [Other parameters](#other-parameters) + - [metric_group and metric_id](#metric_group-and-metric_id) + - [Generic Aggregation Components](#generic-aggregation-components) + - [Usage Components](#usage-components) + - [fetch_quantity](#fetch_quantity) + - [fetch_quantity_util](#fetch_quantity_util) + - [calculate_rate](#calculate_rate) + - [Setter Components](#setter-components) + - [set_aggregated_metric_name](#set_aggregated_metric_name) + - [set_aggregated_period](#set_aggregated_period) + - [rollup_quantity](#rollup_quantity) + - [Insert Components](#insert-components) + - [insert_data](#insert_data) + - [insert_data_pre_hourly](#insert_data_pre_hourly) + - [Processors](#processors) + - [pre_hourly_processor](#pre_hourly_processor) +- [Putting it all together](#putting-it-all-together) + + +# Monasca Transform Generic Aggregation Components + +# Introduction + +Monasca Transform uses standard ETL (Extract-Transform-Load) design pattern to aggregate monasca +metrics and uses innovative data/configuration driven mechanism to drive processing. 
It accomplishes +data aggregation in two distinct steps, each is driven using external configuration specifications, +namely *pre-transform_spec* and *transform_spec*. + +## 1: Conversion of incoming metrics to record store data format ## + +In the first step, the incoming metrics are converted into a canonical data format called as record +store data using *pre_transform_spec*. + +This logical processing data flow is explained in more detail in [Monasca/Transform wiki: Logical +processing data flow section: Conversion to record store +format](https://wiki.openstack.org/wiki/Monasca/Transform#Logical_processing_data_flow) and includes +following operations: + + * identifying metrics that are required (or in other words filtering out of unwanted metrics) + + * validation and extraction of essential data in metric + + * generating multiple records for incoming metrics if they are to be aggregated in multiple ways, + and finally + + * conversion of the incoming metrics to canonical record store data format. Please refer to record + store section in [Data Formats](data_formats.md) for more information on record store format. + +### Pre Transform Spec ### + +Example *pre_transform_spec* for metric + +``` +{ + "event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"}, + "event_type":"cpu.total_logical_cores", + "metric_id_list":["cpu_total_all","cpu_total_host","cpu_util_all","cpu_util_host"], + "required_raw_fields_list":["creation_time"], + "service_id":"host_metrics" +} +``` + +*List of fields* + +| field name | values | description | +| ---------- | ------ | ----------- | +| event_processing_params | Set default field values `set_default_zone_to`, `set_default_geolocation_to`, `set_default_region_to`| Set default values for certain fields in the record store data | +| event_type | Name of the metric | identifies metric that needs to be aggregated | +| metric_id_list | List of `metric_id`'s | List of identifiers, should match `metric_id` in transform specs. This is used by record generation step to generate multiple records if this metric is to be aggregated in multiple ways| +| required_raw_fields_list | List of `field` | List of fields (use JSON dotted notation) that are required in the incoming metric, used for validating incoming metric | +| service_id | service identifier | Identifies the service to which this metric belongs to. Note: this field not yet used | + +## 2: Data aggregation using generic aggregation components ## + +In the second step, the canonical record store data is aggregated using *transform_spec*. Each +*transform_spec* defines series of generic aggregation components, which are specified in +`aggregation_params_map.aggregation_pipeline` section. (See *transform_spec* example below). + +Any parameters used by the generic aggregation components are also specified in the +`aggregation_params_map` section (See *Other parameters* e.g. `aggregated_metric_name`, `aggregation_period`, +`aggregation_group_by_list` etc. 
in *transform_spec* example below) + +### Transform Specs ### + +Example *transform_spec* for metric +``` +{"aggregation_params_map":{ + "aggregation_pipeline":{ + "source":"streaming", + "usage":"fetch_quantity", + "setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"], + "insert":["prepare_data","insert_data_pre_hourly"] + }, + "aggregated_metric_name":"cpu.total_logical_cores_agg", + "aggregation_period":"hourly", + "aggregation_group_by_list": ["host", "metric_id", "tenant_id"], + "usage_fetch_operation": "avg", + "filter_by_list": [], + "setter_rollup_group_by_list": [], + "setter_rollup_operation": "sum", + "dimension_list":["aggregation_period","host","project_id"], + "pre_hourly_operation":"avg", + "pre_hourly_group_by_list":["default"] + }, + "metric_group":"cpu_total_all", + "metric_id":"cpu_total_all" +} +``` + +#### aggregation_params_map #### + +This section specifies *aggregation_pipeline*, *Other parameters* (used by generic aggregation +components in *aggregation_pipeline*). + +##### aggregation_pipeline ##### + +Specifies generic aggregation components that should be used to process incoming metrics. + +Note: generic aggregation components are re-usable and can be used to build different aggregation +pipelines as required. + +*List of fields* + +| field name | values | description | +| ---------- | ------ | ----------- | +| source | ```streaming``` | source is ```streaming```. In the future this can be used to specify a component which can fetch data directly from monasca datastore | +| usage | ```fetch_quantity```, ```fetch_quantity_util```, ```calculate_rate``` | [Usage Components](https://github.com/openstack/monasca-transform/tree/master/monasca_transform/component/usage)| +| setters | ```pre_hourly_calculate_rate```, ```rollup_quantity```, ```set_aggregated_metric_name```, ```set_aggregated_period``` | [Setter Components](https://github.com/openstack/monasca-transform/tree/master/monasca_transform/component/setter)| +| insert | ```insert_data```, ```insert_data_pre_hourly``` | [Insert Components](https://github.com/openstack/monasca-transform/tree/master/monasca_transform/component/insert)| + + +##### Other parameters ##### + +Specifies parameters that generic aggregation components use to process and aggregate data. + +*List of Other parameters* + +| Parameter Name | Values | Description | Used by | +| -------------- | ------ | ----------- | ------- | +| aggregated_metric_name| e.g. "cpu.total_logical_cores_agg" | Name of the aggregated metric | [set_aggregated_metric_name](#set_aggregated_metric_name) | +| aggregation_period |"hourly", "minutely" or "secondly" | Period over which to aggregate data. | [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate), [set_aggregated_period](#set_aggregated_period), [rollup_quantity](#rollup_quantity) |[fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate) | +| aggregation_group_by_list | e.g. 
"project_id", "hostname" | Group `record_store` data with these columns | [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate) | +| usage_fetch_operation | e.g "sum" | After the data is grouped by `aggregation_group_by_list`, perform this operation to find the aggregated metric value | [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate) | +| filter_by_list | Filter regex | Filter data using regex on a `record_store` column value| [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate) | +| setter_rollup_group_by_list | e.g. "project_id" | Group by these set of fields | [rollup_quantity](#rollup_quantity) | +| setter_rollup_operation | e.g. "avg" | After data is grouped by `setter_rollup_group_by_list`, peform this operation to find aggregated metric value | [rollup_quantity](#rollup_quantity) | +| dimension_list | e.g. "aggregation_period", "host", "project_id" | List of fields which specify dimensions in aggregated metric | [insert_data](#insert_data), [insert_data_pre_hourly](#insert_data_pre_hourly)| +| pre_hourly_group_by_list | e.g. "default" | List of `instance usage data` fields to do a group by operation to aggregate data | [pre_hourly_processor](#pre_hourly_processor) | +| pre_hourly_operation | e.g. "avg" | When aggregating data published to `metrics_pre_hourly` every hour, perform this operation to find hourly aggregated metric value | [pre_hourly_processor](#pre_hourly_processor) | + +### metric_group and metric_id ### + +Specifies a metric or list of metrics from the record store data, which will be processed by this +*transform_spec*. Note: This can be a single metric or a group of metrics that will be combined to +produce the final aggregated metric. + +*List of fields* + +| field name | values | description | +| ---------- | ------ | ----------- | +| metric_group | unique transform spec group identifier | group identifier for this transform spec e.g. "cpu_total_all" | +| metric_id | unique transform spec identifier | identifier for this transform spec e.g. "cpu_total_all" | + +**Note:** "metric_id" is a misnomer, it is not really a metric group/or metric identifier but rather +identifier for transformation spec. This will be changed to "transform_spec_id" in the future. + +## Generic Aggregation Components ## + +*List of Generic Aggregation Components* + +### Usage Components ### + +All usage components implement a method + +``` + def usage(transform_context, record_store_df): + .. + .. + return instance_usage_df +``` + +#### fetch_quantity #### + +This component groups record store records by `aggregation_group_by_list`, sorts within +group by timestamp field, finds usage based on `usage_fetch_operation`. Optionally this +component also takes `filter_by_list` to include for exclude certain records from usage +calculation. + +*Other parameters* + + * **aggregation_group_by_list** + + List of fields to group by. + + Possible values: any set of fields in record store data. + + Example: + + ``` + "aggregation_group_by_list": ["tenant_id"] + ``` + * **usage_fetch_operation** + + Operation to be performed on grouped data set. + + *Possible values:* "sum", "max", "min", "avg", "latest", "oldest" + + * **aggregation_period** + + Period to aggregate by. + + *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'. 
   Example:

   ```
   "aggregation_period": "hourly"
   ```

 * **filter_by_list**

   Filter (include or exclude) record store data as specified.

   Example:

   ```
   "filter_by_list": [{"field_to_filter": "hostname",
                       "filter_expression": "comp-(\d)+",
                       "filter_operation": "include"}]
   ```

   OR

   ```
   "filter_by_list": [{"field_to_filter": "hostname",
                       "filter_expression": "controller-(\d)+",
                       "filter_operation": "exclude"}]
   ```

#### fetch_quantity_util ####

This component finds the utilized quantity based on *total_quantity* and *idle_perc* using the
following calculation

```
utilized_quantity = (100 - idle_perc) * total_quantity / 100
```

where,

 * **total_quantity** data, identified by the `usage_fetch_util_quantity_event_type` parameter and

 * **idle_perc** data, identified by the `usage_fetch_util_idle_perc_event_type` parameter

This component first groups record store records by `aggregation_group_by_list` and `event_type`,
sorts within each group by the timestamp field, and calculates the `total_quantity` and `idle_perc`
values based on `usage_fetch_operation`. `utilized_quantity` is then calculated using the formula
given above.

*Other parameters*

 * **aggregation_group_by_list**

   List of fields to group by.

   Possible values: any set of fields in record store data.

   Example:

   ```
   "aggregation_group_by_list": ["tenant_id"]
   ```
 * **usage_fetch_operation**

   Operation to be performed on the grouped data set.

   *Possible values:* "sum", "max", "min", "avg", "latest", "oldest"

 * **aggregation_period**

   Period to aggregate by.

   *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.

   Example:

   ```
   "aggregation_period": "hourly"
   ```

 * **filter_by_list**

   Filter (include or exclude) record store data as specified.

   Example:

   ```
   "filter_by_list": [{"field_to_filter": "hostname",
                       "filter_expression": "comp-(\d)+",
                       "filter_operation": "include"}]
   ```

   OR

   ```
   "filter_by_list": [{"field_to_filter": "hostname",
                       "filter_expression": "controller-(\d)+",
                       "filter_operation": "exclude"}]
   ```

 * **usage_fetch_util_quantity_event_type**

   Event type (metric name) identifying the data that will be used to calculate `total_quantity`.

   *Possible values:* metric name

   Example:

   ```
   "usage_fetch_util_quantity_event_type": "cpu.total_logical_cores"
   ```

 * **usage_fetch_util_idle_perc_event_type**

   Event type (metric name) identifying the data that will be used to calculate `idle_perc`.

   *Possible values:* metric name

   Example:

   ```
   "usage_fetch_util_idle_perc_event_type": "cpu.idle_perc"
   ```

#### calculate_rate ####

This component finds the rate of change of a quantity (in percent) over a time period using the
following calculation

```
rate_of_change (in percent) = ((oldest_quantity - latest_quantity)/oldest_quantity) * 100
```

where,

 * **oldest_quantity**: oldest (or earliest) `average` quantity if there are multiple quantities in
   a group for a given time period.

 * **latest_quantity**: latest `average` quantity if there are multiple quantities in a group for a
   given time period.

*Other parameters*

 * **aggregation_group_by_list**

   List of fields to group by.

   Possible values: any set of fields in record store data.
   Example:

   ```
   "aggregation_group_by_list": ["tenant_id"]
   ```
 * **usage_fetch_operation**

   Operation to be performed on the grouped data set.

   *Possible values:* "sum", "max", "min", "avg", "latest", "oldest"

 * **aggregation_period**

   Period to aggregate by.

   *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.

   Example:

   ```
   "aggregation_period": "hourly"
   ```

 * **filter_by_list**

   Filter (include or exclude) record store data as specified.

   Example:

   ```
   "filter_by_list": [{"field_to_filter": "hostname",
                       "filter_expression": "comp-(\d)+",
                       "filter_operation": "include"}]
   ```

   OR

   ```
   "filter_by_list": [{"field_to_filter": "hostname",
                       "filter_expression": "controller-(\d)+",
                       "filter_operation": "exclude"}]
   ```


### Setter Components ###

All setter components implement a method

```
 def setter(transform_context, instance_usage_df):
   ..
   ..
   return instance_usage_df
```

#### set_aggregated_metric_name ####

This component sets the final aggregated metric name by setting the `aggregated_metric_name` field
in `instance_usage` data.

*Other parameters*

 * **aggregated_metric_name**

   Name of the aggregated metric being generated.

   *Possible values:* any aggregated metric name. The convention is to end the metric name
   with "_agg".

   Example:
   ```
   "aggregated_metric_name":"cpu.total_logical_cores_agg"
   ```

#### set_aggregated_period ####

This component sets the final aggregated metric period by setting the `aggregation_period` field in
`instance_usage` data.

*Other parameters*

 * **aggregation_period**

   Period of the aggregated metric being generated.

   *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.

   Example:
   ```
   "aggregation_period": "hourly"
   ```

**Note:** If you are publishing metrics to the *metrics_pre_hourly* Kafka topic using the
`insert_data_pre_hourly` component (see *insert_data_pre_hourly* below), `aggregation_period` will
have to be set to `hourly`, since by default all data in the *metrics_pre_hourly* topic gets
aggregated every hour by the `Pre Hourly Processor` (see the `Processors` section below).

#### rollup_quantity ####

This component groups `instance_usage` records by `setter_rollup_group_by_list`, sorts within each
group by the timestamp field, and rolls up the quantity based on `setter_rollup_operation`.

*Other parameters*

 * **setter_rollup_group_by_list**

   List of fields to group by.

   Possible values: any set of fields in `instance_usage` data.

   Example:
   ```
   "setter_rollup_group_by_list": ["tenant_id"]
   ```
 * **setter_rollup_operation**

   Operation to be performed on the grouped data set.

   *Possible values:* "sum", "max", "min", "avg"

   Example:
   ```
   "setter_rollup_operation": "avg"
   ```

 * **aggregation_period**

   Period to aggregate by.

   *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.

   Example:

   ```
   "aggregation_period": "hourly"
   ```

### Insert Components ###

All insert components implement a method

```
 def insert(transform_context, instance_usage_df):
   ..
   ..
   return instance_usage_df
```

#### insert_data ####

This component converts `instance_usage` data into the Monasca metric format and writes the metrics
to the `metrics` topic in Kafka.

*Other parameters*

 * **dimension_list**

   List of fields in `instance_usage` data that should be converted to Monasca metric dimensions.
+ + *Possible values:* any fields in `instance_usage` data + + Example: + ``` + "dimension_list":["aggregation_period","host","project_id"] + ``` + +#### insert_data_pre_hourly #### + +This component converts `instance_usage` data into monasca metric format and writes the metric to +`metrics_pre_hourly` topic in kafka. + +*Other parameters* + + * **dimension_list** + + List of fields in `instance_usage` data that should be converted to monasca metric dimensions. + + *Possible values:* any fields in `instance_usage` data + + Example: + ``` + "dimension_list":["aggregation_period","host","project_id"] + ``` + +## Processors ## + +Processors are special components that process data from a kafka topic, at the desired time +interval. These are different from generic aggregation components since they process data from +specific kafka topic. + +All processor components implement following methods + +``` +def get_app_name(self): + [...] + return app_name + +def is_time_to_run(self, current_time): + if current_time > last_invoked + 1: + return True + else: + return False + +def run_processor(self, time): + # do work... +``` + +### pre_hourly_processor ### + +Pre Hourly Processor, runs every hour and aggregates `instance_usage` data published to +`metrics_pre_hourly` topic. + +Pre Hourly Processor by default is set to run 10 minutes after the top of the hour and processes +data from previous hour. `instance_usage` data is grouped by `pre_hourly_group_by_list` + +*Other parameters* + + * **pre_hourly_group_by_list** + + List of fields to group by. + + Possible values: any set of fields in `instance_usage` data or to `default` + + Note: setting to `default` will group `instance_usage` data by `tenant_id`, `user_id`, + `resource_uuid`, `geolocation`, `region`, `zone`, `host`, `project_id`, + `aggregated_metric_name`, `aggregation_period` + + Example: + ``` + "pre_hourly_group_by_list": ["tenant_id"] + ``` + + OR + + ``` + "pre_hourly_group_by_list": ["default"] + ``` + + * **pre_hourly_operation** + + Operation to be performed on grouped data set. + + *Possible values:* "sum", "max", "min", "avg", "rate" + + Example: + + ``` + "pre_hourly_operation": "avg" + ``` + +# Putting it all together +Please refer to [Create a new aggregation pipeline](create-new-aggregation-pipeline.md) document to +create a new aggregation pipeline.
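As a rough illustration of what a usage component does internally, here is a minimal, hypothetical
PySpark sketch of the kind of grouping that `fetch_quantity` performs with
`"aggregation_group_by_list": ["host"]` and `"usage_fetch_operation": "max"`. It is not the actual
monasca-transform implementation: a real component is wired into the framework, is driven by the
*transform_spec* parameters rather than hard-coded values, and must return records in the full
`instance_usage` format described in [Data Formats](data_formats.md). The column names below come
from the record store format in that same document, and the sample input path is purely
illustrative.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('fetch_quantity_sketch').getOrCreate()

# record_store_df would normally be built by the pre-processing step from the
# incoming Monasca metrics; here it is just any DataFrame that has the record
# store columns used below (event_type, host, event_quantity).
record_store_df = spark.read.json('/tmp/record_store_sample.json')  # illustrative path

aggregated_df = (record_store_df
                 .filter(F.col('event_type') == 'monasca.collection_time_sec')
                 .groupBy('host')                                  # aggregation_group_by_list
                 .agg(F.max('event_quantity').alias('quantity'),   # usage_fetch_operation: max
                      F.count('event_quantity').alias('record_count')))

aggregated_df.show()
```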