MySQL asynchronous replication

This patchset implements new relations: "master" and "slave" based
on a common "mysql-async-replication" interface which are used for
the purpose on enabling MySQL asynchronous replication between
multiple Percona XtraDB Clusters.

Change-Id: I94710bff17091516875c81ca769d8078ef5efd10
Closes-Bug: 1776171
This commit is contained in:
Tytus Kurek 2018-11-21 17:06:22 +01:00
parent 117f2bd290
commit e116b1ef86
15 changed files with 819 additions and 4 deletions

View File

@ -124,6 +124,28 @@ If both 'vip' and 'dns-ha' are set, as they are mutually exclusive
If 'dns-ha' is set and 'os-access-hostname' is not set
If the 'access' binding is not set and 'dns-ha' is set, consumers of the db may not be allowed to connect
## MySQL asynchronous replication
This charm supports MySQL asynchronous replication feature which can be used
to replicate databases between multiple Percona XtraDB Clusters. In order to
setup master-slave replication of "example1" and "example2" databases between
"pxc1" and "pxc2" applications, first configure mandatory options:
juju config pxc1 databases-to-replicate="database1:table1,table2;database2"
juju config pxc2 databases-to-replicate="database1:table1,table2;database2"
juju config pxc1 cluster-id=1
juju config pxc2 cluster-id=2
and then relate them:
juju relate pxc1:master pxc2:slave
In order to setup master-master replication, add another relation:
juju relate pxc2:master pxc1:slave
In the same way circular replication can be setup between multiple clusters.
## Network Space support
This charm supports the use of Juju Network Spaces, allowing the charm to be bound

View File

@ -321,3 +321,23 @@ options:
WARNING Please read all documentation before changing the default value which may have
unintended consequences. It may be necessary to set this value higher during deploy time
(PT15S) and subsequently change it back to the default (PT3S) after deployment.
databases-to-replicate:
type: string
default:
description: |
Databases and tables to replicate using MySQL asynchronous replication.
The databases should be separated with a semicolon while the tables
should be separated with a comma. No tables mean that the whole database
will be replicated. For example "database1:table1,table2;database2"
will replicate "table1" and "table2" tables from "database1" databasae
and all tables from "database2" database.
.
NOTE: This option should be used only when relating one cluster to the
other. It does not affect Galera synchronous replication.
cluster-id:
type: int
default:
description: |
Cluster ID to be used when using MySQL asynchronous replication.
.
NOTE: This value must be different for each cluster.

View File

@ -0,0 +1 @@
percona_hooks.py

View File

@ -0,0 +1 @@
percona_hooks.py

View File

@ -0,0 +1 @@
percona_hooks.py

View File

@ -37,6 +37,7 @@ from charmhelpers.core.host import (
lsb_release,
mkdir,
CompareHostReleases,
pwgen,
)
from charmhelpers.core.templating import render
from charmhelpers.fetch import (
@ -122,6 +123,15 @@ from percona_utils import (
pause_unit_helper,
resume_unit_helper,
check_for_socket,
get_cluster_id,
get_databases_to_replicate,
configure_master,
configure_slave,
deconfigure_slave,
get_master_status,
get_slave_status,
delete_replication_user,
list_replication_users,
)
from charmhelpers.core.unitdata import kv
@ -233,6 +243,11 @@ def render_config(hosts=None):
context['innodb_autoinc_lock_mode'] = '2'
context['pxc_strict_mode'] = config('pxc-strict-mode')
if config('databases-to-replicate'):
context['databases_to_replicate'] = get_databases_to_replicate()
context['server-id'] = get_server_id()
context.update(PerconaClusterHelper().parse_config())
render(os.path.basename(config_file), config_file, context, perms=0o444)
@ -560,6 +575,10 @@ def config_changed():
update_root_password()
set_ready_on_peers()
# NOTE(tkurek): re-set 'master' relation data
if relation_ids('master'):
master_joined()
@hooks.hook('cluster-relation-joined')
def cluster_joined():
@ -937,6 +956,12 @@ def ha_relation_changed():
def leader_settings_changed():
'''Re-trigger install once leader has seeded passwords into install'''
config_changed()
# NOTE(tkurek): re-set 'master' relation data
if relation_ids('master'):
master_joined()
# NOTE(tkurek): deconfigure old leader
if relation_ids('slave'):
deconfigure_slave()
@hooks.hook('leader-elected')
@ -947,6 +972,12 @@ def leader_elected():
else:
log('leader-elected hook executed, but this unit is not the leader',
level=INFO)
# NOTE(tkurek): re-set 'master' relation data
if relation_ids('master'):
master_joined()
# NOTE(tkurek): configure new leader
if relation_ids('slave'):
configure_slave()
@hooks.hook('nrpe-external-master-relation-joined',
@ -966,6 +997,106 @@ def update_nrpe_config():
nrpe_setup.write()
@hooks.hook('master-relation-joined')
def master_joined(interface='master'):
cluster_id = get_cluster_id()
if not is_clustered():
log("Not clustered yet", level=DEBUG)
return
relation_settings = {}
leader_settings = leader_get()
if is_leader():
if not leader_settings.get('async-rep-password'):
# Replication password cannot be longer than 32 characters
leader_set({'async-rep-password': pwgen(32)})
return
configure_master()
master_address, master_file, master_position = (
get_master_status(interface))
if leader_settings.get('master-address') is not master_address:
leader_settings['master-address'] = master_address
leader_settings['master-file'] = master_file
leader_settings['master-position'] = master_position
leader_set(leader_settings)
relation_settings = {'leader': True}
else:
relation_settings = {'leader': False}
relation_settings['cluster_id'] = cluster_id
relation_settings['master_address'] = leader_settings['master-address']
relation_settings['master_file'] = leader_settings['master-file']
relation_settings['master_password'] = \
leader_settings['async-rep-password']
relation_settings['master_position'] = leader_settings['master-position']
log("Setting master relation: '{}'".format(relation_settings), level=INFO)
for rid in relation_ids(interface):
relation_set(relation_id=rid, relation_settings=relation_settings)
@hooks.hook('master-relation-changed')
def master_changed(interface='master'):
if is_leader():
configure_master()
@hooks.hook('master-relation-departed')
def master_departed(interface='master'):
if is_leader():
reset_password = True
new_slave_addresses = []
old_slave_addresses = list_replication_users()
for rid in relation_ids(interface):
if related_units(rid):
reset_password = False
for unit in related_units(rid):
if not relation_get(attribute='slave_address',
rid=rid, unit=unit):
log("No relation data for {}".format(unit), level=DEBUG)
return
new_slave_addresses.append(
relation_get(attribute='slave_address',
rid=rid,
unit=unit))
for old_slave_address in old_slave_addresses:
if old_slave_address not in new_slave_addresses:
delete_replication_user(old_slave_address)
if reset_password:
leader_set({'async-rep-password': ''})
@hooks.hook('slave-relation-joined')
def slave_joined(interface='slave'):
relation_settings = {}
cluster_id = get_cluster_id()
if not is_clustered():
log("Not clustered yet", level=DEBUG)
return
if is_leader():
configure_slave()
relation_settings = {'slave_address':
network_get_primary_address(interface)}
relation_settings['cluster_id'] = cluster_id
log("Setting slave relation: '{}'".format(relation_settings), level=INFO)
for rid in relation_ids(interface):
relation_set(relation_id=rid, relation_settings=relation_settings)
@hooks.hook('slave-relation-changed')
def slave_changed(interface='slave'):
for rid in relation_ids(interface):
for unit in related_units(rid):
rdata = relation_get(unit=unit, rid=rid)
if rdata.get('leader'):
if rdata.get('master_address') is not get_slave_status():
slave_departed()
slave_joined()
@hooks.hook('slave-relation-departed')
def slave_departed():
if is_leader():
deconfigure_slave()
@hooks.hook('update-status')
@harden()
def update_status():

View File

@ -3,8 +3,11 @@ import subprocess
from subprocess import Popen, PIPE
import socket
import tempfile
import copy
import os
import re
import shutil
import six
import uuid
from functools import partial
import time
@ -1078,12 +1081,48 @@ def get_wsrep_provider_options():
return ';'.join(wsrep_provider_options)
class ClusterIDRequired(Exception):
pass
class ClusterIDIdentical(Exception):
pass
def get_cluster_id():
""" Return cluster id (lp1776171)
Return cluster ID for MySQL asynchronous replication
:returns: int cluster_id
"""
if not config('cluster-id'):
msg = ("Master / Slave relation requires 'cluster-id' option")
status_set("blocked", msg)
raise ClusterIDRequired(msg)
cluster_id = config('cluster-id')
for rid in relation_ids('master'):
for unit in related_units(rid):
if relation_get(attribute='cluster_id',
rid=rid,
unit=unit) == cluster_id:
msg = ("'cluster-id' option must be unique within a cluster")
status_set('blocked', msg)
raise ClusterIDIdentical(msg)
for rid in relation_ids('slave'):
for unit in related_units(rid):
if relation_get(attribute='cluster_id',
rid=rid,
unit=unit) == cluster_id:
msg = ("'cluster-id' option must be unique within a cluster")
status_set('blocked', msg)
raise ClusterIDIdentical(msg)
return cluster_id
def get_server_id():
""" Return unique server id for bin log replication
Server ID must be a unique, non-zero, positive number from 1 to 2**32 - 1
https://dev.mysql.com/doc/refman/8.0/en/replication-options.html
:returns: int server_id
"""
MAX_SERVER_ID = 2**32 - 1
@ -1128,3 +1167,283 @@ def check_for_socket(file_name, exists=True, sleep=10, attempts=12):
# If we get here throw an exception
raise Exception("Socket {} not found after {} attempts."
.format(file_name, attempts))
class InvalidDatabasesToReplicate(Exception):
pass
class InvalidCharacters(Exception):
pass
def get_databases_to_replicate():
""" Get databases_to_replicate (lp1776171)
Returns databases and tables to replicate using MySQL asynchronous
replication
:returns: list of dicts of databases and tables to replicate
:rtype: [{'database': str, 'tables': [str, ...]}, ...]
:raises: OperationalError
"""
if not config('cluster-id'):
msg = ("'cluster-id' option must be set when using "
"'databases-to-replicate' option")
status_set('blocked', msg)
raise ClusterIDRequired(msg)
databases_to_replicate = []
entries = config('databases-to-replicate').strip().split(';')
try:
for entry in entries:
databases_and_tables = {}
entry_split = entry.split(':')
databases_and_tables['database'] = (
check_invalid_chars(entry_split[0]))
try:
# Tables present
databases_and_tables['tables'] = (
check_invalid_chars(entry_split[1].split(',')))
except IndexError:
# If there are no tables
databases_and_tables['tables'] = []
databases_to_replicate.append(databases_and_tables)
except InvalidCharacters as e:
raise InvalidDatabasesToReplicate(
"The configuration setting databases-to-replicate is malformed. {}"
.format(e.message))
return databases_to_replicate
def check_invalid_chars(data, bad_chars_re="[\^\\/?%*:|\"'<>., ]"):
""" Check for invalid characters
Run a pattern check on the data and raise an InvalidCharacters exception
if there is a match. Return the original data untouched if no match is
found.
Input can be a list or a string.
:param data: List or string under test
:type data: str or list
:param bad_chars_re: String regex to check against
:type bad_chars_re: str
:raises: InvalidCharacters
:returns: The original data untouched
:rtype: str or list
"""
if isinstance(data, six.string_types):
data_strings = [data]
else:
data_strings = copy.copy(data)
for data_string in data_strings:
m = re.search(bad_chars_re, data_string)
if m:
raise(InvalidCharacters(
"Invalid character '{}' in '{}'"
.format(m.group(0), data_string)))
return data
def configure_master():
""" Configure master (lp1776171)
Calls 'create_replication_user' function for IP addresses of all related
units.
"""
new_slave_addresses = []
old_slave_addresses = list_replication_users()
for rid in relation_ids('master'):
for unit in related_units(rid):
if not relation_get(attribute='slave_address', rid=rid, unit=unit):
log("No IP address for {} yet".format(unit), level=DEBUG)
return
new_slave_addresses.append(
relation_get(attribute='slave_address', rid=rid, unit=unit))
# If not yet created
for new_slave_address in new_slave_addresses:
if new_slave_address not in old_slave_addresses:
create_replication_user(new_slave_address,
leader_get('async-rep-password'))
def configure_slave():
""" Configure slave (lp1776171)
Configures MySQL asynchronous replication slave.
:raises: OperationalError
"""
rel_data = {}
for rid in relation_ids('slave'):
for unit in related_units(rid):
rdata = relation_get(unit=unit, rid=rid)
is_leader = rdata.get('leader', None)
if is_leader is None:
log("No relation data for {} yet".format(unit), level=DEBUG)
continue
try:
if (is_leader and not(all(
rdata.get("master_{}".format(k)) for k in ["address",
"file",
"password",
"position"]))):
log("No full relation data for {} yet".format(unit),
level=DEBUG)
continue
m_helper = get_db_helper()
try:
m_helper.connect(user='replication',
password=rdata.get('master_password'),
host=rdata.get('master_address'))
rel_data['master_address'] = rdata.get('master_address')
rel_data['master_file'] = rdata.get('master_file')
rel_data['master_password'] = rdata.get('master_password')
rel_data['master_position'] = rdata.get('master_position')
except OperationalError:
log("Could not connect to {}".format(unit), level=DEBUG)
except KeyError:
log("No relation data for {} yet".format(unit), level=DEBUG)
raise
if not rel_data:
log("Unable to find the master", level=DEBUG)
return
m_helper = get_db_helper()
try:
m_helper.connect(password=m_helper.get_mysql_root_password())
except OperationalError:
log("Could not connect to db", level=DEBUG)
return
m_helper.execute("STOP SLAVE;")
m_helper.execute(("CHANGE MASTER TO master_host='{master_address}', "
"master_port=3306, master_user='replication', "
"master_password='{master_password}', "
"master_log_file='{master_file}', "
"master_log_pos={master_position};").format(**rel_data))
m_helper.execute("START SLAVE;")
def deconfigure_slave():
""" Deconfigure slave (lp1776171)
Deconfigures MySQL asynchronous replication slave on relation departure.
:raises: OperationalError
"""
m_helper = get_db_helper()
try:
m_helper.connect(password=m_helper.get_mysql_root_password())
except OperationalError:
log("Could not connect to db", level=DEBUG)
return
m_helper.execute("STOP SLAVE;")
m_helper.execute("RESET SLAVE ALL;")
def get_master_status(interface):
""" Get master status (lp1776171)
Returns MySQL asynchronous replication master status.
:param interface: relation name
:type interface: str
:returns: tuple of (IP address in space associated with 'master' binding,
replication file,
replication file position)
:rtype: (str, str, str)
:raises: OperationalError
"""
m_helper = get_db_helper()
try:
m_helper.connect(password=m_helper.get_mysql_root_password())
except OperationalError:
log("Could not connect to db", level=DEBUG)
raise
results = m_helper.select("SHOW MASTER STATUS;")
return network_get_primary_address(interface), results[0][0], results[0][1]
def get_slave_status():
""" Get slave status (lp1776171)
Returns MySQL asynchronous replication slave status.
returns: currently configured master IP address
rtype: str
:raises: OperationalError
"""
m_helper = get_db_helper()
try:
m_helper.connect(password=m_helper.get_mysql_root_password())
except OperationalError:
log("Could not connect to db", level=DEBUG)
raise
results = m_helper.select("SHOW SLAVE STATUS;")
return results[0][1]
def create_replication_user(slave_address, master_password):
""" Create replication user (lp1776171)
Grants access for MySQL asynchronous replication slave.
:param slave_address: slave IP address
:type slave_address: str
:param master_password: replication password
:type master_password: str
:raises: OperationalError
"""
m_helper = get_db_helper()
try:
m_helper.connect(password=m_helper.get_mysql_root_password())
except OperationalError:
log("Could not connect to db", level=DEBUG)
return
m_helper.execute(("GRANT REPLICATION SLAVE ON *.* TO ""'replication'@'{}' "
"IDENTIFIED BY '{}';").format(slave_address,
master_password))
def delete_replication_user(slave_address):
""" Delete replication user (lp1776171)
Revokes access for MySQL asynchronous replication slave.
:param slave_address: slave IP address
:type slave_address: str
:raises: OperationalError
"""
m_helper = get_db_helper()
try:
m_helper.connect(password=m_helper.get_mysql_root_password())
except OperationalError:
log("Could not connect to db", DEBUG)
return
m_helper.execute(("DELETE FROM mysql.user WHERE Host='{}' AND "
"User='replication';").format(slave_address))
def list_replication_users():
""" List replication users (lp1776171)
Lists IP addresses of slaves which have been granted with an access for
MySQL asynchronous replication.
:returns: IP addresses of slaves
:rtype replication_users: [str]
:raises: OperationalError
"""
replication_users = []
m_helper = get_db_helper()
try:
m_helper.connect(password=m_helper.get_mysql_root_password())
except OperationalError:
log("Could not connect to db", DEBUG)
raise
for result in m_helper.select("SELECT Host FROM mysql.user WHERE "
"User='replication';"):
replication_users.append(result[0])
return replication_users

View File

@ -0,0 +1 @@
percona_hooks.py

View File

@ -0,0 +1 @@
percona_hooks.py

1
hooks/slave-relation-joined Symbolic link
View File

@ -0,0 +1 @@
percona_hooks.py

View File

@ -24,6 +24,8 @@ provides:
nrpe-external-master:
interface: nrpe-external-master
scope: container
master:
interface: mysql-async-replication
peers:
cluster:
interface: percona-cluster
@ -31,3 +33,5 @@ requires:
ha:
interface: hacluster
scope: container
slave:
interface: mysql-async-replication

View File

@ -28,13 +28,28 @@ wsrep_cluster_address=gcomm://{{ cluster_hosts }}
# In order for Galera to work correctly binlog format should be ROW
binlog_format=ROW
{% if enable_binlogs -%}
{% if enable_binlogs or databases_to_replicate -%}
server_id = {{ server_id }}
log_bin={{ binlogs_path }}
expire_logs_days={{ binlogs_expire_days }}
max_binlog_size={{ binlogs_max_size }}
{% endif %}
{% if databases_to_replicate -%}
# MySQL synchronous replication
log_slave_updates = 1
{% for entry in databases_to_replicate -%}
replicate_do_db = {{ entry.database }}
{% if entry.tables -%}
{% for table in entry.tables -%}
replicate_do_table = {{ entry.database }}.{{ table }}
{% endfor %}
{% else -%}
replicate_wild_do_table = {{ entry.database }}.%
{% endif %}
{% endfor %}
{% endif %}
# MyISAM storage engine has only experimental support
default_storage_engine=InnoDB

View File

@ -72,7 +72,7 @@ log_error = /var/log/mysql/error.log
# The following can be used as easy to replay backup logs or for replication.
# note: if you are setting up a replication slave, see README.Debian about
# other settings you may need to change.
{% if enable_binlogs -%}
{% if enable_binlogs or databases_to_replicate -%}
server_id = {{ server_id }}
log_bin={{ binlogs_path }}
{% endif %}
@ -80,6 +80,21 @@ log_bin={{ binlogs_path }}
expire_logs_days = {{ binlogs_expire_days }}
max_binlog_size = {{ binlogs_max_size }}
{% if databases_to_replicate -%}
# MySQL asynchronous replication
log_slave_updates = 1
{% for entry in databases_to_replicate -%}
replicate_do_db = {{ entry.database }}
{% if entry.tables -%}
{% for table in entry.tables -%}
replicate_do_table = {{ entry.database }}.{{ table }}
{% endfor %}
{% else -%}
replicate_wild_do_table = {{ entry.database }}.%
{% endif %}
{% endfor %}
{% endif %}
# Required to allow trigger creation for openstack services
log_bin_trust_function_creators = 1

View File

@ -283,6 +283,102 @@ class TestNRPERelation(CharmTestCase):
mock.ANY, ["mysql"], mock.ANY)
class TestMasterRelation(CharmTestCase):
def setUp(self):
patch_targets_master = TO_PATCH[:]
patch_targets_master.extend(['configure_master',
'get_cluster_id',
'get_master_status',
'leader_set'])
CharmTestCase.setUp(self, hooks, patch_targets_master)
def test_master_joined_is_leader_and_no_leader_change(self):
self.relation_ids.return_value = ['master:1']
self.get_cluster_id.return_value = 1
self.is_clustered.return_value = True
self.leader_get.return_value = {'async-rep-password': 'password',
'master-address': '10.0.0.1',
'master-file': 'file1',
'master-position': 'position1'}
self.is_leader.return_value = True
self.configure_master.return_value = True
self.get_master_status.return_value = '10.0.0.1', 'file1', 'position1'
hooks.master_joined()
self.leader_set.assert_called_with(
{'async-rep-password': 'password',
'master-address': '10.0.0.1',
'master-file': 'file1',
'master-position': 'position1'})
self.relation_set.assert_called_with(
relation_id='master:1', relation_settings={
'leader': True, 'cluster_id': 1, 'master_address': '10.0.0.1',
'master_file': 'file1', 'master_password': 'password',
'master_position': 'position1'})
def test_master_joined_is_leader_and_leader_change(self):
self.relation_ids.return_value = ['master:1']
self.get_cluster_id.return_value = 1
self.is_clustered.return_value = True
self.leader_get.return_value = {'async-rep-password': 'password',
'master-address': '10.0.0.1',
'master-file': 'file1',
'master-position': 'position1'}
self.is_leader.return_value = True
self.configure_master.return_value = True
self.get_master_status.return_value = '10.0.0.2', 'file2', 'position2'
hooks.master_joined()
self.leader_set.assert_called_with(
{'async-rep-password': 'password',
'master-address': '10.0.0.2',
'master-file': 'file2',
'master-position': 'position2'})
self.relation_set.assert_called_with(
relation_id='master:1', relation_settings={
'leader': True, 'cluster_id': 1, 'master_address': '10.0.0.2',
'master_file': 'file2', 'master_password': 'password',
'master_position': 'position2'})
def test_master_joined_is_not_leader(self):
self.relation_ids.return_value = ['master:1']
self.get_cluster_id.return_value = 1
self.is_clustered.return_value = True
self.leader_get.return_value = {'async-rep-password': 'password',
'master-address': '10.0.0.1',
'master-file': 'file',
'master-position': 'position'}
self.is_leader.return_value = False
hooks.master_joined()
self.relation_set.assert_called_with(
relation_id='master:1', relation_settings={
'leader': False, 'cluster_id': 1, 'master_address': '10.0.0.1',
'master_file': 'file', 'master_password': 'password',
'master_position': 'position'})
class TestSlaveRelation(CharmTestCase):
def setUp(self):
patch_targets_slave = TO_PATCH[:]
patch_targets_slave.extend(["configure_slave",
"get_cluster_id"])
CharmTestCase.setUp(self, hooks, patch_targets_slave)
def test_slave_joined(self):
self.relation_ids.return_value = ['slave:1']
self.is_clustered.return_value = True
self.get_cluster_id.return_value = 1
self.is_leader.return_value = True
self.configure_slave.return_value = True
self.network_get_primary_address.return_value = '172.16.0.1'
hooks.slave_joined()
self.relation_set.assert_called_with(
relation_id='slave:1', relation_settings={
'slave_address': '172.16.0.1', 'cluster_id': 1})
class TestConfigChanged(CharmTestCase):
TO_PATCH = [

View File

@ -990,3 +990,190 @@ class TestUpdateBootstrapUUID(CharmTestCase):
'leader-ip': '10.10.10.10'}
self.leader_get.return_value = leader_config
self.assertTrue(percona_utils.is_leader_bootstrapped())
class TestAsynchronousReplication(CharmTestCase):
TO_PATCH = [
'config',
'leader_get',
'network_get_primary_address',
'related_units',
'relation_get',
'relation_ids',
]
def setUp(self):
super(TestAsynchronousReplication, self).setUp(percona_utils,
self.TO_PATCH)
@mock.patch.object(percona_utils, 'config')
def test_get_databases_to_replicate(self, mock_config):
config = {'cluster-id': 1, 'databases-to-replicate': 'db1:tb1,tb2;db2'}
mock_config.side_effect = lambda k: config.get(k)
percona_utils.get_databases_to_replicate()
self.assertEqual(percona_utils.get_databases_to_replicate(),
([{'database': 'db1', 'tables': ['tb1', 'tb2']},
{'database': 'db2', 'tables': []}]))
@mock.patch.object(percona_utils, 'create_replication_user')
@mock.patch.object(percona_utils, 'list_replication_users')
def test_configure_master_slave_address_not_in_relation_data(
self, mock_list_replication_users, mock_create_replication_user):
self.relation_ids.return_value = [1]
self.related_units.return_value = [1, 2, 3]
self.relation_get.return_value = None
percona_utils.configure_master()
mock_create_replication_user.assert_not_called()
@mock.patch.object(percona_utils, 'create_replication_user')
@mock.patch.object(percona_utils, 'list_replication_users')
def test_configure_master_slave_address_in_relation_data_and_created(
self, mock_list_replication_users, mock_create_replication_user):
self.relation_ids.return_value = [1]
self.related_units.return_value = [1, 2, 3]
def _mock_rel_get(*args, **kwargs):
unit_id = kwargs.get('unit')
return '10.0.1.{}'.format(unit_id)
self.relation_get.side_effect = _mock_rel_get
mock_list_replication_users.return_value = ['10.0.1.1',
'10.0.1.2',
'10.0.1.3']
percona_utils.configure_master()
mock_create_replication_user.assert_not_called()
@mock.patch.object(percona_utils, 'create_replication_user')
@mock.patch.object(percona_utils, 'list_replication_users')
def test_configure_master_slave_address_in_relation_data_and_not_created(
self, mock_list_replication_users, mock_create_replication_user):
self.relation_ids.return_value = [1]
self.related_units.return_value = [1, 2, 3]
def _mock_rel_get(*args, **kwargs):
unit_id = kwargs.get('unit')
return '10.0.1.{}'.format(unit_id)
self.relation_get.side_effect = _mock_rel_get
mock_list_replication_users.return_value = ['10.0.1.1', '10.0.1.2']
self.leader_get.return_value = 'password'
percona_utils.configure_master()
mock_create_replication_user.assert_called_once_with('10.0.1.3',
'password')
@mock.patch.object(percona_utils, 'get_db_helper')
def test_configure_slave_no_leader(
self, mock_get_db_helper):
my_mock = mock.Mock()
self.relation_ids.return_value = [1]
self.related_units.return_value = [1, 2, 3]
def _mock_rel_get(*args, **kwargs):
return {'private-address': '10.0.0.1'}
self.relation_get.side_effect = _mock_rel_get
mock_get_db_helper.return_value = my_mock
percona_utils.configure_slave()
my_mock.execute.assert_not_called()
@mock.patch.object(percona_utils, 'get_db_helper')
def test_configure_slave_leader_and_no_full_relation_data(
self, mock_get_db_helper):
my_mock = mock.Mock()
self.relation_ids.return_value = [1]
self.related_units.return_value = [1, 2, 3]
def _mock_rel_get(*args, **kwargs):
return {'private-address': '10.0.0.1',
'leader': True}
self.relation_get.side_effect = _mock_rel_get
mock_get_db_helper.return_value = my_mock
percona_utils.configure_slave()
my_mock.execute.assert_not_called()
@mock.patch.object(percona_utils, 'get_db_helper')
def test_configure_slave_leader_and_full_relation_data(
self, mock_get_db_helper):
my_mock = mock.Mock()
self.relation_ids.return_value = [1]
self.related_units.return_value = [1, 2, 3]
def _mock_rel_get(*args, **kwargs):
return {'private-address': '10.0.0.1',
'leader': True,
'master_address': '10.0.0.1',
'master_file': 'file',
'master_password': 'password',
'master_position': 'position'}
self.relation_get.side_effect = _mock_rel_get
mock_get_db_helper.return_value = my_mock
sql1 = "STOP SLAVE;"
sql2 = ("CHANGE MASTER TO "
"master_host='10.0.0.1', "
"master_port=3306, "
"master_user='replication', "
"master_password='password', "
"master_log_file='file', "
"master_log_pos=position;")
sql3 = "START SLAVE;"
percona_utils.configure_slave()
my_mock.execute.assert_any_call(sql1)
my_mock.execute.assert_any_call(sql2)
my_mock.execute.assert_any_call(sql3)
@mock.patch.object(percona_utils, 'get_db_helper')
def test_deconfigure_slave(self, mock_get_db_helper):
my_mock = mock.Mock()
mock_get_db_helper.return_value = my_mock
sql1 = "STOP SLAVE;"
sql2 = "RESET SLAVE ALL;"
percona_utils.deconfigure_slave()
my_mock.execute.assert_any_call(sql1)
my_mock.execute.assert_any_call(sql2)
@mock.patch.object(percona_utils, 'get_db_helper')
def test_get_master_status(self, mock_get_db_helper):
my_mock = mock.Mock()
self.network_get_primary_address.return_value = '10.0.0.1'
mock_get_db_helper.return_value = my_mock
my_mock.select.return_value = [['file', 'position']]
self.assertEqual(percona_utils.get_master_status('master'),
('10.0.0.1', 'file', 'position'))
@mock.patch.object(percona_utils, 'get_db_helper')
def test_get_slave_status(self, mock_get_db_helper):
my_mock = mock.Mock()
mock_get_db_helper.return_value = my_mock
my_mock.select.return_value = [['state', '10.0.0.1']]
self.assertEqual(percona_utils.get_slave_status(), ('10.0.0.1'))
@mock.patch.object(percona_utils, 'get_db_helper')
def test_create_replication_user(self, mock_get_db_helper):
my_mock = mock.Mock()
slave_address = '10.0.1.1'
master_password = 'password'
mock_get_db_helper.return_value = my_mock
sql = ("GRANT REPLICATION SLAVE ON *.* TO 'replication'@'{}' "
"IDENTIFIED BY '{}';").format(slave_address, master_password)
percona_utils.create_replication_user(slave_address, master_password)
my_mock.execute.assert_called_with(sql)
@mock.patch.object(percona_utils, 'get_db_helper')
def test_delete_replication_user(self, mock_get_db_helper):
my_mock = mock.Mock()
slave_address = '10.0.1.1'
mock_get_db_helper.return_value = my_mock
sql = ("DELETE FROM mysql.user WHERE Host='{}' AND "
"User='replication';").format(slave_address)
percona_utils.delete_replication_user(slave_address)
my_mock.execute.assert_called_with(sql)
@mock.patch.object(percona_utils, 'get_db_helper')
def test_list_replication_users(self, mock_get_db_helper):
my_mock = mock.Mock()
mock_get_db_helper.return_value = my_mock
my_mock.select.return_value = [['10.0.0.1'], ['10.0.0.2']]
self.assertEqual(percona_utils.list_replication_users(),
(['10.0.0.1', '10.0.0.2']))