Basic consuming functionality working.

kafka consuming and sending thru disruptor.
  disruptor handlers handling.
This commit is contained in:
Deklan Dieterly 2014-02-21 14:44:02 -07:00
parent 9d8efa53bc
commit 96269503af
12 changed files with 167 additions and 20 deletions

View File

@ -49,6 +49,11 @@
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.2.0</version>
</dependency>
</dependencies>

View File

@ -0,0 +1,14 @@
package com.hpcloud;
import com.fasterxml.jackson.annotation.JsonProperty;
public class DisruptorConfiguration {
@JsonProperty
Integer bufferSize;
@JsonProperty
Integer numThreads;
}

View File

@ -1,7 +1,7 @@
package com.hpcloud;
import com.google.inject.Inject;
import com.yammer.dropwizard.config.Environment;
import com.lmax.disruptor.dsl.Disruptor;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
@ -24,14 +24,16 @@ public class KafkaConsumer {
private final Integer numThreads;
private final ConsumerConnector consumerConnector;
private ExecutorService executorService;
private final Disruptor disruptor;
@Inject
public KafkaConsumer(MonPersisterConfiguration configuration, Environment environment) {
public KafkaConsumer(MonPersisterConfiguration configuration, Disruptor disruptor) {
Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration());
ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties);
this.topic = configuration.getKafkaConfiguration().topic;
this.numThreads = configuration.getKafkaConfiguration().numThreads;
this.disruptor = disruptor;
Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration());
ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties);
this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
}
@ -44,7 +46,7 @@ public class KafkaConsumer {
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executorService.submit(new KafkaConsumerRunnableBasic(stream, threadNumber));
executorService.submit(new KafkaConsumerRunnableBasic(stream, threadNumber, disruptor));
}
}

View File

@ -1,22 +1,38 @@
package com.hpcloud;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class KafkaConsumerRunnableBasic implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
private KafkaStream stream;
private int threadNumber;
private Disruptor disruptor;
public KafkaConsumerRunnableBasic(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
public KafkaConsumerRunnableBasic(KafkaStream stream, int threadNumber, Disruptor disruptor) {
this.stream = stream;
this.threadNumber = threadNumber;
this.disruptor = disruptor;
}
@SuppressWarnings("unchecked")
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
System.out.println("Shutting down Thread: " + m_threadNumber);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
final String s = new String(it.next().message());
System.out.println("Thread " + threadNumber + ": " + s);
disruptor.publishEvent(new EventTranslator<StringEvent>() {
@Override
public void translateTo(StringEvent event, long sequence) {
event.set(s);
}
});
}
System.out.println("Shutting down Thread: " + threadNumber);
}
}

View File

@ -23,4 +23,22 @@ public class MonPersisterConfiguration extends Configuration {
public KafkaConfiguration getKafkaConfiguration() {
return kafkaConfiguration;
}
@Valid
@NotNull
@JsonProperty
private DisruptorConfiguration disruptorConfiguration = new DisruptorConfiguration();
public DisruptorConfiguration getDisruptorConfiguration() {
return disruptorConfiguration;
}
@Valid
@NotNull
@JsonProperty
private VerticaOutputProcessorConfiguration verticaOutputProcessorConfiguration = new VerticaOutputProcessorConfiguration();
public VerticaOutputProcessorConfiguration getVerticaOutputProcessorConfiguration() {
return verticaOutputProcessorConfiguration;
}
}

View File

@ -1,22 +1,22 @@
package com.hpcloud;
import com.google.inject.AbstractModule;
import com.yammer.dropwizard.config.Environment;
import com.lmax.disruptor.dsl.Disruptor;
public class MonPersisterModule extends AbstractModule {
private final MonPersisterConfiguration configuration;
private final Environment environment;
private final Disruptor disruptor;
public MonPersisterModule(MonPersisterConfiguration configuration, Environment environment) {
public MonPersisterModule(MonPersisterConfiguration configuration, Disruptor<StringEvent> disruptor) {
this.configuration = configuration;
this.environment = environment;
this.disruptor = disruptor;
}
@Override
protected void configure() {
bind(MonPersisterConfiguration.class).toInstance(configuration);
bind(Environment.class).toInstance(environment);
bind(Disruptor.class).toInstance(disruptor);
bind(MonConsumer.class);
}

View File

@ -2,10 +2,15 @@ package com.hpcloud;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.yammer.dropwizard.Service;
import com.yammer.dropwizard.config.Bootstrap;
import com.yammer.dropwizard.config.Environment;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class MonPersisterService extends Service<MonPersisterConfiguration> {
public static void main(String[] args) throws Exception {
@ -19,7 +24,10 @@ public class MonPersisterService extends Service<MonPersisterConfiguration> {
@Override
public void run(MonPersisterConfiguration configuration, Environment environment) throws Exception {
Injector injector = Guice.createInjector(new MonPersisterModule(configuration, environment));
Disruptor<StringEvent> disruptor = createDisruptor(configuration);
Injector injector = Guice.createInjector(new MonPersisterModule(configuration, disruptor));
// Sample resource.
environment.addResource(new Resource());
@ -29,4 +37,28 @@ public class MonPersisterService extends Service<MonPersisterConfiguration> {
MonConsumer monConsumer = injector.getInstance(MonConsumer.class);
environment.manage(monConsumer);
}
private Disruptor<StringEvent> createDisruptor(MonPersisterConfiguration configuration) {
Executor executor = Executors.newCachedThreadPool();
StringEventFactory stringEventFactory = new StringEventFactory();
int buffersize = configuration.getDisruptorConfiguration().bufferSize;
Disruptor<StringEvent> disruptor = new Disruptor(stringEventFactory, buffersize, executor);
int numOutputProcessors = configuration.getVerticaOutputProcessorConfiguration().numProcessors;
EventHandlerGroup<StringEvent> handlerGroup = null;
for (int i = 0; i < numOutputProcessors; ++i) {
StringEventHandler stringEventHandler = new StringEventHandler();
if (handlerGroup == null) {
handlerGroup = disruptor.handleEventsWith(stringEventHandler);
} else {
handlerGroup.then(stringEventHandler);
}
}
disruptor.start();
return disruptor;
}
}

View File

@ -0,0 +1,16 @@
package com.hpcloud;
public class StringEvent {
private String value;
public String getValue() {
return value;
}
public void set(String value) {
this.value = value;
}
}

View File

@ -0,0 +1,13 @@
package com.hpcloud;
import com.lmax.disruptor.EventFactory;
public class StringEventFactory implements EventFactory<StringEvent> {
public static final StringEventFactory INSTANCE = new StringEventFactory();
@Override
public StringEvent newInstance() {
return new StringEvent();
}
}

View File

@ -0,0 +1,11 @@
package com.hpcloud;
import com.lmax.disruptor.EventHandler;
public class StringEventHandler implements EventHandler<StringEvent> {
@Override
public void onEvent(StringEvent stringEvent, long l, boolean b) throws Exception {
System.out.println("Event: " + stringEvent.getValue());
}
}

View File

@ -0,0 +1,12 @@
package com.hpcloud;
import com.fasterxml.jackson.annotation.JsonProperty;
public class VerticaOutputProcessorConfiguration {
@JsonProperty
Integer batchSize;
@JsonProperty
Integer numProcessors;
}

View File

@ -26,6 +26,14 @@ kafkaConfiguration:
zookeeperConnectionTimeoutMs : 6000
zookeeperSyncTimeMs: 2000
disruptorConfiguration:
bufferSize: 1048576
numThreads: 1
verticaOutputProcessorConfiguration:
batchSize: 512
numProcessors: 1
# Logging settings.
logging: