import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * A {@link ThreadFactory} that names every thread {@code threadNamePrefix + serialNumber}, always
 * handing out the smallest serial number that is currently free.
 *
 * <p>Free serial numbers live in a sorted pool. {@link #newThread(Runnable)} takes the lowest free
 * number and wraps the task so that the number is put back into the pool once the task's
 * {@code run()} returns or throws. When no number is free, the range of serial numbers is doubled.
 *
 * <p>Note: a number is only given back by the wrapper's {@code run()}, so a thread that is created
 * but never started keeps its number.
 *
 * @author q4073
 * @date 2020-07-02 19:53
 */
public class SequentialThreadFactory implements ThreadFactory {

    /** Size of the initial serial-number range when the caller does not specify one. */
    private final static int INIT_MAX_SERIAL_NUMBER = 64;

    /** Serial numbers that are currently free, in ascending order. */
    private final ConcurrentSkipListSet<Integer> serialNumberPool;

    /**
     * Exclusive upper bound of all serial numbers created so far: every number in
     * {@code [0, maxSerialNumber)} is either in the pool or held by a thread. After construction
     * it is only read and written inside {@link #expand()}, i.e. while holding {@code this}.
     */
    private int maxSerialNumber;

    /** Prefix prepended to the serial number to form the thread name. */
    private final String threadNamePrefix;

    /** Daemon flag applied to new threads; volatile because it may be set and read on different threads. */
    private volatile boolean daemon = false;

    /**
     * Creates a factory with the default initial range of {@value #INIT_MAX_SERIAL_NUMBER} serial numbers.
     *
     * @param threadNamePrefix prefix of every thread name
     */
    public SequentialThreadFactory(String threadNamePrefix) {
        this(threadNamePrefix, INIT_MAX_SERIAL_NUMBER);
    }

    /**
     * Creates a factory whose pool initially holds the serial numbers
     * {@code 0 .. expectantThreadCount - 1}.
     *
     * @param threadNamePrefix     prefix of every thread name
     * @param expectantThreadCount number of serial numbers to pre-allocate; a value below zero is
     *                             treated as zero (the pool then grows on first use)
     */
    public SequentialThreadFactory(String threadNamePrefix, int expectantThreadCount) {
        this.threadNamePrefix = threadNamePrefix;
        this.serialNumberPool = new ConcurrentSkipListSet<>();
        this.maxSerialNumber = Math.max(0, expectantThreadCount);
        for (int i = 0; i < maxSerialNumber; i++) {
            serialNumberPool.add(i);
        }
    }

    /**
     * Sets whether threads created from now on are daemon threads.
     *
     * @param daemon the daemon flag for subsequently created threads
     * @return this factory, to allow call chaining
     */
    public SequentialThreadFactory withDaemon(boolean daemon) {
        this.daemon = daemon;
        return this;
    }

    /**
     * @return whether newly created threads are marked as daemon threads
     */
    public boolean isDaemon() {
        return daemon;
    }

    /**
     * Obtains a serial number when the lock-free poll in {@link #newThread(Runnable)} found the
     * pool empty, growing the range if that is still the case.
     *
     * <p>The range grows from {@code [0, bound)} to {@code [0, 2 * bound)} (or to {@code [0, 1)}
     * when the bound is zero). The first new number is reserved for the caller and never enters
     * the pool, so the caller cannot lose it to a concurrent {@code newThread} call; the remaining
     * new numbers are added to the pool.
     *
     * @return a serial number that no other live thread of this factory holds
     * @throws IllegalStateException if all non-negative {@code int} values below
     *                               {@link Integer#MAX_VALUE} are already allocated
     */
    private synchronized int expand() {
        // Re-check under the lock: another thread may have expanded the range, or a running
        // task may have finished and released its number, since the caller's poll.
        Integer released = serialNumberPool.pollFirst();
        if (released != null) {
            return released;
        }

        int oldBound = maxSerialNumber;
        if (oldBound == Integer.MAX_VALUE) {
            throw new IllegalStateException("Thread serial numbers exhausted");
        }
        // Computed in long so that doubling cannot overflow; at least one number is always added.
        int newBound = (int) Math.min(Integer.MAX_VALUE, Math.max(1L, 2L * oldBound));

        // oldBound itself is handed straight to the caller; only the rest goes into the pool.
        for (int i = oldBound + 1; i < newBound; i++) {
            serialNumberPool.add(i);
        }
        maxSerialNumber = newBound;
        return oldBound;
    }

    /**
     * Creates a thread named {@code threadNamePrefix + serialNumber}, where the serial number is
     * the lowest one currently free. The number is released again when the task finishes.
     *
     * @param r the task the new thread will run
     * @return a new, not yet started thread carrying the factory's daemon setting
     */
    @Override
    public Thread newThread(Runnable r) {
        Integer free = serialNumberPool.pollFirst();
        int serialNumber = (free != null) ? free : expand();

        Thread t = new Thread(new RunnableProxy(serialNumber, r), threadNamePrefix + serialNumber);
        t.setDaemon(daemon);
        return t;
    }

    /**
     * A {@link RejectedExecutionHandler} that only logs the rejected task at error level; the
     * task itself is not executed or re-queued by this handler.
     */
    static class LogRejectedHandler implements RejectedExecutionHandler {
        private static final Logger LOG = LoggerFactory.getLogger(LogRejectedHandler.class);

        /**
         * Logs the rejected task.
         *
         * @param r        the task that was rejected
         * @param executor the executor that rejected it (unused)
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // Message text (Chinese): "Push task queue is severely blocked! Push task has been discarded: "
            String msg = "推送任务队列严重阻塞!推送任务已被丢弃: " + r.toString();
            LOG.error(msg);
        }
    }

    /**
     * Wraps the real task so that the thread's serial number goes back into the enclosing
     * factory's pool when the task ends, whether it returns normally or throws.
     */
    private class RunnableProxy implements Runnable {
        /** Serial number held by the thread running this proxy. */
        private final int serialNumber;
        /** The task to delegate to. */
        private final Runnable runnable;

        RunnableProxy(int serialNumber, Runnable runnable) {
            this.serialNumber = serialNumber;
            this.runnable = runnable;
        }

        @Override
        public void run() {
            try {
                runnable.run();
            } finally {
                // Release in finally so that a failing task does not leak its number.
                serialNumberPool.add(serialNumber);
            }
        }
    }
}