讀古今文學網 > 編寫高質量代碼:改善Java程序的151個建議 > 建議129:適當設置阻塞隊列長度 >

建議129:適當設置阻塞隊列長度

阻塞隊列BlockingQueue擴展了Queue、Collection接口,對元素的插入和提取使用了「阻塞」處理,我們知道Collection下的實現類一般都採用了長度自行管理方式(也就是變長),比如這樣的代碼是可以正常運行的:


public static void main(Stringargs){

//定義初始長度為5

List<String>list=new ArrayList<String>(5);

//加入10個元素

for(int i=0;i<10;i++){

list.add(\"\");

}

}


上面的代碼定義了列表的初始長度為5,在實際使用時,當加入的元素超過初始容量時,ArrayList會自行擴容,確保能夠正常加入元素。那BlockingQueue也是集合,也實現了Collection接口,它的容量是否會自行管理呢?我們來看代碼:


public static void main(Stringargs)throws Exception{

//定義初始長度為5

BlockingQueue<String>bq=new ArrayBlockingQueue<String>(5);

//加入10個元素

for(int i=0;i<10;i++){

bq.add(\"\");

}

}


BlockingQueue能夠自行擴容嗎?答案是不能,運行結果如下:


Exception in thread\"main\"java.lang.IllegalStateException:Queue full

at java.util.AbstractQueue.add(AbstractQueue.java:71)

at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:209)

at Client.main(Client.java:12)


沒錯,報隊列已滿異常,這是阻塞隊列和非阻塞隊列一個重要區別:阻塞隊列的容量是固定的,非阻塞隊列則是變長的。阻塞隊列可以在聲明時指定隊列的容量,若指定的容量,則元素的數量不可超過該容量,若不指定,隊列的容量為Integer的最大值。

阻塞隊列和非阻塞隊列有此區別的原因是阻塞隊列是為了容納(或排序)多線程任務而存在的,其服務的對象是多線程應用,而非阻塞隊列容納的則是普通的數據元素。我們來看一下ArrayBlockingQueue類最常用的add方法。


public class ArrayBlockingQueue<E>extends AbstractQueue<E>

implements BlockingQueue<E>,java.io.Serializable{

//容納元素的數組

private final Eitems;

//元素數量計數器

private int count;

public boolean add(E e){

//調用offer方法嘗試寫入

if(offer(e))

return true;

else

//寫入失敗,隊列已滿

throw new IllegalStateException(\"Queue full\");

}

public boolean offer(E e){

final ReentrantLock lock=this.lock;

//申請鎖,只允許同時有一個線程操作

lock.lock();

try{

//元素計數器的計數與數組長度相同,表示隊列已滿

if(count==items.length)

return false;

else{//隊列未滿,插入元素

insert(e);

return true;

}

}finally{

//釋放鎖

lock.unlock();

}

}

}


上面在加入元素時,如果判斷出當前隊列已滿,則返回false,表示插入失敗,之後再包裝成隊列滿異常。此處需要注意offer方法,如果我們直接調用offer方法插入元素,在超出容量的情況下,它除了返回false外,不會提供任何其他信息,如果我們的代碼不做插入判斷,那就會造成數據的「默默」丟失,這就是它與非阻塞隊列的不同之處。

阻塞隊列的這種機制對異步計算是非常有幫助的,例如我們定義深度為100的阻塞隊列容納100個任務,多個線程從該隊列中獲取任務並處理,當所有的線程都在繁忙,並且隊列中任務數量已經為100時,也預示著系統運算壓力非常巨大,而且處理結果的時間也會比較長,於是在第101個任務期望加入時,隊列拒絕加入,而且返回異常,由系統自行處理,避免了異步運算的不可知性。但是如果應用期望無論等待多長時間都要運行該任務,不希望返回異常,那該怎麼處理呢?

此時就需要用BlockingQueue接口定義的put方法了,它的作用也是把元素加入到隊列中,但它與add、offer方法不同,它會等待隊列空出元素,再讓自己加入進去,通俗地講,put方法提供的是一種「無賴」式的插入,無論等待多長時間都要把該元素插入到隊列中,它的實現代碼如下:


public void put(E e)throws InterruptedException{

//容納元素的數組

final Eitems=this.items;

final ReentrantLock lock=this.lock;

//可中斷鎖

lock.lockInterruptibly();

try{

try{

//隊列滿,等待其他線程移除元素

while(count==items.length)

notFull.await();

}catch(InterruptedException ie){

//被中斷了,喚醒其他線程

notFull.signal();

throw ie;

}

//插入元素

insert(e);

}finally{

//釋放鎖

lock.unlock();

}

}


put方法的目的就是確保元素肯定會加入到隊列中,問題是此種等待是一個循環,會不停地消耗系統資源,當等待加入的元素數量較多時勢必會對系統性能產生影響,那該如何解決呢?JDK已經想到了這個問題,它提供了帶有超時時間的offer方法,其實現方法與put比較類似,只是使用Condition的awaitNanos方法來判斷當前線程已經等待了多少納秒,超時則返回false。

與插入元素相對應,取出元素也有不同的實現,例如remove、poll、take等方法,對於此類方法的理解要建立在阻塞隊列的長度固定的基礎上,然後根據是否阻塞、阻塞是否超時等實際情況選用不同的插入和提取方法。

注意 阻塞隊列的長度是固定的。