diff --git a/avro/main.py b/avro/main.py index 9cc3319..9a8b4f9 100755 --- a/avro/main.py +++ b/avro/main.py @@ -5,7 +5,8 @@ import re from concurrent.futures import ThreadPoolExecutor, as_completed from functools import lru_cache -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from itertools import chain +from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union # Import local modules. from . import config @@ -50,59 +51,54 @@ def parse(*texts: str) -> Union[str, List[str]]: ``` """ - # Internal function for multiple parses. + # Compiled regular expression for UTF-8 validation + utf8_regex = re.compile(r'\A[\x00-\x7F]*\Z') + @lru_cache def _parse_backend(text: str) -> str: fixed_text = validate.fix_string_case( text - ) # Sanitize text case to meet phonetic comparison standards. - output = [] # The output list of strings. + ) # Sanitize input text to meet phonetic comparison standards. cur_end = 0 # Cursor end point. - # Iterate through input text. - for cur, i in enumerate(fixed_text): - try: - i.encode('utf-8') - except UnicodeDecodeError: - uni_pass = False - else: - uni_pass = True - - match = {'matched': False} - - if not uni_pass: - cur_end = cur + 1 - output.append(i) - - elif cur >= cur_end and uni_pass: - match = match_patterns(fixed_text, cur, rule=False) + def output_generator() -> Generator[str, None, None]: + nonlocal cur_end - if match['matched']: - output.append(match['replaced']) - cur_end = cur + len(match['found']) + # Iterate through input text. + for cur, i in enumerate(fixed_text): + uni_pass = utf8_regex.match(i) is not None - else: - match = match_patterns(fixed_text, cur, rule=True) + if not uni_pass: + cur_end = cur + 1 + yield i + elif cur >= cur_end and uni_pass: + match = match_patterns(fixed_text, cur, rule=False) + matched = match['matched'] - if match['matched']: + if matched: + yield match['replaced'] cur_end = cur + len(match['found']) - replaced = process_rules( - rules=match['rules'], fixed_text=fixed_text, cur=cur, cur_end=cur_end - ) + else: + match = match_patterns(fixed_text, cur, rule=True) + matched = match['matched'] - if replaced is not None: - output.append(replaced) + if matched: + cur_end = cur + len(match['found']) + replaced = process_rules( + rules=match['rules'], fixed_text=fixed_text, cur=cur, cur_end=cur_end + ) - else: - output.append(match['replaced']) + if replaced: + yield replaced + else: + yield match['replaced'] - if not match['matched']: - cur_end = cur + 1 - output.append(i) + if not matched: + cur_end = cur + 1 + yield i - return ''.join(output) + return ''.join(chain.from_iterable(output_generator())) - # Do the final output. output = _concurrency_helper(_parse_backend, texts) return output[0] if len(output) == 1 else output @@ -135,28 +131,16 @@ def _reverse_backend(text: str) -> str: for cur, i in enumerate(text): try: i.encode('utf-8') - except UnicodeDecodeError: - uni_pass = False - else: - uni_pass = True - - match = {'matched': False} - - if not uni_pass: - output.append(i) - - elif uni_pass: match = match_patterns(text, cur, rule=False, reversed=True) if match['matched']: - if match['reversed']: - output.append(match['reversed']) - else: - output.append(match['found']) - - if not match['matched']: + output.append(match['reversed'] if match['reversed'] else match['found']) + else: output.append(i) + except UnicodeDecodeError: + output.append(i) + return ''.join(output) # Split using regex to remove noise. @@ -165,17 +149,12 @@ def _reverse_backend(text: str) -> str: # Extension for the _reverse_backend() function. @lru_cache def _reverse_backend_ext(text: str) -> str: - text_segments = [] exceptions = config.EXCEPTIONS.get(text, None) if not exceptions: separated_texts = compiled_regex.split(text) - - for separated_text in separated_texts: - text_segments.append(_reverse_backend(separated_text)) - + text_segments = [_reverse_backend(separated_text) for separated_text in separated_texts] return ''.join(text_segments) - else: return exceptions