diff --git a/backend/cmd/server/wire.go b/backend/cmd/server/wire.go
index 64709b5b..53e4777e 100644
--- a/backend/cmd/server/wire.go
+++ b/backend/cmd/server/wire.go
@@ -92,6 +92,7 @@ func provideCleanup(
oauth *service.OAuthService,
openaiOAuth *service.OpenAIOAuthService,
geminiOAuth *service.GeminiOAuthService,
+ kimiOAuth *service.KimiOAuthService,
antigravityOAuth *service.AntigravityOAuthService,
openAIGateway *service.OpenAIGatewayService,
scheduledTestRunner *service.ScheduledTestRunnerService,
@@ -211,6 +212,10 @@ func provideCleanup(
geminiOAuth.Stop()
return nil
}},
+ {"KimiOAuthService", func() error {
+ kimiOAuth.Stop()
+ return nil
+ }},
{"AntigravityOAuthService", func() error {
antigravityOAuth.Stop()
return nil
diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go
index 3b474c4a..0f13dcfa 100644
--- a/backend/cmd/server/wire_gen.go
+++ b/backend/cmd/server/wire_gen.go
@@ -121,6 +121,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
driveClient := repository.NewGeminiDriveClient()
geminiOAuthService := service.NewGeminiOAuthService(proxyRepository, geminiOAuthClient, geminiCliCodeAssistClient, driveClient, configConfig)
antigravityOAuthService := service.NewAntigravityOAuthService(proxyRepository)
+ kimiOAuthService := service.NewKimiOAuthService()
geminiQuotaService := service.NewGeminiQuotaService(configConfig, settingRepository)
tempUnschedCache := repository.NewTempUnschedCache(redisClient)
timeoutCounterCache := repository.NewTimeoutCounterCache(redisClient)
@@ -158,6 +159,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
openAIOAuthHandler := admin.NewOpenAIOAuthHandler(openAIOAuthService, adminService)
geminiOAuthHandler := admin.NewGeminiOAuthHandler(geminiOAuthService)
antigravityOAuthHandler := admin.NewAntigravityOAuthHandler(antigravityOAuthService)
+ kimiOAuthHandler := admin.NewKimiOAuthHandler(kimiOAuthService, proxyRepository)
proxyHandler := admin.NewProxyHandler(adminService)
adminRedeemHandler := admin.NewRedeemHandler(adminService, redeemService)
promoHandler := admin.NewPromoHandler(promoService)
@@ -172,12 +174,13 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
identityService := service.NewIdentityService(identityCache)
deferredService := service.ProvideDeferredService(accountRepository, timingWheelService)
claudeTokenProvider := service.ProvideClaudeTokenProvider(accountRepository, geminiTokenCache, oAuthService, oAuthRefreshAPI)
+ kimiTokenProvider := service.ProvideKimiTokenProvider(accountRepository, geminiTokenCache, kimiOAuthService, oAuthRefreshAPI)
digestSessionStore := service.NewDigestSessionStore()
channelRepository := repository.NewChannelRepository(db)
channelService := service.NewChannelService(channelRepository, apiKeyAuthCacheInvalidator)
modelPricingResolver := service.NewModelPricingResolver(channelService, billingService)
balanceNotifyService := service.ProvideBalanceNotifyService(emailService, settingRepository, accountRepository)
- gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, usageBillingRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService, claudeTokenProvider, sessionLimitCache, rpmCache, digestSessionStore, settingService, tlsFingerprintProfileService, channelService, modelPricingResolver, balanceNotifyService)
+ gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, usageBillingRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService, claudeTokenProvider, kimiTokenProvider, sessionLimitCache, rpmCache, digestSessionStore, settingService, tlsFingerprintProfileService, channelService, modelPricingResolver, balanceNotifyService)
openAITokenProvider := service.ProvideOpenAITokenProvider(accountRepository, geminiTokenCache, openAIOAuthService, oAuthRefreshAPI)
openAIGatewayService := service.NewOpenAIGatewayService(accountRepository, usageLogRepository, usageBillingRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, httpUpstream, deferredService, openAITokenProvider, modelPricingResolver, channelService, balanceNotifyService)
geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, schedulerSnapshotService, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig)
@@ -221,7 +224,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
settingHandler := admin.NewSettingHandler(settingService, emailService, turnstileService, opsService, paymentConfigService, paymentService)
paymentOrderExpiryService := service.ProvidePaymentOrderExpiryService(paymentService)
paymentHandler := admin.NewPaymentHandler(paymentService, paymentConfigService)
- adminHandlers := handler.ProvideAdminHandlers(dashboardHandler, adminUserHandler, groupHandler, accountHandler, adminAnnouncementHandler, dataManagementHandler, backupHandler, oAuthHandler, openAIOAuthHandler, geminiOAuthHandler, antigravityOAuthHandler, proxyHandler, adminRedeemHandler, promoHandler, settingHandler, opsHandler, systemHandler, adminSubscriptionHandler, adminUsageHandler, userAttributeHandler, errorPassthroughHandler, tlsFingerprintProfileHandler, adminAPIKeyHandler, scheduledTestHandler, channelHandler, paymentHandler)
+ adminHandlers := handler.ProvideAdminHandlers(dashboardHandler, adminUserHandler, groupHandler, accountHandler, adminAnnouncementHandler, dataManagementHandler, backupHandler, oAuthHandler, openAIOAuthHandler, geminiOAuthHandler, antigravityOAuthHandler, kimiOAuthHandler, proxyHandler, adminRedeemHandler, promoHandler, settingHandler, opsHandler, systemHandler, adminSubscriptionHandler, adminUsageHandler, userAttributeHandler, errorPassthroughHandler, tlsFingerprintProfileHandler, adminAPIKeyHandler, scheduledTestHandler, channelHandler, paymentHandler)
usageRecordWorkerPool := service.NewUsageRecordWorkerPool(configConfig)
userMsgQueueCache := repository.NewUserMsgQueueCache(redisClient)
userMessageQueueService := service.ProvideUserMessageQueueService(userMsgQueueCache, rpmCache, configConfig)
@@ -244,7 +247,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
opsAlertEvaluatorService := service.ProvideOpsAlertEvaluatorService(opsService, opsRepository, emailService, redisClient, configConfig)
opsCleanupService := service.ProvideOpsCleanupService(opsRepository, db, redisClient, configConfig)
opsScheduledReportService := service.ProvideOpsScheduledReportService(opsService, userService, emailService, redisClient, configConfig)
- tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, schedulerCache, configConfig, tempUnschedCache, privacyClientFactory, proxyRepository, oAuthRefreshAPI)
+ tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, kimiOAuthService, compositeTokenCacheInvalidator, schedulerCache, configConfig, tempUnschedCache, privacyClientFactory, proxyRepository, oAuthRefreshAPI)
accountExpiryService := service.ProvideAccountExpiryService(accountRepository)
subscriptionExpiryService := service.ProvideSubscriptionExpiryService(userSubscriptionRepository)
scheduledTestRunnerService := service.ProvideScheduledTestRunnerService(scheduledTestPlanRepository, scheduledTestService, accountTestService, rateLimitService, configConfig)
diff --git a/backend/go.mod b/backend/go.mod
index 627851bf..ebf8d1c2 100644
--- a/backend/go.mod
+++ b/backend/go.mod
@@ -1,6 +1,6 @@
module github.com/Wei-Shaw/sub2api
-go 1.26.2
+go 1.26.1
require (
entgo.io/ent v0.14.5
diff --git a/backend/internal/domain/constants.go b/backend/internal/domain/constants.go
index a57f7067..8863ffb5 100644
--- a/backend/internal/domain/constants.go
+++ b/backend/internal/domain/constants.go
@@ -22,6 +22,7 @@ const (
PlatformOpenAI = "openai"
PlatformGemini = "gemini"
PlatformAntigravity = "antigravity"
+ PlatformKimi = "kimi"
)
// Account type constants
diff --git a/backend/internal/handler/admin/group_handler.go b/backend/internal/handler/admin/group_handler.go
index cb2bd201..93260bc9 100644
--- a/backend/internal/handler/admin/group_handler.go
+++ b/backend/internal/handler/admin/group_handler.go
@@ -84,7 +84,7 @@ func NewGroupHandler(adminService service.AdminService, dashboardService *servic
type CreateGroupRequest struct {
Name string `json:"name" binding:"required"`
Description string `json:"description"`
- Platform string `json:"platform" binding:"omitempty,oneof=anthropic openai gemini antigravity"`
+ Platform string `json:"platform" binding:"omitempty,oneof=anthropic openai gemini antigravity kimi"`
RateMultiplier float64 `json:"rate_multiplier"`
IsExclusive bool `json:"is_exclusive"`
SubscriptionType string `json:"subscription_type" binding:"omitempty,oneof=standard subscription"`
@@ -118,7 +118,7 @@ type CreateGroupRequest struct {
type UpdateGroupRequest struct {
Name string `json:"name"`
Description string `json:"description"`
- Platform string `json:"platform" binding:"omitempty,oneof=anthropic openai gemini antigravity"`
+ Platform string `json:"platform" binding:"omitempty,oneof=anthropic openai gemini antigravity kimi"`
RateMultiplier *float64 `json:"rate_multiplier"`
IsExclusive *bool `json:"is_exclusive"`
Status string `json:"status" binding:"omitempty,oneof=active inactive"`
diff --git a/backend/internal/handler/admin/kimi_oauth_handler.go b/backend/internal/handler/admin/kimi_oauth_handler.go
new file mode 100644
index 00000000..0eeddc8a
--- /dev/null
+++ b/backend/internal/handler/admin/kimi_oauth_handler.go
@@ -0,0 +1,181 @@
+package admin
+
+import (
+ "encoding/json"
+ "strings"
+ "time"
+
+ "github.com/Wei-Shaw/sub2api/internal/pkg/response"
+ "github.com/Wei-Shaw/sub2api/internal/service"
+
+ "github.com/gin-gonic/gin"
+)
+
+// KimiOAuthHandler handles Kimi OAuth admin endpoints.
+type KimiOAuthHandler struct {
+ kimiOAuthService *service.KimiOAuthService
+ proxyRepo service.ProxyRepository
+}
+
+// NewKimiOAuthHandler creates a new KimiOAuthHandler.
+func NewKimiOAuthHandler(kimiOAuthService *service.KimiOAuthService, proxyRepo service.ProxyRepository) *KimiOAuthHandler {
+ return &KimiOAuthHandler{
+ kimiOAuthService: kimiOAuthService,
+ proxyRepo: proxyRepo,
+ }
+}
+
+type kimiDeviceAuthRequest struct {
+ ProxyID *int64 `json:"proxy_id"`
+}
+
+// InitiateDeviceAuth starts the Kimi OAuth device authorization flow.
+// POST /api/v1/admin/kimi/oauth/device-auth
+func (h *KimiOAuthHandler) InitiateDeviceAuth(c *gin.Context) {
+ var req kimiDeviceAuthRequest
+ if err := c.ShouldBindJSON(&req); err != nil {
+ response.BadRequest(c, "Invalid request: "+err.Error())
+ return
+ }
+
+ result, err := h.kimiOAuthService.InitiateDeviceAuth(c.Request.Context(), req.ProxyID, h.proxyRepo)
+ if err != nil {
+ response.InternalError(c, "Failed to initiate device auth: "+err.Error())
+ return
+ }
+
+ response.Success(c, gin.H{
+ "device_code": result.DeviceCode,
+ "user_code": result.UserCode,
+ "verification_uri": result.VerificationURI,
+ "expires_in": result.ExpiresIn,
+ "interval": result.Interval,
+ })
+}
+
+type kimiPollTokenRequest struct {
+ DeviceCode string `json:"device_code" binding:"required"`
+}
+
+// PollToken polls for the OAuth token after device authorization.
+// POST /api/v1/admin/kimi/oauth/token
+func (h *KimiOAuthHandler) PollToken(c *gin.Context) {
+ var req kimiPollTokenRequest
+ if err := c.ShouldBindJSON(&req); err != nil {
+ response.BadRequest(c, "Invalid request: "+err.Error())
+ return
+ }
+
+ tokenInfo, err := h.kimiOAuthService.PollToken(c.Request.Context(), strings.TrimSpace(req.DeviceCode))
+ if err != nil {
+ msg := err.Error()
+ if strings.Contains(msg, "authorization_pending") {
+ response.Success(c, gin.H{
+ "status": "pending",
+ "message": "Authorization pending, please complete authorization in the browser",
+ })
+ return
+ }
+ if strings.Contains(msg, "expired_token") || strings.Contains(msg, "invalid_grant") {
+ response.BadRequest(c, "Token request failed: "+msg)
+ return
+ }
+ response.InternalError(c, "Failed to poll token: "+msg)
+ return
+ }
+
+ response.Success(c, gin.H{
+ "status": "completed",
+ "access_token": tokenInfo.AccessToken,
+ "refresh_token": tokenInfo.RefreshToken,
+ "expires_in": tokenInfo.ExpiresIn,
+ "expires_at": tokenInfo.ExpiresAt,
+ "scope": tokenInfo.Scope,
+ "token_type": tokenInfo.TokenType,
+ })
+}
+
+type kimiRefreshTokenRequest struct {
+ RefreshToken string `json:"refresh_token" binding:"required"`
+ ProxyID *int64 `json:"proxy_id"`
+}
+
+// RefreshToken refreshes the Kimi access token.
+// POST /api/v1/admin/kimi/oauth/refresh
+func (h *KimiOAuthHandler) RefreshToken(c *gin.Context) {
+ var req kimiRefreshTokenRequest
+ if err := c.ShouldBindJSON(&req); err != nil {
+ response.BadRequest(c, "Invalid request: "+err.Error())
+ return
+ }
+
+ var proxyURL string
+ if req.ProxyID != nil && h.proxyRepo != nil {
+ if proxy, err := h.proxyRepo.GetByID(c.Request.Context(), *req.ProxyID); err == nil && proxy != nil {
+ proxyURL = proxy.URL()
+ }
+ }
+
+ tokenInfo, err := h.kimiOAuthService.RefreshToken(c.Request.Context(), strings.TrimSpace(req.RefreshToken), proxyURL)
+ if err != nil {
+ response.BadRequest(c, "Failed to refresh token: "+err.Error())
+ return
+ }
+
+ response.Success(c, gin.H{
+ "access_token": tokenInfo.AccessToken,
+ "refresh_token": tokenInfo.RefreshToken,
+ "expires_in": tokenInfo.ExpiresIn,
+ "expires_at": tokenInfo.ExpiresAt,
+ "scope": tokenInfo.Scope,
+ "token_type": tokenInfo.TokenType,
+ })
+}
+
+// ImportLocalToken imports token from local kimi-cli credentials.
+// POST /api/v1/admin/kimi/oauth/import-local
+func (h *KimiOAuthHandler) ImportLocalToken(c *gin.Context) {
+ var req struct {
+ CredentialsJSON string `json:"credentials_json" binding:"required"`
+ }
+ if err := c.ShouldBindJSON(&req); err != nil {
+ response.BadRequest(c, "Invalid request: "+err.Error())
+ return
+ }
+
+ // Parse the local credentials JSON format
+ var localCreds struct {
+ AccessToken string `json:"access_token"`
+ RefreshToken string `json:"refresh_token"`
+ ExpiresAt float64 `json:"expires_at"`
+ Scope string `json:"scope"`
+ TokenType string `json:"token_type"`
+ ExpiresIn float64 `json:"expires_in"`
+ }
+ if err := json.Unmarshal([]byte(req.CredentialsJSON), &localCreds); err != nil {
+ response.BadRequest(c, "Invalid credentials JSON: "+err.Error())
+ return
+ }
+
+ if strings.TrimSpace(localCreds.AccessToken) == "" {
+ response.BadRequest(c, "access_token is required in credentials")
+ return
+ }
+
+ expiresAt := int64(localCreds.ExpiresAt)
+ if expiresAt == 0 && localCreds.ExpiresIn > 0 {
+ expiresAt = time.Now().Unix() + int64(localCreds.ExpiresIn) - service.KimiTokenSafetyWindow
+ }
+ minExpiresAt := time.Now().Unix() + service.KimiTokenMinTTL
+ if expiresAt < minExpiresAt {
+ expiresAt = minExpiresAt
+ }
+
+ response.Success(c, gin.H{
+ "access_token": localCreds.AccessToken,
+ "refresh_token": localCreds.RefreshToken,
+ "expires_at": expiresAt,
+ "scope": localCreds.Scope,
+ "token_type": localCreds.TokenType,
+ })
+}
diff --git a/backend/internal/handler/gateway_handler_chat_completions.go b/backend/internal/handler/gateway_handler_chat_completions.go
index be267332..430cc359 100644
--- a/backend/internal/handler/gateway_handler_chat_completions.go
+++ b/backend/internal/handler/gateway_handler_chat_completions.go
@@ -210,7 +210,12 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) {
if channelMapping.Mapped {
forwardBody = h.gatewayService.ReplaceModelInBody(body, channelMapping.MappedModel)
}
- result, err := h.gatewayService.ForwardAsChatCompletions(c.Request.Context(), c, account, forwardBody, parsedReq)
+ var result *service.ForwardResult
+ if account.Platform == service.PlatformKimi {
+ result, err = h.gatewayService.ForwardKimiChatCompletions(c.Request.Context(), c, account, forwardBody)
+ } else {
+ result, err = h.gatewayService.ForwardAsChatCompletions(c.Request.Context(), c, account, forwardBody, parsedReq)
+ }
if accountReleaseFunc != nil {
accountReleaseFunc()
diff --git a/backend/internal/handler/handler.go b/backend/internal/handler/handler.go
index 906a74f1..22dd5b22 100644
--- a/backend/internal/handler/handler.go
+++ b/backend/internal/handler/handler.go
@@ -17,6 +17,7 @@ type AdminHandlers struct {
OpenAIOAuth *admin.OpenAIOAuthHandler
GeminiOAuth *admin.GeminiOAuthHandler
AntigravityOAuth *admin.AntigravityOAuthHandler
+ KimiOAuth *admin.KimiOAuthHandler
Proxy *admin.ProxyHandler
Redeem *admin.RedeemHandler
Promo *admin.PromoHandler
diff --git a/backend/internal/handler/wire.go b/backend/internal/handler/wire.go
index 4b54d41a..65efd7a3 100644
--- a/backend/internal/handler/wire.go
+++ b/backend/internal/handler/wire.go
@@ -20,6 +20,7 @@ func ProvideAdminHandlers(
openaiOAuthHandler *admin.OpenAIOAuthHandler,
geminiOAuthHandler *admin.GeminiOAuthHandler,
antigravityOAuthHandler *admin.AntigravityOAuthHandler,
+ kimiOAuthHandler *admin.KimiOAuthHandler,
proxyHandler *admin.ProxyHandler,
redeemHandler *admin.RedeemHandler,
promoHandler *admin.PromoHandler,
@@ -48,6 +49,7 @@ func ProvideAdminHandlers(
OpenAIOAuth: openaiOAuthHandler,
GeminiOAuth: geminiOAuthHandler,
AntigravityOAuth: antigravityOAuthHandler,
+ KimiOAuth: kimiOAuthHandler,
Proxy: proxyHandler,
Redeem: redeemHandler,
Promo: promoHandler,
@@ -142,6 +144,7 @@ var ProviderSet = wire.NewSet(
admin.NewOpenAIOAuthHandler,
admin.NewGeminiOAuthHandler,
admin.NewAntigravityOAuthHandler,
+ admin.NewKimiOAuthHandler,
admin.NewProxyHandler,
admin.NewRedeemHandler,
admin.NewPromoHandler,
diff --git a/backend/internal/model/error_passthrough_rule.go b/backend/internal/model/error_passthrough_rule.go
index 620736cd..f72afc38 100644
--- a/backend/internal/model/error_passthrough_rule.go
+++ b/backend/internal/model/error_passthrough_rule.go
@@ -36,11 +36,12 @@ const (
PlatformOpenAI = "openai"
PlatformGemini = "gemini"
PlatformAntigravity = "antigravity"
+ PlatformKimi = "kimi"
)
// AllPlatforms 返回所有支持的平台列表
func AllPlatforms() []string {
- return []string{PlatformAnthropic, PlatformOpenAI, PlatformGemini, PlatformAntigravity}
+ return []string{PlatformAnthropic, PlatformOpenAI, PlatformGemini, PlatformAntigravity, PlatformKimi}
}
// Validate 验证规则配置的有效性
diff --git a/backend/internal/server/routes/admin.go b/backend/internal/server/routes/admin.go
index 84c963ec..0ec6b7ca 100644
--- a/backend/internal/server/routes/admin.go
+++ b/backend/internal/server/routes/admin.go
@@ -41,6 +41,9 @@ func RegisterAdminRoutes(
// Antigravity OAuth
registerAntigravityOAuthRoutes(admin, h)
+ // Kimi OAuth
+ registerKimiOAuthRoutes(admin, h)
+
// 代理管理
registerProxyRoutes(admin, h)
@@ -338,6 +341,16 @@ func registerAntigravityOAuthRoutes(admin *gin.RouterGroup, h *handler.Handlers)
}
}
+func registerKimiOAuthRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
+ kimi := admin.Group("/kimi")
+ {
+ kimi.POST("/oauth/device-auth", h.Admin.KimiOAuth.InitiateDeviceAuth)
+ kimi.POST("/oauth/token", h.Admin.KimiOAuth.PollToken)
+ kimi.POST("/oauth/refresh", h.Admin.KimiOAuth.RefreshToken)
+ kimi.POST("/oauth/import-local", h.Admin.KimiOAuth.ImportLocalToken)
+ }
+}
+
func registerProxyRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
proxies := admin.Group("/proxies")
{
diff --git a/backend/internal/service/domain_constants.go b/backend/internal/service/domain_constants.go
index 3c6888b8..870d8f7a 100644
--- a/backend/internal/service/domain_constants.go
+++ b/backend/internal/service/domain_constants.go
@@ -24,6 +24,7 @@ const (
PlatformOpenAI = domain.PlatformOpenAI
PlatformGemini = domain.PlatformGemini
PlatformAntigravity = domain.PlatformAntigravity
+ PlatformKimi = domain.PlatformKimi
)
// Account type constants
diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go
index 5a91d0de..8b854cb3 100644
--- a/backend/internal/service/gateway_service.go
+++ b/backend/internal/service/gateway_service.go
@@ -547,6 +547,7 @@ type GatewayService struct {
deferredService *DeferredService
concurrencyService *ConcurrencyService
claudeTokenProvider *ClaudeTokenProvider
+ kimiTokenProvider *KimiTokenProvider
sessionLimitCache SessionLimitCache // 会话数量限制缓存(仅 Anthropic OAuth/SetupToken)
rpmCache RPMCache // RPM 计数缓存(仅 Anthropic OAuth/SetupToken)
userGroupRateResolver *userGroupRateResolver
@@ -585,6 +586,7 @@ func NewGatewayService(
httpUpstream HTTPUpstream,
deferredService *DeferredService,
claudeTokenProvider *ClaudeTokenProvider,
+ kimiTokenProvider *KimiTokenProvider,
sessionLimitCache SessionLimitCache,
rpmCache RPMCache,
digestStore *DigestSessionStore,
@@ -617,6 +619,7 @@ func NewGatewayService(
httpUpstream: httpUpstream,
deferredService: deferredService,
claudeTokenProvider: claudeTokenProvider,
+ kimiTokenProvider: kimiTokenProvider,
sessionLimitCache: sessionLimitCache,
rpmCache: rpmCache,
userGroupRateCache: gocache.New(userGroupRateTTL, time.Minute),
@@ -3466,6 +3469,15 @@ func (s *GatewayService) getOAuthToken(ctx context.Context, account *Account) (s
return accessToken, "oauth", nil
}
+ // 对于 Kimi OAuth 账号,使用 KimiTokenProvider 获取缓存的 token
+ if account.Platform == PlatformKimi && account.Type == AccountTypeOAuth && s.kimiTokenProvider != nil {
+ accessToken, err := s.kimiTokenProvider.GetAccessToken(ctx, account)
+ if err != nil {
+ return "", "", err
+ }
+ return accessToken, "oauth", nil
+ }
+
// 其他情况(Gemini 有自己的 TokenProvider,setup-token 类型等)直接从账号读取
accessToken := account.GetCredential("access_token")
if accessToken == "" {
diff --git a/backend/internal/service/kimi_gateway_service.go b/backend/internal/service/kimi_gateway_service.go
new file mode 100644
index 00000000..2ac13a71
--- /dev/null
+++ b/backend/internal/service/kimi_gateway_service.go
@@ -0,0 +1,251 @@
+package service
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/Wei-Shaw/sub2api/internal/pkg/logger"
+ "github.com/gin-gonic/gin"
+ "github.com/tidwall/gjson"
+)
+
+const (
+ kimiAPIBaseURL = "https://api.kimi.com/coding/v1"
+ kimiChatCompletionsURL = kimiAPIBaseURL + "/chat/completions"
+ kimiDefaultUserAgent = "kimi-cli/1.0"
+)
+
+// ForwardKimiChatCompletions forwards an OpenAI Chat Completions request directly to Kimi API.
+// Kimi API is OpenAI Chat Completions compatible, so the body is forwarded as-is.
+func (s *GatewayService) ForwardKimiChatCompletions(
+ ctx context.Context,
+ c *gin.Context,
+ account *Account,
+ body []byte,
+) (*ForwardResult, error) {
+ startTime := time.Now()
+
+ // 1. Get access token
+ token, tokenType, err := s.GetAccessToken(ctx, account)
+ if err != nil {
+ return nil, fmt.Errorf("get access token: %w", err)
+ }
+
+ // 2. Get proxy URL
+ proxyURL := ""
+ if account.ProxyID != nil && account.Proxy != nil {
+ proxyURL = account.Proxy.URL()
+ }
+
+ // 3. Parse request to determine if streaming
+ reqStream := gjson.GetBytes(body, "stream").Bool()
+
+ // 4. Build upstream request
+ req, err := http.NewRequestWithContext(ctx, "POST", kimiChatCompletionsURL, bytes.NewReader(body))
+ if err != nil {
+ return nil, fmt.Errorf("build upstream request: %w", err)
+ }
+
+ // Set auth header
+ if tokenType == "oauth" {
+ setHeaderRaw(req.Header, "authorization", "Bearer "+token)
+ } else {
+ setHeaderRaw(req.Header, "x-api-key", token)
+ }
+
+ // Set required headers for Kimi API
+ setHeaderRaw(req.Header, "content-type", "application/json")
+ setHeaderRaw(req.Header, "accept", "application/json")
+ // Kimi Coding API requires a known coding agent User-Agent pattern
+ setHeaderRaw(req.Header, "user-agent", kimiDefaultUserAgent)
+
+ // Passthrough allowed client headers
+ var clientHeaders http.Header
+ if c != nil && c.Request != nil {
+ clientHeaders = c.Request.Header
+ }
+ for key, values := range clientHeaders {
+ lowerKey := strings.ToLower(key)
+ if allowedHeaders[lowerKey] {
+ wireKey := resolveWireCasing(key)
+ for _, v := range values {
+ addHeaderRaw(req.Header, wireKey, v)
+ }
+ }
+ }
+
+ // 5. Resolve TLS fingerprint profile
+ tlsProfile := s.tlsFPProfileService.ResolveTLSProfile(account)
+
+ // 6. Send request
+ resp, err := s.httpUpstream.DoWithTLS(req, proxyURL, account.ID, account.Concurrency, tlsProfile)
+ if err != nil {
+ if resp != nil && resp.Body != nil {
+ _ = resp.Body.Close()
+ }
+ safeErr := sanitizeUpstreamErrorMessage(err.Error())
+ setOpsUpstreamError(c, 0, safeErr, "")
+ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
+ Platform: account.Platform,
+ AccountID: account.ID,
+ AccountName: account.Name,
+ UpstreamStatusCode: 0,
+ Kind: "request_error",
+ Message: safeErr,
+ })
+ writeGatewayCCError(c, http.StatusBadGateway, "server_error", "Upstream request failed")
+ return nil, fmt.Errorf("upstream request failed: %s", safeErr)
+ }
+ defer func() { _ = resp.Body.Close() }()
+
+ // 7. Handle error response
+ if resp.StatusCode >= 400 {
+ respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
+ upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
+ upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
+
+ if s.shouldFailoverUpstreamError(resp.StatusCode) {
+ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
+ Platform: account.Platform,
+ AccountID: account.ID,
+ AccountName: account.Name,
+ UpstreamStatusCode: resp.StatusCode,
+ UpstreamRequestID: resp.Header.Get("x-request-id"),
+ Kind: "failover",
+ Message: upstreamMsg,
+ })
+ if s.rateLimitService != nil {
+ s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
+ }
+ return nil, &UpstreamFailoverError{
+ StatusCode: resp.StatusCode,
+ ResponseBody: respBody,
+ }
+ }
+
+ writeGatewayCCError(c, mapUpstreamStatusCode(resp.StatusCode), "server_error", upstreamMsg)
+ return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, upstreamMsg)
+ }
+
+ // 8. Handle successful response
+ originalModel := gjson.GetBytes(body, "model").String()
+ mappedModel := originalModel
+ if account.Type == AccountTypeAPIKey {
+ mappedModel = account.GetMappedModel(originalModel)
+ }
+
+ var result *ForwardResult
+ if reqStream {
+ result, err = s.handleKimiStreamingResponse(resp, c, originalModel, mappedModel, startTime)
+ } else {
+ result, err = s.handleKimiNonStreamingResponse(resp, c, originalModel, mappedModel, startTime)
+ }
+
+ return result, err
+}
+
+func (s *GatewayService) handleKimiStreamingResponse(
+ resp *http.Response,
+ c *gin.Context,
+ originalModel, mappedModel string,
+ startTime time.Time,
+) (*ForwardResult, error) {
+ if c == nil || c.Writer == nil {
+ return nil, errors.New("gin context or writer is nil")
+ }
+
+ c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
+ c.Writer.Header().Set("Cache-Control", "no-cache")
+ c.Writer.Header().Set("Connection", "keep-alive")
+ c.Writer.WriteHeader(http.StatusOK)
+
+ flusher, ok := c.Writer.(http.Flusher)
+ if !ok {
+ return nil, errors.New("streaming not supported")
+ }
+
+ scanner := bufio.NewScanner(resp.Body)
+ scanner.Buffer(make([]byte, 4096), defaultMaxLineSize)
+
+ var firstTokenTime *time.Duration
+ var upstreamModel string
+
+ for scanner.Scan() {
+ line := scanner.Text()
+ if line == "" {
+ continue
+ }
+
+ // Detect first token
+ if firstTokenTime == nil {
+ elapsed := time.Since(startTime)
+ firstTokenTime = &elapsed
+ }
+
+ // Write the SSE line as-is
+ fmt.Fprintf(c.Writer, "%s\n", line) //nolint:errcheck
+ flusher.Flush()
+
+ // Try to extract upstream model from the first data line
+ if upstreamModel == "" && strings.HasPrefix(line, "data: {") {
+ upstreamModel = gjson.Get(line[6:], "model").String()
+ }
+ }
+
+ if err := scanner.Err(); err != nil && !errors.Is(err, io.EOF) {
+ logger.LegacyPrintf("service.gateway", "Kimi stream scanner error: %v", err)
+ }
+
+ // Send final [DONE] if not already sent by upstream
+ fmt.Fprintf(c.Writer, "data: [DONE]\n\n") //nolint:errcheck
+ flusher.Flush()
+
+ var firstTokenMs *int
+ if firstTokenTime != nil {
+ ms := int(firstTokenTime.Milliseconds())
+ firstTokenMs = &ms
+ }
+
+ return &ForwardResult{
+ UpstreamModel: upstreamModel,
+ FirstTokenMs: firstTokenMs,
+ }, nil
+}
+
+func (s *GatewayService) handleKimiNonStreamingResponse(
+ resp *http.Response,
+ c *gin.Context,
+ originalModel, mappedModel string,
+ startTime time.Time,
+) (*ForwardResult, error) {
+ body, err := io.ReadAll(io.LimitReader(resp.Body, 50<<20))
+ if err != nil {
+ return nil, fmt.Errorf("read upstream response: %w", err)
+ }
+
+ if c != nil && c.Writer != nil {
+ // Copy upstream headers
+ for key, values := range resp.Header {
+ for _, v := range values {
+ c.Writer.Header().Add(key, v)
+ }
+ }
+ c.Writer.Header().Set("Content-Type", "application/json")
+ c.Writer.WriteHeader(resp.StatusCode)
+ _, _ = c.Writer.Write(body)
+ }
+
+ upstreamModel := gjson.GetBytes(body, "model").String()
+
+ return &ForwardResult{
+ UpstreamModel: upstreamModel,
+ }, nil
+}
+
diff --git a/backend/internal/service/kimi_oauth_service.go b/backend/internal/service/kimi_oauth_service.go
new file mode 100644
index 00000000..5e4960ae
--- /dev/null
+++ b/backend/internal/service/kimi_oauth_service.go
@@ -0,0 +1,303 @@
+package service
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/Wei-Shaw/sub2api/internal/pkg/httpclient"
+ "github.com/Wei-Shaw/sub2api/internal/pkg/logger"
+)
+
+const (
+ kimiAuthBaseURL = "https://auth.kimi.com"
+ kimiDeviceAuthURL = kimiAuthBaseURL + "/api/oauth/device_authorization"
+ kimiTokenURL = kimiAuthBaseURL + "/api/oauth/token"
+ kimiClientID = "17e5f671-d194-4dfb-9706-5516cb48c098"
+ kimiDefaultScope = "kimi-code"
+ KimiTokenSafetyWindow = 300 // 5 minutes
+ KimiTokenMinTTL = 30 // 30 seconds
+)
+
+// KimiDeviceAuthResponse represents the response from the device authorization endpoint.
+type KimiDeviceAuthResponse struct {
+ DeviceCode string `json:"device_code"`
+ UserCode string `json:"user_code"`
+ VerificationURI string `json:"verification_uri"`
+ ExpiresIn int64 `json:"expires_in"`
+ Interval int64 `json:"interval"`
+}
+
+// KimiTokenResponse represents the response from the token endpoint.
+type KimiTokenResponse struct {
+ AccessToken string `json:"access_token"`
+ RefreshToken string `json:"refresh_token"`
+ ExpiresIn int64 `json:"expires_in"`
+ Scope string `json:"scope"`
+ TokenType string `json:"token_type"`
+}
+
+// KimiTokenInfo holds token information for an account.
+type KimiTokenInfo struct {
+ AccessToken string `json:"access_token"`
+ RefreshToken string `json:"refresh_token"`
+ ExpiresIn int64 `json:"expires_in"`
+ ExpiresAt int64 `json:"expires_at"`
+ Scope string `json:"scope,omitempty"`
+ TokenType string `json:"token_type,omitempty"`
+}
+
+// KimiOAuthService handles Kimi OAuth device flow and token refresh.
+type KimiOAuthService struct {
+ sessions sync.Map // device_code -> *kimiOAuthSession
+}
+
+type kimiOAuthSession struct {
+ DeviceCode string
+ UserCode string
+ VerificationURI string
+ ExpiresAt time.Time
+ Interval time.Duration
+ ProxyURL string
+}
+
+// NewKimiOAuthService creates a new KimiOAuthService.
+func NewKimiOAuthService() *KimiOAuthService {
+ return &KimiOAuthService{}
+}
+
+// InitiateDeviceAuth starts the OAuth 2.0 Device Authorization Grant flow.
+func (s *KimiOAuthService) InitiateDeviceAuth(ctx context.Context, proxyID *int64, proxyRepo ProxyRepository) (*KimiDeviceAuthResponse, error) {
+ data := url.Values{}
+ data.Set("client_id", kimiClientID)
+ data.Set("scope", kimiDefaultScope)
+
+ var proxyURL string
+ if proxyID != nil && proxyRepo != nil {
+ if proxy, err := proxyRepo.GetByID(ctx, *proxyID); err == nil && proxy != nil {
+ proxyURL = proxy.URL()
+ }
+ }
+
+ req, err := http.NewRequestWithContext(ctx, "POST", kimiDeviceAuthURL, strings.NewReader(data.Encode()))
+ if err != nil {
+ return nil, fmt.Errorf("failed to create device auth request: %w", err)
+ }
+ req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+ req.Header.Set("Accept", "application/json")
+
+ resp, err := s.doRequest(req, proxyURL)
+ if err != nil {
+ return nil, fmt.Errorf("device auth request failed: %w", err)
+ }
+ defer func() { _ = resp.Body.Close() }()
+
+ body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
+ if err != nil {
+ return nil, fmt.Errorf("failed to read device auth response: %w", err)
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("device auth failed: status=%d body=%s", resp.StatusCode, string(body))
+ }
+
+ var result KimiDeviceAuthResponse
+ if err := json.Unmarshal(body, &result); err != nil {
+ return nil, fmt.Errorf("failed to parse device auth response: %w", err)
+ }
+
+ interval := time.Duration(result.Interval) * time.Second
+ if interval <= 0 {
+ interval = 5 * time.Second
+ }
+
+ session := &kimiOAuthSession{
+ DeviceCode: result.DeviceCode,
+ UserCode: result.UserCode,
+ VerificationURI: result.VerificationURI,
+ ExpiresAt: time.Now().Add(time.Duration(result.ExpiresIn) * time.Second),
+ Interval: interval,
+ ProxyURL: proxyURL,
+ }
+ s.sessions.Store(result.DeviceCode, session)
+
+ logger.LegacyPrintf("service.kimi_oauth", "[KimiOAuth] Device auth initiated: user_code=%s", result.UserCode)
+ return &result, nil
+}
+
+// PollToken polls the token endpoint for a device_code.
+func (s *KimiOAuthService) PollToken(ctx context.Context, deviceCode string) (*KimiTokenInfo, error) {
+ sessionVal, ok := s.sessions.Load(deviceCode)
+ if !ok {
+ return nil, fmt.Errorf("session not found or expired")
+ }
+ session := sessionVal.(*kimiOAuthSession)
+
+ if time.Now().After(session.ExpiresAt) {
+ s.sessions.Delete(deviceCode)
+ return nil, fmt.Errorf("device auth session expired")
+ }
+
+ data := url.Values{}
+ data.Set("client_id", kimiClientID)
+ data.Set("grant_type", "urn:ietf:params:oauth:grant-type:device_code")
+ data.Set("device_code", deviceCode)
+
+ req, err := http.NewRequestWithContext(ctx, "POST", kimiTokenURL, strings.NewReader(data.Encode()))
+ if err != nil {
+ return nil, fmt.Errorf("failed to create token request: %w", err)
+ }
+ req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+ req.Header.Set("Accept", "application/json")
+
+ resp, err := s.doRequest(req, session.ProxyURL)
+ if err != nil {
+ return nil, fmt.Errorf("token request failed: %w", err)
+ }
+ defer func() { _ = resp.Body.Close() }()
+
+ body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
+ if err != nil {
+ return nil, fmt.Errorf("failed to read token response: %w", err)
+ }
+
+ if resp.StatusCode == http.StatusBadRequest || resp.StatusCode == http.StatusForbidden {
+ // Check for authorization_pending
+ var errResp struct {
+ Error string `json:"error"`
+ }
+ if json.Unmarshal(body, &errResp) == nil && errResp.Error == "authorization_pending" {
+ return nil, fmt.Errorf("authorization_pending")
+ }
+ if errResp.Error == "expired_token" || errResp.Error == "invalid_grant" {
+ s.sessions.Delete(deviceCode)
+ }
+ return nil, fmt.Errorf("token poll failed: %s", errResp.Error)
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("token poll failed: status=%d body=%s", resp.StatusCode, string(body))
+ }
+
+ var tokenResp KimiTokenResponse
+ if err := json.Unmarshal(body, &tokenResp); err != nil {
+ return nil, fmt.Errorf("failed to parse token response: %w", err)
+ }
+
+ s.sessions.Delete(deviceCode)
+
+ expiresAt := time.Now().Unix() + tokenResp.ExpiresIn - KimiTokenSafetyWindow
+ minExpiresAt := time.Now().Unix() + KimiTokenMinTTL
+ if expiresAt < minExpiresAt {
+ expiresAt = minExpiresAt
+ }
+
+ logger.LegacyPrintf("service.kimi_oauth", "[KimiOAuth] Token obtained successfully")
+ return &KimiTokenInfo{
+ AccessToken: tokenResp.AccessToken,
+ RefreshToken: tokenResp.RefreshToken,
+ ExpiresIn: tokenResp.ExpiresIn,
+ ExpiresAt: expiresAt,
+ Scope: tokenResp.Scope,
+ TokenType: tokenResp.TokenType,
+ }, nil
+}
+
+// RefreshToken refreshes the access token using a refresh token.
+func (s *KimiOAuthService) RefreshToken(ctx context.Context, refreshToken, proxyURL string) (*KimiTokenInfo, error) {
+ if strings.TrimSpace(refreshToken) == "" {
+ return nil, fmt.Errorf("refresh_token is empty")
+ }
+
+ data := url.Values{}
+ data.Set("client_id", kimiClientID)
+ data.Set("grant_type", "refresh_token")
+ data.Set("refresh_token", refreshToken)
+
+ req, err := http.NewRequestWithContext(ctx, "POST", kimiTokenURL, strings.NewReader(data.Encode()))
+ if err != nil {
+ return nil, fmt.Errorf("failed to create refresh request: %w", err)
+ }
+ req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+ req.Header.Set("Accept", "application/json")
+
+ resp, err := s.doRequest(req, proxyURL)
+ if err != nil {
+ return nil, fmt.Errorf("refresh request failed: %w", err)
+ }
+ defer func() { _ = resp.Body.Close() }()
+
+ body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
+ if err != nil {
+ return nil, fmt.Errorf("failed to read refresh response: %w", err)
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("refresh failed: status=%d body=%s", resp.StatusCode, string(body))
+ }
+
+ var tokenResp KimiTokenResponse
+ if err := json.Unmarshal(body, &tokenResp); err != nil {
+ return nil, fmt.Errorf("failed to parse refresh response: %w", err)
+ }
+
+ expiresAt := time.Now().Unix() + tokenResp.ExpiresIn - KimiTokenSafetyWindow
+ minExpiresAt := time.Now().Unix() + KimiTokenMinTTL
+ if expiresAt < minExpiresAt {
+ expiresAt = minExpiresAt
+ }
+
+ return &KimiTokenInfo{
+ AccessToken: tokenResp.AccessToken,
+ RefreshToken: tokenResp.RefreshToken,
+ ExpiresIn: tokenResp.ExpiresIn,
+ ExpiresAt: expiresAt,
+ Scope: tokenResp.Scope,
+ TokenType: tokenResp.TokenType,
+ }, nil
+}
+
+// BuildAccountCredentials builds the credentials map for a Kimi account.
+func (s *KimiOAuthService) BuildAccountCredentials(tokenInfo *KimiTokenInfo) map[string]any {
+ creds := map[string]any{
+ "access_token": tokenInfo.AccessToken,
+ "expires_at": strconv.FormatInt(tokenInfo.ExpiresAt, 10),
+ }
+ if tokenInfo.RefreshToken != "" {
+ creds["refresh_token"] = tokenInfo.RefreshToken
+ }
+ if tokenInfo.TokenType != "" {
+ creds["token_type"] = tokenInfo.TokenType
+ }
+ if tokenInfo.Scope != "" {
+ creds["scope"] = tokenInfo.Scope
+ }
+ return creds
+}
+
+// Stop cleans up the service.
+func (s *KimiOAuthService) Stop() {
+ s.sessions.Range(func(key, value any) bool {
+ s.sessions.Delete(key)
+ return true
+ })
+}
+
+func (s *KimiOAuthService) doRequest(req *http.Request, proxyURL string) (*http.Response, error) {
+ opts := httpclient.Options{
+ ProxyURL: proxyURL,
+ Timeout: 30 * time.Second,
+ }
+ client, err := httpclient.GetClient(opts)
+ if err != nil {
+ return nil, err
+ }
+ return client.Do(req)
+}
diff --git a/backend/internal/service/kimi_token_provider.go b/backend/internal/service/kimi_token_provider.go
new file mode 100644
index 00000000..4063769a
--- /dev/null
+++ b/backend/internal/service/kimi_token_provider.go
@@ -0,0 +1,198 @@
+package service
+
+import (
+ "context"
+ "errors"
+ "log/slog"
+ "strconv"
+ "strings"
+ "time"
+)
+
+const (
+ kimiTokenRefreshSkew = 3 * time.Minute
+ kimiTokenCacheSkew = 5 * time.Minute
+)
+
+// KimiTokenProvider manages access_token for Kimi OAuth accounts.
+type KimiTokenProvider struct {
+ accountRepo AccountRepository
+ tokenCache GeminiTokenCache
+ kimiOAuthService *KimiOAuthService
+ refreshAPI *OAuthRefreshAPI
+ executor OAuthRefreshExecutor
+ refreshPolicy ProviderRefreshPolicy
+}
+
+// NewKimiTokenProvider creates a new KimiTokenProvider.
+func NewKimiTokenProvider(
+ accountRepo AccountRepository,
+ tokenCache GeminiTokenCache,
+ kimiOAuthService *KimiOAuthService,
+) *KimiTokenProvider {
+ return &KimiTokenProvider{
+ accountRepo: accountRepo,
+ tokenCache: tokenCache,
+ kimiOAuthService: kimiOAuthService,
+ refreshPolicy: KimiProviderRefreshPolicy(),
+ }
+}
+
+// SetRefreshAPI injects unified OAuth refresh API and executor.
+func (p *KimiTokenProvider) SetRefreshAPI(api *OAuthRefreshAPI, executor OAuthRefreshExecutor) {
+ p.refreshAPI = api
+ p.executor = executor
+}
+
+// SetRefreshPolicy injects caller-side refresh policy.
+func (p *KimiTokenProvider) SetRefreshPolicy(policy ProviderRefreshPolicy) {
+ p.refreshPolicy = policy
+}
+
+// GetAccessToken returns a valid access token for the given Kimi account.
+func (p *KimiTokenProvider) GetAccessToken(ctx context.Context, account *Account) (string, error) {
+ if account == nil {
+ return "", errors.New("account is nil")
+ }
+ if account.Platform != PlatformKimi || account.Type != AccountTypeOAuth {
+ return "", errors.New("not a kimi oauth account")
+ }
+
+ cacheKey := KimiTokenCacheKey(account)
+
+ // 1) Try cache first.
+ if p.tokenCache != nil {
+ if token, err := p.tokenCache.GetAccessToken(ctx, cacheKey); err == nil && strings.TrimSpace(token) != "" {
+ return token, nil
+ }
+ }
+
+ // 2) Refresh if needed (pre-expiry skew).
+ expiresAt := account.GetCredentialAsTime("expires_at")
+ needsRefresh := expiresAt == nil || time.Until(*expiresAt) <= kimiTokenRefreshSkew
+
+ if needsRefresh && p.refreshAPI != nil && p.executor != nil {
+ result, err := p.refreshAPI.RefreshIfNeeded(ctx, account, p.executor, kimiTokenRefreshSkew)
+ if err != nil {
+ if p.refreshPolicy.OnRefreshError == ProviderRefreshErrorReturn {
+ return "", err
+ }
+ } else if result.LockHeld {
+ if p.refreshPolicy.OnLockHeld == ProviderLockHeldWaitForCache && p.tokenCache != nil {
+ if token, cacheErr := p.tokenCache.GetAccessToken(ctx, cacheKey); cacheErr == nil && strings.TrimSpace(token) != "" {
+ return token, nil
+ }
+ }
+ slog.Debug("kimi_token_lock_held_use_old", "account_id", account.ID)
+ } else {
+ account = result.Account
+ expiresAt = account.GetCredentialAsTime("expires_at")
+ }
+ } else if needsRefresh && p.tokenCache != nil {
+ locked, lockErr := p.tokenCache.AcquireRefreshLock(ctx, cacheKey, 30*time.Second)
+ if lockErr == nil && locked {
+ defer func() { _ = p.tokenCache.ReleaseRefreshLock(ctx, cacheKey) }()
+ } else if lockErr != nil {
+ slog.Warn("kimi_token_lock_failed", "account_id", account.ID, "error", lockErr)
+ }
+ }
+
+ accessToken := account.GetCredential("access_token")
+ if strings.TrimSpace(accessToken) == "" {
+ return "", errors.New("access_token not found in credentials")
+ }
+
+ // 3) Populate cache with TTL.
+ if p.tokenCache != nil {
+ latestAccount, isStale := CheckTokenVersion(ctx, account, p.accountRepo)
+ if isStale && latestAccount != nil {
+ slog.Debug("kimi_token_version_stale_use_latest", "account_id", account.ID)
+ accessToken = latestAccount.GetCredential("access_token")
+ if strings.TrimSpace(accessToken) == "" {
+ return "", errors.New("access_token not found after version check")
+ }
+ } else {
+ ttl := 30 * time.Minute
+ if expiresAt != nil {
+ until := time.Until(*expiresAt)
+ switch {
+ case until > kimiTokenCacheSkew:
+ ttl = until - kimiTokenCacheSkew
+ case until > 0:
+ ttl = until
+ default:
+ ttl = time.Minute
+ }
+ }
+ _ = p.tokenCache.SetAccessToken(ctx, cacheKey, accessToken, ttl)
+ }
+ }
+
+ return accessToken, nil
+}
+
+// KimiTokenCacheKey returns the cache key for a Kimi account.
+func KimiTokenCacheKey(account *Account) string {
+ return "kimi:account:" + strconv.FormatInt(account.ID, 10)
+}
+
+// KimiProviderRefreshPolicy returns the default refresh policy for Kimi.
+func KimiProviderRefreshPolicy() ProviderRefreshPolicy {
+ return ProviderRefreshPolicy{
+ OnRefreshError: ProviderRefreshErrorReturn,
+ OnLockHeld: ProviderLockHeldWaitForCache,
+ }
+}
+
+// KimiTokenRefresher implements OAuthRefreshExecutor for Kimi.
+type KimiTokenRefresher struct {
+ kimiOAuthService *KimiOAuthService
+}
+
+// NewKimiTokenRefresher creates a new KimiTokenRefresher.
+func NewKimiTokenRefresher(kimiOAuthService *KimiOAuthService) *KimiTokenRefresher {
+ return &KimiTokenRefresher{kimiOAuthService: kimiOAuthService}
+}
+
+// CanRefresh checks if this refresher can handle the given account.
+func (r *KimiTokenRefresher) CanRefresh(account *Account) bool {
+ return account != nil && account.Platform == PlatformKimi && account.Type == AccountTypeOAuth
+}
+
+// NeedsRefresh checks if the account needs token refresh.
+func (r *KimiTokenRefresher) NeedsRefresh(account *Account, refreshWindow time.Duration) bool {
+ if account == nil {
+ return false
+ }
+ expiresAt := account.GetCredentialAsTime("expires_at")
+ return expiresAt == nil || time.Until(*expiresAt) <= refreshWindow
+}
+
+// Refresh performs the token refresh for a Kimi account.
+func (r *KimiTokenRefresher) Refresh(ctx context.Context, account *Account) (map[string]any, error) {
+ if account == nil || r.kimiOAuthService == nil {
+ return nil, errors.New("kimi token refresher not initialized")
+ }
+
+ refreshToken := account.GetCredential("refresh_token")
+ if strings.TrimSpace(refreshToken) == "" {
+ return nil, errors.New("no refresh token available")
+ }
+
+ var proxyURL string
+ if account.ProxyID != nil && account.Proxy != nil {
+ proxyURL = account.Proxy.URL()
+ }
+
+ tokenInfo, err := r.kimiOAuthService.RefreshToken(ctx, refreshToken, proxyURL)
+ if err != nil {
+ return nil, err
+ }
+
+ return r.kimiOAuthService.BuildAccountCredentials(tokenInfo), nil
+}
+
+// CacheKey returns the cache key for distributed locking.
+func (r *KimiTokenRefresher) CacheKey(account *Account) string {
+ return KimiTokenCacheKey(account)
+}
diff --git a/backend/internal/service/token_cache_invalidator.go b/backend/internal/service/token_cache_invalidator.go
index 74c9edc3..15b99182 100644
--- a/backend/internal/service/token_cache_invalidator.go
+++ b/backend/internal/service/token_cache_invalidator.go
@@ -46,6 +46,9 @@ func (c *CompositeTokenCacheInvalidator) InvalidateToken(ctx context.Context, ac
keysToDelete = append(keysToDelete, OpenAITokenCacheKey(account))
case PlatformAnthropic:
keysToDelete = append(keysToDelete, ClaudeTokenCacheKey(account))
+ case PlatformKimi:
+ keysToDelete = append(keysToDelete, KimiTokenCacheKey(account))
+ keysToDelete = append(keysToDelete, "kimi:"+accountIDKey)
default:
return nil
}
diff --git a/backend/internal/service/token_refresh_service.go b/backend/internal/service/token_refresh_service.go
index 22f4aa29..5dc8bb26 100644
--- a/backend/internal/service/token_refresh_service.go
+++ b/backend/internal/service/token_refresh_service.go
@@ -84,6 +84,16 @@ func NewTokenRefreshService(
return s
}
+// SetKimiOAuthService 注入 Kimi OAuth 服务并注册刷新器
+func (s *TokenRefreshService) SetKimiOAuthService(kimiOAuthService *KimiOAuthService) {
+ if kimiOAuthService == nil {
+ return
+ }
+ kimiRefresher := NewKimiTokenRefresher(kimiOAuthService)
+ s.refreshers = append(s.refreshers, kimiRefresher)
+ s.executors = append(s.executors, kimiRefresher)
+}
+
// SetPrivacyDeps 注入 OpenAI privacy opt-out 所需依赖
func (s *TokenRefreshService) SetPrivacyDeps(factory PrivacyClientFactory, proxyRepo ProxyRepository) {
s.privacyClientFactory = factory
diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go
index 9f33c46a..502a69ce 100644
--- a/backend/internal/service/wire.go
+++ b/backend/internal/service/wire.go
@@ -46,6 +46,7 @@ func ProvideTokenRefreshService(
openaiOAuthService *OpenAIOAuthService,
geminiOAuthService *GeminiOAuthService,
antigravityOAuthService *AntigravityOAuthService,
+ kimiOAuthService *KimiOAuthService,
cacheInvalidator TokenCacheInvalidator,
schedulerCache SchedulerCache,
cfg *config.Config,
@@ -55,6 +56,8 @@ func ProvideTokenRefreshService(
refreshAPI *OAuthRefreshAPI,
) *TokenRefreshService {
svc := NewTokenRefreshService(accountRepo, oauthService, openaiOAuthService, geminiOAuthService, antigravityOAuthService, cacheInvalidator, schedulerCache, cfg, tempUnschedCache)
+ // 注入 Kimi OAuth 服务
+ svc.SetKimiOAuthService(kimiOAuthService)
// 注入 OpenAI privacy opt-out 依赖
svc.SetPrivacyDeps(privacyClientFactory, proxyRepo)
// 注入统一 OAuth 刷新 API(消除 TokenRefreshService 与 TokenProvider 之间的竞争条件)
@@ -123,6 +126,20 @@ func ProvideAntigravityTokenProvider(
return p
}
+// ProvideKimiTokenProvider creates KimiTokenProvider with OAuthRefreshAPI injection
+func ProvideKimiTokenProvider(
+ accountRepo AccountRepository,
+ tokenCache GeminiTokenCache,
+ kimiOAuthService *KimiOAuthService,
+ refreshAPI *OAuthRefreshAPI,
+) *KimiTokenProvider {
+ p := NewKimiTokenProvider(accountRepo, tokenCache, kimiOAuthService)
+ executor := NewKimiTokenRefresher(kimiOAuthService)
+ p.SetRefreshAPI(refreshAPI, executor)
+ p.SetRefreshPolicy(KimiProviderRefreshPolicy())
+ return p
+}
+
// ProvideDashboardAggregationService 创建并启动仪表盘聚合服务
func ProvideDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService {
svc := NewDashboardAggregationService(repo, timingWheel, cfg)
@@ -409,8 +426,10 @@ var ProviderSet = wire.NewSet(
NewCompositeTokenCacheInvalidator,
wire.Bind(new(TokenCacheInvalidator), new(*CompositeTokenCacheInvalidator)),
NewAntigravityOAuthService,
+ NewKimiOAuthService,
NewOAuthRefreshAPI,
ProvideGeminiTokenProvider,
+ ProvideKimiTokenProvider,
NewGeminiMessagesCompatService,
ProvideAntigravityTokenProvider,
ProvideOpenAITokenProvider,
diff --git a/frontend/src/components/account/CreateAccountModal.vue b/frontend/src/components/account/CreateAccountModal.vue
index 2130c9ab..92c82f76 100644
--- a/frontend/src/components/account/CreateAccountModal.vue
+++ b/frontend/src/components/account/CreateAccountModal.vue
@@ -147,6 +147,21 @@
+ {{ t('admin.accounts.oauth.kimi.deviceFlowDesc') }} +
+ + ++ {{ kimiOAuth.error.value }} +
++ {{ t('admin.accounts.oauth.kimi.step1OpenLink') }} +
+
+ {{ kimiOAuth.userCode.value }}
+
+ + {{ t('admin.accounts.oauth.kimi.step2Wait') }} +
++ {{ kimiOAuth.error.value }} +
+