Add support for keepalive to client
A gearman client only waiting for jobs will wait indefinitely if the gearman server vanishes (e.g. due to a VM crash). In this case there is no traffic on the connection and the client blocks forever if there is nothing in between that forcefully terminates the connection. Adding tcp keepalive can mitigate that and the connection will be terminated by the kernel in this situation which then triggers a reconnect. Change-Id: I8589cd45450245a25539c051355b38d16ee9f4b9
This commit is contained in:
parent
c00ca944db
commit
71dbac070c
|
@ -124,16 +124,25 @@ class Connection(object):
|
|||
:arg str client_id: The client ID associated with this connection.
|
||||
It will be appending to the name of the logger (e.g.,
|
||||
gear.Connection.client_id). Defaults to 'unknown'.
|
||||
:arg bool keepalive: Whether to use TCP keepalives
|
||||
:arg int tcp_keepidle: Idle time after which to start keepalives sending
|
||||
:arg int tcp_keepintvl: Interval in seconds between TCP keepalives
|
||||
:arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
|
||||
"""
|
||||
|
||||
def __init__(self, host, port, ssl_key=None, ssl_cert=None, ssl_ca=None,
|
||||
client_id='unknown'):
|
||||
client_id='unknown', keepalive=False, tcp_keepidle=7200,
|
||||
tcp_keepintvl=75, tcp_keepcnt=9):
|
||||
self.log = logging.getLogger("gear.Connection.%s" % (client_id,))
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.ssl_key = ssl_key
|
||||
self.ssl_cert = ssl_cert
|
||||
self.ssl_ca = ssl_ca
|
||||
self.keepalive = keepalive
|
||||
self.tcp_keepcnt = tcp_keepcnt
|
||||
self.tcp_keepintvl = tcp_keepintvl
|
||||
self.tcp_keepidle = tcp_keepidle
|
||||
|
||||
self.use_ssl = False
|
||||
if all([self.ssl_key, self.ssl_cert, self.ssl_ca]):
|
||||
|
@ -182,6 +191,14 @@ class Connection(object):
|
|||
af, socktype, proto, canonname, sa = res
|
||||
try:
|
||||
s = socket.socket(af, socktype, proto)
|
||||
if self.keepalive:
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
|
||||
self.tcp_keepidle)
|
||||
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL,
|
||||
self.tcp_keepintvl)
|
||||
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT,
|
||||
self.tcp_keepcnt)
|
||||
except socket.error:
|
||||
s = None
|
||||
continue
|
||||
|
@ -1162,7 +1179,9 @@ class BaseClient(BaseClientServer):
|
|||
self.broadcast_lock = threading.RLock()
|
||||
|
||||
def addServer(self, host, port=4730,
|
||||
ssl_key=None, ssl_cert=None, ssl_ca=None):
|
||||
ssl_key=None, ssl_cert=None, ssl_ca=None,
|
||||
keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75,
|
||||
tcp_keepcnt=9):
|
||||
"""Add a server to the client's connection pool.
|
||||
|
||||
Any number of Gearman servers may be added to a client. The
|
||||
|
@ -1184,6 +1203,11 @@ class BaseClient(BaseClientServer):
|
|||
:arg str ssl_key: Path to the SSL private key.
|
||||
:arg str ssl_cert: Path to the SSL certificate.
|
||||
:arg str ssl_ca: Path to the CA certificate.
|
||||
:arg bool keepalive: Whether to use TCP keepalives
|
||||
:arg int tcp_keepidle: Idle time after which to start keepalives
|
||||
sending
|
||||
:arg int tcp_keepintvl: Interval in seconds between TCP keepalives
|
||||
:arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
|
||||
:raises ConfigurationError: If the host/port combination has
|
||||
already been added to the client.
|
||||
"""
|
||||
|
@ -1196,7 +1220,8 @@ class BaseClient(BaseClientServer):
|
|||
if conn.host == host and conn.port == port:
|
||||
raise ConfigurationError("Host/port already specified")
|
||||
conn = Connection(host, port, ssl_key, ssl_cert, ssl_ca,
|
||||
self.client_id)
|
||||
self.client_id, keepalive, tcp_keepidle,
|
||||
tcp_keepintvl, tcp_keepcnt)
|
||||
self.inactive_connections.append(conn)
|
||||
self.connections_condition.notifyAll()
|
||||
finally:
|
||||
|
|
Loading…
Reference in New Issue