From a6419e6cf36d9b18bd1c0e867a4e75caa61a4ab2 Mon Sep 17 00:00:00 2001 From: Dina Belova Date: Fri, 31 Oct 2014 16:02:36 +0300 Subject: [PATCH] Add configurable structure of OpenTSDB client Now it's possible to choose one of two client implementations - either via REST API or via direct socket usage. Socket client currently supports only meter posting. Change-Id: I6005cfa23ffbe4bdba15e62637150e8a29bdefc2 --- opentsdbclient/__init__.py | 2 +- opentsdbclient/base.py | 80 ++++++++ opentsdbclient/client.py | 113 +---------- opentsdbclient/rest/__init__.py | 0 opentsdbclient/rest/client.py | 113 +++++++++++ opentsdbclient/{ => rest}/utils.py | 0 opentsdbclient/socket/__init__.py | 0 opentsdbclient/socket/client.py | 221 +++++++++++++++++++++ opentsdbclient/tests/test_client.py | 52 +---- opentsdbclient/tests/test_rest_client.py | 66 ++++++ opentsdbclient/tests/test_socket_client.py | 84 ++++++++ requirements.txt | 1 + 12 files changed, 583 insertions(+), 149 deletions(-) create mode 100644 opentsdbclient/base.py create mode 100644 opentsdbclient/rest/__init__.py create mode 100644 opentsdbclient/rest/client.py rename opentsdbclient/{ => rest}/utils.py (100%) create mode 100644 opentsdbclient/socket/__init__.py create mode 100644 opentsdbclient/socket/client.py create mode 100644 opentsdbclient/tests/test_rest_client.py create mode 100644 opentsdbclient/tests/test_socket_client.py diff --git a/opentsdbclient/__init__.py b/opentsdbclient/__init__.py index 11b7754..033d43c 100644 --- a/opentsdbclient/__init__.py +++ b/opentsdbclient/__init__.py @@ -29,4 +29,4 @@ class InvalidOpenTSDBFormat(OpenTSDBError): msg += ' Please provide data in %s format.' super(InvalidOpenTSDBFormat, self).__init__(msg) self.actual = actual - self.expected = expected \ No newline at end of file + self.expected = expected diff --git a/opentsdbclient/base.py b/opentsdbclient/base.py new file mode 100644 index 0000000..e841799 --- /dev/null +++ b/opentsdbclient/base.py @@ -0,0 +1,80 @@ +# Copyright 2014: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import opentsdbclient + + +class BaseOpenTSDBClient(object): + def __init__(self, opentsdb_hosts, **kwargs): + self.hosts = opentsdb_hosts + + def get_statistics(self): + """Get info about what metrics are registered and with what stats.""" + raise NotImplementedError + + def put_meter(self, meters): + """Post new meter(s) to the database. + + Meter dictionary *should* contain the following four required fields: + - metric: the name of the metric you are storing + - timestamp: a Unix epoch style timestamp in seconds or milliseconds. + The timestamp must not contain non-numeric characters. + - value: the value to record for this data point. It may be quoted or + not quoted and must conform to the OpenTSDB value rules. + - tags: a map of tag name/tag value pairs. At least one pair must be + supplied. + """ + raise NotImplementedError + + def define_retention(self, tsuid, retention_days): + """Set retention days for the defined by ID timeseries. + + ########################################################## + NOTE: currently not working directly through the REST API. + that should be done directly on the HBase level. + ########################################################## + + :param tsuid: hexadecimal representation of the timeseries UID + :param retention_days: number of days of data points to retain for the + given timeseries. When set to 0, the default, + data is retained indefinitely. + """ + raise NotImplementedError + + def get_aggregators(self): + """Used to get the list of default aggregation functions.""" + raise NotImplementedError + + def get_version(self): + """Used to check OpenTSDB version. + + That might be needed in case of unknown bugs - this code is written + only for the 2.x REST API version, so some of the failures might refer + to the wrong OpenTSDB version installed. + """ + raise NotImplementedError + + def _check_meters(self, meters): + """Check that meters to be put are having nice format.""" + if type(meters) == dict: + meters = [meters] + for meter_dict in meters: + if (set(meter_dict.keys()) + != set(['metric', 'timestamp', 'value', 'tags'])): + raise opentsdbclient.InvalidOpenTSDBFormat( + actual=meter_dict, + expected="{'metric': , 'timestamp': , " + "'value': , 'tags': }") + return meters diff --git a/opentsdbclient/client.py b/opentsdbclient/client.py index 5eaa855..a3f8d29 100644 --- a/opentsdbclient/client.py +++ b/opentsdbclient/client.py @@ -13,109 +13,16 @@ # License for the specific language governing permissions and limitations # under the License. -import json - -import requests - import opentsdbclient -from opentsdbclient import utils +from opentsdbclient.rest import client as rest_cl +from opentsdbclient.socket import client as socket_cl -class OpenTSDBClient(object): - def __init__(self, opentsdb_host, opentsdb_port): - self.host = opentsdb_host - self.port = opentsdb_port - - def get_statistics(self): - """Get info about what metrics are registered and with what stats.""" - req = requests.get(utils.STATS_TEMPL % {'host': self.host, - 'port': self.port}) - return req - - def put_meter(self, meters): - """Post new meter(s) to the database. - - Meter dictionary *should* contain the following four required fields: - - metric: the name of the metric you are storing - - timestamp: a Unix epoch style timestamp in seconds or milliseconds. - The timestamp must not contain non-numeric characters. - - value: the value to record for this data point. It may be quoted or - not quoted and must conform to the OpenTSDB value rules. - - tags: a map of tag name/tag value pairs. At least one pair must be - supplied. - """ - res = [] - if type(meters) == dict: - meters = [meters] - for meter_dict in meters: - if (set(meter_dict.keys()) - != set(['metric', 'timestamp', 'value', 'tags'])): - raise opentsdbclient.InvalidOpenTSDBFormat( - actual=meter_dict, - expected="{'metric': , 'timestamp': , " - "'value': , 'tags': }") - - req = requests.post(utils.PUT_TEMPL % - {'host': self.host, 'port': self.port}, - data=json.dumps(meter_dict)) - res.append(req) - return res - - def define_retention(self, tsuid, retention_days): - """Set retention days for the defined by ID timeseries. - - ########################################################## - NOTE: currently not working directly through the REST API. - that should be done directly on the HBase level. - ########################################################## - - :param tsuid: hexadecimal representation of the timeseries UID - :param retention_days: number of days of data points to retain for the - given timeseries. When set to 0, the default, - data is retained indefinitely. - """ - meta_data = {'tsuid': tsuid, 'retention': retention_days} - req = requests.post(utils.META_TEMPL % {'host': self.host, - 'port': self.port, - 'tsuid': tsuid}, - data=json.dumps(meta_data)) - return req - - def get_aggregators(self): - """Used to get the list of default aggregation functions.""" - req = requests.get(utils.AGGR_TEMPL % {'host': self.host, - 'port': self.port}) - return req - - def get_version(self): - """Used to check OpenTSDB version. - - That might be needed in case of unknown bugs - this code is written - only for the 2.x REST API version, so some of the failures might refer - to the wrong OpenTSDB version installed. - """ - req = requests.get(utils.VERSION_TEMPL % {'host': self.host, - 'port': self.port}) - return req - - def _make_query(self, query, verb): - meth = getattr(requests, verb.lower(), None) - if meth is None: - pass - req = meth(utils.QUERY_TEMPL % {'host': self.host, 'port': self.port, - 'query': query}) - return req - - def get_query(self, query): - return self._make_query(query, 'get') - - def process_response(self, resp): - try: - res = json.loads(resp.text) - except Exception: - raise opentsdbclient.OpenTSDBError(resp.text) - - if 'errors' in res: - raise opentsdbclient.OpenTSDBError(res['error']) - - return res +def get_client(hosts, protocol='rest', **kwargs): + if protocol == 'rest': + return rest_cl.RESTOpenTSDBClient(hosts, **kwargs) + elif protocol == 'socket': + return socket_cl.SocketOpenTSDBClient(hosts, **kwargs) + else: + raise opentsdbclient.OpenTSDBError('No %s protocol to communicate with' + 'OpenTSDB implemented.' % protocol) diff --git a/opentsdbclient/rest/__init__.py b/opentsdbclient/rest/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/opentsdbclient/rest/client.py b/opentsdbclient/rest/client.py new file mode 100644 index 0000000..b50e6c0 --- /dev/null +++ b/opentsdbclient/rest/client.py @@ -0,0 +1,113 @@ +# Copyright 2014: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json + +import requests + +import opentsdbclient +from opentsdbclient import base +from opentsdbclient.rest import utils + + +class RESTOpenTSDBClient(base.BaseOpenTSDBClient): + + def get_statistics(self): + """Get info about what metrics are registered and with what stats.""" + req = requests.get(utils.STATS_TEMPL % {'host': self.hosts[0][0], + 'port': self.hosts[0][1]}) + return req + + def put_meter(self, meters): + """Post new meter(s) to the database. + + Meter dictionary *should* contain the following four required fields: + - metric: the name of the metric you are storing + - timestamp: a Unix epoch style timestamp in seconds or milliseconds. + The timestamp must not contain non-numeric characters. + - value: the value to record for this data point. It may be quoted or + not quoted and must conform to the OpenTSDB value rules. + - tags: a map of tag name/tag value pairs. At least one pair must be + supplied. + """ + res = [] + meters = self._check_meters(meters) + for meter_dict in meters: + req = requests.post(utils.PUT_TEMPL % + {'host': self.hosts[0][0], + 'port': self.hosts[0][1]}, + data=json.dumps(meter_dict)) + res.append(req) + return res + + def define_retention(self, tsuid, retention_days): + """Set retention days for the defined by ID timeseries. + + ########################################################## + NOTE: currently not working directly through the REST API. + that should be done directly on the HBase level. + ########################################################## + + :param tsuid: hexadecimal representation of the timeseries UID + :param retention_days: number of days of data points to retain for the + given timeseries. When set to 0, the default, + data is retained indefinitely. + """ + meta_data = {'tsuid': tsuid, 'retention': retention_days} + req = requests.post(utils.META_TEMPL % {'host': self.hosts[0][0], + 'port': self.hosts[0][1], + 'tsuid': tsuid}, + data=json.dumps(meta_data)) + return req + + def get_aggregators(self): + """Used to get the list of default aggregation functions.""" + req = requests.get(utils.AGGR_TEMPL % {'host': self.hosts[0][0], + 'port': self.hosts[0][1]}) + return req + + def get_version(self): + """Used to check OpenTSDB version. + + That might be needed in case of unknown bugs - this code is written + only for the 2.x REST API version, so some of the failures might refer + to the wrong OpenTSDB version installed. + """ + req = requests.get(utils.VERSION_TEMPL % {'host': self.hosts[0][0], + 'port': self.hosts[0][1]}) + return req + + def _make_query(self, query, verb): + meth = getattr(requests, verb.lower(), None) + if meth is None: + pass + req = meth(utils.QUERY_TEMPL % {'host': self.hosts[0][0], + 'port': self.hosts[0][1], + 'query': query}) + return req + + def get_query(self, query): + return self._make_query(query, 'get') + + def process_response(self, resp): + try: + res = json.loads(resp.text) + except Exception: + raise opentsdbclient.OpenTSDBError(resp.text) + + if 'errors' in res: + raise opentsdbclient.OpenTSDBError(res['error']) + + return res diff --git a/opentsdbclient/utils.py b/opentsdbclient/rest/utils.py similarity index 100% rename from opentsdbclient/utils.py rename to opentsdbclient/rest/utils.py diff --git a/opentsdbclient/socket/__init__.py b/opentsdbclient/socket/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/opentsdbclient/socket/client.py b/opentsdbclient/socket/client.py new file mode 100644 index 0000000..d7ae068 --- /dev/null +++ b/opentsdbclient/socket/client.py @@ -0,0 +1,221 @@ +# Copyright 2014: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import itertools +import logging +import random +import socket +import time + +import six + +from opentsdbclient import base + +LOG = logging.getLogger('opentsdb_client') + + +class SocketOpenTSDBClient(base.BaseOpenTSDBClient): + + def __init__(self, opentsdb_hosts, conn_verify_trusted_time=60, + reconnect_interval=0, send_queue_max_size=1000, + max_uncaught_exceptions=100, wait_retry=False): + super(SocketOpenTSDBClient, self).__init__(opentsdb_hosts) + self.send_queue = [] + self.tsd = None + self.host = None + self.port = None + self.blacklisted_hosts = set() + self.current_tsd_host = -1 + self.last_verify = 0 + self.time_reconnect = 0 + self.conn_verify_trusted_time = conn_verify_trusted_time + self.reconnect_interval = reconnect_interval + self.send_queue_max_size = send_queue_max_size + self.max_uncaught_exceptions = max_uncaught_exceptions + self.wait_retry = wait_retry + + def blacklist_tsd_host(self): + """Marks the current TSD host we're trying to use as blacklisted.""" + LOG.info('Blacklisting %s:%s for a while', self.host, self.port) + self.blacklisted_hosts.add((self.host, self.port)) + + def verify_connection(self): + """Is used to check is socket connection is actually working.""" + if self.tsd is None: + return False + + # if last verifying was not so long ago, let's trust this connection + if self.last_verify > time.time() - self.conn_verify_trusted_time: + return True + + # if it's time to reconnect, let's close current one + if (self.reconnect_interval > 0 and + self.time_reconnect < time.time() - self.reconnect_interval): + try: + self.tsd.close() + except socket.error: + pass + self.time_reconnect = time.time() + return False + + LOG.debug('Verifying our TSD connection is alive') + try: + # this request is *really* light-weighted, good thing to check the + # connectivity + self.tsd.sendall('version\n') + except socket.error: + self.tsd = None + self.blacklist_tsd_host() + return False + + bufsize = 4096 + # read some ^^ data from socket connection to make sure it's *really* + # alive + try: + buf = self.tsd.recv(bufsize) + except socket.error: + self.tsd = None + self.blacklist_tsd_host() + return False + if not buf: + self.tsd = None + self.blacklist_tsd_host() + return False + + self.last_verify = time.time() + return True + + def pick_connection(self): + """Picks up a random host/port connection.""" + for self.current_tsd_host in range(self.current_tsd_host + 1, + len(self.hosts)): + host_port = self.hosts[self.current_tsd_host] + if host_port not in self.blacklisted_hosts: + break + else: + LOG.info('No more healthy hosts, ' + 'retry with previously blacklisted') + random.shuffle(self.hosts) + self.blacklisted_hosts.clear() + self.current_tsd_host = 0 + host_port = self.hosts[self.current_tsd_host] + + self.host, self.port = host_port + LOG.info('Selected connection: %s:%d', self.host, self.port) + + def maintain_connection(self): + while True: + if self.verify_connection(): + return + + # that's just a hack to sleep some time if OpenTSDB is somehow + # maintained at the moment + if self.wait_retry: + try_delay = random.randint(60, 360) + LOG.debug('SenderThread blocking %0.2f seconds', try_delay) + time.sleep(try_delay) + + # now actually try the connection + self.pick_connection() + + try: + addresses = socket.getaddrinfo(self.host, self.port, + socket.AF_UNSPEC, + socket.SOCK_STREAM, 0) + except socket.gaierror as e: + # Don't croak on transient DNS resolution issues. + if e[0] in (socket.EAI_AGAIN, socket.EAI_NONAME, + socket.EAI_NODATA): + LOG.debug('DNS resolution failure: %s: %s', self.host, e) + continue + raise + for family, socket_type, proto, canon_name, sock_addr in addresses: + try: + self.tsd = socket.socket(family, socket_type, proto) + self.tsd.settimeout(15) + self.tsd.connect(sock_addr) + # if we get here it connected + LOG.debug('Connection to %s was successful' + % (str(sock_addr))) + break + except socket.error as e: + LOG.warning('Connection attempt failed to %s:%d: %s', + self.host, self.port, e) + self.tsd.close() + self.tsd = None + if not self.tsd: + LOG.error('Failed to connect to %s:%d', self.host, self.port) + self.blacklist_tsd_host() + + def put_meter(self, meters): + """Post new meter(s) to the database. + + Meter dictionary *should* contain the following four required fields: + - metric: the name of the metric you are storing + - timestamp: a Unix epoch style timestamp in seconds or milliseconds. + The timestamp must not contain non-numeric characters. + - value: the value to record for this data point. It may be quoted or + not quoted and must conform to the OpenTSDB value rules. + - tags: a map of tag name/tag value pairs. At least one pair must be + supplied. + """ + + # put meter to the send_queue and check if it's time to send it to the + # OpenTSDB + meters = self._check_meters(meters) + self.send_queue = list(itertools.chain(self.send_queue, meters)) + + if len(self.send_queue) <= self.send_queue_max_size: + return + + self.maintain_connection() + + errors = 0 + try: + self.send_data() + except (ArithmeticError, EOFError, EnvironmentError, LookupError, + ValueError): + errors += 1 + if errors > self.max_uncaught_exceptions: + raise + LOG.exception('Uncaught exception while trying to send meters, ' + 'ignoring') + except Exception: + LOG.exception('Uncaught exception in while trying to send meters, ' + 'going to raise. Max number %s of uncaught errors ' + 'has been collected' % self.max_uncaught_exceptions) + raise + + def compose_line_from_meter(self, meter_dict): + tags = meter_dict.pop('tags') + tags = ''.join(' %s=%s' % (k, v) for k, v in six.iteritems(tags)) + line = '%(metric)s %(timestamp)d %(value)s' % meter_dict + return '%(metric)s%(tags)s' % {'metric': line, 'tags': tags} + + def send_data(self): + req = ''.join("put %s\n" % self.compose_line_from_meter(meter_dict) + for meter_dict in self.send_queue) + + try: + self.tsd.sendall(req) + self.send_queue = [] + except socket.error as e: + LOG.error('failed to send data: %s', e) + try: + self.tsd.close() + except socket.error: + pass + self.tsd = None + self.blacklist_tsd_host() diff --git a/opentsdbclient/tests/test_client.py b/opentsdbclient/tests/test_client.py index fe96882..e1e5b57 100644 --- a/opentsdbclient/tests/test_client.py +++ b/opentsdbclient/tests/test_client.py @@ -13,54 +13,16 @@ # License for the specific language governing permissions and limitations # under the License. -import json - -import mock -import requests - from opentsdbclient import client +from opentsdbclient.rest import client as rest_cl +from opentsdbclient.socket import client as socket_cl from opentsdbclient import tests class ClientTest(tests.BaseTestCase): - def setUp(self): - super(ClientTest, self).setUp() - self.client = client.OpenTSDBClient(opentsdb_host=self.host, - opentsdb_port=self.port) + def test_get_client(self): + r_cl = client.get_client((self.host, self.port), protocol='rest') + s_cl = client.get_client((self.host, self.port), protocol='socket') - @mock.patch.object(requests, 'get') - def test_get_statistics(self, get_mock): - self.client.get_statistics() - get_mock.assert_called_once_with('http://127.0.0.1:4242/api/stats') - - @mock.patch.object(requests, 'post') - def test_put_meter(self, post_mock): - put_dict = {'metric': 'bla', 'timestamp': '0', - 'value': 123, 'tags': {'some_tag': 'foo'}} - self.client.put_meter(put_dict) - post_mock.assert_called_once_with( - 'http://127.0.0.1:4242/api/put?details', data=json.dumps(put_dict)) - - @mock.patch.object(requests, 'post') - def test_define_retention(self, post_mock): - self.client.define_retention('foo', 12) - post_mock.assert_called_once_with( - 'http://127.0.0.1:4242/api/uid/tsmeta?tsuid=foo', - data='{"tsuid": "foo", "retention": 12}') - - @mock.patch.object(requests, 'get') - def test_get_aggregators(self, get_mock): - self.client.get_aggregators() - get_mock.assert_called_once_with( - 'http://127.0.0.1:4242/api/aggregators') - - @mock.patch.object(requests, 'get') - def test_get_version(self, get_mock): - self.client.get_version() - get_mock.assert_called_once_with('http://127.0.0.1:4242/api/version') - - @mock.patch.object(requests, 'get') - def test_get_query(self, get_mock): - self.client.get_query('start=0&end=12&m=max:2-min:bla') - get_mock.assert_called_once_with( - 'http://127.0.0.1:4242/api/query?start=0&end=12&m=max:2-min:bla') + self.assertIsInstance(r_cl, rest_cl.RESTOpenTSDBClient) + self.assertIsInstance(s_cl, socket_cl.SocketOpenTSDBClient) diff --git a/opentsdbclient/tests/test_rest_client.py b/opentsdbclient/tests/test_rest_client.py new file mode 100644 index 0000000..f7fb0b1 --- /dev/null +++ b/opentsdbclient/tests/test_rest_client.py @@ -0,0 +1,66 @@ +# Copyright 2014: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json + +import mock +import requests + +from opentsdbclient.rest import client +from opentsdbclient import tests + + +class RESTClientTest(tests.BaseTestCase): + def setUp(self): + super(RESTClientTest, self).setUp() + self.client = client.RESTOpenTSDBClient(opentsdb_hosts=[(self.host, + self.port)]) + + @mock.patch.object(requests, 'get') + def test_get_statistics(self, get_mock): + self.client.get_statistics() + get_mock.assert_called_once_with('http://127.0.0.1:4242/api/stats') + + @mock.patch.object(requests, 'post') + def test_put_meter(self, post_mock): + put_dict = {'metric': 'bla', 'timestamp': '0', + 'value': 123, 'tags': {'some_tag': 'foo'}} + self.client.put_meter(put_dict) + post_mock.assert_called_once_with( + 'http://127.0.0.1:4242/api/put?details', data=json.dumps(put_dict)) + + @mock.patch.object(requests, 'post') + def test_define_retention(self, post_mock): + self.client.define_retention('foo', 12) + post_mock.assert_called_once_with( + 'http://127.0.0.1:4242/api/uid/tsmeta?tsuid=foo', + data='{"tsuid": "foo", "retention": 12}') + + @mock.patch.object(requests, 'get') + def test_get_aggregators(self, get_mock): + self.client.get_aggregators() + get_mock.assert_called_once_with( + 'http://127.0.0.1:4242/api/aggregators') + + @mock.patch.object(requests, 'get') + def test_get_version(self, get_mock): + self.client.get_version() + get_mock.assert_called_once_with('http://127.0.0.1:4242/api/version') + + @mock.patch.object(requests, 'get') + def test_get_query(self, get_mock): + self.client.get_query('start=0&end=12&m=max:2-min:bla') + get_mock.assert_called_once_with( + 'http://127.0.0.1:4242/api/query?start=0&end=12&m=max:2-min:bla') diff --git a/opentsdbclient/tests/test_socket_client.py b/opentsdbclient/tests/test_socket_client.py new file mode 100644 index 0000000..9b02d67 --- /dev/null +++ b/opentsdbclient/tests/test_socket_client.py @@ -0,0 +1,84 @@ +# Copyright 2014: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import socket +import time + +import mock + +from opentsdbclient.socket import client +from opentsdbclient import tests + + +class SocketClientTest(tests.BaseTestCase): + def setUp(self): + super(SocketClientTest, self).setUp() + self.client = client.SocketOpenTSDBClient(opentsdb_hosts=[(self.host, + self.port)], + send_queue_max_size=1) + self.client.tsd = mock.MagicMock() + self.client.tsd.sendall = mock.MagicMock() + self.meters = [{'metric': 'bla1', 'timestamp': 12345, + 'value': 123, 'tags': {'some_tag': 'foo'}}, + {'metric': 'bla2', 'timestamp': 23456, + 'value': 123, 'tags': {'some_tag': 'foo'}}] + + def test_put_meter(self): + self.client.maintain_connection = mock.MagicMock() + + self.client.put_meter(self.meters) + self.client.tsd.sendall.assert_called_once_with( + 'put bla1 12345 123 some_tag=foo\n' + 'put bla2 23456 123 some_tag=foo\n') + + def test_send_data(self): + self.client.send_queue = self.meters + self.client.send_data() + self.client.tsd.sendall.assert_called_once_with( + 'put bla1 12345 123 some_tag=foo\n' + 'put bla2 23456 123 some_tag=foo\n') + + def test_compose_line_from_meter(self): + res = self.client.compose_line_from_meter(self.meters[0]) + self.assertEqual('bla1 12345 123 some_tag=foo', res) + + @mock.patch.object(socket, 'getaddrinfo') + @mock.patch.object(socket, 'socket') + def test_maintain_connection(self, sock_mock, sock_addr_mock): + self.client.verify_connection = mock.MagicMock() + pop_list = [True, False] + self.client.verify_connection.side_effect = lambda: pop_list.pop() + sock_addr_mock.side_effect = lambda a, b, c, d, e: [(1, 2, 3, 4, 5)] + tsd = mock.MagicMock() + tsd.connect = mock.MagicMock() + sock_mock.side_effect = tsd + self.client.maintain_connection() + sock_addr_mock.assert_called_once_with(self.host, self.port, + socket.AF_UNSPEC, + socket.SOCK_STREAM, 0) + sock_mock.assert_called_once_with(1, 2, 3) + self.client.tsd.connect.assert_called_once_with(5) + + def test_verify_connection_non_tsd(self): + self.client.tsd = None + self.assertFalse(self.client.verify_connection()) + + def test_verify_connection_tsd(self): + self.assertTrue(self.client.verify_connection()) + + def test_verify_connection_tsd_reconnect(self): + self.client.last_verify = time.time() - 3600 + self.client.reconnect_interval = 1 + self.assertFalse(self.client.verify_connection()) diff --git a/requirements.txt b/requirements.txt index c79006a..578c2b5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ pbr>=0.6,!=0.7,<1.0 requests>=1.1 +six>=1.7.0