Merge "Eject-replica-source chooses most recent slave"
This commit is contained in:
commit
d0fd88a11e
|
@ -379,11 +379,17 @@ class API(object):
|
|||
self._call("enable_as_master", AGENT_HIGH_TIMEOUT, self.version_cap,
|
||||
replica_source_config=replica_source_config)
|
||||
|
||||
# DEPRECATED: Maintain for API Compatibility
|
||||
def get_txn_count(self):
|
||||
LOG.debug("Executing get_txn_count.")
|
||||
return self._call("get_txn_count",
|
||||
AGENT_HIGH_TIMEOUT, self.version_cap)
|
||||
|
||||
def get_last_txn(self):
|
||||
LOG.debug("Executing get_last_txn.")
|
||||
return self._call("get_last_txn",
|
||||
AGENT_HIGH_TIMEOUT, self.version_cap)
|
||||
|
||||
def get_latest_txn_id(self):
|
||||
LOG.debug("Executing get_latest_txn_id.")
|
||||
return self._call("get_latest_txn_id",
|
||||
|
|
|
@ -256,10 +256,15 @@ class Manager(periodic_task.PeriodicTasks):
|
|||
replication = REPLICATION_STRATEGY_CLASS(context)
|
||||
replication.enable_as_master(app, replica_source_config)
|
||||
|
||||
# DEPRECATED: Maintain for API Compatibility
|
||||
def get_txn_count(self, context):
|
||||
LOG.debug("Calling get_txn_count")
|
||||
return MySqlApp(MySqlAppStatus.get()).get_txn_count()
|
||||
|
||||
def get_last_txn(self, context):
|
||||
LOG.debug("Calling get_last_txn")
|
||||
return MySqlApp(MySqlAppStatus.get()).get_last_txn()
|
||||
|
||||
def get_latest_txn_id(self, context):
|
||||
LOG.debug("Calling get_latest_txn_id.")
|
||||
return MySqlApp(MySqlAppStatus.get()).get_latest_txn_id()
|
||||
|
|
|
@ -987,6 +987,7 @@ class MySqlApp(object):
|
|||
LOG.info(_("Resetting configuration."))
|
||||
self._write_mycnf(None, config_contents)
|
||||
|
||||
# DEPRECATED: Mantain for API Compatibility
|
||||
def get_txn_count(self):
|
||||
LOG.info(_("Retrieving latest txn id."))
|
||||
txn_count = 0
|
||||
|
@ -1001,11 +1002,32 @@ class MySqlApp(object):
|
|||
txn_count += 1
|
||||
return txn_count
|
||||
|
||||
def _get_slave_status(self):
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
return client.execute('SHOW SLAVE STATUS').first()
|
||||
|
||||
def _get_master_UUID(self):
|
||||
slave_status = self._get_slave_status()
|
||||
return slave_status and slave_status['Master_UUID'] or None
|
||||
|
||||
def _get_gtid_executed(self):
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
return client.execute('SELECT @@global.gtid_executed').first()[0]
|
||||
|
||||
def get_last_txn(self):
|
||||
master_UUID = self._get_master_UUID()
|
||||
last_txn_id = '0'
|
||||
gtid_executed = self._get_gtid_executed()
|
||||
for gtid_set in gtid_executed.split(','):
|
||||
uuid_set = gtid_set.split(':')
|
||||
if uuid_set[0] == master_UUID:
|
||||
last_txn_id = uuid_set[-1].split('-')[-1]
|
||||
break
|
||||
return master_UUID, int(last_txn_id)
|
||||
|
||||
def get_latest_txn_id(self):
|
||||
LOG.info(_("Retrieving latest txn id."))
|
||||
with LocalSqlClient(get_engine()) as client:
|
||||
result = client.execute('SELECT @@global.gtid_executed').first()
|
||||
return result[0]
|
||||
return self._get_gtid_executed()
|
||||
|
||||
def wait_for_txn(self, txn):
|
||||
LOG.info(_("Waiting on txn '%s'.") % txn)
|
||||
|
|
|
@ -12,6 +12,9 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from sets import Set
|
||||
|
||||
from oslo import messaging
|
||||
from trove.common.context import TroveContext
|
||||
|
||||
|
@ -23,6 +26,7 @@ from trove.common.i18n import _
|
|||
import trove.common.rpc.version as rpc_version
|
||||
from trove.common import exception
|
||||
from trove.common.exception import ReplicationSlaveAttachError
|
||||
from trove.common.exception import TroveError
|
||||
from trove.common.strategies.cluster import strategy
|
||||
import trove.extensions.mgmt.instances.models as mgmtmodels
|
||||
from trove.instance.tasks import InstanceTasks
|
||||
|
@ -157,22 +161,24 @@ class Manager(periodic_task.PeriodicTasks):
|
|||
InstanceTasks.PROMOTION_ERROR)
|
||||
raise
|
||||
|
||||
# pulled out to facilitate testing
|
||||
def _get_replica_txns(self, replica_models):
|
||||
return [[repl] + repl.get_last_txn() for repl in replica_models]
|
||||
|
||||
def _most_current_replica(self, old_master, replica_models):
|
||||
last_txns = self._get_replica_txns(replica_models)
|
||||
master_ids = [txn[1] for txn in last_txns if txn[1]]
|
||||
if len(Set(master_ids)) > 1:
|
||||
raise TroveError(_("Replicas of %s not all replicating"
|
||||
" from same master") % old_master.id)
|
||||
return sorted(last_txns, key=lambda x: x[2], reverse=True)[0][0]
|
||||
|
||||
def eject_replica_source(self, context, instance_id):
|
||||
|
||||
def _eject_replica_source(old_master, replica_models):
|
||||
|
||||
# Select the slave with the greatest number of transactions to
|
||||
# be the new master.
|
||||
# TODO(mwj): Replace this heuristic with code to store the
|
||||
# site id of the master then use it to determine which slave
|
||||
# has the most recent txn from that master.
|
||||
master_candidate = None
|
||||
max_txn_count = 0
|
||||
for replica in replica_models:
|
||||
txn_count = replica.get_txn_count()
|
||||
if txn_count > max_txn_count:
|
||||
master_candidate = replica
|
||||
max_txn_count = txn_count
|
||||
master_candidate = self._most_current_replica(old_master,
|
||||
replica_models)
|
||||
|
||||
master_ips = old_master.detach_public_ips()
|
||||
slave_ips = master_candidate.detach_public_ips()
|
||||
|
|
|
@ -1100,9 +1100,9 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
|
|||
self.slave_list = None
|
||||
self.guest.enable_as_master(replica_source_config.config_contents)
|
||||
|
||||
def get_txn_count(self):
|
||||
LOG.debug("Calling get_txn_count on %s" % self.id)
|
||||
return self.guest.get_txn_count()
|
||||
def get_last_txn(self):
|
||||
LOG.debug("Calling get_last_txn on %s" % self.id)
|
||||
return self.guest.get_last_txn()
|
||||
|
||||
def get_latest_txn_id(self):
|
||||
LOG.debug("Calling get_latest_txn_id on %s" % self.id)
|
||||
|
|
|
@ -26,6 +26,7 @@ import trove.guestagent.datastore.mysql.service as dbaas
|
|||
from trove.guestagent import backup
|
||||
from trove.guestagent.volume import VolumeDevice
|
||||
from trove.guestagent import pkg as pkg
|
||||
from proboscis.asserts import assert_equal
|
||||
|
||||
|
||||
class GuestAgentManagerTest(testtools.TestCase):
|
||||
|
@ -361,3 +362,40 @@ class GuestAgentManagerTest(testtools.TestCase):
|
|||
self.manager.demote_replication_master(self.context)
|
||||
# assertions
|
||||
self.assertEqual(mock_replication.demote_master.call_count, 1)
|
||||
|
||||
def test_get_master_UUID(self):
|
||||
app = dbaas.MySqlApp(None)
|
||||
|
||||
def test_case(slave_status, expected_value):
|
||||
with patch.object(dbaas.MySqlApp, '_get_slave_status',
|
||||
return_value=slave_status):
|
||||
assert_equal(app._get_master_UUID(), expected_value)
|
||||
|
||||
test_case({'Master_UUID': '2a5b-2064-32fb'}, '2a5b-2064-32fb')
|
||||
test_case({'Master_UUID': ''}, None)
|
||||
test_case({}, None)
|
||||
|
||||
def test_get_last_txn(self):
|
||||
|
||||
def test_case(gtid_list, expected_value):
|
||||
with patch.object(dbaas.MySqlApp, '_get_gtid_executed',
|
||||
return_value=gtid_list):
|
||||
txn = self.manager.get_last_txn(self.context)
|
||||
assert_equal(txn, expected_value)
|
||||
|
||||
with patch.object(dbaas.MySqlApp, '_get_slave_status',
|
||||
return_value={'Master_UUID': '2a5b-2064-32fb'}):
|
||||
test_case('2a5b-2064-32fb:1', ('2a5b-2064-32fb', 1))
|
||||
test_case('2a5b-2064-32fb:1-5', ('2a5b-2064-32fb', 5))
|
||||
test_case('2a5b-2064-32fb:1,4b4-23:5', ('2a5b-2064-32fb', 1))
|
||||
test_case('4b4-23:5,2a5b-2064-32fb:1', ('2a5b-2064-32fb', 1))
|
||||
test_case('4b-23:5,2a5b-2064-32fb:1,25:3-4', ('2a5b-2064-32fb', 1))
|
||||
test_case('4b4-23:1-5,2a5b-2064-32fb:1-10', ('2a5b-2064-32fb', 10))
|
||||
|
||||
with patch.object(dbaas.MySqlApp, '_get_slave_status',
|
||||
return_value={'Master_UUID': ''}):
|
||||
test_case('2a5b-2064-32fb:1', (None, 0))
|
||||
|
||||
with patch.object(dbaas.MySqlApp, '_get_slave_status',
|
||||
return_value={}):
|
||||
test_case('2a5b-2064-32fb:1', (None, 0))
|
||||
|
|
|
@ -15,12 +15,44 @@
|
|||
# under the License.
|
||||
|
||||
from testtools import TestCase
|
||||
from mock import Mock, patch
|
||||
|
||||
from trove.taskmanager.manager import Manager
|
||||
from trove.common.exception import TroveError
|
||||
from proboscis.asserts import assert_equal
|
||||
|
||||
|
||||
class TestManager(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestManager, self).setUp()
|
||||
self.manager = Manager()
|
||||
|
||||
def tearDown(self):
|
||||
super(TestManager, self).tearDown()
|
||||
self.manager = None
|
||||
|
||||
def test_getattr_lookup(self):
|
||||
self.assertTrue(callable(Manager().delete_cluster))
|
||||
self.assertTrue(callable(Manager().mongodb_add_shard_cluster))
|
||||
self.assertTrue(callable(self.manager.delete_cluster))
|
||||
self.assertTrue(callable(self.manager.mongodb_add_shard_cluster))
|
||||
|
||||
def test_most_current_replica(self):
|
||||
master = Mock()
|
||||
master.id = 32
|
||||
|
||||
def test_case(txn_list, selected_master):
|
||||
with patch.object(self.manager, '_get_replica_txns',
|
||||
return_value=txn_list):
|
||||
result = self.manager._most_current_replica(master, None)
|
||||
assert_equal(result, selected_master)
|
||||
|
||||
with self.assertRaisesRegexp(TroveError,
|
||||
'not all replicating from same'):
|
||||
test_case([['a', '2a99e-32bf', 2], ['b', '2a', 1]], None)
|
||||
|
||||
test_case([['a', '2a99e-32bf', 2]], 'a')
|
||||
test_case([['a', '2a', 1], ['b', '2a', 2]], 'b')
|
||||
test_case([['a', '2a', 2], ['b', '2a', 1]], 'a')
|
||||
test_case([['a', '2a', 1], ['b', '2a', 1]], 'a')
|
||||
test_case([['a', None, 0]], 'a')
|
||||
test_case([['a', None, 0], ['b', '2a', 1]], 'b')
|
||||
|
|
Loading…
Reference in New Issue