Merge "Eject-replica-source chooses most recent slave"

This commit is contained in:
Jenkins 2015-04-09 20:25:06 +00:00 committed by Gerrit Code Review
commit d0fd88a11e
7 changed files with 129 additions and 20 deletions

View File

@ -379,11 +379,17 @@ class API(object):
self._call("enable_as_master", AGENT_HIGH_TIMEOUT, self.version_cap,
replica_source_config=replica_source_config)
# DEPRECATED: Maintain for API Compatibility
def get_txn_count(self):
    """Dispatch ``get_txn_count`` through ``self._call``.

    Deprecated: maintained for API compatibility only.

    :returns: whatever ``self._call`` returns for this method.
    """
    LOG.debug("Executing get_txn_count.")
    method_name = "get_txn_count"
    return self._call(method_name, AGENT_HIGH_TIMEOUT, self.version_cap)
def get_last_txn(self):
    """Dispatch ``get_last_txn`` through ``self._call``.

    :returns: whatever ``self._call`` returns for this method.
    """
    LOG.debug("Executing get_last_txn.")
    method_name = "get_last_txn"
    return self._call(method_name, AGENT_HIGH_TIMEOUT, self.version_cap)
def get_latest_txn_id(self):
LOG.debug("Executing get_latest_txn_id.")
return self._call("get_latest_txn_id",

View File

@ -256,10 +256,15 @@ class Manager(periodic_task.PeriodicTasks):
replication = REPLICATION_STRATEGY_CLASS(context)
replication.enable_as_master(app, replica_source_config)
# DEPRECATED: Maintain for API Compatibility
def get_txn_count(self, context):
    """Return ``MySqlApp.get_txn_count()`` for the current app status.

    Deprecated: maintained for API compatibility only.

    :param context: request context; not used by this method.
    """
    LOG.debug("Calling get_txn_count")
    app = MySqlApp(MySqlAppStatus.get())
    return app.get_txn_count()
def get_last_txn(self, context):
    """Return ``MySqlApp.get_last_txn()`` for the current app status.

    :param context: request context; not used by this method.
    """
    LOG.debug("Calling get_last_txn")
    app = MySqlApp(MySqlAppStatus.get())
    return app.get_last_txn()
def get_latest_txn_id(self, context):
    """Return ``MySqlApp.get_latest_txn_id()`` for the current app status.

    :param context: request context; not used by this method.
    """
    LOG.debug("Calling get_latest_txn_id.")
    app = MySqlApp(MySqlAppStatus.get())
    return app.get_latest_txn_id()

View File

@ -987,6 +987,7 @@ class MySqlApp(object):
LOG.info(_("Resetting configuration."))
self._write_mycnf(None, config_contents)
# DEPRECATED: Maintain for API Compatibility
def get_txn_count(self):
LOG.info(_("Retrieving latest txn id."))
txn_count = 0
@ -1001,11 +1002,32 @@ class MySqlApp(object):
txn_count += 1
return txn_count
def _get_slave_status(self):
    """Run ``SHOW SLAVE STATUS`` and return the first row of the result.

    :returns: the value of ``.first()`` on the executed query.
    """
    with LocalSqlClient(get_engine()) as sql_client:
        status_rows = sql_client.execute('SHOW SLAVE STATUS')
        return status_rows.first()
def _get_master_UUID(self):
slave_status = self._get_slave_status()
return slave_status and slave_status['Master_UUID'] or None
def _get_gtid_executed(self):
    """Query ``@@global.gtid_executed`` and return its value.

    :returns: the first column of the first row of the query result.
    """
    with LocalSqlClient(get_engine()) as sql_client:
        first_row = sql_client.execute(
            'SELECT @@global.gtid_executed').first()
        return first_row[0]
def get_last_txn(self):
    """Return the master's UUID and the last of its transactions executed.

    Scans the executed GTID set for the entry whose UUID equals the
    master's UUID and takes the upper bound of that entry's last
    interval.

    :returns: tuple ``(master_UUID, last_txn_id)``; ``last_txn_id`` is
              ``0`` when no entry for the master is found.
    """
    master_UUID = self._get_master_UUID()
    last_txn_id = '0'
    # Treat a missing value like an empty GTID set.
    gtid_executed = self._get_gtid_executed() or ''
    for gtid_set in gtid_executed.split(','):
        # MySQL renders multi-UUID sets as "uuid:1-5,\nuuid:1-3"; strip
        # the whitespace so entries after the first can still match.
        uuid_set = gtid_set.strip().split(':')
        if uuid_set[0] == master_UUID:
            # "uuid:1-5:7-9" -> last interval "7-9" -> upper bound "9"
            last_txn_id = uuid_set[-1].split('-')[-1]
            break
    return master_UUID, int(last_txn_id)
def get_latest_txn_id(self):
    """Return the executed GTID set.

    Delegates to ``_get_gtid_executed`` rather than repeating the query
    inline.

    :returns: the value returned by ``self._get_gtid_executed()``.
    """
    LOG.info(_("Retrieving latest txn id."))
    return self._get_gtid_executed()
def wait_for_txn(self, txn):
LOG.info(_("Waiting on txn '%s'.") % txn)

View File

@ -12,6 +12,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sets import Set
from oslo import messaging
from trove.common.context import TroveContext
@ -23,6 +26,7 @@ from trove.common.i18n import _
import trove.common.rpc.version as rpc_version
from trove.common import exception
from trove.common.exception import ReplicationSlaveAttachError
from trove.common.exception import TroveError
from trove.common.strategies.cluster import strategy
import trove.extensions.mgmt.instances.models as mgmtmodels
from trove.instance.tasks import InstanceTasks
@ -157,22 +161,24 @@ class Manager(periodic_task.PeriodicTasks):
InstanceTasks.PROMOTION_ERROR)
raise
# pulled out to facilitate testing
def _get_replica_txns(self, replica_models):
return [[repl] + repl.get_last_txn() for repl in replica_models]
def _most_current_replica(self, old_master, replica_models):
last_txns = self._get_replica_txns(replica_models)
master_ids = [txn[1] for txn in last_txns if txn[1]]
if len(Set(master_ids)) > 1:
raise TroveError(_("Replicas of %s not all replicating"
" from same master") % old_master.id)
return sorted(last_txns, key=lambda x: x[2], reverse=True)[0][0]
def eject_replica_source(self, context, instance_id):
def _eject_replica_source(old_master, replica_models):
# Select the slave with the greatest number of transactions to
# be the new master.
# TODO(mwj): Replace this heuristic with code to store the
# site id of the master then use it to determine which slave
# has the most recent txn from that master.
master_candidate = None
max_txn_count = 0
for replica in replica_models:
txn_count = replica.get_txn_count()
if txn_count > max_txn_count:
master_candidate = replica
max_txn_count = txn_count
master_candidate = self._most_current_replica(old_master,
replica_models)
master_ips = old_master.detach_public_ips()
slave_ips = master_candidate.detach_public_ips()

View File

@ -1100,9 +1100,9 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
self.slave_list = None
self.guest.enable_as_master(replica_source_config.config_contents)
def get_txn_count(self):
    """Return the result of ``get_txn_count()`` on this instance's guest."""
    LOG.debug("Calling get_txn_count on %s" % self.id)
    guest = self.guest
    return guest.get_txn_count()
def get_last_txn(self):
    """Return the result of ``get_last_txn()`` on this instance's guest."""
    LOG.debug("Calling get_last_txn on %s" % self.id)
    guest = self.guest
    return guest.get_last_txn()
def get_latest_txn_id(self):
LOG.debug("Calling get_latest_txn_id on %s" % self.id)

View File

@ -26,6 +26,7 @@ import trove.guestagent.datastore.mysql.service as dbaas
from trove.guestagent import backup
from trove.guestagent.volume import VolumeDevice
from trove.guestagent import pkg as pkg
from proboscis.asserts import assert_equal
class GuestAgentManagerTest(testtools.TestCase):
@ -361,3 +362,40 @@ class GuestAgentManagerTest(testtools.TestCase):
self.manager.demote_replication_master(self.context)
# assertions
self.assertEqual(mock_replication.demote_master.call_count, 1)
def test_get_master_UUID(self):
    """_get_master_UUID yields the UUID, or None when empty or missing."""
    app = dbaas.MySqlApp(None)
    # (patched slave status, expected return value)
    scenarios = [
        ({'Master_UUID': '2a5b-2064-32fb'}, '2a5b-2064-32fb'),
        ({'Master_UUID': ''}, None),
        ({}, None),
    ]
    for slave_status, expected_value in scenarios:
        with patch.object(dbaas.MySqlApp, '_get_slave_status',
                          return_value=slave_status):
            assert_equal(app._get_master_UUID(), expected_value)
def test_get_last_txn(self):
    """get_last_txn reports (master UUID, last txn id) per GTID set."""
    # (patched gtid_executed value, expected result) while replicating
    # from master '2a5b-2064-32fb'
    replicating_cases = [
        ('2a5b-2064-32fb:1', ('2a5b-2064-32fb', 1)),
        ('2a5b-2064-32fb:1-5', ('2a5b-2064-32fb', 5)),
        ('2a5b-2064-32fb:1,4b4-23:5', ('2a5b-2064-32fb', 1)),
        ('4b4-23:5,2a5b-2064-32fb:1', ('2a5b-2064-32fb', 1)),
        ('4b-23:5,2a5b-2064-32fb:1,25:3-4', ('2a5b-2064-32fb', 1)),
        ('4b4-23:1-5,2a5b-2064-32fb:1-10', ('2a5b-2064-32fb', 10)),
    ]
    # With no usable master UUID the result is always (None, 0).
    no_master_cases = [('2a5b-2064-32fb:1', (None, 0))]
    scenarios = [
        ({'Master_UUID': '2a5b-2064-32fb'}, replicating_cases),
        ({'Master_UUID': ''}, no_master_cases),
        ({}, no_master_cases),
    ]
    for slave_status, cases in scenarios:
        with patch.object(dbaas.MySqlApp, '_get_slave_status',
                          return_value=slave_status):
            for gtid_list, expected_value in cases:
                with patch.object(dbaas.MySqlApp, '_get_gtid_executed',
                                  return_value=gtid_list):
                    txn = self.manager.get_last_txn(self.context)
                    assert_equal(txn, expected_value)

View File

@ -15,12 +15,44 @@
# under the License.
from testtools import TestCase
from mock import Mock, patch
from trove.taskmanager.manager import Manager
from trove.common.exception import TroveError
from proboscis.asserts import assert_equal
class TestManager(TestCase):
    """Unit tests for the taskmanager ``Manager``."""

    def setUp(self):
        super(TestManager, self).setUp()
        self.manager = Manager()

    def tearDown(self):
        super(TestManager, self).tearDown()
        self.manager = None

    def test_getattr_lookup(self):
        # Uses the instance built in setUp; no need to construct
        # throwaway Manager() objects for the same checks.
        self.assertTrue(callable(self.manager.delete_cluster))
        self.assertTrue(callable(self.manager.mongodb_add_shard_cluster))

    def test_most_current_replica(self):
        master = Mock()
        master.id = 32

        def test_case(txn_list, selected_master):
            with patch.object(self.manager, '_get_replica_txns',
                              return_value=txn_list):
                result = self.manager._most_current_replica(master, None)
                assert_equal(result, selected_master)

        # Replicas pointing at different masters must be rejected.
        with self.assertRaisesRegexp(TroveError,
                                     'not all replicating from same'):
            test_case([['a', '2a99e-32bf', 2], ['b', '2a', 1]], None)

        # (replica txn list, expected selected replica)
        scenarios = [
            ([['a', '2a99e-32bf', 2]], 'a'),
            ([['a', '2a', 1], ['b', '2a', 2]], 'b'),
            ([['a', '2a', 2], ['b', '2a', 1]], 'a'),
            ([['a', '2a', 1], ['b', '2a', 1]], 'a'),
            ([['a', None, 0]], 'a'),
            ([['a', None, 0], ['b', '2a', 1]], 'b'),
        ]
        for txn_list, selected_master in scenarios:
            test_case(txn_list, selected_master)