From 41bfe34311f212ab637a1aa4b81e9bd714b44fec Mon Sep 17 00:00:00 2001 From: Fabio Verboso Date: Fri, 17 Feb 2017 14:28:00 +0100 Subject: [PATCH] fix shutdown procedures for conductor and agent Change-Id: Ie7ab10280491ac1069857e3091c7273af31f8892 --- .gitignore | 1 + iotronic/conductor/endpoints.py | 22 +++++++------- iotronic/conductor/manager.py | 28 ++++++++++------- iotronic/db/api.py | 17 ++++++++++- iotronic/db/sqlalchemy/api.py | 7 +++-- iotronic/wamp/agent.py | 53 ++++++++++++++++++++++----------- 6 files changed, 87 insertions(+), 41 deletions(-) diff --git a/.gitignore b/.gitignore index e8f222a..c3709d7 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ .idea iotronic.egg-info build +*.pyc diff --git a/iotronic/conductor/endpoints.py b/iotronic/conductor/endpoints.py index 247a489..329548c 100644 --- a/iotronic/conductor/endpoints.py +++ b/iotronic/conductor/endpoints.py @@ -21,16 +21,18 @@ from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +import random + LOG = logging.getLogger(__name__) serializer = objects_base.IotronicObjectSerializer() def get_best_agent(ctx): - agents = objects.WampAgent.list(ctx, filters=[{'online': True}]) - agent = agents.pop(0) + agents = objects.WampAgent.list(ctx, filters={'online': True}) + LOG.debug('found %d Agent(s).', len(agents)) + agent = random.choice(agents) LOG.debug('Selected agent: %s', agent.hostname) - agents.append(agent) return agent.hostname @@ -40,6 +42,7 @@ class ConductorEndpoint(object): self.target = oslo_messaging.Target() self.wamp_agent_client = oslo_messaging.RPCClient(transport, self.target) + self.wamp_agent_client.prepare(timeout=10) self.ragent = ragent def echo(self, ctx, data): @@ -47,7 +50,7 @@ class ConductorEndpoint(object): return data def registration_uuid(self, ctx, uuid, session_num): - LOG.debug('Receved registration from %s with session %s', + LOG.debug('Received registration from %s with session %s', uuid, session_num) try: node = objects.Node.get_by_uuid(ctx, uuid) @@ -73,7 +76,7 @@ class ConductorEndpoint(object): return def registration(self, ctx, code, session_num): - LOG.debug('Receved registration from %s with session %s', + LOG.debug('Received registration from %s with session %s', code, session_num) try: node = objects.Node.get_by_code(ctx, code) @@ -137,16 +140,15 @@ class ConductorEndpoint(object): def execute_on_node(self, ctx, node_uuid, wamp_rpc_call, wamp_rpc_args): LOG.debug('Executing \"%s\" on the node: %s', wamp_rpc_call, node_uuid) - try: - node = objects.Node.get_by_uuid(ctx, node_uuid) - except Exception: - return exception.NodeNotFound(node=node_uuid) + node = objects.Node.get_by_uuid(ctx, node_uuid) + + # check the session; it rise an excpetion if session miss + # session = objects.SessionWP.get_session_by_node_uuid(node_uuid) s4t_topic = 's4t_invoke_wamp' full_topic = node.agent + '.' + s4t_topic self.target.topic = full_topic - self.wamp_agent_client.prepare(timeout=10) full_wamp_call = 'iotronic.' + node.uuid + "." + wamp_rpc_call diff --git a/iotronic/conductor/manager.py b/iotronic/conductor/manager.py index 76e56ee..8165254 100644 --- a/iotronic/conductor/manager.py +++ b/iotronic/conductor/manager.py @@ -17,9 +17,11 @@ from iotronic.common.i18n import _LI from iotronic.common.i18n import _LW from iotronic.conductor import endpoints as endp from iotronic.db import api as dbapi +import os from oslo_config import cfg from oslo_log import log as logging import oslo_messaging +import signal import time LOG = logging.getLogger(__name__) @@ -51,6 +53,8 @@ class ConductorManager(object): CONF(project='iotronic') logging.setup(CONF, "iotronic-conductor") + signal.signal(signal.SIGINT, self.stop_handler) + if not host: host = CONF.host self.host = host @@ -81,18 +85,22 @@ class ConductorManager(object): endpoints = [ endp.ConductorEndpoint(ragent), ] - server = oslo_messaging.get_rpc_server(transport, target, endpoints, - executor='threading') + self.server = oslo_messaging.get_rpc_server(transport, + target, + endpoints, + executor='threading') - try: - server.start() - while True: - time.sleep(1) - except KeyboardInterrupt: - print("Stopping server") + self.server.start() - server.stop() - # server.wait() + while True: + time.sleep(1) + + def stop_handler(self, signum, frame): + LOG.info("Stopping server") + self.server.stop() + self.server.wait() + self.del_host() + os._exit(0) def del_host(self, deregister=True): if deregister: diff --git a/iotronic/db/api.py b/iotronic/db/api.py index 8624891..b12a27d 100644 --- a/iotronic/db/api.py +++ b/iotronic/db/api.py @@ -24,7 +24,8 @@ from oslo_db import api as db_api import six _BACKEND_MAPPING = {'sqlalchemy': 'iotronic.db.sqlalchemy.api'} -IMPL = db_api.DBAPI.from_config(cfg.CONF, backend_mapping=_BACKEND_MAPPING, +IMPL = db_api.DBAPI.from_config(cfg.CONF, + backend_mapping=_BACKEND_MAPPING, lazy=True) @@ -287,3 +288,17 @@ class Connection(object): :param sort_dir: direction in which results should be sorted. (asc, desc) """ + + @abc.abstractmethod + def get_session_by_node_uuid(self, filters=None, limit=None, marker=None, + sort_key=None, sort_dir=None): + """Return a Wamp session of a Node + + :param filters: Filters to apply. Defaults to None. + :param limit: Maximum number of wampagents to return. + :param marker: the last item of the previous page; we return the next + result set. + :param sort_key: Attribute by which results should be sorted. + :param sort_dir: direction in which results should be sorted. + (asc, desc) + """ diff --git a/iotronic/db/sqlalchemy/api.py b/iotronic/db/sqlalchemy/api.py index 9c5a376..e0727e4 100644 --- a/iotronic/db/sqlalchemy/api.py +++ b/iotronic/db/sqlalchemy/api.py @@ -104,6 +104,7 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None, raise exception.InvalidParameterValue( _('The sort_key value "%(key)s" is an invalid field for sorting') % {'key': sort_key}) + return query.all() @@ -140,9 +141,9 @@ class Connection(api.Connection): if 'online' in filters: if filters['online']: - query = query.filter(models.WampAgent.online is False) + query = query.filter(models.WampAgent.online == 1) else: - query = query.filter(models.WampAgent.online is True) + query = query.filter(models.WampAgent.online == 0) return query @@ -384,7 +385,7 @@ class Connection(api.Connection): try: return query.one() except NoResultFound: - return None + raise exception.NodeNotConnected(node=node_uuid) def get_session_by_session_id(self, session_id): query = model_query(models.SessionWP).filter_by(session_id=session_id) diff --git a/iotronic/wamp/agent.py b/iotronic/wamp/agent.py index 37282b6..0b606e2 100644 --- a/iotronic/wamp/agent.py +++ b/iotronic/wamp/agent.py @@ -17,6 +17,7 @@ from autobahn.twisted import wamp from autobahn.twisted import websocket from autobahn.wamp import types from iotronic.common import exception +from iotronic.common.i18n import _LI from iotronic.common.i18n import _LW from iotronic.db import api as dbapi from oslo_config import cfg @@ -27,6 +28,9 @@ from threading import Thread from twisted.internet.protocol import ReconnectingClientFactory from twisted.internet import reactor +import os +import signal + LOG = logging.getLogger(__name__) wamp_opts = [ @@ -160,15 +164,13 @@ class RPCServer(Thread): executor='threading') def run(self): + LOG.info("Starting AMQP server... ") + self.server.start() - try: - LOG.info("Starting AMQP server... ") - self.server.start() - except KeyboardInterrupt: - - LOG.info("Stopping AMQP server... ") - self.server.stop() - LOG.info("AMQP server stopped. ") + def stop(self): + LOG.info("Stopping AMQP server... ") + self.server.stop() + LOG.info("AMQP server stopped. ") class WampManager(object): @@ -198,6 +200,8 @@ class WampManager(object): class WampAgent(object): def __init__(self, host): + signal.signal(signal.SIGINT, self.stop_handler) + logging.register_options(CONF) CONF(project='iotronic') logging.setup(CONF, "iotronic-wamp-agent") @@ -225,13 +229,28 @@ class WampAgent(object): global AGENT_HOST AGENT_HOST = self.host - r = RPCServer() - w = WampManager() + self.r = RPCServer() + self.w = WampManager() - try: - r.start() - w.start() - except KeyboardInterrupt: - w.stop() - r.stop() - exit() + self.r.start() + self.w.start() + + def del_host(self, deregister=True): + if deregister: + try: + self.dbapi.unregister_wampagent(self.host) + LOG.info(_LI('Successfully stopped wampagent with hostname ' + '%(hostname)s.'), + {'hostname': self.host}) + except exception.WampAgentNotFound: + pass + else: + LOG.info(_LI('Not deregistering wampagent with hostname ' + '%(hostname)s.'), + {'hostname': self.host}) + + def stop_handler(self, signum, frame): + self.w.stop() + self.r.stop() + self.del_host() + os._exit(0)