managing wamp message for wamp rpcs

Change-Id: Ib69bb5026d1d339bffb227528c898e3181456ba6
This commit is contained in:
Fabio Verboso 2017-03-07 18:59:49 +01:00
parent 3964168d1e
commit ed9a1b943d
8 changed files with 105 additions and 76 deletions

View File

@ -13,15 +13,16 @@
# under the License.
import cPickle as cpickle
from iotronic.common import exception
from iotronic.common import states
from iotronic.conductor.provisioner import Provisioner
from iotronic import objects
from iotronic.objects import base as objects_base
from iotronic.wamp import wampmessage as wm
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
import random
LOG = logging.getLogger(__name__)
@ -50,18 +51,24 @@ class ConductorEndpoint(object):
LOG.info("ECHO: %s" % data)
return data
def registration_uuid(self, ctx, uuid, session_num):
def connection(self, ctx, uuid, session_num):
LOG.debug('Received registration from %s with session %s',
uuid, session_num)
try:
node = objects.Node.get_by_uuid(ctx, uuid)
except Exception:
return exception.NodeNotFound(node=uuid)
except Exception as exc:
msg = exc.message % {'node': uuid}
LOG.error(msg)
wmessage = wm.WampError(msg).serialize()
return wmessage
try:
old_session = objects.SessionWP(
ctx).get_session_by_node_uuid(node.uuid, valid=True)
old_session.valid = False
old_session.save()
old_ses = objects.SessionWP(ctx)
old_ses = old_ses.get_session_by_node_uuid(ctx, node.uuid,
valid=True)
old_ses.valid = False
old_ses.save()
except Exception:
LOG.debug('valid session for %s not found', node.uuid)
@ -74,23 +81,28 @@ class ConductorEndpoint(object):
session.session_id = session_num
session.create()
session.save()
return
return wm.WampSuccess('').serialize()
def registration(self, ctx, code, session_num):
LOG.debug('Received registration from %s with session %s',
code, session_num)
try:
node = objects.Node.get_by_code(ctx, code)
except Exception:
return exception.NodeNotFound(node=code)
except Exception as exc:
msg = exc.message % {'node': code}
LOG.error(msg)
wmessage = wm.WampError(msg).serialize()
return wmessage
try:
old_session = objects.SessionWP(ctx
).get_session_by_node_uuid(
node.uuid, valid=True)
old_session.valid = False
old_session.save()
old_ses = objects.SessionWP(ctx)
old_ses = old_ses.get_session_by_node_uuid(ctx, node.uuid,
valid=True)
old_ses.valid = False
old_ses.save()
except Exception:
LOG.debug('valid session for %s Not found', node.uuid)
LOG.debug('valid session for %s not found', node.uuid)
session = objects.SessionWP(ctx)
session.node_id = node.id
@ -110,7 +122,9 @@ class ConductorEndpoint(object):
node.save()
LOG.debug('sending this conf %s', node.config)
return node.config
wmessage = wm.WampSuccess(node.config)
return wmessage.serialize()
def destroy_node(self, ctx, node_id):
LOG.info('Destroying node with id %s',

View File

@ -64,8 +64,8 @@ class ConductorAPI(object):
return cctxt.call(context, 'registration',
code=code, session_num=session_num)
def registration_uuid(self, context, uuid, session_num, topic=None):
"""Registration of a node.
def connection(self, context, uuid, session_num, topic=None):
"""Connection of a node.
:param context: request context.
:param uuid: uuid node
@ -73,7 +73,7 @@ class ConductorAPI(object):
:param topic: RPC topic. Defaults to self.topic.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.0')
return cctxt.call(context, 'registration_uuid',
return cctxt.call(context, 'connection',
uuid=uuid, session_num=session_num)
def create_node(self, context, node_obj, location_obj, topic=None):

View File

@ -216,13 +216,17 @@ class Connection(object):
def get_session_by_node_uuid(self, node_uuid, valid):
"""Return a Wamp session of a Node
:param filters: Filters to apply. Defaults to None.
:param limit: Maximum number of wampagents to return.
:param marker: the last item of the previous page; we return the next
result set.
:param sort_key: Attribute by which results should be sorted.
:param sort_dir: direction in which results should be sorted.
(asc, desc)
:param node_uuid: Filters to apply. Defaults to None.
:param valid: is valid
:returns: A session.
"""
@abc.abstractmethod
def get_session_by_id(self, session_id):
"""Return a Wamp session
:param session_id: The id of a session.
:returns: A session.
"""
@abc.abstractmethod

View File

@ -406,13 +406,12 @@ class Connection(api.Connection):
models.SessionWP).filter_by(
node_uuid=node_uuid).filter_by(
valid=valid)
try:
return query.one()
except NoResultFound:
raise exception.NodeNotConnected(node=node_uuid)
def get_session_by_session_id(self, session_id):
def get_session_by_id(self, session_id):
query = model_query(models.SessionWP).filter_by(session_id=session_id)
try:
return query.one()

View File

@ -1,5 +1,5 @@
# coding=utf-8
#
# Copyright 2017 MDSLAB - University of Messina
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -13,13 +13,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import strutils
from oslo_utils import uuidutils
from iotronic.common import exception
from iotronic.db import api as dbapi
from iotronic.objects import base
from iotronic.objects import utils as obj_utils
from oslo_utils import strutils
class SessionWP(base.IotronicObject):
@ -61,8 +59,6 @@ class SessionWP(base.IotronicObject):
"""
if strutils.is_int_like(session_id):
return cls.get_by_id(context, session_id)
elif uuidutils.is_uuid_like(session_id):
return cls.get_by_uuid(context, session_id)
else:
raise exception.InvalidIdentity(identity=session_id)
@ -77,17 +73,6 @@ class SessionWP(base.IotronicObject):
session = SessionWP._from_db_object(cls(context), db_session)
return session
@base.remotable_classmethod
def get_by_session_id(cls, context, session_id):
"""Find a session based on its integer id and return a SessionWP object.
:param session_id: the id of a session.
:returns: a :class:`SessionWP` object.
"""
db_session = cls.dbapi.get_session_by_session_id(session_id)
session = SessionWP._from_db_object(cls(context), db_session)
return session
@base.remotable_classmethod
def get_session_by_node_uuid(cls, context, node_uuid, valid=True):
"""Find a session based on uuid and return a :class:`SessionWP` object.
@ -120,28 +105,6 @@ class SessionWP(base.IotronicObject):
sort_dir=sort_dir)
return SessionWP._from_db_object_list(db_sessions, cls, context)
'''
@base.remotable_classmethod
def list_by_node_id(cls, context, node_id, limit=None, marker=None,
sort_key=None, sort_dir=None):
"""Return a list of SessionWP objects associated with a given node ID.
:param context: Security context.
:param node_id: the ID of the node.
:param limit: maximum number of resources to return in a single result.
:param marker: pagination marker for large data sets.
:param sort_key: column to sort results by.
:param sort_dir: direction to sort. "asc" or "desc".
:returns: a list of :class:`SessionWP` object.
"""
db_sessions = cls.dbapi.get_sessions_by_node_id(node_id, limit=limit,
marker=marker,
sort_key=sort_key,
sort_dir=sort_dir)
return SessionWP._from_db_object_list(db_sessions, cls, context)
'''
@base.remotable
def create(self, context=None):
"""Create a SessionWP record in the DB.

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2017 MDSLAB - University of Messina
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -113,8 +113,8 @@ class WampFrontend(wamp.ApplicationSession):
if CONF.wamp.register_agent:
self.register(fun.registration, u'stack4things.register')
LOG.info("I have been set as registration agent")
self.register(fun.registration_uuid,
AGENT_HOST + u'.stack4things.register_uuid')
self.register(fun.connection,
AGENT_HOST + u'.stack4things.connection')
self.register(fun.echo,
AGENT_HOST + u'.stack4things.echo')
LOG.info("procedure registered")

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2017 MDSLAB - University of Messina
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -54,8 +54,8 @@ def node_on_leave(session_id):
LOG.debug('session %s not found', session_id)
def registration_uuid(uuid, session):
return c.registration_uuid(ctxt, uuid, session)
def connection(uuid, session):
return c.connection(ctxt, uuid, session)
def registration(code, session):

View File

@ -0,0 +1,49 @@
# Copyright 2017 MDSLAB - University of Messina
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
SUCCESS = 'SUCCESS'
ERROR = 'ERROR'
WARNING = 'WARNING'
class WampMessage(object):
def __init__(self, message=None, result=None):
self.message = message
self.result = result
def serialize(self):
return json.dumps(self, default=lambda o: o.__dict__)
def deserialize(self, received):
self.__dict__ = json.loads(received)
return self
class WampSuccess(WampMessage):
def __init__(self, msg=None):
super(WampSuccess, self).__init__(msg, SUCCESS)
class WampError(WampMessage):
def __init__(self, msg=None):
super(WampError, self).__init__(msg, ERROR)
class WampWarning(WampMessage):
def __init__(self, msg=None):
super(WampWarning, self).__init__(msg, WARNING)