Skip to content

Commit

Permalink
[Kubernetes secret provider] Add cache for the secrets (#3822)
Browse files Browse the repository at this point in the history
* Add secret cache

* Add changelog
  • Loading branch information
constanca-m authored Dec 18, 2023
1 parent b5ff06b commit 52a4275
Show file tree
Hide file tree
Showing 4 changed files with 418 additions and 41 deletions.
32 changes: 32 additions & 0 deletions changelog/fragments/1701091034-add-cache-for-secrets.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: add cache for secrets when using kubernetes secret provider

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/3822

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/3594
18 changes: 17 additions & 1 deletion internal/pkg/composable/providers/kubernetessecrets/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,26 @@

package kubernetessecrets

import "github.com/elastic/elastic-agent-autodiscover/kubernetes"
import (
"time"

"github.com/elastic/elastic-agent-autodiscover/kubernetes"
)

// Config for kubernetes provider
type Config struct {
KubeConfig string `config:"kube_config"`
KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"`

RefreshInterval time.Duration `config:"cache_refresh_interval"`
TTLDelete time.Duration `config:"cache_ttl"`
RequestTimeout time.Duration `config:"cache_request_timeout"`
DisableCache bool `config:"cache_disable"`
}

func (c *Config) InitDefaults() {
c.RefreshInterval = 60 * time.Second
c.TTLDelete = 1 * time.Hour
c.RequestTimeout = 5 * time.Second
c.DisableCache = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"strings"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sclient "k8s.io/client-go/kubernetes"
Expand All @@ -33,6 +34,14 @@ type contextProviderK8sSecrets struct {

clientMx sync.Mutex
client k8sclient.Interface

secretsCacheMx sync.RWMutex
secretsCache map[string]*secretsData
}

type secretsData struct {
value string
lastAccess time.Time
}

// ContextProviderBuilder builds the context provider.
Expand All @@ -46,67 +55,235 @@ func ContextProviderBuilder(logger *logger.Logger, c *config.Config, managed boo
return nil, errors.New(err, "failed to unpack configuration")
}
return &contextProviderK8sSecrets{
logger: logger,
config: &cfg,
logger: logger,
config: &cfg,
secretsCache: make(map[string]*secretsData),
}, nil
}

func (p *contextProviderK8sSecrets) Fetch(key string) (string, bool) {
// key = "kubernetes_secrets.somenamespace.somesecret.value"
if p.config.DisableCache {
valid := p.validateKey(key)
if valid {
return p.fetchSecretWithTimeout(key)
} else {
return "", false
}
} else {
return p.getFromCache(key)
}
}

// Run initializes the k8s secrets context provider.
func (p *contextProviderK8sSecrets) Run(ctx context.Context, comm corecomp.ContextProviderComm) error {
client, err := getK8sClientFunc(p.config.KubeConfig, p.config.KubeClientOptions)
if err != nil {
p.logger.Debugf("kubernetes_secrets provider skipped, unable to connect: %s", err)
return nil
}
p.clientMx.Lock()
client := p.client
p.client = client
p.clientMx.Unlock()
if client == nil {
return "", false

if !p.config.DisableCache {
go p.updateSecrets(ctx)
}

<-comm.Done()

p.clientMx.Lock()
p.client = nil
p.clientMx.Unlock()
return comm.Err()
}

func getK8sClient(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) {
return kubernetes.GetKubernetesClient(kubeconfig, opt)
}

// Update the secrets in the cache every RefreshInterval
func (p *contextProviderK8sSecrets) updateSecrets(ctx context.Context) {
timer := time.NewTimer(p.config.RefreshInterval)
for {
select {
case <-ctx.Done():
return
case <-timer.C:
p.updateCache()
timer.Reset(p.config.RefreshInterval)
}
}
}

// mergeWithCurrent merges the updated map with the cache map.
// This function needs to be called between the mutex lock for the map.
func (p *contextProviderK8sSecrets) mergeWithCurrent(updatedMap map[string]*secretsData) map[string]*secretsData {
merged := make(map[string]*secretsData)

for name, data := range p.secretsCache {
diff := time.Since(data.lastAccess)
if diff < p.config.TTLDelete {
merged[name] = data
}
}

for name, data := range updatedMap {
// We need to check if the key is already in the new map. If it is, lastAccess cannot be overwritten since
// it could have been updated when trying to fetch the secret at the same time we are running update cache.
// In that case, we only update the value.
if _, ok := merged[name]; ok {
merged[name].value = data.value
}
}

return merged
}

func (p *contextProviderK8sSecrets) updateCache() {
// deleting entries does not free the memory, so we need to create a new map
// to place the secrets we want to keep
cacheTmp := make(map[string]*secretsData)

// to not hold the lock for long, we copy the current state of the cache map
copyMap := make(map[string]secretsData)
p.secretsCacheMx.RLock()
for name, data := range p.secretsCache {
copyMap[name] = *data
}
p.secretsCacheMx.RUnlock()

for name, data := range copyMap {
diff := time.Since(data.lastAccess)
if diff < p.config.TTLDelete {
value, ok := p.fetchSecretWithTimeout(name)
if ok {
newData := &secretsData{
value: value,
lastAccess: data.lastAccess,
}
cacheTmp[name] = newData
}

}
}

// While the cache was updated, it is possible that some secret was added through another go routine.
// We need to merge the updated map with the current cache map to catch the new entries and avoid
// loss of data.
p.secretsCacheMx.Lock()
p.secretsCache = p.mergeWithCurrent(cacheTmp)
p.secretsCacheMx.Unlock()
}

func (p *contextProviderK8sSecrets) getFromCache(key string) (string, bool) {
p.secretsCacheMx.RLock()
_, ok := p.secretsCache[key]
p.secretsCacheMx.RUnlock()

// if value is still not present in cache, it is possible we haven't tried to fetch it yet
if !ok {
value, ok := p.addToCache(key)
// if it was not possible to fetch the secret, return
if !ok {
return value, ok
}
}

p.secretsCacheMx.Lock()
data, ok := p.secretsCache[key]
data.lastAccess = time.Now()
pass := data.value
p.secretsCacheMx.Unlock()

return pass, ok
}

func (p *contextProviderK8sSecrets) validateKey(key string) bool {
// Make sure the key has the expected format "kubernetes_secrets.somenamespace.somesecret.value"
tokens := strings.Split(key, ".")
if len(tokens) > 0 && tokens[0] != "kubernetes_secrets" {
return "", false
return false
}
if len(tokens) != 4 {
p.logger.Debugf(
"not valid secret key: %v. Secrets should be of the following format %v",
key,
"kubernetes_secrets.somenamespace.somesecret.value",
)
return false
}
return true
}

func (p *contextProviderK8sSecrets) addToCache(key string) (string, bool) {
valid := p.validateKey(key)
if !valid {
return "", false
}

value, ok := p.fetchSecretWithTimeout(key)
if ok {
p.secretsCacheMx.Lock()
p.secretsCache[key] = &secretsData{value: value}
p.secretsCacheMx.Unlock()
}
return value, ok
}

type Result struct {
value string
ok bool
}

func (p *contextProviderK8sSecrets) fetchSecretWithTimeout(key string) (string, bool) {
ctxTimeout, cancel := context.WithTimeout(context.Background(), p.config.RequestTimeout)
defer cancel()

resultCh := make(chan Result, 1)
p.fetchSecret(ctxTimeout, key, resultCh)

select {
case <-ctxTimeout.Done():
p.logger.Errorf("Could not retrieve value for key %v: %v", key, ctxTimeout.Err())
return "", false
case result := <-resultCh:
return result.value, result.ok
}
}

func (p *contextProviderK8sSecrets) fetchSecret(context context.Context, key string, resultCh chan Result) {
p.clientMx.Lock()
client := p.client
p.clientMx.Unlock()
if client == nil {
resultCh <- Result{value: "", ok: false}
return
}

tokens := strings.Split(key, ".")
// key has the format "kubernetes_secrets.somenamespace.somesecret.value"
// This function is only called from:
// - addToCache, where we already validated that the key has the right format.
// - updateCache, where the results are only added to the cache through addToCache
// Because of this we no longer need to validate the key
ns := tokens[1]
secretName := tokens[2]
secretVar := tokens[3]

secretIntefrace := client.CoreV1().Secrets(ns)
ctx := context.TODO()
secret, err := secretIntefrace.Get(ctx, secretName, metav1.GetOptions{})
secretInterface := client.CoreV1().Secrets(ns)
secret, err := secretInterface.Get(context, secretName, metav1.GetOptions{})

if err != nil {
p.logger.Errorf("Could not retrieve secret from k8s API: %v", err)
return "", false
resultCh <- Result{value: "", ok: false}
return
}
if _, ok := secret.Data[secretVar]; !ok {
p.logger.Errorf("Could not retrieve value %v for secret %v", secretVar, secretName)
return "", false
resultCh <- Result{value: "", ok: false}
return
}
secretString := secret.Data[secretVar]
return string(secretString), true
}

// Run initializes the k8s secrets context provider.
func (p *contextProviderK8sSecrets) Run(ctx context.Context, comm corecomp.ContextProviderComm) error {
client, err := getK8sClientFunc(p.config.KubeConfig, p.config.KubeClientOptions)
if err != nil {
p.logger.Debugf("Kubernetes_secrets provider skipped, unable to connect: %s", err)
return nil
}
p.clientMx.Lock()
p.client = client
p.clientMx.Unlock()
<-comm.Done()
p.clientMx.Lock()
p.client = nil
p.clientMx.Unlock()
return comm.Err()
}

func getK8sClient(kubeconfig string, opt kubernetes.KubeClientOptions) (k8sclient.Interface, error) {
return kubernetes.GetKubernetesClient(kubeconfig, opt)
secretString := secret.Data[secretVar]
resultCh <- Result{value: string(secretString), ok: true}
}
Loading

0 comments on commit 52a4275

Please sign in to comment.