Skip to content

Commit

Permalink
move OrderByRelBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
kasakrisz committed Jan 21, 2025
1 parent 37204a2 commit f30c55e
Show file tree
Hide file tree
Showing 2 changed files with 348 additions and 285 deletions.
295 changes: 10 additions & 285 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -1610,6 +1610,10 @@ protected CalcitePlannerAction(
this.columnAccessInfo = columnAccessInfo;
}

/**
 * Package-private accessor for the {@code cluster} field of this planner action.
 *
 * @return the current value of the {@code cluster} field
 */
RelOptCluster getCluster() {
  return this.cluster;
}

@Override
public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlus rootSchema) {
PerfLogger perfLogger = SessionState.getPerfLogger();
Expand Down Expand Up @@ -3932,7 +3936,7 @@ private RelNode genOBLogicalPlan(QB qb, Pair<RelNode, RowResolver> selPair,
fetchRN = cluster.getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit));
}

return new OrderByRelBuilder(this, selPair, outermostOB)
return new OrderByRelBuilder(CalcitePlanner.this, this, selPair, outermostOB)
.addSortByKeys(obAST)
.sortLimit(offsetRN, fetchRN);
}
Expand All @@ -3948,7 +3952,7 @@ private RelNode genSBLogicalPlan(QB qb, Pair<RelNode, RowResolver> selPair,
return null;
}

return new OrderByRelBuilder(this, selPair, outermostOB)
return new OrderByRelBuilder(CalcitePlanner.this, this, selPair, outermostOB)
.addSortByKeys(sbAST)
.addRelDistribution(distributeByAST)
.sortExchange();
Expand All @@ -3964,12 +3968,12 @@ private RelNode genClusterByLogicalPlan(QB qb, Pair<RelNode, RowResolver> selPai
return null;
}

return new OrderByRelBuilder(this, selPair, outermostOB)
return new OrderByRelBuilder(CalcitePlanner.this, this, selPair, outermostOB)
.addClusterBy(clusterByAST)
.sortExchange();
}

private List<RexNode> toRexNodeList(RelNode srcRel) {
List<RexNode> toRexNodeList(RelNode srcRel) {
return srcRel.getRowType().getFieldList().stream()
.map(input -> new RexInputRef(input.getIndex(), input.getType()))
.collect(Collectors.toList());
Expand Down Expand Up @@ -4210,7 +4214,7 @@ private RelNode genSelectForWindowing(QB qb, RelNode srcRel, HashSet<ColumnInfo>
return genSelectRelNode(projsForWindowSelOp, out_rwsch, srcRel, windowExpressions);
}

private RelNode genSelectRelNode(List<RexNode> calciteColLst, RowResolver out_rwsch,
RelNode genSelectRelNode(List<RexNode> calciteColLst, RowResolver out_rwsch,
RelNode srcRel) throws CalciteSemanticException {
return genSelectRelNode(calciteColLst, out_rwsch, srcRel, null);
}
Expand Down Expand Up @@ -5082,7 +5086,7 @@ private RelNode genQualifyLogicalPlan(QB qb, RelNode srcRel) throws SemanticExce
return genFilterRelNode(qb, targetNode, srcRel, null, null, true);
}

private ImmutableMap<String, Integer> buildHiveToCalciteColumnMap(RowResolver rr) {
ImmutableMap<String, Integer> buildHiveToCalciteColumnMap(RowResolver rr) {
ImmutableMap.Builder<String, Integer> b = new ImmutableMap.Builder<String, Integer>();
for (ColumnInfo ci : rr.getRowSchema().getSignature()) {
b.put(ci.getInternalName(), rr.getPosition(ci.getInternalName()));
Expand Down Expand Up @@ -5364,283 +5368,4 @@ private enum TableType {
NATIVE,
JDBC
}

private class OrderByRelBuilder {
private final CalcitePlannerAction calcitePlannerAction;
private final List<RexNode> newVCLst = new ArrayList<>();
private final List<RelFieldCollation> fieldCollations = Lists.newArrayList();
private final List<Pair<ASTNode, TypeInfo>> vcASTTypePairs = new ArrayList<>();
private final RowResolver outputRR = new RowResolver();
private final Pair<RelNode, RowResolver> selPair;
private final boolean outermostOB;

private HiveRelDistribution hiveRelDistribution;
private RelNode obInputRel;

/**
 * Creates a builder that assembles the sort-related relational node on top of {@code selPair}.
 *
 * @param calcitePlannerAction planner action whose cluster, RowResolver maps and helper
 *                             methods are used while building
 * @param selPair              key: the operator right before the ORDER BY;
 *                             value: the RowResolver containing only the result-set columns
 * @param outermostOB          when {@code true} and a synthetic projection had to be added,
 *                             the output RowResolver is filled from the input RowResolver
 *                             instead of the synthetic projection's RowResolver
 */
OrderByRelBuilder(
    CalcitePlannerAction calcitePlannerAction, Pair<RelNode, RowResolver> selPair, boolean outermostOB) {
  this.outermostOB = outermostOB;
  this.selPair = selPair;
  this.calcitePlannerAction = calcitePlannerAction;
}

/**
 * Registers the sort keys found under {@code obAST} as field collations on this builder.
 * <p>
 * Each child of {@code obAST} is read as: direction node, whose first child is the
 * null-ordering node, whose first child is the sort expression. The expression is turned
 * into a field index by {@link #genSortByKey(ASTNode)}, which may also record an extra
 * virtual column when the expression is not a plain input reference.
 *
 * @param obAST the sort clause AST; when {@code null} nothing is added
 * @return this builder, for chaining
 * @throws SemanticException if a key's null-ordering token is neither NULLS FIRST nor
 *                           NULLS LAST, or if the key expression cannot be resolved
 */
OrderByRelBuilder addSortByKeys(ASTNode obAST) throws SemanticException {
  if (obAST == null) {
    return this;
  }

  for (Node child : obAST.getChildren()) {
    // Unwrap the three nesting levels: direction -> null ordering -> expression.
    ASTNode directionNode = (ASTNode) child;
    ASTNode nullOrderNode = (ASTNode) directionNode.getChild(0);
    ASTNode sortExpr = (ASTNode) nullOrderNode.getChild(0);

    // Resolve the key first; this may append a virtual column to newVCLst.
    int fieldIndex = genSortByKey(sortExpr);

    // Anything that is not explicitly ascending is treated as descending.
    RelFieldCollation.Direction direction =
        directionNode.getType() == HiveParser.TOK_TABSORTCOLNAMEASC
            ? RelFieldCollation.Direction.ASCENDING
            : RelFieldCollation.Direction.DESCENDING;

    int nullOrderToken = nullOrderNode.getType();
    RelFieldCollation.NullDirection nullDirection;
    if (nullOrderToken == HiveParser.TOK_NULLS_FIRST) {
      nullDirection = RelFieldCollation.NullDirection.FIRST;
    } else if (nullOrderToken == HiveParser.TOK_NULLS_LAST) {
      nullDirection = RelFieldCollation.NullDirection.LAST;
    } else {
      throw new SemanticException("Unexpected null ordering option: "
          + nullOrderToken);
    }

    fieldCollations.add(new RelFieldCollation(fieldIndex, direction, nullDirection));
  }

  return this;
}

/**
 * Resolves one sort/distribute key to a field index in the input of the sort operator.
 * <ul>
 *   <li>Numeric literal with position alias enabled: delegated to
 *       {@link #getFieldIndexFromColumnNumber(RowResolver, ASTNode)}.</li>
 *   <li>Numeric literal with position alias disabled: a warning is logged and 0 is returned.</li>
 *   <li>Any other expression: converted to a RexNode; an input reference yields its own index,
 *       anything else is recorded as a new virtual column placed after the source fields.</li>
 * </ul>
 *
 * @param ref AST of the key expression
 * @return index of the field to sort/distribute on
 * @throws SemanticException if the expression cannot be resolved or the position alias is invalid
 */
private int genSortByKey(ASTNode ref) throws SemanticException {
  // selPair.getKey() is the operator right before OB.
  // selPair.getValue() is the RR holding only the columns needed in the result set;
  // extra columns needed by order by are absent from it.
  RelNode sourceRel = selPair.getKey();
  RowResolver selectOutputRR = selPair.getValue();
  RowResolver inputRR = calcitePlannerAction.relToHiveRR.get(sourceRel);
  int sourceFieldCount = sourceRel.getRowType().getFieldCount();

  boolean positionAliasEnabled =
      HiveConf.getBoolVar(conf, ConfVars.HIVE_GROUPBY_ORDERBY_POSITION_ALIAS)
          || HiveConf.getBoolVar(conf, ConfVars.HIVE_ORDERBY_POSITION_ALIAS);

  boolean numericLiteral = ref != null && ref.getToken().getType() == HiveParser.Number;
  if (numericLiteral) {
    if (positionAliasEnabled) {
      // Replace the position alias with the actual column.
      return getFieldIndexFromColumnNumber(selectOutputRR, ref);
    }
    // Not using position alias and it is a number.
    LOG.warn("Using constant number "
        + ref.getText()
        + " in order by. If you try to use position alias when hive.orderby.position.alias is false, " +
        "the position alias will be ignored.");
    // NOTE(review): index 0 is handed back here, so callers build their collation/distribution
    // on the first field - confirm this is the intended handling of a constant key.
    return 0;
  }

  RexNode keyExpression = getOrderByExpression(selectOutputRR, inputRR, ref);

  // Calcite can not take compound exprs in OB without them being present in the child,
  // hence non-reference expressions are queued for a child Project Rel.
  if (!(keyExpression instanceof RexInputRef)) {
    int virtualColumnIndex = sourceFieldCount + newVCLst.size();
    newVCLst.add(keyExpression);
    vcASTTypePairs.add(new Pair<>(ref, TypeConverter.convert(keyExpression.getType())));
    return virtualColumnIndex;
  }
  return ((RexInputRef) keyExpression).getIndex();
}

/**
 * Converts the key expression {@code ref} to a RexNode, looking it up first in the select
 * output and, failing that, in the full input.
 *
 * @param selectOutputRR RowResolver of the select list; may be {@code null} (e.g. for a UDTF)
 * @param inputRR        RowResolver of the operator feeding the sort
 * @param ref            AST of the key expression
 * @return the RexNode for {@code ref}, never {@code null}
 * @throws SemanticException if resolution against {@code inputRR} fails or yields nothing
 */
private RexNode getOrderByExpression(
    RowResolver selectOutputRR, RowResolver inputRR, ASTNode ref)
    throws SemanticException {
  RexNode resolved = null;

  // First attempt: the select output. A failure here is tolerated (previous behavior).
  if (selectOutputRR != null) {
    try {
      resolved = genAllRexNode(ref, selectOutputRR, calcitePlannerAction.cluster.getRexBuilder())
          .get(ref);
    } catch (SemanticException ex) {
      LOG.debug("Can not find column in " + ref.getText() + ". The error msg is "
          + ex.getMessage());
    }
  }

  // Second attempt: every column available from the input.
  if (resolved == null) {
    resolved = genAllRexNode(ref, inputRR, calcitePlannerAction.cluster.getRexBuilder())
        .get(ref);
  }

  if (resolved != null) {
    return resolved;
  }
  throw new SemanticException("Invalid order by expression: " + ref.toString());
}

/**
 * Translates a 1-based position alias (as in {@code SELECT a, b FROM t ORDER BY 1}) into the
 * 0-based field index.
 *
 * @param selectOutputRR RowResolver of the select list; its column count bounds the alias
 * @param ref            AST node whose text is the position number
 * @return {@code position - 1}
 * @throws SemanticException if the position is outside {@code 1..number of select columns}
 */
private int getFieldIndexFromColumnNumber(RowResolver selectOutputRR, ASTNode ref) throws SemanticException {
  int pos = Integer.parseInt(ref.getText());
  int selectListSize = selectOutputRR.getColumnInfos().size();

  if (pos <= 0 || pos > selectListSize) {
    throw new SemanticException(
        ErrorMsg.INVALID_POSITION_ALIAS_IN_ORDERBY.getMsg("Position alias: " + pos
            + " does not exist\n" + "The Select List is indexed from 1 to "
            + selectListSize));
  }

  // pos starts from 1 while the field index starts from 0.
  return pos - 1;
}

/**
 * Sets the distribution used by {@link #sortExchange()}.
 * <p>
 * With a DISTRIBUTE BY clause every child expression is resolved through
 * {@link #genSortByKey(ASTNode)} and the resulting indexes become hash-distribution keys.
 * Without one, a distribution of type ANY is created.
 *
 * @param distributeByAST the distribute-by AST, or {@code null} when there is none
 * @return this builder, for chaining
 * @throws SemanticException if a distribution key cannot be resolved
 */
OrderByRelBuilder addRelDistribution(ASTNode distributeByAST) throws SemanticException {
  if (distributeByAST == null) {
    // In case of SORT BY we do not need Distribution, but the shared instance
    // RelDistributions.ANY can not be used here because
    // org.apache.calcite.rel.core.Exchange has
    // assert distribution != RelDistributions.ANY;
    hiveRelDistribution = new HiveRelDistribution(RelDistribution.Type.ANY, RelDistributions.ANY.getKeys());
    return this;
  }

  ImmutableList.Builder<Integer> hashKeys = ImmutableList.builder();
  for (int childIdx = 0; childIdx < distributeByAST.getChildCount(); childIdx++) {
    hashKeys.add(genSortByKey((ASTNode) distributeByAST.getChild(childIdx)));
  }
  hiveRelDistribution = new HiveRelDistribution(RelDistribution.Type.HASH_DISTRIBUTED, hashKeys.build());
  return this;
}

/**
 * Handles CLUSTER BY: the given keys become the distribution keys and each of them is also
 * added as a sort key, ascending with nulls first.
 *
 * @param clusterBy the cluster-by AST; passed through to {@link #addRelDistribution(ASTNode)}
 * @return this builder, for chaining
 * @throws SemanticException if a key cannot be resolved
 */
OrderByRelBuilder addClusterBy(ASTNode clusterBy) throws SemanticException {
  addRelDistribution(clusterBy);
  hiveRelDistribution.getKeys().stream()
      .map(key -> new RelFieldCollation(
          key, RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST))
      .forEach(fieldCollations::add);
  return this;
}

/**
 * Prepares the input of the sort operator and the output RowResolver.
 * <p>
 * If sort keys required virtual columns, a synthetic projection (all source fields followed
 * by the virtual columns) is placed under the sort and becomes {@code obInputRel};
 * otherwise the source operator itself is used. {@code outputRR} is then filled from the
 * synthetic projection's RowResolver only when such a projection exists and this is not the
 * outermost ORDER BY; in every other case it is filled from the input RowResolver.
 *
 * @throws SemanticException if duplicates are detected while copying columns between
 *                           RowResolvers, or if building the projection fails
 */
private void genOBProject() throws SemanticException {
  // selPair.getKey() is the operator right before OB.
  RelNode sourceRel = selPair.getKey();
  RowResolver inputRR = calcitePlannerAction.relToHiveRR.get(sourceRel);

  obInputRel = sourceRel;
  // RowResolver whose columns end up in outputRR; switched below when needed.
  RowResolver outputSourceRR = inputRR;

  if (!newVCLst.isEmpty()) {
    List<RexNode> sourceFieldRefs = calcitePlannerAction.toRexNodeList(sourceRel);

    RowResolver syntheticProjectRR = new RowResolver();
    if (!RowResolver.add(syntheticProjectRR, inputRR)) {
      throw new CalciteSemanticException(
          "Duplicates detected when adding columns to RR: see previous message",
          UnsupportedFeature.Duplicates_in_RR);
    }

    // Virtual columns are numbered right after the existing input columns.
    int nextColumnPos = inputRR.getRowSchema().getSignature().size();
    for (Pair<ASTNode, TypeInfo> virtualColumn : vcASTTypePairs) {
      ColumnInfo columnInfo = new ColumnInfo(
          SemanticAnalyzer.getColumnInternalName(nextColumnPos), virtualColumn.getValue(), null,
          false);
      syntheticProjectRR.putExpression(virtualColumn.getKey(), columnInfo);
      nextColumnPos++;
    }

    obInputRel = calcitePlannerAction.genSelectRelNode(
        CompositeList.of(sourceFieldRefs, newVCLst), syntheticProjectRR, sourceRel);

    if (!outermostOB) {
      outputSourceRR = syntheticProjectRR;
    }
  }

  if (!RowResolver.add(outputRR, outputSourceRR)) {
    throw new CalciteSemanticException(
        "Duplicates detected when adding columns to RR: see previous message",
        UnsupportedFeature.Duplicates_in_RR);
  }
}

/**
 * Finishes the builder by creating a {@code HiveSortLimit} over the prepared input.
 *
 * @param offsetRN offset expression handed to {@code HiveSortLimit}
 * @param fetchRN  fetch (limit) expression handed to {@code HiveSortLimit}
 * @return the node produced by {@link #endGenOBLogicalPlan(RelNode)} for the new sort
 * @throws SemanticException if preparing the sort input fails
 */
RelNode sortLimit(RexNode offsetRN, RexNode fetchRN) throws SemanticException {
  genOBProject();

  RelOptCluster cluster = calcitePlannerAction.cluster;
  return endGenOBLogicalPlan(new HiveSortLimit(
      cluster,
      cluster.traitSetOf(HiveRelNode.CONVENTION),
      obInputRel,
      RelCollations.of(fieldCollations),
      offsetRN,
      fetchRN));
}

/**
 * Finishes the builder by creating a {@code HiveSortExchange} over the prepared input, using
 * the distribution set by {@link #addRelDistribution(ASTNode)} and the collected collations.
 *
 * @return the node produced by {@link #endGenOBLogicalPlan(RelNode)} for the new exchange
 * @throws SemanticException if preparing the sort input fails
 */
RelNode sortExchange() throws SemanticException {
  genOBProject();

  RelCollation collation = RelCollations.of(fieldCollations);

  // One input reference per collation field, in collation order.
  ImmutableList.Builder<RexNode> sortKeyRefs = ImmutableList.builder();
  collation.getFieldCollations().forEach(fieldCollation ->
      sortKeyRefs.add(calcitePlannerAction.cluster.getRexBuilder()
          .makeInputRef(obInputRel, fieldCollation.getFieldIndex())));

  return endGenOBLogicalPlan(
      HiveSortExchange.create(obInputRel, hiveRelDistribution, collation, sortKeyRefs.build()));
}

/**
 * Registers {@code sortRel} in the planner's RowResolver maps and, when a select
 * RowResolver is available, wraps it in a projection restricted to the select columns.
 * <p>
 * NOTE: the output RR for the sort is considered the same as its input; a virtual column
 * present in the sort may end up unused. The row type of the sort is the type of its child,
 * so if the child is the synthetic project introduced earlier, that project contains the
 * virtual columns.
 *
 * @param sortRel the freshly created sort node
 * @return a projection over {@code sortRel} limited to the select columns, or {@code sortRel}
 *         itself when {@code selPair.getValue()} is {@code null}
 * @throws CalciteSemanticException if building the projection fails
 */
private RelNode endGenOBLogicalPlan(RelNode sortRel) throws CalciteSemanticException {
  ImmutableMap<String, Integer> columnPositions =
      calcitePlannerAction.buildHiveToCalciteColumnMap(outputRR);
  calcitePlannerAction.relToHiveRR.put(sortRel, outputRR);
  calcitePlannerAction.relToHiveColNameCalcitePosMap.put(sortRel, columnPositions);

  RowResolver selectOutputRR = selPair.getValue();
  if (selectOutputRR == null) {
    return sortRel;
  }

  // The order by schema may have more columns than the result schema, so project it back
  // to the leading select columns.
  int selectColumnCount = selectOutputRR.getColumnInfos().size();
  List<RexNode> selectedRefs =
      calcitePlannerAction.toRexNodeList(selPair.getKey()).subList(0, selectColumnCount);
  return calcitePlannerAction.genSelectRelNode(selectedRefs, selectOutputRR, sortRel);
}
}
}
Loading

0 comments on commit f30c55e

Please sign in to comment.