Synchronize notification listener and evaluator queue

Change-Id: Ib16c36f174698d9644757dcc670a6816919f0e79
This commit is contained in:
Idan Hefetz 2017-07-19 07:44:33 +00:00
parent e2b964183c
commit 6714cc4dd4
1 changed files with 15 additions and 10 deletions

View File

@ -13,6 +13,7 @@
# under the License.
import datetime
import threading
from oslo_log import log
import oslo_messaging
@ -42,7 +43,8 @@ class VitrageGraphService(os_service.Service):
self.processor = proc.Processor(self.conf,
self.init,
e_graph=entity_graph)
self.listener = self._create_datasources_event_listener(conf)
self.processor_lock = threading.RLock()
self.listener = self._create_datasources_event_listener()
def start(self):
LOG.info("Vitrage Graph Service - Starting...")
@ -65,10 +67,6 @@ class VitrageGraphService(os_service.Service):
LOG.info("Vitrage Graph Service - Stopped!")
def _process_events(self):
while True:
self._process_event_non_blocking()
def _process_event_non_blocking(self):
"""Process events received from datasource
@ -77,6 +75,7 @@ class VitrageGraphService(os_service.Service):
seconds and goes to sleep for 1 second. if there are more events in
the queue they are done when timer returns.
"""
self.processor_lock.acquire()
start_time = datetime.datetime.now()
while not self.evaluator_queue.empty():
time_delta = datetime.datetime.now() - start_time
@ -84,6 +83,7 @@ class VitrageGraphService(os_service.Service):
break
if not self.evaluator_queue.empty():
self.do_process(self.evaluator_queue)
self.processor_lock.release()
def do_process(self, queue):
try:
@ -92,23 +92,28 @@ class VitrageGraphService(os_service.Service):
except Exception as e:
LOG.exception("Exception: %s", e)
def _create_datasources_event_listener(self, conf):
topic = conf.datasources.notification_topic_collector
transport = messaging.get_transport(conf)
def _create_datasources_event_listener(self):
topic = self.conf.datasources.notification_topic_collector
transport = messaging.get_transport(self.conf)
targets = [oslo_messaging.Target(topic=topic)]
return messaging.get_notification_listener(
transport,
targets,
[PushNotificationsEndpoint(self.processor.process_event)])
[PushNotificationsEndpoint(self.processor.process_event,
self.processor_lock)])
class PushNotificationsEndpoint(object):
def __init__(self, process_event_callback):
def __init__(self, process_event_callback, processor_lock):
self.process_event_callback = process_event_callback
self.processor_lock = processor_lock
def info(self, ctxt, publisher_id, event_type, payload, metadata):
try:
self.processor_lock.acquire()
self.process_event_callback(payload)
except Exception as e:
LOG.exception(e)
finally:
self.processor_lock.release()