From 539e376978aa399a7df50bf360855079ec476969 Mon Sep 17 00:00:00 2001 From: Oded Le'Sage Date: Tue, 11 Feb 2020 11:32:52 -0600 Subject: [PATCH] Update hung process/thread approach The previous commit attempted to prevent a hung process/thread by using a "tcp ping" to determine if the server_endpoint was reachable before deploying the heat stack and starting the heartbeat thread. While this approach works well in theory, in practice we're seeing a lot of random errors in a live K8 environment: [Errno 111] Connection refused: ConnectionRefusedError There could be numerous reasons for these random connection errors, but ZMQ has retry logic which should overcome these problems. This commit updates the sockets used in ZMQ to add a timeout (a padded value of agent_loss_timeout). While this does not prevent the creation of a heat stack and heartbeat thread that might never respond, it does solve the initial problem of stuck processes/threads and allows a clean exit. Change-Id: I8193c72120b459c2a18d780d9f8799e8df592e20 --- shaker/engine/messaging.py | 113 +++++-------------------------------- 1 file changed, 13 insertions(+), 100 deletions(-) diff --git a/shaker/engine/messaging.py b/shaker/engine/messaging.py index d8be500..318f683 100644 --- a/shaker/engine/messaging.py +++ b/shaker/engine/messaging.py @@ -14,15 +14,14 @@ # limitations under the License. import multiprocessing -import socket -import time import zmq +from oslo_config import cfg from oslo_log import log as logging + from shaker.agent import agent from shaker.engine import utils -from six.moves import zip_longest -from timeit import default_timer as timer + LOG = logging.getLogger(__name__) @@ -38,20 +37,6 @@ class MessageQueue(object): self.socket.bind("tcp://*:%s" % port) LOG.info('Listening on *:%s', port) - # Test that endpoint is actually reachable - # otherwise the process will get stuck indefinately waiting - # for a REQ/REP that will never happen. 
- # The code to support this was adapted from pypi package tcping - try: - LOG.info("Testing route to %s" % endpoint) - ping_test = Ping(ip, port) - ping_test.ping(3) - if ping_test._success == 0: - raise socket.timeout("No valid route to %s" % endpoint) - except socket.error as e: - LOG.exception(e) - raise - heartbeat = multiprocessing.Process( target=agent.work, kwargs=dict(agent_id=HEARTBEAT_AGENT, endpoint=endpoint, @@ -62,6 +47,16 @@ class MessageQueue(object): def __iter__(self): try: while True: + # Set a timeout(ms) based on agent_loss_timeout. + # Add some padding so agent_loss_timeout + # has a chance to fire correctly before the socket closes + client_timeout = (cfg.CONF.agent_loss_timeout * 1000) + 3000 + # This prevents a hung process/thread when + # server_endpoint is not reachable + self.socket.setsockopt(zmq.RCVTIMEO, client_timeout) + # If this is not set to 0 the socket will not + # close even if timeout is reached + self.socket.setsockopt(zmq.LINGER, 0) # Wait for next request from client message = self.socket.recv_json() LOG.debug('Received request: %s', message) @@ -81,85 +76,3 @@ class MessageQueue(object): else: LOG.exception(e) raise - - -class Socket(object): - def __init__(self, family, type_, timeout): - s = socket.socket(family, type_) - s.settimeout(timeout) - self._s = s - - def connect(self, host, port=80): - self._s.connect((host, int(port))) - - def shutdown(self): - self._s.shutdown(socket.SHUT_RD) - - def close(self): - self._s.close() - - -class Timer(object): - def __init__(self): - self._start = 0 - self._stop = 0 - - def start(self): - self._start = timer() - - def stop(self): - self._stop = timer() - - def cost(self, funcs, args): - self.start() - for func, arg in zip_longest(funcs, args): - if arg: - func(*arg) - else: - func() - - self.stop() - return self._stop - self._start - - -class Ping(object): - def __init__(self, host, port=80, timeout=1): - self.timer = Timer() - - self._success = 0 - self._failed = 0 - 
self._conn_times = [] - self._host = host - self._port = port - self._timeout = timeout - - def _create_socket(self, family, type_): - return Socket(family, type_, self._timeout) - - def ping(self, count=10): - for n in range(1, count + 1): - s = self._create_socket(socket.AF_INET, socket.SOCK_STREAM) - try: - time.sleep(1) - cost_time = self.timer.cost( - (s.connect, s.shutdown), - ((self._host, self._port), None)) - s_runtime = 1000 * (cost_time) - - LOG.debug("Connected to %s[:%s]: seq=%d time=%.2f ms" % ( - self._host, self._port, n, s_runtime)) - - self._conn_times.append(s_runtime) - except socket.timeout: - LOG.error("Connected to %s[:%s]: seq=%d time out!" % ( - self._host, self._port, n)) - self._failed += 1 - - except KeyboardInterrupt: - raise KeyboardInterrupt() - - else: - self._success += 1 - - finally: - s.close()