afsmon/afsmon/tests/base.py

151 lines
5.2 KiB
Python

# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import logging
import fixtures
import select
import socket
import testtools
import threading
import time
_TRUE_VALUES = ('True', 'true', '1', 'yes')
logger = logging.getLogger("afsmon.tests.base")
class FakeStatsd(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
self.sock.bind(('', 0))
self.port = self.sock.getsockname()[1]
self.wake_read, self.wake_write = os.pipe()
self.stats = []
def run(self):
while True:
poll = select.poll()
poll.register(self.sock, select.POLLIN)
poll.register(self.wake_read, select.POLLIN)
ret = poll.poll()
for (fd, event) in ret:
if fd == self.sock.fileno():
data = self.sock.recvfrom(1024)
if not data:
return
self.stats.append(data[0])
if fd == self.wake_read:
return
def stop(self):
os.write(self.wake_write, b'1\n')
class TestCase(testtools.TestCase):
"""Test case base class for all unit tests."""
def setUp(self):
"""Run before each test method to initialize test environment."""
super(TestCase, self).setUp()
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
try:
test_timeout = int(test_timeout)
except ValueError:
# If timeout value is invalid do not set a timeout.
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
self.useFixture(fixtures.NestedTempfile())
self.useFixture(fixtures.TempHomeDir())
if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.log_fixture = self.useFixture(
fixtures.FakeLogger(level=logging.DEBUG))
self.statsd = FakeStatsd()
os.environ['STATSD_HOST'] = '127.0.0.1'
os.environ['STATSD_PORT'] = str(self.statsd.port)
self.statsd.start()
def shutdown(self):
self.statsd.stop()
self.statsd.join()
def assertReportedStat(self, key, value=None, kind=None):
"""Check statsd output
Check statsd return values. A ``value`` should specify a
``kind``, however a ``kind`` may be specified without a
``value`` for a generic match. Leave both empy to just check
for key presence.
:arg str key: The statsd key
:arg str value: The expected value of the metric ``key``
:arg str kind: The expected type of the metric ``key`` For example
- ``c`` counter
- ``g`` gauge
- ``ms`` timing
- ``s`` set
"""
if value:
self.assertNotEqual(kind, None)
start = time.time()
while time.time() < (start + 5):
# Note our fake statsd just queues up results in a queue.
# We just keep going through them until we find one that
# matches, or fail out.
for stat in self.statsd.stats:
k, v = stat.decode('utf-8').split(':')
if key == k:
if kind is None:
# key with no qualifiers is found
return True
s_value, s_kind = v.split('|')
# if no kind match, look for other keys
if kind != s_kind:
continue
if value:
# special-case value|ms because statsd can turn
# timing results into float of indeterminate
# length, hence foiling string matching.
if kind == 'ms':
if float(value) == float(s_value):
return True
if value == s_value:
return True
# otherwise keep looking for other matches
continue
# this key matches
return True
time.sleep(0.1)
raise Exception("Key %s not found in reported stats" % key)