讀古今文學網 > 機器學習實戰 > 15.5 在Python中使用mrjob來自動化MapReduce >

15.5 在Python中使用mrjob來自動化MapReduce

上面列舉的算法大多是迭代的。也就是說,它們不能用一次MapReduce作業來完成,而通常需要多步。在15.3節中,Amazon的EMR上運行MapReduce作業只是一個簡例。如果想在大數據集上運行AdaBoost算法該怎麼辦呢?如果想運行10個MapReduce作業呢?

有一些框架可以將MapReduce作業流自動化,例如Cascading和Oozie,但它們不支持在Amazon的EMR上執行。Pig可以在EMR上執行,也可以使用Python腳本,但需要額外學習一種腳本語言。(Pig是一個Apache項目,為文本處理提供高級編程語言,可以將文本處理命令轉換成Hadoop的MapReduce作業。)還有一些工具可以在Python中運行MapReduce作業,如本書將要介紹的mrjob。

mrjob1 (http://packages.python.org/mrjob/)之前是Yelp(一個餐廳點評網站)的內部框架,它在2010年底實現了開源。讀者可以參考附錄A來學習如何安裝和使用。本書將介紹如何使用mrjob重寫之前的全局均值和方差計算的代碼,相信讀者能體會到mrjob的方便快捷。(需要指出的是,mrjob是一個很好的學習工具,但仍然使用Python語言編寫,如果想獲得更好的性能,應該使用Java。)

1.mrjob文檔: http://packages.python.org/mrjob/index.html; 源代碼: https://github.com/Yelp/mrjob.

15.5.1 mrjob與EMR的無縫集成

與15.3節介紹的一樣,本節將使用mrjob在EMR上運行Hadoop流,區別在於mrjob不需要上傳數據到S3,也不需要擔心命令輸入是否正確,所有這些都由mrjob後台完成。有了mrjob,讀者還可以在自己的Hadoop集群上運行MapReduce作業,當然也可以在單機上進行測試。作業在單機執行和在EMR執行之間的切換十分方便。例如,將一個作業在單機執行,可以輸入以下命令:

 % python mrMean.py < inputFile.txt > myOut.txt  
  

如果要在EMR上運行同樣的任務,可以執行以下命令:

 % python mrMean.py -r emr < inputFile.txt > myOut.txt  
  

在15.3節中,所有的上傳以及表單填寫全由mrjob自動完成。讀者還可以添加一條在本地的Hadoop集群上執行作業的命令1,也可以添加一些命令行參數來指定本作業在EMR上的服務器類型和數目。

1. 意指除了上面兩條命令之外,再加一條。——譯者注

另外,15.3節中的mapper和reducer分別存於兩個不同的文件中,而mrjob中的mapper和reducer可以寫在同一個腳本中。下節將展示該腳本的內容,並分析其工作原理。

15.5.2 mrjob的一個MapReduce腳本剖析

用mrjob可以做很多事情,本書仍從最典型的MapReduce作業開始介紹。為了方便闡述,繼續沿用前面的例子,計算數據集的均值和方差。這樣讀者可以更專注於框架的實現細節,所以程序清單15-3的代碼與程序清單15-1和15-2的功能一致。打開文本編輯器,創建一個新文件mrMean.py,並加入下面程序清單的代碼。

程序清單15-3 分佈式均值方差計算的mrjob實現

from mrjob.job import MRJob

class MRmean(MRJob):
    def __init__(self, *args, **kwargs):
        super(MRmean, self).__init__(*args, **kwargs)
        self.inCount = 0
        self.inSum = 0
        self.inSqSum = 0

# 接收輸入數據流
def map(self, key, val):
    if False: yield
    inVal = float(val)
    self.inCount += 1
    self.inSum += inVal
    self.inSqSum += inVal*inVal

 #所有輸入到達後開始處理
def map_final(self):
    mn = self.inSum/self.inCount
    mnSq = self.inSqSum/self.inCount
    yield (1, [self.inCount, mn, mnSq])

def reduce(self, key, packedValues):
    cumVal=0.0; cumSumSq=0.0; cumN=0.0
    for valArr in packedValues:
        nj = float(valArr[0])
        cumN += nj
        cumVal += nj*float(valArr[1])
        cumSumSq += nj*float(valArr[2])
    mean = cumVal/cumN
    var = (cumSumSq - 2*mean*cumVal + cumN*mean*mean)/cumN
    yield (mean, var)

def steps(self):
    return ([self.mr(mapper=self.map, reducer=self.reduce,mapper_final=self.map_final)])
if __name__ == \'__main__\':
    MRmean.run
  

該代碼分佈式地計算了均值和方差。輸入文本分發給很多mappers來計算中間值,這些中間值再通過reducer進行累加,從而計算出全局的均值和方差。

為了使用mrjob庫,需要創建一個新的MRjob繼承類,在本例中該類的類名為MRmean。代碼中的mapper和reducer都是該類的方法,此外還有一個叫做steps的方法定義了執行的步驟。執行順序不必完全遵從於map-reduce的模式,也可以是map-reduce-reduce-reduce,或者map-reduce-map-reduce-map-reduce(下節會給出相關例子)。在steps方法裡,需要為mrjob指定mapper和reducer的名稱。如果未給出,它將默認調用mapperreducer方法。

首先來看一下mapper的行為:它類似於for循環,在每行輸入上執行同樣的步驟。如果想在收到所有的輸入之後進行某些處理,可以考慮放在mapper_final中實現。這乍看起來有些古怪,但非常實用。另外在mappermapper_final中還可以共享狀態。所以在上述例子中,首先在mapper中對輸入值進行積累,所有值收集完畢後計算出均值和平方均值,最後把這些值作為中間值通過yield語句傳出去。1

1. 在一個標準的map-reduce流程中,作業的輸入即mapper的輸入,mapper的輸出也稱為中間值,中間值經過排序、組合等操作會轉為reducer的輸入,而reducer的輸出即為作業的輸出。——譯者注

中間值以key/value對的形式傳遞。如果想傳出去多個中間值,一個好的辦法是將它們打包成一個列表。這些值在map階段之後會按照key來排序。Hadoop提供了更改排序方法的選項,但默認的排序方法足以應付大多數的常見應用。擁有相同key的中間值將發送給同一個reducer。因此你需要考慮key的設計,使得在sort階段後相似的值能夠收集在一起。這裡所有的mapper都使用「1」作為key,因為我希望所有的中間值都在同一個reducer裡加和起來。2

2.只要所有mapper都使用相同的key就可以。當然,不必是「1」,也可以是其他值。——譯者注

mrjob裡的reducer與mapper有一些不同之處,reducer的輸入存放在迭代器對像裡。為了能讀取所有的輸入,需要使用類似for循環的迭代器。mappermapper_finalreducer之間不能共享狀態,因為Python腳本在map和reduce階段中間沒有保持活動。如果需要在mapperreducer之間進行任何通信,那麼只能通過key/value對。在reducer的最後有一條輸出語句,該語句沒有key,因為輸出的key值已經固定。如果該reducer之後不是輸出而是執行另一個mapper,那麼key仍需要賦值。

無須多言,下面看一下實際效果,先運行一下mapper,在Linux/DOS的命令行輸入下面的命令(注意不是在Python提示符下)。其中的文件inputFile.txt在第15章的代碼裡。

%python mrMean.py --mapper < inputFile.txt
  

運行該命令後,將得到如下輸出:

1 [100, 0.50956970000000001, 0.34443931307935999]
  

要運行整個程序,移除--mapper選項。

%python mrMean.py < inputFile.txt  
  

你將在屏幕上看到很多中間步驟的描述文字,最終的輸出如下:

                             .
                             .
                             .
streaming final output from c:userspeterappdatalocal
tempmrMean.Peter.20110228.172656.279000outputpart-00000
0.50956970000000001 0.34443931307935999
removing tmp directory c:userspeterappdatalocal
tempmrMean.Peter.20110228.172656.279000
To stream the valid output into a file, enter the following command:
%python mrMean.py < inputFile.txt > outFile.txt   
  

最後,要在Amazon的EMR上運行本程序,輸入如下命令(確保你已經設定了環境變量AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY,這些變量的設定見附錄A)。

完成了mrjob的使用練習,下面將用它來解決一些機器學習問題。上文提到,一些迭代算法僅使用EMR難以完成,因此下一節將介紹如何用mrjob完成這項任務。