Fix noauth support

This patch includes several improvements:
1. Use keystoneauth1 plugin mechanism for auth plugins.
2. Implements CinderNoAuthPlugin.
3. Deletes non-working cinderclient.auth_plugin module.
4. Deprecates --bypass-url in flavor of --os-endpoint to be consistent
with keystoneauth1 plugins.
5. Deprecates in --os-auth-system in falvor of --os-auth-type to be
consistent with keystoneauth1 plugins.

Both bypass_url and os_auth_system params are not changed for client
objects to not break backward compatibility for Python API.

How to use noauth with cinderclient CLI:
OS_USER_ID=userid OS_PROJECT_ID=projectis cinder --os-auth-type noauth
--os-endpoint=http://localhost:8776/v2 list

Change-Id: I3be59a5a39235acbc3334e0a0b797081507a5c88
Closes-Bug: #1657156
This commit is contained in:
Ivan Kolodyazhny 2017-01-31 14:33:32 +02:00
parent 29d29a7cd4
commit 60f92db704
11 changed files with 182 additions and 610 deletions

View File

@ -1,141 +0,0 @@
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Spanish National Research Council.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import pkg_resources
from cinderclient import exceptions
from cinderclient import utils
logger = logging.getLogger(__name__)
_discovered_plugins = {}
def discover_auth_systems():
"""Discover the available auth-systems.
This won't take into account the old style auth-systems.
"""
ep_name = 'openstack.client.auth_plugin'
for ep in pkg_resources.iter_entry_points(ep_name):
try:
auth_plugin = ep.load()
except (ImportError, pkg_resources.UnknownExtra, AttributeError) as e:
logger.debug("ERROR: Cannot load auth plugin %s", ep.name)
logger.debug(e, exc_info=1)
else:
_discovered_plugins[ep.name] = auth_plugin
def load_auth_system_opts(parser):
"""Load options needed by the available auth-systems into a parser.
This function will try to populate the parser with options from the
available plugins.
"""
for name, auth_plugin in _discovered_plugins.items():
add_opts_fn = getattr(auth_plugin, "add_opts", None)
if add_opts_fn:
group = parser.add_argument_group("Auth-system '%s' options" %
name)
add_opts_fn(group)
def load_plugin(auth_system):
if auth_system in _discovered_plugins:
return _discovered_plugins[auth_system]()
# NOTE(aloga): If we arrive here, the plugin will be an old-style one,
# so we have to create a fake AuthPlugin for it.
return DeprecatedAuthPlugin(auth_system)
class BaseAuthPlugin(object):
"""Base class for authentication plugins.
An authentication plugin needs to override at least the authenticate
method to be a valid plugin.
"""
def __init__(self):
self.opts = {}
def get_auth_url(self):
"""Return the auth url for the plugin (if any)."""
return None
@staticmethod
def add_opts(parser):
"""Populate and return the parser with the options for this plugin.
If the plugin does not need any options, it should return the same
parser untouched.
"""
return parser
def parse_opts(self, args):
"""Parse the actual auth-system options if any.
This method is expected to populate the attribute self.opts with a
dict containing the options and values needed to make authentication.
If the dict is empty, the client should assume that it needs the same
options as the 'keystone' auth system (i.e. os_username and
os_password).
Returns the self.opts dict.
"""
return self.opts
def authenticate(self, cls, auth_url):
"""Authenticate using plugin defined method."""
raise exceptions.AuthSystemNotFound(self.auth_system)
class DeprecatedAuthPlugin(object):
"""Class to mimic the AuthPlugin class for deprecated auth systems.
Old auth systems only define two entry points: openstack.client.auth_url
and openstack.client.authenticate. This class will load those entry points
into a class similar to a valid AuthPlugin.
"""
def __init__(self, auth_system):
self.auth_system = auth_system
def authenticate(cls, auth_url):
raise exceptions.AuthSystemNotFound(self.auth_system)
self.opts = {}
self.get_auth_url = lambda: None
self.authenticate = authenticate
self._load_endpoints()
def _load_endpoints(self):
ep_name = 'openstack.client.auth_url'
fn = utils._load_entry_point(ep_name, name=self.auth_system)
if fn:
self.get_auth_url = fn
ep_name = 'openstack.client.authenticate'
fn = utils._load_entry_point(ep_name, name=self.auth_system)
if fn:
self.authenticate = fn
def parse_opts(self, args):
return self.opts

View File

@ -432,7 +432,7 @@ class HTTPClient(object):
version = get_volume_api_from_url(self.management_url)
except exceptions.UnsupportedVersion as e:
if self.management_url == self.bypass_url:
msg = (_("Invalid url was specified in --bypass-url or "
msg = (_("Invalid url was specified in --os-endpoint or "
"environment variable CINDERCLIENT_BYPASS_URL.\n"
"%s") % six.text_type(e.message))
else:
@ -530,8 +530,6 @@ class HTTPClient(object):
while auth_url:
if not self.auth_system or self.auth_system == 'keystone':
auth_url = self._v2_or_v3_auth(auth_url)
else:
auth_url = self._plugin_auth(auth_url)
# Are we acting on behalf of another user via an
# existing token? If so, our actual endpoints may
@ -585,9 +583,6 @@ class HTTPClient(object):
else:
raise exceptions.from_response(resp, body)
def _plugin_auth(self, auth_url):
return self.auth_plugin.authenticate(self, auth_url)
def _v2_or_v3_auth(self, url):
"""Authenticate against a v2.0 auth service."""
if self.version == "v3":
@ -649,8 +644,7 @@ def _construct_http_client(username=None, password=None, project_id=None,
auth=None, api_version=None,
**kwargs):
# Don't use sessions if third party plugin or bypass_url being used
if session and not auth_plugin and not bypass_url:
if session:
kwargs.setdefault('user_agent', 'python-cinderclient')
kwargs.setdefault('interface', endpoint_type)
return SessionClient(session=session,

View File

View File

@ -0,0 +1,77 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from keystoneauth1 import loading
from keystoneauth1 import plugin
class CinderNoAuthPlugin(plugin.BaseAuthPlugin):
def __init__(self, user_id, project_id=None, roles=None, endpoint=None):
self._user_id = user_id
self._project_id = project_id if project_id else user_id
self._endpoint = endpoint
self._roles = roles
self.auth_token = '%s:%s' % (self._user_id,
self._project_id)
def get_headers(self, session, **kwargs):
return {'x-user-id': self._user_id,
'x-project-id': self._project_id,
'X-Auth-Token': self.auth_token}
def get_user_id(self, session, **kwargs):
return self._user_id
def get_project_id(self, session, **kwargs):
return self._project_id
def get_endpoint(self, session, **kwargs):
return '%s/%s' % (self._endpoint, self._project_id)
def invalidate(self):
pass
class CinderOpt(loading.Opt):
@property
def argparse_args(self):
return ['--%s' % o.name for o in self._all_opts]
@property
def argparse_default(self):
# select the first ENV that is not false-y or return None
for o in self._all_opts:
v = os.environ.get('Cinder_%s' % o.name.replace('-', '_').upper())
if v:
return v
return self.default
class CinderNoAuthLoader(loading.BaseLoader):
plugin_class = CinderNoAuthPlugin
def get_options(self):
options = super(CinderNoAuthLoader, self).get_options()
options.extend([
CinderOpt('user-id', help='User ID', required=True,
metavar="<cinder user id>"),
CinderOpt('project-id', help='Project ID',
metavar="<cinder project id>"),
CinderOpt('endpoint', help='Cinder endpoint',
dest="endpoint", required=True,
metavar="<cinder endpoint>"),
])
return options

View File

@ -25,6 +25,9 @@ import getpass
import logging
import sys
import requests
import six
from keystoneauth1 import discover
from keystoneauth1 import loading
from keystoneauth1 import session
@ -38,13 +41,13 @@ import requests
import six
import six.moves.urllib.parse as urlparse
import cinderclient
from cinderclient import api_versions
from cinderclient import client
from cinderclient import exceptions as exc
from cinderclient import utils
from cinderclient import _i18n
from cinderclient._i18n import _
from cinderclient import utils
import cinderclient.auth_plugin
# Enable i18n lazy translation
@ -138,12 +141,21 @@ class OpenStackCinderShell(object):
help=_('Shows debugging output.'))
parser.add_argument('--os-auth-system',
metavar='<auth-system>',
default=utils.env('OS_AUTH_SYSTEM'),
help=_('Defaults to env[OS_AUTH_SYSTEM].'))
metavar='<os-auth-system>',
dest='os_auth_type',
default=utils.env('OS_AUTH_SYSTEM',
default=utils.env('OS_AUTH_TYPE')),
help=_('DEPRECATED! Use --os-auth-type.'
'Defaults to env[OS_AUTH_SYSTEM].'))
parser.add_argument('--os_auth_system',
help=argparse.SUPPRESS)
parser.add_argument('--os-auth-type',
metavar='<os-auth-type>',
dest='os_auth_type',
default=utils.env('OS_AUTH_TYPE'),
help=_('Defaults to env[OS_AUTH_TYPE].'))
parser.add_argument('--os_auth_type',
help=argparse.SUPPRESS)
parser.add_argument('--service-type',
metavar='<service-type>',
help=_('Service type. '
@ -200,14 +212,26 @@ class OpenStackCinderShell(object):
parser.add_argument('--bypass-url',
metavar='<bypass-url>',
dest='bypass_url',
default=utils.env('CINDERCLIENT_BYPASS_URL'),
help=_("Use this API endpoint instead of the "
dest='os_endpoint',
default=utils.env('CINDERCLIENT_BYPASS_URL',
default=utils.env('CINDER_ENDPOINT')),
help=_("DEPRECATED! Use os_endpoint. "
"Use this API endpoint instead of the "
"Service Catalog. Defaults to "
"env[CINDERCLIENT_BYPASS_URL]."))
parser.add_argument('--bypass_url',
help=argparse.SUPPRESS)
parser.add_argument('--os-endpoint',
metavar='<os-endpoint>',
dest='os_endpoint',
default=utils.env('CINDER_ENDPOINT'),
help=_("Use this API endpoint instead of the "
"Service Catalog. Defaults to "
"env[CINDER_ENDPOINT]."))
parser.add_argument('--os_endpoint',
help=argparse.SUPPRESS)
parser.add_argument('--retries',
metavar='<retries>',
type=int,
@ -227,10 +251,6 @@ class OpenStackCinderShell(object):
self._append_global_identity_args(parser)
# The auth-system-plugins might require some extra options
cinderclient.auth_plugin.discover_auth_systems()
cinderclient.auth_plugin.load_auth_system_opts(parser)
return parser
def _append_global_identity_args(self, parser):
@ -608,18 +628,23 @@ class OpenStackCinderShell(object):
(os_username, os_password, os_tenant_name, os_auth_url,
os_region_name, os_tenant_id, endpoint_type,
service_type, service_name, volume_service_name, bypass_url,
cacert, os_auth_system) = (
service_type, service_name, volume_service_name, os_endpoint,
cacert, os_auth_type) = (
args.os_username, args.os_password,
args.os_tenant_name, args.os_auth_url,
args.os_region_name, args.os_tenant_id,
args.os_endpoint_type,
args.service_type, args.service_name,
args.volume_service_name,
args.bypass_url, args.os_cacert,
args.os_auth_system)
if os_auth_system and os_auth_system != "keystone":
auth_plugin = cinderclient.auth_plugin.load_plugin(os_auth_system)
args.os_endpoint, args.os_cacert,
args.os_auth_type)
auth_session = None
if os_auth_type and os_auth_type != "keystone":
auth_plugin = loading.load_auth_from_argparse_arguments(
self.options)
auth_session = loading.load_session_from_argparse_arguments(
self.options, auth=auth_plugin)
else:
auth_plugin = None
@ -637,16 +662,10 @@ class OpenStackCinderShell(object):
self.options.os_project_domain_id)) or
self.options.os_project_id)
if not utils.isunauthenticated(args.func):
if auth_plugin:
auth_plugin.parse_opts(args)
if not auth_plugin or not auth_plugin.opts:
if not os_username:
raise exc.CommandError("You must provide a user name "
"through --os-username or "
"env[OS_USERNAME].")
# NOTE(e0ne): if auth_session exists it means auth plugin created
# session and we don't need to check for password and other
# authentification-related things.
if not utils.isunauthenticated(args.func) and not auth_session:
if not os_password:
# No password, If we've got a tty, try prompting for it
if hasattr(sys.stdin, 'isatty') and sys.stdin.isatty():
@ -681,10 +700,6 @@ class OpenStackCinderShell(object):
"(env[OS_PROJECT_DOMAIN_NAME])"
))
if not os_auth_url:
if os_auth_system and os_auth_system != 'keystone':
os_auth_url = auth_plugin.get_auth_url()
if not os_auth_url:
raise exc.CommandError(
"You must provide an authentication URL "
@ -705,13 +720,12 @@ class OpenStackCinderShell(object):
"(env[OS_PROJECT_DOMAIN_NAME])"
))
if not os_auth_url:
if not os_auth_url and not auth_plugin:
raise exc.CommandError(
"You must provide an authentication URL "
"through --os-auth-url or env[OS_AUTH_URL].")
auth_session = None
if not auth_plugin:
if not auth_session:
auth_session = self._get_keystone_session()
insecure = self.options.insecure
@ -726,11 +740,11 @@ class OpenStackCinderShell(object):
service_type=service_type,
service_name=service_name,
volume_service_name=volume_service_name,
bypass_url=bypass_url,
bypass_url=os_endpoint,
retries=options.retries,
http_log_debug=args.debug,
insecure=insecure,
cacert=cacert, auth_system=os_auth_system,
cacert=cacert, auth_system=os_auth_type,
auth_plugin=auth_plugin,
session=auth_session,
logger=self.ks_logger if auth_session else self.client_logger)

View File

@ -13,380 +13,31 @@
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import mock
import pkg_resources
import requests
try:
import json
except ImportError:
import simplejson as json
from cinderclient import auth_plugin
from cinderclient import exceptions
from cinderclient.contrib import noauth
from cinderclient.tests.unit import utils
from cinderclient.v1 import client
def mock_http_request(resp=None):
"""Mock an HTTP Request."""
if not resp:
resp = {
"access": {
"token": {
"expires": "12345",
"id": "FAKE_ID",
"tenant": {
"id": "FAKE_TENANT_ID",
}
},
"serviceCatalog": [
{
"type": "volume",
"endpoints": [
{
"region": "RegionOne",
"adminURL": "http://localhost:8774/v1",
"internalURL": "http://localhost:8774/v1",
"publicURL": "http://localhost:8774/v1/",
},
],
},
{
"type": "volumev2",
"endpoints": [
{
"region": "RegionOne",
"adminURL": "http://localhost:8774/v2",
"internalURL": "http://localhost:8774/v2",
"publicURL": "http://localhost:8774/v2/",
},
],
},
{
"type": "volumev3",
"endpoints": [
{
"region": "RegionOne",
"adminURL": "http://localhost:8774/v3",
"internalURL": "http://localhost:8774/v3",
"publicURL": "http://localhost:8774/v3/",
},
],
},
],
},
}
class CinderNoAuthPluginTest(utils.TestCase):
def setUp(self):
super(CinderNoAuthPluginTest, self).setUp()
self.plugin = noauth.CinderNoAuthPlugin('user', 'project',
endpoint='example.com')
auth_response = utils.TestResponse({
"status_code": 200,
"text": json.dumps(resp),
})
return mock.Mock(return_value=(auth_response))
def test_auth_token(self):
auth_token = 'user:project'
self.assertEqual(auth_token, self.plugin.auth_token)
def test_auth_token_no_project(self):
auth_token = 'user:user'
plugin = noauth.CinderNoAuthPlugin('user')
self.assertEqual(auth_token, plugin.auth_token)
def requested_headers(cs):
"""Return requested passed headers."""
return {
'User-Agent': cs.client.USER_AGENT,
'Content-Type': 'application/json',
'Accept': 'application/json',
}
def test_get_headers(self):
headers = {'x-user-id': 'user',
'x-project-id': 'project',
'X-Auth-Token': 'user:project'}
self.assertEqual(headers, self.plugin.get_headers(None))
class DeprecatedAuthPluginTest(utils.TestCase):
def test_auth_system_success(self):
class MockEntrypoint(pkg_resources.EntryPoint):
def load(self):
return self.authenticate
def authenticate(self, cls, auth_url):
cls._authenticate(auth_url, {"fake": "me"})
def mock_iter_entry_points(_type, name):
if _type == 'openstack.client.authenticate':
return [MockEntrypoint("fake", "fake", ["fake"])]
else:
return []
mock_request = mock_http_request()
@mock.patch.object(pkg_resources, "iter_entry_points",
mock_iter_entry_points)
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
plugin = auth_plugin.DeprecatedAuthPlugin("fake")
cs = client.Client("username", "password", "project_id",
"auth_url/v2.0", auth_system="fake",
auth_plugin=plugin)
cs.client.authenticate()
headers = requested_headers(cs)
token_url = cs.client.auth_url + "/tokens"
mock_request.assert_called_with(
"POST",
token_url,
headers=headers,
data='{"fake": "me"}',
allow_redirects=True,
**self.TEST_REQUEST_BASE)
test_auth_call()
def test_auth_system_not_exists(self):
def mock_iter_entry_points(_t, name=None):
return [pkg_resources.EntryPoint("fake", "fake", ["fake"])]
mock_request = mock_http_request()
@mock.patch.object(pkg_resources, "iter_entry_points",
mock_iter_entry_points)
@mock.patch.object(requests.Session, "request", mock_request)
def test_auth_call():
auth_plugin.discover_auth_systems()
plugin = auth_plugin.DeprecatedAuthPlugin("notexists")
cs = client.Client("username", "password", "project_id",
"auth_url/v2.0", auth_system="notexists",
auth_plugin=plugin)
self.assertRaises(exceptions.AuthSystemNotFound,
cs.client.authenticate)
test_auth_call()
def test_auth_system_defining_auth_url(self):
class MockAuthUrlEntrypoint(pkg_resources.EntryPoint):
def load(self):
return self.auth_url
def auth_url(self):
return "http://faked/v2.0"
class MockAuthenticateEntrypoint(pkg_resources.EntryPoint):
def load(self):
return self.authenticate
def authenticate(self, cls, auth_url):
cls._authenticate(auth_url, {"fake": "me"})
def mock_iter_entry_points(_type, name):
if _type == 'openstack.client.auth_url':
return [MockAuthUrlEntrypoint("fakewithauthurl",
"fakewithauthurl",
["auth_url"])]
elif _type == 'openstack.client.authenticate':
return [MockAuthenticateEntrypoint("fakewithauthurl",
"fakewithauthurl",
["authenticate"])]
else:
return []
mock_request = mock_http_request()
@mock.patch.object(pkg_resources, "iter_entry_points",
mock_iter_entry_points)
@mock.patch.object(requests.Session, "request", mock_request)
def test_auth_call():
plugin = auth_plugin.DeprecatedAuthPlugin("fakewithauthurl")
cs = client.Client("username", "password", "project_id",
auth_system="fakewithauthurl",
auth_plugin=plugin)
cs.client.authenticate()
self.assertEqual("http://faked/v2.0", cs.client.auth_url)
test_auth_call()
@mock.patch.object(pkg_resources, "iter_entry_points")
def test_client_raises_exc_without_auth_url(self, mock_iter_entry_points):
class MockAuthUrlEntrypoint(pkg_resources.EntryPoint):
def load(self):
return self.auth_url
def auth_url(self):
return None
mock_iter_entry_points.side_effect = lambda _t, name: [
MockAuthUrlEntrypoint("fakewithauthurl",
"fakewithauthurl",
["auth_url"])]
plugin = auth_plugin.DeprecatedAuthPlugin("fakewithauthurl")
self.assertRaises(
exceptions.EndpointNotFound,
client.Client, "username", "password", "project_id",
auth_system="fakewithauthurl", auth_plugin=plugin)
class AuthPluginTest(utils.TestCase):
@mock.patch.object(requests, "request")
@mock.patch.object(pkg_resources, "iter_entry_points")
def _test_auth_success(self, mock_iter_entry_points, mock_request,
**client_kwargs):
"""Generic test that we can authenticate using the auth system."""
class MockEntrypoint(pkg_resources.EntryPoint):
def load(self):
return FakePlugin
class FakePlugin(auth_plugin.BaseAuthPlugin):
def authenticate(self, cls, auth_url):
cls._authenticate(auth_url, {"fake": "me"})
mock_iter_entry_points.side_effect = lambda _t: [
MockEntrypoint("fake", "fake", ["FakePlugin"])]
mock_request.side_effect = mock_http_request()
auth_plugin.discover_auth_systems()
plugin = auth_plugin.load_plugin("fake")
cs = client.Client("username", "password", "project_id",
"auth_url/v2.0", auth_system="fake",
auth_plugin=plugin, **client_kwargs)
cs.client.authenticate()
headers = requested_headers(cs)
token_url = cs.client.auth_url + "/tokens"
mock_request.assert_called_with(
"POST",
token_url,
headers=headers,
data='{"fake": "me"}',
allow_redirects=True,
**self.TEST_REQUEST_BASE)
return cs.client
def test_auth_system_success(self):
"""Test that we can authenticate using the auth system."""
c = self._test_auth_success()
self.assertIsNone(c.bypass_url)
self.assertIsNone(c.proxy_token)
def test_auth_bypass_url(self):
"""Test that we can authenticate with bypass URL."""
c = self._test_auth_success(bypass_url='auth_url2/v2.0')
self.assertEqual('auth_url2/v2.0', c.bypass_url)
self.assertEqual('auth_url2/v2.0', c.management_url)
self.assertIsNone(c.proxy_token)
def test_auth_bypass_url_proxy_token(self):
"""Test that we can authenticate with bypass URL and proxy token."""
c = self._test_auth_success(proxy_token='abc',
bypass_url='auth_url2/v2.0')
self.assertEqual('auth_url2/v2.0', c.bypass_url)
self.assertEqual('auth_url2/v2.0', c.management_url)
self.assertEqual('abc', c.proxy_token)
@mock.patch.object(pkg_resources, "iter_entry_points")
def test_discover_auth_system_options(self, mock_iter_entry_points):
"""Test that we can load the auth system options."""
class FakePlugin(auth_plugin.BaseAuthPlugin):
@staticmethod
def add_opts(parser):
parser.add_argument('--auth_system_opt',
default=False,
action='store_true',
help="Fake option")
return parser
class MockEntrypoint(pkg_resources.EntryPoint):
def load(self):
return FakePlugin
mock_iter_entry_points.side_effect = lambda _t: [
MockEntrypoint("fake", "fake", ["FakePlugin"])]
parser = argparse.ArgumentParser()
auth_plugin.discover_auth_systems()
auth_plugin.load_auth_system_opts(parser)
opts, args = parser.parse_known_args(['--auth_system_opt'])
self.assertTrue(opts.auth_system_opt)
@mock.patch.object(pkg_resources, "iter_entry_points")
def test_parse_auth_system_options(self, mock_iter_entry_points):
"""Test that we can parse the auth system options."""
class MockEntrypoint(pkg_resources.EntryPoint):
def load(self):
return FakePlugin
class FakePlugin(auth_plugin.BaseAuthPlugin):
def __init__(self):
self.opts = {"fake_argument": True}
def parse_opts(self, args):
return self.opts
mock_iter_entry_points.side_effect = lambda _t: [
MockEntrypoint("fake", "fake", ["FakePlugin"])]
auth_plugin.discover_auth_systems()
plugin = auth_plugin.load_plugin("fake")
plugin.parse_opts([])
self.assertIn("fake_argument", plugin.opts)
@mock.patch.object(pkg_resources, "iter_entry_points")
def test_auth_system_defining_url(self, mock_iter_entry_points):
"""Test the auth_system defining an url."""
class MockEntrypoint(pkg_resources.EntryPoint):
def load(self):
return FakePlugin
class FakePlugin(auth_plugin.BaseAuthPlugin):
def get_auth_url(self):
return "http://faked/v2.0"
mock_iter_entry_points.side_effect = lambda _t: [
MockEntrypoint("fake", "fake", ["FakePlugin"])]
auth_plugin.discover_auth_systems()
plugin = auth_plugin.load_plugin("fake")
cs = client.Client("username", "password", "project_id",
auth_system="fakewithauthurl",
auth_plugin=plugin)
self.assertEqual("http://faked/v2.0", cs.client.auth_url)
@mock.patch.object(pkg_resources, "iter_entry_points")
def test_exception_if_no_authenticate(self, mock_iter_entry_points):
"""Test that no authenticate raises a proper exception."""
class MockEntrypoint(pkg_resources.EntryPoint):
def load(self):
return FakePlugin
class FakePlugin(auth_plugin.BaseAuthPlugin):
pass
mock_iter_entry_points.side_effect = lambda _t: [
MockEntrypoint("fake", "fake", ["FakePlugin"])]
auth_plugin.discover_auth_systems()
plugin = auth_plugin.load_plugin("fake")
self.assertRaises(
exceptions.EndpointNotFound,
client.Client, "username", "password", "project_id",
auth_system="fake", auth_plugin=plugin)
@mock.patch.object(pkg_resources, "iter_entry_points")
def test_exception_if_no_url(self, mock_iter_entry_points):
"""Test that no auth_url at all raises exception."""
class MockEntrypoint(pkg_resources.EntryPoint):
def load(self):
return FakePlugin
class FakePlugin(auth_plugin.BaseAuthPlugin):
pass
mock_iter_entry_points.side_effect = lambda _t: [
MockEntrypoint("fake", "fake", ["FakePlugin"])]
auth_plugin.discover_auth_systems()
plugin = auth_plugin.load_plugin("fake")
self.assertRaises(
exceptions.EndpointNotFound,
client.Client, "username", "password", "project_id",
auth_system="fake", auth_plugin=plugin)
def test_get_endpoint(self):
endpoint = 'example.com/project'
self.assertEqual(endpoint, self.plugin.get_endpoint(None))

View File

@ -47,14 +47,14 @@ class ClientTest(utils.TestCase):
@mock.patch.object(cinderclient.client.HTTPClient, '__init__')
@mock.patch('cinderclient.client.SessionClient')
def test_construct_http_client_bypass_url(
def test_construct_http_client_endpoint_url(
self, session_mock, httpclient_mock):
bypass_url = 'http://example.com/'
os_endpoint = 'http://example.com/'
httpclient_mock.return_value = None
cinderclient.client._construct_http_client(
bypass_url=bypass_url)
bypass_url=os_endpoint)
self.assertTrue(httpclient_mock.called)
self.assertEqual(bypass_url,
self.assertEqual(os_endpoint,
httpclient_mock.call_args[1].get('bypass_url'))
session_mock.assert_not_called()

View File

@ -83,7 +83,7 @@ def get_authed_client(retries=0):
return cl
def get_authed_bypass_url(retries=0):
def get_authed_endpoint_url(retries=0):
cl = client.HTTPClient("username", "password",
"project_id", "auth_test",
bypass_url="volume/v100/", retries=retries)
@ -284,8 +284,8 @@ class ClientTest(utils.TestCase):
test_post_call()
def test_bypass_url(self):
cl = get_authed_bypass_url()
def test_os_endpoint_url(self):
cl = get_authed_endpoint_url()
self.assertEqual("volume/v100", cl.bypass_url)
self.assertEqual("volume/v100", cl.management_url)

View File

@ -21,20 +21,16 @@ import keystoneauth1.exceptions as ks_exc
from keystoneauth1.exceptions import DiscoveryFailure
from keystoneauth1 import session
import mock
import pkg_resources
import requests_mock
import requests
from six import moves
from testtools import matchers
import cinderclient
from cinderclient import api_versions
from cinderclient.contrib import noauth
from cinderclient import exceptions
from cinderclient import auth_plugin
from cinderclient import shell
from cinderclient.tests.unit import fake_actions_module
from cinderclient.tests.unit.test_auth_plugins import mock_http_request
from cinderclient.tests.unit.test_auth_plugins import requested_headers
from cinderclient.tests.unit.fixture_data import keystone_client
from cinderclient.tests.unit import utils
@ -173,49 +169,16 @@ class ShellTest(utils.TestCase):
tenant_name=self.FAKE_ENV['OS_TENANT_NAME'],
username=self.FAKE_ENV['OS_USERNAME'])
@mock.patch.object(requests, "request")
@mock.patch.object(pkg_resources, "iter_entry_points")
def test_auth_system_not_keystone(self, mock_iter_entry_points,
mock_request):
"""Test that we can authenticate using the auth plugin system."""
non_keystone_auth_url = "http://non-keystone-url.com/v2.0"
class MockEntrypoint(pkg_resources.EntryPoint):
def load(self):
return FakePlugin
class FakePlugin(auth_plugin.BaseAuthPlugin):
def authenticate(self, cls, auth_url):
cls._authenticate(auth_url, {"fake": "me"})
def get_auth_url(self):
return non_keystone_auth_url
mock_iter_entry_points.side_effect = lambda _t: [
MockEntrypoint("fake", "fake", ["FakePlugin"])]
mock_request.side_effect = mock_http_request()
# Tell the shell we wish to use our 'fake' auth instead of keystone
# and the auth plugin will provide the auth url
self.make_env(exclude="OS_AUTH_URL",
include={'OS_AUTH_SYSTEM': 'fake'})
# This should fail as we have not setup a mock response for 'list',
# however auth should have been called
def test_noauth_plugin(self):
_shell = shell.OpenStackCinderShell()
self.assertRaises(KeyError, _shell.main, ['list'])
args = ['--os-endpoint', 'http://example.com/v2',
'--os-auth-type', 'noauth', '--os-user-id',
'admin', '--os-project-id', 'admin', 'list']
headers = requested_headers(_shell.cs)
token_url = _shell.cs.client.auth_url + "/tokens"
self.assertEqual(non_keystone_auth_url + "/tokens", token_url)
mock_request.assert_any_call(
"POST",
token_url,
headers=headers,
data='{"fake": "me"}',
allow_redirects=True,
**self.TEST_REQUEST_BASE)
# This "fails" but instantiates the client with session
self.assertRaises(exceptions.NotFound, _shell.main, args)
self.assertIsInstance(_shell.cs.client.session.auth,
noauth.CinderNoAuthPlugin)
@mock.patch.object(cinderclient.client.HTTPClient, 'authenticate',
side_effect=exceptions.Unauthorized('No'))

View File

@ -0,0 +1,11 @@
---
features:
- |
Cinderclient now supports noauth mode using `--os-auth-type noauth`
param. Also python-cinderclient now supports keystoneauth1 plugins.
deprecations:
- |
--bypass-url param is now deprecated. Please use --os-endpoint instead
of it.
--os-auth-system param is now deprecated. Please --os-auth-type instead of
it.

View File

@ -34,6 +34,9 @@ packages =
console_scripts =
cinder = cinderclient.shell:main
keystoneauth1.plugin =
noauth = cinderclient.contrib.noauth:CinderNoAuthLoader
[build_sphinx]
all_files = 1
source-dir = doc/source