Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce two citation matching related fixes for issues discovered during the last provision round on BETA #1429

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>sparkExecutorOverhead</name>
<description>The amount of off-heap memory (in megabytes) to be allocated for the executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
Expand Down Expand Up @@ -93,6 +97,7 @@
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.yarn.executor.memoryOverhead=${sparkExecutorOverhead}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package eu.dnetlib.iis.wf.citationmatching.input;

import java.io.IOException;
import java.util.Collections;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -17,6 +18,7 @@
import eu.dnetlib.iis.citationmatching.schemas.DocumentMetadata;
import eu.dnetlib.iis.common.WorkflowRuntimeParameters;
import eu.dnetlib.iis.common.citations.schemas.Citation;
import eu.dnetlib.iis.common.java.io.HdfsUtils;
import eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal;
import pl.edu.icm.sparkutils.avro.SparkAvroLoader;
import pl.edu.icm.sparkutils.avro.SparkAvroSaver;
Expand All @@ -37,7 +39,7 @@ public class CitationMatchingInputTransformerJob {

//------------------------ LOGIC --------------------------

public static void main(String[] args) throws InterruptedException {
public static void main(String[] args) throws InterruptedException, IOException {

CitationMatchingInputTransformerJobParameters params = new CitationMatchingInputTransformerJobParameters();
JCommander jcommander = new JCommander(params);
Expand All @@ -48,6 +50,8 @@ public static void main(String[] args) throws InterruptedException {
conf.set("spark.kryo.registrator", "pl.edu.icm.sparkutils.avro.AvroCompatibleKryoRegistrator");

try (JavaSparkContext sc = new JavaSparkContext(conf)) {

HdfsUtils.remove(sc.hadoopConfiguration(), params.output);

JavaRDD<ExtractedDocumentMetadataMergedWithOriginal> inputDocuments = avroLoader.loadJavaRDD(sc, params.inputMetadata, ExtractedDocumentMetadataMergedWithOriginal.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package eu.dnetlib.iis.wf.citationmatching.output;

import java.io.IOException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
Expand All @@ -9,6 +11,7 @@
import com.beust.jcommander.Parameters;

import eu.dnetlib.iis.citationmatching.schemas.Citation;
import eu.dnetlib.iis.common.java.io.HdfsUtils;
import pl.edu.icm.sparkutils.avro.SparkAvroLoader;
import pl.edu.icm.sparkutils.avro.SparkAvroSaver;

Expand All @@ -30,7 +33,7 @@ public class CitationMatchingOutputTransformerJob {

//------------------------ LOGIC --------------------------

public static void main(String[] args) {
public static void main(String[] args) throws IOException {

CitationMatchingOutputTransformerJobParameters params = new CitationMatchingOutputTransformerJobParameters();
JCommander jcommander = new JCommander(params);
Expand All @@ -41,6 +44,8 @@ public static void main(String[] args) {

try (JavaSparkContext sc = new JavaSparkContext(conf)) {

HdfsUtils.remove(sc.hadoopConfiguration(), params.output);

JavaRDD<Citation> inputCitations = avroLoader.loadJavaRDD(sc, params.input, Citation.class);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.yarn.executor.memoryOverhead=${sparkExecutorOverhead}
--conf spark.extraListeners=${spark1ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark1SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark1YarnHistoryServerAddress}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@
<value>${sparkExecutorMemory}</value>
<description>memory for individual executor</description>
</property>
<property>
<name>citationmatchingDirectSparkExecutorOverhead</name>
<value>2048</value>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why this value has to be specified both here and in the config-default.template. And do they have to always be kept in sync?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in line with the other *SparkExecutorOverhead properties defined in the workflow.xml file, and it does not require keeping the values declared in the workflow.xml file in sync with the ones from the default-config.xml files.

I think this duplication was mostly kept as a safety measure: if the value is not declared in default-config.xml, it will be taken from workflow.xml as the default. This is quite useful when performing development runs (no default-config.xml involved), but I agree it does not look good to have some properties defined in two places, so I have created a dedicated issue to consider removing the duplicates: #1430.

<description>The amount of off-heap memory (in megabytes) to be allocated for the executor</description>
</property>
<property>
<name>citationmatchingDirectSparkExecutorCores</name>
<value>${sparkExecutorCores}</value>
Expand Down Expand Up @@ -1657,6 +1662,10 @@
<name>sparkExecutorMemory</name>
<value>${citationmatchingDirectSparkExecutorMemory}</value>
</property>
<property>
<name>sparkExecutorOverhead</name>
<value>${citationmatchingDirectSparkExecutorOverhead}</value>
</property>
<property>
<name>sparkExecutorCores</name>
<value>${citationmatchingDirectSparkExecutorCores}</value>
Expand Down