Make disruptor threads parallel.

This commit is contained in:
Deklan Dieterly 2014-02-25 07:47:49 -07:00
parent 64f10ce202
commit 23c32cd7db
1 changed files with 31 additions and 11 deletions

View File

@ -7,14 +7,18 @@ import com.hpcloud.event.StringEvent;
import com.hpcloud.event.StringEventFactory;
import com.hpcloud.event.StringEventHandler;
import com.hpcloud.event.StringEventHandlerFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class DisruptorProvider implements Provider<Disruptor> {
private static Logger logger = LoggerFactory.getLogger(DisruptorProvider.class);
private MonPersisterConfiguration configuration;
private StringEventHandlerFactory stringEventHandlerFactory;
private Disruptor instance;
@ -27,31 +31,47 @@ public class DisruptorProvider implements Provider<Disruptor> {
}
public synchronized Disruptor<StringEvent> get() {
logger.debug("Requesting instance of disruptor");
if (instance == null) {
logger.debug("Instance of disruptor is null. Creating disruptor...");
Executor executor = Executors.newCachedThreadPool();
StringEventFactory stringEventFactory = new StringEventFactory();
int buffersize = configuration.getDisruptorConfiguration().getBufferSize();
Disruptor<StringEvent> disruptor = new Disruptor(stringEventFactory, buffersize, executor);
int bufferSize = configuration.getDisruptorConfiguration().getBufferSize();
logger.debug("Buffer size for instance of disruptor [" + bufferSize + "]");
Disruptor<StringEvent> disruptor = new Disruptor(stringEventFactory, bufferSize, executor);
int batchSize = configuration.getVerticaOutputProcessorConfiguration().getBatchSize();
logger.debug("Batch size for each output processor [" + batchSize + "]");
int numOutputProcessors = configuration.getDisruptorConfiguration().getNumProcessors();
EventHandlerGroup<StringEvent> handlerGroup = null;
logger.debug("Number of output processors [" + numOutputProcessors + "]");
EventHandler[] stringEventHandlers = new StringEventHandler[numOutputProcessors];
for (int i = 0; i < numOutputProcessors; ++i) {
StringEventHandler stringEventHandler = stringEventHandlerFactory.create(i, numOutputProcessors, batchSize);
if (handlerGroup == null) {
handlerGroup = disruptor.handleEventsWith(stringEventHandler);
} else {
handlerGroup.then(stringEventHandler);
}
stringEventHandlers[i] = stringEventHandlerFactory.create(i, numOutputProcessors, batchSize);
}
disruptor.handleEventsWith(stringEventHandlers);
disruptor.start();
logger.debug("Instance of disruptor successfully started");
logger.debug("Instance of disruptor fully created");
instance = disruptor;
}
logger.debug("Returning instance of disruptor");
return instance;
}
}