diff --git a/.changeset/quiet-houses-fry.md b/.changeset/quiet-houses-fry.md new file mode 100644 index 0000000..31396b2 --- /dev/null +++ b/.changeset/quiet-houses-fry.md @@ -0,0 +1,5 @@ +--- +'pg-task': patch +--- + +Deal with unexpected connection errors diff --git a/__utils__/db.ts b/__utils__/db.ts index 8d16960..e8f28df 100644 --- a/__utils__/db.ts +++ b/__utils__/db.ts @@ -1,6 +1,6 @@ -import { Pool } from 'pg'; +import { QueryClient } from '../src'; -export async function cleanupSchema(pool: Pool, schema: string) { +export async function cleanupSchema(pool: QueryClient, schema: string) { await pool.query(`DROP SCHEMA ${schema} CASCADE`); } diff --git a/src/maintaince.ts b/src/maintaince.ts index c5f1eec..8116bb2 100644 --- a/src/maintaince.ts +++ b/src/maintaince.ts @@ -35,7 +35,7 @@ export const createMaintainceWorker = async (pool: Pool, schema: string, options poolInternvalInMs: 60000, onResolve(_, err, _result) { if (err) { - console.error('failed maintaince task', err); + console.log('failed maintaince task', err); } }, }, diff --git a/src/manager.spec.ts b/src/manager.spec.ts index ccd8abb..f33e340 100644 --- a/src/manager.spec.ts +++ b/src/manager.spec.ts @@ -1,16 +1,18 @@ import { PostgreSqlContainer, StartedPostgreSqlContainer } from '@testcontainers/postgresql'; import { Pool } from 'pg'; import { cleanupSchema, createRandomSchema } from '../__utils__/db'; -import { createManager } from './manager'; +import { createManager, WorkerManager } from './manager'; import EventEmitter, { once } from 'node:events'; import { executeQuery } from './utils/sql'; import { createPlans } from './plans'; import { setTimeout } from 'timers/promises'; +import { DeferredPromise } from './utils/common'; describe('pg worker', () => { jest.setTimeout(30000); let pool: Pool; let container: StartedPostgreSqlContainer; + let manager: WorkerManager; beforeAll(async () => { container = await new PostgreSqlContainer().start(); @@ -26,16 +28,18 @@ describe('pg worker', () => { pool = new Pool({ connectionString: container.getConnectionUri(), }); + schema = createRandomSchema(); }); afterEach(async () => { + await manager?.stop(); await cleanupSchema(pool, schema); await pool?.end(); }); it('smoke test', async () => { - const manager = createManager({ + manager = createManager({ pgClient: pool, schema, }); @@ -48,7 +52,7 @@ describe('pg worker', () => { }); it('smoke test with task', async () => { - const manager = createManager({ + manager = createManager({ pgClient: pool, schema, }); @@ -96,4 +100,58 @@ describe('pg worker', () => { // we should not have any pending tasks left await expect(executeQuery(pool, plans.popTasks(queue, 100))).resolves.toHaveLength(0); }); + + it('smoke test with connection drop', async () => { + manager = createManager({ + pgClient: pool, + schema, + }); + + const queue = 'test-queue'; + + const deferredPromise = new DeferredPromise(); + + await manager.register({ + queue: queue, + options: { + poolInternvalInMs: 20, + maxConcurrency: 10, + refillThresholdPct: 0.33, + }, + handler(data) { + deferredPromise.resolve(data); + }, + }); + + await setTimeout(200); + + pool + .query( + 'SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND usename = current_user' + ) + .catch(() => {}); + + await setTimeout(200); + + const plans = createPlans(schema); + + await executeQuery( + pool, + plans.enqueueTasks([ + { + data: { nosmoke: true }, + expireInSeconds: 1, + maxAttempts: 1, + metaData: {}, + queue: queue, + retryBackoff: false, + retryDelayInSeconds: 10, + singletonKey: null, + startAfterSeconds: 0, + }, + ]) + ); + + await expect(deferredPromise.promise).resolves.toEqual({ nosmoke: true }); + }); }); diff --git a/src/manager.ts b/src/manager.ts index f857b69..8b8579c 100644 --- a/src/manager.ts +++ b/src/manager.ts @@ -37,6 +37,12 @@ export const createManager = (properties: WorkerManagerProperties): WorkerManage let startPromise: Promise | null = null; + const onError = (err: any) => { + console.log('pgClient error', err?.message ?? err); + }; + + pgClient.on('error', onError); + async function init() { state = 'starting'; @@ -118,6 +124,7 @@ export const createManager = (properties: WorkerManagerProperties): WorkerManage state = 'idle'; startPromise = null; + pgClient.off('error', onError); }, }; }; diff --git a/src/task-worker.ts b/src/task-worker.ts index 77f8081..6f69340 100644 --- a/src/task-worker.ts +++ b/src/task-worker.ts @@ -85,7 +85,10 @@ export const createTaskWorker = (implementation: WorkerImpl, config: TaskWorkerC } const requestedAmount = maxConcurrency - activeTasks.size; - const tasks = await implementation.popTasks(requestedAmount); + const tasks = await implementation.popTasks(requestedAmount).catch((err) => { + console.log('error popping tasks', 'message' in err ? err.message : err); + return [] as SelectedTask[]; + }); // high chance that there are more tasks when requested amount is same as fetched probablyHasMoreTasks = tasks.length === requestedAmount; diff --git a/src/utils/sql.ts b/src/utils/sql.ts index 3aad1cd..3f8472b 100644 --- a/src/utils/sql.ts +++ b/src/utils/sql.ts @@ -11,6 +11,12 @@ export type TypedQuery = { [rowTypeSymbol]: TRow; }; +export interface Notification { + processId: number; + channel: string; + payload?: string | undefined; +} + export interface QueryClient { query(query: string, values?: any[]): Promise; query(props: { @@ -25,10 +31,16 @@ export interface QueryClient { export interface Pool extends QueryClient { connect(): Promise; + on(event: 'error', listener: (...args: any[]) => void): this; + off(event: 'error', listener: (...args: any[]) => void): this; } export interface ClientFromPool extends QueryClient { release(err?: boolean | Error | undefined): void; + on(event: 'notification', listener: (message: Notification) => void): unknown; + off(event: 'notification', listener: (message: Notification) => void): any; + on(event: 'error', listener: (...args: any[]) => void): unknown; + off(event: 'error', listener: (...args: any[]) => void): unknown; } export async function executeQuery( @@ -69,12 +81,12 @@ export async function runTransaction(pool: Pool, handler: (client: ClientFrom /** * Values supported by SQL engine. */ -export type Value = unknown; +type Value = unknown; /** * Supported value or SQL instance. */ -export type RawValue = Value | Sql; +type RawValue = Value | Sql; /** * A SQL instance can be nested within each other to build SQL strings. diff --git a/src/utils/worker.ts b/src/utils/worker.ts index 71476a4..b39a4c1 100644 --- a/src/utils/worker.ts +++ b/src/utils/worker.ts @@ -23,7 +23,10 @@ export function createBaseWorker(run: () => Promise, props: { lo while (state.polling) { const started = Date.now(); state.executing = true; - const shouldContinue = await run(); + const shouldContinue = await run().catch((err) => { + console.log('error in worker loop', err?.message ?? err); + return false; + }); state.executing = false; const duration = Date.now() - started; diff --git a/tsconfig.json b/tsconfig.json index c0771cb..95e7c54 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -31,5 +31,5 @@ "strict": true }, "exclude": ["**/node_modules**/"], - + "include": ["./src/*", "__utils__"] } \ No newline at end of file