Skip to content

Commit

Permalink
feat(cluster): support partial rollback when creating clusters or joi…
Browse files Browse the repository at this point in the history
…ning nodes
  • Loading branch information
JacieChao committed Jan 31, 2024
1 parent aa2e6f1 commit 332e8e2
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 112 deletions.
155 changes: 106 additions & 49 deletions pkg/cluster/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type ProviderBase struct {
types.Status `json:"status"`
types.SSH `json:",inline"`
M *sync.Map
ErrM map[string]string
Logger *logrus.Logger
Callbacks map[string]*providerProcess
}
Expand Down Expand Up @@ -81,7 +82,8 @@ func NewBaseProvider() *ProviderBase {
SSH: types.SSH{
SSHPort: "22",
},
M: new(syncmap.Map),
M: new(syncmap.Map),
ErrM: map[string]string{},
}
}

Expand Down Expand Up @@ -345,19 +347,38 @@ func (p *ProviderBase) InitCluster(options interface{}, deployPlugins func() []s
Status: p.Status,
}
defer func() {
if er != nil {
p.Logger.Errorf("%v", er)
if er != nil || len(p.ErrM) > 0 {
// save failed status.
if c == nil {
c = &types.Cluster{
Metadata: p.Metadata,
Options: options,
SSH: p.SSH,
Status: types.Status{},
state, err := common.DefaultDB.GetCluster(p.Name, p.Provider)
if err == nil {
var c types.Cluster
if state == nil {
c = types.Cluster{
Metadata: p.Metadata,
Options: options,
SSH: p.SSH,
Status: types.Status{},
}
} else {
c = common.ConvertToCluster(state, true)
}
if er != nil {
p.Logger.Errorf("%v", er)
c.Status.Status = common.StatusFailed
}
if len(p.ErrM) > 0 && len(c.WorkerNodes) > 0 {
// remove failed node from cluster state
workers := []types.Node{}
for _, n := range c.WorkerNodes {
if _, ok := p.ErrM[n.InstanceID]; !ok {
workers = append(workers, n)
}
}
c.WorkerNodes = workers
c.Worker = strconv.Itoa(len(workers))
}
_ = common.DefaultDB.SaveCluster(&c)
}
c.Status.Status = common.StatusFailed
_ = common.DefaultDB.SaveCluster(c)
_ = p.RollbackCluster(rollbackInstance)
}
if er == nil && len(p.Status.MasterNodes) > 0 {
Expand Down Expand Up @@ -446,36 +467,11 @@ func (p *ProviderBase) InitCluster(options interface{}, deployPlugins func() []s
if p.Enable != nil {
for _, plugin := range p.Enable {
if plugin != "explorer" {
// check addon plugin
addon, err := common.DefaultDB.GetAddon(plugin)
if err != nil {
p.Logger.Errorf("[%s] failed to get addon by name %s, got error: %v", p.Provider, plugin, err)
continue
}
manifest := addon.Manifest
defaultValues := addon.Values
// check --set values
setValues := map[string]string{}
for key, value := range p.Values {
if strings.HasPrefix(key, fmt.Sprintf("%s.", plugin)) {
setValues[strings.TrimPrefix(key, fmt.Sprintf("%s.", plugin))] = value
} else {
setValues[key] = value
}
}
values, err := common.GenerateValues(setValues, defaultValues)
cmd, err := p.addonInstallation(plugin)
if err != nil {
p.Logger.Errorf("[%s] failed to generate values for addon %s with values %v: %v", p.Provider, plugin, p.Values, err)
continue
}
p.Logger.Debugf("assemble manifest with value %++v", values)
assembleManifest, err := common.AssembleManifest(values, string(manifest), p.parseDefaultTemplates())
if err != nil {
p.Logger.Errorf("[%s] failed to assemble manifest for addon %s with values %v: %v", p.Provider, plugin, setValues, err)
continue
}
cmds = append(cmds, fmt.Sprintf(deployPluginCmd,
base64.StdEncoding.EncodeToString(assembleManifest), common.K3sManifestsDir, plugin))
cmds = append(cmds, cmd)
}
}
}
Expand Down Expand Up @@ -510,12 +506,31 @@ func (p *ProviderBase) JoinNodes(cloudInstanceFunc func(ssh *types.SSH) (*types.
return fmt.Errorf("[%s] cluster %s is not exist", p.Provider, p.Name)
}
defer func() {
if er != nil {
if er != nil || len(p.ErrM) > 0 {
// join failed.
state.Status = common.StatusRunning
_ = common.DefaultDB.SaveClusterState(state)
// rollback instance.
_ = p.RollbackCluster(rollbackInstance)
state, err = common.DefaultDB.GetCluster(p.Name, p.Provider)
if err == nil {
if len(p.ErrM) > 0 {
// remove failed node from cluster state
workerNodes := []types.Node{}
_ = json.Unmarshal(state.WorkerNodes, &workerNodes)
workers := []types.Node{}
for _, n := range workerNodes {
if _, ok := p.ErrM[n.InstanceID]; !ok {
workers = append(workers, n)
}
}
wb, err := json.Marshal(workers)
if err == nil {
state.WorkerNodes = wb
}
state.Worker = strconv.Itoa(len(workers))
}
state.Status = common.StatusRunning
_ = common.DefaultDB.SaveClusterState(state)
// rollback instance.
_ = p.RollbackCluster(rollbackInstance)
}
}
// remove join state file and save running state.
_ = logFile.Close()
Expand Down Expand Up @@ -1189,13 +1204,22 @@ func (p *ProviderBase) RollbackCluster(rollbackInstance func(ids []string) error
p.Logger.Infof("[%s] executing rollback logic...", p.Provider)
if rollbackInstance != nil {
ids := make([]string, 0)
p.M.Range(func(key, value interface{}) bool {
v := value.(types.Node)
if v.RollBack {
ids = append(ids, key.(string))
// support for partial rollback
if len(p.ErrM) > 0 {
p.Logger.Warnf("[%s] The following instances need to rollback in some of reasons...", p.Provider)
for key, value := range p.ErrM {
p.Logger.Warnf("[%s] The instance %s is failed to join to the K3s cluster with error: %v", p.Provider, key, value)
ids = append(ids, key)
}
return true
})
} else {
p.M.Range(func(key, value interface{}) bool {
v := value.(types.Node)
if v.RollBack {
ids = append(ids, key.(string))
}
return true
})
}

p.Logger.Infof("[%s] instances %s will be rollback", p.Provider, ids)

Expand Down Expand Up @@ -1417,3 +1441,36 @@ func (p *ProviderBase) parseDefaultTemplates() template.FuncMap {
},
}
}

// addonInstallation builds the command string used to deploy the addon
// named plugin.
//
// It fetches the addon via common.DefaultDB.GetAddon, collects the provider's
// p.Values into a per-addon map, passes that map together with the addon's
// stored values to common.GenerateValues, and feeds the result plus the addon
// manifest to common.AssembleManifest. The assembled manifest is
// base64-encoded and formatted into deployPluginCmd along with
// common.K3sManifestsDir and the plugin name.
//
// On any failure the error is logged through p.Logger and returned with an
// empty command string.
func (p *ProviderBase) addonInstallation(plugin string) (string, error) {
	record, err := common.DefaultDB.GetAddon(plugin)
	if err != nil {
		p.Logger.Errorf("[%s] failed to get addon by name %s, got error: %v", p.Provider, plugin, err)
		return "", err
	}

	// Keys of the form "<plugin>.<key>" are stored without the plugin prefix;
	// every other key is copied unchanged (TrimPrefix is a no-op for them).
	prefix := plugin + "."
	overrides := map[string]string{}
	for k, v := range p.Values {
		overrides[strings.TrimPrefix(k, prefix)] = v
	}

	merged, err := common.GenerateValues(overrides, record.Values)
	if err != nil {
		p.Logger.Errorf("[%s] failed to generate values for addon %s with values %v: %v", p.Provider, plugin, p.Values, err)
		return "", err
	}
	p.Logger.Debugf("assemble manifest with value %++v", merged)

	rendered, err := common.AssembleManifest(merged, string(record.Manifest), p.parseDefaultTemplates())
	if err != nil {
		p.Logger.Errorf("[%s] failed to assemble manifest for addon %s with values %v: %v", p.Provider, plugin, overrides, err)
		return "", err
	}

	encoded := base64.StdEncoding.EncodeToString(rendered)
	return fmt.Sprintf(deployPluginCmd, encoded, common.K3sManifestsDir, plugin), nil
}
Loading

0 comments on commit 332e8e2

Please sign in to comment.