Skip to content

Commit

Permalink
refactor(wal): error handling and early return (#507)
Browse files Browse the repository at this point in the history
  • Loading branch information
dakimura authored Sep 17, 2021
1 parent 271bd23 commit 8d561eb
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 17 deletions.
2 changes: 1 addition & 1 deletion executor/cachedfp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (cfp *CachedFP) GetFP(fileName string) (fp *os.File, err error) {
}
cfp.fp, err = os.OpenFile(fileName, os.O_RDWR, 0700)
if err != nil {
return nil, err
return nil, fmt.Errorf("open cached filepath: %w", err)
}
cfp.fileName = fileName
return cfp.fp, nil
Expand Down
11 changes: 9 additions & 2 deletions executor/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,10 +641,17 @@ func (wf *WALFileType) cleanupOldWALFiles(rootDir string) error {

log.Info("Found a WALFILE: %s, entering replay...", filename)
filePath := filepath.Join(rootDir, filename)
fi, _ := os.Stat(filePath)
fi, err := os.Stat(filePath)
if err != nil {
log.Error("failed to get fileStat of " + filePath)
continue
}
if fi.Size() < 11 {
log.Info("WALFILE: %s is empty, removing it...", filename)
os.Remove(filePath)
err = os.Remove(filePath)
if err != nil {
log.Error("failed to remove an empty WALfile", filename)
}
continue
}

Expand Down
39 changes: 25 additions & 14 deletions executor/walreplay.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ func (wf *WALFileType) Replay(dryRun bool) error {
return fmt.Errorf("check if walfile needs to be replayed: %w", err)
}
if !needsReplay {
err := fmt.Errorf("WALFileType.NeedsReplay No Replay Needed")
log.Info(err.Error())
return err
log.Info("No WAL Replay needed.")
return WALReplayError{
msg: "WALFileType.NeedsReplay No Replay Needed",
skipReplay: true,
}
}

// Take control of this file and set the status
Expand Down Expand Up @@ -63,7 +65,10 @@ func (wf *WALFileType) Replay(dryRun bool) error {
switch msgID {
case TGDATA:
// Read a TGData
offset, _ := wf.FilePtr.Seek(0, goio.SeekCurrent)
offset, err := wf.FilePtr.Seek(0, goio.SeekCurrent)
if err != nil {
return fmt.Errorf("seek error: %w", err)
}
tgID, tgSerialized, err := wf.readTGData()
tgData[tgID] = tgSerialized
if continueRead = fullRead(err); !continueRead {
Expand Down Expand Up @@ -129,18 +134,24 @@ func (wf *WALFileType) Replay(dryRun bool) error {
//for tgid, TG_Serialized := range tgData {
for _, tgid := range sortedTGIDs {
tgSerialized := tgData[tgid]
if tgSerialized != nil {
// Note that only TG data that did not have a COMMITCOMPLETE record are replayed
if !dryRun {
rootDir := filepath.Dir(wf.FilePtr.Name())
tgID, wtSets := ParseTGData(tgSerialized, rootDir)
if err := wf.replayTGData(tgID, wtSets); err != nil {
return fmt.Errorf("replay transaction group data. tgID=%d, "+
"write transaction size=%d:%w", tgID, len(wtSets), err)
}
}
if tgSerialized == nil {
continue
}

if dryRun {
continue
}

// Note that only TG data that did not have a COMMITCOMPLETE record are replayed
rootDir := filepath.Dir(wf.FilePtr.Name())
tgID, wtSets := ParseTGData(tgSerialized, rootDir)
if err := wf.replayTGData(tgID, wtSets); err != nil {
return fmt.Errorf("replay transaction group data. tgID=%d, "+
"write transaction size=%d:%w", tgID, len(wtSets), err)
}

}

log.Info("Replay of WAL file %s finished", wf.FilePtr.Name())
if !dryRun {
wf.WriteStatus(wal.OPEN, wal.REPLAYED)
Expand Down

0 comments on commit 8d561eb

Please sign in to comment.