Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into Create-EventPolicie…
Browse files Browse the repository at this point in the history
…s-for-Sequence
  • Loading branch information
Leo6Leo committed Aug 1, 2024
2 parents 56b071b + ff37e4e commit dce2679
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 5 deletions.
28 changes: 24 additions & 4 deletions pkg/reconciler/sequence/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@ import (
"context"

"k8s.io/client-go/tools/cache"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/duck"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"

"knative.dev/pkg/injection/clients/dynamicclient"

flowsv1 "knative.dev/eventing/pkg/apis/flows/v1"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
Expand All @@ -33,7 +38,6 @@ import (
"knative.dev/eventing/pkg/client/injection/informers/flows/v1/sequence"
"knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"
sequencereconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/sequence"
"knative.dev/pkg/injection/clients/dynamicclient"
)

// NewController initializes the controller and is called by the generated code
Expand All @@ -47,14 +51,30 @@ func NewController(
subscriptionInformer := subscription.Get(ctx)
eventPolicyInformer := eventpolicy.Get(ctx)

var globalResync func()
store := feature.NewStore(logging.FromContext(ctx), func(name string, value interface{}) {
if globalResync != nil {
globalResync()
}
})
store.WatchConfigs(cmw)

r := &Reconciler{
sequenceLister: sequenceInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
dynamicClientSet: dynamicclient.Get(ctx),
eventingClientSet: eventingclient.Get(ctx),
eventPolicyLister: eventPolicyInformer.Lister(),
}
impl := sequencereconciler.NewImpl(ctx, r)
impl := sequencereconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: store,
}
})

globalResync = func() {
impl.GlobalResync(sequenceInformer.Informer())
}

r.channelableTracker = duck.NewListableTrackerFromTracker(ctx, channelable.Get, impl.Tracker)
sequenceInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
Expand Down
4 changes: 3 additions & 1 deletion pkg/reconciler/sequence/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"

"knative.dev/pkg/kmp"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
Expand All @@ -48,7 +50,6 @@ import (
listers "knative.dev/eventing/pkg/client/listers/flows/v1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1"
"knative.dev/eventing/pkg/duck"
"knative.dev/pkg/kmp"

"knative.dev/eventing/pkg/reconciler/sequence/resources"
)
Expand Down Expand Up @@ -224,6 +225,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1.
}
return newSub, nil
} else if equal, err := kmp.SafeEqual(sub.Spec, expected.Spec); !equal || err != nil {
expected.ResourceVersion = sub.ResourceVersion
// only the mutable fields were changed, so we can update the subscription
updatedSub, err := r.eventingClientSet.MessagingV1().Subscriptions(sub.Namespace).Update(ctx, expected, metav1.UpdateOptions{})
if err != nil {
Expand Down

0 comments on commit dce2679

Please sign in to comment.