Skip to content

Commit

Permalink
[connector/flink] Report pendingRecords.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alibaba-HZY committed Dec 10, 2024
1 parent 72f1e0c commit a226cd6
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@
import com.alibaba.fluss.metrics.CharacterFilter;
import com.alibaba.fluss.metrics.Counter;
import com.alibaba.fluss.metrics.DescriptiveStatisticsHistogram;
import com.alibaba.fluss.metrics.Gauge;
import com.alibaba.fluss.metrics.Histogram;
import com.alibaba.fluss.metrics.MeterView;
import com.alibaba.fluss.metrics.Metric;
import com.alibaba.fluss.metrics.MetricNames;
import com.alibaba.fluss.metrics.ThreadSafeSimpleCounter;
import com.alibaba.fluss.metrics.groups.AbstractMetricGroup;
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
import com.alibaba.fluss.server.metrics.group.BucketMetricGroup;

import java.util.HashMap;
import java.util.Map;

import static com.alibaba.fluss.metrics.utils.MetricGroupUtils.makeScope;
Expand All @@ -55,6 +59,7 @@ public class ScannerMetricGroup extends AbstractMetricGroup {
private volatile double pollIdleRatio;
private volatile long lastPollMs;
private volatile long pollStartMs;
private final Map<Integer, BucketMetricGroup> buckets = new HashMap<>();

public ScannerMetricGroup(ClientMetricGroup parent, TablePath tablePath) {
super(parent.getMetricRegistry(), makeScope(parent, name), parent);
Expand Down Expand Up @@ -120,6 +125,18 @@ private long lastPollSecondsAgo() {
return (System.currentTimeMillis() - lastPollMs) / 1000;
}

public void recordBucketLag(int bucketId, long lag) {
buckets.computeIfAbsent(
bucketId, (bucket) -> new BucketMetricGroup(registry, bucketId, this));
Metric metric = buckets.get(bucketId).metrics().get(bucketRecordsLagMetricName(bucketId));
if (metric == null) {
SimpleGauge simpleGauge = new SimpleGauge(lag);
buckets.get(bucketId).gauge(bucketRecordsLagMetricName(bucketId), simpleGauge);
} else {
((SimpleGauge) metric).setValue(lag);
}
}

@Override
protected String getGroupName(CharacterFilter filter) {
return name;
Expand All @@ -130,4 +147,32 @@ protected final void putVariables(Map<String, String> variables) {
variables.put("database", tablePath.getDatabaseName());
variables.put("table", tablePath.getTableName());
}

private static String bucketRecordsLagMetricName(int bucketId) {
return bucketId + ".records-lag";
}

@Override
public Map<String, Metric> metrics() {
Map<String, Metric> allMetric = new HashMap<>(super.metrics());
buckets.forEach((key, value) -> allMetric.putAll(value.metrics()));
return allMetric;
}

private static class SimpleGauge implements Gauge<Long> {
private long value;

public SimpleGauge(long value) {
this.value = value;
}

@Override
public Long getValue() {
return value;
}

public void setValue(long value) {
this.value = value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.metrics.Metric;
import com.alibaba.fluss.rpc.RpcClient;
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
import com.alibaba.fluss.types.RowType;
Expand Down Expand Up @@ -304,4 +305,9 @@ public void close() {
release();
}
}

@Override
public Map<String, Metric> metrics() {
return Collections.unmodifiableMap(scannerMetricGroup.metrics());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ private synchronized void handleFetchLogResponse(
projection);
logFetchBuffer.add(completedFetch);
}
scannerMetricGroup.recordBucketLag(
tb.getBucket(),
fetchResultForBucket.getHighWatermark() - fetchOffset);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package com.alibaba.fluss.client.scanner.log;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.metrics.Metric;

import java.time.Duration;
import java.util.Map;

/**
* The scanner is used to scan log data of specify table from Fluss.
Expand Down Expand Up @@ -120,4 +122,7 @@ default void subscribeFromBeginning(long partitionId, int bucket) {
* #poll(Duration timeout)}.
*/
void wakeup();

/** Get the metrics of the log scanner. */
public Map<String, ? extends Metric> metrics();
}
Original file line number Diff line number Diff line change
Expand Up @@ -371,4 +371,8 @@ protected enum ChildType {
VALUE,
GENERIC
}

public Map<String, Metric> metrics() {
return this.metrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@

import com.alibaba.fluss.connector.flink.source.reader.FlinkSourceReader;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metrics.Gauge;
import com.alibaba.fluss.metrics.Metric;

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/**
* A collection class for handling metrics in {@link FlinkSourceReader} of Fluss.
Expand Down Expand Up @@ -64,9 +70,14 @@ public class FlinkSourceReaderMetrics {
// Map for tracking current consuming offsets
private final Map<TableBucket, Long> offsets = new HashMap<>();

// Map for tracking records lag of tableBucket
@Nullable private Map<TableBucket, Metric> recordsLagMetrics;

// For currentFetchEventTimeLag metric
private volatile long currentFetchEventTimeLag = UNINITIALIZED;

public static final String RECORDS_LAG = ".records-lag";

public FlinkSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {
this.sourceReaderMetricGroup = sourceReaderMetricGroup;
this.flussSourceReaderMetricGroup =
Expand Down Expand Up @@ -125,4 +136,51 @@ private void checkTableBucketTracked(TableBucket tableBucket) {
public SourceReaderMetricGroup getSourceReaderMetricGroup() {
return sourceReaderMetricGroup;
}

public void maybeAddRecordsLagMetric(Map<String, ? extends Metric> metrics, TableBucket tb) {
// Lazily register pendingRecords
if (recordsLagMetrics == null) {
this.recordsLagMetrics = new ConcurrentHashMap<>();
this.sourceReaderMetricGroup.setPendingRecordsGauge(
() -> {
long pendingRecordsTotal = 0;
for (Metric recordsLagMetric : this.recordsLagMetrics.values()) {
pendingRecordsTotal +=
Long.parseLong(
((Gauge<?>) recordsLagMetric).getValue().toString());
}
return pendingRecordsTotal;
});
}
recordsLagMetrics.computeIfAbsent(tb, (ignored) -> getRecordsLagMetric(metrics, tb));
}

private @Nullable Metric getRecordsLagMetric(
Map<String, ? extends Metric> metrics, TableBucket tb) {
try {
int bucket = tb.getBucket();
Predicate<Map.Entry<String, ? extends Metric>> filter =
entry -> {
final String metricName = entry.getKey();
return metricName.equals(bucket + RECORDS_LAG);
};
return metrics.entrySet().stream()
.filter(filter)
.map(Map.Entry::getValue)
.findFirst()
.orElseThrow(
() ->
new IllegalStateException(
"Cannot find fluss metric matching current filter."));
} catch (IllegalStateException e) {
LOG.warn(
String.format(
"Error when getting fluss log scanner metric \"%s\" "
+ "for bucket \"%s\". "
+ "Metric \"%s\" may not be reported correctly. ",
RECORDS_LAG, tb, MetricNames.PENDING_RECORDS),
e);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ private FlinkRecordsWithSplitIds forLogRecords(ScanRecords scanRecords) {
}
}
splitRecords.put(splitId, toRecordAndPos(bucketScanRecords.iterator()));
flinkSourceReaderMetrics.maybeAddRecordsLagMetric(logScanner.metrics(), scanBucket);
}
Iterator<TableBucket> buckets = tableScanBuckets.iterator();
Iterator<String> splitIterator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.table.api.ValidationException;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -207,6 +208,81 @@ void testHandleLogSplitChangesAndFetch() throws Exception {
}
}

@Test
void testPendingRecords() throws Exception {
final Schema schema =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.build();

final TableDescriptor tableDescriptor =
TableDescriptor.builder().schema(schema).distributedBy(1).build();
TablePath tablePath1 = TablePath.of(DEFAULT_DB, "test-only-log-table");
long tableId = createTable(tablePath1, tableDescriptor);
MetricListener metricListener = new MetricListener();
FlinkSourceReaderMetrics flinkSourceReaderMetrics =
new FlinkSourceReaderMetrics(
InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));

try (FlinkSourceSplitReader splitReader =
createSplitReader(tablePath1, schema.toRowType(), flinkSourceReaderMetrics)) {

// no any records
List<SourceSplitBase> logSplits = new ArrayList<>();
Map<String, List<RecordAndPos>> expectedRecords = new HashMap<>();
assignSplitsAndFetchUntilRetrieveRecords(
splitReader, logSplits, expectedRecords, schema.toRowType());

assertThat(metricListener.getGauge(MetricNames.PENDING_RECORDS)).isNotPresent();

int rowCnt = 600;
// now, write some records into the table
List<InternalRow> internalRows = appendRows(tablePath1, rowCnt);
List<RecordAndPos> expected = new ArrayList<>(internalRows.size());
for (int i = 0; i < internalRows.size(); i++) {
expected.add(
new RecordAndPos(
new ScanRecord(i, i, RowKind.APPEND_ONLY, internalRows.get(i))));
}

TableBucket tableBucket = new TableBucket(tableId, 0);
String splitId = toLogSplitId(tableBucket);
expectedRecords.put(splitId, expected);

logSplits.add(new LogSplit(tableBucket, null, 0L));
assignSplits(splitReader, logSplits);
Set<String> finishedSplits = new HashSet<>();

int cnt = 0;
while (finishedSplits.size() < logSplits.size()) {

RecordsWithSplitIds<RecordAndPos> recordsBySplitIds = splitReader.fetch();
splitId = recordsBySplitIds.nextSplit();
RecordAndPos record;
if (splitId != null) {
while ((record = recordsBySplitIds.nextRecordFromSplit()) != null) {
cnt++;
}
// because lazy update this metric,client fetch all record in one batch
assertThat(
metricListener
.getGauge(MetricNames.PENDING_RECORDS)
.get()
.getValue())
.isEqualTo(600L);
if (cnt >= rowCnt) {
finishedSplits.add(splitId);
}
}
}
RecordsWithSplitIds<RecordAndPos> recordsBySplitIds = splitReader.fetch();
// client update metric
assertThat(metricListener.getGauge(MetricNames.PENDING_RECORDS).get().getValue())
.isEqualTo(0L);
}
}

@Test
void testHandleMixSnapshotLogSplitChangesAndFetch() throws Exception {
TablePath tablePath = TablePath.of(DEFAULT_DB, "test-mix-snapshot-log-table");
Expand Down Expand Up @@ -343,6 +419,14 @@ private FlinkSourceSplitReader createSplitReader(TablePath tablePath, RowType ro
clientConf, tablePath, rowType, null, createMockSourceReaderMetrics());
}

private FlinkSourceSplitReader createSplitReader(
TablePath tablePath,
RowType rowType,
FlinkSourceReaderMetrics flinkSourceReaderMetrics) {
return new FlinkSourceSplitReader(
clientConf, tablePath, rowType, null, flinkSourceReaderMetrics);
}

private FlinkSourceReaderMetrics createMockSourceReaderMetrics() {
MetricListener metricListener = new MetricListener();
return new FlinkSourceReaderMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class BucketMetricGroup extends AbstractMetricGroup {

private final int bucket;

public BucketMetricGroup(MetricRegistry registry, int bucket, PhysicalTableMetricGroup parent) {
public BucketMetricGroup(MetricRegistry registry, int bucket, AbstractMetricGroup parent) {
super(registry, makeScope(parent, String.valueOf(bucket)), parent);
this.bucket = bucket;
}
Expand Down

0 comments on commit a226cd6

Please sign in to comment.