From 92c83b2b2ef7877bd448329cad5138da7746c8bf Mon Sep 17 00:00:00 2001 From: mengalong Date: Tue, 4 Apr 2017 18:02:32 +0800 Subject: [PATCH] Implement heartbeat for FileDriver As the title said this patch just implement the heartbeat api for FileDriver. 1. You can delivery the parameter 'timeout' via the connect url like this: file:///tmp/test_coord?timeout=10. Otherwise the default timeout is 10secs. 2. We can call the function: heartbeat(self) periodic to make sure the specific member is alive. 3. When you call get_members(), it will compare the last modify time of the file with current time, if the delta_seconds greater than the timeout, it will mark the member status is dead and will not return the member_id. Change-Id: I8617e17fc1a74e70d7eb5b6f8e12dc9d9a66c755 Closes-Bug: #1664559 --- tooz/drivers/file.py | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/tooz/drivers/file.py b/tooz/drivers/file.py index 45e5230b..99626692 100644 --- a/tooz/drivers/file.py +++ b/tooz/drivers/file.py @@ -232,6 +232,8 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers): self._reserved_paths = list(self._reserved_dirs) self._reserved_paths.append(self._driver_lock_path) self._safe_member_id = self._make_filesystem_safe(member_id) + self._options = utils.collapse(options) + self._timeout = int(self._options.get('timeout', 10)) @staticmethod def _normalize_path(path): @@ -405,7 +407,15 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers): continue entry_path = os.path.join(group_dir, entry) try: - member_id = self._read_member_id(entry_path) + m_time = datetime.datetime.fromtimestamp( + os.stat(entry_path).st_mtime) + current_time = datetime.datetime.now() + delta_time = timeutils.delta_seconds(m_time, + current_time) + if delta_time >= 0 and delta_time <= self._timeout: + member_id = self._read_member_id(entry_path) + else: + continue except EnvironmentError as e: if e.errno != errno.ENOENT: raise @@ -497,6 +507,23 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers): fut = self._executor.submit(_do_get_groups) return FileFutureResult(fut) + def heartbeat(self): + for group_id in self._joined_groups: + safe_group_id = self._make_filesystem_safe(group_id) + group_dir = os.path.join(self._group_dir, safe_group_id) + member_path = os.path.join(group_dir, "%s.raw" % + self._safe_member_id) + + @_lock_me(self._driver_lock) + def _do_heartbeat(): + try: + os.utime(member_path, None) + except EnvironmentError as err: + if err.errno != errno.ENOENT: + raise + _do_heartbeat() + return self._timeout + @staticmethod def watch_elected_as_leader(group_id, callback): raise tooz.NotImplemented