package proxy import ( "bufio" "io" "net/http" "time" "github.com/gin-gonic/gin" "github.com/rs/zerolog/log" "github.com/tidwall/gjson" "github.com/fujin/anthropic-proxy/internal/auth" "github.com/fujin/anthropic-proxy/internal/logging" ) func HandleMessages(pool *auth.Pool, profile *SniffedProfile, getSanitizer func() *Sanitizer) gin.HandlerFunc { upstream := NewUpstreamClient(profile) return func(c *gin.Context) { body, err := io.ReadAll(c.Request.Body) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "failed to read request body"}) return } originalBody := make([]byte, len(body)) copy(originalBody, body) log.Info(). Str("method", c.Request.Method). Str("path", c.Request.URL.Path). Int("body_size", len(body)). Str("model", gjson.GetBytes(body, "model").String()). Msg("incoming request") san := getSanitizer() body = san.SanitizeRequest(body) cred, err := pool.Pick() if err != nil { c.JSON(http.StatusServiceUnavailable, gin.H{"error": err.Error()}) return } isStream := gjson.GetBytes(body, "stream").Bool() if isStream { handleStream(c, upstream, san, pool, cred, body, originalBody) } else { handleNonStream(c, upstream, san, pool, cred, body, originalBody) } } } func handleNonStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool *auth.Pool, cred *auth.Credential, body []byte, originalBody []byte) { startTime := time.Now() respBody, headers, statusCode, err := upstream.Execute(c.Request.Context(), cred, body) if err != nil { latencyMs := float64(time.Since(startTime).Milliseconds()) model := gjson.GetBytes(body, "model").String() log.Error(). Err(err). Str("credential", cred.Email). Str("model", model). Bool("stream", false). Str("request_body_original", string(originalBody)). Str("request_body_sanitized", string(body)). Int("request_body_size", len(body)). Float64("latency_ms", latencyMs). Msg("upstream connection error") c.JSON(http.StatusBadGateway, gin.H{"error": "upstream request failed"}) return } if statusCode >= 400 { pool.MarkFailure(cred, statusCode) latencyMs := float64(time.Since(startTime).Milliseconds()) model := gjson.GetBytes(body, "model").String() errorType := gjson.GetBytes(respBody, "error.type").String() errorMessage := gjson.GetBytes(respBody, "error.message").String() log.Error(). Int("status", statusCode). Str("error_type", errorType). Str("error_message", errorMessage). Str("response_body", string(respBody)). Str("request_id", headers.Get("X-Request-Id")). Float64("latency_ms", latencyMs). Str("credential", cred.Email). Str("model", model). Bool("stream", false). Str("request_body_original", string(originalBody)). Str("request_body_sanitized", string(body)). Int("request_body_size", len(body)). Str("request_headers", logging.RedactHeaders(c.Request.Header)). Msg("upstream error") } else { pool.MarkSuccess(cred) respBody = san.DesanitizeResponse(respBody) } for _, h := range []string{"Content-Type", "X-Request-Id"} { if v := headers.Get(h); v != "" { c.Header(h, v) } } c.Data(statusCode, headers.Get("Content-Type"), respBody) } func handleStream(c *gin.Context, upstream *UpstreamClient, san *Sanitizer, pool *auth.Pool, cred *auth.Credential, body []byte, originalBody []byte) { startTime := time.Now() resp, err := upstream.ExecuteStream(c.Request.Context(), cred, body) if err != nil { latencyMs := float64(time.Since(startTime).Milliseconds()) model := gjson.GetBytes(body, "model").String() log.Error(). Err(err). Str("credential", cred.Email). Str("model", model). Bool("stream", true). Str("request_body_original", string(originalBody)). Str("request_body_sanitized", string(body)). Int("request_body_size", len(body)). Float64("latency_ms", latencyMs). Msg("upstream connection error") c.JSON(http.StatusBadGateway, gin.H{"error": "upstream stream request failed"}) return } defer resp.Body.Close() if resp.StatusCode >= 400 { pool.MarkFailure(cred, resp.StatusCode) respBody, _ := io.ReadAll(resp.Body) latencyMs := float64(time.Since(startTime).Milliseconds()) model := gjson.GetBytes(body, "model").String() errorType := gjson.GetBytes(respBody, "error.type").String() errorMessage := gjson.GetBytes(respBody, "error.message").String() log.Error(). Int("status", resp.StatusCode). Str("error_type", errorType). Str("error_message", errorMessage). Str("response_body", string(respBody)). Str("request_id", resp.Header.Get("X-Request-Id")). Float64("latency_ms", latencyMs). Str("credential", cred.Email). Str("model", model). Bool("stream", true). Str("request_body_original", string(originalBody)). Str("request_body_sanitized", string(body)). Int("request_body_size", len(body)). Str("request_headers", logging.RedactHeaders(c.Request.Header)). Msg("upstream error") c.Data(resp.StatusCode, resp.Header.Get("Content-Type"), respBody) return } pool.MarkSuccess(cred) c.Header("Content-Type", "text/event-stream") c.Header("Cache-Control", "no-cache") c.Header("Connection", "keep-alive") c.Status(http.StatusOK) flusher, ok := c.Writer.(http.Flusher) if !ok { log.Error().Msg("response writer does not support flushing") c.JSON(http.StatusInternalServerError, gin.H{"error": "streaming not supported"}) return } scanner := bufio.NewScanner(resp.Body) scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) for scanner.Scan() { line := san.DesanitizeStreamEvent(scanner.Text()) c.Writer.WriteString(line + "\n") flusher.Flush() } if err := scanner.Err(); err != nil { log.Error().Err(err).Msg("stream scan error") } }