From 71dbac070c7e6a74a192197ce6a6ce9cc65ee0f9 Mon Sep 17 00:00:00 2001 From: Tobias Henkel Date: Tue, 4 Sep 2018 13:30:40 +0200 Subject: [PATCH] Add support for keepalive to client A gearman client only waiting for jobs will wait indefinitely if the gearman server vanishes (e.g. due to a VM crash). In this case there is no traffic on the connection and the client blocks forever if there is nothing in between that forcefully terminates the connection. Adding tcp keepalive can mitigate that and the connection will be terminated by the kernel in this situation which then triggers a reconnect. Change-Id: I8589cd45450245a25539c051355b38d16ee9f4b9 --- gear/__init__.py | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/gear/__init__.py b/gear/__init__.py index 4ed674e..57d80e3 100644 --- a/gear/__init__.py +++ b/gear/__init__.py @@ -124,16 +124,25 @@ class Connection(object): :arg str client_id: The client ID associated with this connection. It will be appending to the name of the logger (e.g., gear.Connection.client_id). Defaults to 'unknown'. + :arg bool keepalive: Whether to use TCP keepalives + :arg int tcp_keepidle: Idle time after which to start keepalives sending + :arg int tcp_keepintvl: Interval in seconds between TCP keepalives + :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect """ def __init__(self, host, port, ssl_key=None, ssl_cert=None, ssl_ca=None, - client_id='unknown'): + client_id='unknown', keepalive=False, tcp_keepidle=7200, + tcp_keepintvl=75, tcp_keepcnt=9): self.log = logging.getLogger("gear.Connection.%s" % (client_id,)) self.host = host self.port = port self.ssl_key = ssl_key self.ssl_cert = ssl_cert self.ssl_ca = ssl_ca + self.keepalive = keepalive + self.tcp_keepcnt = tcp_keepcnt + self.tcp_keepintvl = tcp_keepintvl + self.tcp_keepidle = tcp_keepidle self.use_ssl = False if all([self.ssl_key, self.ssl_cert, self.ssl_ca]): @@ -182,6 +191,14 @@ class Connection(object): af, socktype, proto, canonname, sa = res try: s = socket.socket(af, socktype, proto) + if self.keepalive: + s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, + self.tcp_keepidle) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, + self.tcp_keepintvl) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, + self.tcp_keepcnt) except socket.error: s = None continue @@ -1162,7 +1179,9 @@ class BaseClient(BaseClientServer): self.broadcast_lock = threading.RLock() def addServer(self, host, port=4730, - ssl_key=None, ssl_cert=None, ssl_ca=None): + ssl_key=None, ssl_cert=None, ssl_ca=None, + keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75, + tcp_keepcnt=9): """Add a server to the client's connection pool. Any number of Gearman servers may be added to a client. The @@ -1184,6 +1203,11 @@ class BaseClient(BaseClientServer): :arg str ssl_key: Path to the SSL private key. :arg str ssl_cert: Path to the SSL certificate. :arg str ssl_ca: Path to the CA certificate. + :arg bool keepalive: Whether to use TCP keepalives + :arg int tcp_keepidle: Idle time after which to start keepalives + sending + :arg int tcp_keepintvl: Interval in seconds between TCP keepalives + :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect :raises ConfigurationError: If the host/port combination has already been added to the client. """ @@ -1196,7 +1220,8 @@ class BaseClient(BaseClientServer): if conn.host == host and conn.port == port: raise ConfigurationError("Host/port already specified") conn = Connection(host, port, ssl_key, ssl_cert, ssl_ca, - self.client_id) + self.client_id, keepalive, tcp_keepidle, + tcp_keepintvl, tcp_keepcnt) self.inactive_connections.append(conn) self.connections_condition.notifyAll() finally: