Minor class and variable name changes.
This commit is contained in:
parent
f88b0f25c5
commit
024c556979
|
@ -7,9 +7,9 @@ import com.google.inject.assistedinject.FactoryModuleBuilder;
|
|||
import com.hpcloud.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.consumer.KafkaConsumerRunnableBasic;
|
||||
import com.hpcloud.consumer.KafkaConsumerRunnableBasicFactory;
|
||||
import com.hpcloud.consumer.MonConsumer;
|
||||
import com.hpcloud.consumer.MonPersisterConsumer;
|
||||
import com.hpcloud.dbi.DBIProvider;
|
||||
import com.hpcloud.dedupe.MonDeDuperHeartbeat;
|
||||
import com.hpcloud.dedupe.MonPersisterDeduperHeartbeat;
|
||||
import com.hpcloud.disruptor.DisruptorExceptionHandler;
|
||||
import com.hpcloud.disruptor.DisruptorProvider;
|
||||
import com.hpcloud.disruptor.event.MetricMessageEventHandler;
|
||||
|
@ -52,8 +52,8 @@ public class MonPersisterModule extends AbstractModule {
|
|||
|
||||
bind(DBI.class).toProvider(DBIProvider.class).in(Scopes.SINGLETON);
|
||||
|
||||
bind(MonConsumer.class);
|
||||
bind(MonDeDuperHeartbeat.class);
|
||||
bind(MonPersisterConsumer.class);
|
||||
bind(MonPersisterDeduperHeartbeat.class);
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,8 +3,8 @@ package com.hpcloud;
|
|||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.hpcloud.configuration.MonPersisterConfiguration;
|
||||
import com.hpcloud.consumer.MonConsumer;
|
||||
import com.hpcloud.dedupe.MonDeDuperHeartbeat;
|
||||
import com.hpcloud.consumer.MonPersisterConsumer;
|
||||
import com.hpcloud.dedupe.MonPersisterDeduperHeartbeat;
|
||||
import com.hpcloud.healthcheck.SimpleHealthCheck;
|
||||
import com.hpcloud.resource.Resource;
|
||||
import com.yammer.dropwizard.Service;
|
||||
|
@ -32,11 +32,11 @@ public class MonPersisterService extends Service<MonPersisterConfiguration> {
|
|||
// Sample health check.
|
||||
environment.addHealthCheck(new SimpleHealthCheck("test-health-check"));
|
||||
|
||||
MonConsumer monConsumer = injector.getInstance(MonConsumer.class);
|
||||
environment.manage(monConsumer);
|
||||
MonPersisterConsumer consumer = injector.getInstance(MonPersisterConsumer.class);
|
||||
environment.manage(consumer);
|
||||
|
||||
MonDeDuperHeartbeat monDeDuperHeartbeat = injector.getInstance(MonDeDuperHeartbeat.class);
|
||||
environment.manage(monDeDuperHeartbeat);
|
||||
MonPersisterDeduperHeartbeat deduperHeartbeat = injector.getInstance(MonPersisterDeduperHeartbeat.class);
|
||||
environment.manage(deduperHeartbeat);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ package com.hpcloud.configuration;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class MonDeDuperConfiguration {
|
||||
public class DeduperConfiguration {
|
||||
|
||||
@JsonProperty
|
||||
Integer dedupeRunFrequencySeconds;
|
|
@ -55,9 +55,9 @@ public class MonPersisterConfiguration extends Configuration {
|
|||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private MonDeDuperConfiguration monDeDuperConfiguration = new MonDeDuperConfiguration();
|
||||
private DeduperConfiguration monDeDuperConfiguration = new DeduperConfiguration();
|
||||
|
||||
public MonDeDuperConfiguration getMonDeDuperConfiguration() {
|
||||
public DeduperConfiguration getMonDeDuperConfiguration() {
|
||||
return monDeDuperConfiguration;
|
||||
}
|
||||
|
||||
|
@ -69,5 +69,4 @@ public class MonPersisterConfiguration extends Configuration {
|
|||
public VerticaMetricRepositoryConfiguration getVerticaMetricRepositoryConfiguration() {
|
||||
return verticaMetricRepositoryConfiguration;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -10,6 +10,4 @@ public class VerticaOutputProcessorConfiguration {
|
|||
public Integer getBatchSize() {
|
||||
return batchSize;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -24,9 +24,10 @@ public class KafkaConsumer {
|
|||
|
||||
private final String topic;
|
||||
private final Integer numThreads;
|
||||
private final ConsumerConnector consumerConnector;
|
||||
private ExecutorService executorService;
|
||||
private final KafkaConsumerRunnableBasicFactory kafkaConsumerRunnableBasicFactory;
|
||||
private final ConsumerConfig consumerConfig;
|
||||
private ConsumerConnector consumerConnector;
|
||||
|
||||
@Inject
|
||||
public KafkaConsumer(MonPersisterConfiguration configuration,
|
||||
|
@ -39,13 +40,13 @@ public class KafkaConsumer {
|
|||
logger.info(KAFKA_CONFIGURATION + " numThreads = " + numThreads);
|
||||
|
||||
Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration());
|
||||
ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties);
|
||||
this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
|
||||
|
||||
consumerConfig = createConsumerConfig(kafkaProperties);
|
||||
this.kafkaConsumerRunnableBasicFactory = kafkaConsumerRunnableBasicFactory;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
|
||||
|
||||
Map<String, Integer> topicCountMap = new HashMap<>();
|
||||
topicCountMap.put(topic, new Integer(numThreads));
|
||||
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
|
||||
|
@ -56,7 +57,6 @@ public class KafkaConsumer {
|
|||
for (final KafkaStream stream : streams) {
|
||||
executorService.submit(kafkaConsumerRunnableBasicFactory.create(stream, threadNumber));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
|
|
|
@ -6,15 +6,15 @@ import com.yammer.dropwizard.lifecycle.Managed;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class MonConsumer implements Managed {
|
||||
public class MonPersisterConsumer implements Managed {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(MonConsumer.class);
|
||||
private static Logger logger = LoggerFactory.getLogger(MonPersisterConsumer.class);
|
||||
|
||||
private KafkaConsumer kafkaConsumer;
|
||||
private Disruptor disruptor;
|
||||
|
||||
@Inject
|
||||
public MonConsumer(KafkaConsumer kafkaConsumer, Disruptor disruptor) {
|
||||
public MonPersisterConsumer(KafkaConsumer kafkaConsumer, Disruptor disruptor) {
|
||||
this.kafkaConsumer = kafkaConsumer;
|
||||
this.disruptor = disruptor;
|
||||
}
|
|
@ -8,24 +8,24 @@ import com.yammer.dropwizard.lifecycle.Managed;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class MonDeDuperHeartbeat implements Managed {
|
||||
public class MonPersisterDeduperHeartbeat implements Managed {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(MonDeDuperHeartbeat.class);
|
||||
private static Logger logger = LoggerFactory.getLogger(MonPersisterDeduperHeartbeat.class);
|
||||
|
||||
private final Disruptor disruptor;
|
||||
private final DeDuperRunnable deDuperRunnable;
|
||||
private final DeduperRunnable deduperRunnable;
|
||||
|
||||
@Inject
|
||||
public MonDeDuperHeartbeat(Disruptor disruptor) {
|
||||
public MonPersisterDeduperHeartbeat(Disruptor disruptor) {
|
||||
this.disruptor = disruptor;
|
||||
this.deDuperRunnable = new DeDuperRunnable(disruptor);
|
||||
this.deduperRunnable = new DeduperRunnable(disruptor);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
|
||||
Thread deduperThread = new Thread(deDuperRunnable);
|
||||
Thread deduperThread = new Thread(deduperRunnable);
|
||||
deduperThread.start();
|
||||
}
|
||||
|
||||
|
@ -33,13 +33,13 @@ public class MonDeDuperHeartbeat implements Managed {
|
|||
public void stop() throws Exception {
|
||||
}
|
||||
|
||||
private static class DeDuperRunnable implements Runnable {
|
||||
private static class DeduperRunnable implements Runnable {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(DeDuperRunnable.class);
|
||||
private static Logger logger = LoggerFactory.getLogger(DeduperRunnable.class);
|
||||
|
||||
private final Disruptor disruptor;
|
||||
|
||||
private DeDuperRunnable(Disruptor disruptor) {
|
||||
private DeduperRunnable(Disruptor disruptor) {
|
||||
this.disruptor = disruptor;
|
||||
}
|
||||
|
|
@ -12,7 +12,6 @@ public class DisruptorExceptionHandler implements ExceptionHandler {
|
|||
public void handleEventException(Throwable ex, long sequence, Object event) {
|
||||
|
||||
logger.error("Disruptor encountered an exception during normal operation", ex);
|
||||
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
|
||||
|
@ -20,7 +19,6 @@ public class DisruptorExceptionHandler implements ExceptionHandler {
|
|||
public void handleOnStartException(Throwable ex) {
|
||||
|
||||
logger.error("Disruptor encountered an exception during startup", ex);
|
||||
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
|
||||
|
@ -28,8 +26,6 @@ public class DisruptorExceptionHandler implements ExceptionHandler {
|
|||
public void handleOnShutdownException(Throwable ex) {
|
||||
|
||||
logger.error("Disruptor encountered an exception during shutdown", ex);
|
||||
|
||||
throw new RuntimeException(ex);
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,6 @@ public class DisruptorProvider implements Provider<Disruptor> {
|
|||
logger.debug("Buffer size for instance of disruptor [" + bufferSize + "]");
|
||||
|
||||
Disruptor<MetricMessageEvent> disruptor = new Disruptor(metricMessageEventFactory, bufferSize, executor);
|
||||
|
||||
disruptor.handleExceptionsWith(exceptionHandler);
|
||||
|
||||
int batchSize = configuration.getVerticaOutputProcessorConfiguration().getBatchSize();
|
||||
|
@ -57,24 +56,19 @@ public class DisruptorProvider implements Provider<Disruptor> {
|
|||
EventHandler[] eventHandlers = new EventHandler[numOutputProcessors];
|
||||
|
||||
for (int i = 0; i < numOutputProcessors; ++i) {
|
||||
|
||||
eventHandlers[i] = metricMessageEventHandlerFactory.create(i, numOutputProcessors, batchSize);
|
||||
|
||||
}
|
||||
|
||||
disruptor.handleEventsWith(eventHandlers);
|
||||
|
||||
disruptor.start();
|
||||
logger.debug("Instance of disruptor successfully started");
|
||||
|
||||
logger.debug("Instance of disruptor successfully started");
|
||||
logger.debug("Instance of disruptor fully created");
|
||||
|
||||
return disruptor;
|
||||
|
||||
}
|
||||
|
||||
public Disruptor<MetricMessageEvent> get() {
|
||||
|
||||
return instance;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,9 +6,9 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class VerticaRepository {
|
||||
private static final Logger logger = LoggerFactory.getLogger(VerticaRepository.class);
|
||||
protected DBI dbi;
|
||||
protected Handle handle;
|
||||
private static final Logger logger = LoggerFactory.getLogger(VerticaRepository.class);
|
||||
|
||||
public VerticaRepository(DBI dbi) {
|
||||
this.dbi = dbi;
|
||||
|
@ -24,6 +24,4 @@ public class VerticaRepository {
|
|||
this.dbi = dbi;
|
||||
this.handle = dbi.open();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ kafkaConfiguration:
|
|||
autoOffsetReset: largest
|
||||
consumerTimeoutMs: -1
|
||||
clientId : 1
|
||||
zookeeperSessionTimeoutMs : 6000
|
||||
zookeeperSessionTimeoutMs : 60000
|
||||
zookeeperConnectionTimeoutMs : 6000
|
||||
zookeeperSyncTimeMs: 2000
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package com.hpcloud;
|
||||
|
||||
import com.hpcloud.consumer.KafkaConsumer;
|
||||
import com.hpcloud.consumer.MonConsumer;
|
||||
import com.hpcloud.consumer.MonPersisterConsumer;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -19,7 +19,7 @@ public class MonConsumerTest {
|
|||
private Disruptor disruptor;
|
||||
|
||||
@InjectMocks
|
||||
private MonConsumer monConsumer;
|
||||
private MonPersisterConsumer monConsumer;
|
||||
|
||||
@Before
|
||||
public void initMocks() {
|
||||
|
|
Loading…
Reference in New Issue