diff --git a/devops/helpers/ntp.py b/devops/helpers/ntp.py index 871ecd03..333fa92e 100644 --- a/devops/helpers/ntp.py +++ b/devops/helpers/ntp.py @@ -12,10 +12,11 @@ # License for the specific language governing permissions and limitations # under the License. -import time +import abc -from devops.error import TimeoutError -from devops.helpers.helpers import get_admin_ip +from six import add_metaclass + +from devops.error import DevopsError from devops.helpers.helpers import get_admin_remote from devops.helpers.helpers import get_node_remote from devops.helpers.helpers import wait @@ -45,40 +46,253 @@ def sync_time(env, node_names, skip_sync=False): g_ntp.do_sync_time(g_ntp.other_ntps) all_ntps = g_ntp.admin_ntps + g_ntp.pacemaker_ntps + g_ntp.other_ntps - results = {ntp.node_name: ntp.date()[0].rstrip() for ntp in all_ntps} + results = {ntp.node_name: ntp.date for ntp in all_ntps} return results +@add_metaclass(abc.ABCMeta) +class AbstractNtp(object): + + def __init__(self, remote, node_name): + self._remote = remote + self._node_name = node_name + + def __repr__(self): + return "{0}(remote={1}, node_name={2!r})".format( + self.__class__.__name__, self.remote, self.node_name) + + @property + def remote(self): + """remote object""" + return self._remote + + @property + def node_name(self): + """node name""" + return self._node_name + + @property + def date(self): + return self.remote.execute("date")['stdout'][0].rstrip() + + @abc.abstractmethod + def start(self): + """Start ntp daemon""" + + @abc.abstractmethod + def stop(self): + """Stop ntp daemon""" + + @abc.abstractmethod + def set_actual_time(self, timeout=600): + """enforce time sync""" + + @abc.abstractmethod + def wait_peer(self, interval=8, timeout=600): + """Wait for connection""" + + +# pylint: disable=abstract-method +# noinspection PyAbstractClass +class BaseNtp(AbstractNtp): + """Base class for ntpd based services + + Provides common methods: + - set_actual_time + - wait_peer + """ + + def set_actual_time(self, timeout=600): + # Get IP of a server from which the time will be synchronized. + srv_cmd = "awk '/^server/ && $2 !~ /^127\./ {print $2}' /etc/ntp.conf" + server = self.remote.execute(srv_cmd)['stdout'][0] + + # Waiting for parent server until it starts providing the time + set_date_cmd = "ntpdate -p 4 -t 0.2 -bu {0}".format(server) + wait(lambda: not self.remote.execute(set_date_cmd)['exit_code'], + timeout=timeout, + timeout_msg='Failed to set actual time on node {!r}'.format( + self._node_name)) + + self.remote.check_call('hwclock -w') + + def _get_ntpq(self): + return self.remote.execute('ntpq -pn 127.0.0.1')['stdout'][2:] + + def _get_sync_complete(self): + peers = self._get_ntpq() + + logger.debug("Node: {0}, ntpd peers: {1}".format( + self.node_name, peers)) + + for peer in peers: + p = peer.split() + remote = str(p[0]) + reach = int(p[6], 8) # From octal to int + offset = float(p[8]) + jitter = float(p[9]) + + # 1. offset and jitter should not be higher than 500 + # Otherwise, time should be re-set. + if (abs(offset) > 500) or (abs(jitter) > 500): + continue + + # 2. remote should be marked with tally '*' + if remote[0] != '*': + continue + + # 3. reachability bit array should have '1' at least in + # two lower bits as the last two successful checks + if reach & 3 != 3: + continue + + return True + return False + + def wait_peer(self, interval=8, timeout=600): + wait(self._get_sync_complete, + interval=interval, + timeout=timeout, + timeout_msg='Failed to wait peer on node {!r}'.format( + self._node_name)) + +# pylint: enable=abstract-method + + +class NtpInitscript(BaseNtp): + """NtpInitscript.""" # TODO(ddmitriev) documentation + + def __init__(self, remote, node_name): + super(NtpInitscript, self).__init__(remote, node_name) + get_ntp_cmd = \ + "find /etc/init.d/ -regex '/etc/init.d/ntp.?' -executable" + result = remote.execute(get_ntp_cmd) + self._service = result['stdout'][0].strip() + + def start(self): + self.remote.check_call("{0} start".format(self._service)) + + def stop(self): + self.remote.check_call("{0} stop".format(self._service)) + + +class NtpPacemaker(BaseNtp): + """NtpPacemaker.""" # TODO(ddmitriev) documentation + + def start(self): + # Temporary workaround of the LP bug #1441121 + self.remote.execute('ip netns exec vrouter ip l set dev lo up') + + self.remote.execute('crm resource start p_ntp') + + def stop(self): + self.remote.execute('crm resource stop p_ntp; killall ntpd') + + def _get_ntpq(self): + return self.remote.execute( + 'ip netns exec vrouter ntpq -pn 127.0.0.1')['stdout'][2:] + + +class NtpSystemd(BaseNtp): + """NtpSystemd.""" # TODO(ddmitriev) documentation + + def start(self): + self.remote.check_call('systemctl start ntpd') + + def stop(self): + self.remote.check_call('systemctl stop ntpd') + + +class NtpChronyd(AbstractNtp): + """Implements communication with chrony service + + Reference: http://chrony.tuxfamily.org/ + """ + + def start(self): + # No need to stop/start chronyd + # client can't work without daemon + pass + + def stop(self): + # No need to stop/start chronyd + # client can't work without daemon + pass + + def _get_burst_complete(self): + result = self._remote.check_call('chronyc -a activity') + stdout = result['stdout'] + burst_line = stdout[4] + return burst_line == '0 sources doing burst (return to online)\n' + + def set_actual_time(self, timeout=600): + # sync time + # 3 - good measurements + # 5 - max measurements + self._remote.check_call('chronyc -a burst 3/5') + + # wait burst complete + wait(self._get_burst_complete, timeout=timeout, + timeout_msg='Failed to set actual time on node {!r}'.format( + self._node_name)) + + # set system clock + self._remote.check_call('chronyc -a makestep') + + def wait_peer(self, interval=8, timeout=600): + # wait for synchronization + # 10 - number of tries + # 0.01 - maximum allowed remaining correction + self._remote.check_call('chronyc -a waitsync 10 0.01') + + class GroupNtpSync(object): """Synchronize a group of nodes.""" + @staticmethod + def get_ntp(remote, node_name): + # Detect how NTPD is managed - by init script or by pacemaker. + pcs_cmd = "ps -C pacemakerd && crm_resource --resource p_ntp --locate" + systemd_cmd = "systemctl list-unit-files| grep ntpd" + chronyd_cmd = "systemctl is-active chronyd" + initd_cmd = "find /etc/init.d/ -regex '/etc/init.d/ntp.?' -executable" + + if remote.execute(pcs_cmd)['exit_code'] == 0: + # Pacemaker service found + return NtpPacemaker(remote, node_name) + elif remote.execute(systemd_cmd)['exit_code'] == 0: + return NtpSystemd(remote, node_name) + elif remote.execute(chronyd_cmd)['exit_code'] == 0: + return NtpChronyd(remote, node_name) + elif len(remote.execute(initd_cmd)['stdout']): + return NtpInitscript(remote, node_name) + else: + raise DevopsError('No suitable NTP service found on node {!r}' + ''.format(node_name)) + def __init__(self, env, node_names): """Context manager for synchronize time on nodes param: env - environment object param: node_names - list of devops node names """ - if not env: - raise Exception("'env' is not set, failed to initialize" - " connections to {0}".format(node_names)) self.admin_ntps = [] self.pacemaker_ntps = [] self.other_ntps = [] - admin_ip = get_admin_ip(env) - for node_name in node_names: if node_name == 'admin': # 1. Add a 'Ntp' instance with connection to Fuel admin node - self.admin_ntps.append( - Ntp.get_ntp(get_admin_remote(env), 'admin')) + admin_remote = get_admin_remote(env) + admin_ntp = self.get_ntp(admin_remote, 'admin') + self.admin_ntps.append(admin_ntp) logger.debug("Added node '{0}' to self.admin_ntps" .format(node_name)) continue - ntp = Ntp.get_ntp(get_node_remote(env, node_name), - node_name, admin_ip) - if ntp.is_pacemaker: + remote = get_node_remote(env, node_name) + ntp = self.get_ntp(remote, node_name) + if isinstance(ntp, NtpPacemaker): # 2. Create a list of 'Ntp' connections to the controller nodes self.pacemaker_ntps.append(ntp) logger.debug("Added node '{0}' to self.pacemaker_ntps" @@ -93,215 +307,34 @@ class GroupNtpSync(object): return self def __exit__(self, exp_type, exp_value, traceback): - [ntp.remote.clear() for ntp in self.admin_ntps] - [ntp.remote.clear() for ntp in self.pacemaker_ntps] - [ntp.remote.clear() for ntp in self.other_ntps] + pass - def is_synchronized(self, ntps): - return all([ntp.is_synchronized for ntp in ntps]) - - def is_connected(self, ntps): - return all([ntp.is_connected for ntp in ntps]) - - def report_not_synchronized(self, ntps): - return [(ntp.node_name, ntp.last_error_msg, ntp.date()) - for ntp in ntps if not ntp.is_synchronized] - - def report_not_connected(self, ntps): - return [(ntp.node_name, ntp.last_error_msg, ntp.peers) - for ntp in ntps if not ntp.is_connected] - - def report_node_names(self, ntps): + @staticmethod + def report_node_names(ntps): return [ntp.node_name for ntp in ntps] def do_sync_time(self, ntps): - # 0. 'ntps' can be filled by __init__() or outside the class - if not ntps: - raise ValueError("No servers were provided to synchronize " - "the time in self.ntps") - # 1. Stop NTPD service on nodes logger.debug("Stop NTPD service on nodes {0}" .format(self.report_node_names(ntps))) - [ntp.stop() for ntp in ntps] + for ntp in ntps: + ntp.stop() # 2. Set actual time on all nodes via 'ntpdate' logger.debug("Set actual time on all nodes via 'ntpdate' on nodes {0}" .format(self.report_node_names(ntps))) - [ntp.set_actual_time() for ntp in ntps] - if not self.is_synchronized(ntps): - raise TimeoutError("Time on nodes was not set with 'ntpdate':\n{0}" - .format(self.report_not_synchronized(ntps))) + for ntp in ntps: + ntp.set_actual_time() # 3. Start NTPD service on nodes logger.debug("Start NTPD service on nodes {0}" .format(self.report_node_names(ntps))) - [ntp.start() for ntp in ntps] + for ntp in ntps: + ntp.start() # 4. Wait for established peers logger.debug("Wait for established peers on nodes {0}" .format(self.report_node_names(ntps))) - [ntp.wait_peer() for ntp in ntps] - if not self.is_connected(ntps): - raise TimeoutError("NTPD on nodes was not synchronized:\n" - "{0}".format(self.report_not_connected(ntps))) - - -class Ntp(object): - """Common methods to work with ntpd service.""" - - @staticmethod - def get_ntp(remote, node_name='node', admin_ip=None): - - # Detect how NTPD is managed - by init script or by pacemaker. - pcs_cmd = "ps -C pacemakerd && crm_resource --resource p_ntp --locate" - systemd_cmd = "systemctl list-unit-files| grep ntpd" - - if remote.execute(pcs_cmd)['exit_code'] == 0: - # Pacemaker service found - cls = NtpPacemaker() - cls.is_pacemaker = True - elif remote.execute(systemd_cmd)['exit_code'] == 0: - cls = NtpSystemd() - cls.is_pacemaker = False - else: - # Pacemaker not found, using native ntpd - cls = NtpInitscript() - cls.is_pacemaker = False - # Find upstart / sysv-init executable script - cmd = "find /etc/init.d/ -regex '/etc/init.d/ntp.?'" - cls.service = remote.execute(cmd)['stdout'][0].strip() - - cls.is_synchronized = False - cls.is_connected = False - cls.remote = remote - cls.node_name = node_name - cls.peers = [] - cls.last_error_msg = '' - - # Get IP of a server from which the time will be synchronized. - cmd = "awk '/^server/ && $2 !~ /127.*/ {print $2}' /etc/ntp.conf" - cls.server = remote.execute(cmd)['stdout'][0] - - # Speedup time synchronization for slaves that use admin node as a peer - if admin_ip: - cmd = ("sed -i 's/^server {0} .*/server {0} minpoll 3 maxpoll 5 " - "iburst/' /etc/ntp.conf".format(admin_ip)) - remote.execute(cmd) - - return cls - - def set_actual_time(self, timeout=600): - # Waiting for parent server until it starts providing the time - cmd = "ntpdate -p 4 -t 0.2 -bu {0}".format(self.server) - self.is_synchronized = False - try: - wait(lambda: not self.remote.execute(cmd)['exit_code'], timeout) - self.is_synchronized = True - except TimeoutError: - result = self.remote.execute(cmd) - self.last_error_msg = ( - "Execution of the command '{cmd}' failed on the node " - "{node_name} with exit_code = {ec} : {stderr}" - .format(cmd=cmd, node_name=self.node_name, - ec=result['exit_code'], - stderr='\n'.join(result['stderr']))) - logger.debug(self.last_error_msg) - - if self.is_synchronized: - self.remote.execute('hwclock -w') - self.last_error_msg = '' - - return self.is_synchronized - - def wait_peer(self, interval=8, timeout=600): - self.is_connected = False - - start_time = time.time() - while start_time + timeout > time.time(): - # peer = `ntpq -pn 127.0.0.1` - self.peers = self.get_peers()[2:] # skip the header - - self.last_error_msg = "Node: {0}, ntpd peers: {1}".format( - self.node_name, self.peers) - logger.debug(self.last_error_msg) - - for peer in self.peers: - p = peer.split() - remote = str(p[0]) - reach = int(p[6], 8) # From octal to int - offset = float(p[8]) - jitter = float(p[9]) - - # 1. offset and jitter should not be higher than 500 - # Otherwise, time should be re-set. - if (abs(offset) > 500) or (abs(jitter) > 500): - return self.is_connected - - # 2. remote should be marked with tally '*' - if remote[0] != '*': - continue - - # 3. reachability bit array should have '1' at least in - # two lower bits as the last two successful checks - if reach & 3 == 3: - self.is_connected = True - self.last_error_msg = '' - return self.is_connected - - time.sleep(interval) - return self.is_connected - - def date(self): - return self.remote.execute("date")['stdout'] - - -class NtpInitscript(Ntp): - """NtpInitscript.""" # TODO(ddmitriev) documentation - - def start(self): - self.is_connected = False - self.remote.execute("{0} start".format(self.service)) - - def stop(self): - self.is_connected = False - self.remote.execute("{0} stop".format(self.service)) - - def get_peers(self): - return self.remote.execute('ntpq -pn 127.0.0.1')['stdout'] - - -class NtpPacemaker(Ntp): - """NtpPacemaker.""" # TODO(ddmitriev) documentation - - def start(self): - self.is_connected = False - - # Temporary workaround of the LP bug #1441121 - self.remote.execute('ip netns exec vrouter ip l set dev lo up') - - self.remote.execute('crm resource start p_ntp') - - def stop(self): - self.is_connected = False - self.remote.execute('crm resource stop p_ntp; killall ntpd') - - def get_peers(self): - return self.remote.execute( - 'ip netns exec vrouter ntpq -pn 127.0.0.1')['stdout'] - - -class NtpSystemd(Ntp): - """NtpSystemd.""" # TODO(ddmitriev) documentation - - def start(self): - self.is_connected = False - self.remote.execute('systemctl start ntpd') - - def stop(self): - self.is_connected = False - self.remote.execute('systemctl stop ntpd') - - def get_peers(self): - return self.remote.execute('ntpq -pn 127.0.0.1')['stdout'] + for ntp in ntps: + ntp.wait_peer()