diff --git a/common/client/src/main/java/zingg/common/client/util/DSUtil.java b/common/client/src/main/java/zingg/common/client/util/DSUtil.java index f8d2f8108..7f6ab851a 100644 --- a/common/client/src/main/java/zingg/common/client/util/DSUtil.java +++ b/common/client/src/main/java/zingg/common/client/util/DSUtil.java @@ -113,7 +113,7 @@ public ZFrame joinWithItself(ZFrame lines, String joinColumn, public ZFrame joinWithItselfSourceSensitive(ZFrame lines, String joinColumn, IArguments args) throws Exception { - ZFrame lines1 = getPrefixedColumnsDS(lines).cache(); + ZFrame lines1 = getPrefixedColumnsDS(lines); String[] sourceNames = args.getPipeNames(); lines = lines.filter(lines.equalTo(ColName.SOURCE_COL, sourceNames[0])); diff --git a/common/core/src/main/java/zingg/common/core/executor/Matcher.java b/common/core/src/main/java/zingg/common/core/executor/Matcher.java index 88a16cd10..31cc67482 100644 --- a/common/core/src/main/java/zingg/common/core/executor/Matcher.java +++ b/common/core/src/main/java/zingg/common/core/executor/Matcher.java @@ -48,9 +48,8 @@ public ZFrame getFieldDefColumnsDS(ZFrame testDataOriginal) { public ZFrame getBlocked( ZFrame testData) throws Exception, ZinggClientException{ LOG.debug("Blocking model file location is " + args.getBlockFile()); Tree> tree = getBlockingTreeUtil().readBlockingTree(args); - ZFrame blocked = getBlockingTreeUtil().getBlockHashes(testData, tree); - ZFrame blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)).cache(); - return blocked1; + ZFrame blocked = getBlockingTreeUtil().getBlockHashes(testData, tree); + return blocked; } public ZFrame getPairs(ZFrameblocked, ZFramebAll, IPairBuilder iPairBuilder) throws Exception{ @@ -83,7 +82,7 @@ protected ZFrame getActualDupes(ZFrame blocked, ZFrame test protected ZFrame getActualDupes(ZFrame blocked, ZFrame testData, IFilter predictionFilter, IPairBuilder iPairBuilder, PredictionColsSelector colsSelector) throws Exception, ZinggClientException{ - ZFrame blocks = getPairs(selectColsFromBlocked(blocked), testData, iPairBuilder); + ZFrame blocks = getPairs(blocked, testData, iPairBuilder); ZFramedupesActual = predictOnBlocks(blocks); ZFrame filteredData = predictionFilter.filter(dupesActual); if(colsSelector!=null) { @@ -97,9 +96,9 @@ public void execute() throws ZinggClientException { try { // read input, filter, remove self joins ZFrame testDataOriginal = getTestData(); - testDataOriginal = getFieldDefColumnsDS(testDataOriginal); + testDataOriginal = getFieldDefColumnsDS(testDataOriginal).cache(); ZFrame testData = getStopWords().preprocessForStopWords(testDataOriginal); - testData = testData.repartition(args.getNumPartitions(), testData.col(ColName.ID_COL)); + //testData = testData.repartition(args.getNumPartitions(), testData.col(ColName.ID_COL)); //testData = dropDuplicates(testData); long count = testData.count(); LOG.info("Read " + count); @@ -107,10 +106,6 @@ public void execute() throws ZinggClientException { ZFrameblocked = getBlocked(testData); LOG.info("Blocked "); - /*blocked = blocked.cache(); - blocked.withColumn("partition_id", functions.spark_partition_id()) - .groupBy("partition_id").agg(functions.count("z_zid")).as("zid").orderBy("partition_id").toJavaRDD().saveAsTextFile("/tmp/zblockedParts"); - */ if (LOG.isDebugEnabled()) { LOG.debug("Num distinct hashes " + blocked.select(ColName.HASH_COL).distinct().count()); blocked.show(); @@ -133,7 +128,7 @@ public void execute() throws ZinggClientException { - public void writeOutput( ZFrame blocked, ZFrame dupesActual) throws ZinggClientException { + public void writeOutput( ZFrame testDataOriginal, ZFrame dupesActual) throws ZinggClientException { try{ //input dupes are pairs ///pick ones according to the threshold by user @@ -141,7 +136,7 @@ public void writeOutput( ZFrame blocked, ZFrame dupesActual) th //all clusters consolidated in one place if (args.getOutput() != null) { - ZFrame graphWithScores = getOutput(blocked, dupesActual); + ZFrame graphWithScores = getOutput(testDataOriginal, dupesActual); getPipeUtil().write(graphWithScores, args.getOutput()); } } @@ -153,7 +148,7 @@ public void writeOutput( ZFrame blocked, ZFrame dupesActual) th - protected ZFrame getOutput(ZFrame blocked, ZFrame dupesActual) throws ZinggClientException, Exception { + protected ZFrame getOutput(ZFrame testDataOriginal, ZFrame dupesActual) throws ZinggClientException, Exception { //-1 is initial suggestion, 1 is add, 0 is deletion, 2 is unsure /*blocked = blocked.drop(ColName.HASH_COL); blocked = blocked.drop(ColName.SOURCE_COL); @@ -165,7 +160,7 @@ protected ZFrame getOutput(ZFrame blocked, ZFrame dup LOG.debug("dupes ------------"); dupesActual.show(); } - ZFramegraph = getGraphUtil().buildGraph(blocked, dupesActual).cache(); + ZFramegraph = getGraphUtil().buildGraph(testDataOriginal, dupesActual).cache(); //graph.toJavaRDD().saveAsTextFile("/tmp/zgraph"); if (LOG.isDebugEnabled()) { @@ -174,27 +169,15 @@ protected ZFrame getOutput(ZFrame blocked, ZFrame dup } //write score - ZFramescore = getMinMaxScores(dupesActual, graph).cache(); + ZFramescore = getMinMaxScores(dupesActual, graph); //score.toJavaRDD().coalesce(1).saveAsTextFile("/tmp/zallscoresAvg"); - graph = graph.repartition(args.getNumPartitions(), graph.col(ColName.ID_COL)).cache(); + graph = graph.repartition(args.getNumPartitions(), graph.col(ColName.ID_COL)); if (LOG.isDebugEnabled()) { score.show(); } ZFrame graphWithScores = getGraphWithScores(graph, score); - //graphWithScores.toJavaRDD().saveAsTextFile("/tmp/zgraphWScores"); - graphWithScores = graphWithScores.drop(ColName.HASH_COL); - graphWithScores = graphWithScores.drop(ColName.COL_PREFIX + ColName.ID_COL); - graphWithScores = graphWithScores.drop(ColName.ID_COL); - graphWithScores = graphWithScores.drop(ColName.SOURCE_COL); - /*String[] cols = graphWithScores.columns(); - List columns = new ArrayList(); - //columns.add(graphWithScores.col(ColName.CLUSTER_COLUMN)); - //go only upto the last col, which is cluster col - for (int i=0; i < cols.length - 1; ++i) { - columns.add(graphWithScores.col(cols[i])); - } - graphWithScores = getDSUtil().select(graphWithScores, columns); - */ + graphWithScores = graphWithScores.drop(ColName.HASH_COL, ColName.COL_PREFIX + ColName.ID_COL, ColName.ID_COL, ColName.SOURCE_COL); + return graphWithScores; } @@ -229,7 +212,7 @@ protected ZFrame getGraphWithScores(ZFrame graph, ZFrame graphPairsExtra = graphPairsFound.except(dupesWithIds); - ZFrame graphPairsExtrawithDummyScore = graphPairsExtra.withColumn(ColName.SCORE_COL, 0.0); + ZFrame graphPairsExtrawithDummyScore = graphPairsExtra.withColumn(ColName.SCORE_COL, 0.0).cache(); LOG.warn("graph pairs extra"); if (LOG.isDebugEnabled()) graphPairsExtra.show(500); @@ -244,19 +227,7 @@ protected ZFrame getGraphWithScores(ZFrame graph, ZFrames2RightCols = s2.toDF(ColName.SCORE_COL, ColName.ID_COL).cache(); ZFrameallScores = s2RightCols.union(s1); - //allScores.toJavaRDD().coalesce(1).saveAsTextFile("/tmp/zallscores"); - /*WindowSpec window = Window.partitionBy(ColName.ID_COL).orderBy(ColName.SCORE_COL); - //WindowSpec window = Window.orderBy(ColName.CLUSTER_COLUMN); - ZFrameranked = allScores.withColumn("rank", functions.rank().over(window)). - withColumn("minScore", functions.min(ColName.SCORE_COL).over(window)). - withColumn("maxScore", functions.max(ColName.SCORE_COL).over(window)). - where("rank == 1"); - ranked.toJavaRDD().saveAsTextFile("/tmp/allscoresRanked"); - - //graph = graph.withColumn("rank", functions.rank().over(window)); - //graph = graph.withColumn(ColName.DENSE_COL, functions.dense_rank().over(window)); - //graph = graph.withColumn("row_num", functions.row_number().over(window)); - */ + allScores = allScores.repartition(args.getNumPartitions(), allScores.col(ColName.ID_COL)); return allScores.groupByMinMaxScore(allScores.col(ColName.ID_COL)); diff --git a/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilder.java b/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilder.java index 2e9e261db..a337edaf0 100644 --- a/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilder.java +++ b/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilder.java @@ -21,12 +21,14 @@ public SelfPairBuilder(DSUtil dsUtil, IArguments args) { @Override public ZFrame getPairs(ZFrameblocked, ZFramebAll) throws Exception { - ZFramejoinH = getDSUtil().joinWithItself(blocked, ColName.HASH_COL, true).cache(); + blocked = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)).cache(); + ZFramejoinH = getDSUtil().joinWithItself(blocked, ColName.HASH_COL, true); + joinH = joinH.filter(joinH.gt(ColName.ID_COL)); /*ZFramejoinH = blocked.as("first").joinOnCol(blocked.as("second"), ColName.HASH_COL) .selectExpr("first.z_zid as z_zid", "second.z_zid as z_z_zid"); */ //joinH.show(); - joinH = joinH.filter(joinH.gt(ColName.ID_COL)); + /*joinH = joinH.filter(joinH.gt(ColName.ID_COL)); if (LOG.isDebugEnabled()) LOG.debug("Num comparisons " + joinH.count()); joinH = joinH.repartition(args.getNumPartitions(), joinH.col(ColName.ID_COL)); bAll = bAll.repartition(args.getNumPartitions(), bAll.col(ColName.ID_COL)); @@ -38,6 +40,7 @@ public ZFrame getPairs(ZFrameblocked, ZFramebAll) throws joinH = joinH.repartition(args.getNumPartitions(), joinH.col(ColName.COL_PREFIX + ColName.ID_COL)); joinH = joinH.joinOnCol(bAll, ColName.COL_PREFIX + ColName.ID_COL); LOG.warn("Joining again with actual values"); + */ //joinH.show(); return joinH; }