From 971c506025bd8983e40cfa50d096c396d77389c4 Mon Sep 17 00:00:00 2001 From: John Cormie Date: Wed, 20 Dec 2023 09:16:41 -0800 Subject: [PATCH] binder: rename gbs to stream and gbt to transport (#35357) Per https://google.github.io/styleguide/cppguide.html#General_Naming_Rules - Use "names that would be clear even to people on a different team". - "Minimize the use of abbreviations ... (especially acronyms and initialisms)" - "Do not worry about saving horizontal space" Also supports rename of grpc_binder_stream and grpc_binder_transport in an upcoming PR. Closes #35357 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35357 from jdcormie:rename bef505c760a45f2276c16cab478a01e9c5c924cd PiperOrigin-RevId: 592578920 --- .../binder/transport/binder_stream.h | 28 +- .../binder/transport/binder_transport.cc | 362 +++++++++--------- 2 files changed, 201 insertions(+), 189 deletions(-) diff --git a/src/core/ext/transport/binder/transport/binder_stream.h b/src/core/ext/transport/binder/transport/binder_stream.h index fc2f13c4496eb..afd0c41451433 100644 --- a/src/core/ext/transport/binder/transport/binder_stream.h +++ b/src/core/ext/transport/binder/transport/binder_stream.h @@ -20,30 +20,30 @@ #include "src/core/ext/transport/binder/transport/binder_transport.h" struct RecvInitialMetadataArgs { - grpc_binder_stream* gbs; - grpc_binder_transport* gbt; + grpc_binder_stream* stream; + grpc_binder_transport* transport; int tx_code; absl::StatusOr initial_metadata; }; struct RecvMessageArgs { - grpc_binder_stream* gbs; - grpc_binder_transport* gbt; + grpc_binder_stream* stream; + grpc_binder_transport* transport; int tx_code; absl::StatusOr message; }; struct RecvTrailingMetadataArgs { - grpc_binder_stream* gbs; - grpc_binder_transport* gbt; + grpc_binder_stream* stream; + grpc_binder_transport* transport; int tx_code; absl::StatusOr trailing_metadata; int status; }; struct RegisterStreamArgs { - grpc_binder_stream* gbs; - grpc_binder_transport* gbt; + grpc_binder_stream* stream; + grpc_binder_transport* transport; }; // TODO(mingcl): Figure out if we want to use class instead of struct here @@ -59,12 +59,12 @@ struct grpc_binder_stream { tx_code(tx_code), is_client(is_client), is_closed(false) { - recv_initial_metadata_args.gbs = this; - recv_initial_metadata_args.gbt = t; - recv_message_args.gbs = this; - recv_message_args.gbt = t; - recv_trailing_metadata_args.gbs = this; - recv_trailing_metadata_args.gbt = t; + recv_initial_metadata_args.stream = this; + recv_initial_metadata_args.transport = t; + recv_message_args.stream = this; + recv_message_args.transport = t; + recv_trailing_metadata_args.stream = this; + recv_trailing_metadata_args.transport = t; } ~grpc_binder_stream() { diff --git a/src/core/ext/transport/binder/transport/binder_transport.cc b/src/core/ext/transport/binder/transport/binder_transport.cc index fc16e47c4f169..aad7b43f2137a 100644 --- a/src/core/ext/transport/binder/transport/binder_transport.cc +++ b/src/core/ext/transport/binder/transport/binder_transport.cc @@ -99,7 +99,7 @@ static void grpc_binder_unref_transport(grpc_binder_transport* t) { static void register_stream_locked(void* arg, grpc_error_handle /*error*/) { RegisterStreamArgs* args = static_cast(arg); - args->gbt->registered_stream[args->gbs->GetTxCode()] = args->gbs; + args->transport->registered_stream[args->stream->GetTxCode()] = args->stream; } void grpc_binder_transport::InitStream(grpc_stream* gs, @@ -114,14 +114,14 @@ void grpc_binder_transport::InitStream(grpc_stream* gs, // `grpc_binder_transport::registered_stream` should only be updated in // combiner - grpc_binder_stream* gbs = reinterpret_cast(gs); - gbs->register_stream_args.gbs = gbs; - gbs->register_stream_args.gbt = this; + grpc_binder_stream* stream = reinterpret_cast(gs); + stream->register_stream_args.stream = stream; + stream->register_stream_args.transport = this; grpc_core::ExecCtx exec_ctx; - combiner->Run( - GRPC_CLOSURE_INIT(&gbs->register_stream_closure, register_stream_locked, - &gbs->register_stream_args, nullptr), - absl::OkStatus()); + combiner->Run(GRPC_CLOSURE_INIT(&stream->register_stream_closure, + register_stream_locked, + &stream->register_stream_args, nullptr), + absl::OkStatus()); } static void AssignMetadata(grpc_metadata_batch* mb, @@ -137,35 +137,36 @@ static void AssignMetadata(grpc_metadata_batch* mb, } } -static void cancel_stream_locked(grpc_binder_transport* gbt, - grpc_binder_stream* gbs, +static void cancel_stream_locked(grpc_binder_transport* transport, + grpc_binder_stream* stream, grpc_error_handle error) { gpr_log(GPR_INFO, "cancel_stream_locked"); - if (!gbs->is_closed) { - GPR_ASSERT(gbs->cancel_self_error.ok()); - gbs->is_closed = true; - gbs->cancel_self_error = error; - gbt->transport_stream_receiver->CancelStream(gbs->tx_code); - gbt->registered_stream.erase(gbs->tx_code); - if (gbs->recv_initial_metadata_ready != nullptr) { - grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_initial_metadata_ready, - error); - gbs->recv_initial_metadata_ready = nullptr; - gbs->recv_initial_metadata = nullptr; - gbs->trailing_metadata_available = nullptr; + if (!stream->is_closed) { + GPR_ASSERT(stream->cancel_self_error.ok()); + stream->is_closed = true; + stream->cancel_self_error = error; + transport->transport_stream_receiver->CancelStream(stream->tx_code); + transport->registered_stream.erase(stream->tx_code); + if (stream->recv_initial_metadata_ready != nullptr) { + grpc_core::ExecCtx::Run(DEBUG_LOCATION, + stream->recv_initial_metadata_ready, error); + stream->recv_initial_metadata_ready = nullptr; + stream->recv_initial_metadata = nullptr; + stream->trailing_metadata_available = nullptr; } - if (gbs->recv_message_ready != nullptr) { - grpc_core::ExecCtx::Run(DEBUG_LOCATION, gbs->recv_message_ready, error); - gbs->recv_message_ready = nullptr; - gbs->recv_message->reset(); - gbs->recv_message = nullptr; - gbs->call_failed_before_recv_message = nullptr; + if (stream->recv_message_ready != nullptr) { + grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream->recv_message_ready, + error); + stream->recv_message_ready = nullptr; + stream->recv_message->reset(); + stream->recv_message = nullptr; + stream->call_failed_before_recv_message = nullptr; } - if (gbs->recv_trailing_metadata_finished != nullptr) { + if (stream->recv_trailing_metadata_finished != nullptr) { grpc_core::ExecCtx::Run(DEBUG_LOCATION, - gbs->recv_trailing_metadata_finished, error); - gbs->recv_trailing_metadata_finished = nullptr; - gbs->recv_trailing_metadata = nullptr; + stream->recv_trailing_metadata_finished, error); + stream->recv_trailing_metadata_finished = nullptr; + stream->recv_trailing_metadata = nullptr; } } } @@ -187,53 +188,53 @@ static bool ContainsAuthorityAndPath(const grpc_binder::Metadata& metadata) { static void recv_initial_metadata_locked(void* arg, grpc_error_handle /*error*/) { RecvInitialMetadataArgs* args = static_cast(arg); - grpc_binder_stream* gbs = args->gbs; + grpc_binder_stream* stream = args->stream; gpr_log(GPR_INFO, "recv_initial_metadata_locked is_client = %d is_closed = %d", - gbs->is_client, gbs->is_closed); + stream->is_client, stream->is_closed); - if (!gbs->is_closed) { + if (!stream->is_closed) { grpc_error_handle error = [&] { - GPR_ASSERT(gbs->recv_initial_metadata); - GPR_ASSERT(gbs->recv_initial_metadata_ready); + GPR_ASSERT(stream->recv_initial_metadata); + GPR_ASSERT(stream->recv_initial_metadata_ready); if (!args->initial_metadata.ok()) { gpr_log(GPR_ERROR, "Failed to parse initial metadata"); return absl_status_to_grpc_error(args->initial_metadata.status()); } - if (!gbs->is_client) { + if (!stream->is_client) { // For server, we expect :authority and :path in initial metadata. if (!ContainsAuthorityAndPath(*args->initial_metadata)) { return GRPC_ERROR_CREATE( "Missing :authority or :path in initial metadata"); } } - AssignMetadata(gbs->recv_initial_metadata, *args->initial_metadata); + AssignMetadata(stream->recv_initial_metadata, *args->initial_metadata); return absl::OkStatus(); }(); - if (gbs->t->registered_method_matcher_cb != nullptr) { - gbs->t->registered_method_matcher_cb(gbs->t->accept_stream_user_data, - gbs->recv_initial_metadata); + if (stream->t->registered_method_matcher_cb != nullptr) { + stream->t->registered_method_matcher_cb( + stream->t->accept_stream_user_data, stream->recv_initial_metadata); } - grpc_closure* cb = gbs->recv_initial_metadata_ready; - gbs->recv_initial_metadata_ready = nullptr; - gbs->recv_initial_metadata = nullptr; + grpc_closure* cb = stream->recv_initial_metadata_ready; + stream->recv_initial_metadata_ready = nullptr; + stream->recv_initial_metadata = nullptr; grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error); } - GRPC_BINDER_STREAM_UNREF(gbs, "recv_initial_metadata"); + GRPC_BINDER_STREAM_UNREF(stream, "recv_initial_metadata"); } static void recv_message_locked(void* arg, grpc_error_handle /*error*/) { RecvMessageArgs* args = static_cast(arg); - grpc_binder_stream* gbs = args->gbs; + grpc_binder_stream* stream = args->stream; gpr_log(GPR_INFO, "recv_message_locked is_client = %d is_closed = %d", - gbs->is_client, gbs->is_closed); + stream->is_client, stream->is_closed); - if (!gbs->is_closed) { + if (!stream->is_closed) { grpc_error_handle error = [&] { - GPR_ASSERT(gbs->recv_message); - GPR_ASSERT(gbs->recv_message_ready); + GPR_ASSERT(stream->recv_message); + GPR_ASSERT(stream->recv_message_ready); if (!args->message.ok()) { gpr_log(GPR_ERROR, "Failed to receive message"); if (args->message.status().message() == @@ -249,62 +250,63 @@ static void recv_message_locked(void* arg, grpc_error_handle /*error*/) { } grpc_core::SliceBuffer buf; buf.Append(grpc_core::Slice(grpc_slice_from_cpp_string(*args->message))); - *gbs->recv_message = std::move(buf); + *stream->recv_message = std::move(buf); return absl::OkStatus(); }(); - if (!error.ok() && gbs->call_failed_before_recv_message != nullptr) { - *gbs->call_failed_before_recv_message = true; + if (!error.ok() && stream->call_failed_before_recv_message != nullptr) { + *stream->call_failed_before_recv_message = true; } - grpc_closure* cb = gbs->recv_message_ready; - gbs->recv_message_ready = nullptr; - gbs->recv_message = nullptr; + grpc_closure* cb = stream->recv_message_ready; + stream->recv_message_ready = nullptr; + stream->recv_message = nullptr; grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error); } - GRPC_BINDER_STREAM_UNREF(gbs, "recv_message"); + GRPC_BINDER_STREAM_UNREF(stream, "recv_message"); } static void recv_trailing_metadata_locked(void* arg, grpc_error_handle /*error*/) { RecvTrailingMetadataArgs* args = static_cast(arg); - grpc_binder_stream* gbs = args->gbs; + grpc_binder_stream* stream = args->stream; gpr_log(GPR_INFO, "recv_trailing_metadata_locked is_client = %d is_closed = %d", - gbs->is_client, gbs->is_closed); + stream->is_client, stream->is_closed); - if (!gbs->is_closed) { + if (!stream->is_closed) { grpc_error_handle error = [&] { - GPR_ASSERT(gbs->recv_trailing_metadata); - GPR_ASSERT(gbs->recv_trailing_metadata_finished); + GPR_ASSERT(stream->recv_trailing_metadata); + GPR_ASSERT(stream->recv_trailing_metadata_finished); if (!args->trailing_metadata.ok()) { gpr_log(GPR_ERROR, "Failed to receive trailing metadata"); return absl_status_to_grpc_error(args->trailing_metadata.status()); } - if (!gbs->is_client) { + if (!stream->is_client) { // Client will not send non-empty trailing metadata. if (!args->trailing_metadata.value().empty()) { gpr_log(GPR_ERROR, "Server receives non-empty trailing metadata."); return absl::CancelledError(); } } else { - AssignMetadata(gbs->recv_trailing_metadata, *args->trailing_metadata); + AssignMetadata(stream->recv_trailing_metadata, + *args->trailing_metadata); // Append status to metadata // TODO(b/192208695): See if we can avoid to manually put status // code into the header gpr_log(GPR_INFO, "status = %d", args->status); - gbs->recv_trailing_metadata->Set( + stream->recv_trailing_metadata->Set( grpc_core::GrpcStatusMetadata(), static_cast(args->status)); } return absl::OkStatus(); }(); - if (gbs->is_client || gbs->trailing_metadata_sent) { - grpc_closure* cb = gbs->recv_trailing_metadata_finished; - gbs->recv_trailing_metadata_finished = nullptr; - gbs->recv_trailing_metadata = nullptr; + if (stream->is_client || stream->trailing_metadata_sent) { + grpc_closure* cb = stream->recv_trailing_metadata_finished; + stream->recv_trailing_metadata_finished = nullptr; + stream->recv_trailing_metadata = nullptr; grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error); } else { // According to transport explaineer - "Server extra: This op shouldn't @@ -313,10 +315,10 @@ static void recv_trailing_metadata_locked(void* arg, // // We haven't sent trailing metadata yet, so we have to delay completing // the recv_trailing_metadata callback. - gbs->need_to_call_trailing_metadata_callback = true; + stream->need_to_call_trailing_metadata_callback = true; } } - GRPC_BINDER_STREAM_UNREF(gbs, "recv_trailing_metadata"); + GRPC_BINDER_STREAM_UNREF(stream, "recv_trailing_metadata"); } namespace grpc_binder { @@ -366,15 +368,16 @@ class MetadataEncoder { } // namespace grpc_binder static void accept_stream_locked(void* gt, grpc_error_handle /*error*/) { - grpc_binder_transport* gbt = static_cast(gt); - if (gbt->accept_stream_fn) { + grpc_binder_transport* transport = static_cast(gt); + if (transport->accept_stream_fn) { gpr_log(GPR_INFO, "Accepting a stream"); // must pass in a non-null value. - (*gbt->accept_stream_fn)(gbt->accept_stream_user_data, gbt, gbt); + (*transport->accept_stream_fn)(transport->accept_stream_user_data, + transport, transport); } else { - ++gbt->accept_stream_fn_called_count_; + ++transport->accept_stream_fn_called_count_; gpr_log(GPR_INFO, "accept_stream_fn not set, current count = %d", - gbt->accept_stream_fn_called_count_); + transport->accept_stream_fn_called_count_); } } @@ -382,34 +385,35 @@ static void perform_stream_op_locked(void* stream_op, grpc_error_handle /*error*/) { grpc_transport_stream_op_batch* op = static_cast(stream_op); - grpc_binder_stream* gbs = + grpc_binder_stream* stream = static_cast(op->handler_private.extra_arg); - grpc_binder_transport* gbt = gbs->t; + grpc_binder_transport* transport = stream->t; if (op->cancel_stream) { // TODO(waynetu): Is this true? GPR_ASSERT(!op->send_initial_metadata && !op->send_message && !op->send_trailing_metadata && !op->recv_initial_metadata && !op->recv_message && !op->recv_trailing_metadata); - gpr_log(GPR_INFO, "cancel_stream is_client = %d", gbs->is_client); - if (!gbs->is_client) { + gpr_log(GPR_INFO, "cancel_stream is_client = %d", stream->is_client); + if (!stream->is_client) { // Send trailing metadata to inform the other end about the cancellation, // regardless if we'd already done that or not. auto cancel_tx = std::make_unique( - gbs->GetTxCode(), gbt->is_client); + stream->GetTxCode(), transport->is_client); cancel_tx->SetSuffix(grpc_binder::Metadata{}); cancel_tx->SetStatus(1); - (void)gbt->wire_writer->RpcCall(std::move(cancel_tx)); + (void)transport->wire_writer->RpcCall(std::move(cancel_tx)); } - cancel_stream_locked(gbt, gbs, op->payload->cancel_stream.cancel_error); + cancel_stream_locked(transport, stream, + op->payload->cancel_stream.cancel_error); if (op->on_complete != nullptr) { grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, absl::OkStatus()); } - GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op"); + GRPC_BINDER_STREAM_UNREF(stream, "perform_stream_op"); return; } - if (gbs->is_closed) { + if (stream->is_closed) { if (op->send_message) { // Reset the send_message payload to prevent memory leaks. op->payload->send_message.send_message->Clear(); @@ -418,36 +422,38 @@ static void perform_stream_op_locked(void* stream_op, grpc_core::ExecCtx::Run( DEBUG_LOCATION, op->payload->recv_initial_metadata.recv_initial_metadata_ready, - gbs->cancel_self_error); + stream->cancel_self_error); } if (op->recv_message) { grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->payload->recv_message.recv_message_ready, - gbs->cancel_self_error); + stream->cancel_self_error); } if (op->recv_trailing_metadata) { grpc_core::ExecCtx::Run( DEBUG_LOCATION, op->payload->recv_trailing_metadata.recv_trailing_metadata_ready, - gbs->cancel_self_error); + stream->cancel_self_error); } if (op->on_complete != nullptr) { grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, - gbs->cancel_self_error); + stream->cancel_self_error); } - GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op"); + GRPC_BINDER_STREAM_UNREF(stream, "perform_stream_op"); return; } - int tx_code = gbs->tx_code; - auto tx = std::make_unique(tx_code, gbt->is_client); + int tx_code = stream->tx_code; + auto tx = + std::make_unique(tx_code, transport->is_client); if (op->send_initial_metadata) { gpr_log(GPR_INFO, "send_initial_metadata"); grpc_binder::Metadata init_md; auto batch = op->payload->send_initial_metadata.send_initial_metadata; - grpc_binder::MetadataEncoder encoder(gbt->is_client, tx.get(), &init_md); + grpc_binder::MetadataEncoder encoder(transport->is_client, tx.get(), + &init_md); batch->Encode(&encoder); tx->SetPrefix(init_md); } @@ -461,7 +467,7 @@ static void perform_stream_op_locked(void* stream_op, auto batch = op->payload->send_trailing_metadata.send_trailing_metadata; grpc_binder::Metadata trailing_metadata; - grpc_binder::MetadataEncoder encoder(gbt->is_client, tx.get(), + grpc_binder::MetadataEncoder encoder(transport->is_client, tx.get(), &trailing_metadata); batch->Encode(&encoder); @@ -471,68 +477,70 @@ static void perform_stream_op_locked(void* stream_op, } if (op->recv_initial_metadata) { gpr_log(GPR_INFO, "recv_initial_metadata"); - gbs->recv_initial_metadata_ready = + stream->recv_initial_metadata_ready = op->payload->recv_initial_metadata.recv_initial_metadata_ready; - gbs->recv_initial_metadata = + stream->recv_initial_metadata = op->payload->recv_initial_metadata.recv_initial_metadata; - gbs->trailing_metadata_available = + stream->trailing_metadata_available = op->payload->recv_initial_metadata.trailing_metadata_available; - GRPC_BINDER_STREAM_REF(gbs, "recv_initial_metadata"); - gbt->transport_stream_receiver->RegisterRecvInitialMetadata( - tx_code, [tx_code, gbs, - gbt](absl::StatusOr initial_metadata) { + GRPC_BINDER_STREAM_REF(stream, "recv_initial_metadata"); + transport->transport_stream_receiver->RegisterRecvInitialMetadata( + tx_code, [tx_code, stream, transport]( + absl::StatusOr initial_metadata) { grpc_core::ExecCtx exec_ctx; - gbs->recv_initial_metadata_args.tx_code = tx_code; - gbs->recv_initial_metadata_args.initial_metadata = + stream->recv_initial_metadata_args.tx_code = tx_code; + stream->recv_initial_metadata_args.initial_metadata = std::move(initial_metadata); - gbt->combiner->Run( - GRPC_CLOSURE_INIT(&gbs->recv_initial_metadata_closure, + transport->combiner->Run( + GRPC_CLOSURE_INIT(&stream->recv_initial_metadata_closure, recv_initial_metadata_locked, - &gbs->recv_initial_metadata_args, nullptr), + &stream->recv_initial_metadata_args, nullptr), absl::OkStatus()); }); } if (op->recv_message) { gpr_log(GPR_INFO, "recv_message"); - gbs->recv_message_ready = op->payload->recv_message.recv_message_ready; - gbs->recv_message = op->payload->recv_message.recv_message; - gbs->call_failed_before_recv_message = + stream->recv_message_ready = op->payload->recv_message.recv_message_ready; + stream->recv_message = op->payload->recv_message.recv_message; + stream->call_failed_before_recv_message = op->payload->recv_message.call_failed_before_recv_message; if (op->payload->recv_message.flags != nullptr) { *op->payload->recv_message.flags = 0; } - GRPC_BINDER_STREAM_REF(gbs, "recv_message"); - gbt->transport_stream_receiver->RegisterRecvMessage( - tx_code, [tx_code, gbs, gbt](absl::StatusOr message) { + GRPC_BINDER_STREAM_REF(stream, "recv_message"); + transport->transport_stream_receiver->RegisterRecvMessage( + tx_code, + [tx_code, stream, transport](absl::StatusOr message) { grpc_core::ExecCtx exec_ctx; - gbs->recv_message_args.tx_code = tx_code; - gbs->recv_message_args.message = std::move(message); - gbt->combiner->Run( - GRPC_CLOSURE_INIT(&gbs->recv_message_closure, recv_message_locked, - &gbs->recv_message_args, nullptr), + stream->recv_message_args.tx_code = tx_code; + stream->recv_message_args.message = std::move(message); + transport->combiner->Run( + GRPC_CLOSURE_INIT(&stream->recv_message_closure, + recv_message_locked, &stream->recv_message_args, + nullptr), absl::OkStatus()); }); } if (op->recv_trailing_metadata) { gpr_log(GPR_INFO, "recv_trailing_metadata"); - gbs->recv_trailing_metadata_finished = + stream->recv_trailing_metadata_finished = op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - gbs->recv_trailing_metadata = + stream->recv_trailing_metadata = op->payload->recv_trailing_metadata.recv_trailing_metadata; - GRPC_BINDER_STREAM_REF(gbs, "recv_trailing_metadata"); - gbt->transport_stream_receiver->RegisterRecvTrailingMetadata( - tx_code, [tx_code, gbs, gbt]( + GRPC_BINDER_STREAM_REF(stream, "recv_trailing_metadata"); + transport->transport_stream_receiver->RegisterRecvTrailingMetadata( + tx_code, [tx_code, stream, transport]( absl::StatusOr trailing_metadata, int status) { grpc_core::ExecCtx exec_ctx; - gbs->recv_trailing_metadata_args.tx_code = tx_code; - gbs->recv_trailing_metadata_args.trailing_metadata = + stream->recv_trailing_metadata_args.tx_code = tx_code; + stream->recv_trailing_metadata_args.trailing_metadata = std::move(trailing_metadata); - gbs->recv_trailing_metadata_args.status = status; - gbt->combiner->Run( - GRPC_CLOSURE_INIT(&gbs->recv_trailing_metadata_closure, + stream->recv_trailing_metadata_args.status = status; + transport->combiner->Run( + GRPC_CLOSURE_INIT(&stream->recv_trailing_metadata_closure, recv_trailing_metadata_locked, - &gbs->recv_trailing_metadata_args, nullptr), + &stream->recv_trailing_metadata_args, nullptr), absl::OkStatus()); }); } @@ -540,20 +548,20 @@ static void perform_stream_op_locked(void* stream_op, absl::Status status; if (op->send_initial_metadata || op->send_message || op->send_trailing_metadata) { - status = gbt->wire_writer->RpcCall(std::move(tx)); - if (!gbs->is_client && op->send_trailing_metadata) { - gbs->trailing_metadata_sent = true; + status = transport->wire_writer->RpcCall(std::move(tx)); + if (!stream->is_client && op->send_trailing_metadata) { + stream->trailing_metadata_sent = true; // According to transport explaineer - "Server extra: This op shouldn't // actually be considered complete until the server has also sent trailing // metadata to provide the other side with final status" // // Because we've done sending trailing metadata here, we can safely // complete the recv_trailing_metadata callback here. - if (gbs->need_to_call_trailing_metadata_callback) { - grpc_closure* cb = gbs->recv_trailing_metadata_finished; - gbs->recv_trailing_metadata_finished = nullptr; + if (stream->need_to_call_trailing_metadata_callback) { + grpc_closure* cb = stream->recv_trailing_metadata_finished; + stream->recv_trailing_metadata_finished = nullptr; grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::OkStatus()); - gbs->need_to_call_trailing_metadata_callback = false; + stream->need_to_call_trailing_metadata_callback = false; } } } @@ -564,27 +572,28 @@ static void perform_stream_op_locked(void* stream_op, absl_status_to_grpc_error(status)); gpr_log(GPR_INFO, "on_complete closure scheduled"); } - GRPC_BINDER_STREAM_UNREF(gbs, "perform_stream_op"); + GRPC_BINDER_STREAM_UNREF(stream, "perform_stream_op"); } void grpc_binder_transport::PerformStreamOp( grpc_stream* gs, grpc_transport_stream_op_batch* op) { - grpc_binder_stream* gbs = reinterpret_cast(gs); + grpc_binder_stream* stream = reinterpret_cast(gs); gpr_log(GPR_INFO, "%s = %p %p %p is_client = %d", __func__, this, gs, op, - gbs->is_client); - GRPC_BINDER_STREAM_REF(gbs, "perform_stream_op"); - op->handler_private.extra_arg = gbs; + stream->is_client); + GRPC_BINDER_STREAM_REF(stream, "perform_stream_op"); + op->handler_private.extra_arg = stream; combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked, op, nullptr), absl::OkStatus()); } -static void close_transport_locked(grpc_binder_transport* gbt) { - gbt->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::OkStatus(), - "transport closed due to disconnection/goaway"); - while (!gbt->registered_stream.empty()) { +static void close_transport_locked(grpc_binder_transport* transport) { + transport->state_tracker.SetState( + GRPC_CHANNEL_SHUTDOWN, absl::OkStatus(), + "transport closed due to disconnection/goaway"); + while (!transport->registered_stream.empty()) { cancel_stream_locked( - gbt, gbt->registered_stream.begin()->second, + transport, transport->registered_stream.begin()->second, grpc_error_set_int(GRPC_ERROR_CREATE("transport closed"), grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE)); @@ -594,26 +603,28 @@ static void close_transport_locked(grpc_binder_transport* gbt) { static void perform_transport_op_locked(void* transport_op, grpc_error_handle /*error*/) { grpc_transport_op* op = static_cast(transport_op); - grpc_binder_transport* gbt = + grpc_binder_transport* transport = static_cast(op->handler_private.extra_arg); // TODO(waynetu): Should we lock here to avoid data race? if (op->start_connectivity_watch != nullptr) { - gbt->state_tracker.AddWatcher(op->start_connectivity_watch_state, - std::move(op->start_connectivity_watch)); + transport->state_tracker.AddWatcher( + op->start_connectivity_watch_state, + std::move(op->start_connectivity_watch)); } if (op->stop_connectivity_watch != nullptr) { - gbt->state_tracker.RemoveWatcher(op->stop_connectivity_watch); + transport->state_tracker.RemoveWatcher(op->stop_connectivity_watch); } if (op->set_accept_stream) { - gbt->accept_stream_user_data = op->set_accept_stream_user_data; - gbt->accept_stream_fn = op->set_accept_stream_fn; - gbt->registered_method_matcher_cb = op->set_registered_method_matcher_fn; + transport->accept_stream_user_data = op->set_accept_stream_user_data; + transport->accept_stream_fn = op->set_accept_stream_fn; + transport->registered_method_matcher_cb = + op->set_registered_method_matcher_fn; gpr_log(GPR_DEBUG, "accept_stream_fn_called_count_ = %d", - gbt->accept_stream_fn_called_count_); - while (gbt->accept_stream_fn_called_count_ > 0) { - --gbt->accept_stream_fn_called_count_; - gbt->combiner->Run( - GRPC_CLOSURE_CREATE(accept_stream_locked, gbt, nullptr), + transport->accept_stream_fn_called_count_); + while (transport->accept_stream_fn_called_count_ > 0) { + --transport->accept_stream_fn_called_count_; + transport->combiner->Run( + GRPC_CLOSURE_CREATE(accept_stream_locked, transport, nullptr), absl::OkStatus()); } } @@ -628,9 +639,9 @@ static void perform_transport_op_locked(void* transport_op, do_close = true; } if (do_close) { - close_transport_locked(gbt); + close_transport_locked(transport); } - GRPC_BINDER_UNREF_TRANSPORT(gbt, "perform_transport_op"); + GRPC_BINDER_UNREF_TRANSPORT(transport, "perform_transport_op"); } void grpc_binder_transport::PerformOp(grpc_transport_op* op) { @@ -643,34 +654,35 @@ void grpc_binder_transport::PerformOp(grpc_transport_op* op) { } static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) { - grpc_binder_stream* gbs = static_cast(sp); - grpc_binder_transport* gbt = gbs->t; + grpc_binder_stream* stream = static_cast(sp); + grpc_binder_transport* transport = stream->t; cancel_stream_locked( - gbt, gbs, + transport, stream, grpc_error_set_int(GRPC_ERROR_CREATE("destroy stream"), grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE)); - gbs->~grpc_binder_stream(); + stream->~grpc_binder_stream(); } void grpc_binder_transport::DestroyStream(grpc_stream* gs, grpc_closure* then_schedule_closure) { gpr_log(GPR_INFO, __func__); - grpc_binder_stream* gbs = reinterpret_cast(gs); - gbs->destroy_stream_then_closure = then_schedule_closure; - gbs->t->combiner->Run(GRPC_CLOSURE_INIT(&gbs->destroy_stream, - destroy_stream_locked, gbs, nullptr), - absl::OkStatus()); + grpc_binder_stream* stream = reinterpret_cast(gs); + stream->destroy_stream_then_closure = then_schedule_closure; + stream->t->combiner->Run( + GRPC_CLOSURE_INIT(&stream->destroy_stream, destroy_stream_locked, stream, + nullptr), + absl::OkStatus()); } static void destroy_transport_locked(void* gt, grpc_error_handle /*error*/) { - grpc_binder_transport* gbt = static_cast(gt); - close_transport_locked(gbt); + grpc_binder_transport* transport = static_cast(gt); + close_transport_locked(transport); // Release the references held by the transport. - gbt->wire_reader = nullptr; - gbt->transport_stream_receiver = nullptr; - gbt->wire_writer = nullptr; - GRPC_BINDER_UNREF_TRANSPORT(gbt, "transport destroyed"); + transport->wire_reader = nullptr; + transport->transport_stream_receiver = nullptr; + transport->wire_writer = nullptr; + GRPC_BINDER_UNREF_TRANSPORT(transport, "transport destroyed"); } void grpc_binder_transport::Orphan() {