From e4a52e76b195d3c09fb7575b6de85bae19cec187 Mon Sep 17 00:00:00 2001 From: joocer Date: Tue, 4 Feb 2025 22:15:52 +0000 Subject: [PATCH] 0.0.182 --- Makefile | 9 ++-- orso/compute/compiled.pyx | 92 +++++++++++++++++++++++++--------- orso/compute/varchar_array.pyx | 2 +- orso/dataframe.py | 6 +-- orso/version.py | 2 +- tests/__init__.py | 2 +- tests/test_compiled.py | 62 +++++++++++++++++++++-- tests/test_display.py | 2 +- tests/test_field_extractor.py | 56 ++++++++++++++++++++- 9 files changed, 193 insertions(+), 40 deletions(-) diff --git a/Makefile b/Makefile index 7c93d60..c22a4c1 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,17 @@ lint: - python -m pip install --quiet --upgrade pycln isort ruff yamllint + python -m pip install --upgrade pip uv + python -m uv pip install --quiet --upgrade pycln isort ruff yamllint cython-lint # python -m yamllint . +# cython-lint orso/compute/*.pyx python -m ruff check --fix --exit-zero python -m pycln . python -m isort . python -m ruff format orso update: - python -m pip install --quiet --upgrade -r requirements.txt - python -m pip install --quiet --upgrade -r tests/requirements.txt + python -m pip install --upgrade pip uv + python -m uv pip install --upgrade -r tests/requirements.txt + python -m uv pip install --upgrade -r requirements.txt test: python -m pip install --quiet --upgrade pytest coverage diff --git a/orso/compute/compiled.pyx b/orso/compute/compiled.pyx index cf4b36f..3b3a6dc 100644 --- a/orso/compute/compiled.pyx +++ b/orso/compute/compiled.pyx @@ -25,13 +25,11 @@ from cython cimport int from datetime import datetime from ormsgpack import unpackb from orso.exceptions import DataError -from typing import Dict, Any, Tuple -from libc.stdlib cimport malloc, free import numpy as np -cimport cython cimport numpy as cnp from numpy cimport ndarray -from libc.stdint cimport int32_t +from libc.stdint cimport int32_t, int64_t +from cpython.dict cimport PyDict_GetItem cnp.import_array() @@ -47,12 +45,12 @@ cpdef from_bytes_cython(bytes data): # Validate header and size, now using pointer arithmetic if length < HEADER_SIZE or (data_ptr[0] & 0xF0 != 0x10): raise DataError("Data malformed") - + # Deserialize record bytes cdef Py_ssize_t record_size = ( (data_ptr[2]) << 24 | (data_ptr[3]) << 16 | - (data_ptr[4]) << 8 | + (data_ptr[4]) << 8 | (data_ptr[5]) ) @@ -64,7 +62,6 @@ cpdef from_bytes_cython(bytes data): cdef list processed_list = [] cdef object item - for item in raw_tuple: if isinstance(item, list) and len(item) == 2 and item[0] == "__datetime__": processed_list.append(datetime.fromtimestamp(item[1])) @@ -73,47 +70,74 @@ cpdef from_bytes_cython(bytes data): return tuple(processed_list) - cpdef tuple extract_dict_columns(dict data, tuple fields): - cdef int i - cdef str field - cdef list field_data = [None] * len(fields) # Preallocate list size + """ + Extracts the given fields from a dictionary and returns them as a tuple. + + Parameters: + data: dict + The dictionary to extract fields from. + fields: tuple + The field names to extract. + + Returns: + A tuple containing values from the dictionary for the requested fields. + Missing fields will have None. + """ + cdef int64_t i, num_fields = len(fields) + cdef void* value_ptr + cdef list field_data = [None] * num_fields + + for i in range(num_fields): + value_ptr = PyDict_GetItem(data, fields[i]) + if value_ptr != NULL: + field_data[i] = value_ptr + else: + field_data[i] = None - for i, field in enumerate(fields): - if field in data: - field_data[i] = data[field] return tuple(field_data) # Convert list to tuple -cpdef cnp.ndarray collect_cython(list rows, cnp.ndarray[cnp.int32_t, ndim=1] columns, int limit=-1): +cpdef cnp.ndarray collect_cython(list rows, int32_t[:] columns, int limit=-1): """ Collects columns from a list of tuples (rows). """ - cdef int32_t i, j, col_idx - cdef int32_t num_rows = len(rows) - cdef int32_t num_cols = columns.shape[0] - cdef cnp.ndarray row + cdef int64_t i, j, col_idx + cdef int64_t num_rows = len(rows) + cdef int64_t num_cols = columns.shape[0] + cdef int64_t row_width = len(rows[0]) if num_rows > 0 else 0 + # Check if limit is set and within bounds if limit >= 0 and limit < num_rows: num_rows = limit + # Check if there are any rows or columns and exit early + if num_rows == 0 or num_cols == 0: + return np.empty((num_cols, num_rows), dtype=object) + + # Check if columns are within bounds + for j in range(num_cols): + col_idx = columns[j] + if col_idx < 0 or col_idx > row_width: + raise IndexError(f"Column index out of bounds (0 < {col_idx} < {row_width})") + # Initialize result memory view with pre-allocated numpy arrays for each column - cdef cnp.ndarray result = np.empty((num_cols, num_rows), dtype=object) + cdef object[:, :] result = np.empty((num_cols, num_rows), dtype=object) # Populate each column one at a time for j in range(num_cols): col_idx = columns[j] for i in range(num_rows): result[j, i] = rows[i][col_idx] - + # Convert each column back to a list and return the list of lists - return result + return np.asarray(result) cpdef int calculate_data_width(cnp.ndarray column_values): cdef int width, max_width cdef object value - + max_width = 4 # Default width for value in column_values: if value is not None: @@ -124,11 +148,29 @@ cpdef int calculate_data_width(cnp.ndarray column_values): return max_width +from cpython.list cimport PyList_New, PyList_SET_ITEM + def process_table(table, row_factory, int max_chunksize) -> list: - cdef list rows = [] + """ + Processes a PyArrow table and applies a row factory function to each row. + + Parameters: + table: PyArrow Table + The input table to process. + row_factory: function + A function applied to each row. + max_chunksize: int + The batch size to process at a time. + + Returns: + A list of transformed rows. + """ + cdef list rows = [None] * table.num_rows + cdef int64_t i = 0 for batch in table.to_batches(max_chunksize): df = batch.to_pandas().replace({np.nan: None}) for row in df.itertuples(index=False, name=None): - rows.append(row_factory(row)) + rows[i] = row_factory(row) + i += 1 return rows diff --git a/orso/compute/varchar_array.pyx b/orso/compute/varchar_array.pyx index 798afce..b364260 100644 --- a/orso/compute/varchar_array.pyx +++ b/orso/compute/varchar_array.pyx @@ -117,4 +117,4 @@ def unpack_byte_array(const unsigned char[::1] raw_bytes, Py_ssize_t n, const ch bytecount -= 4 + itemlen i += 1 - return out \ No newline at end of file + return out diff --git a/orso/dataframe.py b/orso/dataframe.py index e5211c2..5d12418 100644 --- a/orso/dataframe.py +++ b/orso/dataframe.py @@ -125,7 +125,7 @@ def polars(self, size=None): return to_polars(self, size) - def nbytes(self): + def nbytes(self) -> int: """Approximate the number of bytes used by the DataFrame""" self.materialize() if self._nbytes is None: @@ -140,10 +140,10 @@ def append(self, entry): self._nbytes += new_row.nbytes() self._cursor = None - def head(self, size: int = 5): + def head(self, size: int = 5) -> "DataFrame": return self.slice(0, size) - def tail(self, size: int = 5): + def tail(self, size: int = 5) -> "DataFrame": return self.slice(offset=0 - size, length=size) def query(self, predicate) -> "DataFrame": diff --git a/orso/version.py b/orso/version.py index 065a1ea..a1f08ec 100644 --- a/orso/version.py +++ b/orso/version.py @@ -10,5 +10,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__: str = "0.0.181" +__version__: str = "0.0.182" __author__: str = "@joocer" diff --git a/tests/__init__.py b/tests/__init__.py index d6590b1..77b7941 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -23,7 +23,7 @@ def run_tests(): for index, method in enumerate(test_methods): start_time = time.monotonic_ns() test_name = f"\033[38;2;255;184;108m{(index + 1):04}\033[0m \033[38;2;189;147;249m{str(method.__name__)}\033[0m" - print(test_name.ljust(display_width - 20), end="") + print(test_name.ljust(display_width - 20), end="", flush=True) error = None output = "" try: diff --git a/tests/test_compiled.py b/tests/test_compiled.py index 6f7e6ee..117cd2e 100644 --- a/tests/test_compiled.py +++ b/tests/test_compiled.py @@ -1,20 +1,74 @@ import os import sys +import numpy sys.path.insert(1, os.path.join(sys.path[0], "..")) from orso.compute.compiled import collect_cython -import numpy def test_collector(): - columns = collect_cython([(1, 2), (2, 1), (7, 8)], numpy.array([1, 0], dtype=numpy.int32)) assert len(columns) == 2 assert len(columns[0]) == 3 assert sum(columns[0]) == 11 assert sum(columns[1]) == 10 -if __name__ == "__main__": # prgama: nocover +def test_collector_empty_input(): + columns = collect_cython([], numpy.array([], dtype=numpy.int32)) + assert len(columns) == 0, len(columns) + +def test_collector_single_tuple(): + columns = collect_cython([(5, 10)], numpy.array([1], dtype=numpy.int32)) + assert len(columns) == 1, len(columns) + assert columns[0] == [10] + +def test_collector_large_data(): + data = [(i, i * 2) for i in range(10000)] + index = numpy.array([1, 0], dtype=numpy.int32) + columns = collect_cython(data, index) + assert len(columns) == 2 + assert len(columns[0]) == 10000 + assert sum(columns[0]) == sum(i * 2 for i in range(10000)) + assert sum(columns[1]) == sum(range(10000)) + + +def test_collector_non_integer_index(): + data = [(1, 2), (3, 4)] + index = numpy.array([0.5, 1.5], dtype=numpy.float64) + try: + collect_cython(data, index) + assert False, "Expected a ValueError" + except ValueError: + pass + +def test_collector_negative_index(): + data = [(1, 2), (3, 4)] + index = numpy.array([-1, 0], dtype=numpy.int32) + try: + collect_cython(data, index) + assert False, "Expected an IndexError" + except IndexError: + pass + +def test_collector_large_index_values(): + data = [(1, 2), (3, 4)] + index = numpy.array([100, 200], dtype=numpy.int32) + try: + collect_cython(data, index) + assert False, "Expected an IndexError" + except IndexError: + pass + +def test_collector_duplicate_indices(): + data = [(1, 2), (3, 4), (5, 6)] + index = numpy.array([1, 1, 0], dtype=numpy.int32) + columns = collect_cython(data, index) + assert len(columns) == 3 + assert sum(columns[0]) == 12, sum(columns[0]) + assert sum(columns[1]) == 12, sum(columns[1]) + assert sum(columns[2]) == 9, sum(columns[2]) + +if __name__ == "__main__": # pragma: nocover from tests import run_tests - test_collector() + run_tests() diff --git a/tests/test_display.py b/tests/test_display.py index 541bf99..8af3fb9 100644 --- a/tests/test_display.py +++ b/tests/test_display.py @@ -39,7 +39,7 @@ def find_all_substrings(s: str, sub: str) -> List[int]: def test_display_ascii_lazy(): - for i in range(10): + for i in range(10): df = DataFrame(cities.values).head(i) df._rows = (r for r in df._rows) diff --git a/tests/test_field_extractor.py b/tests/test_field_extractor.py index 6a96184..877e642 100644 --- a/tests/test_field_extractor.py +++ b/tests/test_field_extractor.py @@ -10,7 +10,7 @@ def test_extract_dict_columns_basic(): data = {'a': 1, 'b': 2, 'c': 3} fields = ('a', 'b', 'c') result = extract_dict_columns(data, fields) - assert result == (1, 2, 3) + assert result == (1, 2, 3), result def test_extract_dict_columns_missing_fields(): data = {'a': 1, 'b': 2, 'c': 3} @@ -160,6 +160,60 @@ def test_extract_dict_columns_with_same_field_order(): assert result1 == (1, 2, 3, 4) assert result2 == (4, 3, 2, 1) +def test_extract_dict_columns_sparse_dict(): + data = {'a': 1} + fields = ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i') + result = extract_dict_columns(data, fields) + assert result == (1, None, None, None, None, None, None, None, None) + +def test_extract_dict_columns_immutability(): + data = {'a': 1, 'b': 2, 'c': 3} + original_data = data.copy() + fields = ('a', 'b', 'c') + extract_dict_columns(data, fields) + assert data == original_data, "Function modified input data!" + +def test_extract_dict_columns_stress_test(): + data = {str(i): i for i in range(10000)} + fields = tuple(str(i) for i in range(20000)) + result = extract_dict_columns(data, fields) + assert result[:10000] == tuple(range(10000)) and result[10000:] == (None,) * 10000 + +def test_extract_dict_columns_duplicate_missing_fields(): + data = {'a': 1, 'b': 2} + fields = ('a', 'x', 'x', 'b') + result = extract_dict_columns(data, fields) + assert result == (1, None, None, 2) + +def test_extract_dict_columns_case_sensitivity(): + data = {'A': 1, 'b': 2} + fields = ('a', 'b') + result = extract_dict_columns(data, fields) + assert result == (None, 2) + +def test_extract_dict_columns_whitespace_keys(): + data = {' a': 1, 'b ': 2} + fields = ('a', 'b ') + result = extract_dict_columns(data, fields) + assert result == (None, 2) + +def test_extract_dict_columns_non_string_keys(): + data = {None: 1, 42: "forty-two"} + fields = (None, 42, "missing") + result = extract_dict_columns(data, fields) + assert result == (1, "forty-two", None) + +def test_extract_dict_columns_deeply_nested_keys(): + data = {'a': {'b': {'c': 1}}} + fields = ('a.b.c', 'a') + result = extract_dict_columns(data, fields) + assert result == (None, {'b': {'c': 1}}) + +def test_extract_dict_columns_large_dataset_with_missing(): + data = {str(i): i for i in range(10**6)} + fields = ('1000001', '500000', '999999') + result = extract_dict_columns(data, fields) + assert result == (None, 500000, 999999) if __name__ == "__main__": # prgama: nocover from tests import run_tests