diff --git a/core/src/main/scala/ox/resilience/DurationRateLimiterAlgorithm.scala b/core/src/main/scala/ox/resilience/DurationRateLimiterAlgorithm.scala
index 6cafa84d..8a13f26e 100644
--- a/core/src/main/scala/ox/resilience/DurationRateLimiterAlgorithm.scala
+++ b/core/src/main/scala/ox/resilience/DurationRateLimiterAlgorithm.scala
@@ -7,9 +7,18 @@
 import java.util.concurrent.atomic.AtomicReference
 import scala.annotation.tailrec
 import scala.collection.immutable.Queue
 import scala.concurrent.duration.FiniteDuration
-
+import ox.discard
+
+/** Decides whether a permit for an operation can be acquired. Unlike [[RateLimiterAlgorithm]], it considers the whole execution time of
+  * an operation.
+  *
+  * There is no leaky bucket algorithm implemented here, because it would effectively amount to a "max number of operations currently
+  * running" limit, which can be achieved with a single semaphore.
+  */
 object DurationRateLimiterAlgorithm:
-  /** Fixed window algorithm: allows to run at most `rate` operations in consecutively segments of duration `per`. */
+  /** Fixed window algorithm: allows running at most `rate` operations in consecutive segments of duration `per`. Considers the whole
+    * execution time of an operation: an operation spanning more than one window holds a permit in every window that it spans.
+    */
   case class FixedWindow(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
     private val lastUpdate = new AtomicLong(System.nanoTime())
     private val semaphore = new Semaphore(rate)
@@ -28,18 +37,20 @@
     def update(): Unit =
       val now = System.nanoTime()
       lastUpdate.set(now)
+      // An operation still running in a new window is treated the same as one started in that window, so permits are replenished up to: rate - runningOperations
       semaphore.release(rate - semaphore.availablePermits() - runningOperations.get())
     end update
 
     def runOperation[T](operation: => T, permits: Int): T =
       runningOperations.updateAndGet(_ + permits)
-      val result = operation
-      runningOperations.updateAndGet(current => (current - permits).max(0))
-      result
+      try operation
+      finally runningOperations.updateAndGet(_ - permits).discard
   end FixedWindow
 
-  /** Sliding window algorithm: allows to run at most `rate` operations in the lapse of `per` before current time. */
+  /** Sliding window algorithm: allows running at most `rate` operations in the lapse of `per` before the current time. Considers the whole
+    * execution time of an operation: a permit is released once `per` has passed since the operation ended.
+    */
   case class SlidingWindow(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
     // stores the timestamp and the number of permits acquired after finishing running operation
     private val log = new AtomicReference[Queue[(Long, Int)]](Queue[(Long, Int)]())
@@ -70,10 +81,9 @@
     end getNextUpdate
 
     def runOperation[T](operation: => T, permits: Int): T =
-      val result = operation
+      try operation
       // Consider end of operation as a point to release permit after `per` passes
-      addTimestampToLog(permits)
-      result
+      finally addTimestampToLog(permits)
 
     def update(): Unit =
       val now = System.nanoTime()
diff --git a/core/src/main/scala/ox/resilience/RateLimiter.scala b/core/src/main/scala/ox/resilience/RateLimiter.scala
index 4f86618d..f8d8e53f 100644
--- a/core/src/main/scala/ox/resilience/RateLimiter.scala
+++ b/core/src/main/scala/ox/resilience/RateLimiter.scala
@@ -2,8 +2,6 @@
 package ox.resilience
 
 import scala.concurrent.duration.FiniteDuration
 import ox.*
-
-import java.util.concurrent.Semaphore
 import scala.annotation.tailrec
 
 /** Rate limiter with a customizable algorithm. Operations can be blocked or dropped, when the rate limit is reached. operationMode decides
@@ -52,12 +50,8 @@
     *   Interval of time between replenishing the rate limiter. The rate limiter is replenished to allow up to [[maxOperations]] in the next
     *   time window.
     */
-  def fixedWindow(maxOperations: Int, window: FiniteDuration, operationMode: RateLimiterMode = RateLimiterMode.OperationStart)(using
-      Ox
-  ): RateLimiter =
-    operationMode match
-      case RateLimiterMode.OperationStart    => apply(RateLimiterAlgorithm.FixedWindow(maxOperations, window))
-      case RateLimiterMode.OperationDuration => apply(DurationRateLimiterAlgorithm.FixedWindow(maxOperations, window))
+  def fixedWindow(maxOperations: Int, window: FiniteDuration)(using Ox): RateLimiter =
+    apply(RateLimiterAlgorithm.FixedWindow(maxOperations, window))
 
   /** Creates a rate limiter using a sliding window algorithm.
     *
@@ -68,12 +62,8 @@
     * @param maxOperations
     *   Maximum number of operations that are allowed to start within a time [[window]].
     * @param window
     *   Length of the window.
     */
-  def slidingWindow(maxOperations: Int, window: FiniteDuration, operationMode: RateLimiterMode = RateLimiterMode.OperationStart)(using
-      Ox
-  ): RateLimiter =
-    operationMode match
-      case RateLimiterMode.OperationStart    => apply(RateLimiterAlgorithm.SlidingWindow(maxOperations, window))
-      case RateLimiterMode.OperationDuration => apply(DurationRateLimiterAlgorithm.SlidingWindow(maxOperations, window))
+  def slidingWindow(maxOperations: Int, window: FiniteDuration)(using Ox): RateLimiter =
+    apply(RateLimiterAlgorithm.SlidingWindow(maxOperations, window))
 
   /** Creates a rate limiter with token/leaky bucket algorithm.
     *
@@ -84,15 +74,31 @@
     * @param refillInterval
     *   Interval of time between adding a single token to the bucket.
     */
-  def leakyBucket(maxTokens: Int, refillInterval: FiniteDuration)(using
-      Ox
-  ): RateLimiter =
+  def leakyBucket(maxTokens: Int, refillInterval: FiniteDuration)(using Ox): RateLimiter =
     apply(RateLimiterAlgorithm.LeakyBucket(maxTokens, refillInterval))
-end RateLimiter
+  /** Creates a rate limiter using a duration-based fixed window algorithm.
+    *
+    * Must be run within an [[Ox]] concurrency scope, as a background fork is created, to replenish the rate limiter.
+    *
+    * @param maxOperations
+    *   Maximum number of operations that are allowed to **run** (newly started, or still finishing from previous windows) within a time [[window]].
+    * @param window
+    *   Length of the window.
+    */
+  def durationFixedWindow(maxOperations: Int, window: FiniteDuration)(using Ox): RateLimiter =
+    apply(DurationRateLimiterAlgorithm.FixedWindow(maxOperations, window))
 
-/** Decides if RateLimiter should consider only start of an operation or whole time of execution.
-  */
-enum RateLimiterMode:
-  case OperationStart
-  case OperationDuration
+  /** Creates a rate limiter using a duration-based sliding window algorithm.
+    *
+    * Must be run within an [[Ox]] concurrency scope, as a background fork is created, to replenish the rate limiter.
+    *
+    * @param maxOperations
+    *   Maximum number of operations that are allowed to **run** (starting or still finishing) within any [[window]] of time.
+    * @param window
+    *   Length of the window.
+    */
+  def durationSlidingWindow(maxOperations: Int, window: FiniteDuration)(using Ox): RateLimiter =
+    apply(DurationRateLimiterAlgorithm.SlidingWindow(maxOperations, window))
+
+end RateLimiter
diff --git a/core/src/main/scala/ox/resilience/RateLimiterAlgorithm.scala b/core/src/main/scala/ox/resilience/RateLimiterAlgorithm.scala
index 3a898b91..789b0fd1 100644
--- a/core/src/main/scala/ox/resilience/RateLimiterAlgorithm.scala
+++ b/core/src/main/scala/ox/resilience/RateLimiterAlgorithm.scala
@@ -38,6 +38,8 @@
 end RateLimiterAlgorithm
 
+/** Decides whether a permit for an operation can be acquired. Considers only the start of an operation.
+  */
 object RateLimiterAlgorithm:
   /** Fixed window algorithm: allows starting at most `rate` operations in consecutively segments of duration `per`. */
   case class FixedWindow(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
diff --git a/core/src/test/scala/ox/resilience/RateLimiterTest.scala b/core/src/test/scala/ox/resilience/RateLimiterTest.scala
index 3be9bec1..c5be1b66 100644
--- a/core/src/test/scala/ox/resilience/RateLimiterTest.scala
+++ b/core/src/test/scala/ox/resilience/RateLimiterTest.scala
@@ -4,11 +4,8 @@
 import ox.*
 import ox.util.ElapsedTime
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
-
 import java.util.concurrent.atomic.AtomicLong
 import org.scalatest.{EitherValues, TryValues}
-import ox.resilience.RateLimiterMode.OperationDuration
-
 import scala.concurrent.duration.*
 import java.util.concurrent.atomic.AtomicReference
 
@@ -222,7 +219,7 @@
   it should "not allow to run more long running operations concurrently than max rate when considering operation time" in {
     supervised:
-      val rateLimiter = RateLimiter.fixedWindow(2, FiniteDuration(1, "second"), OperationDuration)
+      val rateLimiter = RateLimiter.durationFixedWindow(2, FiniteDuration(1, "second"))
 
       def operation =
         sleep(3.seconds)
@@ -392,7 +389,7 @@
   it should "not allow to run more operations when operations are still running when considering operation time" in {
     supervised:
-      val rateLimiter = RateLimiter.slidingWindow(2, FiniteDuration(1, "second"), OperationDuration)
+      val rateLimiter = RateLimiter.durationSlidingWindow(2, FiniteDuration(1, "second"))
 
       def operation =
         sleep(3.seconds)
@@ -429,7 +426,7 @@
   it should "not allow to run more operations when operations are still running in window span when considering operation time" in {
     supervised:
-      val rateLimiter = RateLimiter.slidingWindow(3, FiniteDuration(1, "second"), OperationDuration)
+      val rateLimiter = RateLimiter.durationSlidingWindow(3, FiniteDuration(1, "second"))
 
       def longOperation =
         sleep(3.seconds)
diff --git a/doc/utils/rate-limiter.md b/doc/utils/rate-limiter.md
index b24537e3..a6f63e22 100644
--- a/doc/utils/rate-limiter.md
+++ b/doc/utils/rate-limiter.md
@@ -1,6 +1,6 @@
 # Rate limiter
 
-The rate limiter mechanism allows controlling the rate at which operations are executed. It ensures that at most a certain number of operations are run concurrently within a specified time frame, preventing system overload and ensuring fair resource usage. Note that you can choose if algorithm takes into account only the start of execution or the whole execution of an operation.
+The rate limiter mechanism allows controlling the rate at which operations are executed. It ensures that at most a certain number of operations are run concurrently within a specified time frame, preventing system overload and ensuring fair resource usage. Note that you can choose an algorithm that takes into account either only the start of execution, or the whole execution time of an operation.
 
 ## API
 
@@ -34,26 +34,27 @@
 The `operation` can be provided directly using a by-name parameter, i.e. `f: => T`.
 
 ## Configuration
 
 The configuration of a `RateLimiter` depends on an underlying algorithm that controls whether an operation can be executed or not. The following algorithms are available:
-- `RateLimiterAlgorithm.FixedWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in fixed windows of `dur` duration.
-- `RateLimiterAlgorithm.SlidingWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in any window of time of duration `dur`.
-- `RateLimiterAlgorithm.Bucket(maximum: Int, dur: FiniteDuration)` - where `maximum` is the maximum capacity of tokens available in the token bucket algorithm and one token is added each `dur`. It can represent both the leaky bucket algorithm or the token bucket algorithm.
-- `DurationRateLimiterAlgorithm.FixedWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations which execution spans fixed windows of `dur` duration.
-- `DurationRateLimiterAlgorithm.SlidingWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations which execution spans any window of time of duration `dur`.
+- `RateLimiterAlgorithm.FixedWindow(rate: Int, per: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in fixed windows of `per` duration.
+- `RateLimiterAlgorithm.SlidingWindow(rate: Int, per: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in any window of time of duration `per`.
+- `RateLimiterAlgorithm.Bucket(maximum: Int, per: FiniteDuration)` - where `maximum` is the maximum capacity of tokens available in the token bucket algorithm and one token is added each `per`. It can represent both the leaky bucket and the token bucket algorithm.
+- `DurationRateLimiterAlgorithm.FixedWindow(rate: Int, per: FiniteDuration)` - where `rate` is the maximum number of operations whose execution may span a fixed window of `per` duration. Considers the whole execution time of an operation; an operation spanning more than one window holds a permit in every window that it spans.
+- `DurationRateLimiterAlgorithm.SlidingWindow(rate: Int, per: FiniteDuration)` - where `rate` is the maximum number of operations whose execution may span any window of time of duration `per`. Considers the whole execution time of an operation; a permit is released once `per` has passed since the operation ended.
 
 ### API shorthands
 
 You can use one of the following shorthands to define a Rate Limiter with the corresponding algorithm:
-- `RateLimiter.fixedWindow(rate: Int, dur: FiniteDuration, operationMode: RateLimiterMode = RateLimiterMode.OperationStart)`,
-- `RateLimiter.slidingWindow(rate: Int, dur: FiniteDuration, operationMode: RateLimiterMode = RateLimiterMode.OperationStart)`,
-- `RateLimiter.leakyBucket(maximum: Int, dur: FiniteDuration)`
+- `RateLimiter.fixedWindow(maxOperations: Int, window: FiniteDuration)`
+- `RateLimiter.slidingWindow(maxOperations: Int, window: FiniteDuration)`
+- `RateLimiter.leakyBucket(maxTokens: Int, refillInterval: FiniteDuration)`
+- `RateLimiter.durationFixedWindow(maxOperations: Int, window: FiniteDuration)`
+- `RateLimiter.durationSlidingWindow(maxOperations: Int, window: FiniteDuration)`
 
-These shorthands also allow to define if the whole execution time of an operation should be considered.
 See the tests in `ox.resilience.*` for more.
 
 ## Custom rate limiter algorithms
 
-The `RateLimiterAlgorithm` employed by `RateLimiter` can be extended to implement new algorithms or modify existing ones. Its interface is modelled like that of a `Semaphore` although the underlying implementation could be different. For best compatibility with the existing interface of `RateLimiter`, methods `acquire` and `tryAcquire` should offer the same guaranties as Java's `Semaphores`. There is also method `def runOperation[T](operation: => T, permits: Int): T` for cases where considering span of execution may be necessary.
+The `RateLimiterAlgorithm` employed by `RateLimiter` can be extended to implement new algorithms or modify existing ones. Its interface is modelled like that of a `Semaphore`, although the underlying implementation could be different. For best compatibility with the existing interface of `RateLimiter`, the `acquire` and `tryAcquire` methods should offer the same guarantees as Java's `Semaphore`. There is also a method, `def runOperation[T](operation: => T, permits: Int): T`, for cases where the whole span of execution needs to be considered (see the implementations in `DurationRateLimiterAlgorithm`).
 
 Additionally, there are two methods employed by the `GenericRateLimiter` for updating its internal state automatically:
 - `def update(): Unit`: Updates the internal state of the rate limiter to reflect its current situation. Invoked in a background fork repeatedly, when a rate limiter is created.
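Usage sketch for the new shorthands introduced by this change (an illustration, not part of the patch): it assumes a blocking `runBlocking(operation)` entry point on `RateLimiter`, and reuses the `supervised`/`fork`/`sleep` helpers seen in the tests above.

```scala
import ox.*
import ox.resilience.RateLimiter
import scala.concurrent.duration.*

@main def durationRateLimiterExample(): Unit =
  supervised:
    // At most 2 operations may be running within any 1-second fixed window;
    // an operation spanning several windows keeps holding its permit in each of them.
    val rateLimiter = RateLimiter.durationFixedWindow(2, 1.second)

    def operation: Int =
      sleep(1500.millis) // longer than a single window, so the permit stays blocked across two windows
      42

    // Start four long operations concurrently: only two run at a time, the rest wait
    // until earlier operations finish and the windows they spanned are replenished.
    // runBlocking (assumed API) blocks until a permit can be acquired, then runs the operation.
    val forks = (1 to 4).map(_ => fork(rateLimiter.runBlocking(operation)))
    println(forks.map(_.join()).sum)
```

`durationSlidingWindow` is used in the same way; the difference is that a permit is only freed once the window length has elapsed after the operation ended.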