Release v0.4.2:

- WAMP alive RPC refactored
- WSTUN tunnels monitoring: added recovery procedure
- NGINX proxy updated
- Iotronic oslo conf refactored
- Pyinotify requirement added (fixed)

Change-Id: I877d6b8e8e401efaca315a4e8917d73ebbeb593c
This commit is contained in:
Nicola Peditto 2018-12-05 18:25:38 +01:00
parent 9bf78aa2da
commit 84678996b0
9 changed files with 242 additions and 105 deletions

View File

@ -2,7 +2,13 @@
lightningrod_home = /var/lib/iotronic
skip_cert_verify = True
debug = True
proxy = nginx
log_file = /var/log/iotronic/lightning-rod.log
connection_timer = 10
alive_timer = 600
rpc_alive_timer = 3
rpc_alive_timer = 3
[services]
wstun_bin = /usr/bin/wstun
[webservices]
proxy = nginx

View File

@ -41,26 +41,10 @@ def manageTimeout(error_message, action):
LOG.warning("Iotronic RPC-ALIVE timeout error: " + str(e))
else:
LOG.warning("Board connection call timeout: " + str(details))
LOG.warning("Board connection call timeout ["
+ str(action) + "]: " + str(details))
utils.LR_restart()
"""
def manageTimeoutALIVE(error_message, action):
try:
raise TimeoutError(error_message, action)
except TimeoutError as err:
details = err.args[0]
LOG.warning("Iotronic RPC-ALIVE timeout details: " + str(details))
try:
utils.destroyWampSocket()
except Exception as e:
LOG.warning("Iotronic RPC-ALIVE timeout error: " + str(e))
"""
class NginxError(Exception):

View File

@ -57,6 +57,9 @@ lr_opts = [
default=True,
help=('Flag for skipping the verification of the server cert '
'(for the auto-signed ones)')),
cfg.IntOpt('connection_timer',
default=10,
help=('IoTronic connection RPC timer')),
cfg.IntOpt('alive_timer',
default=600,
help=('Wamp websocket check time')),
@ -65,18 +68,8 @@ lr_opts = [
help=('RPC alive response time threshold')),
]
proxy_opts = [
cfg.StrOpt(
'proxy',
choices=[('nginx', ('nginx proxy')), ],
help=('Proxy for WebServices Manager')
),
]
CONF = cfg.CONF
CONF.register_opts(lr_opts)
CONF.register_opts(proxy_opts)
SESSION = None
global board
@ -194,9 +187,9 @@ async def wamp_checks(session):
with timeoutALIVE(seconds=CONF.rpc_alive_timer, action="ws_alive"):
res = await session.call(
str(board.agent) + u'.stack4things.alive'
# board_uuid=board.uuid,
# board_name=board.name
str(board.agent) + u'.stack4things.wamp_alive',
board_uuid=board.uuid,
board_name=board.name
)
LOG.debug("WampCheck attempt " + str(res))
@ -237,7 +230,7 @@ async def IotronicLogin(board, session, details):
rpc = str(board.agent) + u'.stack4things.connection'
with timeoutRPC(seconds=5, action=rpc):
with timeoutRPC(seconds=CONF.connection_timer, action=rpc):
res = await session.call(
rpc,
uuid=board.uuid,
@ -515,7 +508,7 @@ def wampConnect(wamp_conf):
rpc = str(board.agent) + u'.stack4things.connection'
with timeoutRPC(seconds=5, action=rpc):
with timeoutRPC(seconds=CONF.connection_timer, action=rpc):
res = await session.call(
rpc,
uuid=board.uuid,

View File

@ -51,22 +51,54 @@ class ProxyManager(Proxy.Proxy):
nginxMsg = {}
try:
stat = subprocess.Popen('systemctl status nginx.service',
shell=True, stdout=subprocess.PIPE)
stdout_list = str(stat.communicate()[0]).split('\n')
for line in stdout_list:
if 'Active:' in line:
nginxMsg['log'] = line.split('\\n')[2].replace(" ", "")
get_service = 'pidof systemd > /dev/null ' \
'&& echo "systemd" || echo "init.d"'
service_cmd = subprocess.Popen(get_service,
shell=True, stdout=subprocess.PIPE)
if '(running)' in line:
nginxMsg['status'] = True
else:
nginxMsg['status'] = False
service_mng = \
service_cmd.communicate()[0].decode("utf-8").split("\n")[0]
nginxMsg = json.dumps(nginxMsg)
if service_mng == 'init.d':
# print('INIT')
stat = subprocess.Popen('service nginx status',
shell=True, stdout=subprocess.PIPE)
stdout_list = stat.communicate()[0].decode("utf-8").split("\n")
return nginxMsg
for line in stdout_list:
if 'running' in line:
nginxMsg['log'] = stdout_list[0]
if 'running' in line:
nginxMsg['status'] = True
else:
nginxMsg['status'] = False
nginxMsg = json.dumps(nginxMsg)
return nginxMsg
elif service_mng == 'systemd':
# print('SYSTEMD')
stat = subprocess.Popen('systemctl status nginx.service',
shell=True, stdout=subprocess.PIPE)
stdout_list = str(stat.communicate()[0]).split('\n')
for line in stdout_list:
if 'Active:' in line:
nginxMsg['log'] = \
line.split('\\n')[2].replace(" ", "")
if '(running)' in line:
nginxMsg['status'] = True
else:
nginxMsg['status'] = False
nginxMsg = json.dumps(nginxMsg)
return nginxMsg
except Exception as err:
LOG.error("Error check NGINX status: " + str(err))
@ -109,6 +141,8 @@ class ProxyManager(Proxy.Proxy):
nginxMsg = {}
stat = None
try:
stat = subprocess.call('service nginx reload', shell=True)

View File

@ -19,10 +19,10 @@ import errno
import json
import os
import psutil
import pyinotify
import signal
import subprocess
import time
import traceback
from datetime import datetime
from threading import Thread
@ -40,7 +40,24 @@ from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
wstun_opts = [
cfg.StrOpt(
'wstun_bin',
default='/usr/bin/wstun',
help=('WSTUN bin for Services Manager')
),
]
CONF = cfg.CONF
service_group = cfg.OptGroup(
name='services', title='Services options'
)
CONF.register_group(service_group)
CONF.register_opts(wstun_opts, group=service_group)
SERVICES_CONF_FILE = CONF.lightningrod_home + "/services.json"
@ -49,6 +66,8 @@ class ServiceManager(Module.Module):
def __init__(self, board, session):
super(ServiceManager, self).__init__("ServiceManager", board)
print("\nWSTUN bin path: " + str(CONF.services.wstun_bin))
self.wstun_ip = urlparse(board.wamp_config["url"])[1].split(':')[0]
self.wstun_port = "8080"
@ -126,6 +145,7 @@ class ServiceManager(Module.Module):
wstun = self._startWstun(public_port, local_port, event="boot")
if wstun != None:
service_pid = wstun.pid
# 3. Update services.json file
@ -155,6 +175,52 @@ class ServiceManager(Module.Module):
signal.signal(signal.SIGCHLD, self._zombie_hunter)
def restore(self):
LOG.info("Cloud service tunnels to restore:")
# Load services.json configuration file
services_conf = self._loadServicesConf()
if len(services_conf['services']) != 0:
wstun_process_list = []
# No zombie alert activation
lightningrod.zombie_alert = False
LOG.debug("[WSTUN-RESTORE] - Restore zombie_alert: " + str(
lightningrod.zombie_alert))
# Collect all alive WSTUN processes
for p in psutil.process_iter():
if (p.name() == "node"):
if (p.status() == psutil.STATUS_ZOMBIE):
LOG.warning("WSTUN ZOMBIE: " + str(p))
wstun_process_list.append(p)
elif ("wstun" in p.cmdline()[1]):
LOG.warning("WSTUN ALIVE: " + str(p))
wstun_process_list.append(p)
psutil.Process(p.pid).kill()
LOG.warning(" --> PID " + str(p.pid) + " killed!")
LOG.debug("[WSTUN-RESTORE] - WSTUN processes to restore:\n"
+ str(wstun_process_list))
for service_uuid in services_conf['services']:
Thread(
target=self._restoreWSTUN,
args=(services_conf, service_uuid,)
).start()
time.sleep(2)
# Reactivate zombies monitoring
if not lightningrod.zombie_alert:
lightningrod.zombie_alert = True
else:
LOG.info(" --> No service tunnels to restore.")
def _zombie_hunter(self, signum, frame):
wstun_found = False
@ -282,52 +348,6 @@ class ServiceManager(Module.Module):
+ " service tunnel!"
LOG.error(" - " + message)
def restore(self):
LOG.info("Cloud service tunnels to restore:")
# Load services.json configuration file
services_conf = self._loadServicesConf()
if len(services_conf['services']) != 0:
wstun_process_list = []
# No zombie alert activation
lightningrod.zombie_alert = False
LOG.debug("[WSTUN-RESTORE] - Restore zombie_alert: " + str(
lightningrod.zombie_alert))
# Collect all alive WSTUN processes
for p in psutil.process_iter():
if (p.name() == "node"):
if (p.status() == psutil.STATUS_ZOMBIE):
LOG.warning("WSTUN ZOMBIE: " + str(p))
wstun_process_list.append(p)
elif ("wstun" in p.cmdline()[1]):
LOG.warning("WSTUN ALIVE: " + str(p))
wstun_process_list.append(p)
psutil.Process(p.pid).kill()
LOG.warning(" --> PID " + str(p.pid) + " killed!")
LOG.debug("[WSTUN-RESTORE] - WSTUN processes to restore:\n"
+ str(wstun_process_list))
for service_uuid in services_conf['services']:
Thread(
target=self._restoreWSTUN,
args=(services_conf, service_uuid,)
).start()
time.sleep(2)
# Reactivate zombies monitoring
if not lightningrod.zombie_alert:
lightningrod.zombie_alert = True
else:
LOG.info(" --> No service tunnels to restore.")
def _loadServicesConf(self):
"""Load services.json JSON configuration.
@ -347,24 +367,106 @@ class ServiceManager(Module.Module):
return services_conf
def _wstunMon(self, wstun):
wfd_check = True
while (wfd_check):
try:
wp = psutil.Process(int(wstun.pid))
wstun_fd = wp.connections()[0].fd
if len(wp.connections()) != 0:
LOG.debug("WSTUN alive socket: " + str(wp.connections()))
wfd_check = False
except IndexError as err:
# LOG.error(str(err) + " - RETRY...")
pass
time.sleep(1)
class EventProcessor(pyinotify.ProcessEvent):
_methods = [
# "IN_CREATE",
# "IN_OPEN",
# "IN_ACCESS",
# "IN_ATTRIB",
"IN_CLOSE_NOWRITE",
"IN_CLOSE_WRITE",
"IN_DELETE",
"IN_DELETE_SELF",
# "IN_IGNORED",
# "IN_MODIFY",
# "IN_MOVE_SELF",
# "IN_MOVED_FROM",
# "IN_MOVED_TO",
# "IN_Q_OVERFLOW",
# "IN_UNMOUNT",
"default"
]
def process_generator(cls, method):
def _method_name(self, event):
if(event.maskname == "IN_CLOSE_WRITE"):
LOG.info("WSTUN FD SOCKET CLOSED: " + str(event.pathname))
LOG.debug(
"\nMethod name: process_{}()\n"
"Path name: {}\n"
"Event Name: {}\n".format(
method, event.pathname, event.maskname
)
)
os.kill(wstun.pid, signal.SIGKILL)
_method_name.__name__ = "process_{}".format(method)
setattr(cls, _method_name.__name__, _method_name)
for method in EventProcessor._methods:
process_generator(EventProcessor, method)
watch_manager = pyinotify.WatchManager()
event_notifier = pyinotify.ThreadedNotifier(
watch_manager, EventProcessor()
)
watch_this = os.path.abspath(
"/proc/" + str(wstun.pid) + "/fd/" + str(wstun_fd)
)
watch_manager.add_watch(watch_this, pyinotify.ALL_EVENTS)
event_notifier.start()
def _startWstun(self, public_port, local_port, event="no-set"):
opt_reverse = "-r" + str(public_port) + ":127.0.0.1:" + str(local_port)
try:
wstun = subprocess.Popen(
['/usr/bin/wstun', opt_reverse, self.wstun_url],
[CONF.services.wstun_bin, opt_reverse, self.wstun_url],
stdout=subprocess.PIPE
)
if(event != "boot"):
print("WSTUN start event:")
cmd_print = 'WSTUN exec: /usr/bin/wstun ' \
cmd_print = 'WSTUN exec: ' + str(CONF.services.wstun_bin) \
+ opt_reverse + ' ' + self.wstun_url
print(" - " + str(cmd_print))
LOG.debug(cmd_print)
# WSTUN MON
# ##############################################################
Thread(
target=self._wstunMon,
args=(wstun,)
).start()
# self._wstunMon(wstun)
# ##############################################################
except Exception as err:
LOG.error("Error spawning WSTUN process: " + str(err))
wstun = None

View File

@ -22,11 +22,6 @@ from iotronic_lightningrod.modules import Module
from iotronic_lightningrod.modules import utils
import iotronic_lightningrod.wampmessage as WM
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
import importlib as imp
import inspect
@ -35,16 +30,36 @@ import OpenSSL.crypto
import os
import time
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
proxy_opts = [
cfg.StrOpt(
'proxy',
choices=[('nginx', ('nginx proxy')), ],
help=('Proxy for WebServices Manager')
),
]
CONF = cfg.CONF
webservice_group = cfg.OptGroup(
name='webservices', title='WebServices options'
)
CONF.register_group(webservice_group)
CONF.register_opts(proxy_opts, group=webservice_group)
class WebServiceManager(Module.Module):
def __init__(self, board, session):
super(WebServiceManager, self).__init__("WebServiceManager", board)
LOG.info(" - Proxy used: " + CONF.proxy.upper())
LOG.info(" - Proxy used: " + CONF.webservices.proxy.upper())
try:
proxy_type = CONF.proxy
proxy_type = CONF.webservices.proxy
path = package_path + "/modules/proxies/" + proxy_type + ".py"
if os.path.exists(path):

View File

@ -9,3 +9,4 @@ httplib2>=0.9.1 # MIT
psutil>=5.4.7 # BSD
oslo.config>=5.1.0 # Apache-2.0
oslo.log>=3.36.0 # Apache-2.0
pyinotify>=0.9.6;sys_platform!='win32' and sys_platform!='darwin' and sys_platform!='sunos5' # MIT

View File

@ -55,7 +55,9 @@ else:
+ 'etc/iotronic/iotronic.conf /etc/iotronic/iotronic.conf')
print(' - iotronic.conf - Created.')
else:
print(' - iotronic.conf - Already exists.')
os.system('cp ' + py_dist_pack + '/iotronic_lightningrod/'
+ 'etc/iotronic/iotronic.conf /etc/iotronic/iotronic.conf')
print(' - iotronic.conf - Overwritten.')
if not os.path.exists('/var/lib/iotronic/'):

View File

@ -51,4 +51,4 @@ basepython = python3.5
show-source = True
builtins = _
ignore = E711,E712,H404,H405,E123,E125,E901
exclude = .venv,.git,.tox,dist,doc,etc,*lib/python*,*egg,build,iotronic_lightningrod/plugins/plugins_examples/
exclude = .venv,.git,.tox,dist,doc,etc,*lib/python*,*egg,build,iotronic_lightningrod/plugins/plugins_examples/,STUFF