Allow loading auth plugins via overrides

Swift configures auth_token purely by paste options, not oslo.config.
This means that we cannot rely on purely using the keystoneclient
load_from_config_options for auth plugins.

Copy the logic from keystoneclient regarding auth plugin loading from
config files and make it specific to the _conf_get that auth_token
middleware uses so that the auth plugin options obey options from paste
as other options do.

This will be replaced with a keystoneclient helper as soon as possible,
however we want to fix the swift issue before that happens and we will
still be compatible going forward.

Change-Id: I54ac4c566cb798196ea18e24d4ce868450f269f3
Closes-Bug: #1428900
This commit is contained in:
Jamie Lennox 2015-03-06 11:28:21 +11:00
parent f85cf357e5
commit 06bdfc886f
2 changed files with 151 additions and 65 deletions

View File

@ -534,12 +534,12 @@ class AuthProtocol(object):
'check_revocations_for_cached')
self._init_auth_headers()
def _conf_get(self, name):
def _conf_get(self, name, group=_base.AUTHTOKEN_GROUP):
# try config from paste-deploy first
if name in self._conf:
return self._conf[name]
else:
return CONF.keystone_authtoken[name]
return CONF[group][name]
def _call_app(self, env, start_response):
# NOTE(jamielennox): We wrap the given start response so that if an
@ -1016,6 +1016,42 @@ class AuthProtocol(object):
self._SIGNING_CA_FILE_NAME,
self._identity_server.fetch_ca_cert())
def _get_auth_plugin(self):
# NOTE(jamielennox): Ideally this would use get_from_conf_options
# however that is not possible because we have to support the override
# pattern we use in _conf_get. There is a somewhat replacement for this
# in keystoneclient in load_from_options_getter which should be used
# when available. Until then this is essentially a copy and paste of
# the ksc load_from_conf_options code because we need to get a fix out
# for this quickly.
# FIXME(jamielennox): update to use load_from_options_getter when
# https://review.openstack.org/162529 merges.
# !!! - UNDER NO CIRCUMSTANCES COPY ANY OF THIS CODE - !!!
group = self._conf_get('auth_section') or _base.AUTHTOKEN_GROUP
plugin_name = self._conf_get('auth_plugin', group=group)
plugin_kwargs = dict()
if plugin_name:
plugin_class = auth.get_plugin_class(plugin_name)
else:
plugin_class = _auth.AuthTokenPlugin
# logger object is a required parameter of the default plugin
plugin_kwargs['log'] = self._LOG
plugin_opts = plugin_class.get_options()
CONF.register_opts(plugin_opts, group=group)
for opt in plugin_opts:
val = self._conf_get(opt.dest, group=group)
if val is not None:
val = opt.type(val)
plugin_kwargs[opt.dest] = val
return plugin_class.load_from_options(**plugin_kwargs)
def _create_identity_server(self):
# NOTE(jamielennox): Loading Session here should be exactly the
# same as calling Session.load_from_conf_options(CONF, GROUP)
@ -1029,30 +1065,7 @@ class AuthProtocol(object):
timeout=self._conf_get('http_connect_timeout')
))
# NOTE(jamielennox): The original auth mechanism allowed deployers
# to configure authentication information via paste file. These
# are accessible via _conf_get, however this doesn't work with the
# plugin loading mechanisms. For using auth plugins we only support
# configuring via the CONF file.
auth_plugin = auth.load_from_conf_options(CONF, _base.AUTHTOKEN_GROUP)
if not auth_plugin:
# NOTE(jamielennox): Loading AuthTokenPlugin here should be
# exactly the same as calling
# _AuthTokenPlugin.load_from_conf_options(CONF, GROUP) however
# we can't do that because we have to use _conf_get to support
# the paste.ini options.
auth_plugin = _auth.AuthTokenPlugin.load_from_options(
auth_host=self._conf_get('auth_host'),
auth_port=int(self._conf_get('auth_port')),
auth_protocol=self._conf_get('auth_protocol'),
auth_admin_prefix=self._conf_get('auth_admin_prefix'),
admin_user=self._conf_get('admin_user'),
admin_password=self._conf_get('admin_password'),
admin_tenant_name=self._conf_get('admin_tenant_name'),
admin_token=self._conf_get('admin_token'),
identity_uri=self._conf_get('identity_uri'),
log=self._LOG)
auth_plugin = self._get_auth_plugin()
adap = adapter.Adapter(
sess,

View File

@ -2555,6 +2555,7 @@ class OtherTests(BaseAuthTokenMiddlewareTest):
def setUp(self):
super(OtherTests, self).setUp()
self.logger = self.useFixture(fixtures.FakeLogger())
self.cfg = self.useFixture(cfg_fixture.Config())
def test_unknown_server_versions(self):
versions = fixture.DiscoveryList(v2=False, v3_id='v4', href=BASE_URI)
@ -2616,6 +2617,49 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
super(AuthProtocolLoadingTests, self).setUp()
self.cfg = self.useFixture(cfg_fixture.Config())
self.project_id = uuid.uuid4().hex
# first touch is to discover the available versions at the auth_url
self.requests.get(self.AUTH_URL,
json=fixture.DiscoveryList(href=self.DISC_URL),
status_code=300)
# then we do discovery on the URL from the service catalog. In practice
# this is mostly the same URL as before but test the full range.
self.requests.get(self.KEYSTONE_BASE_URL + '/',
json=fixture.DiscoveryList(href=self.CRUD_URL),
status_code=300)
def good_request(self, app):
# admin_token is the token that the service will get back from auth
admin_token_id = uuid.uuid4().hex
admin_token = fixture.V3Token(project_id=self.project_id)
s = admin_token.add_service('identity', name='keystone')
s.add_standard_endpoints(admin=self.KEYSTONE_URL)
self.requests.post(self.DISC_URL + '/v3/auth/tokens',
json=admin_token,
headers={'X-Subject-Token': admin_token_id})
# user_token is the data from the user's inputted token
user_token_id = uuid.uuid4().hex
user_token = fixture.V3Token()
user_token.set_project_scope()
request_headers = {'X-Subject-Token': user_token_id,
'X-Auth-Token': admin_token_id}
self.requests.get(self.CRUD_URL + '/v3/auth/tokens',
request_headers=request_headers,
json=user_token)
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = user_token_id
resp = app(req.environ, self.start_fake_response)
self.assertEqual(200, self.response_status)
return resp
def test_loading_password_plugin(self):
# the password options aren't set on config until loading time, but we
# need them set so we can override the values for testing, so force it
@ -2633,50 +2677,16 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
user_domain_id='userdomainid',
group=_base.AUTHTOKEN_GROUP)
# admin_token is the token that the service will get back from auth
admin_token_id = uuid.uuid4().hex
admin_token = fixture.V3Token(project_id=project_id)
s = admin_token.add_service('identity', name='keystone')
s.add_standard_endpoints(admin=self.KEYSTONE_URL)
# user_token is the data from the user's inputted token
user_token_id = uuid.uuid4().hex
user_token = fixture.V3Token()
user_token.set_project_scope()
# first touch is to discover the available versions at the auth_url
self.requests.get(self.AUTH_URL,
json=fixture.DiscoveryList(href=self.DISC_URL),
status_code=300)
# then we use the url returned from discovery to actually auth
self.requests.post(self.DISC_URL + '/v3/auth/tokens',
json=admin_token,
headers={'X-Subject-Token': admin_token_id})
# then we do discovery on the URL from the service catalog. In practice
# this is mostly the same URL as before but test the full range.
self.requests.get(self.KEYSTONE_BASE_URL + '/',
json=fixture.DiscoveryList(href=self.CRUD_URL),
status_code=300)
# actually authenticating the user will then use the base url that was
# retrieved from discovery from the service catalog.
self.requests.get(self.CRUD_URL + '/v3/auth/tokens',
request_headers={'X-Subject-Token': user_token_id,
'X-Auth-Token': admin_token_id},
json=user_token)
body = uuid.uuid4().hex
app = auth_token.AuthProtocol(new_app('200 OK', body)(), {})
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = user_token_id
resp = app(req.environ, self.start_fake_response)
self.assertEqual(200, self.response_status)
resp = self.good_request(app)
self.assertEqual(six.b(body), resp[0])
@staticmethod
def get_plugin(app):
return app._identity_server._adapter.auth
def test_invalid_plugin_fails_to_intialize(self):
self.cfg.config(auth_plugin=uuid.uuid4().hex,
group=_base.AUTHTOKEN_GROUP)
@ -2685,6 +2695,69 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
exceptions.NoMatchingPlugin,
lambda: auth_token.AuthProtocol(new_app('200 OK', '')(), {}))
def test_plugin_loading_mixed_opts(self):
# some options via override and some via conf
opts = auth.get_plugin_options('password')
self.cfg.register_opts(opts, group=_base.AUTHTOKEN_GROUP)
username = 'testuser'
password = 'testpass'
# configure the authentication options
self.cfg.config(auth_plugin='password',
password=password,
project_id=self.project_id,
user_domain_id='userdomainid',
group=_base.AUTHTOKEN_GROUP)
conf = {'username': username, 'auth_url': self.AUTH_URL}
body = uuid.uuid4().hex
app = auth_token.AuthProtocol(new_app('200 OK', body)(), conf)
resp = self.good_request(app)
self.assertEqual(six.b(body), resp[0])
plugin = self.get_plugin(app)
self.assertEqual(self.AUTH_URL, plugin.auth_url)
self.assertEqual(username, plugin._username)
self.assertEqual(password, plugin._password)
self.assertEqual(self.project_id, plugin._project_id)
def test_plugin_loading_with_auth_section(self):
# some options via override and some via conf
section = 'testsection'
username = 'testuser'
password = 'testpass'
auth.register_conf_options(self.cfg.conf, group=section)
opts = auth.get_plugin_options('password')
self.cfg.register_opts(opts, group=section)
# configure the authentication options
self.cfg.config(auth_section=section, group=_base.AUTHTOKEN_GROUP)
self.cfg.config(auth_plugin='password',
password=password,
project_id=self.project_id,
user_domain_id='userdomainid',
group=section)
conf = {'username': username, 'auth_url': self.AUTH_URL}
body = uuid.uuid4().hex
app = auth_token.AuthProtocol(new_app('200 OK', body)(), conf)
resp = self.good_request(app)
self.assertEqual(six.b(body), resp[0])
plugin = self.get_plugin(app)
self.assertEqual(self.AUTH_URL, plugin.auth_url)
self.assertEqual(username, plugin._username)
self.assertEqual(password, plugin._password)
self.assertEqual(self.project_id, plugin._project_id)
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)