Skip to content

Commit

Permalink
add bulk es
Browse files Browse the repository at this point in the history
  • Loading branch information
gr4c2-2000 committed May 28, 2024
1 parent ac7c1a8 commit b6f9a7b
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func (e *ElasticSearchConnector) Create(ctx context.Context, index string, docTy
return nil
}

func (e *ElasticSearchConnector) BulkIndexDocuments(index string, docType string, documents []map[string]interface{}) error {
return e.esInterface.BulkIndexDocuments(context.Background(), index, docType, documents)
}

func (e *ElasticSearchConnector) ExecuteQuery(Index string, Type string, query map[string]interface{}) (*ResposeES, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
Expand Down
36 changes: 36 additions & 0 deletions pkg/elasticsearch/elasticsearch5.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -95,6 +97,40 @@ func (e *ElasticSearchGatway5) Create(ctx context.Context, index string, docType
return nil
}

func (e *ElasticSearchGatway5) BulkIndexDocuments(ctx context.Context, index string, docType string, documents []map[string]interface{}) error {
// Create a buffer for the bulk request body
var buf bytes.Buffer

for _, doc := range documents {
// Meta line for each bulk operation
meta := []byte(fmt.Sprintf(`{ "index" : { "_index": "%s", "_type": "%s", "_id": "%s" } }%s`, index, docType, doc["id"], "\n"))
buf.Write(meta)

// Document data
data, err := json.Marshal(doc)
if err != nil {
return err
}
data = append(data, '\n')
buf.Write(data)
}

// Perform the bulk request
req := esapi.BulkRequest{
Body: &buf,
}
res, err := req.Do(ctx, e.client)
if err != nil {
return err
}
defer res.Body.Close()

if res.IsError() {
return fmt.Errorf("bulk indexing error: %s", res.String())
}
return nil
}

func (e *ElasticSearchGatway5) Search(ctx context.Context, index string, docType string, query io.Reader) (*bytes.Buffer, error) {
res, err := e.client.Search(
e.client.Search.WithContext(ctx),
Expand Down
32 changes: 32 additions & 0 deletions pkg/elasticsearch/elasticsearch8.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -94,6 +96,36 @@ func (e *ElasticSearchGatway8) Create(ctx context.Context, index string, docType
return nil
}

func (e *ElasticSearchGatway8) BulkIndexDocuments(ctx context.Context, index string, docType string, documents []map[string]interface{}) error {
var buf bytes.Buffer
for _, doc := range documents {
meta := []byte(fmt.Sprintf(`{ "index" : { "_id" : "%s" } }%s`, doc["id"], "\n"))
data, err := json.Marshal(doc)
if err != nil {
return err
}
data = append(data, "\n"...)
buf.Write(meta)
buf.Write(data)
}

req := esapi.BulkRequest{
Index: index,
Body: &buf,
Refresh: "true",
}
res, err := req.Do(ctx, e.client)
if err != nil {
return err
}
defer res.Body.Close()

if res.IsError() {
return fmt.Errorf("error response status: %s", res.String())
}
return nil
}

func (e *ElasticSearchGatway8) Search(ctx context.Context, index string, docType string, query io.Reader) (*bytes.Buffer, error) {
res, err := e.client.Search(
e.client.Search.WithContext(ctx),
Expand Down
1 change: 1 addition & 0 deletions pkg/elasticsearch/elasticsearch_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ type ElasticSearchInterface interface {
Replace(ctx context.Context, index string, docType string, id string, document io.Reader) error
Create(ctx context.Context, index string, docType string, document io.Reader) error
getDialerContext() func(ctx context.Context, network, address string) (net.Conn, error)
BulkIndexDocuments(ctx context.Context, index string, docType string, documents []map[string]interface{}) error
setConnection()
}

0 comments on commit b6f9a7b

Please sign in to comment.