Basic functionality for sending string messages to the DB.

DB schema 'test'.
  DB table 'test.metric'.
  DB column 'test.metric.metric'.
Deklan Dieterly 2014-02-24 11:32:24 -07:00
parent 96269503af
commit fcb4de1727
11 changed files with 249 additions and 41 deletions

pom.xml

@@ -40,7 +40,12 @@
<version>3.0</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-assistedinject</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
</dependency>
@@ -54,6 +59,16 @@
<artifactId>disruptor</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>com.yammer.dropwizard</groupId>
<artifactId>dropwizard-jdbi</artifactId>
<version>0.6.2</version>
</dependency>
<dependency>
<groupId>com.vertica</groupId>
<artifactId>vertica-jdbc</artifactId>
<version>6.1.0</version>
</dependency>
</dependencies>

DisruptorFactory.java

@@ -0,0 +1,51 @@
package com.hpcloud;
import com.google.inject.Inject;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class DisruptorFactory {
private MonPersisterConfiguration configuration;
private StringEventHandlerFactory stringEventHandlerFactory;
private Disruptor instance;
@Inject
public DisruptorFactory(MonPersisterConfiguration configuration,
StringEventHandlerFactory stringEventHandlerFactory) {
this.configuration = configuration;
this.stringEventHandlerFactory = stringEventHandlerFactory;
}
public synchronized Disruptor<StringEvent> create() {
if (instance == null) {
Executor executor = Executors.newCachedThreadPool();
StringEventFactory stringEventFactory = new StringEventFactory();
int buffersize = configuration.getDisruptorConfiguration().bufferSize;
Disruptor<StringEvent> disruptor = new Disruptor(stringEventFactory, buffersize, executor);
int batchSize = configuration.getVerticaOutputProcessorConfiguration().batchSize;
int numOutputProcessors = configuration.getVerticaOutputProcessorConfiguration().numProcessors;
EventHandlerGroup<StringEvent> handlerGroup = null;
for (int i = 0; i < numOutputProcessors; ++i) {
StringEventHandler stringEventHandler = stringEventHandlerFactory.create(i, numOutputProcessors, batchSize);
if (handlerGroup == null) {
handlerGroup = disruptor.handleEventsWith(stringEventHandler);
} else {
handlerGroup.then(stringEventHandler);
}
}
disruptor.start();
instance = disruptor;
}
return instance;
}
}
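
For context, a minimal sketch of how a producer such as KafkaConsumer could publish into the Disruptor returned by this factory; it assumes StringEvent exposes a setValue(String) mutator, which is not shown in this diff:

// Hedged sketch: publish one message into the ring buffer built by DisruptorFactory.
// Requires: import com.lmax.disruptor.RingBuffer;
Disruptor<StringEvent> disruptor = disruptorFactory.create();
RingBuffer<StringEvent> ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();               // claim the next slot
try {
    ringBuffer.get(sequence).setValue(message);  // assumed setValue(String) on StringEvent
} finally {
    ringBuffer.publish(sequence);                // hand the slot to the StringEventHandlers
}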

KafkaConsumer.java

@@ -27,11 +27,11 @@ public class KafkaConsumer {
private final Disruptor disruptor;
@Inject
public KafkaConsumer(MonPersisterConfiguration configuration, Disruptor disruptor) {
public KafkaConsumer(MonPersisterConfiguration configuration, DisruptorFactory disruptorFactory) {
this.topic = configuration.getKafkaConfiguration().topic;
this.numThreads = configuration.getKafkaConfiguration().numThreads;
this.disruptor = disruptor;
this.disruptor = disruptorFactory.create();
Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration());
ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties);
this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

MonPersisterConfiguration.java

@@ -2,6 +2,7 @@ package com.hpcloud;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.yammer.dropwizard.config.Configuration;
import com.yammer.dropwizard.db.DatabaseConfiguration;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
@@ -41,4 +42,14 @@ public class MonPersisterConfiguration extends Configuration {
public VerticaOutputProcessorConfiguration getVerticaOutputProcessorConfiguration() {
return verticaOutputProcessorConfiguration;
}
@Valid
@NotNull
@JsonProperty
private DatabaseConfiguration databaseConfiguration = new DatabaseConfiguration();
public DatabaseConfiguration getDatabaseConfiguration() {
return databaseConfiguration;
}
}

MonPersisterModule.java

@@ -1,23 +1,48 @@
package com.hpcloud;
import com.google.inject.AbstractModule;
import com.lmax.disruptor.dsl.Disruptor;
import com.google.inject.Provider;
import com.google.inject.ProvisionException;
import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.yammer.dropwizard.config.Environment;
import com.yammer.dropwizard.jdbi.DBIFactory;
import org.skife.jdbi.v2.DBI;
public class MonPersisterModule extends AbstractModule {
private final MonPersisterConfiguration configuration;
private final Disruptor disruptor;
private final Environment environment;
public MonPersisterModule(MonPersisterConfiguration configuration, Disruptor<StringEvent> disruptor) {
public MonPersisterModule(MonPersisterConfiguration configuration, Environment environment) {
this.configuration = configuration;
this.disruptor = disruptor;
this.environment = environment;
}
@Override
protected void configure() {
bind(MonPersisterConfiguration.class).toInstance(configuration);
bind(Disruptor.class).toInstance(disruptor);
install(new FactoryModuleBuilder()
.implement(StringEventHandler.class, StringEventHandler.class)
.build(StringEventHandlerFactory.class));
bind(DisruptorFactory.class);
bind(MonConsumer.class);
bind(DBI.class).toProvider(new Provider<DBI>() {
@Override
public DBI get() {
try {
return new DBIFactory().build(environment, configuration.getDatabaseConfiguration(), "vertica");
} catch (ClassNotFoundException e) {
throw new ProvisionException("Failed to provision DBI", e);
}
}
}).in(Scopes.SINGLETON);
}
}
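
A hedged sketch of what this module yields at startup, mirroring the service code later in this commit (variable names are illustrative):

// Build the injector from the module, then pull fully wired components out of it.
Injector injector = Guice.createInjector(new MonPersisterModule(configuration, environment));

// The FactoryModuleBuilder binding lets Guice synthesize StringEventHandlerFactory:
// the create(...) arguments fill the @Assisted constructor parameters of StringEventHandler,
// while VerticaMetricRepository (and the singleton DBI behind it) comes from the injector.
StringEventHandlerFactory handlerFactory = injector.getInstance(StringEventHandlerFactory.class);

// MonConsumer is what MonPersisterService registers with the Dropwizard environment.
MonConsumer monConsumer = injector.getInstance(MonConsumer.class);
environment.manage(monConsumer);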

MonPersisterService.java

@@ -2,15 +2,10 @@ package com.hpcloud;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.yammer.dropwizard.Service;
import com.yammer.dropwizard.config.Bootstrap;
import com.yammer.dropwizard.config.Environment;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class MonPersisterService extends Service<MonPersisterConfiguration> {
public static void main(String[] args) throws Exception {
@@ -25,9 +20,7 @@ public class MonPersisterService extends Service<MonPersisterConfiguration> {
@Override
public void run(MonPersisterConfiguration configuration, Environment environment) throws Exception {
Disruptor<StringEvent> disruptor = createDisruptor(configuration);
Injector injector = Guice.createInjector(new MonPersisterModule(configuration, disruptor));
Injector injector = Guice.createInjector(new MonPersisterModule(configuration, environment));
// Sample resource.
environment.addResource(new Resource());
@@ -38,27 +31,4 @@ public class MonPersisterService extends Service<MonPersisterConfiguration> {
environment.manage(monConsumer);
}
private Disruptor<StringEvent> createDisruptor(MonPersisterConfiguration configuration) {
Executor executor = Executors.newCachedThreadPool();
StringEventFactory stringEventFactory = new StringEventFactory();
int buffersize = configuration.getDisruptorConfiguration().bufferSize;
Disruptor<StringEvent> disruptor = new Disruptor(stringEventFactory, buffersize, executor);
int numOutputProcessors = configuration.getVerticaOutputProcessorConfiguration().numProcessors;
EventHandlerGroup<StringEvent> handlerGroup = null;
for (int i = 0; i < numOutputProcessors; ++i) {
StringEventHandler stringEventHandler = new StringEventHandler();
if (handlerGroup == null) {
handlerGroup = disruptor.handleEventsWith(stringEventHandler);
} else {
handlerGroup.then(stringEventHandler);
}
}
disruptor.start();
return disruptor;
}
}

StringEventHandler.java

@@ -1,11 +1,44 @@
package com.hpcloud;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.lmax.disruptor.EventHandler;
public class StringEventHandler implements EventHandler<StringEvent> {
private final int ordinal;
private final int numProcessors;
private final int batchSize;
VerticaMetricRepository verticaMetricRepository;
@Inject
public StringEventHandler(VerticaMetricRepository verticaMetricRepository,
@Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors,
@Assisted("batchSize") int batchSize) {
this.verticaMetricRepository = verticaMetricRepository;
this.ordinal = ordinal;
this.numProcessors = numProcessors;
this.batchSize = batchSize;
}
@Override
public void onEvent(StringEvent stringEvent, long l, boolean b) throws Exception {
public void onEvent(StringEvent stringEvent, long sequence, boolean b) throws Exception {
if (((sequence / batchSize) % this.numProcessors) != this.ordinal) {
return;
}
System.out.println("Event: " + stringEvent.getValue());
verticaMetricRepository.addToBatch(stringEvent.getValue());
if (sequence % batchSize == (batchSize - 1)) {
verticaMetricRepository.commitBatch();
}
}
}
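
To make the partitioning arithmetic concrete, a small worked example; batchSize 10 matches the example configuration in this commit, while the two-processor setup is hypothetical:

// With batchSize = 10 and numProcessors = 2, handlers own alternating blocks of 10 sequences:
//   sequences  0..9  -> (seq / 10) % 2 == 0 -> handled by ordinal 0, batch committed at sequence 9
//   sequences 10..19 -> (seq / 10) % 2 == 1 -> handled by ordinal 1, batch committed at sequence 19
long sequence = 13;
int batchSize = 10;
int numProcessors = 2;
int owner = (int) ((sequence / batchSize) % numProcessors);   // 1, so only the handler with ordinal 1 processes it
boolean commitNow = sequence % batchSize == batchSize - 1;    // false; that handler flushes its batch at sequence 19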

StringEventHandlerFactory.java

@@ -0,0 +1,9 @@
package com.hpcloud;
import com.google.inject.assistedinject.Assisted;
public interface StringEventHandlerFactory {
StringEventHandler create(@Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors,
@Assisted("batchSize") int batchSize);
}

VerticaMetricRepository.java

@@ -0,0 +1,32 @@
package com.hpcloud;
import org.skife.jdbi.v2.DBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.security.NoSuchAlgorithmException;
public class VerticaMetricRepository extends VerticaRepository {
private static final Logger logger = LoggerFactory.getLogger(VerticaMetricRepository.class);
private static final String SQL_INSERT_INTO_METRICS =
"INSERT INTO test.metric VALUES (:metric)";
private static final String METRIC_COLUMN_NAME = "metric";
@Inject
public VerticaMetricRepository(DBI dbi) throws NoSuchAlgorithmException {
super(dbi);
batch = handle.prepareBatch(SQL_INSERT_INTO_METRICS);
}
public void addToBatch(String aString) {
batch.add().bind(METRIC_COLUMN_NAME, aString);
}
public void commitBatch() {
batch.execute();
}
}
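
The commit message implies a minimal table on the Vertica side, roughly CREATE TABLE test.metric (metric VARCHAR ...), although the exact DDL is not part of this change. A hedged usage sketch of the repository, assuming it is obtained from the Guice injector configured above and that the payload strings are illustrative:

// Hedged sketch: queue a few rows, then flush them as one prepared batch.
VerticaMetricRepository repository = injector.getInstance(VerticaMetricRepository.class);
repository.addToBatch("{\"name\": \"cpu_idle\", \"value\": 97}");   // any string; it is bound to the :metric parameter
repository.addToBatch("{\"name\": \"cpu_user\", \"value\": 2}");
repository.commitBatch();                                           // executes the accumulated INSERT INTO test.metric statements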

VerticaRepository.java

@@ -0,0 +1,31 @@
package com.hpcloud;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VerticaRepository {
protected DBI dbi;
protected Handle handle;
protected PreparedBatch batch;
private static final Logger logger = LoggerFactory.getLogger(VerticaRepository.class);
public VerticaRepository(DBI dbi) {
this.dbi = dbi;
this.handle = dbi.open();
this.handle.execute("SET TIME ZONE TO 'UTC'");
}
public VerticaRepository() {
}
public void setDBI(DBI dbi)
throws Exception {
this.dbi = dbi;
this.handle = dbi.open();
}
}

Configuration (YAML)

@@ -31,9 +31,40 @@ disruptorConfiguration:
numThreads: 1
verticaOutputProcessorConfiguration:
batchSize: 512
batchSize: 10
numProcessors: 1
databaseConfiguration:
driverClass: com.vertica.jdbc.Driver
# url: jdbc:vertica://mon-aw1rdd1-vertica0001.rndd.aw1.hpcloud.net:5433/som
url: jdbc:vertica://15.185.94.245:5433/som
user: persister
password: password
properties:
ssl: false
# the maximum amount of time to wait on an empty pool before throwing an exception
maxWaitForConnection: 1s
# the SQL query to run when validating a connection's liveness
validationQuery: "/* MyService Health Check */ SELECT 1"
# the minimum number of connections to keep open
minSize: 8
# the maximum number of connections to keep open
maxSize: 41
# whether or not idle connections should be validated
checkConnectionWhileIdle: false
# how long a connection must be held before it can be validated
checkConnectionHealthWhenIdleFor: 10s
# the maximum lifetime of an idle connection
closeConnectionIfIdleFor: 1 minute
# Logging settings.
logging: