Skip to content

Commit

Permalink
transport: pool, store connections using atomic.Pointer[Conn]
Browse files Browse the repository at this point in the history
  • Loading branch information
Kulezi committed Sep 12, 2022
1 parent 655c377 commit 4c1e97e
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/google/go-cmp v0.5.6
github.com/klauspost/compress v1.15.1
github.com/pierrec/lz4/v4 v4.1.14
go.uber.org/atomic v1.9.0
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.12
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
4 changes: 2 additions & 2 deletions transport/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package transport

func (p *ConnPool) AllConns() []*Conn {
var conns = make([]*Conn, len(p.conns))
for i, v := range p.conns {
conns[i], _ = v.Load().(*Conn)
for i := range conns {
conns[i] = p.loadConn(i)
}
return conns
}
12 changes: 5 additions & 7 deletions transport/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type ConnPool struct {
host string
nrShards int
msbIgnore uint8
conns []atomic.Value
conns []atomic.Pointer[Conn]
connClosedCh chan int // notification channel for when connection is closed
connObs ConnObserver
}
Expand Down Expand Up @@ -99,13 +99,11 @@ func (p *ConnPool) storeConn(conn *Conn) {
}

func (p *ConnPool) loadConn(shard int) *Conn {
conn, _ := p.conns[shard].Load().(*Conn)
return conn
return p.conns[shard].Load()
}

func (p *ConnPool) clearConn(shard int) bool {
conn, _ := p.conns[shard].Swap((*Conn)(nil)).(*Conn)
return conn != nil
return p.conns[shard].Swap(nil) != nil
}

func (p *ConnPool) Close() {
Expand All @@ -115,7 +113,7 @@ func (p *ConnPool) Close() {
// closeAll is called by PoolRefiller.
func (p *ConnPool) closeAll() {
for i := range p.conns {
if conn, ok := p.conns[i].Swap((*Conn)(nil)).(*Conn); ok {
if conn := p.conns[i].Swap(nil); conn != nil {
conn.Close()
}
}
Expand Down Expand Up @@ -168,7 +166,7 @@ func (r *PoolRefiller) init(ctx context.Context, host string) error {
host: host,
nrShards: int(ss.NrShards),
msbIgnore: ss.MsbIgnore,
conns: make([]atomic.Value, int(ss.NrShards)),
conns: make([]atomic.Pointer[Conn], int(ss.NrShards)),
connClosedCh: make(chan int, int(ss.NrShards)+1),
connObs: r.cfg.ConnObserver,
}
Expand Down

0 comments on commit 4c1e97e

Please sign in to comment.