Skip to content

Commit

Permalink
Add ci
Browse files Browse the repository at this point in the history
  • Loading branch information
ilijaNL committed Jul 22, 2024
1 parent 7dc1fe1 commit a560162
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 6 deletions.
39 changes: 39 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: test

on:
workflow_call:
pull_request:
branches:
- "main"
paths-ignore:
- 'docs/**'
- 'example/**'
- '**/*.md'

jobs:
# Label of the container job
test:
# Containers must run in Linux based operating systems
runs-on: ubuntu-latest

strategy:
matrix:
node-version: [18.x, 20.x]

steps:
- uses: actions/checkout@v4
- name: Install pnpm
uses: pnpm/action-setup@v4
with:
version: 8
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v4
with:
node-version: ${{ matrix.node-version }}
cache: 'pnpm'
- name: Install dependencies
run: pnpm install
- name: test
run: pnpm run test
# - name: ✅ Upload coverage to Codecov
# uses: codecov/codecov-action@v3
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "pg-task",
"author": "IlijaNL",
"version": "0.0.1",
"type": "module",
"type": "commonjs",
"types": "dist/index.d.ts",
"module": "dist/index.mjs",
"main": "dist/index.js",
Expand Down Expand Up @@ -37,6 +37,9 @@
"registry": "https://registry.npmjs.org",
"access": "public"
},
"engines": {
"node": ">=18.0.0"
},
"sideEffects": false,
"devDependencies": {
"@changesets/cli": "^2.27.7",
Expand Down
12 changes: 10 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import { createManager } from './manager';
export { createPlans } from './plans';
export * from './plans';
export { type WorkerManager, type WorkerManagerProperties, createManager } from './manager';
export { createTaskQueueFactory } from './task';
export {
createTaskQueueFactory,
TaskResultStates,
type ConfiguredTask,
type SelectedTask,
type Task,
type TaskResult,
type TaskResultState,
} from './task';
export default createManager;
157 changes: 154 additions & 3 deletions src/plans.spec.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { Pool } from 'pg';
import { createPlans } from './plans';
import { ConfiguredTask, TaskResultStates, Task } from './task';
import { ConfiguredTask, TaskResultStates, Task, SelectedTask, TaskResult } from './task';
import { PostgreSqlContainer, StartedPostgreSqlContainer } from '@testcontainers/postgresql';
import { cleanupSchema, createRandomSchema } from '../__tests__/db';
import { executeQuery } from './utils/sql';
import { migrate } from './utils/migrate';
import { createMigrations } from './migrations';
import { setTimeout } from 'timers/promises';
import { createBatcher } from 'node-batcher';

const generateTasks = (amount: number, data: Task['data']): Array<ConfiguredTask> =>
const generateTasks = (amount: number, data: Task['data'], startAfterSeconds = 100): Array<ConfiguredTask> =>
new Array(amount).fill({
data: data,
expireInSeconds: 100,
Expand All @@ -18,7 +19,7 @@ const generateTasks = (amount: number, data: Task['data']): Array<ConfiguredTask
retryBackoff: false,
retryDelayInSeconds: 20,
singletonKey: null,
startAfterSeconds: 100,
startAfterSeconds: startAfterSeconds,
} as ConfiguredTask);

describe('plans', () => {
Expand Down Expand Up @@ -366,4 +367,154 @@ describe('plans', () => {
]);
});
});

describe('performance', () => {
let pool: Pool;
let container: StartedPostgreSqlContainer;

let schema = createRandomSchema();
let plans: ReturnType<typeof createPlans>;

beforeAll(async () => {
container = await new PostgreSqlContainer().start();
});

afterAll(async () => {
await container.stop();
});

beforeEach(async () => {
pool = new Pool({
connectionString: container.getConnectionUri(),
// use 5
max: 5,
});
schema = createRandomSchema();
plans = createPlans(schema);
await migrate(pool, schema, createMigrations(schema));
});

afterEach(async () => {
await cleanupSchema(pool, schema);
await pool?.end();
});

it('creates 100000 tasks under 10 seconds', async () => {
jest.setTimeout(30000);

// pre-generated batches
const taskBatch = generateTasks(10, {
nested: { morenested: { a: '123', b: true, deep: { abc: 'long-string', number: 123123123 } } },
});

const start = process.hrtime();
const createTaskQuery = plans.createTasks(taskBatch);
for (let i = 0; i < 2000; ++i) {
await Promise.all([
executeQuery(pool, createTaskQuery),
executeQuery(pool, createTaskQuery),
executeQuery(pool, createTaskQuery),
executeQuery(pool, createTaskQuery),
executeQuery(pool, createTaskQuery),
]);
}

const [seconds] = process.hrtime(start);

expect(seconds).toBeLessThan(10);
});

it('pops 100000 tasks under 10 seconds', async () => {
jest.setTimeout(30000);

// pre-generated batches
const taskBatch = generateTasks(
10,
{
nested: { morenested: { a: '123', b: true, deep: { abc: 'long-string', number: 123123123 } } },
},
0
);

const createTasksQuery = plans.createTasks(taskBatch);
const createPromises = [];
// create
for (let i = 0; i < 10000; ++i) {
createPromises.push(executeQuery(pool, createTasksQuery));
}

await Promise.all(createPromises);

const getTaskQuery = plans.getAndStartTasks(taskBatch[0]!.queue, 10);
const start = process.hrtime();

const getTasksPromises: Promise<SelectedTask[]>[] = [];
for (let i = 0; i < 10000; ++i) {
getTasksPromises.push(executeQuery(pool, getTaskQuery));
}
const tasks = (await Promise.all(getTasksPromises)).flat();
const [seconds] = process.hrtime(start);

expect(tasks.length).toBe(100000);
expect(seconds).toBeLessThan(10);
});

it('resolves 100000 tasks under 10 seconds', async () => {
jest.setTimeout(40000);

// pre-generated batches
const taskBatch = generateTasks(
100,
{
nested: { morenested: { a: '123', b: true, deep: { abc: 'long-string', number: 123123123 } } },
},
0
);

const createTasksQuery = plans.createTasks(taskBatch);

const createPromises = [];
// create
for (let i = 0; i < 1000; ++i) {
createPromises.push(executeQuery(pool, createTasksQuery));
}

await Promise.all(createPromises);

const getTaskQuery = plans.getAndStartTasks(taskBatch[0]!.queue, 50);

const getTasksPromises: Promise<SelectedTask[]>[] = [];
for (let i = 0; i < 2000; ++i) {
getTasksPromises.push(executeQuery(pool, getTaskQuery));
}
const taskIds = (await Promise.all(getTasksPromises)).flat().map((t) => t.id);

expect(taskIds.length).toBe(100000);
const resolveBatcher = createBatcher<TaskResult>({
maxSize: 5,
maxTimeInMs: 100,
onFlush: async (batch) => {
await executeQuery(pool, plans.resolveTasks(batch.map((item) => item.data)));
},
});
const start = process.hrtime();

const promises: Array<Promise<any>> = [];
for (let i = 0; i < taskIds.length; ++i) {
promises.push(
resolveBatcher.add({ result: { success: true }, state: TaskResultStates.success, task_id: taskIds[i]! })
);
}

await Promise.all(promises);

const [seconds] = process.hrtime(start);

expect(seconds).toBeLessThan(10);

// sanity check
const [lastLog] = await executeQuery(pool, plans.getTaskExecutionLog(taskIds[taskIds.length - 1]!));
expect(lastLog?.task_id).toBe(taskIds[taskIds.length - 1]!);
});
});
});
1 change: 1 addition & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"compilerOptions": {
"target": "ESNext",
"composite": false,
"outDir": "./dist",
"declaration": true,
"declarationMap": false,
"newLine": "lf",
Expand Down

0 comments on commit a560162

Please sign in to comment.