-
Notifications
You must be signed in to change notification settings - Fork 134
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add decorator for event subscriptions (#290)
* Add decorator for event subscriptions * Fix naming of the app import * Add test cases * Add documentation * Fix flake8 errors * Fix typings * Migrate tests to unittest and add some tests to exercise callbacks * Fix some linter errors Co-authored-by: Charlie Stanley <[email protected]>
- Loading branch information
Showing
5 changed files
with
188 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,8 +6,10 @@ | |
""" | ||
|
||
from .actor import DaprActor | ||
from .app import DaprApp | ||
|
||
|
||
__all__ = [ | ||
'DaprActor', | ||
'DaprApp' | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
Copyright (c) Microsoft Corporation and Dapr Contributors. | ||
Licensed under the MIT License. | ||
""" | ||
|
||
from typing import Dict, List, Optional | ||
from fastapi import FastAPI # type: ignore | ||
|
||
|
||
class DaprApp: | ||
""" | ||
Wraps a regular FastAPI app instance to enhance it with Dapr specific functionality. | ||
Args: | ||
app_instance: The FastAPI instance to wrap. | ||
""" | ||
|
||
def __init__(self, app_instance: FastAPI): | ||
self._app = app_instance | ||
self._subscriptions: List[Dict[str, object]] = [] | ||
|
||
self._app.add_api_route("/dapr/subscribe", | ||
self._get_subscriptions, | ||
methods=["GET"]) | ||
|
||
def subscribe(self, | ||
pubsub: str, | ||
topic: str, | ||
metadata: Optional[Dict[str, str]] = {}, | ||
route: Optional[str] = None): | ||
""" | ||
Subscribes to a topic on a pub/sub component. | ||
Subscriptions made through this method will show up when you GET /dapr/subscribe. | ||
Example: | ||
The following sample demonstrates how to use the subscribe method to register an | ||
event handler for the application on a pub/sub component named `pubsub`. | ||
>> app = FastAPI() | ||
>> dapr_app = DaprApp(app) | ||
>> @dapr_app.subscribe(pubsub='pubsub', topic='some_topic', route='/some_endpoint') | ||
>> def my_event_handler(event_data): | ||
>> pass | ||
Args: | ||
pubsub: The name of the pub/sub component. | ||
topic: The name of the topic. | ||
metadata: The metadata for the subscription. | ||
route: | ||
The HTTP route to register for the event subscription. By default we'll | ||
generate one that matches the pattern /events/{pubsub}/{topic}. You can | ||
override this with your own route. | ||
Returns: | ||
The decorator for the function. | ||
""" | ||
def decorator(func): | ||
event_handler_route = f"/events/{pubsub}/{topic}" if route is None else route | ||
|
||
self._app.add_api_route(event_handler_route, | ||
func, | ||
methods=["POST"]) | ||
|
||
self._subscriptions.append({ | ||
"pubsubname": pubsub, | ||
"topic": topic, | ||
"route": event_handler_route, | ||
"metadata": metadata | ||
}) | ||
|
||
return decorator | ||
|
||
def _get_subscriptions(self): | ||
return self._subscriptions |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
from fastapi import FastAPI | ||
from fastapi.testclient import TestClient | ||
from dapr.ext.fastapi import DaprApp | ||
from pydantic import BaseModel | ||
|
||
import unittest | ||
|
||
|
||
class Message(BaseModel): | ||
body: str | ||
|
||
|
||
class DaprAppTest(unittest.TestCase): | ||
def setUp(self): | ||
self.app = FastAPI() | ||
self.dapr_app = DaprApp(self.app) | ||
self.client = TestClient(self.app) | ||
|
||
def test_subscribe_subscription_registered(self): | ||
@self.dapr_app.subscribe(pubsub="pubsub", topic="test") | ||
def event_handler(event_data: Message): | ||
return "default route" | ||
|
||
self.assertEqual(len(self.dapr_app._subscriptions), 1) | ||
|
||
self.assertIn("/dapr/subscribe", [route.path for route in self.app.router.routes]) | ||
self.assertIn("/events/pubsub/test", [route.path for route in self.app.router.routes]) | ||
|
||
response = self.client.get("/dapr/subscribe") | ||
self.assertEqual( | ||
[{'pubsubname': 'pubsub', | ||
'topic': 'test', | ||
'route': '/events/pubsub/test', | ||
'metadata': {} | ||
}], response.json()) | ||
|
||
response = self.client.post("/events/pubsub/test", json={"body": "new message"}) | ||
self.assertEqual(response.status_code, 200) | ||
self.assertEqual(response.text, '"default route"') | ||
|
||
def test_subscribe_with_route_subscription_registered_with_custom_route(self): | ||
@self.dapr_app.subscribe(pubsub="pubsub", topic="test", route="/do-something") | ||
def event_handler(event_data: Message): | ||
return "custom route" | ||
|
||
self.assertEqual(len(self.dapr_app._subscriptions), 1) | ||
|
||
self.assertIn("/dapr/subscribe", [route.path for route in self.app.router.routes]) | ||
self.assertIn("/do-something", [route.path for route in self.app.router.routes]) | ||
|
||
response = self.client.get("/dapr/subscribe") | ||
self.assertEqual( | ||
[{'pubsubname': 'pubsub', | ||
'topic': 'test', | ||
'route': '/do-something', | ||
'metadata': {} | ||
}], response.json()) | ||
|
||
response = self.client.post("/do-something", json={"body": "new message"}) | ||
self.assertEqual(response.status_code, 200) | ||
self.assertEqual(response.text, '"custom route"') | ||
|
||
def test_subscribe_metadata(self): | ||
handler_metadata = {"rawPayload": "true"} | ||
|
||
@self.dapr_app.subscribe(pubsub="pubsub", | ||
topic="test", | ||
metadata=handler_metadata) | ||
def event_handler(event_data: Message): | ||
return "custom metadata" | ||
|
||
self.assertEqual((self.dapr_app._subscriptions[0]["metadata"]["rawPayload"]), "true") | ||
|
||
response = self.client.get("/dapr/subscribe") | ||
self.assertEqual( | ||
[{'pubsubname': 'pubsub', | ||
'topic': 'test', | ||
'route': '/events/pubsub/test', | ||
'metadata': {"rawPayload": "true"} | ||
}], response.json()) | ||
|
||
response = self.client.post("/events/pubsub/test", json={"body": "new message"}) | ||
self.assertEqual(response.status_code, 200) | ||
self.assertEqual(response.text, '"custom metadata"') | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |