Skip to content

Commit

Permalink
progress
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Jan 24, 2024
1 parent 41bcb12 commit 24d914d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
25 changes: 25 additions & 0 deletions src/core/ext/transport/chaotic_good/server_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "absl/status/statusor.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>

Expand Down Expand Up @@ -369,6 +370,9 @@ void ChaoticGoodServerTransport::AbortWithError() {
ReleasableMutexLock lock(&mu_);
StreamMap stream_map = std::move(stream_map_);
stream_map_.clear();
state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN,
absl::UnavailableError("transport closed"),
"transport closed");
lock.Release();
for (const auto& pair : stream_map) {
auto call_initiator = pair.second;
Expand Down Expand Up @@ -411,5 +415,26 @@ absl::Status ChaoticGoodServerTransport::NewStream(
return absl::OkStatus();
}

void ChaoticGoodServerTransport::PerformOp(grpc_transport_op* op) {
MutexLock lock(&mu_);
bool did_stuff = false;
if (op->start_connectivity_watch != nullptr) {
state_tracker_.AddWatcher(op->start_connectivity_watch_state,
std::move(op->start_connectivity_watch));
did_stuff = true;
}
if (op->stop_connectivity_watch != nullptr) {
state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
did_stuff = true;
}
if (op->set_accept_stream) {
Crash("set_accept_stream not supported on inproc transport");
}
if (!did_stuff) {
Crash(absl::StrCat("unimplemented transport perform op: ",
grpc_transport_op_string(op)));
}
}

} // namespace chaotic_good
} // namespace grpc_core
5 changes: 4 additions & 1 deletion src/core/ext/transport/chaotic_good/server_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>

Expand Down Expand Up @@ -94,7 +95,7 @@ class ChaoticGoodServerTransport final : public Transport,
absl::string_view GetTransportName() const override { return "chaotic_good"; }
void SetPollset(grpc_stream*, grpc_pollset*) override {}
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
void PerformOp(grpc_transport_op*) override { Crash("unimplemented"); }
void PerformOp(grpc_transport_op*) override;
grpc_endpoint* GetEndpoint() override { return nullptr; }
void Orphan() override { delete this; }

Expand Down Expand Up @@ -145,6 +146,8 @@ class ChaoticGoodServerTransport final : public Transport,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
ActivityPtr writer_;
ActivityPtr reader_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){
"chaotic_good_server", GRPC_CHANNEL_IDLE};
};

} // namespace chaotic_good
Expand Down

0 comments on commit 24d914d

Please sign in to comment.