diff --git a/iotronic/conductor/manager.py b/iotronic/conductor/manager.py index 7dbd5d3..94aa5f3 100644 --- a/iotronic/conductor/manager.py +++ b/iotronic/conductor/manager.py @@ -28,8 +28,6 @@ from iotronic.common.i18n import _LI from iotronic.common.i18n import _LW from iotronic.conductor import task_manager -from iotronic.wamp.rpcwamp import RPC_Wamp -from iotronic.wamp.wampresponse import WampResponse from iotronic.openstack.common import periodic_task @@ -123,7 +121,6 @@ class ConductorManager(periodic_task.PeriodicTasks): host = CONF.host self.host = host self.topic = topic - self.wamp = RPC_Wamp() def init_host(self): self.dbapi = dbapi.get_instance() @@ -246,6 +243,7 @@ class ConductorManager(periodic_task.PeriodicTasks): :raises: NodeNotConnected if the node is not connected. """ + """ REMOVE ASAP with task_manager.acquire(context, node_id) as task: node = task.node @@ -260,3 +258,5 @@ class ConductorManager(periodic_task.PeriodicTasks): {'node': node.uuid}) else: raise exception.NodeNotConnected(node=node.uuid) + """ + pass diff --git a/iotronic/wamp/functions.py b/iotronic/wamp/functions.py deleted file mode 100644 index c985a7d..0000000 --- a/iotronic/wamp/functions.py +++ /dev/null @@ -1,93 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from iotronic.common import exception -from iotronic import objects -from iotronic.wamp.wampresponse import WampResponse -from oslo_log import log - -LOG = log.getLogger(__name__) - - -def leave_function(session_id): - LOG.debug('A node with %s disconnectd', session_id) - try: - old_session = objects.SessionWP({}).get_by_session_id({}, session_id) - old_session.valid = False - old_session.save() - LOG.debug('Session %s deleted', session_id) - except Exception: - LOG.debug('session %s not found', session_id) - - -def echo(text): - LOG.debug(text) - return text - - -def registration(code_node, session_num): - LOG.debug('Receved registration from %s with session %s', - code_node, session_num) - try: - node = objects.Node.get_by_code({}, code_node) - except Exception: - return exception.NodeNotFound(node=code_node) - try: - old_session = objects.SessionWP( - {}).get_session_by_node_uuid(node.uuid, valid=True) - old_session.valid = False - old_session.save() - except Exception: - LOG.debug('valid session for %s Not found', node.uuid) - - session = objects.SessionWP({}) - session.node_id = node.id - session.node_uuid = node.uuid - session.session_id = session_num - session.create() - session.save() - - r = WampResponse() - - r.addSection('config', []) - r.addConfig('add', 'config:node:uuid', node.uuid, 'config') - r.addConfig('add', 'config:iotronic:command-agent', - {"url": "ws://cmd-iotronic", "port": "8181", - "realm": "s4t"}, 'config') - - return r.getResponse() - - -def registration_uuid(uuid, session_num): - LOG.debug('Receved registration from %s with session %s', - uuid, session_num) - try: - node = objects.Node.get_by_uuid({}, uuid) - except Exception: - return exception.NodeNotFound(node=uuid) - try: - old_session = objects.SessionWP( - {}).get_session_by_node_uuid(node.uuid, valid=True) - old_session.valid = False - old_session.save() - except Exception: - LOG.debug('valid session for %s not found', node.uuid) - - session = objects.SessionWP({}) - session.node_id = node.id - session.node_uuid = node.uuid - session.session_id = session_num - session.create() - session.save() - r = WampResponse() - r.addSection('result', 0) - return r.getResponse() diff --git a/iotronic/wamp/rpcwamp.py b/iotronic/wamp/rpcwamp.py deleted file mode 100644 index 066e557..0000000 --- a/iotronic/wamp/rpcwamp.py +++ /dev/null @@ -1,171 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import multiprocessing - -from autobahn.twisted.wamp import ApplicationRunner -from autobahn.twisted.wamp import ApplicationSession -from twisted.internet.defer import inlineCallbacks - -from oslo_config import cfg -from oslo_log import log -from twisted.internet import reactor - - -from multiprocessing import Pipe - - -LOG = log.getLogger(__name__) - -wamp_opts = [ - cfg.StrOpt('wamp_ip', - default='127.0.0.1', - help=('URL of wamp broker')), - cfg.IntOpt('wamp_port', - default=8181, - help='port wamp broker'), - cfg.StrOpt('wamp_realm', - default='s4t', - help=('realm broker')), -] -CONF = cfg.CONF -CONF.register_opts(wamp_opts, 'wamp') - - -class RPCWampServerManager(ApplicationSession): - - def __init__(self, config=None): - ApplicationSession.__init__(self, config) - LOG.info("RPC wamp manager created") - - ''' - #unused methods - def onConnect(self): - print("transport connected") - self.join(self.config.realm) - - def onChallenge(self, challenge): - print("authentication challenge received") - - def onLeave(self, details): - print("session left") - import os, signal - os.kill(multi.pid, signal.SIGKILL) - - def onDisconnect(self): - print("transport disconnected") - ''' - - @inlineCallbacks - def onJoin(self, details): - LOG.info('RPC Wamp Session ready') - import iotronic.wamp.functions as fun - self.subscribe(fun.leave_function, 'wamp.session.on_leave') - - try: - yield self.register(fun.echo, - u'stack4things.echo') - yield self.register(fun.registration, - u'stack4things.register') - yield self.register(fun.registration_uuid, - u'stack4things.register_uuid') - - LOG.info("Procedures registered") - except Exception as e: - LOG.error("could not register procedure: {0}".format(e)) - - -class RPCWampServer(object): - - def __init__(self, ip, port, realm): - self.ip = unicode(ip) - self.port = unicode(port) - self.realm = unicode(realm) - self._url = "ws://" + self.ip + ":" + self.port + "/ws" - self.runner = ApplicationRunner( - url=unicode(self._url), - realm=self.realm, - # debug = True, debug_wamp = True, debug_app = True - ) - self.runner.run(RPCWampServerManager, start_reactor=False) - - -class RPCWampManagerClient(ApplicationSession): - """An application component calling the different backend procedures. - - """ - - @inlineCallbacks - def onJoin(self, details): - LOG.debug("session attached") - rpc = self.config.extra['rpc'] - args = self.config.extra['args'] - self.pipe_out = self.config.extra['pipe'] - res = {'response': '', 'error': ''} - try: - res['response'] = yield self.call(rpc, args) - res['error'] = 0 - except Exception as e: - LOG.error(e) - res['response'] = e - res['error'] = 1 - self.pipe_out.send(res) - self.leave() - - def onDisconnect(self): - reactor.stop() - - -class RPCWampClient(object): - - def __init__(self, ip, port, realm, rpc, args, b_a_ext): - self.ip = unicode(ip) - self.port = unicode(port) - self.realm = unicode(realm) - self._url = "ws://" + self.ip + ":" + self.port + "/ws" - - self.runner = ApplicationRunner( - url=unicode(self._url), - realm=self.realm, - extra={'rpc': rpc, 'args': args, 'pipe': b_a_ext}, - # debug = False, debug_wamp = False, debug_app = False - ) - self.runner.run(RPCWampManagerClient, start_reactor=False) - - -class RPC_Wamp(object): - - def __init__(self): - self.ip = unicode(CONF.wamp.wamp_ip) - self.port = unicode(CONF.wamp.wamp_port) - self.realm = unicode(CONF.wamp.wamp_realm) - self.server = RPCWampServer(self.ip, self.port, self.realm) - self.b_a_int, self.b_a_ext = Pipe() - server_process = multiprocessing.Process(target=reactor.run, args=()) - server_process.start() - - def rpc_call(self, rpc, *args): - res = '' - RPCWampClient( - self.ip, self.port, self.realm, rpc, args, self.b_a_ext) - client_process = multiprocessing.Process(target=reactor.run, args=()) - client_process.start() - - while True: - if self.b_a_int.poll(): - res = self.b_a_int.recv() - client_process.join() - break - if res['error'] == 0: - return res['response'] - else: - return {'result': 1} diff --git a/iotronic/wamp/wampresponse.py b/iotronic/wamp/wampresponse.py deleted file mode 100644 index b0f7d19..0000000 --- a/iotronic/wamp/wampresponse.py +++ /dev/null @@ -1,59 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import json - - -class WampResponse(object): - - def __init__(self): - self.response = {} - - def getResponse(self): - return json.dumps(self.response) - - def addSection(self, name, value=''): - self.response[name] = value - - def addElement(self, position, value, section): - if isinstance(self.response[section], list): - self.response[section].append({"position": position, - "value": value}) - elif isinstance(self.response[section], dict): - self.response[section][position] = value - - def addConfig(self, action, position, value, section='config'): - if isinstance(self.response[section], list): - self.response[section].append({"action": action, - "position": position, - "value": value}) - - def removeSection(self, name): - self.response.pop(name, None) - - def clearSection(self, name): - self.response[name] = '' - - def clearConfig(self): - self.addSection('config', []) - self.addConfig('clear', 'config', {"iotronic": {"registration-agent": { - "url": "", - "port": "", - "realm": "" - }}, - "log": { - "logfile": "s4t-lightning-rod.log", - "loglevel": "info" - }, - "node": { - "token": ""} - })