Skip to content

Commit

Permalink
Fixes and job.Timeout implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jesusfcr committed Jan 28, 2025
1 parent 83aa0ff commit 98c2943
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 39 deletions.
78 changes: 52 additions & 26 deletions internal/http/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,38 @@ type Check struct {
finished chan error // used to wait for the server to shutted down.
}

// ProcessRunRequest implements an HTTP POST handler that receives a JSON encoded job, and returns an
type Job struct {
CheckID string `json:"check_id"` // Required
StartTime time.Time `json:"start_time"` // Required
Image string `json:"image"` // Required
Target string `json:"target"` // Required
Timeout int `json:"timeout"` // Required
AssetType string `json:"assettype"` // Optional
Options string `json:"options"` // Optional
RequiredVars []string `json:"required_vars"` // Optional
Metadata map[string]string `json:"metadata"` // Optional
}

// handleRun implements an HTTP POST handler that receives a JSON encoded job, and returns an
// agent.State JSON encoded response.
func (c *Check) ProcessRunRequest(w http.ResponseWriter, r *http.Request) {
func (c *Check) handleRun(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "error reading request body", http.StatusBadRequest)
return
}

var job Job
if err = json.Unmarshal(body, &job); err != nil {
w.WriteHeader(500)
return
}

if job.StartTime.IsZero() {
job.StartTime = time.Now()
}

logger := c.Logger.WithFields(log.Fields{
"target": job.Target,
"checkID": job.CheckID,
Expand All @@ -61,7 +78,7 @@ func (c *Check) ProcessRunRequest(w http.ResponseWriter, r *http.Request) {
Report: report.Report{
CheckData: report.CheckData{
CheckID: job.CheckID,
StartTime: job.StartTime, // TODO: Is this correct or should be time.Now()
StartTime: job.StartTime,
ChecktypeName: c.config.Check.CheckTypeName,
ChecktypeVersion: c.config.Check.CheckTypeVersion,
Options: job.Options,
Expand All @@ -72,25 +89,43 @@ func (c *Check) ProcessRunRequest(w http.ResponseWriter, r *http.Request) {
},
}

var cancel context.CancelFunc
if job.Timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, time.Duration(job.Timeout)*time.Second)
defer cancel()
}

runtimeState := state.State{
ResultData: &checkState.state.Report.ResultData,
ProgressReporter: state.ProgressReporterHandler(checkState.SetProgress),
}
logger.WithField("opts", job.Options).Info("Starting check")

err = c.checker.Run(ctx, job.Target, job.AssetType, job.Options, runtimeState)
c.checker.CleanUp(ctx, job.Target, job.AssetType, job.Options)

// This allows to capture if the context was canceled (i.e. by the http request) or job.Timeout was reached.
select {
case <-ctx.Done():
err = ctx.Err()
default:
}

checkState.state.Report.CheckData.EndTime = time.Now()
elapsedTime := time.Since(startTime)
// If an error has been returned, we set the correct status.
logger.WithField("elapsedTime", elapsedTime).Info("Check finished")

if err != nil {
if errors.Is(err, context.Canceled) {
logger.WithError(err).Error("Error running check")
if errors.Is(err, context.DeadlineExceeded) {
checkState.state.Status = agent.StatusAborted
} else if errors.Is(err, context.Canceled) {
checkState.state.Status = agent.StatusAborted
} else if errors.Is(err, state.ErrAssetUnreachable) {
checkState.state.Status = agent.StatusInconclusive
} else if errors.Is(err, state.ErrNonPublicAsset) {
checkState.state.Status = agent.StatusInconclusive
} else {
logger.WithError(err).Error("Error running check")
checkState.state.Status = agent.StatusFailed
checkState.state.Report.Error = err.Error()
}
Expand All @@ -111,32 +146,36 @@ func (c *Check) ProcessRunRequest(w http.ResponseWriter, r *http.Request) {
w.Write(out)
}

// handleHealth implements an HTTP GET handler that returns a simple "OK" response.
func (c *Check) handleHealth(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`"OK"`))
}

// RunAndServe implements the behavior needed by the sdk for a check runner to
// execute a check.
func (c *Check) RunAndServe() {
mux := http.NewServeMux()
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`"OK"`))
})
mux.HandleFunc("/run", c.ProcessRunRequest)
mux.HandleFunc("/healthz", c.handleHealth)
mux.HandleFunc("/run", c.handleRun)

server := &http.Server{
Addr: fmt.Sprintf(":%d", c.port),
Handler: mux,
}

c.Logger.Info(fmt.Sprintf("Listening at %s", server.Addr))
chanErr := make(chan error)
go func() {
if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
c.finished <- err
chanErr <- err
} else {
c.finished <- nil
}
}()

select {
case err := <-c.finished:
case err := <-chanErr:
c.Logger.WithError(err).Error("ListenAndServe: Unable to start server")
return // No need to shutdow the server because it was not started.
case s := <-c.exitSignal:
Expand All @@ -159,19 +198,6 @@ func (c *Check) RunAndServe() {
c.Logger.Info("Finished RunAndServe")
}

type Job struct {
CheckID string `json:"check_id"` // Required
StartTime time.Time `json:"start_time"` // Required
Image string `json:"image"` // Required
Target string `json:"target"` // Required
Timeout int `json:"timeout"` // Required
AssetType string `json:"assettype"` // Optional
Options string `json:"options"` // Optional
RequiredVars []string `json:"required_vars"` // Optional
Metadata map[string]string `json:"metadata"` // Optional
RunTime int64
}

// Shutdown is needed to fulfil the check interface and in this case we are
// shutting down the http server and waiting
func (c *Check) Shutdown() error {
Expand Down
45 changes: 32 additions & 13 deletions internal/http/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ func sleepCheckRunner(ctx context.Context, target, _, optJSON string, st state.S
select {
case <-time.After(time.Duration(opt.SleepTime) * time.Second):
log.Debugf("slept successfully %s seconds", strconv.Itoa(opt.SleepTime))
st.AddVulnerabilities(report.Vulnerability{
Summary: "Summary",
Description: "Test Vulnerability",
})
case <-ctx.Done():
log.Info("Check aborted")
}
st.AddVulnerabilities(report.Vulnerability{
Summary: "Summary",
Description: "Test Vulnerability",
})
return nil
}

Expand Down Expand Up @@ -143,24 +143,31 @@ func TestIntegrationHttpMode(t *testing.T) {
Target: "www.example.com",
AssetType: "Hostname",
},
"checkDeadline": {
CheckID: "checkDeadline",
Options: `{"SleepTime": 10}`,
Target: "www.example.com",
AssetType: "Hostname",
},
"checkInconclusive": {
"checkInconclusive": { // The check will always return an Inconclusive status for that tharget
CheckID: "checkInconclusive",
Options: `{"SleepTime": 1}`,
Target: "inconclusive",
AssetType: "Hostname",
},
"checkFailed": {
"checkFailed": { // The check will always fail because the sleep time is missing
CheckID: "checkFailed",
Options: `{}`,
Target: "www.example.com",
AssetType: "Hostname",
},
"checkClientTimeout": { // The http request will be aborted by the client as it exceeds the requestTimeout
CheckID: "checkClientTimeout",
Options: `{"SleepTime": 10}`,
Target: "www.example.com",
AssetType: "Hostname",
},
"checkSdkTimeout": { // The check will be aborted by the server sdk as it exceeds the specified timeout
CheckID: "checkSdkTimeout",
Options: `{"SleepTime": 2}`,
Target: "www.example.com",
AssetType: "Hostname",
Timeout: 1,
},
},
resourceToClean: map[string]string{"key": "initial"},
},
Expand Down Expand Up @@ -189,9 +196,21 @@ func TestIntegrationHttpMode(t *testing.T) {
Notes: "",
},
}},
"checkDeadline": {
"checkClientTimeout": {
Status: agent.StatusAborted,
},
"checkSdkTimeout": {
Status: agent.StatusAborted,
Report: report.Report{
CheckData: report.CheckData{
CheckID: "checkSdkTimeout",
ChecktypeName: "checkTypeName",
ChecktypeVersion: "",
Target: "www.example.com",
Options: `{"SleepTime": 2}`,
Status: agent.StatusAborted,
},
}},
"checkInconclusive": {
Status: agent.StatusInconclusive,
Report: report.Report{
Expand Down

0 comments on commit 98c2943

Please sign in to comment.