Adds Senlin support to openstacksdk

Adds Senlin API to openstacksdk, along with unit and functional tests
for each Senlin function. Allows openstacksdk to create, list, get,
update and delete clusters, cluster policies, cluster profiles, and
cluster receivers. Also allows for attaching and detaching policies
to clusters, updating policies on clusters, and listing policies on clusters.

Change-Id: I7e80e8ba74bdb415c2359f5c9672aa900f441fba
This commit is contained in:
Bailey Miller 2018-02-09 01:51:59 +00:00 committed by Monty Taylor
parent 07d3828860
commit 4f92423f87
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
10 changed files with 2750 additions and 1 deletions

View File

@ -199,6 +199,25 @@
OPENSTACKSDK_HAS_SWIFT: 0
OPENSTACKSDK_HAS_MAGNUM: 1
- job:
name: openstacksdk-functional-devstack-senlin
parent: openstacksdk-functional-devstack
description: |
Run shade functional tests against a master devstack with senlin
required-projects:
- openstack/senlin
vars:
devstack_plugins:
senlin: https://git.openstack.org/openstack/senlin
devstack_services:
s-account: false
s-container: false
s-object: false
s-proxy: false
tox_environment:
OPENSTACKSDK_HAS_SWIFT: 0
OPENSTACKSDK_HAS_SENLIN: 1
- job:
name: openstacksdk-ansible-functional-devstack
parent: openstacksdk-functional-devstack
@ -289,6 +308,7 @@
- openstacksdk-ansible-stable-2.6-functional-devstack:
voting: false
- openstacksdk-functional-devstack
- openstacksdk-functional-devstack-senlin
- openstacksdk-functional-devstack-magnum:
voting: false
- openstacksdk-functional-devstack-python3
@ -304,6 +324,7 @@
sphinx_python: python3
- openstacksdk-functional-devstack
- openstacksdk-functional-devstack-python3
- openstacksdk-functional-devstack-senlin
- neutron-grenade
- openstack-tox-lower-constraints
- nodepool-functional-py35-src

543
openstack/cloud/openstackcloud.py Normal file → Executable file
View File

@ -534,6 +534,14 @@ class OpenStackCloud(_normalize.Normalizer):
'container-infra')
return self._raw_clients['container-infra']
@property
def _clustering_client(self):
if 'clustering' not in self._raw_clients:
clustering_client = self._get_versioned_client(
'clustering', min_version=1, max_version='1.latest')
self._raw_clients['clustering'] = clustering_client
return self._raw_clients['clustering']
@property
def _database_client(self):
if 'database' not in self._raw_clients:
@ -11386,3 +11394,538 @@ class OpenStackCloud(_normalize.Normalizer):
data = self._container_infra_client.get('/mservices')
return self._normalize_magnum_services(
self._get_and_munchify('mservices', data))
def create_cluster(self, name, profile, config=None, desired_capacity=0,
max_size=None, metadata=None, min_size=None,
timeout=None):
profile = self.get_cluster_profile(profile)
profile_id = profile['id']
body = {
'desired_capacity': desired_capacity,
'name': name,
'profile_id': profile_id
}
if config is not None:
body['config'] = config
if max_size is not None:
body['max_size'] = max_size
if metadata is not None:
body['metadata'] = metadata
if min_size is not None:
body['min_size'] = min_size
if timeout is not None:
body['timeout'] = timeout
data = self._clustering_client.post(
'/clusters', json={'cluster': body},
error_message="Error creating cluster {name}".format(name=name))
return self._get_and_munchify(key=None, data=data)
def set_cluster_metadata(self, name_or_id, metadata):
cluster = self.get_cluster(name_or_id)
if not cluster:
raise exc.OpenStackCloudException(
'Invalid Cluster {cluster}'.format(cluster=name_or_id))
self._clustering_client.post(
'/clusters/{cluster_id}/metadata'.format(cluster_id=cluster['id']),
json={'metadata': metadata},
error_message='Error updating cluster metadata')
def get_cluster_by_id(self, cluster_id):
try:
data = self._clustering_client.get(
"/clusters/{cluster_id}".format(cluster_id=cluster_id),
error_message="Error fetching cluster {name}".format(
name=cluster_id))
return self._get_and_munchify('cluster', data)
except Exception:
return None
def get_cluster(self, name_or_id, filters=None):
return _utils._get_entity(
cloud=self, resource='cluster',
name_or_id=name_or_id, filters=filters)
def update_cluster(self, name_or_id, new_name=None,
profile_name_or_id=None, config=None, metadata=None,
timeout=None, profile_only=False):
old_cluster = self.get_cluster(name_or_id)
if old_cluster is None:
raise exc.OpenStackCloudException(
'Invalid Cluster {cluster}'.format(cluster=name_or_id))
cluster = {
'profile_only': profile_only
}
if config is not None:
cluster['config'] = config
if metadata is not None:
cluster['metadata'] = metadata
if profile_name_or_id is not None:
profile = self.get_cluster_profile(profile_name_or_id)
if profile is None:
raise exc.OpenStackCloudException(
'Invalid Cluster Profile {profile}'.format(
profile=profile_name_or_id))
cluster['profile_id'] = profile.id
if timeout is not None:
cluster['timeout'] = timeout
if new_name is not None:
cluster['name'] = new_name
data = self._clustering_client.patch(
"/clusters/{cluster_id}".format(cluster_id=old_cluster['id']),
json={'cluster': cluster},
error_message="Error updating cluster "
"{name}".format(name=name_or_id))
return self._get_and_munchify(key=None, data=data)
def delete_cluster(self, name_or_id):
cluster = self.get_cluster(name_or_id)
if cluster is None:
self.log.debug("Cluster %s not found for deleting", name_or_id)
return False
for policy in self.list_policies_on_cluster(name_or_id):
detach_policy = self.get_cluster_policy_by_id(
policy['policy_id'])
self.detach_policy_from_cluster(cluster, detach_policy)
for receiver in self.list_cluster_receivers():
if cluster["id"] == receiver["cluster_id"]:
self.delete_cluster_receiver(receiver["id"], wait=True)
self._clustering_client.delete(
"/clusters/{cluster_id}".format(cluster_id=name_or_id),
error_message="Error deleting cluster {name}".format(
name=name_or_id))
return True
def search_clusters(self, name_or_id=None, filters=None):
clusters = self.list_clusters()
return _utils._filter_list(clusters, name_or_id, filters)
def list_clusters(self):
try:
data = self._clustering_client.get(
'/clusters',
error_message="Error fetching clusters")
return self._get_and_munchify('clusters', data)
except exc.OpenStackCloudURINotFound as e:
self.log.debug(str(e), exc_info=True)
return []
def attach_policy_to_cluster(self, name_or_id, policy_name_or_id,
is_enabled):
cluster = self.get_cluster(name_or_id)
policy = self.get_cluster_policy(policy_name_or_id)
if cluster is None:
raise exc.OpenStackCloudException(
'Cluster {cluster} not found for attaching'.format(
cluster=name_or_id))
if policy is None:
raise exc.OpenStackCloudException(
'Policy {policy} not found for attaching'.format(
policy=policy_name_or_id))
body = {
'policy_id': policy['id'],
'enabled': is_enabled
}
self._clustering_client.post(
"/clusters/{cluster_id}/actions".format(cluster_id=cluster['id']),
error_message="Error attaching policy {policy} to cluster "
"{cluster}".format(
policy=policy['id'],
cluster=cluster['id']),
json={'policy_attach': body})
return True
def detach_policy_from_cluster(
self, name_or_id, policy_name_or_id, wait=False, timeout=3600):
cluster = self.get_cluster(name_or_id)
policy = self.get_cluster_policy(policy_name_or_id)
if cluster is None:
raise exc.OpenStackCloudException(
'Cluster {cluster} not found for detaching'.format(
cluster=name_or_id))
if policy is None:
raise exc.OpenStackCloudException(
'Policy {policy} not found for detaching'.format(
policy=policy_name_or_id))
body = {'policy_id': policy['id']}
self._clustering_client.post(
"/clusters/{cluster_id}/actions".format(cluster_id=cluster['id']),
error_message="Error detaching policy {policy} from cluster "
"{cluster}".format(
policy=policy['id'],
cluster=cluster['id']),
json={'policy_detach': body})
if not wait:
return True
value = []
for count in _utils._iterate_timeout(
timeout, "Timeout waiting for cluster policy to detach"):
# TODO(bjjohnson) This logic will wait until there are no policies.
# Since we're detaching a specific policy, checking to make sure
# that policy is not in the list of policies would be better.
policy_status = self.get_cluster_by_id(cluster['id'])['policies']
if policy_status == value:
break
return True
def update_policy_on_cluster(self, name_or_id, policy_name_or_id,
is_enabled):
cluster = self.get_cluster(name_or_id)
policy = self.get_cluster_policy(policy_name_or_id)
if cluster is None:
raise exc.OpenStackCloudException(
'Cluster {cluster} not found for updating'.format(
cluster=name_or_id))
if policy is None:
raise exc.OpenStackCloudException(
'Policy {policy} not found for updating'.format(
policy=policy_name_or_id))
body = {
'policy_id': policy['id'],
'enabled': is_enabled
}
self._clustering_client.post(
"/clusters/{cluster_id}/actions".format(cluster_id=cluster['id']),
error_message="Error updating policy {policy} on cluster "
"{cluster}".format(
policy=policy['id'],
cluster=cluster['id']),
json={'policy_update': body})
return True
def get_policy_on_cluster(self, name_or_id, policy_name_or_id):
try:
policy = self._clustering_client.get(
"/clusters/{cluster_id}/policies/{policy_id}".format(
cluster_id=name_or_id, policy_id=policy_name_or_id),
error_message="Error fetching policy "
"{name}".format(name=policy_name_or_id))
return self._get_and_munchify('cluster_policy', policy)
except Exception:
return False
def list_policies_on_cluster(self, name_or_id):
endpoint = "/clusters/{cluster_id}/policies".format(
cluster_id=name_or_id)
try:
data = self._clustering_client.get(
endpoint,
error_message="Error fetching cluster policies")
except exc.OpenStackCloudURINotFound as e:
self.log.debug(str(e), exc_info=True)
return []
return self._get_and_munchify('cluster_policies', data)
def create_cluster_profile(self, name, spec, metadata=None):
profile = {
'name': name,
'spec': spec
}
if metadata is not None:
profile['metadata'] = metadata
data = self._clustering_client.post(
'/profiles', json={'profile': profile},
error_message="Error creating profile {name}".format(name=name))
return self._get_and_munchify('profile', data)
def set_cluster_profile_metadata(self, name_or_id, metadata):
profile = self.get_profile(name_or_id)
if not profile:
raise exc.OpenStackCloudException(
'Invalid Profile {profile}'.format(profile=name_or_id))
self._clustering_client.post(
'/profiles/{profile_id}/metadata'.format(profile_id=profile['id']),
json={'metadata': metadata},
error_message='Error updating profile metadata')
def search_cluster_profiles(self, name_or_id=None, filters=None):
cluster_profiles = self.list_cluster_profiles()
return _utils._filter_list(cluster_profiles, name_or_id, filters)
def list_cluster_profiles(self):
try:
data = self._clustering_client.get(
'/profiles',
error_message="Error fetching profiles")
except exc.OpenStackCloudURINotFound as e:
self.log.debug(str(e), exc_info=True)
return []
return self._get_and_munchify('profiles', data)
def get_cluster_profile_by_id(self, profile_id):
try:
data = self._clustering_client.get(
"/profiles/{profile_id}".format(profile_id=profile_id),
error_message="Error fetching profile {name}".format(
name=profile_id))
return self._get_and_munchify('profile', data)
except exc.OpenStackCloudURINotFound as e:
self.log.debug(str(e), exc_info=True)
return None
def get_cluster_profile(self, name_or_id, filters=None):
return _utils._get_entity(self, 'cluster_profile', name_or_id, filters)
def delete_cluster_profile(self, name_or_id):
profile = self.get_cluster_profile(name_or_id)
if profile is None:
self.log.debug("Profile %s not found for deleting", name_or_id)
return False
for cluster in self.list_clusters():
if (name_or_id, profile.id) in cluster.items():
self.log.debug(
"Profile %s is being used by cluster %s, won't delete",
name_or_id, cluster.name)
return False
self._clustering_client.delete(
"/profiles/{profile_id}".format(profile_id=profile['id']),
error_message="Error deleting profile "
"{name}".format(name=name_or_id))
return True
def update_cluster_profile(self, name_or_id, metadata=None, new_name=None):
old_profile = self.get_profile(name_or_id)
if not old_profile:
raise exc.OpenStackCloudException(
'Invalid Profile {profile}'.format(profile=name_or_id))
profile = {}
if metadata is not None:
profile['metadata'] = metadata
if new_name is not None:
profile['name'] = new_name
data = self._clustering_client.patch(
"/profiles/{profile_id}".format(profile_id=old_profile.id),
json={'profile': profile},
error_message="Error updating profile {name}".format(
name=name_or_id))
return self._get_and_munchify(key=None, data=data)
def create_cluster_policy(self, name, spec):
policy = {
'name': name,
'spec': spec
}
data = self._clustering_client.post(
'/policies', json={'policy': policy},
error_message="Error creating policy {name}".format(
name=policy['name']))
return self._get_and_munchify('policy', data)
def search_cluster_policies(self, name_or_id=None, filters=None):
cluster_policies = self.list_cluster_policies()
return _utils._filter_list(cluster_policies, name_or_id, filters)
def list_cluster_policies(self):
endpoint = "/policies"
try:
data = self._clustering_client.get(
endpoint,
error_message="Error fetching cluster policies")
except exc.OpenStackCloudURINotFound as e:
self.log.debug(str(e), exc_info=True)
return []
return self._get_and_munchify('policies', data)
def get_cluster_policy_by_id(self, policy_id):
try:
data = self._clustering_client.get(
"/policies/{policy_id}".format(policy_id=policy_id),
error_message="Error fetching policy {name}".format(
name=policy_id))
return self._get_and_munchify('policy', data)
except exc.OpenStackCloudURINotFound as e:
self.log.debug(str(e), exc_info=True)
return None
def get_cluster_policy(self, name_or_id, filters=None):
return _utils._get_entity(
self, 'cluster_policy', name_or_id, filters)
def delete_cluster_policy(self, name_or_id):
policy = self.get_cluster_policy_by_id(name_or_id)
if policy is None:
self.log.debug("Policy %s not found for deleting", name_or_id)
return False
for cluster in self.list_clusters():
if (name_or_id, policy.id) in cluster.items():
self.log.debug(
"Policy %s is being used by cluster %s, won't delete",
name_or_id, cluster.name)
return False
self._clustering_client.delete(
"/policies/{policy_id}".format(policy_id=name_or_id),
error_message="Error deleting policy "
"{name}".format(name=name_or_id))
return True
def update_cluster_policy(self, name_or_id, new_name):
old_policy = self.get_policy(name_or_id)
if not old_policy:
raise exc.OpenStackCloudException(
'Invalid Policy {policy}'.format(policy=name_or_id))
policy = {'name': new_name}
data = self._clustering_client.patch(
"/policies/{policy_id}".format(policy_id=old_policy.id),
json={'policy': policy},
error_message="Error updating policy "
"{name}".format(name=name_or_id))
return self._get_and_munchify(key=None, data=data)
def create_cluster_receiver(self, name, receiver_type,
cluster_name_or_id=None, action=None,
actor=None, params=None):
cluster = self.get_cluster(cluster_name_or_id)
if cluster is None:
raise exc.OpenStackCloudException(
'Invalid cluster {cluster}'.format(cluster=cluster_name_or_id))
receiver = {
'name': name,
'type': receiver_type
}
if cluster_name_or_id is not None:
receiver['cluster_id'] = cluster.id
if action is not None:
receiver['action'] = action
if actor is not None:
receiver['actor'] = actor
if params is not None:
receiver['params'] = params
data = self._clustering_client.post(
'/receivers', json={'receiver': receiver},
error_message="Error creating receiver {name}".format(name=name))
return self._get_and_munchify('receiver', data)
def search_cluster_receivers(self, name_or_id=None, filters=None):
cluster_receivers = self.list_cluster_receivers()
return _utils._filter_list(cluster_receivers, name_or_id, filters)
def list_cluster_receivers(self):
try:
data = self._clustering_client.get(
'/receivers',
error_message="Error fetching receivers")
except exc.OpenStackCloudURINotFound as e:
self.log.debug(str(e), exc_info=True)
return []
return self._get_and_munchify('receivers', data)
def get_cluster_receiver_by_id(self, receiver_id):
try:
data = self._clustering_client.get(
"/receivers/{receiver_id}".format(receiver_id=receiver_id),
error_message="Error fetching receiver {name}".format(
name=receiver_id))
return self._get_and_munchify('receiver', data)
except exc.OpenStackCloudURINotFound as e:
self.log.debug(str(e), exc_info=True)
return None
def get_cluster_receiver(self, name_or_id, filters=None):
return _utils._get_entity(
self, 'cluster_receiver', name_or_id, filters)
def delete_cluster_receiver(self, name_or_id, wait=False, timeout=3600):
receiver = self.get_cluster_receiver(name_or_id)
if receiver is None:
self.log.debug("Receiver %s not found for deleting", name_or_id)
return False
receiver_id = receiver['id']
self._clustering_client.delete(
"/receivers/{receiver_id}".format(receiver_id=receiver_id),
error_message="Error deleting receiver {name}".format(
name=name_or_id))
if not wait:
return True
for count in _utils._iterate_timeout(
timeout, "Timeout waiting for cluster receiver to delete"):
receiver = self.get_cluster_receiver_by_id(receiver_id)
if not receiver:
break
return True
def update_cluster_receiver(self, name_or_id, new_name=None, action=None,
params=None):
old_receiver = self.get_cluster_receiver(name_or_id)
if old_receiver is None:
raise exc.OpenStackCloudException(
'Invalid receiver {receiver}'.format(receiver=name_or_id))
receiver = {}
if new_name is not None:
receiver['name'] = new_name
if action is not None:
receiver['action'] = action
if params is not None:
receiver['params'] = params
data = self._clustering_client.patch(
"/receivers/{receiver_id}".format(receiver_id=old_receiver.id),
json={'receiver': receiver},
error_message="Error updating receiver {name}".format(
name=name_or_id))
return self._get_and_munchify(key=None, data=data)

View File

@ -480,6 +480,12 @@ class TestCase(base.TestCase):
return dict(method='GET', uri="https://bare-metal.example.com/",
text=open(discovery_fixture, 'r').read())
def get_senlin_discovery_mock_dict(self):
discovery_fixture = os.path.join(
self.fixtures_directory, "clustering.json")
return dict(method='GET', uri="https://clustering.example.com/",
text=open(discovery_fixture, 'r').read())
def use_compute_discovery(
self, compute_version_json='compute-version.json',
compute_discovery_url='https://compute.example.com/v2.1/'):
@ -518,6 +524,15 @@ class TestCase(base.TestCase):
self.__do_register_uris([
self.get_ironic_discovery_mock_dict()])
def use_senlin(self):
# NOTE(elachance): This method is only meant to be used in "setUp"
# where the ordering of the url being registered is tightly controlled
# if the functionality of .use_senlin is meant to be used during an
# actual test case, use .get_senlin_discovery_mock and apply to the
# right location in the mock_uris when calling .register_uris
self.__do_register_uris([
self.get_senlin_discovery_mock_dict()])
def register_uris(self, uri_mock_list=None):
"""Mock a list of URIs and responses via requests mock.

View File

@ -126,6 +126,20 @@
],
"type": "dns",
"name": "designate"
},
{
"endpoints_links": [],
"endpoints": [
{
"adminURL": "https://clustering.example.com",
"region": "RegionOne",
"publicURL": "https://clustering.example.com",
"internalURL": "https://clustering.example.com",
"id": "4deb4d0504a044a395d4480741ba624z"
}
],
"type": "clustering",
"name": "senlin"
}
],
"user": {

View File

@ -148,8 +148,21 @@
"endpoints_links": [],
"name": "designate",
"type": "dns"
},
{
"endpoints": [
{
"id": "4deb4d0504a044a395d4480741ba624z",
"interface": "public",
"region": "RegionOne",
"url": "https://example.com/clustering"
}
],
"endpoint_links": [],
"name": "senlin",
"type": "clustering"
}
],
],
"expires_at": "9999-12-31T23:59:59Z",
"issued_at": "2016-12-17T14:25:05.000000Z",
"methods": [

View File

@ -148,6 +148,19 @@
"endpoints_links": [],
"name": "designate",
"type": "dns"
},
{
"endpoints": [
{
"id": "4deb4d0504a044a395d4480741ba624z",
"interface": "public",
"region": "RegionOne",
"url": "https://clustering.example.com"
}
],
"endpoints_links": [],
"name": "senlin",
"type": "clustering"
}
],
"expires_at": "9999-12-31T23:59:59Z",

View File

@ -0,0 +1,3 @@
---
features:
- Added support for senlin

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,27 @@
{
"versions": [
{
"id": "1.0",
"links": [
{
"href": "/v1/",
"rel": "self"
},
{
"href": "https://clustering.example.com/api-ref/clustering",
"rel": "help"
}
],
"max_version": "1.7",
"media-types": [
{
"base": "application/json",
"type": "application/vnd.openstack.clustering-v1+json"
}
],
"min_version": "1.0",
"status": "CURRENT",
"updated": "2016-01-18T00:00:00Z"
}
]
}

View File

@ -0,0 +1,658 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import testtools
import shade
from shade.tests.unit import base
CLUSTERING_DICT = {
'name': 'fake-name',
'profile_id': '1',
'desired_capacity': 1,
'config': 'fake-config',
'max_size': 1,
'min_size': 1,
'timeout': 100,
'metadata': {}
}
PROFILE_DICT = {
'name': 'fake-profile-name',
'spec': {},
'metadata': {}
}
POLICY_DICT = {
'name': 'fake-profile-name',
'spec': {},
}
RECEIVER_DICT = {
'action': 'FAKE_CLUSTER_ACTION',
'cluster_id': 'fake-cluster-id',
'name': 'fake-receiver-name',
'params': {},
'type': 'webhook'
}
NEW_CLUSTERING_DICT = copy.copy(CLUSTERING_DICT)
NEW_CLUSTERING_DICT['id'] = '1'
NEW_PROFILE_DICT = copy.copy(PROFILE_DICT)
NEW_PROFILE_DICT['id'] = '1'
NEW_POLICY_DICT = copy.copy(POLICY_DICT)
NEW_POLICY_DICT['id'] = '1'
NEW_RECEIVER_DICT = copy.copy(RECEIVER_DICT)
NEW_RECEIVER_DICT['id'] = '1'
class TestClustering(base.RequestsMockTestCase):
def assertAreInstances(self, elements, elem_type):
for e in elements:
self.assertIsInstance(e, elem_type)
def setUp(self):
super(TestClustering, self).setUp()
self.use_senlin()
def test_create_cluster(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles', '1']),
json={
"profiles": [NEW_PROFILE_DICT]}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles']),
json={
"profiles": [NEW_PROFILE_DICT]}),
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters']),
json=NEW_CLUSTERING_DICT)
])
profile = self.cloud.get_cluster_profile_by_id(NEW_PROFILE_DICT['id'])
c = self.cloud.create_cluster(
name=CLUSTERING_DICT['name'],
desired_capacity=CLUSTERING_DICT['desired_capacity'],
profile=profile,
config=CLUSTERING_DICT['config'],
max_size=CLUSTERING_DICT['max_size'],
min_size=CLUSTERING_DICT['min_size'],
metadata=CLUSTERING_DICT['metadata'],
timeout=CLUSTERING_DICT['timeout'])
self.assertEqual(NEW_CLUSTERING_DICT, c)
self.assert_calls()
def test_create_cluster_exception(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles', '1']),
json={
"profiles": [NEW_PROFILE_DICT]}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles']),
json={
"profiles": [NEW_PROFILE_DICT]}),
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters']),
status_code=500)
])
profile = self.cloud.get_cluster_profile_by_id(NEW_PROFILE_DICT['id'])
with testtools.ExpectedException(
shade.exc.OpenStackCloudHTTPError,
"Error creating cluster fake-name.*"):
self.cloud.create_cluster(name='fake-name', profile=profile)
self.assert_calls()
def test_get_cluster_by_id(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": NEW_CLUSTERING_DICT})
])
cluster = self.cloud.get_cluster_by_id('1')
self.assertEqual(cluster['id'], '1')
self.assert_calls()
def test_get_cluster_not_found_returns_false(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters',
'no-cluster']),
status_code=404)
])
c = self.cloud.get_cluster_by_id('no-cluster')
self.assertFalse(c)
self.assert_calls()
def test_update_cluster(self):
new_max_size = 5
updated_cluster = copy.copy(NEW_CLUSTERING_DICT)
updated_cluster['max_size'] = new_max_size
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": NEW_CLUSTERING_DICT}),
dict(method='PATCH',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json=updated_cluster,
)
])
cluster = self.cloud.get_cluster_by_id('1')
c = self.cloud.update_cluster(cluster, new_max_size)
self.assertEqual(updated_cluster, c)
self.assert_calls()
def test_delete_cluster(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": NEW_CLUSTERING_DICT}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'policies']),
json={"cluster_policies": []}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers']),
json={"receivers": []}),
dict(method='DELETE',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json=NEW_CLUSTERING_DICT)
])
self.assertTrue(self.cloud.delete_cluster('1'))
self.assert_calls()
def test_list_clusters(self):
clusters = {'clusters': [NEW_CLUSTERING_DICT]}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters']),
json=clusters)
])
c = self.cloud.list_clusters()
self.assertIsInstance(c, list)
self.assertAreInstances(c, dict)
self.assert_calls()
def test_attach_policy_to_cluster(self):
policy = {
'policy_id': '1',
'enabled': 'true'
}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": NEW_CLUSTERING_DICT}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies', '1']),
json={
"policy": NEW_POLICY_DICT}),
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'actions']),
json={'policy_attach': policy})
])
cluster = self.cloud.get_cluster_by_id('1')
policy = self.cloud.get_cluster_policy_by_id('1')
p = self.cloud.attach_policy_to_cluster(cluster, policy, 'true')
self.assertTrue(p)
self.assert_calls()
def test_detach_policy_from_cluster(self):
updated_cluster = copy.copy(NEW_CLUSTERING_DICT)
updated_cluster['policies'] = ['1']
detached_cluster = copy.copy(NEW_CLUSTERING_DICT)
detached_cluster['policies'] = []
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": NEW_CLUSTERING_DICT}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies', '1']),
json={
"policy": NEW_POLICY_DICT}),
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'actions']),
json={'policy_detach': {'policy_id': '1'}}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": updated_cluster}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": detached_cluster}),
])
cluster = self.cloud.get_cluster_by_id('1')
policy = self.cloud.get_cluster_policy_by_id('1')
p = self.cloud.detach_policy_from_cluster(cluster, policy, wait=True)
self.assertTrue(p)
self.assert_calls()
def test_get_policy_on_cluster_by_id(self):
cluster_policy = {
"cluster_id": "1",
"cluster_name": "cluster1",
"enabled": True,
"id": "1",
"policy_id": "1",
"policy_name": "policy1",
"policy_type": "senlin.policy.deletion-1.0"
}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'policies', '1']),
json={
"cluster_policy": cluster_policy})
])
policy = self.cloud.get_policy_on_cluster('1', '1')
self.assertEqual(policy['cluster_id'], '1')
self.assert_calls()
def test_get_policy_on_cluster_not_found_returns_false(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'policies', 'no-policy']),
status_code=404)
])
p = self.cloud.get_policy_on_cluster('1', 'no-policy')
self.assertFalse(p)
self.assert_calls()
def test_update_policy_on_cluster(self):
policy = {
'policy_id': '1',
'enabled': 'true'
}
updated_cluster = copy.copy(NEW_CLUSTERING_DICT)
updated_cluster['policies'] = policy
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": NEW_CLUSTERING_DICT}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies',
'1']),
json={
"policy": NEW_POLICY_DICT}),
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'actions']),
json={'policies': []})
])
cluster = self.cloud.get_cluster_by_id('1')
policy = self.cloud.get_cluster_policy_by_id('1')
p = self.cloud.update_policy_on_cluster(cluster, policy, True)
self.assertTrue(p)
self.assert_calls()
def test_get_policy_on_cluster(self):
cluster_policy = {
'cluster_id': '1',
'cluster_name': 'cluster1',
'enabled': 'true',
'id': '1',
'policy_id': '1',
'policy_name': 'policy1',
'policy_type': 'type'
}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'policies', '1']),
json={
"cluster_policy": cluster_policy})
])
get_policy = self.cloud.get_policy_on_cluster('1', '1')
self.assertEqual(get_policy, cluster_policy)
self.assert_calls()
def test_create_cluster_profile(self):
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles']),
json={'profile': NEW_PROFILE_DICT})
])
p = self.cloud.create_cluster_profile('fake-profile-name', {})
self.assertEqual(NEW_PROFILE_DICT, p)
self.assert_calls()
def test_create_cluster_profile_exception(self):
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles']),
status_code=500)
])
with testtools.ExpectedException(
shade.exc.OpenStackCloudHTTPError,
"Error creating profile fake-profile-name.*"):
self.cloud.create_cluster_profile('fake-profile-name', {})
self.assert_calls()
def test_list_cluster_profiles(self):
profiles = {'profiles': [NEW_PROFILE_DICT]}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles']),
json=profiles)
])
p = self.cloud.list_cluster_profiles()
self.assertIsInstance(p, list)
self.assertAreInstances(p, dict)
self.assert_calls()
def test_get_cluster_profile_by_id(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles', '1']),
json={
"profile": NEW_PROFILE_DICT})
])
p = self.cloud.get_cluster_profile_by_id('1')
self.assertEqual(p['id'], '1')
self.assert_calls()
def test_get_cluster_profile_not_found_returns_false(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles',
'no-profile']),
status_code=404)
])
p = self.cloud.get_cluster_profile_by_id('no-profile')
self.assertFalse(p)
self.assert_calls()
def test_update_cluster_profile(self):
new_name = "new-name"
updated_profile = copy.copy(NEW_PROFILE_DICT)
updated_profile['name'] = new_name
self.register_uris([
dict(method='PATCH',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles', '1']),
json=updated_profile,
)
])
p = self.cloud.update_cluster_profile('1', new_name=new_name)
self.assertEqual(updated_profile, p)
self.assert_calls()
def test_delete_cluster_profile(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles', '1']),
json={
"profile": NEW_PROFILE_DICT}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters']),
json={}),
dict(method='DELETE',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles', '1']),
json=NEW_PROFILE_DICT)
])
profile = self.cloud.get_cluster_profile_by_id('1')
self.assertTrue(self.cloud.delete_cluster_profile(profile))
self.assert_calls()
def test_create_cluster_policy(self):
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies']),
json={'policy': NEW_POLICY_DICT})
])
p = self.cloud.create_cluster_policy('fake-policy-name', {})
self.assertEqual(NEW_POLICY_DICT, p)
self.assert_calls()
def test_create_cluster_policy_exception(self):
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies']),
status_code=500)
])
with testtools.ExpectedException(
shade.exc.OpenStackCloudHTTPError,
"Error creating policy fake-policy-name.*"):
self.cloud.create_cluster_policy('fake-policy-name', {})
self.assert_calls()
def test_list_cluster_policies(self):
policies = {'policies': [NEW_POLICY_DICT]}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies']),
json=policies)
])
p = self.cloud.list_cluster_policies()
self.assertIsInstance(p, list)
self.assertAreInstances(p, dict)
self.assert_calls()
def test_get_cluster_policy_by_id(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies', '1']),
json={
"policy": NEW_POLICY_DICT})
])
p = self.cloud.get_cluster_policy_by_id('1')
self.assertEqual(p['id'], '1')
self.assert_calls()
def test_get_cluster_policy_not_found_returns_false(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies',
'no-policy']),
status_code=404)
])
p = self.cloud.get_cluster_policy_by_id('no-policy')
self.assertFalse(p)
self.assert_calls()
def test_update_cluster_policy(self):
new_name = "new-name"
updated_policy = copy.copy(NEW_POLICY_DICT)
updated_policy['name'] = new_name
self.register_uris([
dict(method='PATCH',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies', '1']),
json=updated_policy,
)
])
p = self.cloud.update_cluster_policy('1', new_name=new_name)
self.assertEqual(updated_policy, p)
self.assert_calls()
def test_delete_cluster_policy(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies', '1']),
json={
"policy": NEW_POLICY_DICT}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters']),
json={}),
dict(method='DELETE',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies', '1']),
json=NEW_POLICY_DICT)
])
self.assertTrue(self.cloud.delete_cluster_policy('1'))
self.assert_calls()
def test_create_cluster_receiver(self):
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers']),
json={'receiver': NEW_RECEIVER_DICT})
])
r = self.cloud.create_cluster_receiver('fake-receiver-name', {})
self.assertEqual(NEW_RECEIVER_DICT, r)
self.assert_calls()
def test_create_cluster_receiver_exception(self):
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers']),
status_code=500)
])
with testtools.ExpectedException(
shade.exc.OpenStackCloudHTTPError,
"Error creating receiver fake-receiver-name.*"):
self.cloud.create_cluster_receiver('fake-receiver-name', {})
self.assert_calls()
def test_list_cluster_receivers(self):
receivers = {'receivers': [NEW_RECEIVER_DICT]}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers']),
json=receivers)
])
r = self.cloud.list_cluster_receivers()
self.assertIsInstance(r, list)
self.assertAreInstances(r, dict)
self.assert_calls()
def test_get_cluster_receiver_by_id(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers', '1']),
json={
"receiver": NEW_RECEIVER_DICT})
])
r = self.cloud.get_cluster_receiver_by_id('1')
self.assertEqual(r['id'], '1')
self.assert_calls()
def test_get_cluster_receiver_not_found_returns_false(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers',
'no-receiver']),
json={'receivers': []})
])
p = self.cloud.get_cluster_receiver_by_id('no-receiver')
self.assertFalse(p)
self.assert_calls()
def test_update_cluster_receiver(self):
new_name = "new-name"
updated_receiver = copy.copy(NEW_RECEIVER_DICT)
updated_receiver['name'] = new_name
self.register_uris([
dict(method='PATCH',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers', '1']),
json=updated_receiver,
)
])
r = self.cloud.update_cluster_receiver('1', new_name=new_name)
self.assertEqual(updated_receiver, r)
self.assert_calls()
def test_delete_cluster_receiver(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers']),
json={
"receivers": [NEW_RECEIVER_DICT]}),
dict(method='DELETE',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers', '1']),
json=NEW_RECEIVER_DICT),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers', '1']),
json={}),
])
self.assertTrue(self.cloud.delete_cluster_receiver('1', wait=True))
self.assert_calls()