notification: Remove eventlet timers

This change removes usage of eventlet timers.

This allows coordinator heartbeat/watchers to work correctly when
the main thread is stuck for any reason (IO, time.sleep, ...).

Change-Id: I847aebb0d0166c2b46505061a15a06e3ce1b5eb2
Closes-Bug: #1566887
This commit is contained in:
Mehdi Abaakouk 2016-04-20 09:33:25 +02:00
parent 3766f21c7f
commit 24f75dae12
2 changed files with 31 additions and 4 deletions

View File

@ -15,6 +15,8 @@
import itertools
import threading
from concurrent import futures
from futurist import periodics
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
@ -181,10 +183,27 @@ class NotificationService(service_base.BaseService):
self.partition_coordinator.join_group(self.group_id)
self.partition_coordinator.watch_group(self.group_id,
self._refresh_agent)
self.tg.add_timer(cfg.CONF.coordination.heartbeat,
self.partition_coordinator.heartbeat)
self.tg.add_timer(cfg.CONF.coordination.check_watchers,
self.partition_coordinator.run_watchers)
@periodics.periodic(spacing=cfg.CONF.coordination.heartbeat,
run_immediately=True)
def heartbeat():
self.partition_coordinator.heartbeat()
@periodics.periodic(spacing=cfg.CONF.coordination.check_watchers,
run_immediately=True)
def run_watchers():
self.partition_coordinator.run_watchers()
self.periodic = periodics.PeriodicWorker.create(
[], executor_factory=lambda:
futures.ThreadPoolExecutor(max_workers=10))
self.periodic.add(heartbeat)
self.periodic.add(run_watchers)
t = threading.Thread(target=self.periodic.start)
t.daemon = True
t.start()
# configure pipelines after all coordination is configured.
self._configure_pipeline_listeners()
@ -192,6 +211,9 @@ class NotificationService(service_base.BaseService):
LOG.warning(_LW('Non-metric meters may be collected. It is highly '
'advisable to disable these meters using '
'ceilometer.conf or the pipeline.yaml'))
# NOTE(sileht): We have to drop eventlet to drop this last eventlet
# thread.
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
@ -290,6 +312,9 @@ class NotificationService(service_base.BaseService):
self.pipeline_listeners.append(listener)
def stop(self):
if getattr(self, 'periodic', None):
self.periodic.stop()
self.periodic.wait()
if getattr(self, 'partition_coordinator', None):
self.partition_coordinator.stop()
listeners = []

View File

@ -2,6 +2,8 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
futures>=3.0;python_version=='2.7' or python_version=='2.6' # BSD
futurist>=0.11.0 # Apache-2.0
retrying!=1.3.0,>=1.2.3 # Apache-2.0
jsonpath-rw-ext>=0.1.9 # Apache-2.0
jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT