Merge "Mysql guest agent functionality for replication"

This commit is contained in:
Jenkins 2014-08-23 16:43:32 +00:00 committed by Gerrit Code Review
commit 315c20cc8c
19 changed files with 611 additions and 38 deletions

View File

@ -94,3 +94,11 @@ backup_aes_cbc_key = "default_aes_cbc_key"
backup_use_snet = False
backup_chunk_size = 65536
backup_segment_max_size = 2147483648
[mysql]
# Configuration for Replication
replication_strategy = MysqlBinlogReplication
replication_namespace = trove.guestagent.strategies.replication.mysql_binlog
replication_user = slave_user
replication_password = slave_password

View File

@ -174,6 +174,7 @@ if __name__ == "__main__":
from trove.tests.api import instances_resize # noqa
from trove.tests.api import databases # noqa
from trove.tests.api import datastores # noqa
from trove.tests.api import replication # noqa
from trove.tests.api import root # noqa
from trove.tests.api import root_on_create # noqa
from trove.tests.api import users # noqa

View File

@ -299,6 +299,15 @@ mysql_opts = [
'if trove_security_groups_support is True).'),
cfg.StrOpt('backup_strategy', default='InnoBackupEx',
help='Default strategy to perform backups.'),
cfg.StrOpt('replication_strategy', default='MysqlBinlogReplication',
help='Default strategy for replication.'),
cfg.StrOpt('replication_namespace',
default='trove.guestagent.strategies.replication.mysql_binlog',
help='Namespace to load replication strategies from.'),
cfg.StrOpt('replication_user', default='slave_user',
help='Userid for replication slave.', secret=True),
cfg.StrOpt('replication_password', default='NETOU7897NNLOU',
help='Password for replication slave user.', secret=True),
cfg.StrOpt('mount_point', default='/var/lib/mysql',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),
@ -334,6 +343,15 @@ percona_opts = [
'if trove_security_groups_support is True).'),
cfg.StrOpt('backup_strategy', default='InnoBackupEx',
help='Default strategy to perform backups.'),
cfg.StrOpt('replication_strategy', default='MysqlBinlogReplication',
help='Default strategy for replication.'),
cfg.StrOpt('replication_namespace',
default='trove.guestagent.strategies.replication.mysql_binlog',
help='Namespace to load replication strategies from.'),
cfg.StrOpt('replication_user', default='slave_user',
help='Userid for replication slave.'),
cfg.StrOpt('replication_password', default='NETOU7897NNLOU',
help='Password for replication slave user.'),
cfg.StrOpt('mount_point', default='/var/lib/mysql',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),
@ -369,6 +387,8 @@ redis_opts = [
'if trove_security_groups_support is True).'),
cfg.StrOpt('backup_strategy', default=None,
help='Default strategy to perform backups.'),
cfg.StrOpt('replication_strategy', default=None,
help='Default strategy for replication.'),
cfg.StrOpt('mount_point', default='/var/lib/redis',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),
@ -395,6 +415,8 @@ cassandra_opts = [
'if trove_security_groups_support is True).'),
cfg.StrOpt('backup_strategy', default=None,
help='Default strategy to perform backups.'),
cfg.StrOpt('replication_strategy', default=None,
help='Default strategy for replication.'),
cfg.StrOpt('mount_point', default='/var/lib/cassandra',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),
@ -423,6 +445,8 @@ couchbase_opts = [
'if trove_security_groups_support is True).'),
cfg.StrOpt('backup_strategy', default='CbBackup',
help='Default strategy to perform backups.'),
cfg.StrOpt('replication_strategy', default=None,
help='Default strategy for replication.'),
cfg.StrOpt('mount_point', default='/var/lib/couchbase',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),
@ -458,6 +482,8 @@ mongodb_opts = [
'if trove_security_groups_support is True).'),
cfg.StrOpt('backup_strategy', default=None,
help='Default strategy to perform backups.'),
cfg.StrOpt('replication_strategy', default=None,
help='Default strategy for replication.'),
cfg.StrOpt('mount_point', default='/var/lib/mongodb',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),

View File

@ -252,6 +252,12 @@ class VolumeNotSupported(TroveError):
message = _("Volume support is not enabled.")
class ReplicationNotSupported(TroveError):
message = _("Replication is not supported for "
"the '%(datastore)s' datastore.")
class TaskManagerError(TroveError):
message = _("An error occurred communicating with the task manager: "
@ -445,3 +451,15 @@ class NoServiceEndpoint(TroveError):
class EmptyCatalog(NoServiceEndpoint):
"""The service catalog is empty."""
message = _("Empty catalog.")
class IncompatibleReplicationStrategy(TroveError):
message = _("Instance with replication strategy %(guest_strategy)s "
"cannot replicate from instance with replication strategy "
"%(replication_strategy)s.")
class InsufficientSpaceForSlave(TroveError):
message = _("The target instance has only %(slave_volume_size)sG free, "
"but the replication snapshot contains %(dataset_size)sG "
"of data.")

View File

@ -326,8 +326,8 @@ class API(proxy.RpcProxy):
def get_replication_snapshot(self, master_config=None):
LOG.debug("Retrieving replication snapshot from instance %s.", self.id)
self._call("get_replication_snapshot", AGENT_HIGH_TIMEOUT,
master_config=master_config)
return self._call("get_replication_snapshot", AGENT_HIGH_TIMEOUT,
master_config=master_config)
def attach_replication_slave(self, snapshot, slave_config=None):
LOG.debug("Configuring instance %s to replicate from %s.",
@ -337,7 +337,7 @@ class API(proxy.RpcProxy):
def detach_replication_slave(self):
LOG.debug("Detaching slave %s from its master.", self.id)
self._call("detach_replication_slave", AGENT_LOW_TIMEOUT)
self._call("detach_replication_slave", AGENT_HIGH_TIMEOUT)
def demote_replication_master(self):
LOG.debug("Demoting instance %s to non-master.", self.id)

View File

@ -26,6 +26,7 @@ from trove.guestagent import volume
from trove.guestagent.datastore.mysql.service import MySqlAppStatus
from trove.guestagent.datastore.mysql.service import MySqlAdmin
from trove.guestagent.datastore.mysql.service import MySqlApp
from trove.guestagent.strategies.replication import get_replication_strategy
from trove.openstack.common import log as logging
from trove.openstack.common.gettextutils import _
from trove.openstack.common import periodic_task
@ -33,7 +34,11 @@ from trove.openstack.common import periodic_task
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
MANAGER = CONF.datastore_manager
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql'
REPLICATION_STRATEGY = CONF.get(MANAGER).replication_strategy
REPLICATION_NAMESPACE = CONF.get(MANAGER).replication_namespace
REPLICATION_STRATEGY_CLASS = get_replication_strategy(REPLICATION_STRATEGY,
REPLICATION_NAMESPACE)
class Manager(periodic_task.PeriodicTasks):
@ -166,8 +171,7 @@ class Manager(periodic_task.PeriodicTasks):
def get_filesystem_stats(self, context, fs_path):
"""Gets the filesystem stats for the path given."""
mount_point = CONF.get(
'mysql' if not MANAGER else MANAGER).mount_point
mount_point = CONF.get(MANAGER).mount_point
return dbaas.get_filesystem_volume_stats(mount_point)
def create_backup(self, context, backup_info):
@ -209,22 +213,69 @@ class Manager(periodic_task.PeriodicTasks):
app = MySqlApp(MySqlAppStatus.get())
app.apply_overrides(overrides)
def get_replication_snapshot(self, master_config):
def get_replication_snapshot(self, context, master_config):
LOG.debug("Getting replication snapshot.")
raise exception.DatastoreOperationNotSupported(
operation='get_replication_snapshot', datastore=MANAGER)
app = MySqlApp(MySqlAppStatus.get())
def attach_replication_slave(self, snapshot, slave_config):
replication = REPLICATION_STRATEGY_CLASS(context)
replication.enable_as_master(app, master_config)
snapshot_id, log_position = (
replication.snapshot_for_replication(app, None, master_config))
mount_point = CONF.get(MANAGER).mount_point
volume_stats = dbaas.get_filesystem_volume_stats(mount_point)
replication_snapshot = {
'dataset': {
'datastore_manager': MANAGER,
'dataset_size': volume_stats.get('used', 0.0),
'volume_size': volume_stats.get('total', 0.0),
'snapshot_id': snapshot_id
},
'replication_strategy': REPLICATION_STRATEGY,
'master': replication.get_master_ref(app, master_config),
'log_position': log_position
}
return replication_snapshot
def _validate_slave_for_replication(self, context, snapshot):
if (snapshot['replication_strategy'] != REPLICATION_STRATEGY):
raise exception.IncompatibleReplicationStrategy(
snapshot.update({
'guest_strategy': REPLICATION_STRATEGY
}))
mount_point = CONF.get(MANAGER).mount_point
volume_stats = dbaas.get_filesystem_volume_stats(mount_point)
if (volume_stats.get('total', 0.0) <
snapshot['dataset']['dataset_size']):
raise exception.InsufficientSpaceForSlave(
snapshot.update({
'slave_volume_size': volume_stats.get('total', 0.0)
}))
def attach_replication_slave(self, context, snapshot, slave_config):
LOG.debug("Attaching replication snapshot.")
raise exception.DatastoreOperationNotSupported(
operation='attach_replication_slave', datastore=MANAGER)
app = MySqlApp(MySqlAppStatus.get())
try:
self._validate_slave_for_replication(context, snapshot)
replication = REPLICATION_STRATEGY_CLASS(context)
replication.enable_as_slave(app, snapshot)
except Exception:
LOG.exception("Error enabling replication.")
app.status.set_status(rd_instance.ServiceStatuses.FAILED)
raise
def detach_replication_slave(self):
LOG.debug("Detaching replication slave.")
raise exception.DatastoreOperationNotSupported(
operation='detach_replication_slave', datastore=MANAGER)
def detach_replication_slave(self, context):
LOG.debug("Detaching replication snapshot.")
app = MySqlApp(MySqlAppStatus.get())
replication = REPLICATION_STRATEGY_CLASS(context)
replication.detach_slave(app)
def demote_replication_master(self):
def demote_replication_master(self, context):
LOG.debug("Demoting replication master.")
raise exception.DatastoreOperationNotSupported(
operation='demote_replication_master', datastore=MANAGER)
app = MySqlApp(MySqlAppStatus.get())
replication = REPLICATION_STRATEGY_CLASS(context)
replication.demote_master(app)

View File

@ -28,6 +28,7 @@ from trove.common import cfg
from trove.common import utils as utils
from trove.common import exception
from trove.common import instance as rd_instance
from trove.common.exception import PollTimeOut
from trove.guestagent.common import operating_system
from trove.guestagent.common import sql_query
from trove.guestagent.db import models
@ -47,6 +48,9 @@ TMP_MYCNF = "/tmp/my.cnf.tmp"
MYSQL_BASE_DIR = "/var/lib/mysql"
CONF = cfg.CONF
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql'
REPLICATION_USER = CONF.get(MANAGER).replication_user
REPLICATION_PASSWORD = CONF.get(MANAGER).replication_password
INCLUDE_MARKER_OPERATORS = {
True: ">=",
@ -58,6 +62,8 @@ MYSQL_SERVICE_CANDIDATES = ["mysql", "mysqld", "mysql-server"]
MYSQL_BIN_CANDIDATES = ["/usr/sbin/mysqld", "/usr/libexec/mysqld"]
MYCNF_OVERRIDES = "/etc/mysql/conf.d/overrides.cnf"
MYCNF_OVERRIDES_TMP = "/tmp/overrides.cnf.tmp"
MYCNF_REPLMASTER = "/etc/mysql/conf.d/replication.cnf"
MYCNF_REPLMASTER_TMP = "/tmp/replication.cnf.tmp"
# Create a package impl
@ -798,7 +804,7 @@ class MySqlApp(object):
MYCNF_OVERRIDES)
LOG.info(_("Setting permissions on overrides.cnf."))
utils.execute_with_timeout("sudo", "chmod", "0711",
utils.execute_with_timeout("sudo", "chmod", "0644",
MYCNF_OVERRIDES)
def _remove_overrides(self):
@ -806,6 +812,111 @@ class MySqlApp(object):
if os.path.exists(MYCNF_OVERRIDES):
utils.execute_with_timeout("sudo", "rm", MYCNF_OVERRIDES)
def write_replication_overrides(self, overrideValues):
LOG.info(_("Writing replication.cnf file."))
with open(MYCNF_REPLMASTER_TMP, 'w') as overrides:
overrides.write(overrideValues)
LOG.debug("Moving temp replication.cnf into correct location.")
utils.execute_with_timeout("sudo", "mv", MYCNF_REPLMASTER_TMP,
MYCNF_REPLMASTER)
LOG.debug("Setting permissions on replication.cnf.")
utils.execute_with_timeout("sudo", "chmod", "0644",
MYCNF_REPLMASTER)
def remove_replication_overrides(self):
LOG.info(_("Removing replication configuration file."))
if os.path.exists(MYCNF_REPLMASTER):
utils.execute_with_timeout("sudo", "rm", MYCNF_REPLMASTER)
def grant_replication_privilege(self):
LOG.info(_("Granting Replication Slave privilege."))
with LocalSqlClient(get_engine()) as client:
g = sql_query.Grant(permissions=['REPLICATION SLAVE'],
user=REPLICATION_USER,
clear=REPLICATION_PASSWORD)
t = text(str(g))
client.execute(t)
def revoke_replication_privilege(self):
LOG.info(_("Revoking Replication Slave privilege."))
with LocalSqlClient(get_engine()) as client:
g = sql_query.Revoke(permissions=['REPLICATION SLAVE'],
user=REPLICATION_USER,
clear=REPLICATION_PASSWORD)
t = text(str(g))
client.execute(t)
def get_port(self):
with LocalSqlClient(get_engine()) as client:
result = client.execute('SELECT @@port').first()
return result[0]
def get_binlog_position(self):
with LocalSqlClient(get_engine()) as client:
result = client.execute('SHOW MASTER STATUS').first()
binlog_position = {
'log_file': result['File'],
'position': result['Position']
}
return binlog_position
def change_master_for_binlog(self, host, port, log_position):
LOG.info(_("Configuring replication from %s.") % host)
change_master_cmd = ("CHANGE MASTER TO MASTER_HOST='%(host)s', "
"MASTER_PORT=%(port)s, "
"MASTER_USER='%(user)s', "
"MASTER_PASSWORD='%(password)s', "
"MASTER_LOG_FILE='%(log_file)s', "
"MASTER_LOG_POS=%(log_pos)s" %
{
'host': host,
'port': port,
'user': REPLICATION_USER,
'password': REPLICATION_PASSWORD,
'log_file': log_position['log_file'],
'log_pos': log_position['position']
})
with LocalSqlClient(get_engine()) as client:
client.execute(change_master_cmd)
def start_slave(self):
LOG.info(_("Starting slave replication."))
with LocalSqlClient(get_engine()) as client:
client.execute('START SLAVE')
self._wait_for_slave_status("ON", client, 60)
def stop_slave(self):
LOG.info(_("Stopping slave replication."))
with LocalSqlClient(get_engine()) as client:
client.execute('STOP SLAVE')
client.execute('RESET SLAVE ALL')
self._wait_for_slave_status("OFF", client, 30)
def _wait_for_slave_status(self, status, client, max_time):
def verify_slave_status():
actual_status = client.execute(
"SHOW GLOBAL STATUS like 'slave_running'").first()[1]
return actual_status.upper() == status.upper()
LOG.debug("Waiting for SLAVE_RUNNING to change to %s.", status)
try:
utils.poll_until(verify_slave_status, sleep_time=3,
time_out=max_time)
LOG.info(_("Replication is now %s.") % status.lower())
except PollTimeOut:
raise RuntimeError(
_("Replication is not %(status)s after %(max)d seconds.") % {
'status': status.lower(), 'max': max_time})
def start_mysql(self, update_db=False):
LOG.info(_("Starting MySQL."))
# This is the site of all the trouble in the restart tests.

View File

@ -0,0 +1,25 @@
# Copyright 2014 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from trove.guestagent.strategy import Strategy
from trove.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def get_replication_strategy(replication_driver, ns=__name__):
LOG.debug("Getting replication strategy: %s.", replication_driver)
return Strategy.get_strategy(replication_driver, ns)

View File

@ -0,0 +1,56 @@
# Copyright 2014 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import abc
import six
from trove.guestagent.strategy import Strategy
@six.add_metaclass(abc.ABCMeta)
class Replication(Strategy):
"""Base class for Replication Strategy implementation."""
__strategy_type__ = 'replication'
__strategy_ns__ = 'trove.guestagent.strategies.replication'
def __init__(self, context):
self.context = context
super(Replication, self).__init__()
@abc.abstractmethod
def get_master_ref(self, mysql_service, master_config):
"""Get reference to master site for replication strategy."""
@abc.abstractmethod
def snapshot_for_replication(self, mysql_service, location, master_config):
"""Capture snapshot of master db."""
@abc.abstractmethod
def enable_as_master(self, mysql_service, master_config):
"""Configure underlying database to act as master for replication."""
@abc.abstractmethod
def enable_as_slave(self, mysql_service, snapshot):
"""Configure underlying database as a slave of the given master."""
@abc.abstractmethod
def detach_slave(self, mysql_service):
"""Turn off replication on a slave site."""
@abc.abstractmethod
def demote_master(self, mysql_service):
"""Turn off replication on a master site."""

View File

@ -0,0 +1,75 @@
# Copyright 2014 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from trove.guestagent.strategies.replication import base
from trove.guestagent.common import operating_system
from trove.openstack.common import log as logging
MASTER_CONFIG = """
[mysqld]
log_bin = /var/lib/mysql/mysql-bin.log
"""
SLAVE_CONFIG = """
[mysqld]
log_bin = /var/lib/mysql/mysql-bin.log
relay_log = /var/lib/mysql/mysql-relay-bin.log
"""
LOG = logging.getLogger(__name__)
class MysqlBinlogReplication(base.Replication):
"""MySql Replication coordinated by binlog position."""
def get_master_ref(self, mysql_service, master_config):
master_ref = {
'host': operating_system.get_ip_address(),
'port': mysql_service.get_port()
}
return master_ref
def snapshot_for_replication(self, mysql_service, location, master_config):
# TODO(mwj): snapshot_id = master_config['snapshot_id']
# Check to see if the snapshot_id exists as a backup. If so, and
# it is suitable for restoring the slave, just use it
# Otherwise, create a new backup of the master site.
snapshot_id = None
log_position = mysql_service.get_binlog_position()
return snapshot_id, log_position
def enable_as_master(self, mysql_service, master_config):
mysql_service.write_replication_overrides(MASTER_CONFIG)
mysql_service.restart()
mysql_service.grant_replication_privilege()
def enable_as_slave(self, mysql_service, snapshot):
mysql_service.write_replication_overrides(SLAVE_CONFIG)
mysql_service.restart()
mysql_service.change_master_for_binlog(
snapshot['master']['host'],
snapshot['master']['port'],
snapshot['log_position'])
mysql_service.start_slave()
def detach_slave(self, mysql_service):
mysql_service.stop_slave()
mysql_service.remove_replication_overrides()
mysql_service.restart()
def demote_master(self, mysql_service):
mysql_service.revoke_replication_privilege()
mysql_service.remove_replication_overrides()
mysql_service.restart()

View File

@ -617,6 +617,7 @@ class Instance(BuiltInstance):
availability_zone=None, nics=None, configuration_id=None,
slave_of_id=None):
datastore_cfg = CONF.get(datastore_version.manager)
client = create_nova_client(context)
try:
flavor = client.flavors.get(flavor_id)
@ -624,14 +625,14 @@ class Instance(BuiltInstance):
raise exception.FlavorNotFound(uuid=flavor_id)
deltas = {'instances': 1}
volume_support = CONF.get(datastore_version.manager).volume_support
volume_support = datastore_cfg.volume_support
if volume_support:
validate_volume_size(volume_size)
deltas['volumes'] = volume_size
else:
if volume_size is not None:
raise exception.VolumeNotSupported()
ephemeral_support = CONF.get(datastore_version.manager).device_path
ephemeral_support = datastore_cfg.device_path
if ephemeral_support:
if flavor.ephemeral == 0:
raise exception.LocalStorageNotSpecified(flavor=flavor_id)
@ -653,6 +654,12 @@ class Instance(BuiltInstance):
datastore1=backup_info.datastore.name,
datastore2=datastore.name)
if slave_of_id:
replication_support = datastore_cfg.replication_strategy
if not replication_support:
raise exception.ReplicationNotSupported(
datastore=datastore.name)
if not nics:
nics = []
if CONF.default_neutron_networks:
@ -702,7 +709,8 @@ class Instance(BuiltInstance):
availability_zone,
root_password,
nics,
overrides)
overrides,
slave_of_id)
return SimpleInstance(context, db_info, datastore_status,
root_password)

View File

@ -207,15 +207,9 @@ class InstanceController(wsgi.Controller):
else:
backup_id = None
if 'availability_zone' in body['instance']:
availability_zone = body['instance']['availability_zone']
else:
availability_zone = None
if 'nics' in body['instance']:
nics = body['instance']['nics']
else:
nics = None
availability_zone = body['instance'].get('availability_zone')
nics = body['instance'].get('nics')
slave_of_id = body['instance'].get('slave_of')
if 'slave_of' in body['instance']:
slave_of_id = body['instance']['slave_of']

View File

@ -92,6 +92,9 @@ class InstanceTasks(object):
'Build error: Secgroup '
'or rule.',
is_error=True)
BUILDING_ERROR_SLAVE = InstanceTask(0x54, 'BUILDING',
'Build error: Replication slave.',
is_error=True)
# Dissuade further additions at run-time.
InstanceTask.__init__ = None

View File

@ -114,7 +114,7 @@ class API(proxy.RpcProxy):
image_id, databases, users, datastore_manager,
packages, volume_size, backup_id=None,
availability_zone=None, root_password=None,
nics=None, overrides=None):
nics=None, overrides=None, slave_of_id=None):
LOG.debug("Making async call to create instance %s " % instance_id)
self.cast(self.context,
self.make_msg("create_instance",
@ -131,7 +131,8 @@ class API(proxy.RpcProxy):
availability_zone=availability_zone,
root_password=root_password,
nics=nics,
overrides=overrides))
overrides=overrides,
slave_of_id=slave_of_id))
def update_overrides(self, instance_id, overrides=None):
LOG.debug("Making async call to update datastore configurations for "

View File

@ -81,13 +81,24 @@ class Manager(periodic_task.PeriodicTasks):
def create_instance(self, context, instance_id, name, flavor,
image_id, databases, users, datastore_manager,
packages, volume_size, backup_id, availability_zone,
root_password, nics, overrides):
root_password, nics, overrides, slave_of_id):
instance_tasks = FreshInstanceTasks.load(context, instance_id)
if slave_of_id:
# We are creating a slave of an existing instance: get a snapshot
# of the master so we can restore the data on the slave.
snapshot = instance_tasks.get_replication_master_snapshot(
context, slave_of_id, backup_id)
backup_id = snapshot['dataset']['snapshot_id']
instance_tasks.create_instance(flavor, image_id, databases, users,
datastore_manager, packages,
volume_size, backup_id,
availability_zone, root_password, nics,
overrides)
availability_zone, root_password,
nics, overrides)
if slave_of_id:
# Enable replication on the newly created slave instance
instance_tasks.attach_replication_slave(snapshot)
def update_overrides(self, context, instance_id, overrides):
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)

View File

@ -266,11 +266,31 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
"Timeout waiting for instance to become active. "
"No usage create-event was sent.") % self.id)
self.update_statuses_on_time_out()
except Exception:
LOG.exception(_("Failed to send usage create-event for "
"instance %s.") % self.id)
def attach_replication_slave(self, snapshot, slave_config=None):
LOG.debug("Calling attach_replication_slave for %s.", self.id)
try:
self.guest.attach_replication_slave(snapshot, slave_config)
except GuestError as e:
msg = (_("Error attaching instance %s "
"as replication slave.") % self.id)
err = inst_models.InstanceTasks.BUILDING_ERROR_SLAVE
self._log_and_raise(e, msg, err)
def get_replication_master_snapshot(self, context, slave_of_id, backup_id):
try:
master_tasks = BuiltInstanceTasks.load(context, slave_of_id)
snapshot = master_tasks.get_replication_snapshot(backup_id)
return snapshot
except TroveError as e:
msg = (_("Error getting snapshot from "
"replication master %s.") % slave_of_id)
err = inst_models.InstanceTasks.BUILDING_ERROR_SLAVE
self._log_and_raise(e, msg, err)
def report_root_enabled(self):
mysql_models.RootHistory.create(self.context, self.id, 'root')
@ -812,6 +832,18 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
LOG.info(_("Initiating backup for instance %s.") % self.id)
self.guest.create_backup(backup_info)
def get_replication_snapshot(self, backup_id):
master_config = {'snapshot_id': backup_id or utils.generate_uuid()}
LOG.debug("Calling get_replication_snapshot on %s.", self.id)
try:
result = self.guest.get_replication_snapshot(master_config)
LOG.debug("Got replication snapshot from %s.", self.id)
return result
except (GuestError, GuestTimeout):
msg = _("Failed to get replication snapshot from %s.") % self.id
LOG.exception(msg)
raise TroveError(msg)
def reboot(self):
try:
LOG.debug("Stopping datastore on instance %s." % self.id)

View File

@ -0,0 +1,134 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from proboscis import test
from proboscis.asserts import assert_equal
from proboscis.asserts import assert_raises
from proboscis.decorators import time_out
from trove.common.utils import generate_uuid
from trove.common.utils import poll_until
from trove.tests.api.instances import instance_info
from trove.tests.api.instances import TIMEOUT_INSTANCE_CREATE
from trove.tests.api.instances import TIMEOUT_INSTANCE_DELETE
from trove.tests.api.instances import WaitForGuestInstallationToFinish
from trove.tests.config import CONFIG
from trove.tests.util.server_connection import create_server_connection
from troveclient.compat import exceptions
class SlaveInstanceTestInfo(object):
"""Stores slave instance information."""
def __init__(self):
self.id = None
self.replicated_db = generate_uuid()
GROUP = "dbaas.api.replication"
slave_instance = SlaveInstanceTestInfo()
@test(depends_on_classes=[WaitForGuestInstallationToFinish],
groups=[GROUP])
class CreateReplicationSlave(object):
@test
def test_create_slave(self):
result = instance_info.dbaas.instances.create(
instance_info.name + "_slave",
instance_info.dbaas_flavor_href,
instance_info.volume,
slave_of=instance_info.id)
assert_equal(200, instance_info.dbaas.last_http_code)
assert_equal("BUILD", result.status)
slave_instance.id = result.id
@test(groups=[GROUP])
class WaitForCreateSlaveToFinish(object):
"""Wait until the instance is created and set up as slave."""
@test(depends_on=[CreateReplicationSlave.test_create_slave])
@time_out(TIMEOUT_INSTANCE_CREATE)
def test_slave_created(self):
def result_is_active():
instance = instance_info.dbaas.instances.get(slave_instance.id)
if instance.status == "ACTIVE":
return True
else:
# If its not ACTIVE, anything but BUILD must be
# an error.
assert_equal("BUILD", instance.status)
if instance_info.volume is not None:
assert_equal(instance.volume.get('used', None), None)
return False
poll_until(result_is_active)
@test(enabled=(not CONFIG.fake_mode),
depends_on=[WaitForCreateSlaveToFinish],
groups=[GROUP])
class VerifySlave(object):
@test
@time_out(5 * 60)
def test_correctly_started_replication(self):
def slave_is_running():
server = create_server_connection(slave_instance.id)
cmd = ("mysqladmin extended-status "
"| awk '/Slave_running/{print $4}'")
stdout, stderr = server.execute(cmd)
return stdout == "ON\n"
poll_until(slave_is_running)
@test(depends_on=[test_correctly_started_replication])
def test_create_db_on_master(self):
databases = [{'name': slave_instance.replicated_db}]
instance_info.dbaas.databases.create(instance_info.id, databases)
assert_equal(202, instance_info.dbaas.last_http_code)
@test(depends_on=[test_create_db_on_master])
@time_out(5 * 60)
def test_database_replicated_on_slave(self):
def db_is_found():
databases = instance_info.dbaas.databases.list(slave_instance.id)
return (slave_instance.replicated_db
in [d.name for d in databases])
poll_until(db_is_found)
@test(groups=[GROUP],
depends_on=[WaitForCreateSlaveToFinish],
runs_after=[VerifySlave])
class DeleteSlaveInstance(object):
@test
@time_out(TIMEOUT_INSTANCE_DELETE)
def test_delete_slave_instance(self):
instance_info.dbaas.instances.delete(slave_instance.id)
assert_equal(202, instance_info.dbaas.last_http_code)
def instance_is_gone():
try:
instance_info.dbaas.instances.get(slave_instance.id)
return False
except exceptions.NotFound:
return True
poll_until(instance_is_gone)
assert_raises(exceptions.NotFound, instance_info.dbaas.instances.get,
slave_instance.id)

View File

@ -323,6 +323,23 @@ class FakeGuest(object):
def apply_overrides(self, overrides):
self.overrides = overrides
def get_replication_snapshot(self, master_config):
return {
'dataset':
{
'datastore_manager': 'mysql',
'dataset_size': '0.0',
'volume_size': '10.0',
'snapshot_id': None
},
'replication_strategy': 'replication_strategy',
'master': '1',
'log_position': '100'
}
def attach_replication_slave(self, snapshot, slave_config):
pass
def get_or_create(id):
if id not in DB:

View File

@ -26,6 +26,7 @@ from trove.tests.api.mgmt import admin_required
from trove.tests.api.mgmt import hosts
from trove.tests.api.mgmt import instances as mgmt_instances
from trove.tests.api.mgmt import storage
from trove.tests.api import replication
from trove.tests.api import root
from trove.tests.api import user_access
from trove.tests.api import users
@ -46,6 +47,7 @@ black_box_groups = [
instances.GROUP_QUOTAS,
instances.GROUP_SECURITY_GROUPS,
backups.GROUP,
replication.GROUP,
configurations.GROUP,
datastores.GROUP,
instances_actions.GROUP_RESIZE,