这一节我们讲解 logback 异步日志打印中 ArrayBlockingQueue 的使用。

异步日志打印模型概述

在高并发、高流量并且响应时间要求比较小的系统中同步打印日志已经满足不了需求了,这是因为打印日志本身是需要写磁盘的,写磁盘的操作会暂时阻塞调用打印日志的业务线程,这会造成调用线程的 rt 增加。如图 11-1 所示为同步日志打印模型。

ArrayBlockingQueue 的使用 - 图1

图 11-1

同步日志打印模型的缺点是将日志写入磁盘的操作是业务线程同步调用完成的,那么是否可以让业务线程把要打印的日志任务放入一个队列后直接返回,然后使用一个线程专门负责从队列中获取日志任务并将其写入磁盘呢?这样的话,业务线程打印日志的耗时就仅仅是把日志任务放入队列的耗时了,其实这就是 logback 提供的异步日志打印模型要做的事,具体如图 11-2 所示。

ArrayBlockingQueue 的使用 - 图2

图 11-2

由图 11-2 可知,其实 logback 的异步日志模型是一个多生产者-单消费者模型,其通过使用队列把同步日志打印转换为了异步,业务线程只需要通过调用异步 appender 把日志任务放入日志队列,而日志线程则负责使用同步的 appender 进行具体的日志打印。日志打印线程只需要负责生产日志并将其放入队列,而不需要关心消费线程何时把日志具体写入磁盘。

异步日志与具体实现

1.异步日志

一般配置同步日志打印时会在 logback 的 xml 文件里面配置如下内容。

  1. //(1)配置同步日志打印 appender
  2. <appender name=「PROJECT」 class=「ch.qos.logback.core.FileAppender」>
  3. <file>project.log</file>
  4. <encoding>UTF-8</encoding>
  5. <append>true</append>
  6. <rollingPolicy class=「ch.qos.logback.core.rolling.TimeBasedRollingPolicy」>
  7. <! -- daily rollover -->
  8. <fileNamePattern>project.log.%d{yyyy-MM-dd}</fileNamePattern>
  9. <! -- keep 7 days『 worth of history -->
  10. <maxHistory>7</maxHistory>
  11. </rollingPolicy>
  12. <layout class=「ch.qos.logback.classic.PatternLayout」>
  13. <pattern><! [CDATA[
  14. %n%-4r [%d{yyyy-MM-dd HH:mm:ss}] %X{productionMode} - %X{method}
  15. %X{requestURIWithQueryString} [ip=%X{remoteAddr}, ref=%X{referrer},
  16. ua=%X{userAgent}, sid=%X{cookie.JSESSIONID}]%n %-5level %logger{35} - %m%n
  17. ]]></pattern>
  18. </layout>
  19. </appender>
  20. //(2) 设置 logger
  21. <logger name=「PROJECT_LOGGER」 additivity=「false」>
  22. <level value=「WARN」 />
  23. <appender-ref ref=「PROJECT」 />
  24. </logger>

然后以如下方式使用。

ArrayBlockingQueue 的使用 - 图3

要把同步日志打印改为异步则需要修改 logback 的 xml 配置文件为如下所示。

  1. <appender name="PROJECT" class="ch.qos.logback.core.FileAppender">
  2. <file>project.log</file>
  3. <encoding>UTF-8</encoding>
  4. <append>true</append>
  5. <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
  6. <! -- daily rollover -->
  7. <fileNamePattern>project.log.%d{yyyy-MM-dd}</fileNamePattern>
  8. <! -- keep 7 days' worth of history -->
  9. <maxHistory>7</maxHistory>
  10. </rollingPolicy>
  11. <layout class="ch.qos.logback.classic.PatternLayout">
  12. <pattern><! [CDATA[
  13. %n%-4r [%d{yyyy-MM-dd HH:mm:ss}] %X{productionMode} - %X{method}
  14. %X{requestURIWithQueryString} [ip=%X{remoteAddr}, ref=%X{referrer},
  15. ua=%X{userAgent}, sid=%X{cookie.JSESSIONID}]%n %-5level %logger{35} - %m%n
  16. ]]></pattern>
  17. </layout>
  18. </appender>
  19. <appender name="asyncProject" class="ch.qos.logback.classic.AsyncAppender">
  20. <discardingThreshold>0</discardingThreshold>
  21. <queueSize>1024</queueSize>
  22. <neverBlock>true</neverBlock>
  23. <appender-ref ref="PROJECT" />
  24. </appender>
  25. <logger name="PROJECT_LOGGER" additivity="false">
  26. <level value="WARN" />
  27. <appender-ref ref="asyncProject" />
  28. </logger>

由以上代码可以看出,AsyncAppender 是实现异步日志的关键,下一节主要讲它的内部实现。

2.异步日志实现原理

本文使用的 logback-classic 的版本为 1.0.13。我们首先从 AsyncAppender 的类图结构来认识下 AsyncAppender 的组件构成,如图 11-3 所示。

ArrayBlockingQueue 的使用 - 图4

图 11-3

由图 11-3 可知,AsyncAppender 继承自 AsyncAppenderBase,其中后者具体实现了异步日志模型的主要功能,前者只是重写了其中的一些方法。由该图可知,logback 中的异步日志队列是一个阻塞队列,其实就是有界阻塞队列 ArrayBlockingQueue,其中 queueSize 表示有界队列的元素个数,默认为 256 个。

worker 是个线程,也就是异步日志打印模型中的单消费者线程aai 是一个 appender 的装饰器,里面存放同步日志的 appender,其中 appenderCount 记录 aai 里面附加的同步 appender 的个数。neverBlock 用来指示当日志队列满时是否阻塞打印日志的线程。discardingThreshold 是一个阈值,当日志队列里面的空闲元素个数小于该值时,新来的某些级别的日志会被直接丢弃,下面会具体讲。

首先我们来看何时创建日志队列,以及何时启动消费线程,这需要看 AsyncAppenderBase 的 start 方法。该方法在解析完配置 AsyncAppenderBase 的 xml 的节点元素后被调用。

  1. public void start() {
  2. ...
  3. //(1)日志队列为有界阻塞队列
  4. blockingQueue = new ArrayBlockingQueue<E>(queueSize);
  5. //(2)如果没设置 discardingThreshold 则设置为队列大小的 1/5
  6. if discardingThreshold == UNDEFINED
  7. discardingThreshold = queueSize / 5
  8. //(3)设置消费线程为守护线程,并设置日志名称
  9. worker.setDaemontrue);
  10. worker.setName(「AsyncAppender-Worker-」 + worker.getName());
  11. //(4)设置启动消费线程
  12. super.start();
  13. worker.start();
  14. }

由以上代码可知,logback 使用的是有界队列 ArrayBlockingQueue,之所以使用有界队列是考虑内存溢出问题。在高并发下写日志的 QPS 会很高,如果设置为无界队列,队列本身会占用很大的内存,很可能会造成 OOM。

这里消费日志队列的 worker 线程被设置为守护线程,这意味着当主线程运行结束并且当前没有用户线程时,该 worker 线程会随着 JVM 的退出而终止,而不管日志队列里面是否还有日志任务未被处理。另外,这里设置了线程的名称,这是个很好的习惯,因为在查找问题时会很有帮助,根据线程名字就可以定位线程。

既然是有界队列,那么肯定需要考虑队列满的问题,是丢弃老的日志任务,还是阻塞日志打印线程直到队列有空余元素呢?要回答这个问题,我们需要看看具体进行日志打印的 AsyncAppenderBase 的 append 方法。

  1. protected void appendE eventObject {
  2. //(5)调用 AsyncAppender 重写的 isDiscardable 方法
  3. if isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) {
  4. return
  5. }
  6. ...
  7. //(6)将日志任务放入队列
  8. puteventObject);
  9. }
  10. private boolean isQueueBelowDiscardingThreshold() {
  11. return blockingQueue.remainingCapacity() < discardingThreshold);
  12. }

其中代码(5)调用了 AsyncAppender 重写的 isDiscardable 方法,该方法的具体内容为

  1. //(7)
  2. protected boolean isDiscardable(ILoggingEvent event) {
  3. Level level = event.getLevel();
  4. return level.toInt() <= Level.INFO_INT;
  5. }

结合代码(5)和代码(7)可知,如果当前日志的级别小于等于 INFO_INT 并且当前队列的剩余容量小于 discardingThreshold 则会直接丢弃这些日志任务。

下面看具体代码(6)中的 put 方法。

  1. private void put(E eventObject) {
  2. //(8)
  3. if (neverBlock) {
  4. blockingQueue.offer(eventObject);
  5. } else {
  6. try {//(9)
  7. blockingQueue.put(eventObject);
  8. } catch (InterruptedException e) {
  9. // Interruption of current thread when in doAppend method should not
  10. // be consumed
  11. // by AsyncAppender
  12. Thread.currentThread().interrupt();
  13. }
  14. }
  15. }

如果 neverBlock 被设置为 false(默认为 false)则会调用阻塞队列的 put 方法,而 put 是阻塞的,也就是说如果当前队列满,则在调用 put 方法向队列放入一个元素时调用线程会被阻塞直到队列有空余空间。这里可以看下 put 方法的实现。

  1. public void putE e throws InterruptedException {
  2. ...
  3. final ReentrantLock lock = this.lock
  4. lock.lockInterruptibly();
  5. try {
  6. //如果队列满,则调用 await 方法阻塞当前调用线程
  7. while count == items.length
  8. notFull.await();
  9. enqueuee);
  10. } finally {
  11. lock.unlock();
  12. }
  13. }

这里有必要解释下代码(9),当日志队列满时 put 方法会调用 await()方法阻塞当前线程,而如果其他线程中断了该线程,那么该线程会抛出 InterruptedException 异常,并且当前的日志任务就会被丢弃。在 logback-classic 的 1.2.3 版本中,则添加了不对中断进行响应的方法。

  1. private void put(E eventObject) {
  2. if (neverBlock) {
  3. blockingQueue.offer(eventObject);
  4. } else {
  5. putUninterruptibly(eventObject);
  6. }
  7. }
  8. private void putUninterruptibly(E eventObject) {
  9. boolean interrupted = false;
  10. try {
  11. while (true) {
  12. try {
  13. blockingQueue.put(eventObject);
  14. break;
  15. } catch (InterruptedException e) {
  16. interrupted = true;
  17. }
  18. }
  19. } finally {
  20. if (interrupted) {
  21. Thread.currentThread().interrupt();
  22. }
  23. }
  24. }

如果当前日志打印线程在调用 blockingQueue.put 时被其他线程中断,则只是记录中断标志,然后继续循环调用 blockingQueue.put,尝试把日志任务放入日志队列。新版本的这个实现通过使用循环保证了即使当前线程被中断,日志任务最终也会被放入日志队列。

如果 neverBlock 被设置为 true 则会调用阻塞队列的 offer 方法,而该方法是非阻塞的,所以如果当前队列满,则会直接返回,也就是丢弃当前日志任务。这里回顾下 offer 方法的实现。

  1. public boolean offerE e {
  2. ...
  3. final ReentrantLock lock = this.lock
  4. lock.lock();
  5. try {
  6. //如果队列满则直接返回 false。
  7. if count == items.length
  8. return false
  9. else {
  10. enqueuee);
  11. return true
  12. }
  13. } finally {
  14. lock.unlock();
  15. }
  16. }

最后来看 addAppender 方法都做了什么。

  1. public void addAppender(Appender<E> newAppender) {
  2. if (appenderCount == 0) {
  3. appenderCount++;
  4. ...
  5. aai.addAppender(newAppender);
  6. } else {
  7. addWarn("One and only one appender may be attached to AsyncAppender.");
  8. addWarn("Ignoring additional appender named [" + newAppender.getName() + "]");
  9. }
  10. }

由如上代码可知,一个异步 appender 只能绑定一个同步 appender。这个 appender 会被放到 AppenderAttachableImpl 的 appenderList 列表里面。

到这里我们已经分析完了日志生产线程把日志任务放入日志队列的实现,下面一起来看消费线程是如何从队列里面消费日志任务并将其写入磁盘的。由于消费线程是一个线程,所以就从 worker 的 run 方法开始。

  1. class Worker extends Thread {
  2. public void run() {
  3. AsyncAppenderBase<E> parent = AsyncAppenderBase.this
  4. AppenderAttachableImpl<E> aai = parent.aai
  5. //(10)一直循环直到该线程被中断
  6. while parent.isStarted()) {
  7. try {//(11)从阻塞队列获取元素
  8. E e = parent.blockingQueue.take();
  9. aai.appendLoopOnAppenderse);
  10. } catch InterruptedException ie {
  11. break
  12. }
  13. }
  14. //(12)到这里说明该线程被中断,则把队列里面的剩余日志任务
  15. //刷新到磁盘
  16. for E e : parent.blockingQueue {
  17. aai.appendLoopOnAppenderse);
  18. parent.blockingQueue.removee);
  19. }
  20. ...
  21. .. }
  22. }

其中代码(11)使用 take 方法从日志队列获取一个日志任务,如果当前队列为空则当前线程会被阻塞直到队列不为空才返回。获取到日志任务后会调用 AppenderAttachableImpl 的 aai.appendLoopOnAppenders 方法,该方法会循环调用通过 addAppender 注入的同步日志,appener 具体实现把日志打印到磁盘。

小结

本节结合 logback 中异步日志的实现介绍了并发组件 ArrayBlockingQueue 的使用,包括 put、offer 方法的使用场景以及它们之间的区别,take 方法的使用,同时也介绍了如何使用 ArrayBlockingQueue 来实现一个多生产者-单消费者模型。另外使用 ArrayBlockingQueue 时需要注意合理设置队列的大小以免造成 OOM,队列满或者剩余元素比较少时,要根据具体场景制定一些抛弃策略以避免队列满时业务线程被阻塞。