Skip to content

Commit

Permalink
Merge pull request #172 from moov-io/stream-extract-errors
Browse files Browse the repository at this point in the history
stream: try to extract consumer and producer errors from sarama
  • Loading branch information
adamdecaf authored Apr 20, 2023
2 parents ec57731 + 9a13680 commit ffc226d
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 9 deletions.
4 changes: 2 additions & 2 deletions internal/events/service_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

type streamService struct {
transformConfig *models.TransformConfig
topic *pubsub.Topic
topic stream.Publisher
}

func newStreamService(logger log.Logger, transformConfig *models.TransformConfig, cfg *service.EventsStream) (*streamService, error) {
Expand Down Expand Up @@ -70,7 +70,7 @@ func (ss *streamService) Send(evt models.Event) error {
Body: bs,
})
if err != nil {
return fmt.Errorf("error emitting %s: %v", evt.Type, err)
return fmt.Errorf("error emitting %s: %w", evt.Type, err)
}
return nil
}
38 changes: 36 additions & 2 deletions internal/incoming/stream/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net/url"

"github.com/Shopify/sarama"
"github.com/moov-io/achgateway/internal/kafka"
"github.com/moov-io/achgateway/internal/service"
"github.com/moov-io/base/log"
Expand All @@ -26,7 +27,12 @@ import (
_ "gocloud.dev/pubsub/mempubsub"
)

func Topic(logger log.Logger, cfg *service.Config) (*pubsub.Topic, error) {
type Publisher interface {
Send(ctx context.Context, m *pubsub.Message) error
Shutdown(ctx context.Context) error
}

func Topic(logger log.Logger, cfg *service.Config) (Publisher, error) {
if cfg.Inbound.InMem != nil {
// Strip away any query params. They're only supported by subscriptions
u, err := url.Parse(cfg.Inbound.InMem.URL)
Expand All @@ -38,7 +44,35 @@ func Topic(logger log.Logger, cfg *service.Config) (*pubsub.Topic, error) {
return pubsub.OpenTopic(context.Background(), addr)
}
if cfg.Inbound.Kafka != nil {
return kafka.OpenTopic(logger, cfg.Inbound.Kafka)
topic, err := kafka.OpenTopic(logger, cfg.Inbound.Kafka)
if err != nil {
return nil, fmt.Errorf("creating topic: %w", err)
}
return &kafkaProducer{topic: topic}, nil
}
return nil, nil
}

type kafkaProducer struct {
topic *pubsub.Topic
}

func (kp *kafkaProducer) Send(ctx context.Context, m *pubsub.Message) error {
err := kp.topic.Send(ctx, m)
if err != nil {
var producerError sarama.ProducerError
if kp.topic.ErrorAs(err, &producerError) {
return fmt.Errorf("producer error sending message: %w", producerError)
}
var producerErrors sarama.ProducerErrors
if kp.topic.ErrorAs(err, &producerErrors) {
return fmt.Errorf("producer errors sending message: %w", producerErrors)
}
return fmt.Errorf("error sending message: %w", err)
}
return nil
}

func (kp *kafkaProducer) Shutdown(ctx context.Context) error {
return kp.topic.Shutdown(ctx)
}
2 changes: 1 addition & 1 deletion internal/incoming/stream/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestStream(t *testing.T) {
}
}

func send(ctx context.Context, t *pubsub.Topic, body string) *pubsub.Message {
func send(ctx context.Context, t Publisher, body string) *pubsub.Message {
msg := &pubsub.Message{
Body: []byte(body),
Metadata: make(map[string]string),
Expand Down
2 changes: 1 addition & 1 deletion internal/incoming/stream/streamtest/streamtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"gocloud.dev/pubsub"
)

func InmemStream(t *testing.T) (*pubsub.Topic, stream.Subscription) {
func InmemStream(t *testing.T) (stream.Publisher, stream.Subscription) {
t.Helper()

n, _ := rand.Int(rand.Reader, big.NewInt(10000))
Expand Down
28 changes: 27 additions & 1 deletion internal/incoming/stream/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package stream

import (
"context"
"fmt"

"github.com/Shopify/sarama"
"github.com/moov-io/achgateway/internal/kafka"
"github.com/moov-io/achgateway/internal/service"
"github.com/moov-io/base/log"
Expand Down Expand Up @@ -47,7 +49,31 @@ func OpenSubscription(logger log.Logger, cfg *service.Config) (Subscription, err
return nil, err
}
logger.Info().Logf("setup %T kafka subscription", sub)
return sub, nil
return &kafkaSubscription{sub: sub}, nil
}
return nil, nil
}

type kafkaSubscription struct {
sub *pubsub.Subscription
}

func (ks *kafkaSubscription) Receive(ctx context.Context) (*pubsub.Message, error) {
msg, err := ks.sub.Receive(ctx)
if err != nil {
var consumerError sarama.ConsumerError
if ks.sub.ErrorAs(err, &consumerError) {
return msg, fmt.Errorf("consumer error receiving message: %w", consumerError)
}
var consumerErrors sarama.ConsumerErrors
if ks.sub.ErrorAs(err, &consumerErrors) {
return msg, fmt.Errorf("consumer errors receiving message: %w", consumerErrors)
}
return msg, fmt.Errorf("error receiving message: %w", err)
}
return msg, nil
}

func (ks *kafkaSubscription) Shutdown(ctx context.Context) error {
return ks.sub.Shutdown(ctx)
}
5 changes: 3 additions & 2 deletions internal/incoming/web/api_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/moov-io/ach"
"github.com/moov-io/achgateway/internal/incoming"
"github.com/moov-io/achgateway/internal/incoming/stream"
"github.com/moov-io/achgateway/internal/service"
"github.com/moov-io/achgateway/pkg/compliance"
"github.com/moov-io/achgateway/pkg/models"
Expand All @@ -35,7 +36,7 @@ import (
"gocloud.dev/pubsub"
)

func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub *pubsub.Topic) *FilesController {
func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub stream.Publisher) *FilesController {
return &FilesController{
logger: logger,
cfg: cfg,
Expand All @@ -46,7 +47,7 @@ func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub *pubsub.T
type FilesController struct {
logger log.Logger
cfg service.HTTPConfig
publisher *pubsub.Topic
publisher stream.Publisher
}

func (c *FilesController) AppendRoutes(router *mux.Router) *mux.Router {
Expand Down

0 comments on commit ffc226d

Please sign in to comment.