Skip to content

Commit

Permalink
feat: Switch to AMQP (#470)
Browse files Browse the repository at this point in the history
* feat: Switch to AMQP

* Only 2 queues

* Remove useless file
  • Loading branch information
amaury1093 authored Dec 10, 2023
1 parent e902216 commit b1bf8c2
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 64 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"@sentry/nextjs": "^7.86.0",
"@stripe/stripe-js": "^1.52.0",
"@supabase/supabase-js": "^1.35.7",
"@types/amqplib": "^0.10.4",
"@types/cors": "^2.8.17",
"@types/mailgun-js": "^0.22.18",
"@types/markdown-pdf": "^9.0.5",
Expand All @@ -30,6 +31,7 @@
"@types/react-dom": "^18.2.17",
"@types/request-ip": "^0.0.41",
"@types/uuid": "^9.0.7",
"amqplib": "^0.10.3",
"axios": "^1.6.2",
"cors": "^2.8.5",
"date-fns": "^2.30.0",
Expand Down
2 changes: 1 addition & 1 deletion src/components/Footer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export function Footer(): React.ReactElement {
</Link>
</Text>
<a
href="https://vercel.com?utm_source=devsincrypto&utm_campaign=oss"
href="https://vercel.com?utm_source=reacherhq&utm_campaign=oss"
target="_blank"
rel="noopener noreferrer"
>
Expand Down
11 changes: 4 additions & 7 deletions src/pages/api/calls/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface WebhookExtra {
}

export interface WebhookPayload {
data: CheckEmailOutput;
output: CheckEmailOutput;
extra: WebhookExtra;
}

Expand All @@ -25,15 +25,12 @@ const POST = async (
return;
}

if (
req.headers["x-orchestrator-secret"] !==
process.env.RCH_ORCHESTRATOR_SECRET
) {
res.status(403).json({ error: "Invalid orchestrator secret" });
if (req.headers["x-reacher-secret"] !== process.env.RCH_HEADER_SECRET) {
res.status(403).json({ error: "Invalid header secret" });
return;
}

const { extra, data: output } = req.body as WebhookPayload;
const { extra, output } = req.body as WebhookPayload;

// Add to supabase
const response = await supabaseAdmin.from<SupabaseCall>("calls").insert({
Expand Down
78 changes: 65 additions & 13 deletions src/pages/api/v0/check_email.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import type { CheckEmailInput, CheckEmailOutput } from "@reacherhq/api";
import { PostgrestError } from "@supabase/supabase-js";
import axios, { AxiosError } from "axios";
import { NextApiRequest, NextApiResponse } from "next";
import { v4 } from "uuid";
import amqplib from "amqplib";
import dns from "dns/promises";

import { checkUserInDB, cors } from "@/util/api";
import { convertAxiosError, getWebappURL } from "@/util/helpers";
import { getWebappURL } from "@/util/helpers";
import { updateSendinblue } from "@/util/sendinblue";
import { sentryException } from "@/util/sentry";
import { SupabaseCall } from "@/util/supabaseClient";
import { supabaseAdmin } from "@/util/supabaseServer";
import { WebhookExtra } from "../calls/webhook";

const ORCHESTRATOR_URL = process.env.RCH_ORCHESTRATOR_URL as string;
const TIMEOUT = 60000;
const MAX_PRIORITY = 5; // Higher is faster, 5 is max.

const POST = async (
req: NextApiRequest,
Expand All @@ -35,10 +36,30 @@ const POST = async (

try {
const verificationId = v4();
try {
await axios.post(
`${ORCHESTRATOR_URL}/v0/check_email`,
{

const conn = await amqplib.connect(
process.env.RCH_AMQP_ADDR || "amqp://localhost"
);
const ch1 = await conn.createChannel();
const verifMethod = await getVerifMethod(req.body as CheckEmailInput);
const queueName = `check_email.${
// If the verifMethod is "Api", we use the "Headless" queue instead,
// because the same workers that handle the "Headless" queue also
// handle the "Api" queue.
//
// In this case, we leave the "Smtp" workers only with one task:
// Smtp. Hopefully this will make it easier to maintain their IP
// reputation.
verifMethod === "Api" ? "Headless" : verifMethod
}`;
await ch1.assertQueue(queueName, {
maxPriority: MAX_PRIORITY,
});

ch1.sendToQueue(
queueName,
Buffer.from(
JSON.stringify({
input: req.body as CheckEmailInput,
webhook: {
url: `${getWebappURL()}/api/calls/webhook`,
Expand All @@ -48,12 +69,14 @@ const POST = async (
verificationId: verificationId,
} as WebhookExtra,
},
},
{}
);
} catch (err) {
throw convertAxiosError(err as AxiosError);
}
})
),
{
priority: MAX_PRIORITY,
}
);

await ch1.close();

// Poll the database to make sure the call was added.
let checkEmailOutput: CheckEmailOutput | undefined;
Expand Down Expand Up @@ -108,3 +131,32 @@ const POST = async (
};

export default POST;

// getVerifMethod returns the verifMethod that is best used to verify the
// input's email address.
async function getVerifMethod(input: CheckEmailInput): Promise<string> {
const domain = input.to_email.split("@")[1];
if (!domain) {
return "Smtp";
}

const records = await dns.resolveMx(domain);
if (
input.yahoo_verif_method !== "Smtp" &&
records.some((r) => r.exchange.endsWith(".yahoodns.net")) // Note: there's no "." at the end of the domain.
) {
return "Headless";
} else if (
input.hotmail_verif_method !== "Smtp" &&
records.some((r) => r.exchange.endsWith(".protection.outlook.com")) // Note: there's no "." at the end of the domain.
) {
return "Headless";
} else if (
input.gmail_verif_method !== "Smtp" &&
records.some((r) => r.exchange.endsWith(".google.com")) // Note: there's no "." at the end of the domain.
) {
return "Api";
} else {
return "Smtp";
}
}
41 changes: 0 additions & 41 deletions src/util/backend.ts

This file was deleted.

53 changes: 51 additions & 2 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
# yarn lockfile v1


"@acuminous/bitsyntax@^0.1.2":
version "0.1.2"
resolved "https://registry.yarnpkg.com/@acuminous/bitsyntax/-/bitsyntax-0.1.2.tgz#e0b31b9ee7ad1e4dd840c34864327c33d9f1f653"
integrity sha512-29lUK80d1muEQqiUsSo+3A0yP6CdspgC95EnKBMi22Xlwt79i/En4Vr67+cXhU+cZjbti3TgGGC5wy1stIywVQ==
dependencies:
buffer-more-ints "~1.0.0"
debug "^4.3.4"
safe-buffer "~5.1.2"

"@ampproject/remapping@^2.2.0":
version "2.2.0"
resolved "https://registry.yarnpkg.com/@ampproject/remapping/-/remapping-2.2.0.tgz#56c133824780de3174aed5ab6834f3026790154d"
Expand Down Expand Up @@ -1944,6 +1953,13 @@
dependencies:
"@types/estree" "*"

"@types/amqplib@^0.10.4":
version "0.10.4"
resolved "https://registry.yarnpkg.com/@types/amqplib/-/amqplib-0.10.4.tgz#0bf1ceefe280c02502b209fa9f06394b0c4cd688"
integrity sha512-Y5Sqquh/LqDxSgxYaAAFNM0M7GyONtSDCcFMJk+DQwYEjibPyW6y+Yu9H9omdkKc3epyXULmFN3GTaeBHhn2Hg==
dependencies:
"@types/node" "*"

"@types/async@^3.2.16":
version "3.2.18"
resolved "https://registry.yarnpkg.com/@types/async/-/async-3.2.18.tgz#3d93dde6eab654f7bc23e549d9af5d3fa4a5bdc5"
Expand Down Expand Up @@ -2516,6 +2532,16 @@ ajv@^6.0.0, ajv@^6.12.3:
json-schema-traverse "^0.4.1"
uri-js "^4.2.2"

amqplib@^0.10.3:
version "0.10.3"
resolved "https://registry.yarnpkg.com/amqplib/-/amqplib-0.10.3.tgz#e186a2f74521eb55ec54db6d25ae82c29c1f911a"
integrity sha512-UHmuSa7n8vVW/a5HGh2nFPqAEr8+cD4dEZ6u9GjP91nHfr1a54RyAKyra7Sb5NH7NBKOUlyQSMXIp0qAixKexw==
dependencies:
"@acuminous/bitsyntax" "^0.1.2"
buffer-more-ints "~1.0.0"
readable-stream "1.x >=1.1.9"
url-parse "~1.5.10"

ansi-escapes@^4.2.1:
version "4.3.2"
resolved "https://registry.yarnpkg.com/ansi-escapes/-/ansi-escapes-4.3.2.tgz#6b2291d1db7d98b6521d5f1efa42d0f3a9feb65e"
Expand Down Expand Up @@ -2934,6 +2960,11 @@ buffer-from@^1.0.0:
resolved "https://registry.yarnpkg.com/buffer-from/-/buffer-from-1.1.1.tgz#32713bc028f75c02fdb710d7c7bcec1f2c6070ef"
integrity sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==

buffer-more-ints@~1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz#ef4f8e2dddbad429ed3828a9c55d44f05c611422"
integrity sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==

buffer@^5.5.0:
version "5.7.1"
resolved "https://registry.yarnpkg.com/buffer/-/buffer-5.7.1.tgz#ba62e7c13133053582197160851a8f648e99eed0"
Expand Down Expand Up @@ -7294,6 +7325,11 @@ qs@~6.5.2:
resolved "https://registry.yarnpkg.com/qs/-/qs-6.5.3.tgz#3aeeffc91967ef6e35c0e488ef46fb296ab76aad"
integrity sha512-qxXIEh4pCGfHICj1mAJQ2/2XVZkjCDTcEgfoSQxc/fYivUZxTkk7L3bDBJSoNrEzXI17oUO5Dp07ktqE5KzczA==

querystringify@^2.1.1:
version "2.2.0"
resolved "https://registry.yarnpkg.com/querystringify/-/querystringify-2.2.0.tgz#3345941b4153cb9d082d8eee4cda2016a9aef7f6"
integrity sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==

quick-lru@^5.1.1:
version "5.1.1"
resolved "https://registry.yarnpkg.com/quick-lru/-/quick-lru-5.1.1.tgz#366493e6b3e42a3a6885e2e99d18f80fb7a8c932"
Expand Down Expand Up @@ -7359,7 +7395,7 @@ read-cmd-shim@^4.0.0:
resolved "https://registry.yarnpkg.com/read-cmd-shim/-/read-cmd-shim-4.0.0.tgz#640a08b473a49043e394ae0c7a34dd822c73b9bb"
integrity sha512-yILWifhaSEEytfXI76kB9xEEiG1AiozaCJZ83A87ytjRiN+jVibXjedjCRNjoZviinhG+4UkalO3mWTd8u5O0Q==

[email protected]:
[email protected], "[email protected] >=1.1.9":
version "1.1.14"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-1.1.14.tgz#7cf4c54ef648e3813084c636dd2079e166c081d9"
integrity sha1-fPTFTvZI44EwhMY23SB54WbAgdk=
Expand Down Expand Up @@ -7582,6 +7618,11 @@ require-from-string@^2.0.2:
resolved "https://registry.yarnpkg.com/require-like/-/require-like-0.1.2.tgz#ad6f30c13becd797010c468afa775c0c0a6b47fa"
integrity sha512-oyrU88skkMtDdauHDuKVrgR+zuItqr6/c//FXzvmxRGMexSDc6hNvJInGW3LL46n+8b50RykrvwSUIIQH2LQ5A==

requires-port@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/requires-port/-/requires-port-1.0.0.tgz#925d2601d39ac485e091cf0da5c6e694dc3dcaff"
integrity sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==

resolve-alpn@^1.0.0:
version "1.2.1"
resolved "https://registry.yarnpkg.com/resolve-alpn/-/resolve-alpn-1.2.1.tgz#b7adbdac3546aaaec20b45e7d8265927072726f9"
Expand Down Expand Up @@ -7726,7 +7767,7 @@ [email protected], safe-buffer@^5.0.1, safe-buffer@^5.1.2, safe-buffer@~5.2.0:
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.2.1.tgz#1eaf9fa9bdb1fdd4ec75f58f9cdb4e6b7827eec6"
integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==

safe-buffer@~5.1.0, safe-buffer@~5.1.1:
safe-buffer@~5.1.0, safe-buffer@~5.1.1, safe-buffer@~5.1.2:
version "5.1.2"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d"
integrity sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==
Expand Down Expand Up @@ -8728,6 +8769,14 @@ uri-js@^4.2.2:
dependencies:
punycode "^2.1.0"

url-parse@~1.5.10:
version "1.5.10"
resolved "https://registry.yarnpkg.com/url-parse/-/url-parse-1.5.10.tgz#9d3c2f736c1d75dd3bd2be507dcc111f1e2ea9c1"
integrity sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==
dependencies:
querystringify "^2.1.1"
requires-port "^1.0.0"

utf-8-validate@^5.0.2:
version "5.0.5"
resolved "https://registry.yarnpkg.com/utf-8-validate/-/utf-8-validate-5.0.5.tgz#dd32c2e82c72002dc9f02eb67ba6761f43456ca1"
Expand Down

1 comment on commit b1bf8c2

@vercel
Copy link

@vercel vercel bot commented on b1bf8c2 Dec 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.