线程协作
生产者消费者问题
- 假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者从仓库取走产品
- 若仓库中没有产品,则生产者放入产品,否则一直等待,直到消费者取走产品
- 若仓库中有产品,则消费者取走产品,否则一直等待,知道生产者放入产品
生产者消费者问题为线程同步问题,两者共享一个资源,相互依赖,互为条件:
- 生产者未放入之前,通知消费者等待;放入后,提醒消费者取走
- 消费者取走产品,通知生产者继续生产
- 生产者消费者问题中,不能通过简单的synchronized实现
- synchronized能阻止并发更新同一个资源,实现同步
- synchronized不能用于实现线程间通信
线程通信
Java提供了一些解决线程通信的方法
方法 | 说明 |
---|---|
wait() | 线程一直等待,直到收到其他线程的通知,期间会释放锁 |
wait(long timeout) | 线程等待指定的毫秒数 |
notify() | 唤醒一个处于等待的线程 |
notifyAll() | 唤醒同一个对象上所有等待的线程,调度优先级高的线程 |
这些方法都是Object类的方法,只能在同步方法或同步代码块中使用,否则会抛出IllegalMonitorStateException异常
管程法
解决并发协作模型“生产者消费者问题”的方法之一。
实现管程法需要的对象有:
- 生产者:负责生产数据的模块(可能是方法、对象、线程、进程)
- 消费者:负责消耗数据的模块(可能是方法、对象、线程、进程)
- 缓冲区:存放生产者生产的数据,等待消费者消耗数据;消费者不能直接使用生产者的数据
- 产品:缓冲区中存放的数据
具体实现:
// 管程法实现生产者消费者问题
// 对象:产品、生产者、消费者、缓冲区
public class Monitors {
public static void main(String[] args){
Buffer buffer = new Buffer();
new Producer(buffer).start();
new Consumer(buffer).start();
}
}
// 产品
class Production extends Thread{
int id;
public Production(int id) {
this.id = id;
}
}
// 生产者
class Producer extends Thread{
Buffer buffer;
public Producer(Buffer buffer) {
this.buffer = buffer;
}
// 生产
@Override
public void run() {
for (int i = 0; i < 30; i++) {
buffer.push(new Production(i));
System.out.println("生产第" + i + "个产品");
}
}
}
// 消费者
class Consumer extends Thread{
Buffer buffer;
public Consumer(Buffer buffer) {
this.buffer = buffer;
}
// 消费
@Override
public void run() {
for (int i = 0; i < 30; i++) {
System.out.println("消耗了id为" + buffer.pop().id + "的产品");
}
}
}
// 缓冲区
class Buffer extends Thread{
// 设置容器大小
Production[] productions = new Production[10];
// 容器内产品数量
static int count = 0;
// 生产者放入产品
public synchronized void push(Production production) {
// 容器满时,生产者等待
if (count == productions.length) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
productions[count] = production;
count++;
// 放入产品后唤醒消费者
this.notifyAll();
}
// 消费者消耗产品
public synchronized Production pop() {
// 容器为空,消费者等待
if (count == 0){
try {
wait();
}catch (InterruptedException e){
e.printStackTrace();
}
}
count--;
Production production = productions[count];
// 取走产品后,唤醒消费者
this.notifyAll();
return production;
}
}
信号灯法
解决并发协作模型“生产者消费者问题”的方法之一。
设置flag标志位,用于判断线程数据状态。
信号灯法缺点在于没有缓冲区存储大量的共享数据。
实现信号灯法需要的对象有:
- 生产者:负责生产数据的模块(可能是方法、对象、线程、进程)
- 消费者:负责消耗数据的模块(可能是方法、对象、线程、进程)
- 产品:两者互斥访问的对象,包含一个flag标志位,用于控制两者的交替访问。
具体实现:
// 信号灯法实现生产者消费者问题
public class Flag {
public static void main(String[] args) {
TV tv = new TV();
new Player(tv).start();
new Watcher(tv).start();
}
}
// 生产者
class Player extends Thread{
TV tv;
public Player(TV tv) {
this.tv = tv;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
if (i % 2 == 0) {
tv.act("Peaky Blinder");
} else {
tv.act("Ads...");
}
}
}
}
// 消费者
class Watcher extends Thread{
TV tv;
public Watcher(TV tv) {
this.tv = tv;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
tv.watch();
}
}
}
// 产品
class TV {
static String movie;
Boolean flag = false;
// 生产
public synchronized void act(String movieName) {
// flag为真,播放器等待;flag为假,播放器生产
if (flag) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
movie = movieName;
System.out.println("正在播放 " + movieName);
flag = true;
this.notifyAll();
}
// 消费
public synchronized void watch() {
// flag为假,观众等待;flag为真,观众消费
if (!flag) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("观看了 " + movie);
flag = false;
this.notifyAll();
}
}
线程池
背景:经常创建和销毁线程,会消耗大量资源,如并发的线程对性能影响较大。
线程池:提前创建好多个线程,放入线程池中,使用时直接获取,用完后再放回池中。
线程池的优点:
- 提高了响应速度(减少了创建新线程的时间)
- 降低资源消耗(重复利用线程池中的线程,减少了创建销毁)
- 便于线程管理
- corePoolSize:核心池的大小
- maximumPoolSize:最大线程数
- keepAliveTime:线程没有任务时,最长保持时间,过后会终止
实现线程池
JDK5.0起,提供了线程池相关的接口:ExecutorService
和Executor
,以及提供工厂方法用来创建不同类型的线程池的工具类:Executors
。
Executor框架的成员及其关系可以用一下的关系图表示:
对比ExecutorService
和Executor
:
- ExecutorService接口继承了Executor 接口,是Executor的子接口。
- Executor接口中定义了execute()方法,用来接收一个Runnable接口的对象,而ExecutorService接口中定义的submit()方法可以接收Runnable和Callable接口对象。
- Executor接口中execute()方法不返回任何结果,而ExecutorService接口中submit()方法可以通过一个 Future 对象返回运算结果。
- Executor和ExecutorService除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。比如:调用 shutDown() 方法终止线程池。
ExecutorService接口
实现线程池的接口,常用方法有:
- void execute(Runnable command):执行任务/命令,无返回值,一般用于运行Runnable线程
- <T> Future<T> submimt(Callable<T> Task):执行任务,有返回值,一般用于执行Callable接口
- void shutdown():用于关闭线程池
Executors 类
提供工厂方法用来创建不同类型的线程池,返回的线程池类型都是ExecutorService。比如:
- Executors.newSingleThreadExecutor() 创建一个只有一个线程的线程池
- Executors.newFixedThreadPool(int numOfThreads)来创建固定线程数的线程池,
- Executors.newCachedThreadPool()创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
- Executors.newScheduledThreadPool(int corePoolSize) 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
线程总结
三种创建线程的方式:继承Thread类、继承Runnable接口、继承Callable接口
运行Thread线程:
// 创建线程对象
Thread01 t1 = new Thread01();
// 调用start()方法,开启线程
t1.start();
运行Runnable线程有两种方法:
- 通过Thread对象代理启动
// 实例化Runnable接口的实现类
Runnable01 thread1 = new Runnable01(1);
// 创建线程对象,通过线程对象启动线程(代理)
new Thread(thread1, "1").start();
- 通过FutureTask类,FutureTask类继承了Runnable接口,可以通过Thread启动
Runnable runnable = ()-> System.out.println(1234);
// Runnable线程通过FutureTask创建时必须提供返回值
FutureTask<Integer> futureTask = new FutureTask<Integer>(runnable, 123);
new Thread(futureTask).start();
// 通过FutureTask类具有返回值
System.out.println(futureTask.get());
运行Callable线程有两种方法:
- 通过ExecutorService接口
// 创建执行服务
ExecutorService ser = Executors.newFixedThreadPool(3);
// 提交执行
Future<Boolean> result1 = ser.submit(c1);
// 获取结果
Boolean r1 = result1.get();
// 关闭服务
ser.shutdownNow();
- 通过FutureTask类,FutureTask类继承了Runnable接口,可以通过Thread启动
FutureTask<Boolean> futureTask = new FutureTask<Boolean>(c1);
new Thread(futureTask).start();
// 通过FutureTask类创建的线程具有返回值
System.out.println(futureTask.get());