Skip to content

Commit

Permalink
Add resp_stream_is_complete() (#605)
Browse files Browse the repository at this point in the history
To determine if there is data remaining on the stream. Fixes #559
  • Loading branch information
hadley authored Jan 6, 2025
1 parent f9bc4c9 commit f680367
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 7 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ export(resp_retry_after)
export(resp_status)
export(resp_status_desc)
export(resp_stream_aws)
export(resp_stream_is_complete)
export(resp_stream_lines)
export(resp_stream_raw)
export(resp_stream_sse)
Expand Down
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# httr2 (development version)

* `resp_stream_is_complete()` tells you if there is still data remaining to be streamed (#559).
* New `url_modify()`, `url_modify_query()`, and `url_modify_relative()` make it easier to modify an existing url (#464).
* New `url_query_parse()` and `url_query_build()` allow you to parse and build a query string (#425).
* `req_url_query()` gains the ability to control how spaces are encoded (#432).
Expand Down
11 changes: 11 additions & 0 deletions R/resp-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#' * `resp_stream_aws()` retrieves a single event from an AWS stream
#' (i.e. mime type `application/vnd.amazon.eventstream``).
#'
#' Use `resp_stream_is_complete()` to determine if there is further data
#' waiting on the stream.
#'
#' @returns
#' * `resp_stream_raw()`: a raw vector.
#' * `resp_stream_lines()`: a character vector.
Expand Down Expand Up @@ -80,6 +83,14 @@ resp_stream_sse <- function(resp, max_size = Inf) {
}
}

#' @export
#' @rdname resp_stream_raw
resp_stream_is_complete <- function(resp) {
check_response(resp)

!isIncomplete(resp$body)
}

#' @export
#' @param ... Not used; included for compatibility with generic.
#' @rdname resp_stream_raw
Expand Down
6 changes: 6 additions & 0 deletions man/resp_stream_raw.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 26 additions & 7 deletions tests/testthat/test-resp-stream.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

test_that("can stream bytes from a connection", {
resp <- request_test("/stream-bytes/2048") %>% req_perform_connection()
withr::defer(close(resp))
Expand All @@ -16,6 +15,26 @@ test_that("can stream bytes from a connection", {
expect_length(out, 0)
})

test_that("can determine if a stream is complete (blocking)", {
resp <- request_test("/stream-bytes/2048") %>% req_perform_connection()
withr::defer(close(resp))

expect_false(resp_stream_is_complete(resp))
expect_length(resp_stream_raw(resp, kb = 2), 2048)
expect_length(resp_stream_raw(resp, kb = 1), 0)
expect_true(resp_stream_is_complete(resp))
})

test_that("can determine if a stream is complete (non-blocking)", {
resp <- request_test("/stream-bytes/2048") %>% req_perform_connection(blocking = FALSE)
withr::defer(close(resp))

expect_false(resp_stream_is_complete(resp))
expect_length(resp_stream_raw(resp, kb = 2), 2048)
expect_length(resp_stream_raw(resp, kb = 1), 0)
expect_true(resp_stream_is_complete(resp))
})

test_that("can't read from a closed connection", {
resp <- request_test("/stream-bytes/1024") %>% req_perform_connection()
close(resp)
Expand All @@ -42,7 +61,7 @@ test_that("can join lines across multiple reads", {
expect_equal(out, character())
expect_equal(resp1$cache$push_back, charToRaw("This is a "))

while(length(out) == 0) {
while (length(out) == 0) {
Sys.sleep(0.1)
out <- resp_stream_lines(resp1)
}
Expand Down Expand Up @@ -147,7 +166,7 @@ test_that("streams the specified number of lines", {

test_that("can feed sse events one at a time", {
req <- local_app_request(function(req, res) {
for(i in 1:3) {
for (i in 1:3) {
res$send_chunk(sprintf("data: %s\n\n", i))
}
})
Expand Down Expand Up @@ -185,7 +204,7 @@ test_that("can join sse events across multiple reads", {
expect_equal(out, NULL)
expect_equal(resp1$cache$push_back, charToRaw("data: 1\n"))

while(is.null(out)) {
while (is.null(out)) {
Sys.sleep(0.1)
out <- resp_stream_sse(resp1)
}
Expand Down Expand Up @@ -213,7 +232,7 @@ test_that("sse always interprets data as UTF-8", {
withr::defer(close(resp1))

out <- NULL
while(is.null(out)) {
while (is.null(out)) {
Sys.sleep(0.1)
out <- resp_stream_sse(resp1)
}
Expand All @@ -236,7 +255,7 @@ test_that("streaming size limits enforced", {
resp1 <- req_perform_connection(req, blocking = FALSE)
withr::defer(close(resp1))
expect_error(
while(is.null(out)) {
while (is.null(out)) {
Sys.sleep(0.1)
out <- resp_stream_sse(resp1, max_size = 999)
}
Expand Down Expand Up @@ -266,7 +285,7 @@ test_that("has a working find_event_boundary", {
}
expect_identical(
result,
list(matched=charToRaw(matched), remaining = charToRaw(remaining))
list(matched = charToRaw(matched), remaining = charToRaw(remaining))
)
}

Expand Down

0 comments on commit f680367

Please sign in to comment.