推送服务开发过程的问题总结
Posted June 10, 2022 by Yusank ‐ 5 min read
在开发开发 push service 的过程中,有一大部分时间花在 websocket 的使用和连接管理上,期间遇到了一些问题和挑战,这里做一个总结
前言
push server 的开发是在做 goim 的前期的重头戏,其核心就是长链接(本次用 websocket)的管理和与业务相互结合.
websocket 算是一个比较久远成熟的长链接方案况且有成熟的框架(本项目使用的 gorilla/websocket)支持,按理来说,它的使用应该是比较简单的,但是它的使用过程中,有一些问题和挑战,这里做一个总结.
而遇到的问题大部分是在使用和与业务的整合上,大致分为一下几点:
- 事件管理(如 ping pong,心跳,消息等)
- 连接管理
- 消息推送
事件管理
在建立起连接之后,需要对一些 ws 消息类型做不同的处理,如 ping/pong 时,连接管理层面刷新一些连接最后活跃时间已确保连接可用,而业务层面需要对该连接对应的用户 的 last_online_time 信息进行刷新动作,又比如 close 事件也是连接管理层面和业务层面各自需要做处理.
虽说框架提供了注册事件处理器的机制,但是不足以满足需求.连接层是纯粹管理长链接不依赖业务逻辑(且连接层与业务是分开的项目代码),因此没办法在连接层直接引入逻辑层的逻辑去注册相关处理函数。而业务层虽说可以引入连接层的逻辑,但是不同业务处理的方式不一样很难统一管理。
以下为原始注册方法,只能注册单个方法,如果需要有多个处理函数则需要手动嵌套:
1// 连接层注册连接层的事件处理器
2conn.SetCloseHandler(func(code int, text string) error {
3 wc.cancelWithError(nil)
4 message := websocket.FormatCloseMessage(code, "")
5 _ = wc.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
6 return nil
7})
8
9conn.SetPingHandler(func(message string) error {
10 err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
11 if err == nil || err == websocket.ErrCloseSent {
12 return nil
13 }
14
15 return err
16})
为了解决这个问题,很明显框架提供的原始的结构体的能力是不够,需要进行封装扩展。
1type WebsocketConn struct {
2 conn *websocket.Conn // 原始连接
3
4 // ctx control
5 ctx context.Context
6 cancel context.CancelFunc
7
8 uid string
9}
10
11
12func WrapWs(ctx context.Context, c *websocket.Conn, uid string) *WebsocketConn {
13 if ctx == nil {
14 ctx = context.Background()
15 }
16 ctx2, cancel := context.WithCancel(ctx)
17 wc := &WebsocketConn{
18 conn: c,
19 ctx: ctx2,
20 cancel: cancel,
21 uid: uid,
22 }
23
24 wc.conn.SetCloseHandler(func(code int, text string) error {
25 wc.cancelWithError(nil)
26 message := websocket.FormatCloseMessage(code, "")
27 _ = wc.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
28 return nil
29 })
30
31 wc.conn.SetPingHandler(func(message string) error {
32 err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
33 if err == nil || err == websocket.ErrCloseSent {
34 return nil
35 }
36
37 return err
38 })
39
40 return wc
41}
42
43// 通过该方法注册业务的处理方法
44func (w *WebsocketConn) AddCloseAction(f func() error) {
45 cf := w.conn.CloseHandler()
46 w.conn.SetCloseHandler(func(code int, text string) error {
47 err := cf(code, text)
48 if err == nil {
49 return f()
50 }
51
52 return err
53 })
54}
55
56// 通过该方法注册业务的处理方法
57func (w *WebsocketConn) AddPingAction(f func() error) {
58 pf := w.conn.PingHandler()
59 w.conn.SetPingHandler(func(appData string) error {
60 err := pf(appData)
61 if err == nil {
62 return f()
63 }
64
65 return err
66 })
67}
在连接层将原始 conn 进行一层封装,逻辑层能取到封装后的连接,可根据业务需求注册任意数量的事件处理器。而连接层在封装连接时就将连接层需要的处理方法注册进去了。
现在看一下业务层的使用方式:
1// 建立连接并封装后注册业务处理函数
2wc.AddPingAction(func() error {
3 return app.GetApplication().Redis.SetEX(context.Background(),
4 consts.GetUserOnlineAgentKey(uid), app.GetAgentIP(), consts.UserOnlineAgentKeyExpire).Err()
5})
6// 建立连接并封装后注册业务处理函数
7wc.AddCloseAction(func() error {
8 ctx2, cancel := context.WithTimeout(ctx, time.Second)
9 defer cancel()
10 return app.GetApplication().Redis.Del(ctx2, consts.GetUserOnlineAgentKey(uid)).Err()
11
12})
这样就把业务和连接管理拆分开互不影响,连接层不关心上层业务怎么处理,只关心连接的正常可用即可。
连接管理
连接管理也是花心思比较多的模块,由于 websocket 的连接不同于普通的连接,因此没办法用普通的连接池来管理(其实我已经有一套可用的连接池的方案了yusank/conn-pool,可惜了)。
websocket 的每个连接都是唯一的且需要根据一些唯一的值(UID,sessionID 等)来拿到对应的连接,这些连接也不是连接池初始化出来的,是外部传进去让连接池去维护这些连接。
同时 websocket 连接随时可能断开,这种断开可能主动的也可能是连续几个心跳周期没有收到客户端的心跳包从而服务端主动断开的,不管是那种情况,在连接层需要有个 goroutine 去维护这个连接的可用性。在连接需要断开时,停止维护并从连接池删除该连接。
为了确保 WebsocketConn
的纯洁性,在维护连接模块里定义一个简单的连接
的概念,把 WebsocketConn
封装起来。
1type idleConn struct {
2 *WebsocketConn
3 p *namedPool // 连接池
4 stopChan chan struct{}
5}
6
7// close is different form stop
8// close is closes the connection and delete it from pool
9// stop is a trigger to stop the daemon then call the close
10func (i *idleConn) close() {
11 _ = i.Close() // i.WebsocketConn.Close()
12 i.p.delete(i.Key()) // i.WebsocketConn.Key() => uid
13}
14
15func (i *idleConn) stop() {
16 i.stopChan <- struct{}{}
17}
18
19func (i *idleConn) daemon() {
20 ticker := time.NewTicker(config.Interval * time.Second)
21loop:
22 for {
23 select {
24 case <- ticker.C:
25 // check last ping time and decide whether stop daemon.
26 case <-i.ctx.Done():
27 // WebsocketConn ctx cancelled
28 log.Error("conn done", "key", i.Key())
29 break loop
30 // stop() called
31 case <-i.stopChan:
32 log.Info("conn stop", "key", i.Key())
33 break loop
34 case data := <-i.writeChan:
35 i.writeToClient(data) // write msg to client
36 }
37 }
38
39 log.Info("conn daemon exit", "key", i.Key())
40 i.close()
41}
idleConn
能力相对纯粹,维护连接可用性和特殊情况下从连接池剔除当前连接。
下面看一下连接池的定义和实现:
1type namedPool struct {
2 *sync.RWMutex
3 m map[string]*idleConn
4}
5
6func newNamedPool() *namedPool {
7 p := &namedPool{
8 RWMutex: new(sync.RWMutex),
9 m: make(map[string]*idleConn),
10 }
11
12 return p
13}
14
15// 添加连接
16func (p *namedPool) add(c *WebsocketConn) {
17 select {
18 case <-c.ctx.Done():
19 return
20 default:
21 if c.Err() != nil {
22 return
23 }
24 }
25
26 p.Lock()
27 defer p.Unlock()
28 // 若已存在 则先停止上一个 daemon
29 i, loaded := p.m[c.Key()]
30 if loaded {
31 i.stop()
32 }
33
34 i = &idleConn{
35 WebsocketConn: c,
36 stopChan: make(chan struct{}),
37 p: p,
38 }
39
40 // TODO: 使用可控大小的 goroutine 池,防止goroutine 泄露
41 go i.daemon()
42 p.m[c.Key()] = i
43}
44
45func (p *namedPool) get(key string) *WebsocketConn {
46 p.RLock()
47 i, ok := p.m[key]
48 p.RUnlock()
49
50 if ok {
51 // 拿连接时 先判断是否已失效
52 select {
53 case <-i.ctx.Done():
54 i.stop()
55 default:
56 if i.Err() != nil {
57 i.stop()
58 return nil
59 }
60 return i.WebsocketConn
61 }
62 }
63
64 return nil
65}
66
67// 需要读取所有连接做全量推送的需求
68func (p *namedPool) loadAllConns() chan *WebsocketConn {
69 p.Lock()
70 defer p.Unlock()
71
72 ch := make(chan *WebsocketConn, len(p.m))
73 for _, i := range p.m {
74 ch <- i.WebsocketConn
75 }
76
77 close(ch)
78 return ch
79}
80
81func (p *namedPool) delete(key string) {
82 p.Lock()
83 defer p.Unlock()
84 delete(p.m, key)
85}
经过 idleConn
和 namedPool
配合,连接管理算是达到预期的要求了且通过了测试,后期可以做一些优化项。
消息推送
在解决了非业务层的各种问题后,现在只剩下消息推送的问题了。消息推送最开始其实直接调用的框架提供的原生方法 WriteMessage
,即需要推送时,从连接池拿到对应的连接然后直接调用写消息的方法,但是有大量消息需要写入时可能会出现消息顺序异常的问题,因此在 WebsocketConn
层屏蔽了原生的方法,并添加一个消息 channel
来控制并发带来的消息顺序异常问题。
新增消息 channel
:
1 conn *websocket.Conn
2
3 ctx context.Context
4 cancel context.CancelFunc
5
6 uid string
7 // 消息 channel
8 writeChan chan []byte
9 // 无法写到客户端时回调该方法 上层一般会进行重试或写到 mq 待会儿进行处理
10 onWriteError func()
11 err error
12}
新的写消息方法:
1func (w *WebsocketConn) Write(data []byte) error {
2 select {
3 case w.writeChan <- data:
4 return nil
5 default:
6 }
7
8 // 尝试等待一小段时间,如果这段时间内写入失败则直接报错,这种情况下大概率是服务端与客户端的连接异常,消息阻塞了
9 timer := time.NewTimer(time.Millisecond * 10)
10 select {
11 case w.writeChan <- data:
12 return nil
13 case <-timer.C:
14 return ErrWriteChanFull
15 }
16}
写入到 writeChan
后,由上述 idleConn.daemon
方法读取 channel
内消息后,调用以下方法来正式写入的ws 上:
1func (w *WebsocketConn) writeToClient(data []byte) {
2 _ = w.conn.SetWriteDeadline(time.Now().Add(time.Second))
3 err := w.conn.WriteMessage(websocket.TextMessage, data)
4 if err != nil {
5 w.onWriteError()
6 return
7 }
8}
至此,消息的推送也解决了已遇到的问题。
总结
本文记录了在开发 goim 的推送服务(长连接服务)时遇到的一些问题和解决方案,方便理解在看相关代码时其背后的思考。
内容分为三个部分:
- websocket 连接中的一些事件处理和如何与业务关联起来
- websocket 连接池的维护
- 在聊天系统中确保消息推送的顺序