From b19dc8835bbd7936111f4582877f8611bd8be119 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Fri, 31 Jan 2025 17:56:29 -0800 Subject: [PATCH 1/3] PHOENIX-7489 Add all partition ids internally to optimize full CDC Index scan queries --- .../apache/phoenix/jdbc/PhoenixStatement.java | 97 ++++++++++++++++++- .../apache/phoenix/end2end/CDCQueryIT.java | 59 +++-------- 2 files changed, 109 insertions(+), 47 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 8c0e5cafe9f..529699af93a 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -48,6 +48,7 @@ import java.io.IOException; import java.io.Reader; import java.sql.BatchUpdateException; +import java.sql.Connection; import java.sql.ParameterMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -75,6 +76,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; import org.apache.phoenix.call.CallRunner; import org.apache.phoenix.compile.BaseMutationPlan; import org.apache.phoenix.compile.CloseStatementCompiler; @@ -148,6 +150,7 @@ import org.apache.phoenix.parse.DeleteJarStatement; import org.apache.phoenix.parse.DeleteStatement; import org.apache.phoenix.parse.ExplainType; +import org.apache.phoenix.parse.PartitionIdParseNode; import org.apache.phoenix.parse.ShowCreateTableStatement; import org.apache.phoenix.parse.ShowCreateTable; import org.apache.phoenix.parse.DropColumnStatement; @@ -348,7 +351,7 @@ protected PhoenixResultSet executeQuery(final CompilableStatement stmt, final Qu } - private PhoenixResultSet executeQuery(final CompilableStatement stmt, + private PhoenixResultSet executeQuery(final CompilableStatement compilableStatement, final boolean doRetryOnMetaNotFoundError, final QueryLogger queryLogger, final boolean noCommit, boolean shouldValidateLastDdlTimestamp) @@ -367,6 +370,7 @@ private PhoenixResultSet executeQuery(final CompilableStatement stmt, clearResultSet(); PhoenixResultSet rs = null; QueryPlan plan = null; + CompilableStatement stmt = compilableStatement; try { PhoenixConnection conn = getConnection(); conn.checkOpen(); @@ -378,6 +382,28 @@ private PhoenixResultSet executeQuery(final CompilableStatement stmt, } plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE); + if (plan.getTableRef() != null && PTableType.CDC.equals( + plan.getTableRef().getTable().getType())) { + if (stmt instanceof ExecutableSelectStatement) { + ParseNode parseNode = + ((ExecutableSelectStatement) stmt).getWhere(); + List selectNodes = + ((ExecutableSelectStatement) stmt).getSelect(); + boolean queryPartitionIds = + CollectionUtils.isNotEmpty(selectNodes) + && selectNodes.size() == 1 + && selectNodes.get(0) + .getNode() instanceof PartitionIdParseNode; + if (!queryPartitionIds && + !isPartitionIdIncludedInTree(parseNode)) { + stmt = parseStatement(addPartitionInList(conn, + plan.getTableRef().getTable().toString(), + stmt)); + plan = stmt.compilePlan(PhoenixStatement.this, + Sequence.ValueOp.VALIDATE_SEQUENCE); + } + } + } // Send mutations to hbase, so they are visible to subsequent reads. // Use original plan for data table so that data and immutable indexes will be sent // TODO: for joins, we need to iterate through all tables, but we need the original table, @@ -555,6 +581,75 @@ private PhoenixResultSet executeQuery(final CompilableStatement stmt, } } + /** + * Add IN Operator for PARTITION_ID() so that the full table scan CDC query can be + * optimized to be range scan. + * + * @param conn The Connection. + * @param cdcName CDC Object name. + * @param stmt Compilable Statement object. + * @return Updated query including PartitionId with IN operator. + * @throws SQLException If the distinct partition ids retrival fails. + */ + private static String addPartitionInList(Connection conn, String cdcName, + CompilableStatement stmt) + throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("SELECT DISTINCT PARTITION_ID() FROM " + + cdcName); + List partitionIds = new ArrayList<>(); + while (rs.next()) { + partitionIds.add(rs.getString(1)); + } + String query = stmt.toString(); + if (partitionIds.isEmpty()) { + return query; + } + StringBuilder builder; + boolean queryHasWhere = query.contains(" WHERE "); + if (queryHasWhere) { + builder = new StringBuilder(query); + builder.append(" AND PARTITION_ID() IN ("); + } else { + builder = new StringBuilder(query.split(cdcName)[0]); + builder.append(cdcName); + builder.append(" WHERE PARTITION_ID() IN ("); + } + boolean initialized = false; + for (String partitionId : partitionIds) { + if (!initialized) { + builder.append("'"); + initialized = true; + } else { + builder.append(",'"); + } + builder.append(partitionId); + builder.append("'"); + } + builder.append(")"); + if (!queryHasWhere) { + builder.append(query.split(cdcName)[1]); + } + return builder.toString(); + } + + /** + * Return true if the parseNode or any of its children contains PARTITION_ID() function. + * + * @param parseNode The parseNode from Where clause. + * @return True if the parseNode or any of its children contains PARTITION_ID() + * function. False otherwise. + */ + private static boolean isPartitionIdIncludedInTree(ParseNode parseNode) { + if (parseNode instanceof PartitionIdParseNode) { + return true; + } + if (parseNode == null || CollectionUtils.isEmpty(parseNode.getChildren())) { + return false; + } + return parseNode.getChildren().stream() + .anyMatch(PhoenixStatement::isPartitionIdIncludedInTree); + } + public String getTargetForAudit(CompilableStatement stmt) { String target = null; try { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java index a1519cc64bf..36c7e0548aa 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java @@ -78,12 +78,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -// NOTE: To debug the query execution, add the below condition or the equivalent where you need a -// breakpoint. -// if (.getTableName().getString().equals("N000002") || -//
.getTableName().getString().equals("__CDC__N000002")) { -// "".isEmpty(); -// } @RunWith(Parameterized.class) @Category(NeedsOwnMiniClusterTest.class) public class CDCQueryIT extends CDCBaseIT { @@ -255,36 +249,11 @@ private static String getCDCQuery(String cdcName, String[] partitionId) { return query.toString(); } - private static String addPartitionInList(Connection conn, String cdcName, String query) - throws SQLException{ - ResultSet rs = conn.createStatement().executeQuery("SELECT DISTINCT PARTITION_ID() FROM " - + cdcName); - List partitionIds = new ArrayList<>(); - while (rs.next()) { - partitionIds.add(rs.getString(1)); - } - StringBuilder builder = new StringBuilder(query); - builder.append(" WHERE PARTITION_ID() IN ("); - boolean initialized = false; - for (String partitionId : partitionIds) { - if (!initialized) { - builder.append("'"); - initialized = true; - } else { - builder.append(",'"); - } - builder.append(partitionId); - builder.append("'"); - } - builder.append(")"); - return builder.toString(); - } - private static PreparedStatement getCDCQueryPreparedStatement(Connection conn, String cdcName, String query, long minTimestamp, long maxTimestamp) throws SQLException { - StringBuilder builder = new StringBuilder(addPartitionInList(conn, cdcName, query)); - builder.append(" AND PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() < ?"); + StringBuilder builder = new StringBuilder(query); + builder.append(" WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() < ?"); PreparedStatement statement = conn.prepareStatement(builder.toString()); statement.setTimestamp(1, new Timestamp(minTimestamp)); statement.setTimestamp(2, new Timestamp(maxTimestamp)); @@ -333,10 +302,11 @@ public void testSelectCDC() throws Exception { try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. dumpCDCResults(conn, cdcName, - new TreeMap() {{ put("K", "INTEGER"); }}, - addPartitionInList(conn, cdcFullName, - "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," - + "\"CDC JSON\" FROM " + cdcFullName)); + new TreeMap() {{ + put("K", "INTEGER"); + }}, + "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + + "\"CDC JSON\" FROM " + cdcFullName); // Existence of an CDC index hint shouldn't cause the regular query path to fail. // Run the same query with a CDC index hit and without it and make sure we get the same @@ -474,8 +444,8 @@ public void testSelectGeneric() throws Exception { String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. - dumpCDCResults(conn, cdcName, pkColumns, addPartitionInList(conn, cdcFullName, - "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName)); + dumpCDCResults(conn, cdcName, pkColumns, + "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName); List changes = new ArrayList<>(); for (Set batch: allBatches.get(tenantId)) { @@ -487,19 +457,16 @@ public void testSelectGeneric() throws Exception { Thread.sleep(nextTime - currentTime); } verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery( - addPartitionInList(conn, cdcFullName,"SELECT * FROM " + cdcFullName)), + "SELECT * FROM " + cdcFullName), datatableName, dataColumns, changes, CHANGE_IMG); verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery( - addPartitionInList(conn, cdcFullName, - "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName)), + "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName), datatableName, dataColumns, changes, CHANGE_IMG); verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery( - addPartitionInList(conn, cdcFullName, - "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName)), + "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName), datatableName, dataColumns, changes, PRE_POST_IMG); verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery( - addPartitionInList(conn, cdcFullName, - "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * FROM " + cdcFullName)), + "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * FROM " + cdcFullName), datatableName, dataColumns, changes, ALL_IMG); cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName); checkIndexPartitionIdCount(conn, tableName, cdcFullName); From ad971fd627aacda4ee908fe015e3ec423afaed1e Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Sat, 1 Feb 2025 12:01:30 -0800 Subject: [PATCH 2/3] refactor --- .../apache/phoenix/jdbc/PhoenixStatement.java | 76 +------------------ .../java/org/apache/phoenix/util/CDCUtil.java | 74 ++++++++++++++++++ .../apache/phoenix/end2end/CDCQueryIT.java | 10 +-- 3 files changed, 82 insertions(+), 78 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 529699af93a..8c29734a887 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -48,7 +48,6 @@ import java.io.IOException; import java.io.Reader; import java.sql.BatchUpdateException; -import java.sql.Connection; import java.sql.ParameterMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -395,10 +394,10 @@ private PhoenixResultSet executeQuery(final CompilableStatement compilableStatem && selectNodes.get(0) .getNode() instanceof PartitionIdParseNode; if (!queryPartitionIds && - !isPartitionIdIncludedInTree(parseNode)) { - stmt = parseStatement(addPartitionInList(conn, + !CDCUtil.isPartitionIdIncludedInTree(parseNode)) { + stmt = parseStatement(CDCUtil.addPartitionInList(conn, plan.getTableRef().getTable().toString(), - stmt)); + stmt.toString())); plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE); } @@ -581,75 +580,6 @@ private PhoenixResultSet executeQuery(final CompilableStatement compilableStatem } } - /** - * Add IN Operator for PARTITION_ID() so that the full table scan CDC query can be - * optimized to be range scan. - * - * @param conn The Connection. - * @param cdcName CDC Object name. - * @param stmt Compilable Statement object. - * @return Updated query including PartitionId with IN operator. - * @throws SQLException If the distinct partition ids retrival fails. - */ - private static String addPartitionInList(Connection conn, String cdcName, - CompilableStatement stmt) - throws SQLException { - ResultSet rs = conn.createStatement().executeQuery("SELECT DISTINCT PARTITION_ID() FROM " - + cdcName); - List partitionIds = new ArrayList<>(); - while (rs.next()) { - partitionIds.add(rs.getString(1)); - } - String query = stmt.toString(); - if (partitionIds.isEmpty()) { - return query; - } - StringBuilder builder; - boolean queryHasWhere = query.contains(" WHERE "); - if (queryHasWhere) { - builder = new StringBuilder(query); - builder.append(" AND PARTITION_ID() IN ("); - } else { - builder = new StringBuilder(query.split(cdcName)[0]); - builder.append(cdcName); - builder.append(" WHERE PARTITION_ID() IN ("); - } - boolean initialized = false; - for (String partitionId : partitionIds) { - if (!initialized) { - builder.append("'"); - initialized = true; - } else { - builder.append(",'"); - } - builder.append(partitionId); - builder.append("'"); - } - builder.append(")"); - if (!queryHasWhere) { - builder.append(query.split(cdcName)[1]); - } - return builder.toString(); - } - - /** - * Return true if the parseNode or any of its children contains PARTITION_ID() function. - * - * @param parseNode The parseNode from Where clause. - * @return True if the parseNode or any of its children contains PARTITION_ID() - * function. False otherwise. - */ - private static boolean isPartitionIdIncludedInTree(ParseNode parseNode) { - if (parseNode instanceof PartitionIdParseNode) { - return true; - } - if (parseNode == null || CollectionUtils.isEmpty(parseNode.getChildren())) { - return false; - } - return parseNode.getChildren().stream() - .anyMatch(PhoenixStatement::isPartitionIdIncludedInTree); - } - public String getTargetForAudit(CompilableStatement stmt) { String target = null; try { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java index 2a897980418..eb0e8cd2ecd 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java @@ -18,10 +18,14 @@ package org.apache.phoenix.util; +import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; +import java.util.ArrayList; import java.util.Base64; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.Set; @@ -30,9 +34,12 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.StringUtils; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.DescVarLengthFastByteComparisons; +import org.apache.phoenix.parse.ParseNode; +import org.apache.phoenix.parse.PartitionIdParseNode; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PVarchar; @@ -155,6 +162,73 @@ public static boolean isBinaryType(PDataType dataType) { || dataType.getSqlType() == PDataType.VARBINARY_ENCODED_TYPE); } + /** + * Return true if the parseNode or any of its children contains PARTITION_ID() function. + * + * @param parseNode The parseNode from Where clause. + * @return True if the parseNode or any of its children contains PARTITION_ID() + * function. False otherwise. + */ + public static boolean isPartitionIdIncludedInTree(ParseNode parseNode) { + if (parseNode instanceof PartitionIdParseNode) { + return true; + } + if (parseNode == null || CollectionUtils.isEmpty(parseNode.getChildren())) { + return false; + } + return parseNode.getChildren().stream() + .anyMatch(CDCUtil::isPartitionIdIncludedInTree); + } + + /** + * Add IN Operator for PARTITION_ID() so that the full table scan CDC query can be + * optimized to be range scan. + * + * @param conn The Connection. + * @param cdcName CDC Object name. + * @param query SQL Query Statement. + * @return Updated query including PartitionId with IN operator. + * @throws SQLException If the distinct partition ids retrival fails. + */ + public static String addPartitionInList(final Connection conn, final String cdcName, + final String query) throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("SELECT DISTINCT PARTITION_ID() FROM " + + cdcName); + List partitionIds = new ArrayList<>(); + while (rs.next()) { + partitionIds.add(rs.getString(1)); + } + if (partitionIds.isEmpty()) { + return query; + } + StringBuilder builder; + boolean queryHasWhere = query.contains(" WHERE "); + if (queryHasWhere) { + builder = new StringBuilder(query); + builder.append(" AND PARTITION_ID() IN ("); + } else { + builder = new StringBuilder(query.split(cdcName)[0]); + builder.append(cdcName); + builder.append(" WHERE PARTITION_ID() IN ("); + } + boolean initialized = false; + for (String partitionId : partitionIds) { + if (!initialized) { + builder.append("'"); + initialized = true; + } else { + builder.append(",'"); + } + builder.append(partitionId); + builder.append("'"); + } + builder.append(")"); + if (!queryHasWhere) { + builder.append(query.split(cdcName)[1]); + } + return builder.toString(); + } + public enum CdcStreamStatus { ENABLED("ENABLED"), ENABLING("ENABLING"), diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java index 36c7e0548aa..37b7fb9d203 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java @@ -249,7 +249,7 @@ private static String getCDCQuery(String cdcName, String[] partitionId) { return query.toString(); } - private static PreparedStatement getCDCQueryPreparedStatement(Connection conn, String cdcName, + private static PreparedStatement getCDCQueryPreparedStatement(Connection conn, String query, long minTimestamp, long maxTimestamp) throws SQLException { StringBuilder builder = new StringBuilder(query); @@ -338,21 +338,21 @@ public void testSelectCDC() throws Exception { put("V2", "INTEGER"); put("B.VB", "INTEGER"); }}; - verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, cdcFullName, + verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName, startTS, endTS).executeQuery(), datatableName, dataColumns, changes, CHANGE_IMG); - verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, cdcFullName, + verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, "SELECT /*+ CDC_INCLUDE(CHANGE) */ PHOENIX_ROW_TIMESTAMP(), K," + "\"CDC JSON\" FROM " + cdcFullName, startTS, endTS) .executeQuery(), datatableName, dataColumns, changes, CHANGE_IMG); - verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, cdcFullName, + verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName, startTS, endTS).executeQuery(), datatableName, dataColumns, changes, PRE_POST_IMG); - verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, cdcFullName, + verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, "SELECT * FROM " + cdcFullName,startTS, endTS).executeQuery(), datatableName, dataColumns, changes, new HashSet<>()); From b8b6e0bd367dd9c28a56999689ff2c25cb68cf41 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Sat, 1 Feb 2025 12:02:59 -0800 Subject: [PATCH 3/3] minor change --- .../src/main/java/org/apache/phoenix/util/CDCUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java index eb0e8cd2ecd..9107dec14b0 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java @@ -188,7 +188,7 @@ public static boolean isPartitionIdIncludedInTree(ParseNode parseNode) { * @param cdcName CDC Object name. * @param query SQL Query Statement. * @return Updated query including PartitionId with IN operator. - * @throws SQLException If the distinct partition ids retrival fails. + * @throws SQLException If the distinct partition ids retrieval fails. */ public static String addPartitionInList(final Connection conn, final String cdcName, final String query) throws SQLException {