Remove PubSubTopicRepository arg from KafkaInputSplit constructor
sushantmane committed Jan 21, 2025
1 parent a256fc3 commit 7cf7fd8
Showing 5 changed files with 17 additions and 22 deletions.
@@ -105,7 +105,7 @@ public InputSplit[] getSplitsByRecordsPerSplit(JobConf job, long maxRecordsPerSp
       long splitStart = 0;
       while (splitStart < end) {
         long splitEnd = Math.min(splitStart + maxRecordsPerSplit, end);
-        splits.add(new KafkaInputSplit(pubSubTopicRepository, topicPartition, splitStart, splitEnd));
+        splits.add(new KafkaInputSplit(topicPartition, splitStart, splitEnd));
         splitStart = splitEnd;
       }
     });
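For context on the hunk above, here is a minimal, self-contained sketch of the split-by-records loop using the new three-argument constructor. OffsetSplit is a hypothetical stand-in for KafkaInputSplit, and the start/end/maxRecordsPerSplit names simply mirror the hunk; this is an illustration, not the actual Venice code.

import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
  // Hypothetical stand-in for KafkaInputSplit after this commit: no repository argument.
  record OffsetSplit(String topicPartition, long startingOffset, long endingOffset) {
  }

  static List<OffsetSplit> splitByRecords(String topicPartition, long start, long end, long maxRecordsPerSplit) {
    List<OffsetSplit> splits = new ArrayList<>();
    long splitStart = start;
    while (splitStart < end) {
      // Each split covers at most maxRecordsPerSplit offsets; the last split may be shorter.
      long splitEnd = Math.min(splitStart + maxRecordsPerSplit, end);
      splits.add(new OffsetSplit(topicPartition, splitStart, splitEnd));
      splitStart = splitEnd;
    }
    return splits;
  }

  public static void main(String[] args) {
    // Offsets 0..250 with 100 records per split -> [0,100), [100,200), [200,250)
    splitByRecords("test_topic-0", 0, 250, 100).forEach(System.out::println);
  }
}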
@@ -20,7 +20,7 @@ public class KafkaInputSplit implements InputSplit {
   private long startingOffset;
   private long endingOffset;
   private PubSubTopicPartition topicPartition;
-  private PubSubTopicRepository topicRepository;
+  private final PubSubTopicRepository topicRepository = new PubSubTopicRepository();

   /**
    * Nullary Constructor for creating the instance inside the Mapper instance.
@@ -29,19 +29,14 @@ public KafkaInputSplit() {
   }

   /**
-   * Constructs an input split for the provided {@param topic} and {@param partition} restricting data to be between
-   * the {@param startingOffset} and {@param endingOffset}
+   * Constructs an input split for the provided {@param topic} and {@param partition} restricting data to be between the
+   * {@param startingOffset} and {@param endingOffset}
    *
-   * @param topicPartition the topic-partition for the split
+   * @param topicPartition the topic-partition for the split
    * @param startingOffset the start of the split
    * @param endingOffset the end of the split
    */
-  public KafkaInputSplit(
-      PubSubTopicRepository topicRepository,
-      PubSubTopicPartition topicPartition,
-      long startingOffset,
-      long endingOffset) {
-    this.topicRepository = topicRepository;
+  public KafkaInputSplit(PubSubTopicPartition topicPartition, long startingOffset, long endingOffset) {
     this.startingOffset = startingOffset;
     this.endingOffset = endingOffset;
     this.topicPartition = topicPartition;
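For illustration, a hedged sketch of a call site after this change: the caller no longer passes a PubSubTopicRepository, because the split now creates its own internally. The topic name and offsets below are placeholders taken from the tests in this commit, and the fragment assumes the relevant Venice classes (PubSubTopicRepository, PubSubTopicPartitionImpl, KafkaInputSplit) are already imported.

// Assumes the Venice pub-sub and KafkaInputSplit classes are on the classpath and imported.
PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
PubSubTopicPartition topicPartition =
    new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test_topic"), 0);

// Before this commit: new KafkaInputSplit(pubSubTopicRepository, topicPartition, 0, 102)
// After this commit: the repository argument is gone.
KafkaInputSplit split = new KafkaInputSplit(topicPartition, 0, 102);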
@@ -81,7 +81,7 @@ public void testNext() throws IOException {
     recordsMap.put(pubSubTopicPartition, consumerRecordList);
     when(consumer.poll(anyLong())).thenReturn(recordsMap, new HashMap<>());
     PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), 0);
-    KafkaInputSplit split = new KafkaInputSplit(pubSubTopicRepository, topicPartition, 0, 102);
+    KafkaInputSplit split = new KafkaInputSplit(topicPartition, 0, 102);
     DataWriterTaskTracker taskTracker = new ReporterBackedMapReduceDataWriterTaskTracker(Reporter.NULL);
     try (KafkaInputRecordReader reader =
         new KafkaInputRecordReader(split, conf, taskTracker, consumer, pubSubTopicRepository)) {
@@ -59,7 +59,7 @@ public void testEmptyTopic() throws IOException {
     KafkaInputFormat mockFormat = mock(KafkaInputFormat.class);
     PubSubTopicPartition topicPartition =
         new PubSubTopicPartitionImpl(PUB_SUB_TOPIC_REPOSITORY.getTopic("test_topic"), 0);
-    InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 0) };
+    InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit(topicPartition, 0, 0) };
     doReturn(splits).when(mockFormat).getSplitsByRecordsPerSplit(any(), anyLong());
     RecordReader<KafkaInputMapperKey, KafkaInputMapperValue> mockRecordReader = mock(RecordReader.class);
     doReturn(false).when(mockRecordReader).next(any(), any());
@@ -152,8 +152,8 @@ public void testSamplingFromMultiplePartitions() throws IOException {
     KafkaInputFormat mockFormat = mock(KafkaInputFormat.class);
     PubSubTopicPartition topicPartition =
         new PubSubTopicPartitionImpl(PUB_SUB_TOPIC_REPOSITORY.getTopic("test_topic"), 0);
-    InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2),
-        new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2) };
+    InputSplit[] splits =
+        new KafkaInputSplit[] { new KafkaInputSplit(topicPartition, 0, 2), new KafkaInputSplit(topicPartition, 0, 2) };
     doReturn(splits).when(mockFormat).getSplitsByRecordsPerSplit(any(), anyLong());

     // Return 3 records
@@ -205,8 +205,8 @@ public void testSamplingFromMultiplePartitionsWithSourceVersionCompressionEnable
     KafkaInputFormat mockFormat = mock(KafkaInputFormat.class);
     PubSubTopicPartition topicPartition =
         new PubSubTopicPartitionImpl(PUB_SUB_TOPIC_REPOSITORY.getTopic("test_topic"), 0);
-    InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2),
-        new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2) };
+    InputSplit[] splits =
+        new KafkaInputSplit[] { new KafkaInputSplit(topicPartition, 0, 2), new KafkaInputSplit(topicPartition, 0, 2) };
     doReturn(splits).when(mockFormat).getSplitsByRecordsPerSplit(any(), anyLong());

     // Return 3 records
@@ -257,8 +257,8 @@ public void testSamplingFromMultiplePartitionsWithSourceVersionCompressionEnable
     KafkaInputFormat mockFormat = mock(KafkaInputFormat.class);
     PubSubTopicPartition topicPartition =
         new PubSubTopicPartitionImpl(PUB_SUB_TOPIC_REPOSITORY.getTopic("test_topic"), 0);
-    InputSplit[] splits = new KafkaInputSplit[] { new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2),
-        new KafkaInputSplit(PUB_SUB_TOPIC_REPOSITORY, topicPartition, 0, 2) };
+    InputSplit[] splits =
+        new KafkaInputSplit[] { new KafkaInputSplit(topicPartition, 0, 2), new KafkaInputSplit(topicPartition, 0, 2) };
     doReturn(splits).when(mockFormat).getSplitsByRecordsPerSplit(any(), anyLong());

     // Return 3 records
@@ -93,7 +93,7 @@ public void testNext() throws IOException {
     conf.set(KAFKA_INPUT_TOPIC, topic);

     try (KafkaInputRecordReader reader =
-        new KafkaInputRecordReader(new KafkaInputSplit(pubSubTopicRepository, topicPartition, 0, 102), conf, null)) {
+        new KafkaInputRecordReader(new KafkaInputSplit(topicPartition, 0, 102), conf, null)) {
       for (int i = 0; i < 100; ++i) {
         KafkaInputMapperKey key = new KafkaInputMapperKey();
         KafkaInputMapperValue value = new KafkaInputMapperValue();
@@ -116,7 +116,7 @@ public void testNextWithDeleteMessage() throws IOException {
     conf.set(KAFKA_INPUT_TOPIC, topic);
     conf.set(KAFKA_SOURCE_KEY_SCHEMA_STRING_PROP, ChunkedKeySuffix.SCHEMA$.toString());
     try (KafkaInputRecordReader reader =
-        new KafkaInputRecordReader(new KafkaInputSplit(pubSubTopicRepository, topicPartition, 0, 102), conf, null)) {
+        new KafkaInputRecordReader(new KafkaInputSplit(topicPartition, 0, 102), conf, null)) {
       for (int i = 0; i < 100; ++i) {
         KafkaInputMapperKey key = new KafkaInputMapperKey();
         KafkaInputMapperValue value = new KafkaInputMapperValue();
@@ -145,7 +145,7 @@ public void testNextWithUpdateMessage() throws IOException {
     conf.set(KAFKA_INPUT_TOPIC, topic);
     conf.set(KAFKA_SOURCE_KEY_SCHEMA_STRING_PROP, ChunkedKeySuffix.SCHEMA$.toString());
     try (KafkaInputRecordReader reader =
-        new KafkaInputRecordReader(new KafkaInputSplit(pubSubTopicRepository, topicPartition, 0, 102), conf, null)) {
+        new KafkaInputRecordReader(new KafkaInputSplit(topicPartition, 0, 102), conf, null)) {
       for (int i = 0; i < 100; ++i) {
         KafkaInputMapperKey key = new KafkaInputMapperKey();
         KafkaInputMapperValue value = new KafkaInputMapperValue();
