Commit Kafka reads once the item has been persisted

In order to continue using the Kafka high-level consumer API, the disruptor
was removed. This allows a direct call to Kafka to commit the offsets
when items are flushed.
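
For illustration, a minimal sketch of the persist-then-commit ordering, using the high-level consumer's ConsumerConnector; the Repository type and the wiring are hypothetical, not the persister's actual classes:

    import kafka.javaapi.consumer.ConsumerConnector;

    // Hypothetical sketch: persist the batch first, then commit the offsets,
    // so a crash between the two steps replays items instead of losing them.
    public class CommitOnFlush {
      interface Repository { void flush(); }  // stand-in for the real repository

      private final ConsumerConnector consumerConnector;
      private final Repository repository;

      public CommitOnFlush(ConsumerConnector connector, Repository repository) {
        this.consumerConnector = connector;
        this.repository = repository;
      }

      public void flush() {
        repository.flush();                 // write the batched items to the database
        consumerConnector.commitOffsets();  // then mark them as read in Kafka
      }
    }

This ordering gives at-least-once delivery: anything persisted but not yet committed is simply read and written again after a restart.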

Different ConsumerConnectors had to be created for Metrics and Alarms
so the offsets could be committed separately.
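
The reason, sketched below: commitOffsets() on the high-level consumer commits everything its connector consumes, so Metrics and Alarms can only be committed independently if each stream has its own ConsumerConnector. The group IDs and ZooKeeper address here are assumptions:

    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class SeparateConnectors {
      static ConsumerConnector connectorFor(String groupId) {
        Properties props = new Properties();
        props.put("group.id", groupId);                    // one group per stream type
        props.put("zookeeper.connect", "localhost:2181");  // assumed address
        props.put("auto.commit.enable", "false");          // the persister commits explicitly
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
      }

      public static void main(String[] args) {
        // Committing the metrics connector has no effect on the alarms connector.
        ConsumerConnector metricsConnector = connectorFor("persister-metrics"); // assumed ID
        ConsumerConnector alarmsConnector = connectorFor("persister-alarms");   // assumed ID
      }
    }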

Changed configuration to match the new model. Removed configuration
parameters that were no longer needed.

Changed the name Disruptor to Pipeline.

Allowed only one EventHandler per pipeline.

Added code to flush the Metrics and Alarms, and shut down the Kafka ConsumerConnectors on a normal shutdown. This keeps the persister from losing Metrics and Alarms.
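
In outline, the ordering is: stop pulling from Kafka, then flush, then let the connector shut down. A compilable sketch under assumed names; the abstract methods stand in for KafkaConsumer.stop() and ManagedPipeline.shutdown() in the diff below:

    import io.dropwizard.lifecycle.Managed;

    // Sketch of the shutdown ordering, not the persister's actual class.
    public abstract class OrderedShutdown implements Managed {
      protected abstract void stopConsumer();     // stop the Kafka polling thread
      protected abstract void shutdownPipeline(); // flush remaining events, commit offsets

      @Override
      public void start() throws Exception {}

      @Override
      public void stop() throws Exception {
        stopConsumer();      // 1. no new messages arrive
        shutdownPipeline();  // 2. buffered Metrics/Alarms reach the database
      }
    }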

Made measurementTimeStampSimpleDateFormat non-static, since SimpleDateFormat is not thread-safe.
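
The hazard, in miniature: SimpleDateFormat keeps mutable parse/format state, so one static instance shared across handler threads can silently corrupt timestamps. A per-instance field avoids that; the date pattern below is an assumption, not the persister's actual format:

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class TimestampFormatting {
      // One formatter per handler instance, because SimpleDateFormat is not
      // thread-safe and a shared static instance can garble concurrent calls.
      private final SimpleDateFormat measurementTimeStampSimpleDateFormat =
          new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  // assumed pattern

      public String format(Date timeStamp) {
        return measurementTimeStampSimpleDateFormat.format(timeStamp);
      }
    }

A ThreadLocal<SimpleDateFormat> would serve equally well where instances must be shared.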

Changed some debug logging statements so Strings aren't created when debug logging is disabled.
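
Concretely, this is the switch from eager string concatenation to slf4j's parameterized form, which formats only after the level check passes; for example:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class DebugLogging {
      private static final Logger logger = LoggerFactory.getLogger(DebugLogging.class);

      void log(int threadNumber, String message) {
        // Before: the String is built even when debug logging is off.
        logger.debug("Thread " + threadNumber + ": " + message);

        // After: the arguments are formatted only if debug is enabled.
        logger.debug("Thread {}: {}", threadNumber, message);
      }
    }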

Created FlushableHandler as a base class and moved duplicate code into it from MetricHandler and AlarmStateTransitionHistoryHandler.
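
An illustrative outline of the extraction; the batching details are a sketch of the shared shape, not the exact implementation. The base class owns batch counting and flushing, while each subclass supplies only its type-specific persistence:

    // Sketch: common flush bookkeeping hoisted out of the two handlers.
    public abstract class FlushableHandler<T> {
      private final int batchSize;
      private int eventCount;

      protected FlushableHandler(int batchSize) {
        this.batchSize = batchSize;
      }

      public void onEvent(T event) throws Exception {
        if (event == null) {   // heartbeat from the consumer: flush what is pending
          flushRepository();
          eventCount = 0;
          return;
        }
        handleMessage(event);  // e.g. MetricHandler, AlarmStateTransitionHistoryHandler
        if (++eventCount >= batchSize) {
          flushRepository();
          eventCount = 0;
        }
      }

      protected abstract void handleMessage(T event) throws Exception;

      protected abstract void flushRepository();
    }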

Change-Id: Id31a1d148f8e796f5be483dd02544be49c009b18

Changed MetricHandler to take MetricEnvelope[].

Change-Id: Ifabbe253cc0163f150ada2252a41a5d9fb9ab423
Craig Bryant 2014-07-18 16:09:36 -06:00
parent d8a5b87235
commit dbab337d76
52 changed files with 971 additions and 1429 deletions

.gitignore vendored Normal file

@@ -0,0 +1,6 @@
target/
*.classpath
*.project
*.settings/
debs/
logs/


@@ -100,12 +100,6 @@
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>com.vertica</groupId>
<artifactId>vertica-jdbc</artifactId>


@@ -18,10 +18,23 @@
package com.hpcloud.mon.persister;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.consumer.AlarmStateTransitionsConsumer;
import com.hpcloud.mon.persister.consumer.AlarmStateTransitionConsumer;
import com.hpcloud.mon.persister.consumer.AlarmStateTransitionConsumerFactory;
import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumer;
import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumerFactory;
import com.hpcloud.mon.persister.consumer.KafkaChannel;
import com.hpcloud.mon.persister.consumer.KafkaChannelFactory;
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumer;
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumerFactory;
import com.hpcloud.mon.persister.consumer.MetricsConsumer;
import com.hpcloud.mon.persister.consumer.MetricsConsumerFactory;
import com.hpcloud.mon.persister.healthcheck.SimpleHealthCheck;
import com.hpcloud.mon.persister.repository.RepositoryCommitHeartbeat;
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipelineFactory;
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
import com.hpcloud.mon.persister.pipeline.MetricPipelineFactory;
import com.hpcloud.mon.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory;
import com.hpcloud.mon.persister.pipeline.event.MetricHandlerFactory;
import com.hpcloud.mon.persister.resource.Resource;
import com.google.inject.Guice;
@@ -31,7 +44,11 @@ import io.dropwizard.Application;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MonPersisterApplication extends Application<MonPersisterConfiguration> {
private static final Logger logger = LoggerFactory.getLogger(MonPersisterApplication.class);
public static void main(String[] args) throws Exception {
new MonPersisterApplication().run(args);
@@ -58,16 +75,79 @@ public class MonPersisterApplication extends Application<MonPersisterConfiguration> {
// Sample health check.
environment.healthChecks().register("test-health-check", new SimpleHealthCheck());
MetricsConsumer metricsConsumer = injector.getInstance(MetricsConsumer.class);
environment.lifecycle().manage(metricsConsumer);
final KafkaChannelFactory kafkaChannelFactory = injector.getInstance(KafkaChannelFactory.class);
final MetricsConsumerFactory metricsConsumerFactory =
injector.getInstance(MetricsConsumerFactory.class);
final KafkaMetricsConsumerFactory kafkaMetricsConsumerFactory =
injector.getInstance(KafkaMetricsConsumerFactory.class);
for (int i = 0; i < configuration.getMetricConfiguration().getNumThreads(); i++) {
final KafkaChannel kafkaChannel =
kafkaChannelFactory.create(configuration, configuration.getMetricConfiguration(), i);
final MetricPipeline metricPipeline = getMetricPipeline(configuration, i, injector);
final KafkaMetricsConsumer kafkaMetricsConsumer =
kafkaMetricsConsumerFactory.create(kafkaChannel, i, metricPipeline);
MetricsConsumer metricsConsumer =
metricsConsumerFactory.create(kafkaMetricsConsumer, metricPipeline);
environment.lifecycle().manage(metricsConsumer);
}
AlarmStateTransitionsConsumer alarmStateTransitionsConsumer =
injector.getInstance(AlarmStateTransitionsConsumer.class);
environment.lifecycle().manage(alarmStateTransitionsConsumer);
final AlarmStateTransitionConsumerFactory alarmStateTransitionsConsumerFactory =
injector.getInstance(AlarmStateTransitionConsumerFactory.class);
final KafkaAlarmStateTransitionConsumerFactory kafkaAlarmStateTransitionConsumerFactory =
injector.getInstance(KafkaAlarmStateTransitionConsumerFactory.class);
for (int i = 0; i < configuration.getAlarmHistoryConfiguration().getNumThreads(); i++) {
final KafkaChannel kafkaChannel =
kafkaChannelFactory
.create(configuration, configuration.getAlarmHistoryConfiguration(), i);
final AlarmStateTransitionPipeline pipeline =
getAlarmStateHistoryPipeline(configuration, i, injector);
final KafkaAlarmStateTransitionConsumer kafkaAlarmStateTransitionConsumer =
kafkaAlarmStateTransitionConsumerFactory.create(kafkaChannel, i, pipeline);
AlarmStateTransitionConsumer alarmStateTransitionConsumer =
alarmStateTransitionsConsumerFactory.create(kafkaAlarmStateTransitionConsumer, pipeline);
environment.lifecycle().manage(alarmStateTransitionConsumer);
}
}
RepositoryCommitHeartbeat repositoryCommitHeartbeat =
injector.getInstance(RepositoryCommitHeartbeat.class);
environment.lifecycle().manage(repositoryCommitHeartbeat);
private MetricPipeline getMetricPipeline(MonPersisterConfiguration configuration, int threadNum,
Injector injector) {
logger.debug("Creating metric pipeline...");
final int batchSize = configuration.getMetricConfiguration().getBatchSize();
logger.debug("Batch size for metric pipeline [" + batchSize + "]");
MetricHandlerFactory metricEventHandlerFactory =
injector.getInstance(MetricHandlerFactory.class);
MetricPipelineFactory metricPipelineFactory = injector.getInstance(MetricPipelineFactory.class);
final MetricPipeline pipeline =
metricPipelineFactory.create(metricEventHandlerFactory.create(
configuration.getMetricConfiguration(), threadNum, batchSize));
logger.debug("Instance of metric pipeline fully created");
return pipeline;
}
public AlarmStateTransitionPipeline getAlarmStateHistoryPipeline(
MonPersisterConfiguration configuration, int threadNum, Injector injector) {
logger.debug("Creating alarm state history pipeline...");
int batchSize = configuration.getAlarmHistoryConfiguration().getBatchSize();
logger.debug("Batch size for each AlarmStateHistoryPipeline [" + batchSize + "]");
AlarmStateTransitionedEventHandlerFactory alarmHistoryEventHandlerFactory =
injector.getInstance(AlarmStateTransitionedEventHandlerFactory.class);
AlarmStateTransitionPipelineFactory alarmStateTransitionPipelineFactory =
injector.getInstance(AlarmStateTransitionPipelineFactory.class);
AlarmStateTransitionPipeline pipeline =
alarmStateTransitionPipelineFactory.create(alarmHistoryEventHandlerFactory.create(
configuration.getAlarmHistoryConfiguration(), threadNum, batchSize));
logger.debug("Instance of alarm state history pipeline fully created");
return pipeline;
}
}


@@ -18,36 +18,39 @@
package com.hpcloud.mon.persister;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.consumer.AlarmStateTransitionsConsumer;
import com.hpcloud.mon.persister.consumer.AlarmStateTransitionConsumer;
import com.hpcloud.mon.persister.consumer.AlarmStateTransitionConsumerFactory;
import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumer;
import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumerFactory;
import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumerRunnableBasic;
import com.hpcloud.mon.persister.consumer.KafkaAlarmStateTransitionConsumerRunnableBasicFactory;
import com.hpcloud.mon.persister.consumer.KafkaChannel;
import com.hpcloud.mon.persister.consumer.KafkaChannelFactory;
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumer;
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumerFactory;
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumerRunnableBasic;
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumerRunnableBasicFactory;
import com.hpcloud.mon.persister.consumer.KafkaStreams;
import com.hpcloud.mon.persister.consumer.KafkaStreamsProvider;
import com.hpcloud.mon.persister.consumer.MetricsConsumer;
import com.hpcloud.mon.persister.consumer.MetricsConsumerFactory;
import com.hpcloud.mon.persister.dbi.DBIProvider;
import com.hpcloud.mon.persister.disruptor.AlarmHistoryDisruptorProvider;
import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor;
import com.hpcloud.mon.persister.disruptor.DisruptorExceptionHandler;
import com.hpcloud.mon.persister.disruptor.MetricDisruptor;
import com.hpcloud.mon.persister.disruptor.MetricDisruptorProvider;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandler;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandlerFactory;
import com.hpcloud.mon.persister.disruptor.event.MetricHandler;
import com.hpcloud.mon.persister.disruptor.event.MetricHandlerFactory;
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipelineFactory;
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
import com.hpcloud.mon.persister.pipeline.MetricPipelineFactory;
import com.hpcloud.mon.persister.pipeline.event.AlarmStateTransitionedEventHandler;
import com.hpcloud.mon.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactory;
import com.hpcloud.mon.persister.pipeline.event.MetricHandler;
import com.hpcloud.mon.persister.pipeline.event.MetricHandlerFactory;
import com.hpcloud.mon.persister.repository.AlarmRepository;
import com.hpcloud.mon.persister.repository.InfluxDBAlarmRepository;
import com.hpcloud.mon.persister.repository.InfluxDBMetricRepository;
import com.hpcloud.mon.persister.repository.MetricRepository;
import com.hpcloud.mon.persister.repository.RepositoryCommitHeartbeat;
import com.hpcloud.mon.persister.repository.VerticaAlarmRepository;
import com.hpcloud.mon.persister.repository.VerticaMetricRepository;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.lmax.disruptor.ExceptionHandler;
import io.dropwizard.setup.Environment;
@@ -85,12 +88,38 @@ public class MonPersisterModule extends AbstractModule {
KafkaAlarmStateTransitionConsumerRunnableBasic.class).build(
KafkaAlarmStateTransitionConsumerRunnableBasicFactory.class));
bind(ExceptionHandler.class).to(DisruptorExceptionHandler.class);
install(new FactoryModuleBuilder().implement(
KafkaMetricsConsumer.class,
KafkaMetricsConsumer.class).build(
KafkaMetricsConsumerFactory.class));
bind(MetricDisruptor.class).toProvider(MetricDisruptorProvider.class).in(Scopes.SINGLETON);
install(new FactoryModuleBuilder().implement(
MetricPipeline.class,
MetricPipeline.class).build(
MetricPipelineFactory.class));
bind(AlarmStateHistoryDisruptor.class).toProvider(AlarmHistoryDisruptorProvider.class).in(
Scopes.SINGLETON);
install(new FactoryModuleBuilder().implement(
AlarmStateTransitionPipeline.class,
AlarmStateTransitionPipeline.class).build(
AlarmStateTransitionPipelineFactory.class));
install(new FactoryModuleBuilder().implement(
AlarmStateTransitionConsumer.class,
AlarmStateTransitionConsumer.class).build(
AlarmStateTransitionConsumerFactory.class));
install(new FactoryModuleBuilder().implement(
KafkaAlarmStateTransitionConsumer.class,
KafkaAlarmStateTransitionConsumer.class).build(
KafkaAlarmStateTransitionConsumerFactory.class));
install(new FactoryModuleBuilder().implement(
MetricsConsumer.class,
MetricsConsumer.class).build(
MetricsConsumerFactory.class));
install(new FactoryModuleBuilder().implement(KafkaChannel.class, KafkaChannel.class).build(
KafkaChannelFactory.class));
if (configuration.getDatabaseConfiguration().getDatabaseType().equals("vertica")) {
bind(DBI.class).toProvider(DBIProvider.class).in(Scopes.SINGLETON);
@@ -106,10 +135,5 @@ public class MonPersisterModule extends AbstractModule {
System.out.println("Check your config file.");
System.exit(1);
}
bind(KafkaStreams.class).toProvider(KafkaStreamsProvider.class).in(Scopes.SINGLETON);
bind(MetricsConsumer.class);
bind(AlarmStateTransitionsConsumer.class);
bind(RepositoryCommitHeartbeat.class);
}
}


@@ -1,33 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
public class DeduperConfiguration {
@JsonProperty
Integer dedupeRunFrequencySeconds;
public Integer getDedupeRunFrequencySeconds() {
return dedupeRunFrequencySeconds;
}
}


@@ -1,40 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
public class DisruptorConfiguration {
@JsonProperty
Integer bufferSize;
public Integer getBufferSize() {
return bufferSize;
}
@JsonProperty
Integer numProcessors;
public Integer getNumProcessors() {
return numProcessors;
}
}


@@ -27,18 +27,9 @@ public class KafkaConfiguration {
@JsonProperty
String topic;
@JsonProperty
Integer numThreads;
@JsonProperty
String groupId;
@JsonProperty
String zookeeperConnect;
@JsonProperty
String consumerId;
@JsonProperty
Integer socketTimeoutMs;
@@ -48,12 +39,6 @@ public class KafkaConfiguration {
@JsonProperty
Integer fetchMessageMaxBytes;
@JsonProperty
Boolean autoCommitEnable;
@JsonProperty
Integer autoCommitIntervalMs;
@JsonProperty
Integer queuedMaxMessageChunks;
@@ -78,9 +63,6 @@ public class KafkaConfiguration {
@JsonProperty
Integer consumerTimeoutMs;
@JsonProperty
String clientId;
@JsonProperty
Integer zookeeperSessionTimeoutMs;
@@ -94,22 +76,10 @@ public class KafkaConfiguration {
return topic;
}
public Integer getNumThreads() {
return numThreads;
}
public String getGroupId() {
return groupId;
}
public String getZookeeperConnect() {
return zookeeperConnect;
}
public String getConsumerId() {
return consumerId;
}
public Integer getSocketTimeoutMs() {
return socketTimeoutMs;
}
@@ -122,14 +92,6 @@ public class KafkaConfiguration {
return fetchMessageMaxBytes;
}
public Boolean getAutoCommitEnable() {
return autoCommitEnable;
}
public Integer getAutoCommitIntervalMs() {
return autoCommitIntervalMs;
}
public Integer getQueuedMaxMessageChunks() {
return queuedMaxMessageChunks;
}
@@ -162,10 +124,6 @@ public class KafkaConfiguration {
return consumerTimeoutMs;
}
public String getClientId() {
return clientId;
}
public Integer getZookeeperSessionTimeoutMs() {
return zookeeperSessionTimeoutMs;
}


@@ -42,19 +42,19 @@ public class MonPersisterConfiguration extends Configuration {
@JsonProperty
@NotNull
@Valid
private final AlarmHistoryConfiguration alarmHistoryConfiguration =
new AlarmHistoryConfiguration();
private final PipelineConfiguration alarmHistoryConfiguration =
new PipelineConfiguration();
public AlarmHistoryConfiguration getAlarmHistoryConfiguration() {
public PipelineConfiguration getAlarmHistoryConfiguration() {
return alarmHistoryConfiguration;
}
@JsonProperty
@NotNull
@Valid
private final MetricConfiguration metricConfiguration = new MetricConfiguration();
private final PipelineConfiguration metricConfiguration = new PipelineConfiguration();
public MetricConfiguration getMetricConfiguration() {
public PipelineConfiguration getMetricConfiguration() {
return metricConfiguration;
}
@@ -67,25 +67,6 @@ public class MonPersisterConfiguration extends Configuration {
return kafkaConfiguration;
}
@Valid
@NotNull
@JsonProperty
private final DisruptorConfiguration disruptorConfiguration = new DisruptorConfiguration();
public DisruptorConfiguration getDisruptorConfiguration() {
return disruptorConfiguration;
}
@Valid
@NotNull
@JsonProperty
private final OutputProcessorConfiguration outputProcessorConfiguration =
new OutputProcessorConfiguration();
public OutputProcessorConfiguration getOutputProcessorConfiguration() {
return outputProcessorConfiguration;
}
@JsonProperty
private final DataSourceFactory dataSourceFactory = new DataSourceFactory();
@@ -93,15 +74,6 @@ public class MonPersisterConfiguration extends Configuration {
return dataSourceFactory;
}
@Valid
@NotNull
@JsonProperty
private final DeduperConfiguration monDeDuperConfiguration = new DeduperConfiguration();
public DeduperConfiguration getMonDeDuperConfiguration() {
return monDeDuperConfiguration;
}
@Valid
@NotNull
@JsonProperty


@@ -1,33 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
public class OutputProcessorConfiguration {
@JsonProperty
Integer batchSize;
public Integer getBatchSize() {
return batchSize;
}
}


@@ -0,0 +1,96 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
public class PipelineConfiguration {
@JsonProperty
String topic;
@JsonProperty
String groupId;
@JsonProperty
String consumerId;
@JsonProperty
String clientId;
@JsonProperty
Integer batchSize;
@JsonProperty
Integer numThreads;
@JsonProperty
Integer maxBatchTime;
public String getTopic() {
return topic;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public String getConsumerId() {
return consumerId;
}
public void setConsumerId(String consumerId) {
this.consumerId = consumerId;
}
public String getClientId() {
return clientId;
}
public void setTopic(String topic) {
this.topic = topic;
}
public void setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
}
public void setNumThreads(Integer numThreads) {
this.numThreads = numThreads;
}
public void setMaxBatchTime(Integer maxBatchTime) {
this.maxBatchTime = maxBatchTime;
}
public Integer getBatchSize() {
return batchSize;
}
public Integer getNumThreads() {
return numThreads;
}
public Integer getMaxBatchTime() {
return maxBatchTime;
}
}


@@ -17,16 +17,17 @@
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder;
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
public class AlarmStateTransitionsConsumer extends Consumer<AlarmStateTransitionedEventHolder> {
public class AlarmStateTransitionConsumer extends Consumer<AlarmStateTransitionedEvent> {
@Inject
public AlarmStateTransitionsConsumer(KafkaAlarmStateTransitionConsumer kafkaConsumer,
AlarmStateHistoryDisruptor disruptor) {
super(kafkaConsumer, disruptor);
public AlarmStateTransitionConsumer(@Assisted KafkaAlarmStateTransitionConsumer kafkaConsumer,
@Assisted AlarmStateTransitionPipeline pipeline) {
super(kafkaConsumer, pipeline);
}
}


@@ -0,0 +1,25 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
public interface AlarmStateTransitionConsumerFactory {
AlarmStateTransitionConsumer create(KafkaAlarmStateTransitionConsumer kafkaConsumer,
AlarmStateTransitionPipeline pipeline);
}


@@ -17,7 +17,7 @@
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.persister.disruptor.ManagedDisruptor;
import com.hpcloud.mon.persister.pipeline.ManagedPipeline;
import com.google.inject.Inject;
@@ -29,25 +29,25 @@ import org.slf4j.LoggerFactory;
public class Consumer<T> implements Managed {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
private final KafkaConsumer consumer;
private final ManagedDisruptor<T> disruptor;
private final KafkaConsumer<T> consumer;
private final ManagedPipeline<T> pipeline;
@Inject
public Consumer(KafkaConsumer kafkaConsumer, ManagedDisruptor<T> disruptor) {
public Consumer(KafkaConsumer<T> kafkaConsumer, ManagedPipeline<T> pipeline) {
this.consumer = kafkaConsumer;
this.disruptor = disruptor;
this.pipeline = pipeline;
}
@Override
public void start() throws Exception {
logger.debug("start");
consumer.run();
consumer.start();
}
@Override
public void stop() throws Exception {
logger.debug("stop");
consumer.stop();
disruptor.shutdown();
pipeline.shutdown();
}
}


@@ -17,29 +17,29 @@
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import kafka.consumer.KafkaStream;
public class KafkaAlarmStateTransitionConsumer extends KafkaConsumer {
public class KafkaAlarmStateTransitionConsumer extends KafkaConsumer<AlarmStateTransitionedEvent> {
@Inject
private KafkaAlarmStateTransitionConsumerRunnableBasicFactory factory;
private final AlarmStateTransitionPipeline pipeline;
@Inject
public KafkaAlarmStateTransitionConsumer(MonPersisterConfiguration configuration) {
super(configuration);
public KafkaAlarmStateTransitionConsumer(@Assisted KafkaChannel kafkaChannel,
@Assisted int threadNum, @Assisted final AlarmStateTransitionPipeline pipeline) {
super(kafkaChannel, threadNum);
this.pipeline = pipeline;
}
@Override
protected Runnable createRunnable(KafkaStream<byte[], byte[]> stream, int threadNumber) {
return factory.create(stream, threadNumber);
}
@Override
protected String getStreamName() {
return this.configuration.getAlarmHistoryConfiguration().getTopic();
protected KafkaConsumerRunnableBasic<AlarmStateTransitionedEvent> createRunnable(
KafkaChannel kafkaChannel, int threadNumber) {
return factory.create(pipeline, kafkaChannel, threadNumber);
}
}


@@ -15,16 +15,11 @@
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor.event;
package com.hpcloud.mon.persister.consumer;
import com.lmax.disruptor.EventFactory;
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
public class MetricFactory implements EventFactory<MetricHolder> {
public static final MetricFactory INSTANCE = new MetricFactory();
@Override
public MetricHolder newInstance() {
return new MetricHolder();
}
public interface KafkaAlarmStateTransitionConsumerFactory {
KafkaAlarmStateTransitionConsumer create(KafkaChannel kafkaChannel, int threadNum,
final AlarmStateTransitionPipeline pipeline);
}


@@ -18,67 +18,50 @@
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder;
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.lmax.disruptor.EventTranslator;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaAlarmStateTransitionConsumerRunnableBasic implements Runnable {
public class KafkaAlarmStateTransitionConsumerRunnableBasic extends
KafkaConsumerRunnableBasic<AlarmStateTransitionedEvent> {
private static final Logger logger = LoggerFactory
.getLogger(KafkaAlarmStateTransitionConsumerRunnableBasic.class);
private final KafkaStream<byte[], byte[]> stream;
private final int threadNumber;
private final AlarmStateHistoryDisruptor disruptor;
private final ObjectMapper objectMapper;
@Inject
public KafkaAlarmStateTransitionConsumerRunnableBasic(AlarmStateHistoryDisruptor disruptor,
@Assisted KafkaStream<byte[], byte[]> stream, @Assisted int threadNumber) {
this.stream = stream;
this.threadNumber = threadNumber;
this.disruptor = disruptor;
public KafkaAlarmStateTransitionConsumerRunnableBasic(@Assisted AlarmStateTransitionPipeline pipeline,
@Assisted KafkaChannel kafkaChannel, @Assisted int threadNumber) {
super(kafkaChannel, pipeline, threadNumber);
this.objectMapper = new ObjectMapper();
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
objectMapper.enable(DeserializationFeature.UNWRAP_ROOT_VALUE);
}
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
@Override
protected void publishHeartbeat() {
publishEvent(null);
}
final String s = new String(it.next().message());
@Override
protected void handleMessage(String message) {
try {
final AlarmStateTransitionedEvent event =
objectMapper.readValue(message, AlarmStateTransitionedEvent.class);
logger.debug("Thread " + threadNumber + ": " + s);
logger.debug(event.toString());
try {
final AlarmStateTransitionedEvent event =
objectMapper.readValue(s, AlarmStateTransitionedEvent.class);
logger.debug(event.toString());
disruptor.publishEvent(new EventTranslator<AlarmStateTransitionedEventHolder>() {
@Override
public void translateTo(AlarmStateTransitionedEventHolder eventHolder, long sequence) {
eventHolder.setEvent(event);
}
});
} catch (Exception e) {
logger.error("Failed to deserialize JSON message and place on disruptor queue: " + s, e);
}
publishEvent(event);
} catch (Exception e) {
logger.error("Failed to deserialize JSON message and send to handler: " + message, e);
}
logger.debug("Shutting down Thread: " + threadNumber);
}
}


@@ -17,9 +17,9 @@
package com.hpcloud.mon.persister.consumer;
import kafka.consumer.KafkaStream;
import com.hpcloud.mon.persister.pipeline.AlarmStateTransitionPipeline;
public interface KafkaAlarmStateTransitionConsumerRunnableBasicFactory {
KafkaAlarmStateTransitionConsumerRunnableBasic create(KafkaStream<byte[], byte[]> stream,
KafkaAlarmStateTransitionConsumerRunnableBasic create(AlarmStateTransitionPipeline pipeline, KafkaChannel kafkaChannel,
int threadNumber);
}


@@ -0,0 +1,122 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.persister.configuration.KafkaConfiguration;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KafkaChannel {
private static final String KAFKA_CONFIGURATION = "Kafka configuration:";
private static final Logger logger = LoggerFactory.getLogger(KafkaChannel.class);
private final String topic;
private final ConsumerConnector consumerConnector;
private final int threadNum;
@Inject
public KafkaChannel(@Assisted MonPersisterConfiguration configuration,
@Assisted PipelineConfiguration pipelineConfiguration, @Assisted int threadNum) {
this.topic = pipelineConfiguration.getTopic();
this.threadNum = threadNum;
Properties kafkaProperties =
createKafkaProperties(configuration.getKafkaConfiguration(), pipelineConfiguration);
consumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig(kafkaProperties));
}
public final void markRead() {
this.consumerConnector.commitOffsets();
}
public KafkaStream<byte[], byte[]> getKafkaStream() {
final Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(this.topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
this.consumerConnector.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = streamMap.values().iterator().next();
if (streams.size() != 1) {
throw new IllegalStateException(String.format(
"Expected only one stream but instead there are %d", streams.size()));
}
return streams.get(0);
}
public void stop() {
this.consumerConnector.shutdown();
}
private ConsumerConfig createConsumerConfig(Properties kafkaProperties) {
return new ConsumerConfig(kafkaProperties);
}
private Properties createKafkaProperties(KafkaConfiguration kafkaConfiguration,
final PipelineConfiguration pipelineConfiguration) {
Properties properties = new Properties();
properties.put("group.id", pipelineConfiguration.getGroupId());
properties.put("zookeeper.connect", kafkaConfiguration.getZookeeperConnect());
properties.put("consumer.id",
String.format("%s_%d", pipelineConfiguration.getConsumerId(), this.threadNum));
properties.put("socket.timeout.ms", kafkaConfiguration.getSocketTimeoutMs().toString());
properties.put("socket.receive.buffer.bytes", kafkaConfiguration.getSocketReceiveBufferBytes()
.toString());
properties.put("fetch.message.max.bytes", kafkaConfiguration.getFetchMessageMaxBytes()
.toString());
// Set auto commit to false because the persister is going to explicitly commit
properties.put("auto.commit.enable", "false");
properties.put("queued.max.message.chunks", kafkaConfiguration.getQueuedMaxMessageChunks()
.toString());
properties.put("rebalance.max.retries", kafkaConfiguration.getRebalanceMaxRetries().toString());
properties.put("fetch.min.bytes", kafkaConfiguration.getFetchMinBytes().toString());
properties.put("fetch.wait.max.ms", kafkaConfiguration.getFetchWaitMaxMs().toString());
properties.put("rebalance.backoff.ms", kafkaConfiguration.getRebalanceBackoffMs().toString());
properties.put("refresh.leader.backoff.ms", kafkaConfiguration.getRefreshLeaderBackoffMs()
.toString());
properties.put("auto.offset.reset", kafkaConfiguration.getAutoOffsetReset());
properties.put("consumer.timeout.ms", kafkaConfiguration.getConsumerTimeoutMs().toString());
properties.put("client.id", String.format("%s_%d", pipelineConfiguration.getClientId(), threadNum));
properties.put("zookeeper.session.timeout.ms", kafkaConfiguration
.getZookeeperSessionTimeoutMs().toString());
properties.put("zookeeper.connection.timeout.ms", kafkaConfiguration
.getZookeeperConnectionTimeoutMs().toString());
properties
.put("zookeeper.sync.time.ms", kafkaConfiguration.getZookeeperSyncTimeMs().toString());
for (String key : properties.stringPropertyNames()) {
logger.info(KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key));
}
return properties;
}
}


@@ -18,21 +18,9 @@
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
import javax.inject.Inject;
import javax.inject.Provider;
public class KafkaStreamsProvider implements Provider<KafkaStreams> {
private final MonPersisterConfiguration configuration;
@Inject
public KafkaStreamsProvider(MonPersisterConfiguration configuration) {
this.configuration = configuration;
}
@Override
public KafkaStreams get() {
return new KafkaStreams(configuration);
}
}
public interface KafkaChannelFactory {
KafkaChannel create(MonPersisterConfiguration configuration,
PipelineConfiguration pipelineConfiguration, int threadNum);
}


@@ -14,70 +14,53 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.google.inject.Inject;
import kafka.consumer.KafkaStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public abstract class KafkaConsumer {
public abstract class KafkaConsumer<T> {
private static final String KAFKA_CONFIGURATION = "Kafka configuration:";
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
private static final int WAIT_TIME = 10;
private static final int WAIT_TIME = 10;
protected final MonPersisterConfiguration configuration;
private ExecutorService executorService;
private final KafkaChannel kafkaChannel;
private final int threadNum;
private KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic;
private final Integer numThreads;
private ExecutorService executorService;
@Inject
private KafkaStreams kafkaStreams;
public KafkaConsumer(KafkaChannel kafkaChannel, int threadNum) {
this.kafkaChannel = kafkaChannel;
this.threadNum = threadNum;
}
protected abstract Runnable createRunnable(KafkaStream<byte[], byte[]> stream, int threadNumber);
protected abstract String getStreamName();
protected abstract KafkaConsumerRunnableBasic<T> createRunnable(KafkaChannel kafkaChannel,
int threadNumber);
@Inject
public KafkaConsumer(MonPersisterConfiguration configuration) {
public void start() {
executorService = Executors.newFixedThreadPool(1);
KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic =
createRunnable(kafkaChannel, this.threadNum);
executorService.submit(kafkaConsumerRunnableBasic);
}
this.configuration = configuration;
this.numThreads = configuration.getKafkaConfiguration().getNumThreads();
logger.info(KAFKA_CONFIGURATION + " numThreads = " + numThreads);
}
public void run() {
List<KafkaStream<byte[], byte[]>> streams = kafkaStreams.getStreams().get(getStreamName());
executorService = Executors.newFixedThreadPool(numThreads);
int threadNumber = 0;
for (final KafkaStream<byte[], byte[]> stream : streams) {
executorService.submit(createRunnable(stream, threadNumber));
threadNumber++;
}
}
public void stop() {
kafkaStreams.stop();
if (executorService != null) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) {
logger.warn("Did not shut down in %d seconds", WAIT_TIME);
}
} catch (InterruptedException e) {
logger.info("awaitTerminiation interrupted", e);
}
public void stop() {
kafkaConsumerRunnableBasic.stop();
if (executorService != null) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) {
logger.warn("Did not shut down in {} seconds", WAIT_TIME);
}
} catch (InterruptedException e) {
logger.info("awaitTerminiation interrupted", e);
}
}
}
}


@@ -0,0 +1,81 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.persister.pipeline.ManagedPipeline;
import kafka.consumer.ConsumerIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class KafkaConsumerRunnableBasic<T> implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnableBasic.class);
private final KafkaChannel kafkaChannel;
private final int threadNumber;
private final ManagedPipeline<T> pipeline;
private volatile boolean stop = false;
public KafkaConsumerRunnableBasic(KafkaChannel kafkaChannel,
ManagedPipeline<T> pipeline,
int threadNumber) {
this.kafkaChannel = kafkaChannel;
this.pipeline = pipeline;
this.threadNumber = threadNumber;
}
abstract protected void publishHeartbeat();
abstract protected void handleMessage(String message);
protected void markRead() {
this.kafkaChannel.markRead();
}
public void stop() {
this.stop = true;
}
public void run() {
final ConsumerIterator<byte[], byte[]> it = kafkaChannel.getKafkaStream().iterator();
logger.debug("KafkaChannel {} has stream", this.threadNumber);
while (!this.stop) {
try {
if (it.hasNext()) {
final String s = new String(it.next().message());
logger.debug("Thread {}: {}", threadNumber, s);
handleMessage(s);
}
} catch (kafka.consumer.ConsumerTimeoutException cte) {
publishHeartbeat();
continue;
}
}
logger.debug("Shutting down Thread: {}", threadNumber);
this.kafkaChannel.stop();
}
protected void publishEvent(final T event) {
if (pipeline.publishEvent(event)) {
markRead();
}
}
}


@@ -17,29 +17,29 @@
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import kafka.consumer.KafkaStream;
public class KafkaMetricsConsumer extends KafkaConsumer {
public class KafkaMetricsConsumer extends KafkaConsumer<MetricEnvelope[]> {
@Inject
private KafkaMetricsConsumerRunnableBasicFactory factory;
private final MetricPipeline pipeline;
@Inject
public KafkaMetricsConsumer(MonPersisterConfiguration configuration) {
super(configuration);
public KafkaMetricsConsumer(@Assisted KafkaChannel kafkaChannel, @Assisted int threadNum,
@Assisted MetricPipeline pipeline) {
super(kafkaChannel, threadNum);
this.pipeline = pipeline;
}
@Override
protected Runnable createRunnable(KafkaStream<byte[], byte[]> stream, int threadNumber) {
return factory.create(stream, threadNumber);
}
@Override
protected String getStreamName() {
return this.configuration.getMetricConfiguration().getTopic();
protected KafkaConsumerRunnableBasic<MetricEnvelope[]> createRunnable(KafkaChannel kafkaChannel,
int threadNumber) {
return factory.create(pipeline, kafkaChannel, threadNumber);
}
}


@@ -15,16 +15,11 @@
* limitations under the License.
*/
package com.hpcloud.mon.persister.configuration;
package com.hpcloud.mon.persister.consumer;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
public class AlarmHistoryConfiguration {
@JsonProperty
String topic;
public String getTopic() {
return topic;
}
public interface KafkaMetricsConsumerFactory {
public KafkaMetricsConsumer create(KafkaChannel kafkaChannel, int threadNum,
MetricPipeline pipeline);
}


@@ -18,68 +18,49 @@
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
import com.hpcloud.mon.persister.disruptor.MetricDisruptor;
import com.hpcloud.mon.persister.disruptor.event.MetricHolder;
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.lmax.disruptor.EventTranslator;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaMetricsConsumerRunnableBasic implements Runnable {
public class KafkaMetricsConsumerRunnableBasic extends KafkaConsumerRunnableBasic<MetricEnvelope[]> {
private static final Logger logger = LoggerFactory
.getLogger(KafkaMetricsConsumerRunnableBasic.class);
private final KafkaStream<byte[], byte[]> stream;
private final int threadNumber;
private final MetricDisruptor disruptor;
private final ObjectMapper objectMapper;
@Inject
public KafkaMetricsConsumerRunnableBasic(MetricDisruptor disruptor,
@Assisted KafkaStream<byte[], byte[]> stream, @Assisted int threadNumber) {
this.stream = stream;
this.threadNumber = threadNumber;
this.disruptor = disruptor;
public KafkaMetricsConsumerRunnableBasic(@Assisted MetricPipeline pipeline,
@Assisted KafkaChannel kafkaChannel, @Assisted int threadNumber) {
super(kafkaChannel, pipeline, threadNumber);
this.objectMapper = new ObjectMapper();
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
}
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
@Override
protected void publishHeartbeat() {
publishEvent(null);
}
final String s = new String(it.next().message());
@Override
protected void handleMessage(String message) {
try {
final MetricEnvelope[] envelopes = objectMapper.readValue(message, MetricEnvelope[].class);
logger.debug("Thread {}: {}", threadNumber, s);
try {
final MetricEnvelope[] envelopes = objectMapper.readValue(s, MetricEnvelope[].class);
for (final MetricEnvelope envelope : envelopes) {
logger.debug("{}", envelope);
disruptor.publishEvent(new EventTranslator<MetricHolder>() {
@Override
public void translateTo(MetricHolder event, long sequence) {
event.setEnvelope(envelope);
}
});
}
} catch (Exception e) {
logger.error("Failed to deserialize JSON message and place on disruptor queue: {}", e);
for (final MetricEnvelope envelope : envelopes) {
logger.debug("{}", envelope);
}
publishEvent(envelopes);
} catch (Exception e) {
logger.error("Failed to deserialize JSON message and place on pipeline queue: " + message,
e);
}
logger.debug("Shutting down Thread: {}", threadNumber);
}
}


@@ -17,8 +17,8 @@
package com.hpcloud.mon.persister.consumer;
import kafka.consumer.KafkaStream;
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
public interface KafkaMetricsConsumerRunnableBasicFactory {
KafkaMetricsConsumerRunnableBasic create(KafkaStream<byte[], byte[]> stream, int threadNumber);
KafkaMetricsConsumerRunnableBasic create(MetricPipeline pipeline, KafkaChannel kafkaChannel, int threadNumber);
}


@@ -1,112 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.persister.configuration.KafkaConfiguration;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KafkaStreams {
private static final String KAFKA_CONFIGURATION = "Kafka configuration:";
private static final Logger logger = LoggerFactory.getLogger(KafkaStreams.class);
private final MonPersisterConfiguration configuration;
private final ConsumerConnector consumerConnector;
private final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap;
public KafkaStreams(MonPersisterConfiguration configuration) {
this.configuration = configuration;
Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfiguration());
ConsumerConfig consumerConfig = createConsumerConfig(kafkaProperties);
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<>();
Integer numThreads = configuration.getKafkaConfiguration().getNumThreads();
topicCountMap.put(this.configuration.getMetricConfiguration().getTopic(), (int) numThreads);
topicCountMap.put(this.configuration.getAlarmHistoryConfiguration().getTopic(),
(int) numThreads);
consumerMap = consumerConnector.createMessageStreams(topicCountMap);
}
public final Map<String, List<KafkaStream<byte[], byte[]>>> getStreams() {
return consumerMap;
}
private ConsumerConfig createConsumerConfig(Properties kafkaProperties) {
return new ConsumerConfig(kafkaProperties);
}
private Properties createKafkaProperties(KafkaConfiguration metricsKafkaConfiguration) {
Properties properties = new Properties();
properties.put("group.id", metricsKafkaConfiguration.getGroupId());
properties.put("zookeeper.connect", metricsKafkaConfiguration.getZookeeperConnect());
properties.put("consumer.id", metricsKafkaConfiguration.getConsumerId());
properties.put("socket.timeout.ms", metricsKafkaConfiguration.getSocketTimeoutMs().toString());
properties.put("socket.receive.buffer.bytes", metricsKafkaConfiguration
.getSocketReceiveBufferBytes().toString());
properties.put("fetch.message.max.bytes", metricsKafkaConfiguration.getFetchMessageMaxBytes()
.toString());
properties
.put("auto.commit.enable", metricsKafkaConfiguration.getAutoCommitEnable().toString());
properties.put("auto.commit.interval.ms", metricsKafkaConfiguration.getAutoCommitIntervalMs()
.toString());
properties.put("queued.max.message.chunks", metricsKafkaConfiguration
.getQueuedMaxMessageChunks().toString());
properties.put("rebalance.max.retries", metricsKafkaConfiguration.getRebalanceMaxRetries()
.toString());
properties.put("fetch.min.bytes", metricsKafkaConfiguration.getFetchMinBytes().toString());
properties.put("fetch.wait.max.ms", metricsKafkaConfiguration.getFetchWaitMaxMs().toString());
properties.put("rebalance.backoff.ms", metricsKafkaConfiguration.getRebalanceBackoffMs()
.toString());
properties.put("refresh.leader.backoff.ms", metricsKafkaConfiguration
.getRefreshLeaderBackoffMs().toString());
properties.put("auto.offset.reset", metricsKafkaConfiguration.getAutoOffsetReset());
properties.put("consumer.timeout.ms", metricsKafkaConfiguration.getConsumerTimeoutMs()
.toString());
properties.put("client.id", metricsKafkaConfiguration.getClientId());
properties.put("zookeeper.session.timeout.ms", metricsKafkaConfiguration
.getZookeeperSessionTimeoutMs().toString());
properties.put("zookeeper.connection.timeout.ms", metricsKafkaConfiguration
.getZookeeperConnectionTimeoutMs().toString());
properties.put("zookeeper.sync.time.ms", metricsKafkaConfiguration.getZookeeperSyncTimeMs()
.toString());
for (String key : properties.stringPropertyNames()) {
logger.info(KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key));
}
return properties;
}
public void stop() {
consumerConnector.shutdown();
}
}


@@ -17,15 +17,16 @@
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.persister.disruptor.MetricDisruptor;
import com.hpcloud.mon.persister.disruptor.event.MetricHolder;
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
public class MetricsConsumer extends Consumer<MetricHolder> {
public class MetricsConsumer extends Consumer<MetricEnvelope[]> {
@Inject
public MetricsConsumer(KafkaMetricsConsumer kafkaConsumer, MetricDisruptor disruptor) {
super(kafkaConsumer, disruptor);
public MetricsConsumer(@Assisted KafkaMetricsConsumer kafkaConsumer, @Assisted MetricPipeline pipeline) {
super(kafkaConsumer, pipeline);
}
}


@@ -0,0 +1,21 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.hpcloud.mon.persister.consumer;
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
public interface MetricsConsumerFactory {
MetricsConsumer create(KafkaMetricsConsumer kafkaConsumer, MetricPipeline pipeline);
}


@@ -1,94 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventFactory;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandler;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandlerFactory;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.lmax.disruptor.ExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class AlarmHistoryDisruptorProvider implements Provider<AlarmStateHistoryDisruptor> {
private static final Logger logger = LoggerFactory.getLogger(AlarmHistoryDisruptorProvider.class);
private final MonPersisterConfiguration configuration;
private final AlarmStateTransitionedEventHandlerFactory eventHandlerFactory;
private final ExceptionHandler exceptionHandler;
private final AlarmStateHistoryDisruptor instance;
@Inject
public AlarmHistoryDisruptorProvider(MonPersisterConfiguration configuration,
AlarmStateTransitionedEventHandlerFactory eventHandlerFactory,
ExceptionHandler exceptionHandler) {
this.configuration = configuration;
this.eventHandlerFactory = eventHandlerFactory;
this.exceptionHandler = exceptionHandler;
this.instance = createInstance();
}
private AlarmStateHistoryDisruptor createInstance() {
logger.debug("Creating disruptor...");
Executor executor = Executors.newCachedThreadPool();
AlarmStateTransitionedEventFactory eventFactory = new AlarmStateTransitionedEventFactory();
int bufferSize = configuration.getDisruptorConfiguration().getBufferSize();
logger.debug("Buffer size for instance of disruptor [" + bufferSize + "]");
AlarmStateHistoryDisruptor disruptor =
new AlarmStateHistoryDisruptor(eventFactory, bufferSize, executor);
disruptor.handleExceptionsWith(exceptionHandler);
int batchSize = configuration.getOutputProcessorConfiguration().getBatchSize();
logger.debug("Batch size for each output processor [" + batchSize + "]");
int numOutputProcessors = configuration.getDisruptorConfiguration().getNumProcessors();
logger.debug("Number of output processors [" + numOutputProcessors + "]");
AlarmStateTransitionedEventHandler[] eventHandlers =
new AlarmStateTransitionedEventHandler[numOutputProcessors];
for (int i = 0; i < numOutputProcessors; ++i) {
eventHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize);
}
disruptor.handleEventsWith(eventHandlers);
disruptor.setHandlers(eventHandlers);
disruptor.start();
logger.debug("Instance of disruptor successfully started");
logger.debug("Instance of disruptor fully created");
return disruptor;
}
public AlarmStateHistoryDisruptor get() {
return instance;
}
}

View File

@ -1,39 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executor;
public class AlarmStateHistoryDisruptor extends ManagedDisruptor<AlarmStateTransitionedEventHolder> {
public AlarmStateHistoryDisruptor(EventFactory<AlarmStateTransitionedEventHolder> eventFactory,
int ringBufferSize, Executor executor) {
super(eventFactory, ringBufferSize, executor);
}
public AlarmStateHistoryDisruptor(
final EventFactory<AlarmStateTransitionedEventHolder> eventFactory, int ringBufferSize,
Executor executor, ProducerType producerType, WaitStrategy waitStrategy) {
super(eventFactory, ringBufferSize, executor, producerType, waitStrategy);
}
}

View File

@ -1,49 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor;
import com.lmax.disruptor.ExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DisruptorExceptionHandler implements ExceptionHandler {
private static final Logger logger = LoggerFactory.getLogger(DisruptorExceptionHandler.class);
@Override
public void handleEventException(Throwable ex, long sequence, Object event) {
logger.error("Disruptor encountered an exception during normal operation", ex);
throw new RuntimeException(ex);
}
@Override
public void handleOnStartException(Throwable ex) {
logger.error("Disruptor encountered an exception during startup", ex);
throw new RuntimeException(ex);
}
@Override
public void handleOnShutdownException(Throwable ex) {
logger.error("Disruptor encountered an exception during shutdown", ex);
throw new RuntimeException(ex);
}
}

View File

@ -1,52 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor;
import com.hpcloud.mon.persister.disruptor.event.FlushableHandler;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executor;
public class ManagedDisruptor<T> extends Disruptor<T> {
private FlushableHandler[] handlers = new FlushableHandler[0];
public ManagedDisruptor(EventFactory<T> eventFactory, int ringBufferSize, Executor executor) {
super(eventFactory, ringBufferSize, executor);
}
public ManagedDisruptor(final EventFactory<T> eventFactory, int ringBufferSize,
Executor executor, ProducerType producerType, WaitStrategy waitStrategy) {
super(eventFactory, ringBufferSize, executor, producerType, waitStrategy);
}
@Override
public void shutdown() {
for (FlushableHandler handler : handlers) {
handler.flush();
}
super.shutdown();
}
public void setHandlers(FlushableHandler[] handlers) {
this.handlers = handlers;
}
}

View File

@ -1,39 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor;
import com.hpcloud.mon.persister.disruptor.event.MetricHolder;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executor;
public class MetricDisruptor extends ManagedDisruptor<MetricHolder> {
public MetricDisruptor(EventFactory<MetricHolder> eventFactory, int ringBufferSize,
Executor executor) {
super(eventFactory, ringBufferSize, executor);
}
public MetricDisruptor(final EventFactory<MetricHolder> eventFactory, int ringBufferSize,
Executor executor, ProducerType producerType, WaitStrategy waitStrategy) {
super(eventFactory, ringBufferSize, executor, producerType, waitStrategy);
}
}

View File

@ -1,92 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.disruptor.event.MetricFactory;
import com.hpcloud.mon.persister.disruptor.event.MetricHandler;
import com.hpcloud.mon.persister.disruptor.event.MetricHandlerFactory;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.lmax.disruptor.ExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class MetricDisruptorProvider implements Provider<MetricDisruptor> {
private static final Logger logger = LoggerFactory.getLogger(MetricDisruptorProvider.class);
private final MonPersisterConfiguration configuration;
private final MetricHandlerFactory eventHandlerFactory;
private final ExceptionHandler exceptionHandler;
private final MetricDisruptor instance;
@Inject
public MetricDisruptorProvider(MonPersisterConfiguration configuration,
MetricHandlerFactory eventHandlerFactory, ExceptionHandler exceptionHandler) {
this.configuration = configuration;
this.eventHandlerFactory = eventHandlerFactory;
this.exceptionHandler = exceptionHandler;
this.instance = createInstance();
}
private MetricDisruptor createInstance() {
logger.debug("Creating disruptor...");
Executor executor = Executors.newCachedThreadPool();
MetricFactory eventFactory = new MetricFactory();
int bufferSize = configuration.getDisruptorConfiguration().getBufferSize();
logger.debug("Buffer size for instance of disruptor [" + bufferSize + "]");
MetricDisruptor disruptor = new MetricDisruptor(eventFactory, bufferSize, executor);
disruptor.handleExceptionsWith(exceptionHandler);
int batchSize = configuration.getOutputProcessorConfiguration().getBatchSize();
logger.debug("Batch size for each output processor [" + batchSize + "]");
int numOutputProcessors = configuration.getDisruptorConfiguration().getNumProcessors();
logger.debug("Number of output processors [" + numOutputProcessors + "]");
MetricHandler[] metricHandlers = new MetricHandler[numOutputProcessors];
for (int i = 0; i < numOutputProcessors; ++i) {
metricHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize);
}
disruptor.handleEventsWith(metricHandlers);
disruptor.setHandlers(metricHandlers);
disruptor.start();
logger.debug("Instance of disruptor successfully started");
logger.debug("Instance of disruptor fully created");
return disruptor;
}
public MetricDisruptor get() {
return instance;
}
}

View File

@ -1,29 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor.event;
import com.lmax.disruptor.EventFactory;
public class AlarmStateTransitionedEventFactory implements
EventFactory<AlarmStateTransitionedEventHolder> {
@Override
public AlarmStateTransitionedEventHolder newInstance() {
return new AlarmStateTransitionedEventHolder();
}
}

View File

@ -1,124 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor.event;
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.repository.AlarmRepository;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.lmax.disruptor.EventHandler;
import io.dropwizard.setup.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlarmStateTransitionedEventHandler implements
EventHandler<AlarmStateTransitionedEventHolder>, FlushableHandler {
private static final Logger logger = LoggerFactory
.getLogger(AlarmStateTransitionedEventHandler.class);
private final int ordinal;
private final int numProcessors;
private final int batchSize;
private long millisSinceLastFlush = System.currentTimeMillis();
private final long millisBetweenFlushes;
private final int secondsBetweenFlushes;
private final AlarmRepository repository;
private final Environment environment;
private final Meter processedMeter;
private final Meter commitMeter;
private final Timer commitTimer;
@Inject
public AlarmStateTransitionedEventHandler(AlarmRepository repository,
MonPersisterConfiguration configuration, Environment environment,
@Assisted("ordinal") int ordinal, @Assisted("numProcessors") int numProcessors,
@Assisted("batchSize") int batchSize) {
this.repository = repository;
this.environment = environment;
this.processedMeter =
this.environment.metrics().meter(
this.getClass().getName() + "." + "alarm-messages-processed-processedMeter");
this.commitMeter =
this.environment.metrics().meter(
this.getClass().getName() + "." + "commits-executed-processedMeter");
this.commitTimer =
this.environment.metrics().timer(
this.getClass().getName() + "." + "total-commit-and-flush-timer");
this.secondsBetweenFlushes =
configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds();
this.millisBetweenFlushes = secondsBetweenFlushes * 1000;
this.ordinal = ordinal;
this.numProcessors = numProcessors;
this.batchSize = batchSize;
}
@Override
public void onEvent(AlarmStateTransitionedEventHolder eventHolder, long sequence, boolean b)
throws Exception {
if (eventHolder.getEvent() == null) {
logger.debug("Received heartbeat message. Checking last flush time.");
if (millisSinceLastFlush + millisBetweenFlushes < System.currentTimeMillis()) {
logger.debug("It's been more than " + secondsBetweenFlushes
+ " seconds since last flush. Flushing staging tables now...");
flush();
} else {
logger.debug("It has not been more than " + secondsBetweenFlushes
+ " seconds since last flush. No need to perform flush at this time.");
}
return;
}
if (((sequence / batchSize) % this.numProcessors) != this.ordinal) {
return;
}
processedMeter.mark();
logger.debug("Sequence number: " + sequence + " Ordinal: " + ordinal + " Event: "
+ eventHolder.getEvent());
AlarmStateTransitionedEvent event = eventHolder.getEvent();
repository.addToBatch(event);
if (sequence % batchSize == (batchSize - 1)) {
Timer.Context context = commitTimer.time();
flush();
context.stop();
commitMeter.mark();
}
}
@Override
public void flush() {
repository.flush();
millisSinceLastFlush = System.currentTimeMillis();
}
}

View File

@ -15,18 +15,17 @@
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor.event;
package com.hpcloud.mon.persister.pipeline;
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
import com.hpcloud.mon.persister.pipeline.event.AlarmStateTransitionedEventHandler;
public class AlarmStateTransitionedEventHolder {
AlarmStateTransitionedEvent event;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
public AlarmStateTransitionedEvent getEvent() {
return event;
}
public void setEvent(AlarmStateTransitionedEvent event) {
this.event = event;
public class AlarmStateTransitionPipeline extends ManagedPipeline<AlarmStateTransitionedEvent> {
@Inject
public AlarmStateTransitionPipeline(@Assisted AlarmStateTransitionedEventHandler handler) {
super(handler);
}
}

View File

@ -15,16 +15,10 @@
* limitations under the License.
*/
package com.hpcloud.mon.persister.configuration;
package com.hpcloud.mon.persister.pipeline;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.hpcloud.mon.persister.pipeline.event.AlarmStateTransitionedEventHandler;
public class MetricConfiguration {
@JsonProperty
String topic;
public String getTopic() {
return topic;
}
public interface AlarmStateTransitionPipelineFactory {
AlarmStateTransitionPipeline create(AlarmStateTransitionedEventHandler handler);
}

View File

@ -0,0 +1,46 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.pipeline;
import com.hpcloud.mon.persister.pipeline.event.FlushableHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ManagedPipeline<T> {
private static final Logger logger = LoggerFactory.getLogger(ManagedPipeline.class);
private final FlushableHandler<T> eventHandler;
public ManagedPipeline(FlushableHandler<T> eventHandler) {
this.eventHandler = eventHandler;
}
public void shutdown() {
eventHandler.flush();
}
public boolean publishEvent(T holder) {
try {
return this.eventHandler.onEvent(holder);
} catch (Exception e) {
logger.error("Failed to handle event", e);
return false;
}
}
}
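In short, publishEvent() returns true when the handler flushed a batch, which is the caller's cue to commit Kafka offsets, and shutdown() forces a final flush so buffered events are not lost on a normal stop. A minimal usage sketch, where the handler instance and event type are placeholders:

// someHandler is any FlushableHandler<String>; names here are hypothetical.
ManagedPipeline<String> pipeline = new ManagedPipeline<>(someHandler);
if (pipeline.publishEvent("event-payload")) {
  // The handler flushed; commit consumer offsets here.
}
pipeline.shutdown(); // flushes whatever is still batched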

View File

@ -15,19 +15,18 @@
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor.event;
package com.hpcloud.mon.persister.pipeline;
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
import com.hpcloud.mon.persister.pipeline.event.MetricHandler;
public class MetricHolder {
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
MetricEnvelope metricEnvelope;
public class MetricPipeline extends ManagedPipeline<MetricEnvelope[]> {
public MetricEnvelope getMetricEnvelope() {
return metricEnvelope;
}
public void setEnvelope(MetricEnvelope metricEnvelope) {
this.metricEnvelope = metricEnvelope;
@Inject
public MetricPipeline(@Assisted MetricHandler metricHandler) {
super(metricHandler);
}
}

View File

@ -15,8 +15,10 @@
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor.event;
package com.hpcloud.mon.persister.pipeline;
public interface FlushableHandler {
public void flush();
import com.hpcloud.mon.persister.pipeline.event.MetricHandler;
public interface MetricPipelineFactory {
MetricPipeline create(MetricHandler metricHandler);
}

View File

@ -0,0 +1,64 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.pipeline.event;
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
import com.hpcloud.mon.persister.repository.AlarmRepository;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import io.dropwizard.setup.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlarmStateTransitionedEventHandler extends
FlushableHandler<AlarmStateTransitionedEvent> {
private static final Logger logger = LoggerFactory
.getLogger(AlarmStateTransitionedEventHandler.class);
private final AlarmRepository repository;
private final int ordinal;
@Inject
public AlarmStateTransitionedEventHandler(AlarmRepository repository,
@Assisted PipelineConfiguration configuration, Environment environment,
@Assisted("ordinal") int ordinal,
@Assisted("batchSize") int batchSize) {
super(configuration, environment, ordinal, batchSize,
AlarmStateTransitionedEventHandler.class.getName());
this.repository = repository;
this.ordinal = ordinal;
}
@Override
protected int process(AlarmStateTransitionedEvent event) throws Exception {
logger.debug("Ordinal: Event: {}", this.ordinal, event);
repository.addToBatch(event);
return 1;
}
@Override
protected void flushRepository() {
repository.flush();
}
}

View File

@ -15,11 +15,13 @@
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor.event;
package com.hpcloud.mon.persister.pipeline.event;
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
import com.google.inject.assistedinject.Assisted;
public interface AlarmStateTransitionedEventHandlerFactory {
AlarmStateTransitionedEventHandler create(@Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors, @Assisted("batchSize") int batchSize);
AlarmStateTransitionedEventHandler create(PipelineConfiguration configuration,
@Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize);
}
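Factories like this one, whose create() method takes @Assisted parameters, are normally bound through Guice's FactoryModuleBuilder so the injector generates the implementation. A sketch of the binding the interface implies; the module name and its placement are assumptions, not part of this commit:

import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;

public class PersisterModule extends AbstractModule { // hypothetical module
  @Override
  protected void configure() {
    // Guice supplies the non-@Assisted constructor arguments from the
    // injector; callers of create() pass the @Assisted ones at call time.
    install(new FactoryModuleBuilder()
        .build(AlarmStateTransitionedEventHandlerFactory.class));
    install(new FactoryModuleBuilder()
        .build(MetricHandlerFactory.class));
  }
}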

View File

@ -0,0 +1,116 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.pipeline.event;
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import io.dropwizard.setup.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class FlushableHandler<T> {
private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class);
private final int ordinal;
private final int batchSize;
private final String handlerName;
private long millisSinceLastFlush = System.currentTimeMillis();
private final long millisBetweenFlushes;
private final int secondsBetweenFlushes;
private int eventCount = 0;
private final Environment environment;
private final Meter processedMeter;
private final Meter commitMeter;
private final Timer commitTimer;
protected FlushableHandler(PipelineConfiguration configuration, Environment environment,
int ordinal, int batchSize, String baseName) {
this.handlerName = String.format("%s[%d]", baseName, ordinal);
this.environment = environment;
this.processedMeter =
this.environment.metrics()
.meter(handlerName + "." + "events-processed-processedMeter");
this.commitMeter =
this.environment.metrics().meter(handlerName + "." + "commits-executed-processedMeter");
this.commitTimer =
this.environment.metrics().timer(handlerName + "." + "total-commit-and-flush-timer");
this.secondsBetweenFlushes = configuration.getMaxBatchTime();
this.millisBetweenFlushes = secondsBetweenFlushes * 1000;
this.ordinal = ordinal;
this.batchSize = batchSize;
}
protected abstract void flushRepository();
protected abstract int process(T metricEvent) throws Exception;
public boolean onEvent(final T event) throws Exception {
if (event == null) {
long now = System.currentTimeMillis();
long secondsSinceLastFlush = (now - millisSinceLastFlush) / 1000;
logger.debug("{} received heartbeat message, flush every {} seconds.", this.handlerName,
this.secondsBetweenFlushes);
if (millisSinceLastFlush + millisBetweenFlushes < now) {
logger.debug("{}: {} seconds since last flush. Flushing to repository now.",
this.handlerName, secondsSinceLastFlush);
flush();
return true;
} else {
logger.debug("{}: {} seconds since last flush. No need to flush at this time.",
this.handlerName, secondsSinceLastFlush);
return false;
}
}
processedMeter.mark();
logger.debug("Ordinal: Event: {}", ordinal, event);
eventCount += process(event);
if (eventCount >= batchSize) {
flush();
return true;
} else {
return false;
}
}
public void flush() {
if (eventCount == 0) {
logger.debug("{}: Nothing to flush", this.handlerName);
// Nothing staged, so skip the commit entirely.
return;
}
Timer.Context context = commitTimer.time();
flushRepository();
context.stop();
commitMeter.mark();
millisSinceLastFlush = System.currentTimeMillis();
logger.debug("{}: Flushed {} events", this.handlerName, this.eventCount);
eventCount = 0;
}
}
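FlushableHandler is a small template method: a subclass only says how to stage one event (process) and how to commit the staged batch (flushRepository), while batching, heartbeat-driven flushes, and metrics live in the base class. A minimal hypothetical subclass to illustrate the contract:

// Sketch only; LoggingHandler is not part of this commit.
public class LoggingHandler extends FlushableHandler<String> {
  protected LoggingHandler(PipelineConfiguration config, Environment env,
      int ordinal, int batchSize) {
    super(config, env, ordinal, batchSize, LoggingHandler.class.getName());
  }

  @Override
  protected int process(String event) throws Exception {
    System.out.println(event); // stage the event
    return 1; // number of items added to the batch
  }

  @Override
  protected void flushRepository() {
    // Commit the staged batch to the backing store.
  }
}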

View File

@ -15,21 +15,19 @@
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor.event;
package com.hpcloud.mon.persister.pipeline.event;
import static com.hpcloud.mon.persister.repository.VerticaMetricsConstants.MAX_COLUMN_LENGTH;
import com.hpcloud.mon.common.model.metric.Metric;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
import com.hpcloud.mon.persister.repository.MetricRepository;
import com.hpcloud.mon.persister.repository.Sha1HashId;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.lmax.disruptor.EventHandler;
import io.dropwizard.setup.Environment;
@ -43,106 +41,65 @@ import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;
public class MetricHandler implements EventHandler<MetricHolder>, FlushableHandler {
public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
private static final Logger logger = LoggerFactory.getLogger(MetricHandler.class);
private static final String TENANT_ID = "tenantId";
private static final String REGION = "region";
private final int ordinal;
private final int numProcessors;
private final int batchSize;
private final SimpleDateFormat simpleDateFormat;
private long millisSinceLastFlush = System.currentTimeMillis();
private final long millisBetweenFlushes;
private final int secondsBetweenFlushes;
private final MetricRepository verticaMetricRepository;
private final Environment environment;
private final Counter metricCounter;
private final Counter definitionCounter;
private final Counter dimensionCounter;
private final Counter definitionDimensionsCounter;
private final Meter metricMeter;
private final Meter commitMeter;
private final Timer commitTimer;
@Inject
public MetricHandler(MetricRepository metricRepository, MonPersisterConfiguration configuration,
public MetricHandler(MetricRepository metricRepository, @Assisted PipelineConfiguration configuration,
Environment environment, @Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors, @Assisted("batchSize") int batchSize) {
@Assisted("batchSize") int batchSize) {
super(configuration, environment, ordinal, batchSize, MetricHandler.class.getName());
final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal);
this.verticaMetricRepository = metricRepository;
this.environment = environment;
this.metricCounter =
this.environment.metrics().counter(
this.getClass().getName() + "." + "metrics-added-to-batch-counter");
environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter");
this.definitionCounter =
this.environment.metrics().counter(
this.getClass().getName() + "." + "metric-definitions-added-to-batch-counter");
environment.metrics().counter(
handlerName + "." + "metric-definitions-added-to-batch-counter");
this.dimensionCounter =
this.environment.metrics().counter(
this.getClass().getName() + "." + "metric-dimensions-added-to-batch-counter");
environment.metrics().counter(
handlerName + "." + "metric-dimensions-added-to-batch-counter");
this.definitionDimensionsCounter =
this.environment.metrics()
.counter(
this.getClass().getName() + "."
+ "metric-definition-dimensions-added-to-batch-counter");
this.metricMeter =
this.environment.metrics().meter(
this.getClass().getName() + "." + "metrics-messages-processed-meter");
this.commitMeter =
this.environment.metrics()
.meter(this.getClass().getName() + "." + "commits-executed-meter");
this.commitTimer =
this.environment.metrics().timer(
this.getClass().getName() + "." + "total-commit-and-flush-timer");
this.secondsBetweenFlushes =
configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds();
this.millisBetweenFlushes = secondsBetweenFlushes * 1000;
environment.metrics().counter(
handlerName + "." + "metric-definition-dimensions-added-to-batch-counter");
this.ordinal = ordinal;
this.numProcessors = numProcessors;
this.batchSize = batchSize;
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT-0"));
}
@Override
public void onEvent(MetricHolder metricEvent, long sequence, boolean b) throws Exception {
if (metricEvent.getMetricEnvelope() == null) {
logger.debug("Received heartbeat message. Checking last flush time.");
if (millisSinceLastFlush + millisBetweenFlushes < System.currentTimeMillis()) {
logger.debug("It's been more than " + secondsBetweenFlushes
+ " seconds since last flush. Flushing staging tables now...");
flush();
} else {
logger.debug("It has not been more than " + secondsBetweenFlushes
+ " seconds since last flush. No need to perform flush at this time.");
}
return;
public int process(MetricEnvelope[] metricEnvelopes) throws Exception {
int metricCount = 0;
for (final MetricEnvelope metricEnvelope : metricEnvelopes) {
metricCount += processEnvelope(metricEnvelope);
}
return metricCount;
}
if (((sequence / batchSize) % this.numProcessors) != this.ordinal) {
return;
}
private int processEnvelope(MetricEnvelope metricEnvelope) {
int metricCount = 0;
Metric metric = metricEnvelope.metric;
Map<String, Object> meta = metricEnvelope.meta;
metricMeter.mark();
Metric metric = metricEvent.getMetricEnvelope().metric;
Map<String, Object> meta = metricEvent.getMetricEnvelope().meta;
logger.debug("sequence number: " + sequence);
logger.debug("ordinal: " + ordinal);
logger.debug("metric: " + metric.toString());
logger.debug("meta: " + meta.toString());
logger.debug("ordinal: {}", ordinal);
logger.debug("metric: {}", metric);
logger.debug("meta: {}", meta);
String tenantId = "";
if (meta.containsKey(TENANT_ID)) {
@ -225,27 +182,21 @@ public class MetricHandler implements EventHandler<MetricHolder>, FlushableHandl
double value = timeValuePairs[1];
verticaMetricRepository.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
metricCounter.inc();
metricCount++;
}
} else {
String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp() * 1000));
double value = metric.getValue();
verticaMetricRepository.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value);
metricCounter.inc();
metricCount++;
}
if (sequence % batchSize == (batchSize - 1)) {
Timer.Context context = commitTimer.time();
flush();
context.stop();
commitMeter.mark();
}
return metricCount;
}
@Override
public void flush() {
public void flushRepository() {
verticaMetricRepository.flush();
millisSinceLastFlush = System.currentTimeMillis();
}
private String trunc(String s, int l) {
@ -261,6 +212,5 @@ public class MetricHandler implements EventHandler<MetricHolder>, FlushableHandl
logger.warn("Resulting string {}", r);
return r;
}
}
}
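One detail worth noting above: Metric.getTimestamp() carries epoch seconds while java.util.Date expects milliseconds, hence the multiplication by 1000, and each handler now owns its SimpleDateFormat instance because that class is not thread safe. A self-contained check of the conversion, with an illustrative timestamp value:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class TimestampCheck {
  public static void main(String[] args) {
    long epochSeconds = 1405699776L; // illustrative value
    SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    fmt.setTimeZone(TimeZone.getTimeZone("GMT-0"));
    // Date takes milliseconds, so scale the metric's seconds by 1000.
    System.out.println(fmt.format(new Date(epochSeconds * 1000)));
  }
}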

View File

@ -15,11 +15,13 @@
* limitations under the License.
*/
package com.hpcloud.mon.persister.disruptor.event;
package com.hpcloud.mon.persister.pipeline.event;
import com.hpcloud.mon.persister.configuration.PipelineConfiguration;
import com.google.inject.assistedinject.Assisted;
public interface MetricHandlerFactory {
MetricHandler create(@Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors, @Assisted("batchSize") int batchSize);
MetricHandler create(PipelineConfiguration pipelineConfiguration,
@Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize);
}

View File

@ -95,7 +95,7 @@ public class InfluxDBAlarmRepository implements AlarmRepository {
Timer.Context context = flushTimer.time();
Serie serie = new Serie(ALARM_STATE_HISTORY_NAME);
logger.debug("Created serie: " + serie.getName());
logger.debug("Created serie: {}", serie.getName());
serie.setColumns(this.colNamesStringArry);
@ -131,7 +131,7 @@ public class InfluxDBAlarmRepository implements AlarmRepository {
context.stop();
long endTime = System.currentTimeMillis();
logger.debug("Commiting batch took " + (endTime - startTime) / 1000 + " seconds");
logger.debug("Commiting batch took {} seconds", (endTime - startTime) / 1000);
} catch (Exception e) {
logger.error("Failed to write alarm state history to database", e);
@ -154,7 +154,7 @@ public class InfluxDBAlarmRepository implements AlarmRepository {
}
sb.append(colVal);
}
logger.debug("Array of column values[{}]: [" + sb.toString() + "]", outerIdx);
logger.debug("Array of column values[{}]: [{}]", outerIdx, sb);
outerIdx++;
}
}
@ -171,6 +171,6 @@ public class InfluxDBAlarmRepository implements AlarmRepository {
}
sb.append(colName);
}
logger.debug("Array of column names: [" + sb.toString() + "]");
logger.debug("Array of column names: [{}]", sb);
}
}
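The hunks above swap string concatenation for SLF4J's parameterized form: concatenation builds the message string eagerly even when debug logging is disabled, while {} placeholders defer formatting until the logger confirms the level is enabled. For comparison, all three forms below use the standard SLF4J API:

// Concatenation builds the string even when debug logging is disabled.
logger.debug("Array of column names: [" + sb.toString() + "]");
// Parameterized form formats only if debug is enabled.
logger.debug("Array of column names: [{}]", sb);
// An explicit guard is equivalent but noisier.
if (logger.isDebugEnabled()) {
  logger.debug("Array of column names: [" + sb.toString() + "]");
}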

View File

@ -17,11 +17,13 @@
package com.hpcloud.mon.persister.repository;
import com.google.inject.Inject;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.google.inject.Inject;
import io.dropwizard.setup.Environment;
import org.apache.commons.codec.digest.DigestUtils;
import org.influxdb.InfluxDB;
@ -41,8 +43,6 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import io.dropwizard.setup.Environment;
public class InfluxDBMetricRepository implements MetricRepository {
private static final Logger logger = LoggerFactory.getLogger(InfluxDBMetricRepository.class);
@ -59,7 +59,7 @@ public class InfluxDBMetricRepository implements MetricRepository {
private final com.codahale.metrics.Timer flushTimer;
public final Meter measurementMeter;
private static final SimpleDateFormat measurementTimeStampSimpleDateFormat = new
private final SimpleDateFormat measurementTimeStampSimpleDateFormat = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss zzz");
private static final Sha1HashId BLANK_SHA_1_HASH_ID = new Sha1HashId(DigestUtils.sha(""));
@ -123,8 +123,8 @@ public class InfluxDBMetricRepository implements MetricRepository {
TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
context.stop();
logger.debug("Writing measurements, definitions, and dimensions to database took " +
(endTime - startTime) / 1000 + " seconds");
logger.debug("Writing measurements, definitions, and dimensions to database took {} seconds",
(endTime - startTime) / 1000);
} catch (Exception e) {
logger.error("Failed to write measurements to database", e);
}
@ -148,7 +148,7 @@ public class InfluxDBMetricRepository implements MetricRepository {
for (Set<String> dimNameSet : dimNameSetMap.keySet()) {
Serie serie = new Serie(definition.name);
logger.debug("Created serie: " + serie.getName());
logger.debug("Created serie: {}", serie.getName());
// Add 4 for the tenant id, region, timestamp, and value.
String[] colNameStringArry = new String[dimNameSet.size() + 4];
@ -158,7 +158,7 @@ public class InfluxDBMetricRepository implements MetricRepository {
colNameStringArry[1] = "region";
int j = 2;
for (String dimName : dimNameSet) {
logger.debug("Adding column name[{}]: " + dimName, j);
logger.debug("Adding column name[{}]: {}", j, dimName);
colNameStringArry[j++] = dimName;
}
logger.debug("Adding column name[{}]: time", j);
@ -181,9 +181,9 @@ public class InfluxDBMetricRepository implements MetricRepository {
Object[][] colValsObjectArry = new Object[pointList.size()][dimNameSet.size() + 4];
int k = 0;
for (Point point : pointList) {
logger.debug("Adding column value[{}][0]: " + definition.tenantId, k, 0);
logger.debug("Adding column value[{}][0]: {}", k, definition.tenantId);
colValsObjectArry[k][0] = definition.tenantId;
logger.debug("Adding column value[{}][1]: " + definition.region, k, 1);
logger.debug("Adding column value[{}][1]: {}", k, definition.region);
colValsObjectArry[k][1] = definition.region;
int l = 2;
for (String dimName : dimNameSet) {
@ -196,9 +196,9 @@ public class InfluxDBMetricRepository implements MetricRepository {
}
Date d = measurementTimeStampSimpleDateFormat.parse(point.measurement.timeStamp + " UTC");
Long time = d.getTime() / 1000;
logger.debug("Adding column value[{}][{}]: " + time, k, l);
logger.debug("Adding column value[{}][{}]: {}", k, l, time);
colValsObjectArry[k][l++] = time;
logger.debug("Adding column value[{}][{}]: " + point.measurement.value, k, l);
logger.debug("Adding column value[{}][{}]: {}", k, l, point.measurement.value);
colValsObjectArry[k][l++] = point.measurement.value;
measurementMeter.mark();
k++;
@ -232,7 +232,7 @@ public class InfluxDBMetricRepository implements MetricRepository {
}
sb.append(colVal);
}
logger.debug("Array of column values[{}]: [" + sb.toString() + "]", outerIdx);
logger.debug("Array of column values[{}]: [{}]", outerIdx, sb);
outerIdx++;
}
}
@ -249,7 +249,7 @@ public class InfluxDBMetricRepository implements MetricRepository {
}
sb.append(colName);
}
logger.debug("Array of column names: [" + sb.toString() + "]");
logger.debug("Array of column names: [{}]", sb);
}
/**

View File

@ -1,115 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hpcloud.mon.persister.repository;
import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor;
import com.hpcloud.mon.persister.disruptor.MetricDisruptor;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder;
import com.hpcloud.mon.persister.disruptor.event.MetricHolder;
import com.google.inject.Inject;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;
import io.dropwizard.lifecycle.Managed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RepositoryCommitHeartbeat implements Managed {
private final HeartbeatRunnable deduperRunnable;
@Inject
public RepositoryCommitHeartbeat(MetricDisruptor metricDisruptor,
AlarmStateHistoryDisruptor alarmHistoryDisruptor) {
this.deduperRunnable = new HeartbeatRunnable(metricDisruptor, alarmHistoryDisruptor);
}
@Override
public void start() throws Exception {
Thread heartbeatThread = new Thread(deduperRunnable);
heartbeatThread.start();
}
@Override
public void stop() throws Exception {
this.deduperRunnable.stop();
}
private static class HeartbeatRunnable implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatRunnable.class);
private final Disruptor<MetricHolder> metricDisruptor;
private final Disruptor<AlarmStateTransitionedEventHolder> alarmHistoryDisruptor;
private boolean stop = false;
private HeartbeatRunnable(MetricDisruptor metricDisruptor,
AlarmStateHistoryDisruptor alarmHistoryDisruptor) {
this.metricDisruptor = metricDisruptor;
this.alarmHistoryDisruptor = alarmHistoryDisruptor;
}
@Override
public void run() {
for (;;) {
try {
// Send a heartbeat every second.
synchronized (this) {
this.wait(1000);
if (stop) {
logger.debug("Heartbeat thread is exiting");
break;
}
}
logger.debug("Waking up after sleeping 1 seconds, yawn...");
// Send heartbeat
logger.debug("Sending heartbeat message");
metricDisruptor.publishEvent(new EventTranslator<MetricHolder>() {
@Override
public void translateTo(MetricHolder event, long sequence) {
event.setEnvelope(null);
}
});
alarmHistoryDisruptor
.publishEvent(new EventTranslator<AlarmStateTransitionedEventHolder>() {
@Override
public void translateTo(AlarmStateTransitionedEventHolder event, long sequence) {
event.setEvent(null);
}
});
} catch (Exception e) {
logger.error("Failed to send heartbeat", e);
}
}
}
public synchronized void stop() {
stop = true;
this.notify();
}
}
}

View File

@ -1,24 +1,33 @@
name: mon-persister
alarmHistoryConfiguration:
batchSize: 100
numThreads: 1
maxBatchTime: 15
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
topic: alarm-state-transitions
groupId: persister_alarms
consumerId: 1
clientId: 1
metricConfiguration:
batchSize: 1000
numThreads: 2
maxBatchTime: 30
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
topic: metrics
groupId: persister_metrics
consumerId: 1
clientId: 1
#Kafka settings.
kafkaConfiguration:
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
numThreads: 1
groupId: 1
#zookeeperConnect: localhost:2181
# See http://kafka.apache.org/documentation.html#api for semantics and defaults.
zookeeperConnect: 192.168.10.4:2181
consumerId: 1
socketTimeoutMs: 30000
socketReceiveBufferBytes : 65536
fetchMessageMaxBytes: 1048576
autoCommitEnable: true
autoCommitIntervalMs: 60000
queuedMaxMessageChunks: 10
rebalanceMaxRetries: 4
fetchMinBytes: 1
@ -26,23 +35,11 @@ kafkaConfiguration:
rebalanceBackoffMs: 2000
refreshLeaderBackoffMs: 200
autoOffsetReset: largest
consumerTimeoutMs: -1
clientId: 1
consumerTimeoutMs: 1000
zookeeperSessionTimeoutMs : 60000
zookeeperConnectionTimeoutMs : 6000
zookeeperSyncTimeMs: 2000
disruptorConfiguration:
bufferSize: 1048576
numProcessors: 1
outputProcessorConfiguration:
batchSize: 100
monDeDuperConfiguration:
dedupeRunFrequencySeconds: 30
verticaMetricRepositoryConfiguration:
maxCacheSize: 2000000
@ -54,7 +51,7 @@ databaseConfiguration:
influxDbConfiguration:
name: mon
replicationFactor: 1
url: http://127.0.0.1:8086
url: http://192.168.10.4:8086
user: root
password: root
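Each of the new alarmHistoryConfiguration and metricConfiguration blocks binds to its own pipeline configuration object, which is what lets the two consumers batch and flush independently. A sketch of the shape the YAML keys imply; only getMaxBatchTime() is corroborated by its use in FlushableHandler above, and the remaining fields and getters are assumptions:

import com.fasterxml.jackson.annotation.JsonProperty;

// Hypothetical reconstruction of the per-pipeline configuration class.
public class PipelineConfiguration {
  @JsonProperty int batchSize;
  @JsonProperty int numThreads;
  @JsonProperty int maxBatchTime;
  @JsonProperty String topic;
  @JsonProperty String groupId;
  @JsonProperty String consumerId;
  @JsonProperty String clientId;

  public int getMaxBatchTime() { return maxBatchTime; }
  public int getBatchSize() { return batchSize; }
  public String getTopic() { return topic; }
}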

View File

@ -19,12 +19,13 @@ package com.hpcloud.mon.persister;
import com.hpcloud.mon.persister.consumer.KafkaMetricsConsumer;
import com.hpcloud.mon.persister.consumer.MetricsConsumer;
import com.hpcloud.mon.persister.pipeline.MetricPipeline;
import com.hpcloud.mon.persister.pipeline.event.MetricHandler;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
public class MonPersisterConsumerTest {
@ -35,33 +36,22 @@ public class MonPersisterConsumerTest {
@Mock
private MetricsConsumer monConsumer;
private MetricHandler metricHandler;
private MetricPipeline metricPipeline;
@Before
public void initMocks() {
metricHandler = Mockito.mock(MetricHandler.class);
metricPipeline = Mockito.spy(new MetricPipeline(metricHandler));
MockitoAnnotations.initMocks(this);
}
@Test
public void testKafkaConsumerStart() {
try {
monConsumer.start();
} catch (Exception e) {
e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
}
}
@Test
public void testKafkaConsumerStop() {
try {
monConsumer.stop();
} catch (Exception e) {
e.printStackTrace(); // To change body of catch statement use File | Settings | File
// Templates.
}
}
@After
public void after() {
System.out.println("after");
public void testKafkaConsumerLifecycle() throws Exception {
monConsumer.start();
monConsumer.stop();
metricPipeline.shutdown();
Mockito.verify(metricHandler).flush();
}
}

View File

@ -1,74 +0,0 @@
package com.hpcloud.mon.persister;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.consumer.KafkaConsumer;
import com.hpcloud.util.config.ConfigurationException;
import com.hpcloud.util.config.ConfigurationFactory;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
public class Test extends KafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(Test.class);
private static final String TOPIC = "Test";
public Test(MonPersisterConfiguration configuration) {
super(configuration);
}
public static void main(String[] args) throws IOException, ConfigurationException {
final MonPersisterConfiguration config = createConfig(args[0]);
config.getKafkaConfiguration();
final Test test = new Test(config);
test.run();
}
private static MonPersisterConfiguration createConfig(String configFileName) throws IOException,
ConfigurationException {
return ConfigurationFactory
.<MonPersisterConfiguration>forClass(MonPersisterConfiguration.class).build(
new File(configFileName));
}
@Override
protected Runnable createRunnable(KafkaStream<byte[], byte[]> stream, int threadNumber) {
logger.info("Created KafkaReader for {}", threadNumber);
return new KafkaReader(stream, threadNumber);
}
@Override
protected String getStreamName() {
return TOPIC;
}
protected class KafkaReader implements Runnable {
private final KafkaStream<byte[], byte[]> stream;
private final int threadNumber;
public KafkaReader(KafkaStream<byte[], byte[]> stream, int threadNumber) {
this.threadNumber = threadNumber;
this.stream = stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
final String s = new String(it.next().message());
logger.debug("Thread {}: {}", threadNumber, s);
}
logger.debug("Shutting down Thread: " + threadNumber);
}
}
}