Update pep8 checks
* set the maximum line length to 100
* cleaned up the code for pep8

Change-Id: Iab260a4e77584aae31c0596f39146dd5092b807a
Signed-off-by: Amir Mofakhar <amofakhar@op5.com>
parent c8aa020432
commit 37d4f09057
@@ -31,8 +31,7 @@ InstanceUsageDataAggParamsBase = namedtuple('InstanceUsageDataAggParams',
 
 
 class InstanceUsageDataAggParams(InstanceUsageDataAggParamsBase):
-    """A tuple which is a wrapper containing the instance usage data
-    and aggregation params
+    """A tuple which is a wrapper containing the instance usage data and aggregation params
 
     namdetuple contains:

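For context, the hunks like the one above all touch the same idiom: a namedtuple paired with a subclass that exists mainly to carry a docstring. A minimal sketch of that pattern (the field names here are assumed, not taken from the repo):

    from collections import namedtuple

    InstanceUsageDataAggParamsBase = namedtuple('InstanceUsageDataAggParams',
                                                ['instance_usage_data',
                                                 'agg_params'])


    class InstanceUsageDataAggParams(InstanceUsageDataAggParamsBase):
        """A tuple which is a wrapper containing the instance usage data and aggregation params"""


    p = InstanceUsageDataAggParams(instance_usage_data={}, agg_params={})
    print(p.agg_params)
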
@@ -41,8 +41,7 @@ class InsertComponent(Component):
 
     @staticmethod
     def _validate_metric(metric):
-        """validate monasca metric.
-        """
+        """validate monasca metric."""
         try:
             # validate metric part, without the wrapper
             metric_validator.validate(metric["metric"])

@@ -124,9 +123,7 @@ class InsertComponent(Component):
 
     @staticmethod
     def _get_metric(row, agg_params):
-        """write data to kafka. extracts and formats
-        metric data and write s the data to kafka
-        """
+        """write data to kafka. extracts and formats metric data and write s the data to kafka"""
         instance_usage_dict = {"tenant_id": row.tenant_id,
                                "user_id": row.user_id,
                                "resource_uuid": row.resource_uuid,

@@ -171,9 +168,7 @@ class InsertComponent(Component):
     @staticmethod
     def _get_instance_usage_pre_hourly(row,
                                        metric_id):
-        """write data to kafka. extracts and formats
-        metric data and writes the data to kafka
-        """
+        """write data to kafka. extracts and formats metric data and writes the data to kafka"""
         # retrieve the processing meta from the row
         processing_meta = row.processing_meta
         # add transform spec metric id to the processing meta

@@ -228,9 +223,7 @@ class InsertComponent(Component):
 
     @staticmethod
     def _write_metrics_from_partition(partlistiter):
-        """iterate through all rdd elements in partition
-        and write metrics to kafka
-        """
+        """iterate through all rdd elements in partition and write metrics to kafka"""
         for part in partlistiter:
             agg_params = part.agg_params
             row = part.instance_usage_data

@@ -18,9 +18,7 @@ from monasca_transform.messaging.adapter import KafkaMessageAdapter
 
 
 class KafkaInsert(InsertComponent):
-    """Insert component that writes instance usage data
-    to kafka queue
-    """
+    """Insert component that writes instance usage data to kafka queue"""
 
     @staticmethod
     def insert(transform_context, instance_usage_df):

@@ -18,9 +18,7 @@ from monasca_transform.messaging.adapter import KafkaMessageAdapterPreHourly
 
 
 class KafkaInsertPreHourly(InsertComponent):
-    """Insert component that writes instance usage data
-    to kafka queue
-    """
+    """Insert component that writes instance usage data to kafka queue"""
 
     @staticmethod
     def insert(transform_context, instance_usage_df):

@@ -16,9 +16,7 @@ from monasca_transform.component.insert import InsertComponent
 
 
 class PrepareData(InsertComponent):
-    """prepare for insert component validates instance usage
-    data before calling Insert component
-    """
+    """prepare for insert component validates instance usage data before calling Insert component"""
     @staticmethod
     def insert(transform_context, instance_usage_df):
         """write instance usage data to kafka"""

@@ -24,6 +24,7 @@ import json
 
 class PreHourlyCalculateRateException(Exception):
     """Exception thrown when doing pre-hourly rate calculations
+
     Attributes:
         value: string representing the error
     """

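The docstring rewrites across these hunks follow one rule: a short docstring collapses onto a single line (now legal under the 100-character limit), and a multi-line docstring gets a blank line after its summary. A before/after illustration of the pattern (class names here are invented):

    # before: summary runs straight into the body
    class BeforeException(Exception):
        """Exception thrown when doing pre-hourly rate calculations
        Attributes:
            value: string representing the error
        """

    # after: a blank line separates the summary from the body
    class AfterException(Exception):
        """Exception thrown when doing pre-hourly rate calculations

        Attributes:
            value: string representing the error
        """
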
@@ -26,6 +26,7 @@ import json
 
 class RollupQuantityException(Exception):
     """Exception thrown when doing quantity rollup
+
     Attributes:
         value: string representing the error
     """

@@ -23,6 +23,7 @@ import json
 
 class SetAggregatedMetricName(SetterComponent):
     """setter component that sets final aggregated metric name.
+
     aggregated metric name is available as a parameter 'aggregated_metric_name'
     in aggregation_params in metric processing driver table.
     """

@@ -79,9 +80,7 @@ class SetAggregatedMetricName(SetterComponent):
 
     @staticmethod
     def setter(transform_context, instance_usage_df):
-        """set the aggregated metric name field for elements in instance usage
-        rdd
-        """
+        """set the aggregated metric name field for elements in instance usage rdd"""
 
         transform_spec_df = transform_context.transform_spec_df_info

@@ -23,6 +23,7 @@ import json
 
 class SetAggregatedPeriod(SetterComponent):
     """setter component that sets final aggregated metric name.
+
     aggregated metric name is available as a parameter 'aggregated_metric_name'
     in aggregation_params in metric processing driver table.
     """

@@ -80,9 +81,7 @@ class SetAggregatedPeriod(SetterComponent):
 
     @staticmethod
     def setter(transform_context, instance_usage_df):
-        """set the aggregated metric name field for elements in instance usage
-        rdd
-        """
+        """set the aggregated metric name field for elements in instance usage rdd"""
 
         transform_spec_df = transform_context.transform_spec_df_info

@@ -25,6 +25,7 @@ import json
 
 class CalculateRateException(Exception):
     """Exception thrown when calculating rate
+
     Attributes:
         value: string representing the error
     """

@@ -40,7 +41,9 @@ class CalculateRate(UsageComponent):
 
     @staticmethod
     def usage(transform_context, record_store_df):
-        """component which groups together record store records by
+        """Method to return instance usage dataframe:
+
+        It groups together record store records by
         provided group by columns list,sorts within the group by event
         timestamp field, calculates the rate of change between the
         oldest and latest values, and returns the resultant value as an

@@ -33,6 +33,7 @@ import json
 
 class FetchQuantityException(Exception):
     """Exception thrown when fetching quantity
+
     Attributes:
         value: string representing the error
     """

@@ -50,8 +51,7 @@ GroupedDataWithOperation = namedtuple("GroupedDataWithOperation",
 
 
 class GroupedDataWithOperation(GroupedDataWithOperation):
-    """A tuple which is a wrapper containing record store data
-    and the usage operation
+    """A tuple which is a wrapper containing record store data and the usage operation
 
     namdetuple contains:

@@ -76,8 +76,10 @@ class FetchQuantity(UsageComponent):
 
     @staticmethod
     def _get_latest_oldest_quantity(grouping_results_with_operation):
-        """get quantity for each group by performing the requested
-        usage operation and return a instance usage data.
+        """Method to return an instance usage data
+
+        Get quantity for each group by performing the requested
+        usage operation and return an instance usage data.
         """
 
         # row

@@ -337,10 +339,12 @@ class FetchQuantity(UsageComponent):
 
     @staticmethod
     def usage(transform_context, record_store_df):
-        """component which groups together record store records by
+        """Method to return the latest quantity as an instance usage dataframe:
+
+        It groups together record store records by
         provided group by columns list , sorts within the group by event
         timestamp field, applies group stats udf and returns the latest
-        quantity as a instance usage dataframe
+        quantity as an instance usage dataframe
         """
         transform_spec_df = transform_context.transform_spec_df_info

@@ -358,10 +362,12 @@ class FetchQuantity(UsageComponent):
     @staticmethod
     def usage_by_operation(transform_context, record_store_df,
                            usage_fetch_operation):
-        """component which groups together record store records by
+        """Returns the latest quantity as a instance usage dataframe
+
+        It groups together record store records by
         provided group by columns list , sorts within the group by event
         timestamp field, applies group stats udf and returns the latest
-        quantity as a instance usage dataframe
+        quantity as an instance usage dataframe
         """
         transform_spec_df = transform_context.transform_spec_df_info

@@ -28,6 +28,7 @@ import json
 
 class FetchQuantityUtilException(Exception):
     """Exception thrown when fetching quantity
+
     Attributes:
         value: string representing the error
     """

@@ -62,8 +63,9 @@ class FetchQuantityUtil(UsageComponent):
 
     @staticmethod
     def _format_quantity_util(row):
-        """calculate the utilized quantity based on idle percentage
-        quantity and convert to instance usage format
+        """Converts calculated utilized quantity to an instance usage format
+
+        Calculation based on idle percentage
         """
         #
         tenant_id = getattr(row, "tenant_id", "all")

@@ -141,7 +143,9 @@ class FetchQuantityUtil(UsageComponent):
 
     @staticmethod
     def usage(transform_context, record_store_df):
-        """component which groups together record store records by
+        """Method to return instance usage dataframe:
+
+        It groups together record store records by
         provided group by columns list, sorts within the group by event
         timestamp field, applies group stats udf and returns the latest
         quantity as a instance usage dataframe

@@ -221,6 +221,7 @@ class MonMetricsKafkaProcessor(object):
     @staticmethod
     def process_metric(transform_context, record_store_df):
         """process (aggregate) metric data from record_store data
+
         All the parameters to drive processing should be available
         in transform_spec_df dataframe.
         """

@@ -231,8 +232,7 @@ class MonMetricsKafkaProcessor(object):
 
     @staticmethod
     def process_metrics(transform_context, record_store_df):
-        """start processing (aggregating) metrics
-        """
+        """start processing (aggregating) metrics"""
         #
         # look in record_store_df for list of metrics to be processed
         #

@@ -536,6 +536,7 @@ class MonMetricsKafkaProcessor(object):
     @staticmethod
     def transform_to_recordstore(kvs):
         """Transform metrics data from kafka to record store format.
+
         extracts, validates, filters, generates data from kakfa to only keep
         data that has to be aggregated. Generate data generates multiple
         records for for the same incoming metric if the metric has multiple

@@ -60,6 +60,7 @@ class OffsetSpec(object):
 @six.add_metaclass(abc.ABCMeta)
 class OffsetSpecs(object):
     """Class representing offset specs to help recover.
+
     From where processing should pick up in case of failure
     """

@@ -19,9 +19,7 @@ class Processor(object):
 
     @abc.abstractmethod
     def get_app_name(self):
-        """get name of this application. Will be used to
-        store offsets in database
-        """
+        """get name of this application. Will be used to store offsets in database"""
         raise NotImplementedError(
             "Class %s doesn't implement get_app_name()"
             % self.__class__.__name__)

@@ -59,7 +59,9 @@ class PreHourlyProcessorDataProvider(ProcessUtilDataProvider):
 
 
 class PreHourlyProcessor(Processor):
-    """Processor to process usage data published to metrics_pre_hourly topic a
+    """Publish metrics in kafka
+
+    Processor to process usage data published to metrics_pre_hourly topic a
     and publish final rolled up metrics to metrics topic in kafka.
     """

@@ -95,9 +97,7 @@ class PreHourlyProcessor(Processor):
 
     @staticmethod
     def get_app_name():
-        """get name of this application. Will be used to
-        store offsets in database
-        """
+        """get name of this application. Will be used to store offsets in database"""
         return "mon_metrics_kafka_pre_hourly"
 
     @staticmethod

@@ -113,9 +113,7 @@ class PreHourlyProcessor(Processor):
     def _get_offsets_from_kafka(brokers,
                                 topic,
                                 offset_time):
-        """get dict representing kafka
-        offsets.
-        """
+        """get dict representing kafka offsets."""
         # get client
         client = KafkaClient(brokers)

@@ -144,9 +142,7 @@ class PreHourlyProcessor(Processor):
 
     @staticmethod
     def _parse_saved_offsets(app_name, topic, saved_offset_spec):
-        """get dict representing saved
-        offsets.
-        """
+        """get dict representing saved offsets."""
         offset_dict = {}
         for key, value in saved_offset_spec.items():
             if key.startswith("%s_%s" % (app_name, topic)):

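_parse_saved_offsets filters a saved-offset mapping by an app/topic key prefix, as the hunk above shows. A standalone sketch of that filtering (the dict values are illustrative):

    def parse_saved_offsets(app_name, topic, saved_offset_spec):
        # keep only offsets whose key starts with "<app_name>_<topic>"
        offset_dict = {}
        for key, value in saved_offset_spec.items():
            if key.startswith("%s_%s" % (app_name, topic)):
                offset_dict[key] = value
        return offset_dict


    spec = {"app_metrics_0": 42, "other_metrics_0": 7}
    assert parse_saved_offsets("app", "metrics", spec) == {"app_metrics_0": 42}
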
@@ -197,8 +193,7 @@ class PreHourlyProcessor(Processor):
                                      topic,
                                      app_name,
                                      saved_offset_spec):
-        """get offset range from saved offset to latest.
-        """
+        """get offset range from saved offset to latest."""
         offset_range_list = []
 
         # https://cwiki.apache.org/confluence/display/KAFKA/

@@ -243,8 +238,9 @@ class PreHourlyProcessor(Processor):
 
     @staticmethod
     def get_processing_offset_range_list(processing_time):
-        """get offset range to fetch data from. The
-        range will last from the last saved offsets to current offsets
+        """Get offset range to fetch data from.
+
+        The range will last from the last saved offsets to current offsets
         available. If there are no last saved offsets available in the
         database the starting offsets will be set to the earliest
         available in kafka.

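The behavior this docstring describes — span from the last saved offset to the current one, falling back to the earliest offset when nothing was saved — reduces to a per-partition range computation. A minimal sketch; the tuple shape is assumed here (it mirrors pyspark.streaming.kafka's OffsetRange), not copied from the repo:

    from collections import namedtuple

    OffsetRange = namedtuple("OffsetRange",
                             ["topic", "partition", "from_offset", "until_offset"])


    def offset_ranges(saved, latest, topic):
        # saved/latest map partition -> offset; fall back to offset 0
        # (earliest) for partitions with no saved offset
        return [OffsetRange(topic, p, saved.get(p, 0), latest[p])
                for p in sorted(latest)]


    print(offset_ranges({0: 100}, {0: 150, 1: 80}, "metrics_pre_hourly"))
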
@@ -284,13 +280,13 @@ class PreHourlyProcessor(Processor):
 
     @staticmethod
     def get_offset_specs():
-        """get offset specifications.
-        """
+        """get offset specifications."""
         return simport.load(cfg.CONF.repositories.offsets)()
 
     @staticmethod
     def get_effective_offset_range_list(offset_range_list):
-        """get effective batch offset range.
+        """Get effective batch offset range.
+
         Effective batch offset range covers offsets starting
         from effective batch revision (defined by effective_batch_revision
         config property). By default this method will set the

@@ -432,8 +428,9 @@ class PreHourlyProcessor(Processor):
 
     @staticmethod
     def filter_out_records_not_in_current_batch(instance_usage_df):
-        """Filter out any records which don't pertain to the
-        current batch (i.e., records before or after the
+        """Filter out any records which don't pertain to the current batch
+
+        (i.e., records before or after the
         batch currently being processed).
         """
         # get the most recent batch time from the stored offsets

@@ -476,7 +473,9 @@ class PreHourlyProcessor(Processor):
 
     @staticmethod
     def process_instance_usage(transform_context, instance_usage_df):
-        """second stage aggregation. Aggregate instance usage rdd
+        """Second stage aggregation.
+
+        Aggregate instance usage rdd
         data and write results to metrics topic in kafka.
         """

@@ -525,8 +524,7 @@ class PreHourlyProcessor(Processor):
 
     @staticmethod
     def do_transform(instance_usage_df):
-        """start processing (aggregating) metrics
-        """
+        """start processing (aggregating) metrics"""
         #
         # look in instance_usage_df for list of metrics to be processed
         #

@@ -571,10 +569,11 @@ class PreHourlyProcessor(Processor):
 
     @staticmethod
     def run_processor(spark_context, processing_time):
-        """process data in metrics_pre_hourly queue, starting
-        from the last saved offsets, else start from earliest
-        offsets available
-        """
+        """Process data in metrics_pre_hourly queue
+
+        Starting from the last saved offsets, else start from earliest
+        offsets available
+        """
 
         offset_range_list = (
             PreHourlyProcessor.get_processing_offset_range_list(

@@ -40,6 +40,7 @@ class PreHourlyProcessorUtil(object):
     @staticmethod
     def is_time_to_run(check_date_time):
         """return True if its time to run this processor.
+
         It is time to run the processor if:
         The processor has no previous recorded run time.
         It is more than the configured 'late_metric_slack_time' (to allow

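The check this docstring describes gates on a configured slack time. A simplified sketch of the logic, with the config plumbing replaced by an assumed constant:

    from datetime import datetime, timedelta

    LATE_METRIC_SLACK_TIME = timedelta(seconds=600)  # assumed config value


    def is_time_to_run(check_date_time, last_run_time=None):
        # run if there is no recorded previous run, or if more than the
        # slack time has elapsed since the last run
        if last_run_time is None:
            return True
        return check_date_time - last_run_time > LATE_METRIC_SLACK_TIME


    assert is_time_to_run(datetime(2017, 1, 1, 12, 0))
    assert not is_time_to_run(datetime(2017, 1, 1, 12, 0),
                              datetime(2017, 1, 1, 11, 55))
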
@@ -43,6 +43,7 @@ def main():
 
 def shutdown_all_threads_and_die():
     """Shut down all threads and exit process.
+
     Hit it with a hammer to kill all threads and die.
     """
     LOG = log.getLogger(__name__)

@@ -51,9 +52,7 @@ def shutdown_all_threads_and_die():
 
 
 def get_process(proc_name):
-    """Get process given string in
-    process cmd line.
-    """
+    """Get process given string in process cmd line."""
    LOG = log.getLogger(__name__)
    proc = None
    try:

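Looking up a process by a string in its command line is typically done with psutil; a sketch of the idea, assuming psutil is the library behind this function (not confirmed by the hunk):

    import psutil


    def get_process(proc_name):
        # return the first process whose command line contains proc_name
        for proc in psutil.process_iter():
            try:
                if proc_name in " ".join(proc.cmdline()):
                    return proc
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        return None
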
@@ -91,8 +90,7 @@ def stop_spark_submit_process():
 
 
 class Transform(os_service.Service):
-    """Class used with Openstack service.
-    """
+    """Class used with Openstack service."""
 
     LOG = log.getLogger(__name__)

@@ -143,9 +141,7 @@ class TransformService(threading.Thread):
                 CONF.service.election_polling_frequency))
 
     def check_if_still_leader(self):
-        """Return true if the this host is the
-        leader
-        """
+        """Return true if the this host is the leader"""
         leader = None
         try:
             leader = self.coordinator.get_leader(self.group).get()

@@ -289,8 +285,7 @@ class TransformService(threading.Thread):
 
 
 def main_service():
-    """Method to use with Openstack service.
-    """
+    """Method to use with Openstack service."""
     ConfigInitializer.basic_config()
     LogUtils.init_logger(__name__)
     launcher = os_service.ServiceLauncher(cfg.CONF)

@@ -23,9 +23,7 @@ TransformContextBase = namedtuple("TransformContext",
 
 
 class TransformContext(TransformContextBase):
-    """A tuple which contains all the configuration information
-    to drive processing
-
+    """A tuple which contains all the configuration information to drive processing
 
     namedtuple contains:

@@ -16,9 +16,10 @@ from monasca_transform.log_utils import LogUtils
 from stevedore import extension
 
 
-class GenericTransformBuilder (object):
-    """Build transformation pipeline based on
-    aggregation_pipeline spec in metric processing
+class GenericTransformBuilder(object):
+    """Build transformation pipeline
+
+    Based on aggregation_pipeline spec in metric processing
     configuration
     """

@@ -67,9 +68,8 @@ class GenericTransformBuilder (object):
 
     @staticmethod
     def _parse_transform_pipeline(transform_spec_df):
-        """parse aggregation pipeline from metric
-        processing configuration
-        """
+        """Parse aggregation pipeline from metric processing configuration"""
 
         # get aggregation pipeline df
         aggregation_pipeline_df = transform_spec_df\
             .select("aggregation_params_map.aggregation_pipeline")

@@ -95,8 +95,10 @@ class GenericTransformBuilder (object):
     @staticmethod
     def do_transform(transform_context,
                      record_store_df):
-        """Build a dynamic aggregation pipeline and call components to
-        process record store dataframe
+        """Method to return instance usage dataframe
+
+        Build a dynamic aggregation pipeline
+        and call components to process record store dataframe
         """
         transform_spec_df = transform_context.transform_spec_df_info
         (source,

@@ -20,8 +20,7 @@ RecordStoreWithGroupByBase = namedtuple("RecordStoreWithGroupBy",
 
 
 class RecordStoreWithGroupBy(RecordStoreWithGroupByBase):
-    """A tuple which is a wrapper containing record store data
-    and the group by columns
+    """A tuple which is a wrapper containing record store data and the group by columns
 
     namdetuple contains:

@@ -36,8 +35,7 @@ GroupingResultsBase = namedtuple("GroupingResults",
 
 
 class GroupingResults(GroupingResultsBase):
-    """A tuple which is a wrapper containing grouping key
-    and grouped result set
+    """A tuple which is a wrapper containing grouping key and grouped result set
 
     namdetuple contains:

@@ -52,7 +50,9 @@ class Grouping(object):
 
     @staticmethod
     def _parse_grouping_key(grouping_str):
-        """parse grouping key which in "^key1=value1^key2=value2..." format
+        """parse grouping key
+
+        which in "^key1=value1^key2=value2..." format
         into a dictionary of key value pairs
         """
         group_by_dict = {}

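The "^key1=value1^key2=value2..." format named in this docstring parses into a dict in a few lines; a standalone sketch (key names in the example are invented):

    def parse_grouping_key(grouping_str):
        # split on '^', skip the empty leading element, then split each
        # 'key=value' pair once on '='
        group_by_dict = {}
        for pair in grouping_str.split("^"):
            if pair:
                key, _, value = pair.partition("=")
                group_by_dict[key] = value
        return group_by_dict


    assert parse_grouping_key("^host=h1^project_id=p1") == {"host": "h1",
                                                            "project_id": "p1"}
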
@@ -42,7 +42,9 @@ class GroupSortbyTimestamp(Grouping):
 
     @staticmethod
     def _prepare_for_group_by(record_store_with_group_by_rdd):
-        """creates a new rdd where the first element of each row
+        """creates a new rdd where:
+
+        the first element of each row
         contains array of grouping key and event timestamp fields.
         Grouping key and event timestamp fields are used by
         partitioning and sorting function to partition the data

@@ -100,7 +102,9 @@ class GroupSortbyTimestamp(Grouping):
 
     @staticmethod
     def _get_group_first_last_quantity_udf(grouplistiter):
-        """Return stats that include first row key, first_event_timestamp,
+        """Return stats that include:
+
+        first row key, first_event_timestamp,
         first event quantity, last_event_timestamp and last event quantity
         """
         first_row = None

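The stats this UDF returns come from a single pass over a timestamp-sorted group, keeping the first and last rows. A plain-Python sketch over (timestamp, quantity) pairs, with assumed result field names:

    def first_last_quantity(sorted_rows):
        # rows are (event_timestamp, quantity) pairs already sorted by timestamp
        first_row = None
        last_row = None
        for row in sorted_rows:
            if first_row is None:
                first_row = row
            last_row = row
        return {"first_event_timestamp": first_row[0],
                "first_quantity": first_row[1],
                "last_event_timestamp": last_row[0],
                "last_quantity": last_row[1]}


    stats = first_last_quantity([(1, 10.0), (2, 15.0), (3, 30.0)])
    assert stats["first_quantity"] == 10.0 and stats["last_quantity"] == 30.0
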
@@ -159,7 +163,9 @@ class GroupSortbyTimestamp(Grouping):
     def fetch_group_latest_oldest_quantity(record_store_df,
                                            transform_spec_df,
                                            group_by_columns_list):
-        """function to group record store data, sort by timestamp within group
+        """Function to group record store data
+
+        Sort by timestamp within group
         and get first and last timestamp along with quantity within each group
 
         This function uses key-value pair rdd's groupBy function to do group_by

@@ -26,9 +26,11 @@ class GroupSortbyTimestampPartition(Grouping):
 
     @staticmethod
     def _get_group_first_last_quantity_udf(partition_list_iter):
-        """user defined function to to through a list of partitions. Each
-        partition contains elements for a group. All the elements are sorted by
+        """User defined function to go through a list of partitions.
+
+        Each partition contains elements for a group. All the elements are sorted by
         timestamp.
 
         The stats include first row key, first_event_timestamp,
         fist event quantity, last_event_timestamp and last event quantity
         """

@@ -87,8 +89,11 @@ class GroupSortbyTimestampPartition(Grouping):
 
     @staticmethod
     def _prepare_for_group_by(record_store_with_group_by_rdd):
-        """creates a new rdd where the first element of each row
-        contains array of grouping key and event timestamp fields.
+        """Creates a new rdd where:
+
+        The first element of each row contains array of grouping
+        key and event timestamp fields.
+
         Grouping key and event timestamp fields are used by
         partitioning and sorting function to partition the data
         by grouping key and then sort the elements in a group by the

@@ -118,7 +123,9 @@ class GroupSortbyTimestampPartition(Grouping):
 
     @staticmethod
     def _get_partition_by_group(group_composite):
-        """get a hash of the grouping key, which is then used by partitioning
+        """Get a hash of the grouping key,
+
+        which is then used by partitioning
         function to get partition where the groups data should end up in.
         It uses hash % num_partitions to get partition
         """

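The hash % num_partitions mapping named in this docstring is two lines of Python; a sketch (argument names are illustrative):

    def get_partition_by_group(grouping_key, num_partitions):
        # stable mapping of a grouping key to one of num_partitions buckets;
        # distinct groups can collide if num_partitions is too small
        return hash(grouping_key) % num_partitions
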
@@ -133,8 +140,7 @@ class GroupSortbyTimestampPartition(Grouping):
 
     @staticmethod
     def _sort_by_timestamp(group_composite):
-        """get timestamp which will be used to sort grouped data
-        """
+        """get timestamp which will be used to sort grouped data"""
         event_timestamp_string = group_composite[1]
         return event_timestamp_string

@@ -142,9 +148,7 @@ class GroupSortbyTimestampPartition(Grouping):
     def _group_sort_by_timestamp_partition(record_store_df,
                                            group_by_columns_list,
                                            num_of_groups):
-        """component that does a group by and then sorts all
-        the items within the group by event timestamp.
-        """
+        """It does a group by and then sorts all the items within the group by event timestamp."""
         # convert the dataframe rdd to normal rdd and add the group by
         # column list
         record_store_with_group_by_rdd = record_store_df.rdd.\

@@ -174,6 +178,7 @@ class GroupSortbyTimestampPartition(Grouping):
     @staticmethod
     def _remove_none_filter(row):
         """remove any rows which have None as grouping key
+
         [GroupingResults(grouping_key="key1", results={})] rows get created
         when partition does not get any grouped data assigned to it
         """

@@ -185,22 +190,18 @@ class GroupSortbyTimestampPartition(Grouping):
                                      transform_spec_df,
                                      group_by_columns_list,
                                      num_of_groups):
-        """function to group record store data, sort by timestamp within group
+        """Function to group record store data
+
+        Sort by timestamp within group
         and get first and last timestamp along with quantity within each group
 
         To do group by it uses custom partitioning function which creates a new
-        partition
-        for each group and uses RDD's repartitionAndSortWithinPartitions
+        partition for each group and uses RDD's repartitionAndSortWithinPartitions
         function to do the grouping and sorting within the group.
 
         This is more scalable than just using RDD's group_by as using this
-        technique
-        group is not materialized into a list and stored in memory, but rather
-        it uses RDD's in built partitioning capability to do the sort
-
-        num_of_groups should be more than expected groups, otherwise the same
-        partition can get used for two groups which will cause incorrect
-        results.
+        technique group is not materialized into a list and stored in memory, but rather
+        it uses RDD's in built partitioning capability to do the sort num_of_groups should
+        be more than expected groups, otherwise the same
+        partition can get used for two groups which will cause incorrect results.
         """
 
         # group and order elements in group using repartition

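The scalability argument in this docstring rests on RDD.repartitionAndSortWithinPartitions, which shuffles each group to its own partition and sorts within it without materializing the group as an in-memory list. A hedged PySpark sketch of the pattern; the sample data and key layout are illustrative, not the repo's actual record store schema:

    from pyspark import SparkContext

    sc = SparkContext(appName="group-sort-sketch")

    # key = (grouping_key, event_timestamp), value = quantity
    rdd = sc.parallelize([(("g1", 3), 30.0), (("g1", 1), 10.0),
                          (("g2", 2), 20.0), (("g1", 2), 15.0)])

    num_of_groups = 4  # should exceed the number of expected groups

    grouped_sorted = rdd.repartitionAndSortWithinPartitions(
        numPartitions=num_of_groups,
        partitionFunc=lambda key: hash(key[0]),  # partition by grouping key
        keyfunc=lambda key: key[1])              # sort within partition by timestamp

    # each non-empty partition now holds one group, sorted by timestamp
    print(grouped_sorted.glom().collect())
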
@@ -42,7 +42,9 @@ class GroupSortbyTimestamp(Grouping):
 
     @staticmethod
     def _prepare_for_groupby(record_store_with_groupby_rdd):
-        """creates a new rdd where the first element of each row
+        """creates a new rdd where:
+
+        the first element of each row
         contains array of grouping key and event timestamp fields.
         Grouping key and event timestamp fields are used by
         partitioning and sorting function to partition the data

@@ -99,7 +101,9 @@ class GroupSortbyTimestamp(Grouping):
 
     @staticmethod
     def _get_group_first_last_quantity_udf(grouplistiter):
-        """Return stats that include first row key, first_event_timestamp,
+        """Return stats that include:
+
+        first row key, first_event_timestamp,
         first event quantity, last_event_timestamp and last event quantity
         """
         first_row = None

@@ -158,7 +162,9 @@ class GroupSortbyTimestamp(Grouping):
     def fetch_group_latest_oldest_quantity(record_store_df,
                                            transform_spec_df,
                                            groupby_columns_list):
-        """function to group record store data, sort by timestamp within group
+        """To group record store data
+
+        sort by timestamp within group
         and get first and last timestamp along with quantity within each group
 
         This function uses key-value pair rdd's groupBy function to do groupby

@@ -26,8 +26,9 @@ class GroupSortbyTimestampPartition(Grouping):
 
     @staticmethod
     def _get_group_first_last_quantity_udf(partitionlistiter):
-        """user defined function to to through a list of partitions. Each
-        partition contains elements for a group. All the elements are sorted by
+        """user defined function to to through a list of partitions.
+
+        Each partition contains elements for a group. All the elements are sorted by
         timestamp.
         The stats include first row key, first_event_timestamp,
         fist event quantity, last_event_timestamp and last event quantity

@@ -87,7 +88,9 @@ class GroupSortbyTimestampPartition(Grouping):
 
     @staticmethod
     def _prepare_for_groupby(record_store_with_groupby_rdd):
-        """creates a new rdd where the first element of each row
+        """creates a new rdd where:
+
+        the first element of each row
         contains array of grouping key and event timestamp fields.
         Grouping key and event timestamp fields are used by
         partitioning and sorting function to partition the data

@@ -118,7 +121,9 @@ class GroupSortbyTimestampPartition(Grouping):
 
     @staticmethod
     def _get_partition_by_group(group_composite):
-        """get a hash of the grouping key, which is then used by partitioning
+        """get a hash of the grouping key,
+
+        which is then used by partitioning
         function to get partition where the groups data should end up in.
         It uses hash % num_partitions to get partition
         """

@@ -133,8 +138,7 @@ class GroupSortbyTimestampPartition(Grouping):
 
     @staticmethod
     def _sortby_timestamp(group_composite):
-        """get timestamp which will be used to sort grouped data
-        """
+        """get timestamp which will be used to sort grouped data"""
         event_timestamp_string = group_composite[1]
         return event_timestamp_string

@@ -142,9 +146,7 @@ class GroupSortbyTimestampPartition(Grouping):
     def _group_sortby_timestamp_partition(record_store_df,
                                           groupby_columns_list,
                                           num_of_groups):
-        """component that does a group by and then sorts all
-        the items within the group by event timestamp.
-        """
+        """It does a group by and then sorts all the items within the group by event timestamp."""
         # convert the dataframe rdd to normal rdd and add the group by
         # column list
         record_store_with_groupby_rdd = record_store_df.rdd.\

@@ -174,6 +176,7 @@ class GroupSortbyTimestampPartition(Grouping):
     @staticmethod
     def _remove_none_filter(row):
         """remove any rows which have None as grouping key
+
         [GroupingResults(grouping_key="key1", results={})] rows get created
         when partition does not get any grouped data assigned to it
         """

@@ -185,7 +188,9 @@ class GroupSortbyTimestampPartition(Grouping):
                                      transform_spec_df,
                                      groupby_columns_list,
                                      num_of_groups):
-        """function to group record store data, sort by timestamp within group
+        """Function to group record store data
+
+        Sort by timestamp within group
         and get first and last timestamp along with quantity within each group
 
         To do group by it uses custom partitioning function which creates a new

@@ -17,6 +17,7 @@ from pyspark import StorageLevel
 
 class InvalidCacheStorageLevelException(Exception):
     """Exception thrown when an invalid cache storage level is encountered
+
     Attributes:
         value: string representing the error
     """

@@ -33,9 +34,7 @@ class StorageUtils(object):
 
     @staticmethod
    def get_storage_level(storage_level_str):
-        """get pyspark storage level from storage level
-        string
-        """
+        """get pyspark storage level from storage level string"""
         if (storage_level_str == "DISK_ONLY"):
             return StorageLevel.DISK_ONLY
         elif (storage_level_str == "DISK_ONLY_2"):

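The if/elif chain visible above maps config strings to pyspark StorageLevel constants; the same mapping reads more compactly as a dict lookup. A sketch (only a few levels shown, and the exception class is a stand-in for the one defined in the earlier hunk):

    from pyspark import StorageLevel


    class InvalidCacheStorageLevelException(Exception):
        """Stand-in for the exception class defined in the hunk above."""


    _STORAGE_LEVELS = {
        "DISK_ONLY": StorageLevel.DISK_ONLY,
        "DISK_ONLY_2": StorageLevel.DISK_ONLY_2,
        "MEMORY_AND_DISK": StorageLevel.MEMORY_AND_DISK,
        "MEMORY_ONLY": StorageLevel.MEMORY_ONLY,
    }


    def get_storage_level(storage_level_str):
        # dict lookup replaces the if/elif chain; unknown strings raise
        try:
            return _STORAGE_LEVELS[storage_level_str]
        except KeyError:
            raise InvalidCacheStorageLevelException(
                "unknown storage level: %s" % storage_level_str)
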
@@ -19,9 +19,7 @@ from tests.functional.messaging.adapter import DummyAdapter
 
 
 class DummyInsert(InsertComponent):
-    """Insert component that writes metric data to
-    to kafka queue
-    """
+    """Insert component that writes metric data to kafka queue"""
 
     @staticmethod
     def insert(transform_context, instance_usage_df):

@@ -19,9 +19,7 @@ from tests.functional.messaging.adapter import DummyAdapter
 
 
 class DummyInsertPreHourly(InsertComponent):
-    """Insert component that writes metric data to
-    to kafka queue
-    """
+    """Insert component that writes metric data to kafka queue"""
 
     @staticmethod
     def insert(transform_context, instance_usage_df):

@@ -542,6 +542,7 @@ class TestDataDrivenSpecsRepo(SparkContextTest):
                                 event_type=None,
                                 pre_transform_specs_data_frame=None):
         """get row for event type
+
         :rtype: Row
         """
         rows = pre_transform_specs_data_frame.filter(

@@ -53,8 +53,7 @@ class JSONOffsetSpecs(OffsetSpecs):
             log.info('No kafka offsets found at startup')
 
     def _save(self):
-        """get the specs of last run time of offset
-        """
+        """get the specs of last run time of offset"""
         log.info("Saving json offsets: %s", self._kafka_offsets)
 
         with open(self.kafka_offset_spec_file, 'w') as offset_file:

tox.ini (6 changed lines)
@@ -83,11 +83,7 @@ commands =
 
 [flake8]
 max-complexity = 30
 # TODO: ignored checks should be enabled in the future
 # H904 Wrap long lines in parentheses instead of a backslash (DEPRECATED)
 # H405 Multiline docstring separated by empty line
 # E402 module level import not at top of file FIXME remove this
 ignore = H904,H405,E402
+max-line-length = 100
 # H106 Don’t put vim configuration in source files
 # H203 Use assertIs(Not)None to check for None
 enable-extensions=H106,H203