Update pep8 checks

* set the maximum line length to 100
* cleaned up the code for pep8
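
Most of the cleanup collapses wrapped docstring summaries onto a single line now that the limit is 100 characters. A minimal before/after sketch of the pattern (editor's illustration assembled from the hunks below, not a verbatim excerpt):

    # before: summary wrapped over two lines, closing quotes on their own line
    class KafkaInsert(InsertComponent):
        """Insert component that writes instance usage data
        to kafka queue
        """

    # after: single summary line, well under the 100 character limit
    class KafkaInsert(InsertComponent):
        """Insert component that writes instance usage data to kafka queue"""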

Change-Id: Iab260a4e77584aae31c0596f39146dd5092b807a
Signed-off-by: Amir Mofakhar <amofakhar@op5.com>
Amir Mofakhar 2018-04-12 10:54:22 +02:00
parent c8aa020432
commit 37d4f09057
31 changed files with 154 additions and 151 deletions

View File

@ -31,8 +31,7 @@ InstanceUsageDataAggParamsBase = namedtuple('InstanceUsageDataAggParams',
class InstanceUsageDataAggParams(InstanceUsageDataAggParamsBase):
"""A tuple which is a wrapper containing the instance usage data
and aggregation params
"""A tuple which is a wrapper containing the instance usage data and aggregation params
namdetuple contains:

View File

@ -41,8 +41,7 @@ class InsertComponent(Component):
@staticmethod
def _validate_metric(metric):
"""validate monasca metric.
"""
"""validate monasca metric."""
try:
# validate metric part, without the wrapper
metric_validator.validate(metric["metric"])
@ -124,9 +123,7 @@ class InsertComponent(Component):
@staticmethod
def _get_metric(row, agg_params):
"""write data to kafka. extracts and formats
metric data and write s the data to kafka
"""
"""write data to kafka. extracts and formats metric data and write s the data to kafka"""
instance_usage_dict = {"tenant_id": row.tenant_id,
"user_id": row.user_id,
"resource_uuid": row.resource_uuid,
@ -171,9 +168,7 @@ class InsertComponent(Component):
@staticmethod
def _get_instance_usage_pre_hourly(row,
metric_id):
"""write data to kafka. extracts and formats
metric data and writes the data to kafka
"""
"""write data to kafka. extracts and formats metric data and writes the data to kafka"""
# retrieve the processing meta from the row
processing_meta = row.processing_meta
# add transform spec metric id to the processing meta
@ -228,9 +223,7 @@ class InsertComponent(Component):
@staticmethod
def _write_metrics_from_partition(partlistiter):
"""iterate through all rdd elements in partition
and write metrics to kafka
"""
"""iterate through all rdd elements in partition and write metrics to kafka"""
for part in partlistiter:
agg_params = part.agg_params
row = part.instance_usage_data

View File

@ -18,9 +18,7 @@ from monasca_transform.messaging.adapter import KafkaMessageAdapter
class KafkaInsert(InsertComponent):
"""Insert component that writes instance usage data
to kafka queue
"""
"""Insert component that writes instance usage data to kafka queue"""
@staticmethod
def insert(transform_context, instance_usage_df):

View File

@ -18,9 +18,7 @@ from monasca_transform.messaging.adapter import KafkaMessageAdapterPreHourly
class KafkaInsertPreHourly(InsertComponent):
"""Insert component that writes instance usage data
to kafka queue
"""
"""Insert component that writes instance usage data to kafka queue"""
@staticmethod
def insert(transform_context, instance_usage_df):

View File

@ -16,9 +16,7 @@ from monasca_transform.component.insert import InsertComponent
class PrepareData(InsertComponent):
"""prepare for insert component validates instance usage
data before calling Insert component
"""
"""prepare for insert component validates instance usage data before calling Insert component"""
@staticmethod
def insert(transform_context, instance_usage_df):
"""write instance usage data to kafka"""

View File

@ -24,6 +24,7 @@ import json
class PreHourlyCalculateRateException(Exception):
"""Exception thrown when doing pre-hourly rate calculations
Attributes:
value: string representing the error
"""

View File

@ -26,6 +26,7 @@ import json
class RollupQuantityException(Exception):
"""Exception thrown when doing quantity rollup
Attributes:
value: string representing the error
"""

View File

@ -23,6 +23,7 @@ import json
class SetAggregatedMetricName(SetterComponent):
"""setter component that sets final aggregated metric name.
aggregated metric name is available as a parameter 'aggregated_metric_name'
in aggregation_params in metric processing driver table.
"""
@ -79,9 +80,7 @@ class SetAggregatedMetricName(SetterComponent):
@staticmethod
def setter(transform_context, instance_usage_df):
"""set the aggregated metric name field for elements in instance usage
rdd
"""
"""set the aggregated metric name field for elements in instance usage rdd"""
transform_spec_df = transform_context.transform_spec_df_info

View File

@ -23,6 +23,7 @@ import json
class SetAggregatedPeriod(SetterComponent):
"""setter component that sets final aggregated metric name.
aggregated metric name is available as a parameter 'aggregated_metric_name'
in aggregation_params in metric processing driver table.
"""
@ -80,9 +81,7 @@ class SetAggregatedPeriod(SetterComponent):
@staticmethod
def setter(transform_context, instance_usage_df):
"""set the aggregated metric name field for elements in instance usage
rdd
"""
"""set the aggregated metric name field for elements in instance usage rdd"""
transform_spec_df = transform_context.transform_spec_df_info

View File

@ -25,6 +25,7 @@ import json
class CalculateRateException(Exception):
"""Exception thrown when calculating rate
Attributes:
value: string representing the error
"""
@ -40,7 +41,9 @@ class CalculateRate(UsageComponent):
@staticmethod
def usage(transform_context, record_store_df):
"""component which groups together record store records by
"""Method to return instance usage dataframe:
It groups together record store records by
provided group by columns list,sorts within the group by event
timestamp field, calculates the rate of change between the
oldest and latest values, and returns the resultant value as an

View File

@ -33,6 +33,7 @@ import json
class FetchQuantityException(Exception):
"""Exception thrown when fetching quantity
Attributes:
value: string representing the error
"""
@ -50,8 +51,7 @@ GroupedDataWithOperation = namedtuple("GroupedDataWithOperation",
class GroupedDataWithOperation(GroupedDataWithOperation):
"""A tuple which is a wrapper containing record store data
and the usage operation
"""A tuple which is a wrapper containing record store data and the usage operation
namdetuple contains:
@ -76,8 +76,10 @@ class FetchQuantity(UsageComponent):
@staticmethod
def _get_latest_oldest_quantity(grouping_results_with_operation):
"""get quantity for each group by performing the requested
usage operation and return a instance usage data.
"""Method to return an instance usage data
Get quantity for each group by performing the requested
usage operation and return an instance usage data.
"""
# row
@ -337,10 +339,12 @@ class FetchQuantity(UsageComponent):
@staticmethod
def usage(transform_context, record_store_df):
"""component which groups together record store records by
"""Method to return the latest quantity as an instance usage dataframe:
It groups together record store records by
provided group by columns list , sorts within the group by event
timestamp field, applies group stats udf and returns the latest
quantity as a instance usage dataframe
quantity as an instance usage dataframe
"""
transform_spec_df = transform_context.transform_spec_df_info
@ -358,10 +362,12 @@ class FetchQuantity(UsageComponent):
@staticmethod
def usage_by_operation(transform_context, record_store_df,
usage_fetch_operation):
"""component which groups together record store records by
"""Returns the latest quantity as a instance usage dataframe
It groups together record store records by
provided group by columns list , sorts within the group by event
timestamp field, applies group stats udf and returns the latest
quantity as a instance usage dataframe
quantity as an instance usage dataframe
"""
transform_spec_df = transform_context.transform_spec_df_info

View File

@ -28,6 +28,7 @@ import json
class FetchQuantityUtilException(Exception):
"""Exception thrown when fetching quantity
Attributes:
value: string representing the error
"""
@ -62,8 +63,9 @@ class FetchQuantityUtil(UsageComponent):
@staticmethod
def _format_quantity_util(row):
"""calculate the utilized quantity based on idle percentage
quantity and convert to instance usage format
"""Converts calculated utilized quantity to an instance usage format
Calculation based on idle percentage
"""
#
tenant_id = getattr(row, "tenant_id", "all")
@ -141,7 +143,9 @@ class FetchQuantityUtil(UsageComponent):
@staticmethod
def usage(transform_context, record_store_df):
"""component which groups together record store records by
"""Method to return instance usage dataframe:
It groups together record store records by
provided group by columns list, sorts within the group by event
timestamp field, applies group stats udf and returns the latest
quantity as a instance usage dataframe

View File

@ -221,6 +221,7 @@ class MonMetricsKafkaProcessor(object):
@staticmethod
def process_metric(transform_context, record_store_df):
"""process (aggregate) metric data from record_store data
All the parameters to drive processing should be available
in transform_spec_df dataframe.
"""
@ -231,8 +232,7 @@ class MonMetricsKafkaProcessor(object):
@staticmethod
def process_metrics(transform_context, record_store_df):
"""start processing (aggregating) metrics
"""
"""start processing (aggregating) metrics"""
#
# look in record_store_df for list of metrics to be processed
#
@ -536,6 +536,7 @@ class MonMetricsKafkaProcessor(object):
@staticmethod
def transform_to_recordstore(kvs):
"""Transform metrics data from kafka to record store format.
extracts, validates, filters, generates data from kakfa to only keep
data that has to be aggregated. Generate data generates multiple
records for for the same incoming metric if the metric has multiple

View File

@ -60,6 +60,7 @@ class OffsetSpec(object):
@six.add_metaclass(abc.ABCMeta)
class OffsetSpecs(object):
"""Class representing offset specs to help recover.
From where processing should pick up in case of failure
"""

View File

@ -19,9 +19,7 @@ class Processor(object):
@abc.abstractmethod
def get_app_name(self):
"""get name of this application. Will be used to
store offsets in database
"""
"""get name of this application. Will be used to store offsets in database"""
raise NotImplementedError(
"Class %s doesn't implement get_app_name()"
% self.__class__.__name__)

View File

@ -59,7 +59,9 @@ class PreHourlyProcessorDataProvider(ProcessUtilDataProvider):
class PreHourlyProcessor(Processor):
"""Processor to process usage data published to metrics_pre_hourly topic a
"""Publish metrics in kafka
Processor to process usage data published to metrics_pre_hourly topic a
and publish final rolled up metrics to metrics topic in kafka.
"""
@ -95,9 +97,7 @@ class PreHourlyProcessor(Processor):
@staticmethod
def get_app_name():
"""get name of this application. Will be used to
store offsets in database
"""
"""get name of this application. Will be used to store offsets in database"""
return "mon_metrics_kafka_pre_hourly"
@staticmethod
@ -113,9 +113,7 @@ class PreHourlyProcessor(Processor):
def _get_offsets_from_kafka(brokers,
topic,
offset_time):
"""get dict representing kafka
offsets.
"""
"""get dict representing kafka offsets."""
# get client
client = KafkaClient(brokers)
@ -144,9 +142,7 @@ class PreHourlyProcessor(Processor):
@staticmethod
def _parse_saved_offsets(app_name, topic, saved_offset_spec):
"""get dict representing saved
offsets.
"""
"""get dict representing saved offsets."""
offset_dict = {}
for key, value in saved_offset_spec.items():
if key.startswith("%s_%s" % (app_name, topic)):
@ -197,8 +193,7 @@ class PreHourlyProcessor(Processor):
topic,
app_name,
saved_offset_spec):
"""get offset range from saved offset to latest.
"""
"""get offset range from saved offset to latest."""
offset_range_list = []
# https://cwiki.apache.org/confluence/display/KAFKA/
@ -243,8 +238,9 @@ class PreHourlyProcessor(Processor):
@staticmethod
def get_processing_offset_range_list(processing_time):
"""get offset range to fetch data from. The
range will last from the last saved offsets to current offsets
"""Get offset range to fetch data from.
The range will last from the last saved offsets to current offsets
available. If there are no last saved offsets available in the
database the starting offsets will be set to the earliest
available in kafka.
@ -284,13 +280,13 @@ class PreHourlyProcessor(Processor):
@staticmethod
def get_offset_specs():
"""get offset specifications.
"""
"""get offset specifications."""
return simport.load(cfg.CONF.repositories.offsets)()
@staticmethod
def get_effective_offset_range_list(offset_range_list):
"""get effective batch offset range.
"""Get effective batch offset range.
Effective batch offset range covers offsets starting
from effective batch revision (defined by effective_batch_revision
config property). By default this method will set the
@ -432,8 +428,9 @@ class PreHourlyProcessor(Processor):
@staticmethod
def filter_out_records_not_in_current_batch(instance_usage_df):
"""Filter out any records which don't pertain to the
current batch (i.e., records before or after the
"""Filter out any records which don't pertain to the current batch
(i.e., records before or after the
batch currently being processed).
"""
# get the most recent batch time from the stored offsets
@ -476,7 +473,9 @@ class PreHourlyProcessor(Processor):
@staticmethod
def process_instance_usage(transform_context, instance_usage_df):
"""second stage aggregation. Aggregate instance usage rdd
"""Second stage aggregation.
Aggregate instance usage rdd
data and write results to metrics topic in kafka.
"""
@ -525,8 +524,7 @@ class PreHourlyProcessor(Processor):
@staticmethod
def do_transform(instance_usage_df):
"""start processing (aggregating) metrics
"""
"""start processing (aggregating) metrics"""
#
# look in instance_usage_df for list of metrics to be processed
#
@ -571,10 +569,11 @@ class PreHourlyProcessor(Processor):
@staticmethod
def run_processor(spark_context, processing_time):
"""process data in metrics_pre_hourly queue, starting
from the last saved offsets, else start from earliest
offsets available
"""
"""Process data in metrics_pre_hourly queue
Starting from the last saved offsets, else start from earliest
offsets available
"""
offset_range_list = (
PreHourlyProcessor.get_processing_offset_range_list(

View File

@ -40,6 +40,7 @@ class PreHourlyProcessorUtil(object):
@staticmethod
def is_time_to_run(check_date_time):
"""return True if its time to run this processor.
It is time to run the processor if:
The processor has no previous recorded run time.
It is more than the configured 'late_metric_slack_time' (to allow

View File

@ -43,6 +43,7 @@ def main():
def shutdown_all_threads_and_die():
"""Shut down all threads and exit process.
Hit it with a hammer to kill all threads and die.
"""
LOG = log.getLogger(__name__)
@ -51,9 +52,7 @@ def shutdown_all_threads_and_die():
def get_process(proc_name):
"""Get process given string in
process cmd line.
"""
"""Get process given string in process cmd line."""
LOG = log.getLogger(__name__)
proc = None
try:
@ -91,8 +90,7 @@ def stop_spark_submit_process():
class Transform(os_service.Service):
"""Class used with Openstack service.
"""
"""Class used with Openstack service."""
LOG = log.getLogger(__name__)
@ -143,9 +141,7 @@ class TransformService(threading.Thread):
CONF.service.election_polling_frequency))
def check_if_still_leader(self):
"""Return true if the this host is the
leader
"""
"""Return true if the this host is the leader"""
leader = None
try:
leader = self.coordinator.get_leader(self.group).get()
@ -289,8 +285,7 @@ class TransformService(threading.Thread):
def main_service():
"""Method to use with Openstack service.
"""
"""Method to use with Openstack service."""
ConfigInitializer.basic_config()
LogUtils.init_logger(__name__)
launcher = os_service.ServiceLauncher(cfg.CONF)

View File

@ -23,9 +23,7 @@ TransformContextBase = namedtuple("TransformContext",
class TransformContext(TransformContextBase):
"""A tuple which contains all the configuration information
to drive processing
"""A tuple which contains all the configuration information to drive processing
namedtuple contains:

View File

@ -16,9 +16,10 @@ from monasca_transform.log_utils import LogUtils
from stevedore import extension
- class GenericTransformBuilder (object):
- """Build transformation pipeline based on
- aggregation_pipeline spec in metric processing
+ class GenericTransformBuilder(object):
+ """Build transformation pipeline
+ Based on aggregation_pipeline spec in metric processing
configuration
"""
@ -67,9 +68,8 @@ class GenericTransformBuilder (object):
@staticmethod
def _parse_transform_pipeline(transform_spec_df):
"""parse aggregation pipeline from metric
processing configuration
"""
"""Parse aggregation pipeline from metric processing configuration"""
# get aggregation pipeline df
aggregation_pipeline_df = transform_spec_df\
.select("aggregation_params_map.aggregation_pipeline")
@ -95,8 +95,10 @@ class GenericTransformBuilder (object):
@staticmethod
def do_transform(transform_context,
record_store_df):
"""Build a dynamic aggregation pipeline and call components to
process record store dataframe
"""Method to return instance usage dataframe
Build a dynamic aggregation pipeline
and call components to process record store dataframe
"""
transform_spec_df = transform_context.transform_spec_df_info
(source,

View File

@ -20,8 +20,7 @@ RecordStoreWithGroupByBase = namedtuple("RecordStoreWithGroupBy",
class RecordStoreWithGroupBy(RecordStoreWithGroupByBase):
"""A tuple which is a wrapper containing record store data
and the group by columns
"""A tuple which is a wrapper containing record store data and the group by columns
namdetuple contains:
@ -36,8 +35,7 @@ GroupingResultsBase = namedtuple("GroupingResults",
class GroupingResults(GroupingResultsBase):
"""A tuple which is a wrapper containing grouping key
and grouped result set
"""A tuple which is a wrapper containing grouping key and grouped result set
namdetuple contains:
@ -52,7 +50,9 @@ class Grouping(object):
@staticmethod
def _parse_grouping_key(grouping_str):
"""parse grouping key which in "^key1=value1^key2=value2..." format
"""parse grouping key
which in "^key1=value1^key2=value2..." format
into a dictionary of key value pairs
"""
group_by_dict = {}
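
The docstring above describes parsing a "^key1=value1^key2=value2..." string into a dictionary of key/value pairs. A self-contained sketch of that parsing (editor's illustration; parse_grouping_key is a hypothetical stand-in, not the module's actual helper):

    def parse_grouping_key(grouping_str):
        # split on the "^" separator, skip the empty leading piece,
        # and break each "key=value" chunk at the first "="
        group_by_dict = {}
        for kv_pair in grouping_str.split("^"):
            if kv_pair:
                key, _, value = kv_pair.partition("=")
                group_by_dict[key] = value
        return group_by_dict

    # parse_grouping_key("^host=devstack^project_id=123")
    # -> {'host': 'devstack', 'project_id': '123'}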

View File

@ -42,7 +42,9 @@ class GroupSortbyTimestamp(Grouping):
@staticmethod
def _prepare_for_group_by(record_store_with_group_by_rdd):
"""creates a new rdd where the first element of each row
"""creates a new rdd where:
the first element of each row
contains array of grouping key and event timestamp fields.
Grouping key and event timestamp fields are used by
partitioning and sorting function to partition the data
@ -100,7 +102,9 @@ class GroupSortbyTimestamp(Grouping):
@staticmethod
def _get_group_first_last_quantity_udf(grouplistiter):
"""Return stats that include first row key, first_event_timestamp,
"""Return stats that include:
first row key, first_event_timestamp,
first event quantity, last_event_timestamp and last event quantity
"""
first_row = None
@ -159,7 +163,9 @@ class GroupSortbyTimestamp(Grouping):
def fetch_group_latest_oldest_quantity(record_store_df,
transform_spec_df,
group_by_columns_list):
"""function to group record store data, sort by timestamp within group
"""Function to group record store data
Sort by timestamp within group
and get first and last timestamp along with quantity within each group
This function uses key-value pair rdd's groupBy function to do group_by

View File

@ -26,9 +26,11 @@ class GroupSortbyTimestampPartition(Grouping):
@staticmethod
def _get_group_first_last_quantity_udf(partition_list_iter):
"""user defined function to to through a list of partitions. Each
partition contains elements for a group. All the elements are sorted by
"""User defined function to go through a list of partitions.
Each partition contains elements for a group. All the elements are sorted by
timestamp.
The stats include first row key, first_event_timestamp,
fist event quantity, last_event_timestamp and last event quantity
"""
@ -87,8 +89,11 @@ class GroupSortbyTimestampPartition(Grouping):
@staticmethod
def _prepare_for_group_by(record_store_with_group_by_rdd):
"""creates a new rdd where the first element of each row
contains array of grouping key and event timestamp fields.
"""Creates a new rdd where:
The first element of each row contains array of grouping
key and event timestamp fields.
Grouping key and event timestamp fields are used by
partitioning and sorting function to partition the data
by grouping key and then sort the elements in a group by the
@ -118,7 +123,9 @@ class GroupSortbyTimestampPartition(Grouping):
@staticmethod
def _get_partition_by_group(group_composite):
"""get a hash of the grouping key, which is then used by partitioning
"""Get a hash of the grouping key,
which is then used by partitioning
function to get partition where the groups data should end up in.
It uses hash % num_partitions to get partition
"""
@ -133,8 +140,7 @@ class GroupSortbyTimestampPartition(Grouping):
@staticmethod
def _sort_by_timestamp(group_composite):
"""get timestamp which will be used to sort grouped data
"""
"""get timestamp which will be used to sort grouped data"""
event_timestamp_string = group_composite[1]
return event_timestamp_string
@ -142,9 +148,7 @@ class GroupSortbyTimestampPartition(Grouping):
def _group_sort_by_timestamp_partition(record_store_df,
group_by_columns_list,
num_of_groups):
"""component that does a group by and then sorts all
the items within the group by event timestamp.
"""
"""It does a group by and then sorts all the items within the group by event timestamp."""
# convert the dataframe rdd to normal rdd and add the group by
# column list
record_store_with_group_by_rdd = record_store_df.rdd.\
@ -174,6 +178,7 @@ class GroupSortbyTimestampPartition(Grouping):
@staticmethod
def _remove_none_filter(row):
"""remove any rows which have None as grouping key
[GroupingResults(grouping_key="key1", results={})] rows get created
when partition does not get any grouped data assigned to it
"""
@ -185,22 +190,18 @@ class GroupSortbyTimestampPartition(Grouping):
transform_spec_df,
group_by_columns_list,
num_of_groups):
"""function to group record store data, sort by timestamp within group
"""Function to group record store data
Sort by timestamp within group
and get first and last timestamp along with quantity within each group
To do group by it uses custom partitioning function which creates a new
partition
for each group and uses RDD's repartitionAndSortWithinPartitions
partition for each group and uses RDD's repartitionAndSortWithinPartitions
function to do the grouping and sorting within the group.
This is more scalable than just using RDD's group_by as using this
technique
group is not materialized into a list and stored in memory, but rather
it uses RDD's in built partitioning capability to do the sort
num_of_groups should be more than expected groups, otherwise the same
partition can get used for two groups which will cause incorrect
results.
technique group is not materialized into a list and stored in memory, but rather
it uses RDD's in built partitioning capability to do the sort num_of_groups should
be more than expected groups, otherwise the same
partition can get used for two groups which will cause incorrect results.
"""
# group and order elements in group using repartition
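
The docstring in this hunk describes the partition-per-group technique: partition rows by a hash of the grouping key and let Spark sort each partition by timestamp, instead of materializing whole groups in memory. A minimal self-contained PySpark sketch of that idea (editor's illustration with made-up sample data, not the project's implementation):

    from pyspark import SparkContext

    sc = SparkContext(appName="group_sort_sketch")

    # (grouping_key, event_timestamp) pairs standing in for record store rows
    records = sc.parallelize([
        ("mem.usage_mb^host=vm-1", "2018-04-12 10:10:00"),
        ("mem.usage_mb^host=vm-2", "2018-04-12 10:05:00"),
        ("mem.usage_mb^host=vm-1", "2018-04-12 10:00:00"),
    ])

    num_of_groups = 8  # should exceed the number of distinct groups

    grouped_sorted = (
        records
        # key each row by (grouping key, timestamp) so both are visible below
        .map(lambda row: ((row[0], row[1]), row))
        .repartitionAndSortWithinPartitions(
            numPartitions=num_of_groups,
            # partition on the grouping key only; Spark applies % numPartitions
            partitionFunc=lambda composite_key: hash(composite_key[0]),
            # sort rows inside each partition by the timestamp part of the key
            keyfunc=lambda composite_key: composite_key[1]))

    # each partition now holds one group's rows in timestamp order, so a single
    # mapPartitions pass can pick the first and last quantity per group
    print(grouped_sorted.glom().collect())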

View File

@ -42,7 +42,9 @@ class GroupSortbyTimestamp(Grouping):
@staticmethod
def _prepare_for_groupby(record_store_with_groupby_rdd):
"""creates a new rdd where the first element of each row
"""creates a new rdd where:
the first element of each row
contains array of grouping key and event timestamp fields.
Grouping key and event timestamp fields are used by
partitioning and sorting function to partition the data
@ -99,7 +101,9 @@ class GroupSortbyTimestamp(Grouping):
@staticmethod
def _get_group_first_last_quantity_udf(grouplistiter):
"""Return stats that include first row key, first_event_timestamp,
"""Return stats that include:
first row key, first_event_timestamp,
first event quantity, last_event_timestamp and last event quantity
"""
first_row = None
@ -158,7 +162,9 @@ class GroupSortbyTimestamp(Grouping):
def fetch_group_latest_oldest_quantity(record_store_df,
transform_spec_df,
groupby_columns_list):
"""function to group record store data, sort by timestamp within group
"""To group record store data
sort by timestamp within group
and get first and last timestamp along with quantity within each group
This function uses key-value pair rdd's groupBy function to do groupby

View File

@ -26,8 +26,9 @@ class GroupSortbyTimestampPartition(Grouping):
@staticmethod
def _get_group_first_last_quantity_udf(partitionlistiter):
"""user defined function to to through a list of partitions. Each
partition contains elements for a group. All the elements are sorted by
"""user defined function to to through a list of partitions.
Each partition contains elements for a group. All the elements are sorted by
timestamp.
The stats include first row key, first_event_timestamp,
fist event quantity, last_event_timestamp and last event quantity
@ -87,7 +88,9 @@ class GroupSortbyTimestampPartition(Grouping):
@staticmethod
def _prepare_for_groupby(record_store_with_groupby_rdd):
"""creates a new rdd where the first element of each row
"""creates a new rdd where:
the first element of each row
contains array of grouping key and event timestamp fields.
Grouping key and event timestamp fields are used by
partitioning and sorting function to partition the data
@ -118,7 +121,9 @@ class GroupSortbyTimestampPartition(Grouping):
@staticmethod
def _get_partition_by_group(group_composite):
"""get a hash of the grouping key, which is then used by partitioning
"""get a hash of the grouping key,
which is then used by partitioning
function to get partition where the groups data should end up in.
It uses hash % num_partitions to get partition
"""
@ -133,8 +138,7 @@ class GroupSortbyTimestampPartition(Grouping):
@staticmethod
def _sortby_timestamp(group_composite):
"""get timestamp which will be used to sort grouped data
"""
"""get timestamp which will be used to sort grouped data"""
event_timestamp_string = group_composite[1]
return event_timestamp_string
@ -142,9 +146,7 @@ class GroupSortbyTimestampPartition(Grouping):
def _group_sortby_timestamp_partition(record_store_df,
groupby_columns_list,
num_of_groups):
"""component that does a group by and then sorts all
the items within the group by event timestamp.
"""
"""It does a group by and then sorts all the items within the group by event timestamp."""
# convert the dataframe rdd to normal rdd and add the group by
# column list
record_store_with_groupby_rdd = record_store_df.rdd.\
@ -174,6 +176,7 @@ class GroupSortbyTimestampPartition(Grouping):
@staticmethod
def _remove_none_filter(row):
"""remove any rows which have None as grouping key
[GroupingResults(grouping_key="key1", results={})] rows get created
when partition does not get any grouped data assigned to it
"""
@ -185,7 +188,9 @@ class GroupSortbyTimestampPartition(Grouping):
transform_spec_df,
groupby_columns_list,
num_of_groups):
"""function to group record store data, sort by timestamp within group
"""Function to group record store data
Sort by timestamp within group
and get first and last timestamp along with quantity within each group
To do group by it uses custom partitioning function which creates a new

View File

@ -17,6 +17,7 @@ from pyspark import StorageLevel
class InvalidCacheStorageLevelException(Exception):
"""Exception thrown when an invalid cache storage level is encountered
Attributes:
value: string representing the error
"""
@ -33,9 +34,7 @@ class StorageUtils(object):
@staticmethod
def get_storage_level(storage_level_str):
"""get pyspark storage level from storage level
string
"""
"""get pyspark storage level from storage level string"""
if (storage_level_str == "DISK_ONLY"):
return StorageLevel.DISK_ONLY
elif (storage_level_str == "DISK_ONLY_2"):

View File

@ -19,9 +19,7 @@ from tests.functional.messaging.adapter import DummyAdapter
class DummyInsert(InsertComponent):
"""Insert component that writes metric data to
to kafka queue
"""
"""Insert component that writes metric data to kafka queue"""
@staticmethod
def insert(transform_context, instance_usage_df):

View File

@ -19,9 +19,7 @@ from tests.functional.messaging.adapter import DummyAdapter
class DummyInsertPreHourly(InsertComponent):
"""Insert component that writes metric data to
to kafka queue
"""
"""Insert component that writes metric data to kafka queue"""
@staticmethod
def insert(transform_context, instance_usage_df):

View File

@ -542,6 +542,7 @@ class TestDataDrivenSpecsRepo(SparkContextTest):
event_type=None,
pre_transform_specs_data_frame=None):
"""get row for event type
:rtype: Row
"""
rows = pre_transform_specs_data_frame.filter(

View File

@ -53,8 +53,7 @@ class JSONOffsetSpecs(OffsetSpecs):
log.info('No kafka offsets found at startup')
def _save(self):
"""get the specs of last run time of offset
"""
"""get the specs of last run time of offset"""
log.info("Saving json offsets: %s", self._kafka_offsets)
with open(self.kafka_offset_spec_file, 'w') as offset_file:

View File

@ -83,11 +83,7 @@ commands =
[flake8]
max-complexity = 30
# TODO: ignored checks should be enabled in the future
# H904 Wrap long lines in parentheses instead of a backslash (DEPRECATED)
# H405 Multiline docstring separated by empty line
# E402 module level import not at top of file FIXME remove this
ignore = H904,H405,E402
max-line-length = 100
# H106 Dont put vim configuration in source files
# H203 Use assertIs(Not)None to check for None
enable-extensions=H106,H203
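
The two newly enabled hacking checks above are H106 (no vim configuration in source files) and H203 (use assertIsNone/assertIsNotNone rather than comparing against None). A hypothetical test snippet showing what H203 steers toward (illustration only, not part of this change):

    import unittest


    class ExampleTest(unittest.TestCase):

        def test_component_is_unset(self):
            component = None
            # H203 prefers assertIsNone/assertIsNotNone over equality with None
            self.assertIsNone(component)          # fine
            # self.assertEqual(None, component)   # would be flagged by H203


    if __name__ == '__main__':
        unittest.main()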