diff --git a/executor/cachedfp.go b/executor/cachedfp.go index 906f13db..8343ace0 100644 --- a/executor/cachedfp.go +++ b/executor/cachedfp.go @@ -22,7 +22,7 @@ func (cfp *CachedFP) GetFP(fileName string) (fp *os.File, err error) { } cfp.fp, err = os.OpenFile(fileName, os.O_RDWR, 0700) if err != nil { - return nil, err + return nil, fmt.Errorf("open cached filepath: %w", err) } cfp.fileName = fileName return cfp.fp, nil diff --git a/executor/wal.go b/executor/wal.go index 2d27c0a3..0851a393 100644 --- a/executor/wal.go +++ b/executor/wal.go @@ -641,10 +641,17 @@ func (wf *WALFileType) cleanupOldWALFiles(rootDir string) error { log.Info("Found a WALFILE: %s, entering replay...", filename) filePath := filepath.Join(rootDir, filename) - fi, _ := os.Stat(filePath) + fi, err := os.Stat(filePath) + if err != nil { + log.Error("failed to get fileStat of " + filePath) + continue + } if fi.Size() < 11 { log.Info("WALFILE: %s is empty, removing it...", filename) - os.Remove(filePath) + err = os.Remove(filePath) + if err != nil { + log.Error("failed to remove an empty WALfile", filename) + } continue } diff --git a/executor/walreplay.go b/executor/walreplay.go index 78b27738..d0d022fa 100644 --- a/executor/walreplay.go +++ b/executor/walreplay.go @@ -28,9 +28,11 @@ func (wf *WALFileType) Replay(dryRun bool) error { return fmt.Errorf("check if walfile needs to be replayed: %w", err) } if !needsReplay { - err := fmt.Errorf("WALFileType.NeedsReplay No Replay Needed") - log.Info(err.Error()) - return err + log.Info("No WAL Replay needed.") + return WALReplayError{ + msg: "WALFileType.NeedsReplay No Replay Needed", + skipReplay: true, + } } // Take control of this file and set the status @@ -63,7 +65,10 @@ func (wf *WALFileType) Replay(dryRun bool) error { switch msgID { case TGDATA: // Read a TGData - offset, _ := wf.FilePtr.Seek(0, goio.SeekCurrent) + offset, err := wf.FilePtr.Seek(0, goio.SeekCurrent) + if err != nil { + return fmt.Errorf("seek error: %w", err) + } tgID, tgSerialized, err := wf.readTGData() tgData[tgID] = tgSerialized if continueRead = fullRead(err); !continueRead { @@ -129,18 +134,24 @@ func (wf *WALFileType) Replay(dryRun bool) error { //for tgid, TG_Serialized := range tgData { for _, tgid := range sortedTGIDs { tgSerialized := tgData[tgid] - if tgSerialized != nil { - // Note that only TG data that did not have a COMMITCOMPLETE record are replayed - if !dryRun { - rootDir := filepath.Dir(wf.FilePtr.Name()) - tgID, wtSets := ParseTGData(tgSerialized, rootDir) - if err := wf.replayTGData(tgID, wtSets); err != nil { - return fmt.Errorf("replay transaction group data. tgID=%d, "+ - "write transaction size=%d:%w", tgID, len(wtSets), err) - } - } + if tgSerialized == nil { + continue + } + + if dryRun { + continue + } + + // Note that only TG data that did not have a COMMITCOMPLETE record are replayed + rootDir := filepath.Dir(wf.FilePtr.Name()) + tgID, wtSets := ParseTGData(tgSerialized, rootDir) + if err := wf.replayTGData(tgID, wtSets); err != nil { + return fmt.Errorf("replay transaction group data. tgID=%d, "+ + "write transaction size=%d:%w", tgID, len(wtSets), err) } + } + log.Info("Replay of WAL file %s finished", wf.FilePtr.Name()) if !dryRun { wf.WriteStatus(wal.OPEN, wal.REPLAYED)