diff --git a/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java b/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java index 0ddeb45c..ccbb71a1 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/memory/LazyMemorySegmentPool.java @@ -24,7 +24,6 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; - import java.io.Closeable; import java.io.EOFException; import java.io.IOException; @@ -67,6 +66,7 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable { private int pageUsage; + @VisibleForTesting LazyMemorySegmentPool(int maxPages, int pageSize, long maxTimeToBlockMs) { checkArgument(maxPages > 0, "MaxPages for LazyMemorySegmentPool should be greater than 0."); checkArgument( diff --git a/fluss-common/src/test/java/com/alibaba/fluss/memory/LazyMemorySegmentPoolTest.java b/fluss-common/src/test/java/com/alibaba/fluss/memory/LazyMemorySegmentPoolTest.java index eaf36fd8..921a056f 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/memory/LazyMemorySegmentPoolTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/memory/LazyMemorySegmentPoolTest.java @@ -32,9 +32,20 @@ /** Test for {@link com.alibaba.fluss.memory.LazyMemorySegmentPool}. */ public class LazyMemorySegmentPoolTest { + @Test + void testAutoCloseShouldClearCachedPages() throws Exception { + LazyMemorySegmentPool closedInstance; + try (LazyMemorySegmentPool autoCloseable = buildLazyMemorySegmentSource(10, 64, 100)) { + autoCloseable.nextSegment(); + assertThat(autoCloseable.getAllCachePages().size()).isGreaterThan(0); + closedInstance = autoCloseable; + } + assertThat(closedInstance.getAllCachePages()).isEmpty(); + } + @Test void testNextSegmentWaiter() throws Exception { - LazyMemorySegmentPool source = buildLazyMemorySegmentSource(10, 64); + LazyMemorySegmentPool source = buildLazyMemorySegmentSource(10, 64, 100); assertThat(source.pageSize()).isEqualTo(64); assertThat(source.freePages()).isEqualTo(10); @@ -64,20 +75,20 @@ void testNextSegmentWaiter() throws Exception { @Test void testIllegalArgument() { - assertThatThrownBy(() -> buildLazyMemorySegmentSource(0, 64)) + assertThatThrownBy(() -> buildLazyMemorySegmentSource(0, 64, 100)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("MaxPages for LazyMemorySegmentPool should be greater than 0."); - assertThatThrownBy(() -> buildLazyMemorySegmentSource(10, 32 * 1024 * 1024)) + assertThatThrownBy(() -> buildLazyMemorySegmentSource(10, 32 * 1024 * 1024, 100)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( "Page size should be less than PER_REQUEST_MEMORY_SIZE. " + "Page size is: 32768 KB, PER_REQUEST_MEMORY_SIZE is 16384 KB."); - assertThatThrownBy(() -> buildLazyMemorySegmentSource(10, 30)) + assertThatThrownBy(() -> buildLazyMemorySegmentSource(10, 30, 100)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( "Page size should be greater than 64 bytes to include the record batch header, but is 30 bytes."); - LazyMemorySegmentPool lazyMemorySegmentPool = buildLazyMemorySegmentSource(10, 100); + LazyMemorySegmentPool lazyMemorySegmentPool = buildLazyMemorySegmentSource(10, 100, 100); assertThatThrownBy( () -> lazyMemorySegmentPool.returnAll( @@ -87,8 +98,9 @@ void testIllegalArgument() { .hasMessage("Return too more memories."); } - private LazyMemorySegmentPool buildLazyMemorySegmentSource(int maxPages, int pageSize) { - return new LazyMemorySegmentPool(maxPages, pageSize, 100); + private LazyMemorySegmentPool buildLazyMemorySegmentSource( + int maxPages, int pageSize, int maxTimeToBlockMs) { + return new LazyMemorySegmentPool(maxPages, pageSize, maxTimeToBlockMs); } private CountDownLatch asyncReturnAll(