From 0bce1217af190bf5cec759213573b6f4d0371037 Mon Sep 17 00:00:00 2001 From: mak-454 Date: Mon, 27 Feb 2017 14:55:29 +0530 Subject: [PATCH] NFP - Failure handling in core Added following support : 1) Watchdog support for, 1.1) Sequenced events. 1.2) APIs sent to controller. 1.3) Works delivered to threads in worker process. 2) Path support, 2.1) NFP modules can now identify their events with a Path. CREATE/UPDATE/DELETE 2.2) Core offloads handling of following conditions on path, -> DELETE events while CREATE is going on. -> Discard delayed responses from controller on a path. -> UPDATE while an UPDATE is going on. 3) Support for Event Context, 3.1) Inherently passed along with each event. 3.2) Modules can choose to override. 3.3) Maintained as a python GT context so that all methods executing in that thread get access without being explicitly passed. Change-Id: I6526737a57271cf8d24d498d97474e8583ccc59d Partial-Bug: 1668198 --- .testr.conf | 2 +- .../neutron/tests/unit/nfp/core/nfp_module.py | 4 + .../tests/unit/nfp/core/test_process_model.py | 160 ++++++----- gbpservice/nfp/core/context.py | 85 +++++- gbpservice/nfp/core/controller.py | 249 +++++++++--------- gbpservice/nfp/core/event.py | 48 ++-- gbpservice/nfp/core/executor.py | 10 +- gbpservice/nfp/core/launcher.py | 17 +- gbpservice/nfp/core/log.py | 65 +---- gbpservice/nfp/core/manager.py | 219 ++++++++------- gbpservice/nfp/core/path.py | 169 ++++++++++++ gbpservice/nfp/core/poll.py | 81 ------ gbpservice/nfp/core/sequencer.py | 13 + gbpservice/nfp/core/threadpool.py | 2 +- gbpservice/nfp/core/watchdog.py | 113 ++++++++ gbpservice/nfp/core/worker.py | 127 +++++---- 16 files changed, 841 insertions(+), 523 deletions(-) create mode 100644 gbpservice/nfp/core/path.py delete mode 100644 gbpservice/nfp/core/poll.py create mode 100644 gbpservice/nfp/core/watchdog.py diff --git a/.testr.conf b/.testr.conf index 858fae61f..e6d51d85e 100644 --- a/.testr.conf +++ b/.testr.conf @@ -2,6 +2,6 @@ 
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \ OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \ OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-120} \ - ${PYTHON:-python} -m subunit.run discover -t ./ ${OS_TEST_PATH:-./gbpservice} $LISTOPT $IDOPTION + ${PYTHON:-python} -m subunit.run discover -t ./ ${OS_TEST_PATH:-./gbpservice/neutron/tests/unit/nfp/core/} $LISTOPT $IDOPTION test_id_option=--load-list $IDFILE test_list_option=--list diff --git a/gbpservice/neutron/tests/unit/nfp/core/nfp_module.py b/gbpservice/neutron/tests/unit/nfp/core/nfp_module.py index 11a1db531..c8c6d0483 100644 --- a/gbpservice/neutron/tests/unit/nfp/core/nfp_module.py +++ b/gbpservice/neutron/tests/unit/nfp/core/nfp_module.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License.from gbpservice.neutron.nsf.core import main +from gbpservice.nfp.core import context as nfp_context from gbpservice.nfp.core import event from gbpservice.nfp.core import module as nfp_api from oslo_log import log as logging @@ -30,6 +31,9 @@ class EventsHandler(nfp_api.NfpEventHandler): self.controller = controller def handle_event(self, event): + event.context['log_context']['namespace'] = event.desc.target + nfp_context.init(event.context) + if event.id == 'TEST_EVENT_ACK_FROM_WORKER': self.controller.event_ack_handler_cb_obj.set() diff --git a/gbpservice/neutron/tests/unit/nfp/core/test_process_model.py b/gbpservice/neutron/tests/unit/nfp/core/test_process_model.py index 031416c18..e0a2db54b 100644 --- a/gbpservice/neutron/tests/unit/nfp/core/test_process_model.py +++ b/gbpservice/neutron/tests/unit/nfp/core/test_process_model.py @@ -10,9 +10,11 @@ # License for the specific language governing permissions and limitations # under the License. 
+from gbpservice.nfp.core import context as nfp_context from gbpservice.nfp.core import controller as nfp_controller from gbpservice.nfp.core import event as nfp_event from gbpservice.nfp.core import log as nfp_logging +from gbpservice.nfp.core import manager as nfp_manager from gbpservice.nfp.core import worker as nfp_worker import mock import multiprocessing as multiprocessing @@ -51,11 +53,10 @@ class MockedPipe(object): class MockedProcess(object): def __init__(self, parent_pipe=None, child_pipe=None, - lock=None, controller=None): + controller=None): self.parent_pipe = parent_pipe self.child_pipe = child_pipe - self.lock = lock self.controller = controller self.daemon = True self.pid = random.randint(8888, 9999) @@ -64,14 +65,12 @@ class MockedProcess(object): self.worker = nfp_worker.NfpWorker({}, threads=0) self.worker.parent_pipe = self.parent_pipe self.worker.pipe = self.child_pipe - self.worker.lock = self.lock self.worker.controller = nfp_controller.NfpController( self.controller._conf, singleton=False) # fork a new controller object self.worker.controller.PROCESS_TYPE = "worker" self.worker.controller._pipe = self.worker.pipe - self.worker.controller._lock = self.worker.lock self.worker.controller._event_handlers = ( self.controller._event_handlers) self.worker.event_handlers = self.controller.get_event_handlers() @@ -82,34 +81,31 @@ class MockedProcess(object): self.controller._process_event) -class MockedLock(object): - def __init__(self): - pass - - def acquire(self): - pass - - def release(self): - pass - - def mocked_pipe(**kwargs): return MockedPipe(), MockedPipe() def mocked_process(target=None, args=None): return MockedProcess(parent_pipe=args[1], - child_pipe=args[2], lock=args[3], - controller=args[4]) - - -def mocked_lock(): - return MockedLock() + child_pipe=args[2], + controller=args[3]) nfp_controller.PIPE = mocked_pipe nfp_controller.PROCESS = mocked_process -nfp_controller.LOCK = mocked_lock + + +class MockedWatchdog(object): + + def 
__init__(self, handler, seconds=1, event=None): + if event and event.desc.type == 'poll_event': + # time.sleep(seconds) + handler(event=event) + + def cancel(self): + pass + +nfp_manager.WATCHDOG = MockedWatchdog class Object(object): @@ -120,6 +116,9 @@ class Object(object): class Test_Process_Model(unittest.TestCase): + def setUp(self): + nfp_context.init() + def _mocked_fork(self, args): proc = Object() pid = random.randint(8888, 9999) @@ -277,7 +276,10 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(False) - def mocked_pipe_send(self, pipe, lock, event): + def mocked_compress(self, event): + pass + + def mocked_pipe_send(self, pipe, event): if event.id == 'EVENT_1': if hasattr(event, 'desc'): if event.desc.worker: @@ -328,10 +330,12 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(called) @mock.patch( - 'gbpservice.nfp.core.controller.NfpController.pipe_send' - ) - def test_load_distribution_to_workers(self, mock_pipe_send): + 'gbpservice.nfp.core.controller.NfpController.pipe_send') + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def test_load_distribution_to_workers(self, mock_compress, mock_pipe_send): mock_pipe_send.side_effect = self.mocked_pipe_send + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -501,20 +505,22 @@ class Test_Process_Model(unittest.TestCase): controller.event_complete(event_1) controller.event_complete(event_2) - @mock.patch( - 'gbpservice.nfp.core.controller.NfpController.pipe_send' - ) - def test_poll_event(self, mock_pipe_send): - mock_pipe_send.side_effect = self.mocked_pipe_send - conf = oslo_config.CONF - conf.nfp_modules_path = NFP_MODULES_PATH - controller = nfp_controller.NfpController(conf, singleton=False) - self.controller = controller - nfp_controller.load_nfp_modules(conf, controller) - # Mock launching of a worker - 
controller.launch(1) - controller._update_manager() - self.controller = controller + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.pipe_send') + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def test_poll_event(self, mock_compress, mock_pipe_send): + mock_pipe_send.side_effect = self.mocked_pipe_send + mock_compress.side_effect = self.mocked_compress + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf, singleton=False) + self.controller = controller + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(1) + controller._update_manager() + self.controller = controller wait_obj = multiprocessing.Event() setattr(controller, 'poll_event_wait_obj', wait_obj) @@ -526,6 +532,9 @@ class Test_Process_Model(unittest.TestCase): setattr(event, 'desc', desc) event.desc.worker = controller.get_childrens().keys()[0] + ctx = nfp_context.get() + ctx['log_context']['namespace'] = 'nfp_module' + controller.poll_event(event, spacing=1) # controller._manager.manager_run() @@ -533,7 +542,7 @@ class Test_Process_Model(unittest.TestCase): # relinquish for 1sec time.sleep(1) - controller.poll() + # controller.poll() controller.poll_event_wait_obj.wait(0.1) called = controller.poll_event_wait_obj.is_set() end_time = time.time() @@ -541,9 +550,11 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(round(end_time - start_time) == 1.0) @mock.patch( - 'gbpservice.nfp.core.controller.NfpController.pipe_send' - ) - def test_poll_event_with_no_worker(self, mock_pipe_send): + 'gbpservice.nfp.core.controller.NfpController.pipe_send') + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def test_poll_event_with_no_worker(self, mock_compress, mock_pipe_send): + mock_compress.side_effect = self.mocked_compress mock_pipe_send.side_effect = self.mocked_pipe_send conf = oslo_config.CONF conf.nfp_modules_path = 
NFP_MODULES_PATH @@ -566,6 +577,9 @@ class Test_Process_Model(unittest.TestCase): # Explicitly make it none event.desc.worker = None + ctx = nfp_context.get() + ctx['log_context']['namespace'] = 'nfp_module' + controller.poll_event(event, spacing=1) # controller._manager.manager_run() @@ -573,7 +587,7 @@ class Test_Process_Model(unittest.TestCase): # relinquish for 1sec time.sleep(1) - controller.poll() + # controller.poll() controller.poll_event_wait_obj.wait(0.1) called = controller.poll_event_wait_obj.is_set() end_time = time.time() @@ -581,10 +595,14 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(round(end_time - start_time) == 1.0) @mock.patch( - 'gbpservice.nfp.core.controller.NfpController.pipe_send' - ) - def test_poll_event_with_decorator_spacing(self, mock_pipe_send): + 'gbpservice.nfp.core.controller.NfpController.pipe_send') + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def test_poll_event_with_decorator_spacing(self, + mock_compress, mock_pipe_send): + mock_pipe_send.side_effect = self.mocked_pipe_send + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -606,6 +624,8 @@ class Test_Process_Model(unittest.TestCase): # Explicitly make it none event.desc.worker = None + ctx = nfp_context.get() + ctx['log_context']['namespace'] = 'nfp_module' controller.poll_event(event) # controller._manager.manager_run() @@ -613,14 +633,17 @@ class Test_Process_Model(unittest.TestCase): # relinquish for 2secs time.sleep(2) - controller.poll() + # controller.poll() controller.poll_event_dec_wait_obj.wait(0.1) called = controller.poll_event_dec_wait_obj.is_set() end_time = time.time() self.assertTrue(called) self.assertTrue(round(end_time - start_time) == 2.0) - def test_poll_event_with_no_spacing(self): + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def 
test_poll_event_with_no_spacing(self, mock_compress): + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -643,7 +666,10 @@ class Test_Process_Model(unittest.TestCase): # self.assertTrue(False) self.assertTrue(True) - def test_poll_event_with_no_handler(self): + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def test_poll_event_with_no_handler(self, mock_compress): + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -666,10 +692,12 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(False) @mock.patch( - 'gbpservice.nfp.core.manager.NfpResourceManager._event_acked' - ) - def test_event_ack_from_worker(self, mock_event_acked): + 'gbpservice.nfp.core.manager.NfpResourceManager._event_acked') + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def test_event_ack_from_worker(self, mock_compress, mock_event_acked): mock_event_acked.side_effect = self._mocked_event_ack + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -704,7 +732,11 @@ class Test_Process_Model(unittest.TestCase): called = controller.event_ack_handler_cb_obj.is_set() self.assertTrue(called) - def test_post_event_from_worker(self): + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress' + ) + def test_post_event_from_worker(self, mock_compress): + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -734,7 +766,11 @@ class Test_Process_Model(unittest.TestCase): called = 
controller.post_event_worker_wait_obj.is_set() self.assertTrue(called) - def test_poll_event_from_worker(self): + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress' + ) + def test_poll_event_from_worker(self, mock_compress): + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -768,13 +804,17 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(called) time.sleep(1) - controller.poll() + # controller.poll() controller.poll_event_poll_wait_obj.wait(1) called = controller.poll_event_poll_wait_obj.is_set() self.assertTrue(called) - def test_poll_event_cancelled_from_worker(self): + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress' + ) + def test_poll_event_cancelled_from_worker(self, mock_compress): + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -810,14 +850,14 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(called) time.sleep(1) - controller.poll() + # controller.poll() controller.poll_event_poll_wait_obj.wait(1) called = controller.poll_event_poll_wait_obj.is_set() self.assertTrue(called) time.sleep(1) - controller.poll() + # controller.poll() controller.poll_event_poll_wait_obj.wait(1) called = controller.poll_event_poll_wait_obj.is_set() diff --git a/gbpservice/nfp/core/context.py b/gbpservice/nfp/core/context.py index ed93010dd..6317c230b 100644 --- a/gbpservice/nfp/core/context.py +++ b/gbpservice/nfp/core/context.py @@ -12,28 +12,85 @@ import threading -nfp_context_store = threading.local() + +class LogContext(object): + + def __init__(self, data): + self.data = data + + def purge(self): + if self.data: + return { + 'meta_id': self.data.get('meta_id', '-'), + 'nfi_id': self.data.get('nfi_id', '-'), + 'nfd_id': 
self.data.get('nfd_id', '-'), + 'path': self.data.get('path'), + 'auth_token': self.data.get('auth_token'), + 'namespace': self.data.get('namespace') + } + return self.data + + +class CoreContext(object): + + def __init__(self, data): + self.data = data + + def purge(self): + return { + 'log_context': LogContext(self.data.get('log_context')).purge(), + 'event_desc': self.data.get('event_desc') + } class NfpContext(object): - def __init__(self, context): - self.context = context + def __init__(self, data): + self.data = data - def get_context(self): - return self.context + def purge(self): + return CoreContext(self.data).purge() -def store_nfp_context(context): - nfp_context_store.context = NfpContext(context) +Context = threading.local() -def clear_nfp_context(): - nfp_context_store.context = None +def init_log_context(): + return { + 'meta_id': '-', + 'nfi_id': '-', + 'nfd_id': '-', + 'path': '-', + 'auth_token': None, + 'namespace': None + } -def get_nfp_context(): - context = getattr(nfp_context_store, 'context', None) - if context: - return context.get_context() - return {} +def init(data=None): + if not data: + data = {} + if 'log_context' not in data.keys(): + data['log_context'] = init_log_context() + if 'event_desc' not in data.keys(): + data['event_desc'] = {} + Context.context = NfpContext(data) + context = getattr(Context, 'context') + return context.data + + +def get(): + try: + context = getattr(Context, 'context') + return context.data + except AttributeError: + return init() + + +def purge(): + try: + context = getattr(Context, 'context') + return context.purge() + except AttributeError: + init() + context = getattr(Context, 'context') + return context.purge() diff --git a/gbpservice/nfp/core/controller.py b/gbpservice/nfp/core/controller.py index 6e8474696..491493bba 100644 --- a/gbpservice/nfp/core/controller.py +++ b/gbpservice/nfp/core/controller.py @@ -14,11 +14,11 @@ import ast import eventlet eventlet.monkey_patch() +import collections import 
multiprocessing import operator import os import pickle -import Queue import sys import time import zlib @@ -28,11 +28,11 @@ from oslo_service import service as oslo_service from gbpservice.nfp.core import cfg as nfp_cfg from gbpservice.nfp.core import common as nfp_common +from gbpservice.nfp.core import context from gbpservice.nfp.core import event as nfp_event from gbpservice.nfp.core import launcher as nfp_launcher from gbpservice.nfp.core import log as nfp_logging from gbpservice.nfp.core import manager as nfp_manager -from gbpservice.nfp.core import poll as nfp_poll from gbpservice.nfp.core import rpc as nfp_rpc from gbpservice.nfp.core import worker as nfp_worker @@ -42,9 +42,9 @@ from neutron.common import config LOG = nfp_logging.getLogger(__name__) PIPE = multiprocessing.Pipe -LOCK = multiprocessing.Lock PROCESS = multiprocessing.Process identify = nfp_common.identify +deque = collections.deque # REVISIT (mak): fix to pass compliance check config = config @@ -76,8 +76,8 @@ class NfpService(object): def register_events(self, event_descs, priority=0): """Register event handlers with core. """ - logging_context = nfp_logging.get_logging_context() - module = logging_context['namespace'] + nfp_context = context.get() + module = nfp_context['log_context']['namespace'] # REVISIT (mak): change name to register_event_handlers() ? for event_desc in event_descs: self._event_handlers.register( @@ -104,22 +104,12 @@ class NfpService(object): event = None try: event = nfp_event.Event(**kwargs) - # Get the logging context stored in thread - logging_context = nfp_logging.get_logging_context() - # Log metadata for event handling code - event.context = logging_context except AssertionError as aerr: message = "%s" % (aerr) LOG.exception(message) return event def post_graph(self, graph_nodes, root_node): - """Post graph. - - Since graph is also implemneted with events, - first post all the node events followed by - root node event. 
- """ for node in graph_nodes: self.post_event(node) @@ -138,6 +128,13 @@ class NfpService(object): event.desc.flag = nfp_event.EVENT_NEW event.desc.pid = os.getpid() event.desc.target = module + if event.lifetime == -1: + event.lifetime = nfp_event.EVENT_DEFAULT_LIFETIME + if not event.context: + # Log nfp_context for event handling code + event.context = context.purge() + event.desc.path_type = event.context['event_desc'].get('path_type') + event.desc.path_key = event.context['event_desc'].get('path_key') return event # REVISIT (mak): spacing=0, caller must explicitly specify @@ -148,35 +145,46 @@ class NfpService(object): descriptor preparation. NfpController class implements the required functionality. """ - logging_context = nfp_logging.get_logging_context() - module = logging_context['namespace'] + nfp_context = context.get() + module = nfp_context['log_context']['namespace'] handler, ev_spacing = ( self._event_handlers.get_poll_handler(event.id, module=module)) assert handler, "No poll handler found for event %s" % (event.id) assert spacing or ev_spacing, "No spacing specified for polling" if ev_spacing: spacing = ev_spacing - refuuid = event.desc.uuid - event = self._make_new_event(event) - event.lifetime = 0 + if event.desc.type != nfp_event.POLL_EVENT: + event = self._make_new_event(event) + event.desc.uuid = event.desc.uuid + ":" + "POLL_EVENT" event.desc.type = nfp_event.POLL_EVENT event.desc.target = module + event.desc.flag = None kwargs = {'spacing': spacing, - 'max_times': max_times, - 'ref': refuuid} + 'max_times': max_times} poll_desc = nfp_event.PollDesc(**kwargs) setattr(event.desc, 'poll_desc', poll_desc) + + if not event.context: + # Log nfp_context for event handling code + event.context = context.purge() + event.desc.path_type = event.context['event_desc'].get('path_type') + event.desc.path_key = event.context['event_desc'].get('path_key') return event def event_complete(self, event, result=None): """To declare and event complete. 
""" try: pickle.dumps(result) + uuid = event.desc.uuid + event = self._make_new_event(event) + event.desc.uuid = uuid event.sequence = False event.desc.flag = nfp_event.EVENT_COMPLETE event.result = result + event.context = {} + event.data = {} return event except Exception as e: raise e @@ -224,60 +232,84 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): self._conf = conf self._pipe = None # Queue to stash events. - self._stashq = multiprocessing.Queue() + self._stashq = deque() self._manager = nfp_manager.NfpResourceManager(conf, self) self._worker = nfp_worker.NfpWorker(conf) - self._poll_handler = nfp_poll.NfpPollHandler(conf) # ID of process handling this controller obj self.PROCESS_TYPE = "distributor" def compress(self, event): # REVISIT (mak) : zip only if length is > than threshold (1k maybe) - if event.data and not event.zipped: + if not event.zipped: event.zipped = True - event.data = zlib.compress(str({'cdata': event.data})) + data = {'context': event.context} + event.context = {} + if event.data: + data['data'] = event.data + event.data = zlib.compress(str(data)) def decompress(self, event): - if event.data and event.zipped: + if event.zipped: try: data = ast.literal_eval( zlib.decompress(event.data)) - event.data = data['cdata'] + event.data = data.get('data') + event.context = data['context'] event.zipped = False except Exception as e: - message = "Failed to decompress event data, Reason: %s" % ( + message = "Failed to decompress event data, Reason: %r" % ( e) LOG.error(message) raise e - def pipe_lock(self, lock): - if lock: - lock.acquire() + def is_picklable(self, event): + """To check event is picklable or not. 
+ For sending event through pipe it must be picklable + """ + try: + pickle.dumps(event) + except Exception as e: + message = "(event - %s) is not picklable, Reason: %s" % ( + event.identify(), e) + assert False, message - def pipe_unlock(self, lock): - if lock: - lock.release() - - def pipe_recv(self, pipe, lock): - self.pipe_lock(lock) - event = pipe.recv() - self.pipe_unlock(lock) + def pipe_recv(self, pipe): + event = None + try: + event = pipe.recv() + except Exception as exc: + LOG.debug("Failed to receive event from pipe " + "with exception - %r - will retry.." % (exc)) + eventlet.greenthread.sleep(1.0) if event: self.decompress(event) return event - def pipe_send(self, pipe, lock, event): + def pipe_send(self, pipe, event, resending=False): + self.is_picklable(event) + try: - self.compress(event) - self.pipe_lock(lock) - pipe.send(event) - self.pipe_unlock(lock) + # If there is no reader yet + if not pipe.poll(): + self.compress(event) + pipe.send(event) + return True except Exception as e: - message = "Failed to send data via pipe, Reason: %s" % (e) - LOG.error(message) - raise e + message = ("Failed to send event - %s via pipe" + "- exception - %r - will resend" % ( + event.identify(), e)) + LOG.debug(message) + + # If the event is being sent by resending task + # then dont append here, task will put back the + # event at right location + if not resending: + # If couldnt send event.. 
stash it so that + # resender task will send event again + self._stashq.append(event) + return False def _fork(self, args): proc = PROCESS(target=self.child, args=args) @@ -285,6 +317,28 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): proc.start() return proc + def _resending_task(self): + while(True): + try: + event = self._stashq.popleft() + if self.PROCESS_TYPE != "worker": + evm = self._manager._get_event_manager(event.desc.worker) + LOG.debug("Resending event - %s" % (event.identify())) + sent = self.pipe_send(evm._pipe, event, resending=True) + else: + sent = self.pipe_send(self._pipe, event, resending=True) + # Put back in front + if not sent: + self._stashq.appendleft(event) + except IndexError: + pass + except Exception as e: + message = ("Unexpected exception - %r - while" + "sending event - %s" % (e, event.identify())) + LOG.error(message) + + eventlet.greenthread.sleep(0.1) + def _manager_task(self): while True: # Run 'Manager' here to monitor for workers and @@ -295,9 +349,9 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): def _update_manager(self): childs = self.get_childrens() for pid, wrapper in childs.iteritems(): - pipe, lock = wrapper.child_pipe_map[pid] + pipe = wrapper.child_pipe_map[pid] # Inform 'Manager' class about the new_child. - self._manager.new_child(pid, pipe, lock) + self._manager.new_child(pid, pipe) def _process_event(self, event): self._manager.process_events([event]) @@ -318,29 +372,21 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): parent_pipe, child_pipe = PIPE(duplex=True) - # Sometimes Resource Temporarily Not Available (errno=11) - # is observed with python pipe. There could be many reasons, - # One theory is if read & - # write happens at the same instant, pipe does report this - # error. Using lock to avoid this. - lock = LOCK() - # Registered event handlers of nfp module. # Workers need copy of this data to dispatch an # event to module. 
- proc = self._fork(args=(wrap.service, parent_pipe, child_pipe, - lock, self)) + proc = self._fork(args=(wrap.service, parent_pipe, child_pipe, self)) message = ("Forked a new child: %d" "Parent Pipe: % s, Child Pipe: % s") % ( - proc.pid, str(parent_pipe), str(child_pipe)) + proc.pid, str(parent_pipe), str(child_pipe)) LOG.info(message) try: - wrap.child_pipe_map[proc.pid] = (parent_pipe, lock) + wrap.child_pipe_map[proc.pid] = parent_pipe except AttributeError: setattr(wrap, 'child_pipe_map', {}) - wrap.child_pipe_map[proc.pid] = (parent_pipe, lock) + wrap.child_pipe_map[proc.pid] = parent_pipe self._worker_process[proc.pid] = proc return proc.pid @@ -388,20 +434,10 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): # One task to manage the resources - workers & events. eventlet.spawn_n(self._manager_task) - # Oslo periodic task to poll for timer events - nfp_poll.PollingTask(self._conf, self) + eventlet.spawn_n(self._resending_task) # Oslo periodic task for state reporting nfp_rpc.ReportStateTask(self._conf, self) - def poll_add(self, event, timeout, callback): - """Add an event to poller. """ - self._poll_handler.poll_add( - event, timeout, callback) - - def poll(self): - """Invoked in periodic task to poll for timedout events. """ - self._poll_handler.run() - def report_state(self): """Invoked by report_task to report states of all agents. """ for value in self._rpc_agents.itervalues(): @@ -472,7 +508,7 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): LOG.debug(message) # Send it to the distributor process - self.pipe_send(self._pipe, self._lock, event) + self.pipe_send(self._pipe, event) else: message = ("(event - %s) - new event in distributor" "processing event") % (event.identify()) @@ -518,7 +554,7 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): event = super(NfpController, self).poll_event( event, spacing=spacing, max_times=max_times) # Send to the distributor process. 
- self.pipe_send(self._pipe, self._lock, event) + self.pipe_send(self._pipe, event) def stop_poll_event(self, key, id): """To stop the running poll event @@ -526,58 +562,26 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): :param key: key of polling event :param id: id of polling event """ - key = key + ":" + id + key = key + ":" + id + ":" + "POLL_EVENT" event = self.new_event(id='STOP_POLL_EVENT', data={'key': key}) event.desc.type = nfp_event.POLL_EVENT event.desc.flag = nfp_event.POLL_EVENT_STOP if self.PROCESS_TYPE == "worker": - self.pipe_send(self._pipe, self._lock, event) + self.pipe_send(self._pipe, event) else: self._manager.process_events([event]) - def stash_event(self, event): - """To stash an event. - - This will be invoked by worker process. - Put this event in queue, distributor will - pick it up. - - Executor: worker-process + def path_complete_event(self): + """Create event for path completion """ - if self.PROCESS_TYPE == "distributor": - message = "(event - %s) - distributor cannot stash" % ( - event.identify()) - LOG.debug(message) + nfp_context = context.get() + event = self.new_event(id='PATH_COMPLETE') + event.desc.path_type = nfp_context['event_desc'].get('path_type') + event.desc.path_key = nfp_context['event_desc'].get('path_key') + if self.PROCESS_TYPE == "worker": + self.pipe_send(self._pipe, event) else: - message = "(event - %s) - stashed" % (event.identify()) - LOG.debug(message) - self._stashq.put(event) - - def get_stashed_events(self): - """To get stashed events. - - Returns available number of stashed events - as list. Will be invoked by distributor, - worker cannot pull. 
- - Executor: distributor-process - """ - events = [] - # return at max 5 events - maxx = 1 - # wait sometime for first event in the queue - timeout = 0.1 - while maxx: - try: - event = self._stashq.get(timeout=timeout) - self.decompress(event) - events.append(event) - timeout = 0 - maxx -= 1 - except Queue.Empty: - maxx = 0 - pass - return events + self._manager.process_events([event]) def event_complete(self, event, result=None): """To mark an event complete. @@ -600,7 +604,7 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): self._manager.process_events([event]) else: # Send to the distributor process. - self.pipe_send(self._pipe, self._lock, event) + self.pipe_send(self._pipe, event) def load_nfp_modules(conf, controller): @@ -615,6 +619,7 @@ def load_nfp_modules(conf, controller): def load_nfp_modules_from_path(conf, controller, path): """ Load all nfp modules from configured directory. """ pymodules = [] + nfp_context = context.get() try: base_module = __import__(path, globals(), locals(), ['modules'], -1) @@ -629,7 +634,7 @@ def load_nfp_modules_from_path(conf, controller, path): pymodule = eval('pymodule.%s' % (pyfile[:-3])) try: namespace = pyfile[:-3].split(".")[-1] - nfp_logging.store_logging_context(namespace=namespace) + nfp_context['log_context']['namespace'] = namespace pymodule.nfp_module_init(controller, conf) pymodules += [pymodule] message = "(module - %s) - Initialized" % ( @@ -664,10 +669,11 @@ def controller_init(conf, nfp_controller): def nfp_modules_post_init(conf, nfp_modules, nfp_controller): + nfp_context = context.get() for module in nfp_modules: try: namespace = module.__name__.split(".")[-1] - nfp_logging.store_logging_context(namespace=namespace) + nfp_context['log_context']['namespace'] = namespace module.nfp_module_post_init(nfp_controller, conf) except AttributeError: message = ("(module - %s) - does not implement" @@ -701,6 +707,7 @@ def load_module_opts(conf): def main(): + context.init() args, module = 
extract_module(sys.argv[1:]) conf = nfp_cfg.init(module, args) conf.module = module diff --git a/gbpservice/nfp/core/event.py b/gbpservice/nfp/core/event.py index 032e85c37..1622a0824 100644 --- a/gbpservice/nfp/core/event.py +++ b/gbpservice/nfp/core/event.py @@ -26,7 +26,6 @@ identify = nfp_common.identify SCHEDULE_EVENT = 'schedule_event' POLL_EVENT = 'poll_event' STASH_EVENT = 'stash_event' -EVENT_EXPIRED = 'event_expired' """Event Flag """ EVENT_NEW = 'new_event' @@ -34,6 +33,8 @@ EVENT_COMPLETE = 'event_done' EVENT_ACK = 'event_ack' POLL_EVENT_STOP = 'poll_event_stop' +EVENT_DEFAULT_LIFETIME = 600 + """Sequencer status. """ SequencerEmpty = nfp_seq.SequencerEmpty SequencerBusy = nfp_seq.SequencerBusy @@ -86,19 +87,29 @@ class EventDesc(object): self.target = None # ID of graph of which this event is part of self.graph = None + # Type of path to which this event belongs CREATE/UPDATE/DELETE + self.path_type = kwargs.get('path_type') + # Unique key for the path + self.path_key = kwargs.get('path_key') + # Marks whether an event was acked or not + self.acked = False def from_desc(self, desc): self.type = desc.type self.flag = desc.flag self.worker = desc.worker self.poll_desc = desc.poll_desc + self.path_type = desc.path_type + self.path_key = desc.path_key def to_dict(self): return {'uuid': self.uuid, 'type': self.type, 'flag': self.flag, 'worker': self.worker, - 'poll_desc': self.poll_desc + 'poll_desc': self.poll_desc, + 'path_type': self.path_type, + 'path_key': self.path_key } """Defines the event structure. @@ -126,7 +137,7 @@ class Event(object): # Handler of the event. self.handler = kwargs.get('handler') # Lifetime of the event in seconds. - self.lifetime = kwargs.get('lifetime', 0) + self.lifetime = kwargs.get('lifetime', -1) # Identifies whether event.data is zipped self.zipped = False # Log metadata context @@ -247,13 +258,12 @@ class NfpEventHandlers(object): def get_poll_handler(self, event_id, module=None): """Get the poll handler for event_id. 
""" - ph = None - spacing = 0 + ph, spacing = None, None try: if module: ph = self._event_desc_table[event_id]['modules'][module][0][1] - spacing = self._event_desc_table[event_id]['modules'][ - module][0][2] + spacing = self._event_desc_table[ + event_id]['modules'][module][0][2] else: priorities = ( self._event_desc_table[event_id]['priority'].keys()) @@ -261,8 +271,8 @@ class NfpEventHandlers(object): ph = ( self._event_desc_table[ event_id]['priority'][priority][0][1]) - spacing = self._event_desc_table[event_id]['priority'][ - priority][0][2] + spacing = self._event_desc_table[ + event_id]['priority'][priority][0][2] finally: message = "%s - Returning poll handler" % ( self._log_meta(event_id, ph)) @@ -292,16 +302,13 @@ class NfpEventHandlers(object): class NfpEventManager(object): - def __init__(self, conf, controller, sequencer, pipe=None, pid=-1, - lock=None): + def __init__(self, conf, controller, sequencer, pipe=None, pid=-1): self._conf = conf self._controller = controller # PID of process to which this event manager is associated self._pid = pid # Duplex pipe to read & write events self._pipe = pipe - # Lock over pipe access - self._lock = lock # Cache of UUIDs of events which are dispatched to # the worker which is handled by this em. self._cache = deque() @@ -315,7 +322,7 @@ class NfpEventManager(object): else: return "(event_manager - %d" % (self._pid) - def _wait_for_events(self, pipe, lock, timeout=0.01): + def _wait_for_events(self, pipe, timeout=0.01): """Wait & pull event from the pipe. 
Wait till timeout for the first event and then @@ -324,12 +331,11 @@ class NfpEventManager(object): """ events = [] try: - self._controller.pipe_lock(lock) ret = pipe.poll(timeout) - self._controller.pipe_unlock(lock) if ret: - event = self._controller.pipe_recv(pipe, lock) - events.append(event) + event = self._controller.pipe_recv(pipe) + if event: + events.append(event) except multiprocessing.TimeoutError as err: message = "%s" % (err) LOG.exception(message) @@ -373,7 +379,7 @@ class NfpEventManager(object): verr = verr message = "%s - event not in cache" % ( self._log_meta(event)) - LOG.warn(message) + LOG.debug(message) def dispatch_event(self, event, event_type=None, inc_load=True, cache=True): @@ -392,7 +398,7 @@ class NfpEventManager(object): if event_type: event.desc.type = event_type # Send to the worker - self._controller.pipe_send(self._pipe, self._lock, event) + self._controller.pipe_send(self._pipe, event) self._load = (self._load + 1) if inc_load else self._load # Add to the cache @@ -401,4 +407,4 @@ class NfpEventManager(object): def event_watcher(self, timeout=0.01): """Watch for events. 
""" - return self._wait_for_events(self._pipe, self._lock, timeout=timeout) + return self._wait_for_events(self._pipe, timeout=timeout) diff --git a/gbpservice/nfp/core/executor.py b/gbpservice/nfp/core/executor.py index dd7d53398..80b986df9 100644 --- a/gbpservice/nfp/core/executor.py +++ b/gbpservice/nfp/core/executor.py @@ -13,6 +13,7 @@ from argparse import Namespace +from gbpservice.nfp.core import context from gbpservice.nfp.core import log as nfp_logging from gbpservice.nfp.core import threadpool as core_tp @@ -84,6 +85,10 @@ class TaskExecutor(object): self.pipe_line = [] self.fired = False + def dispatch(self, job): + context.init() + return job['method'](*job['args'], **job['kwargs']) + @check_in_use def fire(self): self.fired = True @@ -92,8 +97,7 @@ class TaskExecutor(object): "TaskExecutor - (job - %s) dispatched" % (str(job))) - th = self.thread_pool.dispatch( - job['method'], *job['args'], **job['kwargs']) + th = self.thread_pool.dispatch(self.dispatch, job) job['thread'] = th for job in self.pipe_line: @@ -214,7 +218,7 @@ class EventGraphExecutor(object): graph = self._graph(completed_node) if graph: if completed_node == graph['root']: - #Graph is complete here, remove from running_instances + # Graph is complete here, remove from running_instances self.running.pop(graph['id']) else: root = self._root(graph, completed_node) diff --git a/gbpservice/nfp/core/launcher.py b/gbpservice/nfp/core/launcher.py index c3774648c..eeda92137 100644 --- a/gbpservice/nfp/core/launcher.py +++ b/gbpservice/nfp/core/launcher.py @@ -11,6 +11,7 @@ # under the License. import os +import signal import time from oslo_service import service as oslo_service @@ -33,12 +34,24 @@ ProcessLauncher = oslo_service.ProcessLauncher class NfpLauncher(ProcessLauncher): def __init__(self, conf): + # Add SIGALARM to ignore_signals, because core + # uses SIGALRM for watchdog, while oslo uses the + # same for exit. 
+ # Signal handler is singleton class, changing here will + # have global effect. + self.signal_handler = oslo_service.SignalHandler() + self.signal_handler._ignore_signals += ('SIGALRM',) + self.signal_handler._signals_by_name = dict( + (name, getattr(signal, name)) + for name in dir(signal) + if name.startswith("SIG") and + name not in self.signal_handler._ignore_signals) + super(NfpLauncher, self).__init__(conf) - def child(self, service, ppipe, cpipe, lock, controller): + def child(self, service, ppipe, cpipe, controller): service.parent_pipe = ppipe service.pipe = cpipe - service.lock = lock service.controller = controller self.launcher = self._child_process(service) while True: diff --git a/gbpservice/nfp/core/log.py b/gbpservice/nfp/core/log.py index cf8e1b838..02467eda6 100644 --- a/gbpservice/nfp/core/log.py +++ b/gbpservice/nfp/core/log.py @@ -14,10 +14,11 @@ from oslo_config import cfg as oslo_config from oslo_log import log as oslo_logging from oslo_utils import importutils +from gbpservice.nfp.core import context + import logging import os import sys -import threading EVENT = 50 logging.addLevelName(EVENT, "EVENT") @@ -80,9 +81,15 @@ class WrappedLogger(logging.Logger): return rv def _get_nfp_msg(self, msg): - context = getattr(logging_context_store, 'context', None) - if context: - msg = "%s %s" % (context.emit(), msg) + nfp_context = context.get() + log_context = nfp_context['log_context'] + if log_context: + ctxt = "[%s] [NFI:%s] [NFD:%s]" % (log_context.get( + 'meta_id', '-'), + log_context.get('nfi_id', '-'), + log_context.get('nfd_id', '-')) + msg = "%s %s" % (ctxt, msg) + component = '' if hasattr(CONF, 'module'): component = CONF.module @@ -94,7 +101,6 @@ class WrappedLogger(logging.Logger): # Prefix log meta id with every log if project is 'nfp' if extra and extra.get('project') == 'nfp': msg = self._get_nfp_msg(msg) - return super(WrappedLogger, self).makeRecord( name, level, fn, lno, msg, args, exc_info, func=func, extra=extra) @@ -104,57 
+110,8 @@ def init_logger(logger_class): logging.setLoggerClass(importutils.import_class(logger_class)) -logging_context_store = threading.local() - - -class NfpLogContext(object): - - def __init__(self, **kwargs): - self.meta_id = kwargs.get('meta_id', '-') - self.nfi_id = kwargs.get('nfi_id', '-') - self.nfd_id = kwargs.get('nfd_id', '-') - self.path = kwargs.get('path', '-') - self.auth_token = kwargs.get('auth_token', '') - self.namespace = kwargs.get('namespace', '') - - def emit(self): - return "[%s] [NFI:%s] [NFD:%s]" % (self.meta_id, self.nfi_id, - self.nfd_id) - - def to_dict(self): - return { - 'meta_id': self.meta_id, - 'nfi_id': self.nfi_id, - 'nfd_id': self.nfd_id, - 'path': self.path, - 'auth_token': self.auth_token, - 'namespace': self.namespace} - - def getLogger(name, **kwargs): kwargs.update(project='nfp') logger = NfpLogAdapter(logging.getLogger(name), kwargs) return logger - - -def store_logging_context(**kwargs): - context = NfpLogContext(**kwargs) - logging_context_store.context = context - - -def update_logging_context(**kwargs): - for key, val in kwargs.iteritems(): - if hasattr(logging_context_store.context, key): - setattr(logging_context_store.context, key, val) - - -def clear_logging_context(**kwargs): - logging_context_store.context = None - - -def get_logging_context(): - context = getattr(logging_context_store, 'context', None) - if context: - return context.to_dict() - return {} diff --git a/gbpservice/nfp/core/manager.py b/gbpservice/nfp/core/manager.py index 500146d95..74e7d2921 100644 --- a/gbpservice/nfp/core/manager.py +++ b/gbpservice/nfp/core/manager.py @@ -10,25 +10,23 @@ # License for the specific language governing permissions and limitations # under the License. 
-import collections import os from gbpservice.nfp.core import event as nfp_event from gbpservice.nfp.core import executor as nfp_executor from gbpservice.nfp.core import log as nfp_logging +from gbpservice.nfp.core import path as nfp_path from gbpservice.nfp.core import sequencer as nfp_sequencer +from gbpservice.nfp.core import watchdog as nfp_watchdog LOG = nfp_logging.getLogger(__name__) NfpEventManager = nfp_event.NfpEventManager NfpGraphExecutor = nfp_executor.EventGraphExecutor - -deque = collections.deque +WATCHDOG = nfp_watchdog.Watchdog def IS_SCHEDULED_EVENT_ACK(event): - return event.desc.type == nfp_event.SCHEDULE_EVENT and ( - event.desc.flag == nfp_event.EVENT_ACK - ) + return event.desc.flag == nfp_event.EVENT_ACK def IS_SCHEDULED_NEW_EVENT(event): @@ -45,9 +43,9 @@ def IS_EVENT_GRAPH(event): return event.desc.graph -def IS_POLL_EVENT_STOP(event): - return event.desc.type == nfp_event.POLL_EVENT and ( - event.desc.flag == nfp_event.POLL_EVENT_STOP) +def IS_PATH_COMPLETE_EVENT(event): + return event.id == 'PATH_COMPLETE' + """Manages the forked childs. @@ -63,7 +61,7 @@ class NfpProcessManager(object): self._controller = controller self._child_snapshot = [] - def new_child(self, pid, pipe, lock): + def new_child(self, pid, pipe): # Pass, as we will learn from comparision as watcher pass @@ -105,8 +103,8 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): self._resource_map = {} # Cache of event objects - {'uuid':} self._event_cache = {} - # Not processed. Events Stored for future. 
- self._stash = deque() + # watchdog object mapping with event id - {'uuid':} + self._watchdog_map = {} # ID of the distributor process self._distributor_process_id = os.getpid() # Single sequencer to be used by all event managers @@ -117,7 +115,7 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): NfpProcessManager.__init__(self, conf, controller) NfpEventManager.__init__(self, conf, controller, self._event_sequencer) - def new_child(self, pid, pipe, lock): + def new_child(self, pid, pipe): """Invoked when a new child is spawned. Associates an event manager with this child, maintains @@ -130,9 +128,9 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): ev_manager = NfpEventManager( self._conf, self._controller, self._event_sequencer, - pipe=pipe, pid=pid, lock=lock) + pipe=pipe, pid=pid) self._resource_map.update(dict({pid: ev_manager})) - super(NfpResourceManager, self).new_child(pid, pipe, lock) + super(NfpResourceManager, self).new_child(pid, pipe) def manager_run(self): """Invoked periodically to check on resources. @@ -149,12 +147,8 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): def _event_acked(self, event): """Post handling after event is dispatched to worker. """ - if event.lifetime: - message = "(event - %s) - dispatched, polling for expiry" % ( - event.identify()) - LOG.debug(message) - self._controller.poll_add( - event, event.lifetime, self._event_life_timedout) + event.desc.acked = True + nfp_path.event_complete(event) def _dispatch_event(self, event): """Dispatch event to a worker. 
""" @@ -165,6 +159,7 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): def _graph_event(self, event): if isinstance(event.desc.graph, dict): graph = event.desc.graph + # root = graph['root'] event.desc.graph = graph['id'] @@ -191,8 +186,17 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): # same event as non graph event event.desc.graph = None - # Dispatch to a worker - self._dispatch_event(event) + decision = nfp_path.schedule_event(event) + if decision == 'schedule': + # Dispatch to a worker + self._dispatch_event(event) + LOG.debug("Watchdog started for event - %s" % + (event.identify())) + self._watchdog(event) + elif decision == 'discard': + message = "Discarding path event - %s" % (event.identify()) + LOG.info(message) + self._controller.event_complete(event, result='FAILED') else: message = "(event - %s) - sequencing" % ( event.identify()) @@ -202,54 +206,62 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): return event.sequence - def _scheduled_event_ack(self, ack_event): + def _handle_path_complete(self, event): try: - event = self._event_cache[ack_event.desc.uuid] - evmanager = self._get_event_manager(event.desc.worker) - assert evmanager - # Pop from the pending list of evmanager - evmanager.pop_event(event) - # May be start polling for lifetime of event - self._event_acked(event) - except KeyError as kerr: - kerr = kerr - message = "(event - %s) - acked," - "missing from cache" % (event.identify()) - LOG.error(message) - except AssertionError as aerr: - aerr = aerr - message = "(event - %s) - acked," - "process handling is dead, event will be" - "replayed in new process" % (event.identify()) + path_type = event.desc.path_type + path_key = event.desc.path_key + nfp_path.path_complete(path_type, path_key) + except Exception as e: + message = "Exception - %r - while handling"\ + "event - %s" % (e, event.identify()) LOG.error(message) - def _scheduled_event_complete(self, event, expired=False): + def 
event_expired(self, event=None): + if event: + LOG.debug("Watchdog expired for event - %s" % (event.identify())) + self._watchdog_map.pop(event.desc.uuid, None) + self._controller.event_complete(event, result='FAILED') + + def _scheduled_event_ack(self, ack_event): + self._event_acked(ack_event) + + def _watchdog_cancel(self, event): + try: + LOG.debug("Watchdog cancelled for event - %s" % (event.identify())) + wd = self._watchdog_map.pop(event.desc.uuid) + wd.cancel() + except KeyError: + pass + + def _watchdog(self, event, handler=None): + if not handler: + handler = self.event_expired + if event.lifetime != -1: + wd = WATCHDOG(handler, + seconds=event.lifetime, + event=event) + self._watchdog_map[event.desc.uuid] = wd + + def _scheduled_event_complete(self, event): # Pop it from cache cached_event = None try: cached_event = self._event_cache.pop(event.desc.uuid) cached_event.result = event.result + # Mark the event as acked + self._watchdog_cancel(event) # Get the em managing the event evmanager = self._get_event_manager(event.desc.worker) assert evmanager evmanager.pop_event(event) - # If event expired, send a cancelled event back to worker - if expired: - event.desc.type = nfp_event.EVENT_EXPIRED - evmanager.dispatch_event(event, inc_load=False, cache=False) except KeyError as kerr: kerr = kerr message = "(event - %s) - completed, not in cache" % ( event.identify()) LOG.debug(message) except AssertionError as aerr: - aerr = aerr - # No event manager for the event, worker could have got - # killed, ignore. - message = "(event - %s) - assertion error" % ( - event.identify()) + message = "%s" % (aerr.message) LOG.error(message) - pass finally: # Release the sequencer for this sequence, # so that next event can get scheduled. 
@@ -258,36 +270,29 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): def _stop_poll_event(self, event): try: - poll_event = self._event_cache[event.data['key']] - poll_event.desc.poll_desc = None - except KeyError: - message = "(event - uuid=%s) - polling event not in cache" % ( - event.data['key']) - LOG.debug(message) + to_stop = event.data['key'] + event.desc.uuid = to_stop + self._watchdog_cancel(event) + except Exception as e: + message = "Exception - %r - while handling"\ + "event - %s" % (e, event.identify()) + LOG.error(message) def _non_schedule_event(self, event): - if IS_POLL_EVENT_STOP(event): - self._stop_poll_event(event) - elif event.desc.type == nfp_event.POLL_EVENT: - message = "(event - %s) - polling for event, spacing(%d)" % ( - event.identify(), event.desc.poll_desc.spacing) - LOG.debug(message) - # If the poll event is new -> create one in cache, - # In most of the cases, polling is done for an existing - # event. - ref_uuid = event.desc.poll_desc.ref - if ref_uuid not in self._event_cache.keys(): - # Assign random worker for this poll event - event.desc.worker = self._resource_map.keys()[0] - self._event_cache[ref_uuid] = event - - cached_event = self._event_cache[ref_uuid] - cached_event.desc.poll_desc = event.desc.poll_desc - - self._controller.poll_add( - event, - event.desc.poll_desc.spacing, - self._event_timedout) + if event.desc.type == nfp_event.POLL_EVENT: + if event.desc.flag == nfp_event.POLL_EVENT_STOP: + self._stop_poll_event(event) + else: + message = "(event - %s) - polling for event, spacing(%d)" % ( + event.identify(), event.desc.poll_desc.spacing) + LOG.debug(message) + # If the poll event is generated without any parent + # event, then worker would not be pre-assigned. 
+ # In such case, assign a random worker + if not event.desc.worker: + event.desc.worker = self._resource_map.keys()[0] + event.lifetime = event.desc.poll_desc.spacing + self._watchdog(event, handler=self._poll_timedout) else: message = "(event - %s) - Unknown non scheduled event" % ( event.identify()) @@ -316,8 +321,9 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): for event in events: message = "%s - processing event" % (event.identify()) LOG.debug(message) - - if IS_SCHEDULED_EVENT_ACK(event): + if IS_PATH_COMPLETE_EVENT(event): + self._handle_path_complete(event) + elif IS_SCHEDULED_EVENT_ACK(event): self._scheduled_event_ack(event) elif IS_SCHEDULED_NEW_EVENT(event): if IS_EVENT_GRAPH(event): @@ -338,6 +344,7 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): events = [] # Get events from sequencer events = self._event_sequencer.run() + events += nfp_path.run() for pid, event_manager in self._resource_map.iteritems(): events += event_manager.event_watcher(timeout=0.01) # Process the type of events received, dispatch only the @@ -353,8 +360,8 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): def _replace_child(self, killed, new): childrens = self._controller.get_childrens() wrap = childrens[new] - pipe, lock = wrap.child_pipe_map[new] - self.new_child(new, pipe, lock) + pipe = wrap.child_pipe_map[new] + self.new_child(new, pipe) new_em = self._resource_map[new] killed_em = self._resource_map[killed] new_em.init_from_event_manager(killed_em) @@ -414,36 +421,22 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): else: return self._resource_map.get(pid) - def _event_life_timedout(self, event): - """Callback for poller when event expires. """ - message = "(event - %s) - expired" % (event.identify()) - LOG.debug(message) - self._scheduled_event_complete(event, expired=True) - - def _event_timedout(self, event): + def _poll_timedout(self, event): """Callback for poller when event timesout. 
""" message = "(event - %s) - timedout" % (event.identify()) LOG.debug(message) - try: - assert event.desc.poll_desc - ref_event = self._event_cache[event.desc.poll_desc.ref] - assert ref_event.desc.poll_desc - evmanager = self._get_event_manager(ref_event.desc.worker) - assert evmanager - evmanager.dispatch_event( - event, event_type=nfp_event.POLL_EVENT, - inc_load=False, cache=False) - except KeyError as err: - err = err - message = "(event - %s) - timedout, not in cache" % ( - event.identify()) - LOG.error(message) - except AssertionError as aerr: - aerr = aerr - # Process associated with event could be killed. - # Ignore. - pass - def stash_event(self, event): - """Stash the given event. """ - self._stash.put(event) + try: + evmanager = self._get_event_manager(event.desc.worker) + message = "(event-%s) event manager not found" % (event.identify()) + assert evmanager, message + if nfp_path.schedule_event(event) == 'schedule': + evmanager.dispatch_event(event, + event_type=nfp_event.POLL_EVENT, + inc_load=False, cache=False) + except AssertionError as aerr: + LOG.error(aerr.message) + except Exception as e: + message = ("Unknown exception=%r - event=%s" % ( + e, event.identify())) + LOG.error(message) diff --git a/gbpservice/nfp/core/path.py b/gbpservice/nfp/core/path.py new file mode 100644 index 000000000..15ae9730f --- /dev/null +++ b/gbpservice/nfp/core/path.py @@ -0,0 +1,169 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ + +from gbpservice.nfp.core import log as nfp_logging + +import collections + +deque = collections.deque + +LOG = nfp_logging.getLogger(__name__) + + +class Supress(object): + + def __init__(self, ignore_list=None): + self._ignore = ignore_list or [] + + def __enter__(self): + pass + + def __exit__(self, e_type, e_value, traceback): + if e_type in self._ignore: + return True + for exception in self._ignore: + if isinstance(e_type, exception): + return True + + +class Path(object): + + def __init__(self, name): + self._waitq = deque() + self.name = name + self.count = 0 + self.invalid = False + + def queue(self, event): + self._waitq.append(event) + + def pop(self): + events = [] + + with Supress([IndexError]): + events.append(self._waitq.popleft()) + return events + + def done(self): + self._waitq.clear() + +# {'key': {'current':Path, 'waiting':Path} +paths = {} + + +def run(): + for key, path in paths.iteritems(): + if path['current'].count == 0: + path['current'].done() + if path['waiting'].name != 'INVALID': + path['current'] = path['waiting'] + path['current'].invalid = False + path['waiting'] = Path('INVALID') + + events = [] + # Get any queued events in the current path + for key, path in paths.iteritems(): + events += path['current'].pop() + return events + + +def event_complete(event): + name = event.desc.path_type + key = event.desc.path_key + + if not name: + return + name = name.upper() + with Supress([KeyError]): + path = paths[key] + if path['current'].name != name: + return + path['current'].count -= 1 + + +def schedule_event(event): + name = event.desc.path_type + key = event.desc.path_key + + if not name: + return 'schedule' + + name = name.upper() + + try: + path = paths[key] + if path['current'].name == name: + if path['current'].invalid: + return 'discard' + path['current'].count += 1 + return 'schedule' + + if path['waiting'].name == name: + path['waiting'].queue(event) + return 'wait' + + if path['current'].name != name: + return 'discard' + 
except Exception: + return 'schedule' + return 'schedule' + + +def path_complete(path_type, key): + try: + path = paths[key] + if path['current'].name == path_type.upper() and ( + path['waiting'].name == 'INVALID'): + paths.pop(key) + except KeyError: + message = "Path completion - %s path does not exist with key %s" % ( + path_type, key) + LOG.debug(message) + + +def create_path(key): + # Create cannot progress if there is already a path + # with the same key in any state + try: + path = paths[key] + assert False, "Path (%s) with key (%s) is already in progress" % ( + path['current'].name, key) + except KeyError: + # Create new path + paths[key] = {'current': Path('CREATE'), 'waiting': Path('INVALID')} + + +def delete_path(key): + try: + path = paths[key] + if path['current'].name != 'DELETE': + path['waiting'] = Path('DELETE') + path['current'].invalid = True + else: + assert False, ("Delete Path (%s) with key (%s)" + "is already in progress" % ( + path['current'].name, key)) + except KeyError: + paths[key] = {'current': Path('DELETE'), 'waiting': Path('INVALID')} + + +def update_path(key): + # Update cannot progress if there is DELETE already in progress + # or DELETE already waiting. + try: + path = paths[key] + assert False, "Path (%s) with key (%s) is in progress" % ( + path.name, key) + except KeyError: + # Create new path + paths[key] = {'current': Path('UPDATE'), 'waiting': Path('INVALID')} diff --git a/gbpservice/nfp/core/poll.py b/gbpservice/nfp/core/poll.py deleted file mode 100644 index d6b350127..000000000 --- a/gbpservice/nfp/core/poll.py +++ /dev/null @@ -1,81 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. 
You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import eventlet -import heapq -import sched -import time as pytime - -from oslo_service import loopingcall as oslo_looping_call -from oslo_service import periodic_task as oslo_periodic_task - -Scheduler = sched.scheduler - -"""Handles the queue of poll events. - - Derives from python scheduler, since base scheduler does - a tight loop and does not leave the invoked context. - Derived here to return if no event timedout, invoked - periodically by caller to check for timedout events. -""" - - -class NfpPollHandler(Scheduler): - - def __init__(self, conf): - self._conf = conf - Scheduler.__init__(self, pytime.time, eventlet.greenthread.sleep) - - def run(self): - """Run to find timedout event. """ - q = self._queue - timefunc = self.timefunc - pop = heapq.heappop - if q: - time, priority, action, argument = checked_event = q[0] - now = timefunc() - if now < time: - return - else: - event = pop(q) - # Verify that the event was not removed or altered - # by another thread after we last looked at q[0]. - if event is checked_event: - action(*argument) - else: - heapq.heappush(q, event) - - def poll_add(self, event, timeout, method): - """Enter the event to be polled. """ - self.enter(timeout, 1, method, (event,)) - -"""Periodic task to poll for timer events. - - Periodically checks for expiry of events. 
-""" - - -class PollingTask(oslo_periodic_task.PeriodicTasks): - - def __init__(self, conf, controller): - super(PollingTask, self).__init__(conf) - - self._controller = controller - pulse = oslo_looping_call.FixedIntervalLoopingCall( - self.run_periodic_tasks, None, None) - pulse.start( - interval=1, initial_delay=None) - - @oslo_periodic_task.periodic_task(spacing=1) - def poll(self, context): - # invoke the common class to handle event timeouts - self._controller.poll() diff --git a/gbpservice/nfp/core/sequencer.py b/gbpservice/nfp/core/sequencer.py index dc64b4458..3fb7ef5f3 100644 --- a/gbpservice/nfp/core/sequencer.py +++ b/gbpservice/nfp/core/sequencer.py @@ -73,6 +73,12 @@ class EventSequencer(object): def release(self): self._scheduled = None + def pop(self): + self.release() + events = list(self._waitq) + self._waitq.clear() + return events + def __init__(self): # Sequence of related events # {key: sequencer()} @@ -109,6 +115,13 @@ class EventSequencer(object): del self._sequencer[key] return events + def pop(self): + events = [] + sequencers = dict(self._sequencer) + for key, sequencer in sequencers.iteritems(): + events += sequencer.pop() + return events + def release(self, key, event): try: message = "(event - %s) checking to release" % (event.identify()) diff --git a/gbpservice/nfp/core/threadpool.py b/gbpservice/nfp/core/threadpool.py index 8439ecb41..b59acbcbe 100644 --- a/gbpservice/nfp/core/threadpool.py +++ b/gbpservice/nfp/core/threadpool.py @@ -94,5 +94,5 @@ class ThreadPool(object): except eventlet.greenlet.GreenletExit: pass except Exception as ex: - message = "Exception - %s" % (ex) + message = "Unexpected exception - %r" % (ex) LOG.error(message) diff --git a/gbpservice/nfp/core/watchdog.py b/gbpservice/nfp/core/watchdog.py new file mode 100644 index 000000000..e0427dc9e --- /dev/null +++ b/gbpservice/nfp/core/watchdog.py @@ -0,0 +1,113 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except 
in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import heapq +import signal +from time import time + +from gbpservice.nfp.core import log as nfp_logging + + +LOG = nfp_logging.getLogger(__name__) + +alarmlist = [] + +__new_alarm = lambda t, f, a, k: (t + time(), f, a, k) +__next_alarm = lambda: int( + round(alarmlist[0][0] - time())) if alarmlist else None +__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1)) + + +class Watchdog(object): + + def __init__(self, callback, seconds=20 * 60, **kwargs): + self._seconds = seconds + self._callback = callback + self.kwargs = kwargs + + self._alarm = alarm(self._seconds, self.timedout) + + def timedout(self): + try: + self._callback(**self.kwargs) + except Exception as e: + message = "Unexpected exception - %s" % (e) + LOG.error(message) + + def cancel(self): + try: + cancel(self._alarm) + except ValueError: + pass + except Exception as e: + message = "Unexpected exception - %s" % (e) + LOG.error(message) + + +def __clear_alarm(): + """Clear an existing alarm. + + If the alarm signal was set to a callable other than our own, queue the + previous alarm settings. + """ + oldsec = signal.alarm(0) + oldfunc = signal.signal(signal.SIGALRM, __alarm_handler) + if oldsec > 0 and oldfunc != __alarm_handler: + heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {}))) + + +def __alarm_handler(*zargs): + """Handle an alarm by calling any due heap entries and resetting the alarm. + + Note that multiple heap entries might get called, especially if calling an + entry takes a lot of time. 
+ """ + try: + nextt = __next_alarm() + while nextt is not None and nextt <= 0: + (tm, func, args, keys) = heapq.heappop(alarmlist) + func(*args, **keys) + nextt = __next_alarm() + finally: + if alarmlist: + __set_alarm() + + +def alarm(sec, func, *args, **keys): + """Set an alarm. + + When the alarm is raised in `sec` seconds, the handler will call `func`, + passing `args` and `keys`. Return the heap entry (which is just a big + tuple), so that it can be cancelled by calling `cancel()`. + """ + __clear_alarm() + try: + newalarm = __new_alarm(sec, func, args, keys) + heapq.heappush(alarmlist, newalarm) + return newalarm + finally: + __set_alarm() + + +def cancel(alarm): + """Cancel an alarm by passing the heap entry returned by `alarm()`. + + It is an error to try to cancel an alarm which has already occurred. + """ + __clear_alarm() + try: + alarmlist.remove(alarm) + heapq.heapify(alarmlist) + finally: + if alarmlist: + __set_alarm() diff --git a/gbpservice/nfp/core/worker.py b/gbpservice/nfp/core/worker.py index d6d8e3ab1..bdf60ebc0 100644 --- a/gbpservice/nfp/core/worker.py +++ b/gbpservice/nfp/core/worker.py @@ -10,6 +10,8 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet +import greenlet import os import sys import time @@ -18,12 +20,17 @@ import traceback from oslo_service import service as oslo_service from gbpservice.nfp.core import common as nfp_common +from gbpservice.nfp.core import context from gbpservice.nfp.core import event as nfp_event from gbpservice.nfp.core import log as nfp_logging +from gbpservice.nfp.core import watchdog as nfp_watchdog LOG = nfp_logging.getLogger(__name__) Service = oslo_service.Service identify = nfp_common.identify +WATCHDOG = nfp_watchdog.Watchdog + +DEFAULT_THREAD_TIMEOUT = (10 * 60) """Implements worker process. 
@@ -43,7 +50,6 @@ class NfpWorker(Service): self.parent_pipe = None # Pipe to recv/send messages to distributor self.pipe = None - self.lock = None # Cache of event handlers self.controller = None self._conf = conf @@ -60,16 +66,15 @@ class NfpWorker(Service): # Update the process type in controller. self.controller.PROCESS_TYPE = "worker" self.controller._pipe = self.pipe - self.controller._lock = self.lock self.event_handlers = self.controller.get_event_handlers() + + eventlet.spawn_n(self.controller._resending_task) + while True: try: event = None - self.controller.pipe_lock(self.lock) - ret = self.pipe.poll(0.1) - self.controller.pipe_unlock(self.lock) - if ret: - event = self.controller.pipe_recv(self.pipe, self.lock) + if self.pipe.poll(0.1): + event = self.controller.pipe_recv(self.pipe) if event: message = "%s - received event" % ( self._log_meta(event)) @@ -97,7 +102,7 @@ class NfpWorker(Service): desc.uuid = event.desc.uuid desc.flag = nfp_event.EVENT_ACK setattr(ack_event, 'desc', desc) - self.controller.pipe_send(self.pipe, self.lock, ack_event) + self.controller.pipe_send(self.pipe, ack_event) def _process_event(self, event): """Process & dispatch the event. @@ -108,38 +113,24 @@ class NfpWorker(Service): thread. 
""" if event.desc.type == nfp_event.SCHEDULE_EVENT: - self._send_event_ack(event) eh, _ = ( self.event_handlers.get_event_handler( event.id, module=event.desc.target)) - self.dispatch(eh.handle_event, event) + self.dispatch(eh.handle_event, event, eh=eh) elif event.desc.type == nfp_event.POLL_EVENT: self.dispatch(self._handle_poll_event, event) - elif event.desc.type == nfp_event.EVENT_EXPIRED: - eh, _ = ( - self.event_handlers.get_event_handler( - event.id, module=event.desc.target)) - self.dispatch(eh.event_cancelled, event, 'EXPIRED') - - def _build_poll_status(self, ret, event): - status = {'poll': True, 'event': event} - if ret: - status['poll'] = ret.get('poll', status['poll']) - status['event'] = ret.get('event', status['event']) - status['event'].desc = event.desc - - return status def _repoll(self, ret, event, eh): - status = self._build_poll_status(ret, event) - if status['poll']: + if ret.get('poll', False): message = ("(event - %s) - repolling event -" "pending times - %d") % ( event.identify(), event.desc.poll_desc.max_times) LOG.debug(message) if event.desc.poll_desc.max_times: - self.controller.pipe_send(self.pipe, self.lock, - status['event']) + self.controller.poll_event( + event, + spacing=event.desc.poll_desc.spacing, + max_times=event.desc.poll_desc.max_times) else: message = ("(event - %s) - max timed out," "calling event_cancelled") % (event.identify()) @@ -147,7 +138,7 @@ class NfpWorker(Service): eh.event_cancelled(event, 'MAX_TIMED_OUT') def _handle_poll_event(self, event): - ret = {} + ret = {'poll': False} event.desc.poll_desc.max_times -= 1 module = event.desc.target poll_handler, _ = ( @@ -155,45 +146,77 @@ class NfpWorker(Service): event_handler, _ = ( self.event_handlers.get_event_handler(event.id, module=module)) try: - ret = poll_handler(event) - except TypeError: - ret = poll_handler(event_handler, event) + try: + ret = poll_handler(event) + except TypeError: + ret = poll_handler(event_handler, event) + if not ret: + ret = {'poll': 
True} + except greenlet.GreenletExit: + pass except Exception as exc: - message = "Exception from module's poll handler - %s" % exc + message = "Exception - %r" % (exc) LOG.error(message) + ret = self.dispatch_exception(event_handler, event, exc) + if not ret: + ret = {'poll': False} + self._repoll(ret, event, event_handler) - def log_dispatch(self, handler, event, *args): + def _dispatch(self, handler, event, *args, **kwargs): + event.context['log_context']['namespace'] = event.desc.target + context.init(event.context) try: - event.context['namespace'] = event.desc.target - nfp_logging.store_logging_context(**(event.context)) + handler(event, *args) + except greenlet.GreenletExit: + self.controller.event_complete(event, result='FAILED') + except Exception as exc: + # How to log traceback propery ?? + message = "Exception - %r" % (exc) + LOG.error(message) + self.dispatch_exception(kwargs.get('eh'), event, exc) + self.controller.event_complete(event, result="FAILED") finally: - try: - handler(event, *args) - nfp_logging.clear_logging_context() - except Exception as exc: - exc_type, exc_value, exc_traceback = sys.exc_info() - message = "Exception from module's event handler - %s" % (exc) - LOG.error(message) - # REVISIT(ashu): Format this traceback log properly. - # Currently, it is a single string, but there are some - # newline characters, which can be use to print it properly. 
- message = ("Traceback: %s" % traceback.format_exception( - exc_type, exc_value, exc_traceback)) - LOG.error(message) + self._send_event_ack(event) - def dispatch(self, handler, event, *args): + def dispatch_exception(self, event_handler, event, exception): + ret = {} + try: + ret = event_handler.handle_exception(event, exception) + except Exception: + exc_type, exc_value, exc_traceback = sys.exc_info() + message = "Traceback: %s" % traceback.format_exception( + exc_type, exc_value, exc_traceback) + LOG.error(message) + finally: + return ret + + def thread_done(self, th, watchdog=None): + if watchdog: + watchdog.cancel() + + def thread_timedout(self, thread=None): + if thread: + eventlet.greenthread.kill(thread.thread) + + def dispatch(self, handler, event, *args, **kwargs): if self._threads: - self.tg.add_thread(self.log_dispatch, handler, event, *args) + th = self.tg.add_thread( + self._dispatch, handler, event, *args, **kwargs) message = "%s - (handler - %s) - dispatched to thread " % ( self._log_meta(), identify(handler)) LOG.debug(message) + wd = WATCHDOG(self.thread_timedout, + seconds=DEFAULT_THREAD_TIMEOUT, thread=th) + th.link(self.thread_done, watchdog=wd) else: try: handler(event, *args) message = "%s - (handler - %s) - invoked" % ( self._log_meta(), identify(handler)) LOG.debug(message) + self._send_event_ack(event) except Exception as exc: - message = "Exception from module's event handler - %s" % exc + message = "Exception from module's event handler - %s" % (exc) LOG.error(message) + self.dispatch_exception(kwargs.get('eh'), event, exc)