Added unit test cases for server.py

Added unit test cases to cover all code paths in REPLICATE and __call__
functions in account/server.py and container/server.py

Change-Id: Ia335e9a6668821d3e34b12fc3a133a707880e87f
This commit is contained in:
Sivasathurappan Radhakrishnan 2015-11-06 01:49:58 +00:00
parent e1910dff17
commit ce173e9ed2
2 changed files with 188 additions and 6 deletions

View File

@ -30,12 +30,13 @@ from six import StringIO
import xml.dom.minidom
from swift import __version__ as swift_version
from swift.common.swob import Request
from swift.common.swob import (Request, WsgiBytesIO, HTTPNoContent)
from swift.common import constraints
from swift.account.server import AccountController
from swift.common.utils import normalize_timestamp, replication, public
from swift.common.utils import (normalize_timestamp, replication, public,
mkdirs, storage_directory)
from swift.common.request_helpers import get_sys_meta_prefix
from test.unit import patch_policies
from test.unit import patch_policies, debug_logger
from swift.common.storage_policy import StoragePolicy, POLICIES
@ -155,6 +156,49 @@ class TestAccountController(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 507)
def test_REPLICATE_insufficient_storage(self):
conf = {'devices': self.testdir, 'mount_check': 'true'}
self.account_controller = AccountController(conf)
def fake_check_mount(*args, **kwargs):
return False
with mock.patch("swift.common.constraints.check_mount",
fake_check_mount):
req = Request.blank('/sda1/p/suff',
environ={'REQUEST_METHOD': 'REPLICATE'},
headers={})
resp = req.get_response(self.account_controller)
self.assertEqual(resp.status_int, 507)
def test_REPLICATE_works(self):
mkdirs(os.path.join(self.testdir, 'sda1', 'account', 'p', 'a', 'a'))
db_file = os.path.join(self.testdir, 'sda1',
storage_directory('account', 'p', 'a'),
'a' + '.db')
open(db_file, 'w')
def fake_rsync_then_merge(self, drive, db_file, args):
return HTTPNoContent()
with mock.patch("swift.common.db_replicator.ReplicatorRpc."
"rsync_then_merge", fake_rsync_then_merge):
req = Request.blank('/sda1/p/a/',
environ={'REQUEST_METHOD': 'REPLICATE'},
headers={})
json_string = '["rsync_then_merge", "a.db"]'
inbuf = WsgiBytesIO(json_string)
req.environ['wsgi.input'] = inbuf
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
# check valuerror
wsgi_input_valuerror = '["sync" : sync, "-1"]'
inbuf1 = WsgiBytesIO(wsgi_input_valuerror)
req.environ['wsgi.input'] = inbuf1
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 400)
def test_HEAD_not_found(self):
# Test the case in which account does not exist (can be recreated)
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})
@ -1690,6 +1734,52 @@ class TestAccountController(unittest.TestCase):
self.assertEqual(errbuf.getvalue(), '')
self.assertEqual(outbuf.getvalue()[:4], '405 ')
def test__call__raise_timeout(self):
inbuf = WsgiBytesIO()
errbuf = StringIO()
outbuf = StringIO()
self.logger = debug_logger('test')
self.account_controller = AccountController(
{'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'false', 'log_requests': 'false'},
logger=self.logger)
def start_response(*args):
# Sends args to outbuf
outbuf.writelines(args)
method = 'PUT'
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
'SERVER_NAME': '127.0.0.1',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'CONTENT_LENGTH': '0',
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': inbuf,
'wsgi.errors': errbuf,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False}
@public
def mock_put_method(*args, **kwargs):
raise Exception()
with mock.patch.object(self.account_controller, method,
new=mock_put_method):
response = self.account_controller.__call__(env, start_response)
self.assertTrue(response[0].startswith(
'Traceback (most recent call last):'))
self.assertEqual(self.logger.get_lines_for_level('error'), [
'ERROR __call__ error with %(method)s %(path)s : ' % {
'method': 'PUT', 'path': '/sda1/p/a/c'},
])
self.assertEqual(self.logger.get_lines_for_level('info'), [])
def test_GET_log_requests_true(self):
self.controller.logger = FakeLogger()
self.controller.log_requests = True

View File

@ -34,13 +34,15 @@ from six import BytesIO
from six import StringIO
from swift import __version__ as swift_version
from swift.common.swob import Request, HeaderKeyDict
from swift.common.swob import (Request, HeaderKeyDict,
WsgiBytesIO, HTTPNoContent)
import swift.container
from swift.container import server as container_server
from swift.common import constraints
from swift.common.utils import (Timestamp, mkdirs, public, replication,
lock_parent_directory, json)
from test.unit import fake_http_connect
storage_directory, lock_parent_directory,
json)
from test.unit import fake_http_connect, debug_logger
from swift.common.storage_policy import (POLICIES, StoragePolicy)
from swift.common.request_helpers import get_sys_meta_prefix
@ -1152,6 +1154,50 @@ class TestContainerController(unittest.TestCase):
self.assertEqual(info['x_container_sync_point1'], -1)
self.assertEqual(info['x_container_sync_point2'], -1)
def test_REPLICATE_insufficient_storage(self):
conf = {'devices': self.testdir, 'mount_check': 'true'}
self.container_controller = container_server.ContainerController(
conf)
def fake_check_mount(*args, **kwargs):
return False
with mock.patch("swift.common.constraints.check_mount",
fake_check_mount):
req = Request.blank('/sda1/p/suff',
environ={'REQUEST_METHOD': 'REPLICATE'},
headers={})
resp = req.get_response(self.container_controller)
self.assertEqual(resp.status_int, 507)
def test_REPLICATE_works(self):
mkdirs(os.path.join(self.testdir, 'sda1', 'containers', 'p', 'a', 'a'))
db_file = os.path.join(self.testdir, 'sda1',
storage_directory('containers', 'p', 'a'),
'a' + '.db')
open(db_file, 'w')
def fake_rsync_then_merge(self, drive, db_file, args):
return HTTPNoContent()
with mock.patch("swift.container.replicator.ContainerReplicatorRpc."
"rsync_then_merge", fake_rsync_then_merge):
req = Request.blank('/sda1/p/a/',
environ={'REQUEST_METHOD': 'REPLICATE'},
headers={})
json_string = '["rsync_then_merge", "a.db"]'
inbuf = WsgiBytesIO(json_string)
req.environ['wsgi.input'] = inbuf
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
# check valuerror
wsgi_input_valuerror = '["sync" : sync, "-1"]'
inbuf1 = WsgiBytesIO(wsgi_input_valuerror)
req.environ['wsgi.input'] = inbuf1
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 400)
def test_DELETE(self):
req = Request.blank(
'/sda1/p/a/c',
@ -2586,6 +2632,52 @@ class TestContainerController(unittest.TestCase):
self.assertEqual(errbuf.getvalue(), '')
self.assertEqual(outbuf.getvalue()[:4], '405 ')
def test__call__raise_timeout(self):
inbuf = WsgiBytesIO()
errbuf = StringIO()
outbuf = StringIO()
self.logger = debug_logger('test')
self.container_controller = container_server.ContainerController(
{'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'false', 'log_requests': 'false'},
logger=self.logger)
def start_response(*args):
# Sends args to outbuf
outbuf.writelines(args)
method = 'PUT'
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
'SERVER_NAME': '127.0.0.1',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'CONTENT_LENGTH': '0',
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': inbuf,
'wsgi.errors': errbuf,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False}
@public
def mock_put_method(*args, **kwargs):
raise Exception()
with mock.patch.object(self.container_controller, method,
new=mock_put_method):
response = self.container_controller.__call__(env, start_response)
self.assertTrue(response[0].startswith(
'Traceback (most recent call last):'))
self.assertEqual(self.logger.get_lines_for_level('error'), [
'ERROR __call__ error with %(method)s %(path)s : ' % {
'method': 'PUT', 'path': '/sda1/p/a/c'},
])
self.assertEqual(self.logger.get_lines_for_level('info'), [])
def test_GET_log_requests_true(self):
self.controller.logger = FakeLogger()
self.controller.log_requests = True