Merge "Use unique passwords for replication user"

This commit is contained in:
Jenkins 2014-10-02 10:01:50 +00:00 committed by Gerrit Code Review
commit d6ab085cf5
10 changed files with 127 additions and 33 deletions

View File

@ -396,10 +396,6 @@ mysql_opts = [
cfg.StrOpt('replication_namespace',
default='trove.guestagent.strategies.replication.mysql_binlog',
help='Namespace to load replication strategies from.'),
cfg.StrOpt('replication_user', default='slave_user',
help='Userid for replication slave.', secret=True),
cfg.StrOpt('replication_password', default='NETOU7897NNLOU',
help='Password for replication slave user.', secret=True),
cfg.StrOpt('mount_point', default='/var/lib/mysql',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),

View File

@ -340,7 +340,12 @@ class API(proxy.RpcProxy):
def detach_replica(self):
LOG.debug("Detaching replica %s from its replication source.", self.id)
self._call("detach_replica", AGENT_HIGH_TIMEOUT)
return self._call("detach_replica", AGENT_HIGH_TIMEOUT)
def cleanup_source_on_replica_detach(self, replica_info):
LOG.debug("Cleaning up master %s on detach of replica.", self.id)
self._call("cleanup_source_on_replica_detach", AGENT_HIGH_TIMEOUT,
replica_info=replica_info)
def demote_replication_master(self):
LOG.debug("Demoting instance %s to non-master.", self.id)

View File

@ -276,7 +276,14 @@ class Manager(periodic_task.PeriodicTasks):
LOG.debug("Detaching replica.")
app = MySqlApp(MySqlAppStatus.get())
replication = REPLICATION_STRATEGY_CLASS(context)
replication.detach_slave(app)
replica_info = replication.detach_slave(app)
return replica_info
def cleanup_source_on_replica_detach(self, context, replica_info):
LOG.debug("Cleaning up the source on the detach of a replica.")
replication = REPLICATION_STRATEGY_CLASS(context)
replication.cleanup_source_on_replica_detach(MySqlAdmin(),
replica_info)
def demote_replication_master(self, context):
LOG.debug("Demoting replication master.")

View File

@ -51,8 +51,6 @@ MYSQL_BASE_DIR = "/var/lib/mysql"
CONF = cfg.CONF
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql'
REPLICATION_USER = CONF.get(MANAGER).replication_user
REPLICATION_PASSWORD = CONF.get(MANAGER).replication_password
INCLUDE_MARKER_OPERATORS = {
True: ">=",
@ -341,11 +339,15 @@ class MySqlAdmin(object):
def delete_user(self, user):
"""Delete the specified user."""
mysql_user = models.MySQLUser()
mysql_user.deserialize(user)
self.delete_user_by_name(mysql_user.name, mysql_user.host)
def delete_user_by_name(self, name, host='%'):
with LocalSqlClient(get_engine()) as client:
mysql_user = models.MySQLUser()
mysql_user.deserialize(user)
du = sql_query.DropUser(mysql_user.name, host=mysql_user.host)
du = sql_query.DropUser(name, host=host)
t = text(str(du))
LOG.debug("delete_user_by_name: %s", t)
client.execute(t)
def get_user(self, username, hostname):
@ -839,13 +841,13 @@ class MySqlApp(object):
if os.path.exists(MYCNF_REPLMASTER):
utils.execute_with_timeout("sudo", "rm", MYCNF_REPLMASTER)
def grant_replication_privilege(self):
def grant_replication_privilege(self, replication_user):
LOG.info(_("Granting Replication Slave privilege."))
with LocalSqlClient(get_engine()) as client:
g = sql_query.Grant(permissions=['REPLICATION SLAVE'],
user=REPLICATION_USER,
clear=REPLICATION_PASSWORD)
user=replication_user['name'],
clear=replication_user['password'])
t = text(str(g))
client.execute(t)
@ -854,11 +856,13 @@ class MySqlApp(object):
LOG.info(_("Revoking Replication Slave privilege."))
with LocalSqlClient(get_engine()) as client:
g = sql_query.Revoke(permissions=['REPLICATION SLAVE'],
user=REPLICATION_USER,
clear=REPLICATION_PASSWORD)
results = client.execute('SHOW SLAVE STATUS').fetchall()
slave_status_info = results[0]
t = text(str(g))
r = sql_query.Revoke(permissions=['REPLICATION SLAVE'],
user=slave_status_info['master_user'])
t = text(str(r))
client.execute(t)
def get_port(self):
@ -875,9 +879,10 @@ class MySqlApp(object):
}
return binlog_position
def change_master_for_binlog(self, host, port, log_position):
def change_master_for_binlog(self, host, port, logging_config):
LOG.info(_("Configuring replication from %s.") % host)
replication_user = logging_config['replication_user']
change_master_cmd = ("CHANGE MASTER TO MASTER_HOST='%(host)s', "
"MASTER_PORT=%(port)s, "
"MASTER_USER='%(user)s', "
@ -887,10 +892,10 @@ class MySqlApp(object):
{
'host': host,
'port': port,
'user': REPLICATION_USER,
'password': REPLICATION_PASSWORD,
'log_file': log_position['log_file'],
'log_pos': log_position['log_position']
'user': replication_user['name'],
'password': replication_user['password'],
'log_file': logging_config['log_file'],
'log_pos': logging_config['log_position']
})
with LocalSqlClient(get_engine()) as client:
@ -903,11 +908,18 @@ class MySqlApp(object):
self._wait_for_slave_status("ON", client, 60)
def stop_slave(self):
replication_user = None
LOG.info(_("Stopping slave replication."))
with LocalSqlClient(get_engine()) as client:
result = client.execute('SHOW SLAVE STATUS')
replication_user = result.first()['Master_User']
client.execute('STOP SLAVE')
client.execute('RESET SLAVE ALL')
self._wait_for_slave_status("OFF", client, 30)
client.execute('DROP USER ' + replication_user)
return {
'replication_user': replication_user
}
def _wait_for_slave_status(self, status, client, max_time):

View File

@ -52,6 +52,10 @@ class Replication(Strategy):
def detach_slave(self, service):
"""Turn off replication on a slave site."""
@abc.abstractmethod
def cleanup_source_on_replica_detach(self, service, replica_info):
"""Clean up the source on the detach of a replica."""
@abc.abstractmethod
def demote_master(self, service):
"""Turn off replication on a master site."""

View File

@ -15,12 +15,15 @@
#
import csv
import uuid
from trove.common import cfg
from trove.common import exception
from trove.common import utils
from trove.guestagent.backup.backupagent import BackupAgent
from trove.guestagent.common import operating_system
from trove.guestagent.datastore.mysql.service import MySqlAdmin
from trove.guestagent.db import models
from trove.guestagent.strategies import backup
from trove.guestagent.strategies.replication import base
from trove.guestagent.strategies.storage import get_storage_strategy
@ -65,6 +68,33 @@ class MysqlBinlogReplication(base.Replication):
}
return master_ref
def _create_replication_user(self):
replication_user = None
replication_password = utils.generate_random_password(16)
mysql_user = models.MySQLUser()
mysql_user.password = replication_password
retry_count = 0
while replication_user is None:
try:
mysql_user.name = 'slave_' + str(uuid.uuid4())[:8]
MySqlAdmin().create_user([mysql_user.serialize()])
LOG.debug("Trying to create replication user " +
mysql_user.name)
replication_user = {
'name': mysql_user.name,
'password': replication_password
}
except Exception:
retry_count += 1
if retry_count > 5:
LOG.error(_("Replication user retry count exceeded"))
raise
return replication_user
def snapshot_for_replication(self, context, service,
location, snapshot_info):
snapshot_id = snapshot_info['id']
@ -76,9 +106,14 @@ class MysqlBinlogReplication(base.Replication):
AGENT.stream_backup_to_storage(snapshot_info, REPL_BACKUP_RUNNER,
storage, {}, REPL_EXTRA_OPTS)
replication_user = self._create_replication_user()
service.grant_replication_privilege(replication_user)
# With streamed InnobackupEx, the log position is in
# the stream and will be decoded by the slave
log_position = {}
log_position = {
'replication_user': replication_user
}
return snapshot_id, log_position
def enable_as_master(self, service, snapshot_info, master_config):
@ -86,23 +121,28 @@ class MysqlBinlogReplication(base.Replication):
master_config = MASTER_CONFIG
service.write_replication_overrides(master_config)
service.restart()
service.grant_replication_privilege()
def enable_as_slave(self, service, snapshot, slave_config):
if not slave_config:
slave_config = SLAVE_CONFIG
service.write_replication_overrides(slave_config)
service.restart()
logging_config = snapshot['log_position']
logging_config.update(self._read_log_position())
service.change_master_for_binlog(
snapshot['master']['host'],
snapshot['master']['port'],
self._read_log_position())
logging_config)
service.start_slave()
def detach_slave(self, service):
service.stop_slave()
replica_info = service.stop_slave()
service.remove_replication_overrides()
service.restart()
return replica_info
def cleanup_source_on_replica_detach(self, admin_service, replica_info):
admin_service.delete_user_by_name(replica_info['replication_user'])
def demote_master(self, service):
service.revoke_replication_privilege()

View File

@ -60,8 +60,10 @@ class Manager(periodic_task.PeriodicTasks):
instance_tasks.restart()
def detach_replica(self, context, instance_id):
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
instance_tasks.detach_replica()
slave = models.BuiltInstanceTasks.load(context, instance_id)
master_id = slave.slave_of_id
master = models.BuiltInstanceTasks.load(context, master_id)
slave.detach_replica(master)
def migrate(self, context, instance_id, host):
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)

View File

@ -945,14 +945,19 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
return run_with_quotas(self.context.tenant, {'backups': 1},
_get_replication_snapshot)
def detach_replica(self):
def detach_replica(self, master):
LOG.debug("Calling detach_replica on %s" % self.id)
try:
self.guest.detach_replica()
replica_info = self.guest.detach_replica()
master.cleanup_source_on_replica_detach(replica_info)
self.update_db(slave_of_id=None)
except (GuestError, GuestTimeout):
LOG.exception(_("Failed to detach replica %s.") % self.id)
def cleanup_source_on_replica_detach(self, replica_info):
LOG.debug("Calling cleanup_source_on_replica_detach on %s" % self.id)
self.guest.cleanup_source_on_replica_detach(replica_info)
def reboot(self):
try:
LOG.debug("Stopping datastore on instance %s." % self.id)

View File

@ -42,6 +42,14 @@ slave_instance = SlaveInstanceTestInfo()
existing_db_on_master = generate_uuid()
def _get_user_count(server_info):
cmd = ('mysql -BNq -e \\\'select count\\(*\\) from mysql.user'
' where user like \\\"slave_%\\\"\\\'')
server = create_server_connection(server_info.id)
stdout, stderr = server.execute(cmd)
return int(stdout.rstrip())
def slave_is_running(running=True):
def check_slave_is_running():
@ -141,6 +149,11 @@ class VerifySlave(object):
def test_existing_db_exists_on_slave(self):
poll_until(self.db_is_found(existing_db_on_master))
@test(depends_on=[test_existing_db_exists_on_slave])
def test_slave_user_exists(self):
assert_equal(_get_user_count(slave_instance), 1)
assert_equal(_get_user_count(instance_info), 1)
@test(groups=[GROUP],
depends_on=[WaitForCreateSlaveToFinish],
@ -200,6 +213,16 @@ class DetachReplica(object):
stdout, stderr = server.execute(cmd)
assert_equal(stdout, "0\n")
@test(depends_on=[test_detach_replica])
def test_slave_user_removed(self):
if CONFIG.fake_mode:
raise SkipTest("Test not_read_only not supported in fake mode")
def _slave_user_deleted():
return _get_user_count(instance_info) == 0
poll_until(_slave_user_deleted)
@test(groups=[GROUP],
depends_on=[WaitForCreateSlaveToFinish],

View File

@ -293,7 +293,7 @@ class MySqlAdminTest(testtools.TestCase):
def test_delete_user(self):
user = {"_name": "testUser"}
user = {"_name": "testUser", "_host": None}
self.mySqlAdmin.delete_user(user)
@ -301,7 +301,7 @@ class MySqlAdminTest(testtools.TestCase):
call_args = dbaas.LocalSqlClient.execute.call_args
if call_args is not None:
args, _ = call_args
expected = "DROP USER `testUser`;"
expected = "DROP USER `testUser`@`%`;"
self.assertEqual(args[0].text, expected,
"Delete user queries are not the same")