Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype for server implementation using Vert.x transport #114

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
40 changes: 40 additions & 0 deletions vertx-grpc/src/main/java/io/vertx/grpc/server/GrpcClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.vertx.grpc.server;

import io.grpc.MethodDescriptor;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.net.SocketAddress;

public class GrpcClient {

private final Vertx vertx;
private HttpClient client;

public GrpcClient(HttpClientOptions options, Vertx vertx) {
this.vertx = vertx;
this.client = vertx.createHttpClient(new HttpClientOptions(options)
.setProtocolVersion(HttpVersion.HTTP_2));
}

public GrpcClient(Vertx vertx) {
this(new HttpClientOptions().setHttp2ClearTextUpgrade(false), vertx);
}

public Future<GrpcClientRequest> request(SocketAddress server) {
RequestOptions options = new RequestOptions()
.setMethod(HttpMethod.POST)
.setServer(server);
return client.request(options)
.map(request -> new GrpcClientRequest(request));
}

public <Req, Resp> Future<GrpcClientCallRequest<Req, Resp>> call(SocketAddress server, MethodDescriptor<Req, Resp> methodDesc) {
return request(server).map(grpcRequest -> new GrpcClientCallRequest<>(grpcRequest, methodDesc));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.vertx.grpc.server;

import io.grpc.MethodDescriptor;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.WriteStream;

import java.io.IOException;
import java.io.InputStream;

public class GrpcClientCallRequest<Req, Resp> implements WriteStream<Req> {

private final GrpcClientRequest grpcRequest;
private final MethodDescriptor<Req, Resp> methodDesc;

public GrpcClientCallRequest(GrpcClientRequest grpcRequest, MethodDescriptor<Req, Resp> methodDesc) {

grpcRequest.fullMethodName(methodDesc.getFullMethodName());

this.grpcRequest = grpcRequest;
this.methodDesc = methodDesc;
}

public GrpcClientCallRequest<Req, Resp> encoding(String encoding) {
grpcRequest.encoding(encoding);
return this;
}

@Override
public GrpcClientCallRequest<Req, Resp> exceptionHandler(Handler<Throwable> handler) {
grpcRequest.exceptionHandler(handler);
return this;
}

@Override
public void write(Req data, Handler<AsyncResult<Void>> handler) {
write(data).onComplete(handler);
}

@Override
public void end(Handler<AsyncResult<Void>> handler) {
end().onComplete(handler);
}

@Override
public GrpcClientCallRequest<Req, Resp> setWriteQueueMaxSize(int maxSize) {
grpcRequest.setWriteQueueMaxSize(maxSize);
return this;
}

@Override
public boolean writeQueueFull() {
return grpcRequest.writeQueueFull();
}

@Override
public GrpcClientCallRequest<Req, Resp> drainHandler(Handler<Void> handler) {
grpcRequest.drainHandler(handler);
return this;
}

public Future<Void> write(Req message) {
return grpcRequest.write(GrpcMessage.message(encode(message)));
}

public Future<Void> end(Req message) {
return grpcRequest.end(GrpcMessage.message(encode(message)));
}

public Future<Void> end() {
return grpcRequest.end();
}

public Future<GrpcClientCallResponse<Req, Resp>> response() {
return grpcRequest.response().map(grpcResponse -> new GrpcClientCallResponse<>(grpcResponse, methodDesc));
}

private Buffer encode(Req message) {
Buffer encoded = Buffer.buffer();
InputStream stream = methodDesc.streamRequest(message);
byte[] tmp = new byte[256];
int i;
try {
while ((i = stream.read(tmp)) != -1) {
encoded.appendBytes(tmp, 0, i);
}
} catch (IOException e) {
throw new VertxException(e);
}
return encoded;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.vertx.grpc.server;

import io.grpc.MethodDescriptor;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;

import java.io.ByteArrayInputStream;

public class GrpcClientCallResponse<Req, Resp> implements ReadStream<Resp> {

private final GrpcClientResponse grpcResponse;
private final MethodDescriptor<Req, Resp> methodDesc;
private Handler<Resp> messageHandler;
private Handler<Void> endHandler;

public GrpcClientCallResponse(GrpcClientResponse grpcResponse, MethodDescriptor<Req, Resp> methodDesc) {
grpcResponse.messageHandler(msg -> {
handleMessage(msg);
});
grpcResponse.endHandler(v -> {
handleEnd();
});
this.grpcResponse = grpcResponse;
this.methodDesc = methodDesc;
}

private void handleMessage(GrpcMessage message) {
ByteArrayInputStream in = new ByteArrayInputStream(message.payload().getBytes());
Resp obj = methodDesc.parseResponse(in);
Handler<Resp> handler = messageHandler;
if (handler != null) {
handler.handle(obj);
}
}

private void handleEnd() {
Handler<Void> handler = endHandler;
if (handler != null) {
handler.handle(null);
}
}

public GrpcStatus status() {
return grpcResponse.status();
}

@Override
public ReadStream<Resp> exceptionHandler(Handler<Throwable> handler) {
grpcResponse.exceptionHandler(handler);
return this;
}

@Override
public GrpcClientCallResponse<Req, Resp> handler(@Nullable Handler<Resp> handler) {
return messageHandler(handler);
}

@Override
public GrpcClientCallResponse<Req, Resp> pause() {
grpcResponse.pause();
return this;
}

@Override
public GrpcClientCallResponse<Req, Resp> resume() {
grpcResponse.resume();
return this;
}

@Override
public GrpcClientCallResponse<Req, Resp> fetch(long amount) {
grpcResponse.fetch(amount);
return this;
}

public GrpcClientCallResponse<Req, Resp> messageHandler(Handler<Resp> handler) {
messageHandler = handler;
return this;
}

public GrpcClientCallResponse<Req, Resp> endHandler(Handler<Void> handler) {
endHandler = handler;
return this;
}
}
110 changes: 110 additions & 0 deletions vertx-grpc/src/main/java/io/vertx/grpc/server/GrpcClientRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.vertx.grpc.server;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.streams.WriteStream;

import java.util.Objects;

public class GrpcClientRequest implements WriteStream<GrpcMessage> {

private final HttpClientRequest httpRequest;
private String fullMethodName;
private String encoding = "identity";
private boolean headerSent;
private Future<GrpcClientResponse> response;

public GrpcClientRequest(HttpClientRequest httpRequest) {
this.httpRequest = httpRequest;
this.response = httpRequest.response().map(httpResponse -> {
GrpcClientResponse grpcResponse = new GrpcClientResponse(httpResponse);
grpcResponse.init();
return grpcResponse;
});
}

public GrpcClientRequest encoding(String encoding) {
Objects.requireNonNull(encoding);
this.encoding = encoding;
return this;
}

public GrpcClientRequest fullMethodName(String fullMethodName) {
this.fullMethodName = fullMethodName;
return this;
}

@Override
public GrpcClientRequest exceptionHandler(Handler<Throwable> handler) {
httpRequest.exceptionHandler(handler);
return this;
}

@Override
public void write(GrpcMessage data, Handler<AsyncResult<Void>> handler) {
write(data).onComplete(handler);
}

@Override
public void end(Handler<AsyncResult<Void>> handler) {
end().onComplete(handler);
}

@Override
public GrpcClientRequest setWriteQueueMaxSize(int maxSize) {
httpRequest.setWriteQueueMaxSize(maxSize);
return this;
}

@Override
public boolean writeQueueFull() {
return httpRequest.writeQueueFull();
}

@Override
public GrpcClientRequest drainHandler(Handler<Void> handler) {
httpRequest.drainHandler(handler);
return this;
}

public Future<Void> write(GrpcMessage message) {
return write(message, false);
}

public Future<Void> end(GrpcMessage message) {
return write(message, true);
}

public Future<Void> end() {
if (!headerSent) {
throw new IllegalStateException();
}
return httpRequest.end();
}

private Future<Void> write(GrpcMessage message, boolean end) {
if (!headerSent) {
if (fullMethodName == null) {
throw new IllegalStateException();
}
httpRequest.putHeader("content-type", "application/grpc");
httpRequest.putHeader("grpc-encoding", encoding);
httpRequest.putHeader("grpc-accept-encoding", "gzip");
httpRequest.putHeader("te", "trailers");
httpRequest.setChunked(true);
httpRequest.setURI("/" + fullMethodName);
headerSent = true;
}
if (end) {
return httpRequest.end(message.encode(encoding));
} else {
return httpRequest.write(message.encode(encoding));
}
}

public Future<GrpcClientResponse> response() {
return response;
}
}
Loading