perf improvements: 3M NC voter runs in 1 hour 15 minutes with deterministic matching (2 conditions)
sonalgoyal committed Oct 11, 2024
1 parent 983f4d9 commit c1e5f73
Showing 3 changed files with 21 additions and 47 deletions.
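
Most of these changes follow one pattern: stop caching intermediate frames that are consumed only once, and repartition the blocked data by its hash column (caching it) just once, immediately before the self-join that generates candidate pairs. Below is a minimal plain-Spark sketch of that pattern, not Zingg code; the column names z_zid and z_hash and the select() are illustrative assumptions.

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch only (assumed column names): repartition by the blocking hash once,
// cache the result, then self-join within each hash bucket and keep a single
// ordering of every pair so (a, b) and (b, a) are not both emitted.
final class PairSketch {
    static Dataset<Row> candidatePairs(Dataset<Row> blocked, int numPartitions) {
        Dataset<Row> byHash = blocked
                .select("z_zid", "z_hash")
                .repartition(numPartitions, col("z_hash"))
                .cache(); // reused by both sides of the self-join below
        Dataset<Row> right = byHash.withColumnRenamed("z_zid", "z_z_zid");
        return byHash.join(right, "z_hash")
                .where(col("z_zid").lt(col("z_z_zid")));
    }
}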
@@ -113,7 +113,7 @@ public ZFrame<D, R, C> joinWithItself(ZFrame<D, R, C> lines, String joinColumn,

public ZFrame<D, R, C> joinWithItselfSourceSensitive(ZFrame<D, R, C> lines, String joinColumn, IArguments args) throws Exception {

- ZFrame<D, R, C> lines1 = getPrefixedColumnsDS(lines).cache();
+ ZFrame<D, R, C> lines1 = getPrefixedColumnsDS(lines);

String[] sourceNames = args.getPipeNames();
lines = lines.filter(lines.equalTo(ColName.SOURCE_COL, sourceNames[0]));
59 changes: 15 additions & 44 deletions common/core/src/main/java/zingg/common/core/executor/Matcher.java
@@ -48,9 +48,8 @@ public ZFrame<D, R, C> getFieldDefColumnsDS(ZFrame<D, R, C> testDataOriginal) {
public ZFrame<D,R,C> getBlocked( ZFrame<D,R,C> testData) throws Exception, ZinggClientException{
LOG.debug("Blocking model file location is " + args.getBlockFile());
Tree<Canopy<R>> tree = getBlockingTreeUtil().readBlockingTree(args);
- ZFrame<D,R,C> blocked = getBlockingTreeUtil().getBlockHashes(testData, tree);
- ZFrame<D,R,C> blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)).cache();
- return blocked1;
+ ZFrame<D,R,C> blocked = getBlockingTreeUtil().getBlockHashes(testData, tree);
+ return blocked;
}

public ZFrame<D,R,C> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll, IPairBuilder<S, D, R, C> iPairBuilder) throws Exception{
@@ -83,7 +82,7 @@ protected ZFrame<D,R,C> getActualDupes(ZFrame<D,R,C> blocked, ZFrame<D,R,C> testData,

protected ZFrame<D,R,C> getActualDupes(ZFrame<D,R,C> blocked, ZFrame<D,R,C> testData,
IFilter<D, R, C> predictionFilter, IPairBuilder<S, D, R, C> iPairBuilder, PredictionColsSelector colsSelector) throws Exception, ZinggClientException{
- ZFrame<D,R,C> blocks = getPairs(selectColsFromBlocked(blocked), testData, iPairBuilder);
+ ZFrame<D,R,C> blocks = getPairs(blocked, testData, iPairBuilder);
ZFrame<D,R,C>dupesActual = predictOnBlocks(blocks);
ZFrame<D, R, C> filteredData = predictionFilter.filter(dupesActual);
if(colsSelector!=null) {
@@ -97,20 +96,16 @@ public void execute() throws ZinggClientException {
try {
// read input, filter, remove self joins
ZFrame<D,R,C> testDataOriginal = getTestData();
- testDataOriginal = getFieldDefColumnsDS(testDataOriginal);
+ testDataOriginal = getFieldDefColumnsDS(testDataOriginal).cache();
ZFrame<D,R,C> testData = getStopWords().preprocessForStopWords(testDataOriginal);
- testData = testData.repartition(args.getNumPartitions(), testData.col(ColName.ID_COL));
+ //testData = testData.repartition(args.getNumPartitions(), testData.col(ColName.ID_COL));
//testData = dropDuplicates(testData);
long count = testData.count();
LOG.info("Read " + count);
Analytics.track(Metric.DATA_COUNT, count, args.getCollectMetrics());

ZFrame<D,R,C>blocked = getBlocked(testData);
LOG.info("Blocked ");
- /*blocked = blocked.cache();
- blocked.withColumn("partition_id", functions.spark_partition_id())
- .groupBy("partition_id").agg(functions.count("z_zid")).as("zid").orderBy("partition_id").toJavaRDD().saveAsTextFile("/tmp/zblockedParts");
- */
if (LOG.isDebugEnabled()) {
LOG.debug("Num distinct hashes " + blocked.select(ColName.HASH_COL).distinct().count());
blocked.show();
@@ -133,15 +128,15 @@ public void execute() throws ZinggClientException {



- public void writeOutput( ZFrame<D,R,C> blocked, ZFrame<D,R,C> dupesActual) throws ZinggClientException {
+ public void writeOutput( ZFrame<D,R,C> testDataOriginal, ZFrame<D,R,C> dupesActual) throws ZinggClientException {
try{
//input dupes are pairs
///pick ones according to the threshold by user


//all clusters consolidated in one place
if (args.getOutput() != null) {
- ZFrame<D, R, C> graphWithScores = getOutput(blocked, dupesActual);
+ ZFrame<D, R, C> graphWithScores = getOutput(testDataOriginal, dupesActual);
getPipeUtil().write(graphWithScores, args.getOutput());
}
}
@@ -153,7 +148,7 @@ public void writeOutput( ZFrame<D,R,C> blocked, ZFrame<D,R,C> dupesActual) throws ZinggClientException {



- protected ZFrame<D, R, C> getOutput(ZFrame<D, R, C> blocked, ZFrame<D, R, C> dupesActual) throws ZinggClientException, Exception {
+ protected ZFrame<D, R, C> getOutput(ZFrame<D, R, C> testDataOriginal, ZFrame<D, R, C> dupesActual) throws ZinggClientException, Exception {
//-1 is initial suggestion, 1 is add, 0 is deletion, 2 is unsure
/*blocked = blocked.drop(ColName.HASH_COL);
blocked = blocked.drop(ColName.SOURCE_COL);
@@ -165,7 +160,7 @@ protected ZFrame<D, R, C> getOutput(ZFrame<D, R, C> blocked, ZFrame<D, R, C> dupesActual) throws ZinggClientException, Exception {
LOG.debug("dupes ------------");
dupesActual.show();
}
- ZFrame<D,R,C>graph = getGraphUtil().buildGraph(blocked, dupesActual).cache();
+ ZFrame<D,R,C>graph = getGraphUtil().buildGraph(testDataOriginal, dupesActual).cache();
//graph.toJavaRDD().saveAsTextFile("/tmp/zgraph");

if (LOG.isDebugEnabled()) {
@@ -174,27 +169,15 @@ protected ZFrame<D, R, C> getOutput(ZFrame<D, R, C> blocked, ZFrame<D, R, C> dupesActual) throws ZinggClientException, Exception {

}
//write score
- ZFrame<D,R,C>score = getMinMaxScores(dupesActual, graph).cache();
+ ZFrame<D,R,C>score = getMinMaxScores(dupesActual, graph);
//score.toJavaRDD().coalesce(1).saveAsTextFile("/tmp/zallscoresAvg");
- graph = graph.repartition(args.getNumPartitions(), graph.col(ColName.ID_COL)).cache();
+ graph = graph.repartition(args.getNumPartitions(), graph.col(ColName.ID_COL));
if (LOG.isDebugEnabled()) {
score.show();
}
ZFrame<D, R, C> graphWithScores = getGraphWithScores(graph, score);
//graphWithScores.toJavaRDD().saveAsTextFile("/tmp/zgraphWScores");
- graphWithScores = graphWithScores.drop(ColName.HASH_COL);
- graphWithScores = graphWithScores.drop(ColName.COL_PREFIX + ColName.ID_COL);
- graphWithScores = graphWithScores.drop(ColName.ID_COL);
- graphWithScores = graphWithScores.drop(ColName.SOURCE_COL);
- /*String[] cols = graphWithScores.columns();
- List<Column> columns = new ArrayList<Column>();
- //columns.add(graphWithScores.col(ColName.CLUSTER_COLUMN));
- //go only upto the last col, which is cluster col
- for (int i=0; i < cols.length - 1; ++i) {
- columns.add(graphWithScores.col(cols[i]));
- }
- graphWithScores = getDSUtil().select(graphWithScores, columns);
- */
+ graphWithScores = graphWithScores.drop(ColName.HASH_COL, ColName.COL_PREFIX + ColName.ID_COL, ColName.ID_COL, ColName.SOURCE_COL);

return graphWithScores;
}
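
One small cleanup above: the four chained drop() calls on graphWithScores collapse into a single varargs drop. Assuming ZFrame.drop mirrors Spark's Dataset.drop(String...), and using assumed literal values for the ColName constants, the plain-Spark equivalent would look like this:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final class OutputCleanup {
    // "z_hash", "z_z_zid", "z_zid", "z_source" are assumed values of the
    // ColName constants, used here only for illustration.
    static Dataset<Row> stripInternalCols(Dataset<Row> graphWithScores) {
        return graphWithScores.drop("z_hash", "z_z_zid", "z_zid", "z_source");
    }
}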

@@ -229,7 +212,7 @@ protected ZFrame<D, R, C> getGraphWithScores(ZFrame<D, R, C> graph, ZFrame<D, R,
if (LOG.isDebugEnabled()) graphPairsFound.show(500);

ZFrame<D,R,C> graphPairsExtra = graphPairsFound.except(dupesWithIds);
- ZFrame<D,R,C> graphPairsExtrawithDummyScore = graphPairsExtra.withColumn(ColName.SCORE_COL, 0.0);
+ ZFrame<D,R,C> graphPairsExtrawithDummyScore = graphPairsExtra.withColumn(ColName.SCORE_COL, 0.0).cache();
LOG.warn("graph pairs extra");
if (LOG.isDebugEnabled()) graphPairsExtra.show(500);

@@ -244,19 +227,7 @@ protected ZFrame<D, R, C> getGraphWithScores(ZFrame<D, R, C> graph, ZFrame<D, R,

ZFrame<D,R,C>s2RightCols = s2.toDF(ColName.SCORE_COL, ColName.ID_COL).cache();
ZFrame<D,R,C>allScores = s2RightCols.union(s1);
- //allScores.toJavaRDD().coalesce(1).saveAsTextFile("/tmp/zallscores");
- /*WindowSpec window = Window.partitionBy(ColName.ID_COL).orderBy(ColName.SCORE_COL);
- //WindowSpec window = Window.orderBy(ColName.CLUSTER_COLUMN);
- ZFrame<D,R,C>ranked = allScores.withColumn("rank", functions.rank().over(window)).
- withColumn("minScore", functions.min(ColName.SCORE_COL).over(window)).
- withColumn("maxScore", functions.max(ColName.SCORE_COL).over(window)).
- where("rank == 1");
- ranked.toJavaRDD().saveAsTextFile("/tmp/allscoresRanked");
- //graph = graph.withColumn("rank", functions.rank().over(window));
- //graph = graph.withColumn(ColName.DENSE_COL, functions.dense_rank().over(window));
- //graph = graph.withColumn("row_num", functions.row_number().over(window));
- */

allScores = allScores.repartition(args.getNumPartitions(), allScores.col(ColName.ID_COL));

return allScores.groupByMinMaxScore(allScores.col(ColName.ID_COL));
@@ -21,12 +21,14 @@ public SelfPairBuilder(DSUtil<S, D, R, C> dsUtil, IArguments args) {

@Override
public ZFrame<D, R, C> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll) throws Exception {
- ZFrame<D,R,C>joinH = getDSUtil().joinWithItself(blocked, ColName.HASH_COL, true).cache();
+ blocked = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)).cache();
+ ZFrame<D,R,C>joinH = getDSUtil().joinWithItself(blocked, ColName.HASH_COL, true);
+ joinH = joinH.filter(joinH.gt(ColName.ID_COL));
/*ZFrame<D,R,C>joinH = blocked.as("first").joinOnCol(blocked.as("second"), ColName.HASH_COL)
.selectExpr("first.z_zid as z_zid", "second.z_zid as z_z_zid");
*/
//joinH.show();
- joinH = joinH.filter(joinH.gt(ColName.ID_COL));
+ /*joinH = joinH.filter(joinH.gt(ColName.ID_COL));
if (LOG.isDebugEnabled()) LOG.debug("Num comparisons " + joinH.count());
joinH = joinH.repartition(args.getNumPartitions(), joinH.col(ColName.ID_COL));
bAll = bAll.repartition(args.getNumPartitions(), bAll.col(ColName.ID_COL));
@@ -38,6 +40,7 @@ public ZFrame<D, R, C> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll) throws Exception {
joinH = joinH.repartition(args.getNumPartitions(), joinH.col(ColName.COL_PREFIX + ColName.ID_COL));
joinH = joinH.joinOnCol(bAll, ColName.COL_PREFIX + ColName.ID_COL);
LOG.warn("Joining again with actual values");
+ */
//joinH.show();
return joinH;
}
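
Why repartitioning and caching before the self-join helps: both sides of the join read the same materialized, hash-partitioned data, so the blocking output is computed once and the join keys are already co-located, avoiding a second full shuffle. A quick plain-Spark way to sanity-check this (illustrative only, not part of the commit; z_hash and z_zid are assumed column names):

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final class PartitionCheck {
    static void inspect(Dataset<Row> blocked, int numPartitions) {
        Dataset<Row> byHash = blocked.repartition(numPartitions, col("z_hash")).cache();
        byHash.count(); // materialize the cache once
        System.out.println("partitions: " + byHash.rdd().getNumPartitions());
        // The physical plan should show both join inputs scanning the in-memory relation.
        byHash.join(byHash.withColumnRenamed("z_zid", "z_z_zid"), "z_hash").explain();
    }
}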
