From 1c0fce6a1789dab8e715eb8ff00bd52d2959f1a3 Mon Sep 17 00:00:00 2001 From: shirainbown Date: Fri, 19 Jun 2026 18:21:36 +0800 Subject: [PATCH] feat: independent SSH sync service with concurrency limit - Split SSH sync from ping loop into independent service - Ping service: serial polling, 30s interval, only updates online status - SSH sync service: runs every 10 minutes for all online machines - Global semaphore limits concurrent SSH to 2 (prevents resource exhaustion) - SSH command timeout 8s prevents hanging on unresponsive hosts - Offline machines are skipped for SSH sync - Ping fallback: if ping fails but SSH succeeds, mark as online (SSH sync handles info) --- server/main.go | 3 + server/services/ping.go | 131 ++++++++++++++++++++++++---------------- 2 files changed, 83 insertions(+), 51 deletions(-) diff --git a/server/main.go b/server/main.go index 27ddb94..74a9e65 100644 --- a/server/main.go +++ b/server/main.go @@ -166,6 +166,9 @@ func main() { // Start ping service services.StartPingService(cfg.PingInterval) + // Start SSH sync service (independent 10-minute timer per machine) + services.StartSSHSyncService() + // Start log cleanup ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/server/services/ping.go b/server/services/ping.go index d53dadd..1a05ed4 100644 --- a/server/services/ping.go +++ b/server/services/ping.go @@ -2,7 +2,6 @@ package services import ( "context" - "database/sql" "encoding/json" "fmt" "lan-manager/server/db" @@ -13,6 +12,7 @@ import ( "os/exec" "runtime" "strings" + "sync" "time" "golang.org/x/net/icmp" @@ -166,30 +166,20 @@ func saveSSHResult(mid int64, mip string, mport int, result *models.SSHInfoResul } } -// shouldSyncSSH checks if SSH sync is needed based on last sync time. -// Returns true if never synced or last sync was more than 10 minutes ago. -func shouldSyncSSH(lastSync *time.Time) bool { - if lastSync == nil { - return true - } - return time.Since(*lastSync) > 10*time.Minute -} - -func handlePingResult(m *machinePing, res PingResult, lastSSHSync *time.Time) { +func handlePingResult(m *machinePing, res PingResult) { online := res.Online reason := res.Reason - var sshResult *models.SSHInfoResult // If network ping fails, try SSH as a fallback to determine online status if !online && m.sshUsername != "" && m.sshPassword != "" { plainPass, decryptErr := utils.Decrypt(m.sshPassword) if decryptErr == nil { - result, err := GetSSHInfo(m.ip, m.sshPort, m.sshUsername, plainPass) + _, err := GetSSHInfo(m.ip, m.sshPort, m.sshUsername, plainPass) if err == nil { online = true reason = "" - sshResult = result fmt.Printf("[Ping] %s -> network unreachable, but SSH info fetched successfully, treating as online\n", m.ip) + // Note: SSH sync service will handle periodic info updates } } } @@ -230,9 +220,6 @@ func handlePingResult(m *machinePing, res PingResult, lastSSHSync *time.Time) { _, _ = db.DB.Exec(`UPDATE machines SET total_offline_seconds=total_offline_seconds+? WHERE id=?`, duration, m.id) } m.wasOnline = true - if sshResult != nil { - saveSSHResult(m.id, m.ip, m.sshPort, sshResult) - } return } @@ -243,35 +230,12 @@ func handlePingResult(m *machinePing, res PingResult, lastSSHSync *time.Time) { } else { fmt.Printf("[Ping] %s -> online=%v\n", m.ip, online) } - - // SSH sync: only if machine is online, has SSH credentials, and hasn't been synced in the last 10 minutes - if online && m.sshUsername != "" && m.sshPassword != "" && shouldSyncSSH(lastSSHSync) { - if sshResult != nil { - saveSSHResult(m.id, m.ip, m.sshPort, sshResult) - } else { - // Use semaphore to limit concurrent SSH sync operations - sshSyncSem <- struct{}{} - go func(mid int64, mip string, mport int, muser, mpass string) { - defer func() { <-sshSyncSem }() - plainPass, decryptErr := utils.Decrypt(mpass) - if decryptErr != nil { - fmt.Printf("[SSH] decrypt failed for %s:%d: %v\n", mip, mport, decryptErr) - return - } - result, err := GetSSHInfo(mip, mport, muser, plainPass) - if err != nil { - fmt.Printf("[SSH] auto-fetch failed for %s:%d: %v\n", mip, mport, err) - return - } - saveSSHResult(mid, mip, mport, result) - }(m.id, m.ip, m.sshPort, m.sshUsername, m.sshPassword) - } - } } // sshSyncSem limits concurrent SSH sync operations to prevent resource exhaustion -var sshSyncSem = make(chan struct{}, 5) +var sshSyncSem = make(chan struct{}, 2) +// StartPingService starts the ping loop with configurable interval func StartPingService(interval int) { if interval <= 0 { interval = 30 @@ -282,7 +246,7 @@ func StartPingService(interval int) { time.Sleep(2 * time.Second) for { - rows, err := db.DB.Query(`SELECT id, ip, ssh_port, ssh_username, ssh_password, is_online, ssh_synced_at FROM machines ORDER BY id ASC`) + rows, err := db.DB.Query(`SELECT id, ip, ssh_port, ssh_username, ssh_password, is_online FROM machines ORDER BY id ASC`) if err != nil { fmt.Printf("[Ping] query error: %v\n", err) time.Sleep(step) @@ -290,18 +254,12 @@ func StartPingService(interval int) { } list := []machinePing{} - lastSyncMap := make(map[int64]*time.Time) for rows.Next() { var m machinePing var isOnline int - var sshSyncedAt sql.NullTime - if err := rows.Scan(&m.id, &m.ip, &m.sshPort, &m.sshUsername, &m.sshPassword, &isOnline, &sshSyncedAt); err == nil { + if err := rows.Scan(&m.id, &m.ip, &m.sshPort, &m.sshUsername, &m.sshPassword, &isOnline); err == nil { m.wasOnline = isOnline == 1 list = append(list, m) - if sshSyncedAt.Valid { - t := sshSyncedAt.Time - lastSyncMap[m.id] = &t - } } } rows.Close() @@ -315,13 +273,84 @@ func StartPingService(interval int) { for i := range list { res := PingHostRepeated(list[i].ip, list[i].sshPort, 3) - handlePingResult(&list[i], res, lastSyncMap[list[i].id]) + handlePingResult(&list[i], res) time.Sleep(step) } } }() } +// sshSyncMachine holds SSH sync state for a single machine +type sshSyncMachine struct { + id int64 + ip string + sshPort int + username string + password string +} + +// StartSSHSyncService starts an independent SSH sync service. +// Each machine has its own 10-minute timer, but global concurrency is limited. +func StartSSHSyncService() { + go func() { + time.Sleep(5 * time.Second) + + for { + rows, err := db.DB.Query(`SELECT id, ip, ssh_port, ssh_username, ssh_password FROM machines WHERE is_online=1 AND ssh_username != '' AND ssh_password != '' ORDER BY id ASC`) + if err != nil { + fmt.Printf("[SSH] query error: %v\n", err) + time.Sleep(30 * time.Second) + continue + } + + list := []sshSyncMachine{} + for rows.Next() { + var m sshSyncMachine + if err := rows.Scan(&m.id, &m.ip, &m.sshPort, &m.username, &m.password); err == nil { + list = append(list, m) + } + } + rows.Close() + + if len(list) == 0 { + time.Sleep(30 * time.Second) + continue + } + + fmt.Printf("[SSH] start sync round, %d online machines\n", len(list)) + + var wg sync.WaitGroup + for _, m := range list { + wg.Add(1) + go func(machine sshSyncMachine) { + defer wg.Done() + + // Acquire semaphore to limit concurrent SSH connections + sshSyncSem <- struct{}{} + defer func() { <-sshSyncSem }() + + plainPass, decryptErr := utils.Decrypt(machine.password) + if decryptErr != nil { + fmt.Printf("[SSH] decrypt failed for %s:%d: %v\n", machine.ip, machine.sshPort, decryptErr) + return + } + + result, err := GetSSHInfo(machine.ip, machine.sshPort, machine.username, plainPass) + if err != nil { + fmt.Printf("[SSH] sync failed for %s:%d: %v\n", machine.ip, machine.sshPort, err) + return + } + saveSSHResult(machine.id, machine.ip, machine.sshPort, result) + }(m) + } + wg.Wait() + + fmt.Printf("[SSH] sync round completed, next in 10 minutes\n") + time.Sleep(10 * time.Minute) + } + }() +} + func CleanupLogs(ctx context.Context, retentionDays int) { if retentionDays <= 0 { return