Validate metrics before publishing to Kafka

Validate Monasca metrics using the monasca-common
validation library (requires monasca-common >= 1.1.0)

Change-Id: Iea784edbb3b57db57e6a90d1fc557b2c386c3713
Ashwin Agate 2016-09-28 23:39:25 +00:00
parent 0ea79c0305
commit 1c65ca011b
9 changed files with 138 additions and 32 deletions
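
The validation library named in the commit message is used as shown in the hunks below: import monasca_common.validation.metrics and call validate() on the metric dict. A minimal sketch of that call, assuming only what the diff relies on, namely that validate() raises an exception for a malformed metric; the payload fields are illustrative:

import json

from monasca_common.validation import metrics as metric_validator

metric = {
    "name": "mem.total_mb_agg",
    "dimensions": {"aggregation_period": "hourly", "host": "all"},
    "timestamp": 1475102365000,
    "value": 1024.0,
}

try:
    # validate() raises when the metric is malformed (the behaviour relied
    # on by the new _validate_metric helper); it returns nothing on success
    metric_validator.validate(metric)
    print("metric is valid: %s" % json.dumps(metric))
except Exception as e:
    print("metric is invalid: %s" % e)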

@@ -11,14 +11,20 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import json
import time
from monasca_common.validation import metrics as metric_validator
from monasca_transform.component import Component
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.log_utils import LogUtils
from oslo_config import cfg
ConfigInitializer.basic_config()
log = LogUtils.init_logger(__name__)
class InsertComponent(Component):
@@ -33,6 +39,20 @@ class InsertComponent(Component):
def get_component_type():
return Component.INSERT_COMPONENT_TYPE
@staticmethod
def _validate_metric(metric):
"""validate monasca metric.
"""
try:
# validate metric part, without the wrapper
metric_validator.validate(metric["metric"])
except Exception as e:
log.info("Metric %s is invalid: Exception : %s"
% (json.dumps(metric),
e.message))
return False
return True
@staticmethod
def _prepare_metric(instance_usage_dict, agg_params):
"""transform instance usage rdd to a monasca metric.

@@ -57,10 +57,11 @@ class KafkaInsert(InsertComponent):
# Approach # 2
# using collect() to fetch all elements of an RDD and write to
# kafka
#
for instance_usage_row in instance_usage_df.collect():
metric = InsertComponent._get_metric(
instance_usage_row, agg_params)
KafkaMessageAdapter.send_metric(metric)
# validate metric part
if InsertComponent._validate_metric(metric):
KafkaMessageAdapter.send_metric(metric)
return instance_usage_df

@@ -36,6 +36,7 @@ from monasca_transform.component.usage.fetch_quantity import \
from monasca_transform.component.usage.fetch_quantity_util import \
FetchQuantityUtilException
from monasca_transform.config.config_initializer import ConfigInitializer
from monasca_transform.log_utils import LogUtils
from monasca_transform.transform.builder.generic_transform_builder \
import GenericTransformBuilder
@@ -55,19 +56,7 @@ from monasca_transform.transform.transform_utils import MonMetricUtils
from monasca_transform.transform import TransformContextUtils
ConfigInitializer.basic_config()
# initialize logger
log = logging.getLogger(__name__)
_h = logging.FileHandler('%s/%s' % (
cfg.CONF.service.service_log_path,
cfg.CONF.service.service_log_filename))
_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
"%(lineno)s - %(levelname)s - %(message)s'"))
log.addHandler(_h)
if cfg.CONF.service.enable_debug_log_entries:
log.setLevel(logging.DEBUG)
else:
log.setLevel(logging.INFO)
log = LogUtils.init_logger(__name__)
class MonMetricsKafkaProcessor(object):

@@ -13,6 +13,7 @@
# under the License.
import logging
from oslo_config import cfg
class LogUtils(object):
@@ -31,3 +32,22 @@ class LogUtils(object):
debugstr = "\n".join((debugstr, "type: %s" % (type(obj))))
debugstr = "\n".join((debugstr, "dir: %s" % (dir(obj)), sep))
LogUtils.log_debug(debugstr)
@staticmethod
def init_logger(logger_name):
# initialize logger
log = logging.getLogger(logger_name)
_h = logging.FileHandler('%s/%s' % (
cfg.CONF.service.service_log_path,
cfg.CONF.service.service_log_filename))
_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
"%(lineno)s - %(levelname)s"
" - %(message)s'"))
log.addHandler(_h)
if cfg.CONF.service.enable_debug_log_entries:
log.setLevel(logging.DEBUG)
else:
log.setLevel(logging.INFO)
return log
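
init_logger reads three options from the [service] group of the configuration loaded by ConfigInitializer.basic_config(). A hedged sketch of the minimal oslo.config registration needed to exercise the helper on its own; the default values are illustrative, not the project's:

from oslo_config import cfg

# option names come from init_logger above; the defaults are illustrative
service_opts = [
    cfg.StrOpt('service_log_path', default='/tmp'),
    cfg.StrOpt('service_log_filename', default='monasca-transform.log'),
    cfg.BoolOpt('enable_debug_log_entries', default=False),
]
cfg.CONF.register_opts(service_opts, group='service')

# with these registered, LogUtils.init_logger(__name__) can create its
# FileHandler at /tmp/monasca-transform.log and pick INFO or DEBUG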

@@ -32,6 +32,7 @@ from monasca_transform.data_driven_specs.data_driven_specs_repo \
import DataDrivenSpecsRepo
from monasca_transform.data_driven_specs.data_driven_specs_repo \
import DataDrivenSpecsRepoFactory
from monasca_transform.log_utils import LogUtils
from monasca_transform.processor import Processor
from monasca_transform.transform.storage_utils import \
InvalidCacheStorageLevelException
@@ -40,19 +41,7 @@ from monasca_transform.transform.transform_utils import InstanceUsageUtils
from monasca_transform.transform import TransformContextUtils
ConfigInitializer.basic_config()
# initialize logger
log = logging.getLogger(__name__)
_h = logging.FileHandler('%s/%s' % (
cfg.CONF.service.service_log_path,
cfg.CONF.service.service_log_filename))
_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
"%(lineno)s - %(levelname)s - %(message)s'"))
log.addHandler(_h)
if cfg.CONF.service.enable_debug_log_entries:
log.setLevel(logging.DEBUG)
else:
log.setLevel(logging.INFO)
log = LogUtils.init_logger(__name__)
class PreHourlyProcessor(Processor):

@@ -8,4 +8,3 @@ oslo.config>=1.2.1
oslo.log
oslo.service
tooz

@@ -1,6 +1,6 @@
# mock object framework
hacking>=0.10.2
flake8>=2.2.4
flake8>=2.2.4,<3.0.0
nose==1.3.0
mock>=1.0.1
tox

@@ -65,5 +65,7 @@ class DummyInsert(InsertComponent):
for instance_usage_row in instance_usage_df.collect():
metric = InsertComponent._get_metric(instance_usage_row,
agg_params)
DummyAdapter.send_metric(metric)
# validate metric part
if InsertComponent._validate_metric(metric):
DummyAdapter.send_metric(metric)
return instance_usage_df

@@ -91,6 +91,31 @@ class TestFetchQuantityAgg(SparkContextTest):
return [json.loads(transform_specs_json_operation)]
def get_transform_specs_json_invalid_name(self):
"""get transform_specs driver table info."""
transform_specs_json = """
{"aggregation_params_map":{
"aggregation_pipeline":{"source":"streaming",
"usage":"fetch_quantity",
"setters":["rollup_quantity",
"set_aggregated_metric_name",
"set_aggregated_period"],
"insert":["prepare_data",
"insert_data"]},
"aggregated_metric_name": "&invalidmetricname",
"aggregation_period": "hourly",
"aggregation_group_by_list": ["host", "metric_id"],
"usage_fetch_operation": "sum",
"setter_rollup_group_by_list": ["host"],
"setter_rollup_operation": "sum",
"dimension_list":["aggregation_period",
"host",
"project_id"]
},
"metric_group":"mem_total_all",
"metric_id":"mem_total_all"}"""
return [json.loads(transform_specs_json)]
def get_invalid_filter_transform_specs_json(self,
field_to_filter,
filter_expression,
@@ -1020,6 +1045,67 @@ class TestFetchQuantityAgg(SparkContextTest):
self.assertTrue("Encountered invalid filter details:" in e.value)
self.assertTrue("filter operation = invalid." in e.value)
@mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.'
'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo')
@mock.patch('monasca_transform.transform.builder.'
'generic_transform_builder.GenericTransformBuilder.'
'_get_insert_component_manager')
@mock.patch('monasca_transform.transform.builder.'
'generic_transform_builder.GenericTransformBuilder.'
'_get_setter_component_manager')
@mock.patch('monasca_transform.transform.builder.'
'generic_transform_builder.GenericTransformBuilder.'
'_get_usage_component_manager')
def test_invalid_aggregated_metric_name(self,
usage_manager,
setter_manager,
insert_manager,
data_driven_specs_repo):
# load components
usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr()
setter_manager.return_value = \
MockComponentManager.get_setter_cmpt_mgr()
insert_manager.return_value = \
MockComponentManager.get_insert_cmpt_mgr()
# init mock driver tables
data_driven_specs_repo.return_value = \
MockDataDrivenSpecsRepo(
self.spark_context,
self.get_pre_transform_specs_json(),
self.get_transform_specs_json_invalid_name())
# Create an emulated set of Kafka messages (these were gathered
# by extracting Monasca messages from the Metrics queue on mini-mon).
# Create an RDD out of the mocked Monasca metrics
with open(DataProvider.fetch_quantity_data_path) as f:
raw_lines = f.read().splitlines()
raw_tuple_list = [eval(raw_line) for raw_line in raw_lines]
rdd_monasca = self.spark_context.parallelize(raw_tuple_list)
# decorate mocked RDD with dummy kafka offsets
myOffsetRanges = [
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
transform_context = TransformContextUtils.get_context(
offset_info=myOffsetRanges,
batch_time_info=self.get_dummy_batch_time())
rdd_monasca_with_offsets = rdd_monasca.map(
lambda x: RddTransformContext(x, transform_context))
# Call the primary method in mon_metrics_kafka
MonMetricsKafkaProcessor.rdd_to_recordstore(
rdd_monasca_with_offsets)
# get the metrics that have been submitted to the dummy message adapter
metrics = DummyAdapter.adapter_impl.metric_list
# metrics should be empty
self.assertFalse(metrics)
if __name__ == "__main__":
print("PATH *************************************************************")