From b17b205899316d09dafbc96562f1e9975303fe0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Putra?= Date: Wed, 31 Aug 2022 15:15:27 +0200 Subject: [PATCH 1/5] go.mod: bump go version to 1.19 --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index d269f50b..1707ed11 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/scylladb/scylla-go-driver -go 1.18 +go 1.19 require ( github.com/google/go-cmp v0.5.6 From 1a50af7556d9ea0c2a560a4e82f1195002c365cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Putra?= Date: Fri, 9 Sep 2022 16:21:20 +0200 Subject: [PATCH 2/5] transport: go fmt --- transport/policy_test.go | 44 ++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/transport/policy_test.go b/transport/policy_test.go index d00f0d2a..966ef3c0 100644 --- a/transport/policy_test.go +++ b/transport/policy_test.go @@ -139,16 +139,16 @@ func TestDCAwareRoundRobinPolicy(t *testing.T) { //nolint:paralleltest // Can't } /* - mockTopologyTokenAwareSimpleStrategy creates cluster topology with info about 3 nodes living in the same datacenter. +mockTopologyTokenAwareSimpleStrategy creates cluster topology with info about 3 nodes living in the same datacenter. - Ring field is populated as follows: - ring tokens: 50 100 150 200 250 300 400 500 - corresponding node ids: 2 1 2 3 1 2 3 1 +Ring field is populated as follows: +ring tokens: 50 100 150 200 250 300 400 500 +corresponding node ids: 2 1 2 3 1 2 3 1 - Keyspaces: - names: "rf2" "rf3" - strategies: simple simple - rep factors: 2 3 +Keyspaces: +names: "rf2" "rf3" +strategies: simple simple +rep factors: 2 3. */ func mockTopologyTokenAwareSimpleStrategy() *topology { dummyNodes := []*Node{ @@ -239,24 +239,24 @@ func TestTokenAwareSimpleStrategyPolicy(t *testing.T) { //nolint:paralleltest // } /* - mockTopologyTokenAwareNetworkStrategy creates cluster topology with info about 8 nodes - living in two different datacenters. +mockTopologyTokenAwareNetworkStrategy creates cluster topology with info about 8 nodes +living in two different datacenters. - Ring field is populated as follows: - ring tokens: 50 100 150 200 250 300 400 500 510 - corresponding node ids: 1 5 2 1 6 4 8 7 3 +Ring field is populated as follows: +ring tokens: 50 100 150 200 250 300 400 500 510 +corresponding node ids: 1 5 2 1 6 4 8 7 3 - Datacenter: waw - nodes in rack r1: 1 2 - nodes in rack r2: 3 4 +Datacenter: waw +nodes in rack r1: 1 2 +nodes in rack r2: 3 4 - Datacenter: her - nodes in rack r3: 5 6 - nodes in rack r4: 7 8 +Datacenter: her +nodes in rack r3: 5 6 +nodes in rack r4: 7 8 - Keyspace: "waw/her" - strategy: network topology - replication factors: waw: 2 her: 3 +Keyspace: "waw/her" +strategy: network topology +replication factors: waw: 2 her: 3. */ func mockTopologyTokenAwareDCAwareStrategy() *topology { dummyNodes := []*Node{ From 3c89450e0b20abee164bc29cc430730a3397af41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Putra?= Date: Wed, 31 Aug 2022 15:30:22 +0200 Subject: [PATCH 3/5] transport: pool, store connections using atomic.Pointer[Conn] --- go.mod | 2 +- go.sum | 2 ++ transport/export_test.go | 4 ++-- transport/pool.go | 12 +++++------- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 1707ed11..68539814 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/google/go-cmp v0.5.6 github.com/klauspost/compress v1.15.1 github.com/pierrec/lz4/v4 v4.1.14 - go.uber.org/atomic v1.9.0 + go.uber.org/atomic v1.10.0 go.uber.org/goleak v1.1.12 ) diff --git a/go.sum b/go.sum index 1a791895..2babdf6f 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/transport/export_test.go b/transport/export_test.go index 84a4a911..f9258628 100644 --- a/transport/export_test.go +++ b/transport/export_test.go @@ -2,8 +2,8 @@ package transport func (p *ConnPool) AllConns() []*Conn { var conns = make([]*Conn, len(p.conns)) - for i, v := range p.conns { - conns[i], _ = v.Load().(*Conn) + for i := range conns { + conns[i] = p.loadConn(i) } return conns } diff --git a/transport/pool.go b/transport/pool.go index b41a21d4..f0f770cc 100644 --- a/transport/pool.go +++ b/transport/pool.go @@ -19,7 +19,7 @@ type ConnPool struct { host string nrShards int msbIgnore uint8 - conns []atomic.Value + conns []atomic.Pointer[Conn] connClosedCh chan int // notification channel for when connection is closed connObs ConnObserver } @@ -99,13 +99,11 @@ func (p *ConnPool) storeConn(conn *Conn) { } func (p *ConnPool) loadConn(shard int) *Conn { - conn, _ := p.conns[shard].Load().(*Conn) - return conn + return p.conns[shard].Load() } func (p *ConnPool) clearConn(shard int) bool { - conn, _ := p.conns[shard].Swap((*Conn)(nil)).(*Conn) - return conn != nil + return p.conns[shard].Swap(nil) != nil } func (p *ConnPool) Close() { @@ -115,7 +113,7 @@ func (p *ConnPool) Close() { // closeAll is called by PoolRefiller. func (p *ConnPool) closeAll() { for i := range p.conns { - if conn, ok := p.conns[i].Swap((*Conn)(nil)).(*Conn); ok { + if conn := p.conns[i].Swap(nil); conn != nil { conn.Close() } } @@ -168,7 +166,7 @@ func (r *PoolRefiller) init(ctx context.Context, host string) error { host: host, nrShards: int(ss.NrShards), msbIgnore: ss.MsbIgnore, - conns: make([]atomic.Value, int(ss.NrShards)), + conns: make([]atomic.Pointer[Conn], int(ss.NrShards)), connClosedCh: make(chan int, int(ss.NrShards)+1), connObs: r.cfg.ConnObserver, } From ffef1b2336c215695481607220c67cf09079f05b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Putra?= Date: Mon, 12 Sep 2022 12:57:19 +0200 Subject: [PATCH 4/5] transport: cluster, store topology using atomic.Pointer[topology] --- transport/cluster.go | 4 ++-- transport/cluster_integration_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/transport/cluster.go b/transport/cluster.go index e95511ff..0a6aad56 100644 --- a/transport/cluster.go +++ b/transport/cluster.go @@ -26,7 +26,7 @@ type ( ) type Cluster struct { - topology atomic.Value // *topology + topology atomic.Pointer[topology] control *Conn cfg ConnConfig handledEvents []frame.EventType // This will probably be moved to config. @@ -443,7 +443,7 @@ func parseTokensFromRow(n *Node, r frame.Row, ring *Ring) error { } func (c *Cluster) Topology() *topology { - return c.topology.Load().(*topology) + return c.topology.Load() } func (c *Cluster) setTopology(t *topology) { diff --git a/transport/cluster_integration_test.go b/transport/cluster_integration_test.go index d8f29c13..3a752e2a 100644 --- a/transport/cluster_integration_test.go +++ b/transport/cluster_integration_test.go @@ -80,7 +80,7 @@ func TestClusterIntegration(t *testing.T) { } // There should be at least system keyspaces present. - if len(c.topology.Load().(*topology).keyspaces) == 0 { + if len(c.Topology().keyspaces) == 0 { t.Fatalf("Keyspaces failed to load") } From f757a7769efeb37ca43720d1aa2209173f3f2f59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Putra?= Date: Wed, 14 Sep 2022 10:44:07 +0200 Subject: [PATCH 5/5] transport: cluster, add Nodes() in place of Topology() and setTopology() The point of Topology() and setTopology() cluster methods was to give some sort of type safety as topology was an atomic.Value that can hold any type. As topology is now an atomic.Pointer they are no longer needed. Session sometimes needs to perform a query on all nodes of the cluster, to allow that cluster now has an exported Nodes() method. --- session.go | 4 ++-- transport/cluster.go | 28 ++++++++++++--------------- transport/cluster_integration_test.go | 4 ++-- transport/policy_test.go | 2 +- 4 files changed, 17 insertions(+), 21 deletions(-) diff --git a/session.go b/session.go index b68c2b15..4f37edd1 100644 --- a/session.go +++ b/session.go @@ -161,7 +161,7 @@ func (s *Session) Prepare(ctx context.Context, content string) (Query, error) { stmt := transport.Statement{Content: content, Consistency: frame.ALL} // Prepare on all nodes concurrently. - nodes := s.cluster.Topology().Nodes + nodes := s.cluster.Nodes() resStmt := make([]transport.Statement, len(nodes)) resErr := make([]error, len(nodes)) var wg sync.WaitGroup @@ -234,7 +234,7 @@ func (s *Session) handleAutoAwaitSchemaAgreement(ctx context.Context, stmt strin func (s *Session) CheckSchemaAgreement(ctx context.Context) (bool, error) { // Get schema version from all nodes concurrently. - nodes := s.cluster.Topology().Nodes + nodes := s.cluster.Nodes() versions := make([]frame.UUID, len(nodes)) errors := make([]error, len(nodes)) var wg sync.WaitGroup diff --git a/transport/cluster.go b/transport/cluster.go index 0a6aad56..e8d818af 100644 --- a/transport/cluster.go +++ b/transport/cluster.go @@ -38,6 +38,10 @@ type Cluster struct { queryInfoCounter atomic.Uint64 } +func (c *Cluster) Nodes() []*Node { + return c.topology.Load().Nodes +} + type topology struct { localDC string peers peerMap @@ -86,13 +90,13 @@ type QueryInfo struct { func (c *Cluster) NewQueryInfo() QueryInfo { return QueryInfo{ tokenAware: false, - topology: c.Topology(), + topology: c.topology.Load(), offset: c.generateOffset(), } } func (c *Cluster) NewTokenAwareQueryInfo(t Token, ks string) (QueryInfo, error) { - top := c.Topology() + top := c.topology.Load() // When keyspace is not specified, we take default keyspace from ConnConfig. if ks == "" { if c.cfg.Keyspace == "" { @@ -144,7 +148,7 @@ func NewCluster(ctx context.Context, cfg ConnConfig, p HostSelectionPolicy, e [] if p, ok := p.(*TokenAwarePolicy); ok { localDC = p.localDC } - c.setTopology(&topology{localDC: localDC}) + c.topology.Store(&topology{localDC: localDC}) if control, err := c.NewControl(ctx); err != nil { return nil, fmt.Errorf("create control connection: %w", err) @@ -190,9 +194,9 @@ func (c *Cluster) refreshTopology(ctx context.Context) error { return fmt.Errorf("query info about nodes in cluster: %w", err) } - old := c.Topology().peers + old := c.topology.Load().peers t := newTopology() - t.localDC = c.Topology().localDC + t.localDC = c.topology.Load().localDC t.keyspaces, err = c.updateKeyspace(ctx) if err != nil { return fmt.Errorf("query keyspaces: %w", err) @@ -247,7 +251,7 @@ func (c *Cluster) refreshTopology(ctx context.Context) error { t.policyInfo.Preprocess(t, keyspace{}) } - c.setTopology(t) + c.topology.Store(t) drainChan(c.refreshChan) return nil } @@ -442,14 +446,6 @@ func parseTokensFromRow(n *Node, r frame.Row, ring *Ring) error { return nil } -func (c *Cluster) Topology() *topology { - return c.topology.Load() -} - -func (c *Cluster) setTopology(t *topology) { - c.topology.Store(t) -} - // handleEvent creates function which is passed to control connection // via registerEvents in order to handle events right away instead // of registering handlers for them. @@ -478,7 +474,7 @@ func (c *Cluster) handleTopologyChange(v *TopologyChange) { func (c *Cluster) handleStatusChange(v *StatusChange) { log.Printf("cluster: handle status change: %+#v", v) - m := c.Topology().peers + m := c.topology.Load().peers addr := v.Address.String() if n, ok := m[addr]; ok { switch v.Status { @@ -549,7 +545,7 @@ func (c *Cluster) tryReopenControl(ctx context.Context) { func (c *Cluster) handleClose() { log.Printf("cluster: handle cluster close") c.control.Close() - m := c.Topology().peers + m := c.topology.Load().peers for _, v := range m { if v.pool != nil { v.pool.Close() diff --git a/transport/cluster_integration_test.go b/transport/cluster_integration_test.go index 3a752e2a..ab93d483 100644 --- a/transport/cluster_integration_test.go +++ b/transport/cluster_integration_test.go @@ -17,7 +17,7 @@ import ( const awaitingChanges = 100 * time.Millisecond func compareNodes(c *Cluster, addr string, expected *Node) error { - m := c.Topology().peers + m := c.topology.Load().peers got, ok := m[addr] switch { case !ok: @@ -80,7 +80,7 @@ func TestClusterIntegration(t *testing.T) { } // There should be at least system keyspaces present. - if len(c.Topology().keyspaces) == 0 { + if len(c.topology.Load().keyspaces) == 0 { t.Fatalf("Keyspaces failed to load") } diff --git a/transport/policy_test.go b/transport/policy_test.go index 966ef3c0..f7c59786 100644 --- a/transport/policy_test.go +++ b/transport/policy_test.go @@ -33,7 +33,7 @@ func mockCluster(t *topology, ks, localDC string) *Cluster { } else { t.policyInfo.Preprocess(t, keyspace{}) } - c.setTopology(t) + c.topology.Store(t) return &c }