
RateLimiter - consider whole operation execution time #251

Merged: 11 commits, Dec 17, 2024
53 changes: 39 additions & 14 deletions core/src/main/scala/ox/resilience/RateLimiter.scala
@@ -3,28 +3,47 @@ package ox.resilience
import scala.concurrent.duration.FiniteDuration
import ox.*

import java.util.concurrent.Semaphore
import scala.annotation.tailrec

/** Rate limiter with a customizable algorithm. Operations can be blocked or dropped, when the rate limit is reached. */
class RateLimiter private (algorithm: RateLimiterAlgorithm):
/** Rate limiter with a customizable algorithm. Operations can be blocked or dropped when the rate limit is reached. `considerOperationTime` decides whether the whole execution time of the operation should be considered, or just its start. */
class RateLimiter private (algorithm: RateLimiterAlgorithm, considerOperationTime: Boolean):
// bounds the number of operations in flight when considerOperationTime = true
private val semaphore = new Semaphore(algorithm.rate)

/** Runs the operation, blocking if the rate limit is reached, until the rate limiter is replenished. */
def runBlocking[T](operation: => T): T =
  algorithm.acquire()
  operation
  if considerOperationTime then
    synchronized:
      semaphore.acquire()
      algorithm.acquire()
    val result = operation
    semaphore.release()
    result
  else
    algorithm.acquire()
    operation

/** Runs or drops the operation, if the rate limit is reached.
*
* @return
* `Some` if the operation has been allowed to run, `None` if the operation has been dropped.
*/
def runOrDrop[T](operation: => T): Option[T] =
  if algorithm.tryAcquire() then Some(operation)
  if considerOperationTime then
    if synchronized:
        algorithm.tryAcquire() && semaphore.tryAcquire()
    then
      val result = operation
      semaphore.release()
      Some(result)
    else None
  else if algorithm.tryAcquire() then Some(operation)
  else None

end RateLimiter

object RateLimiter:
def apply(algorithm: RateLimiterAlgorithm)(using Ox): RateLimiter =
def apply(algorithm: RateLimiterAlgorithm, considerOperationTime: Boolean = false)(using Ox): RateLimiter =
@tailrec
def update(): Unit =
val waitTime = algorithm.getNextUpdate
@@ -36,7 +55,7 @@ object RateLimiter:
end update

forkDiscard(update())
new RateLimiter(algorithm)
new RateLimiter(algorithm, considerOperationTime)
end apply

/** Creates a rate limiter using a fixed window algorithm.
@@ -46,11 +65,13 @@
* @param maxOperations
* Maximum number of operations that are allowed to **start** within a time [[window]].
* @param window
* Interval of time between replenishing the rate limiter. THe rate limiter is replenished to allow up to [[maxOperations]] in the next
* Interval of time between replenishing the rate limiter. The rate limiter is replenished to allow up to [[maxOperations]] in the next
* time window.
* @param considerOperationTime
* Whether to consider the whole execution time of the operation, or just its start.
*/
def fixedWindow(maxOperations: Int, window: FiniteDuration)(using Ox): RateLimiter =
apply(RateLimiterAlgorithm.FixedWindow(maxOperations, window))
def fixedWindow(maxOperations: Int, window: FiniteDuration, considerOperationTime: Boolean = false)(using Ox): RateLimiter =
apply(RateLimiterAlgorithm.FixedWindow(maxOperations, window), considerOperationTime)

/** Creates a rate limiter using a sliding window algorithm.
*
@@ -60,9 +81,11 @@
* Maximum number of operations that are allowed to **start** within any [[window]] of time.
* @param window
* Length of the window.
* @param considerOperationTime
* Whether to consider the whole execution time of the operation, or just its start.
*/
def slidingWindow(maxOperations: Int, window: FiniteDuration)(using Ox): RateLimiter =
apply(RateLimiterAlgorithm.SlidingWindow(maxOperations, window))
def slidingWindow(maxOperations: Int, window: FiniteDuration, considerOperationTime: Boolean = false)(using Ox): RateLimiter =
apply(RateLimiterAlgorithm.SlidingWindow(maxOperations, window), considerOperationTime)

/** Rate limiter with token/leaky bucket algorithm.
*
@@ -72,7 +95,9 @@
* Max capacity of tokens in the algorithm, limiting the operations that are allowed to **start** concurrently.
* @param refillInterval
* Interval of time between adding a single token to the bucket.
* @param considerOperationTime
* Whether to consider the whole execution time of the operation, or just its start.
*/
def leakyBucket(maxTokens: Int, refillInterval: FiniteDuration)(using Ox): RateLimiter =
apply(RateLimiterAlgorithm.LeakyBucket(maxTokens, refillInterval))
def leakyBucket(maxTokens: Int, refillInterval: FiniteDuration, considerOperationTime: Boolean = false)(using Ox): RateLimiter =
apply(RateLimiterAlgorithm.LeakyBucket(maxTokens, refillInterval), considerOperationTime)
end RateLimiter
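For illustration, here is a minimal usage sketch of the new flag (not part of this diff; the `@main` entry point, the `slowCall` helper and the concrete parameter values are assumptions). It relies only on the `fixedWindow` factory and the `runBlocking`/`runOrDrop` methods shown above:

```scala
import ox.*
import ox.resilience.RateLimiter
import scala.concurrent.duration.FiniteDuration

// Hypothetical example, not part of the PR: with considerOperationTime = true a permit
// is held for the whole duration of the operation, so long-running operations keep
// occupying the limit until they complete.
@main def rateLimiterSketch(): Unit =
  supervised:
    val limiter = RateLimiter.fixedWindow(2, FiniteDuration(1, "second"), considerOperationTime = true)

    def slowCall(i: Int): Int =
      Thread.sleep(500L) // simulate a long-running operation
      i

    // blocks until the operation may start, and counts its whole duration against the limit
    val a: Int = limiter.runBlocking(slowCall(1))

    // returns None if the limit is currently exhausted, Some(result) otherwise
    val b: Option[Int] = limiter.runOrDrop(slowCall(2))

    println((a, b))
```

With the default `considerOperationTime = false`, only the start of each call would count, matching the previous behaviour.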
2 changes: 2 additions & 0 deletions core/src/main/scala/ox/resilience/RateLimiterAlgorithm.scala
@@ -30,6 +30,8 @@ trait RateLimiterAlgorithm:
/** Returns the time in nanoseconds that needs to elapse until the next update. It should not modify internal state. */
def getNextUpdate: Long

/** The rate of the limiter, i.e. the maximum number of operations the algorithm allows. Used to size the semaphore bounding in-flight operations when the whole operation time is considered. */
def rate: Int

end RateLimiterAlgorithm

object RateLimiterAlgorithm:
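As an aside, a self-contained sketch (hypothetical code, not the library's implementation) of why the trait gains `rate`: the limiter uses it to size a semaphore, so that when the whole operation time is considered, at most `rate` operations can be in flight at once.

```scala
import java.util.concurrent.Semaphore

// Hypothetical illustration only: in the PR the semaphore is created once per
// RateLimiter from algorithm.rate; this standalone class mimics that behaviour.
final class InFlightLimit(rate: Int):
  private val semaphore = new Semaphore(rate)

  def run[T](operation: => T): T =
    semaphore.acquire() // the permit is held for the whole operation...
    try operation
    finally semaphore.release() // ...and returned only once it completes

@main def inFlightDemo(): Unit =
  val limit = InFlightLimit(2)
  println(limit.run(21 * 2)) // prints 42
```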
74 changes: 74 additions & 0 deletions core/src/test/scala/ox/resilience/RateLimiterTest.scala
@@ -4,11 +4,12 @@
import ox.util.ElapsedTime
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import java.util.concurrent.atomic.AtomicLong
import org.scalatest.{EitherValues, TryValues}
import scala.concurrent.duration.*
import java.util.concurrent.atomic.AtomicReference

class RateLimiterTest extends AnyFlatSpec with Matchers with EitherValues with TryValues with ElapsedTime:

Check failure on line 12 in core/src/test/scala/ox/resilience/RateLimiterTest.scala (GitHub Actions / Test report):
ox.resilience.RateLimiterTest ► bucket RateLimiter should allow to run more long running operations concurrently than max rate when not considering operation's time
Failed test found in: core/target/test-reports/TEST-ox.resilience.RateLimiterTest.xml
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: None was not equal to Some(0)
	at org.scalatest.matchers.MatchersHelper$.indicateFailure(MatchersHelper.scala:392)
	at org.scalatest.matchers.should.Matchers.shouldBe(Matchers.scala:7017)
	at org.scalatest.matchers.should.Matchers.shouldBe$(Matchers.scala:1808)
	at ox.resilience.RateLimiterTest.shouldBe(RateLimiterTest.scala:12)
	at ox.resilience.RateLimiterTest.testFun$proxy16$1$$anonfun$1(RateLimiterTest.scala:503)
	at ox.supervised$package$.$anonfun$2(supervised.scala:44)
	at ox.fork$package$.forkUserError$$anonfun$1(fork.scala:87)
	at ox.fork$package$.forkUserError$$anonfun$adapted$1(fork.scala:84)
	at java.base/java.util.concurrent.StructuredTaskScope$SubtaskImpl.run(StructuredTaskScope.java:889)
	at java.base/java.lang.VirtualThread.run(VirtualThread.java:329)
behavior of "fixed rate RateLimiter"

it should "drop operation when rate limit is exceeded" in {
@@ -467,4 +468,77 @@
}
}

it should "allow to run more long running operations concurrently than max rate when not considering operation's time" in {
supervised:
val rateLimiter = RateLimiter.fixedWindow(2, FiniteDuration(1, "second"))

val operationsRunning = AtomicLong(0L)

def operation =
operationsRunning.updateAndGet(_ + 1)
Thread.sleep(3000L)
operationsRunning.updateAndGet(_ - 1)
0
end operation

var result1: Option[Int] = Some(-1)
var result2: Option[Int] = Some(-1)
var result3: Int = -1
var resultOperations: Long = 0L

// operations with runOrDrop should be dropped while operations with runBlocking should wait
supervised:
forkUserDiscard:
result1 = rateLimiter.runOrDrop(operation)
forkUserDiscard:
result2 = rateLimiter.runOrDrop(operation)
forkUserDiscard:
result3 = rateLimiter.runBlocking(operation)
forkUserDiscard:
// Wait for next window for 3rd operation to start, take number of operations running
Thread.sleep(1500L)
resultOperations = operationsRunning.get()

result1 shouldBe Some(0)
result2 shouldBe Some(0)
result3 shouldBe 0
resultOperations shouldBe 3
}

it should "not allow to run more long running operations concurrently than max rate when considering operation time" in {
supervised:
val rateLimiter = RateLimiter.fixedWindow(2, FiniteDuration(1, "second"), considerOperationTime = true)

val operationsRunning = AtomicLong(0L)

def operation =
operationsRunning.updateAndGet(_ + 1)
Thread.sleep(3000L)
operationsRunning.updateAndGet(_ - 1)
0

var result1: Option[Int] = Some(-1)
var result2: Option[Int] = Some(-1)
var result3: Int = -1
var resultOperations: Long = 0L

// operations with runOrDrop should be dropped while operations with runBlocking should wait
supervised:
forkUserDiscard:
result1 = rateLimiter.runOrDrop(operation)
forkUserDiscard:
result2 = rateLimiter.runOrDrop(operation)
forkUserDiscard:
result3 = rateLimiter.runBlocking(operation)
forkUserDiscard:
// Wait for next window for 3rd operation to start, take number of operations running
Thread.sleep(1500L)
resultOperations = operationsRunning.get()

result1 shouldBe Some(0)
result2 shouldBe Some(0)
result3 shouldBe 0
resultOperations shouldBe 2
}

end RateLimiterTest
11 changes: 6 additions & 5 deletions doc/utils/rate-limiter.md
@@ -1,6 +1,6 @@
# Rate limiter

The rate limiter mechanism allows controlling the rate at which operations are executed. It ensures that at most a certain number of operations are run concurrently within a specified time frame, preventing system overload and ensuring fair resource usage. Note that the implemented limiting mechanism only takes into account the start of execution and not the whole execution of an operation.
The rate limiter mechanism allows controlling the rate at which operations are executed. It ensures that at most a certain number of operations are run concurrently within a specified time frame, preventing system overload and ensuring fair resource usage. Note that you can choose whether the mechanism takes into account only the start of execution, or the whole execution of an operation.

## API

Expand Down Expand Up @@ -33,7 +33,7 @@ The `operation` can be provided directly using a by-name parameter, i.e. `f: =>

## Configuration

The configuration of a `RateLimiter` depends on an underlying algorithm that controls whether an operation can be executed or not. The following algorithms are available:
The configuration of a `RateLimiter` depends on a flag deciding whether the whole execution time should be considered, and on an underlying algorithm that controls whether an operation can be executed or not. The following algorithms are available:
- `RateLimiterAlgorithm.FixedWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in fixed windows of `dur` duration.
- `RateLimiterAlgorithm.SlidingWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in any window of time of duration `dur`.
- `RateLimiterAlgorithm.Bucket(maximum: Int, dur: FiniteDuration)` - where `maximum` is the maximum capacity of tokens available in the token bucket algorithm and one token is added each `dur`. It can represent both the leaky bucket algorithm or the token bucket algorithm.
@@ -42,10 +42,11 @@ The configuration of a `RateLimiter` depends on an underlying algorithm that con

You can use one of the following shorthands to define a Rate Limiter with the corresponding algorithm:

- `RateLimiter.fixedWindow(rate: Int, dur: FiniteDuration)`,
- `RateLimiter.slidingWindow(rate: Int, dur: FiniteDuration)`,
- `RateLimiter.leakyBucket(maximum: Int, dur: FiniteDuration)`,
- `RateLimiter.fixedWindow(rate: Int, dur: FiniteDuration, considerOperationTime: Boolean = false)`,
- `RateLimiter.slidingWindow(rate: Int, dur: FiniteDuration, considerOperationTime: Boolean = false)`,
- `RateLimiter.leakyBucket(maximum: Int, dur: FiniteDuration, considerOperationTime: Boolean = false)`,

These shorthands also allow defining whether the whole execution time of an operation should be considered.
See the tests in `ox.resilience.*` for more.
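For example (an illustrative snippet with assumed parameter values; the shorthand signatures are as listed above):

```scala
import ox.*
import ox.resilience.RateLimiter
import scala.concurrent.duration.*

@main def rateLimiterConfigExample(): Unit =
  supervised:
    // only the start of each operation counts against the limit (the default)
    val perSecond = RateLimiter.fixedWindow(100, 1.second)

    // the whole execution time counts: at most 10 operations may be in flight
    val inFlight = RateLimiter.leakyBucket(10, 100.millis, considerOperationTime = true)

    perSecond.runBlocking(println("runs once the current window has capacity"))
    inFlight.runOrDrop(println("dropped if the bucket has no token or 10 operations are already running"))
```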

## Custom rate limiter algorithms