Remove H903 error in sources.

Some codes in python magnumclient use Windows style
line endings, this violates H903 error pep8.

Change-Id: I4f5424403a5187d6845d59d005be2324ff40f354
Closes-Bug: #1658845
This commit is contained in:
Hieu LE 2017-01-24 08:17:11 +07:00
parent af7e5f605a
commit de91fd3e0a
4 changed files with 1976 additions and 1976 deletions

View File

@ -1,335 +1,335 @@
# Copyright 2015 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import testtools
from testtools import matchers
from magnumclient import exceptions
from magnumclient.tests import utils
from magnumclient.v1 import clusters
CLUSTER1 = {'id': 123,
'uuid': '66666666-7777-8888-9999-000000000001',
'name': 'cluster1',
'cluster_template_id': 'e74c40e0-d825-11e2-a28f-0800200c9a61',
'stack_id': '5d12f6fd-a196-4bf0-ae4c-1f639a523a51',
'api_address': '172.17.2.1',
'node_addresses': ['172.17.2.3'],
'node_count': 2,
'master_count': 1,
}
CLUSTER2 = {'id': 124,
'uuid': '66666666-7777-8888-9999-000000000002',
'name': 'cluster2',
'cluster_template_id': 'e74c40e0-d825-11e2-a28f-0800200c9a62',
'stack_id': '5d12f6fd-a196-4bf0-ae4c-1f639a523a52',
'api_address': '172.17.2.2',
'node_addresses': ['172.17.2.4'],
'node_count': 2,
'master_count': 1,
}
CREATE_CLUSTER = copy.deepcopy(CLUSTER1)
del CREATE_CLUSTER['id']
del CREATE_CLUSTER['uuid']
del CREATE_CLUSTER['stack_id']
del CREATE_CLUSTER['api_address']
del CREATE_CLUSTER['node_addresses']
UPDATED_CLUSTER = copy.deepcopy(CLUSTER1)
NEW_NAME = 'newcluster'
UPDATED_CLUSTER['name'] = NEW_NAME
fake_responses = {
'/v1/clusters':
{
'GET': (
{},
{'clusters': [CLUSTER1, CLUSTER2]},
),
'POST': (
{},
CREATE_CLUSTER,
),
},
'/v1/clusters/%s' % CLUSTER1['id']:
{
'GET': (
{},
CLUSTER1
),
'DELETE': (
{},
None,
),
'PATCH': (
{},
UPDATED_CLUSTER,
),
},
'/v1/clusters/%s/?rollback=True' % CLUSTER1['id']:
{
'PATCH': (
{},
UPDATED_CLUSTER,
),
},
'/v1/clusters/%s' % CLUSTER1['name']:
{
'GET': (
{},
CLUSTER1
),
'DELETE': (
{},
None,
),
'PATCH': (
{},
UPDATED_CLUSTER,
),
},
'/v1/clusters/?limit=2':
{
'GET': (
{},
{'clusters': [CLUSTER1, CLUSTER2]},
),
},
'/v1/clusters/?marker=%s' % CLUSTER2['uuid']:
{
'GET': (
{},
{'clusters': [CLUSTER1, CLUSTER2]},
),
},
'/v1/clusters/?limit=2&marker=%s' % CLUSTER2['uuid']:
{
'GET': (
{},
{'clusters': [CLUSTER1, CLUSTER2]},
),
},
'/v1/clusters/?sort_dir=asc':
{
'GET': (
{},
{'clusters': [CLUSTER1, CLUSTER2]},
),
},
'/v1/clusters/?sort_key=uuid':
{
'GET': (
{},
{'clusters': [CLUSTER1, CLUSTER2]},
),
},
'/v1/clusters/?sort_key=uuid&sort_dir=desc':
{
'GET': (
{},
{'clusters': [CLUSTER2, CLUSTER1]},
),
},
}
class ClusterManagerTest(testtools.TestCase):
def setUp(self):
super(ClusterManagerTest, self).setUp()
self.api = utils.FakeAPI(fake_responses)
self.mgr = clusters.ClusterManager(self.api)
def test_cluster_list(self):
clusters = self.mgr.list()
expect = [
('GET', '/v1/clusters', {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertThat(clusters, matchers.HasLength(2))
def _test_cluster_list_with_filters(self, limit=None, marker=None,
sort_key=None, sort_dir=None,
detail=False, expect=[]):
clusters_filter = self.mgr.list(limit=limit,
marker=marker,
sort_key=sort_key,
sort_dir=sort_dir,
detail=detail)
self.assertEqual(expect, self.api.calls)
self.assertThat(clusters_filter, matchers.HasLength(2))
def test_cluster_list_with_limit(self):
expect = [
('GET', '/v1/clusters/?limit=2', {}, None),
]
self._test_cluster_list_with_filters(
limit=2,
expect=expect)
def test_cluster_list_with_marker(self):
expect = [
('GET', '/v1/clusters/?marker=%s' % CLUSTER2['uuid'], {}, None),
]
self._test_cluster_list_with_filters(
marker=CLUSTER2['uuid'],
expect=expect)
def test_cluster_list_with_marker_limit(self):
expect = [
('GET', '/v1/clusters/?limit=2&marker=%s' % CLUSTER2['uuid'],
{},
None),
]
self._test_cluster_list_with_filters(
limit=2, marker=CLUSTER2['uuid'],
expect=expect)
def test_cluster_list_with_sort_dir(self):
expect = [
('GET', '/v1/clusters/?sort_dir=asc', {}, None),
]
self._test_cluster_list_with_filters(
sort_dir='asc',
expect=expect)
def test_cluster_list_with_sort_key(self):
expect = [
('GET', '/v1/clusters/?sort_key=uuid', {}, None),
]
self._test_cluster_list_with_filters(
sort_key='uuid',
expect=expect)
def test_cluster_list_with_sort_key_dir(self):
expect = [
('GET', '/v1/clusters/?sort_key=uuid&sort_dir=desc', {}, None),
]
self._test_cluster_list_with_filters(
sort_key='uuid', sort_dir='desc',
expect=expect)
def test_cluster_show_by_id(self):
cluster = self.mgr.get(CLUSTER1['id'])
expect = [
('GET', '/v1/clusters/%s' % CLUSTER1['id'], {}, None)
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(CLUSTER1['name'], cluster.name)
self.assertEqual(CLUSTER1['cluster_template_id'],
cluster.cluster_template_id)
def test_cluster_show_by_name(self):
cluster = self.mgr.get(CLUSTER1['name'])
expect = [
('GET', '/v1/clusters/%s' % CLUSTER1['name'], {}, None)
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(CLUSTER1['name'], cluster.name)
self.assertEqual(CLUSTER1['cluster_template_id'],
cluster.cluster_template_id)
def test_cluster_create(self):
cluster = self.mgr.create(**CREATE_CLUSTER)
expect = [
('POST', '/v1/clusters', {}, CREATE_CLUSTER),
]
self.assertEqual(expect, self.api.calls)
self.assertTrue(cluster)
def test_cluster_create_with_keypair(self):
cluster_with_keypair = dict()
cluster_with_keypair.update(CREATE_CLUSTER)
cluster_with_keypair['keypair'] = 'test_key'
cluster = self.mgr.create(**cluster_with_keypair)
expect = [
('POST', '/v1/clusters', {}, cluster_with_keypair),
]
self.assertEqual(expect, self.api.calls)
self.assertTrue(cluster)
def test_cluster_create_with_discovery_url(self):
cluster_with_discovery = dict()
cluster_with_discovery.update(CREATE_CLUSTER)
cluster_with_discovery['discovery_url'] = 'discovery_url'
cluster = self.mgr.create(**cluster_with_discovery)
expect = [
('POST', '/v1/clusters', {}, cluster_with_discovery),
]
self.assertEqual(expect, self.api.calls)
self.assertTrue(cluster)
def test_cluster_create_with_cluster_create_timeout(self):
cluster_with_timeout = dict()
cluster_with_timeout.update(CREATE_CLUSTER)
cluster_with_timeout['create_timeout'] = '15'
cluster = self.mgr.create(**cluster_with_timeout)
expect = [
('POST', '/v1/clusters', {}, cluster_with_timeout),
]
self.assertEqual(expect, self.api.calls)
self.assertTrue(cluster)
def test_cluster_create_fail(self):
CREATE_CLUSTER_FAIL = copy.deepcopy(CREATE_CLUSTER)
CREATE_CLUSTER_FAIL["wrong_key"] = "wrong"
self.assertRaisesRegexp(exceptions.InvalidAttribute,
("Key must be in %s" %
','.join(clusters.CREATION_ATTRIBUTES)),
self.mgr.create, **CREATE_CLUSTER_FAIL)
self.assertEqual([], self.api.calls)
def test_cluster_delete_by_id(self):
cluster = self.mgr.delete(CLUSTER1['id'])
expect = [
('DELETE', '/v1/clusters/%s' % CLUSTER1['id'], {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertIsNone(cluster)
def test_cluster_delete_by_name(self):
cluster = self.mgr.delete(CLUSTER1['name'])
expect = [
('DELETE', '/v1/clusters/%s' % CLUSTER1['name'], {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertIsNone(cluster)
def test_cluster_update(self):
patch = {'op': 'replace',
'value': NEW_NAME,
'path': '/name'}
cluster = self.mgr.update(id=CLUSTER1['id'], patch=patch)
expect = [
('PATCH', '/v1/clusters/%s' % CLUSTER1['id'], {}, patch),
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(NEW_NAME, cluster.name)
def test_cluster_update_with_rollback(self):
patch = {'op': 'replace',
'value': NEW_NAME,
'path': '/name'}
cluster = self.mgr.update(id=CLUSTER1['id'], patch=patch,
rollback=True)
expect = [
('PATCH', '/v1/clusters/%s/?rollback=True' % CLUSTER1['id'],
{}, patch),
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(NEW_NAME, cluster.name)
# Copyright 2015 IBM Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import testtools
from testtools import matchers
from magnumclient import exceptions
from magnumclient.tests import utils
from magnumclient.v1 import clusters
CLUSTER1 = {'id': 123,
'uuid': '66666666-7777-8888-9999-000000000001',
'name': 'cluster1',
'cluster_template_id': 'e74c40e0-d825-11e2-a28f-0800200c9a61',
'stack_id': '5d12f6fd-a196-4bf0-ae4c-1f639a523a51',
'api_address': '172.17.2.1',
'node_addresses': ['172.17.2.3'],
'node_count': 2,
'master_count': 1,
}
CLUSTER2 = {'id': 124,
'uuid': '66666666-7777-8888-9999-000000000002',
'name': 'cluster2',
'cluster_template_id': 'e74c40e0-d825-11e2-a28f-0800200c9a62',
'stack_id': '5d12f6fd-a196-4bf0-ae4c-1f639a523a52',
'api_address': '172.17.2.2',
'node_addresses': ['172.17.2.4'],
'node_count': 2,
'master_count': 1,
}
CREATE_CLUSTER = copy.deepcopy(CLUSTER1)
del CREATE_CLUSTER['id']
del CREATE_CLUSTER['uuid']
del CREATE_CLUSTER['stack_id']
del CREATE_CLUSTER['api_address']
del CREATE_CLUSTER['node_addresses']
UPDATED_CLUSTER = copy.deepcopy(CLUSTER1)
NEW_NAME = 'newcluster'
UPDATED_CLUSTER['name'] = NEW_NAME
fake_responses = {
'/v1/clusters':
{
'GET': (
{},
{'clusters': [CLUSTER1, CLUSTER2]},
),
'POST': (
{},
CREATE_CLUSTER,
),
},
'/v1/clusters/%s' % CLUSTER1['id']:
{
'GET': (
{},
CLUSTER1
),
'DELETE': (
{},
None,
),
'PATCH': (
{},
UPDATED_CLUSTER,
),
},
'/v1/clusters/%s/?rollback=True' % CLUSTER1['id']:
{
'PATCH': (
{},
UPDATED_CLUSTER,
),
},
'/v1/clusters/%s' % CLUSTER1['name']:
{
'GET': (
{},
CLUSTER1
),
'DELETE': (
{},
None,
),
'PATCH': (
{},
UPDATED_CLUSTER,
),
},
'/v1/clusters/?limit=2':
{
'GET': (
{},
{'clusters': [CLUSTER1, CLUSTER2]},
),
},
'/v1/clusters/?marker=%s' % CLUSTER2['uuid']:
{
'GET': (
{},
{'clusters': [CLUSTER1, CLUSTER2]},
),
},
'/v1/clusters/?limit=2&marker=%s' % CLUSTER2['uuid']:
{
'GET': (
{},
{'clusters': [CLUSTER1, CLUSTER2]},
),
},
'/v1/clusters/?sort_dir=asc':
{
'GET': (
{},
{'clusters': [CLUSTER1, CLUSTER2]},
),
},
'/v1/clusters/?sort_key=uuid':
{
'GET': (
{},
{'clusters': [CLUSTER1, CLUSTER2]},
),
},
'/v1/clusters/?sort_key=uuid&sort_dir=desc':
{
'GET': (
{},
{'clusters': [CLUSTER2, CLUSTER1]},
),
},
}
class ClusterManagerTest(testtools.TestCase):
def setUp(self):
super(ClusterManagerTest, self).setUp()
self.api = utils.FakeAPI(fake_responses)
self.mgr = clusters.ClusterManager(self.api)
def test_cluster_list(self):
clusters = self.mgr.list()
expect = [
('GET', '/v1/clusters', {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertThat(clusters, matchers.HasLength(2))
def _test_cluster_list_with_filters(self, limit=None, marker=None,
sort_key=None, sort_dir=None,
detail=False, expect=[]):
clusters_filter = self.mgr.list(limit=limit,
marker=marker,
sort_key=sort_key,
sort_dir=sort_dir,
detail=detail)
self.assertEqual(expect, self.api.calls)
self.assertThat(clusters_filter, matchers.HasLength(2))
def test_cluster_list_with_limit(self):
expect = [
('GET', '/v1/clusters/?limit=2', {}, None),
]
self._test_cluster_list_with_filters(
limit=2,
expect=expect)
def test_cluster_list_with_marker(self):
expect = [
('GET', '/v1/clusters/?marker=%s' % CLUSTER2['uuid'], {}, None),
]
self._test_cluster_list_with_filters(
marker=CLUSTER2['uuid'],
expect=expect)
def test_cluster_list_with_marker_limit(self):
expect = [
('GET', '/v1/clusters/?limit=2&marker=%s' % CLUSTER2['uuid'],
{},
None),
]
self._test_cluster_list_with_filters(
limit=2, marker=CLUSTER2['uuid'],
expect=expect)
def test_cluster_list_with_sort_dir(self):
expect = [
('GET', '/v1/clusters/?sort_dir=asc', {}, None),
]
self._test_cluster_list_with_filters(
sort_dir='asc',
expect=expect)
def test_cluster_list_with_sort_key(self):
expect = [
('GET', '/v1/clusters/?sort_key=uuid', {}, None),
]
self._test_cluster_list_with_filters(
sort_key='uuid',
expect=expect)
def test_cluster_list_with_sort_key_dir(self):
expect = [
('GET', '/v1/clusters/?sort_key=uuid&sort_dir=desc', {}, None),
]
self._test_cluster_list_with_filters(
sort_key='uuid', sort_dir='desc',
expect=expect)
def test_cluster_show_by_id(self):
cluster = self.mgr.get(CLUSTER1['id'])
expect = [
('GET', '/v1/clusters/%s' % CLUSTER1['id'], {}, None)
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(CLUSTER1['name'], cluster.name)
self.assertEqual(CLUSTER1['cluster_template_id'],
cluster.cluster_template_id)
def test_cluster_show_by_name(self):
cluster = self.mgr.get(CLUSTER1['name'])
expect = [
('GET', '/v1/clusters/%s' % CLUSTER1['name'], {}, None)
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(CLUSTER1['name'], cluster.name)
self.assertEqual(CLUSTER1['cluster_template_id'],
cluster.cluster_template_id)
def test_cluster_create(self):
cluster = self.mgr.create(**CREATE_CLUSTER)
expect = [
('POST', '/v1/clusters', {}, CREATE_CLUSTER),
]
self.assertEqual(expect, self.api.calls)
self.assertTrue(cluster)
def test_cluster_create_with_keypair(self):
cluster_with_keypair = dict()
cluster_with_keypair.update(CREATE_CLUSTER)
cluster_with_keypair['keypair'] = 'test_key'
cluster = self.mgr.create(**cluster_with_keypair)
expect = [
('POST', '/v1/clusters', {}, cluster_with_keypair),
]
self.assertEqual(expect, self.api.calls)
self.assertTrue(cluster)
def test_cluster_create_with_discovery_url(self):
cluster_with_discovery = dict()
cluster_with_discovery.update(CREATE_CLUSTER)
cluster_with_discovery['discovery_url'] = 'discovery_url'
cluster = self.mgr.create(**cluster_with_discovery)
expect = [
('POST', '/v1/clusters', {}, cluster_with_discovery),
]
self.assertEqual(expect, self.api.calls)
self.assertTrue(cluster)
def test_cluster_create_with_cluster_create_timeout(self):
cluster_with_timeout = dict()
cluster_with_timeout.update(CREATE_CLUSTER)
cluster_with_timeout['create_timeout'] = '15'
cluster = self.mgr.create(**cluster_with_timeout)
expect = [
('POST', '/v1/clusters', {}, cluster_with_timeout),
]
self.assertEqual(expect, self.api.calls)
self.assertTrue(cluster)
def test_cluster_create_fail(self):
CREATE_CLUSTER_FAIL = copy.deepcopy(CREATE_CLUSTER)
CREATE_CLUSTER_FAIL["wrong_key"] = "wrong"
self.assertRaisesRegexp(exceptions.InvalidAttribute,
("Key must be in %s" %
','.join(clusters.CREATION_ATTRIBUTES)),
self.mgr.create, **CREATE_CLUSTER_FAIL)
self.assertEqual([], self.api.calls)
def test_cluster_delete_by_id(self):
cluster = self.mgr.delete(CLUSTER1['id'])
expect = [
('DELETE', '/v1/clusters/%s' % CLUSTER1['id'], {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertIsNone(cluster)
def test_cluster_delete_by_name(self):
cluster = self.mgr.delete(CLUSTER1['name'])
expect = [
('DELETE', '/v1/clusters/%s' % CLUSTER1['name'], {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertIsNone(cluster)
def test_cluster_update(self):
patch = {'op': 'replace',
'value': NEW_NAME,
'path': '/name'}
cluster = self.mgr.update(id=CLUSTER1['id'], patch=patch)
expect = [
('PATCH', '/v1/clusters/%s' % CLUSTER1['id'], {}, patch),
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(NEW_NAME, cluster.name)
def test_cluster_update_with_rollback(self):
patch = {'op': 'replace',
'value': NEW_NAME,
'path': '/name'}
cluster = self.mgr.update(id=CLUSTER1['id'], patch=patch,
rollback=True)
expect = [
('PATCH', '/v1/clusters/%s/?rollback=True' % CLUSTER1['id'],
{}, patch),
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(NEW_NAME, cluster.name)

View File

@ -1,467 +1,467 @@
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from magnumclient import exceptions
from magnumclient.tests.v1 import shell_test_base
from magnumclient.tests.v1 import test_clustertemplates_shell
from magnumclient.v1.clusters import Cluster
class FakeCluster(Cluster):
def __init__(self, manager=None, info={}, **kwargs):
Cluster.__init__(self, manager=manager, info=info)
self.uuid = kwargs.get('uuid', 'x')
self.keypair = kwargs.get('keypair', 'x')
self.name = kwargs.get('name', 'x')
self.cluster_template_id = kwargs.get('cluster_template_id', 'x')
self.stack_id = kwargs.get('stack_id', 'x')
self.status = kwargs.get('status', 'x')
self.master_count = kwargs.get('master_count', 1)
self.node_count = kwargs.get('node_count', 1)
self.links = kwargs.get('links', [])
self.create_timeout = kwargs.get('create_timeout', 60)
class FakeCert(object):
def __init__(self, pem):
self.pem = pem
class ShellTest(shell_test_base.TestCommandLineArgument):
def _get_expected_args_list(self, marker=None, limit=None,
sort_dir=None, sort_key=None):
expected_args = {}
expected_args['marker'] = marker
expected_args['limit'] = limit
expected_args['sort_dir'] = sort_dir
expected_args['sort_key'] = sort_key
return expected_args
def _get_expected_args_create(self, cluster_template_id, name=None,
master_count=1, node_count=1,
create_timeout=60, keypair=None,
discovery_url=None):
expected_args = {}
expected_args['name'] = name
expected_args['cluster_template_id'] = cluster_template_id
expected_args['master_count'] = master_count
expected_args['node_count'] = node_count
expected_args['create_timeout'] = create_timeout
expected_args['discovery_url'] = discovery_url
expected_args['keypair'] = keypair
return expected_args
@mock.patch('magnumclient.v1.clusters.ClusterManager.list')
def test_cluster_list_success(self, mock_list):
self._test_arg_success('cluster-list')
expected_args = self._get_expected_args_list()
mock_list.assert_called_once_with(**expected_args)
@mock.patch('magnumclient.v1.clusters.ClusterManager.list')
def test_cluster_list_success_with_arg(self, mock_list):
self._test_arg_success('cluster-list '
'--marker some_uuid '
'--limit 1 '
'--sort-dir asc '
'--sort-key uuid')
expected_args = self._get_expected_args_list('some_uuid', 1,
'asc', 'uuid')
mock_list.assert_called_once_with(**expected_args)
@mock.patch('magnumclient.v1.clusters.ClusterManager.list')
def test_cluster_list_ignored_duplicated_field(self, mock_list):
mock_list.return_value = [FakeCluster()]
self._test_arg_success(
'cluster-list --fields status,status,status,name',
keyword=('\n| uuid | name | keypair | node_count | '
'master_count | status |\n'))
# Output should be
# +------+------+---------+--------------+--------------+--------+
# | uuid | name | keypair | node_count | master_count | status |
# +------+------+---------+--------------+--------------+--------+
# | x | x | x | x | x | x |
# +------+------+---------+--------------+--------------+--------+
expected_args = self._get_expected_args_list()
mock_list.assert_called_once_with(**expected_args)
@mock.patch('magnumclient.v1.clusters.ClusterManager.list')
def test_cluster_list_failure_with_invalid_field(self, mock_list):
mock_list.return_value = [FakeCluster()]
_error_msg = [".*?^Non-existent fields are specified: ['xxx','zzz']"]
self.assertRaises(exceptions.CommandError,
self._test_arg_failure,
'cluster-list --fields xxx,stack_id,zzz,status',
_error_msg)
expected_args = self._get_expected_args_list()
mock_list.assert_called_once_with(**expected_args)
@mock.patch('magnumclient.v1.clusters.ClusterManager.list')
def test_cluster_list_failure_invalid_arg(self, mock_list):
_error_msg = [
'.*?^usage: magnum cluster-list ',
'.*?^error: argument --sort-dir: invalid choice: ',
".*?^Try 'magnum help cluster-list' for more information."
]
self._test_arg_failure('cluster-list --sort-dir aaa', _error_msg)
mock_list.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.list')
def test_cluster_list_failure(self, mock_list):
self._test_arg_failure('cluster-list --wrong',
self._unrecognized_arg_error)
mock_list.assert_not_called()
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_success(self, mock_create, mock_get):
mock_ct = mock.MagicMock()
mock_ct.uuid = 'xxx'
mock_get.return_value = mock_ct
self._test_arg_success('cluster-create --name test '
'--cluster-template xxx '
'--node-count 123 --timeout 15')
expected_args = self._get_expected_args_create('xxx', name='test',
node_count=123,
create_timeout=15)
mock_create.assert_called_with(**expected_args)
self._test_arg_success('cluster-create --cluster-template xxx')
expected_args = self._get_expected_args_create('xxx')
mock_create.assert_called_with(**expected_args)
self._test_arg_success('cluster-create --cluster-template xxx '
'--keypair x')
expected_args = self._get_expected_args_create('xxx',
keypair='x')
mock_create.assert_called_with(**expected_args)
self._test_arg_success('cluster-create --name test '
'--cluster-template xxx')
expected_args = self._get_expected_args_create('xxx',
name='test')
mock_create.assert_called_with(**expected_args)
self._test_arg_success('cluster-create --cluster-template xxx '
'--node-count 123')
expected_args = self._get_expected_args_create('xxx',
node_count=123)
mock_create.assert_called_with(**expected_args)
self._test_arg_success('cluster-create --cluster-template xxx '
'--node-count 123 --master-count 123')
expected_args = self._get_expected_args_create('xxx',
master_count=123,
node_count=123)
mock_create.assert_called_with(**expected_args)
self._test_arg_success('cluster-create --cluster-template xxx '
'--timeout 15')
expected_args = self._get_expected_args_create('xxx',
create_timeout=15)
mock_create.assert_called_with(**expected_args)
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_deprecation_warnings(self, mock_create, mock_get):
self._test_arg_failure('cluster-create --cluster-template xxx '
'--keypair-id x',
self._deprecated_warning)
self.assertTrue(mock_create.called)
self.assertTrue(mock_get.called)
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_deprecation_errors(self,
mock_create,
mock_get):
self._test_arg_failure('cluster-create --cluster-template xxx '
'--keypair-id x --keypair x',
self._too_many_group_arg_error)
self.assertFalse(mock_create.called)
self.assertFalse(mock_get.called)
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_show_clustertemplate_metadata(self,
mock_cluster,
mock_clustertemplate):
mock_cluster.return_value = mock.MagicMock(cluster_template_id=0)
mock_clustertemplate.return_value = \
test_clustertemplates_shell.FakeClusterTemplate(info={'links': 0,
'uuid': 0,
'id': 0,
'name': ''})
self._test_arg_success('cluster-show --long x')
mock_cluster.assert_called_once_with('x')
mock_clustertemplate.assert_called_once_with(0)
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_success_only_clustertemplate_arg(self,
mock_create,
mock_get):
mock_ct = mock.MagicMock()
mock_ct.uuid = 'xxx'
mock_get.return_value = mock_ct
self._test_arg_success('cluster-create --cluster-template xxx')
expected_args = self._get_expected_args_create('xxx')
mock_create.assert_called_with(**expected_args)
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_only_name(self, mock_create):
self._test_arg_failure('cluster-create --name test',
self._mandatory_arg_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_only_keypair(self, mock_create):
self._test_arg_failure('cluster-create --keypair test',
self._mandatory_arg_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_only_node_count(self, mock_create):
self._test_arg_failure('cluster-create --node-count 1',
self._mandatory_arg_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_invalid_node_count(self, mock_create):
self._test_arg_failure('cluster-create --cluster-template xxx '
'--node-count test',
self._invalid_value_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_only_cluster_create_timeout(self,
mock_create):
self._test_arg_failure('cluster-create --timeout 15',
self._mandatory_arg_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_no_arg(self, mock_create):
self._test_arg_failure('cluster-create',
self._mandatory_arg_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_invalid_master_count(self, mock_create):
self._test_arg_failure('cluster-create --cluster-template xxx '
'--master-count test',
self._invalid_value_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.delete')
def test_cluster_delete_success(self, mock_delete):
self._test_arg_success('cluster-delete xxx')
mock_delete.assert_called_once_with('xxx')
@mock.patch('magnumclient.v1.clusters.ClusterManager.delete')
def test_cluster_delete_multiple_id_success(self, mock_delete):
self._test_arg_success('cluster-delete xxx xyz')
calls = [mock.call('xxx'), mock.call('xyz')]
mock_delete.assert_has_calls(calls)
@mock.patch('magnumclient.v1.clusters.ClusterManager.delete')
def test_cluster_delete_failure_no_arg(self, mock_delete):
self._test_arg_failure('cluster-delete', self._few_argument_error)
mock_delete.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_show_success(self, mock_show):
self._test_arg_success('cluster-show xxx')
mock_show.assert_called_once_with('xxx')
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_show_failure_no_arg(self, mock_show):
self._test_arg_failure('cluster-show', self._few_argument_error)
mock_show.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_success(self, mock_update):
self._test_arg_success('cluster-update test add test=test')
patch = [{'op': 'add', 'path': '/test', 'value': 'test'}]
mock_update.assert_called_once_with('test', patch, False)
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_success_many_attribute(self, mock_update):
self._test_arg_success('cluster-update test add test=test test1=test1')
patch = [{'op': 'add', 'path': '/test', 'value': 'test'},
{'op': 'add', 'path': '/test1', 'value': 'test1'}]
mock_update.assert_called_once_with('test', patch, False)
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_success_rollback(self, mock_update):
self._test_arg_success('cluster-update test add test=test --rollback')
patch = [{'op': 'add', 'path': '/test', 'value': 'test'}]
mock_update.assert_called_once_with('test', patch, True)
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_rollback_old_api_version(self, mock_update):
self.assertRaises(
exceptions.CommandError,
self.shell,
'--magnum-api-version 1.2 cluster-update '
'test add test=test --rollback')
mock_update.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_failure_wrong_op(self, mock_update):
_error_msg = [
'.*?^usage: magnum cluster-update ',
'.*?^error: argument <op>: invalid choice: ',
".*?^Try 'magnum help cluster-update' for more information."
]
self._test_arg_failure('cluster-update test wrong test=test',
_error_msg)
mock_update.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_failure_wrong_attribute(self, mock_update):
_error_msg = [
'.*?^ERROR: Attributes must be a list of PATH=VALUE'
]
self.assertRaises(exceptions.CommandError, self._test_arg_failure,
'cluster-update test add test', _error_msg)
mock_update.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_failure_few_args(self, mock_update):
_error_msg = [
'.*?^usage: magnum cluster-update ',
'.*?^error: (the following arguments|too few arguments)',
".*?^Try 'magnum help cluster-update' for more information."
]
self._test_arg_failure('cluster-update', _error_msg)
mock_update.assert_not_called()
self._test_arg_failure('cluster-update test', _error_msg)
mock_update.assert_not_called()
self._test_arg_failure('cluster-update test add', _error_msg)
mock_update.assert_not_called()
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_config_success(self, mock_cluster, mock_clustertemplate):
mock_cluster.return_value = FakeCluster(status='UPDATE_COMPLETE')
self._test_arg_success('cluster-config xxx')
mock_cluster.assert_called_with('xxx')
mock_cluster.return_value = FakeCluster(status='CREATE_COMPLETE')
self._test_arg_success('cluster-config xxx')
mock_cluster.assert_called_with('xxx')
self._test_arg_success('cluster-config --dir /tmp xxx')
mock_cluster.assert_called_with('xxx')
self._test_arg_success('cluster-config --force xxx')
mock_cluster.assert_called_with('xxx')
self._test_arg_success('cluster-config --dir /tmp --force xxx')
mock_cluster.assert_called_with('xxx')
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_config_failure_wrong_status(self,
mock_cluster,
mock_clustertemplate):
mock_cluster.return_value = FakeCluster(status='CREATE_IN_PROGRESS')
self.assertRaises(exceptions.CommandError,
self._test_arg_failure,
'cluster-config xxx',
['.*?^Cluster in status: '])
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_config_failure_no_arg(self, mock_cluster):
self._test_arg_failure('cluster-config', self._few_argument_error)
mock_cluster.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_config_failure_wrong_arg(self, mock_cluster):
self._test_arg_failure('cluster-config xxx yyy',
self._unrecognized_arg_error)
mock_cluster.assert_not_called()
@mock.patch('os.path.exists')
@mock.patch('magnumclient.v1.certificates.CertificateManager.create')
@mock.patch('magnumclient.v1.certificates.CertificateManager.get')
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def _test_cluster_config_success(self, mock_cluster, mock_ct,
mock_cert_get, mock_cert_create,
mock_exists, coe, shell, tls_disable):
cert = FakeCert(pem='foo bar')
mock_exists.return_value = False
mock_cluster.return_value = FakeCluster(status='CREATE_COMPLETE',
info={
'name': 'Kluster',
'api_address': '10.0.0.1'},
cluster_template_id='fake_ct',
uuid='fake_cluster')
mock_cert_get.return_value = cert
mock_cert_create.return_value = cert
mock_ct.return_value = test_clustertemplates_shell.\
FakeClusterTemplate(coe=coe, name='fake_ct',
tls_disabled=tls_disable)
with mock.patch.dict('os.environ', {'SHELL': shell}):
self._test_arg_success('cluster-config test_cluster')
self.assertTrue(mock_exists.called)
mock_cluster.assert_called_once_with('test_cluster')
mock_ct.assert_called_once_with('fake_ct')
if not tls_disable:
mock_cert_create.assert_called_once_with(
cluster_uuid='fake_cluster', csr=mock.ANY)
mock_cert_get.assert_called_once_with(cluster_uuid='fake_cluster')
def test_cluster_config_swarm_success_with_tls_csh(self):
self._test_cluster_config_success(coe='swarm', shell='csh',
tls_disable=False)
def test_cluster_config_swarm_success_with_tls_non_csh(self):
self._test_cluster_config_success(coe='swarm', shell='zsh',
tls_disable=False)
def test_cluster_config_swarm_success_without_tls_csh(self):
self._test_cluster_config_success(coe='swarm', shell='csh',
tls_disable=True)
def test_cluster_config_swarm_success_without_tls_non_csh(self):
self._test_cluster_config_success(coe='swarm', shell='zsh',
tls_disable=True)
def test_cluster_config_k8s_success_with_tls_csh(self):
self._test_cluster_config_success(coe='kubernetes', shell='csh',
tls_disable=False)
def test_cluster_config_k8s_success_with_tls_non_csh(self):
self._test_cluster_config_success(coe='kubernetes', shell='zsh',
tls_disable=False)
def test_cluster_config_k8s_success_without_tls_csh(self):
self._test_cluster_config_success(coe='kubernetes', shell='csh',
tls_disable=True)
def test_cluster_config_k8s_success_without_tls_non_csh(self):
self._test_cluster_config_success(coe='kubernetes', shell='zsh',
tls_disable=True)
# Copyright 2015 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from magnumclient import exceptions
from magnumclient.tests.v1 import shell_test_base
from magnumclient.tests.v1 import test_clustertemplates_shell
from magnumclient.v1.clusters import Cluster
class FakeCluster(Cluster):
def __init__(self, manager=None, info={}, **kwargs):
Cluster.__init__(self, manager=manager, info=info)
self.uuid = kwargs.get('uuid', 'x')
self.keypair = kwargs.get('keypair', 'x')
self.name = kwargs.get('name', 'x')
self.cluster_template_id = kwargs.get('cluster_template_id', 'x')
self.stack_id = kwargs.get('stack_id', 'x')
self.status = kwargs.get('status', 'x')
self.master_count = kwargs.get('master_count', 1)
self.node_count = kwargs.get('node_count', 1)
self.links = kwargs.get('links', [])
self.create_timeout = kwargs.get('create_timeout', 60)
class FakeCert(object):
def __init__(self, pem):
self.pem = pem
class ShellTest(shell_test_base.TestCommandLineArgument):
def _get_expected_args_list(self, marker=None, limit=None,
sort_dir=None, sort_key=None):
expected_args = {}
expected_args['marker'] = marker
expected_args['limit'] = limit
expected_args['sort_dir'] = sort_dir
expected_args['sort_key'] = sort_key
return expected_args
def _get_expected_args_create(self, cluster_template_id, name=None,
master_count=1, node_count=1,
create_timeout=60, keypair=None,
discovery_url=None):
expected_args = {}
expected_args['name'] = name
expected_args['cluster_template_id'] = cluster_template_id
expected_args['master_count'] = master_count
expected_args['node_count'] = node_count
expected_args['create_timeout'] = create_timeout
expected_args['discovery_url'] = discovery_url
expected_args['keypair'] = keypair
return expected_args
@mock.patch('magnumclient.v1.clusters.ClusterManager.list')
def test_cluster_list_success(self, mock_list):
self._test_arg_success('cluster-list')
expected_args = self._get_expected_args_list()
mock_list.assert_called_once_with(**expected_args)
@mock.patch('magnumclient.v1.clusters.ClusterManager.list')
def test_cluster_list_success_with_arg(self, mock_list):
self._test_arg_success('cluster-list '
'--marker some_uuid '
'--limit 1 '
'--sort-dir asc '
'--sort-key uuid')
expected_args = self._get_expected_args_list('some_uuid', 1,
'asc', 'uuid')
mock_list.assert_called_once_with(**expected_args)
@mock.patch('magnumclient.v1.clusters.ClusterManager.list')
def test_cluster_list_ignored_duplicated_field(self, mock_list):
mock_list.return_value = [FakeCluster()]
self._test_arg_success(
'cluster-list --fields status,status,status,name',
keyword=('\n| uuid | name | keypair | node_count | '
'master_count | status |\n'))
# Output should be
# +------+------+---------+--------------+--------------+--------+
# | uuid | name | keypair | node_count | master_count | status |
# +------+------+---------+--------------+--------------+--------+
# | x | x | x | x | x | x |
# +------+------+---------+--------------+--------------+--------+
expected_args = self._get_expected_args_list()
mock_list.assert_called_once_with(**expected_args)
@mock.patch('magnumclient.v1.clusters.ClusterManager.list')
def test_cluster_list_failure_with_invalid_field(self, mock_list):
mock_list.return_value = [FakeCluster()]
_error_msg = [".*?^Non-existent fields are specified: ['xxx','zzz']"]
self.assertRaises(exceptions.CommandError,
self._test_arg_failure,
'cluster-list --fields xxx,stack_id,zzz,status',
_error_msg)
expected_args = self._get_expected_args_list()
mock_list.assert_called_once_with(**expected_args)
@mock.patch('magnumclient.v1.clusters.ClusterManager.list')
def test_cluster_list_failure_invalid_arg(self, mock_list):
_error_msg = [
'.*?^usage: magnum cluster-list ',
'.*?^error: argument --sort-dir: invalid choice: ',
".*?^Try 'magnum help cluster-list' for more information."
]
self._test_arg_failure('cluster-list --sort-dir aaa', _error_msg)
mock_list.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.list')
def test_cluster_list_failure(self, mock_list):
self._test_arg_failure('cluster-list --wrong',
self._unrecognized_arg_error)
mock_list.assert_not_called()
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_success(self, mock_create, mock_get):
mock_ct = mock.MagicMock()
mock_ct.uuid = 'xxx'
mock_get.return_value = mock_ct
self._test_arg_success('cluster-create --name test '
'--cluster-template xxx '
'--node-count 123 --timeout 15')
expected_args = self._get_expected_args_create('xxx', name='test',
node_count=123,
create_timeout=15)
mock_create.assert_called_with(**expected_args)
self._test_arg_success('cluster-create --cluster-template xxx')
expected_args = self._get_expected_args_create('xxx')
mock_create.assert_called_with(**expected_args)
self._test_arg_success('cluster-create --cluster-template xxx '
'--keypair x')
expected_args = self._get_expected_args_create('xxx',
keypair='x')
mock_create.assert_called_with(**expected_args)
self._test_arg_success('cluster-create --name test '
'--cluster-template xxx')
expected_args = self._get_expected_args_create('xxx',
name='test')
mock_create.assert_called_with(**expected_args)
self._test_arg_success('cluster-create --cluster-template xxx '
'--node-count 123')
expected_args = self._get_expected_args_create('xxx',
node_count=123)
mock_create.assert_called_with(**expected_args)
self._test_arg_success('cluster-create --cluster-template xxx '
'--node-count 123 --master-count 123')
expected_args = self._get_expected_args_create('xxx',
master_count=123,
node_count=123)
mock_create.assert_called_with(**expected_args)
self._test_arg_success('cluster-create --cluster-template xxx '
'--timeout 15')
expected_args = self._get_expected_args_create('xxx',
create_timeout=15)
mock_create.assert_called_with(**expected_args)
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_deprecation_warnings(self, mock_create, mock_get):
self._test_arg_failure('cluster-create --cluster-template xxx '
'--keypair-id x',
self._deprecated_warning)
self.assertTrue(mock_create.called)
self.assertTrue(mock_get.called)
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_deprecation_errors(self,
mock_create,
mock_get):
self._test_arg_failure('cluster-create --cluster-template xxx '
'--keypair-id x --keypair x',
self._too_many_group_arg_error)
self.assertFalse(mock_create.called)
self.assertFalse(mock_get.called)
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_show_clustertemplate_metadata(self,
mock_cluster,
mock_clustertemplate):
mock_cluster.return_value = mock.MagicMock(cluster_template_id=0)
mock_clustertemplate.return_value = \
test_clustertemplates_shell.FakeClusterTemplate(info={'links': 0,
'uuid': 0,
'id': 0,
'name': ''})
self._test_arg_success('cluster-show --long x')
mock_cluster.assert_called_once_with('x')
mock_clustertemplate.assert_called_once_with(0)
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_success_only_clustertemplate_arg(self,
mock_create,
mock_get):
mock_ct = mock.MagicMock()
mock_ct.uuid = 'xxx'
mock_get.return_value = mock_ct
self._test_arg_success('cluster-create --cluster-template xxx')
expected_args = self._get_expected_args_create('xxx')
mock_create.assert_called_with(**expected_args)
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_only_name(self, mock_create):
self._test_arg_failure('cluster-create --name test',
self._mandatory_arg_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_only_keypair(self, mock_create):
self._test_arg_failure('cluster-create --keypair test',
self._mandatory_arg_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_only_node_count(self, mock_create):
self._test_arg_failure('cluster-create --node-count 1',
self._mandatory_arg_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_invalid_node_count(self, mock_create):
self._test_arg_failure('cluster-create --cluster-template xxx '
'--node-count test',
self._invalid_value_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_only_cluster_create_timeout(self,
mock_create):
self._test_arg_failure('cluster-create --timeout 15',
self._mandatory_arg_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_no_arg(self, mock_create):
self._test_arg_failure('cluster-create',
self._mandatory_arg_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.create')
def test_cluster_create_failure_invalid_master_count(self, mock_create):
self._test_arg_failure('cluster-create --cluster-template xxx '
'--master-count test',
self._invalid_value_error)
mock_create.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.delete')
def test_cluster_delete_success(self, mock_delete):
self._test_arg_success('cluster-delete xxx')
mock_delete.assert_called_once_with('xxx')
@mock.patch('magnumclient.v1.clusters.ClusterManager.delete')
def test_cluster_delete_multiple_id_success(self, mock_delete):
self._test_arg_success('cluster-delete xxx xyz')
calls = [mock.call('xxx'), mock.call('xyz')]
mock_delete.assert_has_calls(calls)
@mock.patch('magnumclient.v1.clusters.ClusterManager.delete')
def test_cluster_delete_failure_no_arg(self, mock_delete):
self._test_arg_failure('cluster-delete', self._few_argument_error)
mock_delete.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_show_success(self, mock_show):
self._test_arg_success('cluster-show xxx')
mock_show.assert_called_once_with('xxx')
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_show_failure_no_arg(self, mock_show):
self._test_arg_failure('cluster-show', self._few_argument_error)
mock_show.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_success(self, mock_update):
self._test_arg_success('cluster-update test add test=test')
patch = [{'op': 'add', 'path': '/test', 'value': 'test'}]
mock_update.assert_called_once_with('test', patch, False)
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_success_many_attribute(self, mock_update):
self._test_arg_success('cluster-update test add test=test test1=test1')
patch = [{'op': 'add', 'path': '/test', 'value': 'test'},
{'op': 'add', 'path': '/test1', 'value': 'test1'}]
mock_update.assert_called_once_with('test', patch, False)
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_success_rollback(self, mock_update):
self._test_arg_success('cluster-update test add test=test --rollback')
patch = [{'op': 'add', 'path': '/test', 'value': 'test'}]
mock_update.assert_called_once_with('test', patch, True)
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_rollback_old_api_version(self, mock_update):
self.assertRaises(
exceptions.CommandError,
self.shell,
'--magnum-api-version 1.2 cluster-update '
'test add test=test --rollback')
mock_update.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_failure_wrong_op(self, mock_update):
_error_msg = [
'.*?^usage: magnum cluster-update ',
'.*?^error: argument <op>: invalid choice: ',
".*?^Try 'magnum help cluster-update' for more information."
]
self._test_arg_failure('cluster-update test wrong test=test',
_error_msg)
mock_update.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_failure_wrong_attribute(self, mock_update):
_error_msg = [
'.*?^ERROR: Attributes must be a list of PATH=VALUE'
]
self.assertRaises(exceptions.CommandError, self._test_arg_failure,
'cluster-update test add test', _error_msg)
mock_update.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.update')
def test_cluster_update_failure_few_args(self, mock_update):
_error_msg = [
'.*?^usage: magnum cluster-update ',
'.*?^error: (the following arguments|too few arguments)',
".*?^Try 'magnum help cluster-update' for more information."
]
self._test_arg_failure('cluster-update', _error_msg)
mock_update.assert_not_called()
self._test_arg_failure('cluster-update test', _error_msg)
mock_update.assert_not_called()
self._test_arg_failure('cluster-update test add', _error_msg)
mock_update.assert_not_called()
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_config_success(self, mock_cluster, mock_clustertemplate):
mock_cluster.return_value = FakeCluster(status='UPDATE_COMPLETE')
self._test_arg_success('cluster-config xxx')
mock_cluster.assert_called_with('xxx')
mock_cluster.return_value = FakeCluster(status='CREATE_COMPLETE')
self._test_arg_success('cluster-config xxx')
mock_cluster.assert_called_with('xxx')
self._test_arg_success('cluster-config --dir /tmp xxx')
mock_cluster.assert_called_with('xxx')
self._test_arg_success('cluster-config --force xxx')
mock_cluster.assert_called_with('xxx')
self._test_arg_success('cluster-config --dir /tmp --force xxx')
mock_cluster.assert_called_with('xxx')
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_config_failure_wrong_status(self,
mock_cluster,
mock_clustertemplate):
mock_cluster.return_value = FakeCluster(status='CREATE_IN_PROGRESS')
self.assertRaises(exceptions.CommandError,
self._test_arg_failure,
'cluster-config xxx',
['.*?^Cluster in status: '])
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_config_failure_no_arg(self, mock_cluster):
self._test_arg_failure('cluster-config', self._few_argument_error)
mock_cluster.assert_not_called()
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def test_cluster_config_failure_wrong_arg(self, mock_cluster):
self._test_arg_failure('cluster-config xxx yyy',
self._unrecognized_arg_error)
mock_cluster.assert_not_called()
@mock.patch('os.path.exists')
@mock.patch('magnumclient.v1.certificates.CertificateManager.create')
@mock.patch('magnumclient.v1.certificates.CertificateManager.get')
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def _test_cluster_config_success(self, mock_cluster, mock_ct,
mock_cert_get, mock_cert_create,
mock_exists, coe, shell, tls_disable):
cert = FakeCert(pem='foo bar')
mock_exists.return_value = False
mock_cluster.return_value = FakeCluster(status='CREATE_COMPLETE',
info={
'name': 'Kluster',
'api_address': '10.0.0.1'},
cluster_template_id='fake_ct',
uuid='fake_cluster')
mock_cert_get.return_value = cert
mock_cert_create.return_value = cert
mock_ct.return_value = test_clustertemplates_shell.\
FakeClusterTemplate(coe=coe, name='fake_ct',
tls_disabled=tls_disable)
with mock.patch.dict('os.environ', {'SHELL': shell}):
self._test_arg_success('cluster-config test_cluster')
self.assertTrue(mock_exists.called)
mock_cluster.assert_called_once_with('test_cluster')
mock_ct.assert_called_once_with('fake_ct')
if not tls_disable:
mock_cert_create.assert_called_once_with(
cluster_uuid='fake_cluster', csr=mock.ANY)
mock_cert_get.assert_called_once_with(cluster_uuid='fake_cluster')
def test_cluster_config_swarm_success_with_tls_csh(self):
self._test_cluster_config_success(coe='swarm', shell='csh',
tls_disable=False)
def test_cluster_config_swarm_success_with_tls_non_csh(self):
self._test_cluster_config_success(coe='swarm', shell='zsh',
tls_disable=False)
def test_cluster_config_swarm_success_without_tls_csh(self):
self._test_cluster_config_success(coe='swarm', shell='csh',
tls_disable=True)
def test_cluster_config_swarm_success_without_tls_non_csh(self):
self._test_cluster_config_success(coe='swarm', shell='zsh',
tls_disable=True)
def test_cluster_config_k8s_success_with_tls_csh(self):
self._test_cluster_config_success(coe='kubernetes', shell='csh',
tls_disable=False)
def test_cluster_config_k8s_success_with_tls_non_csh(self):
self._test_cluster_config_success(coe='kubernetes', shell='zsh',
tls_disable=False)
def test_cluster_config_k8s_success_without_tls_csh(self):
self._test_cluster_config_success(coe='kubernetes', shell='csh',
tls_disable=True)
def test_cluster_config_k8s_success_without_tls_non_csh(self):
self._test_cluster_config_success(coe='kubernetes', shell='zsh',
tls_disable=True)

View File

@ -1,433 +1,433 @@
# Copyright 2015 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import testtools
from testtools import matchers
from magnumclient import exceptions
from magnumclient.tests import utils
from magnumclient.v1 import cluster_templates
CLUSTERTEMPLATE1 = {
'id': 123,
'uuid': '66666666-7777-8888-9999-000000000001',
'name': 'clustertemplate1',
'image_id': 'clustertemplate1-image',
'master_flavor_id': 'm1.tiny',
'flavor_id': 'm1.small',
'external_network_id': 'd1f02cfb-d27f-4068-9332-84d907cb0e21',
'fixed_network': 'private',
'fixed_subnet': 'private-subnet',
'network_driver': 'libnetwork',
'volume_driver': 'rexray',
'dns_nameserver': '8.8.1.1',
'docker_volume_size': '71',
'docker_storage_driver': 'devicemapper',
'coe': 'swarm',
'http_proxy': 'http_proxy',
'https_proxy': 'https_proxy',
'no_proxy': 'no_proxy',
'labels': 'key1=val1,key11=val11',
'tls_disabled': False,
'public': False,
'registry_enabled': False,
'master_lb_enabled': True,
'floating_ip_enabled': True
}
CLUSTERTEMPLATE2 = {
'id': 124,
'uuid': '66666666-7777-8888-9999-000000000002',
'name': 'clustertemplate2',
'image_id': 'clustertemplate2-image',
'flavor_id': 'm2.small',
'master_flavor_id': 'm2.tiny',
'external_network_id': 'd1f02cfb-d27f-4068-9332-84d907cb0e22',
'fixed_network': 'private2',
'network_driver': 'flannel',
'volume_driver': 'cinder',
'dns_nameserver': '8.8.1.2',
'docker_volume_size': '50',
'docker_storage_driver': 'overlay',
'coe': 'kubernetes',
'labels': 'key2=val2,key22=val22',
'tls_disabled': True,
'public': True,
'registry_enabled': True}
CREATE_CLUSTERTEMPLATE = copy.deepcopy(CLUSTERTEMPLATE1)
del CREATE_CLUSTERTEMPLATE['id']
del CREATE_CLUSTERTEMPLATE['uuid']
UPDATED_CLUSTERTEMPLATE = copy.deepcopy(CLUSTERTEMPLATE1)
NEW_NAME = 'newcluster'
UPDATED_CLUSTERTEMPLATE['name'] = NEW_NAME
fake_responses = {
'/v1/clustertemplates':
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
'POST': (
{},
CREATE_CLUSTERTEMPLATE,
),
},
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['id']:
{
'GET': (
{},
CLUSTERTEMPLATE1
),
'DELETE': (
{},
None,
),
'PATCH': (
{},
UPDATED_CLUSTERTEMPLATE,
),
},
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['name']:
{
'GET': (
{},
CLUSTERTEMPLATE1
),
'DELETE': (
{},
None,
),
'PATCH': (
{},
UPDATED_CLUSTERTEMPLATE,
),
},
'/v1/clustertemplates/detail':
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
},
'/v1/clustertemplates/?limit=2':
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
},
'/v1/clustertemplates/?marker=%s' % CLUSTERTEMPLATE2['uuid']:
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
},
'/v1/clustertemplates/?limit=2&marker=%s' % CLUSTERTEMPLATE2['uuid']:
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
},
'/v1/clustertemplates/?sort_dir=asc':
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
},
'/v1/clustertemplates/?sort_key=uuid':
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
},
'/v1/clustertemplates/?sort_key=uuid&sort_dir=desc':
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE2, CLUSTERTEMPLATE1]},
),
},
}
class ClusterTemplateManagerTest(testtools.TestCase):
def setUp(self):
super(ClusterTemplateManagerTest, self).setUp()
self.api = utils.FakeAPI(fake_responses)
self.mgr = cluster_templates.ClusterTemplateManager(self.api)
def test_clustertemplate_list(self):
clustertemplates = self.mgr.list()
expect = [
('GET', '/v1/clustertemplates', {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertThat(clustertemplates, matchers.HasLength(2))
def _test_clustertemplate_list_with_filters(
self, limit=None, marker=None,
sort_key=None, sort_dir=None,
detail=False, expect=[]):
clustertemplates_filter = self.mgr.list(limit=limit,
marker=marker,
sort_key=sort_key,
sort_dir=sort_dir,
detail=detail)
self.assertEqual(expect, self.api.calls)
self.assertThat(clustertemplates_filter, matchers.HasLength(2))
def test_clustertemplate_list_with_detail(self):
expect = [
('GET', '/v1/clustertemplates/detail', {}, None),
]
self._test_clustertemplate_list_with_filters(
detail=True,
expect=expect)
def test_clustertemplate_list_with_limit(self):
expect = [
('GET', '/v1/clustertemplates/?limit=2', {}, None),
]
self._test_clustertemplate_list_with_filters(
limit=2,
expect=expect)
def test_clustertemplate_list_with_marker(self):
expect = [
('GET',
'/v1/clustertemplates/?marker=%s' % CLUSTERTEMPLATE2['uuid'],
{},
None),
]
self._test_clustertemplate_list_with_filters(
marker=CLUSTERTEMPLATE2['uuid'],
expect=expect)
def test_clustertemplate_list_with_marker_limit(self):
expect = [
('GET',
'/v1/clustertemplates/?limit=2&marker=%s' %
CLUSTERTEMPLATE2['uuid'],
{},
None),
]
self._test_clustertemplate_list_with_filters(
limit=2, marker=CLUSTERTEMPLATE2['uuid'],
expect=expect)
def test_clustertemplate_list_with_sort_dir(self):
expect = [
('GET', '/v1/clustertemplates/?sort_dir=asc', {}, None),
]
self._test_clustertemplate_list_with_filters(
sort_dir='asc',
expect=expect)
def test_clustertemplate_list_with_sort_key(self):
expect = [
('GET', '/v1/clustertemplates/?sort_key=uuid', {}, None),
]
self._test_clustertemplate_list_with_filters(
sort_key='uuid',
expect=expect)
def test_clustertemplate_list_with_sort_key_dir(self):
expect = [
('GET',
'/v1/clustertemplates/?sort_key=uuid&sort_dir=desc',
{},
None),
]
self._test_clustertemplate_list_with_filters(
sort_key='uuid', sort_dir='desc',
expect=expect)
def test_clustertemplate_show_by_id(self):
cluster_template = self.mgr.get(CLUSTERTEMPLATE1['id'])
expect = [
('GET',
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['id'],
{},
None)
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(CLUSTERTEMPLATE1['name'],
cluster_template.name)
self.assertEqual(CLUSTERTEMPLATE1['image_id'],
cluster_template.image_id)
self.assertEqual(CLUSTERTEMPLATE1['docker_volume_size'],
cluster_template.docker_volume_size)
self.assertEqual(CLUSTERTEMPLATE1['docker_storage_driver'],
cluster_template.docker_storage_driver)
self.assertEqual(CLUSTERTEMPLATE1['fixed_network'],
cluster_template.fixed_network)
self.assertEqual(CLUSTERTEMPLATE1['fixed_subnet'],
cluster_template.fixed_subnet)
self.assertEqual(CLUSTERTEMPLATE1['coe'],
cluster_template.coe)
self.assertEqual(CLUSTERTEMPLATE1['http_proxy'],
cluster_template.http_proxy)
self.assertEqual(CLUSTERTEMPLATE1['https_proxy'],
cluster_template.https_proxy)
self.assertEqual(CLUSTERTEMPLATE1['no_proxy'],
cluster_template.no_proxy)
self.assertEqual(CLUSTERTEMPLATE1['network_driver'],
cluster_template.network_driver)
self.assertEqual(CLUSTERTEMPLATE1['volume_driver'],
cluster_template.volume_driver)
self.assertEqual(CLUSTERTEMPLATE1['labels'],
cluster_template.labels)
self.assertEqual(CLUSTERTEMPLATE1['tls_disabled'],
cluster_template.tls_disabled)
self.assertEqual(CLUSTERTEMPLATE1['public'],
cluster_template.public)
self.assertEqual(CLUSTERTEMPLATE1['registry_enabled'],
cluster_template.registry_enabled)
self.assertEqual(CLUSTERTEMPLATE1['master_lb_enabled'],
cluster_template.master_lb_enabled)
self.assertEqual(CLUSTERTEMPLATE1['floating_ip_enabled'],
cluster_template.floating_ip_enabled)
def test_clustertemplate_show_by_name(self):
cluster_template = self.mgr.get(CLUSTERTEMPLATE1['name'])
expect = [
('GET',
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['name'],
{},
None)
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(CLUSTERTEMPLATE1['name'],
cluster_template.name)
self.assertEqual(CLUSTERTEMPLATE1['image_id'],
cluster_template.image_id)
self.assertEqual(CLUSTERTEMPLATE1['docker_volume_size'],
cluster_template.docker_volume_size)
self.assertEqual(CLUSTERTEMPLATE1['docker_storage_driver'],
cluster_template.docker_storage_driver)
self.assertEqual(CLUSTERTEMPLATE1['fixed_network'],
cluster_template.fixed_network)
self.assertEqual(CLUSTERTEMPLATE1['fixed_subnet'],
cluster_template.fixed_subnet)
self.assertEqual(CLUSTERTEMPLATE1['coe'],
cluster_template.coe)
self.assertEqual(CLUSTERTEMPLATE1['http_proxy'],
cluster_template.http_proxy)
self.assertEqual(CLUSTERTEMPLATE1['https_proxy'],
cluster_template.https_proxy)
self.assertEqual(CLUSTERTEMPLATE1['no_proxy'],
cluster_template.no_proxy)
self.assertEqual(CLUSTERTEMPLATE1['network_driver'],
cluster_template.network_driver)
self.assertEqual(CLUSTERTEMPLATE1['volume_driver'],
cluster_template.volume_driver)
self.assertEqual(CLUSTERTEMPLATE1['labels'],
cluster_template.labels)
self.assertEqual(CLUSTERTEMPLATE1['tls_disabled'],
cluster_template.tls_disabled)
self.assertEqual(CLUSTERTEMPLATE1['public'],
cluster_template.public)
self.assertEqual(CLUSTERTEMPLATE1['registry_enabled'],
cluster_template.registry_enabled)
self.assertEqual(CLUSTERTEMPLATE1['master_lb_enabled'],
cluster_template.master_lb_enabled)
self.assertEqual(CLUSTERTEMPLATE1['floating_ip_enabled'],
cluster_template.floating_ip_enabled)
def test_clustertemplate_create(self):
cluster_template = self.mgr.create(**CREATE_CLUSTERTEMPLATE)
expect = [
('POST', '/v1/clustertemplates', {}, CREATE_CLUSTERTEMPLATE),
]
self.assertEqual(expect, self.api.calls)
self.assertTrue(cluster_template)
self.assertEqual(CLUSTERTEMPLATE1['docker_volume_size'],
cluster_template.docker_volume_size)
self.assertEqual(CLUSTERTEMPLATE1['docker_storage_driver'],
cluster_template.docker_storage_driver)
def test_clustertemplate_create_with_keypair(self):
cluster_template_with_keypair = dict()
cluster_template_with_keypair.update(CREATE_CLUSTERTEMPLATE)
cluster_template_with_keypair['keypair_id'] = 'test_key'
cluster_template = self.mgr.create(**cluster_template_with_keypair)
expect = [
('POST', '/v1/clustertemplates', {},
cluster_template_with_keypair),
]
self.assertEqual(expect, self.api.calls)
self.assertTrue(cluster_template)
self.assertEqual(CLUSTERTEMPLATE1['docker_volume_size'],
cluster_template.docker_volume_size)
self.assertEqual(CLUSTERTEMPLATE1['docker_storage_driver'],
cluster_template.docker_storage_driver)
def test_clustertemplate_create_fail(self):
CREATE_CLUSTERTEMPLATE_FAIL = copy.deepcopy(CREATE_CLUSTERTEMPLATE)
CREATE_CLUSTERTEMPLATE_FAIL["wrong_key"] = "wrong"
self.assertRaisesRegexp(
exceptions.InvalidAttribute,
("Key must be in %s" %
','.join(cluster_templates.CREATION_ATTRIBUTES)),
self.mgr.create, **CREATE_CLUSTERTEMPLATE_FAIL)
self.assertEqual([], self.api.calls)
def test_clustertemplate_delete_by_id(self):
cluster_template = self.mgr.delete(CLUSTERTEMPLATE1['id'])
expect = [
('DELETE',
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['id'],
{},
None),
]
self.assertEqual(expect, self.api.calls)
self.assertIsNone(cluster_template)
def test_clustertemplate_delete_by_name(self):
cluster_template = self.mgr.delete(CLUSTERTEMPLATE1['name'])
expect = [
('DELETE',
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['name'],
{},
None),
]
self.assertEqual(expect, self.api.calls)
self.assertIsNone(cluster_template)
def test_clustertemplate_update(self):
patch = {'op': 'replace',
'value': NEW_NAME,
'path': '/name'}
cluster_template = self.mgr.update(id=CLUSTERTEMPLATE1['id'],
patch=patch)
expect = [
('PATCH',
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['id'],
{},
patch),
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(NEW_NAME, cluster_template.name)
# Copyright 2015 IBM Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import testtools
from testtools import matchers
from magnumclient import exceptions
from magnumclient.tests import utils
from magnumclient.v1 import cluster_templates
CLUSTERTEMPLATE1 = {
'id': 123,
'uuid': '66666666-7777-8888-9999-000000000001',
'name': 'clustertemplate1',
'image_id': 'clustertemplate1-image',
'master_flavor_id': 'm1.tiny',
'flavor_id': 'm1.small',
'external_network_id': 'd1f02cfb-d27f-4068-9332-84d907cb0e21',
'fixed_network': 'private',
'fixed_subnet': 'private-subnet',
'network_driver': 'libnetwork',
'volume_driver': 'rexray',
'dns_nameserver': '8.8.1.1',
'docker_volume_size': '71',
'docker_storage_driver': 'devicemapper',
'coe': 'swarm',
'http_proxy': 'http_proxy',
'https_proxy': 'https_proxy',
'no_proxy': 'no_proxy',
'labels': 'key1=val1,key11=val11',
'tls_disabled': False,
'public': False,
'registry_enabled': False,
'master_lb_enabled': True,
'floating_ip_enabled': True
}
CLUSTERTEMPLATE2 = {
'id': 124,
'uuid': '66666666-7777-8888-9999-000000000002',
'name': 'clustertemplate2',
'image_id': 'clustertemplate2-image',
'flavor_id': 'm2.small',
'master_flavor_id': 'm2.tiny',
'external_network_id': 'd1f02cfb-d27f-4068-9332-84d907cb0e22',
'fixed_network': 'private2',
'network_driver': 'flannel',
'volume_driver': 'cinder',
'dns_nameserver': '8.8.1.2',
'docker_volume_size': '50',
'docker_storage_driver': 'overlay',
'coe': 'kubernetes',
'labels': 'key2=val2,key22=val22',
'tls_disabled': True,
'public': True,
'registry_enabled': True}
CREATE_CLUSTERTEMPLATE = copy.deepcopy(CLUSTERTEMPLATE1)
del CREATE_CLUSTERTEMPLATE['id']
del CREATE_CLUSTERTEMPLATE['uuid']
UPDATED_CLUSTERTEMPLATE = copy.deepcopy(CLUSTERTEMPLATE1)
NEW_NAME = 'newcluster'
UPDATED_CLUSTERTEMPLATE['name'] = NEW_NAME
fake_responses = {
'/v1/clustertemplates':
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
'POST': (
{},
CREATE_CLUSTERTEMPLATE,
),
},
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['id']:
{
'GET': (
{},
CLUSTERTEMPLATE1
),
'DELETE': (
{},
None,
),
'PATCH': (
{},
UPDATED_CLUSTERTEMPLATE,
),
},
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['name']:
{
'GET': (
{},
CLUSTERTEMPLATE1
),
'DELETE': (
{},
None,
),
'PATCH': (
{},
UPDATED_CLUSTERTEMPLATE,
),
},
'/v1/clustertemplates/detail':
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
},
'/v1/clustertemplates/?limit=2':
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
},
'/v1/clustertemplates/?marker=%s' % CLUSTERTEMPLATE2['uuid']:
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
},
'/v1/clustertemplates/?limit=2&marker=%s' % CLUSTERTEMPLATE2['uuid']:
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
},
'/v1/clustertemplates/?sort_dir=asc':
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
},
'/v1/clustertemplates/?sort_key=uuid':
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE1, CLUSTERTEMPLATE2]},
),
},
'/v1/clustertemplates/?sort_key=uuid&sort_dir=desc':
{
'GET': (
{},
{'clustertemplates': [CLUSTERTEMPLATE2, CLUSTERTEMPLATE1]},
),
},
}
class ClusterTemplateManagerTest(testtools.TestCase):
def setUp(self):
super(ClusterTemplateManagerTest, self).setUp()
self.api = utils.FakeAPI(fake_responses)
self.mgr = cluster_templates.ClusterTemplateManager(self.api)
def test_clustertemplate_list(self):
clustertemplates = self.mgr.list()
expect = [
('GET', '/v1/clustertemplates', {}, None),
]
self.assertEqual(expect, self.api.calls)
self.assertThat(clustertemplates, matchers.HasLength(2))
def _test_clustertemplate_list_with_filters(
self, limit=None, marker=None,
sort_key=None, sort_dir=None,
detail=False, expect=[]):
clustertemplates_filter = self.mgr.list(limit=limit,
marker=marker,
sort_key=sort_key,
sort_dir=sort_dir,
detail=detail)
self.assertEqual(expect, self.api.calls)
self.assertThat(clustertemplates_filter, matchers.HasLength(2))
def test_clustertemplate_list_with_detail(self):
expect = [
('GET', '/v1/clustertemplates/detail', {}, None),
]
self._test_clustertemplate_list_with_filters(
detail=True,
expect=expect)
def test_clustertemplate_list_with_limit(self):
expect = [
('GET', '/v1/clustertemplates/?limit=2', {}, None),
]
self._test_clustertemplate_list_with_filters(
limit=2,
expect=expect)
def test_clustertemplate_list_with_marker(self):
expect = [
('GET',
'/v1/clustertemplates/?marker=%s' % CLUSTERTEMPLATE2['uuid'],
{},
None),
]
self._test_clustertemplate_list_with_filters(
marker=CLUSTERTEMPLATE2['uuid'],
expect=expect)
def test_clustertemplate_list_with_marker_limit(self):
expect = [
('GET',
'/v1/clustertemplates/?limit=2&marker=%s' %
CLUSTERTEMPLATE2['uuid'],
{},
None),
]
self._test_clustertemplate_list_with_filters(
limit=2, marker=CLUSTERTEMPLATE2['uuid'],
expect=expect)
def test_clustertemplate_list_with_sort_dir(self):
expect = [
('GET', '/v1/clustertemplates/?sort_dir=asc', {}, None),
]
self._test_clustertemplate_list_with_filters(
sort_dir='asc',
expect=expect)
def test_clustertemplate_list_with_sort_key(self):
expect = [
('GET', '/v1/clustertemplates/?sort_key=uuid', {}, None),
]
self._test_clustertemplate_list_with_filters(
sort_key='uuid',
expect=expect)
def test_clustertemplate_list_with_sort_key_dir(self):
expect = [
('GET',
'/v1/clustertemplates/?sort_key=uuid&sort_dir=desc',
{},
None),
]
self._test_clustertemplate_list_with_filters(
sort_key='uuid', sort_dir='desc',
expect=expect)
def test_clustertemplate_show_by_id(self):
cluster_template = self.mgr.get(CLUSTERTEMPLATE1['id'])
expect = [
('GET',
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['id'],
{},
None)
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(CLUSTERTEMPLATE1['name'],
cluster_template.name)
self.assertEqual(CLUSTERTEMPLATE1['image_id'],
cluster_template.image_id)
self.assertEqual(CLUSTERTEMPLATE1['docker_volume_size'],
cluster_template.docker_volume_size)
self.assertEqual(CLUSTERTEMPLATE1['docker_storage_driver'],
cluster_template.docker_storage_driver)
self.assertEqual(CLUSTERTEMPLATE1['fixed_network'],
cluster_template.fixed_network)
self.assertEqual(CLUSTERTEMPLATE1['fixed_subnet'],
cluster_template.fixed_subnet)
self.assertEqual(CLUSTERTEMPLATE1['coe'],
cluster_template.coe)
self.assertEqual(CLUSTERTEMPLATE1['http_proxy'],
cluster_template.http_proxy)
self.assertEqual(CLUSTERTEMPLATE1['https_proxy'],
cluster_template.https_proxy)
self.assertEqual(CLUSTERTEMPLATE1['no_proxy'],
cluster_template.no_proxy)
self.assertEqual(CLUSTERTEMPLATE1['network_driver'],
cluster_template.network_driver)
self.assertEqual(CLUSTERTEMPLATE1['volume_driver'],
cluster_template.volume_driver)
self.assertEqual(CLUSTERTEMPLATE1['labels'],
cluster_template.labels)
self.assertEqual(CLUSTERTEMPLATE1['tls_disabled'],
cluster_template.tls_disabled)
self.assertEqual(CLUSTERTEMPLATE1['public'],
cluster_template.public)
self.assertEqual(CLUSTERTEMPLATE1['registry_enabled'],
cluster_template.registry_enabled)
self.assertEqual(CLUSTERTEMPLATE1['master_lb_enabled'],
cluster_template.master_lb_enabled)
self.assertEqual(CLUSTERTEMPLATE1['floating_ip_enabled'],
cluster_template.floating_ip_enabled)
def test_clustertemplate_show_by_name(self):
cluster_template = self.mgr.get(CLUSTERTEMPLATE1['name'])
expect = [
('GET',
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['name'],
{},
None)
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(CLUSTERTEMPLATE1['name'],
cluster_template.name)
self.assertEqual(CLUSTERTEMPLATE1['image_id'],
cluster_template.image_id)
self.assertEqual(CLUSTERTEMPLATE1['docker_volume_size'],
cluster_template.docker_volume_size)
self.assertEqual(CLUSTERTEMPLATE1['docker_storage_driver'],
cluster_template.docker_storage_driver)
self.assertEqual(CLUSTERTEMPLATE1['fixed_network'],
cluster_template.fixed_network)
self.assertEqual(CLUSTERTEMPLATE1['fixed_subnet'],
cluster_template.fixed_subnet)
self.assertEqual(CLUSTERTEMPLATE1['coe'],
cluster_template.coe)
self.assertEqual(CLUSTERTEMPLATE1['http_proxy'],
cluster_template.http_proxy)
self.assertEqual(CLUSTERTEMPLATE1['https_proxy'],
cluster_template.https_proxy)
self.assertEqual(CLUSTERTEMPLATE1['no_proxy'],
cluster_template.no_proxy)
self.assertEqual(CLUSTERTEMPLATE1['network_driver'],
cluster_template.network_driver)
self.assertEqual(CLUSTERTEMPLATE1['volume_driver'],
cluster_template.volume_driver)
self.assertEqual(CLUSTERTEMPLATE1['labels'],
cluster_template.labels)
self.assertEqual(CLUSTERTEMPLATE1['tls_disabled'],
cluster_template.tls_disabled)
self.assertEqual(CLUSTERTEMPLATE1['public'],
cluster_template.public)
self.assertEqual(CLUSTERTEMPLATE1['registry_enabled'],
cluster_template.registry_enabled)
self.assertEqual(CLUSTERTEMPLATE1['master_lb_enabled'],
cluster_template.master_lb_enabled)
self.assertEqual(CLUSTERTEMPLATE1['floating_ip_enabled'],
cluster_template.floating_ip_enabled)
def test_clustertemplate_create(self):
cluster_template = self.mgr.create(**CREATE_CLUSTERTEMPLATE)
expect = [
('POST', '/v1/clustertemplates', {}, CREATE_CLUSTERTEMPLATE),
]
self.assertEqual(expect, self.api.calls)
self.assertTrue(cluster_template)
self.assertEqual(CLUSTERTEMPLATE1['docker_volume_size'],
cluster_template.docker_volume_size)
self.assertEqual(CLUSTERTEMPLATE1['docker_storage_driver'],
cluster_template.docker_storage_driver)
def test_clustertemplate_create_with_keypair(self):
cluster_template_with_keypair = dict()
cluster_template_with_keypair.update(CREATE_CLUSTERTEMPLATE)
cluster_template_with_keypair['keypair_id'] = 'test_key'
cluster_template = self.mgr.create(**cluster_template_with_keypair)
expect = [
('POST', '/v1/clustertemplates', {},
cluster_template_with_keypair),
]
self.assertEqual(expect, self.api.calls)
self.assertTrue(cluster_template)
self.assertEqual(CLUSTERTEMPLATE1['docker_volume_size'],
cluster_template.docker_volume_size)
self.assertEqual(CLUSTERTEMPLATE1['docker_storage_driver'],
cluster_template.docker_storage_driver)
def test_clustertemplate_create_fail(self):
CREATE_CLUSTERTEMPLATE_FAIL = copy.deepcopy(CREATE_CLUSTERTEMPLATE)
CREATE_CLUSTERTEMPLATE_FAIL["wrong_key"] = "wrong"
self.assertRaisesRegexp(
exceptions.InvalidAttribute,
("Key must be in %s" %
','.join(cluster_templates.CREATION_ATTRIBUTES)),
self.mgr.create, **CREATE_CLUSTERTEMPLATE_FAIL)
self.assertEqual([], self.api.calls)
def test_clustertemplate_delete_by_id(self):
cluster_template = self.mgr.delete(CLUSTERTEMPLATE1['id'])
expect = [
('DELETE',
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['id'],
{},
None),
]
self.assertEqual(expect, self.api.calls)
self.assertIsNone(cluster_template)
def test_clustertemplate_delete_by_name(self):
cluster_template = self.mgr.delete(CLUSTERTEMPLATE1['name'])
expect = [
('DELETE',
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['name'],
{},
None),
]
self.assertEqual(expect, self.api.calls)
self.assertIsNone(cluster_template)
def test_clustertemplate_update(self):
patch = {'op': 'replace',
'value': NEW_NAME,
'path': '/name'}
cluster_template = self.mgr.update(id=CLUSTERTEMPLATE1['id'],
patch=patch)
expect = [
('PATCH',
'/v1/clustertemplates/%s' % CLUSTERTEMPLATE1['id'],
{},
patch),
]
self.assertEqual(expect, self.api.calls)
self.assertEqual(NEW_NAME, cluster_template.name)

File diff suppressed because it is too large Load Diff