Wrong result when .with_row_index() is used with .collect(streaming=True) #20694

Closed
2 tasks done
jhirsch-mhp opened this issue Jan 13, 2025 · 3 comments
Labels
A-streaming Related to the streaming engine bug Something isn't working P-low Priority: low python Related to Python Polars

Comments

@jhirsch-mhp

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import polars as pl
print(f"polars version: {pl.__version__}")

# define simple Dataframe
d = pl.DataFrame({"a": range(4)})
print(d)

# Make lazy
d = d.lazy()

# Split d into two parts and concat again after applying `with_row_index()` on one
with_row_idx = pl.concat(
    [
        d.filter(pl.col("a") < 2).with_row_index("row_id").drop("row_id"),
        d.filter(pl.col("a") >= 2),
    ],
    how="vertical",
)

# Same without `with_row_index`
without_row_idx = pl.concat(
    [
        d.filter(pl.col("a") < 2),
        d.filter(pl.col("a") >= 2),
    ],
    how="vertical",
)

# Show Plans
print("\nWith row idx:\n", with_row_idx.explain())

print("\nWithout row idx:\n", without_row_idx.explain())

# See different results for Streaming
print(f"\nWith Row_id, Streaming=False: {with_row_idx.collect(streaming=False)}")
print(f"With Row_id, Streaming=True: {with_row_idx.collect(streaming=True)}")  # <--- This result is wrong

print(f"\nWithout Row_id, Streaming=False: {without_row_idx.collect(streaming=False)}")
print(f"Without Row_id, Streaming=True: {without_row_idx.collect(streaming=True)}")

Log output

polars version: 1.19.0
shape: (4, 1)
┌─────┐
│ a   │
│ --- │
│ i64 │
╞═════╡
│ 0   │
│ 1   │
│ 2   │
│ 3   │
└─────┘

With row idx:
 UNION
  PLAN 0:
    simple π 1/2 ["a"]
      ROW_INDEX
        FILTER [(col("a")) < (2)] FROM
          DF ["a"]; PROJECT */1 COLUMNS
  PLAN 1:
    FILTER [(col("a")) >= (2)] FROM
      DF ["a"]; PROJECT */1 COLUMNS
END UNION

Without row idx:
 UNION
  PLAN 0:
    FILTER [(col("a")) < (2)] FROM
      DF ["a"]; PROJECT */1 COLUMNS
  PLAN 1:
    FILTER [(col("a")) >= (2)] FROM
      DF ["a"]; PROJECT */1 COLUMNS
END UNION

With Row_id, Streaming=False: shape: (4, 1)
┌─────┐
│ a   │
│ --- │
│ i64 │
╞═════╡
│ 0   │
│ 1   │
│ 2   │
│ 3   │
└─────┘
With Row_id, Streaming=True: shape: (2, 1)
┌─────┐
│ a   │
│ --- │
│ i64 │
╞═════╡
│ 2   │
│ 3   │
└─────┘

Without Row_id, Streaming=False: shape: (4, 1)
┌─────┐
│ a   │
│ --- │
│ i64 │
╞═════╡
│ 0   │
│ 1   │
│ 2   │
│ 3   │
└─────┘
Without Row_id, Streaming=True: shape: (4, 1)
┌─────┐
│ a   │
│ --- │
│ i64 │
╞═════╡
│ 0   │
│ 1   │
│ 2   │
│ 3   │
└─────┘

Issue description

When .with_row_index() is applied to a subset of a LazyFrame, concat() yields wrong results with streaming=True: the part of the data where the index was applied is missing from the concatenated output.
With streaming=False everything works as expected.

Also, when the .with_row_index("row_id").drop("row_id") part is replaced with .with_columns(pl.lit([9,9]).alias('row_id')).drop("row_id"), everything works fine. So I guess it is really related to the with_row_index() function.

Expected behavior

There should be no difference between the results of streaming=True and streaming=False.
At least, I did not find anything in the docs that says otherwise.

Installed versions

--------Version info---------
Polars:      1.19.0
Index type:  UInt32
Platform:    Windows-10-10.0.19045-SP0
Python:      3.10.10 (tags/v3.10.10:aad5f6a, Feb 7 2023, 17:20:36) [MSC v.1929 64 bit (AMD64)]
LTS CPU:     False

----Optional dependencies----
adbc_driver_manager
altair
azure.identity
boto3 1.35.7
cloudpickle 3.0.0
connectorx
deltalake
fastexcel 0.11.6
fsspec 2023.12.2
gevent
google.auth 2.34.0
great_tables
matplotlib 3.9.2
nest_asyncio 1.6.0
numpy 1.24.4
openpyxl 3.1.5
pandas 2.2.2
pyarrow 13.0.0
pydantic
pyiceberg
sqlalchemy 2.0.32
torch 2.4.0+cpu
xlsx2csv 0.8.3
xlsxwriter 3.2.0

@jhirsch-mhp jhirsch-mhp added bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars labels Jan 13, 2025
@jhirsch-mhp jhirsch-mhp changed the title from "Wrong result when .with_row_index() is used in with .collect(streaming=True)" to "Wrong result when .with_row_index() is used with .collect(streaming=True)" Jan 13, 2025
@alexander-beedie alexander-beedie added the A-streaming Related to the streaming engine label Jan 14, 2025
@orlp
Collaborator

orlp commented Jan 14, 2025

This works correctly in the new streaming engine; I'm not sure it's worth the time to fix this in the old streaming engine if it will be replaced soon.

@orlp orlp added the P-low Priority: low label Jan 15, 2025
@orlp orlp removed the needs triage Awaiting prioritization by a maintainer label Jan 15, 2025
@github-project-automation github-project-automation bot moved this to Ready in Backlog Jan 15, 2025
@jhirsch-mhp
Author

I just found out that this is not exclusive to the .with_row_index() function. It seems to be a more general problem that also occurs in other cases.

See the following:

import polars as pl

print(f"polars version: {pl.__version__}")

# define simple Dataframe
d = pl.DataFrame({"a": range(4)}, schema={"a": pl.Float64})
print(d)

# Make lazy
d = d.lazy()

# Split the data
d1 = d.filter(pl.col("a") >= 2)
d2 = d.filter(pl.col("a") < 2)

# Process d2: alias column "a" as "index", then aggregate it
d2 = (d2
    .with_columns(pl.col("a").alias("index"))
    .with_columns(pl.mean("index"))  #  <- The problem here is not limited to the `mean` function! 
)

# Make d1 fit d2
d1 = d1.with_columns(pl.lit(0, dtype=pl.Float64).alias("index"))

# combine 
combined = pl.concat(
    [d2, d1],
    how="vertical",
)

# Show Plans
print("\nWith row idx:\n", combined.explain())

# See different results for Streaming
print(f"\nStreaming=False: {combined.collect(streaming=False)}")
print(f"Streaming=True: {combined.collect(streaming=True)}")  # <--- This result is wrong

Results in the following output:

polars version: 1.20.0
shape: (4, 1)
┌─────┐
│ a   │
│ --- │
│ f64 │
╞═════╡
│ 0.0 │
│ 1.0 │
│ 2.0 │
│ 3.0 │
└─────┘

With row idx:
 UNION
  PLAN 0:
     WITH_COLUMNS:
     [col("index").mean()] 
       WITH_COLUMNS:
       [col("a").alias("index")] 
        FILTER [(col("a")) < (2.0)] FROM
          DF ["a"]; PROJECT */1 COLUMNS
  PLAN 1:
     WITH_COLUMNS:
     [0.0.alias("index")] 
      FILTER [(col("a")) >= (2.0)] FROM
        DF ["a"]; PROJECT */1 COLUMNS
END UNION

Streaming=False: shape: (4, 2)
┌─────┬───────┐
│ a   ┆ index │
│ --- ┆ ---   │
│ f64 ┆ f64   │
╞═════╪═══════╡
│ 0.0 ┆ 0.5   │
│ 1.0 ┆ 0.5   │
│ 2.0 ┆ 0.0   │
│ 3.0 ┆ 0.0   │
└─────┴───────┘
Streaming=True: shape: (2, 2)
┌─────┬───────┐
│ a   ┆ index │
│ --- ┆ ---   │
│ f64 ┆ f64   │
╞═════╪═══════╡
│ 2.0 ┆ 0.0   │
│ 3.0 ┆ 0.0   │
└─────┴───────┘

This is clearly wrong, and at the moment I cannot use or trust the streaming=True option.
When is the new streaming engine planned to be released?

@jhirsch-mhp
Author

This problem is even more general than described above. I opened another issue, #20833, and am closing this one.

@github-project-automation github-project-automation bot moved this from Ready to Done in Backlog Jan 21, 2025
3 participants