39372A03-8688-72A7-E911-805486341B42:/home/fsp # cps template-instance-list --service ntp ntp-server
+------------+---------------+---------+--------------------------------------+----------------+
| instanceid | componenttype | status | runsonhost | omip |
+------------+---------------+---------+--------------------------------------+----------------+
| 0 | ntp-server | active | 39372A03-8688-72A7-E911-805486341B42 | 10.200.128.142 |
| 1 | ntp-server | standby | A6244EDB-F848-0786-E911-81559A70E331 | 10.200.130.127 |
+------------+---------------+---------+--------------------------------------+----------------+
39372A03-8688-72A7-E911-805486341B42:/home/fsp # date
Tue Sep 1 14:45:38 CST 2020
39372A03-8688-72A7-E911-805486341B42:/home/fsp # ntp time-delta --host all
39372A03-8688-72A7-E911-805486341B42:/home/fsp # ps -ef |grep ntp
cps 15476 1 0 Apr26 ? 09:57:50 /usr/bin/python /usr/local/bin/ntp-server/ntp_server/ntpserver.py
root 18696 1 0 Apr26 ? 00:53:13 /usr/sbin/ntpd -4 -c /usr/local/bin/ntp-server/conf/ntp.user.conf
root 22761 1 0 Apr26 ? 00:13:09 /usr/bin/python /usr/local/bin/ntp-client/ntp_client/ntpclient.py -s 172.28.9.37
39372A03-8688-72A7-E911-805486341B42:/home/fsp # ll /usr/local/bin/ntp-server/ntp_server/ntpserver.py
-rwxr-xr-x 1 cps cps 18956 Sep 8 2019 /usr/local/bin/ntp-server/ntp_server/ntpserver.py
39372A03-8688-72A7-E911-805486341B42:/home/fsp # cat /usr/local/bin/ntp-server/ntp_server/ntpserver.py
#!/usr/bin/python
# coding:utf-8
import commands
import json
import os
from os.path import join
import random
import subprocess
import sys
import thread
import threading
import time
import traceback
from FSHeartBeat.monitorClient import MonitorClient
from FSSysconf import ipUtil
import log
from oslo_config import cfg
from ntp_base.cpsbase import CPSBase
from ntp_base.cpsconstant import CPSConstant
from ntp_base import ntp_constant
from ntp_base.ntp_sync_mgr import NtpBase
from ntp_base.ntp_sync_mgr import NtpConstant
from ntp_base.time_server_monitor import ExtendServerMonitor
from ntp_base.utils import run_cmd_secure
from ntp_rest_server import RestAliveWatcher
from ntp_rest_server import RestServer
# Shared memory size.
MON_SHM_SIZE = 600000
# Module-wide handle on the global oslo.config configuration object.
CONF = cfg.CONF
# Authentication-related options. They carry no defaults or help text here
# and are registered below under the ntp_constant.SECTION_NTPSERVER group.
auth_opts = [cfg.StrOpt("auth_host"),
cfg.StrOpt("auth_port"),
cfg.StrOpt("auth_protocol"),
cfg.StrOpt("auth_uri"),
cfg.StrOpt("signing_dir"),
cfg.StrOpt("admin_user"),
cfg.StrOpt("admin_password", secret=True),
cfg.StrOpt("auth_mode"),
cfg.StrOpt("admin_tenant_name"),
cfg.StrOpt("auth_version"),
cfg.StrOpt("keystone_offline_time"),
cfg.StrOpt("token_cache_time"),
cfg.StrOpt("check_revocations_for_cached"),
# NOTE(review): "inseucre" looks like a misspelling of "insecure"; the name
# is matched against the ini file at runtime, so confirm before renaming.
cfg.StrOpt("inseucre"),
cfg.StrOpt("revocation_cache_time"),
cfg.StrOpt("auth_admin_prefix"),
cfg.StrOpt("http_connect_timeout"),
cfg.ListOpt('hash_algorithms',
default=['sha256'],
secret=True,
help="used for generate token uuid")]
# Options specific to the ntp server itself. 'server', 'active_ip' and
# 'panic' are read elsewhere in this module via CONF.ntpserver[...].
ntp_opts = [cfg.StrOpt('ciphers',
default='',
help="Set security encryption algorithm"),
cfg.StrOpt('ssl_verify_enable',
default='false',
help="whether check certfile"),
cfg.StrOpt('server',
default='127.127.1.0',
help="extend time server"),
cfg.StrOpt('active_ip',
default='',
help="ip use to connect to extend server"),
cfg.StrOpt('panic',
default='60',
help="tinker panic for ntpd")]
# Both option lists share the same configuration group.
CONF.register_opts(auth_opts, ntp_constant.SECTION_NTPSERVER)
CONF.register_opts(ntp_opts, ntp_constant.SECTION_NTPSERVER)
class NtpServer(threading.Thread):
    """Supervisor for the ntpd process of the ntp-server component.

    Constructing an instance starts all of its workers:

    1. a periodic task (every CYC_TIMER_INTERVAL seconds) that validates
       ntp.user.conf and makes sure ntpd runs with it, falling back to the
       default ntp.conf when the validation fails;
    2. an NTPAlivenessCheck thread that watches the ntpd process;
    3. this thread, which reports the "ntp-server" heartbeat every
       TIMER_INTERVAL seconds while ntpd is seen running;
    4. the external time server monitor, the REST server and the REST
       liveness watcher.
    """

    # Locations of the ntpd configuration files.
    NTP_SERVER_ROOT = "/usr/local/bin/ntp-server"
    NTP_CONF_FILE_PATH = join(NTP_SERVER_ROOT, "conf/ntp.conf")
    NTP_USER_CONF_FILE_PATH = join(NTP_SERVER_ROOT, "conf/ntp.user.conf")
    # Substring of the ntpd command line used to pick out "our" ntpd.
    STR_NTPD_PROCESS_CHAR = "%s/conf/" % NTP_SERVER_ROOT
    # Shell command that force-kills every process matching that substring.
    STR_NTPD_STOP_CMD = ("sudo kill -9 `ps -eo pid,cmd ww | grep '%s'| "
                         "grep -v grep| awk '{print $1}'`"
                         % STR_NTPD_PROCESS_CHAR)
    NTP_CONF_FILE = "ntp.conf"
    NTP_USER_CONF_FILE = "ntp.user.conf"
    NTP_BIN_PATH = "/usr/sbin"
    # Directory of the shared result files.
    NTP_RESULT_DIRPATH = join(NTP_SERVER_ROOT, "ntp_server/result")
    # Heartbeat period, in seconds.
    TIMER_INTERVAL = 1
    # Period of the ntpd configuration check, in seconds.
    CYC_TIMER_INTERVAL = 30
    DEFAULT_INTERFACE = "brcps:ntp-s"
    # Address that denotes the local clock in an ntpd "server" line.
    LOCAL_IP = "127.127.1.0"
    # run() keeps looping while this flag is true.
    restartflag = True
    # Possible results of _check_external_clock().
    EXTERNAL_CLOCK_OK = "OK"
    EXTERNAL_CLOCK_FAIL = "FAIL"
    EXTERNAL_CLOCK_BIG = "BIG_60"
    # Key looked up in the JSON result file.
    NTP_TOTAL_RESULT = "totalResult"

    def __init__(self):
        """Resolve the REST address and start every worker of the service."""
        threading.Thread.__init__(self, name="ntpSrvMon-Thread")
        # Address (floating IP and port) the REST server listens on.
        strIp, strPort = self._getNtpRestAddr()
        self.rest_ip = strIp
        self.is_ipv6_env = bool(ipUtil.is_ipv6_format(self.rest_ip))
        # Periodic configuration check.
        self._cycCheckTimer = NTPServerTimer(self.CYC_TIMER_INTERVAL,
                                             self.__timerCycCheck)
        self._cycCheckTimer.start()
        # Thread watching the ntpd process; in an IPv6 environment the
        # process string it looks for is switched on the class itself.
        if self.is_ipv6_env:
            NTPAlivenessCheck.STR_CHECK_NTPD = "/usr/sbin/ntpd -6 -c"
        self._aliveness_check = NTPAlivenessCheck()
        self._aliveness_check.start()
        # Heartbeat loop (this thread).
        self.start()
        # Monitoring task for the external time server.
        self._start_ext_monitor()
        # REST server and the thread that watches it.
        RestServer(strIp, strPort).start()
        RestAliveWatcher([(strIp, strPort)]).start()

    def run(self):
        """Report the heartbeat once per TIMER_INTERVAL while ntpd runs."""
        mon_client = MonitorClient()
        while self.restartflag:
            if self._is_ntpd_running():
                self._monitor_heart_beat_func(mon_client)
            time.sleep(self.TIMER_INTERVAL)

    def _is_ntpd_running(self):
        """Return the flag maintained by the aliveness checker as a bool."""
        return bool(self._aliveness_check.is_alive)

    def _getNtpRestAddr(self):
        """Work out the address of the ntp-server REST endpoint.

        The IP list comes from the system file, the "floatingport" and
        "floatingip" values of the "ntp-server" entry from the net policy
        file; the floating IP value is an integer that is resolved against
        the IP list by NtpBase.calcHostOsIp.

        :returns: tuple (server_ip, port).
        :raises SystemExit: with status 1 on any failure. A missing or
            out-of-range value is treated like every other error instead of
            returning a bare None, which the caller cannot unpack.
        """
        try:
            strSysFile = join(CPSConstant.CFG_PATH, CPSConstant.SYS_FILE)
            strNetPolicyFile = join(CPSConstant.CFG_PATH,
                                    CPSConstant.NETPOLICY_FILE)
            ipList = NtpBase.getIpListBySysFile(strSysFile)
            rawPort = NtpBase.parseNetpolicyFile(
                strNetPolicyFile, "ntp-server", "floatingport")
            rawIpData = NtpBase.parseNetpolicyFile(
                strNetPolicyFile, "ntp-server", "floatingip")
            # Check for missing values before converting them: int(None)
            # would raise and make this check unreachable.
            if ipList is None or rawIpData is None or rawPort is None:
                raise ValueError("the ipList or hostOsIp is none")
            strPort = int(rawPort)
            strIpData = int(rawIpData)
            # Total number of addresses in the IP range(s).
            ipNum = NtpBase.calcIpRange(ipList)
            if strIpData > ipNum or 0 == ipNum:
                raise ValueError("the hostOsIp is bigger than ipNum, "
                                 "hostOsIp:%d, iprange:%s." %
                                 (strIpData, str(ipList)))
            strServerIp = NtpBase.calcHostOsIp(ipList, strIpData)
            return strServerIp, strPort
        except Exception:
            log.error("get ntp server rest addr fail.errorInfo:%s" %
                      str(traceback.format_exc()))
            sys.exit(1)

    def __timerCycCheck(self):
        """Periodic task: choose the ntpd configuration and ensure it runs.

        ntp.user.conf is used when it passes _check_ntp_file(); otherwise
        ntpd is (re)started with the default ntp.conf. Nothing is switched
        while the client result file reports a status listed in
        NtpConstant.NTP_CHANGE_CONF_ACTION.
        """
        suitable_process = self.NTP_CONF_FILE_PATH
        try:
            # Write the system time to the hardware clock.
            cmd = "sudo `/usr/bin/which hwclock` --systohc -u"
            self._run_cmd(cmd)
            # Do not switch the configuration in the listed statuses.
            status = self._read_result_file(
                NtpConstant.NTP_CLIENT_RESULT_FILE_PATH)
            if status in NtpConstant.NTP_CHANGE_CONF_ACTION:
                log.warn("The status is %s, no swiching of ntpd." % status)
                return
            # Raises when the servers in ntp.user.conf are not usable.
            self._check_ntp_file(self.NTP_USER_CONF_FILE_PATH)
            # ntp.user.conf is good enough to run ntpd with.
            suitable_process = self.NTP_USER_CONF_FILE_PATH
        except Exception as msg:
            log.error("Period task failed, msg is: %s" % msg)
        # Make sure ntpd runs with the selected configuration file.
        try:
            self._start_suitable_process(suitable_process)
        except Exception:
            log.error("Start ntpd process failed, msg is: %s" %
                      traceback.format_exc())

    def _read_result_file(self, file_name):
        """Return the NTP_TOTAL_RESULT value stored in a JSON result file.

        Only the first line of the file is parsed. When the key is absent
        NtpConstant.UNDOING_STATUS_NUM is returned; when the file cannot be
        read or parsed the error is logged and None is returned.
        """
        try:
            with open(file_name, 'r') as fp:
                file_context = fp.readlines()[0]
            file_dict = json.loads(file_context)
            return file_dict.get(self.NTP_TOTAL_RESULT,
                                 NtpConstant.UNDOING_STATUS_NUM)
        except Exception:
            log.error("read ntp result file throws exception %s" %
                      traceback.format_exc())
            return None

    def _check_ntp_file(self, file_name):
        """Validate the servers configured in file_name; raise if unusable."""
        server, interface = self._read_user_confFile(file_name)
        self._check_server(server, interface)

    def _read_user_confFile(self, file_name):
        """Extract the server and interface entries of an ntpd config file.

        For every line starting with "server" the second field is
        collected, for every line starting with "interface" the third one,
        e.g. (["192.168.1.1", "192.168.1.2"], ["brcps:ntp-s",
        "external_api"]).

        :returns: tuple (server_list, interface_list).
        :raises Exception: when file_name is not an existing file.
        :raises IndexError: when a matching line has too few fields; the
            file is expected to be machine generated and well formed.
        """
        if not os.path.isfile(file_name):
            msg = "There is no file named %s." % file_name
            raise Exception(msg)
        server = []
        interface = []
        with open(file_name, 'r') as fp:
            file_context = fp.readlines()
        for line in file_context:
            if line.startswith("interface"):
                interface.append(line.split()[2])
            elif line.startswith("server"):
                server.append(line.split()[1])
        return server, interface

    def _check_server(self, server_list, interface_list):
        """Check that the configured time servers are usable.

        * A single server equal to LOCAL_IP (local clock) is always fine.
        * Otherwise at least one non-local server must answer, which
          requires an interface other than DEFAULT_INTERFACE, and no
          checked server may report an offset of panic seconds or more.

        interface_list is not modified.

        :raises Exception: when the servers are not usable.
        """
        # Local clock only: nothing to probe.
        if 1 == len(server_list) and self.LOCAL_IP == server_list[0]:
            log.debug("Use local server.server is ok.")
            return
        # Work on a filtered copy so the caller's list stays untouched.
        external_interfaces = [name for name in interface_list
                               if name != self.DEFAULT_INTERFACE]
        is_clock_ok = False
        for server in server_list:
            # The local clock may be listed next to external servers; it
            # is not probed.
            if server == self.LOCAL_IP:
                continue
            if not external_interfaces:
                # Explicit failure instead of an accidental IndexError.
                raise Exception(
                    "No interface other than %s is configured for "
                    "external server %s." %
                    (self.DEFAULT_INTERFACE, server))
            clock_status = self._check_external_clock(
                server, external_interfaces[0])
            if clock_status == self.EXTERNAL_CLOCK_OK:
                # One reachable external server is enough ...
                is_clock_ok = True
            elif clock_status == self.EXTERNAL_CLOCK_BIG:
                # ... unless a server is beyond the panic threshold: ntpd
                # would exit by itself, so the local clock has to be used.
                is_clock_ok = False
                break
        if not is_clock_ok:
            msg = "All servers are not workable."
            raise Exception(msg)

    def _check_external_clock(self, server, interface):
        """Probe one external time server with "ntpdate -d".

        :param server: address of the time server.
        :param interface: accepted for the caller's convenience; not used
            by the probe.
        :returns: EXTERNAL_CLOCK_FAIL when the command fails or prints no
            offset, EXTERNAL_CLOCK_BIG when the absolute offset reaches the
            configured panic value, EXTERNAL_CLOCK_OK otherwise.
        :raises ValueError, IndexError: when the text after the last
            "offset" is not a number.
        """
        cmd = ["sudo", "%s/ntpdate" % self.NTP_BIN_PATH,
               "-r", "124", "-o", "3", "-d", str(server)]
        try:
            output = run_cmd_secure(cmd)
        except Exception as e:
            log.error("The external ntp source is fault or not supply service"
                      " for the moment, ip:%s, error: %s" % (server, e))
            return self.EXTERNAL_CLOCK_FAIL
        # A successful probe prints a line such as
        # "step/adjust time server 192.28.0.1 offset -124.397676 sec".
        if "time server" not in output or "offset" not in output:
            log.error("The external ntp source is fault or not supply service "
                      "for the moment, ip:%s, output2:%s" % (server, output))
            return self.EXTERNAL_CLOCK_FAIL
        # First whitespace-separated token after the last "offset";
        # split() without an argument tolerates repeated blanks.
        offset_text = output.split("offset")[-1].split()[0]
        offset_time = abs(float(offset_text))
        if float(CONF.ntpserver["panic"]) <= offset_time:
            log.error("The offset time is more than %s second,time:%s, ip:%s" %
                      (CONF.ntpserver["panic"], offset_time, server))
            return self.EXTERNAL_CLOCK_BIG
        return self.EXTERNAL_CLOCK_OK

    def _start_suitable_process(self, conf):
        """Make sure ntpd is running with the configuration file conf.

        When no running ntpd matches conf, every ntpd started from the
        ntp-server conf directory is killed and a new one is started with
        "-6" or "-4" depending on the environment.

        :param conf: path of ntp.conf or ntp.user.conf.
        :raises Exception: when the start command returns a non-zero status.
        """
        start_process = "%s/ntpd" % self.NTP_BIN_PATH
        is_match = False
        status, pid = CPSBase.check_process_by_pidof(start_process)
        if status == 0 and pid:
            is_match = CPSBase.match_pid(pid, conf)
        if is_match:
            return
        # No matching ntpd: stop whatever is running and start afresh.
        log.warn("Need to run process:%s." % str(start_process))
        try:
            self._run_cmd(self.STR_NTPD_STOP_CMD)
        except Exception:
            log.info("Fail to kill ntpd. traceback is %s." %
                     traceback.format_exc())
        # Start ntpd whether or not the kill succeeded.
        ip_option = "-6" if self.is_ipv6_env else "-4"
        cmd = "sudo %s/ntpd %s -c %s" % (self.NTP_BIN_PATH, ip_option, conf)
        (status, output) = self._run_cmd(cmd)
        if status != 0:
            msg = "Start ntpd failed, msg is %s" % output
            raise Exception(msg)

    def _monitor_heart_beat_func(self, mon_client):
        """Report the "ntp-server" heartbeat; failures are only logged.

        A truthy return value of setHB and an exception raised by it are
        both treated as failure.
        """
        try:
            failed = mon_client.setHB("ntp-server")
        except Exception:
            failed = True
        if failed:
            log.error("Set ntp-server heartbeat failed")

    def _start_ext_monitor(self):
        """Start the external time server monitor as a daemon task."""
        task = ExtendServerMonitor(CONF.ntpserver["server"],
                                   CONF.ntpserver["active_ip"],
                                   self.is_ipv6_env)
        task.setDaemon(True)
        task.start()

    def _run_cmd(self, cmd):
        """Run a shell command and return (status, output).

        Never raises: if the command cannot be executed the error is
        logged and (1, <error text>) is returned.
        """
        try:
            return commands.getstatusoutput(cmd)
        except Exception as msg:
            log.error("Excute cmd %s failed, msg is : %s" % (cmd, msg))
            # "output" was never assigned on this path; hand back the
            # error text instead.
            return 1, str(msg)
class NTPAlivenessCheck(threading.Thread):
    """Background thread that keeps track of whether ntpd is running.

    The result is published in the ``is_alive`` attribute, which starts
    out as True.
    """
    # NOTE(review): the ``is_alive`` instance attribute shadows the
    # Thread.is_alive() method on these objects - confirm no caller needs
    # the method.

    NTP_SERVER_ROOT = "/usr/local/bin/ntp-server"
    # Substring that identifies an ntpd started from the ntp-server conf
    # directory.
    STR_NTPD_PROCESS_CHAR = "%s/conf/" % NTP_SERVER_ROOT
    # Process string handed to the pidof lookup.
    STR_CHECK_NTPD = "/usr/sbin/ntpd"
    # Seconds between two checks.
    CHECK_INTERVAL = 2

    def __init__(self):
        threading.Thread.__init__(self, name="aliveness-Thread")
        self.is_alive = True

    def run(self):
        """Refresh ``is_alive`` every CHECK_INTERVAL seconds, forever."""
        while True:
            try:
                lookup = CPSBase.check_process_by_pidof(self.STR_CHECK_NTPD)
                status, pids = lookup
                # NOTE(review): when the lookup yields no pid the flag
                # keeps its previous value - confirm this is intended.
                if status == 0 and pids:
                    matched = CPSBase.match_pid(pids,
                                                self.STR_NTPD_PROCESS_CHAR)
                    self.is_alive = matched
                if not self.is_alive:
                    log.error("Ntpd(Server) is not Running")
            except subprocess.CalledProcessError as e:
                log.error("The query ntp command failed. Exception is %s" %
                          str(e))
            time.sleep(self.CHECK_INTERVAL)
class NTPTimerBase(threading.Thread):
    """Thread that calls a function over and over with a pause in between.

    :param interval: seconds to sleep after every call.
    :param function: callable to invoke.
    :param args: positional arguments for the callable (default: none).
    :param kwargs: keyword arguments for the callable (default: none).
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        threading.Thread.__init__(self, name="%s-Thread" % str(function))
        self.interval = interval
        self.function = function
        # Any falsy value is replaced by a fresh empty container.
        self.args = args if args else []
        self.kwargs = kwargs if kwargs else {}
        # The loop in run() continues while this flag is true.
        self.restartflag = True

    def _restart_flag_set(self, restartflag):
        """Set the loop flag; a false value ends run() after the current
        iteration."""
        self.restartflag = restartflag

    def run(self):
        """Call the function, log any exception it raises, sleep, repeat."""
        while self.restartflag:
            try:
                self.function(*self.args, **self.kwargs)
            except Exception as e:
                log.error("Function err! " + str(e))
            time.sleep(self.interval)
class NTPServerTimer(object):
    """Start/stop/restart handle around an NTPTimerBase worker thread.

    :param interval: seconds between two calls of function.
    :param function: callable run periodically by the worker.
    :param args: positional arguments for the callable (default: none).
    :param kwargs: keyword arguments for the callable (default: none).
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        self._timer = NTPTimerBase(interval, function,
                                   args or [], kwargs or {})

    def start(self):
        """Start the worker as a daemon thread."""
        self._timer.setDaemon(True)
        self._timer.start()

    def _close_timer(self):
        """Same as stop()."""
        self.stop()

    def restart(self):
        """Run the periodic task again, also after start() and/or stop().

        A thread object can only be started once, so when the current
        worker has already been started it is told to finish and replaced
        by a new worker built from the same parameters. The worker is
        started as a daemon thread, like in start().
        """
        current = self._timer
        # Thread.ident stays None until the thread has been started.
        if current.ident is not None:
            current._restart_flag_set(False)
            self._timer = NTPTimerBase(current.interval, current.function,
                                       current.args, current.kwargs)
        self.start()

    def stop(self):
        """Ask the worker to leave its loop after the current iteration."""
        self._timer._restart_flag_set(False)
def random_arping(cmd):
    """Run the shell command cmd 30 times with random pauses in between.

    After every run the function sleeps for a random whole number of
    seconds between 30 and 60 (inclusive). The status and output of the
    command are discarded.
    """
    attempt = 0
    while attempt < 30:
        pause = random.randint(30, 60)
        log.debug("count: %s, timeout: %s, cmd: %s" % (attempt, pause, cmd))
        commands.getstatusoutput(cmd)
        time.sleep(pause)
        attempt += 1
def main():
    """Entry point of the ntp-server process.

    Initialises logging, loads the ini configuration, creates the
    NtpServer (which starts its worker threads) and launches a background
    thread that repeatedly runs ndisc6 (IPv6) or arping (IPv4) for the REST
    IP on the bond interface taken from the BASE_BOND environment variable
    (default "brcps"). Any exception during start-up is logged and
    re-raised; afterwards the main thread just sleeps forever.
    """
    log.init("ntp-server")
    try:
        # Load the ini configuration file.
        CONF(args="", default_config_files=[CPSConstant.NTPSERVER_INI_FILE])
        server = NtpServer()
        rest_ip = server.rest_ip
        iface = os.getenv("BASE_BOND", "brcps")
        if not ipUtil.is_ipv6_format(rest_ip):
            announce_cmd = ("sudo arping -U -I %s %s -c 3 2>&1 1>/dev/null"
                            % (iface, rest_ip))
        else:
            announce_cmd = ("sudo ndisc6 %s %s 2>&1 1>/dev/null"
                            % (rest_ip, iface))
        thread.start_new_thread(random_arping, (announce_cmd, ))
    except Exception as e:
        log.error("start ntp-server fail, exe=%s, traceback=%s" %
                  (str(e), traceback.format_exc()))
        raise
    # Keep the main thread alive; the work happens in the other threads.
    while True:
        time.sleep(60)


if __name__ == "__main__":
    main()
39372A03-8688-72A7-E911-805486341B42:/home/fsp #