Skip to content

Commit

Permalink
Merge pull request #209 from bonitoo-io/fix/wab_sync
Browse files Browse the repository at this point in the history
fix: synchronizing access to write service
  • Loading branch information
vlastahajek authored Oct 30, 2020
2 parents f8b3f3d + 2df75aa commit 2068222
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
1. [#206](https://github.com/influxdata/influxdb-client-go/pull/206) Adding TasksAPI for managing tasks and associated logs and runs.

### Bug fixes
1. [#209](https://github.com/influxdata/influxdb-client-go/pull/209) Synchronizing access to the write service in WriteAPIBlocking.

### Documentation

Expand Down
4 changes: 4 additions & 0 deletions api/writeAPIBlocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package api
import (
"context"
"strings"
"sync"

http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
"github.com/influxdata/influxdb-client-go/v2/api/write"
Expand Down Expand Up @@ -60,6 +61,7 @@ type WriteAPIBlocking interface {
type writeAPIBlocking struct {
service *iwrite.Service
writeOptions *write.Options
lock sync.Mutex
}

// NewWriteAPIBlocking creates new WriteAPIBlocking instance for org and bucket with underlying client
Expand All @@ -68,6 +70,8 @@ func NewWriteAPIBlocking(org string, bucket string, service http2.Service, write
}

func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
w.lock.Lock()
defer w.lock.Unlock()
err := w.service.HandleWrite(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval()))
return err
}
Expand Down
27 changes: 27 additions & 0 deletions api/writeAPIBlocking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,30 @@ func TestWriteContextCancel(t *testing.T) {
require.Equal(t, context.Canceled, err)
assert.Len(t, service.Lines(), 0)
}

func TestWriteParallel(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
lines := genRecords(1000)

chanLine := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for l := range chanLine {
err := writeAPI.WriteRecord(context.Background(), l)
assert.Nil(t, err)
}
wg.Done()
}()
}
for _, l := range lines {
chanLine <- l
}
close(chanLine)
wg.Wait()
assert.Len(t, service.Lines(), len(lines))

service.Close()
}

0 comments on commit 2068222

Please sign in to comment.