Moved opening the Kafka connection to open instead of the constructor so the MetricSpout can be serialized by Storm
This commit is contained in:
parent
44f440d920
commit
45a7140232
|
@ -28,7 +28,7 @@ public class MetricSpout extends BaseRichSpout {
|
|||
private final MetricSpoutConfig metricSpoutConfig;
|
||||
private final MetricDeserializer metricDeserializer;
|
||||
|
||||
private final ConsumerConnector consumerConnector;
|
||||
private ConsumerConnector consumerConnector;
|
||||
|
||||
private List<KafkaStream<byte[], byte[]>> streams = null;
|
||||
|
||||
|
@ -37,10 +37,6 @@ public class MetricSpout extends BaseRichSpout {
|
|||
public MetricSpout(MetricSpoutConfig metricSpoutConfig, MetricDeserializer metricDeserializer) {
|
||||
this.metricSpoutConfig = metricSpoutConfig;
|
||||
this.metricDeserializer = metricDeserializer;
|
||||
|
||||
Properties kafkaProperties = KafkaConsumerProperties.createKafkaProperties(metricSpoutConfig.kafkaConsumerConfiguration);
|
||||
ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
|
||||
this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -57,6 +53,10 @@ public class MetricSpout extends BaseRichSpout {
|
|||
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
|
||||
this.collector = collector;
|
||||
|
||||
Properties kafkaProperties = KafkaConsumerProperties.createKafkaProperties(metricSpoutConfig.kafkaConsumerConfiguration);
|
||||
ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
|
||||
this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue