Nothing gets consumed from at_least_once example #678

Open
miladamery opened this issue May 2, 2024 · 2 comments
miladamery commented May 2, 2024

Hi, I'm trying to consume messages from Kafka with rust-rdkafka. I copy-pasted the at_least_once.rs code, but nothing gets consumed from Kafka. In the console I get the following logs:

rebalance: RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
tpl: TPL {ifbrlc-20230805/0: offset=Invalid metadata="", error=Ok(())}

and in my broker logs:

[2024-05-02 05:00:06,947] INFO [GroupCoordinator 3]: Assignment received from leader rdkafka-e3728e3e-11ae-43b1-a9dd-71716d3642b3 for group milad.group for generation 3. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:00:14,683] INFO [GroupCoordinator 3]: Dynamic member with unknown member id joins group milad.group in Stable state. Created a new member id rdkafka-e808bf6b-1acb-4a46-940c-9e3b48a0a8ac and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:00:51,954] INFO [GroupCoordinator 3]: Member rdkafka-e3728e3e-11ae-43b1-a9dd-71716d3642b3 in group milad.group has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:00:51,955] INFO [GroupCoordinator 3]: Preparing to rebalance group milad.group in state PreparingRebalance with old generation 3 (__consumer_offsets-34) (reason: removing member rdkafka-e3728e3e-11ae-43b1-a9dd-71716d3642b3 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:00:51,957] INFO [GroupCoordinator 3]: Group milad.group with generation 4 is now empty (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:00:59,684] INFO [GroupCoordinator 3]: Pending member rdkafka-e808bf6b-1acb-4a46-940c-9e3b48a0a8ac in group milad.group has been removed after session timeout expiration. (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:03:58,331] INFO [GroupCoordinator 3]: Dynamic member with unknown member id joins group milad.group in Empty state. Created a new member id rdkafka-069fb102-d938-4916-a7b5-411c635d4c6c and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:03:58,332] INFO [GroupCoordinator 3]: Preparing to rebalance group milad.group in state PreparingRebalance with old generation 4 (__consumer_offsets-34) (reason: Adding new member rdkafka-069fb102-d938-4916-a7b5-411c635d4c6c with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:04:01,335] INFO [GroupCoordinator 3]: Stabilized group milad.group generation 5 (__consumer_offsets-34) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2024-05-02 05:04:01,341] INFO [GroupCoordinator 3]: Assignment received from leader rdkafka-069fb102-d938-4916-a7b5-411c635d4c6c for group milad.group for generation 5. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

Here is my code:

use std::future;
use std::time::Duration;
use futures::StreamExt;
use rdkafka::{ClientConfig, ClientContext, Message, Statistics, TopicPartitionList};
use rdkafka::client::NativeClient;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::types::RDKafkaRespErr;

// A simple context to customize the consumer behavior and print a log line every time
// offsets are committed
struct LoggingConsumerContext;

impl ClientContext for LoggingConsumerContext {
    fn stats(&self, statistics: Statistics) {
        println!("Stats Received: {:?}", statistics);
    }

    fn error(&self, error: KafkaError, reason: &str) {
        println!("error: {}, reason: {}", error, reason);
    }
}

impl ConsumerContext for LoggingConsumerContext {
    fn rebalance(&self, native_client: &NativeClient, err: RDKafkaRespErr, tpl: &mut TopicPartitionList) {
        println!("rebalance: {:?}", err);
        println!("tpl: {:?}", tpl);
    }

    fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {
        println!("pre_rebalance: {:?}", rebalance);
    }

    fn post_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {
        println!("post_rebalance: {:?}", rebalance);
    }
    
    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
        match result {
            Ok(_) => println!("Offsets committed successfully"),
            Err(e) => println!("Error while committing offsets: {}", e),
        };
    }


}

type LoggingConsumer = StreamConsumer<LoggingConsumerContext>;

fn create_consumer(brokers: &str, group_id: &str, topic: &str) -> LoggingConsumer {
    let context = LoggingConsumerContext;

    let consumer: LoggingConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        //.set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("max.poll.interval.ms", "20000")
        // Commit automatically every 5 seconds.
        .set("enable.auto.commit", "true")
        //.set("auto.commit.interval.ms", "5000")
        // but only commit the offsets explicitly stored via `consumer.store_offset`.
        //.set("enable.auto.offset.store", "false")
        //.set("auto.offset.reset", "earliest")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create_with_context(context)
        .expect("Consumer creation failed");

    consumer
        .subscribe(&[topic])
        .expect("Can't subscribe to specified topic");

    consumer
}

#[tokio::main]
async fn main() {
    let consumer = create_consumer(
        "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094",
        "milad.group",
        "ifbrlc-20230805"
    );
    
    loop {
        match consumer.recv().await {
            Err(e) => {
                println!("Kafka error: {}", e);
            }
            Ok(m) => {
                println!("{:?}", m.payload());
            }
        }
    }
}

And this is my docker-compose file that runs Kafka:

version: '2.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:latest
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888


  kafka1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

  kafka2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1


  kafka3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

  kafka-ui:
    container_name: kafka-ui
    image: docker.arvancloud.ir/provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    depends_on:
      - kafka1
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:19092

In the loop in main nothing gets printed, and even when I debug with breakpoints on both the Err and Ok branches, execution never stops there.

My Cargo.toml is:

[package]
name = "kafka-consumer-test"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rdkafka = "0.36.2"
tokio = {version = "1.37.0", features = ["full"]}
futures = "0.3.30"

What am I doing wrong? How can I fix this?

@buraktabn

Did you manage to solve this?

@miladamery (Author)

@buraktabn
I can't say I solved this issue. But a custom consumer won't consume anything; if you use the default consumer of rdkafka, it works.
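
For reference, and only as a guess on my side (not verified upstream): ConsumerContext's default rebalance implementation appears to be what actually assigns the partitions to the consumer, so overriding rebalance with a method that only prints (as in the code above) would leave the consumer with no assignment and nothing to read. Below is a sketch of a logging context that leaves rebalance at its default, mirroring the upstream at_least_once.rs example and assuming rdkafka 0.36:

use rdkafka::{ClientContext, TopicPartitionList};
use rdkafka::consumer::{ConsumerContext, Rebalance, StreamConsumer};
use rdkafka::error::KafkaResult;

struct LoggingConsumerContext;

impl ClientContext for LoggingConsumerContext {}

impl ConsumerContext for LoggingConsumerContext {
    // `rebalance` is intentionally NOT overridden, so the default
    // implementation still performs the actual partition assignment.

    fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {
        println!("pre_rebalance: {:?}", rebalance);
    }

    fn post_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {
        println!("post_rebalance: {:?}", rebalance);
    }

    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
        match result {
            Ok(_) => println!("Offsets committed successfully"),
            Err(e) => println!("Error while committing offsets: {}", e),
        };
    }
}

type LoggingConsumer = StreamConsumer<LoggingConsumerContext>;

create_consumer and main would stay exactly as in the original post; only the ConsumerContext impl changes.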
