
Commit

[vpj] Replace org.apache.kafka.common.TopicPartition with PubSubTopicPartition in VPJ (#1460)

Replace the usage of Kafka TopicPartition with Venice PubSubTopicPartition instances to enable support for non-Kafka
pub-sub clients. This change is part of the broader effort to decouple Venice from Kafka-specific classes and
ensure compatibility with other pub-sub systems.
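
As a rough illustration of the migration pattern (a minimal sketch inferred from the diff below, not code copied from the repository; the topic name and partition number are placeholder values):

    // Before: Kafka-specific partition handle
    // org.apache.kafka.common.TopicPartition tp = new TopicPartition("test_topic", 0);

    // After: resolve the topic through the Venice repository and wrap it in a
    // pub-sub-agnostic topic-partition, as the updated classes do
    PubSubTopicRepository topicRepository = new PubSubTopicRepository();
    PubSubTopic topic = topicRepository.getTopic("test_topic");
    PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, 0);

    // Accessors change accordingly: topic()/partition() become
    // getTopicName()/getPartitionNumber(), as seen in KafkaInputSplit
    String topicName = topicPartition.getTopicName();
    int partitionNumber = topicPartition.getPartitionNumber();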
sushantmane authored Jan 22, 2025
1 parent 3392d62 commit cd915b9
Showing 9 changed files with 86 additions and 63 deletions.
@@ -159,7 +159,6 @@
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -4173,7 +4172,7 @@ public boolean isPartitionConsumingOrHasPendingIngestionAction(int userPartition
}

/**
* Override the {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG} config with a remote Kafka bootstrap url.
* Override the {@link com.linkedin.venice.ConfigKeys#KAFKA_BOOTSTRAP_SERVERS} config with a remote Kafka bootstrap url.
*/
protected Properties createKafkaConsumerProperties(
Properties localConsumerProps,
@@ -182,7 +182,7 @@ synchronized byte[] trainDict(Optional<PubSubConsumerAdapter> reusedConsumerOpti
// Get one split per partition
KafkaInputSplit[] splits = (KafkaInputSplit[]) kafkaInputFormat.getSplitsByRecordsPerSplit(jobConf, Long.MAX_VALUE);
// The following sort is trying to get a deterministic dict with the same input.
Arrays.sort(splits, Comparator.comparingInt(o -> o.getTopicPartition().partition()));
Arrays.sort(splits, Comparator.comparingInt(o -> o.getTopicPartition().getPartitionNumber()));
// Try to gather some records from each partition
PushJobZstdConfig zstdConfig = new PushJobZstdConfig(props, splits.length);
ZstdDictTrainer trainer = trainerSupplier.orElseGet(zstdConfig::getZstdDictTrainer);
@@ -8,10 +8,13 @@
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
import com.linkedin.venice.hadoop.mapreduce.datawriter.task.ReporterBackedMapReduceDataWriterTaskTracker;
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapterFactory;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.api.exceptions.PubSubOpTimeoutException;
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.pubsub.manager.TopicManagerContext;
@@ -30,7 +33,6 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.kafka.common.TopicPartition;


/**
@@ -49,7 +51,7 @@ public class KafkaInputFormat implements InputFormat<KafkaInputMapperKey, KafkaI
public static final long DEFAULT_KAFKA_INPUT_MAX_RECORDS_PER_MAPPER = 5000000L;
private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

protected Map<TopicPartition, Long> getLatestOffsets(JobConf config) {
protected Map<PubSubTopicPartition, Long> getLatestOffsets(JobConf config) {
VeniceProperties consumerProperties = KafkaInputUtils.getConsumerProperties(config);
TopicManagerContext topicManagerContext =
new TopicManagerContext.Builder().setPubSubPropertiesSupplier(k -> consumerProperties)
@@ -61,16 +63,16 @@ protected Map<TopicPartition, Long> getLatestOffsets(JobConf config) {
.build();
try (TopicManager topicManager =
new TopicManagerRepository(topicManagerContext, config.get(KAFKA_INPUT_BROKER_URL)).getLocalTopicManager()) {
String topic = config.get(KAFKA_INPUT_TOPIC);

PubSubTopic topic = pubSubTopicRepository.getTopic(config.get(KAFKA_INPUT_TOPIC));
Map<Integer, Long> latestOffsets = RetryUtils.executeWithMaxAttempt(
() -> topicManager.getTopicLatestOffsets(pubSubTopicRepository.getTopic(topic)),
() -> topicManager.getTopicLatestOffsets(topic),
10,
Duration.ofMinutes(1),
Arrays.asList(PubSubOpTimeoutException.class));
Map<TopicPartition, Long> partitionOffsetMap = new HashMap<>(latestOffsets.size());
Map<PubSubTopicPartition, Long> partitionOffsetMap = new HashMap<>(latestOffsets.size());
latestOffsets.forEach(
(partitionId, latestOffset) -> partitionOffsetMap.put(new TopicPartition(topic, partitionId), latestOffset));
(partitionId, latestOffset) -> partitionOffsetMap
.put(new PubSubTopicPartitionImpl(topic, partitionId), latestOffset));
return partitionOffsetMap;
}
}
@@ -92,7 +94,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
}

public InputSplit[] getSplitsByRecordsPerSplit(JobConf job, long maxRecordsPerSplit) {
Map<TopicPartition, Long> latestOffsets = getLatestOffsets(job);
Map<PubSubTopicPartition, Long> latestOffsets = getLatestOffsets(job);
List<InputSplit> splits = new LinkedList<>();
latestOffsets.forEach((topicPartition, end) -> {

@@ -103,7 +105,7 @@ public InputSplit[] getSplitsByRecordsPerSp
long splitStart = 0;
while (splitStart < end) {
long splitEnd = Math.min(splitStart + maxRecordsPerSplit, end);
splits.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), splitStart, splitEnd));
splits.add(new KafkaInputSplit(pubSubTopicRepository, topicPartition, splitStart, splitEnd));
splitStart = splitEnd;
}
});
@@ -16,13 +16,11 @@
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
@@ -37,7 +35,6 @@
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -70,8 +67,7 @@ public class KafkaInputRecordReader implements RecordReader<KafkaInputMapperKey,
private static final PubSubTopicRepository PUBSUB_TOPIC_REPOSITORY = new PubSubTopicRepository();

private final PubSubConsumerAdapter consumer;
private final TopicPartition topicPartition;
private final PubSubTopicPartition pubSubTopicPartition;
private final PubSubTopicPartition topicPartition;
private final long maxNumberOfRecords;
private final long startingOffset;
private long currentOffset;
@@ -129,8 +125,6 @@ public KafkaInputRecordReader(
KafkaInputSplit inputSplit = (KafkaInputSplit) split;
this.consumer = consumer;
this.topicPartition = inputSplit.getTopicPartition();
PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(topicPartition.topic());
this.pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopic, topicPartition.partition());
this.startingOffset = inputSplit.getStartingOffset();
this.currentOffset = inputSplit.getStartingOffset() - 1;
this.endingOffset = inputSplit.getEndingOffset();
@@ -148,7 +142,7 @@ public KafkaInputRecordReader(
if (!ownedConsumer) {
this.consumer.batchUnsubscribe(this.consumer.getAssignment());
}
this.consumer.subscribe(pubSubTopicPartition, currentOffset);
this.consumer.subscribe(topicPartition, currentOffset);
this.taskTracker = taskTracker;
LOGGER.info(
"KafkaInputRecordReader started for TopicPartition: {} starting offset: {} ending offset: {}",
@@ -1,43 +1,51 @@
package com.linkedin.venice.hadoop.input.kafka;

import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.kafka.common.TopicPartition;


/**
* We borrowed some idea from the open-sourced attic-crunch lib:
* https://github.com/apache/attic-crunch/blob/master/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputSplit.java
*
* InputSplit that represent retrieving data from a single {@link TopicPartition} between the specified start
* InputSplit that represent retrieving data from a single {@link PubSubTopicPartition} between the specified start
* and end offsets.
*/
public class KafkaInputSplit implements InputSplit {
private long startingOffset;
private long endingOffset;
private TopicPartition topicPartition;
private PubSubTopicPartition topicPartition;
private final PubSubTopicRepository topicRepository;

/**
* Nullary Constructor for creating the instance inside the Mapper instance.
*/
public KafkaInputSplit() {
topicRepository = new PubSubTopicRepository();
}

/**
* Constructs an input split for the provided {@param topic} and {@param partition} restricting data to be between
* the {@param startingOffset} and {@param endingOffset}
*
* @param topic the topic for the split
* @param partition the partition for the topic
* @param topicPartition the topic-partition for the split
* @param startingOffset the start of the split
* @param endingOffset the end of the split
*/
public KafkaInputSplit(String topic, int partition, long startingOffset, long endingOffset) {
public KafkaInputSplit(
PubSubTopicRepository topicRepository,
PubSubTopicPartition topicPartition,
long startingOffset,
long endingOffset) {
this.topicRepository = topicRepository;
this.startingOffset = startingOffset;
this.endingOffset = endingOffset;
topicPartition = new TopicPartition(topic, partition);
this.topicPartition = topicPartition;
}

@Override
@@ -57,7 +65,7 @@ public String[] getLocations() throws IOException {
*
* @return the topic and partition for the split
*/
public TopicPartition getTopicPartition() {
public PubSubTopicPartition getTopicPartition() {
return topicPartition;
}

@@ -81,8 +89,8 @@ public long getEndingOffset() {

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(topicPartition.topic());
dataOutput.writeInt(topicPartition.partition());
dataOutput.writeUTF(topicPartition.getTopicName());
dataOutput.writeInt(topicPartition.getPartitionNumber());
dataOutput.writeLong(startingOffset);
dataOutput.writeLong(endingOffset);
}
@@ -94,7 +102,7 @@ public void readFields(DataInput dataInput) throws IOException {
startingOffset = dataInput.readLong();
endingOffset = dataInput.readLong();

topicPartition = new TopicPartition(topic, partition);
topicPartition = new PubSubTopicPartitionImpl(topicRepository.getTopic(topic), partition);
}

@Override
@@ -80,8 +80,8 @@ public void testNext() throws IOException {
Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> recordsMap = new HashMap<>();
recordsMap.put(pubSubTopicPartition, consumerRecordList);
when(consumer.poll(anyLong())).thenReturn(recordsMap, new HashMap<>());

KafkaInputSplit split = new KafkaInputSplit(topic, 0, 0, 102);
PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), 0);
KafkaInputSplit split = new KafkaInputSplit(pubSubTopicRepository, topicPartition, 0, 102);
DataWriterTaskTracker taskTracker = new ReporterBackedMapReduceDataWriterTaskTracker(Reporter.NULL);
try (KafkaInputRecordReader reader =
new KafkaInputRecordReader(split, conf, taskTracker, consumer, pubSubTopicRepository)) {
@@ -16,7 +16,10 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperKey;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -30,6 +33,8 @@


public class TestKafkaInputDictTrainer {
private final static PubSubTopicRepository PUB_SUB_TOPIC_REPOSITORY = new PubSubTopicRepository();

private KafkaInputDictTrainer.CompressorBuilder getCompressorBuilder(VeniceCompressor mockCompressor) {
return (compressorFactory, compressionStrategy, kafkaUrl, topic, props) -> mockCompressor;
}
@@ -52,7 +57,9 @@ private KafkaInputDictTrainer.Param getParam(int sampleSize, CompressionStrategy
@Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = "No record.*")
public void testEmptyTopic() throws IOException {
KafkaInputFormat mockFormat = mock(KafkaInputFormat.class);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit("test_topic", 0, 0, 0) };
PubSubTopicPartition topicPartition =
new PubSubTopicPartitionImpl(PUB_SUB_TOPIC_REPOSITORY.getTopic("test_topic"), 0);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 0) };
doReturn(splits).when(mockFormat).getSplitsByRecordsPerSplit(any(), anyLong());
RecordReader<KafkaInputMapperKey, KafkaInputMapperValue> mockRecordReader = mock(RecordReader.class);
doReturn(false).when(mockRecordReader).next(any(), any());
@@ -143,8 +150,10 @@ private ResettableRecordReader<KafkaInputMapperKey, KafkaInputMapperValue> mockR
@Test
public void testSamplingFromMultiplePartitions() throws IOException {
KafkaInputFormat mockFormat = mock(KafkaInputFormat.class);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit("test_topic", 0, 0, 2),
new KafkaInputSplit("test_topic", 0, 0, 2) };
PubSubTopicPartition topicPartition =
new PubSubTopicPartitionImpl(PUB_SUB_TOPIC_REPOSITORY.getTopic("test_topic"), 0);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2),
new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2) };
doReturn(splits).when(mockFormat).getSplitsByRecordsPerSplit(any(), anyLong());

// Return 3 records
@@ -194,8 +203,10 @@ public void testSamplingFromMultiplePartitions() throws IOException {
@Test
public void testSamplingFromMultiplePartitionsWithSourceVersionCompressionEnabled() throws IOException {
KafkaInputFormat mockFormat = mock(KafkaInputFormat.class);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit("test_topic", 0, 0, 2),
new KafkaInputSplit("test_topic", 0, 0, 2) };
PubSubTopicPartition topicPartition =
new PubSubTopicPartitionImpl(PUB_SUB_TOPIC_REPOSITORY.getTopic("test_topic"), 0);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2),
new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2) };
doReturn(splits).when(mockFormat).getSplitsByRecordsPerSplit(any(), anyLong());

// Return 3 records
@@ -244,8 +255,10 @@ public void testSamplingFromMultiplePartitionsWithSourceVersionCompressionEnable
@Test
public void testSamplingFromMultiplePartitionsWithSourceVersionCompressionEnabledWithChunking() throws IOException {
KafkaInputFormat mockFormat = mock(KafkaInputFormat.class);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit("test_topic", 0, 0, 2),
new KafkaInputSplit("test_topic", 0, 0, 2) };
PubSubTopicPartition topicPartition =
new PubSubTopicPartitionImpl(PUB_SUB_TOPIC_REPOSITORY.getTopic("test_topic"), 0);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2),
new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2) };
doReturn(splits).when(mockFormat).getSplitsByRecordsPerSplit(any(), anyLong());

// Return 3 records
