隨著Java 5的到來,Java對並發的重新思考也浮出了水面。這些新思想主要體現在 java.util.concurrent
包上,其中包含了大量用來編寫多線程代碼的新工具。在後續版本中,這些工具不斷得到改進,但其工作方式卻依然保持不變,並且直到今天還是對開發人員很有幫助。
我們馬上快速過一下java.util.concurrent
中主要的類及相關包,比如atomic和locks包。我們會向你介紹這些類及其適用的情景。你也應該讀一下它們的Javadoc,並嘗試熟悉整個包——它們使編寫並發類容易多了。
代碼遷移
如果你還有基於(Java 5之前的)老辦法編寫的多線程代碼,建議你用
java.util.concurrent
重構。按我們的經驗,幾乎在所有案例中,如果你特意把代碼遷移到新的API中,代碼就會得以改進。你的努力付出將使代碼在清晰性和可靠性上得到極大提升。
請把這次討論當做並發編程的啟動工具,而不是一次研討會。想要充分利用好java.util.concurrent
,你還需要知道更多的知識。
4.3.1 原子類:java.util.concurrent.atomic
java.util.concurrent.atomic
中有幾個名字以Atomic
打頭的類。它們的語義基本上和volatile
一樣,只是封裝在一個API裡了,這個API包含為操作提供的適當的原子(要麼不做,要做就全做)方法。對於開發人員來說,這是非常簡單的避免在共享數據上出現競爭危害1的辦法。
1 競爭危害(race hazard)又名競態條件(race condition)。一個系統或進程的輸出,依賴於不受控制事件的出現順序或時機。例如兩個進程都試圖修改一個共享內存的內容。在沒有並發控制的情況下,最後的結果取決於兩個進程的執行順序與時機,如果發生了並發訪問衝突,最後的結果是不正確的。——譯者注
在編寫這些實現時利用了現代處理器的特性,所以如果能從硬件和操作系統上得到適當的支持,它們可以是非阻塞(無需線程鎖)的,而大多數現代系統都能提供這種支持。常見的用法是實現序列號機制,在AtomicInteger
或AtomicLong
上用原子操作getAndIncrement
方法。
要做序列號,該類應該有個nextId
方法,每次調用時肯定能返回一個唯一併且完全增長的數值。這和數據庫裡序列號的概念很像(所以這個變量叫這個名字)。
來看一段產生序列號的代碼:
private final AtomicLong sequenceNumber = new AtomicLong(0);
public long nextId {
return sequenceNumber.getAndIncrement;
}
注意 原子類不是從有相似名稱的類繼承而來的,所以
AtomicBoolean
不能當Boolean
用,AtomicInteger
也不是Integer
,雖然它確實擴展了Number
。
接下來,我們會檢查一下java.util.concurrent
如何對同步模型的核心建模——Lock
接口。
4.3.2 線程鎖:java.util.concurrent.locks
塊結構同步方式基於鎖這樣一個簡單的概念。這種方式有幾個缺點。
- 鎖只有一種類型。
- 對被鎖住對象的所有同步操作都是一樣的作用。
- 在同步代碼塊或方法開始時取得線程鎖。
- 在同步代碼塊或方法結束時釋放線程鎖。
- 線程或者得到鎖,或者阻塞——沒有其他可能。
如果我們要重構對線程鎖的支持,有幾處可以得到提升。
- 添加不同類型的鎖,比如讀取鎖和寫入鎖。
- 對鎖的阻塞沒有限制,即允許在一個方法中上鎖,在另一個方法中解鎖)。
- 如果線程得不到鎖,比如鎖由另外一個線程持有,就允許該線程後退或繼續執行,或者做點別的事情——運用
tryLock
方法。 - 允許線程嘗試取鎖,並可以在超過等待時間後放棄。
能實現以上這些的關鍵就是java.util.concurrent.locks
中的Lock
接口。還有它的兩個實現類。
ReentrantLock
——本質上跟用在同步塊上那種鎖是一樣的,但它要稍微靈活點兒。ReentrantReadWriteLock
——在需要讀取很多線程而寫入很少線程時,用它性能會更好。
塊結構並發能實現的所有功能都可以用Lock
接口實現。下面是用ReentrantLock
重寫的那個死鎖的例子。
代碼清單4-4 用ReentrantLock
重寫死鎖
private final Lock lock = new ReentrantLock;
public void propagateUpdate(Update upd_, MicroBlogNode backup_) {
//每個線程都先鎖住自己的鎖
lock.lock;
try {
System.out.println(ident +": recvd: "+upd_.getUpdateText +" ; backup: "+backup_.getIdent);
//調用confirmUpdate知悉其他線程
backup_.confirmUpdate(this, upd_);
} finally {
lock.unlock;
}
}
public void confirmUpdate(MicroBlogNode other_, Update upd_) {
//1嘗試鎖住其他線程
lock.lock;
try{
System.out.println(iden +": recvd confirm: "+upd_.getUpdateText +" from "+ other_.getIdentifier);
} finally {
lock.unlock;
}
}
鎖住其他線程的嘗試1通常都會失敗,因為它已經被鎖住了(如圖4-3所示)。這就是導致死鎖出現的原因。
用鎖時帶上try...finally
把
lock
放在try...finally
塊中(釋放也在這裡)的模式是另外一個好用的小工具。在跟塊結構並發相似的情景中它同樣很好用。而另一方面,如果需要傳遞Lock
對象,比如從一個方法中返回,則不能用這個模式。 使用Lock
對象可能要比塊結構方式強大得多,但有時用它們很難設計出完善的鎖定策略。
對付死鎖的策略有很多,但你應該特別注意一個不起任何作用的策略。請看下面這段代碼中新版的propagateUpdate
方法(假定confirmUpdate
也做出了同樣的修改)。在這個例子中,我們用帶有超時機制的tryLock
替換了無條件的鎖。通過這種辦法可以為其他線程提供得到線程鎖的機會,從而去除死鎖。
代碼清單4-5 一次有缺陷的解決死鎖問題的嘗試
public void propagateUpdate(Update upd_, MicroBlogNode backup_) {
boolean acquired = false;
while (!acquired) {
try {
int wait = (int)(Math.random * 10);
//嘗試與鎖定,超時時長隨機
acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS);
if (acquired) {
System.out.println(ident +": recvd: "+upd_.getUpdateText +" ; backup: "+backup_.getIdent);
//在其他線程上確認
backup_.confirmUpdate(this, update_);
} else {
Thread.sleep(wait);
}
} catch (InterruptedException e) {
} finally {
//僅在鎖定時解鎖
if (acquired) lock.unlock;
}
}
}
如果運行代碼清單4-5中的代碼,你會發現它有時候還是不能解決死鎖問題。你能看到「received confirm of update」,但它並不會一直出現,時有時無。
實際上,死鎖問題並沒有真正解決,因為如果線程取得了第一個鎖(在propagateUpdate
中),它才會調用confirmUpdate
,並且在完成之前絕不會釋放第一個鎖。即使兩個線程都能在彼此調用confirmUpdate
之前取得第一個線程鎖,它們還是會產生死鎖。
如果取得第二個鎖的嘗試失敗,能真正解決問題的辦法是讓線程釋放其持有的第一個鎖,再次從頭開始等待,從而使其他線程有機會得到完整的鎖集合,能走完全程。代碼如下所示。
代碼清單4-6 修正死鎖
public void propagateUpdate(Update upd_, MicroBlogNode backup_) {
boolean acquired = false;
boolean done = false;
while (!done) {
int wait = (int)(Math.random * 10);
try {
acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS);
if (acquired) {
System.out.println(ident +": recvd: "+upd_.getUpdateText +" ; backup: "+backup_.getIdent);
//檢查tryConfirmUpdate的返回值
done = backupNode_.tryConfirmUpdate(this, update_);
}
} catch (InterruptedException e) {
} finally {
if (acquired) lock.unlock;
}
//如果done為false,釋放鎖並等待
if (!done) try {
Thread.sleep(wait);
} catch (InterruptedException e) { }
}
}
public boolean tryConfirmUpdate(MicroBlogNode other_, Update upd_) {
boolean acquired = false;
try {
int wait = (int)(Math.random * 10);
acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS);
if (acquired) {
long elapsed = System.currentTimeMillis - startTime;
System.out.println(ident +": recvd confirm: "+
upd_.getUpdateText +" from "+other_.getIdent
+" - took "+ elapsed +" millis");
return true;
}
} catch (InterruptedException e) {
} finally {
if (acquired) lock.unlock;
}
return false;
}
這一版會檢查tryConfirmUpdate
的返回碼。如果為false
,最初的鎖被釋放。該線程會暫停一段時間,讓其他線程有機會獲取鎖。
把這段代碼運行幾次,你會發現這兩個線程基本上總能走完全程——死鎖問題已經被你解決了。你也許想試驗試驗之前版本中那段代碼的不同形式,諸如最原始的、有缺陷的或被改正的。通過對這些代碼的演練,你能對鎖機制有更深刻的理解,並且開始漸漸地憑直覺避免死鎖問題的出現。
為什麼那個有缺陷的版本有時候能奏效?
你已經看到了,死鎖仍然存在,那是什麼原因導致這個版本中的代碼有時可以成功呢?代碼中附加的複雜性是罪魁禍首。它影響JVM的線程調度器,讓它變得更加難以預測。這意味著它有時候能讓某個線程(通常是第一個)在其他線程運行之前進入
confirmUpdate
方法並取得第二個鎖。這種情況也會發生在原始代碼中,只是可能性更低罷了。
我們只是揭開了Lock
各種可能性的面紗——有很多種方法可以產生更加複雜的鎖定結構。接下來我們就來討論其中一個概念——鎖存器。
4.3.3 CountDownLatch
CountDownLatch
是一種簡單的同步模式,這種模式允許線程在通過同步屏障之前做些少量的準備工作。
為了達到這種效果,在構建新的CountDownLatch
實例時要給它提供一個int
值(計數器)。此外,還有兩個用來控制鎖存器的方法:countDown
和await
。前者對計數器減1,而後者讓調用線程在計數器到0之前一直等待。如果計數器已經為0或更小,則它什麼也不做。這個簡單的機制使得這種所需準備最少的模式非常容易部署。
在下面的代碼中,同一進程內的一組更新處理線程至少必須有一半線程正確初始化(假定更新處理線程的初始化要佔用一定時間)之後,才能開始接受系統發送給它們中的任何一個線程的更新。
代碼清單4-7 用鎖存器輔助初始化
public static class ProcessingThread extends Thread {
private final String ident;
private final CountDownLatch latch;
public ProcessingThread(String ident_, CountDownLatch cdl_) {
ident = ident_;
latch = cdl_;
}
public String getIdentifier {
return identifier;
}
//節點初始化
public void initialize {
latch.countDown;
}
public void run {
initialize;
}
}
final int quorum = 1 + (int)(MAX_THREADS / 2);
final CountDownLatch cdl = new CountDownLatch(quorum);
final Set<ProcessingThread> nodes = new HashSet<>;
try {
for (int i=0; i<MAX_THREADS; i++) {
ProcessingThread local = new ProcessingThread("localhost:"+(9000 + i), cdl);
nodes.add(local);
local.start;
}
//達到quorum,開始發送更新
cdl.await;
} catch (InterruptedException e) {
} finally {
}
這段代碼把鎖存器的值設置為quorum
。一旦被初始化的線程達到這個數量,就可以開始處理更新了。每個線程完成初始化後都會馬上調用countDown
,所以主線程只需等待quorum
的到來,然後啟動(並派發更新,儘管我們沒給出那部分代碼)。
我們接下來要討論的是對多線程開發人員來說最有用的類之一:java.util.concurrent
中的ConcurrentHashMap
。
4.3.4 ConcurrentHashMap
ConcurrentHashMap
類是標準HashMap
的並發版本。它改進了Collections
類中提供的synchronizedMap
功能,因為那些方法返回的集合中包含的鎖要比需要的多。
圖4-7 HashMap
的經典視圖
如圖4-7所示,傳統的HashMap
用hash函數來確定存放鍵/值對的「桶」,這是該類名字中「Hash」的由來。這意味著多線程處理可以更加簡單直接——修改HashMap
時並不需要把整個結構都鎖住,只要鎖住即將修改的桶就行了。
提示 好的並發
HashMap
實現在讀取時不用鎖,寫入時只需鎖住要修改的桶。Java基本上能達到這個標準,但這裡還有一些大多數開發人員都無需過多關注的底層細節。
ConcurrentHashMap
類還實現了ConcurrentMap
接口,有些提供了原子操作的新方法:
putIfAbsent
——如果還沒有對應鍵,則把鍵/值對添加到HashMap
中。remove
——如果對應鍵存在,且值也與當前狀態相等(equal
),則用原子方式移除鍵值對。replace
——API為HashMap
中原子替換的操作方法提供了兩種不同的形式。
比如說,如果你把代碼清單4-1中的私有final
域arrivalTime
的類型從HashMap
改成ConcurrentHashMap
,那就可以把synchronized
方法替換成常規的非同步訪問。注意代碼清單4-8中鎖的缺失——根本就沒有顯式的同步。
代碼清單4-8 使用ConcurrentHashMap
public class ExampleMicroBlogTimingNode implements SimpleMicroBlogNode {
...
private final Map<Update, Long> arrivalTime = new ConcurrentHashMap <>;
...
public void propagateUpdate(Update upd_) {
arrivalTime.putIfAbsent(upd_, System.currentTimeMillis);
}
public boolean confirmUpdateReceived(Update upd_) {
return arrivalTime.get(upd_) != null;
}
}
ConcurrentHashMap
是java.util.concurrent
包中最有用的類之一。它不僅提供了多線程的安全性,並且性能更優,在日常使用中沒有嚴重的缺陷。接下來我們會討論它的最佳拍檔,用於List
的CopyOnWriteArrayList
。
4.3.5 CopyOnWriteArrayList
從名字就能看出來,CopyOnWriteArrayList是標準ArrayList的替代品。CopyOnWriteArrayList通過增加寫時複製(copy-on-write)語義來實現線程安全性,也就是說修改列表的任何操作都會創建一個列表底層數組的新復本(如圖4-8所示)。這就意味著所有成形的迭代器1都不用擔心它們會碰到意料之外的修改。
1 迭代器(iterator)是一個對象,它的工作是遍歷並選擇序列中的對象,而客戶端程序員不必知道或關心該序列底層的結構(也就是不同容器的類型)。——譯者注
圖4-8 寫時複製數組
當快速、一致的數據快照(不同的讀取器讀到的數據偶爾可能會不一樣)比完美的同步以及性能上的突破更重要時,這種共享數據的方法非常理想,並經常出現在非關鍵任務中。
我們來看一個寫時複製的案例。假設有個微博的時間線更新,這是一個典型的非關鍵任務的例子。每個讀取器的性能、自身一致性的快照要比全局的一致性更受歡迎。代碼清單4-9表示每個用戶時間線視圖的持有者類。我們將會在代碼清單4-10中用它來演示寫時複製操作是如何進行的。
代碼清單4-9 寫時複製案例
public class MicroBlogTimeline {
private final CopyOnWriteArrayList<Update> updates;
private final ReentrantLock lock;
private final String name;
private Iterator<Update> it;
//構造方法已省略
public void addUpdate(Update update_) {
updates.add(update_);
}
//設置迭代器
public void prep {
it = updates.iterator;
}
public void printTimeline {
//需要在這裡鎖定
lock.lock;
try {
if (it != null) {
System.out.print(name+ ": ");
while (it.hasNext) {
Update s = it.next;
System.out.print(s+ ", ");
}
System.out.println;
}
} finally {
lock.unlock;
}
}
}
我們專門設計了這個類來闡明在寫時複製語義下的迭代器行為。你需要在輸出方法中鎖定,以防止輸出在兩個線程間亂掉,此外你也能看到兩個線程各自的狀態。
你可以從下面的代碼中調用MicroBlogTimeline
類。
代碼清單4-10 揭示寫時複製行為
final CountDownLatch firstLatch = new CountDownLatch(1);
final CountDownLatch secondLatch = new CountDownLatch(1);
final Update.Builder ub = new Update.Builder;
//1設置初始狀態
final List<Update> l = new CopyOnWriteArrayList<>;
l.add(ub.author(new Author("Ben")).updateText("I like pie").build);
l.add(ub.author(new Author("Charles")).updateText(
➥ "I like ham on rye").build);
ReentrantLock lock = new ReentrantLock;
final MicroBlogTimeline tl1 = new MicroBlogTimeline("TL1", l, lock);
final MicroBlogTimeline tl2 = new MicroBlogTimeline("TL2", l, lock);
Thread t1 = new Thread {
public void run {
l.add(ub.author(new Author("Jeffrey")).updateText("I like a lot of things").build);
tl1.prep;
firstLatch.countDown;
//用鎖存器嚴格限制事件的順序(1)
try { secondLatch.await; } catch (InterruptedException e) { }
tl1.printTimeline;
}
};
Thread t2 = new Thread{
public void run{
try {
//用鎖存器嚴格限制事件的順序(2)
firstLatch.await;
l.add(ub.author(new Author("Gavin")).updateText("I like otters").build);
tl2.prep;
//用鎖存器嚴格限制事件的順序(3)
secondLatch.countDown;
} catch (InterruptedException e) { }
tl2.printTimeline;
}
};
t1.start;
t2.start;
這段代碼裡有很多輔助的測試代碼。但也有很多值得注意的地方:
CountDownLatch
用來嚴格控制兩個線程之間發生的事情。如果用普通的
List
代替CopyOnWriteArrayList
,結果會導致出現ConcurrentModificationException
異常。這也是在兩個線程之間共享一個
Lock
對像以控制對共享資源(即STDOUT
)訪問的例子。如果用塊結構方式寫這段代碼,會顯得更加雜亂。
這段代碼的輸出如下:
TL2: Update [author=Author [name=Ben], updateText=I like pie, createTime=0],
Update [author=Author [name=Charles], updateText=I like ham on rye,
createTime=0], Update [author=Author [name=Jeffrey], updateText=I like a
lot of things, createTime=0], Update [author=Author [name=Gavin],
updateText=I like otters, createTime=0],
TL1: Update [author=Author [name=Ben], updateText=I like pie, createTime=0],
Update [author=Author [name=Charles], updateText=I like ham on rye,
createTime=0], Update [author=Author [name=Jeffrey], updateText=I like a
lot of things, createTime=0],
第二行輸出(標籤為TL1
)漏掉了最後一個更新,就是提到了水獺的那個,儘管按鎖存器的意思在列表被修改後tl1
2是可以訪問的。這說明了tl1
中所包含的迭代器被tl2
複製,並且最後一個更新對tl1
是不可見的。這就是我們想要展示的寫時複製特性。
2 原文為mbex1
,下文同。——譯者注
CopyOnWriteArrayList
的性能使用
CopyOnWriteArrayList
類要比使用ConcurrentHashMap
多花點心思,它是HashMap
的即用型並發替代品。這是因為性能問題——寫時複製特性意味著如果列表在被讀取或遍歷時做了修改,那就必須複製整個數組。也就是說如果對列表的修改次數跟讀取次數相差不多,這種方式未必能達到較好的性能。但就像我們在第6章一再提到的那樣,得到性能優異的代碼的唯一可靠的方法就是測試,再測試,並衡量結果。
下一個在並發代碼中常用的構件是java.util.concurrent
中的Queue
。它用於在線程之間切換工作元素,並且還是很多靈活可靠的多線程設計的基礎。
4.3.6 Queue
隊列是一個非常美妙的抽像概念。不,之所以這麼說並不是因為我們生活在倫敦這個世界排隊之都。為把處理資源分發給工作單位(或者把工作單元分配給處理資源,這取決於你看待問題的方式),隊列提供了一種簡單又可靠的方式。
Java中有些多線程編程模式在很大程度上都依賴於Queue
實現的線程安全性,所以很有必要充分認識它。Queue
接口被放在了java.util
包中,因為即便在單線程編程中它也是一個重要的模式,但我們的重點是多線程編程,並且假定你已經熟悉隊列的基本用法了。
隊列經常用來在線程之間傳遞工作單元,這個模式通常適合用Queue
最簡單的並發擴展BlockingQueue
來實現。接下來我們就會重點介紹它。
1.BlockingQueue
BlockingQueue
還有兩個特性。
- 在向隊列中
put
時,如果隊列已滿,它會讓放入線程等待隊列騰出空間。 - 在從隊列中
take
時,如果隊列為空,會導致取出線程阻塞。
這兩個特性非常有用,因為如果一個線程(或線程池)的能力超過了其他線程,比較快的線程就會被強制等待,因此可以對整個系統起到調節作用,如圖4-9所示。
圖4-9 BlockingQueue
BlockingQueue的兩個實現
Java提供了
BlockingQueue
接口的兩個基本實現:LinkedBlockingQueue
和ArrayBlockingQueue
。它們的特性稍有不同;比如說,在已知隊列的大小而能確定合適的邊界時,用ArrayBlockingQueue
非常高效,而LinkedBlockingQueue
在某些情況下則會快一點兒。
2.使用工作單元
Queue
接口全都是泛型的——它們是Queue<E>
,BlockingQueue<E>
,等等依此類推。儘管看起來奇怪,但有時候利用這一點把工作項封裝在一個人工容器類內卻是明智之舉。
比如說,你有一個表示工作單元的MyAwesomeClass
類,想要用多線程方式處理,與其用
BlockingQueue<MyAwesomeClass>
不如用
BlockingQueue<WorkUnit<MyAwesomeClass>>
。其中WorkUnit
(或QueueObject
,或隨你怎麼命名這個容器類)是像下面這樣的包裝接口或類:
public class WorkUnit<T> {
private final T workUnit;
public T getWork{ return workUnit; }
public WorkUnit(T workUnit_) {
workUnit = workUnit_;
}
}
有了這層間接引用,不用犧牲所包含類型(在此即MyAwesomeClass
)在概念上的完整性就可以在這裡添加額外的元數據了。
這特別有用。能用上額外元數據的用例很多,下面舉幾個例子:
- 測試(比如展示一個對象的修改歷史)
- 性能指標(比如到達時間或服務質量)
- 運行時系統信息(比如
MyAwesomeClass
實例是如何被排到隊列中的)
以後再在這種間接引用裡增加元數據可能會非常困難。如果你發現在某些情況下需要更多的元數據,那麼要把它們加入到間接引用中可能需要大量的重構工作,而加在WorkUnit
類中就只是個簡單的修改。
3.一個BlockingQueue
的例子
我們用一個簡單的例子——等著看醫生的寵物們——來看看如何使用BlockingQueue
。這個例子中有一個等著讓醫生給做檢查的寵物集合。
代碼清單4-11 在Java中對寵物建模
public abstract class Pet {
protected final String name;
public Pet(String name) {
this.name = name;
}
public abstract void examine;
}
public class Cat extends Pet {
public Cat(String name) {
super(name);
}
public void examine{
System.out.println("Meow!");
}
}
public class Dog extends Pet
public Dog(String name) {
super(name);
}
public void examine{
System.out.println("Woof!");
}
}
public class Appointment<T> {
private final T toBeSeen;
public T getPatient{ return toBeSeen; }
public Appointment(T incoming) {
toBeSeen = incoming;
}
}
在這個簡單的例子中,我們用LinkedBlockingQueue<Appointment<Pet>>
表示獸醫的候診隊列,Appointment
起到了WorkUnit
的作用。
獸醫對象是由一個隊列和一個暫停時間構建的,其中隊列是由一個代表接待員的對象提供的預約隊列,暫停時間表示獸醫在預約之間的停工時間。
我們可以在下面這段代碼中建立獸醫的模型。在線程運行時,它在一個無限循環中重複調用seePatient
。當然,現實世界中的獸醫不可能這樣,因為他們晚上和週末要回家,不能一直在辦公室等著生病的小動物上門就醫。
代碼清單4-12 對獸醫建模
public class Veterinarian extends Thread {
protected final BlockingQueue<Appointment<Pet>> appts;
protected String text = "";
protected final int restTime;
private boolean shutdown = false;
public Veterinarian(BlockingQueue<Appointment<Pet>> lbq, int pause) {
appts = lbq;
restTime = pause;
}
public synchronized void shutdown{
shutdown = true;
}
@Override
public void run{
while (!shutdown) {
seePatient;
try {
Thread.sleep(restTime);
} catch (InterruptedException e) {
shutdown = true;
}
}
}
public void seePatient {
try {
//阻塞take
Appointment<Pet> ap = appts.take;
Pet patient = ap.getPatient;
patient.examine;
} catch (InterruptedException e) {
shutdown = true;
}
}
}
在seePatient
方法中,線程會從隊列中取出預約,並挨個檢查對應的寵物,如果當前隊列中沒有預約等待,則會阻塞。
4.BlockingQueue
的細粒度控制
除了簡單的take
和offer
API,BlockingQueue
還提供了另外一種與隊列交互的方式,這種方式對隊列的控制力度更大,但稍微有點複雜。這就是帶有超時的放入或取出操作,它允許線程在遇到問題時可以從與隊列的交互中退出來,轉而做點兒其他的事情。
實際上,這個功能並不常用,但它偶爾能派上大用場,所以我們要介紹一下。下面的例子還是來自微博。
代碼清單4-13 BlockingQueue行為的例子
public abstract class MicroBlogExampleThread extends Thread {
protected final BlockingQueue<Update> updates;
protected String text = "";
protected final int pauseTime;
private boolean shutdown = false;
public MicroBlogExampleThread(BlockingQueue<Update> lbq_, int pause_) {
updates = lbq_;
pauseTime = pause_;
}
//使線程可以徹底地結束(1)
public synchronized void shutdown{
shutdown = true;
}
@Override
public void run{
while (!shutdown){//使線程可以徹底地結束(2)
doAction;
try {
Thread.sleep(pauseTime);
} catch (InterruptedException e) {
//使線程可以徹底地結束(3)
shutdown = true;
}
}
}
//由子類實現具體動作
public abstract void doAction;
}
final Update.Builder ub = new Update.Builder;
final BlockingQueue<Update> lbq = new LinkedBlockingQueue<>(100);
MicroBlogExampleThread t1 = new MicroBlogExampleThread(lbq,10) {
public void doAction{
text = text + "X";
Update u = ub.author(new Author("Tallulah")).updateText(text).build;
boolean handed = false;
try {
handed = updates.offer(u,100,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (!handed) System.out.println("Unable to hand off Update to Queue due to timeout");
}
};
MicroBlogExampleThread t2 = new MicroBlogExampleThread(lbq, 1000) {
public void doAction{
Update u = null;
try {
u = updates.take;
} catch (InterruptedException e) {
return;
}
}
};
t1.start;
t2.start;
運行這段代碼展示了填充隊列的速度有多麼快,也表明供給線程的速度超過了提取線程的速度。很快,「Unable to hand off Update to Queue due to timeout」消息就出現了。
這是「相連線程池」中的一種典型的極端狀況,當上游的線程池比下游的快,這種情況就會發生。「相連線程池」可能會引發一些問題,比如會導致LinkedBlockingQueue
溢出。另外,如果消費者比生產者多,隊列會因此而經常空著。好在Java 7在BlockingQueue
上有了解決辦法——TransferQueue
。
5.TransferQueue——Java 7中的新貴
Java 7引入了TransferQueue
。它本質上是多了一項transfer
操作的BlockingQueue
。如果接收線程處於等待狀態,該操作會馬上把工作項傳給它。否則就會阻塞直到取走工作項的線程出現。你可以把這看做「掛號信」選項,即正在處理工作項的線程在交付當前工作項之前不會開始其他工作項的處理工作。這樣系統就可以調控上游線程池獲取新工作項的速度。
用限定大小的阻塞隊列也能達到這種調控效果,但TransferQueue
接口更靈活。此外,用TransferQueue
取代BlockingQueue
的代碼性能表現可能會更好。這是因為編寫TransferQueue
的實現時已經將現代編譯器和處理器的特性考慮在內,執行起來效率更高。聊了這麼久性能,不能空口無憑,必須給出測量結果並能證明才行。另外你也應該意識到,Java 7只給出了TransferQueue
的一種實現形式——鏈表版。
在下面的例子中,你會發現用TransferQueue
代替BlockingQueue
是多麼簡單。只要對清單4-13中的代碼做些簡單修改,就可以升級成TransferQueue
,請看這裡。
代碼清單4-14 用TransferQueue代替BlockingQueue
public abstract class MicroBlogExampleThread extends Thread {
protected final TransferQueue<Update> updates;
...
public MicroBlogExampleThread(TransferQueue<Update> lbq_, int pause_) {
updates = lbq_;
pauseTime = pause_;
}
...
}
final TransferQueue<Update> lbq = new LinkedTransferQueue<Update>(100);
MicroBlogExampleThread t1 = new MicroBlogExampleThread(lbq, 10) {
public void doAction{
...
try {
handed = updates.tryTransfer(u, 100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
...
}
};
到此為止,用來開發多線程應用的主要構件我們都見識過了。接下來該把它們整合到驅動並發代碼的引擎(執行器框架)上了。用它們可以對任務進行調度和控制,可以組合高效的並發流處理工作項,從而構建大型多線程應用程序。