Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): avoid caching physical tables #9976

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 41 additions & 11 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -805,17 +805,22 @@ def _cached_table(self, table: ir.Table) -> ir.CachedTable:
Cached table
"""
entry = self._cache_op_to_entry.get(table.op())
if entry is None or (cached_op := entry.cached_op_ref()) is None:
cached_op = self._create_cached_table(util.gen_name("cached"), table).op()
entry = CacheEntry(
table.op(),
weakref.ref(cached_op),
weakref.finalize(
cached_op, self._finalize_cached_table, cached_op.name
),
)
self._cache_op_to_entry[table.op()] = entry
self._cache_name_to_entry[cached_op.name] = entry
if entry is not None and (cached_op := entry.cached_op_ref()) is not None:
# Entry was already cached
return ir.CachedTable(cached_op)

if not self._should_cache_table_expr(table):
# Expression isn't worth caching, no-op
return ir.CachedTable(table.op())

cached_op = self._create_cached_table(util.gen_name("cached"), table).op()
entry = CacheEntry(
table.op(),
weakref.ref(cached_op),
weakref.finalize(cached_op, self._finalize_cached_table, cached_op.name),
)
self._cache_op_to_entry[table.op()] = entry
self._cache_name_to_entry[cached_op.name] = entry
return ir.CachedTable(cached_op)

def _finalize_cached_table(self, name: str) -> None:
Expand All @@ -838,6 +843,31 @@ def _finalize_cached_table(self, name: str) -> None:
if not sys.is_finalizing():
raise

def _should_cache_table_expr(self, expr: ir.Table) -> bool:
"""Checks if a given table expression is worth caching."""
op = expr.op()

# Don't cache if an expression is a column subselection of a physical table.
while isinstance(op, (ops.Project, ops.DropColumns)):
if isinstance(op, ops.Project) and not all(
isinstance(v, ops.Field) for v in op.values.values()
):
Comment on lines +851 to +854
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of this code is unrelated to the backend and is about some property of an expression.

Can we define this as an attribute on ops.Relation subclasses? Seems like it'd be much less squishy that way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meaning the decision is consistent and only based on the operation type (no query in the backend needed)? That's option 2 listed above.

return True
op = op.parent

return not isinstance(
op, ops.PhysicalTable
) or self._should_cache_physical_table(op)

def _should_cache_physical_table(self, op: ops.PhysicalTable) -> bool:
"""Check whether a PhysicalTable node is worth caching.

By default we don't cache any PhysicalTable ops. Some backends need
to override this method to allow for caching of tables backed by
potentially expensive IO (e.g. a TEMP VIEW backed by data on S3).
"""
return False

def _create_cached_table(self, name: str, expr: ir.Table) -> ir.Table:
return self.create_table(name, expr, schema=expr.schema(), temp=True)

Expand Down
18 changes: 18 additions & 0 deletions ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,24 @@
}
)

def _should_cache_physical_table(self, op: ops.PhysicalTable) -> bool:
if isinstance(op, (ops.DatabaseTable, ops.UnboundTable)):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like a property we should encode on the operation itself instead of leaving it up to the backend.

Is that not possible for some reason?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I covered this in the PR body above. I agree that that would be one potentially nicer option (that's bullet 2 above).

# Cache temp views since they're used for `read_csv`/`read_parquet`
# and may point to remote data, don't cache anything else.
sql = (
sg.select(sg.func("any_value", C.table_type.eq("VIEW")))
.from_(sg.table("tables", db="information_schema"))
.where(
C.table_catalog.eq(op.namespace.catalog or self.current_catalog),
C.table_schema.eq(op.namespace.database or self.current_database),
)
.sql(self.dialect)
)
with self._safe_raw_sql(sql) as cur:
result = cur.fetchone()
return True if result is None else result[0]
return False

Check warning on line 1608 in ibis/backends/duckdb/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/duckdb/__init__.py#L1608

Added line #L1608 was not covered by tests

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
name = op.name
try:
Expand Down
19 changes: 19 additions & 0 deletions ibis/backends/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,25 @@
t.unpersist()
assert not t.is_cached

def _should_cache_physical_table(self, op: ops.PhysicalTable) -> bool:
if isinstance(op, (ops.DatabaseTable, ops.UnboundTable)):
# Cache temp views since they're used for `read_csv`/`read_parquet`
# and may point to remote data, don't cache anything else.
sql = (
f"SHOW VIEWS IN {op.namespace.database}"
if op.namespace.database
else "SHOW VIEWS"
)
with self._active_catalog(op.namespace.catalog):
for view in self._session.sql(sql).collect():
if view.viewName == op.name:
# already cached tables are also exposed as temp views,
# check the view isn't backed by the cache
return (
view.isTemporary and op.name not in self._cached_dataframes
)
return False

Check warning on line 741 in ibis/backends/pyspark/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/pyspark/__init__.py#L741

Added line #L741 was not covered by tests

def read_delta(
self,
path: str | Path,
Expand Down
3 changes: 2 additions & 1 deletion ibis/expr/types/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4900,7 +4900,8 @@ def __enter__(self):
def release(self):
"""Release the underlying expression from the cache."""
current_backend = self._find_backend(use_default=True)
return current_backend._finalize_cached_table(self.op().name)
if isinstance(op := self.op(), ops.PhysicalTable):
current_backend._finalize_cached_table(op.name)


public(Table=Table, CachedTable=CachedTable)