This repository has been archived on 2026-03-28. You can view files and clone it, but cannot push or open issues or pull requests.
old-sysmonitord/internal/network/client.go
2026-03-25 08:22:57 +08:00

240 lines
5.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package network
import (
"encoding/json"
"log"
"net/url"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/wuko233/sysmonitord/internal/agent"
)
// ClientConfig holds the settings used to build a WSClient.
type ClientConfig struct {
// ServerURL is the websocket endpoint; buildURL parses it and adds the
// agent_id query parameter before the client dials it.
ServerURL string
// AgentID is sent as the agent_id query parameter. NewWSClient overwrites
// it with the ID returned by agent.GetIdentity.
AgentID string
// SendInterval is not read anywhere in this file.
// NOTE(review): presumably consumed by callers — confirm.
SendInterval time.Duration
// BufferSize is the capacity of the outgoing packet queue; NewWSClient
// substitutes 100 when it is 0.
BufferSize int
}
// WSClient is a websocket client that queues outgoing packets, reconnects
// with exponential backoff and passes decoded server messages to a
// MessageHandler.
type WSClient struct {
// config is the configuration supplied to NewWSClient.
config ClientConfig
// conn is the current connection; closeConn resets it to nil.
conn *websocket.Conn
// sendChan buffers packets queued by SendQueue until sendLoop writes them.
sendChan chan Packet
// mu guards conn and isConnected in sendRaw, closeConn and connect.
mu sync.Mutex
// isConnected is set by connect and cleared by closeConn or by a failed
// write in sendRaw.
isConnected bool
// stopChan is closed by Stop to make the background loops return.
stopChan chan struct{}
// handler receives decoded server messages; it is installed by SetHandler.
handler MessageHandler
reconnectDelay time.Duration // current exponential-backoff delay
// agentIdentity is the identity obtained from agent.GetIdentity in
// NewWSClient.
agentIdentity *agent.Identity
}
// NewWSClient builds a client from cfg without connecting; call Start to
// launch the background loops.
//
// A non-positive cfg.BufferSize is replaced by 100, because a negative
// capacity would make the channel allocation panic. cfg.AgentID is always
// overwritten with the ID reported by agent.GetIdentity. If the identity
// cannot be obtained the process is terminated via log.Fatalf.
func NewWSClient(cfg ClientConfig) *WSClient {
	if cfg.BufferSize <= 0 {
		cfg.BufferSize = 100
	}

	agentIdentity, err := agent.GetIdentity()
	if err != nil {
		log.Fatalf("[网络] 获取代理身份信息失败: %v", err)
	}
	cfg.AgentID = agentIdentity.AgentID

	return &WSClient{
		config:         cfg,
		sendChan:       make(chan Packet, cfg.BufferSize),
		stopChan:       make(chan struct{}),
		reconnectDelay: 1 * time.Second, // initial backoff delay
		agentIdentity:  agentIdentity,
	}
}
// GetAgentIdentity returns the agent identity stored on the client by
// NewWSClient.
func (c *WSClient) GetAgentIdentity() *agent.Identity {
	identity := c.agentIdentity
	return identity
}
// SetHandler installs the handler that handleServerMessage dispatches
// decoded server messages to. The assignment is not synchronized.
func (c *WSClient) SetHandler(handler MessageHandler) {
	c.handler = handler
}
// Start launches the two background goroutines of the client: the
// connection/read loop and the send loop. It returns immediately.
func (c *WSClient) Start() {
	for _, loop := range []func(){c.connectionLoop, c.sendLoop} {
		go loop()
	}
}
// SendQueue enqueues packet for the send loop without blocking. When the
// queue is full the packet is dropped and a log line is written.
func (c *WSClient) SendQueue(packet Packet) {
	select {
	case c.sendChan <- packet:
		return
	default:
	}
	log.Printf("[网络] 发送队列已满,丢弃消息: %s", packet.Type)
}
// Stop signals the background loops to exit and closes the current
// connection, if any. It is safe to call more than once; later calls are
// no-ops.
func (c *WSClient) Stop() {
	// Hold mu so that the stop signal and the connection close are atomic
	// with respect to connect, which checks stopChan under the same lock
	// before installing a new connection.
	c.mu.Lock()
	defer c.mu.Unlock()

	select {
	case <-c.stopChan:
		// Already stopped; closing the channel again would panic.
		return
	default:
		close(c.stopChan)
	}

	if c.conn != nil {
		// Closing unblocks a reader waiting in ReadMessage; the read loop
		// then clears the connection state itself.
		c.conn.Close()
	}
}
// sendLoop forwards queued packets to sendRaw one at a time and returns once
// stopChan is closed.
func (c *WSClient) sendLoop() {
	for {
		select {
		case pkt := <-c.sendChan:
			c.sendRaw(pkt)
		case <-c.stopChan:
			return
		}
	}
}
// sendRaw JSON-encodes packet and writes it to the current connection as a
// text message with a 5-second write deadline.
//
// The packet is dropped (and logged) when it cannot be encoded or when there
// is no live connection. On a write failure the connection is marked
// disconnected and closed, so the read loop notices the failure and
// reconnects instead of staying blocked on a half-dead socket.
func (c *WSClient) sendRaw(packet Packet) {
	// Encode before taking the lock so encoding never delays other users of
	// the connection, and never write an empty frame on a marshal error.
	payload, err := json.Marshal(packet)
	if err != nil {
		log.Printf("[网络] 序列化消息失败: %v", err)
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if !c.isConnected || c.conn == nil {
		log.Printf("[网络] 无连接,无法发送消息: %s", packet.Type)
		return
	}

	err = c.conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
	if err == nil {
		err = c.conn.WriteMessage(websocket.TextMessage, payload)
	}
	if err != nil {
		log.Printf("[网络] 发送消息失败: %v", err)
		c.isConnected = false
		// Close but do not nil the connection: the read loop may be inside
		// ReadMessage on it and performs the final cleanup via closeConn.
		c.conn.Close()
	}
}
// connectionLoop keeps the client connected and reads server messages until
// stopChan is closed.
//
// While disconnected it dials with exponential backoff (1s doubling up to
// 60s); the backoff wait is interrupted by Stop. While connected it blocks in
// ReadMessage and passes every message to handleServerMessage. A read error
// tears the connection down and restarts the dial cycle with the initial
// delay.
func (c *WSClient) connectionLoop() {
	const (
		initialDelay  = 1 * time.Second
		maxDelay      = 60 * time.Second
		backoffFactor = 2.0
	)

	for {
		select {
		case <-c.stopChan:
			return
		default:
		}

		// Snapshot the shared state under the lock; sendRaw and closeConn
		// modify it from other goroutines.
		c.mu.Lock()
		conn, connected := c.conn, c.isConnected
		c.mu.Unlock()

		if !connected || conn == nil {
			// Release any connection a failed write left behind so it is
			// not leaked when connect installs a new one.
			c.closeConn()

			if err := c.connect(); err != nil {
				log.Printf("[网络] 连接 %s 失败: %v. %v后重试...",
					c.config.ServerURL, err, c.reconnectDelay)

				// Wait out the backoff, but return immediately on Stop.
				timer := time.NewTimer(c.reconnectDelay)
				select {
				case <-c.stopChan:
					timer.Stop()
					return
				case <-timer.C:
				}

				// Grow the delay exponentially, capped at maxDelay.
				c.reconnectDelay = time.Duration(float64(c.reconnectDelay) * backoffFactor)
				if c.reconnectDelay > maxDelay {
					c.reconnectDelay = maxDelay
				}
				continue
			}

			// Connected: reset the backoff and go back to the top, which
			// re-checks the stop signal and picks up the new connection.
			// connect may also return nil without a connection when the
			// client was stopped mid-dial.
			c.reconnectDelay = initialDelay
			continue
		}

		_, message, err := conn.ReadMessage()
		if err != nil {
			log.Printf("[网络] 连接断开: %v", err)
			c.closeConn()
			// Start the next reconnect cycle from the initial delay.
			c.reconnectDelay = initialDelay
			continue
		}

		c.handleServerMessage(message)
	}
}
// handleServerMessage decodes one raw server message and dispatches it by
// packet type to the installed handler.
//
// The message is logged and dropped, without closing the connection, when it
// is not valid JSON, when no handler is set, or when its payload is not a
// JSON object.
func (c *WSClient) handleServerMessage(message []byte) {
	var pkt Packet
	if err := json.Unmarshal(message, &pkt); err != nil {
		log.Printf("[网络] JSON解析失败: %v", err)
		return
	}

	if c.handler == nil {
		log.Printf("[网络] 收到服务器消息但未设置处理器: %s", pkt.Type)
		return
	}

	body, isObject := pkt.Payload.(map[string]interface{})
	if !isObject {
		log.Printf("[网络] 无法解析消息载荷: %v", pkt.Payload)
		return
	}

	switch msgType := pkt.Type; msgType {
	case TypeConfigUpdate:
		c.handler.HandleConfigUpdate(body)
	case TypeTaskScan:
		c.handler.HandleTaskScan(body)
	case TypeTaskStop:
		c.handler.HandleTaskStop(body)
	case TypeCommandResponse:
		// Command responses are only logged.
		log.Printf("[网络] 收到命令响应: code=%d", pkt.Code)
	default:
		log.Printf("[网络] 未知消息类型: %s", msgType)
	}
}
// closeConn marks the client as disconnected and, under mu, closes and
// clears the current connection if there is one.
func (c *WSClient) closeConn() {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.isConnected = false
	if c.conn == nil {
		return
	}
	c.conn.Close()
	c.conn = nil
}
// clientStoppedError is returned by connect when Stop was called while the
// dial was in progress.
type clientStoppedError struct{}

// Error implements the error interface.
func (clientStoppedError) Error() string { return "client stopped" }

// connect dials the server URL produced by buildURL and, on success, installs
// the new connection and marks the client connected.
//
// It returns the dial error if the handshake fails. If the client was stopped
// while dialing, the fresh connection is closed and a clientStoppedError is
// returned, so the caller never treats a missing connection as a success.
func (c *WSClient) connect() error {
	conn, _, err := websocket.DefaultDialer.Dial(c.buildURL(), nil)
	if err != nil {
		return err
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	select {
	case <-c.stopChan:
		conn.Close()
		return clientStoppedError{}
	default:
	}

	// Do not leak a previous connection that was only marked disconnected.
	if c.conn != nil {
		c.conn.Close()
	}
	c.conn = conn
	c.isConnected = true
	log.Printf("[网络] 成功连接到服务器: %s", c.config.ServerURL)
	return nil
}
// buildURL returns the configured server URL with the agent_id query
// parameter set to the configured agent ID. If the URL cannot be parsed the
// failure is logged and the configured URL is returned unchanged.
func (c *WSClient) buildURL() string {
	raw := c.config.ServerURL

	parsed, err := url.Parse(raw)
	if err != nil {
		log.Printf("[网络] 解析服务器URL失败: %v", err)
		return raw
	}

	params := parsed.Query()
	params.Set("agent_id", c.config.AgentID)
	parsed.RawQuery = params.Encode()
	return parsed.String()
}