From c1ea16e0b3fb88d3e3ec1cefe5dfe6aae39f34b3 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 21 Oct 2013 11:06:50 -0700 Subject: [PATCH] Add connection attempt limit Add a _connect() method that connects to the ssh server; if the connection fails, it is retried (waiting retry_delay seconds between attempts) until the configured number of connection attempts is exhausted, at which point the exception is re-raised and the thread dies. If connecting works, consume from the given client, and if that fails, allow for retrying using a new client (after the given retry_delay). To maintain previous behavior, connecting is attempted forever when connection_attempts is <= 0, which is the default (connection_attempts=-1). Change-Id: I1a2cb764c7cef04b8137a7e4b05d9a4cc45498b0 --- gerritlib/gerrit.py | 95 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 80 insertions(+), 15 deletions(-) diff --git a/gerritlib/gerrit.py b/gerritlib/gerrit.py index f3c84c4..f9dfa00 100644 --- a/gerritlib/gerrit.py +++ b/gerritlib/gerrit.py @@ -29,7 +29,7 @@ class GerritWatcher(object): def __init__( self, gerrit, username=None, hostname=None, port=None, - keyfile=None): + keyfile=None, connection_attempts=-1, retry_delay=5): """Create a GerritWatcher. :param gerrit: A Gerrit instance to pass events to. @@ -37,11 +37,15 @@ class GerritWatcher(object): All other parameters are optional and if not supplied are sourced from the gerrit instance. 
""" + assert retry_delay >= 0, "Retry delay must be >= 0" self.username = username or gerrit.username self.keyfile = keyfile or gerrit.keyfile self.hostname = hostname or gerrit.hostname self.port = port or gerrit.port self.gerrit = gerrit + self.connection_attempts = int(connection_attempts) + self.retry_delay = float(retry_delay) + self.connected = False def _read(self, fd): l = fd.readline() @@ -62,28 +66,89 @@ class GerritWatcher(object): else: raise Exception("event on ssh connection") - def _run(self): - try: + def _connect(self): + """Attempts to connect and returns the connected client.""" + + def _make_client(): client = paramiko.SSHClient() client.load_system_host_keys() client.set_missing_host_key_policy(paramiko.WarningPolicy()) - client.connect(self.hostname, - username=self.username, - port=self.port, - key_filename=self.keyfile) + return client - stdin, stdout, stderr = client.exec_command("gerrit stream-events") + def _attempt_gen(connection_attempts, retry_delay): + if connection_attempts <= 0: + attempt = 1 + while True: + yield (attempt, retry_delay) + attempt += 1 + else: + for attempt in range(1, connection_attempts + 1): + if attempt < connection_attempts: + yield (attempt, retry_delay) + else: + # No more attempts left after this one, (signal this by + # not returning a valid retry_delay). 
+ yield (attempt, None) - self._listen(stdout, stderr) + for (attempt, retry_delay) in _attempt_gen(self.connection_attempts, + self.retry_delay): + self.log.debug("Connection attempt %s to %s:%s (retry_delay=%s)", + attempt, self.hostname, self.port, retry_delay) + client = None + try: + client = _make_client() + client.connect(self.hostname, + username=self.username, + port=self.port, + key_filename=self.keyfile) + return client + except (IOError, paramiko.SSHException) as e: + self.log.exception("Exception connecting to %s:%s", + self.hostname, self.port) + if client: + try: + client.close() + except (IOError, paramiko.SSHException): + self.log.exception("Failure closing broken client") + if retry_delay is not None: + if retry_delay > 0: + self.log.info("Trying again in %s seconds", + retry_delay) + time.sleep(retry_delay) + else: + raise e - ret = stdout.channel.recv_exit_status() - self.log.debug("SSH exit status: %s" % ret) + def _consume(self, client): + """Consumes events using the given client.""" + stdin, stdout, stderr = client.exec_command("gerrit stream-events") - if ret: - raise Exception("Gerrit error executing stream-events") + self._listen(stdout, stderr) + + ret = stdout.channel.recv_exit_status() + self.log.debug("SSH exit status: %s" % ret) + + if ret: + raise Exception("Gerrit error executing stream-events:" + " return code %s" % ret) + + def _run(self): + self.connected = False + client = self._connect() + self.connected = True + try: + self._consume(client) except Exception: - self.log.exception("Exception on ssh event stream:") - time.sleep(5) + # NOTE(harlowja): allow consuming failures to *always* be retryable + self.log.exception("Exception consuming ssh event stream:") + if client: + try: + client.close() + except (IOError, paramiko.SSHException): + self.log.exception("Failure closing broken client") + if self.retry_delay > 0: + self.log.info("Delaying consumption retry for %s seconds", + self.retry_delay) + time.sleep(self.retry_delay) 
def run(self): while True: