Authentication from keystoneclient

This authentication codes is pretty much copied from the
keystoneclient, but greatly simplified.  The tests in
keystoneclient were hard to bring over without a lot of
other baggage, so the tests were created from scratch.

Change-Id: I1033c7f64ac47bf461679d5ffa8847a1acf7c59e
This commit is contained in:
Terry Howe 2014-05-02 14:19:03 -06:00
parent 56635b082a
commit 93dd42ba68
25 changed files with 2583 additions and 39 deletions

500
openstack/auth/access.py Normal file
View File

@ -0,0 +1,500 @@
# Copyright 2012 Nebula, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from openstack.auth import service_catalog as catalog
from openstack.common import timeutils
# Do not use token before expiration
BEST_BEFORE_SECONDS = 30
class AccessInfo(object):
"""Encapsulates a raw authentication token from keystone.
Provides helper methods for extracting useful values from that token.
"""
def __init__(self, **kwargs):
"""Construct access info
"""
self._info = kwargs
@classmethod
def factory(cls, resp=None, body=None, **kwargs):
"""Create AccessInfo object given a successful auth response & body
or a user-provided dict.
"""
if body is not None or len(kwargs):
if AccessInfoV3.is_valid(body, **kwargs):
token = None
if resp:
token = resp.headers['X-Subject-Token']
if body:
return AccessInfoV3(token, **body['token'])
else:
return AccessInfoV3(token, **kwargs)
elif AccessInfoV2.is_valid(body, **kwargs):
if body:
return AccessInfoV2(**body['access'])
else:
return AccessInfoV2(**kwargs)
else:
raise NotImplementedError('Unrecognized auth response')
else:
return AccessInfoV2(**kwargs)
def will_expire_soon(self, best_before=BEST_BEFORE_SECONDS):
"""Determines if expiration is about to occur.
:return: boolean : true if expiration is within the given duration
"""
norm_expires = timeutils.normalize_time(self.expires)
soon = (timeutils.utcnow() + datetime.timedelta(seconds=best_before))
return norm_expires < soon
@classmethod
def is_valid(cls, body, **kwargs):
"""Determines if processing v2 or v3 token given a successful
auth body or a user-provided dict.
:return: boolean : true if auth body matches implementing class
"""
raise NotImplementedError()
def has_service_catalog(self):
"""Returns true if the authorization token has a service catalog.
:returns: boolean
"""
raise NotImplementedError()
@property
def auth_token(self):
"""Returns the token_id associated with the auth request, to be used
in headers for authenticating OpenStack API requests.
:returns: str
"""
raise NotImplementedError()
@property
def expires(self):
"""Returns the token expiration (as datetime object)
:returns: datetime
"""
raise NotImplementedError()
@property
def username(self):
"""Returns the username associated with the authentication request.
Follows the pattern defined in the V2 API of first looking for 'name',
returning that if available, and falling back to 'username' if name
is unavailable.
:returns: str
"""
raise NotImplementedError()
@property
def user_id(self):
"""Returns the user id associated with the authentication request.
:returns: str
"""
raise NotImplementedError()
@property
def user_domain_id(self):
"""Returns the domain id of the user associated with the authentication
request.
For v2, it always returns 'default' which may be different from the
Keystone configuration.
:returns: str
"""
raise NotImplementedError()
@property
def user_domain_name(self):
"""Returns the domain name of the user associated with the
authentication request.
For v2, it always returns 'Default' which may be different from the
Keystone configuration.
:returns: str
"""
raise NotImplementedError()
@property
def role_names(self):
"""Returns a list of role names of the user associated with the
authentication request.
:returns: a list of strings of role names
"""
raise NotImplementedError()
@property
def domain_name(self):
"""Returns the domain name associated with the authentication token.
:returns: str or None (if no domain associated with the token)
"""
raise NotImplementedError()
@property
def domain_id(self):
"""Returns the domain id associated with the authentication token.
:returns: str or None (if no domain associated with the token)
"""
raise NotImplementedError()
@property
def project_name(self):
"""Returns the project name associated with the authentication request.
:returns: str or None (if no project associated with the token)
"""
raise NotImplementedError()
@property
def tenant_name(self):
"""Synonym for project_name."""
return self.project_name
@property
def project_scoped(self):
"""Returns true if the authorization token was scoped to a tenant
(project).
:returns: bool
"""
raise NotImplementedError()
@property
def domain_scoped(self):
"""Returns true if the authorization token was scoped to a domain.
:returns: bool
"""
raise NotImplementedError()
@property
def trust_id(self):
"""Returns the trust id associated with the authentication token.
:returns: str or None (if no trust associated with the token)
"""
raise NotImplementedError()
@property
def trust_scoped(self):
"""Returns true if the authorization token was scoped as delegated in a
trust, via the OS-TRUST v3 extension.
:returns: bool
"""
raise NotImplementedError()
@property
def project_id(self):
"""Returns the project ID associated with the authentication
request, or None if the authentication request wasn't scoped to a
project.
:returns: str or None (if no project associated with the token)
"""
raise NotImplementedError()
@property
def tenant_id(self):
"""Synonym for project_id."""
return self.project_id
@property
def project_domain_id(self):
"""Returns the domain id of the project associated with the
authentication request.
For v2, it returns 'default' if a project is scoped or None which may
be different from the keystone configuration.
:returns: str
"""
raise NotImplementedError()
@property
def project_domain_name(self):
"""Returns the domain name of the project associated with the
authentication request.
For v2, it returns 'Default' if a project is scoped or None which may
be different from the keystone configuration.
:returns: str
"""
raise NotImplementedError()
@property
def version(self):
"""Returns the version of the auth token from identity service.
:returns: str
"""
return self._info.get('version')
class AccessInfoV2(AccessInfo):
"""An object for encapsulating a raw v2 auth token from identity
service.
"""
def __init__(self, **kwargs):
super(AccessInfoV2, self).__init__(**kwargs)
self._info.update(version='v2.0')
service_catalog = self._info['serviceCatalog']
self.service_catalog = catalog.ServiceCatalogV2(service_catalog)
@classmethod
def is_valid(cls, body, **kwargs):
if body:
return 'access' in body
elif kwargs:
return kwargs.get('version') == 'v2.0'
else:
return False
def has_service_catalog(self):
return 'serviceCatalog' in self._info
@property
def auth_token(self):
return self._info['token']['id']
@property
def expires(self):
return timeutils.parse_isotime(self._info['token']['expires'])
@property
def username(self):
user = self._info['user']
return user.get('name', user.get('username'))
@property
def user_id(self):
return self._info['user']['id']
@property
def user_domain_id(self):
return 'default'
@property
def user_domain_name(self):
return 'Default'
@property
def role_names(self):
return [r['name'] for r in self._info['user'].get('roles', [])]
@property
def domain_name(self):
return None
@property
def domain_id(self):
return None
@property
def project_name(self):
try:
tenant_dict = self._info['token']['tenant']
except KeyError:
pass
else:
return tenant_dict.get('name')
# pre grizzly
try:
return self._info['user']['tenantName']
except KeyError:
pass
# pre diablo, keystone only provided a tenantId
try:
return self._info['token']['tenantId']
except KeyError:
pass
@property
def project_scoped(self):
return 'tenant' in self._info['token']
@property
def domain_scoped(self):
return False
@property
def trust_id(self):
return self._info.get('trust', {}).get('id')
@property
def trust_scoped(self):
return 'trust' in self._info
@property
def project_id(self):
try:
tenant_dict = self._info['token']['tenant']
except KeyError:
pass
else:
return tenant_dict.get('id')
# pre grizzly
try:
return self._info['user']['tenantId']
except KeyError:
pass
# pre diablo
try:
return self._info['token']['tenantId']
except KeyError:
pass
@property
def project_domain_id(self):
if self.project_id:
return 'default'
@property
def project_domain_name(self):
if self.project_id:
return 'Default'
class AccessInfoV3(AccessInfo):
"""An object for encapsulating a raw v3 auth token from identity
service.
"""
def __init__(self, token, **kwargs):
super(AccessInfoV3, self).__init__(**kwargs)
self._info.update(version='v3')
self.service_catalog = catalog.ServiceCatalog(self._info['catalog'])
if token:
self._info.update(auth_token=token)
@classmethod
def is_valid(cls, body, **kwargs):
if body:
return 'token' in body
elif kwargs:
return kwargs.get('version') == 'v3'
else:
return False
def has_service_catalog(self):
return 'catalog' in self._info
@property
def auth_token(self):
return self._info['auth_token']
@property
def expires(self):
return timeutils.parse_isotime(self._info['expires_at'])
@property
def user_id(self):
return self._info['user']['id']
@property
def user_domain_id(self):
return self._info['user']['domain']['id']
@property
def user_domain_name(self):
return self._info['user']['domain']['name']
@property
def role_names(self):
return [r['name'] for r in self._info.get('roles', [])]
@property
def username(self):
return self._info['user']['name']
@property
def domain_name(self):
domain = self._info.get('domain')
if domain:
return domain['name']
@property
def domain_id(self):
domain = self._info.get('domain')
if domain:
return domain['id']
@property
def project_id(self):
project = self._info.get('project')
if project:
return project['id']
@property
def project_domain_id(self):
project = self._info.get('project')
if project:
return project['domain']['id']
@property
def project_domain_name(self):
project = self._info.get('project')
if project:
return project['domain']['name']
@property
def project_name(self):
project = self._info.get('project')
if project:
return project['name']
@property
def project_scoped(self):
return 'project' in self._info
@property
def domain_scoped(self):
return 'domain' in self._info
@property
def trust_id(self):
return self._info.get('OS-TRUST:trust', {}).get('id')
@property
def trust_scoped(self):
return 'OS-TRUST:trust' in self._info

View File

@ -47,8 +47,8 @@ class BaseAuthenticator(object):
:param Transport transport: A transport object so the authenticator
can make HTTP calls
:param ServiceIdentifier service: The object that identifies the
service for the authenticator.
:param ServiceFilter service: The filter to identify the desired
service.
:returns string: The base URL that will be used to talk to the
required service or None if not available.

View File

View File

@ -0,0 +1,86 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from openstack.auth import base
@six.add_metaclass(abc.ABCMeta)
class BaseIdentityPlugin(base.BaseAuthenticator):
# Consider a token valid if it does not expire for this many seconds
BEST_BEFORE_SECONDS = 1
def __init__(self, auth_url=None):
super(BaseIdentityPlugin, self).__init__()
self.auth_url = auth_url
self.access_info = None
@abc.abstractmethod
def authorize(self, transport, **kwargs):
"""Obtain access information from an OpenStack Identity Service.
Thus method will authenticate and fetch a new AccessInfo when
invoked.
:raises InvalidResponse: The response returned wasn't appropriate.
:raises HttpError: An error from an invalid HTTP response.
:returns AccessInfo: Token access information.
"""
def get_token(self, transport, **kwargs):
"""Return a valid auth token.
If a valid token is not present then a new one will be fetched.
:raises HttpError: An error from an invalid HTTP response.
:return string: A valid token.
"""
return self.get_access(transport).auth_token
def get_access(self, transport, **kwargs):
"""Fetch or return a current AccessInfo object.
If a valid AccessInfo is present then it is returned otherwise a new
one will be fetched.
:raises HttpError: An error from an invalid HTTP response.
:returns AccessInfo: Valid AccessInfo
"""
if (not self.access_info or
self.access_info.will_expire_soon(self.BEST_BEFORE_SECONDS)):
self.access_info = self.authorize(transport, kwargs)
return self.access_info
def get_endpoint(self, transport, service, **kwargs):
"""Return a valid endpoint for a service.
If a valid token is not present then a new one will be fetched using
the transport and kwargs.
:param Transport transport: A transport object so the authenticator
can authenticate.
:param ServiceFilter service: The filter to identify the desired
service.
:raises HttpError: An error from an invalid HTTP response.
:return string or None: A valid endpoint URL or None if not available.
"""
service_catalog = self.get_access(transport, kwargs).service_catalog
return service_catalog.get_url(service)

View File

@ -0,0 +1,106 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from openstack.auth import access
from openstack.auth.identity import base
from openstack import exceptions
@six.add_metaclass(abc.ABCMeta)
class Auth(base.BaseIdentityPlugin):
def __init__(self, auth_url,
trust_id=None,
tenant_id=None,
tenant_name=None):
"""Construct an Identity V2 Authentication Plugin.
:param string auth_url: Identity service endpoint for authorization.
:param string trust_id: Trust ID for trust scoping.
:param string tenant_id: Tenant ID for project scoping.
:param string tenant_name: Tenant name for project scoping.
"""
super(Auth, self).__init__(auth_url=auth_url)
self.trust_id = trust_id
self.tenant_id = tenant_id
self.tenant_name = tenant_name
def authorize(self, transport, **kwargs):
headers = {'Accept': 'application/json'}
url = self.auth_url + '/tokens'
params = {'auth': self.get_auth_data(headers)}
if self.tenant_id:
params['auth']['tenantId'] = self.tenant_id
elif self.tenant_name:
params['auth']['tenantName'] = self.tenant_name
if self.trust_id:
params['auth']['trust_id'] = self.trust_id
resp = transport.post(url, json=params, headers=headers)
try:
resp_data = resp.json()['access']
except (KeyError, ValueError):
raise exceptions.InvalidResponse(response=resp)
return access.AccessInfoV2(**resp_data)
@abc.abstractmethod
def get_auth_data(self, headers=None):
"""Return the authentication section of an auth plugin.
:param dict headers: The headers that will be sent with the auth
request if a plugin needs to add to them.
:return dict: A dict of authentication data for the auth type.
"""
class Password(Auth):
def __init__(self, auth_url, username, password, **kwargs):
"""A plugin for authenticating with a username and password.
:param string auth_url: Identity service endpoint for authorization.
:param string username: Username for authentication.
:param string password: Password for authentication.
"""
super(Password, self).__init__(auth_url, **kwargs)
self.username = username
self.password = password
def get_auth_data(self, headers=None):
return {'passwordCredentials': {'username': self.username,
'password': self.password}}
class Token(Auth):
def __init__(self, auth_url, token, **kwargs):
"""A plugin for authenticating with an existing token.
:param string auth_url: Identity service endpoint for authorization.
:param string token: Existing token for authentication.
"""
super(Token, self).__init__(auth_url, **kwargs)
self.token = token
def get_auth_data(self, headers=None):
if headers is not None:
headers['X-Auth-Token'] = self.token
return {'token': {'id': self.token}}

View File

@ -0,0 +1,237 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from openstack.auth import access
from openstack.auth.identity import base
from openstack import exceptions
_logger = logging.getLogger(__name__)
class Auth(base.BaseIdentityPlugin):
def __init__(self, auth_url, auth_methods,
trust_id=None,
domain_id=None,
domain_name=None,
project_id=None,
project_name=None,
project_domain_id=None,
project_domain_name=None):
"""Construct an Identity V3 Authentication Plugin.
:param string auth_url: Identity service endpoint for authentication.
:param list auth_methods: A collection of methods to authenticate with.
:param string trust_id: Trust ID for trust scoping.
:param string domain_id: Domain ID for domain scoping.
:param string domain_name: Domain name for domain scoping.
:param string project_id: Project ID for project scoping.
:param string project_name: Project name for project scoping.
:param string project_domain_id: Project's domain ID for project.
:param string project_domain_name: Project's domain name for project.
"""
super(Auth, self).__init__(auth_url=auth_url)
self.auth_methods = auth_methods
self.trust_id = trust_id
self.domain_id = domain_id
self.domain_name = domain_name
self.project_id = project_id
self.project_name = project_name
self.project_domain_id = project_domain_id
self.project_domain_name = project_domain_name
@property
def token_url(self):
"""The full URL where we will send authentication data."""
return '%s/auth/tokens' % self.auth_url.rstrip('/')
def authorize(self, transport, **kwargs):
headers = {'Accept': 'application/json'}
body = {'auth': {'identity': {}}}
ident = body['auth']['identity']
for method in self.auth_methods:
name, auth_data = method.get_auth_data(transport, self, headers)
ident.setdefault('methods', []).append(name)
ident[name] = auth_data
if not ident:
raise exceptions.AuthorizationFailure('Authentication method '
'required (e.g. password)')
mutual_exclusion = [bool(self.domain_id or self.domain_name),
bool(self.project_id or self.project_name),
bool(self.trust_id)]
if sum(mutual_exclusion) > 1:
raise exceptions.AuthorizationFailure('Authentication cannot be '
'scoped to multiple '
'targets. Pick one of: '
'project, domain or trust')
if self.domain_id:
body['auth']['scope'] = {'domain': {'id': self.domain_id}}
elif self.domain_name:
body['auth']['scope'] = {'domain': {'name': self.domain_name}}
elif self.project_id:
body['auth']['scope'] = {'project': {'id': self.project_id}}
elif self.project_name:
scope = body['auth']['scope'] = {'project': {}}
scope['project']['name'] = self.project_name
if self.project_domain_id:
scope['project']['domain'] = {'id': self.project_domain_id}
elif self.project_domain_name:
scope['project']['domain'] = {'name': self.project_domain_name}
elif self.trust_id:
body['auth']['scope'] = {'OS-TRUST:trust': {'id': self.trust_id}}
resp = transport.post(self.token_url, json=body, headers=headers)
try:
resp_data = resp.json()['token']
except (KeyError, ValueError):
raise exceptions.InvalidResponse(response=resp)
return access.AccessInfoV3(resp.headers['X-Subject-Token'],
**resp_data)
@six.add_metaclass(abc.ABCMeta)
class AuthMethod(object):
"""One part of a V3 Authentication strategy.
V3 Tokens allow multiple methods to be presented when authentication
against the server. Each one of these methods is implemented by an
AuthMethod.
Note: When implementing an AuthMethod use the method_parameters
and do not use positional arguments. Otherwise they can't be picked up by
the factory method and don't work as well with AuthConstructors.
"""
_method_parameters = []
def __init__(self, **kwargs):
for param in self._method_parameters:
setattr(self, param, kwargs.pop(param, None))
if kwargs:
msg = "Unexpected Attributes: %s" % ", ".join(kwargs.keys())
raise AttributeError(msg)
@classmethod
def _extract_kwargs(cls, kwargs):
"""Remove parameters related to this method from other kwargs."""
return dict([(p, kwargs.pop(p, None))
for p in cls._method_parameters])
@abc.abstractmethod
def get_auth_data(self, transport, auth, headers, **kwargs):
"""Return the authentication section of an auth plugin.
:param Transport transport: The communication transport.
:param Auth auth: The auth plugin calling the method.
:param dict headers: The headers that will be sent with the auth
request if a plugin needs to add to them.
:return tuple(string, dict): The identifier of this plugin and a dict
of authentication data for the auth type.
"""
@six.add_metaclass(abc.ABCMeta)
class _AuthConstructor(Auth):
"""AuthConstructor is a means of creating an Auth Plugin that contains
only one authentication method. This is generally the required usage.
An AuthConstructor creates an AuthMethod based on the method's
arguments and the auth_method_class defined by the plugin. It then
creates the auth plugin with only that authentication method.
"""
_auth_method_class = None
def __init__(self, auth_url, *args, **kwargs):
method_kwargs = self._auth_method_class._extract_kwargs(kwargs)
method = self._auth_method_class(*args, **method_kwargs)
super(_AuthConstructor, self).__init__(auth_url, [method], **kwargs)
class PasswordMethod(AuthMethod):
_method_parameters = ['user_id',
'username',
'user_domain_id',
'user_domain_name',
'password']
def __init__(self, **kwargs):
"""Construct a User/Password based authentication method.
:param string password: Password for authentication.
:param string username: Username for authentication.
:param string user_id: User ID for authentication.
:param string user_domain_id: User's domain ID for authentication.
:param string user_domain_name: User's domain name for authentication.
"""
super(PasswordMethod, self).__init__(**kwargs)
def get_auth_data(self, transport, auth, headers, **kwargs):
user = {'password': self.password}
if self.user_id:
user['id'] = self.user_id
elif self.username:
user['name'] = self.username
if self.user_domain_id:
user['domain'] = {'id': self.user_domain_id}
elif self.user_domain_name:
user['domain'] = {'name': self.user_domain_name}
return 'password', {'user': user}
class Password(_AuthConstructor):
_auth_method_class = PasswordMethod
class TokenMethod(AuthMethod):
_method_parameters = ['token']
def __init__(self, **kwargs):
"""Construct a Auth plugin to fetch a token from a token.
:param string token: Token for authentication.
"""
super(TokenMethod, self).__init__(**kwargs)
def get_auth_data(self, transport, auth, headers, **kwargs):
headers['X-Auth-Token'] = self.token
return 'token', {'id': self.token}
class Token(_AuthConstructor):
_auth_method_class = TokenMethod
def __init__(self, auth_url, token, **kwargs):
super(Token, self).__init__(auth_url, token=token, **kwargs)

View File

@ -1,32 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class ServiceIdentifier(object):
"""The basic structure of an authentication plugin."""
PUBLIC = 'public'
INTERNAL = 'internal'
ADMIN = 'admin'
VISIBILITY = [PUBLIC, INTERNAL, ADMIN]
def __init__(self, service_type, visibility=PUBLIC, region=None):
"""" Create a service identifier.
:param string service_type: The desired type of service.
:param string visibility: The exposure of the endpoint. Should be
`public` (default), `internal` or `admin`.
:param string region: The desired region (optional).
"""
self.service_type = service_type
self.visibility = visibility
self.region = region

View File

@ -0,0 +1,108 @@
# Copyright 2011 OpenStack Foundation
# Copyright 2011, Piston Cloud Computing, Inc.
# Copyright 2011 Nebula, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import six
from openstack import exceptions
class ServiceCatalog(object):
"""Helper methods for dealing with a Keystone Service Catalog."""
def __init__(self, catalog):
if catalog is None:
self.catalog = []
raise exceptions.EmptyCatalog('The service catalog is missing')
self.catalog = copy.deepcopy(catalog)
def get_urls(self, filtration):
"""Fetch and filter endpoints for the specified service.
Returns endpoints for the specified service (or all) containing
the specified type (or all) and region (or all) and service name.
If there is no name in the service catalog the service_name check will
be skipped. This allows compatibility with services that existed
before the name was available in the catalog.
"""
eps = []
for service in self.catalog:
if not filtration.match_service_type(service.get('type')):
continue
if not filtration.match_service_name(service.get('name')):
continue
for endpoint in service.get('endpoints', []):
if not filtration.match_region(endpoint.get('region')):
continue
if not filtration.match_visibility(endpoint.get('interface')):
continue
url = endpoint.get('url')
if url is not None:
eps += [url]
return eps
def get_url(self, service):
"""Fetch an endpoint from the service catalog.
Get the first endpoint that matches the service filter.
:param ServiceFilter service: The filter to identify the desired
service.
"""
urls = self.get_urls(service)
if len(urls) < 1:
message = "Endpoint not found for %s" % six.text_type(service)
raise exceptions.EndpointNotFound(message)
return urls[0]
class ServiceCatalogV2(ServiceCatalog):
"""The V2 service catalog from Keystone.
"""
def __init__(self, catalog):
super(ServiceCatalogV2, self).__init__(catalog)
self._normalize()
def _normalize(self):
"""Handle differences in the way v2 and v3 catalogs specify endpoints.
Normallize the v2 service catalog to the endpoint types used in v3.
"""
for service in self.catalog:
eps = []
for endpoint in service['endpoints']:
if 'adminURL' in endpoint:
eps += [{
'interface': 'admin',
'region': endpoint['region'],
'url': endpoint['adminURL'],
}]
if 'internalURL' in endpoint:
eps += [{
'interface': 'internal',
'region': endpoint['region'],
'url': endpoint['internalURL'],
}]
if 'publicURL' in endpoint:
eps += [{
'interface': 'public',
'region': endpoint['region'],
'url': endpoint['publicURL'],
}]
service['endpoints'] = eps

View File

@ -0,0 +1,76 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack import exceptions
class ServiceFilter(object):
"""The basic structure of an authentication plugin."""
PUBLIC = 'public'
INTERNAL = 'internal'
ADMIN = 'admin'
VISIBILITY = [PUBLIC, INTERNAL, ADMIN]
def __init__(self, service_type, visibility=PUBLIC, region=None,
service_name=None):
"""" Create a service identifier.
:param string service_type: The desired type of service.
:param string visibility: The exposure of the endpoint. Should be
`public` (default), `internal` or `admin`.
:param string region: The desired region (optional).
:param string service_name: Name of the service
"""
self.service_type = service_type
if not service_type:
msg = "Service type must be specified to locate service"
raise exceptions.SdkException(msg)
if not visibility:
msg = "Visibility must be specified to locate service"
raise exceptions.SdkException(msg)
visibility = visibility.rstrip('URL')
if visibility not in self.VISIBILITY:
msg = "Visibility <%s> not in %s" % (visibility, self.VISIBILITY)
raise exceptions.SdkException(msg)
self.visibility = visibility
self.region = region
self.service_name = service_name
def __repr__(self):
ret = "service_type=%s" % self.service_type
ret += ",visibility=%s" % self.visibility
if self.region is not None:
ret += ",region=%s" % self.region
if self.service_name:
ret += ",service_name=%s" % self.service_name
return ret
def match_service_type(self, service_type):
return self.service_type == service_type
def match_service_name(self, service_name):
if not self.service_name:
return True
if self.service_name == service_name:
return True
return False
def match_region(self, region):
if not self.region:
return True
if self.region == region:
return True
return False
def match_visibility(self, visibility):
return self.visibility == visibility

View File

@ -0,0 +1,31 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.auth import base
class Token(base.BaseAuthPlugin):
"""A provider that will always use the given token and endpoint.
This is really only useful for testing and in certain CLI cases where you
have a known endpoint and admin token that you want to use.
"""
def __init__(self, endpoint, token):
# NOTE(jamielennox): endpoint is reserved for when plugins
# can be used to provide that information
self.endpoint = endpoint
self.token = token
def get_token(self, session):
return self.token

View File

View File

@ -0,0 +1,204 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Time related utilities and helper functions.
"""
import calendar
import datetime
import time
import iso8601
import six
# ISO 8601 extended time format with microseconds
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
def isotime(at=None, subsecond=False):
"""Stringify time in ISO 8601 format."""
if not at:
at = utcnow()
st = at.strftime(_ISO8601_TIME_FORMAT
if not subsecond
else _ISO8601_TIME_FORMAT_SUBSECOND)
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
st += ('Z' if tz == 'UTC' else tz)
return st
def parse_isotime(timestr):
"""Parse time from ISO 8601 format."""
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(six.text_type(e))
except TypeError as e:
raise ValueError(six.text_type(e))
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
"""Returns formatted utcnow."""
if not at:
at = utcnow()
return at.strftime(fmt)
def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
"""Turn a formatted time back into a datetime."""
return datetime.datetime.strptime(timestr, fmt)
def normalize_time(timestamp):
"""Normalize time in arbitrary timezone to UTC naive object."""
offset = timestamp.utcoffset()
if offset is None:
return timestamp
return timestamp.replace(tzinfo=None) - offset
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
if isinstance(before, six.string_types):
before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
if isinstance(after, six.string_types):
after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds)
def utcnow_ts():
"""Timestamp version of our utcnow function."""
if utcnow.override_time is None:
# NOTE(kgriffs): This is several times faster
# than going through calendar.timegm(...)
return int(time.time())
return calendar.timegm(utcnow().timetuple())
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
try:
return utcnow.override_time.pop(0)
except AttributeError:
return utcnow.override_time
return datetime.datetime.utcnow()
def iso8601_from_timestamp(timestamp):
"""Returns a iso8601 formated date from timestamp."""
return isotime(datetime.datetime.utcfromtimestamp(timestamp))
utcnow.override_time = None
def set_time_override(override_time=None):
"""Overrides utils.utcnow.
Make it return a constant time or a list thereof, one at a time.
:param override_time: datetime instance or list thereof. If not
given, defaults to the current UTC time.
"""
utcnow.override_time = override_time or datetime.datetime.utcnow()
def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta."""
assert(not utcnow.override_time is None)
try:
for dt in utcnow.override_time:
dt += timedelta
except TypeError:
utcnow.override_time += timedelta
def advance_time_seconds(seconds):
"""Advance overridden time by seconds."""
advance_time_delta(datetime.timedelta(0, seconds))
def clear_time_override():
"""Remove the overridden time."""
utcnow.override_time = None
def marshall_now(now=None):
"""Make an rpc-safe datetime with microseconds.
Note: tzinfo is stripped, but not required for relative times.
"""
if not now:
now = utcnow()
return dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
minute=now.minute, second=now.second,
microsecond=now.microsecond)
def unmarshall_time(tyme):
"""Unmarshall a datetime dict."""
return datetime.datetime(day=tyme['day'],
month=tyme['month'],
year=tyme['year'],
hour=tyme['hour'],
minute=tyme['minute'],
second=tyme['second'],
microsecond=tyme['microsecond'])
def delta_seconds(before, after):
"""Return the difference between two timing objects.
Compute the difference in seconds between two date, time, or
datetime objects (as a float, to microsecond resolution).
"""
delta = after - before
return total_seconds(delta)
def total_seconds(delta):
"""Return the total seconds of datetime.timedelta object.
Compute total seconds of datetime.timedelta, datetime.timedelta
doesn't have method total_seconds in Python2.6, calculate it manually.
"""
try:
return delta.total_seconds()
except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6))
def is_soon(dt, window):
"""Determines if time is going to happen in the next window seconds.
:params dt: the time
:params window: minimum seconds to remain to consider the time not soon
:return: True if expiration is within the given duration
"""
soon = (utcnow() + datetime.timedelta(seconds=window))
return normalize_time(dt) <= soon

56
openstack/exceptions.py Normal file
View File

@ -0,0 +1,56 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 Nebula, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Exception definitions.
"""
class SdkException(Exception):
"""The base exception class for all exceptions this library raises.
"""
pass
class AuthorizationFailure(SdkException):
"""Cannot authorize API client."""
pass
class EndpointException(SdkException):
"""Something is rotten in Service Catalog."""
pass
class EndpointNotFound(EndpointException):
"""Could not find requested endpoint in Service Catalog."""
pass
class EmptyCatalog(EndpointNotFound):
"""The service catalog is empty."""
pass
class NoMatchingPlugin(SdkException):
"""No matching plugins could be created with the provided parameters."""
pass
class InvalidResponse(SdkException):
"""The response from the server is not valid for this request."""
def __init__(self, response):
super(InvalidResponse, self).__init__()
self.response = response

View File

@ -37,9 +37,9 @@ class Session(object):
Handle a session level request.
:param ServiceIdentifier service: Object that identifies service to
:param ServiceFilter service: Object that identifies service to
the authenticator.
:type service: :class:`openstack.auth.service.ServiceIdentifier`
:type service: :class:`openstack.auth.service_filter.ServiceFilter`
:param string path: Path relative to authentictor base url.
:param string method: The http method to use. (eg. 'GET', 'POST').
:param bool authenticate: True if a token should be attached

View File

@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,282 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
TEST_ADMIN_URL = 'http://identity.region1.admin/'
TEST_DOMAIN_ID = '1'
TEST_DOMAIN_NAME = 'aDomain'
TEST_EXPIRES = '2020-01-01 00:00:10.000123+00:00'
TEST_PASS = 'wasspord'
TEST_PROJECT_ID = 'pid'
TEST_PROJECT_NAME = 'pname'
TEST_SUBJECT = 'subjay'
TEST_TOKEN = 'atoken'
TEST_TENANT_ID = 'tid'
TEST_TENANT_NAME = 'tname'
TEST_TRUST_ID = 'trusty'
TEST_USER = 'youzer'
TEST_USER_ID = 'youid'
TEST_SERVICE_CATALOG_V2 = [
{
"endpoints": [{
"adminURL": "http://compute.region2.admin/",
"region": "RegionTwo",
"internalURL": "http://compute.region2.internal/",
"publicURL": "http://compute.region2.public/",
}],
"type": "compute",
"name": "nova2"
}, {
"endpoints": [{
"adminURL": "http://compute.region1.admin/",
"region": "RegionOne",
"internalURL": "http://compute.region1.internal/",
"publicURL": "http://compute.region1.public/",
}],
"type": "compute",
"name": "nova"
}, {
"endpoints": [{
"adminURL": "http://image.region1.admin/",
"region": "RegionOne",
"internalURL": "http://image.region1.internal/",
"publicURL": "http://image.region1.public/",
}],
"type": "image",
"name": "glance"
}, {
"endpoints": [{
"adminURL": TEST_ADMIN_URL,
"region": "RegionOne",
"internalURL": "http://identity.region1.internal/",
"publicURL": "http://identity.region1.public/",
}],
"type": "identity",
"name": "keystone"
}, {
"endpoints": [{
"adminURL": "http://object-store.region1.admin/",
"region": "RegionOne",
"internalURL": "http://object-store.region1.internal/",
"publicURL": "http://object-store.region1.public/",
}],
"type": "object-store",
"name": "swift"
}]
TEST_SERVICE_CATALOG_V2_NORMALIZED = [
{
"endpoints": [{
"interface": "admin",
"region": "RegionTwo",
"url": "http://compute.region2.admin/",
}, {
"interface": "internal",
"region": "RegionTwo",
"url": "http://compute.region2.internal/",
}, {
"interface": "public",
"region": "RegionTwo",
"url": "http://compute.region2.public/",
}],
"type": "compute",
"name": "nova2"
}, {
"endpoints": [{
"interface": "admin",
"region": "RegionOne",
"url": "http://compute.region1.admin/",
}, {
"interface": "internal",
"region": "RegionOne",
"url": "http://compute.region1.internal/",
}, {
"interface": "public",
"region": "RegionOne",
"url": "http://compute.region1.public/",
}],
"type": "compute",
"name": "nova"
}, {
"endpoints": [{
"interface": "admin",
"region": "RegionOne",
"url": "http://image.region1.admin/",
}, {
"interface": "internal",
"region": "RegionOne",
"url": "http://image.region1.internal/",
}, {
"interface": "public",
"region": "RegionOne",
"url": "http://image.region1.public/",
}],
"type": "image",
"name": "glance"
}, {
"endpoints": [{
"interface": "admin",
"region": "RegionOne",
"url": TEST_ADMIN_URL,
}, {
"interface": "internal",
"region": "RegionOne",
"url": "http://identity.region1.internal/",
}, {
"interface": "public",
"region": "RegionOne",
"url": "http://identity.region1.public/",
}],
"type": "identity",
"name": "keystone"
}, {
"endpoints": [{
"interface": "admin",
"region": "RegionOne",
"url": "http://object-store.region1.admin/",
}, {
"interface": "internal",
"region": "RegionOne",
"url": "http://object-store.region1.internal/",
}, {
"interface": "public",
"region": "RegionOne",
"url": "http://object-store.region1.public/",
}],
"type": "object-store",
"name": "swift"
}]
TEST_RESPONSE_DICT_V2 = {
"access": {
"token": {
"expires": TEST_EXPIRES,
"id": TEST_TOKEN,
"tenant": {
"id": TEST_TENANT_ID
},
},
"user": {
"id": TEST_USER_ID
},
"serviceCatalog": TEST_SERVICE_CATALOG_V2,
},
}
TEST_SERVICE_CATALOG_V3 = [
{
"endpoints": [{
"url": "http://compute.region2.public/",
"region": "RegionTwo",
"interface": "public"
}, {
"url": "http://compute.region2.internal/",
"region": "RegionTwo",
"interface": "internal"
}, {
"url": "http://compute.region2.admin/",
"region": "RegionTwo",
"interface": "admin"
}],
"type": "compute",
"name": "nova2",
}, {
"endpoints": [{
"url": "http://compute.region1.public/",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://compute.region1.internal/",
"region": "RegionOne",
"interface": "internal"
}, {
"url": "http://compute.region1.admin/",
"region": "RegionOne",
"interface": "admin"
}],
"type": "compute",
"name": "nova",
}, {
"endpoints": [{
"url": "http://image.region1.public/",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://image.region1.internal/",
"region": "RegionOne",
"interface": "internal"
}, {
"url": "http://image.region1.admin/",
"region": "RegionOne",
"interface": "admin"
}],
"type": "image",
"name": "glance"
}, {
"endpoints": [{
"url": "http://identity.region1.public/",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://identity.region1.internal/",
"region": "RegionOne",
"interface": "internal"
}, {
"url": "http://identity.region1.admin/",
"region": "RegionOne",
"interface": "admin"
}],
"type": "identity"
}, {
"endpoints": [{
"url": "http://object-store.region1.public/",
"region": "RegionOne",
"interface": "public"
}, {
"url": "http://object-store.region1.internal/",
"region": "RegionOne",
"interface": "internal"
}, {
"url": "http://object-store.region1.admin/",
"region": "RegionOne",
"interface": "admin"
}],
"type": "object-store"
}]
TEST_RESPONSE_DICT_V3 = {
"token": {
"methods": [
"token",
"password"
],
"expires_at": TEST_EXPIRES,
"project": {
"domain": {
"id": TEST_DOMAIN_ID,
"name": TEST_DOMAIN_NAME
},
"id": TEST_PROJECT_ID,
"name": TEST_PROJECT_NAME
},
"user": {
"domain": {
"id": TEST_DOMAIN_ID,
"name": TEST_DOMAIN_NAME
},
"id": TEST_USER_ID,
"name": TEST_USER
},
"issued_at": "2013-05-29T16:55:21.468960Z",
"catalog": TEST_SERVICE_CATALOG_V3
},
}

View File

@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,123 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from openstack.auth.identity import v2
from openstack import exceptions
from openstack.tests.auth import common
TEST_URL = 'http://127.0.0.1:5000/v2.0'
TEST_SERVICE_CATALOG = common.TEST_SERVICE_CATALOG_V2
TEST_RESPONSE_DICT = common.TEST_RESPONSE_DICT_V2
class TestV2Auth(testtools.TestCase):
def test_password(self):
kargs = {'trust_id': common.TEST_TRUST_ID,
'tenant_id': common.TEST_TENANT_ID,
'tenant_name': common.TEST_TENANT_NAME}
sot = v2.Password(TEST_URL, common.TEST_USER, common.TEST_PASS,
**kargs)
self.assertEqual(common.TEST_USER, sot.username)
self.assertEqual(common.TEST_PASS, sot.password)
self.assertEqual(common.TEST_TRUST_ID, sot.trust_id)
self.assertEqual(common.TEST_TENANT_ID, sot.tenant_id)
self.assertEqual(common.TEST_TENANT_NAME, sot.tenant_name)
expected = {'passwordCredentials': {'password': common.TEST_PASS,
'username': common.TEST_USER}}
self.assertEqual(expected, sot.get_auth_data())
def test_token(self):
kargs = {'trust_id': common.TEST_TRUST_ID,
'tenant_id': common.TEST_TENANT_ID,
'tenant_name': common.TEST_TENANT_NAME}
sot = v2.Token(TEST_URL, common.TEST_TOKEN, **kargs)
self.assertEqual(common.TEST_TOKEN, sot.token)
self.assertEqual(common.TEST_TRUST_ID, sot.trust_id)
self.assertEqual(common.TEST_TENANT_ID, sot.tenant_id)
self.assertEqual(common.TEST_TENANT_NAME, sot.tenant_name)
expected = {'token': {'id': common.TEST_TOKEN}}
self.assertEqual(expected, sot.get_auth_data())
def create_mock_transport(self, xresp):
transport = mock.Mock()
transport.post = mock.Mock()
response = mock.Mock()
response.json = mock.Mock()
response.json.return_value = xresp
transport.post.return_value = response
return transport
def test_authorize_tenant_id(self):
kargs = {'trust_id': common.TEST_TRUST_ID,
'tenant_id': common.TEST_TENANT_ID,
'tenant_name': common.TEST_TENANT_NAME}
sot = v2.Token(TEST_URL, common.TEST_TOKEN, **kargs)
xport = self.create_mock_transport(TEST_RESPONSE_DICT)
resp = sot.authorize(xport)
eurl = TEST_URL + '/tokens'
eheaders = {'Accept': 'application/json',
'X-Auth-Token': common.TEST_TOKEN}
ejson = {'auth': {'token': {'id': common.TEST_TOKEN},
'trust_id': common.TEST_TRUST_ID,
'tenantId': common.TEST_TENANT_ID}}
xport.post.assert_called_with(eurl, headers=eheaders, json=ejson)
ecatalog = TEST_RESPONSE_DICT['access'].copy()
ecatalog['version'] = 'v2.0'
self.assertEqual(ecatalog, resp._info)
def test_authorize_tenant_name(self):
kargs = {'tenant_name': common.TEST_TENANT_NAME}
sot = v2.Token(TEST_URL, common.TEST_TOKEN, **kargs)
xport = self.create_mock_transport(TEST_RESPONSE_DICT)
resp = sot.authorize(xport)
eurl = TEST_URL + '/tokens'
eheaders = {'Accept': 'application/json',
'X-Auth-Token': common.TEST_TOKEN}
ejson = {'auth': {'token': {'id': common.TEST_TOKEN},
'tenantName': common.TEST_TENANT_NAME}}
xport.post.assert_called_with(eurl, headers=eheaders, json=ejson)
ecatalog = TEST_RESPONSE_DICT['access'].copy()
ecatalog['version'] = 'v2.0'
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_only(self):
sot = v2.Token(TEST_URL, common.TEST_TOKEN)
xport = self.create_mock_transport(TEST_RESPONSE_DICT)
resp = sot.authorize(xport)
eurl = TEST_URL + '/tokens'
eheaders = {'Accept': 'application/json',
'X-Auth-Token': common.TEST_TOKEN}
ejson = {'auth': {'token': {'id': common.TEST_TOKEN}}}
xport.post.assert_called_with(eurl, headers=eheaders, json=ejson)
ecatalog = TEST_RESPONSE_DICT['access'].copy()
ecatalog['version'] = 'v2.0'
self.assertEqual(ecatalog, resp._info)
def test_authorize_bad_response(self):
sot = v2.Token(TEST_URL, common.TEST_TOKEN)
xport = self.create_mock_transport({})
self.assertRaises(exceptions.InvalidResponse, sot.authorize, xport)

View File

@ -0,0 +1,320 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from openstack.auth.identity import v3
from openstack import exceptions
from openstack.tests.auth import common
TEST_URL = 'http://127.0.0.1:5000/v3.0'
class TestV3Auth(testtools.TestCase):
def test_password_user_domain(self):
kargs = {'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME}
method = v3.PasswordMethod(username=common.TEST_USER,
user_id=common.TEST_USER_ID,
user_domain_id=common.TEST_DOMAIN_ID,
user_domain_name=common.TEST_DOMAIN_NAME,
password=common.TEST_PASS)
sot = v3.Auth(TEST_URL, [method], **kargs)
self.assertEqual(1, len(sot.auth_methods))
auther = sot.auth_methods[0]
self.assertEqual(common.TEST_USER_ID, auther.user_id)
self.assertEqual(common.TEST_USER, auther.username)
self.assertEqual(common.TEST_DOMAIN_ID, auther.user_domain_id)
self.assertEqual(common.TEST_DOMAIN_NAME, auther.user_domain_name)
self.assertEqual(common.TEST_PASS, auther.password)
expected = ('password', {'user': {'id': common.TEST_USER_ID,
'password': common.TEST_PASS}})
self.assertEqual(expected, auther.get_auth_data(None, None, {}))
self.assertEqual(common.TEST_TRUST_ID, sot.trust_id)
self.assertEqual(None, sot.domain_id)
self.assertEqual(None, sot.domain_name)
self.assertEqual(common.TEST_PROJECT_ID, sot.project_id)
self.assertEqual(common.TEST_PROJECT_NAME, sot.project_name)
self.assertEqual(None, sot.project_domain_id)
self.assertEqual(None, sot.project_domain_name)
self.assertEqual(TEST_URL + '/auth/tokens', sot.token_url)
def test_password_domain(self):
kargs = {'domain_id': common.TEST_DOMAIN_ID,
'domain_name': common.TEST_DOMAIN_NAME,
'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME}
methods = [v3.PasswordMethod(username=common.TEST_USER,
user_id=common.TEST_USER_ID,
password=common.TEST_PASS)]
sot = v3.Auth(TEST_URL, methods, **kargs)
self.assertEqual(1, len(sot.auth_methods))
auther = sot.auth_methods[0]
self.assertEqual(common.TEST_USER_ID, auther.user_id)
self.assertEqual(common.TEST_USER, auther.username)
self.assertEqual(None, auther.user_domain_id)
self.assertEqual(None, auther.user_domain_name)
self.assertEqual(common.TEST_PASS, auther.password)
expected = ('password', {'user': {'id': common.TEST_USER_ID,
'password': common.TEST_PASS}})
self.assertEqual(expected, auther.get_auth_data(None, None, {}))
self.assertEqual(common.TEST_TRUST_ID, sot.trust_id)
self.assertEqual(common.TEST_DOMAIN_ID, sot.domain_id)
self.assertEqual(common.TEST_DOMAIN_NAME, sot.domain_name)
self.assertEqual(common.TEST_PROJECT_ID, sot.project_id)
self.assertEqual(common.TEST_PROJECT_NAME, sot.project_name)
self.assertEqual(None, sot.project_domain_id)
self.assertEqual(None, sot.project_domain_name)
self.assertEqual(TEST_URL + '/auth/tokens', sot.token_url)
def test_token_project_domain(self):
kargs = {'project_domain_id': common.TEST_DOMAIN_ID,
'project_domain_name': common.TEST_DOMAIN_NAME,
'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
self.assertEqual(1, len(sot.auth_methods))
auther = sot.auth_methods[0]
self.assertEqual(common.TEST_TOKEN, auther.token)
expected = ('token', {'id': common.TEST_TOKEN})
self.assertEqual(expected, auther.get_auth_data(None, None, {}))
self.assertEqual(common.TEST_TRUST_ID, sot.trust_id)
self.assertEqual(None, sot.domain_id)
self.assertEqual(None, sot.domain_name)
self.assertEqual(common.TEST_PROJECT_ID, sot.project_id)
self.assertEqual(common.TEST_PROJECT_NAME, sot.project_name)
self.assertEqual(common.TEST_DOMAIN_ID, sot.project_domain_id)
self.assertEqual(common.TEST_DOMAIN_NAME, sot.project_domain_name)
self.assertEqual(TEST_URL + '/auth/tokens', sot.token_url)
def create_mock_transport(self, xresp):
transport = mock.Mock()
transport.post = mock.Mock()
response = mock.Mock()
response.json = mock.Mock()
response.json.return_value = xresp
response.headers = {'X-Subject-Token': common.TEST_SUBJECT}
transport.post.return_value = response
return transport
def test_authorize_token(self):
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
eurl = TEST_URL + '/auth/tokens'
eheaders = {'Accept': 'application/json',
'X-Auth-Token': common.TEST_TOKEN}
ejson = {'auth': {'identity': {'token': {'id': common.TEST_TOKEN},
'methods': ['token']}}}
xport.post.assert_called_with(eurl, headers=eheaders, json=ejson)
ecatalog = common.TEST_RESPONSE_DICT_V3['token'].copy()
ecatalog['auth_token'] = common.TEST_SUBJECT
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_domain_id(self):
kargs = {'domain_id': common.TEST_DOMAIN_ID}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
eurl = TEST_URL + '/auth/tokens'
eheaders = {'Accept': 'application/json',
'X-Auth-Token': common.TEST_TOKEN}
ejson = {'auth': {'identity': {'token': {'id': common.TEST_TOKEN},
'methods': ['token']},
'scope': {'domain': {'id': common.TEST_DOMAIN_ID}}}}
xport.post.assert_called_with(eurl, headers=eheaders, json=ejson)
ecatalog = common.TEST_RESPONSE_DICT_V3['token'].copy()
ecatalog['auth_token'] = common.TEST_SUBJECT
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_domain_name(self):
kargs = {'domain_name': common.TEST_DOMAIN_NAME}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
eurl = TEST_URL + '/auth/tokens'
eheaders = {'Accept': 'application/json',
'X-Auth-Token': common.TEST_TOKEN}
scope = {'domain': {'name': common.TEST_DOMAIN_NAME}}
ejson = {'auth': {'identity': {'token': {'id': common.TEST_TOKEN},
'methods': ['token']},
'scope': scope}}
xport.post.assert_called_with(eurl, headers=eheaders, json=ejson)
ecatalog = common.TEST_RESPONSE_DICT_V3['token'].copy()
ecatalog['auth_token'] = common.TEST_SUBJECT
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_project_id(self):
kargs = {'project_id': common.TEST_PROJECT_ID}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
eurl = TEST_URL + '/auth/tokens'
eheaders = {'Accept': 'application/json',
'X-Auth-Token': common.TEST_TOKEN}
scope = {'project': {'id': common.TEST_PROJECT_ID}}
ejson = {'auth': {'identity': {'token': {'id': common.TEST_TOKEN},
'methods': ['token']},
'scope': scope}}
xport.post.assert_called_with(eurl, headers=eheaders, json=ejson)
ecatalog = common.TEST_RESPONSE_DICT_V3['token'].copy()
ecatalog['auth_token'] = common.TEST_SUBJECT
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_project_name(self):
kargs = {'project_name': common.TEST_PROJECT_NAME,
'project_domain_id': common.TEST_DOMAIN_ID}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
eurl = TEST_URL + '/auth/tokens'
eheaders = {'Accept': 'application/json',
'X-Auth-Token': common.TEST_TOKEN}
domain = {'domain': {'id': common.TEST_DOMAIN_ID},
'name': common.TEST_PROJECT_NAME}
scope = {'project': domain}
ejson = {'auth': {'identity': {'methods': ['token'],
'token': {'id': common.TEST_TOKEN}},
'scope': scope}}
xport.post.assert_called_with(eurl, headers=eheaders, json=ejson)
ecatalog = common.TEST_RESPONSE_DICT_V3['token'].copy()
ecatalog['auth_token'] = common.TEST_SUBJECT
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_project_name_domain_name(self):
kargs = {'project_name': common.TEST_PROJECT_NAME,
'project_domain_name': common.TEST_DOMAIN_NAME}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
eurl = TEST_URL + '/auth/tokens'
eheaders = {'Accept': 'application/json',
'X-Auth-Token': common.TEST_TOKEN}
domain = {'domain': {'name': common.TEST_DOMAIN_NAME},
'name': common.TEST_PROJECT_NAME}
scope = {'project': domain}
ejson = {'auth': {'identity': {'methods': ['token'],
'token': {'id': common.TEST_TOKEN}},
'scope': scope}}
xport.post.assert_called_with(eurl, headers=eheaders, json=ejson)
ecatalog = common.TEST_RESPONSE_DICT_V3['token'].copy()
ecatalog['auth_token'] = common.TEST_SUBJECT
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_trust_id(self):
kargs = {'trust_id': common.TEST_TRUST_ID}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
eurl = TEST_URL + '/auth/tokens'
eheaders = {'Accept': 'application/json',
'X-Auth-Token': common.TEST_TOKEN}
scope = {'OS-TRUST:trust': {'id': common.TEST_TRUST_ID}}
ejson = {'auth': {'identity': {'token': {'id': common.TEST_TOKEN},
'methods': ['token']},
'scope': scope}}
xport.post.assert_called_with(eurl, headers=eheaders, json=ejson)
ecatalog = common.TEST_RESPONSE_DICT_V3['token'].copy()
ecatalog['auth_token'] = common.TEST_SUBJECT
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_multi_method(self):
methods = [v3.PasswordMethod(username=common.TEST_USER,
password=common.TEST_PASS),
v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
eurl = TEST_URL + '/auth/tokens'
eheaders = {'Accept': 'application/json',
'X-Auth-Token': common.TEST_TOKEN}
up = {'password': common.TEST_PASS, 'name': common.TEST_USER}
ejson = {'auth': {'identity': {'token': {'id': common.TEST_TOKEN},
'password': {'user': up},
'methods': ['password', 'token']}}}
xport.post.assert_called_with(eurl, headers=eheaders, json=ejson)
ecatalog = common.TEST_RESPONSE_DICT_V3['token'].copy()
ecatalog['auth_token'] = common.TEST_SUBJECT
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_mutually_exclusive(self):
x = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
kargs = {'domain_id': 'a',
'project_id': 'b'}
sot = v3.Auth(TEST_URL, methods, **kargs)
self.assertRaises(exceptions.AuthorizationFailure, sot.authorize, x)
kargs = {'domain_name': 'a',
'project_name': 'b'}
sot = v3.Auth(TEST_URL, methods, **kargs)
self.assertRaises(exceptions.AuthorizationFailure, sot.authorize, x)
kargs = {'domain_name': 'a',
'trust_id': 'b'}
sot = v3.Auth(TEST_URL, methods, **kargs)
self.assertRaises(exceptions.AuthorizationFailure, sot.authorize, x)
kargs = {'project_id': 'a',
'trust_id': 'b'}
sot = v3.Auth(TEST_URL, methods, **kargs)
self.assertRaises(exceptions.AuthorizationFailure, sot.authorize, x)
def test_authorize_bad_response(self):
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods)
xport = self.create_mock_transport({})
self.assertRaises(exceptions.InvalidResponse, sot.authorize, xport)

View File

@ -0,0 +1,141 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json as jsonutils
import logging
import sys
import time
import fixtures
import httpretty
import mock
import requests
import six
from six.moves.urllib import parse as urlparse
import testtools
class TestCase(testtools.TestCase):
TEST_DOMAIN_ID = '1'
TEST_DOMAIN_NAME = 'aDomain'
TEST_TENANT_ID = '1'
TEST_TENANT_NAME = 'aTenant'
TEST_TOKEN = 'aToken'
TEST_TRUST_ID = 'aTrust'
TEST_USER = 'test'
TEST_ROOT_URL = 'http://127.0.0.1:5000/'
def setUp(self):
super(TestCase, self).setUp()
self.logger = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
self.time_patcher = mock.patch.object(time, 'time', lambda: 1234)
self.time_patcher.start()
def tearDown(self):
self.time_patcher.stop()
super(TestCase, self).tearDown()
def stub_url(self, method, parts=None, base_url=None, json=None, **kwargs):
if not base_url:
base_url = self.TEST_URL
if json:
kwargs['body'] = jsonutils.dumps(json)
kwargs['content_type'] = 'application/json'
if parts:
url = '/'.join([p.strip('/') for p in [base_url] + parts])
else:
url = base_url
httpretty.register_uri(method, url, **kwargs)
def assertRequestBodyIs(self, body=None, json=None):
last_request_body = httpretty.last_request().body
if six.PY3:
last_request_body = last_request_body.decode('utf-8')
if json:
val = jsonutils.loads(last_request_body)
self.assertEqual(json, val)
elif body:
self.assertEqual(body, last_request_body)
def assertQueryStringIs(self, qs=''):
"""Verify the QueryString matches what is expected.
The qs parameter should be of the format \'foo=bar&abc=xyz\'
"""
expected = urlparse.parse_qs(qs)
self.assertEqual(expected, httpretty.last_request().querystring)
def assertQueryStringContains(self, **kwargs):
qs = httpretty.last_request().querystring
for k, v in six.iteritems(kwargs):
self.assertIn(k, qs)
self.assertIn(v, qs[k])
def assertRequestHeaderEqual(self, name, val):
"""Verify that the last request made contains a header and its value
The request must have already been made and httpretty must have been
activated for the request.
"""
headers = httpretty.last_request().headers
self.assertEqual(headers.get(name), val)
if tuple(sys.version_info)[0:2] < (2, 7):
def assertDictEqual(self, d1, d2, msg=None):
# Simple version taken from 2.7
self.assertIsInstance(d1, dict,
'First argument is not a dictionary')
self.assertIsInstance(d2, dict,
'Second argument is not a dictionary')
if d1 != d2:
if msg:
self.fail(msg)
else:
standardMsg = '%r != %r' % (d1, d2)
self.fail(standardMsg)
TestCase.assertDictEqual = assertDictEqual
class TestResponse(requests.Response):
"""Class used to wrap requests.Response and provide some
convenience to initialize with a dict.
"""
def __init__(self, data):
self._text = None
super(TestResponse, self).__init__()
if isinstance(data, dict):
self.status_code = data.get('status_code', 200)
headers = data.get('headers')
if headers:
self.headers.update(headers)
# Fake the text attribute to streamline Response creation
# _content is defined by requests.Response
self._content = data.get('text')
else:
self.status_code = data
def __eq__(self, other):
return self.__dict__ == other.__dict__
@property
def text(self):
return self.content

View File

@ -0,0 +1,88 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from openstack.auth import access
from openstack.tests.auth import common
class TestAccessInfo(testtools.TestCase):
def test_is_valid(self):
v2body = common.TEST_RESPONSE_DICT_V2
v3body = common.TEST_RESPONSE_DICT_V3
self.assertTrue(access.AccessInfoV2.is_valid(v2body))
self.assertFalse(access.AccessInfoV2.is_valid(v3body))
self.assertFalse(access.AccessInfoV3.is_valid(v2body))
self.assertTrue(access.AccessInfoV3.is_valid(v3body))
def test_factory_v2(self):
sot = access.AccessInfo.factory(body=common.TEST_RESPONSE_DICT_V2)
self.assertTrue(isinstance(sot, access.AccessInfoV2))
self.assertFalse(sot.will_expire_soon())
self.assertTrue(sot.has_service_catalog())
self.assertEqual(common.TEST_TOKEN, sot.auth_token)
self.assertEqual(common.TEST_EXPIRES, str(sot.expires))
self.assertEqual(None, sot.username)
self.assertEqual(common.TEST_USER_ID, sot.user_id)
self.assertEqual('default', sot.user_domain_id)
self.assertEqual('Default', sot.user_domain_name)
self.assertEqual([], sot.role_names)
self.assertEqual(None, sot.domain_id)
self.assertEqual(None, sot.domain_name)
self.assertEqual(None, sot.project_name)
self.assertEqual(None, sot.tenant_name)
self.assertTrue(sot.project_scoped)
self.assertFalse(sot.domain_scoped)
self.assertEqual(None, sot.trust_id)
self.assertFalse(sot.trust_scoped)
self.assertEqual(common.TEST_TENANT_ID, sot.project_id)
self.assertEqual(common.TEST_TENANT_ID, sot.tenant_id)
self.assertEqual('default', sot.project_domain_id)
self.assertEqual('Default', sot.project_domain_name)
self.assertEqual('v2.0', sot.version)
def test_factory_v3(self):
response = mock.Mock()
response.headers = {'X-Subject-Token': common.TEST_TOKEN}
sot = access.AccessInfo.factory(body=common.TEST_RESPONSE_DICT_V3,
resp=response)
self.assertTrue(isinstance(sot, access.AccessInfoV3))
self.assertFalse(sot.will_expire_soon())
self.assertTrue(sot.has_service_catalog())
self.assertEqual(common.TEST_TOKEN, sot.auth_token)
self.assertEqual(common.TEST_EXPIRES, str(sot.expires))
self.assertEqual(common.TEST_USER, sot.username)
self.assertEqual(common.TEST_USER_ID, sot.user_id)
self.assertEqual(common.TEST_DOMAIN_ID, sot.user_domain_id)
self.assertEqual(common.TEST_DOMAIN_NAME, sot.user_domain_name)
self.assertEqual([], sot.role_names)
self.assertEqual(None, sot.domain_id)
self.assertEqual(None, sot.domain_name)
self.assertEqual(common.TEST_PROJECT_NAME, sot.project_name)
self.assertEqual(common.TEST_PROJECT_NAME, sot.tenant_name)
self.assertTrue(sot.project_scoped)
self.assertFalse(sot.domain_scoped)
self.assertEqual(None, sot.trust_id)
self.assertFalse(sot.trust_scoped)
self.assertEqual(common.TEST_PROJECT_ID, sot.project_id)
self.assertEqual(common.TEST_PROJECT_ID, sot.tenant_id)
self.assertEqual(common.TEST_DOMAIN_ID, sot.project_domain_id)
self.assertEqual(common.TEST_DOMAIN_NAME, sot.project_domain_name)
self.assertEqual('v3', sot.version)
def test_factory_raises(self):
self.assertRaises(NotImplementedError, access.AccessInfo.factory,
body={})

View File

@ -0,0 +1,105 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from openstack.auth import service_catalog as catalog
from openstack.auth import service_filter
from openstack import exceptions as exc
from openstack.tests.auth import common
class TestServiceCatalog(testtools.TestCase):
def get_urls(self, sot):
sf = service_filter.ServiceFilter('compute')
exp = ["http://compute.region2.public/",
"http://compute.region1.public/"]
self.assertEqual(exp, sot.get_urls(sf))
sf = service_filter.ServiceFilter('image')
self.assertEqual(["http://image.region1.public/"], sot.get_urls(sf))
sf = service_filter.ServiceFilter('identity')
self.assertEqual(["http://identity.region1.public/"], sot.get_urls(sf))
sf = service_filter.ServiceFilter('object-store')
self.assertEqual(["http://object-store.region1.public/"],
sot.get_urls(sf))
def get_urls_name(self, sot):
sf = service_filter.ServiceFilter('compute', service_name='nova')
self.assertEqual(["http://compute.region1.public/"], sot.get_urls(sf))
sf = service_filter.ServiceFilter('compute', service_name='nova2')
self.assertEqual(["http://compute.region2.public/"], sot.get_urls(sf))
def get_urls_region(self, sot):
sf = service_filter.ServiceFilter('compute', region='RegionTwo')
self.assertEqual(["http://compute.region2.public/"], sot.get_urls(sf))
sf = service_filter.ServiceFilter('compute', region='RegionOne')
self.assertEqual(["http://compute.region1.public/"], sot.get_urls(sf))
def get_urls_visibility(self, sot):
sf = service_filter.ServiceFilter('identity', visibility='admin')
self.assertEqual(["http://identity.region1.admin/"], sot.get_urls(sf))
sf = service_filter.ServiceFilter('identity', visibility='internal')
self.assertEqual(["http://identity.region1.internal/"],
sot.get_urls(sf))
sf = service_filter.ServiceFilter('identity', visibility='public')
self.assertEqual(["http://identity.region1.public/"], sot.get_urls(sf))
class TestServiceCatalogV2(TestServiceCatalog):
def test_catalog(self):
sot = catalog.ServiceCatalogV2(common.TEST_SERVICE_CATALOG_V2)
self.assertEqual(common.TEST_SERVICE_CATALOG_V2_NORMALIZED,
sot.catalog)
def test_catalog_empty(self):
self.assertRaises(exc.EmptyCatalog, catalog.ServiceCatalogV2, None)
def test_get_urls(self):
sot = catalog.ServiceCatalogV2(common.TEST_SERVICE_CATALOG_V2)
self.get_urls(sot)
def test_get_urls_name(self):
sot = catalog.ServiceCatalogV2(common.TEST_SERVICE_CATALOG_V2)
self.get_urls_name(sot)
def test_get_urls_region(self):
sot = catalog.ServiceCatalogV2(common.TEST_SERVICE_CATALOG_V2)
self.get_urls_region(sot)
def test_get_urls_visibility(self):
sot = catalog.ServiceCatalogV2(common.TEST_SERVICE_CATALOG_V2)
self.get_urls_visibility(sot)
class TestServiceCatalogV3(TestServiceCatalog):
def test_catalog(self):
sot = catalog.ServiceCatalog(common.TEST_SERVICE_CATALOG_V3)
self.assertEqual(common.TEST_SERVICE_CATALOG_V3, sot.catalog)
def test_catalog_empty(self):
self.assertRaises(exc.EmptyCatalog, catalog.ServiceCatalog, None)
def test_get_urls(self):
sot = catalog.ServiceCatalog(common.TEST_SERVICE_CATALOG_V3)
self.get_urls(sot)
def test_get_urls_name(self):
sot = catalog.ServiceCatalog(common.TEST_SERVICE_CATALOG_V3)
self.get_urls_name(sot)
def test_get_urls_region(self):
sot = catalog.ServiceCatalog(common.TEST_SERVICE_CATALOG_V3)
self.get_urls_region(sot)
def test_get_urls_visibility(self):
sot = catalog.ServiceCatalog(common.TEST_SERVICE_CATALOG_V3)
self.get_urls_visibility(sot)

View File

@ -0,0 +1,87 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
import testtools
from openstack.auth import service_filter as filt
from openstack import exceptions
class TestServiceFilter(testtools.TestCase):
def test_minimum(self):
sot = filt.ServiceFilter('identity')
self.assertEqual("service_type=identity,visibility=public",
six.text_type(sot))
def test_maximum(self):
sot = filt.ServiceFilter('compute', visibility='admin', region='b',
service_name='c')
exp = "service_type=compute,visibility=admin,region=b,service_name=c"
self.assertEqual(exp, six.text_type(sot))
def test_visibility(self):
sot = filt.ServiceFilter('identity', visibility='public')
self.assertEqual("service_type=identity,visibility=public",
six.text_type(sot))
sot = filt.ServiceFilter('identity', visibility='internal')
self.assertEqual("service_type=identity,visibility=internal",
six.text_type(sot))
sot = filt.ServiceFilter('identity', visibility='admin')
self.assertEqual("service_type=identity,visibility=admin",
six.text_type(sot))
sot = filt.ServiceFilter('identity', visibility='publicURL')
self.assertEqual("service_type=identity,visibility=public",
six.text_type(sot))
sot = filt.ServiceFilter('identity', visibility='internalURL')
self.assertEqual("service_type=identity,visibility=internal",
six.text_type(sot))
sot = filt.ServiceFilter('identity', visibility='adminURL')
self.assertEqual("service_type=identity,visibility=admin",
six.text_type(sot))
self.assertRaises(exceptions.SdkException,
filt.ServiceFilter, 'identity', visibility='b')
self.assertRaises(exceptions.SdkException, filt.ServiceFilter,
'identity', visibility=None)
self.assertRaises(exceptions.SdkException, filt.ServiceFilter, None)
self.assertRaises(exceptions.SdkException, filt.ServiceFilter, None)
def test_match_service_type(self):
sot = filt.ServiceFilter('identity')
self.assertTrue(sot.match_service_type('identity'))
self.assertFalse(sot.match_service_type('compute'))
def test_match_service_name(self):
sot = filt.ServiceFilter('identity')
self.assertTrue(sot.match_service_name('keystone'))
self.assertTrue(sot.match_service_name('ldap'))
self.assertTrue(sot.match_service_name(None))
sot = filt.ServiceFilter('identity', service_name='keystone')
self.assertTrue(sot.match_service_name('keystone'))
self.assertFalse(sot.match_service_name('ldap'))
self.assertFalse(sot.match_service_name(None))
def test_match_region(self):
sot = filt.ServiceFilter('identity')
self.assertTrue(sot.match_region('East'))
self.assertTrue(sot.match_region('West'))
self.assertTrue(sot.match_region(None))
sot = filt.ServiceFilter('identity', region='East')
self.assertTrue(sot.match_region('East'))
self.assertFalse(sot.match_region('West'))
self.assertFalse(sot.match_region(None))
def test_match_visibility(self):
sot = filt.ServiceFilter('identity', visibility='internal')
self.assertFalse(sot.match_visibility('admin'))
self.assertTrue(sot.match_visibility('internal'))
self.assertFalse(sot.match_visibility('public'))

View File

@ -10,7 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from openstack.auth import service
from openstack.auth import service_filter
from openstack import session
from openstack.tests import base
from openstack.tests import fakes
@ -24,7 +24,7 @@ class TestSession(base.TestCase):
super(TestSession, self).setUp()
self.xport = fakes.FakeTransport()
self.auth = fakes.FakeAuthenticator()
self.serv = service.ServiceIdentifier('identity')
self.serv = service_filter.ServiceFilter('identity')
self.sess = session.Session(self.xport, self.auth)
self.expected = {'headers': {'X-Auth-Token': self.auth.TOKEN}}

View File

@ -1,4 +1,4 @@
pbr>=0.6,!=0.7,<1.0
Babel>=1.3
iso8601>=0.1.9
requests>=1.1