monasca-thresh/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java

77 lines
2.7 KiB
Java

package com.hpcloud.mon.infrastructure.thresholding;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import com.hpcloud.configuration.KafkaConsumerConfiguration;
import com.hpcloud.configuration.KafkaConsumerProperties;
/**
 * Base Storm spout that reads raw messages from a single Kafka topic through a
 * Kafka {@link ConsumerConnector} and hands each message to a subclass for processing.
 *
 * <p>Lifecycle as implemented here: {@link #open} builds the consumer connector from the
 * supplied configuration, {@link #activate} creates the message streams the first time it
 * is called, {@link #nextTuple} forwards at most one message per call to
 * {@link #processMessage}, and {@link #close} shuts the connector down.
 */
public abstract class KafkaSpout extends BaseRichSpout {
  private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
  private static final long serialVersionUID = 744004533863562119L;

  /** Kafka consumer settings; also supplies the topic name to subscribe to. */
  private final KafkaConsumerConfiguration kafkaConsumerConfig;

  // Runtime-only state, populated by open()/activate(); excluded from serialization.
  private transient ConsumerConnector consumerConnector;
  private transient List<KafkaStream<byte[], byte[]>> streams;
  private transient SpoutOutputCollector collector;

  /**
   * Creates the spout. No Kafka connection is made until {@link #open} is called.
   *
   * @param kafkaConsumerConfig consumer configuration, including the topic to read
   */
  protected KafkaSpout(KafkaConsumerConfiguration kafkaConsumerConfig) {
    this.kafkaConsumerConfig = kafkaConsumerConfig;
    LOG.info("Created");
  }

  /**
   * Creates the Kafka message streams for the configured topic on first activation.
   * Later activations reuse the already-created streams.
   */
  @Override
  public void activate() {
    LOG.info("Activated");
    if (streams == null) {
      final String topic = kafkaConsumerConfig.getTopic();
      // Request exactly one stream for the topic.
      Map<String, Integer> topicCountMap = new HashMap<>();
      topicCountMap.put(topic, Integer.valueOf(1));
      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
          consumerConnector.createMessageStreams(topicCountMap);
      streams = consumerMap.get(topic);
    }
  }

  /**
   * Stores the output collector and builds the Kafka consumer connector from the
   * configuration passed to the constructor.
   *
   * @param conf Storm configuration (not read by this method)
   * @param context topology context (not read by this method)
   * @param collector collector later passed to {@link #processMessage}
   */
  @Override
  @SuppressWarnings("rawtypes") // raw Map is required to match the overridden signature
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    LOG.info("Opened");
    this.collector = collector;
    LOG.info(" topic = {}", kafkaConsumerConfig.getTopic());
    Properties kafkaProperties = KafkaConsumerProperties.createKafkaProperties(kafkaConsumerConfig);
    ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
    this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
  }

  /**
   * Reads at most one message from the first Kafka stream and passes it to
   * {@link #processMessage}.
   *
   * <p>NOTE(review): whether {@code hasNext()} blocks or throws when no message is
   * available presumably depends on the consumer timeout in the Kafka properties —
   * confirm against the deployed configuration.
   */
  @Override
  public void nextTuple() {
    LOG.debug("nextTuple called");
    ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
    if (it.hasNext()) {
      byte[] message = it.next().message();
      // Log the payload size; concatenating a byte[] only prints its identity string.
      LOG.debug("Received message: {} bytes", message == null ? 0 : message.length);
      processMessage(message, collector);
    }
  }

  /**
   * Shuts down the Kafka consumer connector, if one was created, and drops the
   * references to it and to its streams.
   */
  @Override
  public void close() {
    LOG.info("Closed");
    if (consumerConnector != null) {
      consumerConnector.shutdown();
      consumerConnector = null;
    }
    streams = null;
  }

  /**
   * Handles one raw message read from Kafka.
   *
   * @param message message payload exactly as returned by the Kafka stream
   * @param collector2 the collector that was supplied to {@link #open}
   */
  protected abstract void processMessage(byte[] message, SpoutOutputCollector collector2);
}