From dc4cf55f739377b98e3301d8d02892fd7083acf9 Mon Sep 17 00:00:00 2001 From: Chris Maree Date: Thu, 24 Mar 2022 17:33:34 +0200 Subject: [PATCH] fix: Address Discord transport rate limiting issues (#3858) * fix: Address Discord transport rate limiting issues Signed-off-by: chrismaree * nit Signed-off-by: chrismaree * nit Signed-off-by: chrismaree * nit Signed-off-by: chrismaree * nit Signed-off-by: chrismaree * nit Signed-off-by: chrismaree --- .eslintrc.js | 1 + .../src/logger/DiscordTransport.ts | 62 +++++++++++++++++-- .../src/logger/Logger.ts | 27 ++++---- packages/monitors/index.js | 2 + .../test/OptimisticOracleContractMonitor.js | 4 -- packages/monitors/test/index.js | 5 ++ 6 files changed, 81 insertions(+), 20 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index 96fa2a1c2f..7390f9aba9 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -26,6 +26,7 @@ module.exports = { rules: { "@typescript-eslint/no-var-requires": 0, "@typescript-eslint/no-unused-vars": ["error", { ignoreRestSiblings: true }], + "@typescript-eslint/ban-ts-comment": 0, }, }, ], diff --git a/packages/financial-templates-lib/src/logger/DiscordTransport.ts b/packages/financial-templates-lib/src/logger/DiscordTransport.ts index ebfd35dbe1..4ce907399c 100644 --- a/packages/financial-templates-lib/src/logger/DiscordTransport.ts +++ b/packages/financial-templates-lib/src/logger/DiscordTransport.ts @@ -6,10 +6,24 @@ import axios from "axios"; type TransportOptions = ConstructorParameters[0]; +interface Body { + username: string; + avatar_url: string; + embeds: { title: string; description: string; color: number }[]; +} +interface QueueElement { + body: Body; + webHook: string; +} + export class DiscordTransport extends Transport { private readonly defaultWebHookUrl: string | string[]; private readonly escalationPathWebhookUrls: { [key: string]: string | string[] }; private readonly postOnNonEscalationPaths: boolean; + + private logQueue: QueueElement[]; + private isQueueBeingExecuted = false; + constructor( winstonOpts: TransportOptions, ops: { @@ -22,6 +36,8 @@ export class DiscordTransport extends Transport { this.defaultWebHookUrl = ops.defaultWebHookUrl; this.escalationPathWebhookUrls = ops.escalationPathWebhookUrls ?? {}; this.postOnNonEscalationPaths = ops.postOnNonEscalationPaths ?? true; + + this.logQueue = []; } // Note: info must be any because that's what the base class uses. @@ -59,11 +75,9 @@ export class DiscordTransport extends Transport { // Send webhook request to each of the configured webhooks upstream. This posts the messages on Discord. Execute // these sequentially with a soft delay of 2 seconds between calls to avoid hitting the discord rate limit. - if (webHooks.length) - for (const webHook of webHooks) { - await axios.post(webHook, body); - await delay(2); - } + if (webHooks.length) for (const webHook of webHooks) this.logQueue.push({ webHook, body }); + + await this.executeLogQueue(); // Start processing the log que. } catch (error) { console.error("Discord error", error); } @@ -71,6 +85,44 @@ export class DiscordTransport extends Transport { callback(); } + // Processes a queue of logs produced by the transport. Executes sequentially and listens to the response from the + // Discord API to back off and sleep if we are exceeding their rate limiting. Sets the parent transports isFlushed + // variable to block the bot from closing until the whole queue has been flushed. + async executeLogQueue(backOffDuration = 0): Promise { + if (this.isQueueBeingExecuted) return; // If the queue is currently being executed, return. + this.isQueueBeingExecuted = true; // Set the queue to being executed. + // Set the parent isFlushed to false to prevent the logger from closing while the queue is being executed. Note this + // is separate variable from the isQueueBeingExecuted flag as this isFlushed is global for the logger instance. + (this as any).parent.isFlushed = false; + + // If the previous iteration set a backOffDuration then wait for this duration. + if (backOffDuration != 0) await delay(backOffDuration); + + while (this.logQueue.length) { + try { + // Pop off the first element (oldest) and try send it to discord. If this errors then we are being rate limited. + await axios.post(this.logQueue[0].webHook, this.logQueue[0].body); + this.logQueue.shift(); // If the request does not fail remove it from the log queue as having been executed. + } catch (error: any) { + // Extract the retry_after from the response. This is the Discord API telling us how long to back off for. + let _backOffDuration = error?.response?.data.retry_after; + // If they tell us to back off for more than a minute then ignore it and cap at 1 min. This is enough time in + // practice to recover from a rate limit while not making the bot hang indefinitely. + if (_backOffDuration > 60) _backOffDuration = 60; + // We removed the element in the shift above, push it back on to the start of the queue to not drop any message. + // As we have errored we now need to re-enter the executeLogQuery method. Set isQueueBeingExecuted to false and + // re-call the executeLogQuery. This will initiate the backoff delay and then continue to process the queue. + this.isQueueBeingExecuted = false; + await this.executeLogQueue(_backOffDuration); + } + } + + // Only if we get to the end of the function (no errors) then we can set isQueueBeingExecuted to false and can + // set the parent isFlushed to true, enabling the bot to close out. + this.isQueueBeingExecuted = false; + (this as any).parent.isFlushed = true; + } + // Discord URLS are formatted differently to markdown links produced upstream in the bots. For example, slack links // will look like this but links for discord should look like this // [google.com](https://google.com).This function takes in the one format and converts it to the other such that diff --git a/packages/financial-templates-lib/src/logger/Logger.ts b/packages/financial-templates-lib/src/logger/Logger.ts index 942d3e83d1..ebcdff9f64 100644 --- a/packages/financial-templates-lib/src/logger/Logger.ts +++ b/packages/financial-templates-lib/src/logger/Logger.ts @@ -31,15 +31,14 @@ import winston from "winston"; import type { Logger as _Logger, LogEntry } from "winston"; import type * as Transport from "winston-transport"; import { createTransports } from "./Transports"; +import { delay } from "../helpers/delay"; // This async function can be called by a bot if the log message is generated right before the process terminates. -// By calling `await waitForLogger(Logger)`, with the local Logger instance, the process will wait for all upstream -// transports to clear. This enables slower transports like slack to still send their messages before the process yields. -// Note: typescript infers the return type to be unknown. This is fine, as the return type should be void and unused. -export async function waitForLogger(logger: _Logger): Promise { - const loggerDone = new Promise((resolve) => logger.on("finish", resolve)); - logger.end(); - return await loggerDone; +// This method will check if the AugmentedLogger's isFlushed is set to true. If not, it will block until such time +// that it has been set to true. Note that each blocking transport should implement this isFlushed bool to prevent +// the logger from closing before all logs have been propagated. +export async function waitForLogger(logger: AugmentedLogger) { + while (!logger.isFlushed) await delay(0.5); // While the logger is not flushed, wait for it to be flushed. } // If the log entry contains an error then extract the stack trace as the error message. @@ -67,12 +66,16 @@ function botIdentifyFormatter(botIdentifier: string) { }; } +export interface AugmentedLogger extends _Logger { + isFlushed: boolean; +} + export function createNewLogger( injectedTransports: Transport[] = [], transportsConfig = {}, botIdentifier = process.env.BOT_IDENTIFIER || "NO_BOT_ID" -): _Logger { - return winston.createLogger({ +): AugmentedLogger { + const logger = winston.createLogger({ level: "debug", format: winston.format.combine( winston.format(botIdentifyFormatter(botIdentifier))(), @@ -82,7 +85,9 @@ export function createNewLogger( ), transports: [...createTransports(transportsConfig), ...injectedTransports], exitOnError: !!process.env.EXIT_ON_ERROR, - }); + }) as AugmentedLogger; + logger.isFlushed = true; // The logger should start off in a flushed state of "true". i.e it is ready to be close. + return logger; } -export const Logger = createNewLogger(); +export const Logger: AugmentedLogger = createNewLogger(); diff --git a/packages/monitors/index.js b/packages/monitors/index.js index afff38c1f4..26f24e016c 100755 --- a/packages/monitors/index.js +++ b/packages/monitors/index.js @@ -16,6 +16,7 @@ const { delay, multicallAddressMap, OptimisticOracleType, + waitForLogger, } = require("@uma/financial-templates-lib"); // Monitor modules to report on client state changes. @@ -395,6 +396,7 @@ async function run({ if (pollingDelay === 0) { logger.debug({ at: "Monitor#index", message: "End of serverless execution loop - terminating process" }); await delay(5); // Set a delay to let the transports flush fully. + await waitForLogger(logger); // Blocks exiting until the Discord transport is fully flushed. break; } logger.debug({ at: "Monitor#index", message: "End of execution loop - waiting polling delay" }); diff --git a/packages/monitors/test/OptimisticOracleContractMonitor.js b/packages/monitors/test/OptimisticOracleContractMonitor.js index d5aa73fadf..6fd2bf1413 100644 --- a/packages/monitors/test/OptimisticOracleContractMonitor.js +++ b/packages/monitors/test/OptimisticOracleContractMonitor.js @@ -244,10 +244,6 @@ describe("OptimisticOracleContractMonitor.js", function () { assert.isTrue(lastSpyLogIncludes(spy, `https://etherscan.io/address/${optimisticRequester.options.address}`)); assert.isTrue(lastSpyLogIncludes(spy, `https://etherscan.io/tx/${requestTxn.transactionHash}`)); - console.log( - "TARGET", - `${sampleBaseUIUrl}/request?transactionHash=${requestTxn.transactionHash}&chainId=${contractProps.chainId}` - ); assert.isTrue( lastSpyLogIncludes( spy, diff --git a/packages/monitors/test/index.js b/packages/monitors/test/index.js index 1f3d0a1c2d..fae16097b0 100644 --- a/packages/monitors/test/index.js +++ b/packages/monitors/test/index.js @@ -128,6 +128,7 @@ describe("index.js", function () { level: "debug", transports: [new SpyTransport({ level: "debug" }, { spy: spy })], }); + spyLogger.isFlushed = true; // exit instantly when requested to do so. // Create a new synthetic token syntheticToken = await SyntheticToken.new("Test Synthetic Token", "SYNTH", 18).send({ from: contractCreator }); @@ -283,6 +284,7 @@ describe("index.js", function () { level: "debug", transports: [new SpyTransport({ level: "debug" }, { spy: spy })], }); + spyLogger.isFlushed = true; // exit instantly when requested to do so. collateralToken = await Token.new("USDC", "USDC", 6).send({ from: contractCreator }); syntheticToken = await SyntheticToken.new("Test Synthetic Token", "SYNTH", 6).send({ from: contractCreator }); @@ -324,6 +326,7 @@ describe("index.js", function () { level: "debug", transports: [new SpyTransport({ level: "debug" }, { spy: spy })], }); + spyLogger.isFlushed = true; // exit instantly when requested to do so. await Poll.run({ logger: spyLogger, @@ -354,6 +357,7 @@ describe("index.js", function () { level: "debug", transports: [new SpyTransport({ level: "debug" }, { spy: spy })], }); + spyLogger.isFlushed = true; // exit instantly when requested to do so. let didThrowError = false; let errorString; @@ -409,6 +413,7 @@ describe("index.js", function () { level: "debug", transports: [new SpyTransport({ level: "debug" }, { spy: spy })], }); + spyLogger.isFlushed = true; // exit instantly when requested to do so. errorRetries = 3; // set execution retries to 3 to validate. // Note both the token and medanizer price feeds are the same config. This is done so that createReferencePriceFeedForFinancialContract