From 8c0d9fcc07251c493daa1955444f8d75c32fb6ee Mon Sep 17 00:00:00 2001
From: Austin DeNoble
Date: Tue, 15 Oct 2024 01:16:45 -0400
Subject: [PATCH] more tweaks to imports, add doc comments for all new import
 operations and types

---
 pinecone/index_connection.go      | 189 +++++++++++++++++++++++++++++-
 pinecone/index_connection_test.go |  75 ++++++------
 pinecone/models.go                |  47 +++++---
 3 files changed, 254 insertions(+), 57 deletions(-)

diff --git a/pinecone/index_connection.go b/pinecone/index_connection.go
index ab53232..77657de 100644
--- a/pinecone/index_connection.go
+++ b/pinecone/index_connection.go
@@ -1003,10 +1003,60 @@ func (idx *IndexConnection) DescribeIndexStatsFiltered(ctx context.Context, meta
 	}, nil
 }
 
+// StartImportResponse holds the response parameters for the StartImport method.
+//
+// Fields:
+//   - Id: The ID of the import process that was started.
 type StartImportResponse struct {
	Id string `json:"id,omitempty"`
 }
 
+// StartImport imports data from a storage provider into an index. The uri parameter must start with the
+// scheme of a supported storage provider. For buckets that are not publicly readable, you will also need to
+// separately configure a storage integration and pass the integration id.
+//
+// Returns a pointer to a StartImportResponse object with the import ID or an error if the request fails.
+//
+// Parameters:
+//   - ctx: A context.Context object controls the request's lifetime,
+//     allowing for the request to be canceled or to timeout according to the context's deadline.
+//   - uri: The URI of the data to import. The URI must start with the scheme of a supported storage provider.
+//   - integrationId: If your bucket requires authentication to access, you need to pass the id of your storage integration using this property.
+//     Pass nil if not required.
+//   - errorMode: If set to Continue ("continue"), the import operation will continue even if some records fail to import.
+//     Pass Abort ("abort") to stop the import operation if any records fail. Defaults to "continue" if nil is passed.
+//
+// Example:
+//
+//	ctx := context.Background()
+//
+//	clientParams := pinecone.NewClientParams{
+//		ApiKey:    "YOUR_API_KEY",
+//		SourceTag: "your_source_identifier", // optional
+//	}
+//
+//	pc, err := pinecone.NewClient(clientParams)
+//	if err != nil {
+//		log.Fatalf("Failed to create Client: %v", err)
+//	}
+//
+//	idx, err := pc.DescribeIndex(ctx, "your-index-name")
+//	if err != nil {
+//		log.Fatalf("Failed to describe index \"%s\". Error:%s", idx.Name, err)
+//	}
+//
+//	idxConnection, err := pc.Index(pinecone.NewIndexConnParams{Host: idx.Host})
+//	if err != nil {
+//		log.Fatalf("Failed to create IndexConnection for Host: %v. Error: %v", idx.Host, err)
+//	}
+//
+//	uri := "s3://your-bucket/your-file.csv"
+//	errorMode := pinecone.Abort
+//	importRes, err := idxConnection.StartImport(ctx, uri, nil, &errorMode)
+//	if err != nil {
+//		log.Fatalf("Failed to start import: %v", err)
+//	}
+//	fmt.Printf("Import started with ID: %s", importRes.Id)
 func (idx *IndexConnection) StartImport(ctx context.Context, uri string, integrationId *string, errorMode *ImportErrorMode) (*StartImportResponse, error) {
 	if uri == "" {
 		return nil, fmt.Errorf("must specify a uri to start an import")
@@ -1042,6 +1092,44 @@ func (idx *IndexConnection) StartImport(ctx context.Context, uri string, integra
 	return decodeStartImportResponse(res.Body)
 }
 
+// DescribeImport retrieves information about a specific import operation.
+//
+// Returns a pointer to an Import object representing the current state of the import, or an error if the request fails.
+//
+// Parameters:
+//   - ctx: A context.Context object controls the request's lifetime,
+//     allowing for the request to be canceled or to timeout according to the context's deadline.
+//   - id: The id of the import operation. This is returned when you call StartImport, or can be retrieved
+//     through the ListImports method.
+//
+// Example:
+//
+//	ctx := context.Background()
+//
+//	clientParams := pinecone.NewClientParams{
+//		ApiKey:    "YOUR_API_KEY",
+//		SourceTag: "your_source_identifier", // optional
+//	}
+//
+//	pc, err := pinecone.NewClient(clientParams)
+//	if err != nil {
+//		log.Fatalf("Failed to create Client: %v", err)
+//	}
+//
+//	idx, err := pc.DescribeIndex(ctx, "your-index-name")
+//	if err != nil {
+//		log.Fatalf("Failed to describe index \"%s\". Error:%s", idx.Name, err)
+//	}
+//
+//	idxConnection, err := pc.Index(pinecone.NewIndexConnParams{Host: idx.Host})
+//	if err != nil {
+//		log.Fatalf("Failed to create IndexConnection for Host: %v. Error: %v", idx.Host, err)
+//	}
+//	importDesc, err := idxConnection.DescribeImport(ctx, "your-import-id")
+//	if err != nil {
+//		log.Fatalf("Failed to describe import: %s - %v", "your-import-id", err)
+//	}
+//	fmt.Printf("Import ID: %s, Status: %s", importDesc.Id, importDesc.Status)
 func (idx *IndexConnection) DescribeImport(ctx context.Context, id string) (*Import, error) {
 	res, err := (*idx.restClient).DescribeBulkImport(idx.akCtx(ctx), id)
 	if err != nil {
@@ -1056,20 +1144,78 @@ func (idx *IndexConnection) DescribeImport(ctx context.Context, id string) (*Imp
 	return toImport(importModel), nil
 }
 
+// ListImportsRequest holds the parameters for the ListImports method.
+//
+// Fields:
+//   - Limit: The maximum number of imports to return.
+//   - PaginationToken: The token to retrieve the next page of imports, if available.
 type ListImportsRequest struct {
 	Limit           *int32
 	PaginationToken *string
 }
 
+// ListImportsResponse holds the result of listing bulk imports.
+//
+// Fields:
+//   - Imports: The list of Import objects returned.
+//   - NextPaginationToken: The token for paginating through results, if more imports are available.
 type ListImportsResponse struct {
	Imports             []*Import `json:"imports,omitempty"`
	NextPaginationToken *string   `json:"next_pagination_token,omitempty"`
 }
 
-func (idx *IndexConnection) ListImports(ctx context.Context, req *ListImportsRequest) (*ListImportsResponse, error) {
+// ListImports returns information about import operations. It returns operations in a
+// paginated form, with a pagination token to fetch the next page of results.
+//
+// Returns a pointer to a ListImportsResponse object or an error if the request fails.
+//
+// Parameters:
+//   - ctx: A context.Context object controls the request's lifetime,
+//     allowing for the request to be canceled or to timeout according to the context's deadline.
+//   - limit: The maximum number of imports to return per page.
+//   - paginationToken: The token to retrieve the next page of imports. Pass nil to fetch the first page.
+//
+// Example:
+//
+//	ctx := context.Background()
+//
+//	clientParams := NewClientParams{
+//		ApiKey:    "YOUR_API_KEY",
+//		SourceTag: "your_source_identifier", // optional
+//	}
+//
+//	pc, err := NewClient(clientParams)
+//	if err != nil {
+//		log.Fatalf("Failed to create Client: %v", err)
+//	}
+//
+//	idx, err := pc.DescribeIndex(ctx, "your-index-name")
+//	if err != nil {
+//		log.Fatalf("Failed to describe index \"%s\". 
Error:%s", idx.Name, err) +// } +// +// idxConnection, err := pc.Index(NewIndexConnParams{Host: idx.Host}) +// if err != nil { +// log.Fatalf("Failed to create IndexConnection for Host: %v. Error: %v", idx.Host, err) +// } +// +// limit := int32(10) +// firstImportPage, err := idxConnection.ListImports(ctx, &limit, nil) +// if err != nil { +// log.Fatalf("Failed to list imports: %v", err) +// } +// fmt.Printf("First page of imports: %+v", firstImportPage.Imports) +// +// paginationToken := firstImportPage.NextPaginationToken +// nextImportPage, err := idxConnection.ListImports(ctx, &limit, paginationToken) +// if err != nil { +// log.Fatalf("Failed to list imports: %v", err) +// } +// fmt.Printf("Second page of imports: %+v", nextImportPage.Imports) +func (idx *IndexConnection) ListImports(ctx context.Context, limit *int32, paginationToken *string) (*ListImportsResponse, error) { params := db_data_rest.ListBulkImportsParams{ - Limit: req.Limit, - PaginationToken: req.PaginationToken, + Limit: limit, + PaginationToken: paginationToken, } res, err := (*idx.restClient).ListBulkImports(idx.akCtx(ctx), ¶ms) @@ -1085,6 +1230,43 @@ func (idx *IndexConnection) ListImports(ctx context.Context, req *ListImportsReq return listImportsResponse, nil } +// CancelImport cancels an import operation by id. +// +// Returns an error if the request fails. +// +// Parameters: +// - ctx: A context.Context object controls the request's lifetime, +// allowing for the request to be canceled or to timeout according to the context's deadline. +// - id: The id of the import operation to cancel. +// +// Example: +// +// ctx := context.Background() +// +// clientParams := NewClientParams{ +// ApiKey: "YOUR_API_KEY", +// SourceTag: "your_source_identifier", // optional +// } +// +// pc, err := NewClient(clientParams) +// if err != nil { +// log.Fatalf("Failed to create Client: %v", err) +// } +// +// idx, err := pc.DescribeIndex(ctx, "your-index-name") +// if err != nil { +// log.Fatalf("Failed to describe index \"%s\". Error:%s", idx.Name, err) +// } +// +// idxConnection, err := pc.Index(NewIndexConnParams{Host: idx.Host}) +// if err != nil { +// log.Fatalf("Failed to create IndexConnection for Host: %v. 
Error: %v", idx.Host, err) +// } +// +// err = idxConnection.CancelImport(ctx, "your-import-id") +// if err != nil { +// log.Fatalf("Failed to cancel import: %s", "your-import-id") +// } func (idx *IndexConnection) CancelImport(ctx context.Context, id string) error { res, err := (*idx.restClient).CancelBulkImport(idx.akCtx(ctx), id) if err != nil { diff --git a/pinecone/index_connection_test.go b/pinecone/index_connection_test.go index 9aa8266..847b783 100644 --- a/pinecone/index_connection_test.go +++ b/pinecone/index_connection_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - dbDataGrpc "github.com/pinecone-io/go-pinecone/internal/gen/db_data/grpc" + db_data_grpc "github.com/pinecone-io/go-pinecone/internal/gen/db_data/grpc" "github.com/pinecone-io/go-pinecone/internal/utils" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -291,10 +291,7 @@ func (ts *IntegrationTests) TestImportFlow() { assert.Equal(ts.T(), startRes.Id, describeRes.Id) limit := int32(10) - listRes, err := ts.idxConn.ListImports(ctx, &ListImportsRequest{ - PaginationToken: nil, - Limit: &limit, - }) + listRes, err := ts.idxConn.ListImports(ctx, &limit, nil) assert.NoError(ts.T(), err) assert.NotNil(ts.T(), listRes) @@ -547,7 +544,7 @@ func TestMarshalDescribeIndexStatsResponseUnit(t *testing.T) { func TestToVectorUnit(t *testing.T) { tests := []struct { name string - vector *dbDataGrpc.Vector + vector *db_data_grpc.Vector expected *Vector }{ { @@ -557,7 +554,7 @@ func TestToVectorUnit(t *testing.T) { }, { name: "Pass dense vector", - vector: &dbDataGrpc.Vector{ + vector: &db_data_grpc.Vector{ Id: "dense-1", Values: []float32{0.01, 0.02, 0.03}, }, @@ -568,10 +565,10 @@ func TestToVectorUnit(t *testing.T) { }, { name: "Pass sparse vector", - vector: &dbDataGrpc.Vector{ + vector: &db_data_grpc.Vector{ Id: "sparse-1", Values: nil, - SparseValues: &dbDataGrpc.SparseValues{ + SparseValues: &db_data_grpc.SparseValues{ Indices: []uint32{0, 2}, Values: []float32{0.01, 0.03}, }, @@ -587,10 +584,10 @@ func TestToVectorUnit(t *testing.T) { }, { name: "Pass hybrid vector", - vector: &dbDataGrpc.Vector{ + vector: &db_data_grpc.Vector{ Id: "hybrid-1", Values: []float32{0.01, 0.02, 0.03}, - SparseValues: &dbDataGrpc.SparseValues{ + SparseValues: &db_data_grpc.SparseValues{ Indices: []uint32{0, 2}, Values: []float32{0.01, 0.03}, }, @@ -607,10 +604,10 @@ func TestToVectorUnit(t *testing.T) { }, { name: "Pass hybrid vector with metadata", - vector: &dbDataGrpc.Vector{ + vector: &db_data_grpc.Vector{ Id: "hybrid-metadata-1", Values: []float32{0.01, 0.02, 0.03}, - SparseValues: &dbDataGrpc.SparseValues{ + SparseValues: &db_data_grpc.SparseValues{ Indices: []uint32{0, 2}, Values: []float32{0.01, 0.03}, }, @@ -645,7 +642,7 @@ func TestToVectorUnit(t *testing.T) { func TestToSparseValuesUnit(t *testing.T) { tests := []struct { name string - sparseValues *dbDataGrpc.SparseValues + sparseValues *db_data_grpc.SparseValues expected *SparseValues }{ { @@ -655,7 +652,7 @@ func TestToSparseValuesUnit(t *testing.T) { }, { name: "Pass sparse values", - sparseValues: &dbDataGrpc.SparseValues{ + sparseValues: &db_data_grpc.SparseValues{ Indices: []uint32{0, 2}, Values: []float32{0.01, 0.03}, }, @@ -676,7 +673,7 @@ func TestToSparseValuesUnit(t *testing.T) { func TestToScoredVectorUnit(t *testing.T) { tests := []struct { name string - scoredVector *dbDataGrpc.ScoredVector + scoredVector *db_data_grpc.ScoredVector expected *ScoredVector }{ { @@ -686,7 +683,7 @@ func TestToScoredVectorUnit(t *testing.T) { }, { name: "Pass scored dense 
vector", - scoredVector: &dbDataGrpc.ScoredVector{ + scoredVector: &db_data_grpc.ScoredVector{ Id: "dense-1", Values: []float32{0.01, 0.01, 0.01}, Score: 0.1, @@ -701,9 +698,9 @@ func TestToScoredVectorUnit(t *testing.T) { }, { name: "Pass scored sparse vector", - scoredVector: &dbDataGrpc.ScoredVector{ + scoredVector: &db_data_grpc.ScoredVector{ Id: "sparse-1", - SparseValues: &dbDataGrpc.SparseValues{ + SparseValues: &db_data_grpc.SparseValues{ Indices: []uint32{0, 2}, Values: []float32{0.01, 0.03}, }, @@ -722,10 +719,10 @@ func TestToScoredVectorUnit(t *testing.T) { }, { name: "Pass scored hybrid vector", - scoredVector: &dbDataGrpc.ScoredVector{ + scoredVector: &db_data_grpc.ScoredVector{ Id: "hybrid-1", Values: []float32{0.01, 0.02, 0.03}, - SparseValues: &dbDataGrpc.SparseValues{ + SparseValues: &db_data_grpc.SparseValues{ Indices: []uint32{0, 2}, Values: []float32{0.01, 0.03}, }, @@ -745,10 +742,10 @@ func TestToScoredVectorUnit(t *testing.T) { }, { name: "Pass scored hybrid vector with metadata", - scoredVector: &dbDataGrpc.ScoredVector{ + scoredVector: &db_data_grpc.ScoredVector{ Id: "hybrid-metadata-1", Values: []float32{0.01, 0.02, 0.03}, - SparseValues: &dbDataGrpc.SparseValues{ + SparseValues: &db_data_grpc.SparseValues{ Indices: []uint32{0, 2}, Values: []float32{0.01, 0.03}, }, @@ -789,7 +786,7 @@ func TestVecToGrpcUnit(t *testing.T) { tests := []struct { name string vector *Vector - expected *dbDataGrpc.Vector + expected *db_data_grpc.Vector }{ { name: "Pass nil vector, expect nil to be returned", @@ -802,7 +799,7 @@ func TestVecToGrpcUnit(t *testing.T) { Id: "dense-1", Values: []float32{0.01, 0.02, 0.03}, }, - expected: &dbDataGrpc.Vector{ + expected: &db_data_grpc.Vector{ Id: "dense-1", Values: []float32{0.01, 0.02, 0.03}, }, @@ -817,9 +814,9 @@ func TestVecToGrpcUnit(t *testing.T) { Values: []float32{0.01, 0.03}, }, }, - expected: &dbDataGrpc.Vector{ + expected: &db_data_grpc.Vector{ Id: "sparse-1", - SparseValues: &dbDataGrpc.SparseValues{ + SparseValues: &db_data_grpc.SparseValues{ Indices: []uint32{0, 2}, Values: []float32{0.01, 0.03}, }, @@ -835,10 +832,10 @@ func TestVecToGrpcUnit(t *testing.T) { Values: []float32{0.01, 0.03}, }, }, - expected: &dbDataGrpc.Vector{ + expected: &db_data_grpc.Vector{ Id: "hybrid-1", Values: []float32{0.01, 0.02, 0.03}, - SparseValues: &dbDataGrpc.SparseValues{ + SparseValues: &db_data_grpc.SparseValues{ Indices: []uint32{0, 2}, Values: []float32{0.01, 0.03}, }, @@ -859,10 +856,10 @@ func TestVecToGrpcUnit(t *testing.T) { }, }, }, - expected: &dbDataGrpc.Vector{ + expected: &db_data_grpc.Vector{ Id: "hybrid-metadata-1", Values: []float32{0.01, 0.02, 0.03}, - SparseValues: &dbDataGrpc.SparseValues{ + SparseValues: &db_data_grpc.SparseValues{ Indices: []uint32{0, 2}, Values: []float32{0.01, 0.03}, }, @@ -888,7 +885,7 @@ func TestSparseValToGrpcUnit(t *testing.T) { name string sparseValues *SparseValues metadata *structpb.Struct - expected *dbDataGrpc.SparseValues + expected *db_data_grpc.SparseValues }{ { name: "Pass nil sparse values, expect nil to be returned", @@ -901,7 +898,7 @@ func TestSparseValToGrpcUnit(t *testing.T) { Indices: []uint32{0, 2}, Values: []float32{0.01, 0.03}, }, - expected: &dbDataGrpc.SparseValues{ + expected: &db_data_grpc.SparseValues{ Indices: []uint32{0, 2}, Values: []float32{0.01, 0.03}, }, @@ -917,7 +914,7 @@ func TestSparseValToGrpcUnit(t *testing.T) { "genre": {Kind: &structpb.Value_StringValue{StringValue: "classical"}}, }, }, - expected: &dbDataGrpc.SparseValues{ + expected: &db_data_grpc.SparseValues{ 
Indices: []uint32{0, 2}, Values: []float32{0.01, 0.03}, }, @@ -1000,7 +997,7 @@ func TestToUsageUnit(t *testing.T) { tests := []struct { name string - usage *dbDataGrpc.Usage + usage *db_data_grpc.Usage expected *Usage }{ { @@ -1010,7 +1007,7 @@ func TestToUsageUnit(t *testing.T) { }, { name: "Pass usage", - usage: &dbDataGrpc.Usage{ + usage: &db_data_grpc.Usage{ ReadUnits: &u5, }, expected: &Usage{ @@ -1062,17 +1059,17 @@ func TestToPaginationTokenGrpc(t *testing.T) { tests := []struct { name string - token *dbDataGrpc.Pagination + token *db_data_grpc.Pagination expected *string }{ { name: "Pass empty token, expect empty string to be returned", - token: &dbDataGrpc.Pagination{}, + token: &db_data_grpc.Pagination{}, expected: &tokenForNilCase, }, { name: "Pass token", - token: &dbDataGrpc.Pagination{ + token: &db_data_grpc.Pagination{ Next: "next-token", }, expected: &tokenForPositiveCase, diff --git a/pinecone/models.go b/pinecone/models.go index d0a8448..a29cf9d 100644 --- a/pinecone/models.go +++ b/pinecone/models.go @@ -170,6 +170,14 @@ type MetadataFilter = structpb.Struct // [attached to, or updated for, a vector]: https://docs.pinecone.io/guides/data/filter-with-metadata#inserting-metadata-into-an-index type Metadata = structpb.Struct +// ImportStatus represents the status of an import operation. +// +// Values: +// - Cancelled: The import was canceled. +// - Completed: The import completed successfully. +// - Failed: The import encountered an error and did not complete successfully. +// - InProgress: The import is currently in progress. +// - Pending: The import is pending and has not yet started. type ImportStatus string const ( @@ -180,6 +188,11 @@ const ( Pending ImportStatus = "Pending" ) +// ImportErrorMode specifies how errors are handled during an import. +// +// Values: +// - Abort: The import process will abort upon encountering an error. +// - Continue: The import process will continue, skipping over records that produce errors. type ImportErrorMode string const ( @@ -187,20 +200,24 @@ const ( Continue ImportErrorMode = "continue" ) +// Import represents the details and status of a bulk import process. +// +// Fields: +// - Id: The unique identifier of the import process. +// - PercentComplete: The percentage of the import process that has been completed. +// - RecordsImported: The total number of records successfully imported. +// - Status: The current status of the import (e.g., "InProgress", "Completed", "Failed"). +// - Uri: The URI of the source data for the import. +// - CreatedAt: The time at which the import process was initiated. +// - FinishedAt: The time at which the import process finished (either successfully or with an error). +// - Error: If the import failed, contains the error message associated with the failure. 
type Import struct { - Id string `json:"id,omitempty"` - - PercentComplete float32 `json:"percent_complete,omitempty"` - - RecordsImported int64 `json:"records_imported,omitempty"` - - Status ImportStatus `json:"status,omitempty"` - - Uri string `json:"uri,omitempty"` - - CreatedAt *time.Time `json:"created_at,omitempty"` - - Error *string `json:"error,omitempty"` - - FinishedAt *time.Time `json:"finished_at,omitempty"` + Id string `json:"id,omitempty"` + PercentComplete float32 `json:"percent_complete,omitempty"` + RecordsImported int64 `json:"records_imported,omitempty"` + Status ImportStatus `json:"status,omitempty"` + Uri string `json:"uri,omitempty"` + CreatedAt *time.Time `json:"created_at,omitempty"` + FinishedAt *time.Time `json:"finished_at,omitempty"` + Error *string `json:"error,omitempty"` }
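
Putting the new operations together, here is a minimal end-to-end sketch of the bulk import flow this patch adds. It is an illustration under stated assumptions, not part of the diff itself: the bucket URI and the 30-second polling interval are placeholders, and the public package import path is assumed from the repository layout.

	package main

	import (
		"context"
		"fmt"
		"log"
		"time"

		"github.com/pinecone-io/go-pinecone/pinecone"
	)

	func main() {
		ctx := context.Background()

		pc, err := pinecone.NewClient(pinecone.NewClientParams{ApiKey: "YOUR_API_KEY"})
		if err != nil {
			log.Fatalf("Failed to create Client: %v", err)
		}

		idx, err := pc.DescribeIndex(ctx, "your-index-name")
		if err != nil {
			log.Fatalf("Failed to describe index: %v", err)
		}

		idxConnection, err := pc.Index(pinecone.NewIndexConnParams{Host: idx.Host})
		if err != nil {
			log.Fatalf("Failed to create IndexConnection: %v", err)
		}

		// Start the import. Continue tells the server to skip records that
		// fail to import rather than aborting the whole operation.
		errorMode := pinecone.Continue
		startRes, err := idxConnection.StartImport(ctx, "s3://your-bucket/your-file.parquet", nil, &errorMode)
		if err != nil {
			log.Fatalf("Failed to start import: %v", err)
		}

		// Poll DescribeImport until the import reaches a terminal status.
		// The 30-second interval is an arbitrary placeholder.
		for {
			imp, err := idxConnection.DescribeImport(ctx, startRes.Id)
			if err != nil {
				log.Fatalf("Failed to describe import %s: %v", startRes.Id, err)
			}
			fmt.Printf("Import %s: %s (%.1f%% complete)\n", imp.Id, imp.Status, imp.PercentComplete)

			if imp.Status == pinecone.Completed || imp.Status == pinecone.Failed || imp.Status == pinecone.Cancelled {
				break
			}
			time.Sleep(30 * time.Second)
		}
	}

To abandon a run instead of waiting, pass the same id to CancelImport(ctx, startRes.Id); when the id is not known up front, ListImports pages through all operations.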