Skip to content

Commit

Permalink
Remove wrapping layer around example gRPC client
Browse files Browse the repository at this point in the history
The wrapping layer doesn't provide enough to make the additional layer
of abstraction worth it; we can work directly with tonic gRPC clients
instead.

Change-Id: I4a842e811503131032d19da982dd828bf72e22c7
  • Loading branch information
jblebrun committed Jan 15, 2025
1 parent 842825a commit 08c1b42
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 74 deletions.
1 change: 0 additions & 1 deletion oak_containers/examples/hello_world/host_app/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ package(
rust_library(
name = "oak_containers_examples_hello_world_host_app",
srcs = [
"src/app_client.rs",
"src/demo_transport.rs",
"src/http_service.rs",
"src/launcher_args.rs",
Expand Down
53 changes: 0 additions & 53 deletions oak_containers/examples/hello_world/host_app/src/app_client.rs

This file was deleted.

19 changes: 12 additions & 7 deletions oak_containers/examples/hello_world/host_app/src/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,25 @@ use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper::{body, server::conn::http1, service::service_fn, Request, Response};
use hyper_util::rt::{TokioIo, TokioTimer};
use oak_hello_world_proto::oak::containers::example::enclave_application_client::EnclaveApplicationClient;
use oak_proto_rust::oak::session::v1::{RequestWrapper, ResponseWrapper};
use prost::Message;
use tokio::{net::TcpListener, sync::Mutex};

use crate::app_client::EnclaveApplicationClient;
use tokio::{net::TcpListener, sync::Mutex, time::Duration};
use tonic::transport::{channel::Channel, Endpoint};

async fn handle_request(
request: RequestWrapper,
enclave_app: Arc<Mutex<EnclaveApplicationClient>>,
enclave_app: Arc<Mutex<EnclaveApplicationClient<Channel>>>,
) -> tonic::Result<ResponseWrapper> {
// This is not how we should actually use the streaming interface, but it
// works for HPKE, as long as all requests go to the same machine.
let mut response_stream =
let response_stream =
enclave_app.lock().await.legacy_session(tokio_stream::iter(vec![request])).await.map_err(
|err| tonic::Status::internal(format!("starting streaming session failed: {err:?}")),
)?;

response_stream
.into_inner()
.message()
.await?
.context("no response wrapper was returned")
Expand All @@ -58,9 +59,13 @@ pub async fn serve(
.get_trusted_app_address()
.await
.map_err(|error| anyhow!("Failed to get app address: {error:?}"))?;
let app_client = EnclaveApplicationClient::create(format!("http://{enclave_app_address}"))
let channel = Endpoint::from_shared(format!("http://{enclave_app_address}"))
.context("couldn't form channel")?
.connect_timeout(Duration::from_secs(120))
.connect()
.await
.map_err(|error| anyhow!("Failed to create enclave application client: {error:?}"))?;
.context("couldn't connect to enclave app")?;
let app_client = EnclaveApplicationClient::new(channel);

let app_client = Arc::new(Mutex::new(app_client));

Expand Down
1 change: 0 additions & 1 deletion oak_containers/examples/hello_world/host_app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod app_client;
pub mod demo_transport;
pub mod http_service;
pub mod launcher_args;
Expand Down
28 changes: 16 additions & 12 deletions oak_containers/examples/hello_world/host_app/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,24 @@

use std::{future::Future, pin::Pin, sync::Arc};

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use futures::{channel::mpsc, Stream, StreamExt};
use oak_hello_world_proto::oak::containers::example::host_application_server::{
HostApplication, HostApplicationServer,
use oak_hello_world_proto::oak::containers::example::{
enclave_application_client::EnclaveApplicationClient,
host_application_server::{HostApplication, HostApplicationServer},
};
use oak_proto_rust::oak::session::v1::{RequestWrapper, ResponseWrapper};
use tokio::{net::TcpListener, sync::Mutex};
use tokio::{net::TcpListener, sync::Mutex, time::Duration};
use tokio_stream::wrappers::TcpListenerStream;

use crate::app_client::EnclaveApplicationClient;
use tonic::transport::{channel::Channel, Endpoint};

/// The sample application's implementation of Oak's streaming service protocol.
struct HostApplicationImpl {
enclave_app: Arc<Mutex<EnclaveApplicationClient>>,
enclave_app: Arc<Mutex<EnclaveApplicationClient<Channel>>>,
}

impl HostApplicationImpl {
pub fn new(enclave_app: EnclaveApplicationClient) -> Self {
pub fn new(enclave_app: EnclaveApplicationClient<Channel>) -> Self {
Self { enclave_app: Arc::new(Mutex::new(enclave_app)) }
}
}
Expand All @@ -48,12 +48,12 @@ async fn forward_stream<Fut>(
upstream_starter: impl FnOnce(mpsc::Receiver<RequestWrapper>) -> Fut,
) -> Result<impl Stream<Item = Result<ResponseWrapper, tonic::Status>>, tonic::Status>
where
Fut: Future<Output = Result<tonic::Streaming<ResponseWrapper>, tonic::Status>>,
Fut: Future<Output = Result<tonic::Response<tonic::Streaming<ResponseWrapper>>, tonic::Status>>,
{
let mut request_stream = request_stream;
let (mut tx, rx) = mpsc::channel(10);

let mut upstream = upstream_starter(rx).await?;
let mut upstream = upstream_starter(rx).await?.into_inner();

Ok(async_stream::try_stream! {
loop {
Expand Down Expand Up @@ -120,9 +120,13 @@ pub async fn create(
.get_trusted_app_address()
.await
.map_err(|error| anyhow!("Failed to get app address: {error:?}"))?;
let app_client = EnclaveApplicationClient::create(format!("http://{enclave_app_address}"))
let channel = Endpoint::from_shared(format!("http://{enclave_app_address}"))
.context("couldn't form channel")?
.connect_timeout(Duration::from_secs(120))
.connect()
.await
.map_err(|error| anyhow!("Failed to create enclave application client: {error:?}"))?;
.context("couldn't connect to enclave app")?;
let app_client = EnclaveApplicationClient::new(channel);
tonic::transport::Server::builder()
.add_service(HostApplicationServer::new(HostApplicationImpl::new(app_client)))
.serve_with_incoming(TcpListenerStream::new(listener))
Expand Down

0 comments on commit 08c1b42

Please sign in to comment.