Skip to content

Commit

Permalink
[vpj] Dual write push job detials report from VPJ to both parent and …
Browse files Browse the repository at this point in the history
…child region for push job system store replication mode migration
  • Loading branch information
sixpluszero committed Jan 22, 2025
1 parent 3392d62 commit 6214fe9
Showing 1 changed file with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.PushJobCheckpoints;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.ZstdWithDictCompressor;
import com.linkedin.venice.controllerapi.ControllerClient;
Expand Down Expand Up @@ -239,6 +240,7 @@ public class VenicePushJob implements AutoCloseable {

// Mutable state
private ControllerClient controllerClient;
private ControllerClient pushJobDetailsSystemStoreControllerClient;
private ControllerClient kmeSchemaSystemStoreControllerClient;
private ControllerClient livenessHeartbeatStoreControllerClient;
private RunningJob runningJob;
Expand Down Expand Up @@ -1400,6 +1402,18 @@ private void initControllerClient(String storeName, Optional<SSLFactory> sslFact
LOGGER.info("Controller client has already been initialized");
}

if (pushJobDetailsSystemStoreControllerClient == null && pushJobSetting.multiRegion) {
pushJobDetailsSystemStoreControllerClient = getControllerClient(
VeniceSystemStoreUtils.getPushJobDetailsStoreName(),
pushJobSetting.d2Routing,
pushJobSetting.childControllerRegionD2ZkHosts,
controllerD2ZkHost,
sslFactory,
pushJobSetting.controllerRetries);
} else {
LOGGER.info("Push Job details system store child region controller client has already been initialized");
}

if (kmeSchemaSystemStoreControllerClient == null) {
kmeSchemaSystemStoreControllerClient = getControllerClient(
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(),
Expand Down Expand Up @@ -1868,11 +1882,23 @@ private void sendPushJobDetailsToController() {
updatePushJobDetailsWithConfigs();
updatePushJobDetailsWithLivenessHeartbeatException();

/**
* This logic is added to migrate push job details system store from Aggregate mode to Active/Active mode.
* If it is multi-region, it will dual write push job details to parent RT and child region RT.
* If it is single-region, it will only write push job details to child region RT.
*/
sendPushJobDetails(controllerClient);
if (pushJobDetailsSystemStoreControllerClient != null) {
sendPushJobDetails(pushJobDetailsSystemStoreControllerClient);
}
}

private void sendPushJobDetails(ControllerClient client) {
// send push job details to controller
try {
pushJobDetails.reportTimestamp = System.currentTimeMillis();
int version = pushJobSetting.version <= 0 ? UNCREATED_VERSION_NUMBER : pushJobSetting.version;
ControllerResponse response = controllerClient.sendPushJobDetails(
ControllerResponse response = client.sendPushJobDetails(
pushJobSetting.storeName,
version,
pushJobDetailsSerializer.serialize(null, pushJobDetails));
Expand Down

0 comments on commit 6214fe9

Please sign in to comment.