From 5f35ce8f2a0ee2acb2ae7626bf90ea12666d4025 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 25 Oct 2021 13:11:05 +0300 Subject: [PATCH] consumer: lower old RDYs first, then assign to new connection Currently when a consumer is connected to a new NSQD, the connection is added to the pool of connections and then a loop iterates over all of them, rebalancing the RDY values. The iteration order of a map is random, thus it can happen that the consumer tries to assign the RDY value to the new connection, before decreasing the existing ones. This leads to a RDY of 0 being assigned. This issue is even more pronounced when there are only two connections: initially all the RDY are assigned the existing connection, and when a new NSQD is connected, the old one needs to be cut in half and the half just taken away gets assigned to the new connection. If map iteration happens in reverse order - starting with the new connection - it will get assigned 0 RDY and then the old connection gets cut in half. The end result is blocked communication on the new NSQD instance until a new round of rebalance is triggered. This issue may be less relevant in long running processes, but it is very annoying in tests where we're adding and remocing NSQD instances and the test hangs from time to time due to a flaky RDY allocation. The issue is fixed by fist iterating over all the existing connections and rebalancing them, and only at the very end calling maybeUpdateRDY on the new NSQD instance. --- consumer.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/consumer.go b/consumer.go index b4d7487b..895c89a5 100644 --- a/consumer.go +++ b/consumer.go @@ -609,8 +609,11 @@ func (r *Consumer) ConnectToNSQD(addr string) error { // pre-emptive signal to existing connections to lower their RDY count for _, c := range r.conns() { - r.maybeUpdateRDY(c) + if c != conn { + r.maybeUpdateRDY(c) + } } + r.maybeUpdateRDY(conn) return nil } @@ -912,7 +915,9 @@ func (r *Consumer) maybeUpdateRDY(conn *Conn) { count := r.perConnMaxInFlight() r.log(LogLevelDebug, "(%s) sending RDY %d", conn, count) - r.updateRDY(conn, count) + if err := r.updateRDY(conn, count); err != nil { + r.log(LogLevelWarning, "(%s) error sending RDY %d: %v", conn, count, err) + } } func (r *Consumer) rdyLoop() {