Switch to bytes for key/value (#13)
jawang35 authored Apr 29, 2019
1 parent 6afe550 commit 65d8176
Showing 5 changed files with 41 additions and 97 deletions.
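At a glance, this commit removes the internal gob encoding layer: `KeyValue` now carries raw `[]byte` keys and values, and callers serialize their own data before handing lines to `WriteStream`. The following is a minimal usage sketch based on the updated README and tests below; the `lineToKV` name, target directory, and batch size are arbitrary example values, not anything mandated by the library.

```go
package main

import (
	"fmt"
	"log"
	"strings"

	"github.com/Surfline/badgerutils"
)

// lineToKV mirrors the updated csvToKeyValue in the README: each input line is
// "key:value", and both halves are stored as raw bytes.
func lineToKV(line string) (*badgerutils.KeyValue, error) {
	kv := strings.Split(line, ":")
	if len(kv) < 2 {
		return nil, fmt.Errorf("%v has less than 2 values", line)
	}
	return &badgerutils.KeyValue{
		Key:   []byte(kv[0]),
		Value: []byte(kv[1]),
	}, nil
}

func main() {
	reader := strings.NewReader("key1:value1\nkey2:value2\nkey3:value3")
	// WriteStream's signature is taken from the writer.go diff below; the
	// database directory and batch size here are placeholder example values.
	if err := badgerutils.WriteStream(reader, "/tmp/badger-example-db", 2, lineToKV); err != nil {
		log.Fatal(err)
	}
}
```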
19 changes: 7 additions & 12 deletions README.md
@@ -43,25 +43,20 @@ import (
"github.com/Surfline/badgerutils"
)

type sampleValues struct {
Field1 string
Field2 string
}

type sampleRecord struct {
Key []string
Value sampleValues
Key string
Value string
}

func csvToKeyValue(line string) (*badgerutils.KeyValue, error) {
values := strings.Split(line, ",")
if len(values) < 4 {
return nil, fmt.Errorf("%v has less than 4 values", line)
values := strings.Split(line, ":")
if len(values) < 2 {
return nil, fmt.Errorf("%v has less than 2 values", line)
}

return &badgerutils.KeyValue{
Key: []interface{}{values[0], values[1]},
Value: sampleValues{values[2], values[3]},
Key: []byte(kv[0]),
Value: []byte(kv[1]),
}, nil
}

19 changes: 7 additions & 12 deletions examples/writer_cli.go
@@ -11,25 +11,20 @@ import (
"github.com/Surfline/badgerutils"
)

type sampleValues struct {
Field1 string
Field2 string
}

type sampleRecord struct {
Key []string
Value sampleValues
Key string
Value string
}

func csvToKeyValue(line string) (*badgerutils.KeyValue, error) {
values := strings.Split(line, ",")
if len(values) < 4 {
return nil, fmt.Errorf("%v has less than 4 values", line)
kv := strings.Split(line, ":")
if len(kv) < 2 {
return nil, fmt.Errorf("%v has less than 2 kv", line)
}

return &badgerutils.KeyValue{
Key: []interface{}{values[0], values[1]},
Value: sampleValues{values[2], values[3]},
Key: []byte(kv[0]),
Value: []byte(kv[1]),
}, nil
}

45 changes: 14 additions & 31 deletions test_helpers.go
@@ -1,34 +1,26 @@
package badgerutils

import (
"bytes"
"encoding/gob"
"fmt"
"strings"

"github.com/dgraph-io/badger"
)

type sampleValues struct {
Field1 string
Field2 string
Field3 string
}

type sampleRecord struct {
Key []string
Value sampleValues
Key string
Value string
}

func csvToKeyValue(line string) (*KeyValue, error) {
values := strings.Split(line, ",")
if len(values) < 3 {
return nil, fmt.Errorf("%v has less than 3 values", line)
kv := strings.Split(line, ":")
if len(kv) < 2 {
return nil, fmt.Errorf("%v has less than 2 kv", line)
}

return &KeyValue{
Key: values,
Value: sampleValues{values[0], values[1], values[2]},
Key: []byte(kv[0]),
Value: []byte(kv[1]),
}, nil
}

@@ -39,8 +31,8 @@ func readDB(dir string) ([]sampleRecord, error) {
}
defer db.Close()

chkv, cherr := make(chan kvBytes), make(chan error)
go func(chan kvBytes, chan error) {
chkv, cherr := make(chan KeyValue), make(chan error)
go func(chan KeyValue, chan error) {
err := db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
@@ -52,7 +44,7 @@ func readDB(dir string) ([]sampleRecord, error) {
if err != nil {
return err
}
kv := kvBytes{key, value}
kv := KeyValue{Key: key, Value: value}
chkv <- kv
}
close(chkv)
@@ -63,19 +55,10 @@ func readDB(dir string) ([]sampleRecord, error) {

sampleRecords := make([]sampleRecord, 0)
for kv := range chkv {
key := make([]string, 3)
keyBuf := bytes.NewReader(kv.Key)
if keyErr := gob.NewDecoder(keyBuf).Decode(&key); keyErr != nil {
return nil, keyErr
}

val := new(sampleValues)
valBuf := bytes.NewReader(kv.Value)
if valErr := gob.NewDecoder(valBuf).Decode(&val); valErr != nil {
return nil, valErr
}

sampleRecords = append(sampleRecords, sampleRecord{Key: key, Value: *val})
sampleRecords = append(sampleRecords, sampleRecord{
Key: string(kv.Key),
Value: string(kv.Value),
})
}

if err := <-cherr; err != nil {
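Since this change, readDB no longer gob-decodes anything: whatever bytes were written come back verbatim and can be converted with a plain string cast. Below is a minimal sketch of that read path, assuming an already-opened *badger.DB and the Badger v1/v2-era iterator API used elsewhere in this repo (NewIterator, ValueCopy, and friends); the `dumpAll` helper is illustrative, not part of badgerutils.

```go
package example

import (
	"github.com/dgraph-io/badger"
)

// dumpAll collects every key/value pair in an already-opened Badger store as
// strings. No decoding step is needed any more because values are stored
// exactly as the caller encoded them.
func dumpAll(db *badger.DB) (map[string]string, error) {
	out := make(map[string]string)
	err := db.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.DefaultIteratorOptions)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			item := it.Item()
			// Copy the key: Item.Key() is only valid while iterating.
			key := append([]byte(nil), item.Key()...)
			value, err := item.ValueCopy(nil)
			if err != nil {
				return err
			}
			out[string(key)] = string(value)
		}
		return nil
	})
	return out, err
}
```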
37 changes: 4 additions & 33 deletions writer.go
@@ -2,8 +2,6 @@ package badgerutils

import (
"bufio"
"bytes"
"encoding/gob"
"fmt"
"io"
"log"
@@ -18,11 +16,6 @@ import (

// KeyValue struct defines a Key and a Value as raw byte slices to be translated into a record.
type KeyValue struct {
Key interface{}
Value interface{}
}

type kvBytes struct {
Key []byte
Value []byte
}
@@ -37,29 +30,7 @@ func (c *count32) get() int32 {
return atomic.LoadInt32((*int32)(c))
}

func stringToKVBytes(str string, lineToKeyValue func(string) (*KeyValue, error)) (*kvBytes, error) {
record, parseErr := lineToKeyValue(str)
if parseErr != nil {
return nil, parseErr
}

keyBuf := &bytes.Buffer{}
if keyErr := gob.NewEncoder(keyBuf).Encode(record.Key); keyErr != nil {
return nil, keyErr
}

valBuf := &bytes.Buffer{}
if valErr := gob.NewEncoder(valBuf).Encode(record.Value); valErr != nil {
return nil, valErr
}

return &kvBytes{
Key: keyBuf.Bytes(),
Value: valBuf.Bytes(),
}, nil
}

func writeBatch(kvs []kvBytes, db *badger.DB, cherr chan error, done func(int32)) {
func writeBatch(kvs []KeyValue, db *badger.DB, cherr chan error, done func(int32)) {
txn := db.NewTransaction(true)
defer txn.Discard()

@@ -102,21 +73,21 @@ func WriteStream(reader io.Reader, dir string, batchSize int, lineToKeyValue fun
wg.Done()
}

kvBatch := make([]kvBytes, 0)
kvBatch := make([]KeyValue, 0)
cherr := make(chan error)

// Read from stream and write key/values in batches
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
kv, err := stringToKVBytes(scanner.Text(), lineToKeyValue)
kv, err := lineToKeyValue(scanner.Text())
if err != nil {
return err
}
kvBatch = append(kvBatch, *kv)
if len(kvBatch) == batchSize {
wg.Add(1)
go writeBatch(kvBatch, db, cherr, done)
kvBatch = make([]kvBytes, 0)
kvBatch = make([]KeyValue, 0)
}
}

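With stringToKVBytes and the internal gob encoding removed from writer.go, callers that still want structured values can perform the encoding themselves before building a KeyValue. Below is a minimal sketch under that assumption; the `record` type and `encodeRecord` helper are hypothetical examples, not part of badgerutils.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"

	"github.com/Surfline/badgerutils"
)

// record is a hypothetical structured value a caller might still want to store.
type record struct {
	Field1 string
	Field2 string
}

// encodeRecord gob-encodes the value on the caller's side, the step this commit
// removes from writer.go, and wraps it in the byte-based KeyValue.
func encodeRecord(key string, r record) (*badgerutils.KeyValue, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(r); err != nil {
		return nil, err
	}
	return &badgerutils.KeyValue{
		Key:   []byte(key),
		Value: buf.Bytes(),
	}, nil
}

func main() {
	kv, err := encodeRecord("key1", record{Field1: "a", Field2: "b"})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("key=%s, value is %d gob-encoded bytes\n", kv.Key, len(kv.Value))
}
```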
18 changes: 9 additions & 9 deletions writer_test.go
@@ -19,25 +19,25 @@ func TestWriteStream(t *testing.T) {

dbPath := path.Join(tmpDir, "path", "to", "db")

reader := strings.NewReader(`field11,field12,field13
field21,field22,field23
field31,field32,field33`)
reader := strings.NewReader(`key1:value1
key2:value2
key3:value3`)
err = WriteStream(reader, dbPath, 2, csvToKeyValue)
require.Nil(t, err)

writtenSampleRecords, err := readDB(dbPath)
require.Nil(t, err)
require.Equal(t, 3, len(writtenSampleRecords))
require.EqualValues(t, writtenSampleRecords[0], sampleRecord{
Key: []string{"field11", "field12", "field13"},
Value: sampleValues{"field11", "field12", "field13"},
Key: "key1",
Value: "value1",
})
require.EqualValues(t, writtenSampleRecords[1], sampleRecord{
Key: []string{"field21", "field22", "field23"},
Value: sampleValues{"field21", "field22", "field23"},
Key: "key2",
Value: "value2",
})
require.EqualValues(t, writtenSampleRecords[2], sampleRecord{
Key: []string{"field31", "field32", "field33"},
Value: sampleValues{"field31", "field32", "field33"},
Key: "key3",
Value: "value3",
})
}
