fix shutdown procedures for conductor and agent
Change-Id: Ie7ab10280491ac1069857e3091c7273af31f8892
This commit is contained in:
parent
66d7db87cc
commit
41bfe34311
|
@ -2,3 +2,4 @@
|
|||
.idea
|
||||
iotronic.egg-info
|
||||
build
|
||||
*.pyc
|
||||
|
|
|
@ -21,16 +21,18 @@ from oslo_config import cfg
|
|||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
|
||||
import random
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
serializer = objects_base.IotronicObjectSerializer()
|
||||
|
||||
|
||||
def get_best_agent(ctx):
|
||||
agents = objects.WampAgent.list(ctx, filters=[{'online': True}])
|
||||
agent = agents.pop(0)
|
||||
agents = objects.WampAgent.list(ctx, filters={'online': True})
|
||||
LOG.debug('found %d Agent(s).', len(agents))
|
||||
agent = random.choice(agents)
|
||||
LOG.debug('Selected agent: %s', agent.hostname)
|
||||
agents.append(agent)
|
||||
return agent.hostname
|
||||
|
||||
|
||||
|
@ -40,6 +42,7 @@ class ConductorEndpoint(object):
|
|||
self.target = oslo_messaging.Target()
|
||||
self.wamp_agent_client = oslo_messaging.RPCClient(transport,
|
||||
self.target)
|
||||
self.wamp_agent_client.prepare(timeout=10)
|
||||
self.ragent = ragent
|
||||
|
||||
def echo(self, ctx, data):
|
||||
|
@ -47,7 +50,7 @@ class ConductorEndpoint(object):
|
|||
return data
|
||||
|
||||
def registration_uuid(self, ctx, uuid, session_num):
|
||||
LOG.debug('Receved registration from %s with session %s',
|
||||
LOG.debug('Received registration from %s with session %s',
|
||||
uuid, session_num)
|
||||
try:
|
||||
node = objects.Node.get_by_uuid(ctx, uuid)
|
||||
|
@ -73,7 +76,7 @@ class ConductorEndpoint(object):
|
|||
return
|
||||
|
||||
def registration(self, ctx, code, session_num):
|
||||
LOG.debug('Receved registration from %s with session %s',
|
||||
LOG.debug('Received registration from %s with session %s',
|
||||
code, session_num)
|
||||
try:
|
||||
node = objects.Node.get_by_code(ctx, code)
|
||||
|
@ -137,16 +140,15 @@ class ConductorEndpoint(object):
|
|||
def execute_on_node(self, ctx, node_uuid, wamp_rpc_call, wamp_rpc_args):
|
||||
LOG.debug('Executing \"%s\" on the node: %s', wamp_rpc_call, node_uuid)
|
||||
|
||||
try:
|
||||
node = objects.Node.get_by_uuid(ctx, node_uuid)
|
||||
except Exception:
|
||||
return exception.NodeNotFound(node=node_uuid)
|
||||
node = objects.Node.get_by_uuid(ctx, node_uuid)
|
||||
|
||||
# check the session; it rise an excpetion if session miss
|
||||
# session = objects.SessionWP.get_session_by_node_uuid(node_uuid)
|
||||
|
||||
s4t_topic = 's4t_invoke_wamp'
|
||||
full_topic = node.agent + '.' + s4t_topic
|
||||
|
||||
self.target.topic = full_topic
|
||||
self.wamp_agent_client.prepare(timeout=10)
|
||||
|
||||
full_wamp_call = 'iotronic.' + node.uuid + "." + wamp_rpc_call
|
||||
|
||||
|
|
|
@ -17,9 +17,11 @@ from iotronic.common.i18n import _LI
|
|||
from iotronic.common.i18n import _LW
|
||||
from iotronic.conductor import endpoints as endp
|
||||
from iotronic.db import api as dbapi
|
||||
import os
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
import signal
|
||||
import time
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -51,6 +53,8 @@ class ConductorManager(object):
|
|||
CONF(project='iotronic')
|
||||
logging.setup(CONF, "iotronic-conductor")
|
||||
|
||||
signal.signal(signal.SIGINT, self.stop_handler)
|
||||
|
||||
if not host:
|
||||
host = CONF.host
|
||||
self.host = host
|
||||
|
@ -81,18 +85,22 @@ class ConductorManager(object):
|
|||
endpoints = [
|
||||
endp.ConductorEndpoint(ragent),
|
||||
]
|
||||
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
|
||||
executor='threading')
|
||||
self.server = oslo_messaging.get_rpc_server(transport,
|
||||
target,
|
||||
endpoints,
|
||||
executor='threading')
|
||||
|
||||
try:
|
||||
server.start()
|
||||
while True:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
print("Stopping server")
|
||||
self.server.start()
|
||||
|
||||
server.stop()
|
||||
# server.wait()
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
def stop_handler(self, signum, frame):
|
||||
LOG.info("Stopping server")
|
||||
self.server.stop()
|
||||
self.server.wait()
|
||||
self.del_host()
|
||||
os._exit(0)
|
||||
|
||||
def del_host(self, deregister=True):
|
||||
if deregister:
|
||||
|
|
|
@ -24,7 +24,8 @@ from oslo_db import api as db_api
|
|||
import six
|
||||
|
||||
_BACKEND_MAPPING = {'sqlalchemy': 'iotronic.db.sqlalchemy.api'}
|
||||
IMPL = db_api.DBAPI.from_config(cfg.CONF, backend_mapping=_BACKEND_MAPPING,
|
||||
IMPL = db_api.DBAPI.from_config(cfg.CONF,
|
||||
backend_mapping=_BACKEND_MAPPING,
|
||||
lazy=True)
|
||||
|
||||
|
||||
|
@ -287,3 +288,17 @@ class Connection(object):
|
|||
:param sort_dir: direction in which results should be sorted.
|
||||
(asc, desc)
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_session_by_node_uuid(self, filters=None, limit=None, marker=None,
|
||||
sort_key=None, sort_dir=None):
|
||||
"""Return a Wamp session of a Node
|
||||
|
||||
:param filters: Filters to apply. Defaults to None.
|
||||
:param limit: Maximum number of wampagents to return.
|
||||
:param marker: the last item of the previous page; we return the next
|
||||
result set.
|
||||
:param sort_key: Attribute by which results should be sorted.
|
||||
:param sort_dir: direction in which results should be sorted.
|
||||
(asc, desc)
|
||||
"""
|
||||
|
|
|
@ -104,6 +104,7 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None,
|
|||
raise exception.InvalidParameterValue(
|
||||
_('The sort_key value "%(key)s" is an invalid field for sorting')
|
||||
% {'key': sort_key})
|
||||
|
||||
return query.all()
|
||||
|
||||
|
||||
|
@ -140,9 +141,9 @@ class Connection(api.Connection):
|
|||
|
||||
if 'online' in filters:
|
||||
if filters['online']:
|
||||
query = query.filter(models.WampAgent.online is False)
|
||||
query = query.filter(models.WampAgent.online == 1)
|
||||
else:
|
||||
query = query.filter(models.WampAgent.online is True)
|
||||
query = query.filter(models.WampAgent.online == 0)
|
||||
|
||||
return query
|
||||
|
||||
|
@ -384,7 +385,7 @@ class Connection(api.Connection):
|
|||
try:
|
||||
return query.one()
|
||||
except NoResultFound:
|
||||
return None
|
||||
raise exception.NodeNotConnected(node=node_uuid)
|
||||
|
||||
def get_session_by_session_id(self, session_id):
|
||||
query = model_query(models.SessionWP).filter_by(session_id=session_id)
|
||||
|
|
|
@ -17,6 +17,7 @@ from autobahn.twisted import wamp
|
|||
from autobahn.twisted import websocket
|
||||
from autobahn.wamp import types
|
||||
from iotronic.common import exception
|
||||
from iotronic.common.i18n import _LI
|
||||
from iotronic.common.i18n import _LW
|
||||
from iotronic.db import api as dbapi
|
||||
from oslo_config import cfg
|
||||
|
@ -27,6 +28,9 @@ from threading import Thread
|
|||
from twisted.internet.protocol import ReconnectingClientFactory
|
||||
from twisted.internet import reactor
|
||||
|
||||
import os
|
||||
import signal
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
wamp_opts = [
|
||||
|
@ -160,15 +164,13 @@ class RPCServer(Thread):
|
|||
executor='threading')
|
||||
|
||||
def run(self):
|
||||
LOG.info("Starting AMQP server... ")
|
||||
self.server.start()
|
||||
|
||||
try:
|
||||
LOG.info("Starting AMQP server... ")
|
||||
self.server.start()
|
||||
except KeyboardInterrupt:
|
||||
|
||||
LOG.info("Stopping AMQP server... ")
|
||||
self.server.stop()
|
||||
LOG.info("AMQP server stopped. ")
|
||||
def stop(self):
|
||||
LOG.info("Stopping AMQP server... ")
|
||||
self.server.stop()
|
||||
LOG.info("AMQP server stopped. ")
|
||||
|
||||
|
||||
class WampManager(object):
|
||||
|
@ -198,6 +200,8 @@ class WampManager(object):
|
|||
class WampAgent(object):
|
||||
def __init__(self, host):
|
||||
|
||||
signal.signal(signal.SIGINT, self.stop_handler)
|
||||
|
||||
logging.register_options(CONF)
|
||||
CONF(project='iotronic')
|
||||
logging.setup(CONF, "iotronic-wamp-agent")
|
||||
|
@ -225,13 +229,28 @@ class WampAgent(object):
|
|||
global AGENT_HOST
|
||||
AGENT_HOST = self.host
|
||||
|
||||
r = RPCServer()
|
||||
w = WampManager()
|
||||
self.r = RPCServer()
|
||||
self.w = WampManager()
|
||||
|
||||
try:
|
||||
r.start()
|
||||
w.start()
|
||||
except KeyboardInterrupt:
|
||||
w.stop()
|
||||
r.stop()
|
||||
exit()
|
||||
self.r.start()
|
||||
self.w.start()
|
||||
|
||||
def del_host(self, deregister=True):
|
||||
if deregister:
|
||||
try:
|
||||
self.dbapi.unregister_wampagent(self.host)
|
||||
LOG.info(_LI('Successfully stopped wampagent with hostname '
|
||||
'%(hostname)s.'),
|
||||
{'hostname': self.host})
|
||||
except exception.WampAgentNotFound:
|
||||
pass
|
||||
else:
|
||||
LOG.info(_LI('Not deregistering wampagent with hostname '
|
||||
'%(hostname)s.'),
|
||||
{'hostname': self.host})
|
||||
|
||||
def stop_handler(self, signum, frame):
|
||||
self.w.stop()
|
||||
self.r.stop()
|
||||
self.del_host()
|
||||
os._exit(0)
|
||||
|
|
Loading…
Reference in New Issue