
[Bug] spark BatchScan ArrayIndexOutOfBoundsException #4823

Closed

kwontaeheon opened this issue Jan 3, 2025 · 5 comments
Labels
bug Something isn't working

Comments

@kwontaeheon

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

1.1-SNAPSHOT

Compute Engine

spark

Minimal reproduce step

This short query alone cannot reproduce the issue, because the real table has too many CDC-inserted records, but the logic is the same.

create table paimon.default.t_test (
  k string not null, 
  v array<string>,
 PRIMARY KEY (`k`) NOT ENFORCED
)
WITH (
    'deletion-vectors.enabled' = 'true',
    'manifest.merge-min-count' = '60',
    'write-buffer-size' = '512 mb',
    'write-buffer-spillable' = 'true',
    'changelog-producer' = 'lookup',
    'changelog.time-retained' = '1 h',
    'snapshot.time-retained' = '2 h',
    'file.format' = 'parquet',
    'file.block-size' = '256 mb',
    'dynamic-bucket.target-row-num' = '6000000',
    'dynamic-bucket.initial-buckets' = '10',
    'tag.automatic-completion' = 'true',
    'tag.automatic-creation' = 'process-time',
    'tag.creation-period' = 'daily',
    'tag.num-retained-max' = '180'
)
;


-- below is an example of the CDC ingestion
insert into paimon.default.t_test
values ('1', array('1', '2'));

insert into paimon.default.t_test
values ('2', cast(null as array<string>));

insert into paimon.default.t_test
values ('3', array());

delete from paimon.default.t_test
where k = '3';


-- Querying from Spark fails with an error, but from Flink it works fine.
select *
from paimon.default.t_test
where array_contains(v, '1')
;
Error log

Py4JJavaError: An error occurred while calling o512.showString.
: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: 12638039
at org.apache.paimon.utils.ThreadPoolUtils$2.next(ThreadPoolUtils.java:167)
at org.apache.paimon.utils.ThreadPoolUtils$2.next(ThreadPoolUtils.java:153)
at org.apache.paimon.shade.guava30.com.google.common.collect.Iterators$ConcatenatedIterator.hasNext(Iterators.java:1333)
at org.apache.paimon.utils.ThreadPoolUtils$1.advanceIfNeeded(ThreadPoolUtils.java:107)
at org.apache.paimon.utils.ThreadPoolUtils$1.hasNext(ThreadPoolUtils.java:90)
at org.apache.paimon.manifest.FileEntry.mergeEntries(FileEntry.java:126)
at org.apache.paimon.manifest.FileEntry.mergeEntries(FileEntry.java:112)
at org.apache.paimon.operation.AbstractFileStoreScan.readAndMergeFileEntries(AbstractFileStoreScan.java:379)
at org.apache.paimon.operation.AbstractFileStoreScan.doPlan(AbstractFileStoreScan.java:284)
at org.apache.paimon.operation.AbstractFileStoreScan.plan(AbstractFileStoreScan.java:225)
at org.apache.paimon.table.source.snapshot.SnapshotReaderImpl.read(SnapshotReaderImpl.java:259)
at org.apache.paimon.table.source.snapshot.FullStartingScanner.scan(FullStartingScanner.java:54)
at org.apache.paimon.table.source.DataTableBatchScan.plan(DataTableBatchScan.java:76)
at org.apache.paimon.spark.PaimonBaseScan.getOriginSplits(PaimonBaseScan.scala:101)
at org.apache.paimon.spark.PaimonBaseScan.lazyInputPartitions(PaimonBaseScan.scala:110)
at org.apache.paimon.spark.PaimonBaseScan.toBatch(PaimonBaseScan.scala:121)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch$lzycompute(BatchScanExec.scala:45)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch(BatchScanExec.scala:45)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputPartitions$lzycompute(BatchScanExec.scala:59)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputPartitions(BatchScanExec.scala:59)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:179)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:175)
at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.supportsColumnar(BatchScanExec.scala:36)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:147)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:496)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:171)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:164)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:186)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:186)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:179)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:238)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:284)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:252)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:117)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
at sun.reflect.GeneratedMethodAccessor105.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: 12638039
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.paimon.utils.ThreadPoolUtils$2.next(ThreadPoolUtils.java:162)
... 142 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 12638039
at org.apache.paimon.memory.MemorySegmentUtils.getIntMultiSegments(MemorySegmentUtils.java:683)
at org.apache.paimon.memory.MemorySegmentUtils.getInt(MemorySegmentUtils.java:673)
at org.apache.paimon.data.BinaryArray.pointTo(BinaryArray.java:127)
at org.apache.paimon.memory.MemorySegmentUtils.readArrayData(MemorySegmentUtils.java:1140)
at org.apache.paimon.data.BinaryRow.getArray(BinaryRow.java:337)
at org.apache.paimon.utils.InternalRowUtils.get(InternalRowUtils.java:187)
at org.apache.paimon.predicate.LeafPredicate.test(LeafPredicate.java:100)
at org.apache.paimon.operation.KeyValueFileStoreScan.filterByStats(KeyValueFileStoreScan.java:130)
at org.apache.paimon.operation.AbstractFileStoreScan.filterUnmergedManifestEntry(AbstractFileStoreScan.java:457)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.paimon.operation.AbstractFileStoreScan.lambda$readAndMergeFileEntries$5(AbstractFileStoreScan.java:375)
at org.apache.paimon.utils.ThreadPoolUtils.lambda$randomlyExecute$2(ThreadPoolUtils.java:144)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more

  • Workaround that avoids the error
    • Before running the query, I had to cache the table in Spark; after that, reading works fine (see the sketch below).
    • Caching SQL: CACHE LAZY TABLE paimon.default.t_test
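
For completeness, the full workaround sequence is roughly the following sketch (same table and query as in the reproduce step above):

-- workaround sketch: cache the Paimon table in Spark before querying it
CACHE LAZY TABLE paimon.default.t_test;

-- with the table cached, the array_contains query returns without error
select *
from paimon.default.t_test
where array_contains(v, '1');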

What doesn't meet your expectations?

Reading from Spark using functions such as array_contains or explode should work, since the same read from Flink does not throw an error.
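
For example, an explode-based read of the same table fails in the same way; an illustrative form (not the exact statement I ran) is:

-- illustrative explode query; fails like the array_contains query above
select k, explode(v) as elem
from paimon.default.t_test;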

Anything else?

I love this project.
Please give me any advice.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@kwontaeheon kwontaeheon added the bug Something isn't working label Jan 3, 2025
@Zouxxyy
Contributor

Zouxxyy commented Jan 3, 2025

@kwontaeheon Thanks for pointing it out. Can you share the specific version? From the line numbers in the stack trace, it looks like an older version.

@kwontaeheon
Author

@Zouxxyy Thanks for the comment.
I created the table using Paimon 0.9.0 but updated only the client to 1.1-SNAPSHOT.
Do I need to migrate the table when the Paimon version is upgraded?

@kwontaeheon
Author

Oh, the engines were mixed.
I created the table using paimon-flink-1.20:0.9.0.
I am querying it using paimon-spark-3.5:1.1-SNAPSHOT.

@Zouxxyy
Contributor

Zouxxyy commented Jan 6, 2025

Oh, the engines were mixed. I created the table using paimon-flink-1.20:0.9.0. I am querying it using paimon-spark-3.5:1.1-SNAPSHOT.

0.9 and 1.1-SNAPSHOT are compatible, but I can't match this line to the 1.1-SNAPSHOT source. Could you please confirm this again?

at org.apache.paimon.operation.KeyValueFileStoreScan.filterByStats(KeyValueFileStoreScan.java:130)

@kwontaeheon
Author

@Zouxxyy Sorry for the confusion.
There was a fat jar for paimon-spark-0.9.0 in my $SPARK_HOME/jars.
After updating it to paimon-spark-1.1-SNAPSHOT, the error is gone.
Thanks for your comment!
