interface Queue {
boolean offer(Object obj);
Object poll();
}class FairnessBoundedBlockingQueue implements Queue { // 当前大小 protected int size;
// 容量 protected final int capacity;
// 头指针,empty: head.next == tail == null protected Node head;
// 尾指针 protected Node tail;
public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.head = new Node(null); this.tail = head; this.size = 0; }
// 如果队列已满,通过返回值标识 public boolean offer(Object obj) { if (size < capacity) { Node node = new Node(obj); tail.next = node; tail = node; ++size; return true; } return false; }
// 如果队列为空,head.next == null;返回空元素 public Object poll() { if (head.next != null) { Object result = head.next.value; head.next.value = null; head = head.next; // 丢弃头结点 --size; return result; } return null; }
class Node { Object value; Node next; Node(Object obj) { this.value = obj; next = null; } }}
// 省略接口定义class BoundedBlockingQueue implements Queue { // 当前大小 protected int size;
// 容量 protected final int capacity;
// 头指针,empty: head.next == tail == null protected Node head;
// 尾指针 protected Node tail;
public BoundedBlockingQueue(int capacity) { this.capacity = capacity; this.head = new Node(null); this.tail = head; this.size = 0; }
// 如果队列已满,通过返回值标识 public synchronized boolean offer(Object obj) { if (size < capacity) { Node node = new Node(obj); tail.next = node; tail = node; ++size; return true; } return false; }
// 如果队列为空,head.next == null;返回空元素 public synchronized Object poll() { if (head.next != null) { Object result = head.next.value; head.next.value = null; head = head.next; // 丢弃头结点 --size; return result; } return null; } // 省略 Node 的定义}
WHEN(condition) Object action(Object arg) { checkPreCondition(); doAction(arg); checkPostCondition();}
// 当前线程synchronized Object action(Object arg) { while(!condition) { wait(); } // 前置条件,不变式 checkPreCondition(); doAction(); // 后置条件,不变式 checkPostCondition();}
// 其他线程synchronized Object notifyAction(Object arg) { notifyAll();}
interface Queue {
boolean offer(Object obj) throws InterruptedException;
Object poll() throws InterruptedException;
}class FairnessBoundedBlockingQueue implements Queue { // 当前大小 protected int size;
// 容量 protected final int capacity;
// 头指针,empty: head.next == tail == null protected Node head;
// 尾指针 protected Node tail;
public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.head = new Node(null); this.tail = head; this.size = 0; }
// 如果队列已满,通过返回值标识 public synchronized boolean offer(Object obj) throws InterruptedException { size>=capacity { wait(); } Node node = new Node(obj); tail.next = node; tail = node; ++size; notifyAll(); // 可以出队 return true; }
// 如果队列为空,阻塞等待 public synchronized Object poll() throws InterruptedException { while (head.next == null) { wait(); } Object result = head.next.value; head.next.value = null; head = head.next; // 丢弃头结点 --size; notifyAll(); // 可以入队 return result; } // 省略 Node 的定义}
// 省略接口定义class FairnessBoundedBlockingQueue implements Queue { // 容量 protected final int capacity;
// 头指针,empty: head.next == tail == null protected Node head;
// 尾指针 protected Node tail;
// guard: canPollCount, head protected final Object pollLock = new Object(); protected int canPollCount;
// guard: canOfferCount, tail protected final Object offerLock = new Object(); protected int canOfferCount;
public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.canPollCount = 0; this.canOfferCount = capacity; this.head = new Node(null); this.tail = head; }
// 如果队列已满,通过返回值标识 public boolean offer(Object obj) throws InterruptedException { synchronized(offerLock) { while(canOfferCount <= 0) { offerLock.wait(); } Node node = new Node(obj); tail.next = node; tail = node; canOfferCount--; } synchronized(pollLock) { ++canPollCount; pollLock.notifyAll(); } return true; }
// 如果队列为空,阻塞等待 public Object poll() throws InterruptedException { Object result = null; synchronized(pollLock) { while(canPollCount <= 0) { pollLock.wait(); }
result = head.next.value; head.next.value = null; head = head.next; canPollCount--; } synchronized(offerLock) { canOfferCount++; offerLock.notifyAll(); } return result; } // 省略 Node 定义}
class FairnessBoundedBlockingQueue implements Queue { // 容量 protected final int capacity;
// 头指针,empty: head.next == tail == null protected Node head;
// 尾指针 protected Node tail;
// guard: canPollCount, head protected final Object pollLock = new Object(); protected int canPollCount; protected int waitPollCount;
// guard: canOfferCount, tail protected final Object offerLock = new Object(); protected int canOfferCount; protected int waitOfferCount;
public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.canPollCount = 0; this.canOfferCount = capacity; this.waitPollCount = 0; this.waitOfferCount = 0; this.head = new Node(null); this.tail = head; }
// 如果队列已满,通过返回值标识 public boolean offer(Object obj) throws InterruptedException { synchronized(offerLock) { while(canOfferCount <= 0) { waitOfferCount++; offerLock.wait(); waitOfferCount--; } Node node = new Node(obj); tail.next = node; tail = node; canOfferCount--; } synchronized(pollLock) { ++canPollCount; if (waitPollCount > 0) { pollLock.notify(); } } return true; }
// 如果队列为空,阻塞等待 public Object poll() throws InterruptedException { Object result; synchronized(pollLock) { while(canPollCount <= 0) { waitPollCount++; pollLock.wait(); waitPollCount--; }
result = head.next.value; head.next.value = null; head = head.next; canPollCount--; } synchronized(offerLock) { canOfferCount++; if (waitOfferCount > 0) { offerLock.notify(); } } return result; } // 省略 Node 的定义}
class FairnessBoundedBlockingQueue implements Queue { // 容量 protected final int capacity;
// 头指针,empty: head.next == tail == null protected Node head;
// 尾指针 protected Node tail;
// guard: canPollCount, head, waitPollCount protected final Object pollLock = new Object(); protected int canPollCount; protected int waitPollCount;
// guard: canOfferCount, tail, waitOfferCount protected final Object offerLock = new Object(); protected int canOfferCount; protected int waitOfferCount;
public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.canPollCount = 0; this.canOfferCount = capacity; this.waitPollCount = 0; this.waitOfferCount = 0; this.head = new Node(null); this.tail = head; }
// 如果队列已满,通过返回值标识 public boolean offer(Object obj) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); // 线程已中断,直接退出即可,防止中断线程竞争锁 } synchronized(offerLock) { while(canOfferCount <= 0) { waitOfferCount++; try { offerLock.wait(); } catch (InterruptedException e) { // 触发其他线程 offerLock.notify(); throw e;
} finally { waitOfferCount--; } } Node node = new Node(obj); tail.next = node; tail = node; canOfferCount--; } synchronized(pollLock) { ++canPollCount; if (waitPollCount > 0) { pollLock.notify(); } } return true; }
// 如果队列为空,阻塞等待 public Object poll() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } Object result = null; synchronized(pollLock) { while(canPollCount <= 0) { waitPollCount++; try { pollLock.wait(); } catch (InterruptedException e) { pollLock.notify(); throw e; } finally { waitPollCount--; } }
result = head.next.value; head.next.value = 0; // ignore head; head = head.next; canPollCount--; } synchronized(offerLock) { canOfferCount++; if (waitOfferCount > 0) { offerLock.notify(); } } return result; } // 省略 Node 的定义}
boolean needToWait;synchronized(this) { needToWait = calculateNeedToWait(); if (needToWait) { enqueue(monitor); // 请求对应的monitor }}if (needToWait) { monitor.doWait();}
// 省略接口定义class FairnessBoundedBlockingQueue implements Queue { // 容量 protected final int capacity;
// 头指针,empty: head.next == tail == null protected Node head;
// 尾指针 protected Node tail;
// guard: canPollCount, head, pollQueue protected final Object pollLock = new Object(); protected int canPollCount;
// guard: canOfferCount, tail, offerQueue protected final Object offerLock = new Object(); protected int canOfferCount;
protected final WaitQueue pollQueue = new WaitQueue(); protected final WaitQueue offerQueue = new WaitQueue();
public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.canOfferCount = capacity; this.canPollCount = 0; this.head = new Node(null); this.tail = head; }
// 如果队列已满,通过返回值标识 public boolean offer(Object obj) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); // 线程已中断,直接退出即可,防止中断线程竞争锁 } WaitNode wait = null; synchronized(offerLock) { // 在有阻塞请求或者队列为空时,阻塞等待 if (canOfferCount <= 0 || !offerQueue.isEmpty()) { wait = new WaitNode(); offerQueue.enq(wait); } else { // continue. } }
try { if (wait != null) { wait.doWait(); } if (Thread.interrupted()) { throw new InterruptedException(); } } catch (InterruptedException e) { offerQueue.doNotify(); throw e; }
// 确保此时线程状态正常,以下不会校验中断 synchronized(offerLock) { Node node = new Node(obj); tail.next = node; tail = node; canOfferCount--; } synchronized(pollLock) { ++canPollCount; pollQueue.doNotify(); } return true; }
// 如果队列为空,阻塞等待 public Object poll() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } Object result = null; WaitNode wait = null; synchronized(pollLock) { // 在有阻塞请求或者队列为空时,阻塞等待 if (canPollCount <= 0 || !pollQueue.isEmpty()) { wait = new WaitNode(); pollQueue.enq(wait); } else { // ignore } }
try { if (wait != null) { wait.doWait(); } if (Thread.interrupted()) { throw new InterruptedException(); } } catch (InterruptedException e) { // 传递消息 pollQueue.doNotify(); throw e; }
// 以下不会检测线程中断状态 synchronized(pollLock) { result = head.next.value; head.next.value = 0; // ignore head; head = head.next; canPollCount--; }
synchronized(offerLock) { canOfferCount++; offerQueue.doNotify(); } return result; }
class WaitQueue {
WaitNode head; WaitNode tail;
WaitQueue() { head = new WaitNode(); tail = head; }
synchronized void doNotify() { for(;;) { WaitNode node = deq(); if (node == null) { break; } else if (node.doNotify()) { // 此处确保NOTIFY成功 break; } else { // ignore, and retry. } } }
synchronized boolean isEmpty() { return head.next == null; }
synchronized void enq(WaitNode node) { tail.next = node; tail = tail.next; }
synchronized WaitNode deq() { if (head.next == null) { return null; } WaitNode res = head.next; head = head.next; if (head.next == null) { tail = head; // 为空,迁移tail节点 } return res; } }
class WaitNode { boolean released; WaitNode next; WaitNode() { released = false; next = null; }
synchronized void doWait() throws InterruptedException { try { while (!released) { wait(); } } catch (InterruptedException e) { if (!released) { released = true; throw e; } else { // 如果是NOTIFY之后收到中断的信号,不能抛出异常;需要做RELAY处理 Thread.currentThread().interrupt(); } } }
synchronized boolean doNotify() { if (!released) { released = true; notify(); // 明确释放了一个线程,返回true return true; } else { // 没有释放新的线程,返回false return false; } } } // 省略 Node 的定义}
class TimeoutException extends InterruptedException {}
class WaitNode { boolean released; WaitNode next; WaitNode() { released = false; next = null; }
synchronized void doWait(long milliSeconds) throws InterruptedException { try { long startTime = System.currentTimeMillis(); long toWait = milliSeconds; for (;;) { wait(toWait); if (released) { return; } long now = System.currentTimeMillis(); toWait = toWait - (now - startTime); if (toWait <= 0) { throw new TimeoutException(); } } } catch (InterruptedException e) { if (!released) { released = true; throw e; } else { // 如果已经释放信号量,此处不抛出异常;但恢复中断状态 Thread.currentThread().interrupt(); } } }
synchronized boolean doNotify() { if (!released) { released = true; notify(); return true; } else { return false; } }
首先定义超时异常,此处只是为了方便异常处理,继承 InterruptedException;
此处依赖于 wait(long timeout) 的超时等待实现,这通常不是问题;
资源编排(Resource Orchestration)是一种简单易用的云计算资源管理和自动化运维服务。用户通过模板描述多个云计算资源的依赖关系、配置等,并自动完成所有资源的创建和配置,以达到自动化部署、运维等目的。编排模板同时也是一种标准化的资源和应用交付方式,并且可以随时编辑修改,使基础设施即代码(Infrastructure as Code)成为可能。
点击阅读原文查看详情!