生产者&消费者传统版¶
-
题目:一个初始值为0的变量,两个线程对其交替操作,一个加1一个减1,来5轮
```java package com;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
/** * 资源类 */ class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition();
public void increment() throws Exception {
lock.lock();
try {
// 1.判断
while (number != 0) {
// 等待不能生产
condition.await();
}
// 2.干活
System.out.print(Thread.currentThread().getName() + "\t" + number);
number++;
System.out.println("\t>\t" + number);
// 3.通知唤醒
condition.signalAll();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
public void decrement() throws Exception {
lock.lock();
try {
// 1.判断
while (number == 0) {
// 等待不能生产
condition.await();
}
// 2.干活
System.out.print(Thread.currentThread().getName() + "\t" + number);
number--;
System.out.println("\t>\t" + number);
// 3.通知唤醒
condition.signalAll();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}
public class ProdConsumer_TraditionDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.increment(); } catch (Exception e) {
} finally {
}
}
}, "生产者").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
} finally {
}
}
}, "消费者").start();
}
} ```
使用==while==替代==if==判断,可以防止虚假唤醒
lock 和 synchronized :¶
- 区别:
- synchronized 是JVM层面,是==关键字==;
- lock 是JUC包的==类==,是1.5之后出现的;
- 使用方法:
- synchronized 不需要用户手动释放锁,当synchronized代码执行完成后系统会自动让线程释放对锁的占用;
- ReentrantLock则需要用户手动释放锁若没有释放锁,就会导致死锁现象;需要lock()和unlock()方法配合try...finally...语句块配合完成;
- 等待是否可中断:
- synchronized不可中断,除非抛出异常或者正常执行完成;
- ReentrantLock可中断;
- 设置超时方法trylock(long timeout , TimeUnit unit);
- lockIntrruptibly()放代码块中,调用intrrupt()方法可中断;
- 加锁是否公平:
- synchronized 非公平锁;
- ReentrantLock 默认为非公平锁,构造方法输入(true)是公平锁;
- 锁绑定多个 条件Condition
- synchronized 没有
- ReentrantLock 用来实现分组唤醒需要唤醒线程们,可以精确唤醒,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程;
多线程按顺序调用¶
-
题目:多线程之间按照顺序调用,实现A B C三个线程启动,要求如下:
-
A打印5次 B打印10次 C打印15次
- 重复10轮
范例 : SyncAndReentrantLockDemo.java
```java package com;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
class ShareResoure { private int number = 1;//A=1;B=2;C=3 private Lock lock = new ReentrantLock(); private Condition c1 = lock.newCondition(); private Condition c2 = lock.newCondition(); private Condition c3 = lock.newCondition();
public void printA() {
lock.lock();
try {
// 1.判断
while (number != 1) {
c1.await();
}
// 2.干活
for (int i = 0; i < 1; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 3.通知
number = 2;//修改标志位
c2.signal();//通知B可以运行
} catch (Exception e) {
} finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
// 1.判断
while (number != 2) {
c2.await();
}
// 2.干活
for (int i = 0; i < 2; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 3.通知
number = 3;//修改标志位
c3.signal();//通知B可以运行
} catch (Exception e) {
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
// 1.判断
while (number != 3) {
c3.await();
}
// 2.干活
for (int i = 0; i < 3; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
System.out.println();
// 3.通知
number = 1;//修改标志位
c1.signal();//通知B可以运行
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}
public class SyncAndReentrantLockDemo { public static void main(String[] args) { ShareResoure shareResoure = new ShareResoure(); new Thread(() -> { for (int i = 0; i < 3; i++) { shareResoure.printA(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 3; i++) { shareResoure.printB(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 3; i++) { shareResoure.printC(); } }, "C").start(); } } ```
生产者&消费者阻塞队列版¶
知识点
- volatile
- CAS
- atomicInteger
- BlockQueue
- 线程交互
- 原子引用
package com;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class MyResource {
BlockingQueue<String> blockingQueue = null;
private volatile boolean FLAG = true;//默认开始,进行生产消费的开关
private AtomicInteger atomicInteger = new AtomicInteger();
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void stop() {
this.FLAG = false;
}
public void myProd() throws Exception {
String data = null;
boolean reValule;
while (FLAG) {
data = atomicInteger.incrementAndGet() + "";
reValule = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (reValule) {
System.out.println(Thread.currentThread().getName() + "\t" + "插入" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t" + "插入" + data + "失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "/t" + "生产结束");
}
public void myConsumer() throws Exception {
String result;
while (FLAG) {
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (result == null || "".equalsIgnoreCase(result)) {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 超过2秒没有取到商品,消费退出");
System.out.println("============end==============");
return;
} else {
System.out.println(Thread.currentThread().getName() + "\t" + result + " 消费成功");
}
}
}
}
public class ProdConsumer_BlockQueueDemo {
public static void main(String[] args) throws InterruptedException {
MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));
new Thread(() -> {
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
}, "prod").start();
new Thread(() -> {
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "cons").start();
TimeUnit.SECONDS.sleep(5);
myResource.stop();
System.out.println("end");
}
}