diff --git a/pkg/cluster/base.go b/pkg/cluster/base.go
index 01094fb0..3b801aab 100644
--- a/pkg/cluster/base.go
+++ b/pkg/cluster/base.go
@@ -49,6 +49,7 @@ type ProviderBase struct {
 	types.Status `json:"status"`
 	types.SSH    `json:",inline"`
 	M            *sync.Map
+	ErrM         map[string]string
 	Logger       *logrus.Logger
 	Callbacks    map[string]*providerProcess
 }
@@ -81,7 +82,8 @@
 		SSH: types.SSH{
 			SSHPort: "22",
 		},
-		M: new(syncmap.Map),
+		M:    new(syncmap.Map),
+		ErrM: map[string]string{},
 	}
 }
 
@@ -345,19 +347,38 @@ func (p *ProviderBase) InitCluster(options interface{}, deployPlugins func() []s
 		Status:   p.Status,
 	}
 	defer func() {
-		if er != nil {
-			p.Logger.Errorf("%v", er)
+		if er != nil || len(p.ErrM) > 0 {
 			// save failed status.
-			if c == nil {
-				c = &types.Cluster{
-					Metadata: p.Metadata,
-					Options:  options,
-					SSH:      p.SSH,
-					Status:   types.Status{},
+			state, err := common.DefaultDB.GetCluster(p.Name, p.Provider)
+			if err == nil {
+				var c types.Cluster
+				if state == nil {
+					c = types.Cluster{
+						Metadata: p.Metadata,
+						Options:  options,
+						SSH:      p.SSH,
+						Status:   types.Status{},
+					}
+				} else {
+					c = common.ConvertToCluster(state, true)
+				}
+				if er != nil {
+					p.Logger.Errorf("%v", er)
+					c.Status.Status = common.StatusFailed
+				}
+				if len(p.ErrM) > 0 && len(c.WorkerNodes) > 0 {
+					// remove failed node from cluster state
+					workers := []types.Node{}
+					for _, n := range c.WorkerNodes {
+						if _, ok := p.ErrM[n.InstanceID]; !ok {
+							workers = append(workers, n)
+						}
+					}
+					c.WorkerNodes = workers
+					c.Worker = strconv.Itoa(len(workers))
 				}
+				_ = common.DefaultDB.SaveCluster(&c)
 			}
-			c.Status.Status = common.StatusFailed
-			_ = common.DefaultDB.SaveCluster(c)
 			_ = p.RollbackCluster(rollbackInstance)
 		}
 		if er == nil && len(p.Status.MasterNodes) > 0 {
@@ -446,36 +467,11 @@ func (p *ProviderBase) InitCluster(options interface{}, deployPlugins func() []s
 	if p.Enable != nil {
 		for _, plugin := range p.Enable {
 			if plugin != "explorer" {
-				// check addon plugin
-				addon, err := common.DefaultDB.GetAddon(plugin)
-				if err != nil {
-					p.Logger.Errorf("[%s] failed to get addon by name %s, got error: %v", p.Provider, plugin, err)
-					continue
-				}
-				manifest := addon.Manifest
-				defaultValues := addon.Values
-				// check --set values
-				setValues := map[string]string{}
-				for key, value := range p.Values {
-					if strings.HasPrefix(key, fmt.Sprintf("%s.", plugin)) {
-						setValues[strings.TrimPrefix(key, fmt.Sprintf("%s.", plugin))] = value
-					} else {
-						setValues[key] = value
-					}
-				}
-				values, err := common.GenerateValues(setValues, defaultValues)
+				cmd, err := p.addonInstallation(plugin)
 				if err != nil {
-					p.Logger.Errorf("[%s] failed to generate values for addon %s with values %v: %v", p.Provider, plugin, p.Values, err)
 					continue
 				}
-				p.Logger.Debugf("assemble manifest with value %++v", values)
-				assembleManifest, err := common.AssembleManifest(values, string(manifest), p.parseDefaultTemplates())
-				if err != nil {
-					p.Logger.Errorf("[%s] failed to assemble manifest for addon %s with values %v: %v", p.Provider, plugin, setValues, err)
-					continue
-				}
-				cmds = append(cmds, fmt.Sprintf(deployPluginCmd,
-					base64.StdEncoding.EncodeToString(assembleManifest), common.K3sManifestsDir, plugin))
+				cmds = append(cmds, cmd)
 			}
 		}
 	}
@@ -510,12 +506,31 @@ func (p *ProviderBase) JoinNodes(cloudInstanceFunc func(ssh *types.SSH) (*types.
 		return fmt.Errorf("[%s] cluster %s is not exist", p.Provider, p.Name)
 	}
 	defer func() {
-		if er != nil {
+		if er != nil || len(p.ErrM) > 0 {
 			// join failed.
-			state.Status = common.StatusRunning
-			_ = common.DefaultDB.SaveClusterState(state)
-			// rollback instance.
-			_ = p.RollbackCluster(rollbackInstance)
+			state, err = common.DefaultDB.GetCluster(p.Name, p.Provider)
+			if err == nil {
+				if len(p.ErrM) > 0 {
+					// remove failed node from cluster state
+					workerNodes := []types.Node{}
+					_ = json.Unmarshal(state.WorkerNodes, &workerNodes)
+					workers := []types.Node{}
+					for _, n := range workerNodes {
+						if _, ok := p.ErrM[n.InstanceID]; !ok {
+							workers = append(workers, n)
+						}
+					}
+					wb, err := json.Marshal(workers)
+					if err == nil {
+						state.WorkerNodes = wb
+					}
+					state.Worker = strconv.Itoa(len(workers))
+				}
+				state.Status = common.StatusRunning
+				_ = common.DefaultDB.SaveClusterState(state)
+				// rollback instance.
+				_ = p.RollbackCluster(rollbackInstance)
+			}
 		}
 		// remove join state file and save running state.
 		_ = logFile.Close()
@@ -1189,13 +1204,22 @@ func (p *ProviderBase) RollbackCluster(rollbackInstance func(ids []string) error
 	p.Logger.Infof("[%s] executing rollback logic...", p.Provider)
 	if rollbackInstance != nil {
 		ids := make([]string, 0)
-		p.M.Range(func(key, value interface{}) bool {
-			v := value.(types.Node)
-			if v.RollBack {
-				ids = append(ids, key.(string))
+		// support for partial rollback
+		if len(p.ErrM) > 0 {
+			p.Logger.Warnf("[%s] the following instances failed and need to be rolled back...", p.Provider)
+			for key, value := range p.ErrM {
+				p.Logger.Warnf("[%s] instance %s failed to join the K3s cluster with error: %v", p.Provider, key, value)
+				ids = append(ids, key)
 			}
-			return true
-		})
+		} else {
+			p.M.Range(func(key, value interface{}) bool {
+				v := value.(types.Node)
+				if v.RollBack {
+					ids = append(ids, key.(string))
+				}
+				return true
+			})
+		}
 
 		p.Logger.Infof("[%s] instances %s will be rollback", p.Provider, ids)
 
@@ -1417,3 +1441,36 @@ func (p *ProviderBase) parseDefaultTemplates() template.FuncMap {
 		},
 	}
 }
+
+func (p *ProviderBase) addonInstallation(plugin string) (string, error) {
+	// check addon plugin
+	addon, err := common.DefaultDB.GetAddon(plugin)
+	if err != nil {
+		p.Logger.Errorf("[%s] failed to get addon by name %s, got error: %v", p.Provider, plugin, err)
+		return "", err
+	}
+	manifest := addon.Manifest
+	defaultValues := addon.Values
+	// check --set values
+	setValues := map[string]string{}
+	for key, value := range p.Values {
+		if strings.HasPrefix(key, fmt.Sprintf("%s.", plugin)) {
+			setValues[strings.TrimPrefix(key, fmt.Sprintf("%s.", plugin))] = value
+		} else {
+			setValues[key] = value
+		}
+	}
+	values, err := common.GenerateValues(setValues, defaultValues)
+	if err != nil {
+		p.Logger.Errorf("[%s] failed to generate values for addon %s with values %v: %v", p.Provider, plugin, p.Values, err)
+		return "", err
+	}
+	p.Logger.Debugf("assemble manifest with value %++v", values)
+	assembleManifest, err := common.AssembleManifest(values, string(manifest), p.parseDefaultTemplates())
+	if err != nil {
+		p.Logger.Errorf("[%s] failed to assemble manifest for addon %s with values %v: %v", p.Provider, plugin, setValues, err)
+		return "", err
+	}
+	return fmt.Sprintf(deployPluginCmd,
+		base64.StdEncoding.EncodeToString(assembleManifest), common.K3sManifestsDir, plugin), nil
+}
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index b5f24efa..a24a8502 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -10,6 +10,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/cnrancher/autok3s/pkg/airgap"
@@ -71,50 +72,57 @@ func (p *ProviderBase) InitK3sCluster(cluster *types.Cluster) error {
 		publicIP = cluster.MasterNodes[0].PublicIPAddress[0]
 	}
 
-	workerExtraArgs := cluster.WorkerExtraArgs
-
-	for i, master := range cluster.MasterNodes {
-		p.Logger.Infof("[%s] creating k3s master-%d...", p.Provider, i+1)
-		masterExtraArgs := cluster.MasterExtraArgs
-		providerExtraArgs := provider.GenerateMasterExtraArgs(cluster, master)
-		if providerExtraArgs != "" {
-			masterExtraArgs += providerExtraArgs
+	// initialize the first master node and worker node to validate the K3s configuration.
+	var firstControl, firstWorker types.Node
+	controlNodes := []types.Node{}
+	for i, control := range cluster.MasterNodes {
+		if i == 0 {
+			firstControl = control
+			continue
+		}
+		controlNodes = append(controlNodes, control)
+	}
+	workerNodes := []types.Node{}
+	if len(p.WorkerNodes) > 0 {
+		for i, w := range cluster.WorkerNodes {
+			if i == 0 {
+				firstWorker = w
+				continue
+			}
+			workerNodes = append(workerNodes, w)
 		}
+	}
 
-		if err := p.initNode(i == 0, publicIP, cluster, master, masterExtraArgs, pkg); err != nil {
+	err = p.validateClusterConfig(cluster, provider, publicIP, pkg, firstControl, firstWorker)
+	if err != nil {
+		return err
+	}
+
+	for i, master := range controlNodes {
+		p.Logger.Infof("[%s] joining k3s control-%d...", p.Provider, i+1)
+		if err := p.initControlNode(cluster, provider, publicIP, pkg, master, false); err != nil {
 			return err
 		}
-		p.Logger.Infof("[%s] successfully created k3s master-%d", p.Provider, i+1)
 	}
 
-	errGroup := utils.NewFirstErrorGroup()
-	for i, worker := range cluster.WorkerNodes {
-		errGroup.Go(func(i int, worker types.Node) func() error {
-			return func() error {
-				p.Logger.Infof("[%s] creating k3s worker-%d...", p.Provider, i+1)
-				extraArgs := workerExtraArgs
-				providerExtraArgs := provider.GenerateWorkerExtraArgs(cluster, worker)
-				if providerExtraArgs != "" {
-					extraArgs += providerExtraArgs
-				}
-				if err := p.initNode(false, publicIP, cluster, worker, extraArgs, pkg); err != nil {
-					return err
-				}
-				p.Logger.Infof("[%s] successfully created k3s worker-%d", p.Provider, i+1)
-				return nil
+	// batch join worker nodes
+	var wg sync.WaitGroup
+	var l sync.RWMutex
+	for i, worker := range workerNodes {
+		wg.Add(1)
+		go func(i int, worker types.Node) {
+			defer wg.Done()
+			p.Logger.Infof("[%s] creating k3s worker-%d...", p.Provider, i+1)
+			if err := p.initWorkerNode(cluster, provider, publicIP, pkg, worker); err != nil {
+				l.Lock()
+				p.ErrM[worker.InstanceID] = err.Error()
+				l.Unlock()
 			}
-		}(i, worker))
-	}
-
-	// will block here to get the first error
-	if err, _ := <-errGroup.FirstError(); err != nil {
-		go func() {
-			errCount := errGroup.Wait()
-			p.Logger.Debugf("%d error occurs", errCount)
-		}()
-		return err
+			p.Logger.Infof("[%s] successfully created k3s worker-%d", p.Provider, i+1)
+		}(i, worker)
 	}
+	wg.Wait()
 
 	// get k3s cluster config.
 	cfg, err := p.executeWithRetry(3, &cluster.MasterNodes[0], catCfgCommand)
@@ -231,38 +239,34 @@ func (p *ProviderBase) Join(merged, added *types.Cluster) error {
 		p.Logger.Infof("[%s] successfully joined k3s master-%d", merged.Provider, i+1)
 	}
 
-	errGroup := utils.NewFirstErrorGroup()
+	var wg sync.WaitGroup
+	var l sync.RWMutex
+
 	for i := 0; i < len(added.Status.WorkerNodes); i++ {
 		currentNode := added.WorkerNodes[i]
 		full, ok := workerNodes[currentNode.InstanceID]
 		if !ok {
 			continue
 		}
+		wg.Add(1)
 
-		errGroup.Go(func(i int, node types.Node) func() error {
-			return func() error {
-				p.Logger.Infof("[%s] joining k3s worker-%d...", merged.Provider, i+1)
-				extraArgs := merged.WorkerExtraArgs
-				additionalExtraArgs := provider.GenerateWorkerExtraArgs(added, full)
-				if additionalExtraArgs != "" {
-					extraArgs += additionalExtraArgs
-				}
-				if err := p.initNode(false, publicIP, merged, full, extraArgs, pkg); err != nil {
-					return err
-				}
-				p.Logger.Infof("[%s] successfully joined k3s worker-%d", merged.Provider, i+1)
-				return nil
+		go func(i int, node types.Node) {
+			defer wg.Done()
+			p.Logger.Infof("[%s] joining k3s worker-%d...", merged.Provider, i+1)
+			extraArgs := merged.WorkerExtraArgs
+			additionalExtraArgs := provider.GenerateWorkerExtraArgs(added, full)
+			if additionalExtraArgs != "" {
+				extraArgs += additionalExtraArgs
 			}
-		}(i, full))
-	}
-
-	if err, _ := <-errGroup.FirstError(); err != nil {
-		go func() {
-			errCount := errGroup.Wait()
-			p.Logger.Debugf("%d error occurs", errCount)
-		}()
-		return err
+			if err := p.initNode(false, publicIP, merged, full, extraArgs, pkg); err != nil {
+				l.Lock()
+				p.ErrM[full.InstanceID] = err.Error()
+				l.Unlock()
+			}
+			p.Logger.Infof("[%s] successfully joined k3s worker-%d", merged.Provider, i+1)
+		}(i, full)
 	}
+	wg.Wait()
 
 	// sync master & worker numbers.
 	merged.Master = strconv.Itoa(len(merged.MasterNodes))
@@ -904,3 +908,42 @@ func (p *ProviderBase) handleDataStoreCertificate(n *types.Node, c *types.Cluste
 	_, err := p.execute(n, cmd...)
 	return err
 }
+
+func (p *ProviderBase) validateClusterConfig(cluster *types.Cluster, provider providers.Provider, publicIP string, pkg *common.Package, firstControl, firstWorker types.Node) error {
+	p.Logger.Infof("[%s] initializing control node...", p.Provider)
+	if err := p.initControlNode(cluster, provider, publicIP, pkg, firstControl, true); err != nil {
+		return err
+	}
+	p.Logger.Infof("[%s] successfully initialized the first control node", p.Provider)
+
+	if len(firstWorker.PublicIPAddress) <= 0 && firstWorker.InstanceID == "" {
+		// skip with empty worker node
+		return nil
+	}
+	p.Logger.Infof("[%s] initializing worker node...", p.Provider)
+	if err := p.initWorkerNode(cluster, provider, publicIP, pkg, firstWorker); err != nil {
+		return err
+	}
+	p.Logger.Infof("[%s] successfully initialized the first worker node", p.Provider)
+
+	return nil
+}
+
+func (p *ProviderBase) initControlNode(cluster *types.Cluster, provider providers.Provider, publicIP string, pkg *common.Package, controlNode types.Node, isFirst bool) error {
+	masterExtraArgs := cluster.MasterExtraArgs
+	providerExtraArgs := provider.GenerateMasterExtraArgs(cluster, controlNode)
+	if providerExtraArgs != "" {
+		masterExtraArgs += providerExtraArgs
+	}
+
+	return p.initNode(isFirst, publicIP, cluster, controlNode, masterExtraArgs, pkg)
+}
+
+func (p *ProviderBase) initWorkerNode(cluster *types.Cluster, provider providers.Provider, publicIP string, pkg *common.Package, workerNode types.Node) error {
+	workerExtraArgs := cluster.WorkerExtraArgs
+	providerExtraArgs := provider.GenerateWorkerExtraArgs(cluster, workerNode)
+	if providerExtraArgs != "" {
+		workerExtraArgs += providerExtraArgs
+	}
+	return p.initNode(false, publicIP, cluster, workerNode, workerExtraArgs, pkg)
+}
diff --git a/pkg/providers/native/flag.go b/pkg/providers/native/flag.go
index 600f91b2..931702c7 100644
--- a/pkg/providers/native/flag.go
+++ b/pkg/providers/native/flag.go
@@ -164,6 +164,23 @@ func (p *Native) MergeClusterOptions() error {
 	if option.WorkerIps != "" {
 		workerIps = strings.Split(option.WorkerIps, ",")
 	}
+	// remove invalid worker-ip
+	requeuedWorkerIps := []string{}
+	if len(p.WorkerNodes) > 0 {
+		for _, ip := range workerIps {
+			isContained := false
+			for _, n := range p.WorkerNodes {
+				if n.PublicIPAddress[0] == ip {
+					isContained = true
+					break
+				}
+			}
+			if isContained {
+				requeuedWorkerIps = append(requeuedWorkerIps, ip)
+			}
+		}
+		workerIps = requeuedWorkerIps
+	}
 	optionWorkerIps := strings.Split(p.WorkerIps, ",")
 	for _, ip := range optionWorkerIps {
 		if !slice.ContainsString(workerIps, ip) {
diff --git a/pkg/providers/native/native.go b/pkg/providers/native/native.go
index 1969f764..b866728c 100644
--- a/pkg/providers/native/native.go
+++ b/pkg/providers/native/native.go
@@ -8,6 +8,7 @@ import (
 	"reflect"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/cnrancher/autok3s/pkg/cluster"
 	"github.com/cnrancher/autok3s/pkg/common"
@@ -133,6 +134,51 @@ func (p *Native) rollbackInstance(ids []string) error {
 			nodes = append(nodes, node.(types.Node))
 		}
 	}
+	kubeCfg := filepath.Join(common.CfgPath, common.KubeCfgFile)
+	client, err := cluster.GetClusterConfig(p.ContextName, kubeCfg)
+	if err != nil {
+		p.Logger.Errorf("[%s] failed to get kubeclient for rollback: %v", p.GetProviderName(), err)
+	}
+	if err == nil {
+		timeout := int64(5 * time.Second)
+		nodeList, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{TimeoutSeconds: &timeout})
+		if err == nil && nodeList != nil {
+			rNodeList := []v1.Node{}
+			for _, id := range ids {
+				if v, ok := p.M.Load(id); ok {
+					n := v.(types.Node)
+					for _, node := range nodeList.Items {
+						var externalIP string
+						addressList := node.Status.Addresses
+						for _, address := range addressList {
+							switch address.Type {
+							case v1.NodeExternalIP:
+								externalIP = address.Address
+							default:
+								continue
+							}
+						}
+						if n.PublicIPAddress[0] == externalIP {
+							rNodeList = append(rNodeList, node)
+						}
+					}
+				}
+			}
+			if len(rNodeList) > 0 {
+				for _, rNode := range rNodeList {
+					p.Logger.Infof("[%s] removing node %s for rollback", p.GetProviderName(), rNode.Name)
+					e := client.CoreV1().Nodes().Delete(context.TODO(), rNode.Name, metav1.DeleteOptions{})
+					if e != nil {
+						p.Logger.Errorf("[%s] failed to remove node %s for rollback: %v", p.GetProviderName(), rNode.Name, e)
+					}
+				}
+			}
+		}
+	}
+
+	if err != nil {
+		p.Logger.Errorf("[%s] failed to get node list for rollback: %v", p.GetProviderName(), err)
+	}
 	warnMsg := p.UninstallK3sNodes(nodes)
 	for _, w := range warnMsg {
 		p.Logger.Warnf("[%s] %s", p.GetProviderName(), w)
 	}
@@ -394,11 +440,19 @@ func (p *Native) syncInstanceNodes() ([]types.Node, error) {
 	// sync cluster nodes
 	masterBytes, _ := json.Marshal(masterNodes)
 	workerBytes, _ := json.Marshal(workerNodes)
-	state.MasterNodes = masterBytes
-	state.WorkerNodes = workerBytes
-	state.Master = strconv.Itoa(len(masterNodes))
-	state.Worker = strconv.Itoa(len(workerNodes))
-	err = common.DefaultDB.SaveClusterState(state)
+
+	// check difference
+	if !reflect.DeepEqual(state.MasterNodes, masterBytes) ||
+		!reflect.DeepEqual(state.WorkerNodes, workerBytes) ||
+		!strings.EqualFold(state.Master, strconv.Itoa(len(masterNodes))) ||
+		!strings.EqualFold(state.Worker, strconv.Itoa(len(workerNodes))) {
+		state.MasterNodes = masterBytes
+		state.WorkerNodes = workerBytes
+		state.Master = strconv.Itoa(len(masterNodes))
+		state.Worker = strconv.Itoa(len(workerNodes))
+		err = common.DefaultDB.SaveClusterState(state)
+	}
+
 	return nodes, err
 }