Exit launchers and builders immediately
Replace time.sleep with event.wait so that when the launcher is stopped, it exits immediately. The stop method of the main launcher process is altered to avoid a race condition where it spawns a new poolworker while it is shutting down. Change-Id: Idbb6cc8f1e40ee2611cc73e27232f7db308a7230
This commit is contained in:
parent
782dca216c
commit
4991bd5745
|
@ -115,6 +115,7 @@ class BaseWorker(threading.Thread):
|
|||
self.log = logging.getLogger("nodepool.builder.BaseWorker")
|
||||
self.daemon = True
|
||||
self._running = False
|
||||
self._stop_event = threading.Event()
|
||||
self._config = None
|
||||
self._config_path = config_path
|
||||
self._secure_path = secure_path
|
||||
|
@ -141,6 +142,7 @@ class BaseWorker(threading.Thread):
|
|||
|
||||
def shutdown(self):
|
||||
self._running = False
|
||||
self._stop_event.set()
|
||||
|
||||
|
||||
class CleanupWorker(BaseWorker):
|
||||
|
@ -514,7 +516,7 @@ class CleanupWorker(BaseWorker):
|
|||
self.log.exception("Exception in CleanupWorker:")
|
||||
time.sleep(10)
|
||||
|
||||
time.sleep(self._interval)
|
||||
self._stop_event.wait(self._interval)
|
||||
|
||||
provider_manager.ProviderManager.stopProviders(self._config)
|
||||
|
||||
|
@ -804,7 +806,7 @@ class BuildWorker(BaseWorker):
|
|||
self.log.exception("Exception in BuildWorker:")
|
||||
time.sleep(10)
|
||||
|
||||
time.sleep(self._interval)
|
||||
self._stop_event.wait(self._interval)
|
||||
|
||||
def _run(self):
|
||||
'''
|
||||
|
@ -1066,7 +1068,7 @@ class UploadWorker(BaseWorker):
|
|||
self.log.exception("Exception in UploadWorker:")
|
||||
time.sleep(10)
|
||||
|
||||
time.sleep(self._interval)
|
||||
self._stop_event.wait(self._interval)
|
||||
|
||||
provider_manager.ProviderManager.stopProviders(self._config)
|
||||
|
||||
|
|
|
@ -132,6 +132,7 @@ class PoolWorker(threading.Thread):
|
|||
self.provider_name = provider_name
|
||||
self.pool_name = pool_name
|
||||
self.running = False
|
||||
self.stop_event = threading.Event()
|
||||
self.paused_handler = None
|
||||
self.request_handlers = []
|
||||
self.watermark_sleep = nodepool.watermark_sleep
|
||||
|
@ -299,7 +300,7 @@ class PoolWorker(threading.Thread):
|
|||
self._removeCompletedHandlers()
|
||||
except Exception:
|
||||
self.log.exception("Error in PoolWorker:")
|
||||
time.sleep(self.watermark_sleep)
|
||||
self.stop_event.wait(self.watermark_sleep)
|
||||
|
||||
# Cleanup on exit
|
||||
if self.paused_handler:
|
||||
|
@ -315,6 +316,7 @@ class PoolWorker(threading.Thread):
|
|||
'''
|
||||
self.log.info("%s received stop" % self.name)
|
||||
self.running = False
|
||||
self.stop_event.set()
|
||||
|
||||
|
||||
class BaseCleanupWorker(threading.Thread):
|
||||
|
@ -323,6 +325,7 @@ class BaseCleanupWorker(threading.Thread):
|
|||
self._nodepool = nodepool
|
||||
self._interval = interval
|
||||
self._running = False
|
||||
self._stop_event = threading.Event()
|
||||
|
||||
def _deleteInstance(self, node):
|
||||
'''
|
||||
|
@ -361,12 +364,13 @@ class BaseCleanupWorker(threading.Thread):
|
|||
self.log.info("ZooKeeper available. Resuming")
|
||||
|
||||
self._run()
|
||||
time.sleep(self._interval)
|
||||
self._stop_event.wait(self._interval)
|
||||
|
||||
self.log.info("Stopped")
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
self._stop_event.set()
|
||||
self.join()
|
||||
|
||||
|
||||
|
@ -674,20 +678,22 @@ class NodePool(threading.Thread):
|
|||
self.cleanup_interval = 60
|
||||
self.delete_interval = 5
|
||||
self._stopped = False
|
||||
self._stop_event = threading.Event()
|
||||
self.config = None
|
||||
self.zk = None
|
||||
self.statsd = stats.get_client()
|
||||
self._pool_threads = {}
|
||||
self._cleanup_thread = None
|
||||
self._delete_thread = None
|
||||
self._wake_condition = threading.Condition()
|
||||
self._submittedRequests = {}
|
||||
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
self._wake_condition.acquire()
|
||||
self._wake_condition.notify()
|
||||
self._wake_condition.release()
|
||||
self._stop_event.set()
|
||||
# Our run method can start new threads, so make sure it has
|
||||
# completed before we continue the shutdown.
|
||||
if self.isAlive():
|
||||
self.join()
|
||||
if self.config:
|
||||
provider_manager.ProviderManager.stopProviders(self.config)
|
||||
|
||||
|
@ -708,8 +714,6 @@ class NodePool(threading.Thread):
|
|||
self.log.debug("Waiting for %s" % thd.name)
|
||||
thd.join()
|
||||
|
||||
if self.isAlive():
|
||||
self.join()
|
||||
if self.zk:
|
||||
self.zk.disconnect()
|
||||
self.log.debug("Finished stopping")
|
||||
|
@ -945,6 +949,4 @@ class NodePool(threading.Thread):
|
|||
except Exception:
|
||||
self.log.exception("Exception in main loop:")
|
||||
|
||||
self._wake_condition.acquire()
|
||||
self._wake_condition.wait(self.watermark_sleep)
|
||||
self._wake_condition.release()
|
||||
self._stop_event.wait(self.watermark_sleep)
|
||||
|
|
Loading…
Reference in New Issue