讀古今文學網 > Java 8實戰 > 第11章 CompletableFuture:組合式異步編程 >

第11章 CompletableFuture:組合式異步編程

本章內容

  • 創建異步計算,並獲取計算結果

  • 使用非阻塞操作提升吞吐量

  • 設計和實現異步API

  • 如何以異步的方式使用同步的API

  • 如何對兩個或多個異步操作進行流水線和合併操作

  • 如何處理異步操作的完成狀態

最近這些年,兩種趨勢不斷地推動我們反思我們設計軟件的方式。第一種趨勢和應用運行的硬件平台相關,第二種趨勢與應用程序的架構相關,尤其是它們之間如何交互。我們在第7章中已經討論過硬件平台的影響。我們注意到隨著多核處理器的出現,提升應用程序處理速度最有效的方式是編寫能充分發揮多核能力的軟件。你已經看到通過切分大型的任務,讓每個子任務並行運行,這一目標是能夠實現的;你也已經瞭解相對直接使用線程的方式,使用分支/合併框架(在Java 7中引入)和並行流(在Java 8中新引入)能以更簡單、更有效的方式實現這一目標。

第二種趨勢反映在公共API日益增長的互聯網服務應用。著名的互聯網大鱷們紛紛提供了自己的公共API服務,比如谷歌提供了地理信息服務,Facebook提供了社交信息服務,Twitter提供了新聞服務。現在,很少有網站或者網絡應用會以完全隔離的方式工作。更多的時候,我們看到的下一代網絡應用都採用“混聚”(mash-up)的方式:它會使用來自多個來源的內容,將這些內容聚合在一起,方便用戶的生活。

比如,你可能希望為你的法國客戶提供指定主題的熱點報道。為實現這一功能,你需要向谷歌或者Twitter的API請求所有語言中針對該主題最熱門的評論,可能還需要依據你的內部算法對它們的相關性進行排序。之後,你可能還需要使用谷歌的翻譯服務把它們翻譯成法語,甚至利用谷歌地圖服務定位出評論作者的位置信息,最終將所有這些信息聚集起來,呈現在你的網站上。

當然,如果某些外部網絡服務發生響應慢的情況,你希望依舊能為用戶提供部分信息,比如提供帶問號標記的通用地圖,以文本的方式顯示信息,而不是呆呆地顯示一片空白屏幕,直到地圖服務器返回結果或者超時退出。圖11-1解釋了這種典型的“混聚”應用如何與所需的遠程服務交互。

圖 11-1 典型的“混聚”式應用

要實現類似的服務,你需要與互聯網上的多個Web服務通信。可是,你並不希望因為等待某些服務的響應,阻塞應用程序的運行,浪費數十億寶貴的CPU時鐘週期。比如,不要因為等待Facebook的數據,暫停對來自Twitter的數據處理。

這些場景體現了多任務程序設計的另一面。第7章中介紹的分支/合併框架以及並行流是實現並行處理的寶貴工具;它們將一個操作切分為多個子操作,在多個不同的核、CPU甚至是機器上並行地執行這些子操作。

與此相反,如果你的意圖是實現並發,而非並行,或者你的主要目標是在同一個CPU上執行幾個松耦合的任務,充分利用CPU的核,讓其足夠忙碌,從而最大化程序的吞吐量,那麼你其實真正想做的是避免因為等待遠程服務的返回,或者對數據庫的查詢,而阻塞線程的執行,浪費寶貴的計算資源,因為這種等待的時間很可能相當長。通過本章中你會瞭解,Future接口,尤其是它的新版實現CompletableFuture,是處理這種情況的利器。圖11-2說明了並行和並發的區別。

圖 11-2 並發和並行

11.1 Future接口

Future接口在Java 5中被引入,設計初衷是對將來某個時刻會發生的結果進行建模。它建模了一種異步計算,返回一個執行運算結果的引用,當運算結束後,這個引用被返回給調用方。在Future中觸發那些潛在耗時的操作把調用線程解放出來,讓它能繼續執行其他有價值的工作,不再需要呆呆等待耗時的操作完成。打個比方,你可以把它想像成這樣的場景:你拿了一袋子衣服到你中意的乾洗店去洗。乾洗店的員工會給你張發票,告訴你什麼時候你的衣服會洗好(這就是一個Future事件)。衣服乾洗的同時,你可以去做其他的事情。Future的另一個優點是它比更底層的Thread更易用。要使用Future,通常你只需要將耗時的操作封裝在一個Callable對像中,再將它提交給ExecutorService,就萬事大吉了。下面這段代碼展示了Java 8之前使用Future的一個例子。

代碼清單11-1 使用Future以異步的方式執行一個耗時的操作

ExecutorService executor = Executors.newCachedThreadPool;    ←─創建Executor-Service,通過它你可以向線程池提交任務
Future<Double> future = executor.submit(new Callable<Double> {    ←─向Executor-Service提交一個Callable對像
        public Double call {
            return doSomeLongComputation;    ←─以異步方式在新的線程中執行耗時的操作
        }});
doSomethingElse;    ←─異步操作進行的同時,你可以做其他的事情

try {
    Double result = future.get(1, TimeUnit.SECONDS);    ←─獲取異步操作的結果,如果最終被阻塞,無法得到結果,那麼在最多等待1秒鐘之後退出
} catch (ExecutionException ee) {
    // 計算拋出一個異常
} catch (InterruptedException ie) {
    // 當前線程在等待過程中被中斷
} catch (TimeoutException te) {
    // 在Future對像完成之前超過已過期
}

  

正像圖11-3介紹的那樣,這種編程方式讓你的線程可以在ExecutorService以並發方式調用另一個線程執行耗時操作的同時,去執行一些其他的任務。接著,如果你已經運行到沒有異步操作的結果就無法繼續任何有意義的工作時,可以調用它的get方法去獲取操作的結果。如果操作已經完成,該方法會立刻返回操作的結果,否則它會阻塞你的線程,直到操作完成,返回相應的結果。

你能想像這種場景存在怎樣的問題嗎?如果該長時間運行的操作永遠不返回了會怎樣?為了處理這種可能性,雖然Future提供了一個無需任何參數的get方法,我們還是推薦大家使用重載版本的get方法,它接受一個超時的參數,通過它,你可以定義你的線程等待Future結果的最長時間,而不是像代碼清單11-1中那樣永無止境地等待下去。

圖 11-3 使用Future以異步方式執行長時間的操作

11.1.1 Future接口的局限性

通過第一個例子,我們知道Future接口提供了方法來檢測異步計算是否已經結束(使用isDone方法),等待異步操作結束,以及獲取計算的結果。但是這些特性還不足以讓你編寫簡潔的並發代碼。比如,我們很難表述Future結果之間的依賴性;從文字描述上這很簡單,“當長時間計算任務完成時,請將該計算的結果通知到另一個長時間運行的計算任務,這兩個計算任務都完成後,將計算的結果與另一個查詢操作結果合併”。但是,使用Future中提供的方法完成這樣的操作又是另外一回事。這也是我們需要更具描述能力的特性的原因,比如下面這些。

  • 將兩個異步計算合併為一個——這兩個異步計算之間相互獨立,同時第二個又依賴於第一個的結果。

  • 等待Future集合中的所有任務都完成。

  • 僅等待Future集合中最快結束的任務完成(有可能因為它們試圖通過不同的方式計算同一個值),並返回它的結果。

  • 通過編程方式完成一個Future任務的執行(即以手工設定異步操作結果的方式)。

  • 應對Future的完成事件(即當Future的完成事件發生時會收到通知,並能使用Future計算的結果進行下一步的操作,不只是簡單地阻塞等待操作的結果)。

這一章中,你會瞭解新的CompletableFuture類(它實現了Future接口)如何利用Java 8的新特性以更直觀的方式將上述需求都變為可能。StreamCompletableFuture的設計都遵循了類似的模式:它們都使用了Lambda表達式以及流水線的思想。從這個角度,你可以說CompletableFutureFuture的關係就跟StreamCollection的關係一樣。

11.1.2 使用CompletableFuture構建異步應用

為了展示CompletableFuture的強大特性,我們會創建一個名為“最佳價格查詢器”(best-price-finder)的應用,它會查詢多個在線商店,依據給定的產品或服務找出最低的價格。這個過程中,你會學到幾個重要的技能。

  • 首先,你會學到如何為你的客戶提供異步API(如果你擁有一間在線商店的話,這是非常有幫助的)。

  • 其次,你會掌握如何讓你使用了同步API的代碼變為非阻塞代碼。你會瞭解如何使用流水線將兩個接續的異步操作合併為一個異步計算操作。這種情況肯定會出現,比如,在線商店返回了你想要購買商品的原始價格,並附帶著一個折扣代碼——最終,要計算出該商品的實際價格,你不得不訪問第二個遠程折扣服務,查詢該折扣代碼對應的折扣比率。

  • 你還會學到如何以響應式的方式處理異步操作的完成事件,以及隨著各個商店返回它的商品價格,最佳價格查詢器如何持續地更新每種商品的最佳推薦,而不是等待所有的商店都返回他們各自的價格(這種方式存在著一定的風險,一旦某家商店的服務中斷,用戶可能遭遇白屏)。

同步API與異步API

同步API其實只是對傳統方法調用的另一種稱呼:你調用了某個方法,調用方在被調用方運行的過程中會等待,被調用方運行結束返回,調用方取得被調用方的返回值並繼續運行。即使調用方和被調用方在不同的線程中運行,調用方還是需要等待被調用方結束運行,這就是阻塞式調用這個名詞的由來。

與此相反,異步API會直接返回,或者至少在被調用方計算完成之前,將它剩餘的計算任務交給另一個線程去做,該線程和調用方是異步的——這就是非阻塞式調用的由來。執行剩餘計算任務的線程會將它的計算結果返回給調用方。返回的方式要麼是通過回調函數,要麼是由調用方再次執行一個“等待,直到計算完成”的方法調用。這種方式的計算在I/O系統程序設計中非常常見:你發起了一次磁盤訪問,這次訪問和你的其他計算操作是異步的,你完成其他的任務時,磁盤塊的數據可能還沒載入到內存,你只需要等待數據的載入完成。

11.2 實現異步API

為了實現最佳價格查詢器應用,讓我們從每個商店都應該提供的API定義入手。首先,商店應該聲明依據指定產品名稱返回價格的方法:

public class Shop {
    public double getPrice(String product) {
        // 待實現
    }
}

  

該方法的內部實現會查詢商店的數據庫,但也有可能執行一些其他耗時的任務,比如聯繫其他外部服務(比如,商店的供應商,或者跟製造商相關的推廣折扣)。我們在本章剩下的內容中,採用delay方法模擬這些長期運行的方法的執行,它會人為地引入1秒鐘的延遲,方法聲明如下。

代碼清單11-2 模擬1秒鐘延遲的方法

public static void delay {
    try {
        Thread.sleep(1000L);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

  

為了介紹本章的內容,getPrice方法會調用delay方法,並返回一個隨機計算的值,代碼清單如下所示。返回隨機計算的價格這段代碼看起來有些取巧。它使用charAt,依據產品的名稱,生成一個隨機值作為價格。

代碼清單11-3 在getPrice方法中引入一個模擬的延遲

public double getPrice(String product) {
    return calculatePrice(product);
}
private double calculatePrice(String product) {
    delay;
    return random.nextDouble * product.charAt(0) + product.charAt(1);
}

  

很明顯,這個API的使用者(這個例子中為最佳價格查詢器)調用該方法時,它依舊會被阻塞。為等待同步事件完成而等待1秒鐘,這是無法接受的,尤其是考慮到最佳價格查詢器對網絡中的所有商店都要重複這種操作。本章接下來的小節中,你會瞭解如何以異步方式使用同步API解決這個問題。但是,出於學習如何設計異步API的考慮,我們會繼續這一節的內容,假裝我們還在深受這一困難的煩擾:你是一個睿智的商店店主,你已經意識到了這種同步API會為你的用戶帶來多麼痛苦的體驗,你希望以異步API的方式重寫這段代碼,讓用戶更流暢地訪問你的網站。

11.2.1 將同步方法轉換為異步方法

為了實現這個目標,你首先需要將 getPrice轉換為getPriceAsync方法,並修改它的返回值:

public Future<Double> getPriceAsync(String product) { ... }

  

我們在本章開頭已經提到,Java 5引入了java.util.concurrent.Future接口表示一個異步計算(即調用線程可以繼續運行,不會因為調用方法而阻塞)的結果。這意味著Future是一個暫時還不可知值的處理器,這個值在計算完成後,可以通過調用它的get方法取得。因為這樣的設計,getPriceAsync方法才能立刻返回,給調用線程一個機會,能在同一時間去執行其他有價值的計算任務。新的CompletableFuture類提供了大量的方法,讓我們有機會以多種可能的方式輕鬆地實現這個方法,比如下面就是這樣一段實現代碼。

代碼清單11-4 getPriceAsync方法的實現

public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>;
    new Thread(  -> {                  ←─創建CompletableFuture對象,它會包含計算的結果
                double price = calculatePrice(product);    ←─在另一個線程中以異步方式執行計算
                futurePrice.complete(price);    ←─需長時間計算的任務結束並得出結果時,設置Future的返回值
    }).start;
    return futurePrice;    ←─無需等待還沒結束的計算,直接返回Future對像
}

  

在這段代碼中,你創建了一個代表異步計算的CompletableFuture對像實例,它在計算完成時會包含計算的結果。接著,你調用fork創建了另一個線程去執行實際的價格計算工作,不等該耗時計算任務結束,直接返回一個Future實例。當請求的產品價格最終計算得出時,你可以使用它的complete方法,結束completableFuture對象的運行,並設置變量的值。很顯然,這個新版Future的名稱也解釋了它所具有的特性。使用這個API的客戶端,可以通過下面的這段代碼對其進行調用。

代碼清單11-5 使用異步API

Shop shop = new Shop("BestShop");
long start = System.nanoTime;
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");    ←─查詢商店,試圖取得商品的價格
long invocationTime = ((System.nanoTime - start) / 1_000_000);
System.out.println("Invocation returned after " + invocationTime
                                                + " msecs");
// 執行更多任務,比如查詢其他商店
doSomethingElse;
// 在計算商品價格的同時
try {
    double price = futurePrice.get;    ←─從Future對像中讀取價格,如果價格未知,會發生阻塞
    System.out.printf("Price is %.2f%n", price);
} catch (Exception e) {
    throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");

  

我們看到這段代碼中,客戶向商店查詢了某種商品的價格。由於商店提供了異步API,該次調用立刻返回了一個Future對象,通過該對像客戶可以在將來的某個時刻取得商品的價格。這種方式下,客戶在進行商品價格查詢的同時,還能執行一些其他的任務,比如查詢其他家商店中商品的價格,不會呆呆地阻塞在那裡等待第一家商店返回請求的結果。最後,如果所有有意義的工作都已經完成,客戶所有要執行的工作都依賴於商品價格時,再調用Futureget方法。執行了這個操作後,客戶要麼獲得Future中封裝的值(如果異步任務已經完成),要麼發生阻塞,直到該異步任務完成,期望的值能夠訪問。代碼清單11-5產生的輸出可能是下面這樣:

Invocation returned after 43 msecs
Price is 123.26
Price returned after 1045 msecs

  

你一定已經發現getPriceAsync方法的調用返回遠遠早於最終價格計算完成的時間。在11.4節中,你還會知道我們有可能避免發生客戶端被阻塞的風險。實際上這非常簡單,Future執行完畢可以發送一個通知,僅在計算結果可用時執行一個由Lambda表達式或者方法引用定義的回調函數。不過,我們當下不會對此進行討論,現在我們要解決的是另一個問題:如何正確地管理異步任務執行過程中可能出現的錯誤。

11.2.2 錯誤處理

如果沒有意外,我們目前開發的代碼工作得很正常。但是,如果價格計算過程中產生了錯誤會怎樣呢?非常不幸,這種情況下你會得到一個相當糟糕的結果:用於提示錯誤的異常會被限制在試圖計算商品價格的當前線程的範圍內,最終會殺死該線程,而這會導致等待get方法返回結果的客戶端永久地被阻塞。

客戶端可以使用重載版本的get方法,它使用一個超時參數來避免發生這樣的情況。這是一種值得推薦的做法,你應該盡量在你的代碼中添加超時判斷的邏輯,避免發生類似的問題。使用這種方法至少能防止程序永久地等待下去,超時發生時,程序會得到通知發生了TimeoutException。不過,也因為如此,你不會有機會發現計算商品價格的線程內到底發生了什麼問題才引發了這樣的失效。為了讓客戶端能瞭解商店無法提供請求商品價格的原因,你需要使用CompletableFuturecompleteExceptionally方法將導致CompletableFuture內發生問題的異常拋出。對代碼清單11-4優化後的結果如下所示。

代碼清單11-6 拋出CompletableFuture內的異常

public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>;
    new Thread(  -> {
                try {
                    double price = calculatePrice(product);
                    futurePrice.complete(price);    ←─如果價格計算正常結束,完成Future操作並設置商品價格
                } catch (Exception ex) {
                    futurePrice.completeExceptionally(ex);    ←─否則就拋出導致失敗的異常,完成這次Future操作
                }
    }).start;
    return futurePrice;
}

  

客戶端現在會收到一個ExecutionException異常,該異常接收了一個包含失敗原因的Exception參數,即價格計算方法最初拋出的異常。所以,舉例來說,如果該方法拋出了一個運行時異常“product not available”,客戶端就會得到像下面這樣一段ExecutionException

java.util.concurrent.ExecutionException: java.lang.RuntimeException: product
     not available
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237)
    at lambdasinaction.chap11.AsyncShopClient.main(AsyncShopClient.java:14)
    ... 5 more
Caused by: java.lang.RuntimeException: product not available
    at lambdasinaction.chap11.AsyncShop.calculatePrice(AsyncShop.java:36)
    at lambdasinaction.chap11.AsyncShop.lambda$getPrice$0(AsyncShop.java:23)
    at lambdasinaction.chap11.AsyncShop$$Lambda$1/24071475.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:744)

  

使用工廠方法supplyAsync創建CompletableFuture

目前為止我們已經瞭解了如何通過編程創建CompletableFuture對像以及如何獲取返回值,雖然看起來這些操作已經比較方便,但還有進一步提升的空間,CompletableFuture類自身提供了大量精巧的工廠方法,使用這些方法能更容易地完成整個流程,還不用擔心實現的細節。比如,採用supplyAsync方法後,你可以用一行語句重寫代碼清單11-4中的getPriceAsync方法,如下所示。

代碼清單11-7 使用工廠方法supplyAsync創建CompletableFuture對像

public Future<Double> getPriceAsync(String product) {
    return CompletableFuture.supplyAsync( -> calculatePrice(product));
}

  

supplyAsync方法接受一個生產者(Supplier)作為參數,返回一個CompletableFuture對象,該對像完成異步執行後會讀取調用生產者方法的返回值。生產者方法會交由ForkJoinPool池中的某個執行線程(Executor)運行,但是你也可以使用supplyAsync方法的重載版本,傳遞第二個參數指定不同的執行線程執行生產者方法。一般而言,向CompletableFuture的工廠方法傳遞可選參數,指定生產者方法的執行線程是可行的,在11.3.4節中,你會使用這一能力,我們會在該小節介紹如何使用適合你應用特性的執行線程改善程序的性能。

此外,代碼清單11-7中getPriceAsync方法返回的CompletableFuture對像和代碼清單11-6中你手工創建和完成的CompletableFuture對象是完全等價的,這意味著它提供了同樣的錯誤管理機制,而前者你花費了大量的精力才得以構建。

本章的剩餘部分中,我們會假設你非常不幸,無法控制Shop類提供API的具體實現,最終提供給你的API都是同步阻塞式的方法。這也是當你試圖使用服務提供的HTTP API時最常發生的情況。你會學到如何以異步的方式查詢多個商店,避免被單一的請求所阻塞,並由此提升你的“最佳價格查詢器”的性能和吞吐量。

11.3 讓你的代碼免受阻塞之苦

所以,你已經被要求進行“最佳價格查詢器”應用的開發了,不過你需要查詢的所有商店都如11.2節開始時介紹的那樣,只提供了同步API。換句話說,你有一個商家的列表,如下所示:

List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
                                 new Shop("LetsSaveBig"),
                                 new Shop("MyFavoriteShop"),
                                 new Shop("BuyItAll"));

  

你需要使用下面這樣的簽名實現一個方法,它接受產品名作為參數,返回一個字符串列表,這個字符串列表中包括商店的名稱、該商店中指定商品的價格:

public List<String> findPrices(String product);

  

你的第一個想法可能是使用我們在第4、5、6章中學習的Stream特性。你可能試圖寫出類似下面這個清單中的代碼(是的,作為第一個方案,如果你想到這些已經相當棒了!)。

代碼清單11-8 採用順序查詢所有商店的方式實現的findPrices方法

public List<String> findPrices(String product) {
    return shops.stream
        .map(shop -> String.format("%s price is %.2f",
                                   shop.getName, shop.getPrice(product)))
        .collect(toList);
}

  

好吧,這段代碼看起來非常直白。現在試著用該方法去查詢你最近這些天瘋狂著迷的唯一產品(是的,你已經猜到了,它就是myPhone27S)。此外,也請記錄下方法的執行時間,通過這些數據,我們可以比較優化之後的方法會帶來多大的性能提升,具體的代碼清單如下。

代碼清單11-9 驗證findPrices的正確性和執行性能

long start = System.nanoTime;
System.out.println(findPrices("myPhone27S"));
long duration = (System.nanoTime - start) / 1_000_000;
System.out.println("Done in " + duration + " msecs");

  

代碼清單11-9的運行結果輸出如下:

[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
     is 214.13, BuyItAll price is 184.74]
Done in 4032 msecs

  

正如你預期的,findPrices方法的執行時間僅比4秒鐘多了那麼幾毫秒,因為對這4個商店的查詢是順序進行的,並且一個查詢操作會阻塞另一個,每一個操作都要花費大約1秒左右的時間計算請求商品的價格。你怎樣才能改進這個結果呢?

11.3.1 使用並行流對請求進行並行操作

讀完第7章,你應該想到的第一個,可能也是最快的改善方法是使用並行流來避免順序計算,如下所示。

代碼清單11-10 對findPrices進行並行操作

public List<String> findPrices(String product) {
    return shops.parallelStream          ←─使用並行流並行地從不同的商店獲取價格
        .map(shop -> String.format("%s price is %.2f",
                                   shop.getName, shop.getPrice(product)))
        .collect(toList);
}

  

運行代碼,與代碼清單11-9的執行結果相比較,你發現了新版findPrices的改進了吧。

[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
     is 214.13, BuyItAll price is 184.74]
Done in 1180 msecs

  

相當不錯啊!看起來這是個簡單但有效的主意:現在對四個不同商店的查詢實現了並行,所以完成所有操作的總耗時只有1秒多一點兒。你能做得更好嗎?讓我們嘗試使用剛學過的CompletableFuture,將findPrices方法中對不同商店的同步調用替換為異步調用。

11.3.2 使用CompletableFuture發起異步請求

你已經知道我們可以使用工廠方法supplyAsync創建CompletableFuture對象。讓我們把它利用起來:

List<CompletableFuture<String>> priceFutures =
        shops.stream
        .map(shop -> CompletableFuture.supplyAsync(
              -> String.format("%s price is %.2f",
             shop.getName, shop.getPrice(product))))
        .collect(toList);

 

使用這種方式,你會得到一個List<CompletableFuture<String>>,列表中的每個CompletableFuture對像在計算完成後都包含商店的String類型的名稱。但是,由於你用CompletableFutures實現的findPrices方法要求返回一個List<String>,你需要等待所有的future執行完畢,將其包含的值抽取出來,填充到列表中才能返回。

為了實現這個效果,你可以向最初的List<CompletableFuture<String>>施加第二個map操作,對List中的所有future對像執行join操作,一個接一個地等待它們運行結束。注意CompletableFuture類中的join方法和Future接口中的get有相同的含義,並且也聲明在Future接口中,它們唯一的不同是join不會拋出任何檢測到的異常。使用它你不再需要使用 try/catch語句塊讓你傳遞給第二個map方法的Lambda表達式變得過於臃腫。所有這些整合在一起,你就可以重新實現findPrices了,具體代碼如下。

代碼清單11-11 使用CompletableFuture實現findPrices方法

public List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures =
            shops.stream
            .map(shop -> CompletableFuture.supplyAsync(    ←─使用CompletableFuture以異步方式計算每種商品的價格
                          -> shop.getName + " price is " +
                               shop.getPrice(product)))
            .collect(Collectors.toList);

    return priceFutures.stream
            .map(CompletableFuture::join)    ←─等待所有異步操作結束
            .collect(toList);
}

  

注意到了嗎?這裡使用了兩個不同的Stream流水線,而不是在同一個處理流的流水線上一個接一個地放置兩個map操作——這其實是有緣由的。考慮流操作之間的延遲特性,如果你在單一流水線中處理流,發向不同商家的請求只能以同步、順序執行的方式才會成功。因此,每個創建CompletableFuture對像只能在前一個操作結束之後執行查詢指定商家的動作、通知join方法返回計算結果。圖11-4解釋了這些重要的細節。

圖 11-4 為什麼Stream的延遲特性會引起順序執行,以及如何避免

圖11-4的上半部分展示了使用單一流水線處理流的過程,我們看到,執行的流程(以虛線標識)是順序的。事實上,新的CompletableFuture對像只有在前一個操作完全結束之後,才能創建。與此相反,圖的下半部分展示了如何先將CompletableFutures對像聚集到一個列表中(即圖中以橢圓表示的部分),讓對像們可以在等待其他對像完成操作之前就能啟動。

運行代碼清單11-11中的代碼來瞭解下第三個版本findPrices方法的性能,你會得到下面這幾行輸出:

[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
     is 214.13, BuyItAll price is 184.74]
Done in 2005 msecs

  

這個結果讓人相當失望,不是嗎?超過2秒意味著利用CompletableFuture實現的版本,比剛開始代碼清單11-8中原生順序執行且會發生阻塞的版本快。但是它的用時也差不多是使用並行流的前一個版本的兩倍。尤其是,考慮到從順序執行的版本轉換到並行流的版本只做了非常小的改動,就讓人更加沮喪。

與此形成鮮明對比的是,我們為採用CompletableFutures完成的新版方法做了大量的工作!但,這就是全部的真相嗎?這種場景下使用CompletableFutures真的是浪費時間嗎?或者我們可能漏掉了某些重要的東西?繼續往下探究之前,讓我們休息幾分鐘,尤其是想想你測試代碼的機器是否足以以並行方式運行四個線程。1

1如果你使用的機器足夠強大,能以並行方式運行更多的線程(比如說8個線程),那你需要使用更多的商店和並行進程,才能重現這幾頁中介紹的行為。

11.3.3 尋找更好的方案

並行流的版本工作得非常好,那是因為它能並行地執行四個任務,所以它幾乎能為每個商家分配一個線程。但是,如果你想要增加第五個商家到商店列表中,讓你的“最佳價格查詢”應用對其進行處理,這時會發生什麼情況?毫不意外,順序執行版本的執行還是需要大約5秒多鐘的時間,下面是執行的輸出:

[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
     is 214.13, BuyItAll price is 184.74, ShopEasy price is 176.08]
Done in 5025 msecs            ←─使用順序流方式的程序輸出

  

非常不幸,並行流版本的程序這次比之前也多消耗了差不多1秒鐘的時間,因為可以並行運行(通用線程池中處於可用狀態的)的四個線程現在都處於繁忙狀態,都在對前4個商店進行查詢。第五個查詢只能等到前面某一個操作完成釋放出空閒線程才能繼續,它的運行結果如下:

[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
     is 214.13, BuyItAll price is 184.74, ShopEasy price is 176.08]
Done in 2177 msecs          ←─使用並行流方式的程序輸出

  

CompletableFuture版本的程序結果如何呢?我們也試著添加第5個商店對其進行了測試,結果如下:

[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
     is 214.13, BuyItAll price is 184.74, ShopEasy price is 176.08]
Done in 2006 msecs                   ←─使用CompletableFuture的程序輸出

  

CompletableFuture版本的程序似乎比並行流版本的程序還快那麼一點兒。但是最後這個版本也不太令人滿意。比如,如果你試圖讓你的代碼處理9個商店,並行流版本耗時3143毫秒,而CompletableFuture版本耗時3009毫秒。它們看起來不相伯仲,究其原因都一樣:它們內部採用的是同樣的通用線程池,默認都使用固定數目的線程,具體線程數取決於Runtime.getRuntime.availableProcessors的返回值。然而,CompletableFuture具有一定的優勢,因為它允許你對執行器(Executor)進行配置,尤其是線程池的大小,讓它以更適合應用需求的方式進行配置,滿足程序的要求,而這是並行流API無法提供的。讓我們看看你怎樣利用這種配置上的靈活性帶來實際應用程序性能上的提升。

11.3.4 使用定制的執行器

就這個主題而言,明智的選擇似乎是創建一個配有線程池的執行器,線程池中線程的數目取決於你預計你的應用需要處理的負荷,但是你該如何選擇合適的線程數目呢?

調整線程池的大小

《Java並發編程實戰》(http://mng.bz/979c)一書中,Brian Goetz和合著者們為線程池大小的優化提供了不少中肯的建議。這非常重要,如果線程池中線程的數量過多,最終它們會競爭稀缺的處理器和內存資源,浪費大量的時間在上下文切換上。反之,如果線程的數目過少,正如你的應用所面臨的情況,處理器的一些核可能就無法充分利用。Brian Goetz建議,線程池大小與處理器的利用率之比可以使用下面的公式進行估算:

Nthreads = NCPU * UCPU * (1 + W/C)

其中:

  • NCPU是處理器的核的數目,可以通過Runtime.getRuntime.availableProcessors得到

  • UCPU是期望的CPU利用率(該值應該介於0和1之間)

  • W/C是等待時間與計算時間的比率

你的應用99%的時間都在等待商店的響應,所以估算出的W/C比率為100。這意味著如果你期望的CPU利用率是100%,你需要創建一個擁有400個線程的線程池。實際操作中,如果你創建的線程數比商店的數目更多,反而是一種浪費,因為這樣做之後,你線程池中的有些線程根本沒有機會被使用。出於這種考慮,我們建議你將執行器使用的線程數,與你需要查詢的商店數目設定為同一個值,這樣每個商店都應該對應一個服務線程。不過,為了避免發生由於商店的數目過多導致服務器超負荷而崩潰,你還是需要設置一個上限,比如100個線程。代碼清單如下所示。

代碼清單11-12 為“最優價格查詢器”應用定制的執行器

private final Executor executor =
        Executors.newFixedThreadPool(Math.min(shops.size, 100),    ←─創建一個線程池,線程池中線程的數目為100和商店數目二者中較小的一個值
                                     new ThreadFactory {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);    ←─使用守護線程——這種方式不會阻止程序的關停
                return t;
            }
});

  

注意,你現在正創建的是一個由守護線程構成的線程池。Java程序無法終止或者退出一個正在運行中的線程,所以最後剩下的那個線程會由於一直等待無法發生的事件而引發問題。與此相反,如果將線程標記為守護進程,意味著程序退出時它也會被回收。這二者之間沒有性能上的差異。現在,你可以將執行器作為第二個參數傳遞給supplyAsync工廠方法了。比如,你現在可以按照下面的方式創建一個可查詢指定商品價格的CompletableFuture對像:

CompletableFuture.supplyAsync( -> shop.getName + " price is " +
                                    shop.getPrice(product), executor);

  

改進之後,使用CompletableFuture方案的程序處理5個商店僅耗時1021秒,處理9個商店時耗時1022秒。一般而言,這種狀態會一直持續,直到商店的數目達到我們之前計算的閾值400。這個例子證明了要創建更適合你的應用特性的執行器,利用CompletableFutures向其提交任務執行是個不錯的主意。處理需大量使用異步操作的情況時,這幾乎是最有效的策略。

並行——使用流還是CompletableFutures

目前為止,你已經知道對集合進行並行計算有兩種方式:要麼將其轉化為並行流,利用map這樣的操作開展工作,要麼枚舉出集合中的每一個元素,創建新的線程,在CompletableFuture內對其進行操作。後者提供了更多的靈活性,你可以調整線程池的大小,而這能幫助你確保整體的計算不會因為線程都在等待I/O而發生阻塞。

我們對使用這些API的建議如下。

  • 如果你進行的是計算密集型的操作,並且沒有I/O,那麼推薦使用Stream接口,因為實現簡單,同時效率也可能是最高的(如果所有的線程都是計算密集型的,那就沒有必要創建比處理器核數更多的線程)。

  • 反之,如果你並行的工作單元還涉及等待I/O的操作(包括網絡連接等待),那麼使用CompletableFuture靈活性更好,你可以像前文討論的那樣,依據等待/計算,或者W/C的比率設定需要使用的線程數。這種情況不使用並行流的另一個原因是,處理流的流水線中如果發生I/O等待,流的延遲特性會讓我們很難判斷到底什麼時候觸發了等待。

現在你已經瞭解了如何利用CompletableFuture為你的用戶提供異步API,以及如何將一個同步又緩慢的服務轉換為異步的服務。不過到目前為止,我們每個Future中進行的都是單次的操作。下一節中,你會看到如何將多個異步操作結合在一起,以流水線的方式運行,從描述形式上,它與你在前面學習的Stream API有幾分類似。

11.4 對多個異步任務進行流水線操作

讓我們假設所有的商店都同意使用一個集中式的折扣服務。該折扣服務提供了五個不同的折扣代碼,每個折扣代碼對應不同的折扣率。你使用一個枚舉型變量Discount.Code來實現這一想法,具體代碼如下所示。

代碼清單11-13 以枚舉類型定義的折扣代碼

public class Discount {
    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }
    // Discount類的具體實現這裡暫且不表示,參見代碼清單11-14
}

  

我們還假設所有的商店都同意修改getPrice方法的返回格式。getPrice現在以Shop-Name:price:DiscountCode的格式返回一個String類型的值。我們的示例實現中會返回一個隨機生成的Discount.Code,以及已經計算得出的隨機價格:

public String getPrice(String product) {
    double price = calculatePrice(product);
    Discount.Code code = Discount.Code.values[
                            random.nextInt(Discount.Code.values.length)];
    return String.format("%s:%.2f:%s", name, price, code);
}
private double calculatePrice(String product) {
    delay;
    return random.nextDouble * product.charAt(0) + product.charAt(1);
}

  

調用getPrice方法可能會返回像下面這樣一個String值:

BestPrice:123.26:GOLD

  

11.4.1 實現折扣服務

你的“最佳價格查詢器”應用現在能從不同的商店取得商品價格,解析結果字符串,針對每個字符串,查詢折扣服務取的折扣代碼2。這個流程決定了請求商品的最終折扣價格(每個折扣代碼的實際折扣比率有可能發生變化,所以你每次都需要查詢折扣服務)。我們已經將對商店返回字符串的解析操作封裝到了下面的 Quote類之中:

2原文為for each String, query the discount server's needs,此處在上下文中略有不通,疑為原文有誤。——譯者注

public class Quote {

    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code code) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = code;
    }

    public static Quote parse(String s) {
        String split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }

    public String getShopName { return shopName; }
    public double getPrice { return price; }
    public Discount.Code getDiscountCode { return discountCode; }
}

  

通過傳遞shop對像返回的字符串給靜態工廠方法parse,你可以得到Quote類的一個實例,它包含了shop的名稱、折扣之前的價格,以及折扣代碼。

Discount服務還提供了一個applyDiscount方法,它接收一個Quote對象,返回一個字符串,表示生成該Quoteshop中的折扣價格,代碼如下所示。

代碼清單11-14 Discount服務

public class Discount {
    public enum Code {
        // 源碼暫時省略……
    }

    public static String applyDiscount(Quote quote) {
        return quote.getShopName + " price is " +
               Discount.apply(quote.getPrice,    ←─將折扣代碼應用於商品最初的原始價格
                              quote.getDiscountCode);
    }
    private static double apply(double price, Code code) {
        delay;                                              ←─模擬Discount服務響應的延遲
        return format(price * (100 - code.percentage) / 100);
    }
}

  

11.4.2 使用Discount服務

由於Discount服務是一種遠程服務,你還需要增加1秒鐘的模擬延遲,代碼如下所示。和在11.3節中一樣,首先嘗試以最直接的方式(壞消息是,這種方式是順序而且同步執行的)重新實現findPrices,以滿足這些新增的需求。

代碼清單11-15 以最簡單的方式實現使用Discount服務的findPrices方法

public List<String> findPrices(String product) {
    return shops.stream
            .map(shop -> shop.getPrice(product))    ←─取得每個shop對像中商品的原始價格
            .map(Quote::parse)            ←─在Quote 對像中對shop返回的字符串進行轉換
            .map(Discount::applyDiscount)    ←─聯繫Discount服務,為每個Quote申請折扣
            .collect(toList);
}

  

通過在shop構成的流上採用流水線方式執行三次map操作,我們得到了期望的結果。

  • 第一個操作將每個shop對像轉換成了一個字符串,該字符串包含了該 shop中指定商品的價格和折扣代碼。

  • 第二個操作對這些字符串進行瞭解析,在Quote對像中對它們進行轉換。

  • 最終,第三個map會操作聯繫遠程的Discount服務,計算出最終的折扣價格,並返回該價格及提供該價格商品的shop

你可能已經猜到,這種實現方式的性能遠非最優,不過我們還是應該測量一下。跟之前一樣,通過運行基準測試,我們得到下面的數據:

[BestPrice price is 110.93, LetsSaveBig price is 135.58, MyFavoriteShop price
     is 192.72, BuyItAll price is 184.74, ShopEasy price is 167.28]
Done in 10028 msecs

  

毫無意外,這次執行耗時10秒,因為順序查詢5個商店耗時大約5秒,現在又加上了Discount服務為5個商店返回的價格申請折扣所消耗的5秒鐘。你已經知道,把流轉換為並行流的方式,非常容易提升該程序的性能。不過,通過11.3節的介紹,你也知道這一方案在商店的數目增加時,擴展性不好,因為Stream底層依賴的是線程數量固定的通用線程池。相反,你也知道,如果自定義CompletableFutures調度任務執行的執行器能夠更充分地利用CPU資源。

11.4.3 構造同步和異步操作

讓我們再次使用CompletableFuture提供的特性,以異步方式重新實現findPrices方法。詳細代碼如下所示。如果你發現有些內容不太熟悉,不用太擔心,我們很快會進行針對性的介紹。

代碼清單11-16 使用CompletableFuture實現findPrices方法

public List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures =
        shops.stream
             .map(shop -> CompletableFuture.supplyAsync(    ←─以異步方式取得每個shop中指定產品的原始價格
                                    -> shop.getPrice(product), executor))
             .map(future -> future.thenApply(Quote::parse))    ←─Quote對像存在時,對其返回的值進行轉換
             .map(future -> future.thenCompose(quote ->    ←─使用另一個異步任務構造期望的Future,申請折扣
                         CompletableFuture.supplyAsync(
                            -> Discount.applyDiscount(quote), executor)))
                .collect(toList);

    return priceFutures.stream
            .map(CompletableFuture::join)    ←─等待流中的所有Future執行完畢,並提取各自的返回值
            .collect(toList);
}

  

這一次,事情看起來變得更加複雜了,所以讓我們一步一步地理解到底發生了什麼。這三次轉換的流程如圖11-5所示。

圖 11-5 構造同步操作和異步任務

你所進行的這三次map操作和代碼清單11-5中的同步方案沒有太大的區別,不過你使用CompletableFuture類提供的特性,在需要的地方把它們變成了異步操作。

1. 獲取價格

這三個操作中的第一個你已經在本章的各個例子中見過很多次,只需要將Lambda表達式作為參數傳遞給supplyAsync工廠方法就可以以異步方式對shop進行查詢。第一個轉換的結果是一個Stream<CompletableFuture<String>>,一旦運行結束,每個CompletableFuture對像中都會包含對應shop返回的字符串。注意,你對CompletableFuture進行了設置,用代碼清單11-12中的方法向其傳遞了一個訂製的執行器Executor

2. 解析報價

現在你需要進行第二次轉換將字符串轉變為訂單。由於一般情況下解析操作不涉及任何遠程服務,也不會進行任何I/O操作,它幾乎可以在第一時間進行,所以能夠採用同步操作,不會帶來太多的延遲。由於這個原因,你可以對第一步中生成的CompletableFuture對像調用它的thenApply,將一個由字符串轉換Quote的方法作為參數傳遞給它。

注意到了嗎?直到你調用的CompletableFuture執行結束,使用的thenApply方法都不會阻塞你代碼的執行。這意味著CompletableFuture最終結束運行時,你希望傳遞Lambda表達式給thenApply方法,將Stream中的每個CompletableFuture<String>對像轉換為對應的CompletableFuture<Quote>對象。你可以把這看成是為處理CompletableFuture的結果建立了一個菜單,就像你曾經為Stream的流水線所做的事兒一樣。

3. 為計算折扣價格構造Future

第三個map操作涉及聯繫遠程的Discount服務,為從商店中得到的原始價格申請折扣率。這一轉換與前一個轉換又不大一樣,因為這一轉換需要遠程執行(或者,就這個例子而言,它需要模擬遠程調用帶來的延遲),出於這一原因,你也希望它能夠異步執行。

為了實現這一目標,你像第一個調用傳遞getPricesupplyAsync那樣,將這一操作以Lambda表達式的方式傳遞給了supplyAsync工廠方法,該方法最終會返回另一個CompletableFuture對象。到目前為止,你已經進行了兩次異步操作,用了兩個不同的CompletableFutures對像進行建模,你希望能把它們以級聯的方式串接起來進行工作。

  • shop對像中獲取價格,接著把價格轉換為Quote

  • 拿到返回的Quote對象,將其作為參數傳遞給Discount服務,取得最終的折扣價格。

Java 8的 CompletableFuture API提供了名為thenCompose的方法,它就是專門為這一目的而設計的,thenCompose方法允許你對兩個異步操作進行流水線,第一個操作完成時,將其結果作為參數傳遞給第二個操作。換句話說,你可以創建兩個CompletableFutures對象,對第一個CompletableFuture對像調用thenCompose,並向其傳遞一個函數。當第一個CompletableFuture執行完畢後,它的結果將作為該函數的參數,這個函數的返回值是以第一個CompletableFuture的返回做輸入計算出的第二個CompletableFuture對象。使用這種方式,即使Future在向不同的商店收集報價,主線程還是能繼續執行其他重要的操作,比如響應UI事件。

將這三次map操作的返回的Stream元素收集到一個列表,你就得到了一個List<CompletableFuture<String>>,等這些CompletableFuture對像最終執行完畢,你就可以像代碼清單11-11中那樣利用join取得它們的返回值。代碼清單11-18實現的新版findPrices方法產生的輸出如下:

[BestPrice price is 110.93, LetsSaveBig price is 135.58, MyFavoriteShop price
     is 192.72, BuyItAll price is 184.74, ShopEasy price is 167.28]
Done in 2035 msecs

  

你在代碼清單11-16中使用的thenCompose方法像CompletableFuture類中的其他方法一樣,也提供了一個以Async後綴結尾的版本thenComposeAsync。通常而言,名稱中不帶Async的方法和它的前一個任務一樣,在同一個線程中運行;而名稱以Async結尾的方法會將後續的任務提交到一個線程池,所以每個任務是由不同的線程處理的。就這個例子而言,第二個CompletableFuture對象的結果取決於第一個CompletableFuture,所以無論你使用哪個版本的方法來處理CompletableFuture對象,對於最終的結果,或者大致的時間而言都沒有多少差別。我們選擇thenCompose方法的原因是因為它更高效一些,因為少了很多線程切換的開銷。

11.4.4 將兩個CompletableFuture對像整合起來,無論它們是否存在依賴

代碼清單11-16中,你對一個CompletableFuture對像調用了thenCompose方法,並向其傳遞了第二個CompletableFuture,而第二個CompletableFuture又需要使用第一個CompletableFuture的執行結果作為輸入。但是,另一種比較常見的情況是,你需要將兩個完全不相干的CompletableFuture對象的結果整合起來,而且你也不希望等到第一個任務完全結束才開始第二項任務。

這種情況,你應該使用thenCombine方法,它接收名為BiFunction的第二參數,這個參數定義了當兩個CompletableFuture對像完成計算後,結果如何合併。同thenCompose方法一樣,thenCombine方法也提供有一個Async的版本。這裡,如果使用thenCombineAsync會導致BiFunction中定義的合併操作被提交到線程池中,由另一個任務以異步的方式執行。

回到我們正在運行的這個例子,你知道,有一家商店提供的價格是以歐元(EUR)計價的,但是你希望以美元的方式提供給你的客戶。你可以用異步的方式向商店查詢指定商品的價格,同時從遠程的匯率服務那裡查到歐元和美元之間的匯率。當二者都結束時,再將這兩個結果結合起來,用返回的商品價格乘以當時的匯率,得到以美元計價的商品價格。用這種方式,你需要使用第三個CompletableFuture對象,當前兩個CompletableFuture計算出結果,並由BiFunction方法完成合併後,由它來最終結束這一任務,代碼清單如下所示。

代碼清單11-17 合併兩個獨立的CompletableFuture對像

Future<Double> futurePriceInUSD =
        CompletableFuture.supplyAsync( -> shop.getPrice(product))    ←─創建第一個任務查詢商店取得商品的價格
        .thenCombine(
            CompletableFuture.supplyAsync(
                 ->  exchangeService.getRate(Money.EUR, Money.USD)),    ←─創建第二個獨立任務,查詢美元和歐元之間的轉換匯率
            (price, rate) -> price * rate    ←─通過乘法整合得到的商品價格和匯率
        );

  

這裡整合的操作只是簡單的乘法操作,用另一個單獨的任務對其進行操作有些浪費資源,所以你只要使用thenCombine方法,無需特別求助於異步版本的thenCombineAsync方法。圖11-6展示了代碼清單11-17中創建的多個任務是如何在線程池中選擇不同的線程執行的,以及它們最終的運行結果又是如何整合的。