Merge "Allow to disable statsd self-moniotring"
This commit is contained in:
commit
07f3cde33b
|
@ -68,22 +68,22 @@ class LogPublisher(object):
|
||||||
self._kafka_publisher = producer.KafkaProducer(
|
self._kafka_publisher = producer.KafkaProducer(
|
||||||
url=CONF.log_publisher.kafka_url
|
url=CONF.log_publisher.kafka_url
|
||||||
)
|
)
|
||||||
|
if CONF.monitoring.enable:
|
||||||
|
self._statsd = client.get_client()
|
||||||
|
|
||||||
self._statsd = client.get_client()
|
# setup counter, gauges etc
|
||||||
|
self._logs_published_counter = self._statsd.get_counter(
|
||||||
# setup counter, gauges etc
|
metrics.LOGS_PUBLISHED_METRIC
|
||||||
self._logs_published_counter = self._statsd.get_counter(
|
)
|
||||||
metrics.LOGS_PUBLISHED_METRIC
|
self._publish_time_ms = self._statsd.get_timer(
|
||||||
)
|
metrics.LOGS_PUBLISH_TIME_METRIC
|
||||||
self._publish_time_ms = self._statsd.get_timer(
|
)
|
||||||
metrics.LOGS_PUBLISH_TIME_METRIC
|
self._logs_lost_counter = self._statsd.get_counter(
|
||||||
)
|
metrics.LOGS_PUBLISHED_LOST_METRIC
|
||||||
self._logs_lost_counter = self._statsd.get_counter(
|
)
|
||||||
metrics.LOGS_PUBLISHED_LOST_METRIC
|
self._logs_truncated_gauge = self._statsd.get_gauge(
|
||||||
)
|
metrics.LOGS_TRUNCATED_METRIC
|
||||||
self._logs_truncated_gauge = self._statsd.get_gauge(
|
)
|
||||||
metrics.LOGS_TRUNCATED_METRIC
|
|
||||||
)
|
|
||||||
|
|
||||||
LOG.info('Initializing LogPublisher <%s>', self)
|
LOG.info('Initializing LogPublisher <%s>', self)
|
||||||
|
|
||||||
|
@ -117,8 +117,10 @@ class LogPublisher(object):
|
||||||
for message in messages:
|
for message in messages:
|
||||||
msg = self._transform_message(message)
|
msg = self._transform_message(message)
|
||||||
send_messages.append(msg)
|
send_messages.append(msg)
|
||||||
|
if CONF.monitoring.enable:
|
||||||
with self._publish_time_ms.time(name=None):
|
with self._publish_time_ms.time(name=None):
|
||||||
|
self._publish(send_messages)
|
||||||
|
else:
|
||||||
self._publish(send_messages)
|
self._publish(send_messages)
|
||||||
|
|
||||||
sent_counter = len(send_messages)
|
sent_counter = len(send_messages)
|
||||||
|
@ -199,11 +201,13 @@ class LogPublisher(object):
|
||||||
|
|
||||||
envelope['log']['truncated'] = True
|
envelope['log']['truncated'] = True
|
||||||
envelope['log']['message'] = truncated_log_msg
|
envelope['log']['message'] = truncated_log_msg
|
||||||
self._logs_truncated_gauge.send(name=None, value=truncated_by)
|
if CONF.monitoring.enable:
|
||||||
|
self._logs_truncated_gauge.send(name=None, value=truncated_by)
|
||||||
|
|
||||||
msg_str = rest_utils.as_json(envelope)
|
msg_str = rest_utils.as_json(envelope)
|
||||||
else:
|
else:
|
||||||
self._logs_truncated_gauge.send(name=None, value=0)
|
if CONF.monitoring.enable:
|
||||||
|
self._logs_truncated_gauge.send(name=None, value=0)
|
||||||
|
|
||||||
return msg_str
|
return msg_str
|
||||||
|
|
||||||
|
@ -255,6 +259,6 @@ class LogPublisher(object):
|
||||||
error_str = ('Failed to send all messages, %d '
|
error_str = ('Failed to send all messages, %d '
|
||||||
'messages out of %d have not been published')
|
'messages out of %d have not been published')
|
||||||
LOG.error(error_str, failed_to_send, to_send_count)
|
LOG.error(error_str, failed_to_send, to_send_count)
|
||||||
|
if CONF.monitoring.enable:
|
||||||
self._logs_published_counter.increment(value=send_count)
|
self._logs_published_counter.increment(value=send_count)
|
||||||
self._logs_lost_counter.increment(value=failed_to_send)
|
self._logs_lost_counter.increment(value=failed_to_send)
|
||||||
|
|
|
@ -16,9 +16,11 @@
|
||||||
import falcon
|
import falcon
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
|
|
||||||
|
from monasca_log_api import conf
|
||||||
from monasca_log_api.monitoring import client
|
from monasca_log_api.monitoring import client
|
||||||
from monasca_log_api.monitoring import metrics
|
from monasca_log_api.monitoring import metrics
|
||||||
|
|
||||||
|
CONF = conf.CONF
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -32,27 +34,28 @@ class LogsApi(object):
|
||||||
"""
|
"""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(LogsApi, self).__init__()
|
super(LogsApi, self).__init__()
|
||||||
self._statsd = client.get_client()
|
if CONF.monitoring.enable:
|
||||||
|
self._statsd = client.get_client()
|
||||||
|
|
||||||
# create_common counters, gauges etc.
|
# create_common counters, gauges etc.
|
||||||
self._metrics_dimensions = dimensions = {'version': self.version}
|
self._metrics_dimensions = dimensions = {'version': self.version}
|
||||||
|
|
||||||
self._logs_in_counter = self._statsd.get_counter(
|
self._logs_in_counter = self._statsd.get_counter(
|
||||||
name=metrics.LOGS_RECEIVED_METRIC,
|
name=metrics.LOGS_RECEIVED_METRIC,
|
||||||
dimensions=dimensions
|
dimensions=dimensions
|
||||||
)
|
)
|
||||||
self._logs_size_gauge = self._statsd.get_gauge(
|
self._logs_size_gauge = self._statsd.get_gauge(
|
||||||
name=metrics.LOGS_RECEIVED_BYTE_SIZE_METRICS,
|
name=metrics.LOGS_RECEIVED_BYTE_SIZE_METRICS,
|
||||||
dimensions=dimensions
|
dimensions=dimensions
|
||||||
)
|
)
|
||||||
self._logs_rejected_counter = self._statsd.get_counter(
|
self._logs_rejected_counter = self._statsd.get_counter(
|
||||||
name=metrics.LOGS_REJECTED_METRIC,
|
name=metrics.LOGS_REJECTED_METRIC,
|
||||||
dimensions=dimensions
|
dimensions=dimensions
|
||||||
)
|
)
|
||||||
self._logs_processing_time = self._statsd.get_timer(
|
self._logs_processing_time = self._statsd.get_timer(
|
||||||
name=metrics.LOGS_PROCESSING_TIME_METRIC,
|
name=metrics.LOGS_PROCESSING_TIME_METRIC,
|
||||||
dimensions=dimensions
|
dimensions=dimensions
|
||||||
)
|
)
|
||||||
|
|
||||||
LOG.info('Initializing LogsApi %s!' % self.version)
|
LOG.info('Initializing LogsApi %s!' % self.version)
|
||||||
|
|
||||||
|
|
|
@ -16,11 +16,14 @@
|
||||||
import falcon
|
import falcon
|
||||||
import six
|
import six
|
||||||
|
|
||||||
|
|
||||||
from monasca_log_api.app.base import log_publisher
|
from monasca_log_api.app.base import log_publisher
|
||||||
from monasca_log_api.app.controller.api import headers
|
from monasca_log_api.app.controller.api import headers
|
||||||
from monasca_log_api.app.controller.api import logs_api
|
from monasca_log_api.app.controller.api import logs_api
|
||||||
from monasca_log_api.app.controller.v2.aid import service
|
from monasca_log_api.app.controller.v2.aid import service
|
||||||
|
from monasca_log_api import conf
|
||||||
|
|
||||||
|
CONF = conf.CONF
|
||||||
_DEPRECATED_INFO = ('/v2.0/log/single has been deprecated. '
|
_DEPRECATED_INFO = ('/v2.0/log/single has been deprecated. '
|
||||||
'Please use /v3.0/logs')
|
'Please use /v3.0/logs')
|
||||||
|
|
||||||
|
@ -38,37 +41,44 @@ class Logs(logs_api.LogsApi):
|
||||||
|
|
||||||
@falcon.deprecated(_DEPRECATED_INFO)
|
@falcon.deprecated(_DEPRECATED_INFO)
|
||||||
def on_post(self, req, res):
|
def on_post(self, req, res):
|
||||||
with self._logs_processing_time.time(name=None):
|
if CONF.monitoring.enable:
|
||||||
try:
|
with self._logs_processing_time.time(name=None):
|
||||||
req.validate(self.SUPPORTED_CONTENT_TYPES)
|
self.process_on_post_request(req, res)
|
||||||
tenant_id = (req.project_id if req.project_id
|
else:
|
||||||
else req.cross_project_id)
|
self.process_on_post_request(req, res)
|
||||||
|
|
||||||
log = self.get_log(request=req)
|
def process_on_post_request(self, req, res):
|
||||||
envelope = self.get_envelope(
|
try:
|
||||||
log=log,
|
req.validate(self.SUPPORTED_CONTENT_TYPES)
|
||||||
tenant_id=tenant_id
|
tenant_id = (req.project_id if req.project_id
|
||||||
)
|
else req.cross_project_id)
|
||||||
|
|
||||||
|
log = self.get_log(request=req)
|
||||||
|
envelope = self.get_envelope(
|
||||||
|
log=log,
|
||||||
|
tenant_id=tenant_id
|
||||||
|
)
|
||||||
|
if CONF.monitoring.enable:
|
||||||
self._logs_size_gauge.send(name=None,
|
self._logs_size_gauge.send(name=None,
|
||||||
value=int(req.content_length))
|
value=int(req.content_length))
|
||||||
self._logs_in_counter.increment()
|
self._logs_in_counter.increment()
|
||||||
except Exception:
|
except Exception:
|
||||||
# any validation that failed means
|
# any validation that failed means
|
||||||
# log is invalid and rejected
|
# log is invalid and rejected
|
||||||
|
if CONF.monitoring.enable:
|
||||||
self._logs_rejected_counter.increment()
|
self._logs_rejected_counter.increment()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
self._kafka_publisher.send_message(envelope)
|
self._kafka_publisher.send_message(envelope)
|
||||||
|
|
||||||
res.status = falcon.HTTP_204
|
res.status = falcon.HTTP_204
|
||||||
res.add_link(
|
res.add_link(
|
||||||
target=str(_get_v3_link(req)),
|
target=str(_get_v3_link(req)),
|
||||||
rel='current', # [RFC5005]
|
rel='current', # [RFC5005]
|
||||||
title='V3 Logs',
|
title='V3 Logs',
|
||||||
type_hint='application/json'
|
type_hint='application/json'
|
||||||
)
|
)
|
||||||
res.append_header('DEPRECATED', 'true')
|
res.append_header('DEPRECATED', 'true')
|
||||||
|
|
||||||
def get_envelope(self, log, tenant_id):
|
def get_envelope(self, log, tenant_id):
|
||||||
return self._log_creator.new_log_envelope(
|
return self._log_creator.new_log_envelope(
|
||||||
|
|
|
@ -32,7 +32,7 @@ class BulkProcessor(log_publisher.LogPublisher):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, logs_in_counter, logs_rejected_counter):
|
def __init__(self, logs_in_counter=None, logs_rejected_counter=None):
|
||||||
"""Initializes BulkProcessor.
|
"""Initializes BulkProcessor.
|
||||||
|
|
||||||
:param logs_in_counter: V3 received logs counter
|
:param logs_in_counter: V3 received logs counter
|
||||||
|
@ -40,11 +40,11 @@ class BulkProcessor(log_publisher.LogPublisher):
|
||||||
"""
|
"""
|
||||||
super(BulkProcessor, self).__init__()
|
super(BulkProcessor, self).__init__()
|
||||||
|
|
||||||
assert logs_in_counter is not None
|
if CONF.monitoring.enable:
|
||||||
assert logs_rejected_counter is not None
|
assert logs_in_counter is not None
|
||||||
|
assert logs_rejected_counter is not None
|
||||||
self._logs_in_counter = logs_in_counter
|
self._logs_in_counter = logs_in_counter
|
||||||
self._logs_rejected_counter = logs_rejected_counter
|
self._logs_rejected_counter = logs_rejected_counter
|
||||||
|
|
||||||
self.service_region = CONF.service.region
|
self.service_region = CONF.service.region
|
||||||
|
|
||||||
|
@ -70,10 +70,13 @@ class BulkProcessor(log_publisher.LogPublisher):
|
||||||
log_tenant_id)
|
log_tenant_id)
|
||||||
if t_el:
|
if t_el:
|
||||||
to_send_msgs.append(t_el)
|
to_send_msgs.append(t_el)
|
||||||
|
if CONF.monitoring.enable:
|
||||||
with self._publish_time_ms.time(name=None):
|
with self._publish_time_ms.time(name=None):
|
||||||
|
self._publish(to_send_msgs)
|
||||||
|
else:
|
||||||
self._publish(to_send_msgs)
|
self._publish(to_send_msgs)
|
||||||
sent_count = len(to_send_msgs)
|
|
||||||
|
sent_count = len(to_send_msgs)
|
||||||
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.error('Failed to send bulk package <logs=%d, dimensions=%s>',
|
LOG.error('Failed to send bulk package <logs=%d, dimensions=%s>',
|
||||||
|
@ -81,7 +84,8 @@ class BulkProcessor(log_publisher.LogPublisher):
|
||||||
LOG.exception(ex)
|
LOG.exception(ex)
|
||||||
raise ex
|
raise ex
|
||||||
finally:
|
finally:
|
||||||
self._update_counters(len(to_send_msgs), num_of_msgs)
|
if CONF.monitoring.enable:
|
||||||
|
self._update_counters(len(to_send_msgs), num_of_msgs)
|
||||||
self._after_publish(sent_count, len(to_send_msgs))
|
self._after_publish(sent_count, len(to_send_msgs))
|
||||||
|
|
||||||
def _update_counters(self, in_counter, to_send_counter):
|
def _update_counters(self, in_counter, to_send_counter):
|
||||||
|
|
|
@ -21,8 +21,11 @@ from monasca_log_api.app.base import validation
|
||||||
from monasca_log_api.app.controller.api import logs_api
|
from monasca_log_api.app.controller.api import logs_api
|
||||||
from monasca_log_api.app.controller.v3.aid import bulk_processor
|
from monasca_log_api.app.controller.v3.aid import bulk_processor
|
||||||
from monasca_log_api.app.controller.v3.aid import helpers
|
from monasca_log_api.app.controller.v3.aid import helpers
|
||||||
|
from monasca_log_api import conf
|
||||||
from monasca_log_api.monitoring import metrics
|
from monasca_log_api.monitoring import metrics
|
||||||
|
|
||||||
|
|
||||||
|
CONF = conf.CONF
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -34,51 +37,61 @@ class Logs(logs_api.LogsApi):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(Logs, self).__init__()
|
super(Logs, self).__init__()
|
||||||
|
|
||||||
self._processor = bulk_processor.BulkProcessor(
|
if CONF.monitoring.enable:
|
||||||
logs_in_counter=self._logs_in_counter,
|
self._processor = bulk_processor.BulkProcessor(
|
||||||
logs_rejected_counter=self._logs_rejected_counter
|
logs_in_counter=self._logs_in_counter,
|
||||||
)
|
logs_rejected_counter=self._logs_rejected_counter
|
||||||
self._bulks_rejected_counter = self._statsd.get_counter(
|
)
|
||||||
name=metrics.LOGS_BULKS_REJECTED_METRIC,
|
self._bulks_rejected_counter = self._statsd.get_counter(
|
||||||
dimensions=self._metrics_dimensions
|
name=metrics.LOGS_BULKS_REJECTED_METRIC,
|
||||||
)
|
dimensions=self._metrics_dimensions
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self._processor = bulk_processor.BulkProcessor()
|
||||||
|
|
||||||
def on_post(self, req, res):
|
def on_post(self, req, res):
|
||||||
with self._logs_processing_time.time(name=None):
|
if CONF.monitoring.enable:
|
||||||
try:
|
with self._logs_processing_time.time(name=None):
|
||||||
req.validate(self.SUPPORTED_CONTENT_TYPES)
|
self.process_on_post_request(req, res)
|
||||||
|
else:
|
||||||
|
self.process_on_post_request(req, res)
|
||||||
|
|
||||||
request_body = helpers.read_json_msg_body(req)
|
def process_on_post_request(self, req, res):
|
||||||
|
try:
|
||||||
|
req.validate(self.SUPPORTED_CONTENT_TYPES)
|
||||||
|
|
||||||
log_list = self._get_logs(request_body)
|
request_body = helpers.read_json_msg_body(req)
|
||||||
global_dimensions = self._get_global_dimensions(request_body)
|
|
||||||
|
|
||||||
except Exception as ex:
|
log_list = self._get_logs(request_body)
|
||||||
LOG.error('Entire bulk package has been rejected')
|
global_dimensions = self._get_global_dimensions(request_body)
|
||||||
LOG.exception(ex)
|
|
||||||
|
|
||||||
|
except Exception as ex:
|
||||||
|
LOG.error('Entire bulk package has been rejected')
|
||||||
|
LOG.exception(ex)
|
||||||
|
if CONF.monitoring.enable:
|
||||||
self._bulks_rejected_counter.increment(value=1)
|
self._bulks_rejected_counter.increment(value=1)
|
||||||
|
|
||||||
raise ex
|
raise ex
|
||||||
|
|
||||||
|
if CONF.monitoring.enable:
|
||||||
self._bulks_rejected_counter.increment(value=0)
|
self._bulks_rejected_counter.increment(value=0)
|
||||||
self._logs_size_gauge.send(name=None,
|
self._logs_size_gauge.send(name=None,
|
||||||
value=int(req.content_length))
|
value=int(req.content_length))
|
||||||
|
|
||||||
tenant_id = (req.cross_project_id if req.cross_project_id
|
tenant_id = (req.cross_project_id if req.cross_project_id
|
||||||
else req.project_id)
|
else req.project_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._processor.send_message(
|
self._processor.send_message(
|
||||||
logs=log_list,
|
logs=log_list,
|
||||||
global_dimensions=global_dimensions,
|
global_dimensions=global_dimensions,
|
||||||
log_tenant_id=tenant_id
|
log_tenant_id=tenant_id
|
||||||
)
|
)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
res.status = getattr(ex, 'status', falcon.HTTP_500)
|
res.status = getattr(ex, 'status', falcon.HTTP_500)
|
||||||
return
|
return
|
||||||
|
|
||||||
res.status = falcon.HTTP_204
|
res.status = falcon.HTTP_204
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_global_dimensions(request_body):
|
def _get_global_dimensions(request_body):
|
||||||
|
|
|
@ -19,6 +19,9 @@ _DEFAULT_PORT = 8125
|
||||||
_DEFAULT_BUFFER_SIZE = 50
|
_DEFAULT_BUFFER_SIZE = 50
|
||||||
|
|
||||||
monitoring_opts = [
|
monitoring_opts = [
|
||||||
|
cfg.BoolOpt('enable',
|
||||||
|
default=True,
|
||||||
|
help='Determine if self monitoring is enabled'),
|
||||||
cfg.IPOpt('statsd_host',
|
cfg.IPOpt('statsd_host',
|
||||||
default=_DEFAULT_HOST,
|
default=_DEFAULT_HOST,
|
||||||
help=('IP address of statsd server, default to %s'
|
help=('IP address of statsd server, default to %s'
|
||||||
|
|
Loading…
Reference in New Issue