Validate metrics before publishing to kafka
Validate monasca metrics using monasca-common validate library (requires monasca-common >= 1.1.0) Change-Id: Iea784edbb3b57db57e6a90d1fc557b2c386c3713
This commit is contained in:
parent
0ea79c0305
commit
1c65ca011b
|
@ -11,14 +11,20 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import json
|
||||
import time
|
||||
|
||||
from monasca_common.validation import metrics as metric_validator
|
||||
from monasca_transform.component import Component
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.log_utils import LogUtils
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
ConfigInitializer.basic_config()
|
||||
log = LogUtils.init_logger(__name__)
|
||||
|
||||
|
||||
class InsertComponent(Component):
|
||||
|
||||
|
@ -33,6 +39,20 @@ class InsertComponent(Component):
|
|||
def get_component_type():
|
||||
return Component.INSERT_COMPONENT_TYPE
|
||||
|
||||
@staticmethod
|
||||
def _validate_metric(metric):
|
||||
"""validate monasca metric.
|
||||
"""
|
||||
try:
|
||||
# validate metric part, without the wrapper
|
||||
metric_validator.validate(metric["metric"])
|
||||
except Exception as e:
|
||||
log.info("Metric %s is invalid: Exception : %s"
|
||||
% (json.dumps(metric),
|
||||
e.message))
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _prepare_metric(instance_usage_dict, agg_params):
|
||||
"""transform instance usage rdd to a monasca metric.
|
||||
|
|
|
@ -57,10 +57,11 @@ class KafkaInsert(InsertComponent):
|
|||
# Approach # 2
|
||||
# using collect() to fetch all elements of an RDD and write to
|
||||
# kafka
|
||||
#
|
||||
|
||||
for instance_usage_row in instance_usage_df.collect():
|
||||
metric = InsertComponent._get_metric(
|
||||
instance_usage_row, agg_params)
|
||||
KafkaMessageAdapter.send_metric(metric)
|
||||
# validate metric part
|
||||
if InsertComponent._validate_metric(metric):
|
||||
KafkaMessageAdapter.send_metric(metric)
|
||||
return instance_usage_df
|
||||
|
|
|
@ -36,6 +36,7 @@ from monasca_transform.component.usage.fetch_quantity import \
|
|||
from monasca_transform.component.usage.fetch_quantity_util import \
|
||||
FetchQuantityUtilException
|
||||
from monasca_transform.config.config_initializer import ConfigInitializer
|
||||
from monasca_transform.log_utils import LogUtils
|
||||
from monasca_transform.transform.builder.generic_transform_builder \
|
||||
import GenericTransformBuilder
|
||||
|
||||
|
@ -55,19 +56,7 @@ from monasca_transform.transform.transform_utils import MonMetricUtils
|
|||
from monasca_transform.transform import TransformContextUtils
|
||||
|
||||
ConfigInitializer.basic_config()
|
||||
|
||||
# initialize logger
|
||||
log = logging.getLogger(__name__)
|
||||
_h = logging.FileHandler('%s/%s' % (
|
||||
cfg.CONF.service.service_log_path,
|
||||
cfg.CONF.service.service_log_filename))
|
||||
_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
|
||||
"%(lineno)s - %(levelname)s - %(message)s'"))
|
||||
log.addHandler(_h)
|
||||
if cfg.CONF.service.enable_debug_log_entries:
|
||||
log.setLevel(logging.DEBUG)
|
||||
else:
|
||||
log.setLevel(logging.INFO)
|
||||
log = LogUtils.init_logger(__name__)
|
||||
|
||||
|
||||
class MonMetricsKafkaProcessor(object):
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
import logging
|
||||
from oslo_config import cfg
|
||||
|
||||
|
||||
class LogUtils(object):
|
||||
|
@ -31,3 +32,22 @@ class LogUtils(object):
|
|||
debugstr = "\n".join((debugstr, "type: %s" % (type(obj))))
|
||||
debugstr = "\n".join((debugstr, "dir: %s" % (dir(obj)), sep))
|
||||
LogUtils.log_debug(debugstr)
|
||||
|
||||
@staticmethod
|
||||
def init_logger(logger_name):
|
||||
|
||||
# initialize logger
|
||||
log = logging.getLogger(logger_name)
|
||||
_h = logging.FileHandler('%s/%s' % (
|
||||
cfg.CONF.service.service_log_path,
|
||||
cfg.CONF.service.service_log_filename))
|
||||
_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
|
||||
"%(lineno)s - %(levelname)s"
|
||||
" - %(message)s'"))
|
||||
log.addHandler(_h)
|
||||
if cfg.CONF.service.enable_debug_log_entries:
|
||||
log.setLevel(logging.DEBUG)
|
||||
else:
|
||||
log.setLevel(logging.INFO)
|
||||
|
||||
return log
|
||||
|
|
|
@ -32,6 +32,7 @@ from monasca_transform.data_driven_specs.data_driven_specs_repo \
|
|||
import DataDrivenSpecsRepo
|
||||
from monasca_transform.data_driven_specs.data_driven_specs_repo \
|
||||
import DataDrivenSpecsRepoFactory
|
||||
from monasca_transform.log_utils import LogUtils
|
||||
from monasca_transform.processor import Processor
|
||||
from monasca_transform.transform.storage_utils import \
|
||||
InvalidCacheStorageLevelException
|
||||
|
@ -40,19 +41,7 @@ from monasca_transform.transform.transform_utils import InstanceUsageUtils
|
|||
from monasca_transform.transform import TransformContextUtils
|
||||
|
||||
ConfigInitializer.basic_config()
|
||||
|
||||
# initialize logger
|
||||
log = logging.getLogger(__name__)
|
||||
_h = logging.FileHandler('%s/%s' % (
|
||||
cfg.CONF.service.service_log_path,
|
||||
cfg.CONF.service.service_log_filename))
|
||||
_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
|
||||
"%(lineno)s - %(levelname)s - %(message)s'"))
|
||||
log.addHandler(_h)
|
||||
if cfg.CONF.service.enable_debug_log_entries:
|
||||
log.setLevel(logging.DEBUG)
|
||||
else:
|
||||
log.setLevel(logging.INFO)
|
||||
log = LogUtils.init_logger(__name__)
|
||||
|
||||
|
||||
class PreHourlyProcessor(Processor):
|
||||
|
|
|
@ -8,4 +8,3 @@ oslo.config>=1.2.1
|
|||
oslo.log
|
||||
oslo.service
|
||||
tooz
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# mock object framework
|
||||
hacking>=0.10.2
|
||||
flake8>=2.2.4
|
||||
flake8>=2.2.4,<3.0.0
|
||||
nose==1.3.0
|
||||
mock>=1.0.1
|
||||
tox
|
||||
|
|
|
@ -65,5 +65,7 @@ class DummyInsert(InsertComponent):
|
|||
for instance_usage_row in instance_usage_df.collect():
|
||||
metric = InsertComponent._get_metric(instance_usage_row,
|
||||
agg_params)
|
||||
DummyAdapter.send_metric(metric)
|
||||
# validate metric part
|
||||
if InsertComponent._validate_metric(metric):
|
||||
DummyAdapter.send_metric(metric)
|
||||
return instance_usage_df
|
||||
|
|
|
@ -91,6 +91,31 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
|
||||
return [json.loads(transform_specs_json_operation)]
|
||||
|
||||
def get_transform_specs_json_invalid_name(self):
|
||||
"""get transform_specs driver table info."""
|
||||
transform_specs_json = """
|
||||
{"aggregation_params_map":{
|
||||
"aggregation_pipeline":{"source":"streaming",
|
||||
"usage":"fetch_quantity",
|
||||
"setters":["rollup_quantity",
|
||||
"set_aggregated_metric_name",
|
||||
"set_aggregated_period"],
|
||||
"insert":["prepare_data",
|
||||
"insert_data"]},
|
||||
"aggregated_metric_name": "&invalidmetricname",
|
||||
"aggregation_period": "hourly",
|
||||
"aggregation_group_by_list": ["host", "metric_id"],
|
||||
"usage_fetch_operation": "sum",
|
||||
"setter_rollup_group_by_list": ["host"],
|
||||
"setter_rollup_operation": "sum",
|
||||
"dimension_list":["aggregation_period",
|
||||
"host",
|
||||
"project_id"]
|
||||
},
|
||||
"metric_group":"mem_total_all",
|
||||
"metric_id":"mem_total_all"}"""
|
||||
return [json.loads(transform_specs_json)]
|
||||
|
||||
def get_invalid_filter_transform_specs_json(self,
|
||||
field_to_filter,
|
||||
filter_expression,
|
||||
|
@ -1020,6 +1045,67 @@ class TestFetchQuantityAgg(SparkContextTest):
|
|||
self.assertTrue("Encountered invalid filter details:" in e.value)
|
||||
self.assertTrue("filter operation = invalid." in e.value)
|
||||
|
||||
@mock.patch('monasca_transform.data_driven_specs.data_driven_specs_repo.'
|
||||
'DataDrivenSpecsRepoFactory.get_data_driven_specs_repo')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_insert_component_manager')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_setter_component_manager')
|
||||
@mock.patch('monasca_transform.transform.builder.'
|
||||
'generic_transform_builder.GenericTransformBuilder.'
|
||||
'_get_usage_component_manager')
|
||||
def test_invalid_aggregated_metric_name(self,
|
||||
usage_manager,
|
||||
setter_manager,
|
||||
insert_manager,
|
||||
data_driven_specs_repo):
|
||||
|
||||
# load components
|
||||
usage_manager.return_value = MockComponentManager.get_usage_cmpt_mgr()
|
||||
setter_manager.return_value = \
|
||||
MockComponentManager.get_setter_cmpt_mgr()
|
||||
insert_manager.return_value = \
|
||||
MockComponentManager.get_insert_cmpt_mgr()
|
||||
|
||||
# init mock driver tables
|
||||
data_driven_specs_repo.return_value = \
|
||||
MockDataDrivenSpecsRepo(
|
||||
self.spark_context,
|
||||
self.get_pre_transform_specs_json(),
|
||||
self.get_transform_specs_json_invalid_name())
|
||||
|
||||
# Create an emulated set of Kafka messages (these were gathered
|
||||
# by extracting Monasca messages from the Metrics queue on mini-mon).
|
||||
|
||||
# Create an RDD out of the mocked Monasca metrics
|
||||
with open(DataProvider.fetch_quantity_data_path) as f:
|
||||
raw_lines = f.read().splitlines()
|
||||
raw_tuple_list = [eval(raw_line) for raw_line in raw_lines]
|
||||
|
||||
rdd_monasca = self.spark_context.parallelize(raw_tuple_list)
|
||||
|
||||
# decorate mocked RDD with dummy kafka offsets
|
||||
myOffsetRanges = [
|
||||
OffsetRange("metrics", 1, 10, 20)] # mimic rdd.offsetRanges()
|
||||
|
||||
transform_context = TransformContextUtils.get_context(
|
||||
offset_info=myOffsetRanges,
|
||||
batch_time_info=self.get_dummy_batch_time())
|
||||
|
||||
rdd_monasca_with_offsets = rdd_monasca.map(
|
||||
lambda x: RddTransformContext(x, transform_context))
|
||||
|
||||
# Call the primary method in mon_metrics_kafka
|
||||
MonMetricsKafkaProcessor.rdd_to_recordstore(
|
||||
rdd_monasca_with_offsets)
|
||||
|
||||
# get the metrics that have been submitted to the dummy message adapter
|
||||
metrics = DummyAdapter.adapter_impl.metric_list
|
||||
|
||||
# metrics should be empty
|
||||
self.assertFalse(metrics)
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("PATH *************************************************************")
|
||||
|
|
Loading…
Reference in New Issue