Merge "Provider wedge fix"
This commit is contained in:
commit
4a4a61dcf0
|
@@ -490,6 +490,8 @@ class OpenStackNodeRequestHandler(NodeRequestHandler):
                "Pausing request handling to satisfy request %s",
                self.request)
            self.paused = True
            self.zk.deleteOldestUnusedNode(self.provider.name,
                                           self.pool.name)
            return

        if self.paused:
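The two added lines calling deleteOldestUnusedNode are the heart of the fix: when the handler pauses for quota, it now also asks ZooKeeper to reclaim idle capacity so a fully-idle pool cannot stay stuck. A minimal sketch of that control flow, reusing the attribute names visible in the hunk (the quota guard itself is outside this hunk, so the condition shown is an assumption):

    # Illustration only -- condensed from the hunk above, not the complete
    # handler method. `at_quota` stands in for the real capacity check.
    if at_quota:
        self.log.debug(
            "Pausing request handling to satisfy request %s", self.request)
        self.paused = True
        # Break the potential wedge: request deletion of the oldest
        # READY+unlocked node in this provider pool.
        self.zk.deleteOldestUnusedNode(self.provider.name, self.pool.name)
        return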
@@ -0,0 +1,53 @@
elements-dir: .
images-dir: '{images_dir}'

zookeeper-servers:
  - host: {zookeeper_host}
    port: {zookeeper_port}
    chroot: {zookeeper_chroot}

labels:
  - name: fake-label1
    min-ready: 1
  - name: fake-label2
    min-ready: 0

providers:
  - name: fake-provider
    cloud: fake
    driver: fake
    region-name: fake-region
    rate: 0.0001
    diskimages:
      - name: fake-image
        meta:
          key: value
          key2: value
    pools:
      - name: main
        max-servers: 1
        availability-zones:
          - az1
        networks:
          - net-name
        labels:
          - name: fake-label1
            diskimage: fake-image
            min-ram: 8192
            flavor-name: 'Fake'
          - name: fake-label2
            diskimage: fake-image
            min-ram: 8192
            flavor-name: 'Fake'

diskimages:
  - name: fake-image
    elements:
      - fedora
      - vm
    release: 21
    env-vars:
      TMPDIR: /opt/dib_tmp
      DIB_IMAGE_CACHE: /opt/dib_cache
      DIB_CLOUD_IMAGES: http://download.fedoraproject.org/pub/fedora/linux/releases/test/21-Beta/Cloud/Images/x86_64/
      BASE_IMAGE_FILE: Fedora-Cloud-Base-20141029-21_Beta.x86_64.qcow2
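This new fixture is what makes the wedge reproducible: max-servers: 1 with min-ready: 1 for fake-label1 means the provider boots exactly one idle node and is immediately at capacity, so a later fake-label2 request can only ever be satisfied by reclaiming that idle node. A quick, hypothetical sanity check of those fields (the test harness substitutes the {placeholders} before the file is used; the literal path here is an assumption for illustration):

    import yaml

    with open('wedge_test.yaml') as f:  # path assumed for illustration
        cfg = yaml.safe_load(f)

    pool = cfg['providers'][0]['pools'][0]
    assert pool['max-servers'] == 1            # pool is full after one node
    assert cfg['labels'][0]['min-ready'] == 1  # fake-label1 boots at startup
    assert cfg['labels'][1]['min-ready'] == 0  # fake-label2 only on demand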
@@ -112,7 +112,7 @@ class TestLauncher(tests.DBTestCase):
        self.assertEqual(nodes[3].type, 'fake-label2')

    def _test_node_assignment_at_quota(self,
                                       config='node_quota.yaml',
                                       config,
                                       max_cores=100,
                                       max_instances=20,
                                       max_ram=1000000):
@@ -141,6 +141,23 @@ class TestLauncher(tests.DBTestCase):

        client = pool.getProviderManager('fake-provider')._getClient()

        req1 = zk.NodeRequest()
        req1.state = zk.REQUESTED
        req1.node_types.append('fake-label')
        req1.node_types.append('fake-label')
        self.zk.storeNodeRequest(req1)

        self.log.debug("Waiting for 1st request %s", req1.id)
        req1 = self.waitForNodeRequest(req1, (zk.FULFILLED,))
        self.assertEqual(len(req1.nodes), 2)

        # Mark the first request's nodes as in use so they won't be deleted
        # when we pause. Locking them is enough.
        req1_node1 = self.zk.getNode(req1.nodes[0])
        req1_node2 = self.zk.getNode(req1.nodes[1])
        self.zk.lockNode(req1_node1, blocking=False)
        self.zk.lockNode(req1_node2, blocking=False)

        # One of the things we want to test is that if we spawn many
        # node launches at once, we do not deadlock while the request
        # handler pauses for quota.  To ensure we test that case,
@@ -148,31 +165,16 @@ class TestLauncher(tests.DBTestCase):
        # requests we submit.  This will ensure that we hold locks on
        # all of the nodes before pausing so that we can validate they
        # are released.
        client.pause_creates = True

        req1 = zk.NodeRequest()
        req1.state = zk.REQUESTED
        req1.node_types.append('fake-label')
        req1.node_types.append('fake-label')
        self.zk.storeNodeRequest(req1)
        req2 = zk.NodeRequest()
        req2.state = zk.REQUESTED
        req2.node_types.append('fake-label')
        req2.node_types.append('fake-label')
        self.zk.storeNodeRequest(req2)

        req1 = self.waitForNodeRequest(req1, (zk.PENDING,))
        req2 = self.waitForNodeRequest(req2, (zk.PENDING,))

        # At this point, we should be about to create or have already
        # created two servers for the first request, and the request
        # handler has accepted the second node request but paused
        # waiting for the server count to go below quota.

        # Wait until both of the servers exist.
        while len(client._server_list) < 2:
            time.sleep(0.1)

        # At this point, we should have already created two servers for the
        # first request, and the request handler has accepted the second node
        # request but paused waiting for the server count to go below quota.
        # Wait until there is a paused request handler and check if there
        # are exactly two servers
        pool_worker = pool.getPoolWorkers('fake-provider')
@@ -180,22 +182,13 @@ class TestLauncher(tests.DBTestCase):
            time.sleep(0.1)
        self.assertEqual(len(client._server_list), 2)

        # Allow the servers to finish being created.
        for server in client._server_list:
            server.event.set()

        self.log.debug("Waiting for 1st request %s", req1.id)
        req1 = self.waitForNodeRequest(req1)
        self.assertEqual(req1.state, zk.FULFILLED)
        self.assertEqual(len(req1.nodes), 2)

        # Mark the first request's nodes as USED, which will get them deleted
        # and allow the second to proceed.
        self.log.debug("Marking first node as used %s", req1.id)
        node = self.zk.getNode(req1.nodes[0])
        node.state = zk.USED
        self.zk.storeNode(node)
        self.waitForNodeDeletion(node)
        req1_node1.state = zk.USED
        self.zk.storeNode(req1_node1)
        self.zk.unlockNode(req1_node1)
        self.waitForNodeDeletion(req1_node1)

        # To force the sequential nature of what we're testing, wait for
        # the 2nd request to get a node allocated to it now that we've
@@ -209,25 +202,16 @@ class TestLauncher(tests.DBTestCase):
                break

        self.log.debug("Marking second node as used %s", req1.id)
        node = self.zk.getNode(req1.nodes[1])
        node.state = zk.USED
        self.zk.storeNode(node)
        self.waitForNodeDeletion(node)
        req1_node2.state = zk.USED
        self.zk.storeNode(req1_node2)
        self.zk.unlockNode(req1_node2)
        self.waitForNodeDeletion(req1_node2)

        self.log.debug("Deleting 1st request %s", req1.id)
        self.zk.deleteNodeRequest(req1)
        self.waitForNodeRequestLockDeletion(req1.id)

        # Wait until both of the servers exist.
        while len(client._server_list) < 2:
            time.sleep(0.1)

        # Allow the servers to finish being created.
        for server in client._server_list:
            server.event.set()

        req2 = self.waitForNodeRequest(req2)
        self.assertEqual(req2.state, zk.FULFILLED)
        req2 = self.waitForNodeRequest(req2, (zk.FULFILLED,))
        self.assertEqual(len(req2.nodes), 2)

    def test_node_assignment_at_pool_quota_cores(self):
@@ -301,6 +285,10 @@ class TestLauncher(tests.DBTestCase):
        req1 = self.waitForNodeRequest(req1)
        self.assertEqual(req1.state, zk.FULFILLED)

        # Lock this node so it appears as used and not deleted
        req1_node = self.zk.getNode(req1.nodes[0])
        self.zk.lockNode(req1_node, blocking=False)

        # Now, reduce the quota so the next node unexpectedly
        # (according to nodepool's quota estimate) fails.
        client.max_instances = 1
@@ -930,18 +918,21 @@ class TestLauncher(tests.DBTestCase):
        # Note we use different provider specific labels here to avoid
        # a race where a single provider fulfills both of these initial
        # requests.

        # fake-provider
        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        # fake-provider
        req.node_types.append('fake-label2')
        self.zk.storeNodeRequest(req)
        req = self.waitForNodeRequest(req, zk.FULFILLED)

        # fake-provider2
        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        # fake-provider2
        req.node_types.append('fake-label3')
        self.zk.storeNodeRequest(req)
        req = self.waitForNodeRequest(req, zk.FULFILLED)

        nodes = map(pool.zk.getNode, pool.zk.getNodes())
        provider1_first = None
        provider2_first = None
@@ -951,6 +942,11 @@ class TestLauncher(tests.DBTestCase):
            elif node.provider == 'fake-provider':
                provider1_first = node

        # Mark the nodes as being used so they won't be deleted at pause.
        # Locking them is enough.
        self.zk.lockNode(provider1_first, blocking=False)
        self.zk.lockNode(provider2_first, blocking=False)

        # Next two requests will go pending one for each provider.
        req1 = zk.NodeRequest()
        req1.state = zk.REQUESTED
@@ -965,8 +961,10 @@ class TestLauncher(tests.DBTestCase):
        req2 = self.waitForNodeRequest(req2, zk.PENDING)

        # Delete node attached to provider2 this will cause provider2 to
        # fulfill the request it had pending.
        self.zk.deleteNode(provider2_first)
        # fulfill the request it had pending. Simply unlocking here should
        # cause it to be deleted.
        self.zk.unlockNode(provider2_first)
        self.waitForNodeDeletion(provider2_first)

        while True:
            # Wait for provider2 node to be created. Also find the request
@@ -989,9 +987,10 @@ class TestLauncher(tests.DBTestCase):
                    provider2_second = node
                    break

        # Now delete the new node we had provider2 build. At this point
        # The only provider with any requests is fake-provider.
        self.zk.deleteNode(provider2_second)
        # Now delete the new node we had provider2 build. At this point,
        # the only provider with any requests is fake-provider.
        provider2_second.state = zk.DELETING
        self.zk.storeNode(provider2_second)

        # Set provider1 run_handler to throw exception to simulate a
        # broken cloud. Note the pool worker instantiates request handlers on
@@ -1005,9 +1004,12 @@ class TestLauncher(tests.DBTestCase):
            raise KeyError('fake-provider')

        request_handler.launch_manager.launch = raise_KeyError
        # Delete instance in fake-provider. This should cause provider2
        # to service the request that was held pending by fake-provider.
        self.zk.deleteNode(provider1_first)

        # Delete instance in fake-provider by unlocking it and allowing it to
        # become unused. This should cause provider2 to service the request
        # that was held pending by fake-provider.
        self.zk.unlockNode(provider1_first)

        # Request is fulfilled by provider 2
        req = self.waitForNodeRequest(final_req)
        self.assertEqual(req.state, zk.FULFILLED)
@@ -1034,3 +1036,37 @@ class TestLauncher(tests.DBTestCase):

        req = self.waitForNodeRequest(req)
        self.assertEqual(req.state, zk.FAILED)

    def test_provider_wont_wedge(self):
        '''
        A provider should not wedge itself when it is at (1) maximum capacity
        (# registered nodes == max-servers), (2) all of its current nodes are
        not being used, and (3) a request comes in with a label that it does
        not yet have available. Normally, situation (3) combined with (1)
        would cause the provider to pause until capacity becomes available,
        but because of (2), it never will and we would wedge the provider.
        '''
        configfile = self.setup_config('wedge_test.yaml')
        self.useBuilder(configfile)
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()

        # Wait for fake-label1 min-ready request to be fulfilled, which will
        # put us at maximum capacity with max-servers of 1.
        label1_nodes = self.waitForNodes('fake-label1')
        self.assertEqual(1, len(label1_nodes))

        # Now we submit a request for fake-label2, which is not yet available.
        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.node_types.append('fake-label2')
        self.zk.storeNodeRequest(req)

        # The provider should pause here to handle the fake-label2 request.
        # But because the fake-label1 node is not being used, and will never
        # be freed because we are paused and not handling additional requests,
        # the pool worker thread should recognize that and delete the unused
        # fake-label1 node for us. It can then fulfill the fake-label2 request.
        self.waitForNodeDeletion(label1_nodes[0])
        req = self.waitForNodeRequest(req)
        self.assertEqual(req.state, zk.FULFILLED)
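The docstring above enumerates the wedge conditions precisely. As a self-contained illustration of the stuck state (none of these helper names exist in nodepool; they only restate the docstring):

    from dataclasses import dataclass

    @dataclass
    class FakeNode:
        state: str
        locked: bool

    def would_wedge(nodes, max_servers):
        # (1) at capacity, (2) every node READY but unused, (3) an
        # unsatisfiable request arrives: nothing will ever be freed by
        # normal use, so the provider must reclaim a node itself.
        at_capacity = len(nodes) >= max_servers
        all_idle = all(n.state == 'ready' and not n.locked for n in nodes)
        return at_capacity and all_idle

    # With max-servers=1 and one idle min-ready node, a request for a new
    # label would previously have waited forever:
    assert would_wedge([FakeNode('ready', locked=False)], max_servers=1)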
@@ -1646,6 +1646,69 @@ class ZooKeeper(object):
            ret[node.type].append(node)
        return ret

    def deleteOldestUnusedNode(self, provider_name, pool_name):
        '''
        Deletes the oldest unused (READY+unlocked) node for a provider's pool.

        If we discover that this provider pool already has a node that is
        being deleted, and whose state is less than 5 minutes old, do nothing.

        :param str provider_name: Name of the provider.
        :param str pool_name: Pool name for the given provider.

        :returns: True if a delete was requested, False otherwise.
        '''
        def age(timestamp):
            now = time.time()
            dt = now - timestamp
            m, s = divmod(dt, 60)
            h, m = divmod(m, 60)
            d, h = divmod(h, 24)
            return '%02d:%02d:%02d:%02d' % (d, h, m, s)

        MAX_DELETE_AGE = 5 * 60

        candidates = []
        for node in self.nodeIterator():
            if node.provider == provider_name and node.pool == pool_name:
                if node.state == READY:
                    candidates.append(node)
                elif (node.state == DELETING and
                      (time.time() - node.state_time / 1000) < MAX_DELETE_AGE
                      ):
                    return False

        candidates.sort(key=lambda n: n.state_time)
        for node in candidates:
            try:
                self.lockNode(node, blocking=False)
            except npe.ZKLockException:
                continue

            # Make sure the state didn't change on us
            n = self.getNode(node.id)
            if n.state != READY:
                self.unlockNode(node)
                continue

            node.state = DELETING
            try:
                self.log.debug("Deleting unused node %s (age: %s)",
                               node.id, age(node.state_time))
                self.storeNode(node)
            except Exception:
                self.log.exception(
                    "Error attempting to update unused node %s:", node.id)
                continue
            finally:
                self.unlockNode(node)

            # If we got here, we found and requested a delete for the
            # oldest unused node.
            return True

        return False

    def nodeIterator(self):
        '''
        Utility generator method for iterating through all nodes.
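A small usage sketch of the new method, under the semantics its docstring documents (zk_client is illustrative, and the provider/pool names echo the test fixture above):

    # zk_client: a connected ZooKeeper helper object as defined above.
    requested = zk_client.deleteOldestUnusedNode('fake-provider', 'main')
    # True  -> a READY+unlocked node was marked DELETING. Only the delete
    #          *request* is guaranteed; watch for the node to disappear.
    # False -> nothing was reclaimable, or a delete requested within the
    #          last five minutes is still in flight; callers retry on
    #          their next pass rather than block.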