Merge pull request #3 from ajkavanagh/feature/add-tests
Add tests to the layer-openstack
This commit is contained in:
commit
9b5f887165
|
@ -0,0 +1,8 @@
|
|||
[DEFAULT]
|
||||
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
|
||||
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
|
||||
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
|
||||
${PYTHON:-python} -m subunit.run discover -t ./ ./unit_tests $LISTOPT $IDOPTION
|
||||
|
||||
test_id_option=--load-list $IDFILE
|
||||
test_list_option=--list
|
|
@ -0,0 +1,11 @@
|
|||
#!/usr/bin/make
|
||||
PYTHON := /usr/bin/env python
|
||||
|
||||
clean:
|
||||
@rm -rf .testrepository .unit-state.db .tox
|
||||
lint:
|
||||
@tox -e pep8
|
||||
|
||||
test:
|
||||
@echo Starting unit tests...
|
||||
@tox -e py27
|
|
@ -1 +1,7 @@
|
|||
includes: ['layer:basic']
|
||||
ignore:
|
||||
- 'unit_tests'
|
||||
- 'Makefile'
|
||||
- '.testr.conf'
|
||||
- 'test-requirements.txt'
|
||||
- 'tox.ini'
|
||||
|
|
|
@ -23,7 +23,7 @@ class OpenStackRelationAdapter(object):
|
|||
"""
|
||||
Name of the relation this adapter is handling.
|
||||
"""
|
||||
return self._relation.relation_name
|
||||
return self.relation.relation_name
|
||||
|
||||
def _setup_properties(self):
|
||||
"""
|
||||
|
@ -37,9 +37,15 @@ class OpenStackRelationAdapter(object):
|
|||
for field in self.accessors:
|
||||
meth_name = field.replace('-', '_')
|
||||
# Get the relation property dynamically
|
||||
# Note the additional lambda name: is to create a closure over
|
||||
# meth_name so that a new 'name' gets created for each loop,
|
||||
# otherwise the same variable meth_name is referenced in each of
|
||||
# the internal lambdas. i.e. this is (lambda x: ...)(value)
|
||||
setattr(self.__class__,
|
||||
meth_name,
|
||||
property(lambda self: getattr(self.relation, meth_name)))
|
||||
(lambda name: property(
|
||||
lambda self: getattr(
|
||||
self.relation, name)()))(meth_name))
|
||||
|
||||
|
||||
class RabbitMQRelationAdapter(OpenStackRelationAdapter):
|
||||
|
@ -113,12 +119,15 @@ class DatabaseRelationAdapter(OpenStackRelationAdapter):
|
|||
self.host,
|
||||
self.database,
|
||||
)
|
||||
if self.ssl_ca:
|
||||
uri = '{}?ssl_ca={}'.format(uri, self.ssl_ca)
|
||||
if self.ssl_cert:
|
||||
uri = '{}&ssl_cert={}&ssl_key={}'.format(uri,
|
||||
self.ssl_cert,
|
||||
self.ssl_key)
|
||||
try:
|
||||
if self.ssl_ca:
|
||||
uri = '{}?ssl_ca={}'.format(uri, self.ssl_ca)
|
||||
if self.ssl_cert:
|
||||
uri = ('{}&ssl_cert={}&ssl_key={}'
|
||||
.format(uri, self.ssl_cert, self.ssl_key))
|
||||
except AttributeError:
|
||||
# ignore ssl_ca or ssl_cert if not available
|
||||
pass
|
||||
return uri
|
||||
|
||||
@property
|
||||
|
|
|
@ -1,7 +1,12 @@
|
|||
"""Classes to support writing re-usable charms in the reactive framework"""
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import subprocess
|
||||
import os
|
||||
from contextlib import contextmanager
|
||||
from collections import OrderedDict
|
||||
|
||||
from charmhelpers.contrib.openstack.utils import (
|
||||
configure_installation_source,
|
||||
)
|
||||
|
@ -12,16 +17,13 @@ from charmhelpers.fetch import (
|
|||
apt_update,
|
||||
filter_installed_packages,
|
||||
)
|
||||
|
||||
from charm.openstack.ip import PUBLIC, INTERNAL, ADMIN, canonical_url
|
||||
from contextlib import contextmanager
|
||||
from collections import OrderedDict
|
||||
from charmhelpers.contrib.openstack.templating import get_loader
|
||||
from charmhelpers.core.templating import render
|
||||
from charmhelpers.core.hookenv import leader_get, leader_set
|
||||
|
||||
from charms.reactive.bus import set_state, remove_state
|
||||
|
||||
from charm.openstack.ip import PUBLIC, INTERNAL, ADMIN, canonical_url
|
||||
|
||||
|
||||
class OpenStackCharm(object):
|
||||
"""
|
||||
|
@ -114,13 +116,13 @@ class OpenStackCharm(object):
|
|||
|
||||
@contextmanager
|
||||
def restart_on_change(self):
|
||||
checksums = {path: path_hash(path) for path in self.restart_map}
|
||||
checksums = {path: path_hash(path) for path in self.restart_map.keys()}
|
||||
yield
|
||||
restarts = []
|
||||
for path in self.restart_map:
|
||||
if path_hash(path) != checksums[path]:
|
||||
restarts += self.restart_map[path]
|
||||
services_list = list(OrderedDict.fromkeys(restarts))
|
||||
services_list = list(OrderedDict.fromkeys(restarts).keys())
|
||||
for service_name in services_list:
|
||||
service_restart(service_name)
|
||||
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
flake8>=2.2.4,<=2.4.1
|
||||
os-testr>=0.4.1
|
||||
charm-tools
|
||||
paramiko<2.0
|
||||
charm-tools>=2.0.0
|
||||
charms.reactive
|
||||
mock>=1.2
|
||||
coverage>=3.6
|
||||
|
|
2
tox.ini
2
tox.ini
|
@ -16,7 +16,7 @@ deps = -r{toxinidir}/test-requirements.txt
|
|||
[testenv:pep8]
|
||||
basepython = python2.7
|
||||
deps = -r{toxinidir}/test-requirements.txt
|
||||
commands = flake8 {posargs} lib
|
||||
commands = flake8 {posargs} lib unit_tests
|
||||
|
||||
[testenv:venv]
|
||||
commands = {posargs}
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
import sys
|
||||
import mock
|
||||
|
||||
|
||||
sys.path.append('./lib')
|
||||
# mock out some charmhelpers libraries as they have apt install side effects
|
||||
sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
|
||||
sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
|
|
@ -0,0 +1,198 @@
|
|||
# Note that the unit_tests/__init__.py has the following lines to stop
|
||||
# side effects from the imorts from charm helpers.
|
||||
|
||||
# sys.path.append('./lib')
|
||||
# mock out some charmhelpers libraries as they have apt install side effects
|
||||
# sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
|
||||
# sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
|
||||
|
||||
import unittest
|
||||
import mock
|
||||
|
||||
import charm.openstack.adapters as adapters
|
||||
|
||||
|
||||
class MyRelation(object):
|
||||
|
||||
auto_accessors = ['this', 'that']
|
||||
relation_name = 'my-name'
|
||||
|
||||
def this(self):
|
||||
return 'this'
|
||||
|
||||
def that(self):
|
||||
return 'that'
|
||||
|
||||
def some(self):
|
||||
return 'thing'
|
||||
|
||||
|
||||
class TestOpenStackRelationAdapter(unittest.TestCase):
|
||||
|
||||
def test_class(self):
|
||||
ad = adapters.OpenStackRelationAdapter(MyRelation(), ['some'])
|
||||
self.assertEqual(ad.this, 'this')
|
||||
self.assertEqual(ad.that, 'that')
|
||||
self.assertEqual(ad.some, 'thing')
|
||||
self.assertEqual(ad.relation_name, 'my-name')
|
||||
with self.assertRaises(AttributeError):
|
||||
ad.relation_name = 'hello'
|
||||
|
||||
|
||||
class FakeRabbitMQRelation():
|
||||
|
||||
auto_accessors = ['vip', 'private_address']
|
||||
relation_name = 'amqp'
|
||||
|
||||
def __init__(self, vip=None):
|
||||
self._vip = vip
|
||||
|
||||
def vip(self):
|
||||
return self._vip
|
||||
|
||||
def private_address(self):
|
||||
return 'private-address'
|
||||
|
||||
def rabbitmq_hosts(self):
|
||||
return ['host1', 'host2']
|
||||
|
||||
def vhost(self):
|
||||
return 'vhost'
|
||||
|
||||
def username(self):
|
||||
return 'fakename'
|
||||
|
||||
|
||||
class TestRabbitMQRelationAdapter(unittest.TestCase):
|
||||
|
||||
def test_class(self):
|
||||
fake = FakeRabbitMQRelation(None)
|
||||
mq = adapters.RabbitMQRelationAdapter(fake)
|
||||
self.assertEqual(mq.vhost, 'vhost')
|
||||
self.assertEqual(mq.username, 'fakename')
|
||||
self.assertEqual(mq.host, 'private-address')
|
||||
# TODO: can't do the following 2 lines as not dynamic accessors
|
||||
# fake._vip = 'vip1'
|
||||
# self.assertEqual(mq.host, 'vip1')
|
||||
self.assertEqual(mq.hosts, 'host1,host2')
|
||||
|
||||
|
||||
class FakeDatabaseRelation():
|
||||
|
||||
auto_accessors = []
|
||||
relation_name = 'shared_db'
|
||||
|
||||
def db_host(self):
|
||||
return 'host1'
|
||||
|
||||
def username(self, prefix=''):
|
||||
return 'username1{}'.format(prefix)
|
||||
|
||||
def password(self, prefix=''):
|
||||
return 'password1{}'.format(prefix)
|
||||
|
||||
def database(self, prefix=''):
|
||||
return 'database1{}'.format(prefix)
|
||||
|
||||
|
||||
class SSLDatabaseRelationAdapter(adapters.DatabaseRelationAdapter):
|
||||
|
||||
ssl_ca = 'my-ca'
|
||||
ssl_cert = 'my-cert'
|
||||
ssl_key = 'my-key'
|
||||
|
||||
|
||||
class TestDatabaseRelationAdapter(unittest.TestCase):
|
||||
|
||||
def test_class(self):
|
||||
fake = FakeDatabaseRelation()
|
||||
db = adapters.DatabaseRelationAdapter(fake)
|
||||
self.assertEqual(db.host, 'host1')
|
||||
self.assertEqual(db.type, 'mysql')
|
||||
self.assertEqual(db.password, 'password1')
|
||||
self.assertEqual(db.username, 'username1')
|
||||
self.assertEqual(db.database, 'database1')
|
||||
self.assertEqual(db.uri, 'mysql://username1:password1@host1/database1')
|
||||
self.assertEqual(db.get_uri('x'),
|
||||
'mysql://username1x:password1x@host1/database1x')
|
||||
# test the ssl feature of the base class
|
||||
db = SSLDatabaseRelationAdapter(fake)
|
||||
self.assertEqual(db.uri,
|
||||
'mysql://username1:password1@host1/database1'
|
||||
'?ssl_ca=my-ca'
|
||||
'&ssl_cert=my-cert&ssl_key=my-key')
|
||||
|
||||
|
||||
class TestConfigurationAdapter(unittest.TestCase):
|
||||
|
||||
def test_class(self):
|
||||
test_config = {
|
||||
'one': 1,
|
||||
'two': 2,
|
||||
'three': 3,
|
||||
'that-one': 4
|
||||
}
|
||||
with mock.patch.object(adapters.hookenv, 'config',
|
||||
new=lambda: test_config):
|
||||
c = adapters.ConfigurationAdapter()
|
||||
self.assertEqual(c.one, 1)
|
||||
self.assertEqual(c.three, 3)
|
||||
self.assertEqual(c.that_one, 4)
|
||||
|
||||
|
||||
class TestOpenStackRelationAdapters(unittest.TestCase):
|
||||
# test the OpenStackRelationAdapters() class, and then derive from it to
|
||||
# test the additonal relation_adapters member on __init__
|
||||
|
||||
def test_class(self):
|
||||
test_config = {
|
||||
'one': 1,
|
||||
'two': 2,
|
||||
'three': 3,
|
||||
'that-one': 4
|
||||
}
|
||||
with mock.patch.object(adapters.hookenv, 'config',
|
||||
new=lambda: test_config):
|
||||
amqp = FakeRabbitMQRelation()
|
||||
shared_db = FakeDatabaseRelation()
|
||||
mine = MyRelation()
|
||||
a = adapters.OpenStackRelationAdapters([amqp, shared_db, mine])
|
||||
self.assertEqual(a.amqp.private_address, 'private-address')
|
||||
self.assertEqual(a.my_name.this, 'this')
|
||||
items = list(a)
|
||||
self.assertEqual(items[0][0], 'amqp')
|
||||
self.assertEqual(items[1][0], 'shared_db')
|
||||
self.assertEqual(items[2][0], 'my_name')
|
||||
self.assertEqual(items[3][0], 'options')
|
||||
|
||||
|
||||
class MyRelationAdapter(adapters.OpenStackRelationAdapter):
|
||||
|
||||
@property
|
||||
def us(self):
|
||||
return self.this + '-us'
|
||||
|
||||
|
||||
class MyOpenStackRelationAdapters(adapters.OpenStackRelationAdapters):
|
||||
|
||||
relation_adapters = {
|
||||
'my_name': MyRelationAdapter,
|
||||
}
|
||||
|
||||
|
||||
class TestCustomOpenStackRelationAdapters(unittest.TestCase):
|
||||
|
||||
def test_class(self):
|
||||
test_config = {
|
||||
'one': 1,
|
||||
'two': 2,
|
||||
'three': 3,
|
||||
'that-one': 4
|
||||
}
|
||||
with mock.patch.object(adapters.hookenv, 'config',
|
||||
new=lambda: test_config):
|
||||
amqp = FakeRabbitMQRelation()
|
||||
shared_db = FakeDatabaseRelation()
|
||||
mine = MyRelation()
|
||||
a = MyOpenStackRelationAdapters([amqp, shared_db, mine])
|
||||
self.assertEqual(a.my_name.us, 'this-us')
|
|
@ -0,0 +1,285 @@
|
|||
# Note that the unit_tests/__init__.py has the following lines to stop
|
||||
# side effects from the imorts from charm helpers.
|
||||
|
||||
# sys.path.append('./lib')
|
||||
# mock out some charmhelpers libraries as they have apt install side effects
|
||||
# sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
|
||||
# sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
|
||||
|
||||
import mock
|
||||
|
||||
import utils
|
||||
|
||||
import charm.openstack.charm as chm
|
||||
|
||||
TEST_CONFIG = {'config': True}
|
||||
|
||||
|
||||
class BaseOpenStackCharmTest(utils.BaseTestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.patched_config = mock.patch.object(chm, 'config')
|
||||
cls.patched_config_started = cls.patched_config.start()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.patched_config.stop()
|
||||
cls.patched_config_started = None
|
||||
cls.patched_config = None
|
||||
|
||||
def setUp(self, target_cls, test_config):
|
||||
super(BaseOpenStackCharmTest, self).setUp()
|
||||
# set up the return value on the mock before instantiating the class to
|
||||
# get the config into the class.config.
|
||||
chm.config.return_value = test_config
|
||||
self.target = target_cls()
|
||||
|
||||
def tearDown(self):
|
||||
self.target = None
|
||||
super(BaseOpenStackCharmTest, self).tearDown()
|
||||
|
||||
def patch_target(self, attr, return_value=None, name=None, new=None):
|
||||
# uses BaseTestCase.patch_object() to patch targer.
|
||||
self.patch_object(self.target, attr, return_value, name, new)
|
||||
|
||||
|
||||
class TestOpenStackCharm(BaseOpenStackCharmTest):
|
||||
# Note that this only tests the OpenStackCharm() class, which has not very
|
||||
# useful defaults for testing. In order to test all the code without too
|
||||
# many mocks, a separate test dervied charm class is used below.
|
||||
|
||||
def setUp(self):
|
||||
super(TestOpenStackCharm, self).setUp(chm.OpenStackCharm, TEST_CONFIG)
|
||||
|
||||
def test__init__(self):
|
||||
# Note cls.setUpClass() creates an OpenStackCharm() instance
|
||||
self.assertEqual(chm.config(), TEST_CONFIG)
|
||||
self.assertEqual(self.target.config, TEST_CONFIG)
|
||||
self.assertEqual(self.target.release, 'liberty')
|
||||
|
||||
def test_install(self):
|
||||
# only tests that the default set_state is called
|
||||
self.patch_target('set_state')
|
||||
self.patch_object(chm, 'filter_installed_packages',
|
||||
name='fip',
|
||||
return_value=None)
|
||||
self.target.install()
|
||||
self.target.set_state.assert_called_once_with('charmname-installed')
|
||||
self.fip.assert_called_once_with([])
|
||||
|
||||
def test_set_state(self):
|
||||
# tests that OpenStackCharm.set_state() calls set_state() global
|
||||
self.patch_object(chm, 'set_state')
|
||||
self.target.set_state('hello')
|
||||
self.set_state.assert_called_once_with('hello', None)
|
||||
self.set_state.reset_mock()
|
||||
self.target.set_state('hello', 'there')
|
||||
self.set_state.assert_called_once_with('hello', 'there')
|
||||
|
||||
def test_remove_state(self):
|
||||
# tests that OpenStackCharm.remove_state() calls remove_state() global
|
||||
self.patch_object(chm, 'remove_state')
|
||||
self.target.remove_state('hello')
|
||||
self.remove_state.assert_called_once_with('hello')
|
||||
|
||||
def test_configure_source(self):
|
||||
self.patch_object(chm, 'configure_installation_source', name='cis')
|
||||
self.patch_object(chm, 'apt_update')
|
||||
self.patch_target('config', new={'openstack-origin': 'an-origin'})
|
||||
self.target.configure_source()
|
||||
self.cis.assert_called_once_with('an-origin')
|
||||
self.apt_update.assert_called_once_with(fatal=True)
|
||||
|
||||
def test_region(self):
|
||||
self.patch_target('config', new={'region': 'a-region'})
|
||||
self.assertEqual(self.target.region, 'a-region')
|
||||
|
||||
def test_restart_on_change(self):
|
||||
from collections import OrderedDict
|
||||
hashs = OrderedDict([
|
||||
('path1', 100),
|
||||
('path2', 200),
|
||||
('path3', 300),
|
||||
('path4', 400),
|
||||
])
|
||||
self.target.restart_map = {
|
||||
'path1': ['s1'],
|
||||
'path2': ['s2'],
|
||||
'path3': ['s3'],
|
||||
'path4': ['s2', 's4'],
|
||||
}
|
||||
self.patch_object(chm, 'path_hash')
|
||||
self.path_hash.side_effect = lambda x: hashs[x]
|
||||
self.patch_object(chm, 'service_restart')
|
||||
# slightly awkard, in that we need to test a context manager
|
||||
with self.target.restart_on_change():
|
||||
# test with no restarts
|
||||
pass
|
||||
self.assertEqual(self.service_restart.call_count, 0)
|
||||
|
||||
with self.target.restart_on_change():
|
||||
# test with path1 and path3 restarts
|
||||
for k in ['path1', 'path3']:
|
||||
hashs[k] += 1
|
||||
self.assertEqual(self.service_restart.call_count, 2)
|
||||
self.service_restart.assert_any_call('s1')
|
||||
self.service_restart.assert_any_call('s3')
|
||||
|
||||
# test with path2 and path4 and that s2 only gets restarted once
|
||||
self.service_restart.reset_mock()
|
||||
with self.target.restart_on_change():
|
||||
for k in ['path2', 'path4']:
|
||||
hashs[k] += 1
|
||||
self.assertEqual(self.service_restart.call_count, 2)
|
||||
calls = [mock.call('s2'), mock.call('s4')]
|
||||
self.service_restart.assert_has_calls(calls)
|
||||
|
||||
def test_restart_all(self):
|
||||
self.patch_object(chm, 'service_restart')
|
||||
self.patch_target('services', new=['s1', 's2'])
|
||||
self.target.restart_all()
|
||||
self.assertEqual(self.service_restart.call_args_list,
|
||||
[mock.call('s1'), mock.call('s2')])
|
||||
|
||||
def test_db_sync(self):
|
||||
self.patch_object(chm, 'leader_get')
|
||||
self.patch_object(chm, 'leader_set')
|
||||
self.patch_object(chm, 'subprocess', name='subprocess')
|
||||
self.patch_target('restart_all')
|
||||
# first check with leader_get returning True
|
||||
self.leader_get.return_value = True
|
||||
self.target.db_sync()
|
||||
self.leader_get.assert_called_once_with(attribute='db-sync-done')
|
||||
self.subprocess.check_call.assert_not_called()
|
||||
self.leader_set.assert_not_called()
|
||||
self.restart_all.assert_not_called()
|
||||
# Now check with leader_get returning False
|
||||
self.leader_get.reset_mock()
|
||||
self.leader_get.return_value = False
|
||||
self.target.sync_cmd = ['a', 'cmd']
|
||||
self.target.db_sync()
|
||||
self.leader_get.assert_called_once_with(attribute='db-sync-done')
|
||||
self.subprocess.check_call.assert_called_once_with(['a', 'cmd'])
|
||||
self.leader_set.assert_called_once_with({'db-sync-done': True})
|
||||
self.restart_all.assert_called_once_with()
|
||||
|
||||
|
||||
class MyAdapter(object):
|
||||
|
||||
def __init__(self, interfaces):
|
||||
self.interfaces = interfaces
|
||||
|
||||
|
||||
class MyOpenStackCharm(chm.OpenStackCharm):
|
||||
|
||||
name = 'my-charm'
|
||||
packages = ['p1', 'p2', 'p3', 'package-to-filter']
|
||||
api_ports = {
|
||||
'service1': {
|
||||
chm.PUBLIC: 1,
|
||||
chm.INTERNAL: 2,
|
||||
},
|
||||
'service2': {
|
||||
chm.PUBLIC: 3,
|
||||
},
|
||||
'my-default-service': {
|
||||
chm.PUBLIC: 1234,
|
||||
chm.ADMIN: 2468,
|
||||
chm.INTERNAL: 3579,
|
||||
},
|
||||
}
|
||||
service_type = 'my-service-type'
|
||||
default_service = 'my-default-service'
|
||||
restart_map = {
|
||||
'path1': ['s1'],
|
||||
'path2': ['s2'],
|
||||
'path3': ['s3'],
|
||||
'path4': ['s2', 's4'],
|
||||
}
|
||||
sync_cmd = ['my-sync-cmd', 'param1']
|
||||
services = ['my-default-service', 'my-second-service']
|
||||
adapters_class = MyAdapter
|
||||
|
||||
|
||||
class TestMyOpenStackCharm(BaseOpenStackCharmTest):
|
||||
|
||||
def setUp(self):
|
||||
def make_open_stack_charm():
|
||||
return MyOpenStackCharm(['interface1', 'interface2'])
|
||||
|
||||
super(TestMyOpenStackCharm, self).setUp(make_open_stack_charm,
|
||||
TEST_CONFIG)
|
||||
|
||||
def test_install(self):
|
||||
# tests that the packages are filtered before installation
|
||||
self.patch_target('set_state')
|
||||
self.patch_object(chm, 'filter_installed_packages',
|
||||
return_value=None,
|
||||
name='fip')
|
||||
self.fip.side_effect = lambda x: ['p1', 'p2']
|
||||
self.patch_object(chm, 'status_set')
|
||||
self.patch_object(chm, 'apt_install')
|
||||
self.target.install()
|
||||
self.target.set_state.assert_called_once_with('my-charm-installed')
|
||||
self.fip.assert_called_once_with(self.target.packages)
|
||||
self.status_set.assert_called_once_with('maintenance',
|
||||
'Installing packages')
|
||||
|
||||
def test_api_port(self):
|
||||
self.assertEqual(self.target.api_port('service1'), 1)
|
||||
self.assertEqual(self.target.api_port('service1', chm.PUBLIC), 1)
|
||||
self.assertEqual(self.target.api_port('service2'), 3)
|
||||
with self.assertRaises(KeyError):
|
||||
self.target.api_port('service3')
|
||||
with self.assertRaises(KeyError):
|
||||
self.target.api_port('service2', chm.INTERNAL)
|
||||
|
||||
def test_public_url(self):
|
||||
self.patch_object(chm, 'canonical_url', return_value='my-ip-address')
|
||||
self.assertEqual(self.target.public_url, 'my-ip-address:1234')
|
||||
self.canonical_url.assert_called_once_with(chm.PUBLIC)
|
||||
|
||||
def test_admin_url(self):
|
||||
self.patch_object(chm, 'canonical_url', return_value='my-ip-address')
|
||||
self.assertEqual(self.target.admin_url, 'my-ip-address:2468')
|
||||
self.canonical_url.assert_called_once_with(chm.ADMIN)
|
||||
|
||||
def test_internal_url(self):
|
||||
self.patch_object(chm, 'canonical_url', return_value='my-ip-address')
|
||||
self.assertEqual(self.target.internal_url, 'my-ip-address:3579')
|
||||
self.canonical_url.assert_called_once_with(chm.INTERNAL)
|
||||
|
||||
def test_render_all_configs(self):
|
||||
self.patch_target('render_configs')
|
||||
self.target.render_all_configs()
|
||||
self.assertEqual(self.render_configs.call_count, 1)
|
||||
args = self.render_configs.call_args_list[0][0][0]
|
||||
self.assertEqual(['path1', 'path2', 'path3', 'path4'],
|
||||
sorted(args))
|
||||
|
||||
def test_render_configs(self):
|
||||
# give us a way to check that the context manager was called.
|
||||
from contextlib import contextmanager
|
||||
d = [0]
|
||||
|
||||
@contextmanager
|
||||
def fake_restart_on_change():
|
||||
d[0] += 1
|
||||
yield
|
||||
|
||||
self.patch_target('restart_on_change', new=fake_restart_on_change)
|
||||
self.patch_object(chm, 'render')
|
||||
self.patch_object(chm, 'get_loader', return_value='my-loader')
|
||||
# self.patch_target('adapter_instance', new='my-adapter')
|
||||
self.target.render_configs(['path1'])
|
||||
self.assertEqual(d[0], 1)
|
||||
self.render.assert_called_once_with(
|
||||
source='path1',
|
||||
template_loader='my-loader',
|
||||
target='path1',
|
||||
context=mock.ANY)
|
||||
# assert the context was an MyAdapter instance.
|
||||
context = self.render.call_args_list[0][1]['context']
|
||||
assert isinstance(context, MyAdapter)
|
||||
self.assertEqual(context.interfaces, ['interface1', 'interface2'])
|
|
@ -0,0 +1,108 @@
|
|||
# Note that the unit_tests/__init__.py has the following lines to stop
|
||||
# side effects from the imorts from charm helpers.
|
||||
|
||||
# sys.path.append('./lib')
|
||||
# mock out some charmhelpers libraries as they have apt install side effects
|
||||
# sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
|
||||
# sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
|
||||
|
||||
import utils
|
||||
|
||||
import charm.openstack.ip as ip
|
||||
|
||||
|
||||
class TestCharmOpenStackIp(utils.BaseTestCase):
|
||||
|
||||
def test_canonical_url(self):
|
||||
self.patch_object(ip, 'resolve_address', return_value='address1')
|
||||
self.patch_object(ip, 'is_ipv6', return_value=False)
|
||||
# not ipv6
|
||||
url = ip.canonical_url()
|
||||
self.assertEqual(url, 'http://address1')
|
||||
self.resolve_address.assert_called_once_with(ip.PUBLIC)
|
||||
# is ipv6
|
||||
self.is_ipv6.return_value = True
|
||||
self.resolve_address.reset_mock()
|
||||
url = ip.canonical_url()
|
||||
self.assertEqual(url, 'http://[address1]')
|
||||
self.resolve_address.assert_called_once_with(ip.PUBLIC)
|
||||
# test we check for enpoint type
|
||||
self.is_ipv6.return_value = False
|
||||
self.resolve_address.reset_mock()
|
||||
url = ip.canonical_url(ip.INTERNAL)
|
||||
self.resolve_address.assert_called_once_with(ip.INTERNAL)
|
||||
|
||||
def test_resolve_address(self):
|
||||
self.patch_object(ip, 'is_clustered')
|
||||
self.patch_object(ip, 'config')
|
||||
self.patch_object(ip, 'is_address_in_network')
|
||||
self.patch_object(ip, 'get_ipv6_addr')
|
||||
self.patch_object(ip, 'unit_get')
|
||||
self.patch_object(ip, 'get_address_in_network')
|
||||
|
||||
# define a fake_config() that returns predictable results and remembers
|
||||
# what it was called with.
|
||||
calls_list = []
|
||||
_config = {
|
||||
'vip': 'vip-address',
|
||||
'prefer-ipv6': False,
|
||||
'os-public-network': 'the-public-network',
|
||||
'os-internal-network': 'the-internal-network',
|
||||
'os-admin-network': 'the-admin-network',
|
||||
}
|
||||
|
||||
def fake_config(*args):
|
||||
calls_list.append(args)
|
||||
return _config[args[0]]
|
||||
|
||||
self.config.side_effect = fake_config
|
||||
|
||||
# first test, if not clustered, that the function uses unit_get() and
|
||||
# get_address_in_network to get a real address.
|
||||
# for the default PUBLIC endpoint
|
||||
self.is_clustered.return_value = False
|
||||
self.get_address_in_network.return_value = 'got-address'
|
||||
self.unit_get.return_value = 'unit-get-address'
|
||||
addr = ip.resolve_address()
|
||||
self.assertEqual(addr, 'got-address')
|
||||
self.assertEqual(calls_list,
|
||||
[('prefer-ipv6',), ('os-public-network',)])
|
||||
self.unit_get.assert_called_once_with('public-address')
|
||||
self.get_address_in_network.assert_called_once_with(
|
||||
'the-public-network', 'unit-get-address')
|
||||
|
||||
# second test: not clusted, prefer-ipv6 is True
|
||||
_config['prefer-ipv6'] = True
|
||||
calls_list = []
|
||||
self.get_ipv6_addr.return_value = ['ipv6-addr']
|
||||
self.get_address_in_network.reset_mock()
|
||||
addr = ip.resolve_address()
|
||||
self.get_ipv6_addr.assert_called_once_with(exc_list=['vip-address'])
|
||||
self.get_address_in_network.assert_called_once_with(
|
||||
'the-public-network', 'ipv6-addr')
|
||||
|
||||
# Third test: clustered, and config(...) returns None
|
||||
self.is_clustered.return_value = True
|
||||
_config['os-public-network'] = None
|
||||
calls_list = []
|
||||
addr = ip.resolve_address()
|
||||
self.assertEqual(calls_list, [('os-public-network',), ('vip',)])
|
||||
|
||||
# Fourth test: clustered, and config(...) returns not None
|
||||
_config['os-public-network'] = 'the-public-network'
|
||||
calls_list = []
|
||||
_config['vip'] = 'vip1 vip2'
|
||||
self.is_address_in_network.return_value = (False, True)
|
||||
addr = ip.resolve_address()
|
||||
self.assertEqual(calls_list, [
|
||||
('os-public-network',),
|
||||
('vip',),
|
||||
('os-public-network',),
|
||||
('os-public-network',)])
|
||||
self.assertEqual(addr, 'vip2')
|
||||
|
||||
# Finally resolved_address returns None -> ValueError()
|
||||
# allow vip to not be found:
|
||||
self.is_address_in_network.return_value = False
|
||||
with self.assertRaises(ValueError):
|
||||
addr = ip.resolve_address()
|
|
@ -0,0 +1,48 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Note that the unit_tests/__init__.py also mocks out two charmhelpers imports
|
||||
# that have side effects that try to apt install modules:
|
||||
# sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
|
||||
# sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
|
||||
|
||||
|
||||
import unittest
|
||||
import mock
|
||||
|
||||
|
||||
class BaseTestCase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self._patches = {}
|
||||
self._patches_start = {}
|
||||
|
||||
def tearDown(self):
|
||||
for k, v in self._patches.items():
|
||||
v.stop()
|
||||
setattr(self, k, None)
|
||||
self._patches = None
|
||||
self._patches_start = None
|
||||
|
||||
def patch_object(self, obj, attr, return_value=None, name=None, new=None):
|
||||
if name is None:
|
||||
name = attr
|
||||
if new is not None:
|
||||
mocked = mock.patch.object(obj, attr, new=new)
|
||||
else:
|
||||
mocked = mock.patch.object(obj, attr)
|
||||
self._patches[name] = mocked
|
||||
started = mocked.start()
|
||||
if new is None:
|
||||
started.return_value = return_value
|
||||
self._patches_start[name] = started
|
||||
setattr(self, name, started)
|
Loading…
Reference in New Issue