diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml index 16db6461..da2d09ce 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yml +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -46,7 +46,8 @@ body: attributes: label: Copy and paste your error description: | - From the terminal that was running the farmer, copy and paste the error/bug here. + Run the program with -d argument (ex: python main.py -d) and, from the terminal that was running the farmer, + copy and paste the log/error/bug here. validations: required: true - type: textarea diff --git a/.github/workflows/build_push_docker_image.yml b/.github/workflows/build_push_docker_image.yml new file mode 100644 index 00000000..4bf039a6 --- /dev/null +++ b/.github/workflows/build_push_docker_image.yml @@ -0,0 +1,37 @@ +name: Build and Push Docker Image + +on: + push: + tags: + - "*" + workflow_dispatch: + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout Code + uses: actions/checkout@v4 + - name: Docker meta + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ secrets.DOCKERHUB_USERNAME }}/ms-rewards + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + - name: Build and push + uses: docker/build-push-action@v5 + with: + context: . + platforms: linux/amd64,linux/arm64 + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} diff --git a/.gitignore b/.gitignore index 7302770d..fa083af2 100644 --- a/.gitignore +++ b/.gitignore @@ -187,3 +187,5 @@ runbot.bat /google_trends.dir /google_trends.bak /config-private.yaml +mypy.ini +config.yaml diff --git a/.template-config-private.yaml b/.template-config-private.yaml deleted file mode 100644 index 60b62b9d..00000000 --- a/.template-config-private.yaml +++ /dev/null @@ -1,5 +0,0 @@ -# config-private.yaml -# Copy this file to config-private.yaml to use -apprise: - urls: - - 'discord://WebhookID/WebhookToken' # Replace with your actual Apprise service URLs diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..e4595adf --- /dev/null +++ b/Dockerfile @@ -0,0 +1,9 @@ +FROM python:slim +COPY . /app +WORKDIR /app +RUN apt-get update && apt-get install -y cron chromium chromium-driver \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* +RUN pip install --no-cache-dir -r requirements.txt +ENV DOCKER=1 +CMD ["sh", "docker.sh"] \ No newline at end of file diff --git a/README.md b/README.md index d48f509e..e9b43b43 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,8 @@ > [!IMPORTANT] -> If you are multi-accounting and abusing the service for which this is intended - **_DO NOT COMPLAIN ABOUT BANS!!!_** +> If you are multi-accounting and abusing the service for which this is intended - * +*_DO NOT COMPLAIN ABOUT BANS!!!_** @@ -42,61 +43,156 @@ 3. (Windows Only) Make sure Visual C++ redistributable DLLs are installed If they're not, install the current "vc_redist.exe" from - this [link](https://learn.microsoft.com/en-GB/cpp/windows/latest-supported-vc-redist?view=msvc-170) and reboot your - computer - -4. Edit the `accounts.json.sample` with your accounts credentials and rename it by removing `.sample` at the end. - - The "totp" field is not mandatory, only enter your TOTP key if you use it for 2FA (if ommitting, don't keep - it as an empty string, remove the line completely). - - The "proxy" field is not mandatory, you can omit it if you don't want to use proxy (don't keep it as an empty string, - remove the line completely). - - - If you want to add more than one account, the syntax is the following: - - ```json - [ - { - "username": "Your Email 1", - "password": "Your Password 1", - "totp": "0123 4567 89ab cdef", - "proxy": "http://user:pass@host1:port" - }, - { - "username": "Your Email 2", - "password": "Your Password 2", - "totp": "0123 4567 89ab cdef", - "proxy": "http://user:pass@host2:port" - } - ] + this [link](https://learn.microsoft.com/en-GB/cpp/windows/latest-supported-vc-redist?view=msvc-170) + and reboot your computer + +4. Run the script with the following arguments: + ``` + python main.py -C ``` -5. Run the script: +5. Open the generated `config.yaml` file and edit it with your information. - `python main.py` + The "totp" field is not mandatory, only enter your TOTP key if you use it for 2FA (if + ommitting, don't keep it as an empty string, remove the line completely). -6. (Windows Only) You can set up automatic execution by generating a Task Scheduler XML file. + The "proxy" field is not mandatory, you can omit it if you don't want to use proxy (don't + keep it as an empty string, remove the line completely). - If you are a Windows user, run the `generate_task_xml.py` script to create a `.xml` file. After generating the file, import it into Task Scheduler to schedule automatic execution of the script. This will allow the script to run at the specified time without manual intervention. + You can add or remove accounts according to your will. - To import the XML file into Task Scheduler, see [this guide](https://superuser.com/a/485565/709704). + the "apprise.urls" field is not mandatory, you can remove it if you don't want to get notifications. +6. Run the script: -## Launch arguments + `python main.py` -- `-v/--visible` to disable headless -- `-l/--lang` to force a language (ex: en) -- `-g/--geo` to force a searching geolocation (ex: US) - `https://trends.google.com/trends/ for proper geolocation abbreviation for your choice. These MUST be uppercase!!!` -- `-p/--proxy` to add a proxy to the whole program, supports http/https/socks4/socks5 (overrides per-account proxy in - accounts.json) - `(ex: http://user:pass@host:port)` -- `-cv/--chromeversion` to use a specific version of chrome - `(ex: 118)` -- `-da/--disable-apprise` disables Apprise notifications for the session, overriding [config.yaml](config.yaml). - Useful when running manually as opposed to on a schedule. -- `-t/--searchtype` to only do `desktop` or `mobile` searches, `(ex: --searchtype=mobile)` +7. (Windows Only) You can set up automatic execution by generating a Task Scheduler XML file. + + If you are a Windows user, run the `generate_task_xml.py` script to create a `.xml` file. + After generating the file, import it into Task Scheduler to schedule automatic execution of + the script. This will allow the script to run at the specified time without manual + intervention. + + To import the XML file into Task Scheduler, + see [this guide](https://superuser.com/a/485565/709704). + +## Configuration file + +All the variable listed here can be added to you `config.yaml` configuration file, and the values represented here show +the default ones, if not said otherwise. + +```yaml +# config.yaml +apprise: # 'apprise' is the name of the service used for notifications https://github.com/caronc/apprise + enabled: true # set it to false to disable apprise globally, can be overridden with command-line arguments. + notify: + incomplete-activity: true # set it to false to disable notifications for incomplete activities + uncaught-exception: true # set it to false to disable notifications for uncaught exceptions + login-code: true # set it to false to disable notifications for the temporary M$ Authenticator login code + summary: ON_ERROR # set it to ALWAYS to always receive a summary about your points progression or errors, or to + # NEVER to never receive a summary, even in case of an error. + urls: # add apprise urls here to receive notifications on the specified services : + # https://github.com/caronc/apprise#supported-notifications + # Empty by default. + - discord://{WebhookID}/{WebhookToken} # Exemple url +browser: + geolocation: US # Replace with your country code https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2. + # Can be overridden with command-line arguments. + language: en # Replace with your language code https://en.wikipedia.org/wiki/List_of_ISO_639-1_codes. + # Can be overridden with command-line arguments. + visible: false # set it to true to show the browser window, can be overridden with command-line arguments. + proxy: null # set the global proxy using the 'http://user:pass@host:port' syntax. + # Override per-account proxies. Can be overridden with command-line arguments. +activities: + ignore: # list of activities to ignore, like activities that can't be completed + - Get 50 entries plus 1000 points! + - Safeguard your family's info + search: # list of searches to do for search-based activities + "Black Friday shopping": black friday deals + "Discover open job roles": jobs at microsoft + "Expand your vocabulary": define demure + "Find places to stay": hotels rome italy + "Find somewhere new to explore": directions to new york + "Gaming time": vampire survivors video game + "Get your shopping done faster": new iphone + "Houses near you": apartments manhattan + "How's the economy?": sp 500 + "Learn to cook a new recipe": how cook pierogi + "Let's watch that movie again!": aliens movie + "Plan a quick getaway": flights nyc to paris + "Prepare for the weather": weather tomorrow + "Quickly convert your money": convert 374 usd to yen + "Search the lyrics of a song": black sabbath supernaut lyrics + "Stay on top of the elections": election news latest + "Too tired to cook tonight?": Pizza Hut near me + "Translate anything": translate pencil sharpener to spanish + "What time is it?": china time + "What's for Thanksgiving dinner?": pumpkin pie recipe + "Who won?": braves score + "You can track your package": usps tracking +logging: + level: INFO # Set to DEBUG, WARNING, ERROR or CRITICAL to change the level of displayed information in the terminal + # See https://docs.python.org/3/library/logging.html#logging-levels. Can be overridden with command-line arguments. +retries: + base_delay_in_seconds: 120 # The base wait time between each retries. Multiplied by two each try. + max: 4 # The maximal number of retries to do + strategy: EXPONENTIAL # Set it to CONSTANT to use the same delay between each retries. + # Else, increase it exponentially each time. +cooldown: + min: 300 # The minimal wait time between two searches/activities + max: 600 # The maximal wait time between two searches/activities +search: + type: both # Set it to 'mobile' or 'desktop' to only complete searches on one plateform, + # can be overridden with command-line arguments. +accounts: # The accounts to use. You can put zero, one or an infinite number of accounts here. + # Empty by default, can be overridden with command-line arguments. + - email: Your Email 1 # replace with your email + password: Your Password 1 # replace with your password + totp: 0123 4567 89ab cdef # replace with your totp, or remove it + proxy: http://user:pass@host1:port # replace with your account proxy, or remove it + - email: Your Email 2 # replace with your email + password: Your Password 2 # replace with your password + totp: 0123 4567 89ab cdef # replace with your totp, or remove it + proxy: http://user:pass@host2:port # replace with your account proxy, or remove it +``` + +## Usage + +``` +usage: main.py [-h] [-c CONFIG] [-C] [-v] [-l LANG] [-g GEO] [-em EMAIL] [-pw PASSWORD] + [-p PROXY] [-t {desktop,mobile,both}] [-da] [-d] + +A simple bot that uses Selenium to farm M$ Rewards in Python + +options: + -h, --help show this help message and exit + -c CONFIG, --config CONFIG + Specify the configuration file path + -C, --create-config Create a fillable configuration file with basic settings and given + ones if none exists + -v, --visible Visible browser (Disable headless mode) + -l LANG, --lang LANG Language (ex: en) see https://serpapi.com/google-languages for options + -g GEO, --geo GEO Searching geolocation (ex: US) see https://serpapi.com/google-trends- + locations for options (should be uppercase) + -em EMAIL, --email EMAIL + Email address of the account to run. Only used if a password is given. + -pw PASSWORD, --password PASSWORD + Password of the account to run. Only used if an email is given. + -p PROXY, --proxy PROXY + Global Proxy, supports http/https/socks4/socks5 (overrides config per- + account proxies) `(ex: http://user:pass@host:port)` + -t {desktop,mobile,both}, --searchtype {desktop,mobile,both} + Set to search in either desktop, mobile or both (default: both) + -da, --disable-apprise + Disable Apprise notifications, useful when developing + -d, --debug Set the logging level to DEBUG + +At least one account should be specified, either using command line arguments or a +configuration file. All specified arguments will override the configuration file values +``` + +You can display this message at any moment using `python main.py -h`. ## Features @@ -108,7 +204,8 @@ - Multi-Account Management - Session storing - 2FA Support -- Notifications via [Apprise](https://github.com/caronc/apprise) - no longer limited to Telegram or Discord +- Notifications via [Apprise](https://github.com/caronc/apprise) - no longer limited to + Telegram or Discord - Proxy Support (3.0) - they need to be **high quality** proxies - Logs to CSV file for point tracking @@ -117,8 +214,8 @@ Fork this repo and: * if providing a bugfix, create a pull request into master. -* if providing a new feature, please create a pull request into develop. Extra points if you update - the [CHANGELOG.md](CHANGELOG.md). +* if providing a new feature, please create a pull request into develop. Extra points if you + update the [CHANGELOG.md](CHANGELOG.md). ## To Do List (When time permits or someone makes a PR) diff --git a/accounts.json.sample b/accounts.json.sample deleted file mode 100644 index 9aceda60..00000000 --- a/accounts.json.sample +++ /dev/null @@ -1,14 +0,0 @@ -[ - { - "username": "Your Email 1", - "password": "Your Password 1", - "totp": "0123 4567 89ab cdef", - "proxy": "http://user:pass@host1:port" - }, - { - "username": "Your Email 2", - "password": "Your Password 2", - "totp": "0123 4567 89ab cdef", - "proxy": "http://user:pass@host2:port" - } -] diff --git a/activities.py b/activities.py new file mode 100644 index 00000000..bebb0a8c --- /dev/null +++ b/activities.py @@ -0,0 +1,141 @@ +import contextlib +import random +import time + +from selenium.common import TimeoutException +from selenium.webdriver.common.by import By +from selenium.webdriver.remote.webelement import WebElement + +from src.browser import Browser + + +class Activities: + def __init__(self, browser: Browser): + self.browser = browser + self.webdriver = browser.webdriver + + def openDailySetActivity(self, cardId: int): + # Open the Daily Set activity for the given cardId + element = self.webdriver.find_element(By.XPATH, + f'//*[@id="daily-sets"]/mee-card-group[1]/div/mee-card[{cardId}]/div/card-content/mee-rewards-daily-set-item-content/div/a', ) + self.browser.utils.click(element) + self.browser.utils.switchToNewTab(timeToWait=8) + + def openMorePromotionsActivity(self, cardId: int): + # Open the More Promotions activity for the given cardId + element = self.webdriver.find_element(By.CSS_SELECTOR, + f"#more-activities > .m-card-group > .ng-scope:nth-child({cardId + 1}) .ds-card-sec") + self.browser.utils.click(element) + self.browser.utils.switchToNewTab(timeToWait=5) + + def completeSearch(self): + # Simulate completing a search activity + time.sleep(random.randint(10, 15)) + self.browser.utils.closeCurrentTab() + + def completeSurvey(self): + # Simulate completing a survey activity + # noinspection SpellCheckingInspection + self.webdriver.find_element(By.ID, f"btoption{random.randint(0, 1)}").click() + time.sleep(random.randint(10, 15)) + self.browser.utils.closeCurrentTab() + + def completeQuiz(self): + # Simulate completing a quiz activity + with contextlib.suppress(TimeoutException): + startQuiz = self.browser.utils.waitUntilQuizLoads() + self.browser.utils.click(startQuiz) + self.browser.utils.waitUntilVisible( + By.ID, "overlayPanel", 5 + ) + currentQuestionNumber: int = self.webdriver.execute_script( + "return _w.rewardsQuizRenderInfo.currentQuestionNumber" + ) + maxQuestions = self.webdriver.execute_script( + "return _w.rewardsQuizRenderInfo.maxQuestions" + ) + numberOfOptions = self.webdriver.execute_script( + "return _w.rewardsQuizRenderInfo.numberOfOptions" + ) + for _ in range(currentQuestionNumber, maxQuestions + 1): + if numberOfOptions == 8: + answers = [] + for i in range(numberOfOptions): + isCorrectOption = self.webdriver.find_element( + By.ID, f"rqAnswerOption{i}" + ).get_attribute("iscorrectoption") + if isCorrectOption and isCorrectOption.lower() == "true": + answers.append(f"rqAnswerOption{i}") + for answer in answers: + element = self.webdriver.find_element(By.ID, answer) + self.browser.utils.click(element) + self.browser.utils.waitUntilQuestionRefresh() + elif numberOfOptions in [2, 3, 4]: + correctOption = self.webdriver.execute_script( + "return _w.rewardsQuizRenderInfo.correctAnswer" + ) + for i in range(numberOfOptions): + if ( + self.webdriver.find_element( + By.ID, f"rqAnswerOption{i}" + ).get_attribute("data-option") + == correctOption + ): + element = self.webdriver.find_element(By.ID, f"rqAnswerOption{i}") + self.browser.utils.click(element) + + self.browser.utils.waitUntilQuestionRefresh() + break + self.browser.utils.closeCurrentTab() + + def completeABC(self): + # Simulate completing an ABC activity + counter = self.webdriver.find_element( + By.XPATH, '//*[@id="QuestionPane0"]/div[2]' + ).text[:-1][1:] + numberOfQuestions = max(int(s) for s in counter.split() if s.isdigit()) + for question in range(numberOfQuestions): + element = self.webdriver.find_element(By.ID, f"questionOptionChoice{question}{random.randint(0, 2)}") + self.browser.utils.click(element) + time.sleep(random.randint(10, 15)) + element = self.webdriver.find_element(By.ID, f"nextQuestionbtn{question}") + self.browser.utils.click(element) + time.sleep(random.randint(10, 15)) + time.sleep(random.randint(1, 7)) + self.browser.utils.closeCurrentTab() + + def completeThisOrThat(self): + # Simulate completing a This or That activity + startQuiz = self.browser.utils.waitUntilQuizLoads() + self.browser.utils.click(startQuiz) + self.browser.utils.waitUntilVisible( + By.XPATH, '//*[@id="currentQuestionContainer"]/div/div[1]', 10 + ) + time.sleep(random.randint(10, 15)) + for _ in range(10): + correctAnswerCode = self.webdriver.execute_script( + "return _w.rewardsQuizRenderInfo.correctAnswer" + ) + answer1, answer1Code = self.getAnswerAndCode("rqAnswerOption0") + answer2, answer2Code = self.getAnswerAndCode("rqAnswerOption1") + answerToClick: WebElement + if answer1Code == correctAnswerCode: + answerToClick = answer1 + elif answer2Code == correctAnswerCode: + answerToClick = answer2 + + self.browser.utils.click(answerToClick) + time.sleep(random.randint(10, 15)) + + time.sleep(random.randint(10, 15)) + self.browser.utils.closeCurrentTab() + + def getAnswerAndCode(self, answerId: str) -> tuple[WebElement, str]: + # Helper function to get answer element and its code + answerEncodeKey = self.webdriver.execute_script("return _G.IG") + answer = self.webdriver.find_element(By.ID, answerId) + answerTitle = answer.get_attribute("data-option") + return ( + answer, + self.browser.utils.getAnswerCode(answerEncodeKey, answerTitle), + ) diff --git a/config.yaml b/config.yaml deleted file mode 100644 index 7e45c418..00000000 --- a/config.yaml +++ /dev/null @@ -1,7 +0,0 @@ -# config.yaml -apprise: - summary: ON_ERROR -retries: - base_delay_in_seconds: 14.0625 # base_delay_in_seconds * 2^max = 14.0625 * 2^6 = 900 = 15 minutes - max: 8 - strategy: EXPONENTIAL diff --git a/docker.sh b/docker.sh new file mode 100644 index 00000000..08dedc2e --- /dev/null +++ b/docker.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +# Check if RUN_ONCE environment variable is set. In case, running the script now and exiting. +if [ "$RUN_ONCE" = "true" ] +then + echo "RUN_ONCE environment variable is set. Running the script now and exiting." + python main.py + exit 0 +fi +# Check if CRON_SCHEDULE environment variable is set +if [ -z "$CRON_SCHEDULE" ] +then + echo "CRON_SCHEDULE environment variable is not set. Setting it to 4 AM everyday by default" + CRON_SCHEDULE="0 4 * * *" +fi + +# Setting up cron job +echo "$CRON_SCHEDULE root python /app/main.py >> /var/log/cron.log 2>&1" > /etc/cron.d/rewards-cron-job + +# Give execution rights on the cron job +chmod 0644 /etc/cron.d/rewards-cron-job + +# Apply cron job +crontab /etc/cron.d/rewards-cron-job + +# Create the log file to be able to run tail +touch /var/log/cron.log + +echo "Cron job is set to run at $CRON_SCHEDULE. Waiting for the cron to run..." + +# Run the cron +cron && tail -f /var/log/cron.log diff --git a/main.py b/main.py index 813c0382..7b40bfab 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,8 @@ -import argparse import csv import json import logging import logging.config import logging.handlers as handlers -import random -import re import sys import traceback from datetime import datetime @@ -14,38 +11,34 @@ from src import ( Browser, Login, - MorePromotions, PunchCards, Searches, ReadToEarn, - DailySet, - Account, ) +from src.activities import Activities from src.browser import RemainingSearches from src.loggingColoredFormatter import ColoredFormatter -from src.utils import Utils +from src.utils import CONFIG, sendNotification, getProjectRoot, formatNumber def main(): - args = argumentParser() - Utils.args = args setupLogging() - loadedAccounts = setupAccounts() # Load previous day's points data previous_points_data = load_previous_points_data() - for currentAccount in loadedAccounts: + for currentAccount in CONFIG.accounts: try: - earned_points = executeBot(currentAccount, args) + earned_points = executeBot(currentAccount) except Exception as e1: logging.error("", exc_info=True) - Utils.sendNotification( - f"⚠️ Error executing {currentAccount.username}, please check the log", + sendNotification( + f"⚠️ Error executing {currentAccount.email}, please check the log", traceback.format_exc(), + e1, ) continue - previous_points = previous_points_data.get(currentAccount.username, 0) + previous_points = previous_points_data.get(currentAccount.email, 0) # Calculate the difference in points from the prior day points_difference = earned_points - previous_points @@ -54,10 +47,10 @@ def main(): log_daily_points_to_csv(earned_points, points_difference) # Update the previous day's points data - previous_points_data[currentAccount.username] = earned_points + previous_points_data[currentAccount.email] = earned_points logging.info( - f"[POINTS] Data for '{currentAccount.username}' appended to the file." + f"[POINTS] Data for '{currentAccount.email}' appended to the file." ) # Save the current day's points data for the next day in the "logs" folder @@ -66,7 +59,7 @@ def main(): def log_daily_points_to_csv(earned_points, points_difference): - logs_directory = Utils.getProjectRoot() / "logs" + logs_directory = getProjectRoot() / "logs" csv_filename = logs_directory / "points_data.csv" # Create a new row with the date, daily points, and points difference @@ -90,15 +83,14 @@ def log_daily_points_to_csv(earned_points, points_difference): def setupLogging(): - _format = "%(asctime)s [%(levelname)s] %(message)s" + _format = CONFIG.logging.format terminalHandler = logging.StreamHandler(sys.stdout) terminalHandler.setFormatter(ColoredFormatter(_format)) - logs_directory = Utils.getProjectRoot() / "logs" + logs_directory = getProjectRoot() / "logs" logs_directory.mkdir(parents=True, exist_ok=True) # so only our code is logged if level=logging.DEBUG or finer - # if not working see https://stackoverflow.com/a/48891485/4164390 logging.config.dictConfig( { "version": 1, @@ -106,7 +98,7 @@ def setupLogging(): } ) logging.basicConfig( - level=logging.DEBUG, + level=logging.getLevelName(CONFIG.logging.level.upper()), format=_format, handlers=[ handlers.TimedRotatingFileHandler( @@ -121,88 +113,6 @@ def setupLogging(): ) -def argumentParser() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="MS Rewards Farmer") - parser.add_argument( - "-v", "--visible", action="store_true", help="Optional: Visible browser" - ) - parser.add_argument( - "-l", "--lang", type=str, default=None, help="Optional: Language (ex: en)" - ) - parser.add_argument( - "-g", "--geo", type=str, default=None, help="Optional: Geolocation (ex: US)" - ) - parser.add_argument( - "-p", - "--proxy", - type=str, - default=None, - help="Optional: Global Proxy (ex: http://user:pass@host:port)", - ) - parser.add_argument( - "-vn", - "--verbosenotifs", - action="store_true", - help="Optional: Send all the logs to the notification service", - ) - parser.add_argument( - "-cv", - "--chromeversion", - type=int, - default=None, - help="Optional: Set fixed Chrome version (ex. 118)", - ) - parser.add_argument( - "-da", - "--disable-apprise", - action="store_true", - help="Optional: Disable Apprise, overrides config.yaml, useful when developing", - ) - parser.add_argument( - "-t", - "--searchtype", - type=str, - default=None, - help="Optional: Set to only search in either desktop or mobile (ex: 'desktop' or 'mobile')", - ) - return parser.parse_args() - - -def setupAccounts() -> list[Account]: - """Sets up and validates a list of accounts loaded from 'accounts.json'.""" - - def validEmail(email: str) -> bool: - """Validate Email.""" - pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$" - return bool(re.match(pattern, email)) - - accountPath = Utils.getProjectRoot() / "accounts.json" - if not accountPath.exists(): - accountPath.write_text( - json.dumps( - [{"username": "Your Email", "password": "Your Password"}], indent=4 - ), - encoding="utf-8", - ) - noAccountsNotice = """ - [ACCOUNT] Accounts credential file "accounts.json" not found. - [ACCOUNT] A new file has been created, please edit with your credentials and save. - """ - logging.warning(noAccountsNotice) - exit(1) - loadedAccounts: list[Account] = [] - for rawAccount in json.loads(accountPath.read_text(encoding="utf-8")): - account: Account = Account(**rawAccount) - if not validEmail(account.username): - logging.warning( - f"[CREDENTIALS] Invalid email: {account.username}, skipping this account" - ) - continue - loadedAccounts.append(account) - random.shuffle(loadedAccounts) - return loadedAccounts - - class AppriseSummary(Enum): """ configures how results are summarized via Apprise @@ -222,8 +132,8 @@ class AppriseSummary(Enum): """ -def executeBot(currentAccount: Account, args: argparse.Namespace): - logging.info(f"********************{currentAccount.username}********************") +def executeBot(currentAccount): + logging.info(f"********************{currentAccount.email}********************") startingPoints: int | None = None accountPoints: int @@ -231,18 +141,16 @@ def executeBot(currentAccount: Account, args: argparse.Namespace): goalTitle: str goalPoints: int - if args.searchtype in ("desktop", None): - with Browser(mobile=False, account=currentAccount, args=args) as desktopBrowser: + if CONFIG.search.type in ("desktop", "both", None): + with Browser(mobile=False, account=currentAccount) as desktopBrowser: utils = desktopBrowser.utils - Login(desktopBrowser, args).login() + Login(desktopBrowser).login() startingPoints = utils.getAccountPoints() logging.info( - f"[POINTS] You have {utils.formatNumber(startingPoints)} points on your account" + f"[POINTS] You have {formatNumber(startingPoints)} points on your account" ) - # todo Combine these classes so main loop isn't duplicated - DailySet(desktopBrowser).completeDailySet() + Activities(desktopBrowser).completeActivities() PunchCards(desktopBrowser).completePunchCards() - MorePromotions(desktopBrowser).completeMorePromotions() # VersusGame(desktopBrowser).completeVersusGame() with Searches(desktopBrowser) as searches: @@ -256,10 +164,10 @@ def executeBot(currentAccount: Account, args: argparse.Namespace): ) accountPoints = utils.getAccountPoints() - if args.searchtype in ("mobile", None): - with Browser(mobile=True, account=currentAccount, args=args) as mobileBrowser: + if CONFIG.search.type in ("mobile", "both", None): + with Browser(mobile=True, account=currentAccount) as mobileBrowser: utils = mobileBrowser.utils - Login(mobileBrowser, args).login() + Login(mobileBrowser).login() if startingPoints is None: startingPoints = utils.getAccountPoints() ReadToEarn(mobileBrowser).completeReadToEarn() @@ -275,42 +183,38 @@ def executeBot(currentAccount: Account, args: argparse.Namespace): accountPoints = utils.getAccountPoints() logging.info( - f"[POINTS] You have earned {Utils.formatNumber(accountPoints - startingPoints)} points this run !" + f"[POINTS] You have earned {formatNumber(accountPoints - startingPoints)} points this run !" ) - logging.info( - f"[POINTS] You are now at {Utils.formatNumber(accountPoints)} points !" - ) - appriseSummary = AppriseSummary[ - Utils.loadConfig().get("apprise", {}).get("summary", AppriseSummary.ALWAYS.name) - ] + logging.info(f"[POINTS] You are now at {formatNumber(accountPoints)} points !") + appriseSummary = AppriseSummary[CONFIG.apprise.summary] if appriseSummary == AppriseSummary.ALWAYS: goalStatus = "" if goalPoints > 0: logging.info( - f"[POINTS] You are now at {(Utils.formatNumber((accountPoints / goalPoints) * 100))}% of your " + f"[POINTS] You are now at {(formatNumber((accountPoints / goalPoints) * 100))}% of your " f"goal ({goalTitle}) !" ) goalStatus = ( - f"🎯 Goal reached: {(Utils.formatNumber((accountPoints / goalPoints) * 100))}%" + f"🎯 Goal reached: {(formatNumber((accountPoints / goalPoints) * 100))}%" f" ({goalTitle})" ) - Utils.sendNotification( + sendNotification( "Daily Points Update", "\n".join( [ - f"👤 Account: {currentAccount.username}", - f"⭐️ Points earned today: {Utils.formatNumber(accountPoints - startingPoints)}", - f"💰 Total points: {Utils.formatNumber(accountPoints)}", + f"👤 Account: {currentAccount.email}", + f"⭐️ Points earned today: {formatNumber(accountPoints - startingPoints)}", + f"💰 Total points: {formatNumber(accountPoints)}", goalStatus, ] ), ) elif appriseSummary == AppriseSummary.ON_ERROR: if remainingSearches.getTotal() > 0: - Utils.sendNotification( + sendNotification( "Error: remaining searches", - f"account username: {currentAccount.username}, {remainingSearches}", + f"account email: {currentAccount.email}, {remainingSearches}", ) elif appriseSummary == AppriseSummary.NEVER: pass @@ -319,7 +223,7 @@ def executeBot(currentAccount: Account, args: argparse.Namespace): def export_points_to_csv(points_data): - logs_directory = Utils.getProjectRoot() / "logs" + logs_directory = getProjectRoot() / "logs" csv_filename = logs_directory / "points_data.csv" with open(csv_filename, mode="a", newline="") as file: # Use "a" mode for append fieldnames = ["Account", "Earned Points", "Points Difference"] @@ -336,9 +240,7 @@ def export_points_to_csv(points_data): # Define a function to load the previous day's points data from a file in the "logs" folder def load_previous_points_data(): try: - with open( - Utils.getProjectRoot() / "logs" / "previous_points_data.json", "r" - ) as file: + with open(getProjectRoot() / "logs" / "previous_points_data.json", "r") as file: return json.load(file) except FileNotFoundError: return {} @@ -346,7 +248,7 @@ def load_previous_points_data(): # Define a function to save the current day's points data for the next day in the "logs" folder def save_previous_points_data(data): - logs_directory = Utils.getProjectRoot() / "logs" + logs_directory = getProjectRoot() / "logs" with open(logs_directory / "previous_points_data.json", "w") as file: json.dump(data, file, indent=4) @@ -356,6 +258,7 @@ def save_previous_points_data(data): main() except Exception as e: logging.exception("") - Utils.sendNotification( - "⚠️ Error occurred, please check the log", traceback.format_exc() + sendNotification( + "⚠️ Error occurred, please check the log", traceback.format_exc(), e ) + exit(1) diff --git a/requirements.txt b/requirements.txt index 859f29d1..61f520c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,15 +1,16 @@ -requests~=2.32.3 -selenium>=4.15.2 # not directly required, pinned by Snyk to avoid a vulnerability -ipapi~=1.0.4 -undetected-chromedriver==3.5.5 -selenium-wire~=5.1.0 +apprise~=1.9.0 +blinker==1.7.0 # prevents issues on newer versions numpy>=1.22.2 # not directly required, pinned by Snyk to avoid a vulnerability -setuptools +ipapi~=1.0.4 psutil -blinker==1.7.0 # prevents issues on newer versions -apprise~=1.8.1 +pycountry~=24.6.1 +pyotp~=2.9.0 pyyaml~=6.0.2 -urllib3>=2.2.2 # not directly required, pinned by Snyk to avoid a vulnerability requests-oauthlib~=2.0.0 -zipp>=3.19.1 # not directly required, pinned by Snyk to avoid a vulnerability -pyotp~=2.9.0 +requests~=2.32.3 +selenium-wire~=5.1.0 +selenium>=4.15.2 # not directly required, pinned by Snyk to avoid a vulnerability +setuptools +undetected-chromedriver==3.5.5 +urllib3>=2.2.2 # not directly required, pinned by Snyk to avoid a vulnerability +zipp>=3.19.1 # not directly required, pinned by Snyk to avoid a vulnerability \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py index 82174ec9..9d6a2fe8 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -1,9 +1,6 @@ -from .account import Account from .remainingSearches import RemainingSearches from .browser import Browser -from .dailySet import DailySet from .login import Login -from .morePromotions import MorePromotions from .punchCards import PunchCards from .readToEarn import ReadToEarn from .searches import Searches diff --git a/src/account.py b/src/account.py deleted file mode 100644 index 3ec30515..00000000 --- a/src/account.py +++ /dev/null @@ -1,9 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class Account: - username: str - password: str - totp: str | None = None - proxy: str | None = None diff --git a/src/activities.py b/src/activities.py index bebb0a8c..1a7d5453 100644 --- a/src/activities.py +++ b/src/activities.py @@ -1,12 +1,15 @@ import contextlib -import random -import time +import logging +from random import randint +from time import sleep from selenium.common import TimeoutException from selenium.webdriver.common.by import By from selenium.webdriver.remote.webelement import WebElement from src.browser import Browser +from src.constants import REWARDS_URL +from src.utils import CONFIG, sendNotification, getAnswerCode class Activities: @@ -16,38 +19,39 @@ def __init__(self, browser: Browser): def openDailySetActivity(self, cardId: int): # Open the Daily Set activity for the given cardId - element = self.webdriver.find_element(By.XPATH, - f'//*[@id="daily-sets"]/mee-card-group[1]/div/mee-card[{cardId}]/div/card-content/mee-rewards-daily-set-item-content/div/a', ) + cardId += 1 + element = self.webdriver.find_element( + By.XPATH, + f'//*[@id="daily-sets"]/mee-card-group[1]/div/mee-card[{cardId}]/div/card-content/mee-rewards-daily-set-item-content/div/a', + ) self.browser.utils.click(element) - self.browser.utils.switchToNewTab(timeToWait=8) + self.browser.utils.switchToNewTab() def openMorePromotionsActivity(self, cardId: int): + cardId += 1 # Open the More Promotions activity for the given cardId - element = self.webdriver.find_element(By.CSS_SELECTOR, - f"#more-activities > .m-card-group > .ng-scope:nth-child({cardId + 1}) .ds-card-sec") + element = self.webdriver.find_element( + By.CSS_SELECTOR, + f"#more-activities > .m-card-group > .ng-scope:nth-child({cardId}) .ds-card-sec", + ) self.browser.utils.click(element) - self.browser.utils.switchToNewTab(timeToWait=5) + self.browser.utils.switchToNewTab() def completeSearch(self): # Simulate completing a search activity - time.sleep(random.randint(10, 15)) - self.browser.utils.closeCurrentTab() + pass def completeSurvey(self): # Simulate completing a survey activity # noinspection SpellCheckingInspection - self.webdriver.find_element(By.ID, f"btoption{random.randint(0, 1)}").click() - time.sleep(random.randint(10, 15)) - self.browser.utils.closeCurrentTab() + self.webdriver.find_element(By.ID, f"btoption{randint(0, 1)}").click() def completeQuiz(self): # Simulate completing a quiz activity with contextlib.suppress(TimeoutException): startQuiz = self.browser.utils.waitUntilQuizLoads() self.browser.utils.click(startQuiz) - self.browser.utils.waitUntilVisible( - By.ID, "overlayPanel", 5 - ) + self.browser.utils.waitUntilVisible(By.ID, "overlayPanel", 5) currentQuestionNumber: int = self.webdriver.execute_script( "return _w.rewardsQuizRenderInfo.currentQuestionNumber" ) @@ -81,12 +85,13 @@ def completeQuiz(self): ).get_attribute("data-option") == correctOption ): - element = self.webdriver.find_element(By.ID, f"rqAnswerOption{i}") + element = self.webdriver.find_element( + By.ID, f"rqAnswerOption{i}" + ) self.browser.utils.click(element) self.browser.utils.waitUntilQuestionRefresh() break - self.browser.utils.closeCurrentTab() def completeABC(self): # Simulate completing an ABC activity @@ -95,14 +100,14 @@ def completeABC(self): ).text[:-1][1:] numberOfQuestions = max(int(s) for s in counter.split() if s.isdigit()) for question in range(numberOfQuestions): - element = self.webdriver.find_element(By.ID, f"questionOptionChoice{question}{random.randint(0, 2)}") + element = self.webdriver.find_element( + By.ID, f"questionOptionChoice{question}{randint(0, 2)}" + ) self.browser.utils.click(element) - time.sleep(random.randint(10, 15)) + sleep(randint(10, 15)) element = self.webdriver.find_element(By.ID, f"nextQuestionbtn{question}") self.browser.utils.click(element) - time.sleep(random.randint(10, 15)) - time.sleep(random.randint(1, 7)) - self.browser.utils.closeCurrentTab() + sleep(randint(10, 15)) def completeThisOrThat(self): # Simulate completing a This or That activity @@ -111,7 +116,7 @@ def completeThisOrThat(self): self.browser.utils.waitUntilVisible( By.XPATH, '//*[@id="currentQuestionContainer"]/div/div[1]', 10 ) - time.sleep(random.randint(10, 15)) + sleep(randint(10, 15)) for _ in range(10): correctAnswerCode = self.webdriver.execute_script( "return _w.rewardsQuizRenderInfo.correctAnswer" @@ -125,10 +130,7 @@ def completeThisOrThat(self): answerToClick = answer2 self.browser.utils.click(answerToClick) - time.sleep(random.randint(10, 15)) - - time.sleep(random.randint(10, 15)) - self.browser.utils.closeCurrentTab() + sleep(randint(10, 15)) def getAnswerAndCode(self, answerId: str) -> tuple[WebElement, str]: # Helper function to get answer element and its code @@ -137,5 +139,97 @@ def getAnswerAndCode(self, answerId: str) -> tuple[WebElement, str]: answerTitle = answer.get_attribute("data-option") return ( answer, - self.browser.utils.getAnswerCode(answerEncodeKey, answerTitle), + getAnswerCode(answerEncodeKey, answerTitle), ) + + def doActivity(self, activity: dict, activities: list[dict]) -> None: + try: + activityTitle = cleanupActivityTitle(activity["title"]) + logging.debug(f"activityTitle={activityTitle}") + if activity["complete"] is True or activity["pointProgressMax"] == 0: + logging.debug("Already done, returning") + return + if activityTitle in CONFIG.activities.ignore: + logging.debug(f"Ignoring {activityTitle}") + return + # Open the activity for the activity + cardId = activities.index(activity) + isDailySet = ( + "daily_set_date" in activity["attributes"] + and activity["attributes"]["daily_set_date"] + ) + if isDailySet: + self.openDailySetActivity(cardId) + else: + self.openMorePromotionsActivity(cardId) + with contextlib.suppress(TimeoutException): + searchbar = self.browser.utils.waitUntilClickable(By.ID, "sb_form_q") + self.browser.utils.click(searchbar) + if activityTitle in CONFIG.activities.search: + searchbar.send_keys(CONFIG.activities.search[activityTitle]) + sleep(2) + searchbar.submit() + elif "poll" in activityTitle: + logging.info(f"[ACTIVITY] Completing poll of card {cardId}") + # Complete survey for a specific scenario + self.completeSurvey() + elif activity["promotionType"] == "urlreward": + # Complete search for URL reward + self.completeSearch() + elif activity["promotionType"] == "quiz": + # Complete different types of quizzes based on point progress max + if activity["pointProgressMax"] == 10: + self.completeABC() + elif activity["pointProgressMax"] in [30, 40]: + self.completeQuiz() + elif activity["pointProgressMax"] == 50: + self.completeThisOrThat() + else: + # Default to completing search + self.completeSearch() + except Exception: + logging.error(f"[ACTIVITY] Error doing {activityTitle}", exc_info=True) + sleep(randint(CONFIG.cooldown.min, CONFIG.cooldown.max)) + self.browser.utils.resetTabs() + + def completeActivities(self): + logging.info("[DAILY SET] " + "Trying to complete the Daily Set...") + dailySetPromotions = self.browser.utils.getDailySetPromotions() + self.browser.utils.goToRewards() + for activity in dailySetPromotions: + self.doActivity(activity, dailySetPromotions) + logging.info("[DAILY SET] Done") + + logging.info("[MORE PROMOS] " + "Trying to complete More Promotions...") + morePromotions: list[dict] = self.browser.utils.getMorePromotions() + self.browser.utils.goToRewards() + for activity in morePromotions: + self.doActivity(activity, morePromotions) + logging.info("[MORE PROMOS] Done") + + # todo Send one email for all accounts? + # fixme This is falsely considering some activities incomplete when complete + if CONFIG.get('apprise.notify.incomplete-activity'): + incompleteActivities: dict[str, tuple[str, str, str]] = {} + for activity in ( + self.browser.utils.getDailySetPromotions() + + self.browser.utils.getMorePromotions() + ): # Have to refresh + if activity["pointProgress"] < activity["pointProgressMax"]: + incompleteActivities[cleanupActivityTitle(activity["title"])] = ( + activity["promotionType"], + activity["pointProgress"], + activity["pointProgressMax"], + ) + for incompleteActivityToIgnore in CONFIG.activities.ignore: + incompleteActivities.pop(incompleteActivityToIgnore, None) + if incompleteActivities: + logging.info(f"incompleteActivities: {incompleteActivities}") + sendNotification( + f"We found some incomplete activities for {self.browser.email}", + str(incompleteActivities) + "\n" + REWARDS_URL, + ) + + +def cleanupActivityTitle(activityTitle: str) -> str: + return activityTitle.replace("\u200b", "").replace("\xa0", " ") diff --git a/src/browser.py b/src/browser.py index 93215280..32000736 100644 --- a/src/browser.py +++ b/src/browser.py @@ -1,20 +1,23 @@ -import argparse +import contextlib +import locale import logging +import os import random from pathlib import Path from types import TracebackType from typing import Any, Type import ipapi +import pycountry import seleniumwire.undetected_chromedriver as webdriver import undetected_chromedriver from ipapi.exceptions import RateLimited from selenium.webdriver import ChromeOptions from selenium.webdriver.chrome.webdriver import WebDriver -from src import Account, RemainingSearches +from src import RemainingSearches from src.userAgentGenerator import GenerateUserAgent -from src.utils import Utils +from src.utils import CONFIG, Utils, getBrowserConfig, getProjectRoot, saveBrowserConfig class Browser: @@ -23,24 +26,22 @@ class Browser: webdriver: undetected_chromedriver.Chrome def __init__( - self, mobile: bool, account: Account, args: argparse.Namespace + self, mobile: bool, account ) -> None: # Initialize browser instance logging.debug("in __init__") self.mobile = mobile self.browserType = "mobile" if mobile else "desktop" - self.headless = not args.visible - self.username = account.username + self.headless = not CONFIG.browser.visible + self.email = account.email self.password = account.password - self.totp = account.totp - self.localeLang, self.localeGeo = self.getCCodeLang(args.lang, args.geo) - self.proxy = None - if args.proxy: - self.proxy = args.proxy - elif account.proxy: + self.totp = account.get('totp') + self.localeLang, self.localeGeo = self.getLanguageCountry(CONFIG.browser.language, CONFIG.browser.geolocation) + self.proxy = CONFIG.browser.proxy + if not self.proxy and account.get('proxy'): self.proxy = account.proxy self.userDataDir = self.setupProfiles() - self.browserConfig = Utils.getBrowserConfig(self.userDataDir) + self.browserConfig = getBrowserConfig(self.userDataDir) ( self.userAgent, self.userAgentMetadata, @@ -48,7 +49,7 @@ def __init__( ) = GenerateUserAgent().userAgent(self.browserConfig, mobile) if newBrowserConfig: self.browserConfig = newBrowserConfig - Utils.saveBrowserConfig(self.userDataDir, self.browserConfig) + saveBrowserConfig(self.userDataDir, self.browserConfig) self.webdriver = self.browserSetup() self.utils = Utils(self.webdriver) logging.debug("out __init__") @@ -58,10 +59,10 @@ def __enter__(self): return self def __exit__( - self, - exc_type: Type[BaseException] | None, - exc_value: BaseException | None, - traceback: TracebackType | None, + self, + exc_type: Type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, ): # Cleanup actions when exiting the browser context logging.debug( @@ -79,10 +80,15 @@ def browserSetup( options.headless = self.headless options.add_argument(f"--lang={self.localeLang}") options.add_argument("--log-level=3") - options.add_argument("--blink-settings=imagesEnabled=false") #If you are having MFA sign in issues comment this line out + options.add_argument( + "--blink-settings=imagesEnabled=false" + ) # If you are having MFA sign in issues comment this line out options.add_argument("--ignore-certificate-errors") options.add_argument("--ignore-certificate-errors-spki-list") options.add_argument("--ignore-ssl-errors") + if os.environ.get("DOCKER"): + options.add_argument("--disable-dev-shm-usage") + options.add_argument("--headless=new") options.add_argument("--no-sandbox") options.add_argument("--disable-extensions") options.add_argument("--dns-prefetch-disable") @@ -90,7 +96,9 @@ def browserSetup( options.add_argument("--disable-default-apps") options.add_argument("--disable-features=Translate") options.add_argument("--disable-features=PrivacySandboxSettings4") - options.add_argument("--disable-search-engine-choice-screen") #153 + options.add_argument("--disable-http2") + options.add_argument("--disable-search-engine-choice-screen") # 153 + options.page_load_strategy = "eager" seleniumwireOptions: dict[str, Any] = {"verify_ssl": False} @@ -101,17 +109,26 @@ def browserSetup( "https": self.proxy, "no_proxy": "localhost,127.0.0.1", } + driver = None - # Obtain webdriver chrome driver version - version = self.getChromeVersion() - major = int(version.split(".")[0]) + if os.environ.get("DOCKER"): + driver = webdriver.Chrome( + options=options, + seleniumwire_options=seleniumwireOptions, + user_data_dir=self.userDataDir.as_posix(), + driver_executable_path="/usr/bin/chromedriver", + ) + else: + # Obtain webdriver chrome driver version + version = self.getChromeVersion() + major = int(version.split(".")[0]) - driver = webdriver.Chrome( - options=options, - seleniumwire_options=seleniumwireOptions, - user_data_dir=self.userDataDir.as_posix(), - version_main=major, - ) + driver = webdriver.Chrome( + options=options, + seleniumwire_options=seleniumwireOptions, + user_data_dir=self.userDataDir.as_posix(), + version_main=major, + ) seleniumLogger = logging.getLogger("seleniumwire") seleniumLogger.setLevel(logging.ERROR) @@ -130,7 +147,7 @@ def browserSetup( "height": deviceHeight, "width": deviceWidth, } - Utils.saveBrowserConfig(self.userDataDir, self.browserConfig) + saveBrowserConfig(self.userDataDir, self.browserConfig) if self.mobile: screenHeight = deviceHeight + 146 @@ -185,34 +202,62 @@ def browserSetup( def setupProfiles(self) -> Path: """ Sets up the sessions profile for the chrome browser. - Uses the username to create a unique profile for the session. + Uses the email to create a unique profile for the session. Returns: Path """ - sessionsDir = Utils.getProjectRoot() / "sessions" + sessionsDir = getProjectRoot() / "sessions" - # Concatenate username and browser type for a plain text session ID - sessionid = f"{self.username}" + # Concatenate email and browser type for a plain text session ID + sessionid = f"{self.email}" sessionsDir = sessionsDir / sessionid sessionsDir.mkdir(parents=True, exist_ok=True) return sessionsDir @staticmethod - def getCCodeLang(lang: str, geo: str) -> tuple: - if lang is None or geo is None: + def getLanguageCountry(language: str, country: str) -> tuple[str, str]: + if not country: + country = CONFIG.browser.geolocation + + if not language: + country = CONFIG.browser.language + + if not language or not country: + currentLocale = locale.getlocale() + if not language: + with contextlib.suppress(ValueError): + language = pycountry.languages.get( + alpha_2=currentLocale[0].split("_")[0] + ).alpha_2 + if not country: + with contextlib.suppress(ValueError): + country = pycountry.countries.get( + alpha_2=currentLocale[0].split("_")[1] + ).alpha_2 + + if not language or not country: try: - nfo = ipapi.location() + ipapiLocation = ipapi.location() + if not language: + language = ipapiLocation["languages"].split(",")[0].split("-")[0] + if not country: + country = ipapiLocation["country"] except RateLimited: - logging.warning("Returning default", exc_info=True) - return "en", "US" - if isinstance(nfo, dict): - if lang is None: - lang = nfo["languages"].split(",")[0].split("-")[0] - if geo is None: - geo = nfo["country"] - return lang, geo + logging.warning(exc_info=True) + + if not language: + language = "en" + logging.warning( + f"Not able to figure language returning default: {language}" + ) + + if not country: + country = "US" + logging.warning(f"Not able to figure country returning default: {country}") + + return language, country @staticmethod def getChromeVersion() -> str: @@ -229,29 +274,42 @@ def getChromeVersion() -> str: return version def getRemainingSearches( - self, desktopAndMobile: bool = False + self, desktopAndMobile: bool = False ) -> RemainingSearches | int: - dashboard = self.utils.getDashboardData() + # bingInfo = self.utils.getBingInfo() + bingInfo = self.utils.getDashboardData() searchPoints = 1 - counters = dashboard["userStatus"]["counters"] - - progressDesktop = counters["pcSearch"][0]["pointProgress"] - targetDesktop = counters["pcSearch"][0]["pointProgressMax"] - if len(counters["pcSearch"]) >= 2: - progressDesktop = progressDesktop + counters["pcSearch"][1]["pointProgress"] - targetDesktop = targetDesktop + counters["pcSearch"][1]["pointProgressMax"] - if targetDesktop in [30, 90, 102]: + counters = bingInfo["userStatus"]["counters"] + pcSearch: dict = counters["pcSearch"][0] + pointProgressMax: int = pcSearch["pointProgressMax"] + + searchPoints: int + if pointProgressMax in [30, 90, 102]: searchPoints = 3 - elif targetDesktop == 50 or targetDesktop >= 170 or targetDesktop == 150: + elif pointProgressMax in [50, 150] or pointProgressMax >= 170: searchPoints = 5 - remainingDesktop = int((targetDesktop - progressDesktop) / searchPoints) - remainingMobile = 0 - if dashboard["userStatus"]["levelInfo"]["activeLevel"] != "Level1": - progressMobile = counters["mobileSearch"][0]["pointProgress"] - targetMobile = counters["mobileSearch"][0]["pointProgressMax"] - remainingMobile = int((targetMobile - progressMobile) / searchPoints) + pcPointsRemaining = pcSearch["pointProgressMax"] - pcSearch["pointProgress"] + assert pcPointsRemaining % searchPoints == 0 + remainingDesktopSearches: int = int(pcPointsRemaining / searchPoints) + + activeLevel = bingInfo["userStatus"]["levelInfo"]["activeLevel"] + remainingMobileSearches: int = 0 + if activeLevel == "Level2": + mobileSearch: dict = counters["mobileSearch"][0] + mobilePointsRemaining = ( + mobileSearch["pointProgressMax"] - mobileSearch["pointProgress"] + ) + assert mobilePointsRemaining % searchPoints == 0 + remainingMobileSearches = int(mobilePointsRemaining / searchPoints) + elif activeLevel == "Level1": + pass + else: + raise AssertionError(f"Unknown activeLevel: {activeLevel}") + if desktopAndMobile: - return RemainingSearches(desktop=remainingDesktop, mobile=remainingMobile) + return RemainingSearches( + desktop=remainingDesktopSearches, mobile=remainingMobileSearches + ) if self.mobile: - return remainingMobile - return remainingDesktop + return remainingMobileSearches + return remainingDesktopSearches diff --git a/src/dailySet.py b/src/dailySet.py deleted file mode 100644 index e16b6146..00000000 --- a/src/dailySet.py +++ /dev/null @@ -1,89 +0,0 @@ -import logging -import urllib.parse -from datetime import datetime - -from src.browser import Browser -from .activities import Activities - - -class DailySet: - def __init__(self, browser: Browser): - self.browser = browser - self.webdriver = browser.webdriver - self.activities = Activities(browser) - - def completeDailySet(self): - # Function to complete the Daily Set - logging.info("[DAILY SET] " + "Trying to complete the Daily Set...") - data = self.browser.utils.getDashboardData()["dailySetPromotions"] - self.browser.utils.goToRewards() - todayDate = datetime.now().strftime("%m/%d/%Y") - for activity in data.get(todayDate, []): - cardId = int(activity["offerId"][-1:]) - try: - # Open the Daily Set activity - if activity["complete"] is not False: - continue - self.activities.openDailySetActivity(cardId) - if activity["promotionType"] == "urlreward": - logging.info(f"[DAILY SET] Completing search of card {cardId}") - # Complete search for URL reward - self.activities.completeSearch() - if activity["promotionType"] == "quiz": - if ( - activity["pointProgressMax"] == 50 - and activity["pointProgress"] == 0 - ): - logging.info( - "[DAILY SET] " + f"Completing This or That of card {cardId}" - ) - # Complete This or That for a specific point progress max - self.activities.completeThisOrThat() - elif ( - activity["pointProgressMax"] in [40, 30] - ): - logging.info(f"[DAILY SET] Completing quiz of card {cardId}") - # Complete quiz for specific point progress max - self.activities.completeQuiz() - elif ( - activity["pointProgressMax"] == 10 - and activity["pointProgress"] == 0 - ): - # Extract and parse search URL for additional checks - searchUrl = urllib.parse.unquote( - urllib.parse.parse_qs( - urllib.parse.urlparse(activity["destinationUrl"]).query - )["ru"][0] - ) - searchUrlQueries = urllib.parse.parse_qs( - urllib.parse.urlparse(searchUrl).query - ) - filters = {} - for filterEl in searchUrlQueries["filters"][0].split(" "): - filterEl = filterEl.split(":", 1) - filters[filterEl[0]] = filterEl[1] - if "PollScenarioId" in filters: - logging.info( - f"[DAILY SET] Completing poll of card {cardId}" - ) - # Complete survey for a specific scenario - self.activities.completeSurvey() - else: - logging.info( - f"[DAILY SET] Completing quiz of card {cardId}" - ) - try: - # Try completing ABC activity - self.activities.completeABC() - except Exception: # pylint: disable=broad-except - logging.warning("", exc_info=True) - # Default to completing quiz - self.activities.completeQuiz() - except Exception: # pylint: disable=broad-except - logging.error( - f"[DAILY SET] Error Daily Set of card {cardId}", exc_info=True - ) - # Reset tabs in case of an exception - self.browser.utils.resetTabs() - continue - logging.info("[DAILY SET] Exiting") diff --git a/src/login.py b/src/login.py index d42a84f5..adf48366 100644 --- a/src/login.py +++ b/src/login.py @@ -5,40 +5,85 @@ from pyotp import TOTP from selenium.common import TimeoutException +from selenium.common.exceptions import ( + ElementNotInteractableException, + NoSuchElementException, +) from selenium.webdriver.common.by import By from undetected_chromedriver import Chrome from src.browser import Browser +from src.utils import sendNotification, CONFIG class Login: browser: Browser - args: Namespace webdriver: Chrome - def __init__(self, browser: Browser, args: argparse.Namespace): + def __init__(self, browser: Browser): self.browser = browser self.webdriver = browser.webdriver self.utils = browser.utils - self.args = args - def login(self) -> None: - if self.utils.isLoggedIn(): - logging.info("[LOGIN] Already logged-in") - else: - logging.info("[LOGIN] Logging-in...") - self.executeLogin() - logging.info("[LOGIN] Logged-in successfully !") - - assert self.utils.isLoggedIn() + def check_locked_user(self): + try: + element = self.webdriver.find_element( + By.XPATH, "//div[@id='serviceAbuseLandingTitle']" + ) + self.locked(element) + except NoSuchElementException: + return + + def check_banned_user(self): + try: + element = self.webdriver.find_element(By.XPATH, '//*[@id="fraudErrorBody"]') + self.banned(element) + except NoSuchElementException: + return + + def locked(self, element): + try: + if element.is_displayed(): + logging.critical("This Account is Locked!") + self.webdriver.close() + raise Exception("Account locked, moving to the next account.") + except (ElementNotInteractableException, NoSuchElementException): + pass + + def banned(self, element): + try: + if element.is_displayed(): + logging.critical("This Account is Banned!") + self.webdriver.close() + raise Exception("Account banned, moving to the next account.") + except (ElementNotInteractableException, NoSuchElementException): + pass - def executeLogin(self) -> None: + def login(self) -> None: + try: + if self.utils.isLoggedIn(): + logging.info("[LOGIN] Already logged-in") + self.check_locked_user() + self.check_banned_user() + else: + logging.info("[LOGIN] Logging-in...") + self.execute_login() + logging.info("[LOGIN] Logged-in successfully!") + self.check_locked_user() + self.check_banned_user() + assert self.utils.isLoggedIn() + except Exception as e: + logging.error(f"Error during login: {e}") + self.webdriver.close() + raise + + def execute_login(self) -> None: # Email field emailField = self.utils.waitUntilVisible(By.ID, "i0116") logging.info("[LOGIN] Entering email...") emailField.click() - emailField.send_keys(self.browser.username) - assert emailField.get_attribute("value") == self.browser.username + emailField.send_keys(self.browser.email) + assert emailField.get_attribute("value") == self.browser.email self.utils.waitUntilClickable(By.ID, "idSIButton9").click() # Passwordless check @@ -52,13 +97,14 @@ def executeLogin(self) -> None: # Passworless login, have user confirm code on phone codeField = self.utils.waitUntilVisible(By.ID, "displaySign") logging.warning( - "[LOGIN] Confirm your login with code %s on your phone (you have" - " one minute)!\a", + "[LOGIN] Confirm your login with code %s on your phone (you have one minute)!\a", codeField.text, ) + if CONFIG.get("apprise.notify.login-code"): + sendNotification( + f"Confirm your login on your phone", f"Code: {codeField.text} (expires in 1 minute)") self.utils.waitUntilVisible(By.NAME, "kmsiForm", 60) logging.info("[LOGIN] Successfully verified!") - else: # Password-based login, enter password from accounts.json passwordField = self.utils.waitUntilClickable(By.NAME, "passwd") @@ -82,12 +128,9 @@ def executeLogin(self) -> None: logging.debug("isTOTPEnabled = %s", isTOTPEnabled) if isDeviceAuthEnabled: - # For some reason, undetected chromedriver doesn't receive the confirmation - # after the user has confirmed the login on their phone. + # Device-based authentication not supported raise Exception( - "Unfortunatly, device auth is not supported yet. Turn on" - " passwordless login in your account settings, use TOTPs or remove" - " 2FA altogether." + "Device authentication not supported. Please use TOTP or disable 2FA." ) # Device auth, have user confirm code on phone @@ -99,6 +142,9 @@ def executeLogin(self) -> None: " one minute)!\a", codeField.text, ) + if CONFIG.get("apprise.notify.login-code"): + sendNotification( + f"Confirm your login on your phone", f"Code: {codeField.text} (expires in 1 minute)") self.utils.waitUntilVisible(By.NAME, "kmsiForm", 60) logging.info("[LOGIN] Successfully verified!") @@ -108,15 +154,19 @@ def executeLogin(self) -> None: # TOTP token provided logging.info("[LOGIN] Entering OTP...") otp = TOTP(self.browser.totp.replace(" ", "")).now() - otpField = self.utils.waitUntilClickable(By.ID, "idTxtBx_SAOTCC_OTC") + otpField = self.utils.waitUntilClickable( + By.ID, "idTxtBx_SAOTCC_OTC" + ) otpField.send_keys(otp) assert otpField.get_attribute("value") == otp - self.utils.waitUntilClickable(By.ID, "idSubmit_SAOTCC_Continue").click() - + self.utils.waitUntilClickable( + By.ID, "idSubmit_SAOTCC_Continue" + ).click() else: # TOTP token not provided, manual intervention required - assert self.args.visible, ( - "[LOGIN] 2FA detected, provide token in accounts.json or run in" + assert CONFIG.browser.visible, ( + "[LOGIN] 2FA detected, provide token in accounts.json or or run in" + "[LOGIN] 2FA detected, provide token in accounts.json or handle manually." " visible mode to handle login." ) print( @@ -125,6 +175,9 @@ def executeLogin(self) -> None: ) input() + self.check_locked_user() + self.check_banned_user() + self.utils.waitUntilVisible(By.NAME, "kmsiForm") self.utils.waitUntilClickable(By.ID, "acceptButton").click() @@ -137,7 +190,7 @@ def executeLogin(self) -> None: if isAskingToProtect: assert ( - self.args.visible + CONFIG.browser.visible ), "Account protection detected, run in visible mode to handle login" print( "Account protection detected, handle prompts and press enter when on rewards page" diff --git a/src/morePromotions.py b/src/morePromotions.py deleted file mode 100644 index fd686b74..00000000 --- a/src/morePromotions.py +++ /dev/null @@ -1,130 +0,0 @@ -import contextlib -import logging -import random -import time - -from selenium.common import TimeoutException -from selenium.webdriver.common.by import By - -from src.browser import Browser -from .activities import Activities -from .utils import Utils - - -# todo Rename MoreActivities? -class MorePromotions: - def __init__(self, browser: Browser): - self.browser = browser - self.activities = Activities(browser) - - # todo Refactor so less complex - def completeMorePromotions(self): - # Function to complete More Promotions - logging.info("[MORE PROMOS] " + "Trying to complete More Promotions...") - morePromotions: list[dict] = self.browser.utils.getDashboardData()[ - "morePromotions" - ] - self.browser.utils.goToRewards() - for promotion in morePromotions: - try: - promotionTitle = promotion["title"].replace("\u200b", "").replace("\xa0", " ") - logging.debug(f"promotionTitle={promotionTitle}") - # Open the activity for the promotion - if ( - promotion["complete"] is not False - or promotion["pointProgressMax"] == 0 - ): - logging.debug("Already done, continuing") - continue - self.activities.openMorePromotionsActivity( - morePromotions.index(promotion) - ) - self.browser.webdriver.execute_script("window.scrollTo(0, 1080)") - with contextlib.suppress(TimeoutException): - searchbar = self.browser.utils.waitUntilClickable( - By.ID, "sb_form_q" - ) - self.browser.utils.click(searchbar) - # todo These and following are US-English specific, maybe there's a good way to internationalize - # todo Could use dictionary of promotionTitle to search to simplify - if "Search the lyrics of a song" in promotionTitle: - searchbar.send_keys("black sabbath supernaut lyrics") - searchbar.submit() - elif "Translate anything" in promotionTitle: - searchbar.send_keys("translate pencil sharpener to spanish") - searchbar.submit() - elif "Let's watch that movie again!" in promotionTitle: - searchbar.send_keys("aliens movie") - searchbar.submit() - elif "Discover open job roles" in promotionTitle: - searchbar.send_keys("walmart open job roles") - searchbar.submit() - elif "Plan a quick getaway" in promotionTitle: - searchbar.send_keys("flights nyc to paris") - searchbar.submit() - elif "You can track your package" in promotionTitle: - searchbar.send_keys("usps tracking") - searchbar.submit() - elif "Find somewhere new to explore" in promotionTitle: - searchbar.send_keys("directions to new york") - searchbar.submit() - elif "Too tired to cook tonight?" in promotionTitle: - searchbar.send_keys("Pizza Hut near me") - searchbar.submit() - elif "Quickly convert your money" in promotionTitle: - searchbar.send_keys("convert 374 usd to yen") - searchbar.submit() - elif "Learn to cook a new recipe" in promotionTitle: - searchbar.send_keys("how cook pierogi") - searchbar.submit() - elif "Find places to stay" in promotionTitle: - searchbar.send_keys("hotels rome italy") - searchbar.submit() - elif "How's the economy?" in promotionTitle: - searchbar.send_keys("sp 500") - searchbar.submit() - elif "Who won?" in promotionTitle: - searchbar.send_keys("braves score") - searchbar.submit() - elif "Gaming time" in promotionTitle: - searchbar.send_keys("vampire survivors video game") - searchbar.submit() - elif "Expand your vocabulary" in promotionTitle: - searchbar.send_keys("definition definition") - searchbar.submit() - elif "What time is it?" in promotionTitle: - searchbar.send_keys("china time") - searchbar.submit() - elif promotion["promotionType"] == "urlreward": - # Complete search for URL reward - self.activities.completeSearch() - elif ( - promotion["promotionType"] == "quiz" - ): - # Complete different types of quizzes based on point progress max - if promotion["pointProgressMax"] == 10: - self.activities.completeABC() - elif promotion["pointProgressMax"] in [30, 40]: - self.activities.completeQuiz() - elif promotion["pointProgressMax"] == 50: - self.activities.completeThisOrThat() - else: - # Default to completing search - self.activities.completeSearch() - self.browser.webdriver.execute_script("window.scrollTo(0, 1080)") - time.sleep(random.randint(5, 10)) - - self.browser.utils.resetTabs() - time.sleep(2) - except Exception: # pylint: disable=broad-except - logging.error("[MORE PROMOS] Error More Promotions", exc_info=True) - # Reset tabs in case of an exception - self.browser.utils.resetTabs() - continue - incompletePromotions: list[tuple[str, str]] = [] - for promotion in self.browser.utils.getDashboardData()["morePromotions"]: # Have to refresh - if promotion["pointProgress"] < promotion["pointProgressMax"]: - incompletePromotions.append((promotion["title"], promotion["promotionType"])) - if incompletePromotions: - Utils.sendNotification("Incomplete promotions(s)", incompletePromotions) - logging.info("[MORE PROMOS] Exiting") diff --git a/src/punchCards.py b/src/punchCards.py index 28d24fb9..514e0a06 100644 --- a/src/punchCards.py +++ b/src/punchCards.py @@ -23,12 +23,12 @@ def completePunchCard(self, url: str, childPromotions: dict): self.webdriver.find_element( By.XPATH, "//a[@class='offer-cta']/div" ).click() - self.browser.utils.visitNewTab(random.randint(13, 17)) + self.browser.utils.switchToNewTab(True) if child["promotionType"] == "quiz": self.webdriver.find_element( By.XPATH, "//a[@class='offer-cta']/div" ).click() - self.browser.utils.switchToNewTab(8) + self.browser.utils.switchToNewTab() counter = str( self.webdriver.find_element( By.XPATH, '//*[@id="QuestionPane0"]/div[2]' @@ -50,7 +50,6 @@ def completePunchCard(self, url: str, childPromotions: dict): ).click() time.sleep(random.randint(100, 700) / 100) time.sleep(random.randint(100, 700) / 100) - self.browser.utils.closeCurrentTab() def completePunchCards(self): # Function to complete all punch cards @@ -99,6 +98,6 @@ def completePromotionalItems(self): self.webdriver.find_element( By.XPATH, '//*[@id="promo-item"]/section/div/div/div/span' ).click() - self.browser.utils.visitNewTab(8) + self.browser.utils.switchToNewTab(True) except Exception: logging.debug("", exc_info=True) diff --git a/src/readToEarn.py b/src/readToEarn.py index 408382ca..3f17d663 100644 --- a/src/readToEarn.py +++ b/src/readToEarn.py @@ -7,93 +7,110 @@ from src.browser import Browser from .activities import Activities -from .utils import Utils +from .utils import makeRequestsSession + +# todo Use constant naming style +client_id = "0000000040170455" +authorization_base_url = "https://login.live.com/oauth20_authorize.srf" +token_url = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token" +redirect_uri = " https://login.live.com/oauth20_desktop.srf" +scope = ["service::prod.rewardsplatform.microsoft.com::MBI_SSL"] -client_id = '0000000040170455' -authorization_base_url = 'https://login.live.com/oauth20_authorize.srf' -token_url = 'https://login.microsoftonline.com/consumers/oauth2/v2.0/token' -redirect_uri = ' https://login.live.com/oauth20_desktop.srf' -scope = [ "service::prod.rewardsplatform.microsoft.com::MBI_SSL"] class ReadToEarn: def __init__(self, browser: Browser): self.browser = browser self.webdriver = browser.webdriver self.activities = Activities(browser) - + def completeReadToEarn(self): - + logging.info("[READ TO EARN] " + "Trying to complete Read to Earn...") - accountName = self.browser.username - + accountName = self.browser.email + # Should Really Cache Token and load it in. # To Save token - #with open('token.pickle', 'wb') as f: + # with open('token.pickle', 'wb') as f: # pickle.dump(token, f) # To Load token - #with open('token.pickle', 'rb') as f: + # with open('token.pickle', 'rb') as f: # token = pickle.load(f) - #mobileApp = OAuth2Session(client_id, scope=scope, token=token) - + # mobileApp = OAuth2Session(client_id, scope=scope, token=token) + # Use Webdriver to get OAuth2 Token # This works, since you already logged into Bing, so no user interaction needed - mobileApp = Utils.makeRequestsSession(OAuth2Session(client_id, scope=scope, redirect_uri=redirect_uri)) - authorization_url, state = mobileApp.authorization_url(authorization_base_url, access_type="offline_access", login_hint=accountName) - + mobileApp = makeRequestsSession( + OAuth2Session(client_id, scope=scope, redirect_uri=redirect_uri) + ) + authorization_url, state = mobileApp.authorization_url( + authorization_base_url, access_type="offline_access", login_hint=accountName + ) + # Get Referer URL from webdriver self.webdriver.get(authorization_url) while True: logging.info("[READ TO EARN] Waiting for Login") - if self.webdriver.current_url[:48] == "https://login.live.com/oauth20_desktop.srf?code=": + if ( + self.webdriver.current_url[:48] + == "https://login.live.com/oauth20_desktop.srf?code=" + ): redirect_response = self.webdriver.current_url break time.sleep(1) - + logging.info("[READ TO EARN] Logged-in successfully !") # Use returned URL to create a token - token = mobileApp.fetch_token(token_url, authorization_response=redirect_response,include_client_id=True) - + token = mobileApp.fetch_token( + token_url, authorization_response=redirect_response, include_client_id=True + ) + # Do Daily Check in json_data = { - 'amount': 1, - 'country': self.browser.localeGeo.lower(), - 'id': 1, - 'type': 101, - 'attributes': { - 'offerid': 'Gamification_Sapphire_DailyCheckIn', + "amount": 1, + "country": self.browser.localeGeo.lower(), + "id": 1, + "type": 101, + "attributes": { + "offerid": "Gamification_Sapphire_DailyCheckIn", }, } - json_data['id'] = secrets.token_hex(64) + json_data["id"] = secrets.token_hex(64) logging.info("[READ TO EARN] Daily App Check In") - r = mobileApp.post("https://prod.rewardsplatform.microsoft.com/dapi/me/activities",json=json_data) + r = mobileApp.post( + "https://prod.rewardsplatform.microsoft.com/dapi/me/activities", + json=json_data, + ) balance = r.json().get("response").get("balance") time.sleep(random.randint(10, 20)) # json data to confirm an article is read json_data = { - 'amount': 1, - 'country': self.browser.localeGeo.lower(), - 'id': 1, - 'type': 101, - 'attributes': { - 'offerid': 'ENUS_readarticle3_30points', - }, - } + "amount": 1, + "country": self.browser.localeGeo.lower(), + "id": 1, + "type": 101, + "attributes": { + "offerid": "ENUS_readarticle3_30points", + }, + } # 10 is the most articles you can read. Sleep time is a guess, not tuned for i in range(10): # Replace ID with a random value so get credit for a new article - json_data['id'] = secrets.token_hex(64) - r = mobileApp.post("https://prod.rewardsplatform.microsoft.com/dapi/me/activities",json=json_data) + json_data["id"] = secrets.token_hex(64) + r = mobileApp.post( + "https://prod.rewardsplatform.microsoft.com/dapi/me/activities", + json=json_data, + ) newbalance = r.json().get("response").get("balance") if newbalance == balance: logging.info("[READ TO EARN] Read All Available Articles !") break else: - logging.info("[READ TO EARN] Read Article " + str(i+1)) + logging.info("[READ TO EARN] Read Article " + str(i + 1)) balance = newbalance time.sleep(random.randint(10, 20)) - - logging.info("[READ TO EARN] Completed the Read to Earn successfully !") + + logging.info("[READ TO EARN] Completed the Read to Earn successfully !") diff --git a/src/searches.py b/src/searches.py index d97e1c6a..d583ae93 100644 --- a/src/searches.py +++ b/src/searches.py @@ -1,24 +1,19 @@ -import contextlib import dbm.dumb import json import logging -import random import shelve -import time from datetime import date, timedelta from enum import Enum, auto from itertools import cycle +from random import random, randint, shuffle +from time import sleep from typing import Final import requests -from selenium.common import TimeoutException from selenium.webdriver.common.by import By -from selenium.webdriver.remote.webelement import WebElement -from selenium.webdriver.support import expected_conditions -from selenium.webdriver.support.wait import WebDriverWait from src.browser import Browser -from src.utils import Utils +from src.utils import CONFIG, makeRequestsSession, getProjectRoot class RetriesStrategy(Enum): @@ -37,27 +32,22 @@ class RetriesStrategy(Enum): class Searches: - config = Utils.loadConfig() - maxRetries: Final[int] = config.get("retries", {}).get("max", 8) + maxRetries: Final[int] = CONFIG.retries.max """ the max amount of retries to attempt """ - baseDelay: Final[float] = config.get("retries", {}).get( - "base_delay_in_seconds", 14.0625 - ) + baseDelay: Final[float] = CONFIG.get("retries.base_delay_in_seconds") """ how many seconds to delay """ # retriesStrategy = Final[ # todo Figure why doesn't work with equality below - retriesStrategy = RetriesStrategy[ - config.get("retries", {}).get("strategy", RetriesStrategy.CONSTANT.name) - ] + retriesStrategy = RetriesStrategy[CONFIG.retries.strategy] def __init__(self, browser: Browser): self.browser = browser self.webdriver = browser.webdriver - dumbDbm = dbm.dumb.open((Utils.getProjectRoot() / "google_trends").__str__()) + dumbDbm = dbm.dumb.open((getProjectRoot() / "google_trends").__str__()) self.googleTrendsShelf: shelve.Shelf = shelve.Shelf(dumbDbm) def __enter__(self): @@ -70,7 +60,7 @@ def getGoogleTrends(self, wordsCount: int) -> list[str]: # Function to retrieve Google Trends search terms searchTerms: list[str] = [] i = 0 - session = Utils.makeRequestsSession() + session = makeRequestsSession() while len(searchTerms) < wordsCount: i += 1 # Fetching daily trends from Google Trends API @@ -80,7 +70,7 @@ def getGoogleTrends(self, wordsCount: int) -> list[str]: ) assert ( r.status_code == requests.codes.ok - ) # todo Add guidance if assertion fails + ), "Adjust retry config in src.utils.Utils.makeRequestsSession" trends = json.loads(r.text[6:]) for topic in trends["default"]["trendingSearchesDays"][0][ "trendingSearches" @@ -97,7 +87,7 @@ def getGoogleTrends(self, wordsCount: int) -> list[str]: def getRelatedTerms(self, term: str) -> list[str]: # Function to retrieve related terms from Bing API relatedTerms: list[str] = ( - Utils.makeRequestsSession() + makeRequestsSession() .get( f"https://api.bing.com/osjson.aspx?query={term}", headers={"User-agent": self.browser.userAgent}, @@ -116,25 +106,36 @@ def bingSearches(self) -> None: self.browser.utils.goToSearch() - while (remainingSearches := self.browser.getRemainingSearches()) > 0: - logging.info(f"[BING] Remaining searches={remainingSearches}") + while True: desktopAndMobileRemaining = self.browser.getRemainingSearches( desktopAndMobile=True ) + logging.info(f"[BING] Remaining searches={desktopAndMobileRemaining}") + if ( + self.browser.browserType == "desktop" + and desktopAndMobileRemaining.desktop == 0 + ) or ( + self.browser.browserType == "mobile" + and desktopAndMobileRemaining.mobile == 0 + ): + break + if desktopAndMobileRemaining.getTotal() > len(self.googleTrendsShelf): # self.googleTrendsShelf.clear() # Maybe needed? logging.debug( f"google_trends before load = {list(self.googleTrendsShelf.items())}" ) trends = self.getGoogleTrends(desktopAndMobileRemaining.getTotal()) - random.shuffle(trends) + shuffle(trends) for trend in trends: self.googleTrendsShelf[trend] = None logging.debug( f"google_trends after load = {list(self.googleTrendsShelf.items())}" ) + self.bingSearch() - time.sleep(random.randint(10, 15)) + del self.googleTrendsShelf[list(self.googleTrendsShelf.keys())[0]] + sleep(randint(10, 15)) logging.info( f"[BING] Finished {self.browser.browserType.capitalize()} Edge Bing searches !" @@ -151,6 +152,7 @@ def bingSearch(self) -> None: baseDelay = Searches.baseDelay logging.debug(f"rootTerm={rootTerm}") + # todo If first 3 searches of day, don't retry since points register differently, will be a bit quicker for i in range(self.maxRetries + 1): if i != 0: sleepTime: float @@ -160,39 +162,27 @@ def bingSearch(self) -> None: sleepTime = baseDelay else: raise AssertionError + sleepTime += baseDelay * random() # Add jitter logging.debug( f"[BING] Search attempt not counted {i}/{Searches.maxRetries}, sleeping {sleepTime}" f" seconds..." ) - time.sleep(sleepTime) + sleep(sleepTime) - searchbar: WebElement - for _ in range(1000): - searchbar = self.browser.utils.waitUntilClickable( - By.ID, "sb_form_q", timeToWait=40 - ) - searchbar.clear() - term = next(termsCycle) - logging.debug(f"term={term}") - time.sleep(1) - searchbar.send_keys(term) - time.sleep(1) - with contextlib.suppress(TimeoutException): - WebDriverWait(self.webdriver, 20).until( - expected_conditions.text_to_be_present_in_element_value( - (By.ID, "sb_form_q"), term - ) - ) - break - logging.debug("error send_keys") - else: - # todo Still happens occasionally, gotta be a fix - raise TimeoutException + searchbar = self.browser.utils.waitUntilClickable( + By.ID, "sb_form_q", timeToWait=40 + ) + searchbar.clear() + term = next(termsCycle) + logging.debug(f"term={term}") + sleep(1) + searchbar.send_keys(term) + sleep(1) searchbar.submit() pointsAfter = self.browser.utils.getAccountPoints() if pointsBefore < pointsAfter: - del self.googleTrendsShelf[rootTerm] + sleep(randint(CONFIG.cooldown.min, CONFIG.cooldown.max)) return # todo @@ -200,7 +190,3 @@ def bingSearch(self) -> None: # logging.info("[BING] " + "TIMED OUT GETTING NEW PROXY") # self.webdriver.proxy = self.browser.giveMeProxy() logging.error("[BING] Reached max search attempt retries") - - logging.debug("Moving passedInTerm to end of list") - del self.googleTrendsShelf[rootTerm] - self.googleTrendsShelf[rootTerm] = None diff --git a/src/userAgentGenerator.py b/src/userAgentGenerator.py index 6cbbaee9..31895a0f 100644 --- a/src/userAgentGenerator.py +++ b/src/userAgentGenerator.py @@ -4,7 +4,7 @@ import requests from requests import HTTPError, Response -from src.utils import Utils +from src.utils import makeRequestsSession class GenerateUserAgent: @@ -139,29 +139,45 @@ def getEdgeVersions(self) -> tuple[str, str]: response = self.getWebdriverPage( "https://edgeupdates.microsoft.com/api/products" ) + + def getValueIgnoreCase(data: dict, key: str) -> Any: + """Get the value from a dictionary ignoring the case of the first letter of the key.""" + for k, v in data.items(): + if k.lower() == key.lower(): + return v + return None + data = response.json() if stableProduct := next( - (product for product in data if product["Product"] == "Stable"), + ( + product + for product in data + if getValueIgnoreCase(product, "product") == "Stable" + ), None, ): - releases = stableProduct["Releases"] + releases = getValueIgnoreCase(stableProduct, "releases") androidRelease = next( - (release for release in releases if release["Platform"] == "Android"), + ( + release + for release in releases + if getValueIgnoreCase(release, "platform") == "Android" + ), None, ) windowsRelease = next( ( release for release in releases - if release["Platform"] == "Windows" - and release["Architecture"] == "x64" + if getValueIgnoreCase(release, "platform") == "Windows" + and getValueIgnoreCase(release, "architecture") == "x64" ), None, ) if androidRelease and windowsRelease: return ( - windowsRelease["ProductVersion"], - androidRelease["ProductVersion"], + getValueIgnoreCase(windowsRelease, "productVersion"), + getValueIgnoreCase(androidRelease, "productVersion"), ) raise HTTPError("Failed to get Edge versions.") @@ -180,7 +196,7 @@ def getChromeVersion(self) -> str: @staticmethod def getWebdriverPage(url: str) -> Response: - response = Utils.makeRequestsSession().get(url) + response = makeRequestsSession().get(url) if response.status_code != requests.codes.ok: # pylint: disable=no-member raise HTTPError( f"Failed to get webdriver page {url}. " diff --git a/src/utils.py b/src/utils.py index 5b2fc555..b0f5cab8 100644 --- a/src/utils.py +++ b/src/utils.py @@ -4,17 +4,24 @@ import logging import re import time -from argparse import Namespace +from argparse import Namespace, ArgumentParser +from datetime import date from pathlib import Path -from typing import Any +import random +from typing import Any, Self +from copy import deepcopy import requests import yaml from apprise import Apprise from requests import Session from requests.adapters import HTTPAdapter -from selenium.common import NoSuchElementException, TimeoutException, ElementClickInterceptedException, \ - ElementNotInteractableException +from selenium.common import ( + ElementClickInterceptedException, + ElementNotInteractableException, + NoSuchElementException, + TimeoutException, +) from selenium.webdriver.chrome.webdriver import WebDriver from selenium.webdriver.common.by import By from selenium.webdriver.remote.webelement import WebElement @@ -22,12 +29,204 @@ from selenium.webdriver.support.wait import WebDriverWait from urllib3 import Retry -from .constants import REWARDS_URL -from .constants import SEARCH_URL +from .constants import REWARDS_URL, SEARCH_URL + +class Config(dict): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + for key, value in self.items(): + if isinstance(value, dict): + self[key] = self.__class__(value) + if isinstance(value, list): + for i, v in enumerate(value): + if isinstance(v, dict): + value[i] = self.__class__(v) + + def __or__(self, other): + new = deepcopy(self) + for key in other: + if key in new: + if isinstance(new[key], dict) and isinstance(other[key], dict): + new[key] = new[key] | other[key] + continue + if isinstance(other[key], dict): + new[key] = self.__class__(other[key]) + continue + if isinstance(other[key], list): + new[key] = self.configifyList(other[key]) + continue + new[key] = other[key] + return new + + + def __getattribute__(self, item): + if item in self: + return self[item] + return super().__getattribute__(item) + + def __setattr__(self, key, value): + if type(value) is dict: + value = self.__class__(value) + if type(value) is list: + value = self.configifyList(value) + self[key] = value + + + def __getitem__(self, item): + if type(item) is not str or not '.' in item: + return super().__getitem__(item) + item: str + items = item.split(".") + found = super().__getitem__(items[0]) + for item in items[1:]: + found = found.__getitem__(item) + return found + + def __setitem__(self, key, value): + if type(value) is dict: + value = self.__class__(value) + if type(value) is list: + value = self.configifyList(value) + if type(key) is not str or not '.' in key: + return super().__setitem__(key, value) + item: str + items = key.split(".") + found = super().__getitem__(items[0]) + for item in items[1:-1]: + found = found.__getitem__(item) + found.__setitem__(items[-1], value) + + @classmethod + def fromYaml(cls, path: Path) -> Self: + if not path.exists() or not path.is_file(): + return cls() + with open(path, encoding="utf-8") as f: + yamlContents = yaml.safe_load(f) + if not yamlContents: + return cls() + return cls(yamlContents) + + + @classmethod + def configifyList(cls, listToConvert: list) -> list: + new = [None] * len(listToConvert) + for index, item in enumerate(listToConvert): + if isinstance(item, dict): + new[index] = cls(item) + continue + if isinstance(item, list): + new[index] = cls.configifyList(item) + continue + new[index] = item + return new + + @classmethod + def dictifyList(cls, listToConvert: list) -> list: + new = [None] * len(listToConvert) + for index, item in enumerate(listToConvert): + if isinstance(item, cls): + new[index] = item.toDict() + continue + if isinstance(item, list): + new[index] = cls.dictifyList(item) + continue + new[index] = item + return new + + + def get(self, key, default=None): + if type(key) is not str or not '.' in key: + return super().get(key, default) + item: str + keys = key.split(".") + found = super().get(keys[0], default) + for key in keys[1:]: + found = found.get(key, default) + return found + + def toDict(self) -> dict: + new = {} + for key, value in self.items(): + if isinstance(value, self.__class__): + new[key] = value.toDict() + continue + if isinstance(value, list): + new[key] = self.dictifyList(value) + continue + new[key] = value + return new + + +DEFAULT_CONFIG: Config = Config( + { + 'apprise': { + 'enabled': True, + 'notify': { + 'incomplete-activity': True, + 'uncaught-exception': True, + 'login-code': True + }, + 'summary': 'ON_ERROR', + 'urls': [] + }, + 'browser': { + 'geolocation': 'US', + 'language': 'en', + 'visible': False, + 'proxy': None + }, + 'activities': { + 'ignore': [ + 'Get 50 entries plus 1000 points!', + "Safeguard your family's info" + ], + 'search': { + 'Black Friday shopping': 'black friday deals', + 'Discover open job roles': 'jobs at microsoft', + 'Expand your vocabulary': 'define demure', + 'Find places to stay': 'hotels rome italy', + 'Find somewhere new to explore': 'directions to new york', + 'Gaming time': 'vampire survivors video game', + 'Get your shopping done faster': 'new iphone', + 'Houses near you': 'apartments manhattan', + "How's the economy?": 'sp 500', + 'Learn to cook a new recipe': 'how cook pierogi', + "Let's watch that movie again!": 'aliens movie', + 'Plan a quick getaway': 'flights nyc to paris', + 'Prepare for the weather': 'weather tomorrow', + 'Quickly convert your money': 'convert 374 usd to yen', + 'Search the lyrics of a song': 'black sabbath supernaut lyrics', + 'Stay on top of the elections': 'election news latest', + 'Too tired to cook tonight?': 'Pizza Hut near me', + 'Translate anything': 'translate pencil sharpener to spanish', + 'What time is it?': 'china time', + "What's for Thanksgiving dinner?": 'pumpkin pie recipe', + 'Who won?': 'braves score', + 'You can track your package': 'usps tracking' + } + }, + 'logging': { + 'format': '%(asctime)s [%(levelname)s] %(message)s', + 'level': 'INFO' + }, + 'retries': { + 'base_delay_in_seconds': 120, + 'max': 4, + 'strategy': 'EXPONENTIAL' + }, + 'cooldown': { + 'min': 300, + 'max': 600 + }, + 'search': { + 'type': 'both' + }, + 'accounts': [] + } +) class Utils: - args: Namespace def __init__(self, webdriver: WebDriver): self.webdriver = webdriver @@ -35,38 +234,7 @@ def __init__(self, webdriver: WebDriver): locale = pylocale.getdefaultlocale()[0] pylocale.setlocale(pylocale.LC_NUMERIC, locale) - self.config = self.loadConfig() - - @staticmethod - def getProjectRoot() -> Path: - return Path(__file__).parent.parent - - @staticmethod - def loadConfig(configFilename="config.yaml") -> dict: - configFile = Utils.getProjectRoot() / configFilename - try: - with open(configFile, "r") as file: - config = yaml.safe_load(file) - if not config: - logging.info(f"{file} doesn't exist") - return {} - return config - except OSError: - logging.warning(f"{configFilename} doesn't exist") - return {} - - @staticmethod - def sendNotification(title, body) -> None: - if Utils.args.disable_apprise: - return - apprise = Apprise() - urls: list[str] = Utils.loadConfig("config-private.yaml").get("apprise", {}).get("urls", []) - if not urls: - logging.debug("No urls found, not sending notification") - return - for url in urls: - apprise.add(url) - assert apprise.notify(title=str(title), body=str(body)) + # self.config = self.loadConfig() def waitUntilVisible( self, by: str, selector: str, timeToWait: float = 10 @@ -119,12 +287,7 @@ def goToSearch(self) -> None: # self.webdriver.current_url == SEARCH_URL # ), f"{self.webdriver.current_url} {SEARCH_URL}" # need regex: AssertionError: https://www.bing.com/?toWww=1&redig=A5B72363182B49DEBB7465AD7520FDAA https://bing.com/ - @staticmethod - def getAnswerCode(key: str, string: str) -> str: - t = sum(ord(string[i]) for i in range(len(string))) - t += int(key[-2:], 16) - return str(t) - + # Prefer getBingInfo if possible def getDashboardData(self) -> dict: urlBefore = self.webdriver.current_url try: @@ -136,8 +299,17 @@ def getDashboardData(self) -> dict: except TimeoutException: self.goToRewards() + def getDailySetPromotions(self) -> list[dict]: + return self.getDashboardData()["dailySetPromotions"][ + date.today().strftime("%m/%d/%Y") + ] + + def getMorePromotions(self) -> list[dict]: + return self.getDashboardData()["morePromotions"] + + # Not reliable def getBingInfo(self) -> Any: - session = self.makeRequestsSession() + session = makeRequestsSession() for cookie in self.webdriver.get_cookies(): session.cookies.set(cookie["name"], cookie["value"]) @@ -145,23 +317,13 @@ def getBingInfo(self) -> Any: response = session.get("https://www.bing.com/rewards/panelflyout/getuserinfo") assert response.status_code == requests.codes.ok - return response.json()["userInfo"] - - @staticmethod - def makeRequestsSession(session: Session = requests.session()) -> Session: - retry = Retry( - total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504] - ) - session.mount( - "https://", HTTPAdapter(max_retries=retry) - ) # See https://stackoverflow.com/a/35504626/4164390 to finetune - session.mount( - "http://", HTTPAdapter(max_retries=retry) - ) # See https://stackoverflow.com/a/35504626/4164390 to finetune - return session + # fixme Add more asserts + # todo Add fallback to src.utils.Utils.getDashboardData (slower but more reliable) + return response.json() def isLoggedIn(self) -> bool: - # return self.getBingInfo()["isRewardsUser"] # todo For some reason doesn't work, but doesn't involve changing url so preferred + if self.getBingInfo()["isRewardsUser"]: # faster, if it works + return True self.webdriver.get( "https://rewards.bing.com/Signin/" ) # changed site to allow bypassing when M$ blocks access to login.live.com randomly @@ -173,7 +335,7 @@ def isLoggedIn(self) -> bool: return False def getAccountPoints(self) -> int: - return self.getBingInfo()["balance"] + return self.getDashboardData()["userStatus"]["availablePoints"] def getGoalPoints(self) -> int: return self.getDashboardData()["userStatus"]["redeemGoal"]["price"] @@ -182,7 +344,7 @@ def getGoalTitle(self) -> str: return self.getDashboardData()["userStatus"]["redeemGoal"]["title"] def tryDismissAllMessages(self) -> None: - buttons = [ + byValues = [ (By.ID, "iLandingViewAction"), (By.ID, "iShowSkip"), (By.ID, "iNext"), @@ -190,30 +352,26 @@ def tryDismissAllMessages(self) -> None: (By.ID, "idSIButton9"), (By.ID, "bnp_btn_accept"), (By.ID, "acceptButton"), + (By.CSS_SELECTOR, ".dashboardPopUpPopUpSelectButton"), ] - for button in buttons: - try: - elements = self.webdriver.find_elements(by=button[0], value=button[1]) - except (NoSuchElementException, ElementNotInteractableException): # Expected? - logging.debug("", exc_info=True) - continue - for element in elements: - element.click() - self.tryDismissCookieBanner() - self.tryDismissBingCookieBanner() - - def tryDismissCookieBanner(self) -> None: - with contextlib.suppress(NoSuchElementException, ElementNotInteractableException): # Expected + for byValue in byValues: + dismissButtons = [] + with contextlib.suppress(NoSuchElementException): + dismissButtons = self.webdriver.find_elements( + by=byValue[0], value=byValue[1] + ) + for dismissButton in dismissButtons: + dismissButton.click() + with contextlib.suppress(NoSuchElementException): self.webdriver.find_element(By.ID, "cookie-banner").find_element( By.TAG_NAME, "button" ).click() - def tryDismissBingCookieBanner(self) -> None: - with contextlib.suppress(NoSuchElementException, ElementNotInteractableException): # Expected - self.webdriver.find_element(By.ID, "bnp_btn_accept").click() - - def switchToNewTab(self, timeToWait: float = 0) -> None: + def switchToNewTab(self, timeToWait: float = 15, closeTab: bool = False) -> None: + time.sleep(timeToWait) self.webdriver.switch_to.window(window_name=self.webdriver.window_handles[1]) + if closeTab: + self.closeCurrentTab() def closeCurrentTab(self) -> None: self.webdriver.close() @@ -221,33 +379,298 @@ def closeCurrentTab(self) -> None: self.webdriver.switch_to.window(window_name=self.webdriver.window_handles[0]) time.sleep(0.5) - def visitNewTab(self, timeToWait: float = 0) -> None: - self.switchToNewTab(timeToWait) - self.closeCurrentTab() - - @staticmethod - def formatNumber(number, num_decimals=2) -> str: - return pylocale.format_string( - f"%10.{num_decimals}f", number, grouping=True - ).strip() - - @staticmethod - def getBrowserConfig(sessionPath: Path) -> dict | None: - configFile = sessionPath / "config.json" - if not configFile.exists(): - return - with open(configFile, "r") as f: - return json.load(f) - - @staticmethod - def saveBrowserConfig(sessionPath: Path, config: dict) -> None: - configFile = sessionPath / "config.json" - with open(configFile, "w") as f: - json.dump(config, f) - def click(self, element: WebElement) -> None: try: + WebDriverWait(self.webdriver, 10).until( + expected_conditions.element_to_be_clickable(element) + ) element.click() except (ElementClickInterceptedException, ElementNotInteractableException): self.tryDismissAllMessages() + WebDriverWait(self.webdriver, 10).until( + expected_conditions.element_to_be_clickable(element) + ) element.click() + + +def argumentParser() -> Namespace: + parser = ArgumentParser( + description="A simple bot that uses Selenium to farm M$ Rewards in Python", + epilog="At least one account should be specified, either using command line arguments or a configuration file." + "\nAll specified arguments will override the configuration file values." + ) + parser.add_argument( + "-c", + "--config", + type=str, + default=None, + help="Specify the configuration file path", + ) + parser.add_argument( + "-C", + "--create-config", + action="store_true", + help="Create a fillable configuration file with basic settings and given ones if none exists", + ) + parser.add_argument( + "-v", + "--visible", + action="store_true", + help="Visible browser (Disable headless mode)", + ) + parser.add_argument( + "-l", + "--lang", + type=str, + default=None, + help="Language (ex: en)" + "\nsee https://serpapi.com/google-languages for options" + ) + parser.add_argument( + "-g", + "--geo", + type=str, + default=None, + help="Searching geolocation (ex: US)" + "\nsee https://serpapi.com/google-trends-locations for options (should be uppercase)" + ) + parser.add_argument( + "-em", + "--email", + type=str, + default=None, + help="Email address of the account to run. Only used if a password is given.", + ) + parser.add_argument( + "-pw", + "--password", + type=str, + default=None, + help="Password of the account to run. Only used if an email is given.", + ) + parser.add_argument( + "-p", + "--proxy", + type=str, + default=None, + help="Global Proxy, supports http/https/socks4/socks5 (overrides config per-account proxies)" + "\n`(ex: http://user:pass@host:port)`", + ) + parser.add_argument( + "-t", + "--searchtype", + choices=['desktop', 'mobile', 'both'], + default=None, + help="Set to search in either desktop, mobile or both (default: both)", + ) + parser.add_argument( + "-da", + "--disable-apprise", + action="store_true", + help="Disable Apprise notifications, useful when developing", + ) + parser.add_argument( + "-d", + "--debug", + action="store_true", + help="Set the logging level to DEBUG", + ) + return parser.parse_args() + + +def getProjectRoot() -> Path: + return Path(__file__).parent.parent + + +def commandLineArgumentsAsConfig(args: Namespace) -> Config: + config = Config() + if args.visible: + config.browser = Config() + config.browser.visible = True + if args.lang: + if not 'browser' in config: + config.browser = Config() + config.browser.language = args.lang + if args.geo: + if not 'browser' in config: + config.browser = Config() + config.browser.geolocation = args.geo + if args.proxy: + if not 'browser' in config: + config.browser = Config() + config.browser.proxy = args.proxy + if args.disable_apprise: + config.apprise = Config() + config.apprise.enabled = False + if args.debug: + config.logging = Config() + config.logging.level = 'DEBUG' + if args.searchtype: + config.search = Config() + config.search.type = args.searchtype + if args.email and args.password: + config.accounts = [Config( + email=args.email, + password=args.password, + )] + + return config + + +def setupAccounts(config: Config) -> Config: + def validEmail(email: str) -> bool: + """Validate Email.""" + pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$" + return bool(re.match(pattern, email)) + + loadedAccounts = [] + for account in config.accounts: + if ( + not 'email' in account + or not isinstance(account.email, str) + or not validEmail(account.email) + ): + logging.warning( + f"[CREDENTIALS] Invalid email '{account.get('email', 'No email provided')}'," + f" skipping this account" + ) + continue + if not 'password' in account or not isinstance(account['password'], str): + logging.warning( + f"[CREDENTIALS] Invalid password '{account.get('password', 'No password provided')}'," + f" skipping this account" + ) + loadedAccounts.append(account) + + if not loadedAccounts: + noAccountsNotice = """ + [ACCOUNT] No valid account provided. + [ACCOUNT] Please provide a valid account, either using command line arguments or a configuration file. + [ACCOUNT] For command line, please use the following arguments (change the email and password): + [ACCOUNT] `--email youremail@domain.com --password yourpassword` + [ACCOUNT] For configuration file, please generate a configuration file using the `-C` argument, + [ACCOUNT] then edit the generated file by replacing the email and password using yours. + """ + logging.error(noAccountsNotice) + exit(1) + + random.shuffle(loadedAccounts) + config.accounts = loadedAccounts + return config + +def createEmptyConfig(configPath: Path, config: Config) -> None: + if configPath.is_file(): + logging.error( + f"[CONFIG] A file already exists at '{configPath}'" + ) + exit(1) + + emptyConfig = Config( + { + 'apprise': { + 'urls': ['discord://{WebhookID}/{WebhookToken}'] + }, + 'accounts': [ + { + 'email': 'Your Email 1', + 'password': 'Your Password 1', + 'totp': '0123 4567 89ab cdef', + 'proxy': 'http://user:pass@host1:port' + }, + { + 'email': 'Your Email 2', + 'password': 'Your Password 2', + 'totp': '0123 4567 89ab cdef', + 'proxy': 'http://user:pass@host2:port' + } + ] + } + ) + with open(configPath, "w", encoding="utf-8") as configFile: + yaml.dump((emptyConfig | config).toDict(), configFile) + logging.info( + f"[CONFIG] A configuration file was created at '{configPath}'" + ) + exit(0) + + +def loadConfig( + configFilename="config.yaml", defaultConfig=DEFAULT_CONFIG +) -> Config: + args = argumentParser() + if args.config: + configFile = Path(args.config) + else: + configFile = getProjectRoot() / configFilename + + args_config = commandLineArgumentsAsConfig(args) + + if args.create_config: + createEmptyConfig(configFile, args_config) + + config = defaultConfig | Config.fromYaml(configFile) | args_config + config = setupAccounts(config) + + return config + + +def sendNotification(title: str, body: str, e: Exception = None) -> None: + if not CONFIG.apprise.enabled or ( + e and not CONFIG.get("apprise.notify.uncaught-exception") + ): + return + apprise = Apprise() + urls: list[str] = CONFIG.apprise.urls + if not urls: + logging.debug("No urls found, not sending notification") + return + for url in urls: + apprise.add(url) + assert apprise.notify(title=str(title), body=str(body)) + + +def getAnswerCode(key: str, string: str) -> str: + t = sum(ord(string[i]) for i in range(len(string))) + t += int(key[-2:], 16) + return str(t) + + +def formatNumber(number, num_decimals=2) -> str: + return pylocale.format_string(f"%10.{num_decimals}f", number, grouping=True).strip() + + +def getBrowserConfig(sessionPath: Path) -> dict | None: + configFile = sessionPath / "config.json" + if not configFile.exists(): + return + with open(configFile, "r") as f: + return json.load(f) + + +def saveBrowserConfig(sessionPath: Path, config: dict) -> None: + configFile = sessionPath / "config.json" + with open(configFile, "w") as f: + json.dump(config, f) + + +def makeRequestsSession(session: Session = requests.session()) -> Session: + retry = Retry( + total=CONFIG.retries.max, + backoff_factor=1, + status_forcelist=[ + 500, + 502, + 503, + 504, + ], + ) + session.mount( + "https://", HTTPAdapter(max_retries=retry) + ) # See https://stackoverflow.com/a/35504626/4164390 to finetune + session.mount( + "http://", HTTPAdapter(max_retries=retry) + ) # See https://stackoverflow.com/a/35504626/4164390 to finetune + return session + + +CONFIG = loadConfig() diff --git a/test/test_main.py b/test/test_main.py index 5081dbd7..5df13c49 100644 --- a/test/test_main.py +++ b/test/test_main.py @@ -2,6 +2,8 @@ from unittest.mock import patch, MagicMock import main +from src import utils +from src.utils import Config, CONFIG class TestMain(unittest.TestCase): @@ -9,18 +11,16 @@ class TestMain(unittest.TestCase): # noinspection PyUnusedLocal @patch.object(main, "save_previous_points_data") @patch.object(main, "setupLogging") - @patch.object(main, "setupAccounts") @patch.object(main, "executeBot") - # @patch.object(Utils, "send_notification") + # @patch.object(utils, "sendNotification") def test_send_notification_when_exception( self, # mock_send_notification: MagicMock, mock_executeBot: MagicMock, - mock_setupAccounts: MagicMock, mock_setupLogging: MagicMock, mock_save_previous_points_data: MagicMock, ): - mock_setupAccounts.return_value = [{"password": "foo", "username": "bar"}] + CONFIG.accounts = [Config({"password": "foo", "email": "bar"})] mock_executeBot.side_effect = Exception main.main() diff --git a/test/test_utils.py b/test/test_utils.py index 780a76e8..a359a437 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -1,11 +1,9 @@ -from argparse import Namespace from unittest import TestCase -from src.utils import Utils +from src.utils import CONFIG, sendNotification class TestUtils(TestCase): def test_send_notification(self): - Utils.args = Namespace() - Utils.args.disable_apprise = False - Utils.sendNotification("title", "body") + CONFIG.apprise.enabled = True + sendNotification("title", "body")