diff --git a/README.md b/README.md
index 75b142f..f34ef78 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,7 @@ Delete a pod that is scheduled to be biased to a specific node.
 
 ### Installation
 ```
-kubectl apply -f https://github.com/norseto/k8s-watchdogs/releases/download/pod-rebalancer-v0.0.1/pod-rebalancer.yaml
+kubectl apply -f https://github.com/norseto/k8s-watchdogs/releases/download/pod-rebalancer-v0.1.0/pod-rebalancer.yaml
 ```
 
 ### Limitation
diff --git a/cmd/evicted-cleaner/main.go b/cmd/evicted-cleaner/main.go
index 11096e3..28b43a3 100755
--- a/cmd/evicted-cleaner/main.go
+++ b/cmd/evicted-cleaner/main.go
@@ -1,14 +1,15 @@
 package main
+
 // Evicted Pod Cleaner
 // Deletes all evicted pod.
 
 import (
+    k8sutils "github.com/norseto/k8s-watchdogs/pkg/k8sutils"
     "github.com/pkg/errors"
     log "github.com/sirupsen/logrus"
+    "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/kubernetes"
-    "k8s.io/api/core/v1"
-    common "github.com/norseto/k8s-watchdogs/pkg"
 )
 
 const (
@@ -18,7 +19,7 @@
 
 func main() {
     var clientset *kubernetes.Clientset
-    clientset, err := common.NewClientset()
+    clientset, err := k8sutils.NewClientset()
     if err != nil {
         log.Panic(errors.Wrap(err, "failed to create clientset"))
     }
diff --git a/cmd/pod-rebalancer/main.go b/cmd/pod-rebalancer/main.go
index 9bc53b0..b32546c 100755
--- a/cmd/pod-rebalancer/main.go
+++ b/cmd/pod-rebalancer/main.go
@@ -21,7 +21,7 @@ func main() {
     var clientset *kubernetes.Clientset
     var namespace = metav1.NamespaceAll
 
-    log.Info("Starging multiple pod rs rebalancer...")
+    log.Info("Starting multiple pod rs rebalancer...")
 
     clientset, err := k8sutils.NewClientset()
     if err != nil {
@@ -33,11 +33,11 @@
         log.Panic(errors.Wrap(err, "failed to list nodes"))
     }
 
-    replicasets, err := getTargetReplicasets(clientset, namespace)
+    replicasets, err := getTargetReplicaSets(clientset, namespace)
     if err != nil {
         log.Panic(errors.Wrap(err, "failed to list replicaset"))
     }
-    rs, err := getTargetPods(clientset, namespace, nodes, replicasets)
+    rs, err := getCandidatePods(clientset, namespace, nodes, replicasets)
     if err != nil {
         log.Panic(errors.Wrap(err, "failed to list pods"))
     }
@@ -47,14 +47,19 @@
         return
     }
 
+    rsstat := k8sutils.NewReplicaSetStatus(replicasets)
     rebalanced := 0
     for _, r := range rs {
         name := r.replicaset.Name
+        if rsstat.IsRollingUpdating(r.replicaset) {
+            log.Info(fmt.Sprint("May be under rolling update. Leave untouched. rs: ", name))
+            continue
+        }
         result, err := newRebalancer(r).Rebalance(clientset)
         if err != nil {
             log.Error(errors.Wrap(err, fmt.Sprint("failed to rebalance rs: ", name)))
         } else if result {
-            log.Debug(fmt.Sprint("Rebalanced rs: ", name))
+            log.Info(fmt.Sprint("Rebalanced rs: ", name))
             rebalanced++
         } else {
             log.Debug(fmt.Sprint("No need to rebalance rs: ", name))
@@ -64,7 +69,7 @@
     log.Info("Done multiple pod rs rebalancer. Rebalanced ", rebalanced, " ReplicaSet(s)")
 }
 
-// getTargetReplicasets gets target replicaset.
+// getTargetReplicaSets gets target ReplicaSets.
 // Parameter:
 //    c *kubernetes.Clientset : clientset
 //    ns string : namespace of replicaset
@@ -72,20 +77,18 @@ func main() {
 //    []appsv1.ReplicaSet : All target replicasets that does not hace
 //                          affinity nor tolerations nor nodeselector
 //    error : error if error happens
-func getTargetReplicasets(c *kubernetes.Clientset, ns string) ([]appsv1.ReplicaSet, error) {
+func getTargetReplicaSets(c *kubernetes.Clientset, ns string) ([]appsv1.ReplicaSet, error) {
     var replicasets = []appsv1.ReplicaSet{}
     all, err := c.AppsV1().ReplicaSets(ns).List(metav1.ListOptions{IncludeUninitialized: false})
     if err != nil {
         return nil, errors.Wrap(err, "failed to list replicaset")
     }
-    for _, rs := range all.Items {
-        replicasets = append(replicasets, rs)
-    }
+    replicasets = append(replicasets, all.Items...)
     return replicasets, nil
 }
 
-// getTargetPods gets target pods.
-func getTargetPods(c *kubernetes.Clientset, ns string, nodes []v1.Node, rslist []appsv1.ReplicaSet) ([]*replicaState, error) {
+// getCandidatePods gets candidate pods.
+func getCandidatePods(c *kubernetes.Clientset, ns string, nodes []v1.Node, rslist []appsv1.ReplicaSet) ([]*replicaState, error) {
     nodeMap := make(map[string]v1.Node)
     stats := []*replicaState{}
     rsmap := make(map[types.UID]*replicaState)
diff --git a/cmd/pod-rebalancer/rebalancer.go b/cmd/pod-rebalancer/rebalancer.go
index eb4cec0..5b32250 100644
--- a/cmd/pod-rebalancer/rebalancer.go
+++ b/cmd/pod-rebalancer/rebalancer.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+    log "github.com/sirupsen/logrus"
     appsv1 "k8s.io/api/apps/v1"
     v1 "k8s.io/api/core/v1"
     "k8s.io/client-go/kubernetes"
@@ -47,7 +48,7 @@ func (r *rebalancer) Rebalance(c *kubernetes.Clientset) (bool, error) {
 
     node, num := r.maxPodNode()
     ave := float32(sr) / float32(nodeCount)
-    if len(node) > 0 && num >= int(ave+1.0) {
+    if len(node) > 0 && float32(num) >= ave+1.0 {
         err := r.deleteNodePod(c, node)
         return true, err
     }
@@ -59,6 +60,7 @@ func (r *rebalancer) Rebalance(c *kubernetes.Clientset) (bool, error) {
 func (r *rebalancer) deleteNodePod(c *kubernetes.Clientset, node string) error {
     for _, s := range r.current.podState {
         if s.node.Name == node {
+            log.Debug("Deleting pod " + s.pod.Name + " in " + node)
             return k8sutils.DeletePod(c, *s.pod)
         }
     }
diff --git a/pkg/k8sutils/rs.go b/pkg/k8sutils/rs.go
index 695e6aa..1fa4d1e 100644
--- a/pkg/k8sutils/rs.go
+++ b/pkg/k8sutils/rs.go
@@ -3,8 +3,56 @@ package k8sutils
 import (
     appsv1 "k8s.io/api/apps/v1"
     v1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/types"
 )
 
+// rsowners counts ReplicaSets per owner reference.
+type rsowners struct {
+    owners map[types.UID]int
+}
+
+// ReplicaSetStatus represents the status of a set of ReplicaSets.
+type ReplicaSetStatus interface {
+    // IsRollingUpdating reports whether the ReplicaSet is under a rolling update.
+    // Parameters:
+    //   rs: ReplicaSet
+    // Returns:
+    //   True if under rolling update.
+    IsRollingUpdating(rs *appsv1.ReplicaSet) bool
+}
+
+// NewReplicaSetStatus returns a new ReplicaSetStatus instance.
+// Parameters:
+//   rs: Array of ReplicaSet
+// Returns:
+//   a new instance.
+func NewReplicaSetStatus(rs []appsv1.ReplicaSet) ReplicaSetStatus {
+    ret := &rsowners{owners: map[types.UID]int{}}
+    for _, r := range rs {
+        if *r.Spec.Replicas == 0 {
+            continue
+        }
+        for _, o := range r.OwnerReferences {
+            ret.owners[o.UID]++
+        }
+    }
+    return ret
+}
+
+// IsRollingUpdating reports whether the ReplicaSet is under a rolling update.
+// Parameters:
+//   rs: ReplicaSet
+// Returns:
+//   True if under rolling update.
+func (u *rsowners) IsRollingUpdating(rs *appsv1.ReplicaSet) bool {
+    for _, o := range rs.OwnerReferences {
+        if u.owners[o.UID] > 1 {
+            return true
+        }
+    }
+    return false
+}
+
 // IsPodScheduleLimeted returns true if Pod Spec of Replicaset has any schedule limeted
 // like pod has Affinity, Toleration, or NodeSelector
 // Parameter:
diff --git a/pod-rebalancer.yaml b/pod-rebalancer.yaml
index 6c346fb..4004486 100755
--- a/pod-rebalancer.yaml
+++ b/pod-rebalancer.yaml
@@ -39,7 +39,7 @@ metadata:
   namespace: kube-system
   name: pod-rebalancer
 spec:
-  schedule: "20 * * * *"
+  schedule: "25 * * * *"
   concurrencyPolicy: Forbid
   jobTemplate:
     spec:
@@ -57,7 +57,7 @@ spec:
                 - amd64
       containers:
       - name: pod-rebalancer
-        image: docker.io/norseto/pod-rebalancer:0.0.1
+        image: docker.io/norseto/pod-rebalancer:0.1.0
         imagePullPolicy: IfNotPresent
       restartPolicy: OnFailure
       activeDeadlineSeconds: 60
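
A note for reviewers on the new rolling-update guard: `NewReplicaSetStatus` counts live ReplicaSets (those with a non-zero `Spec.Replicas`) per owner UID, and `IsRollingUpdating` returns true when any owner of the given ReplicaSet still owns more than one live ReplicaSet, which is what a Deployment looks like mid-rollout. The sketch below is illustrative only and not part of the diff; the owner UID and ReplicaSet names are made up, and it assumes the `pkg/k8sutils` import path introduced in this PR.

```go
package main

import (
	"fmt"

	k8sutils "github.com/norseto/k8s-watchdogs/pkg/k8sutils"
	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

func int32p(i int32) *int32 { return &i }

func main() {
	// Hypothetical Deployment UID owning both ReplicaSets below.
	owner := metav1.OwnerReference{UID: types.UID("deploy-1")}

	// Two live ReplicaSets owned by the same Deployment: the old one is being
	// scaled down while the new one scales up, i.e. a rolling update.
	oldRS := appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{Name: "web-5f6", OwnerReferences: []metav1.OwnerReference{owner}},
		Spec:       appsv1.ReplicaSetSpec{Replicas: int32p(2)},
	}
	newRS := appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{Name: "web-7a9", OwnerReferences: []metav1.OwnerReference{owner}},
		Spec:       appsv1.ReplicaSetSpec{Replicas: int32p(3)},
	}

	stat := k8sutils.NewReplicaSetStatus([]appsv1.ReplicaSet{oldRS, newRS})
	fmt.Println(stat.IsRollingUpdating(&newRS)) // true: two live ReplicaSets share one owner
}
```

In `cmd/pod-rebalancer/main.go` this is exactly why the loop calls `rsstat.IsRollingUpdating(r.replicaset)` and skips such ReplicaSets: deleting pods there would fight the Deployment controller while it is already reshuffling them.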
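The comparison change in `Rebalance` is also worth a comment: the old code truncated `ave+1.0` to an int before comparing, so the rebalancer could evict a pod even when the busiest node was only half a pod above the per-node average. A minimal sketch of the arithmetic, using made-up numbers (5 replicas over 2 nodes):

```go
package main

import "fmt"

func main() {
	// 5 replicas spread over 2 nodes: ave = 2.5, and a 3/2 split is already optimal.
	ave := float32(5) / float32(2)
	num := 3 // pod count on the busiest node

	fmt.Println(num >= int(ave+1.0))     // old check: 3 >= 3   -> true, needless eviction
	fmt.Println(float32(num) >= ave+1.0) // new check: 3 >= 3.5 -> false, node left alone
}
```

With the new check a node is only drained when it holds at least one whole pod more than the average, which avoids churn on ReplicaSets that are already as balanced as the node count allows.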