LR v0.4.9-dev19: refactoring utils library and module; req_id management for requests added in device-manager.

Change-Id: Ic1d5398556abef1db0f1d06367a515a777464442
This commit is contained in:
Nicola Peditto 2019-05-03 18:54:29 +02:00
parent d8cf8d666e
commit 8403990ea3
12 changed files with 322 additions and 230 deletions

View File

@ -10,7 +10,7 @@ Requirements
::
apt install python3 python3-setuptools python3-pip gdb lsof libssl-dev libffi-dev libffi-dev
apt install python3 python3-setuptools python3-pip gdb lsof libssl-dev libffi-dev
* NodeJS

View File

@ -20,7 +20,7 @@ import signal
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
from iotronic_lightningrod.common import utils
from iotronic_lightningrod.modules import utils as lr_utils
def manageTimeout(error_message, action):
@ -35,7 +35,7 @@ def manageTimeout(error_message, action):
LOG.warning("Iotronic RPC-ALIVE timeout details: " + str(details))
try:
utils.destroyWampSocket()
lr_utils.destroyWampSocket()
except Exception as e:
LOG.warning("Iotronic RPC-ALIVE timeout error: " + str(e))
@ -43,7 +43,7 @@ def manageTimeout(error_message, action):
else:
LOG.warning("Board connection call timeout ["
+ str(action) + "]: " + str(details))
utils.LR_restart()
lr_utils.LR_restart()
class NginxError(Exception):

View File

@ -15,38 +15,17 @@
__author__ = "Nicola Peditto <n.peditto@gmail.com>"
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
import inspect
import os
import pkg_resources
import psutil
import site
import subprocess
import sys
import threading
import time
def LR_restart():
try:
LOG.warning("Lightning-rod RESTARTING...")
python = sys.executable
os.execl(python, python, *sys.argv)
except Exception as err:
LOG.error("Lightning-rod restarting error" + str(err))
def LR_restart_delayed(seconds):
def delayLRrestarting():
time.sleep(seconds)
python = sys.executable
os.execl(python, python, *sys.argv)
threading.Thread(target=delayLRrestarting).start()
def getFuncName():
return inspect.stack()[1][3]
def checkIotronicConf(lr_CONF):
@ -63,102 +42,12 @@ def checkIotronicConf(lr_CONF):
return False
def destroyWampSocket():
LR_PID = os.getpid()
try:
process = subprocess.Popen(
["gdb", "-p", str(LR_PID)],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE
)
proc = psutil.Process()
conn_list = proc.connections()
proc_msg = "WAMP RECOVERY: " + str(conn_list)
print(proc_msg)
LOG.info(proc_msg)
wamp_conn_set = False
for socks in conn_list:
# print(socks.raddr, socks.fd)
if socks.raddr != ():
# print(socks.raddr.port, socks.fd)
if socks.raddr.port == 8181:
socks_msg = "FD selected: " + str(socks.fd) \
+ " [port " + str(socks.raddr.port) + "]"
print(socks_msg)
LOG.info(socks_msg)
ws_fd = socks.fd
first = b"call ((void(*)()) shutdown)("
fd = str(ws_fd).encode('ascii')
last = b"u,0)\nquit\ny"
commands = b"%s%s%s" % (first, fd, last)
process.communicate(input=commands)[0]
msg = "Websocket-Zombie closed! Restoring..."
LOG.warning(msg)
print(msg)
# WAMP connection found!
wamp_conn_set = True
# LOG.info("WAMP CONNECTION FOUND")
if wamp_conn_set == False:
LOG.warning("WAMP CONNECTION NOT FOUND: LR restarting...")
# In conn_list there is not the WAMP connection!
LR_restart()
except Exception as e:
LOG.warning("RPC-ALIVE - destroyWampSocket error: " + str(e))
LR_restart()
def get_version(package):
package = package.lower()
return next((p.version for p in pkg_resources.working_set if
p.project_name.lower() == package), "No version")
def get_socket_info(wport):
sock_bundle = "N/A"
try:
for socks in psutil.Process().connections():
if len(socks.raddr) != 0:
if (socks.raddr.port == wport):
lr_net_iface = socks
print("WAMP SOCKET: " + str(lr_net_iface))
dct = psutil.net_if_addrs()
for key in dct.keys():
if isinstance(dct[key], dict) == False:
iface = key
for elem in dct[key]:
ip_addr = elem.address
if ip_addr == str(lr_net_iface.laddr.ip):
for snicaddr in dct[iface]:
if snicaddr.family == 17:
lr_mac = snicaddr.address
sock_bundle = [iface, ip_addr,
lr_mac]
return sock_bundle
return sock_bundle
except Exception as e:
LOG.warning("Error getting socket info " + str(e))
sock_bundle = "N/A"
return sock_bundle
return sock_bundle
def backupConf():
try:
os.system(

View File

@ -27,25 +27,21 @@ import asyncio
import inspect
import os
import pkg_resources
import psutil
import signal
import ssl
from stevedore import extension
import sys
import time
import txaio
from pip._vendor import pkg_resources
from stevedore import extension
# IoTronic imports
from iotronic_lightningrod.Board import Board
from iotronic_lightningrod.Board import FIRST_BOOT
from iotronic_lightningrod.common.exception import timeoutALIVE
from iotronic_lightningrod.common.exception import timeoutRPC
from iotronic_lightningrod.common import utils
from iotronic_lightningrod.common.utils import get_socket_info
from iotronic_lightningrod.common.utils import get_version
from iotronic_lightningrod.modules import utils as lr_utils
import iotronic_lightningrod.wampmessage as WM
@ -112,7 +108,7 @@ class LightningRod(object):
LogoLR()
LOG.info(' - version: ' +
str(get_version("iotronic-lightningrod")))
str(utils.get_version("iotronic-lightningrod")))
LOG.info(' - PID: ' + str(os.getpid()))
LOG.info("LR available modules: ")
@ -137,7 +133,7 @@ class LightningRod(object):
LOG.info('Lightning-rod: ')
LOG.info(' - version: ' +
str(get_version("iotronic-lightningrod")))
str(utils.get_version("iotronic-lightningrod")))
LOG.info(' - PID: ' + str(os.getpid()))
LOG.info(' - Logs: ' + CONF.log_file)
LOG.info(" - Home: " + CONF.lightningrod_home)
@ -238,6 +234,38 @@ def iotronic_status(board_status):
return alive
def wampNotify(session, board, w_msg):
rpc = str(board.agent) + u'.stack4things.notify_result'
async def wampCall(session, board, wm, rpc, action):
w_msg = None
try:
with timeoutRPC(seconds=10, action=action):
res = await session.call(
rpc,
board_uuid=board.uuid,
wampmessage=wm
)
w_msg = WM.deserialize(res)
except exception.ApplicationError as e:
LOG.error(" - wampCall RPC error: " + str(e))
return w_msg
res = asyncio.run_coroutine_threadsafe(
wampCall(session, board, w_msg, rpc, "notify_result"),
loop
).result()
return res
async def wamp_singleCheck(session):
try:
@ -282,7 +310,7 @@ async def wamp_checks(session):
# The board will disconnect from WAMP agent and retry later.
global reconnection
reconnection = True
utils.destroyWampSocket()
lr_utils.destroyWampSocket()
try:
await asyncio.sleep(CONF.alive_timer)
@ -317,7 +345,9 @@ async def IotronicLogin(board, session, details):
uuid=board.uuid,
session=details.session,
info={
"lr_version": str(get_version("iotronic-lightningrod")),
"lr_version": str(
utils.get_version("iotronic-lightningrod")
),
"connectivity": lr_cty
}
@ -339,7 +369,7 @@ async def IotronicLogin(board, session, details):
except Exception as e:
LOG.warning("WARNING - Could not load modules: " + str(e))
utils.LR_restart()
lr_utils.LR_restart()
# Reset flag to False
# reconnection = False
@ -356,7 +386,7 @@ async def IotronicLogin(board, session, details):
# We restart Lightning-rod if RPC 'stack4things.connection' is not
# available, this means Wagent is unreachable
utils.LR_restart()
lr_utils.LR_restart()
except Exception as e:
LOG.warning("Iotronic board connection error: " + str(e))
@ -436,7 +466,7 @@ def wampConnect(wamp_conf):
global wport
global lr_cty
sock_bundle = get_socket_info(wport)
sock_bundle = lr_utils.get_socket_info(wport)
if sock_bundle == "N/A":
lr_cty = {}
@ -540,7 +570,7 @@ def wampConnect(wamp_conf):
# We restart Lightning-rod if RPC
# 'stack4things.connection' is not available,
# this means Wagent is unreachable
utils.LR_restart()
lr_utils.LR_restart()
else:
LOG.error("Registration denied by Iotronic - " +
@ -561,7 +591,7 @@ def wampConnect(wamp_conf):
# We restart Lightning-rod if RPC
# 'stack4things.connection' is not available,
# this means Wagent is unreachable
utils.LR_restart()
lr_utils.LR_restart()
except Exception as e:
LOG.warning(
@ -625,7 +655,8 @@ def wampConnect(wamp_conf):
session=details.session,
info={
"lr_version": str(
get_version("iotronic-lightningrod")),
utils.get_version("iotronic-lightningrod")
),
"connectivity": lr_cty
}
@ -679,7 +710,7 @@ def wampConnect(wamp_conf):
# We restart Lightning-rod if RPC 'stack4things.connection'
# is not available, this means Wagent is unreachable
utils.LR_restart()
lr_utils.LR_restart()
except Exception as e:
LOG.warning("Board connection error after WAMP recovery: "
@ -972,7 +1003,7 @@ def moduleReloadInfo(session):
except Exception as err:
LOG.warning("Board modules reloading error: " + str(err))
utils.LR_restart()
lr_utils.LR_restart()
def Bye():

View File

@ -19,16 +19,18 @@ import importlib as imp
import inspect
import os
import subprocess
import sys
import threading
import time
from autobahn.wamp import exception
from datetime import datetime
from iotronic_lightningrod.common import utils
from iotronic_lightningrod.config import package_path
from iotronic_lightningrod.lightningrod import RPC_devices
from iotronic_lightningrod.lightningrod import wampNotify
from iotronic_lightningrod.modules import Module
from iotronic_lightningrod.modules import utils
from iotronic_lightningrod.modules import utils as lr_utils
import iotronic_lightningrod.wampmessage as WM
@ -97,9 +99,10 @@ class DeviceManager(Module.Module):
LOG.info(" --> " + str(meth[0]) + " registered!")
async def DevicePing(self, parameters=None):
async def DevicePing(self, req_id, parameters=None):
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED")
LOG.info("RPC " + rpc_name + " CALLED [req_id: " + str(req_id) + "]")
LOG.info("--> Parameters: " + str(parameters))
command = "hostname"
@ -122,9 +125,10 @@ class DeviceManager(Module.Module):
return w_msg.serialize()
async def DeviceReboot(self, parameters=None):
async def DeviceReboot(self, req_id, parameters=None):
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED")
LOG.info("RPC " + rpc_name + " CALLED [req_id: " + str(req_id) + "]")
LOG.info("--> Parameters: " + str(parameters))
delay = 3 # default delay
@ -157,9 +161,10 @@ class DeviceManager(Module.Module):
return w_msg.serialize()
async def DeviceRestartLR(self, parameters=None):
async def DeviceRestartLR(self, req_id, parameters=None):
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED")
LOG.info("RPC " + rpc_name + " CALLED [req_id: " + str(req_id) + "]")
LOG.info("--> Parameters: " + str(parameters))
delay = 3 # default delay
@ -174,12 +179,8 @@ class DeviceManager(Module.Module):
LOG.info("--> delay: " + str(delay))
def delayLRrestarting():
time.sleep(delay)
python = sys.executable
os.execl(python, python, *sys.argv)
threading.Thread(target=delayLRrestarting).start()
# LR restarting
lr_utils.LR_restart_delayed(delay)
message = "Restarting LR in " + str(delay) \
+ " seconds (" \
@ -189,9 +190,9 @@ class DeviceManager(Module.Module):
return w_msg.serialize()
async def DeviceUpgradeLR(self, parameters=None):
async def DeviceUpgradeLR(self, req_id, parameters=None):
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED")
LOG.info("RPC " + rpc_name + " CALLED [req_id: " + str(req_id) + "]")
LOG.info("--> Parameters: " + str(parameters))
try:
@ -199,17 +200,18 @@ class DeviceManager(Module.Module):
version = parameters['version']
except Exception as err:
LOG.info("--> version not specified: set 'latest'" + str(err))
LOG.info("--> version not specified: set 'latest'")
version = None # latest
if (version != None) or (version != "latest"):
if (version != None) and (version != "latest"):
command = "pip3 install iotronic-lightningrod==" + str(version)
else:
command = "pip3 install --upgrade iotronic-lightningrod"
def LRupgrading():
def upgradingLR():
out = subprocess.Popen(
command,
shell=True,
@ -217,11 +219,37 @@ class DeviceManager(Module.Module):
)
output = out.communicate()[0].decode('utf-8').strip()
LOG.info(str(output))
LOG.info("\n" + str(output))
try:
w_msg = WM.WampSuccess(
msg="LR upgraded", req_id=req_id
).serialize()
except Exception as e:
LOG.error(" - Wamp Message error in '"
+ rpc_name + "': " + str(e))
try:
notify = wampNotify(self.device_session, self.board, w_msg)
LOG.info(
" - Notify result '" + rpc_name + "': "
+ str(notify.result) + " - " + str(notify.message)
)
except exception.ApplicationError as e:
LOG.error(" - Notify result '"
+ rpc_name + "' error: " + str(e))
# Restart LR to start new version
lr_utils.LR_restart_delayed(2)
try:
threading.Thread(target=LRupgrading).start()
threading.Thread(target=upgradingLR).start()
except Exception as err:
LOG.error("Error in parameters: " + str(err))
@ -230,9 +258,9 @@ class DeviceManager(Module.Module):
return w_msg.serialize()
async def DevicePackageAction(self, parameters=None):
async def DevicePackageAction(self, req_id, parameters=None):
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED")
LOG.info("RPC " + rpc_name + " CALLED [req_id: " + str(req_id) + "]")
LOG.info("--> Parameters: " + str(parameters))
try:
@ -278,6 +306,29 @@ class DeviceManager(Module.Module):
output = out.communicate()[0].decode('utf-8').strip()
LOG.info(str(output))
try:
w_msg = WM.WampSuccess(
msg="Package Action completed", req_id=req_id
).serialize()
except Exception as e:
LOG.error(" - Wamp Message error in '"
+ rpc_name + "': " + str(e))
try:
notify = wampNotify(self.device_session, self.board, w_msg)
LOG.info(
" - Notify result '" + rpc_name + "': "
+ str(notify.result) + " - " + str(notify.message)
)
except exception.ApplicationError as e:
LOG.error(" - Notify result '"
+ rpc_name + "' error: " + str(e))
try:
threading.Thread(target=actionOnPackage).start()
@ -289,9 +340,9 @@ class DeviceManager(Module.Module):
return w_msg.serialize()
async def DeviceEcho(self, parameters=None):
async def DeviceEcho(self, req_id, parameters=None):
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED")
LOG.info("RPC " + rpc_name + " CALLED [req_id: " + str(req_id) + "]")
LOG.info("--> Parameters: " + str(parameters))
try:
@ -309,9 +360,10 @@ class DeviceManager(Module.Module):
return w_msg.serialize()
async def DeviceNetConfig(self, parameters=None):
async def DeviceNetConfig(self, req_id, parameters=None):
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED")
LOG.info("RPC " + rpc_name + " CALLED [req_id: " + str(req_id) + "]")
LOG.info("--> Parameters: " + str(parameters))
message = getIfconfig()
w_msg = WM.WampSuccess(message)

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import asyncio
from iotronic_lightningrod.common import utils
from iotronic_lightningrod.modules import Module
import iotronic_lightningrod.wampmessage as WM
from oslo_log import log as logging
@ -42,7 +42,7 @@ class NetworkManager(Module.Module):
def restore(self):
pass
async def Create_VIF(self, r_tcp_port):
async def Create_VIF(self, req_id, r_tcp_port):
LOG.info("Creation of the VIF ")
@ -85,7 +85,7 @@ class NetworkManager(Module.Module):
return w_msg.serialize()
async def Configure_VIF(self, port, cidr):
async def Configure_VIF(self, req_id, port, cidr):
LOG.info("Configuration of the VIF")
@ -119,7 +119,7 @@ class NetworkManager(Module.Module):
return w_msg.serialize()
async def Remove_VIF(self, VIF_name):
async def Remove_VIF(self, req_id, VIF_name):
LOG.info("Removing a VIF from the board")

View File

@ -25,9 +25,9 @@ import shutil
import time
from iotronic_lightningrod.common import utils
from iotronic_lightningrod.modules import Module
from iotronic_lightningrod.modules.plugins import PluginSerializer
from iotronic_lightningrod.modules import utils
import iotronic_lightningrod.wampmessage as WM

View File

@ -18,12 +18,12 @@ __author__ = "Nicola Peditto <n.peditto@gmail.com>"
from iotronic_lightningrod.common.pam import pamAuthentication
from iotronic_lightningrod.common import utils
from iotronic_lightningrod.common.utils import get_version
from iotronic_lightningrod.lightningrod import board
from iotronic_lightningrod.lightningrod import iotronic_status
from iotronic_lightningrod.modules import device_manager
from iotronic_lightningrod.modules import Module
from iotronic_lightningrod.modules import service_manager
from iotronic_lightningrod.modules import utils as lr_utils
from datetime import datetime
@ -152,7 +152,9 @@ class RestManager(Module.Module):
'board_reg_status': str(board.status),
'iotronic_status': str(iotronic_status(board.status)),
'service_list': str(service_list),
'lr_version': str(get_version("iotronic-lightningrod"))
'lr_version': str(
utils.get_version("iotronic-lightningrod")
)
}
return render_template('status.html', **info)
@ -294,7 +296,7 @@ class RestManager(Module.Module):
# restart LR
print("--> LR restarting in 5 seconds...")
f_session['status'] = "restarting"
utils.LR_restart_delayed(5)
lr_utils.LR_restart_delayed(5)
return redirect("/", code=302)
@ -368,7 +370,7 @@ class RestManager(Module.Module):
# restart LR
print("--> LR restarting in 5 seconds...")
f_session['status'] = "restarting"
utils.LR_restart_delayed(5)
lr_utils.LR_restart_delayed(5)
return redirect("/", code=302)
else:
@ -404,7 +406,7 @@ class RestManager(Module.Module):
print("Refactored")
print("--> LR restarting in 5 seconds...")
f_session['status'] = "restarting"
utils.LR_restart_delayed(5)
lr_utils.LR_restart_delayed(5)
return redirect("/", code=302)
elif request.form.get('change_hostname'):
@ -473,7 +475,7 @@ class RestManager(Module.Module):
print(" - LR restarting "
+ "in 5 seconds...")
f_session['status'] = "restarting"
utils.LR_restart_delayed(5)
lr_utils.LR_restart_delayed(5)
return redirect("/", code=302)
@ -566,13 +568,13 @@ class RestManager(Module.Module):
print("Refactored")
print("--> LR restarting in 5 seconds...")
f_session['status'] = "restarting"
utils.LR_restart_delayed(5)
lr_utils.LR_restart_delayed(5)
return redirect("/", code=302)
elif request.args.get('lr_restart_btn'):
print("LR restarting in 5 seconds...")
f_session['status'] = "restarting"
utils.LR_restart_delayed(5)
lr_utils.LR_restart_delayed(5)
return redirect("/", code=302)
else:

View File

@ -20,7 +20,6 @@ import json
import os
import psutil
import pyinotify
import queue
import signal
import socket
import subprocess
@ -29,17 +28,18 @@ import time
import threading
from datetime import datetime
from random import randint
from threading import Thread
from urllib.parse import urlparse
from iotronic_lightningrod.common import utils
from iotronic_lightningrod.config import package_path
from iotronic_lightningrod.modules import Module
from iotronic_lightningrod.modules import utils
from iotronic_lightningrod import lightningrod
import iotronic_lightningrod.wampmessage as WM
from iotronic_lightningrod import lightningrod
from random import randint
from oslo_config import cfg
from oslo_log import log as logging
@ -223,8 +223,9 @@ class ServiceManager(Module.Module):
os.kill(service_pid, signal.SIGINT)
LOG.debug(
" - [finalize] WSTUN process " +
"[" + str(wstun.pid) + "] killed")
" - [finalize] WSTUN process ["
+ str(service_pid) + "] killed"
)
print("OLD WSTUN KILLED: " + str(wp))
try:
@ -518,7 +519,7 @@ class ServiceManager(Module.Module):
except Exception:
pass
break
# break
if not wstun_found:
message = "Tunnel killed by LR"
@ -786,6 +787,7 @@ class ServiceManager(Module.Module):
try:
if event != "enable":
WS_MON_LIST[str(local_port)].stop()
except Exception as err:
LOG.error("Error stopping WSTUN monitor: " + str(err))

View File

@ -16,54 +16,32 @@
__author__ = "Nicola Peditto <n.peditto@gmail.com>"
import asyncio
import inspect
import pkg_resources
from six import moves
from stevedore import extension
import os
import psutil
import subprocess
import sys
import threading
import time
from iotronic_lightningrod.config import entry_points_name
from iotronic_lightningrod.lightningrod import SESSION
from iotronic_lightningrod.modules import Module
from iotronic_lightningrod.modules import utils as lr_utils
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
def getFuncName():
return inspect.stack()[1][3]
def refresh_stevedore(namespace=None):
"""Trigger reload of entry points.
Useful to have dynamic loading/unloading of stevedore modules.
"""
# NOTE(sheeprine): pkg_resources doesn't support reload on python3 due to
# defining basestring which is still there on reload hence executing
# python2 related code.
try:
del sys.modules['pkg_resources'].basestring
except AttributeError:
# python2, do nothing
pass
# Force working_set reload
moves.reload_module(sys.modules['pkg_resources'])
# Clear stevedore cache
cache = extension.ExtensionManager.ENTRY_POINT_CACHE
if namespace:
if namespace in cache:
del cache[namespace]
else:
cache.clear()
class Utility(Module.Module):
def __init__(self, board, session):
super(Utility, self).__init__("Utility", board)
self.session = session
def finalize(self):
pass
@ -102,7 +80,7 @@ class Utility(Module.Module):
await named_objects
SESSION.disconnect()
self.session.disconnect()
return str(named_objects)
@ -125,3 +103,145 @@ class Utility(Module.Module):
LOG.info("DESTROY RESULT: " + str(result))
return result
def refresh_stevedore(namespace=None):
"""Trigger reload of entry points.
Useful to have dynamic loading/unloading of stevedore modules.
"""
# NOTE(sheeprine): pkg_resources doesn't support reload on python3 due to
# defining basestring which is still there on reload hence executing
# python2 related code.
try:
del sys.modules['pkg_resources'].basestring
except AttributeError:
# python2, do nothing
pass
# Force working_set reload
moves.reload_module(sys.modules['pkg_resources'])
# Clear stevedore cache
cache = extension.ExtensionManager.ENTRY_POINT_CACHE
if namespace:
if namespace in cache:
del cache[namespace]
else:
cache.clear()
def LR_restart_delayed(seconds):
try:
if seconds < 3:
seconds = 3
LOG.warning("Lightning-rod restarting in "
+ str(seconds) + " seconds...")
def delayLRrestarting():
time.sleep(seconds)
python = sys.executable
os.execl(python, python, *sys.argv)
threading.Thread(target=delayLRrestarting).start()
except Exception as err:
LOG.error("Lightning-rod restarting error: " + str(err))
def LR_restart():
try:
LOG.warning("Lightning-rod restarting in few seconds...")
python = sys.executable
os.execl(python, python, *sys.argv)
except Exception as err:
LOG.error("Lightning-rod restarting error: " + str(err))
def destroyWampSocket():
LR_PID = os.getpid()
try:
process = subprocess.Popen(
["gdb", "-p", str(LR_PID)],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE
)
proc = psutil.Process()
conn_list = proc.connections()
proc_msg = "WAMP RECOVERY: " + str(conn_list)
print(proc_msg)
LOG.info(proc_msg)
wamp_conn_set = False
for socks in conn_list:
# print(socks.raddr, socks.fd)
if socks.raddr != ():
# print(socks.raddr.port, socks.fd)
if socks.raddr.port == 8181:
socks_msg = "FD selected: " + str(socks.fd) \
+ " [port " + str(socks.raddr.port) + "]"
print(socks_msg)
LOG.info(socks_msg)
ws_fd = socks.fd
first = b"call ((void(*)()) shutdown)("
fd = str(ws_fd).encode('ascii')
last = b"u,0)\nquit\ny"
commands = b"%s%s%s" % (first, fd, last)
process.communicate(input=commands)[0]
msg = "Websocket-Zombie closed! Restoring..."
LOG.warning(msg)
print(msg)
# WAMP connection found!
wamp_conn_set = True
# LOG.info("WAMP CONNECTION FOUND")
if wamp_conn_set == False:
LOG.warning("WAMP CONNECTION NOT FOUND: LR restarting...")
# In conn_list there is not the WAMP connection!
lr_utils.LR_restart()
except Exception as e:
LOG.warning("RPC-ALIVE - destroyWampSocket error: " + str(e))
lr_utils.LR_restart()
def get_socket_info(wport):
sock_bundle = "N/A"
try:
for socks in psutil.Process().connections():
if len(socks.raddr) != 0:
if (socks.raddr.port == wport):
lr_net_iface = socks
print("WAMP SOCKET: " + str(lr_net_iface))
dct = psutil.net_if_addrs()
for key in dct.keys():
if isinstance(dct[key], dict) == False:
iface = key
for elem in dct[key]:
ip_addr = elem.address
if ip_addr == str(lr_net_iface.laddr.ip):
for snicaddr in dct[iface]:
if snicaddr.family == 17:
lr_mac = snicaddr.address
sock_bundle = [iface, ip_addr,
lr_mac]
return sock_bundle
return sock_bundle
except Exception as e:
LOG.warning("Error getting socket info " + str(e))
sock_bundle = "N/A"
return sock_bundle
return sock_bundle

View File

@ -15,10 +15,10 @@
__author__ = "Nicola Peditto <n.peditto@gmail.com>"
from iotronic_lightningrod.common import utils
from iotronic_lightningrod.config import package_path
from iotronic_lightningrod.lightningrod import RPC_proxies
from iotronic_lightningrod.modules import Module
from iotronic_lightningrod.modules import utils
import iotronic_lightningrod.wampmessage as WM

View File

@ -29,34 +29,30 @@ def deserialize(received):
class WampMessage(object):
def __init__(self, message=None, result=None):
def __init__(self, message, result, req_id):
self.message = message
self.result = result
self.req_id = req_id
def serialize(self):
return json.dumps(self, default=lambda o: o.__dict__)
"""
def deserialize(self, received):
self.__dict__ = json.loads(received)
return self
"""
class WampSuccess(WampMessage):
def __init__(self, msg=None):
super(WampSuccess, self).__init__(msg, SUCCESS)
def __init__(self, msg=None, req_id=None):
super(WampSuccess, self).__init__(msg, SUCCESS, req_id)
class WampError(WampMessage):
def __init__(self, msg=None):
super(WampError, self).__init__(msg, ERROR)
def __init__(self, msg=None, req_id=None):
super(WampError, self).__init__(msg, ERROR, req_id)
class WampWarning(WampMessage):
def __init__(self, msg=None):
super(WampWarning, self).__init__(msg, WARNING)
def __init__(self, msg=None, req_id=None):
super(WampWarning, self).__init__(msg, WARNING, req_id)
class WampRunning(WampMessage):
def __init__(self, msg=None):
super(WampRunning, self).__init__(msg, RUNNING)
def __init__(self, msg=None, req_id=None):
super(WampRunning, self).__init__(msg, RUNNING, req_id)