Magnum cluster-config/bay-config compatible with py3
Fix issue related to bytes (py3) and str (py2) while running magnum cluster-config/bay-config command. Also increate code coverage for cluster_shell and bay_shell to 92%. Overall code coverage now is 75% Closes-Bug: #1649192 Partially implements: blueprint magnumclient-ut-coverage Change-Id: I99610eea1e1e24f187baf1bd03c7e4e5a337bc7d
This commit is contained in:
parent
b3d5948c77
commit
d6a0d5d268
|
@ -34,6 +34,11 @@ class FakeBay(Bay):
|
|||
self.bay_create_timeout = kwargs.get('bay_create_timeout', 60)
|
||||
|
||||
|
||||
class FakeCert(object):
|
||||
def __init__(self, pem):
|
||||
self.pem = pem
|
||||
|
||||
|
||||
class ShellTest(shell_test_base.TestCommandLineArgument):
|
||||
|
||||
def _get_expected_args_list(self, marker=None, limit=None,
|
||||
|
@ -328,3 +333,65 @@ class ShellTest(shell_test_base.TestCommandLineArgument):
|
|||
self._test_arg_failure('bay-config xxx yyy',
|
||||
self._unrecognized_arg_error)
|
||||
mock_bay.assert_not_called()
|
||||
|
||||
@mock.patch('os.path.exists')
|
||||
@mock.patch('magnumclient.v1.certificates.CertificateManager.create')
|
||||
@mock.patch('magnumclient.v1.certificates.CertificateManager.get')
|
||||
@mock.patch('magnumclient.v1.baymodels.BayModelManager.get')
|
||||
@mock.patch('magnumclient.v1.bays.BayManager.get')
|
||||
def _test_bay_config_success(self, mock_bay, mock_bm, mock_cert_get,
|
||||
mock_cert_create, mock_exists, coe, shell,
|
||||
tls_disable):
|
||||
cert = FakeCert(pem='foo bar')
|
||||
mock_exists.return_value = False
|
||||
mock_bay.return_value = FakeBay(status='CREATE_COMPLETE',
|
||||
info={'name': 'Kluster',
|
||||
'api_address': '127.0.0.1'},
|
||||
baymodel_id='fake_bm',
|
||||
uuid='fake_bay')
|
||||
mock_cert_get.return_value = cert
|
||||
mock_cert_create.return_value = cert
|
||||
mock_bm.return_value = test_baymodels_shell. \
|
||||
FakeBayModel(coe=coe, name='fake_bm', tls_disabled=tls_disable)
|
||||
with mock.patch.dict('os.environ', {'SHELL': shell}):
|
||||
self._test_arg_success('bay-config test_bay')
|
||||
|
||||
self.assertTrue(mock_exists.called)
|
||||
mock_bay.assert_called_once_with('test_bay')
|
||||
mock_bm.assert_called_once_with('fake_bm')
|
||||
if not tls_disable:
|
||||
mock_cert_create.assert_called_once_with(cluster_uuid='fake_bay',
|
||||
csr=mock.ANY)
|
||||
mock_cert_get.assert_called_once_with(cluster_uuid='fake_bay')
|
||||
|
||||
def test_bay_config_swarm_success_with_tls_csh(self):
|
||||
self._test_bay_config_success(coe='swarm', shell='csh',
|
||||
tls_disable=False)
|
||||
|
||||
def test_bay_config_swarm_success_with_tls_non_csh(self):
|
||||
self._test_bay_config_success(coe='swarm', shell='zsh',
|
||||
tls_disable=False)
|
||||
|
||||
def test_bay_config_swarm_success_without_tls_csh(self):
|
||||
self._test_bay_config_success(coe='swarm', shell='csh',
|
||||
tls_disable=True)
|
||||
|
||||
def test_bay_config_swarm_success_without_tls_non_csh(self):
|
||||
self._test_bay_config_success(coe='swarm', shell='zsh',
|
||||
tls_disable=True)
|
||||
|
||||
def test_bay_config_k8s_success_with_tls_csh(self):
|
||||
self._test_bay_config_success(coe='kubernetes', shell='csh',
|
||||
tls_disable=False)
|
||||
|
||||
def test_bay_config_k8s_success_with_tls_non_csh(self):
|
||||
self._test_bay_config_success(coe='kubernetes', shell='zsh',
|
||||
tls_disable=False)
|
||||
|
||||
def test_bay_config_k8s_success_without_tls_csh(self):
|
||||
self._test_bay_config_success(coe='kubernetes', shell='csh',
|
||||
tls_disable=True)
|
||||
|
||||
def test_bay_config_k8s_success_without_tls_non_csh(self):
|
||||
self._test_bay_config_success(coe='kubernetes', shell='zsh',
|
||||
tls_disable=True)
|
||||
|
|
|
@ -35,6 +35,11 @@ class FakeCluster(Cluster):
|
|||
self.create_timeout = kwargs.get('create_timeout', 60)
|
||||
|
||||
|
||||
class FakeCert(object):
|
||||
def __init__(self, pem):
|
||||
self.pem = pem
|
||||
|
||||
|
||||
class ShellTest(shell_test_base.TestCommandLineArgument):
|
||||
|
||||
def _get_expected_args_list(self, marker=None, limit=None,
|
||||
|
@ -381,3 +386,67 @@ class ShellTest(shell_test_base.TestCommandLineArgument):
|
|||
self._test_arg_failure('cluster-config xxx yyy',
|
||||
self._unrecognized_arg_error)
|
||||
mock_cluster.assert_not_called()
|
||||
|
||||
@mock.patch('os.path.exists')
|
||||
@mock.patch('magnumclient.v1.certificates.CertificateManager.create')
|
||||
@mock.patch('magnumclient.v1.certificates.CertificateManager.get')
|
||||
@mock.patch('magnumclient.v1.cluster_templates.ClusterTemplateManager.get')
|
||||
@mock.patch('magnumclient.v1.clusters.ClusterManager.get')
|
||||
def _test_cluster_config_success(self, mock_cluster, mock_ct,
|
||||
mock_cert_get, mock_cert_create,
|
||||
mock_exists, coe, shell, tls_disable):
|
||||
cert = FakeCert(pem='foo bar')
|
||||
mock_exists.return_value = False
|
||||
mock_cluster.return_value = FakeCluster(status='CREATE_COMPLETE',
|
||||
info={
|
||||
'name': 'Kluster',
|
||||
'api_address': '10.0.0.1'},
|
||||
cluster_template_id='fake_ct',
|
||||
uuid='fake_cluster')
|
||||
mock_cert_get.return_value = cert
|
||||
mock_cert_create.return_value = cert
|
||||
mock_ct.return_value = test_clustertemplates_shell.\
|
||||
FakeClusterTemplate(coe=coe, name='fake_ct',
|
||||
tls_disabled=tls_disable)
|
||||
with mock.patch.dict('os.environ', {'SHELL': shell}):
|
||||
self._test_arg_success('cluster-config test_cluster')
|
||||
|
||||
self.assertTrue(mock_exists.called)
|
||||
mock_cluster.assert_called_once_with('test_cluster')
|
||||
mock_ct.assert_called_once_with('fake_ct')
|
||||
if not tls_disable:
|
||||
mock_cert_create.assert_called_once_with(
|
||||
cluster_uuid='fake_cluster', csr=mock.ANY)
|
||||
mock_cert_get.assert_called_once_with(cluster_uuid='fake_cluster')
|
||||
|
||||
def test_cluster_config_swarm_success_with_tls_csh(self):
|
||||
self._test_cluster_config_success(coe='swarm', shell='csh',
|
||||
tls_disable=False)
|
||||
|
||||
def test_cluster_config_swarm_success_with_tls_non_csh(self):
|
||||
self._test_cluster_config_success(coe='swarm', shell='zsh',
|
||||
tls_disable=False)
|
||||
|
||||
def test_cluster_config_swarm_success_without_tls_csh(self):
|
||||
self._test_cluster_config_success(coe='swarm', shell='csh',
|
||||
tls_disable=True)
|
||||
|
||||
def test_cluster_config_swarm_success_without_tls_non_csh(self):
|
||||
self._test_cluster_config_success(coe='swarm', shell='zsh',
|
||||
tls_disable=True)
|
||||
|
||||
def test_cluster_config_k8s_success_with_tls_csh(self):
|
||||
self._test_cluster_config_success(coe='kubernetes', shell='csh',
|
||||
tls_disable=False)
|
||||
|
||||
def test_cluster_config_k8s_success_with_tls_non_csh(self):
|
||||
self._test_cluster_config_success(coe='kubernetes', shell='zsh',
|
||||
tls_disable=False)
|
||||
|
||||
def test_cluster_config_k8s_success_without_tls_csh(self):
|
||||
self._test_cluster_config_success(coe='kubernetes', shell='csh',
|
||||
tls_disable=True)
|
||||
|
||||
def test_cluster_config_k8s_success_without_tls_non_csh(self):
|
||||
self._test_cluster_config_success(coe='kubernetes', shell='zsh',
|
||||
tls_disable=True)
|
||||
|
|
|
@ -346,11 +346,12 @@ def _generate_csr_and_key():
|
|||
])).sign(key, hashes.SHA256(), default_backend())
|
||||
|
||||
result = {
|
||||
'csr': csr.public_bytes(encoding=serialization.Encoding.PEM),
|
||||
'csr': csr.public_bytes(
|
||||
encoding=serialization.Encoding.PEM).decode("utf-8"),
|
||||
'key': key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption()),
|
||||
encryption_algorithm=serialization.NoEncryption()).decode("utf-8"),
|
||||
}
|
||||
|
||||
return result
|
||||
|
|
|
@ -350,11 +350,12 @@ def _generate_csr_and_key():
|
|||
])).sign(key, hashes.SHA256(), default_backend())
|
||||
|
||||
result = {
|
||||
'csr': csr.public_bytes(encoding=serialization.Encoding.PEM),
|
||||
'csr': csr.public_bytes(
|
||||
encoding=serialization.Encoding.PEM).decode("utf-8"),
|
||||
'key': key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption()),
|
||||
encryption_algorithm=serialization.NoEncryption()).decode("utf-8"),
|
||||
}
|
||||
|
||||
return result
|
||||
|
|
Loading…
Reference in New Issue