// Package proxy forwards incoming API requests to the upstream service using
// credentials drawn from an auth.Pool.
package proxy

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"

	"github.com/gin-gonic/gin"
	"github.com/tidwall/gjson"

	"github.com/fujin/anthropic-proxy/internal/auth"
)

// HandleMessages returns a gin handler that reads the whole request body,
// picks a credential from pool and forwards the body upstream.
//
// If the body's top-level "stream" field evaluates to true (gjson's Bool
// conversion) the response is relayed as a stream; otherwise it is forwarded
// as a single buffered response. The handler answers 400 when the body cannot
// be read and 503 (with the pool's error text) when no credential is
// available.
//
// The upstream client is built once from profile and shared by every request
// served by the returned handler.
func HandleMessages(pool *auth.Pool, profile *SniffedProfile) gin.HandlerFunc {
	upstream := NewUpstreamClient(profile)

	return func(c *gin.Context) {
		// NOTE(review): the body is read without a size limit; confirm that a
		// limit is enforced elsewhere (e.g. middleware or the HTTP server).
		body, err := io.ReadAll(c.Request.Body)
		if err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": "failed to read request body"})
			return
		}

		cred, err := pool.Pick()
		if err != nil {
			c.JSON(http.StatusServiceUnavailable, gin.H{"error": err.Error()})
			return
		}

		isStream := gjson.GetBytes(body, "stream").Bool()
		if isStream {
			handleStream(c, upstream, pool, cred, body)
		} else {
			handleNonStream(c, upstream, pool, cred, body)
		}
	}
}

// handleNonStream sends body upstream with cred and writes the buffered
// upstream response back to the client with the upstream status code.
//
// A transport-level error yields 502 and leaves the credential's pool state
// untouched. An upstream status >= 400 is reported to the pool via
// MarkFailure, anything else via MarkSuccess; in both cases the upstream body
// is passed through. Only Content-Type and X-Request-Id are copied from the
// upstream headers.
func handleNonStream(c *gin.Context, upstream *UpstreamClient, pool *auth.Pool, cred *auth.Credential, body []byte) {
	respBody, headers, statusCode, err := upstream.Execute(c.Request.Context(), cred, body)
	if err != nil {
		log.Printf("upstream error for %s: %v", cred.Email, err)
		c.JSON(http.StatusBadGateway, gin.H{"error": "upstream request failed"})
		return
	}

	if statusCode >= 400 {
		pool.MarkFailure(cred, statusCode)
		log.Printf("upstream %d for %s", statusCode, cred.Email)
	} else {
		pool.MarkSuccess(cred)
	}

	for _, h := range []string{"Content-Type", "X-Request-Id"} {
		if v := headers.Get(h); v != "" {
			c.Header(h, v)
		}
	}
	c.Data(statusCode, headers.Get("Content-Type"), respBody)
}

// handleStream sends body upstream with cred and relays the streamed upstream
// response to the client as text/event-stream, flushing after every line.
//
// A transport-level error yields 502. An upstream status >= 400 is reported
// to the pool via MarkFailure and the (buffered) upstream error body is
// passed through with the upstream status and Content-Type. Otherwise the
// credential is marked successful before relaying starts. The upstream
// X-Request-Id header, when present, is forwarded just as handleNonStream
// does.
func handleStream(c *gin.Context, upstream *UpstreamClient, pool *auth.Pool, cred *auth.Credential, body []byte) {
	resp, err := upstream.ExecuteStream(c.Request.Context(), cred, body)
	if err != nil {
		log.Printf("upstream stream error for %s: %v", cred.Email, err)
		c.JSON(http.StatusBadGateway, gin.H{"error": "upstream stream request failed"})
		return
	}
	defer resp.Body.Close()

	requestID := resp.Header.Get("X-Request-Id")

	if resp.StatusCode >= 400 {
		pool.MarkFailure(cred, resp.StatusCode)
		log.Printf("upstream stream %d for %s", resp.StatusCode, cred.Email)

		// Best effort: whatever part of the error body was read is still
		// forwarded, but a read failure is no longer silently dropped.
		respBody, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			log.Printf("reading upstream error body for %s: %v", cred.Email, readErr)
		}
		if requestID != "" {
			c.Header("X-Request-Id", requestID)
		}
		c.Data(resp.StatusCode, resp.Header.Get("Content-Type"), respBody)
		return
	}
	pool.MarkSuccess(cred)

	// Check for flushing support before any streaming header or status is
	// staged, so the JSON error below is not sent with event-stream headers.
	flusher, ok := c.Writer.(http.Flusher)
	if !ok {
		log.Printf("response writer does not support flushing")
		c.JSON(http.StatusInternalServerError, gin.H{"error": "streaming not supported"})
		return
	}

	if requestID != "" {
		c.Header("X-Request-Id", requestID)
	}
	c.Header("Content-Type", "text/event-stream")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "keep-alive")
	c.Status(http.StatusOK)

	if err := relayStreamBody(c.Writer, flusher, resp.Body); err != nil {
		log.Printf("stream scan error: %v", err)
	}
}

// relayStreamBody copies src to dst unchanged and calls flusher.Flush after
// every complete line (and once more when src ends), so each line reaches the
// client as soon as it is available.
//
// Lines of any length are supported: a line that does not fit in the read
// buffer is forwarded in buffer-sized pieces instead of aborting the relay,
// which keeps memory use bounded by the buffer size. Copying stops at the
// first write error, because the client is then no longer reachable.
//
// It returns nil when src is exhausted, or a wrapped error describing the
// failed read or write.
func relayStreamBody(dst io.Writer, flusher http.Flusher, src io.Reader) error {
	const bufSize = 64 * 1024

	reader := bufio.NewReaderSize(src, bufSize)
	for {
		// The returned slice is only valid until the next read, so it is
		// written out before reading again.
		chunk, readErr := reader.ReadSlice('\n')
		if len(chunk) > 0 {
			if _, err := dst.Write(chunk); err != nil {
				return fmt.Errorf("write to client: %w", err)
			}
		}

		switch {
		case readErr == nil:
			// A complete line was forwarded.
			flusher.Flush()
		case errors.Is(readErr, bufio.ErrBufferFull):
			// The line is longer than the buffer; keep forwarding it and
			// flush once its newline arrives.
		case errors.Is(readErr, io.EOF):
			flusher.Flush()
			return nil
		default:
			// Push out whatever was already forwarded before giving up.
			flusher.Flush()
			return fmt.Errorf("read from upstream: %w", readErr)
		}
	}
}