diff --git a/internal/events/service_stream.go b/internal/events/service_stream.go index f556bd9..dcda893 100644 --- a/internal/events/service_stream.go +++ b/internal/events/service_stream.go @@ -33,7 +33,7 @@ import ( type streamService struct { transformConfig *models.TransformConfig - topic *pubsub.Topic + topic stream.Publisher } func newStreamService(logger log.Logger, transformConfig *models.TransformConfig, cfg *service.EventsStream) (*streamService, error) { @@ -70,7 +70,7 @@ func (ss *streamService) Send(evt models.Event) error { Body: bs, }) if err != nil { - return fmt.Errorf("error emitting %s: %v", evt.Type, err) + return fmt.Errorf("error emitting %s: %w", evt.Type, err) } return nil } diff --git a/internal/incoming/stream/publisher.go b/internal/incoming/stream/publisher.go index af7fbc1..53ffba4 100644 --- a/internal/incoming/stream/publisher.go +++ b/internal/incoming/stream/publisher.go @@ -18,6 +18,7 @@ import ( "fmt" "net/url" + "github.com/Shopify/sarama" "github.com/moov-io/achgateway/internal/kafka" "github.com/moov-io/achgateway/internal/service" "github.com/moov-io/base/log" @@ -26,7 +27,12 @@ import ( _ "gocloud.dev/pubsub/mempubsub" ) -func Topic(logger log.Logger, cfg *service.Config) (*pubsub.Topic, error) { +type Publisher interface { + Send(ctx context.Context, m *pubsub.Message) error + Shutdown(ctx context.Context) error +} + +func Topic(logger log.Logger, cfg *service.Config) (Publisher, error) { if cfg.Inbound.InMem != nil { // Strip away any query params. They're only supported by subscriptions u, err := url.Parse(cfg.Inbound.InMem.URL) @@ -38,7 +44,35 @@ func Topic(logger log.Logger, cfg *service.Config) (*pubsub.Topic, error) { return pubsub.OpenTopic(context.Background(), addr) } if cfg.Inbound.Kafka != nil { - return kafka.OpenTopic(logger, cfg.Inbound.Kafka) + topic, err := kafka.OpenTopic(logger, cfg.Inbound.Kafka) + if err != nil { + return nil, fmt.Errorf("creating topic: %w", err) + } + return &kafkaProducer{topic: topic}, nil } return nil, nil } + +type kafkaProducer struct { + topic *pubsub.Topic +} + +func (kp *kafkaProducer) Send(ctx context.Context, m *pubsub.Message) error { + err := kp.topic.Send(ctx, m) + if err != nil { + var producerError sarama.ProducerError + if kp.topic.ErrorAs(err, &producerError) { + return fmt.Errorf("producer error sending message: %w", producerError) + } + var producerErrors sarama.ProducerErrors + if kp.topic.ErrorAs(err, &producerErrors) { + return fmt.Errorf("producer errors sending message: %w", producerErrors) + } + return fmt.Errorf("error sending message: %w", err) + } + return nil +} + +func (kp *kafkaProducer) Shutdown(ctx context.Context) error { + return kp.topic.Shutdown(ctx) +} diff --git a/internal/incoming/stream/publisher_test.go b/internal/incoming/stream/publisher_test.go index 094ee82..90b1a2d 100644 --- a/internal/incoming/stream/publisher_test.go +++ b/internal/incoming/stream/publisher_test.go @@ -44,7 +44,7 @@ func TestStream(t *testing.T) { } } -func send(ctx context.Context, t *pubsub.Topic, body string) *pubsub.Message { +func send(ctx context.Context, t Publisher, body string) *pubsub.Message { msg := &pubsub.Message{ Body: []byte(body), Metadata: make(map[string]string), diff --git a/internal/incoming/stream/streamtest/streamtest.go b/internal/incoming/stream/streamtest/streamtest.go index c2a10c3..44ff510 100644 --- a/internal/incoming/stream/streamtest/streamtest.go +++ b/internal/incoming/stream/streamtest/streamtest.go @@ -32,7 +32,7 @@ import ( "gocloud.dev/pubsub" ) -func InmemStream(t *testing.T) (*pubsub.Topic, stream.Subscription) { +func InmemStream(t *testing.T) (stream.Publisher, stream.Subscription) { t.Helper() n, _ := rand.Int(rand.Reader, big.NewInt(10000)) diff --git a/internal/incoming/stream/subscription.go b/internal/incoming/stream/subscription.go index 499c57b..6aac559 100644 --- a/internal/incoming/stream/subscription.go +++ b/internal/incoming/stream/subscription.go @@ -19,7 +19,9 @@ package stream import ( "context" + "fmt" + "github.com/Shopify/sarama" "github.com/moov-io/achgateway/internal/kafka" "github.com/moov-io/achgateway/internal/service" "github.com/moov-io/base/log" @@ -47,7 +49,31 @@ func OpenSubscription(logger log.Logger, cfg *service.Config) (Subscription, err return nil, err } logger.Info().Logf("setup %T kafka subscription", sub) - return sub, nil + return &kafkaSubscription{sub: sub}, nil } return nil, nil } + +type kafkaSubscription struct { + sub *pubsub.Subscription +} + +func (ks *kafkaSubscription) Receive(ctx context.Context) (*pubsub.Message, error) { + msg, err := ks.sub.Receive(ctx) + if err != nil { + var consumerError sarama.ConsumerError + if ks.sub.ErrorAs(err, &consumerError) { + return msg, fmt.Errorf("consumer error receiving message: %w", consumerError) + } + var consumerErrors sarama.ConsumerErrors + if ks.sub.ErrorAs(err, &consumerErrors) { + return msg, fmt.Errorf("consumer errors receiving message: %w", consumerErrors) + } + return msg, fmt.Errorf("error receiving message: %w", err) + } + return msg, nil +} + +func (ks *kafkaSubscription) Shutdown(ctx context.Context) error { + return ks.sub.Shutdown(ctx) +} diff --git a/internal/incoming/web/api_files.go b/internal/incoming/web/api_files.go index 3b35c7e..a93620e 100644 --- a/internal/incoming/web/api_files.go +++ b/internal/incoming/web/api_files.go @@ -26,6 +26,7 @@ import ( "github.com/moov-io/ach" "github.com/moov-io/achgateway/internal/incoming" + "github.com/moov-io/achgateway/internal/incoming/stream" "github.com/moov-io/achgateway/internal/service" "github.com/moov-io/achgateway/pkg/compliance" "github.com/moov-io/achgateway/pkg/models" @@ -35,7 +36,7 @@ import ( "gocloud.dev/pubsub" ) -func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub *pubsub.Topic) *FilesController { +func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub stream.Publisher) *FilesController { return &FilesController{ logger: logger, cfg: cfg, @@ -46,7 +47,7 @@ func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub *pubsub.T type FilesController struct { logger log.Logger cfg service.HTTPConfig - publisher *pubsub.Topic + publisher stream.Publisher } func (c *FilesController) AppendRoutes(router *mux.Router) *mux.Router {