Skip to content

Commit

Permalink
add more ut
Browse files Browse the repository at this point in the history
Signed-off-by: zombee0 <[email protected]>
  • Loading branch information
zombee0 committed Jan 22, 2025
1 parent 330c3c8 commit 7446b55
Showing 1 changed file with 82 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@
import com.starrocks.utframe.UtFrameUtils;
import mockit.Mock;
import mockit.MockUp;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
Expand All @@ -53,15 +56,18 @@
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class CacheRelaxDictManagerTest {
@ClassRule
public static TemporaryFolder temp = new TemporaryFolder();
private static ConnectContext ctx;

@BeforeClass
public static void beforeClass() throws Exception {
UtFrameUtils.createMinStarRocksCluster();
ctx = UtFrameUtils.createDefaultCtx();
ConnectorPlanTestBase.mockHiveCatalog(ctx);
ConnectorPlanTestBase.mockAllCatalogs(ctx, temp.newFolder().toURI().toString());
}

@After
Expand Down Expand Up @@ -116,10 +122,31 @@ public Pair<List<TResultBatch>, Status> executeStmtWithExecPlan(ConnectContext c
Assert.assertTrue(optionalColumnDict.isEmpty());
}

@Test
public void testLoaderError() throws ExecutionException, InterruptedException {
AsyncLoadingCache<ConnectorTableColumnKey, Optional<ColumnDict>> dictStatistics = Caffeine.newBuilder()
.maximumSize(Config.statistic_dict_columns)
.buildAsync(new CacheRelaxDictManager.DictLoader());
Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable("hive0",
"tpch", "part");
String tableUUID = table.getUUID();
ConnectorTableColumnKey key = new ConnectorTableColumnKey(tableUUID, "p_mfgr");
new MockUp<StmtExecutor>() {
@Mock
public Pair<List<TResultBatch>, Status> executeStmtWithExecPlan(ConnectContext context, ExecPlan plan) {
return new Pair<>(new ArrayList<>(), new Status(TStatusCode.INTERNAL_ERROR, "some error"));
}
};
CompletableFuture<Optional<ColumnDict>> future = dictStatistics.get(key);
Optional<ColumnDict> optionalColumnDict = future.get();
Assert.assertTrue(optionalColumnDict.isEmpty());
}

@Test
public void testLoaderDeserialize() throws ExecutionException, InterruptedException {
AsyncLoadingCache<ConnectorTableColumnKey, Optional<ColumnDict>> dictStatistics = Caffeine.newBuilder()
.maximumSize(Config.statistic_dict_columns)
.refreshAfterWrite(1, TimeUnit.SECONDS)
.buildAsync(new CacheRelaxDictManager.DictLoader());
Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable("hive0",
"tpch", "part");
Expand All @@ -136,6 +163,8 @@ public Pair<List<TResultBatch>, Status> executeStmtWithExecPlan(ConnectContext c
Optional<ColumnDict> optionalColumnDict = future.get();
Assert.assertFalse(optionalColumnDict.isEmpty());
Assert.assertEquals(100, optionalColumnDict.get().getDictSize());
Thread.sleep(2000);
Assert.assertTrue(dictStatistics.get(key).get().isPresent());
}

@Test
Expand Down Expand Up @@ -173,4 +202,56 @@ public void testHasGlobalDict() {
Assert.assertTrue(IRelaxDictManager.getInstance().hasGlobalDict(tableUUID, "p_mfgr"));
}

@Test
public void testGlobalDict() throws InterruptedException, TException {
Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable("hive0",
"tpch", "part");
String tableUUID = table.getUUID();
// clear
String columnName = "p_mfgr";
IRelaxDictManager manager = IRelaxDictManager.getInstance();
manager.removeGlobalDict(tableUUID, columnName);
Assert.assertTrue(manager.hasGlobalDict(tableUUID, columnName));
new MockUp<StmtExecutor>() {
@Mock
public Pair<List<TResultBatch>, Status> executeStmtWithExecPlan(ConnectContext context, ExecPlan plan)
throws TException {
return new Pair<>(List.of(generateDictResult(100)), new Status(TStatusCode.OK, "ok"));
}
};
int retry = 5;
// wait for loading
while (manager.getGlobalDict(tableUUID, columnName).isEmpty() && retry > 0) {
retry--;
Thread.sleep(1000);
}
if (manager.getGlobalDict(tableUUID, columnName).isPresent()) {
ColumnDict columnDict = manager.getGlobalDict(tableUUID, columnName).get();
Assert.assertEquals(100, columnDict.getDictSize());
TResultBatch resultBatch = generateDictResult(101);

TDeserializer deserializer = new TDeserializer(new TCompactProtocol.Factory());
ByteBuffer buffer = resultBatch.rows.get(0);
TStatisticData sd = new TStatisticData();
byte[] bytes = new byte[buffer.limit() - buffer.position()];
buffer.get(bytes);
deserializer.deserialize(sd, bytes);

manager.updateGlobalDict(tableUUID, columnName, Optional.of(sd));
Optional<ColumnDict> optional = manager.getGlobalDict(tableUUID, columnName);
Assert.assertTrue(optional.isPresent());
Assert.assertEquals(101, optional.get().getDictSize());
TStatisticData nullDict = new TStatisticData();
manager.updateGlobalDict(tableUUID, columnName, Optional.of(nullDict));
optional = manager.getGlobalDict(tableUUID, columnName);
Assert.assertTrue(optional.isPresent());
Assert.assertEquals(101, optional.get().getDictSize());
Assert.assertEquals(1,
((CacheRelaxDictManager) manager).estimateCount().get("ExternalTableColumnDict").intValue());
Assert.assertEquals(1, ((CacheRelaxDictManager) manager).getSamples().get(0).first.size());

manager.removeGlobalDict(tableUUID, columnName);
}
}

}

0 comments on commit 7446b55

Please sign in to comment.