Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release reader immediately when shutting down a pipe #1208

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from

Conversation

MattiasBuelens
Copy link
Collaborator

@MattiasBuelens MattiasBuelens commented Jan 16, 2022

In #1207, we raised concern about whether it's currently possible for pipeTo() to drop chunks when the pipe shuts down while it still has a pending read request.

It turns out that this is indeed possible. The new WPT tests demonstrate at least one way this can happen: by aborting a pipe with an AbortSignal, and then enqueuing a chunk immediately after aborting the signal.

This PR fixes this by releasing the pipe's reader immediately when starting the shutdown process, rather than when it finalizes the shutdown. This ensures that any pending reads that were started are immediately rejected, and the chunks stay in the source's queue.

This does have the side effect of releasing the reader before the pipe promise resolves. I don't know if this is acceptable. Fixed in #1208 (comment).

(See WHATWG Working Mode: Changes for more details.)


Preview | Diff

@MattiasBuelens
Copy link
Collaborator Author

This does have the side effect of releasing the reader before the pipe promise resolves. I don't know if this is acceptable.

Hmm, this means that the pipe could still cancel the readable after it has released its reader lock but before it settles the pipe promise. That might be quite unexpected: the user might have already acquired a new reader, and only that new reader should have the ability to cancel the stream.

Possible solutions:

  1. We could try to split up ReadableStream(Default|BYOB)Release, such that the read requests are already rejected but the reader is still locked to the stream. We would then actually release the reader as part of the "finalize" steps (as in the current spec).
  2. We could make the pipe releases its reader, and immediately acquire a new reader. It would no longer use that reader to request new chunks (since shuttingDown is already true), it would only be used to keep the stream locked (in case we still want to cancel it).

Option 2 is probably the easiest to implement, but it may require a note to explain why this weird "release + acquire" dance is necessary. Then again, we already mention that:

the exact manner in which this happens is not observable to author code, and so there is flexibility in how this is done

so user agents are free to implement these steps differently if they want. 🤷‍♂️

Copy link
Member

@domenic domenic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like either option (1) or (2) is observably equivalent, right? My instinct would then be to go with whichever is easiest to spec (probably (2)?), and then explain that it's equivalent to the other one.

If we go with (2) and explain it's equivalent to (1), we're explaining the weird dance and saying that perhaps a more natural implementation would be (1), if you're less into code reuse than we are.

If we go with (1) and explain it's equivalent to (2), then we're explaining how despite doing some tricky direct manipulations of the internals, we're not actually doing anything which you couldn't already do with the public API.

index.bs Show resolved Hide resolved
@MattiasBuelens
Copy link
Collaborator Author

Yes, they're equivalent. I'll go with option 2. 👍

@MattiasBuelens MattiasBuelens force-pushed the pipe-release-reader-on-shutdown branch from 5e5adad to 15a9768 Compare January 19, 2022 22:02
@MattiasBuelens MattiasBuelens force-pushed the pipe-release-reader-on-shutdown branch from cf688a5 to 7008ee5 Compare January 19, 2022 22:25
Comment on lines +206 to +208
if (dest._state !== 'writable' || WritableStreamCloseQueuedOrInFlight(dest) === true) {
return promiseResolvedWith(true);
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implements @domenic's suggestion from #1207 (comment).

I don't know if we need to update the spec text for this. It already specifies that these checks must happen before performing any reads and writes:

Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown.

We should still add a test for this particular case (although that might not be easy looking at the discussion in #1207).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally don't think we need to update the spec text.

Copy link
Collaborator Author

@MattiasBuelens MattiasBuelens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the reference implementation to fix the issue from #1207 (comment). I still need to update the spec text, although this might be tricky because the pipeTo() specification is a bit vague on how to implement the "if dest becomes errored" check. 😕

Also, is it okay to piggyback this fix onto this PR, or would you prefer I move it to a separate one? (The new fix does depend on the initial fix though, we can't land it entirely separately.)

@@ -228,7 +235,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
});

// Errors must be propagated backward
isOrBecomesErrored(dest, writer._closedPromise, storedError => {
WritableStreamDefaultWriterIsOrBecomesErrored(writer, () => {
Copy link
Collaborator Author

@MattiasBuelens MattiasBuelens Jan 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new helper allows attaching a synchronous callback for when dest becomes "erroring" or "errored", following the discussion in #1207 (comment). I added a test for this in web-platform-tests/wpt@1646d65.

is or becomes "`errored`", then
is or becomes "`erroring`" or "`errored`", then
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's useful to keep the pipe going when dest has already become "erroring"? Any new writes will just error immediately, as per step 9 of WritableStreamDefaultWriterWrite.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

is or becomes "`errored`", then
is or becomes "`erroring`" or "`errored`", then
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

Comment on lines +206 to +208
if (dest._state !== 'writable' || WritableStreamCloseQueuedOrInFlight(dest) === true) {
return promiseResolvedWith(true);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally don't think we need to update the spec text.

@@ -475,6 +483,20 @@ function WritableStreamDefaultWriterGetDesiredSize(writer) {
return WritableStreamDefaultControllerGetDesiredSize(stream._controller);
}

function WritableStreamDefaultWriterIsOrBecomesErrored(writer, errorListener) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe since this is not part of the standard, it should start with a lower-case letter?

Copy link
Collaborator Author

@MattiasBuelens MattiasBuelens Jan 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. 👍

What do you suggest we do put in the spec text?

  • We can add a note saying that "errors must be propagated backward" must be handled synchronously as soon as that condition becomes true, in other words that it's not enough to add an asynchronous callback to writer.closed.
    • On the other hand: since the condition is now "is or becomes erroring or errored", maybe it's already clear enough that writer.closed is not good enough?
  • We can add a note in WritableStreamStartErroring below the "set stream.[[state]] to "erroring"" step to remind implementers that this is where that condition from pipeTo() can become true.
  • Alternatively, we tell implementers to look at the reference implementation for an example on how to do it... 😛

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this very specific to the one change from "writable" to "erroring"? In the reference implementation you've added a much more general listener setup.

I would suggest a general note saying "for all the 'becomes' conditions in the above, they must be processed synchronously as part of the [[state]] update, before any other web developer code can run." And then, if we anticipate that only being impactful in the one transition, we could append the extra note: "NOTE: Currently this requirement only has observable consequences for [the transition for writable stream states from from "writable" to "erroring"], and others could be done as asynchronous listeners". Or, if we think we might expand this listener usage in the future, then we should probably omit that note.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this very specific to the one change from "writable" to "erroring"?

That's the most noticeable case, since it determines whether or not we may drop a chunk (i.e. we accidentally read a chunk that we can no longer write).

I'm not sure whether it matters for the state transitions of the readable end. There might be an edge case where two shutdown conditions become true at the same time, and then it matters which condition is handled first. For example:

readableController.error(error1); // pipeTo() should immediately call writer.abort(error1)
writableController.error(error2); // should be ignored, since writable is already erroring
// => pipeTo() rejects with error1

versus:

writableController.error(error2); // pipeTo() should immediately call reader.cancel(error2)
readableController.error(error1); // should be ignored, since readable is already closed
// => pipeTo() rejects with error2

If we were to use a synchronous reaction for the writable -> erroring transition but an asynchronous reaction for the readable -> errored transition, then the first snippet would also behave like the second one... I think. 😛

I'll try to whip up some more WPTs to double check.

Copy link
Collaborator Author

@MattiasBuelens MattiasBuelens Jan 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are two possible tests for this:

promise_test(async t => {
  const rs = recordingReadableStream();
  const ws = recordingWritableStream();

  const pipeToPromise = rs.pipeTo(ws);
  await flushAsyncEvents();

  rs.controller.error(error1);
  ws.controller.error(error2);

  await promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo must reject with readable\'s error');
  assert_array_equals(rs.eventsWithoutPulls, []);
  assert_array_equals(ws.events, []);

  await promise_rejects_exactly(t, error1, rs.getReader().closed);
  await promise_rejects_exactly(t, error2, ws.getWriter().closed);
}, 'Piping: error the readable stream right before erroring the writable stream');

promise_test(async t => {
  const rs = recordingReadableStream();
  const ws = recordingWritableStream();

  const pipeToPromise = rs.pipeTo(ws);
  await flushAsyncEvents();

  ws.controller.error(error1);
  rs.controller.error(error2);

  await promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo must reject with writable\'s error');
  assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
  assert_array_equals(ws.events, []);

  await promise_rejects_exactly(t, error1, ws.getWriter().closed);
  await rs.getReader().closed;
}, 'Piping: error the writable stream right before erroring the readable stream');

The behavior might be a bit surprising though. In the first test, ws is still writable when we call rs.controller.error(), so we end up in:

uponFulfillment(waitForWritesToFinish(), doTheRest);

This adds at least one microtask of delay (even if there are no pending writes), so we will not yet call ws.abort(error1). Instead, ws.controller.error(error2) goes through, and the abort gets ignored later on.

However, in the second test, because ws immediately becomes errored, we don't wait for pending writes to complete and instead we synchronously call rs.cancel(error1). Therefore, rs.controller.error(error2) gets ignored, and the stream ends up cancelled instead of errored.


The specification is a bit vague about this. It says:

Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled).

It doesn't say how long this step can take. We may want to require that if there are no pending writes (i.e. we've never started any writes, or all writes have already settled), then this step must complete synchronously. Then, in the first test, we would call ws.abort(error1) synchronously and prevent ws.controller.error(error2). However, that might be tricky to actually implement correctly... 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest limiting the sync part to as small as possible to fix the issue. This still leaves the problem of how to spec it. We've tried to give latitude for implementations to optimise in their own way, but we're increasingly constraining their behaviour. Transparent thread offloading etc. may become impossible. I'm worried about it but I don't have an answer.

Copy link
Collaborator Author

@MattiasBuelens MattiasBuelens Jan 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest limiting the sync part to as small as possible to fix the issue.

The sync part is already minimal. We have to go from "if source becomes errored" all the way to "perform WritableStreamAbort" in order to avoid ws.controller.error() from affecting the result. Thus, the entirety of step 3 in "shutdown with an action" must become synchronous (only if there are no pending writes that need to be awaited).


Anyway, I found another way to fix it. We keep track of how many ReadableStreamDefaultReaderRead() requests are still outstanding, and we only handle the source.[[state]] == "closed" transition after all those requests are settled. See MattiasBuelens@3c8b3c2.

However, this test still fails. We do call dest.abort() and source.cancel() in the correct order, but it seems like underlyingSink.abort() and underlyingSource.cancel() are being called in the wrong order. When we call WritableStreamStartErroring, the writable controller is not yet started, so we postpone calling sink.abort() until after sink.start() resolves. Previously, the writable would already have been started while we were asynchronously waiting for the writes to finish (even if there were no pending writes).

Adding await flushAsyncEvents() before calling pipeTo() in that test restores the order and fixes the problem. Good enough? 🤷‍♂️


We've tried to give latitude for implementations to optimise in their own way, but we're increasingly constraining their behaviour. Transparent thread offloading etc. may become impossible. I'm worried about it but I don't have an answer.

I agree, the reference implementation is becoming increasingly complicated in order to deal with these edge cases. 😞

I'm wondering if it's even worth trying to spec these edge cases, or instead allow some wiggle room in how quickly pipeTo() must respond to these state transitions. But then it would become impossible to test the behavior, or we'd have to allow multiple valid outcomes... 😕

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect most of the testable constraints we're imposing are in cases where the web developer controls one or both ends of the pipe, right? I'm not sure those are the ones we were planning to feasibly optimize, so starting to constrain them still seems like the right thing to do to me. But I might be missing something so please let me know.

On the larger problem, the root of the issue seems to be how imprecise "[[state]] is or becomes" is. Does that mean: (1) synchronously after the algorithm step which sets [[state]], probably interrupting other streams-implementation code; (2) synchronously after any streams-implementation code runs; (3) synchronously after any browser code runs; (4) asynchronously is OK to some degree?

My preference would be to try to resolve things like so:

  • Decide whether we're OK constraining all observable behavior, or want to allow leeway. In particular when one or both ends of a pipe are web-developer-created streams, a good bit more becomes observable.
  • Write tests reflecting the result of the previous decision. E.g. if we want to nail down all observable behavior, I think @MattiasBuelens has done a great job capturing as many scenarios as possible. (❤️!) We should analyze them for what reasonable expected behavior is, and assert that. (If we don't have strong feelings on reasonable expected behavior, then we can feel free to change the assertions in the next step.)
  • Come up with some more-rigorous formulation of "[[state]] is or becomes" for the spec which meets the expectations of those tests. This probably will make the spec more complex, and more like the reference implementation. E.g. it could be adding promise handlers (probably in combination with something like MattiasBuelens@3c8b3c2), or having separate synchronous state-change steps. Given that this will only be used for pipeTo, we can probably consider spec strategies that aren't as detailed and algorithmic as the rest of the spec, but they do need to be clear and unambigious between the (1)-(4) above.

As an example of how to apply this process,

Adding await flushAsyncEvents() before calling pipeTo() in that test restores the order and fixes the problem. Good enough? 🤷‍♂️

My preference would be that, if we decide to constrain all observable behavior, we have both variants of the test, with the version without flushAsyncEvents() having the assert for the other order.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect most of the testable constraints we're imposing are in cases where the web developer controls one or both ends of the pipe, right? I'm not sure those are the ones we were planning to feasibly optimize, so starting to constrain them still seems like the right thing to do to me. But I might be missing something so please let me know.

Correct. Streams created by the user agent will use the exported algorithms, and I think it's safe to assume that those will be called in a separate task, outside of web author code.

On the larger problem, the root of the issue seems to be how imprecise "[[state]] is or becomes" is. Does that mean: (1) synchronously after the algorithm step which sets [[state]], probably interrupting other streams-implementation code; (2) synchronously after any streams-implementation code runs; (3) synchronously after any browser code runs; (4) asynchronously is OK to some degree?

(2) may be ill-specified, since there are cases where streams code calls into author code, which can then call back into streams code. We've even had cases in the past where streams code calls back into itself, e.g. #1172.

I still prefer (1), and that's what I've been implementing. Yes, we need to be very careful when speccing, but at least any problems that arise can be fixed within the streams implementation.

My preference would be that, if we decide to constrain all observable behavior, we have both variants of the test, with the version without flushAsyncEvents() having the assert for the other order.

That seems reasonable. 👍

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coming back to this:

Adding await flushAsyncEvents() before calling pipeTo() in that test restores the order and fixes the problem. Good enough? 🤷‍♂️

It seems the main difference is that, when you call readable.cancel(), we always synchronously call source.cancel() regardless of whether source.start() has already settled. On the other hand, when you call writable.abort(), we first wait for sink.start() to settle before we call sink.abort().

IIRC the reason for this difference is so you can do e.g. an async loop in source.start():

new ReadableStream({
  async start(c) {
    for (let i = 0; i < 10; i++) {
      await new Promise(r => setTimeout(r, 1000));
      c.enqueue("chunk");
    }
    c.close();
  }
})

whereas for sink.start() this doesn't make sense.

I guess, if we really wanted to, we could have the test check when writableController.signal becomes aborted? That should happen synchronously regardless of whether sink.start() has already settled.

@MattiasBuelens
Copy link
Collaborator Author

MattiasBuelens commented Jan 25, 2022

I've generalized the implementation a bit. I added two new helpers:

  • readerAddStateChangeListener(reader, stateChangeListener)
    • Calls stateChangeListener the next time reader._stream._state changes.
  • writerAddStateChangeListener(writer, stateChangeListener)
    • Calls stateChangeListener the next time writer._stream._state changes.

The pipeTo() reference implementation now uses these helpers instead of reader._closedPromise and writer._closedPromise to detect state changes. A checkState() helper checks the shutdown conditions when the pipe starts, and when the state of one of the streams changes.

This should handle all possible synchronous state transitions, and check the shutdown conditions in the correct order as specified by the standard's text.

(Perhaps we want to also use these helpers for tee(), and possibly eliminate the extra microtask from readRequest's chunk steps there? I'll leave that for another time though. 😛)

index.bs Show resolved Hide resolved
@@ -475,6 +483,20 @@ function WritableStreamDefaultWriterGetDesiredSize(writer) {
return WritableStreamDefaultControllerGetDesiredSize(stream._controller);
}

function WritableStreamDefaultWriterIsOrBecomesErrored(writer, errorListener) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this very specific to the one change from "writable" to "erroring"? In the reference implementation you've added a much more general listener setup.

I would suggest a general note saying "for all the 'becomes' conditions in the above, they must be processed synchronously as part of the [[state]] update, before any other web developer code can run." And then, if we anticipate that only being impactful in the one transition, we could append the extra note: "NOTE: Currently this requirement only has observable consequences for [the transition for writable stream states from from "writable" to "erroring"], and others could be done as asynchronous listeners". Or, if we think we might expand this listener usage in the future, then we should probably omit that note.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants