package etcd import ( "context" "fmt" "log/slog" "os" "runtime" "strings" "testing" "time" clientv3 "go.etcd.io/etcd/client/v3" "github.com/dexidp/dex/storage" "github.com/dexidp/dex/storage/conformance" ) func withTimeout(t time.Duration, f func()) { c := make(chan struct{}) defer close(c) go func() { select { case <-c: case <-time.After(t): // Dump a stack trace of the program. Useful for debugging deadlocks. buf := make([]byte, 2<<20) fmt.Fprintf(os.Stderr, "%s\n", buf[:runtime.Stack(buf, true)]) panic("test took too long") } }() f() } func cleanDB(c *conn) error { ctx := context.TODO() for _, prefix := range []string{ clientPrefix, authCodePrefix, refreshTokenPrefix, authRequestPrefix, passwordPrefix, offlineSessionPrefix, connectorPrefix, deviceRequestPrefix, deviceTokenPrefix, } { _, err := c.db.Delete(ctx, prefix, clientv3.WithPrefix()) if err != nil { return err } } return nil } func TestEtcd(t *testing.T) { testEtcdEnv := "DEX_ETCD_ENDPOINTS" endpointsStr := os.Getenv(testEtcdEnv) if endpointsStr == "" { t.Skipf("test environment variable %q not set, skipping", testEtcdEnv) return } endpoints := strings.Split(endpointsStr, ",") newStorage := func(t *testing.T) storage.Storage { s := &Etcd{ Endpoints: endpoints, } logger := slog.New(slog.NewTextHandler(t.Output(), &slog.HandlerOptions{Level: slog.LevelDebug})) conn, err := s.open(logger) if err != nil { fmt.Fprintln(os.Stdout, err) t.Fatal(err) } if err := cleanDB(conn); err != nil { fmt.Fprintln(os.Stdout, err) t.Fatal(err) } return conn } withTimeout(time.Second*10, func() { conformance.RunTests(t, newStorage) }) withTimeout(time.Minute*1, func() { conformance.RunTransactionTests(t, newStorage) }) // TODO(nabokihms): etcd uses compare-and-swap (txnUpdate) for UpdateRefreshToken, // but does not retry on CAS conflicts ("concurrent conflicting update happened"). // Under high contention virtually all updates fail — only the first writer succeeds. // withTimeout(time.Minute*1, func() { // conformance.RunConcurrencyTests(t, newStorage) // }) }