Clean up SM management during rebalance events.

This cleans up the worker's handling of rebalance events and
ensures we don't drop state machines in a way that prevents
them from later being recreated.  It also fixes a bug where,
upon failing resources over to a new orchestrator, we created
a state machine per worker instead of dispatching each
resource to a single worker.
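
As a minimal sketch of the distinction (simplified stand-in
classes, not the actual astara code): delete() records that the
resource is gone so its state machine is never rebuilt, while
unmanage() only drops it from local management so it can be
recreated later.

    import threading

    class ResourceContainer(object):
        def __init__(self):
            self.state_machines = {}
            self.deleted = set()
            self.lock = threading.Lock()

        def delete(self, resource_id):
            # The resource was removed from Neutron; remember that so
            # its state machine is never recreated here.
            with self.lock:
                self.state_machines.pop(resource_id, None)
                self.deleted.add(resource_id)

        def unmanage(self, resource_id):
            # The resource merely moved to another orchestrator; drop
            # its pending work but allow it to be recreated later.
            with self.lock:
                sm = self.state_machines.pop(resource_id, None)
            if sm is not None:
                sm.drop_queue()

        def has_been_deleted(self, resource_id):
            with self.lock:
                return resource_id in self.deleted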

To do this, the scheduler and the process name are now passed
into workers, allowing them to figure out more intelligently
what they need to manage after a cluster event.
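
A rough sketch of why that matters (the toy dispatcher below is
a hypothetical stand-in, not astara's real Dispatcher): cluster
events are broadcast to every worker process, so each worker
asks the dispatcher which process a tenant maps to and only
builds state machines when the answer is its own process name.

    import hashlib

    class ToyDispatcher(object):
        """Deterministically map a tenant to exactly one worker."""
        def __init__(self, worker_names):
            self.worker_names = worker_names

        def pick_worker(self, tenant_id):
            digest = hashlib.md5(tenant_id.encode('utf-8')).hexdigest()
            return self.worker_names[int(digest, 16) % len(self.worker_names)]

    def should_manage(dispatcher, proc_name, tenant_id):
        # Only the worker the dispatcher would pick creates the state
        # machine; the other workers ignore the broadcast event.
        return dispatcher.pick_worker(tenant_id) == proc_name

    workers = ['p00', 'p01', 'p02']
    dispatcher = ToyDispatcher(workers)
    tenant = '601128de-a852-11e5-a09d-cf6fa26e6e6b'
    owners = [w for w in workers if should_manage(dispatcher, w, tenant)]
    assert len(owners) == 1  # exactly one worker owns the state machine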

Finally, this ensures a config update is issued to appliances
once they have been moved to a new orchestrator by a cluster
event.
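
Roughly, the new _rebalance() path does the following (a
condensed sketch assuming the Worker API added in this change;
event_mod stands in for astara's event module):

    def notify_newly_owned(worker, event_mod):
        # Snapshot what we owned before the ring changed, rebuild the
        # local state machines against the new ring, then send a
        # post-rebalance UPDATE only to the newly gained ones.
        orig_sms = worker._get_all_state_machines()
        worker._repopulate()
        for sm in worker._get_all_state_machines() - orig_sms:
            update = event_mod.Event(resource=sm.resource,
                                     crud=event_mod.UPDATE,
                                     body={})
            if sm.send_message(update):
                worker._add_resource_to_work_queue(sm)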

Change-Id: I76bf702c33ac6ff831270e7185a6aa3fc4c464ca
Partial-bug: #1524068
Closes-bug: #1527396
Adam Gandelman 2015-12-17 15:16:35 -08:00
parent 652c7e03ef
commit f2360d861f
9 changed files with 317 additions and 65 deletions


@@ -40,12 +40,12 @@ SCHEDULER_OPTS = [
CONF.register_opts(SCHEDULER_OPTS)
def _worker(inq, worker_factory):
def _worker(inq, worker_factory, scheduler, proc_name):
"""Scheduler's worker process main function.
"""
daemon.ignore_signals()
LOG.debug('starting worker process')
worker = worker_factory()
worker = worker_factory(scheduler=scheduler, proc_name=proc_name)
while True:
try:
data = inq.get()
@@ -118,20 +118,24 @@ class Scheduler(object):
# when someone calls our handle_message() method.
for i in range(self.num_workers):
wq = multiprocessing.JoinableQueue()
name = 'p%02d' % i
worker = multiprocessing.Process(
target=_worker,
kwargs={
'inq': wq,
'worker_factory': worker_factory,
'scheduler': self,
'proc_name': name,
},
name='p%02d' % i,
name=name,
)
worker.start()
self.workers.append({
'queue': wq,
'worker': worker,
})
self.dispatcher = Dispatcher(self.workers)
for w in self.workers:
w['worker'].start()
def stop(self):
"""Shutdown all workers cleanly.


@@ -26,6 +26,7 @@ import collections
import itertools
from astara.common.i18n import _LE, _LI, _LW
from astara.event import Resource
from astara.event import POLL, CREATE, READ, UPDATE, DELETE, REBUILD
from astara import instance_manager
from astara.drivers import states
@@ -412,6 +413,9 @@ class Automaton(object):
)
self.state = CalcAction(self._state_params)
self.resource = Resource(
driver=self.driver, id=self.resource_id, tenant_id=self.tenant_id)
def service_shutdown(self):
"Called when the parent process is being stopped"
@@ -541,6 +545,6 @@ class Automaton(object):
This is used after a ring rebalance if this state machine no longer
maps to the local Rug process.
"""
self.driver.log.info(
self.driver.log.debug(
'Dropping %s pending actions from queue', len(self._queue))
self._queue.clear()


@@ -81,6 +81,26 @@ class ResourceContainer(object):
with self.lock:
return item in self.state_machines
def unmanage(self, resource_id):
"""Used to delete a state machine from local management
Removes the local state machine from orchestrator management during
cluster events. This is different than deleting the resource in that
it does not tag the resource as also deleted from Neutron, which would
prevent us from recreating its state machine if the resource later ends
up back under this orchestrator's control.
:param resource_id: The resource id to unmanage
"""
try:
with self.lock:
sm = self.state_machines.pop(resource_id)
sm.drop_queue()
LOG.debug('unmanaged tenant state machine for resource %s',
resource_id)
except KeyError:
pass
class TenantResourceManager(object):
"""Keep track of the state machines for the logical resources for a given
@@ -107,6 +127,9 @@ class TenantResourceManager(object):
self._default_resource_id = None
self.delete(resource)
def unmanage_resource(self, resource_id):
self.state_machines.unmanage(resource_id)
def shutdown(self):
LOG.info('shutting down')
for resource_id, sm in self.state_machines.items():
@@ -128,6 +151,9 @@ class TenantResourceManager(object):
}
self.notify(msg)
def get_all_state_machines(self):
return self.state_machines.values()
def get_state_machines(self, message, worker_context):
"""Return the state machines and the queue for sending it messages for
the logical resource being addressed by the message.


@@ -15,6 +15,7 @@
# under the License.
import mock
import testtools
from oslo_config import cfg
@@ -28,6 +29,10 @@ class RugTestBase(testtools.TestCase):
self.argv = []
cfg.CONF.import_opt('host', 'astara.main')
self.time_patch = mock.patch('time.sleep')
self.time_mock = self.time_patch.start()
self.addCleanup(mock.patch.stopall)
def config(self, **kw):
"""Override config options for a test."""
group = kw.pop('group', None)


@@ -62,8 +62,9 @@ class PoolManagerTest(base.RugTestBase):
mock_delete.assert_called_with('errored_instance_id')
def test__check_del_instances(self):
self.time_patch.stop()
pool = self._create_pool(num=1, status=ak_pool.DELETING)
self.pool_manager.delete_timeout = 1
self.pool_manager.delete_timeout = .01
res = self.pool_manager._check_del_instances(pool)
# deletion hasn't timed out yet
@@ -74,7 +75,7 @@ class PoolManagerTest(base.RugTestBase):
pool[self.resource][0].id, self.pool_manager._delete_counters)
# A stuck instance is reported back as such
time.sleep(1.5)
time.sleep(.02)
res = self.pool_manager._check_del_instances(pool)
self.assertIn(pool[self.resource][0], res)


@@ -265,6 +265,8 @@ class TestTenantResourceManager(unittest.TestCase):
self.assertIn('5678', self.trm.state_machines)
sm._do_delete()
self.assertNotIn('5678', self.trm.state_machines)
self.assertTrue(
self.trm.state_machines.has_been_deleted('5678'))
def test_report_bandwidth(self):
notifications = []
@@ -293,3 +295,11 @@ class TestTenantResourceManager(unittest.TestCase):
self.trm.get_state_machine_by_resource_id('fake_resource_id'),
fake_sm
)
def test_unmanage_resource(self):
fake_sm = mock.Mock()
self.trm.state_machines['fake-resource-id'] = fake_sm
self.trm.unmanage_resource('fake-resource-id')
self.assertNotIn('fake-resource-id', self.trm.state_machines)
self.assertFalse(
self.trm.state_machines.has_been_deleted('fake-resource-id'))


@@ -60,7 +60,14 @@ class WorkerTestBase(base.DbTestCase):
self.fake_neutron = mock.patch.object(
neutron, 'Neutron', return_value=fake_neutron_obj).start()
self.w = worker.Worker(mock.Mock(), fakes.FAKE_MGT_ADDR)
self.fake_scheduler = mock.Mock()
self.proc_name = 'p0x'
self.w = worker.Worker(
notifier=mock.Mock(),
management_address=fakes.FAKE_MGT_ADDR,
scheduler=self.fake_scheduler,
proc_name=self.proc_name)
self.addCleanup(mock.patch.stopall)
self.target = self.tenant_id
@@ -368,6 +375,25 @@ class TestWorker(WorkerTestBase):
'address': fakes.FAKE_MGT_ADDR,
})
@mock.patch('astara.worker.Worker._get_trms')
def test__get_all_state_machines(self, fake_get_trms):
trms = [
mock.Mock(
get_all_state_machines=mock.Mock(
return_value=['sm1', 'sm2']),
),
mock.Mock(
get_all_state_machines=mock.Mock(
return_value=['sm3', 'sm4']),
),
]
fake_get_trms.return_value = trms
res = self.w._get_all_state_machines()
self.assertEqual(
res,
set(['sm1', 'sm2', 'sm3', 'sm4'])
)
class TestResourceCache(WorkerTestBase):
def setUp(self):
@@ -532,7 +558,8 @@ class TestShutdown(WorkerTestBase):
@mock.patch('kombu.Producer')
def test_stop_threads_notifier(self, producer, exchange, broker):
notifier = notifications.Publisher('topic')
w = worker.Worker(notifier, fakes.FAKE_MGT_ADDR)
w = worker.Worker(
notifier, fakes.FAKE_MGT_ADDR, self.fake_scheduler, self.proc_name)
self.assertTrue(notifier)
w._shutdown()
self.assertFalse(w.notifier._t)
@@ -798,36 +825,170 @@ class TestGlobalDebug(WorkerTestBase):
class TestRebalance(WorkerTestBase):
def test_rebalance(self):
tenant_id = '98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'
resource_id = 'ac194fc5-f317-412e-8611-fb290629f624'
r = event.Resource(
tenant_id=tenant_id,
id=resource_id,
driver='router',
)
msg = event.Event(
resource=r,
crud=event.CREATE,
def setUp(self):
super(TestRebalance, self).setUp()
self.fake_host = 'fake_host'
self.w.host = 'fake_host'
self.resource_id = '56232034-a852-11e5-854e-035a3632659f'
self.tenant_id = '601128de-a852-11e5-a09d-cf6fa26e6e6b'
self.resource = event.Resource(
'router',
self.resource_id,
self.tenant_id)
self.msg = event.Event(
resource=self.resource,
crud=None,
body={'key': 'value'},
)
trm = self.w._get_trms(tenant_id)[0]
sm = trm.get_state_machines(msg, worker.WorkerContext(
fakes.FAKE_MGT_ADDR))[0]
self.w.hash_ring_mgr.rebalance(['foo'])
self.assertEqual(self.w.hash_ring_mgr.hosts, set(['foo']))
r = event.Resource(
tenant_id='*',
id='*',
driver='*',
@mock.patch('astara.worker.Worker._repopulate')
def test_rebalance_bootstrap(self, fake_repop):
fake_hash = mock.Mock(
rebalance=mock.Mock(),
)
self.w.hash_ring_mgr = fake_hash
msg = event.Event(
resource=r,
resource=self.resource,
crud=event.REBALANCE,
body={'members': ['foo', 'bar']},
body={
'members': ['foo', 'bar'],
'node_bootstrap': True
},
)
with mock.patch.object(sm, 'drop_queue') as meth:
self.w.handle_message('*', msg)
self.assertTrue(meth.called)
self.assertEqual(self.w.hash_ring_mgr.hosts, set(['foo', 'bar']))
self.w.handle_message('*', msg)
fake_hash.rebalance.assert_called_with(['foo', 'bar'])
self.assertFalse(fake_repop.called)
@mock.patch('astara.worker.Worker._add_resource_to_work_queue')
@mock.patch('astara.worker.Worker._get_all_state_machines')
@mock.patch('astara.worker.Worker._repopulate')
def test_rebalance(self, fake_repop, fake_get_all_sms, fake_add_rsc):
sm1 = mock.Mock(
resource_id='sm1',
send_message=mock.Mock(return_value=True),
)
sm2 = mock.Mock(
resource_id='sm2',
resource='sm2_resource',
send_message=mock.Mock(return_value=True),
)
fake_get_all_sms.side_effect = [
set([sm1]),
set([sm1, sm2]),
]
fake_hash = mock.Mock(
rebalance=mock.Mock(),
)
self.w.hash_ring_mgr = fake_hash
msg = event.Event(
resource=self.resource,
crud=event.REBALANCE,
body={
'members': ['foo', 'bar'],
},
)
self.w.handle_message('*', msg)
fake_hash.rebalance.assert_called_with(['foo', 'bar'])
self.assertTrue(fake_repop.called)
exp_event = event.Event(
resource='sm2_resource',
crud=event.UPDATE,
body={}
)
sm2.send_message.assert_called_with(exp_event)
sm2._add_resource_to_work_queue(sm2)
@mock.patch('astara.populate.repopulate')
def test__repopulate_sm_removed(self, fake_repopulate):
fake_ring = mock.Mock(
get_hosts=mock.Mock()
)
fake_hash = mock.Mock(ring=fake_ring)
self.w.hash_ring_mgr = fake_hash
rsc1 = event.Resource(
driver='router',
tenant_id='79f418c8-a849-11e5-9c36-df27538e1b7e',
id='7f2a1d56-a849-11e5-a0ce-a74ef0b18fa1',
)
rsc2 = event.Resource(
driver='router',
tenant_id='8d55fdb4-a849-11e5-958f-0b870649546d',
id='9005cd5a-a849-11e5-a434-27c4c7c70a8b',
)
resources = [rsc1, rsc2]
# create initial, pre-rebalance state machines
for r in resources:
for trm in self.w._get_trms(r.tenant_id):
e = event.Event(resource=r, crud=None, body={})
trm.get_state_machines(e, self.w._context)
fake_hash.ring.get_hosts.side_effect = [
'foo', self.fake_host
]
fake_repopulate.return_value = resources
# mock doesn't like to have its .name overwritten?
class FakeWorker(object):
name = self.w.proc_name
tgt = [{'worker': FakeWorker()}]
self.w.scheduler.dispatcher.pick_workers = mock.Mock(return_value=tgt)
self.w._repopulate()
post_rebalance_sms = self.w._get_all_state_machines()
self.assertEqual(len(post_rebalance_sms), 1)
sm = post_rebalance_sms.pop()
self.assertEqual(sm.resource_id, rsc2.id)
@mock.patch('astara.populate.repopulate')
def test__repopulate_sm_added(self, fake_repopulate):
fake_ring = mock.Mock(
get_hosts=mock.Mock()
)
fake_hash = mock.Mock(ring=fake_ring)
self.w.hash_ring_mgr = fake_hash
rsc1 = event.Resource(
driver='router',
tenant_id='79f418c8-a849-11e5-9c36-df27538e1b7e',
id='7f2a1d56-a849-11e5-a0ce-a74ef0b18fa1',
)
rsc2 = event.Resource(
driver='router',
tenant_id='8d55fdb4-a849-11e5-958f-0b870649546d',
id='9005cd5a-a849-11e5-a434-27c4c7c70a8b',
)
rsc3 = event.Resource(
driver='router',
tenant_id='455549a4-a851-11e5-a060-df26a5877746',
id='4a05c758-a851-11e5-bf9f-0387cfcb8f9b',
)
resources = [rsc1, rsc2, rsc3]
# create initial, pre-rebalance state machines
for r in resources[:-1]:
for trm in self.w._get_trms(r.tenant_id):
e = event.Event(resource=r, crud=None, body={})
trm.get_state_machines(e, self.w._context)
fake_hash.ring.get_hosts.side_effect = [
self.fake_host, self.fake_host, self.fake_host
]
fake_repopulate.return_value = resources
# mock doesn't like to have its .name overwritten?
class FakeWorker(object):
name = self.w.proc_name
tgt = [{'worker': FakeWorker()}]
self.w.scheduler.dispatcher.pick_workers = mock.Mock(return_value=tgt)
self.w._repopulate()
post_rebalance_sms = self.w._get_all_state_machines()
self.assertEqual(len(post_rebalance_sms), 3)
rids = [r.id for r in resources]
for sm in post_rebalance_sms:
self.assertIn(sm.resource_id, rids)


@@ -126,7 +126,7 @@ class WorkerContext(object):
"""Holds resources owned by the worker and used by the Automaton.
"""
def __init__(self, management_address):
def __init__(self, management_address=None):
self.neutron = neutron.Neutron(cfg.CONF)
self.nova_client = nova.Nova(cfg.CONF)
self.management_address = management_address
@@ -147,7 +147,7 @@ class Worker(object):
track of a bunch of the state machines, so the callable is a
method of an instance of this class instead of a simple function.
"""
def __init__(self, notifier, management_address):
def __init__(self, notifier, management_address, scheduler, proc_name):
self._ignore_directory = cfg.CONF.ignored_router_directory
self._queue_warning_threshold = cfg.CONF.queue_warning_threshold
self._reboot_error_threshold = cfg.CONF.reboot_error_threshold
@@ -157,6 +157,8 @@ class Worker(object):
self._keep_going = True
self.tenant_managers = {}
self.management_address = management_address
self.scheduler = scheduler
self.proc_name = proc_name
self.resource_cache = TenantResourceCache()
# This process-global context should not be used in the
@@ -233,7 +235,8 @@ class Worker(object):
if self.host not in target_hosts:
LOG.debug('Skipping update of router %s, it no longer '
'maps here.', sm.resource_id)
sm._do_delete()
trm = self.tenant_managers[sm.tenant_id]
trm.unmanage_resource(sm.resource_id)
self.work_queue.task_done()
with self.lock:
self._release_resource_lock(sm)
@@ -439,7 +442,51 @@ class Worker(object):
if sm:
return sm
def _get_all_state_machines(self):
sms = set()
for trm in self._get_trms('*'):
sms.update(trm.get_all_state_machines())
return sms
def _repopulate(self):
"""Repopulate local state machines given the new DHT
After the hash ring has been rebalanced, this ensures the workers'
TRMs are populated with the correct set of state machines given the
current layout of the ring. We also consult the dispatcher to ensure
we're creating state machines on the correct worker process. This
also cleans up state machines that are no longer mapped here.
"""
LOG.debug('Running post-rebalance repopulate for worker %s',
self.proc_name)
for resource in populate.repopulate():
target_hosts = self.hash_ring_mgr.ring.get_hosts(
resource.id)
if self.host not in target_hosts:
tid = _normalize_uuid(resource.tenant_id)
if tid in self.tenant_managers:
trm = self.tenant_managers[tid]
trm.unmanage_resource(resource.id)
continue
tgt = self.scheduler.dispatcher.pick_workers(
resource.tenant_id)[0]
if tgt['worker'].name != self.proc_name:
# Typically, state machine creation doesn't happen until the
# dispatcher has scheduled a msg to a single worker. Rebalances
# are scheduled to all workers so we need to consult the
# dispatcher here to avoid creating state machines in all
# workers.
continue
for trm in self._get_trms(resource.tenant_id):
# creates a state machine if one does not exist.
e = event.Event(resource=resource, crud=None, body={})
trm.get_state_machines(e, self._context)
def _rebalance(self, message):
# rebalance the ring with the new membership.
self.hash_ring_mgr.rebalance(message.body.get('members'))
# We leverage the rebalance event to both seed the local node's
@@ -450,34 +497,23 @@ class Worker(object):
if message.body.get('node_bootstrap'):
return
# After we rebalance, we need to repopulate state machines
# for any resources that now map here. This is required
# otherwise commands that hash here will not be delivered
# until a state machine is created during a later event
# delivery. Note that this causes a double populate for new
# nodes (once for pre-populate on startup, again for the
# repopulate here once the node has joined the cluster)
for resource in populate.repopulate():
if not self.hash_ring_mgr.ring.get_hosts(resource.id):
continue
e = event.Event(resource=resource, crud=None, body={})
trms = self._get_trms(resource.tenant_id)
for trm in trms:
trm.get_state_machines(e, self._context)
# rebalance our hash ring according to new cluster membership
self.hash_ring_mgr.rebalance(message.body.get('members'))
# track which SMs we initially owned
orig_sms = self._get_all_state_machines()
# loop through all local state machines and drop all pending work
# for those that are no longer managed here, as per newly balanced
# hash ring
trms = self._get_trms('*')
for trm in trms:
sms = trm.get_state_machines(message, self._context)
for sm in sms:
target_hosts = self.hash_ring_mgr.ring.get_hosts(
sm.resource_id)
if self.host not in target_hosts:
sm.drop_queue()
# rebuild the TRMs and SMs based on new ownership
self._repopulate()
# TODO(adam_g): Replace the UPDATE with a POST_REBALANCE command
# that triggers a driver method instead of generic update.
# for newly owned resources, issue a post-rebalance update.
for sm in (self._get_all_state_machines() - orig_sms):
post_rebalance = event.Event(
resource=sm.resource, crud=event.UPDATE,
body={})
LOG.debug('Sending post-rebalance update for %s',
sm.resource_id)
if sm.send_message(post_rebalance):
self._add_resource_to_work_queue(sm)
# NOTE(adam_g): If something's queued up on an SM, it means the SM
# is currently executing something that's probably long running


@@ -0,0 +1,5 @@
---
fixes:
- Bug `1527396 <https://bugs.launchpad.net/astara/+bug/1527396>`_
Fixes an issue where, after a cluster rebalance, state machines were
created across all workers; they are now created only on a single
target worker.