From 082726aeb7f9de85a51c5a9a3c1810cf1674658c Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Mon, 2 Dec 2024 07:50:14 -0800 Subject: [PATCH] =?UTF-8?q?[release-0.19]=20=F0=9F=90=9B=20Refactor=20cert?= =?UTF-8?q?ificate=20watcher=20to=20use=20polling,=20instead=20of=20fsnoti?= =?UTF-8?q?fy=20(#3023)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Reestablish watch for the certificate paths * Remove fsnotify and use cached read watcher * Simplify return * 🐛Fix certwatcher test to be backwards compatible --------- Co-authored-by: Maxim Muzafarov Signed-off-by: Vince Prignano --- examples/scratch-env/go.mod | 3 +- examples/scratch-env/go.sum | 6 +- go.mod | 6 +- go.sum | 8 +- pkg/certwatcher/certwatcher.go | 166 ++++++++-------------- pkg/certwatcher/certwatcher_suite_test.go | 1 + pkg/certwatcher/certwatcher_test.go | 51 +++++-- pkg/certwatcher/example_test.go | 2 +- pkg/certwatcher/metrics/metrics.go | 1 + 9 files changed, 116 insertions(+), 128 deletions(-) diff --git a/examples/scratch-env/go.mod b/examples/scratch-env/go.mod index 345940ee47..be92f0eea7 100644 --- a/examples/scratch-env/go.mod +++ b/examples/scratch-env/go.mod @@ -14,8 +14,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch/v5 v5.9.0 // indirect - github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect diff --git a/examples/scratch-env/go.sum b/examples/scratch-env/go.sum index 12c31c9069..c64c0651b7 100644 --- a/examples/scratch-env/go.sum +++ b/examples/scratch-env/go.sum @@ -12,10 +12,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg= github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= -github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= -github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= github.com/go-logr/zapr v1.3.0/go.mod h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg= github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= diff --git a/go.mod b/go.mod index db128c8ab7..ccb82dac89 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,8 @@ go 1.22.0 require ( github.com/evanphx/json-patch v4.12.0+incompatible // Using v4 to match upstream github.com/evanphx/json-patch/v5 v5.9.0 - github.com/fsnotify/fsnotify v1.7.0 - github.com/go-logr/logr v1.4.1 + github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/go-logr/logr v1.4.2 github.com/go-logr/zapr v1.3.0 github.com/google/go-cmp v0.6.0 github.com/google/gofuzz v1.2.0 @@ -38,7 +38,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect - github.com/felixge/httpsnoop v1.0.3 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect diff --git a/go.sum b/go.sum index 1e47c0243e..63919580af 100644 --- a/go.sum +++ b/go.sum @@ -21,13 +21,13 @@ github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg= github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= -github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= -github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= diff --git a/pkg/certwatcher/certwatcher.go b/pkg/certwatcher/certwatcher.go index fe15fc0dd7..f629dd4e16 100644 --- a/pkg/certwatcher/certwatcher.go +++ b/pkg/certwatcher/certwatcher.go @@ -17,58 +17,55 @@ limitations under the License. package certwatcher import ( + "bytes" "context" "crypto/tls" - "fmt" + "os" "sync" "time" - "github.com/fsnotify/fsnotify" - kerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" ) var log = logf.RuntimeLog.WithName("certwatcher") -// CertWatcher watches certificate and key files for changes. When either file -// changes, it reads and parses both and calls an optional callback with the new -// certificate. +const defaultWatchInterval = 10 * time.Second + +// CertWatcher watches certificate and key files for changes. +// It always returns the cached version, +// but periodically reads and parses certificate and key for changes +// and calls an optional callback with the new certificate. type CertWatcher struct { sync.RWMutex currentCert *tls.Certificate - watcher *fsnotify.Watcher + interval time.Duration certPath string keyPath string + cachedKeyPEMBlock []byte + // callback is a function to be invoked when the certificate changes. callback func(tls.Certificate) } // New returns a new CertWatcher watching the given certificate and key. func New(certPath, keyPath string) (*CertWatcher, error) { - var err error - cw := &CertWatcher{ certPath: certPath, keyPath: keyPath, + interval: defaultWatchInterval, } - // Initial read of certificate and key. - if err := cw.ReadCertificate(); err != nil { - return nil, err - } - - cw.watcher, err = fsnotify.NewWatcher() - if err != nil { - return nil, err - } + return cw, cw.ReadCertificate() +} - return cw, nil +// WithWatchInterval sets the watch interval and returns the CertWatcher pointer +func (cw *CertWatcher) WithWatchInterval(interval time.Duration) *CertWatcher { + cw.interval = interval + return cw } // RegisterCallback registers a callback to be invoked when the certificate changes. @@ -91,72 +88,71 @@ func (cw *CertWatcher) GetCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate, // Start starts the watch on the certificate and key files. func (cw *CertWatcher) Start(ctx context.Context) error { - files := sets.New(cw.certPath, cw.keyPath) - - { - var watchErr error - if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { - for _, f := range files.UnsortedList() { - if err := cw.watcher.Add(f); err != nil { - watchErr = err - return false, nil //nolint:nilerr // We want to keep trying. - } - // We've added the watch, remove it from the set. - files.Delete(f) - } - return true, nil - }); err != nil { - return fmt.Errorf("failed to add watches: %w", kerrors.NewAggregate([]error{err, watchErr})) - } - } - - go cw.Watch() + ticker := time.NewTicker(cw.interval) + defer ticker.Stop() log.Info("Starting certificate watcher") - - // Block until the context is done. - <-ctx.Done() - - return cw.watcher.Close() -} - -// Watch reads events from the watcher's channel and reacts to changes. -func (cw *CertWatcher) Watch() { for { select { - case event, ok := <-cw.watcher.Events: - // Channel is closed. - if !ok { - return + case <-ctx.Done(): + return nil + case <-ticker.C: + if err := cw.ReadCertificate(); err != nil { + log.Error(err, "failed read certificate") } + } + } +} - cw.handleEvent(event) +// Watch used to read events from the watcher's channel and reacts to changes, +// it has currently no function and it's left here for backward compatibility until a future release. +// +// Deprecated: fsnotify has been removed and Start() is now polling instead. +func (cw *CertWatcher) Watch() { +} - case err, ok := <-cw.watcher.Errors: - // Channel is closed. - if !ok { - return - } +// updateCachedCertificate checks if the new certificate differs from the cache, +// updates it and returns the result if it was updated or not +func (cw *CertWatcher) updateCachedCertificate(cert *tls.Certificate, keyPEMBlock []byte) bool { + cw.Lock() + defer cw.Unlock() - log.Error(err, "certificate watch error") - } + if cw.currentCert != nil && + bytes.Equal(cw.currentCert.Certificate[0], cert.Certificate[0]) && + bytes.Equal(cw.cachedKeyPEMBlock, keyPEMBlock) { + log.V(7).Info("certificate already cached") + return false } + cw.currentCert = cert + cw.cachedKeyPEMBlock = keyPEMBlock + return true } // ReadCertificate reads the certificate and key files from disk, parses them, -// and updates the current certificate on the watcher. If a callback is set, it +// and updates the current certificate on the watcher if updated. If a callback is set, it // is invoked with the new certificate. func (cw *CertWatcher) ReadCertificate() error { metrics.ReadCertificateTotal.Inc() - cert, err := tls.LoadX509KeyPair(cw.certPath, cw.keyPath) + certPEMBlock, err := os.ReadFile(cw.certPath) + if err != nil { + metrics.ReadCertificateErrors.Inc() + return err + } + keyPEMBlock, err := os.ReadFile(cw.keyPath) if err != nil { metrics.ReadCertificateErrors.Inc() return err } - cw.Lock() - cw.currentCert = &cert - cw.Unlock() + cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) + if err != nil { + metrics.ReadCertificateErrors.Inc() + return err + } + + if !cw.updateCachedCertificate(&cert, keyPEMBlock) { + return nil + } log.Info("Updated current TLS certificate") @@ -170,39 +166,3 @@ func (cw *CertWatcher) ReadCertificate() error { } return nil } - -func (cw *CertWatcher) handleEvent(event fsnotify.Event) { - // Only care about events which may modify the contents of the file. - if !(isWrite(event) || isRemove(event) || isCreate(event) || isChmod(event)) { - return - } - - log.V(1).Info("certificate event", "event", event) - - // If the file was removed or renamed, re-add the watch to the previous name - if isRemove(event) || isChmod(event) { - if err := cw.watcher.Add(event.Name); err != nil { - log.Error(err, "error re-watching file") - } - } - - if err := cw.ReadCertificate(); err != nil { - log.Error(err, "error re-reading certificate") - } -} - -func isWrite(event fsnotify.Event) bool { - return event.Op.Has(fsnotify.Write) -} - -func isCreate(event fsnotify.Event) bool { - return event.Op.Has(fsnotify.Create) -} - -func isRemove(event fsnotify.Event) bool { - return event.Op.Has(fsnotify.Remove) -} - -func isChmod(event fsnotify.Event) bool { - return event.Op.Has(fsnotify.Chmod) -} diff --git a/pkg/certwatcher/certwatcher_suite_test.go b/pkg/certwatcher/certwatcher_suite_test.go index 2d0f677685..d0d9a72a62 100644 --- a/pkg/certwatcher/certwatcher_suite_test.go +++ b/pkg/certwatcher/certwatcher_suite_test.go @@ -22,6 +22,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" ) diff --git a/pkg/certwatcher/certwatcher_test.go b/pkg/certwatcher/certwatcher_test.go index 1fb247581f..fa7e09adae 100644 --- a/pkg/certwatcher/certwatcher_test.go +++ b/pkg/certwatcher/certwatcher_test.go @@ -17,6 +17,7 @@ limitations under the License. package certwatcher_test import ( + "bytes" "context" "crypto/rand" "crypto/rsa" @@ -34,6 +35,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/prometheus/client_golang/prometheus/testutil" + "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics" ) @@ -80,7 +82,7 @@ var _ = Describe("CertWatcher", func() { go func() { defer GinkgoRecover() defer close(doneCh) - Expect(watcher.Start(ctx)).To(Succeed()) + Expect(watcher.WithWatchInterval(time.Second).Start(ctx)).To(Succeed()) }() // wait till we read first cert Eventually(func() error { @@ -113,7 +115,7 @@ var _ = Describe("CertWatcher", func() { Eventually(func() bool { secondcert, _ := watcher.GetCertificate(nil) first := firstcert.PrivateKey.(*rsa.PrivateKey) - return first.Equal(secondcert.PrivateKey) + return first.Equal(secondcert.PrivateKey) || bytes.Equal(firstcert.Certificate[0], secondcert.Certificate[0]) }).ShouldNot(BeTrue()) ctxCancel() @@ -143,7 +145,7 @@ var _ = Describe("CertWatcher", func() { Eventually(func() bool { secondcert, _ := watcher.GetCertificate(nil) first := firstcert.PrivateKey.(*rsa.PrivateKey) - return first.Equal(secondcert.PrivateKey) + return first.Equal(secondcert.PrivateKey) || bytes.Equal(firstcert.Certificate[0], secondcert.Certificate[0]) }).ShouldNot(BeTrue()) ctxCancel() @@ -151,6 +153,33 @@ var _ = Describe("CertWatcher", func() { Expect(called.Load()).To(BeNumerically(">=", 1)) }) + It("should reload currentCert after move out", func() { + doneCh := startWatcher() + called := atomic.Int64{} + watcher.RegisterCallback(func(crt tls.Certificate) { + called.Add(1) + Expect(crt.Certificate).ToNot(BeEmpty()) + }) + + firstcert, _ := watcher.GetCertificate(nil) + + Expect(os.Rename(certPath, certPath+".old")).To(Succeed()) + Expect(os.Rename(keyPath, keyPath+".old")).To(Succeed()) + + err := writeCerts(certPath, keyPath, "192.168.0.3") + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() bool { + secondcert, _ := watcher.GetCertificate(nil) + first := firstcert.PrivateKey.(*rsa.PrivateKey) + return first.Equal(secondcert.PrivateKey) || bytes.Equal(firstcert.Certificate[0], secondcert.Certificate[0]) + }, "10s", "1s").ShouldNot(BeTrue()) + + ctxCancel() + Eventually(doneCh, "4s").Should(BeClosed()) + Expect(called.Load()).To(BeNumerically(">=", 1)) + }) + Context("prometheus metric read_certificate_total", func() { var readCertificateTotalBefore float64 var readCertificateErrorsBefore float64 @@ -165,8 +194,8 @@ var _ = Describe("CertWatcher", func() { Eventually(func() error { readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal) - if readCertificateTotalAfter != readCertificateTotalBefore+1.0 { - return fmt.Errorf("metric read certificate total expected: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter) + if readCertificateTotalAfter < readCertificateTotalBefore+1.0 { + return fmt.Errorf("metric read certificate total expected at least: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter) } return nil }, "4s").Should(Succeed()) @@ -180,8 +209,8 @@ var _ = Describe("CertWatcher", func() { Eventually(func() error { readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal) - if readCertificateTotalAfter != readCertificateTotalBefore+1.0 { - return fmt.Errorf("metric read certificate total expected: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter) + if readCertificateTotalAfter < readCertificateTotalBefore+1.0 { + return fmt.Errorf("metric read certificate total expected at least: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter) } readCertificateTotalBefore = readCertificateTotalAfter return nil @@ -192,15 +221,15 @@ var _ = Describe("CertWatcher", func() { // Note, we are checking two errors here, because os.Remove generates two fsnotify events: Chmod + Remove Eventually(func() error { readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal) - if readCertificateTotalAfter != readCertificateTotalBefore+2.0 { - return fmt.Errorf("metric read certificate total expected: %v and got: %v", readCertificateTotalBefore+2.0, readCertificateTotalAfter) + if readCertificateTotalAfter < readCertificateTotalBefore+2.0 { + return fmt.Errorf("metric read certificate total expected at least: %v and got: %v", readCertificateTotalBefore+2.0, readCertificateTotalAfter) } return nil }, "4s").Should(Succeed()) Eventually(func() error { readCertificateErrorsAfter := testutil.ToFloat64(metrics.ReadCertificateErrors) - if readCertificateErrorsAfter != readCertificateErrorsBefore+2.0 { - return fmt.Errorf("metric read certificate errors expected: %v and got: %v", readCertificateErrorsBefore+2.0, readCertificateErrorsAfter) + if readCertificateErrorsAfter < readCertificateErrorsBefore+2.0 { + return fmt.Errorf("metric read certificate errors expected at least: %v and got: %v", readCertificateErrorsBefore+2.0, readCertificateErrorsAfter) } return nil }, "4s").Should(Succeed()) diff --git a/pkg/certwatcher/example_test.go b/pkg/certwatcher/example_test.go index e322aeebfc..e85b2403cb 100644 --- a/pkg/certwatcher/example_test.go +++ b/pkg/certwatcher/example_test.go @@ -39,7 +39,7 @@ func Example() { panic(err) } - // Start goroutine with certwatcher running fsnotify against supplied certdir + // Start goroutine with certwatcher running against supplied cert go func() { if err := watcher.Start(ctx); err != nil { panic(err) diff --git a/pkg/certwatcher/metrics/metrics.go b/pkg/certwatcher/metrics/metrics.go index 05869eff03..f128abbcf0 100644 --- a/pkg/certwatcher/metrics/metrics.go +++ b/pkg/certwatcher/metrics/metrics.go @@ -18,6 +18,7 @@ package metrics import ( "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/controller-runtime/pkg/metrics" )