1. import org.slf4j.Logger;
    2. import org.slf4j.LoggerFactory;
    3. import java.util.concurrent.ConcurrentSkipListSet;
    4. import java.util.concurrent.RejectedExecutionHandler;
    5. import java.util.concurrent.ThreadFactory;
    6. import java.util.concurrent.ThreadPoolExecutor;
    7. /**
    8. *
    9. *
    10. * @author q4073
    11. * @date 2020-07-02 19:53
    12. */
    13. public class SequentialThreadFactory implements ThreadFactory {
    14. private final static int INIT_MAX_SERIAL_NUMBER = 64;
    15. private final ConcurrentSkipListSet<Integer> serialNumberPool;
    16. private final int maxSerialNumber;
    17. private final String threadNamePrefix;
    18. private boolean daemon = false;
    19. public SequentialThreadFactory(String threadNamePrefix) {
    20. this(threadNamePrefix, INIT_MAX_SERIAL_NUMBER);
    21. }
    22. public SequentialThreadFactory(String threadNamePrefix, int expectantThreadCount) {
    23. this.threadNamePrefix = threadNamePrefix;
    24. serialNumberPool = new ConcurrentSkipListSet<>();
    25. maxSerialNumber = expectantThreadCount;
    26. for (int i = 0; i < maxSerialNumber; i++) {
    27. serialNumberPool.add(i);
    28. }
    29. }
    30. public SequentialThreadFactory withDaemon(boolean daemon) {
    31. this.daemon = daemon;
    32. return this;
    33. }
    34. public boolean isDaemon() {
    35. return daemon;
    36. }
    37. private synchronized void expand() {
    38. int newCount = maxSerialNumber * 2;
    39. for (int i = maxSerialNumber; i < newCount; i++) {
    40. serialNumberPool.add(i);
    41. }
    42. }
    43. @Override
    44. public Thread newThread(Runnable r) {
    45. Integer serialNumber = serialNumberPool.pollFirst();
    46. if (serialNumber == null) {
    47. expand();
    48. serialNumber = serialNumberPool.pollFirst();
    49. serialNumber = serialNumber == null ? 0 : serialNumber;
    50. }
    51. RunnableProxy proxy = new RunnableProxy(serialNumber, r);
    52. Thread t = new Thread(proxy, threadNamePrefix + serialNumber);
    53. t.setDaemon(daemon);
    54. return t;
    55. }
    56. static class LogRejectedHandler implements RejectedExecutionHandler {
    57. private static final Logger LOG = LoggerFactory.getLogger(LogRejectedHandler.class);
    58. @Override
    59. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    60. String msg = "推送任务队列严重阻塞!推送任务已被丢弃: " + r.toString();
    61. LOG.error(msg);
    62. }
    63. }
    64. private class RunnableProxy implements Runnable {
    65. private final int serialNumber;
    66. private final Runnable runnable;
    67. RunnableProxy(int serialNumber, Runnable runnable) {
    68. this.serialNumber = serialNumber;
    69. this.runnable = runnable;
    70. }
    71. @Override
    72. public void run() {
    73. try {
    74. runnable.run();
    75. } finally {
    76. serialNumberPool.add(serialNumber);
    77. }
    78. }
    79. }
    80. }