Merge "Exit launchers and builders immediately"

This commit is contained in:
Zuul 2018-02-09 06:53:23 +00:00 committed by Gerrit Code Review
commit 1039b0fca5
2 changed files with 18 additions and 14 deletions

View File

@ -115,6 +115,7 @@ class BaseWorker(threading.Thread):
self.log = logging.getLogger("nodepool.builder.BaseWorker")
self.daemon = True
self._running = False
self._stop_event = threading.Event()
self._config = None
self._config_path = config_path
self._secure_path = secure_path
@ -141,6 +142,7 @@ class BaseWorker(threading.Thread):
def shutdown(self):
self._running = False
self._stop_event.set()
class CleanupWorker(BaseWorker):
@ -514,7 +516,7 @@ class CleanupWorker(BaseWorker):
self.log.exception("Exception in CleanupWorker:")
time.sleep(10)
time.sleep(self._interval)
self._stop_event.wait(self._interval)
provider_manager.ProviderManager.stopProviders(self._config)
@ -804,7 +806,7 @@ class BuildWorker(BaseWorker):
self.log.exception("Exception in BuildWorker:")
time.sleep(10)
time.sleep(self._interval)
self._stop_event.wait(self._interval)
def _run(self):
'''
@ -1066,7 +1068,7 @@ class UploadWorker(BaseWorker):
self.log.exception("Exception in UploadWorker:")
time.sleep(10)
time.sleep(self._interval)
self._stop_event.wait(self._interval)
provider_manager.ProviderManager.stopProviders(self._config)

View File

@ -132,6 +132,7 @@ class PoolWorker(threading.Thread):
self.provider_name = provider_name
self.pool_name = pool_name
self.running = False
self.stop_event = threading.Event()
self.paused_handler = None
self.request_handlers = []
self.watermark_sleep = nodepool.watermark_sleep
@ -299,7 +300,7 @@ class PoolWorker(threading.Thread):
self._removeCompletedHandlers()
except Exception:
self.log.exception("Error in PoolWorker:")
time.sleep(self.watermark_sleep)
self.stop_event.wait(self.watermark_sleep)
# Cleanup on exit
if self.paused_handler:
@ -315,6 +316,7 @@ class PoolWorker(threading.Thread):
'''
self.log.info("%s received stop" % self.name)
self.running = False
self.stop_event.set()
class BaseCleanupWorker(threading.Thread):
@ -323,6 +325,7 @@ class BaseCleanupWorker(threading.Thread):
self._nodepool = nodepool
self._interval = interval
self._running = False
self._stop_event = threading.Event()
def _deleteInstance(self, node):
'''
@ -361,12 +364,13 @@ class BaseCleanupWorker(threading.Thread):
self.log.info("ZooKeeper available. Resuming")
self._run()
time.sleep(self._interval)
self._stop_event.wait(self._interval)
self.log.info("Stopped")
def stop(self):
self._running = False
self._stop_event.set()
self.join()
@ -674,20 +678,22 @@ class NodePool(threading.Thread):
self.cleanup_interval = 60
self.delete_interval = 5
self._stopped = False
self._stop_event = threading.Event()
self.config = None
self.zk = None
self.statsd = stats.get_client()
self._pool_threads = {}
self._cleanup_thread = None
self._delete_thread = None
self._wake_condition = threading.Condition()
self._submittedRequests = {}
def stop(self):
self._stopped = True
self._wake_condition.acquire()
self._wake_condition.notify()
self._wake_condition.release()
self._stop_event.set()
# Our run method can start new threads, so make sure it has
# completed before we continue the shutdown.
if self.isAlive():
self.join()
if self.config:
provider_manager.ProviderManager.stopProviders(self.config)
@ -708,8 +714,6 @@ class NodePool(threading.Thread):
self.log.debug("Waiting for %s" % thd.name)
thd.join()
if self.isAlive():
self.join()
if self.zk:
self.zk.disconnect()
self.log.debug("Finished stopping")
@ -945,6 +949,4 @@ class NodePool(threading.Thread):
except Exception:
self.log.exception("Exception in main loop:")
self._wake_condition.acquire()
self._wake_condition.wait(self.watermark_sleep)
self._wake_condition.release()
self._stop_event.wait(self.watermark_sleep)