From 1bd58dfd0981e149e99dbe0fb2752f9f6c045a68 Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Thu, 17 Aug 2023 12:44:53 +0200 Subject: [PATCH 1/3] Closes #1427: Citation matching failed due to an executor exceeding memory limits Introducing new `citationmatchingDirectSparkExecutorOverhead` parameter set to `2048` by default. --- .../wf/citationmatching/direct/oozie_app/workflow.xml | 5 +++++ .../iis/wf/primary/processing/oozie_app/workflow.xml | 9 +++++++++ 2 files changed, 14 insertions(+) diff --git a/iis-wf/iis-wf-citationmatching-direct/src/main/resources/eu/dnetlib/iis/wf/citationmatching/direct/oozie_app/workflow.xml b/iis-wf/iis-wf-citationmatching-direct/src/main/resources/eu/dnetlib/iis/wf/citationmatching/direct/oozie_app/workflow.xml index c570f4f70..e55ea0cfe 100644 --- a/iis-wf/iis-wf-citationmatching-direct/src/main/resources/eu/dnetlib/iis/wf/citationmatching/direct/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-citationmatching-direct/src/main/resources/eu/dnetlib/iis/wf/citationmatching/direct/oozie_app/workflow.xml @@ -35,6 +35,10 @@ sparkExecutorCores number of cores used by single executor + + sparkExecutorOverhead + The amount of off heap memory (in megabytes) to be allocated for the executor + oozieActionShareLibForSpark2 oozie action sharelib for spark 2.* @@ -93,6 +97,7 @@ --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --conf spark.yarn.executor.memoryOverhead=${sparkExecutorOverhead} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/processing/oozie_app/workflow.xml b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/processing/oozie_app/workflow.xml index ff313ca97..7cb09eb29 100644 --- a/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/processing/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-primary/src/main/resources/eu/dnetlib/iis/wf/primary/processing/oozie_app/workflow.xml @@ -276,6 +276,11 @@ ${sparkExecutorMemory} memory for individual executor + + citationmatchingDirectSparkExecutorOverhead + 2048 + The amount of off heap memory (in megabytes) to be allocated for the executor + citationmatchingDirectSparkExecutorCores ${sparkExecutorCores} @@ -1657,6 +1662,10 @@ sparkExecutorMemory ${citationmatchingDirectSparkExecutorMemory} + + sparkExecutorOverhead + ${citationmatchingDirectSparkExecutorOverhead} + sparkExecutorCores ${citationmatchingDirectSparkExecutorCores} From 363dfa1778d3a4f99017dbc122e617cbc3f0a21b Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Fri, 18 Aug 2023 17:18:17 +0200 Subject: [PATCH 2/3] Closes #1427: Citation matching failed due to an executor exceeding memory limits Fixing fuzzy citation matching failure due to an executor exceeding memory limits by propagating `sparkExecutorOverhead` to the `citation-matching-input-transformer` spark action. Until now it was propagated to `citation-matching` action only. --- .../dnetlib/iis/wf/citationmatching/fuzzy/oozie_app/workflow.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/iis-wf/iis-wf-citationmatching/src/main/resources/eu/dnetlib/iis/wf/citationmatching/fuzzy/oozie_app/workflow.xml b/iis-wf/iis-wf-citationmatching/src/main/resources/eu/dnetlib/iis/wf/citationmatching/fuzzy/oozie_app/workflow.xml index 12a8ba447..a8ab9ac0d 100644 --- a/iis-wf/iis-wf-citationmatching/src/main/resources/eu/dnetlib/iis/wf/citationmatching/fuzzy/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-citationmatching/src/main/resources/eu/dnetlib/iis/wf/citationmatching/fuzzy/oozie_app/workflow.xml @@ -115,6 +115,7 @@ --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --conf spark.yarn.executor.memoryOverhead=${sparkExecutorOverhead} --conf spark.extraListeners=${spark1ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark1SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark1YarnHistoryServerAddress} From c4ff3fdc69b6ace09f11c72e29729ce8c6f3b62a Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Fri, 18 Aug 2023 17:22:53 +0200 Subject: [PATCH 3/3] Closes #1428: Fuzzy citation matching input and output transformers do not remove output dir at startup --- .../input/CitationMatchingInputTransformerJob.java | 6 +++++- .../output/CitationMatchingOutputTransformerJob.java | 7 ++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/iis-wf/iis-wf-citationmatching/src/main/java/eu/dnetlib/iis/wf/citationmatching/input/CitationMatchingInputTransformerJob.java b/iis-wf/iis-wf-citationmatching/src/main/java/eu/dnetlib/iis/wf/citationmatching/input/CitationMatchingInputTransformerJob.java index eaccbe580..d332c18bc 100644 --- a/iis-wf/iis-wf-citationmatching/src/main/java/eu/dnetlib/iis/wf/citationmatching/input/CitationMatchingInputTransformerJob.java +++ b/iis-wf/iis-wf-citationmatching/src/main/java/eu/dnetlib/iis/wf/citationmatching/input/CitationMatchingInputTransformerJob.java @@ -1,5 +1,6 @@ package eu.dnetlib.iis.wf.citationmatching.input; +import java.io.IOException; import java.util.Collections; import org.apache.commons.lang3.StringUtils; @@ -17,6 +18,7 @@ import eu.dnetlib.iis.citationmatching.schemas.DocumentMetadata; import eu.dnetlib.iis.common.WorkflowRuntimeParameters; import eu.dnetlib.iis.common.citations.schemas.Citation; +import eu.dnetlib.iis.common.java.io.HdfsUtils; import eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal; import pl.edu.icm.sparkutils.avro.SparkAvroLoader; import pl.edu.icm.sparkutils.avro.SparkAvroSaver; @@ -37,7 +39,7 @@ public class CitationMatchingInputTransformerJob { //------------------------ LOGIC -------------------------- - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) throws InterruptedException, IOException { CitationMatchingInputTransformerJobParameters params = new CitationMatchingInputTransformerJobParameters(); JCommander jcommander = new JCommander(params); @@ -48,6 +50,8 @@ public static void main(String[] args) throws InterruptedException { conf.set("spark.kryo.registrator", "pl.edu.icm.sparkutils.avro.AvroCompatibleKryoRegistrator"); try (JavaSparkContext sc = new JavaSparkContext(conf)) { + + HdfsUtils.remove(sc.hadoopConfiguration(), params.output); JavaRDD inputDocuments = avroLoader.loadJavaRDD(sc, params.inputMetadata, ExtractedDocumentMetadataMergedWithOriginal.class); diff --git a/iis-wf/iis-wf-citationmatching/src/main/java/eu/dnetlib/iis/wf/citationmatching/output/CitationMatchingOutputTransformerJob.java b/iis-wf/iis-wf-citationmatching/src/main/java/eu/dnetlib/iis/wf/citationmatching/output/CitationMatchingOutputTransformerJob.java index 65ce1fa2f..7b75af85d 100644 --- a/iis-wf/iis-wf-citationmatching/src/main/java/eu/dnetlib/iis/wf/citationmatching/output/CitationMatchingOutputTransformerJob.java +++ b/iis-wf/iis-wf-citationmatching/src/main/java/eu/dnetlib/iis/wf/citationmatching/output/CitationMatchingOutputTransformerJob.java @@ -1,5 +1,7 @@ package eu.dnetlib.iis.wf.citationmatching.output; +import java.io.IOException; + import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -9,6 +11,7 @@ import com.beust.jcommander.Parameters; import eu.dnetlib.iis.citationmatching.schemas.Citation; +import eu.dnetlib.iis.common.java.io.HdfsUtils; import pl.edu.icm.sparkutils.avro.SparkAvroLoader; import pl.edu.icm.sparkutils.avro.SparkAvroSaver; @@ -30,7 +33,7 @@ public class CitationMatchingOutputTransformerJob { //------------------------ LOGIC -------------------------- - public static void main(String[] args) { + public static void main(String[] args) throws IOException { CitationMatchingOutputTransformerJobParameters params = new CitationMatchingOutputTransformerJobParameters(); JCommander jcommander = new JCommander(params); @@ -41,6 +44,8 @@ public static void main(String[] args) { try (JavaSparkContext sc = new JavaSparkContext(conf)) { + HdfsUtils.remove(sc.hadoopConfiguration(), params.output); + JavaRDD inputCitations = avroLoader.loadJavaRDD(sc, params.input, Citation.class);