From b8928138ed8def3b0b14d6cc075cc28a9c484588 Mon Sep 17 00:00:00 2001 From: Katerina Molchanova <35141662+rokatyy@users.noreply.github.com> Date: Mon, 3 Feb 2025 07:45:49 +0000 Subject: [PATCH] Move checks for errors (#157) Changed as it turned out not to be external dependency issue, because nuclio function restart fixes the problem. Thus, in case of network issue, we will wait forever and inform about this in the log --- pkg/common/helper.go | 20 ++++++++------------ pkg/dataplane/streamconsumergroup/claim.go | 12 ++++-------- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/pkg/common/helper.go b/pkg/common/helper.go index 2c7cc6f..25ffa9b 100644 --- a/pkg/common/helper.go +++ b/pkg/common/helper.go @@ -22,12 +22,13 @@ package common import ( "context" + "errors" "reflect" "runtime" "strings" "time" - "github.com/nuclio/errors" + nuclioerrors "github.com/nuclio/errors" "github.com/nuclio/logger" ) @@ -87,7 +88,7 @@ func RetryFunc(ctx context.Context, time.Sleep(backoff.Duration()) } else { if retryInterval == nil { - return errors.New("Either retry interval or backoff must be given") + return nuclioerrors.New("Either retry interval or backoff must be given") } time.Sleep(*retryInterval) } @@ -108,7 +109,7 @@ func RetryFunc(ctx context.Context, "function", getFunctionName(fn), "err", err, "attempts", attempts) - return errors.New("Failed final attempt to invoke function without proper error supplied") + return nuclioerrors.New("Failed final attempt to invoke function without proper error supplied") } return err } @@ -192,17 +193,12 @@ func EngineErrorIsNonFatal(err error) bool { return errorMatches(err, nonFatalEngineErrorsPartialMatch) } -func EngineErrorIsFatal(err error) bool { - var fatalEngineErrorsPartialMatch = []string{ - "lookup v3io-webapi: i/o timeout", - } - return errorMatches(err, fatalEngineErrorsPartialMatch) -} - func errorMatches(err error, substrings []string) bool { - if err != nil && len(err.Error()) > 0 { + // Unwraps the entire error chain + for e := err; e != nil; e = errors.Unwrap(e) { + errMsg := e.Error() for _, substring := range substrings { - if strings.Contains(err.Error(), substring) || strings.Contains(errors.Cause(err).Error(), substring) { + if strings.Contains(errMsg, substring) { return true } } diff --git a/pkg/dataplane/streamconsumergroup/claim.go b/pkg/dataplane/streamconsumergroup/claim.go index 912c68f..d14cb64 100644 --- a/pkg/dataplane/streamconsumergroup/claim.go +++ b/pkg/dataplane/streamconsumergroup/claim.go @@ -130,20 +130,16 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time func(attempt int) (bool, error, int) { c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID) if err != nil { - if common.EngineErrorIsNonFatal(err) { - return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0 - } - - // if the error is fatal and requires external resolution, + // if the error is not fatal (as network issue), // we don't want to fail; instead, we will inform the user via a log - if common.EngineErrorIsFatal(err) { - c.logger.ErrorWith("A fatal error occurred. Will retry until successful", + if common.EngineErrorIsNonFatal(err) { + c.logger.ErrorWith("Failed to get shard location. Will retry until successful", "error", err, "shard", c.shardID) // for this type of error, we always increment the attempt counter // this ensures the smooth operation of other components in Nuclio // we avoid panicking and simply wait for the issue to be resolved - return true, errors.Wrap(err, "Failed to get shard location"), 1 + return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 1 } // requested for an immediate stop