Remove unique metric count aggregation
Removing unique metric count aggregation since it causes data from Kafka to be pulled and processed twice and leads to an unnecessary increase in the overall time required to process a batch. Change-Id: I2046f95709232979dfd590d5293c803cac05bbb2
This commit is contained in:
parent
d76af6a0f0
commit
c3fcd61f93
|
@ -25,7 +25,6 @@ from pyspark.sql.functions import lit
|
|||
from pyspark.sql.functions import when
|
||||
from pyspark.sql import SQLContext
|
||||
|
||||
import json
|
||||
import logging
|
||||
from monasca_common.simport import simport
|
||||
from oslo_config import cfg
|
||||
|
@ -148,18 +147,6 @@ class MonMetricsKafkaProcessor(object):
|
|||
'.log'))
|
||||
rdd.saveAsTextFile(file_name)
|
||||
|
||||
@staticmethod
|
||||
def print_unique_metric_count(kvs):
|
||||
# print unique metric count
|
||||
lines = kvs.map(lambda x: x[1])
|
||||
counts = lines.map(
|
||||
lambda x: json.loads(x)["metric"]["name"]
|
||||
).map(
|
||||
lambda name: (name, 1)
|
||||
).reduceByKey(
|
||||
lambda a, b: a + b)
|
||||
counts.pprint(9999)
|
||||
|
||||
@staticmethod
|
||||
def save_kafka_offsets(current_offsets, app_name,
|
||||
batch_time_info):
|
||||
|
@ -564,9 +551,6 @@ def invoke():
|
|||
# transform to recordstore
|
||||
MonMetricsKafkaProcessor.transform_to_recordstore(kafka_stream)
|
||||
|
||||
# print unique metric count
|
||||
MonMetricsKafkaProcessor.print_unique_metric_count(kafka_stream)
|
||||
|
||||
# catch interrupt, stop streaming context gracefully
|
||||
# signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
|
|
Loading…
Reference in New Issue