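Because the project's business scenarios involve scaling services horizontally in and out, the backend service on every server must be able to exit gracefully.
Graceful exit means that when a server is reclaimed, business operations inside the project do not fail because the server is going away. This matters all the more because the project relies on third-party middleware such as Redis, Nacos and Kafka.
The current implementation covers two aspects. First, Kafka stops consuming messages once the shutdown signal arrives. Second, after the JVM receives the exit signal, the thread pools maintained inside the project let their active threads finish the tasks they are responsible for before the pools are destroyed.
The implementation steps are as follows: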
Step 1: make Kafka stop consuming proactively
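import java.util.Set;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
// ResultHelper, BizTemplate, BizCallBack and BizException are project-internal classes (imports omitted)

@Slf4j
@RestController
public class ExitController {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @GetMapping("/exit/{password}")
    public ResultHelper<Integer> exit(@PathVariable("password") String password) {
        return BizTemplate.execute(new BizCallBack<Integer>() {
            @Override
            public void paramCheck() {
                // Shared-secret check so that only the stop script can trigger the exit
                if (!StringUtils.equals(password, "password")) {
                    throw new BizException("Invalid request!");
                }
            }

            @Override
            public Integer preCheck() {
                return null;
            }

            @Override
            public Integer execute() throws Exception {
                try {
                    log.info("======== Shutdown command received ========");
                    log.info("======== Stopping Kafka listener containers ========");
                    Set<String> containerIds = registry.getListenerContainerIds();
                    for (String containerId : containerIds) {
                        registry.getListenerContainer(containerId).stop();
                    }
                    // System.exit() never returns, so calling it on the request thread would leave
                    // the caller without a response; wait and exit on a separate thread instead.
                    Thread exitThread = new Thread(() -> {
                        log.info("======== Waiting 60s for in-flight tasks to finish ========");
                        try {
                            Thread.sleep(60_000L);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        log.info("======== Service is shutting down ========");
                        System.exit(0);
                    }, "graceful-exit");
                    exitThread.start();
                } catch (Exception e) {
                    log.error("Shutdown failed, error:", e);
                    log.info("======== Service exit failed ========");
                    return 1;
                }
                log.info("======== Service is exiting normally... ========");
                return 0;
            }
        });
    }
}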
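The fixed 60-second sleep in the exit path either waits longer than necessary or, when a batch is slow, not long enough. One alternative, sketched here under the assumption that every listener method can be wrapped to report when it starts and finishes a message, is to count in-flight work and wait until the count drops to zero, keeping 60 seconds only as an upper bound. InFlightTracker is a hypothetical helper written for this illustration; it is not part of Spring Kafka.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper: counts messages/tasks still being processed so the exit
 * path can wait until the count reaches zero (bounded by a timeout) instead of
 * always sleeping a fixed 60 seconds.
 */
public class InFlightTracker {
    private int inFlight = 0;

    /** Call when a message starts processing. */
    public synchronized void begin() {
        inFlight++;
    }

    /** Call in a finally block when the message has been processed. */
    public synchronized void end() {
        if (--inFlight == 0) {
            notifyAll(); // wake any thread blocked in awaitIdle()
        }
    }

    public synchronized int inFlight() {
        return inFlight;
    }

    /** Returns true once nothing is in flight, or false if the timeout expires first. */
    public synchronized boolean awaitIdle(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (inFlight > 0) { // loop also guards against spurious wakeups
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMs <= 0) {
                return false;
            }
            wait(remainingMs);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        InFlightTracker tracker = new InFlightTracker();
        tracker.begin();
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(200); // simulate processing one message
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                tracker.end();
            }
        });
        worker.start();
        boolean idle = tracker.awaitIdle(5, TimeUnit.SECONDS);
        System.out.println("idle=" + idle + ", inFlight=" + tracker.inFlight()); // prints "idle=true, inFlight=0"
    }
}
```

In a listener, begin() would be called on entry and end() in a finally block; the exit thread would then call awaitIdle(60, TimeUnit.SECONDS) in place of the fixed sleep and could log a warning when it returns false.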
Step 2: shut down the thread pools
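import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PreDestroy; // jakarta.annotation.PreDestroy on Spring Boot 3+

public class ThreadPoolShutDownHook {
    private static final ThreadPoolShutDownHook INSTANCE = new ThreadPoolShutDownHook();
    // Upper bound for draining all registered pools together (assumed value; tune to the longest task)
    private static final long AWAIT_SECONDS = 30;

    // Thread-safe list: pools may be registered while shutdown() is iterating over it
    private final List<ExecutorService> executorServices = new CopyOnWriteArrayList<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public static ThreadPoolShutDownHook getInstance() {
        return INSTANCE;
    }

    private ThreadPoolShutDownHook() {
        // Runs on System.exit(), normal JVM exit, or SIGTERM/SIGINT (not on kill -9)
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown, "thread-pool-shutdown-hook"));
    }

    // Only takes effect if this object is also managed as a Spring bean; the JVM hook covers the rest
    @PreDestroy
    public void shutdown() {
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        // Stop accepting new tasks everywhere first; submitted and queued tasks keep running
        for (ExecutorService executorService : executorServices) {
            executorService.shutdown();
        }
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(AWAIT_SECONDS);
        for (ExecutorService executorService : executorServices) {
            awaitOrForce(executorService, deadline);
        }
    }

    private void awaitOrForce(ExecutorService executorService, long deadline) {
        // No logging here: the logging system may already have been shut down
        try {
            long remaining = deadline - System.nanoTime();
            if (!executorService.awaitTermination(Math.max(remaining, 0), TimeUnit.NANOSECONDS)) {
                // Timed out: interrupt running tasks and drop the ones still queued
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public ExecutorService register(ExecutorService executorService) {
        executorServices.add(executorService);
        return executorService;
    }
}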
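Whether active threads really get to finish depends on which ExecutorService method the hook calls. Per the JDK documentation, shutdown() starts an orderly shutdown in which previously submitted tasks are still executed, while shutdownNow() attempts to stop actively executing tasks (typically by interrupting them) and returns the tasks that were still waiting. The standalone demo below, which is independent of the project's classes, makes the difference visible with a single-thread pool.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownVsShutdownNow {

    /** Submits short tasks to a single-thread pool, stops the pool, and returns how many tasks completed. */
    static int runAndStop(int tasks, boolean graceful) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(50); // simulate a unit of business work
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    // Interrupted by shutdownNow(): the task gives up without completing
                    Thread.currentThread().interrupt();
                }
            });
        }
        if (graceful) {
            // Reject new tasks but run everything already submitted, including queued tasks
            pool.shutdown();
        } else {
            // Interrupt the running task and remove the queued ones from the pool
            List<Runnable> neverStarted = pool.shutdownNow();
            System.out.println("shutdownNow() dropped " + neverStarted.size() + " queued task(s)");
        }
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("shutdown():    " + runAndStop(5, true) + " of 5 tasks completed");
        System.out.println("shutdownNow(): " + runAndStop(5, false) + " of 5 tasks completed");
    }
}
```

This is why a hook that promises to let running tasks finish should call shutdown() first and fall back to shutdownNow() only after awaitTermination() times out.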
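Note that every thread pool in the project must be registered in the list maintained by the hook once it has been configured; a pool that is never registered is not drained on exit.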
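One way to make registration hard to forget, sketched here as an assumption rather than the project's actual code, is to build every pool through a single factory method that registers it before returning it. REGISTRY and register() below are a hypothetical stand-in for ThreadPoolShutDownHook.getInstance().register(...), and newRegisteredPool with its parameters is likewise illustrative; in the project the factory would call the real hook.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RegisteredPools {

    // Hypothetical stand-in for the hook's list and its register() method
    static final List<ExecutorService> REGISTRY = new CopyOnWriteArrayList<>();

    static ExecutorService register(ExecutorService pool) {
        REGISTRY.add(pool);
        return pool;
    }

    /** Hypothetical factory: every pool is registered at the moment it is built. */
    static ExecutorService newRegisteredPool(String name, int threads, int queueCapacity) {
        AtomicInteger seq = new AtomicInteger();
        // Named threads make it easy to see in thread dumps which pool is still busy at exit
        ThreadFactory factory = r -> new Thread(r, name + "-" + seq.incrementAndGet());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity), factory);
        return register(pool);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService orderPool = newRegisteredPool("order-worker", 4, 1000);
        orderPool.submit(() -> System.out.println("running on " + Thread.currentThread().getName()));
        System.out.println("registered pools: " + REGISTRY.size());
        orderPool.shutdown();
        orderPool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Routing pool creation through one method means a new pool added later is covered by the shutdown hook without anyone having to remember a separate register() call.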
Step 3: write the project's stop script
#!/bin/bash
APP_NAME=project_name.jar
SPRING_PROFILES_ACTIVE=pro

# Usage hint, printed when the argument is missing or unknown
usage() {
    echo "Usage: sh script.sh [start [profile]|stop|restart|r|status]"
    exit 1
}

# Check whether the application is running
is_exist() {
    pid=$(ps -ef | grep "$APP_NAME" | grep -v grep | awk '{print $2}')
    # Return 1 if it is not running, 0 if it is
    if [ -z "${pid}" ]; then
        return 1
    else
        return 0
    fi
}

# Start
start() {
    is_exist
    if [ $? -eq "0" ]; then
        echo " ## ${APP_NAME} is already running. pid=${pid} ."
    else
        # An optional second argument overrides the active Spring profile
        if [ "$2" != "" ]; then
            SPRING_PROFILES_ACTIVE=$2
        fi
        nohup java -server -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=512m -Xms512m -Xmx4g -Xmn256m -Xss512k -XX:+StartAttachListener -verbose:gc -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=oom.hprof -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=18887 -jar "$APP_NAME" --spring.profiles.active=${SPRING_PROFILES_ACTIVE} >/dev/null 2>&1 &
        echo " ## $APP_NAME started! Active profiles: ${SPRING_PROFILES_ACTIVE}"
    fi
}

# Stop
stop() {
    is_exist
    if [ $? -eq "0" ]; then
        echo " ## ${APP_NAME} shutdown signal sent, please wait"
        # "port" is a placeholder for the service's HTTP port
        curl http://localhost:port/exit/password
        status=0
        # Poll once per second until the process has exited
        until [ $status -eq "1" ]; do
            sleep 1
            is_exist
            status=$?
        done
        echo " ## status is $status "
        echo " ## ${APP_NAME} has stopped"
    else
        echo " ## ${APP_NAME} is not running, nothing to stop"
    fi
}

# Print the running status
status() {
    is_exist
    if [ $? -eq "0" ]; then
        echo " ## ${APP_NAME} is running. pid: ${pid}"
    else
        echo " ## ${APP_NAME} is not running."
    fi
}

# Restart
restart() {
    stop
    # deployJar is a project-specific step that is not defined in this script; run it only if it exists
    if declare -F deployJar >/dev/null; then
        deployJar
    fi
    start
}

# Dispatch on the first argument; print the usage hint if it is missing or unknown
case "$1" in
    "start")
        # Pass all arguments through so that start() can read the optional profile in $2
        start "$@"
        ;;
    "stop")
        stop
        ;;
    "status")
        status
        ;;
    "restart" | "r")
        restart
        ;;
    *)
        usage
        ;;
esac