Skip to content

Commit

Permalink
Generalize exported WriteStream vs WriteStdin (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
jawang35 authored Nov 6, 2018
1 parent 51086a1 commit e335f55
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 38 deletions.
29 changes: 23 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Go package with utilities for interacting with [Badger](https://github.com/dgrap


- [Getting Started](#getting-started)
- [Stream Stdin to Badger](#stream-stdin-to-badger)
- [IO Stream to Badger](#io-stream-to-badger)
- [Example](#example)
- [Development](#development)
- [Dependency Management](#dependency-management)
Expand All @@ -20,21 +20,27 @@ Go package with utilities for interacting with [Badger](https://github.com/dgrap

## Getting Started

### Stream Stdin to Badger
### IO Stream to Badger

To create a CLI to stream stdin into Badger, use `badgerutils.WriteStdin`. It takes a function `lineToKeyed` as a parameter, which converts a `string` into a struct that implements the `Keyed` interface.
To stream data Badger, use `badgerutils.WriteStream`.

#### Example

Creates a CLI tool that streams data from stdin.

```Go
// examples/writer.go
// examples/writer_cli.go
package main

import (
"errors"
"flag"
"fmt"
"github.com/Surfline/badgerutils"
"log"
"os"
"strings"

"github.com/Surfline/badgerutils"
)

type sampleRecord struct {
Expand All @@ -53,7 +59,18 @@ func lineToKeyed(line string) (badgerutils.Keyed, error) {
}

func main() {
if err := badgerutils.WriteStdin(lineToKeyed); err != nil {
dir := flag.String("dir", "", "Directory to save DB files")
batchSize := flag.Int("batch-size", 1000, "Number of records to write per transaction")
flag.Parse()

if *dir == "" {
log.Fatal(errors.New("dir flag is required"))
}

log.Printf("Directory: %v", *dir)
log.Printf("Batch Size: %v", *batchSize)

if err := badgerutils.WriteStream(os.Stdin, *dir, *batchSize, lineToKeyed); err != nil {
log.Fatal(err)
}
}
Expand Down
19 changes: 17 additions & 2 deletions examples/writer.go → examples/writer_cli.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package main

import (
"errors"
"flag"
"fmt"
"github.com/Surfline/badgerutils"
"log"
"os"
"strings"

"github.com/Surfline/badgerutils"
)

type sampleRecord struct {
Expand All @@ -23,7 +27,18 @@ func lineToKeyed(line string) (badgerutils.Keyed, error) {
}

func main() {
if err := badgerutils.WriteStdin(lineToKeyed); err != nil {
dir := flag.String("dir", "", "Directory to save DB files")
batchSize := flag.Int("batch-size", 1000, "Number of records to write per transaction")
flag.Parse()

if *dir == "" {
log.Fatal(errors.New("dir flag is required"))
}

log.Printf("Directory: %v", *dir)
log.Printf("Batch Size: %v", *batchSize)

if err := badgerutils.WriteStream(os.Stdin, *dir, *batchSize, lineToKeyed); err != nil {
log.Fatal(err)
}
}
36 changes: 10 additions & 26 deletions writer.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
// Package badgerutils provides functions for interacting with the underlying database.
// Package badgerutils provides functions for interacting with Badger.
package badgerutils

import (
"bufio"
"bytes"
"encoding/gob"
"errors"
"flag"
"fmt"
"github.com/dgraph-io/badger"
"io"
"log"
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/badger"
)

// Keyed interface defines a Key method for defining a key from a struct.
type Keyed interface {
Key() string
}
Expand Down Expand Up @@ -72,10 +71,10 @@ func writeBatch(kvs []keyValue, db *badger.DB, cherr chan error, done func(int32
})
}

func writeInput(reader io.Reader, dir string, batchSize int, lineToKeyed func(string) (Keyed, error)) error {
log.Printf("Directory: %v", dir)
log.Printf("Batch Size: %v", batchSize)

// WriteStream translates io.Reader stream into key/value pairs that are written into the Badger.
// lineToKeyed function parameter defines how stdin is translated to a value and how to define a key
// from that value.
func WriteStream(reader io.Reader, dir string, batchSize int, lineToKeyed func(string) (Keyed, error)) error {
// Open Badger database from directory
opts := badger.DefaultOptions
opts.Dir = dir
Expand All @@ -100,7 +99,7 @@ func writeInput(reader io.Reader, dir string, batchSize int, lineToKeyed func(st
kvBatch := make([]keyValue, 0)
cherr := make(chan error)

// Read from stdin and write key/values in batches
// Read from stream and write key/values in batches
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
kv, err := stringToKeyValue(scanner.Text(), lineToKeyed)
Expand All @@ -121,7 +120,7 @@ func writeInput(reader io.Reader, dir string, batchSize int, lineToKeyed func(st
writeBatch(kvBatch, db, cherr, done)
}

// Read and handle errors streaming from stdin
// Read and handle errors from stream
if err = scanner.Err(); err != nil {
return err
}
Expand All @@ -144,18 +143,3 @@ func writeInput(reader io.Reader, dir string, batchSize int, lineToKeyed func(st
log.Printf("Inserted %v records in %v", kvCount.get(), elapsed)
return nil
}

// WriteStdin translates stdin into key/value pairs that are written into the Badger.
// lineToKeyed function parameter defines how stdin is translated to a value and how to define a key
// from that value.
func WriteStdin(lineToKeyed func(string) (Keyed, error)) error {
dir := flag.String("dir", "", "Directory to save DB files")
batchSize := flag.Int("batch-size", 1000, "Number of records to write per transaction")
flag.Parse()

if *dir == "" {
return errors.New("dir flag is required")
}

return writeInput(os.Stdin, *dir, *batchSize, lineToKeyed)
}
9 changes: 5 additions & 4 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"bytes"
"encoding/gob"
"fmt"
"github.com/dgraph-io/badger"
"github.com/stretchr/testify/require"
"io/ioutil"
"os"
"strings"
"testing"

"github.com/dgraph-io/badger"
"github.com/stretchr/testify/require"
)

type sampleRecord struct {
Expand Down Expand Up @@ -76,7 +77,7 @@ func readDB(dir string) ([]sampleRecord, error) {
return sampleRecords, nil
}

func TestWriteInput(t *testing.T) {
func TestWriteStream(t *testing.T) {
dir, err := os.Getwd()
require.Nil(t, err)
tmpDir, err := ioutil.TempDir(dir, "temp")
Expand All @@ -86,7 +87,7 @@ func TestWriteInput(t *testing.T) {
reader := strings.NewReader(`field11,field12,field13
field21,field22,field23
field31,field32,field33`)
err = writeInput(reader, tmpDir, 2, csvToSampleRecord)
err = WriteStream(reader, tmpDir, 2, csvToSampleRecord)
require.Nil(t, err)

writtenSampleRecords, err := readDB(tmpDir)
Expand Down

0 comments on commit e335f55

Please sign in to comment.