# [SPARK-50059][CONNECT][PYTHON] API compatibility check for Structured Streaming I/O

### What changes were proposed in this pull request?

This PR proposes to add an API compatibility check for the Spark SQL Structured Streaming I/O classes (`DataStreamReader` and `DataStreamWriter`).
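
For context, a check of this kind can be implemented reflectively by diffing the public surface of the classic and Connect classes. The sketch below is a minimal illustration of that idea, with a hypothetical `public_api` helper; it is not the actual `check_compatibility` utility used by the test suite:

```python
import inspect

def public_api(cls):
    """Split a class's public members into method and property name sets."""
    methods, properties = set(), set()
    for name, member in inspect.getmembers(cls):
        if name.startswith("_"):
            continue  # skip private and dunder members
        if isinstance(member, property):
            properties.add(name)
        elif callable(member):
            methods.add(name)
    return methods, properties
```

Comparing `public_api(ClassicDataStreamReader)` with `public_api(ConnectDataStreamReader)` then reduces the compatibility question to set differences that should be empty.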

### Why are the changes needed?

To guarantee the same behavior between Spark Classic and Spark Connect.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48593 from itholic/compat_ss_io.

Authored-by: Haejoon Lee <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
itholic authored and HyukjinKwon committed Oct 22, 2024
Parent: 61db251 · Commit: a376880
Showing 3 changed files with 40 additions and 4 deletions.
python/pyspark/sql/connect/streaming/readwriter.py (2 additions, 2 deletions)

```diff
@@ -652,7 +652,7 @@ def start(
         partitionBy: Optional[Union[str, List[str]]] = None,
         queryName: Optional[str] = None,
         **options: "OptionalPrimitiveType",
-    ) -> StreamingQuery:
+    ) -> "StreamingQuery":
         return self._start_internal(
             path=path,
             tableName=None,
@@ -673,7 +673,7 @@ def toTable(
         partitionBy: Optional[Union[str, List[str]]] = None,
         queryName: Optional[str] = None,
         **options: "OptionalPrimitiveType",
-    ) -> StreamingQuery:
+    ) -> "StreamingQuery":
         return self._start_internal(
             path=None,
             tableName=tableName,
```
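
The substantive change in both `readwriter.py` files is quoting the return annotation. A quoted annotation is a forward reference: it is stored as the string `"StreamingQuery"` instead of being resolved when the module loads, so no runtime import of the class is needed, and an `inspect`-based comparison plausibly sees the identical annotation text on both the classic and Connect side. A self-contained sketch of the pattern, where `WriterSketch` is a hypothetical class and not PySpark code:

```python
import inspect
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by static type checkers; never imported at runtime,
    # which avoids an import cycle between the writer and query modules.
    from pyspark.sql.streaming.query import StreamingQuery

class WriterSketch:
    def start(self) -> "StreamingQuery":
        # The annotation stays the literal string "StreamingQuery" until a
        # tool such as typing.get_type_hints() chooses to evaluate it.
        raise NotImplementedError

print(inspect.signature(WriterSketch.start).return_annotation)  # prints: StreamingQuery
```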

python/pyspark/sql/streaming/readwriter.py (2 additions, 2 deletions)

```diff
@@ -1622,7 +1622,7 @@ def start(
         partitionBy: Optional[Union[str, List[str]]] = None,
         queryName: Optional[str] = None,
         **options: "OptionalPrimitiveType",
-    ) -> StreamingQuery:
+    ) -> "StreamingQuery":
         """Streams the contents of the :class:`DataFrame` to a data source.
 
         The data source is specified by the ``format`` and a set of ``options``.
@@ -1710,7 +1710,7 @@ def toTable(
         partitionBy: Optional[Union[str, List[str]]] = None,
         queryName: Optional[str] = None,
         **options: "OptionalPrimitiveType",
-    ) -> StreamingQuery:
+    ) -> "StreamingQuery":
         """
         Starts the execution of the streaming query, which will continually output results to the
         given table as new data arrives.
```

python/pyspark/sql/tests/test_connect_compatibility.py (36 additions, 0 deletions)

```diff
@@ -36,6 +36,8 @@
 import pyspark.sql.protobuf.functions as ClassicProtobuf
 from pyspark.sql.streaming.query import StreamingQuery as ClassicStreamingQuery
 from pyspark.sql.streaming.query import StreamingQueryManager as ClassicStreamingQueryManager
+from pyspark.sql.streaming.readwriter import DataStreamReader as ClassicDataStreamReader
+from pyspark.sql.streaming.readwriter import DataStreamWriter as ClassicDataStreamWriter
 
 if should_test_connect:
     from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
@@ -55,6 +57,8 @@
     from pyspark.sql.connect.streaming.query import (
         StreamingQueryManager as ConnectStreamingQueryManager,
     )
+    from pyspark.sql.connect.streaming.readwriter import DataStreamReader as ConnectDataStreamReader
+    from pyspark.sql.connect.streaming.readwriter import DataStreamWriter as ConnectDataStreamWriter
 
 
 class ConnectCompatibilityTestsMixin:
@@ -461,6 +465,38 @@ def test_streaming_query_manager_compatibility(self):
             expected_missing_classic_methods,
         )
 
+    def test_streaming_reader_compatibility(self):
+        """Test Data Stream Reader compatibility between classic and connect."""
+        expected_missing_connect_properties = set()
+        expected_missing_classic_properties = set()
+        expected_missing_connect_methods = set()
+        expected_missing_classic_methods = set()
+        self.check_compatibility(
+            ClassicDataStreamReader,
+            ConnectDataStreamReader,
+            "DataStreamReader",
+            expected_missing_connect_properties,
+            expected_missing_classic_properties,
+            expected_missing_connect_methods,
+            expected_missing_classic_methods,
+        )
+
+    def test_streaming_writer_compatibility(self):
+        """Test Data Stream Writer compatibility between classic and connect."""
+        expected_missing_connect_properties = set()
+        expected_missing_classic_properties = set()
+        expected_missing_connect_methods = set()
+        expected_missing_classic_methods = set()
+        self.check_compatibility(
+            ClassicDataStreamWriter,
+            ConnectDataStreamWriter,
+            "DataStreamWriter",
+            expected_missing_connect_properties,
+            expected_missing_classic_properties,
+            expected_missing_connect_methods,
+            expected_missing_classic_methods,
+        )
+
 
 @unittest.skipIf(not should_test_connect, connect_requirement_message)
 class ConnectCompatibilityTests(ConnectCompatibilityTestsMixin, ReusedSQLTestCase):
```
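
Both new tests follow the suite's existing pattern: all four `expected_missing_*` sets are empty, which asserts exact API parity between the classic and Connect reader/writer. As a rough reconstruction of what a helper of this shape checks (hypothetical code, not the mixin's actual `check_compatibility`):

```python
import inspect

def check_compatibility_sketch(
    classic_cls, connect_cls, name,
    expected_missing_connect_methods=frozenset(),
    expected_missing_classic_methods=frozenset(),
):
    def public_methods(cls):
        # Public (non-underscore) callables defined on the class.
        return {n for n, m in inspect.getmembers(cls, callable) if not n.startswith("_")}

    classic, connect = public_methods(classic_cls), public_methods(connect_cls)
    # Any one-sided method must be explicitly expected; with empty expected
    # sets, the assertions demand exact method parity in both directions.
    assert classic - connect == set(expected_missing_connect_methods), name
    assert connect - classic == set(expected_missing_classic_methods), name
```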
