lib: implement various stream utils for webstreams#39517
Conversation
Implements the finished, addAbortSignal, and pipeline functions for webstreams, and adds the documentation and tests for each function. The promisified versions of finished and pipeline are also added. Refs: #39316
|
|
||
| throw new ERR_INVALID_ARG_TYPE( | ||
| 'stream', | ||
| 'ReadableStream|WritableStream|TransformStream', |
There was a problem hiding this comment.
| 'ReadableStream|WritableStream|TransformStream', | |
| ['ReadableStream', 'WritableStream', 'TransformStream'], |
| if (streams.some(cannotWriteToStream)) { | ||
| throw new ERR_INVALID_ARG_TYPE( | ||
| 'streams', | ||
| 'WritableStream|TransformStream', |
There was a problem hiding this comment.
| 'WritableStream|TransformStream', | |
| ['WritableStream', 'TransformStream'], |
| if (streams.some(cannotReadFromStream)) { | ||
| throw new ERR_INVALID_ARG_TYPE( | ||
| 'streams', | ||
| 'ReadableStream|TransformStream', |
There was a problem hiding this comment.
| 'ReadableStream|TransformStream', | |
| ['ReadableStream', 'TransformStream'], |
|
A lot of unrelated style changes in |
|
|
||
| function readableStreamAddFinishedCallback(stream, callback) { | ||
| assert(isReadableStream(stream)); | ||
| stream[kState].finishedCallbacks.push(callback); |
There was a problem hiding this comment.
| stream[kState].finishedCallbacks.push(callback); | |
| ArrayPrototypePush(stream[kState].finishedCallbacks, callback); |
|
|
||
| function writableStreamAddFinishedCallback(stream, callback) { | ||
| assert(isWritableStream(stream)); | ||
| stream[kState].finishedCallbacks.push(callback); |
There was a problem hiding this comment.
| stream[kState].finishedCallbacks.push(callback); | |
| ArrayPrototypePush(stream[kState].finishedCallbacks, callback); |
aduh95
left a comment
There was a problem hiding this comment.
The code contains utility functions that are only used once (such as signalIsAborted, validateInputs, throwErrorIfNotWebstream, addAbortCallbackToSignal, etc.), which makes the code hard to follow. Could you try to remove them? Instead you can use comments to explain what the code is doing.
| --> | ||
|
|
||
| * Type: {WritableStream} | ||
| ### `webstream.finished(stream, callback)` |
There was a problem hiding this comment.
| ### `webstream.finished(stream, callback)` | |
| ### `webstream.finished(stream, callback)` |
| ``` | ||
| The `finished` API also provides promise version: |
There was a problem hiding this comment.
| ``` | |
| The `finished` API also provides promise version: | |
| ``` | |
| The `finished` API also provides promise version: |
ronag
left a comment
There was a problem hiding this comment.
We should just add support for web streams in existing stream functions. Not implement new ones.
@VoltrexMaster The only formatting tool I used was eslint with the given config file in the repo, so I'm unsure as to why this has happened. Let me know if I'm missing something, but otherwise I'll see if I can sort this out. |
@ronag Understood. Is it satisfactory to add a conditional flow in the existing functions which calls the functions added in this request? |
| validateInputs(signal, stream); | ||
|
|
||
| const onAbort = () => { | ||
| stream.abort(new AbortError()); |
There was a problem hiding this comment.
abort doesn't exist on web streams? Do they?
There was a problem hiding this comment.
Only on WritableStream. On ReadableStream it needs to be cancel().
There is another fundamental problem with this, however... if the ReadableStream has a Reader, or the WritableStream has a Writer, their cancel() and abort() methods cannot be called directly... and there's no public API for knowing if either is locked. If they are, there's no public API for getting the Reader or the Writer and those have to be used for doing the canceling/aborting.
Instead, in order for us to do this, we need to rely on the internal functions and cannot rely on the public API. This also means that these will only work for the core implementations of web streams (userland implementations would not be supported).
There was a problem hiding this comment.
@jasnell do you think this finished for webstreams is still a worthwhile development given that this is the case?
I think for For For |
| * `TransformStream` - Represents an algorithm for transforming streaming data. | ||
|
|
||
| Additionally, this module includes the utility functions | ||
| `pipeline()`, `finished()`, and `addAbortSignal()`. |
There was a problem hiding this comment.
I'm -1 on having this also exported on stream/web. Just have the stream exports support the additional types.
| const { | ||
| DOMException, | ||
| } = internalBinding('messaging'); | ||
| const { DOMException } = internalBinding('messaging'); |
There was a problem hiding this comment.
Unrelated style changes. Please leave the require statements formatted as they are.
| source, | ||
| extractHighWaterMark(highWaterMark, 0)); | ||
| extractHighWaterMark(highWaterMark, 0) | ||
| ); |
There was a problem hiding this comment.
All of the unrelated style changes in here make this difficult to review.
| */ | ||
| abort(reason = undefined) { | ||
| this.cancel(reason); | ||
| } |
There was a problem hiding this comment.
We cannot add methods to the standard class.
| */ | ||
| abort(reason = undefined) { | ||
| this?.readable?.abort?.(reason); | ||
| } |
There was a problem hiding this comment.
We cannot add methods to the standard API
| stream[kState].finishedCallbacks?.forEach((cb) => { | ||
| cb(stream[kState].storedError); | ||
| }); | ||
| } |
There was a problem hiding this comment.
I think this is the wrong approach. the WritableStream and ReadableStream APIs already have mechanisms for monitoring when each are closed via the closed promises on the Writer and Reader.
jasnell
left a comment
There was a problem hiding this comment.
I appreciate the work here but this has quite a few reasons why it's the wrong approach.
First, there are way too many unrelated style changes that make it impossible to effectively review. All of the purely style changes should be removed.
Second, this adds non-standard methods to the ReadableStream and TransportStream APIs that we just can't add.
Third, the abort controller implementation fails to account for locked streams.
Fourth, this should modify the existing require('stream').pipeline|finished|addAbortSignal() methods in place rather than creating new versions of them, creating new exports off stream/web, or introducing the new stream/web/promises.
Fifth, the way this adds the finish callbacks is problematic and definitely not ideal.
mcollina
left a comment
There was a problem hiding this comment.
I'm -1 for landing this. Support for webstreams should be added to the utilities we already have.
Please always tag @nodejs/streams when discussing streams PRs.
|
Thanks for all the feedback and sorry for it being a bit of a mess! Looks like I have some work to do... Seeing as though it'll be a good few changes, I'm going to close this PR for now. |
|
Any updates on this @sam-stanford ? Is this still in progress? |
Yep! Been away since I last took a look at this, so a little behind where is expected, but I'm making progress. I'm still learning the ropes for node and open source as a whole, so if this is something which needs doing quickly, feel free to take the reigns :) |
|
All right, I'll take a stab at it... Great work so far though! |

Implements the finished, addAbortSignal, and pipeline functions for webstreams, and adds the documentation and tests for each function. The promisified versions of finished and pipeline are also added.
Refs: #39316