Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Script to purge all unprocessed messages #623

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open

Script to purge all unprocessed messages #623

wants to merge 2 commits into from

Conversation

kongzii
Copy link
Contributor

@kongzii kongzii commented Jan 7, 2025

No description provided.

Copy link
Contributor

coderabbitai bot commented Jan 7, 2025

Walkthrough

The pull request introduces a new script purge_messages.py in the scripts directory. This script provides a command-line interface for purging unprocessed messages using the typer library. The script allows users to remove queued messages associated with a specific consumer address by providing a private key. It fetches the count of unprocessed transactions, prompts for user confirmation, and then systematically removes messages from the queue.

Changes

File Change Summary
scripts/purge_messages.py Added new CLI script for purging messages using typer library. Implements a main function that:
- Accepts a private key as input
- Checks unprocessed transaction count
- Prompts user for confirmation
- Removes messages from the queue

Sequence Diagram

sequenceDiagram
    participant User
    participant CLI as Purge Messages Script
    participant APIKeys
    participant Queue

    User->>CLI: Provide private key
    CLI->>APIKeys: Initialize with private key
    APIKeys-->>CLI: Consumer address
    CLI->>Queue: Get unprocessed transaction count
    Queue-->>CLI: Return count
    CLI->>User: Prompt confirmation (show count)
    User->>CLI: Confirm (y/n)
    alt Confirmed
        loop While unprocessed messages exist
            CLI->>Queue: Pop message
            Queue-->>CLI: Remove message
            CLI->>User: Print messages popped
        end
    else Cancelled
        CLI->>User: Exit without changes
    end
Loading

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (2)
scripts/purge_messages.py (2)

33-34: Enhance CLI documentation and logging.

The script would benefit from proper CLI documentation, version information, and logging configuration.

Apply this diff to improve the CLI:

+import logging
+
+app = typer.Typer(
+    help="Utility script to purge unprocessed messages from the queue.",
+    name="purge_messages",
+)
+
+@app.command()
+def purge(
+    private_key: str = typer.Option(
+        ...,
+        help="Private key of the agent's wallet",
+        prompt=True,
+        hide_input=True,
+    ),
+    verbose: bool = typer.Option(
+        False,
+        "--verbose",
+        "-v",
+        help="Enable verbose logging",
+    ),
+) -> None:
+    """
+    Purge all unprocessed messages for a specific agent.
+    
+    Requires the agent's wallet private key for authentication.
+    """
+    logging.basicConfig(
+        level=logging.DEBUG if verbose else logging.INFO,
+        format="%(asctime)s - %(levelname)s - %(message)s",
+    )
+    main(SecretStr(private_key))
+
 if __name__ == "__main__":
-    typer.run(main)
+    app()

1-34: Security Advisory: Handle with care!

This script handles sensitive data (private keys) and performs destructive operations. Please consider:

  1. Adding rate limiting to prevent abuse
  2. Implementing audit logging for tracking purge operations
  3. Adding a dry-run mode for safety
  4. Documenting security implications in the README

Would you like me to help implement these security enhancements?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f9b4a85 and 10c4cb8.

📒 Files selected for processing (1)
  • scripts/purge_messages.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: test-build-image
  • GitHub Check: pytest-docker
  • GitHub Check: pytest
  • GitHub Check: mypy

Comment on lines 1 to 10
import typer
from prediction_market_agent_tooling.gtypes import PrivateKey
from pydantic import SecretStr

from prediction_market_agent.db.agent_communication import (
fetch_count_unprocessed_transactions,
pop_message,
)
from prediction_market_agent.utils import APIKeys

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add missing imports and remove unused references.

The BET_FROM_PRIVATE_KEY is referenced in the code but not imported. Also, consider adding type hints import for better type safety.

Apply this diff to fix the imports:

 import typer
+from typing import Optional
 from prediction_market_agent_tooling.gtypes import PrivateKey
 from pydantic import SecretStr

 from prediction_market_agent.db.agent_communication import (
     fetch_count_unprocessed_transactions,
     pop_message,
 )
-from prediction_market_agent.utils import APIKeys
+from prediction_market_agent.utils import APIKeys, BET_FROM_PRIVATE_KEY
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import typer
from prediction_market_agent_tooling.gtypes import PrivateKey
from pydantic import SecretStr
from prediction_market_agent.db.agent_communication import (
fetch_count_unprocessed_transactions,
pop_message,
)
from prediction_market_agent.utils import APIKeys
import typer
from typing import Optional
from prediction_market_agent_tooling.gtypes import PrivateKey
from pydantic import SecretStr
from prediction_market_agent.db.agent_communication import (
fetch_count_unprocessed_transactions,
pop_message,
)
from prediction_market_agent.utils import APIKeys, BET_FROM_PRIVATE_KEY

Comment on lines 12 to 16
def main(private_key: SecretStr) -> None:
keys = APIKeys(BET_FROM_PRIVATE_KEY=PrivateKey(private_key))
n_messages = fetch_count_unprocessed_transactions(
consumer_address=keys.bet_from_address
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add input validation and error handling.

The function lacks input validation and error handling for invalid private keys. Also missing is a docstring explaining the function's purpose and parameters.

Apply this diff to improve the function:

 def main(private_key: SecretStr) -> None:
+    """Purge all unprocessed messages for a specific agent.
+
+    Args:
+        private_key (SecretStr): The private key of the agent's wallet.
+
+    Raises:
+        ValueError: If the private key format is invalid.
+        ConnectionError: If unable to connect to the message queue.
+    """
+    if not private_key.get_secret_value():
+        raise ValueError("Private key cannot be empty")
+
+    try:
         keys = APIKeys(BET_FROM_PRIVATE_KEY=PrivateKey(private_key))
         n_messages = fetch_count_unprocessed_transactions(
             consumer_address=keys.bet_from_address
         )
+    except Exception as e:
+        raise ValueError(f"Invalid private key or connection error: {str(e)}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def main(private_key: SecretStr) -> None:
keys = APIKeys(BET_FROM_PRIVATE_KEY=PrivateKey(private_key))
n_messages = fetch_count_unprocessed_transactions(
consumer_address=keys.bet_from_address
)
def main(private_key: SecretStr) -> None:
"""Purge all unprocessed messages for a specific agent.
Args:
private_key (SecretStr): The private key of the agent's wallet.
Raises:
ValueError: If the private key format is invalid.
ConnectionError: If unable to connect to the message queue.
"""
if not private_key.get_secret_value():
raise ValueError("Private key cannot be empty")
try:
keys = APIKeys(BET_FROM_PRIVATE_KEY=PrivateKey(private_key))
n_messages = fetch_count_unprocessed_transactions(
consumer_address=keys.bet_from_address
)
except Exception as e:
raise ValueError(f"Invalid private key or connection error: {str(e)}")

Comment on lines +18 to +30
if (
input(
f"Are you sure you want to purge all {n_messages} messages for agent {keys.bet_from_address}? (y/n): "
)
!= "y"
):
return

popped = 0
while fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address):
pop_message(api_keys=keys)
popped += 1
print(f"Popped {popped} messages.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve robustness of the purging logic.

The current implementation has several potential issues:

  1. No timeout mechanism for the while loop
  2. No error handling during message purging
  3. No batch processing for efficiency
  4. No progress indication for large message counts

Apply this diff to improve the implementation:

+    import time
+    from tqdm import tqdm
+
     if (
         input(
             f"Are you sure you want to purge all {n_messages} messages for agent {keys.bet_from_address}? (y/n): "
         )
         != "y"
     ):
         return

     popped = 0
-    while fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address):
-        pop_message(api_keys=keys)
-        popped += 1
-        print(f"Popped {popped} messages.")
+    start_time = time.time()
+    timeout = 300  # 5 minutes timeout
+    
+    with tqdm(total=n_messages, desc="Purging messages") as pbar:
+        while (
+            fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address)
+            and time.time() - start_time < timeout
+        ):
+            try:
+                pop_message(api_keys=keys)
+                popped += 1
+                pbar.update(1)
+            except Exception as e:
+                print(f"\nError while purging message {popped + 1}: {str(e)}")
+                if input("\nContinue purging? (y/n): ") != "y":
+                    break
+
+    if popped < n_messages:
+        print(f"\nPurge incomplete. {n_messages - popped} messages remaining.")

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
scripts/purge_messages.py (1)

32-33: Enhance CLI interface with help text and version info.

Add proper CLI documentation and version information for better usability.

 if __name__ == "__main__":
-    typer.run(main)
+    app = typer.Typer(
+        help="Purge all unprocessed messages for a specific agent.",
+        add_completion=False,
+    )
+
+    @app.command()
+    def purge(
+        private_key: str = typer.Option(
+            ...,
+            help="Private key of the agent's wallet",
+            prompt=True,
+            hide_input=True,
+        ),
+    ) -> None:
+        """Purge all unprocessed messages for the specified agent."""
+        main(SecretStr(private_key))
+
+    app(prog_name="purge_messages")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 10c4cb8 and b9b5e4e.

📒 Files selected for processing (1)
  • scripts/purge_messages.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: test-build-image
  • GitHub Check: pytest-docker
  • GitHub Check: pytest
  • GitHub Check: mypy
🔇 Additional comments (3)
scripts/purge_messages.py (3)

1-9: Add missing imports and module docstring.

The imports section needs the following improvements:

  1. Add SecretStr for secure private key handling
  2. Add type hints import
  3. Add module docstring explaining the script's purpose
+"""
+Script to purge all unprocessed messages for a specific agent.
+
+This script provides a CLI interface to safely remove queued messages
+associated with a specific consumer address using their private key.
+"""
 import typer
+from typing import Optional
 from prediction_market_agent_tooling.gtypes import private_key_type
+from pydantic import SecretStr

 from prediction_market_agent.db.agent_communication import (
     fetch_count_unprocessed_transactions,
     pop_message,
 )
 from prediction_market_agent.utils import APIKeys

25-29: Improve robustness of the purging logic.

The current implementation lacks timeout mechanism, error handling, and progress indication.

     popped = 0
-    while fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address):
-        pop_message(api_keys=keys)
-        popped += 1
-        print(f"Popped {popped} messages.")
+    import time
+    from tqdm import tqdm
+    
+    start_time = time.time()
+    timeout = 300  # 5 minutes timeout
+    
+    with tqdm(total=n_messages, desc="Purging messages") as pbar:
+        while (
+            fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address)
+            and time.time() - start_time < timeout
+        ):
+            try:
+                pop_message(api_keys=keys)
+                popped += 1
+                pbar.update(1)
+            except Exception as e:
+                print(f"\nError while purging message {popped + 1}: {str(e)}")
+                if input("\nContinue purging? (y/n): ") != "y":
+                    break
+
+    if time.time() - start_time >= timeout:
+        print("\nOperation timed out.")
+    
+    if popped < n_messages:
+        remaining = n_messages - popped
+        print(f"\nPurge incomplete. {remaining} messages remaining.")

11-12: ⚠️ Potential issue

Improve private key handling and type safety.

The private key should be handled more securely using SecretStr, and the function needs proper type annotations.

-def main(private_key: str) -> None:
-    keys = APIKeys(BET_FROM_PRIVATE_KEY=private_key_type(private_key))
+def main(private_key: SecretStr) -> None:
+    """Purge all unprocessed messages for a specific agent.
+
+    Args:
+        private_key (SecretStr): The private key of the agent's wallet.
+
+    Raises:
+        ValueError: If the private key format is invalid.
+        ConnectionError: If unable to connect to the message queue.
+    """
+    if not private_key.get_secret_value():
+        raise ValueError("Private key cannot be empty")
+
+    try:
+        keys = APIKeys(BET_FROM_PRIVATE_KEY=private_key_type(private_key.get_secret_value()))
+    except Exception as e:
+        raise ValueError(f"Invalid private key format: {str(e)}")

Likely invalid or redundant comment.

Comment on lines +13 to +15
n_messages = fetch_count_unprocessed_transactions(
consumer_address=keys.bet_from_address
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling for database operations.

The database operations should be wrapped in try-except blocks to handle potential connection issues.

+    try:
         n_messages = fetch_count_unprocessed_transactions(
             consumer_address=keys.bet_from_address
         )
+    except Exception as e:
+        raise ConnectionError(f"Failed to fetch unprocessed messages: {str(e)}")
+
+    if n_messages == 0:
+        print("No messages to purge.")
+        return
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
n_messages = fetch_count_unprocessed_transactions(
consumer_address=keys.bet_from_address
)
try:
n_messages = fetch_count_unprocessed_transactions(
consumer_address=keys.bet_from_address
)
except Exception as e:
raise ConnectionError(f"Failed to fetch unprocessed messages: {str(e)}")
if n_messages == 0:
print("No messages to purge.")
return

Comment on lines +1 to +33
import typer
from prediction_market_agent_tooling.gtypes import private_key_type

from prediction_market_agent.db.agent_communication import (
fetch_count_unprocessed_transactions,
pop_message,
)
from prediction_market_agent.utils import APIKeys


def main(private_key: str) -> None:
keys = APIKeys(BET_FROM_PRIVATE_KEY=private_key_type(private_key))
n_messages = fetch_count_unprocessed_transactions(
consumer_address=keys.bet_from_address
)

if (
input(
f"Are you sure you want to purge all {n_messages} messages for agent {keys.bet_from_address}? (y/n): "
)
!= "y"
):
return

popped = 0
while fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address):
pop_message(api_keys=keys)
popped += 1
print(f"Popped {popped} messages.")


if __name__ == "__main__":
typer.run(main)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Add security warning and restrict file permissions

The script requires security improvements:

  • Change file permissions to 600 (chmod 600 scripts/purge_messages.py) to restrict access since it handles private keys
  • Add a warning message at the start of the script:
def main(private_key: str) -> None:
    """
    WARNING: This script performs irreversible deletion of messages.
    Handle with care as it uses private keys and permanently removes data.
    """
🔗 Analysis chain

Add security considerations and file permissions.

  1. Add a warning about the destructive nature of this script
  2. Ensure the script has appropriate file permissions

Run this script to check and fix file permissions:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check and fix file permissions for the script

# Check current permissions
current_perms=$(stat -c "%a" scripts/purge_messages.py)
echo "Current permissions: $current_perms"

# Ensure the script is not world-readable/writable due to sensitive data handling
if [[ $current_perms =~ ^[0-7]{3}$ && $current_perms != "600" ]]; then
    echo "Warning: Script should have restricted permissions (600) due to handling sensitive data"
fi

Length of output: 229


Script:

#!/bin/bash
# Check file permissions using ls and search for warning messages
echo "File permissions:"
ls -l scripts/purge_messages.py

echo -e "\nChecking for existing warning messages:"
rg -i "warning|caution|danger" scripts/purge_messages.py

Length of output: 306

Comment on lines +26 to +27
while fetch_count_unprocessed_transactions(consumer_address=keys.bet_from_address):
pop_message(api_keys=keys)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have expected that fetch_count_unprocessed_transactions is called only once, yielding N unprocessed transactions. Afterwards, a for loop is created with exactly N, and those are then deleted.
Any reason for continuously sending RPC calls?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case it goes out of sync for some reason. But it's just script to be run "once in a decade", does that really matter?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants