使用Golang編寫Cache

學習如何使用golang從0編寫cache系統


前言

為什麼需要緩存?

  • 請求網頁時,網頁會將第一次請求的內容存入緩存中(通常為key-value),以後再次訪問時,就可以直接從緩存中提取,從而降低載入時間

本地緩存 vs 分布式緩存

  • 本地緩存
    • 這邊的本地緩存通常指的為應用中的緩存組件,最大的優點就是應用和cache在同一個進程裡面,不會發生過多的網路開銷.缺點就是沒辦法共享緩存,個應用或集群都要維護自己的緩存,從而造成大量的開銷
  • 分布式緩存
    • 指的是應用和組件分離的緩存服務,優點就是緩存為一個獨立的應用,和本地分離,因此可以直接共享緩存

LRU緩存淘汰政策

  • 核心思想就是維護一個隊列,如果該條紀錄已經被訪問過了,就會到隊尾,那麼隊首變為最少訪問的紀錄,可以直接淘汰
  • 使用go語言中的container/list當作map中的值,該list為一個雙向隊列
  • entry是雙向隊列的數據,這麼做的目的是當要淘汰隊列的首節點時,使用key從字典中映射即可
type Cache struct {
	maxBytes  int64
	nbytes    int64
	ll        *list.List
	cache     map[string]*list.Element
	onEvicted func(key string, value Value)
}
 
type entry struct {
	key   string
	value Value
}
 
type Value interface {
	Len() int
}
 
func (c *Cache) Len() int {
	return c.ll.Len()
}
 
func New(maxBytes int64, onEvicted func(string, Value)) *Cache {
	return &Cache{
		maxBytes:  maxBytes,
		ll:        list.New(),
		cache:     make(map[string]*list.Element),
		onEvicted: onEvicted,
	}
} 

查找功能

  • 如果鍵對應的值存在,將該節點移動到隊尾,並返回值
  • c.ll.MoveToFront(ele) 即為將值移動到隊尾(在這裡的front代表隊尾)
func (c *Cache) Get(key string) (value Value, ok bool) {
	if ele, ok := c.cache[key]; ok {
		c.ll.MoveToFront(ele)
		kv := ele.Value.(*entry)
		return kv.value, true
	}
	return
} 

刪除功能

  • 實際上為淘汰最少訪問的首位節點,從linklist中刪除
  • 使用delete從字典中刪除該key
  • nbytes更新為當前正確的數值
  • 如果onEvicted不為nil的話,調用該函數
func (c *Cache) RemoveOldest() {
	ele := c.ll.Back()
	if ele != nil {
		c.ll.Remove(ele)
		kv := ele.Value.(*entry)
		delete(c.cache, kv.key)
		c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len())
		if c.onEvicted != nil {
			c.onEvicted(kv.key, kv.value)
		}
	}
}

新增

  • 如果key存在的話,更新對應的值,並將該節點移動到隊尾
  • 不存在的話首先在隊尾新增該節點,並且新增和map的映射關係
  • 更新nbytes後如果超過最大值的話,調用Remove函數
func (c *Cache) Add(key string, value Value) {
	if ele, ok := c.cache[key]; ok {
		c.ll.MoveToFront(ele)
		kv := ele.Value.(*entry)
		c.nbytes -= int64(len(kv.key)) + int64(kv.value.Len())
		kv.value = value
	} else {
		ele := c.ll.PushFront(&entry{key, value})
		c.cache[key] = ele
		c.nbytes += int64(len(key)) + int64(value.Len())
	}
	for c.maxBytes != 0 && c.maxBytes < c.nbytes {
		c.RemoveOldest()
	}
}

分布式緩存算法

一至性哈希

  • 將key映射到2^32的空間中,將這個數字首尾相連,行程一個環
    • 計算節點/機器的哈希值,放置在環上
    • 計算key的哈希值,放置在環上,順時針尋找到的第一個節點,就是應選取的節點/機器

傾斜問題

  • 如果服務器節點過少,很容易都對應到同個節點,造成環存節點間負載不均衡
  • 使用虛擬節點的概念解決
    • 計算虛擬節點上的Hash值,放置在環上
    • 計算key的Hash值,在環上順時針尋找對應選取得虛擬節點例如peer2-1對應的就是真實節點peer2

實現一至性哈希

初始化

  • 定義函數類型的Hash,採取依賴注入的方式,允許用來替換成自定義的Hash函數,默認為ChecksumIEEE算法
  • Map是一至性哈希算法的主數據結構,包含
    1. Hash函數 hash
    2. 虛擬節點倍數 replicas
    3. 虛擬節點和真實節點的映射表 hashMap,key是虛擬節點的哈希值,值是真實節點的名稱
  • 構造函數New允許自定義虛擬節點倍數和Hash函數
type Hash func(data []byte) uint32
 
// Map constains all hashed keys
type Map struct {
	hash     Hash
	replicas int
	keys     []int // Sorted
	hashMap  map[int]string
}
 
// New creates a Map instance
func New(replicas int, fn Hash) *Map {
	m := &Map{
		replicas: replicas,
		hash:     fn,
		hashMap:  make(map[int]string),
	}
	if m.hash == nil {
		m.hash = crc32.ChecksumIEEE
	}
	return m
} 

新增真實節點/機器

  • Add函數允許傳入0或多個真實節點的名稱
  • 對每一個真實節點key,對應創建m.replicas個虛擬節點
  • 使用m.hash()計算虛擬節點的哈希值,再使用append添加到環上
  • 在hashMap中增加虛擬節點和真實節點的映射關西
  • 依照環上哈希排序
func (m *Map) Add(keys ...string) {
	for _, key := range keys {
		for i := 0; i < m.replicas; i++ {
			hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
			m.keys = append(m.keys, hash)
			m.hashMap[hash] = key
		}
	}
	sort.Ints(m.keys)
}

選擇節點

  • 計算key的哈希值
  • 順時針找到第一個匹配虛擬節點的下標idx,從m.keys中或去對應的哈希值,如果len相同,說明選擇第一個,因為keys是一個環的結構
  • 透過hashMap映射得到真實節點
func (m *Map) Get(key string) string {
	if len(m.keys) == 0 {
		return ""
	}
 
	hash := int(m.hash([]byte(key)))
	// Binary search for appropriate replica.
	idx := sort.Search(len(m.keys), func(i int) bool {
		return m.keys[i] >= hash
	})
 
	return m.hashMap[m.keys[idx%len(m.keys)]]
} 

緩存問題和解決方案

  • 緩存雪崩
    • 緩存在同一時刻全部失效,遭成瞬間DB請求量大,壓力驟增引起雪崩.通常因為緩存服務器當機,緩存的key設置相同過期時間導致
  • 緩存擊穿
    • 一個存在的key在緩存過期的那一刻,同時有大量的請求,這些請求都會擊穿DB,造成順時DB請求量大,壓力驟增
  • 緩存穿透
    • 查詢一個不存在的數據,因為不存在則不會寫入緩存,所以美以都會請求DB,造成瞬間流量過大,穿透DB造成當機

實現解決方法

  • 創建call和group類型
  • call代表正在進行或已經結束的請求
  • Group管理不同key的請求
type call struct {
	wg  sync.WaitGroup
	val interface{}
	err error
}
 
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call
}
  • Do方法的作用為針對相同key,無論Do被調用多少次,函數fn都只會被調用一次,等待fn調用結束,return返回值或error
  • 使用延遲初始化g.m以提高內存使用效率
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()
 
	c.val, c.err = fn()
	c.wg.Done()
 
	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()
 
	return c.val, c.err
}

重點地方

  • g.mu是保護Group的成員變量m不被併發讀寫加上鎖
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	if c, ok := g.m[key]; ok {
		c.wg.Wait()   //如果請求正在進行中則等待
		return c.val, c.err //請求結束返回結果 
	}
	c := new(call)
	c.wg.Add(1)       //發起請求前保護鎖
	g.m[key] = c      //添加g.m,表明key已經有對應的請求在處理
 
	c.val, c.err = fn() //調用fn發起請求
	c.wg.Done()         //請求結束
 
    delete(g.m, key)    //更新 g.m
    
	return c.val, c.err //返回结果
}

使用

  • 修改geecache中的Group,他加成員變量loader並更新構建函數NewGroup
  • 修改load函數,將原來的load邏輯使用g.loader.Do包裹,確保併發場景下針對同個key,load只會調用一次
type Group struct {
	name      string
	getter    Getter
	mainCache cache
	peers     PeerPicker
	// use singleflight.Group to make sure that
	// each key is only fetched once
	loader *singleflight.Group
}
 
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
    // ...
	g := &Group{
        // ...
		loader:    &singleflight.Group{},
	}
	return g
}
 
func (g *Group) load(key string) (value ByteView, err error) {
	// each key is only fetched once (either locally or remotely)
	// regardless of the number of concurrent callers.
	viewi, err := g.loader.Do(key, func() (interface{}, error) {
		if g.peers != nil {
			if peer, ok := g.peers.PickPeer(key); ok {
				if value, err = g.getFromPeer(peer, key); err == nil {
					return value, nil
				}
				log.Println("[GeeCache] Failed to get from peer", err)
			}
		}
 
		return g.getLocally(key)
	})
 
	if err == nil {
		return viewi.(ByteView), nil
	}
	return
}

使用Protocol通信

syntax = "proto3";
 
package geecachepb;
 
option go_package = "./";
 
message Request {
  string group = 1;
  string key = 2;
}
 
message Response {
  bytes value = 1;
}
 
service GroupCache {
  rpc Get(Request) returns (Response);
}
 
//protoc --go_out=. *.proto

使用

func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // ...
	// Write the value to the response body as a proto message.
	body, err := proto.Marshal(&pb.Response{Value: view.ByteSlice()})
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/octet-stream")
	w.Write(body)
}
 
func (h *httpGetter) Get(in *pb.Request, out *pb.Response) error {
	u := fmt.Sprintf(
		"%v%v/%v",
		h.baseURL,
		url.QueryEscape(in.GetGroup()),
		url.QueryEscape(in.GetKey()),
	)
    res, err := http.Get(u)
	// ...
	if err = proto.Unmarshal(bytes, out); err != nil {
		return fmt.Errorf("decoding response body: %v", err)
	}
 
	return nil
}