Added basic Kafka consumer.

This commit is contained in:
Deklan Dieterly 2014-02-21 11:12:23 -07:00
parent e9456b0dd3
commit c9ac70f801
8 changed files with 171 additions and 19 deletions

View File

@ -4,6 +4,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class KafkaConfiguration {
@JsonProperty
String topic;
@JsonProperty
Integer numThreads;
@JsonProperty
String groupId;

View File

@ -1,20 +1,91 @@
package com.hpcloud;
import com.yammer.dropwizard.lifecycle.Managed;
import com.google.inject.Inject;
import com.yammer.dropwizard.config.Environment;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaConsumer implements Managed {
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
public class KafkaConsumer {
@Override
public void start() throws Exception {
logger.debug("start");
// Fix: original logger was created with MonConsumer.class, so this class's
// output was attributed to the wrong logger; also made the field final.
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
private final ConsumerConnector consumerConnector;
private final String topic;
private final Integer numThreads;
private ExecutorService executorService;
/**
 * Builds the high-level Kafka consumer connector from the injected service
 * configuration. Stream creation is deferred to run().
 */
// NOTE(review): the Environment parameter is unused here — presumably kept so
// Guice can satisfy the binding; confirm before removing.
@Inject
public KafkaConsumer(MonPersisterConfiguration configuration, Environment environment) {
Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration());
ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties);
this.topic = configuration.getKafkaConfiguration().topic;
this.numThreads = configuration.getKafkaConfiguration().numThreads;
this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
}
@Override
public void stop() throws Exception {
logger.debug("stop");
/**
 * Creates one Kafka stream per configured thread and hands each stream to a
 * KafkaConsumerRunnableBasic on a fixed-size thread pool.
 */
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
// numThreads is already an Integer — no need for the deprecated new Integer(...) boxing.
topicCountMap.put(topic, numThreads);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executorService = Executors.newFixedThreadPool(numThreads);
int threadNumber = 0;
for (final KafkaStream<byte[], byte[]> stream : streams) {
// Fix: threadNumber was never incremented, so every runnable reported thread 0.
executorService.submit(new KafkaConsumerRunnableBasic(stream, threadNumber++));
}
}
/**
 * Stops consuming: shuts down the Kafka connector (unblocking the stream
 * iterators) and then the worker pool. Safe to call before run().
 */
public void stop() {
ConsumerConnector connector = this.consumerConnector;
if (connector != null) {
connector.shutdown();
}
ExecutorService pool = this.executorService;
if (pool != null) {
pool.shutdown();
}
}
// Wraps the assembled properties in Kafka's high-level consumer config object.
private ConsumerConfig createConsumerConfig(Properties kafkaProperties) {
return new ConsumerConfig(kafkaProperties);
}
// Translates the service's KafkaConfiguration fields into the property keys
// expected by Kafka's high-level consumer (see ConsumerConfig docs for semantics).
private Properties createKafkaProperties(KafkaConfiguration kafkaConfiguration) {
Properties properties = new Properties();
properties.put("group.id", kafkaConfiguration.groupId);
properties.put("zookeeper.connect", kafkaConfiguration.zookeeperConnect);
properties.put("consumer.id", kafkaConfiguration.consumerId);
properties.put("socket.timeout.ms", kafkaConfiguration.socketTimeoutMs);
properties.put("socket.receive.buffer.bytes", kafkaConfiguration.socketReceiveBufferBytes);
properties.put("fetch.message.max.bytes", kafkaConfiguration.fetchMessageMaxBytes);
properties.put("auto.commit.enable", kafkaConfiguration.autoCommitEnable);
properties.put("auto.commit.interval.ms", kafkaConfiguration.autoCommitIntervalMs);
properties.put("queued.max.message.chunks", kafkaConfiguration.queuedMaxMessageChunks);
properties.put("rebalance.max.retries", kafkaConfiguration.rebalanceMaxRetries);
properties.put("fetch.min.bytes", kafkaConfiguration.fetchMinBytes);
properties.put("fetch.wait.max.ms", kafkaConfiguration.fetchWaitMaxMs);
properties.put("rebalance.backoff.ms", kafkaConfiguration.rebalanceBackoffMs);
properties.put("refresh.leader.backoff.ms", kafkaConfiguration.refreshLeaderBackoffMs);
properties.put("auto.offset.reset", kafkaConfiguration.autoOffsetReset);
properties.put("consumer.timeout.ms", kafkaConfiguration.consumerTimeoutMs);
properties.put("client.id", kafkaConfiguration.clientId);
properties.put("zookeeper.session.timeout.ms", kafkaConfiguration.zookeeperSessionTimeoutMs);
// NOTE(review): connection timeout is fed from zookeeperSessionTimeoutMs —
// this looks like a copy-paste bug; it should presumably come from a
// zookeeperConnectionTimeoutMs field. Confirm against KafkaConfiguration.
properties.put("zookeeper.connection.timeout.ms", kafkaConfiguration.zookeeperSessionTimeoutMs);
properties.put("zookeeper.sync.time.ms", kafkaConfiguration.zookeeperSyncTimeMs);
return properties;
}
}

View File

@ -0,0 +1,22 @@
package com.hpcloud;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

import java.nio.charset.StandardCharsets;

/**
 * Drains a single Kafka stream, printing each message together with the
 * thread number it was assigned. Intended to be submitted to an executor;
 * run() returns when the stream's iterator is exhausted (connector shutdown).
 */
public class KafkaConsumerRunnableBasic implements Runnable {

// Fixes vs original: parameterized the raw KafkaStream type (removing the
// need for @SuppressWarnings), dropped the non-idiomatic m_ prefixes, and
// made both fields final.
private final KafkaStream<byte[], byte[]> stream;
private final int threadNumber;

public KafkaConsumerRunnableBasic(KafkaStream<byte[], byte[]> stream, int threadNumber) {
this.stream = stream;
this.threadNumber = threadNumber;
}

@Override
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
// hasNext() blocks until a message arrives or the connector shuts down.
while (it.hasNext())
// Fix: decode explicitly as UTF-8 instead of the platform default charset.
System.out.println("Thread " + threadNumber + ": " + new String(it.next().message(), StandardCharsets.UTF_8));
System.out.println("Shutting down Thread: " + threadNumber);
}
}

View File

@ -0,0 +1,30 @@
package com.hpcloud;

import com.google.inject.Inject;
import com.yammer.dropwizard.lifecycle.Managed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dropwizard-managed wrapper around {@link KafkaConsumer}: starts consuming
 * when the service starts and shuts the consumer down with the service.
 */
public class MonConsumer implements Managed {

// Fix: logger made final.
private static final Logger logger = LoggerFactory.getLogger(MonConsumer.class);

// Fix: was package-private and mutable; nothing outside this class uses it.
private final KafkaConsumer kafkaConsumer;

@Inject
public MonConsumer(KafkaConsumer kafkaConsumer) {
this.kafkaConsumer = kafkaConsumer;
}

@Override
public void start() throws Exception {
logger.debug("start");
// NOTE(review): assumes KafkaConsumer.run() returns after spawning its
// worker threads rather than blocking service startup — confirm.
kafkaConsumer.run();
}

@Override
public void stop() throws Exception {
logger.debug("stop");
kafkaConsumer.stop();
}
}

View File

@ -1,11 +1,23 @@
package com.hpcloud;
import com.google.inject.AbstractModule;
import com.yammer.dropwizard.config.Environment;
/**
 * Guice wiring for the persister service: exposes the configuration and the
 * Dropwizard environment as pre-built instances and registers the consumer
 * classes for constructor injection.
 */
public class MonPersisterModule extends AbstractModule {

private final MonPersisterConfiguration configuration;
private final Environment environment;

public MonPersisterModule(MonPersisterConfiguration configuration, Environment environment) {
this.configuration = configuration;
this.environment = environment;
}

@Override
protected void configure() {
// Instances captured at construction time.
bind(MonPersisterConfiguration.class).toInstance(configuration);
bind(Environment.class).toInstance(environment);
// Concrete classes, resolved through their @Inject constructors.
bind(KafkaConsumer.class);
bind(MonConsumer.class);
}
}

View File

@ -19,12 +19,14 @@ public class MonPersisterService extends Service<MonPersisterConfiguration> {
@Override
public void run(MonPersisterConfiguration configuration, Environment environment) throws Exception {
Injector injector = Guice.createInjector(new MonPersisterModule());
Injector injector = Guice.createInjector(new MonPersisterModule(configuration, environment));
// Sample resource.
environment.addResource(new Resource());
// Sample health check.
environment.addHealthCheck(new SimpleHealthCheck("test-health-check"));
KafkaConsumer kafkaConsumer = injector.getInstance(KafkaConsumer.class);
environment.manage(kafkaConsumer);
MonConsumer monConsumer = injector.getInstance(MonConsumer.class);
environment.manage(monConsumer);
}
}

View File

@ -3,6 +3,8 @@ name: mon-persister
#Kafka settings.
kafkaConfiguration:
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
topic: test
numThreads: 1
groupId: 1
zookeeperConnect: localhost:2181
consumerId: 1

View File

@ -3,20 +3,27 @@ package com.hpcloud;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
public class KafkaConsumerTest {
public class MonConsumerTest {
KafkaConsumer kafkaConsumer;
@Mock
private KafkaConsumer kafkaConsumer;
@InjectMocks
private MonConsumer monConsumer;
@Before
public void before() {
kafkaConsumer = new KafkaConsumer();
public void initMocks() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testKafkaConsumerStart() {
try {
kafkaConsumer.start();
monConsumer.start();
} catch (Exception e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
@ -26,7 +33,7 @@ public class KafkaConsumerTest {
@Test
public void testKafkaConsumerStop() {
try {
kafkaConsumer.stop();
monConsumer.stop();
} catch (Exception e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}