[vpj] Dual write push job details report from VPJ to both parent and child region for push job system store replication mode migration #1463

Closed
@@ -90,6 +90,7 @@

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.PushJobCheckpoints;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.ZstdWithDictCompressor;
import com.linkedin.venice.controllerapi.ControllerClient;
@@ -239,6 +240,7 @@ public class VenicePushJob implements AutoCloseable {

// Mutable state
private ControllerClient controllerClient;
private ControllerClient pushJobDetailsSystemStoreControllerClient;
private ControllerClient kmeSchemaSystemStoreControllerClient;
private ControllerClient livenessHeartbeatStoreControllerClient;
private RunningJob runningJob;
@@ -1400,6 +1402,18 @@ private void initControllerClient(String storeName, Optional<SSLFactory> sslFact
LOGGER.info("Controller client has already been initialized");
}

if (pushJobDetailsSystemStoreControllerClient == null && pushJobSetting.multiRegion) {
pushJobDetailsSystemStoreControllerClient = getControllerClient(
VeniceSystemStoreUtils.getPushJobDetailsStoreName(),
pushJobSetting.d2Routing,
pushJobSetting.childControllerRegionD2ZkHosts,
controllerD2ZkHost,
sslFactory,
pushJobSetting.controllerRetries);
} else {
LOGGER.info("Push Job details system store child region controller client has already been initialized");
}

if (kmeSchemaSystemStoreControllerClient == null) {
kmeSchemaSystemStoreControllerClient = getControllerClient(
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(),
@@ -1868,11 +1882,23 @@ private void sendPushJobDetailsToController() {
updatePushJobDetailsWithConfigs();
updatePushJobDetailsWithLivenessHeartbeatException();

/**
* This logic is added to migrate push job details system store from Aggregate mode to Active/Active mode.
* If it is multi-region, it will dual write push job details to parent RT and child region RT.
* If it is single-region, it will only write push job details to child region RT.
[Review comments on lines +1887 to +1888]

Contributor: Why can't the parent controller write it to the child colo's RT instead? Why do we need to create a controller client for an arbitrary child colo?

Contributor: There is also logic in the parent controller that emits metrics about push job successes/failures, which we'll need to handle.

Contributor Author: Yeah, you are right. I think I read the code wrong. This probably cannot work with VPJ changes alone. I will probably just change it to dual write in the parent controller.

*/
sendPushJobDetails(controllerClient);
if (pushJobDetailsSystemStoreControllerClient != null) {
sendPushJobDetails(pushJobDetailsSystemStoreControllerClient);
}
}

private void sendPushJobDetails(ControllerClient client) {
// send push job details to controller
try {
pushJobDetails.reportTimestamp = System.currentTimeMillis();
int version = pushJobSetting.version <= 0 ? UNCREATED_VERSION_NUMBER : pushJobSetting.version;
- ControllerResponse response = controllerClient.sendPushJobDetails(
+ ControllerResponse response = client.sendPushJobDetails(
pushJobSetting.storeName,
version,
pushJobDetailsSerializer.serialize(null, pushJobDetails));
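
The dual-write flow in this hunk (always report to the primary client, then report to the child-region client only when one was created) can be sketched in isolation. This is a minimal illustration, not the Venice implementation: `DetailsSink` and `dualWrite` are hypothetical names standing in for `ControllerClient` and `sendPushJobDetailsToController`, and the choice to catch each failure separately is an assumption based on the `try` block visible in `sendPushJobDetails`.

```java
import java.util.ArrayList;
import java.util.List;

public class DualWriteSketch {
  /** Hypothetical stand-in for a controller client; NOT the Venice ControllerClient API. */
  interface DetailsSink {
    void send(String storeName, int version, byte[] payload) throws Exception;
  }

  /**
   * Writes to the primary sink, then to the secondary sink if one is configured.
   * Each write is attempted independently (assumption), so a failure on one side
   * does not prevent the other. Returns the labels of the sinks that failed.
   */
  static List<String> dualWrite(
      DetailsSink primary,
      DetailsSink secondary,
      String storeName,
      int version,
      byte[] payload) {
    List<String> failures = new ArrayList<>();
    try {
      primary.send(storeName, version, payload);
    } catch (Exception e) {
      failures.add("primary");
    }
    // Single-region case: no secondary sink was created, so only one write happens.
    if (secondary != null) {
      try {
        secondary.send(storeName, version, payload);
      } catch (Exception e) {
        failures.add("secondary");
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    List<String> received = new ArrayList<>();
    DetailsSink parent = (store, version, payload) -> received.add("parent:" + store);
    DetailsSink child = (store, version, payload) -> received.add("child:" + store);
    List<String> failures = dualWrite(parent, child, "example_store", 1, new byte[0]);
    System.out.println("received=" + received + " failures=" + failures);
  }
}
```

Passing the target as a parameter, as the refactored `sendPushJobDetails(ControllerClient client)` does, keeps the two writes symmetric and makes the null check the only place where single-region and multi-region behavior differ.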