Exit launchers and builders immediately

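Replace time.sleep with threading.Event.wait in the periodic worker
loops so that when a launcher or builder is stopped, it wakes up and
exits immediately instead of sleeping out the rest of its interval.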

The stop method of the main launcher (NodePool) is altered to join
its run loop before continuing the shutdown. This avoids a race
condition where the run loop spawns a new PoolWorker while the
launcher is shutting down.

Change-Id: Idbb6cc8f1e40ee2611cc73e27232f7db308a7230
This commit is contained in:
James E. Blair 2018-02-07 14:38:46 -08:00
parent 782dca216c
commit 4991bd5745
2 changed files with 18 additions and 14 deletions

View File

@ -115,6 +115,7 @@ class BaseWorker(threading.Thread):
self.log = logging.getLogger("nodepool.builder.BaseWorker")
self.daemon = True
self._running = False
self._stop_event = threading.Event()
self._config = None
self._config_path = config_path
self._secure_path = secure_path
@ -141,6 +142,7 @@ class BaseWorker(threading.Thread):
def shutdown(self):
self._running = False
self._stop_event.set()
class CleanupWorker(BaseWorker):
@ -514,7 +516,7 @@ class CleanupWorker(BaseWorker):
self.log.exception("Exception in CleanupWorker:")
time.sleep(10)
time.sleep(self._interval)
self._stop_event.wait(self._interval)
provider_manager.ProviderManager.stopProviders(self._config)
@ -804,7 +806,7 @@ class BuildWorker(BaseWorker):
self.log.exception("Exception in BuildWorker:")
time.sleep(10)
time.sleep(self._interval)
self._stop_event.wait(self._interval)
def _run(self):
'''
@ -1066,7 +1068,7 @@ class UploadWorker(BaseWorker):
self.log.exception("Exception in UploadWorker:")
time.sleep(10)
time.sleep(self._interval)
self._stop_event.wait(self._interval)
provider_manager.ProviderManager.stopProviders(self._config)

View File

@ -132,6 +132,7 @@ class PoolWorker(threading.Thread):
self.provider_name = provider_name
self.pool_name = pool_name
self.running = False
self.stop_event = threading.Event()
self.paused_handler = None
self.request_handlers = []
self.watermark_sleep = nodepool.watermark_sleep
@ -299,7 +300,7 @@ class PoolWorker(threading.Thread):
self._removeCompletedHandlers()
except Exception:
self.log.exception("Error in PoolWorker:")
time.sleep(self.watermark_sleep)
self.stop_event.wait(self.watermark_sleep)
# Cleanup on exit
if self.paused_handler:
@ -315,6 +316,7 @@ class PoolWorker(threading.Thread):
'''
self.log.info("%s received stop" % self.name)
self.running = False
self.stop_event.set()
class BaseCleanupWorker(threading.Thread):
@ -323,6 +325,7 @@ class BaseCleanupWorker(threading.Thread):
self._nodepool = nodepool
self._interval = interval
self._running = False
self._stop_event = threading.Event()
def _deleteInstance(self, node):
'''
@ -361,12 +364,13 @@ class BaseCleanupWorker(threading.Thread):
self.log.info("ZooKeeper available. Resuming")
self._run()
time.sleep(self._interval)
self._stop_event.wait(self._interval)
self.log.info("Stopped")
def stop(self):
self._running = False
self._stop_event.set()
self.join()
@ -674,20 +678,22 @@ class NodePool(threading.Thread):
self.cleanup_interval = 60
self.delete_interval = 5
self._stopped = False
self._stop_event = threading.Event()
self.config = None
self.zk = None
self.statsd = stats.get_client()
self._pool_threads = {}
self._cleanup_thread = None
self._delete_thread = None
self._wake_condition = threading.Condition()
self._submittedRequests = {}
def stop(self):
self._stopped = True
self._wake_condition.acquire()
self._wake_condition.notify()
self._wake_condition.release()
self._stop_event.set()
# Our run method can start new threads, so make sure it has
# completed before we continue the shutdown.
if self.isAlive():
self.join()
if self.config:
provider_manager.ProviderManager.stopProviders(self.config)
@ -708,8 +714,6 @@ class NodePool(threading.Thread):
self.log.debug("Waiting for %s" % thd.name)
thd.join()
if self.isAlive():
self.join()
if self.zk:
self.zk.disconnect()
self.log.debug("Finished stopping")
@ -945,6 +949,4 @@ class NodePool(threading.Thread):
except Exception:
self.log.exception("Exception in main loop:")
self._wake_condition.acquire()
self._wake_condition.wait(self.watermark_sleep)
self._wake_condition.release()
self._stop_event.wait(self.watermark_sleep)