author    Zuul <zuul@review.openstack.org>    2018-04-24 11:30:06 +0000
committer    Gerrit Code Review <review@openstack.org>    2018-04-24 11:30:06 +0000
commit    0d318c0d44c7bbedf014ecb06e8d671d700f4af5 (patch)
tree    058eb37378fb42834f53b32160c15b1dd5c2aa45
parent    395a3237f59bb79de040ccb658edf223d3411fb2 (diff)
parent    3feaf7400eabbf3a7da431c94fbd86d1008647e5 (diff)
Merge "Refresh monasca transform docs"
-rw-r--r--  README.md                                   81
-rw-r--r--  docs/create-new-aggregation-pipeline.md    336
-rw-r--r--  docs/data_formats.md                       129
-rw-r--r--  docs/generic-aggregation-components.md     632
4 files changed, 1175 insertions, 3 deletions
diff --git a/README.md b/README.md
index 66154a3..5de1d51 100644
--- a/README.md
+++ b/README.md
@@ -3,10 +3,85 @@ Team and repository tags
3 3
4[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html) 4[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html)
5 5
6<!-- Change things from this point on --> 6<!-- START doctoc generated TOC please keep comment here to allow auto update -->
7<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
8
9
10- [Monasca Transform](#monasca-transform)
11 - [Use Cases handled by Monasca Transform](#use-cases-handled-by-monasca-transform)
12 - [Operation](#operation)
13 - [Architecture](#architecture)
14 - [To set up the development environment](#to-set-up-the-development-environment)
15 - [Generic aggregation components](#generic-aggregation-components)
16 - [Create a new aggregation pipeline example](#create-a-new-aggregation-pipeline-example)
17 - [Original proposal and blueprint](#original-proposal-and-blueprint)
18
19<!-- END doctoc generated TOC please keep comment here to allow auto update -->
7 20
8# Monasca Transform 21# Monasca Transform
9 22
10##To set up the development environment 23monasca-transform is a data driven aggregation engine which collects, groups and aggregates existing
24individual Monasca metrics according to business requirements and publishes new transformed
25(derived) metrics to the Monasca Kafka queue.
26
27 * Since the new transformed metrics are published like any other metric in Monasca, alarms can be
28 set and triggered on the transformed metrics.
29
30 * Monasca Transform uses [Apache Spark](http://spark.apache.org) to aggregate data. [Apache
31 Spark](http://spark.apache.org) is a highly scalable, fast, in-memory, fault tolerant and
32 parallel data processing framework. All monasca-transform components are implemented in Python
33 and use Spark's [PySpark Python API](http://spark.apache.org/docs/latest/api/python/index.html)
34 to interact with Spark.
35
36 * Monasca Transform does transformation and aggregation of incoming metrics in two phases.
37
38   * In the first phase, a Spark Streaming application is set to retrieve data from Kafka at a
39   configurable *stream interval* (the default *stream_interval* is 10 minutes) and write the data
40   aggregated for the *stream interval* to the *metrics_pre_hourly* topic in Kafka.
41
42   * In the second phase, which is kicked off every hour, all metrics in the *metrics_pre_hourly* topic
43   in Kafka are aggregated again, this time over the larger interval of an hour. These hourly
44   aggregated metrics are published to the *metrics* topic in Kafka.
45
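For example, the intermediate aggregates written during the first phase can be inspected by
consuming the *metrics_pre_hourly* Kafka topic. The snippet below is a minimal sketch (it is not
part of monasca-transform) and assumes the kafka-python client is installed and Kafka is listening
on localhost:9092:

```
from kafka import KafkaConsumer

# Peek at the intermediate aggregates produced by the first (streaming) phase.
consumer = KafkaConsumer('metrics_pre_hourly',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=10000)
for message in consumer:
    print(message.value.decode('utf-8'))
```
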
46## Use Cases handled by Monasca Transform ##
47Please refer to **Problem Description** section on the [Monasca/Transform
48wiki](https://wiki.openstack.org/wiki/Monasca/Transform)
49
50## Operation ##
51Please refer to **How Monasca Transform Operates** section on the [Monasca/Transform
52wiki](https://wiki.openstack.org/wiki/Monasca/Transform)
53
54## Architecture ##
55Please refer to **Architecture** and **Logical processing data flow** sections on the
56[Monasca/Transform wiki](https://wiki.openstack.org/wiki/Monasca/Transform)
57
58## To set up the development environment ##
59monasca-transform uses [DevStack](https://docs.openstack.org/devstack/latest/) as a common dev
60environment. See the [README.md](devstack/README.md) in the devstack directory for details on how
61to include monasca-transform in a DevStack deployment.
62
63## Generic aggregation components ##
64
65Monasca Transform uses a set of generic aggregation components which can be assembled into an
66aggregation pipeline.
67
68Please refer to the [generic aggregation components](docs/generic-aggregation-components.md) document for
69information on the list of available generic aggregation components.
70
71## Create a new aggregation pipeline example ##
72
73Generic aggregation components make it easy to build new aggregation pipelines for different Monasca
74metrics.
75
76The [create a new aggregation pipeline](docs/create-new-aggregation-pipeline.md) example shows how to
77write *pre_transform_specs* and *transform_specs* to build an aggregation pipeline for a new set
78of Monasca metrics, while leveraging the existing set of generic aggregation components.
79
80
81## Original proposal and blueprint ##
82
83Original proposal:
84[Monasca/Transform-proposal](https://wiki.openstack.org/wiki/Monasca/Transform-proposal)
11 85
12The monasca-transform dev environment uses devstack so see the README in the devstack directory. 86Blueprint: [monasca-transform
87blueprint](https://blueprints.launchpad.net/monasca/+spec/monasca-transform)
diff --git a/docs/create-new-aggregation-pipeline.md b/docs/create-new-aggregation-pipeline.md
new file mode 100644
index 0000000..4ed2b7b
--- /dev/null
+++ b/docs/create-new-aggregation-pipeline.md
@@ -0,0 +1,336 @@
1Team and repository tags
2========================
3
4[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html)
5
6<!-- Change things from this point on -->
7<!-- START doctoc generated TOC please keep comment here to allow auto update -->
8<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
9
10
11- [Create a new aggregation pipeline](#create-a-new-aggregation-pipeline)
12 - [Using existing generic aggregation components](#using-existing-generic-aggregation-components)
13
14<!-- END doctoc generated TOC please keep comment here to allow auto update -->
15
16<!-- Change things from this point on -->
17
18# Create a new aggregation pipeline
19
20Monasca Transform allows you to create new aggregations by writing a *pre_transform_spec* and a
21*transform_spec* for any set of Monasca metrics. This page walks through the steps to create a new
22aggregation pipeline and to test the pipeline in your DevStack environment.
23
24The prerequisite for following the steps on this page is that you have already created a DevStack
25development environment for Monasca Transform, following the instructions in
26[devstack/README.md](devstack/README.md).
27
28
29## Using existing generic aggregation components ##
30
31Most use cases will fall into this category: you should be able to create a new
32aggregation for a new set of metrics using the existing set of generic aggregation components.
33
34Let's consider a use case where we want to find out
35
36* Maximum time monasca-agent takes to submit metrics over a period of an hour across all hosts
37
38* Maximum time monasca-agent takes to submit metrics over a period of an hour per host.
39
40We know that monasca-agent on each host generates a small number of
41[monasca-agent metrics](https://github.com/openstack/monasca-agent/blob/master/docs/Plugins.md).
42
43The metric we are interested in is
44
45* **"monasca.collection_time_sec"**: Amount of time that the collector took for this collection run
46
47**Steps:**
48
49 * **Step 1**: Identify the monasca metric to be aggregated from the Kafka topic
50 ```
51 /opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics | grep "monasca.collection_time_sec"
52
53 {"metric":{"timestamp":1523323485360.6650390625,"name":"monasca.collection_time_sec",
54 "dimensions":{"hostname":"devstack","component":"monasca-agent",
55 "service":"monitoring"},"value":0.0340659618, "value_meta":null},
56 "meta":{"region":"RegionOne","tenantId":"d6bece1bbeff47c1b8734cd4e544dc02"},
57 "creation_time":1523323489}
58 ```
59 Note: "hostname" is available as a dimension, which we will use to find maximum collection time for each host.
60
61 * **Step 2**: Create a **pre_transform_spec**
62
63 "pre_transform_spec" drives the pre-processing of monasca metric to record store format. Look
64 for existing example in
65 "/monasca-transform-source/monasca_transform/data_driven_specs/pre_transform_specs/pre_transform_specs.json"
66
67 **pre_transform_spec**
68 ```
69 {
70 "event_processing_params":{
71 "set_default_zone_to":"1",
72 "set_default_geolocation_to":"1",
73 "set_default_region_to":"W"
74 },
75 "event_type":"monasca.collection_time_sec", <-- EDITED
76 "metric_id_list":["monasca_collection_host"], <-- EDITED
77 "required_raw_fields_list":["creation_time", "metric.dimensions.hostname"], <--EDITED
78 "service_id":"host_metrics"
79 }
80 ```
81    Let's look at all the fields that were edited (marked `<-- EDITED` above):
82
83 **event_type**: set to "monasca.collection_time_sec". These are the metrics we want to
84 transform/aggregate.
85
86    **metric_id_list**: set to ['monasca_collection_host']. This is a transformation spec
87    identifier. During pre-processing the record generator generates additional "record_store" data for
88    each item in this list. (To be renamed to transform_spec_list)
89
90    **required_raw_fields_list**: set to ["creation_time", "metric.dimensions.hostname"].
91    This lists the fields in the incoming metric that are required. During validation, pre-processing
92    will eliminate metrics which are missing any of the required fields.
93
94 **service_id**: set to "host_metrics"
95 This identifies the source service these metrics belong to. (To be removed)
96
97    **Note:** "metric_id" is a misnomer; it is not really a metric identifier but rather an identifier
98    for the transformation spec. This will be changed to transform_spec_id in the future. Also,
99    "service_id" should be set by the source that is generating the metric. This will be removed in
100    the future. (Please see Story [2001815](https://storyboard.openstack.org/#!/story/2001815))
101
102 * **Step 3**: Create a "transform_spec" to find maximum metric value for each host
103
104 "transform_spec" drives the aggregation of record store data created during pre-processing
105 to final aggregated metric. Look for existing example in
106 "/monasca-transform-source/monasca_transform/data_driven_specs/transform_specs/transform_specs.json"
107
108 **transform_spec**
109 ```
110 {
111 "aggregation_params_map":{
112
113 "aggregation_pipeline":{
114 "source":"streaming",
115 "usage":"fetch_quantity", <-- EDITED
116 "setters":["set_aggregated_metric_name","set_aggregated_period"], <-- EDITED
117 "insert":["insert_data_pre_hourly"] <-- EDITED
118 },
119
120 "aggregated_metric_name":"monasca.collection_time_sec_host_agg", <-- EDITED
121 "aggregation_period":"hourly", <-- EDITED
122 "aggregation_group_by_list": ["host"],
123 "usage_fetch_operation": "max", <-- EDITED
124 "filter_by_list": [],
125 "dimension_list":["aggregation_period","host"], <-- EDITED
126
127 "pre_hourly_operation":"max",
128 "pre_hourly_group_by_list":["default"]},
129
130 "metric_group":"monasca_collection_host", <-- EDITED
131 "metric_id":"monasca_collection_host" <-- EDITED
132 }
133 ```
134    Let's look at all the fields that were edited (marked `<-- EDITED` above):
135
136 aggregation pipeline fields
137
138    * **usage**: set to "fetch_quantity", to use the "fetch_quantity" generic aggregation component. This
139    component takes "aggregation_group_by_list", "usage_fetch_operation" and "filter_by_list" as
140    parameters.
141      * **aggregation_group_by_list** set to ["host"], since we want to find the monasca-agent
142      collection time for each host.
143      * **usage_fetch_operation** set to "max", since we want to find the maximum value of the
144      monasca-agent collection time.
145      * **filter_by_list** set to [], since we don't want to filter out or ignore any metrics (based on,
146      say, a particular host or set of hosts).
147
148    * **setters**: set to ["set_aggregated_metric_name","set_aggregated_period"]. These components set
149    the aggregated metric name and the aggregation period in the final aggregated metric.
150 * **set_aggregated_metric_name** sets final aggregated metric name. This setter component takes
151 "aggregated_metric_name" as a parameter.
152 * **aggregated_metric_name**: set to "monasca.collection_time_sec_host_agg"
153 * **set_aggregated_period** sets final aggregated metric period. This setter component takes
154 "aggregation_period" as a parameter.
155 * **aggregation_period**: set to "hourly"
156
157    * **insert**: set to ["insert_data_pre_hourly"]. These components are responsible for
158    transforming instance usage data records into the final metric format and writing the data to a
159    Kafka topic.
160      * **insert_data_pre_hourly** writes the data to the "metrics_pre_hourly" Kafka topic, which gets
161      processed by the pre hourly processor every hour.
162
163 pre hourly processor fields
164
165 * **pre_hourly_operation** set to "max"
166 Find the hourly maximum value from records that were written to "metrics_pre_hourly" topic
167
168 * **pre_hourly_group_by_list** set to ["default"]
169
170 transformation spec identifier fields
171
172 * **metric_group** set to "monasca_collection_host". Group identifier for this transformation
173 spec
174
175 * **metric_id** set to "monasca_collection_host". Identifier for this transformation spec.
176
177    **Note:** "metric_group" and "metric_id" are misnomers; they are not really metric identifiers but
178    rather identifiers for the transformation spec. These will be changed to "transform_group" and
179    "transform_spec_id" in the future. (Please see Story
180    [2001815](https://storyboard.openstack.org/#!/story/2001815))
181
182 * **Step 4**: Create a "transform_spec" to find maximum metric value across all hosts
183
184 Now let's create another transformation spec to find maximum metric value across all hosts.
185
186 **transform_spec**
187 ```
188 {
189 "aggregation_params_map":{
190
191 "aggregation_pipeline":{
192 "source":"streaming",
193 "usage":"fetch_quantity", <-- EDITED
194 "setters":["set_aggregated_metric_name","set_aggregated_period"], <-- EDITED
195 "insert":["insert_data_pre_hourly"] <-- EDITED
196 },
197
198 "aggregated_metric_name":"monasca.collection_time_sec_all_agg", <-- EDITED
199 "aggregation_period":"hourly", <-- EDITED
200 "aggregation_group_by_list": [],
201 "usage_fetch_operation": "max", <-- EDITED
202 "filter_by_list": [],
203 "dimension_list":["aggregation_period"], <-- EDITED
204
205 "pre_hourly_operation":"max",
206 "pre_hourly_group_by_list":["default"]},
207
208 "metric_group":"monasca_collection_all", <-- EDITED
209 "metric_id":"monasca_collection_all" <-- EDITED
210 }
211 ```
212
213    The transformation spec above is almost identical to the transformation spec created in **Step 3**,
214    with a few changes.
215
216    **aggregation_group_by_list** is set to [], i.e. an empty list, since we want to find the maximum
217    value across all hosts (considering all the incoming metric data).
218
219 **aggregated_metric_name** is set to "monasca.collection_time_sec_all_agg".
220
221    **metric_group** is set to "monasca_collection_all", since we need a new transformation spec
222 group identifier.
223
224 **metric_id** is set to "monasca_collection_all", since we need a new transformation spec
225 identifier.
226
227 * **Step 5**: Update "pre_transform_spec" with new transformation spec identifier
228
229    In **Step 4** we created a new transformation spec, with a new "metric_id", namely
230    "monasca_collection_all". We now have to update the "pre_transform_spec" that we
231    created in **Step 2** with the new "metric_id" by adding it to the "metric_id_list".
232
233 **pre_transform_spec**
234 ```
235 {
236 "event_processing_params":{
237 "set_default_zone_to":"1",
238 "set_default_geolocation_to":"1",
239 "set_default_region_to":"W"
240 },
241 "event_type":"monasca.collection_time_sec",
242 "metric_id_list":["monasca_collection_host", "monasca_collection_all"], <-- EDITED
243 "required_raw_fields_list":["creation_time", "metric.dimensions.hostname"],
244 "service_id":"host_metrics"
245 }
246 ```
247    Thus we were able to very easily add an additional transformation or aggregation pipeline for the
248    same incoming Monasca metric.
249
250 * **Step 6**: Update "pre_transform_spec" and "transform_spec"
251
252 * Edit
253 "/monasca-transform-source/monasca_transform/data_driven_specs/pre_transform_specs/pre_transform_specs.json"
254      and add the following line.
255
256 ```
257 {"event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},"event_type":"monasca.collection_time_sec","metric_id_list":["monasca_collection_host","monasca_collection_all"],"required_raw_fields_list":["creation_time"],"service_id":"host_metrics"}
258 ```
259
260 **Note:** Each line does not end with a comma (the file is not one big json document).
261
262 * Edit
263 "/monasca-transform-source/monasca_transform/data_driven_specs/transform_specs/transform_specs.json"
264      and add the following lines.
265
266 ```
267 {"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["insert_data_pre_hourly"]},"aggregated_metric_name":"monasca.collection_time_sec_host_agg","aggregation_period":"hourly","aggregation_group_by_list":["host"],"usage_fetch_operation":"max","filter_by_list":[],"dimension_list":["aggregation_period","host"],"pre_hourly_operation":"max","pre_hourly_group_by_list":["default"]},"metric_group":"monasca_collection_host","metric_id":"monasca_collection_host"}
268 {"aggregation_params_map":{"aggregation_pipeline":{"source":"streaming","usage":"fetch_quantity","setters":["set_aggregated_metric_name","set_aggregated_period"],"insert":["insert_data_pre_hourly"]},"aggregated_metric_name":"monasca.collection_time_sec_all_agg","aggregation_period":"hourly","aggregation_group_by_list":[],"usage_fetch_operation":"max","filter_by_list":[],"dimension_list":["aggregation_period"],"pre_hourly_operation":"max","pre_hourly_group_by_list":["default"]},"metric_group":"monasca_collection_all","metric_id":"monasca_collection_all"}
269 ```
270
271    * Run the "refresh_monasca_transform.sh" script as documented in the devstack
272      [README](devstack/README.md) to refresh the specs in the database.
273 ```
274 vagrant@devstack:~$ cd /opt/stack/monasca-transform
275 vagrant@devstack:/opt/stack/monasca-transform$ tools/vagrant/refresh_monasca_transform.sh
276 ```
277
278 If successful, you should see this message.
279 ```
280 ***********************************************
281 * *
282 * SUCCESS!! refresh monasca transform done. *
283 * *
284 ***********************************************
285 ```
286 * **Step 7**: Verifying results
287
288    To verify that new aggregated metrics are being produced, you can look at the "metrics_pre_hourly"
289    topic in Kafka. By default, monasca-transform fires off a batch every 10 minutes, so you should
290    see metrics in the intermediate "instance_usage" data format being published to that topic every 10
291    minutes.
292 ```
293 /opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics_pre_hourly
294
295 {"usage_hour":"06","geolocation":"NA","record_count":40.0,"app":"NA","deployment":"NA","resource_uuid":"NA",
296 "pod_name":"NA","usage_minute":"NA","service_group":"NA","lastrecord_timestamp_string":"2018-04-1106:29:49",
297 "user_id":"NA","zone":"NA","namespace":"NA","usage_date":"2018-04-11","daemon_set":"NA","processing_meta":{
298 "event_type":"NA","metric_id":"monasca_collection_all"},
299 "firstrecord_timestamp_unix":1523427604.208577,"project_id":"NA","lastrecord_timestamp_unix":1523428189.718174,
300 "aggregation_period":"hourly","host":"NA","container_name":"NA","interface":"NA",
301 "aggregated_metric_name":"monasca.collection_time_sec_all_agg","tenant_id":"NA","region":"NA",
302 "firstrecord_timestamp_string":"2018-04-11 06:20:04","service_id":"NA","quantity":0.0687000751}
303
304 {"usage_hour":"06","geolocation":"NA","record_count":40.0,"app":"NA","deployment":"NA","resource_uuid":"NA",
305 "pod_name":"NA","usage_minute":"NA","service_group":"NA","lastrecord_timestamp_string":"2018-04-11 06:29:49",
306 "user_id":"NA","zone":"NA","namespace":"NA","usage_date":"2018-04-11","daemon_set":"NA","processing_meta":{
307 "event_type":"NA","metric_id":"monasca_collection_host"},"firstrecord_timestamp_unix":1523427604.208577,
308 "project_id":"NA","lastrecord_timestamp_unix":1523428189.718174,"aggregation_period":"hourly",
309 "host":"devstack","container_name":"NA","interface":"NA",
310 "aggregated_metric_name":"monasca.collection_time_sec_host_agg","tenant_id":"NA","region":"NA",
311 "firstrecord_timestamp_string":"2018-04-11 06:20:04","service_id":"NA","quantity":0.0687000751}
312 ```
313
314    Similarly, to verify that the final aggregated metrics are being published by the pre hourly processor,
315    you can look at the "metrics" topic in Kafka. By default the pre hourly processor (which processes
316    metrics from the "metrics_pre_hourly" topic) runs 10 minutes past the top of the hour.
317 ```
318 /opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics | grep "_agg"
319
320 {"metric":{"timestamp":1523459468616,"value_meta":{"firstrecord_timestamp_string":"2018-04-11 14:00:13",
321 "lastrecord_timestamp_string":"2018-04-11 14:59:46","record_count":239.0},"name":"monasca.collection_time_sec_host_agg",
322 "value":0.1182248592,"dimensions":{"aggregation_period":"hourly","host":"devstack"}},
323 "meta":{"region":"useast","tenantId":"df89c3db21954b08b0516b4b60b8baff"},"creation_time":1523459468}
324
325 {"metric":{"timestamp":1523455872740,"value_meta":{"firstrecord_timestamp_string":"2018-04-11 13:00:10",
326 "lastrecord_timestamp_string":"2018-04-11 13:59:58","record_count":240.0},"name":"monasca.collection_time_sec_all_agg",
327 "value":0.0898442268,"dimensions":{"aggregation_period":"hourly"}},
328 "meta":{"region":"useast","tenantId":"df89c3db21954b08b0516b4b60b8baff"},"creation_time":1523455872}
329 ```
330
331    As you can see, monasca-transform created two new aggregated metrics with the names
332    "monasca.collection_time_sec_host_agg" and "monasca.collection_time_sec_all_agg". The "value_meta"
333    section has three fields, "firstrecord_timestamp_string", "lastrecord_timestamp_string" and
334    "record_count". These fields are for informational purposes only: they show the timestamp of the
335    first metric, the timestamp of the last metric and the number of metrics that went into the
336    calculation of the aggregated metric. An optional way to cross-check these values against the raw metrics is sketched below.
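The following is a minimal cross-check sketch (not part of monasca-transform); it assumes the
kafka-python client is installed and Kafka is reachable on localhost:9092, and computes the same
maxima directly from the raw metrics:

```
import json
from collections import defaultdict

from kafka import KafkaConsumer

# Consume raw metrics for a while and track the maximum collection time
# overall and per host, mirroring what the two transform specs compute.
consumer = KafkaConsumer('metrics',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=30000)

max_all = 0.0
max_per_host = defaultdict(float)
for message in consumer:
    envelope = json.loads(message.value)
    metric = envelope['metric']
    if metric['name'] != 'monasca.collection_time_sec':
        continue
    value = metric['value']
    host = metric['dimensions'].get('hostname', 'NA')
    max_all = max(max_all, value)
    max_per_host[host] = max(max_per_host[host], value)

print('expected monasca.collection_time_sec_all_agg ~', max_all)
for host, value in max_per_host.items():
    print('expected monasca.collection_time_sec_host_agg for', host, '~', value)
```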
diff --git a/docs/data_formats.md b/docs/data_formats.md
new file mode 100644
index 0000000..d611030
--- /dev/null
+++ b/docs/data_formats.md
@@ -0,0 +1,129 @@
1Team and repository tags
2========================
3
4[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html)
5
6<!-- START doctoc generated TOC please keep comment here to allow auto update -->
7<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
8
9
10- [Monasca Transform Data Formats](#monasca-transform-data-formats)
11 - [Record Store Data Format](#record-store-data-format)
12 - [Instance Usage Data Format](#instance-usage-data-format)
13 - [References](#references)
14
15<!-- END doctoc generated TOC please keep comment here to allow auto update -->
16
17# Monasca Transform Data Formats
18
19There are two data formats used by monasca-transform. The following sections describe the schema
20(Spark's DataFrame[1] schema) for the two formats.
21
22Note: These are internal formats used by Monasca Transform when aggregating data. If you are a user
23who wants to create a new aggregation pipeline using the existing framework, you don't need to know or
24care about these two formats.
25
26As a developer, if you want to write new aggregation components, you might have to know how to
27enhance the record store or instance usage data formats with additional fields that you
28may need, or how to write new aggregation components that aggregate data from those additional fields.
29
30**Source Metric**
31
32This is an example Monasca metric. A Monasca metric is transformed into the `record_store` data format and
33later transformed/aggregated using re-usable generic aggregation components to derive the
34`instance_usage` data format.
35
36Example of a monasca metric:
37
38```
39{"metric":{"timestamp":1523323485360.6650390625,
40 "name":"monasca.collection_time_sec",
41 "dimensions":{"hostname":"devstack",
42 "component":"monasca-agent",
43 "service":"monitoring"},
44 "value":0.0340659618,
45 "value_meta":null},
46 "meta":{"region":"RegionOne","tenantId":"d6bece1bbeff47c1b8734cd4e544dc02"},
47 "creation_time":1523323489}
48```
49
50## Record Store Data Format ##
51
52Data Frame Schema:
53
54| Column Name | Column Data Type | Description |
55| ----------- | ---------------- | ----------- |
56| event_quantity | `pyspark.sql.types.DoubleType` | mapped to `metric.value`|
57| event_timestamp_unix | `pyspark.sql.types.DoubleType` | calculated as `metric.timestamp`/`1000` from source metric|
58| event_timestamp_string | `pyspark.sql.types.StringType` | mapped to `metric.timestamp` from the source metric|
59| event_type | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `metric.name` from source metric|
60| event_quantity_name | `pyspark.sql.types.StringType` | mapped to `metric.name` from source metric|
61| event_status | `pyspark.sql.types.StringType` | placeholder for the future. Currently mapped to `metric.dimensions.state` from the source metric |
62| event_version | `pyspark.sql.types.StringType` | placeholder for the future. Set to "1.0" |
63| record_type | `pyspark.sql.types.StringType` | placeholder for the future. Set to "metrics" |
64| resource_uuid | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.instanceId` or `metric.dimensions.resource_id` from source metric |
65| tenant_id | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.tenant_id` or `metric.dimensions.tenantid` or `metric.dimensions.project_id` |
66| user_id | `pyspark.sql.types.StringType` | mapped to `meta.userId` |
67| region | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `meta.region`, defaults to `event_processing_params.set_default_region_to` (`pre_transform_spec`) |
68| zone | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `meta.zone`, defaults to `event_processing_params.set_default_zone_to` (`pre_transform_spec`) |
69| host | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.hostname` or `metric.value_meta.host` |
70| project_id | `pyspark.sql.types.StringType` | mapped to metric tenant_id |
71| service_group | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `service_id` in `pre_transform_spec` |
72| service_id | `pyspark.sql.types.StringType` | placeholder for the future. mapped to `service_id` in `pre_transform_spec` |
73| event_date | `pyspark.sql.types.StringType` | "YYYY-mm-dd". Extracted from `metric.timestamp` |
74| event_hour | `pyspark.sql.types.StringType` | "HH". Extracted from `metric.timestamp` |
75| event_minute | `pyspark.sql.types.StringType` | "MM". Extracted from `metric.timestamp` |
76| event_second | `pyspark.sql.types.StringType` | "SS". Extracted from `metric.timestamp` |
77| metric_group | `pyspark.sql.types.StringType` | identifier for transform spec group |
78| metric_id | `pyspark.sql.types.StringType` | identifier for transform spec |
79| namespace | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.namespace` |
80| pod_name | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.pod_name` |
81| app | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.app` |
82| container_name | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.container_name`|
83| interface | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.interface` |
84| deployment | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.deployment` |
85| daemon_set | `pyspark.sql.types.StringType` | mapped to `metric.dimensions.daemon_set` |
86
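For illustration, a few of the record store columns above could be declared with PySpark types as
in the following partial sketch (the column subset is chosen for brevity; this is not the exact
schema definition used in the monasca-transform code):

```
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Partial record store schema, following the table above.
record_store_schema = StructType([
    StructField('event_quantity', DoubleType(), True),
    StructField('event_timestamp_unix', DoubleType(), True),
    StructField('event_timestamp_string', StringType(), True),
    StructField('event_type', StringType(), True),
    StructField('event_quantity_name', StringType(), True),
    StructField('tenant_id', StringType(), True),
    StructField('host', StringType(), True),
    StructField('metric_group', StringType(), True),
    StructField('metric_id', StringType(), True),
])
```
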
87## Instance Usage Data Format ##
88
89Data Frame Schema:
90
91| Column Name | Column Data Type | Description |
92| ----------- | ---------------- | ----------- |
93| tenant_id | `pyspark.sql.types.StringType` | project_id, defaults to `NA` |
94| user_id | `pyspark.sql.types.StringType` | user_id, defaults to `NA`|
95| resource_uuid | `pyspark.sql.types.StringType` | resource_id, defaults to `NA`|
96| geolocation | `pyspark.sql.types.StringType` | placeholder for future, defaults to `NA`|
97| region | `pyspark.sql.types.StringType` | placeholder for future, defaults to `NA`|
98| zone | `pyspark.sql.types.StringType` | placeholder for future, defaults to `NA`|
99| host | `pyspark.sql.types.StringType` | compute hostname, defaults to `NA`|
100| project_id | `pyspark.sql.types.StringType` | project_id, defaults to `NA`|
101| aggregated_metric_name | `pyspark.sql.types.StringType` | aggregated metric name, defaults to `NA`|
102| firstrecord_timestamp_string | `pyspark.sql.types.StringType` | timestamp of the first metric used to derive this aggregated metric|
103| lastrecord_timestamp_string | `pyspark.sql.types.StringType` | timestamp of the last metric used to derive this aggregated metric|
104| service_group | `pyspark.sql.types.StringType` | placeholder for the future, defaults to `NA`|
105| service_id | `pyspark.sql.types.StringType` | placeholder for the future, defaults to `NA`|
106| usage_date | `pyspark.sql.types.StringType` | "YYYY-mm-dd" date|
107| usage_hour | `pyspark.sql.types.StringType` | "HH" hour|
108| usage_minute | `pyspark.sql.types.StringType` | "MM" minute|
109| aggregation_period | `pyspark.sql.types.StringType` | "hourly" or "minutely" |
110| namespace | `pyspark.sql.types.StringType` | |
111| pod_name | `pyspark.sql.types.StringType` | |
112| app | `pyspark.sql.types.StringType` | |
113| container_name | `pyspark.sql.types.StringType` | |
114| interface | `pyspark.sql.types.StringType` | |
115| deployment | `pyspark.sql.types.StringType` | |
116| daemon_set | `pyspark.sql.types.StringType` | |
117| firstrecord_timestamp_unix | `pyspark.sql.types.DoubleType` | epoch timestamp of the first metric used to derive this aggregated metric |
118| lastrecord_timestamp_unix | `pyspark.sql.types.DoubleType` | epoch timestamp of the last metric used to derive this aggregated metric |
119| quantity | `pyspark.sql.types.DoubleType` | aggregated metric quantity |
120| record_count | `pyspark.sql.types.DoubleType` | number of source metrics that were used to derive this aggregated metric. For informational purposes only. |
121| processing_meta | `pyspark.sql.types.MapType(pyspark.sql.types.StringType, pyspark.sql.types.StringType, True)` | Key-Value pairs to store additional information, to aid processing|
122
123## References
124
125[1] [Spark SQL, DataFrames and Datasets
126Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)
127
128[2] [Spark
129DataTypes](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.types.DataType)
diff --git a/docs/generic-aggregation-components.md b/docs/generic-aggregation-components.md
new file mode 100644
index 0000000..f9cb919
--- /dev/null
+++ b/docs/generic-aggregation-components.md
@@ -0,0 +1,632 @@
1Team and repository tags
2========================
3
4[![Team and repository tags](https://governance.openstack.org/badges/monasca-transform.svg)](https://governance.openstack.org/reference/tags/index.html)
5
6<!-- Change things from this point on -->
7<!-- START doctoc generated TOC please keep comment here to allow auto update -->
8<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
11- [Monasca Transform Generic Aggregation Components](#monasca-transform-generic-aggregation-components)
12- [Introduction](#introduction)
13 - [1: Conversion of incoming metrics to record store data format](#1-conversion-of-incoming-metrics-to-record-store-data-format)
14 - [Pre Transform Spec](#pre-transform-spec)
15 - [2: Data aggregation using generic aggregation components](#2-data-aggregation-using-generic-aggregation-components)
16 - [Transform Specs](#transform-specs)
17 - [aggregation_params_map](#aggregation_params_map)
18 - [aggregation_pipeline](#aggregation_pipeline)
19 - [Other parameters](#other-parameters)
20 - [metric_group and metric_id](#metric_group-and-metric_id)
21 - [Generic Aggregation Components](#generic-aggregation-components)
22 - [Usage Components](#usage-components)
23 - [fetch_quantity](#fetch_quantity)
24 - [fetch_quantity_util](#fetch_quantity_util)
25 - [calculate_rate](#calculate_rate)
26 - [Setter Components](#setter-components)
27 - [set_aggregated_metric_name](#set_aggregated_metric_name)
28 - [set_aggregated_period](#set_aggregated_period)
29 - [rollup_quantity](#rollup_quantity)
30 - [Insert Components](#insert-components)
31 - [insert_data](#insert_data)
32 - [insert_data_pre_hourly](#insert_data_pre_hourly)
33 - [Processors](#processors)
34 - [pre_hourly_processor](#pre_hourly_processor)
35- [Putting it all together](#putting-it-all-together)
36
37<!-- END doctoc generated TOC please keep comment here to allow auto update -->
38# Monasca Transform Generic Aggregation Components
39
40# Introduction
41
42Monasca Transform uses the standard ETL (Extract-Transform-Load) design pattern to aggregate Monasca
43metrics and uses a data/configuration driven mechanism to drive processing. It accomplishes
44data aggregation in two distinct steps, each driven by external configuration specifications,
45namely *pre_transform_spec* and *transform_spec*.
46
47## 1: Conversion of incoming metrics to record store data format ##
48
49In the first step, the incoming metrics are converted into a canonical data format called record
50store data, using the *pre_transform_spec*.
51
52This logical processing data flow is explained in more detail in [Monasca/Transform wiki: Logical
53processing data flow section: Conversion to record store
54format](https://wiki.openstack.org/wiki/Monasca/Transform#Logical_processing_data_flow) and includes
55the following operations:
56
57 * identifying metrics that are required (in other words, filtering out unwanted metrics)
58
59 * validation and extraction of essential data in the metric
60
61 * generating multiple records for incoming metrics if they are to be aggregated in multiple ways,
62   and finally
63
64 * conversion of the incoming metrics to the canonical record store data format. Please refer to the record
65   store section in [Data Formats](data_formats.md) for more information on the record store format.
66
67### Pre Transform Spec ###
68
69Example *pre_transform_spec* for metric
70
71```
72{
73 "event_processing_params":{"set_default_zone_to":"1","set_default_geolocation_to":"1","set_default_region_to":"W"},
74 "event_type":"cpu.total_logical_cores",
75 "metric_id_list":["cpu_total_all","cpu_total_host","cpu_util_all","cpu_util_host"],
76 "required_raw_fields_list":["creation_time"],
77 "service_id":"host_metrics"
78}
79```
80
81*List of fields*
82
83| field name | values | description |
84| ---------- | ------ | ----------- |
85| event_processing_params | Set default field values `set_default_zone_to`, `set_default_geolocation_to`, `set_default_region_to`| Set default values for certain fields in the record store data |
86| event_type | Name of the metric | identifies metric that needs to be aggregated |
87| metric_id_list | List of `metric_id`'s | List of identifiers, should match `metric_id` in transform specs. This is used by record generation step to generate multiple records if this metric is to be aggregated in multiple ways|
88| required_raw_fields_list | List of `field` | List of fields (use JSON dotted notation) that are required in the incoming metric, used for validating incoming metric |
89| service_id | service identifier | Identifies the service to which this metric belongs. Note: this field is not yet used |
90
91## 2: Data aggregation using generic aggregation components ##
92
93In the second step, the canonical record store data is aggregated using *transform_spec*. Each
94*transform_spec* defines a series of generic aggregation components, which are specified in
95`aggregation_params_map.aggregation_pipeline` section. (See *transform_spec* example below).
96
97Any parameters used by the generic aggregation components are also specified in the
98`aggregation_params_map` section (See *Other parameters* e.g. `aggregated_metric_name`, `aggregation_period`,
99`aggregation_group_by_list` etc. in *transform_spec* example below)
100
101### Transform Specs ###
102
103Example *transform_spec* for metric
104```
105{"aggregation_params_map":{
106 "aggregation_pipeline":{
107 "source":"streaming",
108 "usage":"fetch_quantity",
109 "setters":["rollup_quantity","set_aggregated_metric_name","set_aggregated_period"],
110 "insert":["prepare_data","insert_data_pre_hourly"]
111 },
112 "aggregated_metric_name":"cpu.total_logical_cores_agg",
113 "aggregation_period":"hourly",
114 "aggregation_group_by_list": ["host", "metric_id", "tenant_id"],
115 "usage_fetch_operation": "avg",
116 "filter_by_list": [],
117 "setter_rollup_group_by_list": [],
118 "setter_rollup_operation": "sum",
119 "dimension_list":["aggregation_period","host","project_id"],
120 "pre_hourly_operation":"avg",
121 "pre_hourly_group_by_list":["default"]
122 },
123 "metric_group":"cpu_total_all",
124 "metric_id":"cpu_total_all"
125}
126```
127
128#### aggregation_params_map ####
129
130This section specifies the *aggregation_pipeline* and *Other parameters* (used by the generic aggregation
131components in the *aggregation_pipeline*).
132
133##### aggregation_pipeline #####
134
135Specifies generic aggregation components that should be used to process incoming metrics.
136
137Note: generic aggregation components are re-usable and can be used to build different aggregation
138pipelines as required.
139
140*List of fields*
141
142| field name | values | description |
143| ---------- | ------ | ----------- |
144| source | ```streaming``` | source is ```streaming```. In the future this can be used to specify a component which can fetch data directly from monasca datastore |
145| usage | ```fetch_quantity```, ```fetch_quantity_util```, ```calculate_rate``` | [Usage Components](https://github.com/openstack/monasca-transform/tree/master/monasca_transform/component/usage)|
146| setters | ```pre_hourly_calculate_rate```, ```rollup_quantity```, ```set_aggregated_metric_name```, ```set_aggregated_period``` | [Setter Components](https://github.com/openstack/monasca-transform/tree/master/monasca_transform/component/setter)|
147| insert | ```insert_data```, ```insert_data_pre_hourly``` | [Insert Components](https://github.com/openstack/monasca-transform/tree/master/monasca_transform/component/insert)|
148
149
150##### Other parameters #####
151
152Specifies parameters that generic aggregation components use to process and aggregate data.
153
154*List of Other parameters*
155
156| Parameter Name | Values | Description | Used by |
157| -------------- | ------ | ----------- | ------- |
158| aggregated_metric_name| e.g. "cpu.total_logical_cores_agg" | Name of the aggregated metric | [set_aggregated_metric_name](#set_aggregated_metric_name) |
159| aggregation_period | "hourly", "minutely" or "secondly" | Period over which to aggregate data. | [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate), [set_aggregated_period](#set_aggregated_period), [rollup_quantity](#rollup_quantity) |
160| aggregation_group_by_list | e.g. "project_id", "hostname" | Group `record_store` data with these columns | [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate) |
161| usage_fetch_operation | e.g "sum" | After the data is grouped by `aggregation_group_by_list`, perform this operation to find the aggregated metric value | [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate) |
162| filter_by_list | Filter regex | Filter data using regex on a `record_store` column value| [fetch_quantity](#fetch_quantity), [fetch_quantity_util](#fetch_quantity_util), [calculate_rate](#calculate_rate) |
163| setter_rollup_group_by_list | e.g. "project_id" | Group by these set of fields | [rollup_quantity](#rollup_quantity) |
164| setter_rollup_operation | e.g. "avg" | After data is grouped by `setter_rollup_group_by_list`, perform this operation to find the aggregated metric value | [rollup_quantity](#rollup_quantity) |
165| dimension_list | e.g. "aggregation_period", "host", "project_id" | List of fields which specify dimensions in aggregated metric | [insert_data](#insert_data), [insert_data_pre_hourly](#insert_data_pre_hourly)|
166| pre_hourly_group_by_list | e.g. "default" | List of `instance usage data` fields to do a group by operation to aggregate data | [pre_hourly_processor](#pre_hourly_processor) |
167| pre_hourly_operation | e.g. "avg" | When aggregating data published to `metrics_pre_hourly` every hour, perform this operation to find hourly aggregated metric value | [pre_hourly_processor](#pre_hourly_processor) |
168
169### metric_group and metric_id ###
170
171Specifies a metric or list of metrics from the record store data, which will be processed by this
172*transform_spec*. Note: This can be a single metric or a group of metrics that will be combined to
173produce the final aggregated metric.
174
175*List of fields*
176
177| field name | values | description |
178| ---------- | ------ | ----------- |
179| metric_group | unique transform spec group identifier | group identifier for this transform spec e.g. "cpu_total_all" |
180| metric_id | unique transform spec identifier | identifier for this transform spec e.g. "cpu_total_all" |
181
182**Note:** "metric_id" is a misnomer; it is not really a metric group or metric identifier but rather an
183identifier for the transformation spec. This will be changed to "transform_spec_id" in the future.
184
185## Generic Aggregation Components ##
186
187*List of Generic Aggregation Components*
188
189### Usage Components ###
190
191All usage components implement a method
192
193```
194 def usage(transform_context, record_store_df):
195 ..
196 ..
197 return instance_usage_df
198```
199
200#### fetch_quantity ####
201
202This component groups record store records by `aggregation_group_by_list`, sorts within each
203group by the timestamp field, and finds the usage based on `usage_fetch_operation`. Optionally this
204component also takes `filter_by_list` to include or exclude certain records from the usage
205calculation.
206
207*Other parameters*
208
209 * **aggregation_group_by_list**
210
211 List of fields to group by.
212
213 Possible values: any set of fields in record store data.
214
215 Example:
216
217 ```
218 "aggregation_group_by_list": ["tenant_id"]
219 ```
220 * **usage_fetch_operation**
221
222 Operation to be performed on grouped data set.
223
224 *Possible values:* "sum", "max", "min", "avg", "latest", "oldest"
225
226 * **aggregation_period**
227
228 Period to aggregate by.
229
230 *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.
231
232 Example:
233
234 ```
235 "aggregation_period": "hourly"
236 ```
237
238 * **filter_by_list**
239
240 Filter (include or exclude) record store data as specified.
241
242 Example:
243
244 ```
245    "filter_by_list": [{"field_to_filter": "hostname",
246 "filter_expression": "comp-(\d)+",
247 "filter_operation": "include"}]
248 ```
249
250 OR
251
252 ```
253    "filter_by_list": [{"field_to_filter": "hostname",
254 "filter_expression": "controller-(\d)+",
255 "filter_operation": "exclude"}]
256 ```
257
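Conceptually, the grouping and usage operation performed by this component resembles the following
simplified PySpark sketch (`record_store_df` and the chosen columns are illustrative; the actual
component also handles sorting by timestamp, aggregation periods and filtering):

```
from pyspark.sql import functions as F

# Group record store rows and take the maximum event_quantity per group,
# similar to "aggregation_group_by_list": ["host"] with
# "usage_fetch_operation": "max".
usage_df = (record_store_df
            .groupBy('host')
            .agg(F.max('event_quantity').alias('quantity')))
```
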
258#### fetch_quantity_util ####
259
260This component finds the utilized quantity based on *total_quantity* and *idle_perc* using the
261following calculation
262
263```
264utilized_quantity = (100 - idle_perc) * total_quantity / 100
265```
266
267where,
268
269 * **total_quantity** data, identified by `usage_fetch_util_quantity_event_type` parameter and
270
271 * **idle_perc** data, identified by `usage_fetch_util_idle_perc_event_type` parameter
272
273This component initially groups record store records by `aggregation_group_by_list` and
274`event_type`, sorts within each group by the timestamp field, and calculates the `total_quantity` and
275`idle_perc` values based on `usage_fetch_operation`. `utilized_quantity` is then calculated
276using the formula given above.
277
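For example, with hypothetical input values:

```
# 16 logical cores in total and 75% idle leaves 4 utilized cores.
total_quantity = 16.0   # e.g. from cpu.total_logical_cores
idle_perc = 75.0        # e.g. from cpu.idle_perc
utilized_quantity = (100 - idle_perc) * total_quantity / 100
print(utilized_quantity)  # 4.0
```
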
278*Other parameters*
279
280 * **aggregation_group_by_list**
281
282 List of fields to group by.
283
284 Possible values: any set of fields in record store data.
285
286 Example:
287
288 ```
289 "aggregation_group_by_list": ["tenant_id"]
290 ```
291 * **usage_fetch_operation**
292
293 Operation to be performed on grouped data set
294
295 *Possible values:* "sum", "max", "min", "avg", "latest", "oldest"
296
297 * **aggregation_period**
298
299 Period to aggregate by.
300
301 *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.
302
303 Example:
304
305 ```
306 "aggregation_period": "hourly"
307 ```
308
309 * **filter_by_list**
310
311 Filter (include or exclude) record store data as specified
312
313 Example:
314
315 ```
316    "filter_by_list": [{"field_to_filter": "hostname",
317 "filter_expression": "comp-(\d)+",
318 "filter_operation": "include"}]
319 ```
320
321 OR
322
323 ```
324    "filter_by_list": [{"field_to_filter": "hostname",
325 "filter_expression": "controller-(\d)+",
326 "filter_operation": "exclude"}]
327 ```
328
329 * **usage_fetch_util_quantity_event_type**
330
331 event type (metric name) to identify data which will be used to calculate `total_quantity`
332
333 *Possible values:* metric name
334
335 Example:
336
337 ```
338 "usage_fetch_util_quantity_event_type": "cpu.total_logical_cores"
339 ```
340
341
342 * **usage_fetch_util_idle_perc_event_type**
343
344   event type (metric name) to identify data which will be used to calculate `idle_perc`
345
346 *Possible values:* metric name
347
348 Example:
349
350 ```
351 "usage_fetch_util_idle_perc_event_type": "cpu.idle_perc"
352 ```
353
354#### calculate_rate ####
355
356This component finds the rate of change of quantity (in percent) over a time period using the
357following calculation
358
359```
360rate_of_change (in percent) = ((oldest_quantity - latest_quantity)/oldest_quantity) * 100
361```
362
363where,
364
365 * **oldest_quantity**: oldest (or earliest) `average` quantity if there are multiple quantities in a
366 group for a given time period.
367
368 * **latest_quantity**: latest `average` quantity if there are multiple quantities in a group
369 for a given time period
370
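For example, with hypothetical oldest and latest average quantities:

```
# Quantity dropping from 100.0 to 90.0 over the period gives a 10% rate of change.
oldest_quantity = 100.0
latest_quantity = 90.0
rate_of_change = ((oldest_quantity - latest_quantity) / oldest_quantity) * 100
print(rate_of_change)  # 10.0
```
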
371*Other parameters*
372
373 * **aggregation_group_by_list**
374
375 List of fields to group by.
376
377 Possible values: any set of fields in record store data.
378
379 Example:
380
381 ```
382 "aggregation_group_by_list": ["tenant_id"]
383 ```
384 * **usage_fetch_operation**
385
386 Operation to be performed on grouped data set
387
388 *Possible values:* "sum", "max", "min", "avg", "latest", "oldest"
389
390 * **aggregation_period**
391
392 Period to aggregate by.
393
394 *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.
395
396 Example:
397
398 ```
399 "aggregation_period": "hourly"
400 ```
401
402 * **filter_by_list**
403
404 Filter (include or exclude) record store data as specified
405
406 Example:
407
408 ```
409    "filter_by_list": [{"field_to_filter": "hostname",
410 "filter_expression": "comp-(\d)+",
411 "filter_operation": "include"}]
412 ```
413
414 OR
415
416 ```
417    "filter_by_list": [{"field_to_filter": "hostname",
418 "filter_expression": "controller-(\d)+",
419 "filter_operation": "exclude"}]
420 ```
421
422
423### Setter Components ###
424
425All setter components implement a method
426
427```
428 def setter(transform_context, instance_usage_df):
429 ..
430 ..
431 return instance_usage_df
432```
433
434#### set_aggregated_metric_name ####
435
436This component sets final aggregated metric name by setting `aggregated_metric_name` field in
437`instance_usage` data.
438
439*Other parameters*
440
441 * **aggregated_metric_name**
442
443   Name of the aggregated metric being generated.
444
445 *Possible values:* any aggregated metric name. Convention is to end the metric name
446 with "_agg".
447
448 Example:
449 ```
450 "aggregated_metric_name":"cpu.total_logical_cores_agg"
451 ```
452
453#### set_aggregated_period ####
454
455This component sets the final aggregated metric period by setting the `aggregation_period` field in
456`instance_usage` data.
457
458*Other parameters*
459
460 * **aggregation_period**
461
462   Period over which the metric is aggregated.
463
464 *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.
465
466 Example:
467 ```
468 "aggregation_period": "hourly"
469 ```
470
471**Note:** If you are publishing metrics to the *metrics_pre_hourly* Kafka topic using the
472`insert_data_pre_hourly` component (see the *insert_data_pre_hourly* component below),
473`aggregation_period` will have to be set to `hourly`, since by default all data in the
474*metrics_pre_hourly* topic gets aggregated every hour by the `Pre Hourly Processor` (see the
475`Processors` section below).
476
477#### rollup_quantity ####
478
479This component groups `instance_usage` records by `setter_rollup_group_by_list`, sorts within each
480group by the timestamp field, and finds the usage based on `setter_rollup_operation`.
481
482*Other parameters*
483
484 * **setter_rollup_group_by_list**
485
486 List of fields to group by.
487
488 Possible values: any set of fields in record store data.
489
490 Example:
491 ```
492 "setter_rollup_group_by_list": ["tenant_id"]
493 ```
494 * **setter_rollup_operation**
495
496 Operation to be performed on grouped data set
497
498 *Possible values:* "sum", "max", "min", "avg"
499
500 Example:
501 ```
502 "setter_fetch_operation": "avg"
503 ```
504
505 * **aggregation_period**
506
507 Period to aggregate by.
508
509 *Possible values:* 'daily', 'hourly', 'minutely', 'secondly'.
510
511 Example:
512
513 ```
514 "aggregation_period": "hourly"
515 ```
516
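Conceptually, the rollup resembles the following simplified PySpark sketch (`instance_usage_df`
and the chosen columns are illustrative; the actual component also handles aggregation periods and
timestamps):

```
from pyspark.sql import functions as F

# Roll up instance usage rows per tenant and sum the quantity, similar to
# "setter_rollup_group_by_list": ["tenant_id"] with
# "setter_rollup_operation": "sum".
rollup_df = (instance_usage_df
             .groupBy('tenant_id')
             .agg(F.sum('quantity').alias('quantity'),
                  F.sum('record_count').alias('record_count')))
```
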
517### Insert Components ###
518
519All insert components implement a method
520
521```
522 def insert(transform_context, instance_usage_df):
523 ..
524 ..
525 return instance_usage_df
526```
527
528#### insert_data ####
529
530This component converts `instance_usage` data into monasca metric format and writes the metric to
531`metrics` topic in kafka.
532
533*Other parameters*
534
535 * **dimension_list**
536
537 List of fields in `instance_usage` data that should be converted to monasca metric dimensions.
538
539 *Possible values:* any fields in `instance_usage` data
540
541 Example:
542 ```
543 "dimension_list":["aggregation_period","host","project_id"]
544 ```
545
546#### insert_data_pre_hourly ####
547
548This component converts `instance_usage` data into monasca metric format and writes the metric to
549`metrics_pre_hourly` topic in kafka.
550
551*Other parameters*
552
553 * **dimension_list**
554
555 List of fields in `instance_usage` data that should be converted to monasca metric dimensions.
556
557 *Possible values:* any fields in `instance_usage` data
558
559 Example:
560 ```
561 "dimension_list":["aggregation_period","host","project_id"]
562 ```
563
564## Processors ##
565
566Processors are special components that process data from a Kafka topic at a desired time
567interval. These are different from generic aggregation components since they process data from a
568specific Kafka topic.
569
570All processor components implement the following methods
571
572```
573def get_app_name(self):
574 [...]
575 return app_name
576
577def is_time_to_run(self, current_time):
578 if current_time > last_invoked + 1:
579 return True
580 else:
581 return False
582
583def run_processor(self, time):
584 # do work...
585```
586
587### pre_hourly_processor ###
588
589The Pre Hourly Processor runs every hour and aggregates `instance_usage` data published to the
590`metrics_pre_hourly` topic.
591
592The Pre Hourly Processor is by default set to run 10 minutes after the top of the hour and processes
593data from the previous hour. `instance_usage` data is grouped by `pre_hourly_group_by_list`.
594
595*Other parameters*
596
597 * **pre_hourly_group_by_list**
598
599 List of fields to group by.
600
601   Possible values: any set of fields in `instance_usage` data, or `default`
602
603 Note: setting to `default` will group `instance_usage` data by `tenant_id`, `user_id`,
604 `resource_uuid`, `geolocation`, `region`, `zone`, `host`, `project_id`,
605 `aggregated_metric_name`, `aggregation_period`
606
607 Example:
608 ```
609 "pre_hourly_group_by_list": ["tenant_id"]
610 ```
611
612 OR
613
614 ```
615 "pre_hourly_group_by_list": ["default"]
616 ```
617
618 * **pre_hourly_operation**
619
620 Operation to be performed on grouped data set.
621
622 *Possible values:* "sum", "max", "min", "avg", "rate"
623
624 Example:
625
626 ```
627 "pre_hourly_operation": "avg"
628 ```
629
630# Putting it all together
631Please refer to the [Create a new aggregation pipeline](create-new-aggregation-pipeline.md) document to
632create a new aggregation pipeline.