From e4fff5fcc544bb0bafd5a92abb1effcc0e5950a8 Mon Sep 17 00:00:00 2001 From: "DeJaeger, Darren (dd118r)" Date: Thu, 11 Jun 2020 09:30:26 -0400 Subject: [PATCH] Add psuedo lazy pirate stragey to agent, with socket timeouts This adds a pseudo lazy pirate strategy to the agent, with configurable socket timeouts and retry counts. Change-Id: I57bd09c33d071f1cc975e00b9e2deeb715f19bd6 --- etc/shaker.conf | 4 ++++ shaker/agent/agent.py | 29 ++++++++++++++++++++++++++--- shaker/engine/config.py | 17 ++++++++++++++++- 3 files changed, 46 insertions(+), 4 deletions(-) diff --git a/etc/shaker.conf b/etc/shaker.conf index c7c1198..1a0e9bb 100644 --- a/etc/shaker.conf +++ b/etc/shaker.conf @@ -335,6 +335,10 @@ # Agent unique id, defaults to MAC of primary interface. (string value) #agent_id = +# Prior to exiting, the number of reconnects the Agent will attempt with the +# server upon socket operation errors. (integer value) +#agent_socket_conn_retries = 10 + # Heat template containing receipt of building the image. Can be a file name or # one of aliases: "centos", "debian", "ubuntu". Defaults to "ubuntu". (string # value) diff --git a/shaker/agent/agent.py b/shaker/agent/agent.py index 3fbad92..390c851 100644 --- a/shaker/agent/agent.py +++ b/shaker/agent/agent.py @@ -89,9 +89,13 @@ def sleep(seconds): time.sleep(seconds) -def get_socket(endpoint): - context = zmq.Context() +def get_socket(context, endpoint): socket = context.socket(zmq.REQ) + socket.setsockopt(zmq.LINGER, 0) + if 'agent_socket_recv_timeout' in cfg.CONF: + socket.setsockopt(zmq.RCVTIMEO, cfg.CONF.agent_socket_recv_timeout) + if 'agent_socket_send_timeout' in cfg.CONF: + socket.setsockopt(zmq.SNDTIMEO, cfg.CONF.agent_socket_send_timeout) socket.connect('tcp://%s' % endpoint) return socket @@ -133,11 +137,19 @@ def work(agent_id, endpoint, polling_interval=config.DEFAULT_POLLING_INTERVAL, agent_config = dict(polling_interval=polling_interval) LOG.info('Agent config: %s', agent_config) - socket = get_socket(endpoint) + if 'agent_socket_conn_retries' in cfg.CONF: + socket_conn_retries = cfg.CONF.agent_socket_conn_retries + else: + socket_conn_retries = config.DEFAULT_SOCKET_CONN_RETRIES + + context = zmq.Context() + socket = get_socket(context, endpoint) + socket_retries_left = socket_conn_retries while True: try: work_act(socket, agent_id, agent_config) + socket_retries_left = socket_conn_retries except BaseException as e: if isinstance(e, KeyboardInterrupt): @@ -146,10 +158,21 @@ def work(agent_id, endpoint, polling_interval=config.DEFAULT_POLLING_INTERVAL, else: LOG.info('Process is interrupted') sys.exit(3) + elif isinstance(e, zmq.error.ZMQError): + socket.close() + socket_retries_left -= 1 + if socket_retries_left <= 0: + LOG.exception(e) + break + LOG.warning('Socket reconnecting...') + socket = get_socket(context, endpoint) else: LOG.exception(e) break + socket.close() + context.term() + def get_node_uuid(): s = '%012x' % uuid.getnode() diff --git a/shaker/engine/config.py b/shaker/engine/config.py index 8b12a9e..93a3988 100644 --- a/shaker/engine/config.py +++ b/shaker/engine/config.py @@ -31,6 +31,7 @@ REPORT_TEMPLATES = 'shaker/resources/report_templates/' SCENARIOS = 'shaker/scenarios/' SCHEMAS = 'shaker/resources/schemas/' DEFAULT_POLLING_INTERVAL = 10 +DEFAULT_SOCKET_CONN_RETRIES = 10 class Endpoint(types.String): @@ -75,7 +76,7 @@ COMMON_OPTS = [ cfg.IntOpt('polling-interval', default=(utils.env('SHAKER_POLLING_INTERVAL') or DEFAULT_POLLING_INTERVAL), - help='How frequently the agent polls server, in seconds') + help='How frequently the agent polls server, in seconds'), ] OPENSTACK_OPTS = [ @@ -285,6 +286,20 @@ AGENT_OPTS = [ cfg.StrOpt('agent-id', default=utils.env('SHAKER_AGENT_ID'), help='Agent unique id, defaults to MAC of primary interface.'), + cfg.IntOpt('agent-socket-recv-timeout', + default=utils.env('SHAKER_AGENT_SOCKET_RECV_TIMEOUT'), + help='The amount of time the socket will wait for ' + 'a response from a sent message, in milliseconds.'), + cfg.IntOpt('agent-socket-send-timeout', + default=utils.env('SHAKER_AGENT_SOCKET_SEND_TIMEOUT'), + help='The amount of time the socket will wait until ' + 'a sent message is accepted, in milliseconds.'), + cfg.IntOpt('agent-socket-conn-retries', + default=(utils.env('SHAKER_AGENT_SOCKET_CONN_RETRIES') or + DEFAULT_SOCKET_CONN_RETRIES), + help='Prior to exiting, the number of reconnects the Agent ' + 'will attempt with the server upon socket operation ' + 'errors.'), ] IMAGE_BUILDER_OPTS = [