diff --git a/extensions/impl/kafka/sink.go b/extensions/impl/kafka/sink.go
index 3801fbfa86..fe1384ad20 100644
--- a/extensions/impl/kafka/sink.go
+++ b/extensions/impl/kafka/sink.go
@@ -44,19 +44,14 @@ type KafkaSink struct {
 	LastStats kafkago.WriterStats
 }
 
-func (k *KafkaSink) ConfigureBatch(batchSize int, lingerInterval time.Duration) {
-	k.kc.WriterConf.BatchSize = batchSize
-	k.kc.WriterConf.BatchTimeout = lingerInterval
-}
-
 type kafkaConf struct {
-	Brokers      string          `json:"brokers"`
-	Topic        string          `json:"topic"`
-	MaxAttempts  int             `json:"maxAttempts"`
-	RequiredACKs int             `json:"requiredACKs"`
-	Key          string          `json:"key"`
-	Headers      interface{}     `json:"headers"`
-	WriterConf   kafkaWriterConf `json:"writerConf"`
+	kafkaWriterConf
+	Brokers      string      `json:"brokers"`
+	Topic        string      `json:"topic"`
+	MaxAttempts  int         `json:"maxAttempts"`
+	RequiredACKs int         `json:"requiredACKs"`
+	Key          string      `json:"key"`
+	Headers      interface{} `json:"headers"`
 
 	// write config
 	Compression string `json:"compression"`
@@ -80,7 +75,7 @@ func (c *kafkaConf) validate() error {
 
 func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) error {
 	c := getDefaultKafkaConf()
-	err := cast.MapToStruct(configs, c)
+	err := c.configure(configs)
 	failpoint.Inject("kafkaErr", func(val failpoint.Value) {
 		err = mockKakfaSourceErr(val.(int), castConfErr)
 	})
@@ -160,9 +155,9 @@ func (k *KafkaSink) buildKafkaWriter() error {
 		AllowAutoTopicCreation: true,
 		MaxAttempts:            k.kc.MaxAttempts,
 		RequiredAcks:           kafkago.RequiredAcks(k.kc.RequiredACKs),
-		BatchSize:              k.kc.WriterConf.BatchSize,
-		BatchBytes:             k.kc.WriterConf.BatchBytes,
-		BatchTimeout:           k.kc.WriterConf.BatchTimeout,
+		BatchSize:              k.kc.BatchSize,
+		BatchBytes:             k.kc.BatchBytes,
+		BatchTimeout:           k.kc.BatchTimeout,
 		Transport: &kafkago.Transport{
 			SASL: k.mechanism,
 			TLS:  k.tlsConfig,
@@ -348,11 +343,21 @@ func getDefaultKafkaConf() *kafkaConf {
 	c := &kafkaConf{
 		RequiredACKs: -1,
 		MaxAttempts:  1,
-		WriterConf: kafkaWriterConf{
-			BatchSize:    100,
-			BatchTimeout: time.Millisecond,
-			BatchBytes:   1048576,
-		},
+	}
+	c.kafkaWriterConf = kafkaWriterConf{
+		BatchSize:    100,
+		BatchTimeout: time.Microsecond,
+		BatchBytes:   10485760, // 10MB
 	}
 	return c
 }
+
+func (kc *kafkaConf) configure(props map[string]interface{}) error {
+	if err := cast.MapToStruct(props, kc); err != nil {
+		return err
+	}
+	if err := cast.MapToStruct(props, &kc.kafkaWriterConf); err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/internal/topo/planner/planner_sink.go b/internal/topo/planner/planner_sink.go
index 792fc1bd8b..35e8a23318 100644
--- a/internal/topo/planner/planner_sink.go
+++ b/internal/topo/planner/planner_sink.go
@@ -166,27 +166,17 @@ func findTemplateProps(props map[string]any) []string {
 	return result
 }
 
-type BatchsinkAble interface {
-	ConfigureBatch(batchSize int, lingerInterval time.Duration)
-}
-
 // Split sink node according to the sink configuration. Return the new input emitters.
 func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOption, sc *node.SinkConf, templates []string) ([]node.TopNode, error) {
 	index := 0
 	result := make([]node.TopNode, 0)
-	// Batch enabled
-	bs, ok := s.(BatchsinkAble)
-	if ok {
-		bs.ConfigureBatch(sc.BatchSize, time.Duration(sc.LingerInterval))
-	} else {
-		if sc.BatchSize > 0 || sc.LingerInterval > 0 {
-			batchOp, err := node.NewBatchOp(fmt.Sprintf("%s_%d_batch", sinkName, index), options, sc.BatchSize, time.Duration(sc.LingerInterval))
-			if err != nil {
-				return nil, err
-			}
-			index++
-			result = append(result, batchOp)
+	if sc.BatchSize > 0 || sc.LingerInterval > 0 {
+		batchOp, err := node.NewBatchOp(fmt.Sprintf("%s_%d_batch", sinkName, index), options, sc.BatchSize, time.Duration(sc.LingerInterval))
+		if err != nil {
+			return nil, err
 		}
+		index++
+		result = append(result, batchOp)
 	}
 	// Transform enabled
 	// Currently, the row to map is done here and is required. TODO: eliminate map and this could become optional
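Reviewer note (not part of the diff): the reason `kafkaConf.configure` decodes the same props map twice is that a mapstructure-style decoder does not promote flat keys into an embedded struct unless the field is squashed. Below is a minimal, self-contained sketch of that behavior, assuming `cast.MapToStruct` behaves like `mapstructure` with `TagName: "json"` and no squash; `conf`, `writerConf`, and `decode` are illustrative stand-ins, not eKuiper code:

```go
package main

import (
	"fmt"
	"time"

	"github.com/mitchellh/mapstructure"
)

// writerConf mirrors the embedded kafkaWriterConf in the patched sink.
type writerConf struct {
	BatchSize    int           `json:"batchSize"`
	BatchTimeout time.Duration `json:"batchTimeout"`
	BatchBytes   int64         `json:"batchBytes"`
}

// conf mirrors the patched kafkaConf: writer settings are embedded
// instead of living under a nested "writerConf" key.
type conf struct {
	writerConf
	Brokers string `json:"brokers"`
	Topic   string `json:"topic"`
}

// decode stands in for cast.MapToStruct: mapstructure driven by json tags.
func decode(props map[string]interface{}, target interface{}) error {
	d, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		TagName: "json",
		Result:  target,
	})
	if err != nil {
		return err
	}
	return d.Decode(props)
}

func main() {
	props := map[string]interface{}{
		"brokers":   "localhost:9092",
		"topic":     "demo",
		"batchSize": 500,
	}
	c := &conf{writerConf: writerConf{BatchSize: 100}} // defaults

	// First pass fills the named fields (Brokers, Topic) but, without a
	// ",squash" tag, flat keys never reach the embedded struct.
	_ = decode(props, c)
	fmt.Println(c.BatchSize) // 100: default untouched

	// Second pass targets the embedded struct directly, exactly like the
	// second MapToStruct call in kafkaConf.configure.
	_ = decode(props, &c.writerConf)
	fmt.Println(c.BatchSize) // 500
}
```

A side effect of the embedding is the user-facing config surface: batch settings such as `batchSize` now sit at the top level of the sink props rather than under a nested `writerConf` object.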
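Reviewer note (not part of the diff): with `BatchsinkAble` removed, batching for every sink, Kafka included, goes through the generic `node.NewBatchOp` operator, while the Kafka writer keeps its own batching via the flat writer props. The following is a generic sketch of the size-or-linger contract that operator is handed, assuming the usual semantics (flush when `batchSize` items accumulate, or when `lingerInterval` elapses after the first buffered item); `batch` is hypothetical, not eKuiper's implementation:

```go
package main

import (
	"fmt"
	"time"
)

// batch emits a slice whenever batchSize items have accumulated or
// lingerInterval has elapsed since the first buffered item, whichever
// comes first.
func batch(in <-chan int, batchSize int, lingerInterval time.Duration) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		var buf []int
		timer := time.NewTimer(lingerInterval)
		stop := func() {
			if !timer.Stop() {
				select { // drain a stale fire so Reset starts clean
				case <-timer.C:
				default:
				}
			}
		}
		stop() // start with the timer idle
		flush := func() {
			stop()
			if len(buf) > 0 {
				out <- buf
				buf = nil
			}
		}
		for {
			select {
			case v, ok := <-in:
				if !ok { // upstream closed: emit the remainder
					flush()
					return
				}
				if len(buf) == 0 {
					timer.Reset(lingerInterval) // linger starts at first item
				}
				buf = append(buf, v)
				if batchSize > 0 && len(buf) >= batchSize {
					flush()
				}
			case <-timer.C:
				flush()
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		for i := 0; i < 7; i++ {
			in <- i
		}
		close(in)
	}()
	for b := range batch(in, 3, 50*time.Millisecond) {
		fmt.Println(b) // typically [0 1 2], [3 4 5], [6]
	}
}
```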