Expose the gerrit watcher as a thread with defined transitions

Instead of just maintaining a simple boolean that denotes
whether the thread is connected or not, expose a more rich
set of state transitions that can be used by external users
to know exactly the state the thread is currently in.

This new list is:

* IDLE (not initialized/started running)
* DISCONNECTED (not connected to gerrit)
* CONNECTING (attempting to connect)
* CONNECTED (connected to gerrit)
* CONSUMING (consuming events from gerrit)
* DEAD (thread has exited its run method)

The state transitions are the following:

IDLE -> DISCONNECTED
DISCONNECTED -> CONNECTING
CONNECTING -> CONNECTED
CONNECTED -> CONSUMING
CONSUMING -> DISCONNECTED (on consuming failure)
CONNECTING -> DEAD (if no more retries)

Change-Id: Ib4ecef4f093b6d6925bc4b553020e15111248617
This commit is contained in:
Joshua Harlow 2014-02-01 22:01:23 -08:00
parent 996343b115
commit a68b5972f5
1 changed files with 21 additions and 7 deletions

View File

@ -23,8 +23,15 @@ import time
import paramiko
CONNECTED = 'connected'
CONNECTING = 'connecting'
CONSUMING = 'consuming'
DEAD = 'dead'
DISCONNECTED = 'disconnected'
IDLE = 'idle'
class GerritWatcher(object):
class GerritWatcher(threading.Thread):
log = logging.getLogger("gerrit.GerritWatcher")
def __init__(
@ -37,6 +44,7 @@ class GerritWatcher(object):
All other parameters are optional and if not supplied are sourced from
the gerrit instance.
"""
super(GerritWatcher, self).__init__()
assert retry_delay >= 0, "Retry delay must be >= 0"
self.username = username or gerrit.username
self.keyfile = keyfile or gerrit.keyfile
@ -45,7 +53,7 @@ class GerritWatcher(object):
self.gerrit = gerrit
self.connection_attempts = int(connection_attempts)
self.retry_delay = float(retry_delay)
self.connected = False
self.state = IDLE
def _read(self, fd):
l = fd.readline()
@ -122,6 +130,7 @@ class GerritWatcher(object):
"""Consumes events using the given client."""
stdin, stdout, stderr = client.exec_command("gerrit stream-events")
self.state = CONSUMING
self._listen(stdout, stderr)
ret = stdout.channel.recv_exit_status()
@ -132,9 +141,9 @@ class GerritWatcher(object):
" return code %s" % ret)
def _run(self):
self.connected = False
self.state = CONNECTING
client = self._connect()
self.connected = True
self.state = CONNECTED
try:
self._consume(client)
except Exception:
@ -145,14 +154,19 @@ class GerritWatcher(object):
client.close()
except (IOError, paramiko.SSHException):
self.log.exception("Failure closing broken client")
self.state = DISCONNECTED
if self.retry_delay > 0:
self.log.info("Delaying consumption retry for %s seconds",
self.retry_delay)
time.sleep(self.retry_delay)
def run(self):
while True:
self._run()
try:
while True:
self.state = DISCONNECTED
self._run()
finally:
self.state = DEAD
class Gerrit(object):
@ -171,7 +185,7 @@ class Gerrit(object):
watcher = GerritWatcher(self,
connection_attempts=connection_attempts,
retry_delay=retry_delay)
self.watcher_thread = threading.Thread(target=watcher.run)
self.watcher_thread = watcher
self.watcher_thread.daemon = True
self.watcher_thread.start()