diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..28cee4a --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,8 @@ +repos: +- repo: https://github.com/astral-sh/ruff-pre-commit + # Ruff version. + rev: v0.8.2 + hooks: + - id: ruff + args: [--fix] + - id: ruff-format diff --git a/examples/example.py b/examples/example.py index 8d7741a..c75d0c3 100644 --- a/examples/example.py +++ b/examples/example.py @@ -1,11 +1,10 @@ - from wellbelog.belolis.reader import LisReader # noqa if __name__ == '__main__': - PATH_TO_FILE = r'test\test_files\1-MPE-3-AL.lis' + PATH_TO_FILE = r'test\test_files\1-MPE-3-AL.lis' - reader = LisReader() - file = reader.process_physical_file(PATH_TO_FILE) - print(file.file_name) - print(file.logical_files_table()) + reader = LisReader() + file = reader.process_physical_file(PATH_TO_FILE) + print(file.file_name) + print(file.logical_files_table()) diff --git a/examples/example_dlis.py b/examples/example_dlis.py index 58d61ba..7523c7b 100644 --- a/examples/example_dlis.py +++ b/examples/example_dlis.py @@ -1,12 +1,11 @@ - from wellbelog.belodlis.reader import DlisReader # noqa if __name__ == '__main__': - PATH_TO_FILE = r'test\test_files\1PIR1AL_conv_ccl_canhoneio.dlis' + PATH_TO_FILE = r'test\test_files\1PIR1AL_conv_ccl_canhoneio.dlis' - reader = DlisReader() - file = reader.process_physical_file(PATH_TO_FILE) - print(file.file_name) - print(file.logical_files_table()) - table = file.logical_files_table() + reader = DlisReader() + file = reader.process_physical_file(PATH_TO_FILE) + print(file.file_name) + print(file.logical_files_table()) + table = file.logical_files_table() diff --git a/examples/example_las.py b/examples/example_las.py index 0d01ef6..addb9e9 100644 --- a/examples/example_las.py +++ b/examples/example_las.py @@ -1,11 +1,10 @@ - from wellbelog.belolas.reader import LasReader # noqa if __name__ == '__main__': - PATH_TO_FILE = r'test\test_files\1-MPE-3-AL_hals-dslt-tdd-hgns-gr_resistividade_repetida.las' + PATH_TO_FILE = r'test\test_files\1-MPE-3-AL_hals-dslt-tdd-hgns-gr_resistividade_repetida.las' - reader = LasReader() - file = reader.process_las_file(PATH_TO_FILE) - print(file.file_name) - print(file.table_view()) + reader = LasReader() + file = reader.process_las_file(PATH_TO_FILE) + print(file.file_name) + print(file.table_view()) diff --git a/examples/main_reader.py b/examples/main_reader.py index ad6deda..40bd6d1 100644 --- a/examples/main_reader.py +++ b/examples/main_reader.py @@ -4,6 +4,6 @@ PATH_TO_FILE = r'test\test_files\1PIR1AL_conv_ccl_canhoneio.dlis' if __name__ == '__main__': - reader = MainReader() - file = reader.load_file(PATH_TO_FILE) - print(file.file_name) + reader = MainReader() + file = reader.load_file(PATH_TO_FILE) + print(file.file_name) diff --git a/poetry.lock b/poetry.lock index 9827b54..927e6db 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand. [[package]] name = "annotated-types" @@ -782,6 +782,33 @@ pygments = ">=2.13.0,<3.0.0" [package.extras] jupyter = ["ipywidgets (>=7.5.1,<9)"] +[[package]] +name = "ruff" +version = "0.8.3" +description = "An extremely fast Python linter and code formatter, written in Rust." +optional = false +python-versions = ">=3.7" +files = [ + {file = "ruff-0.8.3-py3-none-linux_armv6l.whl", hash = "sha256:8d5d273ffffff0acd3db5bf626d4b131aa5a5ada1276126231c4174543ce20d6"}, + {file = "ruff-0.8.3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:e4d66a21de39f15c9757d00c50c8cdd20ac84f55684ca56def7891a025d7e939"}, + {file = "ruff-0.8.3-py3-none-macosx_11_0_arm64.whl", hash = "sha256:c356e770811858bd20832af696ff6c7e884701115094f427b64b25093d6d932d"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9c0a60a825e3e177116c84009d5ebaa90cf40dfab56e1358d1df4e29a9a14b13"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:75fb782f4db39501210ac093c79c3de581d306624575eddd7e4e13747e61ba18"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7f26bc76a133ecb09a38b7868737eded6941b70a6d34ef53a4027e83913b6502"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:01b14b2f72a37390c1b13477c1c02d53184f728be2f3ffc3ace5b44e9e87b90d"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:53babd6e63e31f4e96ec95ea0d962298f9f0d9cc5990a1bbb023a6baf2503a82"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1ae441ce4cf925b7f363d33cd6570c51435972d697e3e58928973994e56e1452"}, + {file = "ruff-0.8.3-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d7c65bc0cadce32255e93c57d57ecc2cca23149edd52714c0c5d6fa11ec328cd"}, + {file = "ruff-0.8.3-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5be450bb18f23f0edc5a4e5585c17a56ba88920d598f04a06bd9fd76d324cb20"}, + {file = "ruff-0.8.3-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:8faeae3827eaa77f5721f09b9472a18c749139c891dbc17f45e72d8f2ca1f8fc"}, + {file = "ruff-0.8.3-py3-none-musllinux_1_2_i686.whl", hash = "sha256:db503486e1cf074b9808403991663e4277f5c664d3fe237ee0d994d1305bb060"}, + {file = "ruff-0.8.3-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:6567be9fb62fbd7a099209257fef4ad2c3153b60579818b31a23c886ed4147ea"}, + {file = "ruff-0.8.3-py3-none-win32.whl", hash = "sha256:19048f2f878f3ee4583fc6cb23fb636e48c2635e30fb2022b3a1cd293402f964"}, + {file = "ruff-0.8.3-py3-none-win_amd64.whl", hash = "sha256:f7df94f57d7418fa7c3ffb650757e0c2b96cf2501a0b192c18e4fb5571dfada9"}, + {file = "ruff-0.8.3-py3-none-win_arm64.whl", hash = "sha256:fe2756edf68ea79707c8d68b78ca9a58ed9af22e430430491ee03e718b5e4936"}, + {file = "ruff-0.8.3.tar.gz", hash = "sha256:5e7558304353b84279042fc584a4f4cb8a07ae79b2bf3da1a7551d960b5626d3"}, +] + [[package]] name = "shapely" version = "2.0.6" @@ -974,4 +1001,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "d3bee0cd16c827ac61ee6bdc4952c8a13529b4a5c13ee82916786fd254c27d43" +content-hash = "93a9022f4e65f47c1ea8b9cede36511e660e6a8678e31cc78498a207dc98a1f6" diff --git a/pyproject.toml b/pyproject.toml index 9ae789e..6c767e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ shapely = "^2.0.6" sqlalchemy = "^2.0.34" pytest = "^8.3.3" pytest-cov = "^5.0.0" +ruff = "^0.8.3" [tool.poetry.group.dev.dependencies] diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 0000000..d5cb9a2 --- /dev/null +++ b/ruff.toml @@ -0,0 +1,9 @@ +line-length = 150 +[lint] +ignore = ["F403"] + +# To specify that this rule should only be ignored for __init__.py files +exclude = ["**/__init__.py"] +[format] +quote-style = "single" +indent-style = "tab" \ No newline at end of file diff --git a/setup.py b/setup.py index 982e757..e04cd31 100644 --- a/setup.py +++ b/setup.py @@ -1,10 +1,10 @@ from setuptools import setup if __name__ == '__main__': - setup( - name='wellbelog', - version='0.0.1', - description='A Python package for reading well log files.', - author='Pedro Cavalcanti', - author_email='pedro@monumentosoftware.com.br', - ) + setup( + name='wellbelog', + version='0.0.1', + description='A Python package for reading well log files.', + author='Pedro Cavalcanti', + author_email='pedro@monumentosoftware.com.br', + ) diff --git a/test/dlis/test_dlis_export.py b/test/dlis/test_dlis_export.py index 10ff2bc..c33ed00 100644 --- a/test/dlis/test_dlis_export.py +++ b/test/dlis/test_dlis_export.py @@ -9,26 +9,25 @@ def test_exports(): - dlis_processor = DlisReader() - physical_file = dlis_processor.process_physical_file(file_path) - assert physical_file.file_name == '1PIR1AL_conv_ccl_canhoneio.dlis' - - file_curves = physical_file.curves_names - # Getting the first curve - frame = physical_file.logical_files[0].get_frame() - assert frame is not None - with tempfile.TemporaryDirectory() as temp_dir: - - csv_path = Path(temp_dir) / 'test.csv' - physical_file.logical_files[0].get_frame().data.to_csv(csv_path) - assert csv_path.exists() - assert csv_path.is_file() - assert csv_path.stat().st_size > 0 - - # Check the column names - with open(csv_path, 'r') as f: - columns = f.readline().strip().split(',') - if 'FRAMENO' in columns: - columns.remove('FRAMENO') - for _, column in enumerate(columns): - assert file_curves.__contains__(column) + dlis_processor = DlisReader() + physical_file = dlis_processor.process_physical_file(file_path) + assert physical_file.file_name == '1PIR1AL_conv_ccl_canhoneio.dlis' + + file_curves = physical_file.curves_names + # Getting the first curve + frame = physical_file.logical_files[0].get_frame() + assert frame is not None + with tempfile.TemporaryDirectory() as temp_dir: + csv_path = Path(temp_dir) / 'test.csv' + physical_file.logical_files[0].get_frame().data.to_csv(csv_path) + assert csv_path.exists() + assert csv_path.is_file() + assert csv_path.stat().st_size > 0 + + # Check the column names + with open(csv_path, 'r') as f: + columns = f.readline().strip().split(',') + if 'FRAMENO' in columns: + columns.remove('FRAMENO') + for _, column in enumerate(columns): + assert file_curves.__contains__(column) diff --git a/test/dlis/test_dlis_functions.py b/test/dlis/test_dlis_functions.py index fb1979d..f2a1a10 100644 --- a/test/dlis/test_dlis_functions.py +++ b/test/dlis/test_dlis_functions.py @@ -9,9 +9,9 @@ def test_open_dlis_file(): - physical_file = open_dlis_file(file_path) - assert isinstance(physical_file, dlis.PhysicalFile) - logical_files = unpack_physical_dlis(physical_file) - assert isinstance(logical_files, list) - assert len(logical_files) > 0 - assert isinstance(logical_files[0], dlis.LogicalFile) + physical_file = open_dlis_file(file_path) + assert isinstance(physical_file, dlis.PhysicalFile) + logical_files = unpack_physical_dlis(physical_file) + assert isinstance(logical_files, list) + assert len(logical_files) > 0 + assert isinstance(logical_files[0], dlis.LogicalFile) diff --git a/test/dlis/test_dlis_models.py b/test/dlis/test_dlis_models.py index 2b2c0dc..d03f658 100644 --- a/test/dlis/test_dlis_models.py +++ b/test/dlis/test_dlis_models.py @@ -10,63 +10,63 @@ def test_dlis_physical_schema(): - dlis_processor = DlisReader() - physical_file = dlis_processor.process_physical_file(file_path) - assert physical_file.file_name == '1PIR1AL_conv_ccl_canhoneio.dlis' + dlis_processor = DlisReader() + physical_file = dlis_processor.process_physical_file(file_path) + assert physical_file.file_name == '1PIR1AL_conv_ccl_canhoneio.dlis' - assert physical_file.logical_files_count > 0 + assert physical_file.logical_files_count > 0 - assert physical_file.curves_names is not None + assert physical_file.curves_names is not None - # We already know that the file has the CCL curve - # Test file curves = ['LTEN', 'TDEP', 'LSPD', 'MINMK', 'CCL'] - assert 'CCL' in physical_file.curves_names + # We already know that the file has the CCL curve + # Test file curves = ['LTEN', 'TDEP', 'LSPD', 'MINMK', 'CCL'] + assert 'CCL' in physical_file.curves_names def test_logical_files_table(): - dlis_processor = DlisReader() - physical_file = dlis_processor.process_physical_file(file_path) - table = physical_file.logical_files_table() - assert table.columns[0].header == 'File Name' - assert table.columns[1].header == 'Curves' - assert table.columns[2].header == 'Error' - assert len(table.columns) == 3 - assert len(table.rows) == physical_file.logical_files_count - assert table.title == '1PIR1AL_conv_ccl_canhoneio.dlis' + dlis_processor = DlisReader() + physical_file = dlis_processor.process_physical_file(file_path) + table = physical_file.logical_files_table() + assert table.columns[0].header == 'File Name' + assert table.columns[1].header == 'Curves' + assert table.columns[2].header == 'Error' + assert len(table.columns) == 3 + assert len(table.rows) == physical_file.logical_files_count + assert table.title == '1PIR1AL_conv_ccl_canhoneio.dlis' def test_logical_file_schema_methods(): - reader = DlisReader() - physical_file = reader.process_physical_file(file_path) - logical_file = physical_file.logical_files[0] - # Logical File curves names = ['TDEP', 'LSPD', 'LTEN', 'CCL', 'MINMK'] - assert logical_file.file_name == '1PIR1AL_conv_ccl_canhoneio.dlis' - assert logical_file.error is False - assert logical_file.error_message is None + reader = DlisReader() + physical_file = reader.process_physical_file(file_path) + logical_file = physical_file.logical_files[0] + # Logical File curves names = ['TDEP', 'LSPD', 'LTEN', 'CCL', 'MINMK'] + assert logical_file.file_name == '1PIR1AL_conv_ccl_canhoneio.dlis' + assert logical_file.error is False + assert logical_file.error_message is None - # NOTE Testing Curves names - assert logical_file.curves_names is not None - assert 'CCL' in logical_file.curves_names + # NOTE Testing Curves names + assert logical_file.curves_names is not None + assert 'CCL' in logical_file.curves_names - # NOTE Testing Frames counting - assert logical_file.frames_count == len(logical_file.frames) + # NOTE Testing Frames counting + assert logical_file.frames_count == len(logical_file.frames) - assert isinstance(logical_file.frames[0], FrameModel) + assert isinstance(logical_file.frames[0], FrameModel) - # NOTE Testing table view - table = logical_file.table_view() - assert table.columns[0].header == 'Logical File ID' - assert table.columns[1].header == 'File Name' - assert table.columns[2].header == 'Frames' - assert table.columns[3].header == 'Curves' - assert table.columns[4].header == 'Error' + # NOTE Testing table view + table = logical_file.table_view() + assert table.columns[0].header == 'Logical File ID' + assert table.columns[1].header == 'File Name' + assert table.columns[2].header == 'Frames' + assert table.columns[3].header == 'Curves' + assert table.columns[4].header == 'Error' def test_frame_schema_methods(): - reader = DlisReader() - physical_file = reader.process_physical_file(file_path) - logical_file = physical_file.logical_files[0] - frame = logical_file.frames[0] + reader = DlisReader() + physical_file = reader.process_physical_file(file_path) + logical_file = physical_file.logical_files[0] + frame = logical_file.frames[0] - assert frame.logical_file_id == logical_file.logical_id - assert isinstance(frame.data.as_df(), pd.DataFrame) + assert frame.logical_file_id == logical_file.logical_id + assert isinstance(frame.data.as_df(), pd.DataFrame) diff --git a/test/dlis/test_dlis_reading.py b/test/dlis/test_dlis_reading.py index efe9ccc..617f97e 100644 --- a/test/dlis/test_dlis_reading.py +++ b/test/dlis/test_dlis_reading.py @@ -11,46 +11,46 @@ def test_search_files(): - dlis_processor = DlisReader() - dlis_files = dlis_processor.search_files(folder_path) - assert len(dlis_files) == 2 - assert '1PIR1AL_conv_ccl_canhoneio.dlis' and '1PIR1AL_conv_ccl_canhoneio-error.dlis' in [file.name for file in dlis_files] + dlis_processor = DlisReader() + dlis_files = dlis_processor.search_files(folder_path) + assert len(dlis_files) == 2 + assert '1PIR1AL_conv_ccl_canhoneio.dlis' and '1PIR1AL_conv_ccl_canhoneio-error.dlis' in [file.name for file in dlis_files] - # Testing the search_files method with a wrong path - dlis_files = dlis_processor.search_files(folder_path / 'wrong_path') - assert not len(dlis_files) + # Testing the search_files method with a wrong path + dlis_files = dlis_processor.search_files(folder_path / 'wrong_path') + assert not len(dlis_files) def test_read_dlis_file(): - dlis_processor = DlisReader() - physical_file = dlis_processor.process_physical_file(file_path) - assert physical_file.file_name == '1PIR1AL_conv_ccl_canhoneio.dlis' + dlis_processor = DlisReader() + physical_file = dlis_processor.process_physical_file(file_path) + assert physical_file.file_name == '1PIR1AL_conv_ccl_canhoneio.dlis' def test_read_dlis_file_error(): - dlis_processor = DlisReader() - physical_file = dlis_processor.process_physical_file(error_file_path) - assert physical_file.error - assert physical_file.error_message is not None - assert physical_file.file_name == '1PIR1AL_conv_ccl_canhoneio-error.dlis' - assert physical_file.logical_files_count == 0 + dlis_processor = DlisReader() + physical_file = dlis_processor.process_physical_file(error_file_path) + assert physical_file.error + assert physical_file.error_message is not None + assert physical_file.file_name == '1PIR1AL_conv_ccl_canhoneio-error.dlis' + assert physical_file.logical_files_count == 0 def test_raw_read(): - dlis_processor = DlisReader() - raw = dlis_processor.load_raw(file_path) - assert isinstance(raw, dlis.PhysicalFile) + dlis_processor = DlisReader() + raw = dlis_processor.load_raw(file_path) + assert isinstance(raw, dlis.PhysicalFile) def test_raw_read_error(): - with pytest.raises(Exception): - dlis_processor = DlisReader() - dlis_processor.load_raw(error_file_path) + with pytest.raises(Exception): + dlis_processor = DlisReader() + dlis_processor.load_raw(error_file_path) def load_raw_unpack(): - dlis_processor = DlisReader() - logical = dlis_processor.load_raw(file_path, unpack=True) - assert isinstance(logical, list) - assert isinstance(logical[0], dlis.LogicalFile) - assert len(logical) > 0 + dlis_processor = DlisReader() + logical = dlis_processor.load_raw(file_path, unpack=True) + assert isinstance(logical, list) + assert isinstance(logical[0], dlis.LogicalFile) + assert len(logical) > 0 diff --git a/test/las/test_las__functions.py b/test/las/test_las__functions.py index 94de1e9..1a89f45 100644 --- a/test/las/test_las__functions.py +++ b/test/las/test_las__functions.py @@ -11,13 +11,13 @@ def test_open_las_file(): - las_file = open_las_file(file_path) - assert isinstance(las_file, LASFile) + las_file = open_las_file(file_path) + assert isinstance(las_file, LASFile) def test_process_curves_items(): - las_file = open_las_file(file_path) - curves = process_curves_items(las_file) - assert isinstance(curves, list) - assert len(curves) > 0 - assert isinstance(curves[0], LasCurvesSpecs) + las_file = open_las_file(file_path) + curves = process_curves_items(las_file) + assert isinstance(curves, list) + assert len(curves) > 0 + assert isinstance(curves[0], LasCurvesSpecs) diff --git a/test/las/test_las_export.py b/test/las/test_las_export.py index 76816c3..4fc4311 100644 --- a/test/las/test_las_export.py +++ b/test/las/test_las_export.py @@ -11,26 +11,25 @@ def test_exports(): - dlis_processor = LasReader() - las_file = dlis_processor.process_las_file(file_path) - assert las_file.file_name == '1-MPE-3-AL_hals-dslt-tdd-hgns-gr_resistividade_repetida.las' - assert las_file.error is False - - las_data = las_file.data - assert las_data is not None - - las_file_columns = las_data.columns - assert isinstance(las_data.as_df(), pd.DataFrame) - with tempfile.TemporaryDirectory() as temp_dir: - - csv_path = Path(temp_dir) / 'test.csv' - las_data.to_csv(csv_path) - assert csv_path.exists() - assert csv_path.is_file() - assert csv_path.stat().st_size > 0 - - # Check the column names - with open(csv_path, 'r') as f: - columns = f.readline().strip().split(',') - for _, column in enumerate(columns): - assert las_file_columns.__contains__(column) + dlis_processor = LasReader() + las_file = dlis_processor.process_las_file(file_path) + assert las_file.file_name == '1-MPE-3-AL_hals-dslt-tdd-hgns-gr_resistividade_repetida.las' + assert las_file.error is False + + las_data = las_file.data + assert las_data is not None + + las_file_columns = las_data.columns + assert isinstance(las_data.as_df(), pd.DataFrame) + with tempfile.TemporaryDirectory() as temp_dir: + csv_path = Path(temp_dir) / 'test.csv' + las_data.to_csv(csv_path) + assert csv_path.exists() + assert csv_path.is_file() + assert csv_path.stat().st_size > 0 + + # Check the column names + with open(csv_path, 'r') as f: + columns = f.readline().strip().split(',') + for _, column in enumerate(columns): + assert las_file_columns.__contains__(column) diff --git a/test/las/test_las_models.py b/test/las/test_las_models.py index bf3204b..2bc0b09 100644 --- a/test/las/test_las_models.py +++ b/test/las/test_las_models.py @@ -10,21 +10,21 @@ def test_lasfile_schema(): - reader = LasReader() - las_file = reader.process_las_file(file_path) - assert isinstance(las_file, LasFileModel) + reader = LasReader() + las_file = reader.process_las_file(file_path) + assert isinstance(las_file, LasFileModel) - curves = las_file.curves_names - assert isinstance(curves, list) - assert 'DEPT' in las_file.curves_names + curves = las_file.curves_names + assert isinstance(curves, list) + assert 'DEPT' in las_file.curves_names - gr_curve = las_file.get_curve('GR') - assert isinstance(gr_curve, LasCurvesSpecs) + gr_curve = las_file.get_curve('GR') + assert isinstance(gr_curve, LasCurvesSpecs) def test_tableview(): - reader = LasReader() - las_file = reader.process_las_file(file_path) - table = las_file.table_view() - assert isinstance(table, Table) - assert ['File Name', 'Curves', 'Error'] == [column.header for column in table.columns] + reader = LasReader() + las_file = reader.process_las_file(file_path) + table = las_file.table_view() + assert isinstance(table, Table) + assert ['File Name', 'Curves', 'Error'] == [column.header for column in table.columns] diff --git a/test/las/test_las_reading.py b/test/las/test_las_reading.py index 7cd6fa2..2711b4d 100644 --- a/test/las/test_las_reading.py +++ b/test/las/test_las_reading.py @@ -9,20 +9,19 @@ def test_search_files(): - las_processor = LasReader() - las_files = las_processor.search_files(folder_path) - assert len(las_files) == 1 - assert las_files[0].name == '1-MPE-3-AL_hals-dslt-tdd-hgns-gr_resistividade_repetida.las' + las_processor = LasReader() + las_files = las_processor.search_files(folder_path) + assert len(las_files) == 1 + assert las_files[0].name == '1-MPE-3-AL_hals-dslt-tdd-hgns-gr_resistividade_repetida.las' def test_las_reading(): - - manager = LasReader() - las_file = manager.process_las_file(file_path) - assert las_file.file_name == '1-MPE-3-AL_hals-dslt-tdd-hgns-gr_resistividade_repetida.las' + manager = LasReader() + las_file = manager.process_las_file(file_path) + assert las_file.file_name == '1-MPE-3-AL_hals-dslt-tdd-hgns-gr_resistividade_repetida.las' def test_raw_reading(): - manager = LasReader() - raw = manager.load_raw(file_path) - assert isinstance(raw, LASFile) + manager = LasReader() + raw = manager.load_raw(file_path) + assert isinstance(raw, LASFile) diff --git a/test/lis/test_lis_export.py b/test/lis/test_lis_export.py index e9de09a..921d363 100644 --- a/test/lis/test_lis_export.py +++ b/test/lis/test_lis_export.py @@ -8,18 +8,18 @@ def test_lis_to_csv(): - reader = LisReader() - file = reader.process_physical_file(file_path) - assert file.file_name == '1-MPE-3-AL.lis' - assert file.error is False + reader = LisReader() + file = reader.process_physical_file(file_path) + assert file.file_name == '1-MPE-3-AL.lis' + assert file.error is False - logical_file = file.logical_files[1] - assert logical_file.file_name == '1-MPE-3-AL.lis' - assert logical_file.error is False + logical_file = file.logical_files[1] + assert logical_file.file_name == '1-MPE-3-AL.lis' + assert logical_file.error is False - curve_data = logical_file.frames[1] - with tempfile.TemporaryDirectory() as tmpdirname: - csv_path = f'{tmpdirname}/1-MPE-3-AL.csv' - curve_data.to_csv(csv_path) - assert pathlib.Path(csv_path).is_file() - assert pathlib.Path(csv_path).stat().st_size > 0 + curve_data = logical_file.frames[1] + with tempfile.TemporaryDirectory() as tmpdirname: + csv_path = f'{tmpdirname}/1-MPE-3-AL.csv' + curve_data.to_csv(csv_path) + assert pathlib.Path(csv_path).is_file() + assert pathlib.Path(csv_path).stat().st_size > 0 diff --git a/test/lis/test_lis_functions.py b/test/lis/test_lis_functions.py index 600a5ad..d1c89bc 100644 --- a/test/lis/test_lis_functions.py +++ b/test/lis/test_lis_functions.py @@ -8,13 +8,13 @@ def test_read_file(): - lis_physical = read_lis_file(file_path) - assert isinstance(lis_physical, lis.PhysicalFile) + lis_physical = read_lis_file(file_path) + assert isinstance(lis_physical, lis.PhysicalFile) def test_parse_physical_file(): - lis_physical = read_lis_file(file_path) - lis_logical = parse_lis_physical_file(lis_physical) - assert isinstance(lis_logical, list) - assert isinstance(lis_logical[0], lis.LogicalFile) - assert len(lis_logical) > 0 + lis_physical = read_lis_file(file_path) + lis_logical = parse_lis_physical_file(lis_physical) + assert isinstance(lis_logical, list) + assert isinstance(lis_logical[0], lis.LogicalFile) + assert len(lis_logical) > 0 diff --git a/test/lis/test_lis_models.py b/test/lis/test_lis_models.py index 557aa0a..027f167 100644 --- a/test/lis/test_lis_models.py +++ b/test/lis/test_lis_models.py @@ -11,53 +11,53 @@ def test_lis_physical_model_methods(): - reader = LisReader() - file = reader.process_physical_file(file_path) + reader = LisReader() + file = reader.process_physical_file(file_path) - assert isinstance(file, PhysicalLisFileModel) - assert file.file_name == '1-MPE-3-AL.lis' - assert file.folder_name is None - assert file.error is False - assert file.error_message is None - assert file.logical_files_count > 0 - assert 'SP' in file.curves_names + assert isinstance(file, PhysicalLisFileModel) + assert file.file_name == '1-MPE-3-AL.lis' + assert file.folder_name is None + assert file.error is False + assert file.error_message is None + assert file.logical_files_count > 0 + assert 'SP' in file.curves_names def test_physical_file_table(): - reader = LisReader() - file = reader.process_physical_file(file_path) - table = file.logical_files_table() - assert table.columns[0].header == 'File Name' - assert table.columns[1].header == 'Curves' - assert table.columns[2].header == 'Error' - assert len(table.columns) == 3 - assert len(table.rows) == file.logical_files_count - assert table.title == '1-MPE-3-AL.lis' + reader = LisReader() + file = reader.process_physical_file(file_path) + table = file.logical_files_table() + assert table.columns[0].header == 'File Name' + assert table.columns[1].header == 'Curves' + assert table.columns[2].header == 'Error' + assert len(table.columns) == 3 + assert len(table.rows) == file.logical_files_count + assert table.title == '1-MPE-3-AL.lis' def test_lis_logical_file_methods(): - reader = LisReader() - file = reader.process_physical_file(file_path) - logical_file = file.logical_files[1] - assert isinstance(logical_file, LogicalLisFileModel) - assert logical_file.file_name == '1-MPE-3-AL.lis' - assert logical_file.error is False - assert logical_file.error_message is None + reader = LisReader() + file = reader.process_physical_file(file_path) + logical_file = file.logical_files[1] + assert isinstance(logical_file, LogicalLisFileModel) + assert logical_file.file_name == '1-MPE-3-AL.lis' + assert logical_file.error is False + assert logical_file.error_message is None - sp_curve = logical_file.get_frame() - assert sp_curve is not None - assert isinstance(sp_curve, FrameLisCurves) + sp_curve = logical_file.get_frame() + assert sp_curve is not None + assert isinstance(sp_curve, FrameLisCurves) - assert logical_file.frames_count == len(logical_file.frames) + assert logical_file.frames_count == len(logical_file.frames) def test_lis_logical_file_table(): - reader = LisReader() - file = reader.process_physical_file(file_path) - logical_file = file.logical_files[1] - table = logical_file.table_view() - assert table.columns[0].header == 'Logical File ID' - assert table.columns[1].header == 'Frames' - assert table.columns[2].header == 'Curves' - assert table.columns[3].header == 'Error' - assert table.title == '1-MPE-3-AL.lis' + reader = LisReader() + file = reader.process_physical_file(file_path) + logical_file = file.logical_files[1] + table = logical_file.table_view() + assert table.columns[0].header == 'Logical File ID' + assert table.columns[1].header == 'Frames' + assert table.columns[2].header == 'Curves' + assert table.columns[3].header == 'Error' + assert table.title == '1-MPE-3-AL.lis' diff --git a/test/lis/test_lis_reading.py b/test/lis/test_lis_reading.py index aff2e85..0cc2f97 100644 --- a/test/lis/test_lis_reading.py +++ b/test/lis/test_lis_reading.py @@ -11,44 +11,44 @@ def test_search_files(): - lis_processor = LisReader() - lis_files = lis_processor.search_files(folder_path) - assert len(lis_files) == 2 - assert '1-MPE-3-AL.lis' in [file.name for file in lis_files] + lis_processor = LisReader() + lis_files = lis_processor.search_files(folder_path) + assert len(lis_files) == 2 + assert '1-MPE-3-AL.lis' in [file.name for file in lis_files] def test_lis_reading(): - reader = LisReader() - file = reader.process_physical_file(file_path) - assert file.file_name == '1-MPE-3-AL.lis' - assert file.folder_name is None - assert file.error is False + reader = LisReader() + file = reader.process_physical_file(file_path) + assert file.file_name == '1-MPE-3-AL.lis' + assert file.folder_name is None + assert file.error is False def test_lis_reading_error(): - reader = LisReader() - file = reader.process_physical_file(error_file_path) - assert file.error - assert file.error_message is not None - assert file.file_name == '1-MPE-3-AL-error.lis' - assert file.logical_files_count == 0 + reader = LisReader() + file = reader.process_physical_file(error_file_path) + assert file.error + assert file.error_message is not None + assert file.file_name == '1-MPE-3-AL-error.lis' + assert file.logical_files_count == 0 def test_raw_reading(): - reader = LisReader() - raw = reader.load_raw(file_path) - assert isinstance(raw, lis.PhysicalFile) + reader = LisReader() + raw = reader.load_raw(file_path) + assert isinstance(raw, lis.PhysicalFile) def test_raw_reading_error(): - with pytest.raises(Exception): - reader = LisReader() - reader.load_raw(error_file_path) + with pytest.raises(Exception): + reader = LisReader() + reader.load_raw(error_file_path) def test_raw_unpacking(): - reader = LisReader() - logical = reader.load_raw(file_path, unpack=True) - assert isinstance(logical, list) - assert isinstance(logical[0], lis.LogicalFile) - assert len(logical) > 0 + reader = LisReader() + logical = reader.load_raw(file_path, unpack=True) + assert isinstance(logical, list) + assert isinstance(logical[0], lis.LogicalFile) + assert len(logical) > 0 diff --git a/test/utils/test_json_io.py b/test/utils/test_json_io.py index 86312a9..4c3b4c5 100644 --- a/test/utils/test_json_io.py +++ b/test/utils/test_json_io.py @@ -4,24 +4,24 @@ def test_read_json(): - with tempfile.NamedTemporaryFile('w', delete=False) as f: - f.write('[{"name": "John", "age": 30, "city": "New York"}]') - assert read_json(f.name) == [{'name': 'John', 'age': 30, 'city': 'New York'}] + with tempfile.NamedTemporaryFile('w', delete=False) as f: + f.write('[{"name": "John", "age": 30, "city": "New York"}]') + assert read_json(f.name) == [{'name': 'John', 'age': 30, 'city': 'New York'}] def test_write_json(): - with tempfile.NamedTemporaryFile('w', delete=False) as f: - write_json(f.name, [{'name': 'John', 'age': 30, 'city': 'New York'}]) - assert read_json(f.name) == [{'name': 'John', 'age': 30, 'city': 'New York'}] + with tempfile.NamedTemporaryFile('w', delete=False) as f: + write_json(f.name, [{'name': 'John', 'age': 30, 'city': 'New York'}]) + assert read_json(f.name) == [{'name': 'John', 'age': 30, 'city': 'New York'}] def test_is_jsonable(): - json_test_pass = {'name': 'John', 'age': 30, 'city': 'New York'} - json_test_fail = {'name': 'John', 'age': 30, 'city': 'New York', 'set': set()} - assert is_jsonable(json_test_pass) is True - assert is_jsonable(json_test_fail) is False + json_test_pass = {'name': 'John', 'age': 30, 'city': 'New York'} + json_test_fail = {'name': 'John', 'age': 30, 'city': 'New York', 'set': set()} + assert is_jsonable(json_test_pass) is True + assert is_jsonable(json_test_fail) is False def test_check_dict_json(): - json_test = {'name': 'John', 'age': 30, 'city': 'New York', 'set': set()} - assert check_dict_jsonable(json_test) == {'name': 'John', 'age': 30, 'city': 'New York', 'set': 'set()'} + json_test = {'name': 'John', 'age': 30, 'city': 'New York', 'set': set()} + assert check_dict_jsonable(json_test) == {'name': 'John', 'age': 30, 'city': 'New York', 'set': 'set()'} diff --git a/test/utils/test_main_reader.py b/test/utils/test_main_reader.py index ed6a8d1..484a6f8 100644 --- a/test/utils/test_main_reader.py +++ b/test/utils/test_main_reader.py @@ -8,13 +8,12 @@ def test_main_reader(): + reader = MainReader() + lis_file = reader.load_file(lis) + assert lis_file.file_name == '1-MPE-3-AL.lis' - reader = MainReader() - lis_file = reader.load_file(lis) - assert lis_file.file_name == '1-MPE-3-AL.lis' + dlis_file = reader.load_file(dlis) + assert dlis_file.file_name == '1PIR1AL_conv_ccl_canhoneio.dlis' - dlis_file = reader.load_file(dlis) - assert dlis_file.file_name == '1PIR1AL_conv_ccl_canhoneio.dlis' - - las_file = reader.load_file(las) - assert las_file.file_name == '1-MPE-3-AL.las' + las_file = reader.load_file(las) + assert las_file.file_name == '1-MPE-3-AL.las' diff --git a/test/utils/test_mnemonics_utils.py b/test/utils/test_mnemonics_utils.py index a04c70e..9464236 100644 --- a/test/utils/test_mnemonics_utils.py +++ b/test/utils/test_mnemonics_utils.py @@ -1,37 +1,35 @@ -from pathlib import Path - import pandas as pd from wellbelog.utils.mnemonicfix import MnemonicFix def test_replace_column_values(): - df = pd.DataFrame({'INDEX': [1, 2, 3, 4, 5]}) - characters = {'INDEX'} - default = 'DEPT' - assert MnemonicFix.replace_columns_values(df, characters, default).columns.values == ['DEPT'] + df = pd.DataFrame({'INDEX': [1, 2, 3, 4, 5]}) + characters = {'INDEX'} + default = 'DEPT' + assert MnemonicFix.replace_columns_values(df, characters, default).columns.values == ['DEPT'] def test_mnemonics_utils(): - string_to_fix = 'INDEX' - assert MnemonicFix.replace_index(string_to_fix) == 'DEPT' + string_to_fix = 'INDEX' + assert MnemonicFix.replace_index(string_to_fix) == 'DEPT' def test_dept_rename(): - df = pd.DataFrame({'DEPT(0)': [1, 2, 3, 4, 5]}) - assert MnemonicFix.depth_rename(df).columns.values == ['DEPT'] + df = pd.DataFrame({'DEPT(0)': [1, 2, 3, 4, 5]}) + assert MnemonicFix.depth_rename(df).columns.values == ['DEPT'] def test_gamma_rename(): - df = pd.DataFrame({'GR ': [1, 2, 3, 4, 5]}) - assert MnemonicFix.gamma_rename(df).columns.values == ['GR'] + df = pd.DataFrame({'GR ': [1, 2, 3, 4, 5]}) + assert MnemonicFix.gamma_rename(df).columns.values == ['GR'] def test_index_to_depth(): - df = pd.DataFrame({'INDEX': [1, 2, 3, 4, 5]}) - assert MnemonicFix.index_to_depth(df).columns.values == ['DEPT'] + df = pd.DataFrame({'INDEX': [1, 2, 3, 4, 5]}) + assert MnemonicFix.index_to_depth(df).columns.values == ['DEPT'] def test_strip_column_names(): - df = pd.DataFrame({' DEPT': [1, 2, 3, 4, 5]}) - assert MnemonicFix.strip_column_names(df).columns.values == ['DEPT'] + df = pd.DataFrame({' DEPT': [1, 2, 3, 4, 5]}) + assert MnemonicFix.strip_column_names(df).columns.values == ['DEPT'] diff --git a/test/utils/test_units.py b/test/utils/test_units.py index 9a1bf1c..c1b9363 100644 --- a/test/utils/test_units.py +++ b/test/utils/test_units.py @@ -2,10 +2,10 @@ def test_feet_to_meter(): - assert feet_to_meter(1, 1) == 0.3 - assert feet_to_meter(2, 1) == 0.6 + assert feet_to_meter(1, 1) == 0.3 + assert feet_to_meter(2, 1) == 0.6 def test_meter_to_feet(): - assert meter_to_feet(1, 1) == 3.3 - assert meter_to_feet(2, 1) == 6.6 + assert meter_to_feet(1, 1) == 3.3 + assert meter_to_feet(2, 1) == 6.6 diff --git a/wellbelog/belodlis/functions.py b/wellbelog/belodlis/functions.py index ace2cf3..d344a88 100644 --- a/wellbelog/belodlis/functions.py +++ b/wellbelog/belodlis/functions.py @@ -2,27 +2,27 @@ def open_dlis_file(file_path: str) -> dlis.PhysicalFile: - """ - Opens the DLIS file at the specified file path and returns a dlis.PhysicalFile object. + """ + Opens the DLIS file at the specified file path and returns a dlis.PhysicalFile object. - Parameters: - file_path (str): The path to the DLIS file. + Parameters: + file_path (str): The path to the DLIS file. - Returns: - dlis.PhysicalFile: The opened DLIS file. + Returns: + dlis.PhysicalFile: The opened DLIS file. - Exception: If there is an error while loading the DLIS file. - """ - try: - return dlis.load(file_path) + Exception: If there is an error while loading the DLIS file. + """ + try: + return dlis.load(file_path) - except Exception as e: - return e + except Exception as e: + return e def unpack_physical_dlis(ph_file: dlis.PhysicalFile) -> list[dlis.LogicalFile]: - """ - Unpacks the physical file and returns a list of logical files - """ - *logical_files, = ph_file - return logical_files + """ + Unpacks the physical file and returns a list of logical files + """ + (*logical_files,) = ph_file + return logical_files diff --git a/wellbelog/belodlis/objects_parsers/__init__.py b/wellbelog/belodlis/objects_parsers/__init__.py index 607247b..d4f7d6c 100644 --- a/wellbelog/belodlis/objects_parsers/__init__.py +++ b/wellbelog/belodlis/objects_parsers/__init__.py @@ -1,4 +1,2 @@ -from .logical_file_parser import ( - file_params_to_dict, file_tools_to_dict, get_logical_file_summary -) +from .logical_file_parser import file_params_to_dict, file_tools_to_dict, get_logical_file_summary from .physical_file_parser import get_physical_file_summary diff --git a/wellbelog/belodlis/objects_parsers/frame_parser.py b/wellbelog/belodlis/objects_parsers/frame_parser.py index 2610925..4c6a649 100644 --- a/wellbelog/belodlis/objects_parsers/frame_parser.py +++ b/wellbelog/belodlis/objects_parsers/frame_parser.py @@ -1,4 +1,3 @@ - from typing import Union from dlisio import dlis @@ -9,70 +8,70 @@ class FrameProcessor: - """ - This class is responsible for processing the dlis.Frame object - """ - @staticmethod - def process_frame(frame: dlis.Frame, file_name: str, logical_id: str, filter: bool = True) -> Union[FrameModel, None]: - """ - Process the Frame object and returns a dict with the main data - """ - if filter: - if 'DUMM' in [c.name for c in frame.channels]: - return None + """ + This class is responsible for processing the dlis.Frame object + """ - model = FrameModel( - file_name=file_name, - logical_file_id=logical_id, - description=str(frame.description) if frame.description else None, - channels=FrameProcessor.processs_frame_channels(frame), - ) + @staticmethod + def process_frame(frame: dlis.Frame, file_name: str, logical_id: str, filter: bool = True) -> Union[FrameModel, None]: + """ + Process the Frame object and returns a dict with the main data + """ + if filter: + if 'DUMM' in [c.name for c in frame.channels]: + return None - return model + model = FrameModel( + file_name=file_name, + logical_file_id=logical_id, + description=str(frame.description) if frame.description else None, + channels=FrameProcessor.processs_frame_channels(frame), + ) - @ staticmethod - def processs_frame_channels(frame: dlis.Frame, filter_dumm: bool = True) -> Union[ChannelsList, None]: - """ - Tries to create a list of FrameChannelData from the Frame.channels - """ + return model - channels: list[dlis.Channel] = frame.channels - if filter_dumm: - if any([c.name == 'DUMM' for c in channels]): - return None + @staticmethod + def processs_frame_channels(frame: dlis.Frame, filter_dumm: bool = True) -> Union[ChannelsList, None]: + """ + Tries to create a list of FrameChannelData from the Frame.channels + """ - if not channels: - return None + channels: list[dlis.Channel] = frame.channels + if filter_dumm: + if any([c.name == 'DUMM' for c in channels]): + return None - channel_main_dict = [ - FrameChannel( - long_name=str(channel.long_name), - name=MnemonicFix.replace_index(str(channel.name)).strip(), - units=str(channel.units), - repr=str(channel.reprc), - properties=str(channel.properties), + if not channels: + return None - ) - for channel in channels - ] - return channel_main_dict + channel_main_dict = [ + FrameChannel( + long_name=str(channel.long_name), + name=MnemonicFix.replace_index(str(channel.name)).strip(), + units=str(channel.units), + repr=str(channel.reprc), + properties=str(channel.properties), + ) + for channel in channels + ] + return channel_main_dict - @ staticmethod - def dlis_curves_to_dataframe(frame: dlis.Frame): - """Tries to create a dataframe from the Frame.curves""" - # Creating the main dataframe for the curve data - try: - df = pd.DataFrame(frame.curves()) - # - Fixing mnemonics and wrong indexes - df = MnemonicFix.index_to_depth(df) - df.columns = df.columns.str.strip() - # Setting the column Well to the origin - return df + @staticmethod + def dlis_curves_to_dataframe(frame: dlis.Frame): + """Tries to create a dataframe from the Frame.curves""" + # Creating the main dataframe for the curve data + try: + df = pd.DataFrame(frame.curves()) + # - Fixing mnemonics and wrong indexes + df = MnemonicFix.index_to_depth(df) + df.columns = df.columns.str.strip() + # Setting the column Well to the origin + return df - except ValueError as v: - raise v + except ValueError as v: + raise v - except Exception as e: - # NOTE It returns the Exception object - # NOTE It will be setted to the Error in the BeloFrame - return e + except Exception as e: + # NOTE It returns the Exception object + # NOTE It will be setted to the Error in the BeloFrame + return e diff --git a/wellbelog/belodlis/objects_parsers/logical_file_parser.py b/wellbelog/belodlis/objects_parsers/logical_file_parser.py index 2685dea..b9b2259 100644 --- a/wellbelog/belodlis/objects_parsers/logical_file_parser.py +++ b/wellbelog/belodlis/objects_parsers/logical_file_parser.py @@ -1,9 +1,6 @@ from typing import Union -from dlisio.dlis import ( - Measurement, Parameter, Channel, LogicalFile, - Fileheader -) +from dlisio.dlis import Measurement, Parameter, Channel, LogicalFile, Fileheader from rich.console import Console from wellbelog.schemas.dlis import LogicalFileSummary @@ -15,147 +12,139 @@ def file_params_to_dict(objs: list[MetadataObject], **kwargs) -> list[dict]: - """ - Create a a list of dictsthat summarize the content of 'objs', - One dict per obj + """ + Create a a list of dictsthat summarize the content of 'objs', + One dict per obj - Parameters - ---------- + Parameters + ---------- - objs : list() - list of metadata objects + objs : list() + list of metadata objects - **kwargs - Keyword arguments - Use kwargs to tell the function which fields (attributes) of the\n - objects you want to include in the DataFrame. The parameter name\n - must match an attribute on the object in 'objs', while the value\n - of the parameters is used as a column name. Any kwargs are excepted,\n - but if the object does not have the requested attribute, 'KeyError'\n - is used as the value.\n + **kwargs + Keyword arguments + Use kwargs to tell the function which fields (attributes) of the\n + objects you want to include in the DataFrame. The parameter name\n + must match an attribute on the object in 'objs', while the value\n + of the parameters is used as a column name. Any kwargs are excepted,\n + but if the object does not have the requested attribute, 'KeyError'\n + is used as the value.\n - Returns - ------- + Returns + ------- - dicts_summary : list[dict] - """ - dicts_summary: list[dict] = [] + dicts_summary : list[dict] + """ + dicts_summary: list[dict] = [] - for obj in objs: - obj: Parameter - obj_dict = {} - obj_dict['name'] = getattr(obj, 'name') - obj_dict['long_name'] = getattr(obj, 'long_name') + for obj in objs: + obj: Parameter + obj_dict = {} + obj_dict['name'] = getattr(obj, 'name') + obj_dict['long_name'] = getattr(obj, 'long_name') - try: - # NOTE Main attribute acees method - if getattr(obj, 'values').ndim == 1 and getattr(obj, 'values') is not None: # noqa - obj_dict['values'] = str(getattr(obj, 'values')[0]) + try: + # NOTE Main attribute acees method + if getattr(obj, 'values').ndim == 1 and getattr(obj, 'values') is not None: # noqa + obj_dict['values'] = str(getattr(obj, 'values')[0]) - except IndexError: - obj_dict['values'] = 'No values were found on this parameter' + except IndexError: + obj_dict['values'] = 'No values were found on this parameter' - except Exception: - obj_dict['values'] = 'Error while acessing attribute' + except Exception: + obj_dict['values'] = 'Error while acessing attribute' - dicts_summary.append(obj_dict) - return dicts_summary + dicts_summary.append(obj_dict) + return dicts_summary def file_tools_to_dict(objs: list[MetadataObject], **kwargs): - """ - Create a a list of dicts that summarize the content of 'Tools'. + """ + Create a a list of dicts that summarize the content of 'Tools'. - Parameters - ---------- + Parameters + ---------- - objs : list() - list of metadata objects + objs : list() + list of metadata objects - Returns - ------- + Returns + ------- - dicts_summary : list[dict] - """ - dicts_summary: list[dict] = [] - for obj in objs: - obj_dict = {} + dicts_summary : list[dict] + """ + dicts_summary: list[dict] = [] + for obj in objs: + obj_dict = {} - obj_dict['name'] = getattr(obj, 'name') - obj_dict['generic_name'] = getattr(obj, 'generic_name') - obj_dict['trademark_name'] = getattr(obj, 'trademark_name') - obj_dict['description'] = getattr(obj, 'description') + obj_dict['name'] = getattr(obj, 'name') + obj_dict['generic_name'] = getattr(obj, 'generic_name') + obj_dict['trademark_name'] = getattr(obj, 'trademark_name') + obj_dict['description'] = getattr(obj, 'description') - dicts_summary.append(obj_dict) - return dicts_summary + dicts_summary.append(obj_dict) + return dicts_summary def file_remarks(logical_file: LogicalFile) -> dict: - """ - Gets the remarks on a given logical file - - Args: - logical_file (LogicalFile): The logical file to be searched - - Returns: - list[dict]: a list of the remarks - """ - remars_dict_list = {} - try: - remarks = logical_file.find('PARAMETER', '^R[0-9]{1,2}') - remarks = sorted(remarks, key=lambda x: int(x.name[1:])) - for remark in remarks: - if remark.name == 'R8': - continue - remark_dict = {} - if remark.values.ndim == 1 and remark.values is not None: - remark_dict[remark.name] = " ".join(remark.values - ).encode('utf-8' - ).decode('utf-8') - remars_dict_list.update(remark_dict) - except Exception as e: - console.log(f'[bold on red] {e}') - pass - - return remars_dict_list + """ + Gets the remarks on a given logical file + + Args: + logical_file (LogicalFile): The logical file to be searched + + Returns: + list[dict]: a list of the remarks + """ + remars_dict_list = {} + try: + remarks = logical_file.find('PARAMETER', '^R[0-9]{1,2}') + remarks = sorted(remarks, key=lambda x: int(x.name[1:])) + for remark in remarks: + if remark.name == 'R8': + continue + remark_dict = {} + if remark.values.ndim == 1 and remark.values is not None: + remark_dict[remark.name] = ' '.join(remark.values).encode('utf-8').decode('utf-8') + remars_dict_list.update(remark_dict) + except Exception as e: + console.log(f'[bold on red] {e}') + pass + + return remars_dict_list def get_logical_file_summary(logical_file: LogicalFile) -> LogicalFileSummary: - """ - ## Master function - It combines the previous functions, into a pipeline.\n - This function creates various lists of dictionaries, - that reflects the objects that it contains. + """ + ## Master function + It combines the previous functions, into a pipeline.\n + This function creates various lists of dictionaries, + that reflects the objects that it contains. - Args: - logical_file (dlis.LogicalFile): A dlisio.dlis.LogicalFile. + Args: + logical_file (dlis.LogicalFile): A dlisio.dlis.LogicalFile. - main_parameters (list[str]): A list of the main parameters - that should be displayed + main_parameters (list[str]): A list of the main parameters + that should be displayed - Returns: - LogicalFileSummary: A LogicalFile Summary object. - """ + Returns: + LogicalFileSummary: A LogicalFile Summary object. + """ - parameters_dict_list = file_params_to_dict(logical_file.parameters) + parameters_dict_list = file_params_to_dict(logical_file.parameters) - # NOTE Creating the tools dict. - tools_dict_list = file_tools_to_dict(logical_file.tools) + # NOTE Creating the tools dict. + tools_dict_list = file_tools_to_dict(logical_file.tools) - # NOTE Getting the header - header: Fileheader = logical_file.fileheader - header = header.describe().__str__() + # NOTE Getting the header + header: Fileheader = logical_file.fileheader + header = header.describe().__str__() - # NOTE Creating the comments dict. - comments = {'comment': co.text for co in logical_file.comments} + # NOTE Creating the comments dict. + comments = {'comment': co.text for co in logical_file.comments} - # NOTE Creating the remarks dict. - remars_dict_list = file_remarks(logical_file) + # NOTE Creating the remarks dict. + remars_dict_list = file_remarks(logical_file) - return LogicalFileSummary( - parameters=parameters_dict_list, - tools=tools_dict_list, - remarks=remars_dict_list, - comments=comments, - header=header - ) + return LogicalFileSummary(parameters=parameters_dict_list, tools=tools_dict_list, remarks=remars_dict_list, comments=comments, header=header) diff --git a/wellbelog/belodlis/objects_parsers/physical_file_parser.py b/wellbelog/belodlis/objects_parsers/physical_file_parser.py index 6ebfd23..adb2fdb 100644 --- a/wellbelog/belodlis/objects_parsers/physical_file_parser.py +++ b/wellbelog/belodlis/objects_parsers/physical_file_parser.py @@ -1,40 +1,37 @@ - from dlisio.dlis import PhysicalFile from .sumaries import PhysicalFileSummary def get_physical_file_summary(physical_file: PhysicalFile, well: str, file_path: str) -> PhysicalFileSummary: - """ - This function transforms the .describe method of the PhisicalFile - into a list of strings and a dict, made with the values - from the descripition - - Args: - physical_file (PhysicalFile): A dlis.PhysicalFile - well (str): A sring representing the well - file_path (str): The file_name - - Returns: - PhysicalFileSummary: A PhysicalFile fast summary - """ - master_dicts = {'well': well, 'file_path': file_path} - - # NOTE Getting the physical file descripition - physical_desc = physical_file.describe().__str__() - # - Splitting the lines - lines = physical_desc.__str__().split('\n') - for line in lines: - try: - line = line.split(':') - # XXX This is strange but it works - # It is a hacked whitespace tool, for enhancing the process - key_ = line[0].replace(' ', '').replace( - ' ', '').replace(' ', '').strip( - ).lower().replace(' ', '_') - - master_dicts.update({key_: line[1]}) - - except Exception: - return None - - return PhysicalFileSummary(physical_desc, master_dicts) + """ + This function transforms the .describe method of the PhisicalFile + into a list of strings and a dict, made with the values + from the descripition + + Args: + physical_file (PhysicalFile): A dlis.PhysicalFile + well (str): A sring representing the well + file_path (str): The file_name + + Returns: + PhysicalFileSummary: A PhysicalFile fast summary + """ + master_dicts = {'well': well, 'file_path': file_path} + + # NOTE Getting the physical file descripition + physical_desc = physical_file.describe().__str__() + # - Splitting the lines + lines = physical_desc.__str__().split('\n') + for line in lines: + try: + line = line.split(':') + # XXX This is strange but it works + # It is a hacked whitespace tool, for enhancing the process + key_ = line[0].replace(' ', '').replace(' ', '').replace(' ', '').strip().lower().replace(' ', '_') + + master_dicts.update({key_: line[1]}) + + except Exception: + return None + + return PhysicalFileSummary(physical_desc, master_dicts) diff --git a/wellbelog/belodlis/objects_parsers/sumaries.py b/wellbelog/belodlis/objects_parsers/sumaries.py index 42a0836..cc08c82 100644 --- a/wellbelog/belodlis/objects_parsers/sumaries.py +++ b/wellbelog/belodlis/objects_parsers/sumaries.py @@ -1,23 +1,15 @@ # flake8: noqa: E501 -from typing import Any, Optional, Union -from pydantic import BaseModel, Field from dataclasses import dataclass -from dlisio.dlis import Fileheader -MAIN_PARAMETERS = [ - 'CN', 'WN', 'FN', - 'NATI', 'CONT', 'FL', - 'FL1', 'FL2', 'LONG', - 'LATI', 'DLAB', 'TLAB', - 'CSIZ', 'THNO', 'BS' -] +MAIN_PARAMETERS = ['CN', 'WN', 'FN', 'NATI', 'CONT', 'FL', 'FL1', 'FL2', 'LONG', 'LATI', 'DLAB', 'TLAB', 'CSIZ', 'THNO', 'BS'] @dataclass class PhysicalFileSummary: - """ - Entity to hold the description lines and a master dict of - a summary representation of the file. - """ - lines_desc: list[str] - file_summary_dict: list[dict] + """ + Entity to hold the description lines and a master dict of + a summary representation of the file. + """ + + lines_desc: list[str] + file_summary_dict: list[dict] diff --git a/wellbelog/belodlis/reader.py b/wellbelog/belodlis/reader.py index b734319..a7d471c 100644 --- a/wellbelog/belodlis/reader.py +++ b/wellbelog/belodlis/reader.py @@ -12,131 +12,130 @@ class DlisReader: - """ - This class is responsible for processing the dlis.PhysicalFile object - """ - - def __init__(self) -> None: - self. logger = setup_logger(__class__.__name__) - - def load_raw(self, path_to_file: str, unpack=False) -> PhysicalFileModel: - """ - Load a DLIS file and return the raw data. - - Args: - path_to_file (str): Path to the DLIS file. - - Returns: - PhysicalFileModel: The raw data. - """ - file = open_dlis_file(pathlib.Path(path_to_file).absolute()) - if isinstance(file, Exception): - self.logger.error(f'Error while opening the DLIS file: {file}') - raise file - if unpack: - return unpack_physical_dlis(file) - return file - - def search_files(self, path: str) -> Optional[list[pathlib.Path]]: - """ - Search for DLIS files in the given path and returns a list with the file paths. - - Parameters: - path (str): The path to the folder where the DLIS files are located. - """ - dlis_files = [] - try: - for file in pathlib.Path(path).rglob('*.dlis'): - dlis_files.append(file) - return dlis_files - except Exception as e: - self.logger.error(f'Error while searching for DLIS files: {e}') - return None - - def process_physical_file(self, path: str, folder_name: str = None) -> PhysicalFileModel: - """ - Process the PhysicalFile object and returns a PhysicalFileModel with the main data. - - Parameters: - path (str): The path to the DLIS file. - folder_name (str): The name of the folder where the file is located. - """ - # Creates a file name from the path and a BeloDlisWrapper object - file_name = pathlib.Path(path).name - self.logger.info(f'Processing the file: {file_name}') - # XXX Create a PhysicalFileModel object - physical = PhysicalFileModel(file_name=file_name, logical_files=[], folder_name=folder_name) - # Open the DLIS file or raise an exception - file = open_dlis_file(pathlib.Path(path).absolute()) - if isinstance(file, Exception): - self.logger.error(f'Error while opening the DLIS file: {file}') - physical.error = True - physical.error_message = file.__str__() - return physical - - logical_files = unpack_physical_dlis(file) - # Process each logical file - for file in logical_files: - logical_file = LogicalFileModel(file_name=file_name, frames=[]) - try: - logical_file.logical_id = file.fileheader.id - logical_file.summary = get_logical_file_summary(file) - frames: list[Frame] = file.find('FRAME') - - # If there are no frames, set the error flag and the error message - if not frames: - logical_file.error = True - logical_file.error_message = 'No frames found in the logical file' - physical.error_files.append(logical_file) - continue - - # NOTE Process each frame - # iterate over the frames and process each one - for frame in frames: - - # Create a frame model and process the frame, - # NOTE it will return None if the frame has a DUMM channel - frame_model = FrameProcessor.process_frame(frame, file_name=file_name, logical_id=file.fileheader.id) - - # If 'DUMM' channel is found, set the error flag and the error message - if frame_model is None: - logical_file.error = True - logical_file.error_message = 'DUMM channel found in the frame' - physical.error_files.append(logical_file) - continue - - # Process the frame and check for DUMM channels - data = FrameProcessor.dlis_curves_to_dataframe(frame) - # Check if the data is an exception - # If it is, set the error flag and the error message - if data is Exception: - self.logger.error(f'Error while processing the frame: {data}') - frame_model.error = True - frame_model.error_message = data.__str__() - logical_file.error = True - logical_file.error_message = data.__str__() - logical_file.frames.append(frame_model) - physical.error_files.append(logical_file) - continue - - # Create a frame data model - frame_data = FrameDataframe( - file_name=file_name, - logical_file_id=file.fileheader.id, - data=json.loads(data.to_json(orient='records')), - ) - # Set the frame data to the frame model - frame_model.data = frame_data - logical_file.frames.append(frame_model) - - # Append the logical file to the physical file - physical.logical_files.append(logical_file) - - return physical - - except Exception as e: - self.logger.error(f'Error while processing the logical file: {e}') - logical_file.error = True - logical_file.error_message = e.__str__() - physical.error_files.append(logical_file) - continue + """ + This class is responsible for processing the dlis.PhysicalFile object + """ + + def __init__(self) -> None: + self.logger = setup_logger(__class__.__name__) + + def load_raw(self, path_to_file: str, unpack=False) -> PhysicalFileModel: + """ + Load a DLIS file and return the raw data. + + Args: + path_to_file (str): Path to the DLIS file. + + Returns: + PhysicalFileModel: The raw data. + """ + file = open_dlis_file(pathlib.Path(path_to_file).absolute()) + if isinstance(file, Exception): + self.logger.error(f'Error while opening the DLIS file: {file}') + raise file + if unpack: + return unpack_physical_dlis(file) + return file + + def search_files(self, path: str) -> Optional[list[pathlib.Path]]: + """ + Search for DLIS files in the given path and returns a list with the file paths. + + Parameters: + path (str): The path to the folder where the DLIS files are located. + """ + dlis_files = [] + try: + for file in pathlib.Path(path).rglob('*.dlis'): + dlis_files.append(file) + return dlis_files + except Exception as e: + self.logger.error(f'Error while searching for DLIS files: {e}') + return None + + def process_physical_file(self, path: str, folder_name: str = None) -> PhysicalFileModel: + """ + Process the PhysicalFile object and returns a PhysicalFileModel with the main data. + + Parameters: + path (str): The path to the DLIS file. + folder_name (str): The name of the folder where the file is located. + """ + # Creates a file name from the path and a BeloDlisWrapper object + file_name = pathlib.Path(path).name + self.logger.info(f'Processing the file: {file_name}') + # XXX Create a PhysicalFileModel object + physical = PhysicalFileModel(file_name=file_name, logical_files=[], folder_name=folder_name) + # Open the DLIS file or raise an exception + file = open_dlis_file(pathlib.Path(path).absolute()) + if isinstance(file, Exception): + self.logger.error(f'Error while opening the DLIS file: {file}') + physical.error = True + physical.error_message = file.__str__() + return physical + + logical_files = unpack_physical_dlis(file) + # Process each logical file + for file in logical_files: + logical_file = LogicalFileModel(file_name=file_name, frames=[]) + try: + logical_file.logical_id = file.fileheader.id + logical_file.summary = get_logical_file_summary(file) + frames: list[Frame] = file.find('FRAME') + + # If there are no frames, set the error flag and the error message + if not frames: + logical_file.error = True + logical_file.error_message = 'No frames found in the logical file' + physical.error_files.append(logical_file) + continue + + # NOTE Process each frame + # iterate over the frames and process each one + for frame in frames: + # Create a frame model and process the frame, + # NOTE it will return None if the frame has a DUMM channel + frame_model = FrameProcessor.process_frame(frame, file_name=file_name, logical_id=file.fileheader.id) + + # If 'DUMM' channel is found, set the error flag and the error message + if frame_model is None: + logical_file.error = True + logical_file.error_message = 'DUMM channel found in the frame' + physical.error_files.append(logical_file) + continue + + # Process the frame and check for DUMM channels + data = FrameProcessor.dlis_curves_to_dataframe(frame) + # Check if the data is an exception + # If it is, set the error flag and the error message + if data is Exception: + self.logger.error(f'Error while processing the frame: {data}') + frame_model.error = True + frame_model.error_message = data.__str__() + logical_file.error = True + logical_file.error_message = data.__str__() + logical_file.frames.append(frame_model) + physical.error_files.append(logical_file) + continue + + # Create a frame data model + frame_data = FrameDataframe( + file_name=file_name, + logical_file_id=file.fileheader.id, + data=json.loads(data.to_json(orient='records')), + ) + # Set the frame data to the frame model + frame_model.data = frame_data + logical_file.frames.append(frame_model) + + # Append the logical file to the physical file + physical.logical_files.append(logical_file) + + return physical + + except Exception as e: + self.logger.error(f'Error while processing the logical file: {e}') + logical_file.error = True + logical_file.error_message = e.__str__() + physical.error_files.append(logical_file) + continue diff --git a/wellbelog/belolas/functions.py b/wellbelog/belolas/functions.py index 08c6e12..21ab347 100644 --- a/wellbelog/belolas/functions.py +++ b/wellbelog/belolas/functions.py @@ -4,25 +4,25 @@ def open_las_file(path: str) -> lasio.las.LASFile: - """ - Opens a LAS file and returns a LASFile object. - """ - try: - las_file = lasio.read(path) - return las_file - except Exception as e: - return e + """ + Opens a LAS file and returns a LASFile object. + """ + try: + las_file = lasio.read(path) + return las_file + except Exception as e: + return e def process_curves_items(las_file: lasio.las.LASFile) -> LasCurvesSpecs: - """ - Processes the curves items in a LAS file and returns a LasCurvesSpecs object. - """ - curves = las_file.curves - specs = [] - for curve in curves: - curve_dict = curve.__dict__ - data = curve_dict.pop('data') - spec = LasCurvesSpecs(**curve_dict, shape=data.shape) - specs.append(spec) - return specs + """ + Processes the curves items in a LAS file and returns a LasCurvesSpecs object. + """ + curves = las_file.curves + specs = [] + for curve in curves: + curve_dict = curve.__dict__ + data = curve_dict.pop('data') + spec = LasCurvesSpecs(**curve_dict, shape=data.shape) + specs.append(spec) + return specs diff --git a/wellbelog/belolas/reader.py b/wellbelog/belolas/reader.py index 3cbbdbf..08e6218 100644 --- a/wellbelog/belolas/reader.py +++ b/wellbelog/belolas/reader.py @@ -7,90 +7,87 @@ class LasReader: - """ - Class responsible for creating a LasFileModel object from a physical file. - """ - - def __init__(self) -> None: - self.logger = setup_logger(__class__.__name__) - - def load_raw(self, path_to_file: str) -> LasFileModel: - """ - Load a LAS file and return the raw data. - - Args: - path_to_file (str): Path to the LAS file. - - Returns: - LasFileModel: The raw data. - """ - return open_las_file(path_to_file) - - def search_files(self, path: str) -> list[pathlib.Path]: - """ - Search for LAS files in the given path and returns a list with the file paths. - - Args: - path (str): The path to the folder where the LAS files are located. - - Returns: - list[pathlib.Path]: A list with the file paths. - - Raises: - None. - """ - las_files = [] - try: - for file in pathlib.Path(path).rglob('*.las'): - las_files.append(file) - return las_files - except Exception as e: - self.logger.error(f'Error while searching for LAS files: {e}') - return las_files - - def process_las_file(self, path: str) -> LasFileModel: - """ - Processes a physical file and returns a LasFileModel object. - - Args: - path (str): The path to the file. - - Returns: - LasFileModel: A LasFileModel object representing the file. - - Raises: - None. - """ - # Create a LasFileModel object with the file name - file_name = pathlib.Path(path).name - las_file_model = LasFileModel(file_name=file_name) - - # Open the LAS file - file = open_las_file(path) - - # NOTE If the file has any error, return the error message - if isinstance(file, Exception): - las_file_model.error = True - las_file_model.error_message = file - return las_file_model - - try: - las_data = file.df() - shape_data = las_data.shape - las_curves_specs = process_curves_items(file) - las_file_model.specs = las_curves_specs - - las_dataframe = LasDataframe( - data=json.loads(las_data.to_json(orient='records')), - file_name=file_name, - columns=file.keys(), - shape=shape_data - ) - las_file_model.data = las_dataframe - return las_file_model - - except Exception as e: - self.logger.error(f'Error while processing LAS file: {e}') - las_file_model.error = True - las_file_model.error_message = str(e) - return las_file_model + """ + Class responsible for creating a LasFileModel object from a physical file. + """ + + def __init__(self) -> None: + self.logger = setup_logger(__class__.__name__) + + def load_raw(self, path_to_file: str) -> LasFileModel: + """ + Load a LAS file and return the raw data. + + Args: + path_to_file (str): Path to the LAS file. + + Returns: + LasFileModel: The raw data. + """ + return open_las_file(path_to_file) + + def search_files(self, path: str) -> list[pathlib.Path]: + """ + Search for LAS files in the given path and returns a list with the file paths. + + Args: + path (str): The path to the folder where the LAS files are located. + + Returns: + list[pathlib.Path]: A list with the file paths. + + Raises: + None. + """ + las_files = [] + try: + for file in pathlib.Path(path).rglob('*.las'): + las_files.append(file) + return las_files + except Exception as e: + self.logger.error(f'Error while searching for LAS files: {e}') + return las_files + + def process_las_file(self, path: str) -> LasFileModel: + """ + Processes a physical file and returns a LasFileModel object. + + Args: + path (str): The path to the file. + + Returns: + LasFileModel: A LasFileModel object representing the file. + + Raises: + None. + """ + # Create a LasFileModel object with the file name + file_name = pathlib.Path(path).name + las_file_model = LasFileModel(file_name=file_name) + + # Open the LAS file + file = open_las_file(path) + + # NOTE If the file has any error, return the error message + if isinstance(file, Exception): + las_file_model.error = True + las_file_model.error_message = file + return las_file_model + + try: + las_data = file.df() + shape_data = las_data.shape + las_curves_specs = process_curves_items(file) + las_file_model.specs = las_curves_specs + + las_dataframe = LasDataframe( + data=json.loads(las_data.to_json(orient='records')), file_name=file_name, columns=file.keys(), shape=shape_data + ) + las_file_model.data = las_dataframe + return las_file_model + + except Exception as e: + self.logger.error(f'Error while processing LAS file: {e}') + las_file_model.error = True + las_file_model.error_message = str(e) + return las_file_model diff --git a/wellbelog/belolis/functions.py b/wellbelog/belolis/functions.py index 0b93b99..6f37641 100644 --- a/wellbelog/belolis/functions.py +++ b/wellbelog/belolis/functions.py @@ -5,146 +5,146 @@ def read_lis_file(path_to_file: str) -> Union[lis.PhysicalFile, Exception]: - """ - Read a LIS file and return a list of LogicalFile objects. + """ + Read a LIS file and return a list of LogicalFile objects. - Args: - path_to_file (str): Path to the LIS file. + Args: + path_to_file (str): Path to the LIS file. - Returns: - list[lis.LogicalFile]: List of LogicalFile objects. - """ - try: - return lis.load(path_to_file) - except Exception as e: - return e + Returns: + list[lis.LogicalFile]: List of LogicalFile objects. + """ + try: + return lis.load(path_to_file) + except Exception as e: + return e def parse_lis_physical_file(file: lis.PhysicalFile) -> list[lis.LogicalFile]: - """ - Transform a LIS physical file into a list of LIS logical files. - """ - *f, = file - return f + """ + Transform a LIS physical file into a list of LIS logical files. + """ + (*f,) = file + return f def get_lis_data_spec(file: lis.LogicalFile) -> Union[lis.DataFormatSpec, list[lis.DataFormatSpec]]: - """ - Get the data specification of a LIS file. + """ + Get the data specification of a LIS file. - Args: - file (lis.LogicalFile): A LIS file. + Args: + file (lis.LogicalFile): A LIS file. - Returns: - dict: Data specification of the LIS file. - """ - try: - dataspec = file.data_format_specs() - if len(dataspec) == 1: - return dataspec[0] - return dataspec + Returns: + dict: Data specification of the LIS file. + """ + try: + dataspec = file.data_format_specs() + if len(dataspec) == 1: + return dataspec[0] + return dataspec - except Exception as e: - print(e) + except Exception as e: + print(e) def get_physical_lis_specs(physical_file: lis.LogicalFile, attrs: list[str]) -> list[dict[str, str]]: - """ - Get the attributes of a LIS file. - - Args: - physical_file (lis.PhysicalFile): A LIS physical file. - attrs (list[str]): List of attributes to be extracted. - - Returns: - list[dict[str, str]]: A list of dictionaries, where each dictionary represents a block of attributes. - Each dictionary contains the specified attributes as keys and their corresponding values as values. - """ - result = [] - spec = get_lis_data_spec(physical_file) - if not spec: - return result - if isinstance(spec, lis.DataFormatSpec): - specs = [spec] - elif isinstance(spec, list): - specs = spec - else: - return result - - for s in specs: - for block in s.specs: - block_dict = {} - for atr in attrs: - if hasattr(block, atr): - value = getattr(block, atr) - if isinstance(value, str): - value = value.strip() - block_dict[atr] = value - result.append(block_dict) - - result = [dict(t) for t in {tuple(d.items()) for d in result}] - return result + """ + Get the attributes of a LIS file. + + Args: + physical_file (lis.PhysicalFile): A LIS physical file. + attrs (list[str]): List of attributes to be extracted. + + Returns: + list[dict[str, str]]: A list of dictionaries, where each dictionary represents a block of attributes. + Each dictionary contains the specified attributes as keys and their corresponding values as values. + """ + result = [] + spec = get_lis_data_spec(physical_file) + if not spec: + return result + if isinstance(spec, lis.DataFormatSpec): + specs = [spec] + elif isinstance(spec, list): + specs = spec + else: + return result + + for s in specs: + for block in s.specs: + block_dict = {} + for atr in attrs: + if hasattr(block, atr): + value = getattr(block, atr) + if isinstance(value, str): + value = value.strip() + block_dict[atr] = value + result.append(block_dict) + + result = [dict(t) for t in {tuple(d.items()) for d in result}] + return result def get_lis_wellsite_components(logical_file: lis.LogicalFile) -> list[dict]: - """ - Retrieves the components of each record in a logical file. - - Args: - logical_file (lis.LogicalFile): The logical file containing the records. - - Returns: - list[dict]: A list of dictionaries, where each dictionary represents a component of a record. - Each dictionary contains the following keys: - - 'mnemonic': The mnemonic of the component. - - 'units': The units of the component. - - 'component': The component value. - - """ - records = logical_file.wellsite_data() - file_records = [] - for record in records: - component = record.components() - component_dict = {} - for c in component: - component_dict['mnemonic'] = c.mnemonic.strip() if isinstance(c.mnemonic, str) else str(c.mnemonic).strip() - component_dict['units'] = c.units.strip() if isinstance(c.units, str) else str(c.units).strip() - component_dict['component'] = c.component.strip() if isinstance(c.component, str) else str(c.component).strip() - file_records.append(component_dict) - return file_records + """ + Retrieves the components of each record in a logical file. + + Args: + logical_file (lis.LogicalFile): The logical file containing the records. + + Returns: + list[dict]: A list of dictionaries, where each dictionary represents a component of a record. + Each dictionary contains the following keys: + - 'mnemonic': The mnemonic of the component. + - 'units': The units of the component. + - 'component': The component value. + + """ + records = logical_file.wellsite_data() + file_records = [] + for record in records: + component = record.components() + component_dict = {} + for c in component: + component_dict['mnemonic'] = c.mnemonic.strip() if isinstance(c.mnemonic, str) else str(c.mnemonic).strip() + component_dict['units'] = c.units.strip() if isinstance(c.units, str) else str(c.units).strip() + component_dict['component'] = c.component.strip() if isinstance(c.component, str) else str(c.component).strip() + file_records.append(component_dict) + return file_records def get_curves(logical_file: lis.LogicalFile) -> list[pd.DataFrame]: - """ - Get the curves of a LIS file. - - Args: - logical_file (lis.LogicalFile): A LIS file. - - Returns: - pd.DataFrame: A DataFrame containing the curves. - """ - try: - format_specs = logical_file.data_format_specs() - - if len(format_specs) == 1: - format_spec = format_specs[0] - sample_rates = format_spec.sample_rates() - if len(sample_rates) == 1: - sample_rate = sample_rates - data = lis.curves(logical_file, format_spec) - df = pd.DataFrame(data) - df.columns = df.columns.str.strip() - return [df] - else: - dfs = [] - for format_spec in format_specs: - sample_rates = format_spec.sample_rates() - for sample_rate in sample_rates: - data = lis.curves(logical_file, format_spec, sample_rate) - df = pd.DataFrame(data) - df.columns = df.columns.str.strip() - dfs.append(df) - return dfs - except Exception as e: - print(e) + """ + Get the curves of a LIS file. + + Args: + logical_file (lis.LogicalFile): A LIS file. + + Returns: + pd.DataFrame: A DataFrame containing the curves. + """ + try: + format_specs = logical_file.data_format_specs() + + if len(format_specs) == 1: + format_spec = format_specs[0] + sample_rates = format_spec.sample_rates() + if len(sample_rates) == 1: + sample_rate = sample_rates + data = lis.curves(logical_file, format_spec) + df = pd.DataFrame(data) + df.columns = df.columns.str.strip() + return [df] + else: + dfs = [] + for format_spec in format_specs: + sample_rates = format_spec.sample_rates() + for sample_rate in sample_rates: + data = lis.curves(logical_file, format_spec, sample_rate) + df = pd.DataFrame(data) + df.columns = df.columns.str.strip() + dfs.append(df) + return dfs + except Exception as e: + print(e) diff --git a/wellbelog/belolis/reader.py b/wellbelog/belolis/reader.py index 7038377..44b6949 100644 --- a/wellbelog/belolis/reader.py +++ b/wellbelog/belolis/reader.py @@ -7,133 +7,138 @@ from wellbelog.utils.logging import setup_logger from .functions import read_lis_file, parse_lis_physical_file, get_curves, get_physical_lis_specs, get_lis_wellsite_components from ..schemas.lis import ( - FrameLisCurves, LisLogicalFileSpecsDict, LisLogicalFileWellSiteSpecDict, - PhysicalLisFileModel, LisLogicalWellSiteSpec, LOGICAL_FILE_ATTR, - LogicalLisFileModel, LisLogicalSpecs + FrameLisCurves, + LisLogicalFileSpecsDict, + LisLogicalFileWellSiteSpecDict, + PhysicalLisFileModel, + LisLogicalWellSiteSpec, + LOGICAL_FILE_ATTR, + LogicalLisFileModel, + LisLogicalSpecs, ) class LisReader: - """ - Class responsible for managing LIS files. - - Attributes: - logger: The logger instance for logging messages. - - Methods: - process_physical_file: Reads a LIS file and returns a list of LogicalFile objects. - """ - - def __init__(self) -> None: - self.logger = setup_logger(__class__.__name__) - - def search_files(self, path: str) -> list[pathlib.Path]: - """ - Search for LIS files in the given path and returns a list with the file paths. - - Args: - path (str): The path to the folder where the LIS files are located. - - Returns: - list[pathlib.Path]: A list with the file paths. - """ - lis_files = [] - try: - for file in pathlib.Path(path).rglob('*.lis'): - lis_files.append(file) - return lis_files - except Exception as e: - self.logger.error(f'Error while searching for LIS files: {e}') - return lis_files - - def load_raw(self, path_to_file: str, unpack=False) -> Union[lis.PhysicalFile, list[lis.LogicalFile]]: - """ - Load a LIS file and return the raw data. - Optionally, unpack the file and return the logical files. - - Args: - path_to_file (str): Path to the LIS file. - - Returns: - Union[lis.PhysicalFile, list[lis.LogicalFile]]: The raw data. - """ - file = read_lis_file(path_to_file) - if isinstance(file, Exception): - self.logger.error(f'Error while opening the LIS file: {file}') - raise file - if unpack: - return parse_lis_physical_file(file) - return file - - def process_physical_file(self, path_to_file: str, folder_name: str = None) -> PhysicalLisFileModel: - """ - Read a LIS file and return a list of LogicalFile objects. - - Args: - path_to_file(str): Path to the LIS file. - - Returns: - list[lis.LogicalFile]: List of LogicalFile objects. - """ - file_name = pathlib.Path(path_to_file) - assert file_name.is_file(), f"File {file_name} not found." - file = PhysicalLisFileModel(file_name=file_name.name, logical_files=[], folder_name=folder_name) - - # Reading the LIS file - physical = read_lis_file(path_to_file) - # NOTE Handling exceptions, if any, mark the file as errored and return it - if isinstance(physical, Exception): - self.logger.error(f"Error reading file: {physical}") - self.logger.error(physical) - file.error = True - file.error_message = "Error reading file" - return file - - # NOTE Unpacking the physical file - logical_files = parse_lis_physical_file(physical) - - # XXX Creating LogicalFile objects - for logical_file in logical_files: - logical_file_id = logical_files.index(logical_file) - - logical_file_model = LogicalLisFileModel( - file_name=file_name.name, - logical_id=logical_file_id, - ) - logical_file_model.header = logical_file.header() - - # NOTE Getting well site specifications - well_specs = [d for d in get_lis_wellsite_components(logical_file)] - _specs_dicts = [LisLogicalFileWellSiteSpecDict(**spec) for spec in well_specs] - well_site_specs = LisLogicalWellSiteSpec(file_name=file_name.name, logical_id=logical_file_id, specs_dicts=_specs_dicts) # noqa - - logical_file_model.well_site_specs = well_site_specs - # NOTE Getting the curves - try: - # NOTE Getting the physical file specifications - physical_specs = [LisLogicalFileSpecsDict(**spec) for spec in get_physical_lis_specs(logical_file, LOGICAL_FILE_ATTR)] # noqa - lis_logical_specs = LisLogicalSpecs(file_name=file_name.name, logical_id=logical_file_id, specs_dicts=physical_specs) # noqa - logical_file_model.specs = lis_logical_specs - curves = get_curves(logical_file) - curves_set_names = set() - for curve in curves: - curve_model = FrameLisCurves( - file_name=file_name.name, - logical_file_id=logical_file_id, - data=json.loads(curve.to_json(orient="records")), - ) - logical_file_model.frames.append(curve_model) - for curve_name in curve.columns: - curves_set_names.add(curve_name) - logical_file_model.curves_names = list(curves_set_names) - file.logical_files.append(logical_file_model) - - # NOTE Handling exceptions, if any, and appending the error to the error_files list - except Exception as e: - logical_file_model.error = True - logical_file_model.error_message = str(e) - file.error_files.append(logical_file_model) - self.logger.error(f"Error processing file: {e}") - continue - - return file + """ + Class responsible for managing LIS files. + + Attributes: + logger: The logger instance for logging messages. + + Methods: + process_physical_file: Reads a LIS file and returns a list of LogicalFile objects. + """ + + def __init__(self) -> None: + self.logger = setup_logger(__class__.__name__) + + def search_files(self, path: str) -> list[pathlib.Path]: + """ + Search for LIS files in the given path and returns a list with the file paths. + + Args: + path (str): The path to the folder where the LIS files are located. + + Returns: + list[pathlib.Path]: A list with the file paths. + """ + lis_files = [] + try: + for file in pathlib.Path(path).rglob('*.lis'): + lis_files.append(file) + return lis_files + except Exception as e: + self.logger.error(f'Error while searching for LIS files: {e}') + return lis_files + + def load_raw(self, path_to_file: str, unpack=False) -> Union[lis.PhysicalFile, list[lis.LogicalFile]]: + """ + Load a LIS file and return the raw data. + Optionally, unpack the file and return the logical files. + + Args: + path_to_file (str): Path to the LIS file. + + Returns: + Union[lis.PhysicalFile, list[lis.LogicalFile]]: The raw data. + """ + file = read_lis_file(path_to_file) + if isinstance(file, Exception): + self.logger.error(f'Error while opening the LIS file: {file}') + raise file + if unpack: + return parse_lis_physical_file(file) + return file + + def process_physical_file(self, path_to_file: str, folder_name: str = None) -> PhysicalLisFileModel: + """ + Read a LIS file and return a list of LogicalFile objects. + + Args: + path_to_file(str): Path to the LIS file. + + Returns: + list[lis.LogicalFile]: List of LogicalFile objects. + """ + file_name = pathlib.Path(path_to_file) + assert file_name.is_file(), f'File {file_name} not found.' + file = PhysicalLisFileModel(file_name=file_name.name, logical_files=[], folder_name=folder_name) + + # Reading the LIS file + physical = read_lis_file(path_to_file) + # NOTE Handling exceptions, if any, mark the file as errored and return it + if isinstance(physical, Exception): + self.logger.error(f'Error reading file: {physical}') + self.logger.error(physical) + file.error = True + file.error_message = 'Error reading file' + return file + + # NOTE Unpacking the physical file + logical_files = parse_lis_physical_file(physical) + + # XXX Creating LogicalFile objects + for logical_file in logical_files: + logical_file_id = logical_files.index(logical_file) + + logical_file_model = LogicalLisFileModel( + file_name=file_name.name, + logical_id=logical_file_id, + ) + logical_file_model.header = logical_file.header() + + # NOTE Getting well site specifications + well_specs = [d for d in get_lis_wellsite_components(logical_file)] + _specs_dicts = [LisLogicalFileWellSiteSpecDict(**spec) for spec in well_specs] + well_site_specs = LisLogicalWellSiteSpec(file_name=file_name.name, logical_id=logical_file_id, specs_dicts=_specs_dicts) # noqa + + logical_file_model.well_site_specs = well_site_specs + # NOTE Getting the curves + try: + # NOTE Getting the physical file specifications + physical_specs = [LisLogicalFileSpecsDict(**spec) for spec in get_physical_lis_specs(logical_file, LOGICAL_FILE_ATTR)] # noqa + lis_logical_specs = LisLogicalSpecs(file_name=file_name.name, logical_id=logical_file_id, specs_dicts=physical_specs) # noqa + logical_file_model.specs = lis_logical_specs + curves = get_curves(logical_file) + curves_set_names = set() + for curve in curves: + curve_model = FrameLisCurves( + file_name=file_name.name, + logical_file_id=logical_file_id, + data=json.loads(curve.to_json(orient='records')), + ) + logical_file_model.frames.append(curve_model) + for curve_name in curve.columns: + curves_set_names.add(curve_name) + logical_file_model.curves_names = list(curves_set_names) + file.logical_files.append(logical_file_model) + + # NOTE Handling exceptions, if any, and appending the error to the error_files list + except Exception as e: + logical_file_model.error = True + logical_file_model.error_message = str(e) + file.error_files.append(logical_file_model) + self.logger.error(f'Error processing file: {e}') + continue + + return file diff --git a/wellbelog/main_reader.py b/wellbelog/main_reader.py index bb6e479..4a7c928 100644 --- a/wellbelog/main_reader.py +++ b/wellbelog/main_reader.py @@ -15,75 +15,75 @@ class MainReader: - """ - Class responsible for managing LIS, LAS, DLIS, and TIFF files. - """ - - def __init__(self) -> None: - self.logger = setup_logger(__class__.__name__) - self.dlis_reader = DlisReader() - self.las_reader = LasReader() - self.lis_reader = LisReader() - - def load_file(self, path: str) -> ReaderReturnType: - """ - Load a file and return a list of LogicalFile objects based on its extension. - - Args: - path (str): Path to the file. - - Returns: - list[LogicalFile]: List of LogicalFile objects. - """ - file_extension = pathlib.Path(path).suffix.lower() - - if file_extension == '.lis': - return self._attempt_reading(path, self.lis_reader.process_physical_file, self.dlis_reader.process_physical_file) - elif file_extension == '.las': - return self._attempt_reading(path, self.las_reader.process_las_file) - elif file_extension == '.dlis': - return self._attempt_reading(path, self.dlis_reader.process_physical_file, self.lis_reader.process_physical_file) - elif file_extension == '.tiff': - return self._attempt_reading(path, self.dlis_reader.process_physical_file, self.lis_reader.process_physical_file) - else: - self.logger.error(f"Unsupported file type: {file_extension}") - raise ValueError(f"Unsupported file type: {file_extension}") - - def _attempt_reading(self, path: str, primary_reader: ReaderMethod, fallback_reader: ReaderMethod = None) -> ReaderReturnType: - """ - Attempt to read the file with a primary reader. If it fails and a fallback is provided, try with the fallback. - - Args: - path (str): Path to the file. - primary_reader (callable): Primary function to read the file. - fallback_reader (callable, optional): Fallback function if the primary fails. - - Returns: - list[LogicalFile]: List of LogicalFile objects. - """ - try: - return primary_reader(path) - - except Exception as primary_error: - self.logger.error(f"Primary reader failed for file {path}: {primary_error}") - if fallback_reader: - self.logger.info(f"Attempting to read {path} with fallback reader.") - try: - return fallback_reader(path) - except Exception as fallback_error: - self.logger.error(f"Fallback reader also failed for file {path}: {fallback_error}") - raise - else: - raise - - def read_tiff_file(self, path: str) -> ReaderReturnType: - """ - Reads a TIFF file, first attempting with DLIS reader, then falling back to LIS reader. - - Args: - path (str): Path to the TIFF file. - - Returns: - list[LogicalFile]: List of LogicalFile objects. - """ - return self._attempt_reading(path, self.dlis_reader.process_physical_file, self.lis_reader.process_physical_file) + """ + Class responsible for managing LIS, LAS, DLIS, and TIFF files. + """ + + def __init__(self) -> None: + self.logger = setup_logger(__class__.__name__) + self.dlis_reader = DlisReader() + self.las_reader = LasReader() + self.lis_reader = LisReader() + + def load_file(self, path: str) -> ReaderReturnType: + """ + Load a file and return a list of LogicalFile objects based on its extension. + + Args: + path (str): Path to the file. + + Returns: + list[LogicalFile]: List of LogicalFile objects. + """ + file_extension = pathlib.Path(path).suffix.lower() + + if file_extension == '.lis': + return self._attempt_reading(path, self.lis_reader.process_physical_file, self.dlis_reader.process_physical_file) + elif file_extension == '.las': + return self._attempt_reading(path, self.las_reader.process_las_file) + elif file_extension == '.dlis': + return self._attempt_reading(path, self.dlis_reader.process_physical_file, self.lis_reader.process_physical_file) + elif file_extension == '.tiff': + return self._attempt_reading(path, self.dlis_reader.process_physical_file, self.lis_reader.process_physical_file) + else: + self.logger.error(f'Unsupported file type: {file_extension}') + raise ValueError(f'Unsupported file type: {file_extension}') + + def _attempt_reading(self, path: str, primary_reader: ReaderMethod, fallback_reader: ReaderMethod = None) -> ReaderReturnType: + """ + Attempt to read the file with a primary reader. If it fails and a fallback is provided, try with the fallback. + + Args: + path (str): Path to the file. + primary_reader (callable): Primary function to read the file. + fallback_reader (callable, optional): Fallback function if the primary fails. + + Returns: + list[LogicalFile]: List of LogicalFile objects. + """ + try: + return primary_reader(path) + + except Exception as primary_error: + self.logger.error(f'Primary reader failed for file {path}: {primary_error}') + if fallback_reader: + self.logger.info(f'Attempting to read {path} with fallback reader.') + try: + return fallback_reader(path) + except Exception as fallback_error: + self.logger.error(f'Fallback reader also failed for file {path}: {fallback_error}') + raise + else: + raise + + def read_tiff_file(self, path: str) -> ReaderReturnType: + """ + Reads a TIFF file, first attempting with DLIS reader, then falling back to LIS reader. + + Args: + path (str): Path to the TIFF file. + + Returns: + list[LogicalFile]: List of LogicalFile objects. + """ + return self._attempt_reading(path, self.dlis_reader.process_physical_file, self.lis_reader.process_physical_file) diff --git a/wellbelog/plotters/general.py b/wellbelog/plotters/general.py index cd2ffc1..00ee221 100644 --- a/wellbelog/plotters/general.py +++ b/wellbelog/plotters/general.py @@ -5,36 +5,36 @@ def plot_all_curves(data: DataFrame, depth_column: str = 'DEPT', title='Geological Curves', figsize=(15, 10)) -> Figure: - """ - Function to create subplots for each column with depth as index. + """ + Function to create subplots for each column with depth as index. - Parameters: - data (pd.DataFrame): DataFrame containing the data. - depth_column (str): The name of the depth column. - figsize (tuple): Size of the figure. - title (str): Title of the plot. - """ - # Get the list of columns to plot (exclude the depth column) - columns = [col for col in data.columns if col != depth_column] - num_plots = len(columns) - # Create subplots - fig, axes = plt.subplots(nrows=1, ncols=num_plots, sharey=True, figsize=figsize) - axes: list[Axes] - # TODO Find a a better way to deal with color cycling - # Colors for the plots (cycle through colors if there are more columns than colors) - colors = ['green', 'red', 'blue', 'black', 'purple'] + Parameters: + data (pd.DataFrame): DataFrame containing the data. + depth_column (str): The name of the depth column. + figsize (tuple): Size of the figure. + title (str): Title of the plot. + """ + # Get the list of columns to plot (exclude the depth column) + columns = [col for col in data.columns if col != depth_column] + num_plots = len(columns) + # Create subplots + fig, axes = plt.subplots(nrows=1, ncols=num_plots, sharey=True, figsize=figsize) + axes: list[Axes] + # TODO Find a a better way to deal with color cycling + # Colors for the plots (cycle through colors if there are more columns than colors) + colors = ['green', 'red', 'blue', 'black', 'purple'] - for i, col in enumerate(columns): - axes[i].plot(data[col], data[depth_column], label=col, color=colors[i % len(colors)]) - axes[i].set_xlabel(col) - axes[i].grid(True) + for i, col in enumerate(columns): + axes[i].plot(data[col], data[depth_column], label=col, color=colors[i % len(colors)]) + axes[i].set_xlabel(col) + axes[i].grid(True) - # Set shared y-axis label and invert y-axis - axes[0].set_ylabel(depth_column) - axes[0].invert_yaxis() # Invert the y-axis to have depth increase downwards + # Set shared y-axis label and invert y-axis + axes[0].set_ylabel(depth_column) + axes[0].invert_yaxis() # Invert the y-axis to have depth increase downwards - # Set the title - fig.suptitle(title) - # Adjust layout to make room for the title - plt.tight_layout(rect=[0, 0, 1, 0.96]) - return fig + # Set the title + fig.suptitle(title) + # Adjust layout to make room for the title + plt.tight_layout(rect=[0, 0, 1, 0.96]) + return fig diff --git a/wellbelog/schemas/base_schema.py b/wellbelog/schemas/base_schema.py index c390a54..31a065d 100644 --- a/wellbelog/schemas/base_schema.py +++ b/wellbelog/schemas/base_schema.py @@ -1,4 +1,3 @@ - from datetime import datetime from typing import Union @@ -9,85 +8,93 @@ class HasIdSchema(BaseModel): - ''' - Base class for all models that have an id field. + """ + Base class for all models that have an id field. + + Attributes: + id: str: Unique identifier for the model. + """ - Attributes: - id: str: Unique identifier for the model. - ''' - id: Union[str, ObjectId] = Field(None, alias='_id') + id: Union[str, ObjectId] = Field(None, alias='_id') - model_config = { - 'arbitrary_types_allowed': True, - } + model_config = { + 'arbitrary_types_allowed': True, + } class TimeStampedModelSchema(HasIdSchema): - ''' - Base class for all models that have a created_at and updated_at fields. + """ + Base class for all models that have a created_at and updated_at fields. - Attributes: - created_at: datetime: Date and time the model was created. - updated_at: datetime: Date and time the model was last updated. - ''' - created_at: datetime = datetime.now() - updated_at: datetime = datetime.now() + Attributes: + created_at: datetime: Date and time the model was created. + updated_at: datetime: Date and time the model was last updated. + """ + + created_at: datetime = datetime.now() + updated_at: datetime = datetime.now() class GeoJSONSchema(TimeStampedModelSchema): - """ - Base class for all models that have a geometry field. - Specialized for GeoJSON geometries. + """ + Base class for all models that have a geometry field. + Specialized for GeoJSON geometries. + + Attributes: + geometry: dict: GeoJSON geometry object. + """ - Attributes: - geometry: dict: GeoJSON geometry object. - """ - geometry: dict = None + geometry: dict = None - def as_shape(self): - return shape(self.geometry) + def as_shape(self): + return shape(self.geometry) class DataframeSchema(TimeStampedModelSchema): - """ - Base class for all models that have a dataframe field. - Specialized for pandas dataframes. - - Attributes: - dataframe: dict: Pandas dataframe object. - """ - data: list = None - - def as_df(self) -> pd.DataFrame: - """ - Convert the data to a pandas dataframe. - """ - return pd.DataFrame(self.data) - - def to_csv(self, path: str, **kwargs,) -> str: - """ - Save the data to a CSV file. - - Args: - path (str): The path to save the file. - **kwargs: Additional keyword arguments to pass to pandas.to_csv. - - Returns: - str: the path to the file. - """ - self.as_df().to_csv(path, index=False, **kwargs) - return path - - def to_excel(self, path: str, **kwargs) -> str: - """ - Save the data to an Excel file. - - Args: - path (str): The path to save the file. - **kwargs: Additional keyword arguments to pass to pandas.to_excel. - - Returns: - str: the path to the file. - """ - self.as_df().to_excel(path, index=False, **kwargs) - return path + """ + Base class for all models that have a dataframe field. + Specialized for pandas dataframes. + + Attributes: + dataframe: dict: Pandas dataframe object. + """ + + data: list = None + + def as_df(self) -> pd.DataFrame: + """ + Convert the data to a pandas dataframe. + """ + return pd.DataFrame(self.data) + + def to_csv( + self, + path: str, + **kwargs, + ) -> str: + """ + Save the data to a CSV file. + + Args: + path (str): The path to save the file. + **kwargs: Additional keyword arguments to pass to pandas.to_csv. + + Returns: + str: the path to the file. + """ + self.as_df().to_csv(path, index=False, **kwargs) + return path + + def to_excel(self, path: str, **kwargs) -> str: + """ + Save the data to an Excel file. + + Args: + path (str): The path to save the file. + **kwargs: Additional keyword arguments to pass to pandas.to_excel. + + Returns: + str: the path to the file. + """ + self.as_df().to_excel(path, index=False, **kwargs) + return path diff --git a/wellbelog/schemas/dlis.py b/wellbelog/schemas/dlis.py index 564d1d4..b7bb153 100644 --- a/wellbelog/schemas/dlis.py +++ b/wellbelog/schemas/dlis.py @@ -9,17 +9,17 @@ class FrameChannel(TimeStampedModelSchema): - """ - Dataclass to acess channel data from Frames objects. - """ + """ + Dataclass to acess channel data from Frames objects. + """ - long_name: str = Field(None, description="The long name of the channel") - name: str = Field(None, description="The name of the channel") - units: str = Field(None, description="The units of the channel") - repr: str = Field(None, description="The representation of the channel") - properties: Union[str, Any] = Field(None, description="The properties of the channel") + long_name: str = Field(None, description='The long name of the channel') + name: str = Field(None, description='The name of the channel') + units: str = Field(None, description='The units of the channel') + repr: str = Field(None, description='The representation of the channel') + properties: Union[str, Any] = Field(None, description='The properties of the channel') - data: Any = Field(None, description="The data of the channel") + data: Any = Field(None, description='The data of the channel') ChannelsList = list[FrameChannel] @@ -27,169 +27,170 @@ class FrameChannel(TimeStampedModelSchema): class FrameDataframe(DataframeSchema): - """ - The Dlis frame data. + """ + The Dlis frame data. - Attributes: - file_name (str): The name of the file. - logical_file_id (str): The id of the logical file. - data (list[dict]): The dataframe of the file. - """ + Attributes: + file_name (str): The name of the file. + logical_file_id (str): The id of the logical file. + data (list[dict]): The dataframe of the file. + """ - file_name: str = Field(..., description="The name of the file.") - logical_file_id: str = Field(..., description="The id of the logical file.") + file_name: str = Field(..., description='The name of the file.') + logical_file_id: str = Field(..., description='The id of the logical file.') class FrameModel(TimeStampedModelSchema): - """ - A class used to represent a Dlis Frame. - - Attributes: - file_name (str): The name of the file. - logical_file_id (str): The id of the logical file. - description (Optional[str]): The description of the file. - channels (ChannelsList): The channels of the file. - error (bool): If the file has any error during opening. - error_message (Optional[str]): The error exception if any. - data (Optional[FrameDataframe]): The dataframe of the file. - """ - - file_name: str = Field(..., description="The name of the file.") - logical_file_id: str = Field(..., description="The id of the logical file.") - description: Optional[str] = Field(None, description="The description of the file.") - channels: ChannelsList = Field(None, description="The channels of the file.") - error: bool = Field(False, description="If the file has any error during opening.") - error_message: Optional[str] = Field(None, description="The error exception if any.") - data: Optional[FrameDataframe] = Field(None, description="The dataframe of the file.") + """ + A class used to represent a Dlis Frame. + + Attributes: + file_name (str): The name of the file. + logical_file_id (str): The id of the logical file. + description (Optional[str]): The description of the file. + channels (ChannelsList): The channels of the file. + error (bool): If the file has any error during opening. + error_message (Optional[str]): The error exception if any. + data (Optional[FrameDataframe]): The dataframe of the file. + """ + + file_name: str = Field(..., description='The name of the file.') + logical_file_id: str = Field(..., description='The id of the logical file.') + description: Optional[str] = Field(None, description='The description of the file.') + channels: ChannelsList = Field(None, description='The channels of the file.') + error: bool = Field(False, description='If the file has any error during opening.') + error_message: Optional[str] = Field(None, description='The error exception if any.') + data: Optional[FrameDataframe] = Field(None, description='The dataframe of the file.') class LogicalFileSummary(TimeStampedModelSchema): - """ - A class used to represent a a Logical File Summary. - - Attributes: - file_id (Union[str, int]): The file id of the file. - parameters (Optional[list]): List of dicts with the parameters of the file. - tools (Optional[list]): List of dicts with the tools of the file. - remarks (Optional[dict]): List of dicts with the remarks of the file. - comments (Optional[dict]): List of dicts with the comments of the file. - header (Optional[str]): String with the header of the file. - frames (Optional[list]): List of dicts with the frames of the file. - """ - tools: Optional[list] = Field(None, alias='tools') - remarks: Optional[dict] = Field(None, alias='remarks') - comments: Optional[dict] = Field(None, alias='comments') - header: Optional[str] = Field(None, alias='header') - - model_config = { - 'arbitrary_types_allowed': True, - } + """ + A class used to represent a a Logical File Summary. + + Attributes: + file_id (Union[str, int]): The file id of the file. + parameters (Optional[list]): List of dicts with the parameters of the file. + tools (Optional[list]): List of dicts with the tools of the file. + remarks (Optional[dict]): List of dicts with the remarks of the file. + comments (Optional[dict]): List of dicts with the comments of the file. + header (Optional[str]): String with the header of the file. + frames (Optional[list]): List of dicts with the frames of the file. + """ + + tools: Optional[list] = Field(None, alias='tools') + remarks: Optional[dict] = Field(None, alias='remarks') + comments: Optional[dict] = Field(None, alias='comments') + header: Optional[str] = Field(None, alias='header') + + model_config = { + 'arbitrary_types_allowed': True, + } class LogicalFileModel(TimeStampedModelSchema): - """ - A Logical File Model. - It contains the frames and the file name.Also, the summary of the file is included. - If any error occurs during the opening, the error flag is set to True. The error message is also set. - - Attributes: - file_name (str): The name of the file. - logical_id (Any): The id of the logical file. - summary (LogicalFileSummary): The summary of the file. - frames (list[FrameModel]): The frames of the file. - error (bool): If the file has any error during opening. - error_message (Optional[str]): The error exception if any. - """ - - file_name: str = Field(..., description="The name of the file.") - logical_id: Optional[Any] = Field(None, description="The id of the logical file.") - summary: Optional[LogicalFileSummary] = Field(None, description="The summary of the file.") - frames: list[FrameModel] = Field(None, description="The frames of the file.") - error: bool = Field(False, description="If the file has any error during opening.") - error_message: Optional[str] = Field(None, description="The error exception if any.") - - @property - def frames_count(self) -> int: - return len(self.frames) - - @cached_property - def curves_names(self) -> list[str]: - """" - Returns a set with the names of the curves in the file. - """ - if not self.frames or self.error: - return None - curves = [channel.name for frame in self.frames for channel in frame.channels] - return list(set(curves)) - - def get_frame(self, index: int = 0) -> FrameModel: - """ - Get the frame by index. - In the vast majority of the cases, the file has only one frame. - """ - assert not self.error, "The file has an error. Cannot get the frame." - if self.frames_count == 1: - return self.frames[0] - return self.frames[index] - - def table_view(self) -> Table: - """ - Get a table view of the file. - """ - table = Table(title=self.file_name) - table.add_column("Logical File ID", style="cyan") - table.add_column("File Name", style="magenta") - table.add_column("Frames", style="green") - table.add_column("Curves", style="cyan") - table.add_column("Error", style="red") - table.add_row(str(self.logical_id), self.file_name, str(self.frames_count), str(self.curves_names), str(self.error)) - console.print(table) - return table + """ + A Logical File Model. + It contains the frames and the file name.Also, the summary of the file is included. + If any error occurs during the opening, the error flag is set to True. The error message is also set. + + Attributes: + file_name (str): The name of the file. + logical_id (Any): The id of the logical file. + summary (LogicalFileSummary): The summary of the file. + frames (list[FrameModel]): The frames of the file. + error (bool): If the file has any error during opening. + error_message (Optional[str]): The error exception if any. + """ + + file_name: str = Field(..., description='The name of the file.') + logical_id: Optional[Any] = Field(None, description='The id of the logical file.') + summary: Optional[LogicalFileSummary] = Field(None, description='The summary of the file.') + frames: list[FrameModel] = Field(None, description='The frames of the file.') + error: bool = Field(False, description='If the file has any error during opening.') + error_message: Optional[str] = Field(None, description='The error exception if any.') + + @property + def frames_count(self) -> int: + return len(self.frames) + + @cached_property + def curves_names(self) -> list[str]: + """ " + Returns a set with the names of the curves in the file. + """ + if not self.frames or self.error: + return None + curves = [channel.name for frame in self.frames for channel in frame.channels] + return list(set(curves)) + + def get_frame(self, index: int = 0) -> FrameModel: + """ + Get the frame by index. + In the vast majority of the cases, the file has only one frame. + """ + assert not self.error, 'The file has an error. Cannot get the frame.' + if self.frames_count == 1: + return self.frames[0] + return self.frames[index] + + def table_view(self) -> Table: + """ + Get a table view of the file. + """ + table = Table(title=self.file_name) + table.add_column('Logical File ID', style='cyan') + table.add_column('File Name', style='magenta') + table.add_column('Frames', style='green') + table.add_column('Curves', style='cyan') + table.add_column('Error', style='red') + table.add_row(str(self.logical_id), self.file_name, str(self.frames_count), str(self.curves_names), str(self.error)) + console.print(table) + return table class PhysicalFileModel(TimeStampedModelSchema): - """ - A representation of a Physical File. - It contains the logical files and the file name. - If any error occurs during the opening, the error flag is set to True. The error message is also set. - - Attributes: - file_name (str): The name of the file. - logical_files (list[dlis.LogicalFile]): The logical files. - error (bool): If the file has any error during opening. - error_message (Optional[str]): The error exception if any. - """ - - file_name: str = Field(..., description="The name of the file.") - error: bool = Field(False, description="If the file has any error during opening.") - error_message: Optional[str] = Field(None, description="The error exception if any.") - logical_files: Optional[list[LogicalFileModel]] = Field(default_factory=list, description="The logical files.") - error_files: Optional[list[LogicalFileModel]] = Field(default_factory=list, description="The error files.") - mnemonics: Optional[list[str]] = Field(default_factory=list, description="The mnemonics of the file.") - - @property - def logical_files_count(self) -> int: - return len(self.logical_files) - - def logical_files_table(self) -> Table: - """ - Get a table view of the logical files. - """ - table = Table(title=self.file_name) - table.add_column("File Name", style="green") - table.add_column("Curves", style="cyan") - table.add_column("Error", style="red") - for file in self.logical_files: - table.add_row(file.file_name, str(file.curves_names), str(file.error)) - console.print(table) - return table - - @cached_property - def curves_names(self) -> list[str]: - """" - Returns a set with the names of the curves in the file. - """ - if not self.logical_files or self.error: - return None - curves = [channel.name for file in self.logical_files for frame in file.frames for channel in frame.channels] - return list(set(curves)) + """ + A representation of a Physical File. + It contains the logical files and the file name. + If any error occurs during the opening, the error flag is set to True. The error message is also set. + + Attributes: + file_name (str): The name of the file. + logical_files (list[dlis.LogicalFile]): The logical files. + error (bool): If the file has any error during opening. + error_message (Optional[str]): The error exception if any. + """ + + file_name: str = Field(..., description='The name of the file.') + error: bool = Field(False, description='If the file has any error during opening.') + error_message: Optional[str] = Field(None, description='The error exception if any.') + logical_files: Optional[list[LogicalFileModel]] = Field(default_factory=list, description='The logical files.') + error_files: Optional[list[LogicalFileModel]] = Field(default_factory=list, description='The error files.') + mnemonics: Optional[list[str]] = Field(default_factory=list, description='The mnemonics of the file.') + + @property + def logical_files_count(self) -> int: + return len(self.logical_files) + + def logical_files_table(self) -> Table: + """ + Get a table view of the logical files. + """ + table = Table(title=self.file_name) + table.add_column('File Name', style='green') + table.add_column('Curves', style='cyan') + table.add_column('Error', style='red') + for file in self.logical_files: + table.add_row(file.file_name, str(file.curves_names), str(file.error)) + console.print(table) + return table + + @cached_property + def curves_names(self) -> list[str]: + """ " + Returns a set with the names of the curves in the file. + """ + if not self.logical_files or self.error: + return None + curves = [channel.name for file in self.logical_files for frame in file.frames for channel in frame.channels] + return list(set(curves)) diff --git a/wellbelog/schemas/las.py b/wellbelog/schemas/las.py index 63e38d7..39d8097 100644 --- a/wellbelog/schemas/las.py +++ b/wellbelog/schemas/las.py @@ -9,85 +9,90 @@ class LasDataframe(DataframeSchema): - """ - A class that represents the data of a LAS file. + """ + A class that represents the data of a LAS file. - Attributes: - file_name (str): The name of the file. - data (str): The data of the curve. - columns (list[str]): The columns of the curve. - shape (tuple): The shape of the curve. - """ - file_name: str = Field(..., description="The name of the file.") - columns: list[str] = Field(..., description="The columns of the curve.") - shape: tuple = Field(..., description="The shape of the curve.") + Attributes: + file_name (str): The name of the file. + data (str): The data of the curve. + columns (list[str]): The columns of the curve. + shape (tuple): The shape of the curve. + """ + + file_name: str = Field(..., description='The name of the file.') + columns: list[str] = Field(..., description='The columns of the curve.') + shape: tuple = Field(..., description='The shape of the curve.') class LasCurvesSpecs(BaseModel): - """ - A class used to represent a BeloDlis object. + """ + A class used to represent a BeloDlis object. - Attributes: - mnemonic (str): The mnemonic of the curve. - unit (str): The unit of the curve. - description (str): The description of the curve. - """ + Attributes: + mnemonic (str): The mnemonic of the curve. + unit (str): The unit of the curve. + description (str): The description of the curve. + """ - mnemonic: str = Field(..., description="The mnemonic of the curve.") - unit: str = Field(..., description="The unit of the curve.") - descr: str = Field(..., description="The description of the curve.") - value: Optional[Any] = Field(None, description="The value of the curve.") - original_mnemonic: Optional[str] = Field(None, description="The original mnemonic of the curve.") - shape: Optional[tuple] = Field(None, description="The shape of the curve.") + mnemonic: str = Field(..., description='The mnemonic of the curve.') + unit: str = Field(..., description='The unit of the curve.') + descr: str = Field(..., description='The description of the curve.') + value: Optional[Any] = Field(None, description='The value of the curve.') + original_mnemonic: Optional[str] = Field(None, description='The original mnemonic of the curve.') + shape: Optional[tuple] = Field(None, description='The shape of the curve.') class LasFileModel(TimeStampedModelSchema): - """ - A class used to represent a BeloDlis object. - - Attributes: - file_name (str): The name of the file. - logical_files (list[dlis.LogicalFile]): The logical files. - """ - - file_name: str = Field(..., description="The name of the file.") - folder_name: Optional[str] = Field(None, description="The name of the folder.") - specs: list[LasCurvesSpecs] = Field([], description="The curves specs.") - error: bool = Field(False, description="If the file has any error during opening.") - error_message: Optional[str] = Field(None, description="The error exception if any.") - - data: Optional[LasDataframe] = Field(None, description="The data of the file.") - - def __str__(self) -> str: - return f"LasFileModel: {self.file_name}" - - def get_curve(self, column: str) -> Optional[LasCurvesSpecs]: - """ - Get the curve by the column name. - If the column is not found, return None. - Args: - column (str): The column name. - - Returns: - Optional[LasCurvesSpecs]: The curve. - """ - for spec in self.specs: - if spec.mnemonic == column: - return spec - return None - - @cached_property - def curves_names(self) -> list[str]: - return [spec.mnemonic for spec in self.specs] - - def table_view(self) -> Table: - """ - Create a table view of the file. - """ - table = Table(title=self.file_name) - table.add_column("File Name", style="green") - table.add_column("Curves", style="cyan") - table.add_column("Error", style="red") - table.add_row(self.file_name, str(self.curves_names), str(self.error),) - console.print(table) - return table + """ + A class used to represent a BeloDlis object. + + Attributes: + file_name (str): The name of the file. + logical_files (list[dlis.LogicalFile]): The logical files. + """ + + file_name: str = Field(..., description='The name of the file.') + folder_name: Optional[str] = Field(None, description='The name of the folder.') + specs: list[LasCurvesSpecs] = Field([], description='The curves specs.') + error: bool = Field(False, description='If the file has any error during opening.') + error_message: Optional[str] = Field(None, description='The error exception if any.') + + data: Optional[LasDataframe] = Field(None, description='The data of the file.') + + def __str__(self) -> str: + return f'LasFileModel: {self.file_name}' + + def get_curve(self, column: str) -> Optional[LasCurvesSpecs]: + """ + Get the curve by the column name. + If the column is not found, return None. + Args: + column (str): The column name. + + Returns: + Optional[LasCurvesSpecs]: The curve. + """ + for spec in self.specs: + if spec.mnemonic == column: + return spec + return None + + @cached_property + def curves_names(self) -> list[str]: + return [spec.mnemonic for spec in self.specs] + + def table_view(self) -> Table: + """ + Create a table view of the file. + """ + table = Table(title=self.file_name) + table.add_column('File Name', style='green') + table.add_column('Curves', style='cyan') + table.add_column('Error', style='red') + table.add_row( + self.file_name, + str(self.curves_names), + str(self.error), + ) + console.print(table) + return table diff --git a/wellbelog/schemas/lis.py b/wellbelog/schemas/lis.py index cc919d5..f7dc63d 100644 --- a/wellbelog/schemas/lis.py +++ b/wellbelog/schemas/lis.py @@ -8,180 +8,189 @@ from wellbelog.schemas.base_schema import TimeStampedModelSchema, DataframeSchema LOGICAL_FILE_ATTR = [ - 'api_curve_class', 'api_curve_type', 'api_log_type', - 'api_modifier', 'filenr', 'mnemonic', 'process_level', - 'reprc', 'reserved_size', 'samples', 'service_id', 'service_order_nr', 'units' + 'api_curve_class', + 'api_curve_type', + 'api_log_type', + 'api_modifier', + 'filenr', + 'mnemonic', + 'process_level', + 'reprc', + 'reserved_size', + 'samples', + 'service_id', + 'service_order_nr', + 'units', ] class FrameLisCurves(DataframeSchema): - """ - A class used to represent a BeloDlis object. + """ + A class used to represent a BeloDlis object. - Attributes: - physical (dlis.PhysicalFile): The physical file. - logical_files (list[dlis.LogicalFile]): The logical files. - file_name (str): The name of the file. - path_reference (str): The reference path. - """ + Attributes: + physical (dlis.PhysicalFile): The physical file. + logical_files (list[dlis.LogicalFile]): The logical files. + file_name (str): The name of the file. + path_reference (str): The reference path. + """ - file_name: str = Field(..., description="The name of the file.") - logical_file_id: Union[str, int] = Field(..., description="The id of the logical file.") + file_name: str = Field(..., description='The name of the file.') + logical_file_id: Union[str, int] = Field(..., description='The id of the logical file.') class LisLogicalFileWellSiteSpecDict(BaseModel): - """ - Represents the specification for a LIS logical file. - Like the well name, wellbore name and wellbore number. - """ - mnemonic: Optional[str] = Field(None, description="mnemonic") - units: Optional[str] = Field(None, description="units") - component: Optional[str] = Field(None, description="component information") + """ + Represents the specification for a LIS logical file. + Like the well name, wellbore name and wellbore number. + """ + + mnemonic: Optional[str] = Field(None, description='mnemonic') + units: Optional[str] = Field(None, description='units') + component: Optional[str] = Field(None, description='component information') class LisLogicalWellSiteSpec(BaseModel): - file_name: str = Field(..., description="The name of the file.") - logical_id: Union[str, int] = Field(..., description="The id of the logical file.") - specs_dicts: list[LisLogicalFileWellSiteSpecDict] = Field(default_factory=list, description="The specification of the file.") # noqa + file_name: str = Field(..., description='The name of the file.') + logical_id: Union[str, int] = Field(..., description='The id of the logical file.') + specs_dicts: list[LisLogicalFileWellSiteSpecDict] = Field(default_factory=list, description='The specification of the file.') # noqa class SimpleLisLogicalFileSpec(BaseModel): - """ - Represents the specification for a LIS logical file. - Like the curve mnemonic, units and number of samples. - """ - mnemonic: Optional[str] = Field(None, description="Mnemonic") - units: Optional[str] = Field(None, description="Units") - samples: Optional[int] = Field(None, description="Number of Samples") + """ + Represents the specification for a LIS logical file. + Like the curve mnemonic, units and number of samples. + """ + + mnemonic: Optional[str] = Field(None, description='Mnemonic') + units: Optional[str] = Field(None, description='Units') + samples: Optional[int] = Field(None, description='Number of Samples') class LisLogicalFileSpecsDict(BaseModel): - """ - Represents the specification for a LIS logical file. - But with more details. - """ - api_curve_class: Optional[Union[str, int]] = Field(None, description="API Curve Class") - api_curve_type: Optional[Union[str, int]] = Field(None, description="API Curve Type") - api_log_type: Optional[Union[str, int]] = Field(None, description="API Log Type") - api_modifier: Optional[Union[str, int]] = Field(None, description="API Modifier") - filenr: Optional[int] = Field(None, description="File Number") - mnemonic: Optional[str] = Field(None, description="Mnemonic") - process_level: Optional[int] = Field(None, description="Process Level") - reprc: Optional[Union[str, int]] = Field(None, description="Reprocessing") - reserved_size: Optional[int] = Field(None, description="Reserved Size") - samples: Optional[int] = Field(None, description="Number of Samples") - service_id: Optional[Union[str, int]] = Field(None, description="Service ID") - service_order_nr: Optional[Union[str, int]] = Field(None, description="Service Order Number") - units: Optional[Union[str, int]] = Field(None, description="Units") - - def simple(self) -> SimpleLisLogicalFileSpec: - return SimpleLisLogicalFileSpec( - mnemonic=self.mnemonic, - units=self.units, - samples=self.samples - ) + """ + Represents the specification for a LIS logical file. + But with more details. + """ + + api_curve_class: Optional[Union[str, int]] = Field(None, description='API Curve Class') + api_curve_type: Optional[Union[str, int]] = Field(None, description='API Curve Type') + api_log_type: Optional[Union[str, int]] = Field(None, description='API Log Type') + api_modifier: Optional[Union[str, int]] = Field(None, description='API Modifier') + filenr: Optional[int] = Field(None, description='File Number') + mnemonic: Optional[str] = Field(None, description='Mnemonic') + process_level: Optional[int] = Field(None, description='Process Level') + reprc: Optional[Union[str, int]] = Field(None, description='Reprocessing') + reserved_size: Optional[int] = Field(None, description='Reserved Size') + samples: Optional[int] = Field(None, description='Number of Samples') + service_id: Optional[Union[str, int]] = Field(None, description='Service ID') + service_order_nr: Optional[Union[str, int]] = Field(None, description='Service Order Number') + units: Optional[Union[str, int]] = Field(None, description='Units') + + def simple(self) -> SimpleLisLogicalFileSpec: + return SimpleLisLogicalFileSpec(mnemonic=self.mnemonic, units=self.units, samples=self.samples) class LisLogicalSpecs(BaseModel): - file_name: str = Field(..., description="The name of the file.") - logical_id: Union[str, int] = Field(..., description="The id of the logical file.") - specs_dicts: list[LisLogicalFileSpecsDict] = Field(default_factory=list, description="The specification of the file.") + file_name: str = Field(..., description='The name of the file.') + logical_id: Union[str, int] = Field(..., description='The id of the logical file.') + specs_dicts: list[LisLogicalFileSpecsDict] = Field(default_factory=list, description='The specification of the file.') class LogicalLisFileModel(TimeStampedModelSchema): - """ - Represents a logical LIS file. - It can have multiple frames. But generally only one. - - Attributes: - file_name (str): The name of the file. - logical_id (str): The id of the logical file. - frames (list[FrameLisCurves]): The frames of the file. - error (bool): If the file has any error during opening. - error_message (str): The error exception if any. - well_site_specs (LisLogicalWellSiteSpec): The well site specifications. - specs (list[Any]): The specification of the file. - header (str): The header of the file. - """ - - file_name: str = Field(..., description="The name of the file.") - logical_id: Optional[Any] = Field(..., description="The id of the logical file.") - frames: list[FrameLisCurves] = Field(default_factory=list, description="The frames of the file.") - curves_names: list[str] = Field(default_factory=list, description="The names of the curves.") - error: bool = Field(False, description="If the file has any error during opening.") - error_message: Optional[str] = Field(None, description="The error exception if any.") - well_site_specs: Optional[LisLogicalWellSiteSpec] = Field(None, description="The well site specifications.") - specs: list[Any] = Field(None, description="The specification of the file.") - header: Optional[str] = Field(None, description="The header of the file.") - - @property - def frames_count(self) -> int: - return len(self.frames) - - def get_frame(self, index=0) -> FrameLisCurves: - assert index < self.frames_count, f"Index {index} is out of range. The file has {self.frames_count} frames." - if self.frames_count == 1: - return self.frames[0] - return self.frames[index] - - def table_view(self) -> Table: - """ - Create a table view of the file. - """ - table = Table(title=self.file_name) - table.add_column("Logical File ID", style="cyan") - table.add_column("Frames", style="magenta") - table.add_column("Curves", style="green") - table.add_column("Error", style="red") - table.add_row(str(self.logical_id), str(self.frames_count), str(self.curves_names), str(self.error)) - console.print(table) - return table + """ + Represents a logical LIS file. + It can have multiple frames. But generally only one. + + Attributes: + file_name (str): The name of the file. + logical_id (str): The id of the logical file. + frames (list[FrameLisCurves]): The frames of the file. + error (bool): If the file has any error during opening. + error_message (str): The error exception if any. + well_site_specs (LisLogicalWellSiteSpec): The well site specifications. + specs (list[Any]): The specification of the file. + header (str): The header of the file. + """ + + file_name: str = Field(..., description='The name of the file.') + logical_id: Optional[Any] = Field(..., description='The id of the logical file.') + frames: list[FrameLisCurves] = Field(default_factory=list, description='The frames of the file.') + curves_names: list[str] = Field(default_factory=list, description='The names of the curves.') + error: bool = Field(False, description='If the file has any error during opening.') + error_message: Optional[str] = Field(None, description='The error exception if any.') + well_site_specs: Optional[LisLogicalWellSiteSpec] = Field(None, description='The well site specifications.') + specs: list[Any] = Field(None, description='The specification of the file.') + header: Optional[str] = Field(None, description='The header of the file.') + + @property + def frames_count(self) -> int: + return len(self.frames) + + def get_frame(self, index=0) -> FrameLisCurves: + assert index < self.frames_count, f'Index {index} is out of range. The file has {self.frames_count} frames.' + if self.frames_count == 1: + return self.frames[0] + return self.frames[index] + + def table_view(self) -> Table: + """ + Create a table view of the file. + """ + table = Table(title=self.file_name) + table.add_column('Logical File ID', style='cyan') + table.add_column('Frames', style='magenta') + table.add_column('Curves', style='green') + table.add_column('Error', style='red') + table.add_row(str(self.logical_id), str(self.frames_count), str(self.curves_names), str(self.error)) + console.print(table) + return table class PhysicalLisFileModel(TimeStampedModelSchema): - """ - Represents a physical LIS file. - It can have multiple logical files. And each logical file can have multiple frames, but generally only one. - - Attributes: - file_name (str): The name of the file. - folder_name (str): The name of the folder. - logical_files (list[LogicalLisFileModel]): The logical files. - error_files (list[LogicalLisFileModel]): The error files. - error (bool): If the file has any error during opening. - error_message (str): The error exception if any. - """ - - file_name: str = Field(..., description="The name of the file.") - folder_name: Optional[str] = Field(None, description="The name of the folder.") - logical_files: Optional[list[LogicalLisFileModel]] = Field(default_factory=list, description="The logical files.") - error_files: Optional[list[LogicalLisFileModel]] = Field(default_factory=list, description="The error files.") - error: bool = Field(False, description="If the file has any error during opening.") - error_message: Optional[str] = Field(None, description="The error exception if any.") - - @property - def logical_files_count(self) -> int: - return len(self.logical_files) - - @cached_property - def curves_names(self) -> list[str]: - """ - Get the names of the curves in the file. - - Returns: - list[str]: The names of the curves. - """ - return [curve for file in self.logical_files for curve in file.curves_names] - - def logical_files_table(self) -> Table: - """ - Get a table view of the logical files. - """ - table = Table(title=self.file_name) - table.add_column("File Name", style="green") - table.add_column("Curves", style="cyan") - table.add_column("Error", style="red") - for file in self.logical_files: - table.add_row(file.file_name, str(file.curves_names), str(file.error)) - console.print(table) - return table + """ + Represents a physical LIS file. + It can have multiple logical files. And each logical file can have multiple frames, but generally only one. + + Attributes: + file_name (str): The name of the file. + folder_name (str): The name of the folder. + logical_files (list[LogicalLisFileModel]): The logical files. + error_files (list[LogicalLisFileModel]): The error files. + error (bool): If the file has any error during opening. + error_message (str): The error exception if any. + """ + + file_name: str = Field(..., description='The name of the file.') + folder_name: Optional[str] = Field(None, description='The name of the folder.') + logical_files: Optional[list[LogicalLisFileModel]] = Field(default_factory=list, description='The logical files.') + error_files: Optional[list[LogicalLisFileModel]] = Field(default_factory=list, description='The error files.') + error: bool = Field(False, description='If the file has any error during opening.') + error_message: Optional[str] = Field(None, description='The error exception if any.') + + @property + def logical_files_count(self) -> int: + return len(self.logical_files) + + @cached_property + def curves_names(self) -> list[str]: + """ + Get the names of the curves in the file. + + Returns: + list[str]: The names of the curves. + """ + return [curve for file in self.logical_files for curve in file.curves_names] + + def logical_files_table(self) -> Table: + """ + Get a table view of the logical files. + """ + table = Table(title=self.file_name) + table.add_column('File Name', style='green') + table.add_column('Curves', style='cyan') + table.add_column('Error', style='red') + for file in self.logical_files: + table.add_row(file.file_name, str(file.curves_names), str(file.error)) + console.print(table) + return table diff --git a/wellbelog/utils/json_io.py b/wellbelog/utils/json_io.py index f7c1d41..71d1341 100644 --- a/wellbelog/utils/json_io.py +++ b/wellbelog/utils/json_io.py @@ -3,54 +3,57 @@ def read_json(path_) -> list[dict]: - ''' - Return a dict or a list of dicts from a given JSON. - ''' - with open(path_, encoding="utf8") as j: - return json.load(j) - - -def write_json(filename: str, source: list[dict],): - ''' - Writes a list of dicts to a JSON file. - Filename = example.json - Source = list of dictionaries. - ''' - with open(filename, 'w', encoding='utf-8') as f: - json.dump(source, f, ensure_ascii=False, indent=4) + """ + Return a dict or a list of dicts from a given JSON. + """ + with open(path_, encoding='utf8') as j: + return json.load(j) + + +def write_json( + filename: str, + source: list[dict], +): + """ + Writes a list of dicts to a JSON file. + Filename = example.json + Source = list of dictionaries. + """ + with open(filename, 'w', encoding='utf-8') as f: + json.dump(source, f, ensure_ascii=False, indent=4) def is_jsonable(x: Any) -> bool: - """ - Checks if avariable is json serializable. - Returns a boolean value + """ + Checks if avariable is json serializable. + Returns a boolean value - Args: - x (Any): Any variable + Args: + x (Any): Any variable - Returns: - bool: A boolean value if the variable is JSON serializable - """ - try: - json.dumps(x) - return True - except (TypeError, OverflowError): - return False + Returns: + bool: A boolean value if the variable is JSON serializable + """ + try: + json.dumps(x) + return True + except (TypeError, OverflowError): + return False def check_dict_jsonable(dict_: dict) -> dict: - """ - Check if a dict is JSON serializable. - Loops over the keys and values, and if the values - are not json serializable it converts to string. - - Args: - dict_ (dict): A dict to be checked - - Returns: - dict: A converted dict - """ - for k, v in dict_.items(): - if not is_jsonable(v): - dict_[k] = str(v) - return dict_ + """ + Check if a dict is JSON serializable. + Loops over the keys and values, and if the values + are not json serializable it converts to string. + + Args: + dict_ (dict): A dict to be checked + + Returns: + dict: A converted dict + """ + for k, v in dict_.items(): + if not is_jsonable(v): + dict_[k] = str(v) + return dict_ diff --git a/wellbelog/utils/logging.py b/wellbelog/utils/logging.py index 5cfa3af..106e0c6 100644 --- a/wellbelog/utils/logging.py +++ b/wellbelog/utils/logging.py @@ -2,42 +2,43 @@ class ColorfulFormatter(logging.Formatter): - """ - ColorfulFormatter class for logging - """ - COLORS = { - 'DEBUG': '\033[94m', # Blue - 'INFO': '\033[92m', # Green - 'WARNING': '\033[93m', # Yellow - 'ERROR': '\033[91m', # Red - 'CRITICAL': '\033[91m', # Red - } + """ + ColorfulFormatter class for logging + """ - RESET = '\033[0m' + COLORS = { + 'DEBUG': '\033[94m', # Blue + 'INFO': '\033[92m', # Green + 'WARNING': '\033[93m', # Yellow + 'ERROR': '\033[91m', # Red + 'CRITICAL': '\033[91m', # Red + } - def __init__(self, app_name: str): - super().__init__() - self.app_name = app_name + RESET = '\033[0m' - def format(self, record): - color = self.COLORS.get(record.levelname, self.RESET) - message = f"{color}[{self.app_name}] {record.levelname}{self.RESET}: {record.msg}" # noqa - return f'{message}' + def __init__(self, app_name: str): + super().__init__() + self.app_name = app_name + + def format(self, record): + color = self.COLORS.get(record.levelname, self.RESET) + message = f'{color}[{self.app_name}] {record.levelname}{self.RESET}: {record.msg}' # noqa + return f'{message}' def setup_logger(app_name: str, debug: bool = True): - """ - This function sets up the logger for the application - """ - logger = logging.getLogger(app_name) - logger.setLevel(logging.DEBUG) + """ + This function sets up the logger for the application + """ + logger = logging.getLogger(app_name) + logger.setLevel(logging.DEBUG) - handler = logging.StreamHandler() - formatter = ColorfulFormatter(app_name) - handler.setFormatter(formatter) - logger.addHandler(handler) + handler = logging.StreamHandler() + formatter = ColorfulFormatter(app_name) + handler.setFormatter(formatter) + logger.addHandler(handler) - if debug is False: - handler.setLevel(logging.ERROR) + if debug is False: + handler.setLevel(logging.ERROR) - return logger + return logger diff --git a/wellbelog/utils/mnemonicfix.py b/wellbelog/utils/mnemonicfix.py index ca14d2d..0ffa41a 100644 --- a/wellbelog/utils/mnemonicfix.py +++ b/wellbelog/utils/mnemonicfix.py @@ -2,96 +2,97 @@ class MnemonicFix: - """ - Main class for fixing mnemonics. - Searches for column values and replaces them. - """ - @staticmethod - def replace_columns_values(df: DataFrame, characters: set, default: str) -> DataFrame: - """ - Replaces columns values from a given dataframe, \n - if it contains a charcter - froma given set, and replaces it for a default value. - """ - - for original_chr in characters: - if original_chr in df.columns.values: - df.rename(columns={original_chr: default}, inplace=True) - return df - - @staticmethod - def replace_index(string: str) -> str: - """ - Replaces index values from a given string. - """ - if 'INDEX' in string: - return 'DEPT' - return string - - @staticmethod - def depth_rename(df: DataFrame) -> DataFrame: - """ - Depth rename utility. - Searches for columns values with whitespaces and renames them. - """ - if 'DEPT(0)' in df.columns.values: - df.rename(columns={'DEPT(0)': 'DEPT'}, inplace=True) - - if 'DEPT ' in df.columns.values: - df.rename(columns={'DEPT ': 'DEPT'}, inplace=True) - - return df - - @staticmethod - def gamma_rename(df: DataFrame) -> DataFrame: - """ - GR rename utility. - Searches for columns values with whitespaces and renames them. - """ - - if 'GR ' in df.columns.values: - df.rename(columns={'GR ': 'GR'}, inplace=True) - - if 'GR ' in df.columns.values: - df.rename(columns={'GR ': 'GR'}, inplace=True) - - if ' GR ' in df.columns.values: - df.rename(columns={' GR ': 'GR'}, inplace=True) - - if 'RGR ' in df.columns.values: - df.rename(columns={'RGR ': 'RGR'}, inplace=True) - - if 'RGR ' in df.columns.values: - df.rename(columns={'RGR ': 'RGR'}, inplace=True) - - if ' RGR ' in df.columns.values: - df.rename(columns={' RGR ': 'RGR'}, inplace=True) - - return df - - @staticmethod - def index_to_depth(df: DataFrame) -> DataFrame: - """ - Index to depth utility. - This function is specially usefull for Dlis files - that have index in place of depth. - """ - for col in df.columns.values: - if 'INDEX' in col: - target = col - - df.rename(columns={target: 'DEPT'}, inplace=True) - - if 'INDEX ' in col: - target = col - df.rename(columns={target: 'DEPT'}, inplace=True) - - return df - - @staticmethod - def strip_column_names(df: DataFrame) -> DataFrame: - """Rename cols of dataframe by cleaning whitespace""" - for col in df.columns.values: - col: str - df.rename(columns={col: col.lstrip().rstrip()}, inplace=True) - return df + """ + Main class for fixing mnemonics. + Searches for column values and replaces them. + """ + + @staticmethod + def replace_columns_values(df: DataFrame, characters: set, default: str) -> DataFrame: + """ + Replaces columns values from a given dataframe, \n + if it contains a charcter + froma given set, and replaces it for a default value. + """ + + for original_chr in characters: + if original_chr in df.columns.values: + df.rename(columns={original_chr: default}, inplace=True) + return df + + @staticmethod + def replace_index(string: str) -> str: + """ + Replaces index values from a given string. + """ + if 'INDEX' in string: + return 'DEPT' + return string + + @staticmethod + def depth_rename(df: DataFrame) -> DataFrame: + """ + Depth rename utility. + Searches for columns values with whitespaces and renames them. + """ + if 'DEPT(0)' in df.columns.values: + df.rename(columns={'DEPT(0)': 'DEPT'}, inplace=True) + + if 'DEPT ' in df.columns.values: + df.rename(columns={'DEPT ': 'DEPT'}, inplace=True) + + return df + + @staticmethod + def gamma_rename(df: DataFrame) -> DataFrame: + """ + GR rename utility. + Searches for columns values with whitespaces and renames them. + """ + + if 'GR ' in df.columns.values: + df.rename(columns={'GR ': 'GR'}, inplace=True) + + if 'GR ' in df.columns.values: + df.rename(columns={'GR ': 'GR'}, inplace=True) + + if ' GR ' in df.columns.values: + df.rename(columns={' GR ': 'GR'}, inplace=True) + + if 'RGR ' in df.columns.values: + df.rename(columns={'RGR ': 'RGR'}, inplace=True) + + if 'RGR ' in df.columns.values: + df.rename(columns={'RGR ': 'RGR'}, inplace=True) + + if ' RGR ' in df.columns.values: + df.rename(columns={' RGR ': 'RGR'}, inplace=True) + + return df + + @staticmethod + def index_to_depth(df: DataFrame) -> DataFrame: + """ + Index to depth utility. + This function is specially usefull for Dlis files + that have index in place of depth. + """ + for col in df.columns.values: + if 'INDEX' in col: + target = col + + df.rename(columns={target: 'DEPT'}, inplace=True) + + if 'INDEX ' in col: + target = col + df.rename(columns={target: 'DEPT'}, inplace=True) + + return df + + @staticmethod + def strip_column_names(df: DataFrame) -> DataFrame: + """Rename cols of dataframe by cleaning whitespace""" + for col in df.columns.values: + col: str + df.rename(columns={col: col.lstrip().rstrip()}, inplace=True) + return df diff --git a/wellbelog/utils/mnemonicset.py b/wellbelog/utils/mnemonicset.py index c794927..dcc68c5 100644 --- a/wellbelog/utils/mnemonicset.py +++ b/wellbelog/utils/mnemonicset.py @@ -1,17 +1,11 @@ class MnemonicSet: + GR_DT = {'GR', 'DT'} - GR_DT = {'GR', 'DT'} + GR_NPHI_RHOB = {'GR', 'NPHI', 'RHOB'} - GR_NPHI_RHOB = {'GR', 'NPHI', 'RHOB'} + GR_SP_CALI = {'GR', 'SP', 'CALI'} - GR_SP_CALI = {'GR', 'SP', 'CALI'} + RHOB_NPHI = {'NPHI', 'RHOB'} - RHOB_NPHI = {'NPHI', 'RHOB'} - -MNEMONIC_LIST = [ - MnemonicSet.GR_DT, - MnemonicSet.GR_NPHI_RHOB, - MnemonicSet.GR_SP_CALI, - MnemonicSet.RHOB_NPHI -] +MNEMONIC_LIST = [MnemonicSet.GR_DT, MnemonicSet.GR_NPHI_RHOB, MnemonicSet.GR_SP_CALI, MnemonicSet.RHOB_NPHI] diff --git a/wellbelog/utils/nullvalues.py b/wellbelog/utils/nullvalues.py index d3e3705..10f3b7c 100644 --- a/wellbelog/utils/nullvalues.py +++ b/wellbelog/utils/nullvalues.py @@ -1,19 +1,20 @@ """ Export set of null values, for checkink in the dataframes. """ + NULL_VALUES = { - -999.25, - -999.250, - -999.2500, - -999.25000, - -999.250000, - -999.000, - -999.0000, - -999.0000000, - -999.00000000, - -999.000000000, - -9999.999, - -9999.250000, - -999.00, - -999.9999999999, + -999.25, + -999.250, + -999.2500, + -999.25000, + -999.250000, + -999.000, + -999.0000, + -999.0000000, + -999.00000000, + -999.000000000, + -9999.999, + -9999.250000, + -999.00, + -999.9999999999, } diff --git a/wellbelog/utils/units.py b/wellbelog/utils/units.py index 7f2c4c8..5f4982d 100644 --- a/wellbelog/utils/units.py +++ b/wellbelog/utils/units.py @@ -5,8 +5,8 @@ def feet_to_meter(feet, decimals=4) -> float: - return round(float(feet) * 0.3048, decimals) + return round(float(feet) * 0.3048, decimals) def meter_to_feet(meter, decimals=4) -> float: - return round(float(meter) / 0.3048, decimals) + return round(float(meter) / 0.3048, decimals)