Magnum cluster-config/bay-config compatible with py3

Fix issue related to bytes (py3) and str (py2) while running magnum
cluster-config/bay-config command.

Also increate code coverage for cluster_shell and bay_shell to 92%.
Overall code coverage now is 75%

Closes-Bug: #1649192
Partially implements: blueprint magnumclient-ut-coverage

Change-Id: I99610eea1e1e24f187baf1bd03c7e4e5a337bc7d
This commit is contained in:
Hieu LE 2016-12-12 14:49:34 +07:00
parent b3d5948c77
commit d6a0d5d268
4 changed files with 142 additions and 4 deletions

View File

@ -34,6 +34,11 @@ class FakeBay(Bay):
self.bay_create_timeout = kwargs.get('bay_create_timeout', 60)
class FakeCert(object):
def __init__(self, pem):
self.pem = pem
class ShellTest(shell_test_base.TestCommandLineArgument):
def _get_expected_args_list(self, marker=None, limit=None,
@ -328,3 +333,65 @@ class ShellTest(shell_test_base.TestCommandLineArgument):
self._test_arg_failure('bay-config xxx yyy',
self._unrecognized_arg_error)
mock_bay.assert_not_called()
@mock.patch('os.path.exists')
@mock.patch('magnumclient.v1.certificates.CertificateManager.create')
@mock.patch('magnumclient.v1.certificates.CertificateManager.get')
@mock.patch('magnumclient.v1.baymodels.BayModelManager.get')
@mock.patch('magnumclient.v1.bays.BayManager.get')
def _test_bay_config_success(self, mock_bay, mock_bm, mock_cert_get,
mock_cert_create, mock_exists, coe, shell,
tls_disable):
cert = FakeCert(pem='foo bar')
mock_exists.return_value = False
mock_bay.return_value = FakeBay(status='CREATE_COMPLETE',
info={'name': 'Kluster',
'api_address': '127.0.0.1'},
baymodel_id='fake_bm',
uuid='fake_bay')
mock_cert_get.return_value = cert
mock_cert_create.return_value = cert
mock_bm.return_value = test_baymodels_shell. \
FakeBayModel(coe=coe, name='fake_bm', tls_disabled=tls_disable)
with mock.patch.dict('os.environ', {'SHELL': shell}):
self._test_arg_success('bay-config test_bay')
self.assertTrue(mock_exists.called)
mock_bay.assert_called_once_with('test_bay')
mock_bm.assert_called_once_with('fake_bm')
if not tls_disable:
mock_cert_create.assert_called_once_with(cluster_uuid='fake_bay',
csr=mock.ANY)
mock_cert_get.assert_called_once_with(cluster_uuid='fake_bay')
def test_bay_config_swarm_success_with_tls_csh(self):
self._test_bay_config_success(coe='swarm', shell='csh',
tls_disable=False)
def test_bay_config_swarm_success_with_tls_non_csh(self):
self._test_bay_config_success(coe='swarm', shell='zsh',
tls_disable=False)
def test_bay_config_swarm_success_without_tls_csh(self):
self._test_bay_config_success(coe='swarm', shell='csh',
tls_disable=True)
def test_bay_config_swarm_success_without_tls_non_csh(self):
self._test_bay_config_success(coe='swarm', shell='zsh',
tls_disable=True)
def test_bay_config_k8s_success_with_tls_csh(self):
self._test_bay_config_success(coe='kubernetes', shell='csh',
tls_disable=False)
def test_bay_config_k8s_success_with_tls_non_csh(self):
self._test_bay_config_success(coe='kubernetes', shell='zsh',
tls_disable=False)
def test_bay_config_k8s_success_without_tls_csh(self):
self._test_bay_config_success(coe='kubernetes', shell='csh',
tls_disable=True)
def test_bay_config_k8s_success_without_tls_non_csh(self):
self._test_bay_config_success(coe='kubernetes', shell='zsh',
tls_disable=True)

View File

@ -35,6 +35,11 @@ class FakeCluster(Cluster):
self.create_timeout = kwargs.get('create_timeout', 60)
class FakeCert(object):
def __init__(self, pem):
self.pem = pem
class ShellTest(shell_test_base.TestCommandLineArgument):
def _get_expected_args_list(self, marker=None, limit=None,
@ -381,3 +386,67 @@ class ShellTest(shell_test_base.TestCommandLineArgument):
self._test_arg_failure('cluster-config xxx yyy',
self._unrecognized_arg_error)
mock_cluster.assert_not_called()
@mock.patch('os.path.exists')
@mock.patch('magnumclient.v1.certificates.CertificateManager.create')
@mock.patch('magnumclient.v1.certificates.CertificateManager.get')
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
def _test_cluster_config_success(self, mock_cluster, mock_ct,
mock_cert_get, mock_cert_create,
mock_exists, coe, shell, tls_disable):
cert = FakeCert(pem='foo bar')
mock_exists.return_value = False
mock_cluster.return_value = FakeCluster(status='CREATE_COMPLETE',
info={
'name': 'Kluster',
'api_address': '10.0.0.1'},
cluster_template_id='fake_ct',
uuid='fake_cluster')
mock_cert_get.return_value = cert
mock_cert_create.return_value = cert
mock_ct.return_value = test_clustertemplates_shell.\
FakeClusterTemplate(coe=coe, name='fake_ct',
tls_disabled=tls_disable)
with mock.patch.dict('os.environ', {'SHELL': shell}):
self._test_arg_success('cluster-config test_cluster')
self.assertTrue(mock_exists.called)
mock_cluster.assert_called_once_with('test_cluster')
mock_ct.assert_called_once_with('fake_ct')
if not tls_disable:
mock_cert_create.assert_called_once_with(
cluster_uuid='fake_cluster', csr=mock.ANY)
mock_cert_get.assert_called_once_with(cluster_uuid='fake_cluster')
def test_cluster_config_swarm_success_with_tls_csh(self):
self._test_cluster_config_success(coe='swarm', shell='csh',
tls_disable=False)
def test_cluster_config_swarm_success_with_tls_non_csh(self):
self._test_cluster_config_success(coe='swarm', shell='zsh',
tls_disable=False)
def test_cluster_config_swarm_success_without_tls_csh(self):
self._test_cluster_config_success(coe='swarm', shell='csh',
tls_disable=True)
def test_cluster_config_swarm_success_without_tls_non_csh(self):
self._test_cluster_config_success(coe='swarm', shell='zsh',
tls_disable=True)
def test_cluster_config_k8s_success_with_tls_csh(self):
self._test_cluster_config_success(coe='kubernetes', shell='csh',
tls_disable=False)
def test_cluster_config_k8s_success_with_tls_non_csh(self):
self._test_cluster_config_success(coe='kubernetes', shell='zsh',
tls_disable=False)
def test_cluster_config_k8s_success_without_tls_csh(self):
self._test_cluster_config_success(coe='kubernetes', shell='csh',
tls_disable=True)
def test_cluster_config_k8s_success_without_tls_non_csh(self):
self._test_cluster_config_success(coe='kubernetes', shell='zsh',
tls_disable=True)

View File

@ -346,11 +346,12 @@ def _generate_csr_and_key():
])).sign(key, hashes.SHA256(), default_backend())
result = {
'csr': csr.public_bytes(encoding=serialization.Encoding.PEM),
'csr': csr.public_bytes(
encoding=serialization.Encoding.PEM).decode("utf-8"),
'key': key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()),
encryption_algorithm=serialization.NoEncryption()).decode("utf-8"),
}
return result

View File

@ -350,11 +350,12 @@ def _generate_csr_and_key():
])).sign(key, hashes.SHA256(), default_backend())
result = {
'csr': csr.public_bytes(encoding=serialization.Encoding.PEM),
'csr': csr.public_bytes(
encoding=serialization.Encoding.PEM).decode("utf-8"),
'key': key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()),
encryption_algorithm=serialization.NoEncryption()).decode("utf-8"),
}
return result