Skip to content

Commit

Permalink
Fixed a memory leak in the Retry functionality for EVM clients (#909)
Browse files Browse the repository at this point in the history
Signed-off-by: Antonio Mindov <[email protected]>
  • Loading branch information
rokn authored Aug 18, 2023
1 parent 08b8458 commit 9eb5524
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 90 deletions.
60 changes: 21 additions & 39 deletions app/clients/evm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ const (
retryAfterTimer = 10 * time.Second
)


// Client EVM JSON RPC Client
type Client struct {
config config.Evm
Expand Down Expand Up @@ -184,19 +183,12 @@ func (ec Client) BlockConfirmations() uint64 {
// RetryBlockNumber returns the most recent block number
// Uses a retry mechanism in case the filter query is stuck
func (ec Client) RetryBlockNumber() (uint64, error) {
blockNumberFunc := func() <-chan retry.Result {
r := make(chan retry.Result)
go func() {
defer close(r)

block, err := ec.BlockNumber(context.Background())
r <- retry.Result{
Value: block,
Error: err,
}
}()

return r
blockNumberFunc := func(ctx context.Context) retry.Result {
block, err := ec.BlockNumber(ctx)
return retry.Result{
Value: block,
Error: err,
}
}

result, err := service.Retry(blockNumberFunc, executionRetries)
Expand All @@ -216,19 +208,12 @@ func (ec Client) RetryBlockNumber() (uint64, error) {
// RetryFilterLogs returns the logs from the input query
// Uses a retry mechanism in case the filter query is stuck
func (ec Client) RetryFilterLogs(query ethereum.FilterQuery) ([]types.Log, error) {
filterLogsFunc := func() <-chan retry.Result {
r := make(chan retry.Result)
go func() {
defer close(r)

logs, err := ec.FilterLogs(context.Background(), query)
r <- retry.Result{
Value: logs,
Error: err,
}
}()

return r
filterLogsFunc := func(ctx context.Context) retry.Result {
logs, err := ec.FilterLogs(ctx, query)
return retry.Result{
Value: logs,
Error: err,
}
}

result, err := service.Retry(filterLogsFunc, executionRetries)
Expand Down Expand Up @@ -277,31 +262,28 @@ func (ec *Client) WaitForConfirmations(raw types.Log) error {
}

func (ec *Client) RetryTransactionByHash(hash common.Hash) (*types.Transaction, error) {
res, err := service.Retry(func() <-chan retry.Result {
r := make(chan retry.Result)
go func() {
defer close(r)

tx, _, err := ec.TransactionByHash(context.Background(), hash)
res, err := service.Retry(
func(ctx context.Context) retry.Result {
tx, _, err := ec.TransactionByHash(ctx, hash)
if err != nil {
r <- retry.Result{
return retry.Result{
Value: nil,
Error: err,
}
return
}

r <- retry.Result{
return retry.Result{
Value: tx,
Error: nil,
}
}()
},
executionRetries,
)

return r
}, executionRetries)
if err != nil {
ec.logger.Warnf("Error in [RetryTransactionByHash - [%s]] Retry [%s]", hash, err)
return nil, err
}

return res.(*types.Transaction), nil
}
6 changes: 5 additions & 1 deletion app/domain/service/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

package service

import "errors"
import (
"errors"
"fmt"
)

var ErrNotFound = errors.New("not found")
var ErrBadRequestTransferTargetNetworkNoSignaturesRequired = errors.New("transfer target network does not require signatures")
var ErrWrongQuery = errors.New("wrong query parameter")
var ErrTooManyRetires = fmt.Errorf("too many retries")
52 changes: 13 additions & 39 deletions app/domain/service/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package service

import (
"context"
"errors"
"fmt"
"github.com/limechain/hedera-eth-bridge-validator/app/model/retry"
log "github.com/sirupsen/logrus"
"time"
Expand All @@ -28,62 +28,36 @@ const (
sleepPeriod = 5 * time.Second
)

var (
timeoutError = fmt.Errorf("Timeout after [%d]", sleepPeriod)
)

// timeout is a function that returns an error after sleepPeriod.
func timeout() <-chan retry.Result {
r := make(chan retry.Result)

go func() {
defer close(r)

time.Sleep(sleepPeriod)
r <- retry.Result{
Value: nil,
Error: timeoutError,
}
}()

return r
}

// Retry executes two functions in race condition ({@param executionFunction} and timeout function).
// It takes the first result from both functions.
// If timeout function finishes first, it will retry the same mechanism {@param retries} times.
// If {@param executionFunction} finishes first, it will directly resolve its result.
// Retry executes the given function with a timeout of {@param sleepPeriod}.
// If the function times out, it will retry the execution until the given {@param retries} is reached.
// If the function returns an error, this will return the error.
// If the function is executed successfully, this will return the result.
// This function finds usability in the execution of EVM queries, which from time to time do not return response -
// the query is stuck forever and breaks the business logic. This way, if the query takes more than sleepPeriod, it will
// retry the query {@param retries} times.
// If {@param retries} is reached, it will return an error.
func Retry(executionFunction func() <-chan retry.Result, retries int) (interface{}, error) {
func Retry(executionFunction func(context.Context) retry.Result, retries int) (interface{}, error) {
times := 0
var retryFunction func() (interface{}, error)

retryFunction = func() (interface{}, error) {
var executionResult retry.Result
select {
case executionResult = <-timeout():
case executionResult = <-executionFunction():
}
for {
ctx, cancel := context.WithTimeout(context.Background(), sleepPeriod)
executionResult := executionFunction(ctx)
cancel()

if executionResult.Error != nil {
if errors.Is(executionResult.Error, timeoutError) {
if errors.Is(executionResult.Error, context.DeadlineExceeded) {
times++
if times >= retries {
log.Warnf("Function execution timeouted. [%d/%d] tries.", times, retries)
return 0, errors.New("too many retries")
return nil, ErrTooManyRetires
}

log.Warnf("Function execution timeout. [%d/%d] tries.", times, retries)
return retryFunction()
continue
}
return nil, executionResult.Error
}

return executionResult.Value, executionResult.Error
}

return retryFunction()
}
106 changes: 106 additions & 0 deletions app/services/utils/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package utils

import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/accounts/abi"
Expand All @@ -25,11 +26,14 @@ import (
"github.com/limechain/hedera-eth-bridge-validator/app/clients/evm/contracts/router"
"github.com/limechain/hedera-eth-bridge-validator/app/domain/client"
"github.com/limechain/hedera-eth-bridge-validator/app/domain/service"
"github.com/limechain/hedera-eth-bridge-validator/app/model/retry"
"github.com/limechain/hedera-eth-bridge-validator/config"
"github.com/limechain/hedera-eth-bridge-validator/test/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"strings"
"testing"
"time"
)

var (
Expand Down Expand Up @@ -185,3 +189,105 @@ func Test_ConvertEvmHashToBridgeTxId_WithInvalidChainId(t *testing.T) {
assert.NotNil(t, err)
assert.Nil(t, actual)
}

// Test_Retry_HappyPath verifies that Retry resolves the executed function's
// value when the work finishes well within the timeout window.
func Test_Retry_HappyPath(t *testing.T) {
	setup()

	const want = 1

	executionFn := func(ctx context.Context) retry.Result {
		// Simulate a short unit of work that beats the retry timeout.
		select {
		case <-ctx.Done():
			return retry.Result{
				Value: nil,
				Error: ctx.Err(),
			}
		case <-time.After(1 * time.Second):
			return retry.Result{
				Value: want,
				Error: nil,
			}
		}
	}

	result, err := service.Retry(executionFn, 1)

	require.NoError(t, err)
	require.Equal(t, want, result)
}

// Test_Retry_SuccessAfterRetry verifies that Retry re-executes the function
// after the first attempt times out and resolves the second attempt's value.
func Test_Retry_SuccessAfterRetry(t *testing.T) {
	setup()

	expectedValue := 1
	currentRun := 0

	res, err := service.Retry(func(ctx context.Context) retry.Result {
		currentRun++

		// The first attempt outlasts the retry timeout; later attempts finish quickly.
		waitTime := 10 * time.Second
		if currentRun > 1 {
			waitTime = 1 * time.Second
		}

		select {
		case <-time.After(waitTime): // Simulate work
		case <-ctx.Done():
			return retry.Result{
				Value: nil,
				Error: ctx.Err(),
			}
		}
		return retry.Result{
			Value: expectedValue,
			Error: nil,
		}
	}, 2)

	require.NoError(t, err)
	require.Equal(t, expectedValue, res)
	// testify's Equal takes (t, expected, actual) — expected value comes first,
	// otherwise failure messages report the values reversed.
	require.Equal(t, 2, currentRun)
}

// Test_Retry_Timeout verifies that Retry gives up with ErrTooManyRetires
// once every allowed attempt has exceeded the timeout.
func Test_Retry_Timeout(t *testing.T) {
	setup()

	unreachableValue := 1

	slowFn := func(ctx context.Context) retry.Result {
		// The simulated work always outlasts the retry timeout, so the
		// success branch below is never reached.
		select {
		case <-ctx.Done():
			return retry.Result{
				Value: nil,
				Error: ctx.Err(),
			}
		case <-time.After(10 * time.Second):
			return retry.Result{
				Value: unreachableValue,
				Error: nil,
			}
		}
	}

	res, err := service.Retry(slowFn, 1)

	require.Error(t, err)
	require.Nil(t, res)
	require.ErrorIs(t, err, service.ErrTooManyRetires)
}

// Test_Retry_ReturnError verifies that a non-timeout error aborts the retry
// loop immediately and is propagated to the caller unchanged.
func Test_Retry_ReturnError(t *testing.T) {
	setup()

	currentRun := 0
	res, err := service.Retry(func(ctx context.Context) retry.Result {
		currentRun++
		return retry.Result{
			Value: nil,
			Error: errors.New("some error"),
		}
	}, 3)

	require.Error(t, err)
	require.Nil(t, res)
	// EqualError asserts the message directly and is nil-safe, unlike
	// comparing err.Error() (which would panic if err were nil).
	require.EqualError(t, err, "some error")
	// testify's Equal takes (t, expected, actual): a non-timeout error must
	// stop the loop after exactly one attempt.
	require.Equal(t, 1, currentRun)
}
11 changes: 0 additions & 11 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,17 @@ import (
"github.com/limechain/hedera-eth-bridge-validator/app/process/recovery"
"github.com/limechain/hedera-eth-bridge-validator/bootstrap"
"github.com/limechain/hedera-eth-bridge-validator/config"
"github.com/pkg/profile"
log "github.com/sirupsen/logrus"
"net/http"

_ "net/http/pprof"
)

func main() {
// Config
defer profile.Start(profile.MemProfile).Stop()

configuration, parsedBridge, err := config.LoadConfig()
if err != nil {
log.Fatalf("failed to load config: %v", err)
}

if !configuration.Node.Validator {
go func() {
http.ListenAndServe(":8080", nil)
}()
}

config.InitLogger(configuration.Node.LogLevel, configuration.Node.LogFormat)

// Prepare Clients
Expand Down

0 comments on commit 9eb5524

Please sign in to comment.