blog
blog copied to clipboard
ReentrantLock、Condition 介绍
直接上例子,我们用 ReentrantLock 实现一个简单的“FIFO 的阻塞队列”
FiFoBlockingQueue.java
/**
 * A bounded, blocking FIFO queue backed by a circular array and guarded by a
 * single {@link ReentrantLock} with two {@link Condition}s.
 *
 * <p>{@link #add(Object)} blocks while the queue is full and
 * {@link #remove()} blocks while it is empty. {@code null} elements are
 * accepted and returned as-is.
 *
 * @param <E> the element type
 */
public class FiFoBlockingQueue<E> {
    /** Circular buffer holding the queued elements. */
    private final Object[] elems;
    /** Slot the next {@code add} writes to (wrapped lazily before use). */
    private int index = 0;
    /** Slot the next {@code remove} reads from (wrapped lazily before use). */
    private int removeIndex = 0;
    /** Number of elements currently stored. */
    private int count = 0;

    private final Lock lock = new ReentrantLock();
    /** Signalled after an element is added; consumers wait on it. */
    private final Condition notEmpty = lock.newCondition();
    /** Signalled after an element is removed; producers wait on it. */
    private final Condition notFull = lock.newCondition();

    /**
     * Creates a queue that holds at most {@code capacity} elements.
     *
     * @param capacity the maximum number of elements; must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     *         (a zero-capacity queue would make every {@code add} wait forever)
     */
    public FiFoBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.elems = new Object[capacity];
    }

    /**
     * Appends {@code elem} to the tail of the queue, waiting while the queue is full.
     *
     * @param elem the element to enqueue (may be {@code null})
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void add(E elem) throws InterruptedException {
        final Lock lock = this.lock;
        lock.lock();
        try {
            while (count == elems.length) {
                notFull.await();
            }
            if (index == elems.length) {
                index = 0;
            }
            elems[index++] = elem;
            count++;
            notEmpty.signal();
        } finally {
            // Always release the lock, even when await() throws
            // InterruptedException; otherwise every other thread deadlocks.
            lock.unlock();
        }
    }

    /**
     * Removes and returns the element at the head of the queue, waiting while
     * the queue is empty.
     *
     * @return the oldest element in the queue (may be {@code null} if a
     *         {@code null} was added)
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public E remove() throws InterruptedException {
        final Lock lock = this.lock;
        lock.lock();
        try {
            while (count <= 0) {
                notEmpty.await();
            }
            if (removeIndex == elems.length) {
                removeIndex = 0;
            }
            // Safe: only add(E) ever stores into elems.
            @SuppressWarnings("unchecked")
            E elem = (E) elems[removeIndex];
            // Clear the slot so the removed element can be garbage collected.
            elems[removeIndex++] = null;
            count--;
            notFull.signal();
            return elem;
        } finally {
            // Always release the lock, even when await() throws
            // InterruptedException; otherwise every other thread deadlocks.
            lock.unlock();
        }
    }
}
Message.java
/**
 * A mutable text message addressed to a phone number.
 */
public class Message {
    /** Destination phone number. */
    private String phone;
    /** Message text. */
    private String msg;

    /**
     * Creates a message with the given phone number and text.
     *
     * @param phone the destination phone number
     * @param msg   the message text
     */
    Message(String phone, String msg) {
        this.msg = msg;
        this.phone = phone;
    }

    /** @return the destination phone number */
    public String getPhone() {
        return this.phone;
    }

    /** @return the message text */
    public String getMsg() {
        return this.msg;
    }

    /** @param phone the new destination phone number */
    public void setPhone(String phone) {
        this.phone = phone;
    }

    /** @param msg the new message text */
    public void setMsg(String msg) {
        this.msg = msg;
    }
}
Producer.java
/**
 * Thread that puts 20 messages with random phone numbers on the queue and
 * then enqueues a single {@code null} as an end-of-stream marker.
 */
public class Producer extends Thread {
    private final FiFoBlockingQueue<Message> queue;

    /**
     * @param queue the queue to publish messages to
     */
    Producer(FiFoBlockingQueue<Message> queue) {
        this.queue = queue;
    }

    /**
     * Produces the messages, logging each one before it is enqueued.
     * Stops early if the thread is interrupted while waiting for queue space.
     */
    @Override
    public void run() {
        try {
            for (int i = 0; i < 20; i++) {
                Message message = new Message(
                        Integer.toString((int) (Math.random() * 9999999)),
                        "Message " + i
                );
                System.out.println("[Producer] phone=" + message.getPhone()
                        + ", message=" + message.getMsg());
                queue.add(message);
            }
            // null marks the end of the stream for whoever reads the queue.
            queue.add(null);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it so that
            // code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
Consumer.java
/**
 * Thread that takes messages off the queue and prints them until it sees a
 * {@code null} end-of-stream marker or is interrupted.
 */
public class Consumer extends Thread {
    private final FiFoBlockingQueue<Message> queue;

    /**
     * @param queue the queue to read messages from
     */
    Consumer(FiFoBlockingQueue<Message> queue) {
        this.queue = queue;
    }

    /**
     * Consumes messages until the end-of-stream marker arrives.
     *
     * <p>On receiving the {@code null} marker, the marker is put back on the
     * queue before exiting so that any other consumer sharing the queue also
     * terminates; with a single marker and several consumers the others would
     * otherwise block in {@code remove()} forever. One marker is left in the
     * queue after the last consumer exits.
     */
    @Override
    public void run() {
        try {
            while (true) {
                Message message = queue.remove();
                if (null == message) {
                    // Pass the marker on to the next consumer.
                    // NOTE(review): assumes the marker is the last element
                    // enqueued, so this add() finds free space.
                    queue.add(null);
                    break;
                }
                // Simulate the time it takes to consume a message.
                Thread.sleep(1000);
                System.out.println("[Consumer: " +
                        Thread.currentThread().getName() + "]: phone=" +
                        message.getPhone() + ", message=" +
                        message.getMsg()
                );
            }
        } catch (InterruptedException e) {
            // Treat interruption as a request to stop: restore the interrupt
            // status and leave the loop instead of swallowing it and spinning.
            Thread.currentThread().interrupt();
        }
    }
}
Test.java
/**
 * Demo entry point: one producer and two consumers sharing a queue of
 * capacity 10.
 */
public class Test {
    static final FiFoBlockingQueue<Message> queue = new FiFoBlockingQueue<>(10);

    /**
     * Creates the producer and both consumers, then starts them in that order.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Construction order is kept (producer first, then the consumers)
        // because default thread names are assigned at construction time.
        Thread[] workers = {
                new Producer(queue),
                new Consumer(queue),
                new Consumer(queue)
        };
        for (Thread worker : workers) {
            worker.start();
        }
    }
}
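我们借助 Condition 实现线程间的等待与通知:队列已满时,Producer 在 notFull 上等待,直到 Consumer 取走元素后调用 notFull.signal() 将其唤醒;队列为空时,Consumer 在 notEmpty 上等待,直到 Producer 放入元素后调用 notEmpty.signal() 将其唤醒。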
关于 Condition 原理,参考:
- http://www.importnew.com/9281.html