Prepare for auth plugins

In order to get ready for auth plugins I kind of needed to
turn auth plugins on their head.  The goals:

* The plugin should know what to construct based on what the
  user provides.  If a token is provided, use it.
* If a token is provided and it expires, allow the user to reauth
  with user name and password without the user having to do
  anything special.
* Support auth plugin names like identity_v2 and identity_v3 where
  the user does not need to know exactly what method they are
  using.
* Keep is simple.

In the simple department, I think this makes it simplier for the
user.

Change-Id: I7778438930bad2a32f79189b46d4fe7dbfbb67f6
This commit is contained in:
Terry Howe 2014-09-09 08:41:02 -06:00
parent b027df1f6c
commit 549301b24b
9 changed files with 354 additions and 304 deletions

View File

@ -69,3 +69,13 @@ class BaseAuthPlugin(object):
If nothing happens returns False to indicate give up.
"""
return False
@classmethod
def get_options(cls):
"""Return the list of parameters associated with the auth plugin.
This list may be used to generate arguments.
:returns list: A list of strings describing plugin parameters.
"""
return []

View File

@ -46,26 +46,24 @@ def create(username=None, password=None, token=None, auth_url=None,
version = version.lower().replace('v', '')
version = version.split('.')[0]
if version == '3':
if not token:
args = {'username': username, 'password': password}
if project_name:
args['project_name'] = project_name
if domain_name:
args['domain_name'] = domain_name
if project_domain_name:
args['project_domain_name'] = project_domain_name
if user_domain_name:
args['user_domain_name'] = user_domain_name
return v3.Password(auth_url, **args)
else:
return v3.Token(auth_url, token=token)
args = {'user_name': username, 'password': password}
if project_name:
args['project_name'] = project_name
if domain_name:
args['domain_name'] = domain_name
if project_domain_name:
args['project_domain_name'] = project_domain_name
if user_domain_name:
args['user_domain_name'] = user_domain_name
if token:
args['token'] = token
return v3.Auth(auth_url, **args)
elif version == '2':
if not token:
args = {}
if project_name:
args['tenant_name'] = project_name
return v2.Password(auth_url, username, password, **args)
else:
return v2.Token(auth_url, token)
args = {'user_name': username, 'password': password}
if project_name:
args['project_name'] = project_name
if token:
args['token'] = token
return v2.Auth(auth_url, **args)
msg = ("No support for identity version: %s" % version)
raise exceptions.NoMatchingPlugin(msg)

View File

@ -70,6 +70,7 @@ class BaseIdentityPlugin(base.BaseAuthPlugin):
if self.access_info.will_expire_soon(self.BEST_BEFORE_SECONDS):
# if it's about to expire we should re-authenticate now.
self.invalidate()
return True
# otherwise it's fine and use the existing one.

View File

@ -12,11 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from openstack.auth import access
from openstack.auth.identity import base
from openstack import exceptions
@ -24,30 +21,59 @@ from openstack import exceptions
_logger = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Auth(base.BaseIdentityPlugin):
valid_options = [
'auth_url',
'user_name',
'user_id',
'password',
'project_id',
'project_name',
'reauthenticate',
'token',
'trust_id',
]
def __init__(self, auth_url,
trust_id=None,
tenant_id=None,
tenant_name=None,
reauthenticate=True):
user_name=None,
user_id=None,
password='',
token=None,
project_id=None,
project_name=None,
reauthenticate=True,
trust_id=None):
"""Construct an Identity V2 Authentication Plugin.
A user_name, user_id or token must be provided.
:param string auth_url: Identity service endpoint for authorization.
:param string user_name: Username for authentication.
:param string user_id: User ID for authentication.
:param string password: Password for authentication.
:param string project_id: Tenant ID for project scoping.
:param string project_name: Tenant name for project scoping.
:param bool reauthenticate: Get new token if token expires.
:param string token: Existing token for authentication.
:param string trust_id: Trust ID for trust scoping.
:param string tenant_id: Tenant ID for project scoping.
:param string tenant_name: Tenant name for project scoping.
:param bool reauthenticate: Allow fetching a new token if the current
one is going to expire.
(optional) default True
:raises TypeError: if a user_id, user_name or token is not provided.
"""
super(Auth, self).__init__(auth_url=auth_url,
reauthenticate=reauthenticate)
if not (user_id or user_name or token):
msg = 'You need to specify either a user_name, user_id or token'
raise TypeError(msg)
self.user_id = user_id
self.user_name = user_name
self.password = password
self.token = token
self.trust_id = trust_id
self.tenant_id = tenant_id
self.tenant_name = tenant_name
self.tenant_id = project_id
self.tenant_name = project_name
def authorize(self, transport, **kwargs):
headers = {'Accept': 'application/json'}
@ -71,64 +97,19 @@ class Auth(base.BaseIdentityPlugin):
return access.AccessInfoV2(**resp_data)
@abc.abstractmethod
def get_auth_data(self, headers=None):
"""Return the authentication section of an auth plugin.
:param dict headers: The headers that will be sent with the auth
request if a plugin needs to add to them.
:return dict: A dict of authentication data for the auth type.
"""
class Password(Auth):
def __init__(self, auth_url, username=None, password=None, user_id=None,
**kwargs):
"""A plugin for authenticating with a username and password.
A username or user_id must be provided.
:param string auth_url: Identity service endpoint for authorization.
:param string username: Username for authentication.
:param string password: Password for authentication.
:param string user_id: User ID for authentication.
:raises TypeError: if a user_id or username is not provided.
"""
super(Password, self).__init__(auth_url, **kwargs)
if not (user_id or username):
msg = 'You need to specify either a username or user_id'
raise TypeError(msg)
self.user_id = user_id
self.username = username
self.password = password
def get_auth_data(self, headers=None):
auth = {'password': self.password}
if self.username:
auth['username'] = self.username
elif self.user_id:
auth['userId'] = self.user_id
return {'passwordCredentials': auth}
class Token(Auth):
def __init__(self, auth_url, token, **kwargs):
"""A plugin for authenticating with an existing token.
:param string auth_url: Identity service endpoint for authorization.
:param string token: Existing token for authentication.
"""
super(Token, self).__init__(auth_url, **kwargs)
self.token = token
def get_auth_data(self, headers=None):
if headers is not None:
headers['X-Auth-Token'] = self.token
def get_auth_data(self, headers):
if self.token is None:
auth = {'password': self.password}
if self.user_name:
auth['username'] = self.user_name
elif self.user_id:
auth['userId'] = self.user_id
return {'passwordCredentials': auth}
headers['X-Auth-Token'] = self.token
return {'token': {'id': self.token}}
def invalidate(self):
if super(Auth, self).invalidate():
self.token = None
return True
return False

View File

@ -26,42 +26,90 @@ _logger = logging.getLogger(__name__)
class Auth(base.BaseIdentityPlugin):
def __init__(self, auth_url, auth_methods,
trust_id=None,
valid_options = [
'auth_url',
'domain_id',
'domain_name',
'password',
'project_domain_id',
'project_domain_name',
'project_id',
'project_name',
'reauthenticate',
'token',
'trust_id',
'user_domain_id',
'user_domain_name',
'user_id',
'user_name',
]
def __init__(self, auth_url,
domain_id=None,
domain_name=None,
project_id=None,
project_name=None,
password='',
project_domain_id=None,
project_domain_name=None,
reauthenticate=True):
project_id=None,
project_name=None,
reauthenticate=True,
token=None,
trust_id=None,
user_domain_id=None,
user_domain_name=None,
user_id=None,
user_name=None):
"""Construct an Identity V3 Authentication Plugin.
This authorization plugin should be constructed with a password
and user_id or user_name. It may also be constructed with a token.
:param string auth_url: Identity service endpoint for authentication.
:param list auth_methods: A collection of methods to authenticate with.
:param string trust_id: Trust ID for trust scoping.
:param string domain_id: Domain ID for domain scoping.
:param string domain_name: Domain name for domain scoping.
:param string project_id: Project ID for project scoping.
:param string project_name: Project name for project scoping.
:param string password: User password for authentication.
:param string project_domain_id: Project's domain ID for project.
:param string project_domain_name: Project's domain name for project.
:param bool reauthenticate: Allow fetching a new token if the current
one is going to expire.
(optional) default True
:param string project_id: Project ID for project scoping.
:param string project_name: Project name for project scoping.
:param bool reauthenticate: Get new token if token expires.
:param string token: Token to use for authentication.
:param string trust_id: Trust ID for trust scoping.
:param string user_domain_id: User's domain ID for authentication.
:param string user_domain_name: User's domain name for authentication.
:param string user_name: User name for authentication.
:param string user_id: User ID for authentication.
:raises TypeError: if a user_id, user_name or token is not provided.
"""
super(Auth, self).__init__(auth_url=auth_url,
reauthenticate=reauthenticate)
self.auth_methods = auth_methods
self.trust_id = trust_id
if not (user_id or user_name or token):
msg = 'You need to specify either a user_name, user_id or token'
raise TypeError(msg)
self.domain_id = domain_id
self.domain_name = domain_name
self.project_id = project_id
self.project_name = project_name
self.project_domain_id = project_domain_id
self.project_domain_name = project_domain_name
self.project_id = project_id
self.project_name = project_name
self.reauthenticate = reauthenticate
self.trust_id = trust_id
self.password_method = PasswordMethod(
password=password,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
user_name=user_name,
user_id=user_id,
)
if token:
self.token_method = TokenMethod(token=token)
self.auth_methods = [self.token_method]
else:
self.auth_methods = [self.password_method]
@property
def token_url(self):
@ -120,6 +168,12 @@ class Auth(base.BaseIdentityPlugin):
return access.AccessInfoV3(resp.headers['X-Subject-Token'],
**resp_data)
def invalidate(self):
if super(Auth, self).invalidate():
self.auth_methods = [self.password_method]
return True
return False
@six.add_metaclass(abc.ABCMeta)
class AuthMethod(object):
@ -128,27 +182,10 @@ class AuthMethod(object):
V3 Tokens allow multiple methods to be presented when authentication
against the server. Each one of these methods is implemented by an
AuthMethod.
Note: When implementing an AuthMethod use the method_parameters
and do not use positional arguments. Otherwise they can't be picked up by
the factory method and don't work as well with AuthConstructors.
"""
_method_parameters = []
def __init__(self, **kwargs):
for param in self._method_parameters:
setattr(self, param, kwargs.pop(param, None))
if kwargs:
msg = "Unexpected Attributes: %s" % ", ".join(kwargs.keys())
raise AttributeError(msg)
@classmethod
def _extract_kwargs(cls, kwargs):
"""Remove parameters related to this method from other kwargs."""
return dict([(p, kwargs.pop(p, None))
for p in cls._method_parameters])
for param in kwargs:
setattr(self, param, kwargs.get(param, None))
@abc.abstractmethod
def get_auth_data(self, transport, auth, headers, **kwargs):
@ -163,53 +200,14 @@ class AuthMethod(object):
"""
@six.add_metaclass(abc.ABCMeta)
class _AuthConstructor(Auth):
"""AuthConstructor creates an authentication plugin with one method.
AuthConstructor is a means of creating an authentication plugin that
contains only one authentication method. This is generally the required
usage.
An AuthConstructor creates an AuthMethod based on the method's
arguments and the auth_method_class defined by the plugin. It then
creates the auth plugin with only that authentication method.
"""
_auth_method_class = None
def __init__(self, auth_url, *args, **kwargs):
method_kwargs = self._auth_method_class._extract_kwargs(kwargs)
method = self._auth_method_class(*args, **method_kwargs)
super(_AuthConstructor, self).__init__(auth_url, [method], **kwargs)
class PasswordMethod(AuthMethod):
_method_parameters = ['user_id',
'username',
'user_domain_id',
'user_domain_name',
'password']
def __init__(self, **kwargs):
"""Construct a User/Password based authentication method.
:param string password: Password for authentication.
:param string username: Username for authentication.
:param string user_id: User ID for authentication.
:param string user_domain_id: User's domain ID for authentication.
:param string user_domain_name: User's domain name for authentication.
"""
super(PasswordMethod, self).__init__(**kwargs)
def get_auth_data(self, transport, auth, headers, **kwargs):
user = {'password': self.password}
if self.user_id:
user['id'] = self.user_id
elif self.username:
user['name'] = self.username
elif self.user_name:
user['name'] = self.user_name
if self.user_domain_id:
user['domain'] = {'id': self.user_domain_id}
@ -219,28 +217,7 @@ class PasswordMethod(AuthMethod):
return 'password', {'user': user}
class Password(_AuthConstructor):
_auth_method_class = PasswordMethod
class TokenMethod(AuthMethod):
_method_parameters = ['token']
def __init__(self, **kwargs):
"""Construct an Auth plugin to fetch a token from a token.
:param string token: Token for authentication.
"""
super(TokenMethod, self).__init__(**kwargs)
def get_auth_data(self, transport, auth, headers, **kwargs):
headers['X-Auth-Token'] = self.token
return 'token', {'id': self.token}
class Token(_AuthConstructor):
_auth_method_class = TokenMethod
def __init__(self, auth_url, token, **kwargs):
super(Token, self).__init__(auth_url, token=token, **kwargs)

View File

@ -28,7 +28,7 @@ class TestAuthenticatorCreate(base.TestCase):
project_domain_name='8',
user_domain_name='9',
)
self.assertEqual('1', auth.auth_methods[0].username)
self.assertEqual('1', auth.auth_methods[0].user_name)
self.assertEqual('2', auth.auth_methods[0].password)
self.assertEqual('4', auth.auth_url)
self.assertEqual('6', auth.project_name)
@ -57,7 +57,7 @@ class TestAuthenticatorCreate(base.TestCase):
version='2',
project_name='6',
)
self.assertEqual('1', auth.username)
self.assertEqual('1', auth.user_name)
self.assertEqual('2', auth.password)
self.assertEqual('4', auth.auth_url)
self.assertEqual('6', auth.tenant_name)

View File

@ -25,35 +25,45 @@ TEST_RESPONSE_DICT = common.TEST_RESPONSE_DICT_V2
class TestV2Auth(testtools.TestCase):
def test_password(self):
kargs = {'trust_id': common.TEST_TRUST_ID,
'tenant_id': common.TEST_TENANT_ID,
'tenant_name': common.TEST_TENANT_NAME}
kargs = {
'password': common.TEST_PASS,
'project_id': common.TEST_TENANT_ID,
'project_name': common.TEST_TENANT_NAME,
'trust_id': common.TEST_TRUST_ID,
'user_name': common.TEST_USER,
}
sot = v2.Password(TEST_URL, common.TEST_USER, common.TEST_PASS,
**kargs)
sot = v2.Auth(TEST_URL, **kargs)
self.assertEqual(common.TEST_USER, sot.username)
self.assertEqual(common.TEST_USER, sot.user_name)
self.assertEqual(common.TEST_PASS, sot.password)
self.assertEqual(common.TEST_TRUST_ID, sot.trust_id)
self.assertEqual(common.TEST_TENANT_ID, sot.tenant_id)
self.assertEqual(common.TEST_TENANT_NAME, sot.tenant_name)
expected = {'passwordCredentials': {'password': common.TEST_PASS,
'username': common.TEST_USER}}
self.assertEqual(expected, sot.get_auth_data())
headers = {}
self.assertEqual(expected, sot.get_auth_data(headers))
self.assertEqual({}, headers)
def test_token(self):
kargs = {'trust_id': common.TEST_TRUST_ID,
'tenant_id': common.TEST_TENANT_ID,
'tenant_name': common.TEST_TENANT_NAME}
kargs = {
'project_id': common.TEST_TENANT_ID,
'project_name': common.TEST_TENANT_NAME,
'token': common.TEST_TOKEN,
'trust_id': common.TEST_TRUST_ID,
}
sot = v2.Token(TEST_URL, common.TEST_TOKEN, **kargs)
sot = v2.Auth(TEST_URL, **kargs)
self.assertEqual(common.TEST_TOKEN, sot.token)
self.assertEqual(common.TEST_TRUST_ID, sot.trust_id)
self.assertEqual(common.TEST_TENANT_ID, sot.tenant_id)
self.assertEqual(common.TEST_TENANT_NAME, sot.tenant_name)
expected = {'token': {'id': common.TEST_TOKEN}}
self.assertEqual(expected, sot.get_auth_data())
headers = {}
self.assertEqual(expected, sot.get_auth_data(headers))
self.assertEqual({'X-Auth-Token': common.TEST_TOKEN}, headers)
def create_mock_transport(self, xresp):
transport = mock.Mock()
@ -65,10 +75,13 @@ class TestV2Auth(testtools.TestCase):
return transport
def test_authorize_tenant_id(self):
kargs = {'trust_id': common.TEST_TRUST_ID,
'tenant_id': common.TEST_TENANT_ID,
'tenant_name': common.TEST_TENANT_NAME}
sot = v2.Token(TEST_URL, common.TEST_TOKEN, **kargs)
kargs = {
'project_id': common.TEST_TENANT_ID,
'project_name': common.TEST_TENANT_NAME,
'token': common.TEST_TOKEN,
'trust_id': common.TEST_TRUST_ID,
}
sot = v2.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport(TEST_RESPONSE_DICT)
resp = sot.authorize(xport)
@ -85,8 +98,11 @@ class TestV2Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_tenant_name(self):
kargs = {'tenant_name': common.TEST_TENANT_NAME}
sot = v2.Token(TEST_URL, common.TEST_TOKEN, **kargs)
kargs = {
'project_name': common.TEST_TENANT_NAME,
'token': common.TEST_TOKEN,
}
sot = v2.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport(TEST_RESPONSE_DICT)
resp = sot.authorize(xport)
@ -102,7 +118,8 @@ class TestV2Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_only(self):
sot = v2.Token(TEST_URL, common.TEST_TOKEN)
kargs = {'token': common.TEST_TOKEN}
sot = v2.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport(TEST_RESPONSE_DICT)
resp = sot.authorize(xport)
@ -117,7 +134,42 @@ class TestV2Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_bad_response(self):
sot = v2.Token(TEST_URL, common.TEST_TOKEN)
kargs = {'token': common.TEST_TOKEN}
sot = v2.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport({})
self.assertRaises(exceptions.InvalidResponse, sot.authorize, xport)
def test_invalidate(self):
kargs = {
'password': common.TEST_PASS,
'token': common.TEST_TOKEN,
'user_name': common.TEST_USER,
}
sot = v2.Auth(TEST_URL, **kargs)
expected = {'token': {'id': common.TEST_TOKEN}}
headers = {}
self.assertEqual(expected, sot.get_auth_data(headers))
self.assertEqual({'X-Auth-Token': common.TEST_TOKEN}, headers)
self.assertEqual(True, sot.invalidate())
expected = {'passwordCredentials': {'password': common.TEST_PASS,
'username': common.TEST_USER}}
headers = {}
self.assertEqual(expected, sot.get_auth_data(headers))
self.assertEqual({}, headers)
def test_valid_options(self):
expected = [
'auth_url',
'user_name',
'user_id',
'password',
'project_id',
'project_name',
'reauthenticate',
'token',
'trust_id',
]
self.assertEqual(expected, v2.Auth.valid_options)

View File

@ -22,21 +22,22 @@ TEST_URL = 'http://127.0.0.1:5000/v3.0'
class TestV3Auth(testtools.TestCase):
def test_password_user_domain(self):
kargs = {'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME}
method = v3.PasswordMethod(username=common.TEST_USER,
user_id=common.TEST_USER_ID,
user_domain_id=common.TEST_DOMAIN_ID,
user_domain_name=common.TEST_DOMAIN_NAME,
password=common.TEST_PASS)
sot = v3.Auth(TEST_URL, [method], **kargs)
kargs = {
'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME,
'user_name': common.TEST_USER,
'user_id': common.TEST_USER_ID,
'user_domain_id': common.TEST_DOMAIN_ID,
'user_domain_name': common.TEST_DOMAIN_NAME,
'password': common.TEST_PASS,
}
sot = v3.Auth(TEST_URL, **kargs)
self.assertEqual(1, len(sot.auth_methods))
auther = sot.auth_methods[0]
self.assertEqual(common.TEST_USER_ID, auther.user_id)
self.assertEqual(common.TEST_USER, auther.username)
self.assertEqual(common.TEST_USER, auther.user_name)
self.assertEqual(common.TEST_DOMAIN_ID, auther.user_domain_id)
self.assertEqual(common.TEST_DOMAIN_NAME, auther.user_domain_name)
self.assertEqual(common.TEST_PASS, auther.password)
@ -53,21 +54,22 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(TEST_URL + '/auth/tokens', sot.token_url)
def test_password_domain(self):
kargs = {'domain_id': common.TEST_DOMAIN_ID,
'domain_name': common.TEST_DOMAIN_NAME,
'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME}
methods = [v3.PasswordMethod(username=common.TEST_USER,
user_id=common.TEST_USER_ID,
password=common.TEST_PASS)]
sot = v3.Auth(TEST_URL, methods, **kargs)
kargs = {
'domain_id': common.TEST_DOMAIN_ID,
'domain_name': common.TEST_DOMAIN_NAME,
'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME,
'user_name': common.TEST_USER,
'user_id': common.TEST_USER_ID,
'password': common.TEST_PASS,
}
sot = v3.Auth(TEST_URL, **kargs)
self.assertEqual(1, len(sot.auth_methods))
auther = sot.auth_methods[0]
self.assertEqual(common.TEST_USER_ID, auther.user_id)
self.assertEqual(common.TEST_USER, auther.username)
self.assertEqual(common.TEST_USER, auther.user_name)
self.assertEqual(None, auther.user_domain_id)
self.assertEqual(None, auther.user_domain_name)
self.assertEqual(common.TEST_PASS, auther.password)
@ -84,14 +86,15 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(TEST_URL + '/auth/tokens', sot.token_url)
def test_token_project_domain(self):
kargs = {'project_domain_id': common.TEST_DOMAIN_ID,
'project_domain_name': common.TEST_DOMAIN_NAME,
'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
kargs = {
'project_domain_id': common.TEST_DOMAIN_ID,
'project_domain_name': common.TEST_DOMAIN_NAME,
'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
self.assertEqual(1, len(sot.auth_methods))
auther = sot.auth_methods[0]
@ -118,8 +121,8 @@ class TestV3Auth(testtools.TestCase):
return transport
def test_authorize_token(self):
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods)
kargs = {'token': common.TEST_TOKEN}
sot = v3.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -136,9 +139,11 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_domain_id(self):
kargs = {'domain_id': common.TEST_DOMAIN_ID}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
kargs = {
'domain_id': common.TEST_DOMAIN_ID,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -156,9 +161,11 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_domain_name(self):
kargs = {'domain_name': common.TEST_DOMAIN_NAME}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
kargs = {
'domain_name': common.TEST_DOMAIN_NAME,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -177,9 +184,11 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_project_id(self):
kargs = {'project_id': common.TEST_PROJECT_ID}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
kargs = {
'project_id': common.TEST_PROJECT_ID,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -198,10 +207,12 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_project_name(self):
kargs = {'project_name': common.TEST_PROJECT_NAME,
'project_domain_id': common.TEST_DOMAIN_ID}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
kargs = {
'project_name': common.TEST_PROJECT_NAME,
'project_domain_id': common.TEST_DOMAIN_ID,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -222,10 +233,12 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_project_name_domain_name(self):
kargs = {'project_name': common.TEST_PROJECT_NAME,
'project_domain_name': common.TEST_DOMAIN_NAME}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
kargs = {
'project_name': common.TEST_PROJECT_NAME,
'project_domain_name': common.TEST_DOMAIN_NAME,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -246,9 +259,11 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_trust_id(self):
kargs = {'trust_id': common.TEST_TRUST_ID}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
kargs = {
'token': common.TEST_TOKEN,
'trust_id': common.TEST_TRUST_ID,
}
sot = v3.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -266,55 +281,71 @@ class TestV3Auth(testtools.TestCase):
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_multi_method(self):
methods = [v3.PasswordMethod(username=common.TEST_USER,
password=common.TEST_PASS),
v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
eurl = TEST_URL + '/auth/tokens'
eheaders = {'Accept': 'application/json',
'X-Auth-Token': common.TEST_TOKEN}
up = {'password': common.TEST_PASS, 'name': common.TEST_USER}
ejson = {'auth': {'identity': {'token': {'id': common.TEST_TOKEN},
'password': {'user': up},
'methods': ['password', 'token']}}}
xport.post.assert_called_with(eurl, headers=eheaders, json=ejson)
ecatalog = common.TEST_RESPONSE_DICT_V3['token'].copy()
ecatalog['auth_token'] = common.TEST_SUBJECT
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_mutually_exclusive(self):
x = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
kargs = {'token': common.TEST_TOKEN}
kargs = {'domain_id': 'a',
'project_id': 'b'}
sot = v3.Auth(TEST_URL, methods, **kargs)
sot = v3.Auth(TEST_URL, **kargs)
sot.domain_id = 'a'
sot.project_id = 'b'
self.assertRaises(exceptions.AuthorizationFailure, sot.authorize, x)
kargs = {'domain_name': 'a',
'project_name': 'b'}
sot = v3.Auth(TEST_URL, methods, **kargs)
sot = v3.Auth(TEST_URL, **kargs)
sot.domain_name = 'a'
sot.project_name = 'b'
self.assertRaises(exceptions.AuthorizationFailure, sot.authorize, x)
kargs = {'domain_name': 'a',
'trust_id': 'b'}
sot = v3.Auth(TEST_URL, methods, **kargs)
sot = v3.Auth(TEST_URL, **kargs)
sot.domain_name = 'a'
sot.trust_id = 'b'
self.assertRaises(exceptions.AuthorizationFailure, sot.authorize, x)
kargs = {'project_id': 'a',
'trust_id': 'b'}
sot = v3.Auth(TEST_URL, methods, **kargs)
sot = v3.Auth(TEST_URL, **kargs)
sot.project_id = 'a'
sot.trust_id = 'b'
self.assertRaises(exceptions.AuthorizationFailure, sot.authorize, x)
def test_authorize_bad_response(self):
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods)
kargs = {'token': common.TEST_TOKEN}
sot = v3.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport({})
self.assertRaises(exceptions.InvalidResponse, sot.authorize, xport)
def test_invalidate(self):
kargs = {
'user_name': common.TEST_USER,
'password': common.TEST_PASS,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
self.assertEqual(1, len(sot.auth_methods))
auther = sot.auth_methods[0]
self.assertEqual(common.TEST_TOKEN, auther.token)
self.assertEqual(True, sot.invalidate())
self.assertEqual(1, len(sot.auth_methods))
auther = sot.auth_methods[0]
self.assertEqual(common.TEST_USER, auther.user_name)
self.assertEqual(common.TEST_PASS, auther.password)
def test_valid_options(self):
expected = [
'auth_url',
'domain_id',
'domain_name',
'password',
'project_domain_id',
'project_domain_name',
'project_id',
'project_name',
'reauthenticate',
'token',
'trust_id',
'user_domain_id',
'user_domain_name',
'user_id',
'user_name',
]
self.assertEqual(expected, v3.Auth.valid_options)

View File

@ -96,7 +96,7 @@ class TestSessionCreate(base.TestCase):
user_agent='9',
region='10',
)
self.assertEqual('1', sess.authenticator.auth_methods[0].username)
self.assertEqual('1', sess.authenticator.auth_methods[0].user_name)
self.assertEqual('2', sess.authenticator.auth_methods[0].password)
self.assertEqual('7', sess.transport.verify)
self.assertEqual('9', sess.transport._user_agent)