Skip to content

Commit

Permalink
🧊multiprocessing helper
Browse files Browse the repository at this point in the history
  • Loading branch information
raynardj committed Nov 27, 2021
1 parent 9bb4a79 commit d64168e
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 2 deletions.
2 changes: 1 addition & 1 deletion forgebox/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.4.17"
__version__ = "0.4.18"
2 changes: 2 additions & 0 deletions forgebox/_nbdev.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"compare_sentences": "06_spacy.ipynb",
"highlight": "06_spacy.ipynb",
"FreeMap": "07_freemap.ipynb",
"SingleFileLiner": "09_multiprocess.ipynb",
"Stuff": "10_loop.ipynb",
"method4all": "10_loop.ipynb",
"StorageCore": "10_loop.ipynb",
Expand Down Expand Up @@ -159,6 +160,7 @@
"flatten.py",
"spacy.py",
"freemap.py",
"multiproc.py",
"loop.py",
"etl.py",
"static_file.py",
Expand Down
105 changes: 105 additions & 0 deletions forgebox/multiproc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/09_multiprocess.ipynb (unless otherwise specified).

__all__ = ['SingleFileLiner']

# Cell
import logging
import random
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, List, Tuple

# Cell
class _UnknownLengthError(RuntimeError, TypeError):
    """
    Raised by ``len()`` on a SingleFileLiner created without ``total``.

    It remains a RuntimeError for existing callers. It is also a TypeError
    because the length probing done by ``list()`` / ``tuple()`` only tolerates
    TypeError; any other exception from ``__len__`` aborts the conversion.
    """


class SingleFileLiner:
    """
    Text data reading line by line for multiprocessing

    The open file handle and the read counter live in the module-level
    globals ``SingleFileLiner_fp`` and ``text_line_num`` instead of on the
    instance, so all instances in a process share a single reader: creating
    a new instance rebinds the handle that earlier instances read from.
    """

    def __init__(self, file_path: Path, total: int = None):
        """
        :param file_path: path to a textual file
        :param total:
            if you set total to an integer, we'll have a fixed length
            iteration object (the file is re-read from the top when it
            runs out)
            if you set total to None, we'll read the file till it's over
        """
        self.file_path = Path(file_path)
        global SingleFileLiner_fp
        SingleFileLiner_fp = open(self.file_path, "r")
        global text_line_num
        text_line_num = 0
        self.total = total

    def __repr__(self) -> str:
        return f"SingleFileLiner:\t{self.file_path}"

    def __next__(self) -> str:
        """
        Return the next line, stripped of surrounding whitespace.

        :raises StopIteration: at end of file when ``total`` is None
        :return: the next line; when ``total`` is set and the end of the
            file is reached, the reader rewinds and the first line is
            returned (an empty string if the file itself is empty)
        """
        global SingleFileLiner_fp
        global text_line_num
        line = SingleFileLiner_fp.readline()
        text_line_num += 1
        # readline() returns "" only at end of file; a blank line is "\n"
        if line == "":
            if self.total is None:
                raise StopIteration("SingleFileLiner file read finish")
            logging.warning(f"looping on {self.file_path}")
            self.restart()
            # Read from the rewound file, otherwise the EOF marker would be
            # handed to the caller as a bogus empty line on every wrap-around.
            # A single re-read keeps an empty file from looping forever.
            line = SingleFileLiner_fp.readline()
        return line.strip()

    def __len__(self):
        """
        :return: ``total``
        :raises RuntimeError: when ``total`` was not set
        """
        if self.total is not None:
            return self.total
        raise _UnknownLengthError("You have to set total for len(self)")

    def __iter__(self) -> Iterator[str]:
        """
        Yield stripped lines from the current position of the shared reader.

        Without ``total`` the generator ends at end of file; with ``total``
        it yields exactly ``total`` lines. Call ``restart()`` to rewind.
        """
        if self.total is None:
            while True:
                try:
                    line = next(self)
                except StopIteration:
                    # end the file reading
                    return
                yield line
        else:
            for _ in range(self.total):
                yield next(self)

    def restart(self) -> None:
        """restart file reader: close the shared handle and reopen the file"""
        global SingleFileLiner_fp
        SingleFileLiner_fp.close()
        SingleFileLiner_fp = open(self.file_path, "r")

    def split_train_test(
            self,
            val_ratio: float = 0.1,
            remove_original: bool = False,
            logging_interval: int = 1000000) -> Tuple[Path, Path]:
        """
        Split the text file into train and test

        Each line goes to the validation file with probability ``val_ratio``
        and to the train file otherwise. The outputs are written next to the
        source file as ``<stem>_train<suffix>`` and ``<stem>_valid<suffix>``.

        :param val_ratio: probability of a line landing in the validation file
        :param remove_original: delete previously written train/valid files
            (if they exist) before writing; the source file is never deleted
        :param logging_interval: log progress every this many lines;
            0 or None disables the progress log
        :return: (train_file_path, valid_file_path)
        """
        suffix = self.file_path.suffix
        parent = self.file_path.parent
        stem = self.file_path.stem

        train_file_path = parent / f"{stem}_train{suffix}"
        valid_file_path = parent / f"{stem}_valid{suffix}"

        # delete the file if exists
        if remove_original:
            for stale_path in (train_file_path, valid_file_path):
                try:
                    stale_path.unlink()
                except FileNotFoundError:
                    # first run: there is nothing to remove yet
                    pass

        # read and write by line
        with open(self.file_path, "r") as f, open(
                train_file_path, "w") as f_train, open(
                valid_file_path, "w") as f_valid:
            for i, line in enumerate(f):
                if logging_interval and i % logging_interval == 0:
                    logging.info(f"{i}\tlines processed")
                if random.random() < val_ratio:
                    f_valid.write(line)
                else:
                    f_train.write(line)
        return train_file_path, valid_file_path
209 changes: 209 additions & 0 deletions nbs/09_multiprocess.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Multiprocessing Helper\n",
"> Helpers that make multiprocessing smoother"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# default_exp multiproc"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"# export\n",
"from pathlib import Path\n",
"import logging\n",
"from typing import List, Dict, Callable, Any, Tuple\n",
"import random"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Read single file line by line"
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {},
"outputs": [],
"source": [
"# export\n",
"class SingleFileLiner:\n",
" \"\"\"\n",
" Text data reading line by line for multiprocessing\n",
" \"\"\"\n",
"\n",
" def __init__(self, file_path: Path, total: int = None):\n",
" \"\"\"\n",
" filepath: Path,\n",
" path to a textual file\n",
" total: int\n",
" if you set total to an integer, we'll have a fixed length interation object\n",
" if you set total to None, we'll read the file till it's over\n",
" \"\"\"\n",
" self.file_path = Path(file_path)\n",
" global SingleFileLiner_fp\n",
" SingleFileLiner_fp = open(file_path, \"r\")\n",
" global text_line_num\n",
" text_line_num = 0\n",
" self.total = total\n",
"\n",
" def __repr__(self) -> str:\n",
" return f\"SingleFileLiner:\\t{self.file_path}\"\n",
"\n",
" def __next__(self) -> str:\n",
" global SingleFileLiner_fp\n",
" line = SingleFileLiner_fp.readline()\n",
" global text_line_num\n",
" text_line_num += 1\n",
" if line == \"\":\n",
" if self.total is None:\n",
" raise StopIteration(f\"SingleFileLiner file read finish\")\n",
" logging.warning(f\"looping on {self.file_path}\")\n",
"\n",
" self.restart()\n",
" return line.strip()\n",
"\n",
" def __len__(self):\n",
" if self.total is not None:\n",
" return self.total\n",
" else:\n",
" raise RuntimeError(f\"You have to set total for len(self)\")\n",
"\n",
" def __iter__(self) -> str:\n",
" if self.total is None:\n",
" while True:\n",
" try:\n",
" yield next(self)\n",
" except StopIteration:\n",
" # end the file reading\n",
" break\n",
" else:\n",
" for i in range(self.total):\n",
" yield next(self)\n",
"\n",
" def restart(self) -> None:\n",
" \"\"\"resetart file reader\"\"\"\n",
" global SingleFileLiner_fp\n",
" SingleFileLiner_fp.close()\n",
" SingleFileLiner_fp = open(self.file_path, \"r\")\n",
"\n",
" def split_train_test(\n",
" self,\n",
" val_ratio: float = 0.1,\n",
" remove_original: bool = False,\n",
" logging_interval: int = 1000000) -> Tuple[Path]:\n",
" \"\"\"\n",
" Split the text file into train and test\n",
" :param val_ratio:\n",
" :return:\n",
" \"\"\"\n",
" suffix = self.file_path.suffix\n",
"\n",
" train_file_path = self.file_path.parent / \\\n",
" f\"{self.file_path.stem}_train{suffix}\"\n",
" valid_file_path = self.file_path.parent / \\\n",
" f\"{self.file_path.stem}_valid{suffix}\"\n",
"\n",
" # delete the file if exists\n",
" if remove_original:\n",
" train_file_path.unlink()\n",
" valid_file_path.unlink()\n",
"\n",
" # read and write by line\n",
" with open(self.file_path, \"r\") as f, open(\n",
" train_file_path, \"w\") as f_train, open(\n",
" valid_file_path, \"w\") as f_valid:\n",
" for i, line in enumerate(f):\n",
" if i % 1000000 == 0:\n",
" logging.info(f\"{i}\\tlines processed\")\n",
" if random.random() < val_ratio:\n",
" f_valid.write(line)\n",
" else:\n",
" f_train.write(line)\n",
" return train_file_path, valid_file_path"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {},
"outputs": [],
"source": [
"sfl = SingleFileLiner(\"../README.md\")\n",
"from joblib import Parallel, delayed\n",
"from time import sleep"
]
},
{
"cell_type": "code",
"execution_count": 48,
"metadata": {},
"outputs": [],
"source": [
"def get_line(x):\n",
"# sleep(1)\n",
" return x\n",
"\n",
"res = Parallel(backend=\"multiprocessing\", n_jobs=6)(delayed(get_line)(i) for i in sfl)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": false,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 4
}
2 changes: 1 addition & 1 deletion settings.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ author = xiaochen(ray) zhang
author_email = [email protected]
copyright = xiaochen(ray) zhang
branch = master
version = 0.4.17
version = 0.4.18
min_python = 3.6
audience = Developers
language = English
Expand Down

0 comments on commit d64168e

Please sign in to comment.