Implemented regions tests.

Added ability to test against openstack via direct login.

Change-Id: Icc8cef595a5bef9f4d02446ff9ab9f8cfb3b831b
This commit is contained in:
alexey-mr 2015-10-23 00:57:30 +03:00
parent 0186280b15
commit 765e4a0c57
6 changed files with 157 additions and 36 deletions

View File

@ -46,7 +46,8 @@ if [[ ! -f $TEST_CONFIG_DIR/$TEST_CONFIG ]]; then
project_id=$id
[[ -n "$project_id" ]] || { echo "Can't create project"; exit 1; }
user_name="user-$(cat /dev/urandom | tr -cd 'a-f0-9' | head -c 8)"
eval $(openstack user create "$user_name" --project "$project_id" --password "password" --email "$user_name@example.com" -f shell -c id)
password='qwe123QWE'
eval $(openstack user create "$user_name" --project "$project_id" --password "$password" --email "$user_name@example.com" -f shell -c id)
user_id=$id
[[ -n "$user_id" ]] || { echo "Can't create user"; exit 1; }
# add 'Member' role for swift access
@ -70,7 +71,7 @@ if [[ ! -f $TEST_CONFIG_DIR/$TEST_CONFIG ]]; then
export OS_PROJECT_NAME=$project_name
export OS_TENANT_NAME=$project_name
export OS_USERNAME=$user_name
export OS_PASSWORD="password"
export OS_PASSWORD=$password
sudo bash -c "cat > $TEST_CONFIG_DIR/$TEST_CONFIG <<EOF
[gce]
@ -80,17 +81,22 @@ build_interval=${TIMEOUT:-180}
# GCE API schema
schema=${GCE_SCHEMA:-'etc/gceapi/protocols/v1.json'}
# GCE auth options
cred_type=${GCE_CRED_TYPE:-'os_token'}
auth_url=${OS_AUTH_URL}
username=${OS_USERNAME}
password=${OS_PASSWORD}
# GCE services address
protocol=${GCE_API_PROTOCOL:-'http'}
host=${GCE_HOST:-'localhost'}
port=${GCE_PORT:-8787}
# GCE URLs
auth_url=${GCE_AUTH_URL:-'/auth'}
# GCE API URLs
discovery_url=${GCE_DISCOVERY_URL:-'/discovery/v1/apis/{api}/{apiVersion}/rest'}
# GCE resource IDs for testing
project_id=$project_id
project_id=${OS_PROJECT_NAME}
zone=${ZONE:-'nova'}
region=${REGION:-'RegionOne'}
EOF"

View File

@ -33,13 +33,6 @@ class TestRegions(test_base.GCETestCase):
'Null regions object, api is not built properly')
return res
def test_list(self):
project_id = self.cfg.project_id
self.trace('List regions: project_id={}'.format(project_id))
result = self.regions.list(project=project_id).execute()
self.trace('Regions: {}'.format(result))
self.api.validate_schema(value=result, schema_name='RegionList')
def test_describe(self):
project_id = self.cfg.project_id
region = self.cfg.region
@ -48,3 +41,11 @@ class TestRegions(test_base.GCETestCase):
result = self.regions.get(project=project_id, region=region).execute()
self.trace('Region: {}'.format(result))
self.api.validate_schema(value=result, schema_name='Region')
def test_list(self):
project_id = self.cfg.project_id
self.trace('List regions: project_id={}'.format(project_id))
result = self.regions.list(project=project_id).execute()
self.trace('Regions: {}'.format(result))
self.api.validate_schema(value=result, schema_name='RegionList')
self.assertFind(self.cfg.region, result)

View File

@ -46,8 +46,4 @@ class TestZones(test_base.GCETestCase):
result = self.zones.list(project=project_id).execute()
self.trace('Zones: {}'.format(result))
self.api.validate_schema(value=result, schema_name='ZoneList')
zone = self.cfg.zone
self.assertIn(
zone,
result['items'],
'There is no required zone {} in returned list'.format(zone))
self.assertFind(self.cfg.zone, result)

View File

@ -29,6 +29,23 @@ OPTIONS = [
default=1,
help='Interval'),
# GCE auth options
cfg.StrOpt('cred_type',
default='os_token',
help='Method how to get credentials:'
'\n\tos_token - request token from OS keystone directly'
'\n\tgcloud_auth - use app credentials that should be'
'obtained before via gcloud auth'),
cfg.StrOpt('username',
default='demo',
help='User name'),
cfg.StrOpt('password',
default='qwe123QWE',
help='User password'),
cfg.StrOpt('auth_url',
default='http://localhost:5000/v2.0/',
help='OAuth API relative URL'),
# GCE API schema
cfg.StrOpt('schema',
default='etc/gceapi/protocols/v1.json',
@ -45,15 +62,16 @@ OPTIONS = [
default=8787,
help='GCE service port'),
# GCE URLs
cfg.StrOpt('auth_url',
default='/auth',
help='OAuth API relative URL'),
# GCE API URLs
cfg.StrOpt('discovery_url',
default='/discovery/v1/apis/{api}/{apiVersion}/rest',
help='Discovery API relative URL'),
# GCE resource IDs for testing
# Note that Google's project has Name, ID and Number, for project
# identification ID should be used, but in Openstack project has
# Name and ID, where Name is corresponds to Project ID in Google, ID is
# Openstack ID's and has no relation to Google's ID and Number.
cfg.StrOpt('project_id',
default='test',
help='GCE Project ID for testing'),

View File

@ -0,0 +1,59 @@
# Copyright 2015 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneclient.client import Client as KeystoneClient
from oauth2client.client import AccessTokenCredentials
from oauth2client.client import GoogleCredentials
class CredentialsProvider(object):
def __init__(self, supp):
self._supp = supp
def _trace(self, msg):
self._supp.trace(msg)
def _get_app_credentials(self):
self._trace('Create GoogleCredentials from default app file')
return GoogleCredentials.get_application_default()
def _get_token_crenetials(self):
cfg = self._supp.cfg
auth_data = {
'username': cfg.username,
'password': cfg.password,
'tenant_name': cfg.project_id,
'auth_url': cfg.auth_url
}
self._trace('Auth data {}'.format(auth_data))
client = KeystoneClient(**auth_data)
if not client.authenticate():
raise Exception('Failed to authenticate user')
token = client.auth_token
self._trace('Created token {}'.format(token))
return AccessTokenCredentials(access_token=token,
user_agent='GCE test')
@property
def credentials(self):
cred_type = self._supp.cfg.cred_type
if cred_type == 'os_token':
return self._get_token_crenetials()
elif cred_type == 'gcloud_auth':
return self._get_app_credentials()
else:
raise Exception('Unknown cred_type {}'.format(cred_type))

View File

@ -15,13 +15,15 @@
# under the License.
from gceapi.tests.functional import config
from googleapiclient.discovery import build
from googleapiclient.schema import Schemas
from jsonschema import RefResolver
from jsonschema import validate
from oauth2client.client import GoogleCredentials
from tempest_lib import base
from gceapi.tests.functional import config
from gceapi.tests.functional.credentials import CredentialsProvider
class TestSupp(object):
def __init__(self, *args, **kwargs):
@ -38,32 +40,58 @@ class TestSupp(object):
self._log.trace(*args, **kwargs)
class LocalRefResolver(RefResolver):
def __init__(
self,
base_uri,
referrer,
store=(),
cache_remote=True,
handlers=(),
urljoin_cache=None,
remote_cache=None):
super(LocalRefResolver, self).__init__(base_uri,
referrer,
store,
cache_remote,
handlers,
urljoin_cache,
remote_cache)
self._local_schema = referrer
def resolve_from_url(self, url):
if url in self._local_schema:
return self._local_schema[url]
return super(LocalRefResolver, self).resolve_from_url(url)
class GCEApi(object):
def __init__(self, supp):
self._supp = supp
self._schema = None
def __init__(self, supp, cred_provider):
self._compute = None
self._cred_provider = cred_provider
self._schema = None
self._scheme_ref_resolver = 0
self._supp = supp
def init(self):
self._schema = Schemas(self._supp.cfg.schema)
self._auth()
self._scheme_ref_resolver = LocalRefResolver.from_schema(
self._schema.schemas)
self._build_api()
def _auth(self):
self._trace('Create GoogleCredentials from default app file')
self.credentials = GoogleCredentials.get_application_default()
def _build_api(self):
url = self._get_discovery_url()
credentials = self._cred_provider.credentials
url = self._discovery_url
self._trace(
'Build Google compute api with discovery url {}'.format(url))
self._compute = build(
'compute', 'v1',
credentials=self.credentials,
credentials=credentials,
discoveryServiceUrl=url
)
def _get_discovery_url(self):
@property
def _discovery_url(self):
cfg = self._supp.cfg
return '{}://{}:{}{}'.format(
cfg.protocol,
@ -81,7 +109,8 @@ class GCEApi(object):
return self._compute
def validate_schema(self, value, schema_name):
validate(value, self._schema[schema_name])
schema = self._schema.get(schema_name)
validate(value, schema, resolver=self._scheme_ref_resolver)
class GCETestCase(base.BaseTestCase):
@ -101,6 +130,18 @@ class GCETestCase(base.BaseTestCase):
@classmethod
def setUpClass(cls):
cls._supp = TestSupp()
cls._api = GCEApi(cls._supp)
cls._api = GCEApi(cls._supp, CredentialsProvider(cls._supp))
cls._api.init()
super(GCETestCase, cls).setUpClass()
def assertFind(self, item, items_list):
found = False
items = items_list['items']
for i in items:
if i['name'] == item:
found = True
break
self.assertTrue(
found,
'There is no required item {} in the list {}'.format(item, items))