Skip to content

Commit

Permalink
fix: Address Discord transport rate limiting issues (UMAprotocol#3858)
Browse files Browse the repository at this point in the history
* fix: Address Discord transport rate limiting issues

Signed-off-by: chrismaree <[email protected]>

* nit

Signed-off-by: chrismaree <[email protected]>

* nit

Signed-off-by: chrismaree <[email protected]>

* nit

Signed-off-by: chrismaree <[email protected]>

* nit

Signed-off-by: chrismaree <[email protected]>

* nit

Signed-off-by: chrismaree <[email protected]>
  • Loading branch information
chrismaree authored Mar 24, 2022
1 parent 02ea4c4 commit dc4cf55
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 20 deletions.
1 change: 1 addition & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ module.exports = {
rules: {
"@typescript-eslint/no-var-requires": 0,
"@typescript-eslint/no-unused-vars": ["error", { ignoreRestSiblings: true }],
"@typescript-eslint/ban-ts-comment": 0,
},
},
],
Expand Down
62 changes: 57 additions & 5 deletions packages/financial-templates-lib/src/logger/DiscordTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,24 @@ import axios from "axios";

type TransportOptions = ConstructorParameters<typeof Transport>[0];

interface Body {
username: string;
avatar_url: string;
embeds: { title: string; description: string; color: number }[];
}
interface QueueElement {
body: Body;
webHook: string;
}

export class DiscordTransport extends Transport {
private readonly defaultWebHookUrl: string | string[];
private readonly escalationPathWebhookUrls: { [key: string]: string | string[] };
private readonly postOnNonEscalationPaths: boolean;

private logQueue: QueueElement[];
private isQueueBeingExecuted = false;

constructor(
winstonOpts: TransportOptions,
ops: {
Expand All @@ -22,6 +36,8 @@ export class DiscordTransport extends Transport {
this.defaultWebHookUrl = ops.defaultWebHookUrl;
this.escalationPathWebhookUrls = ops.escalationPathWebhookUrls ?? {};
this.postOnNonEscalationPaths = ops.postOnNonEscalationPaths ?? true;

this.logQueue = [];
}

// Note: info must be any because that's what the base class uses.
Expand Down Expand Up @@ -59,18 +75,54 @@ export class DiscordTransport extends Transport {

// Send webhook request to each of the configured webhooks upstream. This posts the messages on Discord. Execute
// these sequentially with a soft delay of 2 seconds between calls to avoid hitting the discord rate limit.
if (webHooks.length)
for (const webHook of webHooks) {
await axios.post(webHook, body);
await delay(2);
}
if (webHooks.length) for (const webHook of webHooks) this.logQueue.push({ webHook, body });

await this.executeLogQueue(); // Start processing the log que.
} catch (error) {
console.error("Discord error", error);
}

callback();
}

// Processes a queue of logs produced by the transport. Executes sequentially and listens to the response from the
// Discord API to back off and sleep if we are exceeding their rate limiting. Sets the parent transports isFlushed
// variable to block the bot from closing until the whole queue has been flushed.
async executeLogQueue(backOffDuration = 0): Promise<void> {
if (this.isQueueBeingExecuted) return; // If the queue is currently being executed, return.
this.isQueueBeingExecuted = true; // Set the queue to being executed.
// Set the parent isFlushed to false to prevent the logger from closing while the queue is being executed. Note this
// is separate variable from the isQueueBeingExecuted flag as this isFlushed is global for the logger instance.
(this as any).parent.isFlushed = false;

// If the previous iteration set a backOffDuration then wait for this duration.
if (backOffDuration != 0) await delay(backOffDuration);

while (this.logQueue.length) {
try {
// Pop off the first element (oldest) and try send it to discord. If this errors then we are being rate limited.
await axios.post(this.logQueue[0].webHook, this.logQueue[0].body);
this.logQueue.shift(); // If the request does not fail remove it from the log queue as having been executed.
} catch (error: any) {
// Extract the retry_after from the response. This is the Discord API telling us how long to back off for.
let _backOffDuration = error?.response?.data.retry_after;
// If they tell us to back off for more than a minute then ignore it and cap at 1 min. This is enough time in
// practice to recover from a rate limit while not making the bot hang indefinitely.
if (_backOffDuration > 60) _backOffDuration = 60;
// We removed the element in the shift above, push it back on to the start of the queue to not drop any message.
// As we have errored we now need to re-enter the executeLogQuery method. Set isQueueBeingExecuted to false and
// re-call the executeLogQuery. This will initiate the backoff delay and then continue to process the queue.
this.isQueueBeingExecuted = false;
await this.executeLogQueue(_backOffDuration);
}
}

// Only if we get to the end of the function (no errors) then we can set isQueueBeingExecuted to false and can
// set the parent isFlushed to true, enabling the bot to close out.
this.isQueueBeingExecuted = false;
(this as any).parent.isFlushed = true;
}

// Discord URLS are formatted differently to markdown links produced upstream in the bots. For example, slack links
// will look like this <https://google.com|google.com> but links for discord should look like this
// [google.com](https://google.com).This function takes in the one format and converts it to the other such that
Expand Down
27 changes: 16 additions & 11 deletions packages/financial-templates-lib/src/logger/Logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ import winston from "winston";
import type { Logger as _Logger, LogEntry } from "winston";
import type * as Transport from "winston-transport";
import { createTransports } from "./Transports";
import { delay } from "../helpers/delay";

// This async function can be called by a bot if the log message is generated right before the process terminates.
// By calling `await waitForLogger(Logger)`, with the local Logger instance, the process will wait for all upstream
// transports to clear. This enables slower transports like slack to still send their messages before the process yields.
// Note: typescript infers the return type to be unknown. This is fine, as the return type should be void and unused.
export async function waitForLogger(logger: _Logger): Promise<unknown> {
const loggerDone = new Promise((resolve) => logger.on("finish", resolve));
logger.end();
return await loggerDone;
// This method will check if the AugmentedLogger's isFlushed is set to true. If not, it will block until such time
// that it has been set to true. Note that each blocking transport should implement this isFlushed bool to prevent
// the logger from closing before all logs have been propagated.
export async function waitForLogger(logger: AugmentedLogger) {
while (!logger.isFlushed) await delay(0.5); // While the logger is not flushed, wait for it to be flushed.
}

// If the log entry contains an error then extract the stack trace as the error message.
Expand Down Expand Up @@ -67,12 +66,16 @@ function botIdentifyFormatter(botIdentifier: string) {
};
}

export interface AugmentedLogger extends _Logger {
isFlushed: boolean;
}

export function createNewLogger(
injectedTransports: Transport[] = [],
transportsConfig = {},
botIdentifier = process.env.BOT_IDENTIFIER || "NO_BOT_ID"
): _Logger {
return winston.createLogger({
): AugmentedLogger {
const logger = winston.createLogger({
level: "debug",
format: winston.format.combine(
winston.format(botIdentifyFormatter(botIdentifier))(),
Expand All @@ -82,7 +85,9 @@ export function createNewLogger(
),
transports: [...createTransports(transportsConfig), ...injectedTransports],
exitOnError: !!process.env.EXIT_ON_ERROR,
});
}) as AugmentedLogger;
logger.isFlushed = true; // The logger should start off in a flushed state of "true". i.e it is ready to be close.
return logger;
}

export const Logger = createNewLogger();
export const Logger: AugmentedLogger = createNewLogger();
2 changes: 2 additions & 0 deletions packages/monitors/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const {
delay,
multicallAddressMap,
OptimisticOracleType,
waitForLogger,
} = require("@uma/financial-templates-lib");

// Monitor modules to report on client state changes.
Expand Down Expand Up @@ -395,6 +396,7 @@ async function run({
if (pollingDelay === 0) {
logger.debug({ at: "Monitor#index", message: "End of serverless execution loop - terminating process" });
await delay(5); // Set a delay to let the transports flush fully.
await waitForLogger(logger); // Blocks exiting until the Discord transport is fully flushed.
break;
}
logger.debug({ at: "Monitor#index", message: "End of execution loop - waiting polling delay" });
Expand Down
4 changes: 0 additions & 4 deletions packages/monitors/test/OptimisticOracleContractMonitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,6 @@ describe("OptimisticOracleContractMonitor.js", function () {
assert.isTrue(lastSpyLogIncludes(spy, `https://etherscan.io/address/${optimisticRequester.options.address}`));
assert.isTrue(lastSpyLogIncludes(spy, `https://etherscan.io/tx/${requestTxn.transactionHash}`));

console.log(
"TARGET",
`${sampleBaseUIUrl}/request?transactionHash=${requestTxn.transactionHash}&chainId=${contractProps.chainId}`
);
assert.isTrue(
lastSpyLogIncludes(
spy,
Expand Down
5 changes: 5 additions & 0 deletions packages/monitors/test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ describe("index.js", function () {
level: "debug",
transports: [new SpyTransport({ level: "debug" }, { spy: spy })],
});
spyLogger.isFlushed = true; // exit instantly when requested to do so.

// Create a new synthetic token
syntheticToken = await SyntheticToken.new("Test Synthetic Token", "SYNTH", 18).send({ from: contractCreator });
Expand Down Expand Up @@ -283,6 +284,7 @@ describe("index.js", function () {
level: "debug",
transports: [new SpyTransport({ level: "debug" }, { spy: spy })],
});
spyLogger.isFlushed = true; // exit instantly when requested to do so.

collateralToken = await Token.new("USDC", "USDC", 6).send({ from: contractCreator });
syntheticToken = await SyntheticToken.new("Test Synthetic Token", "SYNTH", 6).send({ from: contractCreator });
Expand Down Expand Up @@ -324,6 +326,7 @@ describe("index.js", function () {
level: "debug",
transports: [new SpyTransport({ level: "debug" }, { spy: spy })],
});
spyLogger.isFlushed = true; // exit instantly when requested to do so.

await Poll.run({
logger: spyLogger,
Expand Down Expand Up @@ -354,6 +357,7 @@ describe("index.js", function () {
level: "debug",
transports: [new SpyTransport({ level: "debug" }, { spy: spy })],
});
spyLogger.isFlushed = true; // exit instantly when requested to do so.

let didThrowError = false;
let errorString;
Expand Down Expand Up @@ -409,6 +413,7 @@ describe("index.js", function () {
level: "debug",
transports: [new SpyTransport({ level: "debug" }, { spy: spy })],
});
spyLogger.isFlushed = true; // exit instantly when requested to do so.

errorRetries = 3; // set execution retries to 3 to validate.
// Note both the token and medanizer price feeds are the same config. This is done so that createReferencePriceFeedForFinancialContract
Expand Down

0 comments on commit dc4cf55

Please sign in to comment.