Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/env-redis-uri' into badges
Browse files Browse the repository at this point in the history
  • Loading branch information
0xGabi committed Feb 18, 2024
2 parents ac91555 + 3a23f19 commit b2adb07
Show file tree
Hide file tree
Showing 12 changed files with 19 additions and 78 deletions.
5 changes: 1 addition & 4 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ GOOGLE_STORAGE_ENABLED=false
POSTGRES_STORAGE_ENABLED=true
SWARM_STORAGE_ENABLED=false

REDIS_QUEUE_HOST=localhost
REDIS_QUEUE_PORT=6379
REDIS_QUEUE_PASSWORD=s3cr3t
REDIS_QUEUE_USERNAME=blobscan
REDIS_URI=redis://blobscan:s3cr3t@localhost:6379/1

# PRISMA_BATCH_OPERATIONS_MAX_SIZE=

Expand Down
3 changes: 1 addition & 2 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,4 @@ GOOGLE_STORAGE_ENABLED=true
# Blob Propagator

BLOB_PROPAGATOR_ENABLED=false
REDIS_QUEUE_HOST=localhost
REDIS_QUEUE_PORT=6379
REDIS_URI=redis://localhost:6379/1
11 changes: 7 additions & 4 deletions clis/blob-propagation-jobs-cli/src/Context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@ export class Context {
#finalizerQueue: Queue<BlobPropagationJobData>;
#propagatorFlowProducer: FlowProducer;

constructor(storages: $Enums.BlobStorage[], connection: ConnectionOptions) {
constructor(storages: $Enums.BlobStorage[], redisUri: string) {
const uniqueStorageNames = [...new Set(storages)];

this.#storageQueues = uniqueStorageNames.map(
(storageName) =>
// TODO: Type of argument is now string
new Queue(STORAGE_WORKER_NAMES[storageName], {
connection,
redisUri,
})
);

// TODO: Type of argument is now string
this.#finalizerQueue = new Queue(FINALIZER_WORKER_NAME, {
connection,
redisUri,
});

// TODO: Type of argument is now string
this.#propagatorFlowProducer = new FlowProducer({
connection,
redisUri,
});
}

Expand Down
3 changes: 1 addition & 2 deletions clis/blob-propagation-jobs-cli/src/context-instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { BlobStorage } from "@blobscan/db";

import { Context } from "./Context";
import { env } from "./env";
import { redisConnection } from "./utils";

const availableStorages: $Enums.BlobStorage[] = [];

Expand All @@ -19,4 +18,4 @@ if (env.SWARM_STORAGE_ENABLED) {
availableStorages.push(BlobStorage.SWARM);
}

export const context = new Context(availableStorages, redisConnection);
export const context = new Context(availableStorages, env.REDIS_URI);
5 changes: 1 addition & 4 deletions clis/blob-propagation-jobs-cli/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ import {
export const env = createEnv({
envOptions: {
server: {
REDIS_QUEUE_HOST: z.string().default("localhost"),
REDIS_QUEUE_PORT: z.coerce.number().default(6379),
REDIS_QUEUE_PASSWORD: z.string().optional(),
REDIS_QUEUE_USERNAME: z.string().optional(),
REDIS_URI: z.string().default("redis://localhost:6379/1"),

POSTGRES_STORAGE_ENABLED: booleanSchema.default("false"),
GOOGLE_STORAGE_ENABLED: booleanSchema.default("false"),
Expand Down
7 changes: 0 additions & 7 deletions clis/blob-propagation-jobs-cli/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ import { env } from "./env";

export type Command<R = unknown> = (argv?: string[]) => Promise<R>;

export const redisConnection = {
host: env.REDIS_QUEUE_HOST,
port: Number(env.REDIS_QUEUE_PORT),
password: env.REDIS_QUEUE_PASSWORD,
username: env.REDIS_QUEUE_USERNAME,
};

export const helpOptionDef: commandLineUsage.OptionDefinition = {
name: "help",
alias: "h",
Expand Down
3 changes: 1 addition & 2 deletions clis/blob-propagation-jobs-cli/test/commands/retry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import type { BlobPropagationWorker } from "@blobscan/blob-propagator";

import { retry, retryCommandUsage } from "../../src/commands";
import { context } from "../../src/context-instance";
import { redisConnection } from "../../src/utils";
import {
processJobsManually,
argHelpTest,
Expand All @@ -27,7 +26,7 @@ describe("Retry command", () => {

storageWorkers = queues.map(
(queue) =>
new Worker(queue.name, undefined, { connection: redisConnection })
new Worker(queue.name, undefined, { redisUri: env.REDIS_URI })
);

await setUpJobs(queues, jobVersionedHashes);
Expand Down
9 changes: 1 addition & 8 deletions packages/blob-propagator/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,12 @@ import { blobFileManager } from "./src/blob-file-manager";
import { env } from "./src/env";
import { FINALIZER_WORKER_NAME, STORAGE_WORKER_NAMES } from "./src/utils";

const connection: RedisOptions = {
host: env.REDIS_QUEUE_HOST,
port: env.REDIS_QUEUE_PORT,
password: env.REDIS_QUEUE_PASSWORD,
username: env.REDIS_QUEUE_USERNAME,
};

afterAll(async () => {
const queues = [
STORAGE_WORKER_NAMES["GOOGLE"],
STORAGE_WORKER_NAMES["POSTGRES"],
FINALIZER_WORKER_NAME,
].map((queueName) => new Queue(queueName, { connection }));
].map((queueName) => new Queue(queueName, { env.REDIS_URI }));

let teardownPromise = Promise.all([
...queues.map((q) => q.obliterate({ force: true })),
Expand Down
26 changes: 2 additions & 24 deletions packages/blob-propagator/src/BlobPropagator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,6 @@ const DEFAULT_WORKER_OPTIONS: WorkerOptions = {
removeOnComplete: { count: 1000 },
};

function isRedisOptions(
connection: ConnectionOptions
): connection is RedisOptions {
return (
"host" in connection &&
"port" in connection &&
"password" in connection &&
"username" in connection
);
}

export class BlobPropagator {
protected blobPropagationFlowProducer: FlowProducer;
protected finalizerWorker: Worker;
Expand Down Expand Up @@ -150,7 +139,7 @@ export class BlobPropagator {
return [...Object.values(this.storageWorkers), this.finalizerWorker];
}

#createBlobPropagationFlowProducer(connection?: ConnectionOptions) {
#createBlobPropagationFlowProducer(redisUri?: string) {
/*
* Instantiating a new `FlowProducer` appears to create two separate `RedisConnection` instances.
* This leads to an issue where one instance remains active, or "dangling", after the `FlowProducer` has been closed.
Expand All @@ -159,19 +148,8 @@ export class BlobPropagator {
*
* See: https://github.com/taskforcesh/bullmq/blob/d7cf6ea60830b69b636648238a51e5f981616d02/src/classes/flow-producer.ts#L111
*/
const redisConnection =
connection && isRedisOptions(connection)
? new IORedis({
host: connection.host,
port: connection.port,
username: connection.username,
password: connection.password,
maxRetriesPerRequest: null,
})
: connection;

const blobPropagationFlowProducer = new FlowProducer({
connection: redisConnection,
connection: new IORedis(redisUri),
});

blobPropagationFlowProducer.on("error", (err) => {
Expand Down
9 changes: 1 addition & 8 deletions packages/blob-propagator/src/blob-propagator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,9 @@ function createBlobPropagator() {
availableStorages.push(BlobStorage.SWARM);
}

const connection: RedisOptions = {
host: env.REDIS_QUEUE_HOST,
port: env.REDIS_QUEUE_PORT,
password: env.REDIS_QUEUE_PASSWORD,
username: env.REDIS_QUEUE_USERNAME,
};

return new BlobPropagator(availableStorages, {
workerOptions: {
connection,
env.REDIS_URI,
},
});
}
Expand Down
11 changes: 2 additions & 9 deletions packages/blob-propagator/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ import {
export const env = createEnv({
envOptions: {
server: {
REDIS_QUEUE_HOST: z.string().default("localhost"),
REDIS_QUEUE_PORT: z.coerce.number().default(6379),
REDIS_QUEUE_PASSWORD: z.string().optional(),
REDIS_QUEUE_USERNAME: z.string().optional(),
REDIS_URI: z.string().default("redis://localhost:6379"),

POSTGRES_STORAGE_ENABLED: booleanSchema.default("false"),
GOOGLE_STORAGE_ENABLED: booleanSchema.default("false"),
Expand All @@ -28,11 +25,7 @@ export const env = createEnv({
console.log(
`Blob propagator configuration: enabled=${
env.BLOB_PROPAGATOR_ENABLED
} redisQueueHost=${env.REDIS_QUEUE_HOST}, redisQueuePort=${
env.REDIS_QUEUE_PORT
} redisQueuePassword=${maskSensitiveData(
env.REDIS_QUEUE_PASSWORD
)}, redisQueueUsername=${env.REDIS_QUEUE_USERNAME}`
} redisUri=${env.REDIS_URI}` // TODO: mask password
);
},
});
Expand Down
5 changes: 1 addition & 4 deletions turbo.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@
"OTEL_EXPORTER_OTLP_PROTOCOL",
"OTEL_EXPORTER_OTLP_ENDPOINT",
"POSTGRES_STORAGE_ENABLED",
"REDIS_QUEUE_HOST",
"REDIS_QUEUE_PASSWORD",
"REDIS_QUEUE_PORT",
"REDIS_QUEUE_USERNAME",
"REDIS_URI",
"SECRET_KEY",
"SENTRY_DSN_API",
"SKIP_ENV_VALIDATION",
Expand Down

0 comments on commit b2adb07

Please sign in to comment.