Skip to content

Commit

Permalink
Drop support for native Node Readable stream: require manual conversi…
Browse files Browse the repository at this point in the history
…on to Node Web Streams

Utils to convert from and to Web Streams in Node are available from v17,
see https://nodejs.org/api/stream.html#streamreadabletowebstreamreadable-options .

Previously, we automatically converted between Node native streams and custom, Web-like Readable streams.
This led to occasional issues.
  • Loading branch information
larabr committed Jan 15, 2024
1 parent 84b9315 commit 2097f90
Show file tree
Hide file tree
Showing 14 changed files with 47 additions and 130 deletions.
13 changes: 4 additions & 9 deletions lib/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
/// <reference lib="dom" />

import { ReadableStream as NodeWebReadableStream } from 'node:stream/web';
type Data = Uint8Array | string;

export type WebStream<T extends Data> = ReadableStream<T>;

interface NodeStream<T extends Data> extends AsyncIterable<T> { // copied+simplified version of ReadableStream from @types/node/index.d.ts, which does no support generics
readable: boolean; pipe: Function; unpipe: Function; wrap: Function; setEncoding(encoding: string): this; pause(): this; resume(): this;
isPaused(): boolean; unshift(chunk: string | Uint8Array): void;
read(size?: number): T;
}
export type WebStream<T extends Data> = ReadableStream<T>
export type NodeWebStream<T extends Data> = NodeWebReadableStream<T>;

type Stream<T extends Data> = WebStream<T> | NodeStream<T>;
type Stream<T extends Data> = WebStream<T> | NodeWebStream<T>;
type MaybeStream<T extends Data> = T | Stream<T>;

export function readToEnd<T extends Data, R extends any = T>(input: MaybeStream<T>, join?: (chunks: T[]) => R): Promise<R>;
2 changes: 0 additions & 2 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
export * from './streams.js';

export { webToNode, nodeToWeb } from './node-conversions/index.js';

export { isStream, isArrayStream, isUint8Array, concatUint8Array } from './util.js';
2 changes: 0 additions & 2 deletions lib/node-conversions/browser-fallback.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
export const NodeReadableStream = null;
export const NodeBuffer = null;
export const webToNode = null;
export const nodeToWeb = null;
2 changes: 1 addition & 1 deletion lib/node-conversions/index.js
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export { webToNode, nodeToWeb, NodeBuffer, NodeReadableStream } from './node.js';
export { NodeBuffer, NodeReadableStream } from './node.js';
86 changes: 0 additions & 86 deletions lib/node-conversions/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,89 +2,3 @@
import { Buffer as NodeBuffer } from 'buffer';
import { Readable as NodeReadableStream } from 'stream';
export { NodeBuffer, NodeReadableStream };

import { getReader } from '../streams.js';

/**
* Web / node stream conversion functions
* From https://github.com/gwicke/node-web-streams
*/

/**
* Convert a Node Readable Stream to a Web ReadableStream
* @param {Readable} nodeStream
* @returns {ReadableStream}
*/
export function nodeToWeb(nodeStream) {
let canceled = false;
return new ReadableStream({
start(controller) {
nodeStream.pause();
nodeStream.on('data', chunk => {
if (canceled) {
return;
}
if (NodeBuffer.isBuffer(chunk)) {
chunk = new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength);
}
controller.enqueue(chunk);
nodeStream.pause();
});
nodeStream.on('end', () => {
if (canceled) {
return;
}
controller.close();
});
nodeStream.on('error', e => controller.error(e));
},
pull() {
nodeStream.resume();
},
cancel(reason) {
canceled = true;
nodeStream.destroy(reason);
}
});
}


class NodeReadable extends NodeReadableStream {
constructor(webStream, options) {
super(options);
this._reader = getReader(webStream);
}

async _read(size) {
try {
// eslint-disable-next-line no-constant-condition
while (true) {
const { done, value } = await this._reader.read()
if (done) {
this.push(null);
break;
}
if (!this.push(value) || this._cancelling) {
this._reading = false;
break;
}
}
} catch(e) {
this.emit('error', e);
}
}

_destroy(reason) {
this._reader.cancel(reason);
}
}

/**
* Convert a Web ReadableStream to a Node Readable Stream
* @param {ReadableStream} webStream
* @param {Object} options
* @returns {Readable}
*/
export function webToNode(webStream, options) {
return new NodeReadable(webStream, options);
}
2 changes: 1 addition & 1 deletion lib/node-conversions/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
"./node.js": "./browser-fallback.js"
},
"exports": {
".": "index.js"
".": "./index.js"
}
}
4 changes: 0 additions & 4 deletions lib/reader.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { isUint8Array, isStream, isArrayStream } from './util.js';
import { nodeToWeb } from './node-conversions/index.js';
import * as streams from './streams.js';

const doneReadingSet = new WeakSet();
Expand All @@ -25,9 +24,6 @@ function Reader(input) {
return;
}
let streamType = isStream(input);
if (streamType === 'node') {
input = nodeToWeb(input);
}
if (streamType) {
const reader = input.getReader();
this._read = reader.read.bind(reader);
Expand Down
7 changes: 2 additions & 5 deletions lib/streams.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { isStream, isArrayStream, isUint8Array, concatUint8Array } from './util.js';
import { NodeBuffer, nodeToWeb } from './node-conversions/index.js';
import { NodeBuffer } from './node-conversions/index.js';
import { Reader, externalBuffer } from './reader.js';
import { ArrayStream, Writer } from './writer.js';

Expand All @@ -10,9 +10,6 @@ import { ArrayStream, Writer } from './writer.js';
*/
function toStream(input) {
let streamType = isStream(input);
if (streamType === 'node') {
return nodeToWeb(input);
}
if (streamType) {
return input;
}
Expand All @@ -25,7 +22,7 @@ function toStream(input) {
}

/**
* Convert data to ArrayStream
* Convert non-streamed data to ArrayStream; this is a noop if `input` is already a stream.
* @param {Object} input data to convert
* @returns {ArrayStream} Converted data
*/
Expand Down
2 changes: 1 addition & 1 deletion lib/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function isStream(input) {
return 'web';
}
if (NodeReadableStream && NodeReadableStream.prototype.isPrototypeOf(input)) {
return 'node';
throw new Error('Native Node streams are no longer supported: please manually convert the stream to a WebStream, using e.g. `stream.Readable.toWeb`');
}
if (input && input.getReader) {
return 'web-like';
Expand Down
3 changes: 3 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
"buffer": false,
"stream": false
},
"engines": {
"node": ">= 16.5.0"
},
"repository": {
"type": "git",
"url": "https://github.com/openpgpjs/web-stream-tools.git"
Expand Down
6 changes: 1 addition & 5 deletions test/browser.test.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import { expect } from 'chai';
import { webToNode, toStream, readToEnd } from '../lib/index.js';
import { toStream, readToEnd } from '../lib/index.js';

describe('Browser integration tests', () => {
it('Node.js specific utils are not defined', () => {
expect(webToNode).to.be.null;
})

it('toStream/readToEnd', async () => {
const input = 'chunk';
const streamedData = toStream('chunk');
Expand Down
24 changes: 19 additions & 5 deletions test/node.test.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
import { expect } from 'chai';

import { webToNode, toStream, readToEnd } from '@openpgp/web-stream-tools';
import { Readable } from 'stream';
import { ReadableStream as NodeWebReadableStream } from 'stream/web';
import { toStream, readToEnd } from '@openpgp/web-stream-tools';

describe('Node integration tests', () => {
it('Node.js specific utils are defined', () => {
expect(webToNode).to.not.be.null;
})
it('throws on node native stream', async () => {
const input = new Readable();
expect(() => toStream(input)).to.throw(/Native Node streams are no longer supported/);
});

it('accepts on node web stream', async () => {
const input = 'chunk';
const stream = new NodeWebReadableStream({
start(controller) {
controller.enqueue('chunk');
controller.close();
}
});
const streamedData = toStream(stream);
expect(await readToEnd(streamedData)).to.equal(input);
});

it('toStream/readToEnd', async () => {
const input = 'chunk';
Expand Down
21 changes: 12 additions & 9 deletions test/typescript.test.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
import assert from 'assert';
import { Readable as NodeReadableStream } from 'stream';
import { Readable as NodeNativeReadableStream } from 'stream';
import { ReadableStream as NodeWebReadableStream } from 'node:stream/web';
import { ReadableStream as WebReadableStream } from 'web-streams-polyfill';
import type { WebStream, NodeStream } from '@openpgp/web-stream-tools';
import type { WebStream, NodeWebStream } from '@openpgp/web-stream-tools';
import { readToEnd } from '@openpgp/web-stream-tools';
// @ts-ignore missing defs
import { ArrayStream, isArrayStream } from '@openpgp/web-stream-tools';

(async () => {
const nodeStream: NodeStream<string> = new NodeReadableStream();
assert(nodeStream instanceof NodeReadableStream);
// @ts-expect-error detect type parameter mismatch
const webStream: WebStream<string> = new ReadableStream<Uint8Array>();
const webStream: WebStream<string> = new WebReadableStream<string>();
assert(webStream instanceof WebReadableStream);

const anotherWebStream: WebStream<Uint8Array> = new ReadableStream<Uint8Array>();
const nodeWebStream: NodeWebStream<string> = NodeNativeReadableStream.toWeb(new NodeNativeReadableStream());
assert(nodeWebStream instanceof NodeWebReadableStream);
// @ts-expect-error detect type parameter mismatch
const anotherWebStream: WebStream<string> = new WebReadableStream<Uint8Array>();
assert(anotherWebStream instanceof WebReadableStream);
// @ts-expect-error detect node stream type mismatch
const nodeNativeStream: NodeWebStream<string> = new NodeNativeReadableStream();
assert(nodeNativeStream instanceof NodeNativeReadableStream);

await readToEnd(new Uint8Array([1])) as Uint8Array;
await readToEnd(new Uint8Array([1]), _ => _) as Uint8Array[];

assert(isArrayStream(new ArrayStream())); // ensure Array is actually extended in e.g. es5
assert(isArrayStream(new ArrayStream())) ; // ensure Array is actually extended in e.g. es5

console.log('TypeScript definitions are correct');
})().catch(e => {
Expand Down

0 comments on commit 2097f90

Please sign in to comment.