Implemented KafkaAlarmEventForwarder to actually use Kafka. It has not been tested yet, but it is a pretty simple implementation.

Modified mon-thresh-sample-config.yml to use configuration for EventSpout and KafkaAlarmEventForwarder. Also made it compatible with mini-mon and switched it from Vertica to MySQL. Pointed it at the real topics and used descriptive groupIds.

Renamed externalRabbit to kafkaProducerConfig in ThresholdingConfiguration. Changed uses of it.

Added close to interface AlarmEventForwarder.
This commit is contained in:
craigbr 2014-03-14 15:39:04 -06:00
parent 9cc53bd0fd
commit 5288059b24
7 changed files with 88 additions and 28 deletions

View File

@ -6,10 +6,10 @@ metricSpoutConfig:
#Kafka settings.
kafkaConsumerConfiguration:
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
topic: test
topic: metrics
numThreads: 1
groupId: 1
zookeeperConnect: localhost:2181
groupId: thresh-metrics
zookeeperConnect: 192.168.10.10:2181
consumerId: 1
socketTimeoutMs: 30000
socketReceiveBufferBytes : 65536
@ -34,10 +34,10 @@ eventSpoutConfig:
#Kafka settings.
kafkaConsumerConfiguration:
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
topic: test
topic: events
numThreads: 1
groupId: 1
zookeeperConnect: localhost:2181
groupId: thresh-events
zookeeperConnect: 192.168.10.10:2181
consumerId: 1
socketTimeoutMs: 30000
socketReceiveBufferBytes : 65536
@ -58,16 +58,64 @@ eventSpoutConfig:
zookeeperSyncTimeMs: 2000
eventSpoutConfig:
#Kafka settings.
kafkaConsumerConfiguration:
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
topic: alarms
numThreads: 1
groupId: 2
zookeeperConnect: 192.168.10.10:2181
consumerId: 1
socketTimeoutMs: 30000
socketReceiveBufferBytes : 65536
fetchMessageMaxBytes: 1048576
autoCommitEnable: true
autoCommitIntervalMs: 60000
queuedMaxMessageChunks: 10
rebalanceMaxRetries: 4
fetchMinBytes: 1
fetchWaitMaxMs: 100
rebalanceBackoffMs: 2000
refreshLeaderBackoffMs: 200
autoOffsetReset: largest
consumerTimeoutMs: -1
clientId : 1
zookeeperSessionTimeoutMs : 6000
zookeeperConnectionTimeoutMs : 6000
zookeeperSyncTimeMs: 2000
kafkaProducerConfig:
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
topic: alarm-state-transitions
metadataBrokerList: 192.168.10.10:9092
serializerClass: kafka.serializer.StringEncoder
partitionerClass:
requestRequiredAcks: 1
requestTimeoutMs: 10000
producerType: sync
keySerializerClass:
compressionCodec: none
compressedTopics:
messageSendMaxRetries: 3
retryBackoffMs: 100
topicMetadataRefreshIntervalMs: 600000
queueBufferingMaxMs: 5000
queueBufferingMaxMessages: 10000
queueEnqueueTimeoutMs: -1
batchNumMessages: 200
sendBufferBytes: 102400
clientId : Threshold_Engine
sporadicMetricNamespaces:
- foo
database:
driverClass: com.vertica.jdbc.Driver
# url: jdbc:vertica://mon-aw1rdd1-vertica0001.rndd.aw1.hpcloud.net:5433/som
url: jdbc:vertica://15.185.94.245:5433/som
# user: persister
user: mon_persister
# user: dbadmin
driverClass: com.mysql.jdbc.Driver
url: jdbc:mysql://192.168.10.6:3306/mon
user: thresh
password: password
properties:
ssl: false

View File

@ -1,7 +0,0 @@
package com.hpcloud.mon;
import java.io.Serializable;
/**
 * Empty placeholder for Kafka producer settings (deleted in this commit in favor of
 * {@code com.hpcloud.configuration.KafkaProducerConfiguration}).
 */
public class KafkaProducerConfiguration implements Serializable {
// Explicit version id pins the serialized form; presumably required because this config
// travels inside a serialized Storm topology — TODO confirm with topology submission code.
private static final long serialVersionUID = -7420121222391821487L;
}

View File

@ -1,6 +1,6 @@
package com.hpcloud.mon;
import com.hpcloud.configuration.KafkaConsumerConfiguration;
import com.hpcloud.configuration.KafkaProducerConfiguration;
import java.util.Set;
@ -9,7 +9,6 @@ import javax.validation.constraints.NotNull;
import org.hibernate.validator.constraints.NotEmpty;
import com.hpcloud.configuration.KafkaConsumerConfiguration;
import com.hpcloud.mon.infrastructure.thresholding.DataSourceFactory;
/**
@ -55,7 +54,7 @@ public class ThresholdingConfiguration {
/** Configuration for publishing to the alerts exchange on the external server. */
@NotEmpty public String alertsExchange = "alerts";
@NotEmpty public String alertsRoutingKey = "alert";
@Valid @NotNull public KafkaProducerConfiguration externalRabbit = new KafkaProducerConfiguration();
@Valid @NotNull public KafkaProducerConfiguration kafkaProducerConfig = new KafkaProducerConfiguration();
/** MaaS API database configuration. */
@Valid @NotNull public DataSourceFactory database = new DataSourceFactory();

View File

@ -110,7 +110,7 @@ public class TopologyModule extends AbstractModule {
// Aggregation / Event -> Thresholding
builder.setBolt("thresholding-bolt",
new AlarmThresholdingBolt(config.database, config.externalRabbit),
new AlarmThresholdingBolt(config.database, config.kafkaProducerConfig),
config.thresholdingBoltThreads)
.fieldsGrouping("aggregation-bolt", new Fields("alarmId"))
.fieldsGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID,

View File

@ -4,4 +4,5 @@ public interface AlarmEventForwarder {
void send(String alertExchange, String alertRoutingKey, String json);
void close();
}

View File

@ -12,7 +12,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.hpcloud.mon.KafkaProducerConfiguration;
import com.hpcloud.configuration.KafkaProducerConfiguration;
import com.hpcloud.mon.ThresholdingConfiguration;
import com.hpcloud.mon.common.event.AlarmDeletedEvent;
import com.hpcloud.mon.common.model.alarm.AlarmState;
@ -20,7 +20,6 @@ import com.hpcloud.mon.domain.model.Alarm;
import com.hpcloud.mon.domain.model.AlarmStateTransitionEvent;
import com.hpcloud.mon.domain.model.SubAlarm;
import com.hpcloud.mon.domain.service.AlarmDAO;
import com.hpcloud.mon.infrastructure.messaging.MessagingModule;
import com.hpcloud.mon.infrastructure.persistence.PersistenceModule;
import com.hpcloud.streaming.storm.Logging;
import com.hpcloud.streaming.storm.Streams;

View File

@ -1,20 +1,40 @@
package com.hpcloud.mon.infrastructure.thresholding;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import com.google.inject.AbstractModule;
import com.hpcloud.mon.KafkaProducerConfiguration;
import com.hpcloud.configuration.KafkaProducerConfiguration;
import com.hpcloud.configuration.KafkaProducerProperties;
/**
 * Forwards alarm state transition events to a Kafka topic.
 *
 * <p>Implements {@link AlarmEventForwarder} over the Kafka 0.8 producer API. Also extends
 * Guice's {@code AbstractModule} with an empty {@code configure()}; no bindings are made here.
 * NOTE(review): consider dropping {@code AbstractModule} if nothing installs this as a module.
 */
public class KafkaAlarmEventForwarder extends AbstractModule implements AlarmEventForwarder {
    /** Kafka producer used to publish alarm events. Closed via {@link #close()}. */
    private final Producer<String, String> producer;
    /** Topic every event is published to, taken from the producer configuration. */
    private final String topic;

    /**
     * Creates a forwarder publishing to the topic named in {@code kafkaConfig}.
     *
     * @param kafkaConfig producer settings (brokers, serializers, topic, ...)
     */
    public KafkaAlarmEventForwarder(KafkaProducerConfiguration kafkaConfig) {
        this.topic = kafkaConfig.getTopic();
        final Properties kafkaProperties = KafkaProducerProperties.createKafkaProperties(kafkaConfig);
        // Renamed from "consumerConfig": this is a producer configuration.
        final ProducerConfig producerConfig = new ProducerConfig(kafkaProperties);
        producer = new Producer<String, String>(producerConfig);
    }

    /**
     * Publishes {@code json} to the configured topic.
     *
     * @param alertExchange unused; retained from the RabbitMQ-era interface
     * @param alertRoutingKey used as the Kafka message key
     * @param json event payload
     */
    @Override
    public void send(String alertExchange, String alertRoutingKey, String json) {
        final KeyedMessage<String, String> message =
            new KeyedMessage<String, String>(topic, alertRoutingKey, json);
        producer.send(message);
    }

    /** Releases the underlying Kafka producer's resources. */
    @Override
    public void close() {
        producer.close();
    }

    /** No Guice bindings are configured; see the class note on {@code AbstractModule}. */
    @Override
    protected void configure() {
    }
}