Run reset master when setting up mysql replicas

* Only run 'FLUSH PRIVILEGES;' when needed
* Fix SqlClient contextmanager initialization

Story: 2009091
Task: 42921

Change-Id: I05a07402388d214dc95071be709191f63af49411
This commit is contained in:
Lingxian Kong 2021-07-30 16:20:08 +12:00
parent 3b827ef20b
commit c551eac983
3 changed files with 43 additions and 25 deletions

View File

@ -105,7 +105,8 @@ class BaseMySqlAdmin(object):
"""Internal. Given a MySQLUser, populate its databases attribute."""
LOG.debug("Associating dbs to user %(name)s at %(host)s.",
{'name': user.name, 'host': user.host})
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
with mysql_util.SqlClient(
self.mysql_app.get_engine(), use_flush=True) as client:
q = sql_query.Query()
q.columns = ["grantee", "table_schema"]
q.tables = ["information_schema.SCHEMA_PRIVILEGES"]
@ -121,7 +122,8 @@ class BaseMySqlAdmin(object):
def change_passwords(self, users):
"""Change the passwords of one or more existing users."""
LOG.debug("Changing the password of some users.")
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
with mysql_util.SqlClient(
self.mysql_app.get_engine(), use_flush=True) as client:
for item in users:
LOG.debug("Changing password for user %s.", item)
user_dict = {'_name': item['name'],
@ -144,9 +146,8 @@ class BaseMySqlAdmin(object):
new_password = user_attrs.get('password')
if new_name or new_host or new_password:
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
with mysql_util.SqlClient(
self.mysql_app.get_engine(), use_flush=True) as client:
if new_password is not None:
uu = sql_query.SetPassword(user.name, host=user.host,
new_password=new_password)
@ -163,7 +164,8 @@ class BaseMySqlAdmin(object):
def create_databases(self, databases):
"""Create the list of specified databases."""
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
with mysql_util.SqlClient(
self.mysql_app.get_engine(), use_flush=True) as client:
for item in databases:
mydb = models.MySQLSchema.deserialize(item)
mydb.check_create()
@ -178,7 +180,8 @@ class BaseMySqlAdmin(object):
"""Create users and grant them privileges for the
specified databases.
"""
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
with mysql_util.SqlClient(
self.mysql_app.get_engine(), use_flush=True) as client:
for item in users:
user = models.MySQLUser.deserialize(item)
user.check_create()
@ -198,7 +201,8 @@ class BaseMySqlAdmin(object):
def delete_database(self, database):
"""Delete the specified database."""
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
with mysql_util.SqlClient(
self.mysql_app.get_engine(), use_flush=True) as client:
mydb = models.MySQLSchema.deserialize(database)
mydb.check_delete()
dd = sql_query.DropDatabase(mydb.name)
@ -212,7 +216,8 @@ class BaseMySqlAdmin(object):
self.delete_user_by_name(mysql_user.name, mysql_user.host)
def delete_user_by_name(self, name, host='%'):
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
with mysql_util.SqlClient(
self.mysql_app.get_engine(), use_flush=True) as client:
du = sql_query.DropUser(name, host=host)
t = text(str(du))
LOG.debug("delete_user_by_name: %s", t)
@ -238,7 +243,8 @@ class BaseMySqlAdmin(object):
": %(reason)s") %
{'user': username, 'reason': err_msg}
)
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
with mysql_util.SqlClient(
self.mysql_app.get_engine(), use_flush=True) as client:
q = sql_query.Query()
q.columns = ['User', 'Host']
q.tables = ['mysql.user']
@ -260,7 +266,8 @@ class BaseMySqlAdmin(object):
"""Grant a user permission to use a given database."""
user = self._get_user(username, hostname)
mydb = None # cache the model as we just want name validation
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
with mysql_util.SqlClient(
self.mysql_app.get_engine(), use_flush=True) as client:
for database in databases:
try:
if mydb:
@ -363,7 +370,8 @@ class BaseMySqlAdmin(object):
LOG.debug("The following user names are on ignore list and will "
"be omitted from the listing: %s", ignored_user_names)
users = []
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
with mysql_util.SqlClient(
self.mysql_app.get_engine(), use_flush=True) as client:
iq = sql_query.Query() # Inner query.
iq.columns = ['User', 'Host', "CONCAT(User, '@', Host) as Marker"]
iq.tables = ['mysql.user']
@ -405,7 +413,8 @@ class BaseMySqlAdmin(object):
def revoke_access(self, username, hostname, database):
"""Revoke a user's permission to use a given database."""
user = self._get_user(username, hostname)
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
with mysql_util.SqlClient(
self.mysql_app.get_engine(), use_flush=True) as client:
r = sql_query.Revoke(database=database,
user=user.name,
host=user.host)
@ -460,9 +469,10 @@ class BaseMySqlApp(service.BaseDbApp):
return ENGINE
def execute_sql(self, sql_statement):
def execute_sql(self, sql_statement, use_flush=False):
LOG.debug("Executing SQL: %s", sql_statement)
with mysql_util.SqlClient(self.get_engine()) as client:
with mysql_util.SqlClient(
self.get_engine(), use_flush=use_flush) as client:
return client.execute(sql_statement)
def get_data_dir(self):
@ -508,21 +518,21 @@ class BaseMySqlApp(service.BaseDbApp):
engine = sqlalchemy.create_engine(
CONNECTION_STR_FORMAT % ('root', root_pass), echo=True)
with mysql_util.SqlClient(engine, use_flush=False) as client:
with mysql_util.SqlClient(engine) as client:
self._create_admin_user(client, admin_password)
engine = sqlalchemy.create_engine(
CONNECTION_STR_FORMAT % (ADMIN_USER_NAME,
urllib.parse.quote(admin_password)),
echo=True)
with mysql_util.SqlClient(engine) as client:
with mysql_util.SqlClient(engine, use_flush=True) as client:
self._remove_anonymous_user(client)
self.save_password(ADMIN_USER_NAME, admin_password)
LOG.info("MySQL secure complete.")
def secure_root(self):
with mysql_util.SqlClient(self.get_engine()) as client:
with mysql_util.SqlClient(self.get_engine(), use_flush=True) as client:
self._remove_remote_root_access(client)
def _remove_anonymous_user(self, client):
@ -730,7 +740,7 @@ class BaseMySqlApp(service.BaseDbApp):
LOG.info("Granting replication slave privilege for %s",
replication_user['name'])
with mysql_util.SqlClient(self.get_engine()) as client:
with mysql_util.SqlClient(self.get_engine(), use_flush=True) as client:
g = sql_query.Grant(permissions=['REPLICATION SLAVE'],
user=replication_user['name'],
clear=replication_user['password'])
@ -835,7 +845,8 @@ class BaseMySqlRootAccess(object):
reset the root password.
"""
user = models.MySQLUser.root(password=root_password)
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
with mysql_util.SqlClient(
self.mysql_app.get_engine(), use_flush=True) as client:
try:
cu = sql_query.CreateUser(user.name, host=user.host)
t = text(str(cu))
@ -844,7 +855,9 @@ class BaseMySqlRootAccess(object):
# Ignore, user is already created, just reset the password
# TODO(rnirmal): More fine grained error checking later on
LOG.debug(err)
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
with mysql_util.SqlClient(self.mysql_app.get_engine(),
use_flush=True) as client:
uu = sql_query.SetPassword(user.name, host=user.host,
new_password=user.password)
t = text(str(uu))

View File

@ -31,6 +31,10 @@ class MysqlGTIDReplication(mysql_base.MysqlReplicationBase):
last_gtid = self.read_last_master_gtid(service)
LOG.info("last_gtid value is %s", last_gtid)
if '-' in last_gtid:
# See
# https://avdeo.com/tag/error-1840-hy000-global-gtid_purged-can-only-be-set-when/
# Also, FLUSH PRIVILEGES will restore gtid_executed.
service.execute_sql('RESET MASTER')
set_gtid_cmd = "SET GLOBAL gtid_purged='%s'" % last_gtid
service.execute_sql(set_gtid_cmd)

View File

@ -26,14 +26,14 @@ FLUSH = text(sql_query.FLUSH)
class SqlClient(object):
"""A sqlalchemy wrapper to manage transactions."""
def __init__(self, engine, use_flush=True):
def __init__(self, engine, use_flush=False):
self.engine = engine
self.use_flush = use_flush
def __enter__(self):
self.conn = self.engine.connect()
self.trans = self.conn.begin()
return self.conn
return self
def __exit__(self, type, value, traceback):
if self.trans:
@ -49,10 +49,11 @@ class SqlClient(object):
LOG.debug('Execute SQL: %s', t)
try:
return self.conn.execute(t, kwargs)
except Exception:
except Exception as err:
LOG.error(f'Failed to execute SQL {t}, error: {err}')
self.trans.rollback()
self.trans = None
raise
raise err
class BaseKeepAliveConnection(interfaces.PoolListener):