Files
lan-manager/server/services/ping.go
shirainbown 1c0fce6a17 feat: independent SSH sync service with concurrency limit
- Split SSH sync from ping loop into independent service
- Ping service: serial polling, 30s interval, only updates online status
- SSH sync service: runs every 10 minutes for all online machines
- Global semaphore limits concurrent SSH to 2 (prevents resource exhaustion)
- SSH command timeout 8s prevents hanging on unresponsive hosts
- Offline machines are skipped for SSH sync
- Ping fallback: if ping fails but SSH succeeds, mark as online (SSH sync handles info)
2026-06-19 18:21:36 +08:00

371 lines
10 KiB
Go

package services
import (
"context"
"encoding/json"
"fmt"
"lan-manager/server/db"
"lan-manager/server/models"
"lan-manager/server/utils"
"net"
"os"
"os/exec"
"runtime"
"strings"
"sync"
"time"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
)
// PingResult is the outcome of a reachability check against one host.
type PingResult struct {
	// Online reports whether the host answered.
	Online bool
	// Reason is a human-readable explanation of why the host is considered
	// offline; it is the empty string whenever Online is true.
	Reason string
}
// PingHost checks whether ip is reachable by trying, in order, the system
// ping binary, a raw ICMP echo, and a TCP connect to port. The first probe
// that succeeds short-circuits the rest; if all three fail the result is
// offline with a fixed reason string.
func PingHost(ip string, port int) PingResult {
	probes := []func() bool{
		func() bool { return pingWithSystem(ip) },
		func() bool { return pingWithICMP(ip) },
		func() bool { return pingWithTCP(ip, port) },
	}
	for _, probe := range probes {
		if probe() {
			return PingResult{Online: true}
		}
	}
	return PingResult{Online: false, Reason: "unreachable (ping/icmp/tcp all failed)"}
}
// PingHostRepeated calls PingHost up to retries times (at least once when
// retries <= 0), pausing 2 seconds between attempts. The first online
// result is returned immediately; if every attempt fails, the returned
// Reason wraps the last attempt's reason together with the attempt count.
func PingHostRepeated(ip string, port int, retries int) PingResult {
	attempts := retries
	if attempts < 1 {
		attempts = 1
	}
	var last PingResult
	for n := 1; n <= attempts; n++ {
		if n > 1 {
			time.Sleep(2 * time.Second)
		}
		if last = PingHost(ip, port); last.Online {
			return last
		}
	}
	return PingResult{Online: false, Reason: fmt.Sprintf("unreachable after %d retries (%s)", attempts, last.Reason)}
}
// pingWithSystem runs the platform's ping binary for a single echo request
// and reports whether it exited successfully (exit status 0).
//
// Non-zero exits simply mean "no reply" and are not logged; other failures
// (binary missing, overall timeout) are logged.
func pingWithSystem(ip string) bool {
	// exec.Command does not go through a shell, but ping still parses any
	// argument beginning with '-' as an option (e.g. "-V" makes Linux ping
	// print its version and exit 0, which would read as "online").
	// A real host never starts with '-'.
	if ip == "" || strings.HasPrefix(ip, "-") {
		return false
	}

	// -W/-w only bound the wait for a reply, not the whole invocation
	// (e.g. a slow name lookup), so cap the process lifetime as well.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	var cmd *exec.Cmd
	switch runtime.GOOS {
	case "windows":
		// Windows: -n count, -w reply timeout in milliseconds.
		cmd = exec.CommandContext(ctx, "ping", "-n", "1", "-w", "5000", ip)
	case "darwin":
		// macOS: -W wait time in milliseconds.
		cmd = exec.CommandContext(ctx, "ping", "-c", "1", "-W", "5000", ip)
	default:
		// Linux iputils: -W timeout in seconds.
		cmd = exec.CommandContext(ctx, "ping", "-c", "1", "-W", "5", ip)
	}

	out, err := cmd.CombinedOutput()
	if err != nil {
		if ctx.Err() != nil {
			fmt.Printf("[Ping] system ping timed out for %s\n", ip)
		} else if _, ok := err.(*exec.ExitError); !ok {
			fmt.Printf("[Ping] system ping error for %s: %v, output: %s\n", ip, err, strings.TrimSpace(string(out)))
		}
		return false
	}
	return true
}
// pingWithICMP sends one ICMP echo request to ip over a raw IPv4 socket and
// waits up to 5 seconds for the matching echo reply.
//
// A raw ICMP socket receives every ICMP packet delivered to the host, so a
// packet only counts as success when it is an echo reply that comes from ip
// and carries this request's ID and sequence number. Anything else
// (replies to other pings, our own request on loopback, unreachable
// messages) is skipped until the deadline.
//
// Returns false when ip is not an IPv4 address, the raw socket cannot be
// opened (NOTE(review): typically requires root/CAP_NET_RAW — confirm per
// platform), or no matching reply arrives in time.
func pingWithICMP(ip string) bool {
	dst := net.ParseIP(ip).To4()
	if dst == nil {
		return false
	}

	c, err := icmp.ListenPacket("ip4:icmp", "0.0.0.0")
	if err != nil {
		return false
	}
	defer c.Close()

	id := os.Getpid() & 0xffff
	const seq = 1
	m := &icmp.Message{
		Type: ipv4.ICMPTypeEcho,
		Code: 0,
		Body: &icmp.Echo{ID: id, Seq: seq, Data: []byte("lan-manager")},
	}
	wb, err := m.Marshal(nil)
	if err != nil {
		return false
	}
	if _, err := c.WriteTo(wb, &net.IPAddr{IP: dst}); err != nil {
		return false
	}

	// Without a deadline ReadFrom below could block forever.
	if err := c.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
		return false
	}

	rb := make([]byte, 1500)
	for {
		n, peer, err := c.ReadFrom(rb)
		if err != nil {
			return false // deadline reached or socket error
		}

		var from net.IP
		switch a := peer.(type) {
		case *net.IPAddr:
			from = a.IP
		case *net.UDPAddr:
			from = a.IP
		}
		if from == nil || !from.Equal(dst) {
			continue // traffic from some other host
		}

		rm, err := icmp.ParseMessage(ipv4.ICMPTypeEchoReply.Protocol(), rb[:n])
		if err != nil || rm.Type != ipv4.ICMPTypeEchoReply {
			continue
		}
		if echo, ok := rm.Body.(*icmp.Echo); ok && echo.ID == id && echo.Seq == seq {
			return true
		}
	}
}
// pingWithTCP reports whether a TCP connection to ip:port can be opened
// within 3 seconds. The connection is closed immediately. A port <= 0
// defaults to 22 (SSH).
func pingWithTCP(ip string, port int) bool {
	if port <= 0 {
		port = 22
	}
	// JoinHostPort brackets IPv6 literals ("[::1]:22"); a plain "%s:%d"
	// would produce the undialable "::1:22".
	addr := net.JoinHostPort(ip, fmt.Sprint(port))
	conn, err := net.DialTimeout("tcp", addr, 3*time.Second)
	if err != nil {
		return false
	}
	_ = conn.Close() // probe only; close error is irrelevant
	return true
}
// parseIP returns the 4-byte form of an IPv4 address (dotted quad, or an
// IPv4-mapped IPv6 address), or nil if ip is not a valid IPv4 address.
//
// Malformed input (wrong number of parts, non-numeric or >255 octets,
// trailing junk) yields nil instead of silently turning bad octets into 0.
func parseIP(ip string) []byte {
	return net.ParseIP(ip).To4()
}
// machinePing is the per-round snapshot of one machine that the ping loop
// operates on.
type machinePing struct {
	id      int64  // machines.id
	ip      string // address handed to the probes
	sshPort int    // SSH port; also the TCP probe port
	// Stored SSH credentials; the password is still encrypted and must go
	// through utils.Decrypt before use. Both non-empty enable the SSH fallback.
	sshUsername, sshPassword string
	wasOnline                bool // is_online as last recorded; updated by handlePingResult
}
// saveSSHResult writes the system info collected over SSH for machine mid
// into the machines table and stamps ssh_synced_at/updated_at.
// mip and mport are used only in log messages.
//
// ListenPorts is stored as its JSON encoding with the first and last
// characters stripped when the encoding is longer than two characters
// (e.g. an encoding of [22,80] is stored as 22,80). An empty list stores "".
// A marshal error is ignored, which also results in "".
func saveSSHResult(mid int64, mip string, mport int, result *models.SSHInfoResult) {
	var ports string
	if len(result.ListenPorts) != 0 {
		encoded, _ := json.Marshal(result.ListenPorts)
		if s := string(encoded); len(s) > 2 {
			ports = s[1 : len(s)-1]
		} else {
			ports = s
		}
	}

	const query = `UPDATE machines SET cpu_info=?, memory_info=?, disk_info=?, uptime=?, listen_ports=?, ssh_synced_at=CURRENT_TIMESTAMP, updated_at=CURRENT_TIMESTAMP WHERE id=?`
	if _, err := db.DB.Exec(query, result.RawCPUInfo, result.RawMemoryInfo, result.RawDiskInfo, result.Uptime, ports, mid); err != nil {
		fmt.Printf("[SSH] auto-fetch db error for %s:%d: %v\n", mip, mport, err)
		return
	}
	fmt.Printf("[SSH] auto-fetch success for %s:%d\n", mip, mport)
}
// handlePingResult applies one ping outcome for m to the database and
// updates m.wasOnline to the new state.
//
// When every network probe failed but SSH credentials are stored, an SSH
// login via GetSSHInfo is tried as a last-resort liveness check; success
// counts as online. The fetched info is discarded here; the SSH sync service
// refreshes it periodically. The fallback holds a slot of sshSyncSem while
// connecting, so it counts against the same global SSH concurrency limit as
// the sync service.
//
// State handling:
//   - online -> offline: is_online=0, offline_count+1, last_offline_at and
//     last_offline_reason set, and a new open row in offline_logs.
//   - offline -> online: is_online=1, last_offline_reason cleared, the newest
//     open offline_logs row closed and its duration added to
//     total_offline_seconds.
//   - no change: is_online and last_ping_at refreshed only.
func handlePingResult(m *machinePing, res PingResult) {
	online := res.Online
	reason := res.Reason

	// If network ping fails, try SSH as a fallback to determine online status.
	if !online && m.sshUsername != "" && m.sshPassword != "" {
		plainPass, decryptErr := utils.Decrypt(m.sshPassword)
		if decryptErr != nil {
			fmt.Printf("[Ping] %s -> SSH fallback skipped, decrypt failed: %v\n", m.ip, decryptErr)
		} else {
			sshSyncSem <- struct{}{} // respect the global SSH concurrency limit
			_, err := GetSSHInfo(m.ip, m.sshPort, m.sshUsername, plainPass)
			<-sshSyncSem
			if err == nil {
				online = true
				reason = ""
				fmt.Printf("[Ping] %s -> network unreachable, but SSH info fetched successfully, treating as online\n", m.ip)
			}
		}
	}

	switch {
	case m.wasOnline && !online:
		// State transition: online -> offline.
		_, dbErr := db.DB.Exec(`UPDATE machines SET is_online=0, last_ping_at=CURRENT_TIMESTAMP, offline_count=offline_count+1, last_offline_at=CURRENT_TIMESTAMP, last_offline_reason=? WHERE id=?`,
			reason, m.id)
		if dbErr != nil {
			fmt.Printf("[Ping] update status error for %s: %v\n", m.ip, dbErr)
		} else {
			fmt.Printf("[Ping] %s -> online=false, reason=%s\n", m.ip, reason)
		}
		if _, err := db.DB.Exec(`INSERT INTO offline_logs (machine_id, reason, started_at) VALUES (?, ?, CURRENT_TIMESTAMP)`, m.id, reason); err != nil {
			fmt.Printf("[Ping] offline log insert error for %s: %v\n", m.ip, err)
		}
		m.wasOnline = false

	case !m.wasOnline && online:
		// State transition: offline -> online.
		_, dbErr := db.DB.Exec(`UPDATE machines SET is_online=1, last_ping_at=CURRENT_TIMESTAMP, last_offline_reason='' WHERE id=?`, m.id)
		if dbErr != nil {
			fmt.Printf("[Ping] update status error for %s: %v\n", m.ip, dbErr)
		} else {
			fmt.Printf("[Ping] %s -> online=true\n", m.ip)
		}

		var logID int64
		var startedAt time.Time
		row := db.DB.QueryRow(`SELECT id, started_at FROM offline_logs WHERE machine_id=? AND ended_at IS NULL ORDER BY started_at DESC LIMIT 1`, m.id)
		if err := row.Scan(&logID, &startedAt); err != nil {
			// An offline_logs row is only opened on an online -> offline
			// transition, so "no rows" is normal for e.g. a machine whose very
			// first ping succeeds. Any other error would otherwise leave the
			// outage log open without a trace.
			fmt.Printf("[Ping] %s -> no open offline log closed: %v\n", m.ip, err)
		} else {
			duration := int(time.Since(startedAt).Seconds())
			// Never subtract from the running total if the stored start time is
			// ahead of the local clock (clock skew or a timezone mismatch).
			if duration < 0 {
				duration = 0
			}
			if _, err := db.DB.Exec(`UPDATE offline_logs SET ended_at=CURRENT_TIMESTAMP, duration_seconds=? WHERE id=?`, duration, logID); err != nil {
				fmt.Printf("[Ping] offline log close error for %s: %v\n", m.ip, err)
			}
			if _, err := db.DB.Exec(`UPDATE machines SET total_offline_seconds=total_offline_seconds+? WHERE id=?`, duration, m.id); err != nil {
				fmt.Printf("[Ping] offline total update error for %s: %v\n", m.ip, err)
			}
		}
		m.wasOnline = true

	default:
		// No state change: just refresh the status and timestamp.
		onlineInt := 0
		if online {
			onlineInt = 1
		}
		_, dbErr := db.DB.Exec(`UPDATE machines SET is_online=?, last_ping_at=CURRENT_TIMESTAMP WHERE id=?`, onlineInt, m.id)
		if dbErr != nil {
			fmt.Printf("[Ping] update status error for %s: %v\n", m.ip, dbErr)
		} else {
			fmt.Printf("[Ping] %s -> online=%v\n", m.ip, online)
		}
	}
}
// sshSyncSem is a counting semaphore with two slots that caps how many SSH
// sessions run at the same time. Send on it to acquire a slot; receive from
// it to release the slot.
var (
	sshSyncSem = make(chan struct{}, 2)
)
// StartPingService launches the background ping loop and returns immediately.
//
// interval is in seconds; values <= 0 fall back to 30. Machines are pinged
// one at a time (up to 3 attempts each via PingHostRepeated), and the loop
// sleeps interval after EACH machine. A full round over N machines therefore
// takes roughly N*interval plus probe time. The machine list is re-read from
// the database at the start of every round. The loop never exits.
func StartPingService(interval int) {
	if interval <= 0 {
		interval = 30
	}
	step := time.Duration(interval) * time.Second
	go func() {
		// Short startup delay before the first round.
		time.Sleep(2 * time.Second)
		for {
			rows, err := db.DB.Query(`SELECT id, ip, ssh_port, ssh_username, ssh_password, is_online FROM machines ORDER BY id ASC`)
			if err != nil {
				fmt.Printf("[Ping] query error: %v\n", err)
				time.Sleep(step)
				continue
			}
			var list []machinePing
			for rows.Next() {
				var m machinePing
				var isOnline int
				if err := rows.Scan(&m.id, &m.ip, &m.sshPort, &m.sshUsername, &m.sshPassword, &isOnline); err != nil {
					// A row that fails to scan (e.g. a NULL in one of the columns)
					// would otherwise silently drop that machine from monitoring.
					fmt.Printf("[Ping] scan error, machine skipped: %v\n", err)
					continue
				}
				m.wasOnline = isOnline == 1
				list = append(list, m)
			}
			if err := rows.Err(); err != nil {
				// Keep going with whatever rows were read before the failure.
				fmt.Printf("[Ping] row iteration error: %v\n", err)
			}
			rows.Close()

			if len(list) == 0 {
				time.Sleep(step)
				continue
			}
			fmt.Printf("[Ping] start round, %d machines, interval %v each\n", len(list), step)
			for i := range list {
				res := PingHostRepeated(list[i].ip, list[i].sshPort, 3)
				handlePingResult(&list[i], res)
				time.Sleep(step)
			}
		}
	}()
}
// sshSyncMachine is one entry of an SSH sync round: a machine currently
// marked online that has SSH credentials stored.
type sshSyncMachine struct {
	id      int64  // machines.id
	ip      string // host to connect to
	sshPort int    // SSH port
	// Credentials as stored in the machines table; password is encrypted and
	// is decrypted with utils.Decrypt right before connecting.
	username, password string
}
// StartSSHSyncService launches the background SSH sync loop and returns
// immediately.
//
// Each round loads every machine marked online that has SSH credentials,
// fetches its system info with GetSSHInfo, and stores it via saveSSHResult.
// At most cap(sshSyncSem) SSH sessions run at once. Scheduling is one shared
// schedule, not a per-machine timer: the next round starts 10 minutes after
// the previous round has fully finished. If the query fails or no machine
// qualifies, it retries after 30 seconds. The loop never exits.
func StartSSHSyncService() {
	go func() {
		time.Sleep(5 * time.Second)
		for {
			rows, err := db.DB.Query(`SELECT id, ip, ssh_port, ssh_username, ssh_password FROM machines WHERE is_online=1 AND ssh_username != '' AND ssh_password != '' ORDER BY id ASC`)
			if err != nil {
				fmt.Printf("[SSH] query error: %v\n", err)
				time.Sleep(30 * time.Second)
				continue
			}
			var list []sshSyncMachine
			for rows.Next() {
				var m sshSyncMachine
				if err := rows.Scan(&m.id, &m.ip, &m.sshPort, &m.username, &m.password); err != nil {
					fmt.Printf("[SSH] scan error, machine skipped: %v\n", err)
					continue
				}
				list = append(list, m)
			}
			if err := rows.Err(); err != nil {
				// Keep going with whatever rows were read before the failure.
				fmt.Printf("[SSH] row iteration error: %v\n", err)
			}
			rows.Close()

			if len(list) == 0 {
				time.Sleep(30 * time.Second)
				continue
			}
			fmt.Printf("[SSH] start sync round, %d online machines\n", len(list))
			var wg sync.WaitGroup
			for _, m := range list {
				// Acquire the slot before spawning so that no more than
				// cap(sshSyncSem) sync goroutines exist at a time, instead of one
				// blocked goroutine per machine.
				sshSyncSem <- struct{}{}
				wg.Add(1)
				go func(machine sshSyncMachine) {
					defer wg.Done()
					defer func() { <-sshSyncSem }()
					plainPass, decryptErr := utils.Decrypt(machine.password)
					if decryptErr != nil {
						fmt.Printf("[SSH] decrypt failed for %s:%d: %v\n", machine.ip, machine.sshPort, decryptErr)
						return
					}
					result, err := GetSSHInfo(machine.ip, machine.sshPort, machine.username, plainPass)
					if err != nil {
						fmt.Printf("[SSH] sync failed for %s:%d: %v\n", machine.ip, machine.sshPort, err)
						return
					}
					saveSSHResult(machine.id, machine.ip, machine.sshPort, result)
				}(m)
			}
			wg.Wait()
			fmt.Printf("[SSH] sync round completed, next in 10 minutes\n")
			time.Sleep(10 * time.Minute)
		}
	}()
}
// CleanupLogs starts a background goroutine that deletes operation_logs rows
// and closed offline_logs rows older than retentionDays days. It does nothing
// when retentionDays <= 0.
//
// The first purge runs one minute after the call and then every 24 hours.
// The goroutine exits, and its timers are stopped, when ctx is cancelled.
// Delete errors are logged and the next cycle proceeds normally.
func CleanupLogs(ctx context.Context, retentionDays int) {
	if retentionDays <= 0 {
		return
	}
	// SQLite datetime() modifier, bound as a parameter instead of being
	// spliced into the SQL text.
	cutoff := fmt.Sprintf("-%d days", retentionDays)

	purge := func() {
		if _, err := db.DB.Exec(`DELETE FROM operation_logs WHERE created_at < datetime('now', ?)`, cutoff); err != nil {
			fmt.Printf("[Cleanup] operation_logs purge error: %v\n", err)
		}
		// Rows with ended_at IS NULL describe an outage that is still ongoing.
		// handlePingResult closes them on recovery and adds their duration to
		// total_offline_seconds, so they must survive the purge.
		if _, err := db.DB.Exec(`DELETE FROM offline_logs WHERE started_at < datetime('now', ?) AND ended_at IS NOT NULL`, cutoff); err != nil {
			fmt.Printf("[Cleanup] offline_logs purge error: %v\n", err)
		}
	}

	go func() {
		// Run the first purge shortly after startup rather than a full day
		// later, so a process restarted more often than daily still gets
		// cleaned up.
		first := time.NewTimer(time.Minute)
		defer first.Stop()
		select {
		case <-ctx.Done():
			return
		case <-first.C:
		}

		ticker := time.NewTicker(24 * time.Hour)
		defer ticker.Stop()
		for {
			purge()
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()
}