
Commit

[Pii-recognizer] Update the function to be compatible with the call center demo
yonishelach committed Jan 14, 2024
1 parent 231fb7f commit 7ff1c92
Showing 3 changed files with 93 additions and 245 deletions.
117 changes: 58 additions & 59 deletions pii_recognizer/function.yaml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pii_recognizer/item.yaml
@@ -30,5 +30,5 @@ spec:
- st-annotated-text
- https://huggingface.co/beki/en_spacy_pii_distilbert/resolve/main/en_spacy_pii_distilbert-any-py3-none-any.whl
url: ''
-version: 0.1.0
+version: 0.2.0
test_valid: False
219 changes: 34 additions & 185 deletions pii_recognizer/pii_recognizer.py
@@ -1,35 +1,32 @@
-# Copyright 2019 Iguazio
+# Copyright 2023 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
-#      http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os
import pathlib
import tempfile
import warnings
-import pandas as pd
-from collections.abc import Iterable
-from multiprocessing import Pool, cpu_count
-from typing import Any, Dict, List, Optional, Set, Tuple, Union
+from typing import List, Set, Tuple, Union

import annotated_text.util as at_util
import mlrun
import nltk
+import pandas as pd
import presidio_analyzer as pa
import presidio_anonymizer as pre_anoymizer
from presidio_anonymizer.entities import OperatorConfig
-from tqdm.auto import tqdm
+from tqdm import tqdm

try:
import flair as fl
@@ -393,7 +390,6 @@ def analyze(
:param text: The text for analysis.
:param entities: The list of entities to recognize.
:param nlp_artifacts: Not used by this recognizer but needed for the interface.
-    :param language: Text language. Supported languages in MODEL_LANGUAGES
:returns: The list of Presidio RecognizerResult constructed from the recognized Flair detections.
"""
@@ -711,11 +707,11 @@ def _process(
entities: List[str] = None,
entities_operator_map: dict = None,
is_full_text: bool = True,
-) -> Tuple[str, str, str]:
+) -> Tuple[str, list]:
"""
Process the text of str using the model.
-    :param txt: Text to process
+    :param text: Text to process
:param model: Model to use for processing
:param entities: Entities to recognize
:param entities_operator_map: The entity_operator_map is a dictionary that maps entity to operator name and operator params.
@@ -729,7 +725,6 @@
"""

# get the analyzer engine

analyzer = model

# analyze the text that can be used for anonymization
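
The return-type change above reflects that `_process` now hands back the anonymized text together with the raw analyzer results. A minimal sketch of that analyze-then-anonymize flow with Presidio's engines — the function name, threshold default, and operator mapping are made up for illustration:

```python
import presidio_analyzer as pa
import presidio_anonymizer as pre_anoymizer
from presidio_anonymizer.entities import OperatorConfig


def process_sketch(text: str, score_threshold: float = 0.5):
    # Analyze: detect PII entities scoring above the threshold
    analyzer = pa.AnalyzerEngine()
    results = analyzer.analyze(
        text=text, language="en", score_threshold=score_threshold
    )

    # Anonymize: apply one operator per entity type (hypothetical mapping)
    anonymizer = pre_anoymizer.AnonymizerEngine()
    anonymized = anonymizer.anonymize(
        text=text,
        analyzer_results=results,
        operators={"PERSON": OperatorConfig("replace", {"new_value": "<PERSON>"})},
    )
    # Matches the new Tuple[str, list] shape: (anonymized text, analyzer results)
    return anonymized.text, results
```
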
@@ -850,9 +845,9 @@ def _get_all_rpt(res_dict: dict, is_full_report: bool = True):
def recognize_pii(
context: mlrun.MLClientCtx,
input_path: Union[str, pathlib.Path],
-    output_path: str,
html_key: str,
score_threshold: float,
+    output_directory: str = None,
entities: List[
str
] = None, # List of entities to recognize, default is recognizing all
@@ -863,67 +858,60 @@
is_full_text: bool = True,
is_full_html: bool = True,
is_full_report: bool = True,
-) -> Tuple[pathlib.Path, dict, dict]:
+) -> Union[Tuple[str, pd.DataFrame, dict, dict], Tuple[str, pd.DataFrame, dict]]:
"""
-    Walk through the input path, recognize PII in text and store the anonymized text in the output path. Generate the html with different colors for each entity, json report of the explaination.
+    Walk through the input path, recognize PII in text and store the anonymized text in the output path.
+    Generate the html with different colors for each entity, json report of the explanation.
:param context: The MLRun context. this is needed for log the artifacts.
-    :param input_path: The input path of the text files needs to be analyzied.
-    :param output_path: The output path to store the anonymized text.
+    :param input_path: The input path of the text files needs to be analyzed.
:param html_key: The html key for the artifact.
:param score_threshold: The score threshold to mark the recognition as trusted.
+    :param output_directory: The output directory path to store the anonymized text.
:param entities: The list of entities to recognize.
:param entity_operator_map: The map of entity to operator (mask, redact, replace, keep, hash, and its params)
:param model: The model to use. Can be "spacy", "flair", "pattern" or "whole".
-    :param generate_json: Whether to generate the json report of the explaination.
-    :param generate_html: Whether to generate the html report of the explaination.
+    :param generate_json: Whether to generate the json report of the explanation.
+    :param generate_html: Whether to generate the html report of the explanation.
:param is_full_text: Whether to return the full text or only the masked text.
:param is_full_html: Whether to return the full html or just the annotated text
:param is_full_report: Whether to return the full report or just the score and start, end index
:returns: A tuple of:
* Path to the output directory
-        * The json report of the explaination (if generate_json is True)
+        * The json report of the explanation (if generate_json is True)
* A dictionary of errors files that were not processed
"""

# Set output directory
-    if output_path is None:
-        output_path = tempfile.mkdtemp()
+    if output_directory is None:
+        output_directory = tempfile.mkdtemp()

# Create the output directory:
-    output_directory = pathlib.Path(output_path)
+    output_directory = pathlib.Path(output_directory)
if not output_directory.exists():
-        output_directory.mkdir()
+        output_directory.mkdir(parents=True, exist_ok=True)

txt_files_directory = pathlib.Path(input_path)
+    successes = []
errors = {}

res_dict = {}
txt_content = {}
# Load the model:
-    try:
-        analyzer = _get_analyzer_engine(model, entities)
-    except Exception as e:
-        errors["model"] = str(e)
-        logger.error(f"Error when get the model: {e}")
-
+    analyzer = _get_analyzer_engine(model, entities)
logger.info("Model loaded")
# Go over the text files in the input path, analyze and anonymize them:
-    for i, txt_file in enumerate(
-        tqdm(
-            list(txt_files_directory.glob("*.txt")),
-            desc="Processing files",
-            unit="file",
-        )
+    for txt_file in tqdm(
+        list(txt_files_directory.glob("*.txt")),
+        desc="Processing files",
+        unit="file",
):
try:
# Load the str from the text file
text = txt_file.read_text()
-            # TODO maybe the encoding issue if from this function call of tqdm.read_text()
-            # Need to fix it later
txt_content[str(txt_file)] = text
# Process the text to recoginze the pii entities in it
anonymized_text, results = _process(
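
The `entity_operator_map` documented above names Presidio's operators (mask, redact, replace, keep, hash) but not the exact schema, which `_process` defines. One plausible shape, labeled hypothetical, together with the conversion to the `OperatorConfig` objects the anonymizer engine expects:

```python
from presidio_anonymizer.entities import OperatorConfig

# Hypothetical map: entity name -> (operator name, operator params)
entity_operator_map = {
    "PERSON": ("replace", {"new_value": "<PERSON>"}),
    "PHONE_NUMBER": ("mask", {"masking_char": "*", "chars_to_mask": 4, "from_end": True}),
    "EMAIL_ADDRESS": ("hash", {}),
    "LOCATION": ("redact", {}),
}

# Converted to OperatorConfig objects for AnonymizerEngine.anonymize()
operators = {
    entity: OperatorConfig(operator, params)
    for entity, (operator, params) in entity_operator_map.items()
}
```
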
@@ -936,158 +924,19 @@
)
res_dict[str(txt_file)] = results
# Store the anonymized text in the output path
-            output_file = (
-                output_directory
-                / f"{str(txt_file.relative_to(txt_files_directory)).split('.')[0]}.txt"
-            )
+            output_file = output_directory / f"{txt_file.stem}.txt"
+            output_file.parent.mkdir(parents=True, exist_ok=True)
with open(output_file, "w") as f:
f.write(anonymized_text)

+            successes.append([txt_file.name, output_file.name])
except Exception as e:
errors[str(txt_file)] = str(e)
logger.error(f"Error processing {txt_file}: {e}")
-    if generate_html:
-        # Generate the html report
-        html_res = _get_all_html(txt_content, res_dict, is_full_html)
-        # Store the html report in the context
-        arti_html = mlrun.artifacts.Artifact(body=html_res, format="html", key=html_key)
-        context.log_artifact(arti_html)
-    if generate_json:
-        # Generate the json report
-        json_res = _get_all_rpt(res_dict, is_full_report)
-        return output_path, json_res, errors
-    return output_path, errors


-def _recognize_pii_one_file(
-    input_file: str,
-    output_file: str,
-    score_threshold: float,
-    entities: List[
-        str
-    ] = None,  # List of entities to recognize, default is recognizing all
-    entity_operator_map: dict = None,
-    model: str = None,
-    is_full_text: bool = True,
-) -> Tuple[dict, dict, dict]:
-    """
-    Recognize PII in text and store the anonymized text in the output path. Generate the html with different colors for each entity, json report of the explaination.
-    :param input_file: The input path of the text files needs to be analyzied.
-    :param output_file: The output path to store the anonymized text.
-    :param score_threshold: The score threshold to mark the recognition as trusted.
-    :param entities: The list of entities to recognize.
-    :param entity_operator_map: The map of entity to operator (mask, redact, replace, keep, hash, and its params)
-    :param model: The model to use. Can be "spacy", "flair", "pattern" or "whole".
-    :param is_full_text: Whether to return the full text or only the masked text.
-    :returns: A tuple of:
-        * A dictionary of the text content of the input file
-        * A dictionary of the results of the explaination
-        * A dictionary of errors files that were not processed
-    """
-
-    errors = {}
-    res_dict = {}
-    txt_content = {}
-    # Load the model:
-    try:
-        analyzer = _get_analyzer_engine(model, entities)
-    except Exception as e:
-        errors["model"] = str(e)
-        logger.error(f"Error when get the model: {e}")
-
-    logger.info("Model loaded")
-    try:
-        # Load the str from the text file
-        with open(input_file, "r", encoding="utf-8") as file:
-            text = file.read()
-        txt_content[str(input_file)] = text
-        # Process the text to recoginze the pii entities in it
-        anonymized_text, results = _process(
-            text=text,
-            model=analyzer,
-            entities=entities,
-            entities_operator_map=entity_operator_map,
-            score_threshold=score_threshold,
-            is_full_text=is_full_text,
-        )
-        res_dict[str(input_file)] = results
-        with open(output_file, "w", encoding="utf-8") as f:
-            f.write(anonymized_text)
-
-    except Exception as e:
-        errors[str(txt_file)] = str(e)
-        logger.error(f"Error processing {txt_file}: {e}")
-
-    return res_dict, txt_content, errors

-def recognize_pii_parallel(
-    context: mlrun.MLClientCtx,
-    config_input_output: str,
-    score_threshold: float,
-    html_key: str,
-    entities: List[str] = None,
-    entity_operator_map: Dict = None,
-    model: str = None,
-    generate_html: bool = True,
-    generate_json: bool = True,
-    is_full_html: bool = True,
-    is_full_text: bool = True,
-    is_full_report: bool = True,
-    num_processes: int = None,
-) -> Tuple[dict, dict]:
-    """Doing a fan-in and fan-out pattern using mutiple processes for cpu node, Since our model is mixed with rule_based and NLP model based. Both Spacy and Flair do not support the cuda GPU natively. For now, we can use all the cores that a CPU offers.
-    :param context: The MLRun context. this is needed
-    :param config_input_output csv file which have the input file path and output file path
-    :param score_threshold: The threshold of the score to recognize the entities
-    :param html_key: The key of the html report in the context
-    :entities List of entities to recognize, default is recognizing all
-    :entity_operator_map The map of the entities and the operator to use. For example, {"PERSON": "replace", "LOCATION": "mask"}
-    :param model The model to use. Can be "spacy", "flair", "pattern" or "whole".
-    :param generate_html: Whether to generate the html report
-    :param generate_json: Whether to generate the json report
-    :param is_full_html: Whether to generate the full html report
-    :param is_full_text: Whether to generate the full text in the html report
-    :param is_full_report: Whether to generate the full json report
-    :param num_process The number of process to run in parallel
-    :returns: A tuple of:
-        * A json report of the result explaination
-        * A dictionary of errors files that were not processed
-    """
-    if num_processes is None:
-        num_processes = cpu_count()
-
-    # Read the CSV into a DataFrame
-    config_df = pd.read_csv(config_input_output)
-
-    # Convert DataFrame rows into a list of tuples, each tuple is arguments for `_recognize_pii_one_file`
-    tasks = [
-        (
-            row["input_file"],
-            row["output_file"],
-            score_threshold,
-            entities,
-            entity_operator_map,
-            model,
-            is_full_text,
-        )
-        for _, row in config_df.iterrows()
-    ]
-    # Create a pool of processes and distribute the tasks
-    with Pool(processes=num_processes) as pool:
-        res = pool.starmap(_recognize_pii_one_file, tasks)
-    # Get the results
-    res_dict = {}
-    txt_content = {}
-    errors = {}
-    for r in res:
-        res_dict.update(r[0])
-        txt_content.update(r[1])
-        errors.update(r[2])
+    successes = pd.DataFrame(
+        successes,
+        columns=["original_file", "anonymized_file"],
+    )

if generate_html:
# Generate the html report
@@ -1098,5 +947,5 @@ def recognize_pii_parallel(
if generate_json:
# Generate the json report
json_res = _get_all_rpt(res_dict, is_full_report)
-        return json_res, errors
-    return errors
+        return str(output_directory), successes, errors, json_res
+    return str(output_directory), successes, errors
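
With the new signature, a caller supplies `input_path`, `html_key`, and `score_threshold`, while `output_directory` is optional (a temporary directory is created when it is omitted), and the run yields the output directory, a successes table, an errors map, and optionally the JSON report. A sketch of invoking the function through MLRun — the hub URL, artifact key, and parameter values are assumptions, so check the function's item page before relying on them:

```python
import mlrun

# Assumed hub name; verify against the function's item.yaml
fn = mlrun.import_function("hub://pii_recognizer")

run = fn.run(
    handler="recognize_pii",
    params={
        "input_path": "./data/texts",  # directory containing *.txt files
        "html_key": "highlighted",     # artifact key for the HTML report
        "score_threshold": 0.8,
        "model": "whole",              # "spacy", "flair", "pattern" or "whole"
        "generate_html": True,
        "generate_json": True,
    },
    local=True,
)
```
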
