Allow limiting agent buffering by measurements instead of messages

The size of messages may vary based on the number of metrics the
agent is generating, so limiting by measurements will be more stable.

Store messages as json strings when queuing to save space

Change-Id: If6dc0a272a26ac951a5d8823dafd32cab69a3981
Ryan Brandt 2016-09-09 10:55:56 -06:00
parent ff1045810c
commit 443f2b1111
5 changed files with 68 additions and 13 deletions
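
As a rough illustration of the commit message above (all numbers below are hypothetical, not taken from this change): a cap counted in messages bounds memory only loosely, because each buffered message may carry a different number of measurements, whereas a cap counted in measurements bounds the buffer directly.

# Illustration only -- batch sizes and caps are invented for this sketch.
batches = [120, 4500, 80, 2300]             # measurements per buffered message (varies per agent)

# Old behaviour: cap the number of messages; memory still scales with batch size.
max_buffer_size = 1000                      # messages
worst_case = max_buffer_size * max(batches)  # grows with the number of metrics being collected

# New behaviour: cap the total number of buffered measurements; the bound is fixed.
max_measurement_buffer_size = 100000        # measurements, regardless of how they are batched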

View File

@@ -37,13 +37,19 @@ Api:
# Name of the ca certs file
ca_file: {args.ca_file}
# The following 2 options are for handling buffering and reconnection to the monasca-api
# If you want the messages to be sent as fast as possible, set these two options to
# the same number. If you have a larger system with many agents, you may want to throttle
# the number of messages sent to the API by setting the backlog_send_rate to a lower number.
# The following 3 options are for handling buffering and reconnection to the monasca-api
# If the agent forwarder is consuming too much memory, you may want to set
# max_measurement_buffer_size to a lower value. If you have a larger system with many agents,
# you may want to throttle the number of messages sent to the API by setting the
# backlog_send_rate to a lower number.
# Maximum number of messages to buffer when unable to communicate with the monasca-api
# DEPRECATED - please use max_measurement_buffer_size instead
# Maximum number of messages (batches of measurements) to buffer when unable to communicate
# with the monasca-api (-1 means no limit)
max_buffer_size: 1000
# Maximum number of measurements to buffer when unable to communicate with the monasca-api
# (-1 means no limit)
max_measurement_buffer_size: {args.max_measurement_buffer_size}
# Maximum number of messages to send at one time when communication with the monasca-api is restored
backlog_send_rate: 1000
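
A minimal sketch of how these three options interact, based on the deprecation note above and the forwarder change later in this commit; the concrete values are examples, not recommendations.

# Sketch only; values are illustrative.
api_config = {
    'max_buffer_size': 1000,               # deprecated message-count cap
    'max_measurement_buffer_size': 50000,  # measurement-count cap; -1 means no limit
    'backlog_send_rate': 1000,             # messages sent per flush once the API is reachable again
}

# When the new option is set (> -1), it takes precedence and the old cap is ignored.
if int(api_config['max_measurement_buffer_size']) > -1:
    effective_limit = ('measurements', int(api_config['max_measurement_buffer_size']))
else:
    effective_limit = ('messages', int(api_config['max_buffer_size']))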

View File

@@ -1,4 +1,4 @@
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
import logging
import os
@@ -66,6 +66,7 @@ class Config(object):
'keystone_timeout': 20,
'keystone_url': '',
'max_buffer_size': 1000,
'max_measurement_buffer_size': -1,
'write_timeout': 10,
'backlog_send_rate': 5},
'Statsd': {'recent_point_threshold': None,

View File

@@ -2,6 +2,7 @@
import collections
import copy
import json
import logging
import random
import time
@@ -32,9 +33,24 @@ class MonascaAPI(object):
self.mon_client = None
self._failure_reason = None
self._resume_time = None
self._log_interval_remaining = 1
self._current_number_measurements = 0
self.max_buffer_size = int(config['max_buffer_size'])
self.max_measurement_buffer_size = int(config['max_measurement_buffer_size'])
if self.max_buffer_size > -1:
log.debug("'max_buffer_size' is deprecated. Please use"
" 'max_measurement_buffer_size' instead")
if self.max_measurement_buffer_size > -1:
log.debug("Overriding 'max_buffer_size' option with new"
" 'max_measurment_buffer_size' option")
self.max_buffer_size = -1
self.backlog_send_rate = int(config['backlog_send_rate'])
self.message_queue = collections.deque(maxlen=self.max_buffer_size)
if self.max_buffer_size > -1:
self.message_queue = collections.deque(maxlen=self.max_buffer_size)
else:
self.message_queue = collections.deque()
self.write_timeout = int(config['write_timeout'])
# 'amplifier' is completely optional and may not exist in the config
try:
@@ -65,10 +81,13 @@ class MonascaAPI(object):
messages_sent = 0
for index in range(0, len(self.message_queue)):
if index < self.backlog_send_rate:
msg = self.message_queue.pop()
msg = json.loads(self.message_queue.pop())
if self._send_message(**msg):
messages_sent += 1
for value in msg.values():
self._current_number_measurements -= len(value)
else:
self._queue_message(msg, self._failure_reason)
break
@@ -76,6 +95,7 @@
break
log.info("Sent {0} messages from the backlog.".format(messages_sent))
log.info("{0} messages remaining in the queue.".format(len(self.message_queue)))
self._log_interval_remaining = 0
else:
self._queue_message(kwargs.copy(), self._failure_reason)
@@ -147,10 +167,35 @@
return False
def _queue_message(self, msg, reason):
self.message_queue.append(msg)
queue_size = len(self.message_queue)
if queue_size is 1 or queue_size % MonascaAPI.LOG_INTERVAL == 0:
if self.max_buffer_size == 0 or self.max_measurement_buffer_size == 0:
return
self.message_queue.append(json.dumps(msg))
for value in msg.values():
self._current_number_measurements += len(value)
if self.max_measurement_buffer_size > -1:
while self._current_number_measurements > self.max_measurement_buffer_size:
self._remove_oldest_from_queue()
if self._log_interval_remaining <= 1:
log.warn("{0}. Queuing the messages to send later...".format(reason))
log.info("Current agent queue size: {0} of {1}.".format(len(self.message_queue),
self.max_buffer_size))
log.info("Current measurements in queue: {0} of {1}".format(
self._current_number_measurements, self.max_measurement_buffer_size))
log.info("A message will be logged for every {0} messages queued.".format(MonascaAPI.LOG_INTERVAL))
self._log_interval_remaining = MonascaAPI.LOG_INTERVAL
else:
self._log_interval_remaining -= 1
def _remove_oldest_from_queue(self):
removed_batch = json.loads(self.message_queue.popleft())
num_discarded = 0
for value in removed_batch.values():
num_discarded += len(value)
self._current_number_measurements -= num_discarded
log.warn("Queue too large, discarding oldest batch: {0} measurements discarded".format(
num_discarded))
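
To make the queuing behaviour above easier to follow in isolation, here is a self-contained sketch of a measurement-bounded queue; the class and method names are invented for illustration and are not part of the agent's API (the change above additionally short-circuits and drops batches outright when either cap is set to 0).

import collections
import json

class MeasurementBoundedQueue(object):
    # Sketch: batches are stored as JSON strings (as in the change above) and the
    # oldest batches are dropped once the total buffered measurements exceed the cap.
    def __init__(self, max_measurements):
        self.max_measurements = max_measurements   # -1 means unbounded
        self._queue = collections.deque()
        self._count = 0

    def push(self, batch):
        # 'batch' maps message kwargs to lists of measurements, mirroring _queue_message
        self._queue.append(json.dumps(batch))
        self._count += sum(len(v) for v in batch.values())
        while self.max_measurements > -1 and self._count > self.max_measurements:
            oldest = json.loads(self._queue.popleft())
            self._count -= sum(len(v) for v in oldest.values())

For example, with a cap of 100 measurements and batches of 40, the third push evicts the oldest batch and leaves the two newest (80 measurements) buffered, which is the behaviour _remove_oldest_from_queue provides above.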

View File

@@ -30,7 +30,6 @@ import tornado.web
# agent import
import monasca_agent.common.config as cfg
import monasca_agent.common.metrics as metrics
import monasca_agent.common.util as util
import monasca_agent.forwarder.api.monasca_api as mon

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
""" Detect running daemons then configure and start the agent.
"""
@@ -246,6 +246,10 @@ def parse_arguments(parser):
"Useful for load testing; not for production use.", default=0)
parser.add_argument('-v', '--verbose', help="Verbose Output", action="store_true")
parser.add_argument('--dry_run', help="Make no changes just report on changes", action="store_true")
parser.add_argument('--max_measurement_buffer_size',
help='Maximum number of measurements to buffer when unable to communicate'
' with the monasca-api',
default=-1)
return parser.parse_args()
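
For completeness, a small sketch of how the new flag is parsed; note that argparse returns command-line values as strings unless a type= is given (the default stays the integer -1), and the value ultimately lands in agent.yaml via the template shown at the top of this commit. The flag name comes from the diff above; the value 50000 is arbitrary.

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--max_measurement_buffer_size',
                    help='Maximum number of measurements to buffer when unable to communicate'
                         ' with the monasca-api',
                    default=-1)

args = parser.parse_args(['--max_measurement_buffer_size', '50000'])
# args.max_measurement_buffer_size == '50000'  (a string; the agent converts it with int() when read)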