diff --git a/pkg/common/helper.go b/pkg/common/helper.go index 2c7cc6f..25ffa9b 100644 --- a/pkg/common/helper.go +++ b/pkg/common/helper.go @@ -22,12 +22,13 @@ package common import ( "context" + "errors" "reflect" "runtime" "strings" "time" - "github.com/nuclio/errors" + nuclioerrors "github.com/nuclio/errors" "github.com/nuclio/logger" ) @@ -87,7 +88,7 @@ func RetryFunc(ctx context.Context, time.Sleep(backoff.Duration()) } else { if retryInterval == nil { - return errors.New("Either retry interval or backoff must be given") + return nuclioerrors.New("Either retry interval or backoff must be given") } time.Sleep(*retryInterval) } @@ -108,7 +109,7 @@ func RetryFunc(ctx context.Context, "function", getFunctionName(fn), "err", err, "attempts", attempts) - return errors.New("Failed final attempt to invoke function without proper error supplied") + return nuclioerrors.New("Failed final attempt to invoke function without proper error supplied") } return err } @@ -192,17 +193,12 @@ func EngineErrorIsNonFatal(err error) bool { return errorMatches(err, nonFatalEngineErrorsPartialMatch) } -func EngineErrorIsFatal(err error) bool { - var fatalEngineErrorsPartialMatch = []string{ - "lookup v3io-webapi: i/o timeout", - } - return errorMatches(err, fatalEngineErrorsPartialMatch) -} - func errorMatches(err error, substrings []string) bool { - if err != nil && len(err.Error()) > 0 { + // Unwraps the entire error chain + for e := err; e != nil; e = errors.Unwrap(e) { + errMsg := e.Error() for _, substring := range substrings { - if strings.Contains(err.Error(), substring) || strings.Contains(errors.Cause(err).Error(), substring) { + if strings.Contains(errMsg, substring) { return true } } diff --git a/pkg/dataplane/streamconsumergroup/claim.go b/pkg/dataplane/streamconsumergroup/claim.go index 912c68f..d14cb64 100644 --- a/pkg/dataplane/streamconsumergroup/claim.go +++ b/pkg/dataplane/streamconsumergroup/claim.go @@ -130,20 +130,16 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time func(attempt int) (bool, error, int) { c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID) if err != nil { - if common.EngineErrorIsNonFatal(err) { - return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0 - } - - // if the error is fatal and requires external resolution, + // if the error is not fatal (as network issue), // we don't want to fail; instead, we will inform the user via a log - if common.EngineErrorIsFatal(err) { - c.logger.ErrorWith("A fatal error occurred. Will retry until successful", + if common.EngineErrorIsNonFatal(err) { + c.logger.ErrorWith("Failed to get shard location. Will retry until successful", "error", err, "shard", c.shardID) // for this type of error, we always increment the attempt counter // this ensures the smooth operation of other components in Nuclio // we avoid panicking and simply wait for the issue to be resolved - return true, errors.Wrap(err, "Failed to get shard location"), 1 + return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 1 } // requested for an immediate stop