Skip to content

Commit

Permalink
Adds examples and docs for cloud events messages (#785)
Browse files Browse the repository at this point in the history
Signed-off-by: Elena Kolevska <[email protected]>
  • Loading branch information
elena-kolevska authored Feb 21, 2025
1 parent 69c06e5 commit ba64f31
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 1 deletion.
50 changes: 50 additions & 0 deletions daprdocs/content/en/python-sdk-docs/python-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,56 @@ with DaprClient() as d:
resp = d.publish_event(pubsub_name='pubsub', topic_name='TOPIC_A', data='{"message":"Hello World"}')
```


Send [CloudEvents](https://cloudevents.io/) messages with a json payload:
```python
from dapr.clients import DaprClient
import json

with DaprClient() as d:
cloud_event = {
'specversion': '1.0',
'type': 'com.example.event',
'source': 'my-service',
'id': 'myid',
'data': {'id': 1, 'message': 'hello world'},
'datacontenttype': 'application/json',
}

# Set the data content type to 'application/cloudevents+json'
resp = d.publish_event(
pubsub_name='pubsub',
topic_name='TOPIC_CE',
data=json.dumps(cloud_event),
data_content_type='application/cloudevents+json',
)
```

Publish [CloudEvents](https://cloudevents.io/) messages with plain text payload:
```python
from dapr.clients import DaprClient
import json

with DaprClient() as d:
cloud_event = {
'specversion': '1.0',
'type': 'com.example.event',
'source': 'my-service',
'id': "myid",
'data': 'hello world',
'datacontenttype': 'text/plain',
}

# Set the data content type to 'application/cloudevents+json'
resp = d.publish_event(
pubsub_name='pubsub',
topic_name='TOPIC_CE',
data=json.dumps(cloud_event),
data_content_type='application/cloudevents+json',
)
```


#### Subscribe to messages

```python
Expand Down
11 changes: 10 additions & 1 deletion examples/pubsub-simple/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ expected_stdout_lines:
- '== APP == Dead-Letter Subscriber received: id=7, message="hello world", content_type="application/json"'
- '== APP == Dead-Letter Subscriber. Received via deadletter topic: TOPIC_D_DEAD'
- '== APP == Dead-Letter Subscriber. Originally intended topic: TOPIC_D'
- '== APP == Subscriber received: TOPIC_CE'
- '== APP == Subscriber received a json cloud event: id=8, message="hello world", content_type="application/json"'
- '== APP == Subscriber received: TOPIC_CE'
- '== APP == Subscriber received plain text cloud event: hello world, content_type="text/plain"'
output_match_mode: substring
background: true
match_order: none
Expand All @@ -45,7 +50,7 @@ sleep: 3

```bash
# 1. Start Subscriber (expose gRPC server receiver on port 50051)
dapr run --app-id python-subscriber --app-protocol grpc --app-port 50051 python3 subscriber.py
dapr run --app-id python-subscriber --app-protocol grpc --app-port 50051 -- python3 subscriber.py
```

<!-- END_STEP -->
Expand All @@ -60,6 +65,10 @@ expected_stdout_lines:
- "== APP == {'id': 3, 'message': 'hello world'}"
- "== APP == {'id': 4, 'message': 'hello world'}"
- "== APP == {'id': 5, 'message': 'hello world'}"
- "== APP == {'id': 6, 'message': 'hello world'}"
- "== APP == {'id': 7, 'message': 'hello world'}"
- "== APP == {'specversion': '1.0', 'type': 'com.example.event', 'source': 'my-service', 'id': 'abc-8', 'data': {'id': 8, 'message': 'hello world'}, 'datacontenttype': 'application/json'}"
- "== APP == {'specversion': '1.0', 'type': 'com.example.event', 'source': 'my-service', 'id': 'abc-10', 'data': 'hello world', 'datacontenttype': 'text/plain'}"
background: true
sleep: 15
-->
Expand Down
48 changes: 48 additions & 0 deletions examples/pubsub-simple/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,51 @@

# Print the request
print(req_data, flush=True)

## Send a cloud event with json data
id = 8
cloud_event = {
'specversion': '1.0',
'type': 'com.example.event',
'source': 'my-service',
'id': f'abc-{id}',
'data': {'id': id, 'message': 'hello world'},
'datacontenttype': 'application/json',
}

# Set the data content type to 'application/cloudevents+json'
resp = d.publish_event(
pubsub_name='pubsub',
topic_name='TOPIC_CE',
data=json.dumps(cloud_event),
data_content_type='application/cloudevents+json',
)

# Print the request
print(cloud_event, flush=True)

time.sleep(0.5)

# Send a cloud event with plain text data
id = 10
cloud_event = {
'specversion': '1.0',
'type': 'com.example.event',
'source': 'my-service',
'id': f'abc-{id}',
'data': 'hello world',
'datacontenttype': 'text/plain',
}

# Set the data content type to 'application/cloudevents+json'
resp = d.publish_event(
pubsub_name='pubsub',
topic_name='TOPIC_CE',
data=json.dumps(cloud_event),
data_content_type='application/cloudevents+json',
)

# Print the request
print(cloud_event, flush=True)

time.sleep(0.5)
36 changes: 36 additions & 0 deletions examples/pubsub-simple/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,42 @@ def mytopic(event: v1.Event) -> TopicEventResponse:
return TopicEventResponse('success')


@app.subscribe(pubsub_name='pubsub', topic='TOPIC_CE')
def receive_cloud_events(event: v1.Event) -> TopicEventResponse:
print('Subscriber received: ' + event.Subject(), flush=True)

content_type = event.content_type
data = event.Data()

try:
if content_type == 'application/json':
# Handle JSON data
json_data = json.loads(data)
print(
f'Subscriber received a json cloud event: id={json_data["id"]}, message="{json_data["message"]}", '
f'content_type="{event.content_type}"',
flush=True,
)
elif content_type == 'text/plain':
# Handle plain text data
if isinstance(data, bytes):
data = data.decode('utf-8')
print(
f'Subscriber received plain text cloud event: {data}, '
f'content_type="{content_type}"',
flush=True,
)
else:
print(f'Received unknown content type: {content_type}', flush=True)
return TopicEventResponse('fail')

except Exception as e:
print('Failed to process event data:', e, flush=True)
return TopicEventResponse('fail')

return TopicEventResponse('success')


@app.subscribe(pubsub_name='pubsub', topic='TOPIC_D', dead_letter_topic='TOPIC_D_DEAD')
def fail_and_send_to_dead_topic(event: v1.Event) -> TopicEventResponse:
return TopicEventResponse('retry')
Expand Down

0 comments on commit ba64f31

Please sign in to comment.