Skip to content

Commit

Permalink
HIVE-28729: Apply nulls order setting in Reduce Sink operator of join branches
Browse files Browse the repository at this point in the history
  • Loading branch information
kasakrisz committed Jan 30, 2025
1 parent 4f22ecd commit 0d2cca0
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rex.RexNode;
import org.apache.hadoop.hive.ql.util.NullOrdering;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
Expand All @@ -56,20 +57,28 @@
*/
public class HiveInsertExchange4JoinRule extends RelOptRule {

protected static transient final Logger LOG = LoggerFactory
.getLogger(HiveInsertExchange4JoinRule.class);
protected static final Logger LOG = LoggerFactory.getLogger(HiveInsertExchange4JoinRule.class);

/** Rule that creates Exchange operators under a MultiJoin operator. */
public static final HiveInsertExchange4JoinRule EXCHANGE_BELOW_MULTIJOIN =
new HiveInsertExchange4JoinRule(HiveMultiJoin.class);
/**
 * Factory for a rule instance whose operand is {@link HiveMultiJoin}.
 * A new instance is created on every call.
 *
 * @param defaultAscNullDirection null direction handed to the rule's constructor; the rule
 *        uses it for the ascending collations it builds on join key positions
 * @return a new {@code HiveInsertExchange4JoinRule} for {@code HiveMultiJoin}
 */
public static HiveInsertExchange4JoinRule exchangeBelowMultiJoin(
RelFieldCollation.NullDirection defaultAscNullDirection) {
return new HiveInsertExchange4JoinRule(HiveMultiJoin.class, defaultAscNullDirection);
}


/** Rule that creates Exchange operators under a Join operator. */
public static final HiveInsertExchange4JoinRule EXCHANGE_BELOW_JOIN =
new HiveInsertExchange4JoinRule(Join.class);
/**
 * Factory for a rule instance whose operand is {@link Join}.
 * A new instance is created on every call.
 *
 * @param defaultAscNullDirection null direction handed to the rule's constructor; the rule
 *        uses it for the ascending collations it builds on join key positions
 * @return a new {@code HiveInsertExchange4JoinRule} for {@code Join}
 */
public static HiveInsertExchange4JoinRule exchangeBelowJoin(
RelFieldCollation.NullDirection defaultAscNullDirection) {
return new HiveInsertExchange4JoinRule(Join.class, defaultAscNullDirection);
}

private final RelFieldCollation.NullDirection defaultAscNullDirection;

public HiveInsertExchange4JoinRule(Class<? extends RelNode> clazz) {
/**
 * Creates a rule that fires on nodes of the given class.
 *
 * @param clazz relational node class used as the rule operand (with any children)
 * @param defaultAscNullDirection null direction stored on the rule and applied to the
 *        ascending {@code RelFieldCollation} created for each join key position
 */
public HiveInsertExchange4JoinRule(
Class<? extends RelNode> clazz, RelFieldCollation.NullDirection defaultAscNullDirection) {
// match multijoin or join
super(RelOptRule.operand(clazz, any()));
// Kept so the collations built for join keys carry an explicit null direction.
this.defaultAscNullDirection = defaultAscNullDirection;
}

@Override
Expand Down Expand Up @@ -118,7 +127,7 @@ public void onMatch(RelOptRuleCall call) {
for (int pos : joinLeafPredInfo.getProjsJoinKeysInChildSchema(i)) {
if (!joinKeyPositions.contains(pos)) {
joinKeyPositions.add(pos);
collationListBuilder.add(new RelFieldCollation(pos));
collationListBuilder.add(new RelFieldCollation(pos, RelFieldCollation.Direction.ASCENDING, defaultAscNullDirection));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2416,7 +2416,8 @@ private RelNode applyPostJoinOrderingTransform(RelNode basePlan, RelMetadataProv

// 9.2. Introduce exchange operators below join/multijoin operators
generatePartialProgram(program, false, HepMatchOrder.DEPTH_FIRST,
HiveInsertExchange4JoinRule.EXCHANGE_BELOW_JOIN, HiveInsertExchange4JoinRule.EXCHANGE_BELOW_MULTIJOIN);
HiveInsertExchange4JoinRule.exchangeBelowJoin(NullOrdering.defaultNullOrder(conf).getDirection()),
HiveInsertExchange4JoinRule.exchangeBelowMultiJoin(NullOrdering.defaultNullOrder(conf).getDirection()));
} else {
generatePartialProgram(program, false, HepMatchOrder.DEPTH_FIRST,
HiveProjectSortExchangeTransposeRule.INSTANCE, HiveProjectMergeRule.INSTANCE);
Expand Down
7 changes: 7 additions & 0 deletions ql/src/test/queries/clientpositive/cbo_rp_null_order.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Test for HIVE-28729: the exchange / reduce sink of each join branch should
-- follow the configured default nulls order.
-- NOTE(review): presumably this switches planning to the CBO return path, where the
-- exchange-below-join rule is applied - confirm against CalcitePlanner.
SET hive.cbo.returnpath.hiveop=true;
-- Ask for non-default ordering; the expected output shows "ASC-nulls-first"
-- in the CBO plan and "null sort order: a" in the operator plan.
SET hive.default.nulls.last=false;

CREATE TABLE t1(key int, value string);

-- Same self-join explained twice: once as the CBO plan, once as the operator plan.
EXPLAIN CBO SELECT * FROM t1 a INNER JOIN t1 b on a.key = b.key;
EXPLAIN SELECT * FROM t1 a INNER JOIN t1 b on a.key = b.key;
116 changes: 116 additions & 0 deletions ql/src/test/results/clientpositive/llap/cbo_rp_null_order.q.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
PREHOOK: query: CREATE TABLE t1(key int, value string)
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@t1
POSTHOOK: query: CREATE TABLE t1(key int, value string)
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@t1
PREHOOK: query: EXPLAIN CBO SELECT * FROM t1 a INNER JOIN t1 b on a.key = b.key
PREHOOK: type: QUERY
PREHOOK: Input: default@t1
#### A masked pattern was here ####
POSTHOOK: query: EXPLAIN CBO SELECT * FROM t1 a INNER JOIN t1 b on a.key = b.key
POSTHOOK: type: QUERY
POSTHOOK: Input: default@t1
#### A masked pattern was here ####
CBO PLAN:
HiveJoin(condition=[=($0, $2)], joinType=[inner], algorithm=[none], cost=[not available])
HiveSortExchange(distribution=[hash[0]], collation=[[0 ASC-nulls-first]])
HiveProject(key=[$0], value=[$1])
HiveFilter(condition=[IS NOT NULL($0)])
HiveTableScan(table=[[default, t1]], qbid:alias=[a])
HiveSortExchange(distribution=[hash[0]], collation=[[0 ASC-nulls-first]])
HiveProject(key=[$0], value=[$1])
HiveFilter(condition=[IS NOT NULL($0)])
HiveTableScan(table=[[default, t1]], qbid:alias=[b])

PREHOOK: query: EXPLAIN SELECT * FROM t1 a INNER JOIN t1 b on a.key = b.key
PREHOOK: type: QUERY
PREHOOK: Input: default@t1
#### A masked pattern was here ####
POSTHOOK: query: EXPLAIN SELECT * FROM t1 a INNER JOIN t1 b on a.key = b.key
POSTHOOK: type: QUERY
POSTHOOK: Input: default@t1
#### A masked pattern was here ####
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Tez
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: a
filterExpr: key is not null (type: boolean)
Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: key is not null (type: boolean)
Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: key (type: int), value (type: string)
outputColumnNames: key, value
Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: key (type: int)
null sort order: a
sort order: +
Map-reduce partition columns: key (type: int)
Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: NONE
value expressions: value (type: string)
Execution mode: vectorized, llap
LLAP IO: all inputs
Map 3
Map Operator Tree:
TableScan
alias: b
filterExpr: key is not null (type: boolean)
Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: key is not null (type: boolean)
Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: key (type: int), value (type: string)
outputColumnNames: key, value
Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: key (type: int)
null sort order: a
sort order: +
Map-reduce partition columns: key (type: int)
Statistics: Num rows: 1 Data size: 188 Basic stats: COMPLETE Column stats: NONE
value expressions: value (type: string)
Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Execution mode: llap
Reduce Operator Tree:
Merge Join Operator
condition map:
Inner Join 0 to 1
keys:
0 key (type: int)
1 key (type: int)
outputColumnNames: key, value, key0, value0
Statistics: Num rows: 1 Data size: 206 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 206 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

0 comments on commit 0d2cca0

Please sign in to comment.