Merge "Remove all code related to HTTPClient"

This commit is contained in:
Jenkins 2016-12-17 19:14:55 +00:00 committed by Gerrit Code Review
commit 8c0c50ff3d
11 changed files with 74 additions and 1887 deletions

View File

@ -20,35 +20,20 @@
OpenStack Client interface. Handles the REST calls and responses.
"""
import copy
import functools
import hashlib
import itertools
import logging
import pkgutil
import re
import warnings
from keystoneauth1 import adapter
from keystoneauth1 import identity
from keystoneauth1 import session as ksession
from oslo_utils import importutils
from oslo_utils import netutils
import pkg_resources
import requests
try:
import json
except ImportError:
import simplejson as json
from six.moves.urllib import parse
from novaclient import api_versions
from novaclient import exceptions
from novaclient import extension as ext
from novaclient.i18n import _, _LW
from novaclient import service_catalog
from novaclient import utils
@ -58,19 +43,6 @@ from novaclient import utils
extensions_ignored_name = ["__init__"]
class _ClientConnectionPool(object):
def __init__(self):
self._adapters = {}
def get(self, url):
"""Store and reuse HTTP adapters per Service URL."""
if url not in self._adapters:
self._adapters[url] = ksession.TCPKeepAliveAdapter()
return self._adapters[url]
def _log_request_id(logger, resp, service_name):
request_id = (resp.headers.get('x-openstack-request-id') or
resp.headers.get('x-compute-request-id'))
@ -138,564 +110,6 @@ class SessionClient(adapter.LegacyJsonAdapter):
self.endpoint_override = value
def _original_only(f):
"""Decorator to indicate and enforce original HTTPClient object.
Indicates and enforces that this function can only be used if we are
using the original HTTPClient object.
We use this to specify that if you use the newer Session HTTP client then
you are aware that the way you use your client has been updated and certain
functions are no longer allowed to be used.
"""
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
if isinstance(self.client, SessionClient):
msg = ('This call is no longer available. The operation should '
'be performed on the session object instead.')
raise exceptions.InvalidUsage(msg)
return f(self, *args, **kwargs)
return wrapper
class HTTPClient(object):
USER_AGENT = 'python-novaclient'
def __init__(self, user, password, projectid=None, auth_url=None,
insecure=False, timeout=None, proxy_tenant_id=None,
proxy_token=None, region_name=None,
endpoint_type='publicURL', service_type=None,
service_name=None, volume_service_name=None,
timings=False, bypass_url=None,
os_cache=False, no_cache=True,
http_log_debug=False, auth_token=None,
cacert=None, tenant_id=None, user_id=None,
connection_pool=False, api_version=None,
logger=None):
self.user = user
self.user_id = user_id
self.password = password
self.projectid = projectid
self.tenant_id = tenant_id
self.api_version = api_version or api_versions.APIVersion()
self._connection_pool = (_ClientConnectionPool()
if connection_pool else None)
# This will be called by #_get_password if self.password is None.
# EG if a password can only be obtained by prompting the user, but a
# token is available, you don't want to prompt until the token has
# been proven invalid
self.password_func = None
self.auth_url = auth_url.rstrip('/') if auth_url else auth_url
self.version = 'v1.1'
self.region_name = region_name
self.endpoint_type = endpoint_type
self.service_type = service_type
self.service_name = service_name
self.volume_service_name = volume_service_name
self.timings = timings
self.bypass_url = bypass_url.rstrip('/') if bypass_url else bypass_url
self.os_cache = os_cache or not no_cache
self.http_log_debug = http_log_debug
if timeout is not None:
self.timeout = float(timeout)
else:
self.timeout = None
self.times = [] # [("item", starttime, endtime), ...]
self.management_url = self.bypass_url or None
self.auth_token = auth_token
self.proxy_token = proxy_token
self.proxy_tenant_id = proxy_tenant_id
self.keyring_saver = None
self.keyring_saved = False
if insecure:
self.verify_cert = False
else:
if cacert:
self.verify_cert = cacert
else:
self.verify_cert = True
self._session = None
self._current_url = None
self._logger = logger or logging.getLogger(__name__)
if (self.http_log_debug and logger is None and
not self._logger.handlers):
# Logging level is already set on the root logger
ch = logging.StreamHandler()
self._logger.addHandler(ch)
self._logger.propagate = False
if hasattr(requests, 'logging'):
rql = requests.logging.getLogger(requests.__name__)
rql.addHandler(ch)
# Since we have already setup the root logger on debug, we
# have to set it up here on WARNING (its original level)
# otherwise we will get all the requests logging messages
rql.setLevel(logging.WARNING)
self.service_catalog = None
self.services_url = {}
self.last_request_id = None
def use_token_cache(self, use_it):
self.os_cache = use_it
def unauthenticate(self):
"""Forget all of our authentication information."""
self.management_url = None
self.auth_token = None
def set_management_url(self, url):
self.management_url = url
def get_timings(self):
return self.times
def reset_timings(self):
self.times = []
def _redact(self, target, path, text=None):
"""Replace the value of a key in `target`.
The key can be at the top level by specifying a list with a single
key as the path. Nested dictionaries are also supported by passing a
list of keys to be navigated to find the one that should be replaced.
In this case the last one is the one that will be replaced.
:param dict target: the dictionary that may have a key to be redacted;
modified in place
:param list path: a list representing the nested structure in `target`
that should be redacted; modified in place
:param string text: optional text to use as a replacement for the
redacted key. if text is not specified, the
default text will be sha1 hash of the value being
redacted
"""
key = path.pop()
# move to the most nested dict
for p in path:
try:
target = target[p]
except KeyError:
return
if key in target:
if text:
target[key] = text
elif target[key] is not None:
# because in python3 byte string handling is ... ug
value = target[key].encode('utf-8')
sha1sum = hashlib.sha1(value)
target[key] = "{SHA1}%s" % sha1sum.hexdigest()
def http_log_req(self, method, url, kwargs):
if not self.http_log_debug:
return
string_parts = ['curl -g -i']
if not kwargs.get('verify', True):
string_parts.append(' --insecure')
string_parts.append(" '%s'" % url)
string_parts.append(' -X %s' % method)
headers = copy.deepcopy(kwargs['headers'])
self._redact(headers, ['X-Auth-Token'])
# because dict ordering changes from 2 to 3
keys = sorted(headers.keys())
for name in keys:
value = headers[name]
header = ' -H "%s: %s"' % (name, value)
string_parts.append(header)
if 'data' in kwargs:
data = json.loads(kwargs['data'])
self._redact(data, ['auth', 'passwordCredentials', 'password'])
string_parts.append(" -d '%s'" % json.dumps(data))
self._logger.debug("REQ: %s" % "".join(string_parts))
def http_log_resp(self, resp):
if not self.http_log_debug:
return
if resp.text and resp.status_code != 400:
try:
body = json.loads(resp.text)
self._redact(body, ['access', 'token', 'id'])
except ValueError:
body = None
else:
body = None
self._logger.debug("RESP: [%(status)s] %(headers)s\nRESP BODY: "
"%(text)s\n", {'status': resp.status_code,
'headers': resp.headers,
'text': json.dumps(body)})
# if service name is None then use service_type for logging
service = self.service_name or self.service_type
_log_request_id(self._logger, resp, service)
def open_session(self):
if not self._connection_pool:
self._session = requests.Session()
def close_session(self):
if self._session and not self._connection_pool:
self._session.close()
self._session = None
def _get_session(self, url):
if self._connection_pool:
magic_tuple = parse.urlsplit(url)
scheme, netloc, path, query, frag = magic_tuple
service_url = '%s://%s' % (scheme, netloc)
if self._current_url != service_url:
# Invalidate Session object in case the url is somehow changed
if self._session:
self._session.close()
self._current_url = service_url
self._logger.debug(
"New session created for: (%s)" % service_url)
self._session = requests.Session()
self._session.mount(service_url,
self._connection_pool.get(service_url))
return self._session
elif self._session:
return self._session
def request(self, url, method, **kwargs):
kwargs.setdefault('headers', kwargs.get('headers', {}))
kwargs['headers']['User-Agent'] = self.USER_AGENT
kwargs['headers']['Accept'] = 'application/json'
if 'body' in kwargs:
kwargs['headers']['Content-Type'] = 'application/json'
kwargs['data'] = json.dumps(kwargs.pop('body'))
api_versions.update_headers(kwargs["headers"], self.api_version)
if self.timeout is not None:
kwargs.setdefault('timeout', self.timeout)
kwargs['verify'] = self.verify_cert
self.http_log_req(method, url, kwargs)
request_func = requests.request
session = self._get_session(url)
if session:
request_func = session.request
resp = request_func(
method,
url,
**kwargs)
# TODO(andreykurilin): uncomment this line, when we will be able to
# check only nova-related calls
# api_versions.check_headers(resp, self.api_version)
self.http_log_resp(resp)
if resp.text:
# TODO(dtroyer): verify the note below in a requests context
# NOTE(alaski): Because force_exceptions_to_status_code=True
# httplib2 returns a connection refused event as a 400 response.
# To determine if it is a bad request or refused connection we need
# to check the body. httplib2 tests check for 'Connection refused'
# or 'actively refused' in the body, so that's what we'll do.
if resp.status_code == 400:
if ('Connection refused' in resp.text or
'actively refused' in resp.text):
raise exceptions.ConnectionRefused(resp.text)
try:
body = json.loads(resp.text)
except ValueError:
body = None
else:
body = None
self.last_request_id = (resp.headers.get('x-openstack-request-id')
if resp.headers else None)
if resp.status_code >= 400:
raise exceptions.from_response(resp, body, url, method)
return resp, body
def _time_request(self, url, method, **kwargs):
with utils.record_time(self.times, self.timings, method, url):
resp, body = self.request(url, method, **kwargs)
return resp, body
def _cs_request(self, url, method, **kwargs):
if not self.management_url:
self.authenticate()
if url is None:
# To get API version information, it is necessary to GET
# a nova endpoint directly without "v2/<tenant-id>".
magic_tuple = parse.urlsplit(self.management_url)
scheme, netloc, path, query, frag = magic_tuple
path = re.sub(r'v[1-9](\.[1-9][0-9]*)?/[a-z0-9]+$', '', path)
url = parse.urlunsplit((scheme, netloc, path, None, None))
else:
if self.service_catalog and not self.bypass_url:
url = self.get_service_url(self.service_type) + url
else:
url = self.management_url + url
# Perform the request once. If we get a 401 back then it
# might be because the auth token expired, so try to
# re-authenticate and try again. If it still fails, bail.
try:
kwargs.setdefault('headers', {})['X-Auth-Token'] = self.auth_token
if self.projectid:
kwargs['headers']['X-Auth-Project-Id'] = self.projectid
resp, body = self._time_request(url, method, **kwargs)
return resp, body
except exceptions.Unauthorized as e:
try:
# first discard auth token, to avoid the possibly expired
# token being re-used in the re-authentication attempt
self.unauthenticate()
# overwrite bad token
self.keyring_saved = False
self.authenticate()
kwargs['headers']['X-Auth-Token'] = self.auth_token
resp, body = self._time_request(url, method, **kwargs)
return resp, body
except exceptions.Unauthorized:
raise e
def _get_password(self):
if not self.password and self.password_func:
self.password = self.password_func()
return self.password
def get(self, url, **kwargs):
return self._cs_request(url, 'GET', **kwargs)
def post(self, url, **kwargs):
return self._cs_request(url, 'POST', **kwargs)
def put(self, url, **kwargs):
return self._cs_request(url, 'PUT', **kwargs)
def delete(self, url, **kwargs):
return self._cs_request(url, 'DELETE', **kwargs)
def get_service_url(self, service_type):
if service_type not in self.services_url:
url = self.service_catalog.url_for(
attr='region',
filter_value=self.region_name,
endpoint_type=self.endpoint_type,
service_type=service_type,
service_name=self.service_name,
volume_service_name=self.volume_service_name,)
url = url.rstrip('/')
self.services_url[service_type] = url
return self.services_url[service_type]
def _extract_service_catalog(self, url, resp, body, extract_token=True):
"""Extract service catalog from input resource body.
See what the auth service told us and process the response.
We may get redirected to another site, fail or actually get
back a service catalog with a token and our endpoints.
"""
# content must always present
if resp.status_code == 200 or resp.status_code == 201:
try:
self.auth_url = url
self.service_catalog = \
service_catalog.ServiceCatalog(body)
if extract_token:
self.auth_token = self.service_catalog.get_token()
self.tenant_id = self.service_catalog.get_tenant_id()
self.management_url = self.get_service_url(self.service_type)
return None
except exceptions.AmbiguousEndpoints:
print(_("Found more than one valid endpoint. Use a more "
"restrictive filter"))
raise
except KeyError:
raise exceptions.AuthorizationFailure()
except exceptions.EndpointNotFound:
print(_("Could not find any suitable endpoint. Correct "
"region?"))
raise
elif resp.status_code == 305:
return resp.headers['location']
else:
raise exceptions.from_response(resp, body, url)
def _fetch_endpoints_from_auth(self, url):
"""Fetch endpoint using token.
We have a token, but don't know the final endpoint for
the region. We have to go back to the auth service and
ask again. This request requires an admin-level token
to work. The proxy token supplied could be from a low-level enduser.
We can't get this from the keystone service endpoint, we have to use
the admin endpoint.
This will overwrite our admin token with the user token.
"""
# GET ...:5001/v2.0/tokens/#####/endpoints
url = '/'.join([url, 'tokens', '%s?belongsTo=%s'
% (self.proxy_token, self.proxy_tenant_id)])
self._logger.debug("Using Endpoint URL: %s" % url)
resp, body = self._time_request(
url, "GET", headers={'X-Auth-Token': self.auth_token})
return self._extract_service_catalog(url, resp, body,
extract_token=False)
def authenticate(self):
if not self.auth_url:
msg = _("Authentication requires 'auth_url', which should be "
"specified in '%s'") % self.__class__.__name__
raise exceptions.AuthorizationFailure(msg)
magic_tuple = netutils.urlsplit(self.auth_url)
scheme, netloc, path, query, frag = magic_tuple
port = magic_tuple.port
if port is None:
port = 80
path_parts = path.split('/')
for part in path_parts:
if len(part) > 0 and part[0] == 'v':
self.version = part
break
if self.auth_token and self.management_url:
self._save_keys()
return
# TODO(sandy): Assume admin endpoint is 35357 for now.
# Ideally this is going to have to be provided by the service catalog.
new_netloc = netloc.replace(':%d' % port, ':%d' % (35357,))
admin_url = parse.urlunsplit(
(scheme, new_netloc, path, query, frag))
auth_url = self.auth_url
if self.version == "v2.0": # FIXME(chris): This should be better.
while auth_url:
auth_url = self._v2_auth(auth_url)
# Are we acting on behalf of another user via an
# existing token? If so, our actual endpoints may
# be different than that of the admin token.
if self.proxy_token:
if self.bypass_url:
self.set_management_url(self.bypass_url)
else:
self._fetch_endpoints_from_auth(admin_url)
# Since keystone no longer returns the user token
# with the endpoints any more, we need to replace
# our service account token with the user token.
self.auth_token = self.proxy_token
else:
try:
while auth_url:
auth_url = self._v1_auth(auth_url)
# In some configurations nova makes redirection to
# v2.0 keystone endpoint. Also, new location does not contain
# real endpoint, only hostname and port.
except exceptions.AuthorizationFailure:
if auth_url.find('v2.0') < 0:
auth_url = auth_url + '/v2.0'
self._v2_auth(auth_url)
if self.bypass_url:
self.set_management_url(self.bypass_url)
elif not self.management_url:
raise exceptions.Unauthorized('Nova Client')
self._save_keys()
def _save_keys(self):
# Store the token/mgmt url in the keyring for later requests.
if (self.keyring_saver and self.os_cache and not self.keyring_saved and
self.auth_token and self.management_url and
self.tenant_id):
self.keyring_saver.save(self.auth_token,
self.management_url,
self.tenant_id)
# Don't save it again
self.keyring_saved = True
def _v1_auth(self, url):
if self.proxy_token:
raise exceptions.NoTokenLookupException()
headers = {'X-Auth-User': self.user,
'X-Auth-Key': self._get_password()}
if self.projectid:
headers['X-Auth-Project-Id'] = self.projectid
resp, body = self._time_request(url, 'GET', headers=headers)
if resp.status_code in (200, 204): # in some cases we get No Content
try:
mgmt_header = 'x-server-management-url'
self.management_url = resp.headers[mgmt_header].rstrip('/')
self.auth_token = resp.headers['x-auth-token']
self.auth_url = url
except (KeyError, TypeError):
raise exceptions.AuthorizationFailure()
elif resp.status_code == 305:
return resp.headers['location']
else:
raise exceptions.from_response(resp, body, url)
def _v2_auth(self, url):
"""Authenticate against a v2.0 auth service."""
if self.auth_token:
body = {"auth": {
"token": {"id": self.auth_token}}}
elif self.user_id:
body = {"auth": {
"passwordCredentials": {"userId": self.user_id,
"password": self._get_password()}}}
else:
body = {"auth": {
"passwordCredentials": {"username": self.user,
"password": self._get_password()}}}
if self.tenant_id:
body['auth']['tenantId'] = self.tenant_id
elif self.projectid:
body['auth']['tenantName'] = self.projectid
return self._authenticate(url, body)
def _authenticate(self, url, body, **kwargs):
"""Authenticate and extract the service catalog."""
method = "POST"
token_url = url + "/tokens"
# Make sure we follow redirects when trying to reach Keystone
resp, respbody = self._time_request(
token_url,
method,
body=body,
allow_redirects=True,
**kwargs)
return self._extract_service_catalog(url, resp, respbody)
def _construct_http_client(api_version=None,
auth=None,
auth_token=None,

View File

@ -1,89 +0,0 @@
# Copyright 2011 OpenStack Foundation
# Copyright 2011, Piston Cloud Computing, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import novaclient.exceptions
class ServiceCatalog(object):
"""Helper methods for dealing with a Keystone Service Catalog."""
def __init__(self, resource_dict):
self.catalog = resource_dict
def get_token(self):
return self.catalog['access']['token']['id']
def get_tenant_id(self):
return self.catalog['access']['token']['tenant']['id']
def url_for(self, attr=None, filter_value=None,
service_type=None, endpoint_type='publicURL',
service_name=None, volume_service_name=None):
"""Fetch the public URL from the Compute service for
a particular endpoint attribute. If none given, return
the first. See tests for sample service catalog.
"""
matching_endpoints = []
if 'endpoints' in self.catalog:
# We have a bastardized service catalog. Treat it special. :/
for endpoint in self.catalog['endpoints']:
if not filter_value or endpoint[attr] == filter_value:
# Ignore 1.0 compute endpoints
if endpoint.get("type") == 'compute' and \
endpoint.get('versionId') in (None, '1.1', '2'):
matching_endpoints.append(endpoint)
if not matching_endpoints:
raise novaclient.exceptions.EndpointNotFound()
# We don't always get a service catalog back ...
if 'serviceCatalog' not in self.catalog['access']:
return None
# Full catalog ...
catalog = self.catalog['access']['serviceCatalog']
for service in catalog:
if service.get("type") != service_type:
continue
if (service_name and service_type == 'compute' and
service.get('name') != service_name):
continue
if (volume_service_name and service_type == 'volume' and
service.get('name') != volume_service_name):
continue
endpoints = service['endpoints']
for endpoint in endpoints:
# Ignore 1.0 compute endpoints
if (service.get("type") == 'compute' and
endpoint.get('versionId', '2') not in ('1.1', '2')):
continue
if (not filter_value or
endpoint.get(attr).lower() == filter_value.lower()):
endpoint["serviceName"] = service.get("name")
matching_endpoints.append(endpoint)
if not matching_endpoints:
raise novaclient.exceptions.EndpointNotFound()
elif len(matching_endpoints) > 1:
raise novaclient.exceptions.AmbiguousEndpoints(
endpoints=matching_endpoints)
else:
return matching_endpoints[0][endpoint_type]

View File

@ -444,7 +444,7 @@ class V1(Base):
assert len(body.keys()) == 1
action = list(body)[0]
if v2_fakes.FakeHTTPClient.check_server_actions(body):
if v2_fakes.FakeSessionClient.check_server_actions(body):
# NOTE(snikitin): No need to do any operations here. This 'pass'
# is needed to avoid AssertionError in the last 'else' statement
# if we found 'action' in method check_server_actions and

View File

@ -14,9 +14,7 @@
# under the License.
import copy
import logging
import fixtures
from keystoneauth1 import session
import mock
@ -27,129 +25,7 @@ from novaclient.tests.unit import utils
import novaclient.v2.client
class ClientConnectionPoolTest(utils.TestCase):
@mock.patch("keystoneauth1.session.TCPKeepAliveAdapter")
def test_get(self, mock_http_adapter):
mock_http_adapter.side_effect = lambda: mock.Mock()
pool = novaclient.client._ClientConnectionPool()
self.assertEqual(pool.get("abc"), pool.get("abc"))
self.assertNotEqual(pool.get("abc"), pool.get("def"))
class ClientTest(utils.TestCase):
def test_client_with_timeout(self):
auth_url = "http://example.com"
instance = novaclient.client.HTTPClient(user='user',
password='password',
projectid='project',
timeout=2,
auth_url=auth_url)
self.assertEqual(2, instance.timeout)
headers = {
'x-server-management-url': 'example.com',
'x-auth-token': 'blah',
}
self.requests_mock.get(auth_url, headers=headers)
instance.authenticate()
self.assertEqual(2, self.requests_mock.last_request.timeout)
def test_client_reauth(self):
auth_url = "http://www.example.com"
instance = novaclient.client.HTTPClient(user='user',
password='password',
projectid='project',
timeout=2,
auth_url=auth_url)
instance.auth_token = 'foobar'
mgmt_url = "http://mgmt.example.com"
instance.management_url = mgmt_url
instance.get_service_url = mock.Mock(return_value=mgmt_url)
instance.version = 'v2.0'
auth = self.requests_mock.post(auth_url + '/tokens', status_code=401)
detail = self.requests_mock.get(mgmt_url + '/servers/detail',
status_code=401)
self.assertRaises(novaclient.exceptions.Unauthorized,
instance.get,
'/servers/detail')
self.assertEqual(2, self.requests_mock.call_count)
self.assertTrue(detail.called_once)
self.assertTrue(auth.called_once)
detail_headers = detail.last_request.headers
self.assertEqual('project', detail_headers['X-Auth-Project-Id'])
self.assertEqual('foobar', detail_headers['X-Auth-Token'])
self.assertEqual('python-novaclient', detail_headers['User-Agent'])
self.assertEqual('application/json', detail_headers['Accept'])
reauth_headers = auth.last_request.headers
self.assertEqual('application/json', reauth_headers['Content-Type'])
self.assertEqual('application/json', reauth_headers['Accept'])
self.assertEqual('python-novaclient', reauth_headers['User-Agent'])
data = {
"auth": {
"tenantName": "project",
"passwordCredentials": {
"username": "user",
"password": "password"
}
}
}
self.assertEqual(data, auth.last_request.json())
def _check_version_url(self, management_url, version_url):
projectid = '25e469aa1848471b875e68cde6531bc5'
auth_url = "http://example.com"
instance = novaclient.client.HTTPClient(user='user',
password='password',
projectid=projectid,
auth_url=auth_url)
instance.auth_token = 'foobar'
instance.management_url = management_url % projectid
mock_get_service_url = mock.Mock(return_value=instance.management_url)
instance.get_service_url = mock_get_service_url
instance.version = 'v2.0'
versions = self.requests_mock.get(version_url, json={'versions': []})
servers = self.requests_mock.get(instance.management_url + 'servers')
# If passing None as the part of url, a client accesses the url which
# doesn't include "v2/<projectid>" for getting API version info.
instance.get(None)
self.assertTrue(versions.called_once)
# Otherwise, a client accesses the url which includes "v2/<projectid>".
self.assertFalse(servers.called_once)
instance.get('servers')
self.assertTrue(servers.called_once)
def test_client_version_url(self):
self._check_version_url('http://example.com/v2/%s',
'http://example.com/')
self._check_version_url('http://example.com/v2.1/%s',
'http://example.com/')
self._check_version_url('http://example.com/v3.785/%s',
'http://example.com/')
def test_client_version_url_with_project_name(self):
self._check_version_url('http://example.com/nova/v2/%s',
'http://example.com/nova/')
self._check_version_url('http://example.com/nova/v2.1/%s',
'http://example.com/nova/')
self._check_version_url('http://example.com/nova/v3.785/%s',
'http://example.com/nova/')
def test_get_client_class_v2(self):
output = novaclient.client.get_client_class('2')
self.assertEqual(output, novaclient.v2.client.Client)
@ -172,239 +48,6 @@ class ClientTest(utils.TestCase):
self.assertRaises(novaclient.exceptions.UnsupportedVersion,
novaclient.client.get_client_class, '2.latest')
def test_client_get_reset_timings_v2(self):
cs = novaclient.client.Client("2", "user", "password", "project_id",
auth_url="foo/v2")
self.assertEqual(0, len(cs.get_timings()))
cs.client.times.append("somevalue")
self.assertEqual(1, len(cs.get_timings()))
self.assertEqual("somevalue", cs.get_timings()[0])
cs.reset_timings()
self.assertEqual(0, len(cs.get_timings()))
def test_get_password_simple(self):
cs = novaclient.client.HTTPClient("user", "password", "", "")
cs.password_func = mock.Mock()
self.assertEqual("password", cs._get_password())
self.assertFalse(cs.password_func.called)
def test_get_password_none(self):
cs = novaclient.client.HTTPClient("user", None, "", "")
self.assertIsNone(cs._get_password())
def test_get_password_func(self):
cs = novaclient.client.HTTPClient("user", None, "", "")
cs.password_func = mock.Mock(return_value="password")
self.assertEqual("password", cs._get_password())
cs.password_func.assert_called_once_with()
cs.password_func = mock.Mock()
self.assertEqual("password", cs._get_password())
self.assertFalse(cs.password_func.called)
def test_auth_url_rstrip_slash(self):
cs = novaclient.client.HTTPClient("user", "password", "project_id",
auth_url="foo/v2/")
self.assertEqual("foo/v2", cs.auth_url)
def test_token_and_bypass_url(self):
cs = novaclient.client.HTTPClient(None, None, None,
auth_token="12345",
bypass_url="compute/v100/")
self.assertIsNone(cs.auth_url)
self.assertEqual("12345", cs.auth_token)
self.assertEqual("compute/v100", cs.bypass_url)
self.assertEqual("compute/v100", cs.management_url)
def test_service_url_lookup(self):
service_type = 'compute'
cs = novaclient.client.HTTPClient(None, None, None,
auth_url='foo/v2',
service_type=service_type)
self.requests_mock.get('http://mgmt.example.com/compute/v5/servers')
@mock.patch.object(cs,
'get_service_url',
return_value='http://mgmt.example.com/compute/v5')
@mock.patch.object(cs, 'authenticate')
def do_test(mock_auth, mock_get):
def set_service_catalog():
cs.service_catalog = 'catalog'
mock_auth.side_effect = set_service_catalog
cs.get('/servers')
mock_get.assert_called_once_with(service_type)
mock_auth.assert_called_once_with()
do_test()
self.assertEqual(1, self.requests_mock.call_count)
self.assertEqual('/compute/v5/servers',
self.requests_mock.last_request.path)
def test_bypass_url_no_service_url_lookup(self):
bypass_url = 'http://mgmt.compute.com/v100'
cs = novaclient.client.HTTPClient(None, None, None,
auth_url='foo/v2',
bypass_url=bypass_url)
get = self.requests_mock.get('http://mgmt.compute.com/v100/servers')
@mock.patch.object(cs, 'get_service_url')
def do_test(mock_get):
cs.get('/servers')
self.assertFalse(mock_get.called)
do_test()
self.assertTrue(get.called_once)
@mock.patch("novaclient.client.requests.Session")
def test_session(self, mock_session):
fake_session = mock.Mock()
mock_session.return_value = fake_session
cs = novaclient.client.HTTPClient("user", None, "", "")
cs.open_session()
self.assertEqual(cs._session, fake_session)
cs.close_session()
self.assertIsNone(cs._session)
def test_session_connection_pool(self):
cs = novaclient.client.HTTPClient("user", None, "",
"", connection_pool=True)
cs.open_session()
self.assertIsNone(cs._session)
cs.close_session()
self.assertIsNone(cs._session)
def test_get_session(self):
cs = novaclient.client.HTTPClient("user", None, "", "")
self.assertIsNone(cs._get_session("http://example.com"))
@mock.patch("novaclient.client.requests.Session")
def test_get_session_open_session(self, mock_session):
fake_session = mock.Mock()
mock_session.return_value = fake_session
cs = novaclient.client.HTTPClient("user", None, "", "")
cs.open_session()
self.assertEqual(fake_session, cs._get_session("http://example.com"))
@mock.patch("novaclient.client.requests.Session")
@mock.patch("novaclient.client._ClientConnectionPool")
def test_get_session_connection_pool(self, mock_pool, mock_session):
service_url = "http://service.example.com"
pool = mock.MagicMock()
pool.get.return_value = "http_adapter"
mock_pool.return_value = pool
cs = novaclient.client.HTTPClient("user", None, "",
"", connection_pool=True)
cs._current_url = "http://current.example.com"
session = cs._get_session(service_url)
self.assertEqual(session, mock_session.return_value)
pool.get.assert_called_once_with(service_url)
mock_session().mount.assert_called_once_with(service_url,
'http_adapter')
def test_init_without_connection_pool(self):
cs = novaclient.client.HTTPClient("user", None, "", "")
self.assertIsNone(cs._connection_pool)
@mock.patch("novaclient.client._ClientConnectionPool")
def test_init_with_proper_connection_pool(self, mock_pool):
fake_pool = mock.Mock()
mock_pool.return_value = fake_pool
cs = novaclient.client.HTTPClient("user", None, "",
connection_pool=True)
self.assertEqual(cs._connection_pool, fake_pool)
def test_log_req(self):
self.logger = self.useFixture(
fixtures.FakeLogger(
format="%(message)s",
level=logging.DEBUG,
nuke_handlers=True
)
)
cs = novaclient.client.HTTPClient("user", None, "",
connection_pool=True)
cs.http_log_debug = True
cs.http_log_req('GET', '/foo', {'headers': {}})
cs.http_log_req('GET', '/foo', {'headers':
{'X-Auth-Token': None}})
cs.http_log_req('GET', '/foo', {'headers':
{'X-Auth-Token': 'totally_bogus'}})
cs.http_log_req('GET', '/foo', {'headers':
{'X-Foo': 'bar',
'X-Auth-Token': 'totally_bogus'}})
cs.http_log_req('GET', '/foo', {'headers': {},
'data':
'{"auth": {"passwordCredentials": '
'{"password": "zhaoqin"}}}'})
output = self.logger.output.split('\n')
self.assertIn("REQ: curl -g -i '/foo' -X GET", output)
self.assertIn(
"REQ: curl -g -i '/foo' -X GET -H "
'"X-Auth-Token: None"',
output)
self.assertIn(
"REQ: curl -g -i '/foo' -X GET -H "
'"X-Auth-Token: {SHA1}b42162b6ffdbd7c3c37b7c95b7ba9f51dda0236d"',
output)
self.assertIn(
"REQ: curl -g -i '/foo' -X GET -H "
'"X-Auth-Token: {SHA1}b42162b6ffdbd7c3c37b7c95b7ba9f51dda0236d"'
' -H "X-Foo: bar"',
output)
self.assertIn(
"REQ: curl -g -i '/foo' -X GET -d "
'\'{"auth": {"passwordCredentials": {"password":'
' "{SHA1}4fc49c6a671ce889078ff6b250f7066cf6d2ada2"}}}\'',
output)
def test_log_resp(self):
self.logger = self.useFixture(
fixtures.FakeLogger(
format="%(message)s",
level=logging.DEBUG,
nuke_handlers=True
)
)
cs = novaclient.client.HTTPClient("user", None, "",
connection_pool=True)
cs.http_log_debug = True
text = ('{"access": {"token": {"id": "zhaoqin"}}}')
resp = utils.TestResponse({'status_code': 200, 'headers': {},
'text': text})
cs.http_log_resp(resp)
output = self.logger.output.split('\n')
self.assertIn('RESP: [200] {}', output)
self.assertIn('RESP BODY: {"access": {"token": {"id":'
' "{SHA1}4fc49c6a671ce889078ff6b250f7066cf6d2ada2"}}}',
output)
def test_timings(self):
self.requests_mock.get('http://no.where')
client = novaclient.client.HTTPClient(user='zqfan', password='')
client._time_request("http://no.where", 'GET')
self.assertEqual(0, len(client.times))
client = novaclient.client.HTTPClient(user='zqfan', password='',
timings=True)
client._time_request("http://no.where", 'GET')
self.assertEqual(1, len(client.times))
self.assertEqual('GET http://no.where', client.times[0][0])
class SessionClientTest(utils.TestCase):
@ -422,6 +65,16 @@ class SessionClientTest(utils.TestCase):
self.assertEqual(1, len(client.times))
self.assertEqual('GET http://no.where', client.times[0][0])
def test_client_get_reset_timings_v2(self):
cs = novaclient.client.SessionClient(session=session.Session())
self.assertEqual(0, len(cs.get_timings()))
cs.times.append("somevalue")
self.assertEqual(1, len(cs.get_timings()))
self.assertEqual("somevalue", cs.get_timings()[0])
cs.reset_timings()
self.assertEqual(0, len(cs.get_timings()))
@mock.patch.object(novaclient.client, '_log_request_id')
def test_log_request_id(self, mock_log_request_id):
self.requests_mock.get('http://no.where')

View File

@ -1,220 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import requests
import six
from novaclient import client
from novaclient import exceptions
from novaclient.tests.unit import utils
fake_response = utils.TestResponse({
"status_code": 200,
"text": '{"hi": "there"}',
})
mock_request = mock.Mock(return_value=(fake_response))
refused_response = utils.TestResponse({
"status_code": 400,
"text": '[Errno 111] Connection refused',
})
refused_mock_request = mock.Mock(return_value=(refused_response))
bad_req_response = utils.TestResponse({
"status_code": 400,
"text": '',
})
bad_req_mock_request = mock.Mock(return_value=(bad_req_response))
unknown_error_response = utils.TestResponse({
"status_code": 503,
"text": '',
})
unknown_error_mock_request = mock.Mock(return_value=unknown_error_response)
retry_after_response = utils.TestResponse({
"status_code": 413,
"text": '',
"headers": {
"retry-after": "5"
},
})
retry_after_mock_request = mock.Mock(return_value=retry_after_response)
retry_after_no_headers_response = utils.TestResponse({
"status_code": 413,
"text": '',
})
retry_after_no_headers_mock_request = mock.Mock(
return_value=retry_after_no_headers_response)
retry_after_non_supporting_response = utils.TestResponse({
"status_code": 403,
"text": '',
"headers": {
"retry-after": "5"
},
})
retry_after_non_supporting_mock_request = mock.Mock(
return_value=retry_after_non_supporting_response)
def get_client(**kwargs):
cl = client.HTTPClient("username", "password",
"project_id",
utils.AUTH_URL_V2,
**kwargs)
return cl
def get_authed_client(**kwargs):
cl = get_client(**kwargs)
cl.management_url = "http://example.com"
cl.auth_token = "token"
cl.get_service_url = mock.Mock(return_value="http://example.com")
return cl
class ClientTest(utils.TestCase):
def test_get(self):
cl = get_authed_client()
@mock.patch.object(requests, "request", mock_request)
@mock.patch('time.time', mock.Mock(return_value=1234))
def test_get_call():
resp, body = cl.get("/hi")
headers = {"X-Auth-Token": "token",
"X-Auth-Project-Id": "project_id",
"User-Agent": cl.USER_AGENT,
'Accept': 'application/json'}
mock_request.assert_called_with(
"GET",
"http://example.com/hi",
headers=headers,
**self.TEST_REQUEST_BASE)
# Automatic JSON parsing
self.assertEqual({"hi": "there"}, body)
test_get_call()
def test_post(self):
cl = get_authed_client()
@mock.patch.object(requests, "request", mock_request)
def test_post_call():
cl.post("/hi", body=[1, 2, 3])
headers = {
"X-Auth-Token": "token",
"X-Auth-Project-Id": "project_id",
"Content-Type": "application/json",
'Accept': 'application/json',
"User-Agent": cl.USER_AGENT
}
mock_request.assert_called_with(
"POST",
"http://example.com/hi",
headers=headers,
data='[1, 2, 3]',
**self.TEST_REQUEST_BASE)
test_post_call()
def test_auth_failure(self):
cl = get_client()
# response must not have x-server-management-url header
@mock.patch.object(requests.Session, "request", mock_request)
def test_auth_call():
self.assertRaises(exceptions.AuthorizationFailure, cl.authenticate)
test_auth_call()
def test_auth_failure_due_to_miss_of_auth_url(self):
cl = client.HTTPClient("username", "password")
self.assertRaises(exceptions.AuthorizationFailure, cl.authenticate)
def test_connection_refused(self):
cl = get_client()
@mock.patch.object(requests, "request", refused_mock_request)
def test_refused_call():
self.assertRaises(exceptions.ConnectionRefused, cl.get, "/hi")
test_refused_call()
def test_bad_request(self):
cl = get_client()
@mock.patch.object(requests, "request", bad_req_mock_request)
def test_refused_call():
self.assertRaises(exceptions.BadRequest, cl.get, "/hi")
test_refused_call()
@mock.patch.object(requests, "request", mock_request)
@mock.patch.object(client, '_log_request_id')
@mock.patch.object(client.HTTPClient, 'http_log_req')
def test_client_logger(self, mock_http_log_req, mock_log_request_id):
cl = get_authed_client(service_name='compute', http_log_debug=True)
self.assertIsNotNone(cl._logger)
cl.post("/hi", body=[1, 2, 3])
mock_log_request_id.assert_called_once_with(cl._logger, fake_response,
'compute')
@mock.patch.object(requests, 'request', unknown_error_mock_request)
def test_unknown_server_error(self):
cl = get_client()
# This would be cleaner with the context manager version of
# assertRaises or assertRaisesRegexp, but both only appeared in
# Python 2.7 and testtools doesn't match that implementation yet
try:
cl.get('/hi')
except exceptions.ClientException as exc:
self.assertIn('Unknown Error', six.text_type(exc))
else:
self.fail('Expected exceptions.ClientException')
@mock.patch.object(requests, "request", retry_after_mock_request)
def test_retry_after_request(self):
cl = get_client()
try:
cl.get("/hi")
except exceptions.OverLimit as exc:
self.assertEqual(5, exc.retry_after)
else:
self.fail('Expected exceptions.OverLimit')
@mock.patch.object(requests, "request",
retry_after_no_headers_mock_request)
def test_retry_after_request_no_headers(self):
cl = get_client()
try:
cl.get("/hi")
except exceptions.OverLimit as exc:
self.assertEqual(0, exc.retry_after)
else:
self.fail('Expected exceptions.OverLimit')
@mock.patch.object(requests, "request",
retry_after_non_supporting_mock_request)
def test_retry_after_request_non_supporting_exc(self):
cl = get_client()
self.assertRaises(exceptions.Forbidden, cl.get, "/hi")

View File

@ -1,60 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneauth1 import fixture
from novaclient import exceptions
from novaclient import service_catalog
from novaclient.tests.unit import utils
SERVICE_CATALOG = fixture.V2Token()
SERVICE_CATALOG.set_scope()
_s = SERVICE_CATALOG.add_service('compute')
_e = _s.add_endpoint("https://compute1.host/v1/1")
_e["tenantId"] = "1"
_e["versionId"] = "1.0"
_e = _s.add_endpoint("https://compute1.host/v1.1/2", region="North")
_e["tenantId"] = "2"
_e["versionId"] = "1.1"
_e = _s.add_endpoint("https://compute1.host/v2/1", region="North")
_e["tenantId"] = "1"
_e["versionId"] = "2"
_s = SERVICE_CATALOG.add_service('volume')
_e = _s.add_endpoint("https://volume1.host/v1/1", region="South")
_e["tenantId"] = "1"
_e = _s.add_endpoint("https://volume1.host/v1.1/2", region="South")
_e["tenantId"] = "2"
class ServiceCatalogTest(utils.TestCase):
def test_building_a_service_catalog(self):
sc = service_catalog.ServiceCatalog(SERVICE_CATALOG)
self.assertRaises(exceptions.AmbiguousEndpoints, sc.url_for,
service_type='compute')
self.assertEqual("https://compute1.host/v2/1",
sc.url_for('tenantId', '1', service_type='compute'))
self.assertEqual("https://compute1.host/v1.1/2",
sc.url_for('tenantId', '2', service_type='compute'))
self.assertRaises(exceptions.EndpointNotFound, sc.url_for,
"region", "South", service_type='compute')
def test_building_a_service_catalog_insensitive_case(self):
sc = service_catalog.ServiceCatalog(SERVICE_CATALOG)
# Matching south (and catalog has South).
self.assertRaises(exceptions.AmbiguousEndpoints, sc.url_for,
'region', 'south', service_type='volume')

View File

@ -61,21 +61,24 @@ class FakeClient(fakes.FakeClient, client.Client):
project_id='project_id', auth_url='auth_url',
extensions=kwargs.get('extensions'),
direct_use=False, api_version=api_version)
self.client = FakeHTTPClient(api_version=api_version, **kwargs)
self.client = FakeSessionClient(api_version=api_version, **kwargs)
class FakeHTTPClient(base_client.HTTPClient):
class FakeSessionClient(base_client.SessionClient):
def __init__(self, *args, **kwargs):
def __init__(self, **kwargs):
self.username = 'username'
self.password = 'password'
self.auth_url = 'auth_url'
self.tenant_id = 'tenant_id'
self.callstack = []
self.projectid = 'projectid'
self.user = 'user'
self.region_name = 'region_name'
self.auth = mock.Mock()
self.session = mock.Mock()
self.service_type = 'service_type'
self.service_name = None
self.endpoint_override = None
self.interface = None
self.region_name = None
self.version = None
self.api_version = kwargs.get('api_version')
self.auth.get_auth_ref.return_value.project_id = 'tenant_id'
# determines which endpoint to return in get_endpoint()
# NOTE(augustina): this is a hacky workaround, ultimately
# we need to fix our whole mocking architecture (fixtures?)
@ -83,17 +86,19 @@ class FakeHTTPClient(base_client.HTTPClient):
self.endpoint_type = kwargs['endpoint_type']
else:
self.endpoint_type = 'endpoint_type'
self.logger = mock.MagicMock()
self.service_type = 'service_type'
self.service_name = 'service_name'
self.volume_service_name = 'volume_service_name'
self.timings = 'timings'
self.bypass_url = 'bypass_url'
self.os_cache = 'os_cache'
self.http_log_debug = 'http_log_debug'
self.last_request_id = None
self.management_url = self.get_endpoint()
self.api_version = kwargs.get("api_version")
def get_endpoint(self, **kwargs):
# check if endpoint matches expected format (eg, v2.1)
if (hasattr(self, 'endpoint_type') and
ENDPOINT_TYPE_RE.search(self.endpoint_type)):
return "http://nova-api:8774/%s/" % self.endpoint_type
else:
return (
"http://nova-api:8774/v2.1/190a755eef2e4aac9f06aa6be9786385")
def request(self, url, method, **kwargs):
return self._cs_request(url, method, **kwargs)
def _cs_request(self, url, method, **kwargs):
# Check that certain things are called correctly
@ -156,15 +161,6 @@ class FakeHTTPClient(base_client.HTTPClient):
})
return r, body
def get_endpoint(self, **kwargs):
# check if endpoint matches expected format (eg, v2.1)
if (hasattr(self, 'endpoint_type') and
ENDPOINT_TYPE_RE.search(self.endpoint_type)):
return "http://nova-api:8774/%s/" % self.endpoint_type
else:
return (
"http://nova-api:8774/v2.1/190a755eef2e4aac9f06aa6be9786385")
def get_versions(self):
return (200, FAKE_RESPONSE_HEADERS, {
"versions": [
@ -2357,35 +2353,3 @@ class FakeHTTPClient(base_client.HTTPClient):
'status': 'completed',
'tag': 'tag',
'server_uuid': 'fake-uuid2'}]})
class FakeSessionClient(fakes.FakeClient, client.Client):
def __init__(self, api_version, *args, **kwargs):
client.Client.__init__(self, username='username', password='password',
project_id='project_id', auth_url='auth_url',
extensions=kwargs.get('extensions'),
direct_use=False, api_version=api_version)
self.client = FakeSessionMockClient(api_version=api_version, **kwargs)
class FakeSessionMockClient(base_client.SessionClient, FakeHTTPClient):
def __init__(self, *args, **kwargs):
self.callstack = []
self.auth = mock.Mock()
self.session = mock.Mock()
self.session.get_endpoint.return_value = FakeHTTPClient.get_endpoint(
self)
self.service_type = 'service_type'
self.service_name = None
self.endpoint_override = None
self.interface = None
self.region_name = None
self.version = None
self.api_version = kwargs.get('api_version')
self.auth.get_auth_ref.return_value.project_id = 'tenant_id'
def request(self, url, method, **kwargs):
return self._cs_request(url, method, **kwargs)

View File

@ -1,392 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
from keystoneauth1 import fixture
import mock
import requests
from novaclient import client
from novaclient import exceptions
from novaclient.tests.unit import utils
def Client(*args, **kwargs):
return client.Client("2", *args, **kwargs)
class AuthenticateAgainstKeystoneTests(utils.TestCase):
def setUp(self):
super(AuthenticateAgainstKeystoneTests, self).setUp()
self.skipTest("This TestCase checks deprecated authentication "
"methods, which will be removed in separate patch.")
def get_token(self, **kwargs):
resp = fixture.V2Token(**kwargs)
resp.set_scope()
s = resp.add_service('compute')
s.add_endpoint('http://localhost:8774/v1.1', region='RegionOne')
return resp
def test_authenticate_success(self):
cs = Client("username", "password", project_name="project_id",
auth_url=utils.AUTH_URL_V2, service_type='compute')
resp = self.get_token()
auth_response = utils.TestResponse({
"status_code": 200,
"text": json.dumps(resp),
})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers = {
'User-Agent': cs.client.USER_AGENT,
'Content-Type': 'application/json',
'Accept': 'application/json',
}
body = {
'auth': {
'passwordCredentials': {
'username': cs.client.user,
'password': cs.client.password,
},
'tenantName': cs.client.projectid,
},
}
token_url = cs.client.auth_url + "/tokens"
mock_request.assert_called_with(
"POST",
token_url,
headers=headers,
data=json.dumps(body),
allow_redirects=True,
**self.TEST_REQUEST_BASE)
endpoints = resp["access"]["serviceCatalog"][0]['endpoints']
public_url = endpoints[0]["publicURL"].rstrip('/')
self.assertEqual(cs.client.management_url, public_url)
token_id = resp["access"]["token"]["id"]
self.assertEqual(cs.client.auth_token, token_id)
test_auth_call()
def test_authenticate_failure(self):
cs = Client("username", "password", project_name="project_id",
auth_url=utils.AUTH_URL_V2)
resp = {"unauthorized": {"message": "Unauthorized", "code": "401"}}
auth_response = utils.TestResponse({
"status_code": 401,
"text": json.dumps(resp),
})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests.Session, "request", mock_request)
def test_auth_call():
self.assertRaises(exceptions.Unauthorized, cs.client.authenticate)
test_auth_call()
def test_v1_auth_redirect(self):
cs = Client("username", "password", project_name="project_id",
auth_url=utils.AUTH_URL_V1, service_type='compute')
dict_correct_response = self.get_token()
correct_response = json.dumps(dict_correct_response)
dict_responses = [
{"headers": {'location': 'http://127.0.0.1:5001'},
"status_code": 305,
"text": "Use proxy"},
# Configured on admin port, nova redirects to v2.0 port.
# When trying to connect on it, keystone auth succeed by v1.0
# protocol (through headers) but tokens are being returned in
# body (looks like keystone bug). Leaved for compatibility.
{"headers": {},
"status_code": 200,
"text": correct_response},
{"headers": {},
"status_code": 200,
"text": correct_response}
]
responses = [(utils.TestResponse(resp)) for resp in dict_responses]
def side_effect(*args, **kwargs):
return responses.pop(0)
mock_request = mock.Mock(side_effect=side_effect)
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers = {
'User-Agent': cs.client.USER_AGENT,
'Content-Type': 'application/json',
'Accept': 'application/json',
}
body = {
'auth': {
'passwordCredentials': {
'username': cs.client.user,
'password': cs.client.password,
},
'tenantName': cs.client.projectid,
},
}
token_url = cs.client.auth_url + "/tokens"
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = headers
kwargs['data'] = json.dumps(body)
mock_request.assert_called_with(
"POST",
token_url,
allow_redirects=True,
**kwargs)
resp = dict_correct_response
endpoints = resp["access"]["serviceCatalog"][0]['endpoints']
public_url = endpoints[0]["publicURL"].rstrip('/')
self.assertEqual(cs.client.management_url, public_url)
token_id = resp["access"]["token"]["id"]
self.assertEqual(cs.client.auth_token, token_id)
test_auth_call()
def test_v2_auth_redirect(self):
cs = Client("username", "password", project_name="project_id",
auth_url=utils.AUTH_URL_V2, service_type='compute')
dict_correct_response = self.get_token()
correct_response = json.dumps(dict_correct_response)
dict_responses = [
{"headers": {'location': 'http://127.0.0.1:5001'},
"status_code": 305,
"text": "Use proxy"},
# Configured on admin port, nova redirects to v2.0 port.
# When trying to connect on it, keystone auth succeed by v1.0
# protocol (through headers) but tokens are being returned in
# body (looks like keystone bug). Leaved for compatibility.
{"headers": {},
"status_code": 200,
"text": correct_response},
{"headers": {},
"status_code": 200,
"text": correct_response}
]
responses = [(utils.TestResponse(resp)) for resp in dict_responses]
def side_effect(*args, **kwargs):
return responses.pop(0)
mock_request = mock.Mock(side_effect=side_effect)
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers = {
'User-Agent': cs.client.USER_AGENT,
'Content-Type': 'application/json',
'Accept': 'application/json',
}
body = {
'auth': {
'passwordCredentials': {
'username': cs.client.user,
'password': cs.client.password,
},
'tenantName': cs.client.projectid,
},
}
token_url = cs.client.auth_url + "/tokens"
kwargs = copy.copy(self.TEST_REQUEST_BASE)
kwargs['headers'] = headers
kwargs['data'] = json.dumps(body)
mock_request.assert_called_with(
"POST",
token_url,
allow_redirects=True,
**kwargs)
resp = dict_correct_response
endpoints = resp["access"]["serviceCatalog"][0]['endpoints']
public_url = endpoints[0]["publicURL"].rstrip('/')
self.assertEqual(cs.client.management_url, public_url)
token_id = resp["access"]["token"]["id"]
self.assertEqual(cs.client.auth_token, token_id)
test_auth_call()
def test_ambiguous_endpoints(self):
cs = Client("username", "password", project_name="project_id",
auth_url=utils.AUTH_URL_V2, service_type='compute')
resp = self.get_token()
# duplicate existing service
s = resp.add_service('compute')
s.add_endpoint('http://localhost:8774/v1.1', region='RegionOne')
auth_response = utils.TestResponse({
"status_code": 200,
"text": json.dumps(resp),
})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests.Session, "request", mock_request)
def test_auth_call():
self.assertRaises(exceptions.AmbiguousEndpoints,
cs.client.authenticate)
test_auth_call()
def test_authenticate_with_token_success(self):
cs = Client("username", None, project_name="project_id",
auth_url=utils.AUTH_URL_V2, service_type='compute')
cs.client.auth_token = "FAKE_ID"
resp = self.get_token(token_id="FAKE_ID")
auth_response = utils.TestResponse({
"status_code": 200,
"text": json.dumps(resp),
})
mock_request = mock.Mock(return_value=(auth_response))
with mock.patch.object(requests, "request", mock_request):
cs.client.authenticate()
headers = {
'User-Agent': cs.client.USER_AGENT,
'Content-Type': 'application/json',
'Accept': 'application/json',
}
body = {
'auth': {
'token': {
'id': cs.client.auth_token,
},
'tenantName': cs.client.projectid,
},
}
token_url = cs.client.auth_url + "/tokens"
mock_request.assert_called_with(
"POST",
token_url,
headers=headers,
data=json.dumps(body),
allow_redirects=True,
**self.TEST_REQUEST_BASE)
endpoints = resp["access"]["serviceCatalog"][0]['endpoints']
public_url = endpoints[0]["publicURL"].rstrip('/')
self.assertEqual(cs.client.management_url, public_url)
token_id = resp["access"]["token"]["id"]
self.assertEqual(cs.client.auth_token, token_id)
def test_authenticate_with_token_failure(self):
cs = Client("username", None, project_name="project_id",
auth_url=utils.AUTH_URL_V2)
cs.client.auth_token = "FAKE_ID"
resp = {"unauthorized": {"message": "Unauthorized", "code": "401"}}
auth_response = utils.TestResponse({
"status_code": 401,
"text": json.dumps(resp),
})
mock_request = mock.Mock(return_value=(auth_response))
with mock.patch.object(requests.Session, "request", mock_request):
self.assertRaises(exceptions.Unauthorized, cs.client.authenticate)
class AuthenticationTests(utils.TestCase):
def setUp(self):
super(AuthenticationTests, self).setUp()
self.skipTest("This TestCase checks deprecated authentication "
"methods, which will be removed in separate patch.")
def test_authenticate_success(self):
cs = Client("username", "password", project_name="project_id",
auth_url=utils.AUTH_URL)
management_url = 'https://localhost/v1.1/443470'
auth_response = utils.TestResponse({
'status_code': 204,
'headers': {
'x-server-management-url': management_url,
'x-auth-token': '1b751d74-de0c-46ae-84f0-915744b582d1',
},
})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
cs.client.authenticate()
headers = {
'Accept': 'application/json',
'X-Auth-User': 'username',
'X-Auth-Key': 'password',
'X-Auth-Project-Id': 'project_id',
'User-Agent': cs.client.USER_AGENT
}
mock_request.assert_called_with(
"GET",
cs.client.auth_url,
headers=headers,
**self.TEST_REQUEST_BASE)
self.assertEqual(cs.client.management_url,
auth_response.headers['x-server-management-url'])
self.assertEqual(cs.client.auth_token,
auth_response.headers['x-auth-token'])
test_auth_call()
def test_authenticate_failure(self):
cs = Client("username", "password", project_name="project_id",
auth_url=utils.AUTH_URL)
auth_response = utils.TestResponse({'status_code': 401})
mock_request = mock.Mock(return_value=(auth_response))
@mock.patch.object(requests, "request", mock_request)
def test_auth_call():
self.assertRaises(exceptions.Unauthorized, cs.client.authenticate)
test_auth_call()
def test_auth_automatic(self):
cs = Client("username", "password", project_name="project_id",
auth_url=utils.AUTH_URL, direct_use=False)
http_client = cs.client
http_client.management_url = ''
http_client.get_service_url = mock.Mock(return_value='')
mock_request = mock.Mock(return_value=(None, None))
@mock.patch.object(http_client, 'request', mock_request)
@mock.patch.object(http_client, 'authenticate')
def test_auth_call(m):
http_client.get('/')
self.assertTrue(m.called)
self.assertTrue(mock_request.called)
test_auth_call()

View File

@ -75,7 +75,6 @@ class ShellTest(utils.TestCase):
self.useFixture(fixtures.EnvironmentVariable(var,
self.FAKE_ENV[var]))
self.shell = self.useFixture(ShellFixture()).shell
self.useFixture(fixtures.MonkeyPatch(
'novaclient.client.Client', fakes.FakeClient))
@ -920,7 +919,7 @@ class ShellTest(utils.TestCase):
@mock.patch('novaclient.v2.client.Client.has_neutron', return_value=False)
@mock.patch(
'novaclient.tests.unit.v2.fakes.FakeHTTPClient.get_os_networks')
'novaclient.tests.unit.v2.fakes.FakeSessionClient.get_os_networks')
def test_boot_nics_net_name_multiple_matches(self, mock_networks_list,
has_neutron):
mock_networks_list.return_value = (200, {}, {
@ -3253,16 +3252,6 @@ class ShellTest(utils.TestCase):
self.assert_called('GET', '/servers/detail?not-tags-any=tag1%2Ctag2')
class ShellWithSessionClientTest(ShellTest):
def setUp(self):
"""Run before each test."""
super(ShellWithSessionClientTest, self).setUp()
self.useFixture(fixtures.MonkeyPatch(
'novaclient.client.Client', fakes.FakeSessionClient))
class GetSecgroupTest(utils.TestCase):
def test_with_integer(self):
cs = mock.Mock(**{

View File

@ -15,7 +15,6 @@
import mock
from novaclient import api_versions
from novaclient import base
from novaclient import exceptions as exc
from novaclient.tests.unit import utils
from novaclient.tests.unit.v2 import fakes
@ -28,73 +27,24 @@ class VersionsTest(utils.TestCase):
self.cs = fakes.FakeClient(api_versions.APIVersion("2.0"))
self.service_type = versions.Version
@mock.patch.object(versions.VersionManager, '_is_session_client',
return_value=False)
def test_list_services_with_http_client(self, mock_is_session_client):
vl = self.cs.versions.list()
self.assert_request_id(vl, fakes.FAKE_REQUEST_ID_LIST)
self.cs.assert_called('GET', None)
@mock.patch.object(versions.VersionManager, '_is_session_client',
return_value=True)
def test_list_services_with_session_client(self, mock_is_session_client):
def test_list_services(self):
vl = self.cs.versions.list()
self.assert_request_id(vl, fakes.FAKE_REQUEST_ID_LIST)
self.cs.assert_called('GET', 'http://nova-api:8774/')
@mock.patch.object(versions.VersionManager, '_is_session_client',
return_value=False)
@mock.patch.object(versions.VersionManager, 'list')
def test_get_current_with_http_client(self, mock_list,
mock_is_session_client):
headers = {'x-openstack-request-id': fakes.FAKE_REQUEST_ID}
resp = utils.TestResponse({"headers": headers})
current_version = versions.Version(
None, {"links": [{"href": "http://nova-api:8774/v2.1"}]},
loaded=True)
all_versions = [
versions.Version(
None, {"links": [{"href": "http://url/v1"}]}, loaded=True),
versions.Version(
None, {"links": [{"href": "http://url/v2"}]}, loaded=True),
versions.Version(
None, {"links": [{"href": "http://url/v3"}]}, loaded=True),
current_version,
versions.Version(
None, {"links": [{"href": "http://url/v21"}]}, loaded=True)]
mock_list.return_value = base.ListWithMeta(all_versions, resp)
v = self.cs.versions.get_current()
self.assert_request_id(v, fakes.FAKE_REQUEST_ID_LIST)
self.assertEqual(current_version, v)
@mock.patch.object(versions.VersionManager, '_is_session_client',
return_value=True)
def test_get_current_with_session_client(self, mock_is_session_client):
def test_get_current(self):
self.cs.callback = []
v = self.cs.versions.get_current()
self.assert_request_id(v, fakes.FAKE_REQUEST_ID_LIST)
self.cs.assert_called('GET', 'http://nova-api:8774/v2.1/')
@mock.patch.object(versions.VersionManager, '_is_session_client',
return_value=True)
@mock.patch.object(versions.VersionManager, '_get',
side_effect=exc.Unauthorized("401 RAX"))
def test_get_current_with_rax_workaround(self, session, get):
def test_get_current_with_rax_workaround(self, get):
self.cs.callback = []
self.assertIsNone(self.cs.versions.get_current())
@mock.patch.object(versions.VersionManager, '_is_session_client',
return_value=False)
@mock.patch.object(versions.VersionManager, '_list',
side_effect=exc.Unauthorized("401 RAX"))
def test_get_current_with_rax_auth_plugin_workaround(self, session, _list):
self.cs.callback = []
self.assertIsNone(self.cs.versions.get_current())
@mock.patch.object(versions.VersionManager, '_is_session_client',
return_value=True)
def test_get_endpoint_without_project_id(self, mock_is_session_client):
def test_get_endpoint_without_project_id(self):
# create a fake client such that get_endpoint()
# doesn't return uuid in url
endpoint_type = 'v2.1'
@ -106,15 +56,11 @@ class VersionsTest(utils.TestCase):
self.assert_request_id(result, fakes.FAKE_REQUEST_ID_LIST)
self.assertEqual(result.manager.api.client.endpoint_type,
endpoint_type, "Check endpoint_type was set")
self.assertEqual(result.manager.api.client.management_url,
expected_endpoint, "Check endpoint without uuid")
# check that the full request works as expected
cs_2_1.assert_called('GET', 'http://nova-api:8774/v2.1/')
cs_2_1.assert_called('GET', expected_endpoint)
@mock.patch.object(versions.VersionManager, '_is_session_client',
return_value=True)
def test_v2_get_endpoint_without_project_id(self, mock_is_session_client):
def test_v2_get_endpoint_without_project_id(self):
# create a fake client such that get_endpoint()
# doesn't return uuid in url
endpoint_type = 'v2'
@ -126,8 +72,6 @@ class VersionsTest(utils.TestCase):
self.assert_request_id(result, fakes.FAKE_REQUEST_ID_LIST)
self.assertEqual(result.manager.api.client.endpoint_type,
endpoint_type, "Check v2 endpoint_type was set")
self.assertEqual(result.manager.api.client.management_url,
expected_endpoint, "Check v2 endpoint without uuid")
# check that the full request works as expected
cs_2.assert_called('GET', 'http://nova-api:8774/v2/')
cs_2.assert_called('GET', expected_endpoint)

View File

@ -19,7 +19,6 @@ version interface
from six.moves import urllib
from novaclient import base
from novaclient import client
from novaclient import exceptions as exc
@ -34,50 +33,37 @@ class Version(base.Resource):
class VersionManager(base.ManagerWithFind):
resource_class = Version
def _is_session_client(self):
return isinstance(self.api.client, client.SessionClient)
def _get_current(self):
"""Returns info about current version."""
# TODO(sdague): we've now got to make up to 3 HTTP requests to
# determine what version we are running, due to differences in
# deployments and versions. We really need to cache the
# results of this per endpoint and keep the results of it for
# some reasonable TTL (like 24 hours) to reduce our round trip
# traffic.
if self._is_session_client():
try:
# Assume that the value of get_endpoint() is something
# we can get the version of. This is a 404 for Nova <
# Mitaka if the service catalog contains project_id.
#
# TODO(sdague): add microversion for when this will
# change
url = "%s" % self.api.client.get_endpoint()
return self._get(url, "version")
except exc.NotFound:
# If that's a 404, we can instead try hacking together
# an endpoint root url by chopping off the last 2 /s.
# This is kind of gross, but we've had this baked in
# so long people got used to this hard coding.
#
# NOTE(sdague): many service providers don't really
# implement GET / in the expected way, if we do a GET
# /v2 that's actually a 300 redirect to
# /v2/... because of how paste works. So adding the
# end slash is really important.
url = "%s/" % url.rsplit("/", 1)[0]
return self._get(url, "version")
else:
# NOTE(andreykurilin): HTTPClient doesn't have ability to send get
# request without token in the url, so `self._get` doesn't work.
all_versions = self.list()
url = self.client.management_url.rsplit("/", 1)[0]
for version in all_versions:
for link in version.links:
if link["href"].rstrip('/') == url:
version.append_request_ids(all_versions.request_ids)
return version
try:
# Assume that the value of get_endpoint() is something
# we can get the version of. This is a 404 for Nova <
# Mitaka if the service catalog contains project_id.
#
# TODO(sdague): add microversion for when this will
# change
url = "%s" % self.api.client.get_endpoint()
return self._get(url, "version")
except exc.NotFound:
# If that's a 404, we can instead try hacking together
# an endpoint root url by chopping off the last 2 /s.
# This is kind of gross, but we've had this baked in
# so long people got used to this hard coding.
#
# NOTE(sdague): many service providers don't really
# implement GET / in the expected way, if we do a GET
# /v2 that's actually a 300 redirect to
# /v2/... because of how paste works. So adding the
# end slash is really important.
url = "%s/" % url.rsplit("/", 1)[0]
return self._get(url, "version")
def get_current(self):
try:
@ -92,13 +78,11 @@ class VersionManager(base.ManagerWithFind):
def list(self):
"""List all versions."""
version_url = None
if self._is_session_client():
# NOTE: "list versions" API needs to be accessed without base
# URI (like "v2/{project-id}"), so here should be a scheme("http",
# etc.) and a hostname.
endpoint = self.api.client.get_endpoint()
url = urllib.parse.urlparse(endpoint)
version_url = '%s://%s/' % (url.scheme, url.netloc)
# NOTE: "list versions" API needs to be accessed without base
# URI (like "v2/{project-id}"), so here should be a scheme("http",
# etc.) and a hostname.
endpoint = self.api.client.get_endpoint()
url = urllib.parse.urlparse(endpoint)
version_url = '%s://%s/' % (url.scheme, url.netloc)
return self._list(version_url, "versions")