Files
sub2api/backend/internal/service/kimi_gateway_service.go
openclaw e746e82c39
Some checks failed
CI / test (push) Has been cancelled
CI / frontend (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
Security Scan / backend-security (push) Has been cancelled
Security Scan / frontend-security (push) Has been cancelled
feat(kimi): add Kimi CLI forward mode support
- Add AccountTypeCLI domain constant
- Add KimiCLIGateway to forward requests through local kimi-cli binary
- Route CLI accounts in ForwardKimiChatCompletions to cli gateway
- Handle CLI type in GetAccessToken (no token needed)
- Fix Gin oneof binding to accept 'cli' type (Create/Update Account)
- Fix validateDataAccount to accept bedrock and cli types
- Remove unsupported --model arg from kimi-cli invocation
- Frontend: CLI account creation UI with model mapping, pool mode
- Frontend: CLI edit modal support
- Frontend: UseKeyModal shows OpenAI examples for kimi platform
- Add i18n strings for CLI account type

[缅因猫/Codex🐾]
2026-04-24 01:54:59 +08:00

259 lines
7.3 KiB
Go

package service
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/gin-gonic/gin"
"github.com/tidwall/gjson"
)
// Kimi upstream endpoint and header defaults used when forwarding requests
// directly to the Kimi API.
const (
// kimiAPIBaseURL is the root of the Kimi coding API.
kimiAPIBaseURL = "https://api.kimi.com/coding/v1"
// kimiChatCompletionsURL is the chat-completions endpoint derived from the base URL.
kimiChatCompletionsURL = kimiAPIBaseURL + "/chat/completions"
// kimiDefaultUserAgent is the User-Agent set on every upstream request before
// any client headers are passed through.
kimiDefaultUserAgent = "kimi-cli/1.0"
)
// ForwardKimiChatCompletions forwards an OpenAI Chat Completions request directly to Kimi API.
// Kimi API is OpenAI Chat Completions compatible, so the body is forwarded as-is.
// For CLI-type accounts, requests are forwarded through the local kimi-cli binary.
//
// Parameters:
//   - ctx: request context, attached to the upstream HTTP request.
//   - c: gin context of the client request; used for header passthrough and
//     for writing the response or an error payload.
//   - account: the account whose credentials, proxy and TLS profile are used.
//   - body: raw JSON request body; its "stream" and "model" fields are inspected.
//
// Returns the ForwardResult produced by the streaming or non-streaming response
// handler. On a transport failure or a non-failover upstream error, an error
// payload is written to the client and a plain error is returned. When
// shouldFailoverUpstreamError reports true for the upstream status, nothing is
// written to the client and an *UpstreamFailoverError is returned instead.
func (s *GatewayService) ForwardKimiChatCompletions(
	ctx context.Context,
	c *gin.Context,
	account *Account,
	body []byte,
) (*ForwardResult, error) {
	// If account type is CLI, use local kimi-cli (CLI manages its own auth).
	if account.Type == AccountTypeCLI {
		cliGateway := NewKimiCLIGateway()
		return cliGateway.ForwardChatCompletions(ctx, c, account, body)
	}

	startTime := time.Now()

	// 1. Get access token.
	token, tokenType, err := s.GetAccessToken(ctx, account)
	if err != nil {
		return nil, fmt.Errorf("get access token: %w", err)
	}

	// 2. Get proxy URL (empty means no proxy).
	proxyURL := ""
	if account.ProxyID != nil && account.Proxy != nil {
		proxyURL = account.Proxy.URL()
	}

	// 3. Parse request to determine if streaming.
	reqStream := gjson.GetBytes(body, "stream").Bool()

	// 4. Build upstream request.
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, kimiChatCompletionsURL, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("build upstream request: %w", err)
	}

	// OAuth tokens go in the Authorization header; everything else is sent as an API key.
	if tokenType == "oauth" {
		setHeaderRaw(req.Header, "authorization", "Bearer "+token)
	} else {
		setHeaderRaw(req.Header, "x-api-key", token)
	}

	// Set required headers for Kimi API.
	setHeaderRaw(req.Header, "content-type", "application/json")
	setHeaderRaw(req.Header, "accept", "application/json")
	// Kimi Coding API requires a known coding agent User-Agent pattern.
	setHeaderRaw(req.Header, "user-agent", kimiDefaultUserAgent)

	// Passthrough allowed client headers. Ranging over a nil http.Header is a
	// no-op, so a missing gin context simply skips this step.
	var clientHeaders http.Header
	if c != nil && c.Request != nil {
		clientHeaders = c.Request.Header
	}
	for key, values := range clientHeaders {
		if !allowedHeaders[strings.ToLower(key)] {
			continue
		}
		wireKey := resolveWireCasing(key)
		for _, v := range values {
			addHeaderRaw(req.Header, wireKey, v)
		}
	}

	// 5. Resolve TLS fingerprint profile.
	tlsProfile := s.tlsFPProfileService.ResolveTLSProfile(account)

	// 6. Send request.
	resp, err := s.httpUpstream.DoWithTLS(req, proxyURL, account.ID, account.Concurrency, tlsProfile)
	if err != nil {
		// A response may accompany the error; close it so the body is not leaked.
		if resp != nil && resp.Body != nil {
			_ = resp.Body.Close()
		}
		safeErr := sanitizeUpstreamErrorMessage(err.Error())
		setOpsUpstreamError(c, 0, safeErr, "")
		appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
			Platform:           account.Platform,
			AccountID:          account.ID,
			AccountName:        account.Name,
			UpstreamStatusCode: 0,
			Kind:               "request_error",
			Message:            safeErr,
		})
		writeGatewayCCError(c, http.StatusBadGateway, "server_error", "Upstream request failed")
		// The sanitized text is formatted with %s rather than wrapping err, so the
		// raw transport error is not exposed through the error chain.
		return nil, fmt.Errorf("upstream request failed: %s", safeErr)
	}
	defer func() { _ = resp.Body.Close() }()

	// 7. Handle error response.
	if resp.StatusCode >= 400 {
		// Cap the error body that is buffered in memory at 2 MiB.
		respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
		upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
		upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)

		if s.shouldFailoverUpstreamError(resp.StatusCode) {
			appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
				Platform:           account.Platform,
				AccountID:          account.ID,
				AccountName:        account.Name,
				UpstreamStatusCode: resp.StatusCode,
				UpstreamRequestID:  resp.Header.Get("x-request-id"),
				Kind:               "failover",
				Message:            upstreamMsg,
			})
			if s.rateLimitService != nil {
				s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
			}
			return nil, &UpstreamFailoverError{
				StatusCode:   resp.StatusCode,
				ResponseBody: respBody,
			}
		}

		// The upstream body may carry no usable message (e.g. an empty or
		// non-JSON body). Never hand the client an empty error message.
		clientMsg := upstreamMsg
		if clientMsg == "" {
			clientMsg = "Upstream request failed"
		}
		writeGatewayCCError(c, mapUpstreamStatusCode(resp.StatusCode), "server_error", clientMsg)
		return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, clientMsg)
	}

	// 8. Handle successful response.
	// NOTE(review): the body is forwarded as-is, so mappedModel is only handed to
	// the response handlers and is not applied to the upstream request — confirm
	// this is intended for API-key accounts.
	originalModel := gjson.GetBytes(body, "model").String()
	mappedModel := originalModel
	if account.Type == AccountTypeAPIKey {
		mappedModel = account.GetMappedModel(originalModel)
	}

	if reqStream {
		return s.handleKimiStreamingResponse(resp, c, originalModel, mappedModel, startTime)
	}
	return s.handleKimiNonStreamingResponse(resp, c, originalModel, mappedModel, startTime)
}
// handleKimiStreamingResponse relays an upstream SSE stream to the client.
//
// Each upstream line is written and flushed as it arrives. Blank lines, which
// delimit SSE events, are forwarded so that event boundaries survive the relay.
// A terminating "data: [DONE]" event is appended only when upstream did not
// send one itself.
//
// Parameters:
//   - resp: upstream response whose body is the SSE stream; the caller closes it.
//   - c: gin context used to write the client response.
//   - originalModel, mappedModel: currently unused by this handler.
//   - startTime: request start, used to compute time-to-first-token.
//
// Returns a ForwardResult carrying the model reported by the first JSON data
// line that has one, and the first-token latency in milliseconds (nil when no
// non-blank line was received). An error is returned only when the client
// writer is unusable, before any header has been written.
func (s *GatewayService) handleKimiStreamingResponse(
	resp *http.Response,
	c *gin.Context,
	originalModel, mappedModel string,
	startTime time.Time,
) (*ForwardResult, error) {
	if c == nil || c.Writer == nil {
		return nil, errors.New("gin context or writer is nil")
	}
	// Check for flush support before committing the status line, so that a
	// failure here can still be reported by the caller.
	flusher, ok := c.Writer.(http.Flusher)
	if !ok {
		return nil, errors.New("streaming not supported")
	}

	header := c.Writer.Header()
	header.Set("Content-Type", "text/event-stream; charset=utf-8")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	c.Writer.WriteHeader(http.StatusOK)

	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 4096), defaultMaxLineSize)

	var (
		firstTokenTime *time.Duration
		upstreamModel  string
		eventOpen      bool // a field line was written and its event is not yet terminated
		doneSeen       bool // upstream already sent "data: [DONE]"
	)

	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			// A blank line dispatches the pending SSE event. Forward exactly one
			// terminator per event and ignore leading or repeated blank lines.
			if eventOpen {
				fmt.Fprint(c.Writer, "\n") //nolint:errcheck
				flusher.Flush()
				eventOpen = false
			}
			continue
		}

		// Detect first token.
		if firstTokenTime == nil {
			elapsed := time.Since(startTime)
			firstTokenTime = &elapsed
		}

		// Write the SSE line as-is.
		fmt.Fprintf(c.Writer, "%s\n", line) //nolint:errcheck
		flusher.Flush()
		eventOpen = true

		if !strings.HasPrefix(line, "data:") {
			continue
		}
		payload := strings.TrimSpace(line[len("data:"):])
		switch {
		case payload == "[DONE]":
			doneSeen = true
		case upstreamModel == "" && strings.HasPrefix(payload, "{"):
			// Try to extract upstream model from the first data line that has one.
			upstreamModel = gjson.Get(payload, "model").String()
		}
	}
	if err := scanner.Err(); err != nil && !errors.Is(err, io.EOF) {
		logger.LegacyPrintf("service.gateway", "Kimi stream scanner error: %v", err)
	}

	// Terminate a trailing event that upstream left open (e.g. truncated stream),
	// so a synthesized [DONE] below is not merged into it.
	if eventOpen {
		fmt.Fprint(c.Writer, "\n") //nolint:errcheck
	}
	// Send final [DONE] only if upstream did not already send it.
	if !doneSeen {
		fmt.Fprintf(c.Writer, "data: [DONE]\n\n") //nolint:errcheck
	}
	flusher.Flush()

	var firstTokenMs *int
	if firstTokenTime != nil {
		ms := int(firstTokenTime.Milliseconds())
		firstTokenMs = &ms
	}

	return &ForwardResult{
		UpstreamModel: upstreamModel,
		FirstTokenMs:  firstTokenMs,
	}, nil
}
// handleKimiNonStreamingResponse buffers the upstream JSON response and relays
// it to the client.
//
// At most 50 MiB of the upstream body is read. End-to-end upstream headers are
// copied to the client; hop-by-hop headers are dropped because they describe the
// upstream connection, not the client one. Content-Length is recomputed from the
// bytes actually buffered so it always matches what is written.
//
// Parameters:
//   - resp: upstream response; the caller closes its body.
//   - c: gin context used to write the client response; when nil (or its writer
//     is nil) nothing is written and only the result is returned.
//   - originalModel, mappedModel, startTime: currently unused by this handler.
//
// Returns a ForwardResult carrying the "model" field of the upstream body, or an
// error if the upstream body could not be read.
func (s *GatewayService) handleKimiNonStreamingResponse(
	resp *http.Response,
	c *gin.Context,
	originalModel, mappedModel string,
	startTime time.Time,
) (*ForwardResult, error) {
	body, err := io.ReadAll(io.LimitReader(resp.Body, 50<<20))
	if err != nil {
		return nil, fmt.Errorf("read upstream response: %w", err)
	}

	if c != nil && c.Writer != nil {
		dst := c.Writer.Header()
		// Copy upstream headers, except those that are only meaningful for the
		// upstream hop or that must be recomputed for the buffered body.
		for key, values := range resp.Header {
			switch strings.ToLower(key) {
			case "connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
				"te", "trailer", "transfer-encoding", "upgrade", "content-length":
				continue
			}
			for _, v := range values {
				dst.Add(key, v)
			}
		}
		dst.Set("Content-Type", "application/json")
		dst.Set("Content-Length", fmt.Sprint(len(body)))
		c.Writer.WriteHeader(resp.StatusCode)
		// Best effort: a write failure here means the client has gone away.
		_, _ = c.Writer.Write(body)
	}

	upstreamModel := gjson.GetBytes(body, "model").String()

	return &ForwardResult{
		UpstreamModel: upstreamModel,
	}, nil
}