提示
Java 多线程与并发相关。@ermo
# 3 Java 多线程与并发
# 3.1 进程和线程
# 线程和线程的概念?
进程是一次独立的程序过程,是操作系统分配资源的最小单位,进程与进程之间相互独立,互不干扰,可以理解为一个应用程序就是一个进程。
线程是程序执行流中的最小单元,一个进程有多个线程,每个线程有自己独立的程序计数器、虚拟机栈、本地方法栈。多个线程之间共享当前进程的内存空间。
# 并行和并发的区别是什么?
并行是多个处理器同时处理多个不同的任务,多个任务在同一时刻发生。
并发是一个处理器同时处理多个任务,多个任务在微观上的同一时间段发生。
举例说明,2个售票窗口排着2队人,售票员(处理器)一会去 A 窗口卖票,一会去 B 窗口卖票,这就是并发。
2个售票窗口排着2队人,每个售票窗口都有一名售票员(处理器),购票人(任务)可以同时买票,这就是并行。
# 说一下你对 Java 内存模型的理解?
Java 内存模型(Java Memory Model,JMM)是一种规范,定义 Java 程序中多线程访问共享变量时内存的使用和交互行为。在 Java 之前,编程语言可以直接复用操作系统的内存模型,但是 Java 是一门跨平台语言,面对不同操作系统的差异性,Java 自己定义了一套内存模型用于屏蔽操作系统的差异性。
JMM 的出现是为了按需禁用 CPU 缓存和指令重排带来的可见性问题。JMM 定义了主内存和工作内存:
- 主内存:所有变量都必须存到主内存(Main Memory)中,包括成员变量和方法中的局部变量。
- 工作内存:每个线程都有一个工作内存(Working Memory)用于存储共享变量的副本,工作内存是私有的,只能被当前线程访问。
上图中,如果2个线程需要对一个共享变量进行交互,就必须执行2个操作:
- 线程1将工作内存中修改过的共享变量副本同步大主内存中。
- 线程2去主内存中读取共享变量。
针对线程、工作内存和主内存的交互,JMM 定义了8种原子操作:
- lock(锁定)
- unlock(解锁)
- read(读取)
- load(载入)
- use(使用)
- assign(赋值)
- store(存储)
- write(写入)
共享变量在工作内存与主内存的交互必须使用上述8中原子操作,并且执行顺序必须按照 JMM 的规则。比如,变量从主内存复制到工作内存,必须顺序执行 read 和 load 操作;变量从工作内存回到主内存必须顺序执行 store 和 write 操作。
有了 JMM,Java 可以解决多线程中的三个主要问题:
- 原子性:只有基本数据类型的读取和赋值属于原子性操作,更大范围的原子性操作可以使用
synchronized
和Lock
来解决。 - 可见性:当一个线程修改了变量的值,其他线程可以立即获取到最新的修改。为了保证共享变量在多个线程之间可见,Java 使用
volatile
、synchronized
和final
关键字来解决,本质上是使用 happens-before 原则。 - 有序性:为防止指令重排,Java 使用了
volatile
和synchronized
来保证指令的有序性。防止指令重排的底层原理是内存屏障,加上volatile
的变量反编译后会出现lock addl $0x0,(%esp)
的操作,相当于一个 lock 指令,也就是内存屏障指令。
JMM 中规定的一组 happens-before 用于保证两个操作之间的内存可见性,即操作1 happens-before 操作2,那么操作1的执行结果对操作2可见。一共有8条规则,与书写代码相关的规则有:
- 程序顺序规则:一个线程内,程序按照代码书写顺序执行,前面的动作 happens-befores 后面的动作。
- 监视器锁定规则:监视器的解锁动作 happens-before 后续对这个监视器的锁定动作。
volatile
变量规则:对volatile
的写入动作 happens-before 后续对这个字段的读取动作。- 传递性:A happens-before B,B happens-before C,可以推导出 A happens-before C。
- 线程启动规则:
Thread
中的start
方法 happens-before 线程run
中的任一个动作。
# 线程的生命周期和状态包括哪些?
线程的状态:新建(New),就绪(Runable),运行(Running),阻塞(Blocking),无限期等待(Waiting),限期等待(Timed Waiting),结束(Terminated)。
关于进入每种状态的节点:
- 新建(New)
线程创建后没有调用 start()
方法。
- 就绪(Runnable)
线程调用 start()
方法后处于就绪状态,当前状态的线程可能正在运行,也可能等待 CPU 分配资源。
- 阻塞(Blocked)
阻塞状态,线程因为某种原因放弃 CPU 使用权,暂时停止运行。直到线程进入就绪状态,才会有机会转入运行状态。2个线程竞争 synchronized
关键字描述的代码块执行机会,如果线程1成功获取到锁,线程2就会进入 blocked 状态。
- 无限期等待(Waiting)
只能等待其他线程显示调用 Object.notify()
或者 Object.notifyAll()
方法,或者被调用的线程执行完毕。
- 限期等待(Timed Waiting)
无需等待其他线程显示唤醒,在一定时间后会被系统自动唤醒。
阻塞 blocked 和等待 waiting/timed waiting 的区别是,阻塞是被动的,获取排他锁失败的时候会进入阻塞状态,而等待是主动的,可以通过调用 Thread.sleep()
和 Object.wait()
主动进入等待状态。
- 结束(Terminated)
线程执行完毕或者是执行异常退出 run()
方法,当前线程结束生命周期。
# 有几种创建线程的方式?
- 继承
Thread
类
public class ThreadTest extends Thread {
@Override
public void run() {
System.out.println("I'm a test thread.");
}
public static void main(String[] args) throws InterruptedException {
// 创建线程
ThreadTest threadTest = new ThreadTest();
// 启动线程
threadTest.start();
System.out.println("main 线程结束");
}
}
执行 main()
方法输出:
main 线程结束
I'm a test thread.
- 实现 Runnable 接口,重写 run 方法
public class RunnableTest implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " execute");
}
public static void main(String[] args) {
Thread test = new Thread(new RunnableTest());
test.start();
System.out.println("main thread execute");
}
}
执行 main()
方法,输出:
main thread execute
Thread-0 execute
- 实现
Callable
接口,覆写run()
方法
public class CallableTest implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName() + " execute");
return 2;
}
public static void main(String[] args) {
FutureTask<Integer> futureTask = new FutureTask<>(new CallableTest());
Thread thread = new Thread(futureTask);
thread.start();
System.out.println("main thread execute");
}
}
执行 main()
方法,输出:
main thread execute
Thread-0 execute
# 用户线程和守护线程的区别是什么?
Java 中分为两种线程:用户线程(user)和守护线程(daemon)。
可以通过 thread.setDaemon(true)
来设置守护线程。
最后一个非守护线程结束 JVM 才会退出,守护线程不影响 JVM 退出。
# 线程的中断有哪些方式?
interrupt()
方法
设置线程中断标志后线程并不会直接终止,被中断的线程会根据自身状态自行处理。
interrupt()
方法,中断线程,set 操作。设置中断标志为 true,此时线程不会真正退出,如果当前线程处于阻塞状态,再被调用 interrupt()
方法后,线程会抛出 InterruptedException
异常然后终止。
Thread
类中还有一个 isInterrupted()
方法,判断当前线程是否被中断,get 操作。
public class ThreadInterruptTest {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread() {
@Override
public void run() {
while(!Thread.currentThread().isInterrupted()) {
System.out.println(Thread.currentThread() + " hello");
}
}
};
thread.start();
// 主线程休眠1s
Thread.sleep(1000);
// 子线程设置中断标志
System.out.println("main thread interrupt thread");
thread.interrupt();
// 等待子线程执行结束
thread.join();
System.out.println("main is over");
}
}
interrupted()
方法
interrupted()
方法,静态方法,检测当前线程是否被中断,内部调用当前线程的 isInterrupted()
方法。如果发现当前线程中断标志为 true,就会清楚中断标志。属于 getandset 操作。
- 自定义中断标识符
定义一个使用 volatile
修饰的静态变量作为中断标识符,通过标识符的值来决定是否要中断线程,这种方法不会很及时。
public class InterruptFlagTest {
private static volatile boolean isInterrupt = false;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread() {
@Override
public void run() {
while (!isInterrupt) {
System.out.println(Thread.currentThread().getName() + " start run");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " end run");
}
}
};
thread.start();
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " set isInterrupt flag=true");
isInterrupt = true;
}
}
输出
Thread-0 start run
Thread-0 end run
Thread-0 start run
main set isInterrupt flag=true
Thread-0 end run
# 线程池有哪几种状态?
TODO 待完成
running,shutdown,stop,Tidying,TERMINATED。
# 线程之间有哪些协作方式?
wait()
和notify()
/notifyAll()
方法
wait()
和 notify()
/ notifyAll()
方法用于指定 object 的锁,而不是自己的。这些方法用于实现等待和通知机制,通过调用 wait()
方法,一个线程可以主动释放对象的锁并且进入等待状态,直到其他线程调用 notify()
来唤醒它。
join()
方法
用于等待一个线程的结束,在线程 A 内调用线程 B 的 join()
方法,线程 A 会进入阻塞状态,直到线程 B 的执行完毕,线程 A 才会继续执行。
/**
* join() 示例,在线程 t2 中调用线程 t1.join(),t2 阻塞,直到 t1 执行完成。
*/
public class JointTest {
private static class Thread1 extends Thread {
@Override
public void run() {
System.out.println("thread1 run.");
}
}
private static class Thread2 extends Thread {
private final Thread1 thread1;
Thread2(Thread1 thread1) {
this.thread1 = thread1;
}
@Override
public void run() {
try {
// t2 等待 t1 执行结束
thread1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread2 run.");
}
}
public static void main(String[] args) {
Thread1 t1 = new Thread1();
Thread2 t2 = new Thread2(t1);
t2.start();
t1.start();
}
}
sleep()
方法
sleep()
方法会使线程暂停执行一段时间。线程在调用 sleep()
方法会进入阻塞状态,并且不会释放锁,常用于模拟一段时间间隔或定时任务。
# sleep() 方法和 wait() 方法的区别?
wait()
是Object
类的方法,sleep()
是Thread
类的静态本地方法wait()
释放锁,sleep()
没有释放锁wait()
配合notify()
或者notifyAll()
用于线程通信,sleep()
用于暂停执行
# 调用 Thread 的 run() 和 start() 方法有什么区别?
run()
方法定义了线程的执行逻辑,并在当前线程的上下文中执行,不会创建新的线程。当通过调用Thread
对象的 run()
方法时,实际上是在当前线程中按顺序执行 run()
方法的代码。这相当于普通的方法调用,没有并发执行的效果。
start()
方法用于启动一个新线程,并执行其中的线程代码。当通过调用 Thread
对象的start()
方法时,会创建一个新的线程,并在新线程的上下文中执行 run()
方法的代码。start()
方法会触发线程调度,使得新线程可以与其他线程并发执行。
# 手写一个生产者和消费者案例?
生产者:
public class Producer implements Runnable {
private LinkedList<Integer> buffer;
private int capacity;
private int num;
public Producer(LinkedList<Integer> buffer, int capacity, int num) {
this.buffer = buffer;
this.capacity = capacity;
this.num = num;
}
@Override
public void run() {
for (int i = 0; i < num; i++) {
try {
synchronized (buffer) {
if (buffer.size() == capacity) {
buffer.wait();
}
int value = i + 1;
buffer.addLast(value);
System.out.println("生产者生产数据:" + value);
buffer.notifyAll();
}
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消费者:
public class Consumer implements Runnable {
private LinkedList<Integer> buffer;
private int capacity;
public Consumer(LinkedList<Integer> buffer, int capacity) {
this.buffer = buffer;
this.capacity = capacity;
}
@Override
public void run() {
while (true) {
try {
synchronized (buffer) {
while (buffer.size() == 0) {
buffer.wait();
}
int value = buffer.removeFirst();
System.out.println("消费者消费数据:" + value);
buffer.notifyAll();
}
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
启动类:
public class MainDemo {
public static void main(String[] args) {
LinkedList<Integer> buffer = new LinkedList<>();
int capacity = 10;
Producer producer = new Producer(buffer, capacity, 20);
Consumer consumer = new Consumer(buffer, capacity);
Thread t1 = new Thread(producer);
Thread t2 = new Thread(consumer);
t1.start();
t2.start();
}
}
输出:
生产者生产数据:1
消费者消费数据:1
生产者生产数据:2
消费者消费数据:2
生产者生产数据:3
消费者消费数据:3
生产者生产数据:4
消费者消费数据:4
生产者生产数据:5
消费者消费数据:5
生产者生产数据:6
消费者消费数据:6
生产者生产数据:7
消费者消费数据:7
生产者生产数据:8
消费者消费数据:8
生产者生产数据:9
消费者消费数据:9
...
# 3.2 锁
# 什么是死锁?
程序在执行过程中,两个或多个线程因争夺资源而造成相互等待,程序无法执行下去的一种状态称为死锁。
线程1占有资源 B 的同时还想要占有资源 A,线程2占有资源 A 的同时还想要占有资源 B,这种情况就是死锁。
# 产生死锁的四个必要条件是什么?
- 互斥:资源 A 在同一时刻只能被一个线程占用
- 占有且等待:线程1占有资源 A 的同时又对资源 B 提出请求
- 不可抢占:资源 A 一旦被线程1占有,其他线程不能抢占资源 A
- 环路等待:线程1持有资源 A 在等待资源 B,线程2持有资源 B 在等待资源 A
# 自己手写一个死锁?
public class DeadLockDemo {
private static Object lock1 = new Object();
private static Object lock2 = new Object();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized(lock1) {
System.out.println("t1 get lock1.");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t1 wait to get lock2.");
synchronized (lock2) {
System.out.println("t1 get lock2.");
}
}
});
Thread t2 = new Thread(() -> {
synchronized (lock2) {
System.out.println("t2 get lock2.");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t2 wait to get lock1.");
synchronized (lock1) {
System.out.println("t2 get lock1.");
}
}
});
t1.start();
t2.start();
}
}
输出
t1 get lock1.
t2 get lock2.
t1 wait to get lock2.
t2 wait to get lock1.
可以看出线程 t1 拿到资源 lock1 正在等待 lock2 资源,线程 t2 拿到资源 lock2 正在等待 lock1 资源,程序出现死锁状态。
# 锁池和等待池的区别?
锁池(Lock Pool),所有需要竞争同步锁的线程都会放到锁池中,锁池由操作系统管理,当一个线程想要获取一个被其他线程占用的锁时,该线程会被放到锁池中,一旦同步锁被释放,操作系统会从锁池中选取一个或者多个线程,这些线程会进入就绪(Runnable)状态,准备竞争同步锁的获取权。
等待池(Wait Pool),是由 Java 虚拟机管理的一组线程,当一个线程调用 wait()
方法,该线程将被放置到等待池中,并且释放锁。在等待池中的线程只有当其他线程调用 notify()
或 notifyAll()
方法才会被唤醒,等待池中的线程被唤醒后,就会被放置到锁池中,准备重新竞争锁的获取权。
# 悲观锁和乐观锁的区别?
悲观锁,假设多个事务同时访问共享资源时会发生冲突,因此在访问共享资源时将其锁住,同一时刻只能一个事务访问共享资源。比如上卫生间的例子,每个人就是一个独立的事务,每个隔间都有一把锁,一个人上锁的时候,别的用户无法访问。
常见的悲观锁有 synchronized
关键字,ReentrantLock
类以及数据库锁。
乐观锁,假设多个事务访问共享资源时不会发生冲突,因此在访问共享资源时不会上锁。
乐观锁的实现一般使用版本号机制和 CAS 算法实现。
版本号机制实现方式,以 MySQL 举例,数据库表通常添加版本字段,或者状态字段,更新数据时使用版本进行判断。
例:id=1
的账户余额设置为1000,$version
代表上次从数据库中读取的变量。
update balance set amount = 1000, version = $version + 1 where id = 1 and version = $version;
CAS 全称为 Compare And Swap,属于原子操作,是一种进行多线程安全的比较和变更操作的算法,用预期值和要更新的变量值比较,一致才会更新为新值。
Java 中 CAS 具体实现在 Unsafe
类中的 CompareAndSwapXxx()
方法。
// java 源代码
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
# CAS 有哪些问题?
- ABA 问题
- 线程A读取共享变量的值为A。
- 在线程A读取共享变量的值后,线程B将共享变量的值改为B,然后又改回A,形成了一个“ABA”序列。
- 线程A再次使用CAS操作将共享变量的值由A改为C,由于共享变量的值在CAS操作之前和之后都是A,所以线程A并不知道共享变量的值被其他线程改变过。
- 循环时间长开销大,如果 CAS 失败,自旋会服务器带来压力
- 只能保证对一个变量的原子性操作,无法保证像
i++
这样操作的原子性
# 公平锁和非公平锁的区别?
公平锁,先到先得,多个线程按照排队的顺序来获取锁,ReentrantLock
类可以创建公平锁:
Lock lock = new ReentrantLock(Boolean.TRUE);
非公平锁,多个线程不会根据先到先得的顺序获取锁,并发场景下会造成饥饿状态,有的线程有可能一直无法获取锁。常见的非公平锁有 synchronized
关键字和 new ReentrantLock()
。
# 独占锁和共享锁的区别?
独占锁,也称为排他锁,一个线程对共享数据加上独占锁后,其他线程不能对该共享数据再加任何类型的锁,获得独占锁的线程拥有读写共享数据的权利。常用的独占锁有 synchronized
关键字和 Lock
类。
共享锁,可以被多个线程持有,一个线程对共享数据加上共享锁后,其他线程只能对共享数据再加共享锁,不能加独占锁,获得共享锁的线程只能读共享数据,不能执行写操作。常用的共享锁有 ReentrantReadWriteLock
。
# ReentrantLock 是什么?
ReentrantLock
是 Lock
的一个实现类,是 juc(java.util.concurrent)包中的锁。
看一个 ReentrantLock
的简单使用:
public class LockExample {
private Lock lock = new ReentrantLock();
public void func() {
lock.lock();
try {
for (int i = 0; i < 5; i++) {
System.out.println(i + " ");
}
} finally {
// 释放锁
lock.unlock();
}
}
public static void main(String[] args) {
LockExample lockExample = new LockExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> lockExample.func());
executorService.execute(() -> lockExample.func());
}
}
输出:
0
1
2
3
4
0
1
2
3
4
# ReentrantLock 和 synchronized 的区别是什么?
synchronized
是 JVM 实现的,ReentrantLock
是 JDK 实现的。- 新版本对
synchronized
做了很多性能优化,比如有自旋锁,性能方面两者大致相同。 ReentrantLock
可以绑定多个Condition
条件。synchronized
是非公平的,ReentrantLock
默认也是非公平的,可以通过构造参数设置为公平锁。- 持有锁的线程长期不放弃的锁的时候,
ReentrantLock
可以中断线程放弃等待,synchronized
不行。
# 简单说下你对 AQS 的理解?
AQS(AbstractQueuedSynchronizer)是 JUC 包中的一个抽象类,是锁和同步器的一些通用实现,有很多锁和同步器都是使用 AQS 实现的,比如有 ReentrantLock
(可重入锁)、ReentrantReadWriteLock
(可重入读写锁)、Sempaphore
(信号量)和 CountDownLatch
等。
AQS 的底层其实是使用 CLH(Craig, Landin, and Hagersten)队列锁,这是一种自旋锁的变体,通过 FIFO 队列和自旋锁来管理等待线程。当线程尝试获取锁时,如果发现当前共享资源被占用,则会将当前线程以节点的形式添加到队列尾部,并进行自旋尝试获取锁资源。每个 CLH 队列节点保存着线程的引用 thread、当前节点的状态 waitStatus、前驱节点 prev、后继节点 next。
AQS 的资源共享模式分为2种:独占模式和共享模式。
独占模式是指同一时刻只允许有一个线程可以获取到锁,典型的类有 ReentrantLock
。
共享模式是值同一时刻允许多个线程同时获取锁,常见的实现有 Semaphore
、CountDownLatch
和 CyclicBarrier
等。
ReentrantLock
使用 AQS 的大致流程:内部有一个 state
字段,初始化为0,表示未锁定状态。线程 t1 调用 lock()
后,内部调用 tryAcquire()
会使用 CAS 将 state
加1,其他线程调用 tryAcquire()
直接返回 false,直到调用 unlock()
将 state
降到0,即释放资源,其他线程才有机会获取到锁。
在线程 t1 释放锁前,线程 t1 可以重复获取锁,state
字段会累加,这就是可重入概念。state
获取(累加)多少次就要释放(减少)多少次,最终保证 state
为0,这样其他线程才可以重新获取锁。
下面是 ReentrantLock
的 tryAcquire()
源代码:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 状态+1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
参考:
- https://www.cnblogs.com/waterystone/p/4920797.html
- https://mp.weixin.qq.com/s/jEx-4XhNGOFdCo4Nou5tqg
# 3.3 Atomic
TODO Atuomic
# 3.4 阻塞队列
TODO 阻塞队列
# 3.5 工具类和容器类
TODO 工具类和容器类
# 3.6 Executor
TODO Executor
# 3.7 Fork&Join 框架
TODO Fork and Join
# 3.8 线程池
# 线程池的参数有哪些?
这个问题问的是线程池创建类 ThreadPoolExecutor
创建线程池的构造器中各个参数的含义。
核心参数有7个,对应源代码为:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 省略具体实现代码
}
- int corePoolSize:线程池核心线程数。
该参数用于设置线程池维护的最小核心线程数,即便这些线程处于空闲状态也不会被销毁。除非设置了 allowCoreThreadTimeOut。一般将 corePoolSize
设置为 CPU 数量的1到2倍,最大线程数设置为 CPU 的2到4倍即可。
- int maximumPoolSize:最大线程数。
一个任务创建由线程池中的空闲线程执行,如果没有空闲线程,就将任务放到工作队列缓存,工作队列满了才会创建新线程。之后从工作队列头部取出任务,交给新线程处理,新线程不会无限制创建,maximumPoolSize
就是指定创建新线程的最大的数量。
- long keepAliveTime:空闲线程存活时长。
大于 corePoolSize
的线程如果处于空闲状态,经过一段时间后将被销毁,这个参数就是设置的空闲线程的存活时间。
- TimeUnit:空闲线程存活时间单位。
- BlockingQueue:任务队列。
线程池存放任务的队列,用于存储线程池中待执行的任务。当线程池中的线程数达到核心线程数 corePoolSize
时,剩余任务将放到任务队列中等待执行。
- ThreadFactory:线程工厂。
用于创建新的线程、可以定制线程名字、线程组和线程优先级等。一般使用默认的线程工厂即可。
- RejectedExecutionHandler:拒绝策略,当线程队列满了并且工作线程大于等于线程池的最大数量
maximumPoolSize
时如果拒绝请求执行的策略。
- RejectedExecutionHandler:拒绝策略,当线程队列满了并且工作线程大于等于线程池的最大数量
常用的策略有:
- AbortPolicy:直接抛出异常处理,为默认策略;
- DiscardPolicy:直接抛弃任务不处理;
- DiscardOldestPolicy:丢弃队列中最老的任务;
- CallerRunsPolicy:将任务分配给当前执行 execute 方法线程处理。
整个线程池的执行流程是这样:
- (1)线程池中线程数量小于核心线程数
corePoolSize
时,新任务进来将创建一个新的线程进行处理,不论此时线程池中是否存在空闲线程; - (2)线程池中的线程数大于
corePoolSize
,新任务将放置到workQueue
任务队列中,等待线程池中任务调度执行; - (3)工作队列
workQueue
已满,且最大线程数maximumPoolSzie
大于核心线程数corePoolSzie
时,新任务会创建新的线程执行; - (4)工作队列
workQueue
已满,且任务提交数大于maximumPoolSize
时,任务交由RejectedExecutionHandler
执行具体的拒绝策略; - (5)当线程池中的线程数量大于
corePoolSize
时,并且空闲时间大于keepAliveTime
,回收当前线程; - (6)如果参数
allowCoreThreadTimeOut
为true
,线程池中小于corePoolSize
的线程池空闲并且达到空闲时间keepAliveTime
,这些核心线程也将被回收。
放一张执行流程图供参考:
# 你在项目中是如何使用线程池的?
直接写一个使用线程池的 demo:
定义一个配置类,用于创建 ThreadPoolExecutor
并交给 Spring 管理。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Configuration
public class ThreadPoolConfig {
@Bean("threadPoolExecutor")
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties properties) {
return new ThreadPoolExecutor(properties.getCorePoolSize(),
properties.getMaximumPoolSize(),
properties.getKeepAliveTime(),
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(properties.getQueueSize()),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
}
}
然后定义一个配置类 ThreadPoolConfigProperties
映射到 application.yml
配置文件:
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@ConfigurationProperties("thread.pool")
@Component
public class ThreadPoolConfigProperties {
/**
* 核心线程数
*/
private Integer corePoolSize;
/**
* 最大线程数
*/
private Integer maximumPoolSize;
/**
* 队列长度
*/
private Integer queueSize;
/**
* 空闲线程存活时间,毫秒
*/
private Integer keepAliveTime;
// 省略 getter/setter
}
记得在 pom.xml 文件中映入配置相关的包依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
在 application.properties
中定义线程池参数:
server.port=8899
thread.pool.corePoolSize=5
thread.pool.maximumPoolSize=10
thread.pool.queueSize=1000
thread.pool.keepAliveTime=5000
定义一个 controller 类,使用创建好的线程池:
@SpringBootApplication
@RestController
public class JavaJucDemoApplication {
@Autowired
private ThreadPoolExecutor threadPoolExecutor;
public static void main(String[] args) {
SpringApplication.run(JavaJucDemoApplication.class, args);
}
@GetMapping("/thread")
public void thread() {
for (int i = 0; i < 20; i++) {
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + ":执行任务");
});
}
}
}
项目启动后,访问 http://localhost:8899/thread,查看输出:
pool-1-thread-1:执行任务
pool-1-thread-2:执行任务
pool-1-thread-3:执行任务
pool-1-thread-4:执行任务
pool-1-thread-5:执行任务
pool-1-thread-1:执行任务
pool-1-thread-3:执行任务
pool-1-thread-2:执行任务
pool-1-thread-5:执行任务
pool-1-thread-5:执行任务
pool-1-thread-3:执行任务
pool-1-thread-3:执行任务
pool-1-thread-3:执行任务
pool-1-thread-3:执行任务
pool-1-thread-3:执行任务
pool-1-thread-3:执行任务
pool-1-thread-1:执行任务
pool-1-thread-4:执行任务
pool-1-thread-5:执行任务
pool-1-thread-2:执行任务
Disconnected from the target VM, address: '127.0.0.1:56391', transport: 'socket'
Process finished with exit code 130 (interrupted by signal 2: SIGINT)