Adding unit tests for auth.py

implements blueprint https://blueprints.launchpad.net/python-reddwarfclient/+spec/reddwarfclient-unit-tests

Also changin the unit test library from unuttest2 to testtools per request of CI

Change-Id: I1b50cd4bf762c90b846ecb51db47f2afbfb6f175
This commit is contained in:
Deniz Demir 2012-12-11 10:00:17 -08:00 committed by denizdemir
parent 8c13466950
commit 4fb6634480
2 changed files with 416 additions and 1 deletions

414
tests/test_auth.py Normal file
View File

@ -0,0 +1,414 @@
import contextlib
from testtools import TestCase
from reddwarfclient import auth
from mock import Mock
from reddwarfclient import exceptions
"""
Unit tests for the classes and functions in auth.py.
"""
def check_url_none(test_case, auth_class):
# url is None, it must throw exception
authObj = auth_class(url=None, type=auth_class, client=None,
username=None, password=None, tenant=None)
try:
authObj.authenticate()
test_case.fail("AuthUrlNotGiven exception expected")
except exceptions.AuthUrlNotGiven:
pass
class AuthenticatorTest(TestCase):
def setUp(self):
super(AuthenticatorTest, self).setUp()
self.orig_load = auth.ServiceCatalog._load
self.orig__init = auth.ServiceCatalog.__init__
def tearDown(self):
super(AuthenticatorTest, self).tearDown()
auth.ServiceCatalog._load = self.orig_load
auth.ServiceCatalog.__init__ = self.orig__init
def test_get_authenticator_cls(self):
class_list = (auth.KeyStoneV2Authenticator,
auth.RaxAuthenticator,
auth.Auth1_1,
auth.FakeAuth)
for c in class_list:
self.assertEqual(c, auth.get_authenticator_cls(c))
class_names = {"keystone": auth.KeyStoneV2Authenticator,
"rax": auth.RaxAuthenticator,
"auth1.1": auth.Auth1_1,
"fake": auth.FakeAuth}
for cn in class_names.keys():
self.assertEqual(class_names[cn], auth.get_authenticator_cls(cn))
cls_or_name = "_unknown_"
self.assertRaises(ValueError, auth.get_authenticator_cls, cls_or_name)
def test__authenticate(self):
authObj = auth.Authenticator(Mock(), auth.KeyStoneV2Authenticator,
Mock(), Mock(), Mock(), Mock())
# test response code 200
resp = Mock()
resp.status = 200
body = "test_body"
auth.ServiceCatalog._load = Mock(return_value=1)
authObj.client._time_request = Mock(return_value=(resp, body))
sc = authObj._authenticate(Mock(), Mock())
self.assertEqual(body, sc.catalog)
# test AmbiguousEndpoints exception
auth.ServiceCatalog.__init__ = \
Mock(side_effect=exceptions.AmbiguousEndpoints)
self.assertRaises(exceptions.AmbiguousEndpoints,
authObj._authenticate, Mock(), Mock())
# test handling KeyError and raising AuthorizationFailure exception
auth.ServiceCatalog.__init__ = Mock(side_effect=KeyError)
self.assertRaises(exceptions.AuthorizationFailure,
authObj._authenticate, Mock(), Mock())
# test EndpointNotFound exception
mock = Mock(side_effect=exceptions.EndpointNotFound)
auth.ServiceCatalog.__init__ = mock
self.assertRaises(exceptions.EndpointNotFound,
authObj._authenticate, Mock(), Mock())
mock.side_effect = None
# test response code 305
resp.__getitem__ = Mock(return_value='loc')
resp.status = 305
body = "test_body"
authObj.client._time_request = Mock(return_value=(resp, body))
l = authObj._authenticate(Mock(), Mock())
self.assertEqual('loc', l)
# test any response code other than 200 and 305
resp.status = 404
exceptions.from_response = Mock(side_effect=ValueError)
self.assertRaises(ValueError, authObj._authenticate, Mock(), Mock())
def test_authenticate(self):
authObj = auth.Authenticator(Mock(), auth.KeyStoneV2Authenticator,
Mock(), Mock(), Mock(), Mock())
self.assertRaises(NotImplementedError, authObj.authenticate)
class KeyStoneV2AuthenticatorTest(TestCase):
def test_authenticate(self):
# url is None
check_url_none(self, auth.KeyStoneV2Authenticator)
# url is not None, so it must not throw exception
url = "test_url"
cls_type = auth.KeyStoneV2Authenticator
authObj = auth.KeyStoneV2Authenticator(url=url, type=cls_type,
client=None, username=None,
password=None, tenant=None)
def side_effect_func(url):
return url
mock = Mock()
mock.side_effect = side_effect_func
authObj._v2_auth = mock
r = authObj.authenticate()
self.assertEqual(url, r)
def test__v2_auth(self):
username = "reddwarf_user"
password = "reddwarf_password"
tenant = "tenant"
cls_type = auth.KeyStoneV2Authenticator
authObj = auth.KeyStoneV2Authenticator(url=None, type=cls_type,
client=None,
username=username,
password=password,
tenant=tenant)
def side_effect_func(url, body):
return body
mock = Mock()
mock.side_effect = side_effect_func
authObj._authenticate = mock
body = authObj._v2_auth(Mock())
self.assertEqual(username,
body['auth']['passwordCredentials']['username'])
self.assertEqual(password,
body['auth']['passwordCredentials']['password'])
self.assertEqual(tenant, body['auth']['tenantName'])
class Auth1_1Test(TestCase):
def test_authenticate(self):
# handle when url is None
check_url_none(self, auth.Auth1_1)
# url is not none
username = "reddwarf_user"
password = "reddwarf_password"
url = "test_url"
authObj = auth.Auth1_1(url=url,
type=auth.Auth1_1,
client=None, username=username,
password=password, tenant=None)
def side_effect_func(auth_url, body, root_key):
return auth_url, body, root_key
mock = Mock()
mock.side_effect = side_effect_func
authObj._authenticate = mock
auth_url, body, root_key = authObj.authenticate()
self.assertEqual(username, body['credentials']['username'])
self.assertEqual(password, body['credentials']['key'])
self.assertEqual(auth_url, url)
self.assertEqual('auth', root_key)
class RaxAuthenticatorTest(TestCase):
def test_authenticate(self):
# url is None
check_url_none(self, auth.RaxAuthenticator)
# url is not None, so it must not throw exception
url = "test_url"
authObj = auth.RaxAuthenticator(url=url,
type=auth.RaxAuthenticator,
client=None, username=None,
password=None, tenant=None)
def side_effect_func(url):
return url
mock = Mock()
mock.side_effect = side_effect_func
authObj._rax_auth = mock
r = authObj.authenticate()
self.assertEqual(url, r)
def test__rax_auth(self):
username = "reddwarf_user"
password = "reddwarf_password"
tenant = "tenant"
authObj = auth.RaxAuthenticator(url=None,
type=auth.RaxAuthenticator,
client=None, username=username,
password=password, tenant=tenant)
def side_effect_func(url, body):
return body
mock = Mock()
mock.side_effect = side_effect_func
authObj._authenticate = mock
body = authObj._rax_auth(Mock())
v = body['auth']['RAX-KSKEY:apiKeyCredentials']['username']
self.assertEqual(username, v)
v = body['auth']['RAX-KSKEY:apiKeyCredentials']['apiKey']
self.assertEqual(password, v)
v = body['auth']['RAX-KSKEY:apiKeyCredentials']['tenantName']
self.assertEqual(tenant, v)
class FakeAuthTest(TestCase):
def test_authenticate(self):
tenant = "tenant"
authObj = auth.FakeAuth(url=None,
type=auth.FakeAuth,
client=None, username=None,
password=None, tenant=tenant)
fc = authObj.authenticate()
public_url = "%s/%s" % ('http://localhost:8779/v1.0', tenant)
self.assertEqual(public_url, fc.get_public_url())
self.assertEqual(tenant, fc.get_token())
class ServiceCatalogTest(TestCase):
def setUp(self):
super(ServiceCatalogTest, self).setUp()
self.orig_url_for = auth.ServiceCatalog._url_for
self.orig__init__ = auth.ServiceCatalog.__init__
auth.ServiceCatalog.__init__ = Mock(return_value=None)
self.test_url = "http://localhost:1234/test"
def tearDown(self):
super(ServiceCatalogTest, self).tearDown()
auth.ServiceCatalog._url_for = self.orig_url_for
auth.ServiceCatalog.__init__ = self.orig__init__
def test__load(self):
url = "random_url"
auth.ServiceCatalog._url_for = Mock(return_value=url)
# when service_url is None
scObj = auth.ServiceCatalog()
scObj.region = None
scObj.service_url = None
scObj._load()
self.assertEqual(url, scObj.public_url)
self.assertEqual(url, scObj.management_url)
# service url is not None
service_url = "service_url"
scObj = auth.ServiceCatalog()
scObj.region = None
scObj.service_url = service_url
scObj._load()
self.assertEqual(service_url, scObj.public_url)
self.assertEqual(service_url, scObj.management_url)
def test_get_token(self):
test_id = "test_id"
scObj = auth.ServiceCatalog()
scObj.root_key = "root_key"
scObj.catalog = dict()
scObj.catalog[scObj.root_key] = dict()
scObj.catalog[scObj.root_key]['token'] = dict()
scObj.catalog[scObj.root_key]['token']['id'] = test_id
self.assertEqual(test_id, scObj.get_token())
def test_get_management_url(self):
test_mng_url = "test_management_url"
scObj = auth.ServiceCatalog()
scObj.management_url = test_mng_url
self.assertEqual(test_mng_url, scObj.get_management_url())
def test_get_public_url(self):
test_public_url = "test_public_url"
scObj = auth.ServiceCatalog()
scObj.public_url = test_public_url
self.assertEqual(test_public_url, scObj.get_public_url())
def test__url_for(self):
scObj = auth.ServiceCatalog()
# case for no endpoint found
self.case_no_endpoint_match(scObj)
# case for empty service catalog
self.case_endpoing_with_empty_catalog(scObj)
# more than one matching endpoints
self.case_ambiguous_endpoint(scObj)
# happy case
self.case_unique_endpoint(scObj)
# testing if-statements in for-loop to iterate services in catalog
self.case_iterating_services_in_catalog(scObj)
def case_no_endpoint_match(self, scObj):
# empty endpoint list
scObj.catalog = dict()
scObj.catalog['endpoints'] = list()
self.assertRaises(exceptions.EndpointNotFound, scObj._url_for)
def side_effect_func_ep(attr):
return "test_attr_value"
# simulating dict
endpoint = Mock()
mock = Mock()
mock.side_effect = side_effect_func_ep
endpoint.__getitem__ = mock
scObj.catalog['endpoints'].append(endpoint)
# not-empty list but not matching endpoint
filter_value = "not_matching_value"
self.assertRaises(exceptions.EndpointNotFound, scObj._url_for,
attr="test_attr", filter_value=filter_value)
filter_value = "test_attr_value" # so that we have an endpoint match
scObj.root_key = "access"
scObj.catalog[scObj.root_key] = dict()
self.assertRaises(exceptions.EndpointNotFound, scObj._url_for,
attr="test_attr", filter_value=filter_value)
def case_endpoing_with_empty_catalog(self, scObj):
# first, test with empty catalog, this should pass since
# there is already enpoint added
scObj.catalog[scObj.root_key]['serviceCatalog'] = list()
endpoint = scObj.catalog['endpoints'][0]
endpoint.get = Mock(return_value=self.test_url)
r_url = scObj._url_for(attr="test_attr",
filter_value="test_attr_value")
self.assertEqual(self.test_url, r_url)
def case_ambiguous_endpoint(self, scObj):
scObj.service_type = "reddwarf"
scObj.service_name = "test_service_name"
def side_effect_func_service(key):
if key == "type":
return "reddwarf"
elif key == "name":
return "test_service_name"
return None
mock1 = Mock()
mock1.side_effect = side_effect_func_service
service1 = Mock()
service1.get = mock1
endpoint2 = {"test_attr": "test_attr_value"}
service1.__getitem__ = Mock(return_value=[endpoint2])
scObj.catalog[scObj.root_key]['serviceCatalog'] = [service1]
self.assertRaises(exceptions.AmbiguousEndpoints, scObj._url_for,
attr="test_attr", filter_value="test_attr_value")
def case_unique_endpoint(self, scObj):
# changing the endpoint2 attribute to pass the filter
service1 = scObj.catalog[scObj.root_key]['serviceCatalog'][0]
endpoint2 = service1[0][0]
endpoint2["test_attr"] = "new value not matching filter"
r_url = scObj._url_for(attr="test_attr",
filter_value="test_attr_value")
self.assertEqual(self.test_url, r_url)
def case_iterating_services_in_catalog(self, scObj):
service1 = scObj.catalog[scObj.root_key]['serviceCatalog'][0]
scObj.catalog = dict()
scObj.root_key = "access"
scObj.catalog[scObj.root_key] = dict()
scObj.service_type = "no_match"
scObj.catalog[scObj.root_key]['serviceCatalog'] = [service1]
self.assertRaises(exceptions.EndpointNotFound, scObj._url_for)
scObj.service_type = "reddwarf"
scObj.service_name = "no_match"
self.assertRaises(exceptions.EndpointNotFound, scObj._url_for)
# no endpoints and no 'serviceCatalog' in catalog => raise exception
scObj = auth.ServiceCatalog()
scObj.catalog = dict()
scObj.root_key = "access"
scObj.catalog[scObj.root_key] = dict()
scObj.catalog[scObj.root_key]['serviceCatalog'] = []
self.assertRaises(exceptions.EndpointNotFound, scObj._url_for,
attr="test_attr", filter_value="test_attr_value")

View File

@ -4,4 +4,5 @@ openstack.nose_plugin>=0.11
pep8==1.1
sphinx>=1.1.2
unittest2>=0.5.1
testtools
testtools
mock>=1.0.1