Send writes to the cluster RW primary

Before this change, DB setup for client applications would occur on
whichever node was the juju leader. This may not be the cluster RW
primary node. This led to Bug #1866164.

This change guarantees that any post-clustering writes get sent to the
cluster RW primary node.

Change-Id: If6beb4450be755a1b204c63759de32d486201202
Closes-Bug: #1866164
This commit is contained in:
David Ames 2020-04-09 14:34:43 -07:00
parent f32212e068
commit 2ee2031a5f
2 changed files with 93 additions and 10 deletions

View File

@ -413,6 +413,23 @@ class MySQLInnoDBClusterCharm(charms_openstack.charm.OpenStackCharm):
upasswdf_template="/var/lib/charm/{}/mysql-{{}}.passwd"
.format(ch_core.hookenv.service_name()))
def get_cluster_rw_db_helper(self):
"""Get connected RW instance of the MySQLDB8Helper class.
Connect to the RW cluster primary node and return a DB helper.
:param self: Self
:type self: MySQLInnoDBClusterCharm instance
:returns: Instance of MySQLDB8Helper class
:rtype: MySQLDB8Helper instance
"""
_helper = self.get_db_helper()
_helper.connect(
user=self.cluster_user,
password=self.cluster_password,
host=self.get_cluster_primary_address(nocache=True))
return _helper
def create_cluster_user(
self, cluster_address, cluster_user, cluster_password):
"""Create cluster user and grant permissions in the MySQL DB.
@ -757,6 +774,32 @@ class MySQLInnoDBClusterCharm(charms_openstack.charm.OpenStackCharm):
self._cached_cluster_status = json.loads(output.decode("UTF-8"))
return self._cached_cluster_status
def get_cluster_primary_address(self, nocache=False):
"""Get cluster RW primary address.
Return cluster.status()['groupInformationSourceMember'] which is the
primary R/W node in the cluster. This node is safe to use for writes
to the cluster.
:param self: Self
:type self: MySQLInnoDBClusterCharm instance
:param nocache: Do not return cached data
:type nocache: Boolean
:side effect: Calls self.get_cluster_status
:returns: String IP address
:rtype: Union[None, str]
"""
if self._cached_cluster_status and not nocache:
_status = self._cached_cluster_status
else:
_status = self.get_cluster_status(nocache=nocache)
if not _status:
return
# Return addresss without port number
if ":" in _status['groupInformationSourceMember']:
return _status['groupInformationSourceMember'][:-5]
return _status['groupInformationSourceMember']
def get_cluster_status_summary(self, nocache=False):
"""Get cluster status summary
@ -944,10 +987,10 @@ class MySQLInnoDBClusterCharm(charms_openstack.charm.OpenStackCharm):
level="DEBUG")
hosts = [hosts]
db_helper = self.get_db_helper()
rw_helper = self.get_cluster_rw_db_helper()
for host in hosts:
password = db_helper.configure_db(host, database, username)
password = rw_helper.configure_db(host, database, username)
return password
@ -981,10 +1024,10 @@ class MySQLInnoDBClusterCharm(charms_openstack.charm.OpenStackCharm):
level="DEBUG")
hosts = [hosts]
db_helper = self.get_db_helper()
rw_helper = self.get_cluster_rw_db_helper()
for host in hosts:
password = db_helper.configure_router(host, username)
password = rw_helper.configure_router(host, username)
return password
@ -1108,7 +1151,7 @@ class MySQLInnoDBClusterCharm(charms_openstack.charm.OpenStackCharm):
@tenacity.retry(wait=tenacity.wait_fixed(10),
reraise=True,
stop=tenacity.stop_after_delay(5))
stop=tenacity.stop_after_attempt(5))
def wait_until_connectable(
self, username=None, password=None, address=None):
"""Wait until MySQL instance is accessible.
@ -1139,7 +1182,7 @@ class MySQLInnoDBClusterCharm(charms_openstack.charm.OpenStackCharm):
@tenacity.retry(wait=tenacity.wait_fixed(10),
reraise=True,
stop=tenacity.stop_after_delay(5))
stop=tenacity.stop_after_attempt(5))
def wait_until_cluster_available(self):
"""Wait until MySQL InnoDB Cluster is available.

View File

@ -399,6 +399,20 @@ class TestMySQLInnoDBClusterCharm(test_utils.PatchHelper):
self.assertEqual(_helper, midbc.get_db_helper())
self.MySQL8Helper.assert_called_once()
def test_get_cluster_rw_db_helper(self):
_addr = "10.5.50.41"
_helper = mock.MagicMock()
midbc = mysql_innodb_cluster.MySQLInnoDBClusterCharm()
midbc.get_db_helper = mock.MagicMock()
midbc.get_db_helper.return_value = _helper
midbc.get_cluster_primary_address = mock.MagicMock()
midbc.get_cluster_primary_address.return_value = _addr
self.assertEqual(_helper, midbc.get_cluster_rw_db_helper())
_helper.connect.assert_called_once_with(
user=midbc.cluster_user,
password=midbc.cluster_password,
host=_addr)
def test_create_cluster_user(self):
_user = "user"
_pass = "pass"
@ -719,8 +733,8 @@ class TestMySQLInnoDBClusterCharm(test_utils.PatchHelper):
_helper = mock.MagicMock()
_helper.configure_db.return_value = _pass
midbc = mysql_innodb_cluster.MySQLInnoDBClusterCharm()
midbc.get_db_helper = mock.MagicMock()
midbc.get_db_helper.return_value = _helper
midbc.get_cluster_rw_db_helper = mock.MagicMock()
midbc.get_cluster_rw_db_helper.return_value = _helper
# One host
self.assertEqual(
@ -749,8 +763,8 @@ class TestMySQLInnoDBClusterCharm(test_utils.PatchHelper):
_helper = mock.MagicMock()
_helper.configure_router.return_value = _pass
midbc = mysql_innodb_cluster.MySQLInnoDBClusterCharm()
midbc.get_db_helper = mock.MagicMock()
midbc.get_db_helper.return_value = _helper
midbc.get_cluster_rw_db_helper = mock.MagicMock()
midbc.get_cluster_rw_db_helper.return_value = _helper
# One host
self.assertEqual(
@ -914,6 +928,32 @@ class TestMySQLInnoDBClusterCharm(test_utils.PatchHelper):
self.assertEqual("OK", midbc.get_cluster_status_summary(nocache=True))
_status_obj.assert_called_once_with(nocache=True)
def test_get_cluster_primary_address(self):
_addr = "10.5.50.76"
_status_dict = {
"groupInformationSourceMember": "{}:3360".format(_addr)}
_status_obj = mock.MagicMock()
_status_obj.return_value = _status_dict
midbc = mysql_innodb_cluster.MySQLInnoDBClusterCharm()
midbc.get_cluster_status = _status_obj
self.assertEqual(_addr, midbc.get_cluster_primary_address())
_status_obj.assert_called_once_with(nocache=False)
# Cached data
_status_obj.reset_mock()
midbc._cached_cluster_status = _status_dict
self.assertEqual(_addr, midbc.get_cluster_primary_address())
_status_obj.assert_not_called()
# Nocache requested
_status_obj.reset_mock()
midbc._cached_cluster_status = _status_dict
self.assertEqual(
_addr, midbc.get_cluster_primary_address(nocache=True))
_status_obj.assert_called_once_with(nocache=True)
def test_get_cluster_status_text(self):
_status_dict = {"defaultReplicaSet": {"statusText": "Text"}}
_status_obj = mock.MagicMock()