讀古今文學網 > Java程序員修煉之道 > 10.6 Clojure並發 >

10.6 Clojure並發

Java的狀態模型從根本上來說是基於對象可變思想的。正如第4章中所提及的,這會直接導致並發代碼的安全問題。在某一線程修改對象的狀態時,為了防止其他線程看到對象的中間(即不一致)狀態,需要引入相當複雜的鎖策略。這些策略理解難,調試難,測試更難。

Clojure的並發概念在某些方面不像Java中那麼底層。比如說,由Clojure運行時管理線程池的使用(開發人員在這方面幾乎或根本不能控制)看起來可能有點奇怪。但是讓平台(此處即Clojure運行時)細緻地做好內務工作的好處在於,開發人員可以專注於更重要的任務,比如總體設計。

Clojure的指導思想是默認把線程彼此隔開,這種實現並發安全的辦法由來已久。假定「沒有共享資源」的基線和採用不可變值使Clojure避開了很多Java所面臨的問題,從而可以專注於為並發編程安全地共享狀態的方法。

注意 為了幫助提升安全性,Clojure的運行時提供了線程協調機制,我們強烈建議你使用這些機制,而不是用Java的慣例或構造自己的並髮結構。

實際上,Clojure用不同的方法實現了不同的並發模型:未來式(future)、並行調用(pcall)、引用形式(ref)和代理(agent)。且聽我們一一道來,先從最簡單的開始。

10.6.1 未來式與並行調用

第一個也是最明顯的一個狀態分享辦法就是不分享。實際上,我們一直使用的Clojure結構var本質上是不可以共享的。如果兩個不同的線程繼承了名字相同的var,並在線程裡重新綁定了它,那綁定只在這些線程內部可見,絕不可能被其他線程共享。

可以利用Clojure跟Java的緊密結合啟動新線程,也就是說在Clojure中寫Java並發代碼非常容易。但其中有些抽像在Clojure中有更乾淨的形式。比如對於第4章介紹的Java 未來式(Future),Clojure提供了非常乾淨的方式。代碼清單10-8是個簡單的例子。

代碼清單10-8 Clojure中的Future

user=> (def simple-future
  (future (do
    (println \"Line 0\")
    (Thread/sleep 10000)
    (println \"Line 1\")
    (Thread/sleep 10000)
    (println \"Line 2\"))))
#\'user/simple-future
Line 0  //馬上開始執行
user=> (future-done? simple-future)
user=> false
Line 1
user=> @simple-future //解引用導致阻塞
Line 2
nil
user=>  
  

這段代碼用(future)建立了一個Future。創建之後它馬上就開始在後台線程中運行,所以在Clojure REPL中看到了輸出Line 0(然後是Line 1)——代碼已經開始在另一個線程上運行了。接著可以用(future-done?)來檢查代碼是否已經運行完,這個調用是非阻塞的。然而對future的解引用會阻塞調用線程,直到函數完成。

這實際上是Clojure對Java Future的一個瘦封裝,語法更乾淨。Clojure還提供了對並發程序員非常有幫助的輔助形式。有個簡單的函數是(pcalls),可以接受數量可變的零參函數,讓它們並發執行。它們在運行時管理的線程池上執行,並返回一個懶序列結果。試圖訪問序列中的任何還沒完成的元素會導致訪問線程被阻塞。

代碼清單10-9建立了一個單參函數(wait-with-for)。它用了一個類似10.3.2節介紹過的loop形式。可以用它創建一些零參函數(wait-1)(wait-2)等,並把它們傳給(pcalls)

代碼清單10-9 Clojure中的並行調用

user=> (defn wait-with-for [limit]
  (let [counter 1]
    (loop [ctr counter]
    (Thread/sleep 500)
    (println (str \"Ctr=\" ctr))
    (if (< ctr limit)
        (recur (inc ctr))
        ctr))))
#\'user/wait-with-for
user=> (defn wait-1  (wait-with-for 1))
user=> #\'user/wait-1
user=> (defn wait-2  (wait-with-for 2))
user=> #\'user/wait-2
user=> (defn wait-3  (wait-with-for 3))
user=> #\'user/wait-3
user=> (def wait-seq (pcalls wait-1 wait-2 wait-3))
#\'user/wait-seq
Ctr=1
Ctr=1
Ctr=1
Ctr=2
Ctr=2
Ctr=3

user=> (first wait-seq)
1
user=> (first (next wait-seq))
2
  

因為線程睡眠值只有500毫秒,等待函數很快就能完成。通過調整超時(比如延遲到10秒),很容易驗證由(pcalls)返回的懶序列wait-seq是否有上面描述的那種阻塞行為。

對於不需要共享狀態的情況,這種簡單的多線程結構挺好,但在很多應用中,不同的處理線程都要在運行過程中相互通信。Clojure有幾個模型可以處理這種情況,接下來我們先看看其中的一個:借助(ref)形式實現的狀態共享。

10.6.2 ref形式

ref是Clojure在線程間共享狀態的辦法。它們基於運行時提供的一個模型,在這個模型中,狀態的改變要能被多個線程見到。該模型在符號和值之間引入了一個額外的中間層。也就是說,符號綁定到值的引用上,而不是直接綁到值上。這個系統基本上是事務化的,並且由Clojure運行時進行協調。如圖10-6所示。

圖10-6 軟件事務內存

這一中間層意味著改變或更新ref之前必須把它放在一個事務中。當事務完成的時候,或者全變了,或者什麼也沒變。這跟數據庫中的事務是類似的。

這可能有點抽像了,所以我們來看一個模擬ATM的例子。在Java中,要對所有敏感數據加鎖保護。代碼清單10-10是一個簡單的自動提款機模型,包括鎖。

代碼清單10-10 Java中的ATM模型

public class Account {
    private double balance = 0;
    private final String name;
    private final Lock lock = new ReentrantLock;

    public Account(String name_, double initialBal_){
        name = name_;
        balance = initialBal_;
    }

    public synchronized double getBalance{
        return balance;
    }

    public synchronized void debit(double debitAmt_) {
        balance -= debitAmt_;
    }

    public String getName {
        return name;
    }

    public String toString {
        return \"Account [balance=\" + balance + \", name=\" + name + \"]\";
    }

    public Lock getLock {
        return lock;
    }
}

public class Debitter implements Runnable {
    private final Account acc;
    private final CountDownLatch cdl;
    public Debitter(Account account_, CountDownLatch cdl_) {
        acc = account_;
        cdl = cdl_;
    }

    public void run {
        double bal = acc.getBalance;
        Lock lk = acc.getLock;

        while (bal > 0) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) { }
            lk.lock; //能在acc上同步
            bal = acc.getBalance; //必須重新取得餘額
            if (bal > 0) {
                acc.debit(1);
                bal--;
            }
            lk.unlock;
        }
        cdl.countDown;
    }
}
Account myAcc = new Account(\"Test Account\", 500 * NUM_THREADS);
CountDownLatch stopl = new CountDownLatch(NUM_THREADS);
for (int i=0; i<NUM_THREADS; i++) {
    new Thread(new Debitter(myAcc, stopl)).start;
}
stopl.await;
System.out.println(myAcc);
  

再來看看用Clojure怎麼寫。先來個單線程版本。然後我們再開發一個並發版本跟單線程版本比較,這樣並發代碼應該更容易理解。

代碼清單10-11是單線程版本。

代碼清單10-11 Clojure中的簡單ATM模型

(defn make-new-acc [account-name opening-balance]
    {:name account-name :bal opening-balance})

(defn loop-and-debit [account]
    (loop [acc account]
      (let [balance (:bal acc) my-name (:name acc)]
          (Thread/sleep 1)
          (if (&gt; balance 0)
            (recur (make-new-acc my-name (dec balance))) //用循環/遞歸代替Java中的while
            acc
     ))))

(loop-and-debit (make-new-acc \"Ben\" 5000))
  

這段代碼跟Java版比起來非常緊湊。必須承認,這是單線程的,但還是比Java的代碼少了很多。運行代碼會得到期望的結果:一個餘額為0的acc映射。現在我們看看並發形式。

要讓這段代碼並行,需要引入ref。它們是用(ref)形式創建的,並且類型為clojure.lang.Ref的JVM對象。通常建立時會帶一個保存狀態的映射,此外還需要(dosync)形式來設置事務。在事務之內,還要用到(alter)形式來修改ref。使用ref的多線程ATM函數如代碼清單10-12所示。

代碼清單10-12 多線程ATM

(defn make-new-acc [account-name opening-balance]
    (ref {:name account-name :bal opening-balance}))

(defn alter-acc [acc new-name new-balance]
    (assoc acc :bal new-balance :name new-name)) //必須返回值,而不是引用

(defn loop-and-debit [account]
    (loop [acc account]
      (let [balance (:bal @acc)
             my-name (:name @acc)]
         (Thread/sleep 1)
         (if (> balance 0)
           (recur (dosync (alter acc alter-acc my-name (dec balance)) acc))
           acc
     ))))

(def my-acc (make-new-acc \"Ben\" 5000))

(defn my-loop  (let [the-acc my-acc]
    (loop-and-debit the-acc)
))

(pcalls my-loop my-loop my-loop my-loop my-loop)
  

就像註釋中說的,對值進行操作的(alter-acc)函數必須返回一個值。所操作的值是對當前事務中線程可見的本地值,這稱為事務內的值。返回的值是在變更函數返回之後的ref新值。在退出 (dosync)所定義的事務塊之前,這個值對外界是不可見的。

與此同時,其他事務可能像這個一樣也在進行。如果是這樣,Clojure STM系統會進行跟蹤,並且只允許那些自開始以來已經提交過的事務組成的事務提交。如果不一致,它會回滾,並且可能在得到更新過的狀態後再次嘗試。

如果事務做了任何會產生副作用的事情(比如日誌文件或其他輸出),這個重試行為可能會引發問題。讓事務化部分在函數式編程中(即沒有副作用)盡可能地保持簡單純粹是你的責任。

對於某些多線程方式而言,這種持樂觀態度的事務行為看起來可能是相當重量級的做法。有些並發應用只需偶爾在線程間進行通信,並且是以相當不對稱的風格。幸運的是,Clojure提供了另外一種更好地體現「過後就忘「原則的並發機制,這也是我們下一節的主題。

10.6.3 代理

代理是Clojure中異步的、面向消息的並發原語。Clojure代理不是共享狀態,而是屬於另外一個線程的一點兒狀態,但它會從另外一個線程中接收消息(以函數的形式)。這乍看起來可能是個奇怪的想法,儘管遇到過Scala的actor之後這種感覺可能會少一點。

「我離它們太遠了,只能把禮物裝進包裹寄給它們,」她想,「這也太滑稽了,給自己的雙腳送禮物還需要郵寄!地址寫起來就更有趣了!」

——《愛麗絲夢遊仙境》,劉易斯‧卡羅爾

應用到代理上的函數在代理的線程上運行。這個線程是由Clojure運行時管理的,在一個程序員通常無法訪問的線程池裡。運行時還會保證代理中那些可以被外界看到的值是孤立的和原子的。這就是說用戶代碼只會見到狀態修改之前或之後的代理值。

代碼清單10-13是個簡單的代理例子,跟用來討論future的例子類似。

代碼清單10-13 Clojure代理

(defn wait-and-log [coll str-to-add]
    (do (Thread/sleep 10000)
        (let [my-coll (conj coll str-to-add)]
            (Thread/sleep 10000)
            (conj my-coll str-to-add))))

(def str-coll (agent ))

(send str-coll wait-and-log \"foo\")

@str-coll
  

send調用派發了一個(wait-and-log)調用給代理,通過使用REPL解引用,結果就像承諾的那樣,你絕不會看到代理的中間狀態——只有最後的狀態出現了(字符串\"foo\"被添加了兩次)。

實際上,代碼清單10-13上的(send)調用很容易讓人聯想到愛麗絲的腳的地址。劉易斯‧卡羅爾很可能是用Clojure代碼寫的地址:

愛麗絲的右腳收
    壁爐前的毛毯上
      靠近擋板
      (帶去愛麗絲的愛)
  

在你認為一個人的腳是身體的有機組成時,這的確挺怪異的。同樣,發消息給Clojure管理的線程池中一個線程上的代理看起來也挺怪異的,兩個線程還共享一個地址空間。但你目前多次遇到的一個並發主題就是如果它能讓用法更加簡單清晰,額外的複雜性可能是件好事。