Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait forever for fatal errors #155

Merged
merged 4 commits into from
Jan 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions pkg/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,30 @@ func getFunctionName(fn interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}

// give either retryInterval or backoff
// RetryFunc calls fn until fn reports that no retry is needed or the attempts are exhausted.
// Provide either retryInterval or backoff.
// fn receives the current attempt number and returns:
// bool - whether the call should be retried
// error - the error that occurred, if any
// int - number of extra attempts to add to the attempts limit (lets fn manage the retry count from inside)
func RetryFunc(ctx context.Context,
loggerInstance logger.Logger,
attempts int,
retryInterval *time.Duration,
backoff *Backoff,
fn func(int) (bool, error)) error {
fn func(int) (bool, error, int)) error {
TomerShor marked this conversation as resolved.
Show resolved Hide resolved

var err error
var retry bool
var addAttempts int

for attempt := 1; attempt <= attempts; attempt++ {
retry, err = fn(attempt)
var attempt = 0
for attempt <= attempts {

attempt++
// some errors might require more attempts than expected, so allow incrementing attempts from outside
retry, err, addAttempts = fn(attempt)
attempts += addAttempts

// if there's no need to retry - we're done
if !retry {
Expand Down Expand Up @@ -178,9 +189,20 @@ func EngineErrorIsNonFatal(err error) bool {
"timeout",
"refused",
}
return errorMatches(err, nonFatalEngineErrorsPartialMatch)
}

// EngineErrorIsFatal reports whether err matches one of the known fatal engine
// error patterns. Matching is delegated to errorMatches, which looks for each
// pattern as a substring of the error's message and of its cause's message.
//
// NOTE(review): the only pattern listed here contains "timeout", which is also
// an entry in the non-fatal partial-match list. A caller that tests
// EngineErrorIsNonFatal first and returns on a match will never reach this
// check for such an error - confirm the intended order at call sites.
func EngineErrorIsFatal(err error) bool {
	return errorMatches(err, []string{
		"lookup v3io-webapi: i/o timeout",
	})
}

func errorMatches(err error, substrings []string) bool {
if err != nil && len(err.Error()) > 0 {
for _, nonFatalError := range nonFatalEngineErrorsPartialMatch {
if strings.Contains(err.Error(), nonFatalError) || strings.Contains(errors.Cause(err).Error(), nonFatalError) {
for _, substring := range substrings {
if strings.Contains(err.Error(), substring) || strings.Contains(errors.Cause(err).Error(), substring) {
return true
}
}
Expand Down
26 changes: 20 additions & 6 deletions pkg/dataplane/streamconsumergroup/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,31 +126,45 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time
c.logger,
c.getShardLocationAttempts,
nil,
&c.getShardLocationBackoff, func(attempt int) (bool, error) {
&c.getShardLocationBackoff,
func(attempt int) (bool, error, int) {
c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID)
if err != nil {
if common.EngineErrorIsNonFatal(err) {
return true, errors.Wrap(err, "Failed to get shard location due to a network error")
return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0
}

// if the error is fatal and requires external resolution,
// we don't want to fail; instead, we will inform the user via a log
if common.EngineErrorIsFatal(err) {
c.logger.ErrorWith("A fatal error occurred. Will retry until successful",
"error", err,
"shard", c.shardID)
// for this type of error, we always increment the attempt counter
// this ensures the smooth operation of other components in Nuclio
// we avoid panicking and simply wait for the issue to be resolved
return true, errors.Wrap(err, "Failed to get shard location"), 1
}

// requested for an immediate stop
if err == v3ioerrors.ErrStopped {
return false, nil
return false, nil, 0
}

switch errors.RootCause(err).(type) {

// in case of a network error, retry to avoid transient errors
case *net.OpError:
return true, errors.Wrap(err, "Failed to get shard location due to a network error")
return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0

// unknown error, fail now
default:
return false, errors.Wrap(err, "Failed to get shard location")
return false, errors.Wrap(err, "Failed to get shard location"), 0
}
}

// we have shard location
return false, nil
return false, nil, 0
}); err != nil {
return errors.Wrapf(err,
"Failed to get shard location state, attempts exhausted. shard id: %d",
Expand Down
18 changes: 9 additions & 9 deletions pkg/dataplane/streamconsumergroup/streamconsumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,19 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
backoff := scg.config.State.ModifyRetry.Backoff
attempts := scg.config.State.ModifyRetry.Attempts

err := common.RetryFunc(context.TODO(), scg.logger, attempts, nil, &backoff, func(attempt int) (bool, error) {
err := common.RetryFunc(context.TODO(), scg.logger, attempts, nil, &backoff, func(attempt int) (bool, error, int) {
state, stateMtimeNanoSeconds, stateMtimeSeconds, err := scg.getStateFromPersistency()
if err != nil && !errors.Is(err, v3ioerrors.ErrNotFound) {
return true, errors.Wrap(err, "Failed getting current state from persistency")
return true, errors.Wrap(err, "Failed getting current state from persistency"), 0
}
if common.EngineErrorIsNonFatal(err) {
return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error")
return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error"), 0
}

if state == nil {
state, err = newState()
if err != nil {
return true, errors.Wrap(err, "Failed to create state")
return true, errors.Wrap(err, "Failed to create state"), 0
}
}

Expand All @@ -137,9 +137,9 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
if errors.Is(errors.RootCause(err), errShardRetention) {

// if shard retention failed the member needs to be aborted, so we can stop retrying
return false, errors.Wrap(err, "Failed modifying state")
return false, errors.Wrap(err, "Failed modifying state"), 0
}
return true, errors.Wrap(err, "Failed modifying state")
return true, errors.Wrap(err, "Failed modifying state"), 0
}

// log only on change
Expand All @@ -157,14 +157,14 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
"attempt", attempt,
"err", errors.RootCause(err).Error())
}
return true, errors.Wrap(err, "Failed setting state in persistency state")
return true, errors.Wrap(err, "Failed setting state in persistency state"), 0
}

if err := handlePostSetStateInPersistency(); err != nil {
return false, errors.Wrap(err, "Failed handling post set state in persistency")
return false, errors.Wrap(err, "Failed handling post set state in persistency"), 0
}

return false, nil
return false, nil, 0
})

if err != nil {
Expand Down
24 changes: 12 additions & 12 deletions pkg/dataplane/test/streamconsumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,20 +214,20 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() {
30,
&duration,
nil,
func(attempt int) (bool, error) {
func(attempt int) (bool, error, int) {
observedState, err := streamConsumerGroup.GetState()
suite.Require().NoError(err)
for _, sessionState := range observedState.SessionStates {
if sessionState.MemberID == member.streamConsumerGroupMember.GetID() {
suite.logger.DebugWith("Session state was not removed just yet")
return true, nil
return true, nil, 0
}
}

suite.logger.DebugWith("Session state was removed",
"observedState", observedState,
"memberID", member.id)
return false, nil
return false, nil, 0
})
})

Expand Down Expand Up @@ -270,7 +270,7 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() {
30,
&duration,
nil,
func(attempt int) (bool, error) {
func(attempt int) (bool, error, int) {
observedState, err := streamConsumerGroup.GetState()
suite.Require().NoError(err)

Expand All @@ -279,14 +279,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() {
suite.logger.DebugWith("retained shards",
"shards", sessionState.Shards,
"memberID", sessionState.MemberID)
return false, nil
return false, nil, 0
}
}

suite.logger.DebugWith("Session state shards were no retained just yet",
"sessionStates", observedState.SessionStates,
"memberID", member.streamConsumerGroupMember.GetID())
return true, nil
return true, nil, 0

})
})
Expand Down Expand Up @@ -368,14 +368,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerAbort() {
10,
&duration,
nil,
func(attempt int) (bool, error) {
func(attempt int) (bool, error, int) {
state, err = suite.getStateFromPersistency(suite.streamPath, consumerGroupName)
if err != nil {
suite.logger.DebugWith("State was not retrieved from persistency",
"err", err)
return true, err
return true, err, 0
}
return false, nil
return false, nil, 0
})
suite.Require().NoError(err)

Expand All @@ -391,14 +391,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerAbort() {
10,
&duration,
nil,
func(attempt int) (bool, error) {
func(attempt int) (bool, error, int) {
err = suite.setStateInPersistency(suite.streamPath, consumerGroupName, state)
if err != nil {
suite.logger.DebugWith("State was not set in persistency yet",
"err", err)
return true, err
return true, err, 0
}
return false, nil
return false, nil, 0
})
suite.Require().NoError(err)

Expand Down
Loading