WaitGroup

從源碼分析 wait group


介紹WaitGroup

使用WaitGroup的三個重要方法

  1. Add(delta int) 給WaitGroup的計數器增加一個值,當WaitGroup的計數值減少到零時(這邊的計數是Wait的不是Add),阻塞在Wait上的Goroutine就不會阻塞,但計數器為負數時就會panic
  2. Done() 表示一個goroutine完成任務,WaitGroup減少1 (這邊是Add不是Wait)
  3. Wait() 此方法調用者會被阻塞,直到WaitGroup的計數值減小到0

WaitGroup標準使用

避免意外觸發panic

package main
 
import (
	"fmt"
	"sync"
)
 
func main(){
	var wg sync.WaitGroup
	wg.Add(3) //第一步
	for i := 0;i<3;i++{
		go func ()  {	
			defer wg.Done() //第二步,在goroutine內加上這個調用	
			...
		}()
	}
	wg.Wait() //第三步,在主goroutine內調用Wait
}

分析WaitGroup結構

type WaitGroup struct {
	noCopy noCopy
 
	state atomic.Uint64 // 高32位為計數器的值,低32位為waiter的數量
	sema  uint32 // 信號量
  • noCopy為輔助字段,主要用於輔助檢查工具vet
  • 使用atomic.Uint64保證64位對齊,所以state能夠紀錄計數器的值和waiter的數量
  • sema 用來喚醒waiter的信號量

分析 noCopy字段

type noCopy struct{}
 
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}
 

主要是讓noCopy字段實現Locke接口,因為noCopy是未輸出類型,因此不會調用到Lock或UnLock方法

分析Add方法

func (wg *WaitGroup) Add(delta int) {
	state := wg.state.Add(uint64(delta) << 32)
	v := int32(state >> 32)
	w := uint32(state)
	if v < 0 {
		panic("sync: negative WaitGroup counter") //計數器不該 < 0
	}
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	if v > 0 || w == 0 {
		return
	}
	if wg.state.Load() != state { //檢查是否還有waiter在等待,如果有不應該再併發調用Add
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	wg.state.Store(0)
	for ; w != 0; w-- {
		runtime_Semrelease(&wg.sema, false, 0)
	}
}
  • state := wg.state.Add(uint64(delta) << 32) 計數器加delta值
  • v := int32(state >> 32) 右移32位,只保留計數器的值
  • w := uint32(state) waiter的數量
  • if w != 0 && delta > 0 && v == int32(delta) 檢查是否還有waiter在等待,如果有不應該再併發調用Add
  • wg.state.Store(0) 將計數器的值變為0,準備喚醒waiter

分析Wait

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
	for {
		state := wg.state.Load()
		v := int32(state >> 32)
		w := uint32(state)
		if v == 0 {
			// Counter is 0, no need to wait.
			return
		}
		// Increment waiters count.
		if wg.state.CompareAndSwap(state, state+1) {
			runtime_Semacquire(&wg.sema)
			if wg.state.Load() != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			return
		}
	}
}
 
  • 會先嘗試檢查計數器的值v
    • v為0:不會發生阻塞,直接返回
    • v不為0:原子操作state,把goroutine加入到waiter中
      • 成功:被阻塞喚醒
      • 失敗:進行循環檢查(因為可能同時有多個waiter調用Wait方法)
  • 如果阻塞的Waiter被喚醒,理論上state的值為0(從Add方法實現可以看到,先清0再喚醒waiter),那就直接返回就好,因為state為0代表計數器的值也為0