Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/next' into badges
Browse files Browse the repository at this point in the history
  • Loading branch information
0xGabi committed Feb 18, 2024
2 parents b2adb07 + aaa1427 commit 06ed27b
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 117 deletions.
15 changes: 7 additions & 8 deletions clis/blob-propagation-jobs-cli/src/Context.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { ConnectionOptions, JobType } from "bullmq";
import type { JobType } from "bullmq";
import { FlowProducer } from "bullmq";
import { Queue } from "bullmq";
import IORedis from "ioredis";

import type { BlobPropagationJobData } from "@blobscan/blob-propagator";
import {
Expand All @@ -18,23 +19,21 @@ export class Context {

constructor(storages: $Enums.BlobStorage[], redisUri: string) {
const uniqueStorageNames = [...new Set(storages)];
const connection = new IORedis(redisUri, { maxRetriesPerRequest: null });

this.#storageQueues = uniqueStorageNames.map(
(storageName) =>
// TODO: Type of argument is now string
new Queue(STORAGE_WORKER_NAMES[storageName], {
redisUri,
connection,
})
);

// TODO: Type of argument is now string
this.#finalizerQueue = new Queue(FINALIZER_WORKER_NAME, {
redisUri,
connection,
});

// TODO: Type of argument is now string
this.#propagatorFlowProducer = new FlowProducer({
redisUri,
connection,
});
}

Expand Down Expand Up @@ -109,7 +108,7 @@ export class Context {
}

async clearQueues() {
await Promise.all((await this.getJobs()).map(j => j.remove()))
await Promise.all((await this.getJobs()).map((j) => j.remove()));
}

close() {
Expand Down
13 changes: 7 additions & 6 deletions clis/blob-propagation-jobs-cli/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import { STORAGE_WORKER_NAMES, buildJobId } from "@blobscan/blob-propagator";
import dayjs from "@blobscan/dayjs";
import type { $Enums } from "@blobscan/db";

import { env } from "./env";

export type Command<R = unknown> = (argv?: string[]) => Promise<R>;

export const helpOptionDef: commandLineUsage.OptionDefinition = {
Expand Down Expand Up @@ -45,22 +43,25 @@ export const blobHashOptionDef: commandLineUsage.OptionDefinition = {
multiple: true,
};

export const datePeriodOptionDefs: Record<"to" | "from", commandLineUsage.OptionDefinition> = {
export const datePeriodOptionDefs: Record<
"to" | "from",
commandLineUsage.OptionDefinition
> = {
from: {
name: "from",
alias: "f",
typeLabel: "{underline from}",
description: "Date from which execute jobs.",
type: String,
},
to: {
to: {
name: "to",
alias: "t",
typeLabel: "{underline to}",
description: "Date to which execute jobs.",
type: String,
}
}
},
};

export function normalizeQueueName(input: string) {
if (input.toUpperCase() === "FINALIZER") {
Expand Down
8 changes: 7 additions & 1 deletion clis/blob-propagation-jobs-cli/test/commands/retry.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
/* eslint-disable @typescript-eslint/no-misused-promises */
import { Worker } from "bullmq";
import IORedis from "ioredis";
import { afterAll, beforeEach, describe, expect, it } from "vitest";

import type { BlobPropagationWorker } from "@blobscan/blob-propagator";

import { retry, retryCommandUsage } from "../../src/commands";
import { context } from "../../src/context-instance";
import { env } from "../../src/env";
import {
processJobsManually,
argHelpTest,
Expand All @@ -26,7 +28,11 @@ describe("Retry command", () => {

storageWorkers = queues.map(
(queue) =>
new Worker(queue.name, undefined, { redisUri: env.REDIS_URI })
new Worker(queue.name, undefined, {
connection: new IORedis(env.REDIS_URI, {
maxRetriesPerRequest: null,
}),
})
);

await setUpJobs(queues, jobVersionedHashes);
Expand Down
18 changes: 11 additions & 7 deletions clis/stats-aggregation-cli/src/commands/overall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { prisma } from "@blobscan/db";
import { env } from "../env";
import { deleteOptionDef, helpOptionDef } from "../utils";

const DEFAULT_UNPROCESSED_BLOCKS_BATCH_SIZE = 100_000;
const PRISMA_BLOCKS_BATCH_SIZE = 2_000_000;

type BeaconFinalizedBlockResponse = {
data: {
Expand Down Expand Up @@ -44,8 +44,8 @@ const overallCommandOptDefs: commandLineUsage.OptionDefinition[] = [
name: "batchSize",
alias: "s",
typeLabel: "{underline size}",
description: `Number of blocks to process in a single batch. It defaults to ${DEFAULT_UNPROCESSED_BLOCKS_BATCH_SIZE}`,
defaultValue: DEFAULT_UNPROCESSED_BLOCKS_BATCH_SIZE,
description: `Number of blocks to process in a single batch. It defaults to ${PRISMA_BLOCKS_BATCH_SIZE}`,
defaultValue: PRISMA_BLOCKS_BATCH_SIZE,
type: Number,
},
];
Expand Down Expand Up @@ -110,7 +110,7 @@ async function deleteOverallStats() {

async function incrementOverallStats({
targetBlockId,
batchSize = DEFAULT_UNPROCESSED_BLOCKS_BATCH_SIZE,
batchSize = PRISMA_BLOCKS_BATCH_SIZE,
}: {
targetBlockId: BlockId;
batchSize?: number;
Expand Down Expand Up @@ -157,7 +157,7 @@ async function incrementOverallStats({

if (fromBlockNumber > toBlockNumber) {
console.log(
`Skipping stats aggregation as there are no new finalized blocks (Latest processed finalized block: ${latestProcessedFinalizedBlock})`
`Skipping stats aggregation as there are no new finalized blocks (Last processed finalized block: ${latestProcessedFinalizedBlock.toLocaleString()})`
);

return;
Expand Down Expand Up @@ -193,13 +193,17 @@ async function incrementOverallStats({
console.log(
`Batch ${
i + 1
}/${batches} processed. Data aggregated from block ${batchFrom} to ${batchTo}`
}/${batches} processed. Data aggregated from block ${batchFrom.toLocaleString()} to ${batchTo.toLocaleString()} (${(
batchTo - batchFrom
).toLocaleString()} blocks processed).`
);
}
}

console.log(
`Overall stats increment operation executed: Data aggregated from block ${fromBlockNumber} to ${toBlockNumber}.`
`Overall stats increment operation executed: Data aggregated from block ${fromBlockNumber.toLocaleString()} to ${toBlockNumber.toLocaleString()} (${(
toBlockNumber - fromBlockNumber
).toLocaleString()} blocks processed).`
);
}

Expand Down
9 changes: 7 additions & 2 deletions packages/blob-propagator/setup.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Queue } from "bullmq";
import type { RedisOptions } from "ioredis";
import IORedis from "ioredis";
import { afterAll } from "vitest";

import { blobFileManager } from "./src/blob-file-manager";
Expand All @@ -11,7 +11,12 @@ afterAll(async () => {
STORAGE_WORKER_NAMES["GOOGLE"],
STORAGE_WORKER_NAMES["POSTGRES"],
FINALIZER_WORKER_NAME,
].map((queueName) => new Queue(queueName, { env.REDIS_URI }));
].map(
(queueName) =>
new Queue(queueName, {
connection: new IORedis(env.REDIS_URI, { maxRetriesPerRequest: null }),
})
);

let teardownPromise = Promise.all([
...queues.map((q) => q.obliterate({ force: true })),
Expand Down
12 changes: 3 additions & 9 deletions packages/blob-propagator/src/BlobPropagator.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
/* eslint-disable @typescript-eslint/no-misused-promises */
import { FlowProducer, Queue, Worker } from "bullmq";
import type {
ConnectionOptions,
FlowJob,
WorkerOptions,
RedisOptions,
} from "bullmq";
import IORedis from "ioredis";
import type { ConnectionOptions, FlowJob, WorkerOptions } from "bullmq";

import type { $Enums } from "@blobscan/db";
import type { BlobStorage } from "@blobscan/db";
Expand Down Expand Up @@ -139,7 +133,7 @@ export class BlobPropagator {
return [...Object.values(this.storageWorkers), this.finalizerWorker];
}

#createBlobPropagationFlowProducer(redisUri?: string) {
#createBlobPropagationFlowProducer(connection?: ConnectionOptions) {
/*
* Instantiating a new `FlowProducer` appears to create two separate `RedisConnection` instances.
* This leads to an issue where one instance remains active, or "dangling", after the `FlowProducer` has been closed.
Expand All @@ -149,7 +143,7 @@ export class BlobPropagator {
* See: https://github.com/taskforcesh/bullmq/blob/d7cf6ea60830b69b636648238a51e5f981616d02/src/classes/flow-producer.ts#L111
*/
const blobPropagationFlowProducer = new FlowProducer({
connection: new IORedis(redisUri),
connection,
});

blobPropagationFlowProducer.on("error", (err) => {
Expand Down
6 changes: 4 additions & 2 deletions packages/blob-propagator/src/blob-propagator.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { RedisOptions } from "bullmq";
import IORedis from "ioredis";

import { BlobStorage } from "@blobscan/db";

Expand All @@ -20,9 +20,11 @@ function createBlobPropagator() {
availableStorages.push(BlobStorage.SWARM);
}

const connection = new IORedis(env.REDIS_URI, { maxRetriesPerRequest: null });

return new BlobPropagator(availableStorages, {
workerOptions: {
env.REDIS_URI,
connection,
},
});
}
Expand Down
4 changes: 2 additions & 2 deletions packages/blob-propagator/src/env.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {
booleanSchema,
createEnv,
maskSensitiveData,
maskPassword,
presetEnvOptions,
z,
} from "@blobscan/zod";
Expand All @@ -25,7 +25,7 @@ export const env = createEnv({
console.log(
`Blob propagator configuration: enabled=${
env.BLOB_PROPAGATOR_ENABLED
} redisUri=${env.REDIS_URI}` // TODO: mask password
} redisUri=${maskPassword(env.REDIS_URI)}`
);
},
});
Expand Down
Loading

0 comments on commit 06ed27b

Please sign in to comment.