Add unit tests

This commit is contained in:
Liam Young 2016-07-08 12:35:52 +01:00
parent 0995e280f4
commit 51b9f62307
18 changed files with 784 additions and 18 deletions

2
.gitignore vendored
View File

@ -3,3 +3,5 @@ tmp
build
layers
interfaces
__pycache__
.testrepository

8
.testr.conf Normal file
View File

@ -0,0 +1,8 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
${PYTHON:-python} -m subunit.run discover -t ./ ./unit_tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

View File

@ -1,3 +1,5 @@
# Requirements to build the charm
charm-tools
flake8
ruamel.yaml==0.10.12
simplejson
flake8

View File

@ -19,7 +19,7 @@ LEADERDB_SYNC_SRC_KEY = 'sync_src'
LEADERDB_SYNC_TIME_KEY = 'sync_time'
CLUSTER_SYNC_KEY = 'sync_request'
WWW_DIR = '/var/www/html'
ZONE_DIR = '/var/cache/bind/'
ZONE_DIR = '/var/cache/bind'
def install():
@ -311,6 +311,18 @@ class DesignateBindCharm(openstack_charm.OpenStackCharm):
cmd.extend(zone_files)
subprocess.check_call(cmd, cwd=ZONE_DIR)
def setup_sync_dir(self, sync_time):
sync_dir = '{}/zone-syncs'.format(WWW_DIR, sync_time)
try:
os.mkdir(sync_dir, 0o755)
except FileExistsError:
os.chmod(sync_dir, 0o755)
def create_sync_src_info_file(self):
unit_name = hookenv.local_unit().replace('/', '_')
touch_file = '{}/juju-zone-src-{}'.format(ZONE_DIR, unit_name)
open(touch_file, 'w+').close()
def setup_sync(self):
"""Setup a sync target
@ -321,14 +333,8 @@ class DesignateBindCharm(openstack_charm.OpenStackCharm):
"""
hookenv.log('Setting up zone info for collection', level=hookenv.DEBUG)
sync_time = str(time.time())
sync_dir = '{}/zone-syncs'.format(WWW_DIR, sync_time)
try:
os.mkdir(sync_dir, 0o755)
except FileExistsError:
os.chmod(sync_dir, 0o755)
unit_name = hookenv.local_unit().replace('/', '_')
touch_file = '{}/juju-zone-src-{}'.format(ZONE_DIR, unit_name)
open(touch_file, 'w+').close()
sync_dir = self.setup_sync_dir(sync_time)
self.create_sync_src_info_file()
# FIXME Try freezing DNS rather than stopping bind
self.service_control('stop', ['bind9'])
tar_file = '{}/{}.tar.gz'.format(sync_dir, sync_time)
@ -348,7 +354,7 @@ class DesignateBindCharm(openstack_charm.OpenStackCharm):
'start': host.service_start,
'restart': host.service_restart,
}
for service in self.services:
for service in services:
cmds[cmd](service)
def request_sync(self, hacluster):
@ -373,6 +379,7 @@ class DesignateBindCharm(openstack_charm.OpenStackCharm):
:param target_dir: Place file in this directory
:returns: None
"""
print("{} {}".format(url, target_dir))
cmd = ['wget', url, '--retry-connrefused', '-t', '10']
subprocess.check_call(cmd, cwd=target_dir)
@ -388,6 +395,7 @@ class DesignateBindCharm(openstack_charm.OpenStackCharm):
:returns: None
"""
request_time = None
if cluster_relation:
request_time = cluster_relation.retrieve_local(CLUSTER_SYNC_KEY)
sync_time = DesignateBindCharm.get_sync_time()

7
test-requirements.txt Normal file
View File

@ -0,0 +1,7 @@
# Unit test requirements
flake8>=2.2.4,<=2.4.1
os-testr>=0.4.1
charms.reactive
mock>=1.2
coverage>=3.6
git+https://github.com/gnuoy/charms.openstack.git@bug/general#egg=charms.openstack

29
tox.ini
View File

@ -1,8 +1,10 @@
[tox]
skipsdist = True
envlist = generate
envlist = pep8,py34,py35
skip_missing_interpreters = True
[testenv]
basepython = python2.7
setenv = VIRTUAL_ENV={envdir}
PYTHONHASHSEED=0
TERM=linux
@ -13,16 +15,29 @@ passenv = http_proxy https_proxy
install_command =
pip install {opts} {packages}
deps =
-r{toxinidir}/requirements.txt
-r{toxinidir}/requirements.txt
[testenv:generate]
basepython = python2.7
[testenv:build]
commands =
charm build --log-level DEBUG -o {toxinidir}/build charm
charm-build --log-level DEBUG -o {toxinidir}/build src
[testenv:venv]
commands = {posargs}
[testenv:lint]
[testenv:pep8]
commands = flake8 {posargs} src/reactive src/lib unit_tests
[testenv:py27]
basepython = python2.7
commands = flake8 {posargs} charm/reactive
deps = -r{toxinidir}/test-requirements.txt
commands = ostestr {posargs}
[testenv:py34]
basepython = python3.4
deps = -r{toxinidir}/test-requirements.txt
commands = ostestr {posargs}
[testenv:py35]
basepython = python3.5
deps = -r{toxinidir}/test-requirements.txt
commands = ostestr {posargs}

56
unit_tests/__init__.py Normal file
View File

@ -0,0 +1,56 @@
# Copyright 2016 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import mock
sys.path.append('src')
sys.path.append('src/lib')
# Mock out charmhelpers so that we can test without it.
# also stops sideeffects from occuring.
charmhelpers = mock.MagicMock()
sys.modules['charmhelpers'] = charmhelpers
sys.modules['charmhelpers.core'] = charmhelpers.core
sys.modules['charmhelpers.core.hookenv'] = charmhelpers.core.hookenv
sys.modules['charmhelpers.core.decorators'] = charmhelpers.core.decorators
sys.modules['charmhelpers.core.host'] = charmhelpers.core.host
sys.modules['charmhelpers.core.unitdata'] = charmhelpers.core.unitdata
sys.modules['charmhelpers.core.templating'] = charmhelpers.core.templating
sys.modules['charmhelpers.contrib'] = charmhelpers.contrib
sys.modules['charmhelpers.contrib.openstack'] = charmhelpers.contrib.openstack
sys.modules['charmhelpers.contrib.openstack.utils'] = (
charmhelpers.contrib.openstack.utils)
sys.modules['charmhelpers.contrib.openstack.templating'] = (
charmhelpers.contrib.openstack.templating)
sys.modules['charmhelpers.contrib.network'] = charmhelpers.contrib.network
sys.modules['charmhelpers.contrib.network.ip'] = (
charmhelpers.contrib.network.ip)
sys.modules['charmhelpers.fetch'] = charmhelpers.fetch
sys.modules['charmhelpers.cli'] = charmhelpers.cli
sys.modules['charmhelpers.contrib.hahelpers'] = charmhelpers.contrib.hahelpers
sys.modules['charmhelpers.contrib.hahelpers.cluster'] = (
charmhelpers.contrib.hahelpers.cluster)
def _fake_retry(num_retries, base_delay=0, exc_type=Exception):
def _retry_on_exception_inner_1(f):
def _retry_on_exception_inner_2(*args, **kwargs):
return f(*args, **kwargs)
return _retry_on_exception_inner_2
return _retry_on_exception_inner_1
mock.patch(
'charmhelpers.core.decorators.retry_on_exception',
_fake_retry).start()

BIN
unit_tests/__init__.pyc Normal file

Binary file not shown.

View File

@ -0,0 +1,225 @@
from __future__ import absolute_import
from __future__ import print_function
import unittest
import mock
import reactive.designate_bind_handlers as handlers
_when_args = {}
_when_not_args = {}
def mock_hook_factory(d):
def mock_hook(*args, **kwargs):
def inner(f):
# remember what we were passed. Note that we can't actually
# determine the class we're attached to, as the decorator only gets
# the function.
try:
d[f.__name__].append(dict(args=args, kwargs=kwargs))
except KeyError:
d[f.__name__] = [dict(args=args, kwargs=kwargs)]
return f
return inner
return mock_hook
class TestDesignateHandlers(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls._patched_when = mock.patch('charms.reactive.when',
mock_hook_factory(_when_args))
cls._patched_when_started = cls._patched_when.start()
cls._patched_when_not = mock.patch('charms.reactive.when_not',
mock_hook_factory(_when_not_args))
cls._patched_when_not_started = cls._patched_when_not.start()
# force requires to rerun the mock_hook decorator:
# try except is Python2/Python3 compatibility as Python3 has moved
# reload to importlib.
try:
reload(handlers)
except NameError:
import importlib
importlib.reload(handlers)
@classmethod
def tearDownClass(cls):
cls._patched_when.stop()
cls._patched_when_started = None
cls._patched_when = None
cls._patched_when_not.stop()
cls._patched_when_not_started = None
cls._patched_when_not = None
# and fix any breakage we did to the module
try:
reload(handlers)
except NameError:
import importlib
importlib.reload(handlers)
def setUp(self):
self._patches = {}
self._patches_start = {}
def tearDown(self):
for k, v in self._patches.items():
v.stop()
setattr(self, k, None)
self._patches = None
self._patches_start = None
def patch(self, obj, attr, return_value=None):
mocked = mock.patch.object(obj, attr)
self._patches[attr] = mocked
started = mocked.start()
started.return_value = return_value
self._patches_start[attr] = started
setattr(self, attr, started)
def test_registered_hooks(self):
# test that the hooks actually registered the relation expressions that
# are meaningful for this interface: this is to handle regressions.
# The keys are the function names that the hook attaches to.
when_patterns = {
'setup_sync_target_alone': [('installed', )],
'send_info': [
('dns-backend.related', ),
('rndckey.available', ),
],
'config_changed': [
('dns-backend.related', ),
('rndckey.available', ),
],
'update_zones_from_peer': [
('cluster.connected', ),
('sync.request.sent', ),
],
'check_zone_status': [
('cluster.connected', ),
('installed', ),
],
'process_sync_requests': [
('cluster.connected', ),
('zones.initialised', ),
],
}
when_not_patterns = {
'install_packages': [('installed', )],
'setup_secret': [('rndckey.available', )],
'update_zones_from_peer': [('zones.initialised', )],
'setup_sync_target_alone': [
('cluster.connected', ),
('zones.initialised', ),
('sync.request.sent', ),
],
'check_zone_status': [
('zones.initialised', ),
('sync.request.sent', ),
],
}
# check the when hooks are attached to the expected functions
for t, p in [(_when_args, when_patterns),
(_when_not_args, when_not_patterns)]:
for f, args in t.items():
# check that function is in patterns
self.assertTrue(f in p.keys())
# check that the lists are equal
l = [a['args'] for a in args]
self.assertEqual(l, p[f])
def test_install_packages(self):
self.patch(handlers.designate_bind, 'install')
self.patch(handlers.designate_bind, 'set_apparmor')
self.patch(handlers.reactive, 'set_state')
handlers.install_packages()
self.install.assert_called_once_with()
self.set_apparmor.assert_called_once_with()
self.set_state.assert_called_once_with('installed')
def test_setup_secret(self):
self.patch(handlers.designate_bind, 'init_rndckey')
self.patch(handlers.reactive, 'set_state')
self.init_rndckey.return_value = None
handlers.setup_secret()
self.assertFalse(self.set_state.called)
self.init_rndckey.return_value = 'secret'
handlers.setup_secret()
self.set_state.assert_called_with('rndckey.available')
def test_setup_info(self):
dnsclient = mock.MagicMock()
self.patch(handlers.designate_bind, 'get_rndc_secret')
self.patch(handlers.designate_bind, 'get_rndc_algorithm')
self.get_rndc_secret.return_value = 'secret'
self.get_rndc_algorithm.return_value = 'hmac-md5'
handlers.send_info(dnsclient)
dnsclient.send_rndckey_info.assert_called_once_with(
'secret',
'hmac-md5')
def test_config_changed(self):
self.patch(handlers.designate_bind, 'set_apparmor')
self.patch(handlers.designate_bind, 'render_all_configs')
handlers.config_changed('arg1', 'arg2')
self.set_apparmor.assert_called_once_with()
self.render_all_configs.assert_called_once_with(('arg1', 'arg2', ))
def test_setup_sync_target_alone(self):
self.patch(handlers.hookenv, 'is_leader')
self.patch(handlers.designate_bind, 'setup_sync')
self.patch(handlers.reactive, 'set_state')
self.is_leader.return_value = False
handlers.setup_sync_target_alone()
self.assertFalse(self.setup_sync.called)
self.assertFalse(self.set_state.called)
self.is_leader.return_value = True
handlers.setup_sync_target_alone()
self.setup_sync.assert_called_once_with()
self.set_state.assert_called_once_with('zones.initialised')
def test_update_zones_from_peer(self):
self.patch(handlers.designate_bind, 'retrieve_zones')
handlers.update_zones_from_peer('hacluster')
self.retrieve_zones.assert_called_once_with('hacluster')
def test_check_zone_status(self):
self.patch(handlers.hookenv, 'is_leader')
self.patch(handlers.reactive, 'set_state')
self.patch(handlers.designate_bind, 'get_sync_time')
self.patch(handlers.designate_bind, 'retrieve_zones')
self.patch(handlers.designate_bind, 'setup_sync')
self.patch(handlers.designate_bind, 'request_sync')
# Leader test: Retrieve sync
self.is_leader.return_value = True
self.get_sync_time.return_value = 100
handlers.check_zone_status('hacluster')
self.retrieve_zones.assert_called_once_with()
self.retrieve_zones.reset_mock()
# Leader test: Setup sync
self.is_leader.return_value = True
self.get_sync_time.return_value = None
handlers.check_zone_status('hacluster')
self.assertFalse(self.retrieve_zones.called)
self.setup_sync.assert_called_once_with()
self.set_state.assert_called_once_with('zones.initialised')
# Non-Leader test
self.is_leader.return_value = False
handlers.check_zone_status('hacluster')
self.request_sync.assert_called_once_with('hacluster')
def test_process_sync_requests(self):
self.patch(handlers.hookenv, 'is_leader')
self.patch(handlers.designate_bind, 'process_requests')
self.is_leader.return_value = False
handlers.process_sync_requests('hacluster')
self.assertFalse(self.process_requests.called)
self.process_requests.reset_mock()
self.is_leader.return_value = True
handlers.process_sync_requests('hacluster')
self.process_requests.assert_called_once_with('hacluster')

Binary file not shown.

View File

@ -0,0 +1,443 @@
# Copyright 2016 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import print_function
import unittest
import mock
import charm.openstack.designate_bind as designate_bind
def FakeConfig(init_dict):
def _config(key=None):
return init_dict[key] if key else init_dict
return _config
class Helper(unittest.TestCase):
def setUp(self):
self._patches = {}
self._patches_start = {}
self.ch_config_patch = mock.patch('charmhelpers.core.hookenv.config')
self.ch_config = self.ch_config_patch.start()
self.ch_config.side_effect = lambda: {'ssl_param': None}
def tearDown(self):
for k, v in self._patches.items():
v.stop()
setattr(self, k, None)
self._patches = None
self._patches_start = None
self.ch_config_patch.stop()
def patch(self, obj, attr, return_value=None, **kwargs):
mocked = mock.patch.object(obj, attr, **kwargs)
self._patches[attr] = mocked
started = mocked.start()
started.return_value = return_value
self._patches_start[attr] = started
setattr(self, attr, started)
def patch_object(self, obj, attr, return_value=None, name=None, new=None):
if name is None:
name = attr
if new is not None:
mocked = mock.patch.object(obj, attr, new=new)
else:
mocked = mock.patch.object(obj, attr)
self._patches[name] = mocked
started = mocked.start()
if new is None:
started.return_value = return_value
self._patches_start[name] = started
setattr(self, name, started)
class TestOpenStackDesignateBind(Helper):
def test_install(self):
self.patch(designate_bind.DesignateBindCharm.singleton, 'install')
designate_bind.install()
self.install.assert_called_once_with()
def test_init_rndckey(self):
self.patch(designate_bind.DesignateBindCharm.singleton, 'init_rndckey')
designate_bind.init_rndckey()
self.init_rndckey.assert_called_once_with()
def test_get_rndc_secret(self):
self.patch(
designate_bind.DesignateBindCharm.singleton,
'get_rndc_secret')
designate_bind.get_rndc_secret()
self.get_rndc_secret.assert_called_once_with()
def test_get_rndc_algorithm(self):
self.patch(
designate_bind.DesignateBindCharm.singleton,
'get_rndc_algorithm')
designate_bind.get_rndc_algorithm()
self.get_rndc_algorithm.assert_called_once_with()
def test_get_sync_time(self):
self.patch(
designate_bind.DesignateBindCharm.singleton,
'get_sync_time')
designate_bind.get_sync_time()
self.get_sync_time.assert_called_once_with()
def test_setup_sync(self):
self.patch(designate_bind.DesignateBindCharm.singleton, 'setup_sync')
designate_bind.setup_sync()
self.setup_sync.assert_called_once_with()
def test_retrieve_zones(self):
self.patch(
designate_bind.DesignateBindCharm.singleton,
'retrieve_zones')
designate_bind.retrieve_zones('hacluster')
self.retrieve_zones.assert_called_once_with('hacluster')
def test_request_sync(self):
self.patch(
designate_bind.DesignateBindCharm.singleton,
'request_sync')
designate_bind.request_sync('hacluster')
self.request_sync.assert_called_once_with('hacluster')
def test_process_requests(self):
self.patch(
designate_bind.DesignateBindCharm.singleton,
'process_requests')
designate_bind.process_requests('hacluster')
self.process_requests.assert_called_once_with('hacluster')
def test_render_all_configs(self):
self.patch(
designate_bind.DesignateBindCharm.singleton,
'render_with_interfaces')
designate_bind.render_all_configs('interface_list')
self.render_with_interfaces.assert_called_once_with('interface_list')
class TestDNSAdapter(Helper):
def test_control_listen_ip(self):
relation = mock.MagicMock()
self.patch(designate_bind.hookenv, 'unit_private_ip')
self.unit_private_ip.return_value = 'ip1'
a = designate_bind.DNSAdapter(relation)
self.assertEqual(a.control_listen_ip, 'ip1')
def test_control_ips(self):
relation = mock.MagicMock()
relation.client_ips.return_value = ['ip1', 'ip2']
a = designate_bind.DNSAdapter(relation)
self.assertEqual(a.control_ips, 'ip1;ip2')
def test_algorithm(self):
relation = mock.MagicMock()
self.patch(designate_bind.DesignateBindCharm, 'get_rndc_algorithm')
self.get_rndc_algorithm.return_value = 'algo1'
a = designate_bind.DNSAdapter(relation)
self.assertEqual(a.algorithm, 'algo1')
def test_secret(self):
relation = mock.MagicMock()
self.patch(designate_bind.DesignateBindCharm, 'get_rndc_secret')
self.get_rndc_secret.return_value = 'secret1'
a = designate_bind.DNSAdapter(relation)
self.assertEqual(a.secret, 'secret1')
class TestBindAdapters(Helper):
def test_bind_adapters(self):
dns_backend_relation = mock.MagicMock()
dns_backend_relation.relation_name = 'dns_backend'
b = designate_bind.BindAdapters([dns_backend_relation])
# ensure that the relevant things got put on.
self.assertTrue(
isinstance(
b.dns_backend,
designate_bind.adapters.OpenStackRelationAdapter))
class TestDesignateBindCharm(Helper):
def test_get_rndc_algorithm(self):
self.assertEqual(
designate_bind.DesignateBindCharm.get_rndc_algorithm(),
'hmac-md5'
)
def test_get_rndc_secret(self):
self.patch(designate_bind.hookenv, 'leader_get')
self.leader_get.return_value = 'secret1'
self.assertEqual(
designate_bind.DesignateBindCharm.get_rndc_secret(),
'secret1'
)
def test_get_sync_src(self):
self.patch(designate_bind.hookenv, 'leader_get')
self.leader_get.return_value = 'http://ip1/my.tar'
self.assertEqual(
designate_bind.DesignateBindCharm.get_sync_src(),
'http://ip1/my.tar'
)
def test_get_sync_time(self):
self.patch(designate_bind.hookenv, 'leader_get')
self.leader_get.return_value = '100'
self.assertEqual(
designate_bind.DesignateBindCharm.get_sync_time(),
'100'
)
def test_process_requests(self):
hacluster = mock.MagicMock()
self.patch(designate_bind.hookenv, 'log')
self.patch(designate_bind.DesignateBindCharm, 'setup_sync')
self.patch(designate_bind.DesignateBindCharm, 'get_sync_time')
a = designate_bind.DesignateBindCharm()
# No queued requests
hacluster.retrieve_remote.return_value = []
self.get_sync_time.return_value = 20
a.process_requests(hacluster)
self.assertFalse(self.setup_sync.called)
# No request since last sync
self.setup_sync.reset_mock()
hacluster.retrieve_remote.return_value = ['10']
self.get_sync_time.return_value = 20
a.process_requests(hacluster)
self.assertFalse(self.setup_sync.called)
# New request present
self.setup_sync.reset_mock()
hacluster.retrieve_remote.return_value = ['10', '30']
self.get_sync_time.return_value = 20
a.process_requests(hacluster)
self.assertTrue(self.setup_sync.called)
def test_set_sync_info(self):
self.patch(designate_bind.hookenv, 'leader_set')
self.patch(designate_bind.hookenv, 'unit_private_ip')
self.unit_private_ip.return_value = 'ip1'
a = designate_bind.DesignateBindCharm()
a.set_sync_info('20', '/tmp/tarball.tar')
self.leader_set.assert_called_once_with({
'sync_time': '20',
'sync_src': 'http://ip1:80/zone-syncs//tmp/tarball.tar'})
def test_generate_rndc_key(self):
hmac_mock = mock.MagicMock()
self.patch(designate_bind.os, 'urandom', return_value='seed')
self.patch(designate_bind.hmac, 'new', return_value=hmac_mock)
self.patch(designate_bind.base64, 'b64encode', return_value=hmac_mock)
self.patch_object(designate_bind.hashlib, 'md5', new='md5lib')
a = designate_bind.DesignateBindCharm()
a.generate_rndc_key()
self.new.assert_called_once_with(
'seed',
digestmod='md5lib',
msg=b'RNDC Secret')
def test_init_rndckey(self):
self.patch(designate_bind.hookenv, 'log')
self.patch(designate_bind.DesignateBindCharm, 'get_rndc_secret')
self.patch(designate_bind.DesignateBindCharm, 'generate_rndc_key')
self.patch(designate_bind.hookenv, 'leader_set')
self.patch(designate_bind.hookenv, 'is_leader')
a = designate_bind.DesignateBindCharm()
# Test secret already stored
self.get_rndc_secret.return_value = 'mysecret'
self.assertEqual(a.init_rndckey(), 'mysecret')
# Test need new secret (Leader)
self.get_rndc_secret.return_value = None
self.generate_rndc_key.return_value = 'newsecret'
self.is_leader.return_value = True
self.assertEqual(a.init_rndckey(), 'newsecret')
self.leader_set.assert_called_once_with({'rndc_key': 'newsecret'})
# Test need new secret (Not Leader)
self.get_rndc_secret.return_value = None
self.is_leader.return_value = False
self.assertEqual(a.init_rndckey(), None)
def test_create_zone_tarball(self):
self.patch(designate_bind.glob, 'glob')
self.patch(designate_bind.subprocess, 'check_call')
_files = {
'/var/cache/bind/juju*': ['jujufile1'],
'/var/cache/bind/slave*': ['slavefile1'],
'/var/cache/bind/*nzf': ['nsffile']}
self.glob.side_effect = lambda x: _files[x]
a = designate_bind.DesignateBindCharm()
a.create_zone_tarball('/tmp/tarball.tar')
self.check_call.assert_called_once_with([
'tar', 'zcvf', '/tmp/tarball.tar', 'jujufile1', 'slavefile1',
'nsffile'], cwd='/var/cache/bind')
def test_setup_sync_dir(self):
self.patch(designate_bind.os, 'mkdir')
self.patch(designate_bind.os, 'chmod')
a = designate_bind.DesignateBindCharm()
a.setup_sync_dir('100')
self.mkdir.assert_called_once_with('/var/www/html/zone-syncs', 493)
self.assertFalse(self.chmod.called)
# Test dir does not exist
self.mkdir.side_effect = FileExistsError
a.setup_sync_dir('100')
self.chmod.assert_called_once_with('/var/www/html/zone-syncs', 493)
def test_create_sync_src_info_file(self):
self.patch(designate_bind.hookenv, 'local_unit', return_value='unit/1')
a = designate_bind.DesignateBindCharm()
with mock.patch('builtins.open') as bob:
a.create_sync_src_info_file()
bob.assert_called_once_with(
'/var/cache/bind/juju-zone-src-unit_1',
'w+')
def test_setup_sync(self):
self.patch(designate_bind.hookenv, 'log')
self.patch(designate_bind.DesignateBindCharm, 'setup_sync_dir')
self.patch(designate_bind.time, 'time')
self.patch(
designate_bind.DesignateBindCharm,
'create_sync_src_info_file')
self.patch(designate_bind.DesignateBindCharm, 'service_control')
self.patch(designate_bind.DesignateBindCharm, 'create_zone_tarball')
self.patch(designate_bind.DesignateBindCharm, 'set_sync_info')
self.setup_sync_dir.return_value = '/tmp/zonefiles'
self.time.return_value = 100
a = designate_bind.DesignateBindCharm()
a.setup_sync()
self.setup_sync_dir.assert_called_once_with('100')
self.create_sync_src_info_file.assert_called_once_with()
ctrl_calls = [
mock.call('stop', ['bind9']),
mock.call('start', ['bind9'])]
self.service_control.assert_has_calls(ctrl_calls)
self.create_zone_tarball.assert_called_once_with(
'/tmp/zonefiles/100.tar.gz')
self.set_sync_info.assert_called_once_with('100', '100.tar.gz')
def test_service_control(self):
self.patch(designate_bind.host, 'service_stop')
a = designate_bind.DesignateBindCharm()
a.service_control('stop', ['svc1', 'svc2'])
ctrl_calls = [
mock.call('svc1'),
mock.call('svc2')]
self.service_stop.assert_has_calls(ctrl_calls)
def test_request_sync(self):
self.patch(designate_bind.time, 'time')
relation = mock.MagicMock()
self.patch(designate_bind.reactive, 'set_state')
self.time.return_value = 100
a = designate_bind.DesignateBindCharm()
a.request_sync(relation)
relation.send_all.assert_called_once_with(
{'sync_request': '100'},
store_local=True)
self.set_state.assert_called_once_with('sync.request.sent')
def test_wget_file(self):
# retry_on_exception patched out in __init__.py
self.patch(designate_bind.subprocess, 'check_call')
a = designate_bind.DesignateBindCharm()
a.wget_file('http://ip1/tarfile.tar', '/tmp')
self.check_call.assert_called_once_with(
['wget', 'http://ip1/tarfile.tar', '--retry-connrefused', '-t',
'10'],
cwd='/tmp'
)
def test_retrieve_zones_cluster_relation(self):
relation = mock.MagicMock()
self.patch(designate_bind.DesignateBindCharm, 'get_sync_time')
self.patch(designate_bind.DesignateBindCharm, 'get_sync_src')
self.patch(designate_bind.DesignateBindCharm, 'service_control')
self.patch(designate_bind.hookenv, 'log')
self.patch(designate_bind.reactive, 'set_state')
self.patch(designate_bind.reactive, 'remove_state')
self.patch(designate_bind.os, 'remove')
self.patch(designate_bind.subprocess, 'check_call')
self.patch_object(designate_bind.DesignateBindCharm, 'wget_file')
self.get_sync_src.return_value = 'http://ip1/tarfile.tar'
ctrl_calls = [
mock.call('stop', ['bind9']),
mock.call('start', ['bind9'])]
a = designate_bind.DesignateBindCharm()
# Using cluster_relation, no sync needed
relation.retrieve_local.return_value = '30'
self.get_sync_time.return_value = '20'
a.retrieve_zones(relation)
self.assertFalse(self.service_control.called)
# Using cluster_relation, sync needed
self.service_control.reset_mock()
relation.retrieve_local.return_value = '10'
self.get_sync_time.return_value = '20'
a.retrieve_zones(relation)
self.service_control.assert_has_calls(ctrl_calls)
self.check_call.assert_called_once_with(
['tar', 'xf', 'tarfile.tar'], cwd='/var/cache/bind')
self.wget_file.assert_called_once_with(
'http://ip1/tarfile.tar',
'/var/cache/bind')
def test_retrieve_zones_no_cluster_relation(self):
self.patch(designate_bind.DesignateBindCharm, 'get_sync_time')
self.patch(designate_bind.DesignateBindCharm, 'get_sync_src')
self.patch(designate_bind.DesignateBindCharm, 'service_control')
self.patch(designate_bind.hookenv, 'log')
self.patch(designate_bind.reactive, 'set_state')
self.patch(designate_bind.reactive, 'remove_state')
self.patch(designate_bind.os, 'remove')
self.patch(designate_bind.subprocess, 'check_call')
self.patch_object(designate_bind.DesignateBindCharm, 'wget_file')
self.get_sync_src.return_value = 'http://ip1/tarfile.tar'
ctrl_calls = [
mock.call('stop', ['bind9']),
mock.call('start', ['bind9'])]
a = designate_bind.DesignateBindCharm()
self.get_sync_time.return_value = '20'
a.retrieve_zones()
self.service_control.assert_has_calls(ctrl_calls)
self.check_call.assert_called_once_with(
['tar', 'xf', 'tarfile.tar'], cwd='/var/cache/bind')
self.wget_file.assert_called_once_with(
'http://ip1/tarfile.tar',
'/var/cache/bind')
def test_set_apparmor(self):
self.patch(designate_bind.os.path, 'isfile')
a = designate_bind.DesignateBindCharm()
self.isfile.return_value = True
with mock.patch('builtins.open') as bob:
a.set_apparmor()
self.assertFalse(bob.called)
self.isfile.return_value = False
with mock.patch('builtins.open') as bob:
a.set_apparmor()
bob.assert_called_once_with(
'/etc/apparmor.d/disable/usr.sbin.named',
'w')