package network import ( "encoding/json" "log" "sync" "time" "github.com/gorilla/websocket" ) type ClientConfig struct { ServerURL string SendInterval time.Duration BufferSize int } type WSClient struct { config ClientConfig conn *websocket.Conn sendChan chan Packet mu sync.Mutex isConnected bool stopChan chan struct{} handler MessageHandler reconnectDelay time.Duration // 指数退避延迟 } func NewWSClient(cfg ClientConfig) *WSClient { if cfg.BufferSize == 0 { cfg.BufferSize = 100 } return &WSClient{ config: cfg, sendChan: make(chan Packet, cfg.BufferSize), stopChan: make(chan struct{}), reconnectDelay: 1 * time.Second, // 初始延迟 1秒 } } // SetHandler 设置消息处理器 func (c *WSClient) SetHandler(handler MessageHandler) { c.handler = handler } func (c *WSClient) Start() { go c.connectionLoop() go c.sendLoop() } func (c *WSClient) SendQueue(packet Packet) { select { case c.sendChan <- packet: default: log.Printf("[网络] 发送队列已满,丢弃消息: %s", packet.Type) } } func (c *WSClient) Stop() { close(c.stopChan) if c.conn != nil { c.conn.Close() } } func (c *WSClient) sendLoop() { for { select { case <-c.stopChan: return case packet := <-c.sendChan: c.sendRaw(packet) } } } func (c *WSClient) sendRaw(packet Packet) { c.mu.Lock() defer c.mu.Unlock() if !c.isConnected || c.conn == nil { log.Printf("[网络] 无连接,无法发送消息: %s", packet.Type) return } c.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) payload, _ := json.Marshal(packet) if err := c.conn.WriteMessage(websocket.TextMessage, payload); err != nil { log.Printf("[网络] 发送消息失败: %v", err) c.isConnected = false } } func (c *WSClient) connectionLoop() { const ( initialDelay = 1 * time.Second maxDelay = 60 * time.Second backoffFactor = 2.0 ) for { select { case <-c.stopChan: return default: if !c.isConnected { if err := c.connect(); err != nil { log.Printf("[网络] 连接 %s 失败: %v. %v后重试...", c.config.ServerURL, err, c.reconnectDelay) // 等待退避时间 time.Sleep(c.reconnectDelay) // 计算下一次退避时间(指数增长,最大60秒) c.reconnectDelay = time.Duration(float64(c.reconnectDelay) * backoffFactor) if c.reconnectDelay > maxDelay { c.reconnectDelay = maxDelay } continue } // 连接成功,重置退避时间 c.reconnectDelay = initialDelay } _, message, err := c.conn.ReadMessage() if err != nil { log.Printf("[网络] 连接断开: %v", err) c.closeConn() // 重置退避时间,准备重新连接 c.reconnectDelay = initialDelay continue } // 处理服务器消息 c.handleServerMessage(message) } } } // handleServerMessage 处理服务器消息 func (c *WSClient) handleServerMessage(message []byte) { var packet Packet if err := json.Unmarshal(message, &packet); err != nil { log.Printf("[网络] JSON解析失败: %v", err) return // 丢弃畸形消息,不断开连接 } if c.handler == nil { log.Printf("[网络] 收到服务器消息但未设置处理器: %s", packet.Type) return } payload, ok := packet.Payload.(map[string]interface{}) if !ok { log.Printf("[网络] 无法解析消息载荷: %v", packet.Payload) return } switch packet.Type { case TypeConfigUpdate: c.handler.HandleConfigUpdate(payload) case TypeTaskScan: c.handler.HandleTaskScan(payload) case TypeTaskStop: c.handler.HandleTaskStop(payload) case TypeCommandResponse: // 命令响应,可用于确认 log.Printf("[网络] 收到命令响应: code=%d", packet.Code) default: log.Printf("[网络] 未知消息类型: %s", packet.Type) } } func (c *WSClient) closeConn() { c.mu.Lock() defer c.mu.Unlock() if c.conn != nil { c.conn.Close() c.conn = nil } c.isConnected = false } func (c *WSClient) connect() error { conn, _, err := websocket.DefaultDialer.Dial(c.config.ServerURL, nil) if err != nil { return err } c.mu.Lock() defer c.mu.Unlock() select { case <-c.stopChan: conn.Close() return nil default: } c.conn = conn c.isConnected = true log.Printf("[网络] 成功连接到服务器: %s", c.config.ServerURL) return nil }