
Add support for unique validation in PySpark #1396

Merged: 7 commits merged into unionai-oss:main on Oct 31, 2023

Conversation

filipeo2-mck (Contributor) commented Oct 26, 2023

Currently, the unique property in a Config class for a PySpark DataFrameModel does not work, because support for it was not included in the initial PySpark integration in release 0.16.0.

This PR adds support for it, along with test cases and a warning in the docs about the possible performance hit of using it.
The Column/Field-based unique capability remains untouched.

This PR is open for improvement suggestions.

Test code I
import pandera.pyspark as pa
from pyspark.sql import types as T
from pyspark.sql import SparkSession
from pprint import pprint as pp

class InputSchema(pa.DataFrameModel):
    year: T.IntegerType = pa.Field(gt=2000, coerce=True)
    month: T.IntegerType = pa.Field(ge=1, le=12, coerce=True)
    day: T.IntegerType = pa.Field(ge=0, le=365, coerce=True)

    class Config:
        unique = "year"  # More than one column can be set too: ["year", "month"]

data = [(2001, 3, 200), (2001, 6, 156), (2100, 12, 365), (2100, 12, 365)]
spark_schema = T.StructType(
    [
        T.StructField("year", T.IntegerType(), False),
        T.StructField("month", T.IntegerType(), False),
        T.StructField("day", T.IntegerType(), False),
    ],
)
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data=data, schema=spark_schema, verifySchema=False)

df_out = InputSchema.validate(df)
pp(df_out.pandera.errors)
Output

Considering the year column only, two rows have duplicated values:

defaultdict(<function ErrorHandler.__init__.<locals>.<lambda> at 0x17820a790>,  
            {'DATA': defaultdict(<class 'list'>,
                                 {'DUPLICATES': [{'check': 'unique',
                                                  'column': 'InputSchema',
                                                  'error': 'Duplicated rows '
                                                           '[2] were found for '
                                                           "columns ['year']",
                                                  'schema': 'InputSchema'}]})})
Test code II

If two columns are used:

    class Config:
        unique = ["year", "month"]
Output

Considering the year and month columns, just one row has duplicated values:

defaultdict(<function ErrorHandler.__init__.<locals>.<lambda> at 0x14c20a790>,  
            {'DATA': defaultdict(<class 'list'>,
                                 {'DUPLICATES': [{'check': 'unique',
                                                  'column': 'InputSchema',
                                                  'error': 'Duplicated rows '
                                                           '[1] were found for '
                                                           "columns ['year', "
                                                           "'month']",
                                                  'schema': 'InputSchema'}]})})

codecov bot commented Oct 26, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison: base (cf6b5e4) 93.92%, head (d6eea48) 94.08%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1396      +/-   ##
==========================================
+ Coverage   93.92%   94.08%   +0.16%     
==========================================
  Files          91       91              
  Lines        6787     6802      +15     
==========================================
+ Hits         6375     6400      +25     
+ Misses        412      402      -10     
Files                                      Coverage Δ
pandera/backends/pyspark/container.py      72.52% <100.00%> (+5.04%) ⬆️
pandera/backends/pyspark/decorators.py     100.00% <100.00%> (+4.00%) ⬆️

... and 2 files with indirect coverage changes


Comment on lines -420 to -440
def _coerce_df_dtype(obj: DataFrame) -> DataFrame:
    if schema.dtype is None:
        raise ValueError(
            "dtype argument is None. Must specify this argument "
            "to coerce dtype"
        )

    try:
        return schema.dtype.try_coerce(obj)
    except ParserError as exc:
        raise SchemaError(
            schema=schema,
            data=obj,
            message=(
                f"Error while coercing '{schema.name}' to type "
                f"{schema.dtype}: {exc}\n{exc.failure_cases}"
            ),
            failure_cases=exc.failure_cases,
            check=f"coerce_dtype('{schema.dtype}')",
        ) from exc

filipeo2-mck (PR author):

Unlike in the pandas namespace, this function was not being used inside the coercion functions, so it was removed.

@@ -89,17 +105,12 @@ def wrapper(self, *args, **kwargs):
             return func(self, *args, **kwargs)
         else:
             warnings.warn(
-                "Skipping Execution of function as parameters set to DATA_ONLY ",
+                f"Skipping execution of function {func.__name__} as validation depth is set to DATA_ONLY ",
filipeo2-mck (PR author):

Improving the warning messages.

@@ -53,7 +53,7 @@ def test_schema_only(self, spark, sample_spark_schema):
     CONFIG.validation_enabled = True
     CONFIG.validation_depth = ValidationDepth.SCHEMA_ONLY

-    pandra_schema = DataFrameSchema(
+    pandera_schema = DataFrameSchema(
filipeo2-mck (PR author):

Typo fix.

filipeo2-mck (PR author) commented Oct 26, 2023

btw, it looks like pyspark/* tests are not running by default:

[screenshot]

I believe this is the reason for the failing test coverage reports. All pyspark/* tests run fine when I force them to run locally:

[screenshot]

filipeo2-mck marked this pull request as ready for review on October 26, 2023 20:42
Comment on lines +83 to +84
@validate_scope(scope=ValidationScope.DATA)
def _data_checks(
filipeo2-mck (PR author):

A separate function was defined to run DATA-related validations when this validation depth is enabled, given that the existing one (_schema_checks) is designated for SCHEMA-related validations.
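(For context, a minimal sketch of how such a depth-gated dispatch can look. The ValidationDepth values and the _schema_checks/_data_checks names follow the PR discussion, but the function bodies are illustrative, not pandera's actual implementation.)

from enum import Enum

class ValidationDepth(Enum):
    SCHEMA_ONLY = "SCHEMA_ONLY"
    DATA_ONLY = "DATA_ONLY"
    SCHEMA_AND_DATA = "SCHEMA_AND_DATA"

def _schema_checks(check_obj, schema):
    """Metadata-level checks: column presence, dtypes, coercion."""
    return check_obj

def _data_checks(check_obj, schema):
    """Value-level checks: built-in checks and uniqueness."""
    return check_obj

def validate(check_obj, schema, depth: ValidationDepth):
    # run each group of checks only when the configured depth asks for it
    if depth in (ValidationDepth.SCHEMA_ONLY, ValidationDepth.SCHEMA_AND_DATA):
        check_obj = _schema_checks(check_obj, schema)
    if depth in (ValidationDepth.DATA_ONLY, ValidationDepth.SCHEMA_AND_DATA):
        check_obj = _data_checks(check_obj, schema)
    return check_obj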

Reviewer comment:

Great @filipeo2-mck! I like how we're catching the SchemaError raised by the unique method, but I think it's a bit weird that we're raising a SchemaError when there's potentially nothing wrong with the schema. @cosmicBboy, does it make sense to introduce a DataError in pandera.errors to make sure we distinguish between different types of errors, or is there any reason that we don't want it?

filipeo2-mck (PR author):

Semantically, what does the SchemaError object in pandera.errors stand for?

  • The segregation between the two available validation types (schema vs data validations)? If this is the meaning, we should probably add data-related DataError exceptions to this namespace, to be specific about the kind of issue being raised.
  • The Pandera objects or components at a macro level (a schema, some data - the df, the declared checks, the existing columns...)? If this is the meaning, I see no issue with calling it SchemaError.

cosmicBboy (Collaborator) commented Oct 30, 2023:

Semantically, SchemaError(s) stand for anything that's wrong with the data or metadata of a validated object. That includes metadata (column names, types, etc.) and data (the actual data values contained in the object).

I think for clarity we should rename ValidationScope.SCHEMA to ValidationScope.METADATA to clarify the difference in pandera (I understand that the term "schema" often refers to what I'm calling metadata here, i.e. columns and their types, but pandera takes a slightly broader view of what a schema is).

> but I think it's a bit weird that we're raising a SchemaError when there's potentially nothing wrong with the schema. @cosmicBboy, does it make sense to introduce a DataError in pandera.errors to make sure we distinguish between different types of errors, or is there any reason that we don't want it?

I'm not sure what the concern is here: if the unique method is the function that raises a SchemaError that's caught elsewhere in the validation pipeline, doesn't that mean, by definition, that there's something wrong with the data (under pandera's definition of a "schema")?

Comment on lines 92 to 94
for value in kwargs.values():
if isinstance(value, pyspark.sql.DataFrame):
return value
filipeo2-mck (PR author):

Code coverage does not reach this branch because we never pass check_obj as a kwarg; we pass it as a positional arg instead.

Reviewer comment:

  • Can you confirm whether there's any scenario where a DataFrame might be passed as a kwarg?
  • For our tests related to this decorator, are we ensuring that we pass the pyspark.sql.DataFrame both as a positional and as a keyword argument?
  • If we anticipate that the DataFrame will always be passed as a positional argument and never as a keyword argument, would it make sense to refactor the decorator to remove the kwargs check for simplicity?

filipeo2-mck (PR author):

I wrongly assumed that this decorator was also used in the pandas and other namespaces and wanted to keep the pattern, but checking now, there is no such thing in the other integrations.
I'm changing it to remove the kwargs capability, as it's not used anywhere. Thank you for the input.
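(A hedged sketch of the simplified decorator shape being discussed: the DataFrame is looked up among positional arguments only, since no call site passes it as a keyword. The _should_run helper and the wrapper body are illustrative, not the merged code.)

import functools
import pyspark.sql

def _should_run(scope) -> bool:
    """Placeholder for the validation-depth check (illustrative)."""
    return True

def validate_scope(scope):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if _should_run(scope):
                return func(self, *args, **kwargs)
            # validation skipped: return the DataFrame found among the
            # positional args so the pipeline can pass it through unchanged
            return next(
                (a for a in args if isinstance(a, pyspark.sql.DataFrame)),
                None,
            )
        return wrapper
    return decorator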

Collaborator:

The decorator was added to pyspark only. I hope eventually pandas will follow suit...

Signed-off-by: Filipe Oliveira <[email protected]>
"""Run the checks related to data validation and uniqueness."""

# uniqueness of values
check_obj = self.unique(

Reviewer comment:

Can't remember how this was implemented, but wouldn't it make sense to catch the error which unique would raise and append it to the ErrorHandler? A similar pattern to the SchemaError above?

Reviewer comment:

Essentially catching a DataError or similar. @NeerajMalhotra-QB

filipeo2-mck (PR author):

Yes, this is being done at a lower level inside this unique() method, through the helper function _check_uniqueness() that raises the exception:

[screenshot]

This approach was kept from the existing coerce_dtype() function:

[screenshot]

Collaborator:

I agree.

filipeo2-mck (PR author) commented Oct 27, 2023:

Oh, I understand the issue now: it's about the use of a SchemaError exception in data validation, not the point where it's raised?

filipeo2-mck (PR author):

I flattened the unique validation to contain only one function instead of two. The error collection is now being done at the root level, in container.py:

[screenshot]
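(A hedged sketch of the flattened flow described here: a single step counts duplicate rows for the configured column subset and raises a SchemaError for the container-level error handler to collect. The function name and duplicate-counting logic are illustrative, not necessarily the merged implementation; the SchemaError keywords mirror the coerce_dtype excerpt above.)

from pyspark.sql import DataFrame
from pandera.errors import SchemaError

def check_unique(df: DataFrame, subset: list, schema) -> DataFrame:
    # rows beyond the first occurrence of each `subset` combination
    duplicates = df.count() - df.dropDuplicates(subset).count()
    if duplicates:
        raise SchemaError(
            schema=schema,
            data=df,
            message=(
                f"Duplicated rows [{duplicates}] were found "
                f"for columns {subset}"
            ),
        )
    return df

With the data from the PR description, this yields 2 duplicates for ["year"] and 1 for ["year", "month"], matching the counts in the reported errors.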

kasperjanehag left a comment:

Looks great! I added a bunch of comments.

Signed-off-by: Filipe Oliveira <[email protected]>

NeerajMalhotra-QB (Collaborator):

LGTM, but I am wondering if it could have been more intuitive for the user to define uniqueness as a check in Field() rather than in the config section?

cosmicBboy (Collaborator):

@NeerajMalhotra-QB this is just to be consistent with the unique option at the dataframe level: https://pandera.readthedocs.io/en/stable/reference/generated/pandera.api.pandas.model_config.BaseConfig.html#pandera.api.pandas.model_config.BaseConfig

These options are currently exposed in the Config class in the DataFrameModel.

Would eventually like to support a syntax like this:

class Model(DataFrameModel, unique=[...], **kwargs):
    ...

But for now this is how it is.

filipeo2-mck (PR author) commented Oct 27, 2023:

> LGTM, but I am wondering if it could have been more intuitive for the user to define uniqueness as a check in Field() rather than in the config section?

Both approaches are valid from a usage perspective. I didn't implement it because:

  • Outside the documentation generated automatically from code, the written documentation covers the DataFrameSchema's unique property, so I thought it would be the first approach to take.
  • The Config class for DataFrameModel offers this as a model-wide option.
  • From what I remember from debugging, the unique property of columns was not present. I can check it again and add more info later.

EDIT: I'll add this distinction of what is being supported to the PR cover.

NeerajMalhotra-QB (Collaborator):

> @NeerajMalhotra-QB this is just to be consistent with the unique option at the dataframe level: https://pandera.readthedocs.io/en/stable/reference/generated/pandera.api.pandas.model_config.BaseConfig.html#pandera.api.pandas.model_config.BaseConfig
>
> These options are currently exposed in the Config class in the DataFrameModel.
>
> Would eventually like to support a syntax like this:
>
>     class Model(DataFrameModel, unique=[...], **kwargs):
>         ...
>
> But for now this is how it is.

Sounds good to me. Maybe in the future we can enhance Field to include a unique check too, but for now I agree this is the way it is done. :)

NeerajMalhotra-QB added the enhancement (New feature or request) label on Oct 27, 2023
cosmicBboy (Collaborator):

Field(unique=...) is supported in the pandas schemas, not sure about pyspark tho: https://pandera.readthedocs.io/en/stable/reference/generated/pandera.api.pandas.model_components.Field.html#pandera.api.pandas.model_components.Field

The unique option at the dataframe-level is for specifying a list of strings for columns that should be jointly unique... @filipeo2-mck I haven't reviewed the code yet, but does this PR conform to that behavior?

filipeo2-mck (PR author):

> Field(unique=...) is supported in the pandas schemas, not sure about pyspark tho: https://pandera.readthedocs.io/en/stable/reference/generated/pandera.api.pandas.model_components.Field.html#pandera.api.pandas.model_components.Field
>
> The unique option at the dataframe-level is for specifying a list of strings for columns that should be jointly unique... @filipeo2-mck I haven't reviewed the code yet, but does this PR conform to that behavior?

Exactly. The uniqueness of values will be tested for the columns listed in the unique property, in conjunction. I believe the example in the PR cover is explicit enough to show the behavior; if not (or if you find any problem with it), just let me know.

filipeo2-mck (PR author):

Update: added a missing condition/test to check whether the unique configuration consists of valid dataframe columns. If not, a SchemaDefinitionError is raised.
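(A hedged sketch of that guard: verify every column named in the unique config exists in the DataFrame before checking uniqueness. SchemaDefinitionError comes from pandera.errors; the function itself and its message are illustrative.)

from pandera.errors import SchemaDefinitionError

def ensure_unique_columns_exist(df, unique_config):
    # the config accepts a single column name or a list of names
    unique_cols = (
        [unique_config] if isinstance(unique_config, str) else list(unique_config)
    )
    missing = [col for col in unique_cols if col not in df.columns]
    if missing:
        raise SchemaDefinitionError(
            f"Columns {missing} in the 'unique' config are not present "
            "in the dataframe"
        )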

filipeo2-mck (PR author) commented Oct 27, 2023:

About unique support for individual columns (as opposed to the Model/Schema level): inside pandera/backends/pyspark/container.py there is no information about unique for the columns.

Setting a unique property:

[screenshot]

Information available for a column:

[screenshot]

Maybe this can be checked in another PR :P

NeerajMalhotra-QB (Collaborator) left a comment:

LGTM!!!

cosmicBboy (Collaborator):

Thank you @filipeo2-mck !

cosmicBboy merged commit de0ec5f into unionai-oss:main on Oct 31, 2023
55 checks passed
Labels: enhancement (New feature or request)

4 participants