Adds unittests on auth

Implements: bp unit-tests
Change-Id: I504af1532a2955c778f0456870b08be2bee64176
This commit is contained in:
Andrei V. Ostapenko 2014-07-11 22:32:08 +03:00
parent bfab94b6c3
commit ccf0d270c4
13 changed files with 354 additions and 31 deletions

View File

@ -1,4 +1,5 @@
# Copyright 2012 OpenStack Foundation.
# Copyright 2014 Symantec Corporation
# Copyright 2012 OpenStack Foundation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -192,7 +193,6 @@ class HTTPClient(object):
try:
body = json.loads(resp.text)
except ValueError:
pass
body = None
else:
body = None
@ -224,8 +224,7 @@ class HTTPClient(object):
try:
self.authenticate()
kwargs['headers']['X-Auth-Token'] = self.auth_token
resp, body = self._time_request(self.management_url + url,
method, **kwargs)
resp, body = self._time_request(url, method, **kwargs)
return resp, body
except exceptions.Unauthorized:
raise ex
@ -254,7 +253,8 @@ class HTTPClient(object):
kwargs.setdefault('headers', {})
if self.auth_token is None:
self.auth_token = ""
kwargs['headers']['X-Auth-Token'] = self.auth_token
if self.auth_token:
kwargs['headers']['X-Auth-Token'] = self.auth_token
resp, body = self._cs_request(self.endpoint_url + url, method,
**kwargs)
return resp, body
@ -324,8 +324,8 @@ class HTTPClient(object):
headers = {'Content-Type': 'application/json'}
body = json.dumps(creds)
resp, body = self._cs_request(req_url, 'POST',
headers=headers, body=body)
resp, body = self._time_request(req_url, 'POST',
headers=headers, body=body)
self._extract_service_catalog_v3(resp, body)
@ -347,8 +347,9 @@ class HTTPClient(object):
token_url = self.auth_url + "/tokens"
# Make sure we follow redirects when trying to reach Keystone
resp, resp_body = self._cs_request(token_url, "POST",
body=json.dumps(body))
resp, resp_body = self._time_request(token_url, "POST",
headers={},
body=json.dumps(body))
status_code = self.get_status_code(resp)
if status_code != 200:
raise exceptions.Unauthorized(message=resp_body)
@ -363,6 +364,8 @@ class HTTPClient(object):
def authenticate(self):
if self.auth_strategy == 'keystone':
if not self.auth_url:
raise exceptions.NoAuthURLProvided()
auth_api = self.auth_url.split('/')[-1]
if auth_api == 'v2.0':
self._authenticate_keystone()
@ -388,7 +391,6 @@ class HTTPClient(object):
self.authenticate()
return self.endpoint_url
body = json.loads(body)
for endpoint in body.get('endpoints', []):
if (endpoint['type'] == 'kv-storage' and
endpoint.get('region') == self.region_name):
@ -406,12 +408,5 @@ class HTTPClient(object):
'endpoint_url': self.endpoint_url}
def get_status_code(self, response):
"""Returns the integer status code from the response.
Either a Webob.Response (used in testing) or httplib.Response
is returned.
"""
if hasattr(response, 'status_int'):
return response.status_int
else:
return response.status_code
"""Returns the integer status code from the response."""
return response.status_code

View File

@ -1,4 +1,4 @@
# Copyright 2012 OpenStack Foundation.
# Copyright 2014 Symantec Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,5 @@
# Copyright 2012 OpenStack Foundation.
# Copyright 2014 Symantec Corporation
# Copyright 2012 OpenStack Foundation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2012 OpenStack Foundation.
# Copyright 2014 Symantec Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2012 OpenStack Foundation.
# Copyright 2014 Symantec Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,5 @@
# Copyright 2012 OpenStack Foundation.
# Copyright 2014 Symantec Corporation
# Copyright 2012 OpenStack Foundation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,5 @@
# Copyright 2014 Symantec Corporation
# Copyright 2012 OpenStack Foundation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -0,0 +1,325 @@
# Copyright 2014 Symantec Corporation
# Copyright 2012 OpenStack Foundation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
import uuid
import mock
import requests
import testtools
from magnetodbclient import client
from magnetodbclient.common import exceptions
USERNAME = 'testuser'
USER_ID = 'testuser_id'
TENANT_NAME = 'testtenant'
TENANT_ID = 'testtenant_id'
PASSWORD = 'password'
AUTH_URL = 'https://127.0.0.1:5000/v2.0'
ENDPOINT_URL = 'localurl'
ENDPOINT_OVERRIDE = 'otherurl'
TOKEN = 'tokentoken'
REGION = 'RegionTest'
NOAUTH = 'noauth'
KS_TOKEN_RESULT = {
'access': {
'token': {'id': TOKEN,
'expires': '2012-08-11T07:49:01Z',
'tenant': {'id': str(uuid.uuid1())}},
'user': {'id': str(uuid.uuid1())},
'serviceCatalog': [
{'endpoints_links': [],
'endpoints': [{'adminURL': ENDPOINT_URL,
'internalURL': ENDPOINT_URL,
'publicURL': ENDPOINT_URL,
'region': REGION}],
'type': 'kv-storage',
'name': 'MagnetoDB'}
]
}
}
ENDPOINTS_RESULT = {
'endpoints': [{
'type': 'kv-storage',
'name': 'MagnetoDB',
'region': REGION,
'adminURL': ENDPOINT_URL,
'internalURL': ENDPOINT_URL,
'publicURL': ENDPOINT_URL
}]
}
def get_response(status_code, headers=None):
response = mock.Mock(return_value=requests.Response)
response.headers = headers or {}
response.status_code = status_code
return response
class CLITestAuthNoAuth(testtools.TestCase):
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthNoAuth, self).setUp()
self.client = client.HTTPClient(username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
endpoint_url=ENDPOINT_URL,
auth_strategy=NOAUTH,
region_name=REGION)
def test_get_noauth(self):
with mock.patch.object(self.client, "request"):
res200 = get_response(200)
self.client.request.return_value = (res200, '')
self.client.do_request('/resource', 'GET')
self.client.request.assert_called_ones_with(
ENDPOINT_URL + '/resource',
'GET',
headers={})
self.assertEqual(self.client.endpoint_url, ENDPOINT_URL)
class CLITestAuthKeystone(testtools.TestCase):
# Auth Body expected
auth_body = ('{"auth": {"tenantName": "testtenant", '
'"passwordCredentials": '
'{"username": "testuser", "password": "password"}}}')
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthKeystone, self).setUp()
self.client = client.HTTPClient(username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
def test_reused_token_get_auth_info(self):
"""Test that Client.get_auth_info() works even if client was
instantiated with predefined token.
"""
client_ = client.HTTPClient(username=USERNAME,
tenant_name=TENANT_NAME,
token=TOKEN,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
expected = {'auth_token': TOKEN,
'auth_tenant_id': None,
'auth_user_id': None,
'endpoint_url': self.client.endpoint_url}
self.assertEqual(client_.get_auth_info(), expected)
def test_get_token(self):
with mock.patch.object(self.client, "request"):
res200 = get_response(200)
self.client.request.return_value = (res200, KS_TOKEN_RESULT)
self.client.do_request('/resource', 'GET')
self.client.request.assert_called_ones_with(
AUTH_URL + '/tokens', 'POST',
body=self.auth_body, headers={})
self.assertEqual(self.client.endpoint_url, ENDPOINT_URL)
self.assertEqual(self.client.auth_token, TOKEN)
def test_refresh_token(self):
with mock.patch.object(self.client, "request"):
self.client.auth_token = TOKEN
self.client.endpoint_url = ENDPOINT_URL
res200 = get_response(200)
return_values = [exceptions.Unauthorized(),
(res200, KS_TOKEN_RESULT),
(res200, '')]
def request_mock(*args, **kwargs):
value = return_values.pop(0)
if isinstance(value, Exception):
raise value
else:
return value
self.client.request.side_effect = request_mock
self.client.do_request('/resource', 'GET')
self.client.request.assert_has_calls([
mock.call(
ENDPOINT_URL + '/resource', 'GET',
headers={'X-Auth-Token': TOKEN}),
mock.call(
AUTH_URL + '/tokens', 'POST',
headers={},
body=self.auth_body),
mock.call(
ENDPOINT_URL + '/resource', 'GET',
headers={'X-Auth-Token': TOKEN})
])
def test_refresh_token_no_auth_url(self):
with mock.patch.object(self.client, "request"):
self.client.auth_url = None
self.client.auth_token = TOKEN
self.client.endpoint_url = ENDPOINT_URL
self.client.request.side_effect = exceptions.Unauthorized
self.assertRaises(exceptions.NoAuthURLProvided,
self.client.do_request,
'/resource',
'GET')
def test_get_endpoint_url_with_invalid_auth_url(self):
# Handle the case when auth_url is not provided
self.client.auth_url = None
self.assertRaises(exceptions.NoAuthURLProvided,
self.client._get_endpoint_url)
def test_get_endpoint_url(self):
with mock.patch.object(self.client, "request"):
self.client.auth_token = TOKEN
res200 = get_response(200)
self.client.request.side_effect = [(res200, ENDPOINTS_RESULT),
(res200, '')]
self.client.do_request('/resource', 'GET')
self.client.request.assert_has_calls(
[mock.call(AUTH_URL + '/tokens/%s/endpoints' % TOKEN, 'GET',
headers={'X-Auth-Token': TOKEN}),
mock.call(ENDPOINT_URL + '/resource', 'GET',
headers={'X-Auth-Token': TOKEN})]
)
def test_use_given_endpoint_url(self):
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION,
endpoint_url=ENDPOINT_OVERRIDE)
self.assertEqual(self.client.endpoint_url, ENDPOINT_OVERRIDE)
with mock.patch.object(self.client, "request"):
self.client.auth_token = TOKEN
res200 = get_response(200)
self.client.request.return_value = (res200, '')
self.client.do_request('/resource', 'GET')
self.client.request.assert_called_ones_with(
ENDPOINT_OVERRIDE + '/resource', 'GET',
headers={'X-Auth-Token': TOKEN})
self.assertEqual(self.client.endpoint_url, ENDPOINT_OVERRIDE)
def test_get_endpoint_url_other(self):
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='otherURL')
with mock.patch.object(self.client, "request"):
self.client.auth_token = TOKEN
res200 = get_response(200)
self.client.request.return_value = (res200, ENDPOINTS_RESULT)
self.assertRaises(exceptions.EndpointTypeNotFound,
self.client.do_request,
'/resource',
'GET')
self.client.request.assert_called_ones_with(
AUTH_URL + '/tokens/%s/endpoints' % TOKEN, 'GET',
headers={'X-Auth-Token': TOKEN})
def test_get_endpoint_url_failed(self):
with mock.patch.object(self.client, "request"):
self.client.auth_token = TOKEN
res200 = get_response(200)
return_values = [exceptions.Unauthorized(),
(res200, KS_TOKEN_RESULT),
(res200, ENDPOINTS_RESULT),
(res200, '')]
def request_mock(*args, **kwargs):
value = return_values.pop(0)
if isinstance(value, Exception):
raise value
else:
return value
self.client.request.side_effect = request_mock
self.client.do_request('/resource', 'GET')
self.client.request.assert_has_calls([
mock.call(AUTH_URL + '/tokens/%s/endpoints' % TOKEN, 'GET',
headers={'X-Auth-Token': TOKEN}),
mock.call(AUTH_URL + '/tokens', 'POST',
body=self.auth_body, headers={}),
mock.call(AUTH_URL + '/tokens/%s/endpoints' % TOKEN, 'GET',
headers={'X-Auth-Token': TOKEN}),
mock.call(ENDPOINT_URL + '/resource', 'GET',
headers={'X-Auth-Token': TOKEN})
])
def test_endpoint_type(self):
resources = copy.deepcopy(KS_TOKEN_RESULT)
endpoints = resources['access']['serviceCatalog'][0]['endpoints'][0]
endpoints['internalURL'] = 'internal'
endpoints['adminURL'] = 'admin'
endpoints['publicURL'] = 'public'
# Test default behavior is to choose public.
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION)
self.client._extract_service_catalog(resources)
self.assertEqual(self.client.endpoint_url, 'public')
# Test admin url
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='adminURL')
self.client._extract_service_catalog(resources)
self.assertEqual(self.client.endpoint_url, 'admin')
# Test public url
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='publicURL')
self.client._extract_service_catalog(resources)
self.assertEqual(self.client.endpoint_url, 'public')
# Test internal url
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='internalURL')
self.client._extract_service_catalog(resources)
self.assertEqual(self.client.endpoint_url, 'internal')
# Test url that isn't found in the service catalog
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='privateURL')
self.assertRaises(exceptions.EndpointTypeNotFound,
self.client._extract_service_catalog,
resources)

View File

@ -1,4 +1,5 @@
# Copyright (C) 2014 Symantec Corp.
# Copyright 2014 Symantec Corporation
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -122,6 +123,7 @@ class ShellTest(testtools.TestCase):
run_subcommand_p = mock.patch.object(magnetodb_shell, 'run_subcommand')
manager_p.start()
run_subcommand_p.start()
clientmanager.ClientManager.__init__.return_value = None
self.addCleanup(manager_p.stop)
self.addCleanup(run_subcommand_p.stop)
cmdline = ('--os-username test '

View File

@ -1,4 +1,4 @@
# Copyright 2012 OpenStack Foundation.
# Copyright 2014 Symantec Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2012 OpenStack Foundation.
# Copyright 2014 Symantec Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright 2012 OpenStack Foundation.
# Copyright 2014 Symantec Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,4 @@
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
# Copyright 2014 Symantec Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# @author: Carl Baldwin, Hewlett-Packard
from pbr import version