diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java new file mode 100644 index 00000000000..80cfbc718d8 --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.Optional; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.MasterObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.util.CDCUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME; + +/** + * Master Coprocessor for Phoenix. + */ +public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor { + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixMasterObserver.class); + + private static final String STREAM_STATUS_QUERY + = "SELECT STREAM_NAME FROM " + SYSTEM_CDC_STREAM_STATUS_NAME + + " WHERE TABLE_NAME = ? AND STREAM_STATUS='" + + CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue() + "'"; + + // tableName, streamName, partitionId, parentId, startTime, endTime, startKey, endKey + private static final String PARTITION_UPSERT_SQL + = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,?,?,?,?,?)"; + + private static final String PARENT_PARTITION_QUERY + = "SELECT PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME + + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? "; + + private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL + = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, STREAM_NAME, PARTITION_ID, " + + "PARTITION_END_TIME) VALUES (?,?,?,?)"; + + @Override + public Optional getMasterObserver() { + return Optional.of(this); + } + + /** + * Update parent -> daughter relationship for CDC Streams. + * - find parent partition id using start/end keys of daughters + * - upsert partition metadata for the 2 daughters + * - update the end time on the parent's partition metadata + * @param c the environment to interact with the framework and master + * @param regionInfoA the left daughter region + * @param regionInfoB the right daughter region + */ + @Override + public void postCompletedSplitRegionAction(final ObserverContext c, + final RegionInfo regionInfoA, + final RegionInfo regionInfoB) { + Configuration conf = c.getEnvironment().getConfiguration(); + try { + Connection conn = QueryUtil.getConnectionOnServer(conf); + // CDC will be enabled on Phoenix tables only + PTable phoenixTable = getPhoenixTable(conn, regionInfoA.getTable()); + if (phoenixTable == null) { + LOGGER.info("{} is not a Phoenix Table, skipping partition metadata update.", + regionInfoA.getTable()); + return; + } + // find streamName with ENABLED status + String tableName = phoenixTable.getName().getString(); + PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY); + pstmt.setString(1, tableName); + ResultSet rs = pstmt.executeQuery(); + if (rs.next()) { + String streamName = rs.getString(1); + LOGGER.info("Updating partition metadata for table={}, stream={} daughters {} {}", + tableName, streamName, regionInfoA.getEncodedName(), regionInfoB.getEncodedName()); + String parentPartitionID = getParentPartitionId(conn, tableName, streamName, regionInfoA, regionInfoB); + upsertDaughterPartition(conn, tableName, streamName, parentPartitionID, regionInfoA); + upsertDaughterPartition(conn, tableName, streamName, parentPartitionID, regionInfoB); + updateParentPartitionEndTime(conn, tableName, streamName, parentPartitionID, regionInfoA.getRegionId()); + } + } catch (SQLException e) { + LOGGER.error("Unable to update CDC Stream Partition metadata during split with daughter regions: {} {}", + regionInfoA.getEncodedName(), regionInfoB.getEncodedName(), e); + } + } + + private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException { + PTable pTable; + try { + pTable = PhoenixRuntime.getTable(conn, tableName.toString()); + } catch (TableNotFoundException e) { + return null; + } + return pTable; + } + + /** + * Lookup parent's partition id (region's encoded name) in SYSTEM.CDC_STREAM. + * RegionInfoA is left daughter and RegionInfoB is right daughter so parent's key range would + * be [RegionInfoA stratKey, RegionInfoB endKey] + */ + private String getParentPartitionId(Connection conn, String tableName, String streamName, + RegionInfo regionInfoA, RegionInfo regionInfoB) + throws SQLException { + byte[] parentStartKey = regionInfoA.getStartKey(); + byte[] parentEndKey = regionInfoB.getEndKey(); + + StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY); + if (parentStartKey.length == 0) { + qb.append(" AND PARTITION_START_KEY IS NULL "); + } else { + qb.append(" AND PARTITION_START_KEY = ? "); + } + if (parentEndKey.length == 0) { + qb.append(" AND PARTITION_END_KEY IS NULL "); + } else { + qb.append(" AND PARTITION_END_KEY = ? "); + } + + PreparedStatement pstmt = conn.prepareStatement(qb.toString()); + int index = 1; + pstmt.setString(index++, tableName); + pstmt.setString(index++, streamName); + if (parentStartKey.length > 0) pstmt.setBytes(index++, parentStartKey); + if (parentEndKey.length > 0) pstmt.setBytes(index++, parentEndKey); + LOGGER.info("Query to get parent partition id: " + pstmt); + ResultSet rs = pstmt.executeQuery(); + if (rs.next()) { + return rs.getString(1); + } else { + throw new SQLException(String.format("Could not find parent of the provided daughters: " + + "startKeyA=%s endKeyA=%s startKeyB=%s endKeyB=%s", + Bytes.toStringBinary(regionInfoA.getStartKey()), + Bytes.toStringBinary(regionInfoA.getEndKey()), + Bytes.toStringBinary(regionInfoB.getStartKey()), + Bytes.toStringBinary(regionInfoB.getEndKey()))); + } + } + + /** + * Insert partition metadata for a daughter region from the split. + */ + private void upsertDaughterPartition(Connection conn, String tableName, + String streamName, String parentPartitionID, + RegionInfo regionInfo) + throws SQLException { + String partitionId = regionInfo.getEncodedName(); + long startTime = regionInfo.getRegionId(); + byte[] startKey = regionInfo.getStartKey(); + byte[] endKey = regionInfo.getEndKey(); + PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL); + pstmt.setString(1, tableName); + pstmt.setString(2, streamName); + pstmt.setString(3, partitionId); + pstmt.setString(4, parentPartitionID); + pstmt.setLong(5, startTime); + // endTime in not set when inserting a new partition + pstmt.setNull(6, Types.BIGINT); + pstmt.setBytes(7, startKey.length == 0 ? null : startKey); + pstmt.setBytes(8, endKey.length == 0 ? null : endKey); + pstmt.executeUpdate(); + conn.commit(); + } + + /** + * Update parent partition's endTime by setting it to daughter's startTime. + */ + private void updateParentPartitionEndTime(Connection conn, String tableName, + String streamName, String parentPartitionID, + long daughterStartTime) throws SQLException { + PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL); + pstmt.setString(1, tableName); + pstmt.setString(2, streamName); + pstmt.setString(3, parentPartitionID); + pstmt.setLong(4, daughterStartTime); + pstmt.executeUpdate(); + conn.commit(); + } +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java index c84853481c5..de709e8aeff 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/tasks/CdcStreamPartitionMetadataTask.java @@ -37,6 +37,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Timestamp; +import java.sql.Types; import java.util.List; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME; @@ -55,9 +56,8 @@ public class CdcStreamPartitionMetadataTask extends BaseTask { private static final String CDC_STREAM_STATUS_UPSERT_SQL = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)"; - // parent_partition_id will be null, set partition_end_time to -1 private static final String CDC_STREAM_PARTITION_UPSERT_SQL - = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,null,?,-1,?,?)"; + = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,?,?,?,?,?)"; @Override public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) { @@ -136,13 +136,16 @@ private void upsertPartitionMetadata(PhoenixConnection pconn, String tableName, throws SQLException { try (PreparedStatement ps = pconn.prepareStatement(CDC_STREAM_PARTITION_UPSERT_SQL)) { for (HRegionLocation tableRegion : tableRegions) { - RegionInfo ri = tableRegion.getRegionInfo(); + // set parent_partition_id, partition_end_time to null + RegionInfo ri = tableRegion.getRegion(); ps.setString(1, tableName); ps.setString(2, streamName); ps.setString(3, ri.getEncodedName()); - ps.setLong(4, ri.getRegionId()); - ps.setBytes(5, ri.getStartKey()); - ps.setBytes(6, ri.getEndKey()); + ps.setNull(4, Types.VARCHAR); + ps.setLong(5, ri.getRegionId()); + ps.setNull(6, Types.BIGINT); + ps.setBytes(7, ri.getStartKey()); + ps.setBytes(8, ri.getEndKey()); ps.executeUpdate(); } pconn.commit(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java index 461ee8a7c04..d70821bb94c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java @@ -18,14 +18,22 @@ package org.apache.phoenix.end2end; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.PhoenixMasterObserver; import org.apache.phoenix.coprocessor.TaskRegionObserver; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; @@ -40,6 +48,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -47,9 +56,10 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME; import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -@Category(ParallelStatsDisabledTest.class) +@Category(NeedsOwnMiniClusterTest.class) public class CDCStreamIT extends CDCBaseIT { private static RegionCoprocessorEnvironment TaskRegionEnvironment; @@ -62,6 +72,7 @@ public static synchronized void doSetup() throws Exception { Long.toString(Long.MAX_VALUE)); props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(Long.MAX_VALUE)); + props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName()); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); TaskRegionEnvironment = getUtility() @@ -71,6 +82,7 @@ public static synchronized void doSetup() throws Exception { .get(0).getCoprocessorHost() .findCoprocessorEnvironment(TaskRegionObserver.class.getName()); } + @Test public void testStreamPartitionMetadataBootstrap() throws Exception { Connection conn = newConnection(); @@ -149,9 +161,172 @@ public void testOnlyOneStreamAllowed() throws Exception { assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.ENABLING); } + /** + * Split the only region of the table with empty start key and empty end key. + */ + @Test + public void testPartitionMetadataTableWithSingleRegionSplits() throws Exception { + // create table, cdc and bootstrap stream metadata + Connection conn = newConnection(); + String tableName = generateUniqueName(); + createTableAndEnableCDC(conn, tableName); + + //split the only region somewhere in the middle + splitTable(conn, tableName, Bytes.toBytes("m")); + + //check partition metadata - daughter regions are inserted and parent's end time is updated. + ResultSet rs = conn.createStatement().executeQuery( + "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'"); + PartitionMetadata parent = null; + List daughters = new ArrayList<>(); + while (rs.next()) { + PartitionMetadata pm = new PartitionMetadata(rs); + // parent which was split + if (pm.endTime > 0) { + parent = pm; + } else { + daughters.add(pm); + } + } + assertNotNull(parent); + assertEquals(2, daughters.size()); + assertEquals(daughters.get(0).startTime, parent.endTime); + assertEquals(daughters.get(1).startTime, parent.endTime); + assertEquals(parent.partitionId, daughters.get(0).parentPartitionId); + assertEquals(parent.partitionId, daughters.get(1).parentPartitionId); + assertTrue(daughters.stream().anyMatch(d -> d.startKey == null && d.endKey != null && d.endKey[0] == 'm')); + assertTrue(daughters.stream().anyMatch(d -> d.endKey == null && d.startKey != null && d.startKey[0] == 'm')); + } + + /** + * Split the first region of the table with empty start key. + */ + @Test + public void testPartitionMetadataFirstRegionSplits() throws Exception { + // create table, cdc and bootstrap stream metadata + Connection conn = newConnection(); + String tableName = generateUniqueName(); + createTableAndEnableCDC(conn, tableName); + + //split the only region [null, null] + splitTable(conn, tableName, Bytes.toBytes("l")); + // we have 2 regions - [null, l], [l, null], split the first region + splitTable(conn, tableName, Bytes.toBytes("d")); + ResultSet rs = conn.createStatement().executeQuery( + "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'"); + PartitionMetadata grandparent = null, splitParent = null, unSplitParent = null; + List daughters = new ArrayList<>(); + while (rs.next()) { + PartitionMetadata pm = new PartitionMetadata(rs); + if (pm.endTime > 0) { + if (pm.startKey == null && pm.endKey == null) { + grandparent = pm; + } else { + splitParent = pm; + } + } else if (pm.endKey == null) { + unSplitParent = pm; + } else { + daughters.add(pm); + } + } + assertNotNull(grandparent); + assertNotNull(unSplitParent); + assertNotNull(splitParent); + assertEquals(2, daughters.size()); + assertEquals(daughters.get(0).startTime, splitParent.endTime); + assertEquals(daughters.get(1).startTime, splitParent.endTime); + assertEquals(splitParent.partitionId, daughters.get(0).parentPartitionId); + assertEquals(splitParent.partitionId, daughters.get(1).parentPartitionId); + assertTrue(daughters.stream().anyMatch(d -> d.startKey == null && d.endKey != null && d.endKey[0] == 'd')); + assertTrue(daughters.stream().anyMatch(d -> d.startKey != null && d.startKey[0] == 'd' && d.endKey[0] == 'l')); + } + + /** + * Split the last region of the table with empty end key. + */ + @Test + public void testPartitionMetadataLastRegionSplits() throws Exception { + // create table, cdc and bootstrap stream metadata + Connection conn = newConnection(); + String tableName = generateUniqueName(); + createTableAndEnableCDC(conn, tableName); + + //split the only region [null, null] + splitTable(conn, tableName, Bytes.toBytes("l")); + // we have 2 regions - [null, l], [l, null], split the second region + splitTable(conn, tableName, Bytes.toBytes("q")); + ResultSet rs = conn.createStatement().executeQuery( + "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'"); + PartitionMetadata grandparent = null, splitParent = null, unSplitParent = null; + List daughters = new ArrayList<>(); + while (rs.next()) { + PartitionMetadata pm = new PartitionMetadata(rs); + if (pm.endTime > 0) { + if (pm.startKey == null && pm.endKey == null) { + grandparent = pm; + } else { + splitParent = pm; + } + } else if (pm.startKey == null) { + unSplitParent = pm; + } else { + daughters.add(pm); + } + } + assertNotNull(grandparent); + assertNotNull(unSplitParent); + assertNotNull(splitParent); + assertEquals(2, daughters.size()); + assertEquals(daughters.get(0).startTime, splitParent.endTime); + assertEquals(daughters.get(1).startTime, splitParent.endTime); + assertEquals(splitParent.partitionId, daughters.get(0).parentPartitionId); + assertEquals(splitParent.partitionId, daughters.get(1).parentPartitionId); + assertTrue(daughters.stream().anyMatch(d -> d.startKey[0] == 'l' && d.endKey[0] == 'q')); + assertTrue(daughters.stream().anyMatch(d -> d.endKey == null && d.startKey != null && d.startKey[0] == 'q')); + } + + /** + * Split a middle region of the table with non-empty start/end key. + */ + @Test + public void testPartitionMetadataMiddleRegionSplits() throws Exception { + // create table, cdc and bootstrap stream metadata + Connection conn = newConnection(); + String tableName = generateUniqueName(); + createTableAndEnableCDC(conn, tableName); + + //split the only region [null, null] + splitTable(conn, tableName, Bytes.toBytes("d")); + // we have 2 regions - [null, d], [d, null], split the second region + splitTable(conn, tableName, Bytes.toBytes("q")); + // we have 3 regions - [null, d], [d, q], [q, null], split the second region + splitTable(conn, tableName, Bytes.toBytes("j")); + // [null, d], [d, j], [j, q], [q, null] + ResultSet rs = conn.createStatement().executeQuery( + "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'"); + PartitionMetadata parent = null; + List daughters = new ArrayList<>(); + while (rs.next()) { + PartitionMetadata pm = new PartitionMetadata(rs); + if (pm.startKey != null && pm.endKey != null) { + if (pm.endTime > 0) parent = pm; + else daughters.add(pm); + } + } + assertNotNull(parent); + assertEquals(2, daughters.size()); + assertEquals(daughters.get(0).startTime, parent.endTime); + assertEquals(daughters.get(1).startTime, parent.endTime); + assertEquals(parent.partitionId, daughters.get(0).parentPartitionId); + assertEquals(parent.partitionId, daughters.get(1).parentPartitionId); + assertTrue(daughters.stream().anyMatch(d -> d.startKey[0] == 'd' && d.endKey[0] == 'j')); + assertTrue(daughters.stream().anyMatch(d -> d.startKey[0] == 'j' && d.endKey[0] == 'q')); + } + private String getStreamName(Connection conn, String tableName, String cdcName) throws SQLException { return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName, CDCUtil.getCDCCreationTimestamp( - conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName))); + conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName))); } private void assertStreamStatus(Connection conn, String tableName, String streamName, @@ -160,7 +335,7 @@ private void assertStreamStatus(Connection conn, String tableName, String stream + SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME='" + tableName + "' AND STREAM_NAME='" + streamName + "'"); assertTrue(rs.next()); - Assert.assertEquals(status.getSerializedValue(), rs.getString(1)); + assertEquals(status.getSerializedValue(), rs.getString(1)); } private void assertPartitionMetadata(Connection conn, String tableName, String cdcName) @@ -180,4 +355,75 @@ private void assertPartitionMetadata(Connection conn, String tableName, String c assertTrue(rs.next()); } } + + private void createTableAndEnableCDC(Connection conn, String tableName) throws Exception { + String cdcName = generateUniqueName(); + String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName; + conn.createStatement().execute( + "CREATE TABLE " + tableName + " ( k VARCHAR PRIMARY KEY," + " v1 INTEGER," + + " v2 VARCHAR)"); + createCDC(conn, cdc_sql, null); + String streamName = getStreamName(conn, tableName, cdcName); + TaskRegionObserver.SelfHealingTask task = + new TaskRegionObserver.SelfHealingTask( + TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS); + task.run(); + assertStreamStatus(conn, tableName, streamName, CDCUtil.CdcStreamStatus.ENABLED); + + //upsert sample data + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('a', 1, 'foo')"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('b', 2, 'bar')"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('e', 3, 'alice')"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('j', 4, 'bob')"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('m', 5, 'cat')"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('p', 6, 'cat')"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('t', 7, 'cat')"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('z', 8, 'cat')"); + } + + /** + * Split the table at the provided split point. + */ + private void splitTable(Connection conn, String tableName, byte[] splitPoint) throws Exception { + ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + Admin admin = services.getAdmin(); + Configuration configuration = + conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration(); + org.apache.hadoop.hbase.client.Connection hbaseConn = + ConnectionFactory.createConnection(configuration); + RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName)); + int nRegions = regionLocator.getAllRegionLocations().size(); + try { + admin.split(TableName.valueOf(tableName), splitPoint); + int retryCount = 0; + do { + Thread.sleep(2000); + retryCount++; + } while (retryCount < 10 && regionLocator.getAllRegionLocations().size() == nRegions); + Assert.assertNotEquals(regionLocator.getAllRegionLocations().size(), nRegions); + } finally { + admin.close(); + } + } + + /** + * Inner class to represent partition metadata for a region i.e. single row from SYSTEM.CDC_STREAM + */ + private class PartitionMetadata { + public String partitionId; + public String parentPartitionId; + public Long startTime; + public Long endTime; + public byte[] startKey; + public byte[] endKey; + + public PartitionMetadata(ResultSet rs) throws SQLException { + partitionId = rs.getString(3); + parentPartitionId = rs.getString(4); + startTime = rs.getLong(5); + endTime = rs.getLong(6); + startKey = rs.getBytes(7); + endKey = rs.getBytes(8); + } + } }