Add tests to the layer-openstack
Add unit tests to the adapters, charm and ip parts of the lib to ensure that the API to the classes remains stable with no unplanned regessions over time.
This commit is contained in:
parent
ee3ed44e23
commit
20d3c49893
|
@ -0,0 +1,8 @@
|
|||
[DEFAULT]
|
||||
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
|
||||
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
|
||||
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
|
||||
${PYTHON:-python} -m subunit.run discover -t ./ ./unit_tests $LISTOPT $IDOPTION
|
||||
|
||||
test_id_option=--load-list $IDFILE
|
||||
test_list_option=--list
|
|
@ -0,0 +1,11 @@
|
|||
#!/usr/bin/make
|
||||
PYTHON := /usr/bin/env python
|
||||
|
||||
clean:
|
||||
@rm -rf .testrepository .unit-state.db .tox
|
||||
lint:
|
||||
@tox -e pep8
|
||||
|
||||
test:
|
||||
@echo Starting unit tests...
|
||||
@tox -e py27
|
|
@ -1 +1,7 @@
|
|||
includes: ['layer:basic']
|
||||
ignore:
|
||||
- 'unit_tests'
|
||||
- 'Makefile'
|
||||
- '.testr.conf'
|
||||
- 'test-requirements.txt'
|
||||
- 'tox.ini'
|
||||
|
|
|
@ -23,7 +23,7 @@ class OpenStackRelationAdapter(object):
|
|||
"""
|
||||
Name of the relation this adapter is handling.
|
||||
"""
|
||||
return self._relation.relation_name
|
||||
return self.relation.relation_name
|
||||
|
||||
def _setup_properties(self):
|
||||
"""
|
||||
|
@ -113,12 +113,15 @@ class DatabaseRelationAdapter(OpenStackRelationAdapter):
|
|||
self.host,
|
||||
self.database,
|
||||
)
|
||||
if self.ssl_ca:
|
||||
uri = '{}?ssl_ca={}'.format(uri, self.ssl_ca)
|
||||
if self.ssl_cert:
|
||||
uri = '{}&ssl_cert={}&ssl_key={}'.format(uri,
|
||||
self.ssl_cert,
|
||||
self.ssl_key)
|
||||
try:
|
||||
if self.ssl_ca:
|
||||
uri = '{}?ssl_ca={}'.format(uri, self.ssl_ca)
|
||||
if self.ssl_cert:
|
||||
uri = ('{}&ssl_cert={}&ssl_key={}'
|
||||
.format(uri, self.ssl_cert, self.ssl_key))
|
||||
except AttributeError:
|
||||
# ignore ssl_ca or ssl_cert if not available
|
||||
pass
|
||||
return uri
|
||||
|
||||
@property
|
||||
|
|
|
@ -1,7 +1,12 @@
|
|||
"""Classes to support writing re-usable charms in the reactive framework"""
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import subprocess
|
||||
import os
|
||||
from contextlib import contextmanager
|
||||
from collections import OrderedDict
|
||||
|
||||
from charmhelpers.contrib.openstack.utils import (
|
||||
configure_installation_source,
|
||||
)
|
||||
|
@ -12,16 +17,13 @@ from charmhelpers.fetch import (
|
|||
apt_update,
|
||||
filter_installed_packages,
|
||||
)
|
||||
|
||||
from charm.openstack.ip import PUBLIC, INTERNAL, ADMIN, canonical_url
|
||||
from contextlib import contextmanager
|
||||
from collections import OrderedDict
|
||||
from charmhelpers.contrib.openstack.templating import get_loader
|
||||
from charmhelpers.core.templating import render
|
||||
from charmhelpers.core.hookenv import leader_get, leader_set
|
||||
|
||||
from charms.reactive.bus import set_state, remove_state
|
||||
|
||||
from charm.openstack.ip import PUBLIC, INTERNAL, ADMIN, canonical_url
|
||||
|
||||
|
||||
class OpenStackCharm(object):
|
||||
"""
|
||||
|
@ -114,13 +116,13 @@ class OpenStackCharm(object):
|
|||
|
||||
@contextmanager
|
||||
def restart_on_change(self):
|
||||
checksums = {path: path_hash(path) for path in self.restart_map}
|
||||
checksums = {path: path_hash(path) for path in self.restart_map.keys()}
|
||||
yield
|
||||
restarts = []
|
||||
for path in self.restart_map:
|
||||
if path_hash(path) != checksums[path]:
|
||||
restarts += self.restart_map[path]
|
||||
services_list = list(OrderedDict.fromkeys(restarts))
|
||||
services_list = list(OrderedDict.fromkeys(restarts).keys())
|
||||
for service_name in services_list:
|
||||
service_restart(service_name)
|
||||
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
flake8>=2.2.4,<=2.4.1
|
||||
os-testr>=0.4.1
|
||||
charm-tools
|
||||
paramiko<2.0
|
||||
charm-tools>=2.0.0
|
||||
charms.reactive
|
||||
mock>=1.2
|
||||
coverage>=3.6
|
||||
|
|
2
tox.ini
2
tox.ini
|
@ -16,7 +16,7 @@ deps = -r{toxinidir}/test-requirements.txt
|
|||
[testenv:pep8]
|
||||
basepython = python2.7
|
||||
deps = -r{toxinidir}/test-requirements.txt
|
||||
commands = flake8 {posargs} lib
|
||||
commands = flake8 {posargs} lib unit_tests
|
||||
|
||||
[testenv:venv]
|
||||
commands = {posargs}
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
import sys
|
||||
import mock
|
||||
|
||||
|
||||
sys.path.append('./lib')
|
||||
# mock out some charmhelpers libraries as they have apt install side effects
|
||||
sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
|
||||
sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
|
|
@ -0,0 +1,198 @@
|
|||
# Note that the unit_tests/__init__.py has the following lines to stop
|
||||
# side effects from the imorts from charm helpers.
|
||||
|
||||
# sys.path.append('./lib')
|
||||
# mock out some charmhelpers libraries as they have apt install side effects
|
||||
# sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
|
||||
# sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
|
||||
|
||||
import unittest
|
||||
import mock
|
||||
|
||||
import charm.openstack.adapters as adapters
|
||||
|
||||
|
||||
class MyRelation(object):
|
||||
|
||||
auto_accessors = ['this', 'that']
|
||||
relation_name = 'my-name'
|
||||
|
||||
def this(self):
|
||||
return 'this'
|
||||
|
||||
def that(self):
|
||||
return 'that'
|
||||
|
||||
def some(self):
|
||||
return 'thing'
|
||||
|
||||
|
||||
class TestOpenStackRelationAdapter(unittest.TestCase):
|
||||
|
||||
def test_class(self):
|
||||
ad = adapters.OpenStackRelationAdapter(MyRelation(), ['some'])
|
||||
self.assertEqual(ad.this, 'this')
|
||||
self.assertEqual(ad.that, 'that')
|
||||
self.assertEqual(ad.some, 'thing')
|
||||
self.assertEqual(ad.relation_name, 'my-name')
|
||||
with self.assertRaises(AttributeError):
|
||||
ad.relation_name = 'hello'
|
||||
|
||||
|
||||
class FakeRabbitMQRelation():
|
||||
|
||||
auto_accessors = ['vip', 'private_address']
|
||||
relation_name = 'amqp'
|
||||
|
||||
def __init__(self, vip=None):
|
||||
self._vip = vip
|
||||
|
||||
def vip(self):
|
||||
return self._vip
|
||||
|
||||
def private_address(self):
|
||||
return 'private-address'
|
||||
|
||||
def rabbitmq_hosts(self):
|
||||
return ['host1', 'host2']
|
||||
|
||||
def vhost(self):
|
||||
return 'vhost'
|
||||
|
||||
def username(self):
|
||||
return 'fakename'
|
||||
|
||||
|
||||
class TestRabbitMQRelationAdapter(unittest.TestCase):
|
||||
|
||||
def test_class(self):
|
||||
fake = FakeRabbitMQRelation(None)
|
||||
mq = adapters.RabbitMQRelationAdapter(fake)
|
||||
self.assertEqual(mq.vhost, 'vhost')
|
||||
self.assertEqual(mq.username, 'fakename')
|
||||
self.assertEqual(mq.host, 'private-address')
|
||||
# TODO: can't do the following 2 lines as not dynamic accessors
|
||||
# fake._vip = 'vip1'
|
||||
# self.assertEqual(mq.host, 'vip1')
|
||||
self.assertEqual(mq.hosts, 'host1,host2')
|
||||
|
||||
|
||||
class FakeDatabaseRelation():
|
||||
|
||||
auto_accessors = []
|
||||
relation_name = 'shared_db'
|
||||
|
||||
def db_host(self):
|
||||
return 'host1'
|
||||
|
||||
def username(self, prefix=''):
|
||||
return 'username1{}'.format(prefix)
|
||||
|
||||
def password(self, prefix=''):
|
||||
return 'password1{}'.format(prefix)
|
||||
|
||||
def database(self, prefix=''):
|
||||
return 'database1{}'.format(prefix)
|
||||
|
||||
|
||||
class SSLDatabaseRelationAdapter(adapters.DatabaseRelationAdapter):
|
||||
|
||||
ssl_ca = 'my-ca'
|
||||
ssl_cert = 'my-cert'
|
||||
ssl_key = 'my-key'
|
||||
|
||||
|
||||
class TestDatabaseRelationAdapter(unittest.TestCase):
|
||||
|
||||
def test_class(self):
|
||||
fake = FakeDatabaseRelation()
|
||||
db = adapters.DatabaseRelationAdapter(fake)
|
||||
self.assertEqual(db.host, 'host1')
|
||||
self.assertEqual(db.type, 'mysql')
|
||||
self.assertEqual(db.password, 'password1')
|
||||
self.assertEqual(db.username, 'username1')
|
||||
self.assertEqual(db.database, 'database1')
|
||||
self.assertEqual(db.uri, 'mysql://username1:password1@host1/database1')
|
||||
self.assertEqual(db.get_uri('x'),
|
||||
'mysql://username1x:password1x@host1/database1x')
|
||||
# test the ssl feature of the base class
|
||||
db = SSLDatabaseRelationAdapter(fake)
|
||||
self.assertEqual(db.uri,
|
||||
'mysql://username1:password1@host1/database1'
|
||||
'?ssl_ca=my-ca'
|
||||
'&ssl_cert=my-cert&ssl_key=my-key')
|
||||
|
||||
|
||||
class TestConfigurationAdapter(unittest.TestCase):
|
||||
|
||||
def test_class(self):
|
||||
test_config = {
|
||||
'one': 1,
|
||||
'two': 2,
|
||||
'three': 3,
|
||||
'that-one': 4
|
||||
}
|
||||
with mock.patch.object(adapters.hookenv, 'config',
|
||||
new=lambda: test_config):
|
||||
c = adapters.ConfigurationAdapter()
|
||||
self.assertEqual(c.one, 1)
|
||||
self.assertEqual(c.three, 3)
|
||||
self.assertEqual(c.that_one, 4)
|
||||
|
||||
|
||||
class TestOpenStackRelationAdapters(unittest.TestCase):
|
||||
# test the OpenStackRelationAdapters() class, and then derive from it to
|
||||
# test the additonal relation_adapters member on __init__
|
||||
|
||||
def test_class(self):
|
||||
test_config = {
|
||||
'one': 1,
|
||||
'two': 2,
|
||||
'three': 3,
|
||||
'that-one': 4
|
||||
}
|
||||
with mock.patch.object(adapters.hookenv, 'config',
|
||||
new=lambda: test_config):
|
||||
amqp = FakeRabbitMQRelation()
|
||||
shared_db = FakeDatabaseRelation()
|
||||
mine = MyRelation()
|
||||
a = adapters.OpenStackRelationAdapters([amqp, shared_db, mine])
|
||||
self.assertEqual(a.amqp.private_address, 'private-address')
|
||||
self.assertEqual(a.my_name.this, 'this')
|
||||
items = list(a)
|
||||
self.assertEqual(items[0][0], 'amqp')
|
||||
self.assertEqual(items[1][0], 'shared_db')
|
||||
self.assertEqual(items[2][0], 'my_name')
|
||||
self.assertEqual(items[3][0], 'options')
|
||||
|
||||
|
||||
class MyRelationAdapter(adapters.OpenStackRelationAdapter):
|
||||
|
||||
@property
|
||||
def us(self):
|
||||
return self.this + '-us'
|
||||
|
||||
|
||||
class MyOpenStackRelationAdapters(adapters.OpenStackRelationAdapters):
|
||||
|
||||
relation_adapters = {
|
||||
'my_name': MyRelationAdapter,
|
||||
}
|
||||
|
||||
|
||||
class TestCustomOpenStackRelationAdapters(unittest.TestCase):
|
||||
|
||||
def test_class(self):
|
||||
test_config = {
|
||||
'one': 1,
|
||||
'two': 2,
|
||||
'three': 3,
|
||||
'that-one': 4
|
||||
}
|
||||
with mock.patch.object(adapters.hookenv, 'config',
|
||||
new=lambda: test_config):
|
||||
amqp = FakeRabbitMQRelation()
|
||||
shared_db = FakeDatabaseRelation()
|
||||
mine = MyRelation()
|
||||
a = MyOpenStackRelationAdapters([amqp, shared_db, mine])
|
||||
self.assertEqual(a.my_name.us, 'this-us')
|
|
@ -0,0 +1,285 @@
|
|||
# Note that the unit_tests/__init__.py has the following lines to stop
|
||||
# side effects from the imorts from charm helpers.
|
||||
|
||||
# sys.path.append('./lib')
|
||||
# mock out some charmhelpers libraries as they have apt install side effects
|
||||
# sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
|
||||
# sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
|
||||
|
||||
import mock
|
||||
|
||||
import utils
|
||||
|
||||
import charm.openstack.charm as chm
|
||||
|
||||
TEST_CONFIG = {'config': True}
|
||||
|
||||
|
||||
class BaseOpenStackCharmTest(utils.BaseTestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.patched_config = mock.patch.object(chm, 'config')
|
||||
cls.patched_config_started = cls.patched_config.start()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.patched_config.stop()
|
||||
cls.patched_config_started = None
|
||||
cls.patched_config = None
|
||||
|
||||
def setUp(self, target_cls, test_config):
|
||||
super(BaseOpenStackCharmTest, self).setUp()
|
||||
# set up the return value on the mock before instantiating the class to
|
||||
# get the config into the class.config.
|
||||
chm.config.return_value = test_config
|
||||
self.target = target_cls()
|
||||
|
||||
def tearDown(self):
|
||||
self.target = None
|
||||
super(BaseOpenStackCharmTest, self).tearDown()
|
||||
|
||||
def patch_target(self, attr, return_value=None, name=None, new=None):
|
||||
# uses BaseTestCase.patch_object() to patch targer.
|
||||
self.patch_object(self.target, attr, return_value, name, new)
|
||||
|
||||
|
||||
class TestOpenStackCharm(BaseOpenStackCharmTest):
|
||||
# Note that this only tests the OpenStackCharm() class, which has not very
|
||||
# useful defaults for testing. In order to test all the code without too
|
||||
# many mocks, a separate test dervied charm class is used below.
|
||||
|
||||
def setUp(self):
|
||||
super(TestOpenStackCharm, self).setUp(chm.OpenStackCharm, TEST_CONFIG)
|
||||
|
||||
def test__init__(self):
|
||||
# Note cls.setUpClass() creates an OpenStackCharm() instance
|
||||
self.assertEqual(chm.config(), TEST_CONFIG)
|
||||
self.assertEqual(self.target.config, TEST_CONFIG)
|
||||
self.assertEqual(self.target.release, 'liberty')
|
||||
|
||||
def test_install(self):
|
||||
# only tests that the default set_state is called
|
||||
self.patch_target('set_state')
|
||||
self.patch_object(chm, 'filter_installed_packages',
|
||||
name='fip',
|
||||
return_value=None)
|
||||
self.target.install()
|
||||
self.target.set_state.assert_called_once_with('charmname-installed')
|
||||
self.fip.assert_called_once_with([])
|
||||
|
||||
def test_set_state(self):
|
||||
# tests that OpenStackCharm.set_state() calls set_state() global
|
||||
self.patch_object(chm, 'set_state')
|
||||
self.target.set_state('hello')
|
||||
self.set_state.assert_called_once_with('hello', None)
|
||||
self.set_state.reset_mock()
|
||||
self.target.set_state('hello', 'there')
|
||||
self.set_state.assert_called_once_with('hello', 'there')
|
||||
|
||||
def test_remove_state(self):
|
||||
# tests that OpenStackCharm.remove_state() calls remove_state() global
|
||||
self.patch_object(chm, 'remove_state')
|
||||
self.target.remove_state('hello')
|
||||
self.remove_state.assert_called_once_with('hello')
|
||||
|
||||
def test_configure_source(self):
|
||||
self.patch_object(chm, 'configure_installation_source', name='cis')
|
||||
self.patch_object(chm, 'apt_update')
|
||||
self.patch_target('config', new={'openstack-origin': 'an-origin'})
|
||||
self.target.configure_source()
|
||||
self.cis.assert_called_once_with('an-origin')
|
||||
self.apt_update.assert_called_once_with(fatal=True)
|
||||
|
||||
def test_region(self):
|
||||
self.patch_target('config', new={'region': 'a-region'})
|
||||
self.assertEqual(self.target.region, 'a-region')
|
||||
|
||||
def test_restart_on_change(self):
|
||||
from collections import OrderedDict
|
||||
hashs = OrderedDict([
|
||||
('path1', 100),
|
||||
('path2', 200),
|
||||
('path3', 300),
|
||||
('path4', 400),
|
||||
])
|
||||
self.target.restart_map = {
|
||||
'path1': ['s1'],
|
||||
'path2': ['s2'],
|
||||
'path3': ['s3'],
|
||||
'path4': ['s2', 's4'],
|
||||
}
|
||||
self.patch_object(chm, 'path_hash')
|
||||
self.path_hash.side_effect = lambda x: hashs[x]
|
||||
self.patch_object(chm, 'service_restart')
|
||||
# slightly awkard, in that we need to test a context manager
|
||||
with self.target.restart_on_change():
|
||||
# test with no restarts
|
||||
pass
|
||||
self.assertEqual(self.service_restart.call_count, 0)
|
||||
|
||||
with self.target.restart_on_change():
|
||||
# test with path1 and path3 restarts
|
||||
for k in ['path1', 'path3']:
|
||||
hashs[k] += 1
|
||||
self.assertEqual(self.service_restart.call_count, 2)
|
||||
self.service_restart.assert_any_call('s1')
|
||||
self.service_restart.assert_any_call('s3')
|
||||
|
||||
# test with path2 and path4 and that s2 only gets restarted once
|
||||
self.service_restart.reset_mock()
|
||||
with self.target.restart_on_change():
|
||||
for k in ['path2', 'path4']:
|
||||
hashs[k] += 1
|
||||
self.assertEqual(self.service_restart.call_count, 2)
|
||||
calls = [mock.call('s2'), mock.call('s4')]
|
||||
self.service_restart.assert_has_calls(calls)
|
||||
|
||||
def test_restart_all(self):
|
||||
self.patch_object(chm, 'service_restart')
|
||||
self.patch_target('services', new=['s1', 's2'])
|
||||
self.target.restart_all()
|
||||
self.assertEqual(self.service_restart.call_args_list,
|
||||
[mock.call('s1'), mock.call('s2')])
|
||||
|
||||
def test_db_sync(self):
|
||||
self.patch_object(chm, 'leader_get')
|
||||
self.patch_object(chm, 'leader_set')
|
||||
self.patch_object(chm, 'subprocess', name='subprocess')
|
||||
self.patch_target('restart_all')
|
||||
# first check with leader_get returning True
|
||||
self.leader_get.return_value = True
|
||||
self.target.db_sync()
|
||||
self.leader_get.assert_called_once_with(attribute='db-sync-done')
|
||||
self.subprocess.check_call.assert_not_called()
|
||||
self.leader_set.assert_not_called()
|
||||
self.restart_all.assert_not_called()
|
||||
# Now check with leader_get returning False
|
||||
self.leader_get.reset_mock()
|
||||
self.leader_get.return_value = False
|
||||
self.target.sync_cmd = ['a', 'cmd']
|
||||
self.target.db_sync()
|
||||
self.leader_get.assert_called_once_with(attribute='db-sync-done')
|
||||
self.subprocess.check_call.assert_called_once_with(['a', 'cmd'])
|
||||
self.leader_set.assert_called_once_with({'db-sync-done': True})
|
||||
self.restart_all.assert_called_once_with()
|
||||
|
||||
|
||||
class MyAdapter(object):
|
||||
|
||||
def __init__(self, interfaces):
|
||||
self.interfaces = interfaces
|
||||
|
||||
|
||||
class MyOpenStackCharm(chm.OpenStackCharm):
|
||||
|
||||
name = 'my-charm'
|
||||
packages = ['p1', 'p2', 'p3', 'package-to-filter']
|
||||
api_ports = {
|
||||
'service1': {
|
||||
chm.PUBLIC: 1,
|
||||
chm.INTERNAL: 2,
|
||||
},
|
||||
'service2': {
|
||||
chm.PUBLIC: 3,
|
||||
},
|
||||
'my-default-service': {
|
||||
chm.PUBLIC: 1234,
|
||||
chm.ADMIN: 2468,
|
||||
chm.INTERNAL: 3579,
|
||||
},
|
||||
}
|
||||
service_type = 'my-service-type'
|
||||
default_service = 'my-default-service'
|
||||
restart_map = {
|
||||
'path1': ['s1'],
|
||||
'path2': ['s2'],
|
||||
'path3': ['s3'],
|
||||
'path4': ['s2', 's4'],
|
||||
}
|
||||
sync_cmd = ['my-sync-cmd', 'param1']
|
||||
services = ['my-default-service', 'my-second-service']
|
||||
adapters_class = MyAdapter
|
||||
|
||||
|
||||
class TestMyOpenStackCharm(BaseOpenStackCharmTest):
|
||||
|
||||
def setUp(self):
|
||||
def make_open_stack_charm():
|
||||
return MyOpenStackCharm(['interface1', 'interface2'])
|
||||
|
||||
super(TestMyOpenStackCharm, self).setUp(make_open_stack_charm,
|
||||
TEST_CONFIG)
|
||||
|
||||
def test_install(self):
|
||||
# tests that the packages are filtered before installation
|
||||
self.patch_target('set_state')
|
||||
self.patch_object(chm, 'filter_installed_packages',
|
||||
return_value=None,
|
||||
name='fip')
|
||||
self.fip.side_effect = lambda x: ['p1', 'p2']
|
||||
self.patch_object(chm, 'status_set')
|
||||
self.patch_object(chm, 'apt_install')
|
||||
self.target.install()
|
||||
self.target.set_state.assert_called_once_with('my-charm-installed')
|
||||
self.fip.assert_called_once_with(self.target.packages)
|
||||
self.status_set.assert_called_once_with('maintenance',
|
||||
'Installing packages')
|
||||
|
||||
def test_api_port(self):
|
||||
self.assertEqual(self.target.api_port('service1'), 1)
|
||||
self.assertEqual(self.target.api_port('service1', chm.PUBLIC), 1)
|
||||
self.assertEqual(self.target.api_port('service2'), 3)
|
||||
with self.assertRaises(KeyError):
|
||||
self.target.api_port('service3')
|
||||
with self.assertRaises(KeyError):
|
||||
self.target.api_port('service2', chm.INTERNAL)
|
||||
|
||||
def test_public_url(self):
|
||||
self.patch_object(chm, 'canonical_url', return_value='my-ip-address')
|
||||
self.assertEqual(self.target.public_url, 'my-ip-address:1234')
|
||||
self.canonical_url.assert_called_once_with(chm.PUBLIC)
|
||||
|
||||
def test_admin_url(self):
|
||||
self.patch_object(chm, 'canonical_url', return_value='my-ip-address')
|
||||
self.assertEqual(self.target.admin_url, 'my-ip-address:2468')
|
||||
self.canonical_url.assert_called_once_with(chm.ADMIN)
|
||||
|
||||
def test_internal_url(self):
|
||||
self.patch_object(chm, 'canonical_url', return_value='my-ip-address')
|
||||
self.assertEqual(self.target.internal_url, 'my-ip-address:3579')
|
||||
self.canonical_url.assert_called_once_with(chm.INTERNAL)
|
||||
|
||||
def test_render_all_configs(self):
|
||||
self.patch_target('render_configs')
|
||||
self.target.render_all_configs()
|
||||
self.assertEqual(self.render_configs.call_count, 1)
|
||||
args = self.render_configs.call_args_list[0][0][0]
|
||||
self.assertEqual(['path1', 'path2', 'path3', 'path4'],
|
||||
sorted(args))
|
||||
|
||||
def test_render_configs(self):
|
||||
# give us a way to check that the context manager was called.
|
||||
from contextlib import contextmanager
|
||||
d = [0]
|
||||
|
||||
@contextmanager
|
||||
def fake_restart_on_change():
|
||||
d[0] += 1
|
||||
yield
|
||||
|
||||
self.patch_target('restart_on_change', new=fake_restart_on_change)
|
||||
self.patch_object(chm, 'render')
|
||||
self.patch_object(chm, 'get_loader', return_value='my-loader')
|
||||
# self.patch_target('adapter_instance', new='my-adapter')
|
||||
self.target.render_configs(['path1'])
|
||||
self.assertEqual(d[0], 1)
|
||||
self.render.assert_called_once_with(
|
||||
source='path1',
|
||||
template_loader='my-loader',
|
||||
target='path1',
|
||||
context=mock.ANY)
|
||||
# assert the context was an MyAdapter instance.
|
||||
context = self.render.call_args_list[0][1]['context']
|
||||
assert isinstance(context, MyAdapter)
|
||||
self.assertEqual(context.interfaces, ['interface1', 'interface2'])
|
|
@ -0,0 +1,108 @@
|
|||
# Note that the unit_tests/__init__.py has the following lines to stop
|
||||
# side effects from the imorts from charm helpers.
|
||||
|
||||
# sys.path.append('./lib')
|
||||
# mock out some charmhelpers libraries as they have apt install side effects
|
||||
# sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
|
||||
# sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
|
||||
|
||||
import utils
|
||||
|
||||
import charm.openstack.ip as ip
|
||||
|
||||
|
||||
class TestCharmOpenStackIp(utils.BaseTestCase):
|
||||
|
||||
def test_canonical_url(self):
|
||||
self.patch_object(ip, 'resolve_address', return_value='address1')
|
||||
self.patch_object(ip, 'is_ipv6', return_value=False)
|
||||
# not ipv6
|
||||
url = ip.canonical_url()
|
||||
self.assertEqual(url, 'http://address1')
|
||||
self.resolve_address.assert_called_once_with(ip.PUBLIC)
|
||||
# is ipv6
|
||||
self.is_ipv6.return_value = True
|
||||
self.resolve_address.reset_mock()
|
||||
url = ip.canonical_url()
|
||||
self.assertEqual(url, 'http://[address1]')
|
||||
self.resolve_address.assert_called_once_with(ip.PUBLIC)
|
||||
# test we check for enpoint type
|
||||
self.is_ipv6.return_value = False
|
||||
self.resolve_address.reset_mock()
|
||||
url = ip.canonical_url(ip.INTERNAL)
|
||||
self.resolve_address.assert_called_once_with(ip.INTERNAL)
|
||||
|
||||
def test_resolve_address(self):
|
||||
self.patch_object(ip, 'is_clustered')
|
||||
self.patch_object(ip, 'config')
|
||||
self.patch_object(ip, 'is_address_in_network')
|
||||
self.patch_object(ip, 'get_ipv6_addr')
|
||||
self.patch_object(ip, 'unit_get')
|
||||
self.patch_object(ip, 'get_address_in_network')
|
||||
|
||||
# define a fake_config() that returns predictable results and remembers
|
||||
# what it was called with.
|
||||
calls_list = []
|
||||
_config = {
|
||||
'vip': 'vip-address',
|
||||
'prefer-ipv6': False,
|
||||
'os-public-network': 'the-public-network',
|
||||
'os-internal-network': 'the-internal-network',
|
||||
'os-admin-network': 'the-admin-network',
|
||||
}
|
||||
|
||||
def fake_config(*args):
|
||||
calls_list.append(args)
|
||||
return _config[args[0]]
|
||||
|
||||
self.config.side_effect = fake_config
|
||||
|
||||
# first test, if not clustered, that the function uses unit_get() and
|
||||
# get_address_in_network to get a real address.
|
||||
# for the default PUBLIC endpoint
|
||||
self.is_clustered.return_value = False
|
||||
self.get_address_in_network.return_value = 'got-address'
|
||||
self.unit_get.return_value = 'unit-get-address'
|
||||
addr = ip.resolve_address()
|
||||
self.assertEqual(addr, 'got-address')
|
||||
self.assertEqual(calls_list,
|
||||
[('prefer-ipv6',), ('os-public-network',)])
|
||||
self.unit_get.assert_called_once_with('public-address')
|
||||
self.get_address_in_network.assert_called_once_with(
|
||||
'the-public-network', 'unit-get-address')
|
||||
|
||||
# second test: not clusted, prefer-ipv6 is True
|
||||
_config['prefer-ipv6'] = True
|
||||
calls_list = []
|
||||
self.get_ipv6_addr.return_value = ['ipv6-addr']
|
||||
self.get_address_in_network.reset_mock()
|
||||
addr = ip.resolve_address()
|
||||
self.get_ipv6_addr.assert_called_once_with(exc_list=['vip-address'])
|
||||
self.get_address_in_network.assert_called_once_with(
|
||||
'the-public-network', 'ipv6-addr')
|
||||
|
||||
# Third test: clustered, and config(...) returns None
|
||||
self.is_clustered.return_value = True
|
||||
_config['os-public-network'] = None
|
||||
calls_list = []
|
||||
addr = ip.resolve_address()
|
||||
self.assertEqual(calls_list, [('os-public-network',), ('vip',)])
|
||||
|
||||
# Fourth test: clustered, and config(...) returns not None
|
||||
_config['os-public-network'] = 'the-public-network'
|
||||
calls_list = []
|
||||
_config['vip'] = 'vip1 vip2'
|
||||
self.is_address_in_network.return_value = (False, True)
|
||||
addr = ip.resolve_address()
|
||||
self.assertEqual(calls_list, [
|
||||
('os-public-network',),
|
||||
('vip',),
|
||||
('os-public-network',),
|
||||
('os-public-network',)])
|
||||
self.assertEqual(addr, 'vip2')
|
||||
|
||||
# Finally resolved_address returns None -> ValueError()
|
||||
# allow vip to not be found:
|
||||
self.is_address_in_network.return_value = False
|
||||
with self.assertRaises(ValueError):
|
||||
addr = ip.resolve_address()
|
|
@ -0,0 +1,48 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Note that the unit_tests/__init__.py also mocks out two charmhelpers imports
|
||||
# that have side effects that try to apt install modules:
|
||||
# sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
|
||||
# sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
|
||||
|
||||
|
||||
import unittest
|
||||
import mock
|
||||
|
||||
|
||||
class BaseTestCase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self._patches = {}
|
||||
self._patches_start = {}
|
||||
|
||||
def tearDown(self):
|
||||
for k, v in self._patches.items():
|
||||
v.stop()
|
||||
setattr(self, k, None)
|
||||
self._patches = None
|
||||
self._patches_start = None
|
||||
|
||||
def patch_object(self, obj, attr, return_value=None, name=None, new=None):
|
||||
if name is None:
|
||||
name = attr
|
||||
if new is not None:
|
||||
mocked = mock.patch.object(obj, attr, new=new)
|
||||
else:
|
||||
mocked = mock.patch.object(obj, attr)
|
||||
self._patches[name] = mocked
|
||||
started = mocked.start()
|
||||
if new is None:
|
||||
started.return_value = return_value
|
||||
self._patches_start[name] = started
|
||||
setattr(self, name, started)
|
Loading…
Reference in New Issue