讀古今文學網 > 機器學習實戰 > 15.6 示例:分佈式SVM的Pegasos算法 >

15.6 示例:分佈式SVM的Pegasos算法

第4章介紹過一個文本分類算法:樸素貝葉斯。該算法將文本文檔看做是詞彙空間裡的向量。第6章又介紹了效果很好的SVM分類算法,該算法將每個文檔看做是成千上萬個特徵組成的向量。

在機器學習領域,海量文檔上做文本分類面臨很大的挑戰。怎樣在如此大的數據上訓練分類器呢?如果能將算法分成並行的子任務,那麼MapReduce框架有望幫我們實現這一點。回憶第6章,SMO算法一次優化兩個支持向量,並在整個數據集上迭代,在需要注意的值上停止。該算法看上去並不容易並行化。

在MapReduce框架上使用SVM的一般方法

  1. 收集數據:數據按文本格式存放。
  2. 準備數據:輸入數據已經是可用的格式,所以不需任何準備工作。如果你需要解析一個大規模的數據集,建議使用map作業來完成,從而達到並行處理的目的。
  3. 分析數據:無。
  4. 訓練算法:與普通的SVM一樣,在分類器訓練上仍需花費大量的時間。
  5. 測試算法:在二維空間上可視化之後,觀察超平面,判斷算法是否有效。
  6. 使用算法:本例不會展示一個完整的應用,但會展示如何在大數據集上訓練SVM。該算法其中一個應用場景就是文本分類,通常在文本分類裡可能有大量的文檔和成千上萬的特徵。

SMO算法的一個替代品是Pegasos算法,後者可以很容易地寫成MapReduce的形式。本節將分析Pegasos算法,介紹如何寫出分佈式版本的Pegasos算法,最後在mrjob中運行該算法。

15.6.1 Pegasos算法

Pegasos是指原始估計梯度求解器(Primal Estimated sub-GrAdient Solver)。該算法使用某種形式的隨機梯度下降方法來解決SVM所定義的優化問題,研究表明該算法所需的迭代次數取決於用戶所期望的精確度而不是數據集的大小,有關細節可以參考原文1。原文有長文和短文兩個版本,推薦閱讀長文。

1. S. Shalev-Shwartz, Y. Singer, N. Srebro, 「Pegasos: Primal Estimated sub-GrAdient SOlver for SVM,」Proceed- ings of the 24th International Conference on Machine Learning 2007.

第6章提到,SVM算法的目的是找到一個分類超平面。在二維情況下也就是要找到一條直線,將兩類數據分隔開來。Pegasos算法工作流程是:從訓練集中隨機挑選一些樣本點添加到待處理列表中,之後按序判斷每個樣本點是否被正確分類;如果是則忽略,如果不是則將其加入到待更新集合。批處理完畢後,權重向量按照這些錯分的樣本進行更新。整個算法循環執行。

上述算法偽代碼如下:

將w初始化為0
對每次批處理
   隨機選擇k個樣本點(向量)
   對每個向量
      如果該向量被錯分:
         更新權重向量w
   累加對w的更新  
  

為瞭解實際效果,Python版本的實現見程序清單15-4。

程序清單15-4 SVM的Pegasos算法

def predict(w, x):
    return w*x.T

def batchPegasos(dataSet, labels, lam, T, k):
    m,n = shape(dataSet); w = zeros(n);
    dataIndex = range(m)
    for t in range(1, T+1):
        wDelta = mat(zeros(n))
        eta = 1.0/(lam*t)
        random.shuffle(dataIndex)
        for j in range(k):
            i = dataIndex[j]
            p = predict(w, dataSet[i,:])
            if labels[i]*p < 1:
                wDelta += labels[i]*dataSet[i,:].A
        w = (1.0 - 1/t)*w + (eta/k)*wDelta
    return w 
  

代碼註釋翻譯為: 1:將待更新值累加

程序清單15-4的代碼是Pegasos算法的串行版本。輸入值Tk分別設定了迭代次數和待處理列表的大小。在T次迭代過程中,每次需要重新計算eta。它是學習率,代表了權重調整幅度的大小。在外循環中,需要選擇另一批樣本進行下一次批處理;在內循環中執行批處理,將分類錯誤的值全部累加之後更新權重向量❶。

如果想試試它的效果,可以用第6章的數據來運行本例程序。本書不會對該代碼做過多分析,它只為Pegasos算法的MapReduce版本做一個鋪墊。下節將在mrjob中建立並運行一個MapReduce版本的Pegasos算法。

15.6.2 訓練算法:用mrjob實現MapReduce版本的SVM

本節將用MapReduce來實現程序清單15-4的Pegasos算法,之後再用15.5節討論的mrjob框架運行該算法。首先要明白如何將該算法劃分成map階段和reduce階段,確認哪些可以並行,哪些不能並行。

對程序清單15-4的代碼運行情況稍作觀察將會發現,大量的時間花費在內積計算上。另外,內積運算可以並行,但創建新的權重變量w是不能並行的。這就是將算法改寫為MapReduce作業的一個切入點。在編寫mapper和reducer的代碼之前,先完成一部分外圍代碼。打開文本編輯器,創建一個新文件mrSVM.py,然後在該文件中添加下面程序清單的代碼。

程序清單15-5 mrjob中分佈式Pegasos算法的外圍代碼

from mrjob.job import MRJob

import pickle
from numpy import *

class MRsvm(MRJob):
    DEFAULT_INPUT_PROTOCOL = \'json_value\'

def __init__(self, *args, **kwargs):
    super(MRsvm, self).__init__(*args, **kwargs)
    self.data = pickle.load(open(\'<path to your Ch15 code directory>svmDat27\'))
    self.w = 0
    self.eta = 0.69
    self.dataList = 
    self.k = self.options.batchsize
    self.numMappers = 1
    self.t = 1

def configure_options(self):
    super(MRsvm, self).configure_options
    self.add_passthrough_option(\'--iterations\', dest=\'iterations\', default=2, type=\'int\',help=\'T: number of iterations to run\')
    self.add_passthrough_option(\'--batchsize\', dest=\'batchsize\', default=100, type=\'int\',help=\'k: number of data points in a batch\')

def steps(self):
    return ([self.mr(mapper=self.map, mapper_final=self.map_fin,reducer=self.reduce)]*self.options.iterations)
if __name__ == \'__main__\':
    MRsvm.run    
  

程序清單15-5的代碼進行了一些設定,從而保證了map和reduce階段的正確執行。在程序開頭,Mrjob、NumPy和Pickle模塊分別通過一條include語句導入。之後創建了一個mrjob類MRsvm,其中__init__方法初始化了一些在map和reduce階段用到的變量。Python的模塊Pickle在加載不同版本的Python文件時會出現問題。為此,我將Python2.6和2.7兩個版本對應的數據文件各自存為svmDat26和svmDat27。

對應於命令行輸入的參數,Configure_options方法建立了一些變量,包括迭代次數(T)、待處理列表的大小(k)。這些參數都是可選的,如果未指定,它們將採用默認值。

最後,steps方法告訴mrjob應該做什麼,以什麼順序來做。它創建了一個Python的列表,包含map、map_fin和reduce這幾個步驟,然後將該列表乘以迭代次數,即在每次迭代中重複調用這個列表。為了保證作業裡的任務鏈能正確執行,mapper需要能夠正確讀取reducer輸出的數據。單個MapReduce作業中無須考慮這個因素,這裡需要特別注意輸入和輸出格式的對應。

我們對輸入和輸出格式進行如下規定:

傳入的值是列表數組,valueList的第一個元素是一個字符串,用於表示列表的後面存放的是什麼類型的數據,例如{『x』,23}[『w』,[1,5,6]]。每個mapper_final都將輸出同樣的key,這是為了保證所有的key/value對都輸出給同一個reducer。

定義好了輸入和輸出之後,下面開始寫mapper和reducer方法,打開mrSVM.py文件並在MRsvm類中添加下面的代碼。

程序清單15-6 分佈式Pegasos算法的mapper和reducer代碼

def map(self, mapperId, inVals):
        if False: yield
        if inVals[0]==\'w\':
            self.w = inVals[1]
        elif inVals[0]==\'x\':
            self.dataList.append(inVals[1])
        elif inVals[0]==\'t\': self.t = inVals[1]
    def map_fin(self):
        labels = self.data[:,-1]; X=self.data[:,0:-1]
        if self.w == 0: self.w = [0.001]*shape(X)[1]
        for index in self.dataList:
            p = mat(self.w)*X[index,:].T
            if labels[index]*p < 1.0:
                yield (1, [\'u\', index])
        yield (1, [\'w\', self.w])
        yield (1, [\'t\', self.t])
def reduce(self, _, packedVals):
    for valArr in packedVals:
        if valArr[0]==\'u\': self.dataList.append(valArr[1])
        elif valArr[0]==\'w\': self.w = valArr[1]
        elif valArr[0]==\'t\': self.t = valArr[1]
    labels = self.data[:,-1]; X=self.data[:,0:-1]
    wMat = mat(self.w); wDelta = mat(zeros(len(self.w)))
    for index in self.dataList:
        #❶ 將更新值累加  
        wDelta += float(labels[index])*X[index,:]
    eta = 1.0/(2.0*self.t)
    wMat = (1.0 - 1.0/self.t)*wMat + (eta/self.k)*wDelta
    for mapperNum in range(1,self.numMappers+1):
        yield (mapperNum, [\'w\', wMat.tolist[0] ])
        if self.t < self.options.iterations:
            yield (mapperNum, [\'t\', self.t+1])
            for j in range(self.k/self.numMappers):
                yield (mapperNum, [\'x\',random.randint(shape(self.data)[0]) ])  
  

程序清單15-6里的第一個方法是map,這也是分佈式的部分,它得到輸入值並存儲,以便在map_fin中處理。該方法支持三種類型的輸入:w向量、t或者xt是迭代次數,在本方法中不參與運算。狀態不能保存,因此如果需要在每次迭代時保存任何變量並留給下一次迭代,可以使用key/value對傳遞該值,抑或是將其保存在磁盤上。顯然前者更容易實現,速度也更快。

map_fin方法在所有輸入到達後開始執行。這時已經獲得了權重向量w和本次批處理中的一組x值。每個x值是一個整數,它並不是數據本身,而是索引。數據存儲在磁盤上,當腳本執行的時候讀入到內存中。當map_fin啟動時,它首先將數據分成標籤和數據,然後在本次批處理的數據(存儲在self.dataList裡)上進行迭代,如果有任何值被錯分就將其輸出給reducer。為了在mapper和reducer之間保存狀態,w向量和t值都應被發送給reducer。

最後是reduce函數,對應本例只有一個reducer執行。該函數首先迭代所有的key/value對並將值解包到一個局部變量datalist裡。之後dataList裡的值都將用於更新權重向量w,更新量在wDelta中完成累加❶。然後,wMat按照wDelta和學習率eta進行更新。在wMat更新完畢後,又可以重新開始整個過程:一個新的批處理過程開始,隨機選擇一組向量並輸出。注意,這些向量的key是mapper編號。

為了看一下該算法的執行效果,還需要用一些類似於reducer輸出的數據作為輸入數據啟動該任務,我為此附上了一個文件kickStart.txt。在本機上執行前面的代碼可以用下面的命令:

%python mrSVM.py < kickStart.txt
                                .
                                .
                                .
streaming final output from c:userspeterappdatalocaltemp
mrSVM.Peter.20110301.011916.373000outputpart-00000
1 [\"w\", [0.51349820499999987, -0.084934502500000009]]
removing tmp directory c:userspeterappdatalocaltemp
mrSVM.Peter.20110301.011916.373000  
  

這樣就輸出了結果。經過2次和50次迭代後的分類面如圖15-9所示。

圖15-9 經過多次迭代的分佈式Pegasos算法執行結果。該算法收斂迅速,多次迭代後可以得到更好的結果

如果想在EMR上運行該任務,可以添加運行參數:-r emr。該作業默認使用的服務器個數是1。如果要調整的話,添加運行參數:--num-ec2-instances=2(這裡的2也可以是其他正整數),整個命令如下:

%python mrSVM.py -r emr --num-ec2-instances=3 < kickStart.txt > myLog.txt
  

要查看所有可用的運行參數,輸入%python mrSVM.py –h

調試mrjob

調試一個mrjob腳本將比調試一個簡單的Python腳本棘手得多。這裡僅給出一些調試建議。

  • 確保已經安裝了所有所需的部件:boto、simplejson和可選的PyYAML。
  • 可以在~/.mrjob.conf文件中設定一些參數,確定它們是正確的。
  • 在將作業放在EMR上運行之前,盡可能在本地多做調試。能在花費10秒就發現一個錯誤的情況下,就不要花費10分鐘才發現一個錯誤。
  • 檢查base_temp_dir目錄,它在~/.mrjob.conf中設定。例如在我的機器上,該目錄的存放位置是/scratch/$USER,其中可以看到作業的輸入和輸出,它將對程序的調試非常有幫助。
  • 一次只運行一個步驟。

到現在為止,讀者已經學習了如何編寫以及如何在大量機器上運行機器學習作業,下節將分析這樣做的必要性。