290 lines
11 KiB
Python
290 lines
11 KiB
Python
# Copyright 2011 Justin Santa Barbara
|
|
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import errno
|
|
import os
|
|
import os.path
|
|
import shutil
|
|
import tempfile
|
|
|
|
import mock
|
|
from oslo_concurrency import processutils
|
|
from oslo_utils import netutils
|
|
|
|
from magnum.common import exception
|
|
from magnum.common import utils
|
|
import magnum.conf
|
|
from magnum.tests import base
|
|
|
|
CONF = magnum.conf.CONF
|
|
|
|
|
|
class UtilsTestCase(base.TestCase):
    """Tests for the quantity parsers and CA-file lookup in utils."""

    def test_get_k8s_quantity(self):
        """Valid Kubernetes quantities parse; a malformed one is rejected."""
        expectations = (
            ('1000Ki', 1024000.0),
            ('1E-3', 0.001),
            ('0.0005k', 0.5),
            ('500m', 0.5),
            ('1.3E+6', 1300000.0),
            ('1.3E6', 1300000.0),
        )
        for text, number in expectations:
            self.assertEqual(number, utils.get_k8s_quantity(text))

        with self.assertRaises(exception.UnsupportedK8sQuantityFormat):
            utils.get_k8s_quantity('1E1E')

    def test_get_docker_quantity(self):
        """Docker sizes with b/k/m/g suffixes parse; bad suffixes raise."""
        kib = 1024
        expectations = (
            ('512', 512),
            ('512b', 512),
            ('512k', 512 * kib),
            ('512m', 512 * kib * kib),
            ('512g', 512 * kib * kib * kib),
        )
        for text, number in expectations:
            self.assertEqual(number, utils.get_docker_quantity(text))

        # A doubled suffix and an upper-case suffix are both refused.
        for malformed in ('512bb', '512B'):
            with self.assertRaises(
                    exception.UnsupportedDockerQuantityFormat):
                utils.get_docker_quantity(malformed)

    def test_get_openstasck_ca(self):
        """Cover the empty, missing-file and readable-file configurations."""
        ca_path = '/tmp/invalid-ca.pem'

        # openstack_ca_file is empty
        self.assertEqual('', utils.get_openstack_ca())

        # openstack_ca_file is set but the file doesn't exist
        CONF.set_override('openstack_ca_file', ca_path, group='drivers')
        with self.assertRaises(IOError):
            utils.get_openstack_ca()

        # openstack_ca_file is set and the file exists; open() as seen by
        # magnum.common.utils is patched so no real file is needed.
        CONF.set_override('openstack_ca_file', ca_path, group='drivers')
        fake_open = mock.mock_open(read_data="CERT")
        with mock.patch('magnum.common.utils.open', fake_open, create=True):
            self.assertEqual('CERT', utils.get_openstack_ca())
|
|
|
|
|
|
class ExecuteTestCase(base.TestCase):
    """Tests for utils.execute() plus the MAC and rstrip helpers in utils."""

    def test_retry_on_failure(self):
        """A command that always fails is attempted exactly 10 times.

        The helper script keeps a run counter in the scratch file given as
        ``$1`` and writes 'failure' there if stdin did not contain 'foo'.
        The test is skipped when the script cannot be executed because of
        a permissions error (EACCES).
        """
        fd, tmpfilename = tempfile.mkstemp()
        fd2, tmpfilename2 = tempfile.mkstemp()
        # mkstemp() returns an open descriptor. The scratch file is only
        # ever used by name, so close it here rather than leaking it.
        os.close(fd2)
        try:
            # The context manager closes the script even if write() raises.
            with os.fdopen(fd, 'w+') as fp:
                fp.write('''#!/bin/sh
# If stdin fails to get passed during one of the runs, make a note.
if ! grep -q foo
then
echo 'failure' > "$1"
fi
# If stdin has failed to get passed during this or a previous run, exit early.
if grep failure "$1"
then
exit 1
fi
runs="$(cat $1)"
if [ -z "$runs" ]
then
runs=0
fi
runs=$(($runs + 1))
echo $runs > "$1"
exit 1
''')
            os.chmod(tmpfilename, 0o755)
            try:
                self.assertRaises(processutils.ProcessExecutionError,
                                  utils.execute,
                                  tmpfilename, tmpfilename2, attempts=10,
                                  process_input=b'foo',
                                  delay_on_retry=False)
            except OSError as e:
                if e.errno == errno.EACCES:
                    self.skipTest("Permissions error detected. "
                                  "Are you running with a noexec /tmp?")
                else:
                    raise
            with open(tmpfilename2, 'r') as fp:
                runs = fp.read()
            self.assertNotEqual(runs.strip(), 'failure', 'stdin did not '
                                'always get passed '
                                'correctly')
            runs = int(runs.strip())
            self.assertEqual(10, runs,
                             'Ran %d times instead of 10.' % runs)
        finally:
            os.unlink(tmpfilename)
            os.unlink(tmpfilename2)

    def test_unknown_kwargs_raises_error(self):
        """An unrecognised keyword argument raises UnknownArgumentError."""
        self.assertRaises(processutils.UnknownArgumentError,
                          utils.execute,
                          '/usr/bin/env', 'true',
                          this_is_not_a_valid_kwarg=True)

    def test_check_exit_code_boolean(self):
        """check_exit_code=False tolerates a failing command; True raises."""
        utils.execute('/usr/bin/env', 'false', check_exit_code=False)
        self.assertRaises(processutils.ProcessExecutionError,
                          utils.execute,
                          '/usr/bin/env', 'false', check_exit_code=True)

    def test_no_retry_on_success(self):
        """A command that succeeds first time is not run a second time.

        The helper script exits 1 if the scratch file given as ``$1``
        already contains 'foo', i.e. if it has been run before, so a retry
        would make utils.execute() fail. The test is skipped when the
        script cannot be executed because of a permissions error (EACCES).
        """
        fd, tmpfilename = tempfile.mkstemp()
        fd2, tmpfilename2 = tempfile.mkstemp()
        # Only the scratch file's name is needed; do not leak its descriptor.
        os.close(fd2)
        try:
            # The context manager closes the script even if write() raises.
            with os.fdopen(fd, 'w+') as fp:
                fp.write('''#!/bin/sh
# If we've already run, bail out.
grep -q foo "$1" && exit 1
# Mark that we've run before.
echo foo > "$1"
# Check that stdin gets passed correctly.
grep foo
''')
            os.chmod(tmpfilename, 0o755)
            try:
                utils.execute(tmpfilename,
                              tmpfilename2,
                              process_input=b'foo',
                              attempts=2)
            except OSError as e:
                if e.errno == errno.EACCES:
                    self.skipTest("Permissions error detected. "
                                  "Are you running with a noexec /tmp?")
                else:
                    raise
        finally:
            os.unlink(tmpfilename)
            os.unlink(tmpfilename2)

    @mock.patch.object(processutils, 'execute')
    @mock.patch.object(os.environ, 'copy', return_value={})
    def test_execute_use_standard_locale_no_env_variables(self, env_mock,
                                                          execute_mock):
        """With an empty environment only LC_ALL=C is passed down."""
        utils.execute('foo', use_standard_locale=True)
        execute_mock.assert_called_once_with('foo',
                                             env_variables={'LC_ALL': 'C'})

    @mock.patch.object(processutils, 'execute')
    def test_execute_use_standard_locale_with_env_variables(self,
                                                            execute_mock):
        """LC_ALL=C is added alongside caller-supplied env_variables."""
        utils.execute('foo', use_standard_locale=True,
                      env_variables={'foo': 'bar'})
        execute_mock.assert_called_once_with('foo',
                                             env_variables={'LC_ALL': 'C',
                                                            'foo': 'bar'})

    @mock.patch.object(processutils, 'execute')
    def test_execute_not_use_standard_locale(self, execute_mock):
        """Without use_standard_locale env_variables pass through as-is."""
        utils.execute('foo', use_standard_locale=False,
                      env_variables={'foo': 'bar'})
        execute_mock.assert_called_once_with('foo',
                                             env_variables={'foo': 'bar'})

    def test_execute_get_root_helper(self):
        """run_as_root=True adds the helper from utils._get_root_helper()."""
        with mock.patch.object(processutils, 'execute') as execute_mock:
            helper = utils._get_root_helper()
            utils.execute('foo', run_as_root=True)
            execute_mock.assert_called_once_with('foo', run_as_root=True,
                                                 root_helper=helper)

    def test_execute_without_root_helper(self):
        """run_as_root=False is forwarded without any root_helper."""
        with mock.patch.object(processutils, 'execute') as execute_mock:
            utils.execute('foo', run_as_root=False)
            execute_mock.assert_called_once_with('foo', run_as_root=False)

    def test_validate_and_normalize_mac(self):
        """A MAC reported as valid is returned lower-cased."""
        mac = 'AA:BB:CC:DD:EE:FF'
        with mock.patch.object(netutils, 'is_valid_mac') as m_mock:
            m_mock.return_value = True
            self.assertEqual(mac.lower(),
                             utils.validate_and_normalize_mac(mac))

    def test_validate_and_normalize_mac_invalid_format(self):
        """A MAC reported as invalid raises exception.InvalidMAC."""
        with mock.patch.object(netutils, 'is_valid_mac') as m_mock:
            m_mock.return_value = False
            self.assertRaises(exception.InvalidMAC,
                              utils.validate_and_normalize_mac, 'invalid-mac')

    def test_safe_rstrip(self):
        """A trailing '/' is stripped, but a bare '/' is left unchanged."""
        value = '/test/'
        rstripped_value = '/test'
        not_rstripped = '/'

        self.assertEqual(rstripped_value, utils.safe_rstrip(value, '/'))
        self.assertEqual(not_rstripped, utils.safe_rstrip(not_rstripped, '/'))

    def test_safe_rstrip_not_raises_exceptions(self):
        """A value without rstrip() is returned unchanged, not raised on."""
        # Supplying an integer should normally raise an exception because it
        # does not have the rstrip() method.
        value = 10

        # In the case of raising an exception safe_rstrip() should return the
        # original value.
        self.assertEqual(value, utils.safe_rstrip(value))
|
|
|
|
|
|
class TempFilesTestCase(base.TestCase):
    """Tests for the utils.tempdir() context manager."""

    def test_tempdir(self):
        """The directory exists inside the block and is gone afterwards."""
        with utils.tempdir() as created:
            self.assertTrue(os.path.isdir(created))
        # The name stays bound after the block, so it can be re-checked.
        self.assertFalse(os.path.exists(created))

    @mock.patch.object(shutil, 'rmtree')
    @mock.patch.object(tempfile, 'mkdtemp')
    def test_tempdir_mocked(self, mkdtemp_mock, rmtree_mock):
        """Keyword arguments reach mkdtemp and its result is rmtree'd."""
        fake_path = 'temp-dir'
        extra = {'a': 'b'}

        self.config(tempdir='abc')
        mkdtemp_mock.return_value = fake_path

        with utils.tempdir(**extra) as yielded:
            self.assertEqual(fake_path, yielded)

        mkdtemp_mock.assert_called_once_with(**extra)
        rmtree_mock.assert_called_once_with(yielded)

    @mock.patch.object(utils, 'LOG')
    @mock.patch.object(shutil, 'rmtree')
    @mock.patch.object(tempfile, 'mkdtemp')
    def test_tempdir_mocked_error_on_rmtree(self, mkdtemp_mock, rmtree_mock,
                                            log_mock):
        """An OSError from rmtree is logged as an error, not propagated."""
        fake_path = 'temp-dir'

        self.config(tempdir='abc')
        mkdtemp_mock.return_value = fake_path
        rmtree_mock.side_effect = OSError

        with utils.tempdir() as yielded:
            self.assertEqual(fake_path, yielded)

        rmtree_mock.assert_called_once_with(yielded)
        self.assertTrue(log_mock.error.called)
|
|
|
|
|
|
class GeneratePasswordTestCase(base.TestCase):
    """Tests for utils.generate_password()."""

    def test_generate_password(self):
        """A password requested with length=12 mixes all three classes."""
        generated = utils.generate_password(length=12)
        character_classes = (
            '0123456789',
            'abcdefghijklmnopqrstuvwxyz',
            'ABCDEFGHIJKLMNOPQRSTUVWXYZ',
        )
        # At least one character from every class must be present.
        for alphabet in character_classes:
            self.assertTrue(any(ch in alphabet for ch in generated))
|