
Pyspark unique check doesn't return error

Open mdenushev opened this issue 1 year ago • 9 comments

Describe the bug

Trying to check the uniqueness of a field, but no errors are returned.

  • [x] I have checked that this issue has not already been reported.
  • [x] I have confirmed this bug exists on the latest version of pandera.
  • [x] (optional) I have confirmed this bug exists on the master branch of pandera.


Code Sample, a copy-pastable example

from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

import pandera.pyspark as pa
import pyspark.sql.types as T
from pandera.pyspark import DataFrameModel

class Sample(DataFrameModel):
    id: T.StringType() = pa.Field(unique=True)
    
spark_schema = T.StructType([T.StructField('id', T.StringType(), False)])

from pyspark.sql import Row
data = [
    Row(id="1"),
    Row(id="1"),
    Row(id="2"),
]
sample_df = spark.createDataFrame(data, spark_schema)
res = Sample.to_schema().validate(sample_df)
print(res.pandera.errors)
# Output: {}

Expected behavior

A uniqueness error should be returned. The equivalent pandas uniqueness check works fine.
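
For comparison, here is a minimal pandas-backed sketch of the same schema that does flag the duplicate (the class and data are illustrative, not taken from the report above):

import pandas as pd
import pandera as pa
from pandera.typing import Series

class SamplePandas(pa.DataFrameModel):
    id: Series[str] = pa.Field(unique=True)

try:
    # same values as the pyspark repro: "1" appears twice
    SamplePandas.validate(pd.DataFrame({"id": ["1", "1", "2"]}))
except pa.errors.SchemaError as exc:
    print(exc)  # reports the duplicated values in column 'id'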

Desktop (please complete the following information):

  • Executed in Jupyter Notebook
  • pandera==0.16.1 pyspark==3.4.1
  • python==3.9

mdenushev · Sep 21 '23

@NeerajMalhotra-QB @jaskaransinghsidana the unique=True core check was never implemented for pyspark, right? I don't see it here:

https://github.com/unionai-oss/pandera/blob/main/pandera/backends/pyspark/column.py#L38-L42

Would this be as simple as:

df.select(schema.name).distinct().count() == df.select(schema.name).count()

If not, we should probably raise a SchemaInitError to say that it isn't currently supported.
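
For reference, a standalone sketch of that check (the function name and signature are illustrative, not the actual backend hook):

import pyspark.sql.functions as F

def column_is_unique(df, column_name: str) -> bool:
    """Return True if no value in `column_name` occurs more than once."""
    duplicates = (
        df.groupBy(column_name)
        .count()  # adds a "count" column per distinct value
        .filter(F.col("count") > 1)
    )
    return duplicates.limit(1).count() == 0

A groupBy/count like this scans the data once, whereas comparing distinct().count() against count() triggers two separate jobs.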

cosmicBboy · Sep 23 '23

Yeah, the implementation is simple. Users need to be careful about using it in production, though. By default it should be disabled and only run when absolutely needed.

NeerajMalhotra-QB · Sep 23 '23

By default it should be disabled and only run when absolutely needed.

Cool. unique=False by default, so opting in with unique=True makes sense.

cosmicBboy · Sep 23 '23

It would be great to at least have this in the documentation. It breaks trust with the user to not have documentation or a warning on this. One thing to call out is that you can use the Config class to do it. Sample code:

import pandera.pyspark as pa
import pyspark.sql.types as T

from decimal import Decimal
from pyspark.sql import SparkSession
from pandera.pyspark import DataFrameModel
from unittest import TestCase

spark = SparkSession.builder.getOrCreate()

class TestPanderaSpark(TestCase):
    def test_unique(self):
        class PanderaSchema(DataFrameModel):
            id: T.IntegerType() = pa.Field(gt=5)
            product_name: T.StringType() = pa.Field(str_startswith="B")
            price: T.DecimalType(20, 5) = pa.Field()
            description: T.ArrayType(T.StringType()) = pa.Field()
            meta: T.MapType(T.StringType(), T.StringType()) = pa.Field()

            class Config:
                """Config of pandera class"""

                unique = "id"


        data = [
            (
                6,
                "Bread",
                Decimal(44.4),
                ["description of product"],
                {"product_category": "dairy"},
            ),
            (
                15,
                "Butter",
                Decimal(99.0),
                ["more details here"],
                {"product_category": "bakery"},
            ),
            (
                15,
                "Buzz",
                Decimal(99.0),
                ["more details here"],
                {"product_category": "bakery"},
            ),
        ]

        spark_schema = T.StructType(
            [
                T.StructField("id", T.IntegerType(), False),
                T.StructField("product", T.StringType(), False),
                T.StructField("price", T.DecimalType(20, 5), False),
                T.StructField("description", T.ArrayType(T.StringType(), False), False),
                T.StructField("meta", T.MapType(T.StringType(), T.StringType(), False), False),
            ],
        )
        df = spark.createDataFrame(data, spark_schema)
        df_out = PanderaSchema.validate(check_obj=df)
        self.assertTrue(len(df_out.pandera.errors["DATA"]["DUPLICATES"]) == 1)

zippeurfou · Apr 18 '24

Happy to review a PR that updates the documentation and adds a warning/error when unique is specified, @zippeurfou.

It breaks trust with the user to not have documentation or a warning on this.

Would it make sense to raise a SchemaInitError here instead? It seems like, while this isn't implemented, it should fail fast. We can basically raise an error so that the behavior is clearer.
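
For illustration, the fail-fast guard could look roughly like this (the function and its call site are hypothetical; SchemaInitError itself is pandera's real exception class):

from pandera.errors import SchemaInitError

def reject_unsupported_unique(column_schema):
    # Hypothetical guard: until the pyspark backend implements the
    # unique check, fail at schema-definition time instead of silently
    # passing validation.
    if getattr(column_schema, "unique", False):
        raise SchemaInitError(
            f"unique=True is not supported by the pyspark backend for "
            f"column '{column_schema.name}'; use the DataFrameModel "
            "Config-level `unique` option instead."
        )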

cosmicBboy · Apr 19 '24

Thanks @cosmicBboy, yes it makes sense. I will see if I can raise a PR for this.

zippeurfou · Apr 19 '24

Pinging this issue again in case anyone has the capacity to make a PR for it. Basically, the PR just needs to implement the solution described here, with unit tests.

cosmicBboy · May 07 '24

What about ensuring uniqueness values over a composite primary key of a table, for example?

I understand that all three id_* columns below should be taken into account when applying the unique check:

        class PanderaSchema(DataFrameModel):
            id_1: T.IntegerType() = pa.Field(unique=True)  # composite primary key
            id_2: T.IntegerType() = pa.Field(unique=True)  # composite primary key
            id_3: T.IntegerType() = pa.Field(unique=True)  # composite primary key
            product_name: T.StringType() = pa.Field(str_startswith="B")
            price: T.DecimalType(20, 5) = pa.Field()
            description: T.ArrayType(T.StringType()) = pa.Field()
            meta: T.MapType(T.StringType(), T.StringType()) = pa.Field()

The check couldn't be done at the column level only; it would have to happen at the DataFrameModel/schema level instead.

The logic of the unique check could be something like this:

import pyspark.sql.functions as f

df_non_unique = df.groupBy(
    [column for column in df.columns if column.<some_internal_unique_attribute>]
).agg(
    f.count("*").alias("count")
).filter(f.col("count") > 1)

if df_non_unique.count() > 0:
    raise Exception("The fields with unique check are not jointly unique")
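
Outside pandera internals, the same joint-uniqueness test can be written against an explicit column list (a runnable sketch; `unique_columns` is passed in by the caller rather than discovered from the schema):

import pyspark.sql.functions as f

def assert_jointly_unique(df, unique_columns):
    """Raise if the combination of `unique_columns` contains duplicate rows."""
    dup_count = (
        df.groupBy(unique_columns)
        .count()
        .filter(f.col("count") > 1)
        .count()
    )
    if dup_count > 0:
        raise ValueError(
            f"Columns {unique_columns} are not jointly unique: "
            f"{dup_count} duplicated key combination(s)"
        )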

WDYT, @NeerajMalhotra-QB, @zippeurfou?

Edit: The above behavior can be achieved with the Config class and it behaves correctly:

class PanderaSchema(DataFrameModel):
    id: T.IntegerType() = pa.Field()
    product_name: T.StringType() = pa.Field()
    price: T.DecimalType(20, 5) = pa.Field()
    description: T.ArrayType(T.StringType()) = pa.Field()
    meta: T.MapType(T.StringType(), T.StringType()) = pa.Field()

    class Config:
        unique = ["id", "product_name", "price"]

It's implemented here: [screenshot of the pandera source in the original comment]

filipeo2-mck · Jun 21 '24

Related: #1285

filipeo2-mck · Jun 21 '24