managing wamp message for wamp rpcs

Change-Id: Ib69bb5026d1d339bffb227528c898e3181456ba6
This commit is contained in:
Fabio Verboso 2017-03-07 18:59:49 +01:00
parent 3964168d1e
commit ed9a1b943d
8 changed files with 105 additions and 76 deletions

View File

@ -13,15 +13,16 @@
# under the License. # under the License.
import cPickle as cpickle import cPickle as cpickle
from iotronic.common import exception
from iotronic.common import states from iotronic.common import states
from iotronic.conductor.provisioner import Provisioner from iotronic.conductor.provisioner import Provisioner
from iotronic import objects from iotronic import objects
from iotronic.objects import base as objects_base from iotronic.objects import base as objects_base
from iotronic.wamp import wampmessage as wm
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging import oslo_messaging
import random import random
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -50,18 +51,24 @@ class ConductorEndpoint(object):
LOG.info("ECHO: %s" % data) LOG.info("ECHO: %s" % data)
return data return data
def registration_uuid(self, ctx, uuid, session_num): def connection(self, ctx, uuid, session_num):
LOG.debug('Received registration from %s with session %s', LOG.debug('Received registration from %s with session %s',
uuid, session_num) uuid, session_num)
try: try:
node = objects.Node.get_by_uuid(ctx, uuid) node = objects.Node.get_by_uuid(ctx, uuid)
except Exception: except Exception as exc:
return exception.NodeNotFound(node=uuid) msg = exc.message % {'node': uuid}
LOG.error(msg)
wmessage = wm.WampError(msg).serialize()
return wmessage
try: try:
old_session = objects.SessionWP( old_ses = objects.SessionWP(ctx)
ctx).get_session_by_node_uuid(node.uuid, valid=True) old_ses = old_ses.get_session_by_node_uuid(ctx, node.uuid,
old_session.valid = False valid=True)
old_session.save() old_ses.valid = False
old_ses.save()
except Exception: except Exception:
LOG.debug('valid session for %s not found', node.uuid) LOG.debug('valid session for %s not found', node.uuid)
@ -74,23 +81,28 @@ class ConductorEndpoint(object):
session.session_id = session_num session.session_id = session_num
session.create() session.create()
session.save() session.save()
return return wm.WampSuccess('').serialize()
def registration(self, ctx, code, session_num): def registration(self, ctx, code, session_num):
LOG.debug('Received registration from %s with session %s', LOG.debug('Received registration from %s with session %s',
code, session_num) code, session_num)
try: try:
node = objects.Node.get_by_code(ctx, code) node = objects.Node.get_by_code(ctx, code)
except Exception: except Exception as exc:
return exception.NodeNotFound(node=code) msg = exc.message % {'node': code}
LOG.error(msg)
wmessage = wm.WampError(msg).serialize()
return wmessage
try: try:
old_session = objects.SessionWP(ctx old_ses = objects.SessionWP(ctx)
).get_session_by_node_uuid( old_ses = old_ses.get_session_by_node_uuid(ctx, node.uuid,
node.uuid, valid=True) valid=True)
old_session.valid = False old_ses.valid = False
old_session.save() old_ses.save()
except Exception: except Exception:
LOG.debug('valid session for %s Not found', node.uuid) LOG.debug('valid session for %s not found', node.uuid)
session = objects.SessionWP(ctx) session = objects.SessionWP(ctx)
session.node_id = node.id session.node_id = node.id
@ -110,7 +122,9 @@ class ConductorEndpoint(object):
node.save() node.save()
LOG.debug('sending this conf %s', node.config) LOG.debug('sending this conf %s', node.config)
return node.config
wmessage = wm.WampSuccess(node.config)
return wmessage.serialize()
def destroy_node(self, ctx, node_id): def destroy_node(self, ctx, node_id):
LOG.info('Destroying node with id %s', LOG.info('Destroying node with id %s',

View File

@ -64,8 +64,8 @@ class ConductorAPI(object):
return cctxt.call(context, 'registration', return cctxt.call(context, 'registration',
code=code, session_num=session_num) code=code, session_num=session_num)
def registration_uuid(self, context, uuid, session_num, topic=None): def connection(self, context, uuid, session_num, topic=None):
"""Registration of a node. """Connection of a node.
:param context: request context. :param context: request context.
:param uuid: uuid node :param uuid: uuid node
@ -73,7 +73,7 @@ class ConductorAPI(object):
:param topic: RPC topic. Defaults to self.topic. :param topic: RPC topic. Defaults to self.topic.
""" """
cctxt = self.client.prepare(topic=topic or self.topic, version='1.0') cctxt = self.client.prepare(topic=topic or self.topic, version='1.0')
return cctxt.call(context, 'registration_uuid', return cctxt.call(context, 'connection',
uuid=uuid, session_num=session_num) uuid=uuid, session_num=session_num)
def create_node(self, context, node_obj, location_obj, topic=None): def create_node(self, context, node_obj, location_obj, topic=None):

View File

@ -216,13 +216,17 @@ class Connection(object):
def get_session_by_node_uuid(self, node_uuid, valid): def get_session_by_node_uuid(self, node_uuid, valid):
"""Return a Wamp session of a Node """Return a Wamp session of a Node
:param filters: Filters to apply. Defaults to None. :param node_uuid: Filters to apply. Defaults to None.
:param limit: Maximum number of wampagents to return. :param valid: is valid
:param marker: the last item of the previous page; we return the next :returns: A session.
result set. """
:param sort_key: Attribute by which results should be sorted.
:param sort_dir: direction in which results should be sorted. @abc.abstractmethod
(asc, desc) def get_session_by_id(self, session_id):
"""Return a Wamp session
:param session_id: The id of a session.
:returns: A session.
""" """
@abc.abstractmethod @abc.abstractmethod

View File

@ -406,13 +406,12 @@ class Connection(api.Connection):
models.SessionWP).filter_by( models.SessionWP).filter_by(
node_uuid=node_uuid).filter_by( node_uuid=node_uuid).filter_by(
valid=valid) valid=valid)
try: try:
return query.one() return query.one()
except NoResultFound: except NoResultFound:
raise exception.NodeNotConnected(node=node_uuid) raise exception.NodeNotConnected(node=node_uuid)
def get_session_by_session_id(self, session_id): def get_session_by_id(self, session_id):
query = model_query(models.SessionWP).filter_by(session_id=session_id) query = model_query(models.SessionWP).filter_by(session_id=session_id)
try: try:
return query.one() return query.one()

View File

@ -1,5 +1,5 @@
# coding=utf-8 # Copyright 2017 MDSLAB - University of Messina
# # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -13,13 +13,11 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from oslo_utils import strutils
from oslo_utils import uuidutils
from iotronic.common import exception from iotronic.common import exception
from iotronic.db import api as dbapi from iotronic.db import api as dbapi
from iotronic.objects import base from iotronic.objects import base
from iotronic.objects import utils as obj_utils from iotronic.objects import utils as obj_utils
from oslo_utils import strutils
class SessionWP(base.IotronicObject): class SessionWP(base.IotronicObject):
@ -61,8 +59,6 @@ class SessionWP(base.IotronicObject):
""" """
if strutils.is_int_like(session_id): if strutils.is_int_like(session_id):
return cls.get_by_id(context, session_id) return cls.get_by_id(context, session_id)
elif uuidutils.is_uuid_like(session_id):
return cls.get_by_uuid(context, session_id)
else: else:
raise exception.InvalidIdentity(identity=session_id) raise exception.InvalidIdentity(identity=session_id)
@ -77,17 +73,6 @@ class SessionWP(base.IotronicObject):
session = SessionWP._from_db_object(cls(context), db_session) session = SessionWP._from_db_object(cls(context), db_session)
return session return session
@base.remotable_classmethod
def get_by_session_id(cls, context, session_id):
"""Find a session based on its integer id and return a SessionWP object.
:param session_id: the id of a session.
:returns: a :class:`SessionWP` object.
"""
db_session = cls.dbapi.get_session_by_session_id(session_id)
session = SessionWP._from_db_object(cls(context), db_session)
return session
@base.remotable_classmethod @base.remotable_classmethod
def get_session_by_node_uuid(cls, context, node_uuid, valid=True): def get_session_by_node_uuid(cls, context, node_uuid, valid=True):
"""Find a session based on uuid and return a :class:`SessionWP` object. """Find a session based on uuid and return a :class:`SessionWP` object.
@ -120,28 +105,6 @@ class SessionWP(base.IotronicObject):
sort_dir=sort_dir) sort_dir=sort_dir)
return SessionWP._from_db_object_list(db_sessions, cls, context) return SessionWP._from_db_object_list(db_sessions, cls, context)
'''
@base.remotable_classmethod
def list_by_node_id(cls, context, node_id, limit=None, marker=None,
sort_key=None, sort_dir=None):
"""Return a list of SessionWP objects associated with a given node ID.
:param context: Security context.
:param node_id: the ID of the node.
:param limit: maximum number of resources to return in a single result.
:param marker: pagination marker for large data sets.
:param sort_key: column to sort results by.
:param sort_dir: direction to sort. "asc" or "desc".
:returns: a list of :class:`SessionWP` object.
"""
db_sessions = cls.dbapi.get_sessions_by_node_id(node_id, limit=limit,
marker=marker,
sort_key=sort_key,
sort_dir=sort_dir)
return SessionWP._from_db_object_list(db_sessions, cls, context)
'''
@base.remotable @base.remotable
def create(self, context=None): def create(self, context=None):
"""Create a SessionWP record in the DB. """Create a SessionWP record in the DB.

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC. # Copyright 2017 MDSLAB - University of Messina
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -113,8 +113,8 @@ class WampFrontend(wamp.ApplicationSession):
if CONF.wamp.register_agent: if CONF.wamp.register_agent:
self.register(fun.registration, u'stack4things.register') self.register(fun.registration, u'stack4things.register')
LOG.info("I have been set as registration agent") LOG.info("I have been set as registration agent")
self.register(fun.registration_uuid, self.register(fun.connection,
AGENT_HOST + u'.stack4things.register_uuid') AGENT_HOST + u'.stack4things.connection')
self.register(fun.echo, self.register(fun.echo,
AGENT_HOST + u'.stack4things.echo') AGENT_HOST + u'.stack4things.echo')
LOG.info("procedure registered") LOG.info("procedure registered")

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC. # Copyright 2017 MDSLAB - University of Messina
# All Rights Reserved. # All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -54,8 +54,8 @@ def node_on_leave(session_id):
LOG.debug('session %s not found', session_id) LOG.debug('session %s not found', session_id)
def registration_uuid(uuid, session): def connection(uuid, session):
return c.registration_uuid(ctxt, uuid, session) return c.connection(ctxt, uuid, session)
def registration(code, session): def registration(code, session):

View File

@ -0,0 +1,49 @@
# Copyright 2017 MDSLAB - University of Messina
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
SUCCESS = 'SUCCESS'
ERROR = 'ERROR'
WARNING = 'WARNING'
class WampMessage(object):
def __init__(self, message=None, result=None):
self.message = message
self.result = result
def serialize(self):
return json.dumps(self, default=lambda o: o.__dict__)
def deserialize(self, received):
self.__dict__ = json.loads(received)
return self
class WampSuccess(WampMessage):
def __init__(self, msg=None):
super(WampSuccess, self).__init__(msg, SUCCESS)
class WampError(WampMessage):
def __init__(self, msg=None):
super(WampError, self).__init__(msg, ERROR)
class WampWarning(WampMessage):
def __init__(self, msg=None):
super(WampWarning, self).__init__(msg, WARNING)