diff --git a/apis/api_helper.py b/apis/api_helper.py
index f19352e02..292eb4210 100644
--- a/apis/api_helper.py
+++ b/apis/api_helper.py
@@ -37,7 +37,7 @@ def chunks(l, n):
 
 def multiprocessing(max_threads=None):
     if not max_threads:
-        max_threads = global_settings.get("max_threads", -1)
+        max_threads = -1
     max_threads2 = cpu_count()
     if max_threads < 1 or max_threads >= max_threads2:
         pool = ThreadPool()
@@ -47,10 +47,10 @@ def multiprocessing(max_threads=None):
 
 
 class session_manager():
-    def __init__(self, original_sessions=[], headers: dict = {}, session_rules=None, session_retry_rules=None) -> None:
+    def __init__(self, original_sessions=[], headers: dict = {}, session_rules=None, session_retry_rules=None, max_threads=-1) -> None:
         self.sessions = self.copy_sessions(original_sessions)
-        self.pool = multiprocessing()
-        self.max_threads = self.pool._processes
+        self.pool = multiprocessing(max_threads)
+        self.max_threads = max_threads
         self.kill = False
         self.headers = headers
         self.session_rules = session_rules
@@ -184,8 +184,9 @@ def test_session(proxy=None, cert=None, max_threads=None):
     while not sessions:
         proxies = [custom_proxy] if custom_proxy else proxies
         if proxies:
-            sessions = pool.starmap(test_session, product(
-                proxies, [cert], [max_threads]))
+            with pool:
+                sessions = pool.starmap(test_session, product(
+                    proxies, [cert], [max_threads]))
         else:
             session = test_session(max_threads=max_threads)
             sessions.append(session)
@@ -268,7 +269,7 @@ def multi(item):
             continue
     items = assign_session(links, session_manager.sessions)
     # item_groups = grouper(300,items)
-    pool = multiprocessing()
+    pool = session_manager.pool
     results = pool.starmap(multi, product(
         items))
     not_faulty = [x for x in results if x]
diff --git a/apis/onlyfans/onlyfans.py b/apis/onlyfans/onlyfans.py
index caa02070a..3f63a3db7 100644
--- a/apis/onlyfans/onlyfans.py
+++ b/apis/onlyfans/onlyfans.py
@@ -254,7 +254,7 @@ def __init__(self, option={}) -> None:
         self.link = option.get("link")
         self.links = content_types()
         self.scraped = content_types()
-        self.auth_id: Optional[int] = None
+        self.authed: create_auth = option.get("authed")
         self.auth_count = None
         self.session_manager: api_helper.session_manager = option.get(
             "session_manager")
@@ -474,30 +474,34 @@ def set_scraped(self, name, scraped: media_types):
 
 
 class start():
-    def __init__(self, custom_request=callable) -> None:
+    def __init__(self, custom_request=callable, max_threads=-1) -> None:
         self.auths: list[create_auth] = []
         self.subscriptions: list[create_subscription] = []
         self.custom_request = custom_request
-        self.max_threads = -1
+        self.max_threads = max_threads
         self.lists = None
         self.links = links
-        self.session_manager = api_helper.session_manager
+        self.pool = api_helper.multiprocessing()
+        self.session_manager = api_helper.session_manager(
+            session_rules=session_rules, session_retry_rules=session_retry_rules, max_threads=max_threads)
         self.settings = {}
 
     def set_auth_details(self, option={}, only_active=False):
         if only_active and not option.get("active"):
             return
-        auth = create_auth()
+        auth = create_auth(session_manager2=self.session_manager,
+                           pool=self.pool)
         auth.auth_details = auth_details(option)
         self.auths.append(auth)
         return auth
 
-    def set_auth(self, me):
-        self.auth = me
+    def close_pools(self):
+        self.pool.close()
+        self.session_manager.pool.close()
 
 
 class create_auth():
-    def __init__(self, option={}, api: Optional[start] = None, init=False) -> None:
+    def __init__(self, session_manager2: api_helper.session_manager, option={}, init=False, pool=None) -> None:
         self.id = option.get("id")
         self.username = option.get("username")
         if not self.username:
@@ -513,16 +517,15 @@ def __init__(self, option={}, api: Optional[start] = None, init=False) -> None:
         self.archived_stories = {}
         self.mass_messages = []
         self.paid_content = {}
-        self.session_manager = api_helper.session_manager(
-            session_rules=session_rules, session_retry_rules=session_retry_rules)
+        session_manager2 = copy.copy(session_manager2)
+        self.session_manager = session_manager2
+        self.pool = pool
         self.auth_details: Optional[auth_details] = None
         self.profile_directory = option.get("profile_directory", "")
         self.guest = False
         self.active = False
         self.errors: list[error_details] = []
         self.extras = {}
-        if api:
-            api.auths.append(self)
         valid_counts = ["chatMessagesCount"]
         args = [self.username, False, False]
         link_info = links(*args).full
@@ -725,14 +728,18 @@ def get_subscriptions(self, resume=None, refresh=True, identifiers: list = [], e
         results = []
         if self.isPerformer:
             temp_session_manager = self.session_manager
+            temp_pool = self.pool
             delattr(self, "session_manager")
+            delattr(self, "pool")
             json_authed = jsonpickle.encode(
                 self, unpicklable=False)
             json_authed = jsonpickle.decode(json_authed)
             self.session_manager = temp_session_manager
+            self.pool = temp_pool
             json_authed = json_authed | self.get_user(self.username)
 
             subscription = create_subscription(json_authed)
+            subscription.authed = self
             subscription.session_manager = self.session_manager
             subscription = [subscription]
             results.append(subscription)
@@ -756,11 +763,11 @@ def multi(item):
                 subscription2 = self.get_user(subscription["username"])
                 subscription = subscription | subscription2
                 subscription = create_subscription(subscription)
-                subscription.auth_id = self.id
+                subscription.authed = self
                 subscription.link = f"https://onlyfans.com/{subscription.username}"
                 valid_subscriptions.append(subscription)
             return valid_subscriptions
-        pool = api_helper.multiprocessing()
+        pool = self.pool
         # offset_array= offset_array[:16]
         results += pool.starmap(multi, product(
             offset_array))
diff --git a/datascraper/main_datascraper.py b/datascraper/main_datascraper.py
index bbad504f6..84432f552 100644
--- a/datascraper/main_datascraper.py
+++ b/datascraper/main_datascraper.py
@@ -25,7 +25,7 @@
 #     return text
 
 
-def start_datascraper(json_config: dict, site_name_lower: str, api: Optional[OnlyFans.start] = None, webhooks=True):
+def start_datascraper(json_config: dict, site_name_lower: str, api: Optional[OnlyFans.start] = None, webhooks=True) -> Optional[OnlyFans.start]:
     json_settings = json_config["settings"]
     json_webhooks = json_settings["webhooks"]
     json_sites = json_config["supported"]
@@ -48,13 +48,13 @@ def start_datascraper(json_config: dict, site_name_lower: str, api: Optional[Onl
     original_sessions = [x for x in original_sessions if x]
     if not original_sessions:
         print("Unable to create session")
-        return False
+        return None
     archive_time = timeit.default_timer()
     if site_name_lower == "onlyfans":
         site_name = "OnlyFans"
         module = m_onlyfans
         if not api:
-            api = OnlyFans.start()
+            api = OnlyFans.start(max_threads=json_settings["max_threads"])
         api = main_helper.process_profiles(
             json_settings, original_sessions, site_name, api)
         print
diff --git a/extras/OFRenamer/start.py b/extras/OFRenamer/start.py
index e985f4564..af637b493 100644
--- a/extras/OFRenamer/start.py
+++ b/extras/OFRenamer/start.py
@@ -10,7 +10,7 @@
 import traceback
 
 
-def fix_directories(posts, all_files, database_session: scoped_session, folder, site_name, parent_type, api_type, username, base_directory, json_settings):
+def fix_directories(api, posts, all_files, database_session: scoped_session, folder, site_name, parent_type, api_type, username, base_directory, json_settings):
     new_directories = []
 
     def fix_directories(post: api_table, media_db: list[media_table]):
@@ -111,9 +111,9 @@ def fix_directories(post: api_table, media_db: list[media_table]):
         return delete_rows
     result = database_session.query(folder.media_table)
     media_db = result.all()
-    pool = multiprocessing()
+    pool = api.pool
     delete_rows = pool.starmap(fix_directories, product(
-        posts, [media_db]))
+        posts, [media_db]))
     delete_rows = list(chain(*delete_rows))
     for delete_row in delete_rows:
         database_session.query(folder.media_table).filter(
@@ -123,7 +123,7 @@ def fix_directories(post: api_table, media_db: list[media_table]):
     return posts, new_directories
 
 
-def start(Session, parent_type, api_type, api_path, site_name, subscription, folder, json_settings):
+def start(api, Session, parent_type, api_type, api_path, site_name, subscription, folder, json_settings):
     api_table = folder.api_table
     media_table = folder.media_table
     database_session = Session()
@@ -162,7 +162,7 @@ def start(Session, parent_type, api_type, api_path, site_name, subscription, fol
             all_files.extend(x)
 
     fixed, new_directories = fix_directories(
-        result, all_files, database_session, folder, site_name, parent_type, api_type, username, root_directory, json_settings)
+        api, result, all_files, database_session, folder, site_name, parent_type, api_type, username, root_directory, json_settings)
     database_session.close()
 
     return metadata
diff --git a/helpers/db_helper.py b/helpers/db_helper.py
index f2b0cd2d4..a6f35f62b 100644
--- a/helpers/db_helper.py
+++ b/helpers/db_helper.py
@@ -1,7 +1,7 @@
 import os
 import sqlalchemy
 from sqlalchemy.engine.base import Engine
-from sqlalchemy.orm.session import sessionmaker
+from sqlalchemy.orm.session import Session, sessionmaker
 from sqlalchemy.orm import scoped_session
 from alembic.config import Config
 from alembic import command
@@ -17,6 +17,7 @@ def create_database_session(connection_info, connection_type="sqlite:///", autoc
         kwargs["pool_size"] = pool_size
         kwargs["pool_pre_ping"] = True
         kwargs["max_overflow"] = -1
+        kwargs["isolation_level"] = "READ COMMITTED"
 
     engine = sqlalchemy.create_engine(
         f'{connection_type}{connection_info}?charset=utf8mb4', **kwargs)
@@ -72,7 +73,7 @@ def create_auth_array(item):
     return auth_array
 
 
-def get_or_create(session, model, defaults=None, fbkwargs={}):
+def get_or_create(session: Session, model, defaults=None, fbkwargs={}):
     instance = session.query(model).filter_by(**fbkwargs).one_or_none()
     if instance:
         return instance, True
diff --git a/helpers/main_helper.py b/helpers/main_helper.py
index e089bbedc..b9e0a74ae 100644
--- a/helpers/main_helper.py
+++ b/helpers/main_helper.py
@@ -12,7 +12,6 @@
 import platform
 import re
 from datetime import datetime
-import time
 from itertools import chain, zip_longest, groupby
 import psutil
 import shutil
@@ -29,8 +28,6 @@
 import classes.prepare_webhooks as prepare_webhooks
 from mergedeep import merge, Strategy
 import helpers.db_helper as db_helper
-from alembic.config import Config
-from alembic import command
 import traceback
 json_global_settings = None
 min_drive_space = 0
diff --git a/modules/onlyfans.py b/modules/onlyfans.py
index 0b8b981f1..11915676d 100644
--- a/modules/onlyfans.py
+++ b/modules/onlyfans.py
@@ -23,8 +23,6 @@
 from helpers.main_helper import choose_option, download_session, export_data, import_archive
 import extras.OFLogin.start_ofl as oflogin
 
-multiprocessing = main_helper.multiprocessing
-
 site_name = "OnlyFans"
 json_config = None
 json_global_settings = None
@@ -340,7 +338,7 @@ def paid_content_scraper(api: start, identifiers=[]):
                 authed, new_metadata, formatted_directories, subscription, api_type, api_path, metadata_path, site_name)
             parent_type = ""
             new_metadata = new_metadata + old_metadata
-            w = process_metadata(metadata_path, formatted_directories, new_metadata,
+            w = process_metadata(api, metadata_path, formatted_directories, new_metadata,
                                  site_name, parent_type, api_path, subscription, delete_metadatas)
             print
 
@@ -554,7 +552,7 @@ def process_legacy_metadata(authed: create_auth, new_metadata_set, formatted_dir
     return final_set, delete_metadatas
 
 
-def process_metadata(archive_path: str, formatted_directories: dict, new_metadata_object, site_name, parent_type, api_path, subscription, delete_metadatas):
+def process_metadata(api, archive_path: str, formatted_directories: dict, new_metadata_object, site_name, parent_type, api_path, subscription, delete_metadatas):
     print
     Session, api_type, folder = main_helper.export_sqlite(
         archive_path, new_metadata_object, parent_type)
@@ -568,7 +566,7 @@ def process_metadata(archive_path: str, formatted_directories: dict, new_metadat
     if json_global_settings["helpers"]["renamer"]:
         print("Renaming files.")
         new_metadata_object = ofrenamer.start(
-            Session, parent_type, api_type, api_path, site_name, subscription, folder, json_settings)
+            api, Session, parent_type, api_type, api_path, site_name, subscription, folder, json_settings)
     if delete_legacy_metadata:
         for old_metadata in delete_metadatas:
             if os.path.exists(old_metadata):
@@ -627,7 +625,7 @@ def prepare_scraper(authed: create_auth, site_name, item):
     media_type = api_array["media_types"]
     username = api_array["username"]
     master_set = []
-    pool = multiprocessing()
+    pool = authed.pool
     mandatory_directories = {}
     mandatory_directories["profile_directory"] = profile_directory
     mandatory_directories["download_directory"] = download_directory
@@ -696,13 +694,12 @@ def prepare_scraper(authed: create_auth, site_name, item):
             new_metadata = new_metadata + old_metadata
             subscription.set_scraped(api_type, new_metadata)
             print
-            w = process_metadata(metadata_path, formatted_directories, new_metadata,
+            w = process_metadata(authed, metadata_path, formatted_directories, new_metadata,
                                  site_name, parent_type, api_path, subscription, delete_metadatas)
             print
         else:
             print("No "+api_type+" Found.")
             delattr(subscription.scraped, api_type)
-
     return True
 
 
@@ -1124,7 +1121,7 @@ def __init__(self, authed: create_auth = None, subscription=None) -> None:
 
         d_session = download_session()
         d_session.start(unit='B', unit_scale=True, miniters=1)
-        pool = multiprocessing()
+        pool = authed.session_manager.pool
         pool.starmap(self.prepare_download, product(
             media_set, [authed], [api_type], [subscription], [d_session]))
         d_session.close()
diff --git a/start_ofd.py b/start_ofd.py
index e3de493ab..a9f381822 100755
--- a/start_ofd.py
+++ b/start_ofd.py
@@ -22,7 +22,7 @@
 
     json_sites = json_config["supported"]
    domain = json_settings["auto_site_choice"]
     string, site_names = main_helper.module_chooser(domain, json_sites)
-
+    # logging.basicConfig(level=logging.DEBUG, format="%(message)s")
     while True:
         try:
@@ -40,7 +40,10 @@
                 x = int(x)
                 site_name = site_names[x]
                 site_name_lower = site_name.lower()
-                apis = main_datascraper.start_datascraper(json_config, site_name_lower)
+                api = main_datascraper.start_datascraper(
+                    json_config, site_name_lower)
+                if api:
+                    api.close_pools()
                 if exit_on_completion:
print("Now exiting.") exit(0)