Skip to content

Commit

Permalink
fix(consumer): remove old nsqd connections if addresses change
Browse files Browse the repository at this point in the history
  • Loading branch information
jrkt committed Nov 10, 2022
1 parent 0e8d7a7 commit b8bb2c5
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ type Handler interface {
// HandlerFunc is a convenience type to avoid having to declare a struct
// to implement the Handler interface, it can be used like this:
//
// consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error {
// // handle the message
// }))
// consumer.AddHandler(nsq.HandlerFunc(func(m *Message) error {
// // handle the message
// }))
type HandlerFunc func(message *Message) error

// HandleMessage implements the Handler interface
Expand Down Expand Up @@ -220,8 +220,7 @@ func (r *Consumer) conns() []*Conn {
// The logger parameter is an interface that requires the following
// method to be implemented (such as the the stdlib log.Logger):
//
// Output(calldepth int, s string) error
//
// Output(calldepth int, s string) error
func (r *Consumer) SetLogger(l logger, lvl LogLevel) {
r.logGuard.Lock()
defer r.logGuard.Unlock()
Expand Down Expand Up @@ -266,8 +265,7 @@ func (r *Consumer) getLogLevel() LogLevel {
// of the following interfaces that modify the behavior
// of the `Consumer`:
//
// DiscoveryFilter
//
// DiscoveryFilter
func (r *Consumer) SetBehaviorDelegate(cb interface{}) {
matched := false

Expand Down Expand Up @@ -312,7 +310,7 @@ func (r *Consumer) getMaxInFlight() int32 {
// ChangeMaxInFlight sets a new maximum number of messages this comsumer instance
// will allow in-flight, and updates all existing connections as appropriate.
//
// For example, ChangeMaxInFlight(0) would pause message flow
// # For example, ChangeMaxInFlight(0) would pause message flow
//
// If already connected, it updates the reader RDY state for each connection.
func (r *Consumer) ChangeMaxInFlight(maxInFlight int) {
Expand Down Expand Up @@ -513,13 +511,33 @@ retry:
if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
nsqdAddrs = discoveryFilter.Filter(nsqdAddrs)
}

var successfulNsqdAddrs []string
for _, addr := range nsqdAddrs {
err = r.ConnectToNSQD(addr)
if err != nil && err != ErrAlreadyConnected {
r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
continue
}
successfulNsqdAddrs = append(successfulNsqdAddrs, addr)
}

// in the event that there are new nsqd addresses, remove the old connections from the connections map
for addr := range r.connections {
if !inAddrs(successfulNsqdAddrs, addr) {
delete(r.connections, addr)
}
}
}

func inAddrs(addrs []string, addr string) bool {
for _, a := range addrs {
if addr == a {
return true
}
}

return false
}

// ConnectToNSQDs takes multiple nsqd addresses to connect directly to.
Expand Down Expand Up @@ -1109,7 +1127,7 @@ func (r *Consumer) stopHandlers() {
// AddHandler sets the Handler for messages received by this Consumer. This can be called
// multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
// # This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddHandler(handler Handler) {
Expand All @@ -1120,7 +1138,7 @@ func (r *Consumer) AddHandler(handler Handler) {
// takes a second argument which indicates the number of goroutines to spawn for
// message handling.
//
// This panics if called after connecting to NSQD or NSQ Lookupd
// # This panics if called after connecting to NSQD or NSQ Lookupd
//
// (see Handler or HandlerFunc for details on implementing this interface)
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
Expand Down

0 comments on commit b8bb2c5

Please sign in to comment.