上面列舉的算法大多是迭代的。也就是說,它們不能用一次MapReduce作業來完成,而通常需要多步。在15.3節中,Amazon的EMR上運行MapReduce作業只是一個簡例。如果想在大數據集上運行AdaBoost算法該怎麼辦呢?如果想運行10個MapReduce作業呢?
有一些框架可以將MapReduce作業流自動化,例如Cascading和Oozie,但它們不支持在Amazon的EMR上執行。Pig可以在EMR上執行,也可以使用Python腳本,但需要額外學習一種腳本語言。(Pig是一個Apache項目,為文本處理提供高級編程語言,可以將文本處理命令轉換成Hadoop的MapReduce作業。)還有一些工具可以在Python中運行MapReduce作業,如本書將要介紹的mrjob。
mrjob1 (http://packages.python.org/mrjob/)之前是Yelp(一個餐廳點評網站)的內部框架,它在2010年底實現了開源。讀者可以參考附錄A來學習如何安裝和使用。本書將介紹如何使用mrjob重寫之前的全局均值和方差計算的代碼,相信讀者能體會到mrjob的方便快捷。(需要指出的是,mrjob是一個很好的學習工具,但仍然使用Python語言編寫,如果想獲得更好的性能,應該使用Java。)
1.mrjob文檔: http://packages.python.org/mrjob/index.html; 源代碼: https://github.com/Yelp/mrjob.
15.5.1 mrjob與EMR的無縫集成
與15.3節介紹的一樣,本節將使用mrjob在EMR上運行Hadoop流,區別在於mrjob不需要上傳數據到S3,也不需要擔心命令輸入是否正確,所有這些都由mrjob後台完成。有了mrjob,讀者還可以在自己的Hadoop集群上運行MapReduce作業,當然也可以在單機上進行測試。作業在單機執行和在EMR執行之間的切換十分方便。例如,將一個作業在單機執行,可以輸入以下命令:
% python mrMean.py < inputFile.txt > myOut.txt
如果要在EMR上運行同樣的任務,可以執行以下命令:
% python mrMean.py -r emr < inputFile.txt > myOut.txt
在15.3節中,所有的上傳以及表單填寫全由mrjob自動完成。讀者還可以添加一條在本地的Hadoop集群上執行作業的命令1,也可以添加一些命令行參數來指定本作業在EMR上的服務器類型和數目。
1. 意指除了上面兩條命令之外,再加一條。——譯者注
另外,15.3節中的mapper和reducer分別存於兩個不同的文件中,而mrjob中的mapper和reducer可以寫在同一個腳本中。下節將展示該腳本的內容,並分析其工作原理。
15.5.2 mrjob的一個MapReduce腳本剖析
用mrjob可以做很多事情,本書仍從最典型的MapReduce作業開始介紹。為了方便闡述,繼續沿用前面的例子,計算數據集的均值和方差。這樣讀者可以更專注於框架的實現細節,所以程序清單15-3的代碼與程序清單15-1和15-2的功能一致。打開文本編輯器,創建一個新文件mrMean.py,並加入下面程序清單的代碼。
程序清單15-3 分佈式均值方差計算的mrjob實現
from mrjob.job import MRJob
class MRmean(MRJob):
def __init__(self, *args, **kwargs):
super(MRmean, self).__init__(*args, **kwargs)
self.inCount = 0
self.inSum = 0
self.inSqSum = 0
# 接收輸入數據流
def map(self, key, val):
if False: yield
inVal = float(val)
self.inCount += 1
self.inSum += inVal
self.inSqSum += inVal*inVal
#所有輸入到達後開始處理
def map_final(self):
mn = self.inSum/self.inCount
mnSq = self.inSqSum/self.inCount
yield (1, [self.inCount, mn, mnSq])
def reduce(self, key, packedValues):
cumVal=0.0; cumSumSq=0.0; cumN=0.0
for valArr in packedValues:
nj = float(valArr[0])
cumN += nj
cumVal += nj*float(valArr[1])
cumSumSq += nj*float(valArr[2])
mean = cumVal/cumN
var = (cumSumSq - 2*mean*cumVal + cumN*mean*mean)/cumN
yield (mean, var)
def steps(self):
return ([self.mr(mapper=self.map, reducer=self.reduce,mapper_final=self.map_final)])
if __name__ == \'__main__\':
MRmean.run
該代碼分佈式地計算了均值和方差。輸入文本分發給很多mappers來計算中間值,這些中間值再通過reducer進行累加,從而計算出全局的均值和方差。
為了使用mrjob庫,需要創建一個新的MRjob
繼承類,在本例中該類的類名為MRmean
。代碼中的mapper和reducer都是該類的方法,此外還有一個叫做steps
的方法定義了執行的步驟。執行順序不必完全遵從於map-reduce的模式,也可以是map-reduce-reduce-reduce,或者map-reduce-map-reduce-map-reduce(下節會給出相關例子)。在steps
方法裡,需要為mrjob指定mapper和reducer的名稱。如果未給出,它將默認調用mapper
和reducer
方法。
首先來看一下mapper
的行為:它類似於for
循環,在每行輸入上執行同樣的步驟。如果想在收到所有的輸入之後進行某些處理,可以考慮放在mapper_final
中實現。這乍看起來有些古怪,但非常實用。另外在mapper
和mapper_final
中還可以共享狀態。所以在上述例子中,首先在mapper
中對輸入值進行積累,所有值收集完畢後計算出均值和平方均值,最後把這些值作為中間值通過yield
語句傳出去。1
1. 在一個標準的map-reduce流程中,作業的輸入即mapper
的輸入,mapper
的輸出也稱為中間值,中間值經過排序、組合等操作會轉為reducer
的輸入,而reducer
的輸出即為作業的輸出。——譯者注
中間值以key/value對的形式傳遞。如果想傳出去多個中間值,一個好的辦法是將它們打包成一個列表。這些值在map階段之後會按照key來排序。Hadoop提供了更改排序方法的選項,但默認的排序方法足以應付大多數的常見應用。擁有相同key的中間值將發送給同一個reducer。因此你需要考慮key的設計,使得在sort階段後相似的值能夠收集在一起。這裡所有的mapper都使用「1」作為key,因為我希望所有的中間值都在同一個reducer裡加和起來。2
2.只要所有mapper都使用相同的key就可以。當然,不必是「1」,也可以是其他值。——譯者注
mrjob裡的reducer與mapper有一些不同之處,reducer
的輸入存放在迭代器對像裡。為了能讀取所有的輸入,需要使用類似for
循環的迭代器。mapper
或mapper_final
和reducer
之間不能共享狀態,因為Python腳本在map和reduce階段中間沒有保持活動。如果需要在mapper
和reducer
之間進行任何通信,那麼只能通過key/value對。在reducer
的最後有一條輸出語句,該語句沒有key,因為輸出的key值已經固定。如果該reducer
之後不是輸出而是執行另一個mapper
,那麼key仍需要賦值。
無須多言,下面看一下實際效果,先運行一下mapper
,在Linux/DOS的命令行輸入下面的命令(注意不是在Python提示符下)。其中的文件inputFile.txt在第15章的代碼裡。
%python mrMean.py --mapper < inputFile.txt
運行該命令後,將得到如下輸出:
1 [100, 0.50956970000000001, 0.34443931307935999]
要運行整個程序,移除--mapper
選項。
%python mrMean.py < inputFile.txt
你將在屏幕上看到很多中間步驟的描述文字,最終的輸出如下:
.
.
.
streaming final output from c:userspeterappdatalocal
tempmrMean.Peter.20110228.172656.279000outputpart-00000
0.50956970000000001 0.34443931307935999
removing tmp directory c:userspeterappdatalocal
tempmrMean.Peter.20110228.172656.279000
To stream the valid output into a file, enter the following command:
%python mrMean.py < inputFile.txt > outFile.txt
最後,要在Amazon的EMR上運行本程序,輸入如下命令(確保你已經設定了環境變量AWS_ACCESS_KEY_ID
和AWS_SECRET_ACCESS_KEY
,這些變量的設定見附錄A)。
完成了mrjob的使用練習,下面將用它來解決一些機器學習問題。上文提到,一些迭代算法僅使用EMR難以完成,因此下一節將介紹如何用mrjob完成這項任務。