Skip to content

Commit

Permalink
refactor: Small drive-by's (#20772)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jan 17, 2025
1 parent 6753bb6 commit 672e4c6
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/aexpr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub enum IRAggExpr {
method: QuantileMethod,
},
Sum(Node),
// include_nulls
Count(Node, bool),
Std(Node, u8),
Var(Node, u8),
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/aexpr/traverse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::*;
impl AExpr {
/// Push the inputs of this node to the given container, in reverse order.
/// This ensures the primary node responsible for the name is pushed last.
pub(crate) fn inputs_rev<E>(&self, container: &mut E)
pub fn inputs_rev<E>(&self, container: &mut E)
where
E: Extend<Node>,
{
Expand Down
15 changes: 9 additions & 6 deletions crates/polars-stream/src/physical_plan/lower_group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use polars_plan::prelude::GroupbyOptions;
use polars_utils::arena::{Arena, Node};
use polars_utils::itertools::Itertools;
use polars_utils::pl_str::PlSmallStr;
use recursive::recursive;
use slotmap::SlotMap;

use super::lower_expr::lower_exprs;
Expand Down Expand Up @@ -71,6 +72,7 @@ fn build_group_by_fallback(
///
/// Such an expression is defined as the elementwise combination of scalar
/// aggregations of elementwise combinations of the input columns / scalar literals.
#[recursive]
fn try_lower_elementwise_scalar_agg_expr(
expr: Node,
inside_agg: bool,
Expand Down Expand Up @@ -178,7 +180,10 @@ fn try_lower_elementwise_scalar_agg_expr(
},

AExpr::Agg(agg) => {
let orig_agg = agg.clone();
// Nested aggregates not supported.
if inside_agg {
return None;
}
match agg {
IRAggExpr::Min { input, .. }
| IRAggExpr::Max { input, .. }
Expand All @@ -188,10 +193,7 @@ fn try_lower_elementwise_scalar_agg_expr(
| IRAggExpr::Sum(input)
| IRAggExpr::Var(input, ..)
| IRAggExpr::Std(input, ..) => {
// Nested aggregates not supported.
if inside_agg {
return None;
}
let orig_agg = agg.clone();
// Lower and replace input.
let trans_input = lower_rec!(*input, true)?;
let mut trans_agg = orig_agg;
Expand Down Expand Up @@ -311,7 +313,8 @@ fn try_build_streaming_group_by(
&mut trans_agg_exprs,
&trans_input_cols,
)?;
trans_output_exprs.push(ExprIR::new(trans_node, agg.output_name_inner().clone()));
let output_name = OutputName::Alias(agg.output_name().clone());
trans_output_exprs.push(ExprIR::new(trans_node, output_name));
}

let input_schema = &phys_sm[trans_input.node].output_schema;
Expand Down

0 comments on commit 672e4c6

Please sign in to comment.