Skip to content

Commit

Permalink
feat: inject pubsub to event handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ucpr committed Dec 15, 2023
1 parent aee4d93 commit 52dcea9
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 12 deletions.
24 changes: 24 additions & 0 deletions cmd/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"context"
"log/slog"

"github.com/ucpr/mongo-streamer/internal/log"
"github.com/ucpr/mongo-streamer/internal/pubsub"
)

type EventHandler struct {
pubsub pubsub.Publisher
}

func NewEventHandler(ps pubsub.Publisher) *EventHandler {
return &EventHandler{
pubsub: ps,
}
}

func (e *EventHandler) EventHandler(ctx context.Context, event []byte) error {
log.Info("event", slog.String("event", string(event)))
return nil
}
10 changes: 2 additions & 8 deletions cmd/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"log/slog"
"time"

"github.com/ucpr/mongo-streamer/internal/config"
Expand All @@ -17,13 +16,13 @@ type Streamer struct {
st persistent.StorageBuffer
}

func NewStreamer(ctx context.Context, cli *mongo.Client, mcfg *config.MongoDB) (*Streamer, error) {
func NewStreamer(ctx context.Context, cli *mongo.Client, mcfg *config.MongoDB, eh *EventHandler) (*Streamer, error) {
stLog := persistent.NewLogWriter()
st, err := persistent.NewBuffer(10, 5*time.Second, stLog)
if err != nil {
return nil, err
}
cs, err := mongo.NewChangeStream(ctx, cli, mcfg.Database, mcfg.Collection, eventHandler, st)
cs, err := mongo.NewChangeStream(ctx, cli, mcfg.Database, mcfg.Collection, eh.EventHandler, st)
if err != nil {
return nil, err
}
Expand All @@ -35,11 +34,6 @@ func NewStreamer(ctx context.Context, cli *mongo.Client, mcfg *config.MongoDB) (
}, nil
}

func eventHandler(ctx context.Context, event []byte) error {
log.Info("event", slog.String("event", string(event)))
return nil
}

func (s *Streamer) Stream(ctx context.Context) {
go func() {
s.st.Watch(ctx)
Expand Down
3 changes: 3 additions & 0 deletions cmd/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (
"github.com/ucpr/mongo-streamer/internal/config"
"github.com/ucpr/mongo-streamer/internal/http"
"github.com/ucpr/mongo-streamer/internal/mongo"
"github.com/ucpr/mongo-streamer/internal/pubsub"
)

func injectStreamer(ctx context.Context) (*Streamer, error) {
wire.Build(
config.Set,
mongo.Set,
pubsub.Set,
NewStreamer,
NewEventHandler,
)
return nil, nil
}
Expand Down
12 changes: 11 additions & 1 deletion cmd/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions internal/pubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ import (
"time"

"cloud.google.com/go/pubsub"
"github.com/google/wire"
"github.com/ucpr/mongo-streamer/internal/config"
)

//golint:gochecknoglobals
var Set = wire.NewSet(
wire.Bind(new(Publisher), new(*PubSubPublisher)),
NewPublisher,
)

const (
Expand Down Expand Up @@ -36,13 +44,13 @@ type PubSubPublisher struct {
var _ Publisher = (*PubSubPublisher)(nil)

// NewPublisher creates a new publisher.
func NewPublisher(ctx context.Context, projectID, topicID string) (*PubSubPublisher, error) {
cli, err := pubsub.NewClient(ctx, projectID)
func NewPublisher(ctx context.Context, cfg *config.PubSub) (*PubSubPublisher, error) {
cli, err := pubsub.NewClient(ctx, cfg.ProjectID)
if err != nil {
return nil, err
}

topic := cli.Topic(topicID)
topic := cli.Topic(cfg.TopicID)
// Set the default values for PublishSettings.
topic.PublishSettings.ByteThreshold = publisherByteThreshold
topic.PublishSettings.CountThreshold = publisherCountThreshold
Expand Down

0 comments on commit 52dcea9

Please sign in to comment.