Skip to content

Commit

Permalink
[controller] Add StoreGrpcService for store related gRPC endpoints an…
Browse files Browse the repository at this point in the history
…d reduce boilerplate (#1454)

- Introduced `StoreGrpcServiceImpl` for store-related gRPC endpoints (update, get, delete ACLs).
- Added `VeniceUnauthorizedAccessException` to handle unauthorized access scenarios.
- Refactored `VeniceGrpcServerConfig` to support multiple `BindableService` instances.
- Updated `VeniceGrpcServer` to register multiple services and interceptors.
- Added `StoreRequestHandler` to process store-specific gRPC requests.
- Introduced `ControllerGrpcServerUtils` for common gRPC request handling and error responses.
- Added unit tests for `StoreRequestHandler` and enhanced existing gRPC server tests.
  • Loading branch information
sushantmane authored Jan 23, 2025
1 parent 8d043e7 commit 43a6135
Show file tree
Hide file tree
Showing 30 changed files with 1,470 additions and 460 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.linkedin.venice.exceptions;

public class VeniceUnauthorizedAccessException extends VeniceException {
public VeniceUnauthorizedAccessException(String message) {
super(message);
}

public VeniceUnauthorizedAccessException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.security.SSLFactory;
import io.grpc.BindableService;
import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptors;
import io.grpc.ServerInterceptor;
import io.grpc.TlsServerCredentials;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.io.IOException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -36,11 +39,19 @@ public VeniceGrpcServer(VeniceGrpcServerConfig config) {
this.executor = config.getExecutor();
this.config = config;
initServerCredentials();
server = Grpc.newServerBuilderForPort(config.getPort(), credentials)
.executor(executor) // TODO: experiment with different executors for best performance
.addService(ServerInterceptors.intercept(config.getService(), config.getInterceptors()))
.addService(ProtoReflectionService.newInstance())
.build();
ServerBuilder<?> serverBuilder = Grpc.newServerBuilderForPort(port, credentials)
.executor(executor)
.addService(ProtoReflectionService.newInstance());

List<BindableService> services = config.getServices();
for (BindableService service: services) {
serverBuilder.addService(service);
}
List<? extends ServerInterceptor> interceptors = config.getInterceptors();
for (ServerInterceptor interceptor: interceptors) {
serverBuilder.intercept(interceptor);
}
server = serverBuilder.build();
}

private void initServerCredentials() {
Expand Down Expand Up @@ -76,16 +87,12 @@ public void start() throws VeniceException {
try {
server.start();
LOGGER.info(
"Started gRPC server for service: {} on port: {} isSecure: {}",
config.getService().getClass().getSimpleName(),
"Started gRPC server for services: {} on port: {} isSecure: {}",
config.getServices(),
port,
isSecure());
} catch (IOException exception) {
LOGGER.error(
"Failed to start gRPC server for service: {} on port: {}",
config.getService().getClass().getSimpleName(),
port,
exception);
LOGGER.error("Failed to start gRPC server for services: {} on port: {}", config.getServices(), port, exception);
throw new VeniceException("Unable to start gRPC server", exception);
}
}
Expand All @@ -104,8 +111,8 @@ private boolean isSecure() {

public void stop() {
LOGGER.info(
"Shutting down gRPC server for service: {} on port: {} isSecure: {}",
config.getService().getClass().getSimpleName(),
"Shutting down gRPC server for services: {} on port: {} isSecure: {}",
config.getServices(),
port,
isSecure());
if (server != null && !server.isShutdown()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.grpc.BindableService;
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
Expand All @@ -13,15 +14,15 @@
public class VeniceGrpcServerConfig {
private final int port;
private final ServerCredentials credentials;
private final BindableService service;
private final List<BindableService> services;
private final List<? extends ServerInterceptor> interceptors;
private final SSLFactory sslFactory;
private final Executor executor;

private VeniceGrpcServerConfig(Builder builder) {
port = builder.port;
credentials = builder.credentials;
service = builder.service;
services = builder.services;
interceptors = builder.interceptors;
sslFactory = builder.sslFactory;
executor = builder.executor;
Expand All @@ -39,8 +40,8 @@ public Executor getExecutor() {
return executor;
}

public BindableService getService() {
return service;
public List<BindableService> getServices() {
return services;
}

public List<? extends ServerInterceptor> getInterceptors() {
Expand All @@ -53,13 +54,13 @@ public SSLFactory getSslFactory() {

@Override
public String toString() {
return "VeniceGrpcServerConfig{" + "port=" + port + ", service=" + service + "}";
return "VeniceGrpcServerConfig{" + "port=" + port + ", services=" + services + "}";
}

public static class Builder {
private Integer port;
private ServerCredentials credentials;
private BindableService service;
private final List<BindableService> services = new ArrayList<>(4);
private List<? extends ServerInterceptor> interceptors;
private SSLFactory sslFactory;
private int numThreads;
Expand All @@ -75,8 +76,13 @@ public Builder setCredentials(ServerCredentials credentials) {
return this;
}

public Builder setService(BindableService service) {
this.service = service;
public Builder addService(BindableService service) {
this.services.add(service);
return this;
}

public Builder setServices(List<BindableService> services) {
this.services.addAll(services);
return this;
}

Expand Down Expand Up @@ -114,8 +120,8 @@ private void verifyAndAddDefaults() {
if (port == null) {
throw new IllegalArgumentException("Port value is required to create the gRPC server but was not provided.");
}
if (service == null) {
throw new IllegalArgumentException("A non-null gRPC service instance is required to create the server.");
if (services.isEmpty()) {
throw new IllegalArgumentException("Service value is required to create the gRPC server but was not provided.");
}
if (numThreads <= 0 && executor == null) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.linkedin.venice.protocols.controller;

import "google/rpc/status.proto";
import "google/rpc/error_details.proto";
import "controller/ControllerGrpcRequestContext.proto";

option java_multiple_files = true;

Expand All @@ -13,9 +14,6 @@ service VeniceControllerGrpcService {

// ControllerRoutes
rpc getLeaderController(LeaderControllerGrpcRequest) returns (LeaderControllerGrpcResponse);

// CreateStore
rpc createStore(CreateStoreGrpcRequest) returns (CreateStoreGrpcResponse) {}
}

message DiscoverClusterGrpcRequest {
Expand All @@ -31,54 +29,12 @@ message DiscoverClusterGrpcResponse {
optional string pubSubBootstrapServers = 6;
}

message ClusterStoreGrpcInfo {
string clusterName = 1;
string storeName = 2;
}

message CreateStoreGrpcRequest {
ClusterStoreGrpcInfo clusterStoreInfo = 1;
string keySchema = 2;
string valueSchema = 3;
optional string owner = 4;
optional bool isSystemStore = 5;
optional string accessPermission = 6;
}

message CreateStoreGrpcResponse {
ClusterStoreGrpcInfo clusterStoreInfo = 1;
string owner = 2;
}

enum ControllerGrpcErrorType {
UNKNOWN = 0;
INCORRECT_CONTROLLER = 1;
INVALID_SCHEMA = 2;
INVALID_CONFIG = 3;
STORE_NOT_FOUND = 4;
SCHEMA_NOT_FOUND = 5;
CONNECTION_ERROR = 6;
GENERAL_ERROR = 7;
BAD_REQUEST = 8;
CONCURRENT_BATCH_PUSH = 9;
RESOURCE_STILL_EXISTS = 10;
UNAUTHORIZED = 11;
}

message VeniceControllerGrpcErrorInfo {
uint32 statusCode = 1;
string errorMessage = 2;
optional ControllerGrpcErrorType errorType = 3;
optional string clusterName = 4;
optional string storeName = 5;
}

message LeaderControllerGrpcRequest {
string clusterName = 1; // The cluster name
}

message LeaderControllerGrpcResponse {
string clusterName = 1; // The cluster name
string clusterName = 1; // The cluster name
string httpUrl = 2; // Leader controller URL
string httpsUrl = 3; // SSL-enabled leader controller URL
string grpcUrl = 4; // gRPC URL for leader controller
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
syntax = 'proto3';
package com.linkedin.venice.protocols.controller;

import "google/rpc/status.proto";
import "google/rpc/error_details.proto";
import "google/protobuf/timestamp.proto";

option java_multiple_files = true;

message ClusterStoreGrpcInfo {
string clusterName = 1;
string storeName = 2;
}

enum ControllerGrpcErrorType {
UNKNOWN = 0;
INCORRECT_CONTROLLER = 1;
INVALID_SCHEMA = 2;
INVALID_CONFIG = 3;
STORE_NOT_FOUND = 4;
SCHEMA_NOT_FOUND = 5;
CONNECTION_ERROR = 6;
GENERAL_ERROR = 7;
BAD_REQUEST = 8;
CONCURRENT_BATCH_PUSH = 9;
RESOURCE_STILL_EXISTS = 10;
UNAUTHORIZED = 11;
}

message VeniceControllerGrpcErrorInfo {
uint32 statusCode = 1;
string errorMessage = 2;
optional ControllerGrpcErrorType errorType = 3;
optional string clusterName = 4;
optional string storeName = 5;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
syntax = 'proto3';
package com.linkedin.venice.protocols.controller;


import "controller/ControllerGrpcRequestContext.proto";

option java_multiple_files = true;

service StoreGrpcService {
rpc createStore(CreateStoreGrpcRequest) returns (CreateStoreGrpcResponse);
rpc updateAclForStore(UpdateAclForStoreGrpcRequest) returns (UpdateAclForStoreGrpcResponse);
rpc getAclForStore(GetAclForStoreGrpcRequest) returns (GetAclForStoreGrpcResponse);
rpc deleteAclForStore(DeleteAclForStoreGrpcRequest) returns (DeleteAclForStoreGrpcResponse);
rpc checkResourceCleanupForStoreCreation(ClusterStoreGrpcInfo) returns (ResourceCleanupCheckGrpcResponse) {}
}

message CreateStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
string keySchema = 2;
string valueSchema = 3;
optional string owner = 4;
optional bool isSystemStore = 5;
optional string accessPermission = 6;
}

message CreateStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
string owner = 2;
}

message UpdateAclForStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
string accessPermissions = 3;
}

message UpdateAclForStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
}

message GetAclForStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
}

message GetAclForStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
string accessPermissions = 2;
}

message DeleteAclForStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
}

message DeleteAclForStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
}

message ResourceCleanupCheckGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
bool hasLingeringResources = 2;
optional string description = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.google.rpc.Code;
import com.google.rpc.Status;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.controller.grpc.GrpcRequestResponseConverter;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo;
import com.linkedin.venice.protocols.controller.ControllerGrpcErrorType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.venice.exceptions;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import org.testng.annotations.Test;


public class VeniceUnauthorizedAccessExceptionTest {
@Test
public void testExceptionWithMessage() {
String message = "Unauthorized access to the resource";
VeniceUnauthorizedAccessException exception = new VeniceUnauthorizedAccessException(message);

assertNotNull(exception);
assertEquals(exception.getMessage(), message);
}

@Test
public void testExceptionWithMessageAndCause() {
String message = "Unauthorized access due to invalid credentials";
Throwable cause = new RuntimeException("Invalid credentials");
VeniceUnauthorizedAccessException exception = new VeniceUnauthorizedAccessException(message, cause);

assertNotNull(exception);
assertEquals(exception.getMessage(), message);
assertEquals(exception.getCause(), cause);
}
}
Loading

0 comments on commit 43a6135

Please sign in to comment.