Skip to content

Commit

Permalink
MINOR: reduce allocations in log start and recovery checkpoints (apac…
Browse files Browse the repository at this point in the history
…he#8467)

For brokers with replica counts > 4000, allocations from logsByDir become
substantial. logsByDir is called often by LogManager.checkpointLogRecoveryOffsets
and LogManager.checkpointLogStartOffsets. The approach used is similar to the
one from the checkpointHighwatermarks change in
apache#6741.

Are there better ways to structure out data structure to avoid creating logsByDir on
demand for each checkpoint iteration? This micro-optimization will help as is, but if
we can avoid doing this completely it'd be better.

JMH benchmark results:
```
Before:
Benchmark                                                                      (numPartitions)  (numTopics)   Mode  Cnt        Score        Error   Units
CheckpointBench.measureCheckpointLogStartOffsets                                             3          100  thrpt   15        2.233 ±      0.013  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3          100  thrpt   15      477.097 ±     49.731  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3          100  thrpt   15   246083.007 ±     33.052    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3          100  thrpt   15      475.683 ±     55.569  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3          100  thrpt   15   245474.040 ±  14968.328    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3          100  thrpt   15        0.001 ±      0.001  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3          100  thrpt   15        0.341 ±      0.268    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3          100  thrpt   15      129.000               counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3          100  thrpt   15       52.000                   ms
CheckpointBench.measureCheckpointLogStartOffsets                                             3         1000  thrpt   15        0.572 ±      0.004  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3         1000  thrpt   15     1360.240 ±    150.539  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         1000  thrpt   15  2750221.257 ±    891.024    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3         1000  thrpt   15     1362.908 ±    148.799  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3         1000  thrpt   15  2756395.092 ±  44671.843    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3         1000  thrpt   15        0.017 ±      0.008  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3         1000  thrpt   15       33.611 ±     14.401    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3         1000  thrpt   15      273.000               counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3         1000  thrpt   15      186.000                   ms
CheckpointBench.measureCheckpointLogStartOffsets                                             3         2000  thrpt   15        0.266 ±      0.002  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3         2000  thrpt   15     1342.557 ±    171.260  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         2000  thrpt   15  5877881.729 ±   3695.086    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3         2000  thrpt   15     1343.965 ±    186.069  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3         2000  thrpt   15  5877788.561 ± 168540.343    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3         2000  thrpt   15        0.081 ±      0.043  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3         2000  thrpt   15      351.277 ±    167.006    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3         2000  thrpt   15      253.000               counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3         2000  thrpt   15      231.000                   ms
JMH benchmarks done

After:
CheckpointBench.measureCheckpointLogStartOffsets                                             3          100  thrpt   15        2.809 ±     0.129  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3          100  thrpt   15      211.248 ±    25.953  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3          100  thrpt   15    86533.838 ±  3763.989    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3          100  thrpt   15      211.512 ±    38.669  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3          100  thrpt   15    86228.552 ±  9590.781    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3          100  thrpt   15       ≈ 10⁻³              MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3          100  thrpt   15        0.140 ±     0.111    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3          100  thrpt   15       57.000              counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3          100  thrpt   15       25.000                  ms
CheckpointBench.measureCheckpointLogStartOffsets                                             3         1000  thrpt   15        1.046 ±     0.030  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3         1000  thrpt   15      524.597 ±    74.793  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         1000  thrpt   15   582898.889 ± 37552.262    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3         1000  thrpt   15      519.675 ±    89.754  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3         1000  thrpt   15   576371.150 ± 55972.955    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3         1000  thrpt   15        0.009 ±     0.005  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3         1000  thrpt   15        9.920 ±     5.375    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3         1000  thrpt   15      111.000              counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3         1000  thrpt   15       56.000                  ms
CheckpointBench.measureCheckpointLogStartOffsets                                             3         2000  thrpt   15        0.617 ±     0.007  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3         2000  thrpt   15      573.061 ±    95.931  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         2000  thrpt   15  1092098.004 ± 75140.633    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3         2000  thrpt   15      572.448 ±    97.960  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3         2000  thrpt   15  1091290.460 ± 85946.164    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3         2000  thrpt   15        0.010 ±     0.012  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3         2000  thrpt   15       19.990 ±    24.407    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3         2000  thrpt   15      109.000              counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3         2000  thrpt   15       67.000                  ms
JMH benchmarks done

```

For the 2000 topic, 3 partition case, we see a reduction in normalized allocations from 5877881B/op to 1284190.774B/op, a reduction of 78%.

Some allocation profiles from a mid sized broker follow. I have seen worse, but these
add up to around 3.8% on a broker that saw GC overhead in CPU time of around 30%.
You could argue that this is relatively small, but it seems worthwhile for a low risk change.

![image](https://user-images.githubusercontent.com/252189/79058104-33e91d80-7c1e-11ea-99c9-0cf2e3571e1f.png)
![image](https://user-images.githubusercontent.com/252189/79058105-38add180-7c1e-11ea-8bfd-6e6eafb0c794.png)

Reviewers: Ismael Juma <[email protected]>
  • Loading branch information
lbradstreet authored Apr 25, 2020
1 parent 99b8b51 commit cfc34ca
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
14 changes: 11 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1003,9 +1003,17 @@ class LogManager(logDirs: Seq[File],
/**
* Map of log dir to logs by topic and partitions in that dir
*/
private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
(this.currentLogs.toList ++ this.futureLogs.toList).toMap
.groupBy { case (_, log) => log.parentDir }
def logsByDir: Map[String, Map[TopicPartition, Log]] = {
// This code is called often by checkpoint processes and is written in a way that reduces
// allocations and CPU with many topic partitions.
// When changing this code please measure the changes with org.apache.kafka.jmh.server.CheckpointBench
val byDir = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Log]]()
def addToDir(tp: TopicPartition, log: Log): Unit = {
byDir.getOrElseUpdate(log.parentDir, new mutable.AnyRefMap[TopicPartition, Log]()).put(tp, log)
}
currentLogs.foreachEntry(addToDir)
futureLogs.foreachEntry(addToDir)
byDir
}

// logDir should be an absolute path
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/utils/Pool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
def values: Iterable[V] = pool.values.asScala

def clear(): Unit = { pool.clear() }

def foreachEntry(f: (K, V) => Unit): Unit = {
pool.forEach((k, v) => f(k, v))
}

override def size: Int = pool.size

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@
@Measurement(iterations = 5)
@Fork(3)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
public class HighwatermarkCheckpointBench {
@State(value = Scope.Benchmark)
public class CheckpointBench {

@Param({"100", "1000", "2000"})
public int numTopics;
Expand Down Expand Up @@ -172,4 +172,10 @@ public void tearDown() throws Exception {
public void measureCheckpointHighWatermarks() {
this.replicaManager.checkpointHighWatermarks();
}

@Benchmark
@Threads(1)
public void measureCheckpointLogStartOffsets() {
this.logManager.checkpointLogStartOffsets();
}
}

0 comments on commit cfc34ca

Please sign in to comment.