Skip to content

Commit

Permalink
Add configuration parameters for rdkafka config
Browse files Browse the repository at this point in the history
Signed-off-by: Tomáš Novák <[email protected]>
  • Loading branch information
MioOgbeni committed Jan 6, 2025
1 parent f799f61 commit a692a76
Showing 1 changed file with 83 additions and 4 deletions.
87 changes: 83 additions & 4 deletions lib/fluent/plugin/in_rdkafka_group.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ class Fluent::Plugin::RdKafkaGroupInput < Fluent::Plugin::Input

helpers :thread, :parser, :compat_parameters

config_param :brokers, :string, :default => 'localhost:9092',
:desc => <<-DESC
Set brokers directly:
<broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
Brokers: you can choose to use either brokers or zookeeper.
DESC

config_param :group_id, :string, :default => 'fluentd',
:desc => "A group id for the consumer."

config_param :topics, :string,
:desc => "Listening topics(separate with comma',')."

Expand Down Expand Up @@ -47,8 +57,27 @@ class Fluent::Plugin::RdKafkaGroupInput < Fluent::Plugin::Input
config_param :max_batch_size, :integer, :default => 10000,
:desc => "Maximum number of log lines emitted in a single batch."

config_param :kafka_configs, :hash, :default => {},
:desc => "Kafka configuration properties as desribed in https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md"
config_param :idempotent, :bool, :default => false, :desc => 'Enable idempotent producer'

config_param :max_send_retries, :integer, :default => 2,
:desc => "Number of times to retry sending of messages to a leader. Used for message.send.max.retries"
config_param :required_acks, :integer, :default => -1,
:desc => "The number of acks required per request. Used for request.required.acks"
config_param :ack_timeout, :time, :default => nil,
:desc => "How long the producer waits for acks. Used for request.timeout.ms"
config_param :compression_codec, :string, :default => nil,
:desc => <<-DESC
The codec the producer uses to compress messages. Used for compression.codec
Supported codecs: (gzip|snappy)
DESC

config_param :rdkafka_buffering_max_ms, :integer, :default => nil, :desc => 'Used for queue.buffering.max.ms'
config_param :rdkafka_buffering_max_messages, :integer, :default => nil, :desc => 'Used for queue.buffering.max.messages'
config_param :rdkafka_message_max_bytes, :integer, :default => nil, :desc => 'Used for message.max.bytes'
config_param :rdkafka_message_max_num, :integer, :default => nil, :desc => 'Used for batch.num.messages'

config_param :rdkafka_options, :hash, :default => {},
:desc => "Set any rdkafka configuration. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md"

config_section :parse do
config_set_default :@type, 'json'
Expand Down Expand Up @@ -91,7 +120,7 @@ def configure(conf)
log.warn "The in_rdkafka_group consumer was not yet tested under heavy production load. Use it at your own risk!"

log.info "Will watch for topics #{@topics} at brokers " \
"#{@kafka_configs["bootstrap.servers"]} and '#{@kafka_configs["group.id"]}' group"
"#{@brokers} and '#{@group_id}' group"

@topics = _config_to_array(@topics)

Expand Down Expand Up @@ -141,6 +170,55 @@ def setup_parser(parser_conf)
end
end

def build_config
config = {:"bootstrap.servers" => @brokers}

if @ssl_ca_cert && @ssl_ca_cert[0]
ssl = true
config[:"ssl.ca.location"] = @ssl_ca_cert[0]
config[:"ssl.certificate.location"] = @ssl_client_cert if @ssl_client_cert
config[:"ssl.key.location"] = @ssl_client_cert_key if @ssl_client_cert_key
config[:"ssl.key.password"] = @ssl_client_cert_key_password if @ssl_client_cert_key_password
end

if @principal
sasl = true
config[:"sasl.mechanisms"] = "GSSAPI"
config[:"sasl.kerberos.principal"] = @principal
config[:"sasl.kerberos.service.name"] = @service_name if @service_name
config[:"sasl.kerberos.keytab"] = @keytab if @keytab
end

if ssl && sasl
security_protocol = "SASL_SSL"
elsif ssl && !sasl
security_protocol = "SSL"
elsif !ssl && sasl
security_protocol = "SASL_PLAINTEXT"
else
security_protocol = "PLAINTEXT"
end
config[:"security.protocol"] = security_protocol

config[:"compression.codec"] = @compression_codec if @compression_codec
config[:"message.send.max.retries"] = @max_send_retries if @max_send_retries
config[:"request.required.acks"] = @required_acks if @required_acks
config[:"request.timeout.ms"] = @ack_timeout * 1000 if @ack_timeout
config[:"queue.buffering.max.ms"] = @rdkafka_buffering_max_ms if @rdkafka_buffering_max_ms
config[:"queue.buffering.max.messages"] = @rdkafka_buffering_max_messages if @rdkafka_buffering_max_messages
config[:"message.max.bytes"] = @rdkafka_message_max_bytes if @rdkafka_message_max_bytes
config[:"batch.num.messages"] = @rdkafka_message_max_num if @rdkafka_message_max_num
config[:"sasl.username"] = @username if @username
config[:"sasl.password"] = @password if @password
config[:"enable.idempotence"] = @idempotent if @idempotent

@rdkafka_options.each { |k, v|
config[k.to_sym] = v
}

config
end

def start
super

Expand All @@ -161,7 +239,8 @@ def shutdown
end

def setup_consumer
consumer = Rdkafka::Config.new(@kafka_configs).consumer
@config = build_config
consumer = Rdkafka::Config.new(config).consumer
consumer.subscribe(*@topics)
consumer
end
Expand Down

0 comments on commit a692a76

Please sign in to comment.