clean up tests and add overriden time method to utils

This commit is contained in:
root 2010-12-15 11:23:33 -08:00
parent b027903012
commit dd4ee43cc2
4 changed files with 60 additions and 27 deletions

View File

@ -79,15 +79,14 @@ class Lockout(wsgi.Middleware):
sneak in before the lockout hits, but this is extremely rare and would
only result in a couple of extra failed attempts."""
def __init__(self, application, time_fn=None):
"""middleware can pass a custom time function to fake for testing."""
def __init__(self, application):
"""middleware can use fake for testing."""
if FLAGS.lockout_memcached_servers:
import memcache
self.mc = memcache.Client(FLAGS.lockout_memcached_servers,
debug=0)
else:
from nova import fakememcache
self.mc = fakememcache.Client(time_fn=time_fn)
from nova import fakememcache as memcache
self.mc = memcache.Client(FLAGS.lockout_memcached_servers,
debug=0)
super(Lockout, self).__init__(application)
@webob.dec.wsgify

View File

@ -18,21 +18,20 @@
"""Super simple fake memcache client."""
import time
import utils
class Client(object):
"""Replicates a tiny subset of memcached client interface."""
def __init__(self, time_fn=time.time, *args, **kwargs):
"""Time fn is to allow testing through a custom function"""
self.time_fn = time_fn
def __init__(self, *args, **kwargs):
"""Ignores the passed in args"""
self.cache = {}
def get(self, key):
"""Retrieves the value for a key or None."""
(timeout, value) = self.cache.get(key, (0, None))
if timeout == 0 or self.time_fn() < timeout:
if timeout == 0 or utils.utcnow_ts() < timeout:
return value
return None
@ -40,7 +39,7 @@ class Client(object):
"""Sets the value for a key."""
timeout = 0
if time != 0:
timeout = self.time_fn() + time
timeout = utils.utcnow_ts() + time
self.cache[key] = (timeout, value)
return True

View File

@ -16,6 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import webob
import webob.dec
import webob.exc
@ -23,6 +24,7 @@ import webob.exc
from nova.api import ec2
from nova import flags
from nova import test
from nova import utils
FLAGS = flags.FLAGS
@ -39,14 +41,13 @@ def conditional_forbid(req):
class LockoutTestCase(test.TrialTestCase):
"""Test case for the Lockout middleware."""
def setUp(self): # pylint: disable-msg=C0103
self.local_time = 0
self.lockout = ec2.Lockout(conditional_forbid,
time_fn=self._constant_time)
super(LockoutTestCase, self).setUp()
utils.set_time_override()
self.lockout = ec2.Lockout(conditional_forbid)
def _constant_time(self):
"""Helper method to force timeouts."""
return self.local_time
def tearDown(self): # pylint: disable-msg=C0103
utils.clear_time_override()
super(LockoutTestCase, self).tearDown()
def _send_bad_attempts(self, access_key, num_attempts=1):
"""Fail x."""
@ -59,10 +60,6 @@ class LockoutTestCase(test.TrialTestCase):
req = webob.Request.blank('/?AWSAccessKeyId=%s' % access_key)
return (req.get_response(self.lockout).status_int == 403)
def _advance_time(self, time):
"""Increment time to 1 second past the lockout."""
self.local_time = self.local_time + time
def test_lockout(self):
self._send_bad_attempts('test', FLAGS.lockout_attempts)
self.assertTrue(self._is_locked_out('test'))
@ -70,20 +67,20 @@ class LockoutTestCase(test.TrialTestCase):
def test_timeout(self):
self._send_bad_attempts('test', FLAGS.lockout_attempts)
self.assertTrue(self._is_locked_out('test'))
self._advance_time(FLAGS.lockout_minutes * 60)
utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
self.assertFalse(self._is_locked_out('test'))
def test_multiple_keys(self):
self._send_bad_attempts('test1', FLAGS.lockout_attempts)
self.assertTrue(self._is_locked_out('test1'))
self.assertFalse(self._is_locked_out('test2'))
self._advance_time(FLAGS.lockout_minutes * 60)
utils.advance_time_seconds(FLAGS.lockout_minutes * 60)
self.assertFalse(self._is_locked_out('test1'))
self.assertFalse(self._is_locked_out('test2'))
def test_window_timeout(self):
self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
self.assertFalse(self._is_locked_out('test'))
self._advance_time(FLAGS.lockout_window * 60)
utils.advance_time_seconds(FLAGS.lockout_window * 60)
self._send_bad_attempts('test', FLAGS.lockout_attempts - 1)
self.assertFalse(self._is_locked_out('test'))

View File

@ -21,7 +21,6 @@ System-level utilities and helper functions.
"""
import datetime
import functools
import inspect
import logging
import os
@ -29,6 +28,7 @@ import random
import subprocess
import socket
import sys
import time
from xml.sax import saxutils
from twisted.internet.threads import deferToThread
@ -165,13 +165,51 @@ def get_my_ip():
return "127.0.0.1"
def utcnow():
"""Overridable version of datetime.datetime.utcnow."""
if utcnow.override_time:
return utcnow.override_time
return datetime.datetime.utcnow()
utcnow.override_time = None
def utcnow_ts():
"""Timestamp version of our utcnow function."""
return time.mktime(utcnow().timetuple())
def set_time_override(override_time=datetime.datetime.utcnow()):
"""Override utils.utcnow to return a constant time."""
utcnow.override_time = override_time
def advance_time_delta(timedelta):
"""Advance overriden time using a datetime.timedelta."""
assert(not utcnow.override_time is None)
utcnow.override_time += timedelta
def advance_time_seconds(seconds):
"""Advance overriden time by seconds."""
advance_time_delta(datetime.timedelta(0, seconds))
def clear_time_override():
"""Remove the overridden time."""
utcnow.override_time = None
def isotime(at=None):
"""Returns iso formatted utcnow."""
if not at:
at = datetime.datetime.utcnow()
at = utcnow()
return at.strftime(TIME_FORMAT)
def parse_isotime(timestr):
"""Turn an iso formatted time back into a datetime"""
return datetime.datetime.strptime(timestr, TIME_FORMAT)