Skip to content

Commit

Permalink
[release-1.12] Fix Sequence TLS support (#7603)
Browse files Browse the repository at this point in the history
* Add TLS test for sequence (#7600)

* Add TLS test for sequence

* Run goimports

* Fix TLS support in sequence

* Fix intend for CACerts in sequence template provisioning
  • Loading branch information
creydr authored Jan 22, 2024
1 parent 16927cb commit 35d5618
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 4 deletions.
3 changes: 3 additions & 0 deletions config/core/resources/sequence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ spec:
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
type: string
CACerts:
type: string
description: Certification Authority (CA) certificates in PEM format that the source trusts when sending events to the reply.
steps:
description: Steps is the list of Destinations (processors / functions) that will be called in the order provided. Each step has its own delivery options
type: array
Expand Down
10 changes: 6 additions & 4 deletions pkg/reconciler/sequence/resources/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ func NewSubscription(stepNumber int, s *v1.Sequence) *messagingv1.Subscription {
Name: SequenceChannelName(s.Name, stepNumber),
},
Subscriber: &duckv1.Destination{
Ref: s.Spec.Steps[stepNumber].Destination.Ref,
URI: s.Spec.Steps[stepNumber].Destination.URI,
Ref: s.Spec.Steps[stepNumber].Destination.Ref,
URI: s.Spec.Steps[stepNumber].Destination.URI,
CACerts: s.Spec.Steps[stepNumber].Destination.CACerts,
},
Delivery: s.Spec.Steps[stepNumber].Delivery,
},
Expand All @@ -71,8 +72,9 @@ func NewSubscription(stepNumber int, s *v1.Sequence) *messagingv1.Subscription {
}
} else if s.Spec.Reply != nil {
r.Spec.Reply = &duckv1.Destination{
Ref: s.Spec.Reply.Ref,
URI: s.Spec.Reply.URI,
Ref: s.Spec.Reply.Ref,
URI: s.Spec.Reply.URI,
CACerts: s.Spec.Reply.CACerts,
}
}
return r
Expand Down
75 changes: 75 additions & 0 deletions test/rekt/features/sequence/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

"github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
"knative.dev/eventing/test/rekt/features/featureflags"
"knative.dev/eventing/test/rekt/resources/addressable"
duckv1 "knative.dev/pkg/apis/duck/v1"

"knative.dev/eventing/test/rekt/resources/channel_template"
Expand Down Expand Up @@ -102,3 +104,76 @@ func SequenceTest(channelTemplate channel_template.ChannelTemplate) *feature.Fea

return f
}

func SequenceTestTLS(channelTemplate channel_template.ChannelTemplate) *feature.Feature {
f := feature.NewFeatureNamed("Sequence test.")

f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict())
f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled())

sequenceName := feature.MakeRandomK8sName("sequence")
source := feature.MakeRandomK8sName("source")
sink := feature.MakeRandomK8sName("sink")
// Sequence's steps
step1 := feature.MakeRandomK8sName("step1")
step2 := feature.MakeRandomK8sName("step2")
step3 := feature.MakeRandomK8sName("step3")
msgAppender1 := "-step1"
msgAppender2 := "-step2"
msgAppender3 := "-step3"

f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiverTLS))
// Construct steps with appended data
f.Setup("install subscriber1", eventshub.Install(step1, eventshub.ReplyWithAppendedData(msgAppender1), eventshub.StartReceiverTLS))
f.Setup("install subscriber2", eventshub.Install(step2, eventshub.ReplyWithAppendedData(msgAppender2), eventshub.StartReceiverTLS))
f.Setup("install subscriber", eventshub.Install(step3, eventshub.ReplyWithAppendedData(msgAppender3), eventshub.StartReceiverTLS))

// Install a Sequence with three steps
f.Setup("install Sequence", func(ctx context.Context, t feature.T) {

cfg := []manifest.CfgFn{
sequence.WithChannelTemplate(channelTemplate),

sequence.WithStepFromDestination(&duckv1.Destination{
Ref: service.AsKReference(step1),
CACerts: eventshub.GetCaCerts(ctx),
}),
sequence.WithStepFromDestination(&duckv1.Destination{
Ref: service.AsKReference(step2),
CACerts: eventshub.GetCaCerts(ctx),
}),
sequence.WithStepFromDestination(&duckv1.Destination{
Ref: service.AsKReference(step3),
CACerts: eventshub.GetCaCerts(ctx),
}),

sequence.WithReplyFromDestination(&duckv1.Destination{
Ref: service.AsKReference(sink),
CACerts: eventshub.GetCaCerts(ctx),
}),
}

sequence.Install(sequenceName, cfg...)(ctx, t)
})
f.Setup("Sequence goes ready", sequence.IsReady(sequenceName))
f.Setup("Sequence has HTTPS address", sequence.ValidateAddress(sequenceName, addressable.AssertHTTPSAddress))

event := test.FullEvent()
event.SetData("text/plain", "hello")
f.Requirement("install source", eventshub.Install(source,
eventshub.StartSenderToResourceTLS(sequence.GVR(), sequenceName, nil),
eventshub.InputEvent(event)))

expectedMsg := string(event.Data())
expectedMsg += msgAppender1
expectedMsg += msgAppender2
expectedMsg += msgAppender3

f.Stable("Sequence with steps and reply via HTTPS").
Must("Delivers events correctly to reply",
assert.OnStore(sink).MatchEvent(
test.HasData([]byte(expectedMsg)),
).AtLeast(1))

return f
}
76 changes: 76 additions & 0 deletions test/rekt/resources/sequence/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sequence
import (
"context"
"embed"
"strings"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -112,6 +113,44 @@ func WithStep(ref *duckv1.KReference, uri string) manifest.CfgFn {
}
}

func WithStepFromDestination(dest *duckv1.Destination) manifest.CfgFn {
return func(cfg map[string]interface{}) {
if _, set := cfg["steps"]; !set {
cfg["steps"] = []map[string]interface{}{}
}

step := map[string]interface{}{}

uri := dest.URI
ref := dest.Ref

if dest.CACerts != nil {
// This is a multi-line string and should be indented accordingly.
// Replace "new line" with "new line + spaces".
step["CACerts"] = strings.ReplaceAll(*dest.CACerts, "\n", "\n ")
}

if uri != nil {
step["uri"] = uri.String()
}
if ref != nil {
if _, set := step["ref"]; !set {
step["ref"] = map[string]interface{}{}
}
sref := step["ref"].(map[string]interface{})
sref["apiVersion"] = ref.APIVersion
sref["kind"] = ref.Kind
sref["namespace"] = ref.Namespace
sref["name"] = ref.Name
}

steps := cfg["steps"].([]map[string]interface{})
steps = append(steps, step)

cfg["steps"] = steps
}
}

// WithReply adds the top level reply config to a Parallel spec.
func WithReply(ref *duckv1.KReference, uri string) manifest.CfgFn {
return func(cfg map[string]interface{}) {
Expand All @@ -136,6 +175,38 @@ func WithReply(ref *duckv1.KReference, uri string) manifest.CfgFn {
}
}

func WithReplyFromDestination(dest *duckv1.Destination) manifest.CfgFn {
return func(cfg map[string]interface{}) {
if _, set := cfg["reply"]; !set {
cfg["reply"] = map[string]interface{}{}
}
reply := cfg["reply"].(map[string]interface{})

uri := dest.URI
ref := dest.Ref

if dest.CACerts != nil {
// This is a multi-line string and should be indented accordingly.
// Replace "new line" with "new line + spaces".
reply["CACerts"] = strings.ReplaceAll(*dest.CACerts, "\n", "\n ")
}

if uri != nil {
reply["uri"] = uri.String()
}
if ref != nil {
if _, set := reply["ref"]; !set {
reply["ref"] = map[string]interface{}{}
}
rref := reply["ref"].(map[string]interface{})
rref["apiVersion"] = ref.APIVersion
rref["kind"] = ref.Kind
rref["namespace"] = ref.Namespace
rref["name"] = ref.Name
}
}
}

// WithChannelTemplate adds the top level channel references.
func WithChannelTemplate(template channel_template.ChannelTemplate) manifest.CfgFn {
return func(cfg map[string]interface{}) {
Expand All @@ -150,3 +221,8 @@ func WithChannelTemplate(template channel_template.ChannelTemplate) manifest.Cfg
channelTemplate["spec"] = template.Spec
}
}

// ValidateAddress validates the address retured by Address
func ValidateAddress(name string, validate addressable.ValidateAddressFn, timings ...time.Duration) feature.StepFn {
return addressable.ValidateAddress(GVR(), name, validate, timings...)
}
8 changes: 8 additions & 0 deletions test/rekt/resources/sequence/sequence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ spec:
{{ if .uri }}
uri: {{ .uri }}
{{ end }}
{{ if .CACerts }}
CACerts: |-
{{ .CACerts }}
{{ end }}
{{ end }}
{{ if .reply }}
reply:
Expand All @@ -64,4 +68,8 @@ spec:
{{ if .reply.uri }}
uri: {{ .reply.uri }}
{{ end }}
{{ if .reply.CACerts }}
CACerts: |-
{{ .reply.CACerts }}
{{ end }}
{{ end }}
16 changes: 16 additions & 0 deletions test/rekt/sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"knative.dev/eventing/test/rekt/resources/channel_template"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"
)
Expand All @@ -44,3 +45,18 @@ func TestSequence(t *testing.T) {

env.Test(ctx, t, sequence.SequenceTest(channel_template.ImmemoryChannelTemplate()))
}

func TestSequenceTLS(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
eventshub.WithTLS(t),
)

env.Test(ctx, t, sequence.SequenceTestTLS(channel_template.ImmemoryChannelTemplate()))
}

0 comments on commit 35d5618

Please sign in to comment.