From 452c020f6328f61d21cfdf5dfbf85fe29e289e09 Mon Sep 17 00:00:00 2001
From: Deklan Dieterly
Date: Tue, 18 Aug 2015 10:25:22 -0600
Subject: [PATCH] Enhance fail fast on error functionality

Make persister fail faster on errors.

Use one executor service for all persister threads. Shut down all threads in
the thread executor service on any error.

Catch java.lang.Throwable to catch java.lang.Error as well as
java.lang.Exception.

Change-Id: I0dc421cf6bb4ab3f52c47e97b7f396483283b561
---
 .../persister/PersisterApplication.java       |  24 ++++-
 .../persister/consumer/KafkaConsumer.java     |  15 +--
 .../consumer/KafkaConsumerFactory.java        |   5 +-
 .../consumer/KafkaConsumerRunnableBasic.java  | 100 ++++++++++++++----
 .../repository/vertica/VerticaMetricRepo.java |   2 +-
 5 files changed, 108 insertions(+), 38 deletions(-)

diff --git a/java/src/main/java/monasca/persister/PersisterApplication.java b/java/src/main/java/monasca/persister/PersisterApplication.java
index f3ddc4c7..ec3e1a22 100644
--- a/java/src/main/java/monasca/persister/PersisterApplication.java
+++ b/java/src/main/java/monasca/persister/PersisterApplication.java
@@ -17,6 +17,7 @@
 
 package monasca.persister;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Key;
@@ -25,6 +26,10 @@ import com.google.inject.TypeLiteral;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
 import io.dropwizard.Application;
 import io.dropwizard.setup.Bootstrap;
 import io.dropwizard.setup.Environment;
@@ -105,8 +110,18 @@ public class PersisterApplication extends Application {
         injector.getInstance(Key.get(new TypeLiteral>(){}));
 
     final KafkaConsumerRunnableBasicFactory kafkaMetricConsumerRunnableBasicFactory =
-        injector.getInstance(Key.get(new TypeLiteral>(){}));
+        injector.getInstance(
+            Key.get(new TypeLiteral>() {
+            }));
+
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .build();
+
+    int totalNumberOfThreads = configuration.getMetricConfiguration().getNumThreads()
+        + configuration.getAlarmHistoryConfiguration().getNumThreads();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(totalNumberOfThreads, threadFactory);
 
     for (int i = 0; i < configuration.getMetricConfiguration().getNumThreads(); i++) {
 
@@ -122,7 +137,7 @@
           kafkaMetricConsumerRunnableBasicFactory.create(managedMetricPipeline, kafkaMetricChannel, threadId);
 
       final KafkaConsumer kafkaMetricConsumer =
-          kafkaMetricConsumerFactory.create(kafkaMetricConsumerRunnableBasic, threadId);
+          kafkaMetricConsumerFactory.create(kafkaMetricConsumerRunnableBasic, threadId, executorService);
 
       ManagedConsumer managedMetricConsumer =
           metricManagedConsumerFactory.create(kafkaMetricConsumer, threadId);
 
@@ -158,7 +173,8 @@
           kafkaAlarmStateTransitionConsumerRunnableBasicFactory.create(managedAlarmStateTransitionPipeline, kafkaAlarmStateTransitionChannel, threadId);
 
       final KafkaConsumer kafkaAlarmStateTransitionConsumer =
-          kafkaAlarmStateTransitionConsumerFactory.create(kafkaAlarmStateTransitionConsumerRunnableBasic, threadId);
+          kafkaAlarmStateTransitionConsumerFactory.create(kafkaAlarmStateTransitionConsumerRunnableBasic, threadId,
+              executorService);
 
       ManagedConsumer managedAlarmStateTransitionConsumer =
           alarmStateTransitionsManagedConsumerFactory.create(kafkaAlarmStateTransitionConsumer, threadId);
 
diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java
index d8520828..ff3b12b4 100644
--- a/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java
+++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumer.java
@@ -33,7 +33,7 @@ public class KafkaConsumer {
 
   private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
 
-  private static final int WAIT_TIME = 10;
+  private static final int WAIT_TIME = 5;
 
   private ExecutorService executorService;
 
@@ -43,10 +43,12 @@ public class KafkaConsumer {
   @Inject
   public KafkaConsumer(
       @Assisted KafkaConsumerRunnableBasic kafkaConsumerRunnableBasic,
-      @Assisted String threadId) {
+      @Assisted String threadId,
+      @Assisted ExecutorService executorService) {
 
     this.kafkaConsumerRunnableBasic = kafkaConsumerRunnableBasic;
     this.threadId = threadId;
+    this.executorService = executorService;
 
   }
 
@@ -54,13 +56,6 @@ public class KafkaConsumer {
 
     logger.info("[{}]: start", this.threadId);
 
-    ThreadFactory threadFactory = new ThreadFactoryBuilder()
-        .setNameFormat(threadId + "-%d")
-        .setDaemon(true)
-        .build();
-
-    executorService = Executors.newSingleThreadExecutor(threadFactory);
-
     executorService.submit(kafkaConsumerRunnableBasic.setExecutorService(executorService));
 
   }
 
@@ -75,8 +70,6 @@ public class KafkaConsumer {
 
     logger.info("[{}]: shutting down executor service", this.threadId);
 
-    executorService.shutdown();
-
     try {
 
       logger.info("[{}]: awaiting termination...", this.threadId);
 
diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumerFactory.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerFactory.java
index 4fb02b7a..0960b3c2 100644
--- a/java/src/main/java/monasca/persister/consumer/KafkaConsumerFactory.java
+++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerFactory.java
@@ -17,10 +17,13 @@
 
 package monasca.persister.consumer;
 
+import java.util.concurrent.ExecutorService;
+
 public interface KafkaConsumerFactory {
 
   KafkaConsumer create(
       KafkaConsumerRunnableBasic kafkaConsumerRunnableBasic,
-      String threadId);
+      String threadId,
+      ExecutorService executorService);
 
 }
 
diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java
index b4cbb29b..3d978ae6 100644
--- a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java
+++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java
@@ -27,8 +27,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import kafka.consumer.ConsumerIterator;
+import monasca.persister.repository.RepoException;
 
 public class KafkaConsumerRunnableBasic implements Runnable {
 
@@ -38,6 +40,7 @@ public class KafkaConsumerRunnableBasic implements Runnable {
   private final String threadId;
   private final ManagedPipeline pipeline;
   private volatile boolean stop = false;
+  private boolean fatalErrorDetected = false;
 
   private ExecutorService executorService;
 
@@ -60,7 +63,7 @@ public class KafkaConsumerRunnableBasic implements Runnable {
 
   }
 
-  protected void publishHeartbeat() {
+  protected void publishHeartbeat() throws RepoException {
 
     publishEvent(null);
 
@@ -82,9 +85,19 @@ public class KafkaConsumerRunnableBasic implements Runnable {
 
     try {
 
-      if (pipeline.shutdown()) {
+      if (!this.fatalErrorDetected) {
 
-        markRead();
+        logger.info("[{}]: shutting pipeline down", this.threadId);
+
+        if (pipeline.shutdown()) {
+
+          markRead();
+
+        }
+
+      } else {
+
+        logger.info("[{}]: fatal error detected. Exiting immediately without flush", this.threadId);
 
       }
 
@@ -93,6 +106,7 @@ public class KafkaConsumerRunnableBasic implements Runnable {
 
       logger.error("caught fatal exception while shutting down", e);
 
     }
+
   }
 
   public void run() {
 
@@ -103,12 +117,28 @@ public class KafkaConsumerRunnableBasic implements Runnable {
 
     logger.debug("[{}]: KafkaChannel has stream iterator", this.threadId);
 
-    while (!this.stop) {
+    while (!this.stop) {
+
+      try {
 
         try {
 
+          if (isInterrupted()) {
+
+            this.fatalErrorDetected = true;
+            break;
+
+          }
+
           if (it.hasNext()) {
 
+            if (isInterrupted()) {
+
+              this.fatalErrorDetected = true;
+              break;
+
+            }
+
             final String msg = new String(it.next().message());
 
             logger.debug("[{}]: {}", this.threadId, msg);
 
@@ -119,45 +149,73 @@ public class KafkaConsumerRunnableBasic implements Runnable {
 
         } catch (kafka.consumer.ConsumerTimeoutException cte) {
 
+          if (isInterrupted()) {
+
+            this.fatalErrorDetected = true;
+            break;
+
+          }
+
           publishHeartbeat();
 
         }
 
-      if (Thread.currentThread().isInterrupted()) {
+      } catch (Throwable e) {
 
-        logger.debug("[{}]: is interrupted. breaking out of run loop", this.threadId);
+        logger.error(
+            "[{}]: caught fatal exception while publishing msg. Shutting entire persister down now!",
+            this.threadId, e);
 
-        break;
+        this.stop = true;
+        this.fatalErrorDetected = true;
+
+        this.executorService.shutdownNow();
+
+        try {
+
+          this.executorService.awaitTermination(5, TimeUnit.SECONDS);
+
+        } catch (InterruptedException e1) {
+
+          logger.info("[{}]: interrupted while awaiting termination", this.threadId, e1);
 
         }
+
+        LogManager.shutdown();
+
+        System.exit(1);
 
       }
 
-      logger.info("[{}]: shutting down", this.threadId);
-
-      this.kafkaChannel.stop();
-
     }
 
+    logger.info("[{}]: shutting down", this.threadId);
 
-  protected void publishEvent(final String msg) {
+    this.kafkaChannel.stop();
 
-    try {
+  }
 
-      if (pipeline.publishEvent(msg)) {
+  protected void publishEvent(final String msg) throws RepoException {
 
-        markRead();
+    if (pipeline.publishEvent(msg)) {
 
-      }
+      markRead();
 
-    } catch (Exception e) {
+    }
 
-      logger.error("caught fatal exception while publishing msg. Shutting entire persister down now!");
+  }
 
-      this.executorService.shutdownNow();
+  private boolean isInterrupted() {
 
-      LogManager.shutdown();
+    if (Thread.currentThread().interrupted()) {
 
-      System.exit(-1);
+      logger.debug("[{}]: is interrupted. breaking out of run loop", this.threadId);
+
+      return true;
+
+    } else {
+
+      return false;
 
     }
 
   }
 
diff --git a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java
index 21df854c..258473ad 100644
--- a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java
+++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java
@@ -450,7 +450,7 @@ public class VerticaMetricRepo extends VerticaRepo implements Repo
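
For reference, the following is a minimal, self-contained sketch of the fail-fast pattern this change applies: every worker shares one ExecutorService, catches java.lang.Throwable, interrupts its peers via shutdownNow(), waits briefly for them to terminate, and then exits the JVM. The class and method names (FailFastWorker, doWork) are illustrative placeholders, not code from monasca-persister.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: FailFastWorker and doWork() are hypothetical names.
public class FailFastWorker implements Runnable {

  private final ExecutorService executorService;
  private volatile boolean stop = false;

  public FailFastWorker(ExecutorService executorService) {
    this.executorService = executorService;
  }

  @Override
  public void run() {
    while (!stop) {
      try {
        doWork();
      } catch (Throwable t) {
        // Catching Throwable (not just Exception) also traps java.lang.Error,
        // e.g. OutOfMemoryError, so the process never limps along half-broken.
        stop = true;

        // Interrupt every worker sharing this executor, then give them a
        // short grace period before terminating the whole JVM.
        executorService.shutdownNow();
        try {
          executorService.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
        System.exit(1);
      }
    }
  }

  private void doWork() {
    // Consume and persist one batch; throws on any unrecoverable error.
  }

  public static void main(String[] args) {
    int totalNumberOfThreads = 4; // e.g. metric threads + alarm-history threads
    ExecutorService executorService = Executors.newFixedThreadPool(totalNumberOfThreads);
    for (int i = 0; i < totalNumberOfThreads; i++) {
      executorService.submit(new FailFastWorker(executorService));
    }
  }
}

Exiting with System.exit(1) after awaitTermination mirrors the patch's choice to stop the whole persister rather than leave it running with a dead consumer thread, so an external supervisor can restart it cleanly.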