Skip to content

Commit

Permalink
squash! Check that a promotion isn't invoked repeatedly
Browse files Browse the repository at this point in the history
 - use helpers to start and complete promotions
 - use a predicate to prevent queueing updates unless the spec changed

On the second one: there's a race between the cache getting an object
with the promotion status, and the reconciler processing the object
again. It's possible for the reconciler, having just triggered a
promotion, to run again and see a version of the pipeline object that
doesn't have the promotion recorded -- and that will mean attempting
the promotion again.

In testing, this happens almost always, because patches will cause the
object to be requeued before it's finished reconciling, and it'll run
again as soon as it has exited. I would expect it to also be common in
normal operation; and the outcome is that a promotion is re-run, which
is not great!

To mitigate the race, I've added a predicate to the watch which will
drop updates that didn't change the spec. This means that the patches
earlier in Reconcile won't cause the object to be requeued, and it is
much, much less likely that it'll run again straight away. It's still
possible -- maybe the spec changed while it was running.

Other, more involved ways to avoid it:

 - use conflict detection on write to bail before running a promotion,
   if the observed pipeline has changed

 - create Promotion objects and guard promotions by trying to create
   (another form of detecting conflict on write)
  • Loading branch information
squaremo committed Nov 23, 2023
1 parent 4f6b290 commit 5f0b4de
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 52 deletions.
27 changes: 19 additions & 8 deletions controllers/leveltriggered/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/weaveworks/pipeline-controller/api/v1alpha1"
Expand Down Expand Up @@ -240,9 +242,10 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

removePendingCondition(&pipeline)
if err := patcher.Patch(ctx, &pipeline, withFieldOwner); err != nil {
return ctrl.Result{}, fmt.Errorf("error removing pending condition: %w", err)
if removePendingCondition(&pipeline) {
if err := patcher.Patch(ctx, &pipeline, withFieldOwner); err != nil {
return ctrl.Result{}, fmt.Errorf("error removing pending condition: %w", err)
}
}

for _, env := range pipeline.Spec.Environments[1:] {
Expand All @@ -256,11 +259,15 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
continue
}

if checkAnyTargetHasRevision(envStatus, latestRevision) {
return ctrl.Result{}, nil
// otherwise: if there's a promotion recorded, we can stop here.
if envStatus.Promotion != nil && envStatus.Promotion.Revision == latestRevision {
logger.Info("promotion already recorded", "env", env.Name, "revision", latestRevision)
break
}

// other-otherwise: attempt a promotion
promoteErr := r.promoteLatestRevision(ctx, pipeline, env, latestRevision)
logger.Info("promoting env", "env", env.Name, "revision", latestRevision)
err := setPromotionStatus(&pipeline, env.Name, latestRevision, promoteErr)
if err != nil {
return ctrl.Result{}, fmt.Errorf("error recording promotion status: %w", err)
Expand Down Expand Up @@ -299,8 +306,12 @@ func setPendingCondition(pipeline *v1alpha1.Pipeline, reason, message string) {
apimeta.SetStatusCondition(&pipeline.Status.Conditions, condition)
}

func removePendingCondition(pipeline *v1alpha1.Pipeline) {
apimeta.RemoveStatusCondition(&pipeline.Status.Conditions, conditions.PromotionPendingCondition)
func removePendingCondition(pipeline *v1alpha1.Pipeline) bool {
ok := apimeta.FindStatusCondition(pipeline.Status.Conditions, conditions.PromotionPendingCondition) != nil
if ok {
apimeta.RemoveStatusCondition(&pipeline.Status.Conditions, conditions.PromotionPendingCondition)
}
return ok
}

func (r *PipelineReconciler) promoteLatestRevision(ctx context.Context, pipeline v1alpha1.Pipeline, env v1alpha1.Environment, revision string) error {
Expand Down Expand Up @@ -469,7 +480,7 @@ func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.Pipeline{}).
For(&v1alpha1.Pipeline{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(
&clusterctrlv1alpha1.GitopsCluster{},
handler.EnqueueRequestsFromMapFunc(r.requestsForCluster(gitopsClusterIndexKey)),
Expand Down
60 changes: 16 additions & 44 deletions controllers/leveltriggered/promote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ func TestPromotionAlgorithm(t *testing.T) {
startPromotion(prom.Environment.Name, prom.Version)
})

checkAndCompletePromotion := func(g Gomega, env, version string) {
g.Eventually(func() bool {
return isPromotionStarted(env, version)
}).Should(BeTrue())
completePromotion(env, version)
}

t.Run("promotes revision to all environments", func(t *testing.T) {
g := testingutils.NewGomegaWithT(t)
name := "pipeline-" + rand.String(5)
Expand Down Expand Up @@ -141,15 +148,8 @@ func TestPromotionAlgorithm(t *testing.T) {

// Bumping dev revision to trigger the promotion
setAppRevisionAndReadyStatus(ctx, g, devApp, versionToPromote)
g.Eventually(func() bool {
return isPromotionStarted("staging", versionToPromote)
}).Should(BeTrue())
completePromotion("staging", versionToPromote)

g.Eventually(func() bool {
return isPromotionStarted("prod", versionToPromote)
}).Should(BeTrue())
completePromotion("prod", versionToPromote)
checkAndCompletePromotion(g, "staging", versionToPromote)
checkAndCompletePromotion(g, "prod", versionToPromote)

// checks if the revision of all target status is v1.0.1
var p *v1alpha1.Pipeline
Expand All @@ -174,55 +174,27 @@ func TestPromotionAlgorithm(t *testing.T) {
checkPromotionSuccess(g, p, versionToPromote, "prod")

t.Run("triggers another promotion if the app is updated again", func(t *testing.T) {
const newVersion = "v1.0.2"
g := testingutils.NewGomegaWithT(t)
// Bumping dev revision to trigger the promotion
setAppRevisionAndReadyStatus(ctx, g, devApp, "v1.0.2")

// checks if the revision of all target status is v1.0.2
g.Eventually(func() bool {
p := getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline))

for _, env := range p.Spec.Environments {
if !checkAllTargetsRunRevision(p.Status.Environments[env.Name], "v1.0.2") {
return false
}
}

return true
}, "5s", "0.2s").Should(BeTrue())

p := getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline))
checkPromotionSuccess(g, p, "v1.0.2", "prod")
})

t.Run("does not trigger another promotion if an app is updated _without_ changing the version", func(t *testing.T) {
// This deliberately provokes the controller to reconsider
// a promotion. There is a race between a promotion being
// triggered, and the pipeline seeing the result of that
// promotion. For example, a PR can be created, but not
// merged yet (so no action has been taken in the cluster)
// -- we don't want the controller to create another PR if
// it happens to process the pipeline again.

g := testingutils.NewGomegaWithT(t)
// Bumping dev revision to trigger the promotion
setAppRevisionAndReadyStatus(ctx, g, devApp, "v1.0.2")
setAppRevisionAndReadyStatus(ctx, g, devApp, newVersion)
checkAndCompletePromotion(g, "staging", newVersion)
checkAndCompletePromotion(g, "prod", newVersion)

// checks if the revision of all target status is v1.0.2
// checks if the revision of all target status is the new version
g.Eventually(func() bool {
p := getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline))

for _, env := range p.Spec.Environments {
if !checkAllTargetsRunRevision(p.Status.Environments[env.Name], "v1.0.2") {
if !checkAllTargetsRunRevision(p.Status.Environments[env.Name], newVersion) {
return false
}
}

return true
}, "5s", "0.2s").Should(BeTrue())

p := getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline))
checkPromotionSuccess(g, p, "v1.0.2", "prod")
checkPromotionSuccess(g, p, newVersion, "prod")
})
})

Expand Down

0 comments on commit 5f0b4de

Please sign in to comment.