diff --git a/README.md b/README.md index 90eff92..c72ff4f 100644 --- a/README.md +++ b/README.md @@ -43,25 +43,20 @@ import ( "github.com/Surfline/badgerutils" ) -type sampleValues struct { - Field1 string - Field2 string -} - type sampleRecord struct { - Key []string - Value sampleValues + Key string + Value string } func csvToKeyValue(line string) (*badgerutils.KeyValue, error) { - values := strings.Split(line, ",") - if len(values) < 4 { - return nil, fmt.Errorf("%v has less than 4 values", line) + values := strings.Split(line, ":") + if len(values) < 2 { + return nil, fmt.Errorf("%v has less than 2 values", line) } return &badgerutils.KeyValue{ - Key: []interface{}{values[0], values[1]}, - Value: sampleValues{values[2], values[3]}, + Key: []byte(values[0]), + Value: []byte(values[1]), }, nil } diff --git a/examples/writer_cli.go b/examples/writer_cli.go index e05d346..3f12c5b 100644 --- a/examples/writer_cli.go +++ b/examples/writer_cli.go @@ -11,25 +11,20 @@ import ( "github.com/Surfline/badgerutils" ) -type sampleValues struct { - Field1 string - Field2 string -} - type sampleRecord struct { - Key []string - Value sampleValues + Key string + Value string } func csvToKeyValue(line string) (*badgerutils.KeyValue, error) { - values := strings.Split(line, ",") - if len(values) < 4 { - return nil, fmt.Errorf("%v has less than 4 values", line) + kv := strings.Split(line, ":") + if len(kv) < 2 { + return nil, fmt.Errorf("%v has less than 2 kv", line) } return &badgerutils.KeyValue{ - Key: []interface{}{values[0], values[1]}, - Value: sampleValues{values[2], values[3]}, + Key: []byte(kv[0]), + Value: []byte(kv[1]), }, nil } diff --git a/test_helpers.go b/test_helpers.go index 22530cd..06446eb 100644 --- a/test_helpers.go +++ b/test_helpers.go @@ -1,34 +1,26 @@ package badgerutils import ( - "bytes" - "encoding/gob" "fmt" "strings" "github.com/dgraph-io/badger" ) -type sampleValues struct { - Field1 string - Field2 string - Field3 string -} - type sampleRecord struct { - Key 
[]string - Value sampleValues + Key string + Value string } func csvToKeyValue(line string) (*KeyValue, error) { - values := strings.Split(line, ",") - if len(values) < 3 { - return nil, fmt.Errorf("%v has less than 3 values", line) + kv := strings.Split(line, ":") + if len(kv) < 2 { + return nil, fmt.Errorf("%v has less than 2 kv", line) } return &KeyValue{ - Key: values, - Value: sampleValues{values[0], values[1], values[2]}, + Key: []byte(kv[0]), + Value: []byte(kv[1]), }, nil } @@ -39,8 +31,8 @@ func readDB(dir string) ([]sampleRecord, error) { } defer db.Close() - chkv, cherr := make(chan kvBytes), make(chan error) - go func(chan kvBytes, chan error) { + chkv, cherr := make(chan KeyValue), make(chan error) + go func(chan KeyValue, chan error) { err := db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions it := txn.NewIterator(opts) @@ -52,7 +44,7 @@ func readDB(dir string) ([]sampleRecord, error) { if err != nil { return err } - kv := kvBytes{key, value} + kv := KeyValue{Key: key, Value: value} chkv <- kv } close(chkv) @@ -63,19 +55,10 @@ func readDB(dir string) ([]sampleRecord, error) { sampleRecords := make([]sampleRecord, 0) for kv := range chkv { - key := make([]string, 3) - keyBuf := bytes.NewReader(kv.Key) - if keyErr := gob.NewDecoder(keyBuf).Decode(&key); keyErr != nil { - return nil, keyErr - } - - val := new(sampleValues) - valBuf := bytes.NewReader(kv.Value) - if valErr := gob.NewDecoder(valBuf).Decode(&val); valErr != nil { - return nil, valErr - } - - sampleRecords = append(sampleRecords, sampleRecord{Key: key, Value: *val}) + sampleRecords = append(sampleRecords, sampleRecord{ + Key: string(kv.Key), + Value: string(kv.Value), + }) } if err := <-cherr; err != nil { diff --git a/writer.go b/writer.go index 1d7c243..a798c9f 100644 --- a/writer.go +++ b/writer.go @@ -2,8 +2,6 @@ package badgerutils import ( "bufio" - "bytes" - "encoding/gob" "fmt" "io" "log" @@ -18,11 +16,6 @@ import ( // KeyValue struct defines a Key and a 
Value empty interface to be translated into a record. type KeyValue struct { - Key interface{} - Value interface{} -} - -type kvBytes struct { Key []byte Value []byte } @@ -37,29 +30,7 @@ func (c *count32) get() int32 { return atomic.LoadInt32((*int32)(c)) } -func stringToKVBytes(str string, lineToKeyValue func(string) (*KeyValue, error)) (*kvBytes, error) { - record, parseErr := lineToKeyValue(str) - if parseErr != nil { - return nil, parseErr - } - - keyBuf := &bytes.Buffer{} - if keyErr := gob.NewEncoder(keyBuf).Encode(record.Key); keyErr != nil { - return nil, keyErr - } - - valBuf := &bytes.Buffer{} - if valErr := gob.NewEncoder(valBuf).Encode(record.Value); valErr != nil { - return nil, valErr - } - - return &kvBytes{ - Key: keyBuf.Bytes(), - Value: valBuf.Bytes(), - }, nil -} - -func writeBatch(kvs []kvBytes, db *badger.DB, cherr chan error, done func(int32)) { +func writeBatch(kvs []KeyValue, db *badger.DB, cherr chan error, done func(int32)) { txn := db.NewTransaction(true) defer txn.Discard() @@ -102,13 +73,13 @@ func WriteStream(reader io.Reader, dir string, batchSize int, lineToKeyValue fun wg.Done() } - kvBatch := make([]kvBytes, 0) + kvBatch := make([]KeyValue, 0) cherr := make(chan error) // Read from stream and write key/values in batches scanner := bufio.NewScanner(reader) for scanner.Scan() { - kv, err := stringToKVBytes(scanner.Text(), lineToKeyValue) + kv, err := lineToKeyValue(scanner.Text()) if err != nil { return err } @@ -116,7 +87,7 @@ func WriteStream(reader io.Reader, dir string, batchSize int, lineToKeyValue fun if len(kvBatch) == batchSize { wg.Add(1) go writeBatch(kvBatch, db, cherr, done) - kvBatch = make([]kvBytes, 0) + kvBatch = make([]KeyValue, 0) } } diff --git a/writer_test.go b/writer_test.go index 67c8adc..e11787e 100644 --- a/writer_test.go +++ b/writer_test.go @@ -19,9 +19,9 @@ func TestWriteStream(t *testing.T) { dbPath := path.Join(tmpDir, "path", "to", "db") - reader := strings.NewReader(`field11,field12,field13 
-field21,field22,field23 -field31,field32,field33`) + reader := strings.NewReader(`key1:value1 +key2:value2 +key3:value3`) err = WriteStream(reader, dbPath, 2, csvToKeyValue) require.Nil(t, err) @@ -29,15 +29,15 @@ field31,field32,field33`) require.Nil(t, err) require.Equal(t, 3, len(writtenSampleRecords)) require.EqualValues(t, writtenSampleRecords[0], sampleRecord{ - Key: []string{"field11", "field12", "field13"}, - Value: sampleValues{"field11", "field12", "field13"}, + Key: "key1", + Value: "value1", }) require.EqualValues(t, writtenSampleRecords[1], sampleRecord{ - Key: []string{"field21", "field22", "field23"}, - Value: sampleValues{"field21", "field22", "field23"}, + Key: "key2", + Value: "value2", }) require.EqualValues(t, writtenSampleRecords[2], sampleRecord{ - Key: []string{"field31", "field32", "field33"}, - Value: sampleValues{"field31", "field32", "field33"}, + Key: "key3", + Value: "value3", }) }