Start organizing code into packages

Deklan Dieterly 2014-02-24 14:36:31 -07:00
parent fcb4de1727
commit 08e92465c1
23 changed files with 251 additions and 132 deletions

View File

@@ -1,14 +0,0 @@
-package com.hpcloud;
-import com.fasterxml.jackson.annotation.JsonProperty;
-public class DisruptorConfiguration {
-    @JsonProperty
-    Integer bufferSize;
-    @JsonProperty
-    Integer numThreads;
-}

View File

@@ -1,72 +0,0 @@
-package com.hpcloud;
-import com.fasterxml.jackson.annotation.JsonProperty;
-public class KafkaConfiguration {
-    @JsonProperty
-    String topic;
-    @JsonProperty
-    Integer numThreads;
-    @JsonProperty
-    String groupId;
-    @JsonProperty
-    String zookeeperConnect;
-    @JsonProperty
-    String consumerId;
-    @JsonProperty
-    Integer socketTimeoutMs;
-    @JsonProperty
-    Integer socketReceiveBufferBytes;
-    @JsonProperty
-    Integer fetchMessageMaxBytes;
-    @JsonProperty
-    Boolean autoCommitEnable;
-    @JsonProperty
-    Integer autoCommitIntervalMs;
-    @JsonProperty
-    Integer queuedMaxMessageChunks;
-    @JsonProperty
-    Integer rebalanceMaxRetries;
-    @JsonProperty
-    Integer fetchMinBytes;
-    @JsonProperty
-    Integer fetchWaitMaxMs;
-    @JsonProperty
-    Integer rebalanceBackoffMs;
-    @JsonProperty
-    Integer refreshLeaderBackoffMs;
-    @JsonProperty
-    String autoOffsetReset;
-    @JsonProperty
-    Integer consumerTimeoutMs;
-    @JsonProperty
-    String clientId;
-    @JsonProperty
-    Integer zookeeperSessionTimeoutMs;
-    @JsonProperty
-    Integer zookeeperConnectionTimeoutMs;
-    @JsonProperty
-    Integer zookeeperSyncTimeMs;
-}

View File

@@ -5,6 +5,11 @@ import com.google.inject.Provider;
 import com.google.inject.ProvisionException;
 import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.hpcloud.configuration.MonPersisterConfiguration;
+import com.hpcloud.consumer.MonConsumer;
+import com.hpcloud.disruptor.DisruptorFactory;
+import com.hpcloud.event.StringEventHandler;
+import com.hpcloud.event.StringEventHandlerFactory;
 import com.yammer.dropwizard.config.Environment;
 import com.yammer.dropwizard.jdbi.DBIFactory;
 import org.skife.jdbi.v2.DBI;
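The imports above suggest the module now binds the relocated classes and builds an assisted-injection factory for the event handlers. A hypothetical sketch of that wiring, assuming a configure() body the diff does not show; the constructor signature and Scopes choices are guesses:

import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.hpcloud.configuration.MonPersisterConfiguration;
import com.hpcloud.consumer.MonConsumer;
import com.hpcloud.disruptor.DisruptorFactory;
import com.hpcloud.event.StringEventHandlerFactory;

public class MonPersisterModuleSketch extends AbstractModule {

    private final MonPersisterConfiguration configuration;

    public MonPersisterModuleSketch(MonPersisterConfiguration configuration) {
        this.configuration = configuration;
    }

    @Override
    protected void configure() {
        // Make the parsed YAML configuration injectable everywhere.
        bind(MonPersisterConfiguration.class).toInstance(configuration);
        // One disruptor factory and one managed consumer per service instance.
        bind(DisruptorFactory.class).in(Scopes.SINGLETON);
        bind(MonConsumer.class).in(Scopes.SINGLETON);
        // Assisted-injection factory so StringEventHandler can combine injected
        // dependencies with per-instance runtime arguments.
        install(new FactoryModuleBuilder().build(StringEventHandlerFactory.class));
    }
}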

View File

@@ -2,6 +2,10 @@ package com.hpcloud;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import com.hpcloud.configuration.MonPersisterConfiguration;
+import com.hpcloud.consumer.MonConsumer;
+import com.hpcloud.healthcheck.SimpleHealthCheck;
+import com.hpcloud.resource.Resource;
 import com.yammer.dropwizard.Service;
 import com.yammer.dropwizard.config.Bootstrap;
 import com.yammer.dropwizard.config.Environment;
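These imports point at the usual Dropwizard 0.6-style wiring: build a Guice injector from the configuration, then register the resource, health check, and managed consumer. A minimal sketch, assuming run() and initialize() bodies the diff does not include (the module class and the health-check name are guesses):

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.hpcloud.configuration.MonPersisterConfiguration;
import com.hpcloud.consumer.MonConsumer;
import com.hpcloud.healthcheck.SimpleHealthCheck;
import com.hpcloud.resource.Resource;
import com.yammer.dropwizard.Service;
import com.yammer.dropwizard.config.Bootstrap;
import com.yammer.dropwizard.config.Environment;

public class MonPersisterServiceSketch extends Service<MonPersisterConfiguration> {

    @Override
    public void initialize(Bootstrap<MonPersisterConfiguration> bootstrap) {
        bootstrap.setName("mon-persister");
    }

    @Override
    public void run(MonPersisterConfiguration configuration, Environment environment) {
        Injector injector = Guice.createInjector(new MonPersisterModuleSketch(configuration));
        environment.addResource(injector.getInstance(Resource.class));
        environment.addHealthCheck(new SimpleHealthCheck("mon-persister"));
        // Managed lifecycle: Dropwizard starts and stops the Kafka consumer with the service.
        environment.manage(injector.getInstance(MonConsumer.class));
    }
}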

View File

@@ -0,0 +1,21 @@
+package com.hpcloud.configuration;
+import com.fasterxml.jackson.annotation.JsonProperty;
+public class DisruptorConfiguration {
+    @JsonProperty
+    Integer bufferSize;
+    public Integer getBufferSize() {
+        return bufferSize;
+    }
+    @JsonProperty
+    Integer numProcessors;
+    public Integer getNumProcessors() {
+        return numProcessors;
+    }
+}

View File

@@ -0,0 +1,161 @@
+package com.hpcloud.configuration;
+import com.fasterxml.jackson.annotation.JsonProperty;
+public class KafkaConfiguration {
+    @JsonProperty
+    String topic;
+    @JsonProperty
+    Integer numThreads;
+    @JsonProperty
+    String groupId;
+    @JsonProperty
+    String zookeeperConnect;
+    @JsonProperty
+    String consumerId;
+    @JsonProperty
+    Integer socketTimeoutMs;
+    @JsonProperty
+    Integer socketReceiveBufferBytes;
+    @JsonProperty
+    Integer fetchMessageMaxBytes;
+    @JsonProperty
+    Boolean autoCommitEnable;
+    @JsonProperty
+    Integer autoCommitIntervalMs;
+    @JsonProperty
+    Integer queuedMaxMessageChunks;
+    @JsonProperty
+    Integer rebalanceMaxRetries;
+    @JsonProperty
+    Integer fetchMinBytes;
+    @JsonProperty
+    Integer fetchWaitMaxMs;
+    @JsonProperty
+    Integer rebalanceBackoffMs;
+    @JsonProperty
+    Integer refreshLeaderBackoffMs;
+    @JsonProperty
+    String autoOffsetReset;
+    @JsonProperty
+    Integer consumerTimeoutMs;
+    @JsonProperty
+    String clientId;
+    @JsonProperty
+    Integer zookeeperSessionTimeoutMs;
+    @JsonProperty
+    Integer zookeeperConnectionTimeoutMs;
+    @JsonProperty
+    Integer zookeeperSyncTimeMs;
+    public String getTopic() {
+        return topic;
+    }
+    public Integer getNumThreads() {
+        return numThreads;
+    }
+    public String getGroupId() {
+        return groupId;
+    }
+    public String getZookeeperConnect() {
+        return zookeeperConnect;
+    }
+    public String getConsumerId() {
+        return consumerId;
+    }
+    public Integer getSocketTimeoutMs() {
+        return socketTimeoutMs;
+    }
+    public Integer getSocketReceiveBufferBytes() {
+        return socketReceiveBufferBytes;
+    }
+    public Integer getFetchMessageMaxBytes() {
+        return fetchMessageMaxBytes;
+    }
+    public Boolean getAutoCommitEnable() {
+        return autoCommitEnable;
+    }
+    public Integer getAutoCommitIntervalMs() {
+        return autoCommitIntervalMs;
+    }
+    public Integer getQueuedMaxMessageChunks() {
+        return queuedMaxMessageChunks;
+    }
+    public Integer getRebalanceMaxRetries() {
+        return rebalanceMaxRetries;
+    }
+    public Integer getFetchMinBytes() {
+        return fetchMinBytes;
+    }
+    public Integer getFetchWaitMaxMs() {
+        return fetchWaitMaxMs;
+    }
+    public Integer getRebalanceBackoffMs() {
+        return rebalanceBackoffMs;
+    }
+    public Integer getRefreshLeaderBackoffMs() {
+        return refreshLeaderBackoffMs;
+    }
+    public String getAutoOffsetReset() {
+        return autoOffsetReset;
+    }
+    public Integer getConsumerTimeoutMs() {
+        return consumerTimeoutMs;
+    }
+    public String getClientId() {
+        return clientId;
+    }
+    public Integer getZookeeperSessionTimeoutMs() {
+        return zookeeperSessionTimeoutMs;
+    }
+    public Integer getZookeeperConnectionTimeoutMs() {
+        return zookeeperConnectionTimeoutMs;
+    }
+    public Integer getZookeeperSyncTimeMs() {
+        return zookeeperSyncTimeMs;
+    }
+}
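Since every field is bound through @JsonProperty with no setters, Jackson writes the YAML values straight into the fields. A minimal sketch of that binding, assuming jackson-dataformat-yaml on the classpath (Dropwizard does the equivalent internally when it parses the service config):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.hpcloud.configuration.KafkaConfiguration;

public class KafkaConfigurationBindingSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        // Keys mirror the field names; unquoted scalars are coerced to the field types.
        KafkaConfiguration config = mapper.readValue(
                "topic: test\nnumThreads: 2\ngroupId: \"1\"\nzookeeperConnect: localhost:2181\n",
                KafkaConfiguration.class);
        System.out.println(config.getTopic());       // test
        System.out.println(config.getNumThreads());  // 2
    }
}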

View File

@@ -1,4 +1,4 @@
-package com.hpcloud;
+package com.hpcloud.configuration;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.yammer.dropwizard.config.Configuration;

View File

@@ -1,4 +1,4 @@
-package com.hpcloud;
+package com.hpcloud.configuration;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -7,6 +7,9 @@ public class VerticaOutputProcessorConfiguration {
     @JsonProperty
     Integer batchSize;
     @JsonProperty
     Integer numProcessors;
+    public Integer getBatchSize() {
+        return batchSize;
+    }
 }

View File

@@ -1,6 +1,9 @@
-package com.hpcloud;
+package com.hpcloud.consumer;
 import com.google.inject.Inject;
+import com.hpcloud.configuration.KafkaConfiguration;
+import com.hpcloud.configuration.MonPersisterConfiguration;
+import com.hpcloud.disruptor.DisruptorFactory;
 import com.lmax.disruptor.dsl.Disruptor;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
@@ -29,8 +32,8 @@ public class KafkaConsumer {
     @Inject
     public KafkaConsumer(MonPersisterConfiguration configuration, DisruptorFactory disruptorFactory) {
-        this.topic = configuration.getKafkaConfiguration().topic;
-        this.numThreads = configuration.getKafkaConfiguration().numThreads;
+        this.topic = configuration.getKafkaConfiguration().getTopic();
+        this.numThreads = configuration.getKafkaConfiguration().getNumThreads();
         this.disruptor = disruptorFactory.create();
         Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration());
         ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties);
@@ -66,26 +69,26 @@ public class KafkaConsumer {
     private Properties createKafkaProperties(KafkaConfiguration kafkaConfiguration) {
         Properties properties = new Properties();
-        properties.put("group.id", kafkaConfiguration.groupId);
-        properties.put("zookeeper.connect", kafkaConfiguration.zookeeperConnect);
-        properties.put("consumer.id", kafkaConfiguration.consumerId);
-        properties.put("socket.timeout.ms", kafkaConfiguration.socketTimeoutMs.toString());
-        properties.put("socket.receive.buffer.bytes", kafkaConfiguration.socketReceiveBufferBytes.toString());
-        properties.put("fetch.message.max.bytes", kafkaConfiguration.fetchMessageMaxBytes.toString());
-        properties.put("auto.commit.enable", kafkaConfiguration.autoCommitEnable.toString());
-        properties.put("auto.commit.interval.ms", kafkaConfiguration.autoCommitIntervalMs.toString());
-        properties.put("queued.max.message.chunks", kafkaConfiguration.queuedMaxMessageChunks.toString());
-        properties.put("rebalance.max.retries", kafkaConfiguration.rebalanceMaxRetries.toString());
-        properties.put("fetch.min.bytes", kafkaConfiguration.fetchMinBytes.toString());
-        properties.put("fetch.wait.max.ms", kafkaConfiguration.fetchWaitMaxMs.toString());
-        properties.put("rebalance.backoff.ms", kafkaConfiguration.rebalanceBackoffMs.toString());
-        properties.put("refresh.leader.backoff.ms", kafkaConfiguration.refreshLeaderBackoffMs.toString());
-        properties.put("auto.offset.reset", kafkaConfiguration.autoOffsetReset);
-        properties.put("consumer.timeout.ms", kafkaConfiguration.consumerTimeoutMs.toString());
-        properties.put("client.id", kafkaConfiguration.clientId);
-        properties.put("zookeeper.session.timeout.ms", kafkaConfiguration.zookeeperSessionTimeoutMs.toString());
-        properties.put("zookeeper.connection.timeout.ms", kafkaConfiguration.zookeeperSessionTimeoutMs.toString());
-        properties.put("zookeeper.sync.time.ms", kafkaConfiguration.zookeeperSyncTimeMs.toString());
+        properties.put("group.id", kafkaConfiguration.getGroupId());
+        properties.put("zookeeper.connect", kafkaConfiguration.getZookeeperConnect());
+        properties.put("consumer.id", kafkaConfiguration.getConsumerId());
+        properties.put("socket.timeout.ms", kafkaConfiguration.getSocketTimeoutMs().toString());
+        properties.put("socket.receive.buffer.bytes", kafkaConfiguration.getSocketReceiveBufferBytes().toString());
+        properties.put("fetch.message.max.bytes", kafkaConfiguration.getFetchMessageMaxBytes().toString());
+        properties.put("auto.commit.enable", kafkaConfiguration.getAutoCommitEnable().toString());
+        properties.put("auto.commit.interval.ms", kafkaConfiguration.getAutoCommitIntervalMs().toString());
+        properties.put("queued.max.message.chunks", kafkaConfiguration.getQueuedMaxMessageChunks().toString());
+        properties.put("rebalance.max.retries", kafkaConfiguration.getRebalanceMaxRetries().toString());
+        properties.put("fetch.min.bytes", kafkaConfiguration.getFetchMinBytes().toString());
+        properties.put("fetch.wait.max.ms", kafkaConfiguration.getFetchWaitMaxMs().toString());
+        properties.put("rebalance.backoff.ms", kafkaConfiguration.getRebalanceBackoffMs().toString());
+        properties.put("refresh.leader.backoff.ms", kafkaConfiguration.getRefreshLeaderBackoffMs().toString());
+        properties.put("auto.offset.reset", kafkaConfiguration.getAutoOffsetReset());
+        properties.put("consumer.timeout.ms", kafkaConfiguration.getConsumerTimeoutMs().toString());
+        properties.put("client.id", kafkaConfiguration.getClientId());
+        properties.put("zookeeper.session.timeout.ms", kafkaConfiguration.getZookeeperSessionTimeoutMs().toString());
+        properties.put("zookeeper.connection.timeout.ms", kafkaConfiguration.getZookeeperConnectionTimeoutMs().toString());
+        properties.put("zookeeper.sync.time.ms", kafkaConfiguration.getZookeeperSyncTimeMs().toString());
         return properties;
     }
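These properties feed the Kafka 0.8 high-level consumer API that the imports at the top of this file reference. A hypothetical sketch of the steps the diff context cuts off, continuing the same class; the consumerConnector field, the runnable's constructor, and the usual java.util / java.util.concurrent / kafka.consumer imports are assumptions (the runnable class name is a guess based on the next file in this commit):

    // Likely shape of createConsumerConfig: the Kafka API takes the Properties directly.
    private ConsumerConfig createConsumerConfig(Properties kafkaProperties) {
        return new ConsumerConfig(kafkaProperties);
    }

    // Typical run(): one stream per configured thread, each pumping messages into the disruptor.
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumerConnector.createMessageStreams(topicCountMap);
        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        for (KafkaStream<byte[], byte[]> stream : streams.get(topic)) {
            executorService.submit(new KafkaConsumerRunnableBasic(disruptor, stream));
        }
    }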

View File

@@ -1,5 +1,6 @@
-package com.hpcloud;
+package com.hpcloud.consumer;
+import com.hpcloud.event.StringEvent;
 import com.lmax.disruptor.EventTranslator;
 import com.lmax.disruptor.dsl.Disruptor;
 import kafka.consumer.ConsumerIterator;
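The imports here (ConsumerIterator, EventTranslator, Disruptor) suggest a runnable that walks a Kafka stream and publishes each message onto the ring buffer. A hypothetical sketch of that loop; the stream field and the event's setter name are assumptions:

    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            final String message = new String(it.next().message());
            // The translator copies the payload into a pre-allocated ring-buffer slot,
            // so no garbage is created on the hot path.
            disruptor.publishEvent(new EventTranslator<StringEvent>() {
                @Override
                public void translateTo(StringEvent event, long sequence) {
                    event.setMessage(message); // setter is an assumption
                }
            });
        }
    }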

View File

@@ -1,4 +1,4 @@
-package com.hpcloud;
+package com.hpcloud.consumer;
 import com.google.inject.Inject;
 import com.yammer.dropwizard.lifecycle.Managed;

View File

@@ -1,6 +1,11 @@
-package com.hpcloud;
+package com.hpcloud.disruptor;
 import com.google.inject.Inject;
+import com.hpcloud.configuration.MonPersisterConfiguration;
+import com.hpcloud.event.StringEvent;
+import com.hpcloud.event.StringEventFactory;
+import com.hpcloud.event.StringEventHandler;
+import com.hpcloud.event.StringEventHandlerFactory;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.EventHandlerGroup;
@@ -26,11 +31,11 @@ public class DisruptorFactory {
         Executor executor = Executors.newCachedThreadPool();
         StringEventFactory stringEventFactory = new StringEventFactory();
-        int buffersize = configuration.getDisruptorConfiguration().bufferSize;
+        int buffersize = configuration.getDisruptorConfiguration().getBufferSize();
         Disruptor<StringEvent> disruptor = new Disruptor(stringEventFactory, buffersize, executor);
-        int batchSize = configuration.getVerticaOutputProcessorConfiguration().batchSize;
-        int numOutputProcessors = configuration.getVerticaOutputProcessorConfiguration().numProcessors;
+        int batchSize = configuration.getVerticaOutputProcessorConfiguration().getBatchSize();
+        int numOutputProcessors = configuration.getDisruptorConfiguration().getNumProcessors();
         EventHandlerGroup<StringEvent> handlerGroup = null;
         for (int i = 0; i < numOutputProcessors; ++i) {
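The hunk ends mid-loop. A hypothetical sketch of how the loop presumably finishes, wiring one handler per configured output processor onto the ring buffer; the factory's create() parameters and the method's return are assumptions:

        for (int i = 0; i < numOutputProcessors; ++i) {
            StringEventHandler stringEventHandler = stringEventHandlerFactory.create(batchSize);
            // Each handleEventsWith call adds an independent handler that sees
            // every published event, running in parallel with the others.
            handlerGroup = disruptor.handleEventsWith(stringEventHandler);
        }
        disruptor.start();
        return disruptor;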

View File

@@ -1,4 +1,4 @@
-package com.hpcloud;
+package com.hpcloud.event;
 public class StringEvent {

View File

@@ -1,4 +1,4 @@
-package com.hpcloud;
+package com.hpcloud.event;
 import com.lmax.disruptor.EventFactory;

View File

@@ -1,7 +1,8 @@
-package com.hpcloud;
+package com.hpcloud.event;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
+import com.hpcloud.repository.VerticaMetricRepository;
 import com.lmax.disruptor.EventHandler;
 public class StringEventHandler implements EventHandler<StringEvent> {
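The new repository import suggests this handler batches events into Vertica. A hypothetical sketch of the shape the imports imply, continuing the class above; the field names, the @Assisted parameter, and the repository method names are all assumptions:

    private final VerticaMetricRepository verticaMetricRepository;
    private final int batchSize;
    private long count = 0;

    @Inject
    public StringEventHandler(VerticaMetricRepository verticaMetricRepository,
                              @Assisted int batchSize) {
        this.verticaMetricRepository = verticaMetricRepository;
        this.batchSize = batchSize;
    }

    @Override
    public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {
        verticaMetricRepository.addToBatch(event);   // method name is an assumption
        // Flush every batchSize events, or when the disruptor signals end of batch.
        if (++count % batchSize == 0 || endOfBatch) {
            verticaMetricRepository.flush();         // method name is an assumption
        }
    }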

View File

@@ -1,4 +1,4 @@
-package com.hpcloud;
+package com.hpcloud.event;
 import com.google.inject.assistedinject.Assisted;

View File

@@ -1,10 +1,10 @@
-package com.hpcloud;
+package com.hpcloud.healthcheck;
 import com.yammer.metrics.core.HealthCheck;
 public class SimpleHealthCheck extends HealthCheck {
-    protected SimpleHealthCheck(String name) {
+    public SimpleHealthCheck(String name) {
         super(name);
     }
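The visibility change follows from the package move: while everything lived in com.hpcloud, the protected constructor was package-accessible, but now callers in com.hpcloud (such as the service class) sit outside com.hpcloud.healthcheck and need it public. A minimal usage sketch, with the check name an assumption:

    // From the service's run(), in package com.hpcloud:
    environment.addHealthCheck(new SimpleHealthCheck("mon-persister"));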

View File

@@ -1,4 +1,4 @@
-package com.hpcloud;
+package com.hpcloud.repository;
 import org.skife.jdbi.v2.DBI;
 import org.slf4j.Logger;
import org.slf4j.Logger;

View File

@@ -1,4 +1,4 @@
-package com.hpcloud;
+package com.hpcloud.repository;
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Handle;

View File

@@ -1,4 +1,4 @@
-package com.hpcloud;
+package com.hpcloud.resource;
 public class PlaceHolder {
     private final String content;

View File

@@ -1,4 +1,4 @@
-package com.hpcloud;
+package com.hpcloud.resource;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
import javax.ws.rs.Path;

View File

@@ -4,7 +4,7 @@ name: mon-persister
 kafkaConfiguration:
   # See http://kafka.apache.org/documentation.html#api for semantics and defaults.
   topic: test
-  numThreads: 1
+  numThreads: 2
   groupId: 1
   zookeeperConnect: localhost:2181
   consumerId: 1
@@ -28,11 +28,10 @@ kafkaConfiguration:
 disruptorConfiguration:
   bufferSize: 1048576
-  numThreads: 1
+  numProcessors: 2
 verticaOutputProcessorConfiguration:
   batchSize: 10
-  numProcessors: 1
 databaseConfiguration:
   driverClass: com.vertica.jdbc.Driver

View File

@@ -1,5 +1,7 @@
 package com.hpcloud;
+import com.hpcloud.consumer.KafkaConsumer;
+import com.hpcloud.consumer.MonConsumer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;