Skip to content

Commit

Permalink
[release-1.16] Scheduler: LastOrdinal based on replicas instead of Fr…
Browse files Browse the repository at this point in the history
…eeCap (#8394)

Scheduler: LastOrdinal based on replicas instead of FreeCap

When scaling down and compacting, basing the last ordinal on the
free capacity structure leads to have a lastOrdinal off by one since
`FreeCap` might contain the free capacity for unschedulable pods.

We will have to continue including unschduelable pods in FreeCap
because it might happen that a pod becomes unscheduleble for external
reasons like node gets shutdown for pods with lower ordinals
and the pod need to be rescheduled and during that time period
we want to consider those when compacting; once all vpods that
were on that pod that is gone get rescheduled, FreeCap will only
include scheduleable pods.

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
knative-prow-robot and pierDipi authored Dec 19, 2024
1 parent ee786ee commit 7da3cee
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 57 deletions.
2 changes: 2 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type VPodLister func() ([]VPod, error)
// Evictor allows for vreplicas to be evicted.
// For instance, the evictor is used by the statefulset scheduler to
// move vreplicas to pod with a lower ordinal.
//
// pod might be `nil`.
type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) error

// Scheduler is responsible for placing VPods into real Kubernetes pods
Expand Down
56 changes: 23 additions & 33 deletions pkg/scheduler/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@ type StateAccessor interface {
// It is used by for the scheduler and the autoscaler
type State struct {
// free tracks the free capacity of each pod.
//
// Including pods that might not exist anymore, it reflects the free capacity determined by
// placements in the vpod status.
FreeCap []int32

// schedulable pods tracks the pods that aren't being evicted.
SchedulablePods []int32

// LastOrdinal is the ordinal index corresponding to the last statefulset replica
// with placed vpods.
LastOrdinal int32

// Pod capacity.
Capacity int32

Expand Down Expand Up @@ -143,14 +142,10 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
return nil, err
}

free := make([]int32, 0)
freeCap := make([]int32, 0)
pending := make(map[types.NamespacedName]int32, 4)
expectedVReplicasByVPod := make(map[types.NamespacedName]int32, len(vpods))
schedulablePods := sets.NewInt32()
last := int32(-1)

// keep track of (vpod key, podname) pairs with existing placements
withPlacement := make(map[types.NamespacedName]map[string]bool)

podSpread := make(map[types.NamespacedName]map[string]int32)

Expand All @@ -172,7 +167,7 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
}

for _, p := range schedulablePods.List() {
free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
freeCap = s.updateFreeCapacity(logger, freeCap, PodNameFromOrdinal(s.statefulSetName, p), 0)
}

// Getting current state from existing placements for all vpods
Expand All @@ -182,16 +177,13 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
pending[vpod.GetKey()] = pendingFromVPod(vpod)
expectedVReplicasByVPod[vpod.GetKey()] = vpod.GetVReplicas()

withPlacement[vpod.GetKey()] = make(map[string]bool)
podSpread[vpod.GetKey()] = make(map[string]int32)

for i := 0; i < len(ps); i++ {
podName := ps[i].PodName
vreplicas := ps[i].VReplicas

free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas)

withPlacement[vpod.GetKey()][podName] = true
freeCap = s.updateFreeCapacity(logger, freeCap, podName, vreplicas)

pod, err := s.podLister.Get(podName)
if err != nil {
Expand All @@ -204,8 +196,17 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
}
}

state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, StatefulSetName: s.statefulSetName, PodLister: s.podLister,
PodSpread: podSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod}
state := &State{
FreeCap: freeCap,
SchedulablePods: schedulablePods.List(),
Capacity: s.capacity,
Replicas: scale.Spec.Replicas,
StatefulSetName: s.statefulSetName,
PodLister: s.podLister,
PodSpread: podSpread,
Pending: pending,
ExpectedVReplicaByVPod: expectedVReplicasByVPod,
}

logger.Infow("cluster state info", zap.Any("state", state))

Expand All @@ -219,23 +220,19 @@ func pendingFromVPod(vpod scheduler.VPod) int32 {
return int32(math.Max(float64(0), float64(expected-scheduled)))
}

func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, freeCap []int32, podName string, vreplicas int32) []int32 {
ordinal := OrdinalFromPodName(podName)
free = grow(free, ordinal, s.capacity)
freeCap = grow(freeCap, ordinal, s.capacity)

free[ordinal] -= vreplicas
freeCap[ordinal] -= vreplicas

// Assert the pod is not overcommitted
if free[ordinal] < 0 {
if overcommit := freeCap[ordinal]; overcommit < 0 {
// This should not happen anymore. Log as an error but do not interrupt the current scheduling.
logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
}

if ordinal > last {
last = ordinal
logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("overcommit", overcommit))
}

return free, last
return freeCap
}

func (s *State) TotalPending() int32 {
Expand Down Expand Up @@ -283,23 +280,16 @@ func (s *State) MarshalJSON() ([]byte, error) {
type S struct {
FreeCap []int32 `json:"freeCap"`
SchedulablePods []int32 `json:"schedulablePods"`
LastOrdinal int32 `json:"lastOrdinal"`
Capacity int32 `json:"capacity"`
Replicas int32 `json:"replicas"`
NumZones int32 `json:"numZones"`
NumNodes int32 `json:"numNodes"`
NodeToZoneMap map[string]string `json:"nodeToZoneMap"`
StatefulSetName string `json:"statefulSetName"`
PodSpread map[string]map[string]int32 `json:"podSpread"`
NodeSpread map[string]map[string]int32 `json:"nodeSpread"`
ZoneSpread map[string]map[string]int32 `json:"zoneSpread"`
Pending map[string]int32 `json:"pending"`
}

sj := S{
FreeCap: s.FreeCap,
SchedulablePods: s.SchedulablePods,
LastOrdinal: s.LastOrdinal,
Capacity: s.Capacity,
Replicas: s.Replicas,
StatefulSetName: s.StatefulSetName,
Expand Down
14 changes: 7 additions & 7 deletions pkg/scheduler/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ func TestStateBuilder(t *testing.T) {
name: "no vpods",
replicas: int32(0),
vpods: [][]duckv1alpha1.Placement{},
expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, LastOrdinal: -1, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}},
expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}},
freec: int32(0),
},
{
name: "one vpods",
replicas: int32(1),
vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}},
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
Expand All @@ -88,7 +88,7 @@ func TestStateBuilder(t *testing.T) {
{{PodName: "statefulset-name-1", VReplicas: 2}},
{{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(0), int32(1), int32(2)}, LastOrdinal: 2, Replicas: 3, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(0), int32(1), int32(2)}, Replicas: 3, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestStateBuilder(t *testing.T) {
{{PodName: "statefulset-name-1", VReplicas: 2}},
{{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(1), int32(2)}, LastOrdinal: 2, Replicas: 3, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(1), int32(2)}, Replicas: 3, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-2": 5,
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestStateBuilder(t *testing.T) {
{{PodName: "statefulset-name-1", VReplicas: 0}},
{{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 3, Replicas: 4, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, Replicas: 4, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestStateBuilder(t *testing.T) {
name: "three vpods but one tainted and one with no zone label",
replicas: int32(1),
vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}},
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
Expand All @@ -207,7 +207,7 @@ func TestStateBuilder(t *testing.T) {
name: "one vpod (HA)",
replicas: int32(1),
vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}},
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
Expand Down
18 changes: 5 additions & 13 deletions pkg/scheduler/statefulset/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/integer"
Expand Down Expand Up @@ -250,17 +251,8 @@ func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State) error {
zap.Any("state", s),
)

// when there is only one pod there is nothing to move or number of pods is just enough!
if s.LastOrdinal < 1 || len(s.SchedulablePods) <= 1 {
return nil
}

// Determine if there is enough free capacity to
// move all vreplicas placed in the last pod to pods with a lower ordinal
freeCapacity := s.FreeCapacity() - s.Free(s.LastOrdinal)
usedInLastPod := s.Capacity - s.Free(s.LastOrdinal)

if freeCapacity >= usedInLastPod {
// Determine if there are vpods that need compaction
if s.Replicas != int32(len(s.FreeCap)) {
a.lastCompactAttempt = time.Now()
err := a.compact(s)
if err != nil {
Expand All @@ -285,9 +277,9 @@ func (a *autoscaler) compact(s *st.State) error {
for i := len(placements) - 1; i >= 0; i-- { //start from the last placement
ordinal := st.OrdinalFromPodName(placements[i].PodName)

if ordinal == s.LastOrdinal {
if ordinal >= s.Replicas {
pod, err = s.PodLister.Get(placements[i].PodName)
if err != nil {
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err)
}

Expand Down
47 changes: 43 additions & 4 deletions pkg/scheduler/statefulset/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,9 +470,7 @@ func TestCompactor(t *testing.T) {
{PodName: "statefulset-name-0", VReplicas: int32(8)},
{PodName: "statefulset-name-1", VReplicas: int32(2)}}),
},
wantEvictions: map[types.NamespacedName][]duckv1alpha1.Placement{
{Name: "vpod-1", Namespace: testNs}: {{PodName: "statefulset-name-1", VReplicas: int32(2)}},
},
wantEvictions: nil,
},
{
name: "multiple vpods, with placements in multiple pods, compacted",
Expand Down Expand Up @@ -500,8 +498,49 @@ func TestCompactor(t *testing.T) {
{PodName: "statefulset-name-0", VReplicas: int32(2)},
{PodName: "statefulset-name-2", VReplicas: int32(7)}}),
},
wantEvictions: nil,
},
{
name: "multiple vpods, scale down, with placements in multiple pods, compacted",
replicas: int32(2),
vpods: []scheduler.VPod{
tscheduler.NewVPod(testNs, "vpod-1", 10, []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: int32(3)},
{PodName: "statefulset-name-1", VReplicas: int32(7)}}),
tscheduler.NewVPod(testNs, "vpod-2", 10, []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: int32(7)},
{PodName: "statefulset-name-2", VReplicas: int32(3)}}),
},
wantEvictions: map[types.NamespacedName][]duckv1alpha1.Placement{
{Name: "vpod-2", Namespace: testNs}: {{PodName: "statefulset-name-2", VReplicas: int32(7)}},
{Name: "vpod-2", Namespace: testNs}: {{PodName: "statefulset-name-2", VReplicas: int32(3)}},
},
},
{
name: "multiple vpods, scale down multiple, with placements in multiple pods, compacted",
replicas: int32(1),
vpods: []scheduler.VPod{
tscheduler.NewVPod(testNs, "vpod-1", 6, []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: int32(3)},
{PodName: "statefulset-name-1", VReplicas: int32(7)},
{PodName: "statefulset-name-2", VReplicas: int32(6)},
}),
tscheduler.NewVPod(testNs, "vpod-2", 3, []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: int32(7)},
{PodName: "statefulset-name-2", VReplicas: int32(3)},
{PodName: "statefulset-name-3", VReplicas: int32(2)},
{PodName: "statefulset-name-10", VReplicas: int32(1)},
}),
},
wantEvictions: map[types.NamespacedName][]duckv1alpha1.Placement{
{Name: "vpod-1", Namespace: testNs}: {
{PodName: "statefulset-name-2", VReplicas: int32(6)},
{PodName: "statefulset-name-1", VReplicas: int32(7)},
},
{Name: "vpod-2", Namespace: testNs}: {
{PodName: "statefulset-name-10", VReplicas: int32(1)},
{PodName: "statefulset-name-3", VReplicas: int32(2)},
{PodName: "statefulset-name-2", VReplicas: int32(3)},
},
},
},
}
Expand Down

0 comments on commit 7da3cee

Please sign in to comment.