楠木軒

什麼時候線程不安全?怎樣做到線程安全?怎麼擴展線程安全的類?

由 費莫白竹 發佈於 科技

當多個線程去訪問某個類時,如果類會表現出我們預期出現的行為,那麼可以稱這個類是線程安全的。

什麼時候會出現線程不安全?

操作並非原子。多個線程執行某段代碼,如果這段代碼產生的結果受不同線程之間的執行時序影響,而產生非預期的結果,即發生了靜態條件,就會出現線程不安全;

常見場景:

1. count++。它本身包含三個操作,讀取、修改、寫入,多線程時,由於線程執行的時序不同,有可能導致兩個線程執行後 count 只加了 1,而原有的目標確實希望每次執行都加 1;

2. 單例。多個線程可能同時執行到instance == null成立,然後新建了兩個對象,而原有目標是希望這個對象永遠只有一個;

public MyObj getInstance(){
if (instance == null){
instance = new MyObj();
}
return instance
}

解決方式是:當前線程在操作這段代碼時,其它線程不能對進行操作

常見方案:

1. 單個狀態使用 java.util.concurrent.atomic 包中的一些原子變量類,注意如果是多個狀態就算每個操作是原子的,複合使用的時候並不是原子的;

2. 加鎖。比如使用 synchronized 包圍對應代碼塊,保證多線程之間是互斥的,注意應儘可能的只包含在需要作為原子處理的代碼塊上;

synchronized 的可重複性

當線程要去獲取它自己已經持有的鎖是會成功的,這樣的鎖是可重入的,synchronized 是可重入的

class Paxi {
public synchronized void sayHello(){
System.out.println("hello");
}
}
class MyClass extends Paxi{
public synchronized void dosomething(){
System.out.println("do thing ..");
super.sayHello();
System.out.println("over");
}
}

它的輸出為

do thing ..
hello
over

修改不可見。讀線程無法感知到其它線程寫入的值

常見場景:

1. 重排序。在沒有同步的情況下,編譯器、處理器以及運行時等都有可能對操作的執行順序進行調整,即寫的代碼順序和真正的執行順序不一樣, 導致讀到的是一個失效的值

2. 讀取 long、double 等類型的變量。JVM 允許將一個 64 位的操作分解成兩個 32 位的操作,讀寫在不同的線程中時,可能讀到錯誤的高低位組合

常見方案:

加鎖。所有線程都能看到共享變量的最新值;
使用 Volatile 關鍵字聲明變量。只要對這個變量產生了寫操作,那麼所有的讀操作都會看到這個修改;
注意:Volatile 並不能保證操作的原子性,比如count++操作同樣有風險,它僅保證讀取時返回最新的值。使用的好處在於訪問 Volatile 變量並不會執行加鎖操作,也就不會阻塞線程。

不同步的情況下如何做到線程安全?

1. 線程封閉。即僅在單線程內訪問數據,線程封閉技術有以下幾種:

Ad-hoc 線程封閉。即靠自己寫程序來實現,比如保證程序只在單線程上對 volatile 進行 讀取-修改-寫入

棧封閉。所有的操作都發生執行線程的棧中,比如在方法中的一個局部變量

ThreadLocal 類。內部維護了每個線程和變量的一個獨立副本

1. 只讀共享。即使用不可變的對象。

使用 final 去修飾字段,這樣這個字段的 “值” 是不可改變的

注意 final 如果修飾的是一個對象引用,比如 set, 它本身包含的值是可變的

創建一個不可變的類,來包含多個可變的數據。

class OneValue{
//創建不可變對象,創建之後無法修改,事實上這裏也沒有提供修改的方法
private final BigInteger last;
private final BigInteger[] lastfactor;
public OneValue(BigInteger i,BigInteger[] lastfactor){
this.last=i;
this.lastfactor=Arrays.copy(lastfactor,lastfactor.length);
}
public BigInteger[] getF(BigInteger i){
if(last==null || !last.equals(i)){
return null;
}else{
return Arrays.copy(lastfactor,lastfactor.length)
}
}
}
class MyService {
//volatile使得cache一經更改,就能被所有線程感知到
private volatile OneValue cache=new OneValue(null,null);
public void handle(BigInteger i){
BigInteger[] lastfactor=cache.getF(i);
if(lastfactor==null){
lastfactor=factor(i);
//每次都封裝最新的值
cache=new OneValue(i,lastfactor)
}
nextHandle(lastfactor)
}
}

如何構造線程安全的類?

實例封閉。將一個對象封裝到另一個對象中,這樣能夠訪問被封裝對象的所有代碼路徑都是已知的,通過合適的加鎖策略可以確保被封裝對象的訪問是線程安全的。

java 中的 Collections.synchronizedList 使用的原理就是這樣。部分代碼為

public static

List

synchronizedList(List

list) {
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));

SynchronizedList 的實現, 注意此處用到的 mutex 是內置鎖

static class SynchronizedList
extends SynchronizedCollection
implements List
private static final long serialVersionUID = -7754090372962971524L;
final List

{

list;
public E get(int index) {
synchronized (mutex) {return list.get(index);}
}
public E set(int index, E element) {
synchronized (mutex) {return list.set(index, element);}
}
public void add(int index, E element) {
synchronized (mutex) {list.add(index, element);}
}
public E remove(int index) {
synchronized (mutex) {return list.remove(index);}
}
}

mutex 的實現

static class SynchronizedCollection
private static final long serialVersionUID = 3053995032091335093L;
final Collection

implements Collection

, >Serializable {

c; // Backing Collection
final Object mutex; // Object on which to synchronize
SynchronizedCollection(Collection
if (c==null)
throw new NullPointerException();
this.c = c;
mutex = this; // mutex實際上就是對象本身
}

c) {

什麼是監視器模式

java 的監視器模式,將對象所有可變狀態都封裝起來,並由對象自己的內置鎖來保護, 即是一種實例封閉。比如 HashTable 就是運用的監視器模式。它的 get 操作就是用的 synchronized,內置鎖,來實現的線程安全

public synchronized V get(Object key) {
Entry tab[] = table;
int hash = hash(key);
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry

e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
return e.value;
}
}
return null;
}

內置鎖

每個對象都有內置鎖。內置鎖也稱為監視器鎖。或者可以簡稱為監視器線程執行一個對象的用 synchronized 修飾的方法時,會自動的獲取這個對象的內置鎖,方法返回時自動釋放內置鎖,執行過程中就算拋出異常也會自動釋放。以下兩種寫法等效:

synchronized void myMethdo(){
//do something
}
void myMethdo(){
synchronized(this){
//do somthding
}
}

官方文檔

私有鎖

public class PrivateLock{
private Object mylock = new Object(); //私有鎖
void myMethod(){
synchronized(mylock){
//do something
}
}
}

它也可以用來保護對象,相對內置鎖,優勢在於私有鎖可以有多個,同時可以讓客户端代碼顯示的獲取私有鎖

類鎖

在 staic 方法上修飾的,一個類的所有對象共用一把鎖

1. 把線程安全性委託給線程安全的類

如果一個類中的各個組件都是線程安全的,該類是否要處理線程安全問題?

視情況而定。

1. 只有單個組件,且它是線程安全的。

public class DVT{
private final ConcurrentMap

locations;
private final Map

unmodifiableMap;
public DVT(Map{
locations=new ConcurrentHashMap

points)

(points);
unmodifiableMap=Collections.unmodifiableMap(locations);
}
public Map{
return unmodifiableMap;
}
public Point getLocation(String id){
return locations.get(id);
}
public void setLocation(String id,int x,int y){
if(locations.replace(id,new Point(x,y))==null){
throw new IllegalArgumentException("invalid "+id);
}
}
}
public class Point{
public final int x,y;
public Point(int x,int y){
this.x=x;
this.y=y;
}
}

getLocations()

線程安全性分析

Point 類本身是無法更改的,所以它是線程安全的,DVT 返回的 Point 方法也是線程安全的

DVT 的方法 getLocations 返回的對象是不可修改的,是線程安全的

setLocation 實際操作的是 ConcurrentHashMap 它也是線程安全的

綜上,DVT 的安全交給了‘locations’,它本身是線程安全的,DVT 本身雖沒有任何顯示的同步,也是線程安全。這種情況下,就是 DVT 的線程安全實際是委託給了‘locations’, 整個 DVT 表現出了線程安全。

1. 線程安全性委託給了多個狀態變量只要多個狀態變量之間彼此獨立,組合的類並不會在其包含的多個狀態變量上增加不變性。依賴的增加則無法保證線程安全

public class NumberRange{
private final AtomicInteger lower = new AtomicInteger(0);
private final AtomicInteger upper = new AtomicInteger(0);
public void setLower(int i){
//先檢查後執行,存在隱患
if (i>upper.get(i)){
throw new IllegalArgumentException('can not ..');
}
lower.set(i);
}
public void setUpper(int i){
//先檢查後執行,存在隱患
if(i

throw new IllegalArgumentException('can not ..');
}
upper.set(i);
}
}

setLower 和 setUpper 都是‘先檢查後執行’的操作,但是沒有足夠的加鎖機制保證操作的原子性。假設原始範圍是 (0,10), 一個線程調用 setLower(5), 一個設置 setUpper(4) 錯誤的執行時序將可能導致結果為(5,4)

如何對現有的線程安全類進行擴展?

假設需要擴展的功能為 ‘沒有就添加’。

1. 直接修改原有的代碼。但通常沒有辦法修改源代碼

2. 繼承。繼承原有的代碼,添加新的功能。但是同步策略保存在兩份文件中,如果底層同步策略變更,很容易出問題

3. 組合。將類放入一個輔助類中,通過輔助類的操作代碼。比如擴展 Collections.synchronizedList。期間需要注意鎖的機制,錯誤方式為

public class ListHelper
public List

{

list=Collections.synchronizedList(new ArrayList

());
...
public synchronized boolean putIfAbsent(E x){
boolean absent = !list.contains(x);
if(absent){
list.add(x);
}
return absent;
}
}

這裏的 putIfAbsent 並不能帶來線程安全,原因是 list 的內置鎖並不是 ListHelper, 也就是 putIfAbsent 相對 list 的其它方法並不是原子的。Collections.synchronizedList 是鎖在 list 本身的,正確方式為

public boolean putIfAbsent(E x){
synchronized(list){
boolean absent = !list.contains(x);
if(absent){
list.add(x);
}
return absent;
}
}

另外可以不管要操作的類是否是線程安全,對類統一添加一層額外的鎖。實現參考 Collections.synchronizedList 方法