Java persister config: defaults and robustness
This commit adds defaults for configuration settings where that makes sense so they can be omitted. Likewise, unknown settings will now be ignored everywhere except for the database sections (the relevant configuration classes for that are in monasca-common). Change-Id: I35ae02e44a11e088c36943d8c053aac07dbe7524 Story: 2001415 Task: 6115
This commit is contained in:
parent
c923911cb5
commit
406f521ef1
|
@ -17,8 +17,10 @@
|
||||||
|
|
||||||
package monasca.persister.configuration;
|
package monasca.persister.configuration;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
|
||||||
|
@JsonIgnoreProperties(ignoreUnknown=true)
|
||||||
public class KafkaConfig {
|
public class KafkaConfig {
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
|
@ -26,110 +28,170 @@ public class KafkaConfig {
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
String zookeeperConnect;
|
String zookeeperConnect;
|
||||||
|
String _zookeeperConnect = "127.0.0.1";
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer socketTimeoutMs;
|
Integer socketTimeoutMs;
|
||||||
|
Integer _socketTimeoutMs = 30000;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer socketReceiveBufferBytes;
|
Integer socketReceiveBufferBytes;
|
||||||
|
Integer _socketReceiveBufferBytes = 65536;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer fetchMessageMaxBytes;
|
Integer fetchMessageMaxBytes;
|
||||||
|
Integer _fetchMessageMaxBytes = 1048576;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer queuedMaxMessageChunks;
|
Integer queuedMaxMessageChunks;
|
||||||
|
Integer _queuedMaxMessageChunks = 10;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer rebalanceMaxRetries;
|
Integer rebalanceMaxRetries;
|
||||||
|
Integer _rebalanceMaxRetries = 4;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer fetchMinBytes;
|
Integer fetchMinBytes;
|
||||||
|
Integer _fetchMinBytes = 1;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer fetchWaitMaxMs;
|
Integer fetchWaitMaxMs;
|
||||||
|
Integer _fetchWaitMaxMs = 100;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer rebalanceBackoffMs;
|
Integer rebalanceBackoffMs;
|
||||||
|
Integer _rebalanceBackoffMs = 2000;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer refreshLeaderBackoffMs;
|
Integer refreshLeaderBackoffMs;
|
||||||
|
Integer _refreshLeaderBackoffMs = 200;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
String autoOffsetReset;
|
String autoOffsetReset;
|
||||||
|
String _autoOffsetReset = "largest";
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer consumerTimeoutMs;
|
Integer consumerTimeoutMs;
|
||||||
|
Integer _consumerTimeoutMs = 1000;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer zookeeperSessionTimeoutMs;
|
Integer zookeeperSessionTimeoutMs;
|
||||||
|
Integer _zookeeperSessionTimeoutMs = 60000;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer zookeeperConnectionTimeoutMs;
|
Integer zookeeperConnectionTimeoutMs;
|
||||||
|
Integer _zookeeperConnectionTimeoutMs = 60000;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer zookeeperSyncTimeMs;
|
Integer zookeeperSyncTimeMs;
|
||||||
|
Integer _zookeeperSyncTimeMs = 2000;
|
||||||
|
|
||||||
public String getTopic() {
|
public String getTopic() {
|
||||||
return topic;
|
return topic;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getZookeeperConnect() {
|
public String getZookeeperConnect() {
|
||||||
|
if ( zookeeperConnect == null ) {
|
||||||
|
return _zookeeperConnect;
|
||||||
|
}
|
||||||
return zookeeperConnect;
|
return zookeeperConnect;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getSocketTimeoutMs() {
|
public Integer getSocketTimeoutMs() {
|
||||||
|
if ( socketTimeoutMs == null ) {
|
||||||
|
return _socketTimeoutMs;
|
||||||
|
}
|
||||||
return socketTimeoutMs;
|
return socketTimeoutMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getSocketReceiveBufferBytes() {
|
public Integer getSocketReceiveBufferBytes() {
|
||||||
|
if ( socketReceiveBufferBytes == null ) {
|
||||||
|
return _socketReceiveBufferBytes;
|
||||||
|
}
|
||||||
return socketReceiveBufferBytes;
|
return socketReceiveBufferBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getFetchMessageMaxBytes() {
|
public Integer getFetchMessageMaxBytes() {
|
||||||
|
if ( fetchMessageMaxBytes == null ) {
|
||||||
|
return _fetchMessageMaxBytes;
|
||||||
|
}
|
||||||
return fetchMessageMaxBytes;
|
return fetchMessageMaxBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getQueuedMaxMessageChunks() {
|
public Integer getQueuedMaxMessageChunks() {
|
||||||
|
if ( queuedMaxMessageChunks == null ) {
|
||||||
|
return _queuedMaxMessageChunks;
|
||||||
|
}
|
||||||
return queuedMaxMessageChunks;
|
return queuedMaxMessageChunks;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getRebalanceMaxRetries() {
|
public Integer getRebalanceMaxRetries() {
|
||||||
|
if ( rebalanceMaxRetries == null ) {
|
||||||
|
return _rebalanceMaxRetries;
|
||||||
|
}
|
||||||
return rebalanceMaxRetries;
|
return rebalanceMaxRetries;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getFetchMinBytes() {
|
public Integer getFetchMinBytes() {
|
||||||
|
if ( fetchMinBytes == null ) {
|
||||||
|
return _fetchMinBytes;
|
||||||
|
}
|
||||||
return fetchMinBytes;
|
return fetchMinBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getFetchWaitMaxMs() {
|
public Integer getFetchWaitMaxMs() {
|
||||||
|
if ( fetchWaitMaxMs == null ) {
|
||||||
|
return _fetchWaitMaxMs;
|
||||||
|
}
|
||||||
return fetchWaitMaxMs;
|
return fetchWaitMaxMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getRebalanceBackoffMs() {
|
public Integer getRebalanceBackoffMs() {
|
||||||
|
if ( rebalanceBackoffMs == null ) {
|
||||||
|
return _rebalanceBackoffMs;
|
||||||
|
}
|
||||||
return rebalanceBackoffMs;
|
return rebalanceBackoffMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getRefreshLeaderBackoffMs() {
|
public Integer getRefreshLeaderBackoffMs() {
|
||||||
|
if ( refreshLeaderBackoffMs == null ) {
|
||||||
|
return _refreshLeaderBackoffMs;
|
||||||
|
}
|
||||||
return refreshLeaderBackoffMs;
|
return refreshLeaderBackoffMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getAutoOffsetReset() {
|
public String getAutoOffsetReset() {
|
||||||
|
if ( autoOffsetReset == null ) {
|
||||||
|
return _autoOffsetReset;
|
||||||
|
}
|
||||||
return autoOffsetReset;
|
return autoOffsetReset;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getConsumerTimeoutMs() {
|
public Integer getConsumerTimeoutMs() {
|
||||||
|
if ( consumerTimeoutMs == null ) {
|
||||||
|
return _consumerTimeoutMs;
|
||||||
|
}
|
||||||
return consumerTimeoutMs;
|
return consumerTimeoutMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getZookeeperSessionTimeoutMs() {
|
public Integer getZookeeperSessionTimeoutMs() {
|
||||||
|
if ( zookeeperSessionTimeoutMs == null ) {
|
||||||
|
return _zookeeperSessionTimeoutMs;
|
||||||
|
}
|
||||||
return zookeeperSessionTimeoutMs;
|
return zookeeperSessionTimeoutMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getZookeeperConnectionTimeoutMs() {
|
public Integer getZookeeperConnectionTimeoutMs() {
|
||||||
|
if ( zookeeperConnectionTimeoutMs == null ) {
|
||||||
|
return _zookeeperConnectionTimeoutMs;
|
||||||
|
}
|
||||||
return zookeeperConnectionTimeoutMs;
|
return zookeeperConnectionTimeoutMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getZookeeperSyncTimeMs() {
|
public Integer getZookeeperSyncTimeMs() {
|
||||||
|
if ( zookeeperSyncTimeMs == null ) {
|
||||||
|
return _zookeeperSyncTimeMs;
|
||||||
|
}
|
||||||
return zookeeperSyncTimeMs;
|
return zookeeperSyncTimeMs;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package monasca.persister.configuration;
|
package monasca.persister.configuration;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
|
||||||
import monasca.common.configuration.CassandraDbConfiguration;
|
import monasca.common.configuration.CassandraDbConfiguration;
|
||||||
|
@ -30,22 +31,30 @@ import io.dropwizard.db.DataSourceFactory;
|
||||||
import javax.validation.Valid;
|
import javax.validation.Valid;
|
||||||
import javax.validation.constraints.NotNull;
|
import javax.validation.constraints.NotNull;
|
||||||
|
|
||||||
|
@JsonIgnoreProperties(ignoreUnknown=true)
|
||||||
public class PersisterConfig extends Configuration {
|
public class PersisterConfig extends Configuration {
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private String name;
|
private String name;
|
||||||
|
private String _name = "monasca-persister";
|
||||||
|
|
||||||
public String getName() {
|
public String getName() {
|
||||||
|
if ( name == null ) {
|
||||||
|
return _name;
|
||||||
|
}
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
@NotNull
|
@NotNull
|
||||||
@Valid
|
@Valid
|
||||||
private final PipelineConfig alarmHistoryConfiguration =
|
private final PipelineConfig alarmHistoryConfiguration = new PipelineConfig();
|
||||||
new PipelineConfig();
|
|
||||||
|
|
||||||
public PipelineConfig getAlarmHistoryConfiguration() {
|
public PipelineConfig getAlarmHistoryConfiguration() {
|
||||||
|
// Set alarm history configuration specific defaults
|
||||||
|
alarmHistoryConfiguration.setDefaults("alarm-state-transitions",
|
||||||
|
"1_alarm-state-transitions",
|
||||||
|
1);
|
||||||
return alarmHistoryConfiguration;
|
return alarmHistoryConfiguration;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,7 +63,12 @@ public class PersisterConfig extends Configuration {
|
||||||
@Valid
|
@Valid
|
||||||
private final PipelineConfig metricConfiguration = new PipelineConfig();
|
private final PipelineConfig metricConfiguration = new PipelineConfig();
|
||||||
|
|
||||||
|
|
||||||
public PipelineConfig getMetricConfiguration() {
|
public PipelineConfig getMetricConfiguration() {
|
||||||
|
// Set metric configuration specific defaults
|
||||||
|
metricConfiguration.setDefaults("metrics",
|
||||||
|
"1_metrics",
|
||||||
|
20000);
|
||||||
return metricConfiguration;
|
return metricConfiguration;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,35 +17,58 @@
|
||||||
|
|
||||||
package monasca.persister.configuration;
|
package monasca.persister.configuration;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
|
||||||
|
@JsonIgnoreProperties(ignoreUnknown=true)
|
||||||
public class PipelineConfig {
|
public class PipelineConfig {
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
String topic;
|
String topic;
|
||||||
|
String _topic; // No default: default provided by constructor
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
String groupId;
|
String groupId;
|
||||||
|
String _groupId; // No default: default provided by constructor
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
String consumerId;
|
String consumerId;
|
||||||
|
String _consumerId = "monasca-persister";
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
String clientId;
|
String clientId;
|
||||||
|
String _clientId = "monasca-persister";
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer batchSize;
|
Integer batchSize;
|
||||||
|
Integer _batchSize; // No default: default provided by constructor
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer numThreads;
|
Integer numThreads;
|
||||||
|
Integer _numThreads = 1;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer maxBatchTime;
|
Integer maxBatchTime;
|
||||||
|
Integer _maxBatchTime = 10;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
Integer commitBatchTime;
|
Integer commitBatchTime;
|
||||||
|
Integer _commitBatchTime = 0;
|
||||||
|
|
||||||
|
/** Used to set default values for properties that have different sensible
|
||||||
|
* defaults for metric and alarm configurations, respectively.
|
||||||
|
*/
|
||||||
|
public void setDefaults(String defaultTopic, String defaultGroupId,
|
||||||
|
Integer defaultBatchSize) {
|
||||||
|
_batchSize = defaultBatchSize;
|
||||||
|
_groupId = defaultGroupId;
|
||||||
|
_topic = defaultTopic;
|
||||||
|
}
|
||||||
|
|
||||||
public Integer getCommitBatchTime() {
|
public Integer getCommitBatchTime() {
|
||||||
|
if ( commitBatchTime == null ) {
|
||||||
|
return _commitBatchTime;
|
||||||
|
}
|
||||||
return commitBatchTime;
|
return commitBatchTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,10 +77,16 @@ public class PipelineConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getTopic() {
|
public String getTopic() {
|
||||||
|
if ( topic == null ) {
|
||||||
|
return _topic;
|
||||||
|
}
|
||||||
return topic;
|
return topic;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getGroupId() {
|
public String getGroupId() {
|
||||||
|
if ( groupId == null ) {
|
||||||
|
return _groupId;
|
||||||
|
}
|
||||||
return groupId;
|
return groupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,6 +95,9 @@ public class PipelineConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getConsumerId() {
|
public String getConsumerId() {
|
||||||
|
if ( consumerId == null ) {
|
||||||
|
return _consumerId;
|
||||||
|
}
|
||||||
return consumerId;
|
return consumerId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,6 +106,9 @@ public class PipelineConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getClientId() {
|
public String getClientId() {
|
||||||
|
if ( clientId == null ) {
|
||||||
|
return _clientId;
|
||||||
|
}
|
||||||
return clientId;
|
return clientId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,14 +129,23 @@ public class PipelineConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getBatchSize() {
|
public Integer getBatchSize() {
|
||||||
|
if ( batchSize == null ) {
|
||||||
|
return _batchSize;
|
||||||
|
}
|
||||||
return batchSize;
|
return batchSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getNumThreads() {
|
public Integer getNumThreads() {
|
||||||
|
if ( numThreads == null ) {
|
||||||
|
return _numThreads;
|
||||||
|
}
|
||||||
return numThreads;
|
return numThreads;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getMaxBatchTime() {
|
public Integer getMaxBatchTime() {
|
||||||
|
if ( maxBatchTime == null ) {
|
||||||
|
return _maxBatchTime;
|
||||||
|
}
|
||||||
return maxBatchTime;
|
return maxBatchTime;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue