diff --git a/cli/installer/installer.go b/cli/installer/installer.go index 4377d0d..fe256be 100644 --- a/cli/installer/installer.go +++ b/cli/installer/installer.go @@ -78,7 +78,7 @@ func (k *installer) connectToEtcd(_ context.Context, creds *config.EtcdCredentia if err != nil { return err } - if err := k.client.ConnectToStore(creds, []string{net.JoinHostPort(host, "2379")}); err != nil { + if err := k.client.ConnectToStoreExternal(creds, []string{net.JoinHostPort(host, "2379")}); err != nil { k.logger.With(zap.Error(err)).Error("failed to connect to store") return err } diff --git a/cli/terminate/terminate.go b/cli/terminate/terminate.go index 6093b85..79ba295 100644 --- a/cli/terminate/terminate.go +++ b/cli/terminate/terminate.go @@ -43,7 +43,7 @@ func NewTerminate(logger *zap.Logger, creds *config.EtcdCredentials) (Terminate, if err != nil { return nil, err } - if err := client.ConnectToStore(creds, []string{net.JoinHostPort(host, "2379")}); err != nil { + if err := client.ConnectToStoreExternal(creds, []string{net.JoinHostPort(host, "2379")}); err != nil { return nil, err } return &terminate{kubeClient: client, logger: logger}, nil diff --git a/internal/k8sapi/k8sapi.go b/internal/k8sapi/k8sapi.go index 38dbc20..9d5f05a 100644 --- a/internal/k8sapi/k8sapi.go +++ b/internal/k8sapi/k8sapi.go @@ -5,9 +5,13 @@ package k8sapi import ( + "context" "errors" + "net" "os" + "strings" "sync" + "time" "github.com/benschlueter/delegatio/internal/config" "github.com/benschlueter/delegatio/internal/store" @@ -63,8 +67,8 @@ func NewClient(logger *zap.Logger) (kubeClient *Client, err error) { return } -// ConnectToStore connects to the etcd store. -func (k *Client) ConnectToStore(creds *config.EtcdCredentials, endpoints []string) error { +// ConnectToStoreExternal connects to the etcd store. +func (k *Client) ConnectToStoreExternal(creds *config.EtcdCredentials, endpoints []string) error { if k.SharedStore != nil { k.logger.Info("client is already connected to store, reconnecting") } @@ -130,5 +134,74 @@ func GetKubeConfigPath() (string, error) { return val, nil } +func (k *Client) GetStore() (store.Store, error) { + if k.SharedStore == nil { + if err := k.ConnectToStoreInternal(); err != nil { + k.logger.Info("client is not connected to etcd") + return nil, err + } + } + return k.SharedStore, nil +} + +// GetStore returns a store backed by kube etcd. Its only supposed to used within a kubernetes pod. +// Before calling this the installer needs to populate a config map with the respective credentials. +// Furthermore a serviceaccount must be set up for the namespace and it needs to be attached to the +// running pod. +func (k *Client) ConnectToStoreInternal() error { + var err error + var ns string + if _, err := os.Stat(config.NameSpaceFilePath); errors.Is(err, os.ErrNotExist) { + // ns is not ready when container spawns + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + ns, err = waitForNamespaceMount(ctx) + if err != nil { + k.logger.Error("failed to get namespace after timeout", zap.Error(err)) + return err + } + } else { + // out of cluster mode currently assumes 'ssh' namespace + if content, err := os.ReadFile(config.NameSpaceFilePath); err == nil { + ns = strings.TrimSpace(string(content)) + } else { + return err + } + } + k.logger.Info("namespace", zap.String("namespace", ns)) + configData, err := k.GetConfigMapData(context.Background(), ns, "etcd-credentials") + if err != nil { + return err + } + // logger.Info("config", zap.Any("configData", configData)) + etcdStore, err := store.NewEtcdStore([]string{net.JoinHostPort(configData["advertiseAddr"], "2379")}, k.logger, []byte(configData["caCert"]), []byte(configData["cert"]), []byte(configData["key"])) + if err != nil { + return err + } + k.SharedStore = etcdStore + return nil +} + +// waitForNamespaceMount waits for the namespace file to be mounted and filled. +func waitForNamespaceMount(ctx context.Context) (string, error) { + t := time.NewTicker(100 * time.Millisecond) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-t.C: + data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + if err != nil && !errors.Is(err, os.ErrNotExist) { + return "", err + } + ns := strings.TrimSpace(string(data)) + if len(ns) != 0 { + return ns, nil + } + } + } +} + // ErrNotConnected is returned when the client is not connected to etcd. var ErrNotConnected = errors.New("client is not connected to etcd")