Implement heartbeat for FileDriver

As the title said this patch just implement the heartbeat api for
FileDriver.
1. You can delivery the parameter 'timeout' via the connect url like
   this: file:///tmp/test_coord?timeout=10. Otherwise the default timeout
   is 10secs.
2. We can call the function: heartbeat(self) periodic
   to make sure the specific member is alive.
3. When you call get_members(), it will compare the last modify time of
   the file with current time, if the delta_seconds greater than
   the timeout, it will mark the member status is dead and will not return
   the member_id.

Change-Id: I8617e17fc1a74e70d7eb5b6f8e12dc9d9a66c755
Closes-Bug: #1664559
This commit is contained in:
mengalong 2017-04-04 18:02:32 +08:00
parent 9f87cf158d
commit 92c83b2b2e
1 changed files with 28 additions and 1 deletions

View File

@ -232,6 +232,8 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers):
self._reserved_paths = list(self._reserved_dirs)
self._reserved_paths.append(self._driver_lock_path)
self._safe_member_id = self._make_filesystem_safe(member_id)
self._options = utils.collapse(options)
self._timeout = int(self._options.get('timeout', 10))
@staticmethod
def _normalize_path(path):
@ -405,7 +407,15 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers):
continue
entry_path = os.path.join(group_dir, entry)
try:
member_id = self._read_member_id(entry_path)
m_time = datetime.datetime.fromtimestamp(
os.stat(entry_path).st_mtime)
current_time = datetime.datetime.now()
delta_time = timeutils.delta_seconds(m_time,
current_time)
if delta_time >= 0 and delta_time <= self._timeout:
member_id = self._read_member_id(entry_path)
else:
continue
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
@ -497,6 +507,23 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers):
fut = self._executor.submit(_do_get_groups)
return FileFutureResult(fut)
def heartbeat(self):
for group_id in self._joined_groups:
safe_group_id = self._make_filesystem_safe(group_id)
group_dir = os.path.join(self._group_dir, safe_group_id)
member_path = os.path.join(group_dir, "%s.raw" %
self._safe_member_id)
@_lock_me(self._driver_lock)
def _do_heartbeat():
try:
os.utime(member_path, None)
except EnvironmentError as err:
if err.errno != errno.ENOENT:
raise
_do_heartbeat()
return self._timeout
@staticmethod
def watch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented