feat: drop use of FetchRequests
Record streams are streamlined by dropping the `FetchRequest` middleman.
Instead, queues of records are lazily created by `KafkaConsumerActor`,
one for each assigned partition.
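
The following sketch illustrates the lazy, per-partition queue creation; the
names (`queueFor`, the plain `Map` state) are illustrative assumptions, not the
actual actor internals:

```scala
// Illustrative sketch only (not the actual KafkaConsumerActor internals):
// queues are created lazily, keyed by partition, the first time a partition's
// records need to be enqueued or its stream is set up.
import cats.effect.{Concurrent, Ref}
import cats.effect.std.Queue
import cats.syntax.all.*
import fs2.Chunk
import org.apache.kafka.common.TopicPartition

def queueFor[F[_]: Concurrent, A](
  state: Ref[F, Map[TopicPartition, Queue[F, Chunk[A]]]],
  partition: TopicPartition,
  capacity: Int
): F[Queue[F, Chunk[A]]] =
  state.get.map(_.get(partition)).flatMap {
    case Some(queue) => queue.pure[F]
    case None        =>
      // Create the queue on first use and record it; if another fiber raced
      // us, keep the existing queue and discard the freshly created one.
      Queue.bounded[F, Chunk[A]](capacity).flatMap { queue =>
        state.modify { s =>
          s.get(partition) match {
            case Some(existing) => (s, existing)
            case None           => (s + (partition -> queue), queue)
          }
        }
      }
  }
```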

Enqueueing records uses `Queue.tryOffer` to avoid blocking the polling
fiber. `KafkaConsumerActor` maintains a separate `spillover` map to hold
records when queues are full. This acts as back-pressure and pauses
polling for "slow" partitions.

`KafkaConsumerActor.poll` takes a more active role in managing assigned
partitions. Namely, it signals the revocation of any partitions that are
no longer assigned and drops them from the internal state.
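
Conceptually, the reconciliation step looks like the following sketch (names
and signatures are assumptions, not the actor's API):

```scala
// Sketch: partitions tracked in state but absent from the current assignment
// are signalled as revoked and dropped from the state. Names are illustrative.
import cats.effect.{Concurrent, Ref}
import cats.effect.std.Queue
import cats.syntax.all.*
import fs2.Chunk
import org.apache.kafka.common.TopicPartition

def dropUnassigned[F[_]: Concurrent, A](
  assigned: Set[TopicPartition],
  state: Ref[F, Map[TopicPartition, Queue[F, Chunk[A]]]],
  signalRevoked: TopicPartition => F[Unit]
): F[Unit] =
  state
    .modify { tracked =>
      val revoked = tracked.keySet.diff(assigned)
      (tracked -- revoked, revoked) // keep only currently assigned partitions
    }
    .flatMap(_.toList.traverse_(signalRevoked))
```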

While the `ConsumerRebalanceListener` interface is meant to handle this
state when the consumer faces rebalance operations, explicitly handling
assignments in `poll` caters to manually assigned partitions. In
addition, it ensures `KafkaConsumerActor`'s internal state stays
consistent in the face of race conditions between rebalance operations
and the setup of record streams in `KafkaConsumer`.

The newly introduced `partitionState` (maintaining per-partition queues
and spillover records) bears some resemblance to the former `fetches`
and `records` fields, but differs in some important ways:

- registration of fetch requests involved a query of the current
  consumer assignment, which forced synchronization with the polling
  thread via the use of `withConsumer.blocking`.
- fetch requests would follow the lifecycle of record chunks, and needed
  to be continuously re-added to the state before new data would be
  polled from a partition. Now, the current assignment forms the basis
  for fetching data from a partition, with spillover records acting as a
  back-pressure signal (see the sketch after this list).
- `records` acted as a holding area, but mainly supported a race
  condition between a new assignment, the start of a new stream, and the
  subsequent registration of fetch requests. `records` was not generally
  used as a spillover area for incoming data.
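
As referenced above, a sketch of how spillover records can drive pausing and
resuming of partitions before a poll; this is an assumed helper, not the
project's code, and uses only standard `Consumer` methods:

```scala
// Assumed helper: before a poll, pause any assigned partition that still has
// spillover records and resume the rest, so data is only fetched from
// partitions whose queues have room.
import scala.jdk.CollectionConverters.*
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition
import fs2.Chunk

def pauseSpilledPartitions[K, V, A](
  consumer: Consumer[K, V],
  spillover: Map[TopicPartition, Chunk[A]]
): Unit = {
  val assigned = consumer.assignment().asScala.toSet
  val paused   = assigned.intersect(spillover.keySet)
  consumer.pause(paused.asJava)                 // back-pressure on full partitions
  consumer.resume(assigned.diff(paused).asJava) // fetch only where there is room
}
```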

With multiple streams hooked up to a single consumer, an inherent race
in the old registration of fetch requests meant that each chunk of
records could be delivered to all, or only a subset, of the listening
streams. With the new approach, multiple streams compete to take
elements from the queue, so each chunk of records goes to exactly one
stream. (While I do not expect that such use of multiple streams was a
feature, the potential behavior change is noted.)
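
For illustration only (a self-contained toy using `Queue[IO, Int]`, not
project code), two fs2 streams draining a single queue compete for elements,
so each element is delivered to exactly one of them:

```scala
// Toy example of the noted behaviour: competing consumers of one queue.
import cats.effect.{IO, IOApp}
import cats.effect.std.Queue
import fs2.Stream

object CompetingStreams extends IOApp.Simple {
  def run: IO[Unit] =
    Queue.bounded[IO, Int](10).flatMap { queue =>
      val producer = Stream.range(0, 20).covary[IO].evalMap(queue.offer)
      def consumer(name: String) =
        Stream.fromQueueUnterminated(queue).evalMap(i => IO.println(s"$name took $i"))
      // Both consumers pull from the same queue, so every element is printed
      // exactly once; `.take(20)` stops after all produced elements are seen.
      consumer("a")
        .merge(consumer("b"))
        .take(20)
        .concurrently(producer)
        .compile
        .drain
    }
}
```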

An internal `StreamId` was previously used to match `FetchRequest`s to
the owning stream. This is no longer used and the ID is dropped.

`KafkaConsumer` previously kept track of its partition assignment, along
with `Deferred` instances to interrupt partition streams. This is now
handled by `KafkaConsumerActor`, which relies solely on the underlying
consumer to keep track of its current assignment.

Overall, this change should reduce some overhead and latency between the
polling thread and the record streams. It does this by removing layers
through which records must pass, as well as reducing synchronization
between polling and consuming fibers.
biochimia committed Jan 3, 2025
1 parent 41be508 commit 4e0157c
Showing 3 changed files with 329 additions and 640 deletions.
213 changes: 36 additions & 177 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -27,7 +27,6 @@ import fs2.kafka.internal.*
import fs2.kafka.internal.converters.collection.*
import fs2.kafka.internal.syntax.*
import fs2.kafka.internal.KafkaConsumerActor.*
import fs2.kafka.internal.LogEntry.{RevokedPreviousFetch, StoredFetch}

import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetAndTimestamp}
import org.apache.kafka.common.{Metric, MetricName, PartitionInfo, TopicPartition}
@@ -127,10 +126,8 @@ object KafkaConsumer {

private def createKafkaConsumer[F[_], K, V](
requests: QueueSink[F, Request[F, K, V]],
settings: ConsumerSettings[F, K, V],
actor: KafkaConsumerActor[F, K, V],
fiber: Fiber[F, Throwable, Unit],
streamIdRef: Ref[F, StreamId],
id: Int,
withConsumer: WithConsumer[F],
stopConsumingDeferred: Deferred[F, Unit]
@@ -139,200 +136,74 @@

override def partitionsMapStream
: Stream[F, Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]] = {
val chunkQueue: F[Queue[F, Option[Chunk[CommittableConsumerRecord[F, K, V]]]]] =
Queue.bounded(settings.maxPrefetchBatches - 1)

type PartitionResult =
(Chunk[CommittableConsumerRecord[F, K, V]], FetchCompletedReason)

type PartitionsMap = Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]
type PartitionsMapQueue = Queue[F, Option[PartitionsMap]]

def partitionStream(
streamId: StreamId,
partition: TopicPartition,
assignmentRevoked: F[Unit]
): Stream[F, CommittableConsumerRecord[F, K, V]] = Stream.force {
for {
chunks <- chunkQueue
dequeueDone <- Deferred[F, Unit]
shutdown = F.race(
F.race(
awaitTermination.attempt,
dequeueDone.get
),
F.race(
stopConsumingDeferred.get,
assignmentRevoked
)
)
.void
stopReqs <- Deferred[F, Unit]
} yield Stream
.eval {
val fetchPartition: F[Unit] = F
.deferred[PartitionResult]
.flatMap { deferred =>
val callback: PartitionResult => F[Unit] =
deferred.complete(_).void

val fetch: F[PartitionResult] = withPermit {
val assigned =
withConsumer.blocking {
_.assignment.contains(partition)
}

def storeFetch: F[Unit] =
actor
.ref
.modify { state =>
val (newState, oldFetches) =
state.withFetch(partition, streamId, callback)
newState ->
(logging.log(StoredFetch(partition, callback, newState)) >>
oldFetches.traverse_ { fetch =>
fetch.completeRevoked(Chunk.empty) >>
logging.log(RevokedPreviousFetch(partition, streamId))
})
}
.flatten

def completeRevoked: F[Unit] =
callback((Chunk.empty, FetchCompletedReason.TopicPartitionRevoked))

assigned.ifM(storeFetch, completeRevoked)
} >> deferred.get

fetch.flatMap { case (chunk, reason) =>
val enqueueChunk = chunks.offer(Some(chunk)).unlessA(chunk.isEmpty)

val completeRevoked =
stopReqs.complete(()).void.whenA(reason.topicPartitionRevoked)

enqueueChunk >> completeRevoked
}
}
partition: TopicPartition
): Stream[F, CommittableConsumerRecord[F, K, V]] =
Stream.force {
actor
.getQueueAndStopSignalFor(partition)
.map { case (chunksQueue, partitionStop) =>
val stopStream: F[Either[Throwable, Unit]] =
F.race(
partitionStop,
F.race(awaitTermination.attempt, stopConsumingDeferred.get)
)
.void
.attempt

Stream
.repeatEval {
stopReqs
.tryGet
.flatMap {
case None =>
fetchPartition

case Some(()) =>
// Prevent issuing additional requests after partition is
// revoked or shutdown happens, in case the stream isn't
// interrupted fast enough
F.unit
}
}
.interruptWhen(F.race(shutdown, stopReqs.get).void.attempt)
.compile
.drain
.guarantee(F.race(dequeueDone.get, chunks.offer(None)).void)
.start
.as {
Stream
.fromQueueNoneTerminated(chunks)
.flatMap(Stream.chunk)
.covary[F]
.onFinalize(dequeueDone.complete(()).void)
}
}
.flatten
}
Stream.fromQueueUnterminated(chunksQueue, 1).unchunks.interruptWhen(stopStream)
}
}

def enqueueAssignment(
streamId: StreamId,
assigned: Map[TopicPartition, Deferred[F, Unit]],
assigned: Set[TopicPartition],
partitionsMapQueue: PartitionsMapQueue
): F[Unit] =
stopConsumingDeferred
.tryGet
.flatMap {
case None =>
val assignment: PartitionsMap = assigned.map { case (partition, finisher) =>
partition -> partitionStream(streamId, partition, finisher.get)
}
val assignment: PartitionsMap = assigned
.view
.map { partition =>
partition -> partitionStream(partition)
}
.toMap

partitionsMapQueue.offer(Some(assignment))

case Some(()) =>
F.unit
}

def onRebalance(
streamId: StreamId,
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
partitionsMapQueue: PartitionsMapQueue
): OnRebalance[F] =
def onRebalance(partitionsMapQueue: PartitionsMapQueue): OnRebalance[F] =
OnRebalance(
onRevoked = revoked => {
for {
finishers <- assignmentRef.modify(_.partition(entry => !revoked.contains(entry._1)))
_ <- finishers.toVector.traverse { case (_, finisher) => finisher.complete(()) }
} yield ()
},
onAssigned = assignedPartitions => {
for {
assignment <- assignedPartitions
.toVector
.traverse { partition =>
Deferred[F, Unit].map(partition -> _)
}
.map(_.toMap)
_ <- assignmentRef.update(_ ++ assignment)
_ <- enqueueAssignment(
streamId = streamId,
assigned = assignment,
partitionsMapQueue = partitionsMapQueue
)
} yield ()
}
onRevoked = _ => F.unit,
onAssigned = assigned => enqueueAssignment(assigned, partitionsMapQueue)
)

def requestAssignment(
streamId: StreamId,
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
partitionsMapQueue: PartitionsMapQueue
): F[Map[TopicPartition, Deferred[F, Unit]]] = {
def requestAssignment(partitionsMapQueue: PartitionsMapQueue): F[Set[TopicPartition]] = {
val assignment = this.assignment(
Some(
onRebalance(
streamId,
assignmentRef,
partitionsMapQueue
)
onRebalance(partitionsMapQueue)
)
)

F.race(awaitTermination.attempt, assignment)
.flatMap {
case Left(_) =>
F.pure(Map.empty)

case Right(assigned) =>
assigned
.toVector
.traverse { partition =>
Deferred[F, Unit].map(partition -> _)
}
.map(_.toMap)
case Left(_) => F.pure(Set.empty)
case Right(assigned) => F.pure(assigned)
}
}

def initialEnqueue(
streamId: StreamId,
assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]],
partitionsMapQueue: PartitionsMapQueue
): F[Unit] =
def initialEnqueue(partitionsMapQueue: PartitionsMapQueue): F[Unit] =
for {
assigned <- requestAssignment(
streamId,
assignmentRef,
partitionsMapQueue
)
_ <- enqueueAssignment(streamId, assigned, partitionsMapQueue)
assigned <- requestAssignment(partitionsMapQueue)
_ <- enqueueAssignment(assigned, partitionsMapQueue)
} yield ()

Stream
@@ -341,16 +212,7 @@
case None =>
for {
partitionsMapQueue <- Stream.eval(Queue.unbounded[F, Option[PartitionsMap]])
streamId <- Stream.eval(streamIdRef.modify(n => (n + 1, n)))
assignmentRef <- Stream
.eval(Ref[F].of(Map.empty[TopicPartition, Deferred[F, Unit]]))
_ <- Stream.eval(
initialEnqueue(
streamId,
assignmentRef,
partitionsMapQueue
)
)
_ <- Stream.eval(initialEnqueue(partitionsMapQueue))
out <- Stream
.fromQueueNoneTerminated(partitionsMapQueue)
.interruptWhen(awaitTermination.attempt)
@@ -681,7 +543,6 @@
requests <- Resource.eval(Queue.unbounded[F, Request[F, K, V]])
polls <- Resource.eval(Queue.bounded[F, Request.Poll[F]](1))
ref <- Resource.eval(Ref.of[F, State[F, K, V]](State.empty))
streamId <- Resource.eval(Ref.of[F, StreamId](0))
dispatcher <- Dispatcher.sequential[F]
stopConsumingDeferred <- Resource.eval(Deferred[F, Unit])
withConsumer <- WithConsumer(mk, settings)
@@ -702,10 +563,8 @@
fiber <- startBackgroundConsumer(requests, polls, actor, settings.pollInterval)
} yield createKafkaConsumer(
requests,
settings,
actor,
fiber,
streamId,
id,
withConsumer,
stopConsumingDeferred