Skip to content

Commit

Permalink
Removing id generator (#594)
Browse files Browse the repository at this point in the history
Removing id generator
  • Loading branch information
ilongin authored Nov 29, 2024
1 parent 1d83f5c commit bf7d670
Show file tree
Hide file tree
Showing 31 changed files with 249 additions and 921 deletions.
5 changes: 0 additions & 5 deletions src/datachain/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@

if TYPE_CHECKING:
from datachain.data_storage import (
AbstractIDGenerator,
AbstractMetastore,
AbstractWarehouse,
)
Expand Down Expand Up @@ -522,7 +521,6 @@ def find_column_to_str( # noqa: PLR0911
class Catalog:
def __init__(
self,
id_generator: "AbstractIDGenerator",
metastore: "AbstractMetastore",
warehouse: "AbstractWarehouse",
cache_dir=None,
Expand All @@ -535,7 +533,6 @@ def __init__(
):
datachain_dir = DataChainDir(cache=cache_dir, tmp=tmp_dir)
datachain_dir.init()
self.id_generator = id_generator
self.metastore = metastore
self._warehouse = warehouse
self.cache = DataChainCache(datachain_dir.cache, datachain_dir.tmp)
Expand Down Expand Up @@ -569,7 +566,6 @@ def get_init_params(self) -> dict[str, Any]:
def copy(self, cache=True, db=True):
result = copy(self)
if not db:
result.id_generator = None
result.metastore = None
result._warehouse = None
result.warehouse = None
Expand Down Expand Up @@ -969,7 +965,6 @@ def cleanup_tables(self, names: Iterable[str]) -> None:
are cleaned up as soon as they are no longer needed.
"""
self.warehouse.cleanup_tables(names)
self.id_generator.delete_uris(names)

def create_dataset_from_sources(
self,
Expand Down
73 changes: 8 additions & 65 deletions src/datachain/catalog/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,16 @@

from datachain.catalog import Catalog
from datachain.data_storage import (
AbstractIDGenerator,
AbstractMetastore,
AbstractWarehouse,
)
from datachain.data_storage.serializer import deserialize
from datachain.data_storage.sqlite import (
SQLiteIDGenerator,
SQLiteMetastore,
SQLiteWarehouse,
)
from datachain.utils import get_envs_by_prefix

ID_GENERATOR_SERIALIZED = "DATACHAIN__ID_GENERATOR"
ID_GENERATOR_IMPORT_PATH = "DATACHAIN_ID_GENERATOR"
ID_GENERATOR_ARG_PREFIX = "DATACHAIN_ID_GENERATOR_ARG_"
METASTORE_SERIALIZED = "DATACHAIN__METASTORE"
METASTORE_IMPORT_PATH = "DATACHAIN_METASTORE"
METASTORE_ARG_PREFIX = "DATACHAIN_METASTORE_ARG_"
Expand All @@ -31,45 +26,7 @@
IN_MEMORY_ERROR_MESSAGE = "In-memory is only supported on SQLite"


def get_id_generator(in_memory: bool = False) -> "AbstractIDGenerator":
id_generator_serialized = os.environ.get(ID_GENERATOR_SERIALIZED)
if id_generator_serialized:
id_generator_obj = deserialize(id_generator_serialized)
if not isinstance(id_generator_obj, AbstractIDGenerator):
raise RuntimeError(
"Deserialized ID generator is not an instance of AbstractIDGenerator: "
f"{id_generator_obj}"
)
return id_generator_obj

id_generator_import_path = os.environ.get(ID_GENERATOR_IMPORT_PATH)
id_generator_arg_envs = get_envs_by_prefix(ID_GENERATOR_ARG_PREFIX)
# Convert env variable names to keyword argument names by lowercasing them
id_generator_args: dict[str, Any] = {
k.lower(): v for k, v in id_generator_arg_envs.items()
}

if not id_generator_import_path:
id_generator_args["in_memory"] = in_memory
return SQLiteIDGenerator(**id_generator_args)
if in_memory:
raise RuntimeError(IN_MEMORY_ERROR_MESSAGE)
# ID generator paths are specified as (for example):
# datachain.data_storage.SQLiteIDGenerator
if "." not in id_generator_import_path:
raise RuntimeError(
f"Invalid {ID_GENERATOR_IMPORT_PATH} import path:"
f"{id_generator_import_path}"
)
module_name, _, class_name = id_generator_import_path.rpartition(".")
id_generator = import_module(module_name)
id_generator_class = getattr(id_generator, class_name)
return id_generator_class(**id_generator_args)


def get_metastore(
id_generator: Optional["AbstractIDGenerator"], in_memory: bool = False
) -> "AbstractMetastore":
def get_metastore(in_memory: bool = False) -> "AbstractMetastore":
metastore_serialized = os.environ.get(METASTORE_SERIALIZED)
if metastore_serialized:
metastore_obj = deserialize(metastore_serialized)
Expand All @@ -80,9 +37,6 @@ def get_metastore(
)
return metastore_obj

if id_generator is None:
id_generator = get_id_generator()

metastore_import_path = os.environ.get(METASTORE_IMPORT_PATH)
metastore_arg_envs = get_envs_by_prefix(METASTORE_ARG_PREFIX)
# Convert env variable names to keyword argument names by lowercasing them
Expand All @@ -91,10 +45,8 @@ def get_metastore(
}

if not metastore_import_path:
if not isinstance(id_generator, SQLiteIDGenerator):
raise ValueError("SQLiteMetastore can only be used with SQLiteIDGenerator")
metastore_args["in_memory"] = in_memory
return SQLiteMetastore(id_generator, **metastore_args)
return SQLiteMetastore(**metastore_args)
if in_memory:
raise RuntimeError(IN_MEMORY_ERROR_MESSAGE)
# Metastore paths are specified as (for example):
Expand All @@ -106,12 +58,10 @@ def get_metastore(
module_name, _, class_name = metastore_import_path.rpartition(".")
metastore = import_module(module_name)
metastore_class = getattr(metastore, class_name)
return metastore_class(id_generator, **metastore_args)
return metastore_class(**metastore_args)


def get_warehouse(
id_generator: Optional["AbstractIDGenerator"], in_memory: bool = False
) -> "AbstractWarehouse":
def get_warehouse(in_memory: bool = False) -> "AbstractWarehouse":
warehouse_serialized = os.environ.get(WAREHOUSE_SERIALIZED)
if warehouse_serialized:
warehouse_obj = deserialize(warehouse_serialized)
Expand All @@ -122,9 +72,6 @@ def get_warehouse(
)
return warehouse_obj

if id_generator is None:
id_generator = get_id_generator()

warehouse_import_path = os.environ.get(WAREHOUSE_IMPORT_PATH)
warehouse_arg_envs = get_envs_by_prefix(WAREHOUSE_ARG_PREFIX)
# Convert env variable names to keyword argument names by lowercasing them
Expand All @@ -133,10 +80,8 @@ def get_warehouse(
}

if not warehouse_import_path:
if not isinstance(id_generator, SQLiteIDGenerator):
raise ValueError("SQLiteWarehouse can only be used with SQLiteIDGenerator")
warehouse_args["in_memory"] = in_memory
return SQLiteWarehouse(id_generator, **warehouse_args)
return SQLiteWarehouse(**warehouse_args)
if in_memory:
raise RuntimeError(IN_MEMORY_ERROR_MESSAGE)
# Warehouse paths are specified as (for example):
Expand All @@ -148,7 +93,7 @@ def get_warehouse(
module_name, _, class_name = warehouse_import_path.rpartition(".")
warehouse = import_module(module_name)
warehouse_class = getattr(warehouse, class_name)
return warehouse_class(id_generator, **warehouse_args)
return warehouse_class(**warehouse_args)


def get_distributed_class(**kwargs):
Expand Down Expand Up @@ -188,11 +133,9 @@ def get_catalog(
and name of variable after, e.g. if it accepts team_id as kwargs
we can provide DATACHAIN_METASTORE_ARG_TEAM_ID=12345 env variable.
"""
id_generator = get_id_generator(in_memory=in_memory)
return Catalog(
id_generator=id_generator,
metastore=get_metastore(id_generator, in_memory=in_memory),
warehouse=get_warehouse(id_generator, in_memory=in_memory),
metastore=get_metastore(in_memory=in_memory),
warehouse=get_warehouse(in_memory=in_memory),
client_config=client_config,
in_memory=in_memory,
)
3 changes: 0 additions & 3 deletions src/datachain/data_storage/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from .id_generator import AbstractDBIDGenerator, AbstractIDGenerator
from .job import JobQueryType, JobStatus
from .metastore import AbstractDBMetastore, AbstractMetastore
from .warehouse import AbstractWarehouse

__all__ = [
"AbstractDBIDGenerator",
"AbstractDBMetastore",
"AbstractIDGenerator",
"AbstractMetastore",
"AbstractWarehouse",
"JobQueryType",
Expand Down
136 changes: 0 additions & 136 deletions src/datachain/data_storage/id_generator.py

This file was deleted.

11 changes: 2 additions & 9 deletions src/datachain/data_storage/metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from sqlalchemy import Delete, Insert, Select, Update
from sqlalchemy.schema import SchemaItem

from datachain.data_storage import AbstractIDGenerator, schema
from datachain.data_storage import schema
from datachain.data_storage.db_engine import DatabaseEngine

logger = logging.getLogger("datachain")
Expand Down Expand Up @@ -304,16 +304,10 @@ class AbstractDBMetastore(AbstractMetastore):
DATASET_DEPENDENCY_TABLE = "datasets_dependencies"
JOBS_TABLE = "jobs"

id_generator: "AbstractIDGenerator"
db: "DatabaseEngine"

def __init__(
self,
id_generator: "AbstractIDGenerator",
uri: Optional[StorageURI] = None,
):
def __init__(self, uri: Optional[StorageURI] = None):
uri = uri or StorageURI("")
self.id_generator = id_generator
super().__init__(uri)

def close(self) -> None:
Expand All @@ -322,7 +316,6 @@ def close(self) -> None:

def cleanup_tables(self, temp_table_names: list[str]) -> None:
"""Cleanup temp tables."""
self.id_generator.delete_uris(temp_table_names)

@classmethod
def _datasets_columns(cls) -> list["SchemaItem"]:
Expand Down
Loading

0 comments on commit bf7d670

Please sign in to comment.