讀古今文學網 > Java程序員修煉之道 > 4.4 控制執行 >

4.4 控制執行

我們在前面的討論中一直把工作任務當成抽像的單元。然而有個細節需要注意,我們一直沒有提到的是這些單元要比Thread小——它們提供的方法把計算任務包含在一個工作單元中,無需為每個單元啟動新的線程。這樣處理多線程代碼通常效率更高,因為免除了為每個單元啟動Thread的開銷。執行代碼的線程是重用的,處理完一個任務後會繼續處理新的工作單元。

雖然複雜一些,但你可以實現線程池、工人與管理者模式和執行者等開發人員最常用的模式。我們接下來要密切關注可以對任務(CallableFutureFutureTask)和執行者建模的類和接口,特別是ScheduledThreadPoolExecutor

4.4.1 任務建模

我們的終極目標是不用為調度每個任務或工作單元而啟動新線程。歸根結底,就是要把它們做成可以調用(通常由執行者調用)的代碼,而不是直接可運行的線程。

我們來看對任務建模的三種辦法——CallableFuture接口以及FutureTask類。

1.Callable接口

Callable接口是一個非常常見的概念,代表了一段可以調用並返回結果的代碼。儘管這種做法很直接,但實際上它的作用微妙而又強大,用它可以創建出一些特別實用的模式。

Callable的典型用法是匿名實現類。這段代碼的最後一行把s賦值為out.toString:

final MyObject out = getSampleObject;

Callable<String> cb = new Callable<String> {
  public String call throws Exception {
    return out.toString;
  }
};
String s = cb.call;
  

可以把Callable的匿名實現類當做對單一抽像方法call的遞延調用,該實現必須提供這個方法。

Callable是SAM類型(「單一抽像方法」的縮寫,有時會這樣稱呼它)的示例——這是Java 7把函數作為一等類型最可行的辦法。在後續章節討論非Java語言時還會遇到它們,那時我們還會進一步討論把函數作為值或一等類型的概念。

2.Future接口

Future接口用來表示異步任務,是還沒有完成的任務給出的未來結果。我們在第2章介紹NIO.2和異步I/O時提過。

下面是Future中的主要方法。

  • get——用來獲取結果。如果結果還沒準備好,get會被阻塞直到它能取得結果。還有一個可以設置超時的版本,這個版本永遠不會阻塞。

  • cancel——在運算結束前取消。

  • isDone——調用者用它來判斷運算是否結束。

下面這段代碼(找素數)展示了Future的用法:

Future<Long> fut = getNthPrime(1_000_000_000);

Long result = null;
while (result == null) {
  try {
    result = fut.get(60, TimeUnit.SECONDS);
  } catch (TimeoutException tox) { }
  System.out.println(\"Still not found the billionth prime!\");
}
System.out.println(\"Found it: \"+ result.longValue);
  

在這段代碼中,你應該想像一下返回FuturegetNthPrime在某個後台線程或多個線程上運行的情景,也有可能是在執行者框架上運行。即便使用先進的硬件,這種運算可能也需要很長時間——你最後還是要用Futurecancel方法。

3.FutureTask

FutureTaskFuture接口的常用實現類,它也實現了Runnable接口。這意味著FutureTask可以由執行者調度,這一點很關鍵。它對外提供的方法基本上就是FutureRunnable接口的組合:getcancelisDoneisCancelledrun,最後一個方法通常都是由執行者調用,你基本不需要直接調用它。

FutureTask還提供了兩個很方便的構造器:一個以Callable為參數,另一個以Runnable為參數。這些類之間的關聯表明對於任務建模的辦法非常靈活,允許你基於FutureTaskRunnable特性(因為它實現了Runnable接口),把任務寫成Callable,然後封裝進一個由執行者調度並在必要時可以取消的FutureTask

4.4.2 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor(以下簡稱STPE)是線程池類中的重中之重——它功能多樣,廣受歡迎。STPE接收任務,並把它們安排給線程池裡的線程。

  • 線程池的大小可以預定義,也可自適應。
  • 所安排的任務可以定期執行,也可只運行一次。
  • STPE擴展了ThreadPoolExecutor類(很相似,但不具備定期調度能力)。

java.util.concurrent中的工具類相結合的STPE線程池是大中型多線程應用程序最常見的模式之一,這些工具類包括我們在前面已經見過的ConcurrentHashMapCopyOnWriteArrayListBlockingQueue等。

STPE不過是通過Executors類的工廠方法輕易獲取的眾多執行者之一。使用這些工廠方法很方便,開發人員通過它們可以輕易獲取典型配置,需要時還可以開放完整的接口方法。

下面的代碼是一個定期讀取的例子。這是newScheduledThreadPool的常見用法:msgReader對像被安排poll一個隊列,從隊列中的WorkUnit對像裡取得工作項,然後輸出。

代碼清單4-15 STPE定期讀取

private ScheduledExecutorService stpe;
private ScheduledFuture<?> hndl;//取消時需要

private BlockingQueue<WorkUnit<String>> lbq = new LinkedBlockingQueue<>;

private void run{
  stpe = Executors.newScheduledThreadPool(2);//執行者的工廠方法

  final Runnable msgReader = new Runnable{
    public void run{
      String nextMsg = lbq.poll.getWork;
      if (nextMsg != null) System.out.println(\"Msg recvd: \"+ nextMsg);
    }
  };
  hndl = stpe.scheduleAtFixedRate(msgReader, 10, 10,
       TimeUnit.MILLISECONDS);
}

public void cancel {
  final ScheduledFuture<?> myHndl = hndl;

  stpe.schedule(new Runnable {
    public void run { myHndl.cancel(true); }//取消時需要
  }, 10, TimeUnit.MILLISECONDS);
}
  

在這個例子中,STPE每隔10毫秒就喚醒一個線程,讓它嘗試poll一個隊列。如果讀取返回null(因為隊列當前為空),則什麼也不會發生,線程回去繼續睡大覺。如果收到了一個工作單元,則線程會輸出該工作單元的內容。

用Callable調用的代表性問題

形式簡單的CallableFutureTask及相關類存在幾個問題——尤其在涉及類型系統時。 要明白這一點,可以想想怎麼才能滿足一個未知方法可能出現的所有方法簽名。Callable只能用於沒有參數的方法。要滿足所有可能性,你需要Callable的不同變體。

在Java中,你可以通過指定模型系統內的方法簽名來解決這個問題。但你在本書第三部分會見到,動態語言不能用這種靜態視圖來約束。我們將會返回來重點討論這種類型系統之間的不匹配。現在你只要注意到,雖然Callable很有用,但要用它構建一個通用框架來對線程執行進行建模還是有點兒限制得太死了。

現在我們要轉向Java 7重點突出的框架之一——用於輕量級並發的分支/合併(fork/join)框架。這個框架比我們在本節中見到的執行者在處理並發問題方面更加高效,要達到這點絕非易事。