diff --git a/internal/controllers/clusterclass/clusterclass_controller.go b/internal/controllers/clusterclass/clusterclass_controller.go index 9fc0c5a0ef11..8d4df7e89c3d 100644 --- a/internal/controllers/clusterclass/clusterclass_controller.go +++ b/internal/controllers/clusterclass/clusterclass_controller.go @@ -46,7 +46,9 @@ import ( "sigs.k8s.io/cluster-api/feature" tlog "sigs.k8s.io/cluster-api/internal/log" runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client" + runtimeregistry "sigs.k8s.io/cluster-api/internal/runtime/registry" "sigs.k8s.io/cluster-api/internal/topology/variables" + "sigs.k8s.io/cluster-api/internal/util/cache" "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/conditions" "sigs.k8s.io/cluster-api/util/conversion" @@ -67,6 +69,10 @@ type Reconciler struct { // RuntimeClient is a client for calling runtime extensions. RuntimeClient runtimeclient.Client + + // discoverVariablesCache is used to temporarily store the response of a DiscoveryVariables call for + // a specific runtime extension/settings combination. + discoverVariablesCache cache.Cache[runtimeclient.CallExtensionCacheEntry] } func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error { @@ -84,6 +90,8 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") } + + r.discoverVariablesCache = cache.New[runtimeclient.CallExtensionCacheEntry]() return nil } @@ -236,8 +244,13 @@ func (r *Reconciler) reconcileVariables(ctx context.Context, clusterClass *clust req := &runtimehooksv1.DiscoverVariablesRequest{} req.Settings = patch.External.Settings + // We temporarily cache the response of a DiscoveryVariables call to improve performance in case there are + // many ClusterClasses using the same runtime extension/settings combination. + // This also mitigates spikes when ClusterClass re-syncs happen or when changes to the ExtensionConfig are applied. + // DiscoverVariables is expected to return a "static" response and usually there are few ExtensionConfigs in a mgmt cluster. resp := &runtimehooksv1.DiscoverVariablesResponse{} - err := r.RuntimeClient.CallExtension(ctx, runtimehooksv1.DiscoverVariables, clusterClass, *patch.External.DiscoverVariablesExtension, req, resp) + err := r.RuntimeClient.CallExtension(ctx, runtimehooksv1.DiscoverVariables, clusterClass, *patch.External.DiscoverVariablesExtension, req, resp, + runtimeclient.WithCaching{Cache: r.discoverVariablesCache, CacheKeyFunc: cacheKeyFunc}) if err != nil { errs = append(errs, errors.Wrapf(err, "failed to call DiscoverVariables for patch %s", patch.Name)) continue @@ -460,3 +473,12 @@ func matchNamespace(ctx context.Context, c client.Client, selector labels.Select } return selector.Matches(labels.Set(ns.GetLabels())) } + +func cacheKeyFunc(registration *runtimeregistry.ExtensionRegistration, request runtimehooksv1.RequestObject) string { + // Note: registration.Name is identical to the value of the patch.External.DiscoverVariablesExtension field in the ClusterClass. + s := fmt.Sprintf("%s-%s", registration.Name, registration.ExtensionConfigResourceVersion) + for k, v := range request.GetSettings() { + s += fmt.Sprintf(",%s=%s", k, v) + } + return s +} diff --git a/internal/controllers/clusterclass/clusterclass_controller_test.go b/internal/controllers/clusterclass/clusterclass_controller_test.go index 803d50334d8b..0868772c544b 100644 --- a/internal/controllers/clusterclass/clusterclass_controller_test.go +++ b/internal/controllers/clusterclass/clusterclass_controller_test.go @@ -41,8 +41,10 @@ import ( runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1" "sigs.k8s.io/cluster-api/feature" tlog "sigs.k8s.io/cluster-api/internal/log" + runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client" fakeruntimeclient "sigs.k8s.io/cluster-api/internal/runtime/client/fake" "sigs.k8s.io/cluster-api/internal/test/builder" + "sigs.k8s.io/cluster-api/internal/util/cache" ) func TestClusterClassReconciler_reconcile(t *testing.T) { @@ -1193,7 +1195,8 @@ func TestReconciler_reconcileVariables(t *testing.T) { Build() r := &Reconciler{ - RuntimeClient: fakeRuntimeClient, + RuntimeClient: fakeRuntimeClient, + discoverVariablesCache: cache.New[runtimeclient.CallExtensionCacheEntry](), } err := r.reconcileVariables(ctx, tt.clusterClass) diff --git a/internal/controllers/topology/cluster/patches/external/external_patch_generator_test.go b/internal/controllers/topology/cluster/patches/external/external_patch_generator_test.go index 74e890eb34be..e8d301271c17 100644 --- a/internal/controllers/topology/cluster/patches/external/external_patch_generator_test.go +++ b/internal/controllers/topology/cluster/patches/external/external_patch_generator_test.go @@ -129,7 +129,7 @@ func (f *fakeRuntimeClient) CallAllExtensions(_ context.Context, _ runtimecatalo panic("implement me") } -func (f *fakeRuntimeClient) CallExtension(_ context.Context, _ runtimecatalog.Hook, _ metav1.Object, _ string, request runtimehooksv1.RequestObject, _ runtimehooksv1.ResponseObject) error { +func (f *fakeRuntimeClient) CallExtension(_ context.Context, _ runtimecatalog.Hook, _ metav1.Object, _ string, request runtimehooksv1.RequestObject, _ runtimehooksv1.ResponseObject, _ ...runtimeclient.CallExtensionOption) error { // Keep a copy of the request object. // We keep a copy because the request is modified after the call is made. So we keep a copy to perform assertions. f.callExtensionRequest = request.DeepCopyObject().(runtimehooksv1.RequestObject) diff --git a/internal/runtime/client/client.go b/internal/runtime/client/client.go index 005bc8605db9..b0cb562759ce 100644 --- a/internal/runtime/client/client.go +++ b/internal/runtime/client/client.go @@ -27,6 +27,7 @@ import ( "net/http" "net/url" "path" + "reflect" "strconv" "strings" "time" @@ -49,6 +50,7 @@ import ( runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1" runtimemetrics "sigs.k8s.io/cluster-api/internal/runtime/metrics" runtimeregistry "sigs.k8s.io/cluster-api/internal/runtime/registry" + "sigs.k8s.io/cluster-api/internal/util/cache" "sigs.k8s.io/cluster-api/util" ) @@ -96,7 +98,7 @@ type Client interface { CallAllExtensions(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject) error // CallExtension calls the ExtensionHandler with the given name. - CallExtension(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, name string, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject) error + CallExtension(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, name string, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject, opts ...CallExtensionOption) error } var _ Client = &client{} @@ -276,6 +278,44 @@ func aggregateSuccessfulResponses(aggregatedResponse runtimehooksv1.ResponseObje aggregatedResponse.SetMessage(strings.Join(messages, ", ")) } +// CallExtensionOption is the interface for configuration that modifies CallExtensionOptions for a CallExtension call. +type CallExtensionOption interface { + // ApplyToOptions applies this configuration to the given CallExtensionOptions. + ApplyToOptions(*CallExtensionOptions) +} + +// CallExtensionCacheEntry is a cache entry for the cache that can be used with the CallExtension call via +// the WithCaching option. +type CallExtensionCacheEntry struct { + CacheKey string + Response runtimehooksv1.ResponseObject +} + +// Key returns the cache key of a CallExtensionCacheEntry. +func (c CallExtensionCacheEntry) Key() string { + return c.CacheKey +} + +// WithCaching enables caching for the CallExtension call. +type WithCaching struct { + Cache cache.Cache[CallExtensionCacheEntry] + CacheKeyFunc func(*runtimeregistry.ExtensionRegistration, runtimehooksv1.RequestObject) string +} + +// ApplyToOptions applies WithCaching to the given CallExtensionOptions. +func (w WithCaching) ApplyToOptions(in *CallExtensionOptions) { + in.WithCaching = true + in.Cache = w.Cache + in.CacheKeyFunc = w.CacheKeyFunc +} + +// CallExtensionOptions contains the options for the CallExtension call. +type CallExtensionOptions struct { + WithCaching bool + Cache cache.Cache[CallExtensionCacheEntry] + CacheKeyFunc func(*runtimeregistry.ExtensionRegistration, runtimehooksv1.RequestObject) string +} + // CallExtension makes the call to the extension with the given name. // The response object passed will be updated with the response of the call. // An error is returned if the extension is not compatible with the hook. @@ -288,7 +328,13 @@ func aggregateSuccessfulResponses(aggregatedResponse runtimehooksv1.ResponseObje // Nb. FailurePolicy does not affect the following kinds of errors: // - Internal errors. Examples: hooks is incompatible with ExtensionHandler, ExtensionHandler information is missing. // - Error when ExtensionHandler returns a response with `Status` set to `Failure`. -func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, name string, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject) error { +func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, name string, request runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject, opts ...CallExtensionOption) error { + // Calculate the options. + options := &CallExtensionOptions{} + for _, opt := range opts { + opt.ApplyToOptions(options) + } + log := ctrl.LoggerFrom(ctx).WithValues("extensionHandler", name, "hook", runtimecatalog.HookName(hook)) ctx = ctrl.LoggerInto(ctx, log) hookGVH, err := c.catalog.GroupVersionHook(hook) @@ -331,7 +377,23 @@ func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, fo // Prepare the request by merging the settings in the registration with the settings in the request. request = cloneAndAddSettings(request, registration.Settings) - opts := &httpCallOptions{ + var cacheKey string + if options.WithCaching { + // Return a cached response if response is cached. + cacheKey = options.CacheKeyFunc(registration, request) + if cacheEntry, ok := options.Cache.Has(cacheKey); ok { + // Set response to cacheEntry.Response. + outVal := reflect.ValueOf(response) + cacheVal := reflect.ValueOf(cacheEntry.Response) + if !cacheVal.Type().AssignableTo(outVal.Type()) { + return fmt.Errorf("failed to call extension handler %q: cached response of type %s instead of type %s", name, cacheVal.Type(), outVal.Type()) + } + reflect.Indirect(outVal).Set(reflect.Indirect(cacheVal)) + return nil + } + } + + httpOpts := &httpCallOptions{ catalog: c.catalog, config: registration.ClientConfig, registrationGVH: registration.GroupVersionHook, @@ -339,7 +401,7 @@ func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, fo name: strings.TrimSuffix(registration.Name, "."+registration.ExtensionConfigName), timeout: timeoutDuration, } - err = httpCall(ctx, request, response, opts) + err = httpCall(ctx, request, response, httpOpts) if err != nil { // If the error is errCallingExtensionHandler then apply failure policy to calculate // the effective result of the operation. @@ -368,6 +430,14 @@ func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, fo log.V(4).Info("Extension handler returned success response") } + if options.WithCaching { + // Add response to the cache. + options.Cache.Add(CallExtensionCacheEntry{ + CacheKey: cacheKey, + Response: response, + }) + } + // Received a successful response from the extension handler. The `response` object // has been populated with the result. Return no error. return nil diff --git a/internal/runtime/client/client_test.go b/internal/runtime/client/client_test.go index 73ec5aa5cef1..5d868e36acea 100644 --- a/internal/runtime/client/client_test.go +++ b/internal/runtime/client/client_test.go @@ -44,6 +44,7 @@ import ( runtimeregistry "sigs.k8s.io/cluster-api/internal/runtime/registry" fakev1alpha1 "sigs.k8s.io/cluster-api/internal/runtime/test/v1alpha1" fakev1alpha2 "sigs.k8s.io/cluster-api/internal/runtime/test/v1alpha2" + "sigs.k8s.io/cluster-api/internal/util/cache" ) func TestClient_httpCall(t *testing.T) { @@ -543,6 +544,9 @@ func TestClient_CallExtension(t *testing.T) { fpIgnore := runtimev1.FailurePolicyIgnore validExtensionHandlerWithFailPolicy := runtimev1.ExtensionConfig{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "15", + }, Spec: runtimev1.ExtensionConfigSpec{ ClientConfig: runtimev1.ClientConfig{ // Set a fake URL, in test cases where we start the test server the URL will be overridden. @@ -566,6 +570,9 @@ func TestClient_CallExtension(t *testing.T) { }, } validExtensionHandlerWithIgnorePolicy := runtimev1.ExtensionConfig{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "15", + }, Spec: runtimev1.ExtensionConfigSpec{ ClientConfig: runtimev1.ClientConfig{ // Set a fake URL, in test cases where we start the test server the URL will be overridden. @@ -599,6 +606,7 @@ func TestClient_CallExtension(t *testing.T) { args args testServer testServerConfig wantErr bool + wantResponseCached bool }{ { name: "should fail when hook and request/response are not compatible", @@ -612,7 +620,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.SecondFakeRequest{}, response: &fakev1alpha1.SecondFakeResponse{}, }, - wantErr: true, + wantErr: true, + wantResponseCached: false, }, { name: "should fail when hook GVH does not match the registered ExtensionHandler", @@ -626,7 +635,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.SecondFakeRequest{}, response: &fakev1alpha1.SecondFakeResponse{}, }, - wantErr: true, + wantErr: true, + wantResponseCached: false, }, { name: "should fail if ExtensionHandler is not registered", @@ -643,7 +653,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: true, + wantErr: true, + wantResponseCached: false, }, { name: "should succeed when calling ExtensionHandler with success response and FailurePolicyFail", @@ -660,7 +671,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: false, + wantErr: false, + wantResponseCached: true, }, { name: "should succeed when calling ExtensionHandler with success response and FailurePolicyIgnore", @@ -677,7 +689,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: false, + wantErr: false, + wantResponseCached: true, }, { name: "should fail when calling ExtensionHandler with failure response and FailurePolicyFail", @@ -694,7 +707,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: true, + wantErr: true, + wantResponseCached: false, }, { name: "should fail when calling ExtensionHandler with failure response and FailurePolicyIgnore", @@ -711,7 +725,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: true, + wantErr: true, + wantResponseCached: false, }, { @@ -726,7 +741,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: false, + wantErr: false, + wantResponseCached: false, // Note: We only want to cache entirely successful responses. }, { name: "should fail with unreachable extension and FailurePolicyFail", @@ -740,7 +756,8 @@ func TestClient_CallExtension(t *testing.T) { request: &fakev1alpha1.FakeRequest{}, response: &fakev1alpha1.FakeResponse{}, }, - wantErr: true, + wantErr: true, + wantResponseCached: false, }, } @@ -748,8 +765,11 @@ func TestClient_CallExtension(t *testing.T) { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) + var serverCallCount int if tt.testServer.start { - srv := createSecureTestServer(tt.testServer) + srv := createSecureTestServer(tt.testServer, func() { + serverCallCount++ + }) srv.StartTLS() defer srv.Close() @@ -778,17 +798,58 @@ func TestClient_CallExtension(t *testing.T) { Namespace: "foo", }, } + // Call once without caching. err := c.CallExtension(context.Background(), tt.args.hook, obj, tt.args.name, tt.args.request, tt.args.response) + if tt.wantErr { + g.Expect(err).To(HaveOccurred()) + } else { + g.Expect(err).ToNot(HaveOccurred()) + } + // Call again with caching. + serverCallCount = 0 + cache := cache.New[CallExtensionCacheEntry]() + err = c.CallExtension(context.Background(), tt.args.hook, obj, tt.args.name, tt.args.request, tt.args.response, + WithCaching{Cache: cache, CacheKeyFunc: cacheKeyFunc}) if tt.wantErr { g.Expect(err).To(HaveOccurred()) } else { g.Expect(err).ToNot(HaveOccurred()) } + + if tt.wantResponseCached { + // When we expect the response to be cached we expect 1 call to the server. + g.Expect(serverCallCount).To(Equal(1)) + cacheEntry, isCached := cache.Has("valid-extension-15") + g.Expect(isCached).To(BeTrue()) + g.Expect(cacheEntry).ToNot(BeNil()) + + err = c.CallExtension(context.Background(), tt.args.hook, obj, tt.args.name, tt.args.request, tt.args.response, + WithCaching{Cache: cache, CacheKeyFunc: cacheKeyFunc}) + // When we expect the response to be cached we always expect no errors. + g.Expect(err).ToNot(HaveOccurred()) + // As the response is cached we expect no further calls to the server. + g.Expect(serverCallCount).To(Equal(1)) + cacheEntry, isCached = cache.Has("valid-extension-15") + g.Expect(isCached).To(BeTrue()) + g.Expect(cacheEntry).ToNot(BeNil()) + } else { + _, isCached := cache.Has("valid-extension-15") + g.Expect(isCached).To(BeFalse()) + } }) } } +func cacheKeyFunc(registration *runtimeregistry.ExtensionRegistration, request runtimehooksv1.RequestObject) string { + // Note: registration.Name is identical to the value of the name parameter passed into CallExtension. + s := fmt.Sprintf("%s-%s", registration.Name, registration.ExtensionConfigResourceVersion) + for k, v := range request.GetSettings() { + s += fmt.Sprintf(",%s=%s", k, v) + } + return s +} + func TestPrepareRequest(t *testing.T) { t.Run("request should have the correct settings", func(t *testing.T) { tests := []struct { @@ -1255,9 +1316,13 @@ func response(status runtimehooksv1.ResponseStatus) testServerResponse { } } -func createSecureTestServer(server testServerConfig) *httptest.Server { +func createSecureTestServer(server testServerConfig, callbacks ...func()) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + for _, callback := range callbacks { + callback() + } + // Write the response for the first match in tt.testServer.responses. for pathRegex, resp := range server.responses { if !regexp.MustCompile(pathRegex).MatchString(r.URL.Path) { diff --git a/internal/runtime/client/fake/fake_client.go b/internal/runtime/client/fake/fake_client.go index d1722a2d40a1..c37efe186e2f 100644 --- a/internal/runtime/client/fake/fake_client.go +++ b/internal/runtime/client/fake/fake_client.go @@ -119,7 +119,7 @@ func (fc *RuntimeClient) CallAllExtensions(ctx context.Context, hook runtimecata } // CallExtension implements Client. -func (fc *RuntimeClient) CallExtension(ctx context.Context, _ runtimecatalog.Hook, _ metav1.Object, name string, _ runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject) error { +func (fc *RuntimeClient) CallExtension(ctx context.Context, _ runtimecatalog.Hook, _ metav1.Object, name string, _ runtimehooksv1.RequestObject, response runtimehooksv1.ResponseObject, _ ...runtimeclient.CallExtensionOption) error { expectedResponse, ok := fc.callResponses[name] if !ok { // This should actually panic because an error here would mean a mistake in the test setup. diff --git a/internal/runtime/registry/registry.go b/internal/runtime/registry/registry.go index 1ed2a7407f95..1de4c1eebcfa 100644 --- a/internal/runtime/registry/registry.go +++ b/internal/runtime/registry/registry.go @@ -64,6 +64,9 @@ type ExtensionRegistration struct { // ExtensionConfigName is the name of the corresponding ExtensionConfig. ExtensionConfigName string + // ExtensionConfigResourceVersion is the ResourceVersion of the corresponding ExtensionConfig. + ExtensionConfigResourceVersion string + // GroupVersionHook is the GroupVersionHook that the RuntimeExtension implements. GroupVersionHook runtimecatalog.GroupVersionHook @@ -247,8 +250,9 @@ func (r *extensionRegistry) add(extensionConfig *runtimev1.ExtensionConfig) erro // Registrations will only be added to the registry if no errors occur (all or nothing). registrations = append(registrations, &ExtensionRegistration{ - ExtensionConfigName: extensionConfig.Name, - Name: e.Name, + ExtensionConfigName: extensionConfig.Name, + ExtensionConfigResourceVersion: extensionConfig.ResourceVersion, + Name: e.Name, GroupVersionHook: runtimecatalog.GroupVersionHook{ Group: gv.Group, Version: gv.Version, diff --git a/internal/util/cache/cache.go b/internal/util/cache/cache.go new file mode 100644 index 000000000000..0765a0ff8db1 --- /dev/null +++ b/internal/util/cache/cache.go @@ -0,0 +1,148 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + kcache "k8s.io/client-go/tools/cache" + ctrl "sigs.k8s.io/controller-runtime" +) + +const ( + // ttl is the duration for which we keep entries in the cache. + ttl = 10 * time.Minute + + // expirationInterval is the interval in which we will remove expired entries + // from the cache. + expirationInterval = 10 * time.Hour +) + +// Entry is the interface for the type of Cache entries. +type Entry interface { + // Key returns the cache key of an Entry. + Key() string +} + +// Cache caches objects of type Entry. +type Cache[E Entry] interface { + // Add adds the given entry to the Cache. + // Note: entries expire after the ttl. + Add(entry E) + + // Has checks if the given key (still) exists in the Cache. + // Note: entries expire after the ttl. + Has(key string) (E, bool) +} + +// New creates a new cache. +func New[E Entry]() Cache[E] { + r := &cache[E]{ + Store: kcache.NewTTLStore(func(obj interface{}) (string, error) { + // We only add objects of type E to the cache, so it's safe to cast to E. + return obj.(E).Key(), nil + }, ttl), + } + go func() { + for { + // Call list to clear the cache of expired items. + // We have to do this periodically as the cache itself only expires + // items lazily. If we don't do this the cache grows indefinitely. + r.List() + + time.Sleep(expirationInterval) + } + }() + return r +} + +type cache[E Entry] struct { + kcache.Store +} + +// Add adds the given entry to the Cache. +// Note: entries expire after the ttl. +func (r *cache[E]) Add(entry E) { + // Note: We can ignore the error here because the key func we pass into NewTTLStore never + // returns errors. + _ = r.Store.Add(entry) +} + +// Has checks if the given key (still) exists in the Cache. +// Note: entries expire after the ttl. +func (r *cache[E]) Has(key string) (E, bool) { + // Note: We can ignore the error here because GetByKey never returns an error. + item, exists, _ := r.Store.GetByKey(key) + if exists { + return item.(E), true + } + return *new(E), false +} + +// ReconcileEntry is an Entry for the Cache that stores the +// earliest time after which the next Reconcile should be executed. +type ReconcileEntry struct { + Request ctrl.Request + ReconcileAfter time.Time +} + +// NewReconcileEntry creates a new ReconcileEntry based on an object and a reconcileAfter time. +func NewReconcileEntry(obj metav1.Object, reconcileAfter time.Time) ReconcileEntry { + return ReconcileEntry{ + Request: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + }, + }, + ReconcileAfter: reconcileAfter, + } +} + +// NewReconcileEntryKey returns the key of a ReconcileEntry based on an object. +func NewReconcileEntryKey(obj metav1.Object) string { + return ReconcileEntry{ + Request: ctrl.Request{ + NamespacedName: types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + }, + }, + }.Key() +} + +var _ Entry = &ReconcileEntry{} + +// Key returns the cache key of a ReconcileEntry. +func (r ReconcileEntry) Key() string { + return r.Request.String() +} + +// ShouldRequeue returns if the current Reconcile should be requeued. +func (r ReconcileEntry) ShouldRequeue(now time.Time) (requeueAfter time.Duration, requeue bool) { + if r.ReconcileAfter.IsZero() { + return time.Duration(0), false + } + + if r.ReconcileAfter.After(now) { + return r.ReconcileAfter.Sub(now), true + } + + return time.Duration(0), false +} diff --git a/internal/util/cache/cache_test.go b/internal/util/cache/cache_test.go new file mode 100644 index 000000000000..4134c50d8fc6 --- /dev/null +++ b/internal/util/cache/cache_test.go @@ -0,0 +1,100 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "testing" + "time" + + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" +) + +func TestCache(t *testing.T) { + g := NewWithT(t) + + c := New[ReconcileEntry]() + + machine := &clusterv1.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-machine", + }, + } + entry := NewReconcileEntry(machine, time.Now()) + + _, ok := c.Has(entry.Key()) + g.Expect(ok).To(BeFalse()) + + c.Add(entry) + + entryFromCache, ok := c.Has(entry.Key()) + g.Expect(ok).To(BeTrue()) + g.Expect(entryFromCache).To(Equal(entry)) +} + +func TestShouldRequeueDrain(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + now time.Time + reconcileAfter time.Time + wantRequeue bool + wantRequeueAfter time.Duration + }{ + { + name: "Don't requeue, reconcileAfter is zero", + now: now, + reconcileAfter: time.Time{}, + wantRequeue: false, + wantRequeueAfter: time.Duration(0), + }, + { + name: "Requeue after 15s", + now: now, + reconcileAfter: now.Add(time.Duration(15) * time.Second), + wantRequeue: true, + wantRequeueAfter: time.Duration(15) * time.Second, + }, + { + name: "Don't requeue, reconcileAfter is now", + now: now, + reconcileAfter: now, + wantRequeue: false, + wantRequeueAfter: time.Duration(0), + }, + { + name: "Don't requeue, reconcileAfter is before now", + now: now, + reconcileAfter: now.Add(-time.Duration(60) * time.Second), + wantRequeue: false, + wantRequeueAfter: time.Duration(0), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + gotRequeueAfter, gotRequeue := ReconcileEntry{ReconcileAfter: tt.reconcileAfter}.ShouldRequeue(tt.now) + g.Expect(gotRequeue).To(Equal(tt.wantRequeue)) + g.Expect(gotRequeueAfter).To(Equal(tt.wantRequeueAfter)) + }) + } +} diff --git a/internal/util/cache/doc.go b/internal/util/cache/doc.go new file mode 100644 index 000000000000..be95d4e80272 --- /dev/null +++ b/internal/util/cache/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package cache implements some cache utils. +package cache diff --git a/test/extension/handlers/topologymutation/handler_integration_test.go b/test/extension/handlers/topologymutation/handler_integration_test.go index 04c8e8eca028..0db650d9e010 100644 --- a/test/extension/handlers/topologymutation/handler_integration_test.go +++ b/test/extension/handlers/topologymutation/handler_integration_test.go @@ -54,6 +54,7 @@ import ( "sigs.k8s.io/cluster-api/exp/topology/desiredstate" "sigs.k8s.io/cluster-api/exp/topology/scope" "sigs.k8s.io/cluster-api/feature" + runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client" "sigs.k8s.io/cluster-api/util/contract" "sigs.k8s.io/cluster-api/webhooks" ) @@ -403,7 +404,7 @@ type injectRuntimeClient struct { runtimeExtension TopologyMutationHook } -func (i injectRuntimeClient) CallExtension(ctx context.Context, hook runtimecatalog.Hook, _ metav1.Object, _ string, req runtimehooksv1.RequestObject, resp runtimehooksv1.ResponseObject) error { +func (i injectRuntimeClient) CallExtension(ctx context.Context, hook runtimecatalog.Hook, _ metav1.Object, _ string, req runtimehooksv1.RequestObject, resp runtimehooksv1.ResponseObject, _ ...runtimeclient.CallExtensionOption) error { // Note: We have to copy the requests. Otherwise we could get side effect by Runtime Extensions // modifying the request instead of properly returning a response. Also after Unmarshal, // only the Raw fields in runtime.RawExtension fields should be filled out and Object should be nil.