diff --git a/astara/scheduler.py b/astara/scheduler.py
index 5518b1f3..61dc0a56 100644
--- a/astara/scheduler.py
+++ b/astara/scheduler.py
@@ -40,12 +40,12 @@ SCHEDULER_OPTS = [
 CONF.register_opts(SCHEDULER_OPTS)
 
 
-def _worker(inq, worker_factory):
+def _worker(inq, worker_factory, scheduler, proc_name):
     """Scheduler's worker process main function.
     """
     daemon.ignore_signals()
     LOG.debug('starting worker process')
-    worker = worker_factory()
+    worker = worker_factory(scheduler=scheduler, proc_name=proc_name)
     while True:
         try:
             data = inq.get()
@@ -118,20 +118,24 @@ class Scheduler(object):
         # when someone calls our handle_message() method.
         for i in range(self.num_workers):
             wq = multiprocessing.JoinableQueue()
+            name = 'p%02d' % i
             worker = multiprocessing.Process(
                 target=_worker,
                 kwargs={
                     'inq': wq,
                     'worker_factory': worker_factory,
+                    'scheduler': self,
+                    'proc_name': name,
                 },
-                name='p%02d' % i,
+                name=name,
             )
-            worker.start()
             self.workers.append({
                 'queue': wq,
                 'worker': worker,
             })
         self.dispatcher = Dispatcher(self.workers)
+        for w in self.workers:
+            w['worker'].start()
 
     def stop(self):
         """Shutdown all workers cleanly.
diff --git a/astara/state.py b/astara/state.py
index 135e955d..8068aab3 100644
--- a/astara/state.py
+++ b/astara/state.py
@@ -26,6 +26,7 @@ import collections
 import itertools
 
 from astara.common.i18n import _LE, _LI, _LW
+from astara.event import Resource
 from astara.event import POLL, CREATE, READ, UPDATE, DELETE, REBUILD
 from astara import instance_manager
 from astara.drivers import states
@@ -412,6 +413,9 @@ class Automaton(object):
         )
         self.state = CalcAction(self._state_params)
 
+        self.resource = Resource(
+            driver=self.driver, id=self.resource_id, tenant_id=self.tenant_id)
+
     def service_shutdown(self):
         "Called when the parent process is being stopped"
 
@@ -541,6 +545,6 @@ class Automaton(object):
         This is used after a ring rebalance if this state machine no longer
         maps to the local Rug process.
         """
-        self.driver.log.info(
+        self.driver.log.debug(
             'Dropping %s pending actions from queue', len(self._queue))
         self._queue.clear()
diff --git a/astara/tenant.py b/astara/tenant.py
index 1eac8632..00558663 100644
--- a/astara/tenant.py
+++ b/astara/tenant.py
@@ -81,6 +81,26 @@ class ResourceContainer(object):
         with self.lock:
             return item in self.state_machines
 
+    def unmanage(self, resource_id):
+        """Delete a state machine from local management.
+
+        Removes the local state machine from orchestrator management during
+        cluster events. This is different from deleting the resource in that
+        it does not tag the resource as also deleted from Neutron, which
+        would prevent us from recreating its state machine if the resource
+        later ends up back under this orchestrator's control.
+
+        :param resource_id: The resource id to unmanage
+        """
+        try:
+            with self.lock:
+                sm = self.state_machines.pop(resource_id)
+                sm.drop_queue()
+                LOG.debug('unmanaged tenant state machine for resource %s',
+                          resource_id)
+        except KeyError:
+            pass
+
 
 class TenantResourceManager(object):
     """Keep track of the state machines for the logical resources for a given
@@ -107,6 +127,9 @@ class TenantResourceManager(object):
             self._default_resource_id = None
         self.delete(resource)
 
+    def unmanage_resource(self, resource_id):
+        self.state_machines.unmanage(resource_id)
+
     def shutdown(self):
         LOG.info('shutting down')
         for resource_id, sm in self.state_machines.items():
@@ -128,6 +151,9 @@ class TenantResourceManager(object):
         }
         self.notify(msg)
 
+    def get_all_state_machines(self):
+        return self.state_machines.values()
+
     def get_state_machines(self, message, worker_context):
         """Return the state machines and the queue for sending it messages for
         the logical resource being addressed by the message.
diff --git a/astara/test/unit/base.py b/astara/test/unit/base.py
index 85b16006..af129dc0 100644
--- a/astara/test/unit/base.py
+++ b/astara/test/unit/base.py
@@ -15,6 +15,7 @@
 #    under the License.
 
 
+import mock
 import testtools
 
 from oslo_config import cfg
@@ -28,6 +29,10 @@ class RugTestBase(testtools.TestCase):
         self.argv = []
         cfg.CONF.import_opt('host', 'astara.main')
 
+        self.time_patch = mock.patch('time.sleep')
+        self.time_mock = self.time_patch.start()
+        self.addCleanup(mock.patch.stopall)
+
     def config(self, **kw):
         """Override config options for a test."""
         group = kw.pop('group', None)
diff --git a/astara/test/unit/pez/test_pool_manager.py b/astara/test/unit/pez/test_pool_manager.py
index fc860eb7..efb62b7b 100644
--- a/astara/test/unit/pez/test_pool_manager.py
+++ b/astara/test/unit/pez/test_pool_manager.py
@@ -62,8 +62,9 @@ class PoolManagerTest(base.RugTestBase):
         mock_delete.assert_called_with('errored_instance_id')
 
     def test__check_del_instances(self):
+        self.time_patch.stop()
         pool = self._create_pool(num=1, status=ak_pool.DELETING)
-        self.pool_manager.delete_timeout = 1
+        self.pool_manager.delete_timeout = .01
         res = self.pool_manager._check_del_instances(pool)
 
         # deletion hasn't timed out yet
@@ -74,7 +75,7 @@ class PoolManagerTest(base.RugTestBase):
             pool[self.resource][0].id, self.pool_manager._delete_counters)
 
         # A stuck instance is reported back as such
-        time.sleep(1.5)
+        time.sleep(.02)
         res = self.pool_manager._check_del_instances(pool)
         self.assertIn(pool[self.resource][0], res)
diff --git a/astara/test/unit/test_tenant.py b/astara/test/unit/test_tenant.py
index 13aec3f2..ca96c589 100644
--- a/astara/test/unit/test_tenant.py
+++ b/astara/test/unit/test_tenant.py
@@ -265,6 +265,8 @@ class TestTenantResourceManager(unittest.TestCase):
         self.assertIn('5678', self.trm.state_machines)
         sm._do_delete()
         self.assertNotIn('5678', self.trm.state_machines)
+        self.assertTrue(
+            self.trm.state_machines.has_been_deleted('5678'))
 
     def test_report_bandwidth(self):
         notifications = []
@@ -293,3 +295,11 @@ class TestTenantResourceManager(unittest.TestCase):
             self.trm.get_state_machine_by_resource_id('fake_resource_id'),
             fake_sm
         )
+
+    def test_unmanage_resource(self):
+        fake_sm = mock.Mock()
+        self.trm.state_machines['fake-resource-id'] = fake_sm
+        self.trm.unmanage_resource('fake-resource-id')
+        self.assertNotIn('fake-resource-id', self.trm.state_machines)
+        self.assertFalse(
+            self.trm.state_machines.has_been_deleted('fake-resource-id'))
diff --git a/astara/test/unit/test_worker.py b/astara/test/unit/test_worker.py
index 28a86f5a..b836d8a5 100644
--- a/astara/test/unit/test_worker.py
+++ b/astara/test/unit/test_worker.py
@@ -60,7 +60,14 @@ class WorkerTestBase(base.DbTestCase):
         self.fake_neutron = mock.patch.object(
             neutron, 'Neutron', return_value=fake_neutron_obj).start()
 
-        self.w = worker.Worker(mock.Mock(), fakes.FAKE_MGT_ADDR)
+        self.fake_scheduler = mock.Mock()
+        self.proc_name = 'p0x'
+        self.w = worker.Worker(
+            notifier=mock.Mock(),
+            management_address=fakes.FAKE_MGT_ADDR,
+            scheduler=self.fake_scheduler,
+            proc_name=self.proc_name)
+
         self.addCleanup(mock.patch.stopall)
 
         self.target = self.tenant_id
@@ -368,6 +375,25 @@
             'address': fakes.FAKE_MGT_ADDR,
         })
 
+    @mock.patch('astara.worker.Worker._get_trms')
+    def test__get_all_state_machines(self, fake_get_trms):
+        trms = [
+            mock.Mock(
+                get_all_state_machines=mock.Mock(
+                    return_value=['sm1', 'sm2']),
+            ),
+            mock.Mock(
+                get_all_state_machines=mock.Mock(
+                    return_value=['sm3', 'sm4']),
+            ),
+        ]
+        fake_get_trms.return_value = trms
+        res = self.w._get_all_state_machines()
+        self.assertEqual(
+            res,
+            set(['sm1', 'sm2', 'sm3', 'sm4'])
+        )
+
 
 class TestResourceCache(WorkerTestBase):
     def setUp(self):
@@ -532,7 +558,8 @@ class TestShutdown(WorkerTestBase):
     @mock.patch('kombu.Producer')
     def test_stop_threads_notifier(self, producer, exchange, broker):
         notifier = notifications.Publisher('topic')
-        w = worker.Worker(notifier, fakes.FAKE_MGT_ADDR)
+        w = worker.Worker(
+            notifier, fakes.FAKE_MGT_ADDR, self.fake_scheduler, self.proc_name)
         self.assertTrue(notifier)
         w._shutdown()
         self.assertFalse(w.notifier._t)
@@ -798,36 +825,170 @@ class TestGlobalDebug(WorkerTestBase):
 
 
 class TestRebalance(WorkerTestBase):
-    def test_rebalance(self):
-        tenant_id = '98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'
-        resource_id = 'ac194fc5-f317-412e-8611-fb290629f624'
-        r = event.Resource(
-            tenant_id=tenant_id,
-            id=resource_id,
-            driver='router',
-        )
-        msg = event.Event(
-            resource=r,
-            crud=event.CREATE,
+    def setUp(self):
+        super(TestRebalance, self).setUp()
+        self.fake_host = 'fake_host'
+        self.w.host = 'fake_host'
+        self.resource_id = '56232034-a852-11e5-854e-035a3632659f'
+        self.tenant_id = '601128de-a852-11e5-a09d-cf6fa26e6e6b'
+
+        self.resource = event.Resource(
+            'router',
+            self.resource_id,
+            self.tenant_id)
+        self.msg = event.Event(
+            resource=self.resource,
+            crud=None,
             body={'key': 'value'},
         )
-        trm = self.w._get_trms(tenant_id)[0]
-        sm = trm.get_state_machines(msg, worker.WorkerContext(
-            fakes.FAKE_MGT_ADDR))[0]
-        self.w.hash_ring_mgr.rebalance(['foo'])
-        self.assertEqual(self.w.hash_ring_mgr.hosts, set(['foo']))
 
-        r = event.Resource(
-            tenant_id='*',
-            id='*',
-            driver='*',
+    @mock.patch('astara.worker.Worker._repopulate')
+    def test_rebalance_bootstrap(self, fake_repop):
+        fake_hash = mock.Mock(
+            rebalance=mock.Mock(),
         )
+        self.w.hash_ring_mgr = fake_hash
         msg = event.Event(
-            resource=r,
+            resource=self.resource,
             crud=event.REBALANCE,
-            body={'members': ['foo', 'bar']},
+            body={
+                'members': ['foo', 'bar'],
+                'node_bootstrap': True
+            },
         )
-        with mock.patch.object(sm, 'drop_queue') as meth:
-            self.w.handle_message('*', msg)
-            self.assertTrue(meth.called)
-            self.assertEqual(self.w.hash_ring_mgr.hosts, set(['foo', 'bar']))
+        self.w.handle_message('*', msg)
+        fake_hash.rebalance.assert_called_with(['foo', 'bar'])
+        self.assertFalse(fake_repop.called)
+
+    @mock.patch('astara.worker.Worker._add_resource_to_work_queue')
+    @mock.patch('astara.worker.Worker._get_all_state_machines')
+    @mock.patch('astara.worker.Worker._repopulate')
+    def test_rebalance(self, fake_repop, fake_get_all_sms, fake_add_rsc):
+        sm1 = mock.Mock(
+            resource_id='sm1',
+            send_message=mock.Mock(return_value=True),
+        )
+        sm2 = mock.Mock(
+            resource_id='sm2',
+            resource='sm2_resource',
+            send_message=mock.Mock(return_value=True),
+        )
+        fake_get_all_sms.side_effect = [
+            set([sm1]),
+            set([sm1, sm2]),
+        ]
+        fake_hash = mock.Mock(
+            rebalance=mock.Mock(),
+        )
+        self.w.hash_ring_mgr = fake_hash
+        msg = event.Event(
+            resource=self.resource,
+            crud=event.REBALANCE,
+            body={
+                'members': ['foo', 'bar'],
+            },
+        )
+        self.w.handle_message('*', msg)
+        fake_hash.rebalance.assert_called_with(['foo', 'bar'])
+        self.assertTrue(fake_repop.called)
+
+        exp_event = event.Event(
+            resource='sm2_resource',
+            crud=event.UPDATE,
+            body={}
+        )
+        sm2.send_message.assert_called_with(exp_event)
+        fake_add_rsc.assert_called_with(sm2)
+
+    @mock.patch('astara.populate.repopulate')
+    def test__repopulate_sm_removed(self, fake_repopulate):
+        fake_ring = mock.Mock(
+            get_hosts=mock.Mock()
+        )
+        fake_hash = mock.Mock(ring=fake_ring)
+        self.w.hash_ring_mgr = fake_hash
+
+        rsc1 = event.Resource(
+            driver='router',
+            tenant_id='79f418c8-a849-11e5-9c36-df27538e1b7e',
+            id='7f2a1d56-a849-11e5-a0ce-a74ef0b18fa1',
+        )
+        rsc2 = event.Resource(
+            driver='router',
+            tenant_id='8d55fdb4-a849-11e5-958f-0b870649546d',
+            id='9005cd5a-a849-11e5-a434-27c4c7c70a8b',
+        )
+        resources = [rsc1, rsc2]
+
+        # create initial, pre-rebalance state machines
+        for r in resources:
+            for trm in self.w._get_trms(r.tenant_id):
+                e = event.Event(resource=r, crud=None, body={})
+                trm.get_state_machines(e, self.w._context)
+
+        fake_hash.ring.get_hosts.side_effect = [
+            'foo', self.fake_host
+        ]
+        fake_repopulate.return_value = resources
+
+        # Mock() treats the 'name' kwarg specially, so use a plain class here
+        class FakeWorker(object):
+            name = self.w.proc_name
+        tgt = [{'worker': FakeWorker()}]
+
+        self.w.scheduler.dispatcher.pick_workers = mock.Mock(return_value=tgt)
+        self.w._repopulate()
+        post_rebalance_sms = self.w._get_all_state_machines()
+        self.assertEqual(len(post_rebalance_sms), 1)
+        sm = post_rebalance_sms.pop()
+        self.assertEqual(sm.resource_id, rsc2.id)
+
+    @mock.patch('astara.populate.repopulate')
+    def test__repopulate_sm_added(self, fake_repopulate):
+        fake_ring = mock.Mock(
+            get_hosts=mock.Mock()
+        )
+        fake_hash = mock.Mock(ring=fake_ring)
+        self.w.hash_ring_mgr = fake_hash
+
+        rsc1 = event.Resource(
+            driver='router',
+            tenant_id='79f418c8-a849-11e5-9c36-df27538e1b7e',
+            id='7f2a1d56-a849-11e5-a0ce-a74ef0b18fa1',
+        )
+        rsc2 = event.Resource(
+            driver='router',
+            tenant_id='8d55fdb4-a849-11e5-958f-0b870649546d',
+            id='9005cd5a-a849-11e5-a434-27c4c7c70a8b',
+        )
+        rsc3 = event.Resource(
+            driver='router',
+            tenant_id='455549a4-a851-11e5-a060-df26a5877746',
+            id='4a05c758-a851-11e5-bf9f-0387cfcb8f9b',
+        )
+
+        resources = [rsc1, rsc2, rsc3]
+
+        # create initial, pre-rebalance state machines
+        for r in resources[:-1]:
+            for trm in self.w._get_trms(r.tenant_id):
+                e = event.Event(resource=r, crud=None, body={})
+                trm.get_state_machines(e, self.w._context)
+
+        fake_hash.ring.get_hosts.side_effect = [
+            self.fake_host, self.fake_host, self.fake_host
+        ]
+        fake_repopulate.return_value = resources
+
+        # Mock() treats the 'name' kwarg specially, so use a plain class here
+        class FakeWorker(object):
+            name = self.w.proc_name
+        tgt = [{'worker': FakeWorker()}]
+
+        self.w.scheduler.dispatcher.pick_workers = mock.Mock(return_value=tgt)
+        self.w._repopulate()
+        post_rebalance_sms = self.w._get_all_state_machines()
+        self.assertEqual(len(post_rebalance_sms), 3)
+        rids = [r.id for r in resources]
+        for sm in post_rebalance_sms:
+            self.assertIn(sm.resource_id, rids)
diff --git a/astara/worker.py b/astara/worker.py
index b9bcadc5..e8a74591 100644
--- a/astara/worker.py
+++ b/astara/worker.py
@@ -126,7 +126,7 @@ class WorkerContext(object):
     """Holds resources owned by the worker and used by the Automaton.
     """
-    def __init__(self, management_address):
+    def __init__(self, management_address=None):
        self.neutron = neutron.Neutron(cfg.CONF)
        self.nova_client = nova.Nova(cfg.CONF)
        self.management_address = management_address
@@ -147,7 +147,7 @@ class Worker(object):
    track of a bunch of the state machines, so the callable is a method of an
    instance of this class instead of a simple function.
    """
-    def __init__(self, notifier, management_address):
+    def __init__(self, notifier, management_address, scheduler, proc_name):
        self._ignore_directory = cfg.CONF.ignored_router_directory
        self._queue_warning_threshold = cfg.CONF.queue_warning_threshold
        self._reboot_error_threshold = cfg.CONF.reboot_error_threshold
@@ -157,6 +157,8 @@ class Worker(object):
        self._keep_going = True
        self.tenant_managers = {}
        self.management_address = management_address
+        self.scheduler = scheduler
+        self.proc_name = proc_name
        self.resource_cache = TenantResourceCache()
 
        # This process-global context should not be used in the
@@ -233,7 +235,8 @@ class Worker(object):
                if self.host not in target_hosts:
                    LOG.debug('Skipping update of router %s, it no longer '
                              'maps here.', sm.resource_id)
-                    sm._do_delete()
+                    trm = self.tenant_managers[sm.tenant_id]
+                    trm.unmanage_resource(sm.resource_id)
                    self.work_queue.task_done()
                    with self.lock:
                        self._release_resource_lock(sm)
@@ -439,7 +442,51 @@ class Worker(object):
        if sm:
            return sm
 
+    def _get_all_state_machines(self):
+        sms = set()
+        for trm in self._get_trms('*'):
+            sms.update(trm.get_all_state_machines())
+        return sms
+
+    def _repopulate(self):
+        """Repopulate local state machines given the new DHT
+
+        After the hash ring has been rebalanced, this ensures the workers'
+        TRMs are populated with the correct set of state machines given the
+        current layout of the ring. We also consult the dispatcher to ensure
+        we're creating state machines on the correct worker process. This
+        also cleans up state machines that are no longer mapped here.
+        """
+        LOG.debug('Running post-rebalance repopulate for worker %s',
+                  self.proc_name)
+        for resource in populate.repopulate():
+            target_hosts = self.hash_ring_mgr.ring.get_hosts(
+                resource.id)
+            if self.host not in target_hosts:
+                tid = _normalize_uuid(resource.tenant_id)
+                if tid in self.tenant_managers:
+                    trm = self.tenant_managers[tid]
+                    trm.unmanage_resource(resource.id)
+                continue
+
+            tgt = self.scheduler.dispatcher.pick_workers(
+                resource.tenant_id)[0]
+
+            if tgt['worker'].name != self.proc_name:
+                # Typically, state machine creation doesn't happen until the
+                # dispatcher has scheduled a message to a single worker.
+                # Rebalances are scheduled to all workers, so we need to
+                # consult the dispatcher here to avoid creating state
+                # machines in all workers.
+                continue
+
+            for trm in self._get_trms(resource.tenant_id):
+                # creates a state machine if one does not exist.
+                e = event.Event(resource=resource, crud=None, body={})
+                trm.get_state_machines(e, self._context)
+
     def _rebalance(self, message):
+        # rebalance the ring with the new membership.
         self.hash_ring_mgr.rebalance(message.body.get('members'))
 
         # We leverage the rebalance event to both seed the local node's
@@ -450,34 +497,23 @@
         if message.body.get('node_bootstrap'):
             return
 
-        # After we rebalance, we need to repopulate state machines
-        # for any resources that now map here. This is required
-        # otherwise commands that hash here will not be delivered
-        # until a state machine is created during a later event
-        # delivery. Note that this causes a double populate for new
-        # nodes (once for pre-populate on startup, again for the
-        # repopulate here once the node has joined the cluster)
-        for resource in populate.repopulate():
-            if not self.hash_ring_mgr.ring.get_hosts(resource.id):
-                continue
-            e = event.Event(resource=resource, crud=None, body={})
-            trms = self._get_trms(resource.tenant_id)
-            for trm in trms:
-                trm.get_state_machines(e, self._context)
-
-        # rebalance our hash ring according to new cluster membership
-        self.hash_ring_mgr.rebalance(message.body.get('members'))
+        # track which SMs we initially owned
+        orig_sms = self._get_all_state_machines()
 
-        # loop through all local state machines and drop all pending work
-        # for those that are no longer managed here, as per newly balanced
-        # hash ring
-        trms = self._get_trms('*')
-        for trm in trms:
-            sms = trm.get_state_machines(message, self._context)
-            for sm in sms:
-                target_hosts = self.hash_ring_mgr.ring.get_hosts(
-                    sm.resource_id)
-                if self.host not in target_hosts:
-                    sm.drop_queue()
+        # rebuild the TRMs and SMs based on new ownership
+        self._repopulate()
+
+        # TODO(adam_g): Replace the UPDATE with a POST_REBALANCE command
+        # that triggers a driver method instead of a generic update.
+        # For newly owned resources, issue a post-rebalance update.
+        for sm in (self._get_all_state_machines() - orig_sms):
+            post_rebalance = event.Event(
+                resource=sm.resource, crud=event.UPDATE,
+                body={})
+            LOG.debug('Sending post-rebalance update for %s',
+                      sm.resource_id)
+            if sm.send_message(post_rebalance):
+                self._add_resource_to_work_queue(sm)
 
         # NOTE(adam_g): If somethings queued up on a SM, it means the SM
         # is currently executing something thats probably long running
diff --git a/releasenotes/notes/cleanup_sm_management_post_rebalance-3e7c64785679f239.yaml b/releasenotes/notes/cleanup_sm_management_post_rebalance-3e7c64785679f239.yaml
new file mode 100644
index 00000000..acb6f773
--- /dev/null
+++ b/releasenotes/notes/cleanup_sm_management_post_rebalance-3e7c64785679f239.yaml
@@ -0,0 +1,5 @@
+---
+fixes:
+  - Bug `1527396 `_
+    Fixes an issue where, after a cluster rebalance, state machines were
+    created across all workers; they are now created only on a single target worker.