由于项目目前业务场景涉及服务的水平扩缩容,这就要求每台服务器上的后台服务具备平滑退出的能力。
所谓平滑退出,是指服务器被收回时,项目内部的业务操作不会因为服务器被收回导致业务上的错误,尤其是项目使用到了一些三方中间件,比如说redis,nacos或kafka等等。
当下的实现方式涉及两个方面。其一是kafka根据关闭信号停止消费消息,其二是当jvm获取到退出信号后,项目内部维护的线程池,能够保证活跃的线程处理完其负责的任务在销毁。
针对两个问题,实现步骤如下:
第一,kafka主动停止消费
@Slf4j@RestControllerpublic class ExitController {@Autowiredprivate KafkaListenerEndpointRegistry registry;@GetMapping("/exit/{password}")public ResultHelper<Integer> materialToADS(@PathVariable("password") String password) {//最大五千return BizTemplate.execute(new BizCallBack<Integer>() {@Overridepublic void paramCheck() {if (!StringUtils.equals(password, "password")) {throw new BizException("无效请求!");}}@Overridepublic Integer preCheck() {return null;}@Overridepublic Integer execute() throws Exception {try {log.info("========收到关闭指令========");log.info("========kafka停止接收消息========");log.info("========阻塞60s,等待现有任务消费完毕========");Set<String> containerIds = registry.getListenerContainerIds();for (String containerId : containerIds) {registry.getListenerContainer(containerId).stop();}try {Thread.sleep(1000 * 60);} catch (InterruptedException e) {e.printStackTrace();}log.info("========服务即将关闭========");System.exit(0);} catch (Exception e) {log.error("异常信息,error:", e);log.info("========服务退出异常========");return 1;}log.info("========服务正常退出中。。。========");return 0;}});}}
第二,线程池的退出
public class ThreadPoolShutDownHook {private static final ThreadPoolShutDownHook INSTANCE = new ThreadPoolShutDownHook();private List<ExecutorService> executorServices = Lists.newArrayList();private AtomicBoolean closed = new AtomicBoolean(false);public static ThreadPoolShutDownHook getInstance() {return INSTANCE;}private ThreadPoolShutDownHook() {Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic void run() {shutdown();}});}@PreDestroypublic void shutdown() {if (!closed.compareAndSet(false, true)) {return;}for (ExecutorService executorService : executorServices) {tryShutdownNow(executorService);}}private void tryShutdownNow(ExecutorService executorService) {try {executorService.shutdownNow();} catch (Throwable e) {//ignore logger maybe has been destroyed}try {executorService.awaitTermination(1, TimeUnit.SECONDS);} catch (InterruptedException e) {//ignore logger maybe has been destroyed}}public ExecutorService register(ExecutorService executorService) {this.executorServices.add(executorService);return executorService;}}
需要注意的是项目内部的线程池在配置完毕后需要注册近钩子维护的列表中。
第三步是对项目关停脚本进行编写
#!/bin/bashAPP_NAME=project_name.jarSPRING_PROFILES_ACTIVE=pro#使用说明,用来提示输入参数usage() {echo "Usage: sh 执行脚本.sh [start|stop|restart[r]|status]"exit 1}#检查程序是否在运行is_exist() {pid=$(ps -ef | grep $APP_NAME | grep -v grep | awk '{print $2}')#如果不存在返回1,存在返回0if [ -z "${pid}" ]; thenreturn 1elsereturn 0fi}#启动方法start() {is_existif [ $? -eq "0" ]; thenecho " ## ${APP_NAME} 该服务已经运行. pid=${pid} ."elseif [ "$2" != "" ]; thenSPRING_PROFILES_ACTIVE=$2finohup java -server -XX:MetaspaceSize=256m -XX:MaxMetaspaceSize=512m -Xms512m -Xmx4g -Xmn256m -Xss512k -XX:+StartAttachListener -verbose:gc -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=oom.hprof -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=18887 -jar $APP_NAME --spring.profiles.active=${SPRING_PROFILES_ACTIVE} >/dev/null 2>&1 &echo " ## $APP_NAME 启动成功 ! 已激活profiles: ${SPRING_PROFILES_ACTIVE}"fi}#停止方法stop() {is_existif [ $? -eq "0" ]; thenecho " ## ${APP_NAME} 关闭signal已发出,请耐心等待"curl http://localhost:port/exit/passwordstatus=0until [ $status -eq "1" ]; dois_existstatus=$?doneecho " ## status is $status "echo " ## ${APP_NAME} 服务已关闭"elseecho " ## ${APP_NAME} 服务没法有运行,无法停止"fi}#输出运行状态status() {is_existif [ $? -eq "0" ]; thenecho " ## ${APP_NAME} 服务正在运行. 进程号(pid): ${pid}"elseecho " ## ${APP_NAME} 服务已停止运行."fi}#重启restart() {stopdeployJarstart}#根据输入参数,选择执行对应方法,不输入则执行使用说明case "$1" in"start")start;;"stop")stop;;"status")status;;"restart")restart;;"r")restart;;*)usage;;esac
