Commit 1e7493f: Address more review comments

xunyin8 committed Jan 7, 2025 (1 parent: e4484d3)

Showing 12 changed files with 133 additions and 69 deletions.

==== changed file ====
@@ -653,8 +653,10 @@ protected void processMessageAndMaybeProduceToKafka(
     CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture();
     CompletableFuture[] viewWriterFutures =
         processViewWriters(partitionConsumptionState, keyBytes, mergeConflictResultWrapper);
+    hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
     CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
-      hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
+      hostLevelIngestionStats
+          .recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
       if (exception == null) {
         producePutOrDeleteToKafka(
             mergeConflictResultWrapper,
@@ -1440,7 +1442,7 @@ CompletableFuture[] processViewWriters(
         oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
     for (VeniceViewWriter writer: viewWriters.values()) {
       viewWriterFutures[index++] = writer.processRecord(
-          mergeConflictResult.getNewValue(),
+          mergeConflictResultWrapper.getUpdatedValueBytes(),
           oldValueBB,
           keyBytes,
           mergeConflictResult.getValueSchemaId(),

==== changed file ====
@@ -3363,8 +3363,9 @@ protected void processMessageAndMaybeProduceToKafka(
     CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
     CompletableFuture[] viewWriterFutures =
         processViewWriters(partitionConsumptionState, keyBytes, writeComputeResultWrapper);
+    hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
     CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
-      hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
+      hostLevelIngestionStats.recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
       if (exception == null) {
         produceToLocalKafkaHelper(
             consumerRecord,
@@ -3982,7 +3983,7 @@ void reportCompleted(PartitionConsumptionState partitionConsumptionState, boolea
   Set<String> getKafkaUrlSetFromTopicSwitch(TopicSwitchWrapper topicSwitchWrapper) {
     if (isSeparatedRealtimeTopicEnabled()) {
       Set<String> result = new HashSet<>();
-      for (String server : topicSwitchWrapper.getSourceServers()) {
+      for (String server: topicSwitchWrapper.getSourceServers()) {
         result.add(server);
         result.add(server + Utils.SEPARATE_TOPIC_SUFFIX);
       }
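
The two ingestion-task hunks above split the view-writer metric in two: hand-off latency is recorded as soon as the writes are submitted, while the new ack metric is recorded only when every view-writer future completes, which isolates broker acknowledgement time. A minimal sketch of that timing pattern, with recordSubmitLatency and recordAckLatency as hypothetical stand-ins for the hostLevelIngestionStats sensors:

    import java.util.concurrent.CompletableFuture;

    public final class ViewWriteTimingSketch {
      // Hypothetical stand-ins for the hostLevelIngestionStats sensor calls.
      static void recordSubmitLatency(double ms) { System.out.println("submit latency ms: " + ms); }
      static void recordAckLatency(double ms) { System.out.println("ack latency ms: " + ms); }

      public static void main(String[] args) {
        long start = System.currentTimeMillis();
        CompletableFuture<?>[] viewWriterFutures = { CompletableFuture.runAsync(() -> { /* view topic write */ }) };
        // Submit-side latency: time spent handing records to the view writers.
        recordSubmitLatency(System.currentTimeMillis() - start);
        CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
          // Ack-side latency: also covers the wait for produce acknowledgements.
          recordAckLatency(System.currentTimeMillis() - start);
        }).join();
      }
    }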

==== changed file ====
@@ -67,6 +67,7 @@ public class HostLevelIngestionStats extends AbstractVeniceStats {
   private final Sensor resubscriptionFailureSensor;

   private final Sensor viewProducerLatencySensor;
+  private final Sensor viewProducerAckLatencySensor;
   /**
    * Sensors for emitting if/when we detect DCR violations (such as a backwards timestamp or receding offset vector)
    */
Expand Down Expand Up @@ -311,6 +312,12 @@ public HostLevelIngestionStats(
() -> totalStats.viewProducerLatencySensor,
avgAndMax());

this.viewProducerAckLatencySensor = registerPerStoreAndTotalSensor(
"total_view_writer_ack_latency",
totalStats,
() -> totalStats.viewProducerAckLatencySensor,
avgAndMax());

registerSensor(
"storage_quota_used",
new AsyncGauge((ignored, ignored2) -> hybridQuotaUsageGauge, "storage_quota_used"));
@@ -513,6 +520,10 @@ public void recordViewProducerLatency(double latency) {
     viewProducerLatencySensor.record(latency);
   }

+  public void recordViewProducerAckLatency(double latency) {
+    viewProducerAckLatencySensor.record(latency);
+  }
+
   public void recordUnexpectedMessage() {
     unexpectedMessageSensor.record();
   }
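
For context on the registration pattern above: registerPerStoreAndTotalSensor wires the per-store sensor to the shared total sensor, so a single recordViewProducerAckLatency() call feeds both the per-store and aggregate avg/max stats. A dependency-free sketch of that fan-out idea (illustrative only; Venice's real implementation sits on Tehuti sensors):

    import java.util.DoubleSummaryStatistics;

    public final class SensorFanOutSketch {
      private final DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
      private final SensorFanOutSketch parent; // the "total" sensor, if any

      SensorFanOutSketch(SensorFanOutSketch parent) { this.parent = parent; }

      void record(double latencyMs) {
        stats.accept(latencyMs); // avg and max both derive from the same stream
        if (parent != null) {
          parent.record(latencyMs);
        }
      }

      public static void main(String[] args) {
        SensorFanOutSketch total = new SensorFanOutSketch(null);
        SensorFanOutSketch perStore = new SensorFanOutSketch(total);
        perStore.record(12.5);
        System.out.println(total.stats.getAverage() + " / " + total.stats.getMax());
      }
    }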

==== changed file ====
@@ -9,15 +9,18 @@
 import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
 import com.linkedin.venice.pubsub.api.PubSubProduceResult;
+import com.linkedin.venice.utils.ByteUtils;
 import com.linkedin.venice.utils.RedundantExceptionFilter;
+import com.linkedin.venice.utils.SystemTime;
+import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
+import com.linkedin.venice.utils.lazy.Lazy;
 import com.linkedin.venice.views.MaterializedView;
 import com.linkedin.venice.writer.LeaderCompleteState;
 import com.linkedin.venice.writer.VeniceWriter;
 import com.linkedin.venice.writer.VeniceWriterFactory;
 import com.linkedin.venice.writer.VeniceWriterOptions;
 import java.nio.ByteBuffer;
-import java.time.Clock;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,14 +37,20 @@
 import org.apache.logging.log4j.Logger;


+/**
+ * Materialized view writer is responsible for processing input records from the version topic and write them to the
+ * materialized view topic based on parameters defined in {@link com.linkedin.venice.meta.MaterializedViewParameters}.
+ * This writer has its own {@link VeniceWriter} and will also propagate heartbeat messages differently. See details in
+ * the doc for {@link #maybePropagateHeartbeatLowWatermarkToViewTopic} method.
+ */
 public class MaterializedViewWriter extends VeniceViewWriter {
   private final PubSubProducerAdapterFactory pubSubProducerAdapterFactory;
   private final MaterializedView internalView;
   private final ReentrantLock broadcastHBLock = new ReentrantLock();
   private final Map<Integer, Long> partitionToHeartbeatTimestampMap = new HashMap<>();
-  private final Clock clock;
+  private final Time time;
   private final String materializedViewTopicName;
-  private VeniceWriter veniceWriter;
+  private Lazy<VeniceWriter> veniceWriter;
   private long lastHBBroadcastTimestamp;

   /**
Expand All @@ -59,29 +68,32 @@ public MaterializedViewWriter(
Version version,
Schema keySchema,
Map<String, String> extraViewParameters,
Clock clock) {
Time time) {
super(props, version, keySchema, extraViewParameters);
pubSubProducerAdapterFactory = props.getVeniceServerConfig().getPubSubClientsFactory().getProducerAdapterFactory();
internalView =
new MaterializedView(props.getCombinedProperties().toProperties(), version.getStoreName(), extraViewParameters);
materializedViewTopicName =
internalView.getTopicNamesAndConfigsForVersion(version.getNumber()).keySet().stream().findAny().get();
this.clock = clock;
this.time = time;
this.veniceWriter = Lazy.of(
() -> new VeniceWriterFactory(props.getCombinedProperties().toProperties(), pubSubProducerAdapterFactory, null)
.createVeniceWriter(buildWriterOptions()));
}

public MaterializedViewWriter(
VeniceConfigLoader props,
Version version,
Schema keySchema,
Map<String, String> extraViewParameters) {
this(props, version, keySchema, extraViewParameters, Clock.systemUTC());
this(props, version, keySchema, extraViewParameters, SystemTime.INSTANCE);
}

/**
* package private for testing purpose
*/
void setVeniceWriter(VeniceWriter veniceWriter) {
this.veniceWriter = veniceWriter;
this.veniceWriter = Lazy.of(() -> veniceWriter);
}

@Override
Expand All @@ -97,10 +109,11 @@ public CompletableFuture<PubSubProduceResult> processRecord(

@Override
public CompletableFuture<PubSubProduceResult> processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) {
if (veniceWriter == null) {
initializeVeniceWriter();
if (newValue == null) {
// this is a delete operation
return veniceWriter.get().delete(key, null);
}
return veniceWriter.put(key, newValue.array(), newValueSchemaId);
return veniceWriter.get().put(key, ByteUtils.extractByteArray(newValue), newValueSchemaId);
}

@Override
@@ -130,13 +143,6 @@ VeniceWriterOptions buildWriterOptions() {
     return setProducerOptimizations(internalView.getWriterOptionsBuilder(materializedViewTopicName, version)).build();
   }

-  synchronized private void initializeVeniceWriter() {
-    if (veniceWriter == null) {
-      veniceWriter =
-          new VeniceWriterFactory(props, pubSubProducerAdapterFactory, null).createVeniceWriter(buildWriterOptions());
-    }
-  }
-
   /**
    * View topic's partitioner and partition count could be different from the VT. In order to ensure we are capturing
    * all potential lag in the VT ingestion from the view topic, we will broadcast the low watermark observed from every
@@ -161,7 +167,7 @@ private void maybePropagateHeartbeatLowWatermarkToViewTopic(
     broadcastHBLock.lock();
     try {
       partitionToHeartbeatTimestampMap.put(partition, heartbeatTimestamp);
-      long now = clock.millis();
+      long now = time.getMilliseconds();
       if (now > lastHBBroadcastTimestamp + DEFAULT_HEARTBEAT_BROADCAST_INTERVAL_MS
           && !partitionToHeartbeatTimestampMap.isEmpty()) {
         oldestHeartbeatTimestamp = Collections.min(partitionToHeartbeatTimestampMap.values());
@@ -179,9 +185,6 @@
       broadcastHBLock.unlock();
     }
     if (propagate && oldestHeartbeatTimestamp > 0) {
-      if (veniceWriter == null) {
-        initializeVeniceWriter();
-      }
       LeaderCompleteState leaderCompleteState =
           LeaderCompleteState.getLeaderCompleteState(partitionConsumptionState.isCompletionReported());
       Set<String> failedPartitions = VeniceConcurrentHashMap.newKeySet();
@@ -193,14 +196,15 @@
           // least
           // one partition leader has completed.
           final int viewPartitionNumber = p;
-          CompletableFuture<PubSubProduceResult> heartBeatFuture = veniceWriter.sendHeartbeat(
-              materializedViewTopicName,
-              viewPartitionNumber,
-              null,
-              VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER,
-              true,
-              leaderCompleteState,
-              oldestHeartbeatTimestamp);
+          CompletableFuture<PubSubProduceResult> heartBeatFuture = veniceWriter.get()
+              .sendHeartbeat(
+                  materializedViewTopicName,
+                  viewPartitionNumber,
+                  null,
+                  VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER,
+                  true,
+                  leaderCompleteState,
+                  oldestHeartbeatTimestamp);
           heartBeatFuture.whenComplete((ignore, throwable) -> {
             if (throwable != null) {
               completionException.set(new CompletionException(throwable));
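
The MaterializedViewWriter changes above swap the synchronized initializeVeniceWriter() null-check for Venice's Lazy wrapper: construction runs on the first get(), the heartbeat and processRecord paths lose their null-checks, and the test hook collapses to Lazy.of(() -> mockWriter). A self-contained sketch of the idiom; this LazySketch is a hypothetical reimplementation, not com.linkedin.venice.utils.lazy.Lazy itself:

    import java.util.function.Supplier;

    public final class LazySketch<T> {
      private final Supplier<T> supplier;
      private volatile T value; // volatile makes the double-checked locking safe

      private LazySketch(Supplier<T> supplier) { this.supplier = supplier; }

      static <T> LazySketch<T> of(Supplier<T> supplier) { return new LazySketch<>(supplier); }

      T get() {
        T result = value;
        if (result == null) {
          synchronized (this) {
            result = value;
            if (result == null) {
              value = result = supplier.get(); // construct on first use only
            }
          }
        }
        return result;
      }

      public static void main(String[] args) {
        LazySketch<String> writer = LazySketch.of(() -> "expensive VeniceWriter");
        System.out.println(writer.get()); // built here, not at field initialization
      }
    }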

==== changed file ====
@@ -29,12 +29,12 @@
 import com.linkedin.venice.pubsub.PubSubClientsFactory;
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
 import com.linkedin.venice.utils.ObjectMapperFactory;
+import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.VeniceProperties;
 import com.linkedin.venice.views.MaterializedView;
 import com.linkedin.venice.views.VeniceView;
 import com.linkedin.venice.writer.VeniceWriter;
 import com.linkedin.venice.writer.VeniceWriterOptions;
-import java.time.Clock;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -112,11 +112,11 @@ public void testProcessIngestionHeartbeat() {
     viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
     Map<String, String> viewParamsMap = viewParamsBuilder.build();
     VeniceConfigLoader props = getMockProps();
-    Clock clock = mock(Clock.class);
+    Time time = mock(Time.class);
     long startTime = System.currentTimeMillis();
-    doReturn(startTime).when(clock).millis();
+    doReturn(startTime).when(time).getMilliseconds();
     MaterializedViewWriter materializedViewWriter =
-        new MaterializedViewWriter(props, version, SCHEMA, viewParamsMap, clock);
+        new MaterializedViewWriter(props, version, SCHEMA, viewParamsMap, time);
     ControlMessage controlMessage = new ControlMessage();
     controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue();
     KafkaKey kafkaKey = mock(KafkaKey.class);
@@ -138,7 +138,7 @@
         .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState);
     verify(veniceWriter, never()).sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong());
     long newTime = startTime + TimeUnit.MINUTES.toMillis(4);
-    doReturn(newTime).when(clock).millis();
+    doReturn(newTime).when(time).getMilliseconds();
     doReturn(startTime + TimeUnit.MINUTES.toMillis(1)).when(producerMetadata).getMessageTimestamp();
     materializedViewWriter
         .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState);
@@ -157,7 +157,7 @@
       Assert.assertEquals(timestamp, Long.valueOf(newTime));
     }
     newTime = newTime + TimeUnit.SECONDS.toMillis(30);
-    doReturn(newTime).when(clock).millis();
+    doReturn(newTime).when(time).getMilliseconds();
     doReturn(startTime).when(producerMetadata).getMessageTimestamp();
     materializedViewWriter
         .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 2, partitionConsumptionState);
@@ -170,7 +170,7 @@
     verify(veniceWriter, times(6))
         .sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), heartbeatTimestampCaptor.capture());
     newTime = newTime + TimeUnit.MINUTES.toMillis(3);
-    doReturn(newTime).when(clock).millis();
+    doReturn(newTime).when(time).getMilliseconds();
     doReturn(newTime).when(producerMetadata).getMessageTimestamp();
     materializedViewWriter
         .processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, 1, partitionConsumptionState);
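
The Clock-to-Time migration in this test works because com.linkedin.venice.utils.Time is a small Venice interface with SystemTime.INSTANCE as the production implementation, so a test can mock or script getMilliseconds() directly instead of stubbing java.time.Clock. A rough sketch, assuming a trimmed-down interface (the real Time exposes more methods):

    interface Time {
      long getMilliseconds();
    }

    final class FixedTime implements Time {
      private long now;
      FixedTime(long start) { this.now = start; }
      public long getMilliseconds() { return now; }
      void advance(long ms) { now += ms; } // drive heartbeat intervals deterministically
    }

    public final class TimeSketch {
      public static void main(String[] args) {
        FixedTime time = new FixedTime(0);
        time.advance(60_000); // pretend a minute passed; no sleeping or mocking needed
        System.out.println(time.getMilliseconds()); // 60000
      }
    }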

==== changed file ====
@@ -1055,15 +1055,16 @@ private void validateInputDataSchema(String inputDataSchemaString) {

   private void configureJobPropertiesWithMaterializedViewConfigs() {
     try {
-      // For now, we only perform view topic writes for basic batch push. No incremental and re-push
-      if (pushJobSetting.isIncrementalPush || pushJobSetting.isSourceKafka) {
+      // For now, we only perform view topic writes for basic batch push and re-push. No incremental pushes.
+      if (pushJobSetting.isIncrementalPush) {
         return;
       }
       StoreResponse storeResponse = ControllerClient.retryableRequest(
           controllerClient,
           pushJobSetting.controllerRetries,
           c -> c.getStore(pushJobSetting.storeName));
-      Map<String, ViewConfig> viewConfigMap = storeResponse.getStore().getViewConfigs();
+      Map<String, ViewConfig> viewConfigMap =
+          storeResponse.getStore().getVersion(pushJobSetting.version).get().getViewConfigs();
       viewConfigMap = viewConfigMap.entrySet()
           .stream()
           .filter(vc -> Objects.equals(vc.getValue().getViewClassName(), MaterializedView.class.getCanonicalName()))
@@ -1072,7 +1073,7 @@ private void configureJobPropertiesWithMaterializedViewConfigs() {
         pushJobSetting.materializedViewConfigFlatMap = ViewUtils.flatViewConfigMapString(viewConfigMap);
       }
     } catch (Exception e) {
-      throw new VeniceException("Failed to configure job properties with view configs");
+      throw new VeniceException("Failed to configure job properties with view configs", e);
     }
   }

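
The push-job hunks above make two behavioral changes: re-pushes (isSourceKafka) are no longer excluded from view-topic writes, and view configs are now read from the specific push version rather than the store, since views are defined per version. A small sketch of the materialized-view filtering step that follows, using an illustrative stand-in for ViewConfig (the ChangeCaptureView class name here is assumed):

    import java.util.Map;
    import java.util.Objects;
    import java.util.stream.Collectors;

    public final class ViewConfigFilterSketch {
      // Illustrative stand-in; the real ViewConfig lives in the Venice meta package.
      record ViewConfig(String viewClassName) {}

      public static void main(String[] args) {
        String materializedView = "com.linkedin.venice.views.MaterializedView";
        Map<String, ViewConfig> fromVersion = Map.of(
            "testView", new ViewConfig(materializedView),
            "dummyView", new ViewConfig("com.linkedin.venice.views.ChangeCaptureView"));
        // Keep only materialized views before flattening them into the job property.
        Map<String, ViewConfig> filtered = fromVersion.entrySet()
            .stream()
            .filter(vc -> Objects.equals(vc.getValue().viewClassName(), materializedView))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        System.out.println(filtered.keySet()); // [testView]
      }
    }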

==== changed file ====
@@ -48,6 +48,7 @@
 import com.linkedin.venice.jobs.DataWriterComputeJob;
 import com.linkedin.venice.message.KafkaKey;
 import com.linkedin.venice.meta.StoreInfo;
+import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
 import com.linkedin.venice.pushmonitor.ExecutionStatus;
 import com.linkedin.venice.schema.AvroSchemaParseUtils;
@@ -882,6 +883,9 @@ private void configureControllerClientMock(
     when(storeResponse.getStore()).thenReturn(storeInfo);
     when(storeInfo.getCompressionStrategy()).thenReturn(
         CompressionStrategy.valueOf(props.getProperty(COMPRESSION_STRATEGY, CompressionStrategy.NO_OP.toString())));
+    Version version = mock(Version.class);
+    when(version.getViewConfigs()).thenReturn(Collections.emptyMap());
+    when(storeInfo.getVersion(anyInt())).thenReturn(Optional.of(version));

     SchemaResponse keySchemaResponse = mock(SchemaResponse.class);
     when(keySchemaResponse.isError()).thenReturn(false);

==== changed file ====
@@ -462,9 +462,11 @@ private StoreInfo getStoreInfo(Consumer<StoreInfo> info, boolean applyFirst) {
     if (applyFirst) {
       info.accept(storeInfo);
     }
-    Version version = new VersionImpl(TEST_STORE, REPUSH_VERSION, TEST_PUSH);
-    version.setHybridStoreConfig(storeInfo.getHybridStoreConfig());
-    storeInfo.setVersions(Collections.singletonList(version));
+    if (storeInfo.getVersions() == null) {
+      Version version = new VersionImpl(TEST_STORE, REPUSH_VERSION, TEST_PUSH);
+      version.setHybridStoreConfig(storeInfo.getHybridStoreConfig());
+      storeInfo.setVersions(Collections.singletonList(version));
+    }
     if (!applyFirst) {
       info.accept(storeInfo);
     }
@@ -963,6 +965,7 @@ public void testEnableBothTargetRegionConfigs() {
     getSpyVenicePushJob(props, getClient());
   }

+  @Test
   public void testConfigureWithMaterializedViewConfigs() throws Exception {
     Properties properties = getVpjRequiredProperties();
     properties.put(KEY_FIELD_PROP, "id");
@@ -983,8 +986,11 @@
     viewConfigs.put("testView", new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build()));
     viewConfigs
         .put("dummyView", new ViewConfigImpl(ChangeCaptureView.class.getCanonicalName(), Collections.emptyMap()));
+    Version version = new VersionImpl(TEST_STORE, 1, TEST_PUSH);
+    version.setViewConfigs(viewConfigs);
     client = getClient(storeInfo -> {
-      storeInfo.setViewConfigs(viewConfigs);
+      storeInfo.setVersions(Collections.singletonList(version));
     }, true);
     doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), eq(null));
     try (final VenicePushJob vpj = getSpyVenicePushJob(properties, client)) {

==== changed file ====
@@ -510,7 +510,7 @@ public Future<PubSubProduceResult> put(
   }

   @Override
-  public Future<PubSubProduceResult> delete(
+  public CompletableFuture<PubSubProduceResult> delete(
       Object key,
       PubSubProducerCallback callback,
       DeleteMetadata deleteMetadata) {
@@ -573,7 +573,7 @@ public Future<PubSubProduceResult> put(
   }

   @Override
-  public Future<PubSubProduceResult> delete(
+  public CompletableFuture<PubSubProduceResult> delete(
       Object key,
       PubSubProducerCallback callback,
       DeleteMetadata deleteMetadata) {

==== changed file ====
@@ -45,7 +45,7 @@ public abstract Future<PubSubProduceResult> put(
       PubSubProducerCallback callback,
       PutMetadata putMetadata);

-  public abstract Future<PubSubProduceResult> delete(
+  public abstract CompletableFuture<PubSubProduceResult> delete(
       K key,
       PubSubProducerCallback callback,
       DeleteMetadata deleteMetadata);
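
Widening delete() from Future to CompletableFuture in these writer classes is what lets the new MaterializedViewWriter delete path (veniceWriter.get().delete(key, null)) compose with CompletableFuture.allOf alongside puts. A brief illustration of the difference; both helper methods here are hypothetical:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    public final class DeleteReturnTypeSketch {
      static Future<String> deleteAsFuture() { return CompletableFuture.completedFuture("acked"); }
      static CompletableFuture<String> deleteAsCompletableFuture() { return CompletableFuture.completedFuture("acked"); }

      public static void main(String[] args) {
        // A plain Future only offers blocking get(); deleteAsFuture().thenAccept(...) would not compile.
        CompletableFuture<Void> chained = deleteAsCompletableFuture()
            .thenAccept(result -> System.out.println("delete " + result));
        CompletableFuture.allOf(chained).join(); // composable with other pending writes
      }
    }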
(2 of the 12 changed files are not rendered in this capture.)
