package proxy

import (
	"bufio"
	"io"
	"log"
	"net/http"

	"github.com/gin-gonic/gin"
	"github.com/tidwall/gjson"

	"github.com/fujin/anthropic-proxy/internal/auth"
	"github.com/fujin/anthropic-proxy/internal/config"
)

// HandleMessages returns a gin handler that reads the request body, passes it
// through the request sanitizer, picks a credential from pool and forwards the
// request upstream. When the "stream" field of the sanitized body evaluates to
// true the response is relayed line by line via handleStream; otherwise it is
// relayed as one buffered response via handleNonStream.
//
// The upstream client and the sanitizer are constructed once, when
// HandleMessages is called, and are shared by every request the returned
// handler serves.
//
// The handler answers 400 if the request body cannot be read and 503 (with the
// pool's error text) if no credential can be picked.
func HandleMessages(pool *auth.Pool, profile *SniffedProfile, sanitizeCfg config.SanitizeConfig) gin.HandlerFunc {
	upstream := NewUpstreamClient(profile)
	san := NewSanitizer(sanitizeCfg)

	return func(c *gin.Context) {
		body, err := io.ReadAll(c.Request.Body)
		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": "failed to read request body"})
			return
		}

		// Logged before sanitizing, so size and model reflect what the client sent.
		log.Printf("incoming: %s %s (%d bytes) model=%s",
			c.Request.Method, c.Request.URL.Path, len(body), gjson.GetBytes(body, "model").String())

		body = san.SanitizeRequest(body)

		cred, err := pool.Pick()
		if err != nil {
			c.JSON(http.StatusServiceUnavailable, gin.H{"error": err.Error()})
			return
		}

		// The stream flag is read from the sanitized body, i.e. the bytes that
		// are actually sent upstream.
		if gjson.GetBytes(body, "stream").Bool() {
			handleStream(c, upstream, san, pool, cred, body)
			return
		}
		handleNonStream(c, upstream, san, pool, cred, body)
	}
}

// handleNonStream sends body upstream with cred and relays the complete
// response to the client.
//
// A transport-level error from upstream.Execute is logged and answered with
// 502. An upstream status >= 400 is reported to the pool via MarkFailure,
// logged together with the response body, and relayed to the client unchanged.
// Any other status is reported via MarkSuccess and the body is passed through
// san.DesanitizeResponse before being relayed. In both relayed cases the
// upstream Content-Type and X-Request-Id headers are copied when present.
func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool *auth.Pool, cred *auth.Credential, body []byte) {
	respBody, headers, statusCode, err := upstream.Execute(c.Request.Context(), cred, body)
	if err != nil {
		log.Printf("upstream error for %s: %v", cred.Email, err)
		c.JSON(http.StatusBadGateway, gin.H{"error": "upstream request failed"})
		return
	}

	if statusCode >= 400 {
		pool.MarkFailure(cred, statusCode)
		log.Printf("upstream %d for %s: %s", statusCode, cred.Email, string(respBody))
	} else {
		pool.MarkSuccess(cred)
		// Only successful bodies are desanitized; error bodies pass through as-is.
		respBody = san.DesanitizeResponse(respBody)
	}

	for _, h := range []string{"Content-Type", "X-Request-Id"} {
		if v := headers.Get(h); v != "" {
			c.Header(h, v)
		}
	}
	c.Data(statusCode, headers.Get("Content-Type"), respBody)
}

// handleStream sends body upstream with cred and relays the response to the
// client one line at a time, flushing after every line.
//
// A transport-level error from upstream.ExecuteStream is logged and answered
// with 502. An upstream status >= 400 is reported to the pool via MarkFailure
// and its body is read in full, logged and relayed unchanged. Otherwise the
// credential is reported via MarkSuccess, the event-stream headers are sent,
// and each upstream line is passed through san.DesanitizeStreamEvent and
// written to the client followed by "\n". Relaying stops at the first failed
// client write or when the upstream body ends.
func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool *auth.Pool, cred *auth.Credential, body []byte) {
	resp, err := upstream.ExecuteStream(c.Request.Context(), cred, body)
	if err != nil {
		log.Printf("upstream stream error for %s: %v", cred.Email, err)
		c.JSON(http.StatusBadGateway, gin.H{"error": "upstream stream request failed"})
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 400 {
		pool.MarkFailure(cred, resp.StatusCode)
		respBody, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			// Best effort: whatever was read before the failure is still relayed.
			log.Printf("upstream stream error body read for %s: %v", cred.Email, readErr)
		}
		log.Printf("upstream stream %d for %s: %s", resp.StatusCode, cred.Email, string(respBody))
		c.Data(resp.StatusCode, resp.Header.Get("Content-Type"), respBody)
		return
	}
	pool.MarkSuccess(cred)

	// Check for flushing support before touching the response headers, so the
	// JSON error below is not sent with an event-stream Content-Type.
	flusher, ok := c.Writer.(http.Flusher)
	if !ok {
		log.Printf("response writer does not support flushing")
		c.JSON(http.StatusInternalServerError, gin.H{"error": "streaming not supported"})
		return
	}

	c.Header("Content-Type", "text/event-stream")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "keep-alive")
	c.Status(http.StatusOK)

	// Start with a 64 KiB buffer and let a single line grow up to 1 MiB; a
	// longer line ends the scan with an error that is logged below.
	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	for scanner.Scan() {
		line := san.DesanitizeStreamEvent(scanner.Text())
		if _, err := c.Writer.WriteString(line + "\n"); err != nil {
			// A failed write means the client can no longer receive data, so
			// stop relaying; the deferred Close releases the upstream body.
			log.Printf("stream write error for %s: %v", cred.Email, err)
			return
		}
		flusher.Flush()
	}
	if err := scanner.Err(); err != nil {
		log.Printf("stream scan error: %v", err)
	}
}