diff --git a/trove/common/cfg.py b/trove/common/cfg.py index 69e34d5d00..bbd4e5f048 100644 --- a/trove/common/cfg.py +++ b/trove/common/cfg.py @@ -396,10 +396,6 @@ mysql_opts = [ cfg.StrOpt('replication_namespace', default='trove.guestagent.strategies.replication.mysql_binlog', help='Namespace to load replication strategies from.'), - cfg.StrOpt('replication_user', default='slave_user', - help='Userid for replication slave.', secret=True), - cfg.StrOpt('replication_password', default='NETOU7897NNLOU', - help='Password for replication slave user.', secret=True), cfg.StrOpt('mount_point', default='/var/lib/mysql', help="Filesystem path for mounting " "volumes if volume support is enabled."), diff --git a/trove/guestagent/api.py b/trove/guestagent/api.py index bf498c592f..12f594979f 100644 --- a/trove/guestagent/api.py +++ b/trove/guestagent/api.py @@ -340,7 +340,12 @@ class API(proxy.RpcProxy): def detach_replica(self): LOG.debug("Detaching replica %s from its replication source.", self.id) - self._call("detach_replica", AGENT_HIGH_TIMEOUT) + return self._call("detach_replica", AGENT_HIGH_TIMEOUT) + + def cleanup_source_on_replica_detach(self, replica_info): + LOG.debug("Cleaning up master %s on detach of replica.", self.id) + self._call("cleanup_source_on_replica_detach", AGENT_HIGH_TIMEOUT, + replica_info=replica_info) def demote_replication_master(self): LOG.debug("Demoting instance %s to non-master.", self.id) diff --git a/trove/guestagent/datastore/mysql/manager.py b/trove/guestagent/datastore/mysql/manager.py index 2e848d71e1..5947cab44b 100644 --- a/trove/guestagent/datastore/mysql/manager.py +++ b/trove/guestagent/datastore/mysql/manager.py @@ -276,7 +276,14 @@ class Manager(periodic_task.PeriodicTasks): LOG.debug("Detaching replica.") app = MySqlApp(MySqlAppStatus.get()) replication = REPLICATION_STRATEGY_CLASS(context) - replication.detach_slave(app) + replica_info = replication.detach_slave(app) + return replica_info + + def cleanup_source_on_replica_detach(self, context, replica_info): + LOG.debug("Cleaning up the source on the detach of a replica.") + replication = REPLICATION_STRATEGY_CLASS(context) + replication.cleanup_source_on_replica_detach(MySqlAdmin(), + replica_info) def demote_replication_master(self, context): LOG.debug("Demoting replication master.") diff --git a/trove/guestagent/datastore/mysql/service.py b/trove/guestagent/datastore/mysql/service.py index 2461893340..80f044b6ce 100644 --- a/trove/guestagent/datastore/mysql/service.py +++ b/trove/guestagent/datastore/mysql/service.py @@ -51,8 +51,6 @@ MYSQL_BASE_DIR = "/var/lib/mysql" CONF = cfg.CONF MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql' -REPLICATION_USER = CONF.get(MANAGER).replication_user -REPLICATION_PASSWORD = CONF.get(MANAGER).replication_password INCLUDE_MARKER_OPERATORS = { True: ">=", @@ -341,11 +339,15 @@ class MySqlAdmin(object): def delete_user(self, user): """Delete the specified user.""" + mysql_user = models.MySQLUser() + mysql_user.deserialize(user) + self.delete_user_by_name(mysql_user.name, mysql_user.host) + + def delete_user_by_name(self, name, host='%'): with LocalSqlClient(get_engine()) as client: - mysql_user = models.MySQLUser() - mysql_user.deserialize(user) - du = sql_query.DropUser(mysql_user.name, host=mysql_user.host) + du = sql_query.DropUser(name, host=host) t = text(str(du)) + LOG.debug("delete_user_by_name: %s", t) client.execute(t) def get_user(self, username, hostname): @@ -839,13 +841,13 @@ class MySqlApp(object): if os.path.exists(MYCNF_REPLMASTER): utils.execute_with_timeout("sudo", "rm", MYCNF_REPLMASTER) - def grant_replication_privilege(self): + def grant_replication_privilege(self, replication_user): LOG.info(_("Granting Replication Slave privilege.")) with LocalSqlClient(get_engine()) as client: g = sql_query.Grant(permissions=['REPLICATION SLAVE'], - user=REPLICATION_USER, - clear=REPLICATION_PASSWORD) + user=replication_user['name'], + clear=replication_user['password']) t = text(str(g)) client.execute(t) @@ -854,11 +856,13 @@ class MySqlApp(object): LOG.info(_("Revoking Replication Slave privilege.")) with LocalSqlClient(get_engine()) as client: - g = sql_query.Revoke(permissions=['REPLICATION SLAVE'], - user=REPLICATION_USER, - clear=REPLICATION_PASSWORD) + results = client.execute('SHOW SLAVE STATUS').fetchall() + slave_status_info = results[0] - t = text(str(g)) + r = sql_query.Revoke(permissions=['REPLICATION SLAVE'], + user=slave_status_info['master_user']) + + t = text(str(r)) client.execute(t) def get_port(self): @@ -875,9 +879,10 @@ class MySqlApp(object): } return binlog_position - def change_master_for_binlog(self, host, port, log_position): + def change_master_for_binlog(self, host, port, logging_config): LOG.info(_("Configuring replication from %s.") % host) + replication_user = logging_config['replication_user'] change_master_cmd = ("CHANGE MASTER TO MASTER_HOST='%(host)s', " "MASTER_PORT=%(port)s, " "MASTER_USER='%(user)s', " @@ -887,10 +892,10 @@ class MySqlApp(object): { 'host': host, 'port': port, - 'user': REPLICATION_USER, - 'password': REPLICATION_PASSWORD, - 'log_file': log_position['log_file'], - 'log_pos': log_position['log_position'] + 'user': replication_user['name'], + 'password': replication_user['password'], + 'log_file': logging_config['log_file'], + 'log_pos': logging_config['log_position'] }) with LocalSqlClient(get_engine()) as client: @@ -903,11 +908,18 @@ class MySqlApp(object): self._wait_for_slave_status("ON", client, 60) def stop_slave(self): + replication_user = None LOG.info(_("Stopping slave replication.")) with LocalSqlClient(get_engine()) as client: + result = client.execute('SHOW SLAVE STATUS') + replication_user = result.first()['Master_User'] client.execute('STOP SLAVE') client.execute('RESET SLAVE ALL') self._wait_for_slave_status("OFF", client, 30) + client.execute('DROP USER ' + replication_user) + return { + 'replication_user': replication_user + } def _wait_for_slave_status(self, status, client, max_time): diff --git a/trove/guestagent/strategies/replication/base.py b/trove/guestagent/strategies/replication/base.py index 2d4dc1c3a7..0736e2e88d 100644 --- a/trove/guestagent/strategies/replication/base.py +++ b/trove/guestagent/strategies/replication/base.py @@ -52,6 +52,10 @@ class Replication(Strategy): def detach_slave(self, service): """Turn off replication on a slave site.""" + @abc.abstractmethod + def cleanup_source_on_replica_detach(self, service, replica_info): + """Clean up the source on the detach of a replica.""" + @abc.abstractmethod def demote_master(self, service): """Turn off replication on a master site.""" diff --git a/trove/guestagent/strategies/replication/mysql_binlog.py b/trove/guestagent/strategies/replication/mysql_binlog.py index 15508de671..bbe81aa676 100644 --- a/trove/guestagent/strategies/replication/mysql_binlog.py +++ b/trove/guestagent/strategies/replication/mysql_binlog.py @@ -15,12 +15,15 @@ # import csv +import uuid from trove.common import cfg from trove.common import exception from trove.common import utils from trove.guestagent.backup.backupagent import BackupAgent from trove.guestagent.common import operating_system +from trove.guestagent.datastore.mysql.service import MySqlAdmin +from trove.guestagent.db import models from trove.guestagent.strategies import backup from trove.guestagent.strategies.replication import base from trove.guestagent.strategies.storage import get_storage_strategy @@ -65,6 +68,33 @@ class MysqlBinlogReplication(base.Replication): } return master_ref + def _create_replication_user(self): + replication_user = None + replication_password = utils.generate_random_password(16) + + mysql_user = models.MySQLUser() + mysql_user.password = replication_password + + retry_count = 0 + + while replication_user is None: + try: + mysql_user.name = 'slave_' + str(uuid.uuid4())[:8] + MySqlAdmin().create_user([mysql_user.serialize()]) + LOG.debug("Trying to create replication user " + + mysql_user.name) + replication_user = { + 'name': mysql_user.name, + 'password': replication_password + } + except Exception: + retry_count += 1 + if retry_count > 5: + LOG.error(_("Replication user retry count exceeded")) + raise + + return replication_user + def snapshot_for_replication(self, context, service, location, snapshot_info): snapshot_id = snapshot_info['id'] @@ -76,9 +106,14 @@ class MysqlBinlogReplication(base.Replication): AGENT.stream_backup_to_storage(snapshot_info, REPL_BACKUP_RUNNER, storage, {}, REPL_EXTRA_OPTS) + replication_user = self._create_replication_user() + service.grant_replication_privilege(replication_user) + # With streamed InnobackupEx, the log position is in # the stream and will be decoded by the slave - log_position = {} + log_position = { + 'replication_user': replication_user + } return snapshot_id, log_position def enable_as_master(self, service, snapshot_info, master_config): @@ -86,23 +121,28 @@ class MysqlBinlogReplication(base.Replication): master_config = MASTER_CONFIG service.write_replication_overrides(master_config) service.restart() - service.grant_replication_privilege() def enable_as_slave(self, service, snapshot, slave_config): if not slave_config: slave_config = SLAVE_CONFIG service.write_replication_overrides(slave_config) service.restart() + logging_config = snapshot['log_position'] + logging_config.update(self._read_log_position()) service.change_master_for_binlog( snapshot['master']['host'], snapshot['master']['port'], - self._read_log_position()) + logging_config) service.start_slave() def detach_slave(self, service): - service.stop_slave() + replica_info = service.stop_slave() service.remove_replication_overrides() service.restart() + return replica_info + + def cleanup_source_on_replica_detach(self, admin_service, replica_info): + admin_service.delete_user_by_name(replica_info['replication_user']) def demote_master(self, service): service.revoke_replication_privilege() diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py index 1ea7b1e95c..5c119c9b95 100644 --- a/trove/taskmanager/manager.py +++ b/trove/taskmanager/manager.py @@ -60,8 +60,10 @@ class Manager(periodic_task.PeriodicTasks): instance_tasks.restart() def detach_replica(self, context, instance_id): - instance_tasks = models.BuiltInstanceTasks.load(context, instance_id) - instance_tasks.detach_replica() + slave = models.BuiltInstanceTasks.load(context, instance_id) + master_id = slave.slave_of_id + master = models.BuiltInstanceTasks.load(context, master_id) + slave.detach_replica(master) def migrate(self, context, instance_id, host): instance_tasks = models.BuiltInstanceTasks.load(context, instance_id) diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py index f1bce8a805..2415eb0d19 100755 --- a/trove/taskmanager/models.py +++ b/trove/taskmanager/models.py @@ -945,14 +945,19 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin): return run_with_quotas(self.context.tenant, {'backups': 1}, _get_replication_snapshot) - def detach_replica(self): + def detach_replica(self, master): LOG.debug("Calling detach_replica on %s" % self.id) try: - self.guest.detach_replica() + replica_info = self.guest.detach_replica() + master.cleanup_source_on_replica_detach(replica_info) self.update_db(slave_of_id=None) except (GuestError, GuestTimeout): LOG.exception(_("Failed to detach replica %s.") % self.id) + def cleanup_source_on_replica_detach(self, replica_info): + LOG.debug("Calling cleanup_source_on_replica_detach on %s" % self.id) + self.guest.cleanup_source_on_replica_detach(replica_info) + def reboot(self): try: LOG.debug("Stopping datastore on instance %s." % self.id) diff --git a/trove/tests/api/replication.py b/trove/tests/api/replication.py index da5165dbac..bdb056b1c5 100644 --- a/trove/tests/api/replication.py +++ b/trove/tests/api/replication.py @@ -42,6 +42,14 @@ slave_instance = SlaveInstanceTestInfo() existing_db_on_master = generate_uuid() +def _get_user_count(server_info): + cmd = ('mysql -BNq -e \\\'select count\\(*\\) from mysql.user' + ' where user like \\\"slave_%\\\"\\\'') + server = create_server_connection(server_info.id) + stdout, stderr = server.execute(cmd) + return int(stdout.rstrip()) + + def slave_is_running(running=True): def check_slave_is_running(): @@ -141,6 +149,11 @@ class VerifySlave(object): def test_existing_db_exists_on_slave(self): poll_until(self.db_is_found(existing_db_on_master)) + @test(depends_on=[test_existing_db_exists_on_slave]) + def test_slave_user_exists(self): + assert_equal(_get_user_count(slave_instance), 1) + assert_equal(_get_user_count(instance_info), 1) + @test(groups=[GROUP], depends_on=[WaitForCreateSlaveToFinish], @@ -200,6 +213,16 @@ class DetachReplica(object): stdout, stderr = server.execute(cmd) assert_equal(stdout, "0\n") + @test(depends_on=[test_detach_replica]) + def test_slave_user_removed(self): + if CONFIG.fake_mode: + raise SkipTest("Test not_read_only not supported in fake mode") + + def _slave_user_deleted(): + return _get_user_count(instance_info) == 0 + + poll_until(_slave_user_deleted) + @test(groups=[GROUP], depends_on=[WaitForCreateSlaveToFinish], diff --git a/trove/tests/unittests/guestagent/test_dbaas.py b/trove/tests/unittests/guestagent/test_dbaas.py index d6177c970e..a966060633 100644 --- a/trove/tests/unittests/guestagent/test_dbaas.py +++ b/trove/tests/unittests/guestagent/test_dbaas.py @@ -293,7 +293,7 @@ class MySqlAdminTest(testtools.TestCase): def test_delete_user(self): - user = {"_name": "testUser"} + user = {"_name": "testUser", "_host": None} self.mySqlAdmin.delete_user(user) @@ -301,7 +301,7 @@ class MySqlAdminTest(testtools.TestCase): call_args = dbaas.LocalSqlClient.execute.call_args if call_args is not None: args, _ = call_args - expected = "DROP USER `testUser`;" + expected = "DROP USER `testUser`@`%`;" self.assertEqual(args[0].text, expected, "Delete user queries are not the same")