Skip to content

Commit

Permalink
Enable verification test to run cluster with replication factor of 2
Browse files Browse the repository at this point in the history
To run with a replication factor of 2, we need to manually provision the cluster
with it. This commit adds the required proto files and calls the grpc endpoint to
manually provision a cluster.

This fixes restatedev#3.
  • Loading branch information
tillrohrmann committed Dec 30, 2024
1 parent c96b7f1 commit 1c1271a
Show file tree
Hide file tree
Showing 11 changed files with 543 additions and 10 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ WORKDIR /usr/src/app
COPY . .

RUN npm install
RUN npm run proto
RUN npm run build

FROM node:23 as prod
Expand Down
8 changes: 8 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: v2
clean: true
plugins:
- remote: buf.build/community/stephenh-ts-proto
out: src/generated
opt: outputSchema=true,env=node,esModuleInterop=true,lowerCaseServiceMethods=true,paths=source_relative,useExactTypes=false
inputs:
- directory: proto
10 changes: 10 additions & 0 deletions buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# For details on buf.yaml configuration, visit https://buf.build/docs/configuration/v2/buf-yaml
version: v2
modules:
- path: proto
lint:
use:
- STANDARD
breaking:
use:
- FILE
7 changes: 6 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@
"main": "app.js",
"type": "commonjs",
"scripts": {
"proto": "npx buf generate",
"build": "tsc --noEmitOnError",
"lint": "eslint --ignore-path .eslintignore --ext .ts src",
"format": "prettier --ignore-path .eslintignore --write \"src/**/*.+(js|ts|json)\"",
"app": "node ./dist/app.js"
},
"author": "Restate developers",
"dependencies": {
"@grpc/grpc-js": "^1.12.5",
"@restatedev/restate-sdk-clients": "dev",
"testcontainers": "^10.9.0"
"protobufjs": "^7.4.0",
"testcontainers": "^10.9.0",
"ts-proto": "^2.6.1"
},
"devDependencies": {
"@bufbuild/buf": "^1.48.0",
"eslint": "^9.11.1",
"prettier": "^3.3.3",
"typescript": "^5.6.2"
Expand Down
73 changes: 73 additions & 0 deletions proto/node_ctl_svc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate service protocol, which is
// released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/proto/blob/main/LICENSE

syntax = "proto3";

import "google/protobuf/empty.proto";
import "restate/cluster.proto";
import "restate/common.proto";
import "restate/node.proto";

package restate.node_ctl_svc;

service NodeCtlSvc {
// Get identity information from this node.
rpc GetIdent(google.protobuf.Empty) returns (IdentResponse);

rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse);

// Provision the Restate cluster on this node.
rpc ProvisionCluster(ProvisionClusterRequest) returns (ProvisionClusterResponse);
}

message ProvisionClusterRequest {
bool dry_run = 1;
// if unset then the configured cluster num partitions will be used
optional uint32 num_partitions = 2;
// if unset then the configured cluster placement strategy will be used
optional restate.cluster.ReplicationStrategy placement_strategy = 3;
// if unset then the configured cluster default log provider will be used
optional restate.cluster.DefaultProvider log_provider = 4;
}

message ProvisionClusterResponse {
bool dry_run = 1;
restate.cluster.ClusterConfiguration cluster_configuration = 2;
}

message IdentResponse {
restate.common.NodeStatus status = 1;
restate.common.NodeId node_id = 2;
string cluster_name = 3;
// indicates which roles are enabled on this node
repeated string roles = 4;
// Age of the running node in seconds (how many seconds since the daemon
// started)
uint64 age_s = 5;
restate.common.AdminStatus admin_status = 6;
restate.common.WorkerStatus worker_status = 7;
restate.common.LogServerStatus log_server_status = 8;
restate.common.MetadataServerStatus metadata_server_status = 9;
uint32 nodes_config_version = 10;
uint32 logs_version = 11;
uint32 schema_version = 12;
uint32 partition_table_version = 13;
}

message GetMetadataRequest {
// If set, we'll first sync with metadata store to esnure we are returning the latest value.
// Otherwise, we'll return the local value on this node.
bool sync = 1;
restate.node.MetadataKind kind = 2;
}

message GetMetadataResponse {
// polymorphic. The value depends on the MetadataKind requested
bytes encoded = 1;
}
114 changes: 114 additions & 0 deletions proto/restate/cluster.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

syntax = "proto3";

import "restate/common.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

package restate.cluster;

message ClusterState {
google.protobuf.Duration last_refreshed = 1;
restate.common.Version nodes_config_version = 2;
restate.common.Version partition_table_version = 3;
map<uint32, NodeState> nodes = 4;
restate.common.Version logs_metadata_version = 5;
}

message NodeState {
oneof state {
AliveNode alive = 1;
DeadNode dead = 2;
SuspectNode suspect = 3;
}
}

message SuspectNode {
restate.common.NodeId generational_node_id = 1;
google.protobuf.Timestamp last_attempt = 2;
}

message AliveNode {
restate.common.NodeId generational_node_id = 1;
google.protobuf.Timestamp last_heartbeat_at = 2;
// partition id is u16 but protobuf doesn't support u16. This must be a value
// that's safe to convert to u16
map<uint32, PartitionProcessorStatus> partitions = 3;
}

message DeadNode { google.protobuf.Timestamp last_seen_alive = 1; }

enum RunMode {
RunMode_UNKNOWN = 0;
LEADER = 1;
FOLLOWER = 2;
}

enum ReplayStatus {
ReplayStatus_UNKNOWN = 0;
STARTING = 1;
ACTIVE = 2;
CATCHING_UP = 3;
}

message PartitionProcessorStatus {
google.protobuf.Timestamp updated_at = 1;
RunMode planned_mode = 2;
RunMode effective_mode = 3;
optional restate.common.LeaderEpoch last_observed_leader_epoch = 4;
optional restate.common.NodeId last_observed_leader_node = 5;
optional restate.common.Lsn last_applied_log_lsn = 6;
optional google.protobuf.Timestamp last_record_applied_at = 7;
uint64 num_skipped_records = 8;
ReplayStatus replay_status = 9;
optional restate.common.Lsn last_persisted_log_lsn = 10;
optional restate.common.Lsn last_archived_log_lsn = 12;
// Set if replay_status is CATCHING_UP
optional restate.common.Lsn target_tail_lsn = 11;
}

enum NodeSetSelectionStrategyKind {
NodeSetSelectionStrategyKind_UNKNOWN = 0;
StrictFaultTolerantGreedy = 1;
}

message NodeSetSelectionStrategy { NodeSetSelectionStrategyKind kind = 1; }

message ReplicatedProviderConfig {
string replication_property = 1;
NodeSetSelectionStrategy nodeset_selection_strategy = 2;
}

message DefaultProvider {
string provider = 1;
// only required if provider = "replicated"
optional ReplicatedProviderConfig replicated_config = 2;
}

enum ReplicationStrategyKind {
ReplicationStrategyKind_UNKNOWN = 0;
OnAllNodes = 1;
Factor = 2;
}

message ReplicationStrategy {
ReplicationStrategyKind kind = 1;
// required if kind == "Factor"
optional uint32 factor = 2;
}

message ClusterConfiguration {
uint32 num_partitions = 1;
ReplicationStrategy replication_strategy = 2;
DefaultProvider default_provider = 3;
}
134 changes: 134 additions & 0 deletions proto/restate/common.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate service protocol, which is
// released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/proto/blob/main/LICENSE

syntax = "proto3";

package restate.common;

enum ProtocolVersion {
ProtocolVersion_UNKNOWN = 0;
FLEXBUFFERS = 1;
}

message NodeId {
uint32 id = 1;
optional uint32 generation = 2;
}

// Partition Processor leadership epoch number
message LeaderEpoch { uint64 value = 1; }

// Log sequence number
message Lsn { uint64 value = 1; }

// A generic type for versioned metadata
message Version { uint32 value = 1; }

// The handle name or type tag of the message. For every target there must be
// exactly one message handler implementation.
enum TargetName {
reserved 7, 8;
TargetName_UNKNOWN = 0;
METADATA_MANAGER = 1;
INGRESS = 2;
LOCAL_METADATA_STORE = 3;
LOCAL_METADATA_STORE_CLIENT = 4;
ATTACH_REQUEST = 5;
ATTACH_RESPONSE = 6;
CONTROL_PROCESSORS = 9;
// LogServer
LOG_SERVER_STORE = 10;
LOG_SERVER_STORED = 11;
LOG_SERVER_RELEASE = 12;
LOG_SERVER_RELEASED = 13;
LOG_SERVER_SEAL = 14;
LOG_SERVER_SEALED = 15;
LOG_SERVER_GET_LOGLET_INFO = 16;
LOG_SERVER_LOGLET_INFO = 17;
LOG_SERVER_GET_RECORDS = 18;
LOG_SERVER_RECORDS = 19;
LOG_SERVER_TRIM = 20;
LOG_SERVER_TRIMMED = 21;
LOG_SERVER_WAIT_FOR_TAIL = 22;
LOG_SERVER_TAIL_UPDATED = 23;
LOG_SERVER_GET_DIGEST = 24;
LOG_SERVER_DIGEST = 25;
// Reserving space for more log-server messages
// ReplicatedLoglet
REPLICATED_LOGLET_APPEND = 40;
REPLICATED_LOGLET_APPENDED = 41;
REPLICATED_LOGLET_GET_SEQUENCER_STATE = 42;
REPLICATED_LOGLET_SEQUENCER_STATE = 43;
// Partition Processor
PARTITION_CREATE_SNAPSHOT_REQUEST = 50;
PARTITION_CREATE_SNAPSHOT_RESPONSE = 51;
PARTITION_PROCESSOR_RPC = 52;
PARTITION_PROCESSOR_RPC_RESPONSE = 53;
// Node
NODE_GET_NODE_STATE_REQUEST = 60;
NODE_GET_NODE_STATE_RESPONSE = 61;
// Remote Scanner
REMOTE_QUERY_SCANNER_OPEN = 80;
REMOTE_QUERY_SCANNER_OPENED = 81;
REMOTE_QUERY_SCANNER_NEXT = 82;
REMOTE_QUERY_SCANNER_NEXT_RESULT = 83;
REMOTE_QUERY_SCANNER_CLOSE = 84;
REMOTE_QUERY_SCANNER_CLOSED = 85;
}

// ** Health & Per-role Status

enum NodeStatus {
NodeStatus_UNKNOWN = 0;
// The node has joined the cluster and is fully operational.
ALIVE = 1;
// The node is not fully running yet.
STARTING_UP = 2;
// The node is performing a graceful shutdown.
SHUTTING_DOWN = 3;
}

enum NodeRpcStatus {
NodeRpcStatus_UNKNOWN = 0;
NodeRpcStatus_READY = 1;
NodeRpcStatus_STARTING_UP = 2;
NodeRpcStatus_STOPPING = 3;
}

enum WorkerStatus {
WorkerStatus_UNKNOWN = 0;
WorkerStatus_READY = 1;
WorkerStatus_STARTING_UP = 2;
}

enum AdminStatus {
AdminStatus_UNKNOWN = 0;
AdminStatus_READY = 1;
AdminStatus_STARTING_UP = 2;
}

enum LogServerStatus {
LogServerStatus_UNKNOWN = 0;
LogServerStatus_READY = 1;
LogServerStatus_STARTING_UP = 2;
LogServerStatus_FAILSAFE = 3;
LogServerStatus_STOPPING = 4;
}

enum MetadataServerStatus {
MetadataServerStatus_UNKNOWN = 0;
MetadataServerStatus_READY = 1;
MetadataServerStatus_STARTING_UP = 2;
}

enum IngressStatus {
IngressStatus_UNKNOWN = 0;
IngressStatus_READY = 1;
IngressStatus_STARTING_UP = 2;
}
Loading

0 comments on commit 1c1271a

Please sign in to comment.