Merge "Flag initial cluster rebalances as bootstrap events"
This commit is contained in:
commit
ae2132ff93
|
@ -106,7 +106,7 @@ class RugCoordinator(object):
|
|||
self._coordinator.heartbeat()
|
||||
LOG.debug("Sending initial event changed for members; %s" %
|
||||
self.members)
|
||||
self.cluster_changed(event=None)
|
||||
self.cluster_changed(event=None, node_bootstrap=True)
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
|
@ -147,11 +147,20 @@ class RugCoordinator(object):
|
|||
"""Returns true if the local cluster member is the leader"""
|
||||
return self._coordinator.get_leader(self.group).get() == self.host
|
||||
|
||||
def cluster_changed(self, event):
|
||||
def cluster_changed(self, event, node_bootstrap=False):
|
||||
"""Event callback to be called by tooz on membership changes"""
|
||||
LOG.debug('Broadcasting cluster changed event to trigger rebalance. '
|
||||
'members=%s' % self.members)
|
||||
|
||||
body = {
|
||||
'members': self.members
|
||||
}
|
||||
|
||||
# Flag this as a local bootstrap rebalance rather than one in reaction
|
||||
# to a cluster event.
|
||||
if node_bootstrap:
|
||||
body['node_bootstrap'] = True
|
||||
|
||||
r = ak_event.Resource(
|
||||
tenant_id='*',
|
||||
id='*',
|
||||
|
@ -160,7 +169,7 @@ class RugCoordinator(object):
|
|||
e = ak_event.Event(
|
||||
resource=r,
|
||||
crud=ak_event.REBALANCE,
|
||||
body={'members': self.members}
|
||||
body=body,
|
||||
)
|
||||
self._queue.put(('*', e))
|
||||
|
||||
|
|
|
@ -60,7 +60,8 @@ class TestRugCoordinator(base.RugTestBase):
|
|||
'foo_coord_group',
|
||||
fake_cluster_changed)
|
||||
self.assertTrue(self.fake_coord.heartbeat.called)
|
||||
fake_cluster_changed.assert_called_with(event=None)
|
||||
fake_cluster_changed.assert_called_with(
|
||||
event=None, node_bootstrap=True)
|
||||
|
||||
def test_start_raises(self):
|
||||
self.coordinator = coordination.RugCoordinator(self.queue)
|
||||
|
@ -115,8 +116,9 @@ class TestRugCoordinator(base.RugTestBase):
|
|||
self.assertEqual(self.coordinator.is_leader, True)
|
||||
self.fake_coord.get_leader.assert_called_with(self.coordinator.group)
|
||||
|
||||
@mock.patch('akanda.rug.coordination.RugCoordinator.start')
|
||||
@mock.patch('akanda.rug.coordination.RugCoordinator.members')
|
||||
def test_cluster_changed(self, fake_members):
|
||||
def test_cluster_changed(self, fake_members, fake_start):
|
||||
fake_members.__get__ = mock.Mock(return_value=['foo', 'bar'])
|
||||
self.coordinator = coordination.RugCoordinator(self.queue)
|
||||
expected_rebalance_event = event.Event(
|
||||
|
|
|
@ -419,6 +419,14 @@ class Worker(object):
|
|||
def _rebalance(self, message):
|
||||
self.hash_ring_mgr.rebalance(message.body.get('members'))
|
||||
|
||||
# We leverage the rebalance event to both seed the local node's
|
||||
# hash ring when it comes online, and to also rebalance it in
|
||||
# reaction to cluster events. Exit early if we're only responding
|
||||
# to a bootstrapping rebalance, we don't need to worry about adjusting
|
||||
# state because there is none yet.
|
||||
if message.body.get('node_bootstrap'):
|
||||
return
|
||||
|
||||
# After we rebalance, we need to repopulate state machines
|
||||
# for any resources that now map here. This is required
|
||||
# otherwise commands that hash here will not be delivered
|
||||
|
|
Loading…
Reference in New Issue