[client] RecordAccumulator's ready method will be blocked if no batch is ready. #322

Open · wants to merge 1 commit into base: main
@@ -45,6 +45,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.GuardedBy;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -56,9 +58,13 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -107,6 +113,11 @@ public final class RecordAccumulator {

private final IdempotenceManager idempotenceManager;

private final ReentrantLock batchReadyLock = new ReentrantLock();

@GuardedBy("lock")
private final Condition batchIsReady = batchReadyLock.newCondition();

// TODO add retryBackoffMs to retry the produce request upon receiving an error.
// TODO add deliveryTimeoutMs to report success or failure on record delivery.
// TODO add nextBatchExpiryTimeMs
@@ -232,6 +243,10 @@ public RecordAppendResult append(
* </pre>
*/
public ReadyCheckResult ready(Cluster cluster) {
return ready(cluster, batchTimeoutMs);
}

public ReadyCheckResult ready(Cluster cluster, long timeoutMs) {

Member: Why do we need to provide this method? Is it only for testing purposes? If so, why not use the batchTimeoutMs parameter in tests?

Set<ServerNode> readyNodes = new HashSet<>();
Set<PhysicalTablePath> unknownLeaderTables = new HashSet<>();
// Go table by table so that we can get queue sizes for buckets in a table and calculate
@@ -244,6 +259,9 @@ public ReadyCheckResult ready(Cluster cluster) {
readyNodes,
unknownLeaderTables,
cluster));
if (readyNodes.isEmpty()) {
waitBatchReady(timeoutMs);
}

// TODO and the earliest time at which any non-send-able bucket will be ready;

@@ -402,59 +420,59 @@ private void bucketReady(
Map<Integer, Deque<WriteBatch>> batches = bucketAndWriteBatches.batches;
// Collect the queue sizes for available buckets to be used in adaptive bucket allocate.

boolean exhausted = writerBufferPool.queued() > 0;
batches.forEach(
(bucketId, deque) -> {
TableBucket tableBucket = cluster.getTableBucket(physicalTablePath, bucketId);
ServerNode leader = cluster.leaderFor(tableBucket);
final long waitedTimeMs;
final int dequeSize;
final boolean full;

// Note: this loop is especially hot with large bucket counts.
// We are careful to only perform the minimum required inside the synchronized
// block, as this lock is also used to synchronize writer threads
// attempting to append() to a bucket/batch.
synchronized (deque) {
// Deque are often empty in this path, esp with large bucket counts,
// so we exit early if we can.
WriteBatch batch = deque.peekFirst();
if (batch == null) {
return;
}

waitedTimeMs = batch.waitedTimeMs(System.currentTimeMillis());
dequeSize = deque.size();
full = dequeSize > 1 || batch.isClosed();
}

if (leader == null) {
// This is a bucket for which leader is not known, but messages are
// available to send. Note that entries are currently not removed from
// batches when deque is empty.
unknownLeaderTables.add(physicalTablePath);
} else {
batchReady(exhausted, leader, waitedTimeMs, full, readyNodes);
batchReady(deque, leader, readyNodes);
}
});
}

private void batchReady(
boolean exhausted,
ServerNode leader,
long waitedTimeMs,
boolean full,
Set<ServerNode> readyNodes) {
Deque<WriteBatch> deque, ServerNode leader, Set<ServerNode> readyNodes) {
if (!readyNodes.contains(leader)) {
// if the wait time larger than lingerMs, we can send this batch even if it is not full.
boolean expired = waitedTimeMs >= (long) batchTimeoutMs;
boolean sendAble = full || expired || exhausted || closed || flushInProgress();
if (sendAble) {
if (batchReady(deque)) {
readyNodes.add(leader);
}
}
}

private boolean batchReady(Deque<WriteBatch> deque) {
final long waitedTimeMs;
final int dequeSize;
final boolean full;

boolean exhausted = writerBufferPool.queued() > 0;
Member: This should be moved out of the batches forEach, as it requires obtaining a lock.
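A minimal sketch of the reviewer's suggestion, reusing the names from this patch (a fragment of RecordAccumulator, not a standalone or final change): compute the buffer-pool state once per ready() pass in bucketReady and pass it down, so writerBufferPool's lock is taken once rather than once per deque.

// In bucketReady(...): take the buffer-pool lock once per ready() pass.
boolean exhausted = writerBufferPool.queued() > 0;
batches.forEach(
        (bucketId, deque) -> {
            TableBucket tableBucket = cluster.getTableBucket(physicalTablePath, bucketId);
            ServerNode leader = cluster.leaderFor(tableBucket);
            if (leader == null) {
                unknownLeaderTables.add(physicalTablePath);
            } else if (!readyNodes.contains(leader) && batchReady(deque, exhausted)) {
                readyNodes.add(leader);
            }
        });

// batchReady(Deque) then takes the flag as a parameter instead of querying the pool per deque
// (notifyBatchReady would keep a one-arg overload, or pass its own flag).
private boolean batchReady(Deque<WriteBatch> deque, boolean exhausted) {
    final long waitedTimeMs;
    final boolean full;
    synchronized (deque) {
        WriteBatch batch = deque.peekFirst();
        if (batch == null) {
            return false;
        }
        waitedTimeMs = batch.waitedTimeMs(System.currentTimeMillis());
        full = deque.size() > 1 || batch.isClosed();
    }
    boolean expired = waitedTimeMs >= (long) batchTimeoutMs;
    return full || expired || exhausted || closed || flushInProgress();
}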

// Note: this loop is especially hot with large bucket counts.
// We are careful to only perform the minimum required inside the synchronized
// block, as this lock is also used to synchronize writer threads
// attempting to append() to a bucket/batch.
synchronized (deque) {
// Deque are often empty in this path, esp with large bucket counts,
// so we exit early if we can.
WriteBatch batch = deque.peekFirst();
if (batch == null) {
return false;
}

waitedTimeMs = batch.waitedTimeMs(System.currentTimeMillis());
dequeSize = deque.size();
full = dequeSize > 1 || batch.isClosed();
}
// if the wait time is larger than batchTimeoutMs (the linger), we can send this batch even if it is not full.
boolean expired = waitedTimeMs >= (long) batchTimeoutMs;
return full || expired || exhausted || closed || flushInProgress();
}

/**
* Are there any threads currently waiting on a flush?
*
@@ -532,6 +550,7 @@ private RecordAppendResult tryAppend(
WriteBatch last = deque.peekLast();
if (last != null) {
boolean success = last.tryAppend(writeRecord, callback);
notifyBatchReady(deque);
Member: Why do we need to notify batch ready here?

Collaborator Author (@loserwang1024, Jan 16, 2025):

@wuchong If we don't notify after each insert, RecordAccumulator#ready will be blocked until the timeout. I use a producer-consumer mode in RecordAccumulator, which blocks in the consumer and is signaled by the producer.

Maybe I should do the same thing as Kafka: accumulator.ready returns the minimum wait time across buckets, and then the sender suspends with Thread.sleep(nextReadyCheckDelayMs) instead of being signaled by the producer.

Member: I mean, why notify for each record rather than only when a batch is ready? It will result in a busy loop where the sender doesn't wait at all.
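One possible shape of what is being asked for, again only a sketch against the methods in this patch rather than the committed fix: signal at the batch boundary (when a batch fills up and is closed), and let the bounded wait in ready() cover batches that become sendable only because batchTimeoutMs elapsed.

WriteBatch last = deque.peekLast();
if (last != null) {
    boolean success = last.tryAppend(writeRecord, callback);
    if (!success) {
        // The running batch is full: close it and wake the sender once, at the batch
        // boundary, instead of signaling after every successful append.
        last.close();
        notifyBatchReady(deque);
    }
    // existing handling of `success` (building the RecordAppendResult) stays unchanged
}

Batches that become sendable only because batchTimeoutMs elapses are still picked up, since waitBatchReady() uses a timed await rather than waiting indefinitely.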

if (!success) {
last.close();
} else {
@@ -760,6 +779,30 @@ private void insertInSequenceOrder(Deque<WriteBatch> deque, WriteBatch batch) {
}
}

private void notifyBatchReady(Deque<WriteBatch> deque) {
inLock(
batchReadyLock,
() -> {
if (batchReady(deque)) {
batchIsReady.signal();
}
});
}

private void waitBatchReady(long timeoutMs) {
try {
inLock(
batchReadyLock,
() -> {
if (timeoutMs > 0) {
Member: This should be checked before acquiring the lock.
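A small sketch of the suggested reordering, so a non-positive timeout never touches batchReadyLock (same inLock/await calls as the patch, just hoisted behind an early return):

private void waitBatchReady(long timeoutMs) {
    if (timeoutMs <= 0) {
        // Nothing to wait for; skip acquiring batchReadyLock entirely.
        return;
    }
    try {
        inLock(batchReadyLock, () -> batchIsReady.await(timeoutMs, TimeUnit.MILLISECONDS));
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}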

batchIsReady.await(timeoutMs, TimeUnit.MILLISECONDS);
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

/** Metadata about a record just appended to the record accumulator. */
public static final class RecordAppendResult {
public final boolean batchIsFull;
@@ -790,7 +833,7 @@ public ReadyCheckResult(
/** Close this accumulator and force all the record buffers to be drained. */
public void close() {
closed = true;

inLock(batchReadyLock, batchIsReady::signalAll);
writerBufferPool.close();
arrowWriterPool.close();
bufferAllocator.close();
@@ -201,13 +201,6 @@ private void sendWriteData() throws Exception {
}

Set<ServerNode> readyNodes = readyCheckResult.readyNodes;
if (readyNodes.isEmpty()) {
// TODO The method sendWriteData is in a busy loop. If there is no data continuously, it
// will cause the CPU to be occupied. Currently, we just sleep 1 second to avoid this.
// In the future, we need to introduce delay logic to deal with it.
Thread.sleep(1);
}
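For comparison, the Kafka-style alternative mentioned in the review discussion keeps ready() non-blocking and has the sender wait on a returned delay. This is only a hypothetical shape: it assumes ReadyCheckResult carries a nextReadyCheckDelayMs field, which neither the current code nor this patch provides.

if (readyNodes.isEmpty()) {
    // Sleep only as long as the earliest non-sendable batch needs before re-checking,
    // instead of spinning with Thread.sleep(1) or blocking inside the accumulator.
    Thread.sleep(readyCheckResult.nextReadyCheckDelayMs); // hypothetical field
    return;
}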

// get the list of batches prepare to send.
Map<Integer, List<WriteBatch>> batches =
accumulator.drain(metadataUpdater.getCluster(), readyNodes, maxRequestSize);
@@ -58,6 +58,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
@@ -164,7 +165,7 @@ void testFull() throws Exception {
WriteBatch batch = writeBatches.peekFirst();
assertThat(batch.isClosed()).isFalse();
// No buckets should be ready.
assertThat(accum.ready(cluster).readyNodes.size()).isEqualTo(0);
assertThat(accum.ready(cluster, 10).readyNodes.size()).isEqualTo(0);
}

// this appends doesn't fit in the first batch, so a new batch is created and the first
@@ -175,7 +176,7 @@ void testFull() throws Exception {
Iterator<WriteBatch> bucketBatchesIterator = writeBatches.iterator();
assertThat(bucketBatchesIterator.next().isClosed()).isTrue();
// Bucket's leader should be ready.
assertThat(accum.ready(cluster).readyNodes).isEqualTo(Collections.singleton(node1));
assertThat(accum.ready(cluster, 10).readyNodes).isEqualTo(Collections.singleton(node1));

List<WriteBatch> batches =
accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE)
@@ -210,7 +211,7 @@ void testAppendLarge() throws Exception {
// row size > 10;
accum.append(createRecord(row1), writeCallback, cluster, 0, false);
// bucket's leader should be ready for bucket0.
assertThat(accum.ready(cluster).readyNodes).isEqualTo(Collections.singleton(node1));
assertThat(accum.ready(cluster, 10).readyNodes).isEqualTo(Collections.singleton(node1));

Deque<WriteBatch> writeBatches = accum.getDeque(DATA1_PHYSICAL_TABLE_PATH, tb1);
assertThat(writeBatches).hasSize(1);
@@ -265,7 +266,7 @@ void testAppendWithStickyBucketAssigner() throws Exception {
// We only appended if we do not retry.
if (!switchBucket) {
appends++;
assertThat(accum.ready(cluster).readyNodes.size()).isEqualTo(0);
assertThat(accum.ready(cluster, 10).readyNodes.size()).isEqualTo(0);
}
}

@@ -313,7 +314,7 @@ void testPartialDrain() throws Exception {
}
}

assertThat(accum.ready(cluster).readyNodes).isEqualTo(Collections.singleton(node1));
assertThat(accum.ready(cluster, 10).readyNodes).isEqualTo(Collections.singleton(node1));
List<WriteBatch> batches =
accum.drain(cluster, Collections.singleton(node1), 1024).get(node1.id());
// Due to size bound only one bucket should have been retrieved.
@@ -330,12 +331,12 @@ void testFlush() throws Exception {
assertThat(accum.hasIncomplete()).isTrue();
}

assertThat(accum.ready(cluster).readyNodes.size()).isEqualTo(0);
assertThat(accum.ready(cluster, 10).readyNodes.size()).isEqualTo(0);

accum.beginFlush();
// drain and deallocate all batches.
Map<Integer, List<WriteBatch>> results =
accum.drain(cluster, accum.ready(cluster).readyNodes, Integer.MAX_VALUE);
accum.drain(cluster, accum.ready(cluster, 10).readyNodes, Integer.MAX_VALUE);
assertThat(accum.hasIncomplete()).isTrue();

for (List<WriteBatch> batches : results.values()) {
@@ -363,7 +364,7 @@ void testTableWithUnknownLeader() throws Exception {
cluster = updateCluster(Collections.singletonList(bucket1));

accum.append(createRecord(row), writeCallback, cluster, 0, false);
RecordAccumulator.ReadyCheckResult readyCheckResult = accum.ready(cluster);
RecordAccumulator.ReadyCheckResult readyCheckResult = accum.ready(cluster, 10);
assertThat(readyCheckResult.unknownLeaderTables)
.isEqualTo(Collections.singleton(DATA1_PHYSICAL_TABLE_PATH));
assertThat(readyCheckResult.readyNodes.size()).isEqualTo(0);
@@ -374,7 +375,7 @@ void testTableWithUnknownLeader() throws Exception {
// update the bucket info with leader.
cluster = updateCluster(Collections.singletonList(bucket1));

readyCheckResult = accum.ready(cluster);
readyCheckResult = accum.ready(cluster, 10);
assertThat(readyCheckResult.unknownLeaderTables).isEmpty();
assertThat(readyCheckResult.readyNodes.size()).isEqualTo(1);
}
@@ -391,6 +392,41 @@ void testAwaitFlushComplete() throws Exception {
assertThatThrownBy(accum::awaitFlushCompletion).isInstanceOf(InterruptedException.class);
}

@Test
void testReadyWithTimeOut() throws Exception {
Member: What's the purpose of this test? Do we need to check the accumulator.ready result?

Maybe we can take Kafka's RecordAccumulatorTest#testNextReadyCheckDelay as an example.

// test case: node1(tb1, tb2), node2(tb3).
IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {1, "a"});
long batchSize = getTestBatchSize(row);
RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, batchSize * 2);

// add bucket into cluster.
cluster = updateCluster(Arrays.asList(bucket1, bucket2, bucket3, bucket4));

// wait for ready timeout.
CompletableFuture<Void> future =
CompletableFuture.runAsync(
() -> {
accum.ready(cluster, 500L);
});
assertThat(future).isNotCompleted();
Thread.sleep(1000L);
Member: Use future.get() or future.join() to wait for the async result. Never use sleep, as the work is not guaranteed to finish within the sleep time in different testing environments, which makes the tests unstable.
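A sketch of the suggested rewrite using bounded waits on the futures instead of sleeps; the 10-second bounds are arbitrary, and java.util.concurrent.TimeUnit would need to be imported in this test.

// Timeout path: ready(cluster, 500L) should unblock on its own; bound the wait instead
// of sleeping for a fixed time.
CompletableFuture<Void> timeoutFuture =
        CompletableFuture.runAsync(() -> accum.ready(cluster, 500L));
timeoutFuture.get(10, TimeUnit.SECONDS);

// Wake-up path: append data and wait on the future with a bound. Note that some extra
// coordination is still needed to guarantee ready() is already awaiting before the
// append signals, otherwise the wake-up can be lost; future.get only bounds the wait.
CompletableFuture<Void> wakeUpFuture =
        CompletableFuture.runAsync(() -> accum.ready(cluster, 60 * 60 * 1000L));
accum.append(createRecord(row), writeCallback, cluster, 0, false);
accum.append(createRecord(row), writeCallback, cluster, 0, false);
wakeUpFuture.get(10, TimeUnit.SECONDS);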

assertThat(future).isCompleted();

// wait for ready success.
future =
CompletableFuture.runAsync(
() -> {
accum.ready(cluster, 60 * 60 * 1000L);
});
Thread.sleep(1000L);
assertThat(future).isNotCompleted();
// initial data.
accum.append(createRecord(row), writeCallback, cluster, 0, false);
accum.append(createRecord(row), writeCallback, cluster, 0, false);
Thread.sleep(100L);
assertThat(future).isCompleted();
}

private WriteRecord createRecord(InternalRow row) {
return new WriteRecord(DATA1_PHYSICAL_TABLE_PATH, WriteKind.APPEND, row, null);
}
@@ -33,8 +33,11 @@
import com.alibaba.fluss.rpc.protocol.Errors;
import com.alibaba.fluss.server.tablet.TestTabletServerGateway;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
@@ -61,6 +64,7 @@ final class SenderTest {
private static final int REQUEST_TIMEOUT = 5000;
private static final short ACKS_ALL = -1;
private static final int MAX_INFLIGHT_REQUEST_PER_BUCKET = 5;
private static final Logger log = LoggerFactory.getLogger(SenderTest.class);

private final TableBucket tb1 = new TableBucket(DATA1_TABLE_ID, 0);
private TestingMetadataUpdater metadataUpdater;
@@ -77,6 +81,11 @@ public void setup() {
sender = setupWithIdempotenceState();
}

@AfterEach
public void teardown() throws Exception {
sender.forceClose();
}

@Test
void testSimple() throws Exception {
long offset = 0;
@@ -240,15 +249,17 @@ void testIdempotenceWithMaxInflightBatch() throws Exception {
sender1.runOnce();
assertThat(idempotenceManager.inflightBatchSize(tb1))
.isEqualTo(MAX_INFLIGHT_REQUEST_PER_BUCKET);
assertThat(accumulator.ready(metadataUpdater.getCluster()).readyNodes.size()).isEqualTo(1);
assertThat(accumulator.ready(metadataUpdater.getCluster(), 10).readyNodes.size())
.isEqualTo(1);

// finish the first batch, the latest batch will be drained from the accumulator.
finishIdempotentProduceLogRequest(0, tb1, 0, createProduceLogResponse(tb1, 0L, 1L));
sender1.runOnce(); // receive response 0.
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isEqualTo(Optional.of(0));
assertThat(idempotenceManager.inflightBatchSize(tb1))
.isEqualTo(MAX_INFLIGHT_REQUEST_PER_BUCKET);
assertThat(accumulator.ready(metadataUpdater.getCluster()).readyNodes.size()).isEqualTo(0);
assertThat(accumulator.ready(metadataUpdater.getCluster(), 10).readyNodes.size())
.isEqualTo(0);
}

@Test