507 lines
18 KiB
Python
507 lines
18 KiB
Python
# Copyright 2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
from eventlet import Timeout
|
|
import mock
|
|
import oslo_messaging as messaging
|
|
from oslo_messaging.rpc.client import RemoteError
|
|
from testtools.matchers import Is
|
|
|
|
import trove.common.context as context
|
|
from trove.common import exception
|
|
from trove.common.remote import guest_client
|
|
from trove.guestagent import api
|
|
from trove import rpc
|
|
from trove.tests.unittests import trove_testtools
|
|
|
|
REPLICATION_SNAPSHOT = {'master': {'id': '123', 'host': '192.168.0.1',
|
|
'port': 3306},
|
|
'dataset': {},
|
|
'binlog_position': 'binpos'}
|
|
|
|
RPC_API_VERSION = '1.0'
|
|
|
|
|
|
def _mock_call_pwd_change(cmd, version=None, users=None):
|
|
if users == 'dummy':
|
|
return True
|
|
else:
|
|
raise BaseException("Test Failed")
|
|
|
|
|
|
def _mock_call(cmd, timeout, version=None, username=None, hostname=None,
|
|
database=None, databases=None):
|
|
# To check get_user, list_access, grant_access, revoke_access in cmd.
|
|
if cmd in ('get_user', 'list_access', 'grant_access', 'revoke_access'):
|
|
return True
|
|
else:
|
|
raise BaseException("Test Failed")
|
|
|
|
|
|
class ApiTest(trove_testtools.TestCase):
|
|
@mock.patch.object(rpc, 'get_client')
|
|
@mock.patch('trove.instance.models.get_instance_encryption_key',
|
|
return_value='2LMDgren5citVxmSYNiRFCyFfVDjJtDaQT9LYV08')
|
|
def setUp(self, mock_get_encryption_key, *args):
|
|
super(ApiTest, self).setUp()
|
|
self.context = context.TroveContext()
|
|
self.guest = api.API(self.context, 0)
|
|
self.guest._cast = _mock_call_pwd_change
|
|
self.guest._call = _mock_call
|
|
self.api = api.API(self.context, "instance-id-x23d2d")
|
|
self._mock_rpc_client()
|
|
mock_get_encryption_key.assert_called()
|
|
|
|
def test_change_passwords(self):
|
|
self.assertIsNone(self.guest.change_passwords("dummy"))
|
|
|
|
def test_get_user(self):
|
|
self.assertTrue(self.guest.get_user("dummyname", "dummyhost"))
|
|
|
|
def test_list_access(self):
|
|
self.assertTrue(self.guest.list_access("dummyname", "dummyhost"))
|
|
|
|
def test_grant_access(self):
|
|
self.assertTrue(self.guest.grant_access("dumname", "dumhost", "dumdb"))
|
|
|
|
def test_revoke_access(self):
|
|
self.assertTrue(self.guest.revoke_access("dumname", "dumhost",
|
|
"dumdb"))
|
|
|
|
def test_get_routing_key(self):
|
|
self.assertEqual('guestagent.instance-id-x23d2d',
|
|
self.api._get_routing_key())
|
|
|
|
def test_update_attributes(self):
|
|
self.api.update_attributes('test_user', '%', {'name': 'new_user'})
|
|
|
|
self._verify_rpc_prepare_before_cast()
|
|
self._verify_cast('update_attributes', username='test_user',
|
|
hostname='%', user_attrs={'name': 'new_user'})
|
|
|
|
def test_create_user(self):
|
|
self.api.create_user('test_user')
|
|
|
|
self._verify_rpc_prepare_before_cast()
|
|
self._verify_cast('create_user', users='test_user')
|
|
|
|
@mock.patch('trove.guestagent.api.LOG')
|
|
def test_api_cast_exception(self, mock_logging):
|
|
self.call_context.cast.side_effect = IOError('host down')
|
|
self.assertRaises(exception.GuestError, self.api.create_user,
|
|
'test_user')
|
|
|
|
@mock.patch('trove.guestagent.api.LOG')
|
|
def test_api_call_exception(self, mock_logging):
|
|
self.call_context.call.side_effect = IOError('host_down')
|
|
self.assertRaises(exception.GuestError, self.api.list_users)
|
|
|
|
def test_api_call_timeout(self):
|
|
self.call_context.call.side_effect = Timeout()
|
|
self.assertRaises(exception.GuestTimeout, self.api.restart)
|
|
|
|
@mock.patch('trove.guestagent.api.LOG')
|
|
def test_api_cast_remote_error(self, mock_logging):
|
|
self.call_context.cast.side_effect = RemoteError('Error')
|
|
self.assertRaises(exception.GuestError, self.api.delete_database,
|
|
'test_db')
|
|
|
|
@mock.patch('trove.guestagent.api.LOG')
|
|
def test_api_call_remote_error(self, mock_logging):
|
|
self.call_context.call.side_effect = RemoteError('Error')
|
|
self.assertRaises(exception.GuestError, self.api.stop_db)
|
|
|
|
def test_list_users(self):
|
|
exp_resp = ['user1', 'user2', 'user3']
|
|
self.call_context.call.return_value = exp_resp
|
|
|
|
resp = self.api.list_users()
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('list_users', limit=None, marker=None,
|
|
include_marker=False)
|
|
self.assertEqual(exp_resp, resp)
|
|
|
|
def test_delete_user(self):
|
|
self.api.delete_user('test_user')
|
|
|
|
self._verify_rpc_prepare_before_cast()
|
|
self._verify_cast('delete_user', user='test_user')
|
|
|
|
def test_create_database(self):
|
|
databases = ['db1', 'db2', 'db3']
|
|
self.api.create_database(databases)
|
|
|
|
self._verify_rpc_prepare_before_cast()
|
|
self.call_context.cast.assert_called_once_with(
|
|
self.context, "create_database", databases=databases)
|
|
|
|
def test_list_databases(self):
|
|
exp_resp = ['db1', 'db2', 'db3']
|
|
self.call_context.call.return_value = exp_resp
|
|
|
|
resp = self.api.list_databases(
|
|
limit=1, marker=2, include_marker=False)
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call("list_databases", limit=1, marker=2,
|
|
include_marker=False)
|
|
self.assertEqual(exp_resp, resp)
|
|
|
|
def test_delete_database(self):
|
|
self.api.delete_database('test_database_name')
|
|
|
|
self._verify_rpc_prepare_before_cast()
|
|
self._verify_cast("delete_database", database='test_database_name')
|
|
|
|
def test_enable_root(self):
|
|
self.call_context.call.return_value = True
|
|
|
|
resp = self.api.enable_root()
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('enable_root')
|
|
self.assertThat(resp, Is(True))
|
|
|
|
def test_enable_root_with_password(self):
|
|
self.call_context.call.return_value = True
|
|
|
|
resp = self.api.enable_root_with_password()
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('enable_root_with_password', root_password=None)
|
|
self.assertThat(resp, Is(True))
|
|
|
|
def test_disable_root(self):
|
|
self.call_context.call.return_value = True
|
|
|
|
resp = self.api.disable_root()
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('disable_root')
|
|
self.assertThat(resp, Is(True))
|
|
|
|
def test_is_root_enabled(self):
|
|
self.call_context.call.return_value = False
|
|
|
|
resp = self.api.is_root_enabled()
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('is_root_enabled')
|
|
self.assertThat(resp, Is(False))
|
|
|
|
def test_get_hwinfo(self):
|
|
self.call_context.call.return_value = '[blah]'
|
|
|
|
resp = self.api.get_hwinfo()
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('get_hwinfo')
|
|
self.assertThat(resp, Is('[blah]'))
|
|
|
|
def test_rpc_ping(self):
|
|
# execute
|
|
self.api.rpc_ping()
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('rpc_ping')
|
|
|
|
def test_get_diagnostics(self):
|
|
self.call_context.call.return_value = '[all good]'
|
|
|
|
resp = self.api.get_diagnostics()
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('get_diagnostics')
|
|
self.assertThat(resp, Is('[all good]'))
|
|
|
|
def test_restart(self):
|
|
self.api.restart()
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('restart')
|
|
|
|
def test_start_db_with_conf_changes(self):
|
|
self.api.start_db_with_conf_changes(None)
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('start_db_with_conf_changes', config_contents=None)
|
|
|
|
def test_reset_configuration(self):
|
|
# execute
|
|
self.api.reset_configuration({'config_contents': 'some junk'})
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('reset_configuration',
|
|
configuration={'config_contents': 'some junk'})
|
|
|
|
def test_stop_db(self):
|
|
self.api.stop_db(do_not_start_on_reboot=False)
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('stop_db', do_not_start_on_reboot=False)
|
|
|
|
def test_get_volume_info(self):
|
|
exp_resp = {'fake': 'resp'}
|
|
self.call_context.call.return_value = exp_resp
|
|
|
|
resp = self.api.get_volume_info()
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('get_filesystem_stats', fs_path=None)
|
|
self.assertThat(resp, Is(exp_resp))
|
|
|
|
def test_update_guest(self):
|
|
self.api.update_guest()
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('update_guest')
|
|
|
|
def test_create_backup(self):
|
|
self.api.create_backup({'id': '123'})
|
|
|
|
self._verify_rpc_prepare_before_cast()
|
|
self._verify_cast('create_backup', backup_info={'id': '123'})
|
|
|
|
def test_unmount_volume(self):
|
|
# execute
|
|
self.api.unmount_volume('/dev/vdb', '/var/lib/mysql')
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('unmount_volume', device_path='/dev/vdb',
|
|
mount_point='/var/lib/mysql')
|
|
|
|
def test_mount_volume(self):
|
|
# execute
|
|
self.api.mount_volume('/dev/vdb', '/var/lib/mysql')
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('mount_volume', device_path='/dev/vdb',
|
|
mount_point='/var/lib/mysql')
|
|
|
|
def test_resize_fs(self):
|
|
# execute
|
|
self.api.resize_fs('/dev/vdb', '/var/lib/mysql')
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('resize_fs', device_path='/dev/vdb',
|
|
mount_point='/var/lib/mysql')
|
|
|
|
def test_update_overrides(self):
|
|
self.api.update_overrides('123')
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('update_overrides', overrides='123', remove=False)
|
|
|
|
def test_apply_overrides(self):
|
|
self.api.apply_overrides('123')
|
|
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('apply_overrides', overrides='123')
|
|
|
|
def test_get_replication_snapshot(self):
|
|
# execute
|
|
self.api.get_replication_snapshot({})
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('get_replication_snapshot', snapshot_info={},
|
|
replica_source_config=None)
|
|
|
|
def test_attach_replication_slave(self):
|
|
# execute
|
|
self.api.attach_replication_slave(REPLICATION_SNAPSHOT)
|
|
# verify
|
|
self._verify_rpc_prepare_before_cast()
|
|
self._verify_cast('attach_replication_slave',
|
|
snapshot=REPLICATION_SNAPSHOT, slave_config=None)
|
|
|
|
def test_detach_replica(self):
|
|
# execute
|
|
self.api.detach_replica()
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('detach_replica', for_failover=False)
|
|
|
|
def test_get_replica_context(self):
|
|
# execute
|
|
self.api.get_replica_context()
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('get_replica_context')
|
|
|
|
def test_attach_replica(self):
|
|
# execute
|
|
self.api.attach_replica(REPLICATION_SNAPSHOT, slave_config=None)
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('attach_replica',
|
|
replica_info=REPLICATION_SNAPSHOT, slave_config=None)
|
|
|
|
def test_make_read_only(self):
|
|
# execute
|
|
self.api.make_read_only(True)
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('make_read_only', read_only=True)
|
|
|
|
def test_enable_as_master(self):
|
|
# execute
|
|
self.api.enable_as_master({})
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('enable_as_master', replica_source_config={})
|
|
|
|
def test_get_txn_count(self):
|
|
# execute
|
|
self.api.get_txn_count()
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('get_txn_count')
|
|
|
|
def test_get_last_txn(self):
|
|
# execute
|
|
self.api.get_last_txn()
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('get_last_txn')
|
|
|
|
def test_get_latest_txn_id(self):
|
|
# execute
|
|
self.api.get_latest_txn_id()
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('get_latest_txn_id')
|
|
|
|
def test_wait_for_txn(self):
|
|
# execute
|
|
self.api.wait_for_txn("")
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('wait_for_txn', txn="")
|
|
|
|
def test_cleanup_source_on_replica_detach(self):
|
|
# execute
|
|
self.api.cleanup_source_on_replica_detach({'replication_user':
|
|
'test_user'})
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('cleanup_source_on_replica_detach',
|
|
replica_info={'replication_user': 'test_user'})
|
|
|
|
def test_demote_replication_master(self):
|
|
# execute
|
|
self.api.demote_replication_master()
|
|
# verify
|
|
self._verify_rpc_prepare_before_call()
|
|
self._verify_call('demote_replication_master')
|
|
|
|
@mock.patch.object(messaging, 'Target')
|
|
@mock.patch.object(rpc, 'get_server')
|
|
def test_prepare(self, *args):
|
|
self.api.prepare('2048', 'package1', 'db1', 'user1', '/dev/vdt',
|
|
'/mnt/opt', None, 'cont', '1-2-3-4',
|
|
'override', {'id': '2-3-4-5'})
|
|
|
|
self._verify_rpc_prepare_before_cast()
|
|
self._verify_cast(
|
|
'prepare', packages=['package1'], databases='db1',
|
|
memory_mb='2048', users='user1', device_path='/dev/vdt',
|
|
mount_point='/mnt/opt', backup_info=None,
|
|
config_contents='cont', root_password='1-2-3-4',
|
|
overrides='override', cluster_config={'id': '2-3-4-5'},
|
|
snapshot=None, modules=None)
|
|
|
|
@mock.patch.object(messaging, 'Target')
|
|
@mock.patch.object(rpc, 'get_server')
|
|
def test_prepare_with_backup(self, *args):
|
|
backup = {'id': 'backup_id_123'}
|
|
self.api.prepare('2048', 'package1', 'db1', 'user1', '/dev/vdt',
|
|
'/mnt/opt', backup, 'cont', '1-2-3-4',
|
|
'overrides', {"id": "2-3-4-5"}, modules=None)
|
|
|
|
self._verify_rpc_prepare_before_cast()
|
|
self._verify_cast(
|
|
'prepare', packages=['package1'], databases='db1',
|
|
memory_mb='2048', users='user1', device_path='/dev/vdt',
|
|
mount_point='/mnt/opt', backup_info=backup,
|
|
config_contents='cont', root_password='1-2-3-4',
|
|
overrides='overrides', cluster_config={'id': '2-3-4-5'},
|
|
snapshot=None, modules=None)
|
|
|
|
@mock.patch.object(messaging, 'Target')
|
|
@mock.patch.object(rpc, 'get_server')
|
|
def test_prepare_with_modules(self, *args):
|
|
modules = [{'id': 'mod_id'}]
|
|
self.api.prepare('2048', 'package1', 'db1', 'user1', '/dev/vdt',
|
|
'/mnt/opt', None, 'cont', '1-2-3-4',
|
|
'overrides', {"id": "2-3-4-5"}, modules=modules)
|
|
|
|
self._verify_rpc_prepare_before_cast()
|
|
self._verify_cast(
|
|
'prepare', packages=['package1'], databases='db1',
|
|
memory_mb='2048', users='user1', device_path='/dev/vdt',
|
|
mount_point='/mnt/opt', backup_info=None,
|
|
config_contents='cont', root_password='1-2-3-4',
|
|
overrides='overrides', cluster_config={'id': '2-3-4-5'},
|
|
snapshot=None, modules=modules)
|
|
|
|
def test_upgrade(self):
|
|
instance_version = "v1.0.1"
|
|
location = "http://swift/trove-guestagent-v1.0.1.tar.gz"
|
|
# execute
|
|
self.api.upgrade(instance_version, location)
|
|
# verify
|
|
self._verify_rpc_prepare_before_cast()
|
|
self._verify_cast(
|
|
'upgrade', instance_version=instance_version,
|
|
location=location, metadata=None)
|
|
|
|
def _verify_rpc_prepare_before_call(self):
|
|
self.api.client.prepare.assert_called_once_with(
|
|
version=RPC_API_VERSION, timeout=mock.ANY)
|
|
|
|
def _verify_rpc_prepare_before_cast(self):
|
|
self.api.client.prepare.assert_called_once_with(
|
|
version=RPC_API_VERSION)
|
|
|
|
def _verify_cast(self, *args, **kwargs):
|
|
self.call_context.cast.assert_called_once_with(self.context, *args,
|
|
**kwargs)
|
|
|
|
def _verify_call(self, *args, **kwargs):
|
|
self.call_context.call.assert_called_once_with(self.context, *args,
|
|
**kwargs)
|
|
|
|
def _mock_rpc_client(self):
|
|
self.call_context = mock.Mock()
|
|
self.api.client.prepare = mock.Mock(return_value=self.call_context)
|
|
self.call_context.call = mock.Mock()
|
|
self.call_context.cast = mock.Mock()
|
|
|
|
|
|
class ApiStrategyTest(trove_testtools.TestCase):
|
|
|
|
@mock.patch('trove.guestagent.api.API.__init__',
|
|
mock.Mock(return_value=None))
|
|
def test_guest_client_mongodb(self):
|
|
client = guest_client(mock.Mock(), mock.Mock(), 'mongodb')
|
|
self.assertFalse(hasattr(client, 'add_config_servers2'))
|
|
self.assertTrue(callable(client.add_config_servers))
|
|
|
|
@mock.patch('trove.guestagent.api.API.__init__',
|
|
mock.Mock(return_value=None))
|
|
def test_guest_client_vertica(self):
|
|
client = guest_client(mock.Mock(), mock.Mock(), 'vertica')
|
|
self.assertFalse(hasattr(client, 'get_public_keys2'))
|
|
self.assertTrue(callable(client.get_public_keys))
|