diff --git a/monasca_transform/driver/mon_metrics_kafka.py b/monasca_transform/driver/mon_metrics_kafka.py index 06e8eee..7aa4114 100644 --- a/monasca_transform/driver/mon_metrics_kafka.py +++ b/monasca_transform/driver/mon_metrics_kafka.py @@ -25,7 +25,6 @@ from pyspark.sql.functions import lit from pyspark.sql.functions import when from pyspark.sql import SQLContext -import json import logging from monasca_common.simport import simport from oslo_config import cfg @@ -148,18 +147,6 @@ class MonMetricsKafkaProcessor(object): '.log')) rdd.saveAsTextFile(file_name) - @staticmethod - def print_unique_metric_count(kvs): - # print unique metric count - lines = kvs.map(lambda x: x[1]) - counts = lines.map( - lambda x: json.loads(x)["metric"]["name"] - ).map( - lambda name: (name, 1) - ).reduceByKey( - lambda a, b: a + b) - counts.pprint(9999) - @staticmethod def save_kafka_offsets(current_offsets, app_name, batch_time_info): @@ -564,9 +551,6 @@ def invoke(): # transform to recordstore MonMetricsKafkaProcessor.transform_to_recordstore(kafka_stream) - # print unique metric count - MonMetricsKafkaProcessor.print_unique_metric_count(kafka_stream) - # catch interrupt, stop streaming context gracefully # signal.signal(signal.SIGINT, signal_handler)