Merge "Exit launchers and builders immediately"
This commit is contained in:
commit
1039b0fca5
|
@ -115,6 +115,7 @@ class BaseWorker(threading.Thread):
|
|||
self.log = logging.getLogger("nodepool.builder.BaseWorker")
|
||||
self.daemon = True
|
||||
self._running = False
|
||||
self._stop_event = threading.Event()
|
||||
self._config = None
|
||||
self._config_path = config_path
|
||||
self._secure_path = secure_path
|
||||
|
@ -141,6 +142,7 @@ class BaseWorker(threading.Thread):
|
|||
|
||||
def shutdown(self):
|
||||
self._running = False
|
||||
self._stop_event.set()
|
||||
|
||||
|
||||
class CleanupWorker(BaseWorker):
|
||||
|
@ -514,7 +516,7 @@ class CleanupWorker(BaseWorker):
|
|||
self.log.exception("Exception in CleanupWorker:")
|
||||
time.sleep(10)
|
||||
|
||||
time.sleep(self._interval)
|
||||
self._stop_event.wait(self._interval)
|
||||
|
||||
provider_manager.ProviderManager.stopProviders(self._config)
|
||||
|
||||
|
@ -804,7 +806,7 @@ class BuildWorker(BaseWorker):
|
|||
self.log.exception("Exception in BuildWorker:")
|
||||
time.sleep(10)
|
||||
|
||||
time.sleep(self._interval)
|
||||
self._stop_event.wait(self._interval)
|
||||
|
||||
def _run(self):
|
||||
'''
|
||||
|
@ -1066,7 +1068,7 @@ class UploadWorker(BaseWorker):
|
|||
self.log.exception("Exception in UploadWorker:")
|
||||
time.sleep(10)
|
||||
|
||||
time.sleep(self._interval)
|
||||
self._stop_event.wait(self._interval)
|
||||
|
||||
provider_manager.ProviderManager.stopProviders(self._config)
|
||||
|
||||
|
|
|
@ -132,6 +132,7 @@ class PoolWorker(threading.Thread):
|
|||
self.provider_name = provider_name
|
||||
self.pool_name = pool_name
|
||||
self.running = False
|
||||
self.stop_event = threading.Event()
|
||||
self.paused_handler = None
|
||||
self.request_handlers = []
|
||||
self.watermark_sleep = nodepool.watermark_sleep
|
||||
|
@ -299,7 +300,7 @@ class PoolWorker(threading.Thread):
|
|||
self._removeCompletedHandlers()
|
||||
except Exception:
|
||||
self.log.exception("Error in PoolWorker:")
|
||||
time.sleep(self.watermark_sleep)
|
||||
self.stop_event.wait(self.watermark_sleep)
|
||||
|
||||
# Cleanup on exit
|
||||
if self.paused_handler:
|
||||
|
@ -315,6 +316,7 @@ class PoolWorker(threading.Thread):
|
|||
'''
|
||||
self.log.info("%s received stop" % self.name)
|
||||
self.running = False
|
||||
self.stop_event.set()
|
||||
|
||||
|
||||
class BaseCleanupWorker(threading.Thread):
|
||||
|
@ -323,6 +325,7 @@ class BaseCleanupWorker(threading.Thread):
|
|||
self._nodepool = nodepool
|
||||
self._interval = interval
|
||||
self._running = False
|
||||
self._stop_event = threading.Event()
|
||||
|
||||
def _deleteInstance(self, node):
|
||||
'''
|
||||
|
@ -361,12 +364,13 @@ class BaseCleanupWorker(threading.Thread):
|
|||
self.log.info("ZooKeeper available. Resuming")
|
||||
|
||||
self._run()
|
||||
time.sleep(self._interval)
|
||||
self._stop_event.wait(self._interval)
|
||||
|
||||
self.log.info("Stopped")
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
self._stop_event.set()
|
||||
self.join()
|
||||
|
||||
|
||||
|
@ -674,20 +678,22 @@ class NodePool(threading.Thread):
|
|||
self.cleanup_interval = 60
|
||||
self.delete_interval = 5
|
||||
self._stopped = False
|
||||
self._stop_event = threading.Event()
|
||||
self.config = None
|
||||
self.zk = None
|
||||
self.statsd = stats.get_client()
|
||||
self._pool_threads = {}
|
||||
self._cleanup_thread = None
|
||||
self._delete_thread = None
|
||||
self._wake_condition = threading.Condition()
|
||||
self._submittedRequests = {}
|
||||
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
self._wake_condition.acquire()
|
||||
self._wake_condition.notify()
|
||||
self._wake_condition.release()
|
||||
self._stop_event.set()
|
||||
# Our run method can start new threads, so make sure it has
|
||||
# completed before we continue the shutdown.
|
||||
if self.isAlive():
|
||||
self.join()
|
||||
if self.config:
|
||||
provider_manager.ProviderManager.stopProviders(self.config)
|
||||
|
||||
|
@ -708,8 +714,6 @@ class NodePool(threading.Thread):
|
|||
self.log.debug("Waiting for %s" % thd.name)
|
||||
thd.join()
|
||||
|
||||
if self.isAlive():
|
||||
self.join()
|
||||
if self.zk:
|
||||
self.zk.disconnect()
|
||||
self.log.debug("Finished stopping")
|
||||
|
@ -945,6 +949,4 @@ class NodePool(threading.Thread):
|
|||
except Exception:
|
||||
self.log.exception("Exception in main loop:")
|
||||
|
||||
self._wake_condition.acquire()
|
||||
self._wake_condition.wait(self.watermark_sleep)
|
||||
self._wake_condition.release()
|
||||
self._stop_event.wait(self.watermark_sleep)
|
||||
|
|
Loading…
Reference in New Issue