Java.Thread03

线程协作

生产者消费者问题

  • 假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者从仓库取走产品
  • 若仓库中没有产品,则生产者放入产品,否则一直等待,直到消费者取走产品
  • 若仓库中有产品,则消费者取走产品,否则一直等待,知道生产者放入产品

生产者消费者问题为线程同步问题,两者共享一个资源,相互依赖,互为条件:

  • 生产者未放入之前,通知消费者等待;放入后,提醒消费者取走
  • 消费者取走产品,通知生产者继续生产
  • 生产者消费者问题中,不能通过简单的synchronized实现
  • synchronized能阻止并发更新同一个资源,实现同步
  • synchronized不能用于实现线程间通信

线程通信

Java提供了一些解决线程通信的方法

方法说明
wait()线程一直等待,直到收到其他线程的通知,期间会释放锁
wait(long timeout)线程等待指定的毫秒数
notify()唤醒一个处于等待的线程
notifyAll()唤醒同一个对象上所有等待的线程,调度优先级高的线程

这些方法都是Object类的方法,只能在同步方法或同步代码块中使用,否则会抛出IllegalMonitorStateException异常

管程法

解决并发协作模型“生产者消费者问题”的方法之一。
实现管程法需要的对象有:

  • 生产者:负责生产数据的模块(可能是方法、对象、线程、进程)
  • 消费者:负责消耗数据的模块(可能是方法、对象、线程、进程)
  • 缓冲区:存放生产者生产的数据,等待消费者消耗数据;消费者不能直接使用生产者的数据
  • 产品:缓冲区中存放的数据

具体实现:

// 管程法实现生产者消费者问题
// 对象:产品、生产者、消费者、缓冲区
public class Monitors {
    public static void main(String[] args){
        Buffer buffer = new Buffer();
        new Producer(buffer).start();
        new Consumer(buffer).start();
    }
}

// 产品
class Production extends Thread{
    int id;

    public Production(int id) {
        this.id = id;
    }
}

// 生产者
class Producer extends Thread{
    Buffer buffer;
    public Producer(Buffer buffer) {
        this.buffer = buffer;
    }

    // 生产
    @Override
    public void run() {
        for (int i = 0; i < 30; i++) {
            buffer.push(new Production(i));
            System.out.println("生产第" + i + "个产品");
        }
    }
}

// 消费者
class Consumer extends Thread{
    Buffer buffer;

    public Consumer(Buffer buffer) {
        this.buffer = buffer;
    }

    // 消费
    @Override
    public void run() {
        for (int i = 0; i < 30; i++) {
            System.out.println("消耗了id为" + buffer.pop().id + "的产品");
        }
    }
}

// 缓冲区
class Buffer extends Thread{
    // 设置容器大小
    Production[] productions = new Production[10];
    // 容器内产品数量
    static int count = 0;

    // 生产者放入产品
    public synchronized void push(Production production) {
        // 容器满时,生产者等待
        if (count == productions.length) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        productions[count] = production;
        count++;

        // 放入产品后唤醒消费者
        this.notifyAll();
    }

    // 消费者消耗产品
    public synchronized Production pop() {
        // 容器为空,消费者等待
        if (count == 0){
            try {
                wait();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }

        count--;
        Production production = productions[count];
        // 取走产品后,唤醒消费者
        this.notifyAll();

        return production;
    }
}

信号灯法

解决并发协作模型“生产者消费者问题”的方法之一。
设置flag标志位,用于判断线程数据状态。
信号灯法缺点在于没有缓冲区存储大量的共享数据。
实现信号灯法需要的对象有:

  • 生产者:负责生产数据的模块(可能是方法、对象、线程、进程)
  • 消费者:负责消耗数据的模块(可能是方法、对象、线程、进程)
  • 产品:两者互斥访问的对象,包含一个flag标志位,用于控制两者的交替访问。

具体实现:

// 信号灯法实现生产者消费者问题
public class Flag {
    public static void main(String[] args) {
        TV tv = new TV();
        new Player(tv).start();
        new Watcher(tv).start();
    }
}

// 生产者
class Player extends Thread{
    TV tv;

    public Player(TV tv) {
        this.tv = tv;
    }

    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            if (i % 2 == 0) {
                tv.act("Peaky Blinder");
            } else {
                tv.act("Ads...");
            }
        }
    }
}

// 消费者
class Watcher extends Thread{
    TV tv;

    public Watcher(TV tv) {
        this.tv = tv;
    }

    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            tv.watch();
        }
    }
}

// 产品
class TV {
    static String movie;
    Boolean flag = false;

    // 生产
    public synchronized void act(String movieName) {
        // flag为真,播放器等待;flag为假,播放器生产
        if (flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        movie = movieName;
        System.out.println("正在播放 " + movieName);
        flag = true;
        this.notifyAll();
    }

    // 消费
    public synchronized void watch() {
        // flag为假,观众等待;flag为真,观众消费
        if (!flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("观看了 " + movie);
        flag = false;
        this.notifyAll();
    }
}

线程池

背景:经常创建和销毁线程,会消耗大量资源,如并发的线程对性能影响较大。

线程池:提前创建好多个线程,放入线程池中,使用时直接获取,用完后再放回池中。

线程池的优点:

  • 提高了响应速度(减少了创建新线程的时间)
  • 降低资源消耗(重复利用线程池中的线程,减少了创建销毁)
  • 便于线程管理
  • corePoolSize:核心池的大小
  • maximumPoolSize:最大线程数
  • keepAliveTime:线程没有任务时,最长保持时间,过后会终止

实现线程池

JDK5.0起,提供了线程池相关的接口:ExecutorServiceExecutor,以及提供工厂方法用来创建不同类型的线程池的工具类:Executors

Executor框架的成员及其关系可以用一下的关系图表示:

对比ExecutorServiceExecutor

  1. ExecutorService接口继承了Executor 接口,是Executor的子接口。
  2. Executor接口中定义了execute()方法,用来接收一个Runnable接口的对象,而ExecutorService接口中定义的submit()方法可以接收Runnable和Callable接口对象。
  3. Executor接口中execute()方法不返回任何结果,而ExecutorService接口中submit()方法可以通过一个 Future 对象返回运算结果。
  4. Executor和ExecutorService除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。比如:调用 shutDown() 方法终止线程池。

ExecutorService接口

实现线程池的接口,常用方法有:

  • void execute(Runnable command):执行任务/命令,无返回值,一般用于运行Runnable线程
  • <T> Future<T> submimt(Callable<T> Task):执行任务,有返回值,一般用于执行Callable接口
  • void shutdown():用于关闭线程池

Executors 类

提供工厂方法用来创建不同类型的线程池,返回的线程池类型都是ExecutorService。比如:

  • Executors.newSingleThreadExecutor() 创建一个只有一个线程的线程池
  • Executors.newFixedThreadPool(int numOfThreads)来创建固定线程数的线程池,
  • Executors.newCachedThreadPool()创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  • Executors.newScheduledThreadPool(int corePoolSize) 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。

线程总结

三种创建线程的方式:继承Thread类、继承Runnable接口、继承Callable接口

运行Thread线程:

// 创建线程对象
Thread01 t1 = new Thread01();
// 调用start()方法,开启线程
t1.start();

运行Runnable线程有两种方法:

  1. 通过Thread对象代理启动
// 实例化Runnable接口的实现类
Runnable01 thread1 = new Runnable01(1);
// 创建线程对象,通过线程对象启动线程(代理)
new Thread(thread1, "1").start();
  1. 通过FutureTask类,FutureTask类继承了Runnable接口,可以通过Thread启动
Runnable runnable = ()-> System.out.println(1234);
// Runnable线程通过FutureTask创建时必须提供返回值
FutureTask<Integer> futureTask = new FutureTask<Integer>(runnable, 123);
new Thread(futureTask).start();
// 通过FutureTask类具有返回值
System.out.println(futureTask.get());

运行Callable线程有两种方法:

  1. 通过ExecutorService接口
// 创建执行服务
ExecutorService ser = Executors.newFixedThreadPool(3);
// 提交执行
Future<Boolean> result1 = ser.submit(c1);
// 获取结果
Boolean r1 = result1.get();
// 关闭服务
ser.shutdownNow();
  1. 通过FutureTask类,FutureTask类继承了Runnable接口,可以通过Thread启动
FutureTask<Boolean> futureTask = new FutureTask<Boolean>(c1);
new Thread(futureTask).start();
// 通过FutureTask类创建的线程具有返回值
System.out.println(futureTask.get());
tag(s):
show comments · back · home