Rename CloudConfig to CloudRegion

The name CloudConfig has always sucked. While working on the next patch,
it occurred to me that what a CloudConfig represents is the config for a
given region of a cloud. We even reference "Cloud Region" as a unit of
work when giving conference talks.

Take the opportunity while we're doing this sdk/occ merge to rename it.
Obviously we can provide naming compat shim in OCC itself.

Leave in *some* naming compat shims to get us past the 0.10 release so
that OSC continues to have the happies.

Change-Id: Ia0bbc20eb28a3a36e69adba3a4b45323e4aa284e
This commit is contained in:
Monty Taylor 2018-01-05 18:10:00 -06:00
parent 1c06fd3798
commit 18fe7b4e2b
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
25 changed files with 971 additions and 894 deletions

View File

@ -80,7 +80,7 @@ Connection
----------
The :class:`openstack.connection.Connection` class builds atop a
:class:`os_client_config.config.CloudConfig` object, and provides a higher
:class:`os_client_config.config.CloudRegion` object, and provides a higher
level interface constructed of ``Proxy`` objects from each of the services.
The ``Connection`` class' primary purpose is to act as a high-level interface

View File

@ -20,9 +20,9 @@ Get a named cloud.
import openstack.config
cloud_config = openstack.config.OpenStackConfig().get_one_cloud(
cloud_region = openstack.config.OpenStackConfig().get_one(
'internap', region_name='ams01')
print(cloud_config.name, cloud_config.region, cloud_config.config)
print(cloud_region.name, cloud_region.region, cloud_region.config)
Or, get all of the clouds.
@ -30,9 +30,9 @@ Or, get all of the clouds.
import openstack.config
cloud_config = openstack.config.OpenStackConfig().get_all_clouds()
for cloud in cloud_config:
print(cloud.name, cloud.region, cloud.config)
cloud_regions = openstack.config.OpenStackConfig().get_all()
for cloud_region in cloud_regions:
print(cloud_region.name, cloud_region.region, cloud_region.config)
argparse
--------
@ -49,13 +49,13 @@ with - as well as a consumption argument.
import openstack.config
cloud_config = openstack.config.OpenStackConfig()
config = openstack.config.OpenStackConfig()
parser = argparse.ArgumentParser()
cloud_config.register_argparse_arguments(parser, sys.argv)
config.register_argparse_arguments(parser, sys.argv)
options = parser.parse_args()
cloud = cloud_config.get_one_cloud(argparse=options)
cloud_region = config.get_one(argparse=options)
Constructing a Connection object
--------------------------------
@ -89,8 +89,8 @@ If you want to do the same thing but also support command line parsing.
conn = openstack.config.make_connection(options=argparse.ArgumentParser())
Constructing cloud objects
--------------------------
Constructing OpenStackCloud objects
-----------------------------------
If what you want to do is get an
`opentack.cloud.openstackcloud.OpenStackCloud` object, a

View File

@ -64,7 +64,7 @@ OpenStack services.
connection
Once you have a *Connection* instance, the following services may be exposed
to you. The combination of your ``CloudConfig`` and the catalog of the cloud
to you. The combination of your ``CloudRegion`` and the catalog of the cloud
in question control which services are exposed, but listed below are the ones
provided by the SDK.

View File

@ -73,7 +73,7 @@ def create_connection_from_args():
config = occ.OpenStackConfig()
config.register_argparse_arguments(parser, sys.argv[1:])
args = parser.parse_args()
return openstack.connect(config=config.get_one_cloud(argparse=args))
return openstack.connect(config=config.get_one(argparse=args))
def create_connection(auth_url, region, project_name, username, password):

View File

@ -81,20 +81,18 @@ def openstack_clouds(
return [
OpenStackCloud(
cloud=f.name, debug=debug,
cloud_config=f,
strict=strict,
**f.config)
for f in config.get_all_clouds()
cloud_config=cloud_region,
strict=strict)
for cloud_region in config.get_all()
]
else:
return [
OpenStackCloud(
cloud=f.name, debug=debug,
cloud_config=f,
strict=strict,
**f.config)
for f in config.get_all_clouds()
if f.name == cloud
cloud_config=cloud_region,
strict=strict)
for cloud_region in config.get_all()
if cloud_region.name == cloud
]
except keystoneauth1.exceptions.auth_plugins.NoMatchingPlugin as e:
raise OpenStackCloudException(
@ -110,11 +108,11 @@ def openstack_cloud(
if not config:
config = _get_openstack_config(app_name, app_version)
try:
cloud_config = config.get_one_cloud(**kwargs)
cloud_region = config.get_one(**kwargs)
except keystoneauth1.exceptions.auth_plugins.NoMatchingPlugin as e:
raise OpenStackCloudException(
"Invalid cloud configuration: {exc}".format(exc=str(e)))
return OpenStackCloud(cloud_config=cloud_config, strict=strict)
return OpenStackCloud(cloud_config=cloud_region, strict=strict)
# TODO(shade) This wants to be renamed before we make a release - there is
@ -126,11 +124,11 @@ def operator_cloud(
if not config:
config = _get_openstack_config(app_name, app_version)
try:
cloud_config = config.get_one_cloud(**kwargs)
cloud_region = config.get_one(**kwargs)
except keystoneauth1.exceptions.auth_plugins.NoMatchingPlugin as e:
raise OpenStackCloudException(
"Invalid cloud configuration: {exc}".format(exc=str(e)))
return OperatorCloud(cloud_config=cloud_config, strict=strict)
return OperatorCloud(cloud_config=cloud_region, strict=strict)
def connect(*args, **kwargs):

View File

@ -37,14 +37,14 @@ class OpenStackInventory(object):
if cloud is None:
self.clouds = [
openstack.OpenStackCloud(cloud_config=cloud_config)
for cloud_config in config.get_all_clouds()
openstack.OpenStackCloud(cloud_config=cloud_region)
for cloud_region in config.get_all()
]
else:
try:
self.clouds = [
openstack.OpenStackCloud(
cloud_config=config.get_one_cloud(cloud))
cloud_config=config.get_one(cloud))
]
except openstack.config.exceptions.OpenStackConfigException as e:
raise openstack.OpenStackCloudException(e)

View File

@ -132,7 +132,7 @@ class OpenStackCloud(_normalize.Normalizer):
string. Optional, defaults to None.
:param app_version: Version of the application to be appended to the
user-agent string. Optional, defaults to None.
:param CloudConfig cloud_config: Cloud config object from os-client-config
:param CloudRegion cloud_config: Cloud config object from os-client-config
In the future, this will be the only way
to pass in cloud configuration, but is
being phased in currently.
@ -157,7 +157,7 @@ class OpenStackCloud(_normalize.Normalizer):
config = openstack.config.OpenStackConfig(
app_name=app_name, app_version=app_version)
cloud_config = config.get_one_cloud(**kwargs)
cloud_config = config.get_one(**kwargs)
self.name = cloud_config.name
self.auth = cloud_config.get_auth_args()
@ -375,6 +375,8 @@ class OpenStackCloud(_normalize.Normalizer):
for key, value in kwargs.items():
params['auth'][key] = value
# TODO(mordred) Replace this chunk with the next patch that allows
# passing a Session to CloudRegion.
# Closure to pass to OpenStackConfig to ensure the new cloud shares
# the Session with the current cloud. This will ensure that version
# discovery cache will be re-used.
@ -384,7 +386,7 @@ class OpenStackCloud(_normalize.Normalizer):
return keystoneauth1.session.Session(session=self.keystone_session)
# Use cloud='defaults' so that we overlay settings properly
cloud_config = config.get_one_cloud(
cloud_config = config.get_one(
cloud='defaults',
session_constructor=session_constructor,
**params)

View File

@ -35,7 +35,7 @@ def get_config(
else:
parsed_options = None
return _config.get_one_cloud(options=parsed_options, **kwargs)
return _config.get_one(options=parsed_options, **kwargs)
def make_rest_client(
@ -54,11 +54,11 @@ def make_rest_client(
get_session_client on it. This function is to make it easy to poke
at OpenStack REST APIs with a properly configured keystone session.
"""
cloud = get_config(
cloud_region = get_config(
service_key=service_key, options=options,
app_name=app_name, app_version=app_version,
**kwargs)
return cloud.get_session_client(service_key, version=version)
return cloud_region.get_session_client(service_key, version=version)
# Backwards compat - simple_client was a terrible name
simple_client = make_rest_client
# Backwards compat - session_client was a terrible name
@ -74,8 +74,8 @@ def make_connection(options=None, **kwargs):
:rtype: :class:`~openstack.connection.Connection`
"""
from openstack import connection
cloud = get_config(options=options, **kwargs)
return connection.from_config(cloud_config=cloud, options=options)
cloud_region = get_config(options=options, **kwargs)
return connection.from_config(cloud_region=cloud_region, options=options)
def make_cloud(options=None, **kwargs):
@ -86,5 +86,5 @@ def make_cloud(options=None, **kwargs):
:rtype: :class:`~openstack.OpenStackCloud`
"""
import openstack.cloud
cloud = get_config(options=options, **kwargs)
return openstack.OpenStackCloud(cloud_config=cloud, **kwargs)
cloud_region = get_config(options=options, **kwargs)
return openstack.OpenStackCloud(cloud_config=cloud_region, **kwargs)

View File

@ -1,4 +1,4 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
# Copyright (c) 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -12,583 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import math
import warnings
# TODO(mordred) This is only here to ease the OSC transition
from keystoneauth1 import adapter
import keystoneauth1.exceptions.catalog
from keystoneauth1 import session
import requestsexceptions
import openstack
from openstack import _log
from openstack.config import constructors
from openstack.config import exceptions
from openstack.config import cloud_region
def _get_client(service_key):
class_mapping = constructors.get_constructor_mapping()
if service_key not in class_mapping:
raise exceptions.OpenStackConfigException(
"Service {service_key} is unkown. Please pass in a client"
" constructor or submit a patch to os-client-config".format(
service_key=service_key))
mod_name, ctr_name = class_mapping[service_key].rsplit('.', 1)
lib_name = mod_name.split('.')[0]
try:
mod = importlib.import_module(mod_name)
except ImportError:
raise exceptions.OpenStackConfigException(
"Client for '{service_key}' was requested, but"
" {mod_name} was unable to be imported. Either import"
" the module yourself and pass the constructor in as an argument,"
" or perhaps you do not have python-{lib_name} installed.".format(
service_key=service_key,
mod_name=mod_name,
lib_name=lib_name))
try:
ctr = getattr(mod, ctr_name)
except AttributeError:
raise exceptions.OpenStackConfigException(
"Client for '{service_key}' was requested, but although"
" {mod_name} imported fine, the constructor at {fullname}"
" as not found. Please check your installation, we have no"
" clue what is wrong with your computer.".format(
service_key=service_key,
mod_name=mod_name,
fullname=class_mapping[service_key]))
return ctr
class CloudConfig(cloud_region.CloudRegion):
def _make_key(key, service_type):
if not service_type:
return key
else:
service_type = service_type.lower().replace('-', '_')
return "_".join([service_type, key])
class CloudConfig(object):
def __init__(self, name, region, config,
force_ipv4=False, auth_plugin=None,
openstack_config=None, session_constructor=None,
app_name=None, app_version=None):
self.name = name
def __init__(self, name, region, config, **kwargs):
super(CloudConfig, self).__init__(name, region, config, **kwargs)
self.region = region
self.config = config
self.log = _log.setup_logging(__name__)
self._force_ipv4 = force_ipv4
self._auth = auth_plugin
self._openstack_config = openstack_config
self._keystone_session = None
self._session_constructor = session_constructor or session.Session
self._app_name = app_name
self._app_version = app_version
def __getattr__(self, key):
"""Return arbitrary attributes."""
if key.startswith('os_'):
key = key[3:]
if key in [attr.replace('-', '_') for attr in self.config]:
return self.config[key]
else:
return None
def __iter__(self):
return self.config.__iter__()
def __eq__(self, other):
return (self.name == other.name and self.region == other.region
and self.config == other.config)
def __ne__(self, other):
return not self == other
def set_session_constructor(self, session_constructor):
"""Sets the Session constructor."""
self._session_constructor = session_constructor
def get_requests_verify_args(self):
"""Return the verify and cert values for the requests library."""
if self.config['verify'] and self.config['cacert']:
verify = self.config['cacert']
else:
verify = self.config['verify']
if self.config['cacert']:
warnings.warn(
"You are specifying a cacert for the cloud {0} but "
"also to ignore the host verification. The host SSL cert "
"will not be verified.".format(self.name))
cert = self.config.get('cert', None)
if cert:
if self.config['key']:
cert = (cert, self.config['key'])
return (verify, cert)
def get_services(self):
"""Return a list of service types we know something about."""
services = []
for key, val in self.config.items():
if (key.endswith('api_version')
or key.endswith('service_type')
or key.endswith('service_name')):
services.append("_".join(key.split('_')[:-2]))
return list(set(services))
def get_auth_args(self):
return self.config['auth']
def get_interface(self, service_type=None):
key = _make_key('interface', service_type)
interface = self.config.get('interface')
return self.config.get(key, interface)
def get_region_name(self, service_type=None):
if not service_type:
return self.region
key = _make_key('region_name', service_type)
return self.config.get(key, self.region)
def get_api_version(self, service_type):
key = _make_key('api_version', service_type)
return self.config.get(key, None)
def get_service_type(self, service_type):
key = _make_key('service_type', service_type)
# Cinder did an evil thing where they defined a second service
# type in the catalog. Of course, that's insane, so let's hide this
# atrocity from the as-yet-unsullied eyes of our users.
# Of course, if the user requests a volumev2, that structure should
# still work.
# What's even more amazing is that they did it AGAIN with cinder v3
# And then I learned that mistral copied it.
# TODO(shade) This should get removed when we have os-service-types
# alias support landed in keystoneauth.
if service_type in ('volume', 'block-storage'):
vol_ver = self.get_api_version('volume')
if vol_ver and vol_ver.startswith('2'):
service_type = 'volumev2'
elif vol_ver and vol_ver.startswith('3'):
service_type = 'volumev3'
elif service_type == 'workflow':
wk_ver = self.get_api_version(service_type)
if wk_ver and wk_ver.startswith('2'):
service_type = 'workflowv2'
return self.config.get(key, service_type)
def get_service_name(self, service_type):
key = _make_key('service_name', service_type)
return self.config.get(key, None)
def get_endpoint(self, service_type):
key = _make_key('endpoint_override', service_type)
old_key = _make_key('endpoint', service_type)
return self.config.get(key, self.config.get(old_key, None))
@property
def prefer_ipv6(self):
return not self._force_ipv4
@property
def force_ipv4(self):
return self._force_ipv4
def get_auth(self):
"""Return a keystoneauth plugin from the auth credentials."""
return self._auth
def get_session(self):
"""Return a keystoneauth session based on the auth credentials."""
if self._keystone_session is None:
if not self._auth:
raise exceptions.OpenStackConfigException(
"Problem with auth parameters")
(verify, cert) = self.get_requests_verify_args()
# Turn off urllib3 warnings about insecure certs if we have
# explicitly configured requests to tell it we do not want
# cert verification
if not verify:
self.log.debug(
"Turning off SSL warnings for {cloud}:{region}"
" since verify=False".format(
cloud=self.name, region=self.region))
requestsexceptions.squelch_warnings(insecure_requests=not verify)
self._keystone_session = self._session_constructor(
auth=self._auth,
verify=verify,
cert=cert,
timeout=self.config['api_timeout'])
if hasattr(self._keystone_session, 'additional_user_agent'):
self._keystone_session.additional_user_agent.append(
('openstacksdk', openstack.__version__))
# Using old keystoneauth with new os-client-config fails if
# we pass in app_name and app_version. Those are not essential,
# nor a reason to bump our minimum, so just test for the session
# having the attribute post creation and set them then.
if hasattr(self._keystone_session, 'app_name'):
self._keystone_session.app_name = self._app_name
if hasattr(self._keystone_session, 'app_version'):
self._keystone_session.app_version = self._app_version
return self._keystone_session
def get_service_catalog(self):
"""Helper method to grab the service catalog."""
return self._auth.get_access(self.get_session()).service_catalog
def _get_version_args(self, service_key, version):
"""Translate OCC version args to those needed by ksa adapter.
If no version is requested explicitly and we have a configured version,
set the version parameter and let ksa deal with expanding that to
min=ver.0, max=ver.latest.
If version is set, pass it through.
If version is not set and we don't have a configured version, default
to latest.
"""
if version == 'latest':
return None, None, 'latest'
if not version:
version = self.get_api_version(service_key)
if not version:
return None, None, 'latest'
return version, None, None
def get_session_client(self, service_key, version=None):
"""Return a prepped requests adapter for a given service.
This is useful for making direct requests calls against a
'mounted' endpoint. That is, if you do:
client = get_session_client('compute')
then you can do:
client.get('/flavors')
and it will work like you think.
"""
(version, min_version, max_version) = self._get_version_args(
service_key, version)
return adapter.Adapter(
session=self.get_session(),
service_type=self.get_service_type(service_key),
service_name=self.get_service_name(service_key),
interface=self.get_interface(service_key),
region_name=self.get_region_name(service_key),
version=version,
min_version=min_version,
max_version=max_version)
def _get_highest_endpoint(self, service_types, kwargs):
session = self.get_session()
for service_type in service_types:
kwargs['service_type'] = service_type
try:
# Return the highest version we find that matches
# the request
return session.get_endpoint(**kwargs)
except keystoneauth1.exceptions.catalog.EndpointNotFound:
pass
def get_session_endpoint(
self, service_key, min_version=None, max_version=None):
"""Return the endpoint from config or the catalog.
If a configuration lists an explicit endpoint for a service,
return that. Otherwise, fetch the service catalog from the
keystone session and return the appropriate endpoint.
:param service_key: Generic key for service, such as 'compute' or
'network'
"""
override_endpoint = self.get_endpoint(service_key)
if override_endpoint:
return override_endpoint
endpoint = None
kwargs = {
'service_name': self.get_service_name(service_key),
'region_name': self.region
}
kwargs['interface'] = self.get_interface(service_key)
if service_key == 'volume' and not self.get_api_version('volume'):
# If we don't have a configured cinder version, we can't know
# to request a different service_type
min_version = float(min_version or 1)
max_version = float(max_version or 3)
min_major = math.trunc(float(min_version))
max_major = math.trunc(float(max_version))
versions = range(int(max_major) + 1, int(min_major), -1)
service_types = []
for version in versions:
if version == 1:
service_types.append('volume')
else:
service_types.append('volumev{v}'.format(v=version))
else:
service_types = [self.get_service_type(service_key)]
endpoint = self._get_highest_endpoint(service_types, kwargs)
if not endpoint:
self.log.warning(
"Keystone catalog entry not found ("
"service_type=%s,service_name=%s"
"interface=%s,region_name=%s)",
service_key,
kwargs['service_name'],
kwargs['interface'],
kwargs['region_name'])
return endpoint
def get_legacy_client(
self, service_key, client_class=None, interface_key=None,
pass_version_arg=True, version=None, min_version=None,
max_version=None, **kwargs):
"""Return a legacy OpenStack client object for the given config.
Most of the OpenStack python-*client libraries have the same
interface for their client constructors, but there are several
parameters one wants to pass given a :class:`CloudConfig` object.
In the future, OpenStack API consumption should be done through
the OpenStack SDK, but that's not ready yet. This is for getting
Client objects from python-*client only.
:param service_key: Generic key for service, such as 'compute' or
'network'
:param client_class: Class of the client to be instantiated. This
should be the unversioned version if there
is one, such as novaclient.client.Client, or
the versioned one, such as
neutronclient.v2_0.client.Client if there isn't
:param interface_key: (optional) Some clients, such as glanceclient
only accept the parameter 'interface' instead
of 'endpoint_type' - this is a get-out-of-jail
parameter for those until they can be aligned.
os-client-config understands this to be the
case if service_key is image, so this is really
only for use with other unknown broken clients.
:param pass_version_arg: (optional) If a versioned Client constructor
was passed to client_class, set this to
False, which will tell get_client to not
pass a version parameter. os-client-config
already understand that this is the
case for network, so it can be omitted in
that case.
:param version: (optional) Version string to override the configured
version string.
:param min_version: (options) Minimum version acceptable.
:param max_version: (options) Maximum version acceptable.
:param kwargs: (optional) keyword args are passed through to the
Client constructor, so this is in case anything
additional needs to be passed in.
"""
if not client_class:
client_class = _get_client(service_key)
interface = self.get_interface(service_key)
# trigger exception on lack of service
endpoint = self.get_session_endpoint(
service_key, min_version=min_version, max_version=max_version)
endpoint_override = self.get_endpoint(service_key)
if service_key == 'object-store':
constructor_kwargs = dict(
session=self.get_session(),
os_options=dict(
service_type=self.get_service_type(service_key),
object_storage_url=endpoint_override,
region_name=self.region))
else:
constructor_kwargs = dict(
session=self.get_session(),
service_name=self.get_service_name(service_key),
service_type=self.get_service_type(service_key),
endpoint_override=endpoint_override,
region_name=self.region)
if service_key == 'image':
# os-client-config does not depend on glanceclient, but if
# the user passed in glanceclient.client.Client, which they
# would need to do if they were requesting 'image' - then
# they necessarily have glanceclient installed
from glanceclient.common import utils as glance_utils
endpoint, detected_version = glance_utils.strip_version(endpoint)
# If the user has passed in a version, that's explicit, use it
if not version:
version = detected_version
# If the user has passed in or configured an override, use it.
# Otherwise, ALWAYS pass in an endpoint_override becuase
# we've already done version stripping, so we don't want version
# reconstruction to happen twice
if not endpoint_override:
constructor_kwargs['endpoint_override'] = endpoint
constructor_kwargs.update(kwargs)
if pass_version_arg and service_key != 'object-store':
if not version:
version = self.get_api_version(service_key)
if not version and service_key == 'volume':
from cinderclient import client as cinder_client
version = cinder_client.get_volume_api_from_url(endpoint)
# Temporary workaround while we wait for python-openstackclient
# to be able to handle 2.0 which is what neutronclient expects
if service_key == 'network' and version == '2':
version = '2.0'
if service_key == 'identity':
# Workaround for bug#1513839
if 'endpoint' not in constructor_kwargs:
endpoint = self.get_session_endpoint('identity')
constructor_kwargs['endpoint'] = endpoint
if service_key == 'network':
constructor_kwargs['api_version'] = version
elif service_key == 'baremetal':
if version != '1':
# Set Ironic Microversion
constructor_kwargs['os_ironic_api_version'] = version
# Version arg is the major version, not the full microstring
constructor_kwargs['version'] = version[0]
else:
constructor_kwargs['version'] = version
if min_version and min_version > float(version):
raise exceptions.OpenStackConfigVersionException(
"Minimum version {min_version} requested but {version}"
" found".format(min_version=min_version, version=version),
version=version)
if max_version and max_version < float(version):
raise exceptions.OpenStackConfigVersionException(
"Maximum version {max_version} requested but {version}"
" found".format(max_version=max_version, version=version),
version=version)
if service_key == 'database':
# TODO(mordred) Remove when https://review.openstack.org/314032
# has landed and released. We're passing in a Session, but the
# trove Client object has username and password as required
# args
constructor_kwargs['username'] = None
constructor_kwargs['password'] = None
if not interface_key:
if service_key in ('image', 'key-manager'):
interface_key = 'interface'
elif (service_key == 'identity'
and version and version.startswith('3')):
interface_key = 'interface'
else:
interface_key = 'endpoint_type'
if service_key == 'object-store':
constructor_kwargs['os_options'][interface_key] = interface
else:
constructor_kwargs[interface_key] = interface
return client_class(**constructor_kwargs)
def get_cache_expiration_time(self):
if self._openstack_config:
return self._openstack_config.get_cache_expiration_time()
def get_cache_path(self):
if self._openstack_config:
return self._openstack_config.get_cache_path()
def get_cache_class(self):
if self._openstack_config:
return self._openstack_config.get_cache_class()
def get_cache_arguments(self):
if self._openstack_config:
return self._openstack_config.get_cache_arguments()
def get_cache_expiration(self):
if self._openstack_config:
return self._openstack_config.get_cache_expiration()
def get_cache_resource_expiration(self, resource, default=None):
"""Get expiration time for a resource
:param resource: Name of the resource type
:param default: Default value to return if not found (optional,
defaults to None)
:returns: Expiration time for the resource type as float or default
"""
if self._openstack_config:
expiration = self._openstack_config.get_cache_expiration()
if resource not in expiration:
return default
return float(expiration[resource])
def requires_floating_ip(self):
"""Return whether or not this cloud requires floating ips.
:returns: True of False if know, None if discovery is needed.
If requires_floating_ip is not configured but the cloud is
known to not provide floating ips, will return False.
"""
if self.config['floating_ip_source'] == "None":
return False
return self.config.get('requires_floating_ip')
def get_external_networks(self):
"""Get list of network names for external networks."""
return [
net['name'] for net in self.config['networks']
if net['routes_externally']]
def get_external_ipv4_networks(self):
"""Get list of network names for external IPv4 networks."""
return [
net['name'] for net in self.config['networks']
if net['routes_ipv4_externally']]
def get_external_ipv6_networks(self):
"""Get list of network names for external IPv6 networks."""
return [
net['name'] for net in self.config['networks']
if net['routes_ipv6_externally']]
def get_internal_networks(self):
"""Get list of network names for internal networks."""
return [
net['name'] for net in self.config['networks']
if not net['routes_externally']]
def get_internal_ipv4_networks(self):
"""Get list of network names for internal IPv4 networks."""
return [
net['name'] for net in self.config['networks']
if not net['routes_ipv4_externally']]
def get_internal_ipv6_networks(self):
"""Get list of network names for internal IPv6 networks."""
return [
net['name'] for net in self.config['networks']
if not net['routes_ipv6_externally']]
def get_default_network(self):
"""Get network used for default interactions."""
for net in self.config['networks']:
if net['default_interface']:
return net['name']
return None
def get_nat_destination(self):
"""Get network used for NAT destination."""
for net in self.config['networks']:
if net['nat_destination']:
return net['name']
return None
def get_nat_source(self):
"""Get network used for NAT source."""
for net in self.config['networks']:
if net.get('nat_source'):
return net['name']
return None

View File

@ -0,0 +1,599 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import math
import warnings
from keystoneauth1 import adapter
import keystoneauth1.exceptions.catalog
from keystoneauth1 import session
import requestsexceptions
import openstack
from openstack import _log
from openstack.config import constructors
from openstack.config import exceptions
def _get_client(service_key):
class_mapping = constructors.get_constructor_mapping()
if service_key not in class_mapping:
raise exceptions.OpenStackConfigException(
"Service {service_key} is unkown. Please pass in a client"
" constructor or submit a patch to os-client-config".format(
service_key=service_key))
mod_name, ctr_name = class_mapping[service_key].rsplit('.', 1)
lib_name = mod_name.split('.')[0]
try:
mod = importlib.import_module(mod_name)
except ImportError:
raise exceptions.OpenStackConfigException(
"Client for '{service_key}' was requested, but"
" {mod_name} was unable to be imported. Either import"
" the module yourself and pass the constructor in as an argument,"
" or perhaps you do not have python-{lib_name} installed.".format(
service_key=service_key,
mod_name=mod_name,
lib_name=lib_name))
try:
ctr = getattr(mod, ctr_name)
except AttributeError:
raise exceptions.OpenStackConfigException(
"Client for '{service_key}' was requested, but although"
" {mod_name} imported fine, the constructor at {fullname}"
" as not found. Please check your installation, we have no"
" clue what is wrong with your computer.".format(
service_key=service_key,
mod_name=mod_name,
fullname=class_mapping[service_key]))
return ctr
def _make_key(key, service_type):
if not service_type:
return key
else:
service_type = service_type.lower().replace('-', '_')
return "_".join([service_type, key])
class CloudRegion(object):
"""The configuration for a Region of an OpenStack Cloud.
A CloudRegion encapsulates the config information needed for connections
to all of the services in a Region of a Cloud.
"""
def __init__(self, name, region, config,
force_ipv4=False, auth_plugin=None,
openstack_config=None, session_constructor=None,
app_name=None, app_version=None):
self.name = name
self.region = region
self.config = config
self.log = _log.setup_logging(__name__)
self._force_ipv4 = force_ipv4
self._auth = auth_plugin
self._openstack_config = openstack_config
self._keystone_session = None
self._session_constructor = session_constructor or session.Session
self._app_name = app_name
self._app_version = app_version
def __getattr__(self, key):
"""Return arbitrary attributes."""
if key.startswith('os_'):
key = key[3:]
if key in [attr.replace('-', '_') for attr in self.config]:
return self.config[key]
else:
return None
def __iter__(self):
return self.config.__iter__()
def __eq__(self, other):
return (self.name == other.name and self.region == other.region
and self.config == other.config)
def __ne__(self, other):
return not self == other
def set_session_constructor(self, session_constructor):
"""Sets the Session constructor."""
self._session_constructor = session_constructor
def get_requests_verify_args(self):
"""Return the verify and cert values for the requests library."""
if self.config['verify'] and self.config['cacert']:
verify = self.config['cacert']
else:
verify = self.config['verify']
if self.config['cacert']:
warnings.warn(
"You are specifying a cacert for the cloud {0} but "
"also to ignore the host verification. The host SSL cert "
"will not be verified.".format(self.name))
cert = self.config.get('cert', None)
if cert:
if self.config['key']:
cert = (cert, self.config['key'])
return (verify, cert)
def get_services(self):
"""Return a list of service types we know something about."""
services = []
for key, val in self.config.items():
if (key.endswith('api_version')
or key.endswith('service_type')
or key.endswith('service_name')):
services.append("_".join(key.split('_')[:-2]))
return list(set(services))
def get_auth_args(self):
return self.config['auth']
def get_interface(self, service_type=None):
key = _make_key('interface', service_type)
interface = self.config.get('interface')
return self.config.get(key, interface)
def get_region_name(self, service_type=None):
if not service_type:
return self.region
key = _make_key('region_name', service_type)
return self.config.get(key, self.region)
def get_api_version(self, service_type):
key = _make_key('api_version', service_type)
return self.config.get(key, None)
def get_service_type(self, service_type):
key = _make_key('service_type', service_type)
# Cinder did an evil thing where they defined a second service
# type in the catalog. Of course, that's insane, so let's hide this
# atrocity from the as-yet-unsullied eyes of our users.
# Of course, if the user requests a volumev2, that structure should
# still work.
# What's even more amazing is that they did it AGAIN with cinder v3
# And then I learned that mistral copied it.
# TODO(shade) This should get removed when we have os-service-types
# alias support landed in keystoneauth.
if service_type in ('volume', 'block-storage'):
vol_ver = self.get_api_version('volume')
if vol_ver and vol_ver.startswith('2'):
service_type = 'volumev2'
elif vol_ver and vol_ver.startswith('3'):
service_type = 'volumev3'
elif service_type == 'workflow':
wk_ver = self.get_api_version(service_type)
if wk_ver and wk_ver.startswith('2'):
service_type = 'workflowv2'
return self.config.get(key, service_type)
def get_service_name(self, service_type):
key = _make_key('service_name', service_type)
return self.config.get(key, None)
def get_endpoint(self, service_type):
key = _make_key('endpoint_override', service_type)
old_key = _make_key('endpoint', service_type)
return self.config.get(key, self.config.get(old_key, None))
@property
def prefer_ipv6(self):
return not self._force_ipv4
@property
def force_ipv4(self):
return self._force_ipv4
def get_auth(self):
"""Return a keystoneauth plugin from the auth credentials."""
return self._auth
def get_session(self):
"""Return a keystoneauth session based on the auth credentials."""
if self._keystone_session is None:
if not self._auth:
raise exceptions.OpenStackConfigException(
"Problem with auth parameters")
(verify, cert) = self.get_requests_verify_args()
# Turn off urllib3 warnings about insecure certs if we have
# explicitly configured requests to tell it we do not want
# cert verification
if not verify:
self.log.debug(
"Turning off SSL warnings for {cloud}:{region}"
" since verify=False".format(
cloud=self.name, region=self.region))
requestsexceptions.squelch_warnings(insecure_requests=not verify)
self._keystone_session = self._session_constructor(
auth=self._auth,
verify=verify,
cert=cert,
timeout=self.config['api_timeout'])
if hasattr(self._keystone_session, 'additional_user_agent'):
self._keystone_session.additional_user_agent.append(
('openstacksdk', openstack.__version__))
# Using old keystoneauth with new os-client-config fails if
# we pass in app_name and app_version. Those are not essential,
# nor a reason to bump our minimum, so just test for the session
# having the attribute post creation and set them then.
if hasattr(self._keystone_session, 'app_name'):
self._keystone_session.app_name = self._app_name
if hasattr(self._keystone_session, 'app_version'):
self._keystone_session.app_version = self._app_version
return self._keystone_session
def get_service_catalog(self):
"""Helper method to grab the service catalog."""
return self._auth.get_access(self.get_session()).service_catalog
def _get_version_args(self, service_key, version):
"""Translate OCC version args to those needed by ksa adapter.
If no version is requested explicitly and we have a configured version,
set the version parameter and let ksa deal with expanding that to
min=ver.0, max=ver.latest.
If version is set, pass it through.
If version is not set and we don't have a configured version, default
to latest.
"""
if version == 'latest':
return None, None, 'latest'
if not version:
version = self.get_api_version(service_key)
if not version:
return None, None, 'latest'
return version, None, None
def get_session_client(self, service_key, version=None):
"""Return a prepped requests adapter for a given service.
This is useful for making direct requests calls against a
'mounted' endpoint. That is, if you do:
client = get_session_client('compute')
then you can do:
client.get('/flavors')
and it will work like you think.
"""
(version, min_version, max_version) = self._get_version_args(
service_key, version)
return adapter.Adapter(
session=self.get_session(),
service_type=self.get_service_type(service_key),
service_name=self.get_service_name(service_key),
interface=self.get_interface(service_key),
region_name=self.get_region_name(service_key),
version=version,
min_version=min_version,
max_version=max_version)
def _get_highest_endpoint(self, service_types, kwargs):
session = self.get_session()
for service_type in service_types:
kwargs['service_type'] = service_type
try:
# Return the highest version we find that matches
# the request
return session.get_endpoint(**kwargs)
except keystoneauth1.exceptions.catalog.EndpointNotFound:
pass
def get_session_endpoint(
self, service_key, min_version=None, max_version=None):
"""Return the endpoint from config or the catalog.
If a configuration lists an explicit endpoint for a service,
return that. Otherwise, fetch the service catalog from the
keystone session and return the appropriate endpoint.
:param service_key: Generic key for service, such as 'compute' or
'network'
"""
override_endpoint = self.get_endpoint(service_key)
if override_endpoint:
return override_endpoint
endpoint = None
kwargs = {
'service_name': self.get_service_name(service_key),
'region_name': self.region
}
kwargs['interface'] = self.get_interface(service_key)
if service_key == 'volume' and not self.get_api_version('volume'):
# If we don't have a configured cinder version, we can't know
# to request a different service_type
min_version = float(min_version or 1)
max_version = float(max_version or 3)
min_major = math.trunc(float(min_version))
max_major = math.trunc(float(max_version))
versions = range(int(max_major) + 1, int(min_major), -1)
service_types = []
for version in versions:
if version == 1:
service_types.append('volume')
else:
service_types.append('volumev{v}'.format(v=version))
else:
service_types = [self.get_service_type(service_key)]
endpoint = self._get_highest_endpoint(service_types, kwargs)
if not endpoint:
self.log.warning(
"Keystone catalog entry not found ("
"service_type=%s,service_name=%s"
"interface=%s,region_name=%s)",
service_key,
kwargs['service_name'],
kwargs['interface'],
kwargs['region_name'])
return endpoint
def get_legacy_client(
self, service_key, client_class=None, interface_key=None,
pass_version_arg=True, version=None, min_version=None,
max_version=None, **kwargs):
"""Return a legacy OpenStack client object for the given config.
Most of the OpenStack python-*client libraries have the same
interface for their client constructors, but there are several
parameters one wants to pass given a :class:`CloudRegion` object.
In the future, OpenStack API consumption should be done through
the OpenStack SDK, but that's not ready yet. This is for getting
Client objects from python-*client only.
:param service_key: Generic key for service, such as 'compute' or
'network'
:param client_class: Class of the client to be instantiated. This
should be the unversioned version if there
is one, such as novaclient.client.Client, or
the versioned one, such as
neutronclient.v2_0.client.Client if there isn't
:param interface_key: (optional) Some clients, such as glanceclient
only accept the parameter 'interface' instead
of 'endpoint_type' - this is a get-out-of-jail
parameter for those until they can be aligned.
os-client-config understands this to be the
case if service_key is image, so this is really
only for use with other unknown broken clients.
:param pass_version_arg: (optional) If a versioned Client constructor
was passed to client_class, set this to
False, which will tell get_client to not
pass a version parameter. os-client-config
already understand that this is the
case for network, so it can be omitted in
that case.
:param version: (optional) Version string to override the configured
version string.
:param min_version: (options) Minimum version acceptable.
:param max_version: (options) Maximum version acceptable.
:param kwargs: (optional) keyword args are passed through to the
Client constructor, so this is in case anything
additional needs to be passed in.
"""
if not client_class:
client_class = _get_client(service_key)
interface = self.get_interface(service_key)
# trigger exception on lack of service
endpoint = self.get_session_endpoint(
service_key, min_version=min_version, max_version=max_version)
endpoint_override = self.get_endpoint(service_key)
if service_key == 'object-store':
constructor_kwargs = dict(
session=self.get_session(),
os_options=dict(
service_type=self.get_service_type(service_key),
object_storage_url=endpoint_override,
region_name=self.region))
else:
constructor_kwargs = dict(
session=self.get_session(),
service_name=self.get_service_name(service_key),
service_type=self.get_service_type(service_key),
endpoint_override=endpoint_override,
region_name=self.region)
if service_key == 'image':
# os-client-config does not depend on glanceclient, but if
# the user passed in glanceclient.client.Client, which they
# would need to do if they were requesting 'image' - then
# they necessarily have glanceclient installed
from glanceclient.common import utils as glance_utils
endpoint, detected_version = glance_utils.strip_version(endpoint)
# If the user has passed in a version, that's explicit, use it
if not version:
version = detected_version
# If the user has passed in or configured an override, use it.
# Otherwise, ALWAYS pass in an endpoint_override becuase
# we've already done version stripping, so we don't want version
# reconstruction to happen twice
if not endpoint_override:
constructor_kwargs['endpoint_override'] = endpoint
constructor_kwargs.update(kwargs)
if pass_version_arg and service_key != 'object-store':
if not version:
version = self.get_api_version(service_key)
if not version and service_key == 'volume':
from cinderclient import client as cinder_client
version = cinder_client.get_volume_api_from_url(endpoint)
# Temporary workaround while we wait for python-openstackclient
# to be able to handle 2.0 which is what neutronclient expects
if service_key == 'network' and version == '2':
version = '2.0'
if service_key == 'identity':
# Workaround for bug#1513839
if 'endpoint' not in constructor_kwargs:
endpoint = self.get_session_endpoint('identity')
constructor_kwargs['endpoint'] = endpoint
if service_key == 'network':
constructor_kwargs['api_version'] = version
elif service_key == 'baremetal':
if version != '1':
# Set Ironic Microversion
constructor_kwargs['os_ironic_api_version'] = version
# Version arg is the major version, not the full microstring
constructor_kwargs['version'] = version[0]
else:
constructor_kwargs['version'] = version
if min_version and min_version > float(version):
raise exceptions.OpenStackConfigVersionException(
"Minimum version {min_version} requested but {version}"
" found".format(min_version=min_version, version=version),
version=version)
if max_version and max_version < float(version):
raise exceptions.OpenStackConfigVersionException(
"Maximum version {max_version} requested but {version}"
" found".format(max_version=max_version, version=version),
version=version)
if service_key == 'database':
# TODO(mordred) Remove when https://review.openstack.org/314032
# has landed and released. We're passing in a Session, but the
# trove Client object has username and password as required
# args
constructor_kwargs['username'] = None
constructor_kwargs['password'] = None
if not interface_key:
if service_key in ('image', 'key-manager'):
interface_key = 'interface'
elif (service_key == 'identity'
and version and version.startswith('3')):
interface_key = 'interface'
else:
interface_key = 'endpoint_type'
if service_key == 'object-store':
constructor_kwargs['os_options'][interface_key] = interface
else:
constructor_kwargs[interface_key] = interface
return client_class(**constructor_kwargs)
def get_cache_expiration_time(self):
if self._openstack_config:
return self._openstack_config.get_cache_expiration_time()
def get_cache_path(self):
if self._openstack_config:
return self._openstack_config.get_cache_path()
def get_cache_class(self):
if self._openstack_config:
return self._openstack_config.get_cache_class()
def get_cache_arguments(self):
if self._openstack_config:
return self._openstack_config.get_cache_arguments()
def get_cache_expiration(self):
if self._openstack_config:
return self._openstack_config.get_cache_expiration()
def get_cache_resource_expiration(self, resource, default=None):
"""Get expiration time for a resource
:param resource: Name of the resource type
:param default: Default value to return if not found (optional,
defaults to None)
:returns: Expiration time for the resource type as float or default
"""
if self._openstack_config:
expiration = self._openstack_config.get_cache_expiration()
if resource not in expiration:
return default
return float(expiration[resource])
def requires_floating_ip(self):
"""Return whether or not this cloud requires floating ips.
:returns: True of False if know, None if discovery is needed.
If requires_floating_ip is not configured but the cloud is
known to not provide floating ips, will return False.
"""
if self.config['floating_ip_source'] == "None":
return False
return self.config.get('requires_floating_ip')
def get_external_networks(self):
"""Get list of network names for external networks."""
return [
net['name'] for net in self.config['networks']
if net['routes_externally']]
def get_external_ipv4_networks(self):
"""Get list of network names for external IPv4 networks."""
return [
net['name'] for net in self.config['networks']
if net['routes_ipv4_externally']]
def get_external_ipv6_networks(self):
"""Get list of network names for external IPv6 networks."""
return [
net['name'] for net in self.config['networks']
if net['routes_ipv6_externally']]
def get_internal_networks(self):
"""Get list of network names for internal networks."""
return [
net['name'] for net in self.config['networks']
if not net['routes_externally']]
def get_internal_ipv4_networks(self):
"""Get list of network names for internal IPv4 networks."""
return [
net['name'] for net in self.config['networks']
if not net['routes_ipv4_externally']]
def get_internal_ipv6_networks(self):
"""Get list of network names for internal IPv6 networks."""
return [
net['name'] for net in self.config['networks']
if not net['routes_ipv6_externally']]
def get_default_network(self):
"""Get network used for default interactions."""
for net in self.config['networks']:
if net['default_interface']:
return net['name']
return None
def get_nat_destination(self):
"""Get network used for NAT destination."""
for net in self.config['networks']:
if net['nat_destination']:
return net['name']
return None
def get_nat_source(self):
"""Get network used for NAT source."""
for net in self.config['networks']:
if net.get('nat_source'):
return net['name']
return None

View File

@ -29,7 +29,7 @@ from keystoneauth1 import loading
import yaml
from openstack import _log
from openstack.config import cloud_config
from openstack.config import cloud_region
from openstack.config import defaults
from openstack.config import exceptions
from openstack.config import vendors
@ -707,7 +707,7 @@ class OpenStackConfig(object):
# for from the user passing it explicitly. We'll stash it for later
local_parser.add_argument('--timeout', metavar='<timeout>')
# We need for get_one_cloud to be able to peek at whether a token
# We need for get_one to be able to peek at whether a token
# was passed so that we can swap the default from password to
# token if it was. And we need to also peek for --os-auth-token
# for novaclient backwards compat
@ -729,8 +729,8 @@ class OpenStackConfig(object):
# the rest of the arguments given are invalid for the plugin
# chosen (for instance, --help may be requested, so that the
# user can see what options he may want to give
cloud = self.get_one_cloud(argparse=options, validate=False)
default_auth_type = cloud.config['auth_type']
cloud_region = self.get_one(argparse=options, validate=False)
default_auth_type = cloud_region.config['auth_type']
try:
loading.register_auth_argparse_arguments(
@ -802,16 +802,18 @@ class OpenStackConfig(object):
new_cloud['api_timeout'] = new_cloud.pop('timeout')
return new_cloud
def get_all_clouds(self):
def get_all(self):
clouds = []
for cloud in self.get_cloud_names():
for region in self._get_regions(cloud):
if region:
clouds.append(self.get_one_cloud(
clouds.append(self.get_one(
cloud, region_name=region['name']))
return clouds
# TODO(mordred) Backwards compat for OSC transition
get_all_clouds = get_all
def _fix_args(self, args=None, argparse=None):
"""Massage the passed-in options
@ -1022,9 +1024,9 @@ class OpenStackConfig(object):
return config
def get_one_cloud(self, cloud=None, validate=True,
argparse=None, **kwargs):
"""Retrieve a single cloud configuration and merge additional options
def get_one(
self, cloud=None, validate=True, argparse=None, **kwargs):
"""Retrieve a single CloudRegion and merge additional options
:param string cloud:
The name of the configuration to load from clouds.yaml
@ -1038,6 +1040,7 @@ class OpenStackConfig(object):
:param region_name: Name of the region of the cloud.
:param kwargs: Additional configuration options
:returns: openstack.config.cloud_region.CloudRegion
:raises: keystoneauth1.exceptions.MissingRequiredOptions
on missing required auth parameters
"""
@ -1101,7 +1104,7 @@ class OpenStackConfig(object):
cloud_name = ''
else:
cloud_name = str(cloud)
return cloud_config.CloudConfig(
return cloud_region.CloudRegion(
name=cloud_name,
region=config['region_name'],
config=config,
@ -1112,6 +1115,8 @@ class OpenStackConfig(object):
app_name=self._app_name,
app_version=self._app_version,
)
# TODO(mordred) Backwards compat for OSC transition
get_one_cloud = get_one
def get_one_cloud_osc(
self,
@ -1120,7 +1125,7 @@ class OpenStackConfig(object):
argparse=None,
**kwargs
):
"""Retrieve a single cloud configuration and merge additional options
"""Retrieve a single CloudRegion and merge additional options
:param string cloud:
The name of the configuration to load from clouds.yaml
@ -1196,7 +1201,7 @@ class OpenStackConfig(object):
cloud_name = ''
else:
cloud_name = str(cloud)
return cloud_config.CloudConfig(
return cloud_region.CloudRegion(
name=cloud_name,
region=config['region_name'],
config=self._normalize_keys(config),

View File

@ -31,13 +31,13 @@ settings in your clouds.yaml file and refer to them by name.::
conn = connection.Connection(cloud='example', region_name='earth1')
If you already have an :class:`~openstack.config.cloud_config.CloudConfig`
If you already have an :class:`~openstack.config.cloud_region.CloudRegion`
you can pass it in instead.::
from openstack import connection
import openstack.config
config = openstack.config.OpenStackConfig.get_one_cloud(
config = openstack.config.OpenStackConfig.get_one(
cloud='example', region_name='earth')
conn = connection.Connection(config=config)
@ -80,9 +80,10 @@ import sys
import keystoneauth1.exceptions
import os_service_types
from six.moves import urllib
import openstack.config
from openstack.config import cloud_config
from openstack.config import cloud_region
from openstack import exceptions
from openstack import proxy
from openstack import proxy2
@ -92,36 +93,36 @@ from openstack import utils
_logger = logging.getLogger(__name__)
def from_config(cloud_name=None, cloud_config=None, options=None):
def from_config(cloud=None, config=None, options=None, **kwargs):
"""Create a Connection using openstack.config
:param str cloud_name: Use the `cloud_name` configuration details when
creating the Connection instance.
:param cloud_config: An instance of
`openstack.config.loader.OpenStackConfig`
as returned from openstack.config.
If no `config` is provided,
`openstack.config.OpenStackConfig` will be called,
and the provided `cloud_name` will be used in
determining which cloud's configuration details
will be used in creation of the
`Connection` instance.
:param options: A namespace object; allows direct passing in of options to
be added to the cloud config. This does not have to be an
instance of argparse.Namespace, despite the naming of the
the `openstack.config.loader.OpenStackConfig.get_one_cloud`
argument to which it is passed.
:param str cloud:
Use the `cloud` configuration details when creating the Connection.
:param openstack.config.cloud_region.CloudRegion config:
An existing CloudRegion configuration. If no `config` is provided,
`openstack.config.OpenStackConfig` will be called, and the provided
`name` will be used in determining which cloud's configuration
details will be used in creation of the `Connection` instance.
:param argparse.Namespace options:
Allows direct passing in of options to be added to the cloud config.
This does not have to be an actual instance of argparse.Namespace,
despite the naming of the the
`openstack.config.loader.OpenStackConfig.get_one` argument to which
it is passed.
:rtype: :class:`~openstack.connection.Connection`
"""
if cloud_config is None:
occ = openstack.config.OpenStackConfig()
cloud_config = occ.get_one_cloud(cloud=cloud_name, argparse=options)
# TODO(mordred) Backwards compat while we transition
cloud = cloud or kwargs.get('cloud_name')
config = config or kwargs.get('cloud_config')
if config is None:
config = openstack.config.OpenStackConfig().get_one(
cloud=cloud, argparse=options)
if cloud_config.debug:
if config.debug:
utils.enable_logging(True, stream=sys.stdout)
return Connection(config=cloud_config)
return Connection(config=config)
class Connection(object):
@ -142,18 +143,18 @@ class Connection(object):
name ``envvars`` may be used to consume a cloud configured via ``OS_``
environment variables.
A pre-existing :class:`~openstack.config.cloud_config.CloudConfig`
A pre-existing :class:`~openstack.config.cloud_region.CloudRegion`
object can be passed in lieu of a cloud name, for cases where the user
already has a fully formed CloudConfig and just wants to use it.
already has a fully formed CloudRegion and just wants to use it.
Similarly, if for some reason the user already has a
:class:`~keystoneauth1.session.Session` and wants to use it, it may be
passed in.
:param str cloud: Name of the cloud from config to use.
:param config: CloudConfig object representing the config for the
:param config: CloudRegion object representing the config for the
region of the cloud in question.
:type config: :class:`~openstack.config.cloud_config.CloudConfig`
:type config: :class:`~openstack.config.cloud_region.CloudRegion`
:param session: A session object compatible with
:class:`~keystoneauth1.session.Session`.
:type session: :class:`~keystoneauth1.session.Session`
@ -168,22 +169,22 @@ class Connection(object):
transition.
:param kwargs: If a config is not provided, the rest of the parameters
provided are assumed to be arguments to be passed to the
CloudConfig contructor.
CloudRegion contructor.
"""
self.config = config
self.service_type_manager = os_service_types.ServiceTypes()
if not self.config:
openstack_config = openstack.config.OpenStackConfig(
app_name=app_name, app_version=app_version,
load_yaml_config=profile is None)
if profile:
# TODO(shade) Remove this once we've shifted
# python-openstackclient to not use the profile interface.
self.config = self._get_config_from_profile(
openstack_config, profile, authenticator, **kwargs)
profile, authenticator, **kwargs)
else:
self.config = openstack_config.get_one_cloud(
openstack_config = openstack.config.OpenStackConfig(
app_name=app_name, app_version=app_version,
load_yaml_config=profile is None)
self.config = openstack_config.get_one(
cloud=cloud, validate=session is None, **kwargs)
self.task_manager = task_manager.TaskManager(
@ -197,30 +198,32 @@ class Connection(object):
self._open()
def _get_config_from_profile(
self, openstack_config, profile, authenticator, **kwargs):
def _get_config_from_profile(self, profile, authenticator, **kwargs):
"""Get openstack.config objects from legacy profile."""
# TODO(shade) Remove this once we've shifted python-openstackclient
# to not use the profile interface.
config = openstack_config.get_one_cloud(
cloud='defaults', validate=False, **kwargs)
config._auth = authenticator
# We don't have a cloud name. Make one up from the auth_url hostname
# so that log messages work.
name = urllib.parse.urlparse(authenticator.auth_url).hostname
region_name = None
for service in profile.get_services():
if service.region:
region_name = service.region
service_type = service.service_type
if service.interface:
key = cloud_config._make_key('interface', service_type)
config.config[key] = service.interface
if service.region:
key = cloud_config._make_key('region_name', service_type)
config.config[key] = service.region
key = cloud_region._make_key('interface', service_type)
kwargs[key] = service.interface
if service.version:
version = service.version
if version.startswith('v'):
version = version[1:]
key = cloud_config._make_key('api_version', service_type)
config.config[key] = service.version
return config
key = cloud_region._make_key('api_version', service_type)
kwargs[key] = service.version
config = cloud_region.CloudRegion(
name=name, region=region_name, config=kwargs)
config._auth = authenticator
def _open(self):
"""Open the connection. """

View File

@ -22,24 +22,16 @@ from openstack.tests import base
#: file, typically in $HOME/.config/openstack/clouds.yaml. That configuration
#: will determine where the functional tests will be run and what resource
#: defaults will be used to run the functional tests.
TEST_CLOUD = os.getenv('OS_CLOUD', 'devstack-admin')
class Opts(object):
def __init__(self, cloud_name='devstack-admin', debug=False):
self.cloud = cloud_name
self.debug = debug
TEST_CLOUD_NAME = os.getenv('OS_CLOUD', 'devstack-admin')
TEST_CLOUD_REGION = openstack.config.get_config(cloud=TEST_CLOUD_NAME)
def _get_resource_value(resource_key, default):
try:
return cloud.config['functional'][resource_key]
return TEST_CLOUD_REGION.config['functional'][resource_key]
except KeyError:
return default
opts = Opts(cloud_name=TEST_CLOUD)
occ = openstack.config.OpenStackConfig()
cloud = occ.get_one_cloud(opts.cloud, argparse=opts)
IMAGE_NAME = _get_resource_value('image_name', 'cirros-0.3.5-x86_64-disk')
FLAVOR_NAME = _get_resource_value('flavor_name', 'm1.small')
@ -49,7 +41,7 @@ class BaseFunctionalTest(base.TestCase):
def setUp(self):
super(BaseFunctionalTest, self).setUp()
self.conn = connection.from_config(cloud_name=TEST_CLOUD)
self.conn = connection.Connection(config=TEST_CLOUD_REGION)
def addEmptyCleanup(self, func, *args, **kwargs):
def cleanup():

View File

@ -36,14 +36,14 @@ class BaseFunctionalTestCase(base.TestCase):
self.operator_cloud.cloud_config.get_api_version('identity')
def _set_user_cloud(self, **kwargs):
user_config = self.config.get_one_cloud(
user_config = self.config.get_one(
cloud=self._demo_name, **kwargs)
self.user_cloud = openstack.OpenStackCloud(
cloud_config=user_config,
log_inner_exceptions=True)
def _set_operator_cloud(self, **kwargs):
operator_config = self.config.get_one_cloud(
operator_config = self.config.get_one(
cloud=self._op_name, **kwargs)
self.operator_cloud = openstack.OperatorCloud(
cloud_config=operator_config,

View File

@ -26,7 +26,7 @@ class TestImage(base.BaseFunctionalTest):
super(TestImage, self).setUp()
opts = self.ImageOpts()
self.conn = connection.from_config(
cloud_name=base.TEST_CLOUD, options=opts)
cloud_name=base.TEST_CLOUD_NAME, options=opts)
self.img = self.conn.image.upload_image(
name=TEST_IMAGE_NAME,

View File

@ -117,7 +117,7 @@ class BaseTestCase(base.TestCase):
config_files=[config.name],
vendor_files=[vendor.name],
secure_files=['non-existant'])
self.cloud_config = self.config.get_one_cloud(
self.cloud_config = self.config.get_one(
cloud=test_cloud, validate=False)
self.cloud = openstack.OpenStackCloud(
cloud_config=self.cloud_config,
@ -141,7 +141,7 @@ class TestCase(BaseTestCase):
super(TestCase, self).setUp(cloud_config_fixture=cloud_config_fixture)
self.session_fixture = self.useFixture(fixtures.MonkeyPatch(
'openstack.config.cloud_config.CloudConfig.get_session',
'openstack.config.cloud_region.CloudRegion.get_session',
mock.Mock()))
@ -461,7 +461,7 @@ class RequestsMockTestCase(BaseTestCase):
def _make_test_cloud(self, cloud_name='_test_cloud_', **kwargs):
test_cloud = os.environ.get('OPENSTACKSDK_OS_CLOUD', cloud_name)
self.cloud_config = self.config.get_one_cloud(
self.cloud_config = self.config.get_one(
cloud=test_cloud, validate=True, **kwargs)
self.conn = openstack.connection.Connection(
config=self.cloud_config)

View File

@ -28,7 +28,7 @@ class TestInventory(base.TestCase):
@mock.patch("openstack.config.loader.OpenStackConfig")
@mock.patch("openstack.OpenStackCloud")
def test__init(self, mock_cloud, mock_config):
mock_config.return_value.get_all_clouds.return_value = [{}]
mock_config.return_value.get_all.return_value = [{}]
inv = inventory.OpenStackInventory()
@ -37,12 +37,12 @@ class TestInventory(base.TestCase):
)
self.assertIsInstance(inv.clouds, list)
self.assertEqual(1, len(inv.clouds))
self.assertTrue(mock_config.return_value.get_all_clouds.called)
self.assertTrue(mock_config.return_value.get_all.called)
@mock.patch("openstack.config.loader.OpenStackConfig")
@mock.patch("openstack.OpenStackCloud")
def test__init_one_cloud(self, mock_cloud, mock_config):
mock_config.return_value.get_one_cloud.return_value = [{}]
mock_config.return_value.get_one.return_value = [{}]
inv = inventory.OpenStackInventory(cloud='supercloud')
@ -51,8 +51,8 @@ class TestInventory(base.TestCase):
)
self.assertIsInstance(inv.clouds, list)
self.assertEqual(1, len(inv.clouds))
self.assertFalse(mock_config.return_value.get_all_clouds.called)
mock_config.return_value.get_one_cloud.assert_called_once_with(
self.assertFalse(mock_config.return_value.get_all.called)
mock_config.return_value.get_one.assert_called_once_with(
'supercloud')
@mock.patch("openstack.config.loader.OpenStackConfig")
@ -62,19 +62,19 @@ class TestInventory(base.TestCase):
Test that when os-client-config can't find a named cloud, a
shade exception is emitted.
"""
mock_config.return_value.get_one_cloud.side_effect = (
mock_config.return_value.get_one.side_effect = (
occ_exc.OpenStackConfigException()
)
self.assertRaises(exc.OpenStackCloudException,
inventory.OpenStackInventory,
cloud='supercloud')
mock_config.return_value.get_one_cloud.assert_called_once_with(
mock_config.return_value.get_one.assert_called_once_with(
'supercloud')
@mock.patch("openstack.config.loader.OpenStackConfig")
@mock.patch("openstack.OpenStackCloud")
def test_list_hosts(self, mock_cloud, mock_config):
mock_config.return_value.get_all_clouds.return_value = [{}]
mock_config.return_value.get_all.return_value = [{}]
inv = inventory.OpenStackInventory()
@ -93,7 +93,7 @@ class TestInventory(base.TestCase):
@mock.patch("openstack.config.loader.OpenStackConfig")
@mock.patch("openstack.OpenStackCloud")
def test_list_hosts_no_detail(self, mock_cloud, mock_config):
mock_config.return_value.get_all_clouds.return_value = [{}]
mock_config.return_value.get_all.return_value = [{}]
inv = inventory.OpenStackInventory()
@ -112,7 +112,7 @@ class TestInventory(base.TestCase):
@mock.patch("openstack.config.loader.OpenStackConfig")
@mock.patch("openstack.OpenStackCloud")
def test_search_hosts(self, mock_cloud, mock_config):
mock_config.return_value.get_all_clouds.return_value = [{}]
mock_config.return_value.get_all.return_value = [{}]
inv = inventory.OpenStackInventory()
@ -128,7 +128,7 @@ class TestInventory(base.TestCase):
@mock.patch("openstack.config.loader.OpenStackConfig")
@mock.patch("openstack.OpenStackCloud")
def test_get_host(self, mock_cloud, mock_config):
mock_config.return_value.get_all_clouds.return_value = [{}]
mock_config.return_value.get_all.return_value = [{}]
inv = inventory.OpenStackInventory()

View File

@ -15,7 +15,7 @@ import testtools
import openstack
from openstack.cloud import exc
from openstack.config import cloud_config
from openstack.config import cloud_region
from openstack.tests import fakes
from openstack.tests.unit import base
@ -25,13 +25,13 @@ class TestOperatorCloud(base.RequestsMockTestCase):
def test_operator_cloud(self):
self.assertIsInstance(self.op_cloud, openstack.OperatorCloud)
@mock.patch.object(cloud_config.CloudConfig, 'get_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_endpoint')
def test_get_session_endpoint_provided(self, fake_get_endpoint):
fake_get_endpoint.return_value = 'http://fake.url'
self.assertEqual(
'http://fake.url', self.op_cloud.get_session_endpoint('image'))
@mock.patch.object(cloud_config.CloudConfig, 'get_session')
@mock.patch.object(cloud_region.CloudRegion, 'get_session')
def test_get_session_endpoint_session(self, get_session_mock):
session_mock = mock.Mock()
session_mock.get_endpoint.return_value = 'http://fake.url'
@ -39,7 +39,7 @@ class TestOperatorCloud(base.RequestsMockTestCase):
self.assertEqual(
'http://fake.url', self.op_cloud.get_session_endpoint('image'))
@mock.patch.object(cloud_config.CloudConfig, 'get_session')
@mock.patch.object(cloud_region.CloudRegion, 'get_session')
def test_get_session_endpoint_exception(self, get_session_mock):
class FakeException(Exception):
pass
@ -57,7 +57,7 @@ class TestOperatorCloud(base.RequestsMockTestCase):
" No service"):
self.op_cloud.get_session_endpoint("image")
@mock.patch.object(cloud_config.CloudConfig, 'get_session')
@mock.patch.object(cloud_region.CloudRegion, 'get_session')
def test_get_session_endpoint_unavailable(self, get_session_mock):
session_mock = mock.Mock()
session_mock.get_endpoint.return_value = None
@ -65,7 +65,7 @@ class TestOperatorCloud(base.RequestsMockTestCase):
image_endpoint = self.op_cloud.get_session_endpoint("image")
self.assertIsNone(image_endpoint)
@mock.patch.object(cloud_config.CloudConfig, 'get_session')
@mock.patch.object(cloud_region.CloudRegion, 'get_session')
def test_get_session_endpoint_identity(self, get_session_mock):
session_mock = mock.Mock()
get_session_mock.return_value = session_mock
@ -76,14 +76,14 @@ class TestOperatorCloud(base.RequestsMockTestCase):
session_mock.get_endpoint.assert_called_with(**kwargs)
@mock.patch.object(cloud_config.CloudConfig, 'get_session')
@mock.patch.object(cloud_region.CloudRegion, 'get_session')
def test_has_service_no(self, get_session_mock):
session_mock = mock.Mock()
session_mock.get_endpoint.return_value = None
get_session_mock.return_value = session_mock
self.assertFalse(self.op_cloud.has_service("image"))
@mock.patch.object(cloud_config.CloudConfig, 'get_session')
@mock.patch.object(cloud_region.CloudRegion, 'get_session')
def test_has_service_yes(self, get_session_mock):
session_mock = mock.Mock()
session_mock.get_endpoint.return_value = 'http://fake.url'

View File

@ -20,7 +20,7 @@ import copy
import os
import tempfile
from openstack.config import cloud_config
from openstack.config import cloud_region
import extras
import fixtures
@ -227,7 +227,7 @@ class TestCase(base.BaseTestCase):
self.useFixture(fixtures.EnvironmentVariable(env))
def _assert_cloud_details(self, cc):
self.assertIsInstance(cc, cloud_config.CloudConfig)
self.assertIsInstance(cc, cloud_region.CloudRegion)
self.assertTrue(extras.safe_hasattr(cc, 'auth'))
self.assertIsInstance(cc.auth, dict)
self.assertIsNone(cc.cloud)

View File

@ -16,7 +16,7 @@ from keystoneauth1 import exceptions as ksa_exceptions
from keystoneauth1 import session as ksa_session
import mock
from openstack.config import cloud_config
from openstack.config import cloud_region
from openstack.config import defaults
from openstack.config import exceptions
from openstack.tests.unit.config import base
@ -37,10 +37,10 @@ fake_services_dict = {
}
class TestCloudConfig(base.TestCase):
class TestCloudRegion(base.TestCase):
def test_arbitrary_attributes(self):
cc = cloud_config.CloudConfig("test1", "region-al", fake_config_dict)
cc = cloud_region.CloudRegion("test1", "region-al", fake_config_dict)
self.assertEqual("test1", cc.name)
self.assertEqual("region-al", cc.region)
@ -61,25 +61,25 @@ class TestCloudConfig(base.TestCase):
self.assertFalse(cc.force_ipv4)
def test_iteration(self):
cc = cloud_config.CloudConfig("test1", "region-al", fake_config_dict)
cc = cloud_region.CloudRegion("test1", "region-al", fake_config_dict)
self.assertTrue('a' in cc)
self.assertFalse('x' in cc)
def test_equality(self):
cc1 = cloud_config.CloudConfig("test1", "region-al", fake_config_dict)
cc2 = cloud_config.CloudConfig("test1", "region-al", fake_config_dict)
cc1 = cloud_region.CloudRegion("test1", "region-al", fake_config_dict)
cc2 = cloud_region.CloudRegion("test1", "region-al", fake_config_dict)
self.assertEqual(cc1, cc2)
def test_inequality(self):
cc1 = cloud_config.CloudConfig("test1", "region-al", fake_config_dict)
cc1 = cloud_region.CloudRegion("test1", "region-al", fake_config_dict)
cc2 = cloud_config.CloudConfig("test2", "region-al", fake_config_dict)
cc2 = cloud_region.CloudRegion("test2", "region-al", fake_config_dict)
self.assertNotEqual(cc1, cc2)
cc2 = cloud_config.CloudConfig("test1", "region-xx", fake_config_dict)
cc2 = cloud_region.CloudRegion("test1", "region-xx", fake_config_dict)
self.assertNotEqual(cc1, cc2)
cc2 = cloud_config.CloudConfig("test1", "region-al", {})
cc2 = cloud_region.CloudRegion("test1", "region-al", {})
self.assertNotEqual(cc1, cc2)
def test_verify(self):
@ -87,12 +87,12 @@ class TestCloudConfig(base.TestCase):
config_dict['cacert'] = None
config_dict['verify'] = False
cc = cloud_config.CloudConfig("test1", "region-xx", config_dict)
cc = cloud_region.CloudRegion("test1", "region-xx", config_dict)
(verify, cert) = cc.get_requests_verify_args()
self.assertFalse(verify)
config_dict['verify'] = True
cc = cloud_config.CloudConfig("test1", "region-xx", config_dict)
cc = cloud_region.CloudRegion("test1", "region-xx", config_dict)
(verify, cert) = cc.get_requests_verify_args()
self.assertTrue(verify)
@ -101,12 +101,12 @@ class TestCloudConfig(base.TestCase):
config_dict['cacert'] = "certfile"
config_dict['verify'] = False
cc = cloud_config.CloudConfig("test1", "region-xx", config_dict)
cc = cloud_region.CloudRegion("test1", "region-xx", config_dict)
(verify, cert) = cc.get_requests_verify_args()
self.assertFalse(verify)
config_dict['verify'] = True
cc = cloud_config.CloudConfig("test1", "region-xx", config_dict)
cc = cloud_region.CloudRegion("test1", "region-xx", config_dict)
(verify, cert) = cc.get_requests_verify_args()
self.assertEqual("certfile", verify)
@ -118,17 +118,17 @@ class TestCloudConfig(base.TestCase):
config_dict['cert'] = 'cert'
config_dict['key'] = 'key'
cc = cloud_config.CloudConfig("test1", "region-xx", config_dict)
cc = cloud_region.CloudRegion("test1", "region-xx", config_dict)
(verify, cert) = cc.get_requests_verify_args()
self.assertEqual(("cert", "key"), cert)
def test_ipv6(self):
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", fake_config_dict, force_ipv4=True)
self.assertTrue(cc.force_ipv4)
def test_getters(self):
cc = cloud_config.CloudConfig("test1", "region-al", fake_services_dict)
cc = cloud_region.CloudRegion("test1", "region-al", fake_services_dict)
self.assertEqual(['compute', 'identity', 'image', 'volume'],
sorted(cc.get_services()))
@ -153,23 +153,23 @@ class TestCloudConfig(base.TestCase):
self.assertEqual('locks', cc.get_service_name('identity'))
def test_volume_override(self):
cc = cloud_config.CloudConfig("test1", "region-al", fake_services_dict)
cc = cloud_region.CloudRegion("test1", "region-al", fake_services_dict)
cc.config['volume_api_version'] = '2'
self.assertEqual('volumev2', cc.get_service_type('volume'))
def test_volume_override_v3(self):
cc = cloud_config.CloudConfig("test1", "region-al", fake_services_dict)
cc = cloud_region.CloudRegion("test1", "region-al", fake_services_dict)
cc.config['volume_api_version'] = '3'
self.assertEqual('volumev3', cc.get_service_type('volume'))
def test_workflow_override_v2(self):
cc = cloud_config.CloudConfig("test1", "region-al", fake_services_dict)
cc = cloud_region.CloudRegion("test1", "region-al", fake_services_dict)
cc.config['workflow_api_version'] = '2'
self.assertEqual('workflowv2', cc.get_service_type('workflow'))
def test_no_override(self):
"""Test no override happens when defaults are not configured"""
cc = cloud_config.CloudConfig("test1", "region-al", fake_services_dict)
cc = cloud_region.CloudRegion("test1", "region-al", fake_services_dict)
self.assertEqual('volume', cc.get_service_type('volume'))
self.assertEqual('workflow', cc.get_service_type('workflow'))
self.assertEqual('not-exist', cc.get_service_type('not-exist'))
@ -177,7 +177,7 @@ class TestCloudConfig(base.TestCase):
def test_get_session_no_auth(self):
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
cc = cloud_config.CloudConfig("test1", "region-al", config_dict)
cc = cloud_region.CloudRegion("test1", "region-al", config_dict)
self.assertRaises(
exceptions.OpenStackConfigException,
cc.get_session)
@ -189,7 +189,7 @@ class TestCloudConfig(base.TestCase):
fake_session = mock.Mock()
fake_session.additional_user_agent = []
mock_session.return_value = fake_session
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_session()
mock_session.assert_called_with(
@ -208,7 +208,7 @@ class TestCloudConfig(base.TestCase):
fake_session.app_name = None
fake_session.app_version = None
mock_session.return_value = fake_session
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock(),
app_name="test_app", app_version="test_version")
cc.get_session()
@ -229,7 +229,7 @@ class TestCloudConfig(base.TestCase):
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
config_dict['api_timeout'] = 9
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_session()
mock_session.assert_called_with(
@ -243,7 +243,7 @@ class TestCloudConfig(base.TestCase):
def test_override_session_endpoint_override(self, mock_session):
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
self.assertEqual(
cc.get_session_endpoint('compute'),
@ -253,19 +253,19 @@ class TestCloudConfig(base.TestCase):
def test_override_session_endpoint(self, mock_session):
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
self.assertEqual(
cc.get_session_endpoint('telemetry'),
fake_services_dict['telemetry_endpoint'])
@mock.patch.object(cloud_config.CloudConfig, 'get_session')
@mock.patch.object(cloud_region.CloudRegion, 'get_session')
def test_session_endpoint(self, mock_get_session):
mock_session = mock.Mock()
mock_get_session.return_value = mock_session
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_session_endpoint('orchestration')
mock_session.get_endpoint.assert_called_with(
@ -274,17 +274,17 @@ class TestCloudConfig(base.TestCase):
region_name='region-al',
service_type='orchestration')
@mock.patch.object(cloud_config.CloudConfig, 'get_session')
@mock.patch.object(cloud_region.CloudRegion, 'get_session')
def test_session_endpoint_not_found(self, mock_get_session):
exc_to_raise = ksa_exceptions.catalog.EndpointNotFound
mock_get_session.return_value.get_endpoint.side_effect = exc_to_raise
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", {}, auth_plugin=mock.Mock())
self.assertIsNone(cc.get_session_endpoint('notfound'))
@mock.patch.object(cloud_config.CloudConfig, 'get_api_version')
@mock.patch.object(cloud_config.CloudConfig, 'get_auth_args')
@mock.patch.object(cloud_config.CloudConfig, 'get_session_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_api_version')
@mock.patch.object(cloud_region.CloudRegion, 'get_auth_args')
@mock.patch.object(cloud_region.CloudRegion, 'get_session_endpoint')
def test_legacy_client_object_store_password(
self,
mock_get_session_endpoint,
@ -301,7 +301,7 @@ class TestCloudConfig(base.TestCase):
)
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('object-store', mock_client)
mock_client.assert_called_with(
@ -313,8 +313,8 @@ class TestCloudConfig(base.TestCase):
'endpoint_type': 'public',
})
@mock.patch.object(cloud_config.CloudConfig, 'get_auth_args')
@mock.patch.object(cloud_config.CloudConfig, 'get_session_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_auth_args')
@mock.patch.object(cloud_region.CloudRegion, 'get_session_endpoint')
def test_legacy_client_object_store_password_v2(
self, mock_get_session_endpoint, mock_get_auth_args):
mock_client = mock.Mock()
@ -327,7 +327,7 @@ class TestCloudConfig(base.TestCase):
)
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('object-store', mock_client)
mock_client.assert_called_with(
@ -339,8 +339,8 @@ class TestCloudConfig(base.TestCase):
'endpoint_type': 'public',
})
@mock.patch.object(cloud_config.CloudConfig, 'get_auth_args')
@mock.patch.object(cloud_config.CloudConfig, 'get_session_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_auth_args')
@mock.patch.object(cloud_region.CloudRegion, 'get_session_endpoint')
def test_legacy_client_object_store(
self, mock_get_session_endpoint, mock_get_auth_args):
mock_client = mock.Mock()
@ -348,7 +348,7 @@ class TestCloudConfig(base.TestCase):
mock_get_auth_args.return_value = {}
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('object-store', mock_client)
mock_client.assert_called_with(
@ -360,8 +360,8 @@ class TestCloudConfig(base.TestCase):
'endpoint_type': 'public',
})
@mock.patch.object(cloud_config.CloudConfig, 'get_auth_args')
@mock.patch.object(cloud_config.CloudConfig, 'get_session_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_auth_args')
@mock.patch.object(cloud_region.CloudRegion, 'get_session_endpoint')
def test_legacy_client_object_store_timeout(
self, mock_get_session_endpoint, mock_get_auth_args):
mock_client = mock.Mock()
@ -370,7 +370,7 @@ class TestCloudConfig(base.TestCase):
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
config_dict['api_timeout'] = 9
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('object-store', mock_client)
mock_client.assert_called_with(
@ -382,7 +382,7 @@ class TestCloudConfig(base.TestCase):
'endpoint_type': 'public',
})
@mock.patch.object(cloud_config.CloudConfig, 'get_auth_args')
@mock.patch.object(cloud_region.CloudRegion, 'get_auth_args')
def test_legacy_client_object_store_endpoint(
self, mock_get_auth_args):
mock_client = mock.Mock()
@ -390,7 +390,7 @@ class TestCloudConfig(base.TestCase):
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
config_dict['object_store_endpoint'] = 'http://example.com/swift'
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('object-store', mock_client)
mock_client.assert_called_with(
@ -402,13 +402,13 @@ class TestCloudConfig(base.TestCase):
'endpoint_type': 'public',
})
@mock.patch.object(cloud_config.CloudConfig, 'get_session_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_session_endpoint')
def test_legacy_client_image(self, mock_get_session_endpoint):
mock_client = mock.Mock()
mock_get_session_endpoint.return_value = 'http://example.com/v2'
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('image', mock_client)
mock_client.assert_called_with(
@ -422,14 +422,14 @@ class TestCloudConfig(base.TestCase):
service_type='mage'
)
@mock.patch.object(cloud_config.CloudConfig, 'get_session_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_session_endpoint')
def test_legacy_client_image_override(self, mock_get_session_endpoint):
mock_client = mock.Mock()
mock_get_session_endpoint.return_value = 'http://example.com/v2'
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
config_dict['image_endpoint_override'] = 'http://example.com/override'
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('image', mock_client)
mock_client.assert_called_with(
@ -443,7 +443,7 @@ class TestCloudConfig(base.TestCase):
service_type='mage'
)
@mock.patch.object(cloud_config.CloudConfig, 'get_session_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_session_endpoint')
def test_legacy_client_image_versioned(self, mock_get_session_endpoint):
mock_client = mock.Mock()
mock_get_session_endpoint.return_value = 'http://example.com/v2'
@ -451,7 +451,7 @@ class TestCloudConfig(base.TestCase):
config_dict.update(fake_services_dict)
# v2 endpoint was passed, 1 requested in config, endpoint wins
config_dict['image_api_version'] = '1'
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('image', mock_client)
mock_client.assert_called_with(
@ -465,7 +465,7 @@ class TestCloudConfig(base.TestCase):
service_type='mage'
)
@mock.patch.object(cloud_config.CloudConfig, 'get_session_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_session_endpoint')
def test_legacy_client_image_unversioned(self, mock_get_session_endpoint):
mock_client = mock.Mock()
mock_get_session_endpoint.return_value = 'http://example.com/'
@ -473,7 +473,7 @@ class TestCloudConfig(base.TestCase):
config_dict.update(fake_services_dict)
# Versionless endpoint, config wins
config_dict['image_api_version'] = '1'
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('image', mock_client)
mock_client.assert_called_with(
@ -487,7 +487,7 @@ class TestCloudConfig(base.TestCase):
service_type='mage'
)
@mock.patch.object(cloud_config.CloudConfig, 'get_session_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_session_endpoint')
def test_legacy_client_image_argument(self, mock_get_session_endpoint):
mock_client = mock.Mock()
mock_get_session_endpoint.return_value = 'http://example.com/v3'
@ -495,7 +495,7 @@ class TestCloudConfig(base.TestCase):
config_dict.update(fake_services_dict)
# Versionless endpoint, config wins
config_dict['image_api_version'] = '6'
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('image', mock_client, version='beef')
mock_client.assert_called_with(
@ -509,13 +509,13 @@ class TestCloudConfig(base.TestCase):
service_type='mage'
)
@mock.patch.object(cloud_config.CloudConfig, 'get_session_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_session_endpoint')
def test_legacy_client_network(self, mock_get_session_endpoint):
mock_client = mock.Mock()
mock_get_session_endpoint.return_value = 'http://example.com/v2'
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('network', mock_client)
mock_client.assert_called_with(
@ -527,13 +527,13 @@ class TestCloudConfig(base.TestCase):
session=mock.ANY,
service_name=None)
@mock.patch.object(cloud_config.CloudConfig, 'get_session_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_session_endpoint')
def test_legacy_client_compute(self, mock_get_session_endpoint):
mock_client = mock.Mock()
mock_get_session_endpoint.return_value = 'http://example.com/v2'
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('compute', mock_client)
mock_client.assert_called_with(
@ -545,13 +545,13 @@ class TestCloudConfig(base.TestCase):
session=mock.ANY,
service_name=None)
@mock.patch.object(cloud_config.CloudConfig, 'get_session_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_session_endpoint')
def test_legacy_client_identity(self, mock_get_session_endpoint):
mock_client = mock.Mock()
mock_get_session_endpoint.return_value = 'http://example.com/v2'
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('identity', mock_client)
mock_client.assert_called_with(
@ -564,14 +564,14 @@ class TestCloudConfig(base.TestCase):
session=mock.ANY,
service_name='locks')
@mock.patch.object(cloud_config.CloudConfig, 'get_session_endpoint')
@mock.patch.object(cloud_region.CloudRegion, 'get_session_endpoint')
def test_legacy_client_identity_v3(self, mock_get_session_endpoint):
mock_client = mock.Mock()
mock_get_session_endpoint.return_value = 'http://example.com'
config_dict = defaults.get_defaults()
config_dict.update(fake_services_dict)
config_dict['identity_api_version'] = '3'
cc = cloud_config.CloudConfig(
cc = cloud_region.CloudRegion(
"test1", "region-al", config_dict, auth_plugin=mock.Mock())
cc.get_legacy_client('identity', mock_client)
mock_client.assert_called_with(

View File

@ -22,7 +22,7 @@ import testtools
import yaml
from openstack import config
from openstack.config import cloud_config
from openstack.config import cloud_region
from openstack.config import defaults
from openstack.config import exceptions
from openstack.config import loader
@ -36,7 +36,21 @@ def prompt_for_password(prompt=None):
class TestConfig(base.TestCase):
def test_get_all(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml],
secure_files=[self.no_yaml])
clouds = c.get_all()
# We add one by hand because the regions cloud is going to exist
# twice since it has two regions in it
user_clouds = [
cloud for cloud in base.USER_CONF['clouds'].keys()
] + ['_test_cloud_regions']
configured_clouds = [cloud.name for cloud in clouds]
self.assertItemsEqual(user_clouds, configured_clouds)
def test_get_all_clouds(self):
# Ensure the alias is in place
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml],
secure_files=[self.no_yaml])
@ -49,14 +63,22 @@ class TestConfig(base.TestCase):
configured_clouds = [cloud.name for cloud in clouds]
self.assertItemsEqual(user_clouds, configured_clouds)
def test_get_one(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cloud = c.get_one(validate=False)
self.assertIsInstance(cloud, cloud_region.CloudRegion)
self.assertEqual(cloud.name, '')
def test_get_one_cloud(self):
# Ensure the alias is in place
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cloud = c.get_one_cloud(validate=False)
self.assertIsInstance(cloud, cloud_config.CloudConfig)
self.assertIsInstance(cloud, cloud_region.CloudRegion)
self.assertEqual(cloud.name, '')
def test_get_one_cloud_default_cloud_from_file(self):
def test_get_one_default_cloud_from_file(self):
single_conf = base._write_yaml({
'clouds': {
'single': {
@ -72,12 +94,12 @@ class TestConfig(base.TestCase):
})
c = config.OpenStackConfig(config_files=[single_conf],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud()
cc = c.get_one()
self.assertEqual(cc.name, 'single')
def test_get_one_cloud_auth_defaults(self):
def test_get_one_auth_defaults(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml])
cc = c.get_one_cloud(cloud='_test-cloud_', auth={'username': 'user'})
cc = c.get_one(cloud='_test-cloud_', auth={'username': 'user'})
self.assertEqual('user', cc.auth['username'])
self.assertEqual(
defaults._defaults['auth_type'],
@ -88,11 +110,11 @@ class TestConfig(base.TestCase):
cc.identity_api_version,
)
def test_get_one_cloud_auth_override_defaults(self):
def test_get_one_auth_override_defaults(self):
default_options = {'compute_api_version': '4'}
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
override_defaults=default_options)
cc = c.get_one_cloud(cloud='_test-cloud_', auth={'username': 'user'})
cc = c.get_one(cloud='_test-cloud_', auth={'username': 'user'})
self.assertEqual('user', cc.auth['username'])
self.assertEqual('4', cc.compute_api_version)
self.assertEqual(
@ -100,7 +122,7 @@ class TestConfig(base.TestCase):
cc.identity_api_version,
)
def test_get_one_cloud_with_config_files(self):
def test_get_one_with_config_files(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml],
secure_files=[self.secure_yaml])
@ -109,51 +131,51 @@ class TestConfig(base.TestCase):
self.assertIsInstance(c.cloud_config['cache'], dict)
self.assertIn('max_age', c.cloud_config['cache'])
self.assertIn('path', c.cloud_config['cache'])
cc = c.get_one_cloud('_test-cloud_')
cc = c.get_one('_test-cloud_')
self._assert_cloud_details(cc)
cc = c.get_one_cloud('_test_cloud_no_vendor')
cc = c.get_one('_test_cloud_no_vendor')
self._assert_cloud_details(cc)
def test_get_one_cloud_with_int_project_id(self):
def test_get_one_with_int_project_id(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud('_test-cloud-int-project_')
cc = c.get_one('_test-cloud-int-project_')
self.assertEqual('12345', cc.auth['project_id'])
def test_get_one_cloud_with_domain_id(self):
def test_get_one_with_domain_id(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud('_test-cloud-domain-id_')
cc = c.get_one('_test-cloud-domain-id_')
self.assertEqual('6789', cc.auth['user_domain_id'])
self.assertEqual('123456789', cc.auth['project_domain_id'])
self.assertNotIn('domain_id', cc.auth)
self.assertNotIn('domain-id', cc.auth)
self.assertNotIn('domain_id', cc)
def test_get_one_cloud_domain_scoped(self):
def test_get_one_domain_scoped(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud('_test-cloud-domain-scoped_')
cc = c.get_one('_test-cloud-domain-scoped_')
self.assertEqual('12345', cc.auth['domain_id'])
self.assertNotIn('user_domain_id', cc.auth)
self.assertNotIn('project_domain_id', cc.auth)
def test_get_one_cloud_infer_user_domain(self):
def test_get_one_infer_user_domain(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud('_test-cloud-int-project_')
cc = c.get_one('_test-cloud-int-project_')
self.assertEqual('awesome-domain', cc.auth['user_domain_id'])
self.assertEqual('awesome-domain', cc.auth['project_domain_id'])
self.assertNotIn('domain_id', cc.auth)
self.assertNotIn('domain_id', cc)
def test_get_one_cloud_with_hyphenated_project_id(self):
def test_get_one_with_hyphenated_project_id(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud('_test_cloud_hyphenated')
cc = c.get_one('_test_cloud_hyphenated')
self.assertEqual('12345', cc.auth['project_id'])
def test_get_one_cloud_with_hyphenated_kwargs(self):
def test_get_one_with_hyphenated_kwargs(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
args = {
@ -165,14 +187,14 @@ class TestConfig(base.TestCase):
},
'region_name': 'test-region',
}
cc = c.get_one_cloud(**args)
cc = c.get_one(**args)
self.assertEqual('http://example.com/v2', cc.auth['auth_url'])
def test_no_environ(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
self.assertRaises(
exceptions.OpenStackConfigException, c.get_one_cloud, 'envvars')
exceptions.OpenStackConfigException, c.get_one, 'envvars')
def test_fallthrough(self):
c = config.OpenStackConfig(config_files=[self.no_yaml],
@ -181,44 +203,44 @@ class TestConfig(base.TestCase):
for k in os.environ.keys():
if k.startswith('OS_'):
self.useFixture(fixtures.EnvironmentVariable(k))
c.get_one_cloud(cloud='defaults', validate=False)
c.get_one(cloud='defaults', validate=False)
def test_prefer_ipv6_true(self):
c = config.OpenStackConfig(config_files=[self.no_yaml],
vendor_files=[self.no_yaml],
secure_files=[self.no_yaml])
cc = c.get_one_cloud(cloud='defaults', validate=False)
cc = c.get_one(cloud='defaults', validate=False)
self.assertTrue(cc.prefer_ipv6)
def test_prefer_ipv6_false(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud(cloud='_test-cloud_')
cc = c.get_one(cloud='_test-cloud_')
self.assertFalse(cc.prefer_ipv6)
def test_force_ipv4_true(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud(cloud='_test-cloud_')
cc = c.get_one(cloud='_test-cloud_')
self.assertTrue(cc.force_ipv4)
def test_force_ipv4_false(self):
c = config.OpenStackConfig(config_files=[self.no_yaml],
vendor_files=[self.no_yaml],
secure_files=[self.no_yaml])
cc = c.get_one_cloud(cloud='defaults', validate=False)
cc = c.get_one(cloud='defaults', validate=False)
self.assertFalse(cc.force_ipv4)
def test_get_one_cloud_auth_merge(self):
def test_get_one_auth_merge(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml])
cc = c.get_one_cloud(cloud='_test-cloud_', auth={'username': 'user'})
cc = c.get_one(cloud='_test-cloud_', auth={'username': 'user'})
self.assertEqual('user', cc.auth['username'])
self.assertEqual('testpass', cc.auth['password'])
def test_get_one_cloud_networks(self):
def test_get_one_networks(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud('_test-cloud-networks_')
cc = c.get_one('_test-cloud-networks_')
self.assertEqual(
['a-public', 'another-public', 'split-default'],
cc.get_external_networks())
@ -235,10 +257,10 @@ class TestConfig(base.TestCase):
['a-public', 'another-public', 'split-default'],
cc.get_external_ipv6_networks())
def test_get_one_cloud_no_networks(self):
def test_get_one_no_networks(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud('_test-cloud-domain-scoped_')
cc = c.get_one('_test-cloud-domain-scoped_')
self.assertEqual([], cc.get_external_networks())
self.assertEqual([], cc.get_internal_networks())
self.assertIsNone(cc.get_nat_source())
@ -249,7 +271,7 @@ class TestConfig(base.TestCase):
c = config.OpenStackConfig(config_files=['nonexistent'],
vendor_files=['nonexistent'],
secure_files=[self.secure_yaml])
cc = c.get_one_cloud(cloud='_test_cloud_no_vendor', validate=False)
cc = c.get_one(cloud='_test_cloud_no_vendor', validate=False)
self.assertEqual('testpass', cc.auth['password'])
def test_get_cloud_names(self):
@ -273,7 +295,7 @@ class TestConfig(base.TestCase):
for k in os.environ.keys():
if k.startswith('OS_'):
self.useFixture(fixtures.EnvironmentVariable(k))
c.get_one_cloud(cloud='defaults', validate=False)
c.get_one(cloud='defaults', validate=False)
self.assertEqual(['defaults'], sorted(c.get_cloud_names()))
def test_set_one_cloud_creates_file(self):
@ -394,24 +416,24 @@ class TestConfigArgparse(base.TestCase):
self.options = argparse.Namespace(**self.args)
def test_get_one_cloud_bad_region_argparse(self):
def test_get_one_bad_region_argparse(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
self.assertRaises(
exceptions.OpenStackConfigException, c.get_one_cloud,
exceptions.OpenStackConfigException, c.get_one,
cloud='_test-cloud_', argparse=self.options)
def test_get_one_cloud_argparse(self):
def test_get_one_argparse(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud(
cc = c.get_one(
cloud='_test_cloud_regions', argparse=self.options, validate=False)
self.assertEqual(cc.region_name, 'region2')
self.assertEqual(cc.snack_type, 'cookie')
def test_get_one_cloud_precedence(self):
def test_get_one_precedence(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
@ -437,7 +459,7 @@ class TestConfigArgparse(base.TestCase):
)
options = argparse.Namespace(**args)
cc = c.get_one_cloud(
cc = c.get_one(
argparse=options, **kwargs)
self.assertEqual(cc.region_name, 'region2')
self.assertEqual(cc.auth['password'], 'authpass')
@ -479,7 +501,7 @@ class TestConfigArgparse(base.TestCase):
self.assertEqual(cc.auth['password'], 'argpass')
self.assertEqual(cc.snack_type, 'cookie')
def test_get_one_cloud_precedence_no_argparse(self):
def test_get_one_precedence_no_argparse(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
@ -495,30 +517,30 @@ class TestConfigArgparse(base.TestCase):
'arbitrary': 'value',
}
cc = c.get_one_cloud(**kwargs)
cc = c.get_one(**kwargs)
self.assertEqual(cc.region_name, 'kwarg_region')
self.assertEqual(cc.auth['password'], 'authpass')
self.assertIsNone(cc.password)
def test_get_one_cloud_just_argparse(self):
def test_get_one_just_argparse(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud(argparse=self.options, validate=False)
cc = c.get_one(argparse=self.options, validate=False)
self.assertIsNone(cc.cloud)
self.assertEqual(cc.region_name, 'region2')
self.assertEqual(cc.snack_type, 'cookie')
def test_get_one_cloud_just_kwargs(self):
def test_get_one_just_kwargs(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud(validate=False, **self.args)
cc = c.get_one(validate=False, **self.args)
self.assertIsNone(cc.cloud)
self.assertEqual(cc.region_name, 'region2')
self.assertEqual(cc.snack_type, 'cookie')
def test_get_one_cloud_dash_kwargs(self):
def test_get_one_dash_kwargs(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
@ -530,93 +552,93 @@ class TestConfigArgparse(base.TestCase):
'region_name': 'other-test-region',
'snack_type': 'cookie',
}
cc = c.get_one_cloud(**args)
cc = c.get_one(**args)
self.assertIsNone(cc.cloud)
self.assertEqual(cc.region_name, 'other-test-region')
self.assertEqual(cc.snack_type, 'cookie')
def test_get_one_cloud_no_argparse(self):
def test_get_one_no_argparse(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud(cloud='_test-cloud_', argparse=None)
cc = c.get_one(cloud='_test-cloud_', argparse=None)
self._assert_cloud_details(cc)
self.assertEqual(cc.region_name, 'test-region')
self.assertIsNone(cc.snack_type)
def test_get_one_cloud_no_argparse_regions(self):
def test_get_one_no_argparse_regions(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud(cloud='_test_cloud_regions', argparse=None)
cc = c.get_one(cloud='_test_cloud_regions', argparse=None)
self._assert_cloud_details(cc)
self.assertEqual(cc.region_name, 'region1')
self.assertIsNone(cc.snack_type)
def test_get_one_cloud_bad_region(self):
def test_get_one_bad_region(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
self.assertRaises(
exceptions.OpenStackConfigException,
c.get_one_cloud,
c.get_one,
cloud='_test_cloud_regions', region_name='bad')
def test_get_one_cloud_bad_region_no_regions(self):
def test_get_one_bad_region_no_regions(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
self.assertRaises(
exceptions.OpenStackConfigException,
c.get_one_cloud,
c.get_one,
cloud='_test-cloud_', region_name='bad_region')
def test_get_one_cloud_no_argparse_region2(self):
def test_get_one_no_argparse_region2(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud(
cc = c.get_one(
cloud='_test_cloud_regions', region_name='region2', argparse=None)
self._assert_cloud_details(cc)
self.assertEqual(cc.region_name, 'region2')
self.assertIsNone(cc.snack_type)
def test_get_one_cloud_network(self):
def test_get_one_network(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud(
cc = c.get_one(
cloud='_test_cloud_regions', region_name='region1', argparse=None)
self._assert_cloud_details(cc)
self.assertEqual(cc.region_name, 'region1')
self.assertEqual('region1-network', cc.config['external_network'])
def test_get_one_cloud_per_region_network(self):
def test_get_one_per_region_network(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud(
cc = c.get_one(
cloud='_test_cloud_regions', region_name='region2', argparse=None)
self._assert_cloud_details(cc)
self.assertEqual(cc.region_name, 'region2')
self.assertEqual('my-network', cc.config['external_network'])
def test_get_one_cloud_no_yaml_no_cloud(self):
def test_get_one_no_yaml_no_cloud(self):
c = config.OpenStackConfig(load_yaml_config=False)
self.assertRaises(
exceptions.OpenStackConfigException,
c.get_one_cloud,
c.get_one,
cloud='_test_cloud_regions', region_name='region2', argparse=None)
def test_get_one_cloud_no_yaml(self):
def test_get_one_no_yaml(self):
c = config.OpenStackConfig(load_yaml_config=False)
cc = c.get_one_cloud(
cc = c.get_one(
region_name='region2', argparse=None,
**base.USER_CONF['clouds']['_test_cloud_regions'])
# Not using assert_cloud_details because of cache settings which
# are not present without the file
self.assertIsInstance(cc, cloud_config.CloudConfig)
self.assertIsInstance(cc, cloud_region.CloudRegion)
self.assertTrue(extras.safe_hasattr(cc, 'auth'))
self.assertIsInstance(cc.auth, dict)
self.assertIsNone(cc.cloud)
@ -675,7 +697,7 @@ class TestConfigArgparse(base.TestCase):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud(
cc = c.get_one(
cloud='envvars', argparse=self.options, validate=False)
self.assertEqual(cc.auth['project_name'], 'project')
@ -688,7 +710,7 @@ class TestConfigArgparse(base.TestCase):
# novaclient will add this
parser.add_argument('--os-auth-token')
opts, _remain = parser.parse_known_args()
cc = c.get_one_cloud(
cc = c.get_one(
cloud='_test_cloud_regions', argparse=opts)
self.assertEqual(cc.config['auth_type'], 'password')
self.assertNotIn('token', cc.config['auth'])
@ -704,7 +726,7 @@ class TestConfigArgparse(base.TestCase):
opts, _remain = parser.parse_known_args(
['--os-auth-token', 'very-bad-things',
'--os-auth-type', 'token'])
cc = c.get_one_cloud(argparse=opts, validate=False)
cc = c.get_one(argparse=opts, validate=False)
self.assertEqual(cc.config['auth_type'], 'token')
self.assertEqual(cc.config['auth']['token'], 'very-bad-things')
@ -719,7 +741,7 @@ class TestConfigArgparse(base.TestCase):
'--os-auth-url', 'auth-url', '--os-project-name', 'project']
c.register_argparse_arguments(parser, argv=argv)
opts, _remain = parser.parse_known_args(argv)
cc = c.get_one_cloud(argparse=opts)
cc = c.get_one(argparse=opts)
self.assertEqual(cc.config['auth']['username'], 'user')
self.assertEqual(cc.config['auth']['password'], 'pass')
self.assertEqual(cc.config['auth']['auth_url'], 'auth-url')
@ -800,7 +822,7 @@ class TestConfigArgparse(base.TestCase):
self.assertEqual(opts.http_timeout, '20')
with testtools.ExpectedException(AttributeError):
opts.os_network_service_type
cloud = c.get_one_cloud(argparse=opts, validate=False)
cloud = c.get_one(argparse=opts, validate=False)
self.assertEqual(cloud.config['service_type'], 'network')
self.assertEqual(cloud.config['interface'], 'admin')
self.assertEqual(cloud.config['api_timeout'], '20')
@ -821,7 +843,7 @@ class TestConfigArgparse(base.TestCase):
self.assertIsNone(opts.os_network_service_type)
self.assertIsNone(opts.os_network_api_version)
self.assertEqual(opts.network_api_version, '4')
cloud = c.get_one_cloud(argparse=opts, validate=False)
cloud = c.get_one(argparse=opts, validate=False)
self.assertEqual(cloud.config['service_type'], 'network')
self.assertEqual(cloud.config['interface'], 'admin')
self.assertEqual(cloud.config['network_api_version'], '4')
@ -848,7 +870,7 @@ class TestConfigArgparse(base.TestCase):
self.assertEqual(opts.os_endpoint_type, 'admin')
self.assertIsNone(opts.os_network_api_version)
self.assertEqual(opts.network_api_version, '4')
cloud = c.get_one_cloud(argparse=opts, validate=False)
cloud = c.get_one(argparse=opts, validate=False)
self.assertEqual(cloud.config['service_type'], 'compute')
self.assertEqual(cloud.config['network_service_type'], 'badtype')
self.assertEqual(cloud.config['interface'], 'admin')
@ -872,7 +894,7 @@ class TestConfigPrompt(base.TestCase):
self.options = argparse.Namespace(**self.args)
def test_get_one_cloud_prompt(self):
def test_get_one_prompt(self):
c = config.OpenStackConfig(
config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml],
@ -882,7 +904,7 @@ class TestConfigPrompt(base.TestCase):
# This needs a cloud definition without a password.
# If this starts failing unexpectedly check that the cloud_yaml
# and/or vendor_yaml do not have a password in the selected cloud.
cc = c.get_one_cloud(
cc = c.get_one(
cloud='_test_cloud_no_vendor',
argparse=self.options,
)
@ -904,7 +926,7 @@ class TestConfigDefault(base.TestCase):
def test_set_no_default(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud(cloud='_test-cloud_', argparse=None)
cc = c.get_one(cloud='_test-cloud_', argparse=None)
self._assert_cloud_details(cc)
self.assertEqual('password', cc.auth_type)
@ -912,7 +934,7 @@ class TestConfigDefault(base.TestCase):
loader.set_default('identity_api_version', '4')
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud(cloud='_test-cloud_', argparse=None)
cc = c.get_one(cloud='_test-cloud_', argparse=None)
self.assertEqual('4', cc.identity_api_version)

View File

@ -14,7 +14,7 @@
from openstack import config
from openstack.config import cloud_config
from openstack.config import cloud_region
from openstack.config import exceptions
from openstack.tests.unit.config import base
@ -36,23 +36,23 @@ class TestEnviron(base.TestCase):
self.useFixture(
fixtures.EnvironmentVariable('NOVA_PROJECT_ID', 'testnova'))
def test_get_one_cloud(self):
def test_get_one(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
self.assertIsInstance(c.get_one_cloud(), cloud_config.CloudConfig)
self.assertIsInstance(c.get_one(), cloud_region.CloudRegion)
def test_no_fallthrough(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
self.assertRaises(
exceptions.OpenStackConfigException, c.get_one_cloud, 'openstack')
exceptions.OpenStackConfigException, c.get_one, 'openstack')
def test_envvar_name_override(self):
self.useFixture(
fixtures.EnvironmentVariable('OS_CLOUD_NAME', 'override'))
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud('override')
cc = c.get_one('override')
self._assert_cloud_details(cc)
def test_envvar_prefer_ipv6_override(self):
@ -61,22 +61,22 @@ class TestEnviron(base.TestCase):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml],
secure_files=[self.secure_yaml])
cc = c.get_one_cloud('_test-cloud_')
cc = c.get_one('_test-cloud_')
self.assertFalse(cc.prefer_ipv6)
def test_environ_exists(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml],
secure_files=[self.secure_yaml])
cc = c.get_one_cloud('envvars')
cc = c.get_one('envvars')
self._assert_cloud_details(cc)
self.assertNotIn('auth_url', cc.config)
self.assertIn('auth_url', cc.config['auth'])
self.assertNotIn('project_id', cc.config['auth'])
self.assertNotIn('auth_url', cc.config)
cc = c.get_one_cloud('_test-cloud_')
cc = c.get_one('_test-cloud_')
self._assert_cloud_details(cc)
cc = c.get_one_cloud('_test_cloud_no_vendor')
cc = c.get_one('_test_cloud_no_vendor')
self._assert_cloud_details(cc)
def test_environ_prefix(self):
@ -84,18 +84,18 @@ class TestEnviron(base.TestCase):
vendor_files=[self.vendor_yaml],
envvar_prefix='NOVA_',
secure_files=[self.secure_yaml])
cc = c.get_one_cloud('envvars')
cc = c.get_one('envvars')
self._assert_cloud_details(cc)
self.assertNotIn('auth_url', cc.config)
self.assertIn('auth_url', cc.config['auth'])
self.assertIn('project_id', cc.config['auth'])
self.assertNotIn('auth_url', cc.config)
cc = c.get_one_cloud('_test-cloud_')
cc = c.get_one('_test-cloud_')
self._assert_cloud_details(cc)
cc = c.get_one_cloud('_test_cloud_no_vendor')
cc = c.get_one('_test_cloud_no_vendor')
self._assert_cloud_details(cc)
def test_get_one_cloud_with_config_files(self):
def test_get_one_with_config_files(self):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml],
secure_files=[self.secure_yaml])
@ -104,9 +104,9 @@ class TestEnviron(base.TestCase):
self.assertIsInstance(c.cloud_config['cache'], dict)
self.assertIn('max_age', c.cloud_config['cache'])
self.assertIn('path', c.cloud_config['cache'])
cc = c.get_one_cloud('_test-cloud_')
cc = c.get_one('_test-cloud_')
self._assert_cloud_details(cc)
cc = c.get_one_cloud('_test_cloud_no_vendor')
cc = c.get_one('_test_cloud_no_vendor')
self._assert_cloud_details(cc)
def test_config_file_override(self):
@ -115,7 +115,7 @@ class TestEnviron(base.TestCase):
'OS_CLIENT_CONFIG_FILE', self.cloud_yaml))
c = config.OpenStackConfig(config_files=[],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud('_test-cloud_')
cc = c.get_one('_test-cloud_')
self._assert_cloud_details(cc)
@ -127,7 +127,7 @@ class TestEnvvars(base.TestCase):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
self.assertRaises(
exceptions.OpenStackConfigException, c.get_one_cloud, 'envvars')
exceptions.OpenStackConfigException, c.get_one, 'envvars')
def test_test_envvars(self):
self.useFixture(
@ -137,7 +137,7 @@ class TestEnvvars(base.TestCase):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
self.assertRaises(
exceptions.OpenStackConfigException, c.get_one_cloud, 'envvars')
exceptions.OpenStackConfigException, c.get_one, 'envvars')
def test_incomplete_envvars(self):
self.useFixture(
@ -150,7 +150,7 @@ class TestEnvvars(base.TestCase):
# commenting it out in this patch to keep the patch size reasonable
# self.assertRaises(
# keystoneauth1.exceptions.auth_plugins.MissingRequiredOptions,
# c.get_one_cloud, 'envvars')
# c.get_one, 'envvars')
def test_have_envvars(self):
self.useFixture(
@ -165,7 +165,7 @@ class TestEnvvars(base.TestCase):
fixtures.EnvironmentVariable('OS_PROJECT_NAME', 'project'))
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml])
cc = c.get_one_cloud('envvars')
cc = c.get_one('envvars')
self.assertEqual(cc.config['auth']['username'], 'user')
def test_old_envvars(self):
@ -181,5 +181,5 @@ class TestEnvvars(base.TestCase):
c = config.OpenStackConfig(config_files=[self.cloud_yaml],
vendor_files=[self.vendor_yaml],
envvar_prefix='NOVA_')
cc = c.get_one_cloud('envvars')
cc = c.get_one('envvars')
self.assertEqual(cc.config['auth']['username'], 'nova')

View File

@ -18,18 +18,18 @@ from openstack.tests.unit.config import base
class TestInit(base.TestCase):
def test_get_config_without_arg_parser(self):
cloud_config = openstack.config.get_config(
cloud_region = openstack.config.get_config(
options=None, validate=False)
self.assertIsInstance(
cloud_config,
openstack.config.cloud_config.CloudConfig
cloud_region,
openstack.config.cloud_region.CloudRegion
)
def test_get_config_with_arg_parser(self):
cloud_config = openstack.config.get_config(
cloud_region = openstack.config.get_config(
options=argparse.ArgumentParser(),
validate=False)
self.assertIsInstance(
cloud_config,
openstack.config.cloud_config.CloudConfig
cloud_region,
openstack.config.cloud_region.CloudRegion
)

View File

@ -107,10 +107,10 @@ class TestConnection(base.RequestsMockTestCase):
self.assertEqual('openstack.workflow.v2._proxy',
conn.workflow.__class__.__module__)
def test_from_config_given_data(self):
data = openstack.config.OpenStackConfig().get_one_cloud("sample")
def test_from_config_given_config(self):
cloud_region = openstack.config.OpenStackConfig().get_one("sample")
sot = connection.from_config(cloud_config=data)
sot = connection.from_config(config=cloud_region)
self.assertEqual(CONFIG_USERNAME,
sot.config.config['auth']['username'])
@ -121,7 +121,7 @@ class TestConnection(base.RequestsMockTestCase):
self.assertEqual(CONFIG_PROJECT,
sot.config.config['auth']['project_name'])
def test_from_config_given_name(self):
def test_from_config_given_cloud_name(self):
sot = connection.from_config(cloud_name="sample")
self.assertEqual(CONFIG_USERNAME,
@ -133,21 +133,47 @@ class TestConnection(base.RequestsMockTestCase):
self.assertEqual(CONFIG_PROJECT,
sot.config.config['auth']['project_name'])
def test_from_config_given_cloud_config(self):
cloud_region = openstack.config.OpenStackConfig().get_one("sample")
sot = connection.from_config(cloud_config=cloud_region)
self.assertEqual(CONFIG_USERNAME,
sot.config.config['auth']['username'])
self.assertEqual(CONFIG_PASSWORD,
sot.config.config['auth']['password'])
self.assertEqual(CONFIG_AUTH_URL,
sot.config.config['auth']['auth_url'])
self.assertEqual(CONFIG_PROJECT,
sot.config.config['auth']['project_name'])
def test_from_config_given_cloud(self):
sot = connection.from_config(cloud="sample")
self.assertEqual(CONFIG_USERNAME,
sot.config.config['auth']['username'])
self.assertEqual(CONFIG_PASSWORD,
sot.config.config['auth']['password'])
self.assertEqual(CONFIG_AUTH_URL,
sot.config.config['auth']['auth_url'])
self.assertEqual(CONFIG_PROJECT,
sot.config.config['auth']['project_name'])
def test_from_config_given_options(self):
version = "100"
class Opts(object):
compute_api_version = version
sot = connection.from_config(cloud_name="sample", options=Opts)
sot = connection.from_config(cloud="sample", options=Opts)
self.assertEqual(version, sot.compute.version)
def test_from_config_verify(self):
sot = connection.from_config(cloud_name="insecure")
sot = connection.from_config(cloud="insecure")
self.assertFalse(sot.session.verify)
sot = connection.from_config(cloud_name="cacert")
sot = connection.from_config(cloud="cacert")
self.assertEqual(CONFIG_CACERT, sot.session.verify)

View File

@ -1,7 +1,7 @@
---
upgrade:
- The Profile object has been replaced with the use of
CloudConfig objects from openstack.config.
CloudRegion objects from openstack.config.
- The openstacksdk specific Session object has been removed.
- Proxy objects are now subclasses of
keystoneauth1.adapter.Adapter.