Skip to content

Commit

Permalink
Merge pull request #73 from alecmerdler/broadcaster
Browse files Browse the repository at this point in the history
Ensure given event broadcaster is used by manager
  • Loading branch information
alecmerdler authored Jan 17, 2025
2 parents 8224053 + 96526e6 commit f26fe5d
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 5 deletions.
74 changes: 73 additions & 1 deletion manager/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package manager
import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
ctrlmanageropts "k8s.io/controller-manager/options"
Expand Down Expand Up @@ -52,7 +56,7 @@ func TestControllerQueueDone(t *testing.T) {
CtxQueue := queue.NewQueueOperationsCtx()
registry := typed.NewRegistry()
broadcaster := record.NewBroadcaster()
eventSink := &typedcorev1.EventSinkImpl{Interface: fake.NewSimpleClientset().CoreV1().Events("")}
eventSink := newFakeEventSink()

controller := NewOwnedResourceController(klogr.New(), "my-controller", gvr, CtxQueue, registry, broadcaster, func(_ context.Context, gvr schema.GroupVersionResource, namespace, name string) {
fmt.Println("processing", gvr, namespace, name)
Expand Down Expand Up @@ -81,3 +85,71 @@ func TestControllerQueueDone(t *testing.T) {
return controller.Queue.Len() == 0
}, 1*time.Second, 1*time.Millisecond)
}

func TestControllerEventsBroadcast(t *testing.T) {
gvr := schema.GroupVersionResource{
Group: "example.com",
Version: "v1",
Resource: "mytypes",
}
CtxQueue := queue.NewQueueOperationsCtx()
registry := typed.NewRegistry()
broadcaster := record.NewBroadcaster()
eventSink := newFakeEventSink()
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "my-controller"})

controller := NewOwnedResourceController(klogr.New(), "my-controller", gvr, CtxQueue, registry, broadcaster, func(_ context.Context, gvr schema.GroupVersionResource, namespace, name string) {
fmt.Println("processing", gvr, namespace, name)
})

mgr := NewManager(ctrlmanageropts.RecommendedDebuggingOptions().DebuggingConfiguration, ":8888", broadcaster, eventSink)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
_ = mgr.Start(ctx, controller)
}()
require.Eventually(t, healthCheckPassing(), 1*time.Second, 50*time.Millisecond)

recorder.Event(&v1.ObjectReference{Namespace: "test", Name: "a"}, v1.EventTypeNormal, "test", "test")

require.Eventually(t, func() bool {
return len(eventSink.Events) > 0
}, 5*time.Second, 1*time.Millisecond)
}

type fakeEventSink struct {
Events map[types.UID]*v1.Event
}

func newFakeEventSink() *fakeEventSink {
return &fakeEventSink{
Events: make(map[types.UID]*v1.Event),
}
}

func (f *fakeEventSink) Create(event *v1.Event) (*v1.Event, error) {
f.Events[event.UID] = event
return event, nil
}

func (f *fakeEventSink) Update(event *v1.Event) (*v1.Event, error) {
f.Events[event.UID] = event
return event, nil
}

func (f *fakeEventSink) Patch(oldEvent *v1.Event, _ []byte) (*v1.Event, error) {
f.Events[oldEvent.UID] = oldEvent
return oldEvent, nil
}

func healthCheckPassing() func() bool {
return func() bool {
resp, err := http.Get("http://localhost:8888/healthz")
if err != nil {
return false
}
defer resp.Body.Close()

return resp.StatusCode == http.StatusOK
}
}
6 changes: 2 additions & 4 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ func (m *Manager) Start(ctx context.Context, controllers ...Controller) error {
}
m.RUnlock()

broadcaster := record.NewBroadcaster()

m.once.Do(func() {
m.Lock()
m.errG, ctx = errgroup.WithContext(ctx)
Expand All @@ -88,8 +86,8 @@ func (m *Manager) Start(ctx context.Context, controllers ...Controller) error {

// start broadcaster
m.errG.Go(func() error {
broadcaster.StartStructuredLogging(2)
broadcaster.StartRecordingToSink(m.sink)
m.broadcaster.StartStructuredLogging(2)
m.broadcaster.StartRecordingToSink(m.sink)
return nil
})

Expand Down

0 comments on commit f26fe5d

Please sign in to comment.