[vpj] Replace org.apache.kafka.common.TopicPartition with PubSubTopicPartition in VPJ #1460

Merged 4 commits on Jan 22, 2025
@@ -159,7 +159,6 @@
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -4173,7 +4172,7 @@ public boolean isPartitionConsumingOrHasPendingIngestionAction(int userPartition
}

/**
* Override the {@link CommonClientConfigs#BOOTSTRAP_SERVERS_CONFIG} config with a remote Kafka bootstrap url.
* Override the {@link com.linkedin.venice.ConfigKeys#KAFKA_BOOTSTRAP_SERVERS} config with a remote Kafka bootstrap url.
*/
protected Properties createKafkaConsumerProperties(
Properties localConsumerProps,
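
As context for the new Javadoc reference, here is a minimal sketch of the kind of override the comment describes, assuming the method clones the local consumer properties and swaps in a remote broker address; ConfigKeys.KAFKA_BOOTSTRAP_SERVERS comes from the diff, while remoteKafkaBootstrapUrl and the surrounding method body are illustrative:

// Illustrative sketch only: clone the local consumer config, then point it at the remote cluster.
Properties remoteConsumerProps = new Properties();
remoteConsumerProps.putAll(localConsumerProps);
// Venice's own config key, as referenced by the updated Javadoc (not Kafka's CommonClientConfigs).
remoteConsumerProps.setProperty(ConfigKeys.KAFKA_BOOTSTRAP_SERVERS, remoteKafkaBootstrapUrl);
return remoteConsumerProps;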
@@ -182,7 +182,7 @@ synchronized byte[] trainDict(Optional<PubSubConsumerAdapter> reusedConsumerOpti
// Get one split per partition
KafkaInputSplit[] splits = (KafkaInputSplit[]) kafkaInputFormat.getSplitsByRecordsPerSplit(jobConf, Long.MAX_VALUE);
// The following sort is trying to get a deterministic dict with the same input.
Arrays.sort(splits, Comparator.comparingInt(o -> o.getTopicPartition().partition()));
Arrays.sort(splits, Comparator.comparingInt(o -> o.getTopicPartition().getPartitionNumber()));
// Try to gather some records from each partition
PushJobZstdConfig zstdConfig = new PushJobZstdConfig(props, splits.length);
ZstdDictTrainer trainer = trainerSupplier.orElseGet(zstdConfig::getZstdDictTrainer);
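
The accessor rename above is the core of this PR: Kafka's TopicPartition exposes topic() and partition(), while Venice's PubSubTopicPartition exposes getTopicName() and getPartitionNumber(). A small sketch of the mapping, with a made-up topic name and partition number:

PubSubTopicRepository repository = new PubSubTopicRepository();
PubSubTopicPartition topicPartition =
    new PubSubTopicPartitionImpl(repository.getTopic("test_topic"), 2);
String topicName = topicPartition.getTopicName();          // was topicPartition.topic()
int partitionNumber = topicPartition.getPartitionNumber(); // was topicPartition.partition()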
@@ -8,10 +8,13 @@
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
import com.linkedin.venice.hadoop.mapreduce.datawriter.task.ReporterBackedMapReduceDataWriterTaskTracker;
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapterFactory;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.api.exceptions.PubSubOpTimeoutException;
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.pubsub.manager.TopicManagerContext;
@@ -30,7 +33,6 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.kafka.common.TopicPartition;


/**
@@ -49,7 +51,7 @@ public class KafkaInputFormat implements InputFormat<KafkaInputMapperKey, KafkaI
public static final long DEFAULT_KAFKA_INPUT_MAX_RECORDS_PER_MAPPER = 5000000L;
private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

protected Map<TopicPartition, Long> getLatestOffsets(JobConf config) {
protected Map<PubSubTopicPartition, Long> getLatestOffsets(JobConf config) {
VeniceProperties consumerProperties = KafkaInputUtils.getConsumerProperties(config);
TopicManagerContext topicManagerContext =
new TopicManagerContext.Builder().setPubSubPropertiesSupplier(k -> consumerProperties)
@@ -61,16 +63,16 @@ protected Map<TopicPartition, Long> getLatestOffsets(JobConf config) {
.build();
try (TopicManager topicManager =
new TopicManagerRepository(topicManagerContext, config.get(KAFKA_INPUT_BROKER_URL)).getLocalTopicManager()) {
String topic = config.get(KAFKA_INPUT_TOPIC);

PubSubTopic topic = pubSubTopicRepository.getTopic(config.get(KAFKA_INPUT_TOPIC));
Map<Integer, Long> latestOffsets = RetryUtils.executeWithMaxAttempt(
() -> topicManager.getTopicLatestOffsets(pubSubTopicRepository.getTopic(topic)),
() -> topicManager.getTopicLatestOffsets(topic),
10,
Duration.ofMinutes(1),
Arrays.asList(PubSubOpTimeoutException.class));
Map<TopicPartition, Long> partitionOffsetMap = new HashMap<>(latestOffsets.size());
Map<PubSubTopicPartition, Long> partitionOffsetMap = new HashMap<>(latestOffsets.size());
latestOffsets.forEach(
(partitionId, latestOffset) -> partitionOffsetMap.put(new TopicPartition(topic, partitionId), latestOffset));
(partitionId, latestOffset) -> partitionOffsetMap
.put(new PubSubTopicPartitionImpl(topic, partitionId), latestOffset));
return partitionOffsetMap;
}
}
@@ -92,7 +94,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
}

public InputSplit[] getSplitsByRecordsPerSplit(JobConf job, long maxRecordsPerSplit) {
Map<TopicPartition, Long> latestOffsets = getLatestOffsets(job);
Map<PubSubTopicPartition, Long> latestOffsets = getLatestOffsets(job);
List<InputSplit> splits = new LinkedList<>();
latestOffsets.forEach((topicPartition, end) -> {

@@ -103,7 +105,7 @@ public InputSplit[] getSplitsByRecordsPerSplit(JobConf job, long maxRecordsPerSp
long splitStart = 0;
while (splitStart < end) {
long splitEnd = Math.min(splitStart + maxRecordsPerSplit, end);
splits.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), splitStart, splitEnd));
splits.add(new KafkaInputSplit(pubSubTopicRepository, topicPartition, splitStart, splitEnd));
splitStart = splitEnd;
}
});
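
Putting the two hunks together, split generation now flows end to end on the PubSub types. A condensed sketch of that flow, assuming job, maxRecordsPerSplit, and pubSubTopicRepository are in scope as in the class above:

// Offsets are keyed by PubSubTopicPartition, so each partition object is passed straight into its splits.
Map<PubSubTopicPartition, Long> latestOffsets = getLatestOffsets(job);
List<InputSplit> splits = new LinkedList<>();
latestOffsets.forEach((topicPartition, end) -> {
  for (long splitStart = 0; splitStart < end; ) {
    long splitEnd = Math.min(splitStart + maxRecordsPerSplit, end);
    splits.add(new KafkaInputSplit(pubSubTopicRepository, topicPartition, splitStart, splitEnd));
    splitStart = splitEnd;
  }
});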
@@ -16,13 +16,11 @@
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
@@ -37,7 +35,6 @@
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -70,8 +67,7 @@ public class KafkaInputRecordReader implements RecordReader<KafkaInputMapperKey,
private static final PubSubTopicRepository PUBSUB_TOPIC_REPOSITORY = new PubSubTopicRepository();

private final PubSubConsumerAdapter consumer;
private final TopicPartition topicPartition;
private final PubSubTopicPartition pubSubTopicPartition;
private final PubSubTopicPartition topicPartition;
private final long maxNumberOfRecords;
private final long startingOffset;
private long currentOffset;
@@ -129,8 +125,6 @@ public KafkaInputRecordReader(
KafkaInputSplit inputSplit = (KafkaInputSplit) split;
this.consumer = consumer;
this.topicPartition = inputSplit.getTopicPartition();
PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(topicPartition.topic());
this.pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopic, topicPartition.partition());
this.startingOffset = inputSplit.getStartingOffset();
this.currentOffset = inputSplit.getStartingOffset() - 1;
this.endingOffset = inputSplit.getEndingOffset();
@@ -148,7 +142,7 @@ public KafkaInputRecordReader(
if (!ownedConsumer) {
this.consumer.batchUnsubscribe(this.consumer.getAssignment());
}
this.consumer.subscribe(pubSubTopicPartition, currentOffset);
this.consumer.subscribe(topicPartition, currentOffset);
this.taskTracker = taskTracker;
LOGGER.info(
"KafkaInputRecordReader started for TopicPartition: {} starting offset: {} ending offset: {}",
@@ -1,43 +1,51 @@
package com.linkedin.venice.hadoop.input.kafka;

import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.kafka.common.TopicPartition;


/**
* We borrowed some idea from the open-sourced attic-crunch lib:
* https://github.com/apache/attic-crunch/blob/master/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputSplit.java
*
* InputSplit that represent retrieving data from a single {@link TopicPartition} between the specified start
* InputSplit that represent retrieving data from a single {@link PubSubTopicPartition} between the specified start
* and end offsets.
*/
public class KafkaInputSplit implements InputSplit {
private long startingOffset;
private long endingOffset;
private TopicPartition topicPartition;
private PubSubTopicPartition topicPartition;
private final PubSubTopicRepository topicRepository;

/**
* Nullary Constructor for creating the instance inside the Mapper instance.
*/
public KafkaInputSplit() {
topicRepository = new PubSubTopicRepository();
}

/**
* Constructs an input split for the provided {@param topic} and {@param partition} restricting data to be between
* the {@param startingOffset} and {@param endingOffset}
*
* @param topic the topic for the split
* @param partition the partition for the topic
* @param topicPartition the topic-partition for the split
* @param startingOffset the start of the split
* @param endingOffset the end of the split
*/
public KafkaInputSplit(String topic, int partition, long startingOffset, long endingOffset) {
public KafkaInputSplit(
PubSubTopicRepository topicRepository,
PubSubTopicPartition topicPartition,
long startingOffset,
long endingOffset) {
this.topicRepository = topicRepository;
this.startingOffset = startingOffset;
this.endingOffset = endingOffset;
topicPartition = new TopicPartition(topic, partition);
this.topicPartition = topicPartition;
}

@Override
@@ -57,7 +65,7 @@ public String[] getLocations() throws IOException {
*
* @return the topic and partition for the split
*/
public TopicPartition getTopicPartition() {
public PubSubTopicPartition getTopicPartition() {
return topicPartition;
}

@@ -81,8 +89,8 @@ public long getEndingOffset() {

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(topicPartition.topic());
dataOutput.writeInt(topicPartition.partition());
dataOutput.writeUTF(topicPartition.getTopicName());
dataOutput.writeInt(topicPartition.getPartitionNumber());
dataOutput.writeLong(startingOffset);
dataOutput.writeLong(endingOffset);
}
@@ -94,7 +102,7 @@ public void readFields(DataInput dataInput) throws IOException {
startingOffset = dataInput.readLong();
endingOffset = dataInput.readLong();

topicPartition = new TopicPartition(topic, partition);
topicPartition = new PubSubTopicPartitionImpl(topicRepository.getTopic(topic), partition);
}
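
The on-the-wire shape of the split is unchanged (topic string, partition int, then two offset longs), but the values now come from the PubSubTopicPartition accessors and readFields rebuilds the partition through the repository. A hedged round-trip sketch, assuming the usual java.io stream classes and a made-up topic name:

PubSubTopicRepository repository = new PubSubTopicRepository();
PubSubTopicPartition topicPartition =
    new PubSubTopicPartitionImpl(repository.getTopic("test_topic"), 3);
KafkaInputSplit original = new KafkaInputSplit(repository, topicPartition, 0L, 1000L);

// Serialize with write(), then rehydrate via the nullary constructor plus readFields(),
// the same path Hadoop takes when shipping a split to a mapper.
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
original.write(new DataOutputStream(buffer));

KafkaInputSplit copy = new KafkaInputSplit(); // builds its own PubSubTopicRepository
copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
// copy.getTopicPartition() should again refer to test_topic, partition 3.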

@Override
@@ -80,8 +80,8 @@ public void testNext() throws IOException {
Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> recordsMap = new HashMap<>();
recordsMap.put(pubSubTopicPartition, consumerRecordList);
when(consumer.poll(anyLong())).thenReturn(recordsMap, new HashMap<>());

KafkaInputSplit split = new KafkaInputSplit(topic, 0, 0, 102);
PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), 0);
KafkaInputSplit split = new KafkaInputSplit(pubSubTopicRepository, topicPartition, 0, 102);
DataWriterTaskTracker taskTracker = new ReporterBackedMapReduceDataWriterTaskTracker(Reporter.NULL);
try (KafkaInputRecordReader reader =
new KafkaInputRecordReader(split, conf, taskTracker, consumer, pubSubTopicRepository)) {
@@ -16,7 +16,10 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperKey;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -30,6 +33,8 @@


public class TestKafkaInputDictTrainer {
private final static PubSubTopicRepository PUB_SUB_TOPIC_REPOSITORY = new PubSubTopicRepository();

private KafkaInputDictTrainer.CompressorBuilder getCompressorBuilder(VeniceCompressor mockCompressor) {
return (compressorFactory, compressionStrategy, kafkaUrl, topic, props) -> mockCompressor;
}
@@ -52,7 +57,9 @@ private KafkaInputDictTrainer.Param getParam(int sampleSize, CompressionStrategy
@Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = "No record.*")
public void testEmptyTopic() throws IOException {
KafkaInputFormat mockFormat = mock(KafkaInputFormat.class);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit("test_topic", 0, 0, 0) };
PubSubTopicPartition topicPartition =
new PubSubTopicPartitionImpl(PUB_SUB_TOPIC_REPOSITORY.getTopic("test_topic"), 0);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 0) };
doReturn(splits).when(mockFormat).getSplitsByRecordsPerSplit(any(), anyLong());
RecordReader<KafkaInputMapperKey, KafkaInputMapperValue> mockRecordReader = mock(RecordReader.class);
doReturn(false).when(mockRecordReader).next(any(), any());
@@ -143,8 +150,10 @@ private ResettableRecordReader<KafkaInputMapperKey, KafkaInputMapperValue> mockR
@Test
public void testSamplingFromMultiplePartitions() throws IOException {
KafkaInputFormat mockFormat = mock(KafkaInputFormat.class);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit("test_topic", 0, 0, 2),
new KafkaInputSplit("test_topic", 0, 0, 2) };
PubSubTopicPartition topicPartition =
new PubSubTopicPartitionImpl(PUB_SUB_TOPIC_REPOSITORY.getTopic("test_topic"), 0);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2),
new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2) };
doReturn(splits).when(mockFormat).getSplitsByRecordsPerSplit(any(), anyLong());

// Return 3 records
@@ -194,8 +203,10 @@ public void testSamplingFromMultiplePartitions() throws IOException {
@Test
public void testSamplingFromMultiplePartitionsWithSourceVersionCompressionEnabled() throws IOException {
KafkaInputFormat mockFormat = mock(KafkaInputFormat.class);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit("test_topic", 0, 0, 2),
new KafkaInputSplit("test_topic", 0, 0, 2) };
PubSubTopicPartition topicPartition =
new PubSubTopicPartitionImpl(PUB_SUB_TOPIC_REPOSITORY.getTopic("test_topic"), 0);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2),
new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2) };
doReturn(splits).when(mockFormat).getSplitsByRecordsPerSplit(any(), anyLong());

// Return 3 records
@@ -244,8 +255,10 @@ public void testSamplingFromMultiplePartitionsWithSourceVersionCompressionEnable
@Test
public void testSamplingFromMultiplePartitionsWithSourceVersionCompressionEnabledWithChunking() throws IOException {
KafkaInputFormat mockFormat = mock(KafkaInputFormat.class);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit("test_topic", 0, 0, 2),
new KafkaInputSplit("test_topic", 0, 0, 2) };
PubSubTopicPartition topicPartition =
new PubSubTopicPartitionImpl(PUB_SUB_TOPIC_REPOSITORY.getTopic("test_topic"), 0);
InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2),
new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2) };
doReturn(splits).when(mockFormat).getSplitsByRecordsPerSplit(any(), anyLong());

// Return 3 records