Skip to content

Commit

Permalink
fix: create a new sort order representing the UNIONed output
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Sep 19, 2024
1 parent 1304bdf commit ad4a321
Showing 1 changed file with 102 additions and 23 deletions.
125 changes: 102 additions & 23 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1524,6 +1524,102 @@ impl Hash for ExprWrapper {
}
}

/// Builds a single "joint" sort ordering out of the normalized orderings of
/// both union inputs.
///
/// The normalized orderings of `lhs` are visited first, followed by those of
/// `rhs`. The first ordering seen seeds the accumulator; every later ordering
/// of the same length is merged into it position by position:
///
/// * identical sort expressions (same expression, same options) are kept once;
/// * the same expression with different sort options is dropped entirely;
/// * different expressions are both kept, accumulated one first.
///
/// The merged result is passed through `collapse_lex_ordering` before being
/// returned.
///
/// Example:
/// child1 = order by a0, b, c
/// child2 = order by a, b, c
/// => union's joint order is a0, a, b, c.
///
/// NOTE(review): merging stops at the first ordering whose length differs from
/// the accumulator, and whatever has been accumulated so far is returned.
/// Because a merge can make the accumulator longer than its inputs, a third
/// ordering of the original length also stops the merge -- confirm this is
/// intended.
///
/// NOTE(review): an empty accumulator is treated as "not seeded yet", so if a
/// merge drops every position the next ordering replaces the accumulator
/// wholesale -- confirm this is intended.
fn calculate_joint_ordering(
    lhs: &EquivalenceProperties,
    rhs: &EquivalenceProperties,
) -> LexOrdering {
    let lhs_class = lhs.normalized_oeq_class();
    let rhs_class = rhs.normalized_oeq_class();

    let mut joint = vec![];
    for candidate in lhs_class.orderings.iter().chain(rhs_class.orderings.iter()) {
        // Nothing accumulated yet: adopt this ordering as the starting point.
        if joint.is_empty() {
            joint = candidate.clone();
            continue;
        }

        // Only orderings of identical length are merged.
        if joint.len() != candidate.len() {
            break;
        }

        // Both sides have the same length here, so a plain `zip` visits every
        // position of both orderings.
        let mut merged = vec![];
        for (theirs, ours) in candidate.iter().zip(joint) {
            if theirs.eq(&ours) {
                // Same expression and same options: keep a single copy.
                merged.push(theirs.clone());
            } else if !theirs.expr.eq(&ours.expr) {
                // Unrelated expressions: keep both, accumulated one first.
                merged.push(ours);
                merged.push(theirs.clone());
            }
            // Otherwise: same expression with conflicting options => the two
            // negate each other and neither is kept.
        }
        joint = merged;
    }

    collapse_lex_ordering(joint)
}

/// Computes the orderings that remain valid for a union by shortening each
/// input ordering to the longest prefix the other input also satisfies.
///
/// Every normalized ordering of `lhs` is trimmed from the back until `rhs`
/// satisfies it, then every normalized ordering of `rhs` is trimmed the same
/// way against `lhs`. Orderings that shrink to nothing are discarded. The
/// result lists the surviving `lhs`-derived prefixes first, followed by the
/// `rhs`-derived ones.
///
/// Example:
/// child1 = order by a, b
/// child2 = order by a1, b1, c1
/// => union's prefixed order is a, b.
///
/// NOTE(review): the shrinking loop only terminates if `ordering_satisfy`
/// accepts an empty ordering -- presumably it does; confirm.
fn calculate_prefix_ordering(
    lhs: &EquivalenceProperties,
    rhs: &EquivalenceProperties,
) -> Vec<LexOrdering> {
    let mut prefixes: Vec<LexOrdering> = Vec::new();

    // First trim the orderings of `lhs` against `rhs`, then the reverse.
    for (source, other) in [(lhs, rhs), (rhs, lhs)] {
        let candidates = source.normalized_oeq_class().orderings;
        prefixes.extend(candidates.into_iter().filter_map(|mut candidate| {
            // Drop trailing sort expressions until the other side satisfies
            // what is left.
            while !other.ordering_satisfy(&candidate) {
                candidate.pop();
            }
            // Only a non-trivial satisfied prefix counts as a valid ordering.
            if candidate.is_empty() {
                None
            } else {
                Some(candidate)
            }
        }));
    }

    prefixes
}

/// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties`
/// of `lhs` and `rhs` according to the schema of `lhs`.
fn calculate_union_binary(
Expand Down Expand Up @@ -1553,32 +1649,15 @@ fn calculate_union_binary(
})
.collect();

// Next, calculate valid orderings for the union by searching for prefixes
// in both sides.
let mut orderings = vec![];
for mut ordering in lhs.normalized_oeq_class().orderings {
// Progressively shorten the ordering to search for a satisfied prefix:
while !rhs.ordering_satisfy(&ordering) {
ordering.pop();
}
// There is a non-trivial satisfied prefix, add it as a valid ordering:
if !ordering.is_empty() {
orderings.push(ordering);
}
}
for mut ordering in rhs.normalized_oeq_class().orderings {
// Progressively shorten the ordering to search for a satisfied prefix:
while !lhs.ordering_satisfy(&ordering) {
ordering.pop();
}
// There is a non-trivial satisfied prefix, add it as a valid ordering:
if !ordering.is_empty() {
orderings.push(ordering);
}
}
// Create a unioned ordering.
let mut orderings = calculate_prefix_ordering(&lhs, &rhs);
let union_ordering = calculate_joint_ordering(&lhs, &rhs);
orderings.push(union_ordering);

let mut eq_properties = EquivalenceProperties::new(lhs.schema);
eq_properties.constants = constants;
eq_properties.add_new_orderings(orderings);

Ok(eq_properties)
}

Expand Down

0 comments on commit ad4a321

Please sign in to comment.