communication between conductor, agents and nodes improved
Change-Id: I083bd104d8a0546f95f59d02004819fb0c3577d5
This commit is contained in:
parent
400a934abf
commit
0b1826320b
|
@ -19,11 +19,8 @@ Iotronic Conductor
|
|||
from oslo_config import cfg
|
||||
|
||||
from iotronic.conductor.manager import ConductorManager
|
||||
|
||||
import socket
|
||||
|
||||
CONF = cfg.CONF
|
||||
if __name__ == '__main__':
|
||||
|
||||
cond=ConductorManager('iotronic')
|
||||
|
||||
|
||||
cond = ConductorManager(socket.gethostname())
|
||||
|
|
|
@ -20,7 +20,7 @@ Iotronic Wamp Agent
|
|||
"""
|
||||
|
||||
from iotronic.wamp.agent import WampAgent
|
||||
import socket
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
wa=WampAgent('agent')
|
||||
wa = WampAgent(socket.gethostname())
|
||||
|
|
|
@ -34,7 +34,7 @@ class Node(base.APIBase):
|
|||
code = wsme.wsattr(wtypes.text)
|
||||
status = wsme.wsattr(wtypes.text)
|
||||
name = wsme.wsattr(wtypes.text)
|
||||
device = wsme.wsattr(wtypes.text)
|
||||
type = wsme.wsattr(wtypes.text)
|
||||
session = wsme.wsattr(wtypes.text)
|
||||
mobile = types.boolean
|
||||
location = wsme.wsattr([loc.Location])
|
||||
|
|
|
@ -293,6 +293,10 @@ class WampAgentNotFound(NotFound):
|
|||
message = _("WampAgent %(wampagent)s could not be found.")
|
||||
|
||||
|
||||
class WampRegistrationAgentNotFound(NotFound):
|
||||
message = _("No Wamp Registration Agent could not be found.")
|
||||
|
||||
|
||||
class WampAgentAlreadyRegistered(IotronicException):
|
||||
message = _("WampAgent %(wampagent)s already registered.")
|
||||
|
||||
|
|
|
@ -149,13 +149,13 @@ inspected node shall transition to MANAGEABLE status.
|
|||
INSPECTFAIL = 'inspect failed'
|
||||
""" Node inspection failed. """
|
||||
|
||||
|
||||
UPDATE_ALLOWED_STATES = (DEPLOYFAIL, INSPECTING, INSPECTFAIL, CLEANFAIL)
|
||||
"""Transitional states in which we allow updating a node."""
|
||||
|
||||
# NEW
|
||||
OPERATIVE = 'operative'
|
||||
MAINTENANCE = 'maintenance'
|
||||
REGISTERED = 'registered'
|
||||
|
||||
##############
|
||||
# Power states
|
||||
|
@ -185,6 +185,7 @@ def on_enter(new_state, event):
|
|||
LOG.debug("Entering new state '%s' in response to event '%s'",
|
||||
new_state, event)
|
||||
|
||||
|
||||
watchers = {}
|
||||
watchers['on_exit'] = on_exit
|
||||
watchers['on_enter'] = on_enter
|
||||
|
|
|
@ -13,59 +13,116 @@
|
|||
# under the License.
|
||||
|
||||
from iotronic.common import exception
|
||||
from iotronic.common import states
|
||||
from iotronic.conductor.provisioner import Provisioner
|
||||
from iotronic import objects
|
||||
from iotronic.objects import base as objects_base
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
serializer = objects_base.IotronicObjectSerializer()
|
||||
|
||||
a = ['wagent2', 'wagent1']
|
||||
|
||||
|
||||
def get_best_agent():
|
||||
agent = a.pop(0)
|
||||
LOG.debug('Selected agent: %s', agent)
|
||||
a.append(agent)
|
||||
return agent
|
||||
|
||||
|
||||
class ConductorEndpoint(object):
|
||||
def __init__(self):
|
||||
def __init__(self, ragent):
|
||||
transport = oslo_messaging.get_transport(cfg.CONF)
|
||||
self.target = oslo_messaging.Target()
|
||||
self.wamp_agent_client = oslo_messaging.RPCClient(transport,
|
||||
self.target)
|
||||
self.ragent = ragent
|
||||
|
||||
def echo(self, ctx, data):
|
||||
LOG.info("ECHO: %s" % data)
|
||||
return data
|
||||
|
||||
def registration(self, ctx, token, session_num):
|
||||
def registration_uuid(self, ctx, uuid, session_num):
|
||||
LOG.debug('Receved registration from %s with session %s',
|
||||
token, session_num)
|
||||
uuid, session_num)
|
||||
try:
|
||||
node = objects.Node.get_by_code({}, token)
|
||||
node = objects.Node.get_by_uuid(ctx, uuid)
|
||||
except Exception:
|
||||
return exception.NodeNotFound(node=token)
|
||||
return exception.NodeNotFound(node=uuid)
|
||||
try:
|
||||
old_session = objects.SessionWP(
|
||||
{}).get_session_by_node_uuid(node.uuid, valid=True)
|
||||
ctx).get_session_by_node_uuid(node.uuid, valid=True)
|
||||
old_session.valid = False
|
||||
old_session.save()
|
||||
except Exception:
|
||||
LOG.debug('valid session for %s not found', node.uuid)
|
||||
|
||||
node.status = states.REGISTERED
|
||||
node.save()
|
||||
|
||||
session = objects.SessionWP(ctx)
|
||||
session.node_id = node.id
|
||||
session.node_uuid = node.uuid
|
||||
session.session_id = session_num
|
||||
session.create()
|
||||
session.save()
|
||||
return
|
||||
|
||||
def registration(self, ctx, code, session_num):
|
||||
LOG.debug('Receved registration from %s with session %s',
|
||||
code, session_num)
|
||||
try:
|
||||
node = objects.Node.get_by_code(ctx, code)
|
||||
except Exception:
|
||||
return exception.NodeNotFound(node=code)
|
||||
try:
|
||||
old_session = objects.SessionWP(ctx
|
||||
).get_session_by_node_uuid(
|
||||
node.uuid, valid=True)
|
||||
old_session.valid = False
|
||||
old_session.save()
|
||||
except Exception:
|
||||
LOG.debug('valid session for %s Not found', node.uuid)
|
||||
|
||||
session = objects.SessionWP({})
|
||||
session = objects.SessionWP(ctx)
|
||||
session.node_id = node.id
|
||||
session.node_uuid = node.uuid
|
||||
session.session_id = session_num
|
||||
session.create()
|
||||
session.save()
|
||||
|
||||
return
|
||||
node.agent = get_best_agent()
|
||||
agent = objects.WampAgent.get_by_hostname(ctx, node.agent)
|
||||
|
||||
prov = Provisioner(node)
|
||||
prov.conf_registration_agent(self.ragent.wsurl)
|
||||
|
||||
prov.conf_main_agent(agent.wsurl)
|
||||
node.config = prov.get_config()
|
||||
node.save()
|
||||
|
||||
LOG.debug('sending this conf %s', node.config)
|
||||
return node.config
|
||||
|
||||
def destroy_node(self, ctx, node_id):
|
||||
LOG.debug('Destroying node with id %s',
|
||||
node_id)
|
||||
LOG.info('Destroying node with id %s',
|
||||
node_id)
|
||||
node = objects.Node.get_by_uuid(ctx, node_id)
|
||||
|
||||
prov = Provisioner()
|
||||
prov.conf_clean()
|
||||
p = prov.get_config()
|
||||
LOG.debug('sending this conf %s', p)
|
||||
self.execute_on_board(ctx, node_id, 'destroyNode', (p,))
|
||||
|
||||
node.destroy()
|
||||
return {}
|
||||
|
||||
return
|
||||
|
||||
def create_node(self, ctx, node_obj, location_obj):
|
||||
new_node = serializer.deserialize_entity(ctx, node_obj)
|
||||
|
@ -79,21 +136,22 @@ class ConductorEndpoint(object):
|
|||
return serializer.serialize_entity(ctx, new_node)
|
||||
|
||||
def execute_on_board(self, ctx, board, wamp_rpc_call, wamp_rpc_args):
|
||||
|
||||
LOG.debug('Executing \"%s\" on the board: %s', wamp_rpc_call, board)
|
||||
|
||||
uuid_agent = 'agent'
|
||||
# ASAP get agent from node_uuid
|
||||
try:
|
||||
node = objects.Node.get_by_uuid(ctx, board)
|
||||
except Exception:
|
||||
return exception.NodeNotFound(node=board)
|
||||
|
||||
s4t_topic = 's4t_invoke_wamp'
|
||||
full_topic = uuid_agent + '.' + s4t_topic
|
||||
|
||||
ctxt = {}
|
||||
full_topic = node.agent + '.' + s4t_topic
|
||||
|
||||
self.target.topic = full_topic
|
||||
self.wamp_agent_client.prepare(timeout=10)
|
||||
|
||||
full_wamp_call = 'iotronic.' + board + "." + wamp_rpc_call
|
||||
|
||||
return self.wamp_agent_client.call(ctxt, full_topic,
|
||||
return self.wamp_agent_client.call(ctx, full_topic,
|
||||
wamp_rpc_call=full_wamp_call,
|
||||
data=wamp_rpc_args)
|
||||
|
|
|
@ -25,6 +25,7 @@ import time
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
MANAGER_TOPIC = 'iotronic.conductor_manager'
|
||||
RAGENT = None
|
||||
|
||||
conductor_opts = [
|
||||
cfg.StrOpt('api_url',
|
||||
|
@ -71,11 +72,18 @@ class ConductorManager(object):
|
|||
transport = oslo_messaging.get_transport(cfg.CONF)
|
||||
target = oslo_messaging.Target(topic=self.topic, server=self.host,
|
||||
version=self.RPC_API_VERSION)
|
||||
|
||||
ragent = self.dbapi.get_registration_wampagent()
|
||||
|
||||
LOG.info("Found registration agent: %s on %s",
|
||||
ragent.hostname, ragent.wsurl)
|
||||
|
||||
endpoints = [
|
||||
endp.ConductorEndpoint(),
|
||||
endp.ConductorEndpoint(ragent),
|
||||
]
|
||||
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
|
||||
executor='threading')
|
||||
|
||||
try:
|
||||
server.start()
|
||||
while True:
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
# coding=utf-8
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from iotronic.objects import base as objects_base
|
||||
|
||||
serializer = objects_base.IotronicObjectSerializer()
|
||||
|
||||
|
||||
class Provisioner(object):
|
||||
def __init__(self, node=None):
|
||||
if not node:
|
||||
self.config = {"iotronic": {"extra": {}}}
|
||||
else:
|
||||
self.config = node.config
|
||||
if 'iotronic' not in self.config:
|
||||
self.config = {"iotronic": {"extra": {}}}
|
||||
if 'node' not in self.config['iotronic']:
|
||||
self.config['iotronic']['node'] = {}
|
||||
self.config['iotronic']['node'] = node.as_dict()
|
||||
self.config['iotronic']['node']['created_at'] = \
|
||||
node._attr_to_primitive('created_at')
|
||||
self.config['iotronic']['node']['updated_at'] = \
|
||||
node._attr_to_primitive('updated_at')
|
||||
|
||||
# workaround until node properties are not changed
|
||||
self.config['iotronic']['node']['type'] = 'yun'
|
||||
|
||||
try:
|
||||
del self.config['iotronic']['node']['config']
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def get_config(self):
|
||||
return self.config
|
||||
|
||||
def conf_registration_agent(self,
|
||||
url="ws://<WAMP-SERVER>:<WAMP-PORT>/",
|
||||
realm="s4t"):
|
||||
if 'wamp' not in self.config['iotronic']:
|
||||
self.config['iotronic']['wamp'] = {}
|
||||
if "registration-agent" not in self.config['iotronic']['wamp']:
|
||||
self.config['iotronic']['wamp']['registration-agent'] = {}
|
||||
if 'url' not in self.config['iotronic']['wamp']['registration-agent']:
|
||||
self.config['iotronic']['wamp']['registration-agent']['url'] = ""
|
||||
if 'realm' not in \
|
||||
self.config['iotronic']['wamp']['registration-agent']:
|
||||
self.config['iotronic']['wamp']['registration-agent']['realm'] = ""
|
||||
self.config['iotronic']['wamp']['registration-agent']['url'] = url
|
||||
self.config['iotronic']['wamp']['registration-agent']['realm'] = realm
|
||||
|
||||
def conf_main_agent(self,
|
||||
url="ws://<WAMP-SERVER>:<WAMP-PORT>/",
|
||||
realm="s4t"):
|
||||
if 'wamp' not in self.config['iotronic']:
|
||||
self.config['iotronic']['wamp'] = {}
|
||||
if "main-agent" not in self.config['iotronic']['wamp']:
|
||||
self.config['iotronic']['wamp']['main-agent'] = {}
|
||||
if 'url' not in self.config['iotronic']['wamp']['main-agent']:
|
||||
self.config['iotronic']['wamp']['main-agent']['url'] = ""
|
||||
if 'realm' not in self.config['iotronic']['wamp']['main-agent']:
|
||||
self.config['iotronic']['wamp']['main-agent']['realm'] = ""
|
||||
self.config['iotronic']['wamp']['main-agent']['url'] = url
|
||||
self.config['iotronic']['wamp']['main-agent']['realm'] = realm
|
||||
|
||||
def conf_clean(self):
|
||||
self.conf_registration_agent()
|
||||
if 'node' not in self.config['iotronic']:
|
||||
self.config['iotronic']['node'] = {}
|
||||
self.config['iotronic']['node']['token'] = "<REGISTRATION-TOKEN>"
|
|
@ -55,17 +55,29 @@ class ConductorAPI(object):
|
|||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.0')
|
||||
return cctxt.call(context, 'echo', data=data)
|
||||
|
||||
def registration(self, context, token, session_num, topic=None):
|
||||
def registration(self, context, code, session_num, topic=None):
|
||||
"""Registration of a node.
|
||||
|
||||
:param context: request context.
|
||||
:param token: token used for the first registration
|
||||
:param code: token used for the first registration
|
||||
:param session_num: wamp session number
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.0')
|
||||
return cctxt.call(context, 'registration',
|
||||
token=token, session_num=session_num)
|
||||
code=code, session_num=session_num)
|
||||
|
||||
def registration_uuid(self, context, uuid, session_num, topic=None):
|
||||
"""Registration of a node.
|
||||
|
||||
:param context: request context.
|
||||
:param uuid: uuid node
|
||||
:param session_num: wamp session number
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.0')
|
||||
return cctxt.call(context, 'registration_uuid',
|
||||
uuid=uuid, session_num=session_num)
|
||||
|
||||
def create_node(self, context, node_obj, location_obj, topic=None):
|
||||
"""Add a node on the cloud
|
||||
|
|
|
@ -250,6 +250,14 @@ class Connection(object):
|
|||
:raises: WampAgentNotFound
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_registration_wampagent(self):
|
||||
"""Retrieve the registration wampagent record from the database.
|
||||
|
||||
:returns: A wampagent.
|
||||
:raises: WampAgentNotFound
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def unregister_wampagent(self, hostname):
|
||||
"""Remove this wampagent from the service registry immediately.
|
||||
|
|
|
@ -163,7 +163,7 @@ class Connection(api.Connection):
|
|||
if 'uuid' not in values:
|
||||
values['uuid'] = uuidutils.generate_uuid()
|
||||
if 'status' not in values:
|
||||
values['status'] = states.OPERATIVE
|
||||
values['status'] = states.REGISTERED
|
||||
|
||||
node = models.Node()
|
||||
node.update(values)
|
||||
|
@ -261,19 +261,6 @@ class Connection(api.Connection):
|
|||
raise exception.NodeAssociated(
|
||||
node=node_id, instance=ref.instance_uuid)
|
||||
|
||||
if 'provision_state' in values:
|
||||
values['provision_updated_at'] = timeutils.utcnow()
|
||||
if values['provision_state'] == states.INSPECTING:
|
||||
values['inspection_started_at'] = timeutils.utcnow()
|
||||
values['inspection_finished_at'] = None
|
||||
elif (ref.provision_state == states.INSPECTING and
|
||||
values['provision_state'] == states.MANAGEABLE):
|
||||
values['inspection_finished_at'] = timeutils.utcnow()
|
||||
values['inspection_started_at'] = None
|
||||
elif (ref.provision_state == states.INSPECTING and
|
||||
values['provision_state'] == states.INSPECTFAIL):
|
||||
values['inspection_started_at'] = None
|
||||
|
||||
ref.update(values)
|
||||
return ref
|
||||
|
||||
|
@ -425,6 +412,14 @@ class Connection(api.Connection):
|
|||
except NoResultFound:
|
||||
raise exception.WampAgentNotFound(wampagent=hostname)
|
||||
|
||||
def get_registration_wampagent(self):
|
||||
try:
|
||||
return (model_query(models.WampAgent)
|
||||
.filter_by(ragent=True, online=True)
|
||||
.one())
|
||||
except NoResultFound:
|
||||
raise exception.WampRegistrationAgentNotFound()
|
||||
|
||||
def unregister_wampagent(self, hostname):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
|
|
|
@ -133,7 +133,9 @@ class WampAgent(Base):
|
|||
)
|
||||
id = Column(Integer, primary_key=True)
|
||||
hostname = Column(String(255), nullable=False)
|
||||
wsurl = Column(String(255), nullable=False)
|
||||
online = Column(Boolean, default=True)
|
||||
ragent = Column(Boolean, default=False)
|
||||
|
||||
|
||||
class Node(Base):
|
||||
|
@ -150,9 +152,11 @@ class Node(Base):
|
|||
code = Column(String(25))
|
||||
status = Column(String(15), nullable=True)
|
||||
name = Column(String(255), nullable=True)
|
||||
device = Column(String(255))
|
||||
type = Column(String(255))
|
||||
agent = Column(String(255), nullable=True)
|
||||
session = Column(String(255), nullable=True)
|
||||
mobile = Column(Boolean, default=False)
|
||||
config = Column(JSONEncodedDict)
|
||||
extra = Column(JSONEncodedDict)
|
||||
|
||||
|
||||
|
|
|
@ -16,16 +16,18 @@ from iotronic.objects import conductor
|
|||
from iotronic.objects import location
|
||||
from iotronic.objects import node
|
||||
from iotronic.objects import sessionwp
|
||||
from iotronic.objects import wampagent
|
||||
|
||||
Conductor = conductor.Conductor
|
||||
Node = node.Node
|
||||
Location = location.Location
|
||||
SessionWP = sessionwp.SessionWP
|
||||
|
||||
WampAgent = wampagent.WampAgent
|
||||
|
||||
__all__ = (
|
||||
Conductor,
|
||||
Node,
|
||||
Location,
|
||||
SessionWP,
|
||||
WampAgent,
|
||||
)
|
||||
|
|
|
@ -34,9 +34,11 @@ class Node(base.IotronicObject):
|
|||
'code': obj_utils.str_or_none,
|
||||
'status': obj_utils.str_or_none,
|
||||
'name': obj_utils.str_or_none,
|
||||
'device': obj_utils.str_or_none,
|
||||
'type': obj_utils.str_or_none,
|
||||
'agent': obj_utils.str_or_none,
|
||||
'session': obj_utils.str_or_none,
|
||||
'mobile': bool,
|
||||
'config': obj_utils.dict_or_none,
|
||||
'extra': obj_utils.dict_or_none,
|
||||
}
|
||||
|
||||
|
@ -223,6 +225,7 @@ class Node(base.IotronicObject):
|
|||
"""
|
||||
current = self.__class__.get_by_uuid(self._context, self.uuid)
|
||||
for field in self.fields:
|
||||
if (hasattr(self, base.get_attrname(field)) and
|
||||
if (hasattr(
|
||||
self, base.get_attrname(field)) and
|
||||
self[field] != current[field]):
|
||||
self[field] = current[field]
|
||||
|
|
|
@ -113,6 +113,7 @@ class SessionWP(base.IotronicObject):
|
|||
:returns: a list of :class:`SessionWP` object.
|
||||
|
||||
"""
|
||||
|
||||
db_sessions = cls.dbapi.get_session_list(limit=limit,
|
||||
marker=marker,
|
||||
sort_key=sort_key,
|
||||
|
@ -207,6 +208,7 @@ class SessionWP(base.IotronicObject):
|
|||
"""
|
||||
current = self.__class__.get_by_uuid(self._context, uuid=self.uuid)
|
||||
for field in self.fields:
|
||||
if (hasattr(self, base.get_attrname(field)) and
|
||||
if (hasattr(
|
||||
self, base.get_attrname(field)) and
|
||||
self[field] != current[field]):
|
||||
self[field] = current[field]
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
# coding=utf-8
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from iotronic.common.i18n import _
|
||||
from iotronic.db import api as db_api
|
||||
from iotronic.objects import base
|
||||
|
||||
|
||||
class WampAgent(base.IotronicObject):
|
||||
dbapi = db_api.get_instance()
|
||||
|
||||
fields = {
|
||||
'id': int,
|
||||
'hostname': str,
|
||||
'wsurl': str,
|
||||
'online': bool,
|
||||
'ragent': bool,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _from_db_object(wampagent, db_obj):
|
||||
"""Converts a database entity to a formal object."""
|
||||
for field in wampagent.fields:
|
||||
wampagent[field] = db_obj[field]
|
||||
|
||||
wampagent.obj_reset_changes()
|
||||
return wampagent
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_by_hostname(cls, context, hostname):
|
||||
"""Get a WampAgent record by its hostname.
|
||||
|
||||
:param hostname: the hostname on which a WampAgent is running
|
||||
:returns: a :class:`WampAgent` object.
|
||||
"""
|
||||
db_obj = cls.dbapi.get_wampagent(hostname)
|
||||
wampagent = WampAgent._from_db_object(cls(context), db_obj)
|
||||
return wampagent
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_registration_agent(cls, context=None):
|
||||
"""Get a Registration WampAgent
|
||||
|
||||
:param hostname: the hostname on which a WampAgent is running
|
||||
:returns: a :class:`WampAgent` object.
|
||||
"""
|
||||
db_obj = cls.dbapi.get_registration_wampagent()
|
||||
wampagent = WampAgent._from_db_object(cls(context), db_obj)
|
||||
return wampagent
|
||||
|
||||
def save(self, context):
|
||||
"""Save is not supported by WampAgent objects."""
|
||||
raise NotImplementedError(
|
||||
_('Cannot update a wampagent record directly.'))
|
||||
|
||||
@base.remotable
|
||||
def refresh(self, context=None):
|
||||
"""Loads and applies updates for this WampAgent.
|
||||
|
||||
Loads a :class:`WampAgent` with the same uuid from the database and
|
||||
checks for updated attributes.
|
||||
|
||||
:param context: Security context. NOTE: This should only
|
||||
be used internally by the indirection_api.
|
||||
Unfortunately, RPC requires context as the first
|
||||
argument, even though we don't use it.
|
||||
A context should be set when instantiating the
|
||||
object, e.g.: WampAgent(context)
|
||||
"""
|
||||
current = self.__class__.get_by_hostname(self._context,
|
||||
hostname=self.hostname)
|
||||
for field in self.fields:
|
||||
if (hasattr(
|
||||
self, base.get_attrname(field)) and
|
||||
self[field] != current[field]):
|
||||
self[field] = current[field]
|
||||
|
||||
@base.remotable
|
||||
def touch(self, context):
|
||||
"""Touch this wampagent's DB record, marking it as up-to-date."""
|
||||
self.dbapi.touch_wampagent(self.hostname)
|
|
@ -36,6 +36,9 @@ wamp_opts = [
|
|||
cfg.StrOpt('wamp_realm',
|
||||
default='s4t',
|
||||
help=('realm broker')),
|
||||
cfg.BoolOpt('register_agent',
|
||||
default=False,
|
||||
help=('Flag for se a registration agent')),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
@ -96,14 +99,20 @@ class WampFrontend(wamp.ApplicationSession):
|
|||
def onJoin(self, details):
|
||||
global wamp_session_caller, AGENT_HOST
|
||||
wamp_session_caller = self
|
||||
import iotronic.wamp.registerd_functions as fun
|
||||
|
||||
self.subscribe(fun.board_on_leave, 'wamp.session.on_leave')
|
||||
self.subscribe(fun.board_on_join, 'wamp.session.on_join')
|
||||
import iotronic.wamp.functions as fun
|
||||
|
||||
self.subscribe(fun.node_on_leave, 'wamp.session.on_leave')
|
||||
self.subscribe(fun.node_on_join, 'wamp.session.on_join')
|
||||
|
||||
try:
|
||||
self.register(fun.registration, u'stack4things.register')
|
||||
self.register(fun.echo, AGENT_HOST + u'.stack4things.echo')
|
||||
if CONF.wamp.register_agent:
|
||||
self.register(fun.registration, u'stack4things.register')
|
||||
LOG.info("I have been set as registration agent")
|
||||
self.register(fun.registration_uuid,
|
||||
AGENT_HOST + u'.stack4things.register_uuid')
|
||||
self.register(fun.echo,
|
||||
AGENT_HOST + u'.stack4things.echo')
|
||||
LOG.info("procedure registered")
|
||||
except Exception as e:
|
||||
LOG.error("could not register procedure: {0}".format(e))
|
||||
|
@ -199,21 +208,19 @@ class WampAgent(object):
|
|||
|
||||
try:
|
||||
wpa = self.dbapi.register_wampagent(
|
||||
{'hostname': self.host})
|
||||
|
||||
except exception.ConductorAlreadyRegistered:
|
||||
LOG.warn(_LW("A conductor with hostname %(hostname)s "
|
||||
"was previously registered. Updating registration"),
|
||||
{'hostname': self.host})
|
||||
{'hostname': self.host, 'wsurl': CONF.wamp.wamp_transport_url})
|
||||
|
||||
except exception.WampAgentAlreadyRegistered:
|
||||
LOG.warn(_LW("A wampagent with hostname %(hostname)s "
|
||||
"was previously registered. Updating registration"),
|
||||
{'hostname': self.host})
|
||||
|
||||
wpa = self.dbapi.register_wampagent({'hostname': self.host},
|
||||
update_existing=True)
|
||||
wpa = self.dbapi.register_wampagent(
|
||||
{'hostname': self.host, 'wsurl': CONF.wamp.wamp_transport_url},
|
||||
update_existing=True)
|
||||
self.wampagent = wpa
|
||||
self.wampagent.ragent = CONF.wamp.register_agent
|
||||
self.wampagent.save()
|
||||
|
||||
global AGENT_HOST
|
||||
AGENT_HOST = self.host
|
||||
|
|
|
@ -19,7 +19,6 @@ from iotronic import objects
|
|||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
@ -44,7 +43,7 @@ def echo(data):
|
|||
return data
|
||||
|
||||
|
||||
def board_on_leave(session_id):
|
||||
def node_on_leave(session_id):
|
||||
LOG.debug('A node with %s disconnectd', session_id)
|
||||
try:
|
||||
old_session = objects.SessionWP({}).get_by_session_id({}, session_id)
|
||||
|
@ -55,11 +54,13 @@ def board_on_leave(session_id):
|
|||
LOG.debug('session %s not found', session_id)
|
||||
|
||||
|
||||
def registration(data):
|
||||
token = data[0]
|
||||
session = data[1]
|
||||
return c.registration(ctxt, token, session)
|
||||
def registration_uuid(uuid, session):
|
||||
return c.registration_uuid(ctxt, uuid, session)
|
||||
|
||||
|
||||
def board_on_join(session_id):
|
||||
def registration(code, session):
|
||||
return c.registration(ctxt, code, session)
|
||||
|
||||
|
||||
def node_on_join(session_id):
|
||||
LOG.debug('A node with %s joined', session_id)
|
|
@ -45,7 +45,9 @@ CREATE TABLE IF NOT EXISTS `iotronic`.`wampagents` (
|
|||
`updated_at` DATETIME NULL DEFAULT NULL,
|
||||
`id` INT(11) NOT NULL AUTO_INCREMENT,
|
||||
`hostname` VARCHAR(255) NOT NULL,
|
||||
`wsurl` VARCHAR(255) NOT NULL,
|
||||
`online` TINYINT(1) NULL DEFAULT NULL,
|
||||
`ragent` TINYINT(1) NULL DEFAULT NULL,
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE INDEX `uniq_wampagents0hostname` (`hostname` ASC))
|
||||
ENGINE = InnoDB
|
||||
|
@ -66,9 +68,11 @@ CREATE TABLE IF NOT EXISTS `iotronic`.`nodes` (
|
|||
`code` VARCHAR(25) NOT NULL,
|
||||
`status` VARCHAR(15) NULL DEFAULT NULL,
|
||||
`name` VARCHAR(255) NULL DEFAULT NULL,
|
||||
`device` VARCHAR(255) NOT NULL,
|
||||
`type` VARCHAR(255) NOT NULL,
|
||||
`agent` VARCHAR(255) NULL DEFAULT NULL,
|
||||
`session` VARCHAR(255) NULL DEFAULT NULL,
|
||||
`mobile` TINYINT(1) NOT NULL DEFAULT '0',
|
||||
`config` TEXT NULL DEFAULT NULL,
|
||||
`extra` TEXT NULL DEFAULT NULL,
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE INDEX `uuid` (`uuid` ASC),
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
#! /bin/bash
|
||||
|
||||
mysql -u iotronic -h$1 -p iotronic < iotronic.sql
|
Loading…
Reference in New Issue