Skip to content

Commit

Permalink
Add unit tests and make VeniceViewWriter version specific (it already…
Browse files Browse the repository at this point in the history
… is implicitly)
  • Loading branch information
xunyin8 committed Nov 13, 2024
1 parent 8e3a625 commit a711263
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1513,7 +1513,6 @@ protected CompletableFuture[] processViewWriters(
mergeConflictResult.getNewValue(),
oldValueBB,
keyBytes,
versionNumber,
mergeConflictResult.getValueSchemaId(),
oldValueSchemaId,
mergeConflictResult.getRmdRecord());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3690,13 +3690,8 @@ protected void processControlMessageForViews(

// Iterate through list of views for the store and process the control message.
for (VeniceViewWriter viewWriter: viewWriters.values()) {
viewWriter.processControlMessage(
kafkaKey,
kafkaMessageEnvelope,
controlMessage,
partition,
partitionConsumptionState,
this.versionNumber);
viewWriter
.processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, partition, partitionConsumptionState);
}
}

Expand Down Expand Up @@ -3968,7 +3963,6 @@ protected CompletableFuture[] processViewWriters(
viewWriterFutures[index++] = writer.processRecord(
writeComputeResultWrapper.getNewPut().putValue,
keyBytes,
versionNumber,
writeComputeResultWrapper.getNewPut().schemaId);
}
return viewWriterFutures;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ public class ChangeCaptureViewWriter extends VeniceViewWriter {
public ChangeCaptureViewWriter(
VeniceConfigLoader props,
Store store,
int version,
Schema keySchema,
Map<String, String> extraViewParameters) {
super(props, store, keySchema, extraViewParameters);
super(props, store, version, keySchema, extraViewParameters);
internalView = new ChangeCaptureView(props.getCombinedProperties().toProperties(), store, extraViewParameters);
kafkaClusterUrlToIdMap = props.getVeniceServerConfig().getKafkaClusterUrlToIdMap();
pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory();
Expand All @@ -63,7 +64,6 @@ public CompletableFuture<PubSubProduceResult> processRecord(
ByteBuffer newValue,
ByteBuffer oldValue,
byte[] key,
int version,
int newValueSchemaId,
int oldValueSchemaId,
GenericRecord replicationMetadataRecord) {
Expand All @@ -77,18 +77,14 @@ public CompletableFuture<PubSubProduceResult> processRecord(
recordChangeEvent.replicationCheckpointVector = RmdUtils.extractOffsetVectorFromRmd(replicationMetadataRecord);

if (veniceWriter == null) {
initializeVeniceWriter(version);
initializeVeniceWriter();
}
// TODO: RecordChangeEvent isn't versioned today.
return veniceWriter.put(key, recordChangeEvent, 1);
}

@Override
public CompletableFuture<PubSubProduceResult> processRecord(
ByteBuffer newValue,
byte[] key,
int version,
int newValueSchemaId) {
public CompletableFuture<PubSubProduceResult> processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) {
// No op
return CompletableFuture.completedFuture(null);
}
Expand All @@ -99,8 +95,7 @@ public void processControlMessage(
KafkaMessageEnvelope kafkaMessageEnvelope,
ControlMessage controlMessage,
int partition,
PartitionConsumptionState partitionConsumptionState,
int version) {
PartitionConsumptionState partitionConsumptionState) {

// We only care (for now) about version swap control Messages
if (!(controlMessage.getControlMessageUnion() instanceof VersionSwap)) {
Expand Down Expand Up @@ -136,7 +131,7 @@ public void processControlMessage(

// Write the message on veniceWriter to the change capture topic
if (veniceWriter == null) {
initializeVeniceWriter(version);
initializeVeniceWriter();
}

veniceWriter.sendControlMessage(
Expand Down Expand Up @@ -193,7 +188,7 @@ VeniceWriterOptions buildWriterOptions(int version) {
return setProducerOptimizations(configBuilder).build();
}

synchronized private void initializeVeniceWriter(int version) {
synchronized private void initializeVeniceWriter() {
if (veniceWriter != null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ public class MaterializedViewWriter extends VeniceViewWriter {
public MaterializedViewWriter(
VeniceConfigLoader props,
Store store,
int version,
Schema keySchema,
Map<String, String> extraViewParameters,
Clock clock) {
super(props, store, keySchema, extraViewParameters);
super(props, store, version, keySchema, extraViewParameters);
pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory();
internalView = new MaterializedView(props.getCombinedProperties().toProperties(), store, extraViewParameters);
this.clock = clock;
Expand All @@ -70,31 +71,34 @@ public MaterializedViewWriter(
public MaterializedViewWriter(
VeniceConfigLoader props,
Store store,
int version,
Schema keySchema,
Map<String, String> extraViewParameters) {
this(props, store, keySchema, extraViewParameters, Clock.systemUTC());
this(props, store, version, keySchema, extraViewParameters, Clock.systemUTC());
}

/**
* package private for testing purpose
*/
void setVeniceWriter(VeniceWriter veniceWriter) {
this.veniceWriter = veniceWriter;
}

@Override
public CompletableFuture<PubSubProduceResult> processRecord(
ByteBuffer newValue,
ByteBuffer oldValue,
byte[] key,
int version,
int newValueSchemaId,
int oldValueSchemaId,
GenericRecord replicationMetadataRecord) {
return processRecord(newValue, key, version, newValueSchemaId);
return processRecord(newValue, key, newValueSchemaId);
}

@Override
public CompletableFuture<PubSubProduceResult> processRecord(
ByteBuffer newValue,
byte[] key,
int version,
int newValueSchemaId) {
public CompletableFuture<PubSubProduceResult> processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) {
if (veniceWriter == null) {
initializeVeniceWriter(version);
initializeVeniceWriter();
}
return veniceWriter.put(key, newValue.array(), newValueSchemaId);
}
Expand All @@ -105,16 +109,14 @@ public void processControlMessage(
KafkaMessageEnvelope kafkaMessageEnvelope,
ControlMessage controlMessage,
int partition,
PartitionConsumptionState partitionConsumptionState,
int version) {
PartitionConsumptionState partitionConsumptionState) {
final ControlMessageType type = ControlMessageType.valueOf(controlMessage);
// Ignore other control messages for materialized view.
if (type == ControlMessageType.START_OF_SEGMENT && Arrays.equals(kafkaKey.getKey(), KafkaKey.HEART_BEAT.getKey())) {
maybePropagateHeartbeatLowWatermarkToViewTopic(
partition,
partitionConsumptionState,
kafkaMessageEnvelope.producerMetadata.messageTimestamp,
version);
kafkaMessageEnvelope.getProducerMetadata().getMessageTimestamp());
}
}

Expand All @@ -137,7 +139,7 @@ VeniceWriterOptions buildWriterOptions(int version) {
return setProducerOptimizations(configBuilder).build();
}

synchronized private void initializeVeniceWriter(int version) {
synchronized private void initializeVeniceWriter() {
if (veniceWriter == null) {
veniceWriter = new VeniceWriterFactory(props, pubSubProducerAdapterFactory, null)
.createVeniceWriter(buildWriterOptions(version));
Expand All @@ -162,10 +164,9 @@ synchronized private void initializeVeniceWriter(int version) {
private void maybePropagateHeartbeatLowWatermarkToViewTopic(
int partition,
PartitionConsumptionState partitionConsumptionState,
long heartbeatTimestamp,
int version) {
long heartbeatTimestamp) {
boolean propagate = false;
long oldestHeartbeatTimestamp;
long oldestHeartbeatTimestamp = 0;
broadcastHBLock.lock();
try {
partitionToHeartbeatTimestampMap.put(partition, heartbeatTimestamp);
Expand All @@ -178,14 +179,17 @@ private void maybePropagateHeartbeatLowWatermarkToViewTopic(
propagate = true;
lastHBBroadcastTimestamp = now;
}
// We have determined that the oldestHeartbeatTimestamp offers no value in monitoring the lag for this view
// topic since it's within the DEFAULT_HEARTBEAT_BROADCAST_DELAY_THRESHOLD. We are also clearing the map, so we
// don't need to worry about removing timestamps belonging to partitions that we are no longer leader of.
partitionToHeartbeatTimestampMap.clear();
}
} finally {
broadcastHBLock.unlock();
}
if (propagate) {
if (propagate && oldestHeartbeatTimestamp > 0) {
if (veniceWriter == null) {
initializeVeniceWriter(version);
initializeVeniceWriter();
}
LeaderCompleteState leaderCompleteState =
LeaderCompleteState.getLeaderCompleteState(partitionConsumptionState.isCompletionReported());
Expand All @@ -205,7 +209,7 @@ private void maybePropagateHeartbeatLowWatermarkToViewTopic(
VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER,
true,
leaderCompleteState,
heartbeatTimestamp);
oldestHeartbeatTimestamp);
heartBeatFuture.whenComplete((ignore, throwable) -> {
if (throwable != null) {
completionException.set(new CompletionException(throwable));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@
* view implementations.
*/
public abstract class VeniceViewWriter extends VeniceView {
protected final int version;

public VeniceViewWriter(
VeniceConfigLoader props,
Store store,
int version,
Schema keySchema,
Map<String, String> extraViewParameters) {
super(props.getCombinedProperties().toProperties(), store, extraViewParameters);
this.version = version;
}

/**
Expand All @@ -52,7 +56,6 @@ public abstract CompletableFuture<PubSubProduceResult> processRecord(
ByteBuffer newValue,
ByteBuffer oldValue,
byte[] key,
int version,
int newValueSchemaId,
int oldValueSchemaId,
GenericRecord replicationMetadataRecord);
Expand All @@ -69,7 +72,6 @@ public abstract CompletableFuture<PubSubProduceResult> processRecord(
public abstract CompletableFuture<PubSubProduceResult> processRecord(
ByteBuffer newValue,
byte[] key,
int version,
int newValueSchemaId);

/**
Expand All @@ -85,15 +87,13 @@ public abstract CompletableFuture<PubSubProduceResult> processRecord(
* @param controlMessage the control message we're processing
* @param partition the partition this control message was delivered to
* @param partitionConsumptionState the pcs of the consuming node
* @param version the store version that received this message
*/
public void processControlMessage(
KafkaKey kafkaKey,
KafkaMessageEnvelope kafkaMessageEnvelope,
ControlMessage controlMessage,
int partition,
PartitionConsumptionState partitionConsumptionState,
int version) {
PartitionConsumptionState partitionConsumptionState) {
// Optionally act on Control Message
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public Map<String, VeniceViewWriter> buildStoreViewWriters(Store store, int vers
String className = viewConfig.getValue().getViewClassName();
Map<String, String> extraParams = viewConfig.getValue().getViewParameters();
VeniceViewWriter viewWriter =
ViewWriterUtils.getVeniceViewWriter(className, properties, store, keySchema, extraParams);
ViewWriterUtils.getVeniceViewWriter(className, properties, store, version, keySchema, extraParams);
storeViewWriters.put(viewConfig.getKey(), viewWriter);
}
return storeViewWriters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public static VeniceViewWriter getVeniceViewWriter(
String viewClass,
VeniceConfigLoader configLoader,
Store store,
int version,
Schema keySchema,
Map<String, String> extraViewParameters) {
Properties params = configLoader.getCombinedProperties().toProperties();
Expand All @@ -25,8 +26,8 @@ public static VeniceViewWriter getVeniceViewWriter(

VeniceViewWriter viewWriter = ReflectUtils.callConstructor(
ReflectUtils.loadClass(view.getWriterClassName()),
new Class<?>[] { VeniceConfigLoader.class, Store.class, Schema.class, Map.class },
new Object[] { configLoader, store, keySchema, extraViewParameters });
new Class<?>[] { VeniceConfigLoader.class, Store.class, Integer.TYPE, Schema.class, Map.class },
new Object[] { configLoader, store, version, keySchema, extraViewParameters });

return viewWriter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testConstructVersionSwapMessage() {

// Build the change capture writer and set the mock writer
ChangeCaptureViewWriter changeCaptureViewWriter =
new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, SCHEMA, Collections.emptyMap());
new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, 1, SCHEMA, Collections.emptyMap());
changeCaptureViewWriter.setVeniceWriter(mockVeniceWriter);

// Verify that we never produce the version swap from a follower replica
Expand All @@ -120,8 +120,7 @@ public void testConstructVersionSwapMessage() {
kafkaMessageEnvelope,
controlMessage,
1,
mockFollowerPartitionConsumptionState,
1);
mockFollowerPartitionConsumptionState);
Mockito.verify(mockVeniceWriter, Mockito.never())
.sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any());

Expand All @@ -133,8 +132,7 @@ public void testConstructVersionSwapMessage() {
kafkaMessageEnvelope,
ignoredControlMessage,
1,
mockLeaderPartitionConsumptionState,
1);
mockLeaderPartitionConsumptionState);
Mockito.verify(mockVeniceWriter, Mockito.never())
.sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any());

Expand All @@ -148,18 +146,12 @@ public void testConstructVersionSwapMessage() {
kafkaMessageEnvelope,
ignoredControlMessage,
1,
mockLeaderPartitionConsumptionState,
1);
mockLeaderPartitionConsumptionState);
Mockito.verify(mockVeniceWriter, Mockito.never())
.sendControlMessage(Mockito.any(), Mockito.anyInt(), Mockito.anyMap(), Mockito.any(), Mockito.any());

changeCaptureViewWriter.processControlMessage(
kafkaKey,
kafkaMessageEnvelope,
controlMessage,
1,
mockLeaderPartitionConsumptionState,
1);
changeCaptureViewWriter
.processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, mockLeaderPartitionConsumptionState);
ArgumentCaptor<ControlMessage> messageArgumentCaptor = ArgumentCaptor.forClass(ControlMessage.class);

// Verify and capture input
Expand Down Expand Up @@ -210,7 +202,7 @@ public void testBuildWriterOptions() {
Mockito.when(mockVeniceConfigLoader.getVeniceServerConfig()).thenReturn(mockVeniceServerConfig);

ChangeCaptureViewWriter changeCaptureViewWriter =
new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, SCHEMA, Collections.emptyMap());
new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, 1, SCHEMA, Collections.emptyMap());

VeniceWriterOptions writerOptions = changeCaptureViewWriter.buildWriterOptions(1);

Expand Down Expand Up @@ -245,7 +237,7 @@ public void testProcessRecord() throws ExecutionException, InterruptedException
Mockito.when(mockVeniceConfigLoader.getVeniceServerConfig()).thenReturn(mockVeniceServerConfig);

ChangeCaptureViewWriter changeCaptureViewWriter =
new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, SCHEMA, Collections.emptyMap());
new ChangeCaptureViewWriter(mockVeniceConfigLoader, mockStore, 1, SCHEMA, Collections.emptyMap());

Schema rmdSchema = RmdSchemaGenerator.generateMetadataSchema(SCHEMA, 1);
List<Long> vectors = Arrays.asList(1L, 2L, 3L);
Expand All @@ -255,13 +247,13 @@ public void testProcessRecord() throws ExecutionException, InterruptedException
changeCaptureViewWriter.setVeniceWriter(mockVeniceWriter);

// Update Case
changeCaptureViewWriter.processRecord(NEW_VALUE, OLD_VALUE, KEY, 1, 1, 1, rmdRecordWithValueLevelTimeStamp).get();
changeCaptureViewWriter.processRecord(NEW_VALUE, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get();

// Insert Case
changeCaptureViewWriter.processRecord(NEW_VALUE, null, KEY, 1, 1, 1, rmdRecordWithValueLevelTimeStamp).get();
changeCaptureViewWriter.processRecord(NEW_VALUE, null, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get();

// Deletion Case
changeCaptureViewWriter.processRecord(null, OLD_VALUE, KEY, 1, 1, 1, rmdRecordWithValueLevelTimeStamp).get();
changeCaptureViewWriter.processRecord(null, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get();

// Set up argument captors
ArgumentCaptor<byte[]> keyCaptor = ArgumentCaptor.forClass(byte[].class);
Expand Down
Loading

0 comments on commit a711263

Please sign in to comment.