diff --git a/gerritlib/gerrit.py b/gerritlib/gerrit.py index 9a38ed9..63d008a 100644 --- a/gerritlib/gerrit.py +++ b/gerritlib/gerrit.py @@ -23,8 +23,15 @@ import time import paramiko +CONNECTED = 'connected' +CONNECTING = 'connecting' +CONSUMING = 'consuming' +DEAD = 'dead' +DISCONNECTED = 'disconnected' +IDLE = 'idle' -class GerritWatcher(object): + +class GerritWatcher(threading.Thread): log = logging.getLogger("gerrit.GerritWatcher") def __init__( @@ -37,6 +44,7 @@ class GerritWatcher(object): All other parameters are optional and if not supplied are sourced from the gerrit instance. """ + super(GerritWatcher, self).__init__() assert retry_delay >= 0, "Retry delay must be >= 0" self.username = username or gerrit.username self.keyfile = keyfile or gerrit.keyfile @@ -45,7 +53,7 @@ class GerritWatcher(object): self.gerrit = gerrit self.connection_attempts = int(connection_attempts) self.retry_delay = float(retry_delay) - self.connected = False + self.state = IDLE def _read(self, fd): l = fd.readline() @@ -122,6 +130,7 @@ class GerritWatcher(object): """Consumes events using the given client.""" stdin, stdout, stderr = client.exec_command("gerrit stream-events") + self.state = CONSUMING self._listen(stdout, stderr) ret = stdout.channel.recv_exit_status() @@ -132,9 +141,9 @@ class GerritWatcher(object): " return code %s" % ret) def _run(self): - self.connected = False + self.state = CONNECTING client = self._connect() - self.connected = True + self.state = CONNECTED try: self._consume(client) except Exception: @@ -145,14 +154,19 @@ class GerritWatcher(object): client.close() except (IOError, paramiko.SSHException): self.log.exception("Failure closing broken client") + self.state = DISCONNECTED if self.retry_delay > 0: self.log.info("Delaying consumption retry for %s seconds", self.retry_delay) time.sleep(self.retry_delay) def run(self): - while True: - self._run() + try: + while True: + self.state = DISCONNECTED + self._run() + finally: + self.state = DEAD class Gerrit(object): @@ -171,7 +185,7 @@ class Gerrit(object): watcher = GerritWatcher(self, connection_attempts=connection_attempts, retry_delay=retry_delay) - self.watcher_thread = threading.Thread(target=watcher.run) + self.watcher_thread = watcher self.watcher_thread.daemon = True self.watcher_thread.start()