1. 39372A03-8688-72A7-E911-805486341B42:/home/fsp # cps template-instance-list --service ntp ntp-server
    2. +------------+---------------+---------+--------------------------------------+----------------+
    3. | instanceid | componenttype | status | runsonhost | omip |
    4. +------------+---------------+---------+--------------------------------------+----------------+
    5. | 0 | ntp-server | active | 39372A03-8688-72A7-E911-805486341B42 | 10.200.128.142 |
    6. | 1 | ntp-server | standby | A6244EDB-F848-0786-E911-81559A70E331 | 10.200.130.127 |
    7. +------------+---------------+---------+--------------------------------------+----------------+
    8. 39372A03-8688-72A7-E911-805486341B42:/home/fsp # date
    9. Tue Sep 1 14:45:38 CST 2020
    10. 39372A03-8688-72A7-E911-805486341B42:/home/fsp # ntp time-delta --host all
    39372A03-8688-72A7-E911-805486341B42:/home/fsp # ps -ef |grep ntp
    cps      15476     1  0 Apr26 ?        09:57:50 /usr/bin/python /usr/local/bin/ntp-server/ntp_server/ntpserver.py
    root     18696     1  0 Apr26 ?        00:53:13 /usr/sbin/ntpd -4 -c /usr/local/bin/ntp-server/conf/ntp.user.conf
    root     22761     1  0 Apr26 ?        00:13:09 /usr/bin/python /usr/local/bin/ntp-client/ntp_client/ntpclient.py -s 172.28.9.37
    
    39372A03-8688-72A7-E911-805486341B42:/home/fsp # ll /usr/local/bin/ntp-server/ntp_server/ntpserver.py
    -rwxr-xr-x 1 cps cps 18956 Sep  8  2019 /usr/local/bin/ntp-server/ntp_server/ntpserver.py
    39372A03-8688-72A7-E911-805486341B42:/home/fsp # cat /usr/local/bin/ntp-server/ntp_server/ntpserver.py
    #!/usr/bin/python
    # coding:utf-8
    
    import commands
    import json
    import os
    from os.path import join
    import random
    import subprocess
    import sys
    import thread
    import threading
    import time
    import traceback
    
    from FSHeartBeat.monitorClient import MonitorClient
    from FSSysconf import ipUtil
    import log
    from oslo_config import cfg
    
    from ntp_base.cpsbase import CPSBase
    from ntp_base.cpsconstant import CPSConstant
    from ntp_base import ntp_constant
    from ntp_base.ntp_sync_mgr import NtpBase
    from ntp_base.ntp_sync_mgr import NtpConstant
    from ntp_base.time_server_monitor import ExtendServerMonitor
    from ntp_base.utils import run_cmd_secure
    from ntp_rest_server import RestAliveWatcher
    from ntp_rest_server import RestServer
    
    
    # Shared-memory size
    MON_SHM_SIZE = 600000

    # Global oslo.config handle; the option lists below are registered on it
    # under the section named by ntp_constant.SECTION_NTPSERVER.
    CONF = cfg.CONF
    # Authentication-related options (all plain strings except the
    # hash_algorithms list).
    auth_opts = [cfg.StrOpt("auth_host"),
                 cfg.StrOpt("auth_port"),
                 cfg.StrOpt("auth_protocol"),
                 cfg.StrOpt("auth_uri"),
                 cfg.StrOpt("signing_dir"),
                 cfg.StrOpt("admin_user"),
                 cfg.StrOpt("admin_password", secret=True),
                 cfg.StrOpt("auth_mode"),
                 cfg.StrOpt("admin_tenant_name"),
                 cfg.StrOpt("auth_version"),
                 cfg.StrOpt("keystone_offline_time"),
                 cfg.StrOpt("token_cache_time"),
                 cfg.StrOpt("check_revocations_for_cached"),
                 # NOTE(review): "inseucre" looks like a misspelling of
                 # "insecure". It is the option name used at runtime, so
                 # confirm against the ini file before renaming it.
                 cfg.StrOpt("inseucre"),
                 cfg.StrOpt("revocation_cache_time"),
                 cfg.StrOpt("auth_admin_prefix"),
                 cfg.StrOpt("http_connect_timeout"),
                 cfg.ListOpt('hash_algorithms',
                             default=['sha256'],
                             secret=True,
                             help="used for generate token uuid")]
    # Options specific to the ntp server itself. "server", "active_ip" and
    # "panic" are read later in this file through CONF.ntpserver[...].
    ntp_opts = [cfg.StrOpt('ciphers',
                           default='',
                           help="Set security encryption algorithm"),
                cfg.StrOpt('ssl_verify_enable',
                           default='false',
                           help="whether check certfile"),
                cfg.StrOpt('server',
                           default='127.127.1.0',
                           help="extend time server"),
                cfg.StrOpt('active_ip',
                           default='',
                           help="ip use to connect to extend server"),
                cfg.StrOpt('panic',
                           default='60',
                           help="tinker panic for ntpd")]
    CONF.register_opts(auth_opts, ntp_constant.SECTION_NTPSERVER)
    CONF.register_opts(ntp_opts, ntp_constant.SECTION_NTPSERVER)
    
    
    class NtpServer(threading.Thread):
        """Main ntp-server worker; it mainly handles the following.

        1. Periodically checks whether the server entries in ntp.user.conf
           can be used as the external clock source; when a fault is found
           ntpd is run with the default ntp.conf instead (the fallback is
           implemented in __timerCycCheck).
        2. Carries out the task of the last forced synchronisation
           (translated from the original note; the code for it is not
           visible in this class -- TODO confirm).
        3. Starts the server side of the ntp-server service.
        """
        # ntpd configuration files
        NTP_SERVER_ROOT = "/usr/local/bin/ntp-server"
        NTP_CONF_FILE_PATH = join(NTP_SERVER_ROOT, "conf/ntp.conf")
        NTP_USER_CONF_FILE_PATH = join(NTP_SERVER_ROOT, "conf/ntp.user.conf")
        # Substring used to recognise ntpd processes started from this
        # component's conf directory.
        STR_NTPD_PROCESS_CHAR = "%s/conf/" % NTP_SERVER_ROOT
        # Shell command that kill -9's every process whose command line
        # contains STR_NTPD_PROCESS_CHAR.
        STR_NTPD_STOP_CMD = "sudo kill -9 `ps -eo pid,cmd ww | grep '%s'| " \
                            "grep -v grep| awk '{print $1}'`" % \
                            STR_NTPD_PROCESS_CHAR
        NTP_CONF_FILE = "ntp.conf"
        NTP_USER_CONF_FILE = "ntp.user.conf"
        NTP_BIN_PATH = "/usr/sbin"

        # Shared-memory path and file name
        NTP_RESULT_DIRPATH = join(NTP_SERVER_ROOT, "ntp_server/result")

        # Heartbeat feeding period (passed to time.sleep in run())
        TIMER_INTERVAL = 1

        # Period of the ntpd configuration-file check
        CYC_TIMER_INTERVAL = 30

        # Interface name that is excluded when picking the interface for
        # external clock checks (see _check_server).
        DEFAULT_INTERFACE = "brcps:ntp-s"
        # Address that _check_server treats as the local clock source.
        LOCAL_IP = "127.127.1.0"
        # Loop condition of run(); the heartbeat loop ends once it is falsy.
        restartflag = True

        # Result codes returned by _check_external_clock.
        EXTERNAL_CLOCK_OK = "OK"
        EXTERNAL_CLOCK_FAIL = "FAIL"
        EXTERNAL_CLOCK_BIG = "BIG_60"

        # Key looked up in the JSON result file by _read_result_file.
        NTP_TOTAL_RESULT = "totalResult"
    
        def __init__(self):
            """Resolve the REST address and start every background worker.

            Started in this order: the periodic configuration check, the
            ntpd aliveness checker, this heartbeat thread, the external
            clock monitor, the REST server and the REST alive watcher.
            """
            threading.Thread.__init__(self, name="ntpSrvMon-Thread")

            # Floating IP (and port) of ntp-server
            rest_host, rest_port = self._getNtpRestAddr()
            self.rest_ip = rest_host
            self.is_ipv6_env = \
                True if ipUtil.is_ipv6_format(rest_host) else False

            # Periodic timer that checks the configuration file
            # (interval is CYC_TIMER_INTERVAL seconds)
            self._cycCheckTimer = NTPServerTimer(self.CYC_TIMER_INTERVAL,
                                                 self.__timerCycCheck)
            self._cycCheckTimer.start()

            # Thread that monitors the ntpd process; under IPv6 the checker
            # class is switched to the "-6" command string first
            if self.is_ipv6_env:
                NTPAlivenessCheck.STR_CHECK_NTPD = "/usr/sbin/ntpd -6 -c"
            self._aliveness_check = NTPAlivenessCheck()
            self._aliveness_check.start()

            # This thread feeds the heartbeat
            self.start()

            # External clock source monitoring / alarm task
            self._start_ext_monitor()

            # REST server, followed by the thread that watches it
            RestServer(rest_host, rest_port).start()
            RestAliveWatcher([(rest_host, rest_port)]).start()
    
        def run(self):
            """Thread body: feed the heartbeat while ntpd is reported alive.

            One iteration per TIMER_INTERVAL seconds, for as long as
            ``restartflag`` stays truthy.
            """
            heartbeat_client = MonitorClient()
            while self.restartflag:
                ntpd_up = self._is_ntpd_running()
                if ntpd_up:
                    self._monitor_heart_beat_func(heartbeat_client)
                time.sleep(self.TIMER_INTERVAL)
    
        def _is_ntpd_running(self):
            if self._aliveness_check.is_alive:
                return True
            else:
                return False
    
        def _getNtpRestAddr(self):
            """Work out the address of the ntp-server REST endpoint.

            The IP range is read from the sys file and the "floatingport" /
            "floatingip" values of the "ntp-server" entry from the net
            policy file. "floatingip" is used as an integer position inside
            the IP range.

            Returns:
                A ``(ip, port)`` tuple, or None when the IP list or the
                floating-ip value is missing or the position lies outside
                the range. Callers that unpack the result must handle None.

            Any unexpected exception is logged and ends the process through
            ``sys.exit(1)``.
            """
            try:
                strSysFile = join(CPSConstant.CFG_PATH, CPSConstant.SYS_FILE)
                strNetPolicyFile = join(CPSConstant.CFG_PATH,
                                        CPSConstant.NETPOLICY_FILE)

                ipList = NtpBase.getIpListBySysFile(strSysFile)
                rawPort = NtpBase.parseNetpolicyFile(
                    strNetPolicyFile, "ntp-server", "floatingport")
                rawIpData = NtpBase.parseNetpolicyFile(
                    strNetPolicyFile, "ntp-server", "floatingip")

                # Check for missing values BEFORE converting: int(None)
                # raises, which previously made this branch unreachable and
                # reported a missing value as a generic failure.
                if ipList is None or rawIpData is None:
                    log.error("the ipList or hostOsIp is none")
                    return None

                strPort = int(rawPort)
                strIpData = int(rawIpData)

                # Total number of addresses in the IP range
                ipNum = NtpBase.calcIpRange(ipList)
                if strIpData > ipNum or 0 == ipNum:
                    log.error("the hostOsIp is bigger than ipNum, "
                              "hostOsIp:%d, iprange:%s." %
                              (strIpData, str(ipList)))
                    return None

                strServerIp = NtpBase.calcHostOsIp(ipList, strIpData)
                return strServerIp, strPort
            except Exception:
                log.error("get ntp server rest addr fail.errorInfo:%s" %
                          str(traceback.format_exc()))
                sys.exit(1)
    
        def __timerCycCheck(self):
            """Periodic task: pick the ntpd configuration file and apply it.

            ntp.user.conf is used when it passes validation; in every other
            case (validation error, failing command, ...) ntp.conf is used.
            Nothing is switched while the client result file reports a
            status listed in NtpConstant.NTP_CHANGE_CONF_ACTION.
            """
            conf_to_use = self.NTP_CONF_FILE_PATH

            try:
                # Write the system time to the hardware clock
                self._run_cmd("sudo `/usr/bin/which hwclock` --systohc -u")

                # Check the current board status; no switching while it is
                # stopping / stop ok / syncing / sync ok
                status = self._read_result_file(
                    NtpConstant.NTP_CLIENT_RESULT_FILE_PATH)
                if status in NtpConstant.NTP_CHANGE_CONF_ACTION:
                    log.warn("The status is %s, no swiching of ntpd." % status)
                    return

                # Parse ntp.user.conf and validate its server entries; an
                # exception is raised when they are not usable
                self._check_ntp_file(self.NTP_USER_CONF_FILE_PATH)
            except Exception as msg:
                log.error("Period task failed, msg is: %s" % msg)
            else:
                # ntp.user.conf is usable for starting ntpd
                conf_to_use = self.NTP_USER_CONF_FILE_PATH

            # Make sure ntpd runs with the chosen configuration file
            try:
                self._start_suitable_process(conf_to_use)
            except Exception:
                log.error("Start ntpd process failed, msg is: %s" %
                          traceback.format_exc())
    
        def _read_result_file(self, file_name):
            """Read the total result status from a JSON result file.

            Only the first line of ``file_name`` is parsed. Returns the
            value stored under NTP_TOTAL_RESULT, or
            NtpConstant.UNDOING_STATUS_NUM when that key is absent.
            Returns None (after logging) when the file cannot be read or
            parsed.
            """
            try:
                with open(file_name, 'r') as result_file:
                    first_line = result_file.readlines()[0]
                    parsed = json.loads(first_line)
                    return parsed.get(self.NTP_TOTAL_RESULT,
                                      NtpConstant.UNDOING_STATUS_NUM)
            except Exception:
                log.error("read ntp result file throws exception %s" %
                          traceback.format_exc())
                return None
    
        def _check_ntp_file(self, file_name):
    
            # 读取ntp.user.conf文件,从中解析server、interface字段
            server, interface = self._read_user_confFile(file_name)
    
            # 查看server是否合法
            self._check_server(server, interface)
    
        def _read_user_confFile(self, file_name):
            """读取ntp.user.conf(或者ntp.conf,由输入决定)文件.
    
            返回server、interface字段。
            例如:[192.168.1.1, 192.168.1.2],[brcps:ntp-s, external_api]
            """
            server = []
            interface = []
    
            # ntp.user.conf文件不存在抛出异常
            if not os.path.isfile(file_name):
                msg = "There is no file named %s." % file_name
                raise Exception(msg)
    
            with open(file_name, 'r') as fp:
                file_context = fp.readlines()
    
            # 默认ntp.conf文件里面的数据都是由ntp-serverControl组装,
            # 因此格式合法,没有多个连续空格
            for line in file_context:
                if line.startswith("interface"):
                    interface_list = line.split()
                    interface.append(interface_list[2])
                    continue
    
                if line.startswith("server"):
                    server_list = line.split()
                    server.append(server_list[1])
                    continue
    
            return server, interface
    
        def _check_server(self, server_list, interface_list):
            """判断server字段是否合法。
    
            1.若interface_list只有brcps:ntp-s的只,则server_list中必须只有本地时钟源
            2.若interface_list中其他端口,则server_list必须要可以被访问的外部时钟源
            """
            # 如果使用本地时钟源,server必须只有一个127.127.1.0
            if 1 == len(server_list) and self.LOCAL_IP == server_list[0]:
                log.debug("Use local server.server is ok.")
                return
    
            # 如果是外部时钟源,至少有一个server能够进行同步
            is_clock_ok = False
            if self.DEFAULT_INTERFACE in interface_list:
                interface_list.remove(self.DEFAULT_INTERFACE)
            for server in server_list:
                # 对于存在本地时钟源场景,还需要检查跟外部时钟源的时间差
                # 如果时间差超过panic,ntpd进程会自行退出,所以还是需要进行配置
                # 文件切换,这里对于本地ip不进行检查
                if server == self.LOCAL_IP:
                    continue
                clock_status = self._check_external_clock(
                    server, interface_list[0])
                if clock_status == self.EXTERNAL_CLOCK_OK:
                    # 若存在一个可用的外部时钟源,则记录时钟源可用
                    is_clock_ok = True
                elif clock_status == self.EXTERNAL_CLOCK_BIG:
                    # 存在一个超过panic的时钟源,则切换为本地时钟源。
                    is_clock_ok = False
                    break
    
            # 如果所有外部时钟源都不能用就抛出异常
            if not is_clock_ok:
                msg = "All servers are not workable."
                raise Exception(msg)
    
        def _check_external_clock(self, server, interface):
            """Probe one external clock source with ``ntpdate -d``.

            Returns EXTERNAL_CLOCK_FAIL when the command fails or its
            output carries no "time server ... offset" information,
            EXTERNAL_CLOCK_BIG when the absolute offset is not below the
            configured panic value, and EXTERNAL_CLOCK_OK otherwise.
            ``interface`` is accepted but not used by the probe.
            """
            # ntpdate -d $server tells whether the external source is faulty
            probe_cmd = ["sudo", "%s/ntpdate" % self.NTP_BIN_PATH,
                         "-r", "124", "-o", "3", "-d", str(server)]
            try:
                output = run_cmd_secure(probe_cmd)
            except Exception as e:
                log.error("The external ntp source is fault or not supply service"
                          " for the moment, ip:%s, error: %s" % (server, e))
                return self.EXTERNAL_CLOCK_FAIL

            # The command output contains a line similar to
            # step/adjust time server 192.28.0.1 offset -124.397676 sec
            if not ("time server" in output and "offset" in output):
                log.error("The external ntp source is fault or not supply service "
                          "for the moment, ip:%s, output2:%s" % (server, output))
                return self.EXTERNAL_CLOCK_FAIL

            # The number following the last "offset" must stay below panic
            after_offset = output.split("offset")[-1]
            offset_time = abs(float(after_offset.split(" ")[1]))
            if float(CONF.ntpserver["panic"]) <= offset_time:
                log.error("The offset time is more than %s second,time:%s, ip:%s" %
                          (CONF.ntpserver["panic"], offset_time, server))
                return self.EXTERNAL_CLOCK_BIG

            return self.EXTERNAL_CLOCK_OK
    
        def _start_suitable_process(self, conf):
            """Make sure ntpd is running with the configuration file ``conf``.

            ``conf`` is /usr/local/bin/ntp-server/conf/ntp.conf or
            /usr/local/bin/ntp-server/conf/ntp.user.conf. When no running
            ntpd matches it, the ntpd processes of this component are
            killed and ntpd is started again with ``conf``.

            Raises:
                Exception: when the start command returns a non-zero status.
            """
            # Is an ntpd using this configuration file already running?
            ntpd_binary = "%s/ntpd" % self.NTP_BIN_PATH
            already_running = False
            status, pid = CPSBase.check_process_by_pidof(ntpd_binary)
            if status == 0 and pid:
                already_running = CPSBase.match_pid(pid, conf)
            if already_running:
                return

            # No matching process: stop the other one and start this one
            log.warn("Need to run process:%s." % str(ntpd_binary))
            try:
                self._run_cmd(self.STR_NTPD_STOP_CMD)
            except Exception:
                log.info("Fail to kill ntpd. traceback is %s." %
                         traceback.format_exc())

            # Start again whether or not the kill succeeded
            template = "sudo %s/ntpd -6 -c %s" if self.is_ipv6_env \
                else "sudo %s/ntpd -4 -c %s"
            (status, output) = self._run_cmd(
                template % (self.NTP_BIN_PATH, conf))
            if status != 0:
                raise Exception("Start ntpd failed, msg is %s" % output)
    
        # Register the heartbeat with the monitor
        def _monitor_heart_beat_func(self, mon_client):
            try:
                ret = mon_client.setHB("ntp-server")
                if ret:
                    raise Exception
            except Exception:
                log.error("Set ntp-server heartbeat failed")
    
        def _start_ext_monitor(self):
            """Start the external time server monitor as a daemon thread.

            It is built from the configured "server" and "active_ip"
            options plus the IPv6 flag of this instance.
            """
            ntp_conf = CONF.ntpserver
            monitor = ExtendServerMonitor(ntp_conf["server"],
                                          ntp_conf["active_ip"],
                                          self.is_ipv6_env)
            monitor.setDaemon(True)
            monitor.start()
    
        def _run_cmd(self, cmd):
            """Run a shell command and return ``(status, output)``.

            When running the command raises, the error is logged and
            ``(1, <error text>)`` is returned, so callers always get a
            two-element tuple.
            """
            try:
                (status, output) = commands.getstatusoutput(cmd)
                return status, output
            except Exception as msg:
                log.error("Excute cmd %s failed, msg is : %s" % (cmd, msg))
                # "output" is unbound when getstatusoutput itself raised;
                # returning it here used to fail with UnboundLocalError.
                return 1, str(msg)
    
    
    class NTPAlivenessCheck(threading.Thread):
        """Background thread that tracks whether the server ntpd is running.

        Every CHECK_INTERVAL seconds the pids found for STR_CHECK_NTPD are
        matched against STR_NTPD_PROCESS_CHAR and the outcome is stored in
        the ``is_alive`` attribute, which other code reads as a plain flag.
        """
        NTP_SERVER_ROOT = "/usr/local/bin/ntp-server"
        STR_NTPD_PROCESS_CHAR = "%s/conf/" % NTP_SERVER_ROOT
        STR_CHECK_NTPD = "/usr/sbin/ntpd"
        CHECK_INTERVAL = 2

        def __init__(self):
            threading.Thread.__init__(self)
            threading.Thread.setName(self, "aliveness-Thread")
            # NOTE(review): this attribute shadows the is_alive() method of
            # threading.Thread. It is kept because readers of this object
            # access it as a boolean attribute; renaming it needs a
            # coordinated change on the reader side.
            self.is_alive = True

        def run(self):
            """Check at every interval whether ntpd is running."""
            while True:
                try:
                    status, pids = CPSBase.check_process_by_pidof(
                        self.STR_CHECK_NTPD)
                    if status == 0 and pids:
                        self.is_alive = \
                            CPSBase.match_pid(pids, self.STR_NTPD_PROCESS_CHAR)
                    else:
                        # No ntpd pid was found at all. The flag used to
                        # keep its previous value here, so a dead ntpd was
                        # still reported as alive.
                        self.is_alive = False
                    if not self.is_alive:
                        log.error("Ntpd(Server) is not Running")
                except subprocess.CalledProcessError as e:
                    # The query itself failed: state unknown, keep the flag.
                    log.error("The query ntp command failed. Exception is %s" %
                              str(e))
                except Exception:
                    # Keep the checker thread alive; if it died the flag
                    # would silently stay at its last value forever.
                    log.error("Ntpd aliveness check failed. traceback is %s" %
                              traceback.format_exc())
                time.sleep(self.CHECK_INTERVAL)
    
    
    class NTPTimerBase(threading.Thread):
        """Thread that calls ``function`` repeatedly, sleeping in between.

        The loop runs while ``restartflag`` is truthy; exceptions raised by
        ``function`` are logged and do not stop the loop.
        """

        def __init__(self, interval, function, args=None, kwargs=None):
            """Store the callback, its arguments and the sleep interval.

            Falsy ``args`` / ``kwargs`` are replaced by an empty list /
            dict. The thread is named "<str(function)>-Thread".
            """
            threading.Thread.__init__(self, name="%s-Thread" % str(function))
            self.interval = interval
            self.function = function
            self.args = args or []
            self.kwargs = kwargs or {}
            self.restartflag = True

        def _restart_flag_set(self, restartflag):
            """Set the loop flag; a falsy value ends run() after its sleep."""
            self.restartflag = restartflag

        def run(self):
            """Call the function, log any error, sleep, and repeat."""
            while self.restartflag:
                try:
                    self.function(*self.args, **self.kwargs)
                except Exception as err:
                    log.error("Function err! " + str(err))

                time.sleep(self.interval)
    
    
    class NTPServerTimer(object):
        """Start/stop/restart wrapper around an NTPTimerBase worker thread."""

        def __init__(self, interval, function, args=None, kwargs=None):
            """Create the (not yet started) worker for ``function``.

            Falsy ``args`` / ``kwargs`` are replaced by an empty list / dict.
            """
            if not args:
                args = []
            if not kwargs:
                kwargs = {}
            self._timer = NTPTimerBase(interval, function, args, kwargs)

        def start(self):
            """Start the worker as a daemon thread."""
            self._timer.setDaemon(True)
            self._timer.start()

        def _close_timer(self):
            """Alias of stop(), kept for existing callers."""
            self.stop()

        def restart(self):
            """Stop the current worker and start a fresh one.

            A thread object can be started only once, so calling start() on
            the old worker again would raise RuntimeError. A new worker is
            created with the same interval, function and arguments.
            """
            previous = self._timer
            previous._restart_flag_set(False)
            self._timer = NTPTimerBase(previous.interval, previous.function,
                                       previous.args, previous.kwargs)
            self.start()

        def stop(self):
            """Ask the worker loop to end after its current sleep."""
            self._timer._restart_flag_set(False)
    
    
    def random_arping(cmd):
        """Run ``cmd`` 30 times, sleeping 30-60 random seconds after each run.

        The command's status and output are discarded.
        """
        for attempt in xrange(30):
            delay = random.randint(30, 60)
            log.debug("count: %s, timeout: %s, cmd: %s" % (attempt, delay, cmd))
            commands.getstatusoutput(cmd)
            time.sleep(delay)
    
    
    def main():
        """Entry point: load the configuration, start NtpServer and idle.

        After the server object is created, an announcement command for its
        REST IP (ndisc6 for IPv6, arping otherwise) is repeated in a
        separate thread. Any start-up exception is logged and re-raised;
        otherwise the main thread sleeps forever.
        """
        log.init("ntp-server")
        try:
            # read config ini
            CONF(args="", default_config_files=[CPSConstant.NTPSERVER_INI_FILE])
            ntp_server = NtpServer()
            bond = os.getenv("BASE_BOND", "brcps")
            rest_ip = ntp_server.rest_ip
            if not ipUtil.is_ipv6_format(rest_ip):
                cmd = "sudo arping -U -I %s %s -c 3 2>&1 1>/dev/null" % \
                      (bond, rest_ip)
            else:
                cmd = "sudo ndisc6 %s %s 2>&1 1>/dev/null" \
                      % (rest_ip, bond)
            thread.start_new(random_arping, (cmd, ))
        except Exception as e:
            log.error("start ntp-server fail, exe=%s, traceback=%s" %
                      (str(e), traceback.format_exc()))
            raise

        # Keep the main thread alive; the work happens in the other threads
        while True:
            time.sleep(60)
    
    
    # Run main() only when this file is executed as a script.
    if __name__ == "__main__":
        main()
    39372A03-8688-72A7-E911-805486341B42:/home/fsp #