讀古今文學網 > Java程序員修煉之道 > 4.5 分支/合併框架 >

4.5 分支/合併框架

就像第6章要討論的一樣,處理器的速度(或者更準確地說是CPU上的晶體管數量)最近幾年增長迅猛。由此產生的副作用就是處理器等待I/O操作變成了家常便飯。這表明我們能夠更好地利用計算機的處理能力。分支/合併框架就可以解決這個問題——這也是Java 7對並發領域新做出的最大貢獻。

分支/合併框架完全是為了實現線程池中任務的自動調度,並且這種調度對用戶來說是透明的。為了達到這種效果,必須按用戶指定的方式對任務進行分解。在很多應用程序中,對於分支/合併框架來說都可以很自然地把其中的任務分成「小型」和「大型」任務。

我們來快速瀏覽一些與分支/合併框架相關的重要事實和基本原理。

  • 引入了一種新的執行者服務,稱為ForkJoinPool
  • ForkJoinPool服務處理一種比線程「更小」的並發單元ForkJoinTask
  • ForkJoinTask是一種由ForkJoinPool以更輕量化的方式所調度的抽像。
  • 通常使用兩種任務(儘管兩種都表示為ForkJoinTask實例):
    • 「小型」任務是那種無需處理器耗費太多時間就可以直接執行的任務。
    • 「大型」任務是那種需要在直接執行前進行分解(還可能不止一次)的任務。
  • 提供了支持大型任務分解的基本方法,它還有自動調度和重新調度的能力。

這個框架的關鍵特性之一就是這些輕量的任務都能夠生成新的ForkJoinTask實例,而這些實例將仍由執行它們父任務的線程池來安排調度。這就是分而治之。

我們會通過一個簡單的例子告訴你如何使用分支/合併框架,然後簡短介紹「工作竊取」這個特性,最後討論一下那些適用於並行處理技術的特性。使用分支/合併框架最好的辦法就是從例子入手。

4.5.1 一個簡單的分支/合併例子

我們為說明分支/合併框架而設定了這樣的應用場景:有一個數組,裡面存放不同時間到達的微博更新,我們想按到達時間對它們排序,以便為用戶生成時間線,就像在代碼清單4-9中生成的那個一樣。

我們會用MergeSort的變體實現多線程排序。代碼清單4-16中用到了ForkJoinTask的特定子類RecursiveAction。因為它明顯可以獨立完成任務(對這些更新的排序能當即完成),而且具備遞歸處理能力(遞歸特別適合做排序),所以用RecursiveAction會比用通用的ForkJoinTask更簡單。

MicroBlogUpdateSorter類用Update對象的compareTo方法對更新列表排序。compute方法(超類RecursiveAction中的抽像方法,必須實現)基本上是按創建時間對微博更新數組排序。

代碼清單4-16 用RecursiveAction排序

public class MicroBlogUpdateSorter extends RecursiveAction {
  private static final int SMALL_ENOUGH = 32;// 串行排序項只有32個或更少
  private final Update updates;
  private final int start, end;
  private final Update result;

  public MicroBlogUpdateSorter(Update updates_) {
    this(updates_, 0, updates_.length);
  }

  public MicroBlogUpdateSorter(Update upds_, int startPos_, int endPos_) {
    start = startPos_;
    end = endPos_;
    updates = upds_;
    result = new Update[updates.length];
  }

  private void merge(MicroBlogUpdateSorter left_, MicroBlogUpdateSorter right_) {
    int i = 0;
    int lCt = 0;
    int rCt = 0;
    while (lCt < left_.size && rCt < right_.size) {
      result[i++] = (left_.result[lCt].compareTo(right_.result[rCt]) < 0)
        ? left_.result[lCt++]
        : right_.result[rCt++];
    }
    while (lCt < left_.size) result[i++] = left_.result[lCt++];
    while (rCt < right_.size) result[i++] = right_.result[rCt++];
  }

  public int size {
    return end - start;
  }

  public Update getResult {
    return result;
  }

  //RecursiveAction中聲明的方法
  @Override
  protected void compute {
    if (size < SMALL_ENOUGH) {
      System.arraycopy(updates, start, result, 0, size);
      Arrays.sort(result, 0, size);
    } else {
      int mid = size / 2;
      MicroBlogUpdateSorter left = new MicroBlogUpdateSorter(updates, start, start + mid);
      MicroBlogUpdateSorter right = new MicroBlogUpdateSorter(updates, start + mid, end);
      invokeAll(left, right);
      merge(left, right)
    }
  }
}
  

要使用這個排序器,你可以用下面這樣的代碼驅動它,生成一些包含由X組成的字符串的更新,並打亂它們的順序,之後再傳給排序器。最終得到重新排序後的更新。

代碼清單4-17 使用遞歸排序器

List<Update> lu = new ArrayList<Update>;
String text = \"\";
final Update.Builder ub = new Update.Builder;
final Author a = new Author(\"Tallulah\");

for (int i=0; i<256; i++) {
  text = text + \"X\";
  long now = System.currentTimeMillis;
  lu.add(ub.author(a).updateText(text).createTime(now).build);
  try {
    Thread.sleep(1);
  } catch (InterruptedException e) {
  }
}
Collections.shuffle(lu);
Update updates = lu.toArray(new Update[0]);//傳入空數組,省掉空間分配

MicroBlogUpdateSorter sorter = new MicroBlogUpdateSorter(updates);
ForkJoinPool pool = new ForkJoinPool(4);
pool.invoke(sorter);

for (Update u: sorter.getResult) {
  System.out.println(u);
}
  

TimSort

隨著Java 7的到來,默認的數組排序算法已經變了。以前是以QuickSort的形式,但到了Java 7時代則變成了「TimSort」——MergeSort和插入排序的混合體。TimSort最初是Tim Peters為Python開發的,而且從2.3版(2002)開始就是Python中的默認排序算法了。

如果想看看TimSort在Java 7中存在的證據,可以給清單4-16中的代碼傳入一個null數組。對數組排序時,由於數組尺寸太小,會調用Array.sort方法,該方法會拋出空指針異常,在輸出的異常信息裡就能看到TimSort類。

4.5.2 ForkJoinTask與工作竊取

ForkJoinTaskRecursiveAction的超類。它是從動作中返回結果的泛型類型,所以RecursiveAction擴展了ForkJoinTask<Void>。這使得ForkJoinTask非常適合用MapReduce1 方式返回數據集中歸結出的結果。

1 MapReduce是Google提出的一個軟件架構,用於大規模數據集(大於1TB)的並行運算。當前的軟件實現是指定一個Map(映射)函數,用來把一組鍵值對映射成一組新的鍵值對,指定並發的Reduce(化簡)函數,用來保證所有映射的鍵值對中的每一個元素都共享相同的鍵組。——譯者注

ForkJoinTasksForkJoinPool調度安排,ForkJoinPool是專為這些輕量任務設計的新型執行者服務。該服務維護每個線程的任務列表,並且當某個任務完成時,它能把掛在滿負荷線程上的任務重新安排到空閒線程上去。

採用這種「工作竊取」的算法是為了解決大小不同的任務所導致的調度問題。大小不同的任務所需的運行時間通常也會有很大差別。比如說,某個線程的運行隊列中都是小任務,而另外一個全是大任務。如果小任務的運行速度比大任務快五倍,只處理小任務的線程很可能在處理大任務的線程完成之前就處於空閒狀態了。

Java 7實現的工作竊取機制精準地解決了這個問題,並且在分支/合併框架工作的整個生命週期中使線程池中的所有線程都有用武之地。工作竊取完全是自動的,你什麼也不用做就能享受到它帶來的好處。不需要手工干預,而是由運行環境承擔更多工作幫助開發人員管理並發,這在Java 7中已經不是什麼新鮮事了。

4.5.3 並行問題

分支/合併框架的確對我們的幫助很大,但在實際中,並不是每個問題都能像4.5.1節中那樣輕易地簡化成多線程MergeSort

這裡是一些可以用分支/合併方法解決的問題:

  • 模擬大量簡單對象的運動,比如粒子效果;
  • 日誌文件分析;
  • 從輸入中計數的數據操作,比如mapreduce操作。

從另一個角度來說,圖4-10中這個被分解的問題正是分支/合併框架可以解決的。

圖4-10 分支與合併

用下面這個列表檢查問題及其子任務是一個切實有效的方法,它可以確定是否能用分支/合併來解決這個問題。

  • 問題的子任務是否無需與其他子任務有顯式的協作或同步也可以工作?

  • 子任務是不是不會對數據進行修改,只是經過計算得出些結果(它們是不是函數程序員稱為「純粹的」函數的函數)?

  • 對於子任務來說,分而治之是不是很自然的事?子任務是不是會創建更多的子任務,而且它們要比派生出它們的任務粒度更細?

對於前面這些提問,如果答案是肯定的,或者「大體如此,但有臨界情況」,那你的問題很可能適合用分支/合併的方式解決。反過來,如果答案是「可能吧」或者「算不上」,你就會發現分支/合併幫不上什麼忙,可能用其他的同步方式更合適。

注意 前面的檢查列表是測試某個問題(比如在Hadoop和NoSQL數據庫中常見的那種)能否很好地用分支/合併方式解決的有效方法。

想設計出優秀的多線程算法並不容易,分支/合併方法也不能面面俱到。在適用的領域,它的用處很廣。其實歸根結底,你必須要確定你的問題是否適合這個框架,如果不適合,你只能在性能卓越的java.util.concurrent工具箱上構建自己的解決方案。

在下一節中,我們會詳細討論經常被誤解的Java內存模型(Java Memory Model,JMM)。很多Java程序員都知道JMM,並且在沒有經過正式介紹的情況下按自己的理解寫代碼。如果你覺得這是在說你,那麼接下來的內容會幫助你重新認識JMM,並且幫你打下紮實的基礎。JMM這個話題相當有深度,所以如果你急著進入下一章,可以先跳過它。