Skip to content

Commit

Permalink
[release-1.11] Eventing TLS: Add E2E TLS test for Parallel (#7509)
Browse files Browse the repository at this point in the history
* Add TLS test for Parallel

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Fix parallel reconciler to create resources with CA Certs set

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Fix use filter2 in text

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Remove duplicate func

Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
knative-prow-robot and pierDipi authored Jan 2, 2024
1 parent e50ad59 commit 8b1ec7d
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 130 deletions.
24 changes: 7 additions & 17 deletions pkg/reconciler/parallel/resources/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"knative.dev/pkg/kmeta"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

v1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
)
Expand Down Expand Up @@ -69,10 +70,8 @@ func NewFilterSubscription(branchNumber int, p *v1.Parallel) *messagingv1.Subscr
},
}
} else {
r.Spec.Subscriber = &duckv1.Destination{
Ref: p.Spec.Branches[branchNumber].Filter.Ref,
URI: p.Spec.Branches[branchNumber].Filter.URI,
}
r.Spec.Subscriber = p.Spec.Branches[branchNumber].Filter.DeepCopy()

r.Spec.Reply = &duckv1.Destination{
Ref: &duckv1.KReference{
APIVersion: p.Spec.ChannelTemplate.APIVersion,
Expand Down Expand Up @@ -105,24 +104,15 @@ func NewSubscription(branchNumber int, p *v1.Parallel) *messagingv1.Subscription
Kind: p.Spec.ChannelTemplate.Kind,
Name: ParallelBranchChannelName(p.Name, branchNumber),
},
Subscriber: &duckv1.Destination{
Ref: p.Spec.Branches[branchNumber].Subscriber.Ref,
URI: p.Spec.Branches[branchNumber].Subscriber.URI,
},
Delivery: p.Spec.Branches[branchNumber].Delivery,
Subscriber: p.Spec.Branches[branchNumber].Subscriber.DeepCopy(),
Delivery: p.Spec.Branches[branchNumber].Delivery.DeepCopy(),
},
}

if p.Spec.Branches[branchNumber].Reply != nil {
r.Spec.Reply = &duckv1.Destination{
Ref: p.Spec.Branches[branchNumber].Reply.Ref,
URI: p.Spec.Branches[branchNumber].Reply.URI,
}
r.Spec.Reply = p.Spec.Branches[branchNumber].Reply.DeepCopy()
} else if p.Spec.Reply != nil {
r.Spec.Reply = &duckv1.Destination{
Ref: p.Spec.Reply.Ref,
URI: p.Spec.Reply.URI,
}
r.Spec.Reply = p.Spec.Reply.DeepCopy()
}
return r
}
100 changes: 93 additions & 7 deletions test/rekt/features/parallel/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ limitations under the License.
package parallel

import (
"context"
"strconv"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/test"
duckv1 "knative.dev/pkg/apis/duck/v1"

"knative.dev/eventing/test/rekt/features/featureflags"
"knative.dev/eventing/test/rekt/resources/addressable"
"knative.dev/eventing/test/rekt/resources/channel_template"
"knative.dev/eventing/test/rekt/resources/parallel"

Expand Down Expand Up @@ -50,7 +54,7 @@ func ParallelWithTwoBranches(channelTemplate channel_template.ChannelTemplate) *
event.SetData(cloudevents.ApplicationJSON, []byte(eventBody))

cfg := []manifest.CfgFn{
parallel.WithReply(service.AsKReference(sink), ""),
parallel.WithReply(service.AsDestinationRef(sink)),
parallel.WithChannelTemplate(channelTemplate),
}

Expand All @@ -70,12 +74,12 @@ func ParallelWithTwoBranches(channelTemplate channel_template.ChannelTemplate) *
f.Setup("install filter1", eventshub.Install(filter1, eventshub.ReplyWithTransformedEvent(event.Type(), eventSource, eventBody), eventshub.StartReceiver))
f.Setup("install filter2", eventshub.Install(filter2, eventshub.StartReceiver))
cfg = append(cfg,
parallel.WithSubscriberAt(branch1Num, service.AsKReference(subscriber1), ""),
parallel.WithSubscriberAt(branch2Num, service.AsKReference(subscriber2), ""),
parallel.WithFilterAt(branch1Num, service.AsKReference(filter1), ""),
parallel.WithFilterAt(branch2Num, service.AsKReference(filter2), ""),
parallel.WithReplyAt(branch1Num, nil, ""),
parallel.WithReplyAt(branch2Num, nil, ""),
parallel.WithSubscriberAt(branch1Num, service.AsDestinationRef(subscriber1)),
parallel.WithSubscriberAt(branch2Num, service.AsDestinationRef(subscriber2)),
parallel.WithFilterAt(branch1Num, service.AsDestinationRef(filter1)),
parallel.WithFilterAt(branch2Num, service.AsDestinationRef(filter2)),
parallel.WithReplyAt(branch1Num, nil),
parallel.WithReplyAt(branch2Num, nil),
)
// Install a Parallel with two branches
f.Setup("install Parallel", parallel.Install(parallelName, cfg...))
Expand All @@ -95,3 +99,85 @@ func ParallelWithTwoBranches(channelTemplate channel_template.ChannelTemplate) *

return f
}

func ParallelWithTwoBranchesTLS(channelTemplate channel_template.ChannelTemplate) *feature.Feature {
f := feature.NewFeatureNamed("Parallel test.")

f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict())
f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled())

parallelName := feature.MakeRandomK8sName("parallel1")
source := feature.MakeRandomK8sName("source1")
sink := feature.MakeRandomK8sName("sink1")

eventBody := `{"msg":"test msg"}`
event := test.FullEvent()
_ = event.SetData(cloudevents.ApplicationJSON, []byte(eventBody))

cfg := []manifest.CfgFn{
parallel.WithChannelTemplate(channelTemplate),
}

// Construct two branches
branch1Num := 0
branch2Num := 1
subscriber1 := feature.MakeRandomK8sName("subscriber" + strconv.Itoa(branch1Num))
subscriber2 := feature.MakeRandomK8sName("subscriber" + strconv.Itoa(branch2Num))
filter1 := feature.MakeRandomK8sName("filter" + strconv.Itoa(branch1Num))
filter2 := feature.MakeRandomK8sName("filter" + strconv.Itoa(branch2Num))

f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiverTLS))

f.Setup("install subscriber1", eventshub.Install(subscriber1, eventshub.ReplyWithAppendedData("appended data 1"), eventshub.StartReceiverTLS))
f.Setup("install subscriber2", eventshub.Install(subscriber2, eventshub.ReplyWithAppendedData("appended data 2"), eventshub.StartReceiverTLS))

f.Setup("install filter1", eventshub.Install(filter1, eventshub.ReplyWithTransformedEvent(event.Type(), event.Source(), string(event.Data())), eventshub.StartReceiverTLS))
f.Setup("install filter2", eventshub.Install(filter2, eventshub.ReplyWithTransformedEvent(event.Type(), event.Source(), string(event.Data())), eventshub.StartReceiverTLS))

// Install a Parallel with two branches
f.Setup("install Parallel", func(ctx context.Context, t feature.T) {
cfg = append(cfg,
parallel.WithReply(asDestinationRefCtx(ctx, sink)),
parallel.WithSubscriberAt(branch1Num, asDestinationRefCtx(ctx, subscriber1)),
parallel.WithSubscriberAt(branch2Num, asDestinationRefCtx(ctx, subscriber2)),
parallel.WithFilterAt(branch1Num, asDestinationRefCtx(ctx, filter1)),
parallel.WithFilterAt(branch2Num, asDestinationRefCtx(ctx, filter2)),
parallel.WithReplyAt(branch1Num, nil),
parallel.WithReplyAt(branch2Num, nil),
)

parallel.Install(parallelName, cfg...)(ctx, t)
})
f.Setup("Parallel goes ready", parallel.IsReady(parallelName))
f.Setup("Parallel is addressable", parallel.IsAddressable(parallelName))
f.Setup("Parallel has HTTPS address", parallel.ValidateAddress(parallelName, addressable.AssertHTTPSAddress))

f.Requirement("install source", eventshub.Install(
source,
eventshub.StartSenderToResourceTLS(parallel.GVR(), parallelName, nil),
eventshub.InputEvent(event),
))

f.Stable("test Parallel with two TLS branches, both passed").
Must("deliver event to subscriber1", eventasssert.OnStore(subscriber1).MatchEvent(test.HasId(event.ID())).AtLeast(1)).
Must("deliver event to subscriber2", eventasssert.OnStore(subscriber2).MatchEvent(test.HasId(event.ID())).AtLeast(1)).
Must("deliver event to filter1", eventasssert.OnStore(filter1).MatchEvent(test.HasId(event.ID())).AtLeast(1)).
Must("deliver event to filter2", eventasssert.OnStore(filter2).MatchEvent(test.HasId(event.ID())).AtLeast(1)).
Must("deliver event from subscriber 1 to reply", eventasssert.OnStore(sink).
MatchEvent(test.HasId(event.ID()), test.HasData([]byte("appended data 1"))).
AtLeast(1),
).
Must("deliver event from subscriber 2 to reply", eventasssert.OnStore(sink).
MatchEvent(test.HasId(event.ID()), test.HasData([]byte("appended data 2"))).
AtLeast(1),
)

return f
}

func asDestinationRefCtx(ctx context.Context, name string) *duckv1.Destination {
caCerts := eventshub.GetCaCerts(ctx)
d := service.AsDestinationRef(name)
d.CACerts = caCerts
return d
}
10 changes: 5 additions & 5 deletions test/rekt/features/parallel/readyness.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,27 @@ func GoesReady(name string, cfg ...manifest.CfgFn) *feature.Feature {
reply := feature.MakeRandomK8sName("reply")
f.Setup("install a reply service", service.Install(reply,
service.WithSelectors(map[string]string{"app": "rekt"})))
cfg = append(cfg, parallel.WithReply(service.AsKReference(reply), ""))
cfg = append(cfg, parallel.WithReply(service.AsDestinationRef(reply)))
}

for i := 0; i < 3; i++ {
// Filter
filter := feature.MakeRandomK8sName("subscriber" + strconv.Itoa(i))
f.Setup("install filter "+strconv.Itoa(i), service.Install(filter,
service.WithSelectors(map[string]string{"app": "rekt"})))
cfg = append(cfg, parallel.WithFilterAt(i, service.AsKReference(filter), ""))
cfg = append(cfg, parallel.WithFilterAt(i, service.AsDestinationRef(filter)))

// Subscriber
subscriber := feature.MakeRandomK8sName("subscriber" + strconv.Itoa(i))
f.Setup("install subscriber "+strconv.Itoa(i), service.Install(subscriber,
service.WithSelectors(map[string]string{"app": "rekt"})))
cfg = append(cfg, parallel.WithSubscriberAt(i, service.AsKReference(subscriber), ""))
cfg = append(cfg, parallel.WithSubscriberAt(i, service.AsDestinationRef(subscriber)))

// Reply
reply := feature.MakeRandomK8sName("reply" + strconv.Itoa(i))
f.Setup("install reply "+strconv.Itoa(i), service.Install(reply,
service.WithSelectors(map[string]string{"app": "rekt"})))
cfg = append(cfg, parallel.WithReplyAt(i, service.AsKReference(reply), ""))
cfg = append(cfg, parallel.WithReplyAt(i, service.AsDestinationRef(reply)))
}

f.Setup("install a Parallel", parallel.Install(name, cfg...))
Expand All @@ -74,7 +74,7 @@ func GoesReadyWithoutFilters(name string, cfg ...manifest.CfgFn) *feature.Featur
subscriber := feature.MakeRandomK8sName("subscriber")
f.Setup("install subscriber", service.Install(subscriber,
service.WithSelectors(map[string]string{"app": "rekt"})))
cfg = append(cfg, parallel.WithSubscriberAt(0, service.AsKReference(subscriber), ""))
cfg = append(cfg, parallel.WithSubscriberAt(0, service.AsDestinationRef(subscriber)))

f.Setup("install a Parallel", parallel.Install(name, cfg...))

Expand Down
22 changes: 20 additions & 2 deletions test/rekt/parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ package rekt
import (
"testing"

"knative.dev/eventing/test/rekt/features/parallel"
"knative.dev/eventing/test/rekt/resources/channel_template"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"

"knative.dev/eventing/test/rekt/features/parallel"
"knative.dev/eventing/test/rekt/resources/channel_template"
)

func TestParallel(t *testing.T) {
Expand All @@ -44,3 +46,19 @@ func TestParallel(t *testing.T) {

env.Test(ctx, t, parallel.ParallelWithTwoBranches(channel_template.ImmemoryChannelTemplate()))
}

func TestParallelTLS(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
eventshub.WithTLS(t),
)
t.Cleanup(env.Finish)

env.Test(ctx, t, parallel.ParallelWithTwoBranchesTLS(channel_template.ImmemoryChannelTemplate()))
}
Loading

0 comments on commit 8b1ec7d

Please sign in to comment.