Fix keystoneclient and heatclient incompatibility

Previous version of heat client couldn't work correctly
because of keystone auth error in actual use.
So, heat client and keystone client are copied from solum.

Change-Id: I49fef01bcec581f470e05aa82526b31fe47d0adc
This commit is contained in:
OTSUKA, Yuanying 2014-12-25 11:58:07 +09:00 committed by Motohiro OTSUKA
parent fa29190557
commit 3d1bff28ab
9 changed files with 594 additions and 368 deletions

99
magnum/common/clients.py Normal file
View File

@ -0,0 +1,99 @@
# Copyright 2014 - Rackspace Hosting.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heatclient.v1 import client as heatclient
from oslo.config import cfg
from magnum.common import exception
from magnum.common import magnum_keystoneclient
from magnum.openstack.common._i18n import _
from magnum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
heat_client_opts = [
cfg.StrOpt('endpoint_type',
default='publicURL',
help=_(
'Type of endpoint in Identity service catalog to use '
'for communication with the OpenStack service.')),
cfg.StrOpt('ca_file',
help=_('Optional CA cert file to use in SSL connections.')),
cfg.StrOpt('cert_file',
help=_('Optional PEM-formatted certificate chain file.')),
cfg.StrOpt('key_file',
help=_('Optional PEM-formatted file that contains the '
'private key.')),
cfg.BoolOpt('insecure',
default=False,
help=_("If set, then the server's certificate will not "
"be verified."))]
cfg.CONF.register_opts(heat_client_opts, group='heat_client')
class OpenStackClients(object):
"""Convenience class to create and cache client instances."""
def __init__(self, context):
self.context = context
self._keystone = None
self._heat = None
def url_for(self, **kwargs):
return self.keystone().client.service_catalog.url_for(**kwargs)
@property
def auth_url(self):
return self.keystone().v3_endpoint
@property
def auth_token(self):
return self.context.auth_token or self.keystone().auth_token
def keystone(self):
if self._keystone:
return self._keystone
self._keystone = magnum_keystoneclient.KeystoneClientV3(self.context)
return self._keystone
def _get_client_option(self, client, option):
return getattr(getattr(cfg.CONF, '%s_client' % client), option)
@exception.wrap_keystone_exception
def heat(self):
if self._heat:
return self._heat
endpoint_type = self._get_client_option('heat', 'endpoint_type')
endpoint = self.url_for(service_type='orchestration',
endpoint_type=endpoint_type)
args = {
'endpoint': endpoint,
'auth_url': self.auth_url,
'token': self.auth_token,
'username': None,
'password': None,
'ca_file': self._get_client_option('heat', 'ca_file'),
'cert_file': self._get_client_option('heat', 'cert_file'),
'key_file': self._get_client_option('heat', 'key_file'),
'insecure': self._get_client_option('heat', 'insecure')
}
self._heat = heatclient.Client(**args)
return self._heat

View File

@ -1,75 +0,0 @@
# Copyright 2014 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heatclient import client as heatclient
from oslo.config import cfg
from magnum.common import exception
from magnum.common import keystone
from magnum.openstack.common._i18n import _
from magnum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
heat_client_opts = [
cfg.StrOpt('endpoint_type',
default='publicURL',
help=_(
'Type of endpoint in Identity service catalog to use '
'for communication with the OpenStack service.')),
cfg.StrOpt('ca_file',
help=_('Optional CA cert file to use in SSL connections.')),
cfg.StrOpt('cert_file',
help=_('Optional PEM-formatted certificate chain file.')),
cfg.StrOpt('key_file',
help=_('Optional PEM-formatted file that contains the '
'private key.')),
cfg.BoolOpt('insecure',
default=False,
help=_("If set, then the server's certificate will not "
"be verified."))]
cfg.CONF.register_opts(heat_client_opts, group='heat_client')
cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token',
group='keystone_authtoken')
cfg.CONF.import_opt('auth_version', 'keystonemiddleware.auth_token',
group='keystone_authtoken')
@exception.wrap_keystone_exception
def get_client(context):
endpoint_type = cfg.CONF.heat_client.endpoint_type
auth_url = cfg.CONF.keystone_authtoken.auth_uri
auth_version = cfg.CONF.keystone_authtoken.auth_version
auth_url = keystone.get_keystone_url(auth_url, auth_version)
args = {
'auth_url': auth_url,
'token': context.auth_token,
'username': None,
'password': None,
'ca_file': cfg.CONF.heat_client.ca_file,
'cert_file': cfg.CONF.heat_client.cert_file,
'key_file': cfg.CONF.heat_client.key_file,
'insecure': cfg.CONF.heat_client.insecure
}
endpoint = keystone.get_service_url(service_type='orchestration',
endpoint_type=endpoint_type)
return heatclient.Client('1', endpoint, **args)

View File

@ -1,127 +0,0 @@
# coding=utf-8
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneclient import exceptions as ksexception
# NOTE(deva): import auth_token so oslo.config pulls in keystone_authtoken
from keystonemiddleware import auth_token # noqa
from oslo.config import cfg
from six.moves.urllib import parse
from magnum.common import exception
from magnum.openstack.common._i18n import _
CONF = cfg.CONF
def _is_apiv3(auth_url, auth_version):
"""Checks if V3 version of API is being used or not.
This method inspects auth_url and auth_version, and checks whether V3
version of the API is being used or not.
:param auth_url: a http or https url to be inspected (like
'http://127.0.0.1:9898/').
:param auth_version: a string containing the version (like 'v2', 'v3.0')
:returns: True if V3 of the API is being used.
"""
return auth_version == 'v3.0' or '/v3' in parse.urlparse(auth_url).path
def _get_ksclient(token=None):
auth_url = CONF.keystone_authtoken.auth_uri
if not auth_url:
raise exception.KeystoneFailure(_('Keystone API endpoint is missing'))
auth_version = CONF.keystone_authtoken.auth_version
api_v3 = _is_apiv3(auth_url, auth_version)
if api_v3:
from keystoneclient.v3 import client
else:
from keystoneclient.v2_0 import client
auth_url = get_keystone_url(auth_url, auth_version)
try:
if token:
return client.Client(token=token, auth_url=auth_url)
else:
return client.Client(username=CONF.keystone_authtoken.admin_user,
password=CONF.keystone_authtoken.admin_password,
tenant_name=CONF.keystone_authtoken.admin_tenant_name,
auth_url=auth_url)
except ksexception.Unauthorized:
raise exception.KeystoneUnauthorized()
except ksexception.AuthorizationFailure as err:
raise exception.KeystoneFailure(_('Could not authorize in Keystone:'
' %s') % err)
def get_keystone_url(auth_url, auth_version):
"""Gives an http/https url to contact keystone.
Given an auth_url and auth_version, this method generates the url in
which keystone can be reached.
:param auth_url: a http or https url to be inspected (like
'http://127.0.0.1:9898/').
:param auth_version: a string containing the version (like v2, v3.0, etc)
:returns: a string containing the keystone url
"""
api_v3 = _is_apiv3(auth_url, auth_version)
api_version = 'v3' if api_v3 else 'v2.0'
# NOTE(lucasagomes): Get rid of the trailing '/' otherwise urljoin()
# fails to override the version in the URL
return parse.urljoin(auth_url.rstrip('/'), api_version)
def get_service_url(service_type='container', endpoint_type='internal'):
"""Wrapper for get service url from keystone service catalog.
Given a service_type and an endpoint_type, this method queries keystone
service catalog and provides the url for the desired endpoint.
:param service_type: the keystone service for which url is required.
:param endpoint_type: the type of endpoint for the service.
:returns: an http/https url for the desired endpoint.
"""
ksclient = _get_ksclient()
if not ksclient.has_service_catalog():
raise exception.KeystoneFailure(_('No Keystone service catalog '
'loaded'))
try:
endpoint = ksclient.service_catalog.url_for(service_type=service_type,
endpoint_type=endpoint_type)
except ksexception.EndpointNotFound:
raise exception.CatalogNotFound(service_type=service_type,
endpoint_type=endpoint_type)
return endpoint
def get_admin_auth_token():
"""Get an admin auth_token from the Keystone."""
ksclient = _get_ksclient()
return ksclient.auth_token
def token_expires_soon(token, duration=None):
"""Determines if token expiration is about to occur.
:param duration: time interval in seconds
:returns: boolean : true if expiration is within the given duration
"""
ksclient = _get_ksclient(token=token)
return ksclient.auth_ref.will_expire_soon(stale_duration=duration)

View File

@ -0,0 +1,196 @@
# Copyright 2014 - Rackspace Hosting.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import keystoneclient.exceptions as kc_exception
from keystoneclient.v3 import client as kc_v3
from oslo.config import cfg
from oslo.utils import importutils
from magnum.common import context
from magnum.common import exception
from magnum.openstack.common._i18n import _
from magnum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
trust_opts = [
cfg.ListOpt('trusts_delegated_roles',
default=['magnum_assembly_update'],
help=_('Subset of trustor roles to be delegated to magnum.')),
]
cfg.CONF.register_opts(trust_opts)
cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token',
group='keystone_authtoken')
class KeystoneClientV3(object):
"""Keystone client wrapper so we can encapsulate logic in one place."""
def __init__(self, context):
# If a trust_id is specified in the context, we immediately
# authenticate so we can populate the context with a trust token
# otherwise, we delay client authentication until needed to avoid
# unnecessary calls to keystone.
#
# Note that when you obtain a token using a trust, it cannot be
# used to reauthenticate and get another token, so we have to
# get a new trust-token even if context.auth_token is set.
#
# - context.auth_url is expected to contain a versioned keystone
# path, we will work with either a v2.0 or v3 path
self.context = context
self._client = None
self._admin_client = None
if self.context.auth_url:
self.v3_endpoint = self.context.auth_url.replace('v2.0', 'v3')
else:
# Import auth_token to have keystone_authtoken settings setup.
importutils.import_module('keystonemiddleware.auth_token')
self.v3_endpoint = cfg.CONF.keystone_authtoken.auth_uri.replace(
'v2.0', 'v3')
if self.context.trust_id:
# Create a client with the specified trust_id, this
# populates self.context.auth_token with a trust-scoped token
self._client = self._v3_client_init()
@property
def client(self):
if not self._client:
# Create connection to v3 API
self._client = self._v3_client_init()
return self._client
@property
def admin_client(self):
if not self._admin_client:
# Create admin client connection to v3 API
admin_creds = self._service_admin_creds()
c = kc_v3.Client(**admin_creds)
if c.authenticate():
self._admin_client = c
else:
LOG.error("Admin client authentication failed")
raise exception.AuthorizationFailure()
return self._admin_client
def _v3_client_init(self):
kwargs = {
'auth_url': self.v3_endpoint,
'endpoint': self.v3_endpoint
}
# Note try trust_id first, as we can't reuse auth_token in that case
if self.context.trust_id is not None:
# We got a trust_id, so we use the admin credentials
# to authenticate with the trust_id so we can use the
# trust impersonating the trustor user.
kwargs.update(self._service_admin_creds())
kwargs['trust_id'] = self.context.trust_id
kwargs.pop('project_name')
elif self.context.auth_token_info is not None:
# The auth_ref version must be set according to the token version
if 'access' in self.context.auth_token_info:
kwargs['auth_ref'] = copy.deepcopy(
self.context.auth_token_info['access'])
kwargs['auth_ref']['version'] = 'v2.0'
kwargs['auth_ref']['token']['id'] = self.context.auth_token
elif 'token' in self.context.auth_token_info:
kwargs['auth_ref'] = copy.deepcopy(
self.context.auth_token_info['token'])
kwargs['auth_ref']['version'] = 'v3'
kwargs['auth_ref']['auth_token'] = self.context.auth_token
else:
LOG.error("Unknown version in auth_token_info")
raise exception.AuthorizationFailure()
elif self.context.auth_token is not None:
kwargs['token'] = self.context.auth_token
kwargs['project_id'] = self.context.tenant
else:
LOG.error(_("Keystone v3 API connection failed, no password "
"trust or auth_token!"))
raise exception.AuthorizationFailure()
client = kc_v3.Client(**kwargs)
if 'auth_ref' not in kwargs:
client.authenticate()
# If we are authenticating with a trust set the context auth_token
# with the trust scoped token
if 'trust_id' in kwargs:
# Sanity check
if not client.auth_ref.trust_scoped:
LOG.error(_("trust token re-scoping failed!"))
raise exception.AuthorizationFailure()
# All OK so update the context with the token
self.context.auth_token = client.auth_ref.auth_token
self.context.auth_url = self.v3_endpoint
self.context.user = client.auth_ref.user_id
self.context.tenant = client.auth_ref.project_id
self.context.user_name = client.auth_ref.username
return client
def _service_admin_creds(self):
# Import auth_token to have keystone_authtoken settings setup.
importutils.import_module('keystonemiddleware.auth_token')
creds = {
'username': cfg.CONF.keystone_authtoken.admin_user,
'password': cfg.CONF.keystone_authtoken.admin_password,
'auth_url': self.v3_endpoint,
'endpoint': self.v3_endpoint,
'project_name': cfg.CONF.keystone_authtoken.admin_tenant_name}
LOG.info('admin creds %s' % creds)
return creds
def create_trust_context(self):
"""Create a trust using the trustor identity in the current context.
Use the trustee as the magnum service user and return a context
containing the new trust_id.
If the current context already contains a trust_id, we do nothing
and return the current context.
"""
if self.context.trust_id:
return self.context
# We need the service admin user ID (not name), as the trustor user
# can't lookup the ID in keystoneclient unless they're admin
# workaround this by getting the user_id from admin_client
trustee_user_id = self.admin_client.auth_ref.user_id
trustor_user_id = self.client.auth_ref.user_id
trustor_project_id = self.client.auth_ref.project_id
roles = cfg.CONF.trusts_delegated_roles
trust = self.client.trusts.create(trustor_user=trustor_user_id,
trustee_user=trustee_user_id,
project=trustor_project_id,
impersonation=True,
role_names=roles)
trust_context = context.RequestContext.from_dict(
self.context.to_dict())
trust_context.trust_id = trust.id
return trust_context
def delete_trust(self, trust_id):
"""Delete the specified trust."""
try:
self.client.trusts.delete(trust_id)
except kc_exception.NotFound:
pass
@property
def auth_token(self):
return self.client.auth_token

View File

@ -0,0 +1,80 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heatclient.v1 import client as heatclient
import mock
from magnum.common import clients
from magnum.common import exception
from magnum.tests import base
class ClientsTest(base.BaseTestCase):
@mock.patch.object(clients.OpenStackClients, 'keystone')
def test_url_for(self, mock_keystone):
obj = clients.OpenStackClients(None)
obj.url_for(service_type='fake_service', endpoint_type='fake_endpoint')
mock_cat = mock_keystone.return_value.client.service_catalog
mock_cat.url_for.assert_called_once_with(service_type='fake_service',
endpoint_type='fake_endpoint')
@mock.patch.object(heatclient, 'Client')
@mock.patch.object(clients.OpenStackClients, 'url_for')
@mock.patch.object(clients.OpenStackClients, 'auth_url')
def test_clients_heat(self, mock_auth, mock_url, mock_call):
mock_auth.__get__ = mock.Mock(return_value="keystone_url")
con = mock.MagicMock()
con.tenant = "b363706f891f48019483f8bd6503c54b"
con.auth_token = "3bcc3d3a03f44e3d8377f9247b0ad155"
con.auth_url = "keystone_url"
mock_url.return_value = "url_from_keystone"
obj = clients.OpenStackClients(con)
obj._heat = None
obj.heat()
mock_call.assert_called_once_with(
endpoint='url_from_keystone', username=None,
cert_file=None, token='3bcc3d3a03f44e3d8377f9247b0ad155',
auth_url='keystone_url', ca_file=None, key_file=None,
password=None, insecure=False)
mock_url.assert_called_once_with(service_type='orchestration',
endpoint_type='publicURL')
def test_clients_heat_noauth(self):
con = mock.MagicMock()
con.auth_token = None
con.auth_token_info = None
con.tenant = "b363706f891f48019483f8bd6503c54b"
auth_url = mock.PropertyMock(name="auth_url",
return_value="keystone_url")
type(con).auth_url = auth_url
con.get_url_for = mock.Mock(name="get_url_for")
con.get_url_for.return_value = "url_from_keystone"
obj = clients.OpenStackClients(con)
obj._heat = None
self.assertRaises(exception.AuthorizationFailure, obj.heat)
@mock.patch.object(clients.OpenStackClients, 'url_for')
@mock.patch.object(clients.OpenStackClients, 'auth_url')
def test_clients_heat_cached(self, mock_auth, mock_url):
mock_auth.__get__ = mock.Mock(return_value="keystone_url")
con = mock.MagicMock()
con.tenant = "b363706f891f48019483f8bd6503c54b"
con.auth_token = "3bcc3d3a03f44e3d8377f9247b0ad155"
con.auth_url = "keystone_url"
mock_url.return_value = "url_from_keystone"
obj = clients.OpenStackClients(con)
obj._heat = None
heat = obj.heat()
heat_cached = obj.heat()
self.assertEqual(heat, heat_cached)

View File

@ -1,43 +0,0 @@
# Copyright 2014 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heatclient import client as heatclient
import mock
from magnum.common import heat
from magnum.common import keystone
from magnum.tests import base
class HeatTestCase(base.TestCase):
@mock.patch.object(heatclient, 'Client')
@mock.patch.object(keystone, 'get_service_url')
@mock.patch.object(keystone, 'get_keystone_url')
def test_heat(self, mock_auth, mock_url, mock_call):
mock_auth.return_value = "keystone_url"
con = mock.MagicMock()
con.tenant = "b363706f891f48019483f8bd6503c54b"
con.auth_token = "3bcc3d3a03f44e3d8377f9247b0ad155"
con.auth_url = "keystone_url"
mock_url.return_value = "url_from_keystone"
heat.get_client(con)
mock_call.assert_called_once_with(
'1', 'url_from_keystone', username=None,
cert_file=None, token='3bcc3d3a03f44e3d8377f9247b0ad155',
auth_url='keystone_url', ca_file=None, key_file=None,
password=None, insecure=False)
mock_url.assert_called_once_with(service_type='orchestration',
endpoint_type='publicURL')

View File

@ -1,119 +0,0 @@
# -*- encoding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneclient import exceptions as ksexception
import mock
from magnum.common import exception
from magnum.common import keystone
from magnum.tests import base
class FakeCatalog:
def url_for(self, **kwargs):
return 'fake-url'
class FakeClient:
def __init__(self, **kwargs):
self.service_catalog = FakeCatalog()
def has_service_catalog(self):
return True
class KeystoneTestCase(base.TestCase):
def setUp(self):
super(KeystoneTestCase, self).setUp()
self.config(group='keystone_authtoken',
auth_uri='http://127.0.0.1:9898/',
admin_user='fake', admin_password='fake',
admin_tenant_name='fake')
def test_failure_authorization(self):
self.assertRaises(exception.KeystoneFailure, keystone.get_service_url)
@mock.patch.object(FakeCatalog, 'url_for')
@mock.patch('keystoneclient.v2_0.client.Client')
def test_get_url(self, mock_ks, mock_uf):
fake_url = 'http://127.0.0.1:6385'
mock_uf.return_value = fake_url
mock_ks.return_value = FakeClient()
res = keystone.get_service_url()
self.assertEqual(fake_url, res)
@mock.patch.object(FakeCatalog, 'url_for')
@mock.patch('keystoneclient.v2_0.client.Client')
def test_url_not_found(self, mock_ks, mock_uf):
mock_uf.side_effect = ksexception.EndpointNotFound
mock_ks.return_value = FakeClient()
self.assertRaises(exception.CatalogNotFound, keystone.get_service_url)
@mock.patch.object(FakeClient, 'has_service_catalog')
@mock.patch('keystoneclient.v2_0.client.Client')
def test_no_catalog(self, mock_ks, mock_hsc):
mock_hsc.return_value = False
mock_ks.return_value = FakeClient()
self.assertRaises(exception.KeystoneFailure, keystone.get_service_url)
@mock.patch('keystoneclient.v2_0.client.Client')
def test_unauthorized(self, mock_ks):
mock_ks.side_effect = ksexception.Unauthorized
self.assertRaises(exception.KeystoneUnauthorized,
keystone.get_service_url)
def test_get_service_url_fail_missing_auth_uri(self):
self.config(group='keystone_authtoken', auth_uri=None)
self.assertRaises(exception.KeystoneFailure,
keystone.get_service_url)
@mock.patch('keystoneclient.v2_0.client.Client')
def test_get_service_url_versionless_v2(self, mock_ks):
mock_ks.return_value = FakeClient()
self.config(group='keystone_authtoken', auth_uri='http://127.0.0.1')
expected_url = 'http://127.0.0.1/v2.0'
keystone.get_service_url()
mock_ks.assert_called_once_with(username='fake', password='fake',
tenant_name='fake',
auth_url=expected_url)
@mock.patch('keystoneclient.v3.client.Client')
def test_get_service_url_versionless_v3(self, mock_ks):
mock_ks.return_value = FakeClient()
self.config(group='keystone_authtoken', auth_version='v3.0',
auth_uri='http://127.0.0.1')
expected_url = 'http://127.0.0.1/v3'
keystone.get_service_url()
mock_ks.assert_called_once_with(username='fake', password='fake',
tenant_name='fake',
auth_url=expected_url)
@mock.patch('keystoneclient.v2_0.client.Client')
def test_get_service_url_version_override(self, mock_ks):
mock_ks.return_value = FakeClient()
self.config(group='keystone_authtoken',
auth_uri='http://127.0.0.1/v2.0/')
expected_url = 'http://127.0.0.1/v2.0'
keystone.get_service_url()
mock_ks.assert_called_once_with(username='fake', password='fake',
tenant_name='fake',
auth_url=expected_url)
@mock.patch('keystoneclient.v2_0.client.Client')
def test_get_admin_auth_token(self, mock_ks):
fake_client = FakeClient()
fake_client.auth_token = '123456'
mock_ks.return_value = fake_client
self.assertEqual('123456', keystone.get_admin_auth_token())

View File

@ -0,0 +1,217 @@
# Copyright 2014 - Rackspace Hosting.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo.config import cfg
cfg.CONF.import_group('keystone_authtoken',
'keystonemiddleware.auth_token')
import keystoneclient.exceptions as kc_exception # noqa
from magnum.common import exception
from magnum.common import magnum_keystoneclient
from magnum.tests import base
from magnum.tests import utils
@mock.patch('keystoneclient.v3.client.Client')
class KeystoneClientTest(base.BaseTestCase):
"""Test cases for magnum.common.magnum_keystoneclient."""
def setUp(self):
super(KeystoneClientTest, self).setUp()
dummy_url = 'http://server.test:5000/v2.0'
self.ctx = utils.dummy_context()
self.ctx.auth_url = dummy_url
self.ctx.auth_token = 'abcd1234'
self.ctx.auth_token_info = None
cfg.CONF.set_override('auth_uri', dummy_url,
group='keystone_authtoken')
cfg.CONF.set_override('admin_user', 'magnum',
group='keystone_authtoken')
cfg.CONF.set_override('admin_password', 'verybadpass',
group='keystone_authtoken')
cfg.CONF.set_override('admin_tenant_name', 'service',
group='keystone_authtoken')
def test_init_v3_token(self, mock_ks):
"""Test creating the client, token auth."""
self.ctx.tenant = None
self.ctx.trust_id = None
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
magnum_ks_client.client
self.assertIsNotNone(magnum_ks_client._client)
mock_ks.assert_called_once_with(token='abcd1234', project_id=None,
auth_url='http://server.test:5000/v3',
endpoint='http://server.test:5000/v3')
mock_ks.return_value.authenticate.assert_called_once_with()
def test_init_v3_bad_nocreds(self, mock_ks):
"""Test creating the client, no credentials."""
self.ctx.auth_token = None
self.ctx.trust_id = None
self.ctx.username = None
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
self.assertRaises(exception.AuthorizationFailure,
magnum_ks_client._v3_client_init)
def test_init_trust_token_access(self, mock_ks):
"""Test creating the client, token auth."""
self.ctx.tenant = 'abcd1234'
self.ctx.trust_id = None
self.ctx.auth_token_info = {'access': {'token': {'id': 'placeholder'}}}
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
magnum_ks_client.client
self.assertIsNotNone(magnum_ks_client._client)
mock_ks.assert_called_once_with(auth_ref={'version': 'v2.0',
'token': {
'id': 'abcd1234'}},
endpoint='http://server.test:5000/v3',
auth_url='http://server.test:5000/v3')
def test_init_trust_token_token(self, mock_ks):
self.ctx.tenant = None
self.ctx.trust_id = None
self.ctx.auth_token_info = {'token': {}}
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
magnum_ks_client.client
self.assertIsNotNone(magnum_ks_client._client)
mock_ks.assert_called_once_with(auth_ref={'auth_token': 'abcd1234',
'version': 'v3'},
endpoint='http://server.test:5000/v3',
auth_url='http://server.test:5000/v3')
def test_init_trust_token_none(self, mock_ks):
self.ctx.tenant = None
self.ctx.trust_id = None
self.ctx.auth_token_info = {'not_this': 'urg'}
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
self.assertRaises(exception.AuthorizationFailure,
magnum_ks_client._v3_client_init)
def test_create_trust_context_trust_id(self, mock_ks):
"""Test create_trust_context with existing trust_id."""
self.ctx.trust_id = 'atrust123'
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
trust_context = magnum_ks_client.create_trust_context()
self.assertEqual(self.ctx.to_dict(), trust_context.to_dict())
mock_ks.assert_called_once_with(username='magnum',
auth_url='http://server.test:5000/v3',
password='verybadpass',
endpoint='http://server.test:5000/v3',
trust_id='atrust123')
mock_ks.return_value.authenticate.assert_called_once_with()
def test_create_trust_context_trust_create(self, mock_ks):
"""Test create_trust_context when creating a trust."""
class FakeTrust(object):
id = 'atrust123'
cfg.CONF.set_override('trusts_delegated_roles',
['magnum_assembly_update'])
getter_mock = mock.PropertyMock(side_effect=['1234', '5678'])
type(mock_ks.return_value.auth_ref).user_id = getter_mock
mock_ks.return_value.auth_ref.project_id = '42'
mock_ks.return_value.trusts.create.return_value = FakeTrust()
self.ctx.trust_id = None
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
trust_context = magnum_ks_client.create_trust_context()
# admin_client and user client
expected = [mock.call(username='magnum',
project_name='service',
password='verybadpass',
auth_url='http://server.test:5000/v3',
endpoint='http://server.test:5000/v3'),
mock.call(token='abcd1234',
project_id='test_tenant_id',
auth_url='http://server.test:5000/v3',
endpoint='http://server.test:5000/v3')]
self.assertEqual(expected, mock_ks.call_args_list)
self.assertEqual([mock.call(), mock.call()],
mock_ks.return_value.authenticate.call_args_list)
# trust creation
self.assertEqual('atrust123', trust_context.trust_id)
mock_ks.return_value.trusts.create.assert_called_once_with(
trustor_user='5678',
trustee_user='1234',
project='42',
impersonation=True,
role_names=['magnum_assembly_update'])
def test_init_admin_client_denied(self, mock_ks):
"""Test the admin_client property, auth failure path."""
self.ctx.username = None
self.ctx.password = None
self.ctx.trust_id = None
mock_ks.return_value.authenticate.return_value = False
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
# Define wrapper for property or the property raises the exception
# outside of the assertRaises which fails the test
def get_admin_client():
magnum_ks_client.admin_client
self.assertRaises(exception.AuthorizationFailure,
get_admin_client)
def test_trust_init_fail(self, mock_ks):
"""Test consuming a trust when initializing, error scoping."""
self.ctx.username = None
self.ctx.auth_token = None
self.ctx.trust_id = 'atrust123'
mock_ks.return_value.auth_ref.trust_scoped = False
self.assertRaises(exception.AuthorizationFailure,
magnum_keystoneclient.KeystoneClientV3, self.ctx)
def test_trust_init_token(self, mock_ks):
"""Test trust_id takes precedence when token specified."""
self.ctx.username = None
self.ctx.trust_id = 'atrust123'
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
self.assertIsNotNone(magnum_ks_client._client)
mock_ks.assert_called_once_with(username='magnum',
auth_url='http://server.test:5000/v3',
password='verybadpass',
endpoint='http://server.test:5000/v3',
trust_id='atrust123')
mock_ks.return_value.authenticate.assert_called_once_with()
def test_delete_trust(self, mock_ks):
"""Test delete_trust when deleting trust."""
mock_ks.return_value.trusts.delete.return_value = None
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
self.assertIsNone(magnum_ks_client.delete_trust(trust_id='atrust123'))
mock_ks.return_value.trusts.delete.assert_called_once_with('atrust123')
def test_delete_trust_not_found(self, mock_ks):
"""Test delete_trust when trust already deleted."""
mock_delete = mock_ks.return_value.trusts.delete
mock_delete.side_effect = kc_exception.NotFound()
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
self.assertIsNone(magnum_ks_client.delete_trust(trust_id='atrust123'))

View File

@ -25,10 +25,8 @@ from magnum.db.sqlalchemy import api as sql_api
CONF = cfg.CONF
def dummy_context(user='test_username', tenant_id='test_tenant_id',
user_name='usr_name'):
return context.RequestContext(user=user, tenant=tenant_id,
user_name=user_name)
def dummy_context(user='test_username', tenant_id='test_tenant_id'):
return context.RequestContext(user=user, tenant=tenant_id)
class Database(fixtures.Fixture):