
bug(sdk): increase task reporting timeout from 5s to 5m #1048

Merged

merged 1 commit into main on Nov 20, 2024

Conversation

@seriousben (Member) commented on Nov 20, 2024

Context

Fixes #1037

The httpx client library defaults its timeouts (connect, read, write, and pool) to 5s. Unfortunately, when uploading large files or many files, this is not sufficient.

What

This PR does 2 things:

  • Increase visibility into task reporting retries by improving the log messages and adding a retry counter.
  • Increase the default 5s read timeout to 5 minutes.
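The retry-counter idea can be illustrated with a small stdlib-only sketch (this is a hypothetical helper, not the SDK's actual task reporter):

```python
import logging
import time

logger = logging.getLogger("task_reporter")

def report_with_retries(send, max_retries=5, backoff_s=1.0):
    # Hypothetical helper: call send() until it succeeds, logging an
    # attempt counter so repeated failures are visible in the logs.
    for attempt in range(1, max_retries + 1):
        try:
            return send()
        except Exception as exc:
            logger.warning(
                "task reporting failed (attempt %d/%d): %s",
                attempt, max_retries, exc,
            )
            time.sleep(backoff_s * attempt)
    raise RuntimeError(f"task reporting failed after {max_retries} attempts")
```

Including the counter in each log line is what makes a stuck reporter distinguishable from a one-off transient failure.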

Note that while this should increase the success rate of outcome reporting for most graphs, graphs producing very many files or very large files may still fail to upload. An alternative solution will eventually be needed, for example writing directly to blob storage using a pre-signed URL, splitting uploads across multiple requests to the server, or doing chunked uploads.
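As a rough illustration of the chunked-upload alternative mentioned above (the helper and chunk size are hypothetical, not part of this PR):

```python
def split_into_chunks(payload: bytes, chunk_size: int = 8 * 1024 * 1024) -> list:
    # Split a large payload into fixed-size chunks so each upload
    # request stays small enough to finish within the read timeout.
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)]
```

Each chunk would then be sent in its own request, so no single request needs a multi-minute timeout.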

New logs:

(screenshot: task reporting retry logs showing the new retry counter)

Testing

Using S3 as blob storage, run this graph:

Code
from typing import List, Tuple

from indexify import Graph, RemoteGraph, indexify_function

from pydantic import BaseModel


# @indexify_function(encoder="json")
@indexify_function()
def start_node(text: str) -> List[Tuple[int, str]]:
    return [(index, char) for index, char in enumerate(text)]


@indexify_function()
def concat_char_idx(char_data: Tuple[int, str]) -> str:
    return f"{char_data[1]}={char_data[0]}"


class Acc(BaseModel):
    values: List[str] = list()


@indexify_function(accumulate=Acc)
def append_to_list(acc: Acc, char_data: str) -> Acc:
    # Place each character back at its original index, growing the
    # accumulator list as needed.
    char, index = char_data.split("=")
    if len(acc.values) <= int(index):
        acc.values.extend([""] * (int(index) - len(acc.values) + 1))
    acc.values[int(index)] = char
    return acc


@indexify_function()
def end_node(acc: Acc) -> str:
    return "".join(acc.values)


if __name__ == "__main__":
    graph = Graph(name="stress-test_graph", start_node=start_node)
    graph.add_edge(start_node, concat_char_idx)
    graph.add_edge(concat_char_idx, append_to_list)
    graph.add_edge(append_to_list, end_node)
    graph = RemoteGraph.deploy(graph)

    text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Morbi metus ipsum, tincidunt a ligula sed, feugiat luctus dui."
    # text = "hello world"
    invocation_id = graph.run(block_until_done=True, text=text)
    output = graph.output(invocation_id, "end_node")

    print(output)

Contribution Checklist

  • If the python-sdk was changed, please run make fmt in python-sdk/.
  • If the server was changed, please run make fmt in server/.
  • Make sure all PR Checks are passing.

@seriousben (Member, Author) commented:

Tracking the more involved improvement to this in #1049

@seriousben seriousben merged commit 1c4fb10 into main Nov 20, 2024
6 checks passed
@seriousben seriousben deleted the seriousben/increase-task-reporter-timeout branch November 20, 2024 20:13
Development

Successfully merging this pull request may close these issues:

  • error in response_body for s3 uploads using json encoder