Skip to content

Commit

Permalink
handle rate limit for notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
TychoTheTaco committed Oct 22, 2022
1 parent 27d4012 commit 61a987d
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ type ConfigEventMapType = {
"new_drops_campaign"?: { games: "all" | "config" },
"drop_claimed"?: { games: "all" | "config" },
"community_points_earned"?: { reasons: Event_CommunityPointsEarned_ClaimReason[] },
"drop_ready_to_claim"?: { },
"drop_ready_to_claim"?: {},
}

interface DiscordNotifier {
Expand Down
24 changes: 16 additions & 8 deletions src/notifiers/discord.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import axios from "axios";
import axios, {AxiosResponse} from "axios";

import {DropCampaign, getDropBenefitNames, TimeBasedDrop} from "../twitch.js";
import {EventMapType, formatTime, formatTimestamp, Notifier} from "./notifier.js";
import {EventMapType, formatTime, formatTimestamp, RateLimitedNotifier} from "./notifier.js";
import {CommunityPointsUserV1_PointsEarned} from "../web_socket_listener.js";

export class DiscordWebhookSender extends Notifier {
export class DiscordWebhookSender extends RateLimitedNotifier<object> {

readonly #webhookUrl: string;

Expand All @@ -13,6 +13,14 @@ export class DiscordWebhookSender extends Notifier {
this.#webhookUrl = webhookUrl;
}

protected sendNotification(data: object): Promise<AxiosResponse> {
return axios.post(this.#webhookUrl, data);
}

protected getRetryAfterSeconds(response: AxiosResponse): number {
return response.data["retry_after"];
}

async notifyOnNewDropCampaign(campaign: DropCampaign): Promise<void> {
let dropsString = "";
const timeBasedDrops = campaign.timeBasedDrops;
Expand Down Expand Up @@ -47,7 +55,7 @@ export class DiscordWebhookSender extends Notifier {
});
}

await axios.post(this.#webhookUrl, {
await this.post({
embeds: [
{
title: "New Drops Campaign",
Expand All @@ -61,7 +69,7 @@ export class DiscordWebhookSender extends Notifier {
}

async notifyOnDropClaimed(drop: TimeBasedDrop, campaign: DropCampaign): Promise<void> {
await axios.post(this.#webhookUrl, {
await this.post({
embeds: [
{
title: "Drop Claimed",
Expand All @@ -88,7 +96,7 @@ export class DiscordWebhookSender extends Notifier {
}

async notifyOnCommunityPointsEarned(data: CommunityPointsUserV1_PointsEarned, channelLogin: string): Promise<void> {
await axios.post(this.#webhookUrl, {
await this.post({
embeds: [
{
title: "Community Points Earned",
Expand All @@ -115,8 +123,8 @@ export class DiscordWebhookSender extends Notifier {
});
}

async notifyOnDropReadyToClaim(drop:TimeBasedDrop, campaign: DropCampaign): Promise<void> {
await axios.post(this.#webhookUrl, {
async notifyOnDropReadyToClaim(drop: TimeBasedDrop, campaign: DropCampaign): Promise<void> {
await this.post({
embeds: [
{
title: "Drop Ready To Claim",
Expand Down
68 changes: 67 additions & 1 deletion src/notifiers/notifier.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import {DropCampaign, TimeBasedDrop} from "../twitch.js";
import {CommunityPointsUserV1_PointsEarned} from "../web_socket_listener.js";
import axios, {AxiosResponse} from "axios";
import logger from "../logger.js";

export type EventName = "new_drops_campaign" | "drop_claimed" | "community_points_earned" | "drop_ready_to_claim";

Expand Down Expand Up @@ -107,6 +109,70 @@ export abstract class Notifier {

}

export abstract class RateLimitedNotifier<T> extends Notifier {

/**
* Queue of pending requests. If we get rate limited, any new requests will be put into this queue. We will try
* sending them again at {@link #tryAgainTime}.
* @private
*/
#pendingRequests: T[] = [];

/**
* The time that we can try sending requests again.
* @private
*/
#tryAgainTime: Date = new Date();

protected abstract sendNotification(data: T): Promise<AxiosResponse>;

protected abstract getRetryAfterSeconds(response: AxiosResponse): number;

protected async post(data: T) {
const remainingSeconds = (this.#tryAgainTime.getTime() - new Date().getTime()) / 1000;
if (remainingSeconds > 0) {
logger.warn(`Delaying notification due to rate limit! Trying again in ${remainingSeconds} seconds.`);
this.#pendingRequests.push(data);
return;
}

try {
await this.sendNotification(data);
} catch (error) {
if (axios.isAxiosError(error)) {
const response = error.response;
if (response) {
if (response.status === 429) {
const retryAfterSeconds = this.getRetryAfterSeconds(response);
const retryAfterMilliseconds = retryAfterSeconds * 1000;
this.#tryAgainTime = new Date(new Date().getTime() + retryAfterMilliseconds);
logger.warn(`Hit notification rate limit! Delaying notification for ${retryAfterSeconds} seconds.`);
this.#pendingRequests.push(data);
setTimeout(this.#sendPendingRequests.bind(this), retryAfterMilliseconds);
return;
}
}
}
throw error;
}
}

async #sendPendingRequests() {
logger.info(`Sending ${this.#pendingRequests.length} pending notifications...`);
while (this.#pendingRequests.length > 0) {
const data = this.#pendingRequests.shift();
if (data) {
try {
await this.post(data);
} catch (error) {
logger.error("Error sending notification: " + error);
}
}
}
}

}

export function formatTimestamp(timestamp: string) {
return new Date(timestamp).toLocaleString(undefined, {
timeStyle: "short",
Expand All @@ -120,4 +186,4 @@ export function formatTime(minutes: number): string {
return `${hours} hr` + (hours === 1 ? "" : "s");
}
return `${minutes} min` + (minutes === 1 ? "" : "s");
}
}
26 changes: 15 additions & 11 deletions src/notifiers/telegram.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import axios from "axios";
import axios, {AxiosResponse} from "axios";

import {DropCampaign, getDropBenefitNames, TimeBasedDrop} from "../twitch.js";
import {EventMapType, formatTime, formatTimestamp, Notifier} from "./notifier.js";
import {EventMapType, formatTime, formatTimestamp, RateLimitedNotifier} from "./notifier.js";
import {CommunityPointsUserV1_PointsEarned} from "../web_socket_listener.js";

function escapeFormatting(text: string) {
Expand All @@ -15,7 +15,7 @@ function escapeFormatting(text: string) {
return safe;
}

export class TelegramNotifier extends Notifier {
export class TelegramNotifier extends RateLimitedNotifier<string> {

readonly #token: string;
readonly #chatId: string;
Expand All @@ -26,6 +26,14 @@ export class TelegramNotifier extends Notifier {
this.#chatId = chatId;
}

protected sendNotification(data: string): Promise<AxiosResponse> {
return axios.post(`https://api.telegram.org/bot${this.#token}/sendMessage`, {chat_id: this.#chatId, parse_mode: "MarkdownV2", text: data});
}

protected getRetryAfterSeconds(response: AxiosResponse): number {
return response.data["parameters"]["retry_after"];
}

#createMessage(title: string, fields: { name: string, value: any }[]): string {
let message = `__*${title}*__\n\n`;
for (const field of fields) {
Expand All @@ -34,10 +42,6 @@ export class TelegramNotifier extends Notifier {
return message;
}

async #sendMessage(text: string) {
await axios.post(`https://api.telegram.org/bot${this.#token}/sendMessage`, {chat_id: this.#chatId, parse_mode: "MarkdownV2", text: text});
}

async notifyOnNewDropCampaign(campaign: DropCampaign): Promise<void> {
let dropsString = "";
const timeBasedDrops = campaign.timeBasedDrops;
Expand Down Expand Up @@ -73,7 +77,7 @@ export class TelegramNotifier extends Notifier {
}

const message = this.#createMessage("New Drop Campaign", fields);
await this.#sendMessage(message);
await this.post(message);
}

async notifyOnDropClaimed(drop: TimeBasedDrop, campaign: DropCampaign): Promise<void> {
Expand All @@ -92,7 +96,7 @@ export class TelegramNotifier extends Notifier {
}
];
const message = this.#createMessage("Drop Claimed", fields);
await this.#sendMessage(message);
await this.post(message);
}

async notifyOnCommunityPointsEarned(data: CommunityPointsUserV1_PointsEarned, channelLogin: string): Promise<void> {
Expand All @@ -114,11 +118,11 @@ export class TelegramNotifier extends Notifier {
value: escapeFormatting(data.balance.balance.toLocaleString())
}
]);
await this.#sendMessage(message);
await this.post(message);
}

async notifyOnDropReadyToClaim(drop:TimeBasedDrop, campaign: DropCampaign): Promise<void> {
await this.#sendMessage(this.#createMessage("Drop Ready To Claim", [
await this.post(this.#createMessage("Drop Ready To Claim", [
{
name: "Game",
value: escapeFormatting(campaign.game.displayName)
Expand Down

0 comments on commit 61a987d

Please sign in to comment.