讀古今文學網 > 機器學習實戰 > 15.2 Hadoop流 >

15.2 Hadoop流

Hadoop是一個開源的Java項目,為運行MapReduce作業提供了大量所需的功能。除了分佈式 計算之外,Hadoop自帶分佈式文件系統。

本書既不是Java,也不是Hadoop的教材,因此本節只對Hadoop做簡單介紹,只要能滿足在Python中用Hadoop來執行MapReduce作業的需求即可。如果讀者想對Hadoop做深入理解,可以閱讀《Hadoop實戰》1或者瀏覽Hadoop官方網站上的文檔(http://hadoop.apache.org/)。 此外,Mahout in action2一書也為在MapReduce下實現機器學習算法提供了很好的參考資料。

1. Chuck Lam, Hadoop in Action (Manning Publications, 2010),中文版由人民郵電出版社出版。

2.Sean Owen, Robin Anil, Ted Dunning, and Ellen Friedman, Mahout in Action (Manning Publications, 2011),中文版即將由人民郵電出版社出版。

Hadoop可以運行Java之外的其他語言編寫的分佈式程序。因為本書以Python為主,所以下面將使用Python編寫MapReduce代碼,並在Hadoop流中運行。Hadoop流(http://hadoop.apache.org/common/docs/current/streaming.html)很像Linux系統中的管道(管道使用符號|,可以將一個命令的輸出作為另一個命令的輸入)。如果用mapper.py調用mapper,用reducer.py調用reducer,那麼Hadoop流就可以像Linux命令一樣執行,例如:

cat inputFile.txt | python mapper.py | sort | python reducer.py > outputFile.txt  
  

這樣,類似的Hadoop流就可以在多台機器上分佈式執行,用戶可以通過Linux命令來測試Python語言編寫的MapReduce腳本。

15.2.1 分佈式計算均值和方差的mapper

接下來我們將構建一個海量數據上分佈式計算均值和方差的MapReduce作業。示範起見,這裡只選取了一個小數據集。在文本編輯器中創建文件mrMeanMapper.py,並加入如下程序清單中的代碼。

程序清單15-1 分佈式均值和方差計算的mapper

import sys
from numpy import mat, mean, power

def read_input(file):
    for line in file:
        yield line.rstrip
    input = read_input(sys.stdin)
    input = [float(line) for line in input]
    numInputs = len(input)
    input = mat(input)
    sqInput = power(input,2)

    print \"%dt%ft%f\" % (numInputs, mean(input), mean(sqInput))
    print >> sys.stderr, \"report: still alive\"
  

這是一個很簡單的例子:該mapper首先按行讀取所有的輸入並創建一組對應的浮點數,然後得到數組的長度並創建NumPy矩陣。再對所有的值進行平方,最後將均值和平方後的均值發送出去。這些值將用於計算全局的均值和方差。

注意:一個好的習慣是向標準錯誤輸出發送報告。如果某作業10分鐘內沒有報告輸出,則將被Hadoop中止。

下面看看程序清單15-1的運行效果。首先確認一下在下載的源碼中有一個文件inputFile.txt,其中包含了100個數。在正式使用Hadoop之前,先來測試一下mapper。在Linux終端執行以下命令:

cat inputFile.txt | python mrMeanMapper.py]  
  

如果在Windows系統下,可在DOS窗口輸入以下命令:

python mrMeanMapper.py < inputFile.txt
  

運行結果如下:

100 0.509570 0.344439
report: still alive 
  

其中第一行是標準輸出,也就是reducer的輸入;第二行是標準錯誤輸出,即對主節點做出的響應報告,表明本節點工作正常。

15.2.2 分佈式計算均值和方差的reducer

至此,mapper已經可以工作了,下面介紹reducer。根據前面的介紹,mapper接受原始的輸入並產生中間值傳遞給reducer。很多mapper是並行執行的,所以需要將這些mapper的輸出合併成一個值。接下來給出reducer的代碼:將中間的key/value對進行組合。打開文本編輯器,建立文件mrMeanReducer.py,然後輸入程序清單15-2的代碼。

程序清單15-2 分佈式均值和方差計算的reducer

import sys
from numpy import mat, mean, power

def read_input(file):
    for line in file:
        yield line.rstrip
input = read_input(sys.stdin)
mapperOut = [line.split(\'t\') for line in input]
cumVal=0.0
cumSumSq=0.0
cumN=0.0
for instance in mapperOut:
    nj = float(instance[0])
    cumN += nj
    cumVal += nj*float(instance[1])
    cumSumSq += nj*float(instance[2])
mean = cumVal/cumN
varSum = (cumSumSq - 2*mean*cumVal + cumN*mean*mean)/cumN
print \"%dt%ft%f\" % (cumN, mean, varSum)
print >> sys.stderr, \"report: still alive\"
  

程序清單15-2就是reducer的代碼,它接收程序清單15-1的輸出,並將它們合併成為全局的均值和方差,從而完成任務。

你可以在自己的單機上用下面的命令測試一下:

%cat inputFile.txt | python mrMeanMapper.py | python mrMeanReducer.py   
  

如果是DOS環境,鍵入如下命令:

%python mrMeanMapper.py < inputFile.txt | python mrMeanReducer.py   
  

後面的章節將介紹如何在多台機器上分佈式運行該代碼。你手邊或許沒有10台機器,沒有問題,下節就會介紹如何租用服務器。