Destroy threads and assign max_threads to api requests
The script will only limit threads when making network requests; it will use all available threads when processing data.

When doing infinite runs, the script will destroy old threads after each run.
UltimaHoarder committed May 15, 2021
1 parent 0cdb3c8 commit aa8e7cf
Showing 8 changed files with 51 additions and 45 deletions.
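
Editor's note: the commit message describes the intended threading behaviour without showing it in one place, so here is a minimal, hypothetical Python sketch of that idea. The names make_network_pool, run_once, fetch, and process are illustrative only and do not exist in the repository; the fallback rule mirrors the api_helper.multiprocessing() change below.

from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool


def make_network_pool(max_threads: int = -1) -> ThreadPool:
    # Same fallback rule as api_helper.multiprocessing(): an unset or
    # out-of-range limit means "use one thread per CPU".
    if max_threads < 1 or max_threads >= cpu_count():
        return ThreadPool()
    return ThreadPool(max_threads)


def run_once(urls, fetch, process, max_threads=-1):
    network_pool = make_network_pool(max_threads)  # limited: network requests only
    processing_pool = ThreadPool()                 # uncapped: data processing
    try:
        responses = network_pool.map(fetch, urls)
        return processing_pool.map(process, responses)
    finally:
        # Destroy this run's threads so an infinite loop of runs
        # does not keep stale pools alive.
        network_pool.close()
        processing_pool.close()
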
15 changes: 8 additions & 7 deletions apis/api_helper.py
@@ -37,7 +37,7 @@ def chunks(l, n):

def multiprocessing(max_threads=None):
if not max_threads:
max_threads = global_settings.get("max_threads", -1)
max_threads = -1
max_threads2 = cpu_count()
if max_threads < 1 or max_threads >= max_threads2:
pool = ThreadPool()
@@ -47,10 +47,10 @@ def multiprocessing(max_threads=None):


class session_manager():
def __init__(self, original_sessions=[], headers: dict = {}, session_rules=None, session_retry_rules=None) -> None:
def __init__(self, original_sessions=[], headers: dict = {}, session_rules=None, session_retry_rules=None,max_threads=-1) -> None:
self.sessions = self.copy_sessions(original_sessions)
self.pool = multiprocessing()
self.max_threads = self.pool._processes
self.pool = multiprocessing(max_threads)
self.max_threads = max_threads
self.kill = False
self.headers = headers
self.session_rules = session_rules
@@ -184,8 +184,9 @@ def test_session(proxy=None, cert=None, max_threads=None):
while not sessions:
proxies = [custom_proxy] if custom_proxy else proxies
if proxies:
sessions = pool.starmap(test_session, product(
proxies, [cert], [max_threads]))
with pool:
sessions = pool.starmap(test_session, product(
proxies, [cert], [max_threads]))
else:
session = test_session(max_threads=max_threads)
sessions.append(session)
@@ -268,7 +269,7 @@ def multi(item):
continue
items = assign_session(links, session_manager.sessions)
# item_groups = grouper(300,items)
pool = multiprocessing()
pool = session_manager.pool
results = pool.starmap(multi, product(
items))
not_faulty = [x for x in results if x]
35 changes: 21 additions & 14 deletions apis/onlyfans/onlyfans.py
@@ -254,7 +254,7 @@ def __init__(self, option={}) -> None:
self.link = option.get("link")
self.links = content_types()
self.scraped = content_types()
self.auth_id: Optional[int] = None
self.authed: create_auth = option.get("authed")
self.auth_count = None
self.session_manager: api_helper.session_manager = option.get(
"session_manager")
@@ -474,30 +474,34 @@ def set_scraped(self, name, scraped: media_types):


class start():
def __init__(self, custom_request=callable) -> None:
def __init__(self, custom_request=callable, max_threads=-1) -> None:
self.auths: list[create_auth] = []
self.subscriptions: list[create_subscription] = []
self.custom_request = custom_request
self.max_threads = -1
self.max_threads = max_threads
self.lists = None
self.links = links
self.session_manager = api_helper.session_manager
self.pool = api_helper.multiprocessing()
self.session_manager = api_helper.session_manager(
session_rules=session_rules, session_retry_rules=session_retry_rules, max_threads=max_threads)
self.settings = {}

def set_auth_details(self, option={}, only_active=False):
if only_active and not option.get("active"):
return
auth = create_auth()
auth = create_auth(session_manager2=self.session_manager,
pool=self.pool)
auth.auth_details = auth_details(option)
self.auths.append(auth)
return auth

def set_auth(self, me):
self.auth = me
def close_pools(self):
self.pool.close()
self.session_manager.pool.close()


class create_auth():
def __init__(self, option={}, api: Optional[start] = None, init=False) -> None:
def __init__(self, session_manager2: api_helper.session_manager, option={}, init=False, pool=None, ) -> None:
self.id = option.get("id")
self.username = option.get("username")
if not self.username:
@@ -513,16 +517,15 @@ def __init__(self, option={}, api: Optional[start] = None, init=False) -> None:
self.archived_stories = {}
self.mass_messages = []
self.paid_content = {}
self.session_manager = api_helper.session_manager(
session_rules=session_rules, session_retry_rules=session_retry_rules)
session_manager2 = copy.copy(session_manager2)
self.session_manager = session_manager2
self.pool = pool
self.auth_details: Optional[auth_details] = None
self.profile_directory = option.get("profile_directory", "")
self.guest = False
self.active = False
self.errors: list[error_details] = []
self.extras = {}
if api:
api.auths.append(self)
valid_counts = ["chatMessagesCount"]
args = [self.username, False, False]
link_info = links(*args).full
@@ -725,14 +728,18 @@ def get_subscriptions(self, resume=None, refresh=True, identifiers: list = [], e
results = []
if self.isPerformer:
temp_session_manager = self.session_manager
temp_pool = self.pool
delattr(self, "session_manager")
delattr(self, "pool")
json_authed = jsonpickle.encode(
self, unpicklable=False)
json_authed = jsonpickle.decode(json_authed)
self.session_manager = temp_session_manager
self.pool = temp_pool
json_authed = json_authed | self.get_user(self.username)

subscription = create_subscription(json_authed)
subscription.authed = self
subscription.session_manager = self.session_manager
subscription = [subscription]
results.append(subscription)
@@ -756,11 +763,11 @@ def multi(item):
subscription2 = self.get_user(subscription["username"])
subscription = subscription | subscription2
subscription = create_subscription(subscription)
subscription.auth_id = self.id
subscription.authed = self
subscription.link = f"https://onlyfans.com/{subscription.username}"
valid_subscriptions.append(subscription)
return valid_subscriptions
pool = api_helper.multiprocessing()
pool = self.pool
# offset_array= offset_array[:16]
results += pool.starmap(multi, product(
offset_array))
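
Editor's note: judging from the signature changes above (start(custom_request=callable, max_threads=-1) and the new close_pools()), the class now appears intended to be constructed once per run and torn down before the next one. A hypothetical caller might look like the sketch below; the thread cap and loop body are illustrative, and the OnlyFans alias mirrors the Optional[OnlyFans.start] annotation used in main_datascraper.py.

import apis.onlyfans.onlyfans as OnlyFans

while True:  # an "infinite run" as described in the commit message
    api = OnlyFans.start(max_threads=8)  # cap applies to API requests only
    # ... set auth details and scrape here ...
    api.close_pools()  # destroy this run's worker threads before the next run
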
6 changes: 3 additions & 3 deletions datascraper/main_datascraper.py
@@ -25,7 +25,7 @@
# return text


def start_datascraper(json_config: dict, site_name_lower: str, api: Optional[OnlyFans.start] = None, webhooks=True):
def start_datascraper(json_config: dict, site_name_lower: str, api: Optional[OnlyFans.start] = None, webhooks=True)->Optional[OnlyFans.start]:
json_settings = json_config["settings"]
json_webhooks = json_settings["webhooks"]
json_sites = json_config["supported"]
@@ -48,13 +48,13 @@ def start_datascraper(json_config: dict, site_name_lower: str, api: Optional[Onl
original_sessions = [x for x in original_sessions if x]
if not original_sessions:
print("Unable to create session")
return False
return None
archive_time = timeit.default_timer()
if site_name_lower == "onlyfans":
site_name = "OnlyFans"
module = m_onlyfans
if not api:
api = OnlyFans.start()
api = OnlyFans.start(max_threads=json_settings["max_threads"])
api = main_helper.process_profiles(
json_settings, original_sessions, site_name, api)
print
10 changes: 5 additions & 5 deletions extras/OFRenamer/start.py
@@ -10,7 +10,7 @@
import traceback


def fix_directories(posts, all_files, database_session: scoped_session, folder, site_name, parent_type, api_type, username, base_directory, json_settings):
def fix_directories(api,posts, all_files, database_session: scoped_session, folder, site_name, parent_type, api_type, username, base_directory, json_settings):
new_directories = []

def fix_directories(post: api_table, media_db: list[media_table]):
@@ -111,9 +111,9 @@ def fix_directories(post: api_table, media_db: list[media_table]):
return delete_rows
result = database_session.query(folder.media_table)
media_db = result.all()
pool = multiprocessing()
pool = api.pool
delete_rows = pool.starmap(fix_directories, product(
posts, [media_db]))
posts, [media_db]))
delete_rows = list(chain(*delete_rows))
for delete_row in delete_rows:
database_session.query(folder.media_table).filter(
@@ -123,7 +123,7 @@ def fix_directories(post: api_table, media_db: list[media_table]):
return posts, new_directories


def start(Session, parent_type, api_type, api_path, site_name, subscription, folder, json_settings):
def start(api,Session, parent_type, api_type, api_path, site_name, subscription, folder, json_settings):
api_table = folder.api_table
media_table = folder.media_table
database_session = Session()
@@ -162,7 +162,7 @@ def start(Session, parent_type, api_type, api_path, site_name, subscription, fol
all_files.extend(x)

fixed, new_directories = fix_directories(
result, all_files, database_session, folder, site_name, parent_type, api_type, username, root_directory, json_settings)
api,result, all_files, database_session, folder, site_name, parent_type, api_type, username, root_directory, json_settings)
database_session.close()
return metadata

5 changes: 3 additions & 2 deletions helpers/db_helper.py
@@ -1,7 +1,7 @@
import os
import sqlalchemy
from sqlalchemy.engine.base import Engine
from sqlalchemy.orm.session import sessionmaker
from sqlalchemy.orm.session import Session, sessionmaker
from sqlalchemy.orm import scoped_session
from alembic.config import Config
from alembic import command
@@ -17,6 +17,7 @@ def create_database_session(connection_info, connection_type="sqlite:///", autoc
kwargs["pool_size"] = pool_size
kwargs["pool_pre_ping"] = True
kwargs["max_overflow"] = -1
kwargs["isolation_level"] = "READ COMMITTED"

engine = sqlalchemy.create_engine(
f'{connection_type}{connection_info}?charset=utf8mb4', **kwargs)
@@ -72,7 +73,7 @@ def create_auth_array(item):
return auth_array


def get_or_create(session, model, defaults=None, fbkwargs={}):
def get_or_create(session: Session, model, defaults=None, fbkwargs={}):
instance = session.query(model).filter_by(**fbkwargs).one_or_none()
if instance:
return instance, True
3 changes: 0 additions & 3 deletions helpers/main_helper.py
@@ -12,7 +12,6 @@
import platform
import re
from datetime import datetime
import time
from itertools import chain, zip_longest, groupby
import psutil
import shutil
@@ -29,8 +28,6 @@
import classes.prepare_webhooks as prepare_webhooks
from mergedeep import merge, Strategy
import helpers.db_helper as db_helper
from alembic.config import Config
from alembic import command
import traceback
json_global_settings = None
min_drive_space = 0
15 changes: 6 additions & 9 deletions modules/onlyfans.py
@@ -23,8 +23,6 @@
from helpers.main_helper import choose_option, download_session, export_data, import_archive
import extras.OFLogin.start_ofl as oflogin

multiprocessing = main_helper.multiprocessing

site_name = "OnlyFans"
json_config = None
json_global_settings = None
@@ -340,7 +338,7 @@ def paid_content_scraper(api: start, identifiers=[]):
authed, new_metadata, formatted_directories, subscription, api_type, api_path, metadata_path, site_name)
parent_type = ""
new_metadata = new_metadata + old_metadata
w = process_metadata(metadata_path, formatted_directories, new_metadata,
w = process_metadata(api, metadata_path, formatted_directories, new_metadata,
site_name, parent_type, api_path, subscription, delete_metadatas)
print

@@ -554,7 +552,7 @@ def process_legacy_metadata(authed: create_auth, new_metadata_set, formatted_dir
return final_set, delete_metadatas


def process_metadata(archive_path: str, formatted_directories: dict, new_metadata_object, site_name, parent_type, api_path, subscription, delete_metadatas):
def process_metadata(api, archive_path: str, formatted_directories: dict, new_metadata_object, site_name, parent_type, api_path, subscription, delete_metadatas):
print
Session, api_type, folder = main_helper.export_sqlite(
archive_path, new_metadata_object, parent_type)
@@ -568,7 +566,7 @@ def process_metadata(archive_path: str, formatted_directories: dict, new_metadat
if json_global_settings["helpers"]["renamer"]:
print("Renaming files.")
new_metadata_object = ofrenamer.start(
Session, parent_type, api_type, api_path, site_name, subscription, folder, json_settings)
api, Session, parent_type, api_type, api_path, site_name, subscription, folder, json_settings)
if delete_legacy_metadata:
for old_metadata in delete_metadatas:
if os.path.exists(old_metadata):
@@ -627,7 +625,7 @@ def prepare_scraper(authed: create_auth, site_name, item):
media_type = api_array["media_types"]
username = api_array["username"]
master_set = []
pool = multiprocessing()
pool = authed.pool
mandatory_directories = {}
mandatory_directories["profile_directory"] = profile_directory
mandatory_directories["download_directory"] = download_directory
@@ -696,13 +694,12 @@ def prepare_scraper(authed: create_auth, site_name, item):
new_metadata = new_metadata + old_metadata
subscription.set_scraped(api_type, new_metadata)
print
w = process_metadata(metadata_path, formatted_directories, new_metadata,
w = process_metadata(authed, metadata_path, formatted_directories, new_metadata,
site_name, parent_type, api_path, subscription, delete_metadatas)
print
else:
print("No "+api_type+" Found.")
delattr(subscription.scraped, api_type)

return True


@@ -1124,7 +1121,7 @@ def __init__(self, authed: create_auth = None, subscription=None) -> None:
d_session = download_session()
d_session.start(unit='B', unit_scale=True,
miniters=1)
pool = multiprocessing()
pool = authed.session_manager.pool
pool.starmap(self.prepare_download, product(
media_set, [authed], [api_type], [subscription], [d_session]))
d_session.close()
7 changes: 5 additions & 2 deletions start_ofd.py
@@ -22,7 +22,7 @@
json_sites = json_config["supported"]
domain = json_settings["auto_site_choice"]
string, site_names = main_helper.module_chooser(domain, json_sites)

# logging.basicConfig(level=logging.DEBUG, format="%(message)s")
while True:
try:
Expand All @@ -40,7 +40,10 @@
x = int(x)
site_name = site_names[x]
site_name_lower = site_name.lower()
apis = main_datascraper.start_datascraper(json_config, site_name_lower)
api = main_datascraper.start_datascraper(
json_config, site_name_lower)
if api:
api.close_pools()
if exit_on_completion:
print("Now exiting.")
exit(0)