Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The from_http returns CloudEvent with inaccessible attributes #246

Open
matzew opened this issue Dec 19, 2024 · 5 comments
Open

The from_http returns CloudEvent with inaccessible attributes #246

matzew opened this issue Dec 19, 2024 · 5 comments
Labels
question Further information is requested

Comments

@matzew
Copy link
Member

matzew commented Dec 19, 2024

Expected Behavior

The from_http function should return a CloudEvent object where attributes like specversion, type, and source are accessible via event["attributes"] or similar documented methods.

Actual Behavior

But for me the from_http function returns a CloudEvent object with an empty attributes field. Attempting to access specversion, type, or source returns missing.

Steps to Reproduce the Problem

  1. Save the following script as reproducer.py:
import logging
from cloudevents.http import from_http
from http.server import BaseHTTPRequestHandler, HTTPServer

logging.basicConfig(level=logging.DEBUG)

class CloudEventHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        headers = {key.lower(): value for key, value in self.headers.items()}
        content_length = int(self.headers.get("content-length", 0))
        body = self.rfile.read(content_length).decode("utf-8")

        logging.info(f"Received headers: {headers}")
        logging.info(f"Received body: {body}")

        try:
            event = from_http(headers, body)
            logging.info(f"Parsed CloudEvent: {event}")

            logging.info(f"Type of parsed event: {type(event)}")
            logging.info(f"Event attributes: {event.get('attributes', {})}")

            specversion = event.get("attributes", {}).get("specversion", "missing")
            event_type = event.get("attributes", {}).get("type", "missing")
            source = event.get("attributes", {}).get("source", "missing")

            logging.info(f"Specversion: {specversion}, Type: {event_type}, Source: {source}")

            response_body = {
                "specversion": specversion,
                "type": event_type,
                "source": source,
            }

            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(str(response_body).encode("utf-8"))

        except Exception as e:
            logging.error(f"Error parsing CloudEvent: {e}")
            self.send_response(500)
            self.end_headers()
            self.wfile.write(f"Error: {e}".encode("utf-8"))

def run(server_class=HTTPServer, handler_class=CloudEventHandler, port=8080):
    server_address = ("", port)
    httpd = server_class(server_address, handler_class)
    logging.info(f"Starting HTTP server on port {port}")
    httpd.serve_forever()

if __name__ == "__main__":
    run()
  1. Run the script via python reproducer.py

  2. Access the server, like:

curl -v -X POST \
    -H "Content-Type: application/json" \
    -H "ce-specversion: 1.0" \
    -H "ce-type: test.event.type" \
    -H "ce-source: /my/source" \
    -H "ce-id: 12345" \
    -d '{"key":"value"}' \
    http://127.0.0.1:8080
  1. See the logs:
INFO:root:Received headers: {'host': '127.0.0.1:8080', 'user-agent': 'curl/8.9.1', 'accept': '*/*', 'content-type': 'application/json', 'ce-specversion': '1.0', 'ce-type': 'test.event.type', 'ce-source': '/my/source', 'ce-id': '12345', 'content-length': '15'}
INFO:root:Received body: {"key":"value"}
INFO:root:Parsed CloudEvent: {'attributes': {'specversion': '1.0', 'id': '12345', 'source': '/my/source', 'type': 'test.event.type', 'datacontenttype': 'application/json', 'time': '2024-12-19T09:29:20.297551+00:00'}, 'data': {'key': 'value'}}
INFO:root:Type of parsed event: <class 'cloudevents.http.event.CloudEvent'>
INFO:root:Event attributes: {}
INFO:root:Specversion: missing, Type: missing, Source: missing

Specifications

  • Platform: Linux (Fedora release 41)
  • Python Version: Python 3.13.0
  • SDK-Version: via pip show cloudevents:
Name: cloudevents
Version: 1.11.0
Summary: CloudEvents Python SDK
Home-page: https://github.com/cloudevents/sdk-python
Author: The Cloud Events Contributors
Author-email: [email protected]
License: https://www.apache.org/licenses/LICENSE-2.0
Location: /home/<user-name>/.local/lib/python3.13/site-packages
Requires: deprecation
Required-by: 
@lkingland
Copy link

lkingland commented Dec 19, 2024

Perhaps try accessing the attributes thusly:

specversion = event.specversion
event_type = event.type
source = event.source

If that doesn't work, I'll dig deeper 👍🏻

@matzew
Copy link
Member Author

matzew commented Dec 20, 2024

import logging
from cloudevents.http import from_http
from http.server import BaseHTTPRequestHandler, HTTPServer

logging.basicConfig(level=logging.DEBUG)

class CloudEventHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        headers = {key.lower(): value for key, value in self.headers.items()}
        content_length = int(self.headers.get("content-length", 0))
        body = self.rfile.read(content_length).decode("utf-8")

        logging.info(f"Received headers: {headers}")
        logging.info(f"Received body: {body}")

        try:
            # Parse the CloudEvent
            event = from_http(headers, body)
            logging.info(f"Parsed CloudEvent: {event}")

            # Access attributes as struct-like members
            specversion = event.specversion
            event_type = event.type
            source = event.source

            logging.info(f"Specversion: {specversion}, Type: {event_type}, Source: {source}")

            response_body = {
                "specversion": specversion,
                "type": event_type,
                "source": source,
            }

            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(str(response_body).encode("utf-8"))

        except Exception as e:
            logging.error(f"Error parsing CloudEvent: {e}")
            self.send_response(500)
            self.end_headers()
            self.wfile.write(f"Error: {e}".encode("utf-8"))

def run(server_class=HTTPServer, handler_class=CloudEventHandler, port=8080):
    server_address = ("", port)
    httpd = server_class(server_address, handler_class)
    logging.info(f"Starting HTTP server on port {port}")
    httpd.serve_forever()

if __name__ == "__main__":
    run()

Gives me 500:

http -j -v POST http://127.0.0.1:8080/ \
  Content-Type:application/json \
  ce-specversion:1.0 \
  ce-type:event.registry \
  ce-source:/dev/console/web/form \
  ce-id:$(uuidgen)
POST / HTTP/1.1
Accept: application/json, */*;q=0.5
Accept-Encoding: gzip, deflate
Connection: keep-alive
Content-Length: 0
Content-Type: application/json
Host: 127.0.0.1:8080
User-Agent: HTTPie/3.2.3
ce-id: 8e76d231-ea5c-4de8-b16a-0262006b7e50
ce-source: /dev/console/web/form
ce-specversion: 1.0
ce-type: event.registry



HTTP/1.0 500 Internal Server Error
Date: Fri, 20 Dec 2024 10:44:46 GMT
Server: BaseHTTP/0.6 Python/3.13.0

Error: 'CloudEvent' object has no attribute 'specversion'

@xSAVIKx
Copy link
Member

xSAVIKx commented Dec 20, 2024 via email

@xSAVIKx xSAVIKx added the question Further information is requested label Dec 20, 2024
@xSAVIKx
Copy link
Member

xSAVIKx commented Dec 20, 2024

@matzew can you please try it out? Just tag me if you see it's not working. I'll dig into the code then

@ncouture
Copy link

ncouture commented Dec 22, 2024

@matzew

You must use event.get(<attribute_name>), here's a functional example:

import sys
import logging
from cloudevents.http import from_http
from http.server import BaseHTTPRequestHandler, HTTPServer

logging.basicConfig(level=logging.DEBUG)


class CloudEventHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        headers = {key.lower(): value for key, value in self.headers.items()}
        content_length = int(self.headers.get("content-length", 0))
        body = self.rfile.read(content_length).decode("utf-8")

        logging.info(f"Received headers: {headers}")
        logging.info(f"Received body: {body}")

        try:
            event = from_http(headers, body)

            specversion = event.get("specversion")
            event_type = event.get("type")
            source = event.get("source")

            logging.info(
                f"Specversion: {specversion} "
                f"Type: {event_type} "
                f"Source: {source}"
            )

            response_body = {
                "specversion": specversion,
                "type": event_type,
                "source": source,
            }

            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(str(response_body).encode("utf-8") + "\n".encode("utf-8"))

        except Exception as e:
            logging.error(f"Error parsing CloudEvent: {e}")
            self.send_response(500)
            self.end_headers()
            self.wfile.write(f"Error: {e}".encode("utf-8"))
            raise


def run(server_class=HTTPServer, handler_class=CloudEventHandler, port=8080):
    server_address = ("", port)
    httpd = server_class(server_address, handler_class)
    logging.info(f"Starting HTTP server on port {port}")
    httpd.serve_forever()


if __name__ == "__main__":
    try:
        run()
    except KeyboardInterrupt:
        print("User interrupted")
        sys.exit(1)
$ curl -v \
     -H "Content-Type: application/json" \
     -H "ce-specversion: 1.0" \
     -H "ce-type: debugging" \
     -H "ce-source: /" \
     -H "ce-id: 12345" \
     -d '{"attributes": "acquired"}' \
     http://127.0.0.1:8080
*   Trying 127.0.0.1:8080...
* Connected to 127.0.0.1 (127.0.0.1) port 8080 (#0)
> POST / HTTP/1.1
> Host: 127.0.0.1:8080
> User-Agent: curl/7.88.1
> Accept: */*
> Content-Type: application/json
> ce-specversion: 1.0
> ce-type: debugging
> ce-source: /
> ce-id: 12345
> Content-Length: 26
> 
INFO:root:Received headers: {'host': '127.0.0.1:8080', 'user-agent': 'curl/7.88.1', 'accept': '*/*', 'content-type': 'application/json', 'ce-specversion': '1.0', 'ce-type': 'debugging', 'ce-source': '/', 'ce-id': '12345', 'content-length': '26'}
INFO:root:Received body: {"attributes": "acquired"}
INFO:root:Specversion: 1.0 Type: debugging Source: /
127.0.0.1 - - [22/Dec/2024 00:09:01] "POST / HTTP/1.1" 200 -
* HTTP 1.0, assume close after body
< HTTP/1.0 200 OK
< Server: BaseHTTP/0.6 Python/3.12.7
< Date: Sun, 22 Dec 2024 05:09:01 GMT
< Content-Type: application/json
< 
{'specversion': '1.0', 'type': 'debugging', 'source': '/'}
* Closing connection 0

Introspecting the event object lead me to the abstract.CloudEvent class.

This class overrides the __getitem__ method and is the subclass of the CloudEvent imported in the previous code. It gets the attribute by name for you with the help of other functions:

    def __getitem__(self, key: str) -> typing.Any:
        """
        Returns a value of an attribute of the event denoted by the given `key`.

        The `data` of the event should be accessed by the `.data` accessor rather
        than this mapping.

        :param key: The name of the event attribute to retrieve the value for.
        :returns: The event attribute value.
        """
        return self._get_attributes()[key]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

4 participants