Add max batch size for writing to API

Add a configurable maximum batch size for measurements written to the
Monasca API. This prevents overwhelming the Monasca API in memory-limited
configurations. The default is no limit.
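
For example, capping each write at 1000 measurements looks like this in the
Api section of agent.yaml (1000 is only an illustrative value; leaving the
option at 0 keeps the unlimited default):

    Api:
      max_batch_size: 1000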

Change-Id: I2bf84501cc51c24843d7c3befd8f9dd42f010f0c
Story: 2001434
Craig Bryant 2018-01-03 08:33:53 -07:00
parent 47bade9e09
commit 50824d1170
5 changed files with 17 additions and 5 deletions


@@ -57,6 +57,8 @@ Api:
 max_measurement_buffer_size: {args.max_measurement_buffer_size}
 # Maximum number of messages to send at one time when communication with the monasca-api is restored
 backlog_send_rate: {args.backlog_send_rate}
+# Maximum batch size of measurements to write to monasca-api, 0 is no limit
+max_batch_size: {args.max_batch_size}
 # Publish extra metrics to the API by adding this number of 'amplifier' dimensions.
 # For load testing purposes only; set to 0 for production use.


@@ -110,6 +110,7 @@ All parameters require a '--' before the parameter such as '--verbose'. Run `mon
 | detection_args_json | A JSON string can be passed to the detection plugin. | '{"process_config":{"process_names":["monasca-api","monasca-notification"],"dimensions":{"service":"monitoring"}}}' |
 | max_measurement_buffer_size | Integer value for the maximum number of measurements to buffer locally while unable to connect to the monasca-api. If the queue exceeds this value, measurements will be dropped in batches. A value of '-1' indicates no limit | 100000 |
 | backlog_send_rate | Integer value of how many batches of buffered measurements to send each time the forwarder flushes data | 1000 |
+| max_batch_size | Maximum batch size of measurements to write to monasca-api, 0 is no limit | 0 |
 | monasca_statsd_port | Integer value for statsd daemon port number | 8125 |
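
For example, passing `--max_batch_size 1000` to `monasca-setup` records the setting in the generated agent.yaml, so each write to the monasca-api contains at most 1000 measurements (the value 1000 is only illustrative).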
#### A note around using monasca-agent with different versions of Keystone
@@ -280,4 +281,4 @@ If there is some problem with multiple plugins that end up blocking the entire t
 Some of the plugins have their own thread pools to handle asynchronous checks. The collector thread pool is separate and has no special interaction with those thread pools.
 # License
-(C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
+(C) Copyright 2015-2016, 2018 Hewlett Packard Enterprise Development LP


@@ -1,4 +1,4 @@
-# (C) Copyright 2015-2017 Hewlett Packard Enterprise Development LP
+# (C) Copyright 2015-2018 Hewlett Packard Enterprise Development LP
 # Copyright 2017 Fujitsu LIMITED
 import logging
@@ -71,7 +71,8 @@ class Config(object):
 'max_buffer_size': 1000,
 'max_measurement_buffer_size': -1,
 'write_timeout': 10,
-'backlog_send_rate': 5},
+'backlog_send_rate': 5,
+'max_batch_size': 0},
 'Statsd': {'recent_point_threshold': None,
 'monasca_statsd_interval': 20,
 'monasca_statsd_forward_host': None,


@@ -1,4 +1,4 @@
-# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
+# (C) Copyright 2015-2016,2018 Hewlett Packard Enterprise Development LP
 # Copyright 2017 Fujitsu LIMITED
 import collections
@@ -34,6 +34,7 @@ class MonascaAPI(object):
 self._log_interval_remaining = 1
 self._current_number_measurements = 0
 self._max_buffer_size = int(config['max_buffer_size'])
+self._max_batch_size = int(config['max_batch_size'])
 self._max_measurement_buffer_size = int(
 config['max_measurement_buffer_size'])
@@ -115,6 +116,9 @@
 measurement = envelope['measurement']
 tenant = envelope['tenant_id']
 tenant_group.setdefault(tenant, []).append(copy.deepcopy(measurement))
+if self._max_batch_size and len(tenant_group[tenant]) >= self._max_batch_size:
+self._post(tenant_group[tenant], tenant)
+del tenant_group[tenant]
 for tenant in tenant_group:
 self._post(tenant_group[tenant], tenant)
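
To see the effect of this hunk outside the diff, here is a minimal, self-contained sketch of the same per-tenant batching; `flush_batches` and `post` are hypothetical stand-ins, not names from the monasca-agent code base:

```python
# Minimal sketch (not monasca-agent code): flush a tenant's batch as soon as
# it reaches max_batch_size instead of accumulating everything in memory.
import copy


def post(measurements, tenant):
    # Stand-in for the real API call made by the forwarder.
    print("POST %d measurements for tenant %s" % (len(measurements), tenant))


def flush_batches(envelopes, max_batch_size=0):
    tenant_group = {}
    for envelope in envelopes:
        measurement = envelope['measurement']
        tenant = envelope['tenant_id']
        tenant_group.setdefault(tenant, []).append(copy.deepcopy(measurement))
        # A value of 0 disables the limit, matching the new default.
        if max_batch_size and len(tenant_group[tenant]) >= max_batch_size:
            post(tenant_group[tenant], tenant)
            del tenant_group[tenant]
    # Whatever is left over (everything, when no limit is set) goes out last.
    for tenant in tenant_group:
        post(tenant_group[tenant], tenant)


if __name__ == '__main__':
    sample = [{'measurement': {'name': 'cpu.idle_perc', 'value': i},
               'tenant_id': 'demo'} for i in range(5)]
    flush_batches(sample, max_batch_size=2)
```

With `max_batch_size=2`, the five sample measurements go out as two full batches followed by a final partial one; with the default of 0 they are sent in a single request per tenant, as before.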


@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-# (C) Copyright 2015-2017 Hewlett Packard Enterprise Development LP
+# (C) Copyright 2015-2018 Hewlett Packard Enterprise Development LP
 # Copyright 2017 Fujitsu LIMITED
 """ Detect running daemons then configure and start the agent.
@@ -289,6 +289,10 @@ def parse_arguments(parser):
 help="Maximum number of batches of measurements to"
 " buffer while unable to communicate with monasca-api",
 default=1000)
+parser.add_argument('--max_batch_size',
+help="Maximum batch size of measurements to"
+" write to monasca-api, 0 is no limit",
+default=0)
 parser.add_argument('--max_measurement_buffer_size',
 help="Maximum number of measurements to buffer when unable to communicate"
 " with the monasca-api",