diff --git a/monasca_log_api/app/base/log_publisher.py b/monasca_log_api/app/base/log_publisher.py index 4cdaf80d..2b51039d 100644 --- a/monasca_log_api/app/base/log_publisher.py +++ b/monasca_log_api/app/base/log_publisher.py @@ -68,22 +68,22 @@ class LogPublisher(object): self._kafka_publisher = producer.KafkaProducer( url=CONF.log_publisher.kafka_url ) + if CONF.monitoring.enable: + self._statsd = client.get_client() - self._statsd = client.get_client() - - # setup counter, gauges etc - self._logs_published_counter = self._statsd.get_counter( - metrics.LOGS_PUBLISHED_METRIC - ) - self._publish_time_ms = self._statsd.get_timer( - metrics.LOGS_PUBLISH_TIME_METRIC - ) - self._logs_lost_counter = self._statsd.get_counter( - metrics.LOGS_PUBLISHED_LOST_METRIC - ) - self._logs_truncated_gauge = self._statsd.get_gauge( - metrics.LOGS_TRUNCATED_METRIC - ) + # setup counter, gauges etc + self._logs_published_counter = self._statsd.get_counter( + metrics.LOGS_PUBLISHED_METRIC + ) + self._publish_time_ms = self._statsd.get_timer( + metrics.LOGS_PUBLISH_TIME_METRIC + ) + self._logs_lost_counter = self._statsd.get_counter( + metrics.LOGS_PUBLISHED_LOST_METRIC + ) + self._logs_truncated_gauge = self._statsd.get_gauge( + metrics.LOGS_TRUNCATED_METRIC + ) LOG.info('Initializing LogPublisher <%s>', self) @@ -117,8 +117,10 @@ class LogPublisher(object): for message in messages: msg = self._transform_message(message) send_messages.append(msg) - - with self._publish_time_ms.time(name=None): + if CONF.monitoring.enable: + with self._publish_time_ms.time(name=None): + self._publish(send_messages) + else: self._publish(send_messages) sent_counter = len(send_messages) @@ -199,11 +201,13 @@ class LogPublisher(object): envelope['log']['truncated'] = True envelope['log']['message'] = truncated_log_msg - self._logs_truncated_gauge.send(name=None, value=truncated_by) + if CONF.monitoring.enable: + self._logs_truncated_gauge.send(name=None, value=truncated_by) msg_str = rest_utils.as_json(envelope) else: - self._logs_truncated_gauge.send(name=None, value=0) + if CONF.monitoring.enable: + self._logs_truncated_gauge.send(name=None, value=0) return msg_str @@ -255,6 +259,6 @@ class LogPublisher(object): error_str = ('Failed to send all messages, %d ' 'messages out of %d have not been published') LOG.error(error_str, failed_to_send, to_send_count) - - self._logs_published_counter.increment(value=send_count) - self._logs_lost_counter.increment(value=failed_to_send) + if CONF.monitoring.enable: + self._logs_published_counter.increment(value=send_count) + self._logs_lost_counter.increment(value=failed_to_send) diff --git a/monasca_log_api/app/controller/api/logs_api.py b/monasca_log_api/app/controller/api/logs_api.py index a80e3be4..8d2f8b88 100644 --- a/monasca_log_api/app/controller/api/logs_api.py +++ b/monasca_log_api/app/controller/api/logs_api.py @@ -16,9 +16,11 @@ import falcon from oslo_log import log +from monasca_log_api import conf from monasca_log_api.monitoring import client from monasca_log_api.monitoring import metrics +CONF = conf.CONF LOG = log.getLogger(__name__) @@ -32,27 +34,28 @@ class LogsApi(object): """ def __init__(self): super(LogsApi, self).__init__() - self._statsd = client.get_client() + if CONF.monitoring.enable: + self._statsd = client.get_client() - # create_common counters, gauges etc. - self._metrics_dimensions = dimensions = {'version': self.version} + # create_common counters, gauges etc. + self._metrics_dimensions = dimensions = {'version': self.version} - self._logs_in_counter = self._statsd.get_counter( - name=metrics.LOGS_RECEIVED_METRIC, - dimensions=dimensions - ) - self._logs_size_gauge = self._statsd.get_gauge( - name=metrics.LOGS_RECEIVED_BYTE_SIZE_METRICS, - dimensions=dimensions - ) - self._logs_rejected_counter = self._statsd.get_counter( - name=metrics.LOGS_REJECTED_METRIC, - dimensions=dimensions - ) - self._logs_processing_time = self._statsd.get_timer( - name=metrics.LOGS_PROCESSING_TIME_METRIC, - dimensions=dimensions - ) + self._logs_in_counter = self._statsd.get_counter( + name=metrics.LOGS_RECEIVED_METRIC, + dimensions=dimensions + ) + self._logs_size_gauge = self._statsd.get_gauge( + name=metrics.LOGS_RECEIVED_BYTE_SIZE_METRICS, + dimensions=dimensions + ) + self._logs_rejected_counter = self._statsd.get_counter( + name=metrics.LOGS_REJECTED_METRIC, + dimensions=dimensions + ) + self._logs_processing_time = self._statsd.get_timer( + name=metrics.LOGS_PROCESSING_TIME_METRIC, + dimensions=dimensions + ) LOG.info('Initializing LogsApi %s!' % self.version) diff --git a/monasca_log_api/app/controller/v2/logs.py b/monasca_log_api/app/controller/v2/logs.py index 39ef7cf4..fb198e48 100644 --- a/monasca_log_api/app/controller/v2/logs.py +++ b/monasca_log_api/app/controller/v2/logs.py @@ -16,11 +16,14 @@ import falcon import six + from monasca_log_api.app.base import log_publisher from monasca_log_api.app.controller.api import headers from monasca_log_api.app.controller.api import logs_api from monasca_log_api.app.controller.v2.aid import service +from monasca_log_api import conf +CONF = conf.CONF _DEPRECATED_INFO = ('/v2.0/log/single has been deprecated. ' 'Please use /v3.0/logs') @@ -38,37 +41,44 @@ class Logs(logs_api.LogsApi): @falcon.deprecated(_DEPRECATED_INFO) def on_post(self, req, res): - with self._logs_processing_time.time(name=None): - try: - req.validate(self.SUPPORTED_CONTENT_TYPES) - tenant_id = (req.project_id if req.project_id - else req.cross_project_id) + if CONF.monitoring.enable: + with self._logs_processing_time.time(name=None): + self.process_on_post_request(req, res) + else: + self.process_on_post_request(req, res) - log = self.get_log(request=req) - envelope = self.get_envelope( - log=log, - tenant_id=tenant_id - ) + def process_on_post_request(self, req, res): + try: + req.validate(self.SUPPORTED_CONTENT_TYPES) + tenant_id = (req.project_id if req.project_id + else req.cross_project_id) + log = self.get_log(request=req) + envelope = self.get_envelope( + log=log, + tenant_id=tenant_id + ) + if CONF.monitoring.enable: self._logs_size_gauge.send(name=None, value=int(req.content_length)) self._logs_in_counter.increment() - except Exception: - # any validation that failed means - # log is invalid and rejected + except Exception: + # any validation that failed means + # log is invalid and rejected + if CONF.monitoring.enable: self._logs_rejected_counter.increment() - raise + raise - self._kafka_publisher.send_message(envelope) + self._kafka_publisher.send_message(envelope) - res.status = falcon.HTTP_204 - res.add_link( - target=str(_get_v3_link(req)), - rel='current', # [RFC5005] - title='V3 Logs', - type_hint='application/json' - ) - res.append_header('DEPRECATED', 'true') + res.status = falcon.HTTP_204 + res.add_link( + target=str(_get_v3_link(req)), + rel='current', # [RFC5005] + title='V3 Logs', + type_hint='application/json' + ) + res.append_header('DEPRECATED', 'true') def get_envelope(self, log, tenant_id): return self._log_creator.new_log_envelope( diff --git a/monasca_log_api/app/controller/v3/aid/bulk_processor.py b/monasca_log_api/app/controller/v3/aid/bulk_processor.py index 1afa2e89..8a431930 100644 --- a/monasca_log_api/app/controller/v3/aid/bulk_processor.py +++ b/monasca_log_api/app/controller/v3/aid/bulk_processor.py @@ -32,7 +32,7 @@ class BulkProcessor(log_publisher.LogPublisher): """ - def __init__(self, logs_in_counter, logs_rejected_counter): + def __init__(self, logs_in_counter=None, logs_rejected_counter=None): """Initializes BulkProcessor. :param logs_in_counter: V3 received logs counter @@ -40,11 +40,11 @@ class BulkProcessor(log_publisher.LogPublisher): """ super(BulkProcessor, self).__init__() - assert logs_in_counter is not None - assert logs_rejected_counter is not None - - self._logs_in_counter = logs_in_counter - self._logs_rejected_counter = logs_rejected_counter + if CONF.monitoring.enable: + assert logs_in_counter is not None + assert logs_rejected_counter is not None + self._logs_in_counter = logs_in_counter + self._logs_rejected_counter = logs_rejected_counter self.service_region = CONF.service.region @@ -70,10 +70,13 @@ class BulkProcessor(log_publisher.LogPublisher): log_tenant_id) if t_el: to_send_msgs.append(t_el) - - with self._publish_time_ms.time(name=None): + if CONF.monitoring.enable: + with self._publish_time_ms.time(name=None): + self._publish(to_send_msgs) + else: self._publish(to_send_msgs) - sent_count = len(to_send_msgs) + + sent_count = len(to_send_msgs) except Exception as ex: LOG.error('Failed to send bulk package ', @@ -81,7 +84,8 @@ class BulkProcessor(log_publisher.LogPublisher): LOG.exception(ex) raise ex finally: - self._update_counters(len(to_send_msgs), num_of_msgs) + if CONF.monitoring.enable: + self._update_counters(len(to_send_msgs), num_of_msgs) self._after_publish(sent_count, len(to_send_msgs)) def _update_counters(self, in_counter, to_send_counter): diff --git a/monasca_log_api/app/controller/v3/logs.py b/monasca_log_api/app/controller/v3/logs.py index 7d5654ca..6236fcf8 100644 --- a/monasca_log_api/app/controller/v3/logs.py +++ b/monasca_log_api/app/controller/v3/logs.py @@ -21,8 +21,11 @@ from monasca_log_api.app.base import validation from monasca_log_api.app.controller.api import logs_api from monasca_log_api.app.controller.v3.aid import bulk_processor from monasca_log_api.app.controller.v3.aid import helpers +from monasca_log_api import conf from monasca_log_api.monitoring import metrics + +CONF = conf.CONF LOG = log.getLogger(__name__) @@ -34,51 +37,61 @@ class Logs(logs_api.LogsApi): def __init__(self): super(Logs, self).__init__() - self._processor = bulk_processor.BulkProcessor( - logs_in_counter=self._logs_in_counter, - logs_rejected_counter=self._logs_rejected_counter - ) - self._bulks_rejected_counter = self._statsd.get_counter( - name=metrics.LOGS_BULKS_REJECTED_METRIC, - dimensions=self._metrics_dimensions - ) + if CONF.monitoring.enable: + self._processor = bulk_processor.BulkProcessor( + logs_in_counter=self._logs_in_counter, + logs_rejected_counter=self._logs_rejected_counter + ) + self._bulks_rejected_counter = self._statsd.get_counter( + name=metrics.LOGS_BULKS_REJECTED_METRIC, + dimensions=self._metrics_dimensions + ) + else: + self._processor = bulk_processor.BulkProcessor() def on_post(self, req, res): - with self._logs_processing_time.time(name=None): - try: - req.validate(self.SUPPORTED_CONTENT_TYPES) + if CONF.monitoring.enable: + with self._logs_processing_time.time(name=None): + self.process_on_post_request(req, res) + else: + self.process_on_post_request(req, res) - request_body = helpers.read_json_msg_body(req) + def process_on_post_request(self, req, res): + try: + req.validate(self.SUPPORTED_CONTENT_TYPES) - log_list = self._get_logs(request_body) - global_dimensions = self._get_global_dimensions(request_body) + request_body = helpers.read_json_msg_body(req) - except Exception as ex: - LOG.error('Entire bulk package has been rejected') - LOG.exception(ex) + log_list = self._get_logs(request_body) + global_dimensions = self._get_global_dimensions(request_body) + except Exception as ex: + LOG.error('Entire bulk package has been rejected') + LOG.exception(ex) + if CONF.monitoring.enable: self._bulks_rejected_counter.increment(value=1) - raise ex + raise ex + if CONF.monitoring.enable: self._bulks_rejected_counter.increment(value=0) self._logs_size_gauge.send(name=None, value=int(req.content_length)) - tenant_id = (req.cross_project_id if req.cross_project_id - else req.project_id) + tenant_id = (req.cross_project_id if req.cross_project_id + else req.project_id) - try: - self._processor.send_message( - logs=log_list, - global_dimensions=global_dimensions, - log_tenant_id=tenant_id - ) - except Exception as ex: - res.status = getattr(ex, 'status', falcon.HTTP_500) - return + try: + self._processor.send_message( + logs=log_list, + global_dimensions=global_dimensions, + log_tenant_id=tenant_id + ) + except Exception as ex: + res.status = getattr(ex, 'status', falcon.HTTP_500) + return - res.status = falcon.HTTP_204 + res.status = falcon.HTTP_204 @staticmethod def _get_global_dimensions(request_body): diff --git a/monasca_log_api/conf/monitoring.py b/monasca_log_api/conf/monitoring.py index 40533777..9ee029c6 100644 --- a/monasca_log_api/conf/monitoring.py +++ b/monasca_log_api/conf/monitoring.py @@ -19,6 +19,9 @@ _DEFAULT_PORT = 8125 _DEFAULT_BUFFER_SIZE = 50 monitoring_opts = [ + cfg.BoolOpt('enable', + default=True, + help='Determine if self monitoring is enabled'), cfg.IPOpt('statsd_host', default=_DEFAULT_HOST, help=('IP address of statsd server, default to %s'