Code refactor for keystoneclient

1.Rename magnum_keystone_client to keystone.
2.Code refactor to let keystoneclient can create trust for
any user, not only admin user.

Change-Id: Iac20185f2cc89ac8d6fe7cf30b2362302614df77
Partially-Implements: blueprint generate-keystone-trust
This commit is contained in:
Hua Wang 2015-09-07 10:35:09 +08:00
parent 4381c685b3
commit d52237d89a
8 changed files with 277 additions and 445 deletions

View File

@ -8,9 +8,6 @@
# Default value is True. (boolean value)
#enable_authentication = true
# Subset of trustor roles to be delegated to magnum. (list value)
#trusts_delegated_roles = magnum_assembly_update
# Directory where the magnum python module is installed. (string
# value)
#pybasedir = /opt/stack/magnum/magnum
@ -236,7 +233,7 @@
# From magnum
#
# The port for the Magnum API server. (port number)
# The port for the Magnum API server. (unknown type)
# Minimum value: 1
# Maximum value: 65535
#port = 9511
@ -729,7 +726,7 @@
# value)
#k8s_protocol = http
# Default port of the k8s master endpoint. (port number)
# Default port of the k8s master endpoint. (unknown type)
# Minimum value: 1
# Maximum value: 65535
#k8s_port = 8080

View File

@ -20,7 +20,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from magnum.common import exception
from magnum.common import magnum_keystoneclient
from magnum.common import keystone
from magnum.i18n import _
@ -123,7 +123,7 @@ class OpenStackClients(object):
@property
def auth_url(self):
return self.keystone().v3_endpoint
return self.keystone().auth_url
@property
def auth_token(self):
@ -133,7 +133,7 @@ class OpenStackClients(object):
if self._keystone:
return self._keystone
self._keystone = magnum_keystoneclient.KeystoneClientV3(self.context)
self._keystone = keystone.KeystoneClientV3(self.context)
return self._keystone
def _get_client_option(self, client, option):
@ -196,7 +196,7 @@ class OpenStackClients(object):
endpoint = self.url_for(service_type='key-manager',
endpoint_type=endpoint_type,
region_name=region_name)
session = self.keystone().client.session
session = self.keystone().session
self._barbican = barbicanclient.Client(session=session,
endpoint=endpoint)

130
magnum/common/keystone.py Normal file
View File

@ -0,0 +1,130 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import keystoneclient.exceptions as kc_exception
from keystoneclient.v3 import client as kc_v3
from oslo_config import cfg
from oslo_log import log as logging
from magnum.common import exception
from magnum.i18n import _LE
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token')
class KeystoneClientV3(object):
"""Keystone client wrapper so we can encapsulate logic in one place."""
def __init__(self, context):
self.context = context
self._client = None
self._admin_client = None
@property
def auth_url(self):
v3_auth_url = CONF.keystone_authtoken.auth_uri.replace('v2.0', 'v3')
return v3_auth_url
@property
def auth_token(self):
return self.client.auth_token
@property
def session(self):
return self.client.session
@property
def admin_session(self):
return self.admin_client.session
@property
def client(self):
if self.context.is_admin:
return self.admin_client
else:
if not self._client:
self._client = self._get_ks_client()
return self._client
def _get_admin_credentials(self):
credentials = {
'username': CONF.keystone_authtoken.admin_user,
'password': CONF.keystone_authtoken.admin_password,
'project_name': CONF.keystone_authtoken.admin_tenant_name
}
return credentials
@property
def admin_client(self):
if not self._admin_client:
admin_credentials = self._get_admin_credentials()
self._admin_client = kc_v3.Client(auth_url=self.auth_url,
**admin_credentials)
return self._admin_client
@staticmethod
def _is_v2_valid(auth_token_info):
return 'access' in auth_token_info
@staticmethod
def _is_v3_valid(auth_token_info):
return 'token' in auth_token_info
def _get_ks_client(self):
kwargs = {'auth_url': self.auth_url}
if self.context.trust_id:
kwargs.update(self._get_admin_credentials())
kwargs['trust_id'] = self.context.trust_id
kwargs.pop('project_name')
elif self.context.auth_token_info:
kwargs['token'] = self.context.auth_token
if self._is_v2_valid(self.context.auth_token_info):
LOG.warn('Keystone v2 is deprecated.')
kwargs['auth_ref'] = self.context.auth_token_info['access']
kwargs['auth_ref']['version'] = 'v2.0'
elif self._is_v3_valid(self.context.auth_token_info):
kwargs['auth_ref'] = self.context.auth_token_info['token']
kwargs['auth_ref']['version'] = 'v3'
else:
LOG.error(_LE('Unknown version in auth_token_info'))
raise exception.AuthorizationFailure()
elif self.context.auth_token:
kwargs['token'] = self.context.auth_token
else:
LOG.error(_LE('Keystone v3 API conntection failed, no password '
'trust or auth_token'))
raise exception.AuthorizationFailure()
return kc_v3.Client(**kwargs)
def create_trust(self, trustee_user, role_names, impersonation=True):
trustor_user_id = self.client.auth_ref.user_id
trustor_project_id = self.client.auth_ref.project_id
trust = self.client.trusts.create(trustor_user=trustor_user_id,
project=trustor_project_id,
trustee_user=trustee_user,
impersonation=impersonation,
role_names=role_names)
return trust
def create_trust_to_admin(self, role_names, impersonation=True):
trustee_user = self.admin_client.auth_ref.user_id
return self.create_trust(trustee_user, role_names, impersonation)
def delete_trust(self, trust_id):
try:
self.client.trusts.delete(trust_id)
except kc_exception.NotFound:
pass

View File

@ -1,202 +0,0 @@
# Copyright 2014 - Rackspace Hosting.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import keystoneclient.exceptions as kc_exception
from keystoneclient.v3 import client as kc_v3
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from magnum.common import context as magnum_context
from magnum.common import exception
from magnum.i18n import _
from magnum.i18n import _LE
from magnum.i18n import _LI
LOG = logging.getLogger(__name__)
trust_opts = [
cfg.ListOpt('trusts_delegated_roles',
default=['magnum_assembly_update'],
help=_('Subset of trustor roles to be delegated to magnum.')),
]
cfg.CONF.register_opts(trust_opts)
cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token',
group='keystone_authtoken')
class KeystoneClientV3(object):
"""Keystone client wrapper so we can encapsulate logic in one place."""
def __init__(self, context):
# If a trust_id is specified in the context, we immediately
# authenticate so we can populate the context with a trust token
# otherwise, we delay client authentication until needed to avoid
# unnecessary calls to keystone.
#
# Note that when you obtain a token using a trust, it cannot be
# used to reauthenticate and get another token, so we have to
# get a new trust-token even if context.auth_token is set.
#
# - context.auth_url is expected to contain a versioned keystone
# path, we will work with either a v2.0 or v3 path
self.context = context
self._client = None
self._admin_client = None
self._is_admin = context.is_admin
if self.context.auth_url:
self.v3_endpoint = self.context.auth_url.replace('v2.0', 'v3')
else:
# Import auth_token to have keystone_authtoken settings setup.
importutils.import_module('keystonemiddleware.auth_token')
self.v3_endpoint = cfg.CONF.keystone_authtoken.auth_uri.replace(
'v2.0', 'v3')
if self.context.trust_id:
# Create a client with the specified trust_id, this
# populates self.context.auth_token with a trust-scoped token
self._client = self._v3_client_init()
@property
def client(self):
if self._is_admin:
return self.admin_client
else:
if not self._client:
# Create connection to v3 API
self._client = self._v3_client_init()
return self._client
@property
def admin_client(self):
if not self._admin_client:
# Create admin client connection to v3 API
admin_creds = self._service_admin_creds()
c = kc_v3.Client(**admin_creds)
if c.authenticate():
self._admin_client = c
else:
LOG.error(_LE("Admin client authentication failed"))
raise exception.AuthorizationFailure()
return self._admin_client
def _v3_client_init(self):
kwargs = {
'auth_url': self.v3_endpoint,
'endpoint': self.v3_endpoint
}
# Note try trust_id first, as we can't reuse auth_token in that case
if self.context.trust_id is not None:
# We got a trust_id, so we use the admin credentials
# to authenticate with the trust_id so we can use the
# trust impersonating the trustor user.
kwargs.update(self._service_admin_creds())
kwargs['trust_id'] = self.context.trust_id
kwargs.pop('project_name')
elif self.context.auth_token_info is not None:
# The auth_ref version must be set according to the token version
if 'access' in self.context.auth_token_info:
kwargs['auth_ref'] = copy.deepcopy(
self.context.auth_token_info['access'])
kwargs['auth_ref']['version'] = 'v2.0'
kwargs['auth_ref']['token']['id'] = self.context.auth_token
elif 'token' in self.context.auth_token_info:
kwargs['auth_ref'] = copy.deepcopy(
self.context.auth_token_info['token'])
kwargs['auth_ref']['version'] = 'v3'
kwargs['auth_ref']['auth_token'] = self.context.auth_token
else:
LOG.error(_LE("Unknown version in auth_token_info"))
raise exception.AuthorizationFailure()
elif self.context.auth_token is not None:
kwargs['token'] = self.context.auth_token
kwargs['project_id'] = self.context.project_id
else:
LOG.error(_LE("Keystone v3 API connection failed, no password "
"trust or auth_token!"))
raise exception.AuthorizationFailure()
client = kc_v3.Client(**kwargs)
if 'auth_ref' not in kwargs:
client.authenticate()
# If we are authenticating with a trust set the context auth_token
# with the trust scoped token
if 'trust_id' in kwargs:
# Sanity check
if not client.auth_ref.trust_scoped:
LOG.error(_LE("trust token re-scoping failed!"))
raise exception.AuthorizationFailure()
# All OK so update the context with the token
self.context.auth_token = client.auth_ref.auth_token
self.context.auth_url = self.v3_endpoint
self.context.user = client.auth_ref.user_id
self.context.project_id = client.auth_ref.project_id
self.context.user_name = client.auth_ref.username
return client
def _service_admin_creds(self):
# Import auth_token to have keystone_authtoken settings setup.
importutils.import_module('keystonemiddleware.auth_token')
creds = {
'username': cfg.CONF.keystone_authtoken.admin_user,
'password': cfg.CONF.keystone_authtoken.admin_password,
'auth_url': self.v3_endpoint,
'endpoint': self.v3_endpoint,
'project_name': cfg.CONF.keystone_authtoken.admin_tenant_name}
LOG.info(_LI('admin creds %s') % creds)
return creds
def create_trust_context(self):
"""Create a trust using the trustor identity in the current context.
Use the trustee as the magnum service user and return a context
containing the new trust_id.
If the current context already contains a trust_id, we do nothing
and return the current context.
"""
if self.context.trust_id:
return self.context
# We need the service admin user ID (not name), as the trustor user
# can't lookup the ID in keystoneclient unless they're admin
# workaround this by getting the user_id from admin_client
trustee_user_id = self.admin_client.auth_ref.user_id
trustor_user_id = self.client.auth_ref.user_id
trustor_project_id = self.client.auth_ref.project_id
roles = cfg.CONF.trusts_delegated_roles
trust = self.client.trusts.create(trustor_user=trustor_user_id,
trustee_user=trustee_user_id,
project=trustor_project_id,
impersonation=True,
role_names=roles)
trust_context = magnum_context.RequestContext.from_dict(
self.context.to_dict())
trust_context.trust_id = trust.id
return trust_context
def delete_trust(self, trust_id):
"""Delete the specified trust."""
try:
self.client.trusts.delete(trust_id)
except kc_exception.NotFound:
pass
@property
def auth_token(self):
return self.client.auth_token

View File

@ -21,7 +21,6 @@ import magnum.common.cert_manager
from magnum.common.cert_manager import local_cert_manager
import magnum.common.clients
import magnum.common.exception
import magnum.common.magnum_keystoneclient
import magnum.common.service
import magnum.common.x509.config
import magnum.conductor.config
@ -36,7 +35,6 @@ def list_opts():
return [
('DEFAULT',
itertools.chain(magnum.api.auth.AUTH_OPTS,
magnum.common.magnum_keystoneclient.trust_opts,
magnum.common.paths.PATH_OPTS,
magnum.common.utils.UTILS_OPTS,
magnum.common.rpc_service.periodic_opts,

View File

@ -23,6 +23,12 @@ from magnum.tests import base
class ClientsTest(base.BaseTestCase):
def setUp(self):
super(ClientsTest, self).setUp()
cfg.CONF.set_override('auth_uri', 'http://server.test:5000/v2.0',
group='keystone_authtoken')
@mock.patch.object(clients.OpenStackClients, 'keystone')
def test_url_for(self, mock_keystone):
obj = clients.OpenStackClients(None)
@ -169,14 +175,14 @@ class ClientsTest(base.BaseTestCase):
con.auth_url = "keystone_url"
mock_url.return_value = "url_from_keystone"
keystone = mock.MagicMock()
keystone.client.session = mock.MagicMock()
keystone.session = mock.MagicMock()
mock_keystone.return_value = keystone
obj = clients.OpenStackClients(con)
obj._barbican = None
obj.barbican()
mock_call.assert_called_once_with(
endpoint='url_from_keystone',
session=keystone.client.session)
session=keystone.session)
mock_keystone.assert_called_once_with()
mock_url.assert_called_once_with(service_type='key-manager',
@ -211,7 +217,7 @@ class ClientsTest(base.BaseTestCase):
con.auth_url = "keystone_url"
mock_url.return_value = "url_from_keystone"
keystone = mock.MagicMock()
keystone.client.session = mock.MagicMock()
keystone.session = mock.MagicMock()
mock_keystone.return_value = keystone
obj = clients.OpenStackClients(con)
obj._barbican = None

View File

@ -0,0 +1,132 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
cfg.CONF.import_group('keystone_authtoken',
'keystonemiddleware.auth_token')
import keystoneclient.exceptions as kc_exception
from magnum.common import exception
from magnum.common import keystone
from magnum.tests import base
from magnum.tests import utils
@mock.patch('keystoneclient.v3.client.Client')
class KeystoneClientTest(base.BaseTestCase):
def setUp(self):
super(KeystoneClientTest, self).setUp()
dummy_url = 'http://server.test:5000/v2.0'
self.ctx = utils.dummy_context()
self.ctx.auth_url = dummy_url
self.ctx.auth_token = 'abcd1234'
cfg.CONF.set_override('auth_uri', dummy_url,
group='keystone_authtoken')
cfg.CONF.set_override('admin_user', 'magnum',
group='keystone_authtoken')
cfg.CONF.set_override('admin_password', 'verybadpass',
group='keystone_authtoken')
cfg.CONF.set_override('admin_tenant_name', 'service',
group='keystone_authtoken')
def test_client_with_token(self, mock_ks):
ks_client = keystone.KeystoneClientV3(self.ctx)
ks_client.client
self.assertIsNotNone(ks_client._client)
mock_ks.assert_called_once_with(token='abcd1234',
auth_url='http://server.test:5000/v3')
def test_client_with_no_credentials(self, mock_ks):
self.ctx.auth_token = None
ks_client = keystone.KeystoneClientV3(self.ctx)
self.assertRaises(exception.AuthorizationFailure,
ks_client._get_ks_client)
def test_client_with_v2_auth_token_info(self, mock_ks):
self.ctx.auth_token_info = {'access': {}}
ks_client = keystone.KeystoneClientV3(self.ctx)
ks_client.client
self.assertIsNotNone(ks_client._client)
mock_ks.assert_called_once_with(auth_ref={'version': 'v2.0'},
auth_url='http://server.test:5000/v3',
token='abcd1234')
def test_client_with_v3_auth_token_info(self, mock_ks):
self.ctx.auth_token_info = {'token': {}}
ks_client = keystone.KeystoneClientV3(self.ctx)
ks_client.client
self.assertIsNotNone(ks_client._client)
mock_ks.assert_called_once_with(auth_ref={'version': 'v3'},
auth_url='http://server.test:5000/v3',
token='abcd1234')
def test_client_with_invalid_auth_token_info(self, mock_ks):
self.ctx.auth_token_info = {'not_this': 'urg'}
ks_client = keystone.KeystoneClientV3(self.ctx)
self.assertRaises(exception.AuthorizationFailure,
ks_client._get_ks_client)
def test_client_with_is_admin(self, mock_ks):
self.ctx.is_admin = True
ks_client = keystone.KeystoneClientV3(self.ctx)
ks_client.client
self.assertIsNone(ks_client._client)
self.assertIsNotNone(ks_client._admin_client)
mock_ks.assert_called_once_with(auth_url='http://server.test:5000/v3',
username='magnum',
password='verybadpass',
project_name='service')
def test_delete_trust(self, mock_ks):
mock_ks.return_value.trusts.delete.return_value = None
ks_client = keystone.KeystoneClientV3(self.ctx)
self.assertIsNone(ks_client.delete_trust(trust_id='atrust123'))
mock_ks.return_value.trusts.delete.assert_called_once_with('atrust123')
def test_delete_trust_not_found(self, mock_ks):
mock_delete = mock_ks.return_value.trusts.delete
mock_delete.side_effect = kc_exception.NotFound()
ks_client = keystone.KeystoneClientV3(self.ctx)
self.assertIsNone(ks_client.delete_trust(trust_id='atrust123'))
def test_create_trust(self, mock_ks):
mock_ks.return_value.auth_ref.user_id = '123456'
mock_ks.return_value.auth_ref.project_id = '654321'
ks_client = keystone.KeystoneClientV3(self.ctx)
ks_client.create_trust(trustee_user='888888',
role_names='xxxx')
mock_ks.return_value.trusts.create.assert_called_once_with(
trustor_user='123456', project='654321',
trustee_user='888888', role_names='xxxx',
impersonation=True)
@mock.patch.object(keystone.KeystoneClientV3,
'create_trust')
def test_create_trust_to_admin(self, mock_create_trust, mock_ks):
mock_ks.return_value.auth_ref.user_id = '777777'
ks_client = keystone.KeystoneClientV3(self.ctx)
ks_client.create_trust_to_admin(role_names='xxxx')
mock_create_trust.assert_called_once_with('777777', 'xxxx', True)

View File

@ -1,229 +0,0 @@
# Copyright 2014 - Rackspace Hosting.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
cfg.CONF.import_group('keystone_authtoken',
'keystonemiddleware.auth_token')
import keystoneclient.exceptions as kc_exception # noqa
from magnum.common import exception
from magnum.common import magnum_keystoneclient
from magnum.tests import base
from magnum.tests import utils
@mock.patch('keystoneclient.v3.client.Client')
class KeystoneClientTest(base.BaseTestCase):
"""Test cases for magnum.common.magnum_keystoneclient."""
def setUp(self):
super(KeystoneClientTest, self).setUp()
dummy_url = 'http://server.test:5000/v2.0'
self.ctx = utils.dummy_context()
self.ctx.auth_url = dummy_url
self.ctx.auth_token = 'abcd1234'
self.ctx.auth_token_info = None
cfg.CONF.set_override('auth_uri', dummy_url,
group='keystone_authtoken')
cfg.CONF.set_override('admin_user', 'magnum',
group='keystone_authtoken')
cfg.CONF.set_override('admin_password', 'verybadpass',
group='keystone_authtoken')
cfg.CONF.set_override('admin_tenant_name', 'service',
group='keystone_authtoken')
def test_init_v3_token(self, mock_ks):
"""Test creating the client, token auth."""
self.ctx.project_id = None
self.ctx.trust_id = None
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
magnum_ks_client.client
self.assertIsNotNone(magnum_ks_client._client)
mock_ks.assert_called_once_with(token='abcd1234', project_id=None,
auth_url='http://server.test:5000/v3',
endpoint='http://server.test:5000/v3')
mock_ks.return_value.authenticate.assert_called_once_with()
def test_init_v3_bad_nocreds(self, mock_ks):
"""Test creating the client, no credentials."""
self.ctx.auth_token = None
self.ctx.trust_id = None
self.ctx.username = None
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
self.assertRaises(exception.AuthorizationFailure,
magnum_ks_client._v3_client_init)
def test_init_trust_token_access(self, mock_ks):
"""Test creating the client, token auth."""
self.ctx.project_id = 'abcd1234'
self.ctx.trust_id = None
self.ctx.auth_token_info = {'access': {'token': {'id': 'placeholder'}}}
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
magnum_ks_client.client
self.assertIsNotNone(magnum_ks_client._client)
mock_ks.assert_called_once_with(auth_ref={'version': 'v2.0',
'token': {
'id': 'abcd1234'}},
endpoint='http://server.test:5000/v3',
auth_url='http://server.test:5000/v3')
def test_init_trust_token_token(self, mock_ks):
self.ctx.project_id = None
self.ctx.trust_id = None
self.ctx.auth_token_info = {'token': {}}
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
magnum_ks_client.client
self.assertIsNotNone(magnum_ks_client._client)
mock_ks.assert_called_once_with(auth_ref={'auth_token': 'abcd1234',
'version': 'v3'},
endpoint='http://server.test:5000/v3',
auth_url='http://server.test:5000/v3')
def test_init_trust_token_none(self, mock_ks):
self.ctx.project_id = None
self.ctx.trust_id = None
self.ctx.auth_token_info = {'not_this': 'urg'}
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
self.assertRaises(exception.AuthorizationFailure,
magnum_ks_client._v3_client_init)
def test_create_trust_context_trust_id(self, mock_ks):
"""Test create_trust_context with existing trust_id."""
self.ctx.trust_id = 'atrust123'
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
trust_context = magnum_ks_client.create_trust_context()
self.assertEqual(self.ctx.to_dict(), trust_context.to_dict())
mock_ks.assert_called_once_with(username='magnum',
auth_url='http://server.test:5000/v3',
password='verybadpass',
endpoint='http://server.test:5000/v3',
trust_id='atrust123')
mock_ks.return_value.authenticate.assert_called_once_with()
def test_create_trust_context_trust_create(self, mock_ks):
"""Test create_trust_context when creating a trust."""
class FakeTrust(object):
id = 'atrust123'
cfg.CONF.set_override('trusts_delegated_roles',
['magnum_assembly_update'])
getter_mock = mock.PropertyMock(side_effect=['1234', '5678'])
type(mock_ks.return_value.auth_ref).user_id = getter_mock
mock_ks.return_value.auth_ref.project_id = '42'
mock_ks.return_value.trusts.create.return_value = FakeTrust()
self.ctx.trust_id = None
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
trust_context = magnum_ks_client.create_trust_context()
# admin_client and user client
expected = [mock.call(username='magnum',
project_name='service',
password='verybadpass',
auth_url='http://server.test:5000/v3',
endpoint='http://server.test:5000/v3'),
mock.call(token='abcd1234',
project_id='test_tenant_id',
auth_url='http://server.test:5000/v3',
endpoint='http://server.test:5000/v3')]
self.assertEqual(expected, mock_ks.call_args_list)
self.assertEqual([mock.call(), mock.call()],
mock_ks.return_value.authenticate.call_args_list)
# trust creation
self.assertEqual('atrust123', trust_context.trust_id)
mock_ks.return_value.trusts.create.assert_called_once_with(
trustor_user='5678',
trustee_user='1234',
project='42',
impersonation=True,
role_names=['magnum_assembly_update'])
def test_init_admin_client_denied(self, mock_ks):
"""Test the admin_client property, auth failure path."""
self.ctx.username = None
self.ctx.password = None
self.ctx.trust_id = None
mock_ks.return_value.authenticate.return_value = False
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
# Define wrapper for property or the property raises the exception
# outside of the assertRaises which fails the test
def get_admin_client():
magnum_ks_client.admin_client
self.assertRaises(exception.AuthorizationFailure,
get_admin_client)
def test_trust_init_fail(self, mock_ks):
"""Test consuming a trust when initializing, error scoping."""
self.ctx.username = None
self.ctx.auth_token = None
self.ctx.trust_id = 'atrust123'
mock_ks.return_value.auth_ref.trust_scoped = False
self.assertRaises(exception.AuthorizationFailure,
magnum_keystoneclient.KeystoneClientV3, self.ctx)
def test_trust_init_token(self, mock_ks):
"""Test trust_id takes precedence when token specified."""
self.ctx.username = None
self.ctx.trust_id = 'atrust123'
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
self.assertIsNotNone(magnum_ks_client._client)
mock_ks.assert_called_once_with(username='magnum',
auth_url='http://server.test:5000/v3',
password='verybadpass',
endpoint='http://server.test:5000/v3',
trust_id='atrust123')
mock_ks.return_value.authenticate.assert_called_once_with()
def test_delete_trust(self, mock_ks):
"""Test delete_trust when deleting trust."""
mock_ks.return_value.trusts.delete.return_value = None
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
self.assertIsNone(magnum_ks_client.delete_trust(trust_id='atrust123'))
mock_ks.return_value.trusts.delete.assert_called_once_with('atrust123')
def test_delete_trust_not_found(self, mock_ks):
"""Test delete_trust when trust already deleted."""
mock_delete = mock_ks.return_value.trusts.delete
mock_delete.side_effect = kc_exception.NotFound()
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
self.assertIsNone(magnum_ks_client.delete_trust(trust_id='atrust123'))
@mock.patch.object(magnum_keystoneclient.KeystoneClientV3,
'_service_admin_creds')
def test_client_is_admin(self, mock_admin_creds, mock_ks):
"""Test client is admin when passing an admin_context."""
self.ctx.is_admin = True
magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx)
magnum_ks_client.client
self.assertIsNone(magnum_ks_client._client)
self.assertIsNotNone(magnum_ks_client._admin_client)
mock_admin_creds.assert_called_once_with()