Skip to content

Commit

Permalink
Enable status subresource eventing CRDs (#699)
Browse files Browse the repository at this point in the history
* Fixes #697 enable status subresource

I've enabled it on all CRDs so that the metadata.generation will start to be
incremented by the Kubernetes API server

* Refetch the subscription prior to updating it's status

We only do this if the finalizer's have changed

* allow provisioners to access the channels/status endpoint

* include comment about updateStatus method might change a subscription's finalizers

* rename newSubscription to latestSubscription

The intent is to convey that we're only changing finalizers and the status
  • Loading branch information
dprotaso authored and Evan Anderson committed Jan 9, 2019
1 parent b2c2e52 commit 4712e3a
Show file tree
Hide file tree
Showing 18 changed files with 191 additions and 51 deletions.
2 changes: 2 additions & 0 deletions config/300-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,5 @@ spec:
shortNames:
- chan
scope: Namespaced
subresources:
status: {}
4 changes: 4 additions & 0 deletions config/300-clusterchannelprovisioner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,7 @@ spec:
shortNames:
- ccp
scope: Cluster
# This is done so that metadata.generation will start incrementing
# in Kubernetes v1.11+
subresources:
status: {}
2 changes: 2 additions & 0 deletions config/300-subscription.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,5 @@ spec:
shortNames:
- sub
scope: Namespaced
subresources:
status: {}
2 changes: 2 additions & 0 deletions config/provisioners/gcppubsub/gcppubsub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ rules:
- eventing.knative.dev
resources:
- channels
- channels/status
- clusterchannelprovisioners
verbs:
- get
Expand Down Expand Up @@ -137,6 +138,7 @@ rules:
- eventing.knative.dev
resources:
- channels
- channels/status
verbs:
- get
- list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ rules:
- eventing.knative.dev
resources:
- channels
- channels/status
- clusterchannelprovisioners
verbs:
- get
Expand Down
1 change: 1 addition & 0 deletions config/provisioners/kafka/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rules:
- eventing.knative.dev
resources:
- channels
- channels/status
- clusterchannelprovisioners
verbs:
- get
Expand Down
1 change: 1 addition & 0 deletions config/provisioners/natss/provisioner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ rules:
- eventing.knative.dev
resources:
- channels
- channels/status
- clusterchannelprovisioners
verbs:
- get
Expand Down
13 changes: 7 additions & 6 deletions pkg/apis/eventing/v1alpha1/channel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
)

// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Channel is an abstract resource that implements the Addressable contract.
Expand Down Expand Up @@ -57,12 +56,14 @@ var _ webhook.GenericCRD = (*Channel)(nil)
// ChannelSpec specifies the Provisioner backing a channel and the configuration
// arguments for a Channel.
type ChannelSpec struct {
// TODO: Generation does not work correctly with CRD. They are scrubbed
// by the APIserver (https://github.com/kubernetes/kubernetes/issues/58778)
// So, we add Generation here. Once that gets fixed, remove this and use
// ObjectMeta.Generation instead.
// TODO By enabling the status subresource metadata.generation should increment
// thus making this property obsolete.
//
// We should be able to drop this property with a CRD conversion webhook
// in the future
//
// +optional
Generation int64 `json:"generation,omitempty"`
DeprecatedGeneration int64 `json:"generation,omitempty"`

// Provisioner defines the name of the Provisioner backing this channel.
Provisioner *corev1.ObjectReference `json:"provisioner,omitempty"`
Expand Down
12 changes: 7 additions & 5 deletions pkg/apis/eventing/v1alpha1/cluster_channel_provisioner_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ var _ webhook.GenericCRD = (*ClusterChannelProvisioner)(nil)

// ClusterChannelProvisionerSpec is the spec for a ClusterChannelProvisioner resource.
type ClusterChannelProvisionerSpec struct {
// TODO: Generation does not work correctly with CRD. They are scrubbed
// by the APIserver (https://github.com/kubernetes/kubernetes/issues/58778)
// So, we add Generation here. Once that gets fixed, remove this and use
// ObjectMeta.Generation instead.
// TODO By enabling the status subresource metadata.generation should increment
// thus making this property obsolete.
//
// We should be able to drop this property with a CRD conversion webhook
// in the future
//
// +optional
Generation int64 `json:"generation,omitempty"`
DeprecatedGeneration int64 `json:"generation,omitempty"`
}

var ccProvCondSet = duckv1alpha1.NewLivingConditionSet()
Expand Down
12 changes: 7 additions & 5 deletions pkg/apis/eventing/v1alpha1/subscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ var _ webhook.GenericCRD = (*Subscription)(nil)
// no-op function (identity transformation):
// channel --> reply
type SubscriptionSpec struct {
// TODO: Generation used to not work correctly with CRD. They were scrubbed
// by the APIserver (https://github.com/kubernetes/kubernetes/issues/58778)
// So, we add Generation here. Once the above bug gets rolled out to production
// clusters, remove this and use ObjectMeta.Generation instead.
// TODO By enabling the status subresource metadata.generation should increment
// thus making this property obsolete.
//
// We should be able to drop this property with a CRD conversion webhook
// in the future
//
// +optional
Generation int64 `json:"generation,omitempty"`
DeprecatedGeneration int64 `json:"generation,omitempty"`

// Reference to a channel that will be used to create the subscription
// for receiving events. The channel must have spec.subscriptions
Expand Down
17 changes: 17 additions & 0 deletions pkg/client/clientset/versioned/typed/eventing/v1alpha1/channel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 24 additions & 2 deletions pkg/controller/eventing/inmemory/channel/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,19 @@ func TestReconcile(t *testing.T) {
MockUpdates: errorUpdatingChannel(),
},
WantErrMsg: testErrorMessage,
},
{
}, {
Name: "Channel status update fails",
InitialState: []runtime.Object{
makeChannel(),
makeConfigMap(),
makeK8sService(),
makeVirtualService(),
},
Mocks: controllertesting.Mocks{
MockStatusUpdates: errorUpdatingChannelStatus(),
},
WantErrMsg: testErrorMessage,
}, {
Name: "Channel reconcile successful - Channel list follows pagination",
InitialState: []runtime.Object{
makeChannel(),
Expand Down Expand Up @@ -756,6 +767,17 @@ func errorUpdatingChannel() []controllertesting.MockUpdate {
}
}

func errorUpdatingChannelStatus() []controllertesting.MockStatusUpdate {
return []controllertesting.MockStatusUpdate{
func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) {
if _, ok := obj.(*eventingv1alpha1.Channel); ok {
return controllertesting.Handled, errors.New(testErrorMessage)
}
return controllertesting.Unhandled, nil
},
}
}

func errorUpdatingConfigMap() []controllertesting.MockUpdate {
return []controllertesting.MockUpdate{
func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) {
Expand Down
41 changes: 24 additions & 17 deletions pkg/controller/eventing/subscription/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,36 +160,43 @@ func isNilOrEmptyReply(reply *v1alpha1.ReplyStrategy) bool {
return reply == nil || equality.Semantic.DeepEqual(reply, &v1alpha1.ReplyStrategy{})
}

// updateStatus may in fact update the subscription's finalizers in addition to the status
func (r *reconciler) updateStatus(subscription *v1alpha1.Subscription) (*v1alpha1.Subscription, error) {
newSubscription := &v1alpha1.Subscription{}
err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: subscription.Namespace, Name: subscription.Name}, newSubscription)
objectKey := client.ObjectKey{Namespace: subscription.Namespace, Name: subscription.Name}
latestSubscription := &v1alpha1.Subscription{}

if err != nil {
if err := r.client.Get(context.TODO(), objectKey, latestSubscription); err != nil {
return nil, err
}

updated := false
if !equality.Semantic.DeepEqual(newSubscription.Finalizers, subscription.Finalizers) {
newSubscription.SetFinalizers(subscription.ObjectMeta.Finalizers)
updated = true
subscriptionChanged := false

if !equality.Semantic.DeepEqual(latestSubscription.Finalizers, subscription.Finalizers) {
latestSubscription.SetFinalizers(subscription.ObjectMeta.Finalizers)
if err := r.client.Update(context.TODO(), latestSubscription); err != nil {
return nil, err
}
subscriptionChanged = true
}

if !equality.Semantic.DeepEqual(newSubscription.Status, subscription.Status) {
newSubscription.Status = subscription.Status
updated = true
if equality.Semantic.DeepEqual(latestSubscription.Status, subscription.Status) {
return latestSubscription, nil
}

if updated {
// Until #38113 is merged, we must use Update instead of UpdateStatus to
// update the Status block of the Subscription resource. UpdateStatus will not
// allow changes to the Spec of the resource, which is ideal for ensuring
// nothing other than resource status has been updated.
if err = r.client.Update(context.TODO(), newSubscription); err != nil {
if subscriptionChanged {
// Refetch
latestSubscription = &v1alpha1.Subscription{}
if err := r.client.Get(context.TODO(), objectKey, latestSubscription); err != nil {
return nil, err
}
}

return newSubscription, nil
latestSubscription.Status = subscription.Status
if err := r.client.Status().Update(context.TODO(), latestSubscription); err != nil {
return nil, err
}

return latestSubscription, nil
}

// resolveSubscriberSpec resolves the Spec.Call object. If it's an
Expand Down
35 changes: 29 additions & 6 deletions pkg/controller/testing/mock_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type MockList func(innerClient client.Client, ctx context.Context, opts *client.
type MockCreate func(innerClient client.Client, ctx context.Context, obj runtime.Object) (MockHandled, error)
type MockDelete func(innerClient client.Client, ctx context.Context, obj runtime.Object) (MockHandled, error)
type MockUpdate func(innerClient client.Client, ctx context.Context, obj runtime.Object) (MockHandled, error)
type MockStatusUpdate func(innerClient client.Client, ctx context.Context, obj runtime.Object) (MockHandled, error)

var _ client.Client = (*MockClient)(nil)

Expand All @@ -53,16 +54,21 @@ type MockClient struct {
mocks Mocks
}

type mockStatusWriter struct {
parent *MockClient
}

// The mocks to run on each function type. Each function will run through the mocks in its list
// until one responds with 'Handled'. If there is more than one mock in the list, then the one that
// responds 'Handled' will be removed and not run on subsequent calls to the function. If no mocks
// respond 'Handled', then the real underlying client is called.
type Mocks struct {
MockGets []MockGet
MockLists []MockList
MockCreates []MockCreate
MockDeletes []MockDelete
MockUpdates []MockUpdate
MockGets []MockGet
MockLists []MockList
MockCreates []MockCreate
MockDeletes []MockDelete
MockUpdates []MockUpdate
MockStatusUpdates []MockStatusUpdate
}

func NewMockClient(innerClient client.Client, mocks Mocks) *MockClient {
Expand Down Expand Up @@ -149,5 +155,22 @@ func (m *MockClient) Update(ctx context.Context, obj runtime.Object) error {
}

func (m *MockClient) Status() client.StatusWriter {
return m.innerClient.Status()
return &mockStatusWriter{
parent: m,
}
}

func (w *mockStatusWriter) Update(ctx context.Context, obj runtime.Object) error {
mocks := w.parent.mocks.MockStatusUpdates

for i, mock := range mocks {
handled, err := mock(w.parent.innerClient, ctx, obj)
if handled == Handled {
if len(w.parent.mocks.MockStatusUpdates) > 1 {
w.parent.mocks.MockStatusUpdates = append(mocks[:i], mocks[i+1:]...)
}
return err
}
}
return w.parent.innerClient.Status().Update(ctx, obj)
}
34 changes: 25 additions & 9 deletions pkg/provisioners/channel_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,26 +216,42 @@ func addExpectedLabels(actual, expected map[string]string) map[string]string {
}

func UpdateChannel(ctx context.Context, client runtimeClient.Client, u *eventingv1alpha1.Channel) error {
objectKey := runtimeClient.ObjectKey{Namespace: u.Namespace, Name: u.Name}
channel := &eventingv1alpha1.Channel{}
err := client.Get(ctx, runtimeClient.ObjectKey{Namespace: u.Namespace, Name: u.Name}, channel)
if err != nil {

if err := client.Get(ctx, objectKey, channel); err != nil {
return err
}

updated := false
channelChanged := false

if !equality.Semantic.DeepEqual(channel.Finalizers, u.Finalizers) {
channel.SetFinalizers(u.ObjectMeta.Finalizers)
updated = true
if err := client.Update(ctx, channel); err != nil {
return err
}

channelChanged = true
}

if !equality.Semantic.DeepEqual(channel.Status, u.Status) {
channel.Status = u.Status
updated = true
if equality.Semantic.DeepEqual(channel.Status, u.Status) {
return nil
}

if updated {
return client.Update(ctx, channel)
if channelChanged {
// Refetch
channel = &eventingv1alpha1.Channel{}
if err := client.Get(ctx, objectKey, channel); err != nil {
return err
}
}

channel.Status = u.Status

if err := client.Status().Update(ctx, channel); err != nil {
return err
}

return nil
}

Expand Down
Loading

0 comments on commit 4712e3a

Please sign in to comment.