feat(pb): box stream NodeBody to reduce stack memory usage (#19911)
Signed-off-by: xxchan <[email protected]>
xxchan committed Dec 24, 2024
1 parent 6da5969 commit 8a8b295
Showing 55 changed files with 239 additions and 151 deletions.
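
The generated `NodeBody` is a large prost oneof enum, so every stack-allocated value reserves space for its biggest variant. Boxing the variant payloads keeps the enum itself pointer-sized, which is the motivation behind this change. Below is a minimal, self-contained sketch of that effect; `NodeBodyUnboxed`, `NodeBodyBoxed`, and `BigScanNode` are illustrative stand-ins, not the generated types.

use std::mem::size_of;

// Stand-in for a wide prost-generated message (512 bytes of payload).
#[allow(dead_code)]
struct BigScanNode {
    payload: [u64; 64],
}

// Without boxing, the enum must be at least as large as its largest variant.
#[allow(dead_code)]
enum NodeBodyUnboxed {
    Noop,
    Scan(BigScanNode),
}

// With boxing, the payload lives on the heap and the enum stays pointer-sized.
#[allow(dead_code)]
enum NodeBodyBoxed {
    Noop,
    Scan(Box<BigScanNode>),
}

fn main() {
    assert!(size_of::<NodeBodyUnboxed>() >= size_of::<BigScanNode>());
    assert!(size_of::<NodeBodyBoxed>() <= 2 * size_of::<usize>());
    println!(
        "unboxed: {} bytes, boxed: {} bytes",
        size_of::<NodeBodyUnboxed>(),
        size_of::<NodeBodyBoxed>()
    );
}

Deeply nested plan trees (`PbStreamNode` holding a `NodeBody`, which holds further nodes) therefore consume far less stack once each variant is boxed, which is presumably why the `RUST_MIN_STACK` overrides are dropped from the CI scripts later in this diff.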
56 changes: 42 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -366,6 +366,8 @@ sqlx = { git = "https://github.com/madsim-rs/sqlx.git", rev = "3efe6d0065963db2a
futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" }
# patch to remove preserve_order from serde_json
bson = { git = "https://github.com/risingwavelabs/bson-rust", rev = "e5175ec" }
# TODO: unpatch after PR merged https://github.com/tokio-rs/prost/pull/1210
prost-build = { git = "https://github.com/xxchan/prost.git", rev = "0eb1c7b09976cf6b5231e4b8d87bb5908ae6a163" }

[workspace.metadata.dylint]
libraries = [{ path = "./lints" }]
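
The patched `prost-build` above is what allows the generated oneof variants to be emitted as `Box<T>` in the first place (the linked upstream PR is expected to provide this). As a rough, hypothetical sketch of how a build script might request boxing for a field, assuming a `Config::boxed(path)` option and an illustrative proto path (neither is taken from this repository's actual build setup):

// build.rs (illustrative only)
fn main() -> Result<(), Box<dyn std::error::Error>> {
    prost_build::Config::new()
        // Hypothetical path: ask codegen to wrap this field in a Box so the
        // containing message/oneof stays small on the stack.
        .boxed("stream_plan.StreamNode.node_body")
        .compile_protos(&["proto/stream_plan.proto"], &["proto/"])?;
    Ok(())
}

Call sites then wrap payloads with `Box::new(...)` when constructing the enum, which is exactly the mechanical change repeated throughout the Rust files below.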
13 changes: 2 additions & 11 deletions Makefile.toml
@@ -382,11 +382,7 @@ ln -s "$(pwd)/target/${RISEDEV_BUILD_TARGET_DIR}${BUILD_MODE_DIR}/risingwave" "$
[tasks.post-build-risingwave]
category = "RiseDev - Build"
description = "Copy RisingWave binaries to bin"
condition = { env_set = [
"ENABLE_BUILD_RUST",
], env_not_set = [
"USE_SYSTEM_RISINGWAVE",
] }
condition = { env_set = ["ENABLE_BUILD_RUST"], env_not_set = ["USE_SYSTEM_RISINGWAVE"] }
dependencies = [
"link-all-in-one-binaries",
"link-user-bin",
@@ -863,11 +859,6 @@ script = """
#!/usr/bin/env bash
set -e
echo "Running Planner Test requires larger stack size, setting RUST_MIN_STACK to 8388608 (8MB) as default."
if [[ -z "${RUST_MIN_STACK}" ]]; then
export RUST_MIN_STACK=8388608
fi
cargo nextest run --workspace --exclude risingwave_simulation "$@"
"""
description = "🌟 Run unit tests"
@@ -1303,7 +1294,7 @@ echo If you still feel this is not enough, you may copy $(tput setaf 4)risedev$(
[tasks.ci-start]
category = "RiseDev - CI"
dependencies = ["clean-data", "pre-start-dev"]
command = "target/debug/risedev-dev" # `risedev-dev` is always built in dev profile
command = "target/debug/risedev-dev" # `risedev-dev` is always built in dev profile
env = { RISEDEV_CLEAN_START = true }
args = ["${@}"]
description = "Clean data and start a full RisingWave dev cluster using risedev-dev"
1 change: 0 additions & 1 deletion ci/scripts/common.sh
@@ -16,7 +16,6 @@ export GCLOUD_DOWNLOAD_TGZ=https://rw-ci-deps-dist.s3.amazonaws.com/google-cloud
export NEXTEST_HIDE_PROGRESS_BAR=true
export RW_TELEMETRY_TYPE=test
export RW_SECRET_STORE_PRIVATE_KEY_HEX="0123456789abcdef0123456789abcdef"
export RUST_MIN_STACK=4194304

unset LANG

1 change: 0 additions & 1 deletion ci/scripts/e2e-source-test.sh
@@ -144,7 +144,6 @@ risedev ci-kill
export RISINGWAVE_CI=true

echo "--- e2e, ci-kafka-plus-pubsub, legacy kafka tests"
export RUST_MIN_STACK=4194304
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
risedev ci-start ci-kafka
./e2e_test/source_legacy/basic/scripts/prepare_ci_kafka.sh
4 changes: 0 additions & 4 deletions ci/scripts/pr-unit-test.sh
@@ -5,8 +5,4 @@ set -euo pipefail

source ci/scripts/common.sh
source ci/scripts/pr.env.sh

# Set RUST_MIN_STACK to 8MB to avoid stack overflow in planner test.
# This is a Unit Test specific setting.
export RUST_MIN_STACK=8388608
./ci/scripts/run-unit-test.sh
4 changes: 2 additions & 2 deletions src/frontend/src/expr/user_defined_function.rs
@@ -81,7 +81,7 @@ impl Expr for UserDefinedFunction {
ExprNode {
function_type: Type::Unspecified.into(),
return_type: Some(self.return_type().to_protobuf()),
rex_node: Some(RexNode::Udf(UserDefinedFunction {
rex_node: Some(RexNode::Udf(Box::new(UserDefinedFunction {
children: self.args.iter().map(Expr::to_expr_proto).collect(),
name: self.catalog.name.clone(),
arg_names: self.catalog.arg_names.clone(),
@@ -98,7 +98,7 @@ impl Expr for UserDefinedFunction {
body: self.catalog.body.clone(),
compressed_binary: self.catalog.compressed_binary.clone(),
always_retry_on_network_error: self.catalog.always_retry_on_network_error,
})),
}))),
}
}
}
4 changes: 2 additions & 2 deletions src/frontend/src/handler/alter_table_column.rs
@@ -296,10 +296,10 @@ pub(crate) fn hijack_merger_for_target_table(
}
}

let pb_project = PbNodeBody::Project(ProjectNode {
let pb_project = PbNodeBody::Project(Box::new(ProjectNode {
select_list: exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
..Default::default()
});
}));

for fragment in graph.fragments.values_mut() {
if let Some(node) = &mut fragment.node {
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_sink.rs
@@ -649,9 +649,9 @@ pub(crate) fn insert_merger_to_union_with_project(
// TODO: MergeNode is used as a placeholder, see issue #17658
node.input.push(StreamNode {
input: vec![StreamNode {
node_body: Some(NodeBody::Merge(MergeNode {
node_body: Some(NodeBody::Merge(Box::new(MergeNode {
..Default::default()
})),
}))),
..Default::default()
}],
identity: uniq_identity
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_asof_join.rs
@@ -310,7 +310,7 @@ impl StreamNode for StreamAsOfJoin {
_ => unreachable!(),
};

NodeBody::AsOfJoin(AsOfJoinNode {
NodeBody::AsOfJoin(Box::new(AsOfJoinNode {
join_type: asof_join_type.into(),
left_key: left_jk_indices_prost,
right_key: right_jk_indices_prost,
@@ -321,7 +321,7 @@
right_deduped_input_pk_indices,
output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
asof_desc: Some(self.inequality_desc),
})
}))
}
}

12 changes: 6 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
@@ -190,10 +190,10 @@ impl StreamCdcTableScan {
append_only: true,
identity: "StreamCdcFilter".to_owned(),
fields: cdc_source_schema.clone(),
node_body: Some(PbNodeBody::CdcFilter(CdcFilterNode {
node_body: Some(PbNodeBody::CdcFilter(Box::new(CdcFilterNode {
search_condition: Some(filter_expr.to_expr_proto()),
upstream_source_id,
})),
}))),
};

let exchange_operator_id = self.core.ctx.next_plan_node_id();
@@ -205,13 +205,13 @@
append_only: true,
identity: "Exchange".to_owned(),
fields: cdc_source_schema.clone(),
node_body: Some(PbNodeBody::Exchange(ExchangeNode {
node_body: Some(PbNodeBody::Exchange(Box::new(ExchangeNode {
strategy: Some(DispatchStrategy {
r#type: DispatcherType::Simple as _,
dist_key_indices: vec![], // simple exchange doesn't need dist key
output_indices: (0..cdc_source_schema.len() as u32).collect(),
}),
})),
}))),
};

// The required columns from the external table
@@ -242,7 +242,7 @@
);

let options = self.core.options.to_proto();
let stream_scan_body = PbNodeBody::StreamCdcScan(StreamCdcScanNode {
let stream_scan_body = PbNodeBody::StreamCdcScan(Box::new(StreamCdcScanNode {
table_id: upstream_source_id,
upstream_column_ids,
output_indices,
Expand All @@ -252,7 +252,7 @@ impl StreamCdcTableScan {
rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
disable_backfill: options.disable_backfill,
options: Some(options),
});
}));

// plan: merge -> filter -> exchange(simple) -> stream_scan
Ok(PbStreamNode {
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_changelog.rs
@@ -72,9 +72,9 @@ impl_distill_by_unit!(StreamChangeLog, core, "StreamChangeLog");

impl StreamNode for StreamChangeLog {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
PbNodeBody::Changelog(ChangeLogNode {
PbNodeBody::Changelog(Box::new(ChangeLogNode {
need_op: self.core.need_op,
})
}))
}
}

4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_dedup.rs
@@ -92,15 +92,15 @@ impl StreamNode for StreamDedup {
let table_catalog = self
.infer_internal_table_catalog()
.with_id(state.gen_table_id_wrapped());
PbNodeBody::AppendOnlyDedup(DedupNode {
PbNodeBody::AppendOnlyDedup(Box::new(DedupNode {
state_table: Some(table_catalog.to_internal_table_prost()),
dedup_column_indices: self
.core
.dedup_cols
.iter()
.map(|idx| *idx as _)
.collect_vec(),
})
}))
}
}

4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_delta_join.rs
@@ -159,7 +159,7 @@ impl TryToStreamPb for StreamDeltaJoin {
// TODO: add a separate delta join node in proto, or move fragmenter to frontend so that we
// don't need an intermediate representation.
let eq_join_predicate = &self.eq_join_predicate;
Ok(NodeBody::DeltaIndexJoin(DeltaIndexJoinNode {
Ok(NodeBody::DeltaIndexJoin(Box::new(DeltaIndexJoinNode {
join_type: self.core.join_type as i32,
left_key: eq_join_predicate
.left_eq_indexes()
@@ -210,7 +210,7 @@
.collect(),
}),
output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
}))
})))
}
}

4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_dml.rs
@@ -88,12 +88,12 @@ impl StreamNode for StreamDml {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
use risingwave_pb::stream_plan::*;

PbNodeBody::Dml(DmlNode {
PbNodeBody::Dml(Box::new(DmlNode {
table_id: 0, // Meta will fill this table id.
table_version_id: INITIAL_TABLE_VERSION_ID, // Meta will fill this version id.
column_descs: self.column_descs.iter().map(Into::into).collect(),
rate_limit: self.base.ctx().overwrite_options().dml_rate_limit,
})
}))
}
}

4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
@@ -171,13 +171,13 @@ impl StreamNode for StreamDynamicFilter {
let right_table = infer_right_internal_table_catalog(right.plan_base())
.with_id(state.gen_table_id_wrapped());
#[allow(deprecated)]
NodeBody::DynamicFilter(DynamicFilterNode {
NodeBody::DynamicFilter(Box::new(DynamicFilterNode {
left_key: left_index as u32,
condition,
left_table: Some(left_table.to_internal_table_prost()),
right_table: Some(right_table.to_internal_table_prost()),
condition_always_relax: false, // deprecated
})
}))
}
}

@@ -155,12 +155,12 @@ impl StreamNode for StreamEowcOverWindow {
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost();

PbNodeBody::EowcOverWindow(EowcOverWindowNode {
PbNodeBody::EowcOverWindow(Box::new(EowcOverWindowNode {
calls,
partition_by,
order_by,
state_table: Some(state_table),
})
}))
}
}
