mirror of https://github.com/dexidp/dex.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
147 lines
4.8 KiB
147 lines
4.8 KiB
package kubernetes |
|
|
|
import ( |
|
"fmt" |
|
"time" |
|
) |
|
|
|
// Identifiers and encodings shared by the refresh token lock.
const (
	// lockAnnotation is the annotation key on the refresh token resource whose
	// value is the lock's expiration timestamp.
	lockAnnotation = "dexidp.com/resource-lock"

	// lockTimeFormat is the layout used both to write and to parse the
	// expiration timestamp stored under lockAnnotation.
	lockTimeFormat = time.RFC3339
)
|
|
|
// Timing knobs for the refresh token lock. They are variables rather than
// constants, presumably so tests can shorten them.
var (
	// lockTimeout is how far in the future the written expiration timestamp is
	// placed; once it passes, a waiter may break the lock.
	lockTimeout = 10 * time.Second

	// lockCheckPeriod is the pause between acquisition attempts while polling.
	lockCheckPeriod = 100 * time.Millisecond
)
|
|
|
// refreshTokenLock is an implementation of annotation-based optimistic locking. |
|
// |
|
// Refresh token contains data to refresh identity in external authentication system. |
|
// There is a requirement that refresh should be called only once because of several reasons: |
|
// - Some of OIDC providers could use the refresh token rotation feature which requires calling refresh only once. |
|
// - Providers can limit the rate of requests to the token endpoint, which will lead to the error |
|
// in case of many concurrent requests. |
|
// |
|
// The lock uses a Kubernetes annotation on the refresh token resource as a mutex. |
|
// Only one goroutine can hold the lock at a time; others poll until the annotation |
|
// is removed (unlocked) or expires (broken). The Kubernetes resourceVersion on put |
|
// acts as compare-and-swap: if two goroutines race to set the annotation, only one |
|
// succeeds and the other gets a 409 Conflict. |
|
type refreshTokenLock struct { |
|
cli *client |
|
// waitingState tracks whether this lock instance has lost a compare-and-swap race |
|
// and is now polling for the lock to be released. Used by Unlock to skip the |
|
// annotation removal — only the goroutine that successfully wrote the annotation |
|
// should remove it. |
|
waitingState bool |
|
} |
|
|
|
func newRefreshTokenLock(cli *client) *refreshTokenLock { |
|
return &refreshTokenLock{cli: cli} |
|
} |
|
|
|
// Lock polls until the lock annotation can be set on the refresh token resource. |
|
// Returns nil when the lock is acquired, or an error on timeout (200 attempts × 100ms). |
|
func (l *refreshTokenLock) Lock(id string) error { |
|
for i := 0; i <= 200; i++ { |
|
ok, err := l.setLockAnnotation(id) |
|
if err != nil { |
|
return err |
|
} |
|
if !ok { |
|
return nil |
|
} |
|
time.Sleep(lockCheckPeriod) |
|
} |
|
return fmt.Errorf("timeout waiting for refresh token %s lock", id) |
|
} |
|
|
|
// Unlock removes the lock annotation from the refresh token resource. |
|
// Only the holder of the lock (waitingState == false) performs the removal. |
|
func (l *refreshTokenLock) Unlock(id string) { |
|
if l.waitingState { |
|
// This goroutine never successfully wrote the annotation, so there's |
|
// nothing to remove. Another goroutine holds (or held) the lock. |
|
return |
|
} |
|
|
|
r, err := l.cli.getRefreshToken(id) |
|
if err != nil { |
|
l.cli.logger.Debug("failed to get resource to release lock for refresh token", "token_id", id, "err", err) |
|
return |
|
} |
|
|
|
r.Annotations = nil |
|
err = l.cli.put(resourceRefreshToken, r.ObjectMeta.Name, r) |
|
if err != nil { |
|
l.cli.logger.Debug("failed to release lock for refresh token", "token_id", id, "err", err) |
|
} |
|
} |
|
|
|
// setLockAnnotation attempts to acquire the lock by writing an annotation with |
|
// an expiration timestamp. Returns (true, nil) when the caller should keep waiting, |
|
// (false, nil) when the lock is acquired, or (false, err) on a non-retriable error. |
|
// |
|
// The locking protocol relies on Kubernetes optimistic concurrency: every put |
|
// includes the resource's current resourceVersion, so concurrent writes to the |
|
// same object result in a 409 Conflict for all but one writer. |
|
func (l *refreshTokenLock) setLockAnnotation(id string) (bool, error) { |
|
r, err := l.cli.getRefreshToken(id) |
|
if err != nil { |
|
return false, err |
|
} |
|
|
|
currentTime := time.Now() |
|
lockData := map[string]string{ |
|
lockAnnotation: currentTime.Add(lockTimeout).Format(lockTimeFormat), |
|
} |
|
|
|
val, ok := r.Annotations[lockAnnotation] |
|
if !ok { |
|
// No annotation means the lock is free. Every goroutine — whether it's |
|
// a first-time caller or was previously waiting — must compete by writing |
|
// the annotation. The put uses the current resourceVersion, so only one |
|
// writer succeeds; the rest get a 409 Conflict and go back to polling. |
|
r.Annotations = lockData |
|
err := l.cli.put(resourceRefreshToken, r.ObjectMeta.Name, r) |
|
if err == nil { |
|
l.waitingState = false |
|
return false, nil |
|
} |
|
|
|
if isKubernetesAPIConflictError(err) { |
|
l.waitingState = true |
|
return true, nil |
|
} |
|
return false, err |
|
} |
|
|
|
until, err := time.Parse(lockTimeFormat, val) |
|
if err != nil { |
|
return false, fmt.Errorf("lock annotation value is malformed: %v", err) |
|
} |
|
|
|
if !currentTime.After(until) { |
|
// Lock is held by another goroutine and has not expired yet — keep polling. |
|
l.waitingState = true |
|
return true, nil |
|
} |
|
|
|
// Lock has expired (holder crashed or is too slow). Attempt to break it by |
|
// overwriting the annotation with a new expiration. Again, only one writer |
|
// can win the compare-and-swap race. |
|
r.Annotations = lockData |
|
|
|
err = l.cli.put(resourceRefreshToken, r.ObjectMeta.Name, r) |
|
if err == nil { |
|
return false, nil |
|
} |
|
|
|
l.cli.logger.Debug("break lock annotation", "error", err) |
|
if isKubernetesAPIConflictError(err) { |
|
l.waitingState = true |
|
return true, nil |
|
} |
|
return false, err |
|
}
|
|
|