讀古今文學網 > Netty實戰 > 第15章 案例研究,第二部分 >

第15章 案例研究,第二部分

本章主要內容

  • Facebook的案例研究

  • Twitter的案例研究

在本章中,我們將看到Facebook和Twitter(兩個最流行的社交網絡)是如何使用Netty的。他們都利用了Netty靈活和通用的設計來構建框架和服務,以滿足對極端伸縮性以及可擴展性的需求。

這裡所呈現的案例研究都是由那些負責設計和實現所述解決方案的工程師所撰寫的。

15.1 Netty在Facebook的使用:Nifty和Swift[1]

Andrew Cox,Facebook軟件工程師

在Facebook,我們在我們的幾個後端服務中使用了Netty(用於處理來自手機應用程序的消息通信、用於HTTP客戶端等),但是我們增長最快的用法還是通過我們所開發的用來構建Java的Thrift服務的兩個新框架:Nifty和Swift。

15.1.1 什麼是Thrift

Thrift是一個用來構建服務和客戶端的框架,其通過遠程過程調用(RPC)來進行通信。它最初是在Facebook開發的[2],用以滿足我們構建能夠處理客戶端和服務器之間的特定類型的接口不匹配的服務的需要。這種方式十分便捷,因為服務器和它們的客戶端通常不能全部同時升級。

Thrift的另一個重要的特點是它可以被用於多種語言。這使得在Facebook的團隊可以為工作選擇正確的語言,而不必擔心他們是否能夠找到和其他的服務相互交互的客戶端代碼。在Facebook,Thrift已經成為我們的後端服務之間相互通信的主要方式之一,同時它還被用於非RPC的序列化任務,因為它提供了一個通用的、緊湊的存儲格式,能夠被多種語言讀取,以便後續處理。

自從Thrift在Facebook被開發以來,它已經作為一個Apache項目(http://thrift.apache.org/)開源了,在那裡它將繼續成長以滿足服務開發人員的需要,不止在Facebook有使用,在其他公司也有使用,如Evernote和last.fm[3],以及主要的開源項目如Apache Cassandra和HBase等。

下面是Thrift的主要組件:

  • Thrift的接口定義語言(IDL)——用來定義你的服務,並且編排你的服務將要發送和接收的任何自定義類型;
  • 協議——用來控制將數據元素編碼/解碼為一個通用的二進制格式(如Thrift的二進制協議或者JSON);
  • 傳輸——提供了一個用於讀/寫不同媒體(如TCP套接字、管道、內存緩衝區)的通用接口;
  • Thrift編譯器——解析Thrift的IDL文件以生成用於服務器和客戶端的存根代碼,以及在IDL中定義的自定義類型的序列化/反序列化代碼;
  • 服務器實現——處理接受連接、從這些連接中讀取請求、派發調用到實現了這些接口的對象,以及將響應發回給客戶端;
  • 客戶端實現——將方法調用轉換為請求,並將它們發送給服務器。

15.1.2 使用Netty改善Java Thrift的現狀

Thrift的Apache分發版本已經被移植到了大約20種不同的語言,而且還有用於其他語言的和Thrift相互兼容的獨立框架(Twitter的用於Scala的Finagle便是一個很好的例子)。這些語言中的一些在Facebook多多少少有被使用,但是在Facebook最常用的用來編寫Thrift服務的還是C++和Java。

當我加入Facebook時,我們已經在使用C++圍繞著libevent,順利地開發可靠的、高性能的、異步的Thrift實現了。通過libevent,我們得到了OS API之上的跨平台的異步I/O抽像,但是libevent並不會比,比如說,原始的Java NIO,更加容易使用。因此,我們也在其上構建了抽像,如異步的消息通道,同時我們還使用了來自Folly[4]的鏈式緩衝區盡可能地避免複製。這個框架還具有一個支持帶有多路復用的異步調用的客戶端實現,以及一個支持異步的請求處理的服務器實現。(該服務器可以啟動一個異步任務來處理請求並立即返回,隨後在響應就緒時調用一個回調或者稍後設置一個Future。)

同時,我們的Java Thrift框架卻很少受到關注,而且我們的負載測試工具顯示Java版本的性能明顯落後於C++版本。雖然已經有了構建於NIO之上的Java Thrift框架,並且異步的基於NIO的客戶端也可用。但是該客戶端不支持流水線化以及請求的多路復用,而服務器也不支持異步的請求處理。由於這些缺失的特性,在Facebook,這裡的Java Thrift服務開發人員遇到了那些在C++(的Thrift框架)中已經解決了的問題,並且它也成為了挫敗感的源泉。

我們本來可以在NIO之上構建一個類似的自定義框架,並在那之上構建我們新的Java Thrift實現,就如同我們為C++版本的實現所做的一樣。但是經驗告訴我們,這需要巨大的工作量才能完成,不過碰巧,我們所需要的框架已經存在了,只等著我們去使用它:Netty。

我們很快地組裝了一個服務器實現,並且將名字「Netty」和「Thrift」混在一起,為新的服務器實現提出了「Nifty」這個名字。相對於在C++版本中達到同樣的效果我們所需要做的一切,那麼少的代碼便可以使得Nifty工作,這立即就讓人印象深刻。

接下來,我們使用Nifty構建了一個簡單的用於負載測試的Thrift服務器,並且使用我們的負載測試工具,將它和我們現有的服務器進行了對比。結果是顯而易見的:Nifty的表現要優於其他的NIO服務器,而且和我們最新的C++版本的Thrift服務器的結果也不差上下。使用Netty就是為了要提高性能!

15.1.3 Nifty服務器的設計

Nifty(https://github.com/facebook/nifty)是一個開源的、使用Apache許可的、構建於Apache Thrift庫之上的Thrift客戶端/服務器實現。它被專門設計,以便無縫地從任何其他的Java Thrift服務器實現遷移過來:你可以重用相同的Thrift IDL文件、相同的Thrift代碼生成器(與Apache Thrift庫打包在一起),以及相同的服務接口實現。唯一真正需要改變的只是你的服務器的啟動代碼(Nifty的設置風格與Apache Thrift中的傳統的Thrift服務器實現稍微有所不同)。

1.Nifty的編碼器/解碼器

默認的Nifty服務器能處理普通消息或者分幀消息(帶有4字節的前綴)。它通過使用自定義的Netty幀解碼器做到了這一點,其首先查看前幾個字節,以確定如何對剩餘的部分進行解碼。然後,當發現了一個完整的消息時,解碼器將會把消息的內容和一個指示了消息類型的字段包裝在一起。服務器隨後將會根據該字段來以相同的格式對響應進行編碼。

Nifty還支持接駁你自己的自定義編解碼器。例如,我們的一些服務使用了自定義的編解碼器來從客戶端在每條消息前面所插入的頭部中讀取額外的信息(包含可選的元數據、客戶端的能力等)。解碼器也可以被方便地擴展以處理其他類型的消息傳輸,如HTTP。

2.在服務器上排序響應

Java Thrift的初始版本使用了OIO套接字,並且服務器為每個活動連接都維護了一個線程。使用這種設置,在下一個響應被讀取之前,每個請求都將在同一個線程中被讀取、處理和應答。這保證了響應將總會以對應的請求所到達的順序返回。

較新的異步I/O的服務器實現誕生了,其不需要每個連接一個線程,而且這些服務器可以處理更多的並發連接,但是客戶端仍然主要使用同步I/O,因此服務器可以期望它在發送當前響應之前,不會收到下一個請求。這個請求/執行流如圖15-1所示。

圖15-1 同步的請求/響應流

客戶端最初的偽異步用法開始於一些Thrift用戶利用的一項事實:對於一個生成的客戶端方法foo來說,方法send_foorecv_foo將會被單獨暴露出來。這使得Thrift用戶可以發送多個請求(無論是在多個客戶端上,還是在同一個客戶端上),然後調用相應的接收方法來開始等待並收集結果。

在這個新的場景下,服務器可能會在它完成處理第一個請求之前,從單個客戶端讀取多個請求。在一個理想的世界中,我們可以假設所有流水線化請求的異步Thrift客戶端都能夠處理以任意順序到達的這些請求所對應的響應。然而,在現實生活中,雖然新的客戶端可以處理這種情況,而那些舊一點的異步Thrift客戶端可能會寫出多個請求,但是必須要求按順序接收響應。

這種問題可以通過使用Netty 4的EventExecutor或者Netty 3.x中的OrderedMemory-AwareThreadPoolExcecutor解決,其能夠保證順序地處理同一個連接上的所有傳入消息,而不會強制所有這些消息都在同一個執行器線程上運行。

圖15-2展示了流水線化的請求是如何被以正確的順序處理的,這也就意味著對應於第一個請求的響應將會被首先返回,然後是對應於第二個請求的響應,以此類推。

圖15-2 對於流水線化的請求的順序化處理的請求/響應流

儘管Nifty有著特殊的要求:我們的目標是以客戶端能夠處理的最佳的響應順序服務於每個客戶端。我們希望允許用於來自於單個連接上的多個流水線化的請求的處理器能夠被並行處理,但是那樣我們又控制不了這些處理器完成的先後順序。

相反,我們使用了一種涉及緩衝響應的方案;如果客戶端要求保持順序的響應,我們將會緩衝後續的響應,直到所有較早的響應也可用,然後我們將按照所要求的順序將它們一起發送出去。見圖15-3所示。

圖15-3 對於流水線化的請求的並行處理的請求/響應流

當然,Nifty包括了實實在在支持無序響應的異步Channel(可以通過Swift使用)。當使用能夠讓客戶端通知服務器此客戶端的能力的自定義的傳輸時,服務器將會免除緩衝響應的負擔,並且將以請求完成的任意順序把它們發送回去。

15.1.4 Nifty異步客戶端的設計

Nifty的客戶端開發主要集中在異步客戶端上。Nifty實際上也提供了一個針對Thrift的同步傳輸接口的Netty實現,但是它的使用相當受限,因為相對於Thrift所提供的標準的套接字傳輸,它並沒有太多的優勢。因此,用戶應該盡可能地使用異步客戶端。

1.流水線化

Thrift庫擁有它自己的基於NIO的異步客戶端實現,但是我們想要的一個特性是請求的流水線化。流水線化是一種在同一連接上發送多個請求,而不需要等待其響應的能力。如果服務器有空閒的工作線程,那麼它便可以並行地處理這些請求,但是即使所有的工作線程都處於忙綠狀態,流水線化仍然可以有其他方面的裨益。服務器將會花費更少的時間來等待讀取數據,而客戶端則可以在一個單一的TCP數據包裡一起發送多個小請求,從而更好地利用網絡帶寬。

使用Netty,流水線化水到渠成。Netty做了所有管理各種NIO選擇鍵的狀態的艱澀的工作,Nifty則可以專注於解碼請求以及編碼響應。

2.多路復用

隨著我們的基礎設施的增長,我們開始看到在我們的服務器上建立起來了大量的連接。多路復用(為所有的連接來自於單一的來源的Thrift客戶端共享連接)可以幫助減輕這種狀況。但是在需要按序響應的客戶端連接上進行多路復用會導致一個問題:該連接上的客戶端可能會招致額外的延遲,因為它的響應必須要跟在對應於其他共享該連接的請求的響應之後。

基本的解決方案也相當簡單:Thrift已經在發送每個消息時都捎帶了一個序列標識符,所以為了支持無序響應,我們只需要客戶端Channel維護一個從序列ID到響應處理器的一個映射,而不是一個使用隊列。

但是問題的關鍵在於,在標準的同步Thrift客戶端中,協議層將負責從消息中提取序列標識符,再由協議層協議調用傳輸層,而不是其他的方式。

對於同步客戶端來說,這種簡單的流程(如圖15-4所示)能夠良好地工作,其協議層可以在傳輸層上等待,以實際接收響應,但是對於異步客戶端來說,其控制流就變得更加複雜了。客戶端調用將會被分發到Swift庫中,其將首先要求協議層將請求編碼到一個緩衝區,然後將編碼請求緩衝區傳遞給Nifty的Channel以便被寫出。當該Channel收到來自服務器的響應時,它將會通知Swift庫,其將再次使用協議層以對響應緩衝區進行解碼。這就是圖15-5中所展示的流程。

圖15-4 多路復用/傳輸層

圖15-5 派發

15.1.5 Swift:一種更快的構建Java Thrift服務的方式

我們新的Java Thrift框架的另一個關鍵部分叫作Swift。它使用了Nifty作為它的I/O引擎,但是其服務規範可以直接通過Java註解來表示,使得Thrift服務開發人員可以純粹地使用Java進行開發。當你的服務啟動時,Swift運行時將通過組合使用反射以及解析Swift的註解來收集所有相關服務以及類型的信息。通過這些信息,它可以構建出和Thrift編譯器在解析Thrift IDL文件時構建的模型一樣的模型。然後,它將使用這個模型,並通過從字節碼生成用於序列化/反序列化這些自定義類型的新類,來直接運行服務器以及客戶端(而不需要任何生成的服務器或者客戶端的存根代碼)。

跳過常規的Thrift代碼生成,還能使添加新功能變得更加輕鬆,而無需修改IDL編譯器,所以我們的許多新功能(如異步客戶端)都是首先在Swift中得到支持。如果你感興趣,可以查閱Swift的GitHub頁面(https://github.com/facebook/swift)上的介紹信息。

15.1.6 結果

在下面的各節中,我們將量化一些我們使用Netty的過程中所觀察到的一些成果。

1.性能比較

一種測量Thrift服務器性能的方式是對於空操作的基準測試。這種基準測試使用了長時間運行的客戶端,這些客戶端不間斷地對發送回空響應的服務器進行Thrift調用。雖然這種測量方式對於大部分的實際Thrift服務來說,不是真實意義上的性能測試,但是它仍然很好地度量了Thrift服務的最大潛能,而且提高這一基準,通常也就意味著減少了該框架本身的CPU使用。

如表15-1所示,在這個基準測試下,Nifty的性能優於所有其他基於NIO的Thrift服務器(TNonblockingServer、TThreadedSelectorServer以及TThreadPoolServer)的實現。它甚至輕鬆地擊敗了我們以前的Java服務器實現(我們內部使用的一個Nifty之前的服務器實現,基於原始的NIO以及直接緩衝區)。

表15-1 不同實現的基準測試結果

Thrift服務器實現

空操作請求/秒

TNonblockingServer

~68 000

TThreadedSelectorServer

188 000

TThreadPoolServer

867 000

較老的Java服務器(使用NIO和直接緩衝區)

367 000

Nifty

963 000

較老的基於libevent的C++服務器

895 000

下一代基於libevent的C++服務器

1 150 000

我們所測試過的唯一能夠和Nifty相提並論的Java服務器是TThreadPoolServer。這個服務器實現使用了原始的OIO,並且在一個專門的線程上運行每個連接。這使得它在處理少量的連接時表現不錯;然而,使用OIO,當你的服務器需要處理大量的並發連接時,你將很容易遇到伸縮性問題。

Nifty甚至擊敗了之前的C++服務器實現,這是我們開始開發Nifty時最奪目的一點,雖然它相對於我們的下一代C++服務器框架還有一些差距,但至少也大致相當。

2.穩定性問題的例子

在Nifty之前,我們在Facebook的許多主要的Java服務都使用了一個較老的、自定義的基於NIO的Thrift服務器實現,它的工作方式類似於Nifty。該實現是一個較舊的代碼庫,有更多的時間成熟,但是由於它的異步I/O處理代碼是從零開始構建的,而且因為Nifty是構建在Netty的異步I/O框架的堅實基礎之上的,所以(相比之下)它的問題也就少了很多。

我們的一個自定義的消息隊列服務是基於那個較舊的框架構建的,而它開始遭受一種套接字洩露。大量的連接都停留在了CLOSE_WAIT狀態,這意味著服務器接收了客戶端已經關閉了套接字的通知,但是服務器從來不通過其自身的調用來關閉套接字進行回應。這使得這些套接字都停滯在了CLOSE_WAIT狀態。

問題發生得很慢;在處理這個服務的整個機器集群中,每秒可能有數以百萬計的請求,但是通常在一個服務器上只有一個套接字會在一個小時之內進入這個狀態。這不是一個迫在眉睫的問題,因為在那種速率下,在一個服務器需要重啟前,將需要花費很長的時間,但是這也複雜化了追查原因的過程。徹底地挖掘代碼也沒有帶來太大的幫助:最初的幾個地方看起來可疑,但是最終都被排除了,而我們也並沒有定位到問題所在。

最終,我們將該服務遷移到了Nifty之上。轉換(包括在預發環境中進行測試)花了不到一天的時間,而這個問題就此消失了。使用Nifty,我們就真的再也沒見過類似的問題了。

這只是在直接使用NIO時可能會出現的微妙bug的一個例子,而且它類似於那些在我們的C++ Thrift框架穩定的過程中,不得不一次又一次地解決的bug。但是我認為這是一個很好的例子,它說明了通過使用Netty是如何幫助我們利用它多年來收到的穩定性修復的。

3.改進C++實現的超時處理

Netty還通過為改進我們的C++框架提供一些啟發間接地幫助了我們。一個這樣的例子是基於散列輪的計時器。我們的C++框架使用了來自於libevent的超時事件來驅動客戶端以及服務器的超時,但是為每個請求都添加一個單獨的超時被證明是十分昂貴的,因此我們一直都在使用我們稱之為超時集的東西。其思想是:一個到特定服務的客戶端連接,對於由該客戶端發出的每個請求,通常都具有相同的接收超時,因此對於一組共享了相同的時間間隔的超時集合,我們僅維護一個真正的計時器事件。每個新的超時都將被保證會在對於該超時集合的現存的超時被調度之後觸發,因此當每個超時過期或者被取消時,我們將只會安排下一個超時。

然而,我們的用戶偶爾想要為每個調用都提供單獨的超時,為在相同連接上的不同的請求設置不同的超時值。在這種情況下,使用超時集合的好處就消失了,因此我們嘗試了使用單獨的計時器事件。在大量的超時被同時調度時,我們開始看到了性能問題。我們知道Nifty不會碰到這個問題,除了它不使用超時集的這個事實—— Netty通過它的HashedWheelTimer[5]解決了該問題。因此,帶著來自Netty的靈感,我們為我們的C++ Thrift框架添加了一個基於散列輪的計時器,並解決了可變的每請求(per-request)超時時間間隔所帶來的性能問題。

4.未來基於Netty 4的改進

Nifty目前運行在Netty 3上,這對我們來說已經很好了,但是我們已經有一個基於Netty 4的移植版本準備好了,現在第4版的Netty已經穩定下來了,我們很快就會遷移過去。我們熱切地期待著Netty 4的API將會帶給我們的一些益處。

一個我們計劃如何更好地利用Netty 4的例子是實現更好地控制哪個線程將管理一個給定的連接。我們希望使用這項特性,可以使服務器的處理器方法能夠從和該服務器調用所運行的I/O線程相同的線程開始異步的客戶端調用。這是那些專門的C++服務器(如Thrift請求路由器)已經能夠利用的特性。

從該例子延伸開來,我們也期待著能夠構建更好的客戶端連接池,使得能夠把現有的池化連接遷移到期望的I/O工作線程上,這在第3版的Netty中是不可能做到的。

15.1.7 Facebook小結

在Netty的幫助下,我們已經能夠構建更好的Java服務器框架了,其幾乎能夠與我們最快的C++ Thrift服務器框架的性能相媲美。我們已經將我們現有的一些主要的Java服務遷移到了Nifty,並解決了一些令人討厭的穩定性和性能問題,同時我們還開始將一些來自Netty,以及Nifty和Swift開發過程中的思想,反饋到提高C++ Thrift的各個方面中。

不僅如此,使用Netty是令人愉悅的,並且它已經添加了大量的新特性,例如,對於Thrift客戶端的內置SOCKS支持來說,添加起來小菜一碟。

但是我們並不止步於此。我們還有大量的性能調優工作要做,以及針對將來的大量的其他方面的改進計劃。如果你對使用Java進行Thrift開發感興趣,一定要關注哦!

15.2 Netty在Twitter的使用:Finagle

Jeff Smick,Twitter軟件工程師

Finagle是Twitter構建在Netty之上的容錯的、協議不可知的RPC框架。從提供用戶信息、推特以及時間線的後端服務到處理HTTP請求的前端API端點,所有組成Twitter架構的核心服務都建立在Finagle之上。

15.2.1 Twitter成長的煩惱

Twitter最初是作為一個整體式的Ruby On Rails應用程序構建的,我們半親切地稱之為Monorail。隨著Twitter開始經歷大規模的成長,Ruby運行時以及Rails框架開始成為瓶頸。從計算機的角度來看,Ruby對資源的利用是相對低效的。從開發的角度來看,該Monorail開始變得難以維護。對一個部分的代碼修改將會不透明地影響到另外的部分。代碼的不同方面的所屬權也不清楚。無關核心業務對象的小改動也需要一次完整的部署。核心業務對象也沒有暴露清晰的API,其加劇了內部結構的脆弱性以及發生故障的可能性。

我們決定將該Monorail分拆為不同的服務,明確歸屬人並且精簡API,使迭代更快速,維護更容易。每個核心業務對象都將由一個專門的團隊維護,並且由它自己的服務提供支撐。公司內部已經有了在JVM上進行開發的先例—幾個核心的服務已經從該Monorail中遷移出去,並已經用Scala重寫了。我們的運維團隊也有運維JVM服務的背景,並且知道如何運維它們。鑒於此,我們決定使用Java或者Scala在JVM上構建所有的新服務。大多數的服務開發團隊都決定選用Scala作為他們的JVM語言。

15.2.2 Finagle的誕生

為了構建出這個新的架構,我們需要一個高性能的、容錯的、協議不可知的、異步的RPC框架。在面向服務的架構中,服務花費了它們大部分的時間來等待來自其他上游的服務的響應。使用異步的庫使得服務可以並發地處理請求,並且充分地利用硬件資源。儘管Finagle可以直接建立在NIO之上,但是Netty已經解決了許多我們可能會遇到的問題,並且它提供了一個簡潔、清晰的API。

Twitter構建在幾種開源的協議之上,主要是HTTP、Thrift、Memcached、MySQL以及Redis。我們的網絡棧需要具備足夠的靈活性,能夠和任何的這些協議進行交流,並且具備足夠的可擴展性,以便我們可以方便地添加更多的協議。Netty並沒有綁定任何特定的協議。向它添加協議就像創建適當的ChannelHandler一樣簡單。這種擴展性也催生了許多社區驅動的協議實現,包括SPDY、PostgreSQL、WebSockets、IRC以及AWS[6]。

Netty的連接管理以及協議不可知的特性為構建Finagle提供了絕佳的基礎。但是我們也有一些其他的Netty不能開箱即滿足的需求,因為那些需求都更高級。客戶端需要連接到服務器集群,並且需要做跨服務器集群的負載均衡。所有的服務都需要暴露運行指標(請求率、延遲等),其可以為調試服務的行為提供有價值的數據。在面向服務的架構中,一個單一的請求都可能需要經過數十種服務,使得如果沒有一個由Dapper啟發的跟蹤框架,調試性能問題幾乎是不可能的[7]。Finagle正是為了解決這些問題而構建的。

15.2.3 Finagle是如何工作的

Finagle的內部結構是非常模塊化的。組件都是先獨立編寫,然後再堆疊在一起。根據所提供的配置,每個組件都可以被換入或者換出。例如,所有的跟蹤器都實現了相同的接口,因此可以創建一個跟蹤器用來將跟蹤數據發送到本地文件、保存在內存中並暴露一個讀取端點,或者將它寫出到網絡。

在Finagle棧的底部是Transport層。這個類表示了一個能夠被異步讀取和寫入的對象流。Transport被實現為Netty的ChannelHandler,並被插入到了ChannelPipeline的尾端。來自網絡的消息在被Netty接收之後,將經由ChannelPipeline,在那裡它們將被編解碼器解釋,並隨後被發送到Finagle的Transport層。從那裡Finagle將會從Transport層讀取消息,並且通過它自己的棧發送消息。

對於客戶端的連接,Finagle維護了一個可以在其中進行負載均衡的傳輸(Transport)池。根據所提供的連接池的語義,Finagle將從Netty請求一個新連接或者復用一個現有的連接。當請求新連接時,將會根據客戶端的編解碼器創建一個Netty的ChannelPipeline。額外的用於統計、日誌記錄以及SSL的ChannelHandler將會被添加到該ChannelPipeline中。該連接隨後將會被遞給一個Finagle能夠寫入和讀取的ChannelTransport[8]

在服務器端,創建了一個Netty服務器,然後向其提供一個管理編解碼器、統計、超時以及日誌記錄的ChannelPipelineFactory。位於服務器ChannelPipeline尾端的ChannelHandler是一個Finagle的橋接器。該橋接器將監控新的傳入連接,並為每一個傳入連接創建一個新的Transport。該Transport將在新的Channel被遞交給某個服務器的實現之前對其進行包裝。隨後從ChannelPipeline讀取消息,並將其發送到已實現的服務器實例。

圖15-6展示了Finagle的客戶端和服務器之間的關係。

圖15-6 Netty的使用

Netty/Finagle橋接器

代碼清單15-1展示了一個使用默認選項的靜態的ChannelFactory

代碼清單15-1 設置ChannelFactory

object Netty3Transporter {
  val channelFactory: ChannelFactory =
    new NioClientSocketChannelFactory(    ← --  創建一個ChannelFactory的實例
      Executor, 1 /*# boss threads*/, WorkerPool, DefaultTimer
    ){
      // no-op; unreleasable
      override def releaseExternalResources = 
    }
  val defaultChannelOptions: Map[String, Object] = Map(   ← -- 設置用於新Channel的選項
    "tcpNoDelay" -> java.lang.Boolean.TRUE,
    "reuseAddress" -> java.lang.Boolean.TRUE
  )
}  

這個ChannelFactory橋接了Netty的Channel和Finagle的Transport(為簡潔起見,這裡移除了統計代碼)。當通過apply方法被調用時,這將創建一個新的Channel以及Transport。當該Channel已經連接或者連接失敗時,將會返回一個被完整填充的Future

代碼清單15-2展示了將Channel連接到遠程主機的ChannelConnector

代碼清單15-2 連接到遠程服務器

private[netty3] class ChannelConnector[In, Out](
  newChannel:  => Channel,
  newTransport: Channel => Transport[In, Out]
) extends (SocketAddress => Future[Transport[In, Out]]) {
  def apply(addr: SocketAddress): Future[Transport[In, Out]] = {
    require(addr != null)   ← --  如果Channel 創建失敗,那麼異常將會被包裝在Future 中返回
    val ch = try newChannel catch { 
      case NonFatal(exc) => return Future.exception(exc)
    }
    // Transport is now bound to the channel; this is done prior to
    // it being connected so we don't lose any messages.
    val transport = newTransport(ch)  ← -- 使用Channel創建一個新的Transport
    val connectFuture = ch.connect(addr)   ← -- 創建一個新的Promise,以便在連接嘗試完成時及時收到通知  
    val promise = new Promise[Transport[In, Out]]  
    promise setInterruptHandler { case _cause =>
      // Propagate cancellations onto the netty future.
      connectFuture.cancel
    }
    connectFuture.addListener(new ChannelFutureListener {
      def operationComplete(f: ChannelFuture) {    ← -- 通過完全填充已經創建的Promise 來處理connectFuture的完成狀態
        if (f.isSuccess) {
          promise.setValue(transport)
        } else if (f.isCancelled) {
          promise.setException(
          WriteException(new CancelledConnectionException))
        } else {
          promise.setException(WriteException(f.getCause))
        }
      }
    })
    promise onFailure { _ => Channels.close(ch)
    }
  }
}  

這個工廠提供了一個ChannelPipelineFactory,它是一個ChannelTransport的工廠。該工廠是通過apply方法調用的。一旦被調用,就會創建一個新的ChannelPipelinenewPipeline)。ChannelFactory將會使用這個ChannelPipeline來創建新的Channel,隨後使用所提供的選項(newConfiguredChannel)對它進行配置。配置好的Channel將會被作為一個匿名的工廠傳遞給一個ChannelConnector。該連接器將會被調用,並返回一個Future[Transport]

代碼清單15-3展示了細節[9]。

代碼清單15-3 基於Netty 3的傳輸

case class Netty3Transporter[In, Out](
  pipelineFactory: ChannelPipelineFactory,
  newChannel: ChannelPipeline => Channel =
    Netty3Transporter.channelFactory.newChannel(_),
  newTransport: Channel => Transport[In, Out] =
    new ChannelTransport[In, Out](_),
  // various timeout/ssl options
) extends (
  (SocketAddress, StatsReceiver) => Future[Transport[In, Out]]
){
  private def newPipeline(
    addr: SocketAddress,
    statsReceiver: StatsReceiver
  )={
    val pipeline = pipelineFactory.getPipeline
    // add stats, timeouts, and ssl handlers
    pipeline    ← --  創建一個ChannelPipeline,並添加所需的ChannelHandler
  }
  private def newConfiguredChannel(
    addr: SocketAddress,
    statsReceiver: StatsReceiver
  )={
    val ch = newChannel(newPipeline(addr, statsReceiver))
    ch.getConfig.setOptions(channelOptions.asJava)
    ch
  }
  def apply(
    addr: SocketAddress,
    statsReceiver: StatsReceiver
  ): Future[Transport[In, Out]] = {
    val conn = new ChannelConnector[In, Out](
       => newConfiguredChannel(addr, statsReceiver),
      newTransport, statsReceiver)   ← --  創建一個內部使用的ChannelConnector
    conn(addr)
  }
}  

Finagle服務器使用Listener將自身綁定到給定的地址。在這個示例中,監聽器提供了一個ChannelPipelineFactory、一個ChannelFactory以及各種選項(為了簡潔起見,這裡沒包括)。我們使用一個要綁定的地址以及一個用於通信的Transport調用了Listener。接著,創建並配置了一個Netty的ServerBootstrap。然後,創建了一個匿名的ServerBridge工廠,遞給ChannelPipelineFactory,其將被遞交給該引導服務器。最後,該服務器將會被綁定到給定的地址。

現在,讓我們來看看基於Netty的Listener實現,如代碼清單15-4所示。

代碼清單15-4 基於Netty的Listener實現

case class Netty3Listener[In, Out](
  pipelineFactory: ChannelPipelineFactory,
  channelFactory: ServerChannelFactory
  bootstrapOptions: Map[String, Object], ... // stats/timeouts/ssl config
) extends Listener[In, Out] {
  def newServerPipelineFactory(
    statsReceiver: StatsReceiver, newBridge:  => ChannelHandler
  ) = new ChannelPipelineFactory {    ← --  創建一個ChannelPipelineFactory
    def getPipeline = {
      val pipeline = pipelineFactory.getPipeline
      ... // add stats/timeouts/ssl
      pipeline.addLast("finagleBridge", newBridge)   ← --  將該橋接器添 加到ChannelPipeline 中
      pipeline
    }
  }
  def listen(addr: SocketAddress)(
    serveTransport: Transport[In, Out] => Unit
  ): ListeningServer =
    new ListeningServer with CloseAwaitably {
      val newBridge =  => new ServerBridge(serveTransport, ...)
      val bootstrap = new ServerBootstrap(channelFactory)
      bootstrap.setOptions(bootstrapOptions.asJava)
      bootstrap.setPipelineFactory(
        newServerPipelineFactory(scopedStatsReceiver, newBridge))
      val ch = bootstrap.bind(addr)
    }
}  }  

當一個新的Channel打開時,該橋接器將會創建一個新的ChannelTransport並將其遞回給Finagle服務器。代碼清單15-5展示了所需的代碼[10]。

代碼清單15-5 橋接Netty和Finagle

class ServerBridge[In, Out](
  serveTransport: Transport[In, Out] => Unit,
) extends SimpleChannelHandler {
  override def channelOpen(
    ctx: ChannelHandlerContext,
    e: ChannelStateEvent
  ){
    val channel = e.getChannel
    val transport = new ChannelTransport[In, Out](channel)    ← --  創建一個ChannelTransport,以便在一個新Channel 被打開時橋接到Finagle
    serveTransport(transport)
    super.channelOpen(ctx, e)
  }

  override def exceptionCaught(
    ctx: ChannelHandlerContext,
    e: ExceptionEvent
  ) { // log exception and close channel }
}  

15.2.4 Finagle的抽像

Finagle的核心概念是一個從RequestFuture[Response]的[11]的簡單函數(函數式編程語言是這裡的關鍵)。

type Service[Req, Rep] = Req => Future[Rep][12]  

這種簡單性釋放了非常強大的組合性。Service是一種對稱的API,同時代表了客戶端以及服務器。服務器實現了該服務的接口。該服務器可以被具體地用於測試,或者Finagle也可以將它暴露到網絡接口上。客戶端將被提供一個服務實現,其要麼是虛擬的,要麼是某個遠程服務器的具體表示。

例如,我們可以通過實現一個服務來創建一個簡單的HTTP服務器,該服務接受HttpReq作為參數,返回一個代表最終響應的Future[HttpRep]

val s: Service[HttpReq, HttpRep] = new Service[HttpReq, HttpRep] {
  def apply(req: HttpReq): Future[HttpRep] =
    Future.value(HttpRep(Status.OK, req.body))
}
Http.serve(":80", s)  

隨後,客戶端將被提供一個該服務的對稱表示。

val client: Service[HttpReq, HttpRep] = Http.newService("twitter.com:80")
val f: Future[HttpRep] = client(HttpReq("/"))
f map { rep => processResponse(rep) }  

這個例子將把該服務器暴露到所有網絡接口的80端口上,並從twitter.com的80端口消費。

我們也可以選擇不暴露該服務器,而是直接使用它。

server(HttpReq("/")) map { rep => processResponse(rep) }  

在這裡,客戶端代碼有相同的行為,只是不需要網絡連接。這使得測試客戶端和服務器非常簡單直接。

客戶端以及服務器都提供了特定於應用程序的功能。但是,也有對和應用程序無關的功能的需求。這樣的例子如超時、身份驗證以及統計等。Filter為實現應用程序無關的功能提供了抽像。

過濾器接收一個請求和一個將被它組合的服務:

type Filter[Req, Rep] = (Req, Service[Req, Rep]) => Future[Rep]  

多個過濾器可以在被應用到某個服務之前鏈接在一起:

recordHandletime andThen
traceRequest andThen
collectJvmStats andThen
myService  

這允許了清晰的邏輯抽像以及良好的關注點分離。在內部,Finagle大量地使用了過濾器,其有助於提高模塊化以及可復用性。它們已經被證明,在測試中很有價值,因為它們通過很小的模擬便可以被獨立地單元測試。

過濾器可以同時修改請求和響應的數據以及類型。圖15-7展示了一個請求,它在通過一個過濾器鏈之後到達了某個服務並返回。

圖15-7 請求/響應流

我們可以使用類型修改來實現身份驗證。

val auth: Filter[HttpReq, AuthHttpReq, HttpRes, HttpRes] =
  { (req, svc) => authReq(req) flatMap { authReq => svc(authReq) } }

val authedService: Service[AuthHttpReq, HttpRes] = ...
val service: Service[HttpReq, HttpRes] =
  auth andThen authedService  

這裡我們有一個需要AuthHttpReq 的服務。為了滿足這個需求,創建了一個能接收HttpReq並對它進行身份驗證的過濾器。隨後,該過濾器將和該服務進行組合,產生一個新的可以接受HttpReq並產生HttpRes的服務。這使得我們可以從該服務隔離,單獨地測試身份驗證過濾器。

15.2.5 故障管理

我們假設故障總是會發生;硬件會失效、網絡會變得擁塞、網絡鏈接會斷開。對於庫來說,如果它們正在上面運行的或者正在與之通信的系統發生故障,那麼庫所擁有的極高的吞吐量以及極低的延遲都將毫無意義。為此,Finagle是建立在有原則地管理故障的基礎之上的。為了能夠更好地管理故障,它犧牲了一些吞吐量以及延遲。

Finagle可以通過隱式地使用延遲作為啟髮式(算法的因子)來均衡跨集群主機的負載。Finagle客戶端將在本地通過統計派發到單個主機的還未完成的請求數來追蹤它所知道的每個主機上的負載。有了這些信息,Finagle會將新的請求(隱式地)派發給具有最低負載、最低延遲的主機。

失敗的請求將導致Finagle關閉到故障主機的連接,並將它從負載均衡器中移除。在後台,Finagle將不斷地嘗試重新連接。只有在Finagle能夠重新建立一個連接時,該主機才會被重新加入到負載均衡器中。然後,服務的所有者可以自由地關閉各個主機,而不會對下游的客戶端造成負面的影響。

15.2.6 組合服務

Finagle的服務即函數(service-as-a-function)的觀點允許編寫簡單但富有表現力的代碼。例如,一個用戶發出的對於他們的主頁時間線的請求涉及了大量的服務,其中的核心是身份驗證服務、時間線服務以及推特服務。這些關係可以被簡潔地表達。

代碼清單15-6 通過Finagle組合服務

val timelineSvc = Thrift.newIface[TimelineService](...)   ← --  為每個服務創建一個客戶端
val tweetSvc = Thrift.newIface[TweetService](...)
val authSvc = Thrift.newIface[AuthService](...)

val authFilter = Filter.mk[Req, AuthReq, Res, Res] { (req, svc) =>  ← --  創建一個新的過濾器,對傳入的請求進行身份驗證
  authSvc.authenticate(req) flatMap svc(_)
}

val apiService = Service.mk[AuthReq, Res] { req =>   ← -- 創建一個服務,將已通過身份驗證的時間線請求轉換為一個JSON 響應  
  timelineSvc(req.userId) flatMap {tl =>
    val tweets = tl map tweetSvc.getById(_)
    Future.collect(tweets) map tweetsToJson(_)
  }
}

Http.serve(":80", authFilter andThen apiService)   ← -- 使用該身份驗證過濾器以及我們的服務在80 端口上啟動一個新的HTTP 服務  

在這裡,我們為時間線服務、推特服務以及身份驗證服務都創建了客戶端。並且,為了對原始的請求進行身份驗證,創建了一個過濾器。最後,我們實現的服務,結合了身份驗證過濾器,暴露在80端口上。

當收到請求時,身份驗證過濾器將嘗試對它進行身份驗證。錯誤都會被立即返回,不會影響核心業務。身份驗證成功之後,AuthReq將會被發送到API服務。該服務將會使用附加的userId通過時間線服務來查找該用戶的時間線。然後,返回一組推特ID,並在稍後遍歷。每個ID都會被用來請求與之相關聯的推特。最後,這組推特請求會被收集起來,轉換為一個JSON格式的響應。

正如你所看到的,我們定義了數據流,並且將並發的問題留給了Finagle。我們不必管理線程池,也不必擔心競態條件。這段代碼既清晰又安全。

15.2.7 未來:Netty

為了改善Netty的各個部分,讓Finagle以及更加廣泛的社區都能夠從中受益,我們一直在與Netty的維護者密切合作[13]。最近,Finagle的內部結構已經升級為更加模塊化的結構,為升級到Netty 4鋪平了道路。

15.2.8 Twitter小結

Finagle已經取得了輝煌的成績。我們已經想方設法大幅度地提高了我們所能夠處理的流量,同時也降低了延遲以及硬件需求。例如,在將我們的API端點從Ruby技術棧遷移到Finagle之後,我們看到,延遲從數百毫秒下降到了數十毫秒之內,同時還將所需要的機器數量從3位數減少到了個位數。我們新的技術棧已經使得我們達到了新的吞吐量記錄。在撰寫本文時,我們所記錄的每秒的推特數是143 199[14]。這一數字對於我們的舊架構來說簡直是難以想像的。

Finagle的誕生是為了滿足Twitter橫向擴展以支持全球數以億計的用戶的需求,而在當時支撐數以百萬計的用戶並保證服務在線已然是一項艱巨的任務了。使用Netty作為基礎,我們能夠快速地設計和建造Finagle,以解決我們的伸縮性難題。Finagle和Netty處理了Twitter所遇到的每一個請求。

15.3 小結

本章深入瞭解了對於像Facebook以及Twitter這樣的大公司是如何使用Netty來保證最高水準的性能以及靈活性的。

  • Facebook的Nifty項目展示了,如何通過提供自定義的協議編碼器以及解碼器,利用Netty來替換現有的Thrift實現。
  • Twitter的Finagle展示了,如何基於Netty來構建你自己的高性能框架,並通過類似於負載均衡以及故障轉移這樣的特性來增強它的。

我們希望這裡所提供的案例研究,能夠成為你打造下一代傑作的時候的信息和靈感的來源。


[1] 本節所表達的觀點都是本節作者的觀點,並不一定反映了該作者的僱主的觀點。

[2] 一份來自原始的Thrift的開發者的不舊不新的白皮書可以在http://thrift.apache.org/static/files/thrift-20070401. pdf找到。

[3] 可以在http://thrift.apache.org找到更多的例子。

[4] Folly是Facebook的開源C++公共庫:https://www.facebook.com/notes/facebook-engineering/folly-the-facebook- open-source-library/10150864656793920。

[5] 有關HashedWheelTimer類的更多的信息,參見http://netty.io/4.1/api/io/netty/util/HashedWheel Timer.html。

[6] 關於SPDY的更多信息參見https://github.com/twitter/finagle/tree/master/finagle-spdy。關於PostgreSQL參見https://github.com/mairbek/finagle-postgres。關於WebSockets參見https://github.com/sprsquish/finagle- websocket。關於IRC參見https://github.com/sprsquish/finagle-irc。關於AWS參見https://github.com/sclasen/ finagle-aws。

[7] 有關Dapper的信息可以在http://research.google.com/pubs/pub36356.html找到。該分佈式的跟蹤框架是Zipkin,可以在https://github.com/twitter/zipkin找到。

[8] 相關的類可以在https://github.com/twitter/finagle/blob/develop/finagle-netty4/src/main/scala/com/twitter/finagle/ netty4/transport/ChannelTransport.scala找到。——譯者注

[9] Finagle的源代碼位於https://github.com/twitter/finagle。

[10] 完整的源代碼在https://github.com/twitter/finagle。

[11] 這裡的Future[Response]相當於Java 8 中的CompletionStage。——譯者注

[12] 雖然不完全等價,但是可以理解為Java 8的public interface Service extends Function>{}。——譯者注

[13] 「Netty 4 at Twitter: Reduced GC Overhead」:https://blog.twitter.com/2013/netty-4-at-twitter-reduced-gc-overhead。

[14] 「New Tweets per second record, and how!」:https://blog.twitter.com/2013/new-tweets-per-second-recordand-how。