From 3f54dd0e942ae2d10a70a582e2888b66431bd1ad Mon Sep 17 00:00:00 2001 From: ilijaNL Date: Mon, 22 Jul 2024 08:08:57 +0200 Subject: [PATCH 1/8] Rename + readme example --- README.md | 47 +++++++++++++++++++++++++++++++++++++++++++++ src/index.ts | 8 ++++++++ src/maintaince.ts | 4 ++-- src/manager.spec.ts | 4 ++-- src/plans.spec.ts | 28 +++++++++++++-------------- src/plans.ts | 4 ++-- src/queue-worker.ts | 2 +- 7 files changed, 76 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index df26837..369278e 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,50 @@ # pg-task A SQS like solution build on top of Postgres and NodeJS. + +## Usage + +``` +npm install pg-task +``` + +```typescript +import { createManager, executeQuery, createPlans, createTaskQueueFactory } from 'pg-task'; +import { Pool } from 'pg'; + +const pool = new Pool({}); + +const manager = createManager({ + pgClient: pool, + schema, +}); +await manager.start(); + +// Register a worker for `worker-queue` task queue +const workerId = await manager.work({ + queue: 'worker-queue', + async handler(data) { + await Promise.resolve(); + }, +}); + +// enqueue tasks +const plans = createPlans(schema); +const taskFactory = createTaskQueueFactory('worker-queue'); +await executeQuery( + pool, + plans.enqueueTasks( + taskFactory([ + { + data: { somepayload: 'test' }, + }, + { + data: { somepayload: 'test' }, + }, + ]) + ) +); + +// On application shutdown +await manager.stop(); +``` diff --git a/src/index.ts b/src/index.ts index 2ab3336..67d39d5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,4 +10,12 @@ export { type TaskResult, type TaskResultState, } from './task'; +export { + executeQuery, + type Pool, + type ClientFromPool, + type QueryClient, + type TypedQuery, + type QueryResultRow, +} from './utils/sql'; export default createManager; diff --git a/src/maintaince.ts b/src/maintaince.ts index 51d386c..c5f1eec 100644 --- a/src/maintaince.ts +++ b/src/maintaince.ts @@ -57,7 +57,7 @@ export const createMaintainceWorker = async (pool: Pool, schema: string, options // complete this task, and reschedule it in future await transactionExecutor(plans.resolveTasks([{ task_id: id, result: data, state: TaskResultStates.success }])); await transactionExecutor( - plans.createTasks( + plans.enqueueTasks( taskFactory([ { data: null, @@ -73,7 +73,7 @@ export const createMaintainceWorker = async (pool: Pool, schema: string, options // ensure we try to create the maintaince tasks always await executor( - plans.createTasks( + plans.enqueueTasks( taskFactory([ { data: null, diff --git a/src/manager.spec.ts b/src/manager.spec.ts index 1d93f34..6b82867 100644 --- a/src/manager.spec.ts +++ b/src/manager.spec.ts @@ -73,7 +73,7 @@ describe('pg worker', () => { await executeQuery( pool, - plans.createTasks([ + plans.enqueueTasks([ { data: { nosmoke: true }, expireInSeconds: 1, @@ -94,6 +94,6 @@ describe('pg worker', () => { await manager.stop(); // we should not have any pending tasks left - await expect(executeQuery(pool, plans.getAndStartTasks(queue, 100))).resolves.toHaveLength(0); + await expect(executeQuery(pool, plans.popTasks(queue, 100))).resolves.toHaveLength(0); }); }); diff --git a/src/plans.spec.ts b/src/plans.spec.ts index d72c70f..3b3c2ee 100644 --- a/src/plans.spec.ts +++ b/src/plans.spec.ts @@ -27,7 +27,7 @@ describe('plans', () => { const plans = createPlans('schema_a'); it('createTasks', () => { - const q = plans.createTasks(generateTasks(3, { works: true, value: '123' })); + const q = plans.enqueueTasks(generateTasks(3, { works: true, value: '123' })); expect(q.text).toMatchInlineSnapshot(` " SELECT @@ -105,8 +105,8 @@ describe('plans', () => { `); }); - it('getAndStartTasks', () => { - const q = plans.getAndStartTasks('queue', 20); + it('popTasks', () => { + const q = plans.popTasks('queue', 20); expect(q.text).toMatchInlineSnapshot(` " SELECT @@ -230,9 +230,9 @@ describe('plans', () => { }, ]; - await executeQuery(pool, plans.createTasks(tasks)); + await executeQuery(pool, plans.enqueueTasks(tasks)); - const fetchedTasks = await executeQuery(pool, plans.getAndStartTasks('test-queue', 11)); + const fetchedTasks = await executeQuery(pool, plans.popTasks('test-queue', 11)); expect(fetchedTasks).toHaveLength(tasks.length); expect(fetchedTasks.map((t) => t.data)).toEqual(tasks.map((t) => t.data)); @@ -271,9 +271,9 @@ describe('plans', () => { startAfterSeconds: 0, }; - await executeQuery(pool, plans.createTasks([task])); + await executeQuery(pool, plans.enqueueTasks([task])); - const fetchedTasks1 = await executeQuery(pool, plans.getAndStartTasks(queue, 11)); + const fetchedTasks1 = await executeQuery(pool, plans.popTasks(queue, 11)); // fails await executeQuery( @@ -287,13 +287,13 @@ describe('plans', () => { ) ); - const fetchedTasks2 = await executeQuery(pool, plans.getAndStartTasks(queue, 11)); + const fetchedTasks2 = await executeQuery(pool, plans.popTasks(queue, 11)); expect(fetchedTasks2.length).toBe(0); await setTimeout(1500); - const [fetchedTask3] = await executeQuery(pool, plans.getAndStartTasks(queue, 11)); + const [fetchedTask3] = await executeQuery(pool, plans.popTasks(queue, 11)); expect(fetchedTask3).toBeDefined(); const succeedTask = fetchedTask3!; @@ -408,7 +408,7 @@ describe('plans', () => { }); const start = process.hrtime(); - const createTaskQuery = plans.createTasks(taskBatch); + const createTaskQuery = plans.enqueueTasks(taskBatch); for (let i = 0; i < 2000; ++i) { await Promise.all([ executeQuery(pool, createTaskQuery), @@ -436,7 +436,7 @@ describe('plans', () => { 0 ); - const createTasksQuery = plans.createTasks(taskBatch); + const createTasksQuery = plans.enqueueTasks(taskBatch); const createPromises = []; // create for (let i = 0; i < 10000; ++i) { @@ -445,7 +445,7 @@ describe('plans', () => { await Promise.all(createPromises); - const getTaskQuery = plans.getAndStartTasks(taskBatch[0]!.queue, 10); + const getTaskQuery = plans.popTasks(taskBatch[0]!.queue, 10); const start = process.hrtime(); const getTasksPromises: Promise[] = []; @@ -471,7 +471,7 @@ describe('plans', () => { 0 ); - const createTasksQuery = plans.createTasks(taskBatch); + const createTasksQuery = plans.enqueueTasks(taskBatch); const createPromises = []; // create @@ -481,7 +481,7 @@ describe('plans', () => { await Promise.all(createPromises); - const getTaskQuery = plans.getAndStartTasks(taskBatch[0]!.queue, 50); + const getTaskQuery = plans.popTasks(taskBatch[0]!.queue, 50); const getTasksPromises: Promise[] = []; for (let i = 0; i < 2000; ++i) { diff --git a/src/plans.ts b/src/plans.ts index ac59d64..2a87c6a 100644 --- a/src/plans.ts +++ b/src/plans.ts @@ -18,7 +18,7 @@ const itemsToKeys = >(items: T[], init: KeysToArr< }, init); export const createPlans = (schema: string) => ({ - createTasks: (tasks: ConfiguredTask[]) => { + enqueueTasks: (tasks: ConfiguredTask[]) => { const payload = itemsToKeys(tasks, { data: new Array(tasks.length), expireInSeconds: new Array(tasks.length), @@ -49,7 +49,7 @@ FROM ${rawSql(schema)}.create_tasks( ) `; }, - getAndStartTasks: (queue: string, amount: number): TypedQuery => { + popTasks: (queue: string, amount: number): TypedQuery => { return sql<{ id: string; data: JsonValue; diff --git a/src/queue-worker.ts b/src/queue-worker.ts index 96dc48c..0f081e7 100644 --- a/src/queue-worker.ts +++ b/src/queue-worker.ts @@ -41,7 +41,7 @@ export const createQueueWorker = ( return createTaskWorker( { async popTasks(amount) { - return queryExecutor(sqlPlans.getAndStartTasks(props.queue, amount)); + return queryExecutor(sqlPlans.popTasks(props.queue, amount)); }, async resolveTask(task) { await resolveTaskBatcher.add(task); From 33299955a101e32112a400bd6afc83c40f4cf149 Mon Sep 17 00:00:00 2001 From: ilijaNL Date: Mon, 22 Jul 2024 08:13:52 +0200 Subject: [PATCH 2/8] Reduce performance requirements --- src/plans.spec.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/plans.spec.ts b/src/plans.spec.ts index 3b3c2ee..c01787b 100644 --- a/src/plans.spec.ts +++ b/src/plans.spec.ts @@ -459,7 +459,7 @@ describe('plans', () => { expect(seconds).toBeLessThan(10); }); - it('resolves 100000 tasks under 10 seconds', async () => { + it('resolves 50000 tasks under 10 seconds', async () => { jest.setTimeout(40000); // pre-generated batches @@ -475,7 +475,7 @@ describe('plans', () => { const createPromises = []; // create - for (let i = 0; i < 1000; ++i) { + for (let i = 0; i < 500; ++i) { createPromises.push(executeQuery(pool, createTasksQuery)); } @@ -484,14 +484,15 @@ describe('plans', () => { const getTaskQuery = plans.popTasks(taskBatch[0]!.queue, 50); const getTasksPromises: Promise[] = []; - for (let i = 0; i < 2000; ++i) { + for (let i = 0; i < 1000; ++i) { getTasksPromises.push(executeQuery(pool, getTaskQuery)); } const taskIds = (await Promise.all(getTasksPromises)).flat().map((t) => t.id); - expect(taskIds.length).toBe(100000); + expect(taskIds.length).toBe(50000); + const resolveBatcher = createBatcher({ - maxSize: 5, + maxSize: 10, maxTimeInMs: 100, onFlush: async (batch) => { await executeQuery(pool, plans.resolveTasks(batch.map((item) => item.data))); From 12918ea5d4056e7ffd01bce406cd2dfd663a73ba Mon Sep 17 00:00:00 2001 From: ilijaNL Date: Mon, 22 Jul 2024 08:22:50 +0200 Subject: [PATCH 3/8] Fix ci pnpm cache --- .github/workflows/test.yml | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ca35e55..30985fe 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -22,17 +22,34 @@ jobs: steps: - uses: actions/checkout@v4 + - name: Use Node.js ${{ matrix.node-version }} + id: setup-node + uses: actions/setup-node@v4 + with: + node-version: ${{ matrix.node-version }} + - name: Install pnpm uses: pnpm/action-setup@v4 with: version: 8 - - name: Use Node.js ${{ matrix.node-version }} - uses: actions/setup-node@v4 + + # See https://github.com/actions/setup-node/issues/641#issuecomment-1358859686 + - name: pnpm cache path + id: pnpm-cache-path + run: | + echo "STORE_PATH=$(pnpm store path)" >> $GITHUB_OUTPUT + + - name: pnpm cache + uses: actions/cache@v3 with: - node-version: ${{ matrix.node-version }} - cache: 'pnpm' + path: ${{ steps.pnpm-cache-path.outputs.STORE_PATH }} + key: ${{ runner.os }}-${{ steps.setup-node.outputs.node-version }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }} + restore-keys: | + ${{ runner.os }}-${{ steps.setup-node.outputs.node-version }}-pnpm-store- + - name: Install dependencies - run: pnpm install + run: pnpm install --frozen-lockfile + - name: test run: pnpm run test # - name: ✅ Upload coverage to Codecov From 76fa7dbf93952461d3ef6b3b44dd1a0c590f61ff Mon Sep 17 00:00:00 2001 From: ilijaNL Date: Mon, 22 Jul 2024 08:30:22 +0200 Subject: [PATCH 4/8] Init changeset --- .changeset/README.md | 8 ++++++++ .changeset/config.json | 11 +++++++++++ 2 files changed, 19 insertions(+) create mode 100644 .changeset/README.md create mode 100644 .changeset/config.json diff --git a/.changeset/README.md b/.changeset/README.md new file mode 100644 index 0000000..e5b6d8d --- /dev/null +++ b/.changeset/README.md @@ -0,0 +1,8 @@ +# Changesets + +Hello and welcome! This folder has been automatically generated by `@changesets/cli`, a build tool that works +with multi-package repos, or single-package repos to help you version and publish your code. You can +find the full documentation for it [in our repository](https://github.com/changesets/changesets) + +We have a quick list of common questions to get you started engaging with this project in +[our documentation](https://github.com/changesets/changesets/blob/main/docs/common-questions.md) diff --git a/.changeset/config.json b/.changeset/config.json new file mode 100644 index 0000000..ae82eba --- /dev/null +++ b/.changeset/config.json @@ -0,0 +1,11 @@ +{ + "$schema": "https://unpkg.com/@changesets/config@3.0.2/schema.json", + "changelog": "@changesets/cli/changelog", + "commit": false, + "fixed": [], + "linked": [], + "access": "restricted", + "baseBranch": "main", + "updateInternalDependencies": "patch", + "ignore": [] +} From 655260584fc3f4fccee04e1de9088bdbaf1056b3 Mon Sep 17 00:00:00 2001 From: ilijaNL Date: Mon, 22 Jul 2024 08:38:18 +0200 Subject: [PATCH 5/8] Add ci/cd --- .changeset/good-seahorses-wave.md | 5 +++ .github/workflows/cd.yml | 53 ++++++++++++++++++++++++++ .github/workflows/{test.yml => ci.yml} | 6 +-- 3 files changed, 61 insertions(+), 3 deletions(-) create mode 100644 .changeset/good-seahorses-wave.md create mode 100644 .github/workflows/cd.yml rename .github/workflows/{test.yml => ci.yml} (93%) diff --git a/.changeset/good-seahorses-wave.md b/.changeset/good-seahorses-wave.md new file mode 100644 index 0000000..4a3938b --- /dev/null +++ b/.changeset/good-seahorses-wave.md @@ -0,0 +1,5 @@ +--- +'pg-task': patch +--- + +Add initial version diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml new file mode 100644 index 0000000..569f14b --- /dev/null +++ b/.github/workflows/cd.yml @@ -0,0 +1,53 @@ +name: continuous deployment + +on: + pull_request: + branches: + - main + push: + branches: + - main + +concurrency: ${{ github.workflow }}-${{ github.ref }} + +jobs: + test: + uses: ./.github/workflows/ci.yml + release: + name: Release + needs: test + runs-on: ubuntu-latest + permissions: + id-token: write + contents: write + packages: write + pull-requests: write + issues: read + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - uses: actions/setup-node@v4 + with: + node-version: 20.x + # See https://github.com/actions/setup-node/issues/641#issuecomment-1358859686 + - name: pnpm cache path + id: pnpm-cache-path + run: | + echo "STORE_PATH=$(pnpm store path)" >> $GITHUB_OUTPUT + + - name: pnpm cache + uses: actions/cache@v3 + with: + path: ${{ steps.pnpm-cache-path.outputs.STORE_PATH }} + key: ${{ runner.os }}-${{ steps.setup-node.outputs.node-version }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }} + restore-keys: | + ${{ runner.os }}-${{ steps.setup-node.outputs.node-version }}-pnpm-store- + + - name: Install dependencies + run: pnpm install --frozen-lockfile + + - name: Create Release Pull Request + uses: changesets/action@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/test.yml b/.github/workflows/ci.yml similarity index 93% rename from .github/workflows/test.yml rename to .github/workflows/ci.yml index 30985fe..af36741 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/ci.yml @@ -1,4 +1,4 @@ -name: test +name: integration on: workflow_call: @@ -52,5 +52,5 @@ jobs: - name: test run: pnpm run test - # - name: ✅ Upload coverage to Codecov - # uses: codecov/codecov-action@v3 \ No newline at end of file + - name: ✅ Upload coverage to Codecov + uses: codecov/codecov-action@v3 \ No newline at end of file From 5824ef8f09d6eddc8849d193372162b4f89a9620 Mon Sep 17 00:00:00 2001 From: ilijaNL Date: Mon, 22 Jul 2024 08:44:27 +0200 Subject: [PATCH 6/8] Fix cd --- .github/workflows/cd.yml | 7 ++++++- src/task-worker.spec.ts | 6 +++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index 569f14b..c74fa1e 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -28,9 +28,14 @@ jobs: with: fetch-depth: 0 - uses: actions/setup-node@v4 + id: setup-node with: node-version: 20.x - # See https://github.com/actions/setup-node/issues/641#issuecomment-1358859686 + - name: Install pnpm + uses: pnpm/action-setup@v4 + with: + version: 8 + # See https://github.com/actions/setup-node/issues/641#issuecomment-1358859686 - name: pnpm cache path id: pnpm-cache-path run: | diff --git a/src/task-worker.spec.ts b/src/task-worker.spec.ts index a98f590..305a18f 100644 --- a/src/task-worker.spec.ts +++ b/src/task-worker.spec.ts @@ -128,7 +128,7 @@ describe('task-worker', () => { } }, }, - { maxConcurrency: amountOfTasks, poolInternvalInMs: 20, refillThresholdPct: 0.33 } + { maxConcurrency: amountOfTasks, poolInternvalInMs: 80, refillThresholdPct: 0.33 } ); worker.start(); @@ -221,7 +221,7 @@ describe('task-worker', () => { await setTimeout(200); }, }, - { maxConcurrency: amountOfTasks, poolInternvalInMs: 20, refillThresholdPct: 0.33 } + { maxConcurrency: amountOfTasks, poolInternvalInMs: 100, refillThresholdPct: 0.33 } ); worker.start(); @@ -270,7 +270,7 @@ describe('task-worker', () => { }; }, }, - { maxConcurrency: amountOfTasks, poolInternvalInMs: 60, refillThresholdPct: 0.33 } + { maxConcurrency: amountOfTasks, poolInternvalInMs: 100, refillThresholdPct: 0.33 } ); const promise = once(ee, 'completed'); From 7cba56c637c3b31d2c84455ebdd194c7ddbeff0b Mon Sep 17 00:00:00 2001 From: ilijaNL Date: Mon, 22 Jul 2024 08:47:42 +0200 Subject: [PATCH 7/8] rename --- .github/workflows/{cd.yml => cicd.yml} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename .github/workflows/{cd.yml => cicd.yml} (98%) diff --git a/.github/workflows/cd.yml b/.github/workflows/cicd.yml similarity index 98% rename from .github/workflows/cd.yml rename to .github/workflows/cicd.yml index c74fa1e..849e903 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cicd.yml @@ -1,4 +1,4 @@ -name: continuous deployment +name: cicd on: pull_request: From 7bf2e11c80ddd0a00d5bfb1c39d87916354aa94a Mon Sep 17 00:00:00 2001 From: ilijaNL Date: Mon, 22 Jul 2024 08:52:32 +0200 Subject: [PATCH 8/8] Fix cicd --- .github/workflows/ci.yml | 3 ++- .github/workflows/cicd.yml | 3 --- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index af36741..3eb6f6e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,7 +49,8 @@ jobs: - name: Install dependencies run: pnpm install --frozen-lockfile - + - name: typecheck + run: pnpm run typecheck - name: test run: pnpm run test - name: ✅ Upload coverage to Codecov diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 849e903..c0de3e1 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -1,9 +1,6 @@ name: cicd on: - pull_request: - branches: - - main push: branches: - main