Skip to content

Commit

Permalink
Fix terminal status consistency problems (#120)
Browse files Browse the repository at this point in the history
  • Loading branch information
seilagamo authored Jul 3, 2023
1 parent a7be056 commit b360d78
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 50 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
*.swp
*.swo
.vscode/
.idea/
local.toml
local_autoscaling.toml
vendor/
.DS_Store

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/BurntSushi/toml v1.2.1
github.com/adevinta/vulcan-metrics-client v1.0.1
github.com/adevinta/vulcan-report v1.0.0
github.com/aws/aws-sdk-go v1.44.266
github.com/aws/aws-sdk-go v1.44.290
github.com/docker/cli v24.0.1+incompatible
github.com/docker/distribution v2.8.2+incompatible
github.com/docker/docker v24.0.1+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ github.com/adevinta/vulcan-metrics-client v1.0.1 h1:BAugnnRWvkA3vnuCX77W04PWhneZ
github.com/adevinta/vulcan-metrics-client v1.0.1/go.mod h1:we8vxfPMYQqZtOy42PJxsWwv2DwruSaT/wwNMxkum8I=
github.com/adevinta/vulcan-report v1.0.0 h1:44aICPZ+4svucgCSA5KmjlT3ZGzrvZXiSnkbnj6AC2k=
github.com/adevinta/vulcan-report v1.0.0/go.mod h1:k34KaeoXc3H77WNMwI9F4F1G28hBjB95PeMUp9oHbEE=
github.com/aws/aws-sdk-go v1.44.266 h1:MWd775dcYf7NrwgcHLtlsIbWoWkX8p4vomfNHr88zH0=
github.com/aws/aws-sdk-go v1.44.266/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go v1.44.290 h1:Md4+os9DQtJjow0lWLMzeJljsimD+XS2xwwHDr5Z+Lk=
github.com/aws/aws-sdk-go v1.44.290/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/danieljoos/wincred v1.1.0/go.mod h1:XYlo+eRTsVA9aHGp7NGjFkPla4m+DCL7hqDjlFjiygg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
42 changes: 33 additions & 9 deletions jobrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ type CheckStateUpdater interface {
UpdateState(stateupdater.CheckState) error
UploadCheckData(checkID, kind string, startedAt time.Time, content []byte) (string, error)
CheckStatusTerminal(ID string) bool
DeleteCheckStatusTerminal(ID string)
FlushCheckStatus(ID string) error
UpdateCheckStatusTerminal(stateupdater.CheckState)
}

// AbortedChecks defines the shape of the component needed by a Runner in order
Expand Down Expand Up @@ -207,6 +208,13 @@ func (cr *Runner) runJob(m queue.Message, t interface{}, processed chan bool) {
return
}
cr.Logger.Errorf("error max processed times exceeded for check: %s", j.CheckID)
// We flush the terminal status and send the final status to the writer.
err = cr.CheckUpdater.FlushCheckStatus(j.CheckID)
if err != nil {
err = fmt.Errorf("error deleting the terminal status of the check: %s, error: %w", j.CheckID, err)
cr.finishJob(j, status, processed, false, err)
return
}
cr.finishJob(j, status, processed, true, nil)
return
}
Expand Down Expand Up @@ -288,20 +296,16 @@ func (cr *Runner) runJob(m queue.Message, t interface{}, processed chan bool) {
// so we remove it from aborter.
cr.cAborter.Remove(j.CheckID)

// We query if the check has sent any status update with a terminal status.
isterminal := cr.CheckUpdater.CheckStatusTerminal(j.CheckID)
// We signal the CheckUpdater that we don't need it to store that
// information anymore.
cr.CheckUpdater.DeleteCheckStatusTerminal(j.CheckID)

// Try always to upload the logs of the check if present.
if res.Output != nil {
logsLink, err = cr.CheckUpdater.UploadCheckData(j.CheckID, "logs", j.StartTime, res.Output)
if err != nil {
err = fmt.Errorf("error storing the logs of the check: %s, error %w", j.CheckID, err)
cr.finishJob(j, "uploading_raw", processed, false, err)
// We return to retry the log upload later.
return
}

// Set the link for the logs of the check.
err = cr.CheckUpdater.UpdateState(stateupdater.CheckState{
ID: j.CheckID,
Expand All @@ -310,10 +314,14 @@ func (cr *Runner) runJob(m queue.Message, t interface{}, processed chan bool) {
if err != nil {
err = fmt.Errorf("error updating the link to the logs of the check: %s, error: %w", j.CheckID, err)
cr.finishJob(j, "linking_raw", processed, false, err)
return
}

cr.Logger.Debugf(j.logTrace(logsLink, "raw_logs"))
}

// We query if the check has sent any status update with a terminal status.
isTerminal := cr.CheckUpdater.CheckStatusTerminal(j.CheckID)

// Check if the backend returned any not expected error while running the check.
execErr := res.Error
if execErr != nil &&
Expand All @@ -339,11 +347,19 @@ func (cr *Runner) runJob(m queue.Message, t interface{}, processed chan bool) {
status = stateupdater.StatusFailed
}
// Ensure the check sent a status update with a terminal status.
if status == "" && !isterminal {
if status == "" && !isTerminal {
status = stateupdater.StatusFailed
}
// If the check was not canceled or aborted we just finish its execution.
if status == "" {
// We signal the CheckUpdater that we don't need it to store that
// information any more.
err = cr.CheckUpdater.FlushCheckStatus(j.CheckID)
if err != nil {
err = fmt.Errorf("error deleting the terminal status of the check: %s, error: %w", j.CheckID, err)
cr.finishJob(j, status, processed, false, err)
return
}
cr.finishJob(j, "completed", processed, true, err)
return
}
Expand All @@ -354,6 +370,14 @@ func (cr *Runner) runJob(m queue.Message, t interface{}, processed chan bool) {
if err != nil {
err = fmt.Errorf("error updating the status of the check: %s, error: %w", j.CheckID, err)
}
// We signal the CheckUpdater that we don't need it to store that
// information any more.
err = cr.CheckUpdater.FlushCheckStatus(j.CheckID)
if err != nil {
err = fmt.Errorf("error deleting the terminal status of the check: %s, error: %w", j.CheckID, err)
cr.finishJob(j, status, processed, false, err)
return
}
cr.finishJob(j, "finished", processed, err == nil, err)
}

Expand Down
159 changes: 132 additions & 27 deletions jobrunner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,26 @@ type CheckRaw struct {
}

type inMemChecksUpdater struct {
updates []stateupdater.CheckState
raws []CheckRaw
updates []stateupdater.CheckState
raws []CheckRaw
terminalStatus map[string]stateupdater.CheckState
uploadLogError error
}

func (im *inMemChecksUpdater) UpdateState(cs stateupdater.CheckState) error {
status := ""
if cs.Status != nil {
status = *cs.Status
} else {
storedCheckStatus, ok := im.terminalStatus[cs.ID]
if ok {
status = *storedCheckStatus.Status
}
}
if _, ok := stateupdater.TerminalStatuses[status]; ok {
im.UpdateCheckStatusTerminal(cs)
return nil
}
if im.updates == nil {
im.updates = make([]stateupdater.CheckState, 0)
}
Expand All @@ -66,6 +81,10 @@ func (im *inMemChecksUpdater) UpdateState(cs stateupdater.CheckState) error {
}

func (im *inMemChecksUpdater) UploadCheckData(checkID string, kind string, stime time.Time, raw []byte) (string, error) {
if im.uploadLogError != nil {
return "", im.uploadLogError
}

if im.raws != nil {
im.raws = make([]CheckRaw, 0)
}
Expand All @@ -78,26 +97,57 @@ func (im *inMemChecksUpdater) UploadCheckData(checkID string, kind string, stime
}

func (im *inMemChecksUpdater) CheckStatusTerminal(ID string) bool {
for _, u := range im.updates {
status := ""
if u.Status != nil {
status = *u.Status
}
if _, ok := stateupdater.TerminalStatuses[status]; ok {
return true
}
_, ok := im.terminalStatus[ID]
return ok
}

func (im *inMemChecksUpdater) UpdateCheckStatusTerminal(s stateupdater.CheckState) {
if im.terminalStatus == nil {
im.terminalStatus = make(map[string]stateupdater.CheckState)
}
return false
checkState, ok := im.terminalStatus[s.ID]
if !ok {
im.terminalStatus[s.ID] = s
return
}

// We update the existing CheckState
if s.Status != nil {
checkState.Status = s.Status
}
if s.Raw != nil {
checkState.Raw = s.Raw
}
if s.AgentID != nil {
checkState.AgentID = s.AgentID
}
if s.Progress != nil {
checkState.Progress = s.Progress
}
if s.Report != nil {
checkState.Report = s.Report
}

im.terminalStatus[checkState.ID] = checkState
}

func (im *inMemChecksUpdater) DeleteCheckStatusTerminal(ID string) {
func (im *inMemChecksUpdater) FlushCheckStatus(ID string) error {
if im.updates == nil {
im.updates = make([]stateupdater.CheckState, 0)
}
terminalStatus, ok := im.terminalStatus[ID]
if ok {
im.updates = append(im.updates, terminalStatus)
}
return nil
}

type mockChecksUpdater struct {
stateUpdater func(cs stateupdater.CheckState) error
checkRawUpload func(checkID, kind string, startedAt time.Time, content []byte) (string, error)
checkTerminalChecker func(ID string) bool
checkTerminalDeleter func(ID string)
checkTerminalDeleter func(ID string) error
checkTerminalUpdater func(cs stateupdater.CheckState)
}

func (m *mockChecksUpdater) UpdateState(cs stateupdater.CheckState) error {
Expand All @@ -112,8 +162,12 @@ func (m *mockChecksUpdater) CheckStatusTerminal(ID string) bool {
return m.checkTerminalChecker(ID)
}

func (m *mockChecksUpdater) DeleteCheckStatusTerminal(ID string) {
m.checkTerminalDeleter(ID)
func (m *mockChecksUpdater) FlushCheckStatus(ID string) error {
return m.checkTerminalDeleter(ID)
}

func (m *mockChecksUpdater) UpdateCheckStatusTerminal(cs stateupdater.CheckState) {
m.checkTerminalUpdater(cs)
}

type mockBackend struct {
Expand Down Expand Up @@ -188,8 +242,8 @@ func TestRunner_ProcessMessage(t *testing.T) {
Tokens: make(chan interface{}, 10),
Logger: &log.NullLog{},
CheckUpdater: &inMemChecksUpdater{
updates: []stateupdater.CheckState{
{
terminalStatus: map[string]stateupdater.CheckState{
runJobFixture1.CheckID: {
ID: runJobFixture1.CheckID,
Status: str2ptr(stateupdater.StatusFinished),
},
Expand Down Expand Up @@ -226,12 +280,9 @@ func TestRunner_ProcessMessage(t *testing.T) {
wantUpdates := []stateupdater.CheckState{
{
ID: runJobFixture1.CheckID,
Raw: &rawLink,
Status: str2ptr(stateupdater.StatusFinished),
},
{
ID: runJobFixture1.CheckID,
Raw: &rawLink,
},
}
rawsDiff := cmp.Diff(wantRaws, gotRaws)
updateDiff := cmp.Diff(wantUpdates, gotUpdates)
Expand Down Expand Up @@ -350,7 +401,6 @@ func TestRunner_ProcessMessage(t *testing.T) {
return fmt.Sprintf("%s%s", rawsDiff, updateDiff)
},
},

{
name: "UpdatesStateWhenCheckTimedout",
fields: fields{
Expand Down Expand Up @@ -424,7 +474,6 @@ func TestRunner_ProcessMessage(t *testing.T) {
return fmt.Sprintf("%s%s", rawsDiff, updateDiff)
},
},

{
name: "UpdatesStateWhenCheckCanceled",
fields: fields{
Expand Down Expand Up @@ -498,7 +547,6 @@ func TestRunner_ProcessMessage(t *testing.T) {
return fmt.Sprintf("%s%s", rawsDiff, updateDiff)
},
},

{
name: "DoesNotRunAbortedChecks",
fields: fields{
Expand Down Expand Up @@ -556,7 +604,6 @@ func TestRunner_ProcessMessage(t *testing.T) {
return fmt.Sprintf("%s%s", rawsDiff, updateDiff)
},
},

{
name: "DontDeleteWhenErrorUpdatingStatus",
fields: fields{
Expand Down Expand Up @@ -597,7 +644,10 @@ func TestRunner_ProcessMessage(t *testing.T) {
checkTerminalChecker: func(ID string) bool {
return false
},
checkTerminalDeleter: func(string) {
checkTerminalDeleter: func(string) error {
return nil
},
checkTerminalUpdater: func(cs stateupdater.CheckState) {
},
},
},
Expand All @@ -612,7 +662,6 @@ func TestRunner_ProcessMessage(t *testing.T) {
return ""
},
},

{
name: "UpdatesLogsWhenUnexpectedError",
fields: fields{
Expand Down Expand Up @@ -798,6 +847,62 @@ func TestRunner_ProcessMessage(t *testing.T) {
return fmt.Sprintf("%s%s", rawsDiff, updateDiff)
},
},
{
name: "FailUploadingLogs",
fields: fields{
Backend: &mockBackend{
CheckRunner: func(ctx context.Context, params backend.RunParams) (<-chan backend.RunResult, error) {
res := make(chan backend.RunResult)
go func() {
output, err := json.Marshal(params)
if err != nil {
panic(err)
}
results := backend.RunResult{
Output: output,
}
res <- results
}()
return res, nil
},
},
cAborter: &checkAborter{
cancels: sync.Map{},
},
aborted: &inMemAbortedChecks{make(map[string]struct{}), nil},
defaultTimeout: time.Duration(10 * time.Second),
Tokens: make(chan interface{}, 10),
Logger: &log.NullLog{},
CheckUpdater: &inMemChecksUpdater{
terminalStatus: map[string]stateupdater.CheckState{
runJobFixture1.CheckID: {
ID: runJobFixture1.CheckID,
Status: str2ptr(stateupdater.StatusFinished),
},
},
uploadLogError: errors.New("error uploading logs"),
},
},
args: args{
msg: queue.Message{
Body: string(mustMarshal(runJobFixture1)),
},
token: token{},
},
want: false,
wantState: func(r *Runner) string {
updater := r.CheckUpdater.(*inMemChecksUpdater)
gotRaws := updater.raws
var wantRaws []CheckRaw
gotUpdates := updater.updates
// No update with a FAILED terminal status will be sent because
// it will be reattempted.
var wantUpdates []stateupdater.CheckState
rawsDiff := cmp.Diff(wantRaws, gotRaws)
updateDiff := cmp.Diff(wantUpdates, gotUpdates)
return fmt.Sprintf("%s%s", rawsDiff, updateDiff)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Loading

0 comments on commit b360d78

Please sign in to comment.