From c4d13dbd768d475c722abc69a91f880f0c47f0da Mon Sep 17 00:00:00 2001 From: Robert Moucha Date: Tue, 4 Mar 2025 16:40:38 +0100 Subject: [PATCH] feat: grpc health service Implements gRPC Health check using standard grpc library. Reuses the health as reported by gosundheit health checker. Grpc health server implements also streaming Watch method, so it was necessary to add Stop() failback to stop server even if some clients ignore NOT_SERVING status. Timeout for graceful shutdown was chosen to match timeouts for http and telemetry servers. Signed-off-by: Robert Moucha --- cmd/dex/serve.go | 61 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 5 deletions(-) diff --git a/cmd/dex/serve.go b/cmd/dex/serve.go index 8a69c7ee..07b92121 100644 --- a/cmd/dex/serve.go +++ b/cmd/dex/serve.go @@ -32,6 +32,8 @@ import ( "github.com/spf13/cobra" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + healthgrpc "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" "github.com/dexidp/dex/api/v2" @@ -59,6 +61,11 @@ var buildInfo = prometheus.NewGaugeVec( []string{"version", "go_version", "platform"}, ) +var ( + healthCheckPeriod = 15 * time.Second + shutdownTimeout = 1 * time.Minute +) + func commandServe() *cobra.Command { options := serveOptions{} @@ -379,7 +386,7 @@ func runServe(options serveOptions) error { CheckName: "storage", CheckFunc: storage.NewCustomHealthCheckFunc(serverConfig.Storage, serverConfig.Now), }, - gosundheit.ExecutionPeriod(15*time.Second), + gosundheit.ExecutionPeriod(healthCheckPeriod), gosundheit.InitiallyPassing(true), ) @@ -408,7 +415,7 @@ func runServe(options serveOptions) error { group.Add(func() error { return server.Serve(l) }, func(err error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() logger.Debug("starting graceful shutdown", "server", name) @@ -437,7 +444,7 @@ func runServe(options serveOptions) error { group.Add(func() error { return server.Serve(l) }, func(err error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() logger.Debug("starting graceful shutdown", "server", name) @@ -488,7 +495,7 @@ func runServe(options serveOptions) error { group.Add(func() error { return server.ServeTLS(l, "", "") }, func(err error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() logger.Debug("starting graceful shutdown", "server", name) @@ -508,6 +515,8 @@ func runServe(options serveOptions) error { } grpcSrv := grpc.NewServer(grpcOptions...) + healthcheck := health.NewServer() + healthgrpc.RegisterHealthServer(grpcSrv, healthcheck) api.RegisterDexServer(grpcSrv, server.NewAPI(serverConfig.Storage, logger, version, serv)) grpcMetrics.InitializeMetrics(grpcSrv) @@ -515,12 +524,44 @@ func runServe(options serveOptions) error { logger.Info("enabling reflection in grpc service") reflection.Register(grpcSrv) } + ctx, cancelHealthcheck := context.WithCancel(context.Background()) + defer cancelHealthcheck() + + group.Add(func() error { + setHealthCheckStatus(healthcheck, healthChecker) + for { + select { + case <-ctx.Done(): + logger.Debug("stopping health check status updater", "server", "grpc") + return nil + case <-time.After(healthCheckPeriod): + setHealthCheckStatus(healthcheck, healthChecker) + } + } + }, func(err error) { + logger.Debug("stopped health check status updater", "server", "grpc") + }) group.Add(func() error { return grpcSrv.Serve(grpcListener) }, func(err error) { logger.Debug("starting graceful shutdown", "server", "grpc") - grpcSrv.GracefulStop() + ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) + + go func() { + healthcheck.Shutdown() + cancelHealthcheck() + grpcSrv.GracefulStop() + cancel() + }() + + <-ctx.Done() + if ctx.Err() != nil { + logger.Debug("Graceful shutdown timed out. forcing shutdown", "server", "grpc") + grpcSrv.Stop() + } else { + logger.Debug("Graceful shutdown completed", "server", "grpc") + } }) } @@ -534,6 +575,16 @@ func runServe(options serveOptions) error { return nil } +func setHealthCheckStatus(healthServer *health.Server, healthChecker gosundheit.Health) { + var status healthgrpc.HealthCheckResponse_ServingStatus + if healthChecker.IsHealthy() { + status = healthgrpc.HealthCheckResponse_SERVING + } else { + status = healthgrpc.HealthCheckResponse_NOT_SERVING + } + healthServer.SetServingStatus("", status) +} + func applyConfigOverrides(options serveOptions, config *Config) { if options.webHTTPAddr != "" { config.Web.HTTP = options.webHTTPAddr