Skip to content

Commit

Permalink
consumer: lower old RDYs first, then assign to new connection
Browse files Browse the repository at this point in the history
Currently when a consumer is connected to a new NSQD, the connection
is added to the pool of connections and then a loop iterates over all
of them, rebalancing the RDY values. The iteration order of a map is
random, thus it can happen that the consumer tries to assign the RDY
value to the new connection, before decreasing the existing ones. This
leads to a RDY of 0 being assigned.

This issue is even more pronounced when there are only two connections:
initially all the RDY are assigned the existing connection, and when a
new NSQD is connected, the old one needs to be cut in half and the half
just taken away gets assigned to the new connection. If map iteration
happens in reverse order - starting with the new connection - it will
get assigned 0 RDY and then the old connection gets cut in half. The
end result is blocked communication on the new NSQD instance until a new
round of rebalance is triggered.

This issue may be less relevant in long running processes, but it is
very annoying in tests where we're adding and remocing NSQD instances
and the test hangs from time to time due to a flaky RDY allocation.

The issue is fixed by fist iterating over all the existing connections
and rebalancing them, and only at the very end calling maybeUpdateRDY on
the new NSQD instance.
  • Loading branch information
karalabe committed Oct 25, 2021
1 parent 259dc59 commit 5f35ce8
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,8 +609,11 @@ func (r *Consumer) ConnectToNSQD(addr string) error {

// pre-emptive signal to existing connections to lower their RDY count
for _, c := range r.conns() {
r.maybeUpdateRDY(c)
if c != conn {
r.maybeUpdateRDY(c)
}
}
r.maybeUpdateRDY(conn)

return nil
}
Expand Down Expand Up @@ -912,7 +915,9 @@ func (r *Consumer) maybeUpdateRDY(conn *Conn) {

count := r.perConnMaxInFlight()
r.log(LogLevelDebug, "(%s) sending RDY %d", conn, count)
r.updateRDY(conn, count)
if err := r.updateRDY(conn, count); err != nil {
r.log(LogLevelWarning, "(%s) error sending RDY %d: %v", conn, count, err)
}
}

func (r *Consumer) rdyLoop() {
Expand Down

0 comments on commit 5f35ce8

Please sign in to comment.