讀古今文學網 > 編寫高質量代碼:改善Java程序的151個建議 > 建議126:適時選擇不同的線程池來實現 >

建議126:適時選擇不同的線程池來實現

Java的線程池實現從最根本上來說只有兩個:ThreadPoolExecutor類和Scheduled-ThreadPoolExecutor類,這兩個類還是父子關係,但是Java為了簡化並行計算,還提供了一個Executors的靜態類,它可以直接生成多種不同的線程池執行器,比如單線程執行器、帶緩衝功能的執行器等,但歸根結底還是使ThreadPoolExecutor類或ScheduledThreadPoolExecutor類的封裝類。

為了理解這些執行器,我們首先來ThreadPoolExecutor類,其中它複雜的構造函數可以很好解釋該線程池的作用,代碼如下:


public class ThreadPoolExecutor extends AbstractExecutorService{

//最完整的構造函數

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long

keep Alive Time, Time Unitunit, Block in gQueue<Runnable>

workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){

//檢驗輸入條件

if(corePoolSize<0||maximumPoolSize<=0||

maximumPoolSize<corePoolSize||keepAliveTime<0)

throw new IllegalArgumentException();

//檢驗運行環境

if(workQueue==null||threadFactory==null||handler==null)

throw new NullPointerException();

this.corePoolSize=corePoolSize;

this.maximumPoolSize=maximumPoolSize;this.workQueue=workQueue;

this.keepAliveTime=unit.toNanos(keepAliveTime);this.threadFactory=threadFactory;

this.handler=handler;

}

}


這是ThreadPoolExecutor最完整的構造函數,其他的構造函數都是引用該構造函數實現的,我們逐步來解釋這些參數的含義。

corePoolSize:最小線程數。

線程池啟動後,在池中保持線程的最小數量。需要說明的是線程數量是逐步到達corePoolSize值的,例如corePoolSize被設置為10,而任務數量只有5,則線程池中最多會啟動5個線程,而不是一次性地啟動10個線程。

maximumPoolSize:最大線程數量。

這是池中能夠容納的最大線程數量,如果超出,則使用RejectedExecutionHandler拒絕策略處理。

keepAliveTime:線程最大生命期。

這裡的生命期有兩個約束條件:一是該參數針對的是超過corePoolSize數量的線程;二是處於非運行狀態的線程。這麼說吧,如果corePoolSize為10,maximumPoolSize為20,此時線程池中有15個線程在運行,一段時間後,其中有3個線程處於等待狀態的時間超過了keepAliveTime指定的時間,則結束這3個線程,此時線程池中則還有12個線程正在運行。

unit:時間單位。

這是keepAliveTime的時間單位,可以是納秒、毫秒、秒、分鐘等選項。

workQueue:任務隊列。

當線程池中的線程都處於運行狀態,而此時任務數量繼續增加,則需要有一個容器來容納這些任務,這就是任務隊列。

threadFactory:線程工廠。

定義如何啟動一個線程,可以設置線程名稱,並且可以確認是否是後台線程等。

handler:拒絕任務處理器。

由於超出線程數量和隊列容量而對繼續增加的任務進行處理的程序。

線程池的管理是這樣一個過程:首先創建線程池,然後根據任務的數量逐步將線程增大到corePoolSize數量,如果此時仍有任務增加,則放置到workQueue中,直到workQueue爆滿為止,然後繼續增加池中的線程數量(增強處理能力),最終達到maximumPoolSize,那如果此時還有任務要增加進來呢?這就需要handler來處理了,或者丟棄新任務,或者拒絕新任務,或者擠占已有任務等。

在任務隊列和線程池都飽和的情況下,一旦有線程處於等待(任務處理完畢,沒有新任務增加)狀態的時間超過keepAliveTime,則該線程終止,也就是說池中的線程數量會逐漸降低,直至為corePoolSize數量為止。

我們可以把線程池想像成這樣一個場景:在一條生產線上,車間規定是可以有corePoolSize數量的工人,但是生產線剛建立時,工作不多,不需要那麼多的人。隨著工作數量的增加,工人數量也逐漸增加,直至增加到corePoolSize數量為止,此時任務還在增加,那怎麼辦呢?

好辦,任務排隊,corePoolSize數量的工人不停歇地處理任務,新增加的任務按照一定的規則存放在倉庫中(也就是我們的workQueue中),一旦任務增加的速度超過了工人處理的能力,也就是說倉庫爆滿時,車間就會繼續招聘工人(也就是擴大線程數),直至工人數量達到maximumPoolSize為止,那如果所有的maximumPoolSize工人都在處理任務,而且倉庫也是飽和狀態,新增任務的該怎麼處理呢?這就會扔給一個叫handler的專門機構去處理了,它要麼丟棄這些新增的任務,要麼無視,要麼替換掉別的任務。

過了一段時間後,任務的數量逐漸減少了,導致有一部分工人處以待工狀態,為了減少開支(Java是為了減少系統資源消耗),於是開始辭退工人,直至保持為corePoolSize數量的工人為止,此時即使沒有工作,也不再辭退工人(池中線程數量不再減少),這也是為了保證以後再有任務時能夠快速的處理。

明白了線程池的概念,我們再來看Executors提供的幾個創建線程池的便捷方法:

newSingleThreadExecutor:單線程池。

顧名思義就是一個池中只有一個線程在運行,該線程永不超時。而且由於是一個線程,當有多個任務需要處理時,會將它們放置到一個無界阻塞隊列中逐個處理,它的實現代碼如下:


public static ExecutorService newSingleThreadExecutor(){

return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1,1,0L,

TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));

}


它的使用方法也非常簡單,下面是簡單的示例:


public static void main(Stringargs)throws Exception{

//創建單線程執行器

ExecutorService es=Executors.newSingleThreadExecutor();

//執行一個任務

Future<String>future=es.submit(new Callable<String>(){

public String call()throws Exception{

return\"\";

}

});

//獲得任務執行後的返回值

System.out.println(\"返回值:\"+future.get());

//關閉執行器

es.shutdown();

}


newCachedThreadPool:緩衝功能的線程池。

建立了一個線程池,而且線程數量是沒有限制的(當然,不能超過Integer的最大值),新增一個任務即有一個線程處理,或者復用之前空閒的線程,或者新啟動一個線程,但是一旦一個線程在60秒內一直是出於等待狀態時(也就是1分鐘沒工作可做),則會被終止,其源代碼如下。


public static ExecutorService newCachedThreadPool(){

return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new

SynchronousQueue<Runnable>());

}


這裡需要說明的是,任務隊列使用了同步阻塞隊列,這意味著向隊列中加入一個元素,即可喚醒一個線程(新創建的線程或復用池中空閒線程)來處理,這種隊列已經沒有隊列深度的概念了。

newFixedThreadPool:固定線程數量的線程池。

在初始化時已經決定了線程的最大數量,若任務添加的能力超出了線程處理能力,則建立阻塞隊列容納多餘的任務,源代碼如下:


public static ExecutorService newFixedThreadPool(int nThreads){

return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new

LinkedBlockingQueue<Runnable>());

}


上面返回的是一個ThreadPoolExecutor,它的corePoolSize和maximumPoolSize是相等的,也就是說,最大線程數量也是nThreads。如果任務增長速度非常快,超過了LinkedBlockingQueue的最大容量(Integer的最大值),那此時會如何處理呢?會按照ThreadPoolExecutor默認的拒絕策略(默認是DiscardPolicy,直接丟棄)來處理。

以上三種線程池執行器都是ThreadPoolExecutor的簡化版,目的是幫助開發人員屏蔽過多的線程細節,簡化多線程開發。當需要運行異步任務時,可以直接通過Executors獲得一個線程池,然後運行任務,不需要關注ThreadPoolExecutor的一系列參數是什麼含義。當然,有時候這三個線程池不能滿足要求,此時則可以直接操作ThreadPoolExecutor來實現複雜的多線程運算。可以這樣來比喻:newSingleThreadExecutor、newCachedThreadPool、newFixedThreadPool是線程池的簡化版,而ThreadPoolExecutor則是旗艦版——簡化版更容易操作,需要瞭解的知識相對少些,方便實用,而且旗艦版功能齊全,適用面廣,但難於駕馭。