Fix wamp session synchronization on the database.
Improved and fixed the session synchronization on the wamp agent. Change-Id: I4e94d975b06c87f08c6680084b976fe8146bb424
This commit is contained in:
parent
30fa19b1cf
commit
36c20a1ef6
|
@ -268,7 +268,7 @@ class Connection(object):
|
|||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_valid_wpsessions_list(self):
|
||||
def get_valid_wpsessions_list(self, agent):
|
||||
"""Return a list of wpsession."""
|
||||
|
||||
@abc.abstractmethod
|
||||
|
|
|
@ -508,8 +508,13 @@ class Connection(api.Connection):
|
|||
except NoResultFound:
|
||||
return None
|
||||
|
||||
def get_valid_wpsessions_list(self):
|
||||
query = model_query(models.SessionWP).filter_by(valid=1)
|
||||
def get_valid_wpsessions_list(self, agent):
|
||||
query = model_query(models.SessionWP)
|
||||
query = query.filter_by(valid=1)
|
||||
query = query.join(models.Board,
|
||||
models.SessionWP.board_id == models.Board.id)
|
||||
query = query.filter_by(agent=agent)
|
||||
|
||||
return query.all()
|
||||
|
||||
# WAMPAGENT api
|
||||
|
|
|
@ -82,14 +82,14 @@ class SessionWP(base.IotronicObject):
|
|||
return session
|
||||
|
||||
@base.remotable_classmethod
|
||||
def valid_list(cls, context):
|
||||
def valid_list(cls, context, agent):
|
||||
"""Return a list of SessionWP objects.
|
||||
|
||||
:returns: a list of valid session_id
|
||||
|
||||
"""
|
||||
|
||||
db_list = cls.dbapi.get_valid_wpsessions_list()
|
||||
db_list = cls.dbapi.get_valid_wpsessions_list(agent)
|
||||
return [SessionWP._from_db_object(cls(context), x) for x in db_list]
|
||||
|
||||
@base.remotable
|
||||
|
|
|
@ -243,7 +243,7 @@ class WampManager(object):
|
|||
|
||||
session_l = await session.call(u'wamp.session.list')
|
||||
session_l.remove(details.session)
|
||||
fun.update_sessions(session_l)
|
||||
fun.update_sessions(session_l, AGENT_HOST)
|
||||
|
||||
@comp.on_leave
|
||||
async def onLeave(session, details):
|
||||
|
|
|
@ -59,25 +59,40 @@ def alive():
|
|||
'%Y-%m-%dT%H:%M:%S.%f')
|
||||
|
||||
|
||||
def update_sessions(session_list):
|
||||
def update_sessions(session_list, agent):
|
||||
session_list = set(session_list)
|
||||
list_from_db = objects.SessionWP.valid_list(ctxt)
|
||||
list_from_db = objects.SessionWP.valid_list(ctxt, agent)
|
||||
list_db = set([int(elem.session_id) for elem in list_from_db])
|
||||
LOG.debug('Wamp session list: %s', session_list)
|
||||
LOG.debug('DB session list: %s', list_db)
|
||||
|
||||
if session_list == list_db:
|
||||
LOG.debug('Sessions on the database are updated.')
|
||||
return
|
||||
|
||||
# list of board not connected anymore
|
||||
old_connected = list_db.difference(session_list)
|
||||
|
||||
LOG.debug('no more valid session list: %s', old_connected)
|
||||
|
||||
for elem in old_connected:
|
||||
old_session = objects.SessionWP.get(ctxt, elem)
|
||||
old_session.valid = False
|
||||
old_session.save()
|
||||
LOG.debug('%s has been put offline.', old_session.board_uuid)
|
||||
if old_session.valid:
|
||||
old_session.valid = False
|
||||
old_session.save()
|
||||
board = objects.Board.get_by_uuid(ctxt, old_session.board_uuid)
|
||||
board.status = states.OFFLINE
|
||||
board.save()
|
||||
LOG.debug('Session updated. Board %s is now %s', board.uuid,
|
||||
states.OFFLINE)
|
||||
|
||||
if old_connected:
|
||||
LOG.warning('Some boards have been updated: status offline')
|
||||
|
||||
# list of board still connected
|
||||
keep_connected = list_db.intersection(session_list)
|
||||
LOG.debug('still valid session list: %s', keep_connected)
|
||||
|
||||
for elem in keep_connected:
|
||||
for x in list_from_db:
|
||||
if x.session_id == str(elem):
|
||||
|
@ -91,13 +106,18 @@ def board_on_leave(session_id):
|
|||
LOG.debug('A board with %s disconnectd', session_id)
|
||||
try:
|
||||
old_session = objects.SessionWP.get(ctxt, session_id)
|
||||
old_session.valid = False
|
||||
old_session.save()
|
||||
board = objects.Board.get_by_uuid(ctxt, old_session.board_uuid)
|
||||
board.status = states.OFFLINE
|
||||
board.save()
|
||||
LOG.debug('Session updated. Board %s is now %s', board.uuid,
|
||||
states.OFFLINE)
|
||||
|
||||
if old_session.valid:
|
||||
old_session.valid = False
|
||||
old_session.save()
|
||||
board = objects.Board.get_by_uuid(ctxt, old_session.board_uuid)
|
||||
board.status = states.OFFLINE
|
||||
board.save()
|
||||
LOG.debug('Session updated. Board %s is now %s', board.uuid,
|
||||
states.OFFLINE)
|
||||
return
|
||||
|
||||
LOG.debug('Session %s already set to not valid', session_id)
|
||||
except Exception:
|
||||
LOG.debug('session %s not found', session_id)
|
||||
|
||||
|
@ -111,13 +131,14 @@ def connection(uuid, session):
|
|||
msg = exc.message % {'board': uuid}
|
||||
LOG.error(msg)
|
||||
return wm.WampError(msg).serialize()
|
||||
|
||||
try:
|
||||
old_ses = objects.SessionWP(ctxt)
|
||||
old_ses = old_ses.get_session_by_board_uuid(ctxt, board.uuid,
|
||||
valid=True)
|
||||
old_ses.valid = False
|
||||
old_ses.save()
|
||||
LOG.debug('old session for %s found: %s', board.uuid,
|
||||
old_ses.session_id)
|
||||
|
||||
except Exception:
|
||||
LOG.debug('valid session for %s not found', board.uuid)
|
||||
|
@ -127,10 +148,13 @@ def connection(uuid, session):
|
|||
'session_id': session}
|
||||
session = objects.SessionWP(ctxt, **session_data)
|
||||
session.create()
|
||||
LOG.debug('new session for %s saved %s', board.uuid,
|
||||
session.session_id)
|
||||
board.status = states.ONLINE
|
||||
board.save()
|
||||
LOG.info('Board %s (%s) is now %s', board.uuid,
|
||||
board.name, states.ONLINE)
|
||||
|
||||
return wm.WampSuccess('').serialize()
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue