From 6c42221ecaf4841c6ae34305f371cadc87a75a10 Mon Sep 17 00:00:00 2001 From: Jim Crist-Harif Date: Fri, 30 Aug 2024 13:22:39 -0500 Subject: [PATCH] feat(api): avoid caching already concrete tables --- ibis/backends/__init__.py | 52 ++++++++++++++++++++++++------- ibis/backends/duckdb/__init__.py | 18 +++++++++++ ibis/backends/pyspark/__init__.py | 19 +++++++++++ ibis/expr/types/relations.py | 3 +- 4 files changed, 80 insertions(+), 12 deletions(-) diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py index 371affac47fc..14b35ff69364 100644 --- a/ibis/backends/__init__.py +++ b/ibis/backends/__init__.py @@ -805,17 +805,22 @@ def _cached_table(self, table: ir.Table) -> ir.CachedTable: Cached table """ entry = self._cache_op_to_entry.get(table.op()) - if entry is None or (cached_op := entry.cached_op_ref()) is None: - cached_op = self._create_cached_table(util.gen_name("cached"), table).op() - entry = CacheEntry( - table.op(), - weakref.ref(cached_op), - weakref.finalize( - cached_op, self._finalize_cached_table, cached_op.name - ), - ) - self._cache_op_to_entry[table.op()] = entry - self._cache_name_to_entry[cached_op.name] = entry + if entry is not None and (cached_op := entry.cached_op_ref()) is not None: + # Entry was already cached + return ir.CachedTable(cached_op) + + if not self._should_cache_table_expr(table): + # Expression isn't worth caching, no-op + return ir.CachedTable(table.op()) + + cached_op = self._create_cached_table(util.gen_name("cached"), table).op() + entry = CacheEntry( + table.op(), + weakref.ref(cached_op), + weakref.finalize(cached_op, self._finalize_cached_table, cached_op.name), + ) + self._cache_op_to_entry[table.op()] = entry + self._cache_name_to_entry[cached_op.name] = entry return ir.CachedTable(cached_op) def _finalize_cached_table(self, name: str) -> None: @@ -838,6 +843,31 @@ def _finalize_cached_table(self, name: str) -> None: if not sys.is_finalizing(): raise + def _should_cache_table_expr(self, expr: ir.Table) -> bool: + """Checks if a given table expression is worth caching.""" + op = expr.op() + + # Don't cache if an expression is a column subselection of a physical table. + while isinstance(op, (ops.Project, ops.DropColumns)): + if isinstance(op, ops.Project) and not all( + isinstance(v, ops.Field) for v in op.values.values() + ): + return True + op = op.parent + + return not isinstance( + op, ops.PhysicalTable + ) or self._should_cache_physical_table(op) + + def _should_cache_physical_table(self, op: ops.PhysicalTable) -> bool: + """Check whether a PhysicalTable node is worth caching. + + By default we don't cache any PhysicalTable ops. Some backends need + to override this method to allow for caching of tables backed by + potentially expensive IO (e.g. a TEMP VIEW backed by data on S3). + """ + return False + def _create_cached_table(self, name: str, expr: ir.Table) -> ir.Table: return self.create_table(name, expr, schema=expr.schema(), temp=True) diff --git a/ibis/backends/duckdb/__init__.py b/ibis/backends/duckdb/__init__.py index c55289e736e0..1e2dc5e9f26f 100644 --- a/ibis/backends/duckdb/__init__.py +++ b/ibis/backends/duckdb/__init__.py @@ -1589,6 +1589,24 @@ def _get_schema_using_query(self, query: str) -> sch.Schema: } ) + def _should_cache_physical_table(self, op: ops.PhysicalTable) -> bool: + if isinstance(op, (ops.DatabaseTable, ops.UnboundTable)): + # Cache temp views since they're used for `read_csv`/`read_parquet` + # and may point to remote data, don't cache anything else. + sql = ( + sg.select(sg.func("any_value", C.table_type.eq("VIEW"))) + .from_(sg.table("tables", db="information_schema")) + .where( + C.table_catalog.eq(op.namespace.catalog or self.current_catalog), + C.table_schema.eq(op.namespace.database or self.current_database), + ) + .sql(self.dialect) + ) + with self._safe_raw_sql(sql) as cur: + result = cur.fetchone() + return True if result is None else result[0] + return False + def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: name = op.name try: diff --git a/ibis/backends/pyspark/__init__.py b/ibis/backends/pyspark/__init__.py index 560406f23a4a..f3d36843808e 100644 --- a/ibis/backends/pyspark/__init__.py +++ b/ibis/backends/pyspark/__init__.py @@ -721,6 +721,25 @@ def _drop_cached_table(self, name): t.unpersist() assert not t.is_cached + def _should_cache_physical_table(self, op: ops.PhysicalTable) -> bool: + if isinstance(op, (ops.DatabaseTable, ops.UnboundTable)): + # Cache temp views since they're used for `read_csv`/`read_parquet` + # and may point to remote data, don't cache anything else. + sql = ( + f"SHOW VIEWS IN {op.namespace.database}" + if op.namespace.database + else "SHOW VIEWS" + ) + with self._active_catalog(op.namespace.catalog): + for view in self._session.sql(sql).collect(): + if view.viewName == op.name: + # already cached tables are also exposed as temp views, + # check the view isn't backed by the cache + return ( + view.isTemporary and op.name not in self._cached_dataframes + ) + return False + def read_delta( self, path: str | Path, diff --git a/ibis/expr/types/relations.py b/ibis/expr/types/relations.py index e4af609c2931..22c66534ab7b 100644 --- a/ibis/expr/types/relations.py +++ b/ibis/expr/types/relations.py @@ -4900,7 +4900,8 @@ def __enter__(self): def release(self): """Release the underlying expression from the cache.""" current_backend = self._find_backend(use_default=True) - return current_backend._finalize_cached_table(self.op().name) + if isinstance(op := self.op(), ops.PhysicalTable): + current_backend._finalize_cached_table(op.name) public(Table=Table, CachedTable=CachedTable)