Skip to content

Commit

Permalink
Finish pod-rebalancer-v0.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
norseto committed Dec 10, 2019
2 parents 8e9e52b + 2ffbf2b commit 55c8ad5
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 18 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Delete a pod that is scheduled to be biased to a specific node.

### Installation
```
kubectl apply -f https://github.com/norseto/k8s-watchdogs/releases/download/pod-rebalancer-v0.0.1/pod-rebalancer.yaml
kubectl apply -f https://github.com/norseto/k8s-watchdogs/releases/download/pod-rebalancer-v0.1.0/pod-rebalancer.yaml
```

### Limitation
Expand Down
7 changes: 4 additions & 3 deletions cmd/evicted-cleaner/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package main

// Evicted Pod Cleaner
// Deletes all evicted pods.

import (
k8sutils "github.com/norseto/k8s-watchdogs/pkg/k8sutils"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/api/core/v1"
common "github.com/norseto/k8s-watchdogs/pkg"
)

const (
Expand All @@ -18,7 +19,7 @@ const (
func main() {
var clientset *kubernetes.Clientset

clientset, err := common.NewClientset()
clientset, err := k8sutils.NewClientset()
if err != nil {
log.Panic(errors.Wrap(err, "failed to create clientset"))
}
Expand Down
25 changes: 14 additions & 11 deletions cmd/pod-rebalancer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func main() {
var clientset *kubernetes.Clientset
var namespace = metav1.NamespaceAll

log.Info("Starging multiple pod rs rebalancer...")
log.Info("Starting multiple pod rs rebalancer...")

clientset, err := k8sutils.NewClientset()
if err != nil {
Expand All @@ -33,11 +33,11 @@ func main() {
log.Panic(errors.Wrap(err, "failed to list nodes"))
}

replicasets, err := getTargetReplicasets(clientset, namespace)
replicasets, err := getTargetReplicaSets(clientset, namespace)
if err != nil {
log.Panic(errors.Wrap(err, "failed to list replicaset"))
}
rs, err := getTargetPods(clientset, namespace, nodes, replicasets)
rs, err := getCandidatePods(clientset, namespace, nodes, replicasets)
if err != nil {
log.Panic(errors.Wrap(err, "failed to list pods"))
}
Expand All @@ -47,14 +47,19 @@ func main() {
return
}

rsstat := k8sutils.NewReplicaSetStatus(replicasets)
rebalanced := 0
for _, r := range rs {
name := r.replicaset.Name
if rsstat.IsRollingUpdating(r.replicaset) {
log.Info(fmt.Sprint("May under rolling update. Leave untouched. rs: ", name))
continue
}
result, err := newRebalancer(r).Rebalance(clientset)
if err != nil {
log.Error(errors.Wrap(err, fmt.Sprint("failed to rebalance rs: ", name)))
} else if result {
log.Debug(fmt.Sprint("Rebalanced rs: ", name))
log.Info(fmt.Sprint("Rebalanced rs: ", name))
rebalanced++
} else {
log.Debug(fmt.Sprint("No need to rebalance rs: ", name))
Expand All @@ -64,28 +69,26 @@ func main() {
log.Info("Done multiple pod rs rebalancer. Rebalanced ", rebalanced, " ReplicaSet(s)")
}

// getTargetReplicasets gets target replicaset.
// getTargetReplicaSets gets target replicaset.
// Parameter:
// c *kubernetes.Clientset : clientset
// ns string : namespace of replicaset
// Returns:
// []appsv1.ReplicaSet : All target replicasets that does not hace
// affinity nor tolerations nor nodeselector
// error : error if error happens
func getTargetReplicasets(c *kubernetes.Clientset, ns string) ([]appsv1.ReplicaSet, error) {
func getTargetReplicaSets(c *kubernetes.Clientset, ns string) ([]appsv1.ReplicaSet, error) {
var replicasets = []appsv1.ReplicaSet{}
all, err := c.AppsV1().ReplicaSets(ns).List(metav1.ListOptions{IncludeUninitialized: false})
if err != nil {
return nil, errors.Wrap(err, "failed to list replicaset")
}
for _, rs := range all.Items {
replicasets = append(replicasets, rs)
}
replicasets = append(replicasets, all.Items...)
return replicasets, nil
}

// getTargetPods gets target pods.
func getTargetPods(c *kubernetes.Clientset, ns string, nodes []v1.Node, rslist []appsv1.ReplicaSet) ([]*replicaState, error) {
// getCandidatePods gets pod candidate.
func getCandidatePods(c *kubernetes.Clientset, ns string, nodes []v1.Node, rslist []appsv1.ReplicaSet) ([]*replicaState, error) {
nodeMap := make(map[string]v1.Node)
stats := []*replicaState{}
rsmap := make(map[types.UID]*replicaState)
Expand Down
4 changes: 3 additions & 1 deletion cmd/pod-rebalancer/rebalancer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -47,7 +48,7 @@ func (r *rebalancer) Rebalance(c *kubernetes.Clientset) (bool, error) {

node, num := r.maxPodNode()
ave := float32(sr) / float32(nodeCount)
if len(node) > 0 && num >= int(ave+1.0) {
if len(node) > 0 && float32(num) >= ave+1.0 {
err := r.deleteNodePod(c, node)
return true, err
}
Expand All @@ -59,6 +60,7 @@ func (r *rebalancer) Rebalance(c *kubernetes.Clientset) (bool, error) {
func (r *rebalancer) deleteNodePod(c *kubernetes.Clientset, node string) error {
for _, s := range r.current.podState {
if s.node.Name == node {
log.Debug("Deleting pod " + s.pod.Name + " in " + node)
return k8sutils.DeletePod(c, *s.pod)
}
}
Expand Down
48 changes: 48 additions & 0 deletions pkg/k8sutils/rs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,56 @@ package k8sutils
import (
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)

// Replicaset owners counter.
type rsowners struct {
owners map[types.UID]int
}

// ReplicaSetStatus represents total ReplicaSets.
type ReplicaSetStatus interface {
// IsRollingUpdating checks ReplicaSet is now on rollingupdate.
// Parameters:
// rs: ReplicaSet
// Returns:
// True if under rollingupdate.
IsRollingUpdating(rs *appsv1.ReplicaSet) bool
}

// NewReplicaSetStatus retuns new ReplicasetStatus instance.
// Parameters:
// rs: Array of ReplicaSet
// Returns:
// a new instance.
func NewReplicaSetStatus(rs []appsv1.ReplicaSet) ReplicaSetStatus {
ret := &rsowners{owners: map[types.UID]int{}}
for _, r := range rs {
if *r.Spec.Replicas == 0 {
continue
}
for _, o := range r.OwnerReferences {
ret.owners[o.UID]++
}
}
return ret
}

// IsRollingUpdating checks ReplicaSet is now on rollingupdate.
// Parameters:
// rs: ReplicaSet
// Returns:
// True if under rollingupdate.
func (u *rsowners) IsRollingUpdating(rs *appsv1.ReplicaSet) bool {
for _, o := range rs.OwnerReferences {
if u.owners[o.UID] > 1 {
return true
}
}
return false
}

// IsPodScheduleLimeted returns true if the Pod Spec of the ReplicaSet has any scheduling limitation,
// such as Affinity, Toleration, or NodeSelector
// Parameter:
Expand Down
4 changes: 2 additions & 2 deletions pod-rebalancer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ metadata:
namespace: kube-system
name: pod-rebalancer
spec:
schedule: "20 * * * *"
schedule: "25 * * * *"
concurrencyPolicy: Forbid
jobTemplate:
spec:
Expand All @@ -57,7 +57,7 @@ spec:
- amd64
containers:
- name: pod-rebalancer
image: docker.io/norseto/pod-rebalancer:0.0.1
image: docker.io/norseto/pod-rebalancer:0.1.0
imagePullPolicy: IfNotPresent
restartPolicy: OnFailure
activeDeadlineSeconds: 60
Expand Down

0 comments on commit 55c8ad5

Please sign in to comment.