Update hung process/thread approach

The previous commit attempted to prevent a hung process/thread by using
a "tcp ping" to determine if the server_endpoint was reachable before
deploying the heat stack and starting the heartbeat thread.

While this approach works well in theory we're finding that in practice
in a live K8 environment we're seeing a lot of random errors:
[Errno 111] Connection refused: ConnectionRefusedError

There could be numerous reasons for these random connection errors but
ZMQ has retry logic which should overcome these problems.

This commit updates the sockets used in ZMQ to add a timeout (a padded
value of agent_loss_timeout). While this does not prevent the
creation of a heat stack and heartbeat thread that might never respond,
it does solve the initial problem of having stuck process/threads and
getting a clean exit

Change-Id: I8193c72120b459c2a18d780d9f8799e8df592e20
This commit is contained in:
Oded Le'Sage 2020-02-11 11:32:52 -06:00
parent 639760a0fa
commit 539e376978
1 changed files with 13 additions and 100 deletions

View File

@ -14,15 +14,14 @@
# limitations under the License.
import multiprocessing
import socket
import time
import zmq
from oslo_config import cfg
from oslo_log import log as logging
from shaker.agent import agent
from shaker.engine import utils
from six.moves import zip_longest
from timeit import default_timer as timer
LOG = logging.getLogger(__name__)
@ -38,20 +37,6 @@ class MessageQueue(object):
self.socket.bind("tcp://*:%s" % port)
LOG.info('Listening on *:%s', port)
# Test that endpoint is actually reachable
# otherwise the process will get stuck indefinately waiting
# for a REQ/REP that will never happen.
# The code to support this was adapted from pypi package tcping
try:
LOG.info("Testing route to %s" % endpoint)
ping_test = Ping(ip, port)
ping_test.ping(3)
if ping_test._success == 0:
raise socket.timeout("No valid route to %s" % endpoint)
except socket.error as e:
LOG.exception(e)
raise
heartbeat = multiprocessing.Process(
target=agent.work,
kwargs=dict(agent_id=HEARTBEAT_AGENT, endpoint=endpoint,
@ -62,6 +47,16 @@ class MessageQueue(object):
def __iter__(self):
try:
while True:
# Set a timeout(ms) based on agent_loss_timeout.
# Add some padding so agent_loss_timeout
# has a chance to fire correctly before the socket closes
client_timeout = (cfg.CONF.agent_loss_timeout * 1000) + 3000
# This prevents a hung process/thread when
# server_endpoint is not reachable
self.socket.setsockopt(zmq.RCVTIMEO, client_timeout)
# If this is not set to 0 the socket will not
# close even if timeout is reached
self.socket.setsockopt(zmq.LINGER, 0)
# Wait for next request from client
message = self.socket.recv_json()
LOG.debug('Received request: %s', message)
@ -81,85 +76,3 @@ class MessageQueue(object):
else:
LOG.exception(e)
raise
class Socket(object):
def __init__(self, family, type_, timeout):
s = socket.socket(family, type_)
s.settimeout(timeout)
self._s = s
def connect(self, host, port=80):
self._s.connect((host, int(port)))
def shutdown(self):
self._s.shutdown(socket.SHUT_RD)
def close(self):
self._s.close()
class Timer(object):
def __init__(self):
self._start = 0
self._stop = 0
def start(self):
self._start = timer()
def stop(self):
self._stop = timer()
def cost(self, funcs, args):
self.start()
for func, arg in zip_longest(funcs, args):
if arg:
func(*arg)
else:
func()
self.stop()
return self._stop - self._start
class Ping(object):
def __init__(self, host, port=80, timeout=1):
self.timer = Timer()
self._success = 0
self._failed = 0
self._conn_times = []
self._host = host
self._port = port
self._timeout = timeout
def _create_socket(self, family, type_):
return Socket(family, type_, self._timeout)
def ping(self, count=10):
for n in range(1, count + 1):
s = self._create_socket(socket.AF_INET, socket.SOCK_STREAM)
try:
time.sleep(1)
cost_time = self.timer.cost(
(s.connect, s.shutdown),
((self._host, self._port), None))
s_runtime = 1000 * (cost_time)
LOG.debug("Connected to %s[:%s]: seq=%d time=%.2f ms" % (
self._host, self._port, n, s_runtime))
self._conn_times.append(s_runtime)
except socket.timeout:
LOG.error("Connected to %s[:%s]: seq=%d time out!" % (
self._host, self._port, n))
self._failed += 1
except KeyboardInterrupt:
raise KeyboardInterrupt()
else:
self._success += 1
finally:
s.close()