Skip to content

Commit

Permalink
Merge pull request #12 from devashishdxt/stream-response
Browse files Browse the repository at this point in the history
Fix handling of stream responses
  • Loading branch information
devashishdxt authored Sep 10, 2022
2 parents 7f42dd2 + 6c5b81e commit cdf3579
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ http-body = "0.4.5"
httparse = "1.8.0"
reqwest = { version = "0.11.11", default-features = false }
thiserror = "1.0.34"
tonic = { version = "0.8.0", default-features = false }
tonic = { version = "0.8.1", default-features = false }
tower = { version = "0.4.13", default-features = false }
wee_alloc = { version = "0.4.5", optional = true }

Expand Down
27 changes: 18 additions & 9 deletions src/grpc_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ use httparse::{Status, EMPTY_HEADER};

use crate::ClientError;

/// If 8th MSB of a frame is `0` for data and `1` for trailer
const TRAILER_BIT: u8 = 0b10000000;

pub struct GrpcResponse {
data: Bytes,
trailers: HeaderMap<HeaderValue>,
Expand All @@ -31,11 +34,22 @@ impl GrpcResponse {

body.extend(b"\n");

let compression_flag = body.get_u8();
let len = body.get_u32();
let mut data = BytesMut::new();

let mut compression_flag = body.get_u8();

let data_bytes = body.split_to(len as usize).freeze();
body.advance(5);
while compression_flag & TRAILER_BIT == 0 {
let len = body.get_u32();
let data_bytes = body.split_to(len as usize).freeze();

data.put_u8(compression_flag);
data.put_u32(len);
data.extend_from_slice(&data_bytes);

compression_flag = body.get_u8();
}

body.advance(4);

let mut trailers_buf = [EMPTY_HEADER; 64];
let parsed_trailers = match httparse::parse_headers(&body, &mut trailers_buf)
Expand All @@ -53,11 +67,6 @@ impl GrpcResponse {
trailers.insert(header_name, header_value);
}

let mut data = BytesMut::with_capacity(data_bytes.len() + 5);
data.put_u8(compression_flag);
data.put_u32(len);
data.extend_from_slice(&data_bytes);

Ok(Self {
data: data.freeze(),
trailers,
Expand Down
2 changes: 1 addition & 1 deletion test-suite/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"

[dependencies]
prost = "0.11.0"
tonic = { version = "0.8.0", default-features = false, features = [
tonic = { version = "0.8.1", default-features = false, features = [
"prost",
"codegen",
] }
Expand Down
24 changes: 24 additions & 0 deletions test-suite/client/tests/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,27 @@ async fn test_echo() {

assert_eq!(response.message, "echo(John)");
}

#[wasm_bindgen_test]
async fn test_echo_stream() {
let mut client = EchoClient::default();

let mut stream_response = client
.echo_stream(EchoRequest {
message: "John".to_string(),
})
.await
.expect("success stream response")
.into_inner();

for i in 0..3 {
let response = stream_response.message().await.expect("stream message");
assert!(response.is_some(), "{}", i);
let response = response.unwrap();

assert_eq!(response.message, "echo(John)");
}

let response = stream_response.message().await.expect("stream message");
assert!(response.is_none());
}
2 changes: 2 additions & 0 deletions test-suite/proto/echo.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package echo;

service Echo {
rpc Echo (EchoRequest) returns (EchoResponse) {}

rpc EchoStream (EchoRequest) returns (stream EchoResponse) {}
}

message EchoRequest {
Expand Down
5 changes: 3 additions & 2 deletions test-suite/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures-core = "0.3.24"
prost = "0.11.0"
tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread"] }
tonic = "0.8.0"
tokio = { version = "1.21.0", features = ["macros", "rt-multi-thread"] }
tonic = "0.8.1"
tonic-web = "0.4.0"

[build-dependencies]
Expand Down
43 changes: 42 additions & 1 deletion test-suite/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use std::error::Error;
use std::{
error::Error,
pin::Pin,
task::{Context, Poll},
};

use futures_core::Stream;
use proto::echo_server::EchoServer;
use tonic::{transport::Server, Request, Response, Status};

Expand All @@ -13,12 +18,48 @@ pub struct EchoService;

#[tonic::async_trait]
impl Echo for EchoService {
type EchoStreamStream = MessageStream;

async fn echo(&self, request: Request<EchoRequest>) -> Result<Response<EchoResponse>, Status> {
let request = request.into_inner();
Ok(Response::new(EchoResponse {
message: format!("echo({})", request.message),
}))
}

async fn echo_stream(
&self,
request: Request<EchoRequest>,
) -> Result<Response<Self::EchoStreamStream>, Status> {
let request = request.into_inner();
Ok(Response::new(MessageStream::new(request.message)))
}
}

pub struct MessageStream {
message: String,
count: u8,
}

impl MessageStream {
pub fn new(message: String) -> Self {
Self { message, count: 0 }
}
}

impl Stream for MessageStream {
type Item = Result<EchoResponse, Status>;

fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.count < 3 {
self.count += 1;
Poll::Ready(Some(Ok(EchoResponse {
message: format!("echo({})", self.message),
})))
} else {
Poll::Ready(None)
}
}
}

#[tokio::main]
Expand Down

0 comments on commit cdf3579

Please sign in to comment.