Updated Module class: restore method was added and all modules was upgraded. Service Manager upgraded: services restore management added.

Change-Id: I1ca9bb8dfe28f5678e213a2af42390ec7136194d
This commit is contained in:
Nicola Peditto 2018-03-07 11:14:30 +01:00
parent b817d23bd5
commit e884ed07cb
13 changed files with 673 additions and 321 deletions

View File

@ -15,9 +15,8 @@ board-side probe.
Installation guides
-------------------
* `Arduino YUN <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/arduino_yun.rst>`_.
* `Raspberry Pi 3 <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/raspberry_pi_3.rst>`_.
* `Ubuntu 16.04 <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/ubuntu1604.rst>`_.
* `Raspberry Pi 3 <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/raspberry_pi_3.rst>`_.
* `Arduino YUN <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/arduino_yun.rst>`_.

View File

@ -1,8 +1,7 @@
IoTronic Lightning-rod installation guide for Arduino YUN
=========================================================
We tested this procedure on a Arduino YUN board with OpenWRT LininoIO
image.
We tested this procedure on a Arduino YUN board with OpenWRT LininoIO image.
Install from source code
------------------------
@ -25,28 +24,13 @@ Install dependencies
::
opkg install git bzip2 python-netifaces
pip install --no-cache-dir zope.interface pyserial Babel oslo.config oslo.log
pip install --no-cache-dir zope.interface pyserial Babel oslo.config
oslo.log
easy_install httplib2
Install Autobahn:
'''''''''''''''''
::
# Install Twisted:
wget --no-check-certificate https://pypi.python.org/packages/source/T/Twisted/Twisted-14.0.2.tar.bz2
bzip2 -d Twisted-14.0.2.tar.bz2
tar -xvf Twisted-14.0.2.tar
cd Twisted-14.0.2/
vi setup.py
comment line 63:
#conditionalExtensions=getExtensions(),
python setup.py install
cd /opt/
rm -rf /opt/Twisted-14.0.2*
::
easy_install autobahn
@ -82,6 +66,7 @@ Deployment
cp etc/iotronic/iotronic.conf /etc/iotronic/
cp settings.example.json /var/lib/iotronic/settings.json
cp plugins.example.json /var/lib/iotronic/plugins.json
cp services.example.json /var/lib/iotronic/services.json
cp etc/init.d/lightning-rod /etc/init.d/lightning-rod
chmod +x /etc/init.d/lightning-rod
touch /var/log/iotronic/lightning-rod.log

View File

@ -11,7 +11,7 @@ Install requirements
::
pip install oslo-config oslo_log twisted autobahn httplib2
pip install oslo.config oslo.log asyncio autobahn httplib2 psutil six
Set up environment:
~~~~~~~~~~~~~~~~~~~
@ -44,6 +44,7 @@ Deployment
cp etc/iotronic/iotronic.conf /etc/iotronic/
cp settings.example.json /var/lib/iotronic/settings.json
cp plugins.example.json /var/lib/iotronic/plugins.json
cp services.example.json /var/lib/iotronic/services.json
cp etc/systemd/system/s4t-lightning-rod.service /etc/systemd/system/lightning-rod.service
chmod +x /etc/systemd/system/lightning-rod.service
systemctl daemon-reload

View File

@ -12,7 +12,7 @@ Install requirements
::
pip install oslo-config oslo_log twisted autobahn httplib2
pip install oslo.config oslo.log asyncio autobahn httplib2 psutil six
Set up environment:
~~~~~~~~~~~~~~~~~~~
@ -45,6 +45,7 @@ Deployment
cp etc/iotronic/iotronic.conf /etc/iotronic/
cp settings.example.json /var/lib/iotronic/settings.json
cp plugins.example.json /var/lib/iotronic/plugins.json
cp services.example.json /var/lib/iotronic/services.json
cp etc/systemd/system/s4t-lightning-rod.service /etc/systemd/system/lightning-rod.service
chmod +x /etc/systemd/system/lightning-rod.service
systemctl daemon-reload

View File

@ -1,4 +1,4 @@
[DEFAULT]
debug = True
log_file = /var/log/iotronic/lightning-rod.log
lightningrod_home = /var/lib/iotronic
lightningrod_home = /var/lib/iotronic

View File

@ -66,6 +66,9 @@ txaio.start_logging(level="info")
RUNNER = None
connected = False
global MODULES
MODULES = {}
def moduleReloadInfo(session):
"""This function is used in the reconnection stage to register
@ -76,18 +79,22 @@ def moduleReloadInfo(session):
"""
LOG.info("Modules reloading after WAMP recovery...")
LOG.info("\n\nModules reloading after WAMP recovery...\n\n")
try:
# Register RPCs for each Lightning-rod module
for mod in RPC:
LOG.info("- Reloading module RPcs for " + str(mod))
moduleWampRegister(session, RPC[mod])
# Call module restore procedures and
# register RPCs for each Lightning-rod module
for mod_name in MODULES:
LOG.info("- Registering RPCs for module " + str(mod_name))
moduleWampRegister(session, RPC[mod_name])
LOG.info("- Restoring module " + str(mod_name))
MODULES[mod_name].restore()
# Register RPCs for the device
for dev in RPC_devices:
LOG.info("- Reloading device RPCs for " + str(dev))
LOG.info("- Registering RPCs for device " + str(dev))
moduleWampRegister(session, RPC_devices[dev])
except Exception as err:
@ -110,13 +117,15 @@ def moduleWampRegister(session, meth_list):
else:
for meth in meth_list:
# We don't considere the __init__ and finalize methods
if (meth[0] != "__init__") & (meth[0] != "finalize"):
# We don't considere the "__init__", "finalize" and
# "restore" methods
if (meth[0] != "__init__") & (meth[0] != "finalize") \
& (meth[0] != "restore"):
rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0]
session.register(meth[1], rpc_addr)
LOG.info(" --> " + str(meth[0]))
if not meth[0].startswith('_'):
session.register(meth[1], rpc_addr)
LOG.info(" --> " + str(meth[0]))
def modulesLoader(session):
@ -158,13 +167,16 @@ def modulesLoader(session):
else:
mod = ext.plugin(board, session)
global MODULES
MODULES[mod.name] = mod
# Methods list for each module
meth_list = inspect.getmembers(mod, predicate=inspect.ismethod)
global RPC
RPC[mod.name] = meth_list
if len(meth_list) == 2:
if len(meth_list) == 3:
# there are at least two methods for each module:
# "__init__" and "finalize"
@ -222,8 +234,7 @@ async def IotronicLogin(board, session, details):
modulesLoader(session)
except Exception as e:
LOG.warning("WARNING - Could not register procedures: "
+ str(e))
LOG.warning("WARNING - Could not load modules: " + str(e))
# Reset flag to False
# reconnection = False
@ -257,10 +268,10 @@ def wampConnect(wamp_conf):
try:
LOG.info("WAMP status:" +
LOG.info("WAMP status @ boot:" +
"\n- board = " + str(board.status) +
"\n- reconnection = " + str(reconnection) +
"\n- connection = " + str(connected)
"\n- connected = " + str(connected)
)
# LR creates the Autobahn Asyncio Component that points to the
@ -406,10 +417,10 @@ def wampConnect(wamp_conf):
LOG.info("\n\n\nBoard is becoming operative...\n\n\n")
board.updateStatus("operative")
board.loadSettings()
LOG.info("WAMP status:" +
LOG.info("WAMP status @ firt connection:" +
"\n- board = " + str(board.status) +
"\n- reconnection = " + str(reconnection) +
"\n- connection = " + str(connected)
"\n- connected = " + str(connected)
)
await IotronicLogin(board, session, details)
@ -471,7 +482,7 @@ def wampConnect(wamp_conf):
except Exception as e:
LOG.warning(
"WARNING - Could not register procedures: "
"WARNING - Could not reload modules: "
+ str(e))
Bye()
@ -481,7 +492,7 @@ def wampConnect(wamp_conf):
Bye()
except exception.ApplicationError as e:
LOG.error("IoTronic connection error: " + str(e))
LOG.error("IoTronic connection error:\n" + str(e))
# Iotronic is offline the board can not call
# the "stack4things.connection" RPC.
# The board will disconnect from WAMP agent and retry later
@ -517,10 +528,10 @@ def wampConnect(wamp_conf):
global reconnection
LOG.info("WAMP status:" +
LOG.info("WAMP status on disconnect:" +
"\n- board = " + str(board.status) +
"\n- reconnection = " + str(reconnection) +
"\n- connection = " + str(connected)
"\n- connected = " + str(connected)
)
if board.status == "operative" and reconnection is False:
@ -579,7 +590,7 @@ def wampConnect(wamp_conf):
LOG.error("Reconnection wrong status!")
except Exception as err:
LOG.error(" - URI validation error: " + str(err))
LOG.error(" - WAMP connection error: " + str(err))
Bye()

View File

@ -28,8 +28,6 @@ class Module(object):
"""
# __metaclass__ = abc.ABCMeta
def __init__(self, name, board):
self.name = name
@ -40,3 +38,7 @@ class Module(object):
@abc.abstractmethod
def finalize(self):
pass
@abc.abstractmethod
def restore(self):
pass

View File

@ -29,22 +29,6 @@ from oslo_log import log as logging
LOG = logging.getLogger(__name__)
def deviceWampRegister(dev_meth_list, board):
LOG.info(" - " + str(board.type).capitalize()
+ " device registering RPCs:")
for meth in dev_meth_list:
if (meth[0] != "__init__") & (meth[0] != "finalize"):
# LOG.info(" - " + str(meth[0]))
rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0]
# LOG.debug(" --> " + str(rpc_addr))
SESSION.register(meth[1], rpc_addr)
LOG.info(" --> " + str(meth[0]) + " registered!")
class DeviceManager(Module.Module):
def __init__(self, board, session):
@ -71,7 +55,7 @@ class DeviceManager(Module.Module):
RPC_devices[device_type] = dev_meth_list
deviceWampRegister(dev_meth_list, board)
self._deviceWampRegister(dev_meth_list, board)
board.device = device
@ -80,3 +64,21 @@ class DeviceManager(Module.Module):
def finalize(self):
pass
def restore(self):
pass
def _deviceWampRegister(self, dev_meth_list, board):
LOG.info(" - " + str(board.type).capitalize()
+ " device registering RPCs:")
for meth in dev_meth_list:
if (meth[0] != "__init__") & (meth[0] != "finalize"):
# LOG.info(" - " + str(meth[0]))
rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0]
# LOG.debug(" --> " + str(rpc_addr))
SESSION.register(meth[1], rpc_addr)
LOG.info(" --> " + str(meth[0]) + " registered!")

View File

@ -18,7 +18,6 @@ __author__ = "Nicola Peditto <npeditto@unime.it"
from datetime import datetime
import imp
import inspect
import json
import os
import queue
@ -27,9 +26,11 @@ import time
from iotronic_lightningrod.modules import Module
from iotronic_lightningrod.modules import utils
from iotronic_lightningrod.plugins import PluginSerializer
import iotronic_lightningrod.wampmessage as WM
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
@ -39,162 +40,6 @@ PLUGINS_THRS = {}
PLUGINS_CONF_FILE = CONF.lightningrod_home + "/plugins.json"
def getFuncName():
return inspect.stack()[1][3]
def createPluginsConf():
"""Create plugins.json file if it does not exist.
"""
if not os.path.exists(PLUGINS_CONF_FILE):
LOG.debug("plugins.json does not exist: creating...")
plugins_conf = {'plugins': {}}
with open(PLUGINS_CONF_FILE, 'w') as f:
json.dump(plugins_conf, f, indent=4)
def loadPluginsConf():
"""Load plugins.json JSON configuration.
:return: JSON Plugins configuration
"""
try:
with open(PLUGINS_CONF_FILE) as settings:
plugins_conf = json.load(settings)
except Exception as err:
LOG.error("Parsing error in " + PLUGINS_CONF_FILE + ": " + str(err))
plugins_conf = None
return plugins_conf
def getEnabledPlugins():
"""This function gets the list of all asynchronous plugins.
We considered only those plugins with 'callable' flag set to False
and 'onboot' flag set to True.
:return: enabledPlugins List
"""
enabledPlugins = []
plugins_conf = loadPluginsConf()
for plugin in plugins_conf['plugins']:
if plugins_conf['plugins'][plugin]['callable'] is False:
if plugins_conf['plugins'][plugin]['onboot'] is True:
if plugins_conf['plugins'][plugin]['status'] == "operative":
enabledPlugins.append(plugin)
if len(enabledPlugins) != 0:
LOG.info(" - Enabled plugins list: " + str(enabledPlugins))
return enabledPlugins
def makeNothing():
"""Sandbox function.
"""
pass
def RebootOnBootPlugins():
"""Reboot at boot each enabled asynchronous plugin
:return:
"""
rpc_name = getFuncName()
LOG.info("Rebooting enabled plugins:")
enabledPlugins = getEnabledPlugins()
if enabledPlugins.__len__() == 0:
message = "No plugin to reboot!"
LOG.info(" - " + message)
else:
for plugin_uuid in enabledPlugins:
plugins_conf = loadPluginsConf()
plugin_name = plugins_conf['plugins'][plugin_uuid]['name']
# plugin_status = plugins_conf['plugins'][plugin_uuid]['status']
try:
if (plugin_uuid in PLUGINS_THRS) and (
PLUGINS_THRS[plugin_uuid].isAlive()
):
LOG.warning(" - Plugin "
+ plugin_uuid + " already started!")
else:
LOG.info(" - Rebooting plugin " + plugin_uuid)
plugin_home = CONF.lightningrod_home + "/plugins/" \
+ plugin_uuid
plugin_filename = plugin_home + "/" + plugin_uuid + ".py"
plugin_params_file = \
plugin_home + "/" + plugin_uuid + ".json"
if os.path.exists(plugin_filename):
task = imp.load_source("plugin", plugin_filename)
if os.path.exists(plugin_params_file):
with open(plugin_params_file) as conf:
plugin_params = json.load(conf)
worker = task.Worker(
plugin_uuid,
plugin_name,
q_result=None,
params=plugin_params
)
PLUGINS_THRS[plugin_uuid] = worker
LOG.info(" - Starting plugin " + str(worker))
worker.start()
else:
message = "ERROR " \
+ plugin_params_file + " does not exist!"
LOG.error(" - "
+ worker.complete(rpc_name, message))
else:
message = "ERROR " \
+ plugin_filename + " does not exist!"
LOG.error(" - " + worker.complete(rpc_name, message))
message = "rebooted!"
LOG.info(" - " + worker.complete(rpc_name, message))
except Exception as err:
message = "Error rebooting plugin " \
+ plugin_uuid + ": " + str(err)
LOG.error(" - " + message)
class PluginManager(Module.Module):
"""Plugin module to manage board plugins.
@ -213,7 +58,7 @@ class PluginManager(Module.Module):
super(PluginManager, self).__init__("PluginManager", board)
# Creation of plugins.json configuration file
createPluginsConf()
self._createPluginsConf()
def finalize(self):
"""Function called at the end of module loading.
@ -224,7 +69,154 @@ class PluginManager(Module.Module):
"""
# Reboot boot enabled plugins
RebootOnBootPlugins()
self._rebootOnBootPlugins()
def restore(self):
pass
def _loadPluginsConf(self):
"""Load plugins.json JSON configuration.
:return: JSON Plugins configuration
"""
try:
with open(PLUGINS_CONF_FILE) as settings:
plugins_conf = json.load(settings)
except Exception as err:
LOG.error(
"Parsing error in " + PLUGINS_CONF_FILE + ": " + str(err))
plugins_conf = None
return plugins_conf
def _getEnabledPlugins(self):
"""This function gets the list of all asynchronous plugins.
We considered only those plugins with 'callable' flag set to False
and 'onboot' flag set to True.
:return: enabledPlugins List
"""
enabledPlugins = []
plugins_conf = self._loadPluginsConf()
for plugin in plugins_conf['plugins']:
if plugins_conf['plugins'][plugin]['callable'] is False:
if plugins_conf['plugins'][plugin]['onboot'] is True:
if plugins_conf['plugins'][plugin]['status'] == \
"operative":
enabledPlugins.append(plugin)
if len(enabledPlugins) != 0:
LOG.info(" - Enabled plugins list: " + str(enabledPlugins))
return enabledPlugins
def _rebootOnBootPlugins(self):
"""Reboot at boot each enabled asynchronous plugin
:return:
"""
f_name = utils.getFuncName()
LOG.info("Rebooting enabled plugins:")
enabledPlugins = self._getEnabledPlugins()
if enabledPlugins.__len__() == 0:
message = "No plugin to reboot!"
LOG.info(" - " + message)
else:
for plugin_uuid in enabledPlugins:
plugins_conf = self._loadPluginsConf()
plugin_name = plugins_conf['plugins'][plugin_uuid]['name']
try:
if (plugin_uuid in PLUGINS_THRS) and (
PLUGINS_THRS[plugin_uuid].isAlive()
):
LOG.warning(" - Plugin "
+ plugin_uuid + " already started!")
else:
LOG.info(" - Rebooting plugin " + plugin_uuid)
plugin_home = \
CONF.lightningrod_home + "/plugins/" + plugin_uuid
plugin_filename = \
plugin_home + "/" + plugin_uuid + ".py"
plugin_params_file = \
plugin_home + "/" + plugin_uuid + ".json"
if os.path.exists(plugin_filename):
task = imp.load_source("plugin", plugin_filename)
if os.path.exists(plugin_params_file):
with open(plugin_params_file) as conf:
plugin_params = json.load(conf)
worker = task.Worker(
plugin_uuid,
plugin_name,
q_result=None,
params=plugin_params
)
PLUGINS_THRS[plugin_uuid] = worker
LOG.info(" - Starting plugin " + str(worker))
worker.start()
else:
message = "ERROR " + plugin_params_file \
+ " does not exist!"
LOG.error(" - "
+ worker.complete(f_name, message))
else:
message = "ERROR " \
+ plugin_filename + " does not exist!"
LOG.error(
" - " + worker.complete(f_name, message))
message = "rebooted!"
LOG.info(" - " + worker.complete(f_name, message))
except Exception as err:
message = "Error rebooting plugin " \
+ plugin_uuid + ": " + str(err)
LOG.error(" - " + message)
def _createPluginsConf(self):
"""Create plugins.json file if it does not exist.
"""
if not os.path.exists(PLUGINS_CONF_FILE):
LOG.debug("plugins.json does not exist: creating...")
plugins_conf = {'plugins': {}}
with open(PLUGINS_CONF_FILE, 'w') as f:
json.dump(plugins_conf, f, indent=4)
async def PluginInject(self, plugin, onboot):
"""Plugin injection procedure into the board:
@ -239,7 +231,7 @@ class PluginManager(Module.Module):
"""
rpc_name = getFuncName()
rpc_name = utils.getFuncName()
try:
@ -269,7 +261,7 @@ class PluginManager(Module.Module):
pluginfile.write(loaded)
# Load plugins.json configuration file
plugins_conf = loadPluginsConf()
plugins_conf = self._loadPluginsConf()
# LOG.debug("Plugin setup:\n"
# + json.dumps(plugin, indent=4, sort_keys=True))
@ -287,7 +279,7 @@ class PluginManager(Module.Module):
plugins_conf['plugins'][plugin_uuid]['updated_at'] = ""
plugins_conf['plugins'][plugin_uuid]['status'] = "injected"
LOG.info("Plugin " + plugin_name + " created!")
LOG.info(" - Plugin '" + plugin_name + "' created!")
message = rpc_name + " result: INJECTED"
else:
@ -299,11 +291,11 @@ class PluginManager(Module.Module):
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
plugins_conf['plugins'][plugin_uuid]['status'] = "updated"
LOG.info("Plugin " + plugin_name
+ " (" + str(plugin_uuid) + ") updated!")
LOG.info("Plugin '" + plugin_name
+ "' (" + str(plugin_uuid) + ") updated!")
message = rpc_name + " result: UPDATED"
LOG.info("Plugin setup:\n" + json.dumps(
LOG.info(" - Plugin setup:\n" + json.dumps(
plugins_conf['plugins'][plugin_uuid],
indent=4,
sort_keys=True
@ -314,12 +306,12 @@ class PluginManager(Module.Module):
json.dump(plugins_conf, f, indent=4)
LOG.info(" - " + message)
w_msg = await WM.WampSuccess(message)
w_msg = WM.WampSuccess(message)
except Exception as err:
message = "Plugin injection error: " + str(err)
LOG.error(" - " + message)
w_msg = await WM.WampError(str(err))
w_msg = WM.WampError(str(err))
return w_msg.serialize()
@ -338,11 +330,11 @@ class PluginManager(Module.Module):
try:
rpc_name = getFuncName()
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " called for '"
+ plugin_uuid + "' plugin:")
plugins_conf = loadPluginsConf()
plugins_conf = self._loadPluginsConf()
if plugin_uuid in plugins_conf['plugins']:
@ -356,7 +348,7 @@ class PluginManager(Module.Module):
message = "ALREADY STARTED!"
LOG.warning(" - Plugin "
+ plugin_uuid + " already started!")
w_msg = await WM.WampError(message)
w_msg = WM.WampError(message)
else:
@ -408,27 +400,27 @@ class PluginManager(Module.Module):
response = "STARTED"
LOG.info(" - " + worker.complete(rpc_name, response))
w_msg = await WM.WampSuccess(response)
w_msg = WM.WampSuccess(response)
else:
message = \
rpc_name + " - ERROR " \
+ plugin_filename + " does not exist!"
LOG.error(" - " + message)
w_msg = await WM.WampError(message)
w_msg = WM.WampError(message)
else:
message = "Plugin " + plugin_uuid \
+ " does not exist in this board!"
LOG.warning(" - " + message)
w_msg = await WM.WampError(message)
w_msg = WM.WampError(message)
except Exception as err:
message = \
rpc_name + " - ERROR - plugin (" + plugin_uuid + ") - " \
+ str(err)
LOG.error(" - " + message)
w_msg = await WM.WampError(str(err))
w_msg = WM.WampError(str(err))
return w_msg.serialize()
@ -442,7 +434,7 @@ class PluginManager(Module.Module):
:return: return a response to RPC request
"""
rpc_name = getFuncName()
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED for '"
+ plugin_uuid + "' plugin:")
@ -464,13 +456,13 @@ class PluginManager(Module.Module):
if 'delay' in parameters:
time.sleep(delay)
await worker.stop()
worker.stop()
del PLUGINS_THRS[plugin_uuid]
message = "STOPPED"
LOG.info(" - " + worker.complete(rpc_name, message))
w_msg = await WM.WampSuccess(message)
w_msg = WM.WampSuccess(message)
else:
message = \
@ -478,21 +470,21 @@ class PluginManager(Module.Module):
+ " - ERROR - plugin (" + plugin_uuid \
+ ") is instantiated but is not running anymore!"
LOG.error(" - " + message)
w_msg = await WM.WampError(message)
w_msg = WM.WampError(message)
else:
message = \
rpc_name + " - WARNING " \
+ plugin_uuid + " is not running!"
LOG.warning(" - " + message)
w_msg = await WM.WampWarning(message)
w_msg = WM.WampWarning(message)
except Exception as err:
message = \
rpc_name \
+ " - ERROR - plugin (" + plugin_uuid + ") - " + str(err)
LOG.error(" - " + message)
w_msg = await WM.WampError(str(err))
w_msg = WM.WampError(str(err))
return w_msg.serialize()
@ -507,7 +499,7 @@ class PluginManager(Module.Module):
"""
rpc_name = getFuncName()
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED for " + plugin_uuid + " plugin:")
try:
@ -527,7 +519,7 @@ class PluginManager(Module.Module):
plugin_filename = plugin_home + "/" + plugin_uuid + ".py"
plugin_params_file = plugin_home + "/" + plugin_uuid + ".json"
plugins_conf = loadPluginsConf()
plugins_conf = self._loadPluginsConf()
plugin_name = plugins_conf['plugins'][plugin_uuid]['name']
# Import plugin (as python module)
@ -619,7 +611,7 @@ class PluginManager(Module.Module):
"""
rpc_name = getFuncName()
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " for plugin " + plugin_uuid)
@ -630,7 +622,7 @@ class PluginManager(Module.Module):
message = "Plugin paths or files do not exist!"
LOG.error(message)
w_msg = await WM.WampError(message)
w_msg = WM.WampError(message)
return w_msg.serialize()
@ -652,14 +644,14 @@ class PluginManager(Module.Module):
message = "Removing plugin's files error in " \
+ plugin_path + ": " + str(err)
LOG.error(" - " + message)
w_msg = await WM.WampError(str(err))
w_msg = WM.WampError(str(err))
return w_msg.serialize()
# Remove from plugins.json file its configuration
try:
plugins_conf = loadPluginsConf()
plugins_conf = self._loadPluginsConf()
if plugin_uuid in plugins_conf['plugins']:
@ -674,8 +666,8 @@ class PluginManager(Module.Module):
if plugin_uuid in PLUGINS_THRS:
worker = PLUGINS_THRS[plugin_uuid]
if worker.isAlive():
LOG.info(" - Plugin "
+ plugin_name + " is running...")
LOG.info(" - Plugin '"
+ plugin_name + "' is running...")
worker.stop()
LOG.info(" ...stopped!")
@ -690,21 +682,21 @@ class PluginManager(Module.Module):
+ plugin_uuid + " already removed!"
LOG.warning(" - " + message)
w_msg = await WM.WampSuccess(message)
w_msg = WM.WampSuccess(message)
return w_msg.serialize()
except Exception as err:
message = "Updating plugins.json error: " + str(err)
LOG.error(" - " + message)
w_msg = await WM.WampError(str(err))
w_msg = WM.WampError(str(err))
return w_msg.serialize()
except Exception as err:
message = "Plugin removing error: {0}".format(err)
LOG.error(" - " + message)
w_msg = await WM.WampError(str(err))
w_msg = WM.WampError(str(err))
return w_msg.serialize()
@ -715,7 +707,7 @@ class PluginManager(Module.Module):
"""
rpc_name = getFuncName()
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED for '"
+ plugin_uuid + "' plugin:")
@ -729,7 +721,7 @@ class PluginManager(Module.Module):
plugin_filename = plugin_home + "/" + plugin_uuid + ".py"
plugin_params_file = plugin_home + "/" + plugin_uuid + ".json"
plugins_conf = loadPluginsConf()
plugins_conf = self._loadPluginsConf()
plugin_name = plugins_conf['plugins'][plugin_uuid]['name']
callable = plugins_conf['plugins'][plugin_uuid]['callable']
@ -789,18 +781,18 @@ class PluginManager(Module.Module):
message = "REBOOTED"
LOG.info(" - " + worker.complete(rpc_name, message))
w_msg = await WM.WampSuccess(message)
w_msg = WM.WampSuccess(message)
else:
message = "ERROR '" + plugin_filename + "' does not exist!"
LOG.error(" - " + message)
w_msg = await WM.WampError(message)
w_msg = WM.WampError(message)
except Exception as err:
message = "Error rebooting plugin '" \
+ plugin_uuid + "': " + str(err)
LOG.error(" - " + message)
w_msg = await WM.WampError(str(err))
w_msg = WM.WampError(str(err))
return w_msg.serialize()
@ -814,7 +806,7 @@ class PluginManager(Module.Module):
"""
rpc_name = getFuncName()
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED for '"
+ plugin_uuid + "' plugin:")
@ -830,20 +822,20 @@ class PluginManager(Module.Module):
result = "DEAD"
LOG.info(" - " + worker.complete(rpc_name, result))
w_msg = await WM.WampSuccess(result)
w_msg = WM.WampSuccess(result)
else:
result = "DEAD"
LOG.info(" - " + rpc_name + " result for "
+ plugin_uuid + ": " + result)
w_msg = await WM.WampSuccess(result)
w_msg = WM.WampSuccess(result)
except Exception as err:
message = \
rpc_name \
+ " - ERROR - plugin (" + plugin_uuid + ") - " + str(err)
LOG.error(" - " + message)
w_msg = await WM.WampError(str(err))
w_msg = WM.WampError(str(err))
return w_msg.serialize()

View File

@ -15,8 +15,11 @@
__author__ = "Nicola Peditto <npeditto@unime.it"
import inspect
from datetime import datetime
import errno
import json
import os
import psutil
import signal
import subprocess
from urllib.parse import urlparse
@ -26,109 +29,454 @@ from iotronic_lightningrod.modules import utils
import iotronic_lightningrod.wampmessage as WM
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
SERVICES_CONF_FILE = CONF.lightningrod_home + "/services.json"
class ServiceManager(Module.Module):
def __init__(self, board, session):
super(ServiceManager, self).__init__("ServiceManager", board)
self.url_ip = urlparse(board.wamp_config["url"])[1].split(':')[0]
self.wagent_url = "ws://" + self.url_ip + ":8080"
def finalize(self):
pass
LOG.info("Cloud service tunnels to initialization:")
async def ServiceEnable(self, name, public_port, local_port):
# Load services.json configuration file
services_conf = self._loadServicesConf()
LOG.info("RPC " + utils.getFuncName()
+ " CALLED for " + name + " service:")
if len(services_conf['services']) != 0:
wstun_process_list = []
for p in psutil.process_iter():
if len(p.cmdline()) != 0:
if (p.name() == "node" and "wstun" in p.cmdline()[1]):
wstun_process_list.append(p)
for service_uuid in services_conf['services']:
service_name = services_conf['services'][service_uuid]['name']
service_pid = services_conf['services'][service_uuid]['pid']
LOG.info(" - " + service_name)
if len(wstun_process_list) != 0:
for wp in wstun_process_list:
if service_pid == wp.pid:
LOG.info(" --> the tunnel for '" + service_name +
"' already exists; killing...")
# 1. Kill wstun process (if exists)
try:
os.kill(service_pid, signal.SIGKILL)
LOG.info(" --> service '" + service_name
+ "' with PID " + str(service_pid)
+ " was killed; creating new one...")
except OSError:
LOG.warning(" - WSTUN process already killed, "
"creating new one...")
break
# 2. Create the reverse tunnel
public_port = \
services_conf['services'][service_uuid]['public_port']
local_port = \
services_conf['services'][service_uuid]['local_port']
wstun = self._startWstun(public_port, local_port)
if wstun != None:
service_pid = wstun.pid
# 3. Update services.json file
services_conf['services'][service_uuid]['pid'] = \
service_pid
services_conf['services'][service_uuid]['updated_at'] = \
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
self._updateServiceConf(services_conf, service_uuid,
output=True)
LOG.info(" --> Cloud service '" + service_name
+ "' tunnel established.")
else:
message = "Error spawning " + str(service_name) \
+ " service tunnel!"
LOG.error(" - " + message)
else:
LOG.info(" --> No service tunnels to establish.")
def restore(self):
LOG.info("Cloud service tunnels to restore:")
# Load services.json configuration file
services_conf = self._loadServicesConf()
if len(services_conf['services']) != 0:
wstun_process_list = []
# Collect all alive WSTUN proccesses
for p in psutil.process_iter():
if len(p.cmdline()) != 0:
if (p.name() == "node") and ("wstun" in p.cmdline()[1]):
wstun_process_list.append(p)
for service_uuid in services_conf['services']:
service_name = services_conf['services'][service_uuid]['name']
service_pid = services_conf['services'][service_uuid]['pid']
LOG.info(" - " + service_name)
s_alive = False
# WSTUN is still alive
if len(wstun_process_list) != 0:
for wp in wstun_process_list:
if service_pid == wp.pid:
LOG.warning(" --> the tunnel for '" + service_name
+ "' is still established.")
s_alive = True
break
if not s_alive:
# Create the reverse tunnel again
public_port = services_conf['services'][service_uuid]
['public_port']
local_port = services_conf['services'][service_uuid]
['local_port']
wstun = self._startWstun(public_port, local_port)
if wstun != None:
service_pid = wstun.pid
# 3. Update services.json file
services_conf['services'][service_uuid]['pid'] = \
service_pid
services_conf['services'][service_uuid]['updated_at'] = \
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
self._updateServiceConf(services_conf,
service_uuid, output=True)
LOG.info(" --> Cloud service '" + service_name
+ "' tunnel restored.")
else:
message = "Error spawning " + str(service_name) \
+ " service tunnel!"
LOG.error(" - " + message)
else:
LOG.info(" --> No service tunnels to restore.")
def _loadServicesConf(self):
"""Load services.json JSON configuration.
:return: JSON Services configuration
"""
try:
url_ip = urlparse(self.board.wamp_config["url"])[1].split(':')[0]
with open(SERVICES_CONF_FILE) as settings:
services_conf = json.load(settings)
# "wstun -r6030:127.0.0.1:22 ws://192.168.17.103:8080"
opt_reverse = "-r" + str(public_port) + ":127.0.0.1:" \
+ str(local_port)
wagent_url = "ws://" + url_ip + ":8080"
except Exception as err:
LOG.error(
"Parsing error in " + SERVICES_CONF_FILE + ": " + str(err))
services_conf = None
return services_conf
def _startWstun(self, public_port, local_port):
opt_reverse = "-r" + str(
public_port) + ":127.0.0.1:" + str(local_port)
try:
wstun = subprocess.Popen(
['/usr/bin/wstun', opt_reverse, wagent_url],
['/usr/bin/wstun', opt_reverse, self.wagent_url],
stdout=subprocess.PIPE
)
except Exception as err:
LOG.error("Error spawning WSTUN process: " + str(err))
wstun = None
LOG.debug(" - WSTUN stdout: " + str(wstun.stdout))
return wstun
message = "Cloud service " + str(name) + " exposed on port " \
+ str(public_port) + " on " + url_ip
def _updateServiceConf(self, services_conf, service_uuid, output=True):
# Apply the changes to services.json
with open(SERVICES_CONF_FILE, 'w') as f:
json.dump(services_conf, f, indent=4)
LOG.info(" - " + message + " with PID " + str(wstun.pid))
if output:
LOG.info(" - service updated:\n" + json.dumps(
services_conf['services'][service_uuid],
indent=4,
sort_keys=True
))
else:
LOG.info(" - services.json file updated!")
w_msg = WM.WampSuccess([wstun.pid, message])
async def ServiceEnable(self, service, public_port):
rpc_name = utils.getFuncName()
service_name = service['name']
service_uuid = service['uuid']
local_port = service['port']
LOG.info("RPC " + rpc_name + " CALLED for '" + service_name
+ "' (" + service_uuid + ") service:")
try:
wstun = self._startWstun(public_port, local_port)
if wstun != None:
service_pid = wstun.pid
LOG.debug(" - WSTUN stdout: " + str(wstun.stdout))
# Update services.json file
# Load services.json configuration file
services_conf = self._loadServicesConf()
# Save plugin settings in services.json
if service_uuid not in services_conf['services']:
# It is a new plugin
services_conf['services'][service_uuid] = {}
services_conf['services'][service_uuid]['name'] = \
service_name
services_conf['services'][service_uuid]['public_port'] = \
public_port
services_conf['services'][service_uuid]['local_port'] = \
local_port
services_conf['services'][service_uuid]['pid'] = \
service_pid
services_conf['services'][service_uuid]['enabled_at'] = \
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
services_conf['services'][service_uuid]['updated_at'] = ""
else:
# The service was already added and we are updating it
services_conf['services'][service_uuid]['updated_at'] = \
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
LOG.info(" - services.json file updated!")
# Apply the changes to services.json
self._updateServiceConf(services_conf, service_uuid,
output=True)
message = "Cloud service '" + str(service_name) \
+ "' exposed on port " \
+ str(public_port) + " on " + self.url_ip
LOG.info(" - " + message + " with PID " + str(service_pid))
w_msg = WM.WampSuccess(message)
else:
message = "Error spawning " + str(service_name) \
+ " service tunnel!"
LOG.error(" - " + message)
w_msg = WM.WampError(message)
except Exception as err:
message = "Error exposing " + str(name) + " service: " + str(err)
message = "Error exposing " + str(service_name) \
+ " service: " + str(err)
LOG.error(" - " + message)
w_msg = WM.WampError(message)
return w_msg.serialize()
async def ServiceDisable(self, name, pid):
async def ServiceDisable(self, service):
LOG.info("RPC " + utils.getFuncName() + " CALLED for "
+ name + " service:")
rpc_name = utils.getFuncName()
service_name = service['name']
service_uuid = service['uuid']
LOG.info("RPC " + rpc_name
+ " CALLED for '" + service_name
+ "' (" + service_uuid + ") service:")
# Remove from services.json file
try:
os.kill(pid, signal.SIGKILL)
# Load services.json configuration file
services_conf = self._loadServicesConf()
message = "Cloud service " + str(name) + " disabled."
if service_uuid in services_conf['services']:
LOG.info(" - " + message)
w_msg = WM.WampSuccess(message)
service_pid = services_conf['services'][service_uuid]['pid']
try:
os.kill(service_pid, signal.SIGKILL)
message = "Cloud service '" \
+ str(service_name) + "' tunnel disabled."
del services_conf['services'][service_uuid]
self._updateServiceConf(services_conf, service_uuid,
output=False)
LOG.info(" - " + message)
w_msg = WM.WampSuccess(message)
except Exception as err:
if err.errno == errno.ESRCH: # ESRCH == No such process
message = "Service '" + str(
service_name) + "' WSTUN process is not running!"
LOG.warning(" - " + message)
del services_conf['services'][service_uuid]
self._updateServiceConf(services_conf, service_uuid,
output=False)
w_msg = WM.WampWarning(message)
else:
message = "Error disabling '" + str(
service_name) + "' service tunnel: " + str(err)
LOG.error(" - " + message)
w_msg = WM.WampError(message)
else:
message = rpc_name + " result: " + service_uuid \
+ " already removed!"
LOG.error(" - " + message)
w_msg = WM.WampError(message)
except Exception as err:
message = "Error disabling " + str(name) + " service: " + str(err)
message = "Updating services.json error: " + str(err)
LOG.error(" - " + message)
w_msg = WM.WampError(message)
return w_msg.serialize()
async def ServiceRestore(self, name, public_port, local_port, pid):
async def ServiceRestore(self, service, public_port):
LOG.info("RPC " + utils.getFuncName() + " CALLED for "
+ name + " service:")
rpc_name = utils.getFuncName()
try:
service_name = service['name']
service_uuid = service['uuid']
LOG.info("RPC " + rpc_name
+ " CALLED for '" + service_name
+ "' (" + service_uuid + ") service:")
# Load services.json configuration file
services_conf = self._loadServicesConf()
if service_uuid in services_conf['services']:
local_port = \
services_conf['services'][service_uuid]['local_port']
service_pid = \
services_conf['services'][service_uuid]['pid']
# 1. Kill wstun process (if exists)
try:
os.kill(pid, signal.SIGKILL)
LOG.info(" - service " + name + " with PID " + str(pid)
+ " killed.")
except OSError:
LOG.warning(" - WSTUN process already killed: "
"creating new one...")
# 2. Create the reverse tunnel
url_ip = urlparse(self.board.wamp_config["url"])[1].split(':')[0]
opt_reverse = "-r" + str(public_port) + ":127.0.0.1:" + str(
local_port)
wagent_url = "ws://" + url_ip + ":8080"
wstun = subprocess.Popen(
['/usr/bin/wstun', opt_reverse, wagent_url],
stdout=subprocess.PIPE
)
# 1. Kill wstun process (if exists)
try:
os.kill(service_pid, signal.SIGKILL)
LOG.info(" - service '" + service_name
+ "' with PID " + str(service_pid)
+ " was killed.")
except OSError:
LOG.warning(" - WSTUN process already killed: "
"creating new one...")
message = "service " + str(name) + " restored on port " \
+ str(public_port) + " on " + url_ip
LOG.info(" - " + message + " with PID " + str(wstun.pid))
w_msg = WM.WampSuccess([wstun.pid, message])
# 2. Create the reverse tunnel
wstun = self._startWstun(public_port, local_port)
except Exception as err:
message = "Error restoring " + str(name) + " service: " + str(err)
LOG.error(" - " + message)
w_msg = WM.WampError(message)
if wstun != None:
service_pid = wstun.pid
# UPDATE services.json file
services_conf['services'][service_uuid]['pid'] = \
service_pid
services_conf['services'][service_uuid]['updated_at'] = \
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
self._updateServiceConf(services_conf, service_uuid,
output=True)
message = "service " + str(service_name) \
+ " restored on port " \
+ str(public_port) + " on " + self.url_ip
LOG.info(" - " + message + " with PID " + str(service_pid))
w_msg = WM.WampSuccess(message)
else:
message = "Error spawning " + str(service_name) \
+ " service tunnel!"
LOG.error(" - " + message)
w_msg = WM.WampError(message)
except Exception as err:
message = "Error restoring '" + str(service_name) \
+ "' service tunnel: " + str(err)
LOG.error(" - " + message)
w_msg = WM.WampError(message)
else:
local_port = service['port']
wstun = self._startWstun(public_port, local_port)
if wstun != None:
service_pid = wstun.pid
services_conf['services'][service_uuid] = {}
services_conf['services'][service_uuid]['name'] = \
service_name
services_conf['services'][service_uuid]['public_port'] = \
public_port
services_conf['services'][service_uuid]['local_port'] = \
local_port
services_conf['services'][service_uuid]['pid'] = \
service_pid
services_conf['services'][service_uuid]['enabled_at'] = \
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
services_conf['services'][service_uuid]['updated_at'] = ""
self._updateServiceConf(services_conf, service_uuid,
output=True)
message = "service " + str(service_name) \
+ " restored on port " \
+ str(public_port) + " on " + self.url_ip
LOG.info(" - " + message + " with PID " + str(service_pid))
w_msg = WM.WampSuccess(message)
else:
message = "Error spawning " + str(service_name) \
+ " service tunnel!"
LOG.error(" - " + message)
w_msg = WM.WampError(message)
return w_msg.serialize()

View File

@ -67,6 +67,9 @@ class Utility(Module.Module):
def finalize(self):
pass
def restore(self):
pass
async def hello(self, client_name, message):
import random
s = random.uniform(0.5, 3.0)

View File

@ -57,6 +57,9 @@ class VfsManager(Module.Module):
def finalize(self):
pass
def restore(self):
pass
def mountLocal(self, mountSource, mountPoint):
try:

5
services.example.json Normal file
View File

@ -0,0 +1,5 @@
{
"services": {
}
}