From 9bf78aa2da06106a335b236bc50ce2036ca3c126 Mon Sep 17 00:00:00 2001 From: Nicola Peditto Date: Fri, 23 Nov 2018 11:02:53 +0100 Subject: [PATCH] Refactored Webservice Manager: - webservices exposed via subdomanins - added monitoring wstun tunnels (zombie processes managed) - "proxies" module moved - nginx management improved - requirements updated Added RPCs in Device Manager Added "serializers" to Autobahn "Component" Change-Id: Ie5f780c4cdcf854fd4c8af2d4ef6c3c52f68da10 --- .gitignore | 3 +- README.rst | 2 - .../plugins/__init__.py => __init__.py | 0 doc/installation/arduino_yun.rst | 4 +- doc/installation/raspberry_pi_3.rst | 86 +- doc/installation/ubuntu1604.rst | 85 +- etc/iotronic/iotronic.conf | 2 + etc/logrotate.d/lightning-rod.log | 1 - iotronic_lightningrod/common/exception.py | 57 +- iotronic_lightningrod/common/utils.py | 85 ++ iotronic_lightningrod/lightningrod.py | 1135 +++++++++-------- .../modules/device_manager.py | 73 +- .../modules/network_manager.py | 13 - .../modules/plugin_manager.py | 3 +- .../{ => modules}/plugins/Plugin.py | 0 .../{ => modules}/plugins/PluginSerializer.py | 0 .../{proxies => modules/plugins}/__init__.py | 0 .../{ => modules}/plugins/pluginApis.py | 0 .../plugins_examples/arduino_yun/demo.py | 0 .../plugins_examples/arduino_yun/led.json | 0 .../plugins_examples/arduino_yun/led.py | 0 .../plugins_examples/generics/echo.json | 0 .../plugins/plugins_examples/generics/echo.py | 4 +- .../plugins_examples/generics/runner.json | 0 .../plugins_examples/generics/runner.py | 4 +- .../{ => modules}/proxies/Proxy.py | 0 .../modules/proxies/__init__.py | 0 .../{ => modules}/proxies/configs/default | 0 .../{ => modules}/proxies/configs/iotronic | 0 .../{ => modules}/proxies/nginx.py | 182 +-- .../modules/service_manager.py | 303 ++++- iotronic_lightningrod/modules/test.py | 43 - iotronic_lightningrod/modules/vfs_library.py | 164 --- iotronic_lightningrod/modules/vfs_manager.py | 509 -------- .../modules/webservice_manager.py | 144 ++- requirements.txt | 4 +- scripts/lr_configure | 2 +- setup.cfg | 3 +- 38 files changed, 1289 insertions(+), 1622 deletions(-) rename iotronic_lightningrod/plugins/__init__.py => __init__.py (100%) create mode 100644 iotronic_lightningrod/common/utils.py rename iotronic_lightningrod/{ => modules}/plugins/Plugin.py (100%) rename iotronic_lightningrod/{ => modules}/plugins/PluginSerializer.py (100%) rename iotronic_lightningrod/{proxies => modules/plugins}/__init__.py (100%) rename iotronic_lightningrod/{ => modules}/plugins/pluginApis.py (100%) rename iotronic_lightningrod/{ => modules}/plugins/plugins_examples/arduino_yun/demo.py (100%) rename iotronic_lightningrod/{ => modules}/plugins/plugins_examples/arduino_yun/led.json (100%) rename iotronic_lightningrod/{ => modules}/plugins/plugins_examples/arduino_yun/led.py (100%) rename iotronic_lightningrod/{ => modules}/plugins/plugins_examples/generics/echo.json (100%) rename iotronic_lightningrod/{ => modules}/plugins/plugins_examples/generics/echo.py (86%) rename iotronic_lightningrod/{ => modules}/plugins/plugins_examples/generics/runner.json (100%) rename iotronic_lightningrod/{ => modules}/plugins/plugins_examples/generics/runner.py (89%) rename iotronic_lightningrod/{ => modules}/proxies/Proxy.py (100%) create mode 100644 iotronic_lightningrod/modules/proxies/__init__.py rename iotronic_lightningrod/{ => modules}/proxies/configs/default (100%) rename iotronic_lightningrod/{ => modules}/proxies/configs/iotronic (100%) rename iotronic_lightningrod/{ => modules}/proxies/nginx.py (64%) delete mode 100644 iotronic_lightningrod/modules/test.py delete mode 100644 iotronic_lightningrod/modules/vfs_library.py delete mode 100644 iotronic_lightningrod/modules/vfs_manager.py diff --git a/.gitignore b/.gitignore index 5761d21..b2e67ad 100644 --- a/.gitignore +++ b/.gitignore @@ -12,5 +12,6 @@ ChangeLog *.md .eggs dist +STUFF/ iotronic_lightningrod/modules/test.py -iotronic_lightningrod/modules/vfs_* \ No newline at end of file +iotronic_lightningrod/modules/vfs_* diff --git a/README.rst b/README.rst index df656db..51691b0 100644 --- a/README.rst +++ b/README.rst @@ -18,5 +18,3 @@ Installation guides * `Raspberry Pi 3 `_. * `Ubuntu 16.04 `_. - -* `Arduino YUN `_. diff --git a/iotronic_lightningrod/plugins/__init__.py b/__init__.py similarity index 100% rename from iotronic_lightningrod/plugins/__init__.py rename to __init__.py diff --git a/doc/installation/arduino_yun.rst b/doc/installation/arduino_yun.rst index ace8ce4..0ba7695 100644 --- a/doc/installation/arduino_yun.rst +++ b/doc/installation/arduino_yun.rst @@ -1,5 +1,5 @@ -IoTronic Lightning-rod installation guide for Arduino YUN -========================================================= +[DEPRECATED] IoTronic Lightning-rod installation guide for Arduino YUN +====================================================================== We tested this procedure on a Arduino YUN board with OpenWRT LininoIO image. diff --git a/doc/installation/raspberry_pi_3.rst b/doc/installation/raspberry_pi_3.rst index fe7420c..16e9e97 100644 --- a/doc/installation/raspberry_pi_3.rst +++ b/doc/installation/raspberry_pi_3.rst @@ -1,100 +1,42 @@ IoTronic Lightning-rod installation guide for Raspberry Pi 3 ============================================================ -We tested this procedure on a Raspberry Pi 3 board. +We tested this procedure on a Raspberry Pi 3 board (Raspbian). -Install from source code ------------------------- - -Install requirements -~~~~~~~~~~~~~~~~~~~~ - -:: - - pip install oslo.config oslo.log asyncio autobahn httplib2 psutil six - -Set up environment: -~~~~~~~~~~~~~~~~~~~ - -:: - - mkdir -p /var/lib/iotronic - mkdir /var/lib/iotronic/plugins - mkdir /var/log/iotronic/ - mkdir /etc/iotronic Install Lightning-rod ~~~~~~~~~~~~~~~~~~~~~ -Get source code -''''''''''''''' - :: - cd /var/lib/iotronic - git clone git://github.com/MDSLab/iotronic-lightning-rod-agent.git - mv iotronic-lightning-rod-agent/ iotronic-lightning-rod/ + pip3 install iotronic-lightningrod Deployment '''''''''' :: + lr_install - cd iotronic-lightning-rod/ - cp etc/iotronic/iotronic.conf /etc/iotronic/ - cp settings.example.json /var/lib/iotronic/settings.json - cp plugins.example.json /var/lib/iotronic/plugins.json - cp services.example.json /var/lib/iotronic/services.json - cp etc/systemd/system/s4t-lightning-rod.service /etc/systemd/system/lightning-rod.service - chmod +x /etc/systemd/system/lightning-rod.service - systemctl daemon-reload -- Edit configuration file: - - - nano /var/lib/iotronic/settings.json - - :: - - { - "iotronic": { - "board": { - "token": "" - }, - "wamp": { - "registration-agent": { - "url": "ws://:/", - "realm": "" - } - } - } - } - -- setup logrotate: -- nano /etc/logrotate.d/lightning-rod.log - - :: - - /var/log/iotronic/lightning-rod.log { - weekly - rotate = 3 - compress - su root root - maxsize 5M - } - -Building -'''''''' +Iotronic setup +'''''''''''''' :: + lr_configure - cd /var/lib/iotronic/iotronic-lightning-rod/ - python setup.py install +Arguments required: + : token released by IoTronic registration procedure + : IoTronic Crossbar server URL + +e.g. +:: + lr_configure 000001 ws(s)://:/ Execution: ~~~~~~~~~~ :: - systemctl restart lightning-rod.service + systemctl start lightning-rod.service tail -f /var/log/iotronic/lightning-rod.log \ No newline at end of file diff --git a/doc/installation/ubuntu1604.rst b/doc/installation/ubuntu1604.rst index 55874da..d2cfe2e 100644 --- a/doc/installation/ubuntu1604.rst +++ b/doc/installation/ubuntu1604.rst @@ -4,98 +4,39 @@ IoTronic Lightning-rod installation guide for Ubuntu 16.04 We tested this procedure on a Ubuntu 16.04 (also within a LXD container). Everything needs to be run as root. -Install from source code via Git --------------------------------- - -Install requirements -~~~~~~~~~~~~~~~~~~~~ - -:: - - pip install oslo.config oslo.log asyncio autobahn httplib2 psutil six - -Set up environment: -~~~~~~~~~~~~~~~~~~~ - -:: - - mkdir -p /var/lib/iotronic - mkdir /var/lib/iotronic/plugins - mkdir /var/log/iotronic/ - mkdir /etc/iotronic - Install Lightning-rod ~~~~~~~~~~~~~~~~~~~~~ -Get source code -''''''''''''''' - :: - cd /var/lib/iotronic - git clone git://github.com/MDSLab/iotronic-lightning-rod-agent.git - mv iotronic-lightning-rod-agent/ iotronic-lightning-rod/ + pip3 install iotronic-lightningrod Deployment '''''''''' :: + lr_install - cd iotronic-lightning-rod/ - cp etc/iotronic/iotronic.conf /etc/iotronic/ - cp settings.example.json /var/lib/iotronic/settings.json - cp plugins.example.json /var/lib/iotronic/plugins.json - cp services.example.json /var/lib/iotronic/services.json - cp etc/systemd/system/s4t-lightning-rod.service /etc/systemd/system/lightning-rod.service - chmod +x /etc/systemd/system/lightning-rod.service - systemctl daemon-reload -- Edit configuration file: - - - nano /var/lib/iotronic/settings.json - - :: - - { - "iotronic": { - "board": { - "token": "" - }, - "wamp": { - "registration-agent": { - "url": "ws://:/", - "realm": "" - } - } - } - } - -- setup logrotate: -- nano /etc/logrotate.d/lightning-rod.log - - :: - - /var/log/iotronic/lightning-rod.log { - weekly - rotate = 3 - compress - su root root - maxsize 5M - } - -Building -'''''''' +Iotronic setup +'''''''''''''' :: + lr_configure - cd /var/lib/iotronic/iotronic-lightning-rod/ - python setup.py install +Arguments required: + : token released by IoTronic registration procedure + : IoTronic Crossbar server URL + +e.g. +:: + lr_configure 000001 ws(s)://:/ Execution: ~~~~~~~~~~ :: - systemctl restart lightning-rod.service + systemctl start lightning-rod.service tail -f /var/log/iotronic/lightning-rod.log \ No newline at end of file diff --git a/etc/iotronic/iotronic.conf b/etc/iotronic/iotronic.conf index 6dcfbc6..bd47aef 100644 --- a/etc/iotronic/iotronic.conf +++ b/etc/iotronic/iotronic.conf @@ -4,3 +4,5 @@ skip_cert_verify = True debug = True proxy = nginx log_file = /var/log/iotronic/lightning-rod.log +alive_timer = 600 +rpc_alive_timer = 3 \ No newline at end of file diff --git a/etc/logrotate.d/lightning-rod.log b/etc/logrotate.d/lightning-rod.log index e29687f..959698c 100644 --- a/etc/logrotate.d/lightning-rod.log +++ b/etc/logrotate.d/lightning-rod.log @@ -1,6 +1,5 @@ /var/log/iotronic/lightning-rod.log /var/log/wstun/wstun.log{ copytruncate - create missingok weekly rotate = 3 diff --git a/iotronic_lightningrod/common/exception.py b/iotronic_lightningrod/common/exception.py index 078b9fc..27c7614 100644 --- a/iotronic_lightningrod/common/exception.py +++ b/iotronic_lightningrod/common/exception.py @@ -15,12 +15,13 @@ __author__ = "Nicola Peditto " -import os import signal from oslo_log import log as logging LOG = logging.getLogger(__name__) +from iotronic_lightningrod.common import utils + def manageTimeout(error_message, action): try: @@ -29,8 +30,36 @@ def manageTimeout(error_message, action): except TimeoutError as err: details = err.args[0] - LOG.warning("Board connection call timeout: " + str(details)) - os._exit(1) + if (action == "ws_alive"): + + LOG.warning("Iotronic RPC-ALIVE timeout details: " + str(details)) + try: + + utils.destroyWampSocket() + + except Exception as e: + LOG.warning("Iotronic RPC-ALIVE timeout error: " + str(e)) + + else: + LOG.warning("Board connection call timeout: " + str(details)) + utils.LR_restart() + +""" +def manageTimeoutALIVE(error_message, action): + try: + + raise TimeoutError(error_message, action) + + except TimeoutError as err: + details = err.args[0] + LOG.warning("Iotronic RPC-ALIVE timeout details: " + str(details)) + try: + + utils.destroyWampSocket() + + except Exception as e: + LOG.warning("Iotronic RPC-ALIVE timeout error: " + str(e)) +""" class NginxError(Exception): @@ -67,15 +96,31 @@ class timeout(object): class timeoutRPC(object): - def __init__(self, seconds=1, error_message='Timeout', action=None): + def __init__(self, seconds=1, error_message='Timeout-RPC', action=None): + self.seconds = seconds + self.error_message = error_message + self.action = action + + def handle_timeout(self, signum, frame): + manageTimeout(self.error_message, self.action) + + def __enter__(self): + signal.signal(signal.SIGALRM, self.handle_timeout) + signal.alarm(self.seconds) + + def __exit__(self, type, value, traceback): + signal.alarm(0) + + +class timeoutALIVE(object): + + def __init__(self, seconds=1, error_message='Timeout-Alive', action=None): self.seconds = seconds self.error_message = error_message self.action = action def handle_timeout(self, signum, frame): manageTimeout(self.error_message, self.action) - # LOG.warning("RPC timeout: " + str(self.error_message)) - # os._exit(1) def __enter__(self): signal.signal(signal.SIGALRM, self.handle_timeout) diff --git a/iotronic_lightningrod/common/utils.py b/iotronic_lightningrod/common/utils.py new file mode 100644 index 0000000..c2ce12a --- /dev/null +++ b/iotronic_lightningrod/common/utils.py @@ -0,0 +1,85 @@ +# Copyright 2018 MDSLAB - University of Messina +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +__author__ = "Nicola Peditto " + + +from oslo_log import log as logging +LOG = logging.getLogger(__name__) + + +import os +import pkg_resources +import psutil +import subprocess +import sys + + +def LR_restart(): + try: + LOG.warning("Lightning-rod RESTARTING...") + python = sys.executable + os.execl(python, python, *sys.argv) + except Exception as err: + LOG.error("Lightning-rod restarting error" + str(err)) + + +def checkIotronicConf(lr_CONF): + + try: + if(lr_CONF.log_file == None): + LOG.warning("'log_file' is not specified!") + return False + else: + print("View logs in " + lr_CONF.log_file) + return True + except Exception as err: + print(err) + return False + + +def destroyWampSocket(): + + LR_PID = os.getpid() + + try: + process = subprocess.Popen( + ["gdb", "-p", str(LR_PID)], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE + ) + + proc = psutil.Process() + print("WAMP RECOVERY: " + str(proc.connections()[0])) + + ws_fd = proc.connections()[0].fd + first = b"call shutdown(" + fd = str(ws_fd).encode('ascii') + last = b"u,0)\nquit\ny" + commands = b"%s%s%s" % (first, fd, last) + process.communicate(input=commands)[0] + + msg = "Websocket-Zombie closed! Restoring..." + LOG.warning(msg) + print(msg) + + except Exception as e: + LOG.warning("RPC-ALIVE - destroyWampSocket error: " + str(e)) + + +def get_version(package): + package = package.lower() + return next((p.version for p in pkg_resources.working_set if + p.project_name.lower() == package), "No version") diff --git a/iotronic_lightningrod/lightningrod.py b/iotronic_lightningrod/lightningrod.py index a39b8a3..43ef71e 100644 --- a/iotronic_lightningrod/lightningrod.py +++ b/iotronic_lightningrod/lightningrod.py @@ -15,28 +15,33 @@ __author__ = "Nicola Peditto " # Autobahn imports -import asyncio from autobahn.asyncio.component import Component from autobahn.wamp import exception -import txaio # OSLO imports from oslo_config import cfg from oslo_log import log as logging # MODULES imports +import asyncio import inspect import os import pkg_resources +import psutil import signal import ssl from stevedore import extension import sys +import txaio +from pip._vendor import pkg_resources # IoTronic imports from iotronic_lightningrod.Board import Board +from iotronic_lightningrod.common.exception import timeoutALIVE from iotronic_lightningrod.common.exception import timeoutRPC +from iotronic_lightningrod.common import utils +from iotronic_lightningrod.common.utils import get_version import iotronic_lightningrod.wampmessage as WM @@ -52,6 +57,12 @@ lr_opts = [ default=True, help=('Flag for skipping the verification of the server cert ' '(for the auto-signed ones)')), + cfg.IntOpt('alive_timer', + default=600, + help=('Wamp websocket check time')), + cfg.IntOpt('rpc_alive_timer', + default=3, + help=('RPC alive response time threshold')), ] proxy_opts = [ @@ -74,6 +85,7 @@ reconnection = False RPC = {} RPC_devices = {} RPC_proxies = {} +zombie_alert = True # ASYNCIO loop = None @@ -86,35 +98,581 @@ global MODULES MODULES = {} -def moduleReloadInfo(session): - """This function is used in the reconnection stage to register +class LightningRod(object): - again the RPCs of each module and for device. + def __init__(self): - :param session: WAMP session object. + LogoLR() + + LOG.info(' - version: ' + + str(get_version("iotronic-lightningrod"))) + LOG.info(' - PID: ' + str(os.getpid())) + + LOG.info("LR available modules: ") + for ep in pkg_resources.iter_entry_points(group='s4t.modules'): + LOG.info(" - " + str(ep)) + + logging.register_options(CONF) + DOMAIN = "s4t-lightning-rod" + CONF(project='iotronic') + logging.setup(CONF, DOMAIN) + + if (utils.checkIotronicConf(CONF)): + + if CONF.debug: + txaio.start_logging(level="debug") + + signal.signal(signal.SIGINT, self.stop_handler) + + LogoLR() + + LOG.info('Lightning-rod: ') + LOG.info(' - version: ' + + str(get_version("iotronic-lightningrod"))) + LOG.info(' - PID: ' + str(os.getpid())) + LOG.info(' - Logs: ' + CONF.log_file) + LOG.info(" - Home: " + CONF.lightningrod_home) + LOG.info(" - Alive Check timer: " + str(CONF.alive_timer) + + " seconds") + LOG.info(" - RPC-Alive Check timer: " + str(CONF.rpc_alive_timer) + + " seconds") + + global board + board = Board() + + self.w = WampManager(board.wamp_config) + + self.w.start() + + else: + Bye() + + def stop_handler(self, signum, frame): + + try: + # No zombie alert activation + zombie_alert = False + LOG.info("LR is shutting down...") + self.w.stop() + Bye() + except Exception as e: + LOG.error("Error closing LR") + + +class WampManager(object): + """WAMP Manager: through this LR manages the connection to Crossbar server. """ - LOG.info("\n\nModules reloading after WAMP recovery...\n\n") + def __init__(self, wamp_conf): + + # wampConnect configures and manages the connection to Crossbar server. + wampConnect(wamp_conf) + + def start(self): + LOG.info(" - starting Lightning-rod WAMP server...") + + global loop + loop = asyncio.get_event_loop() + component.start(loop) + loop.run_forever() + + def stop(self): + LOG.info("Stopping WAMP agent server...") + # Canceling pending tasks and stopping the loop + asyncio.gather(*asyncio.Task.all_tasks()).cancel() + LOG.info("WAMP server stopped!") + + +async def wamp_checks(session): + + while (True): + + try: + + # LOG.debug("ALIVE sending...") + + with timeoutALIVE(seconds=CONF.rpc_alive_timer, action="ws_alive"): + res = await session.call( + str(board.agent) + u'.stack4things.alive' + # board_uuid=board.uuid, + # board_name=board.name + ) + + LOG.debug("WampCheck attempt " + str(res)) + + except exception.ApplicationError as e: + LOG.error(" - Iotronic Connection RPC error: " + str(e)) + # Iotronic is offline the board can not call + # the "stack4things.alive" RPC. + # The board will disconnect from WAMP agent and retry later. + global reconnection + reconnection = True + utils.destroyWampSocket() + + try: + await asyncio.sleep(CONF.alive_timer) + except Exception as e: + LOG.warning(" - asyncio alert: " + str(e)) + + +async def IotronicLogin(board, session, details): + """Function called to connect the board to Iotronic. + + The board: + 1. logs in to Iotronic + 2. loads the modules + + :param board: + :param session: + :param details: + + """ + + LOG.info("IoTronic Authentication:") + + global reconnection try: - # Call module restore procedures and - # register RPCs for each Lightning-rod module - for mod_name in MODULES: - LOG.info("- Registering RPCs for module " + str(mod_name)) - moduleWampRegister(session, RPC[mod_name]) + rpc = str(board.agent) + u'.stack4things.connection' - LOG.info("- Restoring module " + str(mod_name)) - MODULES[mod_name].restore() + with timeoutRPC(seconds=5, action=rpc): + res = await session.call( + rpc, + uuid=board.uuid, + session=details.session + ) - # Register RPCs for the device - for dev in RPC_devices: - LOG.info("- Registering RPCs for device " + str(dev)) - moduleWampRegister(session, RPC_devices[dev]) + w_msg = WM.deserialize(res) + + if w_msg.result == WM.SUCCESS: + + LOG.info(" - Access granted to Iotronic.") + + # WS ALIVE + asyncio.run_coroutine_threadsafe(wamp_checks(session), loop) + + # LOADING BOARD MODULES + try: + + modulesLoader(session) + + except Exception as e: + LOG.warning("WARNING - Could not load modules: " + str(e)) + utils.LR_restart() + + # Reset flag to False + # reconnection = False + + else: + LOG.error(" - Access denied to Iotronic.") + Bye() + + except exception.ApplicationError as e: + LOG.error(" - Iotronic Connection RPC error: " + str(e)) + # Iotronic is offline the board can not call + # the "stack4things.connection" RPC. + # The board will disconnect from WAMP agent and retry later. + reconnection = True + + # We restart Lightning-rod if RPC 'stack4things.connection' is not + # available, this means Wagent is unreachable + utils.LR_restart() + + except Exception as e: + LOG.warning("Iotronic board connection error: " + str(e)) + + +def wampConnect(wamp_conf): + """WAMP connection procedures. + + :param wamp_conf: WAMP configuration from settings.json file + + """ + + LOG.info("WAMP connection precedures:") + + try: + + LOG.info( + "WAMP status @ boot:" + + "\n- board = " + str(board.status) + + "\n- reconnection = " + str(reconnection) + + "\n- connected = " + str(connected) + ) + + wamp_transport = wamp_conf['url'] + wurl_list = wamp_transport.split(':') + is_wss = False + + if wurl_list[0] == "wss": + is_wss = True + whost = wurl_list[1].replace('/', '') + wport = int(wurl_list[2].replace('/', '')) + + if is_wss and CONF.skip_cert_verify: + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + wamp_transport = [ + { + "url": wamp_transport, + "serializers": ["json"], + "endpoint": { + "type": "tcp", + "host": whost, + "port": wport, + "tls": ctx + }, + }, + ] + + # LR creates the Autobahn Asyncio Component that points to the + # WAMP Agent (main/registration agent) + global component + component = Component( + transports=wamp_transport, + realm=wamp_conf['realm'] + ) + + # To manage the registration stage: we got the info for the main + # WAMP agent and LR is going to connect to it starting the Component + # with the new WAMP configuration. + if connected == False and board.status == "registered" \ + and reconnection == False: + component.start(loop) + + @component.on_join + async def join(session, details): + """Execute the following procedures when the board connects + to Crossbar. + + :param details: WAMP session details + + """ + + print("WAMP SOCKET: " + str(psutil.Process().connections()[0])) + + global connected + connected = True + + # LIGHTNING-ROD STATES: + # - REGISTRATION STATE: the first connection to Iotronic + # - FIRST CONNECTION: the board become operative after registration + # - LIGHTNING-ROD BOOT: the first connection to WAMP + # after Lightning-rod starting + # - WAMP RECOVERY: when the established WAMP connection fails + + global reconnection + + # reconnection flag is False when the board is: + # - LIGHTNING-ROD BOOT + # - REGISTRATION STATE + # - FIRST CONNECTION + # + # reconnection flag is True when the board is: + # - WAMP RECOVERY + + global SESSION + SESSION = session + + # LOG.debug(" - session: " + str(details)) + + board.session_id = details.session + + LOG.info(" - Joined in realm " + board.wamp_config['realm'] + ":") + LOG.info(" - WAMP Agent: " + str(board.agent)) + print(" - WAMP Agent: " + str(board.agent) + " - " + + str(wamp_conf['url'])) + LOG.info(" - Session ID: " + str(board.session_id)) + print(" - Session ID: " + str(board.session_id)) + LOG.info(" - Board status: " + str(board.status)) + + if reconnection is False: + + if board.uuid is None: + + ###################### + # REGISTRATION STATE # + ###################### + # If in the LR configuration file there is not the + # Board UUID specified it means the board is a new one + # and it has to call IoTronic in order to complete + # the registration. + + try: + + LOG.info(" - Board needs to be registered.") + + rpc = u'stack4things.register' + + with timeoutRPC(seconds=5, action=rpc): + res = await session.call( + rpc, + code=board.code, + session=board.session_id + ) + + w_msg = WM.deserialize(res) + + # LOG.info(" - Board registration result: \n" + + # json.loads(w_msg.message, indent=4)) + + if w_msg.result == WM.SUCCESS: + + LOG.info("Registration authorized by IoTronic:\n" + + str(w_msg.message)) + + # the 'message' field contains + # the board configuration to load + board.setConf(w_msg.message) + + # We need to disconnect the client from the + # registration-agent in order to reconnect + # to the WAMP agent assigned by Iotronic + # at the provisioning stage + LOG.info( + "\n\nDisconnecting from Registration Agent " + "to load new settings...\n\n") + + # We restart Lightning-rod if RPC + # 'stack4things.connection' is not available, + # this means Wagent is unreachable + utils.LR_restart() + + else: + LOG.error("Registration denied by Iotronic: " + + str(w_msg.message)) + Bye() + + except exception.ApplicationError as e: + LOG.error("IoTronic registration error: " + str(e)) + # Iotronic is offline the board can not call the + # "stack4things.connection" RPC. The board will + # disconnect from WAMP agent and retry later. + + # TO ACTIVE BOOT CONNECTION RECOVERY MODE + reconnection = True + + # We restart Lightning-rod if RPC + # 'stack4things.connection' is not available, + # this means Wagent is unreachable + utils.LR_restart() + + except Exception as e: + LOG.warning( + " - Board registration call error: " + str(e)) + Bye() + + else: + + if board.status == "registered": + #################### + # FIRST CONNECTION # + #################### + + # In this case we manage the first connection + # after the registration stage: + # Lightining-rod sets its status to "operative" + # completing the provisioning and configuration stage. + LOG.info("\n\n\nBoard is becoming operative...\n\n\n") + board.updateStatus("operative") + board.loadSettings() + LOG.info("WAMP status @ first connection:" + + "\n- board = " + str(board.status) + + "\n- reconnection = " + str(reconnection) + + "\n- connected = " + str(connected) + ) + await IotronicLogin(board, session, details) + + elif board.status == "operative": + ###################### + # LIGHTNING-ROD BOOT # + ###################### + + # After join to WAMP agent, Lightning-rod will: + # - authenticate to Iotronic + # - load the enabled modules + + # The board will keep at this stage until + # it will succeed to connect to Iotronic. + await IotronicLogin(board, session, details) + + else: + LOG.error("Wrong board status '" + board.status + "'.") + Bye() + + else: + + ################# + # WAMP RECOVERY # + ################# + + LOG.info("IoTronic connection recovery:") + + try: + + rpc = str(board.agent) + u'.stack4things.connection' + + with timeoutRPC(seconds=5, action=rpc): + res = await session.call( + rpc, + uuid=board.uuid, + session=details.session + ) + + w_msg = WM.deserialize(res) + + if w_msg.result == WM.SUCCESS: + + LOG.info(" - Access granted to Iotronic (recovery).") + + # LOADING BOARD MODULES + # If the board is in WAMP connection recovery state + # we need to register again the RPCs of each module + try: + + moduleReloadInfo(session) + + # Reset flag to False + reconnection = False + + LOG.info("WAMP Session Recovered!") + + LOG.info("\n\nListening...\n\n") + + # WS ALIVE + asyncio.run_coroutine_threadsafe( + wamp_checks(session), + loop + ) + + except Exception as e: + LOG.warning( + "WARNING - Could not reload modules: " + + str(e)) + Bye() + + else: + LOG.error("Access to IoTronic denied: " + + str(w_msg.message)) + Bye() + + except exception.ApplicationError as e: + LOG.error("IoTronic connection error:\n" + str(e)) + # Iotronic is offline the board can not call + # the "stack4things.connection" RPC. + # The board will disconnect from WAMP agent and retry later + + # TO ACTIVE WAMP CONNECTION RECOVERY MODE + reconnection = False + + # We restart Lightning-rod if RPC 'stack4things.connection' + # is not available, this means Wagent is unreachable + utils.LR_restart() + + except Exception as e: + LOG.warning("Board connection error after WAMP recovery: " + + str(e)) + Bye() + + @component.on_leave + async def onLeave(session, details): + LOG.warning("WAMP Session Left: reason = " + str(details.reason)) + + @component.on_disconnect + async def onDisconnect(session, was_clean): + """Procedure triggered on WAMP connection lost. + :param session: + :param was_clean: + :return: + """ + + LOG.warning('WAMP Transport Left: was_clean = ' + str(was_clean)) + global connected + connected = False + + global reconnection + + LOG.info( + "WAMP status on disconnect:" + + "\n- board = " + str(board.status) + + "\n- reconnection = " + str(reconnection) + + "\n- connected = " + str(connected) + ) + + if board.status == "operative" and reconnection is False: + + ################# + # WAMP RECOVERY # + ################# + # we need to recover wamp session and + # we set reconnection flag to True in order to activate + # the module-RPCs registration procedure for each module + + reconnection = True + + # LR needs to reconncet to WAMP + if not connected: + LOG.warning(".............WAMP DISCONNECTION.............") + LOG.info( + "WAMP status on disconnect:" + + "\n- board = " + str(board.status) + + "\n- reconnection = " + str(reconnection) + + "\n- connected = " + str(connected) + ) + + # component.start(loop) + + elif board.status == "operative" and reconnection is True: + + ###################### + # LIGHTNING-ROD BOOT # + ###################### + # At this stage if the reconnection flag was set to True + # it means that we forced the reconnection procedure + # because of the board is not able to connect to IoTronic + # calling "stack4things.connection" RPC... + # it means IoTronic is offline! + + # We need to reset the reconnection flag to False in order to + # do not enter in module-RPCs registration procedure... + # At this stage the board tries to reconnect to + # IoTronic until it will come online again. + reconnection = False + + # LR needs to reconncet to WAMP + LOG.warning(".............WAMP DISCONNECTION.............") + LOG.info("WAMP status on disconnect:" + + "\n- board = " + str(board.status) + + "\n- reconnection = " + str(reconnection) + + "\n- connected = " + str(connected) + ) + + # component.start(loop) + + elif (board.status == "registered"): + ###################### + # REGISTRATION STATE # + ###################### + + # LR was disconnected from Registration Agent + # in order to connect it to the assigned WAMP Agent. + + LOG.debug("\n\nReconnecting after registration...\n\n") + + # LR load the new configuration and gets the new WAMP Agent + board.loadSettings() + + # LR has to connect to the assigned WAMP Agent + wampConnect(board.wamp_config) + + else: + LOG.error("Reconnection wrong status!") except Exception as err: - LOG.warning("Board modules reloading error: " + str(err)) + LOG.error(" - WAMP connection error: " + str(err)) Bye() @@ -137,7 +695,9 @@ def moduleWampRegister(session, meth_list): # "restore" methods if (meth[0] != "__init__") & (meth[0] != "finalize") \ & (meth[0] != "restore"): - rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0] + + rpc_addr = u'iotronic.' + str(board.session_id) + '.' + \ + board.uuid + '.' + meth[0] if not meth[0].startswith('_'): session.register(meth[1], rpc_addr) @@ -210,472 +770,36 @@ def modulesLoader(session): LOG.info("\n\nListening...") -async def IotronicLogin(board, session, details): - """Function called to connect the board to Iotronic. +def moduleReloadInfo(session): + """This function is used in the reconnection stage to register - The board: - 1. logs in to Iotronic - 2. loads the modules + again the RPCs of each module and for device. - :param board: - :param session: - :param details: + :param session: WAMP session object. """ - LOG.info("IoTronic Authentication:") - - global reconnection + LOG.info("\n\nModules reloading after WAMP recovery...\n\n") try: - rpc = str(board.agent) + u'.stack4things.connection' - - with timeoutRPC(seconds=3, action=rpc): - res = await session.call( - rpc, - uuid=board.uuid, - session=details.session - ) - - w_msg = WM.deserialize(res) - - if w_msg.result == WM.SUCCESS: - - LOG.info(" - Access granted to Iotronic.") - - # LOADING BOARD MODULES - try: - - modulesLoader(session) - - except Exception as e: - LOG.warning("WARNING - Could not load modules: " + str(e)) - - # Reset flag to False - # reconnection = False - - else: - LOG.error(" - Access denied to Iotronic.") - Bye() - - except exception.ApplicationError as e: - LOG.error(" - Iotronic Connection RPC error: " + str(e)) - # Iotronic is offline the board can not call - # the "stack4things.connection" RPC. - # The board will disconnect from WAMP agent and retry later. - reconnection = True - - # We stop the Component in order to trigger the onDisconnect event - component.stop() - - except Exception as e: - LOG.warning("Iotronic board connection error: " + str(e)) - - -def wampConnect(wamp_conf): - """WAMP connection procedures. - - :param wamp_conf: WAMP configuration from settings.json file - - """ - - LOG.info("WAMP connection precedures:") - - try: - - LOG.info("WAMP status @ boot:" + - "\n- board = " + str(board.status) + - "\n- reconnection = " + str(reconnection) + - "\n- connected = " + str(connected) - ) - - wamp_transport = wamp_conf['url'] - wurl_list = wamp_transport.split(':') - is_wss = False - - if wurl_list[0] == "wss": - is_wss = True - whost = wurl_list[1].replace('/', '') - wport = int(wurl_list[2].replace('/', '')) - - if is_wss and CONF.skip_cert_verify: - ctx = ssl.create_default_context() - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE - wamp_transport = [ - { - "url": wamp_transport, - "endpoint": { - "type": "tcp", - "host": whost, - "port": wport, - "tls": ctx - }, - }, - ] - - # LR creates the Autobahn Asyncio Component that points to the - # WAMP Agent (main/registration agent) - global component - component = Component( - transports=wamp_transport, - realm=wamp_conf['realm'] - ) - - # To manage the registration stage: we got the info for the main - # WAMP agent and LR is going to connect to it starting the Component - # with the new WAMP configuration. - if connected == False and board.status == "registered" \ - and reconnection == False: - component.start(loop) - - @component.on_join - async def join(session, details): - """Execute the following procedures when the board connects - to Crossbar. - - :param details: WAMP session details - - """ - - global connected - connected = True - - # LIGHTNING-ROD STATES: - # - REGISTRATION STATE: the first connection to Iotronic - # - FIRST CONNECTION: the board become operative after registration - # - LIGHTNING-ROD BOOT: the first connection to WAMP - # after Lightning-rod starting - # - WAMP RECOVERY: when the established WAMP connection fails - - global reconnection - - # reconnection flag is False when the board is: - # - LIGHTNING-ROD BOOT - # - REGISTRATION STATE - # - FIRST CONNECTION - # - # reconnection flag is True when the board is: - # - WAMP RECOVERY - - global SESSION - SESSION = session - - # LOG.debug(" - session: " + str(details)) - - board.session_id = details.session - - LOG.info(" - Joined in realm " + board.wamp_config['realm'] + ":") - LOG.info(" - WAMP Agent: " + str(board.agent)) - LOG.info(" - Session ID: " + str(board.session_id)) - LOG.info(" - Board status: " + str(board.status)) - - if reconnection is False: - - if board.uuid is None: - - ###################### - # REGISTRATION STATE # - ###################### - # If in the LR configuration file there is not the - # Board UUID specified it means the board is a new one - # and it has to call IoTronic in order to complete - # the registration. - - try: - - LOG.info(" - Board needs to be registered.") - - rpc = u'stack4things.register' - - with timeoutRPC(seconds=3, action=rpc): - res = await session.call( - rpc, - code=board.code, - session=board.session_id - ) - - w_msg = WM.deserialize(res) - - # LOG.info(" - Board registration result: \n" + - # json.loads(w_msg.message, indent=4)) - - if w_msg.result == WM.SUCCESS: - - LOG.info("Registration authorized by IoTronic:\n" - + str(w_msg.message)) - - # the 'message' field contains - # the board configuration to load - board.setConf(w_msg.message) - - # We need to disconnect the client from the - # registration-agent in order to reconnect - # to the WAMP agent assigned by Iotronic - # at the provisioning stage - LOG.info( - "\n\nDisconnecting from Registration Agent " - "to load new settings...\n\n") - - # We stop the Component in order to trigger the - # onDisconnect event - component.stop() - - else: - LOG.error("Registration denied by Iotronic: " - + str(w_msg.message)) - Bye() - - except exception.ApplicationError as e: - LOG.error("IoTronic registration error: " + str(e)) - # Iotronic is offline the board can not call the - # "stack4things.connection" RPC. The board will - # disconnect from WAMP agent and retry later. - - # TO ACTIVE BOOT CONNECTION RECOVERY MODE - reconnection = True - # We stop the Component in order to trigger the - # onDisconnect event - component.stop() - - except Exception as e: - LOG.warning( - " - Board registration call error: " + str(e)) - Bye() - - else: - - if board.status == "registered": - #################### - # FIRST CONNECTION # - #################### - - # In this case we manage the first connection - # after the registration stage: - # Lightining-rod sets its status to "operative" - # completing the provisioning and configuration stage. - LOG.info("\n\n\nBoard is becoming operative...\n\n\n") - board.updateStatus("operative") - board.loadSettings() - LOG.info("WAMP status @ first connection:" + - "\n- board = " + str(board.status) + - "\n- reconnection = " + str(reconnection) + - "\n- connected = " + str(connected) - ) - await IotronicLogin(board, session, details) - - elif board.status == "operative": - ###################### - # LIGHTNING-ROD BOOT # - ###################### - - # After join to WAMP agent, Lightning-rod will: - # - authenticate to Iotronic - # - load the enabled modules - - # The board will keep at this stage until - # it will succeed to connect to Iotronic. - await IotronicLogin(board, session, details) - - else: - LOG.error("Wrong board status '" + board.status + "'.") - Bye() - - else: - - ################# - # WAMP RECOVERY # - ################# - - LOG.info("IoTronic connection recovery:") - - try: - - rpc = str(board.agent) + u'.stack4things.connection' - - with timeoutRPC(seconds=3, action=rpc): - res = await session.call( - rpc, - uuid=board.uuid, - session=details.session - ) - - w_msg = WM.deserialize(res) - - if w_msg.result == WM.SUCCESS: - - LOG.info(" - Access granted to Iotronic.") - - # LOADING BOARD MODULES - # If the board is in WAMP connection recovery state - # we need to register again the RPCs of each module - try: - - moduleReloadInfo(session) - - # Reset flag to False - reconnection = False - - LOG.info("WAMP Session Recovered!") - - LOG.info("\n\nListening...\n\n") - - except Exception as e: - LOG.warning( - "WARNING - Could not reload modules: " - + str(e)) - Bye() - - else: - LOG.error("Access to IoTronic denied: " - + str(w_msg.message)) - Bye() - - except exception.ApplicationError as e: - LOG.error("IoTronic connection error:\n" + str(e)) - # Iotronic is offline the board can not call - # the "stack4things.connection" RPC. - # The board will disconnect from WAMP agent and retry later - - # TO ACTIVE WAMP CONNECTION RECOVERY MODE - reconnection = False - # We stop the Component in order to trigger the - # onDisconnect event - component.stop() - - except Exception as e: - LOG.warning("Board connection error after WAMP recovery: " - + str(e)) - Bye() - - @component.on_leave - async def onLeave(session, details): - LOG.warning('WAMP Session Left: ' + str(details)) - - @component.on_disconnect - async def onDisconnect(session, was_clean): - """Procedure triggered on WAMP connection lost, for istance - when we call component.stop(). - - :param connector: WAMP connector object - :param reason: WAMP connection failure reason - - """ - - LOG.warning('WAMP Transport Left: was_clean = ' + str(was_clean)) - global connected - connected = False - - global reconnection - - LOG.info("WAMP status on disconnect:" + - "\n- board = " + str(board.status) + - "\n- reconnection = " + str(reconnection) + - "\n- connected = " + str(connected) - ) - - if board.status == "operative" and reconnection is False: - - ################# - # WAMP RECOVERY # - ################# - # we need to recover wamp session and - # we set reconnection flag to True in order to activate - # the RPCs module registration procedure for each module - - reconnection = True - - # LR needs to reconncet to WAMP - if not connected: - component.start(loop) - - elif board.status == "operative" and reconnection is True: - - ###################### - # LIGHTNING-ROD BOOT # - ###################### - # At this stage if the reconnection flag was set to True - # it means that we forced the reconnection procedure - # because of the board is not able to connect to IoTronic - # calling "stack4things.connection" RPC... - # it means IoTronic is offline! - - # We need to reset the recconnection flag to False in order to - # do not enter in RPCs module registration procedure... - # At this stage the board tries to reconnect to - # IoTronic until it will come online again. - reconnection = False - - # LR needs to reconncet to WAMP - if not connected: - component.start(loop) - - elif (board.status == "registered"): - ###################### - # REGISTRATION STATE # - ###################### - - # LR was disconnected from Registration Agent - # in order to connect it to the assigned WAMP Agent. - - LOG.debug("\n\nReconnecting after registration...\n\n") - - # LR load the new configuration and gets the new WAMP Agent - board.loadSettings() - - # LR has to connect to the assigned WAMP Agent - wampConnect(board.wamp_config) - - else: - LOG.error("Reconnection wrong status!") + # Call module restore procedures and + # register RPCs for each Lightning-rod module + for mod_name in MODULES: + LOG.info("- Registering RPCs for module " + str(mod_name)) + moduleWampRegister(session, RPC[mod_name]) + + LOG.info("- Restoring module " + str(mod_name)) + MODULES[mod_name].restore() + + # Register RPCs for the device + for dev in RPC_devices: + LOG.info("- Registering RPCs for device " + str(dev)) + moduleWampRegister(session, RPC_devices[dev]) except Exception as err: - LOG.error(" - WAMP connection error: " + str(err)) - Bye() - - -class WampManager(object): - """WAMP Manager: through this LR manages the connection to Crossbar server. - - """ - - def __init__(self, wamp_conf): - - # wampConnect configures and manages the connection to Crossbar server. - wampConnect(wamp_conf) - - def start(self): - LOG.info(" - starting Lightning-rod WAMP server...") - - global loop - loop = asyncio.get_event_loop() - component.start(loop) - loop.run_forever() - - """ - # TEMPORARY ------------------------------------------------------ - from subprocess import call - LOG.debug("Unmounting...") - - try: - mountPoint = "/opt/BBB" - # errorCode = self.libc.umount(mountPoint, None) - errorCode = call(["umount", "-l", mountPoint]) - - LOG.debug("Unmount " + mountPoint + " result: " + str(errorCode)) - - except Exception as msg: - result = "Unmounting error:", msg - LOG.debug(result) - # ------------------------------------------------------------------ - """ - - def stop(self): - LOG.info("Stopping WAMP agent server...") - # Canceling pending tasks and stopping the loop - asyncio.gather(*asyncio.Task.all_tasks()).cancel() - LOG.info("WAMP server stopped!") + LOG.warning("Board modules reloading error: " + str(err)) + utils.LR_restart() def Bye(): @@ -689,64 +813,5 @@ def LogoLR(): LOG.info('##############################') -def checkIotronicConf(lr_CONF): - try: - if(lr_CONF.log_file == None): - LOG.warning("'log_file' is not specified!") - return False - else: - print("View logs in " + lr_CONF.log_file) - return True - except Exception as err: - print(err) - return False - - -class LightningRod(object): - - def __init__(self): - - LogoLR() - - LOG.info("LR available modules: ") - for ep in pkg_resources.iter_entry_points(group='s4t.modules'): - LOG.info(" - " + str(ep)) - - logging.register_options(CONF) - DOMAIN = "s4t-lightning-rod" - CONF(project='iotronic') - logging.setup(CONF, DOMAIN) - - if (checkIotronicConf(CONF)): - - if CONF.debug: - txaio.start_logging(level="debug") - - signal.signal(signal.SIGINT, self.stop_handler) - - LogoLR() - - global board - board = Board() - - LOG.info('Lightning-rod configurations:') - LOG.info(' - Logs: ' + CONF.log_file) - LOG.info(" - Current time: " + board.getTimestamp()) - LOG.info(" - Home: " + CONF.lightningrod_home) - LOG.info(" - WebServices Proxy: " + CONF.proxy) - - self.w = WampManager(board.wamp_config) - - self.w.start() - - else: - Bye() - - def stop_handler(self, signum, frame): - LOG.info("LR is shutting down...") - self.w.stop() - Bye() - - def main(): LightningRod() diff --git a/iotronic_lightningrod/modules/device_manager.py b/iotronic_lightningrod/modules/device_manager.py index e04d0f8..86f452f 100644 --- a/iotronic_lightningrod/modules/device_manager.py +++ b/iotronic_lightningrod/modules/device_manager.py @@ -19,6 +19,9 @@ import importlib as imp import inspect import os import subprocess +import sys +import threading +import time from datetime import datetime @@ -27,6 +30,7 @@ from iotronic_lightningrod.lightningrod import RPC_devices from iotronic_lightningrod.lightningrod import SESSION from iotronic_lightningrod.modules import Module from iotronic_lightningrod.modules import utils +import iotronic_lightningrod.wampmessage as WM from oslo_log import log as logging @@ -85,7 +89,10 @@ class DeviceManager(Module.Module): if (meth[0] != "__init__") & (meth[0] != "finalize"): # LOG.info(" - " + str(meth[0])) - rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0] + # rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0] + rpc_addr = u'iotronic.' + str(board.session_id) + '.' + \ + board.uuid + '.' + meth[0] + # LOG.debug(" --> " + str(rpc_addr)) SESSION.register(meth[1], rpc_addr) @@ -94,23 +101,50 @@ class DeviceManager(Module.Module): async def DevicePing(self): rpc_name = utils.getFuncName() LOG.info("RPC " + rpc_name + " CALLED") - return datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f') + + message = datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f') + w_msg = WM.WampSuccess(message) + + return w_msg.serialize() async def DeviceReboot(self): rpc_name = utils.getFuncName() LOG.info("RPC " + rpc_name + " CALLED") - command = "reboot" - subprocess.call(command, shell=True) + def delayBoardReboot(): + time.sleep(3) + subprocess.call("reboot", shell=True) - return datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f') + threading.Thread(target=delayBoardReboot).start() + + message = "Rebooting board in few seconds @" + \ + str(datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')) + w_msg = WM.WampSuccess(message) + + return w_msg.serialize() + + async def DeviceRestartLR(self): + rpc_name = utils.getFuncName() + LOG.info("RPC " + rpc_name + " CALLED") + + def delayLRrestarting(): + time.sleep(2) + python = sys.executable + os.execl(python, python, *sys.argv) + + threading.Thread(target=delayLRrestarting).start() + + message = "Restarting LR in 5 seconds (" + \ + datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f') + ")..." + w_msg = WM.WampSuccess(message) + + return w_msg.serialize() async def DeviceHostname(self): rpc_name = utils.getFuncName() LOG.info("RPC " + rpc_name + " CALLED") command = "hostname" - # subprocess.call(command, shell=True) out = subprocess.Popen( command, @@ -119,23 +153,28 @@ class DeviceManager(Module.Module): ) output = out.communicate()[0].decode('utf-8').strip() - print(output) - return str(output) + "@" + \ + message = str(output) + "@" + \ str(datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')) + w_msg = WM.WampSuccess(message) - """ - async def DeviceWampDisconnect(self): + return w_msg.serialize() + + async def DeviceNetConfig(self): rpc_name = utils.getFuncName() LOG.info("RPC " + rpc_name + " CALLED") - import threading, time + command = "ifconfig" - def delayDisconnection(): - time.sleep(5) - SESSION.disconnect() + out = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE + ) - threading.Thread(target=delayDisconnection).start() + output = out.communicate()[0].decode('utf-8').strip() - return "Device disconnection in 5 seconds..." - """ + message = str(output) + w_msg = WM.WampSuccess(message) + + return w_msg.serialize() diff --git a/iotronic_lightningrod/modules/network_manager.py b/iotronic_lightningrod/modules/network_manager.py index 81b4015..c2c59d9 100644 --- a/iotronic_lightningrod/modules/network_manager.py +++ b/iotronic_lightningrod/modules/network_manager.py @@ -42,19 +42,6 @@ class NetworkManager(Module.Module): def restore(self): pass - async def test_function(self): - import random - s = random.uniform(0.5, 1.5) - await asyncio.sleep(s) - result = "DEVICE test result: TEST!" - LOG.info(result) - return result - - async def add(self, x, y): - c = x + y - LOG.info("DEVICE add result: " + str(c)) - return c - async def Create_VIF(self, r_tcp_port): LOG.info("Creation of the VIF ") diff --git a/iotronic_lightningrod/modules/plugin_manager.py b/iotronic_lightningrod/modules/plugin_manager.py index 2c4f02f..25392fa 100644 --- a/iotronic_lightningrod/modules/plugin_manager.py +++ b/iotronic_lightningrod/modules/plugin_manager.py @@ -26,8 +26,8 @@ import time from iotronic_lightningrod.modules import Module +from iotronic_lightningrod.modules.plugins import PluginSerializer from iotronic_lightningrod.modules import utils -from iotronic_lightningrod.plugins import PluginSerializer import iotronic_lightningrod.wampmessage as WM @@ -166,7 +166,6 @@ class PluginManager(Module.Module): if os.path.exists(plugin_filename): - # task = imp.load_source("plugin", plugin_filename) task = imp.machinery.SourceFileLoader( "plugin", plugin_filename diff --git a/iotronic_lightningrod/plugins/Plugin.py b/iotronic_lightningrod/modules/plugins/Plugin.py similarity index 100% rename from iotronic_lightningrod/plugins/Plugin.py rename to iotronic_lightningrod/modules/plugins/Plugin.py diff --git a/iotronic_lightningrod/plugins/PluginSerializer.py b/iotronic_lightningrod/modules/plugins/PluginSerializer.py similarity index 100% rename from iotronic_lightningrod/plugins/PluginSerializer.py rename to iotronic_lightningrod/modules/plugins/PluginSerializer.py diff --git a/iotronic_lightningrod/proxies/__init__.py b/iotronic_lightningrod/modules/plugins/__init__.py similarity index 100% rename from iotronic_lightningrod/proxies/__init__.py rename to iotronic_lightningrod/modules/plugins/__init__.py diff --git a/iotronic_lightningrod/plugins/pluginApis.py b/iotronic_lightningrod/modules/plugins/pluginApis.py similarity index 100% rename from iotronic_lightningrod/plugins/pluginApis.py rename to iotronic_lightningrod/modules/plugins/pluginApis.py diff --git a/iotronic_lightningrod/plugins/plugins_examples/arduino_yun/demo.py b/iotronic_lightningrod/modules/plugins/plugins_examples/arduino_yun/demo.py similarity index 100% rename from iotronic_lightningrod/plugins/plugins_examples/arduino_yun/demo.py rename to iotronic_lightningrod/modules/plugins/plugins_examples/arduino_yun/demo.py diff --git a/iotronic_lightningrod/plugins/plugins_examples/arduino_yun/led.json b/iotronic_lightningrod/modules/plugins/plugins_examples/arduino_yun/led.json similarity index 100% rename from iotronic_lightningrod/plugins/plugins_examples/arduino_yun/led.json rename to iotronic_lightningrod/modules/plugins/plugins_examples/arduino_yun/led.json diff --git a/iotronic_lightningrod/plugins/plugins_examples/arduino_yun/led.py b/iotronic_lightningrod/modules/plugins/plugins_examples/arduino_yun/led.py similarity index 100% rename from iotronic_lightningrod/plugins/plugins_examples/arduino_yun/led.py rename to iotronic_lightningrod/modules/plugins/plugins_examples/arduino_yun/led.py diff --git a/iotronic_lightningrod/plugins/plugins_examples/generics/echo.json b/iotronic_lightningrod/modules/plugins/plugins_examples/generics/echo.json similarity index 100% rename from iotronic_lightningrod/plugins/plugins_examples/generics/echo.json rename to iotronic_lightningrod/modules/plugins/plugins_examples/generics/echo.json diff --git a/iotronic_lightningrod/plugins/plugins_examples/generics/echo.py b/iotronic_lightningrod/modules/plugins/plugins_examples/generics/echo.py similarity index 86% rename from iotronic_lightningrod/plugins/plugins_examples/generics/echo.py rename to iotronic_lightningrod/modules/plugins/plugins_examples/generics/echo.py index 2eba760..241a932 100644 --- a/iotronic_lightningrod/plugins/plugins_examples/generics/echo.py +++ b/iotronic_lightningrod/modules/plugins/plugins_examples/generics/echo.py @@ -13,7 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. -from iotronic_lightningrod.plugins import Plugin +from iotronic_lightningrod.modules.plugins import Plugin from oslo_log import log as logging LOG = logging.getLogger(__name__) @@ -29,4 +29,4 @@ class Worker(Plugin.Plugin): def run(self): LOG.info("Input parameters: " + str(self.params['name'])) LOG.info("Plugin " + self.name + " process completed!") - self.q_result.put("ECHO RESULT: "+str(self.params['name'])) + self.q_result.put("ECHO RESULT: " + str(self.params['name'])) diff --git a/iotronic_lightningrod/plugins/plugins_examples/generics/runner.json b/iotronic_lightningrod/modules/plugins/plugins_examples/generics/runner.json similarity index 100% rename from iotronic_lightningrod/plugins/plugins_examples/generics/runner.json rename to iotronic_lightningrod/modules/plugins/plugins_examples/generics/runner.json diff --git a/iotronic_lightningrod/plugins/plugins_examples/generics/runner.py b/iotronic_lightningrod/modules/plugins/plugins_examples/generics/runner.py similarity index 89% rename from iotronic_lightningrod/plugins/plugins_examples/generics/runner.py rename to iotronic_lightningrod/modules/plugins/plugins_examples/generics/runner.py index 5e0ed0c..90a4102 100644 --- a/iotronic_lightningrod/plugins/plugins_examples/generics/runner.py +++ b/iotronic_lightningrod/modules/plugins/plugins_examples/generics/runner.py @@ -13,8 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. -from iotronic_lightningrod.plugins import Plugin -# from iotronic_lightningrod.plugins import pluginApis as API +from iotronic_lightningrod.modules.plugins import Plugin +# from iotronic_lightningrod.modules.plugins import pluginApis as API from oslo_log import log as logging LOG = logging.getLogger(__name__) diff --git a/iotronic_lightningrod/proxies/Proxy.py b/iotronic_lightningrod/modules/proxies/Proxy.py similarity index 100% rename from iotronic_lightningrod/proxies/Proxy.py rename to iotronic_lightningrod/modules/proxies/Proxy.py diff --git a/iotronic_lightningrod/modules/proxies/__init__.py b/iotronic_lightningrod/modules/proxies/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/iotronic_lightningrod/proxies/configs/default b/iotronic_lightningrod/modules/proxies/configs/default similarity index 100% rename from iotronic_lightningrod/proxies/configs/default rename to iotronic_lightningrod/modules/proxies/configs/default diff --git a/iotronic_lightningrod/proxies/configs/iotronic b/iotronic_lightningrod/modules/proxies/configs/iotronic similarity index 100% rename from iotronic_lightningrod/proxies/configs/iotronic rename to iotronic_lightningrod/modules/proxies/configs/iotronic diff --git a/iotronic_lightningrod/proxies/nginx.py b/iotronic_lightningrod/modules/proxies/nginx.py similarity index 64% rename from iotronic_lightningrod/proxies/nginx.py rename to iotronic_lightningrod/modules/proxies/nginx.py index cf3ad65..658b4d5 100644 --- a/iotronic_lightningrod/proxies/nginx.py +++ b/iotronic_lightningrod/modules/proxies/nginx.py @@ -16,7 +16,7 @@ __author__ = "Nicola Peditto " -from iotronic_lightningrod.proxies import Proxy +from iotronic_lightningrod.modules.proxies import Proxy from oslo_log import log as logging LOG = logging.getLogger(__name__) @@ -24,7 +24,6 @@ LOG = logging.getLogger(__name__) import json import os -import site import subprocess import time @@ -162,61 +161,49 @@ class ProxyManager(Proxy.Proxy): return json.dumps(nginxMsg) - def _proxyBoardDnsSetup(self, board_dns, owner_email): + def _nginx_conf_verify(self, fp): + with open(fp, "r") as text_file: + LOG.debug(text_file.read()) + + def _proxyEnableWebService(self, board_dns, owner_email): nginxMsg = {} try: - py_dist_pack = site.getsitepackages()[0] + nginx_path = "/etc/nginx/conf.d/" - iotronic_nginx_path = "/etc/nginx/conf.d/iotronic" - iotronic_nginx_default = "/etc/nginx/conf.d/iotronic/default" + nginx_board_conf_file = nginx_path + "/" + board_dns + ".conf" + nginx_board_conf = '''server {{ + listen 80; + server_name {0}; + }} + '''.format(board_dns) - if not os.path.exists(iotronic_nginx_path): - os.makedirs(iotronic_nginx_path) - - nginx_default = '''proxy_set_header Host $http_host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; - proxy_http_version 1.1; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade";''' - - with open(iotronic_nginx_default, "w") as text_file: - text_file.write("%s" % nginx_default) - - iotronic_nginx_avl_path = "/etc/nginx/sites-available/iotronic" - - string = '''server {{ - listen 80; - - server_name {0}; - include conf.d/iotronic/*; - }}'''.format(board_dns) - - with open(iotronic_nginx_avl_path, "w") as text_file: - text_file.write("%s" % string) - - os.system( - 'ln -s ' - '/etc/nginx/sites-available/iotronic ' - '/etc/nginx/sites-enabled/' - ) + with open(nginx_board_conf_file, "w") as text_file: + text_file.write("%s" % nginx_board_conf) + self._nginx_conf_verify(nginx_board_conf_file) time.sleep(3) self._proxyReload() time.sleep(3) - command = '/usr/bin/certbot -n ' \ - '--redirect --authenticator webroot ' \ - '--installer nginx -w /var/www/html/ ' \ - '--domain ' + board_dns + ' --agree-tos ' \ - '--email ' + owner_email + command = "/usr/bin/certbot -n " \ + "--redirect " \ + "--authenticator webroot " \ + "--installer nginx " \ + "-w /var/www/html/ " \ + "--domain " + board_dns + " " \ + "--agree-tos " \ + "--email " + owner_email LOG.debug(command) - call(command, shell=True) + certbot_result = call(command, shell=True) + LOG.info("CERTBOT RESULT: " + str(certbot_result)) + + nginxMsg['result'] = "SUCCESS" + nginxMsg['message'] = "Webservice module enabled." + LOG.info("--> " + nginxMsg['message']) except Exception as err: nginxMsg['log'] = "NGINX DNS setup error: " + str(err) @@ -225,99 +212,142 @@ class ProxyManager(Proxy.Proxy): return json.dumps(nginxMsg) - def _exposeWebservice(self, service_name, local_port): + def _exposeWebservice(self, board_dns, service_dns, local_port, dns_list): nginxMsg = {} try: - nginx_path = "/etc/nginx/conf.d/iotronic" + nginx_path = "/etc/nginx/conf.d" - if not os.path.exists(nginx_path): - os.makedirs(nginx_path) + service_path = nginx_path + "/" + service_dns + ".conf" + string = '''server {{ + listen 80; + server_name {0}; - fp = nginx_path + "/" + service_name + proxy_set_header Host $http_host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; - string = '''location /{0}/ {{ - proxy_pass http://localhost:{1}/; - include conf.d/iotronic/default; + location / {{ + proxy_pass http://localhost:{1}; }} - - location /{0} {{ - rewrite ^ $scheme://$http_host/{0}/ redirect; }} - '''.format(service_name, local_port) + '''.format(service_dns, local_port) - with open(fp, "w") as ws_nginx_conf: + with open(service_path, "w") as ws_nginx_conf: ws_nginx_conf.write("%s" % string) time.sleep(3) - nginxMsg['message'] = "Webservice '" + service_name + \ - "' configuration injected in NGINX." - nginxMsg['result'] = "SUCCESS" - LOG.info("--> " + nginxMsg['message']) + self._nginx_conf_verify(service_path) self._proxyReload() time.sleep(3) + command = "/usr/bin/certbot " \ + "--expand -n " \ + "--redirect " \ + "--authenticator webroot " \ + "--installer nginx -w /var/www/html/ " \ + "--domain " + str(dns_list) + + command = "/usr/bin/certbot " \ + "-n " \ + "--redirect " \ + "--authenticator webroot " \ + "--installer nginx -w /var/www/html/ " \ + "--cert-name " + str(board_dns) + " " \ + "--domain " + str(dns_list) + + LOG.debug(command) + certbot_result = call(command, shell=True) + LOG.info("CERTBOT RESULT: " + str(certbot_result)) + + LOG.info("Webservices list updated:\n" + + str(self._webserviceList())) + + nginxMsg['result'] = "SUCCESS" + nginxMsg['message'] = "Webservice '" + service_dns + \ + "' exposed in NGINX." + LOG.info(nginxMsg['message']) + except Exception as e: nginxMsg['message'] = "Error exposing Webservice '" + \ - service_name + \ + service_dns + \ "' configuration in NGINX: {}".format(e) nginxMsg['result'] = "ERROR" LOG.warning("--> " + nginxMsg['message']) return json.dumps(nginxMsg) - def _disableWebservice(self, service_name): + def _disableWebservice(self, service_dns, dns_list): + """ + :param service: + :param dns_list: + :return: + """ nginxMsg = {} try: - nginx_path = "/etc/nginx/conf.d/iotronic" - service_path = nginx_path + "/" + service_name + nginx_path = "/etc/nginx/conf.d" + service_path = nginx_path + "/" + service_dns + ".conf" if os.path.exists(service_path): os.remove(service_path) - time.sleep(3) - - nginxMsg['message'] = "webservice '" \ - + service_name + "' disabled." - nginxMsg['result'] = "SUCCESS" - # LOG.info("--> " + nginxMsg['message']) + time.sleep(1) self._proxyReload() time.sleep(3) + command = "/usr/bin/certbot " \ + "--expand -n " \ + "--redirect " \ + "--authenticator webroot " \ + "--installer nginx -w /var/www/html/ " \ + "--domain " + str(dns_list) + + LOG.debug(command) + certbot_result = call(command, shell=True) + LOG.info("CERTBOT RESULT: " + str(certbot_result)) + + LOG.info("Webservices list updated:\n" + str( + self._webserviceList())) + + nginxMsg['message'] = "webservice '" \ + + service_dns + "' disabled." + nginxMsg['result'] = "SUCCESS" + LOG.info(nginxMsg['message']) + else: nginxMsg['message'] = "webservice file " \ + service_path + " does not exist" nginxMsg['result'] = "ERROR" - # LOG.info("--> " + nginxMsg['message']) except Exception as e: nginxMsg['message'] = "Error disabling Webservice '" + \ - service_name + "': {}".format(e) + service_dns + "': {}".format(e) nginxMsg['result'] = "ERROR" - # LOG.warning("--> " + nginxMsg['message']) return json.dumps(nginxMsg) def _webserviceList(self): - nginx_path = "/etc/nginx/conf.d/iotronic" + nginx_path = "/etc/nginx/conf.d/" if os.path.exists(nginx_path): service_list = [f for f in os.listdir(nginx_path) if os.path.isfile(os.path.join(nginx_path, f))] - - service_list.remove('default') else: service_list = [] diff --git a/iotronic_lightningrod/modules/service_manager.py b/iotronic_lightningrod/modules/service_manager.py index ae05909..29689d7 100644 --- a/iotronic_lightningrod/modules/service_manager.py +++ b/iotronic_lightningrod/modules/service_manager.py @@ -15,13 +15,17 @@ __author__ = "Nicola Peditto " -from datetime import datetime import errno import json import os import psutil import signal import subprocess +import time +import traceback + +from datetime import datetime +from threading import Thread from urllib.parse import urlparse from iotronic_lightningrod.modules import Module @@ -29,6 +33,9 @@ from iotronic_lightningrod.modules import utils import iotronic_lightningrod.wampmessage as WM +from iotronic_lightningrod import lightningrod + + from oslo_config import cfg from oslo_log import log as logging LOG = logging.getLogger(__name__) @@ -70,6 +77,9 @@ class ServiceManager(Module.Module): if (p.name() == "node" and "wstun" in p.cmdline()[1]): wstun_process_list.append(p) + if len(services_conf) != 0: + print("\nWSTUN processes:") + for service_uuid in services_conf['services']: service_name = services_conf['services'][service_uuid]['name'] @@ -85,11 +95,22 @@ class ServiceManager(Module.Module): "' already exists; killing...") # 1. Kill wstun process (if exists) + + # No zombie alert activation + lightningrod.zombie_alert = False + LOG.debug( + "[WSTUN-RESTORE] - " + "on-finalize zombie_alert: " + + str(lightningrod.zombie_alert) + ) + try: - os.kill(service_pid, signal.SIGKILL) + os.kill(service_pid, signal.SIGINT) + print("OLD WSTUN KILLED: " + str(wp)) LOG.info(" --> service '" + service_name + "' with PID " + str(service_pid) + " was killed; creating new one...") + except OSError: LOG.warning(" - WSTUN process already killed, " "creating new one...") @@ -102,7 +123,7 @@ class ServiceManager(Module.Module): local_port = \ services_conf['services'][service_uuid]['local_port'] - wstun = self._startWstun(public_port, local_port) + wstun = self._startWstun(public_port, local_port, event="boot") if wstun != None: service_pid = wstun.pid @@ -123,9 +144,144 @@ class ServiceManager(Module.Module): + " service tunnel!" LOG.error(" - " + message) + signal.signal(signal.SIGCHLD, self._zombie_hunter) + + # Reactivate zombies monitoring + if not lightningrod.zombie_alert: + lightningrod.zombie_alert = True + else: LOG.info(" --> No service tunnels to establish.") + signal.signal(signal.SIGCHLD, self._zombie_hunter) + + def _zombie_hunter(self, signum, frame): + + wstun_found = False + + if (lightningrod.zombie_alert): + # print(signum); traceback.print_stack(frame) + + zombie_list = [] + + for p in psutil.process_iter(): + if len(p.cmdline()) == 0: + if ((p.name() == "node") and + (p.status() == psutil.STATUS_ZOMBIE)): + print(" - process: " + str(p)) + zombie_list.append(p.pid) + + if len(zombie_list) == 0: + # print(" - no action required.") + return + + print("\nCheck killed process...") + print(" - Zombies found: " + str(zombie_list)) + + # Load services.json configuration file + services_conf = self._loadServicesConf() + + for service_uuid in services_conf['services']: + + service_pid = services_conf['services'][service_uuid]['pid'] + + if service_pid in zombie_list: + + message = "WSTUN zombie process ALERT!" + print(" - " + str(message)) + LOG.debug("[WSTUN-RESTORE] --> " + str(message)) + + wstun_found = True + + print(services_conf['services'][service_uuid]) + service_public_port = \ + services_conf['services'][service_uuid]['public_port'] + service_local_port = \ + services_conf['services'][service_uuid]['local_port'] + service_name = \ + services_conf['services'][service_uuid]['name'] + + try: + + wstun = self._startWstun( + service_public_port, + service_local_port, + event="zombie" + ) + + if wstun != None: + service_pid = wstun.pid + + # UPDATE services.json file + services_conf['services'][service_uuid]['pid'] = \ + service_pid + services_conf['services'][service_uuid][ + 'updated_at'] = \ + datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f') + + self._updateServiceConf(services_conf, + service_uuid, + output=True) + + message = "Zombie service " + str(service_name) \ + + " restored on port " \ + + str(service_public_port) \ + + " on " + self.wstun_ip + LOG.info(" - " + message + " with PID " + str( + service_pid)) + + except Exception: + pass + + break + + if not wstun_found: + message = "Tunnel killed by LR" + print(" - " + str(message)) + # LOG.debug("[WSTUN-RESTORE] --> " + str(message)) + + else: + print("WSTUN kill event:") + message = "Tunnel killed by LR" + print(" - " + str(message)) + # LOG.debug("[WSTUN-RESTORE] --> " + str(message)) + # lightningrod.zombie_alert = True + + def _restoreWSTUN(self, services_conf, service_uuid): + service_name = services_conf['services'][service_uuid]['name'] + service_pid = services_conf['services'][service_uuid]['pid'] + LOG.info(" - " + service_name) + + # Create the reverse tunnel again + public_port = \ + services_conf['services'][service_uuid]['public_port'] + local_port = \ + services_conf['services'][service_uuid]['local_port'] + + wstun = self._startWstun(public_port, local_port, event="restore") + + if wstun != None: + service_pid = wstun.pid + + # 3. Update services.json file + services_conf['services'][service_uuid]['pid'] = \ + service_pid + services_conf['services'][service_uuid]['updated_at'] = \ + datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f') + + self._updateServiceConf( + services_conf, + service_uuid, + output=True + ) + + LOG.info(" --> Cloud service '" + service_name + + "' tunnel restored.") + else: + message = "Error spawning " + str(service_name) \ + + " service tunnel!" + LOG.error(" - " + message) + def restore(self): LOG.info("Cloud service tunnels to restore:") @@ -136,58 +292,38 @@ class ServiceManager(Module.Module): wstun_process_list = [] + # No zombie alert activation + lightningrod.zombie_alert = False + LOG.debug("[WSTUN-RESTORE] - Restore zombie_alert: " + str( + lightningrod.zombie_alert)) + # Collect all alive WSTUN proccesses for p in psutil.process_iter(): - if len(p.cmdline()) != 0: - if (p.name() == "node") and ("wstun" in p.cmdline()[1]): + if (p.name() == "node"): + if (p.status() == psutil.STATUS_ZOMBIE): + LOG.warning("WSTUN ZOMBIE: " + str(p)) wstun_process_list.append(p) + elif ("wstun" in p.cmdline()[1]): + LOG.warning("WSTUN ALIVE: " + str(p)) + wstun_process_list.append(p) + + psutil.Process(p.pid).kill() + LOG.warning(" --> PID " + str(p.pid) + " killed!") + + LOG.debug("[WSTUN-RESTORE] - WSTUN processes to restore:\n" + + str(wstun_process_list)) for service_uuid in services_conf['services']: - service_name = services_conf['services'][service_uuid]['name'] - service_pid = services_conf['services'][service_uuid]['pid'] - LOG.info(" - " + service_name) + Thread( + target=self._restoreWSTUN, + args=(services_conf, service_uuid,) + ).start() + time.sleep(2) - s_alive = False - - # WSTUN is still alive - if len(wstun_process_list) != 0: - - for wp in wstun_process_list: - - if service_pid == wp.pid: - LOG.warning(" --> the tunnel for '" + service_name - + "' is still established.") - s_alive = True - break - - if not s_alive: - # Create the reverse tunnel again - public_port = services_conf['services'][service_uuid] - ['public_port'] - local_port = services_conf['services'][service_uuid] - ['local_port'] - - wstun = self._startWstun(public_port, local_port) - - if wstun != None: - service_pid = wstun.pid - - # 3. Update services.json file - services_conf['services'][service_uuid]['pid'] = \ - service_pid - services_conf['services'][service_uuid]['updated_at'] = \ - datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f') - - self._updateServiceConf(services_conf, - service_uuid, output=True) - - LOG.info(" --> Cloud service '" + service_name - + "' tunnel restored.") - else: - message = "Error spawning " + str(service_name) \ - + " service tunnel!" - LOG.error(" - " + message) + # Reactivate zombies monitoring + if not lightningrod.zombie_alert: + lightningrod.zombie_alert = True else: LOG.info(" --> No service tunnels to restore.") @@ -211,16 +347,24 @@ class ServiceManager(Module.Module): return services_conf - def _startWstun(self, public_port, local_port): + def _startWstun(self, public_port, local_port, event="no-set"): - opt_reverse = "-r" + str( - public_port) + ":127.0.0.1:" + str(local_port) + opt_reverse = "-r" + str(public_port) + ":127.0.0.1:" + str(local_port) try: wstun = subprocess.Popen( ['/usr/bin/wstun', opt_reverse, self.wstun_url], stdout=subprocess.PIPE ) + + if(event != "boot"): + print("WSTUN start event:") + + cmd_print = 'WSTUN exec: /usr/bin/wstun ' \ + + opt_reverse + ' ' + self.wstun_url + print(" - " + str(cmd_print)) + LOG.debug(cmd_print) + except Exception as err: LOG.error("Error spawning WSTUN process: " + str(err)) wstun = None @@ -254,13 +398,13 @@ class ServiceManager(Module.Module): try: - wstun = self._startWstun(public_port, local_port) + wstun = self._startWstun(public_port, local_port, event="enable") if wstun != None: service_pid = wstun.pid - LOG.debug(" - WSTUN stdout: " + str(wstun.stdout)) + # LOG.debug(" - WSTUN stdout: " + str(wstun.stdout)) # Update services.json file # Load services.json configuration file @@ -338,6 +482,16 @@ class ServiceManager(Module.Module): try: + # No zombie alert activation + lightningrod.zombie_alert = False + + """ + LOG.debug( + "[WSTUN-RESTORE] - disable-RPC zombie_alert: " + + str(lightningrod.zombie_alert) + ) + """ + os.kill(service_pid, signal.SIGKILL) message = "Cloud service '" \ @@ -349,6 +503,11 @@ class ServiceManager(Module.Module): output=False) LOG.info(" - " + message) + + # Reactivate zombies monitoring + if not lightningrod.zombie_alert: + lightningrod.zombie_alert = True + w_msg = WM.WampSuccess(message) except Exception as err: @@ -362,6 +521,10 @@ class ServiceManager(Module.Module): self._updateServiceConf(services_conf, service_uuid, output=False) + # Reactivate zombies monitoring + if not lightningrod.zombie_alert: + lightningrod.zombie_alert = True + w_msg = WM.WampWarning(message) else: @@ -369,6 +532,11 @@ class ServiceManager(Module.Module): message = "Error disabling '" + str( service_name) + "' service tunnel: " + str(err) LOG.error(" - " + message) + + # Reactivate zombies monitoring + if not lightningrod.zombie_alert: + lightningrod.zombie_alert = True + w_msg = WM.WampError(message) else: @@ -407,6 +575,14 @@ class ServiceManager(Module.Module): try: + # No zombie alert activation + lightningrod.zombie_alert = False + """ + LOG.debug( + "[WSTUN-RESTORE] - restore-RPC lightningrod.zombie_alert: " + + str(lightningrod.zombie_alert)) + """ + # 1. Kill wstun process (if exists) try: os.kill(service_pid, signal.SIGKILL) @@ -418,7 +594,11 @@ class ServiceManager(Module.Module): "creating new one...") # 2. Create the reverse tunnel - wstun = self._startWstun(public_port, local_port) + wstun = self._startWstun( + public_port, + local_port, + event="kill-restore" + ) if wstun != None: service_pid = wstun.pid @@ -437,12 +617,21 @@ class ServiceManager(Module.Module): + str(public_port) + " on " + self.wstun_ip LOG.info(" - " + message + " with PID " + str(service_pid)) + # Reactivate zombies monitoring + if not lightningrod.zombie_alert: + lightningrod.zombie_alert = True + w_msg = WM.WampSuccess(message) else: message = "Error spawning " + str(service_name) \ + " service tunnel!" LOG.error(" - " + message) + + # Reactivate zombies monitoring + if not lightningrod.zombie_alert: + lightningrod.zombie_alert = True + w_msg = WM.WampError(message) except Exception as err: @@ -455,7 +644,11 @@ class ServiceManager(Module.Module): local_port = service['port'] - wstun = self._startWstun(public_port, local_port) + wstun = self._startWstun( + public_port, + local_port, + event="clean-restore" + ) if wstun != None: diff --git a/iotronic_lightningrod/modules/test.py b/iotronic_lightningrod/modules/test.py deleted file mode 100644 index 2f7927d..0000000 --- a/iotronic_lightningrod/modules/test.py +++ /dev/null @@ -1,43 +0,0 @@ -# Copyright 2017 MDSLAB - University of Messina -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -__author__ = "Nicola Peditto " - -import asyncio - -from iotronic_lightningrod.modules import Module - -from oslo_log import log as logging -LOG = logging.getLogger(__name__) - - -class Test(Module.Module): - - def __init__(self, board): - - super(Test, self).__init__("Test", board) - - async def test_function(self): - import random - s = random.uniform(0.5, 1.5) - await asyncio.sleep(s) - result = "DEVICE test result: TEST!" - LOG.info(result) - return result - - async def add(self, x, y): - c = x + y - LOG.info("DEVICE add result: " + str(c)) - return c diff --git a/iotronic_lightningrod/modules/vfs_library.py b/iotronic_lightningrod/modules/vfs_library.py deleted file mode 100644 index 947b24f..0000000 --- a/iotronic_lightningrod/modules/vfs_library.py +++ /dev/null @@ -1,164 +0,0 @@ -# Copyright 2017 MDSLAB - University of Messina -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -__author__ = "Nicola Peditto " - -import errno -from fuse import FuseOSError -import os - - -# Logging conf -from oslo_log import log as logging -LOG = logging.getLogger(__name__) - - -class FuseLib(object): - def __init__(self, mountSource): - self.mountSource = mountSource - - def _full_path(self, partial): - if partial.startswith("/"): - partial = partial[1:] - path = os.path.join(self.mountSource, partial) - print(path) - return path - - # Filesystem methods - # ================== - - def access(self, path, mode): - full_path = self._full_path(path) - if not os.access(full_path, mode): - raise FuseOSError(errno.EACCES) - - def chmod(self, path, mode): - full_path = self._full_path(path) - return os.chmod(full_path, mode) - - def chown(self, path, uid, gid): - full_path = self._full_path(path) - return os.chown(full_path, uid, gid) - - def getattr(self, path, fh=None): - full_path = self._full_path(path) - st = os.lstat(full_path) - attr = dict((key, getattr(st, key)) - for key in ( - 'st_atime', - 'st_ctime', - 'st_gid', - 'st_mode', - 'st_mtime', - 'st_nlink', - 'st_size', - 'st_uid' - ) - ) - - return attr - - def readdir(self, path, fh): - full_path = self._full_path(path) - - dirents = ['.', '..'] - if os.path.isdir(full_path): - dirents.extend(os.listdir(full_path)) - for r in dirents: - yield r - - def readlink(self, path): - pathname = os.readlink(self._full_path(path)) - if pathname.startswith("/"): - # Path name is absolute, sanitize it. - return os.path.relpath(pathname, self.mountSource) - else: - return pathname - - def mknod(self, path, mode, dev): - return os.mknod(self._full_path(path), mode, dev) - - def rmdir(self, path): - full_path = self._full_path(path) - return os.rmdir(full_path) - - def mkdir(self, path, mode): - return os.mkdir(self._full_path(path), mode) - - def statfs(self, path): - full_path = self._full_path(path) - stv = os.statvfs(full_path) - stat = dict((key, getattr(stv, key)) - for key in ('f_bavail', - 'f_bfree', - 'f_blocks', - 'f_bsize', - 'f_favail', - 'f_ffree', - 'f_files', - 'f_flag', - 'f_frsize', - 'f_namemax' - ) - ) - return stat - - def unlink(self, path): - return os.unlink(self._full_path(path)) - - def symlink(self, name, target): - return os.symlink(name, self._full_path(target)) - - def rename(self, old, new): - return os.rename(self._full_path(old), self._full_path(new)) - - def link(self, target, name): - return os.link(self._full_path(target), self._full_path(name)) - - def utimens(self, path, times=None): - return os.utime(self._full_path(path), times) - - # File methods - # ============ - - def open(self, path, flags): - full_path = self._full_path(path) - return os.open(full_path, flags) - - def create(self, path, mode, fi=None): - full_path = self._full_path(path) - return os.open(full_path, os.O_WRONLY | os.O_CREAT, mode) - - def read(self, path, length, offset, fh): - os.lseek(fh, offset, os.SEEK_SET) - return os.read(fh, length) - - def write(self, path, buf, offset, fh): - os.lseek(fh, offset, os.SEEK_SET) - return os.write(fh, buf) - - def truncate(self, path, length, fh=None): - full_path = self._full_path(path) - with open(full_path, 'r+') as f: - f.truncate(length) - - def flush(self, path, fh): - return os.fsync(fh) - - def release(self, path, fh): - return os.close(fh) - - def fsync(self, path, fdatasync, fh): - return self.flush(path, fh) diff --git a/iotronic_lightningrod/modules/vfs_manager.py b/iotronic_lightningrod/modules/vfs_manager.py deleted file mode 100644 index 408a08c..0000000 --- a/iotronic_lightningrod/modules/vfs_manager.py +++ /dev/null @@ -1,509 +0,0 @@ -# Copyright 2017 MDSLAB - University of Messina -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -from __future__ import with_statement - -__author__ = "Nicola Peditto " - -import errno -import os -from subprocess import call -import threading - -# Iotronic imports -from iotronic_lightningrod.modules import Module - -# Fuse imports -import ctypes -import ctypes.util -from fuse import FUSE -from fuse import FuseOSError -from fuse import Operations - -# Logging conf -from oslo_log import log as logging -LOG = logging.getLogger(__name__) - - -class VfsManager(Module.Module): - - def __init__(self, board, session): - super(VfsManager, self).__init__("VFS", board) - - self.session = session - self.board = board - - """ - #print session - from iotronic_lightningrod.modules import vfs_library - fuse=vfs_library.FuseLib("/opt/AAA") - print fuse.getattr("/aaa.txt") - """ - - libcPath = ctypes.util.find_library("c") - self.libc = ctypes.CDLL(libcPath) - - def finalize(self): - pass - - def restore(self): - pass - - def mountLocal(self, mountSource, mountPoint): - - try: - - mounter = MounterLocal(mountSource, mountPoint) - mounter.start() - - result = "Mounted " + mountSource + " in " + mountPoint - - except Exception as msg: - result = "Mounting error:", msg - - print(result) - return result - - def unmountLocal(self, mountPoint): - - print("Unmounting...") - - try: - - # errorCode = self.libc.umount(mountPoint, None) - errorCode = call(["umount", "-l", mountPoint]) - - result = "Unmount " + mountPoint + " result: " + str(errorCode) - - except Exception as msg: - result = "Unmounting error:", msg - - print(result) - return result - - def mountRemote(self, - mountSource, - mountPoint, - boardRemote=None, - agentRemote=None - ): - - try: - - mounter = MounterRemote( - mountSource, - mountPoint, - self.board, - self.session, - boardRemote, - agentRemote - ) - - mounter.start() - - result = "Mounted " + mountSource + " in " + mountPoint - - except Exception as msg: - result = "Mounting error:", msg - - print(result) - return result - - def unmountRemote(self, mountPoint): - - print("Unmounting...") - - try: - - # errorCode = self.libc.umount(mountPoint, None) - errorCode = call(["umount", "-l", mountPoint]) - - result = "Unmount " + mountPoint + " result: " + str(errorCode) - - except Exception as msg: - result = "Unmounting error:", msg - - print(result) - return result - - -class MounterLocal(threading.Thread): - - def __init__(self, mountSource, mountPoint): - threading.Thread.__init__(self) - # self.setDaemon(1) - self.setName("VFS-Mounter") # Set thread name - - self.mountSource = mountSource - self.mountPoint = mountPoint - - def run(self): - """Mount FUSE FS - - """ - try: - - FUSE( - FuseManager(self.mountSource), - self.mountPoint, - nothreads=False, - foreground=True - ) - - except Exception as msg: - LOG.error("Mounting FUSE error: " + str(msg)) - - -class MounterRemote(threading.Thread): - - def __init__( - self, - mountSource, - mountPoint, - board, - session, - boardRemote, - agentRemote - ): - - threading.Thread.__init__(self) - # self.setDaemon(1) - self.setName("VFS-Mounter") # Set thread name - - self.mountSource = mountSource - self.mountPoint = mountPoint - self.session = session - self.board = board - self.boardRemote = boardRemote - self.agentRemote = agentRemote - - def run(self): - """Mount FUSE FS. - - """ - try: - - FUSE( - FuseRemoteManager( - self.mountSource, - self.board.agent, - self.session, - self.boardRemote, - self.agentRemote - ), - self.mountPoint, - nothreads=False, - foreground=True - ) - - except Exception as msg: - LOG.error("Mounting FUSE error: " + str(msg)) - - -async def makeCall(msg=None, agent=None, session=None): - rpc_addr = str(agent) + '.stack4things.echo' - LOG.debug("VFS - I'm calling " + rpc_addr) - try: - res = await session.call(rpc_addr, msg) - LOG.info("NOTIFICATION " + str(res)) - except Exception as e: - LOG.warning("NOTIFICATION error: {0}".format(e)) - - -class FuseRemoteManager(Operations): - - def __init__(self, mountSource, agent, session, boardRemote, agentRemote): - - self.mountSource = mountSource - self.session = session - self.agent = agent - self.boardRemote = boardRemote - self.agentRemote = agentRemote - - # makeCall("UUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUU", - # self.agent, self.session) # TEMPORARY - - def join_path(self, partial): - if partial.startswith("/"): - partial = partial[1:] - path = os.path.join(self.mountSource, partial) - print(path) - return path - - # Filesystem methods - # ================== - - def access(self, path, mode): - full_path = self.join_path(path) - if not os.access(full_path, mode): - raise FuseOSError(errno.EACCES) - - def chmod(self, path, mode): - full_path = self.join_path(path) - return os.chmod(full_path, mode) - - def chown(self, path, uid, gid): - full_path = self.join_path(path) - return os.chown(full_path, uid, gid) - - def getattr(self, path, fh=None): - full_path = self.join_path(path) - st = os.lstat(full_path) - attr = dict((key, getattr(st, key)) - for key in ( - 'st_atime', - 'st_ctime', - 'st_gid', - 'st_mode', - 'st_mtime', - 'st_nlink', - 'st_size', - 'st_uid' - ) - ) - - return attr - - def readdir(self, path, fh): - full_path = self.join_path(path) - - dirents = ['.', '..'] - if os.path.isdir(full_path): - dirents.extend(os.listdir(full_path)) - for r in dirents: - yield r - - def readlink(self, path): - pathname = os.readlink(self.join_path(path)) - if pathname.startswith("/"): - # Path name is absolute, sanitize it. - return os.path.relpath(pathname, self.mountSource) - else: - return pathname - - def mknod(self, path, mode, dev): - return os.mknod(self.join_path(path), mode, dev) - - def rmdir(self, path): - full_path = self.join_path(path) - return os.rmdir(full_path) - - def mkdir(self, path, mode): - return os.mkdir(self.join_path(path), mode) - - def statfs(self, path): - full_path = self.join_path(path) - stv = os.statvfs(full_path) - stat = dict((key, getattr(stv, key)) - for key in ('f_bavail', - 'f_bfree', - 'f_blocks', - 'f_bsize', - 'f_favail', - 'f_ffree', - 'f_files', - 'f_flag', - 'f_frsize', - 'f_namemax' - ) - ) - return stat - - def unlink(self, path): - return os.unlink(self.join_path(path)) - - def symlink(self, name, target): - return os.symlink(name, self.join_path(target)) - - def rename(self, old, new): - return os.rename(self.join_path(old), self.join_path(new)) - - def link(self, target, name): - return os.link(self.join_path(target), self.join_path(name)) - - def utimens(self, path, times=None): - return os.utime(self.join_path(path), times) - - # File methods - # ============ - - def open(self, path, flags): - full_path = self.join_path(path) - return os.open(full_path, flags) - - def create(self, path, mode, fi=None): - full_path = self.join_path(path) - return os.open(full_path, os.O_WRONLY | os.O_CREAT, mode) - - def read(self, path, length, offset, fh): - os.lseek(fh, offset, os.SEEK_SET) - return os.read(fh, length) - - def write(self, path, buf, offset, fh): - os.lseek(fh, offset, os.SEEK_SET) - return os.write(fh, buf) - - def truncate(self, path, length, fh=None): - full_path = self.join_path(path) - with open(full_path, 'r+') as f: - f.truncate(length) - - def flush(self, path, fh): - return os.fsync(fh) - - def release(self, path, fh): - return os.close(fh) - - def fsync(self, path, fdatasync, fh): - return self.flush(path, fh) - - -class FuseManager(Operations): - - def __init__(self, mountSource): - self.mountSource = mountSource - - def join_path(self, partial): - if partial.startswith("/"): - partial = partial[1:] - path = os.path.join(self.mountSource, partial) - print(path) - return path - - # Filesystem methods - # ================== - - def access(self, path, mode): - full_path = self.join_path(path) - if not os.access(full_path, mode): - raise FuseOSError(errno.EACCES) - - def chmod(self, path, mode): - full_path = self.join_path(path) - return os.chmod(full_path, mode) - - def chown(self, path, uid, gid): - full_path = self.join_path(path) - return os.chown(full_path, uid, gid) - - def getattr(self, path, fh=None): - full_path = self.join_path(path) - st = os.lstat(full_path) - attr = dict((key, getattr(st, key)) - for key in ( - 'st_atime', - 'st_ctime', - 'st_gid', - 'st_mode', - 'st_mtime', - 'st_nlink', - 'st_size', - 'st_uid' - ) - ) - - return attr - - def readdir(self, path, fh): - full_path = self.join_path(path) - - dirents = ['.', '..'] - if os.path.isdir(full_path): - dirents.extend(os.listdir(full_path)) - for r in dirents: - yield r - - def readlink(self, path): - pathname = os.readlink(self.join_path(path)) - if pathname.startswith("/"): - # Path name is absolute, sanitize it. - return os.path.relpath(pathname, self.mountSource) - else: - return pathname - - def mknod(self, path, mode, dev): - return os.mknod(self.join_path(path), mode, dev) - - def rmdir(self, path): - full_path = self.join_path(path) - return os.rmdir(full_path) - - def mkdir(self, path, mode): - return os.mkdir(self.join_path(path), mode) - - def statfs(self, path): - full_path = self.join_path(path) - stv = os.statvfs(full_path) - stat = dict((key, getattr(stv, key)) - for key in ('f_bavail', - 'f_bfree', - 'f_blocks', - 'f_bsize', - 'f_favail', - 'f_ffree', - 'f_files', - 'f_flag', - 'f_frsize', - 'f_namemax' - ) - ) - return stat - - def unlink(self, path): - return os.unlink(self.join_path(path)) - - def symlink(self, name, target): - return os.symlink(name, self.join_path(target)) - - def rename(self, old, new): - return os.rename(self.join_path(old), self.join_path(new)) - - def link(self, target, name): - return os.link(self.join_path(target), self.join_path(name)) - - def utimens(self, path, times=None): - return os.utime(self.join_path(path), times) - - # File methods - # ============ - - def open(self, path, flags): - full_path = self.join_path(path) - return os.open(full_path, flags) - - def create(self, path, mode, fi=None): - full_path = self.join_path(path) - return os.open(full_path, os.O_WRONLY | os.O_CREAT, mode) - - def read(self, path, length, offset, fh): - os.lseek(fh, offset, os.SEEK_SET) - return os.read(fh, length) - - def write(self, path, buf, offset, fh): - os.lseek(fh, offset, os.SEEK_SET) - return os.write(fh, buf) - - def truncate(self, path, length, fh=None): - full_path = self.join_path(path) - with open(full_path, 'r+') as f: - f.truncate(length) - - def flush(self, path, fh): - return os.fsync(fh) - - def release(self, path, fh): - return os.close(fh) - - def fsync(self, path, fdatasync, fh): - return self.flush(path, fh) diff --git a/iotronic_lightningrod/modules/webservice_manager.py b/iotronic_lightningrod/modules/webservice_manager.py index 0f4989d..fd16efb 100644 --- a/iotronic_lightningrod/modules/webservice_manager.py +++ b/iotronic_lightningrod/modules/webservice_manager.py @@ -15,7 +15,12 @@ __author__ = "Nicola Peditto " +from iotronic_lightningrod.config import package_path +from iotronic_lightningrod.lightningrod import RPC_proxies +from iotronic_lightningrod.lightningrod import SESSION from iotronic_lightningrod.modules import Module +from iotronic_lightningrod.modules import utils +import iotronic_lightningrod.wampmessage as WM from oslo_config import cfg from oslo_log import log as logging @@ -26,13 +31,9 @@ CONF = cfg.CONF import importlib as imp import inspect import json +import OpenSSL.crypto import os - -from iotronic_lightningrod.config import package_path -from iotronic_lightningrod.lightningrod import RPC_proxies -from iotronic_lightningrod.lightningrod import SESSION -from iotronic_lightningrod.modules import utils -import iotronic_lightningrod.wampmessage as WM +import time class WebServiceManager(Module.Module): @@ -42,51 +43,102 @@ class WebServiceManager(Module.Module): LOG.info(" - Proxy used: " + CONF.proxy.upper()) - proxy_type = CONF.proxy - path = package_path + "/proxies/" + proxy_type + ".py" + try: + proxy_type = CONF.proxy + path = package_path + "/modules/proxies/" + proxy_type + ".py" - if os.path.exists(path): + if os.path.exists(path): - proxy_module = imp.import_module("iotronic_lightningrod.proxies." - + proxy_type) - LOG.info(" --> " + proxy_type.upper() + " module imported!") + proxy_module = imp.import_module( + "iotronic_lightningrod.modules.proxies." + proxy_type + ) - proxy = proxy_module.ProxyManager() + LOG.info(" --> " + proxy_type.upper() + " module imported!") - proxy_meth_list = inspect.getmembers( - proxy, - predicate=inspect.ismethod - ) + proxy = proxy_module.ProxyManager() - RPC_proxies[proxy_type] = proxy_meth_list + proxy_meth_list = inspect.getmembers( + proxy, + predicate=inspect.ismethod + ) - board.proxy = proxy + RPC_proxies[proxy_type] = proxy_meth_list - self._proxyWampRegister(proxy_meth_list, board) + board.proxy = proxy - else: - LOG.warning("Proxy '" + proxy_type + "' not supported!") + self._proxyWampRegister(proxy_meth_list, board) + + else: + LOG.warning("Proxy '" + proxy_type + "' not supported!") + + except Exception as err: + LOG.error("Error init WebServiceManager: " + str(err)) def finalize(self): - proxy_status = json.loads(self.board.proxy._proxyInfo()) - LOG.info("--> Proxy " + self.board.proxy.type.upper() - + " status:\n Active: " + str(proxy_status['status']) - + "\n Info: " + str(proxy_status['log'])) + try: - LOG.info("Webservice exposed on device:") - active_webservice_list = self.board.proxy._webserviceList() - if len(active_webservice_list) != 0: - for ws in active_webservice_list: - LOG.info("-> " + ws) - else: - LOG.info("-> NO WebService!") + proxy_status = json.loads(self.board.proxy._proxyInfo()) + LOG.info("--> Proxy " + self.board.proxy.type.upper() + + " status:\n Active: " + str(proxy_status['status']) + + "\n Info: " + str(proxy_status['log'])) - LOG.info("WebService Manager initialized!") + LOG.info("Webservice exposed on device:") + active_webservice_list = self.board.proxy._webserviceList() + if len(active_webservice_list) != 0: + for ws in active_webservice_list: + ws = ws.replace('.conf', '') + LOG.info("-> " + ws) + else: + LOG.info("-> NO WebService!") + + LOG.info("Certificates on device:") + active_certs_list = self._certsList() + if len(active_certs_list) != 0: + for certificate in active_certs_list: + LOG.info("-> " + certificate) + + c = open('/etc/letsencrypt/live/' + + certificate + '/cert.pem').read() + cert = OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_PEM, c) + + LOG.info("--> Subject: " + str(cert.get_subject())) + LOG.info("--> ISSUER Organization:" + + str(cert.get_issuer())) + LOG.info("--> Expire date: " + str( + cert.get_notAfter().decode("utf-8"))) + LOG.info("--> Expired: " + str(cert.has_expired())) + + else: + LOG.info("-> NO certificates!") + + # Safe apply changes on proxy + self.board.proxy._proxyReload() + time.sleep(3) + + LOG.info("WebService Manager initialized!") + + except Exception as err: + LOG.error("Error finalize init WebServiceManager: " + str(err)) def restore(self): LOG.info("WebService Manager restored.") + def _certsList(self): + + letsencrypt_path = "/etc/letsencrypt/live/" + + if os.path.exists(letsencrypt_path): + certs_list = [ + f for f in os.listdir(letsencrypt_path) + if os.path.isdir(os.path.join(letsencrypt_path, f)) + ] + else: + certs_list = [] + + return certs_list + def _proxyWampRegister(self, proxy_meth_list, board): LOG.info(" - " + str(board.proxy.type).upper() @@ -96,38 +148,41 @@ class WebServiceManager(Module.Module): if (meth[0] != "__init__") & (meth[0] != "finalize") \ & (meth[0] != "restore"): # LOG.info(" - " + str(meth[0])) - rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0] + rpc_addr = u'iotronic.' + str(board.session_id) + '.' + \ + board.uuid + '.' + meth[0] + # LOG.debug(" --> " + str(rpc_addr)) if not meth[0].startswith('_'): SESSION.register(meth[1], rpc_addr) LOG.info(" --> " + str(meth[0])) - async def ExposeWebservice(self, service_name, local_port): + async def ExposeWebservice(self, board_dns, service_dns, + local_port, dns_list): rpc_name = utils.getFuncName() LOG.info("RPC " + rpc_name + " CALLED") - response = self.board.proxy._exposeWebservice(service_name, local_port) + response = self.board.proxy._exposeWebservice(board_dns, service_dns, + local_port, dns_list) response = json.loads(response) if(response['result'] == "SUCCESS"): - message = "Webservice '" + service_name + "' successfully exposed!" + message = "Webservice '" + service_dns + "' successfully exposed!" LOG.info("--> " + str(message)) w_msg = WM.WampSuccess(response) else: - message = "Error exposing webservice '" + service_name + "'" LOG.warning("--> " + str(response['message'])) w_msg = WM.WampWarning(response) return w_msg.serialize() - async def UnexposeWebservice(self, service_name): + async def UnexposeWebservice(self, service, dns_list): rpc_name = utils.getFuncName() LOG.info("RPC " + rpc_name + " CALLED") - response = self.board.proxy._disableWebservice(service_name) + response = self.board.proxy._disableWebservice(service, dns_list) response = json.loads(response) @@ -140,12 +195,15 @@ class WebServiceManager(Module.Module): return w_msg.serialize() - async def BoardDnsCertsSetup(self, board_dns, owner_email): + async def EnableWebService(self, board_dns, owner_email): rpc_name = utils.getFuncName() LOG.info("RPC " + rpc_name + " CALLED") - message = self.board.proxy._proxyBoardDnsSetup(board_dns, owner_email) + message = self.board.proxy._proxyEnableWebService( + board_dns, + owner_email + ) w_msg = WM.WampSuccess(message) return w_msg.serialize() diff --git a/requirements.txt b/requirements.txt index 27e8399..e947780 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,9 +3,9 @@ # process, which may cause wedges in the gate later. pbr>=2.0.0,!=2.1.0 # Apache-2.0 -autobahn>=0.10.1 # MIT License +autobahn>=18.10.1 # MIT License six>=1.10.0 # MIT httplib2>=0.9.1 # MIT -psutil # BSD +psutil>=5.4.7 # BSD oslo.config>=5.1.0 # Apache-2.0 oslo.log>=3.36.0 # Apache-2.0 diff --git a/scripts/lr_configure b/scripts/lr_configure index 6e5334a..db9c332 100755 --- a/scripts/lr_configure +++ b/scripts/lr_configure @@ -19,7 +19,7 @@ import os import sys if len(sys.argv) < 3: - print('Arguments required: " ', + print('Arguments required: ', str(sys.argv)) else: os.system('sed -i "s|\\"code\\":.*|\\"code\\": \\"' diff --git a/setup.cfg b/setup.cfg index f13caac..054c02a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -65,9 +65,8 @@ s4t.modules = plugin = iotronic_lightningrod.modules.plugin_manager:PluginManager device = iotronic_lightningrod.modules.device_manager:DeviceManager service = iotronic_lightningrod.modules.service_manager:ServiceManager - # network = iotronic_lightningrod.modules.network_manager:NetworkManager + network = iotronic_lightningrod.modules.network_manager:NetworkManager webservice = iotronic_lightningrod.modules.webservice_manager:WebServiceManager - # vfs = iotronic_lightningrod.modules.vfs_manager:VfsManager [options] build_scripts =