diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 8c0e5cafe9f..8c29734a887 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -75,6 +75,7 @@
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 import org.apache.phoenix.call.CallRunner;
 import org.apache.phoenix.compile.BaseMutationPlan;
 import org.apache.phoenix.compile.CloseStatementCompiler;
@@ -148,6 +149,7 @@
 import org.apache.phoenix.parse.DeleteJarStatement;
 import org.apache.phoenix.parse.DeleteStatement;
 import org.apache.phoenix.parse.ExplainType;
+import org.apache.phoenix.parse.PartitionIdParseNode;
 import org.apache.phoenix.parse.ShowCreateTableStatement;
 import org.apache.phoenix.parse.ShowCreateTable;
 import org.apache.phoenix.parse.DropColumnStatement;
@@ -348,7 +350,7 @@ protected PhoenixResultSet executeQuery(final CompilableStatement stmt, final Qu
     }
 
-    private PhoenixResultSet executeQuery(final CompilableStatement stmt,
+    private PhoenixResultSet executeQuery(final CompilableStatement compilableStatement,
             final boolean doRetryOnMetaNotFoundError, final QueryLogger queryLogger,
             final boolean noCommit, boolean shouldValidateLastDdlTimestamp)
@@ -367,6 +369,7 @@ private PhoenixResultSet executeQuery(final CompilableStatement stmt,
         clearResultSet();
         PhoenixResultSet rs = null;
         QueryPlan plan = null;
+        CompilableStatement stmt = compilableStatement;
         try {
             PhoenixConnection conn = getConnection();
             conn.checkOpen();
@@ -378,6 +381,28 @@ private PhoenixResultSet executeQuery(final CompilableStatement stmt,
             }
             plan = stmt.compilePlan(PhoenixStatement.this,
                     Sequence.ValueOp.VALIDATE_SEQUENCE);
+            if (plan.getTableRef() != null && PTableType.CDC.equals(
+                    plan.getTableRef().getTable().getType())) {
+                if (stmt instanceof ExecutableSelectStatement) {
+                    ParseNode parseNode =
+                            ((ExecutableSelectStatement) stmt).getWhere();
+                    List<AliasedNode> selectNodes =
+                            ((ExecutableSelectStatement) stmt).getSelect();
+                    boolean queryPartitionIds =
+                            CollectionUtils.isNotEmpty(selectNodes)
+                                    && selectNodes.size() == 1
+                                    && selectNodes.get(0)
+                                            .getNode() instanceof PartitionIdParseNode;
+                    if (!queryPartitionIds &&
+                            !CDCUtil.isPartitionIdIncludedInTree(parseNode)) {
+                        stmt = parseStatement(CDCUtil.addPartitionInList(conn,
+                                plan.getTableRef().getTable().toString(),
+                                stmt.toString()));
+                        plan = stmt.compilePlan(PhoenixStatement.this,
+                                Sequence.ValueOp.VALIDATE_SEQUENCE);
+                    }
+                }
+            }
             // Send mutations to hbase, so they are visible to subsequent reads.
             // Use original plan for data table so that data and immutable indexes will be sent
             // TODO: for joins, we need to iterate through all tables, but we need the original table,
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 2a897980418..9107dec14b0 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -18,10 +18,14 @@
 package org.apache.phoenix.util;
 
+import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.Base64;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
@@ -30,9 +34,12 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.PartitionIdParseNode;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarchar;
@@ -155,6 +162,73 @@ public static boolean isBinaryType(PDataType dataType) {
             || dataType.getSqlType() == PDataType.VARBINARY_ENCODED_TYPE);
     }
 
+    /**
+     * Return true if the parseNode or any of its children contains PARTITION_ID() function.
+     *
+     * @param parseNode The parseNode from Where clause.
+     * @return True if the parseNode or any of its children contains PARTITION_ID()
+     * function. False otherwise.
+     */
+    public static boolean isPartitionIdIncludedInTree(ParseNode parseNode) {
+        if (parseNode instanceof PartitionIdParseNode) {
+            return true;
+        }
+        if (parseNode == null || CollectionUtils.isEmpty(parseNode.getChildren())) {
+            return false;
+        }
+        return parseNode.getChildren().stream()
+                .anyMatch(CDCUtil::isPartitionIdIncludedInTree);
+    }
+
+    /**
+     * Add IN Operator for PARTITION_ID() so that the full table scan CDC query can be
+     * optimized to be range scan.
+     *
+     * @param conn The Connection.
+     * @param cdcName CDC Object name.
+     * @param query SQL Query Statement.
+     * @return Updated query including PartitionId with IN operator.
+     * @throws SQLException If the distinct partition ids retrieval fails.
+     */
+    public static String addPartitionInList(final Connection conn, final String cdcName,
+            final String query) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT DISTINCT PARTITION_ID() FROM "
+                + cdcName);
+        List<String> partitionIds = new ArrayList<>();
+        while (rs.next()) {
+            partitionIds.add(rs.getString(1));
+        }
+        if (partitionIds.isEmpty()) {
+            return query;
+        }
+        StringBuilder builder;
+        boolean queryHasWhere = query.contains(" WHERE ");
+        if (queryHasWhere) {
+            builder = new StringBuilder(query);
+            builder.append(" AND PARTITION_ID() IN (");
+        } else {
+            builder = new StringBuilder(query.split(cdcName)[0]);
+            builder.append(cdcName);
+            builder.append(" WHERE PARTITION_ID() IN (");
+        }
+        boolean initialized = false;
+        for (String partitionId : partitionIds) {
+            if (!initialized) {
+                builder.append("'");
+                initialized = true;
+            } else {
+                builder.append(",'");
+            }
+            builder.append(partitionId);
+            builder.append("'");
+        }
+        builder.append(")");
+        if (!queryHasWhere) {
+            builder.append(query.split(cdcName)[1]);
+        }
+        return builder.toString();
+    }
+
     public enum CdcStreamStatus {
         ENABLED("ENABLED"),
         ENABLING("ENABLING"),
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index a1519cc64bf..37b7fb9d203 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -78,12 +78,6 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-// NOTE: To debug the query execution, add the below condition or the equivalent where you need a
-// breakpoint.
-// if (<table>.getTableName().getString().equals("N000002") ||
-//         <table>.getTableName().getString().equals("__CDC__N000002")) {
-//     "".isEmpty();
-// }
 @RunWith(Parameterized.class)
 @Category(NeedsOwnMiniClusterTest.class)
 public class CDCQueryIT extends CDCBaseIT {
@@ -255,36 +249,11 @@ private static String getCDCQuery(String cdcName, String[] partitionId) {
         return query.toString();
     }
 
-    private static String addPartitionInList(Connection conn, String cdcName, String query)
-            throws SQLException{
-        ResultSet rs = conn.createStatement().executeQuery("SELECT DISTINCT PARTITION_ID() FROM "
-                + cdcName);
-        List<String> partitionIds = new ArrayList<>();
-        while (rs.next()) {
-            partitionIds.add(rs.getString(1));
-        }
-        StringBuilder builder = new StringBuilder(query);
-        builder.append(" WHERE PARTITION_ID() IN (");
-        boolean initialized = false;
-        for (String partitionId : partitionIds) {
-            if (!initialized) {
-                builder.append("'");
-                initialized = true;
-            } else {
-                builder.append(",'");
-            }
-            builder.append(partitionId);
-            builder.append("'");
-        }
-        builder.append(")");
-        return builder.toString();
-    }
-
-    private static PreparedStatement getCDCQueryPreparedStatement(Connection conn, String cdcName,
+    private static PreparedStatement getCDCQueryPreparedStatement(Connection conn, String query,
             long minTimestamp, long maxTimestamp) throws SQLException {
-        StringBuilder builder = new StringBuilder(addPartitionInList(conn, cdcName, query));
-        builder.append(" AND PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() < ?");
+        StringBuilder builder = new StringBuilder(query);
+        builder.append(" WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() < ?");
         PreparedStatement statement = conn.prepareStatement(builder.toString());
         statement.setTimestamp(1, new Timestamp(minTimestamp));
         statement.setTimestamp(2, new Timestamp(maxTimestamp));
@@ -333,10 +302,11 @@ public void testSelectCDC() throws Exception {
         try (Connection conn = newConnection(tenantId)) {
             // For debug: uncomment to see the exact results logged to console.
             dumpCDCResults(conn, cdcName,
-                    new TreeMap<String, String>() {{ put("K", "INTEGER"); }},
-                    addPartitionInList(conn, cdcFullName,
-                            "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K,"
-                                    + "\"CDC JSON\" FROM " + cdcFullName));
+                    new TreeMap<String, String>() {{
+                        put("K", "INTEGER");
+                    }},
+                    "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K,"
+                            + "\"CDC JSON\" FROM " + cdcFullName);
 
             // Existence of an CDC index hint shouldn't cause the regular query path to fail.
             // Run the same query with a CDC index hit and without it and make sure we get the same
@@ -368,21 +338,21 @@ public void testSelectCDC() throws Exception {
                 put("V2", "INTEGER");
                 put("B.VB", "INTEGER");
             }};
-            verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, cdcFullName,
+            verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn,
                     "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName,
                     startTS, endTS).executeQuery(), datatableName, dataColumns,
                     changes, CHANGE_IMG);
-            verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, cdcFullName,
+            verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn,
                     "SELECT /*+ CDC_INCLUDE(CHANGE) */ PHOENIX_ROW_TIMESTAMP(), K,"
                             + "\"CDC JSON\" FROM " + cdcFullName, startTS, endTS)
                     .executeQuery(), datatableName, dataColumns, changes, CHANGE_IMG);
-            verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, cdcFullName,
+            verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn,
                    "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName,
                     startTS, endTS).executeQuery(), datatableName, dataColumns,
                     changes, PRE_POST_IMG);
-            verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn, cdcFullName,
+            verifyChangesViaSCN(tenantId, getCDCQueryPreparedStatement(conn,
                     "SELECT * FROM " + cdcFullName,startTS, endTS).executeQuery(),
                     datatableName, dataColumns, changes, new HashSet<>());
@@ -474,8 +444,8 @@ public void testSelectGeneric() throws Exception {
         String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
         try (Connection conn = newConnection(tenantId)) {
             // For debug: uncomment to see the exact results logged to console.
-            dumpCDCResults(conn, cdcName, pkColumns, addPartitionInList(conn, cdcFullName,
-                    "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName));
+            dumpCDCResults(conn, cdcName, pkColumns,
+                    "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName);
 
             List changes = new ArrayList<>();
             for (Set batch: allBatches.get(tenantId)) {
@@ -487,19 +457,16 @@ public void testSelectGeneric() throws Exception {
                 Thread.sleep(nextTime - currentTime);
             }
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                    addPartitionInList(conn, cdcFullName,"SELECT * FROM " + cdcFullName)),
+                    "SELECT * FROM " + cdcFullName),
                     datatableName, dataColumns, changes, CHANGE_IMG);
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                    addPartitionInList(conn, cdcFullName,
-                            "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName)),
+                    "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName),
                     datatableName, dataColumns, changes, CHANGE_IMG);
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                    addPartitionInList(conn, cdcFullName,
-                            "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName)),
+                    "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName),
                    datatableName, dataColumns, changes, PRE_POST_IMG);
             verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery(
-                    addPartitionInList(conn, cdcFullName,
-                            "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * FROM " + cdcFullName)),
+                    "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * FROM " + cdcFullName),
                     datatableName, dataColumns, changes, ALL_IMG);
             cdcIndexShouldNotBeUsedForDataTableQueries(conn, tableName, cdcName);
             checkIndexPartitionIdCount(conn, tableName, cdcFullName);
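
Usage sketch (illustrative, not part of the patch): the snippet below shows how a CDC consumer might query the CDC object after this change. The connection URL, schema, and CDC object names are hypothetical placeholders; only the CDC_INCLUDE hint, the PHOENIX_ROW_TIMESTAMP() bounds, and the "CDC JSON" column mirror what CDCQueryIT uses. The point is that the client no longer builds the PARTITION_ID() IN (...) list itself: PhoenixStatement.executeQuery detects a query on a CDC table that neither selects nor filters on PARTITION_ID() and rewrites it through CDCUtil.addPartitionInList before compiling the plan, so the full table scan becomes range scans over the known partition ids.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

public class CdcPartitionRewriteExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical Phoenix connection URL and CDC object name; adjust for your cluster.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
            // No explicit PARTITION_ID() IN (...) clause: with this patch the statement is
            // rewritten client-side before planning, based on the distinct partition ids.
            String sql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM MY_SCHEMA.MY_CDC"
                    + " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() < ?";
            try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                long now = System.currentTimeMillis();
                stmt.setTimestamp(1, new Timestamp(now - 3_600_000L)); // changes from the last hour
                stmt.setTimestamp(2, new Timestamp(now));
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        // "CDC JSON" carries the change image, as in CDCQueryIT.
                        System.out.println(rs.getString("CDC JSON"));
                    }
                }
            }
        }
    }
}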