Skip to content

Commit

Permalink
refactor(wal): add error handlings (#506)
Browse files Browse the repository at this point in the history
* refactor(wal): improve readability
  • Loading branch information
dakimura authored Sep 17, 2021
1 parent e9212d8 commit 271bd23
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 119 deletions.
2 changes: 1 addition & 1 deletion cmd/tool/wal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,5 @@ func executeWAL(cmd *cobra.Command, args []string) error {
wf.FilePtr = filePtr

// Execute.
return wf.Replay(false)
return wf.Replay(true)
}
9 changes: 8 additions & 1 deletion executor/cachedfp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package executor

import "os"
import (
"fmt"
"os"
)

type CachedFP struct {
fileName string
Expand Down Expand Up @@ -31,3 +34,7 @@ func (cfp *CachedFP) Close() error {
}
return nil
}

func (cfp *CachedFP) String() string {
return fmt.Sprintf("CachedFP(fileName: %s)", cfp.fileName)
}
13 changes: 13 additions & 0 deletions executor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executor

import (
"fmt"
"strconv"

"github.com/alpacahq/marketstore/v4/utils/io"
"github.com/alpacahq/marketstore/v4/utils/log"
Expand Down Expand Up @@ -62,6 +63,18 @@ func (msg WALWriteError) Error() string {
return errReport("%s: Error Writing to WAL", string(msg))
}

// WALReplayError is used when the WALfile Replay process fails.
// If skipReplay:true, it will attempt to give up the Replay process,
// move the walfile to a temporary file, and continue with other marketstore processing.
type WALReplayError struct {
msg string
skipReplay bool
}

func (e WALReplayError) Error() string {
return errReport("%s: Error Replaying WAL. skipReplay="+strconv.FormatBool(e.skipReplay), e.msg)
}

func errReport(base string, msg string) string {
base = io.GetCallerFileContext(2) + ":" + base
log.Error(base, msg)
Expand Down
50 changes: 2 additions & 48 deletions executor/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func (wf *WALFileType) WriteTransactionInfo(tid int64, did DestEnum, txnStatus T
}
func (wf *WALFileType) readTransactionInfo() (tgid int64, destination DestEnum, txnStatus TxnStatusEnum, err error) {
var buffer [10]byte
buf, _, err := wal.Read(wf.FilePtr, -1, buffer[:])
buf, _, err := wal.Read(wf.FilePtr, buffer[:])
if err != nil {
return 0, 0, 0, wal.ShortReadError("WALFileType.readTransactionInfo")
}
Expand All @@ -465,52 +465,6 @@ func (wf *WALFileType) initMessage(mid MIDEnum) []byte {
func (wf *WALFileType) writeMessageID(mid MIDEnum) {
wf.write(wf.initMessage(mid))
}
func (wf *WALFileType) readMessageID() (mid MIDEnum, err error) {
var buffer [1]byte
buf, _, err := wal.Read(wf.FilePtr, -1, buffer[:])
if err != nil {
return 0, wal.ShortReadError("WALFileType.ReadMessageID")
}
MID := MIDEnum(buf[0])
switch MID {
case TGDATA, TXNINFO, STATUS:
return MID, nil
}
return 99, fmt.Errorf("WALFileType.ReadMessageID Incorrect MID read, value: %d", MID)
}
func (wf *WALFileType) readTGData() (TGID int64, TG_Serialized []byte, err error) {
TGLen_Serialized := make([]byte, 8)
TGLen_Serialized, _, err = wal.Read(wf.FilePtr, -1, TGLen_Serialized)
if err != nil {
return 0, nil, wal.ShortReadError(io.GetCallerFileContext(0))
}
TGLen := io.ToInt64(TGLen_Serialized)

if !sanityCheckValue(wf.FilePtr, TGLen) {
return 0, nil, fmt.Errorf(io.GetCallerFileContext(0) + fmt.Sprintf(": Insane TG Length: %d", TGLen))
}

// Read the data
TG_Serialized = make([]byte, TGLen)
n, err := wf.FilePtr.Read(TG_Serialized)
if int64(n) != TGLen || err != nil {
return 0, nil, wal.ShortReadError(io.GetCallerFileContext(0) + ":Reading Data")
}
TGID = io.ToInt64(TG_Serialized[:7])

// Read the checksum
checkBuf := make([]byte, 16)
n, err = wf.FilePtr.Read(checkBuf)
if n != 16 || err != nil {
return 0, nil, wal.ShortReadError(io.GetCallerFileContext(0) + ":Reading Checksum")
}

if err := validateCheckSum(TGLen_Serialized, TG_Serialized, checkBuf); err != nil {
return 0, nil, err
}

return TGID, TG_Serialized, nil
}

func validateCheckSum(tgLenSerialized, tgSerialized, checkBuf []byte) error {
// compute the checksum
Expand Down Expand Up @@ -698,7 +652,7 @@ func (wf *WALFileType) cleanupOldWALFiles(rootDir string) error {
if err != nil {
return fmt.Errorf("opening %s: %w", filename, err)
}
if err = w.Replay(true); err != nil {
if err = w.Replay(false); err != nil {
return fmt.Errorf("unable to replay %s: %w", filename, err)
}

Expand Down
22 changes: 9 additions & 13 deletions executor/wal/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,27 @@ const (

func ReadStatus(filePtr *os.File) (fileStatus FileStatusEnum, replayStatus ReplayStateEnum, OwningInstanceID int64, err error) {
var buffer [10]byte
buf, _, err := Read(filePtr, -1, buffer[:])
buf, _, err := Read(filePtr, buffer[:])
return FileStatusEnum(buf[0]), ReplayStateEnum(buf[1]), io.ToInt64(buf[2:]), err
}

func Read(fp *os.File, targetOffset int64, buffer []byte) (result []byte, newOffset int64, err error) {
/*
Read from the WAL file
targetOffset: -1 will read from current position
*/
// Read reads the WAL file from current position
func Read(fp *os.File, buffer []byte) (result []byte, newOffset int64, err error) {
offset, err := fp.Seek(0, io2.SeekCurrent)
if err != nil {
log.Fatal(io.GetCallerFileContext(0) + ": Unable to seek in WALFile")
}
if targetOffset != -1 {
if offset != targetOffset {
fp.Seek(targetOffset, io2.SeekStart)
}
log.Error(io.GetCallerFileContext(0) + ": Unable to seek in WALFile")
return nil, 0, fmt.Errorf("unable to seek in WALFile from curpos:%w", err)
}

numToRead := len(buffer)
n, err := fp.Read(buffer)
if n != numToRead {
msg := fmt.Sprintf("Read: Expected: %d Got: %d", numToRead, n)
err = ShortReadError(msg)
} else if err != nil {
log.Fatal(io.GetCallerFileContext(0) + ": Unable to read WALFile")
log.Error(io.GetCallerFileContext(0) + ": Unable to read WALFile")
err = fmt.Errorf("unable to read WALFile:%w", err)
}

return buffer, offset + int64(n), err
}
4 changes: 2 additions & 2 deletions executor/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestBrokenWAL(t *testing.T) {
assert.Nil(t, err)
newTGC := executor.NewTransactionPipe()
assert.NotNil(t, newTGC)
err = WALFile.Replay(true)
err = WALFile.Replay(false)
assert.Nil(t, err)

err = WALFile.Delete(WALFile.OwningInstanceID)
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestWALReplay(t *testing.T) {
assert.True(t, compareFileToBuf(t, fileContentsOriginal2002, queryFiles2002))

// Replay the WALFile into the new cache
err = WALFile.Replay(true)
err = WALFile.Replay(false)
assert.Nil(t, err)

// Verify that the files are in the correct state after replay
Expand Down
Loading

0 comments on commit 271bd23

Please sign in to comment.