在使用缓存时,容易发生缓存击穿。
缓存击穿:一个存在的key,在缓存过期的瞬间,同时有大量的请求过来,造成所有请求都去读dB,这些请求都会击穿到DB,造成瞬时DB请求量大、压力骤增。
singleflight
介绍
import "golang.org/x/sync/singleflight"
singleflight类的使用方法就新建一个singleflight.Group,使用其方法Do或者DoChan来包装方法,被包装的方法在对于同一个key,只会有一个协程执行,其他协程等待那个协程执行结束后,拿到同样的结果。
Group结构体
代表一类工作,同一个group中,同样的key同时只能被执行一次。
Do方法
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
key:同一个key,同时只有一个协程执行。
fn:被包装的函数。
v:返回值,即执行的结果。其他等待的协程都会拿到。
shared:表示是否有其他协程得到了这个结果v。
DoChan方法
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
与Do方法一样,只是返回的是一个channel,执行结果会发送到channel中,其他等待的协程都可以从channel中拿到结果。
ref:https://godoc.org/golang.org/x/sync/singleflight
示例
使用Do方法来模拟,解决缓存击穿的问题
func main() { var singleSetCache singleflight.Group getAndSetCache:=func (requestID int,cacheKey string) (string, error) { log.Printf("request %v start to get and set cache...",requestID) value,_, _ :=singleSetCache.Do(cacheKey, func() (ret interface{}, err error) {//do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB log.Printf("request %v is setting cache...",requestID) time.Sleep(3*time._Second_) log.Printf("request %v set cache success!",requestID) return "VALUE",nil }) return value.(string),nil } cacheKey:="cacheKey" for i:=1;i<10;i++{//模拟多个协程同时请求 go func(requestID int) { value,_:=getAndSetCache(requestID,cacheKey) log.Printf("request %v get value: %v",requestID,value) }(i) } time.Sleep(20*time._Second_) }
输出:
2020/04/12 18:18:40 request 4 start to get and set cache...
2020/04/12 18:18:40 request 4 is setting cache...
2020/04/12 18:18:40 request 2 start to get and set cache...
2020/04/12 18:18:40 request 7 start to get and set cache...
2020/04/12 18:18:40 request 5 start to get and set cache...
2020/04/12 18:18:40 request 1 start to get and set cache...
2020/04/12 18:18:40 request 6 start to get and set cache...
2020/04/12 18:18:40 request 3 start to get and set cache...
2020/04/12 18:18:40 request 8 start to get and set cache...
2020/04/12 18:18:40 request 9 start to get and set cache...
2020/04/12 18:18:43 request 4 set cache success!
2020/04/12 18:18:43 request 4 get value: VALUE
2020/04/12 18:18:43 request 9 get value: VALUE
2020/04/12 18:18:43 request 6 get value: VALUE
2020/04/12 18:18:43 request 3 get value: VALUE
2020/04/12 18:18:43 request 8 get value: VALUE
2020/04/12 18:18:43 request 1 get value: VALUE
2020/04/12 18:18:43 request 5 get value: VALUE
2020/04/12 18:18:43 request 2 get value: VALUE
2020/04/12 18:18:43 request 7 get value: VALUE`
可以看到确实只有一个协程执行了被包装的函数,并且其他协程都拿到了结果。
源码分析
看一下这个Do方法是怎么实现的。
首先看一下Group的结构:
type Group struct { mu sync.Mutex m map[string]*call //保存key对应的函数执行过程和结果的变量。 }
Group的结构非常简单,一个锁来保证并发安全,另一个map用来保存key对应的函数执行过程和结果的变量。
看下call的结构:
type call struct { wg sync.WaitGroup //用WaitGroup实现只有一个协程执行函数 val interface{} //函数执行结果 err error forgotten bool dups int //含义是duplications,即同时执行同一个key的协程数量 chans []chan<- Result }
看下Do方法
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock()//写Group的m字段时,加锁保证写安全。 if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok {//如果key已经存在,说明已经有协程在执行,则dups++,并等待其执行完毕后,返回其执行结果,执行结果保存在对应的call的val字段里 c.dups++ g.mu.Unlock() c.wg.Wait() return c.val, c.err, true } //如果key不存在,则新建一个call,并使用WaitGroup来阻塞其他协程,同时在m字段里写入key和对应的call c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() g.doCall(c, key, fn)//第一个进来的协程来执行这个函数 return c.val, c.err, c.dups > 0 }
继续看下g.doCall里具体干了什么
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { c.val, c.err = fn()//执行被包装的函数 c.wg.Done()//执行完毕后,就可以通知其他协程可以拿结果了 g.mu.Lock() if !c.forgotten {//其实这里是为了保证执行完毕之后,对应的key被删除,Group有一个方法Forget(key string),可以用来主动删除key,这里是判断那个方法是否被调用过,被调用过则字段forgotten会置为true,如果没有被调用过,则在这里把key删除。 delete(g.m, key) } for _, ch := range c.chans {//将执行结果发送到channel里,这里是给DoChan方法使用的 ch <- Result{c.val, c.err, c.dups > 0} } g.mu.Unlock() }
由此看来,其实现是非常简单的。不得不赞叹一百来行代码就实现了功能。
其他
顺便附上DoChan方法的使用示例:
func main() { var singleSetCache singleflight.Group getAndSetCache:=func (requestID int,cacheKey string) (string, error) { log.Printf("request %v start to get and set cache...",requestID) retChan:=singleSetCache.DoChan(cacheKey, func() (ret interface{}, err error) { log.Printf("request %v is setting cache...",requestID) time.Sleep(3*time._Second_) log.Printf("request %v set cache success!",requestID) return "VALUE",nil }) var ret singleflight.Result timeout := time.After(5 * time._Second_) select {//加入了超时机制 case <-timeout: log.Printf("time out!") return "",errors.New("time out") case ret =<- retChan://从chan中取出结果 return ret.Val.(string),ret.Err } return "",nil } cacheKey:="cacheKey" for i:=1;i<10;i++{ go func(requestID int) { value,_:=getAndSetCache(requestID,cacheKey) log.Printf("request %v get value: %v",requestID,value) }(i) } time.Sleep(20*time._Second_) }
看下DoChan的源码
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch)//可以看到,每个等待的协程,都有一个结果channel。从之前的g.doCall里也可以看到,每个channel都给塞了结果。为什么不所有协程共用一个channel?因为那样就得在channel里塞至少与协程数量一样的结果数量,但是你却无法保证用户一个协程只读取一次。 g.mu.Unlock() return ch } c := &call{chans: []chan<- Result{ch}} c.wg.Add(1) g.m[key] = c g.mu.Unlock() go g.doCall(c, key, fn) return ch }
稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!
昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。
这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。
而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?
更新动态
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]