计时器在应用开发中经常碰到,如果只是有限个位数的计数器大多不会过分关注。但是如果遇到了几十个或是成百上千的计时器的场景,不对其进行管理,不但在性能上会有所损失,而且还存在协程泄露的风险。毕竟,在 Go 语言中单个计时器的实现是需要一个协程辅助实现的。所以,对于一堆计时器的应用场景,重新实现即时提醒功能是非常必要的。
既然是一堆计时器,必然存在一个先后顺序,所以,通过对计时信息的排序同时辅助一个可调计时时间的计时器就可以更加高效的完成原本是一堆计时器实现的场景。具体的实现原理图如下:
对每个计时器的计时时间进行一个简单的堆排序,每次取堆头作为下次计时的时间点。这样就可以通过一个堆排序和一个计时器完成之前一堆计时器的功能了。
接口
在接口定义上,使用了 x-mod 模块通用的定义方式:
package main
import (
"time"
"context"
"github.com/x-mod/heaptimer"//具体实现项目在这里
)
func main() {
timer := heaptimer.New()
//启动服务
go timer.Serve(context.TODO())
defer timer.Close()
//增加计时
timer.Push(value1, time.Now())
timer.Push(value2, time.Now().Add(time.Second))
timer.PushByDuration(value3, time.Second)
//触发计时
for v, ok := range timer.Pop() {
...
}
//OR 触发计时
for v, ok := range timer.C {
...
}
//抽干计时
for {
if v := timer.Drain(); v != nil {
log.Println("drain:", v)
} else {
break
}
}
}
实现
这样一个简单功能包在实现上没有太多难度,具体实现感兴趣的同学可以直接参考项目源码:x-mox/heaptimer.
这里主要记录一下实现过程中碰到的两个小功能点,加强一下记忆.
向一个关闭的 chan 通道中写数据会引发程序 panic
在实现代码中有这样一段代码:
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-tm.close.Done():
return nil
case _, ok := <-tm.timer.C:
if !ok { // timer closed
return nil
}
d := tm.duration
head := tm.heap.Head()
for head != nil && head.tm.Before(time.Now()) {
node := heap.Pop(tm.heap).(*Node)
//panic when tm.C closed
tm.C <- node.value
head = tm.heap.Head()
}
if head != nil {
d = head.tm.Sub(time.Now())
}
tm.timer.Reset(d)
}
}
这样一个循环语句中,程序可能会阻塞在 tm.C <- node.value
这句话上,等待 tm.C
完成消费。如果此时,关闭服务,触发Close(tm.C)
时,就会可能出现向一个关闭的 chan 通道中写数据的情况从而导致程序崩溃。
所以也就出现了下面需要关心的问题:
如何在 recover 代码中返回函数值
因为以上的程序崩溃的情形是无法在代码中规避,所以就必须在遇到类似情形时,进行恢复处理。所以,就会出现下面这样的恢复返回值的处理。
func (tm *Timer) Serve(ctx context.Context) (err error) {
defer func() {
tm.stopped.Fire()
if rc := recover(); rc != nil {
switch rv := rc.(type) {
case error:
err = rv.(error)
return
default:
err = fmt.Errorf("%v", rv)
return
}
}
}()
...
}
在 defer
函数中返回值,关键点就是使用命名返回值,即在 defer
函数中赋值命名返回值即可。