interface Queue {
boolean offer(Object obj);
Object poll();
}
class FairnessBoundedBlockingQueue implements Queue {
// 当前大小
protected int size;
// 容量
protected final int capacity;
// 头指针,empty: head.next == tail == null
protected Node head;
// 尾指针
protected Node tail;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.head = new Node(null);
this.tail = head;
this.size = 0;
}
// 如果队列已满,通过返回值标识
public boolean offer(Object obj) {
if (size < capacity) {
Node node = new Node(obj);
tail.next = node;
tail = node;
++size;
return true;
}
return false;
}
// 如果队列为空,head.next == null;返回空元素
public Object poll() {
if (head.next != null) {
Object result = head.next.value;
head.next.value = null;
head = head.next; // 丢弃头结点
--size;
return result;
}
return null;
}
class Node {
Object value;
Node next;
Node(Object obj) {
this.value = obj;
next = null;
}
}
}
// 省略接口定义
class BoundedBlockingQueue implements Queue {
// 当前大小
protected int size;
// 容量
protected final int capacity;
// 头指针,empty: head.next == tail == null
protected Node head;
// 尾指针
protected Node tail;
public BoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.head = new Node(null);
this.tail = head;
this.size = 0;
}
// 如果队列已满,通过返回值标识
public synchronized boolean offer(Object obj) {
if (size < capacity) {
Node node = new Node(obj);
tail.next = node;
tail = node;
++size;
return true;
}
return false;
}
// 如果队列为空,head.next == null;返回空元素
public synchronized Object poll() {
if (head.next != null) {
Object result = head.next.value;
head.next.value = null;
head = head.next; // 丢弃头结点
--size;
return result;
}
return null;
}
// 省略 Node 的定义
}
WHEN(condition) Object action(Object arg) {
checkPreCondition();
doAction(arg);
checkPostCondition();
}
// 当前线程
synchronized Object action(Object arg) {
while(!condition) {
wait();
}
// 前置条件,不变式
checkPreCondition();
doAction();
// 后置条件,不变式
checkPostCondition();
}
// 其他线程
synchronized Object notifyAction(Object arg) {
notifyAll();
}
interface Queue {
boolean offer(Object obj) throws InterruptedException;
Object poll() throws InterruptedException;
}
class FairnessBoundedBlockingQueue implements Queue {
// 当前大小
protected int size;
// 容量
protected final int capacity;
// 头指针,empty: head.next == tail == null
protected Node head;
// 尾指针
protected Node tail;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.head = new Node(null);
this.tail = head;
this.size = 0;
}
// 如果队列已满,通过返回值标识
public synchronized boolean offer(Object obj) throws InterruptedException {
size>=capacity {
wait();
}
Node node = new Node(obj);
tail.next = node;
tail = node;
++size;
notifyAll(); // 可以出队
return true;
}
// 如果队列为空,阻塞等待
public synchronized Object poll() throws InterruptedException {
while (head.next == null) {
wait();
}
Object result = head.next.value;
head.next.value = null;
head = head.next; // 丢弃头结点
--size;
notifyAll(); // 可以入队
return result;
}
// 省略 Node 的定义
}
// 省略接口定义
class FairnessBoundedBlockingQueue implements Queue {
// 容量
protected final int capacity;
// 头指针,empty: head.next == tail == null
protected Node head;
// 尾指针
protected Node tail;
// guard: canPollCount, head
protected final Object pollLock = new Object();
protected int canPollCount;
// guard: canOfferCount, tail
protected final Object offerLock = new Object();
protected int canOfferCount;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.canPollCount = 0;
this.canOfferCount = capacity;
this.head = new Node(null);
this.tail = head;
}
// 如果队列已满,通过返回值标识
public boolean offer(Object obj) throws InterruptedException {
synchronized(offerLock) {
while(canOfferCount <= 0) {
offerLock.wait();
}
Node node = new Node(obj);
tail.next = node;
tail = node;
canOfferCount--;
}
synchronized(pollLock) {
++canPollCount;
pollLock.notifyAll();
}
return true;
}
// 如果队列为空,阻塞等待
public Object poll() throws InterruptedException {
Object result = null;
synchronized(pollLock) {
while(canPollCount <= 0) {
pollLock.wait();
}
result = head.next.value;
head.next.value = null;
head = head.next;
canPollCount--;
}
synchronized(offerLock) {
canOfferCount++;
offerLock.notifyAll();
}
return result;
}
// 省略 Node 定义
}
class FairnessBoundedBlockingQueue implements Queue {
// 容量
protected final int capacity;
// 头指针,empty: head.next == tail == null
protected Node head;
// 尾指针
protected Node tail;
// guard: canPollCount, head
protected final Object pollLock = new Object();
protected int canPollCount;
protected int waitPollCount;
// guard: canOfferCount, tail
protected final Object offerLock = new Object();
protected int canOfferCount;
protected int waitOfferCount;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.canPollCount = 0;
this.canOfferCount = capacity;
this.waitPollCount = 0;
this.waitOfferCount = 0;
this.head = new Node(null);
this.tail = head;
}
// 如果队列已满,通过返回值标识
public boolean offer(Object obj) throws InterruptedException {
synchronized(offerLock) {
while(canOfferCount <= 0) {
waitOfferCount++;
offerLock.wait();
waitOfferCount--;
}
Node node = new Node(obj);
tail.next = node;
tail = node;
canOfferCount--;
}
synchronized(pollLock) {
++canPollCount;
if (waitPollCount > 0) {
pollLock.notify();
}
}
return true;
}
// 如果队列为空,阻塞等待
public Object poll() throws InterruptedException {
Object result;
synchronized(pollLock) {
while(canPollCount <= 0) {
waitPollCount++;
pollLock.wait();
waitPollCount--;
}
result = head.next.value;
head.next.value = null;
head = head.next;
canPollCount--;
}
synchronized(offerLock) {
canOfferCount++;
if (waitOfferCount > 0) {
offerLock.notify();
}
}
return result;
}
// 省略 Node 的定义
}
class FairnessBoundedBlockingQueue implements Queue {
// 容量
protected final int capacity;
// 头指针,empty: head.next == tail == null
protected Node head;
// 尾指针
protected Node tail;
// guard: canPollCount, head, waitPollCount
protected final Object pollLock = new Object();
protected int canPollCount;
protected int waitPollCount;
// guard: canOfferCount, tail, waitOfferCount
protected final Object offerLock = new Object();
protected int canOfferCount;
protected int waitOfferCount;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.canPollCount = 0;
this.canOfferCount = capacity;
this.waitPollCount = 0;
this.waitOfferCount = 0;
this.head = new Node(null);
this.tail = head;
}
// 如果队列已满,通过返回值标识
public boolean offer(Object obj) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException(); // 线程已中断,直接退出即可,防止中断线程竞争锁
}
synchronized(offerLock) {
while(canOfferCount <= 0) {
waitOfferCount++;
try {
offerLock.wait();
} catch (InterruptedException e) {
// 触发其他线程
offerLock.notify();
throw e;
} finally {
waitOfferCount--;
}
}
Node node = new Node(obj);
tail.next = node;
tail = node;
canOfferCount--;
}
synchronized(pollLock) {
++canPollCount;
if (waitPollCount > 0) {
pollLock.notify();
}
}
return true;
}
// 如果队列为空,阻塞等待
public Object poll() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
Object result = null;
synchronized(pollLock) {
while(canPollCount <= 0) {
waitPollCount++;
try {
pollLock.wait();
} catch (InterruptedException e) {
pollLock.notify();
throw e;
} finally {
waitPollCount--;
}
}
result = head.next.value;
head.next.value = 0;
// ignore head;
head = head.next;
canPollCount--;
}
synchronized(offerLock) {
canOfferCount++;
if (waitOfferCount > 0) {
offerLock.notify();
}
}
return result;
}
// 省略 Node 的定义
}
boolean needToWait;
synchronized(this) {
needToWait = calculateNeedToWait();
if (needToWait) {
enqueue(monitor); // 请求对应的monitor
}
}
if (needToWait) {
monitor.doWait();
}
// 省略接口定义
class FairnessBoundedBlockingQueue implements Queue {
// 容量
protected final int capacity;
// 头指针,empty: head.next == tail == null
protected Node head;
// 尾指针
protected Node tail;
// guard: canPollCount, head, pollQueue
protected final Object pollLock = new Object();
protected int canPollCount;
// guard: canOfferCount, tail, offerQueue
protected final Object offerLock = new Object();
protected int canOfferCount;
protected final WaitQueue pollQueue = new WaitQueue();
protected final WaitQueue offerQueue = new WaitQueue();
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.canOfferCount = capacity;
this.canPollCount = 0;
this.head = new Node(null);
this.tail = head;
}
// 如果队列已满,通过返回值标识
public boolean offer(Object obj) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException(); // 线程已中断,直接退出即可,防止中断线程竞争锁
}
WaitNode wait = null;
synchronized(offerLock) {
// 在有阻塞请求或者队列为空时,阻塞等待
if (canOfferCount <= 0 || !offerQueue.isEmpty()) {
wait = new WaitNode();
offerQueue.enq(wait);
} else {
// continue.
}
}
try {
if (wait != null) {
wait.doWait();
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
} catch (InterruptedException e) {
offerQueue.doNotify();
throw e;
}
// 确保此时线程状态正常,以下不会校验中断
synchronized(offerLock) {
Node node = new Node(obj);
tail.next = node;
tail = node;
canOfferCount--;
}
synchronized(pollLock) {
++canPollCount;
pollQueue.doNotify();
}
return true;
}
// 如果队列为空,阻塞等待
public Object poll() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
Object result = null;
WaitNode wait = null;
synchronized(pollLock) {
// 在有阻塞请求或者队列为空时,阻塞等待
if (canPollCount <= 0 || !pollQueue.isEmpty()) {
wait = new WaitNode();
pollQueue.enq(wait);
} else {
// ignore
}
}
try {
if (wait != null) {
wait.doWait();
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
} catch (InterruptedException e) {
// 传递消息
pollQueue.doNotify();
throw e;
}
// 以下不会检测线程中断状态
synchronized(pollLock) {
result = head.next.value;
head.next.value = 0;
// ignore head;
head = head.next;
canPollCount--;
}
synchronized(offerLock) {
canOfferCount++;
offerQueue.doNotify();
}
return result;
}
class WaitQueue {
WaitNode head;
WaitNode tail;
WaitQueue() {
head = new WaitNode();
tail = head;
}
synchronized void doNotify() {
for(;;) {
WaitNode node = deq();
if (node == null) {
break;
} else if (node.doNotify()) {
// 此处确保NOTIFY成功
break;
} else {
// ignore, and retry.
}
}
}
synchronized boolean isEmpty() {
return head.next == null;
}
synchronized void enq(WaitNode node) {
tail.next = node;
tail = tail.next;
}
synchronized WaitNode deq() {
if (head.next == null) {
return null;
}
WaitNode res = head.next;
head = head.next;
if (head.next == null) {
tail = head; // 为空,迁移tail节点
}
return res;
}
}
class WaitNode {
boolean released;
WaitNode next;
WaitNode() {
released = false;
next = null;
}
synchronized void doWait() throws InterruptedException {
try {
while (!released) {
wait();
}
} catch (InterruptedException e) {
if (!released) {
released = true;
throw e;
} else {
// 如果是NOTIFY之后收到中断的信号,不能抛出异常;需要做RELAY处理
Thread.currentThread().interrupt();
}
}
}
synchronized boolean doNotify() {
if (!released) {
released = true;
notify();
// 明确释放了一个线程,返回true
return true;
} else {
// 没有释放新的线程,返回false
return false;
}
}
}
// 省略 Node 的定义
}
class TimeoutException extends InterruptedException {}
class WaitNode {
boolean released;
WaitNode next;
WaitNode() {
released = false;
next = null;
}
synchronized void doWait(long milliSeconds) throws InterruptedException {
try {
long startTime = System.currentTimeMillis();
long toWait = milliSeconds;
for (;;) {
wait(toWait);
if (released) {
return;
}
long now = System.currentTimeMillis();
toWait = toWait - (now - startTime);
if (toWait <= 0) {
throw new TimeoutException();
}
}
} catch (InterruptedException e) {
if (!released) {
released = true;
throw e;
} else {
// 如果已经释放信号量,此处不抛出异常;但恢复中断状态
Thread.currentThread().interrupt();
}
}
}
synchronized boolean doNotify() {
if (!released) {
released = true;
notify();
return true;
} else {
return false;
}
}
首先定义超时异常,此处只是为了方便异常处理,继承 InterruptedException;
此处依赖于 wait(long timeout) 的超时等待实现,这通常不是问题;
资源编排(Resource Orchestration)是一种简单易用的云计算资源管理和自动化运维服务。用户通过模板描述多个云计算资源的依赖关系、配置等,并自动完成所有资源的创建和配置,以达到自动化部署、运维等目的。编排模板同时也是一种标准化的资源和应用交付方式,并且可以随时编辑修改,使基础设施即代码(Infrastructure as Code)成为可能。
点击阅读原文查看详情!