[client] RecordAccumulator's ready method will be blocked if no batch is ready. #322

Open: wants to merge 1 commit into main


loserwang1024 (Collaborator)

Purpose

Linked issue: close #306

Tests

com.alibaba.fluss.client.write.RecordAccumulatorTest#testReadyWithTimeOut

@loserwang1024 (Collaborator, Author)

CC @wuchong @swuferhong @luoyuxia

@loserwang1024 (Collaborator, Author)

rebase

accum.ready(cluster, 500L);
});
assertThat(future).isNotCompleted();
Thread.sleep(1000L);
Member

Use future.get() or future.join() to wait for the async result. Never use sleep: it does not guarantee that the operation has finished within the sleep time in different testing environments, and it makes the tests unstable.
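A plain-JDK sketch of the pattern suggested above: assert that the blocking call is still pending, satisfy its condition, then wait on the future itself with a bounded get() instead of sleeping for a fixed period. The class name, blockingCall, and the CountDownLatch are hypothetical stand-ins for accum.ready(...) and whatever makes a batch ready in the real test; this is not the Fluss test code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class AwaitInsteadOfSleep {
    // Hypothetical stand-in for a call that blocks until some condition holds
    // (plays the role of accum.ready(...) in the test under review).
    static String blockingCall(CountDownLatch readySignal) {
        try {
            readySignal.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "ready";
    }

    static String runDemo() throws Exception {
        CountDownLatch readySignal = new CountDownLatch(1);
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> blockingCall(readySignal));

        // The latch has not been released, so the call cannot have completed yet.
        if (future.isDone()) {
            throw new AssertionError("call returned before it was signaled");
        }

        readySignal.countDown(); // satisfy the condition the call is waiting for

        // Wait for the result itself, with a generous upper bound, rather than
        // sleeping and hoping the call has finished in time.
        return future.get(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo()); // prints "ready"
    }
}
```

The 10-second bound only caps how long a broken test can hang; a passing run returns as soon as the future completes.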

@@ -391,6 +392,41 @@ void testAwaitFlushComplete() throws Exception {
assertThatThrownBy(accum::awaitFlushCompletion).isInstanceOf(InterruptedException.class);
}

@Test
void testReadyWithTimeOut() throws Exception {
Member

What's the purpose of this test? Do we need to check the accumulator.ready result?

Maybe we can use Kafka's RecordAccumulatorTest#testNextReadyCheckDelay as a reference.

return ready(cluster, batchTimeoutMs);
}

public ReadyCheckResult ready(Cluster cluster, long timeoutMs) {
Member

Why do we need to provide this method? Is it only for testing? If so, why not use the batchTimeoutMs parameter in the tests?

final int dequeSize;
final boolean full;

boolean exhausted = writerBufferPool.queued() > 0;
Member

This should be moved out of the loop over batches, as it requires obtaining a lock.
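The point above, sketched under assumptions: if the buffer-pool query takes a lock, evaluate it once per ready() call instead of once per batch. BufferPool, countReady, the lockAcquisitions counter, and the simplified readiness rule (a batch is ready if it is full or if writers are waiting for buffer memory) are hypothetical; only the idea of querying writerBufferPool.queued() comes from the diff.

```java
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class HoistLockedCheck {
    // Hypothetical stand-in for the writer buffer pool: queued() takes a lock,
    // so each call has a synchronization cost.
    static final class BufferPool {
        private final ReentrantLock lock = new ReentrantLock();
        private int queuedWaiters;
        int lockAcquisitions; // demo counter only

        void setQueuedWaiters(int n) {
            lock.lock();
            try {
                queuedWaiters = n;
            } finally {
                lock.unlock();
            }
        }

        int queued() {
            lock.lock();
            try {
                lockAcquisitions++;
                return queuedWaiters;
            } finally {
                lock.unlock();
            }
        }
    }

    // Simplified readiness rule (an assumption for this sketch).
    static int countReady(BufferPool pool, List<Boolean> batchFull) {
        // Evaluated once per call, outside the per-batch loop.
        boolean exhausted = pool.queued() > 0;
        int ready = 0;
        for (boolean full : batchFull) {
            if (full || exhausted) {
                ready++;
            }
        }
        return ready;
    }
}
```

However many batches are scanned, the pool lock is taken once per call.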

inLock(
batchReadyLock,
() -> {
if (timeoutMs > 0) {
Member

This should be checked before acquiring the lock.
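A minimal sketch of validating the timeout before acquiring the lock, so a caller that does not intend to wait never contends with producers for it. The class and method names are hypothetical; batchReadyLock mirrors the name in the diff, and using a Condition for the wait is an assumption about the implementation.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimeoutCheckBeforeLock {
    private final ReentrantLock batchReadyLock = new ReentrantLock();
    private final Condition batchReady = batchReadyLock.newCondition();
    int lockAcquisitions; // demo counter only

    // Returns true if woken before the timeout (possibly spuriously), false otherwise.
    boolean awaitBatchReady(long timeoutMs) throws InterruptedException {
        // Cheap argument check first: a non-positive timeout never needs the lock.
        if (timeoutMs <= 0) {
            return false;
        }
        batchReadyLock.lock();
        try {
            lockAcquisitions++;
            return batchReady.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            batchReadyLock.unlock();
        }
    }
}
```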

@@ -532,6 +550,7 @@ private RecordAppendResult tryAppend(
WriteBatch last = deque.peekLast();
if (last != null) {
boolean success = last.tryAppend(writeRecord, callback);
notifyBatchReady(deque);
Member

Why do we need to notify that a batch is ready here?

@loserwang1024 (Collaborator, Author), Jan 16, 2025

@wuchong If we don't notify after each insert, RecordAccumulator#ready will block until the timeout expires. I use a producer-consumer pattern in RecordAccumulator: the consumer blocks, and the producer signals it.
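The producer-consumer scheme described in this reply can be sketched with a ReentrantLock and a Condition: the sender (consumer) waits with a deadline, and the appending thread (producer) signals. This is an illustrative sketch, not the PR's code; ReadySignal, markBatchReady, awaitReady, and the readyBatches counter are hypothetical stand-ins for the real batch state.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ReadySignal {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition batchReady = lock.newCondition();
    private int readyBatches; // guarded by lock

    // Producer side: called by the appending thread when a batch becomes ready.
    void markBatchReady() {
        lock.lock();
        try {
            readyBatches++;
            batchReady.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Consumer side: blocks until at least one batch is ready or the timeout
    // elapses, then drains the count. Returns 0 on timeout.
    int awaitReady(long timeoutMs) throws InterruptedException {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            // Re-check the predicate in a loop to tolerate spurious wakeups.
            while (readyBatches == 0 && remainingNanos > 0) {
                remainingNanos = batchReady.awaitNanos(remainingNanos);
            }
            int n = readyBatches;
            readyBatches = 0;
            return n;
        } finally {
            lock.unlock();
        }
    }
}
```

Because the ready state is kept in a counter and not only in the signal, a signal sent before the sender starts waiting is not lost.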

Alternatively, I could do the same as Kafka: accumulator.ready returns the minimum wait time across all buckets, and the sender then suspends with Thread.sleep(nextReadyCheckDelayMs) instead of being signaled.
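A sketch of the Kafka-style alternative: ready() reports which buckets are sendable, plus the shortest time until any other bucket's linger time expires, and the sender waits that long. BucketState, ReadyResult, and the two readiness conditions (full or linger expired) are simplifying assumptions; Kafka's real check also accounts for retry backoff, buffer exhaustion, closing, and in-progress flushes. Requires Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.List;

class NextReadyCheckDelay {
    // Hypothetical per-bucket snapshot: creation time of its oldest batch and whether it is full.
    record BucketState(int bucketId, long firstBatchCreatedMs, boolean full) {}

    record ReadyResult(List<Integer> readyBuckets, long nextReadyCheckDelayMs) {}

    static ReadyResult ready(List<BucketState> buckets, long lingerMs, long nowMs) {
        List<Integer> ready = new ArrayList<>();
        long nextDelay = Long.MAX_VALUE; // nothing pending: no deadline to wake up for
        for (BucketState b : buckets) {
            long waitedMs = nowMs - b.firstBatchCreatedMs();
            if (b.full() || waitedMs >= lingerMs) {
                ready.add(b.bucketId());
            } else {
                // Not ready yet: track the earliest moment some bucket will become ready.
                nextDelay = Math.min(nextDelay, lingerMs - waitedMs);
            }
        }
        return new ReadyResult(ready, nextDelay);
    }
}
```

An idle accumulator reports Long.MAX_VALUE, so the caller would need to cap its wait. In Kafka's Sender, this delay feeds the network poll timeout rather than a bare sleep.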

Member

I mean: why notify for every record rather than only when a batch is ready? Notifying on every record results in a busy loop where the sender never waits at all.
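A sketch of the suggestion above: signal only when an append makes a batch ready (here, when it becomes full), so the sender wakes once per batch rather than once per record. Batch, SignalOnFull, the signals counter, and the record-count capacity are hypothetical simplifications. A real batch is sized in bytes, and batches that become ready because their linger time expired would still be picked up by the sender's timed wait.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class SignalOnFull {
    // Hypothetical minimal batch holding up to `capacity` records.
    static final class Batch {
        private final int capacity;
        private int size;

        Batch(int capacity) {
            this.capacity = capacity;
        }

        boolean tryAppend() {
            if (size >= capacity) {
                return false;
            }
            size++;
            return true;
        }

        boolean isFull() {
            return size >= capacity;
        }
    }

    private final Deque<Batch> deque = new ArrayDeque<>();
    private final int batchCapacity;
    int signals; // how many times the sender would be woken up

    SignalOnFull(int batchCapacity) {
        this.batchCapacity = batchCapacity;
    }

    void append() {
        Batch last = deque.peekLast();
        if (last == null || !last.tryAppend()) {
            last = new Batch(batchCapacity);
            last.tryAppend();
            deque.addLast(last);
        }
        // Wake the sender only when this append made the batch ready.
        if (last.isFull()) {
            notifyBatchReady();
        }
    }

    private void notifyBatchReady() {
        signals++; // a real accumulator would lock and signal its Condition here
    }
}
```

With a capacity of four records, ten appends wake the sender twice instead of ten times.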

Development

Successfully merging this pull request may close these issues.

[Bug] Fluss sender will be in busy loop when is in low traffic
2 participants