From f2360d861f3904c8a06d94175be553fe5e7bab05 Mon Sep 17 00:00:00 2001
From: Adam Gandelman
Date: Thu, 17 Dec 2015 15:16:35 -0800
Subject: [PATCH] Cleanup SM management during rebalance events.

This cleans up the worker's handling of rebalance events a bit and
ensures we don't drop state machines in a way that prevents them from
later being recreated.

It also avoids a bug where, upon failing over resources to a new
orchestrator, we create a state machine per worker, instead of
dispatching them to one single worker.

To do this, the scheduler is passed into workers as well as the process
name, allowing them to more intelligently figure out what they need to
manage after a cluster event.

Finally, this ensures a config update is issued to appliances once they
have moved to a new orchestrator after a cluster event.

Change-Id: I76bf702c33ac6ff831270e7185a6aa3fc4c464ca
Partial-bug: #1524068
Closes-bug: #1527396
---
 astara/scheduler.py                           |  12 +-
 astara/state.py                               |   6 +-
 astara/tenant.py                              |  26 +++
 astara/test/unit/base.py                      |   5 +
 astara/test/unit/pez/test_pool_manager.py     |   5 +-
 astara/test/unit/test_tenant.py               |  10 +
 astara/test/unit/test_worker.py               | 217 +++++++++++++++---
 astara/worker.py                              |  96 +++++---
 ...ement_post_rebalance-3e7c64785679f239.yaml |   5 +
 9 files changed, 317 insertions(+), 65 deletions(-)
 create mode 100644 releasenotes/notes/cleanup_sm_management_post_rebalance-3e7c64785679f239.yaml

diff --git a/astara/scheduler.py b/astara/scheduler.py
index 5518b1f3..61dc0a56 100644
--- a/astara/scheduler.py
+++ b/astara/scheduler.py
@@ -40,12 +40,12 @@ SCHEDULER_OPTS = [
 CONF.register_opts(SCHEDULER_OPTS)
 
 
-def _worker(inq, worker_factory):
+def _worker(inq, worker_factory, scheduler, proc_name):
     """Scheduler's worker process main function.
     """
     daemon.ignore_signals()
     LOG.debug('starting worker process')
-    worker = worker_factory()
+    worker = worker_factory(scheduler=scheduler, proc_name=proc_name)
     while True:
         try:
             data = inq.get()
@@ -118,20 +118,24 @@ class Scheduler(object):
         # when someone calls our handle_message() method.
         for i in range(self.num_workers):
             wq = multiprocessing.JoinableQueue()
+            name = 'p%02d' % i
             worker = multiprocessing.Process(
                 target=_worker,
                 kwargs={
                     'inq': wq,
                     'worker_factory': worker_factory,
+                    'scheduler': self,
+                    'proc_name': name,
                 },
-                name='p%02d' % i,
+                name=name,
             )
-            worker.start()
             self.workers.append({
                 'queue': wq,
                 'worker': worker,
             })
         self.dispatcher = Dispatcher(self.workers)
+        for w in self.workers:
+            w['worker'].start()
 
     def stop(self):
         """Shutdown all workers cleanly.
diff --git a/astara/state.py b/astara/state.py
index 135e955d..8068aab3 100644
--- a/astara/state.py
+++ b/astara/state.py
@@ -26,6 +26,7 @@ import collections
 import itertools
 
 from astara.common.i18n import _LE, _LI, _LW
+from astara.event import Resource
 from astara.event import POLL, CREATE, READ, UPDATE, DELETE, REBUILD
 from astara import instance_manager
 from astara.drivers import states
@@ -412,6 +413,9 @@ class Automaton(object):
         )
         self.state = CalcAction(self._state_params)
 
+        self.resource = Resource(
+            driver=self.driver, id=self.resource_id, tenant_id=self.tenant_id)
+
     def service_shutdown(self):
         "Called when the parent process is being stopped"
 
@@ -541,6 +545,6 @@ class Automaton(object):
 
         This is used after a ring rebalance if this state machine no longer
         maps to the local Rug process.
""" - self.driver.log.info( + self.driver.log.debug( 'Dropping %s pending actions from queue', len(self._queue)) self._queue.clear() diff --git a/astara/tenant.py b/astara/tenant.py index 1eac8632..00558663 100644 --- a/astara/tenant.py +++ b/astara/tenant.py @@ -81,6 +81,26 @@ class ResourceContainer(object): with self.lock: return item in self.state_machines + def unmanage(self, resource_id): + """Used to delete a state machine from local management + + Removes the local state machine from orchestrator management during + cluster events. This is different than deleting the resource in that + it does not tag the resource as also deleted from Neutron, which would + prevent us from recreating its state machine if the resource later ends + up back under this orchestrators control. + + :param resource_id: The resource id to unmanage + """ + try: + with self.lock: + sm = self.state_machines.pop(resource_id) + sm.drop_queue() + LOG.debug('unmanaged tenant state machine for resource %s', + resource_id) + except KeyError: + pass + class TenantResourceManager(object): """Keep track of the state machines for the logical resources for a given @@ -107,6 +127,9 @@ class TenantResourceManager(object): self._default_resource_id = None self.delete(resource) + def unmanage_resource(self, resource_id): + self.state_machines.unmanage(resource_id) + def shutdown(self): LOG.info('shutting down') for resource_id, sm in self.state_machines.items(): @@ -128,6 +151,9 @@ class TenantResourceManager(object): } self.notify(msg) + def get_all_state_machines(self): + return self.state_machines.values() + def get_state_machines(self, message, worker_context): """Return the state machines and the queue for sending it messages for the logical resource being addressed by the message. diff --git a/astara/test/unit/base.py b/astara/test/unit/base.py index 85b16006..af129dc0 100644 --- a/astara/test/unit/base.py +++ b/astara/test/unit/base.py @@ -15,6 +15,7 @@ # under the License. 
 
+import mock
 import testtools
 
 from oslo_config import cfg
 
@@ -28,6 +29,10 @@ class RugTestBase(testtools.TestCase):
         self.argv = []
         cfg.CONF.import_opt('host', 'astara.main')
 
+        self.time_patch = mock.patch('time.sleep')
+        self.time_mock = self.time_patch.start()
+        self.addCleanup(mock.patch.stopall)
+
     def config(self, **kw):
         """Override config options for a test."""
         group = kw.pop('group', None)
diff --git a/astara/test/unit/pez/test_pool_manager.py b/astara/test/unit/pez/test_pool_manager.py
index fc860eb7..efb62b7b 100644
--- a/astara/test/unit/pez/test_pool_manager.py
+++ b/astara/test/unit/pez/test_pool_manager.py
@@ -62,8 +62,9 @@ class PoolManagerTest(base.RugTestBase):
         mock_delete.assert_called_with('errored_instance_id')
 
     def test__check_del_instances(self):
+        self.time_patch.stop()
         pool = self._create_pool(num=1, status=ak_pool.DELETING)
-        self.pool_manager.delete_timeout = 1
+        self.pool_manager.delete_timeout = .01
         res = self.pool_manager._check_del_instances(pool)
 
         # deletion hasn't timed out yet
@@ -74,7 +75,7 @@ class PoolManagerTest(base.RugTestBase):
             pool[self.resource][0].id, self.pool_manager._delete_counters)
 
         # A stuck instance is reported back as such
-        time.sleep(1.5)
+        time.sleep(.02)
         res = self.pool_manager._check_del_instances(pool)
         self.assertIn(pool[self.resource][0], res)
 
diff --git a/astara/test/unit/test_tenant.py b/astara/test/unit/test_tenant.py
index 13aec3f2..ca96c589 100644
--- a/astara/test/unit/test_tenant.py
+++ b/astara/test/unit/test_tenant.py
@@ -265,6 +265,8 @@ class TestTenantResourceManager(unittest.TestCase):
         self.assertIn('5678', self.trm.state_machines)
         sm._do_delete()
         self.assertNotIn('5678', self.trm.state_machines)
+        self.assertTrue(
+            self.trm.state_machines.has_been_deleted('5678'))
 
     def test_report_bandwidth(self):
         notifications = []
@@ -293,3 +295,11 @@ class TestTenantResourceManager(unittest.TestCase):
             self.trm.get_state_machine_by_resource_id('fake_resource_id'),
             fake_sm
         )
+
+    def test_unmanage_resource(self):
+        fake_sm = mock.Mock()
+        self.trm.state_machines['fake-resource-id'] = fake_sm
+        self.trm.unmanage_resource('fake-resource-id')
+        self.assertNotIn('fake-resource-id', self.trm.state_machines)
+        self.assertFalse(
+            self.trm.state_machines.has_been_deleted('fake-resource-id'))
diff --git a/astara/test/unit/test_worker.py b/astara/test/unit/test_worker.py
index 28a86f5a..b836d8a5 100644
--- a/astara/test/unit/test_worker.py
+++ b/astara/test/unit/test_worker.py
@@ -60,7 +60,14 @@ class WorkerTestBase(base.DbTestCase):
         self.fake_neutron = mock.patch.object(
             neutron, 'Neutron', return_value=fake_neutron_obj).start()
 
-        self.w = worker.Worker(mock.Mock(), fakes.FAKE_MGT_ADDR)
+        self.fake_scheduler = mock.Mock()
+        self.proc_name = 'p0x'
+        self.w = worker.Worker(
+            notifier=mock.Mock(),
+            management_address=fakes.FAKE_MGT_ADDR,
+            scheduler=self.fake_scheduler,
+            proc_name=self.proc_name)
+
         self.addCleanup(mock.patch.stopall)
 
         self.target = self.tenant_id
@@ -368,6 +375,25 @@ class TestWorker(WorkerTestBase):
             'address': fakes.FAKE_MGT_ADDR,
         })
 
+    @mock.patch('astara.worker.Worker._get_trms')
+    def test__get_all_state_machines(self, fake_get_trms):
+        trms = [
+            mock.Mock(
+                get_all_state_machines=mock.Mock(
+                    return_value=['sm1', 'sm2']),
+            ),
+            mock.Mock(
+                get_all_state_machines=mock.Mock(
+                    return_value=['sm3', 'sm4']),
+            ),
+        ]
+        fake_get_trms.return_value = trms
+        res = self.w._get_all_state_machines()
+        self.assertEqual(
+            res,
+            set(['sm1', 'sm2', 'sm3', 'sm4'])
+        )
+
 
 class TestResourceCache(WorkerTestBase):
     def setUp(self):
@@ -532,7 +558,8 @@ class TestShutdown(WorkerTestBase):
     @mock.patch('kombu.Producer')
     def test_stop_threads_notifier(self, producer, exchange, broker):
         notifier = notifications.Publisher('topic')
-        w = worker.Worker(notifier, fakes.FAKE_MGT_ADDR)
+        w = worker.Worker(
+            notifier, fakes.FAKE_MGT_ADDR, self.fake_scheduler, self.proc_name)
         self.assertTrue(notifier)
         w._shutdown()
         self.assertFalse(w.notifier._t)
@@ -798,36 +825,170 @@ class TestGlobalDebug(WorkerTestBase):
 
 
 class TestRebalance(WorkerTestBase):
-    def test_rebalance(self):
-        tenant_id = '98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'
-        resource_id = 'ac194fc5-f317-412e-8611-fb290629f624'
-        r = event.Resource(
-            tenant_id=tenant_id,
-            id=resource_id,
-            driver='router',
-        )
-        msg = event.Event(
-            resource=r,
-            crud=event.CREATE,
+    def setUp(self):
+        super(TestRebalance, self).setUp()
+        self.fake_host = 'fake_host'
+        self.w.host = 'fake_host'
+        self.resource_id = '56232034-a852-11e5-854e-035a3632659f'
+        self.tenant_id = '601128de-a852-11e5-a09d-cf6fa26e6e6b'
+
+        self.resource = event.Resource(
+            'router',
+            self.resource_id,
+            self.tenant_id)
+        self.msg = event.Event(
+            resource=self.resource,
+            crud=None,
             body={'key': 'value'},
         )
-        trm = self.w._get_trms(tenant_id)[0]
-        sm = trm.get_state_machines(msg, worker.WorkerContext(
-            fakes.FAKE_MGT_ADDR))[0]
-        self.w.hash_ring_mgr.rebalance(['foo'])
-        self.assertEqual(self.w.hash_ring_mgr.hosts, set(['foo']))
 
-        r = event.Resource(
-            tenant_id='*',
-            id='*',
-            driver='*',
+    @mock.patch('astara.worker.Worker._repopulate')
+    def test_rebalance_bootstrap(self, fake_repop):
+        fake_hash = mock.Mock(
+            rebalance=mock.Mock(),
         )
+        self.w.hash_ring_mgr = fake_hash
         msg = event.Event(
-            resource=r,
+            resource=self.resource,
             crud=event.REBALANCE,
-            body={'members': ['foo', 'bar']},
+            body={
+                'members': ['foo', 'bar'],
+                'node_bootstrap': True
+            },
         )
-        with mock.patch.object(sm, 'drop_queue') as meth:
-            self.w.handle_message('*', msg)
-            self.assertTrue(meth.called)
-        self.assertEqual(self.w.hash_ring_mgr.hosts, set(['foo', 'bar']))
+        self.w.handle_message('*', msg)
+        fake_hash.rebalance.assert_called_with(['foo', 'bar'])
+        self.assertFalse(fake_repop.called)
+
+    @mock.patch('astara.worker.Worker._add_resource_to_work_queue')
+    @mock.patch('astara.worker.Worker._get_all_state_machines')
+    @mock.patch('astara.worker.Worker._repopulate')
+    def test_rebalance(self, fake_repop, fake_get_all_sms, fake_add_rsc):
+        sm1 = mock.Mock(
+            resource_id='sm1',
+            send_message=mock.Mock(return_value=True),
+        )
+        sm2 = mock.Mock(
+            resource_id='sm2',
+            resource='sm2_resource',
+            send_message=mock.Mock(return_value=True),
+        )
+        fake_get_all_sms.side_effect = [
+            set([sm1]),
+            set([sm1, sm2]),
+        ]
+        fake_hash = mock.Mock(
+            rebalance=mock.Mock(),
+        )
+        self.w.hash_ring_mgr = fake_hash
+        msg = event.Event(
+            resource=self.resource,
+            crud=event.REBALANCE,
+            body={
+                'members': ['foo', 'bar'],
+            },
+        )
+        self.w.handle_message('*', msg)
+        fake_hash.rebalance.assert_called_with(['foo', 'bar'])
+        self.assertTrue(fake_repop.called)
+
+        exp_event = event.Event(
+            resource='sm2_resource',
+            crud=event.UPDATE,
+            body={}
+        )
+        sm2.send_message.assert_called_with(exp_event)
+        sm2._add_resource_to_work_queue(sm2)
+
+    @mock.patch('astara.populate.repopulate')
+    def test__repopulate_sm_removed(self, fake_repopulate):
+        fake_ring = mock.Mock(
+            get_hosts=mock.Mock()
+        )
+        fake_hash = mock.Mock(ring=fake_ring)
+        self.w.hash_ring_mgr = fake_hash
+
+        rsc1 = event.Resource(
+            driver='router',
+            tenant_id='79f418c8-a849-11e5-9c36-df27538e1b7e',
+            id='7f2a1d56-a849-11e5-a0ce-a74ef0b18fa1',
+        )
+        rsc2 = event.Resource(
+            driver='router',
+            tenant_id='8d55fdb4-a849-11e5-958f-0b870649546d',
+            id='9005cd5a-a849-11e5-a434-27c4c7c70a8b',
+        )
+        resources = [rsc1, rsc2]
+
+        # create initial, pre-rebalance state machines
+        for r in resources:
+            for trm in self.w._get_trms(r.tenant_id):
+                e = event.Event(resource=r, crud=None, body={})
+                trm.get_state_machines(e, self.w._context)
+
+        fake_hash.ring.get_hosts.side_effect = [
+            'foo', self.fake_host
+        ]
+        fake_repopulate.return_value = resources
+
+        # mock doesn't like to have its .name overwritten?
+        class FakeWorker(object):
+            name = self.w.proc_name
+        tgt = [{'worker': FakeWorker()}]
+
+        self.w.scheduler.dispatcher.pick_workers = mock.Mock(return_value=tgt)
+        self.w._repopulate()
+        post_rebalance_sms = self.w._get_all_state_machines()
+        self.assertEqual(len(post_rebalance_sms), 1)
+        sm = post_rebalance_sms.pop()
+        self.assertEqual(sm.resource_id, rsc2.id)
+
+    @mock.patch('astara.populate.repopulate')
+    def test__repopulate_sm_added(self, fake_repopulate):
+        fake_ring = mock.Mock(
+            get_hosts=mock.Mock()
+        )
+        fake_hash = mock.Mock(ring=fake_ring)
+        self.w.hash_ring_mgr = fake_hash
+
+        rsc1 = event.Resource(
+            driver='router',
+            tenant_id='79f418c8-a849-11e5-9c36-df27538e1b7e',
+            id='7f2a1d56-a849-11e5-a0ce-a74ef0b18fa1',
+        )
+        rsc2 = event.Resource(
+            driver='router',
+            tenant_id='8d55fdb4-a849-11e5-958f-0b870649546d',
+            id='9005cd5a-a849-11e5-a434-27c4c7c70a8b',
+        )
+        rsc3 = event.Resource(
+            driver='router',
+            tenant_id='455549a4-a851-11e5-a060-df26a5877746',
+            id='4a05c758-a851-11e5-bf9f-0387cfcb8f9b',
+        )
+
+        resources = [rsc1, rsc2, rsc3]
+
+        # create initial, pre-rebalance state machines
+        for r in resources[:-1]:
+            for trm in self.w._get_trms(r.tenant_id):
+                e = event.Event(resource=r, crud=None, body={})
+                trm.get_state_machines(e, self.w._context)
+
+        fake_hash.ring.get_hosts.side_effect = [
+            self.fake_host, self.fake_host, self.fake_host
+        ]
+        fake_repopulate.return_value = resources
+
+        # mock doesn't like to have its .name overwritten?
+        class FakeWorker(object):
+            name = self.w.proc_name
+        tgt = [{'worker': FakeWorker()}]
+
+        self.w.scheduler.dispatcher.pick_workers = mock.Mock(return_value=tgt)
+        self.w._repopulate()
+        post_rebalance_sms = self.w._get_all_state_machines()
+        self.assertEqual(len(post_rebalance_sms), 3)
+        rids = [r.id for r in resources]
+        for sm in post_rebalance_sms:
+            self.assertIn(sm.resource_id, rids)
diff --git a/astara/worker.py b/astara/worker.py
index b9bcadc5..e8a74591 100644
--- a/astara/worker.py
+++ b/astara/worker.py
@@ -126,7 +126,7 @@ class WorkerContext(object):
     """Holds resources owned by the worker and used by the Automaton.
     """
 
-    def __init__(self, management_address):
+    def __init__(self, management_address=None):
         self.neutron = neutron.Neutron(cfg.CONF)
         self.nova_client = nova.Nova(cfg.CONF)
         self.management_address = management_address
@@ -147,7 +147,7 @@ class Worker(object):
     track of a bunch of the state machines, so the callable is a method
     of an instance of this class instead of a simple function.
""" - def __init__(self, notifier, management_address): + def __init__(self, notifier, management_address, scheduler, proc_name): self._ignore_directory = cfg.CONF.ignored_router_directory self._queue_warning_threshold = cfg.CONF.queue_warning_threshold self._reboot_error_threshold = cfg.CONF.reboot_error_threshold @@ -157,6 +157,8 @@ class Worker(object): self._keep_going = True self.tenant_managers = {} self.management_address = management_address + self.scheduler = scheduler + self.proc_name = proc_name self.resource_cache = TenantResourceCache() # This process-global context should not be used in the @@ -233,7 +235,8 @@ class Worker(object): if self.host not in target_hosts: LOG.debug('Skipping update of router %s, it no longer ' 'maps here.', sm.resource_id) - sm._do_delete() + trm = self.tenant_managers[sm.tenant_id] + trm.unmanage_resource(sm.resource_id) self.work_queue.task_done() with self.lock: self._release_resource_lock(sm) @@ -439,7 +442,51 @@ class Worker(object): if sm: return sm + def _get_all_state_machines(self): + sms = set() + for trm in self._get_trms('*'): + sms.update(trm.get_all_state_machines()) + return sms + + def _repopulate(self): + """Repopulate local state machines given the new DHT + + After the hash ring has been rebalanced, this ensures the workers' + TRMs are populated with the correct set of state machines given the + current layout of the ring. We also consult the dispatcher to ensure + we're creating state machines on the correct worker process. This + also cleans up state machines that are no longer mapped here. + """ + LOG.debug('Running post-rebalance repopulate for worker %s', + self.proc_name) + for resource in populate.repopulate(): + target_hosts = self.hash_ring_mgr.ring.get_hosts( + resource.id) + if self.host not in target_hosts: + tid = _normalize_uuid(resource.tenant_id) + if tid in self.tenant_managers: + trm = self.tenant_managers[tid] + trm.unmanage_resource(resource.id) + continue + + tgt = self.scheduler.dispatcher.pick_workers( + resource.tenant_id)[0] + + if tgt['worker'].name != self.proc_name: + # Typically, state machine creation doesn't happen until the + # dispatcher has scheduled a msg to a single worker. rebalances + # are scheduled to all workers so we need to consult the + # dispatcher here to avoid creating state machines in all + # workers. + continue + + for trm in self._get_trms(resource.tenant_id): + # creates a state machine if one does not exist. + e = event.Event(resource=resource, crud=None, body={}) + trm.get_state_machines(e, self._context) + def _rebalance(self, message): + # rebalance the ring with the new membership. self.hash_ring_mgr.rebalance(message.body.get('members')) # We leverage the rebalance event to both seed the local node's @@ -450,34 +497,23 @@ class Worker(object): if message.body.get('node_bootstrap'): return - # After we rebalance, we need to repopulate state machines - # for any resources that now map here. This is required - # otherwise commands that hash here will not be delivered - # until a state machine is created during a later event - # delivery. 
-        # delivery. Note that this causes a double populate for new
-        # nodes (once for pre-populate on startup, again for the
-        # repopulate here once the node has joined the cluster)
-        for resource in populate.repopulate():
-            if not self.hash_ring_mgr.ring.get_hosts(resource.id):
-                continue
-            e = event.Event(resource=resource, crud=None, body={})
-            trms = self._get_trms(resource.tenant_id)
-            for trm in trms:
-                trm.get_state_machines(e, self._context)
-        # rebalance our hash ring according to new cluster membership
-        self.hash_ring_mgr.rebalance(message.body.get('members'))
+        # track which SMs we initially owned
+        orig_sms = self._get_all_state_machines()
 
-        # loop through all local state machines and drop all pending work
-        # for those that are no longer managed here, as per newly balanced
-        # hash ring
-        trms = self._get_trms('*')
-        for trm in trms:
-            sms = trm.get_state_machines(message, self._context)
-            for sm in sms:
-                target_hosts = self.hash_ring_mgr.ring.get_hosts(
-                    sm.resource_id)
-                if self.host not in target_hosts:
-                    sm.drop_queue()
+        # rebuild the TRMs and SMs based on new ownership
+        self._repopulate()
+
+        # TODO(adam_g): Replace the UPDATE with a POST_REBALANCE command
+        # that triggers a driver method instead of a generic update.
+        # For newly owned resources, issue a post-rebalance update.
+        for sm in (self._get_all_state_machines() - orig_sms):
+            post_rebalance = event.Event(
+                resource=sm.resource, crud=event.UPDATE,
+                body={})
+            LOG.debug('Sending post-rebalance update for %s',
+                      sm.resource_id)
+            if sm.send_message(post_rebalance):
+                self._add_resource_to_work_queue(sm)
 
         # NOTE(adam_g): If somethings queued up on a SM, it means the SM
         # is currently executing something thats probably long running
diff --git a/releasenotes/notes/cleanup_sm_management_post_rebalance-3e7c64785679f239.yaml b/releasenotes/notes/cleanup_sm_management_post_rebalance-3e7c64785679f239.yaml
new file mode 100644
index 00000000..acb6f773
--- /dev/null
+++ b/releasenotes/notes/cleanup_sm_management_post_rebalance-3e7c64785679f239.yaml
@@ -0,0 +1,5 @@
+---
+fixes:
+  - Bug `1527396 `_
+    Fixes an issue where, after a cluster rebalance, state machines were created
+    across all workers; they are now created only on a single target worker.
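
The ownership decision that `_repopulate` performs after a rebalance can be restated as a small standalone sketch. The code below is illustrative only and is not part of the patch or the Astara tree: `should_manage`, `FakeRing`, `FakeDispatcher`, and `FakeWorker` are hypothetical stand-ins for the combination of `hash_ring_mgr.ring`, `scheduler.dispatcher`, and a worker process, assuming the same `get_hosts()` and `pick_workers()` call shapes used in the diff above.

```python
# Illustrative sketch only: simplified stand-ins showing how a worker decides
# whether it should own a resource's state machine after a rebalance.


def should_manage(resource, host, proc_name, ring, dispatcher):
    """Return True if this worker process should create/keep the SM."""
    # 1. Cluster-level check: does the rebalanced hash ring map the
    #    resource to this host at all?
    if host not in ring.get_hosts(resource['id']):
        return False

    # 2. Process-level check: of the workers on this host, only the one the
    #    dispatcher would pick for this tenant creates the state machine, so
    #    a rebalance broadcast to every worker yields a single state machine.
    target = dispatcher.pick_workers(resource['tenant_id'])[0]
    return target['worker'].name == proc_name


class FakeRing(object):
    """Stand-in for hash_ring_mgr.ring."""
    def __init__(self, mapping):
        self.mapping = mapping

    def get_hosts(self, resource_id):
        return self.mapping.get(resource_id, [])


class FakeWorker(object):
    def __init__(self, name):
        self.name = name


class FakeDispatcher(object):
    """Stand-in for scheduler.dispatcher; always picks worker 'p00'."""
    def pick_workers(self, tenant_id):
        return [{'worker': FakeWorker('p00')}]


if __name__ == '__main__':
    ring = FakeRing({'r1': ['host-a'], 'r2': ['host-b']})
    dispatcher = FakeDispatcher()
    r1 = {'id': 'r1', 'tenant_id': 't1'}
    r2 = {'id': 'r2', 'tenant_id': 't2'}
    # Only the (host-a, p00) combination keeps r1; everyone else skips it.
    print(should_manage(r1, 'host-a', 'p00', ring, dispatcher))  # True
    print(should_manage(r1, 'host-a', 'p01', ring, dispatcher))  # False
    print(should_manage(r2, 'host-a', 'p00', ring, dispatcher))  # False
```

The two-level check is the point of the change: the ring decides which host owns a resource, and the dispatcher decides which single worker process on that host creates the state machine, so a rebalance event broadcast to every worker no longer produces duplicate state machines.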