From 585c525403ac672c4aa7534781e6e4644c6b871c Mon Sep 17 00:00:00 2001 From: Jamie Lennox Date: Fri, 22 Jan 2016 13:37:03 +1100 Subject: [PATCH] Allow parameter expansion in endpoint_override Allow %(project_id)s and %(user_id)s in an endpoint_override string to be replaced with the appropriate values from the authentication. This makes it easier to use and specify overrides from the command line or scripts where we may not know the project_id/user_id ahead of time. Change-Id: Ia09f5a88cb6590113dab26796c58e708014e98f7 Closes-Bug: #1536874 --- keystoneauth1/session.py | 30 ++++++++- keystoneauth1/tests/unit/test_session.py | 80 ++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 2 deletions(-) diff --git a/keystoneauth1/session.py b/keystoneauth1/session.py index a77af39d..dc626884 100644 --- a/keystoneauth1/session.py +++ b/keystoneauth1/session.py @@ -71,6 +71,28 @@ class _JSONEncoder(json.JSONEncoder): return super(_JSONEncoder, self).default(o) +class _StringFormatter(object): + """A String formatter that fetches values on demand""" + + def __init__(self, session, auth): + self.session = session + self.auth = auth + + def __getitem__(self, item): + if item == 'project_id': + value = self.session.get_project_id(self.auth) + elif item == 'user_id': + value = self.session.get_user_id(self.auth) + else: + raise AttributeError(item) + + if not value: + raise ValueError("This type of authentication does not provide a " + "%s that can be substituted" % item) + + return value + + class Session(object): """Maintains client communication state and common functionality. @@ -302,7 +324,11 @@ class Session(object): endpoint in the auth plugin. This will be ignored if a fully qualified URL is provided but take priority over an - endpoint_filter. (optional) + endpoint_filter. This string may contain + the values %(project_id)s and %(user_id)s + to have those values replaced by the + project_id/user_id of the current + authentication. (optional) :param auth: The auth plugin to use when authenticating this request. This will override the plugin that is attached to the session (if any). (optional) @@ -360,7 +386,7 @@ class Session(object): base_url = None if endpoint_override: - base_url = endpoint_override + base_url = endpoint_override % _StringFormatter(self, auth) elif endpoint_filter: base_url = self.get_endpoint(auth, **endpoint_filter) diff --git a/keystoneauth1/tests/unit/test_session.py b/keystoneauth1/tests/unit/test_session.py index 630a1616..508c56a3 100644 --- a/keystoneauth1/tests/unit/test_session.py +++ b/keystoneauth1/tests/unit/test_session.py @@ -25,6 +25,7 @@ from keystoneauth1 import exceptions from keystoneauth1 import plugin from keystoneauth1 import session as client_session from keystoneauth1.tests.unit import utils +from keystoneauth1 import token_endpoint class SessionTests(utils.TestCase): @@ -366,12 +367,16 @@ class AuthPlugin(plugin.BaseAuthPlugin): class CalledAuthPlugin(plugin.BaseAuthPlugin): ENDPOINT = 'http://fakeendpoint/' + USER_ID = uuid.uuid4().hex + PROJECT_ID = uuid.uuid4().hex def __init__(self, invalidate=True): self.get_token_called = False self.get_endpoint_called = False self.endpoint_arguments = {} self.invalidate_called = False + self.get_project_id_called = False + self.get_user_id_called = False self._invalidate = invalidate def get_token(self, session): @@ -387,6 +392,14 @@ class CalledAuthPlugin(plugin.BaseAuthPlugin): self.invalidate_called = True return self._invalidate + def get_project_id(self, session, **kwargs): + self.get_project_id_called = True + return self.PROJECT_ID + + def get_user_id(self, session, **kwargs): + self.get_user_id_called = True + return self.USER_ID + class SessionAuthTests(utils.TestCase): @@ -574,6 +587,9 @@ class SessionAuthTests(utils.TestCase): self.assertTrue(auth.get_token_called) self.assertFalse(auth.get_endpoint_called) + self.assertFalse(auth.get_user_id_called) + self.assertFalse(auth.get_project_id_called) + def test_endpoint_override_ignore_full_url(self): auth = CalledAuthPlugin() sess = client_session.Session(auth=auth) @@ -594,6 +610,70 @@ class SessionAuthTests(utils.TestCase): self.assertTrue(auth.get_token_called) self.assertFalse(auth.get_endpoint_called) + self.assertFalse(auth.get_user_id_called) + self.assertFalse(auth.get_project_id_called) + + def test_endpoint_override_does_id_replacement(self): + auth = CalledAuthPlugin() + sess = client_session.Session(auth=auth) + + override_base = 'http://mytest/%(project_id)s/%(user_id)s' + path = 'path' + replacements = {'user_id': CalledAuthPlugin.USER_ID, + 'project_id': CalledAuthPlugin.PROJECT_ID} + override_url = override_base % replacements + '/' + path + resp_text = uuid.uuid4().hex + + self.requests_mock.get(override_url, text=resp_text) + + resp = sess.get(path, + endpoint_override=override_base, + endpoint_filter={'service_type': 'identity'}) + + self.assertEqual(resp_text, resp.text) + self.assertEqual(override_url, self.requests_mock.last_request.url) + + self.assertTrue(auth.get_token_called) + self.assertTrue(auth.get_user_id_called) + self.assertTrue(auth.get_project_id_called) + self.assertFalse(auth.get_endpoint_called) + + def test_endpoint_override_fails_to_replace_if_none(self): + # The token_endpoint plugin doesn't know user_id or project_id + auth = token_endpoint.Token(uuid.uuid4().hex, uuid.uuid4().hex) + sess = client_session.Session(auth=auth) + + override_base = 'http://mytest/%(project_id)s' + + e = self.assertRaises(ValueError, + sess.get, + '/path', + endpoint_override=override_base, + endpoint_filter={'service_type': 'identity'}) + + self.assertIn('project_id', str(e)) + override_base = 'http://mytest/%(user_id)s' + + e = self.assertRaises(ValueError, + sess.get, + '/path', + endpoint_override=override_base, + endpoint_filter={'service_type': 'identity'}) + self.assertIn('user_id', str(e)) + + def test_endpoint_override_fails_to_do_unknown_replacement(self): + auth = CalledAuthPlugin() + sess = client_session.Session(auth=auth) + + override_base = 'http://mytest/%(unknown_id)s' + + e = self.assertRaises(AttributeError, + sess.get, + '/path', + endpoint_override=override_base, + endpoint_filter={'service_type': 'identity'}) + self.assertIn('unknown_id', str(e)) + def test_user_and_project_id(self): auth = AuthPlugin() sess = client_session.Session(auth=auth)