Merge "Add connection attempt limit"
This commit is contained in:
commit
f1544a182c
|
@ -29,7 +29,7 @@ class GerritWatcher(object):
|
|||
|
||||
def __init__(
|
||||
self, gerrit, username=None, hostname=None, port=None,
|
||||
keyfile=None):
|
||||
keyfile=None, connection_attempts=-1, retry_delay=5):
|
||||
"""Create a GerritWatcher.
|
||||
|
||||
:param gerrit: A Gerrit instance to pass events to.
|
||||
|
@ -37,11 +37,15 @@ class GerritWatcher(object):
|
|||
All other parameters are optional and if not supplied are sourced from
|
||||
the gerrit instance.
|
||||
"""
|
||||
assert retry_delay >= 0, "Retry delay must be >= 0"
|
||||
self.username = username or gerrit.username
|
||||
self.keyfile = keyfile or gerrit.keyfile
|
||||
self.hostname = hostname or gerrit.hostname
|
||||
self.port = port or gerrit.port
|
||||
self.gerrit = gerrit
|
||||
self.connection_attempts = int(connection_attempts)
|
||||
self.retry_delay = float(retry_delay)
|
||||
self.connected = False
|
||||
|
||||
def _read(self, fd):
|
||||
l = fd.readline()
|
||||
|
@ -62,28 +66,89 @@ class GerritWatcher(object):
|
|||
else:
|
||||
raise Exception("event on ssh connection")
|
||||
|
||||
def _run(self):
|
||||
try:
|
||||
def _connect(self):
|
||||
"""Attempts to connect and returns the connected client."""
|
||||
|
||||
def _make_client():
|
||||
client = paramiko.SSHClient()
|
||||
client.load_system_host_keys()
|
||||
client.set_missing_host_key_policy(paramiko.WarningPolicy())
|
||||
client.connect(self.hostname,
|
||||
username=self.username,
|
||||
port=self.port,
|
||||
key_filename=self.keyfile)
|
||||
return client
|
||||
|
||||
stdin, stdout, stderr = client.exec_command("gerrit stream-events")
|
||||
def _attempt_gen(connection_attempts, retry_delay):
|
||||
if connection_attempts <= 0:
|
||||
attempt = 1
|
||||
while True:
|
||||
yield (attempt, retry_delay)
|
||||
attempt += 1
|
||||
else:
|
||||
for attempt in range(1, connection_attempts + 1):
|
||||
if attempt < connection_attempts:
|
||||
yield (attempt, retry_delay)
|
||||
else:
|
||||
# No more attempts left after this one, (signal this by
|
||||
# not returning a valid retry_delay).
|
||||
yield (attempt, None)
|
||||
|
||||
self._listen(stdout, stderr)
|
||||
for (attempt, retry_delay) in _attempt_gen(self.connection_attempts,
|
||||
self.retry_delay):
|
||||
self.log.debug("Connection attempt %s to %s:%s (retry_delay=%s)",
|
||||
attempt, self.hostname, self.port, retry_delay)
|
||||
client = None
|
||||
try:
|
||||
client = _make_client()
|
||||
client.connect(self.hostname,
|
||||
username=self.username,
|
||||
port=self.port,
|
||||
key_filename=self.keyfile)
|
||||
return client
|
||||
except (IOError, paramiko.SSHException) as e:
|
||||
self.log.exception("Exception connecting to %s:%s",
|
||||
self.hostname, self.port)
|
||||
if client:
|
||||
try:
|
||||
client.close()
|
||||
except (IOError, paramiko.SSHException):
|
||||
self.log.exception("Failure closing broken client")
|
||||
if retry_delay is not None:
|
||||
if retry_delay > 0:
|
||||
self.log.info("Trying again in %s seconds",
|
||||
retry_delay)
|
||||
time.sleep(retry_delay)
|
||||
else:
|
||||
raise e
|
||||
|
||||
ret = stdout.channel.recv_exit_status()
|
||||
self.log.debug("SSH exit status: %s" % ret)
|
||||
def _consume(self, client):
|
||||
"""Consumes events using the given client."""
|
||||
stdin, stdout, stderr = client.exec_command("gerrit stream-events")
|
||||
|
||||
if ret:
|
||||
raise Exception("Gerrit error executing stream-events")
|
||||
self._listen(stdout, stderr)
|
||||
|
||||
ret = stdout.channel.recv_exit_status()
|
||||
self.log.debug("SSH exit status: %s" % ret)
|
||||
|
||||
if ret:
|
||||
raise Exception("Gerrit error executing stream-events:"
|
||||
" return code %s" % ret)
|
||||
|
||||
def _run(self):
|
||||
self.connected = False
|
||||
client = self._connect()
|
||||
self.connected = True
|
||||
try:
|
||||
self._consume(client)
|
||||
except Exception:
|
||||
self.log.exception("Exception on ssh event stream:")
|
||||
time.sleep(5)
|
||||
# NOTE(harlowja): allow consuming failures to *always* be retryable
|
||||
self.log.exception("Exception consuming ssh event stream:")
|
||||
if client:
|
||||
try:
|
||||
client.close()
|
||||
except (IOError, paramiko.SSHException):
|
||||
self.log.exception("Failure closing broken client")
|
||||
if self.retry_delay > 0:
|
||||
self.log.info("Delaying consumption retry for %s seconds",
|
||||
self.retry_delay)
|
||||
time.sleep(self.retry_delay)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
|
|
Loading…
Reference in New Issue