
[server] Pass ingestion role change for store version from StoreIngestionTask to AggKafkaConsumerService. #1034

Merged
merged 7 commits into linkedin:main from the versionRoleChange branch on Aug 2, 2024

Conversation

haoxu07
Contributor

@haoxu07 haoxu07 commented Jun 14, 2024


This PR introduces a new class, TopicPartitionReplicaRole, to bundle information about a particular topic partition:

  1. Version role: future/current/backup
  2. Workload type: Active/Active or Write Compute, which by nature requires heavier processing.

It also adds a new config, SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, to turn the resubscription on or off.
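For illustration only, here is a minimal sketch of what such a bundling class might look like; the enum values follow the description above, but the field and method names are assumptions rather than the actual class definition:

```java
import com.linkedin.venice.pubsub.api.PubSubTopicPartition; // existing Venice type (package path assumed)

// Hypothetical sketch of TopicPartitionReplicaRole; names are illustrative, not the real implementation.
public class TopicPartitionReplicaRole {
  public enum VersionRole { FUTURE, CURRENT, BACKUP }
  public enum WorkloadType { AA_OR_WRITE_COMPUTE, NON_AA_OR_WRITE_COMPUTE }

  private final VersionRole versionRole;
  private final WorkloadType workloadType;
  private final PubSubTopicPartition topicPartition;

  public TopicPartitionReplicaRole(
      VersionRole versionRole,
      WorkloadType workloadType,
      PubSubTopicPartition topicPartition) {
    this.versionRole = versionRole;
    this.workloadType = workloadType;
    this.topicPartition = topicPartition;
  }

  public VersionRole getVersionRole() {
    return versionRole;
  }

  public WorkloadType getWorkloadType() {
    return workloadType;
  }

  public PubSubTopicPartition getTopicPartition() {
    return topicPartition;
  }
}
```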

The StoreIngestionTask thread will periodically check whether the store version's role or workload type has changed. If either has changed, it will trigger a resubscription by gracefully unsubscribing and resubscribing all partitions, so that the underlying AggKafkaConsumerService can find the corresponding consumer thread pool for them. Once KafkaConsumerService is aware of the store version role during subscription, more information will be passed via TopicPartitionReplicaRole through aggKafkaConsumerService#subscribe.
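As a rough sketch (using hypothetical helper and field names; this is not the actual StoreIngestionTask code), the periodic check could look like this:

```java
// Illustrative sketch of the periodic role-change check inside the StoreIngestionTask loop.
// cachedVersionRole/cachedWorkloadType, resolveVersionRole(), resolveWorkloadType(),
// getSubscribedPartitions() and resubscribe() are hypothetical names used only for illustration.
private void maybeResubscribeOnIngestionRoleChange() throws InterruptedException {
  TopicPartitionReplicaRole.VersionRole latestVersionRole = resolveVersionRole();
  TopicPartitionReplicaRole.WorkloadType latestWorkloadType = resolveWorkloadType();
  if (latestVersionRole == cachedVersionRole && latestWorkloadType == cachedWorkloadType) {
    return; // Nothing changed; keep the existing subscriptions.
  }
  cachedVersionRole = latestVersionRole;
  cachedWorkloadType = latestWorkloadType;
  for (PubSubTopicPartition topicPartition: getSubscribedPartitions()) {
    // Gracefully unsubscribe and then subscribe again, so AggKafkaConsumerService can
    // route the partition to the consumer pool matching its new role/workload type.
    resubscribe(new TopicPartitionReplicaRole(latestVersionRole, latestWorkloadType, topicPartition));
  }
}
```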

In the future, we could bundle more information into this object and pass it into KafkaConsumerService to enable better consumer pool allocation. With proper allocation of consumer thread pools, we could prioritize certain types of traffic (e.g. allocate a larger consumer pool for current-version hybrid leader ingestion).

A unit test was added to verify that the unsubscription/resubscription happens for both local and remote ingestion, by intentionally triggering a version role change during ingestion.

How was this PR tested?

CI test

Does this PR introduce any user-facing changes?

  • No. You can skip the rest of this section.
  • Yes. Make sure to explain your proposed changes and call out the behavior change.

@haoxu07 haoxu07 force-pushed the versionRoleChange branch from 3afbf68 to 1dd891e Compare July 8, 2024 06:24
@haoxu07 haoxu07 force-pushed the versionRoleChange branch from 1dd891e to cbfa5ed Compare July 11, 2024 01:10
@haoxu07 haoxu07 changed the title [WIP] Pass version role change information to AggKafkaConsumerService. Pass version role change information to AggKafkaConsumerService. Jul 11, 2024
@haoxu07 haoxu07 changed the title Pass version role change information to AggKafkaConsumerService. [server] Pass version role change information to AggKafkaConsumerService. Jul 15, 2024
@haoxu07 haoxu07 marked this pull request as ready for review July 15, 2024 07:39
@haoxu07 haoxu07 requested review from gaojieliu and sixpluszero July 15, 2024 22:39
@haoxu07 haoxu07 force-pushed the versionRoleChange branch from 70c739b to 1bf5ad2 Compare July 16, 2024 08:02
@haoxu07 haoxu07 force-pushed the versionRoleChange branch 3 times, most recently from ec468cd to 522d534 Compare July 18, 2024 07:53
Contributor

@sixpluszero sixpluszero left a comment


Thanks Hao for the change! I did one quick pass and left two comments. Overall it looks OK, but I just want to check with you about the way the new class is defined.

@haoxu07 haoxu07 changed the title [server] Pass version role change information to AggKafkaConsumerService. [server] Pass Version role change information to AggKafkaConsumerService. Jul 19, 2024
@haoxu07 haoxu07 changed the title [server] Pass Version role change information to AggKafkaConsumerService. [server] Pass version role change information to AggKafkaConsumerService. Jul 19, 2024
@haoxu07 haoxu07 force-pushed the versionRoleChange branch from 522d534 to c906530 Compare July 23, 2024 17:59
Contributor

@gaojieliu gaojieliu left a comment


The code change looks good overall, and I left some minor comments.

Contributor

@huangminchn huangminchn left a comment


Discussed with Hao offline; there are 2 more high level issues to look into:

  1. For batch-only stores, there is no need for resubscription. When future ingestion is done and it becomes current version, it doesn't need any consumer from the current version pool.
  2. Whether there could be a race where SIT might close itself after unsubscribing from all hosted partitions.

@gaojieliu
Contributor

gaojieliu commented Jul 25, 2024

For batch-only stores, there is no need for resubscription. When future ingestion is done and it becomes current version, it doesn't need any consumer from the current version pool.

I assume SIT would unsubscribe right after reporting completion for a batch-only store, so this action for batch-only stores will be a no-op as there won't be active subscriptions.

@haoxu07 haoxu07 force-pushed the versionRoleChange branch 2 times, most recently from ab25abb to 06e3e87 Compare July 25, 2024 18:30
@huangminchn
Contributor

For batch-only stores, there is no need for resubscription. When future ingestion is done and it becomes current version, it doesn't need any consumer from the current version pool.

I assume SIT would unsubscribe right after reporting completion for a batch-only store, so this action for batch-only stores will be a no-op as there won't be active subscriptions.

@gaojieliu I believe the store change listener would still send out the new consumer action (haven't checked the latest diff yet). Could you please also take a look at my comment about the listener? #1034 (comment)

@haoxu07 haoxu07 force-pushed the versionRoleChange branch 6 times, most recently from e0e637c to baea335 Compare July 30, 2024 00:48
@haoxu07 haoxu07 changed the title [server] Pass version role change information to AggKafkaConsumerService. [server] Pass ingestion role change for store version from StoreIngestionTask to AggKafkaConsumerService. Jul 30, 2024
@haoxu07 haoxu07 force-pushed the versionRoleChange branch from baea335 to f64f330 Compare July 30, 2024 19:07
@haoxu07 haoxu07 force-pushed the versionRoleChange branch 2 times, most recently from dca8485 to 425e0dc Compare August 1, 2024 01:54
@haoxu07 haoxu07 force-pushed the versionRoleChange branch from 86e4edb to 81c5aef Compare August 1, 2024 23:18
@haoxu07 haoxu07 force-pushed the versionRoleChange branch from 81c5aef to aa931ff Compare August 1, 2024 23:33
@haoxu07 haoxu07 force-pushed the versionRoleChange branch from aa931ff to 166adf7 Compare August 2, 2024 17:47
Contributor

@huangminchn huangminchn left a comment


Thanks for the hard work Hao!

@haoxu07 haoxu07 merged commit 1cc259a into linkedin:main Aug 2, 2024
32 checks passed
kvargha added a commit to kvargha/venice that referenced this pull request Aug 15, 2024
* Refactor onRecovery and add javadoc for DaVinciRecordTransformer's constructor

* [da-vinci] Added a speedup throttler when DaVinci is bootstrapping current versions (linkedin#1089)

When working with DaVinci customers, we often tune the ingestion params to speed up
the bootstrapping, but we don't want to use the same setup at the steady state since
the high ingestion throughput would affect the performance of the read path.
This PR introduces a way to dynamically switch ingestion throttlers based on the
ingestion status.
Essentially, this new feature switches to the speedup throttler when it detects any
current version bootstrapping; otherwise, it continues to use the regular throttler.
To optimize the bootstrapping perf, we can bump up the following pools together with this feature:
1. Consumer pool.
2. Drainer/writer pool.
As the GC issue is mainly triggered by the ingestion speed, we can safely increase
the pool size.

New configs:
da.vinci.current.version.bootstrapping.speedup.enabled : default false
da.vinci.current.version.bootstrapping.quota.records.per.second : default -1
da.vinci.current.version.bootstrapping.quota.bytes.per.second : default -1
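As an illustrative example (the property keys are the new configs listed above, but the values are arbitrary assumptions), enabling the speedup could look roughly like this:

```java
import java.util.Properties;

// Illustrative only: keys come from the config list above; values are made up.
Properties dvcProps = new Properties();
dvcProps.setProperty("da.vinci.current.version.bootstrapping.speedup.enabled", "true");
// Higher quotas applied only while a current version is still bootstrapping:
dvcProps.setProperty("da.vinci.current.version.bootstrapping.quota.records.per.second", "20000");
dvcProps.setProperty("da.vinci.current.version.bootstrapping.quota.bytes.per.second", String.valueOf(50L * 1024 * 1024));
```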

* Reset offset if we need to bootstrap from VT

* [controller] Delete leaking backup versions from repush (linkedin#1084)

If there are still leaking versions due to consecutive repushes and intermittent failed pushes, there could be versions whose repushSourceVersion does not match the current version. This PR will delete those versions after the backup retention period.

* [server] Pass ingestion role change for store version from StoreIngestionTask to AggKafkaConsumerService. (linkedin#1034)

This PR introduces a new class, TopicPartitionReplicaRole, to bundle information about a particular topic partition:

1. Version role: future/current/backup
2. Workload type: Active/Active or Write Compute, which by nature requires heavier processing.

It also adds a new config, SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, to turn the resubscription on or off.

The StoreIngestionTask thread will periodically check whether the store version's role or workload type has changed. If either has changed, it will trigger a resubscription by gracefully unsubscribing and resubscribing all partitions, so that the underlying AggKafkaConsumerService can find the corresponding consumer thread pool for them. Once KafkaConsumerService is aware of the store version role during subscription, more information will be passed via TopicPartitionReplicaRole through aggKafkaConsumerService#subscribe.

In the future, we could bundle more information into this object and pass it into KafkaConsumerService to enable better consumer pool allocation. With proper allocation of consumer thread pools, we could prioritize certain types of traffic (e.g. allocate a larger consumer pool for current-version hybrid leader ingestion).

A unit test was added to verify that the unsubscription/resubscription happens for both local and remote ingestion, by intentionally triggering a version role change during ingestion.

Co-authored-by: Hao Xu <[email protected]>

* Delete classHash after running tests

* [controller] Add retry thread for serviceDiscoveryAnnouncers that fail to register (linkedin#1088)

* Implemented retry thread for serviceDiscoveryAnnouncers that failed to register

* Added test cases for ServiceDiscoveryAnnouncerRetryTask

* Created helper class for register and unregister functions and revised test cases

* Made retryRegisterServiceDiscoveryAnnouncerMS into a config key

* Updated retryRegisterServiceDiscoveryAnnouncerMS from Long to long

* Renamed retryRegisterServiceDiscoveryAnnouncerMS to serviceDiscoveryRegistrationRetryMS

* Type checking for long -> 30L * Time.MS_PER_SECOND

* Created AsyncRetryingServiceDiscoveryAnnouncer class that implements ServiceDiscoveryAnnouncer to refactor code

* Test: Added a check to register() to ensure that the call doesn't throw an exception

* Only start and interrupt thread when queue is not empty + removed extraneous logging

* Removed unused LOGGER in TestServiceDiscoveryAnnouncerRetryTask

* Modified ServiceDiscoveryAnnouncerRetryTask with Nisarg's help

* Implemented exiting retry thread if there are no more failed announcers to register + added testing

* Throw an error if a user tries to use blob transfer with record transformer

* [da-vinci]Fixed a bug in DaVinci bootstrapping mode (linkedin#1097)

Fixed a bug in DaVinci bootstrapping mode

* [test] Fix store migration test flakiness (linkedin#1098)

The flaky test makes assertions based on the --migration-status admin command, which also polls the child
fabrics for migration status, e.g. store existence and migration flags. However, the endMigration call
only checks and waits for the parent controller's state to be reflected. Therefore there is a race where
--migration-status can be executed while the child controller is still processing the end migration, and
the assertion fails when such a race happens.

* [server] Fixed ServerStoreAclHandler unable to process DICTIONARY query action (linkedin#1095)

* [server] Fixed ServerStoreAclHandler unable to process DICTIONARY query action

In the Venice server, the resource for ServerStoreAclHandler is a version topic for STORAGE and COMPUTE query
actions, but it is still a store name if the query action is DICTIONARY. Always processing it as a version
topic will result in an empty string as the resource name.

Added tests with a mock DynamicAccessController to verify basic request parsing and handling for
ServerStoreAclHandler.

* [router] Drain in-flight requests for graceful router shutdown (linkedin#1092)

During router shutdown, after D2 de-announcement the router waits 30s to drain all in-flight requests, but that might not be enough or deterministic, as there could still be in-flight requests between router and server that would fail when the client is closed. This PR adds a wait and validates that all in-flight requests have been drained.

* [changelog] Add version consumption metric and old offset exception type (linkedin#1020)

* [changelog] Add version consumption metric and old offset exception type

This is at the request of users who are trying to program reactively against old checkpoints and to monitor when this may be the case.

* [da-vinci][router] Fix for router related issues on DaVinci Blob-Transfer (linkedin#1094)

* [server] Ratio Metric for Max Record Size (linkedin#1077)

* Added a ratio metric `assembled_record_size_ratio` to alert customers when they are nearing the record size limit by providing the percentage of the record limit reached.

* Replaced the metric `assembled_record_value_size_in_bytes` with `assembled_record_size_in_bytes`.

* Stopped counting RMD size towards the `maxRecordSizeBytes` limit, because it's an internal component and a user's write operation should not fail because of it.

* Renamed the controller config `controller.default.max.record.size.bytes` to just `default.max.record.size.bytes`, which will be shared between server and controller.

* Added the metric `assembled_rmd_size_in_bytes` to track and monitor the RMD size as it contributes to overall record size.

* [controller] Remove stale kill messages from destination cluster before migration (linkedin#1072)

Store migration attempts can fail or be aborted, leading to `KILL` messages
being added to the participant system store. These messages are not removed
automatically, causing subsequent migration attempts to fail during ingestion.

To resolve this, this PR removes the kill message from the participant system
store for the store version being migrated.

* [vpj] Use native Spark readers for reading from Avro and VSON files (linkedin#1036)

Currently, we implement a custom input data source to read Avro and VSON files from HDFS. This input data source isn't optimized for large files (i.e. files larger than the HDFS block size) and is not aware of the blocks and their locations on HDFS. This PR modifies the input logic to use Spark's native Avro and SequenceFile input formats, which should be much more optimal for files larger than the HDFS block size.

Additionally, internally at LinkedIn, we have another input format that reads from a Spark table. This commit adds a lot of the base code that can be used to add arbitrary Spark dataframes as supported input formats. The remaining gap is the schema validation done before the DataWriterComputeJob.

* [controller] Deprecate ACTIVE_ACTIVE data replication policy (linkedin#1045)

We no longer need the ACTIVE_ACTIVE DRP as we don't consult it for lag measurements.

* [server][dvc] tweak logging for server (linkedin#1055)

* [server][dvc] tweak logging

* [router] NPE fix for ReadRequestThrottler constructor/listener (linkedin#1103)

Set the ReadRequestThrottler.storesThrottlers AtomicReference at the
definition site, containing an empty map, to fully eliminate NPEs, even
if listeners are added before the end of the constructor. Also added a
unit test to repro the issue.
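For illustration, a minimal sketch of the idea (the field's value type here is a placeholder; this is not the actual Venice code):

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class ReadRequestThrottlerSketch {
  // Initialized at the definition site with an empty map, so a store-change listener
  // that fires before the constructor finishes still observes a non-null reference.
  private final AtomicReference<Map<String, Object>> storesThrottlers =
      new AtomicReference<>(Collections.emptyMap());
}
```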

Miscellaneous:

- Reduced accesses to the ReadRequestThrottler.storesThrottlers, to make
  it easier to see whether mutations are within the scope of a lock or
  not.

- Deleted the RoutersClusterManager::unSubscribeRouterCountChangedEvent
  function since it is unused.

* [da-vinci] Open RocksDB after blob transfer is completed (linkedin#1105)

* Reorganized DefaultIngestionBackend to ensure that bootstrap happens with a clean file
* Fix for integration test A

* [server] Add version topic information to topic partition ingestion context for better ingestion debugging. (linkedin#1090)

During the ingestion debugging process, the version topic information for all topic partitions on one consumer is very useful, as we need the ingestion speed or last poll time per topic partition across all version topics to debug slow ingestion issues. The version topic is the destination of the DataReceiver inside ConsumptionTask for each PubSubTopicPartition, so this change adds a method to ConsumptionTask to get the version topic.

---------

Co-authored-by: Hao Xu <[email protected]>

* Bump Jackson Version to 2.13.4 (linkedin#1109)

Bump Jackson Version to 2.13.4 to resolve ELR failure

Error message: In FasterXML jackson-databind before 2.12.7.1 and in 2.13.x before 2.13.4, resource exhaustion can occur because of a lack of a check in BeanDeserializer._deserializeFromArray to prevent use of deeply nested arrays. An application is vulnerable only with certain customized choices for deserialization.

* [thin-client] Do not print key/value info in log (linkedin#1106)

Do not print key/value info as they could contain PII.

* [vpj][server][samza][admin-tool][controller] Added parent controller discovery through multiple D2 clients (linkedin#1099)

* Added functionality to discoverLeaderController in D2ControllerClient to accept a list of D2Client

* Added constructor to support a list of D2Clients and modified methods accordingly

* Added a constructor to accept a list of d2ZkHosts and tested it

* [test] Add support to specify Zk base path in tests (linkedin#1112)

This will help colocate parent and child controllers on the same Zk in our test setup

* [server] Use chunk manifest schema id for writer when performing updates to chunked records (linkedin#1101)

* [server] Fix A/A partial update on chunked batch pushed key

This is to resolve the ingestion issues we've seen for chunked records in batch push combined with write compute

---------

Co-authored-by: Jialin Liu <[email protected]>

* [server] Fix server side connection tracking and logging (linkedin#1113)

**Switch to channelActive/channelInactive for Accurate Connection Tracking:**
Replaced the use of channelRegistered/channelUnregistered with
channelActive/channelInactive to improve the tracking of active connections.

**Safety Enhancements:** Added safeguards to ensure connections are only tracked if
they have been correctly marked as active. This prevents multiple
channelActive/channelInactive events from being processed for the same
connection and avoids tracking connections that are marked inactive before ever
being marked as active.

**Router Principal Update:** Updated the router principal name from CN=venice-router
to venice-router and switched to using a substring match instead of equals for
more flexible connection tracking from routers.

**Log Notifier Improvement:** Enhanced log statements within LogNotifier for better
clarity and removed redundant logging to streamline output.
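As a rough sketch of the connection-tracking pattern described above (not Venice's actual handler; the class and attribute names are made up), using channelActive/channelInactive with a per-channel guard looks roughly like this:

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative connection-tracking handler; not the real Venice code.
public class ConnectionTrackingSketchHandler extends ChannelInboundHandlerAdapter {
  private static final AttributeKey<Boolean> TRACKED = AttributeKey.valueOf("connection-tracked");
  private final AtomicLong activeConnections = new AtomicLong();

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // Count a connection only once, when it first becomes active.
    if (ctx.channel().attr(TRACKED).getAndSet(Boolean.TRUE) == null) {
      activeConnections.incrementAndGet();
    }
    super.channelActive(ctx);
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    // Decrement only if this channel was previously marked active, so a channel that
    // goes inactive without ever being active is never tracked.
    if (Boolean.TRUE.equals(ctx.channel().attr(TRACKED).get())) {
      activeConnections.decrementAndGet();
    }
    super.channelInactive(ctx);
  }
}
```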

* [fast-client] Added fanout size metric for FC to monitor multi-key HAR performance (linkedin#1108)

* [fast-client] Added fanout size metric for FC to monitor multi-key HAR performance

* Make previous public methods private, remove subscribe call inside onRecovery

* Correctly pass DVRT functional interface to initBackend, add todo to store classHash in rocksDB

* Fix spotbugs

* [server] Supporting 4 consumer pools for current version and isolation for AA WC leader. (linkedin#1096)

This PR mainly simplifies the consumer pool assignment logic when subscribing a topic partition, via PartitionReplicaIngestionContext. Inside KafkaConsumerServiceDelegator, a ConsumerPoolStrategyType is defined to specify the consumer pool allocation strategy implementation of ConsumerPoolStrategy.
AggKafkaConsumerService#subscribeConsumerFor will pass PartitionReplicaIngestionContext with VersionRole and WorkloadType information to let CurrentVersionConsumerPoolStrategy pick the specific consumer pool. Each instance of KafkaConsumerService will be built inside ConsumerPoolStrategy.

CurrentVersionConsumerPoolStrategy will support 4 different consumer pools, controlled by these 4 configs:

SERVER_CONSUMER_POOL_SIZE_FOR_CURRENT_VERSION_AA_WC_LEADER
SERVER_CONSUMER_POOL_SIZE_FOR_NON_CURRENT_VERSION_AA_WC_LEADER
SERVER_CONSUMER_POOL_SIZE_FOR_CURRENT_VERSION_NON_AA_WC_LEADER
SERVER_CONSUMER_POOL_SIZE_FOR_NON_CURRENT_VERSION_NON_AA_WC_LEADER
DefaultConsumerPoolStrategy will support 1 pool per region with original default setting.

AAOrWCLeaderConsumerPoolStrategy will support 1 more pool, sized by SERVER_DEDICATED_CONSUMER_POOL_SIZE_FOR_AA_WC_LEADER. For config backward compatibility, we will keep relying on SERVER_DEDICATED_CONSUMER_POOL_FOR_AA_WC_LEADER_ENABLED to turn on this feature; SERVER_CONSUMER_POOL_ALLOCATION_STRATEGY is only used when SERVER_DEDICATED_CONSUMER_POOL_FOR_AA_WC_LEADER_ENABLED is turned off.

After we validate the CurrentVersionConsumerPoolStrategy, we will try to clean up the configs and rely on SERVER_CONSUMER_POOL_ALLOCATION_STRATEGY to switch strategy.

This change also enables re-subscription based on workload type change.
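To make the four-pool assignment concrete, here is a rough, illustrative sketch of how a strategy might map a partition's version role and workload type onto one of the four pools above (the enums and pool names are simplified assumptions, not the actual CurrentVersionConsumerPoolStrategy code):

```java
// Illustrative only: a simplified mapping from (version role, workload type) to a pool name.
public final class ConsumerPoolSelectionSketch {
  public enum VersionRole { CURRENT, NON_CURRENT }
  public enum WorkloadType { AA_OR_WC_LEADER, NON_AA_OR_WC_LEADER }

  public static String pickPool(VersionRole role, WorkloadType workload) {
    if (role == VersionRole.CURRENT) {
      return workload == WorkloadType.AA_OR_WC_LEADER
          ? "current_version_aa_wc_leader_pool"        // sized by SERVER_CONSUMER_POOL_SIZE_FOR_CURRENT_VERSION_AA_WC_LEADER
          : "current_version_non_aa_wc_leader_pool";   // sized by SERVER_CONSUMER_POOL_SIZE_FOR_CURRENT_VERSION_NON_AA_WC_LEADER
    }
    return workload == WorkloadType.AA_OR_WC_LEADER
        ? "non_current_version_aa_wc_leader_pool"      // sized by SERVER_CONSUMER_POOL_SIZE_FOR_NON_CURRENT_VERSION_AA_WC_LEADER
        : "non_current_version_non_aa_wc_leader_pool"; // sized by SERVER_CONSUMER_POOL_SIZE_FOR_NON_CURRENT_VERSION_NON_AA_WC_LEADER
  }
}
```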

* [doc] Venice Large Record Value Chunking Diagram (linkedin#1107)

Write-path diagram for large record value chunking in Venice. There is a link to the source diagram on Whimsical, and you'll need to copy the diagram to your account in order to edit it. Also included an svg file copy from Whimsical's experimental feature, but it's kinda glitchy and missing some lines.

* Init DVRT inside SIT, and move DVRT recovery to SIT

* Modify checkout action

* Undo

---------

Co-authored-by: Gaojie Liu <[email protected]>
Co-authored-by: Sourav Maji <[email protected]>
Co-authored-by: Hao Xu <[email protected]>
Co-authored-by: Hao Xu <[email protected]>
Co-authored-by: Tony Chen <[email protected]>
Co-authored-by: Xun Yin <[email protected]>
Co-authored-by: Zac Policzer <[email protected]>
Co-authored-by: Sebastian Infante Murguia <[email protected]>
Co-authored-by: Kai-Sern Lim <[email protected]>
Co-authored-by: Sushant Mane <[email protected]>
Co-authored-by: Nisarg Thakkar <[email protected]>
Co-authored-by: Xin(Adam) Chen <[email protected]>
Co-authored-by: Felix GV <[email protected]>
Co-authored-by: Jialin Liu <[email protected]>