[release-1.8] [Scheduler] Handle permanent pending pods (#6603)
* Handle permanent pending pods

When the autoscaler tries to scale up the number of statefulset
pods but some pods are unschedulable due to missing resources
in the cluster, the scheduler ends up bringing every already
scheduled resource (vpod) to Ready=False: pending pods have
no node assigned (Pod.Spec.NodeName is empty), so when the
scheduler builds the state of the system it tries to look up
a node with an empty name, which never exists.
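
The condition at the heart of the fix is simply "this pod has no node yet". A minimal, illustrative sketch of that check (not the exact upstream code; see the diff below for the real change):

```
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// isPending reports whether a pod has not been assigned to a node yet;
// this is the condition the fix folds into the "unschedulable" check.
func isPending(pod *corev1.Pod) bool {
	return pod.Spec.NodeName == ""
}

func main() {
	pending := &corev1.Pod{} // stuck in Pending: NodeName is empty
	scheduled := &corev1.Pod{Spec: corev1.PodSpec{NodeName: "node-0"}}

	// Before this fix, the state builder called nodeLister.Get(pod.Spec.NodeName)
	// unconditionally; for a pending pod that means looking up a node named "",
	// which never exists, so building the state failed.
	for _, p := range []*corev1.Pod{pending, scheduled} {
		fmt.Printf("NodeName=%q pending=%v\n", p.Spec.NodeName, isPending(p))
	}
}
```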

To reproduce this issue, run the following command against a small
cluster (on KinD it is consistently reproducible, since a local KinD
cluster is very unlikely to be able to schedule hundreds of pods):

```
seq 100 | xargs -I{} sh -c "kubectl create ns {}; kubectl apply -f ../kafka-source-example.yaml -n {}"
```

where `kafka-source-example.yaml` is:

```
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: knative-demo-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
```

* Review comments

Signed-off-by: Pierangelo Di Pilato <[email protected]>

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
knative-prow-robot and pierDipi authored Nov 10, 2022
1 parent ae5fdcc commit 23a7b56
Showing 3 changed files with 121 additions and 40 deletions.
27 changes: 16 additions & 11 deletions pkg/scheduler/state/state.go
@@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
corev1 "k8s.io/client-go/listers/core/v1"
@@ -180,7 +181,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}

free := make([]int32, 0)
schedulablePods := make([]int32, 0)
schedulablePods := sets.NewInt32()
last := int32(-1)

// keep track of (vpod key, podname) pairs with existing placements
@@ -224,6 +225,10 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
})

if pod != nil {
if isPodUnschedulable(pod) {
// Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod.
continue
}

node, err := s.nodeLister.Get(pod.Spec.NodeName)
if err != nil {
@@ -234,14 +239,10 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
// Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node.
continue
}
if isPodUnschedulable(pod) {
// Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod.
continue
}

// Pod has no annotation or not annotated as unschedulable and
// not on an unschedulable node, so add to feasible
schedulablePods = append(schedulablePods, podId)
schedulablePods.Insert(podId)
}
}

@@ -271,7 +272,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
return err == nil, nil
})

if pod != nil {
if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) {
nodeName := pod.Spec.NodeName //node name for this pod
zoneName := nodeToZoneMap[nodeName] //zone name for this pod
podSpread[vpod.GetKey()][podName] = podSpread[vpod.GetKey()][podName] + vreplicas
@@ -296,7 +297,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
return err == nil, nil
})

if pod != nil {
if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) {
nodeName := pod.Spec.NodeName //node name for this pod
zoneName := nodeToZoneMap[nodeName] //zone name for this pod
podSpread[key][podName] = podSpread[key][podName] + rvreplicas
@@ -310,7 +311,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}

s.logger.Infow("cluster state info", zap.String("NumPods", fmt.Sprint(scale.Spec.Replicas)), zap.String("NumZones", fmt.Sprint(len(zoneMap))), zap.String("NumNodes", fmt.Sprint(len(nodeToZoneMap))), zap.String("Schedulable", fmt.Sprint(schedulablePods)))
return &State{FreeCap: free, SchedulablePods: schedulablePods, LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, NumZones: int32(len(zoneMap)), NumNodes: int32(len(nodeToZoneMap)),
return &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, NumZones: int32(len(zoneMap)), NumNodes: int32(len(nodeToZoneMap)),
SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister,
PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread}, nil
}
@@ -371,8 +372,12 @@ func withReserved(key types.NamespacedName, podName string, committed int32, res

func isPodUnschedulable(pod *v1.Pod) bool {
annotVal, ok := pod.ObjectMeta.Annotations[scheduler.PodAnnotationKey]
unschedulable, val := strconv.ParseBool(annotVal)
return ok && val == nil && unschedulable
unschedulable, err := strconv.ParseBool(annotVal)

isMarkedUnschedulable := ok && err == nil && unschedulable
isPending := pod.Spec.NodeName == ""

return isMarkedUnschedulable || isPending
}

func isNodeUnschedulable(node *v1.Node) bool {
69 changes: 65 additions & 4 deletions pkg/scheduler/state/state_test.go
@@ -47,6 +47,7 @@ func TestStateBuilder(t *testing.T) {
testCases := []struct {
name string
replicas int32
pendingReplicas int32
vpods [][]duckv1alpha1.Placement
expected State
freec int32
@@ -145,6 +146,55 @@ func TestStateBuilder(t *testing.T) {
schedulerPolicyType: scheduler.MAXFILLUP,
nodes: []*v1.Node{tscheduler.MakeNode("node-0", "zone-0"), tscheduler.MakeNode("node-1", "zone-1"), tscheduler.MakeNode("node-2", "zone-2")},
},
{
name: "many vpods, unschedulable pending pods (statefulset-name-0)",
replicas: int32(3),
pendingReplicas: int32(1),
vpods: [][]duckv1alpha1.Placement{
{{PodName: "statefulset-name-0", VReplicas: 1}, {PodName: "statefulset-name-2", VReplicas: 5}},
{{PodName: "statefulset-name-1", VReplicas: 2}},
{{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(1), int32(2)}, LastOrdinal: 2, Replicas: 3, NumNodes: 3, NumZones: 3, SchedulerPolicy: scheduler.MAXFILLUP, SchedPolicy: &scheduler.SchedulerPolicy{}, DeschedPolicy: &scheduler.SchedulerPolicy{}, StatefulSetName: sfsName,
NodeToZoneMap: map[string]string{"node-0": "zone-0", "node-1": "zone-1", "node-2": "zone-2"},
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-2": 5,
},
{Name: vpodName + "-1", Namespace: vpodNs + "-1"}: {
"statefulset-name-1": 2,
},
{Name: vpodName + "-2", Namespace: vpodNs + "-2"}: {
"statefulset-name-1": 3,
},
},
NodeSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"node-2": 5,
},
{Name: vpodName + "-1", Namespace: vpodNs + "-1"}: {
"node-1": 2,
},
{Name: vpodName + "-2", Namespace: vpodNs + "-2"}: {
"node-1": 3,
},
},
ZoneSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"zone-2": 5,
},
{Name: vpodName + "-1", Namespace: vpodNs + "-1"}: {
"zone-1": 2,
},
{Name: vpodName + "-2", Namespace: vpodNs + "-2"}: {
"zone-1": 3,
},
},
},
freec: int32(10),
schedulerPolicyType: scheduler.MAXFILLUP,
nodes: []*v1.Node{tscheduler.MakeNode("node-0", "zone-0"), tscheduler.MakeNode("node-1", "zone-1"), tscheduler.MakeNode("node-2", "zone-2")},
},
{
name: "many vpods, with gaps",
replicas: int32(4),
@@ -461,6 +511,10 @@ func TestStateBuilder(t *testing.T) {
nodelist := make([]runtime.Object, 0, len(tc.nodes))
podlist := make([]runtime.Object, 0, tc.replicas)

if tc.pendingReplicas > tc.replicas {
t.Fatalf("Inconsistent test configuration pending replicas %d greater than replicas %d", tc.pendingReplicas, tc.replicas)
}

for i, placements := range tc.vpods {
vpodName := fmt.Sprint(vpodName+"-", i)
vpodNamespace := fmt.Sprint(vpodNs+"-", i)
@@ -485,10 +539,17 @@ func TestStateBuilder(t *testing.T) {
nodelist = append(nodelist, node)
}

for i := int32(0); i < tc.replicas; i++ {
nodeName := "node-" + fmt.Sprint(i)
podName := sfsName + "-" + fmt.Sprint(i)
pod, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, tscheduler.MakePod(testNs, podName, nodeName), metav1.CreateOptions{})
for i := tc.replicas - 1; i >= 0; i-- {
var pod *v1.Pod
var err error
if i < tc.pendingReplicas {
podName := sfsName + "-" + fmt.Sprint(i)
pod, err = kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, tscheduler.MakePod(testNs, podName, ""), metav1.CreateOptions{})
} else {
nodeName := "node-" + fmt.Sprint(i)
podName := sfsName + "-" + fmt.Sprint(i)
pod, err = kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, tscheduler.MakePod(testNs, podName, nodeName), metav1.CreateOptions{})
}
if err != nil {
t.Fatal("unexpected error", err)
}
65 changes: 40 additions & 25 deletions pkg/scheduler/statefulset/scheduler.go
@@ -38,11 +38,12 @@ import (
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"

podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/eventing/pkg/scheduler"
"knative.dev/eventing/pkg/scheduler/factory"
st "knative.dev/eventing/pkg/scheduler/state"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"

_ "knative.dev/eventing/pkg/scheduler/plugins/core/availabilitynodepriority"
_ "knative.dev/eventing/pkg/scheduler/plugins/core/availabilityzonepriority"
@@ -81,15 +82,16 @@ func NewScheduler(ctx context.Context,

// StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods
type StatefulSetScheduler struct {
ctx context.Context
logger *zap.SugaredLogger
statefulSetName string
statefulSetClient clientappsv1.StatefulSetInterface
podLister corev1listers.PodNamespaceLister
vpodLister scheduler.VPodLister
lock sync.Locker
stateAccessor st.StateAccessor
autoscaler Autoscaler
ctx context.Context
logger *zap.SugaredLogger
statefulSetName string
statefulSetNamespace string
statefulSetClient clientappsv1.StatefulSetInterface
podLister corev1listers.PodNamespaceLister
vpodLister scheduler.VPodLister
lock sync.Locker
stateAccessor st.StateAccessor
autoscaler Autoscaler

// replicas is the (cached) number of statefulset replicas.
replicas int32
@@ -111,17 +113,18 @@ func NewStatefulSetScheduler(ctx context.Context,
autoscaler Autoscaler, podlister corev1listers.PodNamespaceLister) scheduler.Scheduler {

scheduler := &StatefulSetScheduler{
ctx: ctx,
logger: logging.FromContext(ctx),
statefulSetName: name,
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace),
podLister: podlister,
vpodLister: lister,
pending: make(map[types.NamespacedName]int32),
lock: new(sync.Mutex),
stateAccessor: stateAccessor,
reserved: make(map[types.NamespacedName]map[string]int32),
autoscaler: autoscaler,
ctx: ctx,
logger: logging.FromContext(ctx),
statefulSetNamespace: namespace,
statefulSetName: name,
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace),
podLister: podlister,
vpodLister: lister,
pending: make(map[types.NamespacedName]int32),
lock: new(sync.Mutex),
stateAccessor: stateAccessor,
reserved: make(map[types.NamespacedName]map[string]int32),
autoscaler: autoscaler,
}

// Monitor our statefulset
@@ -244,12 +247,12 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1

if state.SchedPolicy != nil {
logger.Info("reverting to previous placements")
s.reservePlacements(vpod, existingPlacements) //rebalancing doesn't care about new placements since all vreps will be re-placed
delete(s.pending, vpod.GetKey()) //rebalancing doesn't care about pending since all vreps will be re-placed
return existingPlacements, controller.NewRequeueAfter(5 * time.Second) //requeue to wait for the autoscaler to do its job
s.reservePlacements(vpod, existingPlacements) // rebalancing doesn't care about new placements since all vreps will be re-placed
delete(s.pending, vpod.GetKey()) // rebalancing doesn't care about pending since all vreps will be re-placed
return existingPlacements, s.notEnoughPodReplicas(left) // requeue to wait for the autoscaler to do its job
}

return placements, controller.NewRequeueAfter(5 * time.Second) //requeue to wait for the autoscaler to do its job
return placements, s.notEnoughPodReplicas(left)
}

logger.Infow("scheduling successful", zap.Any("placement", placements))
@@ -707,3 +710,15 @@ func (s *StatefulSetScheduler) makeZeroPlacements(vpod scheduler.VPod, placement
// free capacity when there are existing placements for a vpod
s.reservePlacements(vpod, newPlacements)
}

// notEnoughPodReplicas returns an error explaining what the problem is and what action we're taking
// to fix it (retry), wrapping a controller.requeueKeyError which signals to ReconcileKind to requeue the
// object after a given delay.
func (s *StatefulSetScheduler) notEnoughPodReplicas(left int32) error {
// Wrap controller.requeueKeyError error to wait for the autoscaler to do its job.
return fmt.Errorf("insufficient running pods replicas for StatefulSet %s/%s to schedule resource replicas (left: %d): retry %w",
s.statefulSetNamespace, s.statefulSetName,
left,
controller.NewRequeueAfter(5*time.Second),
)
}
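
For illustration, a sketch of how the wrapped requeue error behaves. The helper below is a stand-alone copy of the logic without the scheduler receiver, and the namespace, name, and `left` values are hypothetical:

```
package main

import (
	"errors"
	"fmt"
	"time"

	"knative.dev/pkg/controller"
)

// notEnoughPodReplicas mirrors the helper added above, minus the receiver:
// wrapping with %w keeps the requeue signal reachable in the error chain.
func notEnoughPodReplicas(ns, name string, left int32) error {
	return fmt.Errorf("insufficient running pods replicas for StatefulSet %s/%s to schedule resource replicas (left: %d): retry %w",
		ns, name, left,
		controller.NewRequeueAfter(5*time.Second),
	)
}

func main() {
	// Hypothetical values for the sketch.
	err := notEnoughPodReplicas("knative-eventing", "kafka-source-dispatcher", 7)
	fmt.Println(err)

	// The underlying requeue error is still in the chain, so the reconciler
	// machinery can recognize it while the message stays human-readable.
	fmt.Println(errors.Unwrap(err) != nil) // true
}
```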
