MySQL asynchronous replication
This patchset implements new relations: "master" and "slave" based on a common "mysql-async-replication" interface which are used for the purpose on enabling MySQL asynchronous replication between multiple Percona XtraDB Clusters. Change-Id: I94710bff17091516875c81ca769d8078ef5efd10 Closes-Bug: 1776171
This commit is contained in:
parent
117f2bd290
commit
e116b1ef86
22
README.md
22
README.md
|
@ -124,6 +124,28 @@ If both 'vip' and 'dns-ha' are set, as they are mutually exclusive
|
|||
If 'dns-ha' is set and 'os-access-hostname' is not set
|
||||
If the 'access' binding is not set and 'dns-ha' is set, consumers of the db may not be allowed to connect
|
||||
|
||||
## MySQL asynchronous replication
|
||||
|
||||
This charm supports MySQL asynchronous replication feature which can be used
|
||||
to replicate databases between multiple Percona XtraDB Clusters. In order to
|
||||
setup master-slave replication of "example1" and "example2" databases between
|
||||
"pxc1" and "pxc2" applications, first configure mandatory options:
|
||||
|
||||
juju config pxc1 databases-to-replicate="database1:table1,table2;database2"
|
||||
juju config pxc2 databases-to-replicate="database1:table1,table2;database2"
|
||||
juju config pxc1 cluster-id=1
|
||||
juju config pxc2 cluster-id=2
|
||||
|
||||
and then relate them:
|
||||
|
||||
juju relate pxc1:master pxc2:slave
|
||||
|
||||
In order to setup master-master replication, add another relation:
|
||||
|
||||
juju relate pxc2:master pxc1:slave
|
||||
|
||||
In the same way circular replication can be setup between multiple clusters.
|
||||
|
||||
## Network Space support
|
||||
|
||||
This charm supports the use of Juju Network Spaces, allowing the charm to be bound
|
||||
|
|
20
config.yaml
20
config.yaml
|
@ -321,3 +321,23 @@ options:
|
|||
WARNING Please read all documentation before changing the default value which may have
|
||||
unintended consequences. It may be necessary to set this value higher during deploy time
|
||||
(PT15S) and subsequently change it back to the default (PT3S) after deployment.
|
||||
databases-to-replicate:
|
||||
type: string
|
||||
default:
|
||||
description: |
|
||||
Databases and tables to replicate using MySQL asynchronous replication.
|
||||
The databases should be separated with a semicolon while the tables
|
||||
should be separated with a comma. No tables mean that the whole database
|
||||
will be replicated. For example "database1:table1,table2;database2"
|
||||
will replicate "table1" and "table2" tables from "database1" databasae
|
||||
and all tables from "database2" database.
|
||||
.
|
||||
NOTE: This option should be used only when relating one cluster to the
|
||||
other. It does not affect Galera synchronous replication.
|
||||
cluster-id:
|
||||
type: int
|
||||
default:
|
||||
description: |
|
||||
Cluster ID to be used when using MySQL asynchronous replication.
|
||||
.
|
||||
NOTE: This value must be different for each cluster.
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
percona_hooks.py
|
|
@ -0,0 +1 @@
|
|||
percona_hooks.py
|
|
@ -0,0 +1 @@
|
|||
percona_hooks.py
|
|
@ -37,6 +37,7 @@ from charmhelpers.core.host import (
|
|||
lsb_release,
|
||||
mkdir,
|
||||
CompareHostReleases,
|
||||
pwgen,
|
||||
)
|
||||
from charmhelpers.core.templating import render
|
||||
from charmhelpers.fetch import (
|
||||
|
@ -122,6 +123,15 @@ from percona_utils import (
|
|||
pause_unit_helper,
|
||||
resume_unit_helper,
|
||||
check_for_socket,
|
||||
get_cluster_id,
|
||||
get_databases_to_replicate,
|
||||
configure_master,
|
||||
configure_slave,
|
||||
deconfigure_slave,
|
||||
get_master_status,
|
||||
get_slave_status,
|
||||
delete_replication_user,
|
||||
list_replication_users,
|
||||
)
|
||||
|
||||
from charmhelpers.core.unitdata import kv
|
||||
|
@ -233,6 +243,11 @@ def render_config(hosts=None):
|
|||
context['innodb_autoinc_lock_mode'] = '2'
|
||||
context['pxc_strict_mode'] = config('pxc-strict-mode')
|
||||
|
||||
if config('databases-to-replicate'):
|
||||
context['databases_to_replicate'] = get_databases_to_replicate()
|
||||
|
||||
context['server-id'] = get_server_id()
|
||||
|
||||
context.update(PerconaClusterHelper().parse_config())
|
||||
render(os.path.basename(config_file), config_file, context, perms=0o444)
|
||||
|
||||
|
@ -560,6 +575,10 @@ def config_changed():
|
|||
update_root_password()
|
||||
set_ready_on_peers()
|
||||
|
||||
# NOTE(tkurek): re-set 'master' relation data
|
||||
if relation_ids('master'):
|
||||
master_joined()
|
||||
|
||||
|
||||
@hooks.hook('cluster-relation-joined')
|
||||
def cluster_joined():
|
||||
|
@ -937,6 +956,12 @@ def ha_relation_changed():
|
|||
def leader_settings_changed():
|
||||
'''Re-trigger install once leader has seeded passwords into install'''
|
||||
config_changed()
|
||||
# NOTE(tkurek): re-set 'master' relation data
|
||||
if relation_ids('master'):
|
||||
master_joined()
|
||||
# NOTE(tkurek): deconfigure old leader
|
||||
if relation_ids('slave'):
|
||||
deconfigure_slave()
|
||||
|
||||
|
||||
@hooks.hook('leader-elected')
|
||||
|
@ -947,6 +972,12 @@ def leader_elected():
|
|||
else:
|
||||
log('leader-elected hook executed, but this unit is not the leader',
|
||||
level=INFO)
|
||||
# NOTE(tkurek): re-set 'master' relation data
|
||||
if relation_ids('master'):
|
||||
master_joined()
|
||||
# NOTE(tkurek): configure new leader
|
||||
if relation_ids('slave'):
|
||||
configure_slave()
|
||||
|
||||
|
||||
@hooks.hook('nrpe-external-master-relation-joined',
|
||||
|
@ -966,6 +997,106 @@ def update_nrpe_config():
|
|||
nrpe_setup.write()
|
||||
|
||||
|
||||
@hooks.hook('master-relation-joined')
|
||||
def master_joined(interface='master'):
|
||||
cluster_id = get_cluster_id()
|
||||
if not is_clustered():
|
||||
log("Not clustered yet", level=DEBUG)
|
||||
return
|
||||
relation_settings = {}
|
||||
leader_settings = leader_get()
|
||||
if is_leader():
|
||||
if not leader_settings.get('async-rep-password'):
|
||||
# Replication password cannot be longer than 32 characters
|
||||
leader_set({'async-rep-password': pwgen(32)})
|
||||
return
|
||||
configure_master()
|
||||
master_address, master_file, master_position = (
|
||||
get_master_status(interface))
|
||||
if leader_settings.get('master-address') is not master_address:
|
||||
leader_settings['master-address'] = master_address
|
||||
leader_settings['master-file'] = master_file
|
||||
leader_settings['master-position'] = master_position
|
||||
leader_set(leader_settings)
|
||||
relation_settings = {'leader': True}
|
||||
else:
|
||||
relation_settings = {'leader': False}
|
||||
relation_settings['cluster_id'] = cluster_id
|
||||
relation_settings['master_address'] = leader_settings['master-address']
|
||||
relation_settings['master_file'] = leader_settings['master-file']
|
||||
relation_settings['master_password'] = \
|
||||
leader_settings['async-rep-password']
|
||||
relation_settings['master_position'] = leader_settings['master-position']
|
||||
log("Setting master relation: '{}'".format(relation_settings), level=INFO)
|
||||
for rid in relation_ids(interface):
|
||||
relation_set(relation_id=rid, relation_settings=relation_settings)
|
||||
|
||||
|
||||
@hooks.hook('master-relation-changed')
|
||||
def master_changed(interface='master'):
|
||||
if is_leader():
|
||||
configure_master()
|
||||
|
||||
|
||||
@hooks.hook('master-relation-departed')
|
||||
def master_departed(interface='master'):
|
||||
if is_leader():
|
||||
reset_password = True
|
||||
new_slave_addresses = []
|
||||
old_slave_addresses = list_replication_users()
|
||||
for rid in relation_ids(interface):
|
||||
if related_units(rid):
|
||||
reset_password = False
|
||||
for unit in related_units(rid):
|
||||
if not relation_get(attribute='slave_address',
|
||||
rid=rid, unit=unit):
|
||||
log("No relation data for {}".format(unit), level=DEBUG)
|
||||
return
|
||||
new_slave_addresses.append(
|
||||
relation_get(attribute='slave_address',
|
||||
rid=rid,
|
||||
unit=unit))
|
||||
for old_slave_address in old_slave_addresses:
|
||||
if old_slave_address not in new_slave_addresses:
|
||||
delete_replication_user(old_slave_address)
|
||||
if reset_password:
|
||||
leader_set({'async-rep-password': ''})
|
||||
|
||||
|
||||
@hooks.hook('slave-relation-joined')
|
||||
def slave_joined(interface='slave'):
|
||||
relation_settings = {}
|
||||
cluster_id = get_cluster_id()
|
||||
if not is_clustered():
|
||||
log("Not clustered yet", level=DEBUG)
|
||||
return
|
||||
if is_leader():
|
||||
configure_slave()
|
||||
relation_settings = {'slave_address':
|
||||
network_get_primary_address(interface)}
|
||||
relation_settings['cluster_id'] = cluster_id
|
||||
log("Setting slave relation: '{}'".format(relation_settings), level=INFO)
|
||||
for rid in relation_ids(interface):
|
||||
relation_set(relation_id=rid, relation_settings=relation_settings)
|
||||
|
||||
|
||||
@hooks.hook('slave-relation-changed')
|
||||
def slave_changed(interface='slave'):
|
||||
for rid in relation_ids(interface):
|
||||
for unit in related_units(rid):
|
||||
rdata = relation_get(unit=unit, rid=rid)
|
||||
if rdata.get('leader'):
|
||||
if rdata.get('master_address') is not get_slave_status():
|
||||
slave_departed()
|
||||
slave_joined()
|
||||
|
||||
|
||||
@hooks.hook('slave-relation-departed')
|
||||
def slave_departed():
|
||||
if is_leader():
|
||||
deconfigure_slave()
|
||||
|
||||
|
||||
@hooks.hook('update-status')
|
||||
@harden()
|
||||
def update_status():
|
||||
|
|
|
@ -3,8 +3,11 @@ import subprocess
|
|||
from subprocess import Popen, PIPE
|
||||
import socket
|
||||
import tempfile
|
||||
import copy
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import six
|
||||
import uuid
|
||||
from functools import partial
|
||||
import time
|
||||
|
@ -1078,12 +1081,48 @@ def get_wsrep_provider_options():
|
|||
return ';'.join(wsrep_provider_options)
|
||||
|
||||
|
||||
class ClusterIDRequired(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ClusterIDIdentical(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def get_cluster_id():
|
||||
""" Return cluster id (lp1776171)
|
||||
|
||||
Return cluster ID for MySQL asynchronous replication
|
||||
:returns: int cluster_id
|
||||
"""
|
||||
if not config('cluster-id'):
|
||||
msg = ("Master / Slave relation requires 'cluster-id' option")
|
||||
status_set("blocked", msg)
|
||||
raise ClusterIDRequired(msg)
|
||||
cluster_id = config('cluster-id')
|
||||
for rid in relation_ids('master'):
|
||||
for unit in related_units(rid):
|
||||
if relation_get(attribute='cluster_id',
|
||||
rid=rid,
|
||||
unit=unit) == cluster_id:
|
||||
msg = ("'cluster-id' option must be unique within a cluster")
|
||||
status_set('blocked', msg)
|
||||
raise ClusterIDIdentical(msg)
|
||||
for rid in relation_ids('slave'):
|
||||
for unit in related_units(rid):
|
||||
if relation_get(attribute='cluster_id',
|
||||
rid=rid,
|
||||
unit=unit) == cluster_id:
|
||||
msg = ("'cluster-id' option must be unique within a cluster")
|
||||
status_set('blocked', msg)
|
||||
raise ClusterIDIdentical(msg)
|
||||
return cluster_id
|
||||
|
||||
|
||||
def get_server_id():
|
||||
""" Return unique server id for bin log replication
|
||||
|
||||
Server ID must be a unique, non-zero, positive number from 1 to 2**32 - 1
|
||||
https://dev.mysql.com/doc/refman/8.0/en/replication-options.html
|
||||
|
||||
:returns: int server_id
|
||||
"""
|
||||
MAX_SERVER_ID = 2**32 - 1
|
||||
|
@ -1128,3 +1167,283 @@ def check_for_socket(file_name, exists=True, sleep=10, attempts=12):
|
|||
# If we get here throw an exception
|
||||
raise Exception("Socket {} not found after {} attempts."
|
||||
.format(file_name, attempts))
|
||||
|
||||
|
||||
class InvalidDatabasesToReplicate(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class InvalidCharacters(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def get_databases_to_replicate():
|
||||
""" Get databases_to_replicate (lp1776171)
|
||||
|
||||
Returns databases and tables to replicate using MySQL asynchronous
|
||||
replication
|
||||
|
||||
:returns: list of dicts of databases and tables to replicate
|
||||
:rtype: [{'database': str, 'tables': [str, ...]}, ...]
|
||||
:raises: OperationalError
|
||||
"""
|
||||
if not config('cluster-id'):
|
||||
msg = ("'cluster-id' option must be set when using "
|
||||
"'databases-to-replicate' option")
|
||||
status_set('blocked', msg)
|
||||
raise ClusterIDRequired(msg)
|
||||
|
||||
databases_to_replicate = []
|
||||
entries = config('databases-to-replicate').strip().split(';')
|
||||
try:
|
||||
for entry in entries:
|
||||
databases_and_tables = {}
|
||||
entry_split = entry.split(':')
|
||||
databases_and_tables['database'] = (
|
||||
check_invalid_chars(entry_split[0]))
|
||||
try:
|
||||
# Tables present
|
||||
databases_and_tables['tables'] = (
|
||||
check_invalid_chars(entry_split[1].split(',')))
|
||||
except IndexError:
|
||||
# If there are no tables
|
||||
databases_and_tables['tables'] = []
|
||||
databases_to_replicate.append(databases_and_tables)
|
||||
except InvalidCharacters as e:
|
||||
raise InvalidDatabasesToReplicate(
|
||||
"The configuration setting databases-to-replicate is malformed. {}"
|
||||
.format(e.message))
|
||||
return databases_to_replicate
|
||||
|
||||
|
||||
def check_invalid_chars(data, bad_chars_re="[\^\\/?%*:|\"'<>., ]"):
|
||||
""" Check for invalid characters
|
||||
|
||||
Run a pattern check on the data and raise an InvalidCharacters exception
|
||||
if there is a match. Return the original data untouched if no match is
|
||||
found.
|
||||
|
||||
Input can be a list or a string.
|
||||
|
||||
:param data: List or string under test
|
||||
:type data: str or list
|
||||
:param bad_chars_re: String regex to check against
|
||||
:type bad_chars_re: str
|
||||
:raises: InvalidCharacters
|
||||
:returns: The original data untouched
|
||||
:rtype: str or list
|
||||
"""
|
||||
if isinstance(data, six.string_types):
|
||||
data_strings = [data]
|
||||
else:
|
||||
data_strings = copy.copy(data)
|
||||
|
||||
for data_string in data_strings:
|
||||
m = re.search(bad_chars_re, data_string)
|
||||
if m:
|
||||
raise(InvalidCharacters(
|
||||
"Invalid character '{}' in '{}'"
|
||||
.format(m.group(0), data_string)))
|
||||
return data
|
||||
|
||||
|
||||
def configure_master():
|
||||
""" Configure master (lp1776171)
|
||||
|
||||
Calls 'create_replication_user' function for IP addresses of all related
|
||||
units.
|
||||
|
||||
"""
|
||||
new_slave_addresses = []
|
||||
old_slave_addresses = list_replication_users()
|
||||
for rid in relation_ids('master'):
|
||||
for unit in related_units(rid):
|
||||
if not relation_get(attribute='slave_address', rid=rid, unit=unit):
|
||||
log("No IP address for {} yet".format(unit), level=DEBUG)
|
||||
return
|
||||
new_slave_addresses.append(
|
||||
relation_get(attribute='slave_address', rid=rid, unit=unit))
|
||||
# If not yet created
|
||||
for new_slave_address in new_slave_addresses:
|
||||
if new_slave_address not in old_slave_addresses:
|
||||
create_replication_user(new_slave_address,
|
||||
leader_get('async-rep-password'))
|
||||
|
||||
|
||||
def configure_slave():
|
||||
""" Configure slave (lp1776171)
|
||||
|
||||
Configures MySQL asynchronous replication slave.
|
||||
|
||||
:raises: OperationalError
|
||||
"""
|
||||
rel_data = {}
|
||||
for rid in relation_ids('slave'):
|
||||
for unit in related_units(rid):
|
||||
rdata = relation_get(unit=unit, rid=rid)
|
||||
is_leader = rdata.get('leader', None)
|
||||
if is_leader is None:
|
||||
log("No relation data for {} yet".format(unit), level=DEBUG)
|
||||
continue
|
||||
try:
|
||||
if (is_leader and not(all(
|
||||
rdata.get("master_{}".format(k)) for k in ["address",
|
||||
"file",
|
||||
"password",
|
||||
"position"]))):
|
||||
log("No full relation data for {} yet".format(unit),
|
||||
level=DEBUG)
|
||||
continue
|
||||
m_helper = get_db_helper()
|
||||
try:
|
||||
m_helper.connect(user='replication',
|
||||
password=rdata.get('master_password'),
|
||||
host=rdata.get('master_address'))
|
||||
rel_data['master_address'] = rdata.get('master_address')
|
||||
rel_data['master_file'] = rdata.get('master_file')
|
||||
rel_data['master_password'] = rdata.get('master_password')
|
||||
rel_data['master_position'] = rdata.get('master_position')
|
||||
except OperationalError:
|
||||
log("Could not connect to {}".format(unit), level=DEBUG)
|
||||
except KeyError:
|
||||
log("No relation data for {} yet".format(unit), level=DEBUG)
|
||||
raise
|
||||
if not rel_data:
|
||||
log("Unable to find the master", level=DEBUG)
|
||||
return
|
||||
m_helper = get_db_helper()
|
||||
try:
|
||||
m_helper.connect(password=m_helper.get_mysql_root_password())
|
||||
except OperationalError:
|
||||
log("Could not connect to db", level=DEBUG)
|
||||
return
|
||||
m_helper.execute("STOP SLAVE;")
|
||||
m_helper.execute(("CHANGE MASTER TO master_host='{master_address}', "
|
||||
"master_port=3306, master_user='replication', "
|
||||
"master_password='{master_password}', "
|
||||
"master_log_file='{master_file}', "
|
||||
"master_log_pos={master_position};").format(**rel_data))
|
||||
m_helper.execute("START SLAVE;")
|
||||
|
||||
|
||||
def deconfigure_slave():
|
||||
""" Deconfigure slave (lp1776171)
|
||||
|
||||
Deconfigures MySQL asynchronous replication slave on relation departure.
|
||||
|
||||
:raises: OperationalError
|
||||
"""
|
||||
m_helper = get_db_helper()
|
||||
try:
|
||||
m_helper.connect(password=m_helper.get_mysql_root_password())
|
||||
except OperationalError:
|
||||
log("Could not connect to db", level=DEBUG)
|
||||
return
|
||||
m_helper.execute("STOP SLAVE;")
|
||||
m_helper.execute("RESET SLAVE ALL;")
|
||||
|
||||
|
||||
def get_master_status(interface):
|
||||
""" Get master status (lp1776171)
|
||||
|
||||
Returns MySQL asynchronous replication master status.
|
||||
|
||||
:param interface: relation name
|
||||
:type interface: str
|
||||
:returns: tuple of (IP address in space associated with 'master' binding,
|
||||
replication file,
|
||||
replication file position)
|
||||
:rtype: (str, str, str)
|
||||
:raises: OperationalError
|
||||
"""
|
||||
m_helper = get_db_helper()
|
||||
try:
|
||||
m_helper.connect(password=m_helper.get_mysql_root_password())
|
||||
except OperationalError:
|
||||
log("Could not connect to db", level=DEBUG)
|
||||
raise
|
||||
results = m_helper.select("SHOW MASTER STATUS;")
|
||||
return network_get_primary_address(interface), results[0][0], results[0][1]
|
||||
|
||||
|
||||
def get_slave_status():
|
||||
""" Get slave status (lp1776171)
|
||||
|
||||
Returns MySQL asynchronous replication slave status.
|
||||
|
||||
returns: currently configured master IP address
|
||||
rtype: str
|
||||
:raises: OperationalError
|
||||
"""
|
||||
m_helper = get_db_helper()
|
||||
try:
|
||||
m_helper.connect(password=m_helper.get_mysql_root_password())
|
||||
except OperationalError:
|
||||
log("Could not connect to db", level=DEBUG)
|
||||
raise
|
||||
results = m_helper.select("SHOW SLAVE STATUS;")
|
||||
return results[0][1]
|
||||
|
||||
|
||||
def create_replication_user(slave_address, master_password):
|
||||
""" Create replication user (lp1776171)
|
||||
|
||||
Grants access for MySQL asynchronous replication slave.
|
||||
|
||||
:param slave_address: slave IP address
|
||||
:type slave_address: str
|
||||
:param master_password: replication password
|
||||
:type master_password: str
|
||||
:raises: OperationalError
|
||||
"""
|
||||
m_helper = get_db_helper()
|
||||
try:
|
||||
m_helper.connect(password=m_helper.get_mysql_root_password())
|
||||
except OperationalError:
|
||||
log("Could not connect to db", level=DEBUG)
|
||||
return
|
||||
m_helper.execute(("GRANT REPLICATION SLAVE ON *.* TO ""'replication'@'{}' "
|
||||
"IDENTIFIED BY '{}';").format(slave_address,
|
||||
master_password))
|
||||
|
||||
|
||||
def delete_replication_user(slave_address):
|
||||
""" Delete replication user (lp1776171)
|
||||
|
||||
Revokes access for MySQL asynchronous replication slave.
|
||||
|
||||
:param slave_address: slave IP address
|
||||
:type slave_address: str
|
||||
:raises: OperationalError
|
||||
"""
|
||||
m_helper = get_db_helper()
|
||||
try:
|
||||
m_helper.connect(password=m_helper.get_mysql_root_password())
|
||||
except OperationalError:
|
||||
log("Could not connect to db", DEBUG)
|
||||
return
|
||||
m_helper.execute(("DELETE FROM mysql.user WHERE Host='{}' AND "
|
||||
"User='replication';").format(slave_address))
|
||||
|
||||
|
||||
def list_replication_users():
|
||||
""" List replication users (lp1776171)
|
||||
|
||||
Lists IP addresses of slaves which have been granted with an access for
|
||||
MySQL asynchronous replication.
|
||||
|
||||
:returns: IP addresses of slaves
|
||||
:rtype replication_users: [str]
|
||||
:raises: OperationalError
|
||||
"""
|
||||
replication_users = []
|
||||
m_helper = get_db_helper()
|
||||
try:
|
||||
m_helper.connect(password=m_helper.get_mysql_root_password())
|
||||
except OperationalError:
|
||||
log("Could not connect to db", DEBUG)
|
||||
raise
|
||||
for result in m_helper.select("SELECT Host FROM mysql.user WHERE "
|
||||
"User='replication';"):
|
||||
replication_users.append(result[0])
|
||||
return replication_users
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
percona_hooks.py
|
|
@ -0,0 +1 @@
|
|||
percona_hooks.py
|
|
@ -0,0 +1 @@
|
|||
percona_hooks.py
|
|
@ -24,6 +24,8 @@ provides:
|
|||
nrpe-external-master:
|
||||
interface: nrpe-external-master
|
||||
scope: container
|
||||
master:
|
||||
interface: mysql-async-replication
|
||||
peers:
|
||||
cluster:
|
||||
interface: percona-cluster
|
||||
|
@ -31,3 +33,5 @@ requires:
|
|||
ha:
|
||||
interface: hacluster
|
||||
scope: container
|
||||
slave:
|
||||
interface: mysql-async-replication
|
||||
|
|
|
@ -28,13 +28,28 @@ wsrep_cluster_address=gcomm://{{ cluster_hosts }}
|
|||
# In order for Galera to work correctly binlog format should be ROW
|
||||
binlog_format=ROW
|
||||
|
||||
{% if enable_binlogs -%}
|
||||
{% if enable_binlogs or databases_to_replicate -%}
|
||||
server_id = {{ server_id }}
|
||||
log_bin={{ binlogs_path }}
|
||||
expire_logs_days={{ binlogs_expire_days }}
|
||||
max_binlog_size={{ binlogs_max_size }}
|
||||
{% endif %}
|
||||
|
||||
{% if databases_to_replicate -%}
|
||||
# MySQL synchronous replication
|
||||
log_slave_updates = 1
|
||||
{% for entry in databases_to_replicate -%}
|
||||
replicate_do_db = {{ entry.database }}
|
||||
{% if entry.tables -%}
|
||||
{% for table in entry.tables -%}
|
||||
replicate_do_table = {{ entry.database }}.{{ table }}
|
||||
{% endfor %}
|
||||
{% else -%}
|
||||
replicate_wild_do_table = {{ entry.database }}.%
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
|
||||
# MyISAM storage engine has only experimental support
|
||||
default_storage_engine=InnoDB
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@ log_error = /var/log/mysql/error.log
|
|||
# The following can be used as easy to replay backup logs or for replication.
|
||||
# note: if you are setting up a replication slave, see README.Debian about
|
||||
# other settings you may need to change.
|
||||
{% if enable_binlogs -%}
|
||||
{% if enable_binlogs or databases_to_replicate -%}
|
||||
server_id = {{ server_id }}
|
||||
log_bin={{ binlogs_path }}
|
||||
{% endif %}
|
||||
|
@ -80,6 +80,21 @@ log_bin={{ binlogs_path }}
|
|||
expire_logs_days = {{ binlogs_expire_days }}
|
||||
max_binlog_size = {{ binlogs_max_size }}
|
||||
|
||||
{% if databases_to_replicate -%}
|
||||
# MySQL asynchronous replication
|
||||
log_slave_updates = 1
|
||||
{% for entry in databases_to_replicate -%}
|
||||
replicate_do_db = {{ entry.database }}
|
||||
{% if entry.tables -%}
|
||||
{% for table in entry.tables -%}
|
||||
replicate_do_table = {{ entry.database }}.{{ table }}
|
||||
{% endfor %}
|
||||
{% else -%}
|
||||
replicate_wild_do_table = {{ entry.database }}.%
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
|
||||
# Required to allow trigger creation for openstack services
|
||||
log_bin_trust_function_creators = 1
|
||||
|
||||
|
|
|
@ -283,6 +283,102 @@ class TestNRPERelation(CharmTestCase):
|
|||
mock.ANY, ["mysql"], mock.ANY)
|
||||
|
||||
|
||||
class TestMasterRelation(CharmTestCase):
|
||||
def setUp(self):
|
||||
patch_targets_master = TO_PATCH[:]
|
||||
patch_targets_master.extend(['configure_master',
|
||||
'get_cluster_id',
|
||||
'get_master_status',
|
||||
'leader_set'])
|
||||
CharmTestCase.setUp(self, hooks, patch_targets_master)
|
||||
|
||||
def test_master_joined_is_leader_and_no_leader_change(self):
|
||||
self.relation_ids.return_value = ['master:1']
|
||||
self.get_cluster_id.return_value = 1
|
||||
self.is_clustered.return_value = True
|
||||
self.leader_get.return_value = {'async-rep-password': 'password',
|
||||
'master-address': '10.0.0.1',
|
||||
'master-file': 'file1',
|
||||
'master-position': 'position1'}
|
||||
self.is_leader.return_value = True
|
||||
self.configure_master.return_value = True
|
||||
self.get_master_status.return_value = '10.0.0.1', 'file1', 'position1'
|
||||
|
||||
hooks.master_joined()
|
||||
self.leader_set.assert_called_with(
|
||||
{'async-rep-password': 'password',
|
||||
'master-address': '10.0.0.1',
|
||||
'master-file': 'file1',
|
||||
'master-position': 'position1'})
|
||||
self.relation_set.assert_called_with(
|
||||
relation_id='master:1', relation_settings={
|
||||
'leader': True, 'cluster_id': 1, 'master_address': '10.0.0.1',
|
||||
'master_file': 'file1', 'master_password': 'password',
|
||||
'master_position': 'position1'})
|
||||
|
||||
def test_master_joined_is_leader_and_leader_change(self):
|
||||
self.relation_ids.return_value = ['master:1']
|
||||
self.get_cluster_id.return_value = 1
|
||||
self.is_clustered.return_value = True
|
||||
self.leader_get.return_value = {'async-rep-password': 'password',
|
||||
'master-address': '10.0.0.1',
|
||||
'master-file': 'file1',
|
||||
'master-position': 'position1'}
|
||||
self.is_leader.return_value = True
|
||||
self.configure_master.return_value = True
|
||||
self.get_master_status.return_value = '10.0.0.2', 'file2', 'position2'
|
||||
|
||||
hooks.master_joined()
|
||||
self.leader_set.assert_called_with(
|
||||
{'async-rep-password': 'password',
|
||||
'master-address': '10.0.0.2',
|
||||
'master-file': 'file2',
|
||||
'master-position': 'position2'})
|
||||
self.relation_set.assert_called_with(
|
||||
relation_id='master:1', relation_settings={
|
||||
'leader': True, 'cluster_id': 1, 'master_address': '10.0.0.2',
|
||||
'master_file': 'file2', 'master_password': 'password',
|
||||
'master_position': 'position2'})
|
||||
|
||||
def test_master_joined_is_not_leader(self):
|
||||
self.relation_ids.return_value = ['master:1']
|
||||
self.get_cluster_id.return_value = 1
|
||||
self.is_clustered.return_value = True
|
||||
self.leader_get.return_value = {'async-rep-password': 'password',
|
||||
'master-address': '10.0.0.1',
|
||||
'master-file': 'file',
|
||||
'master-position': 'position'}
|
||||
self.is_leader.return_value = False
|
||||
|
||||
hooks.master_joined()
|
||||
self.relation_set.assert_called_with(
|
||||
relation_id='master:1', relation_settings={
|
||||
'leader': False, 'cluster_id': 1, 'master_address': '10.0.0.1',
|
||||
'master_file': 'file', 'master_password': 'password',
|
||||
'master_position': 'position'})
|
||||
|
||||
|
||||
class TestSlaveRelation(CharmTestCase):
|
||||
def setUp(self):
|
||||
patch_targets_slave = TO_PATCH[:]
|
||||
patch_targets_slave.extend(["configure_slave",
|
||||
"get_cluster_id"])
|
||||
CharmTestCase.setUp(self, hooks, patch_targets_slave)
|
||||
|
||||
def test_slave_joined(self):
|
||||
self.relation_ids.return_value = ['slave:1']
|
||||
self.is_clustered.return_value = True
|
||||
self.get_cluster_id.return_value = 1
|
||||
self.is_leader.return_value = True
|
||||
self.configure_slave.return_value = True
|
||||
self.network_get_primary_address.return_value = '172.16.0.1'
|
||||
|
||||
hooks.slave_joined()
|
||||
self.relation_set.assert_called_with(
|
||||
relation_id='slave:1', relation_settings={
|
||||
'slave_address': '172.16.0.1', 'cluster_id': 1})
|
||||
|
||||
|
||||
class TestConfigChanged(CharmTestCase):
|
||||
|
||||
TO_PATCH = [
|
||||
|
|
|
@ -990,3 +990,190 @@ class TestUpdateBootstrapUUID(CharmTestCase):
|
|||
'leader-ip': '10.10.10.10'}
|
||||
self.leader_get.return_value = leader_config
|
||||
self.assertTrue(percona_utils.is_leader_bootstrapped())
|
||||
|
||||
|
||||
class TestAsynchronousReplication(CharmTestCase):
|
||||
TO_PATCH = [
|
||||
'config',
|
||||
'leader_get',
|
||||
'network_get_primary_address',
|
||||
'related_units',
|
||||
'relation_get',
|
||||
'relation_ids',
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
super(TestAsynchronousReplication, self).setUp(percona_utils,
|
||||
self.TO_PATCH)
|
||||
|
||||
@mock.patch.object(percona_utils, 'config')
|
||||
def test_get_databases_to_replicate(self, mock_config):
|
||||
config = {'cluster-id': 1, 'databases-to-replicate': 'db1:tb1,tb2;db2'}
|
||||
mock_config.side_effect = lambda k: config.get(k)
|
||||
percona_utils.get_databases_to_replicate()
|
||||
self.assertEqual(percona_utils.get_databases_to_replicate(),
|
||||
([{'database': 'db1', 'tables': ['tb1', 'tb2']},
|
||||
{'database': 'db2', 'tables': []}]))
|
||||
|
||||
@mock.patch.object(percona_utils, 'create_replication_user')
|
||||
@mock.patch.object(percona_utils, 'list_replication_users')
|
||||
def test_configure_master_slave_address_not_in_relation_data(
|
||||
self, mock_list_replication_users, mock_create_replication_user):
|
||||
self.relation_ids.return_value = [1]
|
||||
self.related_units.return_value = [1, 2, 3]
|
||||
self.relation_get.return_value = None
|
||||
percona_utils.configure_master()
|
||||
mock_create_replication_user.assert_not_called()
|
||||
|
||||
@mock.patch.object(percona_utils, 'create_replication_user')
|
||||
@mock.patch.object(percona_utils, 'list_replication_users')
|
||||
def test_configure_master_slave_address_in_relation_data_and_created(
|
||||
self, mock_list_replication_users, mock_create_replication_user):
|
||||
self.relation_ids.return_value = [1]
|
||||
self.related_units.return_value = [1, 2, 3]
|
||||
|
||||
def _mock_rel_get(*args, **kwargs):
|
||||
unit_id = kwargs.get('unit')
|
||||
return '10.0.1.{}'.format(unit_id)
|
||||
|
||||
self.relation_get.side_effect = _mock_rel_get
|
||||
mock_list_replication_users.return_value = ['10.0.1.1',
|
||||
'10.0.1.2',
|
||||
'10.0.1.3']
|
||||
percona_utils.configure_master()
|
||||
mock_create_replication_user.assert_not_called()
|
||||
|
||||
@mock.patch.object(percona_utils, 'create_replication_user')
|
||||
@mock.patch.object(percona_utils, 'list_replication_users')
|
||||
def test_configure_master_slave_address_in_relation_data_and_not_created(
|
||||
self, mock_list_replication_users, mock_create_replication_user):
|
||||
self.relation_ids.return_value = [1]
|
||||
self.related_units.return_value = [1, 2, 3]
|
||||
|
||||
def _mock_rel_get(*args, **kwargs):
|
||||
unit_id = kwargs.get('unit')
|
||||
return '10.0.1.{}'.format(unit_id)
|
||||
|
||||
self.relation_get.side_effect = _mock_rel_get
|
||||
mock_list_replication_users.return_value = ['10.0.1.1', '10.0.1.2']
|
||||
self.leader_get.return_value = 'password'
|
||||
percona_utils.configure_master()
|
||||
mock_create_replication_user.assert_called_once_with('10.0.1.3',
|
||||
'password')
|
||||
|
||||
@mock.patch.object(percona_utils, 'get_db_helper')
|
||||
def test_configure_slave_no_leader(
|
||||
self, mock_get_db_helper):
|
||||
my_mock = mock.Mock()
|
||||
self.relation_ids.return_value = [1]
|
||||
self.related_units.return_value = [1, 2, 3]
|
||||
|
||||
def _mock_rel_get(*args, **kwargs):
|
||||
return {'private-address': '10.0.0.1'}
|
||||
|
||||
self.relation_get.side_effect = _mock_rel_get
|
||||
mock_get_db_helper.return_value = my_mock
|
||||
percona_utils.configure_slave()
|
||||
my_mock.execute.assert_not_called()
|
||||
|
||||
@mock.patch.object(percona_utils, 'get_db_helper')
|
||||
def test_configure_slave_leader_and_no_full_relation_data(
|
||||
self, mock_get_db_helper):
|
||||
my_mock = mock.Mock()
|
||||
self.relation_ids.return_value = [1]
|
||||
self.related_units.return_value = [1, 2, 3]
|
||||
|
||||
def _mock_rel_get(*args, **kwargs):
|
||||
return {'private-address': '10.0.0.1',
|
||||
'leader': True}
|
||||
|
||||
self.relation_get.side_effect = _mock_rel_get
|
||||
mock_get_db_helper.return_value = my_mock
|
||||
percona_utils.configure_slave()
|
||||
my_mock.execute.assert_not_called()
|
||||
|
||||
@mock.patch.object(percona_utils, 'get_db_helper')
|
||||
def test_configure_slave_leader_and_full_relation_data(
|
||||
self, mock_get_db_helper):
|
||||
my_mock = mock.Mock()
|
||||
self.relation_ids.return_value = [1]
|
||||
self.related_units.return_value = [1, 2, 3]
|
||||
|
||||
def _mock_rel_get(*args, **kwargs):
|
||||
return {'private-address': '10.0.0.1',
|
||||
'leader': True,
|
||||
'master_address': '10.0.0.1',
|
||||
'master_file': 'file',
|
||||
'master_password': 'password',
|
||||
'master_position': 'position'}
|
||||
|
||||
self.relation_get.side_effect = _mock_rel_get
|
||||
mock_get_db_helper.return_value = my_mock
|
||||
sql1 = "STOP SLAVE;"
|
||||
sql2 = ("CHANGE MASTER TO "
|
||||
"master_host='10.0.0.1', "
|
||||
"master_port=3306, "
|
||||
"master_user='replication', "
|
||||
"master_password='password', "
|
||||
"master_log_file='file', "
|
||||
"master_log_pos=position;")
|
||||
sql3 = "START SLAVE;"
|
||||
percona_utils.configure_slave()
|
||||
my_mock.execute.assert_any_call(sql1)
|
||||
my_mock.execute.assert_any_call(sql2)
|
||||
my_mock.execute.assert_any_call(sql3)
|
||||
|
||||
@mock.patch.object(percona_utils, 'get_db_helper')
|
||||
def test_deconfigure_slave(self, mock_get_db_helper):
|
||||
my_mock = mock.Mock()
|
||||
mock_get_db_helper.return_value = my_mock
|
||||
sql1 = "STOP SLAVE;"
|
||||
sql2 = "RESET SLAVE ALL;"
|
||||
percona_utils.deconfigure_slave()
|
||||
my_mock.execute.assert_any_call(sql1)
|
||||
my_mock.execute.assert_any_call(sql2)
|
||||
|
||||
@mock.patch.object(percona_utils, 'get_db_helper')
|
||||
def test_get_master_status(self, mock_get_db_helper):
|
||||
my_mock = mock.Mock()
|
||||
self.network_get_primary_address.return_value = '10.0.0.1'
|
||||
mock_get_db_helper.return_value = my_mock
|
||||
my_mock.select.return_value = [['file', 'position']]
|
||||
self.assertEqual(percona_utils.get_master_status('master'),
|
||||
('10.0.0.1', 'file', 'position'))
|
||||
|
||||
@mock.patch.object(percona_utils, 'get_db_helper')
|
||||
def test_get_slave_status(self, mock_get_db_helper):
|
||||
my_mock = mock.Mock()
|
||||
mock_get_db_helper.return_value = my_mock
|
||||
my_mock.select.return_value = [['state', '10.0.0.1']]
|
||||
self.assertEqual(percona_utils.get_slave_status(), ('10.0.0.1'))
|
||||
|
||||
@mock.patch.object(percona_utils, 'get_db_helper')
|
||||
def test_create_replication_user(self, mock_get_db_helper):
|
||||
my_mock = mock.Mock()
|
||||
slave_address = '10.0.1.1'
|
||||
master_password = 'password'
|
||||
mock_get_db_helper.return_value = my_mock
|
||||
sql = ("GRANT REPLICATION SLAVE ON *.* TO 'replication'@'{}' "
|
||||
"IDENTIFIED BY '{}';").format(slave_address, master_password)
|
||||
percona_utils.create_replication_user(slave_address, master_password)
|
||||
my_mock.execute.assert_called_with(sql)
|
||||
|
||||
@mock.patch.object(percona_utils, 'get_db_helper')
|
||||
def test_delete_replication_user(self, mock_get_db_helper):
|
||||
my_mock = mock.Mock()
|
||||
slave_address = '10.0.1.1'
|
||||
mock_get_db_helper.return_value = my_mock
|
||||
sql = ("DELETE FROM mysql.user WHERE Host='{}' AND "
|
||||
"User='replication';").format(slave_address)
|
||||
percona_utils.delete_replication_user(slave_address)
|
||||
my_mock.execute.assert_called_with(sql)
|
||||
|
||||
@mock.patch.object(percona_utils, 'get_db_helper')
|
||||
def test_list_replication_users(self, mock_get_db_helper):
|
||||
my_mock = mock.Mock()
|
||||
mock_get_db_helper.return_value = my_mock
|
||||
my_mock.select.return_value = [['10.0.0.1'], ['10.0.0.2']]
|
||||
self.assertEqual(percona_utils.list_replication_users(),
|
||||
(['10.0.0.1', '10.0.0.2']))
|
||||
|
|
Loading…
Reference in New Issue