Continuously ensure the component registry is up to date
On startup, the launcher waits up to 5 seconds until it has seen its own registry entry because it uses the registry to decide if other components are able to handle a request, and if not, fail the request. In the case of a ZK disconnection, we will lose all information about registered components as well as the tree caches. Upon reconnection, we will repopulate the tree caches and re-register our component. If the tree cache repopulation happens first, our component registration may be in line behind several thousand ZK events. It may take more than 5 seconds to repopulate and it would be better for the launcher to wait until the component registry is up to date before it resumes processing. To fix this, instead of only waiting on the initial registration, we check each time through the launcher's main loop that the registry is up-to-date before we start processing. This should include disconnections because we expect the main loop to abort with an error and restart in those cases. This operates only on local cached data, so it doesn't generate any extra ZK traffic. Change-Id: I1949ec56610fe810d9e088b00666053f2cc37a9a
This commit is contained in:
parent
392cf017c3
commit
619dee016c
|
@ -33,6 +33,7 @@ from nodepool import config as nodepool_config
|
|||
from nodepool.zk import zookeeper as zk
|
||||
from nodepool.zk import ZooKeeperClient
|
||||
from nodepool.zk.components import LauncherComponent, PoolComponent
|
||||
from nodepool.zk.components import COMPONENT_REGISTRY
|
||||
from nodepool.driver.utils import QuotaInformation, QuotaSupport
|
||||
from nodepool.logconfig import get_annotated_logger
|
||||
from nodepool.version import get_version_string
|
||||
|
@ -444,6 +445,22 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
def getProviderManager(self):
|
||||
return self.nodepool.getProviderManager(self.provider_name)
|
||||
|
||||
def waitForComponentRegistration(self, timeout=5.0):
    """Wait for our own component to show up in the local registry.

    Our own entry being visible in the locally cached component
    registry is used as a signal that operations which rely on
    lists of available labels, etc, can behave more synchronously.

    :param float timeout: Maximum number of seconds to wait.  The
        registry is always checked at least once, even when this
        is zero or negative.

    :returns: True if a component of our kind with our path was
        found in the registry before the timeout expired, False
        otherwise (an info-level message is logged in that case).
    """
    # Use a monotonic deadline rather than summing the sleep
    # intervals: accumulated floats drift, and the time spent
    # scanning the registry would otherwise not be counted.
    deadline = time.monotonic() + timeout
    while True:
        for component in COMPONENT_REGISTRY.registry.all(
                self.component_info.kind):
            if self.component_info.path == component.path:
                return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Poll roughly every 100ms, but never sleep past the deadline.
        time.sleep(min(0.1, remaining))
    self.log.info("Did not see component registration for %s",
                  self.component_info.path)
    return False
|
||||
|
||||
def run(self):
|
||||
self.running = True
|
||||
|
||||
|
@ -473,6 +490,14 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
if did_suspend:
|
||||
self.log.info("ZooKeeper available. Resuming")
|
||||
|
||||
# Verify that our own component is in the registry as
|
||||
# a proxy for determining that the registry is likely
|
||||
# to have up to date info for all the other
|
||||
# components.
|
||||
if not self.waitForComponentRegistration():
|
||||
# Check if we're still running and then keep waiting.
|
||||
continue
|
||||
|
||||
pool_config = self.getPoolConfig()
|
||||
self.component_info.supported_labels = list(pool_config.labels)
|
||||
self.component_info.priority = self.getPriority()
|
||||
|
|
|
@ -42,13 +42,19 @@ class TestComponentRegistry(tests.DBTestCase):
|
|||
})
|
||||
launcher.register()
|
||||
|
||||
for _ in iterate_timeout(10, Exception,
|
||||
"node in registry"):
|
||||
launcher_pools = self.zk.getRegisteredPools()
|
||||
self.assertEqual(1, len(launcher_pools))
|
||||
self.assertEqual(launcher.id, list(launcher_pools)[0].id)
|
||||
if (len(launcher_pools) == 1 and
|
||||
launcher.id == list(launcher_pools)[0].id):
|
||||
break
|
||||
|
||||
launcher.unregister()
|
||||
for _ in iterate_timeout(10, Exception,
|
||||
"node not in registry"):
|
||||
launcher_pools = self.zk.getRegisteredPools()
|
||||
self.assertEqual(0, len(launcher_pools))
|
||||
if len(launcher_pools) == 0:
|
||||
break
|
||||
|
||||
|
||||
class TestZooKeeper(tests.DBTestCase):
|
||||
|
|
|
@ -144,22 +144,6 @@ class BaseComponent(ZooKeeperBase):
|
|||
include_data=True,
|
||||
)
|
||||
|
||||
if not COMPONENT_REGISTRY.registry:
|
||||
return
|
||||
|
||||
# Wait 5 seconds for the component to appear in our local
|
||||
# cache so that operations which rely on lists of available
|
||||
# labels, etc, behave more synchronously.
|
||||
for x in range(50):
|
||||
registered = set()
|
||||
for kind, components in COMPONENT_REGISTRY.registry.all():
|
||||
for component in components:
|
||||
registered.add(component.path)
|
||||
if self.path in registered:
|
||||
return
|
||||
time.sleep(0.1)
|
||||
self.log.info("Did not see component registration for %s", path)
|
||||
|
||||
def unregister(self):
|
||||
with self.register_lock:
|
||||
self.log.info("Unregistering component in ZooKeeper %s", self.path)
|
||||
|
|
Loading…
Reference in New Issue