# Copyright 2016 Canonical Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import builtins import collections import copy from mock import ANY, patch, call, MagicMock, mock_open, Mock import json import os import subprocess import time import charmhelpers.contrib.openstack.utils as os_utils from test_utils import CharmTestCase import keystone_types as ks_types os.environ['JUJU_UNIT_NAME'] = 'keystone' with patch('charmhelpers.core.hookenv.config') as config, \ patch('charmhelpers.contrib.openstack.' 'utils.snap_install_requested', Mock(return_value=False)): import importlib import keystone_utils as utils # we have to force utils to reload as another test module may already have # pulled it in, and thus all this fancy patching will just fail importlib.reload(utils) TO_PATCH = [ 'api_port', 'config', 'os_release', 'log', 'create_role', 'create_service_entry', 'create_endpoint_template', 'get_local_endpoint', 'get_requested_roles', 'get_service_password', 'get_os_codename_install_source', 'grant_role', 'configure_installation_source', 'https', 'lsb_release', 'peer_store_and_set', 'service_stop', 'service_start', 'snap_install_requested', 'relation_get', 'relation_set', 'relation_ids', 'relation_id', 'local_unit', 'related_units', 'https', 'mkdir', 'write_file', # generic 'apt_update', 'apt_upgrade', 'apt_install', 'subprocess', 'time', 'pwgen', 'os_application_version_set', 'os_application_status_set', 'reset_os_release', ] class TestKeystoneUtils(CharmTestCase): def setUp(self): super(TestKeystoneUtils, self).setUp(utils, TO_PATCH) self.config.side_effect = self.test_config.get self.snap_install_requested.return_value = False self.ctxt = MagicMock() self.rsc_map = { '/etc/keystone/keystone.conf': { 'services': ['keystone'], 'contexts': [self.ctxt], }, '/etc/apache2/sites-available/openstack_https_frontend': { 'services': ['apache2'], 'contexts': [self.ctxt], }, '/etc/apache2/sites-available/openstack_https_frontend.conf': { 'services': ['apache2'], 'contexts': [self.ctxt], } } self.get_os_codename_install_source.return_value = 'icehouse' @patch('charmhelpers.contrib.openstack.templating.OSConfigRenderer') @patch('os.path.exists') @patch.object(utils, 'resource_map') def test_register_configs_apache(self, resource_map, exists, renderer): exists.return_value = False self.os_release.return_value = 'havana' fake_renderer = MagicMock() fake_renderer.register = MagicMock() renderer.return_value = fake_renderer resource_map.return_value = self.rsc_map utils.register_configs() renderer.assert_called_with( openstack_release='havana', templates_dir='templates/') ex_reg = [ call('/etc/keystone/keystone.conf', [self.ctxt]), call( '/etc/apache2/sites-available/openstack_https_frontend', [self.ctxt]), call( '/etc/apache2/sites-available/openstack_https_frontend.conf', [self.ctxt]), ] fake_renderer.register.assert_has_calls(ex_reg, any_order=True) def test_resource_map_exclude_policy_json_before_liberty(self): resources = self._test_resource_map(os_release='kilo') self.assertFalse('/etc/keystone/policy.json' in resources.keys()) def test_resource_map_include_policy_json_from_liberty(self): resources = self._test_resource_map(os_release='liberty') self.assertTrue('/etc/keystone/policy.json' in resources.keys()) def test_resource_map_apache24_conf_present_if_conf_avail_present(self): resources = self._test_resource_map(os_path_return_value=True) self.assertTrue( '/etc/apache2/sites-available/openstack_https_frontend.conf' in resources.keys()) def test_resource_map_apache24_conf_absent_if_conf_avail_absent(self): resources = self._test_resource_map(os_path_return_value=False) self.assertFalse( '/etc/apache2/sites-available/openstack_https_frontend.conf' in resources.keys()) def test_resource_map_excludes_apache_files_if_using_snap(self): resources = self._test_resource_map(use_snap=True) for config_file in ( '/etc/apache2/sites-available/openstack_https_frontend', '/etc/apache2/sites-available/openstack_https_frontend.conf', ): self.assertFalse(config_file in resources.keys()) def test_resource_map_ensure_snap_includes_nginx_and_uwsgi(self): resources = self._test_resource_map(use_snap=True) required_services = ('snap.keystone.nginx', 'snap.keystone.uwsgi') for cfile in resources: services = resources[cfile]['services'] self.assertTrue(all(service in services) for service in required_services) def test_resource_map_enable_memcache_mitaka(self): resources = self._test_resource_map(os_release='mitaka') self.assertTrue('/etc/memcached.conf' in resources.keys()) def test_resource_map_enable_memcache_liberty(self): resources = self._test_resource_map(os_release='liberty') self.assertFalse('/etc/memcached.conf' in resources.keys()) @patch.object(utils, 'snap_install_requested') @patch.object(utils, 'os') def _test_resource_map(self, mock_os, snap_install_requested, os_release='mitaka', use_snap=False, os_path_return_value=False): self.os_release.return_value = os_release snap_install_requested.return_value = use_snap mock_os.path.exists.return_value = os_path_return_value resource_map = utils.resource_map() return resource_map def test_determine_ports(self): self.test_config.set('admin-port', '80') self.test_config.set('service-port', '81') result = utils.determine_ports() self.assertEqual(result, ['80', '81']) @patch.object(utils, 'get_subordinate_release_packages') @patch('charmhelpers.contrib.openstack.utils.config') def test_determine_packages( self, _config, _get_subordinate_release_packages): self.os_release.return_value = 'havana' self.snap_install_requested.return_value = False _config.return_value = None _get_subordinate_release_packages.return_value = \ os_utils.SubordinatePackages(set(), set()) result = utils.determine_packages() ex = utils.BASE_PACKAGES + ['keystone', 'python-keystoneclient'] self.assertEqual(set(ex), set(result)) _get_subordinate_release_packages.return_value = \ os_utils.SubordinatePackages(set(['sub-pkg']), set()) self.assertIn('sub-pkg', utils.determine_packages()) @patch.object(utils, 'get_subordinate_release_packages') @patch('charmhelpers.contrib.openstack.utils.config') def test_determine_packages_queens( self, _config, _get_subordinate_release_packages): self.os_release.return_value = 'queens' self.snap_install_requested.return_value = False _config.return_value = None _get_subordinate_release_packages.return_value = \ os_utils.SubordinatePackages(set(), set()) result = utils.determine_packages() ex = utils.BASE_PACKAGES + [ 'keystone', 'python-keystoneclient', 'memcached', 'libapache2-mod-wsgi' ] self.assertEqual(set(ex), set(result)) @patch.object(utils, 'get_subordinate_release_packages') @patch('charmhelpers.contrib.openstack.utils.config') def test_determine_packages_rocky( self, _config, _get_subordinate_release_packages): self.os_release.return_value = 'rocky' self.snap_install_requested.return_value = False _config.return_value = None _get_subordinate_release_packages.return_value = \ os_utils.SubordinatePackages(set(), set()) result = utils.determine_packages() ex = list(set( [p for p in utils.BASE_PACKAGES if not p.startswith('python-')] + ['memcached'] + utils.PY3_PACKAGES)) self.assertEqual(set(ex), set(result)) @patch.object(utils, 'get_subordinate_release_packages') @patch('charmhelpers.contrib.openstack.utils.config') def test_determine_packages_snap_install( self, _config, _get_subordinate_release_packages): self.os_release.return_value = 'mitaka' self.snap_install_requested.return_value = True _config.return_value = None _get_subordinate_release_packages.return_value = \ os_utils.SubordinatePackages(set(), set()) result = utils.determine_packages() ex = utils.BASE_PACKAGES_SNAP + ['memcached'] self.assertEqual(set(ex), set(result)) _get_subordinate_release_packages.return_value = \ os_utils.SubordinatePackages(set(['sub-snap']), set()) self.assertIn('sub-snap', utils.determine_packages()) @patch.object(utils, 'get_subordinate_release_packages') def test_determine_purge_packages(self, _get_subordinate_release_packages): 'Ensure no packages are identified for purge prior to rocky' self.os_release.return_value = 'queens' _get_subordinate_release_packages.return_value = \ os_utils.SubordinatePackages(set(), set()) self.assertEqual(utils.determine_purge_packages(), []) _get_subordinate_release_packages.return_value = \ os_utils.SubordinatePackages(set(), set(['sub-purge'])) self.assertEqual(utils.determine_purge_packages(), ['sub-purge']) @patch.object(utils, 'get_subordinate_release_packages') def test_determine_purge_packages_rocky( self, _get_subordinate_release_packages): 'Ensure python packages are identified for purge at rocky' self.os_release.return_value = 'rocky' _get_subordinate_release_packages.return_value = \ os_utils.SubordinatePackages(set(), set()) self.assertEqual(utils.determine_purge_packages(), sorted([p for p in utils.BASE_PACKAGES if p.startswith('python-')] + ['python-keystone', 'python-memcache'])) _get_subordinate_release_packages.return_value = \ os_utils.SubordinatePackages(set(), set(['sub-purge'])) self.assertIn('sub-purge', utils.determine_purge_packages()) @patch.object(utils, 'get_subordinate_release_packages') @patch.object(utils, 'bootstrap_keystone') @patch.object(utils, 'is_elected_leader') @patch.object(utils, 'disable_unused_apache_sites') @patch('os.path.exists') @patch.object(utils, 'run_in_apache') @patch.object(utils, 'determine_packages') @patch.object(utils, 'migrate_database') def test_openstack_upgrade_leader( self, migrate_database, determine_packages, run_in_apache, os_path_exists, disable_unused_apache_sites, mock_is_elected_leader, mock_bootstrap_keystone, mock_get_subordinate_release_packages): configs = MagicMock() self.test_config.set('openstack-origin', 'cloud:xenial-newton') self.os_release.return_value = 'ocata' determine_packages.return_value = [] os_path_exists.return_value = True run_in_apache.return_value = True mock_get_subordinate_release_packages.return_value = \ os_utils.SubordinatePackages(set(), set()) utils.do_openstack_upgrade(configs) self.get_os_codename_install_source.assert_called_with( 'cloud:xenial-newton' ) self.configure_installation_source.assert_called_with( 'cloud:xenial-newton' ) self.assertTrue(self.apt_update.called) dpkg_opts = [ '--option', 'Dpkg::Options::=--force-confnew', '--option', 'Dpkg::Options::=--force-confdef', ] self.apt_upgrade.assert_called_with( options=dpkg_opts, fatal=True, dist=True) self.apt_install.assert_called_with( packages=[], options=dpkg_opts, fatal=True) self.assertTrue(configs.set_release.called) self.assertTrue(configs.write_all.called) self.assertTrue(migrate_database.called) mock_bootstrap_keystone.assert_called_once_with(configs=ANY) disable_unused_apache_sites.assert_called_with() self.reset_os_release.assert_called() @patch.object(utils, 'leader_get') def test_is_db_initialised_true_string(self, _leader_get): _leader_get.return_value = "True" self.assertTrue(utils.is_db_initialised()) @patch.object(utils, 'leader_get') def test_is_db_initialised_true_bool(self, _leader_get): _leader_get.return_value = True self.assertTrue(utils.is_db_initialised()) @patch.object(utils, 'leader_get') def test_is_db_initialised_not_set(self, _leader_get): _leader_get.return_value = None self.assertFalse(utils.is_db_initialised()) @patch.object(utils, 'stop_manager_instance') @patch.object(utils, 'leader_set') def test_migrate_database(self, _leader_set, mock_stop_manager_instance): self.os_release.return_value = 'havana' utils.migrate_database() self.service_stop.assert_called_with('keystone') cmd = ['sudo', '-u', 'keystone', 'keystone-manage', 'db_sync'] self.subprocess.check_output.assert_called_with(cmd) self.service_start.assert_called_with('keystone') _leader_set.assert_called_with({'db-initialised': True}) mock_stop_manager_instance.assert_called_once_with() @patch.object(utils, 'get_real_role_names') @patch.object(utils, 'leader_get') @patch.object(utils, 'get_api_version') @patch.object(utils, 'get_manager') @patch.object(utils, 'resolve_address') def test_add_service_to_keystone_clustered_https_none_values( self, _resolve_address, _get_manager, _get_api_version, _leader_get, _get_real_role_names): _get_real_role_names.return_value = ['Member', 'SpecialRole'] _get_api_version.return_value = 2 _leader_get.return_value = None relation_id = 'identity-service:0' remote_unit = 'unit/0' _resolve_address.return_value = '10.10.10.10' self.https.return_value = True self.test_config.set('vip', '10.10.10.10') self.test_config.set('admin-port', 80) self.test_config.set('service-port', 81) self.get_requested_roles.return_value = ['role1', ] self.relation_get.return_value = {'service': 'keystone', 'region': 'RegionOne', 'public_url': 'None', 'admin_url': '10.0.0.2', 'internal_url': '192.168.1.2'} utils.add_service_to_keystone( relation_id=relation_id, remote_unit=remote_unit) self.assertTrue(self.https.called) self.assertTrue(self.create_role.called) relation_data = {'auth_host': '10.10.10.10', 'service_host': '10.10.10.10', 'service_protocol': 'https', 'auth_port': 80, 'auth_protocol': 'https', 'service_port': 81, 'internal_host': '10.10.10.10', 'internal_protocol': 'https', 'internal_port': 81, 'region': 'RegionOne', 'api_version': 2, 'admin_domain_id': None, 'created_roles': 'Member,SpecialRole'} self.peer_store_and_set.assert_called_with(relation_id=relation_id, **relation_data) @patch.object(utils, 'get_real_role_names') @patch.object(utils, 'is_leader') @patch.object(utils, 'leader_set') @patch.object(utils, 'leader_get') @patch.object(utils, 'get_api_version') @patch.object(utils, 'create_user') @patch.object(utils, 'resolve_address') @patch.object(utils, 'ensure_valid_service') @patch.object(utils, 'add_endpoint') @patch.object(utils, 'get_manager') def test_add_service_to_keystone_no_clustered_no_https_complete_values( self, KeystoneManager, add_endpoint, ensure_valid_service, _resolve_address, create_user, get_api_version, leader_get, leader_set, is_leader, _get_real_role_names, test_api_version=2): _get_real_role_names.return_value = ['Member', 'SpecialRole'] get_api_version.return_value = test_api_version leader_get.return_value = None is_leader.return_value = True relation_id = 'identity-service:0' remote_unit = 'unit/0' self.get_service_password.return_value = 'password' self.test_config.set('service-tenant', 'tenant') self.test_config.set('admin-role', 'Admin') self.get_requested_roles.return_value = ['role1', ] _resolve_address.return_value = '10.0.0.3' self.test_config.set('admin-port', 80) self.test_config.set('service-port', 81) self.https.return_value = False self.get_local_endpoint.return_value = 'http://localhost:80/v2.0/' self.relation_ids.return_value = ['cluster/0'] service_domain = None service_domain_id = None service_role = 'Admin' admin_project_id = None admin_user_id = None if test_api_version > 2: service_domain = 'service_domain' service_domain_id = '1234567890' admin_user_id = 'foobar-user' admin_project_id = 'tenant_id' mock_keystone = MagicMock() mock_keystone.resolve_tenant_id.return_value = 'tenant_id' mock_keystone.resolve_domain_id.return_value = service_domain_id mock_keystone.resolve_user_id.return_value = admin_user_id KeystoneManager.return_value = mock_keystone self.relation_get.return_value = {'service': 'keystone', 'region': 'RegionOne', 'public_url': '10.0.0.1', 'admin_url': '10.0.0.2', 'internal_url': '192.168.1.2'} mock_keystone.user_exists.return_value = False utils.add_service_to_keystone( relation_id=relation_id, remote_unit=remote_unit) ensure_valid_service.assert_called_with('keystone') add_endpoint.assert_called_with(region='RegionOne', service='keystone', publicurl='10.0.0.1', adminurl='10.0.0.2', internalurl='192.168.1.2') self.get_service_password.assert_called_with('keystone') create_user.assert_called_with('keystone', 'password', domain=service_domain, tenant='tenant') self.grant_role.assert_called_with('keystone', service_role, project_domain=service_domain, tenant='tenant', user_domain=service_domain) self.create_role.assert_called_with('role1', user='keystone', tenant='tenant', domain=service_domain) relation_data = {'admin_domain_id': None, 'admin_user_id': admin_user_id, 'admin_project_id': admin_project_id, 'auth_host': '10.0.0.3', 'service_host': '10.0.0.3', 'service_port': 81, 'auth_port': 80, 'internal_host': '10.0.0.3', 'internal_protocol': 'http', 'internal_port': 81, 'service_username': 'keystone', 'service_password': 'password', 'service_domain': service_domain, 'service_domain_id': service_domain_id, 'service_tenant': 'tenant', 'https_keystone': '__null__', 'ssl_cert': '__null__', 'ssl_key': '__null__', 'ca_cert': '__null__', 'auth_protocol': 'http', 'service_protocol': 'http', 'service_tenant_id': 'tenant_id', 'api_version': test_api_version, 'created_roles': 'Member,SpecialRole'} filtered = collections.OrderedDict() for k, v in relation_data.items(): if v == '__null__': filtered[k] = None else: filtered[k] = v self.assertTrue(self.relation_set.called) self.peer_store_and_set.assert_called_with(relation_id=relation_id, **relation_data) self.relation_set.assert_called_with(relation_id=relation_id, **filtered) if test_api_version > 2: mock_keystone.resolve_domain_id.assert_called_with(service_domain) def test_add_service_to_keystone_no_clustered_no_https_complete_values_v3( self): return self.\ test_add_service_to_keystone_no_clustered_no_https_complete_values( test_api_version=3) @patch.object(utils, 'leader_set') @patch.object(utils, 'is_leader') @patch.object(utils, 'leader_get') @patch('charmhelpers.contrib.openstack.ip.config') @patch.object(utils, 'ensure_valid_service') @patch.object(utils, 'add_endpoint') @patch.object(utils, 'get_manager') def test_add_service_to_keystone_nosubset( self, KeystoneManager, add_endpoint, ensure_valid_service, ip_config, leader_get, is_leader, leader_set): relation_id = 'identity-service:0' remote_unit = 'unit/0' self.relation_get.return_value = {'ec2_service': 'nova', 'ec2_region': 'RegionOne', 'ec2_public_url': '10.0.0.1', 'ec2_admin_url': '10.0.0.2', 'ec2_internal_url': '192.168.1.2'} self.get_local_endpoint.return_value = 'http://localhost:80/v2.0/' KeystoneManager.resolve_tenant_id.return_value = 'tenant_id' KeystoneManager.user_exists.return_value = False leader_get.return_value = None utils.add_service_to_keystone( relation_id=relation_id, remote_unit=remote_unit) ensure_valid_service.assert_called_with('nova') add_endpoint.assert_called_with(region='RegionOne', service='nova', publicurl='10.0.0.1', adminurl='10.0.0.2', internalurl='192.168.1.2', list_services=ANY, list_endpoints=ANY) @patch.object(utils, 'get_real_role_names') @patch.object(utils, 'get_requested_roles') @patch.object(utils, 'create_service_credentials') @patch.object(utils, 'leader_get') @patch('charmhelpers.contrib.openstack.ip.config') @patch.object(utils, 'ensure_valid_service') @patch.object(utils, 'add_endpoint') @patch.object(utils, 'get_manager') def test_add_service_to_keystone_multi_endpoints_bug_1739409( self, KeystoneManager, add_endpoint, ensure_valid_service, ip_config, leader_get, create_service_credentials, get_requested_roles, _get_real_role_names): _get_real_role_names.return_value = ['Member', 'SpecialRole'] relation_id = 'identity-service:8' remote_unit = 'nova-cloud-controller/0' get_requested_roles.return_value = 'role1' self.relation_get.return_value = { 'ec2_admin_url': 'http://10.5.0.16:8773/services/Cloud', 'ec2_internal_url': 'http://10.5.0.16:8773/services/Cloud', 'ec2_public_url': 'http://10.5.0.16:8773/services/Cloud', 'ec2_region': 'RegionOne', 'ec2_service': 'ec2', 'nova_admin_url': 'http://10.5.0.16:8774/v2/$(tenant_id)s', 'nova_internal_url': 'http://10.5.0.16:8774/v2/$(tenant_id)s', 'nova_public_url': 'http://10.5.0.16:8774/v2/$(tenant_id)s', 'nova_region': 'RegionOne', 'nova_service': 'nova', 'private-address': '10.5.0.16', 's3_admin_url': 'http://10.5.0.16:3333', 's3_internal_url': 'http://10.5.0.16:3333', 's3_public_url': 'http://10.5.0.16:3333', 's3_region': 'RegionOne', 's3_service': 's3'} self.get_local_endpoint.return_value = 'http://localhost:80/v2.0/' KeystoneManager.resolve_tenant_id.return_value = 'tenant_id' leader_get.return_value = None utils.add_service_to_keystone( relation_id=relation_id, remote_unit=remote_unit) create_service_credentials.assert_called_once_with( 'ec2_nova_s3', new_roles='role1') @patch.object(utils, 'set_service_password') @patch.object(utils, 'get_service_password') @patch.object(utils, 'user_exists') @patch.object(utils, 'grant_role') @patch.object(utils, 'create_role') @patch.object(utils, 'create_user') def test_create_user_credentials_no_roles(self, mock_create_user, mock_create_role, mock_grant_role, mock_user_exists, get_callback, set_callback): mock_user_exists.return_value = False get_callback.return_value = 'passA' utils.create_user_credentials('userA', get_callback, set_callback, tenant='tenantA') mock_create_user.assert_has_calls([call('userA', 'passA', domain=None, tenant='tenantA')]) mock_create_role.assert_has_calls([]) mock_grant_role.assert_has_calls([]) @patch.object(utils, 'set_service_password') @patch.object(utils, 'get_service_password') @patch.object(utils, 'user_exists') @patch.object(utils, 'grant_role') @patch.object(utils, 'create_role') @patch.object(utils, 'create_user') def test_create_user_credentials(self, mock_create_user, mock_create_role, mock_grant_role, mock_user_exists, get_callback, set_callback): mock_user_exists.return_value = False get_callback.return_value = 'passA' utils.create_user_credentials('userA', get_callback, set_callback, tenant='tenantA', grants=['roleA'], new_roles=['roleB']) mock_create_user.assert_has_calls([call('userA', 'passA', tenant='tenantA', domain=None)]) mock_create_role.assert_has_calls([call('roleB', user='userA', tenant='tenantA', domain=None)]) mock_grant_role.assert_has_calls([call('userA', 'roleA', tenant='tenantA', user_domain=None, project_domain=None)]) @patch.object(utils, 'is_password_changed', lambda x, y: True) @patch.object(utils, 'set_service_password') @patch.object(utils, 'get_service_password') @patch.object(utils, 'update_user_password') @patch.object(utils, 'user_exists') @patch.object(utils, 'grant_role') @patch.object(utils, 'create_role') @patch.object(utils, 'create_user') def test_create_user_credentials_user_exists(self, mock_create_user, mock_create_role, mock_grant_role, mock_user_exists, mock_update_user_password, get_callback, set_callback, test_api_version=2): domain = None if test_api_version > 2: domain = 'admin_domain' mock_user_exists.return_value = True get_callback.return_value = 'passA' utils.create_user_credentials('userA', get_callback, set_callback, tenant='tenantA', grants=['roleA'], new_roles=['roleB'], domain=domain) mock_create_user.assert_has_calls([]) mock_create_role.assert_has_calls([call('roleB', user='userA', tenant='tenantA', domain=domain)]) mock_grant_role.assert_has_calls([call('userA', 'roleA', tenant='tenantA', user_domain=domain, project_domain=domain)]) mock_update_user_password.assert_has_calls([call('userA', 'passA', domain)]) def test_create_user_credentials_user_exists_v3(self): self.test_create_user_credentials_user_exists(test_api_version=3) @patch.object(utils, 'set_service_password') @patch.object(utils, 'get_service_password') @patch.object(utils, 'create_user_credentials') def test_create_service_credentials(self, mock_create_user_credentials, get_callback, set_callback): get_callback.return_value = 'passA' cfg = {'service-tenant': 'tenantA', 'admin-role': 'Admin', 'preferred-api-version': 2} self.config.side_effect = lambda key: cfg.get(key, None) calls = [call('serviceA', get_callback, set_callback, domain=None, grants=['Admin'], new_roles=None, tenant='tenantA')] utils.create_service_credentials('serviceA') mock_create_user_credentials.assert_has_calls(calls) @patch.object(utils, 'protect_user_account_from_pci_dss_force_change_password') @patch.object(utils, 'set_service_password') @patch.object(utils, 'get_service_password') @patch.object(utils, 'create_user_credentials') def test_create_service_credentials_v3(self, mock_create_user_credentials, get_callback, set_callback, mock_protect_user_accounts): get_callback.return_value = 'passA' cfg = {'service-tenant': 'tenantA', 'admin-role': 'Admin', 'preferred-api-version': 3} self.config.side_effect = lambda key: cfg.get(key, None) calls = [ call('serviceA', get_callback, set_callback, domain='default', grants=['Admin'], new_roles=None, tenant='tenantA'), call('serviceA', get_callback, set_callback, domain='service_domain', grants=['Admin'], new_roles=None, tenant='tenantA')] utils.create_service_credentials('serviceA') mock_create_user_credentials.assert_has_calls(calls) mock_protect_user_accounts.assert_called_once_with('serviceA') @patch.object(utils, 'get_api_version') def test_protect_user_account_from_pci_dss_force_change_password_v2( self, mock_get_api_version): mock_get_api_version.return_value = 2 utils.protect_user_account_from_pci_dss_force_change_password('u1') self.config.assert_not_called() @patch.object(utils, 'get_api_version') def test_protect_user_account_from_pci_dss_v3_no_tenant( self, mock_get_api_version): mock_get_api_version.return_value = 3 cfg = {} self.config.side_effect = lambda key: cfg.get(key, None) with self.assertRaises(ValueError): utils.protect_user_account_from_pci_dss_force_change_password('u1') self.config.assert_called_once_with('service-tenant') @patch.object(utils, 'update_user') @patch.object(utils, 'get_user_dict') @patch.object(utils, 'get_api_version') def test_protect_user_account_from_pci_dss_v3( self, mock_get_api_version, mock_get_user_dict, mock_update_user): mock_get_api_version.return_value = 3 cfg = {'service-tenant': 'tenantA'} self.config.side_effect = lambda key: cfg.get(key, None) users = { 'u1': { 'id': 'u1id', 'options': {'ignore_change_password_upon_first_use': True, 'ignore_password_expiry': True}}, 'u2': { 'id': 'u2id', 'options': {'ignore_change_password_upon_first_use': False, 'ignore_password_expiry': True}}, 'u3': {'id': 'u3id', 'options': {}}, 'u4': {'id': 'u4id'}, } mock_get_user_dict.side_effect = ( lambda key, **kwargs: copy.deepcopy(users.get(key, None))) # test user exists and option exists and is true utils.protect_user_account_from_pci_dss_force_change_password('u1') calls = [call('u1', utils.DEFAULT_DOMAIN), call('u1', utils.SERVICE_DOMAIN)] mock_get_user_dict.has_calls(calls) mock_update_user.assert_not_called() # test user exists and option exists and is False mock_get_user_dict.reset_mock() utils.protect_user_account_from_pci_dss_force_change_password('u2') calls = [call('u2id', options={'ignore_change_password_upon_first_use': True, 'ignore_password_expiry': True} ), call('u2id', options={'ignore_change_password_upon_first_use': True, 'ignore_password_expiry': True} )] mock_update_user.assert_has_calls(calls) # test user exists and the option doesn't exist mock_get_user_dict.reset_mock() mock_update_user.reset_mock(calls) utils.protect_user_account_from_pci_dss_force_change_password('u3') calls = [call('u3id', options={'ignore_change_password_upon_first_use': True, 'ignore_password_expiry': True} ), call('u3id', options={'ignore_change_password_upon_first_use': True, 'ignore_password_expiry': True} )] mock_update_user.assert_has_calls(calls) # test user exists and no options exist mock_get_user_dict.reset_mock() mock_update_user.reset_mock(calls) utils.protect_user_account_from_pci_dss_force_change_password('u4') calls = [call('u4id', options={'ignore_change_password_upon_first_use': True, 'ignore_password_expiry': True} ), call('u4id', options={'ignore_change_password_upon_first_use': True, 'ignore_password_expiry': True} )] mock_update_user.assert_has_calls(calls) # test user doesn't exist mock_get_user_dict.reset_mock() mock_update_user.reset_mock(calls) utils.protect_user_account_from_pci_dss_force_change_password('u5') mock_update_user.assert_not_called() @patch.object(utils, 'protect_user_account_from_pci_dss_force_change_password') @patch.object(utils, 'list_users_for_domain') @patch.object(utils, 'get_api_version') def test_ensure_all_service_accounts_protected_for_pci_dss_options( self, mock_get_api_version, mock_list_users_for_domain, mock_protect_service_accounts, ): # test it does nothing for less that keystone v3 mock_get_api_version.return_value = 2 utils.ensure_all_service_accounts_protected_for_pci_dss_options() mock_protect_service_accounts.assert_not_called() # now check that service accounts get protected. mock_get_api_version.return_value = 3 mock_list_users_for_domain.return_value = [ {'name': 'u1'}, {'name': 'u2'}] utils.ensure_all_service_accounts_protected_for_pci_dss_options() mock_protect_service_accounts.assert_has_calls([ call('u1'), call('u2')]) def test_ensure_valid_service_incorrect(self): utils.ensure_valid_service('fakeservice') self.log.assert_called_with("Invalid service requested: 'fakeservice'") self.relation_set.assert_called_with(admin_token=-1) def test_add_endpoint(self): publicurl = '10.0.0.1' adminurl = '10.0.0.2' internalurl = '10.0.0.3' utils.add_endpoint( 'RegionOne', 'nova', publicurl, adminurl, internalurl) self.create_service_entry.assert_called_with( 'nova', 'compute', 'Nova Compute Service', list_services=None) self.create_endpoint_template.asssert_called_with( region='RegionOne', service='nova', publicurl=publicurl, adminurl=adminurl, internalurl=internalurl) @patch.object(utils, 'uuid') @patch.object(utils, 'relation_set') @patch.object(utils, 'relation_get') @patch.object(utils, 'relation_ids') @patch.object(utils, 'is_elected_leader') def test_send_id_notifications(self, mock_is_elected_leader, mock_relation_ids, mock_relation_get, mock_relation_set, mock_uuid): checksums = {'foo-endpoint-changed': 1} relation_id = 'testrel:0' mock_uuid.uuid4.return_value = '1234' mock_relation_ids.return_value = [relation_id] mock_is_elected_leader.return_value = False utils.send_id_notifications(checksums) self.assertFalse(mock_relation_set.called) mock_is_elected_leader.return_value = True utils.send_id_notifications({}) self.assertFalse(mock_relation_set.called) utils.send_id_notifications(checksums) self.assertTrue(mock_relation_set.called) mock_relation_set.assert_called_once_with(relation_id=relation_id, relation_settings=checksums) mock_relation_set.reset_mock() utils.send_id_notifications(checksums, force=True) self.assertTrue(mock_relation_set.called) checksums['trigger'] = '1234' mock_relation_set.assert_called_once_with(relation_id=relation_id, relation_settings=checksums) @patch.object(utils, 'service_endpoint_dict') @patch.object(utils, 'relation_ids') @patch.object(utils, 'related_units') @patch.object(utils, 'relation_get') @patch.object(utils, 'relation_set') def test_send_id_service_notifications(self, mock_relation_set, mock_relation_get, mock_related_units, mock_relation_ids, mock_service_endpoint_dict): id_svc_rel_units = { 'identity-service:1': ['neutron-gateway/0'], 'identity-service:2': [ 'nova-cloud-controller/0', 'nova-cloud-controller/1', 'nova-cloud-controller/2'], 'identity-service:3': ['glance/0', 'glance/1', 'glance/2'] } def _related_units(rid): return id_svc_rel_units[rid] id_svc_rel_data = { 'neutron-gateway/0': { 'subscribe_ep_change': 'neutron'}, 'nova-cloud-controller/0': { 'nova_admin_url': 'http://172.20.0.13:8774/v2.1', 'subscribe_ep_change': 'placement neutron'}, 'nova-cloud-controller/1': {}, 'nova-cloud-controller/2': {}, 'glance/0': { 'admin_url': 'http://172.20.0.32:9292'}, 'glance/1': {}, 'glance/2': {}, 'keystone/0': {} } def _relation_get(unit, rid, attribute): return id_svc_rel_data[unit].get(attribute) def _service_endpoint_dict(service_name): _services = { 'neutron': {'internal': 'http://neutron.demo.com:9696'} } return _services.get(service_name) mock_relation_ids.return_value = id_svc_rel_units.keys() mock_related_units.side_effect = _related_units mock_relation_get.side_effect = _relation_get mock_service_endpoint_dict.side_effect = _service_endpoint_dict self.local_unit.return_value = 'keystone/0' # Check all services subscribed to placement changes are notified. mock_relation_set.reset_mock() utils.send_id_service_notifications( {'placement-endpoint-changed': {"internal": "http://demo.com"}}) mock_relation_set.assert_has_calls([ call( relation_id='identity-service:1', relation_settings={ 'catalog_ttl': 60, 'ep_changed': ('{"neutron": {"internal": ' '"http://neutron.demo.com:9696"}}') } ), call( relation_id='identity-service:2', relation_settings={ 'catalog_ttl': 60, 'ep_changed': ('{"neutron": {"internal": ' '"http://neutron.demo.com:9696"},' ' "placement": {"internal": "http://demo.com"}}') } ) ], any_order=True) # Check all services subscribed to neutron changes are notified. mock_relation_set.reset_mock() utils.send_id_service_notifications( {'neutron-endpoint-changed': {"internal": "http://demo.com"}}) expected_rel_set_calls = [ call( relation_id='identity-service:1', relation_settings={ 'catalog_ttl': 60, 'ep_changed': '{"neutron": {"internal": "http://demo.com"}}' } ), call( relation_id='identity-service:2', relation_settings={ 'catalog_ttl': 60, 'ep_changed': '{"neutron": {"internal": "http://demo.com"}}' } ) ] mock_relation_set.assert_has_calls( expected_rel_set_calls, any_order=True) # Check multiple ep changes with app subscribing to multiple eps mock_relation_set.reset_mock() utils.send_id_service_notifications( {'neutron-endpoint-changed': {"internal": "http://demo.com"}, 'placement-endpoint-changed': {"internal": "http://demo.com"}}) expected_rel_set_calls = [ call( relation_id='identity-service:1', relation_settings={ 'catalog_ttl': 60, 'ep_changed': '{"neutron": {"internal": "http://demo.com"}}' } ), call( relation_id='identity-service:2', relation_settings={ 'catalog_ttl': 60, 'ep_changed': ( '{"neutron": {"internal": "http://demo.com"}, ' '"placement": {"internal": "http://demo.com"}}' ) } ) ] mock_relation_set.assert_has_calls( expected_rel_set_calls, any_order=True) @patch.object(utils, 'get_manager') def test_service_endpoint_dict_v3(self, mock_get_manager): mock_manager = MagicMock() mock_get_manager.return_value = mock_manager mock_manager.resolve_service_id.return_value = None self.assertIsNone(utils.service_endpoint_dict('dummyservice')) mock_manager.reset_mock() mock_manager.resolve_service_id.return_value = 'd123456' mock_manager.list_endpoints.return_value = [ {'service_id': 'd123456', 'interface': 'internal', 'url': 'http://dummyservice.demo.com'}, {'service_id': 'd789102', 'interface': 'internal', 'url': 'http://anotherservice.demo.com'}, ] self.assertEqual( utils.service_endpoint_dict('dummyservice'), {'internal': 'http://dummyservice.demo.com'} ) mock_manager.reset_mock() mock_manager.resolve_service_id.return_value = 'd123456' mock_manager.list_endpoints.return_value = [ {'service_id': 'd789102', 'interface': 'public', 'url': 'http://anotherservice.demo.com'}, {'service_id': 'd789102', 'interface': 'internal', 'url': 'http://anotherservice.demo.com'}, ] self.assertEqual( utils.service_endpoint_dict('dummyservice'), {} ) @patch.object(utils, 'get_manager') def test_service_endpoint_dict_v2(self, mock_get_manager): mock_manager = MagicMock() mock_get_manager.return_value = mock_manager mock_manager.resolve_service_id.return_value = None self.assertIsNone(utils.service_endpoint_dict('dummyservice')) mock_manager.reset_mock() mock_manager.resolve_service_id.return_value = 'd123456' mock_manager.list_endpoints.return_value = [ {'service_id': 'd123456', 'internalurl': 'http://dummyservice.demo.com', 'publicurl': 'http://anotherservice.demo.com'}, ] self.assertEqual( utils.service_endpoint_dict('dummyservice'), {'internal': 'http://dummyservice.demo.com', 'public': 'http://anotherservice.demo.com'} ) mock_manager.reset_mock() mock_manager.resolve_service_id.return_value = 'd123456' mock_manager.list_endpoints.return_value = [ {'service_id': 'd789102', 'internalurl': 'http://dummyservice.demo.com', 'publicurl': 'http://anotherservice.demo.com'}, {'service_id': 'd789102', 'internalurl': 'http://dummyservice.demo.com', 'publicurl': 'http://anotherservice.demo.com'}, ] self.assertEqual( utils.service_endpoint_dict('dummyservice'), {} ) def test_get_admin_passwd_pwd_set(self): self.test_config.set('admin-password', 'supersecret') self.assertEqual(utils.get_admin_passwd(), 'supersecret') @patch.object(utils, 'is_leader') @patch.object(utils, 'leader_get') def test_get_admin_passwd_leader_set(self, leader_get, is_leader): is_leader.return_value = False leader_get.return_value = 'admin' self.assertEqual(utils.get_admin_passwd(), 'admin') leader_get.assert_called_with('admin_passwd') @patch.object(utils, 'is_leader') @patch.object(utils, 'leader_get') def test_get_admin_passwd_leader_set_user_specified(self, leader_get, is_leader): is_leader.return_value = False leader_get.return_value = 'admin' self.assertEqual(utils.get_admin_passwd(user='test'), 'admin') leader_get.assert_called_with('test_passwd') @patch.object(utils, 'is_leader') @patch.object(utils, 'leader_get') def test_get_admin_passwd_leader_set_user_config(self, leader_get, is_leader): is_leader.return_value = False leader_get.return_value = 'admin' self.test_config.set('admin-user', 'test') self.assertEqual(utils.get_admin_passwd(), 'admin') leader_get.assert_called_with('test_passwd') @patch.object(utils, 'is_leader') @patch.object(utils, 'leader_set') def test_set_admin_password(self, leader_set, is_leader): is_leader.return_value = True utils.set_admin_passwd('secret') leader_set.assert_called_once_with({'admin_passwd': 'secret'}) @patch.object(utils, 'is_leader') @patch.object(utils, 'leader_set') def test_set_admin_password_config_username(self, leader_set, is_leader): is_leader.return_value = True self.test_config.set('admin-user', 'username') utils.set_admin_passwd('secret') leader_set.assert_called_once_with({'username_passwd': 'secret'}) @patch.object(utils, 'is_leader') @patch.object(utils, 'leader_set') def test_set_admin_password_username(self, leader_set, is_leader): is_leader.return_value = True utils.set_admin_passwd('secret', user='username') leader_set.assert_called_once_with({'username_passwd': 'secret'}) @patch.object(utils, 'is_leader') @patch.object(utils, 'leader_get') @patch('os.path.isfile') def test_get_admin_passwd_genpass(self, isfile, leader_get, is_leader): is_leader.return_value = True leader_get.return_value = 'supersecretgen' self.test_config.set('admin-password', '') isfile.return_value = False self.subprocess.check_output.return_value = 'supersecretgen' self.assertEqual(utils.get_admin_passwd(), 'supersecretgen') def test_is_db_ready(self): allowed_units = None def fake_rel_get(attribute=None, *args, **kwargs): if attribute == 'allowed_units': return allowed_units self.relation_get.side_effect = fake_rel_get self.relation_id.return_value = 'shared-db:0' self.relation_ids.return_value = ['shared-db:0'] self.local_unit.return_value = 'unit/0' allowed_units = 'unit/0' self.assertTrue(utils.is_db_ready(use_current_context=True)) self.relation_id.return_value = 'shared-db:0' self.relation_ids.return_value = ['shared-db:0'] self.local_unit.return_value = 'unit/0' allowed_units = 'unit/1' self.assertFalse(utils.is_db_ready(use_current_context=True)) self.relation_ids.return_value = ['acme:0'] with self.assertRaises(Exception): utils.is_db_ready(use_current_context=True) allowed_units = 'unit/0' self.related_units.return_value = ['unit/0'] self.relation_ids.return_value = ['shared-db:0', 'shared-db:1'] self.assertTrue(utils.is_db_ready()) allowed_units = 'unit/1' self.assertFalse(utils.is_db_ready()) self.related_units.return_value = [] self.assertTrue(utils.is_db_ready()) @patch.object(utils, 'leader_set') @patch.object(utils, 'leader_get') @patch('charmhelpers.contrib.openstack.ip.unit_get') @patch('charmhelpers.contrib.openstack.ip.is_clustered') @patch('charmhelpers.contrib.openstack.ip.config') @patch.object(utils, 'create_keystone_endpoint') @patch.object(utils, 'create_tenant') @patch.object(utils, 'create_user_credentials') @patch.object(utils, 'create_service_entry') def test_ensure_initial_admin_public_name(self, _create_service_entry, _create_user_creds, _create_tenant, _create_keystone_endpoint, _ip_config, _is_clustered, _unit_get, _leader_get, _leader_set): _is_clustered.return_value = False _ip_config.side_effect = self.test_config.get _unit_get.return_value = '10.0.0.1' _leader_get.return_value = None self.test_config.set('os-public-hostname', 'keystone.example.com') utils.ensure_initial_admin(self.config) _create_keystone_endpoint.assert_called_with( public_ip='keystone.example.com', service_port=5000, internal_ip='10.0.0.1', admin_ip='10.0.0.1', auth_port=35357, region='RegionOne', ) @patch.object(utils, 'get_manager') def test_is_service_present(self, KeystoneManager): mock_keystone = MagicMock() mock_keystone.resolve_service_id.return_value = 'sid1' KeystoneManager.return_value = mock_keystone self.assertTrue(utils.is_service_present('bob', 'bill')) @patch.object(utils, 'get_manager') def test_is_service_present_false(self, KeystoneManager): mock_keystone = MagicMock() mock_keystone.resolve_service_id.return_value = None KeystoneManager.return_value = mock_keystone self.assertFalse(utils.is_service_present('bob', 'bill')) @patch.object(utils, 'get_manager') def test_delete_service_entry(self, KeystoneManager): mock_keystone = MagicMock() mock_keystone.resolve_service_id.return_value = 'sid1' KeystoneManager.return_value = mock_keystone utils.delete_service_entry('bob', 'bill') mock_keystone.delete_service_by_id.assert_called_once_with('sid1') @patch('os.path.isfile') def test_get_file_stored_domain_id(self, isfile_mock): isfile_mock.return_value = False x = utils.get_file_stored_domain_id('/a/file') assert x is None with patch.object(builtins, 'open', mock_open( read_data="some_data\n")): isfile_mock.return_value = True x = utils.get_file_stored_domain_id('/a/file') self.assertEqual(x, 'some_data') @patch.object(utils, 'is_leader') def test_assess_status(self, is_leader): is_leader.return_value = True with patch.object(utils, 'assess_status_func') as asf: callee = MagicMock() asf.return_value = callee utils.assess_status('test-config') asf.assert_called_once_with('test-config') callee.assert_called_once_with() self.os_application_version_set.assert_called_with( utils.VERSION_PACKAGE ) self.assertTrue(self.os_application_status_set.called) @patch.object(utils, 'determine_ports') @patch.object(utils, 'services') @patch.object(utils, 'get_optional_interfaces') @patch.object(utils, 'REQUIRED_INTERFACES') @patch.object(utils, 'check_extra_for_assess_status') @patch.object(utils, 'get_managed_services_and_ports') @patch.object(utils, 'make_assess_status_func') def test_assess_status_func(self, make_assess_status_func, get_managed_services_and_ports, check_extra_for_assess_status, REQUIRED_INTERFACES, get_optional_interfaces, services, determine_ports): services.return_value = ['s1'] determine_ports.return_value = [200] get_managed_services_and_ports.return_value = (['s1'], [200]) REQUIRED_INTERFACES.copy.return_value = {'int': ['test 1']} get_optional_interfaces.return_value = {'opt': ['test 2']} utils.assess_status_func('test-config') make_assess_status_func.assert_called_once_with( 'test-config', {'int': ['test 1'], 'opt': ['test 2']}, charm_func=check_extra_for_assess_status, services=['s1'], ports=[200]) def test_pause_unit_helper(self): with patch.object(utils, '_pause_resume_helper') as prh: utils.pause_unit_helper('random-config') prh.assert_called_once_with(utils.pause_unit, 'random-config') with patch.object(utils, '_pause_resume_helper') as prh: utils.resume_unit_helper('random-config') prh.assert_called_once_with(utils.resume_unit, 'random-config') @patch.object(utils, 'determine_ports') @patch.object(utils, 'services') @patch.object(utils, 'get_managed_services_and_ports') def test_pause_resume_helper(self, get_managed_services_and_ports, services, determine_ports): f = MagicMock() get_managed_services_and_ports.return_value = (['s1'], [200]) with patch.object(utils, 'assess_status_func') as asf: asf.return_value = 'assessor' utils._pause_resume_helper(f, 'some-config') asf.assert_called_once_with('some-config') f.assert_called_once_with('assessor', services=['s1'], ports=[200]) @patch.object(utils, 'run_in_apache') @patch.object(utils, 'restart_keystone') def test_restart_function_map(self, restart_keystone, run_in_apache): run_in_apache.return_value = True self.assertEqual(utils.restart_function_map(), {'apache2': restart_keystone}) @patch.object(utils, 'stop_manager_instance') @patch.object(utils, 'is_unit_paused_set') def test_restart_keystone_unit_paused(self, mock_is_unit_paused_set, mock_stop_manager_instance): mock_is_unit_paused_set.return_value = True utils.restart_keystone() mock_stop_manager_instance.assert_not_called() @patch.object(utils, 'snap_install_requested') @patch.object(utils, 'service_restart') @patch.object(utils, 'stop_manager_instance') @patch.object(utils, 'is_unit_paused_set') def test_restart_keystone_unit_not_paused_snap_install( self, mock_is_unit_paused_set, mock_stop_manager_instance, mock_service_restart, mock_snap_install_requested): mock_is_unit_paused_set.return_value = False mock_snap_install_requested.return_value = True utils.restart_keystone() mock_service_restart.assert_called_once_with('snap.keystone.*') mock_stop_manager_instance.assert_called_once_with() @patch.object(utils, 'run_in_apache') @patch.object(utils, 'snap_install_requested') @patch.object(utils, 'service_restart') @patch.object(utils, 'stop_manager_instance') @patch.object(utils, 'is_unit_paused_set') def test_restart_keystone_unit_not_paused_legacy( self, mock_is_unit_paused_set, mock_stop_manager_instance, mock_service_restart, mock_snap_install_requested, mock_run_in_apache): mock_is_unit_paused_set.return_value = False mock_snap_install_requested.return_value = False mock_run_in_apache.return_value = False utils.restart_keystone() mock_service_restart.assert_called_once_with('keystone') mock_stop_manager_instance.assert_called_once_with() @patch.object(utils, 'run_in_apache') @patch.object(utils, 'snap_install_requested') @patch.object(utils, 'restart_pid_check') @patch.object(utils, 'stop_manager_instance') @patch.object(utils, 'is_unit_paused_set') def test_restart_keystone_unit_not_paused( self, mock_is_unit_paused_set, mock_stop_manager_instance, mock_restart_pid_check, mock_snap_install_requested, mock_run_in_apache): mock_is_unit_paused_set.return_value = False mock_snap_install_requested.return_value = False mock_run_in_apache.return_value = True utils.restart_keystone() mock_restart_pid_check.assert_called_once_with('apache2') mock_stop_manager_instance.assert_called_once_with() def test_restart_pid_check(self): self.subprocess.call.return_value = 1 utils.restart_pid_check('apache2') self.service_stop.assert_called_once_with('apache2') self.service_start.assert_called_once_with('apache2') self.subprocess.call.assert_called_once_with( ['pgrep', 'apache2', '--nslist', 'pid', '--ns', str(os.getpid())] ) def test_restart_pid_check_ptable_string(self): self.subprocess.call.return_value = 1 utils.restart_pid_check('apache2', ptable_string='httpd') self.service_stop.assert_called_once_with('apache2') self.service_start.assert_called_once_with('apache2') self.subprocess.call.assert_called_once_with( ['pgrep', 'httpd', '--nslist', 'pid', '--ns', str(os.getpid())] ) # Do not sleep() to speed up manual runs. @patch('charmhelpers.core.decorators.time') def test_restart_pid_check_ptable_string_retry(self, mock_time): call_returns = [1, 0, 0] self.subprocess.call.side_effect = lambda x: call_returns.pop() utils.restart_pid_check('apache2', ptable_string='httpd') self.service_stop.assert_called_once_with('apache2') self.service_start.assert_called_once_with('apache2') # self.subprocess.call.assert_called_once_with(['pgrep', 'httpd']) expected = [ call(['pgrep', 'httpd', '--nslist', 'pid', '--ns', str(os.getpid())]), call(['pgrep', 'httpd', '--nslist', 'pid', '--ns', str(os.getpid())]), call(['pgrep', 'httpd', '--nslist', 'pid', '--ns', str(os.getpid())]) ] self.assertEqual(self.subprocess.call.call_args_list, expected) def test_get_requested_grants(self): settings = {'requested_grants': 'Admin,Member'} expected_results = ['Admin', 'Member'] self.assertEqual(utils.get_requested_grants(settings), expected_results) settings = {'not_requsted_grants': 'something else'} expected_results = [] self.assertEqual(utils.get_requested_grants(settings), expected_results) @patch.object(utils, 'https') def test_get_protocol(self, https): # http https.return_value = False protocol = utils.get_protocol() self.assertEqual(protocol, 'http') # https https.return_value = True protocol = utils.get_protocol() self.assertEqual(protocol, 'https') @patch.object(utils, 'get_manager') def test_add_credentials_keystone_not_ready(self, get_manager): """ Verify add_credentials_to_keystone when the relation data is incomplete """ relation_id = 'identity-credentials:0' remote_unit = 'unit/0' self.relation_get.return_value = {} utils.add_credentials_to_keystone( relation_id=relation_id, remote_unit=remote_unit) self.log.assert_called_with('identity-credentials peer has not yet ' 'set username') @patch.object(utils, 'set_service_password') @patch.object(utils, 'get_service_password') @patch.object(utils, 'create_user_credentials') @patch.object(utils, 'get_protocol') @patch.object(utils, 'resolve_address') @patch.object(utils, 'get_api_version') @patch.object(utils, 'get_manager') def test_add_credentials_keystone_username_only(self, get_manager, get_api_version, resolve_address, get_protocol, create_user_credentials, get_callback, set_callback): """ Verify add_credentials with only username """ manager = MagicMock() manager.resolve_tenant_id.return_value = 'abcdef0123456789' get_manager.return_value = manager remote_unit = 'unit/0' relation_id = 'identity-credentials:0' get_api_version.return_value = 2 get_protocol.return_value = 'http' resolve_address.return_value = '10.10.10.10' create_user_credentials.return_value = 'password' self.relation_get.return_value = {'username': 'requester'} self.get_service_password.return_value = 'password' self.get_requested_roles.return_value = [] self.test_config.set('admin-port', 80) self.test_config.set('service-port', 81) self.test_config.set('service-tenant', 'services') relation_data = {'auth_host': '10.10.10.10', 'credentials_host': '10.10.10.10', 'credentials_port': 81, 'auth_port': 80, 'auth_protocol': 'http', 'credentials_username': 'requester', 'credentials_protocol': 'http', 'credentials_password': 'password', 'credentials_project': 'services', 'credentials_project_id': 'abcdef0123456789', 'region': 'RegionOne', 'api_version': 2} utils.add_credentials_to_keystone( relation_id=relation_id, remote_unit=remote_unit) create_user_credentials.assert_called_with('requester', get_callback, set_callback, domain=None, new_roles=[], grants=['Admin'], tenant='services') self.peer_store_and_set.assert_called_with(relation_id=relation_id, **relation_data) @patch.object(utils, 'set_service_password') @patch.object(utils, 'get_service_password') @patch.object(utils, 'create_user_credentials') @patch.object(utils, 'get_protocol') @patch.object(utils, 'resolve_address') @patch.object(utils, 'get_api_version') @patch.object(utils, 'get_manager') def test_add_credentials_keystone_kv3(self, get_manager, get_api_version, resolve_address, get_protocol, create_user_credentials, get_callback, set_callback): """ Verify add_credentials with Keystone V3 """ manager = MagicMock() manager.resolve_tenant_id.return_value = 'abcdef0123456789' manager.resolve_domain_id.return_value = 'a-domain-id' get_manager.return_value = manager remote_unit = 'unit/0' relation_id = 'identity-credentials:0' get_api_version.return_value = 3 get_protocol.return_value = 'http' resolve_address.return_value = '10.10.10.10' create_user_credentials.return_value = 'password' self.relation_get.return_value = {'username': 'requester', 'domain': 'Non-Default'} self.get_service_password.return_value = 'password' self.get_requested_roles.return_value = [] self.test_config.set('admin-port', 80) self.test_config.set('service-port', 81) relation_data = {'auth_host': '10.10.10.10', 'credentials_host': '10.10.10.10', 'credentials_port': 81, 'auth_port': 80, 'auth_protocol': 'http', 'credentials_username': 'requester', 'credentials_protocol': 'http', 'credentials_password': 'password', 'credentials_project': 'services', 'credentials_project_id': 'abcdef0123456789', 'credentials_user_domain_id': 'a-domain-id', 'credentials_project_domain_id': 'a-domain-id', 'credentials_project_domain_name': 'Non-Default', 'credentials_user_domain_name': 'Non-Default', 'region': 'RegionOne', 'domain': 'Non-Default', 'api_version': 3} utils.add_credentials_to_keystone( relation_id=relation_id, remote_unit=remote_unit) create_user_credentials.assert_called_with('requester', get_callback, set_callback, domain='Non-Default', new_roles=[], grants=['Admin'], tenant='services') self.peer_store_and_set.assert_called_with(relation_id=relation_id, **relation_data) @patch.object(utils, 'set_service_password') @patch.object(utils, 'get_service_password') @patch.object(utils, 'create_tenant') @patch.object(utils, 'create_user_credentials') @patch.object(utils, 'get_protocol') @patch.object(utils, 'resolve_address') @patch.object(utils, 'get_api_version') @patch.object(utils, 'get_manager') def test_add_credentials_keystone_roles_grants(self, get_manager, get_api_version, resolve_address, get_protocol, create_user_credentials, create_tenant, get_callback, set_callback): """ Verify add_credentials with all relation settings """ manager = MagicMock() manager.resolve_tenant_id.return_value = 'abcdef0123456789' get_manager.return_value = manager remote_unit = 'unit/0' relation_id = 'identity-credentials:0' get_api_version.return_value = 2 get_protocol.return_value = 'http' resolve_address.return_value = '10.10.10.10' create_user_credentials.return_value = 'password' self.relation_get.return_value = {'username': 'requester', 'project': 'myproject', 'requested_roles': 'New,Member', 'requested_grants': 'New,Member'} self.get_service_password.return_value = 'password' self.get_requested_roles.return_value = ['New', 'Member'] self.test_config.set('admin-port', 80) self.test_config.set('service-port', 81) relation_data = {'auth_host': '10.10.10.10', 'credentials_host': '10.10.10.10', 'credentials_port': 81, 'auth_port': 80, 'auth_protocol': 'http', 'credentials_username': 'requester', 'credentials_protocol': 'http', 'credentials_password': 'password', 'credentials_project': 'myproject', 'credentials_project_id': 'abcdef0123456789', 'region': 'RegionOne', 'api_version': 2} utils.add_credentials_to_keystone( relation_id=relation_id, remote_unit=remote_unit) create_tenant.assert_called_with('myproject', None) create_user_credentials.assert_called_with('requester', get_callback, set_callback, domain=None, new_roles=['New', 'Member'], grants=['New', 'Member'], tenant='myproject') self.peer_store_and_set.assert_called_with(relation_id=relation_id, **relation_data) @patch.object(utils.os, 'remove') @patch.object(utils.os.path, 'exists') def test_disable_unused_apache_sites(self, os_path_exists, os_remove): utils.UNUSED_APACHE_SITE_FILES = ['/path/sitename.conf'] # Files do not exist os_path_exists.return_value = False utils.disable_unused_apache_sites() self.subprocess.check_call.assert_not_called() # Files exist os_path_exists.return_value = True utils.disable_unused_apache_sites() self.subprocess.check_call.assert_called_with( ['a2dissite', 'sitename'] ) # Force remove os_path_exists.return_value = True self.subprocess.CalledProcessError = subprocess.CalledProcessError self.subprocess.check_call.side_effect = subprocess.CalledProcessError( 1, 'a2dissite') utils.disable_unused_apache_sites() os_remove.assert_called_with(utils.UNUSED_APACHE_SITE_FILES[0]) def test_run_in_apache_kilo(self): self.os_release.return_value = 'kilo' self.assertFalse(utils.run_in_apache()) def test_run_in_apache_liberty(self): self.os_release.return_value = 'liberty' self.assertTrue(utils.run_in_apache()) def test_run_in_apache_set_release(self): self.os_release.return_value = 'kilo' self.assertTrue(utils.run_in_apache(release='liberty')) def test_get_api_version_icehouse(self): self.assertEqual(utils.get_api_version(), 2) def test_get_api_version_queens(self): self.get_os_codename_install_source.return_value = 'queens' self.assertEqual(utils.get_api_version(), 3) def test_get_api_version_invalid_option_value(self): self.test_config.set('preferred-api-version', 4) with self.assertRaises(ValueError): utils.get_api_version() def test_get_api_version_queens_invalid_option_value(self): self.test_config.set('preferred-api-version', 2) self.get_os_codename_install_source.return_value = 'queens' with self.assertRaises(ValueError): utils.get_api_version() @patch.object(utils, 'is_leader') @patch('os.path.exists') def test_key_setup(self, mock_path_exists, mock_is_leader): base_cmd = ['sudo', '-u', 'keystone', 'keystone-manage'] mock_is_leader.return_value = True mock_path_exists.return_value = False with patch.object(builtins, 'open', mock_open()) as m: utils.key_setup() m.assert_called_once_with(utils.KEY_SETUP_FILE, "w") self.subprocess.check_output.has_calls( [ base_cmd + ['fernet_setup'], base_cmd + ['credential_setup'], base_cmd + ['credential_migrate'], ]) mock_path_exists.assert_called_once_with(utils.KEY_SETUP_FILE) mock_is_leader.assert_called_once_with() def test_fernet_rotate(self): cmd = ['sudo', '-u', 'keystone', 'keystone-manage', 'fernet_rotate'] utils.fernet_rotate() self.subprocess.check_output.called_with(cmd) @patch.object(utils, 'is_leader') @patch.object(utils, 'leader_get') @patch.object(utils, 'leader_set') @patch('os.listdir') def test_key_leader_set(self, listdir, leader_set, leader_get, is_leader): listdir.return_value = ['0', '1'] is_leader.return_value = True leader_get.return_value = None self.time.time.return_value = "the-time" with patch.object(builtins, 'open', mock_open( read_data="some_data")): utils.key_leader_set() listdir.has_calls([ call(utils.FERNET_KEY_REPOSITORY), call(utils.CREDENTIAL_KEY_REPOSITORY)]) leader_set.assert_called_with( {'key_repository': json.dumps( {utils.FERNET_KEY_REPOSITORY: {'0': 'some_data', '1': 'some_data'}, utils.CREDENTIAL_KEY_REPOSITORY: {'0': 'some_data', '1': 'some_data'}}) }) @patch.object(utils, 'is_leader') @patch.object(utils, 'leader_get') @patch.object(utils, 'leader_set') @patch('os.listdir') def test_key_leader_set_no_change( self, listdir, leader_set, leader_get, is_leader): listdir.return_value = ['0', '1'] is_leader.return_value = True leader_get.return_value = json.dumps( {utils.FERNET_KEY_REPOSITORY: {'0': 'some_data', '1': 'some_data'}, utils.CREDENTIAL_KEY_REPOSITORY: {'0': 'some_data', '1': 'some_data'}}) self.time.time.return_value = "the-time" with patch.object(builtins, 'open', mock_open( read_data="some_data")): utils.key_leader_set() listdir.has_calls([ call(utils.FERNET_KEY_REPOSITORY), call(utils.CREDENTIAL_KEY_REPOSITORY)]) leader_set.assert_not_called() @patch.object(utils, '_file_equal_to') @patch.object(utils, "is_leader") @patch('os.rename') @patch.object(utils, 'leader_get') @patch('os.listdir') @patch('os.remove') def test_key_write( self, remove, listdir, leader_get, rename, is_leader, mock_file_equal_to ): mock_file_equal_to.return_value = False is_leader.return_value = False leader_get.return_value = json.dumps( {utils.FERNET_KEY_REPOSITORY: {'0': 'key0', '1': 'key1'}, utils.CREDENTIAL_KEY_REPOSITORY: {'0': 'key0', '1': 'key1'}}) listdir.return_value = ['0', '1', '2'] with patch.object(builtins, 'open', mock_open()) as m: utils.key_write() m.assert_called_with(utils.KEY_SETUP_FILE, "w") self.mkdir.has_calls([call(utils.CREDENTIAL_KEY_REPOSITORY, owner='keystone', group='keystone', perms=0o700), call(utils.FERNET_KEY_REPOSITORY, owner='keystone', group='keystone', perms=0o700)]) tmp_fernet_dir = utils.FERNET_KEY_REPOSITORY + ".tmp" tmp_creds_dir = utils.CREDENTIAL_KEY_REPOSITORY + ".tmp" # note 'any_order=True' as we are dealing with dictionaries in Py27 self.write_file.assert_has_calls( [ call(os.path.join(tmp_creds_dir, '0'), u'key0', owner='keystone', group='keystone', perms=0o600), call(os.path.join(tmp_creds_dir, '1'), u'key1', owner='keystone', group='keystone', perms=0o600), call(os.path.join(tmp_fernet_dir, '0'), u'key0', owner='keystone', group='keystone', perms=0o600), call(os.path.join(tmp_fernet_dir, '1'), u'key1', owner='keystone', group='keystone', perms=0o600), ], any_order=True) rename.assert_has_calls( [ call(os.path.join(tmp_creds_dir, '0'), os.path.join(utils.CREDENTIAL_KEY_REPOSITORY, '0')), call(os.path.join(tmp_creds_dir, '1'), os.path.join(utils.CREDENTIAL_KEY_REPOSITORY, '1')), call(os.path.join(tmp_fernet_dir, '0'), os.path.join(utils.FERNET_KEY_REPOSITORY, '0')), call(os.path.join(tmp_fernet_dir, '1'), os.path.join(utils.FERNET_KEY_REPOSITORY, '1')), ], any_order=True) @patch.object(utils, 'keystone_context') @patch.object(utils, 'fernet_rotate') @patch.object(utils, 'key_leader_set') @patch.object(utils, 'os') @patch.object(utils, 'is_leader') def test_fernet_keys_rotate_and_sync(self, mock_is_leader, mock_os, mock_key_leader_set, mock_fernet_rotate, mock_keystone_context): self.test_config.set('fernet-max-active-keys', 3) self.test_config.set('token-expiration', 60) self.time.time.return_value = 0 # if not leader shouldn't do anything mock_is_leader.return_value = False utils.fernet_keys_rotate_and_sync() mock_os.stat.assert_not_called() # shouldn't do anything as the token provider is wrong mock_keystone_context.fernet_enabled.return_value = False mock_is_leader.return_value = True utils.fernet_keys_rotate_and_sync() mock_os.stat.assert_not_called() # fail gracefully if key repository is not initialized mock_keystone_context.fernet_enabled.return_value = True mock_os.stat.side_effect = Exception() with self.assertRaises(Exception): utils.fernet_keys_rotate_and_sync() self.time.time.assert_not_called() mock_os.stat.side_effect = None # now set up the times, so that it still shouldn't be called. self.time.time.return_value = 30 self.time.gmtime = time.gmtime self.time.asctime = time.asctime _stat = MagicMock() _stat.st_mtime = 10 mock_os.stat.return_value = _stat mock_key_leader_set.reset_mock() utils.fernet_keys_rotate_and_sync(log_func=self.log) self.log.assert_called_once_with( 'No rotation until at least Thu Jan 1 00:01:10 1970, ' 'but checking keys are set in leader settings.', level='DEBUG') # key_leader_set is always called once, as even if keys are not rotated # it's needs to be certain that they are in the leader settings mock_key_leader_set.assert_called_once_with() # finally, set it up so that the rotation and sync occur self.time.time.return_value = 71 mock_key_leader_set.reset_mock() utils.fernet_keys_rotate_and_sync() mock_fernet_rotate.assert_called_once_with() mock_key_leader_set.assert_called_once_with() @patch.object(utils, 'resource_map') @patch.object(utils.os.path, 'isdir') def test_restart_map(self, osp_isdir, resource_map): rsc_map = collections.OrderedDict([ ('file1', { 'services': ['svc1'], 'contexts': ['ctxt1']})]) resource_map.return_value = rsc_map osp_isdir.return_value = False self.assertEqual( utils.restart_map(), collections.OrderedDict([ ('file1', ['svc1'])])) osp_isdir.return_value = True self.assertEqual( utils.restart_map(), collections.OrderedDict([ ('file1', ['svc1']), ('/etc/apache2/ssl/keystone/*', ['apache2'])])) def test_assemble_endpoints(self): data = { "egress-subnets": "10.5.0.16/32", "ingress-address": "10.5.0.16", "neutron_admin_url": "http://10.5.0.16:9696", "neutron_internal_url": "http://10.5.1.16:9696", "neutron_public_url": "http://10.5.2.16:9696", "neutron_region": "RegionOne", "neutron_service": "neutron", "private-address": "10.5.3.16", "relation_trigger": "821bd014-3189-4062-a004-5b286335bcef", } expected = { 'neutron': {'service': 'neutron', 'admin_url': "http://10.5.0.16:9696", 'internal_url': "http://10.5.1.16:9696", 'public_url': "http://10.5.2.16:9696", 'region': 'RegionOne'}, # these are leftovers which are present in the response 'egress-subnets': {'': "10.5.0.16/32"}, 'ingress-address': {'': "10.5.0.16"}, 'relation': {'trigger': "821bd014-3189-4062-a004-5b286335bcef"}, 'private-address': {'': "10.5.3.16"} } endpoints = utils.assemble_endpoints(data) self.assertDictEqual(dict(endpoints), expected) def test_endpoints_checksum(self): settings = {'service': 'neutron', 'admin_url': "http://10.5.0.16:9696", 'internal_url': "http://10.5.1.16:9696", 'public_url': "http://10.5.2.16:9696", 'region': 'RegionOne'} csum = utils.endpoints_checksum(settings) self.assertEqual(csum, ('d938ff5656d3d9b50345e8061b4c73c8' '116a9c7fbc087765ce2e3a4a5df7cb17')) @patch.object(utils, 'leader_get') def test_get_charm_credentials(self, mock_leader_get): expect = ks_types.CharmCredentials( '_charm-keystone-admin', 'fakepassword', 'all', 'admin', 'default', 'default') mock_leader_get.return_value = 'fakepassword' self.assertEquals(utils.get_charm_credentials(), expect) mock_leader_get.assert_called_once_with(expect.username + '_passwd') mock_leader_get.retrun_value = None mock_leader_get.side_effect = [None] with self.assertRaises(RuntimeError): utils.get_charm_credentials() @patch.object(utils, 'leader_get') def test_is_bootstrapped(self, mock_leader_get): mock_leader_get.side_effect = [None, None] self.assertFalse(utils.is_bootstrapped()) mock_leader_get.assert_called_once_with('keystone-bootstrapped') mock_leader_get.side_effect = [True, None] self.assertFalse(utils.is_bootstrapped()) mock_leader_get.side_effect = [True, 'fakepassword'] self.assertTrue(utils.is_bootstrapped()) mock_leader_get.assert_called_with('_charm-keystone-admin_passwd') @patch.object(utils, 'get_manager') @patch.object(utils, 'is_leader') @patch.object(utils, 'leader_set') @patch.object(utils, 'resolve_address') @patch.object(utils, 'endpoint_url') @patch.object(utils, 'pwgen') @patch.object(utils, 'leader_get') @patch.object(utils, 'get_api_suffix') def test_bootstrap_keystone( self, mock_get_api_suffix, mock_leader_get, mock_pwgen, mock_endpoint_url, mock_resolve_address, mock_leader_set, mock_is_leader, mock_get_manager): configs = MagicMock() mock_get_api_suffix.return_value = 'suffix' mock_resolve_address.side_effect = lambda x: x mock_endpoint_url.side_effect = ( lambda x, y, z: 'http://{}:{}/{}'.format(x, y, z)) mock_leader_get.return_value = 'fakepassword' mock_is_leader.return_value = True mock_get_manager().resolve_user_id.return_value = 'fakeid' self.os_release.return_value = 'queens' utils.bootstrap_keystone(configs=configs) self.subprocess.check_call.assert_called_once_with( ('keystone-manage', 'bootstrap', '--bootstrap-username', '_charm-keystone-admin', '--bootstrap-password', 'fakepassword', '--bootstrap-project-name', 'admin', '--bootstrap-role-name', 'Admin', '--bootstrap-service-name', 'keystone', '--bootstrap-admin-url', 'http://admin:35357/suffix', '--bootstrap-public-url', 'http://public:5000/suffix', '--bootstrap-internal-url', 'http://int:5000/suffix', '--bootstrap-region-id', 'RegionOne'), ) mock_leader_set.assert_called_once_with({ 'keystone-bootstrapped': True, '_charm-keystone-admin_passwd': 'fakepassword'}) self.assertFalse(configs.write_all.called) mock_leader_set.reset_mock() self.os_release.return_value = 'pike' utils.bootstrap_keystone(configs=configs) mock_leader_set.assert_has_calls([ call({'keystone-bootstrapped': True, '_charm-keystone-admin_passwd': 'fakepassword'}), call({'transitional_charm_user_id': 'fakeid'}), ]) configs.write_all.assert_called_once_with() def test_get_real_role_name(self): def _resolve_role_name(role_name): roles = {'member': 'Member'} return roles.get(role_name) manager = MagicMock() manager.resolve_role_name.side_effect = _resolve_role_name self.assertEqual( utils.get_real_role_names( ['member', 'MissingRole'], manager), ['Member'])