Implement heartbeat for FileDriver
As the title said this patch just implement the heartbeat api for FileDriver. 1. You can delivery the parameter 'timeout' via the connect url like this: file:///tmp/test_coord?timeout=10. Otherwise the default timeout is 10secs. 2. We can call the function: heartbeat(self) periodic to make sure the specific member is alive. 3. When you call get_members(), it will compare the last modify time of the file with current time, if the delta_seconds greater than the timeout, it will mark the member status is dead and will not return the member_id. Change-Id: I8617e17fc1a74e70d7eb5b6f8e12dc9d9a66c755 Closes-Bug: #1664559
This commit is contained in:
parent
9f87cf158d
commit
92c83b2b2e
|
@ -232,6 +232,8 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers):
|
|||
self._reserved_paths = list(self._reserved_dirs)
|
||||
self._reserved_paths.append(self._driver_lock_path)
|
||||
self._safe_member_id = self._make_filesystem_safe(member_id)
|
||||
self._options = utils.collapse(options)
|
||||
self._timeout = int(self._options.get('timeout', 10))
|
||||
|
||||
@staticmethod
|
||||
def _normalize_path(path):
|
||||
|
@ -405,7 +407,15 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers):
|
|||
continue
|
||||
entry_path = os.path.join(group_dir, entry)
|
||||
try:
|
||||
member_id = self._read_member_id(entry_path)
|
||||
m_time = datetime.datetime.fromtimestamp(
|
||||
os.stat(entry_path).st_mtime)
|
||||
current_time = datetime.datetime.now()
|
||||
delta_time = timeutils.delta_seconds(m_time,
|
||||
current_time)
|
||||
if delta_time >= 0 and delta_time <= self._timeout:
|
||||
member_id = self._read_member_id(entry_path)
|
||||
else:
|
||||
continue
|
||||
except EnvironmentError as e:
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
|
@ -497,6 +507,23 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers):
|
|||
fut = self._executor.submit(_do_get_groups)
|
||||
return FileFutureResult(fut)
|
||||
|
||||
def heartbeat(self):
|
||||
for group_id in self._joined_groups:
|
||||
safe_group_id = self._make_filesystem_safe(group_id)
|
||||
group_dir = os.path.join(self._group_dir, safe_group_id)
|
||||
member_path = os.path.join(group_dir, "%s.raw" %
|
||||
self._safe_member_id)
|
||||
|
||||
@_lock_me(self._driver_lock)
|
||||
def _do_heartbeat():
|
||||
try:
|
||||
os.utime(member_path, None)
|
||||
except EnvironmentError as err:
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
_do_heartbeat()
|
||||
return self._timeout
|
||||
|
||||
@staticmethod
|
||||
def watch_elected_as_leader(group_id, callback):
|
||||
raise tooz.NotImplemented
|
||||
|
|
Loading…
Reference in New Issue