Merge "Java persister config: defaults and robustness"

This commit is contained in:
Zuul 2018-01-19 01:41:54 +00:00 committed by Gerrit Code Review
commit daaf71a4d2
3 changed files with 122 additions and 2 deletions

View File

@ -17,8 +17,10 @@
package monasca.persister.configuration;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
@JsonIgnoreProperties(ignoreUnknown=true)
public class KafkaConfig {
@JsonProperty
@ -26,110 +28,170 @@ public class KafkaConfig {
@JsonProperty
String zookeeperConnect;
String _zookeeperConnect = "127.0.0.1";
@JsonProperty
Integer socketTimeoutMs;
Integer _socketTimeoutMs = 30000;
@JsonProperty
Integer socketReceiveBufferBytes;
Integer _socketReceiveBufferBytes = 65536;
@JsonProperty
Integer fetchMessageMaxBytes;
Integer _fetchMessageMaxBytes = 1048576;
@JsonProperty
Integer queuedMaxMessageChunks;
Integer _queuedMaxMessageChunks = 10;
@JsonProperty
Integer rebalanceMaxRetries;
Integer _rebalanceMaxRetries = 4;
@JsonProperty
Integer fetchMinBytes;
Integer _fetchMinBytes = 1;
@JsonProperty
Integer fetchWaitMaxMs;
Integer _fetchWaitMaxMs = 100;
@JsonProperty
Integer rebalanceBackoffMs;
Integer _rebalanceBackoffMs = 2000;
@JsonProperty
Integer refreshLeaderBackoffMs;
Integer _refreshLeaderBackoffMs = 200;
@JsonProperty
String autoOffsetReset;
String _autoOffsetReset = "largest";
@JsonProperty
Integer consumerTimeoutMs;
Integer _consumerTimeoutMs = 1000;
@JsonProperty
Integer zookeeperSessionTimeoutMs;
Integer _zookeeperSessionTimeoutMs = 60000;
@JsonProperty
Integer zookeeperConnectionTimeoutMs;
Integer _zookeeperConnectionTimeoutMs = 60000;
@JsonProperty
Integer zookeeperSyncTimeMs;
Integer _zookeeperSyncTimeMs = 2000;
public String getTopic() {
return topic;
}
public String getZookeeperConnect() {
if ( zookeeperConnect == null ) {
return _zookeeperConnect;
}
return zookeeperConnect;
}
public Integer getSocketTimeoutMs() {
if ( socketTimeoutMs == null ) {
return _socketTimeoutMs;
}
return socketTimeoutMs;
}
public Integer getSocketReceiveBufferBytes() {
if ( socketReceiveBufferBytes == null ) {
return _socketReceiveBufferBytes;
}
return socketReceiveBufferBytes;
}
public Integer getFetchMessageMaxBytes() {
if ( fetchMessageMaxBytes == null ) {
return _fetchMessageMaxBytes;
}
return fetchMessageMaxBytes;
}
public Integer getQueuedMaxMessageChunks() {
if ( queuedMaxMessageChunks == null ) {
return _queuedMaxMessageChunks;
}
return queuedMaxMessageChunks;
}
public Integer getRebalanceMaxRetries() {
if ( rebalanceMaxRetries == null ) {
return _rebalanceMaxRetries;
}
return rebalanceMaxRetries;
}
public Integer getFetchMinBytes() {
if ( fetchMinBytes == null ) {
return _fetchMinBytes;
}
return fetchMinBytes;
}
public Integer getFetchWaitMaxMs() {
if ( fetchWaitMaxMs == null ) {
return _fetchWaitMaxMs;
}
return fetchWaitMaxMs;
}
public Integer getRebalanceBackoffMs() {
if ( rebalanceBackoffMs == null ) {
return _rebalanceBackoffMs;
}
return rebalanceBackoffMs;
}
public Integer getRefreshLeaderBackoffMs() {
if ( refreshLeaderBackoffMs == null ) {
return _refreshLeaderBackoffMs;
}
return refreshLeaderBackoffMs;
}
public String getAutoOffsetReset() {
if ( autoOffsetReset == null ) {
return _autoOffsetReset;
}
return autoOffsetReset;
}
public Integer getConsumerTimeoutMs() {
if ( consumerTimeoutMs == null ) {
return _consumerTimeoutMs;
}
return consumerTimeoutMs;
}
public Integer getZookeeperSessionTimeoutMs() {
if ( zookeeperSessionTimeoutMs == null ) {
return _zookeeperSessionTimeoutMs;
}
return zookeeperSessionTimeoutMs;
}
public Integer getZookeeperConnectionTimeoutMs() {
if ( zookeeperConnectionTimeoutMs == null ) {
return _zookeeperConnectionTimeoutMs;
}
return zookeeperConnectionTimeoutMs;
}
public Integer getZookeeperSyncTimeMs() {
if ( zookeeperSyncTimeMs == null ) {
return _zookeeperSyncTimeMs;
}
return zookeeperSyncTimeMs;
}
}

View File

@ -19,6 +19,7 @@
package monasca.persister.configuration;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import monasca.common.configuration.CassandraDbConfiguration;
@ -30,22 +31,30 @@ import io.dropwizard.db.DataSourceFactory;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
@JsonIgnoreProperties(ignoreUnknown=true)
public class PersisterConfig extends Configuration {
@JsonProperty
private String name;
private String _name = "monasca-persister";
public String getName() {
if ( name == null ) {
return _name;
}
return name;
}
@JsonProperty
@NotNull
@Valid
private final PipelineConfig alarmHistoryConfiguration =
new PipelineConfig();
private final PipelineConfig alarmHistoryConfiguration = new PipelineConfig();
public PipelineConfig getAlarmHistoryConfiguration() {
// Set alarm history configuration specific defaults
alarmHistoryConfiguration.setDefaults("alarm-state-transitions",
"1_alarm-state-transitions",
1);
return alarmHistoryConfiguration;
}
@ -54,7 +63,12 @@ public class PersisterConfig extends Configuration {
@Valid
private final PipelineConfig metricConfiguration = new PipelineConfig();
public PipelineConfig getMetricConfiguration() {
// Set metric configuration specific defaults
metricConfiguration.setDefaults("metrics",
"1_metrics",
20000);
return metricConfiguration;
}

View File

@ -17,35 +17,58 @@
package monasca.persister.configuration;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
@JsonIgnoreProperties(ignoreUnknown=true)
public class PipelineConfig {
@JsonProperty
String topic;
String _topic; // No default: default provided by constructor
@JsonProperty
String groupId;
String _groupId; // No default: default provided by constructor
@JsonProperty
String consumerId;
String _consumerId = "monasca-persister";
@JsonProperty
String clientId;
String _clientId = "monasca-persister";
@JsonProperty
Integer batchSize;
Integer _batchSize; // No default: default provided by constructor
@JsonProperty
Integer numThreads;
Integer _numThreads = 1;
@JsonProperty
Integer maxBatchTime;
Integer _maxBatchTime = 10;
@JsonProperty
Integer commitBatchTime;
Integer _commitBatchTime = 0;
/** Used to set default values for properties that have different sensible
* defaults for metric and alarm configurations, respectively.
*/
public void setDefaults(String defaultTopic, String defaultGroupId,
Integer defaultBatchSize) {
_batchSize = defaultBatchSize;
_groupId = defaultGroupId;
_topic = defaultTopic;
}
public Integer getCommitBatchTime() {
if ( commitBatchTime == null ) {
return _commitBatchTime;
}
return commitBatchTime;
}
@ -54,10 +77,16 @@ public class PipelineConfig {
}
public String getTopic() {
if ( topic == null ) {
return _topic;
}
return topic;
}
public String getGroupId() {
if ( groupId == null ) {
return _groupId;
}
return groupId;
}
@ -66,6 +95,9 @@ public class PipelineConfig {
}
public String getConsumerId() {
if ( consumerId == null ) {
return _consumerId;
}
return consumerId;
}
@ -74,6 +106,9 @@ public class PipelineConfig {
}
public String getClientId() {
if ( clientId == null ) {
return _clientId;
}
return clientId;
}
@ -94,14 +129,23 @@ public class PipelineConfig {
}
public Integer getBatchSize() {
if ( batchSize == null ) {
return _batchSize;
}
return batchSize;
}
public Integer getNumThreads() {
if ( numThreads == null ) {
return _numThreads;
}
return numThreads;
}
public Integer getMaxBatchTime() {
if ( maxBatchTime == null ) {
return _maxBatchTime;
}
return maxBatchTime;
}
}