fix shutdown procedures for conductor and agent

Change-Id: Ie7ab10280491ac1069857e3091c7273af31f8892
This commit is contained in:
Fabio Verboso 2017-02-17 14:28:00 +01:00
parent 66d7db87cc
commit 41bfe34311
6 changed files with 87 additions and 41 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@
.idea
iotronic.egg-info
build
*.pyc

View File

@ -21,16 +21,18 @@ from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
import random
LOG = logging.getLogger(__name__)
serializer = objects_base.IotronicObjectSerializer()
def get_best_agent(ctx):
agents = objects.WampAgent.list(ctx, filters=[{'online': True}])
agent = agents.pop(0)
agents = objects.WampAgent.list(ctx, filters={'online': True})
LOG.debug('found %d Agent(s).', len(agents))
agent = random.choice(agents)
LOG.debug('Selected agent: %s', agent.hostname)
agents.append(agent)
return agent.hostname
@ -40,6 +42,7 @@ class ConductorEndpoint(object):
self.target = oslo_messaging.Target()
self.wamp_agent_client = oslo_messaging.RPCClient(transport,
self.target)
self.wamp_agent_client.prepare(timeout=10)
self.ragent = ragent
def echo(self, ctx, data):
@ -47,7 +50,7 @@ class ConductorEndpoint(object):
return data
def registration_uuid(self, ctx, uuid, session_num):
LOG.debug('Receved registration from %s with session %s',
LOG.debug('Received registration from %s with session %s',
uuid, session_num)
try:
node = objects.Node.get_by_uuid(ctx, uuid)
@ -73,7 +76,7 @@ class ConductorEndpoint(object):
return
def registration(self, ctx, code, session_num):
LOG.debug('Receved registration from %s with session %s',
LOG.debug('Received registration from %s with session %s',
code, session_num)
try:
node = objects.Node.get_by_code(ctx, code)
@ -137,16 +140,15 @@ class ConductorEndpoint(object):
def execute_on_node(self, ctx, node_uuid, wamp_rpc_call, wamp_rpc_args):
LOG.debug('Executing \"%s\" on the node: %s', wamp_rpc_call, node_uuid)
try:
node = objects.Node.get_by_uuid(ctx, node_uuid)
except Exception:
return exception.NodeNotFound(node=node_uuid)
node = objects.Node.get_by_uuid(ctx, node_uuid)
# check the session; it rise an excpetion if session miss
# session = objects.SessionWP.get_session_by_node_uuid(node_uuid)
s4t_topic = 's4t_invoke_wamp'
full_topic = node.agent + '.' + s4t_topic
self.target.topic = full_topic
self.wamp_agent_client.prepare(timeout=10)
full_wamp_call = 'iotronic.' + node.uuid + "." + wamp_rpc_call

View File

@ -17,9 +17,11 @@ from iotronic.common.i18n import _LI
from iotronic.common.i18n import _LW
from iotronic.conductor import endpoints as endp
from iotronic.db import api as dbapi
import os
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
import signal
import time
LOG = logging.getLogger(__name__)
@ -51,6 +53,8 @@ class ConductorManager(object):
CONF(project='iotronic')
logging.setup(CONF, "iotronic-conductor")
signal.signal(signal.SIGINT, self.stop_handler)
if not host:
host = CONF.host
self.host = host
@ -81,18 +85,22 @@ class ConductorManager(object):
endpoints = [
endp.ConductorEndpoint(ragent),
]
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
executor='threading')
self.server = oslo_messaging.get_rpc_server(transport,
target,
endpoints,
executor='threading')
try:
server.start()
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping server")
self.server.start()
server.stop()
# server.wait()
while True:
time.sleep(1)
def stop_handler(self, signum, frame):
LOG.info("Stopping server")
self.server.stop()
self.server.wait()
self.del_host()
os._exit(0)
def del_host(self, deregister=True):
if deregister:

View File

@ -24,7 +24,8 @@ from oslo_db import api as db_api
import six
_BACKEND_MAPPING = {'sqlalchemy': 'iotronic.db.sqlalchemy.api'}
IMPL = db_api.DBAPI.from_config(cfg.CONF, backend_mapping=_BACKEND_MAPPING,
IMPL = db_api.DBAPI.from_config(cfg.CONF,
backend_mapping=_BACKEND_MAPPING,
lazy=True)
@ -287,3 +288,17 @@ class Connection(object):
:param sort_dir: direction in which results should be sorted.
(asc, desc)
"""
@abc.abstractmethod
def get_session_by_node_uuid(self, filters=None, limit=None, marker=None,
sort_key=None, sort_dir=None):
"""Return a Wamp session of a Node
:param filters: Filters to apply. Defaults to None.
:param limit: Maximum number of wampagents to return.
:param marker: the last item of the previous page; we return the next
result set.
:param sort_key: Attribute by which results should be sorted.
:param sort_dir: direction in which results should be sorted.
(asc, desc)
"""

View File

@ -104,6 +104,7 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None,
raise exception.InvalidParameterValue(
_('The sort_key value "%(key)s" is an invalid field for sorting')
% {'key': sort_key})
return query.all()
@ -140,9 +141,9 @@ class Connection(api.Connection):
if 'online' in filters:
if filters['online']:
query = query.filter(models.WampAgent.online is False)
query = query.filter(models.WampAgent.online == 1)
else:
query = query.filter(models.WampAgent.online is True)
query = query.filter(models.WampAgent.online == 0)
return query
@ -384,7 +385,7 @@ class Connection(api.Connection):
try:
return query.one()
except NoResultFound:
return None
raise exception.NodeNotConnected(node=node_uuid)
def get_session_by_session_id(self, session_id):
query = model_query(models.SessionWP).filter_by(session_id=session_id)

View File

@ -17,6 +17,7 @@ from autobahn.twisted import wamp
from autobahn.twisted import websocket
from autobahn.wamp import types
from iotronic.common import exception
from iotronic.common.i18n import _LI
from iotronic.common.i18n import _LW
from iotronic.db import api as dbapi
from oslo_config import cfg
@ -27,6 +28,9 @@ from threading import Thread
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.internet import reactor
import os
import signal
LOG = logging.getLogger(__name__)
wamp_opts = [
@ -160,15 +164,13 @@ class RPCServer(Thread):
executor='threading')
def run(self):
LOG.info("Starting AMQP server... ")
self.server.start()
try:
LOG.info("Starting AMQP server... ")
self.server.start()
except KeyboardInterrupt:
LOG.info("Stopping AMQP server... ")
self.server.stop()
LOG.info("AMQP server stopped. ")
def stop(self):
LOG.info("Stopping AMQP server... ")
self.server.stop()
LOG.info("AMQP server stopped. ")
class WampManager(object):
@ -198,6 +200,8 @@ class WampManager(object):
class WampAgent(object):
def __init__(self, host):
signal.signal(signal.SIGINT, self.stop_handler)
logging.register_options(CONF)
CONF(project='iotronic')
logging.setup(CONF, "iotronic-wamp-agent")
@ -225,13 +229,28 @@ class WampAgent(object):
global AGENT_HOST
AGENT_HOST = self.host
r = RPCServer()
w = WampManager()
self.r = RPCServer()
self.w = WampManager()
try:
r.start()
w.start()
except KeyboardInterrupt:
w.stop()
r.stop()
exit()
self.r.start()
self.w.start()
def del_host(self, deregister=True):
if deregister:
try:
self.dbapi.unregister_wampagent(self.host)
LOG.info(_LI('Successfully stopped wampagent with hostname '
'%(hostname)s.'),
{'hostname': self.host})
except exception.WampAgentNotFound:
pass
else:
LOG.info(_LI('Not deregistering wampagent with hostname '
'%(hostname)s.'),
{'hostname': self.host})
def stop_handler(self, signum, frame):
self.w.stop()
self.r.stop()
self.del_host()
os._exit(0)