Add configurable structure of OpenTSDB client

Now it's possible to choose one of two client implementations -
either via REST API or via direct socket usage. Socket client
currently supports only meter posting.

Change-Id: I6005cfa23ffbe4bdba15e62637150e8a29bdefc2
This commit is contained in:
Dina Belova 2014-10-31 16:02:36 +03:00
parent c43115400e
commit a6419e6cf3
12 changed files with 583 additions and 149 deletions

View File

@ -29,4 +29,4 @@ class InvalidOpenTSDBFormat(OpenTSDBError):
msg += ' Please provide data in %s format.'
super(InvalidOpenTSDBFormat, self).__init__(msg)
self.actual = actual
self.expected = expected
self.expected = expected

80
opentsdbclient/base.py Normal file
View File

@ -0,0 +1,80 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import opentsdbclient
class BaseOpenTSDBClient(object):
def __init__(self, opentsdb_hosts, **kwargs):
self.hosts = opentsdb_hosts
def get_statistics(self):
"""Get info about what metrics are registered and with what stats."""
raise NotImplementedError
def put_meter(self, meters):
"""Post new meter(s) to the database.
Meter dictionary *should* contain the following four required fields:
- metric: the name of the metric you are storing
- timestamp: a Unix epoch style timestamp in seconds or milliseconds.
The timestamp must not contain non-numeric characters.
- value: the value to record for this data point. It may be quoted or
not quoted and must conform to the OpenTSDB value rules.
- tags: a map of tag name/tag value pairs. At least one pair must be
supplied.
"""
raise NotImplementedError
def define_retention(self, tsuid, retention_days):
"""Set retention days for the defined by ID timeseries.
##########################################################
NOTE: currently not working directly through the REST API.
that should be done directly on the HBase level.
##########################################################
:param tsuid: hexadecimal representation of the timeseries UID
:param retention_days: number of days of data points to retain for the
given timeseries. When set to 0, the default,
data is retained indefinitely.
"""
raise NotImplementedError
def get_aggregators(self):
"""Used to get the list of default aggregation functions."""
raise NotImplementedError
def get_version(self):
"""Used to check OpenTSDB version.
That might be needed in case of unknown bugs - this code is written
only for the 2.x REST API version, so some of the failures might refer
to the wrong OpenTSDB version installed.
"""
raise NotImplementedError
def _check_meters(self, meters):
"""Check that meters to be put are having nice format."""
if type(meters) == dict:
meters = [meters]
for meter_dict in meters:
if (set(meter_dict.keys())
!= set(['metric', 'timestamp', 'value', 'tags'])):
raise opentsdbclient.InvalidOpenTSDBFormat(
actual=meter_dict,
expected="{'metric': <meter_name>, 'timestamp': <ts>, "
"'value': <value>, 'tags': <at least one pair>}")
return meters

View File

@ -13,109 +13,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import requests
import opentsdbclient
from opentsdbclient import utils
from opentsdbclient.rest import client as rest_cl
from opentsdbclient.socket import client as socket_cl
class OpenTSDBClient(object):
def __init__(self, opentsdb_host, opentsdb_port):
self.host = opentsdb_host
self.port = opentsdb_port
def get_statistics(self):
"""Get info about what metrics are registered and with what stats."""
req = requests.get(utils.STATS_TEMPL % {'host': self.host,
'port': self.port})
return req
def put_meter(self, meters):
"""Post new meter(s) to the database.
Meter dictionary *should* contain the following four required fields:
- metric: the name of the metric you are storing
- timestamp: a Unix epoch style timestamp in seconds or milliseconds.
The timestamp must not contain non-numeric characters.
- value: the value to record for this data point. It may be quoted or
not quoted and must conform to the OpenTSDB value rules.
- tags: a map of tag name/tag value pairs. At least one pair must be
supplied.
"""
res = []
if type(meters) == dict:
meters = [meters]
for meter_dict in meters:
if (set(meter_dict.keys())
!= set(['metric', 'timestamp', 'value', 'tags'])):
raise opentsdbclient.InvalidOpenTSDBFormat(
actual=meter_dict,
expected="{'metric': <meter_name>, 'timestamp': <ts>, "
"'value': <value>, 'tags': <at least one pair>}")
req = requests.post(utils.PUT_TEMPL %
{'host': self.host, 'port': self.port},
data=json.dumps(meter_dict))
res.append(req)
return res
def define_retention(self, tsuid, retention_days):
"""Set retention days for the defined by ID timeseries.
##########################################################
NOTE: currently not working directly through the REST API.
that should be done directly on the HBase level.
##########################################################
:param tsuid: hexadecimal representation of the timeseries UID
:param retention_days: number of days of data points to retain for the
given timeseries. When set to 0, the default,
data is retained indefinitely.
"""
meta_data = {'tsuid': tsuid, 'retention': retention_days}
req = requests.post(utils.META_TEMPL % {'host': self.host,
'port': self.port,
'tsuid': tsuid},
data=json.dumps(meta_data))
return req
def get_aggregators(self):
"""Used to get the list of default aggregation functions."""
req = requests.get(utils.AGGR_TEMPL % {'host': self.host,
'port': self.port})
return req
def get_version(self):
"""Used to check OpenTSDB version.
That might be needed in case of unknown bugs - this code is written
only for the 2.x REST API version, so some of the failures might refer
to the wrong OpenTSDB version installed.
"""
req = requests.get(utils.VERSION_TEMPL % {'host': self.host,
'port': self.port})
return req
def _make_query(self, query, verb):
meth = getattr(requests, verb.lower(), None)
if meth is None:
pass
req = meth(utils.QUERY_TEMPL % {'host': self.host, 'port': self.port,
'query': query})
return req
def get_query(self, query):
return self._make_query(query, 'get')
def process_response(self, resp):
try:
res = json.loads(resp.text)
except Exception:
raise opentsdbclient.OpenTSDBError(resp.text)
if 'errors' in res:
raise opentsdbclient.OpenTSDBError(res['error'])
return res
def get_client(hosts, protocol='rest', **kwargs):
if protocol == 'rest':
return rest_cl.RESTOpenTSDBClient(hosts, **kwargs)
elif protocol == 'socket':
return socket_cl.SocketOpenTSDBClient(hosts, **kwargs)
else:
raise opentsdbclient.OpenTSDBError('No %s protocol to communicate with'
'OpenTSDB implemented.' % protocol)

View File

View File

@ -0,0 +1,113 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import requests
import opentsdbclient
from opentsdbclient import base
from opentsdbclient.rest import utils
class RESTOpenTSDBClient(base.BaseOpenTSDBClient):
def get_statistics(self):
"""Get info about what metrics are registered and with what stats."""
req = requests.get(utils.STATS_TEMPL % {'host': self.hosts[0][0],
'port': self.hosts[0][1]})
return req
def put_meter(self, meters):
"""Post new meter(s) to the database.
Meter dictionary *should* contain the following four required fields:
- metric: the name of the metric you are storing
- timestamp: a Unix epoch style timestamp in seconds or milliseconds.
The timestamp must not contain non-numeric characters.
- value: the value to record for this data point. It may be quoted or
not quoted and must conform to the OpenTSDB value rules.
- tags: a map of tag name/tag value pairs. At least one pair must be
supplied.
"""
res = []
meters = self._check_meters(meters)
for meter_dict in meters:
req = requests.post(utils.PUT_TEMPL %
{'host': self.hosts[0][0],
'port': self.hosts[0][1]},
data=json.dumps(meter_dict))
res.append(req)
return res
def define_retention(self, tsuid, retention_days):
"""Set retention days for the defined by ID timeseries.
##########################################################
NOTE: currently not working directly through the REST API.
that should be done directly on the HBase level.
##########################################################
:param tsuid: hexadecimal representation of the timeseries UID
:param retention_days: number of days of data points to retain for the
given timeseries. When set to 0, the default,
data is retained indefinitely.
"""
meta_data = {'tsuid': tsuid, 'retention': retention_days}
req = requests.post(utils.META_TEMPL % {'host': self.hosts[0][0],
'port': self.hosts[0][1],
'tsuid': tsuid},
data=json.dumps(meta_data))
return req
def get_aggregators(self):
"""Used to get the list of default aggregation functions."""
req = requests.get(utils.AGGR_TEMPL % {'host': self.hosts[0][0],
'port': self.hosts[0][1]})
return req
def get_version(self):
"""Used to check OpenTSDB version.
That might be needed in case of unknown bugs - this code is written
only for the 2.x REST API version, so some of the failures might refer
to the wrong OpenTSDB version installed.
"""
req = requests.get(utils.VERSION_TEMPL % {'host': self.hosts[0][0],
'port': self.hosts[0][1]})
return req
def _make_query(self, query, verb):
meth = getattr(requests, verb.lower(), None)
if meth is None:
pass
req = meth(utils.QUERY_TEMPL % {'host': self.hosts[0][0],
'port': self.hosts[0][1],
'query': query})
return req
def get_query(self, query):
return self._make_query(query, 'get')
def process_response(self, resp):
try:
res = json.loads(resp.text)
except Exception:
raise opentsdbclient.OpenTSDBError(resp.text)
if 'errors' in res:
raise opentsdbclient.OpenTSDBError(res['error'])
return res

View File

View File

@ -0,0 +1,221 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import logging
import random
import socket
import time
import six
from opentsdbclient import base
LOG = logging.getLogger('opentsdb_client')
class SocketOpenTSDBClient(base.BaseOpenTSDBClient):
def __init__(self, opentsdb_hosts, conn_verify_trusted_time=60,
reconnect_interval=0, send_queue_max_size=1000,
max_uncaught_exceptions=100, wait_retry=False):
super(SocketOpenTSDBClient, self).__init__(opentsdb_hosts)
self.send_queue = []
self.tsd = None
self.host = None
self.port = None
self.blacklisted_hosts = set()
self.current_tsd_host = -1
self.last_verify = 0
self.time_reconnect = 0
self.conn_verify_trusted_time = conn_verify_trusted_time
self.reconnect_interval = reconnect_interval
self.send_queue_max_size = send_queue_max_size
self.max_uncaught_exceptions = max_uncaught_exceptions
self.wait_retry = wait_retry
def blacklist_tsd_host(self):
"""Marks the current TSD host we're trying to use as blacklisted."""
LOG.info('Blacklisting %s:%s for a while', self.host, self.port)
self.blacklisted_hosts.add((self.host, self.port))
def verify_connection(self):
"""Is used to check is socket connection is actually working."""
if self.tsd is None:
return False
# if last verifying was not so long ago, let's trust this connection
if self.last_verify > time.time() - self.conn_verify_trusted_time:
return True
# if it's time to reconnect, let's close current one
if (self.reconnect_interval > 0 and
self.time_reconnect < time.time() - self.reconnect_interval):
try:
self.tsd.close()
except socket.error:
pass
self.time_reconnect = time.time()
return False
LOG.debug('Verifying our TSD connection is alive')
try:
# this request is *really* light-weighted, good thing to check the
# connectivity
self.tsd.sendall('version\n')
except socket.error:
self.tsd = None
self.blacklist_tsd_host()
return False
bufsize = 4096
# read some ^^ data from socket connection to make sure it's *really*
# alive
try:
buf = self.tsd.recv(bufsize)
except socket.error:
self.tsd = None
self.blacklist_tsd_host()
return False
if not buf:
self.tsd = None
self.blacklist_tsd_host()
return False
self.last_verify = time.time()
return True
def pick_connection(self):
"""Picks up a random host/port connection."""
for self.current_tsd_host in range(self.current_tsd_host + 1,
len(self.hosts)):
host_port = self.hosts[self.current_tsd_host]
if host_port not in self.blacklisted_hosts:
break
else:
LOG.info('No more healthy hosts, '
'retry with previously blacklisted')
random.shuffle(self.hosts)
self.blacklisted_hosts.clear()
self.current_tsd_host = 0
host_port = self.hosts[self.current_tsd_host]
self.host, self.port = host_port
LOG.info('Selected connection: %s:%d', self.host, self.port)
def maintain_connection(self):
while True:
if self.verify_connection():
return
# that's just a hack to sleep some time if OpenTSDB is somehow
# maintained at the moment
if self.wait_retry:
try_delay = random.randint(60, 360)
LOG.debug('SenderThread blocking %0.2f seconds', try_delay)
time.sleep(try_delay)
# now actually try the connection
self.pick_connection()
try:
addresses = socket.getaddrinfo(self.host, self.port,
socket.AF_UNSPEC,
socket.SOCK_STREAM, 0)
except socket.gaierror as e:
# Don't croak on transient DNS resolution issues.
if e[0] in (socket.EAI_AGAIN, socket.EAI_NONAME,
socket.EAI_NODATA):
LOG.debug('DNS resolution failure: %s: %s', self.host, e)
continue
raise
for family, socket_type, proto, canon_name, sock_addr in addresses:
try:
self.tsd = socket.socket(family, socket_type, proto)
self.tsd.settimeout(15)
self.tsd.connect(sock_addr)
# if we get here it connected
LOG.debug('Connection to %s was successful'
% (str(sock_addr)))
break
except socket.error as e:
LOG.warning('Connection attempt failed to %s:%d: %s',
self.host, self.port, e)
self.tsd.close()
self.tsd = None
if not self.tsd:
LOG.error('Failed to connect to %s:%d', self.host, self.port)
self.blacklist_tsd_host()
def put_meter(self, meters):
"""Post new meter(s) to the database.
Meter dictionary *should* contain the following four required fields:
- metric: the name of the metric you are storing
- timestamp: a Unix epoch style timestamp in seconds or milliseconds.
The timestamp must not contain non-numeric characters.
- value: the value to record for this data point. It may be quoted or
not quoted and must conform to the OpenTSDB value rules.
- tags: a map of tag name/tag value pairs. At least one pair must be
supplied.
"""
# put meter to the send_queue and check if it's time to send it to the
# OpenTSDB
meters = self._check_meters(meters)
self.send_queue = list(itertools.chain(self.send_queue, meters))
if len(self.send_queue) <= self.send_queue_max_size:
return
self.maintain_connection()
errors = 0
try:
self.send_data()
except (ArithmeticError, EOFError, EnvironmentError, LookupError,
ValueError):
errors += 1
if errors > self.max_uncaught_exceptions:
raise
LOG.exception('Uncaught exception while trying to send meters, '
'ignoring')
except Exception:
LOG.exception('Uncaught exception in while trying to send meters, '
'going to raise. Max number %s of uncaught errors '
'has been collected' % self.max_uncaught_exceptions)
raise
def compose_line_from_meter(self, meter_dict):
tags = meter_dict.pop('tags')
tags = ''.join(' %s=%s' % (k, v) for k, v in six.iteritems(tags))
line = '%(metric)s %(timestamp)d %(value)s' % meter_dict
return '%(metric)s%(tags)s' % {'metric': line, 'tags': tags}
def send_data(self):
req = ''.join("put %s\n" % self.compose_line_from_meter(meter_dict)
for meter_dict in self.send_queue)
try:
self.tsd.sendall(req)
self.send_queue = []
except socket.error as e:
LOG.error('failed to send data: %s', e)
try:
self.tsd.close()
except socket.error:
pass
self.tsd = None
self.blacklist_tsd_host()

View File

@ -13,54 +13,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import requests
from opentsdbclient import client
from opentsdbclient.rest import client as rest_cl
from opentsdbclient.socket import client as socket_cl
from opentsdbclient import tests
class ClientTest(tests.BaseTestCase):
def setUp(self):
super(ClientTest, self).setUp()
self.client = client.OpenTSDBClient(opentsdb_host=self.host,
opentsdb_port=self.port)
def test_get_client(self):
r_cl = client.get_client((self.host, self.port), protocol='rest')
s_cl = client.get_client((self.host, self.port), protocol='socket')
@mock.patch.object(requests, 'get')
def test_get_statistics(self, get_mock):
self.client.get_statistics()
get_mock.assert_called_once_with('http://127.0.0.1:4242/api/stats')
@mock.patch.object(requests, 'post')
def test_put_meter(self, post_mock):
put_dict = {'metric': 'bla', 'timestamp': '0',
'value': 123, 'tags': {'some_tag': 'foo'}}
self.client.put_meter(put_dict)
post_mock.assert_called_once_with(
'http://127.0.0.1:4242/api/put?details', data=json.dumps(put_dict))
@mock.patch.object(requests, 'post')
def test_define_retention(self, post_mock):
self.client.define_retention('foo', 12)
post_mock.assert_called_once_with(
'http://127.0.0.1:4242/api/uid/tsmeta?tsuid=foo',
data='{"tsuid": "foo", "retention": 12}')
@mock.patch.object(requests, 'get')
def test_get_aggregators(self, get_mock):
self.client.get_aggregators()
get_mock.assert_called_once_with(
'http://127.0.0.1:4242/api/aggregators')
@mock.patch.object(requests, 'get')
def test_get_version(self, get_mock):
self.client.get_version()
get_mock.assert_called_once_with('http://127.0.0.1:4242/api/version')
@mock.patch.object(requests, 'get')
def test_get_query(self, get_mock):
self.client.get_query('start=0&end=12&m=max:2-min:bla')
get_mock.assert_called_once_with(
'http://127.0.0.1:4242/api/query?start=0&end=12&m=max:2-min:bla')
self.assertIsInstance(r_cl, rest_cl.RESTOpenTSDBClient)
self.assertIsInstance(s_cl, socket_cl.SocketOpenTSDBClient)

View File

@ -0,0 +1,66 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
import requests
from opentsdbclient.rest import client
from opentsdbclient import tests
class RESTClientTest(tests.BaseTestCase):
def setUp(self):
super(RESTClientTest, self).setUp()
self.client = client.RESTOpenTSDBClient(opentsdb_hosts=[(self.host,
self.port)])
@mock.patch.object(requests, 'get')
def test_get_statistics(self, get_mock):
self.client.get_statistics()
get_mock.assert_called_once_with('http://127.0.0.1:4242/api/stats')
@mock.patch.object(requests, 'post')
def test_put_meter(self, post_mock):
put_dict = {'metric': 'bla', 'timestamp': '0',
'value': 123, 'tags': {'some_tag': 'foo'}}
self.client.put_meter(put_dict)
post_mock.assert_called_once_with(
'http://127.0.0.1:4242/api/put?details', data=json.dumps(put_dict))
@mock.patch.object(requests, 'post')
def test_define_retention(self, post_mock):
self.client.define_retention('foo', 12)
post_mock.assert_called_once_with(
'http://127.0.0.1:4242/api/uid/tsmeta?tsuid=foo',
data='{"tsuid": "foo", "retention": 12}')
@mock.patch.object(requests, 'get')
def test_get_aggregators(self, get_mock):
self.client.get_aggregators()
get_mock.assert_called_once_with(
'http://127.0.0.1:4242/api/aggregators')
@mock.patch.object(requests, 'get')
def test_get_version(self, get_mock):
self.client.get_version()
get_mock.assert_called_once_with('http://127.0.0.1:4242/api/version')
@mock.patch.object(requests, 'get')
def test_get_query(self, get_mock):
self.client.get_query('start=0&end=12&m=max:2-min:bla')
get_mock.assert_called_once_with(
'http://127.0.0.1:4242/api/query?start=0&end=12&m=max:2-min:bla')

View File

@ -0,0 +1,84 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import time
import mock
from opentsdbclient.socket import client
from opentsdbclient import tests
class SocketClientTest(tests.BaseTestCase):
def setUp(self):
super(SocketClientTest, self).setUp()
self.client = client.SocketOpenTSDBClient(opentsdb_hosts=[(self.host,
self.port)],
send_queue_max_size=1)
self.client.tsd = mock.MagicMock()
self.client.tsd.sendall = mock.MagicMock()
self.meters = [{'metric': 'bla1', 'timestamp': 12345,
'value': 123, 'tags': {'some_tag': 'foo'}},
{'metric': 'bla2', 'timestamp': 23456,
'value': 123, 'tags': {'some_tag': 'foo'}}]
def test_put_meter(self):
self.client.maintain_connection = mock.MagicMock()
self.client.put_meter(self.meters)
self.client.tsd.sendall.assert_called_once_with(
'put bla1 12345 123 some_tag=foo\n'
'put bla2 23456 123 some_tag=foo\n')
def test_send_data(self):
self.client.send_queue = self.meters
self.client.send_data()
self.client.tsd.sendall.assert_called_once_with(
'put bla1 12345 123 some_tag=foo\n'
'put bla2 23456 123 some_tag=foo\n')
def test_compose_line_from_meter(self):
res = self.client.compose_line_from_meter(self.meters[0])
self.assertEqual('bla1 12345 123 some_tag=foo', res)
@mock.patch.object(socket, 'getaddrinfo')
@mock.patch.object(socket, 'socket')
def test_maintain_connection(self, sock_mock, sock_addr_mock):
self.client.verify_connection = mock.MagicMock()
pop_list = [True, False]
self.client.verify_connection.side_effect = lambda: pop_list.pop()
sock_addr_mock.side_effect = lambda a, b, c, d, e: [(1, 2, 3, 4, 5)]
tsd = mock.MagicMock()
tsd.connect = mock.MagicMock()
sock_mock.side_effect = tsd
self.client.maintain_connection()
sock_addr_mock.assert_called_once_with(self.host, self.port,
socket.AF_UNSPEC,
socket.SOCK_STREAM, 0)
sock_mock.assert_called_once_with(1, 2, 3)
self.client.tsd.connect.assert_called_once_with(5)
def test_verify_connection_non_tsd(self):
self.client.tsd = None
self.assertFalse(self.client.verify_connection())
def test_verify_connection_tsd(self):
self.assertTrue(self.client.verify_connection())
def test_verify_connection_tsd_reconnect(self):
self.client.last_verify = time.time() - 3600
self.client.reconnect_interval = 1
self.assertFalse(self.client.verify_connection())

View File

@ -1,2 +1,3 @@
pbr>=0.6,!=0.7,<1.0
requests>=1.1
six>=1.7.0