-
Notifications
You must be signed in to change notification settings - Fork 79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: Video Inference #200
Changes from 6 commits
23cb10a
beb4735
00220cc
1cca73e
c91fee9
915eddb
e6b627e
f5f06ec
7e1a93d
4550254
96db4b3
dd9dcfc
8c22cdf
5187587
7af4637
27cbecc
5dd4ee9
aa250cc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,4 +16,5 @@ supervision | |
urllib3>=1.26.6 | ||
tqdm>=4.41.0 | ||
PyYAML>=5.3.1 | ||
requests_toolbelt | ||
requests_toolbelt | ||
python-magic |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,228 @@ | ||
import json | ||
import time | ||
from typing import List | ||
from urllib.parse import urljoin | ||
|
||
import magic | ||
import requests | ||
|
||
from roboflow.config import API_URL | ||
from roboflow.models.inference import InferenceModel | ||
|
||
SUPPORTED_ROBOFLOW_MODELS = ["object-detection", "classification", "instance-segmentation"] | ||
|
||
SUPPORTED_ADDITIONAL_MODELS = { | ||
"clip": { | ||
"model_id": "clip", | ||
"model_version": "1", | ||
"inference_type": "clip-embed-image" | ||
}, | ||
"gaze": { | ||
"model_id": "gaze", | ||
"model_version": "1", | ||
"inference_type": "gaze-detection" | ||
} | ||
} | ||
|
||
|
||
def is_valid_mime(filename): | ||
mime = magic.Magic(mime=True) | ||
file_type = mime.from_file(filename) | ||
return file_type in ["video/mp4", "video/avi", "video/webm"] | ||
|
||
|
||
def is_valid_video(filename): | ||
# check file type | ||
if not is_valid_mime(filename): | ||
return False | ||
|
||
return True | ||
|
||
class VideoInferenceModel(InferenceModel): | ||
""" | ||
Run inference on an object detection model hosted on Roboflow or served through Roboflow Inference. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
api_key, | ||
): | ||
""" | ||
Create a VideoDetectionModel object through which you can run inference on videos. | ||
|
||
Args: | ||
api_key (str): Your API key (obtained via your workspace API settings page). | ||
""" | ||
self.__api_key = api_key | ||
|
||
def predict( | ||
self, | ||
video_path: str, | ||
inference_type: str, | ||
fps: int = 5, | ||
additional_models: list = None, | ||
) -> List[str, str]: | ||
""" | ||
Infers detections based on image from specified model and image path. | ||
|
||
Args: | ||
video_path (str): path to the video you'd like to perform prediction on | ||
inference_type (str): type of the model to run | ||
fps (int): frames per second to run inference | ||
|
||
Returns: | ||
A list of the signed url and job id | ||
|
||
Example: | ||
>>> import roboflow | ||
|
||
>>> rf = roboflow.Roboflow(api_key="") | ||
|
||
>>> project = rf.workspace().project("PROJECT_ID") | ||
|
||
>>> model = project.version("1").model | ||
|
||
>>> prediction = model.predict("video.mp4", fps=5, inference_type="object-detection" | ||
""" | ||
|
||
url = urljoin(API_URL, "/video_upload_signed_url/?api_key=", self.__api_key) | ||
|
||
if fps > 5: | ||
raise Exception("FPS must be less than or equal to 5.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets set this to a higher value, up to the frame rate of the video itself. @PacificDou lets add logic in the video server to use fps = min(video_fps,s infer_fps); and set this actual value into the database object returned to the user when they query the videojob. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The user needs to know that the FPS can be upto the video frame rate, but given the fps-based pricing, set this value to as low as makes sense to their app. Maybe make it part of the blogpost? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be great if the server raised an exception if FPS is higher. Otherwise, if the user provides a video URL we have to download it in the package to check its FPS before we can do anything. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current behavior is that if the video frame-rate < infer_fs, the job fails and the user gets a helpful error message (this check happens when the video finally gets downloaded into the server and processing starts). Unfortunately probing a video to get its true video framerate is not possible at the time the request is made, its part of a batch operation. |
||
|
||
for model in additional_models: | ||
if model not in SUPPORTED_ADDITIONAL_MODELS: | ||
raise Exception(f"Model {model} is not supported for video inference.") | ||
|
||
if inference_type not in SUPPORTED_ROBOFLOW_MODELS: | ||
raise Exception(f"Model {inference_type} is not supported for video inference.") | ||
|
||
if not is_valid_video(video_path): | ||
raise Exception("Video path is not valid") | ||
|
||
payload = json.dumps( | ||
{ | ||
"file_name": video_path, | ||
} | ||
) | ||
|
||
headers = {"Content-Type": "application/json"} | ||
|
||
response = requests.request("POST", url, headers=headers, data=payload) | ||
|
||
signed_url = response.json()["signed_url"] | ||
|
||
print("Uploaded video to signed url: " + signed_url) | ||
|
||
url = urljoin(API_URL, "/videoinfer/?api_key=", self.__api_key) | ||
|
||
models = [ | ||
{ | ||
"model_id": self.dataset_id, | ||
"model_version": self.version, | ||
"inference_type": self.inference_type, | ||
} | ||
] | ||
|
||
for model in additional_models: | ||
models.append(SUPPORTED_ADDITIONAL_MODELS[model]) | ||
|
||
payload = json.dumps( | ||
{ | ||
"input_url": signed_url, | ||
"infer_fps": 5, | ||
"models": models | ||
} | ||
) | ||
|
||
response = requests.request("POST", url, headers=headers, data=payload) | ||
|
||
job_id = response.json()["job_id"] | ||
|
||
self.job_id = job_id | ||
|
||
return job_id, signed_url | ||
|
||
def poll_for_results(self, job_id: str = None) -> dict: | ||
""" | ||
Polls the Roboflow API to check if video inference is complete. | ||
|
||
Returns: | ||
Inference results as a dict | ||
|
||
Example: | ||
>>> import roboflow | ||
|
||
>>> rf = roboflow.Roboflow(api_key="") | ||
|
||
>>> project = rf.workspace().project("PROJECT_ID") | ||
|
||
>>> model = project.version("1").model | ||
|
||
>>> prediction = model.predict("video.mp4") | ||
|
||
>>> results = model.poll_for_results() | ||
""" | ||
|
||
if job_id is None: | ||
job_id = self.job_id | ||
|
||
url = urljoin( | ||
API_URL, "/videoinfer/?api_key=", self.__api_key, "&job_id=", self.job_id | ||
) | ||
|
||
response = requests.get(url, headers={"Content-Type": "application/json"}) | ||
|
||
data = response.json() | ||
|
||
if data["success"] != 0 or data["status_info"] != "success": | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if int(data['status']) == 0: |
||
print("Job not complete yet. Check back in a minute.") | ||
return {} | ||
|
||
output_signed_url = data["output_signed_url"] | ||
|
||
inference_data = requests.get( | ||
output_signed_url, headers={"Content-Type": "application/json"} | ||
) | ||
|
||
# frame_offset and model name are top-level keys | ||
return inference_data.json() | ||
|
||
def poll_until_results(self, job_id) -> dict: | ||
""" | ||
Polls the Roboflow API to check if video inference is complete. | ||
|
||
When inference is complete, the results are returned. | ||
|
||
Returns: | ||
Inference results as a dict | ||
|
||
Example: | ||
>>> import roboflow | ||
|
||
>>> rf = roboflow.Roboflow(api_key="") | ||
|
||
>>> project = rf.workspace().project("PROJECT_ID") | ||
|
||
>>> model = project.version("1").model | ||
|
||
>>> prediction = model.predict("video.mp4") | ||
|
||
>>> results = model.poll_until_results() | ||
""" | ||
if job_id is None: | ||
job_id = self.job_id | ||
|
||
attempts = 0 | ||
|
||
while True: | ||
response = self.poll_for_response() | ||
|
||
time.sleep(60) | ||
|
||
print(f"({attempts * 60}s): Checking for inference results") | ||
|
||
attempts += 1 | ||
|
||
if response != {}: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd suggest moving this check before sleep |
||
return response |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this API will only return a signed_url for uploading, and user still need to upload the video by himself, right ? @bigbitbus
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the user can use that returned value to upload a video to that signed url