diff --git a/forgebox/__init__.py b/forgebox/__init__.py index ac15521..2dd5536 100644 --- a/forgebox/__init__.py +++ b/forgebox/__init__.py @@ -1 +1 @@ -__version__ = "0.4.17" +__version__ = "0.4.18" diff --git a/forgebox/_nbdev.py b/forgebox/_nbdev.py index 7eb9377..adf7356 100644 --- a/forgebox/_nbdev.py +++ b/forgebox/_nbdev.py @@ -40,6 +40,7 @@ "compare_sentences": "06_spacy.ipynb", "highlight": "06_spacy.ipynb", "FreeMap": "07_freemap.ipynb", + "SingleFileLiner": "09_multiprocess.ipynb", "Stuff": "10_loop.ipynb", "method4all": "10_loop.ipynb", "StorageCore": "10_loop.ipynb", @@ -159,6 +160,7 @@ "flatten.py", "spacy.py", "freemap.py", + "multiproc.py", "loop.py", "etl.py", "static_file.py", diff --git a/forgebox/multiproc.py b/forgebox/multiproc.py new file mode 100644 index 0000000..15fff1a --- /dev/null +++ b/forgebox/multiproc.py @@ -0,0 +1,105 @@ +# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/09_multiprocess.ipynb (unless otherwise specified). + +__all__ = ['SingleFileLiner'] + +# Cell +from pathlib import Path +import logging +from typing import List, Dict, Callable, Any, Tuple +import random + +# Cell +class SingleFileLiner: + """ + Text data reading line by line for multiprocessing + """ + + def __init__(self, file_path: Path, total: int = None): + """ + filepath: Path, + path to a textual file + total: int + if you set total to an integer, we'll have a fixed length interation object + if you set total to None, we'll read the file till it's over + """ + self.file_path = Path(file_path) + global SingleFileLiner_fp + SingleFileLiner_fp = open(file_path, "r") + global text_line_num + text_line_num = 0 + self.total = total + + def __repr__(self) -> str: + return f"SingleFileLiner:\t{self.file_path}" + + def __next__(self) -> str: + global SingleFileLiner_fp + line = SingleFileLiner_fp.readline() + global text_line_num + text_line_num += 1 + if line == "": + if self.total is None: + raise StopIteration(f"SingleFileLiner file read finish") + logging.warning(f"looping on {self.file_path}") + + self.restart() + return line.strip() + + def __len__(self): + if self.total is not None: + return self.total + else: + raise RuntimeError(f"You have to set total for len(self)") + + def __iter__(self) -> str: + if self.total is None: + while True: + try: + yield next(self) + except StopIteration: + # end the file reading + break + else: + for i in range(self.total): + yield next(self) + + def restart(self) -> None: + """resetart file reader""" + global SingleFileLiner_fp + SingleFileLiner_fp.close() + SingleFileLiner_fp = open(self.file_path, "r") + + def split_train_test( + self, + val_ratio: float = 0.1, + remove_original: bool = False, + logging_interval: int = 1000000) -> Tuple[Path]: + """ + Split the text file into train and test + :param val_ratio: + :return: + """ + suffix = self.file_path.suffix + + train_file_path = self.file_path.parent / \ + f"{self.file_path.stem}_train{suffix}" + valid_file_path = self.file_path.parent / \ + f"{self.file_path.stem}_valid{suffix}" + + # delete the file if exists + if remove_original: + train_file_path.unlink() + valid_file_path.unlink() + + # read and write by line + with open(self.file_path, "r") as f, open( + train_file_path, "w") as f_train, open( + valid_file_path, "w") as f_valid: + for i, line in enumerate(f): + if i % 1000000 == 0: + logging.info(f"{i}\tlines processed") + if random.random() < val_ratio: + f_valid.write(line) + else: + f_train.write(line) + return train_file_path, valid_file_path \ No newline at end of file diff --git a/nbs/09_multiprocess.ipynb b/nbs/09_multiprocess.ipynb new file mode 100644 index 0000000..6e42775 --- /dev/null +++ b/nbs/09_multiprocess.ipynb @@ -0,0 +1,209 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Multiprocessing Helper\n", + "> Things to smooth up the multiprocessing" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# default_exp multiproc" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "# export\n", + "from pathlib import Path\n", + "import logging\n", + "from typing import List, Dict, Callable, Any, Tuple\n", + "import random" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Read single file line by line" + ] + }, + { + "cell_type": "code", + "execution_count": 46, + "metadata": {}, + "outputs": [], + "source": [ + "# export\n", + "class SingleFileLiner:\n", + " \"\"\"\n", + " Text data reading line by line for multiprocessing\n", + " \"\"\"\n", + "\n", + " def __init__(self, file_path: Path, total: int = None):\n", + " \"\"\"\n", + " filepath: Path,\n", + " path to a textual file\n", + " total: int\n", + " if you set total to an integer, we'll have a fixed length interation object\n", + " if you set total to None, we'll read the file till it's over\n", + " \"\"\"\n", + " self.file_path = Path(file_path)\n", + " global SingleFileLiner_fp\n", + " SingleFileLiner_fp = open(file_path, \"r\")\n", + " global text_line_num\n", + " text_line_num = 0\n", + " self.total = total\n", + "\n", + " def __repr__(self) -> str:\n", + " return f\"SingleFileLiner:\\t{self.file_path}\"\n", + "\n", + " def __next__(self) -> str:\n", + " global SingleFileLiner_fp\n", + " line = SingleFileLiner_fp.readline()\n", + " global text_line_num\n", + " text_line_num += 1\n", + " if line == \"\":\n", + " if self.total is None:\n", + " raise StopIteration(f\"SingleFileLiner file read finish\")\n", + " logging.warning(f\"looping on {self.file_path}\")\n", + "\n", + " self.restart()\n", + " return line.strip()\n", + "\n", + " def __len__(self):\n", + " if self.total is not None:\n", + " return self.total\n", + " else:\n", + " raise RuntimeError(f\"You have to set total for len(self)\")\n", + "\n", + " def __iter__(self) -> str:\n", + " if self.total is None:\n", + " while True:\n", + " try:\n", + " yield next(self)\n", + " except StopIteration:\n", + " # end the file reading\n", + " break\n", + " else:\n", + " for i in range(self.total):\n", + " yield next(self)\n", + "\n", + " def restart(self) -> None:\n", + " \"\"\"resetart file reader\"\"\"\n", + " global SingleFileLiner_fp\n", + " SingleFileLiner_fp.close()\n", + " SingleFileLiner_fp = open(self.file_path, \"r\")\n", + "\n", + " def split_train_test(\n", + " self,\n", + " val_ratio: float = 0.1,\n", + " remove_original: bool = False,\n", + " logging_interval: int = 1000000) -> Tuple[Path]:\n", + " \"\"\"\n", + " Split the text file into train and test\n", + " :param val_ratio:\n", + " :return:\n", + " \"\"\"\n", + " suffix = self.file_path.suffix\n", + "\n", + " train_file_path = self.file_path.parent / \\\n", + " f\"{self.file_path.stem}_train{suffix}\"\n", + " valid_file_path = self.file_path.parent / \\\n", + " f\"{self.file_path.stem}_valid{suffix}\"\n", + "\n", + " # delete the file if exists\n", + " if remove_original:\n", + " train_file_path.unlink()\n", + " valid_file_path.unlink()\n", + "\n", + " # read and write by line\n", + " with open(self.file_path, \"r\") as f, open(\n", + " train_file_path, \"w\") as f_train, open(\n", + " valid_file_path, \"w\") as f_valid:\n", + " for i, line in enumerate(f):\n", + " if i % 1000000 == 0:\n", + " logging.info(f\"{i}\\tlines processed\")\n", + " if random.random() < val_ratio:\n", + " f_valid.write(line)\n", + " else:\n", + " f_train.write(line)\n", + " return train_file_path, valid_file_path" + ] + }, + { + "cell_type": "code", + "execution_count": 47, + "metadata": {}, + "outputs": [], + "source": [ + "sfl = SingleFileLiner(\"../README.md\")\n", + "from joblib import Parallel, delayed\n", + "from time import sleep" + ] + }, + { + "cell_type": "code", + "execution_count": 48, + "metadata": {}, + "outputs": [], + "source": [ + "def get_line(x):\n", + "# sleep(1)\n", + " return x\n", + "\n", + "res = Parallel(backend=\"multiprocessing\", n_jobs=6)(delayed(get_line)(i) for i in sfl)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.4" + }, + "toc": { + "base_numbering": 1, + "nav_menu": {}, + "number_sections": true, + "sideBar": true, + "skip_h1_title": false, + "title_cell": "Table of Contents", + "title_sidebar": "Contents", + "toc_cell": false, + "toc_position": {}, + "toc_section_display": true, + "toc_window_display": false + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/settings.ini b/settings.ini index b416459..fecbddf 100644 --- a/settings.ini +++ b/settings.ini @@ -7,7 +7,7 @@ author = xiaochen(ray) zhang author_email = b2ray2c@gmail.com copyright = xiaochen(ray) zhang branch = master -version = 0.4.17 +version = 0.4.18 min_python = 3.6 audience = Developers language = English