diff --git a/thresh/pom.xml b/thresh/pom.xml index 2420454..d66ef16 100644 --- a/thresh/pom.xml +++ b/thresh/pom.xml @@ -5,7 +5,7 @@ monasca monasca-thresh - 1.1.0-SNAPSHOT + 2.0.0-SNAPSHOT http://github.com/openstack/monasca-thresh jar @@ -22,7 +22,7 @@ ${project.version}-${timestamp}-${gitRevision} ${project.artifactId}-${computedVersion} 1.1.0-SNAPSHOT - 0.9.5 + 1.0.0 9.1-901.jdbc4 2.3.9 0.9.9-RC1 @@ -113,11 +113,6 @@ monasca-common-persistence ${mon.common.version} - - monasca-common - monasca-common-streaming - ${mon.common.version} - monasca-common monasca-common-hibernate @@ -187,11 +182,9 @@ test - monasca-common - monasca-common-streaming - ${mon.common.version} - test-jar - test + commons-codec + commons-codec + 1.4 diff --git a/thresh/src/main/java/monasca/thresh/ThresholdingEngine.java b/thresh/src/main/java/monasca/thresh/ThresholdingEngine.java index 8095014..29130a6 100644 --- a/thresh/src/main/java/monasca/thresh/ThresholdingEngine.java +++ b/thresh/src/main/java/monasca/thresh/ThresholdingEngine.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,17 +17,13 @@ package monasca.thresh; -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.core.util.StatusPrinter; - import monasca.common.util.Injector; import monasca.common.util.config.ConfigurationFactory; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.StormTopology; - +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,8 +66,6 @@ public class ThresholdingEngine { System.exit(0); } - // Let's show the logging status. - StatusPrinter.print((LoggerContext) LoggerFactory.getILoggerFactory()); showVersion(); if (args.length < 2) { diff --git a/thresh/src/main/java/monasca/thresh/TopologyModule.java b/thresh/src/main/java/monasca/thresh/TopologyModule.java index 716ecc5..761241d 100644 --- a/thresh/src/main/java/monasca/thresh/TopologyModule.java +++ b/thresh/src/main/java/monasca/thresh/TopologyModule.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,15 +29,15 @@ import monasca.thresh.utils.StatsdMetricConsumer; import monasca.common.util.Injector; -import backtype.storm.Config; -import backtype.storm.generated.StormTopology; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; - import com.google.inject.AbstractModule; import com.google.inject.Provides; +import org.apache.storm.Config; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; + import javax.inject.Named; /** diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBolt.java index 401c953..9fe570c 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBolt.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014-2016 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,20 +17,12 @@ package monasca.thresh.infrastructure.thresholding; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - +import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.event.AlarmDefinitionDeletedEvent; import monasca.common.model.event.AlarmDefinitionUpdatedEvent; import monasca.common.model.event.AlarmDeletedEvent; import monasca.common.model.metric.MetricDefinition; -import monasca.common.streaming.storm.Logging; import monasca.common.util.Injector; import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.AlarmDefinition; @@ -41,7 +33,15 @@ import monasca.thresh.domain.model.TenantIdAndMetricName; import monasca.thresh.domain.service.AlarmDAO; import monasca.thresh.domain.service.AlarmDefinitionDAO; import monasca.thresh.infrastructure.persistence.PersistenceModule; +import monasca.thresh.utils.Logging; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java index 18a5b34..318f21a 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014,2016 Hewlett Packard Enterprise Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,17 +24,8 @@ import monasca.common.model.event.AlarmUpdatedEvent; import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.metric.MetricDefinition; -import monasca.common.streaming.storm.Logging; -import monasca.common.streaming.storm.Streams; import monasca.common.util.Injector; import monasca.common.util.Serialization; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Tuple; - import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.AlarmDefinition; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; @@ -43,7 +34,14 @@ import monasca.thresh.domain.model.SubExpression; import monasca.thresh.domain.service.AlarmDAO; import monasca.thresh.domain.service.AlarmDefinitionDAO; import monasca.thresh.infrastructure.persistence.PersistenceModule; +import monasca.thresh.utils.Logging; +import monasca.thresh.utils.Streams; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventProcessingBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventProcessingBolt.java index 576dd89..404811d 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventProcessingBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventProcessingBolt.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,22 +24,21 @@ import monasca.common.model.event.AlarmDeletedEvent; import monasca.common.model.event.AlarmUpdatedEvent; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.metric.MetricDefinition; +import monasca.common.util.Injector; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; import monasca.thresh.domain.model.SubExpression; import monasca.thresh.domain.model.TenantIdAndMetricName; import monasca.thresh.domain.service.AlarmDAO; import monasca.thresh.infrastructure.persistence.PersistenceModule; -import monasca.common.streaming.storm.Logging; -import monasca.common.util.Injector; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import monasca.thresh.utils.Logging; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventSpout.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventSpout.java index 5fbd1e2..ec2bdd2 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventSpout.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventSpout.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,10 +20,9 @@ package monasca.thresh.infrastructure.thresholding; import monasca.thresh.EventSpoutConfig; import monasca.thresh.infrastructure.thresholding.deserializer.EventDeserializer; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Values; - +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/KafkaSpout.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/KafkaSpout.java index ce22c86..d7d3c0e 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/KafkaSpout.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/KafkaSpout.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,16 +20,15 @@ package monasca.thresh.infrastructure.thresholding; import monasca.common.configuration.KafkaConsumerProperties; import monasca.thresh.KafkaSpoutConfig; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.base.BaseRichSpout; - import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.base.BaseRichSpout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java index 0f03951..d5b6a7d 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * Copyright 2016 FUJITSU LIMITED * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,19 +18,7 @@ package monasca.thresh.infrastructure.thresholding; -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - import monasca.common.model.metric.Metric; -import monasca.common.streaming.storm.Logging; -import monasca.common.streaming.storm.Streams; -import monasca.common.streaming.storm.Tuples; import monasca.thresh.ThresholdingConfiguration; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; import monasca.thresh.domain.model.SubAlarm; @@ -38,7 +26,18 @@ import monasca.thresh.domain.model.SubAlarmStats; import monasca.thresh.domain.model.SubExpression; import monasca.thresh.domain.model.TenantIdAndMetricName; import monasca.thresh.domain.service.SubAlarmStatsRepository; +import monasca.thresh.utils.Logging; +import monasca.thresh.utils.Streams; +import monasca.thresh.utils.Tuples; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBolt.java index 0e13697..46136fa 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBolt.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,18 +22,7 @@ import monasca.common.model.event.AlarmDefinitionDeletedEvent; import monasca.common.model.alarm.AlarmExpression; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.metric.Metric; -import monasca.common.streaming.storm.Logging; -import monasca.common.streaming.storm.Streams; import monasca.common.util.Injector; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.AlarmDefinition; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; @@ -44,7 +33,16 @@ import monasca.thresh.domain.model.TenantIdAndMetricName; import monasca.thresh.domain.service.AlarmDAO; import monasca.thresh.domain.service.AlarmDefinitionDAO; import monasca.thresh.infrastructure.persistence.PersistenceModule; +import monasca.thresh.utils.Logging; +import monasca.thresh.utils.Streams; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricSpout.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricSpout.java index 0d536ae..dab8952 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricSpout.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricSpout.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,15 +20,13 @@ package monasca.thresh.infrastructure.thresholding; import monasca.common.model.metric.Metric; import monasca.common.model.metric.MetricEnvelope; import monasca.common.model.metric.MetricEnvelopes; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; - import monasca.thresh.MetricSpoutConfig; import monasca.thresh.domain.model.TenantIdAndMetricName; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/deserializer/EventDeserializer.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/deserializer/EventDeserializer.java index c6d361b..b4de546 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/deserializer/EventDeserializer.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/deserializer/EventDeserializer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,16 +22,14 @@ import monasca.common.model.event.AlarmDefinitionDeletedEvent; import monasca.common.model.event.AlarmDefinitionUpdatedEvent; import monasca.common.model.event.AlarmDeletedEvent; import monasca.common.model.event.AlarmUpdatedEvent; -import monasca.common.streaming.storm.TupleDeserializer; import monasca.common.util.Serialization; +import monasca.thresh.utils.TupleDeserializer; -import backtype.storm.tuple.Fields; - +import org.apache.storm.tuple.Fields; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; -import java.io.UnsupportedEncodingException; import java.util.Collections; import java.util.List; diff --git a/thresh/src/main/java/monasca/thresh/utils/Logging.java b/thresh/src/main/java/monasca/thresh/utils/Logging.java new file mode 100644 index 0000000..0e503ad --- /dev/null +++ b/thresh/src/main/java/monasca/thresh/utils/Logging.java @@ -0,0 +1,32 @@ +/* + * (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package monasca.thresh.utils; + +import org.apache.storm.task.TopologyContext; + + +/** + * Storm related logging utilities. + */ +public final class Logging { + private Logging() { + } + + public static String categoryFor(Class type, TopologyContext ctx) { + return String.format("%s-%s", type.getName(), ctx.getThisTaskId()); + } +} diff --git a/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java b/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java index d44959b..daf02cc 100644 --- a/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java +++ b/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,15 +29,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import monasca.common.streaming.storm.Logging; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.Config; -import backtype.storm.metric.api.IMetricsConsumer; -import backtype.storm.task.IErrorReporter; -import backtype.storm.task.TopologyContext; +import org.apache.storm.Config; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.task.IErrorReporter; +import org.apache.storm.task.TopologyContext; import com.fasterxml.jackson.core.JsonGenerationException; import com.fasterxml.jackson.core.JsonParseException; diff --git a/thresh/src/main/java/monasca/thresh/utils/Streams.java b/thresh/src/main/java/monasca/thresh/utils/Streams.java new file mode 100644 index 0000000..bf1ce14 --- /dev/null +++ b/thresh/src/main/java/monasca/thresh/utils/Streams.java @@ -0,0 +1,27 @@ +/* + * (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package monasca.thresh.utils; + +/** + * Utilities for working with streams. + */ +public final class Streams { + public static final String DEFAULT_STREAM_ID = "default"; + + private Streams() { + } +} diff --git a/thresh/src/main/java/monasca/thresh/utils/TupleDeserializer.java b/thresh/src/main/java/monasca/thresh/utils/TupleDeserializer.java new file mode 100644 index 0000000..6077063 --- /dev/null +++ b/thresh/src/main/java/monasca/thresh/utils/TupleDeserializer.java @@ -0,0 +1,38 @@ +/* + * (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package monasca.thresh.utils; + +import org.apache.storm.tuple.Fields; + +import java.util.List; + +/** + * Deserializes tuples. Similar to a Scheme, but allows for multiple records per + * {@link #deserialize(byte[])} call. + */ +public interface TupleDeserializer { + /** + * Returns a list of deserialized tuples, consisting of a list of tuples each with a list of + * fields, for the {@code tuple}, else null if the {@code tuple} cannot be deserialized. + */ + List> deserialize(byte[] tuple); + + /** + * Returns the output fields. + */ + Fields getOutputFields(); +} diff --git a/thresh/src/main/java/monasca/thresh/utils/Tuples.java b/thresh/src/main/java/monasca/thresh/utils/Tuples.java new file mode 100644 index 0000000..a6c1468 --- /dev/null +++ b/thresh/src/main/java/monasca/thresh/utils/Tuples.java @@ -0,0 +1,33 @@ +/* + * (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package monasca.thresh.utils; + +import org.apache.storm.Constants; +import org.apache.storm.tuple.Tuple; + +/** + * Utilities for working with Tuples. + */ +public final class Tuples { + private Tuples() { + } + + public static boolean isTickTuple(Tuple tuple) { + return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) + && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); + } +} diff --git a/thresh/src/test/java/monasca/thresh/ThresholdingEngineAlarmTest.java b/thresh/src/test/java/monasca/thresh/ThresholdingEngineAlarmTest.java index 4a84b23..35e09f9 100644 --- a/thresh/src/test/java/monasca/thresh/ThresholdingEngineAlarmTest.java +++ b/thresh/src/test/java/monasca/thresh/ThresholdingEngineAlarmTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; + import monasca.common.configuration.KafkaProducerConfiguration; import monasca.common.model.event.AlarmDefinitionCreatedEvent; import monasca.common.model.event.AlarmDefinitionDeletedEvent; @@ -37,17 +38,8 @@ import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.metric.Metric; import monasca.common.model.metric.MetricDefinition; -import monasca.common.streaming.storm.TopologyTestCase; import monasca.common.util.Injector; import monasca.common.util.Serialization; - -import backtype.storm.Config; -import backtype.storm.testing.FeederSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; - -import com.google.inject.AbstractModule; - import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.AlarmDefinition; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; @@ -63,6 +55,12 @@ import monasca.thresh.infrastructure.thresholding.MetricFilteringBoltTest; import monasca.thresh.infrastructure.thresholding.MetricSpout; import monasca.thresh.infrastructure.thresholding.ProducerModule; +import com.google.inject.AbstractModule; + +import org.apache.storm.Config; +import org.apache.storm.testing.FeederSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.testng.annotations.AfterMethod; diff --git a/thresh/src/test/java/monasca/thresh/ThresholdingEngineTest.java b/thresh/src/test/java/monasca/thresh/ThresholdingEngineTest.java index 2ea7406..508ee5e 100644 --- a/thresh/src/test/java/monasca/thresh/ThresholdingEngineTest.java +++ b/thresh/src/test/java/monasca/thresh/ThresholdingEngineTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,11 +26,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import backtype.storm.Config; -import backtype.storm.testing.FeederSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import com.google.inject.AbstractModule; import monasca.common.configuration.KafkaProducerConfiguration; import monasca.common.model.alarm.AlarmExpression; import monasca.common.model.alarm.AlarmState; @@ -39,7 +34,6 @@ import monasca.common.model.event.AlarmDefinitionCreatedEvent; import monasca.common.model.event.AlarmStateTransitionedEvent; import monasca.common.model.metric.Metric; import monasca.common.model.metric.MetricDefinition; -import monasca.common.streaming.storm.TopologyTestCase; import monasca.common.util.Injector; import monasca.common.util.Serialization; import monasca.thresh.domain.model.Alarm; @@ -52,6 +46,13 @@ import monasca.thresh.infrastructure.thresholding.AlarmEventForwarder; import monasca.thresh.infrastructure.thresholding.MetricFilteringBolt; import monasca.thresh.infrastructure.thresholding.MetricSpout; import monasca.thresh.infrastructure.thresholding.ProducerModule; + +import com.google.inject.AbstractModule; + +import org.apache.storm.Config; +import org.apache.storm.testing.FeederSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.testng.annotations.AfterMethod; @@ -180,7 +181,6 @@ public class ThresholdingEngineTest extends TopologyTestCase { public void afterMethod() throws Exception { System.out.println("Stopping topology"); stopTopology(); - cluster = null; } public void testWithInitialAlarmDefinition_NonDeterministic() throws Exception { diff --git a/thresh/src/test/java/monasca/thresh/TopologyTestCase.java b/thresh/src/test/java/monasca/thresh/TopologyTestCase.java new file mode 100644 index 0000000..2bcddc8 --- /dev/null +++ b/thresh/src/test/java/monasca/thresh/TopologyTestCase.java @@ -0,0 +1,54 @@ +/* + * (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package monasca.thresh; + +import monasca.common.util.Injector; + +import com.google.common.base.Preconditions; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.generated.StormTopology; +import org.testng.annotations.Test; + +@Test(groups = "integration") +public class TopologyTestCase { + public static final String TEST_TOPOLOGY_NAME = "test-maas-alarming"; + protected static volatile LocalCluster cluster; + + protected void startTopology() throws Exception { + if (cluster == null) { + synchronized (TopologyTestCase.class) { + if (cluster == null) { + Preconditions.checkArgument(Injector.isBound(Config.class), + "You must bind a storm config"); + Preconditions.checkArgument(Injector.isBound(StormTopology.class), + "You must bind a storm topology"); + + cluster = new LocalCluster(); + cluster.submitTopology(TEST_TOPOLOGY_NAME, Injector.getInstance(Config.class), + Injector.getInstance(StormTopology.class)); + } + } + } + } + + protected static void stopTopology() { + if (cluster != null) { + cluster.killTopology(TEST_TOPOLOGY_NAME); + cluster.shutdown(); + cluster = null; + } + } +} diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBoltTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBoltTest.java index 573c6a2..ee7ea89 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBoltTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBoltTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * Copyright 2016 FUJITSU LIMITED * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -30,13 +30,6 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; -import backtype.storm.Testing; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.testing.MkTupleParam; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - import monasca.common.model.alarm.AlarmExpression; import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; @@ -52,6 +45,12 @@ import monasca.thresh.domain.model.TenantIdAndMetricName; import monasca.thresh.domain.service.AlarmDAO; import monasca.thresh.domain.service.AlarmDefinitionDAO; +import org.apache.storm.Testing; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.MkTupleParam; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.testng.annotations.BeforeMethod; diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBoltTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBoltTest.java index 00f26a5..8a83d12 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBoltTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBoltTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014,2016 Hewlett Packard Enterprise Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,6 @@ import monasca.common.model.alarm.AggregateFunction; import monasca.common.model.alarm.AlarmExpression; import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; -import monasca.common.streaming.storm.Streams; import monasca.common.util.Serialization; import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.AlarmDefinition; @@ -38,13 +37,13 @@ import monasca.thresh.domain.model.SubAlarm; import monasca.thresh.domain.model.SubExpression; import monasca.thresh.domain.service.AlarmDAO; import monasca.thresh.domain.service.AlarmDefinitionDAO; +import monasca.thresh.utils.Streams; -import backtype.storm.Testing; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.testing.MkTupleParam; -import backtype.storm.tuple.Tuple; - +import org.apache.storm.Testing; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.MkTupleParam; +import org.apache.storm.tuple.Tuple; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/EventProcessingBoltTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/EventProcessingBoltTest.java index 2357366..2220929 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/EventProcessingBoltTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/EventProcessingBoltTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014,2016 Hewlett Packard Enterprise Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,19 +31,6 @@ import monasca.common.model.alarm.AlarmExpression; import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.metric.MetricDefinition; -import monasca.common.streaming.storm.Streams; - -import backtype.storm.Testing; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.testing.MkTupleParam; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import com.google.common.collect.Sets; - import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.AlarmDefinition; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; @@ -51,7 +38,18 @@ import monasca.thresh.domain.model.SubAlarm; import monasca.thresh.domain.model.SubExpression; import monasca.thresh.domain.model.TenantIdAndMetricName; import monasca.thresh.domain.service.AlarmDAO; +import monasca.thresh.utils.Streams; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Sets; + +import org.apache.storm.Testing; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.MkTupleParam; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.mockito.verification.VerificationMode; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java index b3cfb9d..7a51068 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * Copyright 2016 FUJITSU LIMITED * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -28,22 +28,13 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; + import monasca.common.model.alarm.AlarmOperator; import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.metric.Metric; import monasca.common.model.metric.MetricDefinition; -import monasca.common.streaming.storm.Streams; - -import backtype.storm.Constants; -import backtype.storm.Testing; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.testing.MkTupleParam; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - import monasca.thresh.ThresholdingConfiguration; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; import monasca.thresh.domain.model.SubAlarm; @@ -51,7 +42,15 @@ import monasca.thresh.domain.model.SubAlarmStats; import monasca.thresh.domain.model.SubExpression; import monasca.thresh.domain.model.TenantIdAndMetricName; import monasca.thresh.domain.service.SubAlarmStatsRepository; +import monasca.thresh.utils.Streams; +import org.apache.storm.Constants; +import org.apache.storm.Testing; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.MkTupleParam; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBoltTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBoltTest.java index f35c67e..65e4c9d 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBoltTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBoltTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; + import monasca.common.model.event.AlarmDefinitionCreatedEvent; import monasca.common.model.event.AlarmDefinitionDeletedEvent; import monasca.common.model.alarm.AlarmExpression; @@ -30,15 +31,6 @@ import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.metric.Metric; import monasca.common.model.metric.MetricDefinition; -import monasca.common.streaming.storm.Streams; - -import backtype.storm.Testing; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.testing.MkTupleParam; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.AlarmDefinition; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; @@ -47,7 +39,14 @@ import monasca.thresh.domain.model.SubExpression; import monasca.thresh.domain.model.TenantIdAndMetricName; import monasca.thresh.domain.service.AlarmDAO; import monasca.thresh.domain.service.AlarmDefinitionDAO; +import monasca.thresh.utils.Streams; +import org.apache.storm.Testing; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.MkTupleParam; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.mockito.verification.VerificationMode; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test;