Merge "Use unique passwords for replication user"
This commit is contained in:
commit
d6ab085cf5
|
@ -396,10 +396,6 @@ mysql_opts = [
|
|||
cfg.StrOpt('replication_namespace',
|
||||
default='trove.guestagent.strategies.replication.mysql_binlog',
|
||||
help='Namespace to load replication strategies from.'),
|
||||
cfg.StrOpt('replication_user', default='slave_user',
|
||||
help='Userid for replication slave.', secret=True),
|
||||
cfg.StrOpt('replication_password', default='NETOU7897NNLOU',
|
||||
help='Password for replication slave user.', secret=True),
|
||||
cfg.StrOpt('mount_point', default='/var/lib/mysql',
|
||||
help="Filesystem path for mounting "
|
||||
"volumes if volume support is enabled."),
|
||||
|
|
|
@ -340,7 +340,12 @@ class API(proxy.RpcProxy):
|
|||
|
||||
def detach_replica(self):
|
||||
LOG.debug("Detaching replica %s from its replication source.", self.id)
|
||||
self._call("detach_replica", AGENT_HIGH_TIMEOUT)
|
||||
return self._call("detach_replica", AGENT_HIGH_TIMEOUT)
|
||||
|
||||
def cleanup_source_on_replica_detach(self, replica_info):
|
||||
LOG.debug("Cleaning up master %s on detach of replica.", self.id)
|
||||
self._call("cleanup_source_on_replica_detach", AGENT_HIGH_TIMEOUT,
|
||||
replica_info=replica_info)
|
||||
|
||||
def demote_replication_master(self):
|
||||
LOG.debug("Demoting instance %s to non-master.", self.id)
|
||||
|
|
|
@ -276,7 +276,14 @@ class Manager(periodic_task.PeriodicTasks):
|
|||
LOG.debug("Detaching replica.")
|
||||
app = MySqlApp(MySqlAppStatus.get())
|
||||
replication = REPLICATION_STRATEGY_CLASS(context)
|
||||
replication.detach_slave(app)
|
||||
replica_info = replication.detach_slave(app)
|
||||
return replica_info
|
||||
|
||||
def cleanup_source_on_replica_detach(self, context, replica_info):
|
||||
LOG.debug("Cleaning up the source on the detach of a replica.")
|
||||
replication = REPLICATION_STRATEGY_CLASS(context)
|
||||
replication.cleanup_source_on_replica_detach(MySqlAdmin(),
|
||||
replica_info)
|
||||
|
||||
def demote_replication_master(self, context):
|
||||
LOG.debug("Demoting replication master.")
|
||||
|
|
|
@ -51,8 +51,6 @@ MYSQL_BASE_DIR = "/var/lib/mysql"
|
|||
|
||||
CONF = cfg.CONF
|
||||
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql'
|
||||
REPLICATION_USER = CONF.get(MANAGER).replication_user
|
||||
REPLICATION_PASSWORD = CONF.get(MANAGER).replication_password
|
||||
|
||||
INCLUDE_MARKER_OPERATORS = {
|
||||
True: ">=",
|
||||
|
@ -341,11 +339,15 @@ class MySqlAdmin(object):
|
|||
|
||||
def delete_user(self, user):
|
||||
"""Delete the specified user."""
|
||||
mysql_user = models.MySQLUser()
|
||||
mysql_user.deserialize(user)
|
||||
self.delete_user_by_name(mysql_user.name, mysql_user.host)
|
||||
|
||||
def delete_user_by_name(self, name, host='%'):
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
mysql_user = models.MySQLUser()
|
||||
mysql_user.deserialize(user)
|
||||
du = sql_query.DropUser(mysql_user.name, host=mysql_user.host)
|
||||
du = sql_query.DropUser(name, host=host)
|
||||
t = text(str(du))
|
||||
LOG.debug("delete_user_by_name: %s", t)
|
||||
client.execute(t)
|
||||
|
||||
def get_user(self, username, hostname):
|
||||
|
@ -839,13 +841,13 @@ class MySqlApp(object):
|
|||
if os.path.exists(MYCNF_REPLMASTER):
|
||||
utils.execute_with_timeout("sudo", "rm", MYCNF_REPLMASTER)
|
||||
|
||||
def grant_replication_privilege(self):
|
||||
def grant_replication_privilege(self, replication_user):
|
||||
LOG.info(_("Granting Replication Slave privilege."))
|
||||
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
g = sql_query.Grant(permissions=['REPLICATION SLAVE'],
|
||||
user=REPLICATION_USER,
|
||||
clear=REPLICATION_PASSWORD)
|
||||
user=replication_user['name'],
|
||||
clear=replication_user['password'])
|
||||
|
||||
t = text(str(g))
|
||||
client.execute(t)
|
||||
|
@ -854,11 +856,13 @@ class MySqlApp(object):
|
|||
LOG.info(_("Revoking Replication Slave privilege."))
|
||||
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
g = sql_query.Revoke(permissions=['REPLICATION SLAVE'],
|
||||
user=REPLICATION_USER,
|
||||
clear=REPLICATION_PASSWORD)
|
||||
results = client.execute('SHOW SLAVE STATUS').fetchall()
|
||||
slave_status_info = results[0]
|
||||
|
||||
t = text(str(g))
|
||||
r = sql_query.Revoke(permissions=['REPLICATION SLAVE'],
|
||||
user=slave_status_info['master_user'])
|
||||
|
||||
t = text(str(r))
|
||||
client.execute(t)
|
||||
|
||||
def get_port(self):
|
||||
|
@ -875,9 +879,10 @@ class MySqlApp(object):
|
|||
}
|
||||
return binlog_position
|
||||
|
||||
def change_master_for_binlog(self, host, port, log_position):
|
||||
def change_master_for_binlog(self, host, port, logging_config):
|
||||
LOG.info(_("Configuring replication from %s.") % host)
|
||||
|
||||
replication_user = logging_config['replication_user']
|
||||
change_master_cmd = ("CHANGE MASTER TO MASTER_HOST='%(host)s', "
|
||||
"MASTER_PORT=%(port)s, "
|
||||
"MASTER_USER='%(user)s', "
|
||||
|
@ -887,10 +892,10 @@ class MySqlApp(object):
|
|||
{
|
||||
'host': host,
|
||||
'port': port,
|
||||
'user': REPLICATION_USER,
|
||||
'password': REPLICATION_PASSWORD,
|
||||
'log_file': log_position['log_file'],
|
||||
'log_pos': log_position['log_position']
|
||||
'user': replication_user['name'],
|
||||
'password': replication_user['password'],
|
||||
'log_file': logging_config['log_file'],
|
||||
'log_pos': logging_config['log_position']
|
||||
})
|
||||
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
|
@ -903,11 +908,18 @@ class MySqlApp(object):
|
|||
self._wait_for_slave_status("ON", client, 60)
|
||||
|
||||
def stop_slave(self):
|
||||
replication_user = None
|
||||
LOG.info(_("Stopping slave replication."))
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
result = client.execute('SHOW SLAVE STATUS')
|
||||
replication_user = result.first()['Master_User']
|
||||
client.execute('STOP SLAVE')
|
||||
client.execute('RESET SLAVE ALL')
|
||||
self._wait_for_slave_status("OFF", client, 30)
|
||||
client.execute('DROP USER ' + replication_user)
|
||||
return {
|
||||
'replication_user': replication_user
|
||||
}
|
||||
|
||||
def _wait_for_slave_status(self, status, client, max_time):
|
||||
|
||||
|
|
|
@ -52,6 +52,10 @@ class Replication(Strategy):
|
|||
def detach_slave(self, service):
|
||||
"""Turn off replication on a slave site."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def cleanup_source_on_replica_detach(self, service, replica_info):
|
||||
"""Clean up the source on the detach of a replica."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def demote_master(self, service):
|
||||
"""Turn off replication on a master site."""
|
||||
|
|
|
@ -15,12 +15,15 @@
|
|||
#
|
||||
|
||||
import csv
|
||||
import uuid
|
||||
|
||||
from trove.common import cfg
|
||||
from trove.common import exception
|
||||
from trove.common import utils
|
||||
from trove.guestagent.backup.backupagent import BackupAgent
|
||||
from trove.guestagent.common import operating_system
|
||||
from trove.guestagent.datastore.mysql.service import MySqlAdmin
|
||||
from trove.guestagent.db import models
|
||||
from trove.guestagent.strategies import backup
|
||||
from trove.guestagent.strategies.replication import base
|
||||
from trove.guestagent.strategies.storage import get_storage_strategy
|
||||
|
@ -65,6 +68,33 @@ class MysqlBinlogReplication(base.Replication):
|
|||
}
|
||||
return master_ref
|
||||
|
||||
def _create_replication_user(self):
|
||||
replication_user = None
|
||||
replication_password = utils.generate_random_password(16)
|
||||
|
||||
mysql_user = models.MySQLUser()
|
||||
mysql_user.password = replication_password
|
||||
|
||||
retry_count = 0
|
||||
|
||||
while replication_user is None:
|
||||
try:
|
||||
mysql_user.name = 'slave_' + str(uuid.uuid4())[:8]
|
||||
MySqlAdmin().create_user([mysql_user.serialize()])
|
||||
LOG.debug("Trying to create replication user " +
|
||||
mysql_user.name)
|
||||
replication_user = {
|
||||
'name': mysql_user.name,
|
||||
'password': replication_password
|
||||
}
|
||||
except Exception:
|
||||
retry_count += 1
|
||||
if retry_count > 5:
|
||||
LOG.error(_("Replication user retry count exceeded"))
|
||||
raise
|
||||
|
||||
return replication_user
|
||||
|
||||
def snapshot_for_replication(self, context, service,
|
||||
location, snapshot_info):
|
||||
snapshot_id = snapshot_info['id']
|
||||
|
@ -76,9 +106,14 @@ class MysqlBinlogReplication(base.Replication):
|
|||
AGENT.stream_backup_to_storage(snapshot_info, REPL_BACKUP_RUNNER,
|
||||
storage, {}, REPL_EXTRA_OPTS)
|
||||
|
||||
replication_user = self._create_replication_user()
|
||||
service.grant_replication_privilege(replication_user)
|
||||
|
||||
# With streamed InnobackupEx, the log position is in
|
||||
# the stream and will be decoded by the slave
|
||||
log_position = {}
|
||||
log_position = {
|
||||
'replication_user': replication_user
|
||||
}
|
||||
return snapshot_id, log_position
|
||||
|
||||
def enable_as_master(self, service, snapshot_info, master_config):
|
||||
|
@ -86,23 +121,28 @@ class MysqlBinlogReplication(base.Replication):
|
|||
master_config = MASTER_CONFIG
|
||||
service.write_replication_overrides(master_config)
|
||||
service.restart()
|
||||
service.grant_replication_privilege()
|
||||
|
||||
def enable_as_slave(self, service, snapshot, slave_config):
|
||||
if not slave_config:
|
||||
slave_config = SLAVE_CONFIG
|
||||
service.write_replication_overrides(slave_config)
|
||||
service.restart()
|
||||
logging_config = snapshot['log_position']
|
||||
logging_config.update(self._read_log_position())
|
||||
service.change_master_for_binlog(
|
||||
snapshot['master']['host'],
|
||||
snapshot['master']['port'],
|
||||
self._read_log_position())
|
||||
logging_config)
|
||||
service.start_slave()
|
||||
|
||||
def detach_slave(self, service):
|
||||
service.stop_slave()
|
||||
replica_info = service.stop_slave()
|
||||
service.remove_replication_overrides()
|
||||
service.restart()
|
||||
return replica_info
|
||||
|
||||
def cleanup_source_on_replica_detach(self, admin_service, replica_info):
|
||||
admin_service.delete_user_by_name(replica_info['replication_user'])
|
||||
|
||||
def demote_master(self, service):
|
||||
service.revoke_replication_privilege()
|
||||
|
|
|
@ -60,8 +60,10 @@ class Manager(periodic_task.PeriodicTasks):
|
|||
instance_tasks.restart()
|
||||
|
||||
def detach_replica(self, context, instance_id):
|
||||
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
|
||||
instance_tasks.detach_replica()
|
||||
slave = models.BuiltInstanceTasks.load(context, instance_id)
|
||||
master_id = slave.slave_of_id
|
||||
master = models.BuiltInstanceTasks.load(context, master_id)
|
||||
slave.detach_replica(master)
|
||||
|
||||
def migrate(self, context, instance_id, host):
|
||||
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
|
||||
|
|
|
@ -945,14 +945,19 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
|
|||
return run_with_quotas(self.context.tenant, {'backups': 1},
|
||||
_get_replication_snapshot)
|
||||
|
||||
def detach_replica(self):
|
||||
def detach_replica(self, master):
|
||||
LOG.debug("Calling detach_replica on %s" % self.id)
|
||||
try:
|
||||
self.guest.detach_replica()
|
||||
replica_info = self.guest.detach_replica()
|
||||
master.cleanup_source_on_replica_detach(replica_info)
|
||||
self.update_db(slave_of_id=None)
|
||||
except (GuestError, GuestTimeout):
|
||||
LOG.exception(_("Failed to detach replica %s.") % self.id)
|
||||
|
||||
def cleanup_source_on_replica_detach(self, replica_info):
|
||||
LOG.debug("Calling cleanup_source_on_replica_detach on %s" % self.id)
|
||||
self.guest.cleanup_source_on_replica_detach(replica_info)
|
||||
|
||||
def reboot(self):
|
||||
try:
|
||||
LOG.debug("Stopping datastore on instance %s." % self.id)
|
||||
|
|
|
@ -42,6 +42,14 @@ slave_instance = SlaveInstanceTestInfo()
|
|||
existing_db_on_master = generate_uuid()
|
||||
|
||||
|
||||
def _get_user_count(server_info):
|
||||
cmd = ('mysql -BNq -e \\\'select count\\(*\\) from mysql.user'
|
||||
' where user like \\\"slave_%\\\"\\\'')
|
||||
server = create_server_connection(server_info.id)
|
||||
stdout, stderr = server.execute(cmd)
|
||||
return int(stdout.rstrip())
|
||||
|
||||
|
||||
def slave_is_running(running=True):
|
||||
|
||||
def check_slave_is_running():
|
||||
|
@ -141,6 +149,11 @@ class VerifySlave(object):
|
|||
def test_existing_db_exists_on_slave(self):
|
||||
poll_until(self.db_is_found(existing_db_on_master))
|
||||
|
||||
@test(depends_on=[test_existing_db_exists_on_slave])
|
||||
def test_slave_user_exists(self):
|
||||
assert_equal(_get_user_count(slave_instance), 1)
|
||||
assert_equal(_get_user_count(instance_info), 1)
|
||||
|
||||
|
||||
@test(groups=[GROUP],
|
||||
depends_on=[WaitForCreateSlaveToFinish],
|
||||
|
@ -200,6 +213,16 @@ class DetachReplica(object):
|
|||
stdout, stderr = server.execute(cmd)
|
||||
assert_equal(stdout, "0\n")
|
||||
|
||||
@test(depends_on=[test_detach_replica])
|
||||
def test_slave_user_removed(self):
|
||||
if CONFIG.fake_mode:
|
||||
raise SkipTest("Test not_read_only not supported in fake mode")
|
||||
|
||||
def _slave_user_deleted():
|
||||
return _get_user_count(instance_info) == 0
|
||||
|
||||
poll_until(_slave_user_deleted)
|
||||
|
||||
|
||||
@test(groups=[GROUP],
|
||||
depends_on=[WaitForCreateSlaveToFinish],
|
||||
|
|
|
@ -293,7 +293,7 @@ class MySqlAdminTest(testtools.TestCase):
|
|||
|
||||
def test_delete_user(self):
|
||||
|
||||
user = {"_name": "testUser"}
|
||||
user = {"_name": "testUser", "_host": None}
|
||||
|
||||
self.mySqlAdmin.delete_user(user)
|
||||
|
||||
|
@ -301,7 +301,7 @@ class MySqlAdminTest(testtools.TestCase):
|
|||
call_args = dbaas.LocalSqlClient.execute.call_args
|
||||
if call_args is not None:
|
||||
args, _ = call_args
|
||||
expected = "DROP USER `testUser`;"
|
||||
expected = "DROP USER `testUser`@`%`;"
|
||||
self.assertEqual(args[0].text, expected,
|
||||
"Delete user queries are not the same")
|
||||
|
||||
|
|
Loading…
Reference in New Issue