trove/trove/tests/unittests/taskmanager/test_models.py

750 lines
31 KiB
Python

# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import testtools
from mock import Mock, MagicMock, patch
from testtools.matchers import Equals, Is
from cinderclient import exceptions as cinder_exceptions
from novaclient import exceptions as nova_exceptions
import novaclient.v1_1.servers
import novaclient.v1_1.flavors
import cinderclient.v2.client as cinderclient
from oslo.utils import timeutils
import trove.backup.models
import trove.common.context
from trove.datastore import models as datastore_models
import trove.db.models
from trove.taskmanager import models as taskmanager_models
import trove.guestagent.api
from trove.backup import models as backup_models
from trove.backup import state
from trove.common import remote
from trove.common.exception import GuestError
from trove.common.exception import PollTimeOut
from trove.common.exception import TroveError
from trove.common.exception import MalformedSecurityGroupRuleError
from trove.common.instance import ServiceStatuses
from trove.extensions.mysql import models as mysql_models
from trove.instance.models import InstanceServiceStatus
from trove.instance.models import InstanceStatus
from trove.instance.models import DBInstance
from trove.instance.tasks import InstanceTasks
from trove.tests.unittests.util import util
from trove.common import utils
from trove import rpc
from swiftclient.client import ClientException
from tempfile import NamedTemporaryFile
import os
import trove.common.template as template
import uuid
# Identifier given to the DBInstance fixture built in
# BuiltInstanceTasksTest.setUp.
INST_ID = 'dbinst-id-1'
# Volume identifier attached to that same DBInstance fixture.
VOLUME_ID = 'volume-id-1'
class FakeOptGroup(object):
    """Stand-in for a config option group that exposes port lists.

    The tests in this module return it from a mocked ``CONF.get``.

    :param tcp_ports: list of TCP port / port-range strings; when omitted
        a fresh ``['3306', '3301-3307']`` list is used.
    :param udp_ports: list of UDP port / port-range strings; when omitted
        a fresh empty list is used.
    """

    def __init__(self, tcp_ports=None, udp_ports=None):
        # Defaults are built per call: list literals in the signature would
        # be shared, so a mutation through one instance would leak into
        # every later instance created with the defaults.
        if tcp_ports is None:
            tcp_ports = ['3306', '3301-3307']
        if udp_ports is None:
            udp_ports = []
        self.tcp_ports = tcp_ports
        self.udp_ports = udp_ports
class fake_Server:
    """Plain attribute holder standing in for a compute server record."""

    def __init__(self):
        # Every descriptive field starts unset; only the status carries a
        # value from the outset.
        for field in ('id', 'name', 'image_id', 'flavor_id', 'files',
                      'userdata', 'security_groups',
                      'block_device_mapping'):
            setattr(self, field, None)
        self.status = 'ACTIVE'
class fake_ServerManager:
    """Server manager double whose ``create`` echoes its arguments."""

    def create(self, name, image_id, flavor_id, files, userdata,
               security_groups, block_device_mapping, availability_zone=None,
               nics=None, config_drive=False):
        """Return a ``fake_Server`` populated from the call arguments.

        The id is always ``"server_id"``. ``config_drive`` is accepted but
        not recorded on the returned object.
        """
        created = fake_Server()
        created.id = "server_id"
        requested = (
            ('name', name),
            ('image_id', image_id),
            ('flavor_id', flavor_id),
            ('files', files),
            ('userdata', userdata),
            ('security_groups', security_groups),
            ('block_device_mapping', block_device_mapping),
            ('availability_zone', availability_zone),
            ('nics', nics),
        )
        for attr_name, attr_value in requested:
            setattr(created, attr_name, attr_value)
        return created
class fake_nova_client:
    """Compute client double exposing only a ``servers`` manager."""

    def __init__(self):
        manager = fake_ServerManager()
        self.servers = manager
class fake_InstanceServiceStatus(object):
    """In-memory double for a service-status record.

    ``find_by`` hands out one lazily created, class-wide instance, so
    state written through one lookup is visible through the next.
    """

    # Shared instance returned by find_by(); created on first use.
    _instance = None

    def __init__(self):
        self.deleted = False
        self.status = None

    def set_status(self, status):
        """Record *status* on this object."""
        self.status = status

    def get_status(self):
        """Return the last recorded status (``None`` initially)."""
        return self.status

    @classmethod
    def find_by(cls, **kwargs):
        """Return the shared instance, ignoring every lookup keyword."""
        if cls._instance:
            return cls._instance
        cls._instance = fake_InstanceServiceStatus()
        return cls._instance

    def save(self):
        """Do nothing; there is no backing store."""
        pass

    def delete(self):
        """Flag this object as deleted."""
        self.deleted = True

    def is_deleted(self):
        """Return whether ``delete`` has been called."""
        return self.deleted
class fake_DBInstance(object):
    """In-memory double for a database-instance record.

    ``find_by`` hands out one lazily created, class-wide instance, so a
    task status written through one lookup is visible through the next.
    """

    # Shared instance returned by find_by(); created on first use.
    _instance = None

    def __init__(self):
        self.deleted = False
        # Initialised so get_task_status() can be called before any
        # set_task_status() without raising AttributeError, mirroring
        # fake_InstanceServiceStatus.
        self.status = None

    @classmethod
    def find_by(cls, **kwargs):
        """Return the shared instance, ignoring every lookup keyword."""
        if not cls._instance:
            cls._instance = fake_DBInstance()
        return cls._instance

    def set_task_status(self, status):
        """Record *status* as the current task status."""
        self.status = status

    def get_task_status(self):
        """Return the last recorded task status (``None`` initially)."""
        return self.status

    def save(self):
        """Do nothing; there is no backing store."""
        pass

    def delete(self):
        """Flag this object as deleted."""
        self.deleted = True

    def is_deleted(self):
        """Return whether ``delete`` has been called."""
        return self.deleted
class FreshInstanceTasksTest(testtools.TestCase):
    """Tests for ``taskmanager_models.FreshInstanceTasks``.

    Covers ``_create_server``, ``update_statuses_on_time_out`` and
    ``_create_secgroup``. Every module- or class-level attribute that a
    test replaces is installed through ``_start_patch`` so it is restored
    when the test finishes, whether it passed or failed.
    """

    def _start_patch(self, patcher):
        """Start *patcher*, register its ``stop`` as a cleanup, and return
        the object produced by ``start()``."""
        started = patcher.start()
        self.addCleanup(patcher.stop)
        return started

    def _stub_secgroup_models(self, opt_group):
        """Replace the security-group collaborators for one test.

        ``SecurityGroup.create_for_instance`` returns a dict with random
        ``id``/``name`` values, ``CONF.get`` returns *opt_group*, and
        ``SecurityGroupRule.create_sec_group_rule`` becomes a bare Mock.

        :returns: the Mock standing in for ``create_sec_group_rule``.
        """
        self._start_patch(patch.object(
            taskmanager_models.SecurityGroup, 'create_for_instance',
            Mock(return_value={'id': uuid.uuid4(),
                               'name': uuid.uuid4()})))
        self._start_patch(patch.object(
            taskmanager_models.CONF, 'get', Mock(return_value=opt_group)))
        return self._start_patch(patch.object(
            taskmanager_models.SecurityGroupRule, 'create_sec_group_rule',
            Mock()))

    def setUp(self):
        super(FreshInstanceTasksTest, self).setUp()
        mock_instance = patch('trove.instance.models.FreshInstance')
        mock_instance.start()
        self.addCleanup(mock_instance.stop)
        # NOTE(review): the attributes below are assigned on the patcher
        # object, not on the mock returned by start() -- confirm whether
        # that is intended before relying on them.
        mock_instance.id = Mock(return_value='instance_id')
        mock_instance.tenant_id = Mock(return_value="tenant_id")
        mock_instance.hostname = Mock(return_value="hostname")
        mock_instance.name = Mock(return_value='name')
        mock_instance.nova_client = Mock(
            return_value=fake_nova_client())

        self._start_patch(patch('trove.datastore.models.DatastoreVersion'))
        self._start_patch(patch('trove.datastore.models.Datastore'))

        # create=True keeps the plain-assignment semantics of the original
        # setup (works whether or not the attribute already exists) while
        # still undoing the replacement afterwards.
        self._start_patch(patch.object(
            taskmanager_models.FreshInstanceTasks, 'nova_client',
            fake_nova_client(), create=True))

        self.userdata = "hello moto"
        self.guestconfig_content = "guest config"
        # The files must outlive the ``with`` blocks, hence delete=False;
        # removal is registered as a cleanup so it happens even when a
        # later setup step or the test itself fails.
        with NamedTemporaryFile(suffix=".cloudinit", delete=False) as f:
            self.cloudinit = f.name
            self.addCleanup(os.remove, self.cloudinit)
            f.write(self.userdata)
        with NamedTemporaryFile(delete=False) as f:
            self.guestconfig = f.name
            self.addCleanup(os.remove, self.guestconfig)
            f.write(self.guestconfig_content)
        self.freshinstancetasks = taskmanager_models.FreshInstanceTasks(
            None, Mock(), None, None)

    @patch('trove.taskmanager.models.CONF')
    def test_create_instance_userdata(self, mock_conf):
        # The temp file's directory acts as the cloudinit location and its
        # base name (without extension) as the datastore manager name.
        cloudinit_location = os.path.dirname(self.cloudinit)
        datastore_manager = os.path.splitext(
            os.path.basename(self.cloudinit))[0]

        def fake_conf_getter(*args, **kwargs):
            if args[0] == 'cloudinit_location':
                return cloudinit_location
            else:
                return ''
        mock_conf.get.side_effect = fake_conf_getter

        server = self.freshinstancetasks._create_server(
            None, None, None, datastore_manager, None, None, None)
        self.assertEqual(server.userdata, self.userdata)

    @patch('trove.taskmanager.models.CONF')
    def test_create_instance_guestconfig(self, mock_conf):
        def fake_conf_getter(*args, **kwargs):
            if args[0] == 'guest_config':
                return self.guestconfig
            else:
                return ''
        mock_conf.get.side_effect = fake_conf_getter
        # execute
        server = self.freshinstancetasks._create_server(
            None, None, None, "test", None, None, None)
        # verify
        self.assertTrue('/etc/trove-guestagent.conf' in server.files)
        self.assertEqual(server.files['/etc/trove-guestagent.conf'],
                         self.guestconfig_content)

    @patch('trove.taskmanager.models.CONF')
    def test_create_instance_with_az_kwarg(self, mock_conf):
        mock_conf.get.return_value = ''
        # execute
        server = self.freshinstancetasks._create_server(
            None, None, None, None, None, availability_zone='nova', nics=None)
        # verify
        self.assertIsNotNone(server)

    @patch('trove.taskmanager.models.CONF')
    def test_create_instance_with_az(self, mock_conf):
        mock_conf.get.return_value = ''
        # execute
        server = self.freshinstancetasks._create_server(
            None, None, None, None, None, 'nova', None)
        # verify
        self.assertIsNotNone(server)

    @patch('trove.taskmanager.models.CONF')
    def test_create_instance_with_az_none(self, mock_conf):
        mock_conf.get.return_value = ''
        # execute
        server = self.freshinstancetasks._create_server(
            None, None, None, None, None, None, None)
        # verify
        self.assertIsNotNone(server)

    @patch('trove.taskmanager.models.CONF')
    def test_update_status_of_intance_failure(self, mock_conf):
        mock_conf.get.return_value = ''
        # Route both model lookups to the shared fake records so the
        # statuses written by the code under test can be read back below.
        self._start_patch(patch.object(
            InstanceServiceStatus, 'find_by',
            Mock(return_value=fake_InstanceServiceStatus.find_by())))
        self._start_patch(patch.object(
            DBInstance, 'find_by',
            Mock(return_value=fake_DBInstance.find_by())))
        self.freshinstancetasks.update_statuses_on_time_out()
        self.assertEqual(fake_InstanceServiceStatus.find_by().get_status(),
                         ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT)
        self.assertEqual(fake_DBInstance.find_by().get_task_status(),
                         InstanceTasks.BUILDING_ERROR_TIMEOUT_GA)

    def test_create_sg_rules_success(self):
        datastore_manager = 'mysql'
        create_rule = self._stub_secgroup_models(FakeOptGroup())
        self.freshinstancetasks._create_secgroup(datastore_manager)
        self.assertEqual(2, create_rule.call_count)

    def test_create_sg_rules_format_exception_raised(self):
        datastore_manager = 'mysql'
        self._stub_secgroup_models(
            FakeOptGroup(tcp_ports=['3306', '-3306']))
        self.freshinstancetasks.update_db = Mock()
        self.assertRaises(MalformedSecurityGroupRuleError,
                          self.freshinstancetasks._create_secgroup,
                          datastore_manager)

    def test_create_sg_rules_greater_than_exception_raised(self):
        datastore_manager = 'mysql'
        self._stub_secgroup_models(
            FakeOptGroup(tcp_ports=['3306', '33060-3306']))
        self.freshinstancetasks.update_db = Mock()
        self.assertRaises(MalformedSecurityGroupRuleError,
                          self.freshinstancetasks._create_secgroup,
                          datastore_manager)

    def test_create_sg_rules_success_with_duplicated_port_or_range(self):
        datastore_manager = 'mysql'
        create_rule = self._stub_secgroup_models(
            FakeOptGroup(
                tcp_ports=['3306', '3306', '3306-3307', '3306-3307']))
        self.freshinstancetasks.update_db = Mock()
        self.freshinstancetasks._create_secgroup(datastore_manager)
        self.assertEqual(2, create_rule.call_count)

    def test_create_sg_rules_exception_with_malformed_ports_or_range(self):
        datastore_manager = 'mysql'
        # The rule-creation call is stubbed here as well, so this test never
        # depends on a stub left behind by another test.
        self._stub_secgroup_models(FakeOptGroup(tcp_ports=['A', 'B-C']))
        self.freshinstancetasks.update_db = Mock()
        self.assertRaises(MalformedSecurityGroupRuleError,
                          self.freshinstancetasks._create_secgroup,
                          datastore_manager)
class ResizeVolumeTest(testtools.TestCase):
    """Tests for ``taskmanager_models.ResizeVolumeAction``.

    ``utils.poll_until``, ``timeutils.isotime`` and
    ``taskmanager_models.CONF.get`` are replaced for the duration of each
    test and restored afterwards, so nothing leaks into other tests.
    ``self.instance`` is a fresh Mock per test, so no manual reset is
    needed at the end of a test.
    """

    def _start_patch(self, patcher):
        """Start *patcher*, register its ``stop`` as a cleanup, and return
        the object produced by ``start()``."""
        started = patcher.start()
        self.addCleanup(patcher.stop)
        return started

    def setUp(self):
        super(ResizeVolumeTest, self).setUp()
        self.poll_until_mock = self._start_patch(
            patch.object(utils, 'poll_until', Mock()))
        self._start_patch(patch.object(timeutils, 'isotime', Mock()))
        self.instance = Mock()
        self.old_vol_size = 1
        self.new_vol_size = 2
        self.action = taskmanager_models.ResizeVolumeAction(
            self.instance, self.old_vol_size, self.new_vol_size)

        class FakeGroup():
            def __init__(self):
                self.mount_point = 'var/lib/mysql'
                self.device_path = '/dev/vdb'
        self._start_patch(patch.object(
            taskmanager_models.CONF, 'get', Mock(return_value=FakeGroup())))

    def test_resize_volume_unmount_exception(self):
        self.instance.guest.unmount_volume = Mock(
            side_effect=GuestError("test exception"))
        self.assertRaises(GuestError,
                          self.action._unmount_volume,
                          recover_func=self.action._recover_restart)
        self.assertEqual(1, self.instance.restart.call_count)

    def test_resize_volume_detach_exception(self):
        self.instance.nova_client.volumes.delete_server_volume = Mock(
            side_effect=nova_exceptions.ClientException("test exception"))
        self.assertRaises(nova_exceptions.ClientException,
                          self.action._detach_volume,
                          recover_func=self.action._recover_mount_restart)
        self.assertEqual(1, self.instance.guest.mount_volume.call_count)
        self.assertEqual(1, self.instance.restart.call_count)

    def test_resize_volume_extend_exception(self):
        self.instance.volume_client.volumes.extend = Mock(
            side_effect=cinder_exceptions.ClientException("test exception"))
        self.assertRaises(cinder_exceptions.ClientException,
                          self.action._extend,
                          recover_func=self.action._recover_full)
        attach_count = (
            self.instance.nova_client.volumes.create_server_volume.call_count)
        self.assertEqual(1, attach_count)
        self.assertEqual(1, self.instance.guest.mount_volume.call_count)
        self.assertEqual(1, self.instance.restart.call_count)

    def test_resize_volume_verify_extend_no_volume(self):
        self.instance.volume_client.volumes.get = Mock(
            return_value=None)
        self.assertRaises(cinder_exceptions.ClientException,
                          self.action._verify_extend)

    def test_resize_volume_poll_timeout(self):
        # Reconfigure the already-installed stub instead of reassigning
        # utils.poll_until, so the cleanup still restores the original.
        self.poll_until_mock.side_effect = PollTimeOut
        self.assertRaises(PollTimeOut, self.action._verify_extend)
        self.assertEqual(2, self.instance.volume_client.volumes.get.call_count)

    def test_resize_volume_active_server_succeeds(self):
        server = Mock(status=InstanceStatus.ACTIVE)
        self.instance.attach_mock(server, 'server')
        self.action.execute()
        self.assertEqual(1, self.instance.guest.stop_db.call_count)
        self.assertEqual(1, self.instance.guest.unmount_volume.call_count)
        detach_count = (
            self.instance.nova_client.volumes.delete_server_volume.call_count)
        self.assertEqual(1, detach_count)
        extend_count = self.instance.volume_client.volumes.extend.call_count
        self.assertEqual(1, extend_count)
        attach_count = (
            self.instance.nova_client.volumes.create_server_volume.call_count)
        self.assertEqual(1, attach_count)
        self.assertEqual(1, self.instance.guest.resize_fs.call_count)
        self.assertEqual(1, self.instance.guest.mount_volume.call_count)
        self.assertEqual(1, self.instance.restart.call_count)

    def test_resize_volume_server_error_fails(self):
        server = Mock(status=InstanceStatus.ERROR)
        self.instance.attach_mock(server, 'server')
        self.assertRaises(TroveError, self.action.execute)
class BuiltInstanceTasksTest(testtools.TestCase):
    """Tests for ``taskmanager_models.BuiltInstanceTasks``.

    Covers ``resize_flavor`` and ``reboot``. All replacements of module-
    or class-level attributes made in ``setUp`` go through
    ``_start_patch`` so they are undone after each test.
    """

    def _start_patch(self, patcher):
        """Start *patcher*, register its ``stop`` as a cleanup, and return
        the object produced by ``start()``."""
        started = patcher.start()
        self.addCleanup(patcher.stop)
        return started

    def get_inst_service_status(self, status_id, statuses):
        """Build one ``InstanceServiceStatus`` per entry of *statuses*.

        Ids take the form ``"<status_id>-<index>"`` and ``save`` is stubbed
        on each object.
        """
        answers = []
        for i, status in enumerate(statuses):
            inst_svc_status = InstanceServiceStatus(status,
                                                    id="%s-%s" % (status_id,
                                                                  i))
            inst_svc_status.save = MagicMock(return_value=None)
            answers.append(inst_svc_status)
        return answers

    def _stub_volume_client(self):
        """Attach a stub volume client whose ``volumes.get`` returns an
        ``available`` volume of size 2."""
        self.instance_task._volume_client = MagicMock(spec=cinderclient.Client)
        stub_volume_mgr = MagicMock(spec=cinderclient.volumes.VolumeManager)
        self.instance_task.volume_client.volumes = stub_volume_mgr
        stub_volume_mgr.extend = MagicMock(return_value=None)
        stub_new_volume = cinderclient.volumes.Volume(
            stub_volume_mgr, {'status': 'available', 'size': 2}, True)
        stub_volume_mgr.get = MagicMock(return_value=stub_new_volume)
        stub_volume_mgr.attach = MagicMock(return_value=None)

    def setUp(self):
        super(BuiltInstanceTasksTest, self).setUp()
        self.new_flavor = {'id': 8, 'ram': 768, 'name': 'bigger_flavor'}
        stub_nova_server = MagicMock()
        self._start_patch(patch.object(rpc, 'get_notifier', MagicMock()))
        self._start_patch(patch.object(rpc, 'get_client', MagicMock()))
        db_instance = DBInstance(InstanceTasks.NONE,
                                 id=INST_ID,
                                 name='resize-inst-name',
                                 datastore_version_id='1',
                                 datastore_id='id-1',
                                 flavor_id='6',
                                 manager='mysql',
                                 created=datetime.datetime.utcnow(),
                                 updated=datetime.datetime.utcnow(),
                                 compute_instance_id='computeinst-id-1',
                                 tenant_id='testresize-tenant-id',
                                 volume_size='1',
                                 volume_id=VOLUME_ID)

        # this is used during the final check of whether the resize successful
        db_instance.server_status = 'ACTIVE'
        self.db_instance = db_instance
        self._start_patch(patch.object(
            datastore_models.DatastoreVersion, 'load_by_uuid',
            MagicMock(
                return_value=datastore_models.DatastoreVersion(db_instance))))
        self._start_patch(patch.object(
            datastore_models.Datastore, 'load',
            MagicMock(return_value=datastore_models.Datastore(db_instance))))

        self.instance_task = taskmanager_models.BuiltInstanceTasks(
            trove.common.context.TroveContext(),
            db_instance,
            stub_nova_server,
            InstanceServiceStatus(ServiceStatuses.RUNNING,
                                  id='inst-stat-id-0'))

        self.instance_task._guest = MagicMock(spec=trove.guestagent.api.API)
        self.instance_task._nova_client = MagicMock(
            spec=novaclient.v1_1.Client)
        self.stub_server_mgr = MagicMock(
            spec=novaclient.v1_1.servers.ServerManager)
        self.stub_running_server = MagicMock(
            spec=novaclient.v1_1.servers.Server)
        self.stub_running_server.status = 'ACTIVE'
        self.stub_running_server.flavor = {'id': 6, 'ram': 512}
        self.stub_verifying_server = MagicMock(
            spec=novaclient.v1_1.servers.Server)
        self.stub_verifying_server.status = 'VERIFY_RESIZE'
        self.stub_verifying_server.flavor = {'id': 8, 'ram': 768}
        self.stub_server_mgr.get = MagicMock(
            return_value=self.stub_verifying_server)
        self.instance_task._nova_client.servers = self.stub_server_mgr
        stub_flavor_manager = MagicMock(
            spec=novaclient.v1_1.flavors.FlavorManager)
        self.instance_task._nova_client.flavors = stub_flavor_manager

        nova_flavor = novaclient.v1_1.flavors.Flavor(stub_flavor_manager,
                                                     self.new_flavor,
                                                     True)
        stub_flavor_manager.get = MagicMock(return_value=nova_flavor)

        # Successive service-status lookups walk through this sequence.
        answers = iter(self.get_inst_service_status(
            'inst_stat-id',
            [ServiceStatuses.SHUTDOWN,
             ServiceStatuses.RUNNING,
             ServiceStatuses.RUNNING,
             ServiceStatuses.RUNNING]))

        def side_effect_func(*args, **kwargs):
            if 'instance_id' in kwargs:
                # next() works on Python 2.6+ and 3; the iterator's
                # .next() method exists on Python 2 only.
                return next(answers)
            elif ('id' in kwargs and 'deleted' in kwargs
                  and not kwargs['deleted']):
                return db_instance
            else:
                return MagicMock()
        self._start_patch(patch.object(
            trove.db.models.DatabaseModelBase, 'find_by',
            MagicMock(side_effect=side_effect_func)))

        self._start_patch(patch.object(
            template, 'SingleInstanceConfigTemplate',
            MagicMock(spec=template.SingleInstanceConfigTemplate)))
        db_instance.save = MagicMock(return_value=None)
        self._start_patch(patch.object(
            trove.backup.models.Backup, 'running',
            MagicMock(return_value=None)))

        if 'volume' in self._testMethodName:
            self._stub_volume_client()

    def test_resize_flavor(self):
        orig_server = self.instance_task.server
        self.instance_task.resize_flavor({'id': 1, 'ram': 512},
                                         self.new_flavor)
        # verify
        self.assertIsNot(self.instance_task.server, orig_server)
        self.instance_task._guest.stop_db.assert_any_call(
            do_not_start_on_reboot=True)
        orig_server.resize.assert_any_call(self.new_flavor['id'])
        self.assertThat(self.db_instance.task_status, Is(InstanceTasks.NONE))
        self.assertEqual(self.stub_server_mgr.get.call_count, 1)
        self.assertThat(self.db_instance.flavor_id, Is(self.new_flavor['id']))

    def test_resize_flavor_resize_failure(self):
        orig_server = self.instance_task.server
        self.stub_verifying_server.status = 'ERROR'
        with patch.object(self.instance_task._nova_client.servers, 'get',
                          return_value=self.stub_verifying_server):
            # execute
            self.assertRaises(TroveError, self.instance_task.resize_flavor,
                              {'id': 1, 'ram': 512}, self.new_flavor)
            # verify
            self.assertTrue(self.stub_server_mgr.get.called)
            self.assertIs(self.instance_task.server,
                          self.stub_verifying_server)
            self.instance_task._guest.stop_db.assert_any_call(
                do_not_start_on_reboot=True)
            orig_server.resize.assert_any_call(self.new_flavor['id'])
            self.assertThat(self.db_instance.task_status,
                            Is(InstanceTasks.NONE))
            self.assertThat(self.db_instance.flavor_id, Is('6'))

    @patch.object(utils, 'poll_until')
    def test_reboot(self, mock_poll):
        self.instance_task.datastore_status_matches = Mock(return_value=True)
        self.instance_task._refresh_datastore_status = Mock()
        self.instance_task.server.reboot = Mock()
        self.instance_task.set_datastore_status_to_paused = Mock()
        self.instance_task.reboot()
        self.instance_task._guest.stop_db.assert_any_call()
        self.instance_task._refresh_datastore_status.assert_any_call()
        self.instance_task.server.reboot.assert_any_call()
        self.instance_task.set_datastore_status_to_paused.assert_any_call()

    @patch.object(utils, 'poll_until')
    def test_reboot_datastore_not_ready(self, mock_poll):
        self.instance_task.datastore_status_matches = Mock(return_value=False)
        self.instance_task._refresh_datastore_status = Mock()
        self.instance_task.server.reboot = Mock()
        self.instance_task.set_datastore_status_to_paused = Mock()
        self.instance_task.reboot()
        self.instance_task._guest.stop_db.assert_any_call()
        self.instance_task._refresh_datastore_status.assert_any_call()
        # TestCase assertions are used instead of bare ``assert`` so the
        # checks still run when Python is started with -O.
        self.assertFalse(self.instance_task.server.reboot.called)
        self.assertFalse(
            self.instance_task.set_datastore_status_to_paused.called)
class BackupTasksTest(testtools.TestCase):
    """Tests for ``taskmanager_models.BackupTasks``.

    Covers ``delete_backup`` against a stubbed swift client and the
    ``_parse_manifest`` helper. Class- and module-level stubs installed in
    ``setUp`` are restored after each test.
    """

    def _start_patch(self, patcher):
        """Start *patcher*, register its ``stop`` as a cleanup, and return
        the object produced by ``start()``."""
        started = patcher.start()
        self.addCleanup(patcher.stop)
        return started

    def setUp(self):
        super(BackupTasksTest, self).setUp()
        self.backup = backup_models.DBBackup()
        self.backup.id = 'backup_id'
        # Plain string: a trailing comma here would store a 1-tuple.
        self.backup.name = 'backup_test'
        self.backup.description = 'test desc'
        self.backup.location = 'http://xxx/z_CLOUD/12e48.xbstream.gz'
        self.backup.instance_id = 'instance id'
        self.backup.created = 'yesterday'
        self.backup.updated = 'today'
        self.backup.size = 2.0
        self.backup.state = state.BackupState.NEW
        self.container_content = (None,
                                  [{'name': 'first'},
                                   {'name': 'second'},
                                   {'name': 'third'}])
        self._start_patch(patch.object(
            backup_models.Backup, 'delete', MagicMock(return_value=None)))
        self._start_patch(patch.object(
            backup_models.Backup, 'get_by_id',
            MagicMock(return_value=self.backup)))
        self._start_patch(patch.object(
            backup_models.DBBackup, 'save',
            MagicMock(return_value=self.backup)))
        self.backup.delete = MagicMock(return_value=None)
        self.swift_client = MagicMock()
        self._start_patch(patch.object(
            remote, 'create_swift_client',
            MagicMock(return_value=self.swift_client)))

        # By default both HEAD calls fail; individual tests override them.
        self.swift_client.head_container = MagicMock(
            side_effect=ClientException("foo"))
        self.swift_client.head_object = MagicMock(
            side_effect=ClientException("foo"))
        self.swift_client.get_container = MagicMock(
            return_value=self.container_content)
        self.swift_client.delete_object = MagicMock(return_value=None)
        self.swift_client.delete_container = MagicMock(return_value=None)

    def test_delete_backup_nolocation(self):
        self.backup.location = ''
        taskmanager_models.BackupTasks.delete_backup('dummy context',
                                                     self.backup.id)
        self.backup.delete.assert_any_call()

    def test_delete_backup_fail_delete_manifest(self):
        with patch.object(self.swift_client, 'delete_object',
                          side_effect=ClientException("foo")):
            with patch.object(self.swift_client, 'head_object',
                              return_value={}):
                self.assertRaises(
                    TroveError,
                    taskmanager_models.BackupTasks.delete_backup,
                    'dummy context', self.backup.id)
                self.assertFalse(backup_models.Backup.delete.called)
                self.assertEqual(
                    state.BackupState.DELETE_FAILED,
                    self.backup.state,
                    "backup should be in DELETE_FAILED status")

    def test_delete_backup_fail_delete_segment(self):
        with patch.object(self.swift_client, 'delete_object',
                          side_effect=ClientException("foo")):
            self.assertRaises(
                TroveError,
                taskmanager_models.BackupTasks.delete_backup,
                'dummy context', self.backup.id)
            self.assertFalse(backup_models.Backup.delete.called)
            self.assertEqual(
                state.BackupState.DELETE_FAILED,
                self.backup.state,
                "backup should be in DELETE_FAILED status")

    def test_parse_manifest(self):
        manifest = 'container/prefix'
        cont, prefix = taskmanager_models.BackupTasks._parse_manifest(manifest)
        self.assertEqual(cont, 'container')
        self.assertEqual(prefix, 'prefix')

    def test_parse_manifest_bad(self):
        manifest = 'bad_prefix'
        cont, prefix = taskmanager_models.BackupTasks._parse_manifest(manifest)
        self.assertIsNone(cont)
        self.assertIsNone(prefix)

    def test_parse_manifest_long(self):
        manifest = 'container/long/path/to/prefix'
        cont, prefix = taskmanager_models.BackupTasks._parse_manifest(manifest)
        self.assertEqual(cont, 'container')
        self.assertEqual(prefix, 'long/path/to/prefix')

    def test_parse_manifest_short(self):
        manifest = 'container/'
        cont, prefix = taskmanager_models.BackupTasks._parse_manifest(manifest)
        self.assertEqual(cont, 'container')
        self.assertEqual(prefix, '')
class NotifyMixinTest(testtools.TestCase):
    """Tests for ``NotifyMixin._get_service_id``."""

    def test_get_service_id(self):
        # A manager present in the map resolves to its mapped id.
        known_ids = {'mysql': '123', 'percona': 'abc'}
        notifier = taskmanager_models.NotifyMixin()
        resolved = notifier._get_service_id('mysql', known_ids)
        self.assertThat(resolved, Equals('123'))

    def test_get_service_id_unknown(self):
        # A manager absent from the map yields the fixed error marker.
        known_ids = {'mysql': '123', 'percona': 'abc'}
        notifier = taskmanager_models.NotifyMixin()
        resolved = notifier._get_service_id('m0ng0', known_ids)
        self.assertThat(resolved, Equals('unknown-service-id-error'))
class RootReportTest(testtools.TestCase):
    """Tests for ``mysql_models.RootHistory.create``."""

    def setUp(self):
        super(RootReportTest, self).setUp()
        util.init_db()

    def test_report_root_first_time(self):
        report = mysql_models.RootHistory.create(
            None, utils.generate_uuid(), 'root')
        self.assertIsNotNone(report)

    def test_report_root_double_create(self):
        # Named instance_id rather than uuid so the module-level ``uuid``
        # import is not shadowed inside this test.
        instance_id = utils.generate_uuid()
        history = mysql_models.RootHistory(instance_id, 'root').save()
        # patch.object restores RootHistory.load when the block exits, so
        # the stub cannot leak into other tests.
        with patch.object(mysql_models.RootHistory, 'load',
                          Mock(return_value=history)) as mock_load:
            report = mysql_models.RootHistory.create(
                None, instance_id, 'root')
            self.assertTrue(mock_load.called)
        self.assertEqual(history.user, report.user)
        self.assertEqual(history.id, report.id)