# Copyright 2016 Canonical Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import unittest from unittest import mock import sys # Modules imported from other interfaces/layers need to be mocked sys.modules[ 'relations.hacluster.interface_hacluster.common' ] = mock.MagicMock() import reactive.designate_bind_handlers as handlers _when_args = {} _when_not_args = {} def mock_hook_factory(d): def mock_hook(*args, **kwargs): def inner(f): # remember what we were passed. Note that we can't actually # determine the class we're attached to, as the decorator only gets # the function. try: d[f.__name__].append(dict(args=args, kwargs=kwargs)) except KeyError: d[f.__name__] = [dict(args=args, kwargs=kwargs)] return f return inner return mock_hook class TestDesignateHandlers(unittest.TestCase): @classmethod def setUpClass(cls): cls._patched_when = mock.patch('charms.reactive.when', mock_hook_factory(_when_args)) cls._patched_when_started = cls._patched_when.start() cls._patched_when_not = mock.patch('charms.reactive.when_not', mock_hook_factory(_when_not_args)) cls._patched_when_not_started = cls._patched_when_not.start() # force requires to rerun the mock_hook decorator: # try except is Python2/Python3 compatibility as Python3 has moved # reload to importlib. try: reload(handlers) except NameError: import importlib importlib.reload(handlers) @classmethod def tearDownClass(cls): cls._patched_when.stop() cls._patched_when_started = None cls._patched_when = None cls._patched_when_not.stop() cls._patched_when_not_started = None cls._patched_when_not = None # and fix any breakage we did to the module try: reload(handlers) except NameError: import importlib importlib.reload(handlers) def setUp(self): self._patches = {} self._patches_start = {} def tearDown(self): for k, v in self._patches.items(): v.stop() setattr(self, k, None) self._patches = None self._patches_start = None def patch(self, obj, attr, return_value=None): mocked = mock.patch.object(obj, attr) self._patches[attr] = mocked started = mocked.start() started.return_value = return_value self._patches_start[attr] = started setattr(self, attr, started) def test_registered_hooks(self): # test that the hooks actually registered the relation expressions that # are meaningful for this interface: this is to handle regressions. # The keys are the function names that the hook attaches to. when_patterns = { 'setup_sync_target_alone': [('installed', )], 'send_info': [ ('dns-backend.related', ), ('rndckey.available', ), ], 'config_changed': [ ('dns-backend.related', ), ('rndckey.available', ), ], 'update_zones_from_peer': [ ('cluster.connected', ), ('sync.request.sent', ), ], 'check_zone_status': [ ('cluster.connected', ), ('installed', ), ], 'process_sync_requests': [ ('cluster.connected', ), ('zones.initialised', ), ], 'assess_status': [('zones.initialised', )], 'service_ips_changed': [('config.changed.service_ips', )], 'hacluster_connected': [('ha.connected', )], 'hacluster_departed': [('ha-relation-departed',)], } when_not_patterns = { 'install_packages': [('installed', )], 'setup_secret': [('rndckey.available', )], 'update_zones_from_peer': [('zones.initialised', )], 'setup_sync_target_alone': [ ('cluster.connected', ), ('zones.initialised', ), ('sync.request.sent', ), ], 'check_zone_status': [ ('zones.initialised', ), ('sync.request.sent', ), ], } # check the when hooks are attached to the expected functions for t, p in [(_when_args, when_patterns), (_when_not_args, when_not_patterns)]: for f, args in t.items(): # check that function is in patterns print(f) self.assertTrue(f in p.keys()) # check that the lists are equal newlist = [a['args'] for a in args] self.assertEqual(newlist, p[f]) def test_install_packages(self): self.patch(handlers.designate_bind, 'install') self.patch(handlers.designate_bind, 'set_apparmor') self.patch(handlers.reactive, 'set_state') handlers.install_packages() self.install.assert_called_once_with() self.set_apparmor.assert_called_once_with() self.set_state.assert_called_once_with('installed') def test_setup_secret(self): self.patch(handlers.designate_bind, 'init_rndckey') self.patch(handlers.reactive, 'set_state') self.init_rndckey.return_value = None handlers.setup_secret() self.assertFalse(self.set_state.called) self.init_rndckey.return_value = 'secret' handlers.setup_secret() self.set_state.assert_called_with('rndckey.available') def test_setup_info(self): dnsclient = mock.MagicMock() self.patch(handlers.designate_bind, 'get_rndc_secret') self.patch(handlers.designate_bind, 'get_rndc_algorithm') self.get_rndc_secret.return_value = 'secret' self.get_rndc_algorithm.return_value = 'hmac-md5' handlers.send_info(dnsclient) dnsclient.send_rndckey_info.assert_called_once_with( 'secret', 'hmac-md5') def test_config_changed(self): self.patch(handlers.designate_bind, 'set_apparmor') self.patch(handlers.designate_bind, 'render_all_configs') handlers.config_changed('arg1', 'arg2') self.set_apparmor.assert_called_once_with() self.render_all_configs.assert_called_once_with(('arg1', 'arg2', )) def test_setup_sync_target_alone(self): self.patch(handlers.hookenv, 'is_leader') self.patch(handlers.designate_bind, 'setup_sync') self.patch(handlers.reactive, 'set_state') self.is_leader.return_value = False handlers.setup_sync_target_alone() self.assertFalse(self.setup_sync.called) self.assertFalse(self.set_state.called) self.is_leader.return_value = True handlers.setup_sync_target_alone() self.setup_sync.assert_called_once_with() self.set_state.assert_called_once_with('zones.initialised') def test_update_zones_from_peer(self): self.patch(handlers.designate_bind, 'retrieve_zones') handlers.update_zones_from_peer('hacluster') self.retrieve_zones.assert_called_once_with('hacluster') def test_check_zone_status(self): self.patch(handlers.hookenv, 'is_leader') self.patch(handlers.reactive, 'set_state') self.patch(handlers.designate_bind, 'get_sync_time') self.patch(handlers.designate_bind, 'retrieve_zones') self.patch(handlers.designate_bind, 'setup_sync') self.patch(handlers.designate_bind, 'request_sync') # Leader test: Retrieve sync self.is_leader.return_value = True self.get_sync_time.return_value = 100 handlers.check_zone_status('hacluster') self.retrieve_zones.assert_called_once_with() self.retrieve_zones.reset_mock() # Leader test: Setup sync self.is_leader.return_value = True self.get_sync_time.return_value = None handlers.check_zone_status('hacluster') self.assertFalse(self.retrieve_zones.called) self.setup_sync.assert_called_once_with() self.set_state.assert_called_once_with('zones.initialised') # Non-Leader test self.is_leader.return_value = False handlers.check_zone_status('hacluster') self.request_sync.assert_called_once_with('hacluster') def test_process_sync_requests(self): self.patch(handlers.hookenv, 'is_leader') self.patch(handlers.designate_bind, 'process_requests') self.is_leader.return_value = False handlers.process_sync_requests('hacluster') self.assertFalse(self.process_requests.called) self.process_requests.reset_mock() self.is_leader.return_value = True handlers.process_sync_requests('hacluster') self.process_requests.assert_called_once_with('hacluster') def test_configure_service_ips(self): """Test that change of 'service_ips' triggers reconfiguration.""" self.patch(handlers.designate_bind, 'reconfigure_service_ips') handlers.service_ips_changed() handlers.designate_bind.reconfigure_service_ips.assert_called_once() def test_hacluster_connected_reconfigure_service_ips(self): """Test that hacluster connect hook configures 'service_ips' on join. When relation with hacluster is joined and 'service_ips' are waiting for configuration, 'reconfigure_service_ips' should be trigerred. """ self.patch(handlers.designate_bind, 'reconfigure_service_ips') self.patch(handlers.reactive, 'is_flag_set', True) self.patch(handlers.reactive, 'clear_flag') handlers.hacluster_connected(None) handlers.designate_bind.reconfigure_service_ips.assert_called_once() handlers.reactive.clear_flag.assert_called_once_with( handlers.designate_bind.AWAITING_HACLUSTER_FLAG ) def test_hacluster_connect_no_service_ips(self): """Test that hacluster connect hook passes if there are no IPs. If there are no service_ips configured when the hacluster joins, this hook should not execute anything. """ self.patch(handlers.designate_bind, 'reconfigure_service_ips') self.patch(handlers.reactive, 'is_flag_set', False) self.patch(handlers.reactive, 'clear_flag') handlers.hacluster_connected(None) handlers.designate_bind.reconfigure_service_ips.assert_not_called()