Merge "Force Nodepool re-election on connection suspended"
This commit is contained in:
commit
45d899659a
|
@ -45,6 +45,7 @@ class Nodepool(object):
|
|||
zk_client,
|
||||
enable_node_request_cache=True,
|
||||
node_request_event_callback=self._handleNodeRequestEvent,
|
||||
connection_suspended_callback=self.stop_watcher_event,
|
||||
enable_node_cache=True)
|
||||
self.election = NodepoolEventElection(zk_client)
|
||||
self.event_thread = threading.Thread(target=self.runEventElection)
|
||||
|
|
|
@ -75,6 +75,7 @@ class ZooKeeperClient(object):
|
|||
self.on_disconnect_listeners: List[Callable[[], None]] = []
|
||||
self.on_connection_lost_listeners: List[Callable[[], None]] = []
|
||||
self.on_reconnect_listeners: List[Callable[[], None]] = []
|
||||
self.on_suspended_listeners = []
|
||||
|
||||
def _connectionListener(self, state):
|
||||
"""
|
||||
|
@ -92,6 +93,11 @@ class ZooKeeperClient(object):
|
|||
self.log.exception("Exception calling listener:")
|
||||
elif state == KazooState.SUSPENDED:
|
||||
self.log.debug("ZooKeeper connection: SUSPENDED")
|
||||
for listener in self.on_suspended_listeners:
|
||||
try:
|
||||
listener()
|
||||
except Exception:
|
||||
self.log.exception("Exception calling listener:")
|
||||
else:
|
||||
self.log.debug("ZooKeeper connection: CONNECTED")
|
||||
# Create a throwaway thread since zk operations can't
|
||||
|
@ -241,6 +247,7 @@ class ZooKeeperBase(ZooKeeperSimpleBase):
|
|||
self.client.on_connect_listeners.append(self._onConnect)
|
||||
self.client.on_disconnect_listeners.append(self._onDisconnect)
|
||||
self.client.on_reconnect_listeners.append(self._onReconnect)
|
||||
self.client.on_suspended_listeners.append(self._onSuspended)
|
||||
|
||||
def _onConnect(self):
|
||||
pass
|
||||
|
@ -250,3 +257,6 @@ class ZooKeeperBase(ZooKeeperSimpleBase):
|
|||
|
||||
def _onReconnect(self):
|
||||
pass
|
||||
|
||||
def _onSuspended(self):
|
||||
pass
|
||||
|
|
|
@ -55,10 +55,12 @@ class ZooKeeperNodepool(ZooKeeperBase):
|
|||
def __init__(self, client,
|
||||
enable_node_request_cache=False,
|
||||
node_request_event_callback=None,
|
||||
connection_suspended_callback=None,
|
||||
enable_node_cache=False):
|
||||
super().__init__(client)
|
||||
self.enable_node_request_cache = enable_node_request_cache
|
||||
self.node_request_event_callback = node_request_event_callback
|
||||
self.connection_suspended_callback = connection_suspended_callback
|
||||
self.enable_node_cache = enable_node_cache
|
||||
# The caching model we use is designed around handing out model
|
||||
# data as objects. To do this, we use two caches: one is a TreeCache
|
||||
|
@ -97,6 +99,9 @@ class ZooKeeperNodepool(ZooKeeperBase):
|
|||
self._node_tree.close()
|
||||
self._node_tree = None
|
||||
|
||||
def _onSuspended(self):
|
||||
self.connection_suspended_callback()
|
||||
|
||||
def _nodePath(self, node):
|
||||
return "%s/%s" % (self.NODES_ROOT, node)
|
||||
|
||||
|
|
Loading…
Reference in New Issue