Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connector/flink] Report pendingRecords. #157

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@
import com.alibaba.fluss.metrics.CharacterFilter;
import com.alibaba.fluss.metrics.Counter;
import com.alibaba.fluss.metrics.DescriptiveStatisticsHistogram;
import com.alibaba.fluss.metrics.Gauge;
import com.alibaba.fluss.metrics.Histogram;
import com.alibaba.fluss.metrics.MeterView;
import com.alibaba.fluss.metrics.Metric;
import com.alibaba.fluss.metrics.MetricNames;
import com.alibaba.fluss.metrics.ThreadSafeSimpleCounter;
import com.alibaba.fluss.metrics.groups.AbstractMetricGroup;
import com.alibaba.fluss.metrics.groups.GenericMetricGroup;
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;

import java.util.HashMap;
import java.util.Map;

import static com.alibaba.fluss.metrics.utils.MetricGroupUtils.makeScope;
Expand All @@ -55,6 +59,7 @@ public class ScannerMetricGroup extends AbstractMetricGroup {
private volatile double pollIdleRatio;
private volatile long lastPollMs;
private volatile long pollStartMs;
private final Map<Integer, GenericMetricGroup> buckets = new HashMap<>();

public ScannerMetricGroup(ClientMetricGroup parent, TablePath tablePath) {
super(parent.getMetricRegistry(), makeScope(parent, name), parent);
Expand Down Expand Up @@ -120,6 +125,19 @@ private long lastPollSecondsAgo() {
return (System.currentTimeMillis() - lastPollMs) / 1000;
}

public void recordBucketLag(int bucketId, long lag) {
buckets.computeIfAbsent(
bucketId,
(bucket) -> new GenericMetricGroup(registry, this, String.valueOf(bucketId)));
Metric metric = buckets.get(bucketId).metrics().get(bucketRecordsLagMetricName(bucketId));
if (metric == null) {
SimpleGauge simpleGauge = new SimpleGauge(lag);
buckets.get(bucketId).gauge(bucketRecordsLagMetricName(bucketId), simpleGauge);
} else {
((SimpleGauge) metric).setValue(lag);
}
}

@Override
protected String getGroupName(CharacterFilter filter) {
return name;
Expand All @@ -130,4 +148,32 @@ protected final void putVariables(Map<String, String> variables) {
variables.put("database", tablePath.getDatabaseName());
variables.put("table", tablePath.getTableName());
}

private static String bucketRecordsLagMetricName(int bucketId) {
return bucketId + ".records-lag";
}

@Override
public Map<String, Metric> metrics() {
Map<String, Metric> allMetric = new HashMap<>(super.metrics());
buckets.forEach((key, value) -> allMetric.putAll(value.metrics()));
return allMetric;
}

private static class SimpleGauge implements Gauge<Long> {
private long value;

public SimpleGauge(long value) {
this.value = value;
}

@Override
public Long getValue() {
return value;
}

public void setValue(long value) {
this.value = value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.metrics.Metric;
import com.alibaba.fluss.rpc.RpcClient;
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
import com.alibaba.fluss.types.RowType;
Expand Down Expand Up @@ -304,4 +305,9 @@ public void close() {
release();
}
}

@Override
public Map<String, Metric> metrics() {
return Collections.unmodifiableMap(scannerMetricGroup.metrics());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ private synchronized void handleFetchLogResponse(
projection);
logFetchBuffer.add(completedFetch);
}
scannerMetricGroup.recordBucketLag(
tb.getBucket(),
fetchResultForBucket.getHighWatermark() - fetchOffset);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package com.alibaba.fluss.client.scanner.log;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.metrics.Metric;

import java.time.Duration;
import java.util.Map;

/**
* The scanner is used to scan log data of specify table from Fluss.
Expand Down Expand Up @@ -120,4 +122,7 @@ default void subscribeFromBeginning(long partitionId, int bucket) {
* #poll(Duration timeout)}.
*/
void wakeup();

/** Get the metrics of the log scanner. */
public Map<String, ? extends Metric> metrics();
}
Original file line number Diff line number Diff line change
Expand Up @@ -371,4 +371,8 @@ protected enum ChildType {
VALUE,
GENERIC
}

public Map<String, Metric> metrics() {
return this.metrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@

import com.alibaba.fluss.connector.flink.source.reader.FlinkSourceReader;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metrics.Gauge;
import com.alibaba.fluss.metrics.Metric;

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/**
* A collection class for handling metrics in {@link FlinkSourceReader} of Fluss.
Expand Down Expand Up @@ -64,9 +70,14 @@ public class FlinkSourceReaderMetrics {
// Map for tracking current consuming offsets
private final Map<TableBucket, Long> offsets = new HashMap<>();

// Map for tracking records lag of tableBucket
@Nullable private Map<TableBucket, Metric> recordsLagMetrics;

// For currentFetchEventTimeLag metric
private volatile long currentFetchEventTimeLag = UNINITIALIZED;

public static final String RECORDS_LAG = ".records-lag";

public FlinkSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {
this.sourceReaderMetricGroup = sourceReaderMetricGroup;
this.flussSourceReaderMetricGroup =
Expand Down Expand Up @@ -125,4 +136,51 @@ private void checkTableBucketTracked(TableBucket tableBucket) {
public SourceReaderMetricGroup getSourceReaderMetricGroup() {
return sourceReaderMetricGroup;
}

public void maybeAddRecordsLagMetric(Map<String, ? extends Metric> metrics, TableBucket tb) {
// Lazily register pendingRecords
if (recordsLagMetrics == null) {
this.recordsLagMetrics = new ConcurrentHashMap<>();
this.sourceReaderMetricGroup.setPendingRecordsGauge(
() -> {
long pendingRecordsTotal = 0;
for (Metric recordsLagMetric : this.recordsLagMetrics.values()) {
pendingRecordsTotal +=
Long.parseLong(
((Gauge<?>) recordsLagMetric).getValue().toString());
}
return pendingRecordsTotal;
});
}
recordsLagMetrics.computeIfAbsent(tb, (ignored) -> getRecordsLagMetric(metrics, tb));
}

private @Nullable Metric getRecordsLagMetric(
Map<String, ? extends Metric> metrics, TableBucket tb) {
try {
int bucket = tb.getBucket();
Predicate<Map.Entry<String, ? extends Metric>> filter =
entry -> {
final String metricName = entry.getKey();
return metricName.equals(bucket + RECORDS_LAG);
};
return metrics.entrySet().stream()
.filter(filter)
.map(Map.Entry::getValue)
.findFirst()
.orElseThrow(
() ->
new IllegalStateException(
"Cannot find fluss metric matching current filter."));
} catch (IllegalStateException e) {
LOG.warn(
String.format(
"Error when getting fluss log scanner metric \"%s\" "
+ "for bucket \"%s\". "
+ "Metric \"%s\" may not be reported correctly. ",
RECORDS_LAG, tb, MetricNames.PENDING_RECORDS),
e);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ private FlinkRecordsWithSplitIds forLogRecords(ScanRecords scanRecords) {
}
}
splitRecords.put(splitId, toRecordAndPos(bucketScanRecords.iterator()));
flinkSourceReaderMetrics.maybeAddRecordsLagMetric(logScanner.metrics(), scanBucket);
}
Iterator<TableBucket> buckets = tableScanBuckets.iterator();
Iterator<String> splitIterator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.table.api.ValidationException;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -207,6 +208,81 @@ void testHandleLogSplitChangesAndFetch() throws Exception {
}
}

@Test
void testPendingRecords() throws Exception {
final Schema schema =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.build();

final TableDescriptor tableDescriptor =
TableDescriptor.builder().schema(schema).distributedBy(1).build();
TablePath tablePath1 = TablePath.of(DEFAULT_DB, "test-only-log-table");
long tableId = createTable(tablePath1, tableDescriptor);
MetricListener metricListener = new MetricListener();
FlinkSourceReaderMetrics flinkSourceReaderMetrics =
new FlinkSourceReaderMetrics(
InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));

try (FlinkSourceSplitReader splitReader =
createSplitReader(tablePath1, schema.toRowType(), flinkSourceReaderMetrics)) {

// no any records
List<SourceSplitBase> logSplits = new ArrayList<>();
Map<String, List<RecordAndPos>> expectedRecords = new HashMap<>();
assignSplitsAndFetchUntilRetrieveRecords(
splitReader, logSplits, expectedRecords, schema.toRowType());

assertThat(metricListener.getGauge(MetricNames.PENDING_RECORDS)).isNotPresent();

int rowCnt = 600;
// now, write some records into the table
List<InternalRow> internalRows = appendRows(tablePath1, rowCnt);
List<RecordAndPos> expected = new ArrayList<>(internalRows.size());
for (int i = 0; i < internalRows.size(); i++) {
expected.add(
new RecordAndPos(
new ScanRecord(i, i, RowKind.APPEND_ONLY, internalRows.get(i))));
}

TableBucket tableBucket = new TableBucket(tableId, 0);
String splitId = toLogSplitId(tableBucket);
expectedRecords.put(splitId, expected);

logSplits.add(new LogSplit(tableBucket, null, 0L));
assignSplits(splitReader, logSplits);
Set<String> finishedSplits = new HashSet<>();

int cnt = 0;
while (finishedSplits.size() < logSplits.size()) {

RecordsWithSplitIds<RecordAndPos> recordsBySplitIds = splitReader.fetch();
splitId = recordsBySplitIds.nextSplit();
RecordAndPos record;
if (splitId != null) {
while ((record = recordsBySplitIds.nextRecordFromSplit()) != null) {
cnt++;
}
// because lazy update this metric,client fetch all record in one batch
assertThat(
metricListener
.getGauge(MetricNames.PENDING_RECORDS)
.get()
.getValue())
.isEqualTo(600L);
if (cnt >= rowCnt) {
finishedSplits.add(splitId);
}
}
}
RecordsWithSplitIds<RecordAndPos> recordsBySplitIds = splitReader.fetch();
// client update metric
assertThat(metricListener.getGauge(MetricNames.PENDING_RECORDS).get().getValue())
.isEqualTo(0L);
}
}

@Test
void testHandleMixSnapshotLogSplitChangesAndFetch() throws Exception {
TablePath tablePath = TablePath.of(DEFAULT_DB, "test-mix-snapshot-log-table");
Expand Down Expand Up @@ -343,6 +419,14 @@ private FlinkSourceSplitReader createSplitReader(TablePath tablePath, RowType ro
clientConf, tablePath, rowType, null, createMockSourceReaderMetrics());
}

private FlinkSourceSplitReader createSplitReader(
TablePath tablePath,
RowType rowType,
FlinkSourceReaderMetrics flinkSourceReaderMetrics) {
return new FlinkSourceSplitReader(
clientConf, tablePath, rowType, null, flinkSourceReaderMetrics);
}

private FlinkSourceReaderMetrics createMockSourceReaderMetrics() {
MetricListener metricListener = new MetricListener();
return new FlinkSourceReaderMetrics(
Expand Down