Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensuring the integrity of full snapshot before uploading it to the object store. #779

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

ishan16696
Copy link
Member

What this PR does / why we need it:
It has been observed that while doing the restoration from full snapshot sometimes backup-restore failed to restore the etcd due to sometimes full snapshots got corrupted or missing a hash.
This PR is try to minimise the occurrence of such scenarios by verifying the integrity of full snapshot before uploading it to the object store.

Which issue(s) this PR fixes:
Fixes #778

Special notes for your reviewer:
This is how it's done:

Screenshot 2024-09-23 at 10 10 45 AM

Release note:

Ensure the integrity of the full snapshot prior to uploading it to the object store, thereby reducing the potential restoration failures.

@ishan16696 ishan16696 requested a review from a team as a code owner September 23, 2024 07:25
@gardener-robot gardener-robot added needs/review Needs review size/m Size of pull request is medium (see gardener-robot robot/bots/size.py) labels Sep 23, 2024
@gardener-robot-ci-1 gardener-robot-ci-1 added reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) needs/ok-to-test Needs approval for testing (check PR in detail before setting this label because PR is run on CI/CD) and removed reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) labels Sep 23, 2024
@ishan16696
Copy link
Member Author

ishan16696 commented Sep 23, 2024

Performance tests have been performed with etcd of 7GB database size:

~ > etcdctl endpoint status --endpoints=http://127.0.0.1:2379 --cluster -w table
+-----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
|       ENDPOINT        |        ID        | VERSION | DB SIZE | IS LEADER | IS LEARNER | RAFT TERM | RAFT INDEX | RAFT APPLIED INDEX | ERRORS |
+-----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+
| http://localhost:2379 | 8e9e05c52164694d |   3.5.9 |  7.0 GB |      true |      false |         3 |      35952 |              35952 |        |
+-----------------------+------------------+---------+---------+-----------+------------+-----------+------------+--------------------+--------+

Backups taken by backup-restore with their sizes on disk.

25M	Full-00000000-02000001-1726772100.gz
79M	Full-00000000-05532798-1726864342.gz
140M	Full-00000000-09083821-1726946137.gz
185M	Full-00000000-11680572-1727038676.gz
200K	Incr-11680573-11691572-1727038777.gz
200K	Incr-11691573-11702572-1727038778.gz
200K	Incr-11702573-11712987-1727038807.gz
68K	Incr-11712988-11716517-1727068649.gz

Then a big full-snapshot is triggered by backup-restore:

INFO[0536] Taking scheduled full snapshot for time: 2024-09-23 11:00:00.014743 +0530 IST  actor=snapshotter
{"level":"info","ts":"2024-09-23T11:00:00.038+0530","caller":"clientv3/maintenance.go:200","msg":"opened snapshot stream; downloading"}
INFO[0536] Total time taken by Snapshot API: 0.002619 seconds.  actor=snapshotter
INFO[0536] checking the full snapshot integrity with the help of SHA256  actor=snapshotter
{"level":"info","ts":"2024-09-23T11:00:06.902+0530","caller":"clientv3/maintenance.go:208","msg":"completed snapshot read; closing"}
INFO[0547] full snapshot SHA256 hash has been successfully verified.  actor=snapshotter
INFO[0547] start compressing the snapshot using gzip Compression Policy  actor=compressor
INFO[0547] Total time taken in full snapshot compression: 0.001206 seconds.  actor=snapshotter
INFO[0547] Successfully opened snapshot reader on etcd   actor=snapshotter
INFO[0578] Total written bytes: 6982844448               actor=compressor
INFO[0578] Total time to save full snapshot: 41.202852 seconds.  actor=snapshotter
INFO[0578] Successfully saved full snapshot at: Full-00000000-11716517-1727069410.gz  actor=snapshotter
INFO[0578] Applied watch on etcd from revision: 11716518  actor=snapshotter

Then restoration is triggered:

INFO[0007] Received start initialization request.        actor=backup-restore-server
INFO[0007] Updating status from New to Progress          actor=backup-restore-server
INFO[0007] Setting status to : 503                       actor=backup-restore-server
INFO[0007] Validation failBelowRevision:                 actor=backup-restore-server
INFO[0007] Validation mode: full                         actor=backup-restore-server
INFO[0007] Checking for data directory structure validity...
INFO[0007] Data directory structure invalid.
INFO[0007] Finding latest set of snapshot to recover from...
INFO[0007] Removing directory(default.etcd.part).
INFO[0007] Restoring from base snapshot: Full-00000000-11716517-1727069410.gz  actor=restorer
INFO[0007] start decompressing the snapshot with gzip compressionPolicy  actor=de-compressor
INFO[0009] Responding to status request with: Progress   actor=backup-restore-server
{"level":"warn","ts":"2024-09-23T13:04:23.060+0530","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"passthrough:///http://localhost:2379","attempt":0,"error":"rpc error: code = DeadlineExceeded desc = latest balancer error: connection error: desc = \"transport: Error while dialing dial tcp [::1]:2379: connect: connection refused\""}
ERRO[0010] failed to get status of etcd endPoint: http://localhost:2379 with error: context deadline exceeded
INFO[0012] successfully fetched data of base snapshot in 4.938676375 seconds [CompressionPolicy:gzip]  actor=restorer
{"level":"warn","ts":"2024-09-23T13:04:27.060+0530","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"passthrough:///http://localhost:2379","attempt":0,"error":"rpc error: code = DeadlineExceeded desc = latest balancer error: connection error: desc = \"transport: Error while dialing dial tcp [::1]:2379: connect: connection refused\""}
ERRO[0014] failed to get status of etcd endPoint: http://localhost:2379 with error: context deadline exceeded
INFO[0016] Responding to status request with: Progress   actor=backup-restore-server
{"level":"warn","ts":"2024-09-23T13:04:31.061+0530","caller":"clientv3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"passthrough:///http://localhost:2379","attempt":0,"error":"rpc error: code = DeadlineExceeded desc = latest balancer error: connection error: desc = \"transport: Error while dialing dial tcp [::1]:2379: connect: connection refused\""}
ERRO[0018] failed to get status of etcd endPoint: http://localhost:2379 with error: context deadline exceeded
{"level":"info","ts":1727076872.963309,"caller":"membership/cluster.go:392","msg":"added member","cluster-id":"cdf818194e3a8c32","local-member-id":"0","added-peer-id":"8e9e05c52164694d","added-peer-peer-urls":["http://localhost:2380"]}
INFO[0020] No delta snapshots present over base snapshot.  actor=restorer
INFO[0020] Removing directory(default.etcd).
INFO[0020] Successfully restored the etcd data directory.
INFO[0020] Successfully initialized data directory for etcd.  actor=backup-restore-server
  • Full snapshot is successfully taken with integrity check. ✅
  • Successfully restored the etcd from the full snapshot. ✅
  • Snapshot-Compaction is also working as expected. ✅

@anveshreddy18
Copy link
Contributor

/assign

@gardener-robot-ci-2 gardener-robot-ci-2 added reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) and removed reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) labels Sep 24, 2024
Copy link
Contributor

@anveshreddy18 anveshreddy18 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @ishan16696.

PR looks good, I just have 2 small suggestions. PTAL

pkg/etcdutil/etcdutil.go Outdated Show resolved Hide resolved
pkg/etcdutil/etcdutil.go Outdated Show resolved Hide resolved
@gardener-robot-ci-1 gardener-robot-ci-1 added reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) and removed reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) labels Sep 25, 2024
Copy link
Contributor

@anveshreddy18 anveshreddy18 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested it and things are working good.
LGTM

@renormalize renormalize self-assigned this Sep 30, 2024
@seshachalam-yv
Copy link
Contributor

/assign

@shreyas-s-rao shreyas-s-rao added this to the v0.31.0 milestone Oct 8, 2024
@anveshreddy18 anveshreddy18 removed their assignment Oct 8, 2024
Copy link
Member

@renormalize renormalize left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this much needed PR @ishan16696!

Just a few comments from my side.

Thanks.

pkg/compactor/compactor.go Outdated Show resolved Hide resolved
cmd/compact.go Outdated Show resolved Hide resolved
pkg/etcdutil/etcdutil.go Outdated Show resolved Hide resolved
pkg/etcdutil/etcdutil.go Outdated Show resolved Hide resolved
pkg/etcdutil/etcdutil.go Outdated Show resolved Hide resolved
Copy link
Member

@renormalize renormalize left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another suggestion for better error handling, as discussed.

pkg/etcdutil/etcdutil.go Outdated Show resolved Hide resolved
@gardener-robot-ci-1 gardener-robot-ci-1 added reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) and removed reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) labels Oct 17, 2024
Copy link
Member

@renormalize renormalize left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tiny nit, but otherwise the PR looks in great condition.
Thanks.

pkg/etcdutil/etcdutil.go Show resolved Hide resolved
@gardener-robot-ci-1 gardener-robot-ci-1 added reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) and removed reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) labels Oct 18, 2024
Copy link
Member

@renormalize renormalize left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ishan16696 for filling the gaps in my understanding in validation, and addressing all review comments!

@renormalize
Copy link
Member

@ishan16696 could you rebase your PR on master? This will fix the failing integration tests.
Thanks.

@renormalize renormalize removed their assignment Oct 18, 2024
@gardener-robot-ci-2 gardener-robot-ci-2 added the reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) label Oct 22, 2024
@gardener-robot-ci-1 gardener-robot-ci-1 added reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) and removed reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) labels Oct 22, 2024
@gardener-robot-ci-1 gardener-robot-ci-1 added reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) and removed reviewed/ok-to-test Has approval for testing (check PR in detail before setting this label because PR is run on CI/CD) labels Nov 8, 2024
@ishan16696 ishan16696 modified the milestones: v0.31.0, v0.32.0, v0.33.0 Nov 25, 2024
Copy link
Contributor

@seshachalam-yv seshachalam-yv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original TakeAndSaveFullSnapshot function is broken down into smaller, more focused functions:
- createEtcdSnapshot: Initiates the etcd snapshot process.
- verifyFullSnapshotIntegrity: Verifies the integrity of the full snapshot using SHA256.
- validateSnapshotSHA256: Checks the SHA256 hash appended to the snapshot.
- compressSnapshotData: Compresses the snapshot data if compression is enabled.
- saveSnapshotToStore: Saves the snapshot to the SnapStore.
- resetFilePointer: Resets the file pointer to the beginning of the file.
- cleanUpTempFile: Removes the temporary file used during snapshot verification.

Utilized bufio.Reader.ReadFull for reading data and io.Copy with io.LimitReader for copying data to improve efficiency and handle large files more effectively.

These changes aim to make the code more modular, easier to test, and maintain.

pkg/etcdutil/etcdutil.go Show resolved Hide resolved
Comment on lines +324 to +407

// checkFullSnapshotIntegrity verifies the integrity of the full snapshot by comparing
// the appended SHA256 hash of the full snapshot with the calculated SHA256 hash of the full snapshot data.
func checkFullSnapshotIntegrity(snapshotData io.ReadCloser, snapTempDBFilePath string, logger *logrus.Entry) (io.ReadCloser, error) {
logger.Info("checking the full snapshot integrity with the help of SHA256")

// If previous temp db file already exist then remove it.
if err := os.Remove(snapTempDBFilePath); err != nil && !os.IsNotExist(err) {
return nil, err
}

db, err := os.OpenFile(snapTempDBFilePath, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return nil, err
}

if _, err := io.Copy(db, snapshotData); err != nil {
return nil, err
}

lastOffset, err := db.Seek(0, io.SeekEnd)
if err != nil {
return nil, err
}
// 512 is chosen because it's a minimum disk sector size in most systems.
hasHash := (lastOffset % 512) == sha256.Size
if !hasHash {
return nil, fmt.Errorf("SHA256 hash seems to be missing from snapshot data")
}

totalSnapshotBytes, err := db.Seek(-sha256.Size, io.SeekEnd)
if err != nil {
return nil, err
}

// get snapshot SHA256 hash
sha := make([]byte, sha256.Size)
if _, err := db.Read(sha); err != nil {
return nil, fmt.Errorf("failed to read SHA256 from snapshot data %v", err)
}

buf := make([]byte, hashBufferSize)
hash := sha256.New()

logger.Infof("Total no. of bytes received from snapshot api call with SHA: %d", lastOffset)
logger.Infof("Total no. of bytes received from snapshot api call without SHA: %d", totalSnapshotBytes)

// reset the file pointer back to starting
currentOffset, err := db.Seek(0, io.SeekStart)
if err != nil {
return nil, err
}

for currentOffset+hashBufferSize <= totalSnapshotBytes {
offset, err := db.Read(buf)
if err != nil {
return nil, fmt.Errorf("unable to read snapshot data into buffer to calculate SHA256: %v", err)
}

hash.Write(buf[:offset])
currentOffset += int64(offset)
}

if currentOffset < totalSnapshotBytes {
if _, err := db.Read(buf); err != nil {
return nil, fmt.Errorf("unable to read last chunk of snapshot data into buffer to calculate SHA256: %v", err)
}

hash.Write(buf[:totalSnapshotBytes-currentOffset])
}

dbSha := hash.Sum(nil)
if !bytes.Equal(sha, dbSha) {
return nil, fmt.Errorf("expected SHA256 for full snapshot: %v, got %v", sha, dbSha)
}

// reset the file pointer back to starting
if _, err := db.Seek(0, io.SeekStart); err != nil {
return nil, err
}

// full-snapshot of database has been successfully verified.
return db, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted certain operations, such as snapshot creation (createEtcdSnapshot) and integrity verification (verifyFullSnapshotIntegrity), into separate functions. This improves readability, maintainability, and allows for better unit testing of individual functions.

Suggested change
// checkFullSnapshotIntegrity verifies the integrity of the full snapshot by comparing
// the appended SHA256 hash of the full snapshot with the calculated SHA256 hash of the full snapshot data.
func checkFullSnapshotIntegrity(snapshotData io.ReadCloser, snapTempDBFilePath string, logger *logrus.Entry) (io.ReadCloser, error) {
logger.Info("checking the full snapshot integrity with the help of SHA256")
// If previous temp db file already exist then remove it.
if err := os.Remove(snapTempDBFilePath); err != nil && !os.IsNotExist(err) {
return nil, err
}
db, err := os.OpenFile(snapTempDBFilePath, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return nil, err
}
if _, err := io.Copy(db, snapshotData); err != nil {
return nil, err
}
lastOffset, err := db.Seek(0, io.SeekEnd)
if err != nil {
return nil, err
}
// 512 is chosen because it's a minimum disk sector size in most systems.
hasHash := (lastOffset % 512) == sha256.Size
if !hasHash {
return nil, fmt.Errorf("SHA256 hash seems to be missing from snapshot data")
}
totalSnapshotBytes, err := db.Seek(-sha256.Size, io.SeekEnd)
if err != nil {
return nil, err
}
// get snapshot SHA256 hash
sha := make([]byte, sha256.Size)
if _, err := db.Read(sha); err != nil {
return nil, fmt.Errorf("failed to read SHA256 from snapshot data %v", err)
}
buf := make([]byte, hashBufferSize)
hash := sha256.New()
logger.Infof("Total no. of bytes received from snapshot api call with SHA: %d", lastOffset)
logger.Infof("Total no. of bytes received from snapshot api call without SHA: %d", totalSnapshotBytes)
// reset the file pointer back to starting
currentOffset, err := db.Seek(0, io.SeekStart)
if err != nil {
return nil, err
}
for currentOffset+hashBufferSize <= totalSnapshotBytes {
offset, err := db.Read(buf)
if err != nil {
return nil, fmt.Errorf("unable to read snapshot data into buffer to calculate SHA256: %v", err)
}
hash.Write(buf[:offset])
currentOffset += int64(offset)
}
if currentOffset < totalSnapshotBytes {
if _, err := db.Read(buf); err != nil {
return nil, fmt.Errorf("unable to read last chunk of snapshot data into buffer to calculate SHA256: %v", err)
}
hash.Write(buf[:totalSnapshotBytes-currentOffset])
}
dbSha := hash.Sum(nil)
if !bytes.Equal(sha, dbSha) {
return nil, fmt.Errorf("expected SHA256 for full snapshot: %v, got %v", sha, dbSha)
}
// reset the file pointer back to starting
if _, err := db.Seek(0, io.SeekStart); err != nil {
return nil, err
}
// full-snapshot of database has been successfully verified.
return db, nil
}
// compressSnapshotDataIfNeeded compresses the snapshot data if compression is enabled.
func compressSnapshotDataIfNeeded(snapshotData io.ReadCloser, compressionConfig *compressor.CompressionConfig, logger *logrus.Entry) (io.ReadCloser, error) {
if compressionConfig != nil && compressionConfig.Enabled {
startTime := time.Now()
logger.Infof("Compression enabled. Starting compression of snapshot data.")
compressedData, err := compressor.CompressSnapshot(snapshotData, compressionConfig.CompressionPolicy)
if err != nil {
logger.Errorf("Failed to compress snapshot data: %v", err)
return nil, fmt.Errorf("unable to obtain reader for compressed file: %v", err)
}
logger.Infof("Total time taken in full snapshot compression: %f seconds.", time.Since(startTime).Seconds())
return compressedData, nil
}
return snapshotData, nil
}
// createEtcdSnapshot initiates the etcd snapshot process.
func createEtcdSnapshot(ctx context.Context, client client.MaintenanceCloser, logger *logrus.Entry) (io.ReadCloser, error) {
startTime := time.Now()
snapshotReader, err := client.Snapshot(ctx)
if err != nil {
return nil, &errors.EtcdError{
Message: fmt.Sprintf("Failed to create etcd snapshot: %v", err),
}
}
logger.Infof("Total time taken by Snapshot API: %f seconds.", time.Since(startTime).Seconds())
return snapshotReader, nil
}
// verifyFullSnapshotIntegrity verifies the integrity of the full snapshot.
func verifyFullSnapshotIntegrity(snapshotData io.ReadCloser, snapTempDBFilePath string, logger *logrus.Entry) (io.ReadCloser, error) {
logger.Info("Verifying full snapshot integrity using SHA256")
// Remove previous temporary DB file if it exists
if err := os.Remove(snapTempDBFilePath); err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to remove previous temp DB file %s: %v", snapTempDBFilePath, err)
}
db, err := os.OpenFile(snapTempDBFilePath, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return nil, fmt.Errorf("failed to create temporary DB file at %s: %v", snapTempDBFilePath, err)
}
buf := make([]byte, hashBufferSize)
if _, err := io.CopyBuffer(db, snapshotData, buf); err != nil {
return nil, fmt.Errorf("failed to copy snapshot data to temporary DB file %s: %v", snapTempDBFilePath, err)
}
// Verify SHA256 hash
if err := validateSnapshotSHA256(db, logger); err != nil {
return nil, err
}
// Reset the file pointer back to the beginning
if err := resetFilePointer(db); err != nil {
return nil, err
}
return db, nil
}
// validateSnapshotSHA256 checks the SHA256 hash appended to the snapshot.
func validateSnapshotSHA256(db *os.File, logger *logrus.Entry) error {
lastOffset, err := db.Seek(0, io.SeekEnd)
if err != nil {
return fmt.Errorf("failed to seek to end of file %s: %v", db.Name(), err)
}
totalSnapshotBytes, err := db.Seek(-sha256.Size, io.SeekEnd)
if err != nil {
return fmt.Errorf("failed to seek to SHA256 offset in file %s: %v", db.Name(), err)
}
// Get snapshot SHA256 hash
sha := make([]byte, sha256.Size)
if _, err := io.ReadFull(db, sha); err != nil {
return fmt.Errorf("failed to read SHA256 from snapshot data in file %s: %v", db.Name(), err)
}
hash := sha256.New()
logger.Infof("Total bytes received from snapshot API call (including SHA256 hash): %d", lastOffset)
logger.Infof("Total bytes received from snapshot API call (excluding SHA256 hash): %d", totalSnapshotBytes)
// Reset file pointer and calculate hash
if err := resetFilePointer(db); err != nil {
return fmt.Errorf("failed to reset file pointer for file %s: %v", db.Name(), err)
}
limitedReader := io.LimitReader(db, totalSnapshotBytes)
buf := make([]byte, hashBufferSize)
if _, err := io.CopyBuffer(hash, limitedReader, buf); err != nil {
return fmt.Errorf("failed to calculate SHA256 for file %s: %v", db.Name(), err)
}
dbSha := hash.Sum(nil)
if !bytes.Equal(sha, dbSha) {
return fmt.Errorf("expected SHA256 for full snapshot: %x, got %x", sha, dbSha)
}
return nil
}
// saveSnapshotToStore saves the snapshot to the SnapStore.
func saveSnapshotToStore(store brtypes.SnapStore, snapshot *brtypes.Snapshot, snapshotData io.ReadCloser, startTime time.Time, logger *logrus.Entry) error {
if err := store.Save(*snapshot, snapshotData); err != nil {
timeTaken := time.Since(startTime)
metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindFull, metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(timeTaken.Seconds())
return &errors.SnapstoreError{
Message: fmt.Errorf("failed to save snapshot: %w", err).Error(),
}
}
timeTaken := time.Since(startTime)
metrics.SnapshotDurationSeconds.With(prometheus.Labels{metrics.LabelKind: brtypes.SnapshotKindFull, metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Observe(timeTaken.Seconds())
return nil
}
// resetFilePointer resets the file pointer to the beginning of the file.
func resetFilePointer(db *os.File) error {
if _, err := db.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("failed to reset file pointer: %w", err)
}
return nil
}
// cleanUpTempFile removes the temporary file used during snapshot verification.
func cleanUpTempFile(filePath string, logger *logrus.Entry) {
if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) {
logger.Warnf("Failed to remove temporary full snapshot file: %v", err)
}
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// resetFilePointer resets the file pointer to the beginning of the file.
func resetFilePointer(db *os.File) error {
if _, err := db.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("failed to reset file pointer: %w", err)
}
return nil
}
// cleanUpTempFile removes the temporary file used during snapshot verification.
func cleanUpTempFile(filePath string, logger *logrus.Entry) {
if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) {
logger.Warnf("Failed to remove temporary full snapshot file: %v", err)
}
}

I wouldn't like to create a separate function for one liner.

Copy link
Member Author

@ishan16696 ishan16696 Dec 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// compressSnapshotDataIfNeeded compresses the snapshot data if compression is enabled.
func compressSnapshotDataIfNeeded(snapshotData io.ReadCloser, compressionConfig *compressor.CompressionConfig, logger *logrus.Entry) (io.ReadCloser, error) {
if compressionConfig != nil && compressionConfig.Enabled {
startTime := time.Now()
logger.Infof("Compression enabled. Starting compression of snapshot data.")
compressedData, err := compressor.CompressSnapshot(snapshotData, compressionConfig.CompressionPolicy)
if err != nil {
logger.Errorf("Failed to compress snapshot data: %v", err)
return nil, fmt.Errorf("unable to obtain reader for compressed file: %v", err)
}
logger.Infof("Total time taken in full snapshot compression: %f seconds.", time.Since(startTime).Seconds())
return compressedData, nil
}
return snapshotData, nil
}

for this as I mentioned in #779 (comment). I wouldn't like to refactor this as this is out of scope of this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buf := make([]byte, hashBufferSize)
if _, err := io.CopyBuffer(db, snapshotData, buf); err != nil {
return nil, fmt.Errorf("failed to copy snapshot data to temporary DB file %s: %v", snapTempDBFilePath, err)
}
// Verify SHA256 hash
if err := validateSnapshotSHA256(db, logger); err != nil {
return nil, err
}
// Reset the file pointer back to the beginning
if err := resetFilePointer(db); err != nil {
return nil, err
}

this is wrong code, this is not how it's working.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I know what you mean by wrong.

Copy link
Member Author

@ishan16696 ishan16696 Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we have to read the data of snapshots in small buffer and keep calculating the hash

	buf := make([]byte, hashBufferSize)
	hash := sha256.New()

	// reset the file pointer back to starting
	currentOffset, err := db.Seek(0, io.SeekStart)
	if err != nil {
		return nil, err
	}

	for currentOffset+hashBufferSize <= totalSnapshotBytes {
		offset, err := db.Read(buf)
		if err != nil {
			return nil, fmt.Errorf("unable to read snapshot data into buffer to calculate SHA256: %v", err)
		}

		hash.Write(buf[:offset])
		currentOffset += int64(offset)
	}

I see you are using io.LimitReader but IMO we want to avoid that as we don't want to load all db data (~8Gi in worst case) into memory, that's why we went with this approach of reading the data in small chunk, so that there won't be any memory spike as you have yourself has verified this: #779 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

io.LimitReader does not load all the data (~8Gi in the worst case) into memory at once. Instead, it ensures that at most totalSnapshotBytes are read from the underlying db, and the actual memory usage is determined by the buffer size provided to io.CopyBuffer (in this case, hashBufferSize). This keeps the memory footprint small and controlled while simplifying the code by removing manual offset tracking and loop management.

If there are still concerns about the behavior or specific cases where io.LimitReader might not meet expectations, I’d be happy to investigate further. Let me know if you’d like me to profile this or make adjustments!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, if you're confident about this io.LimitReader, then let me explore this once and get back to you.

@@ -254,7 +263,7 @@ func GetEtcdEndPointsSorted(ctx context.Context, clientMaintenance client.Mainte
}

// TakeAndSaveFullSnapshot takes full snapshot and save it to store
func TakeAndSaveFullSnapshot(ctx context.Context, client client.MaintenanceCloser, store brtypes.SnapStore, lastRevision int64, cc *compressor.CompressionConfig, suffix string, isFinal bool, logger *logrus.Entry) (*brtypes.Snapshot, error) {
func TakeAndSaveFullSnapshot(ctx context.Context, client client.MaintenanceCloser, store brtypes.SnapStore, tempDir string, lastRevision int64, cc *compressor.CompressionConfig, suffix string, isFinal bool, logger *logrus.Entry) (*brtypes.Snapshot, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func TakeAndSaveFullSnapshot(ctx context.Context, client client.MaintenanceCloser, store brtypes.SnapStore, tempDir string, lastRevision int64, cc *compressor.CompressionConfig, suffix string, isFinal bool, logger *logrus.Entry) (*brtypes.Snapshot, error) {
// TakeAndSaveFullSnapshot takes a full snapshot of the etcd database, verifies its integrity,
// optionally compresses it, and saves it to the specified snapshot store.
func TakeAndSaveFullSnapshot(ctx context.Context, client client.MaintenanceCloser, store brtypes.SnapStore, tempDir string, lastRevision int64, compressionConfig *compressor.CompressionConfig, suffix string, isFinal bool, logger *logrus.Entry) (*brtypes.Snapshot, error) {
snapshotStartTime := time.Now()
logger.Infof("Starting full snapshot process. Last revision: %d, TempDir: %s", lastRevision, tempDir)
snapshotReader, err := createEtcdSnapshot(ctx, client, logger)
if err != nil {
logger.Errorf("Failed to create etcd snapshot: %v", err)
return nil, err
}
defer snapshotReader.Close()
snapshotTempDBPath := filepath.Join(tempDir, "db")
logger.Infof("Temporary DB path for snapshot verification: %s", snapshotTempDBPath)
// Verify snapshot integrity
verifiedSnapshotReader, err := verifyFullSnapshotIntegrity(snapshotReader, snapshotTempDBPath, logger)
if err != nil {
logger.Errorf("Verification of full snapshot SHA256 hash failed: %v", err)
return nil, err
}
defer cleanUpTempFile(snapshotTempDBPath, logger)
logger.Info("Full snapshot SHA256 hash successfully verified.")
verifiedSnapshotReader, err = compressSnapshotDataIfNeeded(verifiedSnapshotReader, compressionConfig, logger)
if err != nil {
return nil, err
}
logger.Infof("Successfully opened snapshot reader on etcd")
// Save the snapshot to the store
fullSnapshot := snapstore.NewSnapshot(brtypes.SnapshotKindFull, 0, lastRevision, suffix, isFinal)
logger.Infof("Saving full snapshot to store. Snapshot details: %+v", fullSnapshot)
if err := saveSnapshotToStore(store, fullSnapshot, verifiedSnapshotReader, snapshotStartTime, logger); err != nil {
logger.Errorf("Failed to save snapshot to store: %v", err)
return nil, err
}
logger.Infof("Total time to save full snapshot: %f seconds.", time.Since(snapshotStartTime).Seconds())
return fullSnapshot, nil
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not like to change the function TakeAndSaveFullSnapshot() as this is not a refactoring PR.
Feel free to open a separate issue if you want to refactor TakeAndSaveFullSnapshot().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you are touching this function and adding more 15+ lines, I don't want to grow this function. This won't help readability of code. Thats why I suggest to move the code into smaller functions, each function name clearly conveys the functionality

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense, but I would not like to do that in the same PR for following reasons:

  1. That will change the scope of PR from adding feature PR to refactoring PR which will be hard to track after 1year (say) if some bug is found in some part of code.
  2. With this refactoring change, it will create a huge overhead of manual testing, performance testing etc like taking snapshots, compression of snapshots, restoration of compressed and uncompressed snapshots + verification of snapshots (feature is being added) all needs to be fully tested.
  3. It will completely change the scope of the PR, which will also invalidate the LGTM'ed of reviewers those who has already reviewed the PR as it will become a new PR.

That's I don't want to do this in the same PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Point 1: Do you see this as a major refactor? It’s essentially just restructuring a few existing lines into two functions for better readability.

Point 2: This point isn’t entirely valid, as we are already planning to test this functionality as part of the release process. This includes verifying the new integrity checks that have been added.

Point 3: Whenever we introduce new changes or code, it’s common to make minor adjustments to the surrounding logic for better structure. In this case, it’s simply wrapping the logic into smaller, reusable functions.

That said, this isn’t a significant code change. Let’s get some thoughts from other reviewers as well.

WDYT, @anveshreddy18 and @renormalize?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you see this as a major refactor? I

yes, I do because of point 2
I have mentioned the my reasoning in this comment #779 (comment), I'm not making this PR as refactoring PR. Please feel free to open a separate issue later if somebody have time they can pick it up.

Copy link
Member Author

@ishan16696 ishan16696 Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This point isn’t entirely valid, as we are already planning to test this functionality as part of the release process. This includes verifying the new integrity checks that have been added.

point 2 is totally valid, Do currently we have e2e tests which will do what I have mentioned in point 2 or not ? the answer is no, then it will be create a huge overhead.

we are already planning to test this functionality as part of the release process.

Currently in release process nobody do the performance testing ... question is not about future, question is about do we have it now or not ? I don't think so, we have it and I don't see it coming in near by weeks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems your concern is about not adding the functions createEtcdSnapshot, saveSnapshotToStore, and compressSnapshotDataIfNeeded to encapsulate the existing logic.

I’m okay with that approach. Are you aligned with the other review comments? Let me know if there’s anything else to discuss!

@shreyas-s-rao shreyas-s-rao self-assigned this Dec 9, 2024
Copy link
Collaborator

@shreyas-s-rao shreyas-s-rao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ishan16696 thanks for the much needed PR which verifies full snapshot integrity before pushing it to the store. I have a few comments about the handling of the file descriptors and ReadCloser objects in general. PTAL.

I also checked @seshachalam-yv 's remaining open comment and I feel the structure suggested by him is better than the existing code, in terms of readability. I would request you to incorporate this comment as well. In terms of refactoring, it's not much of a change, since it just brings certain lines of code within the TakeAndSaveFullSnapshot function into smaller functions, and makes it easier to read, review and later modify the code if necessary. Your existing test results should still apply, since the requested refactoring doesn't really change the way the integrity check is done or the number of file descriptors or ReadClosers created to do so.

return nil, err
}

db, err := os.OpenFile(snapTempDBFilePath, os.O_RDWR|os.O_CREATE, 0600)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing db.Close() after opening the file. I see that the db file is returned as is at the end of the function, so can you please add a note in the docstring for this function for the caller to take care of closing the file once they read it? I don't see a snapshotData.Close() in the caller function as well, but just a os.Remove() of the file, which is not sufficient, since that can lead to memory leaks.

if err != nil {
return nil, fmt.Errorf("unable to obtain reader for compressed file: %v", err)
}
timeTakenCompression := time.Since(startTimeCompression)
logger.Infof("Total time taken in full snapshot compression: %f seconds.", timeTakenCompression.Seconds())
}
defer rc.Close()
defer func() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move the defer snapshotData.Close() towards the top, to be declared as soon as the snapshotData object is created.

if err != nil {
return nil, fmt.Errorf("unable to obtain reader for compressed file: %v", err)
}
timeTakenCompression := time.Since(startTimeCompression)
logger.Infof("Total time taken in full snapshot compression: %f seconds.", timeTakenCompression.Seconds())
}
defer rc.Close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that rc is no longer being closed anywhere in the new code, which can cause memory leak. I would recommend you re-use the rc variable rather than creating a new variable named snapshotData. And the defer rc.Close() should be called towards the beginning of the function, where the rc object is first created. Please make this change.

@@ -265,22 +274,40 @@ func TakeAndSaveFullSnapshot(ctx context.Context, client client.MaintenanceClose
timeTaken := time.Since(startTime)
logger.Infof("Total time taken by Snapshot API: %f seconds.", timeTaken.Seconds())

var snapshotData io.ReadCloser
snapshotTempDBPath := filepath.Join(tempDir, "db")
if snapshotData, err = checkFullSnapshotIntegrity(rc, snapshotTempDBPath, logger); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if snapshotData, err = checkFullSnapshotIntegrity(rc, snapshotTempDBPath, logger); err != nil {
if rc, err = checkFullSnapshotIntegrity(rc, snapshotTempDBPath, logger); err != nil {

@@ -254,7 +263,7 @@ func GetEtcdEndPointsSorted(ctx context.Context, clientMaintenance client.Mainte
}

// TakeAndSaveFullSnapshot takes full snapshot and save it to store
func TakeAndSaveFullSnapshot(ctx context.Context, client client.MaintenanceCloser, store brtypes.SnapStore, lastRevision int64, cc *compressor.CompressionConfig, suffix string, isFinal bool, logger *logrus.Entry) (*brtypes.Snapshot, error) {
func TakeAndSaveFullSnapshot(ctx context.Context, client client.MaintenanceCloser, store brtypes.SnapStore, tempDir string, lastRevision int64, cc *compressor.CompressionConfig, suffix string, isFinal bool, logger *logrus.Entry) (*brtypes.Snapshot, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adapt the function docstring to reflect the new responsibilities of the function - fetch snapshot from etcd, verify integrity, compress it if enabled, save the snapshot to object storage bucket.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
needs/ok-to-test Needs approval for testing (check PR in detail before setting this label because PR is run on CI/CD) needs/review Needs review size/m Size of pull request is medium (see gardener-robot robot/bots/size.py)
Projects
None yet
8 participants