本章主要內容
ChannelHandler
API和ChannelPipeline
API- 檢測資源洩漏
- 異常處理
在上一章中你學習了ByteBuf
——Netty的數據容器。當我們在本章中探討Netty的數據流以及處理組件時,我們將基於已經學過的東西,並且你將開始看到框架的重要元素都結合到了一起。
你已經知道,可以在ChannelPipeline
中將ChannelHandler
鏈接在一起以組織處理邏輯。我們將會研究涉及這些類的各種用例,以及一個重要的關係——ChannelHandlerContext
。
理解所有這些組件之間的交互對於通過Netty構建模塊化的、可重用的實現至關重要。
6.1 ChannelHandler家族
在我們開始詳細地學習ChannelHandler
之前,我們將在Netty的組件模型的這部分基礎上花上一些時間。
6.1.1 Channel的生命週期
Interface Channel
定義了一組和ChannelInboundHandler
API密切相關的簡單但功能強大的狀態模型,表6-1列出了Channel
的這4個狀態。
表6-1 Channel
的生命週期狀態
狀 態
描 述
ChannelUnregistered
Channel
已經被創建,但還未註冊到EventLoop
ChannelRegistered
Channel
已經被註冊到了EventLoop
ChannelActive
Channel
處於活動狀態(已經連接到它的遠程節點)。它現在可以接收和發送數據了
ChannelInactive
Channel
沒有連接到遠程節點
Channel
的正常生命週期如圖6-1所示。當這些狀態發生改變時,將會生成對應的事件。這些事件將會被轉發給ChannelPipeline
中的ChannelHandler
,其可以隨後對它們做出響應。
圖6-1 Channel
的狀態模型
6.1.2 ChannelHandler的生命週期
表6-2中列出了interface ChannelHandler
定義的生命週期操作,在ChannelHandler
被添加到ChannelPipeline
中或者被從ChannelPipeline
中移除時會調用這些操作。這些方法中的每一個都接受一個ChannelHandlerContext
參數。
表6-2 ChannelHandler
的生命週期方法
類 型
描 述
handlerAdded
當把ChannelHandler
添加到ChannelPipeline
中時被調用
handlerRemoved
當從ChannelPipeline
中移除ChannelHandler
時被調用
exceptionCaught
當處理過程中在ChannelPipeline
中有錯誤產生時被調用
Netty定義了下面兩個重要的ChannelHandler
子接口:
ChannelInboundHandler
——處理入站數據以及各種狀態變化;ChannelOutboundHandler
——處理出站數據並且允許攔截所有的操作。
在接下來的章節中,我們將詳細地討論這些子接口。
6.1.3 ChannelInboundHandler接口
表6-3列出了interface ChannelInboundHandler
的生命週期方法。這些方法將會在數據被接收時或者與其對應的Channel
狀態發生改變時被調用。正如我們前面所提到的,這些方法和Channel
的生命週期密切相關。
表6-3 ChannelInboundHandler
的方法
類 型
描 述
channelRegistered
當Channel
已經註冊到它的EventLoop
並且能夠處理I/O時被調用
channelUnregistered
當Channel
從它的EventLoop
註銷並且無法處理任何I/O時被調用
channelActive
當Channel
處於活動狀態時被調用;Channel
已經連接/綁定並且已經就緒
channelInactive
當Channel
離開活動狀態並且不再連接它的遠程節點時被調用
channelReadComplete
當Channel
上的一個讀操作完成時被調用[1]
channelRead
當從Channel
讀取數據時被調用
ChannelWritability - Changed
當Channel
的可寫狀態發生改變時被調用。用戶可以確保寫操作不會完成得太快(以避免發生OutOfMemoryError
)或者可以在Channel
變為再次可寫時恢復寫入。可以通過調用Channel
的isWritable
方法來檢測Channel
的可寫性。與可寫性相關的閾值可以通過Channel.config. setWriteHighWaterMark
和Channel.config.setWriteLowWater- Mark
方法來設置
userEventTriggered
當ChannelnboundHandler.fireUserEventTriggered
方法被調用時被調用,因為一個POJO被傳經了ChannelPipeline
當某個ChannelInboundHandler
的實現重寫channelRead
方法時,它將負責顯式地釋放與池化的ByteBuf
實例相關的內存。Netty為此提供了一個實用方法ReferenceCount-Util.release
,如代碼清單6-1所示。
代碼清單6-1 釋放消息資源
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter { ← -- 擴展了Channel-InboundHandler-Adapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { ← -- 丟棄已接收的消息
ReferenceCountUtil.release(msg);
}
}
Netty將使用WARN
級別的日誌消息記錄未釋放的資源,使得可以非常簡單地在代碼中發現違規的實例。但是以這種方式管理資源可能很繁瑣。一個更加簡單的方式是使用Simple- ChannelInboundHandler
。代碼清單6-2是代碼清單6-1的一個變體,說明了這一點。
代碼清單6-2 使用SimpleChannelInboundHandler
@Sharable
public class SimpleDiscardHandler
extends SimpleChannelInboundHandler<Object> { ← -- 擴展了SimpleChannelInboundHandler
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
// No need to do anything special ← -- 不需要任何顯式的資源釋放
}
}
由於SimpleChannelInboundHandler
會自動釋放資源,所以你不應該存儲指向任何消息的引用供將來使用,因為這些引用都將會失效。
6.1.6節為引用處理提供了更加詳細的討論。
6.1.4 ChannelOutboundHandler接口
出站操作和數據將由ChannelOutboundHandler
處理。它的方法將被Channel
、Channel- Pipeline
以及ChannelHandlerContext
調用。
ChannelOutboundHandler
的一個強大的功能是可以按需推遲操作或者事件,這使得可以通過一些複雜的方法來處理請求。例如,如果到遠程節點的寫入被暫停了,那麼你可以推遲沖刷操作並在稍後繼續。
表6-4顯示了所有由ChannelOutboundHandler
本身所定義的方法(忽略了那些從Channel- Handler
繼承的方法)。
表6-4 ChannelOutboundHandler
的方法
類 型
描 述
bind(ChannelHandlerContext,
SocketAddress,ChannelPromise)
當請求將Channel
綁定到本地地址時被調用
connect(ChannelHandlerContext,
SocketAddress,SocketAddress,ChannelPromise)
當請求將Channel
連接到遠程節點時被調用
disconnect(ChannelHandlerContext,
ChannelPromise)
當請求將Channel
從遠程節點斷開時被調用
close(ChannelHandlerContext,ChannelPromise)
當請求關閉Channel
時被調用
deregister(ChannelHandlerContext,
ChannelPromise)
當請求將Channel
從它的EventLoop
註銷時被調用
read(ChannelHandlerContext)
當請求從Channel
讀取更多的數據時被調用
flush(ChannelHandlerContext)
當請求通過Channel
將入隊數據沖刷到遠程節點時被調用
write(ChannelHandlerContext,Object,
ChannelPromise)
當請求通過Channel
將數據寫到遠程節點時被調用
ChannelPromise與ChannelFuture
ChannelOutboundHandler
中的大部分方法都需要一個ChannelPromise
參數,以便在操作完成時得到通知。ChannelPromise
是ChannelFuture
的一個子類,其定義了一些可寫的方法,如setSuccess
和setFailure
,從而使ChannelFuture
不可變[2]。
接下來我們將看一看那些簡化了編寫ChannelHandler
的任務的類。
6.1.5 ChannelHandler適配器
你可以使用ChannelInboundHandlerAdapter
和ChannelOutboundHandlerAdapter
類作為自己的ChannelHandler
的起始點。這兩個適配器分別提供了ChannelInboundHandler
和ChannelOutboundHandler
的基本實現。通過擴展抽像類ChannelHandlerAdapter
,它們獲得了它們共同的超接口ChannelHandler
的方法。生成的類的層次結構如圖6-2所示。
圖6-2 ChannelHandlerAdapter
類的層次結構
ChannelHandlerAdapter
還提供了實用方法isSharable
。如果其對應的實現被標注為Sharable
,那麼這個方法將返回true
,表示它可以被添加到多個ChannelPipeline
中(如在2.3.1節中所討論過的一樣)。
在ChannelInboundHandlerAdapter
和ChannelOutboundHandlerAdapter
中所提供的方法體調用了其相關聯的ChannelHandlerContext
上的等效方法,從而將事件轉發到了ChannelPipeline
中的下一個ChannelHandler
中。
你要想在自己的ChannelHandler
中使用這些適配器類,只需要簡單地擴展它們,並且重寫那些你想要自定義的方法。
6.1.6 資源管理
每當通過調用ChannelInboundHandler.channelRead
或者ChannelOutbound- Handler.write
方法來處理數據時,你都需要確保沒有任何的資源洩漏。你可能還記得在前面的章節中所提到的,Netty使用引用計數來處理池化的ByteBuf
。所以在完全使用完某個ByteBuf
後,調整其引用計數是很重要的。
為了幫助你診斷潛在的(資源洩漏)問題,Netty提供了class ResourceLeakDetector
[3],它將對你應用程序的緩衝區分配做大約1%的採樣來檢測內存洩露。相關的開銷是非常小的。
如果檢測到了內存洩露,將會產生類似於下面的日誌消息:
LEAK: ByteBuf.release was not called before it's garbage-collected. Enable
advanced leak reporting to find out where the leak occurred. To enable
advanced leak reporting, specify the JVM option
'-Dio.netty.leakDetectionLevel=ADVANCED' or call
ResourceLeakDetector.setLevel.
Netty目前定義了4種洩漏檢測級別,如表6-5所示。
表6-5 洩漏檢測級別
級 別
描 述
DISABLED
禁用洩漏檢測。只有在詳盡的測試之後才應設置為這個值
SIMPLE
使用1%的默認採樣率檢測並報告任何發現的洩露。這是默認級別,適合絕大部分的情況
ADVANCED
使用默認的採樣率,報告所發現的任何的洩露以及對應的消息被訪問的位置
PARANOID
類似於ADVANCED
,但是其將會對每次(對消息的)訪問都進行採樣。這對性能將會有很大的影響,應該只在調試階段使用
洩露檢測級別可以通過將下面的Java系統屬性設置為表中的一個值來定義:
java -Dio.netty.leakDetectionLevel=ADVANCED
如果帶著該JVM選項重新啟動你的應用程序,你將看到自己的應用程序最近被洩漏的緩衝區被訪問的位置。下面是一個典型的由單元測試產生的洩漏報告:
Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:
ByteBuf.release was not called before it's garbage-collected.
Recent access records: 1
#1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(
AdvancedLeakAwareByteBuf.java:697)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(
XmlFrameDecoderTest.java:157)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(
XmlFrameDecoderTest.java:133)
...
實現ChannelInboundHandler.channelRead
和ChannelOutboundHandler.write
方法時,應該如何使用這個診斷工具來防止洩露呢?讓我們看看你的channelRead
操作直接消費入站消息的情況;也就是說,它不會通過調用ChannelHandlerContext.fireChannelRead
方法將入站消息轉發給下一個ChannelInboundHandler
。代碼清單6-3展示了如何釋放消息。
代碼清單6-3 消費並釋放入站消息
@Sharable
public class DiscardInboundHandler extends ChannelInboundHandlerAdapter { ← -- 擴展了ChannelInboundandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg); ← -- 通過調用ReferenceCountUtil.release方法釋放資源
}
}
消費入站消息的簡單方式 由於消費入站數據是一項常規任務,所以Netty提供了一個特殊的被稱為
SimpleChannelInboundHandler
的ChannelInboundHandler
實現。這個實現會在消息被channelRead0
方法消費之後自動釋放消息。
在出站方向這邊,如果你處理了write
操作並丟棄了一個消息,那麼你也應該負責釋放它。代碼清單6-4展示了一個丟棄所有的寫入數據的實現。
代碼清單6-4 丟棄並釋放出站消息
@Sharable
public class DiscardOutboundHandler
extends ChannelOutboundHandlerAdapter { ← -- 擴展了ChannelOutboundHandlerAdapter
@Override
public void write(ChannelHandlerContext ctx,
Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg); ← -- 通過使用R eferenceCountUtil.realse(...)方法釋放資源
promise.setSuccess; ← -- 通知ChannelPromise數據已經被處理了
}
}
重要的是,不僅要釋放資源,還要通知ChannelPromise
。否則可能會出現Channel-FutureListener
收不到某個消息已經被處理了的通知的情況。
總之,如果一個消息被消費或者丟棄了,並且沒有傳遞給ChannelPipeline
中的下一個ChannelOutboundHandler
,那麼用戶就有責任調用ReferenceCountUtil.release
。如果消息到達了實際的傳輸層,那麼當它被寫入時或者Channel
關閉時,都將被自動釋放。
6.2 ChannelPipeline接口
如果你認為ChannelPipeline
是一個攔截流經Channel
的入站和出站事件的Channel-Handler
實例鏈,那麼就很容易看出這些ChannelHandler
之間的交互是如何組成一個應用程序數據和事件處理邏輯的核心的。
每一個新創建的Channel
都將會被分配一個新的ChannelPipeline
。這項關聯是永久性的;Channel
既不能附加另外一個ChannelPipeline
,也不能分離其當前的。在Netty組件的生命週期中,這是一項固定的操作,不需要開發人員的任何干預。
根據事件的起源,事件將會被ChannelInboundHandler
或者ChannelOutboundHandler
處理。隨後,通過調用ChannelHandlerContext
實現,它將被轉發給同一超類型的下一個ChannelHandler
。
ChannelHandlerContext
ChannelHandlerContext
使得ChannelHandler
能夠和它的ChannelPipeline
以及其他的ChannelHandler
交互。ChannelHandler
可以通知其所屬的ChannelPipeline
中的下一個ChannelHandler
,甚至可以動態修改它所屬的ChannelPipeline[4]
。
ChannelHandlerContext
具有豐富的用於處理事件和執行I/O操作的API。6.3節將提供有關ChannelHandlerContext
的更多內容。
圖6-3展示了一個典型的同時具有入站和出站ChannelHandler
的ChannelPipeline
的佈局,並且印證了我們之前的關於ChannelPipeline
主要由一系列的ChannelHandler
所組成的說法。ChannelPipeline
還提供了通過ChannelPipeline
本身傳播事件的方法。如果一個入站事件被觸發,它將被從ChannelPipeline
的頭部開始一直被傳播到Channel Pipeline
的尾端。在圖6-3中,一個出站I/O事件將從ChannelPipeline
的最右邊開始,然後向左傳播。
圖6-3 ChannelPipeline
和它的ChannelHandler
ChannelPipeline相對論
你可能會說,從事件途經
ChannelPipeline
的角度來看,ChannelPipeline
的頭部和尾端取決於該事件是入站的還是出站的。然而Netty總是將ChannelPipeline
的入站口(圖6-3中的左側)作為頭部,而將出站口(該圖的右側)作為尾端。當你完成了通過調用
ChannelPipeline.add*
方法將入站處理器(ChannelInboundHandler
)和出站處理器(ChannelOutboundHandler
)混合添加到ChannelPipeline
之後,每一個ChannelHandler
從頭部到尾端的順序位置正如同我們方纔所定義它們的一樣。因此,如果你將圖6-3中的處理器(ChannelHandler
)從左到右進行編號,那麼第一個被入站事件看到的ChannelHandler
將是1,而第一個被出站事件看到的ChannelHandler
將是5。
在ChannelPipeline
傳播事件時,它會測試ChannelPipeline
中的下一個Channel- Handler
的類型是否和事件的運動方向相匹配。如果不匹配,ChannelPipeline
將跳過該ChannelHandler
並前進到下一個,直到它找到和該事件所期望的方向相匹配的為止。(當然,ChannelHandler
也可以同時實現ChannelInboundHandler
接口和ChannelOutbound- Handler
接口。)
6.2.1 修改ChannelPipeline
ChannelHandler
可以通過添加、刪除或者替換其他的ChannelHandler
來實時地修改ChannelPipeline
的佈局。(它也可以將它自己從ChannelPipeline
中移除。)這是Channel- Handler
最重要的能力之一,所以我們將仔細地來看看它是如何做到的。表6-6列出了相關的方法。
表6-6 ChannelPipeline
上的相關方法,由ChannelHandler
用來修改ChannelPipeline
的佈局
名 稱
描 述
addFirst
addBefore
addAfter
addLast
將一個ChannelHandler
添加到ChannelPipeline
中
remove
將一個ChannelHandler
從ChannelPipeline
中移除
replace
將ChannelPipeline
中的一個ChannelHandler
替換為另一個Channel- Handler
代碼清單6-5展示了這些方法的使用。
代碼清單6-5 修改ChannelPipeline
ChannelPipeline pipeline = ..;
FirstHandler firstHandler = new FirstHandler; ← -- 創建一個FirstHandler 的實例
pipeline.addLast("handler1", firstHandler); ← -- 將該實例作為"handler1" 添加到ChannelPipeline 中
pipeline.addFirst("handler2", new SecondHandler); ← -- 將一個SecondHandler的實例作為"handler2"添加到ChannelPipeline的第一個槽中。這意味著它將被放置在已有的"handler1"之前
pipeline.addLast("handler3", new ThirdHandler); ← -- 將一個ThirdHandler 的實例作為"handler3"添加到ChannelPipeline 的最後一個槽中
...
pipeline.remove("handler3"); ← -- 通過名稱移除"handler3"
pipeline.remove(firstHandler); ← -- 通過引 用移除FirstHandler(它是唯一的,所以不需要它的名稱)
pipeline.replace("handler2", "handler4", new ForthHandler); ← -- 將SecondHandler("handler2")替換為FourthHandler:"handler4"
稍後,你將看到,重組ChannelHandler
的這種能力使我們可以用它來輕鬆地實現極其靈活的邏輯。
ChannelHandler的執行和阻塞
通常
ChannelPipeline
中的每一個ChannelHandler
都是通過它的EventLoop
(I/O線程)來處理傳遞給它的事件的。所以至關重要的是不要阻塞這個線程,因為這會對整體的I/O處理產生負面的影響。但有時可能需要與那些使用阻塞API的遺留代碼進行交互。對於這種情況,
ChannelPipeline
有一些接受一個EventExecutorGroup
的add
方法。如果一個事件被傳遞給一個自定義的EventExecutor- Group
,它將被包含在這個EventExecutorGroup
中的某個EventExecutor
所處理,從而被從該Channel
本身的EventLoop
中移除。對於這種用例,Netty提供了一個叫DefaultEventExecutor- Group
的默認實現。
除了這些操作,還有別的通過類型或者名稱來訪問ChannelHandler
的方法。這些方法都列在了表6-7中。
表6-7 ChannelPipeline
的用於訪問ChannelHandler
的操作
名 稱
描 述
get
通過類型或者名稱返回ChannelHandler
context
返回和ChannelHandler
綁定的ChannelHandlerContext
names
返回ChannelPipeline
中所有ChannelHandler
的名稱
6.2.2 觸發事件
ChannelPipeline
的API公開了用於調用入站和出站操作的附加方法。表6-8列出了入站操作,用於通知ChannelInboundHandler
在ChannelPipeline
中所發生的事件。
表6-8 ChannelPipeline
的入站操作
方 法 名 稱
描 述
fireChannelRegistered
調用ChannelPipeline
中下一個ChannelInboundHandler
的channelRegistered(ChannelHandlerContext)
方法
fireChannelUnregistered
調用ChannelPipeline
中下一個ChannelInboundHandler
的channelUnregistered(ChannelHandlerContext)
方法
fireChannelActive
調用ChannelPipeline
中下一個ChannelInboundHandler
的channelActive(ChannelHandlerContext)
方法
fireChannelInactive
調用ChannelPipeline
中下一個ChannelInboundHandler
的channelInactive(ChannelHandlerContext)
方法
fireExceptionCaught
調用ChannelPipeline
中下一個ChannelInboundHandler
的exceptionCaught(ChannelHandlerContext, Throwable)
方法
fireUserEventTriggered
調用ChannelPipeline
中下一個ChannelInboundHandler
的userEventTriggered(ChannelHandlerContext, Object)
方法
fireChannelRead
調用ChannelPipeline
中下一個ChannelInboundHandler
的channelRead(ChannelHandlerContext, Object msg)
方法
fireChannelReadComplete
調用ChannelPipeline
中下一個ChannelInboundHandler
的channelReadComplete(ChannelHandlerContext)
方法
fireChannelWritability - Changed
調用ChannelPipeline
中下一個ChannelInboundHandler
的channelWritabilityChanged(ChannelHandlerContext)方
法
在出站這邊,處理事件將會導致底層的套接字上發生一系列的動作。表6-9列出了Channel- Pipeline
API的出站操作。
表6-9 ChannelPipeline
的出站操作
方 法 名 稱
描 述
bind
將Channel
綁定到一個本地地址,這將調用ChannelPipeline
中的下一個ChannelOutboundHandler
的bind(ChannelHandlerContext, Socket- Address, ChannelPromise)
方法
connect
將Channel
連接到一個遠程地址,這將調用ChannelPipeline
中的下一個ChannelOutboundHandler
的connect(ChannelHandlerContext, Socket- Address, ChannelPromise)
方法
disconnect
將Channel
斷開連接。這將調用ChannelPipeline
中的下一個ChannelOutbound- Handler
的disconnect(ChannelHandlerContext, Channel Promise)
方法
close
將Channel
關閉。這將調用ChannelPipeline
中的下一個ChannelOutbound- Handler
的close(ChannelHandlerContext, ChannelPromise)
方法
deregister
將Channel
從它先前所分配的EventExecutor
(即EventLoop
)中註銷。這將調用ChannelPipeline
中的下一個ChannelOutboundHandler
的deregister (ChannelHandlerContext, ChannelPromise)
方法
flush
沖刷Channel
所有掛起的寫入。這將調用ChannelPipeline
中的下一個Channel- OutboundHandler
的flush(ChannelHandlerContext)
方法
write
將消息寫入Channel
。這將調用ChannelPipeline
中的下一個Channel- OutboundHandler
的write(ChannelHandlerContext, Object msg, Channel- Promise)
方法。注意:這並不會將消息寫入底層的Socket
,而只會將它放入隊列中。要將它寫入Socket
,需要調用flush
或者writeAndFlush
方法
writeAndFlush
這是一個先調用write
方法再接著調用flush
方法的便利方法
read
請求從Channel
中讀取更多的數據。這將調用ChannelPipeline
中的下一個ChannelOutboundHandler
的read(ChannelHandlerContext)
方法
總結一下:
ChannelPipeline
保存了與Channel
相關聯的ChannelHandler
;ChannelPipeline
可以根據需要,通過添加或者刪除ChannelHandler
來動態地修改;ChannelPipeline
有著豐富的API用以被調用,以響應入站和出站事件。
6.3 ChannelHandlerContext接口
ChannelHandlerContext
代表了ChannelHandler
和ChannelPipeline
之間的關聯,每當有ChannelHandler
添加到ChannelPipeline
中時,都會創建ChannelHandler- Context
。ChannelHandlerContext
的主要功能是管理它所關聯的ChannelHandler
和在同一個ChannelPipeline
中的其他ChannelHandler
之間的交互。
ChannelHandlerContext
有很多的方法,其中一些方法也存在於Channel
和Channel- Pipeline
本身上,但是有一點重要的不同。如果調用Channel
或者ChannelPipeline
上的這些方法,它們將沿著整個ChannelPipeline
進行傳播。而調用位於ChannelHandlerContext
上的相同方法,則將從當前所關聯的ChannelHandler
開始,並且只會傳播給位於該ChannelPipeline
中的下一個能夠處理該事件的ChannelHandler
。
表6-10對ChannelHandlerContext
API進行了總結。
表6-10 ChannelHandlerContext
的API
方 法 名 稱
描 述
alloc
返回和這個實例相關聯的Channel
所配置的ByteBufAllocator
bind
綁定到給定的SocketAddress
,並返回ChannelFuture
channel
返回綁定到這個實例的Channel
close
關閉Channel
,並返回ChannelFuture
connect
連接給定的SocketAddress
,並返回ChannelFuture
deregister
從之前分配的EventExecutor
註銷,並返回ChannelFuture
disconnect
從遠程節點斷開,並返回ChannelFuture
executor
返回調度事件的EventExecutor
fireChannelActive
觸發對下一個ChannelInboundHandler
上的channelActive
方法(已連接)的調用
fireChannelInactive
觸發對下一個ChannelInboundHandler
上的channelInactive
方法(已關閉)的調用
fireChannelRead
觸發對下一個ChannelInboundHandler
上的channelRead
方法(已接收的消息)的調用
fireChannelReadComplete
觸發對下一個ChannelInboundHandler
上的channelReadComplete
方法的調用
fireChannelRegistered
觸發對下一個ChannelInboundHandler
上的fireChannelRegistered
方法的調用
fireChannelUnregistered
觸發對下一個ChannelInboundHandler
上的fireChannelUnregistered
方法的調用
fireChannelWritabilityChanged
觸發對下一個ChannelInboundHandler
上的fireChannelWritabilityChanged
方法的調用
fireExceptionCaught
觸發對下一個ChannelInboundHandler
上的fireExceptionCaught(Throwable)
方法的調用
fireUserEventTriggered
觸發對下一個ChannelInboundHandler
上的fireUserEventTriggered(Object evt)
方法的調用
handler
返回綁定到這個實例的ChannelHandler
isRemoved
如果所關聯的ChannelHandler
已經被從ChannelPipeline中
移除則返回true
name
返回這個實例的唯一名稱
pipeline
返回這個實例所關聯的ChannelPipeline
read
將數據從Channel
讀取到第一個入站緩衝區;如果讀取成功則觸發[5]一個channelRead
事件,並(在最後一個消息被讀取完成後)通知ChannelInboundHandler的channelReadComplete (ChannelHandlerContext)方法
write
通過這個實例寫入消息並經過ChannelPipeline
writeAndFlush
通過這個實例寫入並沖刷消息並經過ChannelPipeline
當使用ChannelHandlerContext
的API的時候,請牢記以下兩點:
ChannelHandlerContext
和ChannelHandler
之間的關聯(綁定)是永遠不會改變的,所以緩存對它的引用是安全的;- 如同我們在本節開頭所解釋的一樣,相對於其他類的同名方法,
ChannelHandlerContext
的方法將產生更短的事件流,應該盡可能地利用這個特性來獲得最大的性能。
6.3.1 使用ChannelHandlerContext
在這一節中我們將討論ChannelHandlerContext
的用法,以及存在於ChannelHandler- Context
、Channel
和ChannelPipeline
上的方法的行為。圖6-4展示了它們之間的關係。
圖6-4 Channel
、ChannelPipeline
、ChannelHandler
以及ChannelHandlerContext
之間的關係
在代碼清單6-6中,將通過ChannelHandlerContext獲取到Channel的引用。調用Channel上的write
方法將會導致寫入事件從尾端到頭部地流經ChannelPipeline
。
代碼清單6-6 從ChannelHandlerContext
訪問Channel
ChannelHandlerContext ctx = ..;
Channel channel = ctx.channel; ← -- 獲取到與ChannelHandlerContext相關聯的Channel 的引用
channel.write(Unpooled.copiedBuffer("Netty in Action", ← -- 通過Channel 寫入緩衝區
CharsetUtil.UTF_8));
代碼清單6-7展示了一個類似的例子,但是這一次是寫入ChannelPipeline
。我們再次看到,(到ChannelPipline
的)引用是通過ChannelHandlerContext
獲取的。
代碼清單6-7 通過ChannelHandlerContext
訪問ChannelPipeline
ChannelHandlerContext ctx = ..;
ChannelPipeline pipeline = ctx.pipeline; ← -- 獲取到與ChannelHandlerContext相關聯的ChannelPipeline 的引用
pipeline.write(Unpooled.copiedBuffer("Netty in Action", ← -- 通過ChannelPipeline寫入緩衝區
CharsetUtil.UTF_8));
如同在圖6-5中所能夠看到的一樣,代碼清單6-6和代碼清單6-7中的事件流是一樣的。重要的是要注意到,雖然被調用的Channel或ChannelPipeline
上的write
方法將一直傳播事件通過整個ChannelPipeline
,但是在ChannelHandler
的級別上,事件從一個ChannelHandler
到下一個ChannelHandler
的移動是由ChannelHandlerContext
上的調用完成的。
圖6-5 通過Channel
或者ChannelPipeline
進行的事件傳播
為什麼會想要從ChannelPipeline
中的某個特定點開始傳播事件呢?
- 為了減少將事件傳經對它不感興趣的
ChannelHandler
所帶來的開銷。 - 為了避免將事件傳經那些可能會對它感興趣的
ChannelHandler
。
要想調用從某個特定的ChannelHandler
開始的處理過程,必須獲取到在(Channel- Pipeline
)該ChannelHandler
之前的ChannelHandler
所關聯的ChannelHandler- Context
。這個ChannelHandlerContext
將調用和它所關聯的ChannelHandler
之後的ChannelHandler
。
代碼清單6-8和圖6-6說明了這種用法。
代碼清單6-8 調用ChannelHandlerContext
的write
方法
ChannelHandlerContext ctx = ..; ← -- 獲取到ChannelHandlerContext的引用
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8)); ← -- write方法將把緩衝區數據發送到下一個ChannelHandler
如圖6-6所示,消息將從下一個ChannelHandler
開始流經ChannelPipeline
,繞過了所有前面的ChannelHandler
。
圖6-6 通過ChannelHandlerContext
觸發的操作的事件流
我們剛才所描述的用例是常見的,對於調用特定的ChannelHandler
實例上的操作尤其有用。
6.3.2 ChannelHandler和ChannelHandlerContext的高級用法
正如我們在代碼清單6-6中所看到的,你可以通過調用ChannelHandlerContext
上的pipeline
方法來獲得被封閉的ChannelPipeline
的引用。這使得運行時得以操作ChannelPipeline
的ChannelHandler
,我們可以利用這一點來實現一些複雜的設計。例如,你可以通過將ChannelHandler
添加到ChannelPipeline
中來實現動態的協議切換。
另一種高級的用法是緩存到ChannelHandlerContext
的引用以供稍後使用,這可能會發生在任何的ChannelHandler
方法之外,甚至來自於不同的線程。代碼清單6-9展示了用這種模式來觸發事件。
代碼清單6-9 緩存到ChannelHandlerContext
的引用
public class WriteHandler extends ChannelHandlerAdapter {
private ChannelHandlerContext ctx;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx; ← -- 存儲到ChannelHandlerContext的引用以供稍後使用
}
public void send(String msg) { ← -- 使用之前存儲的到ChannelHandlerContext的引用來發送消息
ctx.writeAndFlush(msg);
}
}
因為一個ChannelHandler
可以從屬於多個ChannelPipeline
,所以它也可以綁定到多個ChannelHandlerContext
實例。對於這種用法指在多個ChannelPipeline
中共享同一個ChannelHandler
,對應的ChannelHandler
必須要使用@Sharable
註解標注;否則,試圖將它添加到多個ChannelPipeline
時將會觸發異常。顯而易見,為了安全地被用於多個並發的Channel
(即連接),這樣的ChannelHandler
必須是線程安全的。
代碼清單6-10展示了這種模式的一個正確實現。
代碼清單6-10 可共享的ChannelHandler
@Sharable
public class SharableHandler extends ChannelInboundHandlerAdapter { ← -- 使用註解@Sharable標注
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("Channel read message: " + msg);
ctx.fireChannelRead(msg); ← -- 記錄方法調用,並轉發給下一個ChannelHandler
}
}
前面的ChannelHandler
實現符合所有的將其加入到多個ChannelPipeline
的需求,即它使用了註解@Sharable
標注,並且也不持有任何的狀態。相反,代碼清單6-11中的實現將會導致問題。
代碼清單6-11 @Sharable
的錯誤用法
@Sharable ← -- 使用註解@Sharable標注
public class UnsharableHandler extends ChannelInboundHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
count++; ← -- 將count 字段的值加1
System.out.println("channelRead(...) called the "
+ count + " time"); ← -- 記錄方法調用,並轉發給下一個ChannelHandler
ctx.fireChannelRead(msg);
}
}
這段代碼的問題在於它擁有狀態[6],即用於跟蹤方法調用次數的實例變量count
。將這個類的一個實例添加到ChannelPipeline
將極有可能在它被多個並發的Channel
訪問時導致問題。(當然,這個簡單的問題可以通過使channelRead
方法變為同步方法來修正。)
總之,只應該在確定了你的ChannelHandler
是線程安全的時才使用@Sharable
註解。
為何要共享同一個
ChannelHandler
在多個ChannelPipeline
中安裝同一個ChannelHandler
的一個常見的原因是用於收集跨越多個Channel
的統計信息。
我們對於ChannelHandlerContext
和它與其他的框架組件之間的關係的討論到此就結束了。接下來我們將看看異常處理。
6.4 異常處理
異常處理是任何真實應用程序的重要組成部分,它也可以通過多種方式來實現。因此,Netty提供了幾種方式用於處理入站或者出站處理過程中所拋出的異常。這一節將幫助你瞭解如何設計最適合你需要的方式。
6.4.1 處理入站異常
如果在處理入站事件的過程中有異常被拋出,那麼它將從它在ChannelInboundHandler
裡被觸發的那一點開始流經ChannelPipeline
。要想處理這種類型的入站異常,你需要在你的ChannelInboundHandler
實現中重寫下面的方法。
public void exceptionCaught(
ChannelHandlerContext ctx, Throwable cause) throws Exception
代碼清單6-12展示了一個簡單的示例,其關閉了Channel
並打印了異常的棧跟蹤信息。
代碼清單6-12 基本的入站異常處理
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace;
ctx.close;
}
}
因為異常將會繼續按照入站方向流動(就像所有的入站事件一樣),所以實現了前面所示邏輯的ChannelInboundHandler
通常位於ChannelPipeline
的最後。這確保了所有的入站異常都總是會被處理,無論它們可能會發生在ChannelPipeline
中的什麼位置。
你應該如何響應異常,可能很大程度上取決於你的應用程序。你可能想要關閉Channel
(和連接),也可能會嘗試進行恢復。如果你不實現任何處理入站異常的邏輯(或者沒有消費該異常),那麼Netty將會記錄該異常沒有被處理的事實[7]。
總結一下:
ChannelHandler.exceptionCaught
的默認實現是簡單地將當前異常轉發給ChannelPipeline
中的下一個ChannelHandler
;- 如果異常到達了
ChannelPipeline
的尾端,它將會被記錄為未被處理; - 要想定義自定義的處理邏輯,你需要重寫
exceptionCaught
方法。然後你需要決定是否需要將該異常傳播出去。
6.4.2 處理出站異常
用於處理出站操作中的正常完成以及異常的選項,都基於以下的通知機制。
- 每個出站操作都將返回一個
ChannelFuture
。註冊到ChannelFuture
的Channel- FutureListener
將在操作完成時被通知該操作是成功了還是出錯了。 - 幾乎所有的
ChannelOutboundHandler
上的方法都會傳入一個ChannelPromise
的實例。作為ChannelFuture
的子類,ChannelPromise
也可以被分配用於異步通知的監聽器。但是,ChannelPromise
還具有提供立即通知的可寫方法:
ChannelPromise setSuccess;
ChannelPromise setFailure(Throwable cause);
添加ChannelFutureListener
只需要調用ChannelFuture
實例上的addListener(ChannelFutureListener)
方法,並且有兩種不同的方式可以做到這一點。其中最常用的方式是,調用出站操作(如write
方法)所返回的ChannelFuture
上的addListener
方法。
代碼清單6-13使用了這種方式來添加ChannelFutureListener
,它將打印棧跟蹤信息並且隨後關閉Channel
。
代碼清單6-13 添加ChannelFutureListener
到ChannelFuture
ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess) {
f.cause.printStackTrace;
f.channel.close;
}
}
});
第二種方式是將ChannelFutureListener
添加到即將作為參數傳遞給ChannelOut- boundHandler
的方法的ChannelPromise
。代碼清單6-14中所展示的代碼和代碼清單6-13中所展示的具有相同的效果。
代碼清單6-14 添加ChannelFutureListener
到ChannelPromise
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
promise.addListener(new ChannelFutureListener {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess) {
f.cause.printStackTrace;
f.channel.close;
}
}
});
}
}
ChannelPromise的可寫方法
通過調用
ChannelPromise
上的setSuccess
和setFailure
方法,可以使一個操作的狀態在ChannelHandler
的方法返回給其調用者時便即刻被感知到。
為何選擇一種方式而不是另一種呢?對於細緻的異常處理,你可能會發現,在調用出站操作時添加ChannelFutureListener
更合適,如代碼清單6-13所示。而對於一般的異常處理,你可能會發現,代碼清單6-14所示的自定義的ChannelOutboundHandler
實現的方式更加的簡單。
如果你的ChannelOutboundHandler
本身拋出了異常會發生什麼呢?在這種情況下,Netty本身會通知任何已經註冊到對應ChannelPromise
的監聽器。
6.5 小結
在本章中我們仔細地研究了Netty的數據處理組件——ChannelHandler
。我們討論了ChannelHandler
是如何鏈接在一起,以及它們是如何作為ChannelInboundHandler
和ChannelOutboundHandler與ChannelPipeline
進行交互的。
下一章將介紹Netty的EventLoop
和並發模型,這對於理解Netty是如何實現異步的、事件驅動的網絡編程模型來說至關重要。
[1] 當所有可讀的字節都已經從Channel
中讀取之後,將會調用該回調方法;所以,可能在channelRead- Complete
被調用之前看到多次調用channelRead(...)
。——譯者注
[2] 這裡借鑒的是Scala的Promise和Future的設計,當一個Promise被完成之後,其對應的Future的值便不能再進行任何修改了。——譯者注
[3] 其利用了JDK提供的PhantomReference
類來實現這一點。——譯者注
[4] 這裡指修改ChannelPipeline
中的ChannelHandler
的編排。——譯者注
[5] 通過配合ChannelConfig.setAutoRead(boolean autoRead)
方法,可以實現反應式系統的特性之一回壓(back-pressure)。——譯者注
[6] 主要的問題在於,對於其所持有的狀態的修改並不是線程安全的,比如也可以通過使用AtomicInteger
來規避這個問題。——譯者注
[7] 即Netty將會通過Warning級別的日誌記錄該異常到達了ChannelPipeline
的尾端,但沒有被處理,並嘗試釋放該異常。——譯者注