Browse Source

feat: grpc health service

Implements gRPC Health check using standard grpc library. Reuses the
health as reported by gosundheit health checker. Grpc health server
implements also streaming Watch method, so it was necessary to add
Stop() failback to stop server even if some clients ignore NOT_SERVING
status. Timeout for graceful shutdown was chosen to match timeouts for
http and telemetry servers.

Signed-off-by: Robert Moucha <robert.moucha@gooddata.com>
pull/4020/head
Robert Moucha 1 year ago
parent
commit
c4d13dbd76
No known key found for this signature in database
GPG Key ID: 1D27B16C7ECFEBD9
  1. 61
      cmd/dex/serve.go

61
cmd/dex/serve.go

@ -32,6 +32,8 @@ import (
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"github.com/dexidp/dex/api/v2"
@ -59,6 +61,11 @@ var buildInfo = prometheus.NewGaugeVec(
[]string{"version", "go_version", "platform"},
)
var (
healthCheckPeriod = 15 * time.Second
shutdownTimeout = 1 * time.Minute
)
func commandServe() *cobra.Command {
options := serveOptions{}
@ -379,7 +386,7 @@ func runServe(options serveOptions) error {
CheckName: "storage",
CheckFunc: storage.NewCustomHealthCheckFunc(serverConfig.Storage, serverConfig.Now),
},
gosundheit.ExecutionPeriod(15*time.Second),
gosundheit.ExecutionPeriod(healthCheckPeriod),
gosundheit.InitiallyPassing(true),
)
@ -408,7 +415,7 @@ func runServe(options serveOptions) error {
group.Add(func() error {
return server.Serve(l)
}, func(err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
logger.Debug("starting graceful shutdown", "server", name)
@ -437,7 +444,7 @@ func runServe(options serveOptions) error {
group.Add(func() error {
return server.Serve(l)
}, func(err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
logger.Debug("starting graceful shutdown", "server", name)
@ -488,7 +495,7 @@ func runServe(options serveOptions) error {
group.Add(func() error {
return server.ServeTLS(l, "", "")
}, func(err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
logger.Debug("starting graceful shutdown", "server", name)
@ -508,6 +515,8 @@ func runServe(options serveOptions) error {
}
grpcSrv := grpc.NewServer(grpcOptions...)
healthcheck := health.NewServer()
healthgrpc.RegisterHealthServer(grpcSrv, healthcheck)
api.RegisterDexServer(grpcSrv, server.NewAPI(serverConfig.Storage, logger, version, serv))
grpcMetrics.InitializeMetrics(grpcSrv)
@ -515,12 +524,44 @@ func runServe(options serveOptions) error {
logger.Info("enabling reflection in grpc service")
reflection.Register(grpcSrv)
}
ctx, cancelHealthcheck := context.WithCancel(context.Background())
defer cancelHealthcheck()
group.Add(func() error {
setHealthCheckStatus(healthcheck, healthChecker)
for {
select {
case <-ctx.Done():
logger.Debug("stopping health check status updater", "server", "grpc")
return nil
case <-time.After(healthCheckPeriod):
setHealthCheckStatus(healthcheck, healthChecker)
}
}
}, func(err error) {
logger.Debug("stopped health check status updater", "server", "grpc")
})
group.Add(func() error {
return grpcSrv.Serve(grpcListener)
}, func(err error) {
logger.Debug("starting graceful shutdown", "server", "grpc")
grpcSrv.GracefulStop()
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
go func() {
healthcheck.Shutdown()
cancelHealthcheck()
grpcSrv.GracefulStop()
cancel()
}()
<-ctx.Done()
if ctx.Err() != nil {
logger.Debug("Graceful shutdown timed out. forcing shutdown", "server", "grpc")
grpcSrv.Stop()
} else {
logger.Debug("Graceful shutdown completed", "server", "grpc")
}
})
}
@ -534,6 +575,16 @@ func runServe(options serveOptions) error {
return nil
}
func setHealthCheckStatus(healthServer *health.Server, healthChecker gosundheit.Health) {
var status healthgrpc.HealthCheckResponse_ServingStatus
if healthChecker.IsHealthy() {
status = healthgrpc.HealthCheckResponse_SERVING
} else {
status = healthgrpc.HealthCheckResponse_NOT_SERVING
}
healthServer.SetServingStatus("", status)
}
func applyConfigOverrides(options serveOptions, config *Config) {
if options.webHTTPAddr != "" {
config.Web.HTTP = options.webHTTPAddr

Loading…
Cancel
Save