diff --git a/.gitignore b/.gitignore index be3cbb2b..3a797e99 100644 --- a/.gitignore +++ b/.gitignore @@ -183,3 +183,6 @@ sessions logs runbot.bat .DS_Store +/google_trends.dat +/google_trends.dir +/google_trends.bak diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 00000000..debf80d0 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,14 @@ + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/main.xml b/.idea/runConfigurations/main.xml new file mode 100644 index 00000000..179919aa --- /dev/null +++ b/.idea/runConfigurations/main.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/main_headless.xml b/.idea/runConfigurations/main_headless.xml new file mode 100644 index 00000000..3dda38e8 --- /dev/null +++ b/.idea/runConfigurations/main_headless.xml @@ -0,0 +1,24 @@ + + + + + \ No newline at end of file diff --git a/README.md b/README.md index 8621dcf0..73a33f46 100644 --- a/README.md +++ b/README.md @@ -124,3 +124,4 @@ - [ ] Complete "Read To Earn" (30 pts) - [ ] Setup flags for mobile/desktop search only +- [ ] Provide Windows Task Scheduler config \ No newline at end of file diff --git a/config.yaml b/config.yaml index 508b63e9..4f998776 100644 --- a/config.yaml +++ b/config.yaml @@ -1,4 +1,9 @@ # config.yaml apprise: + summary: always urls: - 'discord://WebhookID/WebhookToken' # Replace with your actual Apprise service URLs +attempts: + base_delay_in_seconds: 60 + max: 6 + strategy: constant diff --git a/main.py b/main.py index b9ec158f..fb16acdb 100644 --- a/main.py +++ b/main.py @@ -1,316 +1,329 @@ -import argparse -import atexit -import csv -import json -import logging -import logging.handlers as handlers -import random -import re -import sys -import time -from datetime import datetime -from pathlib import Path - -import psutil - -from src import ( - Browser, - DailySet, - Login, - MorePromotions, - PunchCards, - Searches, -) -from src.loggingColoredFormatter import ColoredFormatter -from src.utils import Utils - -POINTS_COUNTER = 0 - - -def main(): - args = argumentParser() - setupLogging() - loadedAccounts = setupAccounts() - # Register the cleanup function to be called on script exit - atexit.register(cleanupChromeProcesses) - - # Load previous day's points data - previous_points_data = load_previous_points_data() - - for currentAccount in loadedAccounts: - try: - earned_points = executeBot(currentAccount, args) - account_name = currentAccount.get("username", "") - previous_points = previous_points_data.get(account_name, 0) - - # Calculate the difference in points from the prior day - points_difference = earned_points - previous_points - - # Append the daily points and points difference to CSV and Excel - log_daily_points_to_csv(account_name, earned_points, points_difference) - - # Update the previous day's points data - previous_points_data[account_name] = earned_points - - logging.info(f"[POINTS] Data for '{account_name}' appended to the file.") - except Exception as e: - Utils.send_notification("⚠️ Error occurred, please check the log", str(e)) - logging.exception(f"{e.__class__.__name__}: {e}") - - # Save the current day's points data for the next day in the "logs" folder - save_previous_points_data(previous_points_data) - logging.info("[POINTS] Data saved for the next day.") - - -def log_daily_points_to_csv(date, earned_points, points_difference): - logs_directory = Path(__file__).resolve().parent / "logs" - csv_filename = logs_directory / "points_data.csv" - - # Create a new row with the date, daily points, and points difference - date = datetime.now().strftime("%Y-%m-%d") - new_row = { - "Date": date, - "Earned Points": earned_points, - "Points Difference": points_difference, - } - - fieldnames = ["Date", "Earned Points", "Points Difference"] - is_new_file = not csv_filename.exists() - - with open(csv_filename, mode="a", newline="") as file: - writer = csv.DictWriter(file, fieldnames=fieldnames) - - if is_new_file: - writer.writeheader() - - writer.writerow(new_row) - - -def setupLogging(): - format = "%(asctime)s [%(levelname)s] %(message)s" - terminalHandler = logging.StreamHandler(sys.stdout) - terminalHandler.setFormatter(ColoredFormatter(format)) - - logs_directory = Path(__file__).resolve().parent / "logs" - logs_directory.mkdir(parents=True, exist_ok=True) - - logging.basicConfig( - level=logging.INFO, - format=format, - handlers=[ - handlers.TimedRotatingFileHandler( - logs_directory / "activity.log", - when="midnight", - interval=1, - backupCount=2, - encoding="utf-8", - ), - terminalHandler, - ], - ) - - -def cleanupChromeProcesses(): - # Use psutil to find and terminate Chrome processes - for process in psutil.process_iter(["pid", "name"]): - if process.info["name"] == "chrome.exe": - try: - psutil.Process(process.info["pid"]).terminate() - except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): - pass - - -def argumentParser() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="MS Rewards Farmer") - parser.add_argument( - "-v", "--visible", action="store_true", help="Optional: Visible browser" - ) - parser.add_argument( - "-l", "--lang", type=str, default=None, help="Optional: Language (ex: en)" - ) - parser.add_argument( - "-g", "--geo", type=str, default=None, help="Optional: Geolocation (ex: US)" - ) - parser.add_argument( - "-p", - "--proxy", - type=str, - default=None, - help="Optional: Global Proxy (ex: http://user:pass@host:port)", - ) - parser.add_argument( - "-vn", - "--verbosenotifs", - action="store_true", - help="Optional: Send all the logs to the notification service", - ) - parser.add_argument( - "-cv", - "--chromeversion", - type=int, - default=None, - help="Optional: Set fixed Chrome version (ex. 118)", - ) - return parser.parse_args() - - -def setupAccounts() -> list: - """Sets up and validates a list of accounts loaded from 'accounts.json'.""" - - def validEmail(email: str) -> bool: - """Validate Email.""" - pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$" - return bool(re.match(pattern, email)) - - accountPath = Path(__file__).resolve().parent / "accounts.json" - if not accountPath.exists(): - accountPath.write_text( - json.dumps( - [{"username": "Your Email", "password": "Your Password"}], indent=4 - ), - encoding="utf-8", - ) - noAccountsNotice = """ - [ACCOUNT] Accounts credential file "accounts.json" not found. - [ACCOUNT] A new file has been created, please edit with your credentials and save. - """ - logging.warning(noAccountsNotice) - exit() - loadedAccounts = json.loads(accountPath.read_text(encoding="utf-8")) - for account in loadedAccounts: - if not validEmail(account["username"]): - logging.error(f"[CREDENTIALS] Wrong Email Address: '{account['username']}'") - exit() - random.shuffle(loadedAccounts) - return loadedAccounts - - -def executeBot(currentAccount, args: argparse.Namespace): - logging.info( - f"********************{currentAccount.get('username', '')}********************" - ) - - accountPointsCounter = 0 - remainingSearches = 0 - remainingSearchesM = 0 - startingPoints = 0 - - with Browser(mobile=False, account=currentAccount, args=args) as desktopBrowser: - utils = desktopBrowser.utils - accountPointsCounter = Login(desktopBrowser).login() - startingPoints = accountPointsCounter - if startingPoints == "Locked": - Utils.send_notification("🚫 Account is Locked", currentAccount["username"]) - return 0 - if startingPoints == "Verify": - Utils.send_notification("❗️ Account needs to be verified", currentAccount["username"]) - return 0 - logging.info( - f"[POINTS] You have {utils.formatNumber(accountPointsCounter)} points on your account" - ) - DailySet(desktopBrowser).completeDailySet() - PunchCards(desktopBrowser).completePunchCards() - MorePromotions(desktopBrowser).completeMorePromotions() - # VersusGame(desktopBrowser).completeVersusGame() - ( - remainingSearches, - remainingSearchesM, - ) = utils.getRemainingSearches() - - # Introduce random pauses before and after searches - pause_before_search = random.uniform( - 11.0, 15.0 - ) # Random pause between 11 to 15 seconds - time.sleep(pause_before_search) - - if remainingSearches != 0: - accountPointsCounter = Searches(desktopBrowser).bingSearches( - remainingSearches - ) - - pause_after_search = random.uniform( - 11.0, 15.0 - ) # Random pause between 11 to 15 seconds - time.sleep(pause_after_search) - - utils.goHome() - goalPoints = utils.getGoalPoints() - goalTitle = utils.getGoalTitle() - desktopBrowser.closeBrowser() - - if remainingSearchesM != 0: - desktopBrowser.closeBrowser() - with Browser(mobile=True, account=currentAccount, args=args) as mobileBrowser: - utils = mobileBrowser.utils - accountPointsCounter = Login(mobileBrowser).login() - accountPointsCounter = Searches(mobileBrowser).bingSearches( - remainingSearchesM - ) - - utils.goHome() - goalPoints = utils.getGoalPoints() - goalTitle = utils.getGoalTitle() - mobileBrowser.closeBrowser() - - logging.info( - f"[POINTS] You have earned {utils.formatNumber(accountPointsCounter - startingPoints)} points today !" - ) - logging.info( - f"[POINTS] You are now at {utils.formatNumber(accountPointsCounter)} points !" - ) - goalNotifier = "" - if goalPoints > 0: - logging.info( - f"[POINTS] You are now at {(utils.formatNumber((accountPointsCounter / goalPoints) * 100))}% of your goal ({goalTitle}) !\n" - ) - goalNotifier = f"🎯 Goal reached: {(utils.formatNumber((accountPointsCounter / goalPoints) * 100))}% ({goalTitle})" - - Utils.send_notification( - "Daily Points Update", - "\n".join( - [ - f"👤 Account: {currentAccount.get('username')}", - f"⭐️ Points earned today: {utils.formatNumber(accountPointsCounter - startingPoints)}", - f"💰 Total points: {utils.formatNumber(accountPointsCounter)}", - goalNotifier, - ] - ), - ) - - return accountPointsCounter - - -def export_points_to_csv(points_data): - logs_directory = Path(__file__).resolve().parent / "logs" - csv_filename = logs_directory / "points_data.csv" - with open(csv_filename, mode="a", newline="") as file: # Use "a" mode for append - fieldnames = ["Account", "Earned Points", "Points Difference"] - writer = csv.DictWriter(file, fieldnames=fieldnames) - - # Check if the file is empty, and if so, write the header row - if file.tell() == 0: - writer.writeheader() - - for data in points_data: - writer.writerow(data) - - -# Define a function to load the previous day's points data from a file in the "logs" folder -def load_previous_points_data(): - logs_directory = Path(__file__).resolve().parent / "logs" - try: - with open(logs_directory / "previous_points_data.json", "r") as file: - return json.load(file) - except FileNotFoundError: - return {} - - -# Define a function to save the current day's points data for the next day in the "logs" folder -def save_previous_points_data(data): - logs_directory = Path(__file__).resolve().parent / "logs" - with open(logs_directory / "previous_points_data.json", "w") as file: - json.dump(data, file, indent=4) - - -if __name__ == "__main__": - main() +import argparse +import csv +import json +import logging +import logging.config +import logging.handlers as handlers +import random +import re +import sys +import time +from datetime import datetime +from enum import Enum, auto + +from src import ( + Browser, + Login, + MorePromotions, + PunchCards, + Searches, + DailySet, + Account, +) +from src.loggingColoredFormatter import ColoredFormatter +from src.utils import Utils, RemainingSearches + + +def main(): + args = argumentParser() + Utils.args = args + setupLogging() + loadedAccounts = setupAccounts() + + # Load previous day's points data + previous_points_data = load_previous_points_data() + + for currentAccount in loadedAccounts: + try: + earned_points = executeBot(currentAccount, args) + except Exception as e: + logging.error("", exc_info=True) + Utils.sendNotification( + f"⚠️ Error executing {currentAccount.username}, please check the log", + f"{e}\n{e.__traceback__}", + ) + continue + previous_points = previous_points_data.get(currentAccount.username, 0) + + # Calculate the difference in points from the prior day + points_difference = earned_points - previous_points + + # Append the daily points and points difference to CSV and Excel + log_daily_points_to_csv(earned_points, points_difference) + + # Update the previous day's points data + previous_points_data[currentAccount.username] = earned_points + + logging.info( + f"[POINTS] Data for '{currentAccount.username}' appended to the file." + ) + + # Save the current day's points data for the next day in the "logs" folder + save_previous_points_data(previous_points_data) + logging.info("[POINTS] Data saved for the next day.") + + +def log_daily_points_to_csv(earned_points, points_difference): + logs_directory = Utils.getProjectRoot() / "logs" + csv_filename = logs_directory / "points_data.csv" + + # Create a new row with the date, daily points, and points difference + date = datetime.now().strftime("%Y-%m-%d") + new_row = { + "Date": date, + "Earned Points": earned_points, + "Points Difference": points_difference, + } + + fieldnames = ["Date", "Earned Points", "Points Difference"] + is_new_file = not csv_filename.exists() + + with open(csv_filename, mode="a", newline="") as file: + writer = csv.DictWriter(file, fieldnames=fieldnames) + + if is_new_file: + writer.writeheader() + + writer.writerow(new_row) + + +def setupLogging(): + _format = "%(asctime)s [%(levelname)s] %(message)s" + terminalHandler = logging.StreamHandler(sys.stdout) + terminalHandler.setFormatter(ColoredFormatter(_format)) + + logs_directory = Utils.getProjectRoot() / "logs" + logs_directory.mkdir(parents=True, exist_ok=True) + + # so only our code is logged if level=logging.DEBUG or finer + # if not working see https://stackoverflow.com/a/48891485/4164390 + logging.config.dictConfig( + { + "version": 1, + "disable_existing_loggers": True, + } + ) + logging.basicConfig( + level=logging.DEBUG, + format=_format, + handlers=[ + handlers.TimedRotatingFileHandler( + logs_directory / "activity.log", + when="midnight", + interval=1, + backupCount=2, + encoding="utf-8", + ), + terminalHandler, + ], + ) + + +def argumentParser() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="MS Rewards Farmer") + parser.add_argument( + "-v", "--visible", action="store_true", help="Optional: Visible browser" + ) + parser.add_argument( + "-l", "--lang", type=str, default=None, help="Optional: Language (ex: en)" + ) + parser.add_argument( + "-g", "--geo", type=str, default=None, help="Optional: Geolocation (ex: US)" + ) + parser.add_argument( + "-p", + "--proxy", + type=str, + default=None, + help="Optional: Global Proxy (ex: http://user:pass@host:port)", + ) + parser.add_argument( + "-vn", + "--verbosenotifs", + action="store_true", + help="Optional: Send all the logs to the notification service", + ) + parser.add_argument( + "-cv", + "--chromeversion", + type=int, + default=None, + help="Optional: Set fixed Chrome version (ex. 118)", + ) + parser.add_argument( + "-da", + "--disable-apprise", + action="store_true", + help="Optional: Disable Apprise, overrides config.yaml, useful when developing", + ) + return parser.parse_args() + + +def setupAccounts() -> list[Account]: + """Sets up and validates a list of accounts loaded from 'accounts.json'.""" + + def validEmail(email: str) -> bool: + """Validate Email.""" + pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$" + return bool(re.match(pattern, email)) + + accountPath = Utils.getProjectRoot() / "accounts.json" + if not accountPath.exists(): + accountPath.write_text( + json.dumps( + [{"username": "Your Email", "password": "Your Password"}], indent=4 + ), + encoding="utf-8", + ) + noAccountsNotice = """ + [ACCOUNT] Accounts credential file "accounts.json" not found. + [ACCOUNT] A new file has been created, please edit with your credentials and save. + """ + logging.warning(noAccountsNotice) + exit(1) + loadedAccounts: list[Account] = [] + for rawAccount in json.loads(accountPath.read_text(encoding="utf-8")): + account: Account = Account(**rawAccount) + if not validEmail(account.username): + logging.warning( + f"[CREDENTIALS] Invalid email: {account.username}, skipping this account" + ) + continue + loadedAccounts.append(account) + random.shuffle(loadedAccounts) + return loadedAccounts + + +class AppriseSummary(Enum): + always = auto() + on_error = auto() + never = auto() + + +def executeBot(currentAccount: Account, args: argparse.Namespace): + logging.info(f"********************{currentAccount.username}********************") + + accountPointsCounter: int + remainingSearches: RemainingSearches + startingPoints: int + + with Browser(mobile=False, account=currentAccount, args=args) as desktopBrowser: + utils = desktopBrowser.utils + startingPoints = accountPointsCounter = Login(desktopBrowser, args).login() + logging.info( + f"[POINTS] You have {utils.formatNumber(startingPoints)} points on your account" + ) + DailySet(desktopBrowser).completeDailySet() + PunchCards(desktopBrowser).completePunchCards() + MorePromotions(desktopBrowser).completeMorePromotions() + # VersusGame(desktopBrowser).completeVersusGame() + remainingSearches = utils.getRemainingSearches() + + if remainingSearches.desktop != 0: + accountPointsCounter = Searches( + desktopBrowser, remainingSearches + ).bingSearches(remainingSearches.desktop) + + goalPoints = utils.getGoalPoints() + goalTitle = utils.getGoalTitle() + + time.sleep(7.5) # give time for browser to close, probably can be more fine-tuned + + if remainingSearches.mobile != 0: + with Browser(mobile=True, account=currentAccount, args=args) as mobileBrowser: + utils = mobileBrowser.utils + Login(mobileBrowser, args).login() + accountPointsCounter = Searches( + mobileBrowser, remainingSearches + ).bingSearches(remainingSearches.mobile) + + goalPoints = utils.getGoalPoints() + goalTitle = utils.getGoalTitle() + + remainingSearches = utils.getRemainingSearches() + + logging.info( + f"[POINTS] You have earned {utils.formatNumber(accountPointsCounter - startingPoints)} points this run !" + ) + logging.info( + f"[POINTS] You are now at {utils.formatNumber(accountPointsCounter)} points !" + ) + appriseSummary = AppriseSummary[ + utils.config.get("apprise", {}).get("summary", AppriseSummary.always.name) + ] + if appriseSummary == AppriseSummary.always: + goalNotifier = "" + if goalPoints > 0: + logging.info( + f"[POINTS] You are now at {(utils.formatNumber((accountPointsCounter / goalPoints) * 100))}% of your " + f"goal ({goalTitle}) !" + ) + goalNotifier = ( + f"🎯 Goal reached: {(utils.formatNumber((accountPointsCounter / goalPoints) * 100))}%" + f" ({goalTitle})" + ) + + Utils.sendNotification( + "Daily Points Update", + "\n".join( + [ + f"👤 Account: {currentAccount.username}", + f"⭐️ Points earned today: {utils.formatNumber(accountPointsCounter - startingPoints)}", + f"💰 Total points: {utils.formatNumber(accountPointsCounter)}", + goalNotifier, + ] + ), + ) + elif appriseSummary == AppriseSummary.on_error: + if remainingSearches.getTotal() > 0: + Utils.sendNotification( + "Error: remaining searches", + f"account username: {currentAccount.username}, {remainingSearches}", + ) + elif appriseSummary == AppriseSummary.never: + pass + + return accountPointsCounter + + +def export_points_to_csv(points_data): + logs_directory = Utils.getProjectRoot() / "logs" + csv_filename = logs_directory / "points_data.csv" + with open(csv_filename, mode="a", newline="") as file: # Use "a" mode for append + fieldnames = ["Account", "Earned Points", "Points Difference"] + writer = csv.DictWriter(file, fieldnames=fieldnames) + + # Check if the file is empty, and if so, write the header row + if file.tell() == 0: + writer.writeheader() + + for data in points_data: + writer.writerow(data) + + +# Define a function to load the previous day's points data from a file in the "logs" folder +def load_previous_points_data(): + try: + with open( + Utils.getProjectRoot() / "logs" / "previous_points_data.json", "r" + ) as file: + return json.load(file) + except FileNotFoundError: + return {} + + +# Define a function to save the current day's points data for the next day in the "logs" folder +def save_previous_points_data(data): + logs_directory = Utils.getProjectRoot() / "logs" + with open(logs_directory / "previous_points_data.json", "w") as file: + json.dump(data, file, indent=4) + + +if __name__ == "__main__": + try: + main() + except Exception as e: + logging.exception("") + Utils.sendNotification( + "⚠️ Error occurred, please check the log", f"{e}\n{e.__traceback__}" + ) diff --git a/requirements.txt b/requirements.txt index 042c60a5..65b37b60 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,6 @@ selenium-wire numpy>=1.22.2 # not directly required, pinned by Snyk to avoid a vulnerability setuptools>=69.0.2 # not directly required, pinned by Snyk to avoid a vulnerability psutil -blinker==1.7.0 #prevents issues on newer versions +blinker==1.7.0 # prevents issues on newer versions apprise pyyaml diff --git a/src/__init__.py b/src/__init__.py index 095110f2..057991c6 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -1,3 +1,4 @@ +from .account import Account from .browser import Browser from .dailySet import DailySet from .login import Login diff --git a/src/account.py b/src/account.py new file mode 100644 index 00000000..35773dd6 --- /dev/null +++ b/src/account.py @@ -0,0 +1,8 @@ +from dataclasses import dataclass + + +@dataclass +class Account: + username: str + password: str + proxy: str | None = None diff --git a/src/activities.py b/src/activities.py index 47bc14e8..5aef119c 100644 --- a/src/activities.py +++ b/src/activities.py @@ -4,7 +4,6 @@ from selenium.webdriver.common.by import By from src.browser import Browser -from src.utils import Utils class Activities: @@ -18,7 +17,7 @@ def openDailySetActivity(self, cardId: int): By.XPATH, f'//*[@id="daily-sets"]/mee-card-group[1]/div/mee-card[{cardId}]/div/card-content/mee-rewards-daily-set-item-content/div/a', ).click() - self.browser.utils.switchToNewTab(8) + self.browser.utils.switchToNewTab(timeToWait=8) def openMorePromotionsActivity(self, cardId: int): # Open the More Promotions activity for the given cardId @@ -26,29 +25,28 @@ def openMorePromotionsActivity(self, cardId: int): By.XPATH, f'//*[@id="more-activities"]/div/mee-card[{cardId}]/div/card-content/mee-rewards-more-activities-card-item/div/a', ).click() - self.browser.utils.switchToNewTab(8) + self.browser.utils.switchToNewTab(timeToWait=8) def completeSearch(self): # Simulate completing a search activity - time.sleep(Utils.randomSeconds(10, 15)) + time.sleep(random.randint(10, 15)) self.browser.utils.closeCurrentTab() def completeSurvey(self): # Simulate completing a survey activity + # noinspection SpellCheckingInspection self.webdriver.find_element(By.ID, f"btoption{random.randint(0, 1)}").click() - time.sleep(Utils.randomSeconds(10, 15)) + time.sleep(random.randint(10, 15)) self.browser.utils.closeCurrentTab() def completeQuiz(self): # Simulate completing a quiz activity - if not self.browser.utils.waitUntilQuizLoads(): - self.browser.utils.resetTabs() - return - self.webdriver.find_element(By.XPATH, '//*[@id="rqStartQuiz"]').click() + startQuiz = self.browser.utils.waitUntilQuizLoads() + startQuiz.click() self.browser.utils.waitUntilVisible( By.XPATH, '//*[@id="currentQuestionContainer"]/div/div[1]', 5 ) - time.sleep(Utils.randomSeconds(10, 15)) + time.sleep(random.randint(10, 15)) numberOfQuestions = self.webdriver.execute_script( "return _w.rewardsQuizRenderInfo.maxQuestions" ) @@ -66,10 +64,8 @@ def completeQuiz(self): answers.append(f"rqAnswerOption{i}") for answer in answers: self.webdriver.find_element(By.ID, answer).click() - time.sleep(Utils.randomSeconds(10, 15)) - if not self.browser.utils.waitUntilQuestionRefresh(): - self.browser.utils.resetTabs() - return + time.sleep(random.randint(10, 15)) + self.browser.utils.waitUntilQuestionRefresh() elif numberOfOptions in [2, 3, 4]: correctOption = self.webdriver.execute_script( "return _w.rewardsQuizRenderInfo.correctAnswer" @@ -82,14 +78,13 @@ def completeQuiz(self): == correctOption ): self.webdriver.find_element(By.ID, f"rqAnswerOption{i}").click() - time.sleep(Utils.randomSeconds(10, 15)) - if not self.browser.utils.waitUntilQuestionRefresh(): - self.browser.utils.resetTabs() - return + time.sleep(random.randint(10, 15)) + + self.browser.utils.waitUntilQuestionRefresh() break if question + 1 != numberOfQuestions: - time.sleep(Utils.randomSeconds(10, 15)) - time.sleep(Utils.randomSeconds(10, 15)) + time.sleep(random.randint(10, 15)) + time.sleep(random.randint(10, 15)) self.browser.utils.closeCurrentTab() def completeABC(self): @@ -102,22 +97,20 @@ def completeABC(self): self.webdriver.find_element( By.ID, f"questionOptionChoice{question}{random.randint(0, 2)}" ).click() - time.sleep(Utils.randomSeconds(10, 15)) + time.sleep(random.randint(10, 15)) self.webdriver.find_element(By.ID, f"nextQuestionbtn{question}").click() - time.sleep(Utils.randomSeconds(10, 15)) - time.sleep(Utils.randomSeconds(1, 7)) + time.sleep(random.randint(10, 15)) + time.sleep(random.randint(1, 7)) self.browser.utils.closeCurrentTab() def completeThisOrThat(self): # Simulate completing a This or That activity - if not self.browser.utils.waitUntilQuizLoads(): - self.browser.utils.resetTabs() - return - self.webdriver.find_element(By.XPATH, '//*[@id="rqStartQuiz"]').click() + startQuiz = self.browser.utils.waitUntilQuizLoads() + startQuiz.click() self.browser.utils.waitUntilVisible( By.XPATH, '//*[@id="currentQuestionContainer"]/div/div[1]', 10 ) - time.sleep(Utils.randomSeconds(10, 15)) + time.sleep(random.randint(10, 15)) for _ in range(10): correctAnswerCode = self.webdriver.execute_script( "return _w.rewardsQuizRenderInfo.correctAnswer" @@ -126,12 +119,12 @@ def completeThisOrThat(self): answer2, answer2Code = self.getAnswerAndCode("rqAnswerOption1") if answer1Code == correctAnswerCode: answer1.click() - time.sleep(Utils.randomSeconds(10, 15)) + time.sleep(random.randint(10, 15)) elif answer2Code == correctAnswerCode: answer2.click() - time.sleep(Utils.randomSeconds(10, 15)) + time.sleep(random.randint(10, 15)) - time.sleep(Utils.randomSeconds(10, 15)) + time.sleep(random.randint(10, 15)) self.browser.utils.closeCurrentTab() def getAnswerAndCode(self, answerId: str) -> tuple: @@ -145,4 +138,5 @@ def getAnswerAndCode(self, answerId: str) -> tuple: self.browser.utils.getAnswerCode(answerEncodeKey, answerTitle), ) else: - return (answer, None) + # todo - throw exception? + return answer, None diff --git a/src/browser.py b/src/browser.py index fa838c1e..0bf2e9dc 100644 --- a/src/browser.py +++ b/src/browser.py @@ -1,14 +1,18 @@ -import contextlib +import argparse import logging import random from pathlib import Path -from typing import Any +from types import TracebackType +from typing import Any, Type import ipapi import seleniumwire.undetected_chromedriver as webdriver +import undetected_chromedriver +from ipapi.exceptions import RateLimited from selenium.webdriver import ChromeOptions from selenium.webdriver.chrome.webdriver import WebDriver +from src import Account from src.userAgentGenerator import GenerateUserAgent from src.utils import Utils @@ -16,19 +20,24 @@ class Browser: """WebDriver wrapper class.""" - def __init__(self, mobile: bool, account, args: Any) -> None: + webdriver: undetected_chromedriver.Chrome + + def __init__( + self, mobile: bool, account: Account, args: argparse.Namespace + ) -> None: # Initialize browser instance + logging.debug("in __init__") self.mobile = mobile self.browserType = "mobile" if mobile else "desktop" self.headless = not args.visible - self.username = account["username"] - self.password = account["password"] + self.username = account.username + self.password = account.password self.localeLang, self.localeGeo = self.getCCodeLang(args.lang, args.geo) self.proxy = None if args.proxy: self.proxy = args.proxy - elif account.get("proxy"): - self.proxy = account["proxy"] + elif account.proxy: + self.proxy = account.proxy self.userDataDir = self.setupProfiles() self.browserConfig = Utils.getBrowserConfig(self.userDataDir) ( @@ -41,25 +50,31 @@ def __init__(self, mobile: bool, account, args: Any) -> None: Utils.saveBrowserConfig(self.userDataDir, self.browserConfig) self.webdriver = self.browserSetup() self.utils = Utils(self.webdriver) + logging.debug("out __init__") def __enter__(self) -> "Browser": + logging.debug("in __enter__") return self - def __exit__(self, *args: Any) -> None: + def __exit__( + self, + exc_type: Type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: # Cleanup actions when exiting the browser context - self.closeBrowser() - - def closeBrowser(self) -> None: - """Perform actions to close the browser cleanly.""" - # Close the web browser - with contextlib.suppress(Exception): - self.webdriver.close() - + logging.debug( + f"in __exit__ exc_type={exc_type} exc_value={exc_value} traceback={traceback}" + ) + # turns out close is needed for undetected_chromedriver + self.webdriver.close() + self.webdriver.quit() + def browserSetup( self, - ) -> WebDriver: + ) -> undetected_chromedriver.Chrome: # Configure and setup the Chrome browser - options = webdriver.ChromeOptions() + options = undetected_chromedriver.ChromeOptions() options.headless = self.headless options.add_argument(f"--lang={self.localeLang}") options.add_argument("--log-level=3") @@ -73,7 +88,7 @@ def browserSetup( options.add_argument("--disable-gpu") options.add_argument("--disable-default-apps") options.add_argument("--disable-features=Translate") - options.add_argument('--disable-features=PrivacySandboxSettings4') + options.add_argument("--disable-features=PrivacySandboxSettings4") seleniumwireOptions: dict[str, Any] = {"verify_ssl": False} @@ -173,9 +188,7 @@ def setupProfiles(self) -> Path: Returns: Path """ - currentPath = Path(__file__) - parent = currentPath.parent.parent - sessionsDir = parent / "sessions" + sessionsDir = Utils.getProjectRoot() / "sessions" # Concatenate username and browser type for a plain text session ID sessionid = f"{self.username}" @@ -184,20 +197,23 @@ def setupProfiles(self) -> Path: sessionsDir.mkdir(parents=True, exist_ok=True) return sessionsDir - def getCCodeLang(self, lang: str, geo: str) -> tuple: + @staticmethod + def getCCodeLang(lang: str, geo: str) -> tuple: if lang is None or geo is None: try: nfo = ipapi.location() - if isinstance(nfo, dict): - if lang is None: - lang = nfo["languages"].split(",")[0].split("-")[0] - if geo is None: - geo = nfo["country"] - except Exception: # pylint: disable=broad-except - return ("en", "US") - return (lang, geo) - - def getChromeVersion(self) -> str: + except RateLimited: + logging.warning("Returning default", exc_info=True) + return "en", "US" + if isinstance(nfo, dict): + if lang is None: + lang = nfo["languages"].split(",")[0].split("-")[0] + if geo is None: + geo = nfo["country"] + return lang, geo + + @staticmethod + def getChromeVersion() -> str: chrome_options = ChromeOptions() chrome_options.add_argument("--headless=new") diff --git a/src/constants.py b/src/constants.py index fc4051a6..ae8a2a43 100644 --- a/src/constants.py +++ b/src/constants.py @@ -1,2 +1,3 @@ -BASE_URL = "https://rewards.bing.com" +REWARDS_URL = "https://rewards.bing.com/" +SEARCH_URL = "https://bing.com/" VERSION = 3 diff --git a/src/dailySet.py b/src/dailySet.py index 6ec6fd37..9eef8c9e 100644 --- a/src/dailySet.py +++ b/src/dailySet.py @@ -3,7 +3,6 @@ from datetime import datetime from src.browser import Browser - from .activities import Activities @@ -16,75 +15,75 @@ def __init__(self, browser: Browser): def completeDailySet(self): # Function to complete the Daily Set logging.info("[DAILY SET] " + "Trying to complete the Daily Set...") - self.browser.utils.goHome() data = self.browser.utils.getDashboardData()["dailySetPromotions"] todayDate = datetime.now().strftime("%m/%d/%Y") for activity in data.get(todayDate, []): + cardId = int(activity["offerId"][-1:]) try: - if activity["complete"] is False: - cardId = int(activity["offerId"][-1:]) - # Open the Daily Set activity - self.activities.openDailySetActivity(cardId) - if activity["promotionType"] == "urlreward": - logging.info(f"[DAILY SET] Completing search of card {cardId}") - # Complete search for URL reward - self.activities.completeSearch() - if activity["promotionType"] == "quiz": - if ( - activity["pointProgressMax"] == 50 - and activity["pointProgress"] == 0 - ): + # Open the Daily Set activity + if activity["complete"] is not False: + continue + self.activities.openDailySetActivity(cardId) + if activity["promotionType"] == "urlreward": + logging.info(f"[DAILY SET] Completing search of card {cardId}") + # Complete search for URL reward + self.activities.completeSearch() + if activity["promotionType"] == "quiz": + if ( + activity["pointProgressMax"] == 50 + and activity["pointProgress"] == 0 + ): + logging.info( + "[DAILY SET] " + f"Completing This or That of card {cardId}" + ) + # Complete This or That for a specific point progress max + self.activities.completeThisOrThat() + elif ( + activity["pointProgressMax"] in [40, 30] + and activity["pointProgress"] == 0 + ): + logging.info(f"[DAILY SET] Completing quiz of card {cardId}") + # Complete quiz for specific point progress max + self.activities.completeQuiz() + elif ( + activity["pointProgressMax"] == 10 + and activity["pointProgress"] == 0 + ): + # Extract and parse search URL for additional checks + searchUrl = urllib.parse.unquote( + urllib.parse.parse_qs( + urllib.parse.urlparse(activity["destinationUrl"]).query + )["ru"][0] + ) + searchUrlQueries = urllib.parse.parse_qs( + urllib.parse.urlparse(searchUrl).query + ) + filters = {} + for filterEl in searchUrlQueries["filters"][0].split(" "): + filterEl = filterEl.split(":", 1) + filters[filterEl[0]] = filterEl[1] + if "PollScenarioId" in filters: logging.info( - "[DAILY SET] " - + f"Completing This or That of card {cardId}" + f"[DAILY SET] Completing poll of card {cardId}" ) - # Complete This or That for a specific point progress max - self.activities.completeThisOrThat() - elif ( - activity["pointProgressMax"] in [40, 30] - and activity["pointProgress"] == 0 - ): + # Complete survey for a specific scenario + self.activities.completeSurvey() + else: logging.info( f"[DAILY SET] Completing quiz of card {cardId}" ) - # Complete quiz for specific point progress max - self.activities.completeQuiz() - elif ( - activity["pointProgressMax"] == 10 - and activity["pointProgress"] == 0 - ): - # Extract and parse search URL for additional checks - searchUrl = urllib.parse.unquote( - urllib.parse.parse_qs( - urllib.parse.urlparse( - activity["destinationUrl"] - ).query - )["ru"][0] - ) - searchUrlQueries = urllib.parse.parse_qs( - urllib.parse.urlparse(searchUrl).query - ) - filters = {} - for filterEl in searchUrlQueries["filters"][0].split(" "): - filterEl = filterEl.split(":", 1) - filters[filterEl[0]] = filterEl[1] - if "PollScenarioId" in filters: - logging.info( - f"[DAILY SET] Completing poll of card {cardId}" - ) - # Complete survey for a specific scenario - self.activities.completeSurvey() - else: - logging.info( - f"[DAILY SET] Completing quiz of card {cardId}" - ) - try: - # Try completing ABC activity - self.activities.completeABC() - except Exception: # pylint: disable=broad-except - # Default to completing quiz - self.activities.completeQuiz() + try: + # Try completing ABC activity + self.activities.completeABC() + except Exception: # pylint: disable=broad-except + logging.warning("", exc_info=True) + # Default to completing quiz + self.activities.completeQuiz() except Exception: # pylint: disable=broad-except + logging.error( + f"[DAILY SET] Error Daily Set of card {cardId}", exc_info=True + ) # Reset tabs in case of an exception self.browser.utils.resetTabs() - logging.info("[DAILY SET] Completed the Daily Set successfully !") + continue + logging.info("[DAILY SET] Exiting") diff --git a/src/login.py b/src/login.py index 4dde1d6e..06875c53 100644 --- a/src/login.py +++ b/src/login.py @@ -1,130 +1,83 @@ -import contextlib -import logging -import time -import urllib.parse - -from selenium.webdriver.common.by import By - -from src.browser import Browser - - -class Login: - def __init__(self, browser: Browser): - self.browser = browser - self.webdriver = browser.webdriver - self.utils = browser.utils - - def login(self): - logging.info("[LOGIN] " + "Logging-in...") - self.webdriver.get( - "https://rewards.bing.com/Signin/" - ) # changed site to allow bypassing when M$ blocks access to login.live.com randomly - alreadyLoggedIn = False - while True: - try: - self.utils.waitUntilVisible( - By.CSS_SELECTOR, 'html[data-role-name="RewardsPortal"]', 0.1 - ) - alreadyLoggedIn = True - break - except Exception: # pylint: disable=broad-except - try: - self.utils.waitUntilVisible(By.ID, "i0116", 10) - break - except Exception: # pylint: disable=broad-except - if self.utils.tryDismissAllMessages(): - continue - - if not alreadyLoggedIn: - if isLocked := self.executeLogin(): - return "Locked" - self.utils.tryDismissCookieBanner() - - logging.info("[LOGIN] " + "Logged-in !") - - self.utils.goHome() - points = self.utils.getAccountPoints() - - logging.info("[LOGIN] " + "Ensuring you are logged into Bing...") - self.checkBingLogin() - logging.info("[LOGIN] Logged-in successfully !") - return points - - def executeLogin(self): - self.utils.waitUntilVisible(By.ID, "i0116", 10) - logging.info("[LOGIN] " + "Entering email...") - self.utils.waitUntilClickable(By.NAME, "loginfmt", 10) - email_field = self.webdriver.find_element(By.NAME, "loginfmt") - - while True: - email_field.send_keys(self.browser.username) - time.sleep(3) - if email_field.get_attribute("value") == self.browser.username: - self.webdriver.find_element(By.ID, "idSIButton9").click() - break - - email_field.clear() - time.sleep(3) - - try: - self.enterPassword(self.browser.password) - except Exception: # pylint: disable=broad-except - logging.error("[LOGIN] " + "2FA Code required !") - with contextlib.suppress(Exception): - code = self.webdriver.find_element( - By.ID, "idRemoteNGC_DisplaySign" - ).get_attribute("innerHTML") - logging.error(f"[LOGIN] 2FA code: {code}") - logging.info("[LOGIN] Press enter when confirmed on your device...") - input() - - while not ( - urllib.parse.urlparse(self.webdriver.current_url).path == "/" - and urllib.parse.urlparse(self.webdriver.current_url).hostname - == "account.microsoft.com" - ): - if urllib.parse.urlparse(self.webdriver.current_url).hostname == "rewards.bing.com": - self.webdriver.get("https://account.microsoft.com") - - if "Abuse" in str(self.webdriver.current_url): - logging.error(f"[LOGIN] {self.browser.username} is locked") - return True - self.utils.tryDismissAllMessages() - time.sleep(1) - - self.utils.waitUntilVisible( - By.CSS_SELECTOR, 'html[data-role-name="MeePortal"]', 10 - ) - - def enterPassword(self, password): - self.utils.waitUntilClickable(By.NAME, "passwd", 10) - self.utils.waitUntilClickable(By.ID, "idSIButton9", 10) - - logging.info("[LOGIN] " + "Writing password...") - - password_field = self.webdriver.find_element(By.NAME, "passwd") - - while True: - password_field.send_keys(password) - time.sleep(3) - if password_field.get_attribute("value") == password: - self.webdriver.find_element(By.ID, "idSIButton9").click() - break - - password_field.clear() - time.sleep(3) - time.sleep(3) - - def checkBingLogin(self): - self.webdriver.get( - "https://www.bing.com/fd/auth/signin?action=interactive&provider=windows_live_id&return_url=https%3A%2F%2Fwww.bing.com%2F" - ) - while True: - currentUrl = urllib.parse.urlparse(self.webdriver.current_url) - if currentUrl.hostname == "www.bing.com" and currentUrl.path == "/": - time.sleep(3) - self.utils.tryDismissBingCookieBanner() - with contextlib.suppress(Exception): - if self.utils.checkBingLogin(): - return - time.sleep(1) +import argparse +import contextlib +import logging +import time +from argparse import Namespace + +from selenium.common import NoSuchElementException, TimeoutException +from selenium.webdriver.common.by import By +from undetected_chromedriver import Chrome + +from src.browser import Browser + + +class Login: + browser: Browser + args: Namespace + webdriver: Chrome + + def __init__(self, browser: Browser, args: argparse.Namespace): + self.browser = browser + self.webdriver = browser.webdriver + self.utils = browser.utils + self.args = args + + def login(self) -> int: + if self.utils.isLoggedIn(): + logging.info("[LOGIN] Already logged-in") + else: + logging.info("[LOGIN] Logging-in...") + self.executeLogin() + logging.info("[LOGIN] Logged-in successfully !") + + assert self.utils.isLoggedIn() + + return self.utils.getAccountPoints() + + def executeLogin(self) -> None: + self.utils.waitUntilVisible(By.ID, "i0116", 10) + + emailField = self.utils.waitUntilClickable(By.NAME, "loginfmt", 10) + logging.info("[LOGIN] Entering email...") + emailField.send_keys(self.browser.username) + time.sleep(3) + assert emailField.get_attribute("value") == self.browser.username + self.webdriver.find_element(By.ID, "idSIButton9").click() + + isTwoFactorEnabled = False + try: + self.utils.waitUntilVisible(By.ID, "pushNotificationsTitle", 10) + isTwoFactorEnabled = True + except NoSuchElementException: + logging.info("2FA not enabled") + + if isTwoFactorEnabled: + # todo - Handle 2FA when running headless + assert ( + self.args.visible + ), "2FA detected, run in visible mode to handle login" + while True: + print( + "2FA detected, handle prompts and press enter when on rewards portal to continue" + ) + input() + with contextlib.suppress(TimeoutException): + self.utils.waitUntilVisible( + By.CSS_SELECTOR, 'html[data-role-name="RewardsPortal"]', 10 + ) + break + print("Rewards portal not accessible, waiting until next attempt") + else: + passwordField = self.utils.waitUntilClickable(By.NAME, "passwd", 10) + enterPasswordButton = self.utils.waitUntilClickable( + By.ID, "idSIButton9", 10 + ) + logging.info("[LOGIN] Entering password...") + passwordField.send_keys(self.browser.password) + time.sleep(3) + assert passwordField.get_attribute("value") == self.browser.password + enterPasswordButton.click() + + self.utils.waitUntilVisible( + By.CSS_SELECTOR, 'html[data-role-name="RewardsPortal"]', 10 + ) diff --git a/src/morePromotions.py b/src/morePromotions.py index c676ae08..16e9463d 100644 --- a/src/morePromotions.py +++ b/src/morePromotions.py @@ -1,10 +1,10 @@ import logging from src.browser import Browser - from .activities import Activities +# todo Rename MoreActivities? class MorePromotions: def __init__(self, browser: Browser): self.browser = browser @@ -13,36 +13,52 @@ def __init__(self, browser: Browser): def completeMorePromotions(self): # Function to complete More Promotions logging.info("[MORE PROMOS] " + "Trying to complete More Promotions...") - self.browser.utils.goHome() - morePromotions = self.browser.utils.getDashboardData()["morePromotions"] - i = 0 + morePromotions: list[dict] = self.browser.utils.getDashboardData()[ + "morePromotions" + ] for promotion in morePromotions: try: - i += 1 + promotionTitle = promotion["title"] + logging.debug(f"promotionTitle={promotionTitle}") + # Open the activity for the promotion if ( - promotion["complete"] is False - and promotion["pointProgressMax"] != 0 + promotion["complete"] is not False + or promotion["pointProgressMax"] == 0 + ): + logging.debug("Already done, continuing") + # todo Handle special "Quote of the day" which is falsely complete + continue + self.activities.openMorePromotionsActivity( + morePromotions.index(promotion) + ) + if promotion["promotionType"] == "urlreward": + if promotion["title"] == "Search the lyrics of a song": + self.browser.webdriver.get( + "https://www.bing.com/search?q=black+sabbath+supernaut+lyrics" + ) + elif promotion["title"] == "Translate anything": + self.browser.webdriver.get( + "https://www.bing.com/search?q=translate+pencil+sharpener+to+spanish" + ) + # Complete search for URL reward + self.activities.completeSearch() + elif ( + promotion["promotionType"] == "quiz" + and promotion["pointProgress"] == 0 ): - # Open the activity for the promotion - self.activities.openMorePromotionsActivity(i) - if promotion["promotionType"] == "urlreward": - # Complete search for URL reward - self.activities.completeSearch() - elif ( - promotion["promotionType"] == "quiz" - and promotion["pointProgress"] == 0 - ): - # Complete different types of quizzes based on point progress max - if promotion["pointProgressMax"] == 10: - self.activities.completeABC() - elif promotion["pointProgressMax"] in [30, 40]: - self.activities.completeQuiz() - elif promotion["pointProgressMax"] == 50: - self.activities.completeThisOrThat() - else: - # Default to completing search - self.activities.completeSearch() + # Complete different types of quizzes based on point progress max + if promotion["pointProgressMax"] == 10: + self.activities.completeABC() + elif promotion["pointProgressMax"] in [30, 40]: + self.activities.completeQuiz() + elif promotion["pointProgressMax"] == 50: + self.activities.completeThisOrThat() + else: + # Default to completing search + self.activities.completeSearch() except Exception: # pylint: disable=broad-except + logging.error("[MORE PROMOS] Error More Promotions", exc_info=True) # Reset tabs in case of an exception self.browser.utils.resetTabs() - logging.info("[MORE PROMOS] Completed More Promotions successfully !") + continue + logging.info("[MORE PROMOS] Exiting") diff --git a/src/punchCards.py b/src/punchCards.py index 24e192b9..1657b477 100644 --- a/src/punchCards.py +++ b/src/punchCards.py @@ -7,8 +7,7 @@ from selenium.webdriver.common.by import By from src.browser import Browser - -from .constants import BASE_URL +from .constants import REWARDS_URL class PunchCards: @@ -73,18 +72,17 @@ def completePunchCards(self): punchCard["childPromotions"], ) except Exception: # pylint: disable=broad-except + logging.error("[PUNCH CARDS] Error Punch Cards", exc_info=True) self.browser.utils.resetTabs() - logging.info("[PUNCH CARDS] Completed the Punch Cards successfully !") - time.sleep(random.randint(100, 700) / 100) - self.webdriver.get(BASE_URL) - time.sleep(random.randint(100, 700) / 100) + continue + logging.info("[PUNCH CARDS] Exiting") def completePromotionalItems(self): # Function to complete promotional items with contextlib.suppress(Exception): item = self.browser.utils.getDashboardData()["promotionalItem"] destUrl = urllib.parse.urlparse(item["destinationUrl"]) - baseUrl = urllib.parse.urlparse(BASE_URL) + baseUrl = urllib.parse.urlparse(REWARDS_URL) if ( (item["pointProgressMax"] in [100, 200, 500]) and not item["complete"] diff --git a/src/searches.py b/src/searches.py index 2ed4510e..00b3df45 100644 --- a/src/searches.py +++ b/src/searches.py @@ -1,131 +1,179 @@ -import json -import logging -import random -import time -from datetime import date, timedelta - -import requests -from selenium.common.exceptions import TimeoutException -from selenium.webdriver.common.by import By -from selenium.webdriver.common.keys import Keys - -from src.browser import Browser -from src.utils import Utils - - -class Searches: - def __init__(self, browser: Browser): - self.browser = browser - self.webdriver = browser.webdriver - - def getGoogleTrends(self, wordsCount: int) -> list: - # Function to retrieve Google Trends search terms - searchTerms: list[str] = [] - i = 0 - while len(searchTerms) < wordsCount: - i += 1 - # Fetching daily trends from Google Trends API - r = requests.get( - f'https://trends.google.com/trends/api/dailytrends?hl={self.browser.localeLang}&ed={(date.today() - timedelta(days=i)).strftime("%Y%m%d")}&geo={self.browser.localeGeo}&ns=15' - ) - trends = json.loads(r.text[6:]) - for topic in trends["default"]["trendingSearchesDays"][0][ - "trendingSearches" - ]: - searchTerms.append(topic["title"]["query"].lower()) - searchTerms.extend( - relatedTopic["query"].lower() - for relatedTopic in topic["relatedQueries"] - ) - searchTerms = list(set(searchTerms)) - del searchTerms[wordsCount : (len(searchTerms) + 1)] - return searchTerms - - def getRelatedTerms(self, word: str) -> list: - # Function to retrieve related terms from Bing API - try: - r = requests.get( - f"https://api.bing.com/osjson.aspx?query={word}", - headers={"User-agent": self.browser.userAgent}, - ) - return r.json()[1] - except Exception: # pylint: disable=broad-except - return [] - - def bingSearches(self, numberOfSearches: int, pointsCounter: int = 0): - # Function to perform Bing searches - logging.info( - f"[BING] Starting {self.browser.browserType.capitalize()} Edge Bing searches..." - ) - - search_terms = self.getGoogleTrends(numberOfSearches) - self.webdriver.get("https://bing.com") - - i = 0 - attempt = 0 - for word in search_terms: - i += 1 - logging.info(f"[BING] {i}/{numberOfSearches}") - points = self.bingSearch(word) - if points <= pointsCounter: - relatedTerms = self.getRelatedTerms(word)[:2] - for term in relatedTerms: - points = self.bingSearch(term) - if not points <= pointsCounter: - break - if points > 0: - pointsCounter = points - else: - break - - if points <= pointsCounter: - attempt += 1 - if attempt == 2: - logging.warning( - "[BING] Possible blockage. Refreshing the page." - ) - self.webdriver.refresh() - attempt = 0 - logging.info( - f"[BING] Finished {self.browser.browserType.capitalize()} Edge Bing searches !" - ) - return pointsCounter - - def bingSearch(self, word: str): - # Function to perform a single Bing search - i = 0 - - while True: - try: - self.browser.utils.waitUntilClickable(By.ID, "sb_form_q") - searchbar = self.webdriver.find_element(By.ID, "sb_form_q") - searchbar.clear() - searchbar.send_keys(word) - searchbar.submit() - time.sleep(Utils.randomSeconds(100, 180)) - - # Scroll down after the search (adjust the number of scrolls as needed) - for _ in range(3): # Scroll down 3 times - self.webdriver.execute_script( - "window.scrollTo(0, document.body.scrollHeight);" - ) - time.sleep( - Utils.randomSeconds(7, 10) - ) # Random wait between scrolls - - return self.browser.utils.getBingAccountPoints() - except TimeoutException: - if i == 5: - logging.info("[BING] " + "TIMED OUT GETTING NEW PROXY") - self.webdriver.proxy = self.browser.giveMeProxy() - elif i == 10: - logging.error( - "[BING] " - + "Cancelling mobile searches due to too many retries." - ) - return self.browser.utils.getBingAccountPoints() - self.browser.utils.tryDismissAllMessages() - logging.error("[BING] " + "Timeout, retrying in 5~ seconds...") - time.sleep(Utils.randomSeconds(7, 15)) - i += 1 - continue +import json +import logging +import random +import shelve +import time +from datetime import date, timedelta +from enum import Enum, auto +from itertools import cycle +from typing import Final + +import requests +from selenium.webdriver.common.by import By + +from src.browser import Browser +from src.utils import Utils, RemainingSearches + +LOAD_DATE_KEY = "loadDate" + + +class AttemptsStrategy(Enum): + exponential = auto() + constant = auto() + + +class Searches: + config = Utils.loadConfig() + maxAttempts: Final[int] = config.get("attempts", {}).get("max", 6) + baseDelay: Final[float] = config.get("attempts", {}).get( + "base_delay_in_seconds", 60 + ) + # attemptsStrategy = Final[ # todo Figure why doesn't work with equality below + attemptsStrategy = AttemptsStrategy[ + config.get("attempts", {}).get("strategy", AttemptsStrategy.constant.name) + ] + + def __init__(self, browser: Browser, searches: RemainingSearches): + self.browser = browser + self.webdriver = browser.webdriver + + self.googleTrendsShelf: shelve.Shelf = shelve.open("google_trends") + logging.debug(f"google_trends = {list(self.googleTrendsShelf.items())}") + loadDate: date | None = None + if LOAD_DATE_KEY in self.googleTrendsShelf: + loadDate = self.googleTrendsShelf[LOAD_DATE_KEY] + + if loadDate is None or loadDate < date.today(): + self.googleTrendsShelf.clear() + self.googleTrendsShelf[LOAD_DATE_KEY] = date.today() + trends = self.getGoogleTrends(searches.getTotal()) + random.shuffle(trends) + for trend in trends: + self.googleTrendsShelf[trend] = None + logging.debug( + f"google_trends after load = {list(self.googleTrendsShelf.items())}" + ) + + def getGoogleTrends(self, wordsCount: int) -> list[str]: + # Function to retrieve Google Trends search terms + searchTerms: list[str] = [] + i = 0 + while len(searchTerms) < wordsCount: + i += 1 + # Fetching daily trends from Google Trends API + r = requests.get( + f"https://trends.google.com/trends/api/dailytrends?hl={self.browser.localeLang}" + f'&ed={(date.today() - timedelta(days=i)).strftime("%Y%m%d")}&geo={self.browser.localeGeo}&ns=15' + ) + trends = json.loads(r.text[6:]) + for topic in trends["default"]["trendingSearchesDays"][0][ + "trendingSearches" + ]: + searchTerms.append(topic["title"]["query"].lower()) + searchTerms.extend( + relatedTopic["query"].lower() + for relatedTopic in topic["relatedQueries"] + ) + searchTerms = list(set(searchTerms)) + del searchTerms[wordsCount : (len(searchTerms) + 1)] + return searchTerms + + def getRelatedTerms(self, term: str) -> list[str]: + # Function to retrieve related terms from Bing API + relatedTerms: list[str] = requests.get( + f"https://api.bing.com/osjson.aspx?query={term}", + headers={"User-agent": self.browser.userAgent}, + ).json()[1] + if not relatedTerms: + return [term] + return relatedTerms + + def bingSearches(self, numberOfSearches: int, pointsCounter: int = 0) -> int: + # Function to perform Bing searches + logging.info( + f"[BING] Starting {self.browser.browserType.capitalize()} Edge Bing searches..." + ) + + self.browser.utils.goToSearch() + + # todo Make sure rewards quiz is done + + for searchCount in range(1, numberOfSearches + 1): + # todo Disable cooldown for first 3 searches (Earning starts with your third search) + logging.info(f"[BING] {searchCount}/{numberOfSearches}") + googleTrends: list[str] = list(self.googleTrendsShelf.keys()) + logging.debug(f"self.googleTrendsShelf.keys() = {googleTrends}") + googleTrend = list(self.googleTrendsShelf.keys())[1] + pointsCounter = self.bingSearch(googleTrend) + logging.debug(f"pointsCounter = {pointsCounter}") + time.sleep(random.randint(10, 15)) + + logging.info( + f"[BING] Finished {self.browser.browserType.capitalize()} Edge Bing searches !" + ) + self.googleTrendsShelf.close() + return pointsCounter + + def bingSearch(self, term: str) -> int: + # Function to perform a single Bing search + pointsBefore = self.getAccountPoints() + + terms = self.getRelatedTerms(term) + logging.debug(f"terms={terms}") + termsCycle: cycle[str] = cycle(terms) + baseDelay = Searches.baseDelay + passedInTerm = term + logging.debug(f"passedInTerm={passedInTerm}") + + for i in range(self.maxAttempts): + searchbar = self.browser.utils.waitUntilVisible( + By.ID, "sb_form_q", timeToWait=20 + ) + + for _ in range(100): + searchbar.clear() + term = next(termsCycle) + logging.debug(f"term={term}") + searchbar.send_keys(term) + time.sleep(2) + try: + assert searchbar.get_attribute("value") == term + except AssertionError: + logging.debug('searchbar.get_attribute("value") != term') + continue + break + + searchbar.submit() + + pointsAfter = self.getAccountPoints() + if pointsBefore < pointsAfter: + del self.googleTrendsShelf[passedInTerm] + return pointsAfter + + # todo + # if i == (maxAttempts / 2): + # logging.info("[BING] " + "TIMED OUT GETTING NEW PROXY") + # self.webdriver.proxy = self.browser.giveMeProxy() + + baseDelay += random.randint(1, 10) # add some jitter + logging.debug( + f"[BING] Search attempt failed {i + 1}/{Searches.maxAttempts}, retrying after sleeping {baseDelay}" + f" seconds..." + ) + time.sleep(baseDelay) + + if Searches.attemptsStrategy == AttemptsStrategy.exponential: + baseDelay *= 2 + # todo debug why we get to this point occasionally even though searches complete + # update - Seems like account points aren't refreshing correctly see + logging.error("[BING] Reached max search attempt retries") + + # move failing term to end of list + logging.debug("Moving term to end of list") + del self.googleTrendsShelf[passedInTerm] + self.googleTrendsShelf[passedInTerm] = None + + return pointsBefore + + def getAccountPoints(self) -> int: + return self.browser.utils.getBingInfo()["userInfo"]["balance"] diff --git a/src/userAgentGenerator.py b/src/userAgentGenerator.py index 022d0fc7..c904f17b 100644 --- a/src/userAgentGenerator.py +++ b/src/userAgentGenerator.py @@ -31,7 +31,7 @@ class GenerateUserAgent: def userAgent( self, - browserConfig: dict[str, Any], + browserConfig: dict[str, Any] | None, mobile: bool = False, ) -> tuple[str, dict[str, Any], Any]: """ @@ -52,16 +52,16 @@ def userAgent( else self.USER_AGENT_TEMPLATES.get("edge_pc", "") ) + # todo - Refactor, kinda spaghetti code-y newBrowserConfig = None - if userAgentMetadata := browserConfig.get("userAgentMetadata"): - platformVersion = userAgentMetadata["platformVersion"] - + if browserConfig is not None: + platformVersion = browserConfig.get("userAgentMetadata")["platformVersion"] else: # ref : https://textslashplain.com/2021/09/21/determining-os-platform-version/ platformVersion = ( f"{random.randint(9,13) if mobile else random.randint(1,15)}.0.0" ) - newBrowserConfig = browserConfig + newBrowserConfig = {} newBrowserConfig["userAgentMetadata"] = { "platformVersion": platformVersion, } diff --git a/src/utils.py b/src/utils.py index 776745ed..efade9aa 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,296 +1,231 @@ -import contextlib -import json -import locale as pylocale -import random -import time -import urllib.parse -from pathlib import Path - -import requests -from selenium.webdriver.chrome.webdriver import WebDriver -from selenium.webdriver.common.by import By -from selenium.webdriver.support import expected_conditions as ec -from selenium.webdriver.support.wait import WebDriverWait - -import apprise -import yaml - -from .constants import BASE_URL - - -class Utils: - def __init__(self, webdriver: WebDriver, config_file='config.yaml'): - self.webdriver = webdriver - with contextlib.suppress(Exception): - locale = pylocale.getdefaultlocale()[0] - pylocale.setlocale(pylocale.LC_NUMERIC, locale) - - self.config = self.load_config(config_file) - - @staticmethod - def load_config(config_file): - with open(config_file, 'r') as file: - return yaml.safe_load(file) - - @staticmethod - def send_notification(title, body, config_file='config.yaml'): - apobj = apprise.Apprise() - for url in Utils.load_config(config_file)['apprise']['urls']: - apobj.add(url) - apobj.notify(body=body, title=title) - - def waitUntilVisible(self, by: str, selector: str, timeToWait: float = 10): - WebDriverWait(self.webdriver, timeToWait).until( - ec.visibility_of_element_located((by, selector)) - ) - - def waitUntilClickable(self, by: str, selector: str, timeToWait: float = 10): - WebDriverWait(self.webdriver, timeToWait).until( - ec.element_to_be_clickable((by, selector)) - ) - - def waitForMSRewardElement(self, by: str, selector: str): - loadingTimeAllowed = 5 - refreshsAllowed = 5 - - checkingInterval = 0.5 - checks = loadingTimeAllowed / checkingInterval - - tries = 0 - refreshCount = 0 - while True: - try: - self.webdriver.find_element(by, selector) - return True - except Exception: - if tries < checks: - tries += 1 - time.sleep(checkingInterval) - elif refreshCount < refreshsAllowed: - self.webdriver.refresh() - refreshCount += 1 - tries = 0 - time.sleep(5) - else: - return False - - def waitUntilQuestionRefresh(self): - return self.waitForMSRewardElement(By.CLASS_NAME, "rqECredits") - - def waitUntilQuizLoads(self): - return self.waitForMSRewardElement(By.XPATH, '//*[@id="rqStartQuiz"]') - - def waitUntilJS(self, jsSrc: str): - loadingTimeAllowed = 5 - refreshsAllowed = 5 - - checkingInterval = 0.5 - checks = loadingTimeAllowed / checkingInterval - - tries = 0 - refreshCount = 0 - while True: - elem = self.webdriver.execute_script(jsSrc) - if elem: - return elem - - if tries < checks: - tries += 1 - time.sleep(checkingInterval) - elif refreshCount < refreshsAllowed: - self.webdriver.refresh() - refreshCount += 1 - tries = 0 - time.sleep(5) - else: - return elem - - def resetTabs(self): - try: - curr = self.webdriver.current_window_handle - - for handle in self.webdriver.window_handles: - if handle != curr: - self.webdriver.switch_to.window(handle) - time.sleep(0.5) - self.webdriver.close() - time.sleep(0.5) - - self.webdriver.switch_to.window(curr) - time.sleep(0.5) - self.goHome() - except Exception: - self.goHome() - - def goHome(self): - reloadThreshold = 5 - reloadInterval = 10 - targetUrl = urllib.parse.urlparse(BASE_URL) - self.webdriver.get(BASE_URL) - reloads = 0 - interval = 1 - intervalCount = 0 - while True: - self.tryDismissCookieBanner() - with contextlib.suppress(Exception): - self.webdriver.find_element(By.ID, "more-activities") - break - currentUrl = urllib.parse.urlparse(self.webdriver.current_url) - if ( - currentUrl.hostname != targetUrl.hostname - ) and self.tryDismissAllMessages(): - time.sleep(1) - self.webdriver.get(BASE_URL) - time.sleep(interval) - if "proofs" in str(self.webdriver.current_url): - return "Verify" - intervalCount += 1 - if intervalCount >= reloadInterval: - intervalCount = 0 - reloads += 1 - self.webdriver.refresh() - if reloads >= reloadThreshold: - break - - def getAnswerCode(self, key: str, string: str) -> str: - t = sum(ord(string[i]) for i in range(len(string))) - t += int(key[-2:], 16) - return str(t) - - def getDashboardData(self) -> dict: - return self.webdriver.execute_script("return dashboard") - - def getBingInfo(self): - cookieJar = self.webdriver.get_cookies() - cookies = {cookie["name"]: cookie["value"] for cookie in cookieJar} - maxTries = 5 - for _ in range(maxTries): - with contextlib.suppress(Exception): - response = requests.get( - "https://www.bing.com/rewards/panelflyout/getuserinfo", - cookies=cookies, - ) - if response.status_code == requests.codes.ok: - return response.json() - time.sleep(1) - return None - - def checkBingLogin(self): - if data := self.getBingInfo(): - return data["userInfo"]["isRewardsUser"] - else: - return False - - def getAccountPoints(self) -> int: - return self.getDashboardData()["userStatus"]["availablePoints"] - - def getBingAccountPoints(self) -> int: - return data["userInfo"]["balance"] if (data := self.getBingInfo()) else 0 - - def getGoalPoints(self) -> int: - return self.getDashboardData()["userStatus"]["redeemGoal"]["price"] - - def getGoalTitle(self) -> str: - return self.getDashboardData()["userStatus"]["redeemGoal"]["title"] - - def tryDismissAllMessages(self): - buttons = [ - (By.ID, "iLandingViewAction"), - (By.ID, "iShowSkip"), - (By.ID, "iNext"), - (By.ID, "iLooksGood"), - (By.ID, "idSIButton9"), - (By.CSS_SELECTOR, ".ms-Button.ms-Button--primary"), - (By.ID, "bnp_btn_accept"), - (By.ID, "acceptButton") - ] - result = False - for button in buttons: - try: - elements = self.webdriver.find_elements(button[0], button[1]) - try: - for element in elements: - element.click() - except Exception: - continue - result = True - except Exception: - continue - return result - - def tryDismissCookieBanner(self): - with contextlib.suppress(Exception): - self.webdriver.find_element(By.ID, "cookie-banner").find_element( - By.TAG_NAME, "button" - ).click() - time.sleep(2) - - def tryDismissBingCookieBanner(self): - with contextlib.suppress(Exception): - self.webdriver.find_element(By.ID, "bnp_btn_accept").click() - time.sleep(2) - - def switchToNewTab(self, timeToWait: int = 0): - time.sleep(0.5) - self.webdriver.switch_to.window(window_name=self.webdriver.window_handles[1]) - if timeToWait > 0: - time.sleep(timeToWait) - - def closeCurrentTab(self): - self.webdriver.close() - time.sleep(0.5) - self.webdriver.switch_to.window(window_name=self.webdriver.window_handles[0]) - time.sleep(0.5) - - def visitNewTab(self, timeToWait: int = 0): - self.switchToNewTab(timeToWait) - self.closeCurrentTab() - - def getRemainingSearches(self): - dashboard = self.getDashboardData() - searchPoints = 1 - counters = dashboard["userStatus"]["counters"] - - if "pcSearch" not in counters: - return 0, 0 - - progressDesktop = counters["pcSearch"][0]["pointProgress"] - targetDesktop = counters["pcSearch"][0]["pointProgressMax"] - if len(counters["pcSearch"]) >= 2: - progressDesktop = progressDesktop + counters["pcSearch"][1]["pointProgress"] - targetDesktop = targetDesktop + counters["pcSearch"][1]["pointProgressMax"] - if targetDesktop in [30, 90, 102]: - searchPoints = 3 - elif targetDesktop == 50 or targetDesktop >= 170 or targetDesktop == 150: - searchPoints = 5 - remainingDesktop = int((targetDesktop - progressDesktop) / searchPoints) - remainingMobile = 0 - if dashboard["userStatus"]["levelInfo"]["activeLevel"] != "Level1": - progressMobile = counters["mobileSearch"][0]["pointProgress"] - targetMobile = counters["mobileSearch"][0]["pointProgressMax"] - remainingMobile = int((targetMobile - progressMobile) / searchPoints) - return remainingDesktop, remainingMobile - - def formatNumber(self, number, num_decimals=2): - return pylocale.format_string( - f"%10.{num_decimals}f", number, grouping=True - ).strip() - - def randomSeconds(self, max_value): - random_number = random.uniform(self, max_value) - return round(random_number, 3) - - @staticmethod - def getBrowserConfig(sessionPath: Path) -> dict: - configFile = sessionPath.joinpath("config.json") - if configFile.exists(): - with open(configFile, "r") as f: - return json.load(f) - else: - return {} - - @staticmethod - def saveBrowserConfig(sessionPath: Path, config: dict): - configFile = sessionPath.joinpath("config.json") - with open(configFile, "w") as f: - json.dump(config, f) +import contextlib +import json +import locale as pylocale +import time +from argparse import Namespace +from pathlib import Path +from typing import NamedTuple, Any + +import requests +import yaml +from apprise import Apprise +from selenium.common import NoSuchElementException, TimeoutException +from selenium.webdriver.chrome.webdriver import WebDriver +from selenium.webdriver.common.by import By +from selenium.webdriver.remote.webelement import WebElement +from selenium.webdriver.support import expected_conditions as ec +from selenium.webdriver.support.wait import WebDriverWait + +from .constants import REWARDS_URL +from .constants import SEARCH_URL + + +class RemainingSearches(NamedTuple): + desktop: int + mobile: int + + def getTotal(self) -> int: + return self.desktop + self.mobile + + +class Utils: + args: Namespace + + def __init__(self, webdriver: WebDriver): + self.webdriver = webdriver + with contextlib.suppress(Exception): + locale = pylocale.getdefaultlocale()[0] + pylocale.setlocale(pylocale.LC_NUMERIC, locale) + + self.config = self.loadConfig() + + @staticmethod + def getProjectRoot() -> Path: + return Path(__file__).parent.parent + + @staticmethod + def loadConfig(config_file=getProjectRoot() / "config.yaml") -> dict: + with open(config_file, "r") as file: + return yaml.safe_load(file) + + @staticmethod + def sendNotification(title, body) -> None: + if Utils.args.disable_apprise: + return + apprise = Apprise() + urls: list[str] = Utils.loadConfig().get("apprise", {}).get("urls", []) + for url in urls: + apprise.add(url) + apprise.notify(body=body, title=title) + + def waitUntilVisible( + self, by: str, selector: str, timeToWait: float = 10 + ) -> WebElement: + return WebDriverWait(self.webdriver, timeToWait).until( + ec.visibility_of_element_located((by, selector)) + ) + + def waitUntilClickable( + self, by: str, selector: str, timeToWait: float = 10 + ) -> WebElement: + return WebDriverWait(self.webdriver, timeToWait).until( + ec.element_to_be_clickable((by, selector)) + ) + + def waitUntilQuestionRefresh(self) -> WebElement: + return self.waitUntilVisible(By.CLASS_NAME, "rqECredits", timeToWait=20) + + def waitUntilQuizLoads(self) -> WebElement: + return self.waitUntilVisible(By.XPATH, '//*[@id="rqStartQuiz"]') + + def resetTabs(self) -> None: + curr = self.webdriver.current_window_handle + + for handle in self.webdriver.window_handles: + if handle != curr: + self.webdriver.switch_to.window(handle) + time.sleep(0.5) + self.webdriver.close() + time.sleep(0.5) + + self.webdriver.switch_to.window(curr) + time.sleep(0.5) + self.goToRewards() + + def goToRewards(self) -> None: + self.webdriver.get(REWARDS_URL) + assert self.webdriver.current_url == REWARDS_URL + + def goToSearch(self) -> None: + self.webdriver.get(SEARCH_URL) + # assert self.webdriver.current_url == SEARCH_URL, f"{self.webdriver.current_url} {SEARCH_URL}" + + @staticmethod + def getAnswerCode(key: str, string: str) -> str: + t = sum(ord(string[i]) for i in range(len(string))) + t += int(key[-2:], 16) + return str(t) + + def getDashboardData(self) -> dict: + self.goToRewards() + return self.webdriver.execute_script("return dashboard") + + def getBingInfo(self) -> Any: + cookieJar = WebDriverWait(self.webdriver, timeout=20).until(lambda d: d.get_cookies()) + cookies = {cookie["name"]: cookie["value"] for cookie in cookieJar} + response = requests.get( + "https://www.bing.com/rewards/panelflyout/getuserinfo", + cookies=cookies, + ) + assert response.status_code == requests.codes.ok + return response.json() + + def isLoggedIn(self) -> bool: + self.webdriver.get( + "https://rewards.bing.com/Signin/" + ) # changed site to allow bypassing when M$ blocks access to login.live.com randomly + with contextlib.suppress(TimeoutException): + self.waitUntilVisible( + By.CSS_SELECTOR, 'html[data-role-name="RewardsPortal"]', 10 + ) + return True + return False + + # todo - See if faster, but reliable, way to get this information that doesn't change page + def getAccountPoints(self) -> int: + return self.getDashboardData()["userStatus"]["availablePoints"] + + def getGoalPoints(self) -> int: + return self.getDashboardData()["userStatus"]["redeemGoal"]["price"] + + def getGoalTitle(self) -> str: + return self.getDashboardData()["userStatus"]["redeemGoal"]["title"] + + def tryDismissAllMessages(self) -> None: + buttons = [ + (By.ID, "iLandingViewAction"), + (By.ID, "iShowSkip"), + (By.ID, "iNext"), + (By.ID, "iLooksGood"), + (By.ID, "idSIButton9"), + (By.CSS_SELECTOR, ".ms-Button.ms-Button--primary"), + (By.ID, "bnp_btn_accept"), + (By.ID, "acceptButton"), + ] + for button in buttons: + try: + elements = self.webdriver.find_elements(by=button[0], value=button[1]) + except NoSuchElementException: # Expected? + continue + for element in elements: + element.click() + + def tryDismissCookieBanner(self) -> None: + with contextlib.suppress(NoSuchElementException): # Expected + self.webdriver.find_element(By.ID, "cookie-banner").find_element( + By.TAG_NAME, "button" + ).click() + time.sleep(2) + + def tryDismissBingCookieBanner(self) -> None: + with contextlib.suppress(NoSuchElementException): # Expected + self.webdriver.find_element(By.ID, "bnp_btn_accept").click() + time.sleep(2) + + def switchToNewTab(self, timeToWait: float = 0) -> None: + time.sleep(0.5) + self.webdriver.switch_to.window(window_name=self.webdriver.window_handles[1]) + if timeToWait > 0: + time.sleep(timeToWait) + + def closeCurrentTab(self) -> None: + self.webdriver.close() + time.sleep(0.5) + self.webdriver.switch_to.window(window_name=self.webdriver.window_handles[0]) + time.sleep(0.5) + + def visitNewTab(self, timeToWait: float = 0) -> None: + self.switchToNewTab(timeToWait) + self.closeCurrentTab() + + def getRemainingSearches(self) -> RemainingSearches: + dashboard = self.getDashboardData() + searchPoints = 1 + counters = dashboard["userStatus"]["counters"] + + progressDesktop = counters["pcSearch"][0]["pointProgress"] + targetDesktop = counters["pcSearch"][0]["pointProgressMax"] + if len(counters["pcSearch"]) >= 2: + progressDesktop = progressDesktop + counters["pcSearch"][1]["pointProgress"] + targetDesktop = targetDesktop + counters["pcSearch"][1]["pointProgressMax"] + if targetDesktop in [30, 90, 102]: + searchPoints = 3 + elif targetDesktop == 50 or targetDesktop >= 170 or targetDesktop == 150: + searchPoints = 5 + remainingDesktop = int((targetDesktop - progressDesktop) / searchPoints) + remainingMobile = 0 + if dashboard["userStatus"]["levelInfo"]["activeLevel"] != "Level1": + progressMobile = counters["mobileSearch"][0]["pointProgress"] + targetMobile = counters["mobileSearch"][0]["pointProgressMax"] + remainingMobile = int((targetMobile - progressMobile) / searchPoints) + return RemainingSearches(desktop=remainingDesktop, mobile=remainingMobile) + + @staticmethod + def formatNumber(number, num_decimals=2) -> str: + return pylocale.format_string( + f"%10.{num_decimals}f", number, grouping=True + ).strip() + + @staticmethod + def getBrowserConfig(sessionPath: Path) -> dict | None: + configFile = sessionPath / "config.json" + if not configFile.exists(): + return + with open(configFile, "r") as f: + return json.load(f) + + @staticmethod + def saveBrowserConfig(sessionPath: Path, config: dict) -> None: + configFile = sessionPath / "config.json" + with open(configFile, "w") as f: + json.dump(config, f) diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/test/test_main.py b/test/test_main.py new file mode 100644 index 00000000..5081dbd7 --- /dev/null +++ b/test/test_main.py @@ -0,0 +1,32 @@ +import unittest +from unittest.mock import patch, MagicMock + +import main + + +class TestMain(unittest.TestCase): + + # noinspection PyUnusedLocal + @patch.object(main, "save_previous_points_data") + @patch.object(main, "setupLogging") + @patch.object(main, "setupAccounts") + @patch.object(main, "executeBot") + # @patch.object(Utils, "send_notification") + def test_send_notification_when_exception( + self, + # mock_send_notification: MagicMock, + mock_executeBot: MagicMock, + mock_setupAccounts: MagicMock, + mock_setupLogging: MagicMock, + mock_save_previous_points_data: MagicMock, + ): + mock_setupAccounts.return_value = [{"password": "foo", "username": "bar"}] + mock_executeBot.side_effect = Exception + + main.main() + + # mock_send_notification.assert_called() + + +if __name__ == "__main__": + unittest.main()