Skip to content

Commit

Permalink
Add metrics on transaction processor (#1316)
Browse files Browse the repository at this point in the history
* add transaction processor metrics

* fix module import
  • Loading branch information
gabrielmatei authored Sep 2, 2024
1 parent 628f435 commit 840a742
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 2 deletions.
30 changes: 30 additions & 0 deletions src/common/metrics/api.metrics.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export class ApiMetricsService {
private static graphqlDurationHistogram: Histogram<string>;
private static currentNonceGauge: Gauge<string>;
private static lastProcessedNonceGauge: Gauge<string>;
private static lastProcessedBatchProcessorNonce: Gauge<string>;
private static lastProcessedTransactionCompletedProcessorNonce: Gauge<string>;

constructor(
private readonly apiConfigService: ApiConfigService,
Expand Down Expand Up @@ -87,6 +89,22 @@ export class ApiMetricsService {
labelNames: ['shardId'],
});
}

if (!ApiMetricsService.lastProcessedBatchProcessorNonce) {
ApiMetricsService.lastProcessedBatchProcessorNonce = new Gauge({
name: 'last_processed_batch_processor_nonce',
help: 'Last processed nonce of the given shard',
labelNames: ['shardId'],
});
}

if (!ApiMetricsService.lastProcessedTransactionCompletedProcessorNonce) {
ApiMetricsService.lastProcessedTransactionCompletedProcessorNonce = new Gauge({
name: 'last_processed_transaction_completed_processor_nonce',
help: 'Last processed nonce of the given shard',
labelNames: ['shardId'],
});
}
}

@OnEvent(MetricsEvents.SetVmQuery)
Expand Down Expand Up @@ -128,6 +146,18 @@ export class ApiMetricsService {
ApiMetricsService.lastProcessedNonceGauge.set({ shardId }, nonce);
}

@OnEvent(MetricsEvents.SetLastProcessedBatchProcessorNonce)
setLastProcessedBatchProcessorNonce(payload: LogMetricsEvent) {
const [shardId, nonce] = payload.args;
ApiMetricsService.lastProcessedBatchProcessorNonce.set({ shardId }, nonce);
}

@OnEvent(MetricsEvents.SetLastProcessedTransactionCompletedProcessorNonce)
setLastProcessedTransactionCompletedProcessorNonce(payload: LogMetricsEvent) {
const [shardId, nonce] = payload.args;
ApiMetricsService.lastProcessedTransactionCompletedProcessorNonce.set({ shardId }, nonce);
}

async getMetrics(): Promise<string> {
const shardIds = await this.protocolService.getShardIds();
if (this.apiConfigService.getIsTransactionProcessorCronActive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import { TransactionBatchStatus } from "src/endpoints/transactions.batch/entitie
import { TransactionsBatchService } from "src/endpoints/transactions.batch/transactions.batch.service";
import { TransactionService } from "src/endpoints/transactions/transaction.service";
import { CacheInfo } from "src/utils/cache.info";
import { LogMetricsEvent } from "src/common/entities/log.metrics.event";
import { EventEmitter2 } from "@nestjs/event-emitter";
import { MetricsEvents } from "src/utils/metrics-events.constants";

@Injectable()
export class BatchTransactionProcessorService {
Expand All @@ -22,7 +25,8 @@ export class BatchTransactionProcessorService {
private readonly cachingService: CacheService,
private readonly transactionsBatchService: TransactionsBatchService,
@Inject('PUBSUB_SERVICE') private clientProxy: ClientProxy,
private readonly transactionService: TransactionService
private readonly transactionService: TransactionService,
private readonly eventEmitter: EventEmitter2,
) {
this.logger = new Logger(BatchTransactionProcessorService.name);
}
Expand Down Expand Up @@ -140,6 +144,13 @@ export class BatchTransactionProcessorService {
return await this.cachingService.getRemote(CacheInfo.TransactionBatchShardNonce(shardId).key);
},
setLastProcessedNonce: async (shardId, nonce) => {
const event = new LogMetricsEvent();
event.args = [shardId, nonce];
this.eventEmitter.emit(
MetricsEvents.SetLastProcessedBatchProcessorNonce,
event
);

await this.cachingService.setRemote(CacheInfo.TransactionBatchShardNonce(shardId).key, nonce, CacheInfo.TransactionBatchShardNonce(shardId).ttl);
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ import { ScheduleModule } from '@nestjs/schedule';
import { ApiConfigModule } from 'src/common/api-config/api.config.module';
import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils';
import { TransactionCompletedService } from './transaction.completed.service';
import { ApiMetricsModule } from 'src/common/metrics/api.metrics.module';
import { GatewayModule } from 'src/common/gateway/gateway.module';
import { ProtocolModule } from 'src/common/protocol/protocol.module';

@Module({
imports: [
ScheduleModule.forRoot(),
ApiConfigModule,
ApiMetricsModule,
GatewayModule,
ProtocolModule,
DynamicModuleUtils.getCacheModule(),
],
providers: [
Expand Down
11 changes: 11 additions & 0 deletions src/crons/transaction.processor/transaction.completed.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import { ClientProxy } from "@nestjs/microservices";
import { Cron } from "@nestjs/schedule";
import { ApiConfigService } from "src/common/api-config/api.config.service";
import { CacheInfo } from "src/utils/cache.info";
import { LogMetricsEvent } from "src/common/entities/log.metrics.event";
import { EventEmitter2 } from "@nestjs/event-emitter";
import { MetricsEvents } from "src/utils/metrics-events.constants";

@Injectable()
export class TransactionCompletedService {
Expand All @@ -16,6 +19,7 @@ export class TransactionCompletedService {
private readonly apiConfigService: ApiConfigService,
private readonly cachingService: CacheService,
@Inject('PUBSUB_SERVICE') private clientProxy: ClientProxy,
private readonly eventEmitter: EventEmitter2,
) { }

@Cron('*/1 * * * * *')
Expand Down Expand Up @@ -59,6 +63,13 @@ export class TransactionCompletedService {
return await this.cachingService.get<number>(CacheInfo.TransactionCompletedShardNonce(shardId).key);
},
setLastProcessedNonce: async (shardId, nonce) => {
const event = new LogMetricsEvent();
event.args = [shardId, nonce];
this.eventEmitter.emit(
MetricsEvents.SetLastProcessedTransactionCompletedProcessorNonce,
event
);

await this.cachingService.set<number>(CacheInfo.TransactionCompletedShardNonce(shardId).key, nonce, CacheInfo.TransactionCompletedShardNonce(shardId).ttl);
},
onMessageLogged: (topic, message) => {
Expand Down
4 changes: 3 additions & 1 deletion src/utils/metrics-events.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@ export enum MetricsEvents {
SetPersistenceDuration = "setPersistenceDuration",
SetIndexerDuration = "setIndexerDuration",
SetGraphqlDuration = "setGraphqlDuration",
SetLastProcessedNonce = "setLastProcessedNonce"
SetLastProcessedNonce = "setLastProcessedNonce",
SetLastProcessedBatchProcessorNonce = "setLastProcessedBatchProcessorNonce",
SetLastProcessedTransactionCompletedProcessorNonce = "setLastProcessedTransactionCompletedProcessorNonce",
}

0 comments on commit 840a742

Please sign in to comment.