Skip to content

Commit

Permalink
more tweaks to imports, add doc comments for all new import operation…
Browse files Browse the repository at this point in the history
…s and types
  • Loading branch information
austin-denoble committed Oct 15, 2024
1 parent 28356bf commit 8c0d9fc
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 57 deletions.
188 changes: 185 additions & 3 deletions pinecone/index_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,10 +1003,60 @@ func (idx *IndexConnection) DescribeIndexStatsFiltered(ctx context.Context, meta
}, nil
}

// StartImportResponse holds the response parameters for the StartImport method.
//
// Fields:
// - Id: The ID of the import process that was started.
type StartImportResponse struct {
Id string `json:"id,omitempty"`
}

// StartImport imports data from a storage provider into an index. The uri parameter must start with the
// schema of a supported storage provider. For buckets that are not publicly readable, you will also need to
// separately configure a storage integration and pass the integration id.
//
// Returns a pointer to a StartImportResponse object with the import ID or an error if the request fails.
//
// Parameters:
// - ctx: A context.Context object controls the request's lifetime,
// allowing for the request to be canceled or to timeout according to the context's deadline.
// - uri: The URI of the data to import. The URI must start with the scheme of a supported storage provider.
// - integrationId: If your bucket requires authentication to access, you need to pass the id of your storage integration using this property.
// Pass nil if not required.
// - errorMode: If set to "continue", the import operation will continue even if some records fail to import.
// Pass "abort" to stop the import operation if any records fail. Will default to "continue" if nil is passed.
//
// Example:
//
// ctx := context.Background()
//
// clientParams := pinecone.NewClientParams{
// ApiKey: "YOUR_API_KEY",
// SourceTag: "your_source_identifier", // optional
// }
//
// pc, err := pinecone.NewClient(clientParams)
// if err != nil {
// log.Fatalf("Failed to create Client: %v", err)
// }
//
// idx, err := pc.DescribeIndex(ctx, "your-index-name")
// if err != nil {
// log.Fatalf("Failed to describe index \"%s\". Error:%s", idx.Name, err)
// }
//
// idxConnection, err := pc.Index(pinecone.NewIndexConnParams{Host: idx.Host})
// if err != nil {
// log.Fatalf("Failed to create IndexConnection for Host: %v. Error: %v", idx.Host, err)
// }
//
// uri := "s3://your-bucket/your-file.csv"
// errorMode := "abort"
// importRes, err := idxConnection.StartImport(ctx, uri, nil, &errorMode)
// if err != nil {
// log.Fatalf("Failed to start import: %v", err)
// }
// fmt.Printf("import starteed with ID: %s", importRes.Id)
func (idx *IndexConnection) StartImport(ctx context.Context, uri string, integrationId *string, errorMode *ImportErrorMode) (*StartImportResponse, error) {
if uri == "" {
return nil, fmt.Errorf("must specify a uri to start an import")
Expand Down Expand Up @@ -1042,6 +1092,44 @@ func (idx *IndexConnection) StartImport(ctx context.Context, uri string, integra
return decodeStartImportResponse(res.Body)
}

// DescribeImport retrieves information about a specific import operation.
//
// Returns an Import object representing the current state of the import or an error if the request fails.
//
// Parameters:
// - ctx: A context.Context object controls the request's lifetime,
// allowing for the request to be canceled or to timeout according to the context's deadline.
// - id: The id of the import operation. This is returned when you call StartImport, or can be retrieved
// through the ListImports method.
//
// Example:
//
// ctx := context.Background()
//
// clientParams := pinecone.NewClientParams{
// ApiKey: "YOUR_API_KEY",
// SourceTag: "your_source_identifier", // optional
// }
//
// pc, err := pinecone.NewClient(clientParams)
// if err != nil {
// log.Fatalf("Failed to create Client: %v", err)
// }
//
// idx, err := pc.DescribeIndex(ctx, "your-index-name")
// if err != nil {
// log.Fatalf("Failed to describe index \"%s\". Error:%s", idx.Name, err)
// }
//
// idxConnection, err := pc.Index(pinecone.NewIndexConnParams{Host: idx.Host})
// if err != nil {
// log.Fatalf("Failed to create IndexConnection for Host: %v. Error: %v", idx.Host, err)
// }
// importDesc, err := idxConnection.DescribeImport(ctx, "your-import-id")
// if err != nil {
// log.Fatalf("Failed to describe import: %s - %v", "your-import-id", err)
// }
// fmt.Printf("Import ID: %s, Status: %s", importDesc.Id, importDesc.Status)
func (idx *IndexConnection) DescribeImport(ctx context.Context, id string) (*Import, error) {
res, err := (*idx.restClient).DescribeBulkImport(idx.akCtx(ctx), id)
if err != nil {
Expand All @@ -1056,20 +1144,77 @@ func (idx *IndexConnection) DescribeImport(ctx context.Context, id string) (*Imp
return toImport(importModel), nil
}

// ListImportsRequest holds the parameters for the ListImports method.
//
// Fields:
// - Limit: The maximum number of imports to return.
// - PaginationToken: The token to retrieve the next page of imports, if available.
type ListImportsRequest struct {
Limit *int32
PaginationToken *string
}

// ListImportsResponse holds the result of listing bulk imports.
//
// Fields:
// - Imports: The list of Import objects returned.
// - NextPaginationToken: The token for paginating through results, if more imports are available.
type ListImportsResponse struct {
Imports []*Import `json:"imports,omitempty"`
NextPaginationToken *string `json:"next_pagination_token,omitempty"`
}

func (idx *IndexConnection) ListImports(ctx context.Context, req *ListImportsRequest) (*ListImportsResponse, error) {
// ListImports returns information about import operations. It returns operations in a
// paginated form, with a pagination token to fetch the next page of results.
//
// Returns a pointer to a ListImportsResponse object or an error if the request fails.
//
// Parameters:
// - ctx: A context.Context object controls the request's lifetime,
// allowing for the request to be canceled or to timeout according to the context's deadline.
// - req: A ListImportsRequest object containing pagination and filter options.
//
// Example:
//
// ctx := context.Background()
//
// clientParams := NewClientParams{
// ApiKey: "YOUR_API_KEY",
// SourceTag: "your_source_identifier", // optional
// }
//
// pc, err := NewClient(clientParams)
// if err != nil {
// log.Fatalf("Failed to create Client: %v", err)
// }
//
// idx, err := pc.DescribeIndex(ctx, "your-index-name")
// if err != nil {
// log.Fatalf("Failed to describe index \"%s\". Error:%s", idx.Name, err)
// }
//
// idxConnection, err := pc.Index(NewIndexConnParams{Host: idx.Host})
// if err != nil {
// log.Fatalf("Failed to create IndexConnection for Host: %v. Error: %v", idx.Host, err)
// }
//
// limit := int32(10)
// firstImportPage, err := idxConnection.ListImports(ctx, &limit, nil)
// if err != nil {
// log.Fatalf("Failed to list imports: %v", err)
// }
// fmt.Printf("First page of imports: %+v", firstImportPage.Imports)
//
// paginationToken := firstImportPage.NextPaginationToken
// nextImportPage, err := idxConnection.ListImports(ctx, &limit, paginationToken)
// if err != nil {
// log.Fatalf("Failed to list imports: %v", err)
// }
// fmt.Printf("Second page of imports: %+v", nextImportPage.Imports)
func (idx *IndexConnection) ListImports(ctx context.Context, limit *int32, paginationToken *string) (*ListImportsResponse, error) {
params := db_data_rest.ListBulkImportsParams{
Limit: req.Limit,
PaginationToken: req.PaginationToken,
Limit: limit,
PaginationToken: paginationToken,
}

res, err := (*idx.restClient).ListBulkImports(idx.akCtx(ctx), &params)
Expand All @@ -1085,6 +1230,43 @@ func (idx *IndexConnection) ListImports(ctx context.Context, req *ListImportsReq
return listImportsResponse, nil
}

// CancelImport cancels an import operation by id.
//
// Returns an error if the request fails.
//
// Parameters:
// - ctx: A context.Context object controls the request's lifetime,
// allowing for the request to be canceled or to timeout according to the context's deadline.
// - id: The id of the import operation to cancel.
//
// Example:
//
// ctx := context.Background()
//
// clientParams := NewClientParams{
// ApiKey: "YOUR_API_KEY",
// SourceTag: "your_source_identifier", // optional
// }
//
// pc, err := NewClient(clientParams)
// if err != nil {
// log.Fatalf("Failed to create Client: %v", err)
// }
//
// idx, err := pc.DescribeIndex(ctx, "your-index-name")
// if err != nil {
// log.Fatalf("Failed to describe index \"%s\". Error:%s", idx.Name, err)
// }
//
// idxConnection, err := pc.Index(NewIndexConnParams{Host: idx.Host})
// if err != nil {
// log.Fatalf("Failed to create IndexConnection for Host: %v. Error: %v", idx.Host, err)
// }
//
// err = idxConnection.CancelImport(ctx, "your-import-id")
// if err != nil {
// log.Fatalf("Failed to cancel import: %s", "your-import-id")
// }
func (idx *IndexConnection) CancelImport(ctx context.Context, id string) error {
res, err := (*idx.restClient).CancelBulkImport(idx.akCtx(ctx), id)
if err != nil {
Expand Down
Loading

0 comments on commit 8c0d9fc

Please sign in to comment.