Add gRPC observability: logging, metrics, recovery interceptors
This commit is contained in:
@@ -1,16 +1,26 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
|
||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"gopkg.in/yaml.v2"
|
||||
|
||||
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
|
||||
|
||||
"homelab.lan/music-agregator/internal"
|
||||
"homelab.lan/music-agregator/internal/config"
|
||||
"homelab.lan/music-agregator/internal/hello"
|
||||
@@ -33,9 +43,54 @@ func main() {
|
||||
serveGrpc(*cfg)
|
||||
}
|
||||
|
||||
func interceptorLogger(l zerolog.Logger) logging.Logger {
|
||||
return logging.LoggerFunc(func(ctx context.Context, lvl logging.Level, msg string, fields ...any) {
|
||||
l := l.With().Fields(fields).Logger()
|
||||
switch lvl {
|
||||
case logging.LevelDebug:
|
||||
l.Debug().Msg(msg)
|
||||
case logging.LevelInfo:
|
||||
l.Info().Msg(msg)
|
||||
case logging.LevelWarn:
|
||||
l.Warn().Msg(msg)
|
||||
case logging.LevelError:
|
||||
l.Error().Msg(msg)
|
||||
default:
|
||||
l.Info().Msg(msg)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func serveGrpc(config config.Config) {
|
||||
var opts []grpc.ServerOption
|
||||
server := grpc.NewServer(opts...)
|
||||
srvMetrics := grpcprom.NewServerMetrics(
|
||||
grpcprom.WithServerHandlingTimeHistogram(
|
||||
grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 30}),
|
||||
),
|
||||
)
|
||||
|
||||
logOpts := []logging.Option{
|
||||
logging.WithLogOnEvents(logging.StartCall, logging.FinishCall),
|
||||
}
|
||||
|
||||
recoveryOpts := []recovery.Option{
|
||||
recovery.WithRecoveryHandler(func(p any) (err error) {
|
||||
log.Error().Interface("panic", p).Msg("recovered from panic")
|
||||
return status.Errorf(codes.Internal, "internal error")
|
||||
}),
|
||||
}
|
||||
|
||||
server := grpc.NewServer(
|
||||
grpc.ChainUnaryInterceptor(
|
||||
srvMetrics.UnaryServerInterceptor(),
|
||||
logging.UnaryServerInterceptor(interceptorLogger(log.Logger), logOpts...),
|
||||
recovery.UnaryServerInterceptor(recoveryOpts...),
|
||||
),
|
||||
grpc.ChainStreamInterceptor(
|
||||
srvMetrics.StreamServerInterceptor(),
|
||||
logging.StreamServerInterceptor(interceptorLogger(log.Logger), logOpts...),
|
||||
recovery.StreamServerInterceptor(recoveryOpts...),
|
||||
),
|
||||
)
|
||||
|
||||
indexerServer, err := indexer.NewIndexerServer(config)
|
||||
if err != nil {
|
||||
@@ -51,11 +106,24 @@ func serveGrpc(config config.Config) {
|
||||
service.Register(server)
|
||||
}
|
||||
|
||||
srvMetrics.InitializeMetrics(server)
|
||||
prometheus.MustRegister(srvMetrics)
|
||||
|
||||
go func() {
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/metrics", promhttp.Handler())
|
||||
log.Info().Msg("Prometheus metrics available at :9090/metrics")
|
||||
if err := http.ListenAndServe(":9090", mux); err != nil {
|
||||
log.Fatal().Err(err).Msg("Failed to start metrics server")
|
||||
}
|
||||
}()
|
||||
|
||||
listener, err := net.Listen("tcp", fmt.Sprintf("%v:%v", config.App.Host, config.App.Port))
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Failed to listen on localhost:8081")
|
||||
log.Fatal().Err(err).Msg("Failed to listen")
|
||||
}
|
||||
|
||||
log.Info().Str("addr", listener.Addr().String()).Msg("gRPC server listening")
|
||||
server.Serve(listener)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user