Skip to content

Commit

Permalink
Move checks for errors (#157)
Browse files Browse the repository at this point in the history
Changed as it turned out not to be external dependency issue, because nuclio function restart fixes the problem. Thus, in case of network issue, we will wait forever and inform about this in the log
  • Loading branch information
rokatyy authored Feb 3, 2025
1 parent dee4488 commit b892813
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 20 deletions.
20 changes: 8 additions & 12 deletions pkg/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ package common

import (
"context"
"errors"
"reflect"
"runtime"
"strings"
"time"

"github.com/nuclio/errors"
nuclioerrors "github.com/nuclio/errors"
"github.com/nuclio/logger"
)

Expand Down Expand Up @@ -87,7 +88,7 @@ func RetryFunc(ctx context.Context,
time.Sleep(backoff.Duration())
} else {
if retryInterval == nil {
return errors.New("Either retry interval or backoff must be given")
return nuclioerrors.New("Either retry interval or backoff must be given")
}
time.Sleep(*retryInterval)
}
Expand All @@ -108,7 +109,7 @@ func RetryFunc(ctx context.Context,
"function", getFunctionName(fn),
"err", err,
"attempts", attempts)
return errors.New("Failed final attempt to invoke function without proper error supplied")
return nuclioerrors.New("Failed final attempt to invoke function without proper error supplied")
}
return err
}
Expand Down Expand Up @@ -192,17 +193,12 @@ func EngineErrorIsNonFatal(err error) bool {
return errorMatches(err, nonFatalEngineErrorsPartialMatch)
}

func EngineErrorIsFatal(err error) bool {
var fatalEngineErrorsPartialMatch = []string{
"lookup v3io-webapi: i/o timeout",
}
return errorMatches(err, fatalEngineErrorsPartialMatch)
}

func errorMatches(err error, substrings []string) bool {
if err != nil && len(err.Error()) > 0 {
// Unwraps the entire error chain
for e := err; e != nil; e = errors.Unwrap(e) {
errMsg := e.Error()
for _, substring := range substrings {
if strings.Contains(err.Error(), substring) || strings.Contains(errors.Cause(err).Error(), substring) {
if strings.Contains(errMsg, substring) {
return true
}
}
Expand Down
12 changes: 4 additions & 8 deletions pkg/dataplane/streamconsumergroup/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,16 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time
func(attempt int) (bool, error, int) {
c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID)
if err != nil {
if common.EngineErrorIsNonFatal(err) {
return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0
}

// if the error is fatal and requires external resolution,
// if the error is not fatal (as network issue),
// we don't want to fail; instead, we will inform the user via a log
if common.EngineErrorIsFatal(err) {
c.logger.ErrorWith("A fatal error occurred. Will retry until successful",
if common.EngineErrorIsNonFatal(err) {
c.logger.ErrorWith("Failed to get shard location. Will retry until successful",
"error", err,
"shard", c.shardID)
// for this type of error, we always increment the attempt counter
// this ensures the smooth operation of other components in Nuclio
// we avoid panicking and simply wait for the issue to be resolved
return true, errors.Wrap(err, "Failed to get shard location"), 1
return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 1
}

// requested for an immediate stop
Expand Down

0 comments on commit b892813

Please sign in to comment.