Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait forever for fatal errors #155

Merged
merged 4 commits into from
Jan 19, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions pkg/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,19 @@ func RetryFunc(ctx context.Context,
attempts int,
retryInterval *time.Duration,
backoff *Backoff,
fn func(int) (bool, error)) error {
fn func(int) (bool, error, int)) error {
TomerShor marked this conversation as resolved.
Show resolved Hide resolved

var err error
var retry bool
var addAttempts int

for attempt := 1; attempt <= attempts; attempt++ {
retry, err = fn(attempt)
var attempt = 0
for attempt <= attempts {

attempt++
// some errors might require more attempts than expected, so allow incrementing attempts from outside
retry, err, addAttempts = fn(attempt)
attempts += addAttempts

// if there's no need to retry - we're done
if !retry {
Expand Down Expand Up @@ -178,9 +184,19 @@ func EngineErrorIsNonFatal(err error) bool {
"timeout",
"refused",
}
return errorMatches(err, nonFatalEngineErrorsPartialMatch)
}

func EngineErrorIsFatal(err error) bool {
var fatalEngineErrorsPartialMatch = []string{
"Failed to fetch record batches",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the error from v3io; it is a log message that this client prints later.
As the ticket says, you want to retry on:

lookup v3io-webapi: i/o timeout

}
return errorMatches(err, fatalEngineErrorsPartialMatch)
}
func errorMatches(err error, substrings []string) bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a blank line before this function.

if err != nil && len(err.Error()) > 0 {
for _, nonFatalError := range nonFatalEngineErrorsPartialMatch {
if strings.Contains(err.Error(), nonFatalError) || strings.Contains(errors.Cause(err).Error(), nonFatalError) {
for _, substring := range substrings {
if strings.Contains(err.Error(), substring) || strings.Contains(errors.Cause(err).Error(), substring) {
return true
}
}
Expand Down
27 changes: 21 additions & 6 deletions pkg/dataplane/streamconsumergroup/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,31 +126,46 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time
c.logger,
c.getShardLocationAttempts,
nil,
&c.getShardLocationBackoff, func(attempt int) (bool, error) {
&c.getShardLocationBackoff, func(attempt int) (bool, error, int) {
c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
&c.getShardLocationBackoff, func(attempt int) (bool, error, int) {
c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID)
&c.getShardLocationBackoff,
func(attempt int) (bool, error, int) {
c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID)

if err != nil {
if common.EngineErrorIsNonFatal(err) {
return true, errors.Wrap(err, "Failed to get shard location due to a network error")
return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0
}

// if the error is fatal and requires external resolution,
// we don't want to fail; instead, we will inform the user via a log
if common.EngineErrorIsFatal(err) {
// although RetryFunc already logs the error, it logs it as a warning
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It actually doesn't log the error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

loggerInstance.WarnWithCtx(ctx,
"Context error detected during retries",
"ctxErr", ctx.Err(),
"previousErr", err,
"function", getFunctionName(fn),
"attempt", attempt)
// return the error if one was provided
if err != nil {
return err
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This warning is logged for a context error (ctx.Err()), for example when an external error happened.
In your case, you check the error returned from getCurrentShardLocation, which isn't handled by the retry func.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TomerShor right 🤦

I'll remove the comment.

// to emphasize the severity, we log it again as an error
c.logger.ErrorWith("A fatal error occurred. Will retry until successful",
"error", err,
"shard", c.shardID)
// for this type of error, we always increment the attempt counter
// this ensures the smooth operation of other components in Nuclio
// we avoid panicking and simply wait for the issue to be resolved
return true, errors.Wrap(err, "Failed to get shard location"), 1
}

// requested for an immediate stop
if err == v3ioerrors.ErrStopped {
return false, nil
return false, nil, 0
}

switch errors.RootCause(err).(type) {

// in case of a network error, retry to avoid transient errors
case *net.OpError:
return true, errors.Wrap(err, "Failed to get shard location due to a network error")
return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0

// unknown error, fail now
default:
return false, errors.Wrap(err, "Failed to get shard location")
return false, errors.Wrap(err, "Failed to get shard location"), 0
}
}

// we have shard location
return false, nil
return false, nil, 0
}); err != nil {
return errors.Wrapf(err,
"Failed to get shard location state, attempts exhausted. shard id: %d",
Expand Down
18 changes: 9 additions & 9 deletions pkg/dataplane/streamconsumergroup/streamconsumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,19 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
backoff := scg.config.State.ModifyRetry.Backoff
attempts := scg.config.State.ModifyRetry.Attempts

err := common.RetryFunc(context.TODO(), scg.logger, attempts, nil, &backoff, func(attempt int) (bool, error) {
err := common.RetryFunc(context.TODO(), scg.logger, attempts, nil, &backoff, func(attempt int) (bool, error, int) {
state, stateMtimeNanoSeconds, stateMtimeSeconds, err := scg.getStateFromPersistency()
if err != nil && !errors.Is(err, v3ioerrors.ErrNotFound) {
return true, errors.Wrap(err, "Failed getting current state from persistency")
return true, errors.Wrap(err, "Failed getting current state from persistency"), 0
}
if common.EngineErrorIsNonFatal(err) {
return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error")
return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error"), 0
}

if state == nil {
state, err = newState()
if err != nil {
return true, errors.Wrap(err, "Failed to create state")
return true, errors.Wrap(err, "Failed to create state"), 0
}
}

Expand All @@ -137,9 +137,9 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
if errors.Is(errors.RootCause(err), errShardRetention) {

// if shard retention failed the member needs to be aborted, so we can stop retrying
return false, errors.Wrap(err, "Failed modifying state")
return false, errors.Wrap(err, "Failed modifying state"), 0
}
return true, errors.Wrap(err, "Failed modifying state")
return true, errors.Wrap(err, "Failed modifying state"), 0
}

// log only on change
Expand All @@ -157,14 +157,14 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
"attempt", attempt,
"err", errors.RootCause(err).Error())
}
return true, errors.Wrap(err, "Failed setting state in persistency state")
return true, errors.Wrap(err, "Failed setting state in persistency state"), 0
}

if err := handlePostSetStateInPersistency(); err != nil {
return false, errors.Wrap(err, "Failed handling post set state in persistency")
return false, errors.Wrap(err, "Failed handling post set state in persistency"), 0
}

return false, nil
return false, nil, 0
})

if err != nil {
Expand Down
24 changes: 12 additions & 12 deletions pkg/dataplane/test/streamconsumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,20 +214,20 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() {
30,
&duration,
nil,
func(attempt int) (bool, error) {
func(attempt int) (bool, error, int) {
observedState, err := streamConsumerGroup.GetState()
suite.Require().NoError(err)
for _, sessionState := range observedState.SessionStates {
if sessionState.MemberID == member.streamConsumerGroupMember.GetID() {
suite.logger.DebugWith("Session state was not removed just yet")
return true, nil
return true, nil, 0
}
}

suite.logger.DebugWith("Session state was removed",
"observedState", observedState,
"memberID", member.id)
return false, nil
return false, nil, 0
})
})

Expand Down Expand Up @@ -270,7 +270,7 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() {
30,
&duration,
nil,
func(attempt int) (bool, error) {
func(attempt int) (bool, error, int) {
observedState, err := streamConsumerGroup.GetState()
suite.Require().NoError(err)

Expand All @@ -279,14 +279,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() {
suite.logger.DebugWith("retained shards",
"shards", sessionState.Shards,
"memberID", sessionState.MemberID)
return false, nil
return false, nil, 0
}
}

suite.logger.DebugWith("Session state shards were no retained just yet",
"sessionStates", observedState.SessionStates,
"memberID", member.streamConsumerGroupMember.GetID())
return true, nil
return true, nil, 0

})
})
Expand Down Expand Up @@ -368,14 +368,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerAbort() {
10,
&duration,
nil,
func(attempt int) (bool, error) {
func(attempt int) (bool, error, int) {
state, err = suite.getStateFromPersistency(suite.streamPath, consumerGroupName)
if err != nil {
suite.logger.DebugWith("State was not retrieved from persistency",
"err", err)
return true, err
return true, err, 0
}
return false, nil
return false, nil, 0
})
suite.Require().NoError(err)

Expand All @@ -391,14 +391,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerAbort() {
10,
&duration,
nil,
func(attempt int) (bool, error) {
func(attempt int) (bool, error, int) {
err = suite.setStateInPersistency(suite.streamPath, consumerGroupName, state)
if err != nil {
suite.logger.DebugWith("State was not set in persistency yet",
"err", err)
return true, err
return true, err, 0
}
return false, nil
return false, nil, 0
})
suite.Require().NoError(err)

Expand Down
Loading