From d52237d89a88f063bfafd77e6e1f885569f61254 Mon Sep 17 00:00:00 2001 From: Hua Wang Date: Mon, 7 Sep 2015 10:35:09 +0800 Subject: [PATCH] Code refactor for keystoneclient 1.Rename magnum_keystone_client to keystone. 2.Code refactor to let keystoneclient can create trust for any user, not only admin user. Change-Id: Iac20185f2cc89ac8d6fe7cf30b2362302614df77 Partially-Implements: blueprint generate-keystone-trust --- etc/magnum/magnum.conf.sample | 7 +- magnum/common/clients.py | 8 +- magnum/common/keystone.py | 130 ++++++++++ magnum/common/magnum_keystoneclient.py | 202 --------------- magnum/opts.py | 2 - magnum/tests/unit/common/test_clients.py | 12 +- magnum/tests/unit/common/test_keystone.py | 132 ++++++++++ .../unit/common/test_magnum_keystoneclient.py | 229 ------------------ 8 files changed, 277 insertions(+), 445 deletions(-) create mode 100644 magnum/common/keystone.py delete mode 100644 magnum/common/magnum_keystoneclient.py create mode 100644 magnum/tests/unit/common/test_keystone.py delete mode 100644 magnum/tests/unit/common/test_magnum_keystoneclient.py diff --git a/etc/magnum/magnum.conf.sample b/etc/magnum/magnum.conf.sample index ea088efed1..25535f3232 100644 --- a/etc/magnum/magnum.conf.sample +++ b/etc/magnum/magnum.conf.sample @@ -8,9 +8,6 @@ # Default value is True. (boolean value) #enable_authentication = true -# Subset of trustor roles to be delegated to magnum. (list value) -#trusts_delegated_roles = magnum_assembly_update - # Directory where the magnum python module is installed. (string # value) #pybasedir = /opt/stack/magnum/magnum @@ -236,7 +233,7 @@ # From magnum # -# The port for the Magnum API server. (port number) +# The port for the Magnum API server. (unknown type) # Minimum value: 1 # Maximum value: 65535 #port = 9511 @@ -729,7 +726,7 @@ # value) #k8s_protocol = http -# Default port of the k8s master endpoint. (port number) +# Default port of the k8s master endpoint. (unknown type) # Minimum value: 1 # Maximum value: 65535 #k8s_port = 8080 diff --git a/magnum/common/clients.py b/magnum/common/clients.py index aa0439cf3f..86fc57d537 100644 --- a/magnum/common/clients.py +++ b/magnum/common/clients.py @@ -20,7 +20,7 @@ from oslo_config import cfg from oslo_log import log as logging from magnum.common import exception -from magnum.common import magnum_keystoneclient +from magnum.common import keystone from magnum.i18n import _ @@ -123,7 +123,7 @@ class OpenStackClients(object): @property def auth_url(self): - return self.keystone().v3_endpoint + return self.keystone().auth_url @property def auth_token(self): @@ -133,7 +133,7 @@ class OpenStackClients(object): if self._keystone: return self._keystone - self._keystone = magnum_keystoneclient.KeystoneClientV3(self.context) + self._keystone = keystone.KeystoneClientV3(self.context) return self._keystone def _get_client_option(self, client, option): @@ -196,7 +196,7 @@ class OpenStackClients(object): endpoint = self.url_for(service_type='key-manager', endpoint_type=endpoint_type, region_name=region_name) - session = self.keystone().client.session + session = self.keystone().session self._barbican = barbicanclient.Client(session=session, endpoint=endpoint) diff --git a/magnum/common/keystone.py b/magnum/common/keystone.py new file mode 100644 index 0000000000..b1b42598d3 --- /dev/null +++ b/magnum/common/keystone.py @@ -0,0 +1,130 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import keystoneclient.exceptions as kc_exception +from keystoneclient.v3 import client as kc_v3 +from oslo_config import cfg +from oslo_log import log as logging + +from magnum.common import exception +from magnum.i18n import _LE + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + +CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token') + + +class KeystoneClientV3(object): + """Keystone client wrapper so we can encapsulate logic in one place.""" + + def __init__(self, context): + self.context = context + self._client = None + self._admin_client = None + + @property + def auth_url(self): + v3_auth_url = CONF.keystone_authtoken.auth_uri.replace('v2.0', 'v3') + return v3_auth_url + + @property + def auth_token(self): + return self.client.auth_token + + @property + def session(self): + return self.client.session + + @property + def admin_session(self): + return self.admin_client.session + + @property + def client(self): + if self.context.is_admin: + return self.admin_client + else: + if not self._client: + self._client = self._get_ks_client() + return self._client + + def _get_admin_credentials(self): + credentials = { + 'username': CONF.keystone_authtoken.admin_user, + 'password': CONF.keystone_authtoken.admin_password, + 'project_name': CONF.keystone_authtoken.admin_tenant_name + } + return credentials + + @property + def admin_client(self): + if not self._admin_client: + admin_credentials = self._get_admin_credentials() + self._admin_client = kc_v3.Client(auth_url=self.auth_url, + **admin_credentials) + return self._admin_client + + @staticmethod + def _is_v2_valid(auth_token_info): + return 'access' in auth_token_info + + @staticmethod + def _is_v3_valid(auth_token_info): + return 'token' in auth_token_info + + def _get_ks_client(self): + kwargs = {'auth_url': self.auth_url} + if self.context.trust_id: + kwargs.update(self._get_admin_credentials()) + kwargs['trust_id'] = self.context.trust_id + kwargs.pop('project_name') + elif self.context.auth_token_info: + kwargs['token'] = self.context.auth_token + if self._is_v2_valid(self.context.auth_token_info): + LOG.warn('Keystone v2 is deprecated.') + kwargs['auth_ref'] = self.context.auth_token_info['access'] + kwargs['auth_ref']['version'] = 'v2.0' + elif self._is_v3_valid(self.context.auth_token_info): + kwargs['auth_ref'] = self.context.auth_token_info['token'] + kwargs['auth_ref']['version'] = 'v3' + else: + LOG.error(_LE('Unknown version in auth_token_info')) + raise exception.AuthorizationFailure() + elif self.context.auth_token: + kwargs['token'] = self.context.auth_token + else: + LOG.error(_LE('Keystone v3 API conntection failed, no password ' + 'trust or auth_token')) + raise exception.AuthorizationFailure() + + return kc_v3.Client(**kwargs) + + def create_trust(self, trustee_user, role_names, impersonation=True): + trustor_user_id = self.client.auth_ref.user_id + trustor_project_id = self.client.auth_ref.project_id + trust = self.client.trusts.create(trustor_user=trustor_user_id, + project=trustor_project_id, + trustee_user=trustee_user, + impersonation=impersonation, + role_names=role_names) + return trust + + def create_trust_to_admin(self, role_names, impersonation=True): + trustee_user = self.admin_client.auth_ref.user_id + return self.create_trust(trustee_user, role_names, impersonation) + + def delete_trust(self, trust_id): + try: + self.client.trusts.delete(trust_id) + except kc_exception.NotFound: + pass diff --git a/magnum/common/magnum_keystoneclient.py b/magnum/common/magnum_keystoneclient.py deleted file mode 100644 index 90b3c5620e..0000000000 --- a/magnum/common/magnum_keystoneclient.py +++ /dev/null @@ -1,202 +0,0 @@ -# Copyright 2014 - Rackspace Hosting. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import copy - -import keystoneclient.exceptions as kc_exception -from keystoneclient.v3 import client as kc_v3 -from oslo_config import cfg -from oslo_log import log as logging -from oslo_utils import importutils - -from magnum.common import context as magnum_context -from magnum.common import exception -from magnum.i18n import _ -from magnum.i18n import _LE -from magnum.i18n import _LI - -LOG = logging.getLogger(__name__) - -trust_opts = [ - cfg.ListOpt('trusts_delegated_roles', - default=['magnum_assembly_update'], - help=_('Subset of trustor roles to be delegated to magnum.')), -] -cfg.CONF.register_opts(trust_opts) -cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token', - group='keystone_authtoken') - - -class KeystoneClientV3(object): - """Keystone client wrapper so we can encapsulate logic in one place.""" - - def __init__(self, context): - # If a trust_id is specified in the context, we immediately - # authenticate so we can populate the context with a trust token - # otherwise, we delay client authentication until needed to avoid - # unnecessary calls to keystone. - # - # Note that when you obtain a token using a trust, it cannot be - # used to reauthenticate and get another token, so we have to - # get a new trust-token even if context.auth_token is set. - # - # - context.auth_url is expected to contain a versioned keystone - # path, we will work with either a v2.0 or v3 path - self.context = context - self._client = None - self._admin_client = None - self._is_admin = context.is_admin - - if self.context.auth_url: - self.v3_endpoint = self.context.auth_url.replace('v2.0', 'v3') - else: - # Import auth_token to have keystone_authtoken settings setup. - importutils.import_module('keystonemiddleware.auth_token') - self.v3_endpoint = cfg.CONF.keystone_authtoken.auth_uri.replace( - 'v2.0', 'v3') - - if self.context.trust_id: - # Create a client with the specified trust_id, this - # populates self.context.auth_token with a trust-scoped token - self._client = self._v3_client_init() - - @property - def client(self): - if self._is_admin: - return self.admin_client - else: - if not self._client: - # Create connection to v3 API - self._client = self._v3_client_init() - return self._client - - @property - def admin_client(self): - if not self._admin_client: - # Create admin client connection to v3 API - admin_creds = self._service_admin_creds() - c = kc_v3.Client(**admin_creds) - if c.authenticate(): - self._admin_client = c - else: - LOG.error(_LE("Admin client authentication failed")) - raise exception.AuthorizationFailure() - return self._admin_client - - def _v3_client_init(self): - kwargs = { - 'auth_url': self.v3_endpoint, - 'endpoint': self.v3_endpoint - } - # Note try trust_id first, as we can't reuse auth_token in that case - if self.context.trust_id is not None: - # We got a trust_id, so we use the admin credentials - # to authenticate with the trust_id so we can use the - # trust impersonating the trustor user. - kwargs.update(self._service_admin_creds()) - kwargs['trust_id'] = self.context.trust_id - kwargs.pop('project_name') - elif self.context.auth_token_info is not None: - # The auth_ref version must be set according to the token version - if 'access' in self.context.auth_token_info: - kwargs['auth_ref'] = copy.deepcopy( - self.context.auth_token_info['access']) - kwargs['auth_ref']['version'] = 'v2.0' - kwargs['auth_ref']['token']['id'] = self.context.auth_token - elif 'token' in self.context.auth_token_info: - kwargs['auth_ref'] = copy.deepcopy( - self.context.auth_token_info['token']) - kwargs['auth_ref']['version'] = 'v3' - kwargs['auth_ref']['auth_token'] = self.context.auth_token - else: - LOG.error(_LE("Unknown version in auth_token_info")) - raise exception.AuthorizationFailure() - elif self.context.auth_token is not None: - kwargs['token'] = self.context.auth_token - kwargs['project_id'] = self.context.project_id - else: - LOG.error(_LE("Keystone v3 API connection failed, no password " - "trust or auth_token!")) - raise exception.AuthorizationFailure() - client = kc_v3.Client(**kwargs) - if 'auth_ref' not in kwargs: - client.authenticate() - # If we are authenticating with a trust set the context auth_token - # with the trust scoped token - if 'trust_id' in kwargs: - # Sanity check - if not client.auth_ref.trust_scoped: - LOG.error(_LE("trust token re-scoping failed!")) - raise exception.AuthorizationFailure() - # All OK so update the context with the token - self.context.auth_token = client.auth_ref.auth_token - self.context.auth_url = self.v3_endpoint - self.context.user = client.auth_ref.user_id - self.context.project_id = client.auth_ref.project_id - self.context.user_name = client.auth_ref.username - - return client - - def _service_admin_creds(self): - # Import auth_token to have keystone_authtoken settings setup. - importutils.import_module('keystonemiddleware.auth_token') - creds = { - 'username': cfg.CONF.keystone_authtoken.admin_user, - 'password': cfg.CONF.keystone_authtoken.admin_password, - 'auth_url': self.v3_endpoint, - 'endpoint': self.v3_endpoint, - 'project_name': cfg.CONF.keystone_authtoken.admin_tenant_name} - LOG.info(_LI('admin creds %s') % creds) - return creds - - def create_trust_context(self): - """Create a trust using the trustor identity in the current context. - - Use the trustee as the magnum service user and return a context - containing the new trust_id. - - If the current context already contains a trust_id, we do nothing - and return the current context. - """ - if self.context.trust_id: - return self.context - - # We need the service admin user ID (not name), as the trustor user - # can't lookup the ID in keystoneclient unless they're admin - # workaround this by getting the user_id from admin_client - trustee_user_id = self.admin_client.auth_ref.user_id - trustor_user_id = self.client.auth_ref.user_id - trustor_project_id = self.client.auth_ref.project_id - roles = cfg.CONF.trusts_delegated_roles - trust = self.client.trusts.create(trustor_user=trustor_user_id, - trustee_user=trustee_user_id, - project=trustor_project_id, - impersonation=True, - role_names=roles) - - trust_context = magnum_context.RequestContext.from_dict( - self.context.to_dict()) - trust_context.trust_id = trust.id - return trust_context - - def delete_trust(self, trust_id): - """Delete the specified trust.""" - try: - self.client.trusts.delete(trust_id) - except kc_exception.NotFound: - pass - - @property - def auth_token(self): - return self.client.auth_token diff --git a/magnum/opts.py b/magnum/opts.py index 0b2f5feabe..cf60b8c331 100644 --- a/magnum/opts.py +++ b/magnum/opts.py @@ -21,7 +21,6 @@ import magnum.common.cert_manager from magnum.common.cert_manager import local_cert_manager import magnum.common.clients import magnum.common.exception -import magnum.common.magnum_keystoneclient import magnum.common.service import magnum.common.x509.config import magnum.conductor.config @@ -36,7 +35,6 @@ def list_opts(): return [ ('DEFAULT', itertools.chain(magnum.api.auth.AUTH_OPTS, - magnum.common.magnum_keystoneclient.trust_opts, magnum.common.paths.PATH_OPTS, magnum.common.utils.UTILS_OPTS, magnum.common.rpc_service.periodic_opts, diff --git a/magnum/tests/unit/common/test_clients.py b/magnum/tests/unit/common/test_clients.py index a216a10687..69f36b5d03 100644 --- a/magnum/tests/unit/common/test_clients.py +++ b/magnum/tests/unit/common/test_clients.py @@ -23,6 +23,12 @@ from magnum.tests import base class ClientsTest(base.BaseTestCase): + def setUp(self): + super(ClientsTest, self).setUp() + + cfg.CONF.set_override('auth_uri', 'http://server.test:5000/v2.0', + group='keystone_authtoken') + @mock.patch.object(clients.OpenStackClients, 'keystone') def test_url_for(self, mock_keystone): obj = clients.OpenStackClients(None) @@ -169,14 +175,14 @@ class ClientsTest(base.BaseTestCase): con.auth_url = "keystone_url" mock_url.return_value = "url_from_keystone" keystone = mock.MagicMock() - keystone.client.session = mock.MagicMock() + keystone.session = mock.MagicMock() mock_keystone.return_value = keystone obj = clients.OpenStackClients(con) obj._barbican = None obj.barbican() mock_call.assert_called_once_with( endpoint='url_from_keystone', - session=keystone.client.session) + session=keystone.session) mock_keystone.assert_called_once_with() mock_url.assert_called_once_with(service_type='key-manager', @@ -211,7 +217,7 @@ class ClientsTest(base.BaseTestCase): con.auth_url = "keystone_url" mock_url.return_value = "url_from_keystone" keystone = mock.MagicMock() - keystone.client.session = mock.MagicMock() + keystone.session = mock.MagicMock() mock_keystone.return_value = keystone obj = clients.OpenStackClients(con) obj._barbican = None diff --git a/magnum/tests/unit/common/test_keystone.py b/magnum/tests/unit/common/test_keystone.py new file mode 100644 index 0000000000..af2d50b9e2 --- /dev/null +++ b/magnum/tests/unit/common/test_keystone.py @@ -0,0 +1,132 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from oslo_config import cfg + +cfg.CONF.import_group('keystone_authtoken', + 'keystonemiddleware.auth_token') + +import keystoneclient.exceptions as kc_exception + +from magnum.common import exception +from magnum.common import keystone +from magnum.tests import base +from magnum.tests import utils + + +@mock.patch('keystoneclient.v3.client.Client') +class KeystoneClientTest(base.BaseTestCase): + + def setUp(self): + super(KeystoneClientTest, self).setUp() + dummy_url = 'http://server.test:5000/v2.0' + + self.ctx = utils.dummy_context() + self.ctx.auth_url = dummy_url + self.ctx.auth_token = 'abcd1234' + + cfg.CONF.set_override('auth_uri', dummy_url, + group='keystone_authtoken') + cfg.CONF.set_override('admin_user', 'magnum', + group='keystone_authtoken') + cfg.CONF.set_override('admin_password', 'verybadpass', + group='keystone_authtoken') + cfg.CONF.set_override('admin_tenant_name', 'service', + group='keystone_authtoken') + + def test_client_with_token(self, mock_ks): + ks_client = keystone.KeystoneClientV3(self.ctx) + ks_client.client + self.assertIsNotNone(ks_client._client) + mock_ks.assert_called_once_with(token='abcd1234', + auth_url='http://server.test:5000/v3') + + def test_client_with_no_credentials(self, mock_ks): + self.ctx.auth_token = None + ks_client = keystone.KeystoneClientV3(self.ctx) + self.assertRaises(exception.AuthorizationFailure, + ks_client._get_ks_client) + + def test_client_with_v2_auth_token_info(self, mock_ks): + self.ctx.auth_token_info = {'access': {}} + + ks_client = keystone.KeystoneClientV3(self.ctx) + ks_client.client + self.assertIsNotNone(ks_client._client) + mock_ks.assert_called_once_with(auth_ref={'version': 'v2.0'}, + auth_url='http://server.test:5000/v3', + token='abcd1234') + + def test_client_with_v3_auth_token_info(self, mock_ks): + self.ctx.auth_token_info = {'token': {}} + + ks_client = keystone.KeystoneClientV3(self.ctx) + ks_client.client + self.assertIsNotNone(ks_client._client) + mock_ks.assert_called_once_with(auth_ref={'version': 'v3'}, + auth_url='http://server.test:5000/v3', + token='abcd1234') + + def test_client_with_invalid_auth_token_info(self, mock_ks): + self.ctx.auth_token_info = {'not_this': 'urg'} + + ks_client = keystone.KeystoneClientV3(self.ctx) + self.assertRaises(exception.AuthorizationFailure, + ks_client._get_ks_client) + + def test_client_with_is_admin(self, mock_ks): + self.ctx.is_admin = True + ks_client = keystone.KeystoneClientV3(self.ctx) + ks_client.client + + self.assertIsNone(ks_client._client) + self.assertIsNotNone(ks_client._admin_client) + mock_ks.assert_called_once_with(auth_url='http://server.test:5000/v3', + username='magnum', + password='verybadpass', + project_name='service') + + def test_delete_trust(self, mock_ks): + mock_ks.return_value.trusts.delete.return_value = None + ks_client = keystone.KeystoneClientV3(self.ctx) + self.assertIsNone(ks_client.delete_trust(trust_id='atrust123')) + mock_ks.return_value.trusts.delete.assert_called_once_with('atrust123') + + def test_delete_trust_not_found(self, mock_ks): + mock_delete = mock_ks.return_value.trusts.delete + mock_delete.side_effect = kc_exception.NotFound() + ks_client = keystone.KeystoneClientV3(self.ctx) + self.assertIsNone(ks_client.delete_trust(trust_id='atrust123')) + + def test_create_trust(self, mock_ks): + mock_ks.return_value.auth_ref.user_id = '123456' + mock_ks.return_value.auth_ref.project_id = '654321' + + ks_client = keystone.KeystoneClientV3(self.ctx) + ks_client.create_trust(trustee_user='888888', + role_names='xxxx') + + mock_ks.return_value.trusts.create.assert_called_once_with( + trustor_user='123456', project='654321', + trustee_user='888888', role_names='xxxx', + impersonation=True) + + @mock.patch.object(keystone.KeystoneClientV3, + 'create_trust') + def test_create_trust_to_admin(self, mock_create_trust, mock_ks): + mock_ks.return_value.auth_ref.user_id = '777777' + + ks_client = keystone.KeystoneClientV3(self.ctx) + ks_client.create_trust_to_admin(role_names='xxxx') + + mock_create_trust.assert_called_once_with('777777', 'xxxx', True) diff --git a/magnum/tests/unit/common/test_magnum_keystoneclient.py b/magnum/tests/unit/common/test_magnum_keystoneclient.py deleted file mode 100644 index 2eeb4a555a..0000000000 --- a/magnum/tests/unit/common/test_magnum_keystoneclient.py +++ /dev/null @@ -1,229 +0,0 @@ -# Copyright 2014 - Rackspace Hosting. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock -from oslo_config import cfg - -cfg.CONF.import_group('keystone_authtoken', - 'keystonemiddleware.auth_token') - -import keystoneclient.exceptions as kc_exception # noqa - -from magnum.common import exception -from magnum.common import magnum_keystoneclient -from magnum.tests import base -from magnum.tests import utils - - -@mock.patch('keystoneclient.v3.client.Client') -class KeystoneClientTest(base.BaseTestCase): - """Test cases for magnum.common.magnum_keystoneclient.""" - - def setUp(self): - super(KeystoneClientTest, self).setUp() - dummy_url = 'http://server.test:5000/v2.0' - - self.ctx = utils.dummy_context() - self.ctx.auth_url = dummy_url - self.ctx.auth_token = 'abcd1234' - self.ctx.auth_token_info = None - - cfg.CONF.set_override('auth_uri', dummy_url, - group='keystone_authtoken') - cfg.CONF.set_override('admin_user', 'magnum', - group='keystone_authtoken') - cfg.CONF.set_override('admin_password', 'verybadpass', - group='keystone_authtoken') - cfg.CONF.set_override('admin_tenant_name', 'service', - group='keystone_authtoken') - - def test_init_v3_token(self, mock_ks): - """Test creating the client, token auth.""" - self.ctx.project_id = None - self.ctx.trust_id = None - magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx) - magnum_ks_client.client - self.assertIsNotNone(magnum_ks_client._client) - mock_ks.assert_called_once_with(token='abcd1234', project_id=None, - auth_url='http://server.test:5000/v3', - endpoint='http://server.test:5000/v3') - mock_ks.return_value.authenticate.assert_called_once_with() - - def test_init_v3_bad_nocreds(self, mock_ks): - """Test creating the client, no credentials.""" - self.ctx.auth_token = None - self.ctx.trust_id = None - self.ctx.username = None - magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx) - self.assertRaises(exception.AuthorizationFailure, - magnum_ks_client._v3_client_init) - - def test_init_trust_token_access(self, mock_ks): - """Test creating the client, token auth.""" - self.ctx.project_id = 'abcd1234' - self.ctx.trust_id = None - self.ctx.auth_token_info = {'access': {'token': {'id': 'placeholder'}}} - - magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx) - magnum_ks_client.client - self.assertIsNotNone(magnum_ks_client._client) - mock_ks.assert_called_once_with(auth_ref={'version': 'v2.0', - 'token': { - 'id': 'abcd1234'}}, - endpoint='http://server.test:5000/v3', - auth_url='http://server.test:5000/v3') - - def test_init_trust_token_token(self, mock_ks): - self.ctx.project_id = None - self.ctx.trust_id = None - self.ctx.auth_token_info = {'token': {}} - - magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx) - magnum_ks_client.client - self.assertIsNotNone(magnum_ks_client._client) - mock_ks.assert_called_once_with(auth_ref={'auth_token': 'abcd1234', - 'version': 'v3'}, - endpoint='http://server.test:5000/v3', - auth_url='http://server.test:5000/v3') - - def test_init_trust_token_none(self, mock_ks): - self.ctx.project_id = None - self.ctx.trust_id = None - self.ctx.auth_token_info = {'not_this': 'urg'} - - magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx) - self.assertRaises(exception.AuthorizationFailure, - magnum_ks_client._v3_client_init) - - def test_create_trust_context_trust_id(self, mock_ks): - """Test create_trust_context with existing trust_id.""" - self.ctx.trust_id = 'atrust123' - - magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx) - trust_context = magnum_ks_client.create_trust_context() - self.assertEqual(self.ctx.to_dict(), trust_context.to_dict()) - mock_ks.assert_called_once_with(username='magnum', - auth_url='http://server.test:5000/v3', - password='verybadpass', - endpoint='http://server.test:5000/v3', - trust_id='atrust123') - mock_ks.return_value.authenticate.assert_called_once_with() - - def test_create_trust_context_trust_create(self, mock_ks): - """Test create_trust_context when creating a trust.""" - class FakeTrust(object): - id = 'atrust123' - - cfg.CONF.set_override('trusts_delegated_roles', - ['magnum_assembly_update']) - - getter_mock = mock.PropertyMock(side_effect=['1234', '5678']) - type(mock_ks.return_value.auth_ref).user_id = getter_mock - - mock_ks.return_value.auth_ref.project_id = '42' - mock_ks.return_value.trusts.create.return_value = FakeTrust() - self.ctx.trust_id = None - - magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx) - trust_context = magnum_ks_client.create_trust_context() - - # admin_client and user client - expected = [mock.call(username='magnum', - project_name='service', - password='verybadpass', - auth_url='http://server.test:5000/v3', - endpoint='http://server.test:5000/v3'), - mock.call(token='abcd1234', - project_id='test_tenant_id', - auth_url='http://server.test:5000/v3', - endpoint='http://server.test:5000/v3')] - - self.assertEqual(expected, mock_ks.call_args_list) - self.assertEqual([mock.call(), mock.call()], - mock_ks.return_value.authenticate.call_args_list) - - # trust creation - self.assertEqual('atrust123', trust_context.trust_id) - mock_ks.return_value.trusts.create.assert_called_once_with( - trustor_user='5678', - trustee_user='1234', - project='42', - impersonation=True, - role_names=['magnum_assembly_update']) - - def test_init_admin_client_denied(self, mock_ks): - """Test the admin_client property, auth failure path.""" - self.ctx.username = None - self.ctx.password = None - self.ctx.trust_id = None - mock_ks.return_value.authenticate.return_value = False - - magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx) - - # Define wrapper for property or the property raises the exception - # outside of the assertRaises which fails the test - def get_admin_client(): - magnum_ks_client.admin_client - - self.assertRaises(exception.AuthorizationFailure, - get_admin_client) - - def test_trust_init_fail(self, mock_ks): - """Test consuming a trust when initializing, error scoping.""" - self.ctx.username = None - self.ctx.auth_token = None - self.ctx.trust_id = 'atrust123' - mock_ks.return_value.auth_ref.trust_scoped = False - - self.assertRaises(exception.AuthorizationFailure, - magnum_keystoneclient.KeystoneClientV3, self.ctx) - - def test_trust_init_token(self, mock_ks): - """Test trust_id takes precedence when token specified.""" - self.ctx.username = None - self.ctx.trust_id = 'atrust123' - magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx) - self.assertIsNotNone(magnum_ks_client._client) - mock_ks.assert_called_once_with(username='magnum', - auth_url='http://server.test:5000/v3', - password='verybadpass', - endpoint='http://server.test:5000/v3', - trust_id='atrust123') - mock_ks.return_value.authenticate.assert_called_once_with() - - def test_delete_trust(self, mock_ks): - """Test delete_trust when deleting trust.""" - mock_ks.return_value.trusts.delete.return_value = None - magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx) - self.assertIsNone(magnum_ks_client.delete_trust(trust_id='atrust123')) - mock_ks.return_value.trusts.delete.assert_called_once_with('atrust123') - - def test_delete_trust_not_found(self, mock_ks): - """Test delete_trust when trust already deleted.""" - mock_delete = mock_ks.return_value.trusts.delete - mock_delete.side_effect = kc_exception.NotFound() - magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx) - self.assertIsNone(magnum_ks_client.delete_trust(trust_id='atrust123')) - - @mock.patch.object(magnum_keystoneclient.KeystoneClientV3, - '_service_admin_creds') - def test_client_is_admin(self, mock_admin_creds, mock_ks): - """Test client is admin when passing an admin_context.""" - self.ctx.is_admin = True - magnum_ks_client = magnum_keystoneclient.KeystoneClientV3(self.ctx) - magnum_ks_client.client - - self.assertIsNone(magnum_ks_client._client) - self.assertIsNotNone(magnum_ks_client._admin_client) - mock_admin_creds.assert_called_once_with()