diff --git a/spark/client/src/main/java/zingg/spark/client/SparkClient.java b/spark/client/src/main/java/zingg/spark/client/SparkClient.java index ae61414c4..e4d90b805 100644 --- a/spark/client/src/main/java/zingg/spark/client/SparkClient.java +++ b/spark/client/src/main/java/zingg/spark/client/SparkClient.java @@ -1,5 +1,6 @@ package zingg.spark.client; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -9,6 +10,7 @@ import zingg.common.client.Client; import zingg.common.client.ClientOptions; import zingg.common.client.IArguments; +import zingg.common.client.IZingg; import zingg.common.client.ZinggClientException; import zingg.common.client.util.PipeUtilBase; import zingg.spark.client.util.SparkPipeUtil; @@ -79,6 +81,11 @@ public SparkSession getSession() { .builder() .appName("Zingg") .getOrCreate(); + JavaSparkContext ctx = JavaSparkContext.fromSparkContext(session.sparkContext()); + JavaSparkContext.jarOfClass(IZingg.class); + LOG.debug("Context " + ctx.toString()); + //initHashFns(); + ctx.setCheckpointDir("/tmp/checkpoint"); setSession(s); return s; } diff --git a/spark/core/src/main/java/zingg/spark/core/context/ZinggSparkContext.java b/spark/core/src/main/java/zingg/spark/core/context/ZinggSparkContext.java index 8d9987426..07e57d652 100644 --- a/spark/core/src/main/java/zingg/spark/core/context/ZinggSparkContext.java +++ b/spark/core/src/main/java/zingg/spark/core/context/ZinggSparkContext.java @@ -35,6 +35,7 @@ public class ZinggSparkContext extends Context, Row,C public static final Log LOG = LogFactory.getLog(ZinggSparkContext.class); +<<<<<<< HEAD public void initSessionAndContext(SparkSession session) throws ZinggClientException { try{ @@ -59,6 +60,14 @@ public void initSessionAndContext(SparkSession session) if (LOG.isDebugEnabled()) e.printStackTrace(); throw new ZinggClientException(e.getMessage()); } +======= + @Override + public void init(SparkSession session) + throws ZinggClientException { + this.session = session; + setUtils(); + +>>>>>>> 622a907d (init changes to spark) } @Override