diff --git a/monasca_transform/component/__init__.py b/monasca_transform/component/__init__.py index f084cc4..6ace350 100644 --- a/monasca_transform/component/__init__.py +++ b/monasca_transform/component/__init__.py @@ -31,8 +31,7 @@ InstanceUsageDataAggParamsBase = namedtuple('InstanceUsageDataAggParams', class InstanceUsageDataAggParams(InstanceUsageDataAggParamsBase): - """A tuple which is a wrapper containing the instance usage data - and aggregation params + """A tuple which is a wrapper containing the instance usage data and aggregation params namdetuple contains: diff --git a/monasca_transform/component/insert/__init__.py b/monasca_transform/component/insert/__init__.py index 2692df5..e18db5f 100644 --- a/monasca_transform/component/insert/__init__.py +++ b/monasca_transform/component/insert/__init__.py @@ -41,8 +41,7 @@ class InsertComponent(Component): @staticmethod def _validate_metric(metric): - """validate monasca metric. - """ + """validate monasca metric.""" try: # validate metric part, without the wrapper metric_validator.validate(metric["metric"]) @@ -124,9 +123,7 @@ class InsertComponent(Component): @staticmethod def _get_metric(row, agg_params): - """write data to kafka. extracts and formats - metric data and write s the data to kafka - """ + """write data to kafka. extracts and formats metric data and writes the data to kafka""" instance_usage_dict = {"tenant_id": row.tenant_id, "user_id": row.user_id, "resource_uuid": row.resource_uuid, @@ -171,9 +168,7 @@ class InsertComponent(Component): @staticmethod def _get_instance_usage_pre_hourly(row, metric_id): - """write data to kafka. extracts and formats - metric data and writes the data to kafka - """ + """write data to kafka. extracts and formats metric data and writes the data to kafka""" # retrieve the processing meta from the row processing_meta = row.processing_meta # add transform spec metric id to the processing meta @@ -228,9 +223,7 @@ class InsertComponent(Component): @staticmethod def _write_metrics_from_partition(partlistiter): - """iterate through all rdd elements in partition - and write metrics to kafka - """ + """iterate through all rdd elements in partition and write metrics to kafka""" for part in partlistiter: agg_params = part.agg_params row = part.instance_usage_data diff --git a/monasca_transform/component/insert/kafka_insert.py b/monasca_transform/component/insert/kafka_insert.py index 819c1f5..2cf4ad0 100644 --- a/monasca_transform/component/insert/kafka_insert.py +++ b/monasca_transform/component/insert/kafka_insert.py @@ -18,9 +18,7 @@ from monasca_transform.messaging.adapter import KafkaMessageAdapter class KafkaInsert(InsertComponent): - """Insert component that writes instance usage data - to kafka queue - """ + """Insert component that writes instance usage data to kafka queue""" @staticmethod def insert(transform_context, instance_usage_df): diff --git a/monasca_transform/component/insert/kafka_insert_pre_hourly.py b/monasca_transform/component/insert/kafka_insert_pre_hourly.py index ab59441..83980eb 100644 --- a/monasca_transform/component/insert/kafka_insert_pre_hourly.py +++ b/monasca_transform/component/insert/kafka_insert_pre_hourly.py @@ -18,9 +18,7 @@ from monasca_transform.messaging.adapter import KafkaMessageAdapterPreHourly class KafkaInsertPreHourly(InsertComponent): - """Insert component that writes instance usage data - to kafka queue - """ + """Insert component that writes instance usage data to kafka queue""" @staticmethod def insert(transform_context, instance_usage_df): diff
--git a/monasca_transform/component/insert/prepare_data.py b/monasca_transform/component/insert/prepare_data.py index d463717..574ae81 100644 --- a/monasca_transform/component/insert/prepare_data.py +++ b/monasca_transform/component/insert/prepare_data.py @@ -16,9 +16,7 @@ from monasca_transform.component.insert import InsertComponent class PrepareData(InsertComponent): - """prepare for insert component validates instance usage - data before calling Insert component - """ + """prepare for insert component validates instance usage data before calling Insert component""" @staticmethod def insert(transform_context, instance_usage_df): """write instance usage data to kafka""" diff --git a/monasca_transform/component/setter/pre_hourly_calculate_rate.py b/monasca_transform/component/setter/pre_hourly_calculate_rate.py index 45b6943..c6cd8c1 100644 --- a/monasca_transform/component/setter/pre_hourly_calculate_rate.py +++ b/monasca_transform/component/setter/pre_hourly_calculate_rate.py @@ -24,6 +24,7 @@ import json class PreHourlyCalculateRateException(Exception): """Exception thrown when doing pre-hourly rate calculations + Attributes: value: string representing the error """ diff --git a/monasca_transform/component/setter/rollup_quantity.py b/monasca_transform/component/setter/rollup_quantity.py index 51b03f8..03f7cda 100644 --- a/monasca_transform/component/setter/rollup_quantity.py +++ b/monasca_transform/component/setter/rollup_quantity.py @@ -26,6 +26,7 @@ import json class RollupQuantityException(Exception): """Exception thrown when doing quantity rollup + Attributes: value: string representing the error """ diff --git a/monasca_transform/component/setter/set_aggregated_metric_name.py b/monasca_transform/component/setter/set_aggregated_metric_name.py index 14c4337..3268958 100644 --- a/monasca_transform/component/setter/set_aggregated_metric_name.py +++ b/monasca_transform/component/setter/set_aggregated_metric_name.py @@ -23,6 +23,7 @@ import json class SetAggregatedMetricName(SetterComponent): """setter component that sets final aggregated metric name. + aggregated metric name is available as a parameter 'aggregated_metric_name' in aggregation_params in metric processing driver table. """ @@ -79,9 +80,7 @@ class SetAggregatedMetricName(SetterComponent): @staticmethod def setter(transform_context, instance_usage_df): - """set the aggregated metric name field for elements in instance usage - rdd - """ + """set the aggregated metric name field for elements in instance usage rdd""" transform_spec_df = transform_context.transform_spec_df_info diff --git a/monasca_transform/component/setter/set_aggregated_period.py b/monasca_transform/component/setter/set_aggregated_period.py index fdcdeef..9ee7d8a 100644 --- a/monasca_transform/component/setter/set_aggregated_period.py +++ b/monasca_transform/component/setter/set_aggregated_period.py @@ -23,6 +23,7 @@ import json class SetAggregatedPeriod(SetterComponent): """setter component that sets final aggregated metric name. + aggregated metric name is available as a parameter 'aggregated_metric_name' in aggregation_params in metric processing driver table. 
""" @@ -80,9 +81,7 @@ class SetAggregatedPeriod(SetterComponent): @staticmethod def setter(transform_context, instance_usage_df): - """set the aggregated metric name field for elements in instance usage - rdd - """ + """set the aggregated metric name field for elements in instance usage rdd""" transform_spec_df = transform_context.transform_spec_df_info diff --git a/monasca_transform/component/usage/calculate_rate.py b/monasca_transform/component/usage/calculate_rate.py index 9a03ea8..e780374 100644 --- a/monasca_transform/component/usage/calculate_rate.py +++ b/monasca_transform/component/usage/calculate_rate.py @@ -25,6 +25,7 @@ import json class CalculateRateException(Exception): """Exception thrown when calculating rate + Attributes: value: string representing the error """ @@ -40,7 +41,9 @@ class CalculateRate(UsageComponent): @staticmethod def usage(transform_context, record_store_df): - """component which groups together record store records by + """Method to return instance usage dataframe: + + It groups together record store records by provided group by columns list,sorts within the group by event timestamp field, calculates the rate of change between the oldest and latest values, and returns the resultant value as an diff --git a/monasca_transform/component/usage/fetch_quantity.py b/monasca_transform/component/usage/fetch_quantity.py index 2622268..7e099bc 100644 --- a/monasca_transform/component/usage/fetch_quantity.py +++ b/monasca_transform/component/usage/fetch_quantity.py @@ -33,6 +33,7 @@ import json class FetchQuantityException(Exception): """Exception thrown when fetching quantity + Attributes: value: string representing the error """ @@ -50,8 +51,7 @@ GroupedDataWithOperation = namedtuple("GroupedDataWithOperation", class GroupedDataWithOperation(GroupedDataWithOperation): - """A tuple which is a wrapper containing record store data - and the usage operation + """A tuple which is a wrapper containing record store data and the usage operation namdetuple contains: @@ -76,8 +76,10 @@ class FetchQuantity(UsageComponent): @staticmethod def _get_latest_oldest_quantity(grouping_results_with_operation): - """get quantity for each group by performing the requested - usage operation and return a instance usage data. + """Method to return an instance usage data + + Get quantity for each group by performing the requested + usage operation and return an instance usage data. 
""" # row @@ -337,10 +339,12 @@ class FetchQuantity(UsageComponent): @staticmethod def usage(transform_context, record_store_df): - """component which groups together record store records by + """Method to return the latest quantity as an instance usage dataframe: + + It groups together record store records by provided group by columns list , sorts within the group by event timestamp field, applies group stats udf and returns the latest - quantity as a instance usage dataframe + quantity as an instance usage dataframe """ transform_spec_df = transform_context.transform_spec_df_info @@ -358,10 +362,12 @@ class FetchQuantity(UsageComponent): @staticmethod def usage_by_operation(transform_context, record_store_df, usage_fetch_operation): - """component which groups together record store records by + """Returns the latest quantity as a instance usage dataframe + + It groups together record store records by provided group by columns list , sorts within the group by event timestamp field, applies group stats udf and returns the latest - quantity as a instance usage dataframe + quantity as an instance usage dataframe """ transform_spec_df = transform_context.transform_spec_df_info diff --git a/monasca_transform/component/usage/fetch_quantity_util.py b/monasca_transform/component/usage/fetch_quantity_util.py index 624eb9d..369be93 100644 --- a/monasca_transform/component/usage/fetch_quantity_util.py +++ b/monasca_transform/component/usage/fetch_quantity_util.py @@ -28,6 +28,7 @@ import json class FetchQuantityUtilException(Exception): """Exception thrown when fetching quantity + Attributes: value: string representing the error """ @@ -62,8 +63,9 @@ class FetchQuantityUtil(UsageComponent): @staticmethod def _format_quantity_util(row): - """calculate the utilized quantity based on idle percentage - quantity and convert to instance usage format + """Converts calculated utilized quantity to an instance usage format + + Calculation based on idle percentage """ # tenant_id = getattr(row, "tenant_id", "all") @@ -141,7 +143,9 @@ class FetchQuantityUtil(UsageComponent): @staticmethod def usage(transform_context, record_store_df): - """component which groups together record store records by + """Method to return instance usage dataframe: + + It groups together record store records by provided group by columns list, sorts within the group by event timestamp field, applies group stats udf and returns the latest quantity as a instance usage dataframe diff --git a/monasca_transform/driver/mon_metrics_kafka.py b/monasca_transform/driver/mon_metrics_kafka.py index e2a0724..66edbd0 100644 --- a/monasca_transform/driver/mon_metrics_kafka.py +++ b/monasca_transform/driver/mon_metrics_kafka.py @@ -221,6 +221,7 @@ class MonMetricsKafkaProcessor(object): @staticmethod def process_metric(transform_context, record_store_df): """process (aggregate) metric data from record_store data + All the parameters to drive processing should be available in transform_spec_df dataframe. """ @@ -231,8 +232,7 @@ class MonMetricsKafkaProcessor(object): @staticmethod def process_metrics(transform_context, record_store_df): - """start processing (aggregating) metrics - """ + """start processing (aggregating) metrics""" # # look in record_store_df for list of metrics to be processed # @@ -536,6 +536,7 @@ class MonMetricsKafkaProcessor(object): @staticmethod def transform_to_recordstore(kvs): """Transform metrics data from kafka to record store format. 
+ extracts, validates, filters, generates data from kakfa to only keep data that has to be aggregated. Generate data generates multiple records for for the same incoming metric if the metric has multiple diff --git a/monasca_transform/offset_specs.py b/monasca_transform/offset_specs.py index 214492e..0da0a12 100644 --- a/monasca_transform/offset_specs.py +++ b/monasca_transform/offset_specs.py @@ -60,6 +60,7 @@ class OffsetSpec(object): @six.add_metaclass(abc.ABCMeta) class OffsetSpecs(object): """Class representing offset specs to help recover. + From where processing should pick up in case of failure """ diff --git a/monasca_transform/processor/__init__.py b/monasca_transform/processor/__init__.py index f1b7ee6..1c99340 100644 --- a/monasca_transform/processor/__init__.py +++ b/monasca_transform/processor/__init__.py @@ -19,9 +19,7 @@ class Processor(object): @abc.abstractmethod def get_app_name(self): - """get name of this application. Will be used to - store offsets in database - """ + """get name of this application. Will be used to store offsets in database""" raise NotImplementedError( "Class %s doesn't implement get_app_name()" % self.__class__.__name__) diff --git a/monasca_transform/processor/pre_hourly_processor.py b/monasca_transform/processor/pre_hourly_processor.py index b01b8e2..7877751 100644 --- a/monasca_transform/processor/pre_hourly_processor.py +++ b/monasca_transform/processor/pre_hourly_processor.py @@ -59,7 +59,9 @@ class PreHourlyProcessorDataProvider(ProcessUtilDataProvider): class PreHourlyProcessor(Processor): - """Processor to process usage data published to metrics_pre_hourly topic a + """Publish metrics in kafka + + Processor to process usage data published to metrics_pre_hourly topic a and publish final rolled up metrics to metrics topic in kafka. """ @@ -95,9 +97,7 @@ class PreHourlyProcessor(Processor): @staticmethod def get_app_name(): - """get name of this application. Will be used to - store offsets in database - """ + """get name of this application. Will be used to store offsets in database""" return "mon_metrics_kafka_pre_hourly" @staticmethod @@ -113,9 +113,7 @@ class PreHourlyProcessor(Processor): def _get_offsets_from_kafka(brokers, topic, offset_time): - """get dict representing kafka - offsets. - """ + """get dict representing kafka offsets.""" # get client client = KafkaClient(brokers) @@ -144,9 +142,7 @@ class PreHourlyProcessor(Processor): @staticmethod def _parse_saved_offsets(app_name, topic, saved_offset_spec): - """get dict representing saved - offsets. - """ + """get dict representing saved offsets.""" offset_dict = {} for key, value in saved_offset_spec.items(): if key.startswith("%s_%s" % (app_name, topic)): @@ -197,8 +193,7 @@ class PreHourlyProcessor(Processor): topic, app_name, saved_offset_spec): - """get offset range from saved offset to latest. - """ + """get offset range from saved offset to latest.""" offset_range_list = [] # https://cwiki.apache.org/confluence/display/KAFKA/ @@ -243,8 +238,9 @@ class PreHourlyProcessor(Processor): @staticmethod def get_processing_offset_range_list(processing_time): - """get offset range to fetch data from. The - range will last from the last saved offsets to current offsets + """Get offset range to fetch data from. + + The range will last from the last saved offsets to current offsets available. If there are no last saved offsets available in the database the starting offsets will be set to the earliest available in kafka. 
@@ -284,13 +280,13 @@ class PreHourlyProcessor(Processor): @staticmethod def get_offset_specs(): - """get offset specifications. - """ + """get offset specifications.""" return simport.load(cfg.CONF.repositories.offsets)() @staticmethod def get_effective_offset_range_list(offset_range_list): - """get effective batch offset range. + """Get effective batch offset range. + Effective batch offset range covers offsets starting from effective batch revision (defined by effective_batch_revision config property). By default this method will set the @@ -432,8 +428,9 @@ class PreHourlyProcessor(Processor): @staticmethod def filter_out_records_not_in_current_batch(instance_usage_df): - """Filter out any records which don't pertain to the - current batch (i.e., records before or after the + """Filter out any records which don't pertain to the current batch + + (i.e., records before or after the batch currently being processed). """ # get the most recent batch time from the stored offsets @@ -476,7 +473,9 @@ class PreHourlyProcessor(Processor): @staticmethod def process_instance_usage(transform_context, instance_usage_df): - """second stage aggregation. Aggregate instance usage rdd + """Second stage aggregation. + + Aggregate instance usage rdd data and write results to metrics topic in kafka. """ @@ -525,8 +524,7 @@ class PreHourlyProcessor(Processor): @staticmethod def do_transform(instance_usage_df): - """start processing (aggregating) metrics - """ + """start processing (aggregating) metrics""" # # look in instance_usage_df for list of metrics to be processed # @@ -571,10 +569,11 @@ class PreHourlyProcessor(Processor): @staticmethod def run_processor(spark_context, processing_time): - """process data in metrics_pre_hourly queue, starting - from the last saved offsets, else start from earliest - offsets available - """ + """Process data in metrics_pre_hourly queue + + Starting from the last saved offsets, else start from earliest + offsets available + """ offset_range_list = ( PreHourlyProcessor.get_processing_offset_range_list( diff --git a/monasca_transform/processor/processor_util.py b/monasca_transform/processor/processor_util.py index 8d35cf6..58628c1 100644 --- a/monasca_transform/processor/processor_util.py +++ b/monasca_transform/processor/processor_util.py @@ -40,6 +40,7 @@ class PreHourlyProcessorUtil(object): @staticmethod def is_time_to_run(check_date_time): """return True if its time to run this processor. + It is time to run the processor if: The processor has no previous recorded run time. It is more than the configured 'late_metric_slack_time' (to allow diff --git a/monasca_transform/service/transform_service.py b/monasca_transform/service/transform_service.py index b219eea..02d0491 100644 --- a/monasca_transform/service/transform_service.py +++ b/monasca_transform/service/transform_service.py @@ -43,6 +43,7 @@ def main(): def shutdown_all_threads_and_die(): """Shut down all threads and exit process. + Hit it with a hammer to kill all threads and die. """ LOG = log.getLogger(__name__) @@ -51,9 +52,7 @@ def shutdown_all_threads_and_die(): def get_process(proc_name): - """Get process given string in - process cmd line. - """ + """Get process given string in process cmd line.""" LOG = log.getLogger(__name__) proc = None try: @@ -91,8 +90,7 @@ def stop_spark_submit_process(): class Transform(os_service.Service): - """Class used with Openstack service. 
- """ + """Class used with Openstack service.""" LOG = log.getLogger(__name__) @@ -143,9 +141,7 @@ class TransformService(threading.Thread): CONF.service.election_polling_frequency)) def check_if_still_leader(self): - """Return true if the this host is the - leader - """ + """Return true if the this host is the leader""" leader = None try: leader = self.coordinator.get_leader(self.group).get() @@ -289,8 +285,7 @@ class TransformService(threading.Thread): def main_service(): - """Method to use with Openstack service. - """ + """Method to use with Openstack service.""" ConfigInitializer.basic_config() LogUtils.init_logger(__name__) launcher = os_service.ServiceLauncher(cfg.CONF) diff --git a/monasca_transform/transform/__init__.py b/monasca_transform/transform/__init__.py index afb0480..a41cb4e 100644 --- a/monasca_transform/transform/__init__.py +++ b/monasca_transform/transform/__init__.py @@ -23,9 +23,7 @@ TransformContextBase = namedtuple("TransformContext", class TransformContext(TransformContextBase): - """A tuple which contains all the configuration information - to drive processing - + """A tuple which contains all the configuration information to drive processing namedtuple contains: diff --git a/monasca_transform/transform/builder/generic_transform_builder.py b/monasca_transform/transform/builder/generic_transform_builder.py index d628928..9555d81 100644 --- a/monasca_transform/transform/builder/generic_transform_builder.py +++ b/monasca_transform/transform/builder/generic_transform_builder.py @@ -16,9 +16,10 @@ from monasca_transform.log_utils import LogUtils from stevedore import extension -class GenericTransformBuilder (object): - """Build transformation pipeline based on - aggregation_pipeline spec in metric processing +class GenericTransformBuilder(object): + """Build transformation pipeline + + Based on aggregation_pipeline spec in metric processing configuration """ @@ -67,9 +68,8 @@ class GenericTransformBuilder (object): @staticmethod def _parse_transform_pipeline(transform_spec_df): - """parse aggregation pipeline from metric - processing configuration - """ + """Parse aggregation pipeline from metric processing configuration""" + # get aggregation pipeline df aggregation_pipeline_df = transform_spec_df\ .select("aggregation_params_map.aggregation_pipeline") @@ -95,8 +95,10 @@ class GenericTransformBuilder (object): @staticmethod def do_transform(transform_context, record_store_df): - """Build a dynamic aggregation pipeline and call components to - process record store dataframe + """Method to return instance usage dataframe + + Build a dynamic aggregation pipeline + and call components to process record store dataframe """ transform_spec_df = transform_context.transform_spec_df_info (source, diff --git a/monasca_transform/transform/grouping/__init__.py b/monasca_transform/transform/grouping/__init__.py index a17e74c..8fb06ae 100644 --- a/monasca_transform/transform/grouping/__init__.py +++ b/monasca_transform/transform/grouping/__init__.py @@ -20,8 +20,7 @@ RecordStoreWithGroupByBase = namedtuple("RecordStoreWithGroupBy", class RecordStoreWithGroupBy(RecordStoreWithGroupByBase): - """A tuple which is a wrapper containing record store data - and the group by columns + """A tuple which is a wrapper containing record store data and the group by columns namdetuple contains: @@ -36,8 +35,7 @@ GroupingResultsBase = namedtuple("GroupingResults", class GroupingResults(GroupingResultsBase): - """A tuple which is a wrapper containing grouping key - and grouped result set + """A 
tuple which is a wrapper containing grouping key and grouped result set namdetuple contains: @@ -52,7 +50,9 @@ class Grouping(object): @staticmethod def _parse_grouping_key(grouping_str): - """parse grouping key which in "^key1=value1^key2=value2..." format + """parse grouping key + + which in "^key1=value1^key2=value2..." format into a dictionary of key value pairs """ group_by_dict = {} diff --git a/monasca_transform/transform/grouping/group_sort_by_timestamp.py b/monasca_transform/transform/grouping/group_sort_by_timestamp.py index 091df4b..31a3a18 100644 --- a/monasca_transform/transform/grouping/group_sort_by_timestamp.py +++ b/monasca_transform/transform/grouping/group_sort_by_timestamp.py @@ -42,7 +42,9 @@ class GroupSortbyTimestamp(Grouping): @staticmethod def _prepare_for_group_by(record_store_with_group_by_rdd): - """creates a new rdd where the first element of each row + """creates a new rdd where: + + the first element of each row contains array of grouping key and event timestamp fields. Grouping key and event timestamp fields are used by partitioning and sorting function to partition the data @@ -100,7 +102,9 @@ class GroupSortbyTimestamp(Grouping): @staticmethod def _get_group_first_last_quantity_udf(grouplistiter): - """Return stats that include first row key, first_event_timestamp, + """Return stats that include: + + first row key, first_event_timestamp, first event quantity, last_event_timestamp and last event quantity """ first_row = None @@ -159,7 +163,9 @@ class GroupSortbyTimestamp(Grouping): def fetch_group_latest_oldest_quantity(record_store_df, transform_spec_df, group_by_columns_list): - """function to group record store data, sort by timestamp within group + """Function to group record store data + + Sort by timestamp within group and get first and last timestamp along with quantity within each group This function uses key-value pair rdd's groupBy function to do group_by diff --git a/monasca_transform/transform/grouping/group_sort_by_timestamp_partition.py b/monasca_transform/transform/grouping/group_sort_by_timestamp_partition.py index 1aaecd5..df3982a 100644 --- a/monasca_transform/transform/grouping/group_sort_by_timestamp_partition.py +++ b/monasca_transform/transform/grouping/group_sort_by_timestamp_partition.py @@ -26,9 +26,11 @@ class GroupSortbyTimestampPartition(Grouping): @staticmethod def _get_group_first_last_quantity_udf(partition_list_iter): - """user defined function to to through a list of partitions. Each - partition contains elements for a group. All the elements are sorted by + """User defined function to go through a list of partitions. + + Each partition contains elements for a group. All the elements are sorted by timestamp. + The stats include first row key, first_event_timestamp, fist event quantity, last_event_timestamp and last event quantity """ @@ -87,8 +89,11 @@ class GroupSortbyTimestampPartition(Grouping): @staticmethod def _prepare_for_group_by(record_store_with_group_by_rdd): - """creates a new rdd where the first element of each row - contains array of grouping key and event timestamp fields. + """Creates a new rdd where: + + The first element of each row contains array of grouping + key and event timestamp fields. 
+ Grouping key and event timestamp fields are used by partitioning and sorting function to partition the data by grouping key and then sort the elements in a group by the @@ -118,7 +123,9 @@ class GroupSortbyTimestampPartition(Grouping): @staticmethod def _get_partition_by_group(group_composite): - """get a hash of the grouping key, which is then used by partitioning + """Get a hash of the grouping key, + + which is then used by partitioning function to get partition where the groups data should end up in. It uses hash % num_partitions to get partition """ @@ -133,8 +140,7 @@ class GroupSortbyTimestampPartition(Grouping): @staticmethod def _sort_by_timestamp(group_composite): - """get timestamp which will be used to sort grouped data - """ + """get timestamp which will be used to sort grouped data""" event_timestamp_string = group_composite[1] return event_timestamp_string @@ -142,9 +148,7 @@ class GroupSortbyTimestampPartition(Grouping): def _group_sort_by_timestamp_partition(record_store_df, group_by_columns_list, num_of_groups): - """component that does a group by and then sorts all - the items within the group by event timestamp. - """ + """It does a group by and then sorts all the items within the group by event timestamp.""" # convert the dataframe rdd to normal rdd and add the group by # column list record_store_with_group_by_rdd = record_store_df.rdd.\ @@ -174,6 +178,7 @@ class GroupSortbyTimestampPartition(Grouping): @staticmethod def _remove_none_filter(row): """remove any rows which have None as grouping key + [GroupingResults(grouping_key="key1", results={})] rows get created when partition does not get any grouped data assigned to it """ @@ -185,22 +190,18 @@ class GroupSortbyTimestampPartition(Grouping): transform_spec_df, group_by_columns_list, num_of_groups): - """function to group record store data, sort by timestamp within group + """Function to group record store data + + Sort by timestamp within group and get first and last timestamp along with quantity within each group - To do group by it uses custom partitioning function which creates a new - partition - for each group and uses RDD's repartitionAndSortWithinPartitions + partition for each group and uses RDD's repartitionAndSortWithinPartitions function to do the grouping and sorting within the group. - This is more scalable than just using RDD's group_by as using this - technique - group is not materialized into a list and stored in memory, but rather - it uses RDD's in built partitioning capability to do the sort - - num_of_groups should be more than expected groups, otherwise the same - partition can get used for two groups which will cause incorrect - results. + technique group is not materialized into a list and stored in memory, but rather + it uses RDD's in built partitioning capability to do the sort num_of_groups should + be more than expected groups, otherwise the same + partition can get used for two groups which will cause incorrect results. 
""" # group and order elements in group using repartition diff --git a/monasca_transform/transform/grouping/group_sortby_timestamp.py b/monasca_transform/transform/grouping/group_sortby_timestamp.py index 36d2b90..305a14e 100644 --- a/monasca_transform/transform/grouping/group_sortby_timestamp.py +++ b/monasca_transform/transform/grouping/group_sortby_timestamp.py @@ -42,7 +42,9 @@ class GroupSortbyTimestamp(Grouping): @staticmethod def _prepare_for_groupby(record_store_with_groupby_rdd): - """creates a new rdd where the first element of each row + """creates a new rdd where: + + the first element of each row contains array of grouping key and event timestamp fields. Grouping key and event timestamp fields are used by partitioning and sorting function to partition the data @@ -99,7 +101,9 @@ class GroupSortbyTimestamp(Grouping): @staticmethod def _get_group_first_last_quantity_udf(grouplistiter): - """Return stats that include first row key, first_event_timestamp, + """Return stats that include: + + first row key, first_event_timestamp, first event quantity, last_event_timestamp and last event quantity """ first_row = None @@ -158,7 +162,9 @@ class GroupSortbyTimestamp(Grouping): def fetch_group_latest_oldest_quantity(record_store_df, transform_spec_df, groupby_columns_list): - """function to group record store data, sort by timestamp within group + """To group record store data + + sort by timestamp within group and get first and last timestamp along with quantity within each group This function uses key-value pair rdd's groupBy function to do groupby diff --git a/monasca_transform/transform/grouping/group_sortby_timestamp_partition.py b/monasca_transform/transform/grouping/group_sortby_timestamp_partition.py index d373181..6a63cc1 100644 --- a/monasca_transform/transform/grouping/group_sortby_timestamp_partition.py +++ b/monasca_transform/transform/grouping/group_sortby_timestamp_partition.py @@ -26,8 +26,9 @@ class GroupSortbyTimestampPartition(Grouping): @staticmethod def _get_group_first_last_quantity_udf(partitionlistiter): - """user defined function to to through a list of partitions. Each - partition contains elements for a group. All the elements are sorted by + """user defined function to to through a list of partitions. + + Each partition contains elements for a group. All the elements are sorted by timestamp. The stats include first row key, first_event_timestamp, fist event quantity, last_event_timestamp and last event quantity @@ -87,7 +88,9 @@ class GroupSortbyTimestampPartition(Grouping): @staticmethod def _prepare_for_groupby(record_store_with_groupby_rdd): - """creates a new rdd where the first element of each row + """creates a new rdd where: + + the first element of each row contains array of grouping key and event timestamp fields. Grouping key and event timestamp fields are used by partitioning and sorting function to partition the data @@ -118,7 +121,9 @@ class GroupSortbyTimestampPartition(Grouping): @staticmethod def _get_partition_by_group(group_composite): - """get a hash of the grouping key, which is then used by partitioning + """get a hash of the grouping key, + + which is then used by partitioning function to get partition where the groups data should end up in. 
It uses hash % num_partitions to get partition """ @@ -133,8 +138,7 @@ class GroupSortbyTimestampPartition(Grouping): @staticmethod def _sortby_timestamp(group_composite): - """get timestamp which will be used to sort grouped data - """ + """get timestamp which will be used to sort grouped data""" event_timestamp_string = group_composite[1] return event_timestamp_string @@ -142,9 +146,7 @@ class GroupSortbyTimestampPartition(Grouping): def _group_sortby_timestamp_partition(record_store_df, groupby_columns_list, num_of_groups): - """component that does a group by and then sorts all - the items within the group by event timestamp. - """ + """It does a group by and then sorts all the items within the group by event timestamp.""" # convert the dataframe rdd to normal rdd and add the group by # column list record_store_with_groupby_rdd = record_store_df.rdd.\ @@ -174,6 +176,7 @@ class GroupSortbyTimestampPartition(Grouping): @staticmethod def _remove_none_filter(row): """remove any rows which have None as grouping key + [GroupingResults(grouping_key="key1", results={})] rows get created when partition does not get any grouped data assigned to it """ @@ -185,7 +188,9 @@ class GroupSortbyTimestampPartition(Grouping): transform_spec_df, groupby_columns_list, num_of_groups): - """function to group record store data, sort by timestamp within group + """Function to group record store data + + Sort by timestamp within group and get first and last timestamp along with quantity within each group To do group by it uses custom partitioning function which creates a new diff --git a/monasca_transform/transform/storage_utils.py b/monasca_transform/transform/storage_utils.py index 2bfcab0..e9cf2ad 100644 --- a/monasca_transform/transform/storage_utils.py +++ b/monasca_transform/transform/storage_utils.py @@ -17,6 +17,7 @@ from pyspark import StorageLevel class InvalidCacheStorageLevelException(Exception): """Exception thrown when an invalid cache storage level is encountered + Attributes: value: string representing the error """ @@ -33,9 +34,7 @@ class StorageUtils(object): @staticmethod def get_storage_level(storage_level_str): - """get pyspark storage level from storage level - string - """ + """get pyspark storage level from storage level string""" if (storage_level_str == "DISK_ONLY"): return StorageLevel.DISK_ONLY elif (storage_level_str == "DISK_ONLY_2"): diff --git a/tests/functional/component/insert/dummy_insert.py b/tests/functional/component/insert/dummy_insert.py index edd68fd..75ec5c8 100644 --- a/tests/functional/component/insert/dummy_insert.py +++ b/tests/functional/component/insert/dummy_insert.py @@ -19,9 +19,7 @@ from tests.functional.messaging.adapter import DummyAdapter class DummyInsert(InsertComponent): - """Insert component that writes metric data to - to kafka queue - """ + """Insert component that writes metric data to kafka queue""" @staticmethod def insert(transform_context, instance_usage_df): diff --git a/tests/functional/component/insert/dummy_insert_pre_hourly.py b/tests/functional/component/insert/dummy_insert_pre_hourly.py index 3f3dd50..72b7c8b 100644 --- a/tests/functional/component/insert/dummy_insert_pre_hourly.py +++ b/tests/functional/component/insert/dummy_insert_pre_hourly.py @@ -19,9 +19,7 @@ from tests.functional.messaging.adapter import DummyAdapter class DummyInsertPreHourly(InsertComponent): - """Insert component that writes metric data to - to kafka queue - """ + """Insert component that writes metric data to kafka queue""" @staticmethod def 
insert(transform_context, instance_usage_df): diff --git a/tests/functional/data_driven_specs/test_data_driven_specs.py b/tests/functional/data_driven_specs/test_data_driven_specs.py index 995f6ce..0058a13 100644 --- a/tests/functional/data_driven_specs/test_data_driven_specs.py +++ b/tests/functional/data_driven_specs/test_data_driven_specs.py @@ -542,6 +542,7 @@ class TestDataDrivenSpecsRepo(SparkContextTest): event_type=None, pre_transform_specs_data_frame=None): """get row for event type + :rtype: Row """ rows = pre_transform_specs_data_frame.filter( diff --git a/tests/functional/json_offset_specs.py b/tests/functional/json_offset_specs.py index 0426177..b7e137c 100644 --- a/tests/functional/json_offset_specs.py +++ b/tests/functional/json_offset_specs.py @@ -53,8 +53,7 @@ class JSONOffsetSpecs(OffsetSpecs): log.info('No kafka offsets found at startup') def _save(self): - """get the specs of last run time of offset - """ + """save the specs of last run time of offset""" log.info("Saving json offsets: %s", self._kafka_offsets) with open(self.kafka_offset_spec_file, 'w') as offset_file: diff --git a/tox.ini b/tox.ini index 5990b01..d1ff48c 100644 --- a/tox.ini +++ b/tox.ini @@ -83,11 +83,7 @@ commands = [flake8] max-complexity = 30 -# TODO: ignored checks should be enabled in the future -# H904 Wrap long lines in parentheses instead of a backslash (DEPRECATED) -# H405 Multiline docstring separated by empty line -# E402 module level import not at top of file FIXME remove this -ignore = H904,H405,E402 +max-line-length = 100 # H106 Don’t put vim configuration in source files # H203 Use assertIs(Not)None to check for None enable-extensions=H106,H203