Skip to content

Commit

Permalink
feat: add blob propagator jobs CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
PJColombo committed Dec 19, 2023
1 parent e5f75a1 commit f9975b5
Show file tree
Hide file tree
Showing 21 changed files with 570 additions and 27 deletions.
33 changes: 33 additions & 0 deletions clis/blob-propagation-jobs-cli/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"name": "@blobscan/blob-propagation-jobs-cli",
"version": "0.0.1",
"private": true,
"description": "A CLI that facilitates the managing of blob propagation jobs",
"main": "./src/index.ts",
"types": "./src/index.ts",
"scripts": {
"build": "tsc",
"clean": "rm -rf .turbo node_modules",
"lint": "eslint .",
"lint:fix": "pnpm lint --fix",
"type-check": "tsc --noEmit",
"start": "ts-node src/index.ts",
"test": "pnpm with-env:dev vitest",
"test:ui": "pnpm with-env:dev vitest --ui",
"with-env:dev": "dotenv -e ../../.env.development --"
},
"dependencies": {
"@blobscan/blob-propagator": "^0.0.1",
"@blobscan/db": "^0.0.1",
"bullmq": "^4.13.2"
},
"devDependencies": {
"@blobscan/eslint-config": "^0.0.1"
},
"eslintConfig": {
"root": true,
"extends": [
"@blobscan/eslint-config/base"
]
}
}
84 changes: 84 additions & 0 deletions clis/blob-propagation-jobs-cli/src/QueueManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import type { ConnectionOptions, JobType } from "bullmq";
import { Queue } from "bullmq";

import type { BlobPropagationJobData } from "@blobscan/blob-propagator";
import {
FINALIZER_WORKER_NAME,
STORAGE_WORKER_NAMES,
} from "@blobscan/blob-propagator";
import { $Enums } from "@blobscan/db";

export class QueueManager {
#storageQueues: Record<$Enums.BlobStorage, Queue<BlobPropagationJobData>>;
#finalizerQueue: Queue;

constructor(connection: ConnectionOptions) {
this.#storageQueues = Object.values($Enums.BlobStorage).reduce(
(queues, storage) => {
queues[storage] = new Queue(STORAGE_WORKER_NAMES[storage], {
connection,
});

return queues;
},
{} as Record<$Enums.BlobStorage, Queue>
);

this.#finalizerQueue = new Queue(FINALIZER_WORKER_NAME, {
connection,
});
}

getQueue(
queueName: "FINALIZER" | $Enums.BlobStorage
): typeof queueName extends "FINALIZER"
? Queue
: Queue<BlobPropagationJobData> {
if (queueName === "FINALIZER") {
return this.#finalizerQueue;
}

return this.#storageQueues[queueName];
}

getStorageQueues() {
return this.#storageQueues;
}

getJobs(types: JobType[]) {
return Promise.all([
...Object.values(this.#storageQueues).map((queue) =>
queue.getJobs(types)
),
this.#finalizerQueue.getJobs(types),
]).then((jobs) => jobs.flat());
}

drainQueues() {
return Promise.all([
...Object.values(this.#storageQueues).map((queue) => queue.drain()),
this.#finalizerQueue.drain(),
]);
}

obliterateQueues() {
return Promise.all([
...Object.values(this.#storageQueues).map((queue) => queue.obliterate()),
this.#finalizerQueue.obliterate(),
]);
}

close() {
let teardownPromise = Promise.resolve();

Object.values(this.#storageQueues).forEach((queue) => {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
teardownPromise = teardownPromise.finally(() => queue.close());
});

// eslint-disable-next-line @typescript-eslint/no-misused-promises
teardownPromise.finally(() => this.#finalizerQueue.close());

return teardownPromise;
}
}
1 change: 1 addition & 0 deletions clis/blob-propagation-jobs-cli/src/commands/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./retry";
101 changes: 101 additions & 0 deletions clis/blob-propagation-jobs-cli/src/commands/retry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import commandLineArgs from "command-line-args";
import commandLineUsage from "command-line-usage";

import { buildJobId, STORAGE_WORKER_NAMES } from "@blobscan/blob-propagator";
import { $Enums } from "@blobscan/db";

import { blobHashOptionDef, helpOptionDef, queueOptionDef } from "../common";
import { queueManager } from "../queue-manager";

const retryCommandOptDefs: commandLineArgs.OptionDefinition[] = [
helpOptionDef,
queueOptionDef,
blobHashOptionDef,
];

const retryCommandUsage = commandLineUsage([
{
header: "Retry Command",
content: "Retries failed jobs.",
},
{
header: "Options",
optionList: retryCommandOptDefs,
},
]);

function normalizeQueueName(
input: string
): Parameters<typeof queueManager.getQueue>[0] {
const input_ = input.toUpperCase();

if (input_ === "FINALIZER") {
return "FINALIZER";
}
const selectedBlobStorage = Object.keys(STORAGE_WORKER_NAMES).find(
(blobStorageName) => blobStorageName === input_
);

if (!selectedBlobStorage) {
throw new Error(`Invalid queue name: ${input}`);
}

return selectedBlobStorage as $Enums.BlobStorage;
}

export async function retry(argv?: string[]) {
const {
help,
queue: rawQueueNames,
blobHash: blobHashes,
} = commandLineArgs(retryCommandOptDefs, {
argv,
}) as {
help: boolean;
queue?: string[];
blobHash?: string[];
};

if (help) {
console.log(retryCommandUsage);

return;
}

const queueNames = rawQueueNames
? rawQueueNames.map(normalizeQueueName)
: // If no storage names are provided, retry failed jobs from all storage queues
Object.values($Enums.BlobStorage);

// If blob hashes are provided, retry only the jobs with those hashes
if (blobHashes?.length) {
const selectedQueueJobs = await Promise.all(
queueNames.map(async (name) => {
const queue = queueManager.getQueue(name);

return Promise.all(
blobHashes.map(async (blobHash) => {
const jobId = buildJobId(queue, blobHash);
const job = await queue.getJob(jobId);

if (!job) {
throw new Error(`Couldn't find job with id ${jobId}`);
}

return job;
})
);
})
);

return Promise.all(
selectedQueueJobs.map((jobs) => jobs.map((j) => j.retry()))
);
}

return Promise.all(
queueNames
.map((name) => queueManager.getQueue(name))
.map((queue) => queue.retryJobs())
);
}
34 changes: 34 additions & 0 deletions clis/blob-propagation-jobs-cli/src/common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import type commandLineUsage from "command-line-usage";

export const helpOptionDef: commandLineUsage.OptionDefinition = {
name: "help",
alias: "h",
description: "Print this usage guide.",
type: Boolean,
};

export const queueOptionDef: commandLineUsage.OptionDefinition = {
name: "queue",
alias: "q",
typeLabel: "{underline queue}",
description:
"Queue to retry failed jobs from. Valid valures are {italic finalizer}, {italic google}, {italic postgres} or {italic swarm}.",
type: String,
multiple: true,
};

export const blobHashOptionDef: commandLineUsage.OptionDefinition = {
name: "blobHash",
alias: "b",
typeLabel: "{underline blob-hash}",
description: "Blob hash of the failed jobs to retry.",
type: String,
multiple: true,
};

export const connection = {
host: process.env.REDIS_QUEUE_HOST,
port: Number(process.env.REDIS_QUEUE_PORT),
password: process.env.REDIS_QUEUE_PASSWORD,
username: process.env.REDIS_QUEUE_USERNAME,
};
72 changes: 72 additions & 0 deletions clis/blob-propagation-jobs-cli/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import commandLineArgs from "command-line-args";
import commandLineUsage from "command-line-usage";

import { retry } from "./commands";
import { helpOptionDef } from "./common";
import { queueManager } from "./queue-manager";

const mainDefs: commandLineUsage.OptionDefinition[] = [
{ name: "command", defaultOption: true },
helpOptionDef,
];

const mainUsage = commandLineUsage([
{
header: "Blobscan's Blob Propagation Job Manager",
content: "A CLI that facilitates the managing of blob propagation jobs",
},
{
header: "Command List",
content: [{ name: "{bold retry}", summary: "Retries failed jobs" }],
},
{
header: "Options",
optionList: [helpOptionDef],
},
]);

async function main() {
const mainOptions = commandLineArgs(mainDefs, {
stopAtFirstUnknown: true,
}) as commandLineArgs.CommandLineOptions & {
command?: string;
help?: boolean;
};
const { command, help } = mainOptions;

if (!command) {
if (help) {
console.log(mainUsage);

return;
}

throw new Error("No command specified");
}

const argv = mainOptions._unknown || [];

if (help) {
/**
* Re-add the help option so that the help message for the
* subcommand gets printed
*/
argv.unshift("-h");
}

switch (command) {
case "retry":
return retry(argv);
default:
throw new Error(`Invalid command: ${command}`);
}
}

main()
.then(() => process.exit(0))
.catch((err) => {
console.error(`Failed to run CLI: ${err}`);
return process.exit(1);
})
// eslint-disable-next-line @typescript-eslint/no-misused-promises
.finally(() => queueManager.close());
4 changes: 4 additions & 0 deletions clis/blob-propagation-jobs-cli/src/queue-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { QueueManager } from "./QueueManager";
import { connection } from "./common";

export const queueManager = new QueueManager(connection);
Loading

0 comments on commit f9975b5

Please sign in to comment.