Add Bulk Import (#282)
We currently do not have Bulk Import support in the 2024-10 RC branch.

Add it in!

Notes:
- The Bulk Import product team is still working on some final tweaks to the API
(e.g. `integration` --> `integration_id`), so it's likely there will be
follow-up PRs to this PR once those are finalized
- "" will work on decreasing latency; once that is done, we can build
more integration tests (right now, it takes upwards of 15 mins for
vectors from a bulk import operation to show up in an index, so building
integration tests in CI isn't sustainable)
- I added a `featureFlag` similar to what @jhamon did in Python

- [ ] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [x] This change requires a documentation update
- [ ] Infrastructure change (CI configs, etc)
- [ ] Non-code change (docs, etc)
- [ ] None of the above: (explain here)

New unit tests & integration tests pass in CI.
aulorbe committed Oct 23, 2024
1 parent 8c1a209 commit 94f1a7b
Showing 25 changed files with 910 additions and 253 deletions.
51 changes: 51 additions & 0 deletions README.md
@@ -693,6 +693,57 @@ const vectors = [
await index.upsert(vectors);
```

### Bulk import vectors from object storage

You can now [import vectors en masse](https://docs.pinecone.io/guides/data/understanding-imports) from object
storage. Bulk Import is a long-running, asynchronous operation that imports large numbers of records into a Pinecone
serverless index.

In order to import vectors from object storage, they must be stored in Parquet files and adhere to the necessary
[file format](https://docs.pinecone.io/guides/data/understanding-imports#parquet-file-format). Your object storage
must also adhere to the necessary [directory structure](https://docs.pinecone.io/guides/data/understanding-imports#directory-structure).

The following example imports 10 vectors from an Amazon S3 bucket into a Pinecone serverless index:

```typescript
import { Pinecone } from '@pinecone-database/pinecone';

const pc = new Pinecone();
const indexName = 'sample-index';

await pc.createIndex({
  name: indexName,
  dimension: 10,
  spec: {
    serverless: {
      cloud: 'aws',
      region: 'eu-west-1',
    },
  },
});

const index = pc.Index(indexName);

const storageURI = 's3://my-bucket/my-directory/';

await index.startImport(storageURI, 'continue'); // 'continue' keeps the operation running instead of aborting it when errors are encountered.

// {
//   "id": "import-id"
// }
```
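
The second argument to `startImport` is the error mode. As a rough sketch of the behaviour this commit's unit tests describe (the mode defaults to `continue`, only `continue` and `abort` are accepted, and a missing `uri` is rejected), the request could be assembled as below. The names `OnError`, `StartImportRequestSketch`, and `buildStartImportRequest` are hypothetical stand-ins, not SDK exports, and the SDK itself throws `PineconeArgumentError` rather than a plain `Error`:

```typescript
// Hypothetical stand-in for the generated ImportErrorModeOnErrorEnum.
type OnError = 'continue' | 'abort';

// Hypothetical request shape, modeled on the request asserted in the unit tests.
interface StartImportRequestSketch {
  uri: string;
  errorMode: { onError: OnError };
}

// Type guard: true only for the two accepted error modes.
function isOnError(value: string): value is OnError {
  return value === 'continue' || value === 'abort';
}

// Builds the request body, defaulting errorMode to 'continue'.
// Throws a plain Error here; the SDK uses PineconeArgumentError.
function buildStartImportRequest(
  uri?: string,
  errorMode?: string
): StartImportRequestSketch {
  if (!uri) {
    throw new Error(
      '`uri` field is required and must start with the scheme of a supported storage provider.'
    );
  }
  const mode = errorMode ?? 'continue';
  if (!isOnError(mode)) {
    throw new Error('`errorMode` must be either "continue" or "abort"');
  }
  return { uri, errorMode: { onError: mode } };
}
```

Validating before the request is sent means an invalid mode never reaches the API, which is what the "invalid errorMode" test checks.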

You can [start, cancel, and check the status](https://docs.pinecone.io/guides/data/import-data) of one or all import operations.
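
Because an import is long-running and asynchronous, callers usually need to poll for its status. The following is a minimal sketch of such a loop, not part of the SDK. It assumes the status values `Pending`, `InProgress`, `Failed`, `Completed`, and `Cancelled` (check the generated `ImportModel` type before relying on them), and `ImportSummary`, `isTerminal`, and `waitForImport` are hypothetical names:

```typescript
// Assumed status values; verify against the generated ImportModel type.
type ImportStatus = 'Pending' | 'InProgress' | 'Failed' | 'Completed' | 'Cancelled';

// Hypothetical, trimmed-down view of what a describe call returns.
interface ImportSummary {
  id: string;
  status: ImportStatus;
}

// True once an import can no longer change state.
function isTerminal(status: ImportStatus): boolean {
  return status === 'Completed' || status === 'Failed' || status === 'Cancelled';
}

// Calls `describe` repeatedly until the import reaches a terminal status,
// waiting `intervalMs` between checks and giving up after `maxAttempts`.
async function waitForImport(
  describe: (id: string) => Promise<ImportSummary>,
  id: string,
  intervalMs = 60000,
  maxAttempts = 60
): Promise<ImportSummary> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const current = await describe(id);
    if (isTerminal(current.status)) {
      return current;
    }
    if (attempt < maxAttempts) {
      await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
    }
  }
  throw new Error(`Import ${id} did not finish after ${maxAttempts} checks`);
}
```

The `describe` callback is injected so the loop can wrap whatever describe call the index client exposes, and so it can be exercised in tests without network access.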

**Notes:**

- Bulk Import only works with serverless indexes
- Bulk Import is in [public preview](https://docs.pinecone.io/release-notes/feature-availability)
- The only object storage provider currently supported is [Amazon S3](https://docs.pinecone.io/guides/operations/integrations/integrate-with-amazon-s3)
- Vectors will take _at least 10 minutes_ to appear in your index upon completion of the import operation, since
this operation is optimized for very large workloads
- See [limits](https://docs.pinecone.io/guides/data/understanding-imports#limits) for further information

### Seeing index statistics

When experimenting with data operations, it's sometimes helpful to know how many records/vectors are stored in each
144 changes: 144 additions & 0 deletions src/data/__tests__/bulkImport/bulkImport.test.ts
@@ -0,0 +1,144 @@
import { StartImportCommand } from '../../bulkImport/startImport';
import { ListImportsCommand } from '../../bulkImport/listImports';
import { DescribeImportCommand } from '../../bulkImport/describeImport';
import { CancelImportCommand } from '../../bulkImport/cancelImport';
import { BulkOperationsProvider } from '../../bulkImport/bulkOperationsProvider';
import {
  ImportErrorModeOnErrorEnum,
  ListImportsRequest,
  StartImportOperationRequest,
} from '../../../pinecone-generated-ts-fetch/db_data';
import { PineconeArgumentError } from '../../../errors';

describe('StartImportCommand', () => {
  let apiProviderMock: jest.Mocked<BulkOperationsProvider>;
  let apiMock: jest.Mocked<any>; // Mocking the API returned by `provide`
  let startImportCommand: StartImportCommand;
  let listImportCommand: ListImportsCommand;
  let describeImportCommand: DescribeImportCommand;
  let cancelImportCommand: CancelImportCommand;

  beforeEach(() => {
    apiMock = {
      startImport: jest.fn(),
      listImports: jest.fn(),
      describeImport: jest.fn(),
      cancelImport: jest.fn(),
    };

    apiProviderMock = {
      provide: jest.fn().mockResolvedValue(apiMock),
    } as unknown as jest.Mocked<BulkOperationsProvider>;

    startImportCommand = new StartImportCommand(apiProviderMock, '');
    listImportCommand = new ListImportsCommand(apiProviderMock, '');
    describeImportCommand = new DescribeImportCommand(apiProviderMock, '');
    cancelImportCommand = new CancelImportCommand(apiProviderMock, '');
  });

  test('should call startImport with correct request when errorMode is "continue"', async () => {
    const uri = 's3://my-bucket/my-file.csv';
    const errorMode = 'continue';

    const expectedRequest: StartImportOperationRequest = {
      startImportRequest: {
        uri,
        errorMode: { onError: ImportErrorModeOnErrorEnum.Continue },
      },
    };

    await startImportCommand.run(uri, errorMode);

    expect(apiProviderMock.provide).toHaveBeenCalled();
    expect(apiMock.startImport).toHaveBeenCalledWith(expectedRequest);
  });

  test('should call startImport with correct request when errorMode is "abort"', async () => {
    const uri = 's3://my-bucket/my-file.csv';
    const errorMode = 'abort';

    const expectedRequest: StartImportOperationRequest = {
      startImportRequest: {
        uri,
        errorMode: { onError: ImportErrorModeOnErrorEnum.Abort },
      },
    };

    await startImportCommand.run(uri, errorMode);

    expect(apiProviderMock.provide).toHaveBeenCalled();
    expect(apiMock.startImport).toHaveBeenCalledWith(expectedRequest);
  });

  test('should throw PineconeArgumentError for invalid errorMode', async () => {
    const uri = 's3://my-bucket/my-file.csv';
    const errorMode = 'invalid';

    await expect(startImportCommand.run(uri, errorMode)).rejects.toThrow(
      PineconeArgumentError
    );

    expect(apiMock.startImport).not.toHaveBeenCalled();
  });

  test('should use "continue" as default when errorMode is undefined', async () => {
    const uri = 's3://my-bucket/my-file.csv';

    const expectedRequest: StartImportOperationRequest = {
      startImportRequest: {
        uri,
        errorMode: { onError: ImportErrorModeOnErrorEnum.Continue },
      },
    };

    await startImportCommand.run(uri, undefined);

    expect(apiProviderMock.provide).toHaveBeenCalled();
    expect(apiMock.startImport).toHaveBeenCalledWith(expectedRequest);
  });

  test('should throw error when URI/1st arg is missing', async () => {
    const toThrow = async () => {
      // @ts-ignore
      await startImportCommand.run();
    };

    await expect(toThrow).rejects.toThrowError(PineconeArgumentError);
    await expect(toThrow).rejects.toThrowError(
      '`uri` field is required and must start with the scheme of a supported storage provider.'
    );
  });

  test('should call listImport with correct request', async () => {
    const limit = 1;

    const expectedRequest: ListImportsRequest = {
      limit,
    };

    await listImportCommand.run(limit);

    expect(apiProviderMock.provide).toHaveBeenCalled();
    expect(apiMock.listImports).toHaveBeenCalledWith(expectedRequest);
  });

  test('should call describeImport with correct request', async () => {
    const importId = 'import-id';
    const req = { id: importId };

    await describeImportCommand.run(importId);

    expect(apiProviderMock.provide).toHaveBeenCalled();
    expect(apiMock.describeImport).toHaveBeenCalledWith(req);
  });

  test('should call cancelImport with correct request', async () => {
    const importId = 'import-id';
    const req = { id: importId };

    await cancelImportCommand.run(importId);

    expect(apiProviderMock.provide).toHaveBeenCalled();
    expect(apiMock.cancelImport).toHaveBeenCalledWith(req);
  });
});
77 changes: 77 additions & 0 deletions src/data/bulkImport/bulkOperationsProvider.ts
@@ -0,0 +1,77 @@
import type { PineconeConfiguration } from '../types';
import type { HTTPHeaders } from '../../pinecone-generated-ts-fetch/db_data';
import {
  BulkOperationsApi,
  Configuration,
  ConfigurationParameters,
  X_PINECONE_API_VERSION,
} from '../../pinecone-generated-ts-fetch/db_data';
import {
  buildUserAgent,
  getFetch,
  normalizeUrl,
  queryParamsStringify,
} from '../../utils';
import { IndexHostSingleton } from '../indexHostSingleton';
import { middleware } from '../../utils/middleware';

export class BulkOperationsProvider {
  private readonly config: PineconeConfiguration;
  private readonly indexName: string;
  private indexHostUrl?: string;
  private bulkOperations?: BulkOperationsApi;
  private readonly additionalHeaders?: HTTPHeaders;

  constructor(
    config: PineconeConfiguration,
    indexName: string,
    indexHostUrl?: string,
    additionalHeaders?: HTTPHeaders
  ) {
    this.config = config;
    this.indexName = indexName;
    this.indexHostUrl = normalizeUrl(indexHostUrl);
    this.additionalHeaders = additionalHeaders;
  }

  async provide() {
    if (this.bulkOperations) {
      return this.bulkOperations;
    }

    // If an indexHostUrl has been manually passed we use that,
    // otherwise we rely on resolving the host from the IndexHostSingleton
    if (this.indexHostUrl) {
      this.bulkOperations = this.buildBulkOperationsConfig();
    } else {
      this.indexHostUrl = await IndexHostSingleton.getHostUrl(
        this.config,
        this.indexName
      );

      this.bulkOperations = this.buildBulkOperationsConfig();
    }

    return this.bulkOperations;
  }

  buildBulkOperationsConfig() {
    const headers = this.additionalHeaders || null;

    const indexConfigurationParameters: ConfigurationParameters = {
      basePath: this.indexHostUrl,
      apiKey: this.config.apiKey,
      queryParamsStringify,
      headers: {
        'User-Agent': buildUserAgent(this.config),
        'X-Pinecone-Api-Version': X_PINECONE_API_VERSION,
        ...headers,
      },
      fetchApi: getFetch(this.config),
      middleware,
    };

    const indexConfiguration = new Configuration(indexConfigurationParameters);
    return new BulkOperationsApi(indexConfiguration);
  }
}
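
One detail of `buildBulkOperationsConfig` above is the order of the header merge: the additional headers are spread last, so a caller-supplied key replaces a default with the same name, and spreading `null` adds nothing. The standalone sketch below illustrates that behaviour only. `buildHeaders` is a hypothetical helper, not part of the SDK, and the `HTTPHeaders` alias is an assumed shape for the generated type:

```typescript
// Assumed shape of the generated HTTPHeaders type.
type HTTPHeaders = { [key: string]: string };

// Merges the two default headers with optional caller-supplied ones.
// The spread comes last, so caller-supplied keys override the defaults.
function buildHeaders(
  userAgent: string,
  apiVersion: string,
  additionalHeaders?: HTTPHeaders
): HTTPHeaders {
  const extra = additionalHeaders || null; // spreading null contributes no keys
  return {
    'User-Agent': userAgent,
    'X-Pinecone-Api-Version': apiVersion,
    ...extra,
  };
}
```

Calling `buildHeaders('my-agent', '2024-10')` yields just the two defaults, while passing `{ 'X-Pinecone-Api-Version': 'custom' }` as the third argument replaces the version header.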
19 changes: 19 additions & 0 deletions src/data/bulkImport/cancelImport.ts
@@ -0,0 +1,19 @@
import { BulkOperationsProvider } from './bulkOperationsProvider';

export class CancelImportCommand {
  apiProvider: BulkOperationsProvider;
  namespace: string;

  constructor(apiProvider: BulkOperationsProvider, namespace: string) {
    this.apiProvider = apiProvider;
    this.namespace = namespace;
  }

  async run(id: string): Promise<object> {
    const req = {
      id: id,
    };
    const api = await this.apiProvider.provide();
    return await api.cancelImport(req);
  }
}
20 changes: 20 additions & 0 deletions src/data/bulkImport/describeImport.ts
@@ -0,0 +1,20 @@
import { BulkOperationsProvider } from './bulkOperationsProvider';
import type { ImportModel } from '../../pinecone-generated-ts-fetch/db_data';

export class DescribeImportCommand {
  apiProvider: BulkOperationsProvider;
  namespace: string;

  constructor(apiProvider: BulkOperationsProvider, namespace: string) {
    this.apiProvider = apiProvider;
    this.namespace = namespace;
  }

  async run(id: string): Promise<ImportModel> {
    const req = {
      id: id,
    };
    const api = await this.apiProvider.provide();
    return await api.describeImport(req);
  }
}
27 changes: 27 additions & 0 deletions src/data/bulkImport/listImports.ts
@@ -0,0 +1,27 @@
import { BulkOperationsProvider } from './bulkOperationsProvider';
import {
  ImportListResponse,
  ListImportsRequest,
} from '../../pinecone-generated-ts-fetch/db_data';

export class ListImportsCommand {
  apiProvider: BulkOperationsProvider;
  namespace: string;

  constructor(apiProvider: BulkOperationsProvider, namespace: string) {
    this.apiProvider = apiProvider;
    this.namespace = namespace;
  }

  async run(
    limit?: number,
    paginationToken?: string
  ): Promise<ImportListResponse> {
    const req = {
      limit: limit,
      paginationToken: paginationToken,
    } as ListImportsRequest;
    const api = await this.apiProvider.provide();
    return await api.listImports(req);
  }
}