diff --git a/consumer.go b/consumer.go index b4d7487b..895c89a5 100644 --- a/consumer.go +++ b/consumer.go @@ -609,8 +609,11 @@ func (r *Consumer) ConnectToNSQD(addr string) error { // pre-emptive signal to existing connections to lower their RDY count for _, c := range r.conns() { - r.maybeUpdateRDY(c) + if c != conn { + r.maybeUpdateRDY(c) + } } + r.maybeUpdateRDY(conn) return nil } @@ -912,7 +915,9 @@ func (r *Consumer) maybeUpdateRDY(conn *Conn) { count := r.perConnMaxInFlight() r.log(LogLevelDebug, "(%s) sending RDY %d", conn, count) - r.updateRDY(conn, count) + if err := r.updateRDY(conn, count); err != nil { + r.log(LogLevelWarning, "(%s) error sending RDY %d: %v", conn, count, err) + } } func (r *Consumer) rdyLoop() {