讀古今文學網 > Java 8實戰 > 第7章 並行數據處理與性能 >

第7章 並行數據處理與性能

本章內容

  • 用並行流並行處理數據

  • 並行流的性能分析

  • 分支/合併框架

  • 使用Spliterator分割流

在前面三章中,我們已經看到了新的Stream接口可以讓你以聲明性方式處理數據集。我們還解釋了將外部迭代換為內部迭代能夠讓原生Java庫控制流元素的處理。這種方法讓Java程序員無需顯式實現優化來為數據集的處理加速。到目前為止,最重要的好處是可以對這些集合執行操作流水線,能夠自動利用計算機上的多個內核。

例如,在Java 7之前,並行處理數據集合非常麻煩。第一,你得明確地把包含數據的數據結構分成若干子部分。第二,你要給每個子部分分配一個獨立的線程。第三,你需要在恰當的時候對它們進行同步來避免不希望出現的競爭條件,等待所有線程完成,最後把這些部分結果合併起來。Java 7引入了一個叫作分支/合併的框架,讓這些操作更穩定、更不易出錯。我們會在7.2節探討這一框架。

在本章中,你將瞭解Stream接口如何讓你不用太費力氣就能對數據集執行並行操作。它允許你聲明性地將順序流變為並行流。此外,你將看到Java是如何變戲法的,或者更實際地來說,流是如何在幕後應用Java 7引入的分支/合併框架的。你還會發現,瞭解並行流內部是如何工作的很重要,因為如果你忽視這一方面,就可能因誤用而得到意外的(很可能是錯的)結果。

我們會特別演示,在並行處理數據塊之前,並行流被劃分為數據塊的方式在某些情況下恰恰是這些錯誤且無法解釋的結果的根源。因此,你將會學習如何通過實現和使用你自己的Spliterator來控制這個劃分過程。

7.1 並行流

在第4章中,我們簡要地提到了Stream接口可以讓你非常方便地處理它的元素:可以通過對收集源調用parallelStream方法來把集合轉換為並行流。並行流就是一個把內容分成多個數據塊,並用不同的線程分別處理每個數據塊的流。這樣一來,你就可以自動把給定操作的工作負荷分配給多核處理器的所有內核,讓它們都忙起來。讓我們用一個簡單的例子來試驗一下這個思想。

假設你需要寫一個方法,接受數字n作為參數,並返回從1到給定參數的所有數字的和。一個直接(也許有點土)的方法是生成一個無窮大的數字流,把它限制到給定的數目,然後用對兩個數字求和的BinaryOperator來歸約這個流,如下所示:

public static long sequentialSum(long n) {
    return Stream.iterate(1L, i -> i + 1)    ←─生成自然數無限流
                 .limit(n)    ←─限制到前n個數
                 .reduce(0L, Long::sum);    ←─對所有數字求和來歸納流
}

  

用更為傳統的Java術語來說,這段代碼與下面的迭代等價:

public static long iterativeSum(long n) {
    long result = 0;
    for (long i = 1L; i <= n; i++) {
        result += i;
    }
    return result;
}

  

這似乎是利用並行處理的好機會,特別是n很大的時候。那怎麼入手呢?你要對結果變量進行同步嗎?用多少個線程呢?誰負責生成數呢?誰來做加法呢?

根本用不著擔心啦。用並行流的話,這問題就簡單多了!

7.1.1 將順序流轉換為並行流

你可以把流轉換成並行流,從而讓前面的函數歸約過程(也就是求和)並行運行——對順序流調用parallel方法:

public static long parallelSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
                 .limit(n)
                 .parallel    ←─將流轉換為並行流
                 .reduce(0L, Long::sum);
}

  

在上面的代碼中,對流中所有數字求和的歸納過程的執行方式和5.4.1節中說的差不多。不同之處在於Stream在內部分成了幾塊。因此可以對不同的塊獨立並行進行歸納操作,如圖7-1所示。最後,同一個歸納操作會將各個子流的部分歸納結果合併起來,得到整個原始流的歸納結果。

圖 7-1 並行歸納操作

請注意,在現實中,對順序流調用parallel方法並不意味著流本身有任何實際的變化。它在內部實際上就是設了一個boolean標誌,表示你想讓調用parallel之後進行的所有操作都並行執行。類似地,你只需要對並行流調用sequential方法就可以把它變成順序流。請注意,你可能以為把這兩個方法結合起來,就可以更細化地控制在遍歷流時哪些操作要並行執行,哪些要順序執行。例如,你可以這樣做:

stream.parallel
      .filter(...)
      .sequential
      .map(...)
      .parallel
      .reduce;

  

但最後一次parallelsequential調用會影響整個流水線。在本例中,流水線會並行執行,因為最後調用的是它。

配置並行流使用的線程池

看看流的parallel方法,你可能會想,並行流用的線程是從哪兒來的?有多少個?怎麼自定義這個過程呢?

並行流內部使用了默認的ForkJoinPool(7.2節會進一步講到分支/合併框架),它默認的線程數量就是你的處理器數量,這個值是由Runtime.getRuntime.availableProcessors得到的。

但是你可以通過系統屬性java.util.concurrent.ForkJoinPool.common. parallelism來改變線程池大小,如下所示:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");

  

這是一個全局設置,因此它將影響代碼中所有的並行流。反過來說,目前還無法專為某個並行流指定這個值。一般而言,讓ForkJoinPool的大小等於處理器數量是個不錯的默認值,除非你有很好的理由,否則我們強烈建議你不要修改它。

回到我們的數字求和練習,我們說過,在多核處理器上運行並行版本時,會有顯著的性能提升。現在你有三個方法,用三種不同的方式(迭代式、順序歸納和並行歸納)做完全相同的操作,讓我們看看誰最快吧!

7.1.2 測量流性能

我們聲稱並行求和方法應該比順序和迭代方法性能好。然而在軟件工程上,靠猜絕對不是什麼好辦法!特別是在優化性能時,你應該始終遵循三個黃金規則:測量,測量,再測量。為此,你可以開發一個方法,它與6.6.2節中用於比較劃分質數的兩個收集器性能的測試框架非常類似,如下所示。

代碼清單7-1 測量對前 n 個自然數求和的函數的性能

public long measureSumPerf(Function<Long, Long> adder, long n) {
    long fastest = Long.MAX_VALUE;
    for (int i = 0; i < 10; i++) {
        long start = System.nanoTime;
        long sum = adder.apply(n);
        long duration = (System.nanoTime - start) / 1_000_000;
        System.out.println("Result: " + sum);
        if (duration < fastest) fastest = duration;
    }
    return fastest;
}

  

這個方法接受一個函數和一個long作為參數。它會對傳給方法的long應用函數10次,記錄每次執行的時間(以毫秒為單位),並返回最短的一次執行時間。假設你把先前開發的所有方法都放進了一個名為ParallelStreams的類,你就可以用這個框架來測試順序加法器函數對前一千萬個自然數求和要用多久:

System.out.println("Sequential sum done in:" +
    measureSumPerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs");

  

請注意,我們對這個結果應持保留態度。影響執行時間的因素有很多,比如你的電腦支持多少個內核。你可以在自己的機器上跑一下這些代碼。我們在一台四核英特爾i7 2.3 GHz的MacBook Pro上運行它,輸出是這樣的:

Sequential sum done in: 97 msecs

  

用傳統for循環的迭代版本執行起來應該會快很多,因為它更為底層,更重要的是不需要對原始類型做任何裝箱或拆箱操作。如果你試著測量它的性能,

System.out.println("Iterative sum done in:" +
    measureSumPerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs");

  

將得到:

Iterative sum done in: 2 msecs

  

現在我們來對函數的並行版本做測試:

System.out.println("Parallel sum done in: " +
    measureSumPerf(ParallelStreams::parallelSum, 10_000_000) + " msecs" );

  

看看會出現什麼情況:

Parallel sum done in: 164 msecs

  

這相當令人失望,求和方法的並行版本比順序版本要慢很多。你如何解釋這個意外的結果呢?這裡實際上有兩個問題:

  • iterate生成的是裝箱的對象,必須拆箱成數字才能求和;

  • 我們很難把iterate分成多個獨立塊來並行執行。

第二個問題更有意思一點,因為你必須意識到某些流操作比其他操作更容易並行化。具體來說,iterate很難分割成能夠獨立執行的小塊,因為每次應用這個函數都要依賴前一次應用的結果,如圖7-2所示。

圖 7-2 iterate在本質上是順序的

這意味著,在這個特定情況下,歸納進程不是像圖7-1那樣進行的;整張數字列表在歸納過程開始時沒有準備好,因而無法有效地把流劃分為小塊來並行處理。把流標記成並行,你其實是給順序處理增加了開銷,它還要把每次求和操作分到一個不同的線程上。

這就說明了並行編程可能很複雜,有時候甚至有點違反直覺。如果用得不對(比如採用了一個不易並行化的操作,如iterate),它甚至可能讓程序的整體性能更差,所以在調用那個看似神奇的parallel操作時,瞭解背後到底發生了什麼是很有必要的。

使用更有針對性的方法

那到底要怎麼利用多核處理器,用流來高效地並行求和呢?我們在第5章中討論了一個叫LongStream.rangeClosed的方法。這個方法與iterate相比有兩個優點。

  • LongStream.rangeClosed直接產生原始類型的long數字,沒有裝箱拆箱的開銷。

  • LongStream.rangeClosed會生成數字範圍,很容易拆分為獨立的小塊。例如,範圍1~20可分為1~5、6~10、11~15和16~20。

讓我們先看一下它用於順序流時的性能如何,看看拆箱的開銷到底要不要緊:

public static long rangedSum(long n) {
    return LongStream.rangeClosed(1, n)
                     .reduce(0L, Long::sum);
}

  

這一次的輸出是:

Ranged sum done in: 17 msecs

  

這個數值流比前面那個用iterate工廠方法生成數字的順序執行版本要快得多,因為數值流避免了非針對性流那些沒必要的自動裝箱和拆箱操作。由此可見,選擇適當的數據結構往往比並行化算法更重要。但要是對這個新版本應用並行流呢?

public static long parallelRangedSum(long n) {
    return LongStream.rangeClosed(1, n)
                     .parallel
                     .reduce(0L, Long::sum);
}

  

現在把這個函數傳給你的測試方法:

System.out.println("Parallel range sum done in:" +
    measureSumPerf(ParallelStreams::parallelRangedSum, 10_000_000) +
    " msecs");

  

你會得到:

Parallel range sum done in: 1 msecs

  

終於,我們得到了一個比順序執行更快的並行歸納,因為這一次歸納操作可以像圖7-1那樣執行了。這也表明,使用正確的數據結構然後使其並行工作能夠保證最佳的性能。

儘管如此,請記住,並行化並不是沒有代價的。並行化過程本身需要對流做遞歸劃分,把每個子流的歸納操作分配到不同的線程,然後把這些操作的結果合併成一個值。但在多個內核之間移動數據的代價也可能比你想的要大,所以很重要的一點是要保證在內核中並行執行工作的時間比在內核之間傳輸數據的時間長。總而言之,很多情況下不可能或不方便並行化。然而,在使用並行Stream加速代碼之前,你必須確保用得對;如果結果錯了,算得快就毫無意義了。讓我們來看一個常見的陷阱。

7.1.3 正確使用並行流

錯用並行流而產生錯誤的首要原因,就是使用的算法改變了某些共享狀態。下面是另一種實現對前 n 個自然數求和的方法,但這會改變一個共享累加器:

public static long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator;
    LongStream.rangeClosed(1, n).forEach(accumulator::add);
    return accumulator.total;
}
public class Accumulator {
    public long total = 0;
    public void add(long value) { total += value; }
}

  

這種代碼非常普遍,特別是對那些熟悉指令式編程範式的程序員來說。這段代碼和你習慣的那種指令式迭代數字列表的方式很像:初始化一個累加器,一個個遍歷列表中的元素,把它們和累加器相加。

那這種代碼又有什麼問題呢?不幸的是,它真的無可救藥,因為它在本質上就是順序的。每次訪問total都會出現數據競爭。如果你嘗試用同步來修復,那就完全失去並行的意義了。為了說明這一點,讓我們試著把Stream變成並行的:

public static long sideEffectParallelSum(long n) {
    Accumulator accumulator = new Accumulator;
    LongStream.rangeClosed(1, n).parallel.forEach(accumulator::add);
    return accumulator.total;
}

  

用代碼清單7-1中的測試框架來執行這個方法,並打印每次執行的結果:

System.out.println("SideEffect parallel sum done in: " +
    measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) +"
    msecs" );

  

你可能會得到類似於下面這種輸出:

Result: 5959989000692
Result: 7425264100768
Result: 6827235020033
Result: 7192970417739
Result: 6714157975331
Result: 7497810541907
Result: 6435348440385
Result: 6999349840672
Result: 7435914379978
Result: 7715125932481
SideEffect parallel sum done in: 49 msecs

  

這回方法的性能無關緊要了,唯一要緊的是每次執行都會返回不同的結果,都離正確值50000005000000差很遠。這是由於多個線程在同時訪問累加器,執行total += value,而這一句雖然看似簡單,卻不是一個原子操作。問題的根源在於,forEach中調用的方法有副作用,它會改變多個線程共享的對象的可變狀態。要是你想用並行Stream又不想引發類似的意外,就必須避免這種情況。

現在你知道了,共享可變狀態會影響並行流以及並行計算。第13章和第14章詳細討論函數式編程的時候,我們還會談到這一點。現在,記住要避免共享可變狀態,確保並行Stream得到正確的結果。接下來,我們會看到一些實用建議,你可以由此判斷什麼時候可以利用並行流來提升性能。

7.1.4 高效使用並行流

一般而言,想給出任何關於什麼時候該用並行流的定量建議都是不可能也毫無意義的,因為任何類似於“僅當至少有一千個(或一百萬個或隨便什麼數字)元素的時候才用並行流)”的建議對於某台特定機器上的某個特定操作可能是對的,但在略有差異的另一種情況下可能就是大錯特錯。儘管如此,我們至少可以提出一些定性意見,幫你決定某個特定情況下是否有必要使用並行流。

  • 如果有疑問,測量。把順序流轉成並行流輕而易舉,但卻不一定是好事。我們在本節中已經指出,並行流並不總是比順序流快。此外,並行流有時候會和你的直覺不一致,所以在考慮選擇順序流還是並行流時,第一個也是最重要的建議就是用適當的基準來檢查其性能。

  • 留意裝箱。自動裝箱和拆箱操作會大大降低性能。Java 8中有原始類型流(IntStreamLongStreamDoubleStream)來避免這種操作,但凡有可能都應該用這些流。

  • 有些操作本身在並行流上的性能就比順序流差。特別是limitfindFirst等依賴於元素順序的操作,它們在並行流上執行的代價非常大。例如,findAny會比findFirst性能好,因為它不一定要按順序來執行。你總是可以調用unordered方法來把有序流變成無序流。那麼,如果你需要流中的n 個元素而不是專門要前n 個的話,對無序並行流調用limit可能會比單個有序流(比如數據源是一個List)更高效。

  • 還要考慮流的操作流水線的總計算成本。設 N 是要處理的元素的總數,Q 是一個元素通過流水線的大致處理成本,則 N*Q 就是這個對成本的一個粗略的定性估計。Q 值較高就意味著使用並行流時性能好的可能性比較大。

  • 對於較小的數據量,選擇並行流幾乎從來都不是一個好的決定。並行處理少數幾個元素的好處還抵不上並行化造成的額外開銷。

  • 要考慮流背後的數據結構是否易於分解。例如,ArrayList的拆分效率比LinkedList高得多,因為前者用不著遍歷就可以平均拆分,而後者則必須遍歷。另外,用range工廠方法創建的原始類型流也可以快速分解。最後,你將在7.3節中學到,你可以自己實現Spliterator來完全掌控分解過程。

  • 流自身的特點,以及流水線中的中間操作修改流的方式,都可能會改變分解過程的性能。例如,一個SIZED流可以分成大小相等的兩部分,這樣每個部分都可以比較高效地並行處理,但篩選操作可能丟棄的元素個數卻無法預測,導致流本身的大小未知。

  • 還要考慮終端操作中合併步驟的代價是大是小(例如Collector中的combiner方法)。如果這一步代價很大,那麼組合每個子流產生的部分結果所付出的代價就可能會超出通過並行流得到的性能提升。

表7-1按照可分解性總結了一些流數據源適不適於並行。

表7-1 流的數據源和可分解性

可分解性

ArrayList

極佳

LinkedList

IntStream.range

極佳

Stream.iterate

HashSet

TreeSet

最後,我們還要強調並行流背後使用的基礎架構是Java 7中引入的分支/合併框架。並行匯總的示例證明了要想正確使用並行流,瞭解它的內部原理至關重要,所以我們會在下一節仔細研究分支/合併框架。

7.2 分支/合併框架

分支/合併框架的目的是以遞歸方式將可以並行的任務拆分成更小的任務,然後將每個子任務的結果合併起來生成整體結果。它是ExecutorService接口的一個實現,它把子任務分配給線程池(稱為ForkJoinPool)中的工作線程。首先來看看如何定義任務和子任務。

7.2.1 使用RecursiveTask

要把任務提交到這個池,必須創建RecursiveTask<R>的一個子類,其中R是並行化任務(以及所有子任務)產生的結果類型,或者如果任務不返回結果,則是RecursiveAction類型(當然它可能會更新其他非局部機構)。要定義RecursiveTask,只需實現它唯一的抽像方法compute

protected abstract R compute;

  

這個方法同時定義了將任務拆分成子任務的邏輯,以及無法再拆分或不方便再拆分時,生成單個子任務結果的邏輯。正由於此,這個方法的實現類似於下面的偽代碼:

if (任務足夠小或不可分) {
    順序計算該任務
} else {
    將任務分成兩個子任務
    遞歸調用本方法,拆分每個子任務,等待所有子任務完成
    合併每個子任務的結果
}

  

一般來說並沒有確切的標準決定一個任務是否應該再拆分,但有幾種試探方法可以幫助你做出這一決定。我們會在7.2.1節中進一步澄清。遞歸的任務拆分過程如圖7-3所示。

圖 7-3 分支/合併過程

你可能已經注意到,這只不過是著名的分治算法的並行版本而已。這裡舉一個用分支/合併框架的實際例子,還以前面的例子為基礎,讓我們試著用這個框架為一個數字範圍(這裡用一個long數組表示)求和。如前所述,你需要先為RecursiveTask類做一個實現,就是下面代碼清單中的ForkJoinSumCalculator

代碼清單7-2 用分支/合併框架執行並行求和

public class ForkJoinSumCalculator
             extends java.util.concurrent.RecursiveTask<Long> {    ←─繼承RecursiveTask來創建可以用於分支/合併框架的任務

    private final long numbers;    ←─要求和的數組
    private final int start;    ←─子任務處理的數組的起始和終止位置
    private final int end;

    public static final long THRESHOLD = 10_000;    ←─不再將任務分解為子任務的數組大小

    public ForkJoinSumCalculator(long numbers) {    ←─公共構造函數用於創建主任務
        this(numbers, 0, numbers.length);
    }

    private ForkJoinSumCalculator(long numbers, int start, int end) {    ←─私有構造函數用於以遞歸方式為主任務創建子任務
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute {    ←─覆蓋RecursiveTask抽像方法
        int length = end - start;    ←─該任務負責求和的部分的大小
        if (length <= THRESHOLD) {
            return computeSequentially;    ←─如果大小小於或等於閾值,順序計算結果
        }
        ForkJoinSumCalculator leftTask =
            new ForkJoinSumCalculator(numbers, start, start + length/2);    ←─創建一個子任務來為數組的前一半求和
        leftTask.fork;    ←─利用另一個ForkJoinPool線程異步執行新創建的子任務
        ForkJoinSumCalculator rightTask =
            new ForkJoinSumCalculator(numbers, start + length/2, end);    ←─創建一個任務為數組的後一半求和
        Long rightResult = rightTask.compute;    ←─同步執行第二個子任務,有可能允許進一步遞歸劃分
        Long leftResult = leftTask.join;    ←─讀取第一個子任務的結果,如果尚未完成就等待
        return leftResult + rightResult;    ←─該任務的結果是兩個子任務結果的組合
    }

    private long computeSequentially {    ←─在子任務不再可分時計算結果的簡單算法
        long sum = 0;
        for (int i = start; i < end; i++) {{
            sum += numbers[i];
        }
        return sum;
    }
}

  

現在編寫一個方法來並行對前 n 個自然數求和就很簡單了。你只需把想要的數字數組傳給ForkJoinSumCalculator的構造函數:

public static long forkJoinSum(long n) {
    long numbers = LongStream.rangeClosed(1, n).toArray;
    ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
    return new ForkJoinPool.invoke(task);
}

  

這裡用了一個LongStream來生成包含前 n 個自然數的數組,然後創建一個ForkJoinTaskRecursiveTask的父類),並把數組傳遞給代碼清單7-2所示ForkJoinSumCalculator的公共構造函數。最後,你創建了一個新的ForkJoinPool,並把任務傳給它的調用方法 。在ForkJoinPool中執行時,最後一個方法返回的值就是ForkJoinSumCalculator類定義的任務結果。

請注意在實際應用時,使用多個ForkJoinPool是沒有什麼意義的。正是出於這個原因,一般來說把它實例化一次,然後把實例保存在靜態字段中,使之成為單例,這樣就可以在軟件中任何部分方便地重用了。這裡創建時用了其默認的無參數構造函數,這意味著想讓線程池使用JVM能夠使用的所有處理器。更確切地說,該構造函數將使用Runtime.availableProcessors的返回值來決定線程池使用的線程數。請注意availableProcessors方法雖然看起來是處理器,但它實際上返回的是可用內核的數量,包括超線程生成的虛擬內核。

運行ForkJoinSumCalculator

當把ForkJoinSumCalculator任務傳給ForkJoinPool時,這個任務就由池中的一個線程執行,這個線程會調用任務的compute方法。該方法會檢查任務是否小到足以順序執行,如果不夠小則會把要求和的數組分成兩半,分給兩個新的ForkJoinSumCalculator,而它們也由ForkJoinPool安排執行。因此,這一過程可以遞歸重複,把原任務分為更小的任務,直到滿足不方便或不可能再進一步拆分的條件(本例中是求和的項目數小於等於10 000)。這時會順序計算每個任務的結果,然後由分支過程創建的(隱含的)任務二叉樹遍歷回到它的根。接下來會合併每個子任務的部分結果,從而得到總任務的結果。這一過程如圖7-4所示。

圖 7-4 分支/合併算法

你可以再用一次本章開始時寫的測試框架,來看看顯式使用分支/合併框架的求和方法的性能:

System.out.println("ForkJoin sum done in: " + measureSumPerf(
        ForkJoinSumCalculator::forkJoinSum, 10_000_000) + " msecs" );

  

它生成以下輸出:

ForkJoin sum done in: 41 msecs

  

這個性能看起來比用並行流的版本要差,但這只是因為必須先要把整個數字流都放進一個long,之後才能在ForkJoinSumCalculator任務中使用它。

7.2.2 使用分支/合併框架的最佳做法

雖然分支/合併框架還算簡單易用,不幸的是它也很容易被誤用。以下是幾個有效使用它的最佳做法。

  • 對一個任務調用join方法會阻塞調用方,直到該任務做出結果。因此,有必要在兩個子任務的計算都開始之後再調用它。否則,你得到的版本會比原始的順序算法更慢更複雜,因為每個子任務都必須等待另一個子任務完成才能啟動。

  • 不應該在RecursiveTask內部使用 ForkJoinPoolinvoke方法。相反,你應該始終直接調用computefork方法,只有順序代碼才應該用invoke來啟動並行計算。

  • 對子任務調用fork方法可以把它排進ForkJoinPool。同時對左邊和右邊的子任務調用它似乎很自然,但這樣做的效率要比直接對其中一個調用compute低。這樣做你可以為其中一個子任務重用同一線程,從而避免在線程池中多分配一個任務造成的開銷。

  • 調試使用分支/合併框架的並行計算可能有點棘手。特別是你平常都在你喜歡的IDE裡面看棧跟蹤(stack trace)來找問題,但放在分支-合併計算上就不行了,因為調用compute的線程並不是概念上的調用方,後者是調用fork的那個。

  • 和並行流一樣,你不應理所當然地認為在多核處理器上使用分支/合併框架就比順序計算快。我們已經說過,一個任務可以分解成多個獨立的子任務,才能讓性能在並行化時有所提升。所有這些子任務的運行時間都應該比分出新任務所花的時間長;一個慣用方法是把輸入/輸出放在一個子任務裡,計算放在另一個裡,這樣計算就可以和輸入/輸出同時進行。此外,在比較同一算法的順序和並行版本的性能時還有別的因素要考慮。就像任何其他Java代碼一樣,分支/合併框架需要“預熱”或者說要執行幾遍才會被JIT編譯器優化。這就是為什麼在測量性能之前跑幾遍程序很重要,我們的測試框架就是這麼做的。同時還要知道,編譯器內置的優化可能會為順序版本帶來一些優勢(例如執行死碼分析——刪去從未被使用的計算)。

對於分支/合併拆分策略還有最後一點補充:你必須選擇一個標準,來決定任務是要進一步拆分還是已小到可以順序求值。我們會在下一節中就此給出一些提示。

7.2.3 工作竊取

ForkJoinSumCalculator的例子中,我們決定在要求和的數組中最多包含10 000個項目時就不再創建子任務了。這個選擇是很隨意的,但大多數情況下也很難找到一個好的啟髮式方法來確定它,只能試幾個不同的值來嘗試優化它。在我們的測試案例中,我們先用了一個有1000萬項目的數組,意味著ForkJoinSumCalculator至少會分出1000個子任務來。這似乎有點浪費資源,因為我們用來運行它的機器上只有四個內核。在這個特定例子中可能確實是這樣,因為所有的任務都受CPU約束,預計所花的時間也差不多。

但分出大量的小任務一般來說都是一個好的選擇。這是因為,理想情況下,劃分並行任務時,應該讓每個任務都用完全相同的時間完成,讓所有的CPU內核都同樣繁忙。不幸的是,實際中,每個子任務所花的時間可能天差地別,要麼是因為劃分策略效率低,要麼是有不可預知的原因,比如磁盤訪問慢,或是需要和外部服務協調執行。

分支/合併框架工程用一種稱為工作竊取(work stealing)的技術來解決這個問題。在實際應用中,這意味著這些任務差不多被平均分配到ForkJoinPool中的所有線程上。每個線程都為分配給它的任務保存一個雙向鏈式隊列,每完成一個任務,就會從隊列頭上取出下一個任務開始執行。基於前面所述的原因,某個線程可能早早完成了分配給它的所有任務,也就是它的隊列已經空了,而其他的線程還很忙。這時,這個線程並沒有閒下來,而是隨機選了一個別的線程,從隊列的尾巴上“偷走”一個任務。這個過程一直繼續下去,直到所有的任務都執行完畢,所有的隊列都清空。這就是為什麼要劃成許多小任務而不是少數幾個大任務,這有助於更好地在工作線程之間平衡負載。

一般來說,這種工作竊取算法用於在池中的工作線程之間重新分配和平衡任務。圖7-5展示了這個過程。當工作線程隊列中有一個任務被分成兩個子任務時,一個子任務就被閒置的工作線程“偷走”了。如前所述,這個過程可以不斷遞歸,直到規定子任務應順序執行的條件為真。

圖 7-5 分支/合併框架使用的工作竊取算法

現在你應該清楚流如何使用分支/合併框架來並行處理它的項目了,不過還有一點沒有講。本節中我們分析了一個例子,你明確地指定了將數字數組拆分成多個任務的邏輯。但是,使用本章前面講的並行流時就用不著這麼做了,這就意味著,肯定有一種自動機制來為你拆分流。這種新的自動機制稱為Spliterator,我們會在下一節中討論。

7.3 Spliterator

Spliterator是Java 8中加入的另一個新接口;這個名字代表“可分迭代器”(splitable iterator)。和Iterator一樣,Spliterator也用於遍歷數據源中的元素,但它是為了並行執行而設計的。雖然在實踐中可能用不著自己開發Spliterator,但瞭解一下它的實現方式會讓你對並行流的工作原理有更深入的瞭解。Java 8已經為集合框架中包含的所有數據結構提供了一個默認的Spliterator實現。集合實現了Spliterator接口,接口提供了一個spliterator方法。這個接口定義了若干方法,如下面的代碼清單所示。

代碼清單7-3 Spliterator接口

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);
    Spliterator<T> trySplit;
    long estimateSize;
    int characteristics;
}

  

與往常一樣,TSpliterator遍歷的元素的類型。tryAdvance方法的行為類似於普通的Iterator,因為它會按順序一個一個使用Spliterator中的元素,並且如果還有其他元素要遍歷就返回true。但trySplit是專為Spliterator接口設計的,因為它可以把一些元素劃出去分給第二個Spliterator(由該方法返回),讓它們兩個並行處理。Spliterator還可通過estimateSize方法估計還剩下多少元素要遍歷,因為即使不那麼確切,能快速算出來是一個值也有助於讓拆分均勻一點。

重要的是,要瞭解這個拆分過程在內部是如何執行的,以便在需要時能夠掌控它。因此,我們會在下一節中詳細地分析它。

7.3.1 拆分過程

Stream拆分成多個部分的算法是一個遞歸過程,如圖7-6所示。第一步是對第一個Spliterator調用trySplit,生成第二個Spliterator。第二步對這兩個Spliterator調用trysplit,這樣總共就有了四個Spliterator。這個框架不斷對Spliterator調用trySplit直到它返回null,表明它處理的數據結構不能再分割,如第三步所示。最後,這個遞歸拆分過程到第四步就終止了,這時所有的Spliterator在調用trySplit時都返回了null

圖 7-6 遞歸拆分過程

這個拆分過程也受Spliterator本身的特性影響,而特性是通過characteristics方法聲明的。

Spliterator的特性

Spliterator接口聲明的最後一個抽像方法是characteristics,它將返回一個int,代表Spliterator本身特性集的編碼。使用Spliterator的客戶可以用這些特性來更好地控制和優化它的使用。表7-2總結了這些特性。(不幸的是,雖然它們在概念上與收集器的特性有重疊,編碼卻不一樣。)

表7-2 Spliterator的特性

特性

含義

ORDERED

元素有既定的順序(例如List),因此Spliterator在遍歷和劃分時也會遵循這一順序

DISTINCT

對於任意一對遍歷過的元素xyx.equals(y)返回false

SORTED

遍歷的元素按照一個預定義的順序排序

SIZED

Spliterator由一個已知大小的源建立(例如Set),因此estimatedSize返回的是準確值

NONNULL

保證遍歷的元素不會為null

IMMUTABLE

Spliterator的數據源不能修改。這意味著在遍歷時不能添加、刪除或修改任何元素

CONCURRENT

Spliterator的數據源可以被其他線程同時修改而無需同步

SUBSIZED

Spliterator和所有從它拆分出來的Spliterator都是SIZED

現在你已經看到了Spliterator接口是什麼以及它定義了哪些方法,你可以試著自己實現一個Spliterator了。

7.3.2 實現你自己的Spliterator

讓我們來看一個可能需要你自己實現Spliterator的實際例子。我們要開發一個簡單的方法來數數一個String中的單詞數。這個方法的一個迭代版本可以寫成下面的樣子。

代碼清單7-4 一個迭代式字數統計方法

public int countWordsIteratively(String s) {
    int counter = 0;
    boolean lastSpace = true;
    for (char c : s.toCharArray) {    ←─逐個遍歷String中的所有字符
        if (Character.isWhitespace(c)) {
            lastSpace = true;
        } else {
            if (lastSpace) counter++;    ←─上一個字符是空格,而當前遍歷的字符不是空格時,將單詞計數器加一
            lastSpace = false;
        }
    }
    return counter;
}

  

讓我們把這個方法用在但丁的《神曲》的《地獄篇》的第一句話上:1

1請參閱http://en.wikipedia.org/wiki/Inferno_(Dante)。

final String SENTENCE =
            " Nel   mezzo del cammin  di nostra  vita " +
            "mi  ritrovai in una  selva oscura" +
            " ché la  dritta via era   smarrita ";

System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");

  

請注意,我們在句子裡添加了一些額外的隨機空格,以演示這個迭代實現即使在兩個詞之間存在多個空格時也能正常工作。正如我們所料,這段代碼將打印以下內容:

Found 19 words

  

理想情況下,你會想要用更為函數式的風格來實現它,因為就像我們前面說過的,這樣你就可以用並行Stream來並行化這個過程,而無需顯式地處理線程和同步問題。

1. 以函數式風格重寫單詞計數器

首先你需要把String轉換成一個流。不幸的是,原始類型的流僅限於intlongdouble,所以你只能用Stream<Character>

Stream<Character> stream = IntStream.range(0, SENTENCE.length)
                                    .mapToObj(SENTENCE::charAt);

  

你可以對這個流做歸約來計算字數。在歸約流時,你得保留由兩個變量組成的狀態:一個int用來計算到目前為止數過的字數,還有一個boolean用來記得上一個遇到的Character是不是空格。因為Java沒有元組(tuple,用來表示由異類元素組成的有序列表的結構,不需要包裝對像),所以你必須創建一個新類WordCounter來把這個狀態封裝起來,如下所示。

代碼清單7-5 用來在遍歷Character流時計數的類

class WordCounter {
    private final int counter;
    private final boolean lastSpace;
    public WordCounter(int counter, boolean lastSpace) {
        this.counter = counter;
        this.lastSpace = lastSpace;
    }

    public WordCounter accumulate(Character c) {    ←─和迭代算法一樣,accumulate 方法一個個遍歷Character
        if (Character.isWhitespace(c)) {
            return lastSpace ?
                   this :
                   new WordCounter(counter, true);    ←─上一個字符是空格,而當前遍歷的字符不是空格時,將單詞計數器加一
        } else {
            return lastSpace ?
                   new WordCounter(counter + 1, false) :
                   this;
        }
    }

    public WordCounter combine(WordCounter wordCounter) {    ←─合併兩個Word-Counter,把其計數器加起來
        return new WordCounter(counter + wordCounter.counter,
                               wordCounter.lastSpace);    ←─僅需要計數器的總和,無需關心lastSpace
    }

    public int getCounter {
        return counter;
    }
}

  

在這個列表中,accumulate方法定義了如何更改WordCounter的狀態,或更確切地說是用哪個狀態來建立新的WordCounter,因為這個類是不可變的。每次遍歷到Stream中的一個新的Character時,就會調用accumulate方法。具體來說,就像代碼清單7-4中的countWordsIteratively方法一樣,當上一個字符是空格,新字符不是空格時,計數器就加一。圖7-7展示了accumulate方法遍歷到新的Character時,WordCounter的狀態轉換。

調用第二個方法combine時,會對作用於Character流的兩個不同子部分的兩個WordCounter 的部分結果進行匯總,也就是把兩個WordCounter內部的計數器加起來。

圖 7-7 遍歷到新的Character cWordCounter的狀態轉換

現在你已經寫好了在WordCounter中累計字符,以及在WordCounter中把它們結合起來的邏輯,那寫一個方法來歸約Character流就很簡單了:

private int countWords(Stream<Character> stream) {
    WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
                                            WordCounter::accumulate,
                                            WordCounter::combine);
    return wordCounter.getCounter;
}

  

現在你就可以試一試這個方法,給它由包含但丁的《神曲》中《地獄篇》第一句的String創建的流:

Stream<Character> stream = IntStream.range(0, SENTENCE.length)
                                    .mapToObj(SENTENCE::charAt);
System.out.println("Found " + countWords(stream) + " words");

  

你可以和迭代版本比較一下輸出:

Found 19 words

  

到現在為止都很好,但我們以函數式實現WordCounter的主要原因之一就是能輕鬆地並行處理,讓我們來看看具體是如何實現的。

2. 讓WordCounter並行工作

你可以嘗試用並行流來加快字數統計,如下所示:

System.out.println("Found " + countWords(stream.parallel) + " words");

  

不幸的是,這次的輸出是:

Found 25 words

  

顯然有什麼不對,可到底是哪裡不對呢?問題的根源並不難找。因為原始的String在任意位置拆分,所以有時一個詞會被分為兩個詞,然後數了兩次。這就說明,拆分流會影響結果,而把順序流換成並行流就可能使結果出錯。

如何解決這個問題呢?解決方案就是要確保String不是在隨機位置拆開的,而只能在詞尾拆開。要做到這一點,你必須為Character實現一個Spliterator,它只能在兩個詞之間拆開String(如下所示),然後由此創建並行流。

代碼清單7-6 WordCounterSpliterator

class WordCounterSpliterator implements Spliterator<Character> {

    private final String string;
    private int currentChar = 0;

    public WordCounterSpliterator(String string) {
        this.string = string;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Character> action) {
        action.accept(string.charAt(currentChar++));    ←─處理當前字符
        return currentChar < string.length;    ←─如果還有字符要處理,則返回true
    }

    @Override
    public Spliterator<Character> trySplit {
        int currentSize = string.length - currentChar;
        if (currentSize < 10) {
            return null;                ←─返回null表示要解析的String已經足夠小,可以順序處理
        }
        for (int splitPos = currentSize / 2 + currentChar;
                 splitPos < string.length; splitPos++) {    ←─將試探拆分位置設定為要解析的String的中間
            if (Character.isWhitespace(string.charAt(splitPos))) {    ←─讓拆分位置前進直到下一個空格
                Spliterator<Character> spliterator =    ←─創建一個新WordCounter-Spliterator來解析String從開始到拆分位置的部分
                    new WordCounterSpliterator(string.substring(currentChar,
                                                                splitPos));
                currentChar = splitPos;    ←─將這個WordCounter-Spliterator 的起始位置設為拆分位置
                return spliterator;
            }
        }
        return null;
    }

    @Override
    public long estimateSize {
        return string.length - currentChar;
    }

    @Override
    public int characteristics {
        return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
    }
}

  

這個Spliterator由要解析的String創建,並遍歷了其中的Character,同時保存了當前正在遍歷的字符位置。讓我們快速回顧一下實現了Spliterator接口的WordCounterSpliterator中的各個函數。

  • tryAdvance方法把String中當前位置的Character傳給了Consumer,並讓位置加一。作為參數傳遞的Consumer是一個Java內部類,在遍歷流時將要處理的Character傳給了一系列要對其執行的函數。這裡只有一個歸約函數,即WordCounter類的accumulate方法。如果新的指針位置小於String的總長,且還有要遍歷的Character,則tryAdvance返回true

  • trySplit方法是Spliterator中最重要的一個方法,因為它定義了拆分要遍歷的數據結構的邏輯。就像在代碼清單7-1中實現的RecursiveTaskcompute方法一樣(分支/合併框架的使用方式),首先要設定不再進一步拆分的下限。這裡用了一個非常低的下限——10個Character,僅僅是為了保證程序會對那個比較短的String做幾次拆分。在實際應用中,就像分支/合併的例子那樣,你肯定要用更高的下限來避免生成太多的任務。如果剩餘的Character數量低於下限,你就返回null表示無需進一步拆分。相反,如果你需要執行拆分,就把試探的拆分位置設在要解析的String塊的中間。但我們沒有直接使用這個拆分位置,因為要避免把詞在中間斷開,於是就往前找,直到找到一個空格。一旦找到了適當的拆分位置,就可以創建一個新的Spliterator來遍歷從當前位置到拆分位置的子串;把當前位置this設為拆分位置,因為之前的部分將由新Spliterator來處理,最後返回。

  • 還需要遍歷的元素的estimatedSize就是這個Spliterator解析的String的總長度和當前遍歷的位置的差。

  • 最後,characteristic方法告訴框架這個SpliteratorORDERED(順序就是String中各個Character的次序)、SIZEDestimatedSize方法的返回值是精確的)、SUBSIZEDtrySplit方法創建的其他Spliterator也有確切大小)、NONNULLString中不能有為nullCharacter)和IMMUTABLE(在解析String時不能再添加Character,因為String本身是一個不可變類)的。

3. 運用WordCounterSpliterator

現在就可以用這個新的WordCounterSpliterator來處理並行流了,如下所示:

Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
Stream<Character> stream = StreamSupport.stream(spliterator, true);

  

傳給StreamSupport.stream工廠方法的第二個布爾參數意味著你想創建一個並行流。把這個並行流傳給countWords方法:

System.out.println("Found " + countWords(stream) + " words");

  

可以得到意料之中的正確輸出:

Found 19 words

  

你已經看到了Spliterator如何讓你控制拆分數據結構的策略。Spliterator還有最後一個值得注意的功能,就是可以在第一次遍歷、第一次拆分或第一次查詢估計大小時綁定元素的數據源,而不是在創建時就綁定。這種情況下,它稱為延遲綁定(late-binding)的Spliterator。我們專門用附錄C來展示如何開發一個工具類來利用這個功能在同一個流上執行多個操作。

7.4 小結

在本章中,你瞭解了以下內容。

  • 內部迭代讓你可以並行處理一個流,而無需在代碼中顯式使用和協調不同的線程。

  • 雖然並行處理一個流很容易,卻不能保證程序在所有情況下都運行得更快。並行軟件的行為和性能有時是違反直覺的,因此一定要測量,確保你並沒有把程序拖得更慢。

  • 像並行流那樣對一個數據集並行執行操作可以提升性能,特別是要處理的元素數量龐大,或處理單個元素特別耗時的時候。

  • 從性能角度來看,使用正確的數據結構,如盡可能利用原始流而不是一般化的流,幾乎總是比嘗試並行化某些操作更為重要。

  • 分支/合併框架讓你得以用遞歸方式將可以並行的任務拆分成更小的任務,在不同的線程上執行,然後將各個子任務的結果合併起來生成整體結果。

  • Spliterator定義了並行流如何拆分它要遍歷的數據。