From 3249bdc35cbe33e52fff4516d36428633e6b952f Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 6 May 2026 21:58:24 +0200 Subject: [PATCH] Add gRPC observability: logging, metrics, recovery interceptors --- cmd/music-agregator/main.go | 74 +++++++++++++++++++++++++++++++++++-- go.mod | 10 +++++ go.sum | 18 +++++++++ internal/indexer/jackett.go | 19 +++++++--- internal/indexer/search.go | 28 ++++++++++++-- internal/indexer/server.go | 29 +++++++++++++-- internal/indexer/service.go | 7 +++- 7 files changed, 170 insertions(+), 15 deletions(-) diff --git a/cmd/music-agregator/main.go b/cmd/music-agregator/main.go index 713b5c3..0797e57 100644 --- a/cmd/music-agregator/main.go +++ b/cmd/music-agregator/main.go @@ -1,16 +1,26 @@ package main import ( + "context" "flag" "fmt" "net" + "net/http" "os" + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "gopkg.in/yaml.v2" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "homelab.lan/music-agregator/internal" "homelab.lan/music-agregator/internal/config" "homelab.lan/music-agregator/internal/hello" @@ -33,9 +43,54 @@ func main() { serveGrpc(*cfg) } +func interceptorLogger(l zerolog.Logger) logging.Logger { + return logging.LoggerFunc(func(ctx context.Context, lvl logging.Level, msg string, fields ...any) { + l := l.With().Fields(fields).Logger() + switch lvl { + case logging.LevelDebug: + l.Debug().Msg(msg) + case logging.LevelInfo: + l.Info().Msg(msg) + case logging.LevelWarn: + l.Warn().Msg(msg) + case logging.LevelError: + l.Error().Msg(msg) + default: + l.Info().Msg(msg) + } + }) +} + func serveGrpc(config config.Config) { - var opts []grpc.ServerOption - server := grpc.NewServer(opts...) + srvMetrics := grpcprom.NewServerMetrics( + grpcprom.WithServerHandlingTimeHistogram( + grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 30}), + ), + ) + + logOpts := []logging.Option{ + logging.WithLogOnEvents(logging.StartCall, logging.FinishCall), + } + + recoveryOpts := []recovery.Option{ + recovery.WithRecoveryHandler(func(p any) (err error) { + log.Error().Interface("panic", p).Msg("recovered from panic") + return status.Errorf(codes.Internal, "internal error") + }), + } + + server := grpc.NewServer( + grpc.ChainUnaryInterceptor( + srvMetrics.UnaryServerInterceptor(), + logging.UnaryServerInterceptor(interceptorLogger(log.Logger), logOpts...), + recovery.UnaryServerInterceptor(recoveryOpts...), + ), + grpc.ChainStreamInterceptor( + srvMetrics.StreamServerInterceptor(), + logging.StreamServerInterceptor(interceptorLogger(log.Logger), logOpts...), + recovery.StreamServerInterceptor(recoveryOpts...), + ), + ) indexerServer, err := indexer.NewIndexerServer(config) if err != nil { @@ -51,11 +106,24 @@ func serveGrpc(config config.Config) { service.Register(server) } + srvMetrics.InitializeMetrics(server) + prometheus.MustRegister(srvMetrics) + + go func() { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + log.Info().Msg("Prometheus metrics available at :9090/metrics") + if err := http.ListenAndServe(":9090", mux); err != nil { + log.Fatal().Err(err).Msg("Failed to start metrics server") + } + }() + listener, err := net.Listen("tcp", fmt.Sprintf("%v:%v", config.App.Host, config.App.Port)) if err != nil { - log.Fatal().Err(err).Msg("Failed to listen on localhost:8081") + log.Fatal().Err(err).Msg("Failed to listen") } + log.Info().Str("addr", listener.Addr().String()).Msg("gRPC server listening") server.Serve(listener) } diff --git a/go.mod b/go.mod index a3e3253..8581c56 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,19 @@ go 1.26.2 require github.com/rs/zerolog v1.35.1 require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0 // indirect + github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.3 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/mattn/go-colorable v0.1.14 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_golang v1.23.2 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect ) diff --git a/go.sum b/go.sum index fa2fd8f..75a962e 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -11,6 +13,10 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0 h1:QGLs/O40yoNK9vmy4rhUGBVyMf1lISBGtXRpsu/Qu/o= +github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0/go.mod h1:hM2alZsMUni80N33RBe6J0e423LB+odMj7d3EMP9l20= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.3 h1:B+8ClL/kCQkRiU82d9xajRPKYMrB7E0MbtzWVi1K4ns= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.3/go.mod h1:NbCUVmiS4foBGBHOYlCT25+YmGpJ32dZPi75pGEUpj4= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -22,7 +28,17 @@ github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHP github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= @@ -40,6 +56,8 @@ go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfC go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A= go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A= go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo= golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/indexer/jackett.go b/internal/indexer/jackett.go index e26e5e3..525ba08 100644 --- a/internal/indexer/jackett.go +++ b/internal/indexer/jackett.go @@ -34,33 +34,42 @@ func (indexer *JacketIndexer) Search(query string, limit int32, tracker string) url := indexer.cfg.Indexer.Url uri := fmt.Sprintf("%v/api/v2.0/indexers/%v/results/torznab?apikey=%v&limit=%d&cat=3010,3040&q=%v&t=search", url, searchTracker, indexer.cfg.Indexer.ApiKey, limit, query) - log.Debug().Str("uri", uri).Msg("Sending search request") + log.Trace().Str("tracker", searchTracker).Str("query", query).Int32("limit", limit).Msg("jackett request") req, err := http.NewRequest("GET", uri, nil) if err != nil { - log.Error().Err(err).Msg("Error creating request") + log.Error().Err(err).Msg("error creating request") return SearchResult{}, err } + start := time.Now() resp, err := indexer.client.Do(req) if err != nil { - log.Error().Err(err).Msg("Error making search request") + log.Error().Err(err).Msg("error making search request") return SearchResult{}, err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - log.Error().Err(err).Msg("Error reading search response body") + log.Error().Err(err).Msg("error reading search response body") return SearchResult{}, err } + log.Trace(). + Int("status", resp.StatusCode). + Int("body_bytes", len(body)). + Dur("duration", time.Since(start)). + Msg("jackett response") + var searchResult SearchResult if err := xml.Unmarshal(body, &searchResult); err != nil { - log.Error().Err(err).Msg("Error parsing search XML") + log.Error().Err(err).Msg("error parsing search XML") return SearchResult{}, err } + log.Trace().Int("items", len(searchResult.Items)).Msg("jackett XML parsed") + return searchResult, nil } diff --git a/internal/indexer/search.go b/internal/indexer/search.go index 0f2bbc6..f212669 100644 --- a/internal/indexer/search.go +++ b/internal/indexer/search.go @@ -3,6 +3,8 @@ package indexer import ( "encoding/xml" + "github.com/rs/zerolog/log" + pb "homelab.lan/music-agregator/gen/music_agregator/indexer/v1" "homelab.lan/music-agregator/internal/tracker/rutracker" ) @@ -46,8 +48,25 @@ var ( func (sr *SearchResult) ToProto() *pb.SearchResponse { var pbItems []*pb.SearchItem + var skipped int for _, item := range sr.Items { + release := rutrackerParserFactory.GetParser(item.Categories).Parse(item.Title) + + log.Trace(). + Str("tracker", item.JackettIndexer.ID). + Str("title", item.Title). + Str("artist", release.Artist). + Str("album", release.Album). + Int("year", release.Year). + Bool("parsed", release.ParsedSuccessfully). + Msg("parsed item") + + if !release.ParsedSuccessfully { + skipped++ + continue + } + pbAttrs := make([]*pb.TorznabAttr, len(item.TorznabAttrs)) for j, attr := range item.TorznabAttrs { pbAttrs[j] = &pb.TorznabAttr{ @@ -56,9 +75,6 @@ func (sr *SearchResult) ToProto() *pb.SearchResponse { } } - // TODO add the check from what tracker the result is to properly get the parser and thus parse it - release := rutrackerParserFactory.GetParser(item.Categories).Parse(item.Title) - pbItems = append(pbItems, &pb.SearchItem{ Title: item.Title, DownloadLink: item.Link, @@ -77,6 +93,12 @@ func (sr *SearchResult) ToProto() *pb.SearchResponse { }) } + log.Trace(). + Int("total", len(sr.Items)). + Int("parsed", len(pbItems)). + Int("skipped", skipped). + Msg("conversion complete") + return &pb.SearchResponse{ Result: pbItems, } diff --git a/internal/indexer/server.go b/internal/indexer/server.go index 359279b..976ab46 100644 --- a/internal/indexer/server.go +++ b/internal/indexer/server.go @@ -26,13 +26,36 @@ func NewIndexerServer(cfg config.Config) (*IndexerServer, error) { } func (server *IndexerServer) Search(ctx context.Context, req *pb.SearchRequest) (*pb.SearchResponse, error) { - log.Debug().Str("query", req.GetQuery()).Int32("limit", req.GetLimit()).Str("indexer", req.GetTracker()).Msg("Running search with these prams") + log.Debug(). + Str("query", req.GetQuery()). + Int32("limit", req.GetLimit()). + Str("tracker", req.GetTracker()). + Msg("search started") - return server.service.Search(req) + resp, err := server.service.Search(req) + if err != nil { + log.Error().Err(err).Str("query", req.GetQuery()).Msg("search failed") + return nil, err + } + + log.Debug(). + Str("query", req.GetQuery()). + Int("results", len(resp.GetResult())). + Msg("search completed") + + return resp, nil } func (server *IndexerServer) Capabilities(ctx context.Context, req *pb.CapabilitiesRequest) (*pb.CapabilitiesResponse, error) { - return server.service.Capabilities(req) + log.Debug().Str("indexer", req.GetIndexer()).Msg("capabilities requested") + + resp, err := server.service.Capabilities(req) + if err != nil { + log.Error().Err(err).Str("indexer", req.GetIndexer()).Msg("capabilities failed") + return nil, err + } + + return resp, nil } func (s *IndexerServer) Register(server *grpc.Server) { diff --git a/internal/indexer/service.go b/internal/indexer/service.go index a01f30f..39973e3 100644 --- a/internal/indexer/service.go +++ b/internal/indexer/service.go @@ -24,11 +24,16 @@ func NewIndexerService(cfg config.Config) (*IndexerService, error) { } func (service *IndexerService) Search(req *pb.SearchRequest) (*pb.SearchResponse, error) { + log.Trace().Str("query", req.GetQuery()).Msg("fetching results from indexer") + searchResult, err := service.indexer.Search(req.GetQuery(), req.GetLimit(), req.GetTracker()) if err != nil { - log.Error().Err(err).Msg("Failed to search in indexer") + log.Error().Err(err).Msg("failed to search in indexer") return nil, err } + + log.Trace().Int("raw_items", len(searchResult.Items)).Msg("indexer returned results, converting to proto") + return searchResult.ToProto(), nil }