Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer committed Jan 17, 2025
1 parent e55e0d5 commit 979a663
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 37 deletions.
47 changes: 26 additions & 21 deletions extensions/impl/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,14 @@ type KafkaSink struct {
LastStats kafkago.WriterStats
}

func (k *KafkaSink) ConfigureBatch(batchSize int, lingerInterval time.Duration) {
k.kc.WriterConf.BatchSize = batchSize
k.kc.WriterConf.BatchTimeout = lingerInterval
}

type kafkaConf struct {
Brokers string `json:"brokers"`
Topic string `json:"topic"`
MaxAttempts int `json:"maxAttempts"`
RequiredACKs int `json:"requiredACKs"`
Key string `json:"key"`
Headers interface{} `json:"headers"`
WriterConf kafkaWriterConf `json:"writerConf"`
kafkaWriterConf
Brokers string `json:"brokers"`
Topic string `json:"topic"`
MaxAttempts int `json:"maxAttempts"`
RequiredACKs int `json:"requiredACKs"`
Key string `json:"key"`
Headers interface{} `json:"headers"`

// write config
Compression string `json:"compression"`
Expand All @@ -80,7 +75,7 @@ func (c *kafkaConf) validate() error {

func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) error {
c := getDefaultKafkaConf()
err := cast.MapToStruct(configs, c)
err := c.configure(configs)
failpoint.Inject("kafkaErr", func(val failpoint.Value) {
err = mockKakfaSourceErr(val.(int), castConfErr)
})
Expand Down Expand Up @@ -160,9 +155,9 @@ func (k *KafkaSink) buildKafkaWriter() error {
AllowAutoTopicCreation: true,
MaxAttempts: k.kc.MaxAttempts,
RequiredAcks: kafkago.RequiredAcks(k.kc.RequiredACKs),
BatchSize: k.kc.WriterConf.BatchSize,
BatchBytes: k.kc.WriterConf.BatchBytes,
BatchTimeout: k.kc.WriterConf.BatchTimeout,
BatchSize: k.kc.BatchSize,
BatchBytes: k.kc.BatchBytes,
BatchTimeout: k.kc.BatchTimeout,
Transport: &kafkago.Transport{
SASL: k.mechanism,
TLS: k.tlsConfig,
Expand Down Expand Up @@ -348,11 +343,21 @@ func getDefaultKafkaConf() *kafkaConf {
c := &kafkaConf{
RequiredACKs: -1,
MaxAttempts: 1,
WriterConf: kafkaWriterConf{
BatchSize: 100,
BatchTimeout: time.Millisecond,
BatchBytes: 1048576,
},
}
c.kafkaWriterConf = kafkaWriterConf{
BatchSize: 100,
BatchTimeout: time.Microsecond,
BatchBytes: 10485760, // 10MB
}
return c
}

func (kc *kafkaConf) configure(props map[string]interface{}) error {
if err := cast.MapToStruct(props, kc); err != nil {
return err
}
if err := cast.MapToStruct(props, &kc.kafkaWriterConf); err != nil {
return err
}
return nil
}
22 changes: 6 additions & 16 deletions internal/topo/planner/planner_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,27 +166,17 @@ func findTemplateProps(props map[string]any) []string {
return result
}

type BatchsinkAble interface {
ConfigureBatch(batchSize int, lingerInterval time.Duration)
}

// Split sink node according to the sink configuration. Return the new input emitters.
func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOption, sc *node.SinkConf, templates []string) ([]node.TopNode, error) {
index := 0
result := make([]node.TopNode, 0)
// Batch enabled
bs, ok := s.(BatchsinkAble)
if ok {
bs.ConfigureBatch(sc.BatchSize, time.Duration(sc.LingerInterval))
} else {
if sc.BatchSize > 0 || sc.LingerInterval > 0 {
batchOp, err := node.NewBatchOp(fmt.Sprintf("%s_%d_batch", sinkName, index), options, sc.BatchSize, time.Duration(sc.LingerInterval))
if err != nil {
return nil, err
}
index++
result = append(result, batchOp)
if sc.BatchSize > 0 || sc.LingerInterval > 0 {
batchOp, err := node.NewBatchOp(fmt.Sprintf("%s_%d_batch", sinkName, index), options, sc.BatchSize, time.Duration(sc.LingerInterval))
if err != nil {
return nil, err
}
index++
result = append(result, batchOp)
}
// Transform enabled
// Currently, the row to map is done here and is required. TODO: eliminate map and this could become optional
Expand Down

0 comments on commit 979a663

Please sign in to comment.