From af3efbc525c5f82588994c5046ac2075b3b9e721 Mon Sep 17 00:00:00 2001 From: openclaw Date: Thu, 23 Apr 2026 02:48:44 +0800 Subject: [PATCH] feat(kimi): add Kimi Code OAuth device flow support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backend: - Add Kimi OAuth service (InitiateDeviceAuth, PollToken, RefreshToken) - Add Kimi token provider with caching and auto-refresh - Add Kimi gateway service for OpenAI-compatible chat completions - Add admin routes for /kimi/oauth/* endpoints - Wire up all new services - Add PlatformKimi to error passthrough rule constants - Add 'kimi' to group handler platform validation tags - Fix httpclient usage in Kimi OAuth service (use GetClient with opts) - Add CanRefresh method to KimiTokenRefresher - Remove duplicate ProvideKimiTokenProvider from wire ProviderSet - Remove unused imports - Lower go.mod requirement to 1.26.1 for local toolchain compatibility - Add verification_uri_complete support for seamless device auth Frontend: - Add useKimiOAuth composable for device flow - Integrate Kimi device flow UI in CreateAccountModal Step 2 - Add kimi to GroupPlatform and AccountPlatform types - Add i18n translations for device flow (zh/en) - Fix Icon name lock-closed -> key - Add kimi platform colors/styles to platformColors.ts - Add kimi to AccountTableFilters platform options - Add kimi to ChannelsView platformOrder - Use verification_uri_complete for auto-filled user_code [宪宪/Kimi-1.6🐾] --- backend/cmd/server/wire.go | 5 + backend/cmd/server/wire_gen.go | 9 +- backend/go.mod | 2 +- backend/internal/domain/constants.go | 1 + .../internal/handler/admin/group_handler.go | 4 +- .../handler/admin/kimi_oauth_handler.go | 182 +++++++++ .../gateway_handler_chat_completions.go | 7 +- backend/internal/handler/handler.go | 1 + backend/internal/handler/wire.go | 3 + .../internal/model/error_passthrough_rule.go | 3 +- backend/internal/server/routes/admin.go | 13 + backend/internal/service/domain_constants.go | 1 + backend/internal/service/gateway_service.go | 12 + .../internal/service/kimi_gateway_service.go | 251 +++++++++++++ .../internal/service/kimi_oauth_service.go | 306 +++++++++++++++ .../internal/service/kimi_token_provider.go | 198 ++++++++++ .../service/token_cache_invalidator.go | 3 + .../internal/service/token_refresh_service.go | 10 + backend/internal/service/wire.go | 19 + .../components/account/CreateAccountModal.vue | 352 +++++++++++++++++- .../admin/account/AccountTableFilters.vue | 2 +- frontend/src/composables/useKimiOAuth.ts | 196 ++++++++++ frontend/src/i18n/locales/en.ts | 9 + frontend/src/i18n/locales/zh.ts | 9 + frontend/src/types/index.ts | 4 +- frontend/src/utils/platformColors.ts | 16 +- frontend/src/views/admin/ChannelsView.vue | 2 +- 27 files changed, 1603 insertions(+), 17 deletions(-) create mode 100644 backend/internal/handler/admin/kimi_oauth_handler.go create mode 100644 backend/internal/service/kimi_gateway_service.go create mode 100644 backend/internal/service/kimi_oauth_service.go create mode 100644 backend/internal/service/kimi_token_provider.go create mode 100644 frontend/src/composables/useKimiOAuth.ts diff --git a/backend/cmd/server/wire.go b/backend/cmd/server/wire.go index 64709b5b..53e4777e 100644 --- a/backend/cmd/server/wire.go +++ b/backend/cmd/server/wire.go @@ -92,6 +92,7 @@ func provideCleanup( oauth *service.OAuthService, openaiOAuth *service.OpenAIOAuthService, geminiOAuth *service.GeminiOAuthService, + kimiOAuth *service.KimiOAuthService, antigravityOAuth *service.AntigravityOAuthService, openAIGateway *service.OpenAIGatewayService, scheduledTestRunner *service.ScheduledTestRunnerService, @@ -211,6 +212,10 @@ func provideCleanup( geminiOAuth.Stop() return nil }}, + {"KimiOAuthService", func() error { + kimiOAuth.Stop() + return nil + }}, {"AntigravityOAuthService", func() error { antigravityOAuth.Stop() return nil diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 3b474c4a..0f13dcfa 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -121,6 +121,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { driveClient := repository.NewGeminiDriveClient() geminiOAuthService := service.NewGeminiOAuthService(proxyRepository, geminiOAuthClient, geminiCliCodeAssistClient, driveClient, configConfig) antigravityOAuthService := service.NewAntigravityOAuthService(proxyRepository) + kimiOAuthService := service.NewKimiOAuthService() geminiQuotaService := service.NewGeminiQuotaService(configConfig, settingRepository) tempUnschedCache := repository.NewTempUnschedCache(redisClient) timeoutCounterCache := repository.NewTimeoutCounterCache(redisClient) @@ -158,6 +159,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { openAIOAuthHandler := admin.NewOpenAIOAuthHandler(openAIOAuthService, adminService) geminiOAuthHandler := admin.NewGeminiOAuthHandler(geminiOAuthService) antigravityOAuthHandler := admin.NewAntigravityOAuthHandler(antigravityOAuthService) + kimiOAuthHandler := admin.NewKimiOAuthHandler(kimiOAuthService, proxyRepository) proxyHandler := admin.NewProxyHandler(adminService) adminRedeemHandler := admin.NewRedeemHandler(adminService, redeemService) promoHandler := admin.NewPromoHandler(promoService) @@ -172,12 +174,13 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { identityService := service.NewIdentityService(identityCache) deferredService := service.ProvideDeferredService(accountRepository, timingWheelService) claudeTokenProvider := service.ProvideClaudeTokenProvider(accountRepository, geminiTokenCache, oAuthService, oAuthRefreshAPI) + kimiTokenProvider := service.ProvideKimiTokenProvider(accountRepository, geminiTokenCache, kimiOAuthService, oAuthRefreshAPI) digestSessionStore := service.NewDigestSessionStore() channelRepository := repository.NewChannelRepository(db) channelService := service.NewChannelService(channelRepository, apiKeyAuthCacheInvalidator) modelPricingResolver := service.NewModelPricingResolver(channelService, billingService) balanceNotifyService := service.ProvideBalanceNotifyService(emailService, settingRepository, accountRepository) - gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, usageBillingRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService, claudeTokenProvider, sessionLimitCache, rpmCache, digestSessionStore, settingService, tlsFingerprintProfileService, channelService, modelPricingResolver, balanceNotifyService) + gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, usageBillingRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService, claudeTokenProvider, kimiTokenProvider, sessionLimitCache, rpmCache, digestSessionStore, settingService, tlsFingerprintProfileService, channelService, modelPricingResolver, balanceNotifyService) openAITokenProvider := service.ProvideOpenAITokenProvider(accountRepository, geminiTokenCache, openAIOAuthService, oAuthRefreshAPI) openAIGatewayService := service.NewOpenAIGatewayService(accountRepository, usageLogRepository, usageBillingRepository, userRepository, userSubscriptionRepository, userGroupRateRepository, gatewayCache, configConfig, schedulerSnapshotService, concurrencyService, billingService, rateLimitService, billingCacheService, httpUpstream, deferredService, openAITokenProvider, modelPricingResolver, channelService, balanceNotifyService) geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, schedulerSnapshotService, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig) @@ -221,7 +224,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { settingHandler := admin.NewSettingHandler(settingService, emailService, turnstileService, opsService, paymentConfigService, paymentService) paymentOrderExpiryService := service.ProvidePaymentOrderExpiryService(paymentService) paymentHandler := admin.NewPaymentHandler(paymentService, paymentConfigService) - adminHandlers := handler.ProvideAdminHandlers(dashboardHandler, adminUserHandler, groupHandler, accountHandler, adminAnnouncementHandler, dataManagementHandler, backupHandler, oAuthHandler, openAIOAuthHandler, geminiOAuthHandler, antigravityOAuthHandler, proxyHandler, adminRedeemHandler, promoHandler, settingHandler, opsHandler, systemHandler, adminSubscriptionHandler, adminUsageHandler, userAttributeHandler, errorPassthroughHandler, tlsFingerprintProfileHandler, adminAPIKeyHandler, scheduledTestHandler, channelHandler, paymentHandler) + adminHandlers := handler.ProvideAdminHandlers(dashboardHandler, adminUserHandler, groupHandler, accountHandler, adminAnnouncementHandler, dataManagementHandler, backupHandler, oAuthHandler, openAIOAuthHandler, geminiOAuthHandler, antigravityOAuthHandler, kimiOAuthHandler, proxyHandler, adminRedeemHandler, promoHandler, settingHandler, opsHandler, systemHandler, adminSubscriptionHandler, adminUsageHandler, userAttributeHandler, errorPassthroughHandler, tlsFingerprintProfileHandler, adminAPIKeyHandler, scheduledTestHandler, channelHandler, paymentHandler) usageRecordWorkerPool := service.NewUsageRecordWorkerPool(configConfig) userMsgQueueCache := repository.NewUserMsgQueueCache(redisClient) userMessageQueueService := service.ProvideUserMessageQueueService(userMsgQueueCache, rpmCache, configConfig) @@ -244,7 +247,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { opsAlertEvaluatorService := service.ProvideOpsAlertEvaluatorService(opsService, opsRepository, emailService, redisClient, configConfig) opsCleanupService := service.ProvideOpsCleanupService(opsRepository, db, redisClient, configConfig) opsScheduledReportService := service.ProvideOpsScheduledReportService(opsService, userService, emailService, redisClient, configConfig) - tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, schedulerCache, configConfig, tempUnschedCache, privacyClientFactory, proxyRepository, oAuthRefreshAPI) + tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, kimiOAuthService, compositeTokenCacheInvalidator, schedulerCache, configConfig, tempUnschedCache, privacyClientFactory, proxyRepository, oAuthRefreshAPI) accountExpiryService := service.ProvideAccountExpiryService(accountRepository) subscriptionExpiryService := service.ProvideSubscriptionExpiryService(userSubscriptionRepository) scheduledTestRunnerService := service.ProvideScheduledTestRunnerService(scheduledTestPlanRepository, scheduledTestService, accountTestService, rateLimitService, configConfig) diff --git a/backend/go.mod b/backend/go.mod index 627851bf..ebf8d1c2 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -1,6 +1,6 @@ module github.com/Wei-Shaw/sub2api -go 1.26.2 +go 1.26.1 require ( entgo.io/ent v0.14.5 diff --git a/backend/internal/domain/constants.go b/backend/internal/domain/constants.go index a57f7067..8863ffb5 100644 --- a/backend/internal/domain/constants.go +++ b/backend/internal/domain/constants.go @@ -22,6 +22,7 @@ const ( PlatformOpenAI = "openai" PlatformGemini = "gemini" PlatformAntigravity = "antigravity" + PlatformKimi = "kimi" ) // Account type constants diff --git a/backend/internal/handler/admin/group_handler.go b/backend/internal/handler/admin/group_handler.go index cb2bd201..93260bc9 100644 --- a/backend/internal/handler/admin/group_handler.go +++ b/backend/internal/handler/admin/group_handler.go @@ -84,7 +84,7 @@ func NewGroupHandler(adminService service.AdminService, dashboardService *servic type CreateGroupRequest struct { Name string `json:"name" binding:"required"` Description string `json:"description"` - Platform string `json:"platform" binding:"omitempty,oneof=anthropic openai gemini antigravity"` + Platform string `json:"platform" binding:"omitempty,oneof=anthropic openai gemini antigravity kimi"` RateMultiplier float64 `json:"rate_multiplier"` IsExclusive bool `json:"is_exclusive"` SubscriptionType string `json:"subscription_type" binding:"omitempty,oneof=standard subscription"` @@ -118,7 +118,7 @@ type CreateGroupRequest struct { type UpdateGroupRequest struct { Name string `json:"name"` Description string `json:"description"` - Platform string `json:"platform" binding:"omitempty,oneof=anthropic openai gemini antigravity"` + Platform string `json:"platform" binding:"omitempty,oneof=anthropic openai gemini antigravity kimi"` RateMultiplier *float64 `json:"rate_multiplier"` IsExclusive *bool `json:"is_exclusive"` Status string `json:"status" binding:"omitempty,oneof=active inactive"` diff --git a/backend/internal/handler/admin/kimi_oauth_handler.go b/backend/internal/handler/admin/kimi_oauth_handler.go new file mode 100644 index 00000000..5ef69c9f --- /dev/null +++ b/backend/internal/handler/admin/kimi_oauth_handler.go @@ -0,0 +1,182 @@ +package admin + +import ( + "encoding/json" + "strings" + "time" + + "github.com/Wei-Shaw/sub2api/internal/pkg/response" + "github.com/Wei-Shaw/sub2api/internal/service" + + "github.com/gin-gonic/gin" +) + +// KimiOAuthHandler handles Kimi OAuth admin endpoints. +type KimiOAuthHandler struct { + kimiOAuthService *service.KimiOAuthService + proxyRepo service.ProxyRepository +} + +// NewKimiOAuthHandler creates a new KimiOAuthHandler. +func NewKimiOAuthHandler(kimiOAuthService *service.KimiOAuthService, proxyRepo service.ProxyRepository) *KimiOAuthHandler { + return &KimiOAuthHandler{ + kimiOAuthService: kimiOAuthService, + proxyRepo: proxyRepo, + } +} + +type kimiDeviceAuthRequest struct { + ProxyID *int64 `json:"proxy_id"` +} + +// InitiateDeviceAuth starts the Kimi OAuth device authorization flow. +// POST /api/v1/admin/kimi/oauth/device-auth +func (h *KimiOAuthHandler) InitiateDeviceAuth(c *gin.Context) { + var req kimiDeviceAuthRequest + if err := c.ShouldBindJSON(&req); err != nil { + response.BadRequest(c, "Invalid request: "+err.Error()) + return + } + + result, err := h.kimiOAuthService.InitiateDeviceAuth(c.Request.Context(), req.ProxyID, h.proxyRepo) + if err != nil { + response.InternalError(c, "Failed to initiate device auth: "+err.Error()) + return + } + + response.Success(c, gin.H{ + "device_code": result.DeviceCode, + "user_code": result.UserCode, + "verification_uri": result.VerificationURI, + "verification_uri_complete": result.VerificationURIComplete, + "expires_in": result.ExpiresIn, + "interval": result.Interval, + }) +} + +type kimiPollTokenRequest struct { + DeviceCode string `json:"device_code" binding:"required"` +} + +// PollToken polls for the OAuth token after device authorization. +// POST /api/v1/admin/kimi/oauth/token +func (h *KimiOAuthHandler) PollToken(c *gin.Context) { + var req kimiPollTokenRequest + if err := c.ShouldBindJSON(&req); err != nil { + response.BadRequest(c, "Invalid request: "+err.Error()) + return + } + + tokenInfo, err := h.kimiOAuthService.PollToken(c.Request.Context(), strings.TrimSpace(req.DeviceCode)) + if err != nil { + msg := err.Error() + if strings.Contains(msg, "authorization_pending") { + response.Success(c, gin.H{ + "status": "pending", + "message": "Authorization pending, please complete authorization in the browser", + }) + return + } + if strings.Contains(msg, "expired_token") || strings.Contains(msg, "invalid_grant") { + response.BadRequest(c, "Token request failed: "+msg) + return + } + response.InternalError(c, "Failed to poll token: "+msg) + return + } + + response.Success(c, gin.H{ + "status": "completed", + "access_token": tokenInfo.AccessToken, + "refresh_token": tokenInfo.RefreshToken, + "expires_in": tokenInfo.ExpiresIn, + "expires_at": tokenInfo.ExpiresAt, + "scope": tokenInfo.Scope, + "token_type": tokenInfo.TokenType, + }) +} + +type kimiRefreshTokenRequest struct { + RefreshToken string `json:"refresh_token" binding:"required"` + ProxyID *int64 `json:"proxy_id"` +} + +// RefreshToken refreshes the Kimi access token. +// POST /api/v1/admin/kimi/oauth/refresh +func (h *KimiOAuthHandler) RefreshToken(c *gin.Context) { + var req kimiRefreshTokenRequest + if err := c.ShouldBindJSON(&req); err != nil { + response.BadRequest(c, "Invalid request: "+err.Error()) + return + } + + var proxyURL string + if req.ProxyID != nil && h.proxyRepo != nil { + if proxy, err := h.proxyRepo.GetByID(c.Request.Context(), *req.ProxyID); err == nil && proxy != nil { + proxyURL = proxy.URL() + } + } + + tokenInfo, err := h.kimiOAuthService.RefreshToken(c.Request.Context(), strings.TrimSpace(req.RefreshToken), proxyURL) + if err != nil { + response.BadRequest(c, "Failed to refresh token: "+err.Error()) + return + } + + response.Success(c, gin.H{ + "access_token": tokenInfo.AccessToken, + "refresh_token": tokenInfo.RefreshToken, + "expires_in": tokenInfo.ExpiresIn, + "expires_at": tokenInfo.ExpiresAt, + "scope": tokenInfo.Scope, + "token_type": tokenInfo.TokenType, + }) +} + +// ImportLocalToken imports token from local kimi-cli credentials. +// POST /api/v1/admin/kimi/oauth/import-local +func (h *KimiOAuthHandler) ImportLocalToken(c *gin.Context) { + var req struct { + CredentialsJSON string `json:"credentials_json" binding:"required"` + } + if err := c.ShouldBindJSON(&req); err != nil { + response.BadRequest(c, "Invalid request: "+err.Error()) + return + } + + // Parse the local credentials JSON format + var localCreds struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` + ExpiresAt float64 `json:"expires_at"` + Scope string `json:"scope"` + TokenType string `json:"token_type"` + ExpiresIn float64 `json:"expires_in"` + } + if err := json.Unmarshal([]byte(req.CredentialsJSON), &localCreds); err != nil { + response.BadRequest(c, "Invalid credentials JSON: "+err.Error()) + return + } + + if strings.TrimSpace(localCreds.AccessToken) == "" { + response.BadRequest(c, "access_token is required in credentials") + return + } + + expiresAt := int64(localCreds.ExpiresAt) + if expiresAt == 0 && localCreds.ExpiresIn > 0 { + expiresAt = time.Now().Unix() + int64(localCreds.ExpiresIn) - service.KimiTokenSafetyWindow + } + minExpiresAt := time.Now().Unix() + service.KimiTokenMinTTL + if expiresAt < minExpiresAt { + expiresAt = minExpiresAt + } + + response.Success(c, gin.H{ + "access_token": localCreds.AccessToken, + "refresh_token": localCreds.RefreshToken, + "expires_at": expiresAt, + "scope": localCreds.Scope, + "token_type": localCreds.TokenType, + }) +} diff --git a/backend/internal/handler/gateway_handler_chat_completions.go b/backend/internal/handler/gateway_handler_chat_completions.go index be267332..430cc359 100644 --- a/backend/internal/handler/gateway_handler_chat_completions.go +++ b/backend/internal/handler/gateway_handler_chat_completions.go @@ -210,7 +210,12 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) { if channelMapping.Mapped { forwardBody = h.gatewayService.ReplaceModelInBody(body, channelMapping.MappedModel) } - result, err := h.gatewayService.ForwardAsChatCompletions(c.Request.Context(), c, account, forwardBody, parsedReq) + var result *service.ForwardResult + if account.Platform == service.PlatformKimi { + result, err = h.gatewayService.ForwardKimiChatCompletions(c.Request.Context(), c, account, forwardBody) + } else { + result, err = h.gatewayService.ForwardAsChatCompletions(c.Request.Context(), c, account, forwardBody, parsedReq) + } if accountReleaseFunc != nil { accountReleaseFunc() diff --git a/backend/internal/handler/handler.go b/backend/internal/handler/handler.go index 906a74f1..22dd5b22 100644 --- a/backend/internal/handler/handler.go +++ b/backend/internal/handler/handler.go @@ -17,6 +17,7 @@ type AdminHandlers struct { OpenAIOAuth *admin.OpenAIOAuthHandler GeminiOAuth *admin.GeminiOAuthHandler AntigravityOAuth *admin.AntigravityOAuthHandler + KimiOAuth *admin.KimiOAuthHandler Proxy *admin.ProxyHandler Redeem *admin.RedeemHandler Promo *admin.PromoHandler diff --git a/backend/internal/handler/wire.go b/backend/internal/handler/wire.go index 4b54d41a..65efd7a3 100644 --- a/backend/internal/handler/wire.go +++ b/backend/internal/handler/wire.go @@ -20,6 +20,7 @@ func ProvideAdminHandlers( openaiOAuthHandler *admin.OpenAIOAuthHandler, geminiOAuthHandler *admin.GeminiOAuthHandler, antigravityOAuthHandler *admin.AntigravityOAuthHandler, + kimiOAuthHandler *admin.KimiOAuthHandler, proxyHandler *admin.ProxyHandler, redeemHandler *admin.RedeemHandler, promoHandler *admin.PromoHandler, @@ -48,6 +49,7 @@ func ProvideAdminHandlers( OpenAIOAuth: openaiOAuthHandler, GeminiOAuth: geminiOAuthHandler, AntigravityOAuth: antigravityOAuthHandler, + KimiOAuth: kimiOAuthHandler, Proxy: proxyHandler, Redeem: redeemHandler, Promo: promoHandler, @@ -142,6 +144,7 @@ var ProviderSet = wire.NewSet( admin.NewOpenAIOAuthHandler, admin.NewGeminiOAuthHandler, admin.NewAntigravityOAuthHandler, + admin.NewKimiOAuthHandler, admin.NewProxyHandler, admin.NewRedeemHandler, admin.NewPromoHandler, diff --git a/backend/internal/model/error_passthrough_rule.go b/backend/internal/model/error_passthrough_rule.go index 620736cd..f72afc38 100644 --- a/backend/internal/model/error_passthrough_rule.go +++ b/backend/internal/model/error_passthrough_rule.go @@ -36,11 +36,12 @@ const ( PlatformOpenAI = "openai" PlatformGemini = "gemini" PlatformAntigravity = "antigravity" + PlatformKimi = "kimi" ) // AllPlatforms 返回所有支持的平台列表 func AllPlatforms() []string { - return []string{PlatformAnthropic, PlatformOpenAI, PlatformGemini, PlatformAntigravity} + return []string{PlatformAnthropic, PlatformOpenAI, PlatformGemini, PlatformAntigravity, PlatformKimi} } // Validate 验证规则配置的有效性 diff --git a/backend/internal/server/routes/admin.go b/backend/internal/server/routes/admin.go index 84c963ec..0ec6b7ca 100644 --- a/backend/internal/server/routes/admin.go +++ b/backend/internal/server/routes/admin.go @@ -41,6 +41,9 @@ func RegisterAdminRoutes( // Antigravity OAuth registerAntigravityOAuthRoutes(admin, h) + // Kimi OAuth + registerKimiOAuthRoutes(admin, h) + // 代理管理 registerProxyRoutes(admin, h) @@ -338,6 +341,16 @@ func registerAntigravityOAuthRoutes(admin *gin.RouterGroup, h *handler.Handlers) } } +func registerKimiOAuthRoutes(admin *gin.RouterGroup, h *handler.Handlers) { + kimi := admin.Group("/kimi") + { + kimi.POST("/oauth/device-auth", h.Admin.KimiOAuth.InitiateDeviceAuth) + kimi.POST("/oauth/token", h.Admin.KimiOAuth.PollToken) + kimi.POST("/oauth/refresh", h.Admin.KimiOAuth.RefreshToken) + kimi.POST("/oauth/import-local", h.Admin.KimiOAuth.ImportLocalToken) + } +} + func registerProxyRoutes(admin *gin.RouterGroup, h *handler.Handlers) { proxies := admin.Group("/proxies") { diff --git a/backend/internal/service/domain_constants.go b/backend/internal/service/domain_constants.go index 3c6888b8..870d8f7a 100644 --- a/backend/internal/service/domain_constants.go +++ b/backend/internal/service/domain_constants.go @@ -24,6 +24,7 @@ const ( PlatformOpenAI = domain.PlatformOpenAI PlatformGemini = domain.PlatformGemini PlatformAntigravity = domain.PlatformAntigravity + PlatformKimi = domain.PlatformKimi ) // Account type constants diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 5a91d0de..8b854cb3 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -547,6 +547,7 @@ type GatewayService struct { deferredService *DeferredService concurrencyService *ConcurrencyService claudeTokenProvider *ClaudeTokenProvider + kimiTokenProvider *KimiTokenProvider sessionLimitCache SessionLimitCache // 会话数量限制缓存(仅 Anthropic OAuth/SetupToken) rpmCache RPMCache // RPM 计数缓存(仅 Anthropic OAuth/SetupToken) userGroupRateResolver *userGroupRateResolver @@ -585,6 +586,7 @@ func NewGatewayService( httpUpstream HTTPUpstream, deferredService *DeferredService, claudeTokenProvider *ClaudeTokenProvider, + kimiTokenProvider *KimiTokenProvider, sessionLimitCache SessionLimitCache, rpmCache RPMCache, digestStore *DigestSessionStore, @@ -617,6 +619,7 @@ func NewGatewayService( httpUpstream: httpUpstream, deferredService: deferredService, claudeTokenProvider: claudeTokenProvider, + kimiTokenProvider: kimiTokenProvider, sessionLimitCache: sessionLimitCache, rpmCache: rpmCache, userGroupRateCache: gocache.New(userGroupRateTTL, time.Minute), @@ -3466,6 +3469,15 @@ func (s *GatewayService) getOAuthToken(ctx context.Context, account *Account) (s return accessToken, "oauth", nil } + // 对于 Kimi OAuth 账号,使用 KimiTokenProvider 获取缓存的 token + if account.Platform == PlatformKimi && account.Type == AccountTypeOAuth && s.kimiTokenProvider != nil { + accessToken, err := s.kimiTokenProvider.GetAccessToken(ctx, account) + if err != nil { + return "", "", err + } + return accessToken, "oauth", nil + } + // 其他情况(Gemini 有自己的 TokenProvider,setup-token 类型等)直接从账号读取 accessToken := account.GetCredential("access_token") if accessToken == "" { diff --git a/backend/internal/service/kimi_gateway_service.go b/backend/internal/service/kimi_gateway_service.go new file mode 100644 index 00000000..2ac13a71 --- /dev/null +++ b/backend/internal/service/kimi_gateway_service.go @@ -0,0 +1,251 @@ +package service + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/Wei-Shaw/sub2api/internal/pkg/logger" + "github.com/gin-gonic/gin" + "github.com/tidwall/gjson" +) + +const ( + kimiAPIBaseURL = "https://api.kimi.com/coding/v1" + kimiChatCompletionsURL = kimiAPIBaseURL + "/chat/completions" + kimiDefaultUserAgent = "kimi-cli/1.0" +) + +// ForwardKimiChatCompletions forwards an OpenAI Chat Completions request directly to Kimi API. +// Kimi API is OpenAI Chat Completions compatible, so the body is forwarded as-is. +func (s *GatewayService) ForwardKimiChatCompletions( + ctx context.Context, + c *gin.Context, + account *Account, + body []byte, +) (*ForwardResult, error) { + startTime := time.Now() + + // 1. Get access token + token, tokenType, err := s.GetAccessToken(ctx, account) + if err != nil { + return nil, fmt.Errorf("get access token: %w", err) + } + + // 2. Get proxy URL + proxyURL := "" + if account.ProxyID != nil && account.Proxy != nil { + proxyURL = account.Proxy.URL() + } + + // 3. Parse request to determine if streaming + reqStream := gjson.GetBytes(body, "stream").Bool() + + // 4. Build upstream request + req, err := http.NewRequestWithContext(ctx, "POST", kimiChatCompletionsURL, bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("build upstream request: %w", err) + } + + // Set auth header + if tokenType == "oauth" { + setHeaderRaw(req.Header, "authorization", "Bearer "+token) + } else { + setHeaderRaw(req.Header, "x-api-key", token) + } + + // Set required headers for Kimi API + setHeaderRaw(req.Header, "content-type", "application/json") + setHeaderRaw(req.Header, "accept", "application/json") + // Kimi Coding API requires a known coding agent User-Agent pattern + setHeaderRaw(req.Header, "user-agent", kimiDefaultUserAgent) + + // Passthrough allowed client headers + var clientHeaders http.Header + if c != nil && c.Request != nil { + clientHeaders = c.Request.Header + } + for key, values := range clientHeaders { + lowerKey := strings.ToLower(key) + if allowedHeaders[lowerKey] { + wireKey := resolveWireCasing(key) + for _, v := range values { + addHeaderRaw(req.Header, wireKey, v) + } + } + } + + // 5. Resolve TLS fingerprint profile + tlsProfile := s.tlsFPProfileService.ResolveTLSProfile(account) + + // 6. Send request + resp, err := s.httpUpstream.DoWithTLS(req, proxyURL, account.ID, account.Concurrency, tlsProfile) + if err != nil { + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + safeErr := sanitizeUpstreamErrorMessage(err.Error()) + setOpsUpstreamError(c, 0, safeErr, "") + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: 0, + Kind: "request_error", + Message: safeErr, + }) + writeGatewayCCError(c, http.StatusBadGateway, "server_error", "Upstream request failed") + return nil, fmt.Errorf("upstream request failed: %s", safeErr) + } + defer func() { _ = resp.Body.Close() }() + + // 7. Handle error response + if resp.StatusCode >= 400 { + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + + if s.shouldFailoverUpstreamError(resp.StatusCode) { + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "failover", + Message: upstreamMsg, + }) + if s.rateLimitService != nil { + s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) + } + return nil, &UpstreamFailoverError{ + StatusCode: resp.StatusCode, + ResponseBody: respBody, + } + } + + writeGatewayCCError(c, mapUpstreamStatusCode(resp.StatusCode), "server_error", upstreamMsg) + return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, upstreamMsg) + } + + // 8. Handle successful response + originalModel := gjson.GetBytes(body, "model").String() + mappedModel := originalModel + if account.Type == AccountTypeAPIKey { + mappedModel = account.GetMappedModel(originalModel) + } + + var result *ForwardResult + if reqStream { + result, err = s.handleKimiStreamingResponse(resp, c, originalModel, mappedModel, startTime) + } else { + result, err = s.handleKimiNonStreamingResponse(resp, c, originalModel, mappedModel, startTime) + } + + return result, err +} + +func (s *GatewayService) handleKimiStreamingResponse( + resp *http.Response, + c *gin.Context, + originalModel, mappedModel string, + startTime time.Time, +) (*ForwardResult, error) { + if c == nil || c.Writer == nil { + return nil, errors.New("gin context or writer is nil") + } + + c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.WriteHeader(http.StatusOK) + + flusher, ok := c.Writer.(http.Flusher) + if !ok { + return nil, errors.New("streaming not supported") + } + + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(make([]byte, 4096), defaultMaxLineSize) + + var firstTokenTime *time.Duration + var upstreamModel string + + for scanner.Scan() { + line := scanner.Text() + if line == "" { + continue + } + + // Detect first token + if firstTokenTime == nil { + elapsed := time.Since(startTime) + firstTokenTime = &elapsed + } + + // Write the SSE line as-is + fmt.Fprintf(c.Writer, "%s\n", line) //nolint:errcheck + flusher.Flush() + + // Try to extract upstream model from the first data line + if upstreamModel == "" && strings.HasPrefix(line, "data: {") { + upstreamModel = gjson.Get(line[6:], "model").String() + } + } + + if err := scanner.Err(); err != nil && !errors.Is(err, io.EOF) { + logger.LegacyPrintf("service.gateway", "Kimi stream scanner error: %v", err) + } + + // Send final [DONE] if not already sent by upstream + fmt.Fprintf(c.Writer, "data: [DONE]\n\n") //nolint:errcheck + flusher.Flush() + + var firstTokenMs *int + if firstTokenTime != nil { + ms := int(firstTokenTime.Milliseconds()) + firstTokenMs = &ms + } + + return &ForwardResult{ + UpstreamModel: upstreamModel, + FirstTokenMs: firstTokenMs, + }, nil +} + +func (s *GatewayService) handleKimiNonStreamingResponse( + resp *http.Response, + c *gin.Context, + originalModel, mappedModel string, + startTime time.Time, +) (*ForwardResult, error) { + body, err := io.ReadAll(io.LimitReader(resp.Body, 50<<20)) + if err != nil { + return nil, fmt.Errorf("read upstream response: %w", err) + } + + if c != nil && c.Writer != nil { + // Copy upstream headers + for key, values := range resp.Header { + for _, v := range values { + c.Writer.Header().Add(key, v) + } + } + c.Writer.Header().Set("Content-Type", "application/json") + c.Writer.WriteHeader(resp.StatusCode) + _, _ = c.Writer.Write(body) + } + + upstreamModel := gjson.GetBytes(body, "model").String() + + return &ForwardResult{ + UpstreamModel: upstreamModel, + }, nil +} + diff --git a/backend/internal/service/kimi_oauth_service.go b/backend/internal/service/kimi_oauth_service.go new file mode 100644 index 00000000..670f1262 --- /dev/null +++ b/backend/internal/service/kimi_oauth_service.go @@ -0,0 +1,306 @@ +package service + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/Wei-Shaw/sub2api/internal/pkg/httpclient" + "github.com/Wei-Shaw/sub2api/internal/pkg/logger" +) + +const ( + kimiAuthBaseURL = "https://auth.kimi.com" + kimiDeviceAuthURL = kimiAuthBaseURL + "/api/oauth/device_authorization" + kimiTokenURL = kimiAuthBaseURL + "/api/oauth/token" + kimiClientID = "17e5f671-d194-4dfb-9706-5516cb48c098" + kimiDefaultScope = "kimi-code" + KimiTokenSafetyWindow = 300 // 5 minutes + KimiTokenMinTTL = 30 // 30 seconds +) + +// KimiDeviceAuthResponse represents the response from the device authorization endpoint. +type KimiDeviceAuthResponse struct { + DeviceCode string `json:"device_code"` + UserCode string `json:"user_code"` + VerificationURI string `json:"verification_uri"` + VerificationURIComplete string `json:"verification_uri_complete"` + ExpiresIn int64 `json:"expires_in"` + Interval int64 `json:"interval"` +} + +// KimiTokenResponse represents the response from the token endpoint. +type KimiTokenResponse struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` + ExpiresIn int64 `json:"expires_in"` + Scope string `json:"scope"` + TokenType string `json:"token_type"` +} + +// KimiTokenInfo holds token information for an account. +type KimiTokenInfo struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` + ExpiresIn int64 `json:"expires_in"` + ExpiresAt int64 `json:"expires_at"` + Scope string `json:"scope,omitempty"` + TokenType string `json:"token_type,omitempty"` +} + +// KimiOAuthService handles Kimi OAuth device flow and token refresh. +type KimiOAuthService struct { + sessions sync.Map // device_code -> *kimiOAuthSession +} + +type kimiOAuthSession struct { + DeviceCode string + UserCode string + VerificationURI string + VerificationURIComplete string + ExpiresAt time.Time + Interval time.Duration + ProxyURL string +} + +// NewKimiOAuthService creates a new KimiOAuthService. +func NewKimiOAuthService() *KimiOAuthService { + return &KimiOAuthService{} +} + +// InitiateDeviceAuth starts the OAuth 2.0 Device Authorization Grant flow. +func (s *KimiOAuthService) InitiateDeviceAuth(ctx context.Context, proxyID *int64, proxyRepo ProxyRepository) (*KimiDeviceAuthResponse, error) { + data := url.Values{} + data.Set("client_id", kimiClientID) + data.Set("scope", kimiDefaultScope) + + var proxyURL string + if proxyID != nil && proxyRepo != nil { + if proxy, err := proxyRepo.GetByID(ctx, *proxyID); err == nil && proxy != nil { + proxyURL = proxy.URL() + } + } + + req, err := http.NewRequestWithContext(ctx, "POST", kimiDeviceAuthURL, strings.NewReader(data.Encode())) + if err != nil { + return nil, fmt.Errorf("failed to create device auth request: %w", err) + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Accept", "application/json") + + resp, err := s.doRequest(req, proxyURL) + if err != nil { + return nil, fmt.Errorf("device auth request failed: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) + if err != nil { + return nil, fmt.Errorf("failed to read device auth response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("device auth failed: status=%d body=%s", resp.StatusCode, string(body)) + } + + var result KimiDeviceAuthResponse + if err := json.Unmarshal(body, &result); err != nil { + return nil, fmt.Errorf("failed to parse device auth response: %w", err) + } + + interval := time.Duration(result.Interval) * time.Second + if interval <= 0 { + interval = 5 * time.Second + } + + session := &kimiOAuthSession{ + DeviceCode: result.DeviceCode, + UserCode: result.UserCode, + VerificationURI: result.VerificationURI, + VerificationURIComplete: result.VerificationURIComplete, + ExpiresAt: time.Now().Add(time.Duration(result.ExpiresIn) * time.Second), + Interval: interval, + ProxyURL: proxyURL, + } + s.sessions.Store(result.DeviceCode, session) + + logger.LegacyPrintf("service.kimi_oauth", "[KimiOAuth] Device auth initiated: user_code=%s", result.UserCode) + return &result, nil +} + +// PollToken polls the token endpoint for a device_code. +func (s *KimiOAuthService) PollToken(ctx context.Context, deviceCode string) (*KimiTokenInfo, error) { + sessionVal, ok := s.sessions.Load(deviceCode) + if !ok { + return nil, fmt.Errorf("session not found or expired") + } + session := sessionVal.(*kimiOAuthSession) + + if time.Now().After(session.ExpiresAt) { + s.sessions.Delete(deviceCode) + return nil, fmt.Errorf("device auth session expired") + } + + data := url.Values{} + data.Set("client_id", kimiClientID) + data.Set("grant_type", "urn:ietf:params:oauth:grant-type:device_code") + data.Set("device_code", deviceCode) + + req, err := http.NewRequestWithContext(ctx, "POST", kimiTokenURL, strings.NewReader(data.Encode())) + if err != nil { + return nil, fmt.Errorf("failed to create token request: %w", err) + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Accept", "application/json") + + resp, err := s.doRequest(req, session.ProxyURL) + if err != nil { + return nil, fmt.Errorf("token request failed: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) + if err != nil { + return nil, fmt.Errorf("failed to read token response: %w", err) + } + + if resp.StatusCode == http.StatusBadRequest || resp.StatusCode == http.StatusForbidden { + // Check for authorization_pending + var errResp struct { + Error string `json:"error"` + } + if json.Unmarshal(body, &errResp) == nil && errResp.Error == "authorization_pending" { + return nil, fmt.Errorf("authorization_pending") + } + if errResp.Error == "expired_token" || errResp.Error == "invalid_grant" { + s.sessions.Delete(deviceCode) + } + return nil, fmt.Errorf("token poll failed: %s", errResp.Error) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("token poll failed: status=%d body=%s", resp.StatusCode, string(body)) + } + + var tokenResp KimiTokenResponse + if err := json.Unmarshal(body, &tokenResp); err != nil { + return nil, fmt.Errorf("failed to parse token response: %w", err) + } + + s.sessions.Delete(deviceCode) + + expiresAt := time.Now().Unix() + tokenResp.ExpiresIn - KimiTokenSafetyWindow + minExpiresAt := time.Now().Unix() + KimiTokenMinTTL + if expiresAt < minExpiresAt { + expiresAt = minExpiresAt + } + + logger.LegacyPrintf("service.kimi_oauth", "[KimiOAuth] Token obtained successfully") + return &KimiTokenInfo{ + AccessToken: tokenResp.AccessToken, + RefreshToken: tokenResp.RefreshToken, + ExpiresIn: tokenResp.ExpiresIn, + ExpiresAt: expiresAt, + Scope: tokenResp.Scope, + TokenType: tokenResp.TokenType, + }, nil +} + +// RefreshToken refreshes the access token using a refresh token. +func (s *KimiOAuthService) RefreshToken(ctx context.Context, refreshToken, proxyURL string) (*KimiTokenInfo, error) { + if strings.TrimSpace(refreshToken) == "" { + return nil, fmt.Errorf("refresh_token is empty") + } + + data := url.Values{} + data.Set("client_id", kimiClientID) + data.Set("grant_type", "refresh_token") + data.Set("refresh_token", refreshToken) + + req, err := http.NewRequestWithContext(ctx, "POST", kimiTokenURL, strings.NewReader(data.Encode())) + if err != nil { + return nil, fmt.Errorf("failed to create refresh request: %w", err) + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Accept", "application/json") + + resp, err := s.doRequest(req, proxyURL) + if err != nil { + return nil, fmt.Errorf("refresh request failed: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) + if err != nil { + return nil, fmt.Errorf("failed to read refresh response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("refresh failed: status=%d body=%s", resp.StatusCode, string(body)) + } + + var tokenResp KimiTokenResponse + if err := json.Unmarshal(body, &tokenResp); err != nil { + return nil, fmt.Errorf("failed to parse refresh response: %w", err) + } + + expiresAt := time.Now().Unix() + tokenResp.ExpiresIn - KimiTokenSafetyWindow + minExpiresAt := time.Now().Unix() + KimiTokenMinTTL + if expiresAt < minExpiresAt { + expiresAt = minExpiresAt + } + + return &KimiTokenInfo{ + AccessToken: tokenResp.AccessToken, + RefreshToken: tokenResp.RefreshToken, + ExpiresIn: tokenResp.ExpiresIn, + ExpiresAt: expiresAt, + Scope: tokenResp.Scope, + TokenType: tokenResp.TokenType, + }, nil +} + +// BuildAccountCredentials builds the credentials map for a Kimi account. +func (s *KimiOAuthService) BuildAccountCredentials(tokenInfo *KimiTokenInfo) map[string]any { + creds := map[string]any{ + "access_token": tokenInfo.AccessToken, + "expires_at": strconv.FormatInt(tokenInfo.ExpiresAt, 10), + } + if tokenInfo.RefreshToken != "" { + creds["refresh_token"] = tokenInfo.RefreshToken + } + if tokenInfo.TokenType != "" { + creds["token_type"] = tokenInfo.TokenType + } + if tokenInfo.Scope != "" { + creds["scope"] = tokenInfo.Scope + } + return creds +} + +// Stop cleans up the service. +func (s *KimiOAuthService) Stop() { + s.sessions.Range(func(key, value any) bool { + s.sessions.Delete(key) + return true + }) +} + +func (s *KimiOAuthService) doRequest(req *http.Request, proxyURL string) (*http.Response, error) { + opts := httpclient.Options{ + ProxyURL: proxyURL, + Timeout: 30 * time.Second, + } + client, err := httpclient.GetClient(opts) + if err != nil { + return nil, err + } + return client.Do(req) +} diff --git a/backend/internal/service/kimi_token_provider.go b/backend/internal/service/kimi_token_provider.go new file mode 100644 index 00000000..4063769a --- /dev/null +++ b/backend/internal/service/kimi_token_provider.go @@ -0,0 +1,198 @@ +package service + +import ( + "context" + "errors" + "log/slog" + "strconv" + "strings" + "time" +) + +const ( + kimiTokenRefreshSkew = 3 * time.Minute + kimiTokenCacheSkew = 5 * time.Minute +) + +// KimiTokenProvider manages access_token for Kimi OAuth accounts. +type KimiTokenProvider struct { + accountRepo AccountRepository + tokenCache GeminiTokenCache + kimiOAuthService *KimiOAuthService + refreshAPI *OAuthRefreshAPI + executor OAuthRefreshExecutor + refreshPolicy ProviderRefreshPolicy +} + +// NewKimiTokenProvider creates a new KimiTokenProvider. +func NewKimiTokenProvider( + accountRepo AccountRepository, + tokenCache GeminiTokenCache, + kimiOAuthService *KimiOAuthService, +) *KimiTokenProvider { + return &KimiTokenProvider{ + accountRepo: accountRepo, + tokenCache: tokenCache, + kimiOAuthService: kimiOAuthService, + refreshPolicy: KimiProviderRefreshPolicy(), + } +} + +// SetRefreshAPI injects unified OAuth refresh API and executor. +func (p *KimiTokenProvider) SetRefreshAPI(api *OAuthRefreshAPI, executor OAuthRefreshExecutor) { + p.refreshAPI = api + p.executor = executor +} + +// SetRefreshPolicy injects caller-side refresh policy. +func (p *KimiTokenProvider) SetRefreshPolicy(policy ProviderRefreshPolicy) { + p.refreshPolicy = policy +} + +// GetAccessToken returns a valid access token for the given Kimi account. +func (p *KimiTokenProvider) GetAccessToken(ctx context.Context, account *Account) (string, error) { + if account == nil { + return "", errors.New("account is nil") + } + if account.Platform != PlatformKimi || account.Type != AccountTypeOAuth { + return "", errors.New("not a kimi oauth account") + } + + cacheKey := KimiTokenCacheKey(account) + + // 1) Try cache first. + if p.tokenCache != nil { + if token, err := p.tokenCache.GetAccessToken(ctx, cacheKey); err == nil && strings.TrimSpace(token) != "" { + return token, nil + } + } + + // 2) Refresh if needed (pre-expiry skew). + expiresAt := account.GetCredentialAsTime("expires_at") + needsRefresh := expiresAt == nil || time.Until(*expiresAt) <= kimiTokenRefreshSkew + + if needsRefresh && p.refreshAPI != nil && p.executor != nil { + result, err := p.refreshAPI.RefreshIfNeeded(ctx, account, p.executor, kimiTokenRefreshSkew) + if err != nil { + if p.refreshPolicy.OnRefreshError == ProviderRefreshErrorReturn { + return "", err + } + } else if result.LockHeld { + if p.refreshPolicy.OnLockHeld == ProviderLockHeldWaitForCache && p.tokenCache != nil { + if token, cacheErr := p.tokenCache.GetAccessToken(ctx, cacheKey); cacheErr == nil && strings.TrimSpace(token) != "" { + return token, nil + } + } + slog.Debug("kimi_token_lock_held_use_old", "account_id", account.ID) + } else { + account = result.Account + expiresAt = account.GetCredentialAsTime("expires_at") + } + } else if needsRefresh && p.tokenCache != nil { + locked, lockErr := p.tokenCache.AcquireRefreshLock(ctx, cacheKey, 30*time.Second) + if lockErr == nil && locked { + defer func() { _ = p.tokenCache.ReleaseRefreshLock(ctx, cacheKey) }() + } else if lockErr != nil { + slog.Warn("kimi_token_lock_failed", "account_id", account.ID, "error", lockErr) + } + } + + accessToken := account.GetCredential("access_token") + if strings.TrimSpace(accessToken) == "" { + return "", errors.New("access_token not found in credentials") + } + + // 3) Populate cache with TTL. + if p.tokenCache != nil { + latestAccount, isStale := CheckTokenVersion(ctx, account, p.accountRepo) + if isStale && latestAccount != nil { + slog.Debug("kimi_token_version_stale_use_latest", "account_id", account.ID) + accessToken = latestAccount.GetCredential("access_token") + if strings.TrimSpace(accessToken) == "" { + return "", errors.New("access_token not found after version check") + } + } else { + ttl := 30 * time.Minute + if expiresAt != nil { + until := time.Until(*expiresAt) + switch { + case until > kimiTokenCacheSkew: + ttl = until - kimiTokenCacheSkew + case until > 0: + ttl = until + default: + ttl = time.Minute + } + } + _ = p.tokenCache.SetAccessToken(ctx, cacheKey, accessToken, ttl) + } + } + + return accessToken, nil +} + +// KimiTokenCacheKey returns the cache key for a Kimi account. +func KimiTokenCacheKey(account *Account) string { + return "kimi:account:" + strconv.FormatInt(account.ID, 10) +} + +// KimiProviderRefreshPolicy returns the default refresh policy for Kimi. +func KimiProviderRefreshPolicy() ProviderRefreshPolicy { + return ProviderRefreshPolicy{ + OnRefreshError: ProviderRefreshErrorReturn, + OnLockHeld: ProviderLockHeldWaitForCache, + } +} + +// KimiTokenRefresher implements OAuthRefreshExecutor for Kimi. +type KimiTokenRefresher struct { + kimiOAuthService *KimiOAuthService +} + +// NewKimiTokenRefresher creates a new KimiTokenRefresher. +func NewKimiTokenRefresher(kimiOAuthService *KimiOAuthService) *KimiTokenRefresher { + return &KimiTokenRefresher{kimiOAuthService: kimiOAuthService} +} + +// CanRefresh checks if this refresher can handle the given account. +func (r *KimiTokenRefresher) CanRefresh(account *Account) bool { + return account != nil && account.Platform == PlatformKimi && account.Type == AccountTypeOAuth +} + +// NeedsRefresh checks if the account needs token refresh. +func (r *KimiTokenRefresher) NeedsRefresh(account *Account, refreshWindow time.Duration) bool { + if account == nil { + return false + } + expiresAt := account.GetCredentialAsTime("expires_at") + return expiresAt == nil || time.Until(*expiresAt) <= refreshWindow +} + +// Refresh performs the token refresh for a Kimi account. +func (r *KimiTokenRefresher) Refresh(ctx context.Context, account *Account) (map[string]any, error) { + if account == nil || r.kimiOAuthService == nil { + return nil, errors.New("kimi token refresher not initialized") + } + + refreshToken := account.GetCredential("refresh_token") + if strings.TrimSpace(refreshToken) == "" { + return nil, errors.New("no refresh token available") + } + + var proxyURL string + if account.ProxyID != nil && account.Proxy != nil { + proxyURL = account.Proxy.URL() + } + + tokenInfo, err := r.kimiOAuthService.RefreshToken(ctx, refreshToken, proxyURL) + if err != nil { + return nil, err + } + + return r.kimiOAuthService.BuildAccountCredentials(tokenInfo), nil +} + +// CacheKey returns the cache key for distributed locking. +func (r *KimiTokenRefresher) CacheKey(account *Account) string { + return KimiTokenCacheKey(account) +} diff --git a/backend/internal/service/token_cache_invalidator.go b/backend/internal/service/token_cache_invalidator.go index 74c9edc3..15b99182 100644 --- a/backend/internal/service/token_cache_invalidator.go +++ b/backend/internal/service/token_cache_invalidator.go @@ -46,6 +46,9 @@ func (c *CompositeTokenCacheInvalidator) InvalidateToken(ctx context.Context, ac keysToDelete = append(keysToDelete, OpenAITokenCacheKey(account)) case PlatformAnthropic: keysToDelete = append(keysToDelete, ClaudeTokenCacheKey(account)) + case PlatformKimi: + keysToDelete = append(keysToDelete, KimiTokenCacheKey(account)) + keysToDelete = append(keysToDelete, "kimi:"+accountIDKey) default: return nil } diff --git a/backend/internal/service/token_refresh_service.go b/backend/internal/service/token_refresh_service.go index 22f4aa29..5dc8bb26 100644 --- a/backend/internal/service/token_refresh_service.go +++ b/backend/internal/service/token_refresh_service.go @@ -84,6 +84,16 @@ func NewTokenRefreshService( return s } +// SetKimiOAuthService 注入 Kimi OAuth 服务并注册刷新器 +func (s *TokenRefreshService) SetKimiOAuthService(kimiOAuthService *KimiOAuthService) { + if kimiOAuthService == nil { + return + } + kimiRefresher := NewKimiTokenRefresher(kimiOAuthService) + s.refreshers = append(s.refreshers, kimiRefresher) + s.executors = append(s.executors, kimiRefresher) +} + // SetPrivacyDeps 注入 OpenAI privacy opt-out 所需依赖 func (s *TokenRefreshService) SetPrivacyDeps(factory PrivacyClientFactory, proxyRepo ProxyRepository) { s.privacyClientFactory = factory diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index 9f33c46a..502a69ce 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -46,6 +46,7 @@ func ProvideTokenRefreshService( openaiOAuthService *OpenAIOAuthService, geminiOAuthService *GeminiOAuthService, antigravityOAuthService *AntigravityOAuthService, + kimiOAuthService *KimiOAuthService, cacheInvalidator TokenCacheInvalidator, schedulerCache SchedulerCache, cfg *config.Config, @@ -55,6 +56,8 @@ func ProvideTokenRefreshService( refreshAPI *OAuthRefreshAPI, ) *TokenRefreshService { svc := NewTokenRefreshService(accountRepo, oauthService, openaiOAuthService, geminiOAuthService, antigravityOAuthService, cacheInvalidator, schedulerCache, cfg, tempUnschedCache) + // 注入 Kimi OAuth 服务 + svc.SetKimiOAuthService(kimiOAuthService) // 注入 OpenAI privacy opt-out 依赖 svc.SetPrivacyDeps(privacyClientFactory, proxyRepo) // 注入统一 OAuth 刷新 API(消除 TokenRefreshService 与 TokenProvider 之间的竞争条件) @@ -123,6 +126,20 @@ func ProvideAntigravityTokenProvider( return p } +// ProvideKimiTokenProvider creates KimiTokenProvider with OAuthRefreshAPI injection +func ProvideKimiTokenProvider( + accountRepo AccountRepository, + tokenCache GeminiTokenCache, + kimiOAuthService *KimiOAuthService, + refreshAPI *OAuthRefreshAPI, +) *KimiTokenProvider { + p := NewKimiTokenProvider(accountRepo, tokenCache, kimiOAuthService) + executor := NewKimiTokenRefresher(kimiOAuthService) + p.SetRefreshAPI(refreshAPI, executor) + p.SetRefreshPolicy(KimiProviderRefreshPolicy()) + return p +} + // ProvideDashboardAggregationService 创建并启动仪表盘聚合服务 func ProvideDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService { svc := NewDashboardAggregationService(repo, timingWheel, cfg) @@ -409,8 +426,10 @@ var ProviderSet = wire.NewSet( NewCompositeTokenCacheInvalidator, wire.Bind(new(TokenCacheInvalidator), new(*CompositeTokenCacheInvalidator)), NewAntigravityOAuthService, + NewKimiOAuthService, NewOAuthRefreshAPI, ProvideGeminiTokenProvider, + ProvideKimiTokenProvider, NewGeminiMessagesCompatService, ProvideAntigravityTokenProvider, ProvideOpenAITokenProvider, diff --git a/frontend/src/components/account/CreateAccountModal.vue b/frontend/src/components/account/CreateAccountModal.vue index 2130c9ab..26177a9b 100644 --- a/frontend/src/components/account/CreateAccountModal.vue +++ b/frontend/src/components/account/CreateAccountModal.vue @@ -147,6 +147,21 @@ Antigravity + @@ -729,6 +744,63 @@ + +
+ +
+ + +
+
+
@@ -2550,7 +2622,145 @@
+ +
+
+
+
+ +
+
+

+ Kimi Code OAuth +

+ + +
+

+ {{ t('admin.accounts.oauth.kimi.deviceFlowDesc') }} +

+ + +
+

+ {{ kimiOAuth.error.value }} +

+
+
+ + +
+
+

+ {{ t('admin.accounts.oauth.kimi.step1OpenLink') }} +

+
+ + {{ kimiOAuth.userCode.value }} + +
+ + + {{ t('admin.accounts.oauth.kimi.openVerificationPage') }} + +
+ +
+

+ {{ t('admin.accounts.oauth.kimi.step2Wait') }} +

+
+ + + + + {{ t('admin.accounts.oauth.kimi.pollingForToken') }} +
+
+ + +
+

+ {{ kimiOAuth.error.value }} +

+
+
+
+
+
+
+ + -