阻塞隊列BlockingQueue擴展了Queue、Collection接口,對元素的插入和提取使用了「阻塞」處理,我們知道Collection下的實現類一般都採用了長度自行管理方式(也就是變長),比如這樣的代碼是可以正常運行的:
public static void main(Stringargs){
//定義初始長度為5
List<String>list=new ArrayList<String>(5);
//加入10個元素
for(int i=0;i<10;i++){
list.add(\"\");
}
}
上面的代碼定義了列表的初始長度為5,在實際使用時,當加入的元素超過初始容量時,ArrayList會自行擴容,確保能夠正常加入元素。那BlockingQueue也是集合,也實現了Collection接口,它的容量是否會自行管理呢?我們來看代碼:
public static void main(Stringargs)throws Exception{
//定義初始長度為5
BlockingQueue<String>bq=new ArrayBlockingQueue<String>(5);
//加入10個元素
for(int i=0;i<10;i++){
bq.add(\"\");
}
}
BlockingQueue能夠自行擴容嗎?答案是不能,運行結果如下:
Exception in thread\"main\"java.lang.IllegalStateException:Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:71)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:209)
at Client.main(Client.java:12)
沒錯,報隊列已滿異常,這是阻塞隊列和非阻塞隊列一個重要區別:阻塞隊列的容量是固定的,非阻塞隊列則是變長的。阻塞隊列可以在聲明時指定隊列的容量,若指定的容量,則元素的數量不可超過該容量,若不指定,隊列的容量為Integer的最大值。
阻塞隊列和非阻塞隊列有此區別的原因是阻塞隊列是為了容納(或排序)多線程任務而存在的,其服務的對象是多線程應用,而非阻塞隊列容納的則是普通的數據元素。我們來看一下ArrayBlockingQueue類最常用的add方法。
public class ArrayBlockingQueue<E>extends AbstractQueue<E>
implements BlockingQueue<E>,java.io.Serializable{
//容納元素的數組
private final Eitems;
//元素數量計數器
private int count;
public boolean add(E e){
//調用offer方法嘗試寫入
if(offer(e))
return true;
else
//寫入失敗,隊列已滿
throw new IllegalStateException(\"Queue full\");
}
public boolean offer(E e){
final ReentrantLock lock=this.lock;
//申請鎖,只允許同時有一個線程操作
lock.lock();
try{
//元素計數器的計數與數組長度相同,表示隊列已滿
if(count==items.length)
return false;
else{//隊列未滿,插入元素
insert(e);
return true;
}
}finally{
//釋放鎖
lock.unlock();
}
}
}
上面在加入元素時,如果判斷出當前隊列已滿,則返回false,表示插入失敗,之後再包裝成隊列滿異常。此處需要注意offer方法,如果我們直接調用offer方法插入元素,在超出容量的情況下,它除了返回false外,不會提供任何其他信息,如果我們的代碼不做插入判斷,那就會造成數據的「默默」丟失,這就是它與非阻塞隊列的不同之處。
阻塞隊列的這種機制對異步計算是非常有幫助的,例如我們定義深度為100的阻塞隊列容納100個任務,多個線程從該隊列中獲取任務並處理,當所有的線程都在繁忙,並且隊列中任務數量已經為100時,也預示著系統運算壓力非常巨大,而且處理結果的時間也會比較長,於是在第101個任務期望加入時,隊列拒絕加入,而且返回異常,由系統自行處理,避免了異步運算的不可知性。但是如果應用期望無論等待多長時間都要運行該任務,不希望返回異常,那該怎麼處理呢?
此時就需要用BlockingQueue接口定義的put方法了,它的作用也是把元素加入到隊列中,但它與add、offer方法不同,它會等待隊列空出元素,再讓自己加入進去,通俗地講,put方法提供的是一種「無賴」式的插入,無論等待多長時間都要把該元素插入到隊列中,它的實現代碼如下:
public void put(E e)throws InterruptedException{
//容納元素的數組
final Eitems=this.items;
final ReentrantLock lock=this.lock;
//可中斷鎖
lock.lockInterruptibly();
try{
try{
//隊列滿,等待其他線程移除元素
while(count==items.length)
notFull.await();
}catch(InterruptedException ie){
//被中斷了,喚醒其他線程
notFull.signal();
throw ie;
}
//插入元素
insert(e);
}finally{
//釋放鎖
lock.unlock();
}
}
put方法的目的就是確保元素肯定會加入到隊列中,問題是此種等待是一個循環,會不停地消耗系統資源,當等待加入的元素數量較多時勢必會對系統性能產生影響,那該如何解決呢?JDK已經想到了這個問題,它提供了帶有超時時間的offer方法,其實現方法與put比較類似,只是使用Condition的awaitNanos方法來判斷當前線程已經等待了多少納秒,超時則返回false。
與插入元素相對應,取出元素也有不同的實現,例如remove、poll、take等方法,對於此類方法的理解要建立在阻塞隊列的長度固定的基礎上,然後根據是否阻塞、阻塞是否超時等實際情況選用不同的插入和提取方法。
注意 阻塞隊列的長度是固定的。