communication between conductor, agents and nodes improved

Change-Id: I083bd104d8a0546f95f59d02004819fb0c3577d5
This commit is contained in:
Fabio Verboso 2017-02-15 12:17:22 +01:00
parent 400a934abf
commit 0b1826320b
20 changed files with 352 additions and 71 deletions

View File

@ -19,11 +19,8 @@ Iotronic Conductor
from oslo_config import cfg
from iotronic.conductor.manager import ConductorManager
import socket
CONF = cfg.CONF
if __name__ == '__main__':
cond=ConductorManager('iotronic')
cond = ConductorManager(socket.gethostname())

View File

@ -20,7 +20,7 @@ Iotronic Wamp Agent
"""
from iotronic.wamp.agent import WampAgent
import socket
if __name__ == '__main__':
wa=WampAgent('agent')
wa = WampAgent(socket.gethostname())

View File

@ -34,7 +34,7 @@ class Node(base.APIBase):
code = wsme.wsattr(wtypes.text)
status = wsme.wsattr(wtypes.text)
name = wsme.wsattr(wtypes.text)
device = wsme.wsattr(wtypes.text)
type = wsme.wsattr(wtypes.text)
session = wsme.wsattr(wtypes.text)
mobile = types.boolean
location = wsme.wsattr([loc.Location])

View File

@ -293,6 +293,10 @@ class WampAgentNotFound(NotFound):
message = _("WampAgent %(wampagent)s could not be found.")
class WampRegistrationAgentNotFound(NotFound):
message = _("No Wamp Registration Agent could not be found.")
class WampAgentAlreadyRegistered(IotronicException):
message = _("WampAgent %(wampagent)s already registered.")

View File

@ -149,13 +149,13 @@ inspected node shall transition to MANAGEABLE status.
INSPECTFAIL = 'inspect failed'
""" Node inspection failed. """
UPDATE_ALLOWED_STATES = (DEPLOYFAIL, INSPECTING, INSPECTFAIL, CLEANFAIL)
"""Transitional states in which we allow updating a node."""
# NEW
OPERATIVE = 'operative'
MAINTENANCE = 'maintenance'
REGISTERED = 'registered'
##############
# Power states
@ -185,6 +185,7 @@ def on_enter(new_state, event):
LOG.debug("Entering new state '%s' in response to event '%s'",
new_state, event)
watchers = {}
watchers['on_exit'] = on_exit
watchers['on_enter'] = on_enter

View File

@ -13,59 +13,116 @@
# under the License.
from iotronic.common import exception
from iotronic.common import states
from iotronic.conductor.provisioner import Provisioner
from iotronic import objects
from iotronic.objects import base as objects_base
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
LOG = logging.getLogger(__name__)
serializer = objects_base.IotronicObjectSerializer()
a = ['wagent2', 'wagent1']
def get_best_agent():
agent = a.pop(0)
LOG.debug('Selected agent: %s', agent)
a.append(agent)
return agent
class ConductorEndpoint(object):
def __init__(self):
def __init__(self, ragent):
transport = oslo_messaging.get_transport(cfg.CONF)
self.target = oslo_messaging.Target()
self.wamp_agent_client = oslo_messaging.RPCClient(transport,
self.target)
self.ragent = ragent
def echo(self, ctx, data):
LOG.info("ECHO: %s" % data)
return data
def registration(self, ctx, token, session_num):
def registration_uuid(self, ctx, uuid, session_num):
LOG.debug('Receved registration from %s with session %s',
token, session_num)
uuid, session_num)
try:
node = objects.Node.get_by_code({}, token)
node = objects.Node.get_by_uuid(ctx, uuid)
except Exception:
return exception.NodeNotFound(node=token)
return exception.NodeNotFound(node=uuid)
try:
old_session = objects.SessionWP(
{}).get_session_by_node_uuid(node.uuid, valid=True)
ctx).get_session_by_node_uuid(node.uuid, valid=True)
old_session.valid = False
old_session.save()
except Exception:
LOG.debug('valid session for %s not found', node.uuid)
node.status = states.REGISTERED
node.save()
session = objects.SessionWP(ctx)
session.node_id = node.id
session.node_uuid = node.uuid
session.session_id = session_num
session.create()
session.save()
return
def registration(self, ctx, code, session_num):
LOG.debug('Receved registration from %s with session %s',
code, session_num)
try:
node = objects.Node.get_by_code(ctx, code)
except Exception:
return exception.NodeNotFound(node=code)
try:
old_session = objects.SessionWP(ctx
).get_session_by_node_uuid(
node.uuid, valid=True)
old_session.valid = False
old_session.save()
except Exception:
LOG.debug('valid session for %s Not found', node.uuid)
session = objects.SessionWP({})
session = objects.SessionWP(ctx)
session.node_id = node.id
session.node_uuid = node.uuid
session.session_id = session_num
session.create()
session.save()
return
node.agent = get_best_agent()
agent = objects.WampAgent.get_by_hostname(ctx, node.agent)
prov = Provisioner(node)
prov.conf_registration_agent(self.ragent.wsurl)
prov.conf_main_agent(agent.wsurl)
node.config = prov.get_config()
node.save()
LOG.debug('sending this conf %s', node.config)
return node.config
def destroy_node(self, ctx, node_id):
LOG.debug('Destroying node with id %s',
node_id)
LOG.info('Destroying node with id %s',
node_id)
node = objects.Node.get_by_uuid(ctx, node_id)
prov = Provisioner()
prov.conf_clean()
p = prov.get_config()
LOG.debug('sending this conf %s', p)
self.execute_on_board(ctx, node_id, 'destroyNode', (p,))
node.destroy()
return {}
return
def create_node(self, ctx, node_obj, location_obj):
new_node = serializer.deserialize_entity(ctx, node_obj)
@ -79,21 +136,22 @@ class ConductorEndpoint(object):
return serializer.serialize_entity(ctx, new_node)
def execute_on_board(self, ctx, board, wamp_rpc_call, wamp_rpc_args):
LOG.debug('Executing \"%s\" on the board: %s', wamp_rpc_call, board)
uuid_agent = 'agent'
# ASAP get agent from node_uuid
try:
node = objects.Node.get_by_uuid(ctx, board)
except Exception:
return exception.NodeNotFound(node=board)
s4t_topic = 's4t_invoke_wamp'
full_topic = uuid_agent + '.' + s4t_topic
ctxt = {}
full_topic = node.agent + '.' + s4t_topic
self.target.topic = full_topic
self.wamp_agent_client.prepare(timeout=10)
full_wamp_call = 'iotronic.' + board + "." + wamp_rpc_call
return self.wamp_agent_client.call(ctxt, full_topic,
return self.wamp_agent_client.call(ctx, full_topic,
wamp_rpc_call=full_wamp_call,
data=wamp_rpc_args)

View File

@ -25,6 +25,7 @@ import time
LOG = logging.getLogger(__name__)
MANAGER_TOPIC = 'iotronic.conductor_manager'
RAGENT = None
conductor_opts = [
cfg.StrOpt('api_url',
@ -71,11 +72,18 @@ class ConductorManager(object):
transport = oslo_messaging.get_transport(cfg.CONF)
target = oslo_messaging.Target(topic=self.topic, server=self.host,
version=self.RPC_API_VERSION)
ragent = self.dbapi.get_registration_wampagent()
LOG.info("Found registration agent: %s on %s",
ragent.hostname, ragent.wsurl)
endpoints = [
endp.ConductorEndpoint(),
endp.ConductorEndpoint(ragent),
]
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
executor='threading')
try:
server.start()
while True:

View File

@ -0,0 +1,80 @@
# coding=utf-8
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from iotronic.objects import base as objects_base
serializer = objects_base.IotronicObjectSerializer()
class Provisioner(object):
def __init__(self, node=None):
if not node:
self.config = {"iotronic": {"extra": {}}}
else:
self.config = node.config
if 'iotronic' not in self.config:
self.config = {"iotronic": {"extra": {}}}
if 'node' not in self.config['iotronic']:
self.config['iotronic']['node'] = {}
self.config['iotronic']['node'] = node.as_dict()
self.config['iotronic']['node']['created_at'] = \
node._attr_to_primitive('created_at')
self.config['iotronic']['node']['updated_at'] = \
node._attr_to_primitive('updated_at')
# workaround until node properties are not changed
self.config['iotronic']['node']['type'] = 'yun'
try:
del self.config['iotronic']['node']['config']
except Exception:
pass
def get_config(self):
return self.config
def conf_registration_agent(self,
url="ws://<WAMP-SERVER>:<WAMP-PORT>/",
realm="s4t"):
if 'wamp' not in self.config['iotronic']:
self.config['iotronic']['wamp'] = {}
if "registration-agent" not in self.config['iotronic']['wamp']:
self.config['iotronic']['wamp']['registration-agent'] = {}
if 'url' not in self.config['iotronic']['wamp']['registration-agent']:
self.config['iotronic']['wamp']['registration-agent']['url'] = ""
if 'realm' not in \
self.config['iotronic']['wamp']['registration-agent']:
self.config['iotronic']['wamp']['registration-agent']['realm'] = ""
self.config['iotronic']['wamp']['registration-agent']['url'] = url
self.config['iotronic']['wamp']['registration-agent']['realm'] = realm
def conf_main_agent(self,
url="ws://<WAMP-SERVER>:<WAMP-PORT>/",
realm="s4t"):
if 'wamp' not in self.config['iotronic']:
self.config['iotronic']['wamp'] = {}
if "main-agent" not in self.config['iotronic']['wamp']:
self.config['iotronic']['wamp']['main-agent'] = {}
if 'url' not in self.config['iotronic']['wamp']['main-agent']:
self.config['iotronic']['wamp']['main-agent']['url'] = ""
if 'realm' not in self.config['iotronic']['wamp']['main-agent']:
self.config['iotronic']['wamp']['main-agent']['realm'] = ""
self.config['iotronic']['wamp']['main-agent']['url'] = url
self.config['iotronic']['wamp']['main-agent']['realm'] = realm
def conf_clean(self):
self.conf_registration_agent()
if 'node' not in self.config['iotronic']:
self.config['iotronic']['node'] = {}
self.config['iotronic']['node']['token'] = "<REGISTRATION-TOKEN>"

View File

@ -55,17 +55,29 @@ class ConductorAPI(object):
cctxt = self.client.prepare(topic=topic or self.topic, version='1.0')
return cctxt.call(context, 'echo', data=data)
def registration(self, context, token, session_num, topic=None):
def registration(self, context, code, session_num, topic=None):
"""Registration of a node.
:param context: request context.
:param token: token used for the first registration
:param code: token used for the first registration
:param session_num: wamp session number
:param topic: RPC topic. Defaults to self.topic.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.0')
return cctxt.call(context, 'registration',
token=token, session_num=session_num)
code=code, session_num=session_num)
def registration_uuid(self, context, uuid, session_num, topic=None):
"""Registration of a node.
:param context: request context.
:param uuid: uuid node
:param session_num: wamp session number
:param topic: RPC topic. Defaults to self.topic.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.0')
return cctxt.call(context, 'registration_uuid',
uuid=uuid, session_num=session_num)
def create_node(self, context, node_obj, location_obj, topic=None):
"""Add a node on the cloud

View File

@ -250,6 +250,14 @@ class Connection(object):
:raises: WampAgentNotFound
"""
@abc.abstractmethod
def get_registration_wampagent(self):
"""Retrieve the registration wampagent record from the database.
:returns: A wampagent.
:raises: WampAgentNotFound
"""
@abc.abstractmethod
def unregister_wampagent(self, hostname):
"""Remove this wampagent from the service registry immediately.

View File

@ -163,7 +163,7 @@ class Connection(api.Connection):
if 'uuid' not in values:
values['uuid'] = uuidutils.generate_uuid()
if 'status' not in values:
values['status'] = states.OPERATIVE
values['status'] = states.REGISTERED
node = models.Node()
node.update(values)
@ -261,19 +261,6 @@ class Connection(api.Connection):
raise exception.NodeAssociated(
node=node_id, instance=ref.instance_uuid)
if 'provision_state' in values:
values['provision_updated_at'] = timeutils.utcnow()
if values['provision_state'] == states.INSPECTING:
values['inspection_started_at'] = timeutils.utcnow()
values['inspection_finished_at'] = None
elif (ref.provision_state == states.INSPECTING and
values['provision_state'] == states.MANAGEABLE):
values['inspection_finished_at'] = timeutils.utcnow()
values['inspection_started_at'] = None
elif (ref.provision_state == states.INSPECTING and
values['provision_state'] == states.INSPECTFAIL):
values['inspection_started_at'] = None
ref.update(values)
return ref
@ -425,6 +412,14 @@ class Connection(api.Connection):
except NoResultFound:
raise exception.WampAgentNotFound(wampagent=hostname)
def get_registration_wampagent(self):
try:
return (model_query(models.WampAgent)
.filter_by(ragent=True, online=True)
.one())
except NoResultFound:
raise exception.WampRegistrationAgentNotFound()
def unregister_wampagent(self, hostname):
session = get_session()
with session.begin():

View File

@ -133,7 +133,9 @@ class WampAgent(Base):
)
id = Column(Integer, primary_key=True)
hostname = Column(String(255), nullable=False)
wsurl = Column(String(255), nullable=False)
online = Column(Boolean, default=True)
ragent = Column(Boolean, default=False)
class Node(Base):
@ -150,9 +152,11 @@ class Node(Base):
code = Column(String(25))
status = Column(String(15), nullable=True)
name = Column(String(255), nullable=True)
device = Column(String(255))
type = Column(String(255))
agent = Column(String(255), nullable=True)
session = Column(String(255), nullable=True)
mobile = Column(Boolean, default=False)
config = Column(JSONEncodedDict)
extra = Column(JSONEncodedDict)

View File

@ -16,16 +16,18 @@ from iotronic.objects import conductor
from iotronic.objects import location
from iotronic.objects import node
from iotronic.objects import sessionwp
from iotronic.objects import wampagent
Conductor = conductor.Conductor
Node = node.Node
Location = location.Location
SessionWP = sessionwp.SessionWP
WampAgent = wampagent.WampAgent
__all__ = (
Conductor,
Node,
Location,
SessionWP,
WampAgent,
)

View File

@ -34,9 +34,11 @@ class Node(base.IotronicObject):
'code': obj_utils.str_or_none,
'status': obj_utils.str_or_none,
'name': obj_utils.str_or_none,
'device': obj_utils.str_or_none,
'type': obj_utils.str_or_none,
'agent': obj_utils.str_or_none,
'session': obj_utils.str_or_none,
'mobile': bool,
'config': obj_utils.dict_or_none,
'extra': obj_utils.dict_or_none,
}
@ -223,6 +225,7 @@ class Node(base.IotronicObject):
"""
current = self.__class__.get_by_uuid(self._context, self.uuid)
for field in self.fields:
if (hasattr(self, base.get_attrname(field)) and
if (hasattr(
self, base.get_attrname(field)) and
self[field] != current[field]):
self[field] = current[field]

View File

@ -113,6 +113,7 @@ class SessionWP(base.IotronicObject):
:returns: a list of :class:`SessionWP` object.
"""
db_sessions = cls.dbapi.get_session_list(limit=limit,
marker=marker,
sort_key=sort_key,
@ -207,6 +208,7 @@ class SessionWP(base.IotronicObject):
"""
current = self.__class__.get_by_uuid(self._context, uuid=self.uuid)
for field in self.fields:
if (hasattr(self, base.get_attrname(field)) and
if (hasattr(
self, base.get_attrname(field)) and
self[field] != current[field]):
self[field] = current[field]

View File

@ -0,0 +1,92 @@
# coding=utf-8
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from iotronic.common.i18n import _
from iotronic.db import api as db_api
from iotronic.objects import base
class WampAgent(base.IotronicObject):
dbapi = db_api.get_instance()
fields = {
'id': int,
'hostname': str,
'wsurl': str,
'online': bool,
'ragent': bool,
}
@staticmethod
def _from_db_object(wampagent, db_obj):
"""Converts a database entity to a formal object."""
for field in wampagent.fields:
wampagent[field] = db_obj[field]
wampagent.obj_reset_changes()
return wampagent
@base.remotable_classmethod
def get_by_hostname(cls, context, hostname):
"""Get a WampAgent record by its hostname.
:param hostname: the hostname on which a WampAgent is running
:returns: a :class:`WampAgent` object.
"""
db_obj = cls.dbapi.get_wampagent(hostname)
wampagent = WampAgent._from_db_object(cls(context), db_obj)
return wampagent
@base.remotable_classmethod
def get_registration_agent(cls, context=None):
"""Get a Registration WampAgent
:param hostname: the hostname on which a WampAgent is running
:returns: a :class:`WampAgent` object.
"""
db_obj = cls.dbapi.get_registration_wampagent()
wampagent = WampAgent._from_db_object(cls(context), db_obj)
return wampagent
def save(self, context):
"""Save is not supported by WampAgent objects."""
raise NotImplementedError(
_('Cannot update a wampagent record directly.'))
@base.remotable
def refresh(self, context=None):
"""Loads and applies updates for this WampAgent.
Loads a :class:`WampAgent` with the same uuid from the database and
checks for updated attributes.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: WampAgent(context)
"""
current = self.__class__.get_by_hostname(self._context,
hostname=self.hostname)
for field in self.fields:
if (hasattr(
self, base.get_attrname(field)) and
self[field] != current[field]):
self[field] = current[field]
@base.remotable
def touch(self, context):
"""Touch this wampagent's DB record, marking it as up-to-date."""
self.dbapi.touch_wampagent(self.hostname)

View File

@ -36,6 +36,9 @@ wamp_opts = [
cfg.StrOpt('wamp_realm',
default='s4t',
help=('realm broker')),
cfg.BoolOpt('register_agent',
default=False,
help=('Flag for se a registration agent')),
]
CONF = cfg.CONF
@ -96,14 +99,20 @@ class WampFrontend(wamp.ApplicationSession):
def onJoin(self, details):
global wamp_session_caller, AGENT_HOST
wamp_session_caller = self
import iotronic.wamp.registerd_functions as fun
self.subscribe(fun.board_on_leave, 'wamp.session.on_leave')
self.subscribe(fun.board_on_join, 'wamp.session.on_join')
import iotronic.wamp.functions as fun
self.subscribe(fun.node_on_leave, 'wamp.session.on_leave')
self.subscribe(fun.node_on_join, 'wamp.session.on_join')
try:
self.register(fun.registration, u'stack4things.register')
self.register(fun.echo, AGENT_HOST + u'.stack4things.echo')
if CONF.wamp.register_agent:
self.register(fun.registration, u'stack4things.register')
LOG.info("I have been set as registration agent")
self.register(fun.registration_uuid,
AGENT_HOST + u'.stack4things.register_uuid')
self.register(fun.echo,
AGENT_HOST + u'.stack4things.echo')
LOG.info("procedure registered")
except Exception as e:
LOG.error("could not register procedure: {0}".format(e))
@ -199,21 +208,19 @@ class WampAgent(object):
try:
wpa = self.dbapi.register_wampagent(
{'hostname': self.host})
except exception.ConductorAlreadyRegistered:
LOG.warn(_LW("A conductor with hostname %(hostname)s "
"was previously registered. Updating registration"),
{'hostname': self.host})
{'hostname': self.host, 'wsurl': CONF.wamp.wamp_transport_url})
except exception.WampAgentAlreadyRegistered:
LOG.warn(_LW("A wampagent with hostname %(hostname)s "
"was previously registered. Updating registration"),
{'hostname': self.host})
wpa = self.dbapi.register_wampagent({'hostname': self.host},
update_existing=True)
wpa = self.dbapi.register_wampagent(
{'hostname': self.host, 'wsurl': CONF.wamp.wamp_transport_url},
update_existing=True)
self.wampagent = wpa
self.wampagent.ragent = CONF.wamp.register_agent
self.wampagent.save()
global AGENT_HOST
AGENT_HOST = self.host

View File

@ -19,7 +19,6 @@ from iotronic import objects
from oslo_config import cfg
from oslo_log import log
LOG = log.getLogger(__name__)
CONF = cfg.CONF
@ -44,7 +43,7 @@ def echo(data):
return data
def board_on_leave(session_id):
def node_on_leave(session_id):
LOG.debug('A node with %s disconnectd', session_id)
try:
old_session = objects.SessionWP({}).get_by_session_id({}, session_id)
@ -55,11 +54,13 @@ def board_on_leave(session_id):
LOG.debug('session %s not found', session_id)
def registration(data):
token = data[0]
session = data[1]
return c.registration(ctxt, token, session)
def registration_uuid(uuid, session):
return c.registration_uuid(ctxt, uuid, session)
def board_on_join(session_id):
def registration(code, session):
return c.registration(ctxt, code, session)
def node_on_join(session_id):
LOG.debug('A node with %s joined', session_id)

View File

@ -45,7 +45,9 @@ CREATE TABLE IF NOT EXISTS `iotronic`.`wampagents` (
`updated_at` DATETIME NULL DEFAULT NULL,
`id` INT(11) NOT NULL AUTO_INCREMENT,
`hostname` VARCHAR(255) NOT NULL,
`wsurl` VARCHAR(255) NOT NULL,
`online` TINYINT(1) NULL DEFAULT NULL,
`ragent` TINYINT(1) NULL DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE INDEX `uniq_wampagents0hostname` (`hostname` ASC))
ENGINE = InnoDB
@ -66,9 +68,11 @@ CREATE TABLE IF NOT EXISTS `iotronic`.`nodes` (
`code` VARCHAR(25) NOT NULL,
`status` VARCHAR(15) NULL DEFAULT NULL,
`name` VARCHAR(255) NULL DEFAULT NULL,
`device` VARCHAR(255) NOT NULL,
`type` VARCHAR(255) NOT NULL,
`agent` VARCHAR(255) NULL DEFAULT NULL,
`session` VARCHAR(255) NULL DEFAULT NULL,
`mobile` TINYINT(1) NOT NULL DEFAULT '0',
`config` TEXT NULL DEFAULT NULL,
`extra` TEXT NULL DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE INDEX `uuid` (`uuid` ASC),

3
utils/loaddb Executable file
View File

@ -0,0 +1,3 @@
#! /bin/bash
mysql -u iotronic -h$1 -p iotronic < iotronic.sql