Remove unique metric count aggregation

Remove the unique metric count aggregation, since it causes the data
from Kafka to be pulled and processed twice and leads to an
unnecessary increase in the overall time required to process a batch.
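
For background: in Spark Streaming, every output operation on a DStream
schedules its own job, so a second consumer of an uncached direct Kafka
stream re-reads the batch from Kafka. The sketch below is illustrative
only; it reuses the names from the diff and shows cache() purely as a
hypothetical alternative, not something this change introduces:

    # Before this change, invoke() ran two output operations on the same
    # uncached Kafka DStream, so each batch was fetched and parsed twice:
    MonMetricsKafkaProcessor.transform_to_recordstore(kafka_stream)   # read 1
    MonMetricsKafkaProcessor.print_unique_metric_count(kafka_stream)  # read 2

    # Hypothetical alternative (not part of this commit): persist each
    # batch once so both consumers would share a single read from Kafka.
    # kafka_stream.cache()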

Change-Id: I2046f95709232979dfd590d5293c803cac05bbb2
Ashwin Agate 2016-11-17 20:04:49 +00:00 committed by David C Kennedy
parent d76af6a0f0
commit c3fcd61f93
1 changed file with 0 additions and 16 deletions


@@ -25,7 +25,6 @@ from pyspark.sql.functions import lit
 from pyspark.sql.functions import when
 from pyspark.sql import SQLContext
-import json
 import logging
 from monasca_common.simport import simport
 from oslo_config import cfg
@@ -148,18 +147,6 @@ class MonMetricsKafkaProcessor(object):
                 '.log'))
             rdd.saveAsTextFile(file_name)

-    @staticmethod
-    def print_unique_metric_count(kvs):
-        # print unique metric count
-        lines = kvs.map(lambda x: x[1])
-        counts = lines.map(
-            lambda x: json.loads(x)["metric"]["name"]
-        ).map(
-            lambda name: (name, 1)
-        ).reduceByKey(
-            lambda a, b: a + b)
-        counts.pprint(9999)
-
     @staticmethod
     def save_kafka_offsets(current_offsets, app_name,
                            batch_time_info):
@@ -564,9 +551,6 @@ def invoke():
     # transform to recordstore
     MonMetricsKafkaProcessor.transform_to_recordstore(kafka_stream)

-    # print unique metric count
-    MonMetricsKafkaProcessor.print_unique_metric_count(kafka_stream)
-
     # catch interrupt, stop streaming context gracefully
     # signal.signal(signal.SIGINT, signal_handler)