Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GQL Subscriptions over WebSockets #3333

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"@fastify/compress": "^8.0.1",
"@fastify/cookie": "^11.0.1",
"@fastify/cors": "^10.0.1",
"@fastify/websocket": "^11.0.1",
"@ffprobe-installer/ffprobe": "^2.1.2",
"@golevelup/nestjs-discovery": "^4.0.0",
"@graphql-hive/yoga": "^0.38.2",
Expand Down Expand Up @@ -78,6 +79,7 @@
"graphql": "^16.9.0",
"graphql-parse-resolve-info": "^4.14.0",
"graphql-scalars": "^1.22.4",
"graphql-ws": "^5.16.0",
"graphql-yoga": "^5.7.0",
"human-format": "^1.2.0",
"image-size": "^1.0.2",
Expand Down Expand Up @@ -158,6 +160,7 @@
"@nestjs/cli/webpack": "npm:empty-npm-package@*",
"@nestjs/cli/typescript": "^5.1.6",
"@nestjs/graphql/graphql-ws": "^5",
"@nestjs/graphql/ws": "^8",
"@nestjs/platform-fastify/@fastify/cors": "^10",
"@nestjs/platform-fastify/@fastify/formbody": "^8",
"@nestjs/platform-fastify/@fastify/middie": "^9",
Expand Down
118 changes: 117 additions & 1 deletion src/core/graphql/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ import {
GqlModuleOptions,
} from '@nestjs/graphql';
import type { RouteOptions as FastifyRoute } from 'fastify';
import type { ExecutionArgs } from 'graphql';
import { makeHandler as makeGqlWSHandler } from 'graphql-ws/lib/use/@fastify/websocket';
import {
createYoga,
type envelop,
YogaServerInstance,
YogaServerOptions,
} from 'graphql-yoga';
import { noop } from 'lodash';
import { AsyncResource } from 'node:async_hooks';
import type { WebSocket } from 'ws';
import { GqlContextType } from '~/common';
import { HttpAdapter, IRequest } from '../http';
import { IResponse } from '../http/types';
Expand Down Expand Up @@ -54,7 +60,14 @@ export class Driver extends AbstractDriver<DriverConfig> {
});

fastify.route({
method: ['GET', 'POST', 'OPTIONS'],
method: 'GET',
url: this.yoga.graphqlEndpoint,
handler: this.httpHandler,
// Allow this same path to handle websocket upgrade requests.
wsHandler: this.makeWsHandler(options),
});
fastify.route({
method: ['POST', 'OPTIONS'],
url: this.yoga.graphqlEndpoint,
handler: this.httpHandler,
});
Expand All @@ -76,6 +89,109 @@ export class Driver extends AbstractDriver<DriverConfig> {
.send(res.body);
};

/**
* This code ties fastify, yoga, and graphql-ws together.
* Execution layers in order:
* 1. fastify route (http path matching)
* 2. fastify's websocket plugin (http upgrade request & websocket open/close)
* This allows our fastify hooks to be executed.
* And provides a consistent Fastify `Request` type,
* instead of a raw `IncomingMessage`.
* 3. `graphql-ws`'s fastify handler (adapts #2 to graphql-ws)
* 4. `graphql-ws` (handles specific gql protocol over websockets)
* 5. `graphql-yoga` is unwrapped to `envelop`.
* Yoga just wraps `envelop` and handles more of the http layer.
* We really just reference `envelop` hooks with our "Yoga Plugins".
* So this allows our "yoga" plugins to be executed.
*/
private makeWsHandler(options: DriverConfig) {
const asyncContextBySocket = new WeakMap<WebSocket, AsyncResource>();
interface WsExecutionArgs extends ExecutionArgs {
socket: WebSocket;
envelop: ReturnType<ReturnType<typeof envelop>>;
}

// The graphql-ws handler which accepts the fastify websocket/request and
// orchestrates the subscription setup & execution.
// This forwards to yoga/envelop.
// This was adapted from yoga's graphql-ws example.
// https://github.com/dotansimha/graphql-yoga/tree/main/examples/graphql-ws
const fastifyWsHandler = makeGqlWSHandler<
Record<string, unknown>,
{ connection: { socket: WebSocket }; request: IRequest }
>({
schema: options.schema!,
// Custom execute/subscribe functions that really just defer to a
// unique envelop (yoga) instance per request.
execute: (wsArgs) => {
const { envelop, socket, ...args } = wsArgs as WsExecutionArgs;
return asyncContextBySocket.get(socket)!.runInAsyncScope(() => {
return envelop.execute(args);
});
},
subscribe: (wsArgs) => {
const { envelop, socket, ...args } = wsArgs as WsExecutionArgs;
// Because this is called via socket.onmessage, we don't have
// the same async context we started with.
// Grab and resume it.
return asyncContextBySocket.get(socket)!.runInAsyncScope(() => {
return envelop.subscribe(args);
});
},
// Create a unique envelop/yoga instance for each subscription.
// This allows "yoga" plugins that are really just envelop hooks
// to be executed.
onSubscribe: async (ctx, { payload }) => {
const {
extra: {
request,
connection: { socket },
},
} = ctx;
const envelop = this.yoga.getEnveloped({
req: request,
socket,
params: payload, // Same(ish?) shape as YogaInitialContext.params
});

const args: WsExecutionArgs = {
schema: envelop.schema,
operationName: payload.operationName,
document: envelop.parse(payload.query),
variableValues: payload.variables,
contextValue: await envelop.contextFactory(),
// These are needed in our execute()/subscribe() declared above.
// Public examples put these functions in the context, but I don't
// like exposing that implementation detail to the rest of the app.
envelop,
socket,
};

const errors = envelop.validate(args.schema, args.document);
if (errors.length) {
return errors;
}
return args;
},
});

const wsHandler: FastifyRoute['wsHandler'] = function (socket, req) {
// graphql-ws' fastify adapter still uses v9 of @fastify/websocket
// which has the socket wrapped in a SocketStream.
// v10+ removed this default wrapping.
// This adapts the socket for the usage of the adapter.
// The adapter's types are incorrect; they're using our v11 types,
// but their source is developed with v9.
// They just unwrap the socket out, and add a once('error')
// which we can ignore since they add the same handler to socket as well.
const connection = { socket, once: noop } as any;
// Save a reference to the current async context, so we can resume it.
asyncContextBySocket.set(socket, new AsyncResource('graphql-ws'));
return fastifyWsHandler.call(this, connection, req);
};
return wsHandler;
}

async stop() {
// noop
}
Expand Down
11 changes: 11 additions & 0 deletions src/core/graphql/gql-context.host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ export class GqlContextHostImpl implements GqlContextHost, OnModuleDestroy {
});
};

onSubscribe: Plugin['onSubscribe'] = ({
subscribeFn,
setSubscribeFn,
args,
}) => {
const ctx = args.contextValue;
setSubscribeFn((...args) => {
return this.als.run(ctx, subscribeFn, ...args);
});
};

onModuleDestroy() {
this.als.disable();
}
Expand Down
7 changes: 7 additions & 0 deletions src/core/graphql/graphql-error-formatter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ export class GraphqlErrorFormatter {
}),
});

onSubscribe: Plugin['onSubscribe'] = () => ({
onSubscribeError: ({ error, setError }) => {
const formatted = this.formatError(error);
setError(formatted);
},
});

formatError = (error: unknown) => {
if (!(error instanceof GraphQLError)) {
// I don't think this happens.
Expand Down
7 changes: 7 additions & 0 deletions src/core/graphql/graphql-session.plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,11 @@ export class GraphqlSessionPlugin {
args.contextValue.session$.complete();
},
});
onSubscribe: Plugin['onSubscribe'] = () => ({
onSubscribeResult: ({ args }) => ({
onEnd: () => {
args.contextValue.session$.complete();
},
}),
});
}
3 changes: 3 additions & 0 deletions src/core/http/http.adapter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import compression from '@fastify/compress';
import cookieParser from '@fastify/cookie';
import cors from '@fastify/cors';
import websocket from '@fastify/websocket';
import { DiscoveryService } from '@golevelup/nestjs-discovery';
import {
VERSION_NEUTRAL,
Expand Down Expand Up @@ -109,6 +110,8 @@ export class HttpAdapter extends PatchedFastifyAdapter {
// Only on routes we've decorated.
await app.register(rawBody, { global: false });

await app.register(websocket);

app.setGlobalPrefix(config.hostUrl$.value.pathname.slice(1));

config.applyTimeouts(app.getHttpServer(), config.httpTimeouts);
Expand Down
34 changes: 16 additions & 18 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,17 @@ __metadata:
languageName: node
linkType: hard

"@fastify/websocket@npm:^11.0.1":
version: 11.0.1
resolution: "@fastify/websocket@npm:11.0.1"
dependencies:
duplexify: "npm:^4.1.3"
fastify-plugin: "npm:^5.0.0"
ws: "npm:^8.16.0"
checksum: 10c0/4b5fd27596b6978b87a78f3d0afef9f42e4c6690c43ac28f1b49c61206305216765c6d81c21f089f4f5421a98fd4faae2f5fc49b64783481da5b638e10151b22
languageName: node
linkType: hard

"@ffprobe-installer/darwin-arm64@npm:5.0.1":
version: 5.0.1
resolution: "@ffprobe-installer/darwin-arm64@npm:5.0.1"
Expand Down Expand Up @@ -5645,6 +5656,7 @@ __metadata:
"@fastify/compress": "npm:^8.0.1"
"@fastify/cookie": "npm:^11.0.1"
"@fastify/cors": "npm:^10.0.1"
"@fastify/websocket": "npm:^11.0.1"
"@ffprobe-installer/ffprobe": "npm:^2.1.2"
"@golevelup/nestjs-discovery": "npm:^4.0.0"
"@graphql-hive/cli": "npm:^0.44.2"
Expand Down Expand Up @@ -5710,6 +5722,7 @@ __metadata:
graphql: "npm:^16.9.0"
graphql-parse-resolve-info: "npm:^4.14.0"
graphql-scalars: "npm:^1.22.4"
graphql-ws: "npm:^5.16.0"
graphql-yoga: "npm:^5.7.0"
human-format: "npm:^1.2.0"
husky: "npm:^4.3.8"
Expand Down Expand Up @@ -6269,7 +6282,7 @@ __metadata:
languageName: node
linkType: hard

"duplexify@npm:^4.1.1":
"duplexify@npm:^4.1.1, duplexify@npm:^4.1.3":
version: 4.1.3
resolution: "duplexify@npm:4.1.3"
dependencies:
Expand Down Expand Up @@ -7816,7 +7829,7 @@ __metadata:
languageName: node
linkType: hard

"graphql-ws@npm:^5, graphql-ws@npm:^5.14.0":
"graphql-ws@npm:^5, graphql-ws@npm:^5.14.0, graphql-ws@npm:^5.16.0":
version: 5.16.0
resolution: "graphql-ws@npm:5.16.0"
peerDependencies:
Expand Down Expand Up @@ -13913,22 +13926,7 @@ __metadata:
languageName: node
linkType: hard

"ws@npm:8.16.0":
version: 8.16.0
resolution: "ws@npm:8.16.0"
peerDependencies:
bufferutil: ^4.0.1
utf-8-validate: ">=5.0.2"
peerDependenciesMeta:
bufferutil:
optional: true
utf-8-validate:
optional: true
checksum: 10c0/a7783bb421c648b1e622b423409cb2a58ac5839521d2f689e84bc9dc41d59379c692dd405b15a997ea1d4c0c2e5314ad707332d0c558f15232d2bc07c0b4618a
languageName: node
linkType: hard

"ws@npm:^8.17.1":
"ws@npm:^8, ws@npm:^8.16.0, ws@npm:^8.17.1":
version: 8.18.0
resolution: "ws@npm:8.18.0"
peerDependencies:
Expand Down
Loading