讀古今文學網 > Netty實戰 > 第7章 EventLoop和線程模型 >

第7章 EventLoop和線程模型

本章主要內容

  • 線程模型概述
  • 事件循環的概念和實現
  • 任務調度
  • 實現細節

簡單地說,線程模型指定了操作系統、編程語言、框架或者應用程序的上下文中的線程管理的關鍵方面。顯而易見地,如何以及何時創建線程將對應用程序代碼的執行產生顯著的影響,因此開發人員需要理解與不同模型相關的權衡。無論是他們自己選擇模型,還是通過採用某種編程語言或者框架隱式地獲得它,這都是真實的。

在本章中,我們將詳細地探討Netty的線程模型。它強大但又易用,並且和Netty的一貫宗旨一樣,旨在簡化你的應用程序代碼,同時最大限度地提高性能和可維護性。我們還將討論致使選擇當前線程模型的經驗。

如果你對Java的並發API(java.util.concurrent)有比較好的理解,那麼你應該會發現在本章中的討論都是直截了當的。如果這些概念對你來說還比較陌生,或者你需要更新自己的相關知識,那麼由Brian Goetz等編寫的《Java並發編程實戰》 (Addison-Wesley Professional,2006)這本書將是極好的資源。

7.1 線程模型概述

在這一節中,我們將介紹常見的線程模型,隨後將繼續討論Netty過去以及當前的線程模型,並評審它們各自的優點以及局限性。

正如我們在本章開頭所指出的,線程模型確定了代碼的執行方式。由於我們總是必須規避並發執行可能會帶來的副作用,所以理解所採用的並發模型(也有單線程的線程模型)的影響很重要。忽略這些問題,僅寄希望於最好的情況(不會引發並發問題)無疑是賭博——賠率必然會擊敗你。

因為具有多核心或多個CPU的計算機現在已經司空見慣,大多數的現代應用程序都利用了複雜的多線程處理技術以有效地利用系統資源。相比之下,在早期的Java語言中,我們使用多線程處理的主要方式無非是按需創建和啟動新的Thread來執行並發的任務單元——一種在高負載下工作得很差的原始方式。Java 5隨後引入了ExecutorAPI,其線程池通過緩存和重用Thread極大地提高了性能。

基本的線程池化模式可以描述為:

  • 從池的空閒線程列表中選擇一個Thread,並且指派它去運行一個已提交的任務(一個Runnable的實現);
  • 當任務完成時,將該Thread返回給該列表,使其可被重用。

圖7-1說明了這個模式。

圖7-1 Executor的執行邏輯

雖然池化和重用線程相對於簡單地為每個任務都創建和銷毀線程是一種進步,但是它並不能消除由上下文切換所帶來的開銷,其將隨著線程數量的增加很快變得明顯,並且在高負載下愈演愈烈。此外,僅僅由於應用程序的整體複雜性或者並發需求,在項目的生命週期內也可能會出現其他和線程相關的問題。

簡而言之,多線程處理是很複雜的。在接下來的章節中,我們將會看到Netty是如何幫助簡化它的。

7.2 EventLoop接口

運行任務來處理在連接的生命週期內發生的事件是任何網絡框架的基本功能。與之相應的編程上的構造通常被稱為事件循環——一個Netty使用了interface io.netty.channel. EventLoop來適配的術語。

代碼清單7-1中說明了事件循環的基本思想,其中每個任務都是一個Runnable的實例(如圖7-1所示)。

代碼清單7-1 在事件循環中執行任務

while (!terminated) {
 List<Runnable> readyEvents = blockUntilEventsReady;  ← --  阻塞,直到有事件已經就緒可被運行
  for (Runnable ev: readyEvents) {
   ev.run;  ← --  循環遍歷,並處理所有的事件
  }
}  

Netty的EventLoop是協同設計的一部分,它採用了兩個基本的API:並發和網絡編程。首先,io.netty.util.concurrent包構建在JDK的java.util.concurrent包上,用來提供線程執行器。其次,io.netty.channel包中的類,為了與Channel的事件進行交互,擴展了這些接口/類。圖7-2展示了生成的類層次結構。

圖7-2 EventLoop的類層次結構

在這個模型中,一個EventLoop將由一個永遠都不會改變的Thread驅動,同時任務(Runnable或者Callable)可以直接提交給EventLoop實現,以立即執行或者調度執行。根據配置和可用核心的不同,可能會創建多個EventLoop實例用以優化資源的使用,並且單個EventLoop可能會被指派用於服務多個Channel

需要注意的是,Netty的EventLoop在繼承了ScheduledExecutorService的同時,只定義了一個方法,parent[1]。這個方法,如下面的代碼片斷所示,用於返回到當前EventLoop實現的實例所屬的EventLoopGroup的引用。

public interface EventLoop extends EventExecutor, EventLoopGroup {
  @Override
  EventLoopGroup parent;
}  

事件/任務的執行順序 事件和任務是以先進先出(FIFO)的順序執行的。這樣可以通過保證字節內容總是按正確的順序被處理,消除潛在的數據損壞的可能性。

7.2.1 Netty 4中的I/O和事件處理

正如我們在第6章中所詳細描述的,由I/O操作觸發的事件將流經安裝了一個或者多個ChannelHandlerChannelPipeline。傳播這些事件的方法調用可以隨後被Channel- Handler所攔截,並且可以按需地處理事件。

事件的性質通常決定了它將被如何處理;它可能將數據從網絡棧中傳遞到你的應用程序中,或者進行逆向操作,或者執行一些截然不同的操作。但是事件的處理邏輯必須足夠的通用和靈活,以處理所有可能的用例。因此,在Netty 4中,所有的I/O操作和事件都由已經被分配給了EventLoop的那個Thread來處理[2]。

這不同於Netty 3中所使用的模型。在下一節中,我們將討論這個早期的模型以及它被替換的原因。

7.2.2 Netty 3中的I/O操作

在以前的版本中所使用的線程模型只保證了入站(之前稱為上游)事件會在所謂的I/O線程(對應於Netty 4中的EventLoop)中執行。所有的出站(下游)事件都由調用線程處理,其可能是I/O線程也可能是別的線程。開始看起來這似乎是個好主意,但是已經被發現是有問題的,因為需要在ChannelHandler中對出站事件進行仔細的同步。簡而言之,不可能保證多個線程不會在同一時刻嘗試訪問出站事件。例如,如果你通過在不同的線程中調用Channel.write方法,針對同一個Channel同時觸發出站的事件,就會發生這種情況。

當出站事件觸發了入站事件時,將會導致另一個負面影響。當Channel.write方法導致異常時,需要生成並觸發一個exceptionCaught事件。但是在Netty 3的模型中,由於這是一個入站事件,需要在調用線程中執行代碼,然後將事件移交給I/O線程去執行,然而這將帶來額外的上下文切換。

Netty 4中所採用的線程模型,通過在同一個線程中處理某個給定的EventLoop中所產生的所有事件,解決了這個問題。這提供了一個更加簡單的執行體系架構,並且消除了在多個ChannelHandler中進行同步的需要(除了任何可能需要在多個Channel中共享的)。

現在,已經理解了EventLoop的角色,讓我們來看看任務是如何被調度執行的吧。

7.3 任務調度

偶爾,你將需要調度一個任務以便稍後(延遲)執行或者週期性地執行。例如,你可能想要註冊一個在客戶端已經連接了5分鐘之後觸發的任務。一個常見的用例是,發送心跳消息到遠程節點,以檢查連接是否仍然還活著。如果沒有響應,你便知道可以關閉該Channel了。

在接下來的幾節中,我們將展示如何使用核心的Java API和Netty的EventLoop來調度任務。然後,我們將研究Netty的內部實現,並討論它的優點和局限性。

7.3.1 JDK的任務調度API

在Java 5之前,任務調度是建立在java.util.Timer類之上的,其使用了一個後台Thread,並且具有與標準線程相同的限制。隨後,JDK提供了java.util.concurrent包,它定義了interface ScheduledExecutorService。表7-1展示了java.util.concurrent.Executors的相關工廠方法。

表7-1 java.util.concurrent.Executors類的工廠方法

方  法

描  述

newScheduledThreadPool(
  int corePoolSize)


newScheduledThreadPool(
  int corePoolSize,
  ThreadFactorythreadFactory)

創建一個ScheduledThreadExecutorService,用於調度命令在指定延遲之後運行或者週期性地執行。它使用corePoolSize參數來計算線程數

newSingleThreadScheduledExecutor

newSingleThreadScheduledExecutor(
  ThreadFactorythreadFactory)

創建一個ScheduledThreadExecutorService,用於調度命令在指定延遲之後運行或者週期性地執行。它使用一個線程來執行被調度的任務

雖然選擇不是很多[3],但是這些預置的實現已經足以應對大多數的用例。代碼清單7-2展示了如何使用ScheduledExecutorService來在60秒的延遲之後執行一個任務。

代碼清單7-2 使用ScheduledExecutorService調度任務

ScheduledExecutorService executor =
  Executors.newScheduledThreadPool(10);   ← --  創建一個其線程池具有10 個線程的ScheduledExecutorService

ScheduledFuture<?> future = executor.schedule(
  new Runnable {   ← --  創建一個R unnable,以供調度稍後執行
  @Override
  public void run {
    System.out.println("60 seconds later");  ← --  該任務要打印的消息
  }
}, 60, TimeUnit.SECONDS);  ← -- 調度任務在從現在開始的60 秒之後執行
...
executor.shutdown;   ← -- 一旦調度任務執行完成,就關閉ScheduledExecutorService 以釋放資源  

雖然ScheduledExecutorServiceAPI是直截了當的,但是在高負載下它將帶來性能上的負擔。在下一節中,我們將看到Netty是如何以更高的效率提供相同的功能的。

7.3.2 使用EventLoop調度任務

ScheduledExecutorService的實現具有局限性,例如,事實上作為線程池管理的一部分,將會有額外的線程創建。如果有大量任務被緊湊地調度,那麼這將成為一個瓶頸。Netty通過ChannelEventLoop實現任務調度解決了這一問題,如代碼清單7-3所示。

代碼清單7-3 使用EventLoop調度任務

Channel ch = ...
ScheduledFuture<?> future = ch.eventLoop.schedule(  ← --  創建一個Runnable以供調度稍後執行
  new Runnable { 
  @Override
  public void run {  ← --  要執行的代碼
    System.out.println("60 seconds later"); 
  }
}, 60, TimeUnit.SECONDS);  ← --  調度任務在從現在開始的60 秒之後執行  

經過60秒之後,Runnable實例將由分配給ChannelEventLoop執行。如果要調度任務以每隔60秒執行一次,請使用scheduleAtFixedRate方法,如代碼清單7-4所示。

代碼清單7-4 使用EventLoop調度週期性的任務

Channel ch = ...
ScheduledFuture<?> future = ch.eventLoop.scheduleAtFixedRate(   ← -- 創建一個Runnable,以供調度稍後執行 
  new Runnable {
  @Override
  public void run {
    System.out.println("Run every 60 seconds");   ← -- 這將一直運行,直到ScheduledFuture 被取消
  }
}, 60, 60, TimeUnit.Seconds);   ← -- 調度在60 秒之後,並且以後每間隔60 秒運行  

如我們前面所提到的,Netty的EventLoop擴展了ScheduledExecutorService(見圖7-2),所以它提供了使用JDK實現可用的所有方法,包括在前面的示例中使用到的schedulescheduleAtFixedRate方法。所有操作的完整列表可以在ScheduledExecutorService的Javadoc中找到[4]。

要想取消或者檢查(被調度任務的)執行狀態,可以使用每個異步操作所返回的Scheduled- Future。代碼清單7-5展示了一個簡單的取消操作。

代碼清單7-5 使用ScheduledFuture取消任務

ScheduledFuture<?> future = ch.eventLoop.scheduleAtFixedRate(...);   ← --  調度任務,並獲得所返回的ScheduledFuture
// Some other code that runs...
boolean mayInterruptIfRunning = false;
future.cancel(mayInterruptIfRunning);  ← --  取消該任務,防止它再次運行  

這些例子說明,可以利用Netty的任務調度功能來獲得性能上的提升。反過來,這些也依賴於底層的線程模型,我們接下來將對其進行研究。

7.4 實現細節

這一節將更加詳細地探討Netty的線程模型和任務調度實現的主要內容。我們也將會提到需要注意的局限性,以及正在不斷發展中的領域。

7.4.1 線程管理

Netty線程模型的卓越性能取決於對於當前執行的Thread的身份的確定[5],也就是說,確定它是否是分配給當前Channel以及它的EventLoop的那一個線程。(回想一下EventLoop將負責處理一個Channel的整個生命週期內的所有事件。)

如果(當前)調用線程正是支撐EventLoop的線程,那麼所提交的代碼塊將會被(直接)執行。否則,EventLoop將調度該任務以便稍後執行,並將它放入到內部隊列中。當EventLoop下次處理它的事件時,它會執行隊列中的那些任務/事件。這也就解釋了任何的Thread是如何與Channel直接交互而無需在ChannelHandler中進行額外同步的。

注意,每個EventLoop都有它自已的任務隊列,獨立於任何其他的EventLoop。圖7-3展示了EventLoop用於調度任務的執行邏輯。這是Netty線程模型的關鍵組成部分。

圖7-3 EventLoop的執行邏輯

我們之前已經闡明了不要阻塞當前I/O線程的重要性。我們再以另一種方式重申一次:「永遠不要將一個長時間運行的任務放入到執行隊列中,因為它將阻塞需要在同一線程上執行的任何其他任務。」如果必須要進行阻塞調用或者執行長時間運行的任務,我們建議使用一個專門的EventExecutor。(見6.2.1節的「ChannelHandler的執行和阻塞」)。

除了這種受限的場景,如同傳輸所採用的不同的事件處理實現一樣,所使用的線程模型也可以強烈地影響到排隊的任務對整體系統性能的影響。(如同我們在第4章中所看到的,使用Netty可以輕鬆地切換到不同的傳輸實現,而不需要修改你的代碼庫。)

7.4.2 EventLoop/線程的分配

服務於Channel的I/O和事件的EventLoop包含在EventLoopGroup中。根據不同的傳輸實現,EventLoop的創建和分配方式也不同。

1.異步傳輸

異步傳輸實現只使用了少量的EventLoop(以及和它們相關聯的Thread),而且在當前的線程模型中,它們可能會被多個Channel所共享。這使得可以通過盡可能少量的Thread來支撐大量的Channel,而不是每個Channel分配一個Thread

圖7-4顯示了一個EventLoopGroup,它具有3個固定大小的EventLoop(每個EventLoop都由一個Thread支撐)。在創建EventLoopGroup時就直接分配了EventLoop(以及支撐它們的Thread),以確保在需要時它們是可用的。

圖7-4 用於非阻塞傳輸(如NIO和AIO)的EventLoop分配方式

EventLoopGroup負責為每個新創建的Channel分配一個EventLoop。在當前實現中,使用順序循環(round-robin)的方式進行分配以獲取一個均衡的分佈,並且相同的EventLoop可能會被分配給多個Channel。(這一點在將來的版本中可能會改變。)

一旦一個Channel被分配給一個EventLoop,它將在它的整個生命週期中都使用這個EventLoop(以及相關聯的Thread)。請牢記這一點,因為它可以使你從擔憂你的Channel- Handler實現中的線程安全和同步問題中解脫出來。

另外,需要注意的是,EventLoop的分配方式對ThreadLocal的使用的影響。因為一個EventLoop通常會被用於支撐多個Channel,所以對於所有相關聯的Channel來說,ThreadLocal都將是一樣的。這使得它對於實現狀態追蹤等功能來說是個糟糕的選擇。然而,在一些無狀態的上下文中,它仍然可以被用於在多個Channel之間共享一些重度的或者代價昂貴的對象,甚至是事件。

2.阻塞傳輸

用於像OIO(舊的阻塞I/O)這樣的其他傳輸的設計略有不同,如圖7-5所示。

這裡每一個Channel都將被分配給一個EventLoop(以及它的Thread)。如果你開發的應用程序使用過java.io包中的阻塞I/O實現,你可能就遇到過這種模型。

圖7-5 阻塞傳輸(如OIO)的EventLoop分配方式

但是,正如同之前一樣,得到的保證是每個Channel的I/O事件都將只會被一個Thread(用於支撐該ChannelEventLoop的那個Thread)處理。這也是另一個Netty設計一致性的例子,它(這種設計上的一致性)對Netty的可靠性和易用性做出了巨大貢獻。

7.5 小結

在本章中,你瞭解了通常的線程模型,並且特別深入地學習了Netty所採用的線程模型,我們詳細探討了其性能以及一致性。

你看到了如何在EventLoop(I/O Thread)中執行自己的任務,就如同Netty框架自身一樣。你學習了如何調度任務以便推遲執行,並且我們還探討了高負載下的伸縮性問題。你也看到了如何驗證一個任務是否已被執行以及如何取消它。

通過我們對Netty框架的實現細節的研究所獲得的這些信息,將幫助你在簡化你的應用程序代碼庫的同時最大限度地提高它的性能。關於更多一般意義上的有關線程池和並發編程的詳細信息,我們建議閱讀由Brian Goetz編寫的《Java並發編程實戰》。他的書將會帶你更加深入地理解多線程處理甚至是最複雜的多線程處理用例。

我們已經到達了一個令人興奮的時刻——在下一章中我們將討論引導,這是一個配置以及連接所有的Netty組件使你的應用程序運行起來的過程。


[1] 這個方法重寫了EventExecutorEventExecutorGroup.parent方法。

[2] 這裡使用的是「來處理」而不是「來觸發」,其中寫操作是可以從外部的任意線程觸發的。——譯者注

[3] 由JDK提供的這個接口的唯一具體實現是java.util.concurrent.ScheduledThreadPoolExecutor

[4] Java平台,標準版第8版API規範,java.util.concurrent,Interface ScheduledExecutorService:http://docs.oracle. com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html。

[5] 通過調用EventLoopinEventLoop(Thread)方法實現。——譯者注