From dbe0fd04d5267854612973a67d6fcea68b943dbb Mon Sep 17 00:00:00 2001 From: PJColombo Date: Thu, 15 Feb 2024 15:12:09 +0100 Subject: [PATCH 1/4] fix(db): default to zero when no data has been aggregated when executing overall stats queries --- packages/db/prisma/extensions/stats.ts | 164 ++++++++++++++++--------- packages/db/prisma/utils/dates.ts | 21 ---- packages/db/prisma/utils/sql.ts | 42 +++++++ 3 files changed, 147 insertions(+), 80 deletions(-) create mode 100644 packages/db/prisma/utils/sql.ts diff --git a/packages/db/prisma/extensions/stats.ts b/packages/db/prisma/extensions/stats.ts index f9b70fb10..cfdfab96e 100644 --- a/packages/db/prisma/extensions/stats.ts +++ b/packages/db/prisma/extensions/stats.ts @@ -4,7 +4,11 @@ import { curryPrismaExtensionFnSpan } from "../instrumentation"; import type { BlockNumberRange } from "../types"; import type { RawDatePeriod } from "../utils/dates"; import { normalizeDailyDatePeriod } from "../utils/dates"; -import { buildRawWhereClause } from "../utils/dates"; +import { + buildAvgUpdateExpression, + buildRawWhereClause, + updatedAtField, +} from "../utils/sql"; const startExtensionFnSpan = curryPrismaExtensionFnSpan("stats"); @@ -90,21 +94,27 @@ export const statsExtension = Prisma.defineExtension((prisma) => `; }, increment({ from, to }: BlockNumberRange) { + const statsTableAlias = Prisma.sql`bos`; + const totalBlobsField = Prisma.sql`total_blobs`; + const totalUniqueBlobsField = Prisma.sql`total_unique_blobs`; + const totalBlobSizeField = Prisma.sql`total_blob_size`; + const avgBlobSizeField = Prisma.sql`avg_blob_size`; + return prisma.$executeRaw` - INSERT INTO blob_overall_stats AS st ( + INSERT INTO blob_overall_stats AS ${statsTableAlias} ( id, - total_blobs, - total_unique_blobs, - total_blob_size, - avg_blob_size, - updated_at + ${totalBlobsField}, + ${totalUniqueBlobsField}, + ${totalBlobSizeField}, + ${avgBlobSizeField}, + ${updatedAtField} ) SELECT 1 AS id, - COUNT(btx.blob_hash)::INT AS total_blobs, - COUNT(DISTINCT CASE WHEN b.first_block_number BETWEEN ${from} AND ${to} THEN b.versioned_hash END)::INT AS total_unique_blobs, - SUM(b.size) AS total_blob_size, - AVG(b.size)::FLOAT AS avg_blob_size, + COALESCE(COUNT(btx.blob_hash)::INT, 0) AS ${totalBlobsField}, + COALESCE(COUNT(DISTINCT CASE WHEN b.first_block_number BETWEEN ${from} AND ${to} THEN b.versioned_hash END)::INT, 0) AS total_unique_blobs, + COALESCE(SUM(b.size), 0) AS ${totalBlobSizeField}, + COALESCE(AVG(b.size)::FLOAT, 0) AS avg_blob_size, NOW() AS updated_at FROM blob b JOIN blobs_on_transactions btx ON btx.blob_hash = b.versioned_hash @@ -112,11 +122,15 @@ export const statsExtension = Prisma.defineExtension((prisma) => JOIN "block" bck ON bck."number" = tx.block_number WHERE bck."number" BETWEEN ${from} AND ${to} ON CONFLICT (id) DO UPDATE SET - total_blobs = st.total_blobs + EXCLUDED.total_blobs, - total_unique_blobs = st.total_unique_blobs + EXCLUDED.total_unique_blobs, - total_blob_size = st.total_blob_size + EXCLUDED.total_blob_size, - avg_blob_size = st.avg_blob_size + ((EXCLUDED.avg_blob_size - st.avg_blob_size) / (st.total_blobs + EXCLUDED.total_blobs)), - updated_at = EXCLUDED.updated_at + ${totalBlobsField} = ${Prisma.sql`${statsTableAlias}.${totalBlobsField}`} + ${Prisma.sql`EXCLUDED.${totalBlobsField}`}, + ${totalUniqueBlobsField} = ${Prisma.sql`${statsTableAlias}.${totalUniqueBlobsField}`} + ${Prisma.sql`EXCLUDED.${totalUniqueBlobsField}`}, + ${totalBlobSizeField} = ${Prisma.sql`${statsTableAlias}.${totalBlobSizeField}`} + ${Prisma.sql`EXCLUDED.${totalBlobSizeField}`}, + ${avgBlobSizeField} = ${buildAvgUpdateExpression( + statsTableAlias, + totalBlobsField, + avgBlobSizeField + )}, + ${updatedAtField} = ${Prisma.sql`EXCLUDED.${updatedAtField}`} `; }, }, @@ -212,42 +226,64 @@ export const statsExtension = Prisma.defineExtension((prisma) => `; }, increment({ from, to }: BlockNumberRange) { + const statsTableAlias = Prisma.sql`bos`; + const totalBlocksField = Prisma.sql`total_blocks`; + const totalBlobGasUsedField = Prisma.sql`total_blob_gas_used`; + const totalBlobAsCalldataGasUsedField = Prisma.sql`total_blob_as_calldata_gas_used`; + const totalBlobFeeField = Prisma.sql`total_blob_fee`; + const totalBlobAsCalldataFeeField = Prisma.sql`total_blob_as_calldata_fee`; + const avgBlobFeeField = Prisma.sql`avg_blob_fee`; + const avgBlobAsCalldataFeeField = Prisma.sql`avg_blob_as_calldata_fee`; + const avgBlobGasPriceField = Prisma.sql`avg_blob_gas_price`; + return prisma.$executeRaw` - INSERT INTO block_overall_stats AS st ( + INSERT INTO block_overall_stats AS ${statsTableAlias} ( id, - total_blocks, - total_blob_gas_used, - total_blob_as_calldata_gas_used, - total_blob_fee, - total_blob_as_calldata_fee, - avg_blob_fee, - avg_blob_as_calldata_fee, - avg_blob_gas_price, + ${totalBlocksField}, + ${totalBlobGasUsedField}, + ${totalBlobAsCalldataGasUsedField}, + ${totalBlobFeeField}, + ${totalBlobAsCalldataFeeField}, + ${avgBlobFeeField}, + ${avgBlobAsCalldataFeeField}, + ${avgBlobGasPriceField}, updated_at ) SELECT 1 as id, - COUNT("hash")::INT as total_blocks, - SUM(blob_gas_used)::DECIMAL(50,0) as total_blob_gas_used, - SUM(blob_as_calldata_gas_used)::DECIMAL(50,0) as total_blob_as_calldata_gas_used, - SUM(blob_gas_used * blob_gas_price)::DECIMAL(50,0) as total_blob_fee, - SUM(blob_as_calldata_gas_used * blob_gas_price)::DECIMAL(50,0) as total_blob_as_calldata_fee, - AVG(blob_gas_used * blob_gas_price)::FLOAT as avg_blob_fee, - AVG(blob_as_calldata_gas_used * blob_gas_price)::FLOAT as avg_blob_as_calldata_fee, - AVG(blob_gas_price)::FLOAT as avg_blob_gas_price, - NOW() as updated_at + COALESCE(COUNT("hash")::INT, 0) as ${totalBlocksField}, + COALESCE(SUM(blob_gas_used)::DECIMAL(50,0), 0) as ${totalBlobGasUsedField}, + COALESCE(SUM(blob_as_calldata_gas_used)::DECIMAL(50,0), 0) as ${totalBlobAsCalldataGasUsedField}, + COALESCE(SUM(blob_gas_used * blob_gas_price)::DECIMAL(50,0), 0) as ${totalBlobFeeField}, + COALESCE(SUM(blob_as_calldata_gas_used * blob_gas_price)::DECIMAL(50,0), 0) as ${totalBlobAsCalldataFeeField}, + COALESCE(AVG(blob_gas_used * blob_gas_price)::FLOAT, 0) as ${avgBlobFeeField}, + COALESCE(AVG(blob_as_calldata_gas_used * blob_gas_price)::FLOAT, 0) as ${avgBlobAsCalldataFeeField}, + COALESCE(AVG(blob_gas_price)::FLOAT, 0) as ${avgBlobGasPriceField}, + NOW() as ${updatedAtField} FROM "block" WHERE "number" BETWEEN ${from} AND ${to} ON CONFLICT (id) DO UPDATE SET - total_blocks = st.total_blocks + EXCLUDED.total_blocks, - total_blob_gas_used = st.total_blob_gas_used + EXCLUDED.total_blob_gas_used, - total_blob_as_calldata_gas_used = st.total_blob_as_calldata_gas_used + EXCLUDED.total_blob_as_calldata_gas_used, - total_blob_fee = st.total_blob_fee + EXCLUDED.total_blob_fee, - total_blob_as_calldata_fee = st.total_blob_as_calldata_fee + EXCLUDED.total_blob_as_calldata_fee, - avg_blob_fee = st.avg_blob_fee + ((EXCLUDED.avg_blob_fee - st.avg_blob_fee) / (st.total_blocks + EXCLUDED.total_blocks)), - avg_blob_as_calldata_fee = st.avg_blob_as_calldata_fee + ((EXCLUDED.avg_blob_as_calldata_fee - st.avg_blob_as_calldata_fee) / (st.total_blocks + EXCLUDED.total_blocks)), - avg_blob_gas_price = st.avg_blob_gas_price + ((EXCLUDED.avg_blob_gas_price - st.avg_blob_gas_price) / (st.total_blocks + EXCLUDED.total_blocks)), - updated_at = EXCLUDED.updated_at + ${totalBlocksField} = ${Prisma.sql`${statsTableAlias}.${totalBlocksField}`} + ${Prisma.sql`EXCLUDED.${totalBlocksField}`}, + ${totalBlobGasUsedField} = ${Prisma.sql`${statsTableAlias}.${totalBlobGasUsedField}`} + ${Prisma.sql`EXCLUDED.${totalBlobGasUsedField}`}, + ${totalBlobAsCalldataGasUsedField} = ${Prisma.sql`${statsTableAlias}.${totalBlobAsCalldataGasUsedField}`} + ${Prisma.sql`EXCLUDED.${totalBlobAsCalldataGasUsedField}`}, + ${totalBlobFeeField} = ${Prisma.sql`${statsTableAlias}.${totalBlobFeeField}`} + ${Prisma.sql`EXCLUDED.${totalBlobFeeField}`}, + ${totalBlobAsCalldataFeeField} = ${Prisma.sql`${statsTableAlias}.${totalBlobAsCalldataFeeField}`} + ${Prisma.sql`EXCLUDED.${totalBlobAsCalldataFeeField}`}, + ${avgBlobFeeField} = ${buildAvgUpdateExpression( + statsTableAlias, + totalBlocksField, + avgBlobFeeField + )}, + ${avgBlobAsCalldataFeeField} = ${buildAvgUpdateExpression( + statsTableAlias, + totalBlocksField, + avgBlobAsCalldataFeeField + )}, + ${avgBlobGasPriceField} = ${buildAvgUpdateExpression( + statsTableAlias, + totalBlocksField, + avgBlobGasPriceField + )}, + ${updatedAtField} = ${Prisma.sql`EXCLUDED.${updatedAtField}`} `; }, }, @@ -321,30 +357,40 @@ export const statsExtension = Prisma.defineExtension((prisma) => `; }, increment({ from, to }: BlockNumberRange) { + const statsTableAlias = Prisma.sql`tos`; + const totalTransactionsField = Prisma.sql`total_transactions`; + const totalUniqueReceiversField = Prisma.sql`total_unique_receivers`; + const totalUniqueSendersField = Prisma.sql`total_unique_senders`; + const avgMaxBlobGasFeeField = Prisma.sql`avg_max_blob_gas_fee`; + return prisma.$executeRaw` - INSERT INTO transaction_overall_stats AS st ( + INSERT INTO transaction_overall_stats AS ${statsTableAlias} ( id, - total_transactions, - total_unique_receivers, - total_unique_senders, - avg_max_blob_gas_fee, - updated_at + ${totalTransactionsField}, + ${totalUniqueReceiversField}, + ${totalUniqueSendersField}, + ${avgMaxBlobGasFeeField}, + ${updatedAtField} ) SELECT 1 AS id, - COUNT("hash")::INT AS total_transactions, - COUNT(DISTINCT CASE WHEN taddr.first_block_number_as_receiver BETWEEN ${from} AND ${to} THEN taddr.address END)::INT AS total_unique_receivers, - COUNT(DISTINCT CASE WHEN faddr.first_block_number_as_sender BETWEEN ${from} AND ${to} THEN faddr.address END )::INT AS total_unique_senders, - AVG(max_fee_per_blob_gas)::FLOAT AS avg_max_blob_gas_fee, - NOW() AS updated_at + COALESCE(COUNT("hash")::INT, 0) AS ${totalTransactionsField}, + COALESCE(COUNT(DISTINCT CASE WHEN taddr.first_block_number_as_receiver BETWEEN ${from} AND ${to} THEN taddr.address END)::INT, 0) AS ${totalUniqueReceiversField}, + COALESCE(COUNT(DISTINCT CASE WHEN faddr.first_block_number_as_sender BETWEEN ${from} AND ${to} THEN faddr.address END )::INT, 0) AS ${totalUniqueSendersField}, + COALESCE(AVG(max_fee_per_blob_gas)::FLOAT, 0) AS ${avgMaxBlobGasFeeField}, + NOW() AS ${updatedAtField} FROM "transaction" tx JOIN "address" faddr ON faddr.address = tx.from_id JOIN "address" taddr ON taddr.address = tx.to_id WHERE tx.block_number BETWEEN ${from} AND ${to} ON CONFLICT (id) DO UPDATE SET - total_transactions = st.total_transactions + EXCLUDED.total_transactions, - total_unique_receivers = st.total_unique_receivers + EXCLUDED.total_unique_receivers, - total_unique_senders = st.total_unique_senders + EXCLUDED.total_unique_senders, - avg_max_blob_gas_fee = st.avg_max_blob_gas_fee + ((EXCLUDED.avg_max_blob_gas_fee - st.avg_max_blob_gas_fee) / (st.total_transactions + EXCLUDED.total_transactions)), - updated_at = EXCLUDED.updated_at + ${totalTransactionsField} = ${Prisma.sql`${statsTableAlias}.${totalTransactionsField}`} + ${Prisma.sql`EXCLUDED.${totalTransactionsField}`}, + ${totalUniqueReceiversField} = ${Prisma.sql`${statsTableAlias}.${totalUniqueReceiversField}`} + ${Prisma.sql`EXCLUDED.${totalUniqueReceiversField}`}, + ${totalUniqueSendersField} = ${Prisma.sql`${statsTableAlias}.${totalUniqueSendersField}`} + ${Prisma.sql`EXCLUDED.${totalUniqueSendersField}`}, + ${avgMaxBlobGasFeeField} = ${buildAvgUpdateExpression( + statsTableAlias, + totalTransactionsField, + avgMaxBlobGasFeeField + )}, + ${updatedAtField} = ${Prisma.sql`EXCLUDED.${updatedAtField}`} `; }, }, diff --git a/packages/db/prisma/utils/dates.ts b/packages/db/prisma/utils/dates.ts index d5b68c07c..297a414c0 100644 --- a/packages/db/prisma/utils/dates.ts +++ b/packages/db/prisma/utils/dates.ts @@ -1,5 +1,3 @@ -import { Prisma } from "@prisma/client"; - import dayjs from "@blobscan/dayjs"; export type RawDate = string | Date | dayjs.Dayjs; @@ -42,22 +40,3 @@ export function normalizeDailyDatePeriod( return normalizedDatePeriod; } - -export function buildRawWhereClause( - dateField: Prisma.Sql, - { from, to }: DatePeriod -): Prisma.Sql { - if (from && to) { - return Prisma.sql`WHERE ${dateField} BETWEEN ${from}::TIMESTAMP AND ${to}::TIMESTAMP`; - } else if (from) { - return Prisma.sql`WHERE ${dateField} >= ${from}::TIMESTAMP`; - } else if (to) { - return Prisma.sql`WHERE ${dateField} < ${to}::TIMESTAMP`; - } - - return Prisma.empty; -} - -export function buildWhereClause(dateField: string, { from, to }: DatePeriod) { - return { [dateField]: { gte: from, lte: to } }; -} diff --git a/packages/db/prisma/utils/sql.ts b/packages/db/prisma/utils/sql.ts new file mode 100644 index 000000000..805e6c616 --- /dev/null +++ b/packages/db/prisma/utils/sql.ts @@ -0,0 +1,42 @@ +import { Prisma } from "@prisma/client"; + +import { DatePeriod } from "./dates"; + +export const updatedAtField = Prisma.sql`updated_at`; + +export function buildRawWhereClause( + dateField: Prisma.Sql, + { from, to }: DatePeriod +): Prisma.Sql { + if (from && to) { + return Prisma.sql`WHERE ${dateField} BETWEEN ${from}::TIMESTAMP AND ${to}::TIMESTAMP`; + } else if (from) { + return Prisma.sql`WHERE ${dateField} >= ${from}::TIMESTAMP`; + } else if (to) { + return Prisma.sql`WHERE ${dateField} < ${to}::TIMESTAMP`; + } + + return Prisma.empty; +} + +export function buildWhereClause(dateField: string, { from, to }: DatePeriod) { + return { [dateField]: { gte: from, lte: to } }; +} + +export function buildAvgUpdateExpression( + tableAlias: Prisma.Sql, + totalItemsField: Prisma.Sql, + avgFieldName: Prisma.Sql +) { + const prefixedTotalItemsField = Prisma.sql`${tableAlias}.${Prisma.sql`${totalItemsField}`}`; + const prefixedAvgField = Prisma.sql`${tableAlias}.${Prisma.sql`${avgFieldName}`}`; + const excludedTotalItemsField = Prisma.sql`EXCLUDED.${Prisma.sql`${totalItemsField}`}`; + const excludedAvgField = Prisma.sql`EXCLUDED.${Prisma.sql`${avgFieldName}`}`; + + return Prisma.sql` + CASE + WHEN ${prefixedTotalItemsField} + ${excludedTotalItemsField} = 0 THEN 0 + ELSE ${prefixedAvgField} + ((${excludedAvgField} - ${prefixedAvgField}) / (${prefixedTotalItemsField} + ${excludedTotalItemsField})) + END + `; +} From 399b7c125f264e97b331822dec2bfd05f005b61e Mon Sep 17 00:00:00 2001 From: PJColombo Date: Thu, 15 Feb 2024 18:34:18 +0100 Subject: [PATCH 2/4] fix(stats-aggregator-cli): update overall stats processed blocks batch size + format overall logs --- .../src/commands/overall.ts | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/clis/stats-aggregation-cli/src/commands/overall.ts b/clis/stats-aggregation-cli/src/commands/overall.ts index 481d2d385..ff79ae4d9 100644 --- a/clis/stats-aggregation-cli/src/commands/overall.ts +++ b/clis/stats-aggregation-cli/src/commands/overall.ts @@ -7,7 +7,7 @@ import { prisma } from "@blobscan/db"; import { env } from "../env"; import { deleteOptionDef, helpOptionDef } from "../utils"; -const DEFAULT_UNPROCESSED_BLOCKS_BATCH_SIZE = 100_000; +const PRISMA_BLOCKS_BATCH_SIZE = 2_000_000; type BeaconFinalizedBlockResponse = { data: { @@ -44,8 +44,8 @@ const overallCommandOptDefs: commandLineUsage.OptionDefinition[] = [ name: "batchSize", alias: "s", typeLabel: "{underline size}", - description: `Number of blocks to process in a single batch. It defaults to ${DEFAULT_UNPROCESSED_BLOCKS_BATCH_SIZE}`, - defaultValue: DEFAULT_UNPROCESSED_BLOCKS_BATCH_SIZE, + description: `Number of blocks to process in a single batch. It defaults to ${PRISMA_BLOCKS_BATCH_SIZE}`, + defaultValue: PRISMA_BLOCKS_BATCH_SIZE, type: Number, }, ]; @@ -110,7 +110,7 @@ async function deleteOverallStats() { async function incrementOverallStats({ targetBlockId, - batchSize = DEFAULT_UNPROCESSED_BLOCKS_BATCH_SIZE, + batchSize = PRISMA_BLOCKS_BATCH_SIZE, }: { targetBlockId: BlockId; batchSize?: number; @@ -157,7 +157,7 @@ async function incrementOverallStats({ if (fromBlockNumber > toBlockNumber) { console.log( - `Skipping stats aggregation as there are no new finalized blocks (Latest processed finalized block: ${latestProcessedFinalizedBlock})` + `Skipping stats aggregation as there are no new finalized blocks (Last processed finalized block: ${latestProcessedFinalizedBlock.toLocaleString()})` ); return; @@ -193,13 +193,17 @@ async function incrementOverallStats({ console.log( `Batch ${ i + 1 - }/${batches} processed. Data aggregated from block ${batchFrom} to ${batchTo}` + }/${batches} processed. Data aggregated from block ${batchFrom.toLocaleString()} to ${batchTo.toLocaleString()} (${( + batchTo - batchFrom + ).toLocaleString()} blocks processed).` ); } } console.log( - `Overall stats increment operation executed: Data aggregated from block ${fromBlockNumber} to ${toBlockNumber}.` + `Overall stats increment operation executed: Data aggregated from block ${fromBlockNumber.toLocaleString()} to ${toBlockNumber.toLocaleString()} (${( + toBlockNumber - fromBlockNumber + ).toLocaleString()} blocks processed).` ); } From 8fdfbe9fb9904deaf3a0dd7f75dd1f759d3e5db7 Mon Sep 17 00:00:00 2001 From: PJColombo Date: Thu, 15 Feb 2024 18:59:13 +0100 Subject: [PATCH 3/4] refactor(db): abstract coalesce sql expression into a function --- packages/db/prisma/extensions/stats.ts | 59 +++++++++++++++++++------- packages/db/prisma/utils/sql.ts | 4 ++ 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/packages/db/prisma/extensions/stats.ts b/packages/db/prisma/extensions/stats.ts index cfdfab96e..1f07fab7e 100644 --- a/packages/db/prisma/extensions/stats.ts +++ b/packages/db/prisma/extensions/stats.ts @@ -7,6 +7,7 @@ import { normalizeDailyDatePeriod } from "../utils/dates"; import { buildAvgUpdateExpression, buildRawWhereClause, + coalesceToZero, updatedAtField, } from "../utils/sql"; @@ -111,10 +112,14 @@ export const statsExtension = Prisma.defineExtension((prisma) => ) SELECT 1 AS id, - COALESCE(COUNT(btx.blob_hash)::INT, 0) AS ${totalBlobsField}, - COALESCE(COUNT(DISTINCT CASE WHEN b.first_block_number BETWEEN ${from} AND ${to} THEN b.versioned_hash END)::INT, 0) AS total_unique_blobs, - COALESCE(SUM(b.size), 0) AS ${totalBlobSizeField}, - COALESCE(AVG(b.size)::FLOAT, 0) AS avg_blob_size, + ${coalesceToZero( + "COUNT(btx.blob_hash)::INT" + )} AS ${totalBlobsField}, + ${coalesceToZero( + `COUNT(DISTINCT CASE WHEN b.first_block_number BETWEEN ${from} AND ${to} THEN b.versioned_hash END)::INT` + )} AS total_unique_blobs, + ${coalesceToZero("SUM(b.size)")} AS ${totalBlobSizeField}, + ${coalesceToZero("AVG(b.size)::FLOAT")} AS avg_blob_size, NOW() AS updated_at FROM blob b JOIN blobs_on_transactions btx ON btx.blob_hash = b.versioned_hash @@ -251,14 +256,28 @@ export const statsExtension = Prisma.defineExtension((prisma) => ) SELECT 1 as id, - COALESCE(COUNT("hash")::INT, 0) as ${totalBlocksField}, - COALESCE(SUM(blob_gas_used)::DECIMAL(50,0), 0) as ${totalBlobGasUsedField}, - COALESCE(SUM(blob_as_calldata_gas_used)::DECIMAL(50,0), 0) as ${totalBlobAsCalldataGasUsedField}, - COALESCE(SUM(blob_gas_used * blob_gas_price)::DECIMAL(50,0), 0) as ${totalBlobFeeField}, - COALESCE(SUM(blob_as_calldata_gas_used * blob_gas_price)::DECIMAL(50,0), 0) as ${totalBlobAsCalldataFeeField}, - COALESCE(AVG(blob_gas_used * blob_gas_price)::FLOAT, 0) as ${avgBlobFeeField}, - COALESCE(AVG(blob_as_calldata_gas_used * blob_gas_price)::FLOAT, 0) as ${avgBlobAsCalldataFeeField}, - COALESCE(AVG(blob_gas_price)::FLOAT, 0) as ${avgBlobGasPriceField}, + ${coalesceToZero(`COUNT("hash")::INT`)} as ${totalBlocksField}, + ${coalesceToZero( + `SUM(blob_gas_used)::DECIMAL(50,0)` + )} as ${totalBlobGasUsedField}, + ${coalesceToZero( + `SUM(blob_as_calldata_gas_used)::DECIMAL(50,0)` + )} as ${totalBlobAsCalldataGasUsedField}, + ${coalesceToZero( + `SUM(blob_gas_used * blob_gas_price)::DECIMAL(50,0)` + )} as ${totalBlobFeeField}, + ${coalesceToZero( + `SUM(blob_as_calldata_gas_used * blob_gas_price)::DECIMAL(50,0)` + )} as ${totalBlobAsCalldataFeeField}, + ${coalesceToZero( + `AVG(blob_gas_used * blob_gas_price)::FLOAT` + )} as ${avgBlobFeeField}, + ${coalesceToZero( + `AVG(blob_as_calldata_gas_used * blob_gas_price)::FLOAT` + )} as ${avgBlobAsCalldataFeeField}, + ${coalesceToZero( + `AVG(blob_gas_price)::FLOAT` + )} as ${avgBlobGasPriceField}, NOW() as ${updatedAtField} FROM "block" WHERE "number" BETWEEN ${from} AND ${to} @@ -374,10 +393,18 @@ export const statsExtension = Prisma.defineExtension((prisma) => ) SELECT 1 AS id, - COALESCE(COUNT("hash")::INT, 0) AS ${totalTransactionsField}, - COALESCE(COUNT(DISTINCT CASE WHEN taddr.first_block_number_as_receiver BETWEEN ${from} AND ${to} THEN taddr.address END)::INT, 0) AS ${totalUniqueReceiversField}, - COALESCE(COUNT(DISTINCT CASE WHEN faddr.first_block_number_as_sender BETWEEN ${from} AND ${to} THEN faddr.address END )::INT, 0) AS ${totalUniqueSendersField}, - COALESCE(AVG(max_fee_per_blob_gas)::FLOAT, 0) AS ${avgMaxBlobGasFeeField}, + ${coalesceToZero( + 'COUNT("hash")::INT' + )} AS ${totalTransactionsField}, + ${coalesceToZero( + `COUNT(DISTINCT CASE WHEN taddr.first_block_number_as_receiver BETWEEN ${from} AND ${to} THEN taddr.address END)::INT` + )} AS ${totalUniqueReceiversField}, + ${coalesceToZero( + `COUNT(DISTINCT CASE WHEN faddr.first_block_number_as_sender BETWEEN ${from} AND ${to} THEN faddr.address END )::INT` + )} AS ${totalUniqueSendersField}, + ${coalesceToZero( + "AVG(max_fee_per_blob_gas)::FLOAT" + )} AS ${avgMaxBlobGasFeeField}, NOW() AS ${updatedAtField} FROM "transaction" tx JOIN "address" faddr ON faddr.address = tx.from_id JOIN "address" taddr ON taddr.address = tx.to_id WHERE tx.block_number BETWEEN ${from} AND ${to} diff --git a/packages/db/prisma/utils/sql.ts b/packages/db/prisma/utils/sql.ts index 805e6c616..3f94d73a6 100644 --- a/packages/db/prisma/utils/sql.ts +++ b/packages/db/prisma/utils/sql.ts @@ -40,3 +40,7 @@ export function buildAvgUpdateExpression( END `; } + +export function coalesceToZero(value: string) { + return Prisma.sql`COALESCE(${Prisma.sql([value])}, 0)`; +} From 3a853c1b5fc0e77787962bc4a1ef8aacfa166adc Mon Sep 17 00:00:00 2001 From: 0xGabi Date: Sun, 18 Feb 2024 17:29:49 -0300 Subject: [PATCH 4/4] fix(blob-propagator): resolve types issues by using the connection object required by bullmq --- clis/blob-propagation-jobs-cli/src/Context.ts | 15 +++++++-------- clis/blob-propagation-jobs-cli/src/utils.ts | 13 +++++++------ .../test/commands/retry.test.ts | 8 +++++++- packages/blob-propagator/setup.ts | 9 +++++++-- packages/blob-propagator/src/BlobPropagator.ts | 12 +++--------- packages/blob-propagator/src/blob-propagator.ts | 6 ++++-- packages/blob-propagator/src/env.ts | 4 ++-- packages/zod/src/create-env.ts | 6 ++++++ 8 files changed, 43 insertions(+), 30 deletions(-) diff --git a/clis/blob-propagation-jobs-cli/src/Context.ts b/clis/blob-propagation-jobs-cli/src/Context.ts index b4443dd4a..1ab704fd9 100644 --- a/clis/blob-propagation-jobs-cli/src/Context.ts +++ b/clis/blob-propagation-jobs-cli/src/Context.ts @@ -1,6 +1,7 @@ -import type { ConnectionOptions, JobType } from "bullmq"; +import type { JobType } from "bullmq"; import { FlowProducer } from "bullmq"; import { Queue } from "bullmq"; +import IORedis from "ioredis"; import type { BlobPropagationJobData } from "@blobscan/blob-propagator"; import { @@ -18,23 +19,21 @@ export class Context { constructor(storages: $Enums.BlobStorage[], redisUri: string) { const uniqueStorageNames = [...new Set(storages)]; + const connection = new IORedis(redisUri, { maxRetriesPerRequest: null }); this.#storageQueues = uniqueStorageNames.map( (storageName) => - // TODO: Type of argument is now string new Queue(STORAGE_WORKER_NAMES[storageName], { - redisUri, + connection, }) ); - // TODO: Type of argument is now string this.#finalizerQueue = new Queue(FINALIZER_WORKER_NAME, { - redisUri, + connection, }); - // TODO: Type of argument is now string this.#propagatorFlowProducer = new FlowProducer({ - redisUri, + connection, }); } @@ -109,7 +108,7 @@ export class Context { } async clearQueues() { - await Promise.all((await this.getJobs()).map(j => j.remove())) + await Promise.all((await this.getJobs()).map((j) => j.remove())); } close() { diff --git a/clis/blob-propagation-jobs-cli/src/utils.ts b/clis/blob-propagation-jobs-cli/src/utils.ts index 2e6be4e54..d2e86dd14 100644 --- a/clis/blob-propagation-jobs-cli/src/utils.ts +++ b/clis/blob-propagation-jobs-cli/src/utils.ts @@ -5,8 +5,6 @@ import { STORAGE_WORKER_NAMES, buildJobId } from "@blobscan/blob-propagator"; import dayjs from "@blobscan/dayjs"; import type { $Enums } from "@blobscan/db"; -import { env } from "./env"; - export type Command = (argv?: string[]) => Promise; export const helpOptionDef: commandLineUsage.OptionDefinition = { @@ -45,7 +43,10 @@ export const blobHashOptionDef: commandLineUsage.OptionDefinition = { multiple: true, }; -export const datePeriodOptionDefs: Record<"to" | "from", commandLineUsage.OptionDefinition> = { +export const datePeriodOptionDefs: Record< + "to" | "from", + commandLineUsage.OptionDefinition +> = { from: { name: "from", alias: "f", @@ -53,14 +54,14 @@ export const datePeriodOptionDefs: Record<"to" | "from", commandLineUsage.Option description: "Date from which execute jobs.", type: String, }, - to: { + to: { name: "to", alias: "t", typeLabel: "{underline to}", description: "Date to which execute jobs.", type: String, - } -} + }, +}; export function normalizeQueueName(input: string) { if (input.toUpperCase() === "FINALIZER") { diff --git a/clis/blob-propagation-jobs-cli/test/commands/retry.test.ts b/clis/blob-propagation-jobs-cli/test/commands/retry.test.ts index 724ee1d16..7ee16b2d7 100644 --- a/clis/blob-propagation-jobs-cli/test/commands/retry.test.ts +++ b/clis/blob-propagation-jobs-cli/test/commands/retry.test.ts @@ -1,11 +1,13 @@ /* eslint-disable @typescript-eslint/no-misused-promises */ import { Worker } from "bullmq"; +import IORedis from "ioredis"; import { afterAll, beforeEach, describe, expect, it } from "vitest"; import type { BlobPropagationWorker } from "@blobscan/blob-propagator"; import { retry, retryCommandUsage } from "../../src/commands"; import { context } from "../../src/context-instance"; +import { env } from "../../src/env"; import { processJobsManually, argHelpTest, @@ -26,7 +28,11 @@ describe("Retry command", () => { storageWorkers = queues.map( (queue) => - new Worker(queue.name, undefined, { redisUri: env.REDIS_URI }) + new Worker(queue.name, undefined, { + connection: new IORedis(env.REDIS_URI, { + maxRetriesPerRequest: null, + }), + }) ); await setUpJobs(queues, jobVersionedHashes); diff --git a/packages/blob-propagator/setup.ts b/packages/blob-propagator/setup.ts index 7e603937b..c4a56f22b 100644 --- a/packages/blob-propagator/setup.ts +++ b/packages/blob-propagator/setup.ts @@ -1,5 +1,5 @@ import { Queue } from "bullmq"; -import type { RedisOptions } from "ioredis"; +import IORedis from "ioredis"; import { afterAll } from "vitest"; import { blobFileManager } from "./src/blob-file-manager"; @@ -11,7 +11,12 @@ afterAll(async () => { STORAGE_WORKER_NAMES["GOOGLE"], STORAGE_WORKER_NAMES["POSTGRES"], FINALIZER_WORKER_NAME, - ].map((queueName) => new Queue(queueName, { env.REDIS_URI })); + ].map( + (queueName) => + new Queue(queueName, { + connection: new IORedis(env.REDIS_URI, { maxRetriesPerRequest: null }), + }) + ); let teardownPromise = Promise.all([ ...queues.map((q) => q.obliterate({ force: true })), diff --git a/packages/blob-propagator/src/BlobPropagator.ts b/packages/blob-propagator/src/BlobPropagator.ts index b166584b8..d0ec86ca6 100644 --- a/packages/blob-propagator/src/BlobPropagator.ts +++ b/packages/blob-propagator/src/BlobPropagator.ts @@ -1,12 +1,6 @@ /* eslint-disable @typescript-eslint/no-misused-promises */ import { FlowProducer, Queue, Worker } from "bullmq"; -import type { - ConnectionOptions, - FlowJob, - WorkerOptions, - RedisOptions, -} from "bullmq"; -import IORedis from "ioredis"; +import type { ConnectionOptions, FlowJob, WorkerOptions } from "bullmq"; import type { $Enums } from "@blobscan/db"; import type { BlobStorage } from "@blobscan/db"; @@ -139,7 +133,7 @@ export class BlobPropagator { return [...Object.values(this.storageWorkers), this.finalizerWorker]; } - #createBlobPropagationFlowProducer(redisUri?: string) { + #createBlobPropagationFlowProducer(connection?: ConnectionOptions) { /* * Instantiating a new `FlowProducer` appears to create two separate `RedisConnection` instances. * This leads to an issue where one instance remains active, or "dangling", after the `FlowProducer` has been closed. @@ -149,7 +143,7 @@ export class BlobPropagator { * See: https://github.com/taskforcesh/bullmq/blob/d7cf6ea60830b69b636648238a51e5f981616d02/src/classes/flow-producer.ts#L111 */ const blobPropagationFlowProducer = new FlowProducer({ - connection: new IORedis(redisUri), + connection, }); blobPropagationFlowProducer.on("error", (err) => { diff --git a/packages/blob-propagator/src/blob-propagator.ts b/packages/blob-propagator/src/blob-propagator.ts index 8243eced5..0ec9ccbef 100644 --- a/packages/blob-propagator/src/blob-propagator.ts +++ b/packages/blob-propagator/src/blob-propagator.ts @@ -1,4 +1,4 @@ -import type { RedisOptions } from "bullmq"; +import IORedis from "ioredis"; import { BlobStorage } from "@blobscan/db"; @@ -20,9 +20,11 @@ function createBlobPropagator() { availableStorages.push(BlobStorage.SWARM); } + const connection = new IORedis(env.REDIS_URI, { maxRetriesPerRequest: null }); + return new BlobPropagator(availableStorages, { workerOptions: { - env.REDIS_URI, + connection, }, }); } diff --git a/packages/blob-propagator/src/env.ts b/packages/blob-propagator/src/env.ts index aa19d132e..dbd119e60 100644 --- a/packages/blob-propagator/src/env.ts +++ b/packages/blob-propagator/src/env.ts @@ -1,7 +1,7 @@ import { booleanSchema, createEnv, - maskSensitiveData, + maskPassword, presetEnvOptions, z, } from "@blobscan/zod"; @@ -25,7 +25,7 @@ export const env = createEnv({ console.log( `Blob propagator configuration: enabled=${ env.BLOB_PROPAGATOR_ENABLED - } redisUri=${env.REDIS_URI}` // TODO: mask password + } redisUri=${maskPassword(env.REDIS_URI)}` ); }, }); diff --git a/packages/zod/src/create-env.ts b/packages/zod/src/create-env.ts index f4e4161d1..98ae38cca 100644 --- a/packages/zod/src/create-env.ts +++ b/packages/zod/src/create-env.ts @@ -37,3 +37,9 @@ export const presetEnvOptions = { export function maskSensitiveData(sensitiveData: string | undefined) { return sensitiveData?.replace(/./g, "*"); } + +export function maskPassword(uri: string | undefined) { + const regex = /:\/\/(.*):.*@/; + + return uri?.replace(regex, (_, username) => `://${username}:****@`); +}