Skip to content

Commit

Permalink
test: implement general setup and teardown hooks for bullmq queues
Browse files Browse the repository at this point in the history
  • Loading branch information
PJColombo committed Dec 20, 2023
1 parent 51dc4fc commit 02cd912
Show file tree
Hide file tree
Showing 21 changed files with 161 additions and 97 deletions.
1 change: 0 additions & 1 deletion clis/blob-propagation-jobs-cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
"main": "./src/index.ts",
"types": "./src/index.ts",
"scripts": {
"build": "tsc",
"clean": "rm -rf .turbo node_modules",
"lint": "eslint .",
"lint:fix": "pnpm lint --fix",
Expand Down
13 changes: 13 additions & 0 deletions clis/blob-propagation-jobs-cli/setup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { afterAll, beforeEach } from "vitest";

import { queueManager } from "./src/queue-manager";

beforeEach(async () => {
await queueManager.obliterateQueues({ force: true });
});
afterAll(async () => {
await queueManager
.obliterateQueues({ force: true })
// eslint-disable-next-line @typescript-eslint/no-misused-promises
.finally(() => queueManager.close());
});
6 changes: 4 additions & 2 deletions clis/blob-propagation-jobs-cli/src/QueueManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ export class QueueManager {
]);
}

obliterateQueues() {
obliterateQueues({ force = false } = {}) {
return Promise.all([
...Object.values(this.#storageQueues).map((queue) => queue.obliterate()),
...Object.values(this.#storageQueues).map((queue) =>
queue.obliterate({ force })
),
this.#finalizerQueue.obliterate(),
]);
}
Expand Down
15 changes: 1 addition & 14 deletions clis/blob-propagation-jobs-cli/test/commands/remove.test.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,5 @@
/* eslint-disable @typescript-eslint/no-misused-promises */
import type { SpyInstance } from "vitest";
import {
afterAll,
afterEach,
beforeEach,
describe,
expect,
it,
vi,
} from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

import { remove, removeCommandUsage } from "../../src/commands";
import { queueManager } from "../../src/queue-manager";
Expand Down Expand Up @@ -159,8 +150,4 @@ describe("Remove command", () => {
'"Invalid queue name: invalid-queue-name"'
);
});

afterAll(async () => {
await queueManager.obliterateQueues().finally(() => queueManager.close());
});
});
4 changes: 1 addition & 3 deletions clis/blob-propagation-jobs-cli/test/commands/retry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ describe("Retry command", () => {
});

afterAll(async () => {
let teardownPromise = queueManager
.obliterateQueues()
.finally(() => queueManager.close());
let teardownPromise = Promise.resolve();

storageWorkers.forEach((worker) => {
teardownPromise = teardownPromise.finally(() => worker.close());
Expand Down
2 changes: 1 addition & 1 deletion clis/blob-propagation-jobs-cli/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"extends": "../../tsconfig.json",
"include": ["src/**/*.ts", "test/**/*.ts", "vitest.config.ts"]
"include": ["src/**/*.ts", "test/**/*.ts", "vitest.config.ts", "setup.ts"]
}
18 changes: 11 additions & 7 deletions clis/blob-propagation-jobs-cli/vitest.config.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { defineProject } from "vitest/config";
import { defineConfig, mergeConfig } from "vitest/config";

export default defineProject({
test: {
include: ["test/**/*.test.ts"],
threads: false,
},
});
import { sharedProjectConfig } from "../../vitest.shared";

export default mergeConfig(
sharedProjectConfig,
defineConfig({
test: {
setupFiles: ["./setup.ts"],
},
})
);
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
"type-check": "turbo type-check",
"stats": "ts-node scripts/stats-aggregator/index.ts",
"job:daily": "ts-node scripts/jobs/aggregate-yesterdays-daily-stats.ts",
"job:overall": "ts-node scripts/jobs/aggregate-overall-stats.ts"
"job:overall": "ts-node scripts/jobs/aggregate-overall-stats.ts",
"validate": "turbo lint type-check && manypkg check && pnpm test"
},
"dependencies": {
"@blobscan/dayjs": "^0.0.1",
Expand Down
10 changes: 5 additions & 5 deletions packages/api/test/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
} from "vitest";

import type { Blob as PropagatorBlob } from "@blobscan/blob-propagator";
import { blobFileManager } from "@blobscan/blob-propagator/src/blob-file-manager";
import type { BlobReference } from "@blobscan/blob-storage-manager";
import { omitDBTimestampFields } from "@blobscan/test";

Expand Down Expand Up @@ -420,11 +419,12 @@ describe("Indexer router", async () => {
});

afterAll(async () => {
await blobFileManager.removeFolder();
const blobPropagator = ctxWithBlobPropagator.blobPropagator;

await ctxWithBlobPropagator.blobPropagator?.close({
emptyJobs: true,
});
if (blobPropagator) {
await blobPropagator.empty({ force: true });
await blobPropagator.close();
}
});

it("should call blob propagator", async () => {
Expand Down
36 changes: 36 additions & 0 deletions packages/blob-propagator/setup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Queue } from "bullmq";
import type { RedisOptions } from "ioredis";
import { afterAll } from "vitest";

import { blobFileManager } from "./src/blob-file-manager";
import { env } from "./src/env";
import { FINALIZER_WORKER_NAME, STORAGE_WORKER_NAMES } from "./src/utils";

const connection: RedisOptions = {
host: env.REDIS_QUEUE_HOST,
port: env.REDIS_QUEUE_PORT,
password: env.REDIS_QUEUE_PASSWORD,
username: env.REDIS_QUEUE_USERNAME,
};

afterAll(async () => {
const queues = [
STORAGE_WORKER_NAMES["GOOGLE"],
STORAGE_WORKER_NAMES["POSTGRES"],
FINALIZER_WORKER_NAME,
].map((queueName) => new Queue(queueName, { connection }));

let teardownPromise = Promise.all([
...queues.map((q) => q.obliterate({ force: true })),
blobFileManager.removeFolder(),
]);

queues.forEach((q) => {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
teardownPromise = teardownPromise.finally(async () => {
await q.close();
});
});

await teardownPromise;
});
71 changes: 41 additions & 30 deletions packages/blob-propagator/src/BlobPropagator.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { FlowProducer, Worker } from "bullmq";
/* eslint-disable @typescript-eslint/no-misused-promises */
import { FlowProducer, Queue, Worker } from "bullmq";
import type {
ConnectionOptions,
FlowChildJob,
Expand All @@ -19,7 +20,6 @@ import {
FINALIZER_WORKER_NAME,
STORAGE_WORKER_NAMES,
buildJobId,
emptyWorkerJobQueue,
} from "./utils";
import {
finalizerProcessor,
Expand Down Expand Up @@ -91,42 +91,49 @@ export class BlobPropagator {
);
}

close(opts?: { emptyJobs: boolean }) {
async empty({ force }: { force: boolean } = { force: false }) {
const workers = this.#getWorkers();
const queues = await Promise.all(
workers.map(
async (w) => new Queue(w.name, { connection: await w.client })
)
);

let emptyPromise = Promise.all([
...queues.map((q) => q.obliterate({ force })),
blobFileManager.removeFolder(),
]);

queues.forEach((q) => {
emptyPromise = emptyPromise.finally(async () => {
await q.close();
});
});

await emptyPromise;
}

close() {
let teardownPromise: Promise<void> = Promise.resolve();
const emptyJobs = opts?.emptyJobs;

Object.values(this.storageWorkers).forEach((w) => {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
teardownPromise = teardownPromise.finally(async () => {
if (emptyJobs) {
await emptyWorkerJobQueue(w);
}

await w.close();
});
});

return (
teardownPromise
// eslint-disable-next-line @typescript-eslint/no-misused-promises
.finally(async () => {
if (emptyJobs) {
await emptyWorkerJobQueue(this.finalizerWorker);
}

await this.finalizerWorker.close();
})
// eslint-disable-next-line @typescript-eslint/no-misused-promises
.finally(async () => {
const redisClient = await this.blobPropagationFlowProducer.client;

await redisClient.quit();
})
// eslint-disable-next-line @typescript-eslint/no-misused-promises
.finally(async () => {
await this.blobPropagationFlowProducer.close();
})
);
return teardownPromise
.finally(async () => {
await this.finalizerWorker.close();
})
.finally(async () => {
const redisClient = await this.blobPropagationFlowProducer.client;

await redisClient.quit();
})
.finally(async () => {
await this.blobPropagationFlowProducer.close();
});
}

async propagateBlob(blob: Blob) {
Expand All @@ -149,6 +156,10 @@ export class BlobPropagator {
await this.blobPropagationFlowProducer.addBulk(blobPropagationFlowJobs);
}

#getWorkers() {
return [...Object.values(this.storageWorkers), this.finalizerWorker];
}

#createBlobPropagationFlowProducer(connection?: ConnectionOptions) {
/*
* Instantiating a new `FlowProducer` appears to create two separate `RedisConnection` instances.
Expand Down
2 changes: 1 addition & 1 deletion packages/blob-propagator/src/blob-file-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ const basePath = !env.TEST ? os.tmpdir() : undefined;

export const blobFileManager = new BlobFileManager({
basePath,
folderName: "blob-files-test",
folderName: ".blob-files-test",
});
6 changes: 2 additions & 4 deletions packages/blob-propagator/src/blob-propagator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ function createBlobPropagator() {
},
});
}

const blobPropagator = env.BLOB_PROPAGATOR_ENABLED
? createBlobPropagator()
: undefined;
const blobPropagator =
env.BLOB_PROPAGATOR_ENABLED === true ? createBlobPropagator() : undefined;

export { blobPropagator, createBlobPropagator };
16 changes: 4 additions & 12 deletions packages/blob-propagator/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Worker } from "bullmq";
import { Queue } from "bullmq";
import type { Queue } from "bullmq";

import { getBlobStorageManager } from "@blobscan/blob-storage-manager";
import type { BlobStorageManager } from "@blobscan/blob-storage-manager";
import { prisma, $Enums } from "@blobscan/db";

import { blobFileManager } from "./blob-file-manager";
Expand All @@ -28,19 +28,11 @@ export function getStorageFromjobId(jobId: string) {
)?.[0] as $Enums.BlobStorage;
}

export async function emptyWorkerJobQueue(worker: Worker) {
const q = new Queue(worker.name, {
connection: await worker.client,
});

return q.drain();
}

export async function propagateBlob(
versionedHash: string,
targetStorage: $Enums.BlobStorage
targetStorage: $Enums.BlobStorage,
{ blobStorageManager }: { blobStorageManager: BlobStorageManager }
) {
const blobStorageManager = await getBlobStorageManager();
const blobData = await blobFileManager.readFile(versionedHash);

const result = await blobStorageManager.storeBlob(
Expand Down
10 changes: 8 additions & 2 deletions packages/blob-propagator/src/worker-processors/gcs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { getBlobStorageManager } from "@blobscan/blob-storage-manager";

import type { BlobPropagationWorkerProcessor } from "../types";
import { propagateBlob } from "../utils";

export const gcsProcessor: BlobPropagationWorkerProcessor = (job) => {
return propagateBlob(job.data.versionedHash, "GOOGLE");
export const gcsProcessor: BlobPropagationWorkerProcessor = async (job) => {
const blobStorageManager = await getBlobStorageManager();

return propagateBlob(job.data.versionedHash, "GOOGLE", {
blobStorageManager,
});
};
12 changes: 10 additions & 2 deletions packages/blob-propagator/src/worker-processors/postgres.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import { getBlobStorageManager } from "@blobscan/blob-storage-manager";

import type { BlobPropagationWorkerProcessor } from "../types";
import { propagateBlob } from "../utils";

export const postgresProcessor: BlobPropagationWorkerProcessor = (job) => {
return propagateBlob(job.data.versionedHash, "POSTGRES");
export const postgresProcessor: BlobPropagationWorkerProcessor = async (
job
) => {
const blobStorageManager = await getBlobStorageManager();

return propagateBlob(job.data.versionedHash, "POSTGRES", {
blobStorageManager,
});
};
10 changes: 8 additions & 2 deletions packages/blob-propagator/src/worker-processors/swarm.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { getBlobStorageManager } from "@blobscan/blob-storage-manager";

import type { BlobPropagationWorkerProcessor } from "../types";
import { propagateBlob } from "../utils";

export const swarmProcessor: BlobPropagationWorkerProcessor = function (job) {
return propagateBlob(job.data.versionedHash, "SWARM");
export const swarmProcessor: BlobPropagationWorkerProcessor = async function (
job
) {
const blobStorageManager = await getBlobStorageManager();

return propagateBlob(job.data.versionedHash, "SWARM", { blobStorageManager });
};
6 changes: 1 addition & 5 deletions packages/blob-propagator/test/swarm-worker.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { afterAll, describe, vi } from "vitest";
import { describe, vi } from "vitest";

import type { BlobStorageManager } from "@blobscan/blob-storage-manager";

Expand All @@ -10,10 +10,6 @@ const fixtures = {
blobVersionedHash: "swarmWorkerVersionedHash",
};

afterAll(() => {
vi.resetModules();
});

describe(
"Swarm Worker",
runStorageWorkerTestSuite("SWARM", {
Expand Down
2 changes: 1 addition & 1 deletion packages/blob-propagator/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"extends": "../../tsconfig.json",
"include": ["src/**/*.ts", "test/**/*.ts", "vitest.config.ts"]
"include": ["src/**/*.ts", "test/**/*.ts", "vitest.config.ts", "setup.ts"]
}
Loading

0 comments on commit 02cd912

Please sign in to comment.