From e14a1ddfd781d09d77bcd7af37d9d42c7ce8d1ba Mon Sep 17 00:00:00 2001 From: Nitish Date: Mon, 16 Sep 2024 02:33:31 +0530 Subject: [PATCH 1/4] transpose the ZFrame --- .../main/java/zingg/common/client/ZFrame.java | 4 ++ .../core/executor/LabelDataViewHelper.java | 37 ++++++++++++++++-- .../zingg/common/core/util/ListHelper.java | 14 +++++++ .../java/zingg/spark/client/SparkFrame.java | 39 +++++++++++++++++-- .../spark/client/util/ExtendedFunction.scala | 18 +++++++++ 5 files changed, 106 insertions(+), 6 deletions(-) create mode 100644 common/core/src/main/java/zingg/common/core/util/ListHelper.java create mode 100644 spark/client/src/main/java/zingg/spark/client/util/ExtendedFunction.scala diff --git a/common/client/src/main/java/zingg/common/client/ZFrame.java b/common/client/src/main/java/zingg/common/client/ZFrame.java index b07a264c0..2e94b46f9 100644 --- a/common/client/src/main/java/zingg/common/client/ZFrame.java +++ b/common/client/src/main/java/zingg/common/client/ZFrame.java @@ -179,4 +179,8 @@ public interface ZFrame { public C gt(C column1, C column2); + public ZFrame transpose(String pivotColumn); + + public ZFrame addAutoIncrementalRow(); + } diff --git a/common/core/src/main/java/zingg/common/core/executor/LabelDataViewHelper.java b/common/core/src/main/java/zingg/common/core/executor/LabelDataViewHelper.java index bb9d7a25f..bccad9f8c 100644 --- a/common/core/src/main/java/zingg/common/core/executor/LabelDataViewHelper.java +++ b/common/core/src/main/java/zingg/common/core/executor/LabelDataViewHelper.java @@ -18,6 +18,10 @@ public class LabelDataViewHelper extends ZinggBase imp private static final long serialVersionUID = 1L; public static final Log LOG = LogFactory.getLog(LabelDataViewHelper.class); + public static String PIVOT_COLUMN = "field"; + public static String VALUE_1 = "value1"; + public static String VALUE_2 = "value2"; + public static String ORDER = "order"; public LabelDataViewHelper(IContext context, ClientOptions clientOptions) { setContext(context); @@ -87,7 +91,8 @@ public String getMsg2(double prediction, double score) { public void displayRecords(ZFrame records, String preMessage, String postMessage) { //System.out.println(); System.out.println(preMessage); - records.show(false); +// showHorizontal(records); + showVertical(records); System.out.println(postMessage); System.out.println("\tWhat do you think? Your choices are: "); System.out.println(); @@ -125,6 +130,32 @@ public ILabelDataViewHelper getLabelDataViewHelper() throws Unsuppor return this; } - - + public void showHorizontal(ZFrame records) { + records.show(false); + } + + public void showVertical(ZFrame records) { + ZFrame headerIncludedFrame = getHeaderIncludedDataFrame(records); + ZFrame vertical = headerIncludedFrame.transpose(PIVOT_COLUMN); + vertical.sortAscending(ORDER).drop(ORDER).show(); + } + + /*** + * return new ZFrame with new Column added as PIVOT used for transposing the matrix + * @param records + * @return header included zFrame + */ + private ZFrame getHeaderIncludedDataFrame(ZFrame records) { + ZFrame orderedRowAdded = records.addAutoIncrementalRow(); + + ZFrame firstRecord = orderedRowAdded.limit(1); + ZFrame secondRecord = orderedRowAdded.except(firstRecord).limit(1); + ZFrame thirdRecord = orderedRowAdded.except(firstRecord.union(secondRecord)); + + //return new ZFrame with Field column added to be used as pivot + return firstRecord.withColumn(PIVOT_COLUMN, VALUE_1). + union(secondRecord.withColumn(PIVOT_COLUMN, VALUE_2)). + union(thirdRecord.withColumn(PIVOT_COLUMN, ORDER)); + } + } diff --git a/common/core/src/main/java/zingg/common/core/util/ListHelper.java b/common/core/src/main/java/zingg/common/core/util/ListHelper.java new file mode 100644 index 000000000..c96df1acc --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/util/ListHelper.java @@ -0,0 +1,14 @@ +package zingg.common.core.util; + +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.util.List; + +public class ListHelper { + public static Seq convertListToSeq(List inputList) { + return JavaConverters.asScalaIteratorConverter(inputList.iterator()) + .asScala() + .toSeq(); + } +} diff --git a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java index 7add25352..7066b583d 100644 --- a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java +++ b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java @@ -1,8 +1,9 @@ package zingg.spark.client; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import org.apache.spark.internal.config.R; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -14,6 +15,8 @@ import zingg.common.client.FieldData; import zingg.common.client.ZFrame; import zingg.common.client.util.ColName; +import zingg.common.core.util.ListHelper; +import zingg.spark.client.util.ExtendedFunction; //Dataset, Row, column public class SparkFrame implements ZFrame, Row, Column> { @@ -458,7 +461,7 @@ public ZFrame, Row, Column> groupByCount(String groupByCol1, String @Override public ZFrame, Row, Column> intersect(ZFrame, Row, Column> other) { - return new SparkFrame(df.intersect(other.df())); + return new SparkFrame(df.intersect(other.df())); } @Override @@ -470,6 +473,36 @@ public Column substr(Column col, int startPos, int len) { public Column gt(Column column1, Column column2) { return column1.gt(column2); } - + + @Override + public ZFrame, Row, Column> transpose(String pivotColumn) { + ExtendedFunction extendedFunction = new ExtendedFunction(); + List columnsExceptPivot = new ArrayList<>(List.of(df.columns())); + columnsExceptPivot.remove(pivotColumn); + Dataset r = extendedFunction.TransposeDF(df, ListHelper.convertListToSeq(columnsExceptPivot), pivotColumn); + return new SparkFrame(r); + } + + /*** + * Add auto incremental row like {1, 2, 3, 4, 5} to the dataframe + * @return ZFrame + */ + + @Override + public ZFrame, Row, Column> addAutoIncrementalRow() { + String[] columns = df.columns(); + Dataset temporaryDF = df.limit(1); + List monotonicIncreasing = new ArrayList<>(); + for (int idx = 0; idx < columns.length; idx++) { + monotonicIncreasing.add(String.valueOf(idx)); + } + Collections.sort(monotonicIncreasing); + for (int idx = 0; idx < columns.length; idx++) { + temporaryDF = temporaryDF.withColumn(columns[idx], functions.lit(monotonicIncreasing.get(idx))); + } + return new SparkFrame(df.union(temporaryDF)); + } + + } diff --git a/spark/client/src/main/java/zingg/spark/client/util/ExtendedFunction.scala b/spark/client/src/main/java/zingg/spark/client/util/ExtendedFunction.scala new file mode 100644 index 000000000..3f8de887f --- /dev/null +++ b/spark/client/src/main/java/zingg/spark/client/util/ExtendedFunction.scala @@ -0,0 +1,18 @@ +package zingg.spark.client.util + +import org.apache.spark.sql.functions.{col, collect_list, concat_ws} +import org.apache.spark.sql.{Dataset, Row} + +class ExtendedFunction { + def TransposeDF(df: Dataset[Row], columns: Seq[String], pivotCol: String): Dataset[Row] = { + val columnsValue = columns.map(x => "'" + x + "', " + x) + val stackCols = columnsValue.mkString(",") + val df_1 = df.selectExpr(pivotCol, "stack(" + columns.size + "," + stackCols + ")") + .select(pivotCol, "col0", "col1") + + val final_df = df_1.groupBy(col("col0")).pivot(pivotCol).agg(concat_ws("", collect_list(col("col1")))) + .withColumnRenamed("col0", pivotCol) + final_df + } + +} From ad4db57b25eb09b60ef1213c3e1004b47dc26e25 Mon Sep 17 00:00:00 2001 From: Nitish Date: Mon, 16 Sep 2024 10:48:54 +0530 Subject: [PATCH 2/4] refactored --- .../main/java/zingg/common/client/ZFrame.java | 24 +++++++++++++ .../core/executor/LabelDataViewHelper.java | 34 +------------------ .../{ListHelper.java => ListConverter.java} | 2 +- .../java/zingg/spark/client/SparkFrame.java | 11 ++++-- 4 files changed, 35 insertions(+), 36 deletions(-) rename common/core/src/main/java/zingg/common/core/util/{ListHelper.java => ListConverter.java} (92%) diff --git a/common/client/src/main/java/zingg/common/client/ZFrame.java b/common/client/src/main/java/zingg/common/client/ZFrame.java index 2e94b46f9..7da1753f5 100644 --- a/common/client/src/main/java/zingg/common/client/ZFrame.java +++ b/common/client/src/main/java/zingg/common/client/ZFrame.java @@ -10,6 +10,10 @@ public interface ZFrame { public static final String COL_COUNT = "count"; public static final String COL_VALUE = "VALUE"; + public static String PIVOT_COLUMN = "field"; + public static String VALUE_1 = "value1"; + public static String VALUE_2 = "value2"; + public static String ORDER = "order"; public ZFrame cache(); public ZFrame as(String s); @@ -183,4 +187,24 @@ public interface ZFrame { public ZFrame addAutoIncrementalRow(); + public void showVertical(); + + /*** + * return new ZFrame with new Column added as PIVOT used for transposing the matrix + * @param records + * @return header included zFrame + */ + default ZFrame getHeaderIncludedDataFrame(ZFrame records) { + ZFrame orderedRowAdded = records.addAutoIncrementalRow(); + + ZFrame firstRecord = orderedRowAdded.limit(1); + ZFrame secondRecord = orderedRowAdded.except(firstRecord).limit(1); + ZFrame thirdRecord = orderedRowAdded.except(firstRecord.union(secondRecord)); + + //return new ZFrame with Field column added to be used as pivot + return firstRecord.withColumn(PIVOT_COLUMN, VALUE_1). + union(secondRecord.withColumn(PIVOT_COLUMN, VALUE_2)). + union(thirdRecord.withColumn(PIVOT_COLUMN, ORDER)); + } + } diff --git a/common/core/src/main/java/zingg/common/core/executor/LabelDataViewHelper.java b/common/core/src/main/java/zingg/common/core/executor/LabelDataViewHelper.java index bccad9f8c..3c51df9dd 100644 --- a/common/core/src/main/java/zingg/common/core/executor/LabelDataViewHelper.java +++ b/common/core/src/main/java/zingg/common/core/executor/LabelDataViewHelper.java @@ -18,10 +18,6 @@ public class LabelDataViewHelper extends ZinggBase imp private static final long serialVersionUID = 1L; public static final Log LOG = LogFactory.getLog(LabelDataViewHelper.class); - public static String PIVOT_COLUMN = "field"; - public static String VALUE_1 = "value1"; - public static String VALUE_2 = "value2"; - public static String ORDER = "order"; public LabelDataViewHelper(IContext context, ClientOptions clientOptions) { setContext(context); @@ -92,7 +88,7 @@ public void displayRecords(ZFrame records, String preMessage, String po //System.out.println(); System.out.println(preMessage); // showHorizontal(records); - showVertical(records); + records.showVertical(); System.out.println(postMessage); System.out.println("\tWhat do you think? Your choices are: "); System.out.println(); @@ -130,32 +126,4 @@ public ILabelDataViewHelper getLabelDataViewHelper() throws Unsuppor return this; } - public void showHorizontal(ZFrame records) { - records.show(false); - } - - public void showVertical(ZFrame records) { - ZFrame headerIncludedFrame = getHeaderIncludedDataFrame(records); - ZFrame vertical = headerIncludedFrame.transpose(PIVOT_COLUMN); - vertical.sortAscending(ORDER).drop(ORDER).show(); - } - - /*** - * return new ZFrame with new Column added as PIVOT used for transposing the matrix - * @param records - * @return header included zFrame - */ - private ZFrame getHeaderIncludedDataFrame(ZFrame records) { - ZFrame orderedRowAdded = records.addAutoIncrementalRow(); - - ZFrame firstRecord = orderedRowAdded.limit(1); - ZFrame secondRecord = orderedRowAdded.except(firstRecord).limit(1); - ZFrame thirdRecord = orderedRowAdded.except(firstRecord.union(secondRecord)); - - //return new ZFrame with Field column added to be used as pivot - return firstRecord.withColumn(PIVOT_COLUMN, VALUE_1). - union(secondRecord.withColumn(PIVOT_COLUMN, VALUE_2)). - union(thirdRecord.withColumn(PIVOT_COLUMN, ORDER)); - } - } diff --git a/common/core/src/main/java/zingg/common/core/util/ListHelper.java b/common/core/src/main/java/zingg/common/core/util/ListConverter.java similarity index 92% rename from common/core/src/main/java/zingg/common/core/util/ListHelper.java rename to common/core/src/main/java/zingg/common/core/util/ListConverter.java index c96df1acc..01acd000b 100644 --- a/common/core/src/main/java/zingg/common/core/util/ListHelper.java +++ b/common/core/src/main/java/zingg/common/core/util/ListConverter.java @@ -5,7 +5,7 @@ import java.util.List; -public class ListHelper { +public class ListConverter { public static Seq convertListToSeq(List inputList) { return JavaConverters.asScalaIteratorConverter(inputList.iterator()) .asScala() diff --git a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java index 7066b583d..74ee66921 100644 --- a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java +++ b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java @@ -15,7 +15,7 @@ import zingg.common.client.FieldData; import zingg.common.client.ZFrame; import zingg.common.client.util.ColName; -import zingg.common.core.util.ListHelper; +import zingg.common.core.util.ListConverter; import zingg.spark.client.util.ExtendedFunction; //Dataset, Row, column @@ -479,7 +479,7 @@ public ZFrame, Row, Column> transpose(String pivotColumn) { ExtendedFunction extendedFunction = new ExtendedFunction(); List columnsExceptPivot = new ArrayList<>(List.of(df.columns())); columnsExceptPivot.remove(pivotColumn); - Dataset r = extendedFunction.TransposeDF(df, ListHelper.convertListToSeq(columnsExceptPivot), pivotColumn); + Dataset r = extendedFunction.TransposeDF(df, ListConverter.convertListToSeq(columnsExceptPivot), pivotColumn); return new SparkFrame(r); } @@ -503,6 +503,13 @@ public ZFrame, Row, Column> addAutoIncrementalRow() { return new SparkFrame(df.union(temporaryDF)); } + @Override + public void showVertical() { + ZFrame, Row, Column> headerIncludedFrame = getHeaderIncludedDataFrame(new SparkFrame(df)); + ZFrame, Row, Column> vertical = headerIncludedFrame.transpose(PIVOT_COLUMN); + vertical.sortAscending(ORDER).drop(ORDER).show(); + } + } From 7a3197534b313126efb6c14ce222bc9e39ba7e13 Mon Sep 17 00:00:00 2001 From: Nitish Date: Mon, 16 Sep 2024 11:17:24 +0530 Subject: [PATCH 3/4] made listConverter generic --- .../src/main/java/zingg/common/core/util/ListConverter.java | 4 ++-- spark/client/src/main/java/zingg/spark/client/SparkFrame.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/common/core/src/main/java/zingg/common/core/util/ListConverter.java b/common/core/src/main/java/zingg/common/core/util/ListConverter.java index 01acd000b..8b592e273 100644 --- a/common/core/src/main/java/zingg/common/core/util/ListConverter.java +++ b/common/core/src/main/java/zingg/common/core/util/ListConverter.java @@ -5,8 +5,8 @@ import java.util.List; -public class ListConverter { - public static Seq convertListToSeq(List inputList) { +public class ListConverter { + public Seq convertListToSeq(List inputList) { return JavaConverters.asScalaIteratorConverter(inputList.iterator()) .asScala() .toSeq(); diff --git a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java index 74ee66921..ca546aa15 100644 --- a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java +++ b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java @@ -479,7 +479,8 @@ public ZFrame, Row, Column> transpose(String pivotColumn) { ExtendedFunction extendedFunction = new ExtendedFunction(); List columnsExceptPivot = new ArrayList<>(List.of(df.columns())); columnsExceptPivot.remove(pivotColumn); - Dataset r = extendedFunction.TransposeDF(df, ListConverter.convertListToSeq(columnsExceptPivot), pivotColumn); + ListConverter listConverter = new ListConverter(); + Dataset r = extendedFunction.TransposeDF(df, listConverter.convertListToSeq(columnsExceptPivot), pivotColumn); return new SparkFrame(r); } From e9eb598c6178ceafc55db595d9e7789915a70937 Mon Sep 17 00:00:00 2001 From: Nitish Date: Mon, 16 Sep 2024 18:09:37 +0530 Subject: [PATCH 4/4] refactored --- .../main/java/zingg/common/client/ZFrame.java | 21 ------------------ .../java/zingg/spark/client/SparkFrame.java | 22 ++++++++++++++++--- 2 files changed, 19 insertions(+), 24 deletions(-) diff --git a/common/client/src/main/java/zingg/common/client/ZFrame.java b/common/client/src/main/java/zingg/common/client/ZFrame.java index 7da1753f5..88072a12a 100644 --- a/common/client/src/main/java/zingg/common/client/ZFrame.java +++ b/common/client/src/main/java/zingg/common/client/ZFrame.java @@ -185,26 +185,5 @@ public interface ZFrame { public ZFrame transpose(String pivotColumn); - public ZFrame addAutoIncrementalRow(); - public void showVertical(); - - /*** - * return new ZFrame with new Column added as PIVOT used for transposing the matrix - * @param records - * @return header included zFrame - */ - default ZFrame getHeaderIncludedDataFrame(ZFrame records) { - ZFrame orderedRowAdded = records.addAutoIncrementalRow(); - - ZFrame firstRecord = orderedRowAdded.limit(1); - ZFrame secondRecord = orderedRowAdded.except(firstRecord).limit(1); - ZFrame thirdRecord = orderedRowAdded.except(firstRecord.union(secondRecord)); - - //return new ZFrame with Field column added to be used as pivot - return firstRecord.withColumn(PIVOT_COLUMN, VALUE_1). - union(secondRecord.withColumn(PIVOT_COLUMN, VALUE_2)). - union(thirdRecord.withColumn(PIVOT_COLUMN, ORDER)); - } - } diff --git a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java index ca546aa15..fd983bb6d 100644 --- a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java +++ b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java @@ -489,8 +489,7 @@ public ZFrame, Row, Column> transpose(String pivotColumn) { * @return ZFrame */ - @Override - public ZFrame, Row, Column> addAutoIncrementalRow() { + private ZFrame, Row, Column> addAutoIncrementalRow() { String[] columns = df.columns(); Dataset temporaryDF = df.limit(1); List monotonicIncreasing = new ArrayList<>(); @@ -508,9 +507,26 @@ public ZFrame, Row, Column> addAutoIncrementalRow() { public void showVertical() { ZFrame, Row, Column> headerIncludedFrame = getHeaderIncludedDataFrame(new SparkFrame(df)); ZFrame, Row, Column> vertical = headerIncludedFrame.transpose(PIVOT_COLUMN); - vertical.sortAscending(ORDER).drop(ORDER).show(); + vertical.sortAscending(ORDER).drop(ORDER).show(1000); } + /*** + * return new ZFrame with new Column added as PIVOT used for transposing the matrix + * @param records + * @return header included zFrame + */ + private ZFrame, Row, Column> getHeaderIncludedDataFrame(ZFrame, Row, Column> records) { + ZFrame, Row, Column> orderedRowAdded = addAutoIncrementalRow(); + + ZFrame, Row, Column> firstRecord = orderedRowAdded.limit(1); + ZFrame, Row, Column> secondRecord = orderedRowAdded.except(firstRecord).limit(1); + ZFrame, Row, Column> thirdRecord = orderedRowAdded.except(firstRecord.union(secondRecord)); + + //return new ZFrame with Field column added to be used as pivot + return firstRecord.withColumn(PIVOT_COLUMN, VALUE_1). + union(secondRecord.withColumn(PIVOT_COLUMN, VALUE_2)). + union(thirdRecord.withColumn(PIVOT_COLUMN, ORDER)); + } }