Add support for keepalive to client

A gearman client only waiting for jobs will wait indefinitely if the
gearman server vanishes (e.g. due to a VM crash). In this case there
is no traffic on the connection and the client blocks forever if there
is nothing in between that forcefully terminates the connection.

Adding tcp keepalive can mitigate that and the connection will be
terminated by the kernel in this situation which then triggers a
reconnect.

Change-Id: I8589cd45450245a25539c051355b38d16ee9f4b9
This commit is contained in:
Tobias Henkel 2018-09-04 13:30:40 +02:00
parent c00ca944db
commit 71dbac070c
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
1 changed files with 28 additions and 3 deletions

View File

@ -124,16 +124,25 @@ class Connection(object):
:arg str client_id: The client ID associated with this connection.
It will be appending to the name of the logger (e.g.,
gear.Connection.client_id). Defaults to 'unknown'.
:arg bool keepalive: Whether to use TCP keepalives
:arg int tcp_keepidle: Idle time after which to start keepalives sending
:arg int tcp_keepintvl: Interval in seconds between TCP keepalives
:arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
"""
def __init__(self, host, port, ssl_key=None, ssl_cert=None, ssl_ca=None,
client_id='unknown'):
client_id='unknown', keepalive=False, tcp_keepidle=7200,
tcp_keepintvl=75, tcp_keepcnt=9):
self.log = logging.getLogger("gear.Connection.%s" % (client_id,))
self.host = host
self.port = port
self.ssl_key = ssl_key
self.ssl_cert = ssl_cert
self.ssl_ca = ssl_ca
self.keepalive = keepalive
self.tcp_keepcnt = tcp_keepcnt
self.tcp_keepintvl = tcp_keepintvl
self.tcp_keepidle = tcp_keepidle
self.use_ssl = False
if all([self.ssl_key, self.ssl_cert, self.ssl_ca]):
@ -182,6 +191,14 @@ class Connection(object):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
if self.keepalive:
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
self.tcp_keepidle)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL,
self.tcp_keepintvl)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT,
self.tcp_keepcnt)
except socket.error:
s = None
continue
@ -1162,7 +1179,9 @@ class BaseClient(BaseClientServer):
self.broadcast_lock = threading.RLock()
def addServer(self, host, port=4730,
ssl_key=None, ssl_cert=None, ssl_ca=None):
ssl_key=None, ssl_cert=None, ssl_ca=None,
keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75,
tcp_keepcnt=9):
"""Add a server to the client's connection pool.
Any number of Gearman servers may be added to a client. The
@ -1184,6 +1203,11 @@ class BaseClient(BaseClientServer):
:arg str ssl_key: Path to the SSL private key.
:arg str ssl_cert: Path to the SSL certificate.
:arg str ssl_ca: Path to the CA certificate.
:arg bool keepalive: Whether to use TCP keepalives
:arg int tcp_keepidle: Idle time after which to start keepalives
sending
:arg int tcp_keepintvl: Interval in seconds between TCP keepalives
:arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
:raises ConfigurationError: If the host/port combination has
already been added to the client.
"""
@ -1196,7 +1220,8 @@ class BaseClient(BaseClientServer):
if conn.host == host and conn.port == port:
raise ConfigurationError("Host/port already specified")
conn = Connection(host, port, ssl_key, ssl_cert, ssl_ca,
self.client_id)
self.client_id, keepalive, tcp_keepidle,
tcp_keepintvl, tcp_keepcnt)
self.inactive_connections.append(conn)
self.connections_condition.notifyAll()
finally: