Skip to content

Commit

Permalink
[release-1.12] Support for seconds field in PingSource schedule (#7529)
Browse files Browse the repository at this point in the history
* feat: support for seconds field in PingSource schedule

* chore: additional test added

* chore: e2e test for configuring pingsource with seconds field in schedule

---------

Co-authored-by: SiBell <[email protected]>
  • Loading branch information
knative-prow-robot and SiBell authored Jan 3, 2024
1 parent 65bd6b9 commit cb5dfda
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 7 deletions.
7 changes: 6 additions & 1 deletion pkg/adapter/mtping/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ func NewEnvConfig() adapter.EnvConfigAccessor {

func NewAdapter(ctx context.Context, env adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
logger := logging.FromContext(ctx)
runner := NewCronJobsRunner(adapter.GetClientConfig(ctx), kubeclient.Get(ctx), logging.FromContext(ctx))

opts := cron.WithParser(cron.NewParser(
cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
))

runner := NewCronJobsRunner(adapter.GetClientConfig(ctx), kubeclient.Get(ctx), logging.FromContext(ctx), opts)

return &mtpingAdapter{
logger: logger,
Expand Down
6 changes: 5 additions & 1 deletion pkg/apis/sources/v1/ping_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ func (cs *PingSourceSpec) Validate(ctx context.Context) *apis.FieldError {
schedule = "CRON_TZ=" + cs.Timezone + " " + schedule
}

if _, err := cron.ParseStandard(schedule); err != nil {
parser := cron.NewParser(
cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)

if _, err := parser.Parse(schedule); err != nil {
if strings.HasPrefix(err.Error(), "provided bad location") {
fe := apis.ErrInvalidValue(err, "timezone")
errs = errs.Also(fe)
Expand Down
26 changes: 23 additions & 3 deletions pkg/apis/sources/v1/ping_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ func TestPingSourceValidation(t *testing.T) {
},
},
want: nil,
},
{
name: "valid spec with schedule including seconds",
source: PingSource{
Spec: PingSourceSpec{
Schedule: "10 0/5 * * * ?",
SourceSpec: duckv1.SourceSpec{
Sink: duckv1.Destination{
Ref: &duckv1.KReference{
APIVersion: "v1",
Kind: "broker",
Name: "default",
},
},
},
},
},
want: nil,
}, {
name: "valid spec with timezone",
source: PingSource{
Expand Down Expand Up @@ -107,7 +125,8 @@ func TestPingSourceValidation(t *testing.T) {
want: func() *apis.FieldError {
return apis.ErrGeneric("expected at least one, got none", "ref", "uri").ViaField("spec.sink")
}(),
}, {
},
{
name: "invalid schedule",
source: PingSource{
Spec: PingSourceSpec{
Expand All @@ -125,11 +144,12 @@ func TestPingSourceValidation(t *testing.T) {
},
want: func() *apis.FieldError {
var errs *apis.FieldError
fe := apis.ErrInvalidValue("expected exactly 5 fields, found 1: [2]", "spec.schedule")
fe := apis.ErrInvalidValue("expected 5 to 6 fields, found 1: [2]", "spec.schedule")
errs = errs.Also(fe)
return errs
}(),
}, {
},
{
name: "valid spec with data",
source: PingSource{
Spec: PingSourceSpec{
Expand Down
6 changes: 5 additions & 1 deletion pkg/apis/sources/v1beta2/ping_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ func (cs *PingSourceSpec) Validate(ctx context.Context) *apis.FieldError {
schedule = "CRON_TZ=" + cs.Timezone + " " + schedule
}

if _, err := cron.ParseStandard(schedule); err != nil {
parser := cron.NewParser(
cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)

if _, err := parser.Parse(schedule); err != nil {
if strings.HasPrefix(err.Error(), "provided bad location") {
fe := apis.ErrInvalidValue(err, "timezone")
errs = errs.Also(fe)
Expand Down
20 changes: 19 additions & 1 deletion pkg/apis/sources/v1beta2/ping_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ func TestPingSourceValidation(t *testing.T) {
},
},
want: nil,
},
{
name: "valid spec with schedule including seconds",
source: PingSource{
Spec: PingSourceSpec{
Schedule: "10 0/5 * * * ?",
SourceSpec: duckv1.SourceSpec{
Sink: duckv1.Destination{
Ref: &duckv1.KReference{
APIVersion: "v1",
Kind: "broker",
Name: "default",
},
},
},
},
},
want: nil,
}, {
name: "valid spec with timezone",
source: PingSource{
Expand Down Expand Up @@ -125,7 +143,7 @@ func TestPingSourceValidation(t *testing.T) {
},
want: func() *apis.FieldError {
var errs *apis.FieldError
fe := apis.ErrInvalidValue("expected exactly 5 fields, found 1: [2]", "spec.schedule")
fe := apis.ErrInvalidValue("expected 5 to 6 fields, found 1: [2]", "spec.schedule")
errs = errs.Also(fe)
return errs
}(),
Expand Down
20 changes: 20 additions & 0 deletions test/rekt/features/pingsource/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,26 @@ func SendsEventsWithCloudEventData() *feature.Feature {
return f
}

func SendsEventsWithSecondsInSchedule() *feature.Feature {
source := feature.MakeRandomK8sName("pingsource")
sink := feature.MakeRandomK8sName("sink")
f := feature.NewFeature()

f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver))

f.Requirement("install pingsource", pingsource.Install(source,
pingsource.WithSchedule("10 0/1 * * * ?"),
pingsource.WithSink(service.AsDestinationRef(sink)),
))
f.Requirement("pingsource goes ready", pingsource.IsReady(source))

f.Stable("pingsource as event source").
Must("delivers events",
assert.OnStore(sink).MatchEvent(test.HasType("dev.knative.sources.ping")).AtLeast(1))

return f
}

// SendsEventsWithEventTypes tests pingsource to a ready broker.
func SendsEventsWithEventTypes() *feature.Feature {
source := feature.MakeRandomK8sName("source")
Expand Down
14 changes: 14 additions & 0 deletions test/rekt/pingsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,17 @@ func TestPingSourceWithCloudEventData(t *testing.T) {

env.Test(ctx, t, pingsource.SendsEventsWithCloudEventData())
}

func TestPingSourceWithSecondsInSchedule(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

env.Test(ctx, t, pingsource.SendsEventsWithSecondsInSchedule())
}
45 changes: 45 additions & 0 deletions test/rekt/resources/pingsource/pingsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,48 @@ func Example_fullbase64() {
// CACerts: |-
// xyz
}

func Example_schedule_with_secs() {
ctx := testlog.NewContext()
images := map[string]string{}
cfg := map[string]interface{}{
"name": "foo",
"namespace": "bar",
"schedule": "10 0/5 * * * ?",
"contentType": "application/json",
"data": `{"message": "Hello world!"}`,
"sink": map[string]interface{}{
"ref": map[string]string{
"kind": "sinkkind",
"namespace": "sinknamespace",
"name": "sinkname",
"apiVersion": "sinkversion",
},
"uri": "uri/parts",
},
}

files, err := manifest.ExecuteYAML(ctx, yaml, images, cfg)
if err != nil {
panic(err)
}

manifest.OutputYAML(os.Stdout, files)
// Output:
// apiVersion: sources.knative.dev/v1
// kind: PingSource
// metadata:
// name: foo
// namespace: bar
// spec:
// schedule: '10 0/5 * * * ?'
// contentType: 'application/json'
// data: '{"message": "Hello world!"}'
// sink:
// ref:
// kind: sinkkind
// namespace: sinknamespace
// name: sinkname
// apiVersion: sinkversion
// uri: uri/parts
}

0 comments on commit cb5dfda

Please sign in to comment.