monasca-agent/tests_to_fix/test_redis.py

176 lines
5.8 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Redis check tests.
"""
import logging
import os
import unittest
import subprocess
import time
import pprint
import redis
from tests.common import load_check
from nose.plugins.skip import SkipTest
# Root logger (getLogger() is called without a name); used by TestRedis.is_travis.
logger = logging.getLogger()
# Maximum number of non-matching lines TestRedis.wait4 reads before giving up.
MAX_WAIT = 20
# Port used by test_redis_default when not running on Travis.
NOAUTH_PORT = 16379
# Port used by every instance in test_redis_auth.
AUTH_PORT = 26379
# Port used by test_redis_default when running on Travis.
DEFAULT_PORT = 6379
# Fraction of the expected gauge keys that may be missing from the collected
# metrics: the number of missing keys must be strictly below this fraction.
MISSING_KEY_TOLERANCE = 0.5
class TestRedis(unittest.TestCase):
def is_travis(self):
global logger
logger.info("Running on travis-ci")
return "TRAVIS" in os.environ
def wait4(self, p, pattern):
"""Waits until a specific pattern shows up in the stdout
"""
out = p.stdout
loop = 0
while True:
l = out.readline()
if l.find(pattern) > -1:
break
else:
time.sleep(0.1)
loop += 1
if loop >= MAX_WAIT:
break
def setUp(self):
raise SkipTest("Requires Redis installed")
if not self.is_travis():
self.redis_noauth = subprocess.Popen(
["redis-server", "tests/redisnoauth.cfg"], stdout=subprocess.PIPE)
self.wait4(self.redis_noauth, "The server is now ready to accept connections")
self.redis_auth = subprocess.Popen(
["redis-server", "tests/redisauth.cfg"], stdout=subprocess.PIPE)
self.wait4(self.redis_auth, "The server is now ready to accept connections")
def tearDown(self):
if not self.is_travis():
self.redis_noauth.terminate()
self.redis_auth.terminate()
def test_redis_auth(self):
# Test connection with password
if not self.is_travis():
# correct password
r = load_check('redisdb', {}, {})
instance = {
'host': 'localhost',
'port': AUTH_PORT,
'password': 'datadog-is-devops-best-friend'
}
r.check(instance)
metrics = self._sort_metrics(r.get_metrics())
assert len(metrics) > 0, "No metrics returned"
# wrong passwords
instances = [
{
'host': 'localhost',
'port': AUTH_PORT,
'password': ''
},
{
'host': 'localhost',
'port': AUTH_PORT,
'password': 'badpassword'
}
]
for instance in instances:
r = load_check('redisdb', {}, {})
r.check(instance)
metrics = self._sort_metrics(r.get_metrics())
assert len(
metrics) == 0, "Should have failed with bad password; got %s instead" % metrics
def test_redis_default(self):
# Base test, uses the noauth instance
if self.is_travis():
port = DEFAULT_PORT
else:
port = NOAUTH_PORT
instance = {
'host': 'localhost',
'port': port
}
db = redis.Redis(port=port, db=14) # Datadog's test db
db.flushdb()
db.set("key1", "value")
db.set("key2", "value")
db.setex("expirekey", "expirevalue", 1000)
r = load_check('redisdb', {}, {})
r.check(instance)
metrics = self._sort_metrics(r.get_metrics())
assert metrics, "No metrics returned"
# Assert we have values, timestamps and dimensions for each metric.
for m in metrics:
assert isinstance(m[1], int) # timestamp
assert isinstance(m[2], (int, float, long)) # value
dimensions = m[3]["dimensions"]
expected_dimensions = {"redis_host": "localhost", "redis_port": port}
for e in expected_dimensions:
assert e in dimensions
def assert_key_present(expected, present, tolerance):
"Assert we have the rest of the keys (with some tolerance for missing keys)"
e = set(expected)
p = set(present)
assert len(e - p) < tolerance * len(e), pprint.pformat((p, e - p))
# gauges collected?
remaining_keys = [m[0] for m in metrics]
expected = r.GAUGE_KEYS.values()
assert_key_present(expected, remaining_keys, MISSING_KEY_TOLERANCE)
# Assert that the keys metrics are tagged by db. just check db0, since
# it's the only one we can guarantee is there.
db_metrics = self._sort_metrics(
[m for m in metrics if m[0] in ['redis.keys', 'redis.expires'] and "redis_db:db14" in m[3]["dimensions"]])
self.assertEqual(2, len(db_metrics))
self.assertEqual('redis.expires', db_metrics[0][0])
self.assertEqual(1, db_metrics[0][2])
self.assertEqual('redis.keys', db_metrics[1][0])
self.assertEqual(3, db_metrics[1][2])
# Run one more check and ensure we get total command count
# and other rates
time.sleep(5)
r.check(instance)
metrics = self._sort_metrics(r.get_metrics())
keys = [m[0] for m in metrics]
assert 'redis.net.commands' in keys
def _sort_metrics(self, metrics):
def sort_by(m):
return m[0], m[1], m[3]
return sorted(metrics, key=sort_by)
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()