讀古今文學網 > Netty實戰 > 第6章 ChannelHandler和ChannelPipeline >

第6章 ChannelHandler和ChannelPipeline

本章主要內容

  • ChannelHandler API和ChannelPipeline API
  • 檢測資源洩漏
  • 異常處理

在上一章中你學習了ByteBuf——Netty的數據容器。當我們在本章中探討Netty的數據流以及處理組件時,我們將基於已經學過的東西,並且你將開始看到框架的重要元素都結合到了一起。

你已經知道,可以在ChannelPipeline中將ChannelHandler鏈接在一起以組織處理邏輯。我們將會研究涉及這些類的各種用例,以及一個重要的關係——ChannelHandlerContext

理解所有這些組件之間的交互對於通過Netty構建模塊化的、可重用的實現至關重要。

6.1 ChannelHandler家族

在我們開始詳細地學習ChannelHandler之前,我們將在Netty的組件模型的這部分基礎上花上一些時間。

6.1.1 Channel的生命週期

Interface Channel定義了一組和ChannelInboundHandlerAPI密切相關的簡單但功能強大的狀態模型,表6-1列出了Channel的這4個狀態。

表6-1 Channel的生命週期狀態

狀  態

描  述

ChannelUnregistered

Channel已經被創建,但還未註冊到EventLoop

ChannelRegistered

Channel已經被註冊到了EventLoop

ChannelActive

Channel處於活動狀態(已經連接到它的遠程節點)。它現在可以接收和發送數據了

ChannelInactive

Channel沒有連接到遠程節點

Channel的正常生命週期如圖6-1所示。當這些狀態發生改變時,將會生成對應的事件。這些事件將會被轉發給ChannelPipeline中的ChannelHandler,其可以隨後對它們做出響應。

圖6-1 Channel的狀態模型

6.1.2 ChannelHandler的生命週期

表6-2中列出了interface ChannelHandler定義的生命週期操作,在ChannelHandler被添加到ChannelPipeline中或者被從ChannelPipeline中移除時會調用這些操作。這些方法中的每一個都接受一個ChannelHandlerContext參數。

表6-2 ChannelHandler的生命週期方法

類  型

描  述

handlerAdded

當把ChannelHandler添加到ChannelPipeline中時被調用

handlerRemoved

當從ChannelPipeline中移除ChannelHandler時被調用

exceptionCaught

當處理過程中在ChannelPipeline中有錯誤產生時被調用

Netty定義了下面兩個重要的ChannelHandler子接口:

  • ChannelInboundHandler——處理入站數據以及各種狀態變化;
  • ChannelOutboundHandler——處理出站數據並且允許攔截所有的操作。

在接下來的章節中,我們將詳細地討論這些子接口。

6.1.3 ChannelInboundHandler接口

表6-3列出了interface ChannelInboundHandler的生命週期方法。這些方法將會在數據被接收時或者與其對應的Channel狀態發生改變時被調用。正如我們前面所提到的,這些方法和Channel的生命週期密切相關。

表6-3 ChannelInboundHandler的方法

類  型

描  述

channelRegistered

Channel已經註冊到它的EventLoop並且能夠處理I/O時被調用

channelUnregistered

Channel從它的EventLoop註銷並且無法處理任何I/O時被調用

channelActive

Channel處於活動狀態時被調用;Channel已經連接/綁定並且已經就緒

channelInactive

Channel離開活動狀態並且不再連接它的遠程節點時被調用

channelReadComplete

Channel上的一個讀操作完成時被調用[1]

channelRead

當從Channel讀取數據時被調用

ChannelWritability - Changed

Channel的可寫狀態發生改變時被調用。用戶可以確保寫操作不會完成得太快(以避免發生OutOfMemoryError)或者可以在Channel變為再次可寫時恢復寫入。可以通過調用ChannelisWritable方法來檢測Channel的可寫性。與可寫性相關的閾值可以通過Channel.config. setWriteHighWaterMarkChannel.config.setWriteLowWater- Mark方法來設置

userEventTriggered

ChannelnboundHandler.fireUserEventTriggered方法被調用時被調用,因為一個POJO被傳經了ChannelPipeline

當某個ChannelInboundHandler的實現重寫channelRead方法時,它將負責顯式地釋放與池化的ByteBuf實例相關的內存。Netty為此提供了一個實用方法ReferenceCount-Util.release,如代碼清單6-1所示。

代碼清單6-1 釋放消息資源

@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {   ← --  擴展了Channel-InboundHandler-Adapter
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {  ← --  丟棄已接收的消息
    ReferenceCountUtil.release(msg); 
  }
}  

Netty將使用WARN級別的日誌消息記錄未釋放的資源,使得可以非常簡單地在代碼中發現違規的實例。但是以這種方式管理資源可能很繁瑣。一個更加簡單的方式是使用Simple- ChannelInboundHandler。代碼清單6-2是代碼清單6-1的一個變體,說明了這一點。

代碼清單6-2 使用SimpleChannelInboundHandler

@Sharable
public class SimpleDiscardHandler
  extends SimpleChannelInboundHandler<Object> {  ← --   擴展了SimpleChannelInboundHandler
  @Override
  public void channelRead0(ChannelHandlerContext ctx,
    Object msg) {
    // No need to do anything special  ← --  不需要任何顯式的資源釋放
  }
}  

由於SimpleChannelInboundHandler會自動釋放資源,所以你不應該存儲指向任何消息的引用供將來使用,因為這些引用都將會失效。

6.1.6節為引用處理提供了更加詳細的討論。

6.1.4 ChannelOutboundHandler接口

出站操作和數據將由ChannelOutboundHandler處理。它的方法將被ChannelChannel- Pipeline以及ChannelHandlerContext調用。

ChannelOutboundHandler的一個強大的功能是可以按需推遲操作或者事件,這使得可以通過一些複雜的方法來處理請求。例如,如果到遠程節點的寫入被暫停了,那麼你可以推遲沖刷操作並在稍後繼續。

表6-4顯示了所有由ChannelOutboundHandler本身所定義的方法(忽略了那些從Channel- Handler繼承的方法)。

表6-4 ChannelOutboundHandler的方法

類  型

描  述

bind(ChannelHandlerContext,
SocketAddress,ChannelPromise)

當請求將Channel綁定到本地地址時被調用

connect(ChannelHandlerContext,
SocketAddress,SocketAddress,ChannelPromise)

當請求將Channel連接到遠程節點時被調用

disconnect(ChannelHandlerContext,
ChannelPromise)

當請求將Channel從遠程節點斷開時被調用

close(ChannelHandlerContext,ChannelPromise)

當請求關閉Channel時被調用

deregister(ChannelHandlerContext,
ChannelPromise)

當請求將Channel從它的EventLoop註銷時被調用

read(ChannelHandlerContext)

當請求從Channel讀取更多的數據時被調用

flush(ChannelHandlerContext)

當請求通過Channel將入隊數據沖刷到遠程節點時被調用

write(ChannelHandlerContext,Object,
ChannelPromise)

當請求通過Channel將數據寫到遠程節點時被調用

ChannelPromise與ChannelFuture ChannelOutboundHandler中的大部分方法都需要一個ChannelPromise參數,以便在操作完成時得到通知。ChannelPromiseChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccesssetFailure,從而使ChannelFuture不可變[2]。

接下來我們將看一看那些簡化了編寫ChannelHandler的任務的類。

6.1.5 ChannelHandler適配器

你可以使用ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter類作為自己的ChannelHandler的起始點。這兩個適配器分別提供了ChannelInboundHandlerChannelOutboundHandler的基本實現。通過擴展抽像類ChannelHandlerAdapter,它們獲得了它們共同的超接口ChannelHandler的方法。生成的類的層次結構如圖6-2所示。

圖6-2 ChannelHandlerAdapter類的層次結構

ChannelHandlerAdapter還提供了實用方法isSharable。如果其對應的實現被標注為Sharable,那麼這個方法將返回true,表示它可以被添加到多個ChannelPipeline中(如在2.3.1節中所討論過的一樣)。

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter中所提供的方法體調用了其相關聯的ChannelHandlerContext上的等效方法,從而將事件轉發到了ChannelPipeline中的下一個ChannelHandler中。

你要想在自己的ChannelHandler中使用這些適配器類,只需要簡單地擴展它們,並且重寫那些你想要自定義的方法。

6.1.6 資源管理

每當通過調用ChannelInboundHandler.channelRead或者ChannelOutbound- Handler.write方法來處理數據時,你都需要確保沒有任何的資源洩漏。你可能還記得在前面的章節中所提到的,Netty使用引用計數來處理池化的ByteBuf。所以在完全使用完某個ByteBuf後,調整其引用計數是很重要的。

為了幫助你診斷潛在的(資源洩漏)問題,Netty提供了class ResourceLeakDetector[3],它將對你應用程序的緩衝區分配做大約1%的採樣來檢測內存洩露。相關的開銷是非常小的。

如果檢測到了內存洩露,將會產生類似於下面的日誌消息:

LEAK: ByteBuf.release was not called before it's garbage-collected. Enable
advanced leak reporting to find out where the leak occurred. To enable
advanced leak reporting, specify the JVM option
'-Dio.netty.leakDetectionLevel=ADVANCED' or call
ResourceLeakDetector.setLevel.  

Netty目前定義了4種洩漏檢測級別,如表6-5所示。

表6-5 洩漏檢測級別

級  別

描  述

DISABLED

禁用洩漏檢測。只有在詳盡的測試之後才應設置為這個值

SIMPLE

使用1%的默認採樣率檢測並報告任何發現的洩露。這是默認級別,適合絕大部分的情況

ADVANCED

使用默認的採樣率,報告所發現的任何的洩露以及對應的消息被訪問的位置

PARANOID

類似於ADVANCED,但是其將會對每次(對消息的)訪問都進行採樣。這對性能將會有很大的影響,應該只在調試階段使用

洩露檢測級別可以通過將下面的Java系統屬性設置為表中的一個值來定義:

java -Dio.netty.leakDetectionLevel=ADVANCED  

如果帶著該JVM選項重新啟動你的應用程序,你將看到自己的應用程序最近被洩漏的緩衝區被訪問的位置。下面是一個典型的由單元測試產生的洩漏報告:

Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:
   ByteBuf.release was not called before it's garbage-collected.
Recent access records: 1
#1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(
  AdvancedLeakAwareByteBuf.java:697)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(
  XmlFrameDecoderTest.java:157)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(
  XmlFrameDecoderTest.java:133)
...  

實現ChannelInboundHandler.channelReadChannelOutboundHandler.write方法時,應該如何使用這個診斷工具來防止洩露呢?讓我們看看你的channelRead操作直接消費入站消息的情況;也就是說,它不會通過調用ChannelHandlerContext.fireChannelRead方法將入站消息轉發給下一個ChannelInboundHandler。代碼清單6-3展示了如何釋放消息。

代碼清單6-3 消費並釋放入站消息

@Sharable
public class DiscardInboundHandler extends ChannelInboundHandlerAdapter {  ← --  擴展了ChannelInboundandlerAdapter
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
     ReferenceCountUtil.release(msg);  ← -- 通過調用ReferenceCountUtil.release方法釋放資源
   }
}  

消費入站消息的簡單方式 由於消費入站數據是一項常規任務,所以Netty提供了一個特殊的被稱為SimpleChannelInboundHandlerChannelInboundHandler實現。這個實現會在消息被channelRead0方法消費之後自動釋放消息。

在出站方向這邊,如果你處理了write操作並丟棄了一個消息,那麼你也應該負責釋放它。代碼清單6-4展示了一個丟棄所有的寫入數據的實現。

代碼清單6-4 丟棄並釋放出站消息

@Sharable
public class DiscardOutboundHandler
  extends ChannelOutboundHandlerAdapter {   ← --  擴展了ChannelOutboundHandlerAdapter
  @Override
  public void write(ChannelHandlerContext ctx,
    Object msg, ChannelPromise promise) {
    ReferenceCountUtil.release(msg);  ← -- 通過使用R eferenceCountUtil.realse(...)方法釋放資源
    promise.setSuccess;   ← -- 通知ChannelPromise數據已經被處理了
  }
}  

重要的是,不僅要釋放資源,還要通知ChannelPromise。否則可能會出現Channel-FutureListener收不到某個消息已經被處理了的通知的情況。

總之,如果一個消息被消費或者丟棄了,並且沒有傳遞給ChannelPipeline中的下一個ChannelOutboundHandler,那麼用戶就有責任調用ReferenceCountUtil.release。如果消息到達了實際的傳輸層,那麼當它被寫入時或者Channel關閉時,都將被自動釋放。

6.2 ChannelPipeline接口

如果你認為ChannelPipeline是一個攔截流經Channel的入站和出站事件的Channel-Handler實例鏈,那麼就很容易看出這些ChannelHandler之間的交互是如何組成一個應用程序數據和事件處理邏輯的核心的。

每一個新創建的Channel都將會被分配一個新的ChannelPipeline。這項關聯是永久性的;Channel既不能附加另外一個ChannelPipeline,也不能分離其當前的。在Netty組件的生命週期中,這是一項固定的操作,不需要開發人員的任何干預。

根據事件的起源,事件將會被ChannelInboundHandler或者ChannelOutboundHandler處理。隨後,通過調用ChannelHandlerContext實現,它將被轉發給同一超類型的下一個ChannelHandler

ChannelHandlerContext

ChannelHandlerContext使得ChannelHandler能夠和它的ChannelPipeline以及其他的ChannelHandler交互。ChannelHandler可以通知其所屬的ChannelPipeline中的下一個ChannelHandler,甚至可以動態修改它所屬的ChannelPipeline[4]

ChannelHandlerContext具有豐富的用於處理事件和執行I/O操作的API。6.3節將提供有關ChannelHandlerContext的更多內容。

圖6-3展示了一個典型的同時具有入站和出站ChannelHandlerChannelPipeline的佈局,並且印證了我們之前的關於ChannelPipeline主要由一系列的ChannelHandler所組成的說法。ChannelPipeline還提供了通過ChannelPipeline本身傳播事件的方法。如果一個入站事件被觸發,它將被從ChannelPipeline的頭部開始一直被傳播到Channel Pipeline的尾端。在圖6-3中,一個出站I/O事件將從ChannelPipeline的最右邊開始,然後向左傳播。

圖6-3 ChannelPipeline和它的ChannelHandler

ChannelPipeline相對論

你可能會說,從事件途經ChannelPipeline的角度來看,ChannelPipeline的頭部和尾端取決於該事件是入站的還是出站的。然而Netty總是將ChannelPipeline的入站口(圖6-3中的左側)作為頭部,而將出站口(該圖的右側)作為尾端。

當你完成了通過調用ChannelPipeline.add*方法將入站處理器(ChannelInboundHandler)和出站處理器(ChannelOutboundHandler)混合添加到ChannelPipeline之後,每一個ChannelHandler從頭部到尾端的順序位置正如同我們方纔所定義它們的一樣。因此,如果你將圖6-3中的處理器(ChannelHandler)從左到右進行編號,那麼第一個被入站事件看到的ChannelHandler將是1,而第一個被出站事件看到的ChannelHandler將是5。

ChannelPipeline傳播事件時,它會測試ChannelPipeline中的下一個Channel- Handler的類型是否和事件的運動方向相匹配。如果不匹配,ChannelPipeline將跳過該ChannelHandler並前進到下一個,直到它找到和該事件所期望的方向相匹配的為止。(當然,ChannelHandler也可以同時實現ChannelInboundHandler接口和ChannelOutbound- Handler接口。)

6.2.1 修改ChannelPipeline

ChannelHandler可以通過添加、刪除或者替換其他的ChannelHandler來實時地修改ChannelPipeline的佈局。(它也可以將它自己從ChannelPipeline中移除。)這是Channel- Handler最重要的能力之一,所以我們將仔細地來看看它是如何做到的。表6-6列出了相關的方法。

表6-6 ChannelPipeline上的相關方法,由ChannelHandler用來修改ChannelPipeline的佈局

名  稱

描  述

addFirst
addBefore
addAfter
addLast

將一個ChannelHandler添加到ChannelPipeline

remove

將一個ChannelHandlerChannelPipeline中移除

replace

ChannelPipeline中的一個ChannelHandler替換為另一個Channel- Handler

代碼清單6-5展示了這些方法的使用。

代碼清單6-5 修改ChannelPipeline

ChannelPipeline pipeline = ..;
FirstHandler firstHandler = new FirstHandler;   ← --  創建一個FirstHandler 的實例
pipeline.addLast("handler1", firstHandler);  ← --  將該實例作為"handler1" 添加到ChannelPipeline 中
pipeline.addFirst("handler2", new SecondHandler);  ← --  將一個SecondHandler的實例作為"handler2"添加到ChannelPipeline的第一個槽中。這意味著它將被放置在已有的"handler1"之前 
pipeline.addLast("handler3", new ThirdHandler);  ← --  將一個ThirdHandler 的實例作為"handler3"添加到ChannelPipeline 的最後一個槽中 
...
pipeline.remove("handler3");  ← --  通過名稱移除"handler3"  
pipeline.remove(firstHandler);  ← --  通過引 用移除FirstHandler(它是唯一的,所以不需要它的名稱) 
pipeline.replace("handler2", "handler4", new ForthHandler);  ← --  將SecondHandler("handler2")替換為FourthHandler:"handler4"  

稍後,你將看到,重組ChannelHandler的這種能力使我們可以用它來輕鬆地實現極其靈活的邏輯。

ChannelHandler的執行和阻塞

通常ChannelPipeline中的每一個ChannelHandler都是通過它的EventLoop(I/O線程)來處理傳遞給它的事件的。所以至關重要的是不要阻塞這個線程,因為這會對整體的I/O處理產生負面的影響。

但有時可能需要與那些使用阻塞API的遺留代碼進行交互。對於這種情況,ChannelPipeline有一些接受一個EventExecutorGroupadd方法。如果一個事件被傳遞給一個自定義的EventExecutor- Group,它將被包含在這個EventExecutorGroup中的某個EventExecutor所處理,從而被從該Channel本身的EventLoop中移除。對於這種用例,Netty提供了一個叫DefaultEventExecutor- Group的默認實現。

除了這些操作,還有別的通過類型或者名稱來訪問ChannelHandler的方法。這些方法都列在了表6-7中。

表6-7 ChannelPipeline的用於訪問ChannelHandler的操作

名  稱

描  述

get

通過類型或者名稱返回ChannelHandler

context

返回和ChannelHandler綁定的ChannelHandlerContext

names

返回ChannelPipeline中所有ChannelHandler的名稱

6.2.2 觸發事件

ChannelPipeline的API公開了用於調用入站和出站操作的附加方法。表6-8列出了入站操作,用於通知ChannelInboundHandlerChannelPipeline中所發生的事件。

表6-8 ChannelPipeline的入站操作

方 法 名 稱

描  述

fireChannelRegistered

調用ChannelPipeline中下一個ChannelInboundHandlerchannelRegistered(ChannelHandlerContext)方法

fireChannelUnregistered

調用ChannelPipeline中下一個ChannelInboundHandlerchannelUnregistered(ChannelHandlerContext)方法

fireChannelActive

調用ChannelPipeline中下一個ChannelInboundHandlerchannelActive(ChannelHandlerContext)方法

fireChannelInactive

調用ChannelPipeline中下一個ChannelInboundHandlerchannelInactive(ChannelHandlerContext)方法

fireExceptionCaught

調用ChannelPipeline中下一個ChannelInboundHandlerexceptionCaught(ChannelHandlerContext, Throwable)方法

fireUserEventTriggered

調用ChannelPipeline中下一個ChannelInboundHandleruserEventTriggered(ChannelHandlerContext, Object)方法

fireChannelRead

調用ChannelPipeline中下一個ChannelInboundHandlerchannelRead(ChannelHandlerContext, Object msg)方法

fireChannelReadComplete

調用ChannelPipeline中下一個ChannelInboundHandlerchannelReadComplete(ChannelHandlerContext)方法

fireChannelWritability - Changed

調用ChannelPipeline中下一個ChannelInboundHandlerchannelWritabilityChanged(ChannelHandlerContext)方

在出站這邊,處理事件將會導致底層的套接字上發生一系列的動作。表6-9列出了Channel- Pipeline API的出站操作。

表6-9 ChannelPipeline的出站操作

方 法 名 稱

描  述

bind

Channel綁定到一個本地地址,這將調用ChannelPipeline中的下一個ChannelOutboundHandlerbind(ChannelHandlerContext, Socket- Address, ChannelPromise)方法

connect

Channel連接到一個遠程地址,這將調用ChannelPipeline中的下一個ChannelOutboundHandlerconnect(ChannelHandlerContext, Socket- Address, ChannelPromise)方法

disconnect

Channel斷開連接。這將調用ChannelPipeline中的下一個ChannelOutbound- Handlerdisconnect(ChannelHandlerContext, Channel Promise)方法

close

Channel關閉。這將調用ChannelPipeline中的下一個ChannelOutbound- Handlerclose(ChannelHandlerContext, ChannelPromise)方法

deregister

Channel從它先前所分配的EventExecutor(即EventLoop)中註銷。這將調用ChannelPipeline中的下一個ChannelOutboundHandlerderegister (ChannelHandlerContext, ChannelPromise)方法

flush

沖刷Channel所有掛起的寫入。這將調用ChannelPipeline中的下一個Channel- OutboundHandlerflush(ChannelHandlerContext)方法

write

將消息寫入Channel。這將調用ChannelPipeline中的下一個Channel- OutboundHandlerwrite(ChannelHandlerContext, Object msg, Channel- Promise)方法。注意:這並不會將消息寫入底層的Socket,而只會將它放入隊列中。要將它寫入Socket,需要調用flush或者writeAndFlush方法

writeAndFlush

這是一個先調用write方法再接著調用flush方法的便利方法

read

請求從Channel中讀取更多的數據。這將調用ChannelPipeline中的下一個ChannelOutboundHandlerread(ChannelHandlerContext)方法

總結一下:

  • ChannelPipeline保存了與Channel相關聯的ChannelHandler
  • ChannelPipeline可以根據需要,通過添加或者刪除ChannelHandler來動態地修改;
  • ChannelPipeline有著豐富的API用以被調用,以響應入站和出站事件。

6.3 ChannelHandlerContext接口

ChannelHandlerContext代表了ChannelHandlerChannelPipeline之間的關聯,每當有ChannelHandler添加到ChannelPipeline中時,都會創建ChannelHandler- ContextChannelHandlerContext的主要功能是管理它所關聯的ChannelHandler和在同一個ChannelPipeline中的其他ChannelHandler之間的交互。

ChannelHandlerContext有很多的方法,其中一些方法也存在於ChannelChannel- Pipeline本身上,但是有一點重要的不同。如果調用Channel或者ChannelPipeline上的這些方法,它們將沿著整個ChannelPipeline進行傳播。而調用位於ChannelHandlerContext上的相同方法,則將從當前所關聯的ChannelHandler開始,並且只會傳播給位於該ChannelPipeline中的下一個能夠處理該事件的ChannelHandler

表6-10對ChannelHandlerContext API進行了總結。

表6-10 ChannelHandlerContext的API

方 法 名 稱

描  述

alloc

返回和這個實例相關聯的Channel所配置的ByteBufAllocator

bind

綁定到給定的SocketAddress,並返回ChannelFuture

channel

返回綁定到這個實例的Channel

close

關閉Channel,並返回ChannelFuture

connect

連接給定的SocketAddress,並返回ChannelFuture

deregister

從之前分配的EventExecutor註銷,並返回ChannelFuture

disconnect

從遠程節點斷開,並返回ChannelFuture

executor

返回調度事件的EventExecutor

fireChannelActive

觸發對下一個ChannelInboundHandler上的channelActive方法(已連接)的調用

fireChannelInactive

觸發對下一個ChannelInboundHandler上的channelInactive方法(已關閉)的調用

fireChannelRead

觸發對下一個ChannelInboundHandler上的channelRead方法(已接收的消息)的調用

fireChannelReadComplete

觸發對下一個ChannelInboundHandler上的channelReadComplete方法的調用

fireChannelRegistered

觸發對下一個ChannelInboundHandler上的fireChannelRegistered方法的調用

fireChannelUnregistered

觸發對下一個ChannelInboundHandler上的fireChannelUnregistered方法的調用

fireChannelWritabilityChanged

觸發對下一個ChannelInboundHandler上的fireChannelWritabilityChanged方法的調用

fireExceptionCaught

觸發對下一個ChannelInboundHandler上的fireExceptionCaught(Throwable)方法的調用

fireUserEventTriggered

觸發對下一個ChannelInboundHandler上的fireUserEventTriggered(Object evt)方法的調用

handler

返回綁定到這個實例的ChannelHandler

isRemoved

如果所關聯的ChannelHandler已經被從ChannelPipeline中移除則返回true

name

返回這個實例的唯一名稱

pipeline

返回這個實例所關聯的ChannelPipeline

read

將數據從Channel讀取到第一個入站緩衝區;如果讀取成功則觸發[5]一個channelRead事件,並(在最後一個消息被讀取完成後)通知ChannelInboundHandler的channelReadComplete (ChannelHandlerContext)方法

write

通過這個實例寫入消息並經過ChannelPipeline

writeAndFlush

通過這個實例寫入並沖刷消息並經過ChannelPipeline

當使用ChannelHandlerContext的API的時候,請牢記以下兩點:

  • ChannelHandlerContextChannelHandler之間的關聯(綁定)是永遠不會改變的,所以緩存對它的引用是安全的;
  • 如同我們在本節開頭所解釋的一樣,相對於其他類的同名方法,ChannelHandlerContext的方法將產生更短的事件流,應該盡可能地利用這個特性來獲得最大的性能。

6.3.1 使用ChannelHandlerContext

在這一節中我們將討論ChannelHandlerContext的用法,以及存在於ChannelHandler- ContextChannelChannelPipeline上的方法的行為。圖6-4展示了它們之間的關係。

圖6-4 ChannelChannelPipelineChannelHandler以及ChannelHandlerContext之間的關係

在代碼清單6-6中,將通過ChannelHandlerContext獲取到Channel的引用。調用Channel上的write方法將會導致寫入事件從尾端到頭部地流經ChannelPipeline

代碼清單6-6 從ChannelHandlerContext訪問Channel

ChannelHandlerContext ctx = ..; 
Channel channel = ctx.channel;   ← --  獲取到與ChannelHandlerContext相關聯的Channel 的引用
channel.write(Unpooled.copiedBuffer("Netty in Action",  ← --  通過Channel 寫入緩衝區
  CharsetUtil.UTF_8));  

代碼清單6-7展示了一個類似的例子,但是這一次是寫入ChannelPipeline。我們再次看到,(到ChannelPipline的)引用是通過ChannelHandlerContext獲取的。

代碼清單6-7 通過ChannelHandlerContext訪問ChannelPipeline

ChannelHandlerContext ctx = ..;
ChannelPipeline pipeline = ctx.pipeline;   ← --  獲取到與ChannelHandlerContext相關聯的ChannelPipeline 的引用
pipeline.write(Unpooled.copiedBuffer("Netty in Action",   ← --  通過ChannelPipeline寫入緩衝區
  CharsetUtil.UTF_8));  

如同在圖6-5中所能夠看到的一樣,代碼清單6-6和代碼清單6-7中的事件流是一樣的。重要的是要注意到,雖然被調用的Channel或ChannelPipeline上的write方法將一直傳播事件通過整個ChannelPipeline,但是在ChannelHandler的級別上,事件從一個ChannelHandler到下一個ChannelHandler的移動是由ChannelHandlerContext上的調用完成的。

圖6-5 通過Channel或者ChannelPipeline進行的事件傳播

為什麼會想要從ChannelPipeline中的某個特定點開始傳播事件呢?

  • 為了減少將事件傳經對它不感興趣的ChannelHandler所帶來的開銷。
  • 為了避免將事件傳經那些可能會對它感興趣的ChannelHandler

要想調用從某個特定的ChannelHandler開始的處理過程,必須獲取到在(Channel- Pipeline)該ChannelHandler之前的ChannelHandler所關聯的ChannelHandler- Context。這個ChannelHandlerContext將調用和它所關聯的ChannelHandler之後的ChannelHandler

代碼清單6-8和圖6-6說明了這種用法。

代碼清單6-8 調用ChannelHandlerContextwrite方法

ChannelHandlerContext ctx = ..;   ← --  獲取到ChannelHandlerContext的引用
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));   ← --  write方法將把緩衝區數據發送到下一個ChannelHandler  

如圖6-6所示,消息將從下一個ChannelHandler開始流經ChannelPipeline,繞過了所有前面的ChannelHandler

圖6-6 通過ChannelHandlerContext觸發的操作的事件流

我們剛才所描述的用例是常見的,對於調用特定的ChannelHandler實例上的操作尤其有用。

6.3.2 ChannelHandler和ChannelHandlerContext的高級用法

正如我們在代碼清單6-6中所看到的,你可以通過調用ChannelHandlerContext上的pipeline方法來獲得被封閉的ChannelPipeline的引用。這使得運行時得以操作ChannelPipelineChannelHandler,我們可以利用這一點來實現一些複雜的設計。例如,你可以通過將ChannelHandler添加到ChannelPipeline中來實現動態的協議切換。

另一種高級的用法是緩存到ChannelHandlerContext的引用以供稍後使用,這可能會發生在任何的ChannelHandler方法之外,甚至來自於不同的線程。代碼清單6-9展示了用這種模式來觸發事件。

代碼清單6-9 緩存到ChannelHandlerContext的引用

public class WriteHandler extends ChannelHandlerAdapter {
  private ChannelHandlerContext ctx;
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) {
    this.ctx = ctx;   ← --  存儲到ChannelHandlerContext的引用以供稍後使用
  }
  public void send(String msg) {  ← --  使用之前存儲的到ChannelHandlerContext的引用來發送消息
    ctx.writeAndFlush(msg);
  }
}  

因為一個ChannelHandler可以從屬於多個ChannelPipeline,所以它也可以綁定到多個ChannelHandlerContext實例。對於這種用法指在多個ChannelPipeline中共享同一個ChannelHandler,對應的ChannelHandler必須要使用@Sharable註解標注;否則,試圖將它添加到多個ChannelPipeline時將會觸發異常。顯而易見,為了安全地被用於多個並發的Channel(即連接),這樣的ChannelHandler必須是線程安全的。

代碼清單6-10展示了這種模式的一個正確實現。

代碼清單6-10 可共享的ChannelHandler

@Sharable 
public class SharableHandler extends ChannelInboundHandlerAdapter {  ← --  使用註解@Sharable標注
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    System.out.println("Channel read message: " + msg);
    ctx.fireChannelRead(msg);   ← --  記錄方法調用,並轉發給下一個ChannelHandler
  }
}  

前面的ChannelHandler實現符合所有的將其加入到多個ChannelPipeline的需求,即它使用了註解@Sharable標注,並且也不持有任何的狀態。相反,代碼清單6-11中的實現將會導致問題。

代碼清單6-11 @Sharable的錯誤用法

@Sharable  ← --  使用註解@Sharable標注
public class UnsharableHandler extends ChannelInboundHandlerAdapter {
  private int count;
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    count++;  ← --  將count 字段的值加1
    System.out.println("channelRead(...) called the "
      + count + " time");   ← --  記錄方法調用,並轉發給下一個ChannelHandler
    ctx.fireChannelRead(msg);
  }
}  

這段代碼的問題在於它擁有狀態[6],即用於跟蹤方法調用次數的實例變量count。將這個類的一個實例添加到ChannelPipeline將極有可能在它被多個並發的Channel訪問時導致問題。(當然,這個簡單的問題可以通過使channelRead方法變為同步方法來修正。)

總之,只應該在確定了你的ChannelHandler是線程安全的時才使用@Sharable註解。

為何要共享同一個ChannelHandler 在多個ChannelPipeline中安裝同一個ChannelHandler的一個常見的原因是用於收集跨越多個Channel的統計信息。

我們對於ChannelHandlerContext和它與其他的框架組件之間的關係的討論到此就結束了。接下來我們將看看異常處理。

6.4 異常處理

異常處理是任何真實應用程序的重要組成部分,它也可以通過多種方式來實現。因此,Netty提供了幾種方式用於處理入站或者出站處理過程中所拋出的異常。這一節將幫助你瞭解如何設計最適合你需要的方式。

6.4.1 處理入站異常

如果在處理入站事件的過程中有異常被拋出,那麼它將從它在ChannelInboundHandler裡被觸發的那一點開始流經ChannelPipeline。要想處理這種類型的入站異常,你需要在你的ChannelInboundHandler實現中重寫下面的方法。

public void exceptionCaught(
  ChannelHandlerContext ctx, Throwable cause) throws Exception  

代碼清單6-12展示了一個簡單的示例,其關閉了Channel並打印了異常的棧跟蹤信息。

代碼清單6-12 基本的入站異常處理

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx,
    Throwable cause) {
    cause.printStackTrace;
    ctx.close;
  }
}  

因為異常將會繼續按照入站方向流動(就像所有的入站事件一樣),所以實現了前面所示邏輯的ChannelInboundHandler通常位於ChannelPipeline的最後。這確保了所有的入站異常都總是會被處理,無論它們可能會發生在ChannelPipeline中的什麼位置。

你應該如何響應異常,可能很大程度上取決於你的應用程序。你可能想要關閉Channel(和連接),也可能會嘗試進行恢復。如果你不實現任何處理入站異常的邏輯(或者沒有消費該異常),那麼Netty將會記錄該異常沒有被處理的事實[7]。

總結一下:

  • ChannelHandler.exceptionCaught的默認實現是簡單地將當前異常轉發給ChannelPipeline中的下一個ChannelHandler
  • 如果異常到達了ChannelPipeline的尾端,它將會被記錄為未被處理;
  • 要想定義自定義的處理邏輯,你需要重寫exceptionCaught方法。然後你需要決定是否需要將該異常傳播出去。

6.4.2 處理出站異常

用於處理出站操作中的正常完成以及異常的選項,都基於以下的通知機制。

  • 每個出站操作都將返回一個ChannelFuture。註冊到ChannelFutureChannel- FutureListener將在操作完成時被通知該操作是成功了還是出錯了。
  • 幾乎所有的ChannelOutboundHandler上的方法都會傳入一個ChannelPromise的實例。作為ChannelFuture的子類,ChannelPromise也可以被分配用於異步通知的監聽器。但是,ChannelPromise還具有提供立即通知的可寫方法:
  ChannelPromise setSuccess;
  ChannelPromise setFailure(Throwable cause);  

添加ChannelFutureListener只需要調用ChannelFuture實例上的addListener(ChannelFutureListener)方法,並且有兩種不同的方式可以做到這一點。其中最常用的方式是,調用出站操作(如write方法)所返回的ChannelFuture上的addListener方法。

代碼清單6-13使用了這種方式來添加ChannelFutureListener,它將打印棧跟蹤信息並且隨後關閉Channel

代碼清單6-13 添加ChannelFutureListenerChannelFuture

ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener {
  @Override
  public void operationComplete(ChannelFuture f) {
    if (!f.isSuccess) {
      f.cause.printStackTrace;
      f.channel.close;
    }
  }
});  

第二種方式是將ChannelFutureListener添加到即將作為參數傳遞給ChannelOut- boundHandler的方法的ChannelPromise。代碼清單6-14中所展示的代碼和代碼清單6-13中所展示的具有相同的效果。

代碼清單6-14 添加ChannelFutureListenerChannelPromise

public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
  @Override
  public void write(ChannelHandlerContext ctx, Object msg,
    ChannelPromise promise) {
    promise.addListener(new ChannelFutureListener {
      @Override
      public void operationComplete(ChannelFuture f) {
        if (!f.isSuccess) {
          f.cause.printStackTrace;
          f.channel.close;
        }
      }
    });
  }
}  

ChannelPromise的可寫方法

通過調用ChannelPromise上的setSuccesssetFailure方法,可以使一個操作的狀態在ChannelHandler的方法返回給其調用者時便即刻被感知到。

為何選擇一種方式而不是另一種呢?對於細緻的異常處理,你可能會發現,在調用出站操作時添加ChannelFutureListener更合適,如代碼清單6-13所示。而對於一般的異常處理,你可能會發現,代碼清單6-14所示的自定義的ChannelOutboundHandler實現的方式更加的簡單。

如果你的ChannelOutboundHandler本身拋出了異常會發生什麼呢?在這種情況下,Netty本身會通知任何已經註冊到對應ChannelPromise的監聽器。

6.5 小結

在本章中我們仔細地研究了Netty的數據處理組件——ChannelHandler。我們討論了ChannelHandler是如何鏈接在一起,以及它們是如何作為ChannelInboundHandlerChannelOutboundHandler與ChannelPipeline進行交互的。

下一章將介紹Netty的EventLoop和並發模型,這對於理解Netty是如何實現異步的、事件驅動的網絡編程模型來說至關重要。


[1] 當所有可讀的字節都已經從Channel中讀取之後,將會調用該回調方法;所以,可能在channelRead- Complete被調用之前看到多次調用channelRead(...)。——譯者注

[2] 這裡借鑒的是Scala的Promise和Future的設計,當一個Promise被完成之後,其對應的Future的值便不能再進行任何修改了。——譯者注

[3] 其利用了JDK提供的PhantomReference類來實現這一點。——譯者注

[4] 這裡指修改ChannelPipeline中的ChannelHandler的編排。——譯者注

[5] 通過配合ChannelConfig.setAutoRead(boolean autoRead)方法,可以實現反應式系統的特性之一回壓(back-pressure)。——譯者注

[6] 主要的問題在於,對於其所持有的狀態的修改並不是線程安全的,比如也可以通過使用AtomicInteger來規避這個問題。——譯者注

[7] 即Netty將會通過Warning級別的日誌記錄該異常到達了ChannelPipeline的尾端,但沒有被處理,並嘗試釋放該異常。——譯者注