diff --git a/.testr.conf b/.testr.conf index 858fae61f..e6d51d85e 100644 --- a/.testr.conf +++ b/.testr.conf @@ -2,6 +2,6 @@ test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \ OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \ OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-120} \ - ${PYTHON:-python} -m subunit.run discover -t ./ ${OS_TEST_PATH:-./gbpservice} $LISTOPT $IDOPTION + ${PYTHON:-python} -m subunit.run discover -t ./ ${OS_TEST_PATH:-./gbpservice/neutron/tests/unit/nfp/core/} $LISTOPT $IDOPTION test_id_option=--load-list $IDFILE test_list_option=--list diff --git a/gbpservice/neutron/tests/unit/nfp/core/nfp_module.py b/gbpservice/neutron/tests/unit/nfp/core/nfp_module.py index 11a1db531..c8c6d0483 100644 --- a/gbpservice/neutron/tests/unit/nfp/core/nfp_module.py +++ b/gbpservice/neutron/tests/unit/nfp/core/nfp_module.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License.from gbpservice.neutron.nsf.core import main +from gbpservice.nfp.core import context as nfp_context from gbpservice.nfp.core import event from gbpservice.nfp.core import module as nfp_api from oslo_log import log as logging @@ -30,6 +31,9 @@ class EventsHandler(nfp_api.NfpEventHandler): self.controller = controller def handle_event(self, event): + event.context['log_context']['namespace'] = event.desc.target + nfp_context.init(event.context) + if event.id == 'TEST_EVENT_ACK_FROM_WORKER': self.controller.event_ack_handler_cb_obj.set() diff --git a/gbpservice/neutron/tests/unit/nfp/core/test_process_model.py b/gbpservice/neutron/tests/unit/nfp/core/test_process_model.py index 031416c18..e0a2db54b 100644 --- a/gbpservice/neutron/tests/unit/nfp/core/test_process_model.py +++ b/gbpservice/neutron/tests/unit/nfp/core/test_process_model.py @@ -10,9 +10,11 @@ # License for the specific language governing permissions and limitations # under the License. +from gbpservice.nfp.core import context as nfp_context from gbpservice.nfp.core import controller as nfp_controller from gbpservice.nfp.core import event as nfp_event from gbpservice.nfp.core import log as nfp_logging +from gbpservice.nfp.core import manager as nfp_manager from gbpservice.nfp.core import worker as nfp_worker import mock import multiprocessing as multiprocessing @@ -51,11 +53,10 @@ class MockedPipe(object): class MockedProcess(object): def __init__(self, parent_pipe=None, child_pipe=None, - lock=None, controller=None): + controller=None): self.parent_pipe = parent_pipe self.child_pipe = child_pipe - self.lock = lock self.controller = controller self.daemon = True self.pid = random.randint(8888, 9999) @@ -64,14 +65,12 @@ class MockedProcess(object): self.worker = nfp_worker.NfpWorker({}, threads=0) self.worker.parent_pipe = self.parent_pipe self.worker.pipe = self.child_pipe - self.worker.lock = self.lock self.worker.controller = nfp_controller.NfpController( self.controller._conf, singleton=False) # fork a new controller object self.worker.controller.PROCESS_TYPE = "worker" self.worker.controller._pipe = self.worker.pipe - self.worker.controller._lock = self.worker.lock self.worker.controller._event_handlers = ( self.controller._event_handlers) self.worker.event_handlers = self.controller.get_event_handlers() @@ -82,34 +81,31 @@ class MockedProcess(object): self.controller._process_event) -class MockedLock(object): - def __init__(self): - pass - - def acquire(self): - pass - - def release(self): - pass - - def mocked_pipe(**kwargs): return MockedPipe(), MockedPipe() def mocked_process(target=None, args=None): return MockedProcess(parent_pipe=args[1], - child_pipe=args[2], lock=args[3], - controller=args[4]) - - -def mocked_lock(): - return MockedLock() + child_pipe=args[2], + controller=args[3]) nfp_controller.PIPE = mocked_pipe nfp_controller.PROCESS = mocked_process -nfp_controller.LOCK = mocked_lock + + +class MockedWatchdog(object): + + def __init__(self, handler, seconds=1, event=None): + if event and event.desc.type == 'poll_event': + # time.sleep(seconds) + handler(event=event) + + def cancel(self): + pass + +nfp_manager.WATCHDOG = MockedWatchdog class Object(object): @@ -120,6 +116,9 @@ class Object(object): class Test_Process_Model(unittest.TestCase): + def setUp(self): + nfp_context.init() + def _mocked_fork(self, args): proc = Object() pid = random.randint(8888, 9999) @@ -277,7 +276,10 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(False) - def mocked_pipe_send(self, pipe, lock, event): + def mocked_compress(self, event): + pass + + def mocked_pipe_send(self, pipe, event): if event.id == 'EVENT_1': if hasattr(event, 'desc'): if event.desc.worker: @@ -328,10 +330,12 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(called) @mock.patch( - 'gbpservice.nfp.core.controller.NfpController.pipe_send' - ) - def test_load_distribution_to_workers(self, mock_pipe_send): + 'gbpservice.nfp.core.controller.NfpController.pipe_send') + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def test_load_distribution_to_workers(self, mock_compress, mock_pipe_send): mock_pipe_send.side_effect = self.mocked_pipe_send + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -501,20 +505,22 @@ class Test_Process_Model(unittest.TestCase): controller.event_complete(event_1) controller.event_complete(event_2) - @mock.patch( - 'gbpservice.nfp.core.controller.NfpController.pipe_send' - ) - def test_poll_event(self, mock_pipe_send): - mock_pipe_send.side_effect = self.mocked_pipe_send - conf = oslo_config.CONF - conf.nfp_modules_path = NFP_MODULES_PATH - controller = nfp_controller.NfpController(conf, singleton=False) - self.controller = controller - nfp_controller.load_nfp_modules(conf, controller) - # Mock launching of a worker - controller.launch(1) - controller._update_manager() - self.controller = controller + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.pipe_send') + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def test_poll_event(self, mock_compress, mock_pipe_send): + mock_pipe_send.side_effect = self.mocked_pipe_send + mock_compress.side_effect = self.mocked_compress + conf = oslo_config.CONF + conf.nfp_modules_path = NFP_MODULES_PATH + controller = nfp_controller.NfpController(conf, singleton=False) + self.controller = controller + nfp_controller.load_nfp_modules(conf, controller) + # Mock launching of a worker + controller.launch(1) + controller._update_manager() + self.controller = controller wait_obj = multiprocessing.Event() setattr(controller, 'poll_event_wait_obj', wait_obj) @@ -526,6 +532,9 @@ class Test_Process_Model(unittest.TestCase): setattr(event, 'desc', desc) event.desc.worker = controller.get_childrens().keys()[0] + ctx = nfp_context.get() + ctx['log_context']['namespace'] = 'nfp_module' + controller.poll_event(event, spacing=1) # controller._manager.manager_run() @@ -533,7 +542,7 @@ class Test_Process_Model(unittest.TestCase): # relinquish for 1sec time.sleep(1) - controller.poll() + # controller.poll() controller.poll_event_wait_obj.wait(0.1) called = controller.poll_event_wait_obj.is_set() end_time = time.time() @@ -541,9 +550,11 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(round(end_time - start_time) == 1.0) @mock.patch( - 'gbpservice.nfp.core.controller.NfpController.pipe_send' - ) - def test_poll_event_with_no_worker(self, mock_pipe_send): + 'gbpservice.nfp.core.controller.NfpController.pipe_send') + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def test_poll_event_with_no_worker(self, mock_compress, mock_pipe_send): + mock_compress.side_effect = self.mocked_compress mock_pipe_send.side_effect = self.mocked_pipe_send conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH @@ -566,6 +577,9 @@ class Test_Process_Model(unittest.TestCase): # Explicitly make it none event.desc.worker = None + ctx = nfp_context.get() + ctx['log_context']['namespace'] = 'nfp_module' + controller.poll_event(event, spacing=1) # controller._manager.manager_run() @@ -573,7 +587,7 @@ class Test_Process_Model(unittest.TestCase): # relinquish for 1sec time.sleep(1) - controller.poll() + # controller.poll() controller.poll_event_wait_obj.wait(0.1) called = controller.poll_event_wait_obj.is_set() end_time = time.time() @@ -581,10 +595,14 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(round(end_time - start_time) == 1.0) @mock.patch( - 'gbpservice.nfp.core.controller.NfpController.pipe_send' - ) - def test_poll_event_with_decorator_spacing(self, mock_pipe_send): + 'gbpservice.nfp.core.controller.NfpController.pipe_send') + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def test_poll_event_with_decorator_spacing(self, + mock_compress, mock_pipe_send): + mock_pipe_send.side_effect = self.mocked_pipe_send + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -606,6 +624,8 @@ class Test_Process_Model(unittest.TestCase): # Explicitly make it none event.desc.worker = None + ctx = nfp_context.get() + ctx['log_context']['namespace'] = 'nfp_module' controller.poll_event(event) # controller._manager.manager_run() @@ -613,14 +633,17 @@ class Test_Process_Model(unittest.TestCase): # relinquish for 2secs time.sleep(2) - controller.poll() + # controller.poll() controller.poll_event_dec_wait_obj.wait(0.1) called = controller.poll_event_dec_wait_obj.is_set() end_time = time.time() self.assertTrue(called) self.assertTrue(round(end_time - start_time) == 2.0) - def test_poll_event_with_no_spacing(self): + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def test_poll_event_with_no_spacing(self, mock_compress): + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -643,7 +666,10 @@ class Test_Process_Model(unittest.TestCase): # self.assertTrue(False) self.assertTrue(True) - def test_poll_event_with_no_handler(self): + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def test_poll_event_with_no_handler(self, mock_compress): + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -666,10 +692,12 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(False) @mock.patch( - 'gbpservice.nfp.core.manager.NfpResourceManager._event_acked' - ) - def test_event_ack_from_worker(self, mock_event_acked): + 'gbpservice.nfp.core.manager.NfpResourceManager._event_acked') + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress') + def test_event_ack_from_worker(self, mock_event_acked, mock_compress): mock_event_acked.side_effect = self._mocked_event_ack + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -704,7 +732,11 @@ class Test_Process_Model(unittest.TestCase): called = controller.event_ack_handler_cb_obj.is_set() self.assertTrue(called) - def test_post_event_from_worker(self): + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress' + ) + def test_post_event_from_worker(self, mock_compress): + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -734,7 +766,11 @@ class Test_Process_Model(unittest.TestCase): called = controller.post_event_worker_wait_obj.is_set() self.assertTrue(called) - def test_poll_event_from_worker(self): + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress' + ) + def test_poll_event_from_worker(self, mock_compress): + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -768,13 +804,17 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(called) time.sleep(1) - controller.poll() + # controller.poll() controller.poll_event_poll_wait_obj.wait(1) called = controller.poll_event_poll_wait_obj.is_set() self.assertTrue(called) - def test_poll_event_cancelled_from_worker(self): + @mock.patch( + 'gbpservice.nfp.core.controller.NfpController.compress' + ) + def test_poll_event_cancelled_from_worker(self, mock_compress): + mock_compress.side_effect = self.mocked_compress conf = oslo_config.CONF conf.nfp_modules_path = NFP_MODULES_PATH controller = nfp_controller.NfpController(conf, singleton=False) @@ -810,14 +850,14 @@ class Test_Process_Model(unittest.TestCase): self.assertTrue(called) time.sleep(1) - controller.poll() + # controller.poll() controller.poll_event_poll_wait_obj.wait(1) called = controller.poll_event_poll_wait_obj.is_set() self.assertTrue(called) time.sleep(1) - controller.poll() + # controller.poll() controller.poll_event_poll_wait_obj.wait(1) called = controller.poll_event_poll_wait_obj.is_set() diff --git a/gbpservice/nfp/core/context.py b/gbpservice/nfp/core/context.py index ed93010dd..6317c230b 100644 --- a/gbpservice/nfp/core/context.py +++ b/gbpservice/nfp/core/context.py @@ -12,28 +12,85 @@ import threading -nfp_context_store = threading.local() + +class LogContext(object): + + def __init__(self, data): + self.data = data + + def purge(self): + if self.data: + return { + 'meta_id': self.data.get('meta_id', '-'), + 'nfi_id': self.data.get('nfi_id', '-'), + 'nfd_id': self.data.get('nfd_id', '-'), + 'path': self.data.get('path'), + 'auth_token': self.data.get('auth_token'), + 'namespace': self.data.get('namespace') + } + return self.data + + +class CoreContext(object): + + def __init__(self, data): + self.data = data + + def purge(self): + return { + 'log_context': LogContext(self.data.get('log_context')).purge(), + 'event_desc': self.data.get('event_desc') + } class NfpContext(object): - def __init__(self, context): - self.context = context + def __init__(self, data): + self.data = data - def get_context(self): - return self.context + def purge(self): + return CoreContext(self.data).purge() -def store_nfp_context(context): - nfp_context_store.context = NfpContext(context) +Context = threading.local() -def clear_nfp_context(): - nfp_context_store.context = None +def init_log_context(): + return { + 'meta_id': '-', + 'nfi_id': '-', + 'nfd_id': '-', + 'path': '-', + 'auth_token': None, + 'namespace': None + } -def get_nfp_context(): - context = getattr(nfp_context_store, 'context', None) - if context: - return context.get_context() - return {} +def init(data=None): + if not data: + data = {} + if 'log_context' not in data.keys(): + data['log_context'] = init_log_context() + if 'event_desc' not in data.keys(): + data['event_desc'] = {} + Context.context = NfpContext(data) + context = getattr(Context, 'context') + return context.data + + +def get(): + try: + context = getattr(Context, 'context') + return context.data + except AttributeError: + return init() + + +def purge(): + try: + context = getattr(Context, 'context') + return context.purge() + except AttributeError: + init() + context = getattr(Context, 'context') + return context.purge() diff --git a/gbpservice/nfp/core/controller.py b/gbpservice/nfp/core/controller.py index 6e8474696..491493bba 100644 --- a/gbpservice/nfp/core/controller.py +++ b/gbpservice/nfp/core/controller.py @@ -14,11 +14,11 @@ import ast import eventlet eventlet.monkey_patch() +import collections import multiprocessing import operator import os import pickle -import Queue import sys import time import zlib @@ -28,11 +28,11 @@ from oslo_service import service as oslo_service from gbpservice.nfp.core import cfg as nfp_cfg from gbpservice.nfp.core import common as nfp_common +from gbpservice.nfp.core import context from gbpservice.nfp.core import event as nfp_event from gbpservice.nfp.core import launcher as nfp_launcher from gbpservice.nfp.core import log as nfp_logging from gbpservice.nfp.core import manager as nfp_manager -from gbpservice.nfp.core import poll as nfp_poll from gbpservice.nfp.core import rpc as nfp_rpc from gbpservice.nfp.core import worker as nfp_worker @@ -42,9 +42,9 @@ from neutron.common import config LOG = nfp_logging.getLogger(__name__) PIPE = multiprocessing.Pipe -LOCK = multiprocessing.Lock PROCESS = multiprocessing.Process identify = nfp_common.identify +deque = collections.deque # REVISIT (mak): fix to pass compliance check config = config @@ -76,8 +76,8 @@ class NfpService(object): def register_events(self, event_descs, priority=0): """Register event handlers with core. """ - logging_context = nfp_logging.get_logging_context() - module = logging_context['namespace'] + nfp_context = context.get() + module = nfp_context['log_context']['namespace'] # REVISIT (mak): change name to register_event_handlers() ? for event_desc in event_descs: self._event_handlers.register( @@ -104,22 +104,12 @@ class NfpService(object): event = None try: event = nfp_event.Event(**kwargs) - # Get the logging context stored in thread - logging_context = nfp_logging.get_logging_context() - # Log metadata for event handling code - event.context = logging_context except AssertionError as aerr: message = "%s" % (aerr) LOG.exception(message) return event def post_graph(self, graph_nodes, root_node): - """Post graph. - - Since graph is also implemneted with events, - first post all the node events followed by - root node event. - """ for node in graph_nodes: self.post_event(node) @@ -138,6 +128,13 @@ class NfpService(object): event.desc.flag = nfp_event.EVENT_NEW event.desc.pid = os.getpid() event.desc.target = module + if event.lifetime == -1: + event.lifetime = nfp_event.EVENT_DEFAULT_LIFETIME + if not event.context: + # Log nfp_context for event handling code + event.context = context.purge() + event.desc.path_type = event.context['event_desc'].get('path_type') + event.desc.path_key = event.context['event_desc'].get('path_key') return event # REVISIT (mak): spacing=0, caller must explicitly specify @@ -148,35 +145,46 @@ class NfpService(object): descriptor preparation. NfpController class implements the required functionality. """ - logging_context = nfp_logging.get_logging_context() - module = logging_context['namespace'] + nfp_context = context.get() + module = nfp_context['log_context']['namespace'] handler, ev_spacing = ( self._event_handlers.get_poll_handler(event.id, module=module)) assert handler, "No poll handler found for event %s" % (event.id) assert spacing or ev_spacing, "No spacing specified for polling" if ev_spacing: spacing = ev_spacing - refuuid = event.desc.uuid - event = self._make_new_event(event) - event.lifetime = 0 + if event.desc.type != nfp_event.POLL_EVENT: + event = self._make_new_event(event) + event.desc.uuid = event.desc.uuid + ":" + "POLL_EVENT" event.desc.type = nfp_event.POLL_EVENT event.desc.target = module + event.desc.flag = None kwargs = {'spacing': spacing, - 'max_times': max_times, - 'ref': refuuid} + 'max_times': max_times} poll_desc = nfp_event.PollDesc(**kwargs) setattr(event.desc, 'poll_desc', poll_desc) + + if not event.context: + # Log nfp_context for event handling code + event.context = context.purge() + event.desc.path_type = event.context['event_desc'].get('path_type') + event.desc.path_key = event.context['event_desc'].get('path_key') return event def event_complete(self, event, result=None): """To declare and event complete. """ try: pickle.dumps(result) + uuid = event.desc.uuid + event = self._make_new_event(event) + event.desc.uuid = uuid event.sequence = False event.desc.flag = nfp_event.EVENT_COMPLETE event.result = result + event.context = {} + event.data = {} return event except Exception as e: raise e @@ -224,60 +232,84 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): self._conf = conf self._pipe = None # Queue to stash events. - self._stashq = multiprocessing.Queue() + self._stashq = deque() self._manager = nfp_manager.NfpResourceManager(conf, self) self._worker = nfp_worker.NfpWorker(conf) - self._poll_handler = nfp_poll.NfpPollHandler(conf) # ID of process handling this controller obj self.PROCESS_TYPE = "distributor" def compress(self, event): # REVISIT (mak) : zip only if length is > than threshold (1k maybe) - if event.data and not event.zipped: + if not event.zipped: event.zipped = True - event.data = zlib.compress(str({'cdata': event.data})) + data = {'context': event.context} + event.context = {} + if event.data: + data['data'] = event.data + event.data = zlib.compress(str(data)) def decompress(self, event): - if event.data and event.zipped: + if event.zipped: try: data = ast.literal_eval( zlib.decompress(event.data)) - event.data = data['cdata'] + event.data = data.get('data') + event.context = data['context'] event.zipped = False except Exception as e: - message = "Failed to decompress event data, Reason: %s" % ( + message = "Failed to decompress event data, Reason: %r" % ( e) LOG.error(message) raise e - def pipe_lock(self, lock): - if lock: - lock.acquire() + def is_picklable(self, event): + """To check event is picklable or not. + For sending event through pipe it must be picklable + """ + try: + pickle.dumps(event) + except Exception as e: + message = "(event - %s) is not picklable, Reason: %s" % ( + event.identify(), e) + assert False, message - def pipe_unlock(self, lock): - if lock: - lock.release() - - def pipe_recv(self, pipe, lock): - self.pipe_lock(lock) - event = pipe.recv() - self.pipe_unlock(lock) + def pipe_recv(self, pipe): + event = None + try: + event = pipe.recv() + except Exception as exc: + LOG.debug("Failed to receive event from pipe " + "with exception - %r - will retry.." % (exc)) + eventlet.greenthread.sleep(1.0) if event: self.decompress(event) return event - def pipe_send(self, pipe, lock, event): + def pipe_send(self, pipe, event, resending=False): + self.is_picklable(event) + try: - self.compress(event) - self.pipe_lock(lock) - pipe.send(event) - self.pipe_unlock(lock) + # If there is no reader yet + if not pipe.poll(): + self.compress(event) + pipe.send(event) + return True except Exception as e: - message = "Failed to send data via pipe, Reason: %s" % (e) - LOG.error(message) - raise e + message = ("Failed to send event - %s via pipe" + "- exception - %r - will resend" % ( + event.identify(), e)) + LOG.debug(message) + + # If the event is being sent by resending task + # then dont append here, task will put back the + # event at right location + if not resending: + # If couldnt send event.. stash it so that + # resender task will send event again + self._stashq.append(event) + return False def _fork(self, args): proc = PROCESS(target=self.child, args=args) @@ -285,6 +317,28 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): proc.start() return proc + def _resending_task(self): + while(True): + try: + event = self._stashq.popleft() + if self.PROCESS_TYPE != "worker": + evm = self._manager._get_event_manager(event.desc.worker) + LOG.debug("Resending event - %s" % (event.identify())) + sent = self.pipe_send(evm._pipe, event, resending=True) + else: + sent = self.pipe_send(self._pipe, event, resending=True) + # Put back in front + if not sent: + self._stashq.appendleft(event) + except IndexError: + pass + except Exception as e: + message = ("Unexpected exception - %r - while" + "sending event - %s" % (e, event.identify())) + LOG.error(message) + + eventlet.greenthread.sleep(0.1) + def _manager_task(self): while True: # Run 'Manager' here to monitor for workers and @@ -295,9 +349,9 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): def _update_manager(self): childs = self.get_childrens() for pid, wrapper in childs.iteritems(): - pipe, lock = wrapper.child_pipe_map[pid] + pipe = wrapper.child_pipe_map[pid] # Inform 'Manager' class about the new_child. - self._manager.new_child(pid, pipe, lock) + self._manager.new_child(pid, pipe) def _process_event(self, event): self._manager.process_events([event]) @@ -318,29 +372,21 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): parent_pipe, child_pipe = PIPE(duplex=True) - # Sometimes Resource Temporarily Not Available (errno=11) - # is observed with python pipe. There could be many reasons, - # One theory is if read & - # write happens at the same instant, pipe does report this - # error. Using lock to avoid this. - lock = LOCK() - # Registered event handlers of nfp module. # Workers need copy of this data to dispatch an # event to module. - proc = self._fork(args=(wrap.service, parent_pipe, child_pipe, - lock, self)) + proc = self._fork(args=(wrap.service, parent_pipe, child_pipe, self)) message = ("Forked a new child: %d" "Parent Pipe: % s, Child Pipe: % s") % ( - proc.pid, str(parent_pipe), str(child_pipe)) + proc.pid, str(parent_pipe), str(child_pipe)) LOG.info(message) try: - wrap.child_pipe_map[proc.pid] = (parent_pipe, lock) + wrap.child_pipe_map[proc.pid] = parent_pipe except AttributeError: setattr(wrap, 'child_pipe_map', {}) - wrap.child_pipe_map[proc.pid] = (parent_pipe, lock) + wrap.child_pipe_map[proc.pid] = parent_pipe self._worker_process[proc.pid] = proc return proc.pid @@ -388,20 +434,10 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): # One task to manage the resources - workers & events. eventlet.spawn_n(self._manager_task) - # Oslo periodic task to poll for timer events - nfp_poll.PollingTask(self._conf, self) + eventlet.spawn_n(self._resending_task) # Oslo periodic task for state reporting nfp_rpc.ReportStateTask(self._conf, self) - def poll_add(self, event, timeout, callback): - """Add an event to poller. """ - self._poll_handler.poll_add( - event, timeout, callback) - - def poll(self): - """Invoked in periodic task to poll for timedout events. """ - self._poll_handler.run() - def report_state(self): """Invoked by report_task to report states of all agents. """ for value in self._rpc_agents.itervalues(): @@ -472,7 +508,7 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): LOG.debug(message) # Send it to the distributor process - self.pipe_send(self._pipe, self._lock, event) + self.pipe_send(self._pipe, event) else: message = ("(event - %s) - new event in distributor" "processing event") % (event.identify()) @@ -518,7 +554,7 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): event = super(NfpController, self).poll_event( event, spacing=spacing, max_times=max_times) # Send to the distributor process. - self.pipe_send(self._pipe, self._lock, event) + self.pipe_send(self._pipe, event) def stop_poll_event(self, key, id): """To stop the running poll event @@ -526,58 +562,26 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): :param key: key of polling event :param id: id of polling event """ - key = key + ":" + id + key = key + ":" + id + ":" + "POLL_EVENT" event = self.new_event(id='STOP_POLL_EVENT', data={'key': key}) event.desc.type = nfp_event.POLL_EVENT event.desc.flag = nfp_event.POLL_EVENT_STOP if self.PROCESS_TYPE == "worker": - self.pipe_send(self._pipe, self._lock, event) + self.pipe_send(self._pipe, event) else: self._manager.process_events([event]) - def stash_event(self, event): - """To stash an event. - - This will be invoked by worker process. - Put this event in queue, distributor will - pick it up. - - Executor: worker-process + def path_complete_event(self): + """Create event for path completion """ - if self.PROCESS_TYPE == "distributor": - message = "(event - %s) - distributor cannot stash" % ( - event.identify()) - LOG.debug(message) + nfp_context = context.get() + event = self.new_event(id='PATH_COMPLETE') + event.desc.path_type = nfp_context['event_desc'].get('path_type') + event.desc.path_key = nfp_context['event_desc'].get('path_key') + if self.PROCESS_TYPE == "worker": + self.pipe_send(self._pipe, event) else: - message = "(event - %s) - stashed" % (event.identify()) - LOG.debug(message) - self._stashq.put(event) - - def get_stashed_events(self): - """To get stashed events. - - Returns available number of stashed events - as list. Will be invoked by distributor, - worker cannot pull. - - Executor: distributor-process - """ - events = [] - # return at max 5 events - maxx = 1 - # wait sometime for first event in the queue - timeout = 0.1 - while maxx: - try: - event = self._stashq.get(timeout=timeout) - self.decompress(event) - events.append(event) - timeout = 0 - maxx -= 1 - except Queue.Empty: - maxx = 0 - pass - return events + self._manager.process_events([event]) def event_complete(self, event, result=None): """To mark an event complete. @@ -600,7 +604,7 @@ class NfpController(nfp_launcher.NfpLauncher, NfpService): self._manager.process_events([event]) else: # Send to the distributor process. - self.pipe_send(self._pipe, self._lock, event) + self.pipe_send(self._pipe, event) def load_nfp_modules(conf, controller): @@ -615,6 +619,7 @@ def load_nfp_modules(conf, controller): def load_nfp_modules_from_path(conf, controller, path): """ Load all nfp modules from configured directory. """ pymodules = [] + nfp_context = context.get() try: base_module = __import__(path, globals(), locals(), ['modules'], -1) @@ -629,7 +634,7 @@ def load_nfp_modules_from_path(conf, controller, path): pymodule = eval('pymodule.%s' % (pyfile[:-3])) try: namespace = pyfile[:-3].split(".")[-1] - nfp_logging.store_logging_context(namespace=namespace) + nfp_context['log_context']['namespace'] = namespace pymodule.nfp_module_init(controller, conf) pymodules += [pymodule] message = "(module - %s) - Initialized" % ( @@ -664,10 +669,11 @@ def controller_init(conf, nfp_controller): def nfp_modules_post_init(conf, nfp_modules, nfp_controller): + nfp_context = context.get() for module in nfp_modules: try: namespace = module.__name__.split(".")[-1] - nfp_logging.store_logging_context(namespace=namespace) + nfp_context['log_context']['namespace'] = namespace module.nfp_module_post_init(nfp_controller, conf) except AttributeError: message = ("(module - %s) - does not implement" @@ -701,6 +707,7 @@ def load_module_opts(conf): def main(): + context.init() args, module = extract_module(sys.argv[1:]) conf = nfp_cfg.init(module, args) conf.module = module diff --git a/gbpservice/nfp/core/event.py b/gbpservice/nfp/core/event.py index 032e85c37..1622a0824 100644 --- a/gbpservice/nfp/core/event.py +++ b/gbpservice/nfp/core/event.py @@ -26,7 +26,6 @@ identify = nfp_common.identify SCHEDULE_EVENT = 'schedule_event' POLL_EVENT = 'poll_event' STASH_EVENT = 'stash_event' -EVENT_EXPIRED = 'event_expired' """Event Flag """ EVENT_NEW = 'new_event' @@ -34,6 +33,8 @@ EVENT_COMPLETE = 'event_done' EVENT_ACK = 'event_ack' POLL_EVENT_STOP = 'poll_event_stop' +EVENT_DEFAULT_LIFETIME = 600 + """Sequencer status. """ SequencerEmpty = nfp_seq.SequencerEmpty SequencerBusy = nfp_seq.SequencerBusy @@ -86,19 +87,29 @@ class EventDesc(object): self.target = None # ID of graph of which this event is part of self.graph = None + # Type of path to which this event belongs CREATE/UPDATE/DELETE + self.path_type = kwargs.get('path_type') + # Unique key for the path + self.path_key = kwargs.get('path_key') + # Marks whether an event was acked or not + self.acked = False def from_desc(self, desc): self.type = desc.type self.flag = desc.flag self.worker = desc.worker self.poll_desc = desc.poll_desc + self.path_type = desc.path_type + self.path_key = desc.path_key def to_dict(self): return {'uuid': self.uuid, 'type': self.type, 'flag': self.flag, 'worker': self.worker, - 'poll_desc': self.poll_desc + 'poll_desc': self.poll_desc, + 'path_type': self.path_type, + 'path_key': self.path_key } """Defines the event structure. @@ -126,7 +137,7 @@ class Event(object): # Handler of the event. self.handler = kwargs.get('handler') # Lifetime of the event in seconds. - self.lifetime = kwargs.get('lifetime', 0) + self.lifetime = kwargs.get('lifetime', -1) # Identifies whether event.data is zipped self.zipped = False # Log metadata context @@ -247,13 +258,12 @@ class NfpEventHandlers(object): def get_poll_handler(self, event_id, module=None): """Get the poll handler for event_id. """ - ph = None - spacing = 0 + ph, spacing = None, None try: if module: ph = self._event_desc_table[event_id]['modules'][module][0][1] - spacing = self._event_desc_table[event_id]['modules'][ - module][0][2] + spacing = self._event_desc_table[ + event_id]['modules'][module][0][2] else: priorities = ( self._event_desc_table[event_id]['priority'].keys()) @@ -261,8 +271,8 @@ class NfpEventHandlers(object): ph = ( self._event_desc_table[ event_id]['priority'][priority][0][1]) - spacing = self._event_desc_table[event_id]['priority'][ - priority][0][2] + spacing = self._event_desc_table[ + event_id]['priority'][priority][0][2] finally: message = "%s - Returning poll handler" % ( self._log_meta(event_id, ph)) @@ -292,16 +302,13 @@ class NfpEventHandlers(object): class NfpEventManager(object): - def __init__(self, conf, controller, sequencer, pipe=None, pid=-1, - lock=None): + def __init__(self, conf, controller, sequencer, pipe=None, pid=-1): self._conf = conf self._controller = controller # PID of process to which this event manager is associated self._pid = pid # Duplex pipe to read & write events self._pipe = pipe - # Lock over pipe access - self._lock = lock # Cache of UUIDs of events which are dispatched to # the worker which is handled by this em. self._cache = deque() @@ -315,7 +322,7 @@ class NfpEventManager(object): else: return "(event_manager - %d" % (self._pid) - def _wait_for_events(self, pipe, lock, timeout=0.01): + def _wait_for_events(self, pipe, timeout=0.01): """Wait & pull event from the pipe. Wait till timeout for the first event and then @@ -324,12 +331,11 @@ class NfpEventManager(object): """ events = [] try: - self._controller.pipe_lock(lock) ret = pipe.poll(timeout) - self._controller.pipe_unlock(lock) if ret: - event = self._controller.pipe_recv(pipe, lock) - events.append(event) + event = self._controller.pipe_recv(pipe) + if event: + events.append(event) except multiprocessing.TimeoutError as err: message = "%s" % (err) LOG.exception(message) @@ -373,7 +379,7 @@ class NfpEventManager(object): verr = verr message = "%s - event not in cache" % ( self._log_meta(event)) - LOG.warn(message) + LOG.debug(message) def dispatch_event(self, event, event_type=None, inc_load=True, cache=True): @@ -392,7 +398,7 @@ class NfpEventManager(object): if event_type: event.desc.type = event_type # Send to the worker - self._controller.pipe_send(self._pipe, self._lock, event) + self._controller.pipe_send(self._pipe, event) self._load = (self._load + 1) if inc_load else self._load # Add to the cache @@ -401,4 +407,4 @@ class NfpEventManager(object): def event_watcher(self, timeout=0.01): """Watch for events. """ - return self._wait_for_events(self._pipe, self._lock, timeout=timeout) + return self._wait_for_events(self._pipe, timeout=timeout) diff --git a/gbpservice/nfp/core/executor.py b/gbpservice/nfp/core/executor.py index dd7d53398..80b986df9 100644 --- a/gbpservice/nfp/core/executor.py +++ b/gbpservice/nfp/core/executor.py @@ -13,6 +13,7 @@ from argparse import Namespace +from gbpservice.nfp.core import context from gbpservice.nfp.core import log as nfp_logging from gbpservice.nfp.core import threadpool as core_tp @@ -84,6 +85,10 @@ class TaskExecutor(object): self.pipe_line = [] self.fired = False + def dispatch(self, job): + context.init() + return job['method'](*job['args'], **job['kwargs']) + @check_in_use def fire(self): self.fired = True @@ -92,8 +97,7 @@ class TaskExecutor(object): "TaskExecutor - (job - %s) dispatched" % (str(job))) - th = self.thread_pool.dispatch( - job['method'], *job['args'], **job['kwargs']) + th = self.thread_pool.dispatch(self.dispatch, job) job['thread'] = th for job in self.pipe_line: @@ -214,7 +218,7 @@ class EventGraphExecutor(object): graph = self._graph(completed_node) if graph: if completed_node == graph['root']: - #Graph is complete here, remove from running_instances + # Graph is complete here, remove from running_instances self.running.pop(graph['id']) else: root = self._root(graph, completed_node) diff --git a/gbpservice/nfp/core/launcher.py b/gbpservice/nfp/core/launcher.py index c3774648c..eeda92137 100644 --- a/gbpservice/nfp/core/launcher.py +++ b/gbpservice/nfp/core/launcher.py @@ -11,6 +11,7 @@ # under the License. import os +import signal import time from oslo_service import service as oslo_service @@ -33,12 +34,24 @@ ProcessLauncher = oslo_service.ProcessLauncher class NfpLauncher(ProcessLauncher): def __init__(self, conf): + # Add SIGALARM to ignore_signals, because core + # uses SIGALRM for watchdog, while oslo uses the + # same for exit. + # Signal handler is singleton class, changing here will + # have global effect. + self.signal_handler = oslo_service.SignalHandler() + self.signal_handler._ignore_signals += ('SIGALRM',) + self.signal_handler._signals_by_name = dict( + (name, getattr(signal, name)) + for name in dir(signal) + if name.startswith("SIG") and + name not in self.signal_handler._ignore_signals) + super(NfpLauncher, self).__init__(conf) - def child(self, service, ppipe, cpipe, lock, controller): + def child(self, service, ppipe, cpipe, controller): service.parent_pipe = ppipe service.pipe = cpipe - service.lock = lock service.controller = controller self.launcher = self._child_process(service) while True: diff --git a/gbpservice/nfp/core/log.py b/gbpservice/nfp/core/log.py index cf8e1b838..02467eda6 100644 --- a/gbpservice/nfp/core/log.py +++ b/gbpservice/nfp/core/log.py @@ -14,10 +14,11 @@ from oslo_config import cfg as oslo_config from oslo_log import log as oslo_logging from oslo_utils import importutils +from gbpservice.nfp.core import context + import logging import os import sys -import threading EVENT = 50 logging.addLevelName(EVENT, "EVENT") @@ -80,9 +81,15 @@ class WrappedLogger(logging.Logger): return rv def _get_nfp_msg(self, msg): - context = getattr(logging_context_store, 'context', None) - if context: - msg = "%s %s" % (context.emit(), msg) + nfp_context = context.get() + log_context = nfp_context['log_context'] + if log_context: + ctxt = "[%s] [NFI:%s] [NFD:%s]" % (log_context.get( + 'meta_id', '-'), + log_context.get('nfi_id', '-'), + log_context.get('nfd_id', '-')) + msg = "%s %s" % (ctxt, msg) + component = '' if hasattr(CONF, 'module'): component = CONF.module @@ -94,7 +101,6 @@ class WrappedLogger(logging.Logger): # Prefix log meta id with every log if project is 'nfp' if extra and extra.get('project') == 'nfp': msg = self._get_nfp_msg(msg) - return super(WrappedLogger, self).makeRecord( name, level, fn, lno, msg, args, exc_info, func=func, extra=extra) @@ -104,57 +110,8 @@ def init_logger(logger_class): logging.setLoggerClass(importutils.import_class(logger_class)) -logging_context_store = threading.local() - - -class NfpLogContext(object): - - def __init__(self, **kwargs): - self.meta_id = kwargs.get('meta_id', '-') - self.nfi_id = kwargs.get('nfi_id', '-') - self.nfd_id = kwargs.get('nfd_id', '-') - self.path = kwargs.get('path', '-') - self.auth_token = kwargs.get('auth_token', '') - self.namespace = kwargs.get('namespace', '') - - def emit(self): - return "[%s] [NFI:%s] [NFD:%s]" % (self.meta_id, self.nfi_id, - self.nfd_id) - - def to_dict(self): - return { - 'meta_id': self.meta_id, - 'nfi_id': self.nfi_id, - 'nfd_id': self.nfd_id, - 'path': self.path, - 'auth_token': self.auth_token, - 'namespace': self.namespace} - - def getLogger(name, **kwargs): kwargs.update(project='nfp') logger = NfpLogAdapter(logging.getLogger(name), kwargs) return logger - - -def store_logging_context(**kwargs): - context = NfpLogContext(**kwargs) - logging_context_store.context = context - - -def update_logging_context(**kwargs): - for key, val in kwargs.iteritems(): - if hasattr(logging_context_store.context, key): - setattr(logging_context_store.context, key, val) - - -def clear_logging_context(**kwargs): - logging_context_store.context = None - - -def get_logging_context(): - context = getattr(logging_context_store, 'context', None) - if context: - return context.to_dict() - return {} diff --git a/gbpservice/nfp/core/manager.py b/gbpservice/nfp/core/manager.py index 500146d95..74e7d2921 100644 --- a/gbpservice/nfp/core/manager.py +++ b/gbpservice/nfp/core/manager.py @@ -10,25 +10,23 @@ # License for the specific language governing permissions and limitations # under the License. -import collections import os from gbpservice.nfp.core import event as nfp_event from gbpservice.nfp.core import executor as nfp_executor from gbpservice.nfp.core import log as nfp_logging +from gbpservice.nfp.core import path as nfp_path from gbpservice.nfp.core import sequencer as nfp_sequencer +from gbpservice.nfp.core import watchdog as nfp_watchdog LOG = nfp_logging.getLogger(__name__) NfpEventManager = nfp_event.NfpEventManager NfpGraphExecutor = nfp_executor.EventGraphExecutor - -deque = collections.deque +WATCHDOG = nfp_watchdog.Watchdog def IS_SCHEDULED_EVENT_ACK(event): - return event.desc.type == nfp_event.SCHEDULE_EVENT and ( - event.desc.flag == nfp_event.EVENT_ACK - ) + return event.desc.flag == nfp_event.EVENT_ACK def IS_SCHEDULED_NEW_EVENT(event): @@ -45,9 +43,9 @@ def IS_EVENT_GRAPH(event): return event.desc.graph -def IS_POLL_EVENT_STOP(event): - return event.desc.type == nfp_event.POLL_EVENT and ( - event.desc.flag == nfp_event.POLL_EVENT_STOP) +def IS_PATH_COMPLETE_EVENT(event): + return event.id == 'PATH_COMPLETE' + """Manages the forked childs. @@ -63,7 +61,7 @@ class NfpProcessManager(object): self._controller = controller self._child_snapshot = [] - def new_child(self, pid, pipe, lock): + def new_child(self, pid, pipe): # Pass, as we will learn from comparision as watcher pass @@ -105,8 +103,8 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): self._resource_map = {} # Cache of event objects - {'uuid':} self._event_cache = {} - # Not processed. Events Stored for future. - self._stash = deque() + # watchdog object mapping with event id - {'uuid':} + self._watchdog_map = {} # ID of the distributor process self._distributor_process_id = os.getpid() # Single sequencer to be used by all event managers @@ -117,7 +115,7 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): NfpProcessManager.__init__(self, conf, controller) NfpEventManager.__init__(self, conf, controller, self._event_sequencer) - def new_child(self, pid, pipe, lock): + def new_child(self, pid, pipe): """Invoked when a new child is spawned. Associates an event manager with this child, maintains @@ -130,9 +128,9 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): ev_manager = NfpEventManager( self._conf, self._controller, self._event_sequencer, - pipe=pipe, pid=pid, lock=lock) + pipe=pipe, pid=pid) self._resource_map.update(dict({pid: ev_manager})) - super(NfpResourceManager, self).new_child(pid, pipe, lock) + super(NfpResourceManager, self).new_child(pid, pipe) def manager_run(self): """Invoked periodically to check on resources. @@ -149,12 +147,8 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): def _event_acked(self, event): """Post handling after event is dispatched to worker. """ - if event.lifetime: - message = "(event - %s) - dispatched, polling for expiry" % ( - event.identify()) - LOG.debug(message) - self._controller.poll_add( - event, event.lifetime, self._event_life_timedout) + event.desc.acked = True + nfp_path.event_complete(event) def _dispatch_event(self, event): """Dispatch event to a worker. """ @@ -165,6 +159,7 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): def _graph_event(self, event): if isinstance(event.desc.graph, dict): graph = event.desc.graph + # root = graph['root'] event.desc.graph = graph['id'] @@ -191,8 +186,17 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): # same event as non graph event event.desc.graph = None - # Dispatch to a worker - self._dispatch_event(event) + decision = nfp_path.schedule_event(event) + if decision == 'schedule': + # Dispatch to a worker + self._dispatch_event(event) + LOG.debug("Watchdog started for event - %s" % + (event.identify())) + self._watchdog(event) + elif decision == 'discard': + message = "Discarding path event - %s" % (event.identify()) + LOG.info(message) + self._controller.event_complete(event, result='FAILED') else: message = "(event - %s) - sequencing" % ( event.identify()) @@ -202,54 +206,62 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): return event.sequence - def _scheduled_event_ack(self, ack_event): + def _handle_path_complete(self, event): try: - event = self._event_cache[ack_event.desc.uuid] - evmanager = self._get_event_manager(event.desc.worker) - assert evmanager - # Pop from the pending list of evmanager - evmanager.pop_event(event) - # May be start polling for lifetime of event - self._event_acked(event) - except KeyError as kerr: - kerr = kerr - message = "(event - %s) - acked," - "missing from cache" % (event.identify()) - LOG.error(message) - except AssertionError as aerr: - aerr = aerr - message = "(event - %s) - acked," - "process handling is dead, event will be" - "replayed in new process" % (event.identify()) + path_type = event.desc.path_type + path_key = event.desc.path_key + nfp_path.path_complete(path_type, path_key) + except Exception as e: + message = "Exception - %r - while handling"\ + "event - %s" % (e, event.identify()) LOG.error(message) - def _scheduled_event_complete(self, event, expired=False): + def event_expired(self, event=None): + if event: + LOG.debug("Watchdog expired for event - %s" % (event.identify())) + self._watchdog_map.pop(event.desc.uuid, None) + self._controller.event_complete(event, result='FAILED') + + def _scheduled_event_ack(self, ack_event): + self._event_acked(ack_event) + + def _watchdog_cancel(self, event): + try: + LOG.debug("Watchdog cancelled for event - %s" % (event.identify())) + wd = self._watchdog_map.pop(event.desc.uuid) + wd.cancel() + except KeyError: + pass + + def _watchdog(self, event, handler=None): + if not handler: + handler = self.event_expired + if event.lifetime != -1: + wd = WATCHDOG(handler, + seconds=event.lifetime, + event=event) + self._watchdog_map[event.desc.uuid] = wd + + def _scheduled_event_complete(self, event): # Pop it from cache cached_event = None try: cached_event = self._event_cache.pop(event.desc.uuid) cached_event.result = event.result + # Mark the event as acked + self._watchdog_cancel(event) # Get the em managing the event evmanager = self._get_event_manager(event.desc.worker) assert evmanager evmanager.pop_event(event) - # If event expired, send a cancelled event back to worker - if expired: - event.desc.type = nfp_event.EVENT_EXPIRED - evmanager.dispatch_event(event, inc_load=False, cache=False) except KeyError as kerr: kerr = kerr message = "(event - %s) - completed, not in cache" % ( event.identify()) LOG.debug(message) except AssertionError as aerr: - aerr = aerr - # No event manager for the event, worker could have got - # killed, ignore. - message = "(event - %s) - assertion error" % ( - event.identify()) + message = "%s" % (aerr.message) LOG.error(message) - pass finally: # Release the sequencer for this sequence, # so that next event can get scheduled. @@ -258,36 +270,29 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): def _stop_poll_event(self, event): try: - poll_event = self._event_cache[event.data['key']] - poll_event.desc.poll_desc = None - except KeyError: - message = "(event - uuid=%s) - polling event not in cache" % ( - event.data['key']) - LOG.debug(message) + to_stop = event.data['key'] + event.desc.uuid = to_stop + self._watchdog_cancel(event) + except Exception as e: + message = "Exception - %r - while handling"\ + "event - %s" % (e, event.identify()) + LOG.error(message) def _non_schedule_event(self, event): - if IS_POLL_EVENT_STOP(event): - self._stop_poll_event(event) - elif event.desc.type == nfp_event.POLL_EVENT: - message = "(event - %s) - polling for event, spacing(%d)" % ( - event.identify(), event.desc.poll_desc.spacing) - LOG.debug(message) - # If the poll event is new -> create one in cache, - # In most of the cases, polling is done for an existing - # event. - ref_uuid = event.desc.poll_desc.ref - if ref_uuid not in self._event_cache.keys(): - # Assign random worker for this poll event - event.desc.worker = self._resource_map.keys()[0] - self._event_cache[ref_uuid] = event - - cached_event = self._event_cache[ref_uuid] - cached_event.desc.poll_desc = event.desc.poll_desc - - self._controller.poll_add( - event, - event.desc.poll_desc.spacing, - self._event_timedout) + if event.desc.type == nfp_event.POLL_EVENT: + if event.desc.flag == nfp_event.POLL_EVENT_STOP: + self._stop_poll_event(event) + else: + message = "(event - %s) - polling for event, spacing(%d)" % ( + event.identify(), event.desc.poll_desc.spacing) + LOG.debug(message) + # If the poll event is generated without any parent + # event, then worker would not be pre-assigned. + # In such case, assign a random worker + if not event.desc.worker: + event.desc.worker = self._resource_map.keys()[0] + event.lifetime = event.desc.poll_desc.spacing + self._watchdog(event, handler=self._poll_timedout) else: message = "(event - %s) - Unknown non scheduled event" % ( event.identify()) @@ -316,8 +321,9 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): for event in events: message = "%s - processing event" % (event.identify()) LOG.debug(message) - - if IS_SCHEDULED_EVENT_ACK(event): + if IS_PATH_COMPLETE_EVENT(event): + self._handle_path_complete(event) + elif IS_SCHEDULED_EVENT_ACK(event): self._scheduled_event_ack(event) elif IS_SCHEDULED_NEW_EVENT(event): if IS_EVENT_GRAPH(event): @@ -338,6 +344,7 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): events = [] # Get events from sequencer events = self._event_sequencer.run() + events += nfp_path.run() for pid, event_manager in self._resource_map.iteritems(): events += event_manager.event_watcher(timeout=0.01) # Process the type of events received, dispatch only the @@ -353,8 +360,8 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): def _replace_child(self, killed, new): childrens = self._controller.get_childrens() wrap = childrens[new] - pipe, lock = wrap.child_pipe_map[new] - self.new_child(new, pipe, lock) + pipe = wrap.child_pipe_map[new] + self.new_child(new, pipe) new_em = self._resource_map[new] killed_em = self._resource_map[killed] new_em.init_from_event_manager(killed_em) @@ -414,36 +421,22 @@ class NfpResourceManager(NfpProcessManager, NfpEventManager): else: return self._resource_map.get(pid) - def _event_life_timedout(self, event): - """Callback for poller when event expires. """ - message = "(event - %s) - expired" % (event.identify()) - LOG.debug(message) - self._scheduled_event_complete(event, expired=True) - - def _event_timedout(self, event): + def _poll_timedout(self, event): """Callback for poller when event timesout. """ message = "(event - %s) - timedout" % (event.identify()) LOG.debug(message) - try: - assert event.desc.poll_desc - ref_event = self._event_cache[event.desc.poll_desc.ref] - assert ref_event.desc.poll_desc - evmanager = self._get_event_manager(ref_event.desc.worker) - assert evmanager - evmanager.dispatch_event( - event, event_type=nfp_event.POLL_EVENT, - inc_load=False, cache=False) - except KeyError as err: - err = err - message = "(event - %s) - timedout, not in cache" % ( - event.identify()) - LOG.error(message) - except AssertionError as aerr: - aerr = aerr - # Process associated with event could be killed. - # Ignore. - pass - def stash_event(self, event): - """Stash the given event. """ - self._stash.put(event) + try: + evmanager = self._get_event_manager(event.desc.worker) + message = "(event-%s) event manager not found" % (event.identify()) + assert evmanager, message + if nfp_path.schedule_event(event) == 'schedule': + evmanager.dispatch_event(event, + event_type=nfp_event.POLL_EVENT, + inc_load=False, cache=False) + except AssertionError as aerr: + LOG.error(aerr.message) + except Exception as e: + message = ("Unknown exception=%r - event=%s" % ( + e, event.identify())) + LOG.error(message) diff --git a/gbpservice/nfp/core/path.py b/gbpservice/nfp/core/path.py new file mode 100644 index 000000000..15ae9730f --- /dev/null +++ b/gbpservice/nfp/core/path.py @@ -0,0 +1,169 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from gbpservice.nfp.core import log as nfp_logging + +import collections + +deque = collections.deque + +LOG = nfp_logging.getLogger(__name__) + + +class Supress(object): + + def __init__(self, ignore_list=None): + self._ignore = ignore_list or [] + + def __enter__(self): + pass + + def __exit__(self, e_type, e_value, traceback): + if e_type in self._ignore: + return True + for exception in self._ignore: + if isinstance(e_type, exception): + return True + + +class Path(object): + + def __init__(self, name): + self._waitq = deque() + self.name = name + self.count = 0 + self.invalid = False + + def queue(self, event): + self._waitq.append(event) + + def pop(self): + events = [] + + with Supress([IndexError]): + events.append(self._waitq.popleft()) + return events + + def done(self): + self._waitq.clear() + +# {'key': {'current':Path, 'waiting':Path} +paths = {} + + +def run(): + for key, path in paths.iteritems(): + if path['current'].count == 0: + path['current'].done() + if path['waiting'].name != 'INVALID': + path['current'] = path['waiting'] + path['current'].invalid = False + path['waiting'] = Path('INVALID') + + events = [] + # Get any queued events in the current path + for key, path in paths.iteritems(): + events += path['current'].pop() + return events + + +def event_complete(event): + name = event.desc.path_type + key = event.desc.path_key + + if not name: + return + name = name.upper() + with Supress([KeyError]): + path = paths[key] + if path['current'].name != name: + return + path['current'].count -= 1 + + +def schedule_event(event): + name = event.desc.path_type + key = event.desc.path_key + + if not name: + return 'schedule' + + name = name.upper() + + try: + path = paths[key] + if path['current'].name == name: + if path['current'].invalid: + return 'discard' + path['current'].count += 1 + return 'schedule' + + if path['waiting'].name == name: + path['waiting'].queue(event) + return 'wait' + + if path['current'].name != name: + return 'discard' + except Exception: + return 'schedule' + return 'schedule' + + +def path_complete(path_type, key): + try: + path = paths[key] + if path['current'].name == path_type.upper() and ( + path['waiting'].name == 'INVALID'): + paths.pop(key) + except KeyError: + message = "Path completion - %s path does not exist with key %s" % ( + path_type, key) + LOG.debug(message) + + +def create_path(key): + # Create cannot progress if there is already a path + # with the same key in any state + try: + path = paths[key] + assert False, "Path (%s) with key (%s) is already in progress" % ( + path['current'].name, key) + except KeyError: + # Create new path + paths[key] = {'current': Path('CREATE'), 'waiting': Path('INVALID')} + + +def delete_path(key): + try: + path = paths[key] + if path['current'].name != 'DELETE': + path['waiting'] = Path('DELETE') + path['current'].invalid = True + else: + assert False, ("Delete Path (%s) with key (%s)" + "is already in progress" % ( + path['current'].name, key)) + except KeyError: + paths[key] = {'current': Path('DELETE'), 'waiting': Path('INVALID')} + + +def update_path(key): + # Update cannot progress if there is DELETE already in progress + # or DELETE already waiting. + try: + path = paths[key] + assert False, "Path (%s) with key (%s) is in progress" % ( + path.name, key) + except KeyError: + # Create new path + paths[key] = {'current': Path('UPDATE'), 'waiting': Path('INVALID')} diff --git a/gbpservice/nfp/core/poll.py b/gbpservice/nfp/core/poll.py deleted file mode 100644 index d6b350127..000000000 --- a/gbpservice/nfp/core/poll.py +++ /dev/null @@ -1,81 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import eventlet -import heapq -import sched -import time as pytime - -from oslo_service import loopingcall as oslo_looping_call -from oslo_service import periodic_task as oslo_periodic_task - -Scheduler = sched.scheduler - -"""Handles the queue of poll events. - - Derives from python scheduler, since base scheduler does - a tight loop and does not leave the invoked context. - Derived here to return if no event timedout, invoked - periodically by caller to check for timedout events. -""" - - -class NfpPollHandler(Scheduler): - - def __init__(self, conf): - self._conf = conf - Scheduler.__init__(self, pytime.time, eventlet.greenthread.sleep) - - def run(self): - """Run to find timedout event. """ - q = self._queue - timefunc = self.timefunc - pop = heapq.heappop - if q: - time, priority, action, argument = checked_event = q[0] - now = timefunc() - if now < time: - return - else: - event = pop(q) - # Verify that the event was not removed or altered - # by another thread after we last looked at q[0]. - if event is checked_event: - action(*argument) - else: - heapq.heappush(q, event) - - def poll_add(self, event, timeout, method): - """Enter the event to be polled. """ - self.enter(timeout, 1, method, (event,)) - -"""Periodic task to poll for timer events. - - Periodically checks for expiry of events. -""" - - -class PollingTask(oslo_periodic_task.PeriodicTasks): - - def __init__(self, conf, controller): - super(PollingTask, self).__init__(conf) - - self._controller = controller - pulse = oslo_looping_call.FixedIntervalLoopingCall( - self.run_periodic_tasks, None, None) - pulse.start( - interval=1, initial_delay=None) - - @oslo_periodic_task.periodic_task(spacing=1) - def poll(self, context): - # invoke the common class to handle event timeouts - self._controller.poll() diff --git a/gbpservice/nfp/core/sequencer.py b/gbpservice/nfp/core/sequencer.py index dc64b4458..3fb7ef5f3 100644 --- a/gbpservice/nfp/core/sequencer.py +++ b/gbpservice/nfp/core/sequencer.py @@ -73,6 +73,12 @@ class EventSequencer(object): def release(self): self._scheduled = None + def pop(self): + self.release() + events = list(self._waitq) + self._waitq.clear() + return events + def __init__(self): # Sequence of related events # {key: sequencer()} @@ -109,6 +115,13 @@ class EventSequencer(object): del self._sequencer[key] return events + def pop(self): + events = [] + sequencers = dict(self._sequencer) + for key, sequencer in sequencers.iteritems(): + events += sequencer.pop() + return events + def release(self, key, event): try: message = "(event - %s) checking to release" % (event.identify()) diff --git a/gbpservice/nfp/core/threadpool.py b/gbpservice/nfp/core/threadpool.py index 8439ecb41..b59acbcbe 100644 --- a/gbpservice/nfp/core/threadpool.py +++ b/gbpservice/nfp/core/threadpool.py @@ -94,5 +94,5 @@ class ThreadPool(object): except eventlet.greenlet.GreenletExit: pass except Exception as ex: - message = "Exception - %s" % (ex) + message = "Unexpected exception - %r" % (ex) LOG.error(message) diff --git a/gbpservice/nfp/core/watchdog.py b/gbpservice/nfp/core/watchdog.py new file mode 100644 index 000000000..e0427dc9e --- /dev/null +++ b/gbpservice/nfp/core/watchdog.py @@ -0,0 +1,113 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import heapq +import signal +from time import time + +from gbpservice.nfp.core import log as nfp_logging + + +LOG = nfp_logging.getLogger(__name__) + +alarmlist = [] + +__new_alarm = lambda t, f, a, k: (t + time(), f, a, k) +__next_alarm = lambda: int( + round(alarmlist[0][0] - time())) if alarmlist else None +__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1)) + + +class Watchdog(object): + + def __init__(self, callback, seconds=20 * 60, **kwargs): + self._seconds = seconds + self._callback = callback + self.kwargs = kwargs + + self._alarm = alarm(self._seconds, self.timedout) + + def timedout(self): + try: + self._callback(**self.kwargs) + except Exception as e: + message = "Unexpected exception - %s" % (e) + LOG.error(message) + + def cancel(self): + try: + cancel(self._alarm) + except ValueError: + pass + except Exception as e: + message = "Unexpected exception - %s" % (e) + LOG.error(message) + + +def __clear_alarm(): + """Clear an existing alarm. + + If the alarm signal was set to a callable other than our own, queue the + previous alarm settings. + """ + oldsec = signal.alarm(0) + oldfunc = signal.signal(signal.SIGALRM, __alarm_handler) + if oldsec > 0 and oldfunc != __alarm_handler: + heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {}))) + + +def __alarm_handler(*zargs): + """Handle an alarm by calling any due heap entries and resetting the alarm. + + Note that multiple heap entries might get called, especially if calling an + entry takes a lot of time. + """ + try: + nextt = __next_alarm() + while nextt is not None and nextt <= 0: + (tm, func, args, keys) = heapq.heappop(alarmlist) + func(*args, **keys) + nextt = __next_alarm() + finally: + if alarmlist: + __set_alarm() + + +def alarm(sec, func, *args, **keys): + """Set an alarm. + + When the alarm is raised in `sec` seconds, the handler will call `func`, + passing `args` and `keys`. Return the heap entry (which is just a big + tuple), so that it can be cancelled by calling `cancel()`. + """ + __clear_alarm() + try: + newalarm = __new_alarm(sec, func, args, keys) + heapq.heappush(alarmlist, newalarm) + return newalarm + finally: + __set_alarm() + + +def cancel(alarm): + """Cancel an alarm by passing the heap entry returned by `alarm()`. + + It is an error to try to cancel an alarm which has already occurred. + """ + __clear_alarm() + try: + alarmlist.remove(alarm) + heapq.heapify(alarmlist) + finally: + if alarmlist: + __set_alarm() diff --git a/gbpservice/nfp/core/worker.py b/gbpservice/nfp/core/worker.py index d6d8e3ab1..bdf60ebc0 100644 --- a/gbpservice/nfp/core/worker.py +++ b/gbpservice/nfp/core/worker.py @@ -10,6 +10,8 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet +import greenlet import os import sys import time @@ -18,12 +20,17 @@ import traceback from oslo_service import service as oslo_service from gbpservice.nfp.core import common as nfp_common +from gbpservice.nfp.core import context from gbpservice.nfp.core import event as nfp_event from gbpservice.nfp.core import log as nfp_logging +from gbpservice.nfp.core import watchdog as nfp_watchdog LOG = nfp_logging.getLogger(__name__) Service = oslo_service.Service identify = nfp_common.identify +WATCHDOG = nfp_watchdog.Watchdog + +DEFAULT_THREAD_TIMEOUT = (10 * 60) """Implements worker process. @@ -43,7 +50,6 @@ class NfpWorker(Service): self.parent_pipe = None # Pipe to recv/send messages to distributor self.pipe = None - self.lock = None # Cache of event handlers self.controller = None self._conf = conf @@ -60,16 +66,15 @@ class NfpWorker(Service): # Update the process type in controller. self.controller.PROCESS_TYPE = "worker" self.controller._pipe = self.pipe - self.controller._lock = self.lock self.event_handlers = self.controller.get_event_handlers() + + eventlet.spawn_n(self.controller._resending_task) + while True: try: event = None - self.controller.pipe_lock(self.lock) - ret = self.pipe.poll(0.1) - self.controller.pipe_unlock(self.lock) - if ret: - event = self.controller.pipe_recv(self.pipe, self.lock) + if self.pipe.poll(0.1): + event = self.controller.pipe_recv(self.pipe) if event: message = "%s - received event" % ( self._log_meta(event)) @@ -97,7 +102,7 @@ class NfpWorker(Service): desc.uuid = event.desc.uuid desc.flag = nfp_event.EVENT_ACK setattr(ack_event, 'desc', desc) - self.controller.pipe_send(self.pipe, self.lock, ack_event) + self.controller.pipe_send(self.pipe, ack_event) def _process_event(self, event): """Process & dispatch the event. @@ -108,38 +113,24 @@ class NfpWorker(Service): thread. """ if event.desc.type == nfp_event.SCHEDULE_EVENT: - self._send_event_ack(event) eh, _ = ( self.event_handlers.get_event_handler( event.id, module=event.desc.target)) - self.dispatch(eh.handle_event, event) + self.dispatch(eh.handle_event, event, eh=eh) elif event.desc.type == nfp_event.POLL_EVENT: self.dispatch(self._handle_poll_event, event) - elif event.desc.type == nfp_event.EVENT_EXPIRED: - eh, _ = ( - self.event_handlers.get_event_handler( - event.id, module=event.desc.target)) - self.dispatch(eh.event_cancelled, event, 'EXPIRED') - - def _build_poll_status(self, ret, event): - status = {'poll': True, 'event': event} - if ret: - status['poll'] = ret.get('poll', status['poll']) - status['event'] = ret.get('event', status['event']) - status['event'].desc = event.desc - - return status def _repoll(self, ret, event, eh): - status = self._build_poll_status(ret, event) - if status['poll']: + if ret.get('poll', False): message = ("(event - %s) - repolling event -" "pending times - %d") % ( event.identify(), event.desc.poll_desc.max_times) LOG.debug(message) if event.desc.poll_desc.max_times: - self.controller.pipe_send(self.pipe, self.lock, - status['event']) + self.controller.poll_event( + event, + spacing=event.desc.poll_desc.spacing, + max_times=event.desc.poll_desc.max_times) else: message = ("(event - %s) - max timed out," "calling event_cancelled") % (event.identify()) @@ -147,7 +138,7 @@ class NfpWorker(Service): eh.event_cancelled(event, 'MAX_TIMED_OUT') def _handle_poll_event(self, event): - ret = {} + ret = {'poll': False} event.desc.poll_desc.max_times -= 1 module = event.desc.target poll_handler, _ = ( @@ -155,45 +146,77 @@ class NfpWorker(Service): event_handler, _ = ( self.event_handlers.get_event_handler(event.id, module=module)) try: - ret = poll_handler(event) - except TypeError: - ret = poll_handler(event_handler, event) + try: + ret = poll_handler(event) + except TypeError: + ret = poll_handler(event_handler, event) + if not ret: + ret = {'poll': True} + except greenlet.GreenletExit: + pass except Exception as exc: - message = "Exception from module's poll handler - %s" % exc + message = "Exception - %r" % (exc) LOG.error(message) + ret = self.dispatch_exception(event_handler, event, exc) + if not ret: + ret = {'poll': False} + self._repoll(ret, event, event_handler) - def log_dispatch(self, handler, event, *args): + def _dispatch(self, handler, event, *args, **kwargs): + event.context['log_context']['namespace'] = event.desc.target + context.init(event.context) try: - event.context['namespace'] = event.desc.target - nfp_logging.store_logging_context(**(event.context)) + handler(event, *args) + except greenlet.GreenletExit: + self.controller.event_complete(event, result='FAILED') + except Exception as exc: + # How to log traceback propery ?? + message = "Exception - %r" % (exc) + LOG.error(message) + self.dispatch_exception(kwargs.get('eh'), event, exc) + self.controller.event_complete(event, result="FAILED") finally: - try: - handler(event, *args) - nfp_logging.clear_logging_context() - except Exception as exc: - exc_type, exc_value, exc_traceback = sys.exc_info() - message = "Exception from module's event handler - %s" % (exc) - LOG.error(message) - # REVISIT(ashu): Format this traceback log properly. - # Currently, it is a single string, but there are some - # newline characters, which can be use to print it properly. - message = ("Traceback: %s" % traceback.format_exception( - exc_type, exc_value, exc_traceback)) - LOG.error(message) + self._send_event_ack(event) - def dispatch(self, handler, event, *args): + def dispatch_exception(self, event_handler, event, exception): + ret = {} + try: + ret = event_handler.handle_exception(event, exception) + except Exception: + exc_type, exc_value, exc_traceback = sys.exc_info() + message = "Traceback: %s" % traceback.format_exception( + exc_type, exc_value, exc_traceback) + LOG.error(message) + finally: + return ret + + def thread_done(self, th, watchdog=None): + if watchdog: + watchdog.cancel() + + def thread_timedout(self, thread=None): + if thread: + eventlet.greenthread.kill(thread.thread) + + def dispatch(self, handler, event, *args, **kwargs): if self._threads: - self.tg.add_thread(self.log_dispatch, handler, event, *args) + th = self.tg.add_thread( + self._dispatch, handler, event, *args, **kwargs) message = "%s - (handler - %s) - dispatched to thread " % ( self._log_meta(), identify(handler)) LOG.debug(message) + wd = WATCHDOG(self.thread_timedout, + seconds=DEFAULT_THREAD_TIMEOUT, thread=th) + th.link(self.thread_done, watchdog=wd) else: try: handler(event, *args) message = "%s - (handler - %s) - invoked" % ( self._log_meta(), identify(handler)) LOG.debug(message) + self._send_event_ack(event) except Exception as exc: - message = "Exception from module's event handler - %s" % exc + message = "Exception from module's event handler - %s" % (exc) LOG.error(message) + self.dispatch_exception(kwargs.get('eh'), event, exc)