Merge "Flag initial cluster rebalances as bootstrap events"

This commit is contained in:
Jenkins 2015-10-16 16:45:47 +00:00 committed by Gerrit Code Review
commit ae2132ff93
3 changed files with 24 additions and 5 deletions

View File

@ -106,7 +106,7 @@ class RugCoordinator(object):
self._coordinator.heartbeat()
LOG.debug("Sending initial event changed for members; %s" %
self.members)
self.cluster_changed(event=None)
self.cluster_changed(event=None, node_bootstrap=True)
def run(self):
try:
@ -147,11 +147,20 @@ class RugCoordinator(object):
"""Returns true if the local cluster member is the leader"""
return self._coordinator.get_leader(self.group).get() == self.host
def cluster_changed(self, event):
def cluster_changed(self, event, node_bootstrap=False):
"""Event callback to be called by tooz on membership changes"""
LOG.debug('Broadcasting cluster changed event to trigger rebalance. '
'members=%s' % self.members)
body = {
'members': self.members
}
# Flag this as a local bootstrap rebalance rather than one in reaction
# to a cluster event.
if node_bootstrap:
body['node_bootstrap'] = True
r = ak_event.Resource(
tenant_id='*',
id='*',
@ -160,7 +169,7 @@ class RugCoordinator(object):
e = ak_event.Event(
resource=r,
crud=ak_event.REBALANCE,
body={'members': self.members}
body=body,
)
self._queue.put(('*', e))

View File

@ -60,7 +60,8 @@ class TestRugCoordinator(base.RugTestBase):
'foo_coord_group',
fake_cluster_changed)
self.assertTrue(self.fake_coord.heartbeat.called)
fake_cluster_changed.assert_called_with(event=None)
fake_cluster_changed.assert_called_with(
event=None, node_bootstrap=True)
def test_start_raises(self):
self.coordinator = coordination.RugCoordinator(self.queue)
@ -115,8 +116,9 @@ class TestRugCoordinator(base.RugTestBase):
self.assertEqual(self.coordinator.is_leader, True)
self.fake_coord.get_leader.assert_called_with(self.coordinator.group)
@mock.patch('akanda.rug.coordination.RugCoordinator.start')
@mock.patch('akanda.rug.coordination.RugCoordinator.members')
def test_cluster_changed(self, fake_members):
def test_cluster_changed(self, fake_members, fake_start):
fake_members.__get__ = mock.Mock(return_value=['foo', 'bar'])
self.coordinator = coordination.RugCoordinator(self.queue)
expected_rebalance_event = event.Event(

View File

@ -419,6 +419,14 @@ class Worker(object):
def _rebalance(self, message):
self.hash_ring_mgr.rebalance(message.body.get('members'))
# We leverage the rebalance event to both seed the local node's
# hash ring when it comes online, and to also rebalance it in
# reaction to cluster events. Exit early if we're only responding
# to a bootstrapping rebalance, we don't need to worry about adjusting
# state because there is none yet.
if message.body.get('node_bootstrap'):
return
# After we rebalance, we need to repopulate state machines
# for any resources that now map here. This is required
# otherwise commands that hash here will not be delivered