// 高3位表示线程池状态,低29位表示线程池个数(默认是RUNNING状态线程个数为0) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 线程个数掩码位数 private static final int COUNT_BITS = Integer.SIZE - 3; // 线程最大个数29个二进制1 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// ***线程池状态***: // 运行状态(高三位)111 private static final int RUNNING = -1 << COUNT_BITS; // 暂停状态(高三位)000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 停止状态(高三位)001 private static final int STOP = 1 << COUNT_BITS; // 完成状态(高三位)010 private static final int TIDYING = 2 << COUNT_BITS; // 终止状态(高三位)011 private static final int TERMINATED = 3 << COUNT_BITS;
// 获得高三位(运行状态) private static int runStateOf(int c) { return c & ~CAPACITY; } // 获得低29位(线程个数) private static int workerCountOf(int c) { return c & CAPACITY; } 计算新值 private static int ctlOf(int rs, int wc) { return rs | wc; }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
if (command == null) throw new NullPointerException(); //获得当前线程池的状态和线程的个数组合值 int c = ctl.get(); //如果当前工作线程数小于核心线程数则创建线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //如果线程池是运行状态,则入队 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //二次检测,如果不是运行状态了就移除该线程,并执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); //如果当前线程池为空,则添加一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果队列满,则新增线程,新增线程则执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); //这里比较绕下面会分析 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //如果当前运行线程超过了限制就会返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //同一时间只会有一个线程操作成功 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //如果当前线程池状态被改变则回到上层循环重新获得线程池状态 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
判断那里在三种情况下会返回false
当前线程池状态为STOP,TIDYING,TERMINATED
当前线程池状态为SHUTDOWN而且有第一个任务
当前线程池状态为SHUTDOWN而且任务队列为空
任务队列和拒绝策略
workQueue任务队列
任务队列一般分为4种,直接提交队列,有界队列,无界队列,优先队列。
SynchronousQueue(直接提交队列)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
public class TestThreadPoolExecutor { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,2,100, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); for (int i=0;i<5;i++){ threadPoolExecutor.execute(new Thread(new HelloTask(),"线程"+i)); } } public static class HelloTask implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getName()+" Hello World! "); } }
}
输出结果为:
pool-1-thread-1 Hello World! pool-1-thread-2 Hello World! Exception in thread “main” java.util.concurrent.RejectedExecutionException: Task Thread[线程2,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@14ae5a5[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at com.czj.example01.TestThreadPoolExceutor.main(TestThreadPoolExceutor.java:12)
public class TestThreadPoolExecutor { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,10,100, TimeUnit.SECONDS,new PriorityBlockingQueue<>()); for (int i=0;i<10;i++){ threadPoolExecutor.execute(new HelloTask("线程"+i,i)); } } public static class HelloTask implements Runnable,Comparable<HelloTask>{
@Override public int compareTo(HelloTask o) { if (this==o){ return 0; } int num = this.priority-o.priority; if (num==0){ return 0; } //PriorityBlockingQueue底层实现是最小堆,所以比较越小越优先。 return num>0?-1:1; } }
}
输出结果
线程0 Start Hello World! 线程0 End Goodbye 线程9 Start Hello World! 线程9 End Goodbye 线程8 Start Hello World! 线程8 End Goodbye 线程7 Start Hello World! 线程7 End Goodbye 线程6 Start Hello World! 线程6 End Goodbye 线程5 Start Hello World! 线程5 End Goodbye 线程4 Start Hello World! 线程4 End Goodbye 线程3 Start Hello World! 线程3 End Goodbye 线程2 Start Hello World! 线程2 End Goodbye 线程1 Start Hello World! 线程1 End Goodbye