Skip to content

Commit

Permalink
Batch messages poll endpoint (#65)
Browse files Browse the repository at this point in the history
* support message poll rest endpoint

* replace mongo install action in the github actions 

* returns 204 no-content if the message list is empty
  • Loading branch information
zzzming authored Aug 28, 2020
1 parent 0224e63 commit d9b4b6a
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 37 deletions.
34 changes: 17 additions & 17 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ jobs:
build_test:
name: build and test
runs-on: ubuntu-latest
strategy:
matrix:
mongodb-version: [4.2]
steps:
- name: Set up Go
uses: actions/setup-go@v1
Expand All @@ -51,16 +54,13 @@ jobs:
with:
fetch-depth: 1
path: go/src/github.com/kafkaesque-io/pulsar-beam
- name: Install MongoDB
run: |
echo $GITHUB_EVENT_NAME
echo $GITHUB_EVENT_PATH
sudo apt-get update
sudo apt-get install -y mongodb
- name: Start MongoDB v${{ matrix.mongodb-version }}
uses: supercharge/[email protected]
with:
mongodb-version: ${{ matrix.mongodb-version }}
- name: Verify MongoDB Installation and Status
run: |
ls /var/lib/mongodb
sudo systemctl status mongodb
sudo docker ps
- name: Build Binary
run: |
go mod download
Expand Down Expand Up @@ -111,7 +111,10 @@ jobs:
e2e_test:
name: e2e_test
needs: [analysis, build_test]
runs-on: ubuntu-latest
runs-on: ubuntu-latest
strategy:
matrix:
mongodb-version: [4.2]
steps:
- name: Check out code
uses: actions/checkout@v1
Expand All @@ -123,16 +126,13 @@ jobs:
run: |
pwd
go mod download
- name: Install MongoDB
run: |
echo $GITHUB_EVENT_NAME
echo $GITHUB_EVENT_PATH
sudo apt-get update
sudo apt-get install -y mongodb
- name: Start MongoDB v${{ matrix.mongodb-version }}
uses: supercharge/[email protected]
with:
mongodb-version: ${{ matrix.mongodb-version }}
- name: Verify MongoDB Installation and Status
run: |
ls /var/lib/mongodb
sudo systemctl status mongodb
sudo docker ps
- name: Set up root certificate
env:
PULSAR_CLIENT_CERT: ${{ secrets.PULSAR_CLIENT_CERT }}
Expand Down
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Beam is an http based streaming and queueing system backed up by Apache Pulsar.
- [x] A message can be pushed to a webhook or Cloud Function for consumption.
- [x] A webhook or Cloud Function receives a message, process it and reply another message, in a response body, back to another Pulsar topic via Pulsar Beam.
- [x] Messages can be streamed via HTTP Sever Sent Event, [SSE](https://www.html5rocks.com/en/tutorials/eventsource/basics/)
- [x] Support HTTP polling of batch messages

Opening an issue and PR are welcomed! Please email `[email protected]` for any inquiry or demo.

Expand Down Expand Up @@ -59,6 +60,21 @@ Query parameters
2. SubscriptionInitialPosition -> supported type are `latest` as default and `earliest`
3. SubscriptionName -> the length must be 5 characters or longer. An auto-generated name will be provided in absence. Only the auto-generated subscription will be unsubscribed.

### Endpoint to poll batch messages
Polls a batch of messages always from the earliest subscription position from a topic.
```
/v2/poll/{persistent}/{tenant}/{namespace}/{topic}
```
These HTTP headers may be required to map to Pulsar topic.
1. Authorization -> Bearer token as Pulsar token
2. PulsarUrl -> *optional* a fully qualified pulsar or pulsar+ssl URL where the message should be sent to. It is optional. The message will be sent to Pulsar URL specified under `PulsarBrokerURL` in the pulsar-beam.yml file if it is absent.

Query parameters
1. SubscriptionType -> Supported type strings are `exclusive` as default, `shared`, and `failover`
2. SubscriptionName -> the length must be 5 characters or longer. An auto-generated name will be provided in absence. Only the auto-generated subscription will be unsubscribed.
3. size -> The batch size. The default is 10.
4. perMessageTimeoutMs -> is a wait time out for the next message to arrive. It is in milliseconds per message. The default is 300ms.

### Webhook registration
Webhook registration is done via REST API backed by a database of your choice, such as MongoDB, in momery cache, and Pulsar itself. Yes, you can use a compacted Pulsar topic as a database table to perform CRUD. The configuration parameter is `"PbDbType": "inmemory",` in the `pulsar_beam.yml` file or the env variable `PbDbType`.

Expand Down Expand Up @@ -149,7 +165,7 @@ One end to end test is under `./src/e2e/e2etest.go`, that performs the following
4. Verify the replied message on the sink topic
5. Delete the topic and its webhook document via RESTful API

Since the set up is non-trivial involving Pulsar Beam, a Cloud function or webhook, the test tool, and Pulsar itself with SSL, we recommend to take advantage of [the free plan at Kafkaesque.io](https://kafkaesque.io) as the Pulsar server and a Cloud Function that we have verified GCP Fcuntion, Azure Function or AWS Lambda will suffice in the e2e flow.
Since the set up is non-trivial involving Pulsar Beam, a Cloud function or webhook, the test tool, and Pulsar itself with SSL, we recommend to take advantage of [the free plan at kesque.com](https://kesque.com) as the Pulsar server and a Cloud Function that we have verified GCP Fcuntion, Azure Function or AWS Lambda will suffice in the e2e flow.

Step to perform unit test
```bash
Expand Down
3 changes: 2 additions & 1 deletion config/pulsar_beam.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ PulsarPrivateKey: ./unit-test/example_private_key
PbDbInterval: 10s
DbConnectionStr: mongodb://localhost:27017
DbName:
DbPassword:
DbPassword:
TrustStore: "/etc/ssl/certs/ca-bundle.crt"
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod h1:Crc/GCZ3NXD
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
Expand Down Expand Up @@ -79,6 +80,7 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
Expand Down Expand Up @@ -113,6 +115,7 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/tidwall/pretty v1.0.1/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
Expand Down
42 changes: 35 additions & 7 deletions src/broker/sse-broker.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package broker

import (
"strings"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/kafkaesque-io/pulsar-beam/src/model"
"github.com/kafkaesque-io/pulsar-beam/src/pulsardriver"
)

const (
// SSEBrokerMaxSize the max size of the number of HTTP SSE session is supported
SSEBrokerMaxSize = 200

// TODO add counters and max limit for SSEBroker
log "github.com/sirupsen/logrus"
)

// GetPulsarClientConsumer returns Puslar client and consumer interface objects
Expand All @@ -31,3 +29,33 @@ func GetPulsarClientConsumer(url, token, topic, subscriptionName string, subType

return client, consumer, nil
}

// PollBatchMessages polls a batch of consumer messages
func PollBatchMessages(url, token, topic, subscriptionName string, subType pulsar.SubscriptionType, size, perMessageTimeoutMs int) (model.PulsarMessages, error) {
log.Infof("getbatchmessages called")
client, consumer, err := GetPulsarClientConsumer(url, token, topic, subscriptionName, subType, pulsar.SubscriptionPositionEarliest)
if err != nil {
return model.NewPulsarMessages(size), err
}
if strings.HasPrefix(subscriptionName, model.NonResumable) {
defer consumer.Unsubscribe()
}
defer consumer.Close()
defer client.Close()

messages := model.NewPulsarMessages(size)
consumChan := consumer.Chan()
for i := 0; i < size; i++ {
select {
case msg := <-consumChan:
// log.Infof("received message %s on topic %s", string(msg.Payload()), msg.Topic())
messages.AddPulsarMessage(msg)
consumer.Ack(msg)

case <-time.After(time.Duration(perMessageTimeoutMs) * time.Millisecond): //TODO: this should be configurable
i = size
}
}

return messages, nil
}
57 changes: 57 additions & 0 deletions src/model/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package model

import (
"fmt"
"time"

"github.com/apache/pulsar-client-go/pulsar"
)

// PulsarMessage is the Pulsar Message type
type PulsarMessage struct {
Payload []byte `json:"payload"`
Topic string `json:"topic"`
EventTime time.Time `json:"eventTime"`
PublishTime time.Time `json:"publishTime"`
MessageID string `json:"messageId"`
Key string `json:"key"`
}

// PulsarMessages encapsulates a list of messages to be returned to a client
type PulsarMessages struct {
Limit int `json:"limit"`
Size int `json:"size"`
Messages []PulsarMessage `json:"messages"`
}

// NewPulsarMessages create a PulsarMessages object
func NewPulsarMessages(initSize int) PulsarMessages {
return PulsarMessages{
Limit: initSize,
Size: 0,
Messages: make([]PulsarMessage, 0),
}
}

// AddPulsarMessage adds a Pulsar Message to the payload, return true if reaches capacity
func (msgs *PulsarMessages) AddPulsarMessage(msg pulsar.Message) bool {
if msgs.Size >= msgs.Limit {
return true
}
msgs.Messages = append(msgs.Messages, PulsarMessage{
Payload: msg.Payload(),
Topic: msg.Topic(),
EventTime: msg.EventTime(),
PublishTime: msg.PublishTime(),
MessageID: fmt.Sprintf("%+v", msg.ID()),
Key: msg.Key(),
})
msgs.Size++

return msgs.Size >= msgs.Limit
}

// IsEmpty checks if the message list is empty
func (msgs *PulsarMessages) IsEmpty() bool {
return msgs.Size == 0
}
58 changes: 48 additions & 10 deletions src/route/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func TokenSubjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
w.Write(respJSON)
w.WriteHeader(http.StatusOK)
}
return
}
Expand Down Expand Up @@ -106,15 +105,55 @@ func ReceiveHandler(w http.ResponseWriter, r *http.Request) {
return
}

// recoverHandler a function recovers from panic
func recoverHandler(r *http.Request) {
if r := recover(); r != nil {
fmt.Printf("Recovered in http handler crash %v", r)
} else {
fmt.Printf("exit http handler")
}
}

// PollHandler polls messages from a Pulsar topic.
func PollHandler(w http.ResponseWriter, r *http.Request) {
defer recoverHandler(r)

u, _ := url.Parse(r.URL.String())
params := u.Query()
token, topicFN, pulsarURL, subName, _, subType, err := ConsumerConfigFromHTTPParts(util.AllowedPulsarURLs, &r.Header, mux.Vars(r), params)
if err != nil {
util.ResponseErrorJSON(err, w, http.StatusUnprocessableEntity)
return
}
w.Header().Set("Content-Type", "application/json; charset=UTF-8")

size := util.QueryParamInt(params, "batch", 10)
perMessageTimeoutMs := util.QueryParamInt(params, "perMessageTimeoutMs", 300)

// subscription initial position is always set to earliest since this is short poll
msgs, err := broker.PollBatchMessages(pulsarURL, token, topicFN, subName, subType, size, perMessageTimeoutMs)
if err != nil {
util.ResponseErrorJSON(err, w, http.StatusInternalServerError)
return
}

if msgs.IsEmpty() {
w.WriteHeader(http.StatusNoContent)
return
}

data, err := json.Marshal(msgs)
if err != nil {
util.ResponseErrorJSON(err, w, http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write(data)
}

// SSEHandler is the HTTP SSE handler
func SSEHandler(w http.ResponseWriter, r *http.Request) {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Recovered in SSEHandler %v", r)
} else {
fmt.Printf("exit SSEHandler()")
}
}()
defer recoverHandler(r)

u, _ := url.Parse(r.URL.String())
params := u.Query()
Expand Down Expand Up @@ -189,7 +228,6 @@ func GetTopicHandler(w http.ResponseWriter, r *http.Request) {
util.ResponseErrorJSON(err, w, http.StatusInternalServerError)
} else {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusOK)
w.Write(resJSON)
}

Expand All @@ -199,6 +237,7 @@ func GetTopicHandler(w http.ResponseWriter, r *http.Request) {
func UpdateTopicHandler(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
defer r.Body.Close()
w.Header().Set("Content-Type", "application/json; charset=UTF-8")

var doc model.TopicConfig
err := decoder.Decode(&doc)
Expand Down Expand Up @@ -270,7 +309,6 @@ func DeleteTopicHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
} else {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusOK)
w.Write(resJSON)
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/route/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ var ReceiverRoutes = Routes{
SSEHandler,
middleware.AuthVerifyJWT,
},
Route{
"poll-messages",
http.MethodGet,
"/v2/poll/{persistent}/{tenant}/{namespace}/{topic}",
PollHandler,
middleware.AuthVerifyJWT,
},
}

// RestRoutes definition
Expand Down
14 changes: 14 additions & 0 deletions src/unit-test/model_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package tests

import (
"testing"

. "github.com/kafkaesque-io/pulsar-beam/src/model"
)

func TestPulsarMessages(t *testing.T) {

messages := NewPulsarMessages(10)
equals(t, messages.Limit, 10)
equals(t, messages.IsEmpty(), true)
}
Loading

0 comments on commit d9b4b6a

Please sign in to comment.