Skip to content

Commit

Permalink
0.0.182
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Feb 4, 2025
1 parent 3923b5d commit e4a52e7
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 40 deletions.
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
lint:
python -m pip install --quiet --upgrade pycln isort ruff yamllint
python -m pip install --upgrade pip uv
python -m uv pip install --quiet --upgrade pycln isort ruff yamllint cython-lint
# python -m yamllint .
# cython-lint orso/compute/*.pyx
python -m ruff check --fix --exit-zero
python -m pycln .
python -m isort .
python -m ruff format orso

update:
python -m pip install --quiet --upgrade -r requirements.txt
python -m pip install --quiet --upgrade -r tests/requirements.txt
python -m pip install --upgrade pip uv
python -m uv pip install --upgrade -r tests/requirements.txt
python -m uv pip install --upgrade -r requirements.txt

test:
python -m pip install --quiet --upgrade pytest coverage
Expand Down
92 changes: 67 additions & 25 deletions orso/compute/compiled.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ from cython cimport int
from datetime import datetime
from ormsgpack import unpackb
from orso.exceptions import DataError
from typing import Dict, Any, Tuple
from libc.stdlib cimport malloc, free
import numpy as np
cimport cython
cimport numpy as cnp
from numpy cimport ndarray
from libc.stdint cimport int32_t
from libc.stdint cimport int32_t, int64_t
from cpython.dict cimport PyDict_GetItem

cnp.import_array()

Expand All @@ -47,12 +45,12 @@ cpdef from_bytes_cython(bytes data):
# Validate header and size, now using pointer arithmetic
if length < HEADER_SIZE or (data_ptr[0] & 0xF0 != 0x10):
raise DataError("Data malformed")

# Deserialize record bytes
cdef Py_ssize_t record_size = (
(<unsigned char>data_ptr[2]) << 24 |
(<unsigned char>data_ptr[3]) << 16 |
(<unsigned char>data_ptr[4]) << 8 |
(<unsigned char>data_ptr[4]) << 8 |
(<unsigned char>data_ptr[5])
)

Expand All @@ -64,7 +62,6 @@ cpdef from_bytes_cython(bytes data):
cdef list processed_list = []
cdef object item


for item in raw_tuple:
if isinstance(item, list) and len(item) == 2 and item[0] == "__datetime__":
processed_list.append(datetime.fromtimestamp(item[1]))
Expand All @@ -73,47 +70,74 @@ cpdef from_bytes_cython(bytes data):

return tuple(processed_list)


cpdef tuple extract_dict_columns(dict data, tuple fields):
cdef int i
cdef str field
cdef list field_data = [None] * len(fields) # Preallocate list size
"""
Extracts the given fields from a dictionary and returns them as a tuple.
Parameters:
data: dict
The dictionary to extract fields from.
fields: tuple
The field names to extract.
Returns:
A tuple containing values from the dictionary for the requested fields.
Missing fields will have None.
"""
cdef int64_t i, num_fields = len(fields)
cdef void* value_ptr
cdef list field_data = [None] * num_fields

for i in range(num_fields):
value_ptr = PyDict_GetItem(data, fields[i])
if value_ptr != NULL:
field_data[i] = <object>value_ptr
else:
field_data[i] = None

for i, field in enumerate(fields):
if field in data:
field_data[i] = data[field]
return tuple(field_data) # Convert list to tuple


cpdef cnp.ndarray collect_cython(list rows, cnp.ndarray[cnp.int32_t, ndim=1] columns, int limit=-1):
cpdef cnp.ndarray collect_cython(list rows, int32_t[:] columns, int limit=-1):
"""
Collects columns from a list of tuples (rows).
"""
cdef int32_t i, j, col_idx
cdef int32_t num_rows = len(rows)
cdef int32_t num_cols = columns.shape[0]
cdef cnp.ndarray row
cdef int64_t i, j, col_idx
cdef int64_t num_rows = len(rows)
cdef int64_t num_cols = columns.shape[0]
cdef int64_t row_width = len(rows[0]) if num_rows > 0 else 0

# Check if limit is set and within bounds
if limit >= 0 and limit < num_rows:
num_rows = limit

# Check if there are any rows or columns and exit early
if num_rows == 0 or num_cols == 0:
return np.empty((num_cols, num_rows), dtype=object)

# Check if columns are within bounds
for j in range(num_cols):
col_idx = columns[j]
if col_idx < 0 or col_idx > row_width:
raise IndexError(f"Column index out of bounds (0 < {col_idx} < {row_width})")

# Initialize result memory view with pre-allocated numpy arrays for each column
cdef cnp.ndarray result = np.empty((num_cols, num_rows), dtype=object)
cdef object[:, :] result = np.empty((num_cols, num_rows), dtype=object)

# Populate each column one at a time
for j in range(num_cols):
col_idx = columns[j]
for i in range(num_rows):
result[j, i] = rows[i][col_idx]

# Convert each column back to a list and return the list of lists
return result
return np.asarray(result)


cpdef int calculate_data_width(cnp.ndarray column_values):
cdef int width, max_width
cdef object value

max_width = 4 # Default width
for value in column_values:
if value is not None:
Expand All @@ -124,11 +148,29 @@ cpdef int calculate_data_width(cnp.ndarray column_values):
return max_width


from cpython.list cimport PyList_New, PyList_SET_ITEM

def process_table(table, row_factory, int max_chunksize) -> list:
cdef list rows = []
"""
Processes a PyArrow table and applies a row factory function to each row.

Parameters:
table: PyArrow Table
The input table to process.
row_factory: function
A function applied to each row.
max_chunksize: int
The batch size to process at a time.

Returns:
A list of transformed rows.
"""
cdef list rows = [None] * table.num_rows
cdef int64_t i = 0

for batch in table.to_batches(max_chunksize):
df = batch.to_pandas().replace({np.nan: None})
for row in df.itertuples(index=False, name=None):
rows.append(row_factory(row))
rows[i] = row_factory(row)
i += 1
return rows
2 changes: 1 addition & 1 deletion orso/compute/varchar_array.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,4 @@ def unpack_byte_array(const unsigned char[::1] raw_bytes, Py_ssize_t n, const ch
bytecount -= 4 + itemlen
i += 1

return out
return out
6 changes: 3 additions & 3 deletions orso/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def polars(self, size=None):

return to_polars(self, size)

def nbytes(self):
def nbytes(self) -> int:
"""Approximate the number of bytes used by the DataFrame"""
self.materialize()
if self._nbytes is None:
Expand All @@ -140,10 +140,10 @@ def append(self, entry):
self._nbytes += new_row.nbytes()
self._cursor = None

def head(self, size: int = 5):
def head(self, size: int = 5) -> "DataFrame":
return self.slice(0, size)

def tail(self, size: int = 5):
def tail(self, size: int = 5) -> "DataFrame":
return self.slice(offset=0 - size, length=size)

def query(self, predicate) -> "DataFrame":
Expand Down
2 changes: 1 addition & 1 deletion orso/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__: str = "0.0.181"
__version__: str = "0.0.182"
__author__: str = "@joocer"
2 changes: 1 addition & 1 deletion tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def run_tests():
for index, method in enumerate(test_methods):
start_time = time.monotonic_ns()
test_name = f"\033[38;2;255;184;108m{(index + 1):04}\033[0m \033[38;2;189;147;249m{str(method.__name__)}\033[0m"
print(test_name.ljust(display_width - 20), end="")
print(test_name.ljust(display_width - 20), end="", flush=True)
error = None
output = ""
try:
Expand Down
62 changes: 58 additions & 4 deletions tests/test_compiled.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,74 @@
import os
import sys
import numpy

sys.path.insert(1, os.path.join(sys.path[0], ".."))

from orso.compute.compiled import collect_cython
import numpy

def test_collector():

columns = collect_cython([(1, 2), (2, 1), (7, 8)], numpy.array([1, 0], dtype=numpy.int32))
assert len(columns) == 2
assert len(columns[0]) == 3
assert sum(columns[0]) == 11
assert sum(columns[1]) == 10

if __name__ == "__main__": # prgama: nocover
def test_collector_empty_input():
columns = collect_cython([], numpy.array([], dtype=numpy.int32))
assert len(columns) == 0, len(columns)

def test_collector_single_tuple():
columns = collect_cython([(5, 10)], numpy.array([1], dtype=numpy.int32))
assert len(columns) == 1, len(columns)
assert columns[0] == [10]

def test_collector_large_data():
data = [(i, i * 2) for i in range(10000)]
index = numpy.array([1, 0], dtype=numpy.int32)
columns = collect_cython(data, index)
assert len(columns) == 2
assert len(columns[0]) == 10000
assert sum(columns[0]) == sum(i * 2 for i in range(10000))
assert sum(columns[1]) == sum(range(10000))


def test_collector_non_integer_index():
data = [(1, 2), (3, 4)]
index = numpy.array([0.5, 1.5], dtype=numpy.float64)
try:
collect_cython(data, index)
assert False, "Expected a ValueError"
except ValueError:
pass

def test_collector_negative_index():
data = [(1, 2), (3, 4)]
index = numpy.array([-1, 0], dtype=numpy.int32)
try:
collect_cython(data, index)
assert False, "Expected an IndexError"
except IndexError:
pass

def test_collector_large_index_values():
data = [(1, 2), (3, 4)]
index = numpy.array([100, 200], dtype=numpy.int32)
try:
collect_cython(data, index)
assert False, "Expected an IndexError"
except IndexError:
pass

def test_collector_duplicate_indices():
data = [(1, 2), (3, 4), (5, 6)]
index = numpy.array([1, 1, 0], dtype=numpy.int32)
columns = collect_cython(data, index)
assert len(columns) == 3
assert sum(columns[0]) == 12, sum(columns[0])
assert sum(columns[1]) == 12, sum(columns[1])
assert sum(columns[2]) == 9, sum(columns[2])

if __name__ == "__main__": # pragma: nocover
from tests import run_tests
test_collector()

run_tests()
2 changes: 1 addition & 1 deletion tests/test_display.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def find_all_substrings(s: str, sub: str) -> List[int]:

def test_display_ascii_lazy():

for i in range(10):
for i in range(10):
df = DataFrame(cities.values).head(i)
df._rows = (r for r in df._rows)

Expand Down
56 changes: 55 additions & 1 deletion tests/test_field_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def test_extract_dict_columns_basic():
data = {'a': 1, 'b': 2, 'c': 3}
fields = ('a', 'b', 'c')
result = extract_dict_columns(data, fields)
assert result == (1, 2, 3)
assert result == (1, 2, 3), result

def test_extract_dict_columns_missing_fields():
data = {'a': 1, 'b': 2, 'c': 3}
Expand Down Expand Up @@ -160,6 +160,60 @@ def test_extract_dict_columns_with_same_field_order():
assert result1 == (1, 2, 3, 4)
assert result2 == (4, 3, 2, 1)

def test_extract_dict_columns_sparse_dict():
data = {'a': 1}
fields = ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i')
result = extract_dict_columns(data, fields)
assert result == (1, None, None, None, None, None, None, None, None)

def test_extract_dict_columns_immutability():
data = {'a': 1, 'b': 2, 'c': 3}
original_data = data.copy()
fields = ('a', 'b', 'c')
extract_dict_columns(data, fields)
assert data == original_data, "Function modified input data!"

def test_extract_dict_columns_stress_test():
data = {str(i): i for i in range(10000)}
fields = tuple(str(i) for i in range(20000))
result = extract_dict_columns(data, fields)
assert result[:10000] == tuple(range(10000)) and result[10000:] == (None,) * 10000

def test_extract_dict_columns_duplicate_missing_fields():
data = {'a': 1, 'b': 2}
fields = ('a', 'x', 'x', 'b')
result = extract_dict_columns(data, fields)
assert result == (1, None, None, 2)

def test_extract_dict_columns_case_sensitivity():
data = {'A': 1, 'b': 2}
fields = ('a', 'b')
result = extract_dict_columns(data, fields)
assert result == (None, 2)

def test_extract_dict_columns_whitespace_keys():
data = {' a': 1, 'b ': 2}
fields = ('a', 'b ')
result = extract_dict_columns(data, fields)
assert result == (None, 2)

def test_extract_dict_columns_non_string_keys():
data = {None: 1, 42: "forty-two"}
fields = (None, 42, "missing")
result = extract_dict_columns(data, fields)
assert result == (1, "forty-two", None)

def test_extract_dict_columns_deeply_nested_keys():
data = {'a': {'b': {'c': 1}}}
fields = ('a.b.c', 'a')
result = extract_dict_columns(data, fields)
assert result == (None, {'b': {'c': 1}})

def test_extract_dict_columns_large_dataset_with_missing():
data = {str(i): i for i in range(10**6)}
fields = ('1000001', '500000', '999999')
result = extract_dict_columns(data, fields)
assert result == (None, 500000, 999999)

if __name__ == "__main__": # prgama: nocover
from tests import run_tests
Expand Down

0 comments on commit e4a52e7

Please sign in to comment.