|
|
|
|
@ -40,6 +40,11 @@ const (
|
|
|
|
|
resourceDeviceToken = "devicetokens" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
crdHandlingEnsure = "ensure" |
|
|
|
|
crdHandlingCheck = "check" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
var _ storage.Storage = (*client)(nil) |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
@ -50,15 +55,16 @@ const (
|
|
|
|
|
type Config struct { |
|
|
|
|
InCluster bool `json:"inCluster"` |
|
|
|
|
KubeConfigFile string `json:"kubeConfigFile"` |
|
|
|
|
// CRDHandling controls how the storage handles Custom Resource Definitions (CRDs).
|
|
|
|
|
// Supported values:
|
|
|
|
|
// - "ensure": Attempt to create all missing CRDs. If any CRD creation fails, initialization fails. (default)
|
|
|
|
|
// - "check": Fail immediately if any CRDs are missing with message "storage is not initialized, CRDs are not created"
|
|
|
|
|
CRDHandling string `json:"crdHandling"` |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Open returns a storage using Kubernetes third party resource.
|
|
|
|
|
func (c *Config) Open(logger *slog.Logger) (storage.Storage, error) { |
|
|
|
|
cli, err := c.open(logger, false) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
return cli, nil |
|
|
|
|
return c.open(logger, false) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// open returns a kubernetes client, initializing the third party resources used
|
|
|
|
|
@ -67,6 +73,9 @@ func (c *Config) Open(logger *slog.Logger) (storage.Storage, error) {
|
|
|
|
|
// waitForResources controls if errors creating the resources cause this method to return
|
|
|
|
|
// immediately (used during testing), or if the client will asynchronously retry.
|
|
|
|
|
func (c *Config) open(logger *slog.Logger, waitForResources bool) (*client, error) { |
|
|
|
|
if c.CRDHandling == "" { |
|
|
|
|
c.CRDHandling = crdHandlingEnsure |
|
|
|
|
} |
|
|
|
|
if c.InCluster && (c.KubeConfigFile != "") { |
|
|
|
|
return nil, errors.New("cannot specify both 'inCluster' and 'kubeConfigFile'") |
|
|
|
|
} |
|
|
|
|
@ -89,7 +98,7 @@ func (c *Config) open(logger *slog.Logger, waitForResources bool) (*client, erro
|
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
cli, err := newClient(cluster, user, namespace, logger, c.InCluster) |
|
|
|
|
cli, err := newClient(cluster, user, namespace, logger, c.InCluster, c.CRDHandling) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("create client: %v", err) |
|
|
|
|
} |
|
|
|
|
@ -143,45 +152,55 @@ func (c *Config) open(logger *slog.Logger, waitForResources bool) (*client, erro
|
|
|
|
|
// It logs all errors, returning true if the resources were created successfully.
|
|
|
|
|
//
|
|
|
|
|
// Creating a custom resource does not mean that they'll be immediately available.
|
|
|
|
|
func (cli *client) registerCustomResources() (ok bool) { |
|
|
|
|
ok = true |
|
|
|
|
|
|
|
|
|
func (cli *client) registerCustomResources() bool { |
|
|
|
|
definitions := customResourceDefinitions(cli.crdAPIVersion) |
|
|
|
|
length := len(definitions) |
|
|
|
|
|
|
|
|
|
for i := 0; i < length; i++ { |
|
|
|
|
var err error |
|
|
|
|
var resourceName string |
|
|
|
|
// First pass: collect all CRDs that don't exist
|
|
|
|
|
var missingCRDs []k8sapi.CustomResourceDefinition |
|
|
|
|
|
|
|
|
|
r := definitions[i] |
|
|
|
|
for _, r := range definitions { |
|
|
|
|
var i interface{} |
|
|
|
|
cli.logger.Info("checking if custom resource has already been created...", "object", r.ObjectMeta.Name) |
|
|
|
|
if err := cli.listN(r.Spec.Names.Plural, &i, 1); err == nil { |
|
|
|
|
cli.logger.Info("the custom resource already available, skipping create", "object", r.ObjectMeta.Name) |
|
|
|
|
continue |
|
|
|
|
} else { |
|
|
|
|
cli.logger.Info("failed to list custom resource, attempting to create", "object", r.ObjectMeta.Name, "err", err) |
|
|
|
|
cli.logger.Info("custom resource not found", "object", r.ObjectMeta.Name, "err", err) |
|
|
|
|
missingCRDs = append(missingCRDs, r) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
err = cli.postResource(cli.crdAPIVersion, "", "customresourcedefinitions", r) |
|
|
|
|
resourceName = r.ObjectMeta.Name |
|
|
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
switch err { |
|
|
|
|
case storage.ErrAlreadyExists: |
|
|
|
|
cli.logger.Info("custom resource already created", "object", resourceName) |
|
|
|
|
case storage.ErrNotFound: |
|
|
|
|
cli.logger.Error("custom resources not found, please enable the respective API group") |
|
|
|
|
ok = false |
|
|
|
|
default: |
|
|
|
|
cli.logger.Error("creating custom resource", "object", resourceName, "err", err) |
|
|
|
|
ok = false |
|
|
|
|
// Second pass: handle missing CRDs based on crdHandling option
|
|
|
|
|
if len(missingCRDs) > 0 { |
|
|
|
|
cli.logger.Info("found missing CRDs", "count", len(missingCRDs)) |
|
|
|
|
switch cli.crdHandling { |
|
|
|
|
case crdHandlingCheck: |
|
|
|
|
// For "check" mode, fail and report that CRDs are not initialized
|
|
|
|
|
cli.logger.Error("storage is not initialized, CRDs are not created", "crdHandling", cli.crdHandling, "missing_count", len(missingCRDs)) |
|
|
|
|
return false |
|
|
|
|
case crdHandlingEnsure: |
|
|
|
|
cli.logger.Info("crdHandling is 'ensure', attempting to create missing CRDs") |
|
|
|
|
for _, r := range missingCRDs { |
|
|
|
|
resourceName := r.ObjectMeta.Name |
|
|
|
|
err := cli.postResource(cli.crdAPIVersion, "", "customresourcedefinitions", r) |
|
|
|
|
if err != nil { |
|
|
|
|
if !errors.Is(err, storage.ErrAlreadyExists) { |
|
|
|
|
cli.logger.Error("failed to create custom resource", "object", resourceName, "err", err) |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
cli.logger.Info("custom resource already created", "object", resourceName) |
|
|
|
|
} else { |
|
|
|
|
cli.logger.Info("successfully created custom resource", "object", resourceName) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
continue |
|
|
|
|
return true |
|
|
|
|
default: |
|
|
|
|
cli.logger.Error("invalid crdHandling value", "value", cli.crdHandling) |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
cli.logger.Error("create custom resource", "object", resourceName) |
|
|
|
|
} |
|
|
|
|
return ok |
|
|
|
|
|
|
|
|
|
// All CRDs exist
|
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// waitForCRDs waits for all CRDs to be in a ready state, and is used
|
|
|
|
|
|