跳转至

生产者&消费者传统版

```java package com;

import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;

/** * 资源类 */ class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition();

  public void increment() throws Exception {
      lock.lock();
      try {
          // 1.判断
          while (number != 0) {
              // 等待不能生产
              condition.await();
          }
          // 2.干活
          System.out.print(Thread.currentThread().getName() + "\t" + number);
          number++;
          System.out.println("\t>\t" + number);
          // 3.通知唤醒
          condition.signalAll();
      } catch (Exception e) {

      } finally {
          lock.unlock();
      }
  }

  public void decrement() throws Exception {
      lock.lock();
      try {
          // 1.判断
          while (number == 0) {
              // 等待不能生产
              condition.await();
          }
          // 2.干活
          System.out.print(Thread.currentThread().getName() + "\t" + number);
          number--;
          System.out.println("\t>\t" + number);
          // 3.通知唤醒
          condition.signalAll();
      } catch (Exception e) {

      } finally {
          lock.unlock();
      }
  }

}

public class ProdConsumer_TraditionDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.increment(); } catch (Exception e) {

              } finally {

              }
          }
      }, "生产者").start();
      new Thread(() -> {
          for (int i = 0; i < 5; i++) {
              try {
                  shareData.decrement();
              } catch (Exception e) {

              } finally {

              }
          }
      }, "消费者").start();
  }

} ```

使用==while==替代==if==判断,可以防止虚假唤醒

lock 和 synchronized :

  1. 区别:
  2. synchronized 是JVM层面,是==关键字==;
  3. lock 是JUC包的==类==,是1.5之后出现的;
  4. 使用方法:
  5. synchronized 不需要用户手动释放锁,当synchronized代码执行完成后系统会自动让线程释放对锁的占用;
  6. ReentrantLock则需要用户手动释放锁若没有释放锁,就会导致死锁现象;需要lock()和unlock()方法配合try...finally...语句块配合完成;
  7. 等待是否可中断:
  8. synchronized不可中断,除非抛出异常或者正常执行完成;
  9. ReentrantLock可中断;
    1. 设置超时方法trylock(long timeout , TimeUnit unit);
    2. lockIntrruptibly()放代码块中,调用intrrupt()方法可中断;
  10. 加锁是否公平:
  11. synchronized 非公平锁;
  12. ReentrantLock 默认为非公平锁,构造方法输入(true)是公平锁;
  13. 锁绑定多个 条件Condition
  14. synchronized 没有
  15. ReentrantLock 用来实现分组唤醒需要唤醒线程们,可以精确唤醒,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程;

多线程按顺序调用

  • 题目:多线程之间按照顺序调用,实现A B C三个线程启动,要求如下:

  • A打印5次 B打印10次 C打印15次

  • 重复10轮

范例 : SyncAndReentrantLockDemo.java

```java package com;

import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;

class ShareResoure { private int number = 1;//A=1;B=2;C=3 private Lock lock = new ReentrantLock(); private Condition c1 = lock.newCondition(); private Condition c2 = lock.newCondition(); private Condition c3 = lock.newCondition();

  public void printA() {
      lock.lock();
      try {
          // 1.判断
          while (number != 1) {
              c1.await();
          }
          // 2.干活
          for (int i = 0; i < 1; i++) {
              System.out.println(Thread.currentThread().getName() + "\t" + i);
          }
          // 3.通知
          number = 2;//修改标志位
          c2.signal();//通知B可以运行
      } catch (Exception e) {

      } finally {
          lock.unlock();
      }
  }

  public void printB() {
      lock.lock();
      try {
          // 1.判断
          while (number != 2) {
              c2.await();
          }
          // 2.干活
          for (int i = 0; i < 2; i++) {
              System.out.println(Thread.currentThread().getName() + "\t" + i);
          }
          // 3.通知
          number = 3;//修改标志位
          c3.signal();//通知B可以运行
      } catch (Exception e) {

      } finally {
          lock.unlock();
      }
  }

  public void printC() {
      lock.lock();
      try {
          // 1.判断
          while (number != 3) {
              c3.await();
          }
          // 2.干活
          for (int i = 0; i < 3; i++) {
              System.out.println(Thread.currentThread().getName() + "\t" + i);
          }
          System.out.println();
          // 3.通知
          number = 1;//修改标志位
          c1.signal();//通知B可以运行
      } catch (Exception e) {

      } finally {
          lock.unlock();
      }
  }

}

public class SyncAndReentrantLockDemo { public static void main(String[] args) { ShareResoure shareResoure = new ShareResoure(); new Thread(() -> { for (int i = 0; i < 3; i++) { shareResoure.printA(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 3; i++) { shareResoure.printB(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 3; i++) { shareResoure.printC(); } }, "C").start(); } } ```

生产者&消费者阻塞队列版

知识点

  • volatile
  • CAS
  • atomicInteger
  • BlockQueue
  • 线程交互
  • 原子引用

SyncAndReentrantLockDemo.java

package com;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MyResource {
    BlockingQueue<String> blockingQueue = null;
    private volatile boolean FLAG = true;//默认开始,进行生产消费的开关
    private AtomicInteger atomicInteger = new AtomicInteger();

    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());
    }

    public void stop() {
        this.FLAG = false;
    }

    public void myProd() throws Exception {
        String data = null;
        boolean reValule;
        while (FLAG) {
            data = atomicInteger.incrementAndGet() + "";
            reValule = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            if (reValule) {
                System.out.println(Thread.currentThread().getName() + "\t" + "插入" + data + "成功");
            } else {
                System.out.println(Thread.currentThread().getName() + "\t" + "插入" + data + "失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + "/t" + "生产结束");
    }

    public void myConsumer() throws Exception {
        String result;
        while (FLAG) {
            result = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if (result == null || "".equalsIgnoreCase(result)) {
                FLAG = false;
                System.out.println(Thread.currentThread().getName() + "\t 超过2秒没有取到商品,消费退出");
                System.out.println("============end==============");
                return;
            } else {
                System.out.println(Thread.currentThread().getName() + "\t" + result + " 消费成功");
            }

        }
    }
}

public class ProdConsumer_BlockQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));
        new Thread(() -> {
            try {
                myResource.myProd();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "prod").start();
        new Thread(() -> {
            try {
                myResource.myConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "cons").start();

        TimeUnit.SECONDS.sleep(5);
        myResource.stop();
        System.out.println("end");
    }
}