From aa8e7cf660e22875b652c114e28ef19cd117c0c9 Mon Sep 17 00:00:00 2001 From: CRIMINAL Date: Sat, 15 May 2021 22:10:27 +0100 Subject: [PATCH] Destroy threads and assign max_threads to api requests Script will only limit threads when doing network requests. It will use all threads when processing data. When doing infinite runs, the script will destroy old threads. --- apis/api_helper.py | 15 +++++++------- apis/onlyfans/onlyfans.py | 35 ++++++++++++++++++++------------- datascraper/main_datascraper.py | 6 +++--- extras/OFRenamer/start.py | 10 +++++----- helpers/db_helper.py | 5 +++-- helpers/main_helper.py | 3 --- modules/onlyfans.py | 15 ++++++-------- start_ofd.py | 7 +++++-- 8 files changed, 51 insertions(+), 45 deletions(-) diff --git a/apis/api_helper.py b/apis/api_helper.py index f19352e02..292eb4210 100644 --- a/apis/api_helper.py +++ b/apis/api_helper.py @@ -37,7 +37,7 @@ def chunks(l, n): def multiprocessing(max_threads=None): if not max_threads: - max_threads = global_settings.get("max_threads", -1) + max_threads = -1 max_threads2 = cpu_count() if max_threads < 1 or max_threads >= max_threads2: pool = ThreadPool() @@ -47,10 +47,10 @@ def multiprocessing(max_threads=None): class session_manager(): - def __init__(self, original_sessions=[], headers: dict = {}, session_rules=None, session_retry_rules=None) -> None: + def __init__(self, original_sessions=[], headers: dict = {}, session_rules=None, session_retry_rules=None,max_threads=-1) -> None: self.sessions = self.copy_sessions(original_sessions) - self.pool = multiprocessing() - self.max_threads = self.pool._processes + self.pool = multiprocessing(max_threads) + self.max_threads = max_threads self.kill = False self.headers = headers self.session_rules = session_rules @@ -184,8 +184,9 @@ def test_session(proxy=None, cert=None, max_threads=None): while not sessions: proxies = [custom_proxy] if custom_proxy else proxies if proxies: - sessions = pool.starmap(test_session, product( - proxies, [cert], [max_threads])) + with pool: + sessions = pool.starmap(test_session, product( + proxies, [cert], [max_threads])) else: session = test_session(max_threads=max_threads) sessions.append(session) @@ -268,7 +269,7 @@ def multi(item): continue items = assign_session(links, session_manager.sessions) # item_groups = grouper(300,items) - pool = multiprocessing() + pool = session_manager.pool results = pool.starmap(multi, product( items)) not_faulty = [x for x in results if x] diff --git a/apis/onlyfans/onlyfans.py b/apis/onlyfans/onlyfans.py index caa02070a..3f63a3db7 100644 --- a/apis/onlyfans/onlyfans.py +++ b/apis/onlyfans/onlyfans.py @@ -254,7 +254,7 @@ def __init__(self, option={}) -> None: self.link = option.get("link") self.links = content_types() self.scraped = content_types() - self.auth_id: Optional[int] = None + self.authed: create_auth = option.get("authed") self.auth_count = None self.session_manager: api_helper.session_manager = option.get( "session_manager") @@ -474,30 +474,34 @@ def set_scraped(self, name, scraped: media_types): class start(): - def __init__(self, custom_request=callable) -> None: + def __init__(self, custom_request=callable, max_threads=-1) -> None: self.auths: list[create_auth] = [] self.subscriptions: list[create_subscription] = [] self.custom_request = custom_request - self.max_threads = -1 + self.max_threads = max_threads self.lists = None self.links = links - self.session_manager = api_helper.session_manager + self.pool = api_helper.multiprocessing() + self.session_manager = api_helper.session_manager( + 
session_rules=session_rules, session_retry_rules=session_retry_rules, max_threads=max_threads) self.settings = {} def set_auth_details(self, option={}, only_active=False): if only_active and not option.get("active"): return - auth = create_auth() + auth = create_auth(session_manager2=self.session_manager, + pool=self.pool) auth.auth_details = auth_details(option) self.auths.append(auth) return auth - def set_auth(self, me): - self.auth = me + def close_pools(self): + self.pool.close() + self.session_manager.pool.close() class create_auth(): - def __init__(self, option={}, api: Optional[start] = None, init=False) -> None: + def __init__(self, session_manager2: api_helper.session_manager, option={}, init=False, pool=None, ) -> None: self.id = option.get("id") self.username = option.get("username") if not self.username: @@ -513,16 +517,15 @@ def __init__(self, option={}, api: Optional[start] = None, init=False) -> None: self.archived_stories = {} self.mass_messages = [] self.paid_content = {} - self.session_manager = api_helper.session_manager( - session_rules=session_rules, session_retry_rules=session_retry_rules) + session_manager2 = copy.copy(session_manager2) + self.session_manager = session_manager2 + self.pool = pool self.auth_details: Optional[auth_details] = None self.profile_directory = option.get("profile_directory", "") self.guest = False self.active = False self.errors: list[error_details] = [] self.extras = {} - if api: - api.auths.append(self) valid_counts = ["chatMessagesCount"] args = [self.username, False, False] link_info = links(*args).full @@ -725,14 +728,18 @@ def get_subscriptions(self, resume=None, refresh=True, identifiers: list = [], e results = [] if self.isPerformer: temp_session_manager = self.session_manager + temp_pool = self.pool delattr(self, "session_manager") + delattr(self, "pool") json_authed = jsonpickle.encode( self, unpicklable=False) json_authed = jsonpickle.decode(json_authed) self.session_manager = temp_session_manager + self.pool = temp_pool json_authed = json_authed | self.get_user(self.username) subscription = create_subscription(json_authed) + subscription.authed = self subscription.session_manager = self.session_manager subscription = [subscription] results.append(subscription) @@ -756,11 +763,11 @@ def multi(item): subscription2 = self.get_user(subscription["username"]) subscription = subscription | subscription2 subscription = create_subscription(subscription) - subscription.auth_id = self.id + subscription.authed = self subscription.link = f"https://onlyfans.com/{subscription.username}" valid_subscriptions.append(subscription) return valid_subscriptions - pool = api_helper.multiprocessing() + pool = self.pool # offset_array= offset_array[:16] results += pool.starmap(multi, product( offset_array)) diff --git a/datascraper/main_datascraper.py b/datascraper/main_datascraper.py index bbad504f6..84432f552 100644 --- a/datascraper/main_datascraper.py +++ b/datascraper/main_datascraper.py @@ -25,7 +25,7 @@ # return text -def start_datascraper(json_config: dict, site_name_lower: str, api: Optional[OnlyFans.start] = None, webhooks=True): +def start_datascraper(json_config: dict, site_name_lower: str, api: Optional[OnlyFans.start] = None, webhooks=True)->Optional[OnlyFans.start]: json_settings = json_config["settings"] json_webhooks = json_settings["webhooks"] json_sites = json_config["supported"] @@ -48,13 +48,13 @@ def start_datascraper(json_config: dict, site_name_lower: str, api: Optional[Onl original_sessions = [x for x in original_sessions if x] 
if not original_sessions: print("Unable to create session") - return False + return None archive_time = timeit.default_timer() if site_name_lower == "onlyfans": site_name = "OnlyFans" module = m_onlyfans if not api: - api = OnlyFans.start() + api = OnlyFans.start(max_threads=json_settings["max_threads"]) api = main_helper.process_profiles( json_settings, original_sessions, site_name, api) print diff --git a/extras/OFRenamer/start.py b/extras/OFRenamer/start.py index e985f4564..af637b493 100644 --- a/extras/OFRenamer/start.py +++ b/extras/OFRenamer/start.py @@ -10,7 +10,7 @@ import traceback -def fix_directories(posts, all_files, database_session: scoped_session, folder, site_name, parent_type, api_type, username, base_directory, json_settings): +def fix_directories(api,posts, all_files, database_session: scoped_session, folder, site_name, parent_type, api_type, username, base_directory, json_settings): new_directories = [] def fix_directories(post: api_table, media_db: list[media_table]): @@ -111,9 +111,9 @@ def fix_directories(post: api_table, media_db: list[media_table]): return delete_rows result = database_session.query(folder.media_table) media_db = result.all() - pool = multiprocessing() + pool = api.pool delete_rows = pool.starmap(fix_directories, product( - posts, [media_db])) + posts, [media_db])) delete_rows = list(chain(*delete_rows)) for delete_row in delete_rows: database_session.query(folder.media_table).filter( @@ -123,7 +123,7 @@ def fix_directories(post: api_table, media_db: list[media_table]): return posts, new_directories -def start(Session, parent_type, api_type, api_path, site_name, subscription, folder, json_settings): +def start(api,Session, parent_type, api_type, api_path, site_name, subscription, folder, json_settings): api_table = folder.api_table media_table = folder.media_table database_session = Session() @@ -162,7 +162,7 @@ def start(Session, parent_type, api_type, api_path, site_name, subscription, fol all_files.extend(x) fixed, new_directories = fix_directories( - result, all_files, database_session, folder, site_name, parent_type, api_type, username, root_directory, json_settings) + api,result, all_files, database_session, folder, site_name, parent_type, api_type, username, root_directory, json_settings) database_session.close() return metadata diff --git a/helpers/db_helper.py b/helpers/db_helper.py index f2b0cd2d4..a6f35f62b 100644 --- a/helpers/db_helper.py +++ b/helpers/db_helper.py @@ -1,7 +1,7 @@ import os import sqlalchemy from sqlalchemy.engine.base import Engine -from sqlalchemy.orm.session import sessionmaker +from sqlalchemy.orm.session import Session, sessionmaker from sqlalchemy.orm import scoped_session from alembic.config import Config from alembic import command @@ -17,6 +17,7 @@ def create_database_session(connection_info, connection_type="sqlite:///", autoc kwargs["pool_size"] = pool_size kwargs["pool_pre_ping"] = True kwargs["max_overflow"] = -1 + kwargs["isolation_level"] = "READ COMMITTED" engine = sqlalchemy.create_engine( f'{connection_type}{connection_info}?charset=utf8mb4', **kwargs) @@ -72,7 +73,7 @@ def create_auth_array(item): return auth_array -def get_or_create(session, model, defaults=None, fbkwargs={}): +def get_or_create(session: Session, model, defaults=None, fbkwargs={}): instance = session.query(model).filter_by(**fbkwargs).one_or_none() if instance: return instance, True diff --git a/helpers/main_helper.py b/helpers/main_helper.py index e089bbedc..b9e0a74ae 100644 --- a/helpers/main_helper.py +++ b/helpers/main_helper.py 
@@ -12,7 +12,6 @@ import platform import re from datetime import datetime -import time from itertools import chain, zip_longest, groupby import psutil import shutil @@ -29,8 +28,6 @@ import classes.prepare_webhooks as prepare_webhooks from mergedeep import merge, Strategy import helpers.db_helper as db_helper -from alembic.config import Config -from alembic import command import traceback json_global_settings = None min_drive_space = 0 diff --git a/modules/onlyfans.py b/modules/onlyfans.py index 0b8b981f1..11915676d 100644 --- a/modules/onlyfans.py +++ b/modules/onlyfans.py @@ -23,8 +23,6 @@ from helpers.main_helper import choose_option, download_session, export_data, import_archive import extras.OFLogin.start_ofl as oflogin -multiprocessing = main_helper.multiprocessing - site_name = "OnlyFans" json_config = None json_global_settings = None @@ -340,7 +338,7 @@ def paid_content_scraper(api: start, identifiers=[]): authed, new_metadata, formatted_directories, subscription, api_type, api_path, metadata_path, site_name) parent_type = "" new_metadata = new_metadata + old_metadata - w = process_metadata(metadata_path, formatted_directories, new_metadata, + w = process_metadata(api, metadata_path, formatted_directories, new_metadata, site_name, parent_type, api_path, subscription, delete_metadatas) print @@ -554,7 +552,7 @@ def process_legacy_metadata(authed: create_auth, new_metadata_set, formatted_dir return final_set, delete_metadatas -def process_metadata(archive_path: str, formatted_directories: dict, new_metadata_object, site_name, parent_type, api_path, subscription, delete_metadatas): +def process_metadata(api, archive_path: str, formatted_directories: dict, new_metadata_object, site_name, parent_type, api_path, subscription, delete_metadatas): print Session, api_type, folder = main_helper.export_sqlite( archive_path, new_metadata_object, parent_type) @@ -568,7 +566,7 @@ def process_metadata(archive_path: str, formatted_directories: dict, new_metadat if json_global_settings["helpers"]["renamer"]: print("Renaming files.") new_metadata_object = ofrenamer.start( - Session, parent_type, api_type, api_path, site_name, subscription, folder, json_settings) + api, Session, parent_type, api_type, api_path, site_name, subscription, folder, json_settings) if delete_legacy_metadata: for old_metadata in delete_metadatas: if os.path.exists(old_metadata): @@ -627,7 +625,7 @@ def prepare_scraper(authed: create_auth, site_name, item): media_type = api_array["media_types"] username = api_array["username"] master_set = [] - pool = multiprocessing() + pool = authed.pool mandatory_directories = {} mandatory_directories["profile_directory"] = profile_directory mandatory_directories["download_directory"] = download_directory @@ -696,13 +694,12 @@ def prepare_scraper(authed: create_auth, site_name, item): new_metadata = new_metadata + old_metadata subscription.set_scraped(api_type, new_metadata) print - w = process_metadata(metadata_path, formatted_directories, new_metadata, + w = process_metadata(authed, metadata_path, formatted_directories, new_metadata, site_name, parent_type, api_path, subscription, delete_metadatas) print else: print("No "+api_type+" Found.") delattr(subscription.scraped, api_type) - return True @@ -1124,7 +1121,7 @@ def __init__(self, authed: create_auth = None, subscription=None) -> None: d_session = download_session() d_session.start(unit='B', unit_scale=True, miniters=1) - pool = multiprocessing() + pool = authed.session_manager.pool pool.starmap(self.prepare_download, product( 
media_set, [authed], [api_type], [subscription], [d_session])) d_session.close() diff --git a/start_ofd.py b/start_ofd.py index e3de493ab..a9f381822 100755 --- a/start_ofd.py +++ b/start_ofd.py @@ -22,7 +22,7 @@ json_sites = json_config["supported"] domain = json_settings["auto_site_choice"] string, site_names = main_helper.module_chooser(domain, json_sites) - + # logging.basicConfig(level=logging.DEBUG, format="%(message)s") while True: try: @@ -40,7 +40,10 @@ x = int(x) site_name = site_names[x] site_name_lower = site_name.lower() - apis = main_datascraper.start_datascraper(json_config, site_name_lower) + api = main_datascraper.start_datascraper( + json_config, site_name_lower) + if api: + api.close_pools() if exit_on_completion: print("Now exiting.") exit(0)
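
For reference, the calling pattern this change enables looks roughly like the sketch below. It assumes a config.json with a settings.max_threads entry (the config loading shown here is illustrative, not the project's real startup code); the loop mirrors the new start_ofd.py hunk, where start_datascraper() now hands back the OnlyFans.start instance so its pools can be destroyed between runs.

    import json

    import datascraper.main_datascraper as main_datascraper

    # Illustrative config loading; the real script builds json_config elsewhere.
    with open("config.json") as f:
        json_config = json.load(f)

    while True:
        # start_datascraper() now returns the OnlyFans.start instance (or None
        # when no session could be created) instead of a bool. Internally the
        # API is built with OnlyFans.start(max_threads=json_settings["max_threads"]),
        # which caps only the session_manager's network-request pool; the pool
        # used for local data processing keeps the full thread count.
        api = main_datascraper.start_datascraper(json_config, "onlyfans")
        if api:
            # close_pools() shuts down api.pool and api.session_manager.pool,
            # so an infinite run does not keep threads from earlier passes alive.
            api.close_pools()
        break  # the real loop keys off exit_on_completion / user input instead

The key point of the pattern is that thread limits apply per run and per purpose: the session_manager's pool is sized by max_threads for network requests, while processing pools are rebuilt (and torn down via close_pools) on every iteration.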