From cfa325e2a1c7784ad2e3ffb1111dbc3f1b150e90 Mon Sep 17 00:00:00 2001 From: "maksim.nabokikh" Date: Sun, 15 Feb 2026 16:59:16 +0100 Subject: [PATCH] feat(crd): add CRD handling behavior and configuration options Signed-off-by: maksim.nabokikh --- storage/kubernetes/client.go | 8 ++- storage/kubernetes/client_test.go | 1 + storage/kubernetes/storage.go | 83 +++++++++++++++++++------------ 3 files changed, 59 insertions(+), 33 deletions(-) diff --git a/storage/kubernetes/client.go b/storage/kubernetes/client.go index 4d66b8bc..86d88cf7 100644 --- a/storage/kubernetes/client.go +++ b/storage/kubernetes/client.go @@ -65,6 +65,11 @@ type client struct { // storage opening. crdAPIVersion string + // CRD handling behavior controls how missing Custom Resource Definitions are handled: + // - "ensure": Attempt to create all missing CRDs. Fails if any CRD creation fails. (default) + // - "check": Fail if any CRDs are missing, with error "storage is not initialized, CRDs are not created" + crdHandling string + // This is called once the client's Close method is called to signal goroutines, // such as the one creating third party resources, to stop. cancel context.CancelFunc @@ -369,7 +374,7 @@ func defaultTLSConfig() *tls.Config { } } -func newClient(cluster k8sapi.Cluster, user k8sapi.AuthInfo, namespace string, logger *slog.Logger, inCluster bool) (*client, error) { +func newClient(cluster k8sapi.Cluster, user k8sapi.AuthInfo, namespace string, logger *slog.Logger, inCluster bool, crdHandling string) (*client, error) { tlsConfig := defaultTLSConfig() data := func(b string, file string) ([]byte, error) { if b != "" { @@ -440,6 +445,7 @@ func newClient(cluster k8sapi.Cluster, user k8sapi.AuthInfo, namespace string, l namespace: namespace, apiVersion: apiVersion, crdAPIVersion: crdAPIVersion, + crdHandling: crdHandling, logger: logger, }, nil } diff --git a/storage/kubernetes/client_test.go b/storage/kubernetes/client_test.go index e57992f7..6375685e 100644 --- a/storage/kubernetes/client_test.go +++ b/storage/kubernetes/client_test.go @@ -62,6 +62,7 @@ func TestInClusterTransport(t *testing.T) { "test", logger, true, + "", ) require.NoError(t, err) diff --git a/storage/kubernetes/storage.go b/storage/kubernetes/storage.go index a53e2554..028ced7e 100644 --- a/storage/kubernetes/storage.go +++ b/storage/kubernetes/storage.go @@ -40,6 +40,11 @@ const ( resourceDeviceToken = "devicetokens" ) +const ( + crdHandlingEnsure = "ensure" + crdHandlingCheck = "check" +) + var _ storage.Storage = (*client)(nil) const ( @@ -50,15 +55,16 @@ const ( type Config struct { InCluster bool `json:"inCluster"` KubeConfigFile string `json:"kubeConfigFile"` + // CRDHandling controls how the storage handles Custom Resource Definitions (CRDs). + // Supported values: + // - "ensure": Attempt to create all missing CRDs. If any CRD creation fails, initialization fails. (default) + // - "check": Fail immediately if any CRDs are missing with message "storage is not initialized, CRDs are not created" + CRDHandling string `json:"crdHandling"` } // Open returns a storage using Kubernetes third party resource. func (c *Config) Open(logger *slog.Logger) (storage.Storage, error) { - cli, err := c.open(logger, false) - if err != nil { - return nil, err - } - return cli, nil + return c.open(logger, false) } // open returns a kubernetes client, initializing the third party resources used @@ -67,6 +73,9 @@ func (c *Config) Open(logger *slog.Logger) (storage.Storage, error) { // waitForResources controls if errors creating the resources cause this method to return // immediately (used during testing), or if the client will asynchronously retry. func (c *Config) open(logger *slog.Logger, waitForResources bool) (*client, error) { + if c.CRDHandling == "" { + c.CRDHandling = crdHandlingEnsure + } if c.InCluster && (c.KubeConfigFile != "") { return nil, errors.New("cannot specify both 'inCluster' and 'kubeConfigFile'") } @@ -89,7 +98,7 @@ func (c *Config) open(logger *slog.Logger, waitForResources bool) (*client, erro return nil, err } - cli, err := newClient(cluster, user, namespace, logger, c.InCluster) + cli, err := newClient(cluster, user, namespace, logger, c.InCluster, c.CRDHandling) if err != nil { return nil, fmt.Errorf("create client: %v", err) } @@ -143,45 +152,55 @@ func (c *Config) open(logger *slog.Logger, waitForResources bool) (*client, erro // It logs all errors, returning true if the resources were created successfully. // // Creating a custom resource does not mean that they'll be immediately available. -func (cli *client) registerCustomResources() (ok bool) { - ok = true - +func (cli *client) registerCustomResources() bool { definitions := customResourceDefinitions(cli.crdAPIVersion) - length := len(definitions) - for i := 0; i < length; i++ { - var err error - var resourceName string + // First pass: collect all CRDs that don't exist + var missingCRDs []k8sapi.CustomResourceDefinition - r := definitions[i] + for _, r := range definitions { var i interface{} cli.logger.Info("checking if custom resource has already been created...", "object", r.ObjectMeta.Name) if err := cli.listN(r.Spec.Names.Plural, &i, 1); err == nil { cli.logger.Info("the custom resource already available, skipping create", "object", r.ObjectMeta.Name) - continue } else { - cli.logger.Info("failed to list custom resource, attempting to create", "object", r.ObjectMeta.Name, "err", err) + cli.logger.Info("custom resource not found", "object", r.ObjectMeta.Name, "err", err) + missingCRDs = append(missingCRDs, r) } + } - err = cli.postResource(cli.crdAPIVersion, "", "customresourcedefinitions", r) - resourceName = r.ObjectMeta.Name - - if err != nil { - switch err { - case storage.ErrAlreadyExists: - cli.logger.Info("custom resource already created", "object", resourceName) - case storage.ErrNotFound: - cli.logger.Error("custom resources not found, please enable the respective API group") - ok = false - default: - cli.logger.Error("creating custom resource", "object", resourceName, "err", err) - ok = false + // Second pass: handle missing CRDs based on crdHandling option + if len(missingCRDs) > 0 { + cli.logger.Info("found missing CRDs", "count", len(missingCRDs)) + switch cli.crdHandling { + case crdHandlingCheck: + // For "check" mode, fail and report that CRDs are not initialized + cli.logger.Error("storage is not initialized, CRDs are not created", "crdHandling", cli.crdHandling, "missing_count", len(missingCRDs)) + return false + case crdHandlingEnsure: + cli.logger.Info("crdHandling is 'ensure', attempting to create missing CRDs") + for _, r := range missingCRDs { + resourceName := r.ObjectMeta.Name + err := cli.postResource(cli.crdAPIVersion, "", "customresourcedefinitions", r) + if err != nil { + if !errors.Is(err, storage.ErrAlreadyExists) { + cli.logger.Error("failed to create custom resource", "object", resourceName, "err", err) + return false + } + cli.logger.Info("custom resource already created", "object", resourceName) + } else { + cli.logger.Info("successfully created custom resource", "object", resourceName) + } } - continue + return true + default: + cli.logger.Error("invalid crdHandling value", "value", cli.crdHandling) + return false } - cli.logger.Error("create custom resource", "object", resourceName) } - return ok + + // All CRDs exist + return true } // waitForCRDs waits for all CRDs to be in a ready state, and is used