diff --git a/fe/fe-core/src/test/java/com/starrocks/statistic/CacheRelaxDictManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/statistic/CacheRelaxDictManagerTest.java index 1ee06639fed0d..d92742be6798e 100644 --- a/fe/fe-core/src/test/java/com/starrocks/statistic/CacheRelaxDictManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/statistic/CacheRelaxDictManagerTest.java @@ -37,13 +37,16 @@ import com.starrocks.utframe.UtFrameUtils; import mockit.Mock; import mockit.MockUp; +import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TCompactProtocol; import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -53,15 +56,18 @@ import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; public class CacheRelaxDictManagerTest { + @ClassRule + public static TemporaryFolder temp = new TemporaryFolder(); private static ConnectContext ctx; @BeforeClass public static void beforeClass() throws Exception { UtFrameUtils.createMinStarRocksCluster(); ctx = UtFrameUtils.createDefaultCtx(); - ConnectorPlanTestBase.mockHiveCatalog(ctx); + ConnectorPlanTestBase.mockAllCatalogs(ctx, temp.newFolder().toURI().toString()); } @After @@ -116,10 +122,31 @@ public Pair, Status> executeStmtWithExecPlan(ConnectContext c Assert.assertTrue(optionalColumnDict.isEmpty()); } + @Test + public void testLoaderError() throws ExecutionException, InterruptedException { + AsyncLoadingCache> dictStatistics = Caffeine.newBuilder() + .maximumSize(Config.statistic_dict_columns) + .buildAsync(new CacheRelaxDictManager.DictLoader()); + Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable("hive0", + "tpch", "part"); + String tableUUID = table.getUUID(); + ConnectorTableColumnKey key = new ConnectorTableColumnKey(tableUUID, "p_mfgr"); + new MockUp() { + @Mock + public Pair, Status> executeStmtWithExecPlan(ConnectContext context, ExecPlan plan) { + return new Pair<>(new ArrayList<>(), new Status(TStatusCode.INTERNAL_ERROR, "some error")); + } + }; + CompletableFuture> future = dictStatistics.get(key); + Optional optionalColumnDict = future.get(); + Assert.assertTrue(optionalColumnDict.isEmpty()); + } + @Test public void testLoaderDeserialize() throws ExecutionException, InterruptedException { AsyncLoadingCache> dictStatistics = Caffeine.newBuilder() .maximumSize(Config.statistic_dict_columns) + .refreshAfterWrite(1, TimeUnit.SECONDS) .buildAsync(new CacheRelaxDictManager.DictLoader()); Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable("hive0", "tpch", "part"); @@ -136,6 +163,8 @@ public Pair, Status> executeStmtWithExecPlan(ConnectContext c Optional optionalColumnDict = future.get(); Assert.assertFalse(optionalColumnDict.isEmpty()); Assert.assertEquals(100, optionalColumnDict.get().getDictSize()); + Thread.sleep(2000); + Assert.assertTrue(dictStatistics.get(key).get().isPresent()); } @Test @@ -173,4 +202,56 @@ public void testHasGlobalDict() { Assert.assertTrue(IRelaxDictManager.getInstance().hasGlobalDict(tableUUID, "p_mfgr")); } + @Test + public void testGlobalDict() throws InterruptedException, TException { + Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable("hive0", + "tpch", "part"); + String tableUUID = table.getUUID(); + // clear + String columnName = "p_mfgr"; + IRelaxDictManager manager = IRelaxDictManager.getInstance(); + manager.removeGlobalDict(tableUUID, columnName); + Assert.assertTrue(manager.hasGlobalDict(tableUUID, columnName)); + new MockUp() { + @Mock + public Pair, Status> executeStmtWithExecPlan(ConnectContext context, ExecPlan plan) + throws TException { + return new Pair<>(List.of(generateDictResult(100)), new Status(TStatusCode.OK, "ok")); + } + }; + int retry = 5; + // wait for loading + while (manager.getGlobalDict(tableUUID, columnName).isEmpty() && retry > 0) { + retry--; + Thread.sleep(1000); + } + if (manager.getGlobalDict(tableUUID, columnName).isPresent()) { + ColumnDict columnDict = manager.getGlobalDict(tableUUID, columnName).get(); + Assert.assertEquals(100, columnDict.getDictSize()); + TResultBatch resultBatch = generateDictResult(101); + + TDeserializer deserializer = new TDeserializer(new TCompactProtocol.Factory()); + ByteBuffer buffer = resultBatch.rows.get(0); + TStatisticData sd = new TStatisticData(); + byte[] bytes = new byte[buffer.limit() - buffer.position()]; + buffer.get(bytes); + deserializer.deserialize(sd, bytes); + + manager.updateGlobalDict(tableUUID, columnName, Optional.of(sd)); + Optional optional = manager.getGlobalDict(tableUUID, columnName); + Assert.assertTrue(optional.isPresent()); + Assert.assertEquals(101, optional.get().getDictSize()); + TStatisticData nullDict = new TStatisticData(); + manager.updateGlobalDict(tableUUID, columnName, Optional.of(nullDict)); + optional = manager.getGlobalDict(tableUUID, columnName); + Assert.assertTrue(optional.isPresent()); + Assert.assertEquals(101, optional.get().getDictSize()); + Assert.assertEquals(1, + ((CacheRelaxDictManager) manager).estimateCount().get("ExternalTableColumnDict").intValue()); + Assert.assertEquals(1, ((CacheRelaxDictManager) manager).getSamples().get(0).first.size()); + + manager.removeGlobalDict(tableUUID, columnName); + } + } + }