Add connection attempt limit

Add a connect() function that connects
to the ssh server, if this fails it
does a retry algorithm until that fails
and then the thread dies.

If that works, consume from the given client
and if that fails, allow for retrying using
a new client (after a given consume_retry_delay
delay).

To maintain previous behavior the default of
attempting to connect forever is provided when the
number of connection_attempts is <= 0.

Change-Id: I1a2cb764c7cef04b8137a7e4b05d9a4cc45498b0
This commit is contained in:
Joshua Harlow 2013-10-21 11:06:50 -07:00 committed by Joshua Harlow
parent edb43f70f4
commit c1ea16e0b3
1 changed files with 80 additions and 15 deletions

View File

@ -29,7 +29,7 @@ class GerritWatcher(object):
def __init__(
self, gerrit, username=None, hostname=None, port=None,
keyfile=None):
keyfile=None, connection_attempts=-1, retry_delay=5):
"""Create a GerritWatcher.
:param gerrit: A Gerrit instance to pass events to.
@ -37,11 +37,15 @@ class GerritWatcher(object):
All other parameters are optional and if not supplied are sourced from
the gerrit instance.
"""
assert retry_delay >= 0, "Retry delay must be >= 0"
self.username = username or gerrit.username
self.keyfile = keyfile or gerrit.keyfile
self.hostname = hostname or gerrit.hostname
self.port = port or gerrit.port
self.gerrit = gerrit
self.connection_attempts = int(connection_attempts)
self.retry_delay = float(retry_delay)
self.connected = False
def _read(self, fd):
l = fd.readline()
@ -62,28 +66,89 @@ class GerritWatcher(object):
else:
raise Exception("event on ssh connection")
def _run(self):
try:
def _connect(self):
"""Attempts to connect and returns the connected client."""
def _make_client():
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.WarningPolicy())
client.connect(self.hostname,
username=self.username,
port=self.port,
key_filename=self.keyfile)
return client
stdin, stdout, stderr = client.exec_command("gerrit stream-events")
def _attempt_gen(connection_attempts, retry_delay):
if connection_attempts <= 0:
attempt = 1
while True:
yield (attempt, retry_delay)
attempt += 1
else:
for attempt in range(1, connection_attempts + 1):
if attempt < connection_attempts:
yield (attempt, retry_delay)
else:
# No more attempts left after this one, (signal this by
# not returning a valid retry_delay).
yield (attempt, None)
self._listen(stdout, stderr)
for (attempt, retry_delay) in _attempt_gen(self.connection_attempts,
self.retry_delay):
self.log.debug("Connection attempt %s to %s:%s (retry_delay=%s)",
attempt, self.hostname, self.port, retry_delay)
client = None
try:
client = _make_client()
client.connect(self.hostname,
username=self.username,
port=self.port,
key_filename=self.keyfile)
return client
except (IOError, paramiko.SSHException) as e:
self.log.exception("Exception connecting to %s:%s",
self.hostname, self.port)
if client:
try:
client.close()
except (IOError, paramiko.SSHException):
self.log.exception("Failure closing broken client")
if retry_delay is not None:
if retry_delay > 0:
self.log.info("Trying again in %s seconds",
retry_delay)
time.sleep(retry_delay)
else:
raise e
ret = stdout.channel.recv_exit_status()
self.log.debug("SSH exit status: %s" % ret)
def _consume(self, client):
"""Consumes events using the given client."""
stdin, stdout, stderr = client.exec_command("gerrit stream-events")
if ret:
raise Exception("Gerrit error executing stream-events")
self._listen(stdout, stderr)
ret = stdout.channel.recv_exit_status()
self.log.debug("SSH exit status: %s" % ret)
if ret:
raise Exception("Gerrit error executing stream-events:"
" return code %s" % ret)
def _run(self):
self.connected = False
client = self._connect()
self.connected = True
try:
self._consume(client)
except Exception:
self.log.exception("Exception on ssh event stream:")
time.sleep(5)
# NOTE(harlowja): allow consuming failures to *always* be retryable
self.log.exception("Exception consuming ssh event stream:")
if client:
try:
client.close()
except (IOError, paramiko.SSHException):
self.log.exception("Failure closing broken client")
if self.retry_delay > 0:
self.log.info("Delaying consumption retry for %s seconds",
self.retry_delay)
time.sleep(self.retry_delay)
def run(self):
while True: