讀古今文學網 > Netty實戰 > 第11章 預置的ChannelHandler和編解碼器 >

第11章 預置的ChannelHandler和編解碼器

本章主要內容

  • 通過SSL/TLS保護Netty應用程序
  • 構建基於Netty的HTTP/HTTPS應用程序
  • 處理空閒的連接和超時
  • 解碼基於分隔符的協議和基於長度的協議
  • 寫大型數據

Netty為許多通用協議提供了編解碼器和處理器,幾乎可以開箱即用,這減少了你在那些相當繁瑣的事務上本來會花費的時間與精力。在本章中,我們將探討這些工具以及它們所帶來的好處,其中包括Netty對於SSL/TLS和WebSocket的支持,以及如何簡單地通過數據壓縮來壓搾HTTP,以獲取更好的性能。

11.1 通過SSL/TLS保護Netty應用程序

如今,數據隱私是一個非常值得關注的問題,作為開發人員,我們需要準備好應對它。至少,我們應該熟悉像SSL和TLS[1]這樣的安全協議,它們層疊在其他協議之上,用以實現數據安全。我們在訪問安全網站時遇到過這些協議,但是它們也可用於其他不是基於HTTP的應用程序,如安全SMTP(SMTPS)郵件服務器甚至是關係型數據庫系統。

為了支持SSL/TLS,Java提供了javax.net.ssl包,它的SSLContextSSLEngine類使得實現解密和加密相當簡單直接。Netty通過一個名為SslHandlerChannelHandler實現利用了這個API,其中SslHandler在內部使用SSLEngine來完成實際的工作。

圖11-1展示了使用SslHandler的數據流。

Netty的OpenSSL/SSLEngine實現

Netty還提供了使用OpenSSL工具包(www.openssl.org)的SSLEngine實現。這個OpenSsl-Engine類提供了比JDK提供的SSLEngine實現更好的性能。

如果OpenSSL庫可用,可以將Netty應用程序(客戶端和服務器)配置為默認使用OpenSslEngine。如果不可用,Netty將會回退到JDK實現。有關配置OpenSSL支持的詳細說明,參見Netty文檔:http://netty.io/wiki/forked-tomcat-native.html#wikih2-1。

注意,無論你使用JDK的SSLEngine還是使用Netty的OpenSslEngine,SSL API和數據流都是一致的。

圖11-1 通過SslHandler進行解密和加密的數據流

代碼清單11-1展示了如何使用ChannelInitializer來將SslHandler添加到Channel- Pipeline中。回想一下,ChannelInitializer用於在Channel註冊好時設置Channel- Pipeline

代碼清單11-1 添加SSL/TLS支持

public class SslChannelInitializer extends ChannelInitializer<Channel>{
  private final SslContext context; 
  private final boolean startTls;

  public SslChannelInitializer(SslContext context,   ← --  傳入要使用的SslContext
    boolean startTls) {  ← --  如果設置為true,第一個寫入的消息將不會被加密(客戶端應該設置為true)
    this.context = context;
    this.startTls = startTls;
  }

  @Override
  protected void initChannel(Channel ch) throws Exception {
    SSLEngine engine = context.newEngine(ch.alloc);   ← -- 對於每個SslHandler 實例,都使用Channel 的ByteBuf-Allocator 從SslContext 獲取一個新的SSLEngine
    ch.pipeline.addFirst("ssl",
      new SslHandler(engine, startTls));   ← -- 將SslHandler 作為第一個ChannelHandler 添加到ChannelPipeline 中 
  }
}  

在大多數情況下,SslHandler將是ChannelPipeline中的第一個ChannelHandler。這確保了只有在所有其他的ChannelHandler將它們的邏輯應用到數據之後,才會進行加密。

SslHandler具有一些有用的方法,如表11-1所示。例如,在握手階段,兩個節點將相互驗證並且商定一種加密方式。你可以通過配置SslHandler來修改它的行為,或者在SSL/TLS握手一旦完成之後提供通知,握手階段完成之後,所有的數據都將會被加密。SSL/TLS握手將會被自動執行。

表11-1 SslHandler的方法

方 法 名 稱

描  述

setHandshakeTimeout (long,TimeUnit)
setHandshakeTimeoutMillis (long)
getHandshakeTimeoutMillis

設置和獲取超時時間,超時之後,握手ChannelFuture將會被通知失敗

setCloseNotifyTimeout (long,TimeUnit)
setCloseNotifyTimeoutMillis (long)
getCloseNotifyTimeoutMillis

設置和獲取超時時間,超時之後,將會觸發一個關閉通知並關閉連接。這也將會導致通知該ChannelFuture失敗

handshakeFuture

返回一個在握手完成後將會得到通知的ChannelFuture。如果握手先前已經執行過了,則返回一個包含了先前的握手結果的ChannelFuture

close
close(ChannelPromise)
close(ChannelHandlerContext,ChannelPromise)

發送close_notify以請求關閉並銷毀底層的SslEngine

11.2 構建基於Netty的HTTP/HTTPS應用程序

HTTP/HTTPS是最常見的協議套件之一,並且隨著智能手機的成功,它的應用也日益廣泛,因為對於任何公司來說,擁有一個可以被移動設備訪問的網站幾乎是必須的。這些協議也被用於其他方面。許多組織導出的用於和他們的商業合作夥伴通信的WebService API一般也是基於HTTP(S)的。

接下來,我們來看看Netty提供的ChannelHandler,你可以用它來處理HTTP和HTTPS協議,而不必編寫自定義的編解碼器。

11.2.1 HTTP解碼器、編碼器和編解碼器

HTTP是基於請求/響應模式的:客戶端向服務器發送一個HTTP請求,然後服務器將會返回一個HTTP響應。Netty提供了多種編碼器和解碼器以簡化對這個協議的使用。圖11-2和圖11-3分別展示了生產和消費HTTP請求和HTTP響應的方法。

圖11-2 HTTP請求的組成部分

圖11-3 HTTP響應的組成部分

如圖11-2和圖11-3所示,一個HTTP請求/響應可能由多個數據部分組成,並且它總是以一個LastHttpContent部分作為結束。FullHttpRequestFullHttpResponse消息是特殊的子類型,分別代表了完整的請求和響應。所有類型的HTTP消息(FullHttpRequestLastHttpContent以及代碼清單11-2中展示的那些)都實現了HttpObject接口。

表11-2概要地介紹了處理和生成這些消息的HTTP解碼器和編碼器。

表11-2 HTTP解碼器和編碼器

名  稱

描  述

HttpRequestEncoder

HttpRequestHttpContentLastHttpContent消息編碼為字節

HttpResponseEncoder

HttpResponseHttpContentLastHttpContent消息編碼為字節

HttpRequestDecoder

將字節解碼為HttpRequestHttpContentLastHttpContent消息

HttpResponseDecoder

將字節解碼為HttpResponseHttpContentLastHttpContent消息

代碼清單11-2中的HttpPipelineInitializer類展示了將HTTP支持添加到你的應用程序是多麼簡單——幾乎只需要將正確的ChannelHandler添加到ChannelPipeline中。

代碼清單11-2 添加HTTP支持

public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
  private final boolean client;

  public HttpPipelineInitializer(boolean client) {
    this.client = client;
  }

  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline;
    if (client) {  ← --  如果是客戶端,則添加HttpResponseDecoder 以處理來自服務器的響應
      pipeline.addLast("decoder", new HttpResponseDecoder); 
      pipeline.addLast("encoder", new HttpRequestEncoder);   ← --  如果是客戶端,則添加HttpRequestEncoder以向服務器發送請求
    } else {
      pipeline.addLast("decoder", new HttpRequestDecoder);   ← -- 如果是服務器,則添加HttpRequestDecoder以接收來自客戶端的請求
      pipeline.addLast("encoder", new HttpResponseEncoder);  ← -- 如果是服務器,則添加HttpResponseEncoder以向客戶端發送響應
    }
  }
}  

11.2.2 聚合HTTP消息

ChannelInitializerChannelHandler安裝到ChannelPipeline中之後,你便可以處理不同類型的HttpObject消息了。但是由於HTTP的請求和響應可能由許多部分組成,因此你需要聚合它們以形成完整的消息。為了消除這項繁瑣的任務,Netty提供了一個聚合器,它可以將多個消息部分合併為FullHttpRequest或者FullHttpResponse消息。通過這樣的方式,你將總是看到完整的消息內容。

由於消息分段需要被緩衝,直到可以轉發一個完整的消息給下一個ChannelInbound-Handler,所以這個操作有輕微的開銷。其所帶來的好處便是你不必關心消息碎片了。

引入這種自動聚合機制只不過是向ChannelPipeline中添加另外一個ChannelHandler罷了。代碼清單11-3展示了如何做到這一點。

代碼清單11-3 自動聚合HTTP的消息片段

public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
  private final boolean isClient;

  public HttpAggregatorInitializer(boolean isClient) {
    this.isClient = isClient;
  }

  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline;
    if (isClient) {
      pipeline.addLast("codec", new HttpClientCodec);   ← --  如果是客戶端,則添加HttpClientCodec
    } else {
      pipeline.addLast("codec", new HttpServerCodec);  ← -- 如果是服務器,則添加HttpServerCodec
    }
    pipeline.addLast("aggregator",
      new HttpObjectAggregator(512 * 1024));  ← --  將最大的消息大小為512 KB的HttpObjectAggregator 添加到ChannelPipeline
  }
}  

11.2.3 HTTP壓縮

當使用HTTP時,建議開啟壓縮功能以盡可能多地減小傳輸數據的大小。雖然壓縮會帶來一些CPU時鐘週期上的開銷,但是通常來說它都是一個好主意,特別是對於文本數據來說。

Netty為壓縮和解壓縮提供了ChannelHandler實現,它們同時支持gzipdeflate編碼。

HTTP請求的頭部信息

客戶端可以通過提供以下頭部信息來指示服務器它所支持的壓縮格式:

GET /encrypted-area HTTP/1.1
Host: www.example.com
Accept-Encoding: gzip, deflate  

然而,需要注意的是,服務器沒有義務壓縮它所發送的數據。

代碼清單11-4展示了一個例子。

代碼清單11-4 自動壓縮HTTP消息

public class HttpCompressionInitializer extends ChannelInitializer<Channel> {
  private final boolean isClient;

  public HttpCompressionInitializer(boolean isClient) {
    this.isClient = isClient;
  }

  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline;
    if (isClient) {
       pipeline.addLast("codec", new HttpClientCodec);   ← --  如果是客戶端,則添加HttpClientCodec
      pipeline.addLast("decompressor",
        new HttpContentDecompressor); ← --  如果是客戶端,則添加HttpContentDecompressor 以處理來自服務器的壓縮內容 
    } else {
      pipeline.addLast("codec", new HttpServerCodec);   ← --  如果是服務器,則添加HttpServerCodec
      pipeline.addLast("compressor",
      new HttpContentCompressor);  ← -- 如果是服務器,則添加HttpContentCompressor來壓縮數據(如果客戶端支持它)
    }
  }
}  

壓縮及其依賴

如果你正在使用的是JDK 6或者更早的版本,那麼你需要將JZlib(www.jcraft.com/jzlib/)添加到CLASSPATH中以支持壓縮功能。

對於Maven,請添加以下依賴項:

<dependency>
  <groupId>com.jcraft</groupId>
  <artifactId>jzlib</artifactId>
  <version>1.1.3</version>
</dependency>  

11.2.4 使用HTTPS

代碼清單11-5顯示,啟用HTTPS只需要將SslHandler添加到ChannelPipelineChannelHandler組合中。

代碼清單11-5 使用HTTPS

public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
  private final SslContext context;
  private final boolean isClient;

   public HttpsCodecInitializer(SslContext context, boolean isClient) {
     this.context = context;
     this.isClient = isClient;
   }

  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline;
    SSLEngine engine = context.newEngine(ch.alloc);
    pipeline.addFirst("ssl", new SslHandler(engine));   ← --  將SslHandler 添加到ChannelPipeline 中以使用HTTPS

    if (isClient) {
      pipeline.addLast("codec", new HttpClientCodec);   ← --  如果是客戶端,則添加HttpClientCodec
    } else {
      pipeline.addLast("codec", new HttpServerCodec);   ← --  如果是服務器,則添加HttpServerCodec
    }
  }
}  

前面的代碼是一個很好的例子,說明了Netty的架構方式是如何將代碼重用變為槓桿作用的。只需要簡單地將一個ChannelHandler添加到ChannelPipeline中,便可以提供一項新功能,甚至像加密這樣重要的功能都能提供。

11.2.5 WebSocket

Netty針對基於HTTP的應用程序的廣泛工具包中包括了對它的一些最先進的特性的支持。在這一節中,我們將探討WebSocket ——一種在2011年被互聯網工程任務組(IETF)標準化的協議。

WebSocket解決了一個長期存在的問題:既然底層的協議(HTTP)是一個請求/響應模式的交互序列,那麼如何實時地發佈信息呢?AJAX提供了一定程度上的改善,但是數據流仍然是由客戶端所發送的請求驅動的。還有其他的一些或多或少的取巧方式[2],但是最終它們仍然屬於擴展性受限的變通之法。

WebSocket規範以及它的實現代表了對一種更加有效的解決方案的嘗試。簡單地說,WebSocket提供了「在一個單個的TCP連接上提供雙向的通信……結合WebSocket API……它為網頁和遠程服務器之間的雙向通信提供了一種替代HTTP輪詢的方案。」[3]

也就是說,WebSocket在客戶端和服務器之間提供了真正的雙向數據交換。我們不會深入地描述太多的內部細節,但是我們還是應該提到,儘管最早的實現僅限於文本數據,但是現在已經不是問題了;WebSocket現在可以用於傳輸任意類型的數據,很像普通的套接字。

圖11-4給出了WebSocket協議的一般概念。在這個場景下,通信將作為普通的HTTP協議開始,隨後升級到雙向的WebSocket協議。

要想向你的應用程序中添加對於WebSocket的支持,你需要將適當的客戶端或者服務器WebSocket ChannelHandler添加到ChannelPipeline中。這個類將處理由WebSocket定義的稱為幀的特殊消息類型。如表11-3所示,WebSocketFrame可以被歸類為數據幀或者控制幀。

圖11-4 WebSocket協議

表11-3 WebSocketFrame類型

名  稱

描  述

BinaryWebSocketFrame

數據幀:二進制數據

TextWebSocketFrame

數據幀:文本數據

ContinuationWebSocketFrame

數據幀:屬於上一個BinaryWebSocketFrame或者TextWeb- SocketFrame的文本的或者二進制數據

CloseWebSocketFrame

控制幀:一個CLOSE請求、關閉的狀態碼以及關閉的原因

PingWebSocketFrame

控制幀:請求一個PongWebSocketFrame

PongWebSocketFrame

控制幀:對PingWebSocketFrame請求的響應

因為Netty主要是一種服務器端的技術,所以在這裡我們重點創建WebSocket服務器[4]。代碼清單11-6展示了一個使用WebSocketServerProtocolHandler的簡單示例,這個類處理協議升級握手,以及3種控制幀——ClosePingPongTextBinary數據幀將會被傳遞給下一個(由你實現的)ChannelHandler進行處理。

代碼清單11-6 在服務器端支持WebSocket

public class WebSocketServerInitializer extends ChannelInitializer<Channel>{
  @Override
  protected void initChannel(Channel ch) throws Exception {
    ch.pipeline.addLast(
      new HttpServerCodec,
      new HttpObjectAggregator(65536),   ← --  為握手提供聚合的HttpRequest
       new WebSocketServerProtocolHandler("/websocket"), ← --  如果被請求的端點是"/websocket",則處理該升級握手 
      new TextFrameHandler,  ← --  TextFrameHandler 處理TextWebSocketFrame
       new BinaryFrameHandler, ← -- BinaryFrameHandler 處理BinaryWebSocketFrame 
       new ContinuationFrameHandler); ← -- ContinuationFrameHandler 處理ContinuationWebSocketFrame  
  }

  public static final class TextFrameHandler extends
    SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,
      TextWebSocketFrame msg) throws Exception {
      // Handle text frame
    }
  }

  public static final class BinaryFrameHandler extends
    SimpleChannelInboundHandler<BinaryWebSocketFrame> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,
      BinaryWebSocketFrame msg) throws Exception {
      // Handle binary frame
    }
  }

  public static final class ContinuationFrameHandler extends
    SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,
      ContinuationWebSocketFrame msg) throws Exception {
      // Handle continuation frame
    }
  }
} 

保護WebSocket

要想為WebSocket添加安全性,只需要將SslHandler作為第一個ChannelHandler添加到ChannelPipeline中。

更加全面的示例參見第12章,那一章會深入探討實時WebSocket應用程序的設計。

11.3 空閒的連接和超時

到目前為止,我們的討論都集中在Netty通過專門的編解碼器和處理器對HTTP的變型HTTPS和WebSocket的支持上。只要你有效地管理你的網絡資源,這些技術就可以使得你的應用程序更加高效、易用和安全。所以,讓我們一起來探討下首先需要關注的——連接管理吧。

檢測空閒連接以及超時對於及時釋放資源來說是至關重要的。由於這是一項常見的任務,Netty特地為它提供了幾個ChannelHandler實現。表11-4給出了它們的概述。

表11-4 用於空閒連接以及超時的ChannelHandler

名  稱

描  述

IdleStateHandler

當連接空閒時間太長時,將會觸發一個IdleStateEvent事件。然後,你可以通過在你的ChannelInboundHandler中重寫userEvent- Triggered方法來處理該IdleStateEvent事件

ReadTimeoutHandler

如果在指定的時間間隔內沒有收到任何的入站數據,則拋出一個Read- TimeoutException並關閉對應的Channel。可以通過重寫你的ChannelHandler中的exceptionCaught方法來檢測該Read- TimeoutException

WriteTimeoutHandler

如果在指定的時間間隔內沒有任何出站數據寫入,則拋出一個Write- TimeoutException並關閉對應的Channel。可以通過重寫你的ChannelHandlerexceptionCaught方法檢測該WriteTimeout- Exception

讓我們仔細看看在實踐中使用得最多的IdleStateHandler吧。代碼清單11-7展示了當使用通常的發送心跳消息到遠程節點的方法時,如果在60秒之內沒有接收或者發送任何的數據,我們將如何得到通知;如果沒有響應,則連接會被關閉。

代碼清單11-7 發送心跳

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel>
  {
  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline;
    pipeline.addLast(
      new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));   ← --  ❶IdleStateHandler 將在被觸發時發送一個IdleStateEvent 事件
    pipeline.addLast(new HeartbeatHandler);  ← --  將一個HeartbeatHandler添加到ChannelPipeline中
  }

  public static final class HeartbeatHandler   ← -- 實現userEven t-Triggered方法以發送心跳消息
    extends ChannelInboundHandlerAdapter {
   private static final ByteBuf HEARTBEAT_SEQUENCE =   ← -- 發送到遠程節點的心跳消息 
      Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(
      "HEARTBEAT", CharsetUtil.ISO_8859_1));

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx,
      Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {   ← -- ❷發送心跳消息,並在發送失敗時關閉該連接 
        ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate)
          .addListener(
            ChannelFutureListener.CLOSE_ON_FAILURE);
      } else {
        super.userEventTriggered(ctx, evt);  ← -- 不是IdleStateEvent事件,所以將它傳遞給下一個Channel-InboundHandler 
      }
    }
  }
}  

這個示例演示了如何使用IdleStateHandler來測試遠程節點是否仍然還活著,並且在它失活時通過關閉連接來釋放資源。

如果連接超過60秒沒有接收或者發送任何的數據,那麼IdleStateHandler❶將會使用一個IdleStateEvent事件來調用fireUserEventTriggered方法。HeartbeatHandler實現了userEventTriggered方法,如果這個方法檢測到IdleStateEvent事件,它將會發送心跳消息,並且添加一個將在發送操作失敗時關閉該連接的ChannelFutureListener❷

11.4 解碼基於分隔符的協議和基於長度的協議

在使用Netty的過程中,你將會遇到需要解碼器的基於分隔符和幀長度的協議。下一節將解釋Netty所提供的用於處理這些場景的實現。

11.4.1 基於分隔符的協議

基於分隔符的(delimited)消息協議使用定義的字符來標記的消息或者消息段(通常被稱為幀)的開頭或者結尾。由RFC文檔正式定義的許多協議(如SMTP、POP3、IMAP以及Telnet[5])都是這樣的。此外,當然,私有組織通常也擁有他們自己的專有格式。無論你使用什麼樣的協議,表11-5中列出的解碼器都能幫助你定義可以提取由任意標記(token)序列分隔的幀的自定義解碼器。

表11-5 用於處理基於分隔符的協議和基於長度的協議的解碼器

名  稱

描  述

DelimiterBasedFrameDecoder

使用任何由用戶提供的分隔符來提取幀的通用解碼器

LineBasedFrameDecoder

提取由行尾符(\n或者\r\n)分隔的幀的解碼器。這個解碼器比DelimiterBasedFrameDecoder更快

圖11-5展示了當幀由行尾序列\r\n(回車符+換行符)分隔時是如何被處理的。

圖11-5 由行尾符分隔的幀

代碼清單11-8展示了如何使用LineBasedFrameDecoder來處理圖11-5所示的場景。

代碼清單11-8 處理由行尾符分隔的幀

public class LineBasedHandlerInitializer extends ChannelInitializer<Channel>
  {
  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline;
    pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));   ← --  該LineBasedFrame-Decoder 將提取的幀轉發給下一個Channel-InboundHandler
    pipeline.addLast(new FrameHandler);   ← --  添加FrameHandler以接收幀
  }

  public static final class FrameHandler
    extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,   ← -- 傳入了單個幀的內容 
       ByteBuf msg) throws Exception {
       // Do something with the data extracted from the frame
    }
  }
}  

如果你正在使用除了行尾符之外的分隔符分隔的幀,那麼你可以以類似的方式使用Delimiter-BasedFrameDecoder,只需要將特定的分隔符序列指定到其構造函數即可。

這些解碼器是實現你自己的基於分隔符的協議的工具。作為示例,我們將使用下面的協議規範:

  • 傳入數據流是一系列的幀,每個幀都由換行符(\n)分隔;
  • 每個幀都由一系列的元素組成,每個元素都由單個空格字符分隔;
  • 一個幀的內容代表一個命令,定義為一個命令名稱後跟著數目可變的參數。

我們用於這個協議的自定義解碼器將定義以下類:

  • Cmd——將幀(命令)的內容存儲在ByteBuf中,一個ByteBuf用於名稱,另一個用於參數;
  • CmdDecoder——從被重寫了的decode方法中獲取一行字符串,並從它的內容構建一個Cmd的實例;
  • CmdHandler ——從CmdDecoder獲取解碼的Cmd對象,並對它進行一些處理;
  • CmdHandlerInitializer ——為了簡便起見,我們將會把前面的這些類定義為專門的ChannelInitializer的嵌套類,其將會把這些ChannelInboundHandler安裝到ChannelPipeline中。

正如將在代碼清單11-9中所能看到的那樣,這個解碼器的關鍵是擴展LineBasedFrame-Decoder

代碼清單11-9 使用ChannelInitializer安裝解碼器

public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
  final byte SPACE = (byte)' ';
  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline;
    pipeline.addLast(new CmdDecoder(64 * 1024));  ← --   添加CmdDecoder 以提取Cmd 對象,並將它轉發給下一個ChannelInboundHandler
    pipeline.addLast(new CmdHandler);  ← -- 添加CmdHandler 以接收和處理Cmd 對像
  }

  public static final class Cmd {   ← -- Cmd POJO
    private final ByteBuf name;
    private final ByteBuf args;

    public Cmd(ByteBuf name, ByteBuf args) {
      this.name = name;
      this.args = args;
    }

    public ByteBuf name {
      return name;
    }

    public ByteBuf args {
      return args;
    }
  }

  public static final class CmdDecoder extends LineBasedFrameDecoder {
    public CmdDecoder(int maxLength) {
      super(maxLength);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer)
      throws Exception {
      ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);   ← --  從ByteBuf 中提取由行尾符序列分隔的幀
      if (frame == null) {  
        return null;   ← --  如果輸入中沒有幀,則返回null 
      }
      int index = frame.indexOf(frame.readerIndex,  ← --  查找第一個空格字符的索引。前面是命令名稱,接著是參數
        frame.writerIndex, SPACE);
      return new Cmd(frame.slice(frame.readerIndex, index),  ← -- 使用包含有命令名稱和參數的切片創建新的Cmd 對像
        frame.slice(index + 1, frame.writerIndex));
    }
  }

  public static final class CmdHandler
    extends SimpleChannelInboundHandler<Cmd> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Cmd msg)
      throws Exception {
      // Do something with the command  ← -- 處理傳經ChannelPipeline的Cmd 對像 
    }
  }
}  

11.4.2 基於長度的協議

基於長度的協議通過將它的長度編碼到幀的頭部來定義幀,而不是使用特殊的分隔符來標記它的結束。[6]表11-6列出了Netty提供的用於處理這種類型的協議的兩種解碼器。

表11-6 用於基於長度的協議的解碼器

名  稱

描  述

FixedLengthFrameDecoder

提取在調用構造函數時指定的定長幀

LengthFieldBasedFrameDecoder

根據編碼進幀頭部中的長度值提取幀;該字段的偏移量以及長度在構造函數中指定

圖11-6展示了FixedLengthFrameDecoder的功能,其在構造時已經指定了幀長度為8字節。

圖11-6 解碼長度為8字節的幀

你將經常會遇到被編碼到消息頭部的幀大小不是固定值的協議。為了處理這種變長幀,你可以使用LengthFieldBasedFrameDecoder,它將從頭部字段確定幀長,然後從數據流中提取指定的字節數。

圖11-7展示了一個示例,其中長度字段在幀中的偏移量為0,並且長度為2字節。

圖11-7 將變長幀大小編碼進頭部的消息

LengthFieldBasedFrameDecoder提供了幾個構造函數來支持各種各樣的頭部配置情況。代碼清單11-10展示了如何使用其3個構造參數分別為maxFrameLengthlengthField-OffsetlengthFieldLength的構造函數。在這個場景中,幀的長度被編碼到了幀起始的前8個字節中。

代碼清單11-10 使用LengthFieldBasedFrameDecoder解碼器基於長度的協議

public class LengthBasedInitializer extends ChannelInitializer<Channel> {  
  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline;
    pipeline.addLast(   ← --  使用LengthFieldBasedFrameDecoder 解碼將幀長度編碼到幀起始的前8 個字節中的消息
      new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8)); 
    pipeline.addLast(new FrameHandler);  ← --  添加FrameHandler以處理每個幀 
  }

  public static final class FrameHandler
    extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,
      ByteBuf msg) throws Exception {
      // Do something with the frame  ← --  處理幀的數據
    }
  }
}  

你現在已經看到了Netty提供的,用於支持那些通過指定協議幀的分隔符或者長度(固定的或者可變的)以定義字節流的結構的協議的編解碼器。你將會發現這些編解碼器的許多用途,因為許多的常見協議都落到了這些分類之一中。

11.5 寫大型數據

因為網絡飽和的可能性,如何在異步框架中高效地寫大塊的數據是一個特殊的問題。由於寫操作是非阻塞的,所以即使沒有寫出所有的數據,寫操作也會在完成時返回並通知Channel-Future。當這種情況發生時,如果仍然不停地寫入,就有內存耗盡的風險。所以在寫大型數據時,需要準備好處理到遠程節點的連接是慢速連接的情況,這種情況會導致內存釋放的延遲。讓我們考慮下將一個文件內容寫出到網絡的情況。

在我們討論傳輸(見4.2節)的過程中,提到了NIO的零拷貝特性,這種特性消除了將文件的內容從文件系統移動到網絡棧的複製過程。所有的這一切都發生在Netty的核心中,所以應用程序所有需要做的就是使用一個FileRegion接口的實現,其在Netty的API文檔中的定義是:「通過支持零拷貝的文件傳輸的Channel來發送的文件區域。」

代碼清單11-11展示了如何通過從FileInputStream創建一個DefaultFileRegion,並將其寫入Channel[7],從而利用零拷貝特性來傳輸一個文件的內容。

代碼清單11-11 使用FileRegion傳輸文件的內容

FileInputStream in = new FileInputStream(file);   ← -- 創建一個FileInputStream 
FileRegion region = new DefaultFileRegion(  ← -- 以該文件的完整長度創建一個新的DefaultFileRegion
  in.getChannel, 0, file.length);
channel.writeAndFlush(region).addListener(  ← --  發送該DefaultFile-Region,並註冊一個ChannelFutureListener
  new ChannelFutureListener {
  @Override
  public void operationComplete(ChannelFuture future)
    throws Exception {
    if (!future.isSuccess) {
      Throwable cause = future.cause;  ← --  處理失敗
      // Do something
    }
  }
});  

這個示例只適用於文件內容的直接傳輸,不包括應用程序對數據的任何處理。在需要將數據從文件系統複製到用戶內存中時,可以使用ChunkedWriteHandler,它支持異步寫大型數據流,而又不會導致大量的內存消耗。

關鍵是interface ChunkedInput<B>,其中類型參數BreadChunk方法返回的類型。Netty預置了該接口的4個實現,如表11-7中所列出的。每個都代表了一個將由Chunked-WriteHandler處理的不定長度的數據流。

代碼清單11-12說明了ChunkedStream的用法,它是實踐中最常用的實現。所示的類使用了一個File以及一個SslContext進行實例化。當initChannel方法被調用時,它將使用所示的ChannelHandler鏈初始化該Channel

表11-7 ChunkedInput的實現

名  稱

描  述

ChunkedFile

從文件中逐塊獲取數據,當你的平台不支持零拷貝或者你需要轉換數據時使用

ChunkedNioFile

ChunkedFile類似,只是它使用了FileChannel

ChunkedStream

InputStream中逐塊傳輸內容

ChunkedNioStream

ReadableByteChannel中逐塊傳輸內容

Channel的狀態變為活動的時,WriteStreamHandler將會逐塊地把來自文件中的數據作為ChunkedStream寫入。數據在傳輸之前將會由SslHandler加密。

代碼清單11-12 使用ChunkedStream傳輸文件內容

public class ChunkedWriteHandlerInitializer
  extends ChannelInitializer<Channel> {
  private final File file;
  private final SslContext sslCtx;
  public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
    this.file = file;
    this.sslCtx = sslCtx;
  }

  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline;
    pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc);   ← --  將SslHandler 添加到ChannelPipeline 中
    pipeline.addLast(new ChunkedWriteHandler);  ← --  添加Chunked-WriteHandler以處理作為ChunkedInput傳入的數據
    pipeline.addLast(new WriteStreamHandler);   ← --  一旦連接建立,WriteStreamHandler就開始寫文件數據  
  }

  public final class WriteStreamHandler
    extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx)   ← --  當連接建立時,channelActive方法將使用ChunkedInput寫文件數據
      throws Exception {
      super.channelActive(ctx);
      ctx.writeAndFlush(
      new ChunkedStream(new FileInputStream(file)));
    }
  }
} 

逐塊輸入 要使用你自己的ChunkedInput實現,請在ChannelPipeline中安裝一個ChunkedWriteHandler

在本節中,我們討論了如何通過使用零拷貝特性來高效地傳輸文件,以及如何通過使用ChunkedWriteHandler來寫大型數據而又不必冒著導致OutOfMemoryError的風險。在下一節中,我們將仔細研究幾種序列化POJO的方法。

11.6 序列化數據

JDK提供了ObjectOutputStreamObjectInputStream,用於通過網絡對POJO的基本數據類型和圖進行序列化和反序列化。該API並不複雜,而且可以被應用於任何實現了java.io.Serializable接口的對象。但是它的性能也不是非常高效的。在這一節中,我們將看到Netty必須為此提供什麼。

11.6.1 JDK序列化

如果你的應用程序必須要和使用了ObjectOutputStreamObjectInputStream的遠程節點交互,並且兼容性也是你最關心的,那麼JDK序列化將是正確的選擇[8]。表11-8中列出了Netty提供的用於和JDK進行互操作的序列化類。

表11-8 JDK序列化編解碼器

名  稱

描  述

CompatibleObjectDecoder [9]

和使用JDK序列化的非基於Netty的遠程節點進行互操作的解碼器

CompatibleObjectEncoder

和使用JDK序列化的非基於Netty的遠程節點進行互操作的編碼器

ObjectDecoder

構建於JDK序列化之上的使用自定義的序列化來解碼的解碼器;當沒有其他的外部依賴時,它提供了速度上的改進。否則其他的序列化實現更加可取

ObjectEncoder

構建於JDK序列化之上的使用自定義的序列化來編碼的編碼器;當沒有其他的外部依賴時,它提供了速度上的改進。否則其他的序列化實現更加可取

11.6.2 使用JBoss Marshalling進行序列化

如果你可以自由地使用外部依賴,那麼JBoss Marshalling將是個理想的選擇:它比JDK序列化最多快3倍,而且也更加緊湊。在JBoss Marshalling官方網站主頁[10]上的概述中對它是這麼定義的:

JBoss Marshalling是一種可選的序列化API,它修復了在JDK序列化API中所發現的許多問題,同時保留了與java.io.Serializable及其相關類的兼容性,並添加了幾個新的可調優參數以及額外的特性,所有的這些都是可以通過工廠配置(如外部序列化器、類/實例查找表、類解析以及對像替換等)實現可插拔的。

Netty通過表11-9所示的兩組解碼器/編碼器對為Boss Marshalling提供了支持。第一組兼容只使用JDK序列化的遠程節點。第二組提供了最大的性能,適用於和使用JBoss Marshalling的遠程節點一起使用。

表11-9 JBoss Marshalling編解碼器

名  稱

描  述

CompatibleMarshallingDecoder
CompatibleMarshallingEncoder

與只使用JDK序列化的遠程節點兼容

MarshallingDecoder
MarshallingEncoder

適用於使用JBoss Marshalling的節點。這些類必須一起使用

代碼清單11-13展示了如何使用MarshallingDecoderMarshallingEncoder。同樣,幾乎只是適當地配置ChannelPipeline罷了。

代碼清單11-13 使用JBoss Marshalling

public class MarshallingInitializer extends ChannelInitializer<Channel> {
  private final MarshallerProvider marshallerProvider;
  private final UnmarshallerProvider unmarshallerProvider;

  public MarshallingInitializer(
    UnmarshallerProvider unmarshallerProvider,
    MarshallerProvider marshallerProvider) {
     this.marshallerProvider = marshallerProvider;
    this.unmarshallerProvider = unmarshallerProvider;
  }

  @Override
  protected void initChannel(Channel channel) throws Exception {
    ChannelPipeline pipeline = channel.pipeline;
     pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));   ← --  添加MarshallingDecoder 以將ByteBuf 轉換為POJO
    pipeline.addLast(new MarshallingEncoder(marshallerProvider));  ← -- 添加Marshalling-Encoder 以將POJO轉換為ByteBuf
    pipeline.addLast(new ObjectHandler);  ← -- 添加ObjectHandler,以處理普通的實現了Serializable 接口的POJO
  }

  public static final class ObjectHandler
    extends SimpleChannelInboundHandler<Serializable> {
    @Override
    public void channelRead0(
      ChannelHandlerContext channelHandlerContext,
      Serializable serializable) throws Exception {
      // Do something
    }
  }
}  

11.6.3 通過Protocol Buffers序列化

Netty序列化的最後一個解決方案是利用Protocol Buffers[11]的編解碼器,它是一種由Google公司開發的、現在已經開源的數據交換格式。可以在https://github.com/google/protobuf找到源代碼。

Protocol Buffers以一種緊湊而高效的方式對結構化的數據進行編碼以及解碼。它具有許多的編程語言綁定,使得它很適合跨語言的項目。表11-10展示了Netty為支持protobuf所提供的ChannelHandler實現。

表11-10 Protobuf編解碼器

名  稱

描  述

ProtobufDecoder

使用protobuf對消息進行解碼

ProtobufEncoder

使用protobuf對消息進行編碼

ProtobufVarint32FrameDecoder

根據消息中的Google Protocol Buffers的「Base 128 Varints」a整型長度字段值動態地分割所接收到的ByteBuf

ProtobufVarint32LengthFieldPrepender

ByteBuf前追加一個Google Protocal Buffers的「Base 128 Varints」整型的長度字段值

a.參見Google的Protocol Buffers編碼的開發者指南:[https://developers.google.com/protocol-buffers/docs/encoding](https://developers.google.com/protocol-buffers/docs/encoding)。

在這裡我們又看到了,使用protobuf只不過是將正確的ChannelHandler添加到Channel-Pipeline中,如代碼清單11-14所示。

代碼清單11-14 使用protobuf

public class ProtoBufInitializer extends ChannelInitializer< Channel> {
  private final MessageLite lite;

  public ProtoBufInitializer(MessageLite lite) {
    this.lite = lite;
  }

  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline;
    pipeline.addLast(new ProtobufVarint32FrameDecoder);   ← --  添加ProtobufVarint32FrameDecoder以分隔幀
    pipeline.addLast(new ProtobufEncoder); [12]  ← -- 添加ProtobufEncoder以處理消息的編碼
    pipeline.addLast(new ProtobufDecoder(lite));  ← -- 添加ProtobufDecoder以解碼消息
    pipeline.addLast(new ObjectHandler);   ← --  添加Object-Handler 以處理解碼消息
  }

  public static final class ObjectHandler
    extends SimpleChannelInboundHandler< Object> {
      @Override
      public void channelRead0(ChannelHandlerContext ctx, Object msg)
      throws Exception {
      // Do something with the object
    }
  }
}
  

在這一節中,我們探討了由Netty專門的解碼器和編碼器所支持的不同的序列化選項:標準JDK序列化、JBoss Marshalling以及Google的Protocol Buffers。

11.7 小結

Netty提供的編解碼器以及各種ChannelHandler可以被組合和擴展,以實現非常廣泛的處理方案。此外,它們也是被論證的、健壯的組件,已經被許多的大型系統所使用。

需要注意的是,我們只涵蓋了最常見的示例;Netty的API文檔提供了更加全面的覆蓋。

在下一章中,我們將學習另一種先進的協議——WebSocket,它被開發用以改進Web應用程序的性能以及響應性。Netty提供了你將會需要的工具,以便你快速、輕鬆地利用它強大的功能。


[1] 傳輸層安全(TLS)協議,1.2版:http://tools.ietf.org/html/rfc5246。

[2] Comet就是一個例子:http://en.wikipedia.org/wiki/Comet_%28programming%29。

[3] RFC 6455,WebSocket協議,http://tools.ietf.org/html/rfc6455。

[4] 關於WebSocket的客戶端示例,請參考Netty源代碼中所包含的例子:https://github.com/netty/netty/tree/4.1/ example/src/main/java/io/netty/example/http/websocketx/client。

[5] 有關這些協議的RFC可以在IETF的網站上找到:SMTP在www.ietf.org/rfc/rfc2821.txt,POP3在www.ietf. org/rfc/rfc1939.txt,IMAP在http://tools.ietf.org/html/rfc3501,而Telnet在http://tools.ietf.org/search/rfc854。

[6] 對於固定幀大小的協議來說,不需要將幀長度編碼到頭部。——譯者注

[7] 我們甚至可以利用io.netty.channel.ChannelProgressivePromise來實時獲取傳輸的進度。——譯者注

[8] 參見Oracle的Java SE文檔中的「JavaObject Serialization」部分:http://docs.oracle.com/javase/8/docs/technotes/ guides/serialization/。

[9] 這個類已經在Netty 3.1中廢棄,並不存在於Netty 4.x中:https://issues.jboss.org/browse/NETTY-136。——譯者注

[10] 「About JBoss Marshalling」:www.jboss.org/jbossmarshalling。

[11] 有關Protocol Buffers的描述請參考https://developers.google.com/protocol-buffers/?hl=zh。

[12] 還需要在當前的ProtobufEncoder之前添加一個相應的ProtobufVarint32LengthFieldPrepender以編碼進幀長度信息。——譯者注