PHOENIX-7310: Add block bytes scanned and fs read time to our scan metrics #2013

Open · wants to merge 5 commits into base: master · Changes from 3 commits
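This PR threads HBase's BLOCK_BYTES_SCANNED scan metric through Phoenix's metrics plumbing: a new COUNT_BLOCK_BYTES_SCANNED MetricType, a global client counter (GLOBAL_HBASE_COUNT_BLOCK_BYTES_SCANNED), a table-level counter, a per-scan CombinableMetric on ScanMetricsHolder, and a ScannerContextUtil change so block-size progress is carried over when scan metrics are copied between scanner contexts. Below is a minimal editorial sketch of how a client would read the new metric, using only APIs exercised by the new IT in this PR; the JDBC URL and table name are illustrative, and request-level metric collection must be enabled (see the IT's doSetup):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Map;

import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.util.PhoenixRuntime;

public class BlockBytesScannedExample {
    public static void main(String[] args) throws Exception {
        // "jdbc:phoenix:localhost" and MY_TABLE are placeholders
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT * FROM MY_TABLE")) {
            while (rs.next()) {
                // drain the ResultSet so scan metrics are fully populated
            }
            // per-table request-level read metrics for this query
            Map<String, Map<MetricType, Long>> metrics =
                    PhoenixRuntime.getRequestReadMetricInfo(rs);
            for (Map.Entry<String, Map<MetricType, Long>> e : metrics.entrySet()) {
                Long blockBytes = e.getValue().get(MetricType.COUNT_BLOCK_BYTES_SCANNED);
                System.out.println(e.getKey() + " blockBytesScanned=" + blockBytes);
            }
        }
    }
}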
@@ -26,9 +26,11 @@
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REMOTE_RPC_RETRIES_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
import static org.apache.phoenix.exception.SQLExceptionCode.OPERATION_TIMED_OUT;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BLOCK_BYTES_SCANNED;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_IN_REMOTE_RESULTS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS;
@@ -165,6 +167,8 @@ private void updateMetrics() {
changeMetric(scanMetricsHolder.getCountOfBytesScanned(),
scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfRowsPaged(), dummyRowCounter);
changeMetric(scanMetricsHolder.getCountOfBlockBytesScanned(),
scanMetricsMap.get(BLOCK_BYTES_SCANNED_KEY_METRIC_NAME));

changeMetric(GLOBAL_SCAN_BYTES,
scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
@@ -190,7 +194,6 @@ private void updateMetrics() {
scanMetricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_ROWS_FILTERED,
scanMetricsMap.get(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME));

changeMetric(GLOBAL_PAGED_ROWS_COUNTER, dummyRowCounter);

scanMetricsUpdated = true;
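(Editorial note) The changeMetric overloads used above are null-safe delta appliers, so a missing BLOCK_BYTES_SCANNED entry in the HBase scan-metrics map is simply skipped. A rough sketch of their presumed shape, since the bodies sit outside this hunk:

// presumed shape of the existing helpers; hedged reconstruction, not part of this diff
private void changeMetric(CombinableMetric metric, Long value) {
    if (value != null) {
        metric.change(value); // skip metrics HBase did not report
    }
}

private void changeMetric(GlobalClientMetrics globalMetric, Long value) {
    if (value != null) {
        globalMetric.update(value);
    }
}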
@@ -21,6 +21,7 @@
import static org.apache.phoenix.monitoring.MetricType.CLIENT_METADATA_CACHE_ESTIMATED_USED_SIZE;
import static org.apache.phoenix.monitoring.MetricType.CLIENT_METADATA_CACHE_EVICTION_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.CLIENT_METADATA_CACHE_REMOVAL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.COUNT_BLOCK_BYTES_SCANNED;
import static org.apache.phoenix.monitoring.MetricType.HCONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME;
@@ -145,7 +146,7 @@ public enum GlobalClientMetrics {
GLOBAL_HBASE_COUNT_ROWS_SCANNED(COUNT_ROWS_SCANNED),
GLOBAL_HBASE_COUNT_ROWS_FILTERED(COUNT_ROWS_FILTERED),
GLOBAL_HBASE_COUNTER_METADATA_INCONSISTENCY(COUNTER_METADATA_INCONSISTENCY),

GLOBAL_HBASE_COUNT_BLOCK_BYTES_SCANNED(COUNT_BLOCK_BYTES_SCANNED),
GLOBAL_HA_PARALLEL_POOL1_TASK_QUEUE_WAIT_TIME(HA_PARALLEL_POOL1_TASK_QUEUE_WAIT_TIME),
GLOBAL_HA_PARALLEL_POOL1_TASK_END_TO_END_TIME(HA_PARALLEL_POOL1_TASK_END_TO_END_TIME),
GLOBAL_HA_PARALLEL_POOL1_TASK_EXECUTION_TIME(HA_PARALLEL_POOL1_TASK_EXECUTION_TIME),
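(Editorial note) Once registered, the new counter aggregates block bytes scanned across all connections in the client JVM; a minimal sketch of reading it, assuming the usual GlobalClientMetrics accessors:

// sketch, assuming getMetric()/getValue() accessors as on other global metrics
long blockBytesScanned =
        GlobalClientMetrics.GLOBAL_HBASE_COUNT_BLOCK_BYTES_SCANNED.getMetric().getValue();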
@@ -186,6 +186,9 @@ public enum MetricType {
TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS("tsistrc", "Time spent in RPC calls for systemTable lookup",
LogLevel.DEBUG,PLong.INSTANCE),

COUNT_BLOCK_BYTES_SCANNED("bbs", "Count of Block Bytes Scanned",
LogLevel.DEBUG,PLong.INSTANCE),

//HA Related Metrics
HA_PARALLEL_COUNT_OPERATIONS_ACTIVE_CLUSTER("hpoac","Number of Operations to the active cluster",LogLevel.DEBUG,PLong.INSTANCE),
HA_PARALLEL_COUNT_OPERATIONS_STANDBY_CLUSTER("hposc","Number of Operations to the standby cluster",LogLevel.DEBUG,PLong.INSTANCE),
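(Editorial note) Like its neighbors, the new entry follows the (shortName, description, logLevel, dataType) pattern; an annotated copy for readability, a gloss rather than a change to the diff:

COUNT_BLOCK_BYTES_SCANNED(
        "bbs",                          // compact short name used as the metric key
        "Count of Block Bytes Scanned", // human-readable description
        LogLevel.DEBUG,                 // level at which the metric is logged
        PLong.INSTANCE),                // values are long counters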
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.monitoring;

import static org.apache.phoenix.monitoring.MetricType.COUNT_BLOCK_BYTES_SCANNED;
import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_IN_REMOTE_RESULTS;
import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_REGION_SERVER_RESULTS;
import static org.apache.phoenix.monitoring.MetricType.COUNT_MILLS_BETWEEN_NEXTS;
@@ -53,6 +54,8 @@ public class ScanMetricsHolder {
private final CombinableMetric countOfRowsFiltered;
private final CombinableMetric countOfBytesScanned;
private final CombinableMetric countOfRowsPaged;

private final CombinableMetric countOfBlockBytesScanned;
private Map<String, Long> scanMetricMap;
private Object scan;

@@ -84,6 +87,7 @@ private ScanMetricsHolder(ReadMetricQueue readMetrics, String tableName,Scan sca
countOfRowsFiltered = readMetrics.allotMetric(COUNT_ROWS_FILTERED, tableName);
countOfBytesScanned = readMetrics.allotMetric(SCAN_BYTES,tableName);
countOfRowsPaged = readMetrics.allotMetric(PAGED_ROWS_COUNTER, tableName);
countOfBlockBytesScanned = readMetrics.allotMetric(COUNT_BLOCK_BYTES_SCANNED, tableName);
}

public CombinableMetric getCountOfRemoteRPCcalls() {
@@ -142,6 +146,10 @@ public CombinableMetric getCountOfRowsPaged() {
return countOfRowsPaged;
}

public CombinableMetric getCountOfBlockBytesScanned() {
return countOfBlockBytesScanned;
}

public void setScanMetricMap(Map<String, Long> scanMetricMap) {
this.scanMetricMap = scanMetricMap;
}
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;

import static org.apache.phoenix.monitoring.MetricType.COUNT_BLOCK_BYTES_SCANNED;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
@@ -140,7 +141,8 @@ public enum TableMetrics {
TABLE_NUM_SYSTEM_TABLE_RPC_SUCCESS(NUM_SYSTEM_TABLE_RPC_SUCCESS),
TABLE_NUM_SYSTEM_TABLE_RPC_FAILURES(NUM_SYSTEM_TABLE_RPC_FAILURES),
TABLE_NUM_METADATA_LOOKUP_FAILURES(NUM_METADATA_LOOKUP_FAILURES),
TABLE_TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS(TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS);
TABLE_TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS(TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS),
TABLE_COUNT_BLOCK_BYTES_SCANNED(COUNT_BLOCK_BYTES_SCANNED);

private MetricType metricType;
private PhoenixTableMetric metric;
@@ -42,6 +42,7 @@ public static void updateMetrics(ScannerContext src, ScannerContext dst) {
for (Map.Entry<String, Long> entry : src.getMetrics().getMetricsMap().entrySet()) {
dst.metrics.addToCounter(entry.getKey(), entry.getValue());
}
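// (Editorial comment) Carry accumulated block-size progress across contexts so
// BLOCK_BYTES_SCANNED stays accurate when Phoenix copies scanner contexts while paging.
// getBlockSizeProgress() returns a long; the int cast assumes per-context progress
// stays below Integer.MAX_VALUE, since incrementBlockProgress takes an int.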
dst.incrementBlockProgress((int) src.getBlockSizeProgress());
}
}

@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(NeedsOwnMiniClusterTest.class)
public class BlockBytesScannedMetricIT extends BaseTest {

@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
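// request-level metrics are disabled by default; without this flag the
// metric maps read via PhoenixRuntime below would be empty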
props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true");
setUpTestDriver(new ReadOnlyProps(props));
}

@Test
public void testPointLookupBlockBytesScannedMetric() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
String tableName = generateUniqueName();
PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
stmt.execute("CREATE TABLE " + tableName
+ " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)");
for (int i = 1; i <= 10; i++) {
String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i);
stmt.execute(sql);
}
conn.commit();

String POINT_LOOKUP_QUERY = "SELECT * FROM " + tableName + " WHERE A = 9";

// data is still in the memstore, so block bytes scanned should be 0
long count0 = countBlockBytesScannedFromSql(stmt, POINT_LOOKUP_QUERY);
Assert.assertTrue(count0 == 0);

// flush and clear block cache
flush(tableName);
clearBlockCache(tableName);

long count1 = countBlockBytesScannedFromSql(stmt, POINT_LOOKUP_QUERY);
Assert.assertTrue(count1 > 0);
}

@Test
public void testRangeScanBlockBytesScannedMetric() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
String tableName = generateUniqueName();
PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
// create table with small block size and upsert enough rows to have at least 2 blocks
stmt.execute("CREATE TABLE " + tableName
+ " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG) BLOCKSIZE=200");
for (int i = 1; i <= 20; i++) {
String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i);
stmt.execute(sql);
}
conn.commit();
flush(tableName);
clearBlockCache(tableName);

String RANGE_SCAN_QUERY = "SELECT * FROM " + tableName + " WHERE A > 14 AND A < 18";
String SERVER_FILTER_QUERY = "SELECT * FROM " + tableName + " WHERE Z > 14 AND Z < 18";
String SELECT_ALL_QUERY = "SELECT * FROM " + tableName;

long count1 = countBlockBytesScannedFromSql(stmt, RANGE_SCAN_QUERY);
Assert.assertTrue(count1 > 0);

long count2 = countBlockBytesScannedFromSql(stmt, SERVER_FILTER_QUERY);
Assert.assertTrue(count2 > 0);
// the WHERE clause filters on a non-PK column, so all rows must be scanned
Assert.assertTrue(count2 > count1);

long count3 = countBlockBytesScannedFromSql(stmt, SELECT_ALL_QUERY);
Assert.assertTrue(count3 > 0);
// should be same as previous query which also scans all rows
Assert.assertEquals(count3, count2);
}

private void clearBlockCache(String tableName) {
HRegionServer regionServer = utility.getMiniHBaseCluster().getRegionServer(0);
for (HRegion region : regionServer.getRegions(TableName.valueOf(tableName))) {
regionServer.clearRegionBlockCache(region);
}
}

private void flush(String tableName) throws IOException {
HRegionServer regionServer = utility.getMiniHBaseCluster().getRegionServer(0);
for (HRegion region : regionServer.getRegions(TableName.valueOf(tableName))) {
region.flush(true);
}
}


private long countBlockBytesScannedFromSql(Statement stmt, String sql) throws SQLException {
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
// loop to the end
}
return getBlockBytesScanned(rs);
}

private long getBlockBytesScanned(ResultSet rs) throws SQLException {
if (!(rs instanceof PhoenixResultSet)) {
return -1;
}
Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);

long sum = 0;
boolean valid = false;
for (Map.Entry<String, Map<MetricType, Long>> entry : metrics.entrySet()) {
Long val = entry.getValue().get(MetricType.COUNT_BLOCK_BYTES_SCANNED);
if (val != null) {
sum += val.longValue();
valid = true;
}
}
if (valid) {
return sum;
} else {
return -1;
}
}
}