https://kaiwu.lagou.com/course/courseInfo.htm?courseId=1356&sid=20-h5Url-0&lgec_type=website&lgec_sign=56C78768E1A134C49750D2797FB14316&buyFrom=1&pageId=1pz4#/detail/pc?id=8998

大家好,我是山海。

“双11”到了,很多技术同学还在加班加点备战购物高峰。既然说到这里,本期内容我就来跟大家聊一聊与“双11”相关性最强的并发型模式和结构模式,同时,它们也是你在工作和面试中最常涉及到的设计模式。

生产者与消费者模式

如果你是 5 年左右的开发,也许你已经发现,在面试时并发型模式基本是必考题目。这是为什么呢?因为候选人对这种模式的理解和应用能很好得体现出他的并发技术功底。而且在实际工作中,只要你熟知并发模式,就不会闹出低级的并发问题。

所以,无论是为了面试,还是为了实际工作,掌握并发型模式都是非常有必要的。而生产者与消费者模式是对并发场景最基本的抽象,可以理解为是一种最常见的并发场景。

我们可以认为,它主要应用场景是,在容器固定的情况下,有多个生产的线程与多个消费的线程,同时往容器里放物品与获取容器内的物品。这时候,如果并发操作写得不够严谨,就会导致容器里放了超过容器最大容量的物品,这明显是不正确的。或者可能导致明明容器中已经没有物品了,还能进行消费,这明显也是有问题的。

举个例子,很多同学都参加过秒杀活动,比如我们定 100 个产品参与秒杀,但如果代码写的不足够严谨,就可能有超过 100 个用户秒杀到产品,那就会给公司带来损失,或者导致有一些用户得不到秒杀的产品,这样的程序明显是有问题的。

那我们先模拟出这个有问题的程序,然后再给针对性地给出 3 种解决方案,来解决这个问题。

public class Storage {
    // 最大存储量,由于不同商品,可能数量不同,所以,可以传入进来
    private int size;
    // 存储商品的的容器
    private LinkedList<Object> list = new LinkedList<Object>();
    //初始化容器时,需要初始化大小
    public Storage(int size) {
        this.size = size;
    }
    // 生产n个产品
    public void produce(int n) {
        for (int i = 1; i <= n; ++i) {
            if (list.size() > size) {
                System.out.println("容器满了,不可以再放入了,此时物品数量为:" + list.size());
                return;
            }
            list.add(new Object());
        }
        System.out.println("生产" + n + "个产品,当前容器内的数量为:" + list.size());
    }
    // 消费n个产品
    public void consume(int n) {
        for (int i = 1; i <= n; ++i) {
            if (list.size() == 0) {
                System.out.println("容器空了,不能再消费了,此时容器内的数量为:" + list.size());
                return;
            }
            list.remove();
        }
        System.out.println("消费" + n + "个产品,当前容器内的数量为:" + list.size());
    }
}

这里,我们可以先把消费者与生产者也定义出来,如下:

/**
 * 消费者
 */
class Consumer extends Thread {
    // 每次消费的数量
    private int n;
    // 所在放置的容器
    private Storage storage;
    // 构造函数,设置仓库
    public Consumer(int n, Storage storage) {
        this.n = n;
        this.storage = storage;
    }
    // 新线程,来消费
    public void run() {
        storage.consume(n);
    }
}
/**
 * 生产者
 */
class Producer extends Thread {
    // 每次生产的产品数量
    private int n;
    // 所在放置的仓库
    private Storage storage;
    // 构造函数,设置仓库
    public Producer(int n, Storage storage) {
        this.n = n;
        this.storage = storage;
    }
    // 线程run函数
    public void run() {
        storage.produce(n);
    }
}

下面,我们写一个多线程的测试程序,你也可以复制粘贴到自己的编译器里实际跑一下。

class Test {
    public static void main(String[] args) {
        // 创建容器对象
        Storage storage = new Storage(100);
        // 生产者对象
        Producer p1 = new Producer(10, storage);
        Producer p2 = new Producer(20, storage);
        Producer p3 = new Producer(30, storage);
        Producer p4 = new Producer(20, storage);
        Producer p5 = new Producer(20, storage);
        Producer p6 = new Producer(20, storage);
        Producer p7 = new Producer(10, storage);
        // 消费者对象
        Consumer c1 = new Consumer(10, storage);
        Consumer c2 = new Consumer(10, storage);
        Consumer c3 = new Consumer(30, storage);
        // 线程开始执行
        c1.start();
        c2.start();
        c3.start();
        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        p6.start();
        p7.start();
    }
}

多执行几次代码你就会发现,由于多线程的存在,有时候容器的数量会多于 n 个,如果我们在测试程序里面多设置了消费者的话,就可能导致报错。是因为可能容器里面已经没有产品了,但还有线程去取产品。

那这个程序如何变成线程安全的呢?方案有很多,我建议你现在先停下来,不去看后面的代码,自己来改写一下,看看可以改出多少种方案。

下面,来跟我一起把 3 种方案逐一实现出来。

sychorinzed 实现方案

public class Storage {
    // 最大存储量,由于不同商品,可能数量不同,所以,可以传入进来
    private int size;
    // 存储商品的的容器
    private LinkedList<Object> list = new LinkedList<Object>();
    //初始化容器时,需要初始化大小
    public Storage(int size) {
        this.size = size;
    }
    // 生产n个产品
    public void produce(int n) {
        //加锁,这里一定要注意,所有的生产者、消费者一定要锁同一个对象
        synchronized (list) {
            //注意,这里一定要用 while,因为 wait可能存在被错误唤醒的时候,一定醒了要重新判断条件
            while (list.size() + n > size) {
                System.out.println("容器满了,不可以再放入了,此时物品数量为:" + list.size());
                try {
                    //满了就等一下
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            for (int i = 1; i <= n; ++i) {
                list.add(new Object());
            }
        }
        //生产完成,叫醒其他生产者和消费者
        list.notifyAll();
        System.out.println("生产" + n + "个产品,当前容器内的数量为:" + list.size());
    }
    // 消费n个产品
    public void consume(int n) {
        //加锁,这里一定要注意,所有的生产者、消费者一定要锁同一个对象
        synchronized (list) {
            //注意,这里一定要用 while,因为 wait可能存在被错误唤醒的时候,一定醒了要重新判断条件
            while (list.size() - n < 0) {
                System.out.println("容器空了,不能再消费了,此时容器内的数量为:" + list.size());
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            for (int i = 1; i <= n; ++i) {
                list.remove();
            }
        }
        //生产完成,叫醒其他生产者和消费者
        list.notifyAll();
        System.out.println("消费" + n + "个产品,当前容器内的数量为:" + list.size());
    }
}