From 406f521ef112fd7321d620a19d9ec305e773a513 Mon Sep 17 00:00:00 2001 From: Johannes Grassler Date: Wed, 10 Jan 2018 11:46:14 +0100 Subject: [PATCH] Java persister config: defaults and robustness This commit adds defaults for configuration settings where that makes sense so they can be omitted. Likewise, unknown settings will now be ignored everywhere except for the database sections (the relevant configuration classes for that are in monasca-common). Change-Id: I35ae02e44a11e088c36943d8c053aac07dbe7524 Story: 2001415 Task: 6115 --- .../persister/configuration/KafkaConfig.java | 62 +++++++++++++++++++ .../configuration/PersisterConfig.java | 18 +++++- .../configuration/PipelineConfig.java | 44 +++++++++++++ 3 files changed, 122 insertions(+), 2 deletions(-) diff --git a/java/src/main/java/monasca/persister/configuration/KafkaConfig.java b/java/src/main/java/monasca/persister/configuration/KafkaConfig.java index ee2102de..6fd1454d 100644 --- a/java/src/main/java/monasca/persister/configuration/KafkaConfig.java +++ b/java/src/main/java/monasca/persister/configuration/KafkaConfig.java @@ -17,8 +17,10 @@ package monasca.persister.configuration; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +@JsonIgnoreProperties(ignoreUnknown=true) public class KafkaConfig { @JsonProperty @@ -26,110 +28,170 @@ public class KafkaConfig { @JsonProperty String zookeeperConnect; + String _zookeeperConnect = "127.0.0.1"; @JsonProperty Integer socketTimeoutMs; + Integer _socketTimeoutMs = 30000; @JsonProperty Integer socketReceiveBufferBytes; + Integer _socketReceiveBufferBytes = 65536; @JsonProperty Integer fetchMessageMaxBytes; + Integer _fetchMessageMaxBytes = 1048576; @JsonProperty Integer queuedMaxMessageChunks; + Integer _queuedMaxMessageChunks = 10; @JsonProperty Integer rebalanceMaxRetries; + Integer _rebalanceMaxRetries = 4; @JsonProperty Integer fetchMinBytes; + Integer _fetchMinBytes = 1; @JsonProperty Integer fetchWaitMaxMs; + Integer _fetchWaitMaxMs = 100; @JsonProperty Integer rebalanceBackoffMs; + Integer _rebalanceBackoffMs = 2000; @JsonProperty Integer refreshLeaderBackoffMs; + Integer _refreshLeaderBackoffMs = 200; @JsonProperty String autoOffsetReset; + String _autoOffsetReset = "largest"; @JsonProperty Integer consumerTimeoutMs; + Integer _consumerTimeoutMs = 1000; @JsonProperty Integer zookeeperSessionTimeoutMs; + Integer _zookeeperSessionTimeoutMs = 60000; @JsonProperty Integer zookeeperConnectionTimeoutMs; + Integer _zookeeperConnectionTimeoutMs = 60000; @JsonProperty Integer zookeeperSyncTimeMs; + Integer _zookeeperSyncTimeMs = 2000; public String getTopic() { return topic; } public String getZookeeperConnect() { + if ( zookeeperConnect == null ) { + return _zookeeperConnect; + } return zookeeperConnect; } public Integer getSocketTimeoutMs() { + if ( socketTimeoutMs == null ) { + return _socketTimeoutMs; + } return socketTimeoutMs; } public Integer getSocketReceiveBufferBytes() { + if ( socketReceiveBufferBytes == null ) { + return _socketReceiveBufferBytes; + } return socketReceiveBufferBytes; } public Integer getFetchMessageMaxBytes() { + if ( fetchMessageMaxBytes == null ) { + return _fetchMessageMaxBytes; + } return fetchMessageMaxBytes; } public Integer getQueuedMaxMessageChunks() { + if ( queuedMaxMessageChunks == null ) { + return _queuedMaxMessageChunks; + } return queuedMaxMessageChunks; } public Integer getRebalanceMaxRetries() { + if ( rebalanceMaxRetries == null ) { + return _rebalanceMaxRetries; + } return rebalanceMaxRetries; } public Integer getFetchMinBytes() { + if ( fetchMinBytes == null ) { + return _fetchMinBytes; + } return fetchMinBytes; } public Integer getFetchWaitMaxMs() { + if ( fetchWaitMaxMs == null ) { + return _fetchWaitMaxMs; + } return fetchWaitMaxMs; } public Integer getRebalanceBackoffMs() { + if ( rebalanceBackoffMs == null ) { + return _rebalanceBackoffMs; + } return rebalanceBackoffMs; } public Integer getRefreshLeaderBackoffMs() { + if ( refreshLeaderBackoffMs == null ) { + return _refreshLeaderBackoffMs; + } return refreshLeaderBackoffMs; } public String getAutoOffsetReset() { + if ( autoOffsetReset == null ) { + return _autoOffsetReset; + } return autoOffsetReset; } public Integer getConsumerTimeoutMs() { + if ( consumerTimeoutMs == null ) { + return _consumerTimeoutMs; + } return consumerTimeoutMs; } public Integer getZookeeperSessionTimeoutMs() { + if ( zookeeperSessionTimeoutMs == null ) { + return _zookeeperSessionTimeoutMs; + } return zookeeperSessionTimeoutMs; } public Integer getZookeeperConnectionTimeoutMs() { + if ( zookeeperConnectionTimeoutMs == null ) { + return _zookeeperConnectionTimeoutMs; + } return zookeeperConnectionTimeoutMs; } public Integer getZookeeperSyncTimeMs() { + if ( zookeeperSyncTimeMs == null ) { + return _zookeeperSyncTimeMs; + } return zookeeperSyncTimeMs; } } diff --git a/java/src/main/java/monasca/persister/configuration/PersisterConfig.java b/java/src/main/java/monasca/persister/configuration/PersisterConfig.java index b8d09a68..9efab2e9 100644 --- a/java/src/main/java/monasca/persister/configuration/PersisterConfig.java +++ b/java/src/main/java/monasca/persister/configuration/PersisterConfig.java @@ -19,6 +19,7 @@ package monasca.persister.configuration; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import monasca.common.configuration.CassandraDbConfiguration; @@ -30,22 +31,30 @@ import io.dropwizard.db.DataSourceFactory; import javax.validation.Valid; import javax.validation.constraints.NotNull; +@JsonIgnoreProperties(ignoreUnknown=true) public class PersisterConfig extends Configuration { @JsonProperty private String name; + private String _name = "monasca-persister"; public String getName() { + if ( name == null ) { + return _name; + } return name; } @JsonProperty @NotNull @Valid - private final PipelineConfig alarmHistoryConfiguration = - new PipelineConfig(); + private final PipelineConfig alarmHistoryConfiguration = new PipelineConfig(); public PipelineConfig getAlarmHistoryConfiguration() { + // Set alarm history configuration specific defaults + alarmHistoryConfiguration.setDefaults("alarm-state-transitions", + "1_alarm-state-transitions", + 1); return alarmHistoryConfiguration; } @@ -54,7 +63,12 @@ public class PersisterConfig extends Configuration { @Valid private final PipelineConfig metricConfiguration = new PipelineConfig(); + public PipelineConfig getMetricConfiguration() { + // Set metric configuration specific defaults + metricConfiguration.setDefaults("metrics", + "1_metrics", + 20000); return metricConfiguration; } diff --git a/java/src/main/java/monasca/persister/configuration/PipelineConfig.java b/java/src/main/java/monasca/persister/configuration/PipelineConfig.java index b5f95016..d578280e 100644 --- a/java/src/main/java/monasca/persister/configuration/PipelineConfig.java +++ b/java/src/main/java/monasca/persister/configuration/PipelineConfig.java @@ -17,35 +17,58 @@ package monasca.persister.configuration; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +@JsonIgnoreProperties(ignoreUnknown=true) public class PipelineConfig { @JsonProperty String topic; + String _topic; // No default: default provided by constructor @JsonProperty String groupId; + String _groupId; // No default: default provided by constructor @JsonProperty String consumerId; + String _consumerId = "monasca-persister"; @JsonProperty String clientId; + String _clientId = "monasca-persister"; @JsonProperty Integer batchSize; + Integer _batchSize; // No default: default provided by constructor @JsonProperty Integer numThreads; + Integer _numThreads = 1; @JsonProperty Integer maxBatchTime; + Integer _maxBatchTime = 10; @JsonProperty Integer commitBatchTime; + Integer _commitBatchTime = 0; + + /** Used to set default values for properties that have different sensible + * defaults for metric and alarm configurations, respectively. + */ + public void setDefaults(String defaultTopic, String defaultGroupId, + Integer defaultBatchSize) { + _batchSize = defaultBatchSize; + _groupId = defaultGroupId; + _topic = defaultTopic; + } public Integer getCommitBatchTime() { + if ( commitBatchTime == null ) { + return _commitBatchTime; + } return commitBatchTime; } @@ -54,10 +77,16 @@ public class PipelineConfig { } public String getTopic() { + if ( topic == null ) { + return _topic; + } return topic; } public String getGroupId() { + if ( groupId == null ) { + return _groupId; + } return groupId; } @@ -66,6 +95,9 @@ public class PipelineConfig { } public String getConsumerId() { + if ( consumerId == null ) { + return _consumerId; + } return consumerId; } @@ -74,6 +106,9 @@ public class PipelineConfig { } public String getClientId() { + if ( clientId == null ) { + return _clientId; + } return clientId; } @@ -94,14 +129,23 @@ public class PipelineConfig { } public Integer getBatchSize() { + if ( batchSize == null ) { + return _batchSize; + } return batchSize; } public Integer getNumThreads() { + if ( numThreads == null ) { + return _numThreads; + } return numThreads; } public Integer getMaxBatchTime() { + if ( maxBatchTime == null ) { + return _maxBatchTime; + } return maxBatchTime; } }