Factorize group quit on stop()
Some drivers implement it differently, some other do not at all. Change-Id: I574918cd92197105ecd4d12772557a2df1cc6bce
This commit is contained in:
parent
c87348e13f
commit
ca432a0de5
|
@ -385,6 +385,8 @@ class CoordinationDriver(object):
|
|||
if self.requires_beating and start_heart:
|
||||
self.heart.start()
|
||||
self._started = True
|
||||
# Tracks which group are joined
|
||||
self._joined_groups = set()
|
||||
|
||||
def _start(self):
|
||||
pass
|
||||
|
@ -400,6 +402,17 @@ class CoordinationDriver(object):
|
|||
if self.heart.is_alive():
|
||||
self.heart.stop()
|
||||
self.heart.wait()
|
||||
leaving = [self.leave_group(group)
|
||||
for group in self._joined_groups]
|
||||
for leave in leaving:
|
||||
try:
|
||||
leave.get()
|
||||
except ToozError:
|
||||
# Whatever happens, ignore. Maybe we got booted out/never
|
||||
# existed in the first place, or something is down, but we just
|
||||
# want to call _stop after whatever happens to not leak any
|
||||
# connection.
|
||||
pass
|
||||
self._stop()
|
||||
self._started = False
|
||||
|
||||
|
|
|
@ -218,7 +218,6 @@ class FileDriver(coordination._RunWatchersMixin,
|
|||
self._reserved_dirs = [self._dir, self._group_dir, self._tmpdir]
|
||||
self._reserved_paths = list(self._reserved_dirs)
|
||||
self._reserved_paths.append(self._driver_lock_path)
|
||||
self._joined_groups = set()
|
||||
self._safe_member_id = self._make_filesystem_safe(member_id)
|
||||
|
||||
@staticmethod
|
||||
|
@ -261,8 +260,6 @@ class FileDriver(coordination._RunWatchersMixin,
|
|||
self._executor.start()
|
||||
|
||||
def _stop(self):
|
||||
while self._joined_groups:
|
||||
self.leave_group(self._joined_groups.pop())
|
||||
self._executor.stop()
|
||||
|
||||
def _update_group_metadata(self, path, group_id):
|
||||
|
|
|
@ -276,15 +276,6 @@ class MemcachedDriver(coordination._RunWatchersMixin,
|
|||
for lock in list(self._acquired_locks):
|
||||
lock.release()
|
||||
self.client.delete(self._encode_member_id(self._member_id))
|
||||
for g in list(self._joined_groups):
|
||||
try:
|
||||
self.leave_group(g).get()
|
||||
except (coordination.MemberNotJoined,
|
||||
coordination.GroupNotCreated):
|
||||
# Guess we got booted out/never existed in the first place...
|
||||
pass
|
||||
except coordination.ToozError:
|
||||
LOG.warning("Unable to leave group '%s'", g, exc_info=True)
|
||||
self._executor.stop()
|
||||
self.client.close()
|
||||
|
||||
|
|
|
@ -335,7 +335,6 @@ return 1
|
|||
self._client = None
|
||||
self._member_id = utils.to_binary(member_id, encoding=self._encoding)
|
||||
self._acquired_locks = set()
|
||||
self._joined_groups = set()
|
||||
self._executor = utils.ProxyExecutor.build("Redis", options)
|
||||
self._started = False
|
||||
self._server_info = {}
|
||||
|
@ -516,16 +515,6 @@ return 1
|
|||
lock.release()
|
||||
except coordination.ToozError:
|
||||
LOG.warning("Unable to release lock '%s'", lock, exc_info=True)
|
||||
while self._joined_groups:
|
||||
group_id = self._joined_groups.pop()
|
||||
try:
|
||||
self.leave_group(group_id).get()
|
||||
except (coordination.MemberNotJoined,
|
||||
coordination.GroupNotCreated):
|
||||
pass
|
||||
except coordination.ToozError:
|
||||
LOG.warning("Unable to leave group '%s'", group_id,
|
||||
exc_info=True)
|
||||
self._executor.stop()
|
||||
if self._client is not None:
|
||||
# Make sure we no longer exist...
|
||||
|
|
|
@ -550,7 +550,9 @@ class TestAPI(tests.TestCaseSkipNotImplemented):
|
|||
# Only works for clients that have access to the groups they are part
|
||||
# of, to ensure that after we got booted out by client3 that this
|
||||
# client now no longer believes its part of the group.
|
||||
if hasattr(self._coord, '_joined_groups'):
|
||||
if (hasattr(self._coord, '_joined_groups')
|
||||
and (self._coord.run_watchers
|
||||
== tooz.coordination._RunWatchersMixin.run_watchers)):
|
||||
self.assertIn(self.group_id, self._coord._joined_groups)
|
||||
self._coord.run_watchers()
|
||||
self.assertNotIn(self.group_id, self._coord._joined_groups)
|
||||
|
|
Loading…
Reference in New Issue