From cbba4a82cb2c7b8104d799b8dd26a5fad5908915 Mon Sep 17 00:00:00 2001 From: rekyyang <511965710@qq.com> Date: Thu, 9 Jan 2025 22:21:57 +0800 Subject: [PATCH 1/3] fix done channel --- client/xclient.go | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/client/xclient.go b/client/xclient.go index 2b0676ef..b1f136cf 100644 --- a/client/xclient.go +++ b/client/xclient.go @@ -16,11 +16,12 @@ import ( "time" "github.com/juju/ratelimit" + "golang.org/x/sync/singleflight" + ex "github.com/smallnest/rpcx/errors" "github.com/smallnest/rpcx/log" "github.com/smallnest/rpcx/protocol" "github.com/smallnest/rpcx/share" - "golang.org/x/sync/singleflight" ) const ( @@ -982,7 +983,9 @@ func (c *xClient) Broadcast(ctx context.Context, serviceMethod string, args inte } e := c.wrapCall(ctx, client, serviceMethod, args, clonedReply) - done <- (e == nil) + defer func() { + done <- (e == nil) + }() if e != nil { if uncoverError(e) { c.removeClient(k, c.servicePath, serviceMethod, client) @@ -998,7 +1001,6 @@ func (c *xClient) Broadcast(ctx context.Context, serviceMethod string, args inte }() } - timeout := time.NewTimer(time.Minute) check: for { select { @@ -1007,12 +1009,8 @@ check: if l == 0 || !result { // all returns or some one returns an error break check } - case <-timeout.C: - err.Append(errors.New(("timeout"))) - break check } } - timeout.Stop() return err.ErrorOrNil() } @@ -1080,7 +1078,9 @@ func (c *xClient) Fork(ctx context.Context, serviceMethod string, args interface reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(clonedReply).Elem()) }) } - done <- (e == nil) + defer func() { + done <- (e == nil) + }() if e != nil { if uncoverError(e) { c.removeClient(k, c.servicePath, serviceMethod, client) @@ -1090,7 +1090,6 @@ func (c *xClient) Fork(ctx context.Context, serviceMethod string, args interface }() } - timeout := time.NewTimer(time.Minute) check: for { select { @@ -1102,13 +1101,8 @@ check: if l == 0 { // all returns or some one returns an error break check } - - case <-timeout.C: - err.Append(errors.New(("timeout"))) - break check } } - timeout.Stop() return err.ErrorOrNil() } @@ -1175,7 +1169,9 @@ func (c *xClient) Inform(ctx context.Context, serviceMethod string, args interfa } e := c.wrapCall(ctx, client, serviceMethod, args, clonedReply) - done <- (e == nil) + defer func() { + done <- (e == nil) + }() if e != nil { if uncoverError(e) { c.removeClient(k, c.servicePath, serviceMethod, client) @@ -1204,7 +1200,6 @@ func (c *xClient) Inform(ctx context.Context, serviceMethod string, args interfa }() } - timeout := time.NewTimer(time.Minute) check: for { select { @@ -1213,12 +1208,8 @@ check: if l == 0 { // all returns or some one returns an error break check } - case <-timeout.C: - err.Append(errors.New(("timeout"))) - break check } } - timeout.Stop() return receipts, err.ErrorOrNil() } From 993dad169d6311cd73c2bce3c383c1fc2f830e9a Mon Sep 17 00:00:00 2001 From: rekyyang <511965710@qq.com> Date: Fri, 10 Jan 2025 15:23:52 +0800 Subject: [PATCH 2/3] opt timeout --- client/xclient.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/client/xclient.go b/client/xclient.go index b1f136cf..2318e1cb 100644 --- a/client/xclient.go +++ b/client/xclient.go @@ -945,6 +945,9 @@ func (c *xClient) Broadcast(ctx context.Context, serviceMethod string, args inte var replyOnce sync.Once ctx = setServerTimeout(ctx) + // add timeout after set server timeout, only prevent client hanging + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() callPlugins := make([]RPCClient, 0, len(c.servers)) clients := make(map[string]RPCClient) c.mu.Lock() @@ -1033,6 +1036,9 @@ func (c *xClient) Fork(ctx context.Context, serviceMethod string, args interface } ctx = setServerTimeout(ctx) + // add timeout after set server timeout, only prevent client hanging + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() callPlugins := make([]RPCClient, 0, len(c.servers)) clients := make(map[string]RPCClient) c.mu.Lock() @@ -1126,6 +1132,10 @@ func (c *xClient) Inform(ctx context.Context, serviceMethod string, args interfa } ctx = setServerTimeout(ctx) + + // add timeout after set server timeout, only prevent client hanging + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() callPlugins := make([]RPCClient, 0, len(c.servers)) clients := make(map[string]RPCClient) c.mu.Lock() From 5bf2c1aeeae7887318b9a82628f8de84e80d5f31 Mon Sep 17 00:00:00 2001 From: rekyyang <511965710@qq.com> Date: Fri, 10 Jan 2025 15:39:34 +0800 Subject: [PATCH 3/3] opt timeout --- client/xclient.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/client/xclient.go b/client/xclient.go index 2318e1cb..cf19af4b 100644 --- a/client/xclient.go +++ b/client/xclient.go @@ -1015,6 +1015,12 @@ check: } } + select { + case <-ctx.Done(): + err.Append(errors.New(("timeout"))) + default: + } + return err.ErrorOrNil() } @@ -1036,6 +1042,7 @@ func (c *xClient) Fork(ctx context.Context, serviceMethod string, args interface } ctx = setServerTimeout(ctx) + // add timeout after set server timeout, only prevent client hanging ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() @@ -1110,6 +1117,12 @@ check: } } + select { + case <-ctx.Done(): + err.Append(errors.New(("timeout"))) + default: + } + return err.ErrorOrNil() } @@ -1221,6 +1234,12 @@ check: } } + select { + case <-ctx.Done(): + err.Append(errors.New(("timeout"))) + default: + } + return receipts, err.ErrorOrNil() }