From 0552c04687b6103203a9d52efff239f1ab429932 Mon Sep 17 00:00:00 2001 From: Swann Croiset Date: Tue, 10 Jan 2017 11:18:04 +0100 Subject: [PATCH] Improve Elasticsearch collectd plugin This change modifies the Elastcisearch plugin to retrieve the cluster metrics only from the node that is the elected master. This avoids sending and storing duplicated metrics into InfluxDB. Change-Id: Iaeb90593e5aa4a2ddedea6abd4445a1a64b554d2 Co-Authored-By: Simon Pasquier --- .../files/collectd/elasticsearch_cluster.py | 36 +++++++++++++++---- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/deployment_scripts/puppet/modules/lma_collector/files/collectd/elasticsearch_cluster.py b/deployment_scripts/puppet/modules/lma_collector/files/collectd/elasticsearch_cluster.py index 5947f6210..f60c1bcaf 100644 --- a/deployment_scripts/puppet/modules/lma_collector/files/collectd/elasticsearch_cluster.py +++ b/deployment_scripts/puppet/modules/lma_collector/files/collectd/elasticsearch_cluster.py @@ -35,6 +35,7 @@ class ElasticsearchClusterHealthPlugin(base.Base): self.plugin = NAME self.address = '127.0.0.1' self.port = 9200 + self._node_id = None self.session = requests.Session() self.url = None self.session.mount( @@ -51,25 +52,41 @@ class ElasticsearchClusterHealthPlugin(base.Base): if node.key == 'Port': self.port = node.values[0] - self.url = "http://{address}:{port}/_cluster/health".format( + self.url = "http://{address}:{port}/".format( **{ 'address': self.address, 'port': int(self.port), }) - def itermetrics(self): + def query_api(self, resource): + url = "{}{}".format(self.url, resource) try: - r = self.session.get(self.url) + r = self.session.get(url) except Exception as e: - msg = "Got exception for '{}': {}".format(self.url, e) + msg = "Got exception for '{}': {}".format(url, e) raise base.CheckException(msg) if r.status_code != 200: - msg = "{} responded with code {}".format( - self.url, r.status_code) + msg = "{} responded with code {}".format(url, r.status_code) raise base.CheckException(msg) - data = r.json() + return r.json() + + @property + def node_id(self): + if self._node_id is None: + local_node = self.query_api('_nodes/_local') + self._node_id = local_node.get('nodes', {}).keys()[0] + + return self._node_id + + def itermetrics(self): + # Collect cluster metrics only from the elected master + master_node = self.query_api('_cluster/state/master_node') + if master_node.get('master_node', '') != self.node_id: + return + + data = self.query_api('_cluster/health') self.logger.debug("Got response from Elasticsearch: '%s'" % data) yield { @@ -92,6 +109,10 @@ class ElasticsearchClusterHealthPlugin(base.Base): plugin = ElasticsearchClusterHealthPlugin(collectd, 'elasticsearch') +def init_callback(): + plugin.restore_sigchld() + + def config_callback(conf): plugin.config_callback(conf) @@ -99,5 +120,6 @@ def config_callback(conf): def read_callback(): plugin.read_callback() +collectd.register_init(init_callback) collectd.register_config(config_callback) collectd.register_read(read_callback)