Factorize group quit on stop()

Some drivers implement it differently, some other do not at all.

Change-Id: I574918cd92197105ecd4d12772557a2df1cc6bce
This commit is contained in:
Julien Danjou 2016-11-17 18:29:04 +01:00
parent c87348e13f
commit ca432a0de5
5 changed files with 16 additions and 24 deletions

View File

@ -385,6 +385,8 @@ class CoordinationDriver(object):
if self.requires_beating and start_heart:
self.heart.start()
self._started = True
# Tracks which group are joined
self._joined_groups = set()
def _start(self):
pass
@ -400,6 +402,17 @@ class CoordinationDriver(object):
if self.heart.is_alive():
self.heart.stop()
self.heart.wait()
leaving = [self.leave_group(group)
for group in self._joined_groups]
for leave in leaving:
try:
leave.get()
except ToozError:
# Whatever happens, ignore. Maybe we got booted out/never
# existed in the first place, or something is down, but we just
# want to call _stop after whatever happens to not leak any
# connection.
pass
self._stop()
self._started = False

View File

@ -218,7 +218,6 @@ class FileDriver(coordination._RunWatchersMixin,
self._reserved_dirs = [self._dir, self._group_dir, self._tmpdir]
self._reserved_paths = list(self._reserved_dirs)
self._reserved_paths.append(self._driver_lock_path)
self._joined_groups = set()
self._safe_member_id = self._make_filesystem_safe(member_id)
@staticmethod
@ -261,8 +260,6 @@ class FileDriver(coordination._RunWatchersMixin,
self._executor.start()
def _stop(self):
while self._joined_groups:
self.leave_group(self._joined_groups.pop())
self._executor.stop()
def _update_group_metadata(self, path, group_id):

View File

@ -276,15 +276,6 @@ class MemcachedDriver(coordination._RunWatchersMixin,
for lock in list(self._acquired_locks):
lock.release()
self.client.delete(self._encode_member_id(self._member_id))
for g in list(self._joined_groups):
try:
self.leave_group(g).get()
except (coordination.MemberNotJoined,
coordination.GroupNotCreated):
# Guess we got booted out/never existed in the first place...
pass
except coordination.ToozError:
LOG.warning("Unable to leave group '%s'", g, exc_info=True)
self._executor.stop()
self.client.close()

View File

@ -335,7 +335,6 @@ return 1
self._client = None
self._member_id = utils.to_binary(member_id, encoding=self._encoding)
self._acquired_locks = set()
self._joined_groups = set()
self._executor = utils.ProxyExecutor.build("Redis", options)
self._started = False
self._server_info = {}
@ -516,16 +515,6 @@ return 1
lock.release()
except coordination.ToozError:
LOG.warning("Unable to release lock '%s'", lock, exc_info=True)
while self._joined_groups:
group_id = self._joined_groups.pop()
try:
self.leave_group(group_id).get()
except (coordination.MemberNotJoined,
coordination.GroupNotCreated):
pass
except coordination.ToozError:
LOG.warning("Unable to leave group '%s'", group_id,
exc_info=True)
self._executor.stop()
if self._client is not None:
# Make sure we no longer exist...

View File

@ -550,7 +550,9 @@ class TestAPI(tests.TestCaseSkipNotImplemented):
# Only works for clients that have access to the groups they are part
# of, to ensure that after we got booted out by client3 that this
# client now no longer believes its part of the group.
if hasattr(self._coord, '_joined_groups'):
if (hasattr(self._coord, '_joined_groups')
and (self._coord.run_watchers
== tooz.coordination._RunWatchersMixin.run_watchers)):
self.assertIn(self.group_id, self._coord._joined_groups)
self._coord.run_watchers()
self.assertNotIn(self.group_id, self._coord._joined_groups)