diff --git a/gerritlib/gerrit.py b/gerritlib/gerrit.py index 8d2e7ac..d53f7db 100644 --- a/gerritlib/gerrit.py +++ b/gerritlib/gerrit.py @@ -29,7 +29,7 @@ class GerritWatcher(object): def __init__( self, gerrit, username=None, hostname=None, port=None, - keyfile=None): + keyfile=None, connection_attempts=-1, retry_delay=5): """Create a GerritWatcher. :param gerrit: A Gerrit instance to pass events to. @@ -37,11 +37,15 @@ class GerritWatcher(object): All other parameters are optional and if not supplied are sourced from the gerrit instance. """ + assert retry_delay >= 0, "Retry delay must be >= 0" self.username = username or gerrit.username self.keyfile = keyfile or gerrit.keyfile self.hostname = hostname or gerrit.hostname self.port = port or gerrit.port self.gerrit = gerrit + self.connection_attempts = int(connection_attempts) + self.retry_delay = float(retry_delay) + self.connected = False def _read(self, fd): l = fd.readline() @@ -62,28 +66,89 @@ class GerritWatcher(object): else: raise Exception("event on ssh connection") - def _run(self): - try: + def _connect(self): + """Attempts to connect and returns the connected client.""" + + def _make_client(): client = paramiko.SSHClient() client.load_system_host_keys() client.set_missing_host_key_policy(paramiko.WarningPolicy()) - client.connect(self.hostname, - username=self.username, - port=self.port, - key_filename=self.keyfile) + return client - stdin, stdout, stderr = client.exec_command("gerrit stream-events") + def _attempt_gen(connection_attempts, retry_delay): + if connection_attempts <= 0: + attempt = 1 + while True: + yield (attempt, retry_delay) + attempt += 1 + else: + for attempt in range(1, connection_attempts + 1): + if attempt < connection_attempts: + yield (attempt, retry_delay) + else: + # No more attempts left after this one, (signal this by + # not returning a valid retry_delay). + yield (attempt, None) - self._listen(stdout, stderr) + for (attempt, retry_delay) in _attempt_gen(self.connection_attempts, + self.retry_delay): + self.log.debug("Connection attempt %s to %s:%s (retry_delay=%s)", + attempt, self.hostname, self.port, retry_delay) + client = None + try: + client = _make_client() + client.connect(self.hostname, + username=self.username, + port=self.port, + key_filename=self.keyfile) + return client + except (IOError, paramiko.SSHException) as e: + self.log.exception("Exception connecting to %s:%s", + self.hostname, self.port) + if client: + try: + client.close() + except (IOError, paramiko.SSHException): + self.log.exception("Failure closing broken client") + if retry_delay is not None: + if retry_delay > 0: + self.log.info("Trying again in %s seconds", + retry_delay) + time.sleep(retry_delay) + else: + raise e - ret = stdout.channel.recv_exit_status() - self.log.debug("SSH exit status: %s" % ret) + def _consume(self, client): + """Consumes events using the given client.""" + stdin, stdout, stderr = client.exec_command("gerrit stream-events") - if ret: - raise Exception("Gerrit error executing stream-events") + self._listen(stdout, stderr) + + ret = stdout.channel.recv_exit_status() + self.log.debug("SSH exit status: %s" % ret) + + if ret: + raise Exception("Gerrit error executing stream-events:" + " return code %s" % ret) + + def _run(self): + self.connected = False + client = self._connect() + self.connected = True + try: + self._consume(client) except Exception: - self.log.exception("Exception on ssh event stream:") - time.sleep(5) + # NOTE(harlowja): allow consuming failures to *always* be retryable + self.log.exception("Exception consuming ssh event stream:") + if client: + try: + client.close() + except (IOError, paramiko.SSHException): + self.log.exception("Failure closing broken client") + if self.retry_delay > 0: + self.log.info("Delaying consumption retry for %s seconds", + self.retry_delay) + time.sleep(self.retry_delay) def run(self): while True: