fix: Ensure all data is written before closing FoundationStreamBridge #713
// `FoundationStreamBridge` is not usable on Linux because it uses ObjC-interop features,
// so this test is disabled there.

#if os(iOS) || os(macOS) || os(watchOS) || os(tvOS) || os(visionOS)
Now that we don't use a Stream & RunLoop in `FoundationStreamBridge`, we can test it on all the Apple platforms & their simulators.
@@ -410,7 +410,7 @@ public final class URLSessionHTTPClient: HTTPClient {
 // that URLSession can stream its request body from.
 // Allow 16kb of in-memory buffer for request body streaming
 let streamBridge = requestStream.map {
-FoundationStreamBridge(readableStream: $0, bufferSize: 16_384, logger: logger)
+FoundationStreamBridge(readableStream: $0, bridgeBufferSize: 16_384, logger: logger)
Just changed the name of the param here.
// The max size of the buffer for data being streamed.
let maxBufferSize = 2 * maxDataSize
let maxDataSize = 65_536 // 64 kb
Made the data for each test run a little bigger than before. Has little effect on the time needed to complete tests.
@@ -39,7 +33,8 @@ class FoundationStreamBridgeTests: XCTestCase {

 // Create a temp buffer we can use to copy the input stream bytes
 // We are responsible for deallocating it
-let tempBuffer = UnsafeMutablePointer<UInt8>.allocate(capacity: maxDataSize)
+let tempBufferSize = maxDataSize
+let tempBuffer = UnsafeMutablePointer<UInt8>.allocate(capacity: tempBufferSize)
Clarify code a bit by having a `tempBufferSize` to go with the `tempBuffer`.
let bridgeBufferSize = Int.random(in: 1...(2 * dataSize))

// The bound stream buffer may be as small as 1 byte, up to 2x as big as the data size
let boundStreamBufferSize = Int.random(in: 1...(2 * dataSize))
Buffer sizes for the bridge buffer and the bound stream buffer are set separately. (Before, these two buffers were set to the same size.)
This helps to create scenarios where either buffer may be partially consumed during streaming.
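The effect of sizing the two buffers independently can be sketched with a small, self-contained simulation. This is only a model, not the actual `FoundationStreamBridge` code: `pump`, `bridgeCapacity`, and `boundCapacity` are hypothetical names, and plain arrays stand in for the real streams.

```swift
/// Moves `source` through two bounded buffers and returns what arrives at the far end.
/// Hypothetical model: `bridge` stands in for the bridge buffer and `bound` for the
/// bound stream buffer. Neither is the real smithy-swift implementation.
func pump(_ source: [UInt8], bridgeCapacity: Int, boundCapacity: Int) -> [UInt8] {
    precondition(bridgeCapacity > 0 && boundCapacity > 0)
    var remaining = source[...]   // unread portion of the source
    var bridge: [UInt8] = []
    var bound: [UInt8] = []
    var output: [UInt8] = []

    // Keep going until the source is exhausted AND both buffers are drained.
    while !remaining.isEmpty || !bridge.isEmpty || !bound.isEmpty {
        // 1. Top up the bridge buffer from the source.
        let toBridge = min(bridgeCapacity - bridge.count, remaining.count)
        bridge.append(contentsOf: remaining.prefix(toBridge))
        remaining = remaining.dropFirst(toBridge)

        // 2. Write as much as fits into the bound buffer (possibly a partial write).
        let toBound = min(boundCapacity - bound.count, bridge.count)
        bound.append(contentsOf: bridge.prefix(toBound))
        bridge.removeFirst(toBound)

        // 3. The consumer reads a random amount, so the bound buffer
        //    may be left partially consumed.
        let toRead = Int.random(in: 1...bound.count)
        output.append(contentsOf: bound.prefix(toRead))
        bound.removeFirst(toRead)
    }
    return output
}

// Pick the two capacities independently, as the updated test does.
let dataSize = 1_000
let originalData = (0..<dataSize).map { _ in UInt8.random(in: 0...255) }
let received = pump(
    originalData,
    bridgeCapacity: Int.random(in: 1...(2 * dataSize)),
    boundCapacity: Int.random(in: 1...(2 * dataSize))
)
```

With mismatched capacities, steps 2 and 3 routinely leave one buffer partly full while the other is empty, which is the kind of scenario the randomized sizes are meant to exercise.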
 // Fill a data buffer with dataSize random numbers
 let originalData = Data(bytes: randomBuffer, count: dataSize)

 // Create a stream bridge with our original data & open it
 let bufferedStream = BufferedStream(data: originalData, isClosed: true)
-let subject = FoundationStreamBridge(readableStream: bufferedStream, bufferSize: bufferSize, logger: TestLogger())
+let subject = FoundationStreamBridge(readableStream: bufferedStream, bridgeBufferSize: bridgeBufferSize, boundStreamBufferSize: boundStreamBufferSize, logger: TestLogger())
Create the `FoundationStreamBridge` under test with individual settings for bridge buffer size and bound stream buffer size.
bridgeBufferSize: Int = 65_536,
boundStreamBufferSize: Int? = nil,
logger: LogAgent
) {
Added individual parameters for the size of the two buffers:
- The "bridge buffer", which holds data between the `ReadableStream` and the Foundation `OutputStream`, and
- the bound stream buffer, which holds data between the bound-pair `OutputStream` and `InputStream` Foundation streams.
/// Sets stream status to indicate the stream is empty.
func setReadableStreamIsEmpty() async {
readableStreamIsEmpty = true
}
This flag moved to the `FoundationStreamBridge` because having it on this actor is redundant: the `WriteCoordinator`'s `perform()` method already enforces exclusive access.
}
self.task = task
_ = try await task.value
Simplified this method, and (on this line) made it rethrow any error that was thrown by its `Task`.
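As a rough sketch of what rethrowing from the `Task` looks like, the actor below chains each block after the previous one and awaits the new task's `value`, which surfaces any error the block throws. Everything apart from the `self.task = task` / `try await task.value` pattern is an assumption made for illustration: the closure signature, the `previous` chaining, and the `WriteError` and `rethrowsBlockError()` names are hypothetical and may differ from the real `WriteCoordinator`.

```swift
/// Hypothetical coordinator that runs async blocks one at a time.
actor WriteCoordinator {
    /// The most recently enqueued task, if any.
    private var task: Task<Void, Error>?

    /// Runs `block` after any previously enqueued block has finished,
    /// and rethrows whatever error `block` throws.
    func perform(_ block: @escaping @Sendable () async throws -> Void) async throws {
        let previous = self.task
        let newTask = Task {
            // Wait for the previous block to finish. Its error (if any) was
            // already delivered to the caller that enqueued it.
            _ = await previous?.result
            try await block()
        }
        self.task = newTask
        // Awaiting `value` on a throwing task rethrows the task's error.
        _ = try await newTask.value
    }
}

/// Illustrative error type, not part of smithy-swift.
struct WriteError: Error {}

/// Returns true if an error thrown inside the block reaches the caller of `perform`.
func rethrowsBlockError() async -> Bool {
    let coordinator = WriteCoordinator()
    do {
        try await coordinator.perform { throw WriteError() }
        return false
    } catch {
        return error is WriteError
    }
}
```

Without the `try await` on the task's value, an error thrown inside the block would stay inside the `Task` and the caller would never see it.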
///
/// A pair of Foundation "bound streams" is created. Data from the `ReadableStream` is transferred into the Foundation
/// `OutputStream` until the `ReadableStream` is closed and all data has been read from it. The Foundation
/// `InputStream` is exposed as a property, and may be used to stream the data to other components.
Lots of added commentary in this file to help guide the reader through how this type works.
These are great changes, but to be sure they don't break anything could you create a corresponding branch in aws-sdk-swift and run the full integration test suite against it?
/// Reads data from a smithy-swift native `ReadableStream` and streams the data through to a Foundation `InputStream`.
///
/// A pair of Foundation "bound streams" is created. Data from the `ReadableStream` is transferred into the Foundation
/// `OutputStream` until the `ReadableStream` is closed and all data has been read from it. The Foundation
Q: Line 22 says data is read from `ReadableStream` to an `InputStream`. Lines 24 & 25 say data is read from `ReadableStream` to `OutputStream`. So is `OutputStream` a stepping stone (e.g., `ReadableStream` => `OutputStream` => `InputStream`)? If so, why does there need to be one?
Line 22 is supposed to summarize where the data comes in to the `FoundationStreamBridge` and where it goes out. Lines 24/25 are individual steps within the `FoundationStreamBridge`.
Overall, data flows like this:
Customer events or data > ClientRuntime.ReadableStream > FoundationStreamBridge buffer > Foundation.OutputStream > bound stream buffer > Foundation.InputStream > URLRequest streaming body
We need the `OutputStream` because a pair of bound streams is how you get data through to the `InputStream` that we need for streaming our URLRequest body. Put another way, we can't put data into an `InputStream` unless we do so with a bound `OutputStream`.
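For readers unfamiliar with bound streams, here is a minimal sketch of the last few hops in that flow, assuming an Apple platform where Foundation provides `Stream.getBoundStreams(withBufferSize:inputStream:outputStream:)`. The `roundTrip` helper is hypothetical, and it assumes the payload fits in the bound stream buffer so that a single write followed by a single read is enough; the real bridge has to handle partial writes and larger payloads.

```swift
import Foundation

/// Sends `bytes` through a pair of Foundation bound streams and returns what is read back.
/// Hypothetical helper: assumes `bytes` is non-empty and fits within `bufferSize`.
func roundTrip(_ bytes: [UInt8], bufferSize: Int) -> [UInt8] {
    precondition(!bytes.isEmpty && bytes.count <= bufferSize)

    // Create the bound pair: bytes written to the OutputStream
    // become readable from the InputStream.
    var inputStream: InputStream?
    var outputStream: OutputStream?
    Stream.getBoundStreams(
        withBufferSize: bufferSize,
        inputStream: &inputStream,
        outputStream: &outputStream
    )
    guard let input = inputStream, let output = outputStream else { return [] }

    input.open()
    output.open()
    defer {
        output.close()
        input.close()
    }

    // The only way to put data "into" the InputStream is to write
    // to its bound OutputStream.
    let written = output.write(bytes, maxLength: bytes.count)
    guard written > 0 else { return [] }

    // Read back whatever is waiting in the bound stream buffer.
    var received = [UInt8](repeating: 0, count: bufferSize)
    let readCount = input.read(&received, maxLength: bufferSize)
    return readCount > 0 ? Array(received.prefix(readCount)) : []
}

let echoed = roundTrip([1, 2, 3, 4, 5], bufferSize: 16)
```

In the bridge, the `InputStream` half of such a pair is what gets handed off as the streaming request body, while the bridge itself keeps writing to the `OutputStream` half.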
Issue #
awslabs/aws-sdk-swift#1457
Description of changes
- `FoundationStreamBridge` can close while there is still data in the buffer left to stream. This potential bug was corrected by ensuring that both the buffer is empty AND the readable stream is closed before closing the output stream. (Before, we closed the output stream when the readable stream closed, without checking if there was data left to be streamed in the buffer.)
- When the `OutputStream` has room to receive, the remaining buffer data is written prior to reading the `ReadableStream` for more. This might increase the number of writes needed, but will ensure that available data is transferred to the Foundation `OutputStream` promptly.
- Updated `FoundationStreamBridge` to take a separate buffer size for the buffer between the bound streams. This aids in testing by helping to create mismatches between the size of the buffer and the size of writes to the output stream.
- Updated `FoundationStreamBridgeTests` to set random values for both of the `FoundationStreamBridge`'s internal buffers when performing test runs; this helps to create as many variations in operating parameters as possible.
- Updated `WriteCoordinator`'s `perform()` function to rethrow errors thrown by the block.
- Simplified `FoundationStreamBridge` to be more straightforward & easier to follow.
- Added commentary to `FoundationStreamBridge` to aid developers in working with the code.
Scope
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
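The close condition from the first item under "Description of changes" can be illustrated with a tiny model. This is a sketch only: `BridgeState` and its property names are hypothetical and do not appear in `FoundationStreamBridge`.

```swift
/// Hypothetical snapshot of the bridge's state; names are illustrative only.
struct BridgeState {
    var bufferedByteCount: Int
    var readableStreamIsClosed: Bool

    /// Previous behavior as described: close as soon as the readable stream closes.
    var shouldCloseBeforeFix: Bool {
        readableStreamIsClosed
    }

    /// Corrected behavior: close only when the readable stream is closed
    /// AND no buffered data is left to write.
    var shouldCloseAfterFix: Bool {
        readableStreamIsClosed && bufferedByteCount == 0
    }
}

// The readable stream has closed, but 128 bytes are still waiting in the buffer.
let pending = BridgeState(bufferedByteCount: 128, readableStreamIsClosed: true)
// The buffer has been fully written out.
let drained = BridgeState(bufferedByteCount: 0, readableStreamIsClosed: true)
```

For `pending`, the old check would close the output stream and strand the buffered bytes, while the corrected check waits until the state looks like `drained`.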