diff --git a/nodepool/driver/openstack/handler.py b/nodepool/driver/openstack/handler.py
index de9c7b60d..e4fe145ac 100644
--- a/nodepool/driver/openstack/handler.py
+++ b/nodepool/driver/openstack/handler.py
@@ -490,6 +490,8 @@ class OpenStackNodeRequestHandler(NodeRequestHandler):
                 "Pausing request handling to satisfy request %s",
                 self.request)
             self.paused = True
+            self.zk.deleteOldestUnusedNode(self.provider.name,
+                                           self.pool.name)
             return
 
         if self.paused:
diff --git a/nodepool/tests/fixtures/wedge_test.yaml b/nodepool/tests/fixtures/wedge_test.yaml
new file mode 100644
index 000000000..c624e2725
--- /dev/null
+++ b/nodepool/tests/fixtures/wedge_test.yaml
@@ -0,0 +1,53 @@
+elements-dir: .
+images-dir: '{images_dir}'
+
+zookeeper-servers:
+  - host: {zookeeper_host}
+    port: {zookeeper_port}
+    chroot: {zookeeper_chroot}
+
+labels:
+  - name: fake-label1
+    min-ready: 1
+  - name: fake-label2
+    min-ready: 0
+
+providers:
+  - name: fake-provider
+    cloud: fake
+    driver: fake
+    region-name: fake-region
+    rate: 0.0001
+    diskimages:
+      - name: fake-image
+        meta:
+          key: value
+          key2: value
+    pools:
+      - name: main
+        max-servers: 1
+        availability-zones:
+          - az1
+        networks:
+          - net-name
+        labels:
+          - name: fake-label1
+            diskimage: fake-image
+            min-ram: 8192
+            flavor-name: 'Fake'
+          - name: fake-label2
+            diskimage: fake-image
+            min-ram: 8192
+            flavor-name: 'Fake'
+
+diskimages:
+  - name: fake-image
+    elements:
+      - fedora
+      - vm
+    release: 21
+    env-vars:
+      TMPDIR: /opt/dib_tmp
+      DIB_IMAGE_CACHE: /opt/dib_cache
+      DIB_CLOUD_IMAGES: http://download.fedoraproject.org/pub/fedora/linux/releases/test/21-Beta/Cloud/Images/x86_64/
+      BASE_IMAGE_FILE: Fedora-Cloud-Base-20141029-21_Beta.x86_64.qcow2
diff --git a/nodepool/tests/test_launcher.py b/nodepool/tests/test_launcher.py
index 5ddaacebd..7bb3cee96 100644
--- a/nodepool/tests/test_launcher.py
+++ b/nodepool/tests/test_launcher.py
@@ -112,7 +112,7 @@ class TestLauncher(tests.DBTestCase):
         self.assertEqual(nodes[3].type, 'fake-label2')
 
     def _test_node_assignment_at_quota(self,
-                                       config='node_quota.yaml',
+                                       config,
                                        max_cores=100,
                                        max_instances=20,
                                        max_ram=1000000):
@@ -141,6 +141,23 @@ class TestLauncher(tests.DBTestCase):
 
         client = pool.getProviderManager('fake-provider')._getClient()
 
+        req1 = zk.NodeRequest()
+        req1.state = zk.REQUESTED
+        req1.node_types.append('fake-label')
+        req1.node_types.append('fake-label')
+        self.zk.storeNodeRequest(req1)
+
+        self.log.debug("Waiting for 1st request %s", req1.id)
+        req1 = self.waitForNodeRequest(req1, (zk.FULFILLED,))
+        self.assertEqual(len(req1.nodes), 2)
+
+        # Mark the first request's nodes as in use so they won't be deleted
+        # when we pause. Locking them is enough.
+        req1_node1 = self.zk.getNode(req1.nodes[0])
+        req1_node2 = self.zk.getNode(req1.nodes[1])
+        self.zk.lockNode(req1_node1, blocking=False)
+        self.zk.lockNode(req1_node2, blocking=False)
+
         # One of the things we want to test is that if we spawn many
         # node launches at once, we do not deadlock while the request
         # handler pauses for quota. To ensure we test that case,
@@ -148,31 +165,16 @@ class TestLauncher(tests.DBTestCase):
         # requests we submit. This will ensure that we hold locks on
         # all of the nodes before pausing so that we can validate they
         # are released.
-        client.pause_creates = True
-
-        req1 = zk.NodeRequest()
-        req1.state = zk.REQUESTED
-        req1.node_types.append('fake-label')
-        req1.node_types.append('fake-label')
-        self.zk.storeNodeRequest(req1)
         req2 = zk.NodeRequest()
         req2.state = zk.REQUESTED
         req2.node_types.append('fake-label')
         req2.node_types.append('fake-label')
         self.zk.storeNodeRequest(req2)
-
-        req1 = self.waitForNodeRequest(req1, (zk.PENDING,))
         req2 = self.waitForNodeRequest(req2, (zk.PENDING,))
 
-        # At this point, we should be about to create or have already
-        # created two servers for the first request, and the request
-        # handler has accepted the second node request but paused
-        # waiting for the server count to go below quota.
-
-        # Wait until both of the servers exist.
-        while len(client._server_list) < 2:
-            time.sleep(0.1)
-
+        # At this point, we should have already created two servers for the
+        # first request, and the request handler has accepted the second node
+        # request but paused waiting for the server count to go below quota.
         # Wait until there is a paused request handler and check if there
         # are exactly two servers
         pool_worker = pool.getPoolWorkers('fake-provider')
@@ -180,22 +182,13 @@ class TestLauncher(tests.DBTestCase):
             time.sleep(0.1)
         self.assertEqual(len(client._server_list), 2)
 
-        # Allow the servers to finish being created.
-        for server in client._server_list:
-            server.event.set()
-
-        self.log.debug("Waiting for 1st request %s", req1.id)
-        req1 = self.waitForNodeRequest(req1)
-        self.assertEqual(req1.state, zk.FULFILLED)
-        self.assertEqual(len(req1.nodes), 2)
-
         # Mark the first request's nodes as USED, which will get them deleted
         # and allow the second to proceed.
         self.log.debug("Marking first node as used %s", req1.id)
-        node = self.zk.getNode(req1.nodes[0])
-        node.state = zk.USED
-        self.zk.storeNode(node)
-        self.waitForNodeDeletion(node)
+        req1_node1.state = zk.USED
+        self.zk.storeNode(req1_node1)
+        self.zk.unlockNode(req1_node1)
+        self.waitForNodeDeletion(req1_node1)
 
         # To force the sequential nature of what we're testing, wait for
         # the 2nd request to get a node allocated to it now that we've
@@ -209,25 +202,16 @@ class TestLauncher(tests.DBTestCase):
                 break
 
         self.log.debug("Marking second node as used %s", req1.id)
-        node = self.zk.getNode(req1.nodes[1])
-        node.state = zk.USED
-        self.zk.storeNode(node)
-        self.waitForNodeDeletion(node)
+        req1_node2.state = zk.USED
+        self.zk.storeNode(req1_node2)
+        self.zk.unlockNode(req1_node2)
+        self.waitForNodeDeletion(req1_node2)
 
         self.log.debug("Deleting 1st request %s", req1.id)
         self.zk.deleteNodeRequest(req1)
         self.waitForNodeRequestLockDeletion(req1.id)
 
-        # Wait until both of the servers exist.
-        while len(client._server_list) < 2:
-            time.sleep(0.1)
-
-        # Allow the servers to finish being created.
-        for server in client._server_list:
-            server.event.set()
-
-        req2 = self.waitForNodeRequest(req2)
-        self.assertEqual(req2.state, zk.FULFILLED)
+        req2 = self.waitForNodeRequest(req2, (zk.FULFILLED,))
         self.assertEqual(len(req2.nodes), 2)
 
     def test_node_assignment_at_pool_quota_cores(self):
@@ -301,6 +285,10 @@ class TestLauncher(tests.DBTestCase):
         req1 = self.waitForNodeRequest(req1)
         self.assertEqual(req1.state, zk.FULFILLED)
 
+        # Lock this node so it appears as used and not deleted
+        req1_node = self.zk.getNode(req1.nodes[0])
+        self.zk.lockNode(req1_node, blocking=False)
+
         # Now, reduce the quota so the next node unexpectedly
         # (according to nodepool's quota estimate) fails.
         client.max_instances = 1
@@ -930,18 +918,21 @@ class TestLauncher(tests.DBTestCase):
         # Note we use different provider specific labels here to avoid
         # a race where a single provider fulfills both of these initial
         # requests.
+
+        # fake-provider
         req = zk.NodeRequest()
         req.state = zk.REQUESTED
-        # fake-provider
         req.node_types.append('fake-label2')
         self.zk.storeNodeRequest(req)
         req = self.waitForNodeRequest(req, zk.FULFILLED)
+
+        # fake-provider2
         req = zk.NodeRequest()
         req.state = zk.REQUESTED
-        # fake-provider2
         req.node_types.append('fake-label3')
         self.zk.storeNodeRequest(req)
         req = self.waitForNodeRequest(req, zk.FULFILLED)
+
         nodes = map(pool.zk.getNode, pool.zk.getNodes())
         provider1_first = None
         provider2_first = None
@@ -951,6 +942,11 @@ class TestLauncher(tests.DBTestCase):
             elif node.provider == 'fake-provider':
                 provider1_first = node
 
+        # Mark the nodes as being used so they won't be deleted at pause.
+        # Locking them is enough.
+        self.zk.lockNode(provider1_first, blocking=False)
+        self.zk.lockNode(provider2_first, blocking=False)
+
         # Next two requests will go pending one for each provider.
         req1 = zk.NodeRequest()
         req1.state = zk.REQUESTED
@@ -965,8 +961,10 @@ class TestLauncher(tests.DBTestCase):
         req2 = self.waitForNodeRequest(req2, zk.PENDING)
 
         # Delete node attached to provider2 this will cause provider2 to
-        # fulfill the request it had pending.
-        self.zk.deleteNode(provider2_first)
+        # fulfill the request it had pending. Simply unlocking here should
+        # cause it to be deleted.
+        self.zk.unlockNode(provider2_first)
+        self.waitForNodeDeletion(provider2_first)
 
         while True:
             # Wait for provider2 node to be created. Also find the request
@@ -989,9 +987,10 @@ class TestLauncher(tests.DBTestCase):
                 provider2_second = node
                 break
 
-        # Now delete the new node we had provider2 build. At this point
-        # The only provider with any requests is fake-provider.
-        self.zk.deleteNode(provider2_second)
+        # Now delete the new node we had provider2 build. At this point,
+        # the only provider with any requests is fake-provider.
+        provider2_second.state = zk.DELETING
+        self.zk.storeNode(provider2_second)
 
         # Set provider1 run_handler to throw exception to simulate a
         # broken cloud. Note the pool worker instantiates request handlers on
@@ -1005,9 +1004,12 @@ class TestLauncher(tests.DBTestCase):
             raise KeyError('fake-provider')
 
         request_handler.launch_manager.launch = raise_KeyError
-        # Delete instance in fake-provider. This should cause provider2
-        # to service the request that was held pending by fake-provider.
-        self.zk.deleteNode(provider1_first)
+
+        # Delete instance in fake-provider by unlocking it and allowing it to
+        # become unused. This should cause provider2 to service the request
+        # that was held pending by fake-provider.
+        self.zk.unlockNode(provider1_first)
+
         # Request is fulfilled by provider 2
         req = self.waitForNodeRequest(final_req)
         self.assertEqual(req.state, zk.FULFILLED)
@@ -1034,3 +1036,37 @@ class TestLauncher(tests.DBTestCase):
 
         req = self.waitForNodeRequest(req)
         self.assertEqual(req.state, zk.FAILED)
+
+    def test_provider_wont_wedge(self):
+        '''
+        A provider should not wedge itself when (1) it is at maximum capacity
+        (# registered nodes == max-servers), (2) none of its current nodes
+        are being used, and (3) a request comes in for a label that it does
+        not yet have available. Normally, situation (3) combined with (1)
+        would cause the provider to pause until capacity becomes available,
+        but because of (2) it never will, and the provider would wedge.
+        '''
+        configfile = self.setup_config('wedge_test.yaml')
+        self.useBuilder(configfile)
+        pool = self.useNodepool(configfile, watermark_sleep=1)
+        pool.start()
+
+        # Wait for fake-label1 min-ready request to be fulfilled, which will
+        # put us at maximum capacity with max-servers of 1.
+        label1_nodes = self.waitForNodes('fake-label1')
+        self.assertEqual(1, len(label1_nodes))
+
+        # Now we submit a request for fake-label2, which is not yet available.
+        req = zk.NodeRequest()
+        req.state = zk.REQUESTED
+        req.node_types.append('fake-label2')
+        self.zk.storeNodeRequest(req)
+
+        # The provider should pause here to handle the fake-label2 request.
+        # But because the fake-label1 node is not being used, and will never be
+        # used while we are paused and not handling additional requests, the
+        # pool worker thread should recognize that and delete the unused
+        # fake-label1 node for us. It can then fulfill the fake-label2 request.
+        self.waitForNodeDeletion(label1_nodes[0])
+        req = self.waitForNodeRequest(req)
+        self.assertEqual(req.state, zk.FULFILLED)
diff --git a/nodepool/zk.py b/nodepool/zk.py
index 3b1a000e1..a3f675710 100755
--- a/nodepool/zk.py
+++ b/nodepool/zk.py
@@ -1646,6 +1646,69 @@ class ZooKeeper(object):
             ret[node.type].append(node)
         return ret
 
+    def deleteOldestUnusedNode(self, provider_name, pool_name):
+        '''
+        Deletes the oldest unused (READY+unlocked) node for a provider's pool.
+
+        If we discover that this provider pool already has a node that is
+        being deleted, and whose state is less than 5 minutes old, do nothing.
+
+        :param str provider_name: Name of the provider.
+        :param str pool_name: Pool name for the given provider.
+
+        :returns: True if a delete was requested, False otherwise.
+        '''
+        def age(timestamp):
+            now = time.time()
+            dt = now - timestamp
+            m, s = divmod(dt, 60)
+            h, m = divmod(m, 60)
+            d, h = divmod(h, 24)
+            return '%02d:%02d:%02d:%02d' % (d, h, m, s)
+
+        MAX_DELETE_AGE = 5 * 60
+
+        candidates = []
+        for node in self.nodeIterator():
+            if node.provider == provider_name and node.pool == pool_name:
+                if node.state == READY:
+                    candidates.append(node)
+                elif (node.state == DELETING and
+                      (time.time() - node.state_time) < MAX_DELETE_AGE
+                ):
+                    return False
+
+        candidates.sort(key=lambda n: n.state_time)
+        for node in candidates:
+            try:
+                self.lockNode(node, blocking=False)
+            except npe.ZKLockException:
+                continue
+
+            # Make sure the state didn't change on us
+            n = self.getNode(node.id)
+            if n.state != READY:
+                self.unlockNode(node)
+                continue
+
+            self.log.debug("Deleting unused node %s (age: %s)",
+                           node.id, age(node.state_time))
+            node.state = DELETING
+            try:
+                self.storeNode(node)
+            except Exception:
+                self.log.exception(
+                    "Error attempting to update unused node %s:", node.id)
+                continue
+            finally:
+                self.unlockNode(node)
+
+            # If we got here, we found and requested a delete for the
+            # oldest unused node.
+            return True
+
+        return False
+
     def nodeIterator(self):
         '''
         Utility generator method for iterating through all nodes.
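
For reviewers, the selection policy that deleteOldestUnusedNode implements can be summarized with the rough standalone sketch below (plain Python, no ZooKeeper). The READY/DELETING constants, the Node tuple, the locked flag, and pick_oldest_unused are illustrative stand-ins rather than real nodepool objects; the ZK lock attempt is reduced to a boolean flag, and state_time is assumed to be seconds since the epoch, matching how the age() helper treats it.

import time
from collections import namedtuple

# Illustrative stand-ins only; these are not the real nodepool.zk objects.
READY = 'ready'
DELETING = 'deleting'
Node = namedtuple('Node', ['id', 'provider', 'pool', 'state', 'state_time',
                           'locked'])

MAX_DELETE_AGE = 5 * 60  # seconds


def pick_oldest_unused(nodes, provider_name, pool_name):
    """Return the node that should be marked DELETING, or None.

    Mirrors the policy of deleteOldestUnusedNode: if a delete for this
    provider/pool is already in flight and recent, do nothing; otherwise
    pick the oldest READY node that is not locked.
    """
    candidates = []
    for node in nodes:
        if node.provider != provider_name or node.pool != pool_name:
            continue
        if node.state == READY and not node.locked:
            candidates.append(node)
        elif (node.state == DELETING and
              time.time() - node.state_time < MAX_DELETE_AGE):
            # A recent delete request already exists; don't pile on another.
            return None
    if not candidates:
        return None
    # Smallest state_time means it entered READY the longest ago, i.e. oldest.
    return min(candidates, key=lambda n: n.state_time)


if __name__ == '__main__':
    now = time.time()
    nodes = [
        Node('0001', 'fake-provider', 'main', READY, now - 600, False),
        Node('0002', 'fake-provider', 'main', READY, now - 60, False),
        Node('0003', 'fake-provider', 'main', READY, now - 900, True),
    ]
    victim = pick_oldest_unused(nodes, 'fake-provider', 'main')
    print(victim.id)  # 0001: the oldest READY node that is not locked

Picking strictly by oldest state_time presumably favors reclaiming the node that has sat idle the longest, which is the one least likely to be grabbed by an in-flight request.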