python-opentsdbclient/opentsdbclient/socket/client.py

222 lines
8.3 KiB
Python

# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import logging
import random
import socket
import time
import six
from opentsdbclient import base
LOG = logging.getLogger('opentsdb_client')
class SocketOpenTSDBClient(base.BaseOpenTSDBClient):
def __init__(self, opentsdb_hosts, conn_verify_trusted_time=60,
reconnect_interval=0, send_queue_max_size=1000,
max_uncaught_exceptions=100, wait_retry=False):
super(SocketOpenTSDBClient, self).__init__(opentsdb_hosts)
self.send_queue = []
self.tsd = None
self.host = None
self.port = None
self.blacklisted_hosts = set()
self.current_tsd_host = -1
self.last_verify = 0
self.time_reconnect = 0
self.conn_verify_trusted_time = conn_verify_trusted_time
self.reconnect_interval = reconnect_interval
self.send_queue_max_size = send_queue_max_size
self.max_uncaught_exceptions = max_uncaught_exceptions
self.wait_retry = wait_retry
def blacklist_tsd_host(self):
"""Marks the current TSD host we're trying to use as blacklisted."""
LOG.info('Blacklisting %s:%s for a while', self.host, self.port)
self.blacklisted_hosts.add((self.host, self.port))
def verify_connection(self):
"""Is used to check is socket connection is actually working."""
if self.tsd is None:
return False
# if last verifying was not so long ago, let's trust this connection
if self.last_verify > time.time() - self.conn_verify_trusted_time:
return True
# if it's time to reconnect, let's close current one
if (self.reconnect_interval > 0 and
self.time_reconnect < time.time() - self.reconnect_interval):
try:
self.tsd.close()
except socket.error:
pass
self.time_reconnect = time.time()
return False
LOG.debug('Verifying our TSD connection is alive')
try:
# this request is *really* light-weighted, good thing to check the
# connectivity
self.tsd.sendall('version\n')
except socket.error:
self.tsd = None
self.blacklist_tsd_host()
return False
bufsize = 4096
# read some ^^ data from socket connection to make sure it's *really*
# alive
try:
buf = self.tsd.recv(bufsize)
except socket.error:
self.tsd = None
self.blacklist_tsd_host()
return False
if not buf:
self.tsd = None
self.blacklist_tsd_host()
return False
self.last_verify = time.time()
return True
def pick_connection(self):
"""Picks up a random host/port connection."""
for self.current_tsd_host in range(self.current_tsd_host + 1,
len(self.hosts)):
host_port = self.hosts[self.current_tsd_host]
if host_port not in self.blacklisted_hosts:
break
else:
LOG.info('No more healthy hosts, '
'retry with previously blacklisted')
random.shuffle(self.hosts)
self.blacklisted_hosts.clear()
self.current_tsd_host = 0
host_port = self.hosts[self.current_tsd_host]
self.host, self.port = host_port
LOG.info('Selected connection: %s:%d', self.host, self.port)
def maintain_connection(self):
while True:
if self.verify_connection():
return
# that's just a hack to sleep some time if OpenTSDB is somehow
# maintained at the moment
if self.wait_retry:
try_delay = random.randint(60, 360)
LOG.debug('SenderThread blocking %0.2f seconds', try_delay)
time.sleep(try_delay)
# now actually try the connection
self.pick_connection()
try:
addresses = socket.getaddrinfo(self.host, self.port,
socket.AF_UNSPEC,
socket.SOCK_STREAM, 0)
except socket.gaierror as e:
# Don't croak on transient DNS resolution issues.
if e[0] in (socket.EAI_AGAIN, socket.EAI_NONAME,
socket.EAI_NODATA):
LOG.debug('DNS resolution failure: %s: %s', self.host, e)
continue
raise
for family, socket_type, proto, canon_name, sock_addr in addresses:
try:
self.tsd = socket.socket(family, socket_type, proto)
self.tsd.settimeout(15)
self.tsd.connect(sock_addr)
# if we get here it connected
LOG.debug('Connection to %s was successful'
% (str(sock_addr)))
break
except socket.error as e:
LOG.warning('Connection attempt failed to %s:%d: %s',
self.host, self.port, e)
self.tsd.close()
self.tsd = None
if not self.tsd:
LOG.error('Failed to connect to %s:%d', self.host, self.port)
self.blacklist_tsd_host()
def put_meter(self, meters):
"""Post new meter(s) to the database.
Meter dictionary *should* contain the following four required fields:
- metric: the name of the metric you are storing
- timestamp: a Unix epoch style timestamp in seconds or milliseconds.
The timestamp must not contain non-numeric characters.
- value: the value to record for this data point. It may be quoted or
not quoted and must conform to the OpenTSDB value rules.
- tags: a map of tag name/tag value pairs. At least one pair must be
supplied.
"""
# put meter to the send_queue and check if it's time to send it to the
# OpenTSDB
meters = self._check_meters(meters)
self.send_queue = list(itertools.chain(self.send_queue, meters))
if len(self.send_queue) <= self.send_queue_max_size:
return
self.maintain_connection()
errors = 0
try:
self.send_data()
except (ArithmeticError, EOFError, EnvironmentError, LookupError,
ValueError):
errors += 1
if errors > self.max_uncaught_exceptions:
raise
LOG.exception('Uncaught exception while trying to send meters, '
'ignoring')
except Exception:
LOG.exception('Uncaught exception in while trying to send meters, '
'going to raise. Max number %s of uncaught errors '
'has been collected' % self.max_uncaught_exceptions)
raise
def compose_line_from_meter(self, meter_dict):
tags = meter_dict.pop('tags')
tags = ''.join(' %s=%s' % (k, v) for k, v in six.iteritems(tags))
line = '%(metric)s %(timestamp)d %(value)s' % meter_dict
return '%(metric)s%(tags)s' % {'metric': line, 'tags': tags}
def send_data(self):
req = ''.join("put %s\n" % self.compose_line_from_meter(meter_dict)
for meter_dict in self.send_queue)
try:
self.tsd.sendall(req)
self.send_queue = []
except socket.error as e:
LOG.error('failed to send data: %s', e)
try:
self.tsd.close()
except socket.error:
pass
self.tsd = None
self.blacklist_tsd_host()