讀古今文學網 > Netty實戰 > 第12章 WebSocket >

第12章 WebSocket

本章主要內容

  • 實時Web的概念
  • WebSocket協議
  • 使用Netty構建一個基於WebSocket的聊天室服務器

如果你有跟進Web技術的最新進展,你很可能就遇到過「實時Web」這個短語,而如果你在工程領域中有實時應用程序的實戰經驗,那麼你可能有點懷疑這個術語到底意味著什麼。

因此,讓我們首先澄清,這裡並不是指所謂的硬實時服務質量(QoS),硬實時服務質量是保證計算結果將在指定的時間間隔內被遞交。僅HTTP的請求/響應模式設計就使得其很難被支持,從過去所設計的各種方案中都沒有提供一種能夠提供令人滿意的解決方案的事實中便可見一斑。

雖然已經有了一些關於正式定義實時Web服務[1]語義的學術討論,但是被普遍接受的定義似乎還未出現。因此現在我們將採納下面來自維基百科的非權威性描述:

實時Web利用技術和實踐,使用戶在信息的作者發佈信息之後就能夠立即收到信息,而不需要他們或者他們的軟件週期性地檢查信息源以獲取更新。

簡而言之,雖然全面的實時Web可能並不會馬上到來,但是它背後的想法卻助長了對於幾乎瞬時獲得信息的期望。我們將在本章中討論的WebSocket[2]協議便是在這個方向上邁出的堅實的一步。

12.1 WebSocket簡介

WebSocket協議是完全重新設計的協議,旨在為Web上的雙向數據傳輸問題提供一個切實可行的解決方案,使得客戶端和服務器之間可以在任意時刻傳輸消息,因此,這也就要求它們異步地處理消息回執。(作為HTML5客戶端API的一部分,大部分最新的瀏覽器都已經支持了WebSocket。)

Netty對於WebSocket的支持包含了所有正在使用中的主要實現,因此在你的下一個應用程序中採用它將是簡單直接的。和往常使用Netty一樣,你可以完全使用該協議,而無需關心它內部的實現細節。我們將通過創建一個基於WebSocket的實時聊天應用程序來演示這一點。

12.2 我們的WebSocket示例應用程序

為了讓示例應用程序展示它的實時功能,我們將通過使用WebSocket協議來實現一個基於瀏覽器的聊天應用程序,就像你可能在Facebook的文本消息功能中見到過的那樣。我們將通過使得多個用戶之間可以同時進行相互通信,從而更進一步。

圖12-1說明了該應用程序的邏輯:

(1)客戶端發送一個消息;

(2)該消息將被廣播到所有其他連接的客戶端。

圖12-1 WebSocket應用程序邏輯

這正如你可能會預期的一個聊天室應當的工作方式:所有的人都可以和其他的人聊天。在示例中,我們將只實現服務器端,而客戶端則是通過Web頁面訪問該聊天室的瀏覽器。正如同你將在接下來的幾頁中所看到的,WebSocket簡化了編寫這樣的服務器的過程。

12.3 添加WebSocket支持

在從標準的HTTP或者HTTPS協議切換到WebSocket時,將會使用一種稱為升級握手[3]的機制。因此,使用WebSocket的應用程序將始終以HTTP/S作為開始,然後再執行升級。這個升級動作發生的確切時刻特定於應用程序;它可能會發生在啟動時,也可能會發生在請求了某個特定的URL之後。

我們的應用程序將採用下面的約定:如果被請求的URL以/ws結尾,那麼我們將會把該協議升級為WebSocket;否則,服務器將使用基本的HTTP/S。在連接已經升級完成之後,所有數據都將會使用WebSocket進行傳輸。圖12-2說明了該服務器邏輯,一如在Netty中一樣,它由一組ChannelHandler實現。我們將會在下一節中,解釋用於處理HTTP以及WebSocket協議的技術時,描述它們。

圖12-2 服務器邏輯

12.3.1 處理HTTP請求

首先,我們將實現該處理HTTP請求的組件。這個組件將提供用於訪問聊天室並顯示由連接的客戶端發送的消息的網頁。代碼清單12-1給出了這個HttpRequestHandler對應的代碼,其擴展了SimpleChannelInboundHandler以處理FullHttpRequest消息。需要注意的是,channelRead0方法的實現是如何轉發任何目標URI為/ws的請求的。

代碼清單12-1 HTTPRequestHandler

public class HttpRequestHandler
  extends SimpleChannelInboundHandler<FullHttpRequest> {   ← --  擴展SimpleChannel-InboundHandler 以處理FullHttpRequest 消息
  private final String wsUri;
  private static final File INDEX;

  static {
    URL location = HttpRequestHandler.class
      .getProtectionDomain
      .getCodeSource.getLocation;
    try {
      String path = location.toURI + "index.html";
      path = !path.contains("file:") ? path : path.substring(5);
      INDEX = new File(path);
    } catch (URISyntaxException e) {
      throw new IllegalStateException(
        "Unable to locate index.html", e);
    }
  }

   public HttpRequestHandler(String wsUri) {
    this.wsUri = wsUri;
  }

  @Override
  public void channelRead0(ChannelHandlerContext ctx,
    FullHttpRequest request) throws Exception {
    if (wsUri.equalsIgnoreCase(request.getUri)) {   ← -- ❶如果請求了WebSocket協議升級,則增加引用計數(調用retain方法),並將它傳遞給下一個ChannelInboundHandler
      ctx.fireChannelRead(request.retain);
    } else {
      if (HttpHeaders.is100ContinueExpected(request)) {  ← -- ❷ 處理100 Continue請求以符合HTTP1.1 規範
        send100Continue(ctx);
      }
      RandomAccessFile file = new RandomAccessFile(INDEX, "r");  ← -- 讀取index.html
      HttpResponse response = new DefaultHttpResponse(
        request.getProtocolVersion, HttpResponseStatus.OK);
      response.headers.set(
        HttpHeaders.Names.CONTENT_TYPE,
        "text/html; charset=UTF-8");
      boolean keepAlive = HttpHeaders.isKeepAlive(request);
      if (keepAlive) {  ← -- 如果請求了keep-alive,則添加所需要的HTTP頭信息  
        response.headers.set(
          HttpHeaders.Names.CONTENT_LENGTH, file.length);
        response.headers.set( HttpHeaders.Names.CONNECTION,
          HttpHeaders.Values.KEEP_ALIVE);
      }
      ctx.write(response);   ← -- ❸將HttpResponse寫到客戶端
      if (ctx.pipeline.get(SslHandler.class) == null) {   ← -- ❹將index.html寫到客戶端
        ctx.write(new DefaultFileRegion(
          file.getChannel, 0, file.length));
      } else {
        ctx.write(new ChunkedNioFile(file.getChannel));
      }
      ChannelFuture future = ctx.writeAndFlush(   ← -- ❺寫LastHttpContent並沖刷至客戶端
        LastHttpContent.EMPTY_LAST_CONTENT);
      if (!keepAlive) {   ← -- ❻如果沒有請求keep-alive,則在寫操作完成後關閉Channel
        future.addListener(ChannelFutureListener.CLOSE);
      }
    }
  }

  private static void send100Continue(ChannelHandlerContext ctx) {
    FullHttpResponse response = new DefaultFullHttpResponse(
      HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
    ctx.writeAndFlush(response);
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    throws Exception {
    cause.printStackTrace;
    ctx.close;
  }
}  

如果該HTTP請求指向了地址為/ws的URI,那麼HttpRequestHandler將調用FullHttp-Request對像上的retain方法,並通過調用fireChannelRead(msg)方法將它轉發給下一個ChannelInboundHandler❶。之所以需要調用retain方法,是因為調用channelRead方法完成之後,它將調用FullHttpRequest對像上的release方法以釋放它的資源。(參見我們在第6章中對於SimpleChannelInboundHandler的討論。)

如果客戶端發送了HTTP 1.1的HTTP頭信息Expect: 100-continue,那麼Http-RequestHandler將會發送一個100 Continue❷響應。在該HTTP頭信息被設置之後,Http-RequestHandler將會寫回一個HttpResponse❸給客戶端。這不是一個FullHttp-Response,因為它只是響應的第一個部分。此外,這裡也不會調用writeAndFlush方法,在結束的時候才會調用。

如果不需要加密和壓縮,那麼可以通過將index.html❹的內容存儲到DefaultFile-Region中來達到最佳效率。這將會利用零拷貝特性來進行內容的傳輸。為此,你可以檢查一下,是否有SslHandler存在於在ChannelPipeline中。否則,你可以使用ChunkedNioFile

HttpRequestHandler將寫一個LastHttpContent❺來標記響應的結束。如果沒有請求keep-alive❻,那麼HttpRequestHandler將會添加一個ChannelFutureListener到最後一次寫出動作的ChannelFuture,並關閉該連接。在這裡,你將調用writeAndFlush方法以沖刷所有之前寫入的消息。

這部分代碼代表了聊天服務器的第一個部分,它管理純粹的HTTP請求和響應。接下來,我們將處理傳輸實際聊天消息的WebSocket幀。

WEBSOCKET幀 WebSocket以幀的方式傳輸數據,每一幀代表消息的一部分。一個完整的消息可能會包含許多幀。

12.3.2 處理WebSocket幀

由IETF發佈的WebSocket RFC,定義了6種幀,Netty為它們每種都提供了一個POJO實現。表12-1列出了這些幀類型,並描述了它們的用法。

表12-1 WebSocketFrame的類型

幀 類 型

描  述

BinaryWebSocketFrame

包含了二進制數據

TextWebSocketFrame

包含了文本數據

ContinuationWebSocketFrame

包含屬於上一個BinaryWebSocketFrameTextWebSocket- Frame的文本數據或者二進制數據

CloseWebSocketFrame

表示一個CLOSE請求,包含一個關閉的狀態碼和關閉的原因

PingWebSocketFrame

請求傳輸一個PongWebSocketFrame

PongWebSocketFrame

作為一個對於PingWebSocketFrame的響應被發送

我們的聊天應用程序將使用下面幾種幀類型:

  • CloseWebSocketFrame;
  • PingWebSocketFrame;
  • PongWebSocketFrame;
  • TextWebSocketFrame。

TextWebSocketFrame是我們唯一真正需要處理的幀類型。為了符合WebSocket RFC,Netty提供了WebSocketServerProtocolHandler來處理其他類型的幀。

代碼清單12-2展示了我們用於處理TextWebSocketFrameChannelInboundHandler,其還將在它的ChannelGroup中跟蹤所有活動的WebSocket連接。

代碼清單12-2 處理文本幀

public class TextWebSocketFrameHandler
  extends SimpleChannelInboundHandler<TextWebSocketFrame> {   ← --  擴展SimpleChannelInboundHandler,並處理TextWebSocketFrame 消息
  private final ChannelGroup group;

  public TextWebSocketFrameHandler(ChannelGroup group) {
    this.group = group;
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx,  ← --  重寫userEventTriggered方法以處理自定義事件
    Object evt) throws Exception {
    if (evt == WebSocketServerProtocolHandler
      .ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
      ctx.pipeline.remove(HttpRequestHandler.class);   ← --  如果該事件表示握手成功,則從該Channelipeline中移除Http-RequestHandler,因為將不會接收到任何HTTP 消息了
      group.writeAndFlush(new TextWebSocketFrame(   ← --  ❶通知所有已經連接的WebSocket 客戶端新的客戶端已經連接上了
         "Client " + ctx.channel + " joined"));  ← -- ❷將新的WebSocket Channel添加到ChannelGroup 中,以便它可以接收到所有的消息
      group.add(ctx.channel);  
    } else {
      super.userEventTriggered(ctx, evt);
    }
   }

   @Override
   public void channelRead0(ChannelHandlerContext ctx,
     TextWebSocketFrame msg) throws Exception {
     group.writeAndFlush(msg.retain);   ← -- ❸增加消息的引用計數,並將它寫到ChannelGroup 中所有已經連接的客戶端
  }
}  

TextWebSocketFrameHandler只有一組非常少量的責任。當和新客戶端的WebSocket握手成功完成之後❶,它將通過把通知消息寫到ChannelGroup中的所有Channel來通知所有已經連接的客戶端,然後它將把這個新Channel加入到該ChannelGroup中❷。

如果接收到了TextWebSocketFrame消息❸,TextWebSocketFrameHandler將調用TextWebSocketFrame消息上的retain方法,並使用writeAndFlush方法來將它傳輸給ChannelGroup,以便所有已經連接的WebSocket Channel都將接收到它。

和之前一樣,對於retain方法的調用是必需的,因為當channelRead0方法返回時,TextWebSocketFrame的引用計數將會被減少。由於所有的操作都是異步的,因此,writeAnd-Flush方法可能會在channelRead0方法返回之後完成,而且它絕對不能訪問一個已經失效的引用。

因為Netty在內部處理了大部分剩下的功能,所以現在剩下唯一需要做的事情就是為每個新創建的Channel初始化其ChannelPipeline。為此,我們將需要一個ChannelInitializer

12.3.3 初始化ChannelPipeline

正如你已經學習到的,為了將ChannelHandler安裝到ChannelPipeline中,你擴展了ChannelInitializer,並實現了initChannel方法。代碼清單12-3展示了由此生成的ChatServerInitializer的代碼。

代碼清單12-3 初始化ChannelPipeline

public class ChatServerInitializer extends ChannelInitializer<Channel> {   ← --  擴展了ChannelInitializer
  private final ChannelGroup group;

  public ChatServerInitializer(ChannelGroup group) {
    this.group = group;
  }

  @Override
  protected void initChannel(Channel ch) throws Exception {   ← -- 將所有需要的ChannelHandler 添加到ChannelPipeline 中
    ChannelPipeline pipeline = ch.pipeline;
    pipeline.addLast(new HttpServerCodec);
    pipeline.addLast(new ChunkedWriteHandler);
    pipeline.addLast(new HttpObjectAggregator(64 * 1024));
    pipeline.addLast(new HttpRequestHandler("/ws"));
    pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
    pipeline.addLast(new TextWebSocketFrameHandler(group));
  }
}  

對於initChannel方法的調用,通過安裝所有必需的ChannelHandler來設置該新註冊的ChannelChannelPipeline。這些ChannelHandler以及它們各自的職責都被總結在了表12-2中。

表12-2 基於WebSocket聊天服務器的ChannelHandler

ChannelHandler

職  責

HttpServerCodec

將字節解碼為HttpRequestHttpContentLastHttp- Content。並將HttpRequestHttpContentLast- HttpContent編碼為字節

ChunkedWriteHandler

寫入一個文件的內容

HttpObjectAggregator

將一個HttpMessage和跟隨它的多個HttpContent聚合為單個FullHttpRequest或者FullHttpResponse(取決於它是被用來處理請求還是響應)。安裝了這個之後,ChannelPipeline中的下一個ChannelHandler將只會收到完整的HTTP請求或響應

HttpRequestHandler

處理FullHttpRequest(那些不發送到/ws URI的請求)

WebSocketServerProtocolHandler

按照WebSocket規範的要求,處理WebSocket升級握手、PingWebSocketFramePongWebSocketFrameCloseWebSocketFrame

TextWebSocketFrameHandler

處理TextWebSocketFrame和握手完成事件

Netty的WebSocketServerProtocolHandler處理了所有委託管理的WebSocket幀類型以及升級握手本身。如果握手成功,那麼所需的ChannelHandler將會被添加到ChannelPipeline中,而那些不再需要的ChannelHandler則將會被移除。

WebSocket協議升級之前的ChannelPipeline的狀態如圖12-3所示。這代表了剛剛被ChatServerInitializer初始化之後的ChannelPipeline

圖12-3 WebSocket協議升級之前的ChannelPipeline

當WebSocket協議升級完成之後,WebSocketServerProtocolHandler將會把Http-RequestDecoder替換為WebSocketFrameDecoder,把HttpResponseEncoder替換為WebSocketFrameEncoder。為了性能最大化,它將移除任何不再被WebSocket連接所需要的ChannelHandler。這也包括了圖12-3所示的 HttpObjectAggregatorHttpRequest-Handler

圖12-4展示了這些操作完成之後的ChannelPipeline。需要注意的是,Netty目前支持4個版本的WebSocket協議,它們每個都具有自己的實現類。Netty將會根據客戶端(這裡指瀏覽器)所支持的版本[4],自動地選擇正確版本的WebSocketFrameDecoderWebSocket-FrameEncoder

圖12-4 WebSocket協議升級完成之後的ChannelPipeline

12.3.4 引導

這幅拼圖最後的一部分是引導該服務器,並安裝ChatServerInitializer的代碼。這將由ChatServer類處理,如代碼清單12-4所示。

代碼清單12-4 引導服務器

public class ChatServer {
  private final ChannelGroup channelGroup =
    new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);   ← --  創建DefaultChannelGroup,其將保存所有已經連接的WebSocket Channel
  private final EventLoopGroup group = new NioEventLoopGroup;
  private Channel channel;

  public ChannelFuture start(InetSocketAddress address) {  ← --  引導服務器
    ServerBootstrap bootstrap = new ServerBootstrap; 
    bootstrap.group(group)
      .channel(NioServerSocketChannel.class)
      .childHandler(createInitializer(channelGroup));
    ChannelFuture future = bootstrap.bind(address);
    future.syncUninterruptibly;
    channel = future.channel;
    return future;
  }

  protected ChannelInitializer<Channel> createInitializer(   ← --  創建ChatServerInitializer
     ChannelGroup group) {
    return new ChatServerInitializer(group);
  }

  public void destroy {   ← --  處理服務器關閉,並釋放所有的資源
    if (channel != null) {
      channel.close;
    }
    channelGroup.close;
    group.shutdownGracefully;
  }

  public static void main(String args) throws Exception {
    if (args.length != 1) {
      System.err.println("Please give port as argument");
      System.exit(1);
    }
    int port = Integer.parseInt(args[0]);
    final ChatServer endpoint = new ChatServer;
    ChannelFuture future = endpoint.start(
      new InetSocketAddress(port));
    Runtime.getRuntime.addShutdownHook(new Thread {
      @Override
      public void run {
        endpoint.destroy;
      }
    });
    future.channel.closeFuture.syncUninterruptibly;
  }
}  

這也就完成了該應用程序本身。現在讓我們來測試它吧。

12.4 測試該應用程序

目錄chapter12中的示例代碼包含了你需要用來構建並運行該服務器的所有資源。(如果你還沒有設置好你的包括Apache Maven在內的開發環境,參見第2章中的操作說明。)

我們將使用下面的Maven命令來構建和啟動服務器:

mvn -PChatServer clean package exec:exec  

項目文件pom.xml被配置為在端口9999上啟動服務器。如果要使用不同的端口,可以通過編輯文件中對應的值,或者使用一個System屬性來對它進行重寫:

mvn -PChatServer -Dport=1111 clean package exec:exec  

代碼清單12-5展示了該命令主要的輸出(無關緊要的行已經被刪除了)。

代碼清單12-5 編譯並運行ChatServer

$ mvn -PChatServer clean package exec:exec
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------------------------------------------------------
[INFO] Building ChatServer 1.0-SNAPSHOT
[INFO] ----------------------------------------------------------------
...
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action ---
[INFO] Building jar: target/chat-server-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ chat-server ---
Starting ChatServer on port 9999  

你通過將自己的瀏覽器指向http://localhost:9999來訪問該應用程序。圖12-5展示了其在Chrome瀏覽器中的UI。

圖中展示了兩個已經連接的客戶端。第一個客戶端是使用上面的界面連接的,第二個客戶端則是通過底部的Chrome瀏覽器的命令行工具連接的。[5]你會注意到,兩個客戶端都發送了消息,並且每個消息都顯示在兩個客戶端中。

這是一個非常簡單的演示,演示了WebSocket如何在瀏覽器中實現實時通信。

圖12-5 基於WebSocket的ChatServer的演示

如何進行加密

在真實世界的場景中,你將很快就會被要求向該服務器添加加密。使用Netty,這不過是將一個SslHandler添加到ChannelPipeline中,並配置它的問題。代碼清單12-6展示了如何通過擴展我們的ChatServerInitializer來創建一個SecureChatServerInitializer以完成這個需求。

代碼清單12-6 為ChannelPipeline添加加密

public class SecureChatServerInitializer extends ChatServerInitializer {   ← --  擴展ChatServerInitializer以添加加密
   private final SslContext context;

  public SecureChatServerInitializer(ChannelGroup group,
    SslContext context) {
    super(group);
    this.context = context;
  }

  @Override
  protected void initChannel(Channel ch) throws Exception {
    super.initChannel(ch);  ← --  調用父類的initChannel方法
    SSLEng.ine engine = context.newEngine(ch.alloc);
    engine.setUseClientMode(false);
    ch.pipeline.addFirst(new SslHandler(engine));  ← --  將SslHandler 添加到ChannelPipeline 中  
  }
}  

最後一步是調整ChatServer以使用SecureChatServerInitializer,以便在Channel-Pipeline中安裝SslHandler。這給了我們代碼清單12-7中所展示的SecureChatServer

代碼清單12-7 向ChatServer添加加密

public class SecureChatServer extends ChatServer {  ← --   SecureChatServer 擴展ChatServer 以支持加密
  private final SslContext context;

  public SecureChatServer(SslContext context) {
    this.context = context;
  }

  @Override
  protected ChannelInitializer<Channel> createInitializer(
    ChannelGroup group) {
    return new SecureChatServerInitializer(group, context);   ← --   返回之前創建的SecureChatServer-Initializer 以啟用加密
  }

  public static void main(String args) throws Exception {
    if (args.length != 1) {
      System.err.println("Please give port as argument");
      System.exit(1);
    }
    int port = Integer.parseInt(args[0]);
    SelfSignedCertificate cert = new SelfSignedCertificate;
    SslContext context = SslContext.newServerContext(
    cert.certificate, cert.privateKey);

    final SecureChatServer endpoint = new SecureChatServer(context);
    ChannelFuture future = endpoint.start(new InetSocketAddress(port));
    Runtime.getRuntime.addShutdownHook(new Thread {
      @Override
      public void run {
        endpoint.destroy;
      }
    });
    future.channel.closeFuture.syncUninterruptibly;
  }
}  

這就是為所有的通信啟用SSL/TLS加密需要做的全部。和之前一樣,可以使用Apache Maven來運行該應用程序,如代碼清單12-8所示。它還將檢索任何所需的依賴。

代碼清單12-8 啟動SecureChatServer

$ mvn -PSecureChatServer clean package exec:exec
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------------------------------------------------------
[INFO] Building ChatServer 1.0-SNAPSHOT
[INFO] ----------------------------------------------------------------
...
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action ---
[INFO] Building jar: target/chat-server-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ chat-server ---
Starting SecureChatServer on port 9999  

現在,你便可以從SecureChatServer的HTTPS URL地址https://localhost:9999訪問它了。

12.5 小結

在本章中,你學習了如何使用Netty的WebSocket實現來管理Web應用程序中的實時數據。我們覆蓋了其所支持的數據類型,並討論了你可能會遇到的一些限制。儘管不可能在所有的情況下都使用WebSocket,但是仍然需要清晰地認識到,它代表了Web技術的一個重要進展。


[1] 「Real-time Web Services Orchestration and Choreography」:http://ceur-ws.org/Vol-601/EOMAS10_paper13.pdf。

[2] IETF RFC 6455, The WebSocket Protocol: http://tools.ietf.org/html/rfc6455。

[3] Mozilla開發者網絡,「Protocol upgrade mechanism」:https://developer.mozilla.org/en-US/docs/HTTP/ Protocol_upgrade_mechanism。

[4] 在這個例子中,我們假設使用了13版的WebSocket協議,所以圖中展示的是WebSocketFrameDecoder13WebSocketFrameEncoder13

[5] 也可以通過在一個新的瀏覽器中訪問http://localhost:9999來達到同樣的目的,從而代替Chrome瀏覽器的開發者工具。——譯者注