From 5f905c86c9983338636eeaeaf13865230c488007 Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Wed, 16 Nov 2016 13:02:19 +0100 Subject: [PATCH] Do not re-set the members cache for watchers by default MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The current code fills the group membership cache by default – without any check for data already been there - which can lead to a race condition in the following situation: coord1.watch_group_join(x, cb1) # Cache is filled: contains {} coord2.join_group(x) coord1.watch_group_join(x, cb2) # Cache is re-filled: contains {coord2} coord1.run_watchers() # Calls nothing since no new join is seen whereas # it should call cb1 Change-Id: I7376c9f320a37bb44282b52fa5b6fa3f1f86784b --- tooz/coordination.py | 15 +++++++++++++++ tooz/drivers/file.py | 13 ------------- tooz/drivers/memcached.py | 19 ------------------- tooz/drivers/redis.py | 12 ------------ tooz/drivers/zookeeper.py | 4 ---- tooz/tests/test_coordination.py | 4 ++++ 6 files changed, 19 insertions(+), 48 deletions(-) diff --git a/tooz/coordination.py b/tooz/coordination.py index 1881d9b4..5ec81ba2 100644 --- a/tooz/coordination.py +++ b/tooz/coordination.py @@ -627,6 +627,16 @@ class CoordinationDriverCachedRunWatchers(CoordinationDriver): self._group_members = collections.defaultdict(set) self._joined_groups = set() + def _init_watch_group(self, group_id): + members = self.get_members(group_id).get() + if group_id not in self._group_members: + self._group_members[group_id] = members + + def watch_join_group(self, group_id, callback): + self._init_watch_group(group_id) + super(CoordinationDriverCachedRunWatchers, self).watch_join_group( + group_id, callback) + def unwatch_join_group(self, group_id, callback): super(CoordinationDriverCachedRunWatchers, self).unwatch_join_group( group_id, callback) @@ -635,6 +645,11 @@ class CoordinationDriverCachedRunWatchers(CoordinationDriver): group_id in self._group_members): del self._group_members[group_id] + def watch_leave_group(self, group_id, callback): + self._init_watch_group(group_id) + super(CoordinationDriverCachedRunWatchers, self).watch_leave_group( + group_id, callback) + def unwatch_leave_group(self, group_id, callback): super(CoordinationDriverCachedRunWatchers, self).unwatch_leave_group( group_id, callback) diff --git a/tooz/drivers/file.py b/tooz/drivers/file.py index b966b72b..32f5c8a6 100644 --- a/tooz/drivers/file.py +++ b/tooz/drivers/file.py @@ -482,19 +482,6 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers): fut = self._executor.submit(_do_get_groups) return FileFutureResult(fut) - def _init_watch_group(self, group_id): - group_members_fut = self.get_members(group_id) - group_members = group_members_fut.get(timeout=None) - self._group_members[group_id].update(group_members) - - def watch_join_group(self, group_id, callback): - self._init_watch_group(group_id) - return super(FileDriver, self).watch_join_group(group_id, callback) - - def watch_leave_group(self, group_id, callback): - self._init_watch_group(group_id) - return super(FileDriver, self).watch_leave_group(group_id, callback) - @staticmethod def watch_elected_as_leader(group_id, callback): raise tooz.NotImplemented diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index 97205ff3..239d4eb2 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -476,25 +476,6 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers): self.leader_timeout, self.lock_timeout) - @_translate_failures - def _init_watch_group(self, group_id): - members = self.client.get(self._encode_group_id(group_id)) - if members is None: - raise coordination.GroupNotCreated(group_id) - # Initialize with the current group member list - if group_id not in self._group_members: - self._group_members[group_id] = set(members.keys()) - - def watch_join_group(self, group_id, callback): - self._init_watch_group(group_id) - return super(MemcachedDriver, self).watch_join_group( - group_id, callback) - - def watch_leave_group(self, group_id, callback): - self._init_watch_group(group_id) - return super(MemcachedDriver, self).watch_leave_group( - group_id, callback) - def get_lock(self, name): return MemcachedLock(self, name, self.lock_timeout) diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py index 58513dff..01b7dbe8 100644 --- a/tooz/drivers/redis.py +++ b/tooz/drivers/redis.py @@ -720,18 +720,6 @@ return 1 return RedisFutureResult(self._submit(_get_groups)) - def _init_watch_group(self, group_id): - members = self.get_members(group_id) - self._group_members[group_id].update(members.get(timeout=None)) - - def watch_join_group(self, group_id, callback): - self._init_watch_group(group_id) - return super(RedisDriver, self).watch_join_group(group_id, callback) - - def watch_leave_group(self, group_id, callback): - self._init_watch_group(group_id) - return super(RedisDriver, self).watch_leave_group(group_id, callback) - def _get_leader_lock(self, group_id): name = self._encode_group_leader(group_id) return self.get_lock(name) diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py index 2dc2e461..de66220b 100644 --- a/tooz/drivers/zookeeper.py +++ b/tooz/drivers/zookeeper.py @@ -475,7 +475,6 @@ class KazooDriver(coordination.CoordinationDriverCachedRunWatchers): return client.KazooClient(**client_kwargs) def _watch_group(self, group_id): - get_members_req = self.get_members(group_id) def on_children_change(children): # If we don't have any hook, stop watching @@ -506,9 +505,6 @@ class KazooDriver(coordination.CoordinationDriverCachedRunWatchers): self._group_members[group_id] = children - # Initialize the current member list - self._group_members[group_id] = get_members_req.get() - try: self._coord.ChildrenWatch(self._path_group(group_id), on_children_change) diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py index aa54eb23..c073161b 100644 --- a/tooz/tests/test_coordination.py +++ b/tooz/tests/test_coordination.py @@ -387,6 +387,10 @@ class TestAPI(tests.TestCaseSkipNotImplemented): # we get an event. self._coord.watch_leave_group(self.group_id, self._set_event) + # Run watchers to be sure we initialize the member cache and we *know* + # client2 is a member now + self._coord.run_watchers() + time.sleep(3) self._coord.heartbeat() time.sleep(3)