From 7176ce64501bb6cb5f47e33cb258cb48d463a938 Mon Sep 17 00:00:00 2001
From: Matthias Wessendorf
-(Appears on:ChannelableStatus, BrokerStatus, RequestReplyStatus, ParallelStatus, SequenceStatus, JobSinkStatus)
+(Appears on:ChannelableStatus, BrokerStatus, RequestReplyStatus, ParallelStatus, SequenceStatus, IntegrationSinkStatus, JobSinkStatus)
AppliedEventPoliciesStatus contains the list of policies which apply to a resource.
@@ -6144,8 +6144,109 @@ resolved delivery options.AppliedEventPoliciesStatus
+
IntegrationSink is the Schema for the IntegrationSink API.
+ +Field | +Description | +||||
---|---|---|---|---|---|
+apiVersion +string |
+
+
+sinks.knative.dev/v1alpha1
+
+ |
+||||
+kind +string + |
+IntegrationSink |
+||||
+metadata + + +Kubernetes meta/v1.ObjectMeta + + + |
+
+Refer to the Kubernetes API documentation for the fields of the
+metadata field.
+ |
+||||
+spec + + +IntegrationSinkSpec + + + |
+
+ + +
|
+||||
+status + + +IntegrationSinkStatus + + + |
++ | +
@@ -6234,6 +6335,164 @@ JobSinkStatus +
+(Appears on:IntegrationSinkSpec) +
++
+Field | +Description | +
---|---|
+s3 + +knative.dev/eventing/pkg/apis/common/integration/v1alpha1.AWSS3 + + |
++ | +
+sqs + +knative.dev/eventing/pkg/apis/common/integration/v1alpha1.AWSSQS + + |
+
+ S3 source configuration + |
+
+auth + +knative.dev/eventing/pkg/apis/common/integration/v1alpha1.Auth + + |
+
+ SQS source configuration + |
+
+(Appears on:IntegrationSink) +
++
+Field | +Description | +
---|---|
+aws + + +Aws + + + |
++ | +
+log + + +Log + + + |
+
+ AWS source configuration + |
+
+(Appears on:IntegrationSink) +
++
+Field | +Description | +
---|---|
+Status + + +knative.dev/pkg/apis/duck/v1.Status + + + |
+
+
+(Members of |
+
+AddressStatus + + +knative.dev/pkg/apis/duck/v1.AddressStatus + + + |
+
+
+(Members of AddressStatus is the part where the JobSink fulfills the Addressable contract. +It exposes the endpoint as an URI to get events delivered. + |
+
+AppliedEventPoliciesStatus + + +AppliedEventPoliciesStatus + + + |
+
+
+(Members of AppliedEventPoliciesStatus contains the list of EventPolicies which apply to this JobSink + |
+
@@ -6374,6 +6633,165 @@ string +
+(Appears on:IntegrationSinkSpec) +
++
+Field | +Description | +
---|---|
+loggerName + +string + + |
++ | +
+level + +string + + |
+
+ Name of the logging category to use + |
+
+logMask + +bool + + |
+
+ Logging level to use + |
+
+marker + +string + + |
+
+ Mask sensitive information in the log + |
+
+multiline + +bool + + |
+
+ An optional Marker name to use + |
+
+showAllProperties + +bool + + |
+
+ If enabled, outputs each information on a newline + |
+
+showBody + +bool + + |
+
+ Show all of the exchange properties (both internal and custom) + |
+
+showBodyType + +bool + + |
+
+ Show the message body + |
+
+showExchangePattern + +bool + + |
+
+ Show the body Java type + |
+
+showHeaders + +bool + + |
+
+ Show the Message Exchange Pattern (MEP) + |
+
+showProperties + +bool + + |
+
+ Show the headers received + |
+
+showStreams + +bool + + |
+
+ Show the exchange properties (only custom) + |
+
+showCachedStreams + +bool + + |
+
+ Show the stream bodies + |
+
diff --git a/pkg/apis/sinks/register.go b/pkg/apis/sinks/register.go index 676fa75e841..1994d579181 100644 --- a/pkg/apis/sinks/register.go +++ b/pkg/apis/sinks/register.go @@ -33,6 +33,12 @@ var ( Group: GroupName, Resource: "jobsinks", } + + // IntegrationSinkResource respresents a Knative Eventing sink IntegrationSink + IntegrationSinkResource = schema.GroupResource{ + Group: GroupName, + Resource: "integrationsinks", + } ) type Config struct { diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_conversion.go b/pkg/apis/sinks/v1alpha1/integration_sink_conversion.go new file mode 100644 index 00000000000..8f41d2da41c --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_conversion.go @@ -0,0 +1,36 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "fmt" + + "knative.dev/pkg/apis" +) + +// ConvertTo implements apis.Convertible +// Converts source from v1alpha1.IntegrationSink into a higher version. +func (sink *IntegrationSink) ConvertTo(ctx context.Context, obj apis.Convertible) error { + return fmt.Errorf("v1alpha1 is the highest known version, got: %T", sink) +} + +// ConvertFrom implements apis.Convertible +// Converts source from a higher version into v1beta2.IntegrationSink +func (sink *IntegrationSink) ConvertFrom(ctx context.Context, obj apis.Convertible) error { + return fmt.Errorf("v1alpha1 is the highest known version, got: %T", sink) +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_conversion_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_conversion_test.go new file mode 100644 index 00000000000..015e6bee3c3 --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_conversion_test.go @@ -0,0 +1,34 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "testing" +) + +func TestIntegrationSinkConversionBadType(t *testing.T) { + good, bad := &IntegrationSink{}, &testObject{} + + if err := good.ConvertTo(context.Background(), bad); err == nil { + t.Errorf("ConvertTo() = %#v, wanted error", bad) + } + + if err := good.ConvertFrom(context.Background(), bad); err == nil { + t.Errorf("ConvertFrom() = %#v, wanted error", good) + } +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_defaults.go b/pkg/apis/sinks/v1alpha1/integration_sink_defaults.go new file mode 100644 index 00000000000..f77df267e8d --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_defaults.go @@ -0,0 +1,26 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import "context" + +func (sink *IntegrationSink) SetDefaults(ctx context.Context) { + sink.Spec.SetDefaults(ctx) +} + +func (sink *IntegrationSinkSpec) SetDefaults(ctx context.Context) { +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_defaults_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_defaults_test.go new file mode 100644 index 00000000000..fd605809c77 --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_defaults_test.go @@ -0,0 +1,39 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestIntegrationSinkSetDefaults(t *testing.T) { + testCases := map[string]struct { + initial IntegrationSink + expected IntegrationSink + }{} + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + tc.initial.SetDefaults(context.TODO()) + if diff := cmp.Diff(tc.expected, tc.initial); diff != "" { + t.Fatal("Unexpected defaults (-want, +got):", diff) + } + }) + } +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go new file mode 100644 index 00000000000..1ad33e2cae9 --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go @@ -0,0 +1,123 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +const ( + // IntegrationSinkConditionReady has status True when the IntegrationSink is ready to send events. + IntegrationSinkConditionReady = apis.ConditionReady + + IntegrationSinkConditionAddressable apis.ConditionType = "Addressable" + + // IntegrationSinkConditionDeploymentReady has status True when the IntegrationSink has been configured with a deployment. + IntegrationSinkConditionDeploymentReady apis.ConditionType = "DeploymentReady" + + // IntegrationSinkConditionEventPoliciesReady has status True when all the applying EventPolicies for this + // IntegrationSink are ready. + IntegrationSinkConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady" +) + +var IntegrationSinkCondSet = apis.NewLivingConditionSet( + IntegrationSinkConditionAddressable, + IntegrationSinkConditionDeploymentReady, + IntegrationSinkConditionEventPoliciesReady, +) + +// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. +func (*IntegrationSink) GetConditionSet() apis.ConditionSet { + return IntegrationSinkCondSet +} + +// GetCondition returns the condition currently associated with the given type, or nil. +func (s *IntegrationSinkStatus) GetCondition(t apis.ConditionType) *apis.Condition { + return IntegrationSinkCondSet.Manage(s).GetCondition(t) +} + +// GetTopLevelCondition returns the top level Condition. +func (ps *IntegrationSinkStatus) GetTopLevelCondition() *apis.Condition { + return IntegrationSinkCondSet.Manage(ps).GetTopLevelCondition() +} + +// IsReady returns true if the resource is ready overall. +func (s *IntegrationSinkStatus) IsReady() bool { + return IntegrationSinkCondSet.Manage(s).IsHappy() +} + +// InitializeConditions sets relevant unset conditions to Unknown state. +func (s *IntegrationSinkStatus) InitializeConditions() { + IntegrationSinkCondSet.Manage(s).InitializeConditions() +} + +// MarkAddressableReady marks the Addressable condition to True. +func (s *IntegrationSinkStatus) MarkAddressableReady() { + IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkConditionAddressable) +} + +// MarkEventPoliciesFailed marks the EventPoliciesReady condition to False with the given reason and message. +func (s *IntegrationSinkStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) { + IntegrationSinkCondSet.Manage(s).MarkFalse(IntegrationSinkConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +// MarkEventPoliciesUnknown marks the EventPoliciesReady condition to Unknown with the given reason and message. +func (s *IntegrationSinkStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) { + IntegrationSinkCondSet.Manage(s).MarkUnknown(IntegrationSinkConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +// MarkEventPoliciesTrue marks the EventPoliciesReady condition to True. +func (s *IntegrationSinkStatus) MarkEventPoliciesTrue() { + IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkConditionEventPoliciesReady) +} + +// MarkEventPoliciesTrueWithReason marks the EventPoliciesReady condition to True with the given reason and message. +func (s *IntegrationSinkStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) { + IntegrationSinkCondSet.Manage(s).MarkTrueWithReason(IntegrationSinkConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +func (s *IntegrationSinkStatus) PropagateDeploymentStatus(d *appsv1.DeploymentStatus) { + deploymentAvailableFound := false + for _, cond := range d.Conditions { + if cond.Type == appsv1.DeploymentAvailable { + deploymentAvailableFound = true + if cond.Status == corev1.ConditionTrue { + IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkConditionDeploymentReady) + } else if cond.Status == corev1.ConditionFalse { + IntegrationSinkCondSet.Manage(s).MarkFalse(IntegrationSinkConditionDeploymentReady, cond.Reason, cond.Message) + } else if cond.Status == corev1.ConditionUnknown { + IntegrationSinkCondSet.Manage(s).MarkUnknown(IntegrationSinkConditionDeploymentReady, cond.Reason, cond.Message) + } + } + } + if !deploymentAvailableFound { + IntegrationSinkCondSet.Manage(s).MarkUnknown(IntegrationSinkConditionDeploymentReady, "DeploymentUnavailable", "The Deployment '%s' is unavailable.", d) + } +} + +func (s *IntegrationSinkStatus) SetAddress(address *duckv1.Addressable) { + s.Address = address + if address == nil || address.URL.IsEmpty() { + IntegrationSinkCondSet.Manage(s).MarkFalse(IntegrationSinkConditionAddressable, "EmptyHostname", "hostname is the empty string") + } else { + IntegrationSinkCondSet.Manage(s).MarkTrue(IntegrationSinkConditionAddressable) + + } +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go new file mode 100644 index 00000000000..627b3b13c6c --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go @@ -0,0 +1,125 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +func TestIntegrationSinkGetConditionSet(t *testing.T) { + r := &IntegrationSink{} + + if got, want := r.GetConditionSet().GetTopLevelConditionType(), apis.ConditionReady; got != want { + t.Errorf("GetTopLevelCondition=%v, want=%v", got, want) + } +} + +func TestIntegrationSinkInitializeConditions(t *testing.T) { + tests := []struct { + name string + js *IntegrationSinkStatus + want *IntegrationSinkStatus + }{{ + name: "empty", + js: &IntegrationSinkStatus{}, + want: &IntegrationSinkStatus{ + Status: duckv1.Status{ + Conditions: []apis.Condition{{ + Type: IntegrationSinkConditionAddressable, + Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkConditionDeploymentReady, + Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkConditionReady, + Status: corev1.ConditionUnknown, + }}, + }, + }, + }, { + name: "one false", + js: &IntegrationSinkStatus{ + Status: duckv1.Status{ + Conditions: []apis.Condition{{ + Type: IntegrationSinkConditionAddressable, + Status: corev1.ConditionFalse, + }}, + }, + }, + want: &IntegrationSinkStatus{ + Status: duckv1.Status{ + Conditions: []apis.Condition{{ + Type: IntegrationSinkConditionAddressable, + Status: corev1.ConditionFalse, + }, { + Type: IntegrationSinkConditionDeploymentReady, + Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkConditionReady, + Status: corev1.ConditionUnknown, + }}, + }, + }, + }, { + name: "one true", + js: &IntegrationSinkStatus{ + Status: duckv1.Status{ + Conditions: []apis.Condition{{ + Type: IntegrationSinkConditionAddressable, + Status: corev1.ConditionTrue, + }}, + }, + }, + want: &IntegrationSinkStatus{ + Status: duckv1.Status{ + Conditions: []apis.Condition{{ + Type: IntegrationSinkConditionAddressable, + Status: corev1.ConditionTrue, + }, { + Type: IntegrationSinkConditionDeploymentReady, + Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, + }, { + Type: IntegrationSinkConditionReady, + Status: corev1.ConditionUnknown, + }}, + }, + }, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + test.js.InitializeConditions() + if diff := cmp.Diff(test.want, test.js, ignoreAllButTypeAndStatus); diff != "" { + t.Error("unexpected conditions (-want, +got) =", diff) + } + }) + } +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_types.go b/pkg/apis/sinks/v1alpha1/integration_sink_types.go new file mode 100644 index 00000000000..5e2dbb46fd0 --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_types.go @@ -0,0 +1,117 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/eventing/pkg/apis/common/integration/v1alpha1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/kmeta" +) + +// +genclient +// +genreconciler +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +k8s:defaulter-gen=true + +// IntegrationSink is the Schema for the IntegrationSink API. +type IntegrationSink struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec IntegrationSinkSpec `json:"spec,omitempty"` + Status IntegrationSinkStatus `json:"status,omitempty"` +} + +// Check the interfaces that JobSink should be implementing. +var ( + _ runtime.Object = (*IntegrationSink)(nil) + _ kmeta.OwnerRefable = (*IntegrationSink)(nil) + _ apis.Validatable = (*IntegrationSink)(nil) + _ apis.Defaultable = (*IntegrationSink)(nil) + _ apis.HasSpec = (*IntegrationSink)(nil) + _ duckv1.KRShaped = (*IntegrationSink)(nil) + _ apis.Convertible = (*JobSink)(nil) +) + +type IntegrationSinkSpec struct { + Aws *Aws `json:"aws,omitempty"` // AWS source configuration + Log *Log `json:"log,omitempty"` // Log sink configuration +} + +type Log struct { + LoggerName string `json:"loggerName,omitempty" default:"log-sink"` // Name of the logging category to use + Level string `json:"level,omitempty" default:"INFO"` // Logging level to use + LogMask bool `json:"logMask,omitempty" default:"false"` // Mask sensitive information in the log + Marker string `json:"marker,omitempty"` // An optional Marker name to use + Multiline bool `json:"multiline,omitempty" default:"false"` // If enabled, outputs each information on a newline + ShowAllProperties bool `json:"showAllProperties,omitempty" default:"false"` // Show all of the exchange properties (both internal and custom) + ShowBody bool `json:"showBody,omitempty" default:"true"` // Show the message body + ShowBodyType bool `json:"showBodyType,omitempty" default:"true"` // Show the body Java type + ShowExchangePattern bool `json:"showExchangePattern,omitempty" default:"true"` // Show the Message Exchange Pattern (MEP) + ShowHeaders bool `json:"showHeaders,omitempty" default:"false"` // Show the headers received + ShowProperties bool `json:"showProperties,omitempty" default:"false"` // Show the exchange properties (only custom) + ShowStreams bool `json:"showStreams,omitempty" default:"false"` // Show the stream bodies + ShowCachedStreams bool `json:"showCachedStreams,omitempty" default:"true"` // Show cached stream bodies +} + +type Aws struct { + S3 *v1alpha1.AWSS3 `json:"s3,omitempty"` // S3 source configuration + SQS *v1alpha1.AWSSQS `json:"sqs,omitempty"` // SQS source configuration + Auth *v1alpha1.Auth `json:"auth,omitempty"` +} + +type IntegrationSinkStatus struct { + duckv1.Status `json:",inline"` + + // AddressStatus is the part where the JobSink fulfills the Addressable contract. + // It exposes the endpoint as an URI to get events delivered. + // +optional + duckv1.AddressStatus `json:",inline"` + + // AppliedEventPoliciesStatus contains the list of EventPolicies which apply to this JobSink + // +optional + eventingduckv1.AppliedEventPoliciesStatus `json:",inline"` +} + +// GetGroupVersionKind returns the GroupVersionKind. +func (*IntegrationSink) GetGroupVersionKind() schema.GroupVersionKind { + return SchemeGroupVersion.WithKind("IntegrationSink") +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// IntegrationSinkList contains a list of IntegrationSink +type IntegrationSinkList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []IntegrationSink `json:"items"` +} + +// GetUntypedSpec returns the spec of the IntegrationSink. +func (c *IntegrationSink) GetUntypedSpec() interface{} { + return c.Spec +} + +// GetStatus retrieves the status of the IntegrationSink. Implements the KRShaped interface. +func (c *IntegrationSink) GetStatus() *duckv1.Status { + return &c.Status.Status +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go new file mode 100644 index 00000000000..27efddffeca --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go @@ -0,0 +1,111 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "testing" + + "knative.dev/eventing/pkg/apis/common/integration/v1alpha1" +) + +func TestIntegrationSink_GetStatus(t *testing.T) { + r := &IntegrationSink{ + Status: IntegrationSinkStatus{}, + } + if got, want := r.GetStatus(), &r.Status.Status; got != want { + t.Errorf("GetStatus=%v, want=%v", got, want) + } +} + +func TestIntegrationSink_GetGroupVersionKind(t *testing.T) { + src := &IntegrationSink{} + gvk := src.GetGroupVersionKind() + + if gvk.Kind != "IntegrationSink" { + t.Errorf("Should be IntegrationSink.") + } +} + +func TestLog(t *testing.T) { + log := Log{ + Level: "info", + ShowHeaders: true, + } + + if log.Level != "info" { + t.Errorf("Log.Level = %v, want 'info'", log.Level) + } + if log.ShowHeaders != true { + t.Errorf("Log.ShowHeaders = %v, want 'false'", log.ShowHeaders) + } +} + +func TestAWS(t *testing.T) { + s3 := v1alpha1.AWSS3{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "eu-north-1", + }, + Arn: "example-bucket", + } + + if s3.Region != "eu-north-1" { + t.Errorf("AWSS3.Region = %v, want 'eu-north-1'", s3.Region) + } + + sqs := v1alpha1.AWSSQS{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "eu-north-1", + }, + Arn: "example-queue", + } + + if sqs.Region != "eu-north-1" { + t.Errorf("AWSSQS.Region = %v, want 'eu-north-1'", sqs.Region) + } + + ddbStreams := v1alpha1.AWSDDBStreams{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "eu-north-1", + }, + Table: "example-table", + } + + if ddbStreams.Region != "eu-north-1" { + t.Errorf("AWSDDBStreams.Region = %v, want 'eu-north-1'", ddbStreams.Region) + } +} + +// TestAuthFieldAccess tests the HasAuth method and field access in Auth struct +func TestAuthFieldAccess(t *testing.T) { + auth := v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ + Name: "aws-secret", + }, + }, + AccessKey: "access-key", + SecretKey: "secret-key", + } + + if !auth.HasAuth() { + t.Error("Auth.HasAuth() = false, want true") + } + + if auth.Secret.Ref.Name != "aws-secret" { + t.Errorf("Auth.Secret.Ref.Name = %v, want 'aws-secret'", auth.Secret.Ref.Name) + } +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_validation.go b/pkg/apis/sinks/v1alpha1/integration_sink_validation.go new file mode 100644 index 00000000000..c96b83d7dbe --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_validation.go @@ -0,0 +1,85 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + + "knative.dev/pkg/apis" +) + +func (sink *IntegrationSink) Validate(ctx context.Context) *apis.FieldError { + ctx = apis.WithinParent(ctx, sink.ObjectMeta) + return sink.Spec.Validate(ctx).ViaField("spec") +} + +func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError { + var errs *apis.FieldError + + // Count how many fields are set to ensure mutual exclusivity + sinkSetCount := 0 + if spec.Log != nil { + sinkSetCount++ + } + if spec.Aws != nil { + if spec.Aws.S3 != nil { + sinkSetCount++ + } + if spec.Aws.SQS != nil { + sinkSetCount++ + } + } + + // Validate that only one sink field is set + if sinkSetCount > 1 { + errs = errs.Also(apis.ErrGeneric("only one sink type can be set", "spec")) + } else if sinkSetCount == 0 { + errs = errs.Also(apis.ErrGeneric("at least one sink type must be specified", "spec")) + } + + // Only perform AWS-specific validation if exactly one AWS sink is configured + if sinkSetCount == 1 && spec.Aws != nil { + if spec.Aws.S3 != nil || spec.Aws.SQS != nil { + // Check that AWS Auth is properly configured + if !spec.Aws.Auth.HasAuth() { + errs = errs.Also(apis.ErrMissingField("aws.auth.secret.ref.name")) + } + } + + // Additional validation for AWS S3 required fields + if spec.Aws.S3 != nil { + if spec.Aws.S3.Arn == "" { + errs = errs.Also(apis.ErrMissingField("aws.s3.bucketNameOrArn")) + } + if spec.Aws.S3.Region == "" { + errs = errs.Also(apis.ErrMissingField("aws.s3.region")) + } + } + + // Additional validation for AWS SQS required fields + if spec.Aws.SQS != nil { + if spec.Aws.SQS.Arn == "" { + errs = errs.Also(apis.ErrMissingField("aws.sqs.queueNameOrArn")) + } + if spec.Aws.SQS.Region == "" { + errs = errs.Also(apis.ErrMissingField("aws.sqs.region")) + } + } + } + + return errs +} diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go new file mode 100644 index 00000000000..cba73a420b7 --- /dev/null +++ b/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go @@ -0,0 +1,199 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "testing" + + "knative.dev/eventing/pkg/apis/common/integration/v1alpha1" + + "github.com/google/go-cmp/cmp" + "knative.dev/pkg/apis" +) + +func TestIntegrationSinkSpecValidation(t *testing.T) { + tests := []struct { + name string + spec IntegrationSinkSpec + want *apis.FieldError + }{ + { + name: "valid log sink", + spec: IntegrationSinkSpec{ + Log: &Log{ + Level: "info", + ShowHeaders: true, + }, + }, + want: nil, + }, + { + name: "valid AWS S3 sink with auth and region", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + S3: &v1alpha1.AWSS3{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "example-bucket", + }, + Auth: &v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ + Name: "aws-secret", + }, + }, + }, + }, + }, + want: nil, + }, + { + name: "valid AWS SQS sink with auth and region", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + SQS: &v1alpha1.AWSSQS{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "example-queue", + }, + Auth: &v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ + Name: "aws-secret", + }, + }, + }, + }, + }, + want: nil, + }, + { + name: "multiple sinks set (invalid)", + spec: IntegrationSinkSpec{ + Log: &Log{ + Level: "info", + ShowHeaders: true, + }, + Aws: &Aws{ + S3: &v1alpha1.AWSS3{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "example-bucket", + }, + }, + }, + want: apis.ErrGeneric("only one sink type can be set", "spec"), + }, + { + name: "multiple AWS sinks set (invalid)", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + S3: &v1alpha1.AWSS3{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "example-bucket", + }, + SQS: &v1alpha1.AWSSQS{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "example-queue", + }, + Auth: &v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ + Name: "aws-secret", + }, + }, + }, + }, + }, + want: apis.ErrGeneric("only one sink type can be set", "spec"), + }, + { + name: "AWS SQS sink without QueueNameOrArn (invalid)", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + SQS: &v1alpha1.AWSSQS{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + }, + Auth: &v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ + Name: "aws-secret", + }, + }, + }, + }, + }, + want: apis.ErrMissingField("aws.sqs.queueNameOrArn"), + }, + { + name: "no sink type specified (invalid)", + spec: IntegrationSinkSpec{}, + want: apis.ErrGeneric("at least one sink type must be specified", "spec"), + }, + { + name: "AWS sink without auth (invalid)", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + S3: &v1alpha1.AWSS3{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "example-bucket", + }, + }, + }, + want: apis.ErrMissingField("aws.auth.secret.ref.name"), + }, + { + name: "AWS S3 sink without region (invalid)", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + S3: &v1alpha1.AWSS3{ + Arn: "example-bucket", + }, + Auth: &v1alpha1.Auth{ + Secret: &v1alpha1.Secret{ + Ref: &v1alpha1.SecretReference{ + Name: "aws-secret", + }, + }, + }, + }, + }, + want: apis.ErrMissingField("aws.s3.region"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.spec.Validate(context.TODO()) + if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { + t.Errorf("IntegrationSinkSpec.Validate (-want, +got) = %v", diff) + } + }) + } +} diff --git a/pkg/apis/sinks/v1alpha1/register.go b/pkg/apis/sinks/v1alpha1/register.go index 827ebc28b42..89e4f3fbec8 100644 --- a/pkg/apis/sinks/v1alpha1/register.go +++ b/pkg/apis/sinks/v1alpha1/register.go @@ -47,6 +47,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { scheme.AddKnownTypes(SchemeGroupVersion, &JobSink{}, &JobSinkList{}, + &IntegrationSink{}, + &IntegrationSinkList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/sinks/v1alpha1/register_test.go b/pkg/apis/sinks/v1alpha1/register_test.go index aebdc4a2c15..d9eee0e2e80 100644 --- a/pkg/apis/sinks/v1alpha1/register_test.go +++ b/pkg/apis/sinks/v1alpha1/register_test.go @@ -62,6 +62,8 @@ func TestKnownTypes(t *testing.T) { for _, name := range []string{ "JobSink", "JobSinkList", + "IntegrationSink", + "IntegrationSinkList", } { if _, ok := types[name]; !ok { t.Errorf("Did not find %q as registered type", name) diff --git a/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go index 58c9fdfaf8e..3dff23adf1d 100644 --- a/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go @@ -24,8 +24,146 @@ package v1alpha1 import ( v1 "k8s.io/api/batch/v1" runtime "k8s.io/apimachinery/pkg/runtime" + integrationv1alpha1 "knative.dev/eventing/pkg/apis/common/integration/v1alpha1" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Aws) DeepCopyInto(out *Aws) { + *out = *in + if in.S3 != nil { + in, out := &in.S3, &out.S3 + *out = new(integrationv1alpha1.AWSS3) + **out = **in + } + if in.SQS != nil { + in, out := &in.SQS, &out.SQS + *out = new(integrationv1alpha1.AWSSQS) + **out = **in + } + if in.Auth != nil { + in, out := &in.Auth, &out.Auth + *out = new(integrationv1alpha1.Auth) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Aws. +func (in *Aws) DeepCopy() *Aws { + if in == nil { + return nil + } + out := new(Aws) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IntegrationSink) DeepCopyInto(out *IntegrationSink) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IntegrationSink. +func (in *IntegrationSink) DeepCopy() *IntegrationSink { + if in == nil { + return nil + } + out := new(IntegrationSink) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *IntegrationSink) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IntegrationSinkList) DeepCopyInto(out *IntegrationSinkList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]IntegrationSink, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IntegrationSinkList. +func (in *IntegrationSinkList) DeepCopy() *IntegrationSinkList { + if in == nil { + return nil + } + out := new(IntegrationSinkList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *IntegrationSinkList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IntegrationSinkSpec) DeepCopyInto(out *IntegrationSinkSpec) { + *out = *in + if in.Aws != nil { + in, out := &in.Aws, &out.Aws + *out = new(Aws) + (*in).DeepCopyInto(*out) + } + if in.Log != nil { + in, out := &in.Log, &out.Log + *out = new(Log) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IntegrationSinkSpec. +func (in *IntegrationSinkSpec) DeepCopy() *IntegrationSinkSpec { + if in == nil { + return nil + } + out := new(IntegrationSinkSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IntegrationSinkStatus) DeepCopyInto(out *IntegrationSinkStatus) { + *out = *in + in.Status.DeepCopyInto(&out.Status) + in.AddressStatus.DeepCopyInto(&out.AddressStatus) + in.AppliedEventPoliciesStatus.DeepCopyInto(&out.AppliedEventPoliciesStatus) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IntegrationSinkStatus. +func (in *IntegrationSinkStatus) DeepCopy() *IntegrationSinkStatus { + if in == nil { + return nil + } + out := new(IntegrationSinkStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *JobSink) DeepCopyInto(out *JobSink) { *out = *in @@ -143,3 +281,19 @@ func (in *JobStatus) DeepCopy() *JobStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Log) DeepCopyInto(out *Log) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Log. +func (in *Log) DeepCopy() *Log { + if in == nil { + return nil + } + out := new(Log) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/client/clientset/versioned/typed/sinks/v1alpha1/fake/fake_integrationsink.go b/pkg/client/clientset/versioned/typed/sinks/v1alpha1/fake/fake_integrationsink.go new file mode 100644 index 00000000000..690b2a948b9 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/sinks/v1alpha1/fake/fake_integrationsink.go @@ -0,0 +1,141 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" + v1alpha1 "knative.dev/eventing/pkg/apis/sinks/v1alpha1" +) + +// FakeIntegrationSinks implements IntegrationSinkInterface +type FakeIntegrationSinks struct { + Fake *FakeSinksV1alpha1 + ns string +} + +var integrationsinksResource = v1alpha1.SchemeGroupVersion.WithResource("integrationsinks") + +var integrationsinksKind = v1alpha1.SchemeGroupVersion.WithKind("IntegrationSink") + +// Get takes name of the integrationSink, and returns the corresponding integrationSink object, and an error if there is any. +func (c *FakeIntegrationSinks) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.IntegrationSink, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(integrationsinksResource, c.ns, name), &v1alpha1.IntegrationSink{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.IntegrationSink), err +} + +// List takes label and field selectors, and returns the list of IntegrationSinks that match those selectors. +func (c *FakeIntegrationSinks) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.IntegrationSinkList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(integrationsinksResource, integrationsinksKind, c.ns, opts), &v1alpha1.IntegrationSinkList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.IntegrationSinkList{ListMeta: obj.(*v1alpha1.IntegrationSinkList).ListMeta} + for _, item := range obj.(*v1alpha1.IntegrationSinkList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested integrationSinks. +func (c *FakeIntegrationSinks) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(integrationsinksResource, c.ns, opts)) + +} + +// Create takes the representation of a integrationSink and creates it. Returns the server's representation of the integrationSink, and an error, if there is any. +func (c *FakeIntegrationSinks) Create(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.CreateOptions) (result *v1alpha1.IntegrationSink, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(integrationsinksResource, c.ns, integrationSink), &v1alpha1.IntegrationSink{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.IntegrationSink), err +} + +// Update takes the representation of a integrationSink and updates it. Returns the server's representation of the integrationSink, and an error, if there is any. +func (c *FakeIntegrationSinks) Update(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.UpdateOptions) (result *v1alpha1.IntegrationSink, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(integrationsinksResource, c.ns, integrationSink), &v1alpha1.IntegrationSink{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.IntegrationSink), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeIntegrationSinks) UpdateStatus(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.UpdateOptions) (*v1alpha1.IntegrationSink, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(integrationsinksResource, "status", c.ns, integrationSink), &v1alpha1.IntegrationSink{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.IntegrationSink), err +} + +// Delete takes name of the integrationSink and deletes it. Returns an error if one occurs. +func (c *FakeIntegrationSinks) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteActionWithOptions(integrationsinksResource, c.ns, name, opts), &v1alpha1.IntegrationSink{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeIntegrationSinks) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(integrationsinksResource, c.ns, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.IntegrationSinkList{}) + return err +} + +// Patch applies the patch and returns the patched integrationSink. +func (c *FakeIntegrationSinks) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.IntegrationSink, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(integrationsinksResource, c.ns, name, pt, data, subresources...), &v1alpha1.IntegrationSink{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.IntegrationSink), err +} diff --git a/pkg/client/clientset/versioned/typed/sinks/v1alpha1/fake/fake_sinks_client.go b/pkg/client/clientset/versioned/typed/sinks/v1alpha1/fake/fake_sinks_client.go index f64a39a5c92..c3cbdcfe5f1 100644 --- a/pkg/client/clientset/versioned/typed/sinks/v1alpha1/fake/fake_sinks_client.go +++ b/pkg/client/clientset/versioned/typed/sinks/v1alpha1/fake/fake_sinks_client.go @@ -28,6 +28,10 @@ type FakeSinksV1alpha1 struct { *testing.Fake } +func (c *FakeSinksV1alpha1) IntegrationSinks(namespace string) v1alpha1.IntegrationSinkInterface { + return &FakeIntegrationSinks{c, namespace} +} + func (c *FakeSinksV1alpha1) JobSinks(namespace string) v1alpha1.JobSinkInterface { return &FakeJobSinks{c, namespace} } diff --git a/pkg/client/clientset/versioned/typed/sinks/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/sinks/v1alpha1/generated_expansion.go index 029b7bee811..b444c446060 100644 --- a/pkg/client/clientset/versioned/typed/sinks/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/sinks/v1alpha1/generated_expansion.go @@ -18,4 +18,6 @@ limitations under the License. package v1alpha1 +type IntegrationSinkExpansion interface{} + type JobSinkExpansion interface{} diff --git a/pkg/client/clientset/versioned/typed/sinks/v1alpha1/integrationsink.go b/pkg/client/clientset/versioned/typed/sinks/v1alpha1/integrationsink.go new file mode 100644 index 00000000000..3e75e3aedff --- /dev/null +++ b/pkg/client/clientset/versioned/typed/sinks/v1alpha1/integrationsink.go @@ -0,0 +1,195 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + "time" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" + v1alpha1 "knative.dev/eventing/pkg/apis/sinks/v1alpha1" + scheme "knative.dev/eventing/pkg/client/clientset/versioned/scheme" +) + +// IntegrationSinksGetter has a method to return a IntegrationSinkInterface. +// A group's client should implement this interface. +type IntegrationSinksGetter interface { + IntegrationSinks(namespace string) IntegrationSinkInterface +} + +// IntegrationSinkInterface has methods to work with IntegrationSink resources. +type IntegrationSinkInterface interface { + Create(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.CreateOptions) (*v1alpha1.IntegrationSink, error) + Update(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.UpdateOptions) (*v1alpha1.IntegrationSink, error) + UpdateStatus(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.UpdateOptions) (*v1alpha1.IntegrationSink, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.IntegrationSink, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.IntegrationSinkList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.IntegrationSink, err error) + IntegrationSinkExpansion +} + +// integrationSinks implements IntegrationSinkInterface +type integrationSinks struct { + client rest.Interface + ns string +} + +// newIntegrationSinks returns a IntegrationSinks +func newIntegrationSinks(c *SinksV1alpha1Client, namespace string) *integrationSinks { + return &integrationSinks{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the integrationSink, and returns the corresponding integrationSink object, and an error if there is any. +func (c *integrationSinks) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.IntegrationSink, err error) { + result = &v1alpha1.IntegrationSink{} + err = c.client.Get(). + Namespace(c.ns). + Resource("integrationsinks"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of IntegrationSinks that match those selectors. +func (c *integrationSinks) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.IntegrationSinkList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.IntegrationSinkList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("integrationsinks"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested integrationSinks. +func (c *integrationSinks) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("integrationsinks"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a integrationSink and creates it. Returns the server's representation of the integrationSink, and an error, if there is any. +func (c *integrationSinks) Create(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.CreateOptions) (result *v1alpha1.IntegrationSink, err error) { + result = &v1alpha1.IntegrationSink{} + err = c.client.Post(). + Namespace(c.ns). + Resource("integrationsinks"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(integrationSink). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a integrationSink and updates it. Returns the server's representation of the integrationSink, and an error, if there is any. +func (c *integrationSinks) Update(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.UpdateOptions) (result *v1alpha1.IntegrationSink, err error) { + result = &v1alpha1.IntegrationSink{} + err = c.client.Put(). + Namespace(c.ns). + Resource("integrationsinks"). + Name(integrationSink.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(integrationSink). + Do(ctx). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *integrationSinks) UpdateStatus(ctx context.Context, integrationSink *v1alpha1.IntegrationSink, opts v1.UpdateOptions) (result *v1alpha1.IntegrationSink, err error) { + result = &v1alpha1.IntegrationSink{} + err = c.client.Put(). + Namespace(c.ns). + Resource("integrationsinks"). + Name(integrationSink.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(integrationSink). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the integrationSink and deletes it. Returns an error if one occurs. +func (c *integrationSinks) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("integrationsinks"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *integrationSinks) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("integrationsinks"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched integrationSink. +func (c *integrationSinks) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.IntegrationSink, err error) { + result = &v1alpha1.IntegrationSink{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("integrationsinks"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/pkg/client/clientset/versioned/typed/sinks/v1alpha1/sinks_client.go b/pkg/client/clientset/versioned/typed/sinks/v1alpha1/sinks_client.go index 2012a8f57a6..9d394dfb0b5 100644 --- a/pkg/client/clientset/versioned/typed/sinks/v1alpha1/sinks_client.go +++ b/pkg/client/clientset/versioned/typed/sinks/v1alpha1/sinks_client.go @@ -28,6 +28,7 @@ import ( type SinksV1alpha1Interface interface { RESTClient() rest.Interface + IntegrationSinksGetter JobSinksGetter } @@ -36,6 +37,10 @@ type SinksV1alpha1Client struct { restClient rest.Interface } +func (c *SinksV1alpha1Client) IntegrationSinks(namespace string) IntegrationSinkInterface { + return newIntegrationSinks(c, namespace) +} + func (c *SinksV1alpha1Client) JobSinks(namespace string) JobSinkInterface { return newJobSinks(c, namespace) } diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index c53cfc57753..6664060b734 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -101,6 +101,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Messaging().V1().Subscriptions().Informer()}, nil // Group=sinks.knative.dev, Version=v1alpha1 + case sinksv1alpha1.SchemeGroupVersion.WithResource("integrationsinks"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Sinks().V1alpha1().IntegrationSinks().Informer()}, nil case sinksv1alpha1.SchemeGroupVersion.WithResource("jobsinks"): return &genericInformer{resource: resource.GroupResource(), informer: f.Sinks().V1alpha1().JobSinks().Informer()}, nil diff --git a/pkg/client/informers/externalversions/sinks/v1alpha1/integrationsink.go b/pkg/client/informers/externalversions/sinks/v1alpha1/integrationsink.go new file mode 100644 index 00000000000..d67f21681ff --- /dev/null +++ b/pkg/client/informers/externalversions/sinks/v1alpha1/integrationsink.go @@ -0,0 +1,90 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" + sinksv1alpha1 "knative.dev/eventing/pkg/apis/sinks/v1alpha1" + versioned "knative.dev/eventing/pkg/client/clientset/versioned" + internalinterfaces "knative.dev/eventing/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "knative.dev/eventing/pkg/client/listers/sinks/v1alpha1" +) + +// IntegrationSinkInformer provides access to a shared informer and lister for +// IntegrationSinks. +type IntegrationSinkInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.IntegrationSinkLister +} + +type integrationSinkInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewIntegrationSinkInformer constructs a new informer for IntegrationSink type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewIntegrationSinkInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredIntegrationSinkInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredIntegrationSinkInformer constructs a new informer for IntegrationSink type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredIntegrationSinkInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.SinksV1alpha1().IntegrationSinks(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.SinksV1alpha1().IntegrationSinks(namespace).Watch(context.TODO(), options) + }, + }, + &sinksv1alpha1.IntegrationSink{}, + resyncPeriod, + indexers, + ) +} + +func (f *integrationSinkInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredIntegrationSinkInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *integrationSinkInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&sinksv1alpha1.IntegrationSink{}, f.defaultInformer) +} + +func (f *integrationSinkInformer) Lister() v1alpha1.IntegrationSinkLister { + return v1alpha1.NewIntegrationSinkLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/informers/externalversions/sinks/v1alpha1/interface.go b/pkg/client/informers/externalversions/sinks/v1alpha1/interface.go index 68ed173ac5f..fec7f62e628 100644 --- a/pkg/client/informers/externalversions/sinks/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/sinks/v1alpha1/interface.go @@ -24,6 +24,8 @@ import ( // Interface provides access to all the informers in this group version. type Interface interface { + // IntegrationSinks returns a IntegrationSinkInformer. + IntegrationSinks() IntegrationSinkInformer // JobSinks returns a JobSinkInformer. JobSinks() JobSinkInformer } @@ -39,6 +41,11 @@ func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakList return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} } +// IntegrationSinks returns a IntegrationSinkInformer. +func (v *version) IntegrationSinks() IntegrationSinkInformer { + return &integrationSinkInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // JobSinks returns a JobSinkInformer. func (v *version) JobSinks() JobSinkInformer { return &jobSinkInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/client/injection/informers/sinks/v1alpha1/integrationsink/fake/fake.go b/pkg/client/injection/informers/sinks/v1alpha1/integrationsink/fake/fake.go new file mode 100644 index 00000000000..6985bd80009 --- /dev/null +++ b/pkg/client/injection/informers/sinks/v1alpha1/integrationsink/fake/fake.go @@ -0,0 +1,40 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package fake + +import ( + context "context" + + fake "knative.dev/eventing/pkg/client/injection/informers/factory/fake" + integrationsink "knative.dev/eventing/pkg/client/injection/informers/sinks/v1alpha1/integrationsink" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" +) + +var Get = integrationsink.Get + +func init() { + injection.Fake.RegisterInformer(withInformer) +} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := fake.Get(ctx) + inf := f.Sinks().V1alpha1().IntegrationSinks() + return context.WithValue(ctx, integrationsink.Key{}, inf), inf.Informer() +} diff --git a/pkg/client/injection/informers/sinks/v1alpha1/integrationsink/filtered/fake/fake.go b/pkg/client/injection/informers/sinks/v1alpha1/integrationsink/filtered/fake/fake.go new file mode 100644 index 00000000000..c299df61a25 --- /dev/null +++ b/pkg/client/injection/informers/sinks/v1alpha1/integrationsink/filtered/fake/fake.go @@ -0,0 +1,52 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package fake + +import ( + context "context" + + factoryfiltered "knative.dev/eventing/pkg/client/injection/informers/factory/filtered" + filtered "knative.dev/eventing/pkg/client/injection/informers/sinks/v1alpha1/integrationsink/filtered" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +var Get = filtered.Get + +func init() { + injection.Fake.RegisterFilteredInformers(withInformer) +} + +func withInformer(ctx context.Context) (context.Context, []controller.Informer) { + untyped := ctx.Value(factoryfiltered.LabelKey{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch labelkey from context.") + } + labelSelectors := untyped.([]string) + infs := []controller.Informer{} + for _, selector := range labelSelectors { + f := factoryfiltered.Get(ctx, selector) + inf := f.Sinks().V1alpha1().IntegrationSinks() + ctx = context.WithValue(ctx, filtered.Key{Selector: selector}, inf) + infs = append(infs, inf.Informer()) + } + return ctx, infs +} diff --git a/pkg/client/injection/informers/sinks/v1alpha1/integrationsink/filtered/integrationsink.go b/pkg/client/injection/informers/sinks/v1alpha1/integrationsink/filtered/integrationsink.go new file mode 100644 index 00000000000..b4ed22b8be8 --- /dev/null +++ b/pkg/client/injection/informers/sinks/v1alpha1/integrationsink/filtered/integrationsink.go @@ -0,0 +1,65 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package filtered + +import ( + context "context" + + v1alpha1 "knative.dev/eventing/pkg/client/informers/externalversions/sinks/v1alpha1" + filtered "knative.dev/eventing/pkg/client/injection/informers/factory/filtered" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +func init() { + injection.Default.RegisterFilteredInformers(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct { + Selector string +} + +func withInformer(ctx context.Context) (context.Context, []controller.Informer) { + untyped := ctx.Value(filtered.LabelKey{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch labelkey from context.") + } + labelSelectors := untyped.([]string) + infs := []controller.Informer{} + for _, selector := range labelSelectors { + f := filtered.Get(ctx, selector) + inf := f.Sinks().V1alpha1().IntegrationSinks() + ctx = context.WithValue(ctx, Key{Selector: selector}, inf) + infs = append(infs, inf.Informer()) + } + return ctx, infs +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context, selector string) v1alpha1.IntegrationSinkInformer { + untyped := ctx.Value(Key{Selector: selector}) + if untyped == nil { + logging.FromContext(ctx).Panicf( + "Unable to fetch knative.dev/eventing/pkg/client/informers/externalversions/sinks/v1alpha1.IntegrationSinkInformer with selector %s from context.", selector) + } + return untyped.(v1alpha1.IntegrationSinkInformer) +} diff --git a/pkg/client/injection/informers/sinks/v1alpha1/integrationsink/integrationsink.go b/pkg/client/injection/informers/sinks/v1alpha1/integrationsink/integrationsink.go new file mode 100644 index 00000000000..0207137a26d --- /dev/null +++ b/pkg/client/injection/informers/sinks/v1alpha1/integrationsink/integrationsink.go @@ -0,0 +1,52 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package integrationsink + +import ( + context "context" + + v1alpha1 "knative.dev/eventing/pkg/client/informers/externalversions/sinks/v1alpha1" + factory "knative.dev/eventing/pkg/client/injection/informers/factory" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +func init() { + injection.Default.RegisterInformer(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct{} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := factory.Get(ctx) + inf := f.Sinks().V1alpha1().IntegrationSinks() + return context.WithValue(ctx, Key{}, inf), inf.Informer() +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context) v1alpha1.IntegrationSinkInformer { + untyped := ctx.Value(Key{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch knative.dev/eventing/pkg/client/informers/externalversions/sinks/v1alpha1.IntegrationSinkInformer from context.") + } + return untyped.(v1alpha1.IntegrationSinkInformer) +} diff --git a/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink/controller.go b/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink/controller.go new file mode 100644 index 00000000000..e201d14bbb4 --- /dev/null +++ b/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink/controller.go @@ -0,0 +1,170 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package integrationsink + +import ( + context "context" + fmt "fmt" + reflect "reflect" + strings "strings" + + zap "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + labels "k8s.io/apimachinery/pkg/labels" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + scheme "k8s.io/client-go/kubernetes/scheme" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + record "k8s.io/client-go/tools/record" + versionedscheme "knative.dev/eventing/pkg/client/clientset/versioned/scheme" + client "knative.dev/eventing/pkg/client/injection/client" + integrationsink "knative.dev/eventing/pkg/client/injection/informers/sinks/v1alpha1/integrationsink" + kubeclient "knative.dev/pkg/client/injection/kube/client" + controller "knative.dev/pkg/controller" + logging "knative.dev/pkg/logging" + logkey "knative.dev/pkg/logging/logkey" + reconciler "knative.dev/pkg/reconciler" +) + +const ( + defaultControllerAgentName = "integrationsink-controller" + defaultFinalizerName = "integrationsinks.sinks.knative.dev" +) + +// NewImpl returns a controller.Impl that handles queuing and feeding work from +// the queue through an implementation of controller.Reconciler, delegating to +// the provided Interface and optional Finalizer methods. OptionsFn is used to return +// controller.ControllerOptions to be used by the internal reconciler. +func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsFn) *controller.Impl { + logger := logging.FromContext(ctx) + + // Check the options function input. It should be 0 or 1. + if len(optionsFns) > 1 { + logger.Fatal("Up to one options function is supported, found: ", len(optionsFns)) + } + + integrationsinkInformer := integrationsink.Get(ctx) + + lister := integrationsinkInformer.Lister() + + var promoteFilterFunc func(obj interface{}) bool + var promoteFunc = func(bkt reconciler.Bucket) {} + + rec := &reconcilerImpl{ + LeaderAwareFuncs: reconciler.LeaderAwareFuncs{ + PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error { + + // Signal promotion event + promoteFunc(bkt) + + all, err := lister.List(labels.Everything()) + if err != nil { + return err + } + for _, elt := range all { + if promoteFilterFunc != nil { + if ok := promoteFilterFunc(elt); !ok { + continue + } + } + enq(bkt, types.NamespacedName{ + Namespace: elt.GetNamespace(), + Name: elt.GetName(), + }) + } + return nil + }, + }, + Client: client.Get(ctx), + Lister: lister, + reconciler: r, + finalizerName: defaultFinalizerName, + } + + ctrType := reflect.TypeOf(r).Elem() + ctrTypeName := fmt.Sprintf("%s.%s", ctrType.PkgPath(), ctrType.Name()) + ctrTypeName = strings.ReplaceAll(ctrTypeName, "/", ".") + + logger = logger.With( + zap.String(logkey.ControllerType, ctrTypeName), + zap.String(logkey.Kind, "sinks.knative.dev.IntegrationSink"), + ) + + impl := controller.NewContext(ctx, rec, controller.ControllerOptions{WorkQueueName: ctrTypeName, Logger: logger}) + agentName := defaultControllerAgentName + + // Pass impl to the options. Save any optional results. + for _, fn := range optionsFns { + opts := fn(impl) + if opts.ConfigStore != nil { + rec.configStore = opts.ConfigStore + } + if opts.FinalizerName != "" { + rec.finalizerName = opts.FinalizerName + } + if opts.AgentName != "" { + agentName = opts.AgentName + } + if opts.SkipStatusUpdates { + rec.skipStatusUpdates = true + } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } + if opts.PromoteFilterFunc != nil { + promoteFilterFunc = opts.PromoteFilterFunc + } + if opts.PromoteFunc != nil { + promoteFunc = opts.PromoteFunc + } + } + + rec.Recorder = createRecorder(ctx, agentName) + + return impl +} + +func createRecorder(ctx context.Context, agentName string) record.EventRecorder { + logger := logging.FromContext(ctx) + + recorder := controller.GetEventRecorder(ctx) + if recorder == nil { + // Create event broadcaster + logger.Debug("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + watches := []watch.Interface{ + eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), + eventBroadcaster.StartRecordingToSink( + &v1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events("")}), + } + recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: agentName}) + go func() { + <-ctx.Done() + for _, w := range watches { + w.Stop() + } + }() + } + + return recorder +} + +func init() { + versionedscheme.AddToScheme(scheme.Scheme) +} diff --git a/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink/reconciler.go b/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink/reconciler.go new file mode 100644 index 00000000000..6175aacf9aa --- /dev/null +++ b/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink/reconciler.go @@ -0,0 +1,440 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package integrationsink + +import ( + context "context" + json "encoding/json" + fmt "fmt" + + zap "go.uber.org/zap" + "go.uber.org/zap/zapcore" + v1 "k8s.io/api/core/v1" + equality "k8s.io/apimachinery/pkg/api/equality" + errors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + types "k8s.io/apimachinery/pkg/types" + sets "k8s.io/apimachinery/pkg/util/sets" + record "k8s.io/client-go/tools/record" + v1alpha1 "knative.dev/eventing/pkg/apis/sinks/v1alpha1" + versioned "knative.dev/eventing/pkg/client/clientset/versioned" + sinksv1alpha1 "knative.dev/eventing/pkg/client/listers/sinks/v1alpha1" + controller "knative.dev/pkg/controller" + kmp "knative.dev/pkg/kmp" + logging "knative.dev/pkg/logging" + reconciler "knative.dev/pkg/reconciler" +) + +// Interface defines the strongly typed interfaces to be implemented by a +// controller reconciling v1alpha1.IntegrationSink. +type Interface interface { + // ReconcileKind implements custom logic to reconcile v1alpha1.IntegrationSink. Any changes + // to the objects .Status or .Finalizers will be propagated to the stored + // object. It is recommended that implementors do not call any update calls + // for the Kind inside of ReconcileKind, it is the responsibility of the calling + // controller to propagate those properties. The resource passed to ReconcileKind + // will always have an empty deletion timestamp. + ReconcileKind(ctx context.Context, o *v1alpha1.IntegrationSink) reconciler.Event +} + +// Finalizer defines the strongly typed interfaces to be implemented by a +// controller finalizing v1alpha1.IntegrationSink. +type Finalizer interface { + // FinalizeKind implements custom logic to finalize v1alpha1.IntegrationSink. Any changes + // to the objects .Status or .Finalizers will be ignored. Returning a nil or + // Normal type reconciler.Event will allow the finalizer to be deleted on + // the resource. The resource passed to FinalizeKind will always have a set + // deletion timestamp. + FinalizeKind(ctx context.Context, o *v1alpha1.IntegrationSink) reconciler.Event +} + +// ReadOnlyInterface defines the strongly typed interfaces to be implemented by a +// controller reconciling v1alpha1.IntegrationSink if they want to process resources for which +// they are not the leader. +type ReadOnlyInterface interface { + // ObserveKind implements logic to observe v1alpha1.IntegrationSink. + // This method should not write to the API. + ObserveKind(ctx context.Context, o *v1alpha1.IntegrationSink) reconciler.Event +} + +type doReconcile func(ctx context.Context, o *v1alpha1.IntegrationSink) reconciler.Event + +// reconcilerImpl implements controller.Reconciler for v1alpha1.IntegrationSink resources. +type reconcilerImpl struct { + // LeaderAwareFuncs is inlined to help us implement reconciler.LeaderAware. + reconciler.LeaderAwareFuncs + + // Client is used to write back status updates. + Client versioned.Interface + + // Listers index properties about resources. + Lister sinksv1alpha1.IntegrationSinkLister + + // Recorder is an event recorder for recording Event resources to the + // Kubernetes API. + Recorder record.EventRecorder + + // configStore allows for decorating a context with config maps. + // +optional + configStore reconciler.ConfigStore + + // reconciler is the implementation of the business logic of the resource. + reconciler Interface + + // finalizerName is the name of the finalizer to reconcile. + finalizerName string + + // skipStatusUpdates configures whether or not this reconciler automatically updates + // the status of the reconciled resource. + skipStatusUpdates bool +} + +// Check that our Reconciler implements controller.Reconciler. +var _ controller.Reconciler = (*reconcilerImpl)(nil) + +// Check that our generated Reconciler is always LeaderAware. +var _ reconciler.LeaderAware = (*reconcilerImpl)(nil) + +func NewReconciler(ctx context.Context, logger *zap.SugaredLogger, client versioned.Interface, lister sinksv1alpha1.IntegrationSinkLister, recorder record.EventRecorder, r Interface, options ...controller.Options) controller.Reconciler { + // Check the options function input. It should be 0 or 1. + if len(options) > 1 { + logger.Fatal("Up to one options struct is supported, found: ", len(options)) + } + + // Fail fast when users inadvertently implement the other LeaderAware interface. + // For the typed reconcilers, Promote shouldn't take any arguments. + if _, ok := r.(reconciler.LeaderAware); ok { + logger.Fatalf("%T implements the incorrect LeaderAware interface. Promote() should not take an argument as genreconciler handles the enqueuing automatically.", r) + } + + rec := &reconcilerImpl{ + LeaderAwareFuncs: reconciler.LeaderAwareFuncs{ + PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error { + all, err := lister.List(labels.Everything()) + if err != nil { + return err + } + for _, elt := range all { + // TODO: Consider letting users specify a filter in options. + enq(bkt, types.NamespacedName{ + Namespace: elt.GetNamespace(), + Name: elt.GetName(), + }) + } + return nil + }, + }, + Client: client, + Lister: lister, + Recorder: recorder, + reconciler: r, + finalizerName: defaultFinalizerName, + } + + for _, opts := range options { + if opts.ConfigStore != nil { + rec.configStore = opts.ConfigStore + } + if opts.FinalizerName != "" { + rec.finalizerName = opts.FinalizerName + } + if opts.SkipStatusUpdates { + rec.skipStatusUpdates = true + } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } + } + + return rec +} + +// Reconcile implements controller.Reconciler +func (r *reconcilerImpl) Reconcile(ctx context.Context, key string) error { + logger := logging.FromContext(ctx) + + // Initialize the reconciler state. This will convert the namespace/name + // string into a distinct namespace and name, determine if this instance of + // the reconciler is the leader, and any additional interfaces implemented + // by the reconciler. Returns an error is the resource key is invalid. + s, err := newState(key, r) + if err != nil { + logger.Error("Invalid resource key: ", key) + return nil + } + + // If we are not the leader, and we don't implement either ReadOnly + // observer interfaces, then take a fast-path out. + if s.isNotLeaderNorObserver() { + return controller.NewSkipKey(key) + } + + // If configStore is set, attach the frozen configuration to the context. + if r.configStore != nil { + ctx = r.configStore.ToContext(ctx) + } + + // Add the recorder to context. + ctx = controller.WithEventRecorder(ctx, r.Recorder) + + // Get the resource with this namespace/name. + + getter := r.Lister.IntegrationSinks(s.namespace) + + original, err := getter.Get(s.name) + + if errors.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing and call + // the ObserveDeletion handler if appropriate. + logger.Debugf("Resource %q no longer exists", key) + if del, ok := r.reconciler.(reconciler.OnDeletionInterface); ok { + return del.ObserveDeletion(ctx, types.NamespacedName{ + Namespace: s.namespace, + Name: s.name, + }) + } + return nil + } else if err != nil { + return err + } + + // Don't modify the informers copy. + resource := original.DeepCopy() + + var reconcileEvent reconciler.Event + + name, do := s.reconcileMethodFor(resource) + // Append the target method to the logger. + logger = logger.With(zap.String("targetMethod", name)) + switch name { + case reconciler.DoReconcileKind: + // Set and update the finalizer on resource if r.reconciler + // implements Finalizer. + if resource, err = r.setFinalizerIfFinalizer(ctx, resource); err != nil { + return fmt.Errorf("failed to set finalizers: %w", err) + } + + if !r.skipStatusUpdates { + reconciler.PreProcessReconcile(ctx, resource) + } + + // Reconcile this copy of the resource and then write back any status + // updates regardless of whether the reconciliation errored out. + reconcileEvent = do(ctx, resource) + + if !r.skipStatusUpdates { + reconciler.PostProcessReconcile(ctx, resource, original) + } + + case reconciler.DoFinalizeKind: + // For finalizing reconcilers, if this resource being marked for deletion + // and reconciled cleanly (nil or normal event), remove the finalizer. + reconcileEvent = do(ctx, resource) + + if resource, err = r.clearFinalizer(ctx, resource, reconcileEvent); err != nil { + return fmt.Errorf("failed to clear finalizers: %w", err) + } + + case reconciler.DoObserveKind: + // Observe any changes to this resource, since we are not the leader. + reconcileEvent = do(ctx, resource) + + } + + // Synchronize the status. + switch { + case r.skipStatusUpdates: + // This reconciler implementation is configured to skip resource updates. + // This may mean this reconciler does not observe spec, but reconciles external changes. + case equality.Semantic.DeepEqual(original.Status, resource.Status): + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the injectionInformer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + case !s.isLeader: + // High-availability reconcilers may have many replicas watching the resource, but only + // the elected leader is expected to write modifications. + logger.Warn("Saw status changes when we aren't the leader!") + default: + if err = r.updateStatus(ctx, logger, original, resource); err != nil { + logger.Warnw("Failed to update resource status", zap.Error(err)) + r.Recorder.Eventf(resource, v1.EventTypeWarning, "UpdateFailed", + "Failed to update status for %q: %v", resource.Name, err) + return err + } + } + + // Report the reconciler event, if any. + if reconcileEvent != nil { + var event *reconciler.ReconcilerEvent + if reconciler.EventAs(reconcileEvent, &event) { + logger.Infow("Returned an event", zap.Any("event", reconcileEvent)) + r.Recorder.Event(resource, event.EventType, event.Reason, event.Error()) + + // the event was wrapped inside an error, consider the reconciliation as failed + if _, isEvent := reconcileEvent.(*reconciler.ReconcilerEvent); !isEvent { + return reconcileEvent + } + return nil + } + + if controller.IsSkipKey(reconcileEvent) { + // This is a wrapped error, don't emit an event. + } else if ok, _ := controller.IsRequeueKey(reconcileEvent); ok { + // This is a wrapped error, don't emit an event. + } else { + logger.Errorw("Returned an error", zap.Error(reconcileEvent)) + r.Recorder.Event(resource, v1.EventTypeWarning, "InternalError", reconcileEvent.Error()) + } + return reconcileEvent + } + + return nil +} + +func (r *reconcilerImpl) updateStatus(ctx context.Context, logger *zap.SugaredLogger, existing *v1alpha1.IntegrationSink, desired *v1alpha1.IntegrationSink) error { + existing = existing.DeepCopy() + return reconciler.RetryUpdateConflicts(func(attempts int) (err error) { + // The first iteration tries to use the injectionInformer's state, subsequent attempts fetch the latest state via API. + if attempts > 0 { + + getter := r.Client.SinksV1alpha1().IntegrationSinks(desired.Namespace) + + existing, err = getter.Get(ctx, desired.Name, metav1.GetOptions{}) + if err != nil { + return err + } + } + + // If there's nothing to update, just return. + if equality.Semantic.DeepEqual(existing.Status, desired.Status) { + return nil + } + + if logger.Desugar().Core().Enabled(zapcore.DebugLevel) { + if diff, err := kmp.SafeDiff(existing.Status, desired.Status); err == nil && diff != "" { + logger.Debug("Updating status with: ", diff) + } + } + + existing.Status = desired.Status + + updater := r.Client.SinksV1alpha1().IntegrationSinks(existing.Namespace) + + _, err = updater.UpdateStatus(ctx, existing, metav1.UpdateOptions{}) + return err + }) +} + +// updateFinalizersFiltered will update the Finalizers of the resource. +// TODO: this method could be generic and sync all finalizers. For now it only +// updates defaultFinalizerName or its override. +func (r *reconcilerImpl) updateFinalizersFiltered(ctx context.Context, resource *v1alpha1.IntegrationSink, desiredFinalizers sets.Set[string]) (*v1alpha1.IntegrationSink, error) { + // Don't modify the informers copy. + existing := resource.DeepCopy() + + var finalizers []string + + // If there's nothing to update, just return. + existingFinalizers := sets.New[string](existing.Finalizers...) + + if desiredFinalizers.Has(r.finalizerName) { + if existingFinalizers.Has(r.finalizerName) { + // Nothing to do. + return resource, nil + } + // Add the finalizer. + finalizers = append(existing.Finalizers, r.finalizerName) + } else { + if !existingFinalizers.Has(r.finalizerName) { + // Nothing to do. + return resource, nil + } + // Remove the finalizer. + existingFinalizers.Delete(r.finalizerName) + finalizers = sets.List(existingFinalizers) + } + + mergePatch := map[string]interface{}{ + "metadata": map[string]interface{}{ + "finalizers": finalizers, + "resourceVersion": existing.ResourceVersion, + }, + } + + patch, err := json.Marshal(mergePatch) + if err != nil { + return resource, err + } + + patcher := r.Client.SinksV1alpha1().IntegrationSinks(resource.Namespace) + + resourceName := resource.Name + updated, err := patcher.Patch(ctx, resourceName, types.MergePatchType, patch, metav1.PatchOptions{}) + if err != nil { + r.Recorder.Eventf(existing, v1.EventTypeWarning, "FinalizerUpdateFailed", + "Failed to update finalizers for %q: %v", resourceName, err) + } else { + r.Recorder.Eventf(updated, v1.EventTypeNormal, "FinalizerUpdate", + "Updated %q finalizers", resource.GetName()) + } + return updated, err +} + +func (r *reconcilerImpl) setFinalizerIfFinalizer(ctx context.Context, resource *v1alpha1.IntegrationSink) (*v1alpha1.IntegrationSink, error) { + if _, ok := r.reconciler.(Finalizer); !ok { + return resource, nil + } + + finalizers := sets.New[string](resource.Finalizers...) + + // If this resource is not being deleted, mark the finalizer. + if resource.GetDeletionTimestamp().IsZero() { + finalizers.Insert(r.finalizerName) + } + + // Synchronize the finalizers filtered by r.finalizerName. + return r.updateFinalizersFiltered(ctx, resource, finalizers) +} + +func (r *reconcilerImpl) clearFinalizer(ctx context.Context, resource *v1alpha1.IntegrationSink, reconcileEvent reconciler.Event) (*v1alpha1.IntegrationSink, error) { + if _, ok := r.reconciler.(Finalizer); !ok { + return resource, nil + } + if resource.GetDeletionTimestamp().IsZero() { + return resource, nil + } + + finalizers := sets.New[string](resource.Finalizers...) + + if reconcileEvent != nil { + var event *reconciler.ReconcilerEvent + if reconciler.EventAs(reconcileEvent, &event) { + if event.EventType == v1.EventTypeNormal { + finalizers.Delete(r.finalizerName) + } + } + } else { + finalizers.Delete(r.finalizerName) + } + + // Synchronize the finalizers filtered by r.finalizerName. + return r.updateFinalizersFiltered(ctx, resource, finalizers) +} diff --git a/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink/state.go b/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink/state.go new file mode 100644 index 00000000000..6cae877f558 --- /dev/null +++ b/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink/state.go @@ -0,0 +1,97 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package integrationsink + +import ( + fmt "fmt" + + types "k8s.io/apimachinery/pkg/types" + cache "k8s.io/client-go/tools/cache" + v1alpha1 "knative.dev/eventing/pkg/apis/sinks/v1alpha1" + reconciler "knative.dev/pkg/reconciler" +) + +// state is used to track the state of a reconciler in a single run. +type state struct { + // key is the original reconciliation key from the queue. + key string + // namespace is the namespace split from the reconciliation key. + namespace string + // name is the name split from the reconciliation key. + name string + // reconciler is the reconciler. + reconciler Interface + // roi is the read only interface cast of the reconciler. + roi ReadOnlyInterface + // isROI (Read Only Interface) the reconciler only observes reconciliation. + isROI bool + // isLeader the instance of the reconciler is the elected leader. + isLeader bool +} + +func newState(key string, r *reconcilerImpl) (*state, error) { + // Convert the namespace/name string into a distinct namespace and name. + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return nil, fmt.Errorf("invalid resource key: %s", key) + } + + roi, isROI := r.reconciler.(ReadOnlyInterface) + + isLeader := r.IsLeaderFor(types.NamespacedName{ + Namespace: namespace, + Name: name, + }) + + return &state{ + key: key, + namespace: namespace, + name: name, + reconciler: r.reconciler, + roi: roi, + isROI: isROI, + isLeader: isLeader, + }, nil +} + +// isNotLeaderNorObserver checks to see if this reconciler with the current +// state is enabled to do any work or not. +// isNotLeaderNorObserver returns true when there is no work possible for the +// reconciler. +func (s *state) isNotLeaderNorObserver() bool { + if !s.isLeader && !s.isROI { + // If we are not the leader, and we don't implement the ReadOnly + // interface, then take a fast-path out. + return true + } + return false +} + +func (s *state) reconcileMethodFor(o *v1alpha1.IntegrationSink) (string, doReconcile) { + if o.GetDeletionTimestamp().IsZero() { + if s.isLeader { + return reconciler.DoReconcileKind, s.reconciler.ReconcileKind + } else if s.isROI { + return reconciler.DoObserveKind, s.roi.ObserveKind + } + } else if fin, ok := s.reconciler.(Finalizer); s.isLeader && ok { + return reconciler.DoFinalizeKind, fin.FinalizeKind + } + return "unknown", nil +} diff --git a/pkg/client/listers/sinks/v1alpha1/expansion_generated.go b/pkg/client/listers/sinks/v1alpha1/expansion_generated.go index 48dbd032632..85542eb93d8 100644 --- a/pkg/client/listers/sinks/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/sinks/v1alpha1/expansion_generated.go @@ -18,6 +18,14 @@ limitations under the License. package v1alpha1 +// IntegrationSinkListerExpansion allows custom methods to be added to +// IntegrationSinkLister. +type IntegrationSinkListerExpansion interface{} + +// IntegrationSinkNamespaceListerExpansion allows custom methods to be added to +// IntegrationSinkNamespaceLister. +type IntegrationSinkNamespaceListerExpansion interface{} + // JobSinkListerExpansion allows custom methods to be added to // JobSinkLister. type JobSinkListerExpansion interface{} diff --git a/pkg/client/listers/sinks/v1alpha1/integrationsink.go b/pkg/client/listers/sinks/v1alpha1/integrationsink.go new file mode 100644 index 00000000000..95202b53f26 --- /dev/null +++ b/pkg/client/listers/sinks/v1alpha1/integrationsink.go @@ -0,0 +1,99 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" + v1alpha1 "knative.dev/eventing/pkg/apis/sinks/v1alpha1" +) + +// IntegrationSinkLister helps list IntegrationSinks. +// All objects returned here must be treated as read-only. +type IntegrationSinkLister interface { + // List lists all IntegrationSinks in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.IntegrationSink, err error) + // IntegrationSinks returns an object that can list and get IntegrationSinks. + IntegrationSinks(namespace string) IntegrationSinkNamespaceLister + IntegrationSinkListerExpansion +} + +// integrationSinkLister implements the IntegrationSinkLister interface. +type integrationSinkLister struct { + indexer cache.Indexer +} + +// NewIntegrationSinkLister returns a new IntegrationSinkLister. +func NewIntegrationSinkLister(indexer cache.Indexer) IntegrationSinkLister { + return &integrationSinkLister{indexer: indexer} +} + +// List lists all IntegrationSinks in the indexer. +func (s *integrationSinkLister) List(selector labels.Selector) (ret []*v1alpha1.IntegrationSink, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.IntegrationSink)) + }) + return ret, err +} + +// IntegrationSinks returns an object that can list and get IntegrationSinks. +func (s *integrationSinkLister) IntegrationSinks(namespace string) IntegrationSinkNamespaceLister { + return integrationSinkNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// IntegrationSinkNamespaceLister helps list and get IntegrationSinks. +// All objects returned here must be treated as read-only. +type IntegrationSinkNamespaceLister interface { + // List lists all IntegrationSinks in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.IntegrationSink, err error) + // Get retrieves the IntegrationSink from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.IntegrationSink, error) + IntegrationSinkNamespaceListerExpansion +} + +// integrationSinkNamespaceLister implements the IntegrationSinkNamespaceLister +// interface. +type integrationSinkNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all IntegrationSinks in the indexer for a given namespace. +func (s integrationSinkNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.IntegrationSink, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.IntegrationSink)) + }) + return ret, err +} + +// Get retrieves the IntegrationSink from the indexer for a given namespace and name. +func (s integrationSinkNamespaceLister) Get(name string) (*v1alpha1.IntegrationSink, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("integrationsink"), name) + } + return obj.(*v1alpha1.IntegrationSink), nil +} diff --git a/pkg/eventingtls/eventingtls.go b/pkg/eventingtls/eventingtls.go index 718a744c197..eedea290d51 100644 --- a/pkg/eventingtls/eventingtls.go +++ b/pkg/eventingtls/eventingtls.go @@ -57,6 +57,9 @@ const ( BrokerFilterServerTLSSecretName = "mt-broker-filter-server-tls" //nolint:gosec // This is not a hardcoded credential // BrokerIngressServerTLSSecretName is the name of the tls secret for the broker ingress server BrokerIngressServerTLSSecretName = "mt-broker-ingress-server-tls" //nolint:gosec // This is not a hardcoded credential + + // IntegrationSinkDispatcherServerTLSSecretName is the name of the tls secret for the integration sink dispatcher server + IntegrationSinkDispatcherServerTLSSecretName = "integration-sink-server-tls" //nolint:gosec // This is not a hardcoded credential ) type ClientConfig struct { diff --git a/pkg/reconciler/integration/sink/controller.go b/pkg/reconciler/integration/sink/controller.go new file mode 100644 index 00000000000..a50e2e8c9ac --- /dev/null +++ b/pkg/reconciler/integration/sink/controller.go @@ -0,0 +1,106 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sink + +import ( + "context" + + "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/apis/feature" + v1alpha1 "knative.dev/eventing/pkg/apis/sinks/v1alpha1" + "knative.dev/eventing/pkg/auth" + "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" + "knative.dev/eventing/pkg/client/injection/informers/sinks/v1alpha1/integrationsink" + deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" + "knative.dev/pkg/client/injection/kube/informers/core/v1/service" + + secretinformer "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret" + + integrationsinkreconciler "knative.dev/eventing/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink" + "knative.dev/eventing/pkg/eventingtls" + kubeclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" + "knative.dev/pkg/system" +) + +// NewController creates a Reconciler for IntegrationSource and returns the result of NewImpl. +func NewController( + ctx context.Context, + cmw configmap.Watcher, +) *controller.Impl { + integrationSinkInformer := integrationsink.Get(ctx) + secretInformer := secretinformer.Get(ctx) + eventPolicyInformer := eventpolicy.Get(ctx) + deploymentInformer := deploymentinformer.Get(ctx) + + serviceInformer := service.Get(ctx) + + r := &Reconciler{ + kubeClientSet: kubeclient.Get(ctx), + + deploymentLister: deploymentInformer.Lister(), + serviceLister: serviceInformer.Lister(), + + systemNamespace: system.Namespace(), + secretLister: secretInformer.Lister(), + eventPolicyLister: eventPolicyInformer.Lister(), + } + + var globalResync func(obj interface{}) + + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if globalResync != nil { + globalResync(nil) + } + }) + featureStore.WatchConfigs(cmw) + + impl := integrationsinkreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { + return controller.Options{ + ConfigStore: featureStore, + } + }) + + integrationSinkInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + + globalResync = func(interface{}) { + impl.GlobalResync(integrationSinkInformer.Informer()) + } + secretInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterWithName(eventingtls.IntegrationSinkDispatcherServerTLSSecretName), + Handler: controller.HandleAll(globalResync), + }) + + deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterControllerGVK(v1alpha1.SchemeGroupVersion.WithKind("IntegrationSink")), + Handler: controller.HandleAll(impl.EnqueueControllerOf), + }) + + integrationSinkGK := v1alpha1.SchemeGroupVersion.WithKind("IntegrationSink").GroupKind() + + // Enqueue the JobSink, if we have an EventPolicy which was referencing + // or got updated and now is referencing the JobSink. + eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler( + integrationSinkInformer.Informer().GetIndexer(), + integrationSinkGK, + impl.EnqueueKey, + )) + + return impl +} diff --git a/pkg/reconciler/integration/sink/integrationsink.go b/pkg/reconciler/integration/sink/integrationsink.go new file mode 100644 index 00000000000..fcce9483eb3 --- /dev/null +++ b/pkg/reconciler/integration/sink/integrationsink.go @@ -0,0 +1,257 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sink + +import ( + "context" + "fmt" + + v1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/equality" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + appsv1listers "k8s.io/client-go/listers/apps/v1" + corev1listers "k8s.io/client-go/listers/core/v1" + "knative.dev/eventing/pkg/reconciler/integration/sink/resources" + "knative.dev/pkg/apis" + "knative.dev/pkg/network" + + "knative.dev/eventing/pkg/apis/feature" + + "k8s.io/utils/ptr" + sinks "knative.dev/eventing/pkg/apis/sinks/v1alpha1" + "knative.dev/eventing/pkg/auth" + eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" + "knative.dev/eventing/pkg/eventingtls" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" + "knative.dev/pkg/reconciler" +) + +const ( + // Name of the corev1.Events emitted from the reconciliation process + sinkReconciled = "IntegrationSinkReconciled" + deploymentCreated = "DeploymentCreated" + deploymentUpdated = "DeploymentUpdated" + serviceCreated = "ServiceCreated" + serviceUpdated = "ServiceUpdated" +) + +type Reconciler struct { + secretLister corev1listers.SecretLister + eventPolicyLister eventingv1alpha1listers.EventPolicyLister + + kubeClientSet kubernetes.Interface + + deploymentLister appsv1listers.DeploymentLister + serviceLister corev1listers.ServiceLister + + systemNamespace string +} + +// newReconciledNormal makes a new reconciler event with event type Normal, and +// reason IntegrationSink. +func newReconciledNormal(namespace, name string) reconciler.Event { + return reconciler.NewEvent(corev1.EventTypeNormal, sinkReconciled, "IntegrationSink reconciled: \"%s/%s\"", namespace, name) +} + +func (r *Reconciler) ReconcileKind(ctx context.Context, sink *sinks.IntegrationSink) reconciler.Event { + featureFlags := feature.FromContext(ctx) + + _, err := r.reconcileDeployment(ctx, sink) + if err != nil { + logging.FromContext(ctx).Errorw("Error reconciling Pod", zap.Error(err)) + return err + } + + _, err = r.reconcileService(ctx, sink) + if err != nil { + logging.FromContext(ctx).Errorw("Error reconciling Service", zap.Error(err)) + return err + } + + if err := r.reconcileAddress(ctx, sink); err != nil { + return fmt.Errorf("failed to reconcile address: %w", err) + } + + err = auth.UpdateStatusWithEventPolicies(featureFlags, &sink.Status.AppliedEventPoliciesStatus, &sink.Status, r.eventPolicyLister, sinks.SchemeGroupVersion.WithKind("IntegrationSink"), sink.ObjectMeta) + if err != nil { + return fmt.Errorf("could not update IntegrationSink status with EventPolicies: %v", err) + } + + return newReconciledNormal(sink.Namespace, sink.Name) +} + +func (r *Reconciler) reconcileDeployment(ctx context.Context, sink *sinks.IntegrationSink) (*v1.Deployment, error) { + + expected := resources.MakeDeploymentSpec(sink) + deployment, err := r.deploymentLister.Deployments(sink.Namespace).Get(expected.Name) + if apierrors.IsNotFound(err) { + deployment, err = r.kubeClientSet.AppsV1().Deployments(sink.Namespace).Create(ctx, expected, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("creating new Deployment: %v", err) + } + controller.GetEventRecorder(ctx).Eventf(sink, corev1.EventTypeNormal, deploymentCreated, "Deployment created %q", deployment.Name) + } else if err != nil { + return nil, fmt.Errorf("getting Deployment: %v", err) + } else if !metav1.IsControlledBy(deployment, sink) { + return nil, fmt.Errorf("Deployment %q is not owned by IntegrationSink %q", deployment.Name, sink.Name) + } else if r.podSpecChanged(deployment.Spec.Template.Spec, expected.Spec.Template.Spec) { + + deployment.Spec.Template.Spec = expected.Spec.Template.Spec + deployment, err = r.kubeClientSet.AppsV1().Deployments(sink.Namespace).Update(ctx, deployment, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("updating Deployment: %v", err) + } + controller.GetEventRecorder(ctx).Eventf(sink, corev1.EventTypeNormal, deploymentUpdated, "Deployment %q updated", deployment.Name) + } else { + logging.FromContext(ctx).Debugw("Reusing existing Deployment", zap.Any("Deployment", deployment)) + } + + sink.Status.PropagateDeploymentStatus(&deployment.Status) + return deployment, nil +} + +func (r *Reconciler) reconcileService(ctx context.Context, sink *sinks.IntegrationSink) (*corev1.Service, error) { + expected := resources.MakeService(sink) + + svc, err := r.serviceLister.Services(sink.Namespace).Get(expected.Name) + if apierrors.IsNotFound(err) { + svc, err := r.kubeClientSet.CoreV1().Services(sink.Namespace).Create(ctx, expected, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("creating new Service: %v", err) + } + controller.GetEventRecorder(ctx).Eventf(sink, corev1.EventTypeNormal, serviceCreated, "Service created %q", svc.Name) + } else if err != nil { + return nil, fmt.Errorf("getting Service : %v", err) + } else if !metav1.IsControlledBy(svc, sink) { + return nil, fmt.Errorf("Service %q is not owned by IntegrationSink %q", svc.Name, sink.Name) + } else { + logging.FromContext(ctx).Debugw("Reusing existing Service", zap.Any("Service", svc)) + } + + return svc, nil +} + +func (r *Reconciler) reconcileAddress(ctx context.Context, sink *sinks.IntegrationSink) error { + + featureFlags := feature.FromContext(ctx) + if featureFlags.IsPermissiveTransportEncryption() { + caCerts, err := r.getCaCerts() + if err != nil { + return err + } + + httpAddress := r.httpAddress(sink) + httpsAddress := r.httpsAddress(caCerts, sink) + // Permissive mode: + // - status.address http address with host-based routing + // - status.addresses: + // - https address with path-based routing + // - http address with host-based routing + sink.Status.Addresses = []duckv1.Addressable{httpsAddress, httpAddress} + sink.Status.Address = &httpAddress + } else if featureFlags.IsStrictTransportEncryption() { + // Strict mode: (only https addresses) + // - status.address https address with path-based routing + // - status.addresses: + // - https address with path-based routing + caCerts, err := r.getCaCerts() + if err != nil { + return err + } + + httpsAddress := r.httpsAddress(caCerts, sink) + sink.Status.Addresses = []duckv1.Addressable{httpsAddress} + sink.Status.Address = &httpsAddress + } else { + httpAddress := r.httpAddress(sink) + sink.Status.Address = &httpAddress + } + + if featureFlags.IsOIDCAuthentication() { + audience := auth.GetAudience(sinks.SchemeGroupVersion.WithKind("IntegrationSink"), sink.ObjectMeta) + + logging.FromContext(ctx).Debugw("Setting the audience", zap.String("audience", audience)) + sink.Status.Address.Audience = &audience + for i := range sink.Status.Addresses { + sink.Status.Addresses[i].Audience = &audience + } + } else { + logging.FromContext(ctx).Debug("Clearing the imc audience as OIDC is not enabled") + sink.Status.Address.Audience = nil + for i := range sink.Status.Addresses { + sink.Status.Addresses[i].Audience = nil + } + } + + sink.GetConditionSet().Manage(sink.GetStatus()).MarkTrue(sinks.IntegrationSinkConditionAddressable) + + return nil +} + +func (r *Reconciler) getCaCerts() (*string, error) { + // Getting the secret called "imc-dispatcher-tls" from system namespace + secret, err := r.secretLister.Secrets(r.systemNamespace).Get(eventingtls.IntegrationSinkDispatcherServerTLSSecretName) + if err != nil { + return nil, fmt.Errorf("failed to get CA certs from %s/%s: %w", r.systemNamespace, eventingtls.IntegrationSinkDispatcherServerTLSSecretName, err) + } + caCerts, ok := secret.Data[eventingtls.SecretCACert] + if !ok { + return nil, nil + } + return ptr.To(string(caCerts)), nil +} + +func (r *Reconciler) httpAddress(sink *sinks.IntegrationSink) duckv1.Addressable { + // http address uses host-based routing + httpAddress := duckv1.Addressable{ + Name: ptr.To("http"), + URL: &apis.URL{ + Scheme: "http", + Host: network.GetServiceHostname(sink.GetName()+"-deployment", sink.GetNamespace()), + }, + } + return httpAddress +} + +func (r *Reconciler) httpsAddress(certs *string, sink *sinks.IntegrationSink) duckv1.Addressable { + addr := r.httpAddress(sink) + addr.URL.Scheme = "https" + addr.CACerts = certs + return addr +} + +func (r *Reconciler) podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool { + if !equality.Semantic.DeepDerivative(newPodSpec, oldPodSpec) { + return true + } + if len(oldPodSpec.Containers) != len(newPodSpec.Containers) { + return true + } + for i := range newPodSpec.Containers { + if !equality.Semantic.DeepEqual(newPodSpec.Containers[i].Env, oldPodSpec.Containers[i].Env) { + return true + } + } + return false +} diff --git a/pkg/reconciler/integration/sink/integrationsink_test.go b/pkg/reconciler/integration/sink/integrationsink_test.go new file mode 100644 index 00000000000..501332e1161 --- /dev/null +++ b/pkg/reconciler/integration/sink/integrationsink_test.go @@ -0,0 +1,322 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sink + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/network" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + clientgotesting "k8s.io/client-go/testing" + sinksv1alpha1 "knative.dev/eventing/pkg/apis/sinks/v1alpha1" + fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake" + "knative.dev/eventing/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink" + "knative.dev/eventing/pkg/reconciler/integration/sink/resources" + fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake" + "knative.dev/pkg/kmeta" + "knative.dev/pkg/logging" + + "context" + + . "knative.dev/eventing/pkg/reconciler/testing/v1" + . "knative.dev/eventing/pkg/reconciler/testing/v1alpha1" + + "knative.dev/pkg/client/injection/ducks/duck/v1/addressable" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + logtesting "knative.dev/pkg/logging/testing" + . "knative.dev/pkg/reconciler/testing" + + "testing" +) + +const ( + // testNamespace is the namespace used for testing. + sinkName = "test-integration-sink" + sinkUID = "1234-5678-90" + testNS = "test-namespace" +) + +var ( + conditionTrue = corev1.ConditionTrue + deploymentName = fmt.Sprintf("%s-deployment", sinkName) + + sinkAddressable = duckv1.Addressable{ + Name: ptr.To("http"), + URL: &apis.URL{ + Scheme: "http", + Host: network.GetServiceHostname(deploymentName, testNS), + }, + } +) + +func TestReconcile(t *testing.T) { + + table := TableTest{ + { + Name: "bad work queue key", + Key: "too/many/parts", + }, + { + Name: "key not found", + // Make sure Reconcile handles good keys that don't exist. + Key: "foo/not-found", + }, { + Name: "error creating deployment", + Objects: []runtime.Object{ + NewIntegrationSink(sinkName, testNS, + WithIntegrationSinkUID(sinkUID), + WithIntegrationSinkSpec(makeIntegrationSinkSpec()), + ), + }, + Key: testNS + "/" + sinkName, + WithReactors: []clientgotesting.ReactionFunc{ + InduceFailure("create", "deployments"), + }, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, "InternalError", "creating new Deployment: inducing failure for %s %s", "create", "deployments"), + }, + WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewIntegrationSink(sinkName, testNS, + WithIntegrationSinkUID(sinkUID), + WithIntegrationSinkSpec(makeIntegrationSinkSpec()), + WithInitIntegrationSinkConditions, + ), + }}, + WantCreates: []runtime.Object{ + makeDeployment(NewIntegrationSink(sinkName, testNS, + WithIntegrationSinkUID(sinkUID), + WithIntegrationSinkSpec(makeIntegrationSinkSpec())), + nil), + }, + }, { + Name: "successfully reconciled and ready", + Objects: []runtime.Object{ + NewIntegrationSink(sinkName, testNS, + WithIntegrationSinkUID(sinkUID), + WithIntegrationSinkSpec(makeIntegrationSinkSpec()), + ), + makeDeployment(NewIntegrationSink(sinkName, testNS, + WithIntegrationSinkUID(sinkUID), + WithIntegrationSinkSpec(makeIntegrationSinkSpec())), + &conditionTrue), + makeService(deploymentName, testNS), + }, + Key: testNS + "/" + sinkName, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, sinkReconciled, `IntegrationSink reconciled: "%s/%s"`, testNS, sinkName), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewIntegrationSink(sinkName, testNS, + WithIntegrationSinkUID(sinkUID), + WithIntegrationSinkAddressableReady(), + WithIntegrationSinkAddress(&sinkAddressable), + WithIntegrationSinkSpec(makeIntegrationSinkSpec()), + WithIntegrationSinkEventPoliciesReadyBecauseOIDCDisabled(), + WithInitIntegrationSinkConditions, + WithIntegrationSinkPropagateDeploymenteStatus(makeDeploymentStatus(&conditionTrue)), + ), + }}, + }} + + logger := logtesting.TestLogger(t) + table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { + ctx = addressable.WithDuck(ctx) + r := &Reconciler{ + kubeClientSet: fakekubeclient.Get(ctx), + deploymentLister: listers.GetDeploymentLister(), + serviceLister: listers.GetServiceLister(), + secretLister: listers.GetSecretLister(), + eventPolicyLister: listers.GetEventPolicyLister(), + systemNamespace: testNS, + } + + return integrationsink.NewReconciler(ctx, logging.FromContext(ctx), fakeeventingclient.Get(ctx), listers.GetIntegrationSinkLister(), controller.GetEventRecorder(ctx), r) + }, + true, + logger, + )) +} + +func makeDeployment(sink *sinksv1alpha1.IntegrationSink, ready *corev1.ConditionStatus) runtime.Object { + + status := appsv1.DeploymentStatus{} + if ready != nil { + status.Conditions = []appsv1.DeploymentCondition{ + { + Type: appsv1.DeploymentAvailable, + Status: *ready, + }, + } + if *ready == corev1.ConditionTrue { + status.ReadyReplicas = 1 + } + } + + d := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: deploymentName, + Namespace: sink.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(sink), + }, + Labels: resources.Labels(sink.Name), + }, + Status: status, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: resources.Labels(sink.Name), + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: resources.Labels(sink.Name), + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "sink", + Image: "gcr.io/knative-nightly/log-sink:latest", + ImagePullPolicy: corev1.PullIfNotPresent, + Ports: []corev1.ContainerPort{{ + ContainerPort: 8080, + Protocol: corev1.ProtocolTCP, + Name: "http", + }}, + Env: []corev1.EnvVar{ + { + Name: "CAMEL_KAMELET_LOG_SINK_LEVEL", + Value: "info", + }, + { + Name: "CAMEL_KAMELET_LOG_SINK_LOGMASK", + Value: "false", + }, + { + Name: "CAMEL_KAMELET_LOG_SINK_MULTILINE", + Value: "false", + }, + { + Name: "CAMEL_KAMELET_LOG_SINK_SHOWALLPROPERTIES", + Value: "false", + }, + { + Name: "CAMEL_KAMELET_LOG_SINK_SHOWBODY", + Value: "true", + }, + { + Name: "CAMEL_KAMELET_LOG_SINK_SHOWBODYTYPE", + Value: "true", + }, + { + Name: "CAMEL_KAMELET_LOG_SINK_SHOWEXCHANGEPATTERN", + Value: "false", + }, + { + Name: "CAMEL_KAMELET_LOG_SINK_SHOWHEADERS", + Value: "true", + }, + { + Name: "CAMEL_KAMELET_LOG_SINK_SHOWPROPERTIES", + Value: "false", + }, + { + Name: "CAMEL_KAMELET_LOG_SINK_SHOWSTREAMS", + Value: "false", + }, + { + Name: "CAMEL_KAMELET_LOG_SINK_SHOWCACHEDSTREAMS", + Value: "false", + }, + }, + }, + }, + }, + }, + }, + } + return d +} + +func makeService(name, namespace string) *corev1.Service { + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: resources.Labels(sinkName), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "sinks.knative.dev/v1alpha1", + Kind: "IntegrationSink", + Name: sinkName, + UID: sinkUID, + Controller: ptr.To(true), + BlockOwnerDeletion: ptr.To(true), + }, + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "http", + Protocol: corev1.ProtocolTCP, + Port: 80, + TargetPort: intstr.IntOrString{IntVal: 8080}, + }, + }, + Selector: resources.Labels(sinkName), + }, + } +} + +func makeIntegrationSinkSpec() sinksv1alpha1.IntegrationSinkSpec { + return sinksv1alpha1.IntegrationSinkSpec{ + Log: &sinksv1alpha1.Log{ + Level: "info", + ShowHeaders: true, + ShowBody: true, + ShowBodyType: true, + }, + } +} + +func makeDeploymentStatus(ready *corev1.ConditionStatus) *appsv1.DeploymentStatus { + return &appsv1.DeploymentStatus{ + Conditions: []appsv1.DeploymentCondition{{ + Type: appsv1.DeploymentAvailable, + Status: *ready, + }}, + Replicas: 1, + } +} diff --git a/pkg/reconciler/integration/sink/resources/container_image.go b/pkg/reconciler/integration/sink/resources/container_image.go new file mode 100644 index 00000000000..1d6928429b8 --- /dev/null +++ b/pkg/reconciler/integration/sink/resources/container_image.go @@ -0,0 +1,162 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "knative.dev/eventing/pkg/apis/sinks/v1alpha1" + "knative.dev/eventing/pkg/reconciler/integration" + "knative.dev/pkg/kmeta" +) + +func MakeDeploymentSpec(sink *v1alpha1.IntegrationSink) *appsv1.Deployment { + + deploy := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: DeploymentName(sink), + Namespace: sink.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(sink), + }, + Labels: Labels(sink.Name), + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: Labels(sink.Name), + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: Labels(sink.Name), + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "sink", + Image: selectImage(sink), + ImagePullPolicy: corev1.PullIfNotPresent, + Ports: []corev1.ContainerPort{{ + ContainerPort: 8080, + Protocol: corev1.ProtocolTCP, + Name: "http", + }}, + Env: makeEnv(sink), + }, + }, + }, + }, + }, + } + + return deploy +} + +func MakeService(sink *v1alpha1.IntegrationSink) *corev1.Service { + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: DeploymentName(sink), + Namespace: sink.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(sink), + }, + Labels: Labels(sink.Name), + }, + Spec: corev1.ServiceSpec{ + Selector: Labels(sink.Name), + Ports: []corev1.ServicePort{ + { + Name: "http", + Protocol: corev1.ProtocolTCP, + Port: 80, + TargetPort: intstr.IntOrString{IntVal: 8080}, + }, + }, + }, + } +} + +func DeploymentName(sink *v1alpha1.IntegrationSink) string { + return kmeta.ChildName(sink.Name, "-deployment") +} + +func makeEnv(sink *v1alpha1.IntegrationSink) []corev1.EnvVar { + var envVars []corev1.EnvVar + + // Log environment variables + if sink.Spec.Log != nil { + envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_LOG_SINK", *sink.Spec.Log)...) + return envVars + } + + // Handle secret name only if AWS is configured + var secretName string + if sink.Spec.Aws != nil && sink.Spec.Aws.Auth != nil && sink.Spec.Aws.Auth.Secret != nil && sink.Spec.Aws.Auth.Secret.Ref != nil { + secretName = sink.Spec.Aws.Auth.Secret.Ref.Name + } + + // AWS S3 environment variables + if sink.Spec.Aws != nil && sink.Spec.Aws.S3 != nil { + envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_S3_SINK", *sink.Spec.Aws.S3)...) + if secretName != "" { + envVars = append(envVars, []corev1.EnvVar{ + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SINK_ACCESSKEY", "aws.accessKey", secretName), + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SINK_SECRETKEY", "aws.secretKey", secretName), + }...) + } + return envVars + } + + // AWS SQS environment variables + if sink.Spec.Aws != nil && sink.Spec.Aws.SQS != nil { + envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_SQS_SINK", *sink.Spec.Aws.SQS)...) + if secretName != "" { + envVars = append(envVars, []corev1.EnvVar{ + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SINK_ACCESSKEY", "aws.accessKey", secretName), + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SINK_SECRETKEY", "aws.secretKey", secretName), + }...) + } + return envVars + } + + // If no valid configuration is found, return empty envVars + return envVars +} + +func selectImage(sink *v1alpha1.IntegrationSink) string { + if sink.Spec.Log != nil { + return "gcr.io/knative-nightly/log-sink:latest" + } + if sink.Spec.Aws != nil { + if sink.Spec.Aws.S3 != nil { + return "gcr.io/knative-nightly/aws-s3-source:latest" + } + if sink.Spec.Aws.SQS != nil { + return "gcr.io/knative-nightly/aws-sqs-source:latest" + } + } + return "" +} diff --git a/pkg/reconciler/integration/sink/resources/labels.go b/pkg/reconciler/integration/sink/resources/labels.go new file mode 100644 index 00000000000..7a06aadb81c --- /dev/null +++ b/pkg/reconciler/integration/sink/resources/labels.go @@ -0,0 +1,24 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +func Labels(name string) map[string]string { + return map[string]string{ + + "app.kubernetes.io/name": name, + } +} diff --git a/pkg/reconciler/integration/source/integrationsource_test.go b/pkg/reconciler/integration/source/integrationsource_test.go index d500c0cccbb..eee019f041d 100644 --- a/pkg/reconciler/integration/source/integrationsource_test.go +++ b/pkg/reconciler/integration/source/integrationsource_test.go @@ -79,7 +79,8 @@ func TestReconcile(t *testing.T) { Name: "key not found", // Make sure Reconcile handles good keys that don't exist. Key: "foo/not-found", - }, { + }, + { Name: "error creating containersource", Objects: []runtime.Object{ NewIntegrationSource(sourceName, testNS, @@ -160,12 +161,12 @@ func TestReconcile(t *testing.T) { ), }}, }} - logger := logtesting.TestLogger(t) table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { ctx = addressable.WithDuck(ctx) r := &Reconciler{ + kubeClientSet: fakekubeclient.Get(ctx), eventingClientSet: fakeeventingclient.Get(ctx), containerSourceLister: listers.GetContainerSourceLister(), diff --git a/pkg/reconciler/testing/v1/listers.go b/pkg/reconciler/testing/v1/listers.go index a6d0733136a..3ba7f5c833b 100644 --- a/pkg/reconciler/testing/v1/listers.go +++ b/pkg/reconciler/testing/v1/listers.go @@ -124,6 +124,10 @@ func (l *Listers) GetEventPolicyLister() eventingv1alpha1listers.EventPolicyList return eventingv1alpha1listers.NewEventPolicyLister(l.indexerFor(&eventingv1alpha1.EventPolicy{})) } +func (l *Listers) GetIntegrationSinkLister() sinkslisters.IntegrationSinkLister { + return sinkslisters.NewIntegrationSinkLister(l.indexerFor(&sinksv1alpha1.IntegrationSink{})) +} + func (l *Listers) GetJobSinkLister() sinkslisters.JobSinkLister { return sinkslisters.NewJobSinkLister(l.indexerFor(&sinksv1alpha1.JobSink{})) } diff --git a/pkg/reconciler/testing/v1alpha1/integrationsink.go b/pkg/reconciler/testing/v1alpha1/integrationsink.go new file mode 100644 index 00000000000..eab21bd5132 --- /dev/null +++ b/pkg/reconciler/testing/v1alpha1/integrationsink.go @@ -0,0 +1,88 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + + "knative.dev/eventing/pkg/apis/feature" + duckv1 "knative.dev/pkg/apis/duck/v1" + + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "knative.dev/eventing/pkg/apis/sinks/v1alpha1" +) + +// IntegrationSinkOption enables further configuration of a IntegrationSink. +type IntegrationSinkOption func(source *v1alpha1.IntegrationSink) + +// NewIntegrationSink creates a v1 IntegrationSink with IntegrationSinkOptions +func NewIntegrationSink(name, namespace string, o ...IntegrationSinkOption) *v1alpha1.IntegrationSink { + s := &v1alpha1.IntegrationSink{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + for _, opt := range o { + opt(s) + } + s.SetDefaults(context.Background()) + return s +} + +func WithIntegrationSinkUID(uid types.UID) IntegrationSinkOption { + return func(s *v1alpha1.IntegrationSink) { + s.UID = uid + } +} + +// WithInitIntegrationSinkConditions initializes the IntegrationSink's conditions. +func WithInitIntegrationSinkConditions(s *v1alpha1.IntegrationSink) { + s.Status.InitializeConditions() +} + +func WithIntegrationSinkPropagateDeploymenteStatus(status *appsv1.DeploymentStatus) IntegrationSinkOption { + return func(s *v1alpha1.IntegrationSink) { + s.Status.PropagateDeploymentStatus(status) + } +} + +func WithIntegrationSinkAddressableReady() IntegrationSinkOption { + return func(s *v1alpha1.IntegrationSink) { + s.Status.MarkAddressableReady() + } +} + +func WithIntegrationSinkAddress(addr *duckv1.Addressable) IntegrationSinkOption { + return func(s *v1alpha1.IntegrationSink) { + s.Status.SetAddress(addr) + } +} + +func WithIntegrationSinkSpec(spec v1alpha1.IntegrationSinkSpec) IntegrationSinkOption { + return func(s *v1alpha1.IntegrationSink) { + s.Spec = spec + } +} + +func WithIntegrationSinkEventPoliciesReadyBecauseOIDCDisabled() IntegrationSinkOption { + return func(s *v1alpha1.IntegrationSink) { + s.Status.MarkEventPoliciesTrueWithReason("OIDCDisabled", "Feature %q must be enabled to support Authorization", feature.OIDCAuthentication) + } +} diff --git a/test/rekt/features/integrationsink/features.go b/test/rekt/features/integrationsink/features.go new file mode 100644 index 00000000000..b80628412d6 --- /dev/null +++ b/test/rekt/features/integrationsink/features.go @@ -0,0 +1,97 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integrationsink + +import ( + "time" + + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" + "knative.dev/eventing/test/rekt/features/featureflags" + "knative.dev/eventing/test/rekt/resources/addressable" + "knative.dev/eventing/test/rekt/resources/integrationsink" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" +) + +func Success() *feature.Feature { + f := feature.NewFeature() + + // sink := feature.MakeRandomK8sName("sink") + integrationSink := feature.MakeRandomK8sName("integrationsink") + source := feature.MakeRandomK8sName("source") + + event := cetest.FullEvent() + event.SetID(uuid.NewString()) + + f.Setup("install integration sink", integrationsink.Install(integrationSink)) + + f.Setup("integrationsink is addressable", integrationsink.IsAddressable(integrationSink)) + f.Setup("integrationsink is ready", integrationsink.IsReady(integrationSink)) + + f.Requirement("install source for ksink", eventshub.Install(source, + eventshub.StartSenderToResource(integrationsink.GVR(), integrationSink), + eventshub.InputEvent(cetest.FullEvent()), + eventshub.AddSequence, + eventshub.SendMultipleEvents(2, time.Millisecond))) + + f.Assert("Source sent the event", assert.OnStore(source). + Match(assert.MatchKind(eventshub.EventResponse)). + Match(assert.MatchStatusCode(204)). + AtLeast(1), + ) + + return f +} + +func SuccessTLS() *feature.Feature { + f := feature.NewFeature() + + // sink := feature.MakeRandomK8sName("sink") + integrationSink := feature.MakeRandomK8sName("integrationsink") + source := feature.MakeRandomK8sName("source") + + //sinkURL := &apis.URL{Scheme: "http", Host: sink} + + event := cetest.FullEvent() + event.SetID(uuid.NewString()) + + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + f.Setup("install integration sink", integrationsink.Install(integrationSink)) //, integrationsink.WithForwarderJob(sinkURL.String()))) + + f.Setup("integrationsink is addressable", integrationsink.IsAddressable(integrationSink)) + f.Setup("integrationsink is ready", integrationsink.IsReady(integrationSink)) + + f.Requirement("install source for ksink", eventshub.Install(source, + eventshub.StartSenderToResource(integrationsink.GVR(), integrationSink), + eventshub.InputEvent(cetest.FullEvent()), + eventshub.AddSequence, + eventshub.SendMultipleEvents(2, time.Millisecond))) + + f.Assert("IntegrationSink has https address", addressable.ValidateAddress(integrationsink.GVR(), integrationSink, addressable.AssertHTTPSAddress)) + + f.Assert("Source sent the event", assert.OnStore(source). + Match(assert.MatchKind(eventshub.EventResponse)). + Match(assert.MatchStatusCode(204)). + AtLeast(1), + ) + + return f +} diff --git a/test/rekt/integration_sink_test.go b/test/rekt/integration_sink_test.go new file mode 100644 index 00000000000..a05b38a1995 --- /dev/null +++ b/test/rekt/integration_sink_test.go @@ -0,0 +1,44 @@ +//go:build e2e +// +build e2e + +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rekt + +import ( + "testing" + + "knative.dev/eventing/test/rekt/features/integrationsink" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" +) + +func TestIntegrationSinkSuccess(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.Test(ctx, t, integrationsink.Success()) +} diff --git a/test/rekt/resources/integrationsink/integrationsink.go b/test/rekt/resources/integrationsink/integrationsink.go new file mode 100644 index 00000000000..6ed054918f7 --- /dev/null +++ b/test/rekt/resources/integrationsink/integrationsink.go @@ -0,0 +1,68 @@ +package integrationsink + +import ( + "context" + "embed" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" + "knative.dev/reconciler-test/pkg/manifest" +) + +//go:embed integrationsink.yaml +var yamlEmbed embed.FS + +func GVR() schema.GroupVersionResource { + return schema.GroupVersionResource{Group: "sinks.knative.dev", Version: "v1alpha1", Resource: "integrationsinks"} +} + +// WithAnnotations adds annotations to the IntegrationSink. +func WithAnnotations(annotations map[string]interface{}) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if annotations != nil { + cfg["annotations"] = annotations + } + } +} + +// Install will create a resource, augmented with the config fn options. +func Install(name string, opts ...manifest.CfgFn) feature.StepFn { + + return func(ctx context.Context, t feature.T) { + cfg := map[string]interface{}{ + "name": name, + "namespace": environment.FromContext(ctx).Namespace(), + "image": eventshub.ImageFromContext(ctx), + eventshub.ConfigLoggingEnv: knative.LoggingConfigFromContext(ctx), + eventshub.ConfigTracingEnv: knative.TracingConfigFromContext(ctx), + } + for _, fn := range opts { + fn(cfg) + } + + if _, err := manifest.InstallYamlFS(ctx, yamlEmbed, cfg); err != nil { + t.Fatal(err) + } + } +} + +// IsReady tests to see if a IntegrationSink becomes ready within the time given. +func IsReady(name string, timing ...time.Duration) feature.StepFn { + return k8s.IsReady(GVR(), name, timing...) +} + +// IsNotReady tests to see if a IntegrationSink becomes NotReady within the time given. +func IsNotReady(name string, timing ...time.Duration) feature.StepFn { + return k8s.IsNotReady(GVR(), name, timing...) +} + +// IsAddressable tests to see if a IntegrationSink becomes addressable within the time +// given. +func IsAddressable(name string, timings ...time.Duration) feature.StepFn { + return k8s.IsAddressable(GVR(), name, timings...) +} diff --git a/test/rekt/resources/integrationsink/integrationsink.yaml b/test/rekt/resources/integrationsink/integrationsink.yaml new file mode 100644 index 00000000000..b19cfe7eaab --- /dev/null +++ b/test/rekt/resources/integrationsink/integrationsink.yaml @@ -0,0 +1,27 @@ +# Copyright 2024 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: sinks.knative.dev/v1alpha1 +kind: IntegrationSink +metadata: + name: {{ .name }} + namespace: {{ .namespace }} + {{ if .annotations }} + {{ range $key, $value := .annotations }} + {{ $key }}: {{ $value }} + {{ end }} + {{ end }} +spec: + log: + showHeaders: true