Skip to content

Commit

Permalink
chore: ensure pending commits order
Browse files Browse the repository at this point in the history
Commits would be added to the chain of pending commits based only on the
rebalancing state. Given that the rebalancing state is updated (via
`ConsumerRebalanceListener.onPartitionsAssigned`) separate from when
pending commits are processed (after `poll`), it could happen that
commits emitted later would be processed before earlier pending ones.

This updates the condition for queueing commits to take into account the
prior existence of pending commits.

In addition, the condition for processing pending commits in `poll` is
also updated to disregard whether a rebalance operation was ongoing at
the start of the poll. Instead, the existence of pending commits along
with a non-`rebalancing` state are a sufficient trigger.

This ensures that rebalance operations that might conclude within a
single consumer poll do not leave behind any pending commits.

At the moment, these possibilities are theoretical as commit operations
are serialized via `KafkaConsumerActor`'s request queue, and don't
happen concurrently to polls. That said, the cost of the fixes is
trivial and being explicit about the conditions may prevent future bugs,
if the surrounding context changes.
  • Loading branch information
biochimia committed Dec 18, 2024
1 parent 68de15e commit 067d60e
Showing 1 changed file with 6 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V](
private[this] def commit(request: Request.Commit[F]): F[Unit] =
ref.flatModify { state =>
val commitF = commitAsync(request.offsets, request.callback)
if (state.rebalancing) {
if (state.rebalancing || state.pendingCommits.nonEmpty) {
val newState = state.withPendingCommit(
commitF >> logging.log(CommittedPendingCommit(request))
)
Expand Down Expand Up @@ -301,7 +301,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V](
}
.flatMap(records)

def handlePoll(newRecords: ConsumerRecords, initialRebalancing: Boolean): F[Unit] = {
def handlePoll(newRecords: ConsumerRecords): F[Unit] = {
def handleBatch(
state: State[F, K, V],
pendingCommits: Option[HandlePollResult.PendingCommits]
Expand Down Expand Up @@ -380,9 +380,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V](
}

def handlePendingCommits(state: State[F, K, V]) = {
val currentRebalancing = state.rebalancing

if (initialRebalancing && !currentRebalancing && state.pendingCommits.nonEmpty) {
if (!state.rebalancing && state.pendingCommits.nonEmpty) {
val newState = state.withoutPendingCommits
(
newState,
Expand Down Expand Up @@ -414,10 +412,9 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V](
ref
.get
.flatMap { state =>
if (state.subscribed && state.streaming) {
val initialRebalancing = state.rebalancing
pollConsumer(state).flatMap(handlePoll(_, initialRebalancing))
} else F.unit
if (state.subscribed && state.streaming)
pollConsumer(state).flatMap(handlePoll(_))
else F.unit
}
}

Expand Down

0 comments on commit 067d60e

Please sign in to comment.