Skip to content

Commit

Permalink
Add resilience contracts
Browse files Browse the repository at this point in the history
  • Loading branch information
kyay10 committed Nov 15, 2024
1 parent 9ad7dd3 commit beca638
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
@file:JvmName("ComposeCopyKt")
@file:OptIn(ExperimentalContracts::class)

package arrow.optics

import androidx.compose.runtime.MutableState
import androidx.compose.runtime.snapshots.Snapshot
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.jvm.JvmName

/**
* Modifies the value in this [MutableState]
* by applying the function [block] to the current value.
*/
@Suppress("WRONG_INVOCATION_KIND") // withMutableSnapshot doesn't have a contract
public inline fun <T> MutableState<T>.update(crossinline block: (T) -> T) {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
Snapshot.withMutableSnapshot {
value = block(value)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
@file:OptIn(ExperimentalContracts::class)

package arrow.resilience

import arrow.atomic.Atomic
import arrow.core.Either
import arrow.core.identity
import arrow.core.left
import arrow.core.nonFatalOrThrow
import arrow.core.right
import arrow.resilience.CircuitBreaker.State.Closed
import arrow.resilience.CircuitBreaker.State.HalfOpen
import arrow.resilience.CircuitBreaker.State.Open
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.withContext
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.time.Duration
import kotlin.time.Duration.Companion.nanoseconds
import kotlin.time.TimeMark
Expand Down Expand Up @@ -158,20 +166,34 @@ private constructor(
* Returns a new task that upon execution will execute the given task, but with the protection of this circuit breaker.
* If an exception in [fa] occurs, other than an [ExecutionRejected] exception, it will be rethrown.
*/
public suspend fun <A> protectEither(fa: suspend () -> A): Either<ExecutionRejected, A> =
try {
public suspend fun <A> protectEither(fa: suspend () -> A): Either<ExecutionRejected, A> {
contract {
callsInPlace(fa, InvocationKind.AT_MOST_ONCE)
}
return try {
Either.Right(protectOrThrow(fa))
} catch (e: ExecutionRejected) {
Either.Left(e)
}
}

/**
* Returns a new task that upon execution will execute the given task, but with the protection of this circuit breaker.
* If an exception in [fa] occurs it will be rethrown
*/
public tailrec suspend fun <A> protectOrThrow(fa: suspend () -> A): A =
when (val curr = state.get()) {
is Closed -> markOrResetFailures(Either.catch { fa() })
public tailrec suspend fun <A> protectOrThrow(fa: suspend () -> A): A {
contract {
callsInPlace(fa, InvocationKind.EXACTLY_ONCE)
}
return when (val curr = state.get()) {
is Closed -> {
// This is markOrResetFailures(Either.catch { fa() }), but inlined to make the compiler happy with the contract
try {
markOrResetFailures(fa().right())
} catch (e: Throwable) {
markOrResetFailures(e.nonFatalOrThrow().left())
}
}
is Open -> {
if (curr.expiresAt.hasPassedNow()) {
// The Open state has expired, so we are transition to HalfOpen and attempt to close the CircuitBreaker
Expand All @@ -194,6 +216,7 @@ private constructor(
throw ExecutionRejected("Rejected because the CircuitBreaker is in the HalfOpen state", curr)
}
}
}

/** Function for counting failures in the `Closed` state, triggering the `Open` state if necessary.*/
private tailrec suspend fun <A> markOrResetFailures(result: Either<Throwable, A>): A =
Expand Down Expand Up @@ -245,24 +268,29 @@ private constructor(
resetTimeout: Duration,
awaitClose: CompletableDeferred<Unit>,
lastStartedAt: TimeMark
): A = try {
onHalfOpen.invoke()
task.invoke()
} catch (e: CancellationException) {
// We need to return to Open state, otherwise we get stuck in Half-Open (see https://github.com/monix/monix/issues/1080 )
state.set(Open(state.get().openingStrategy ,lastStartedAt, resetTimeout, awaitClose))
onOpenAndThrow(e)
} catch (e: Throwable) {
// Failed reset, which means we go back in the Open state with new expiry val nextTimeout
val value: Duration = (resetTimeout * exponentialBackoffFactor)
val nextTimeout = if (maxResetTimeout.isFinite() && value > maxResetTimeout) maxResetTimeout else value
state.set(Open(state.get().openingStrategy, timeSource.markNow(), nextTimeout, awaitClose))
onOpenAndThrow(e)
}.also {
// While in HalfOpen only a reset attempt is allowed to update the state, so setting this directly is safe
state.set(Closed(state.get().openingStrategy.resetFailuresCount()))
awaitClose.complete(Unit)
onClosed.invoke()
): A {
contract {
callsInPlace(task, InvocationKind.EXACTLY_ONCE)
}
return try {
onHalfOpen.invoke()
task.invoke()
} catch (e: CancellationException) {
// We need to return to Open state, otherwise we get stuck in Half-Open (see https://github.com/monix/monix/issues/1080 )
state.set(Open(state.get().openingStrategy, lastStartedAt, resetTimeout, awaitClose))
onOpenAndThrow(e)
} catch (e: Throwable) {
// Failed reset, which means we go back in the Open state with new expiry val nextTimeout
val value: Duration = (resetTimeout * exponentialBackoffFactor)
val nextTimeout = if (maxResetTimeout.isFinite() && value > maxResetTimeout) maxResetTimeout else value
state.set(Open(state.get().openingStrategy, timeSource.markNow(), nextTimeout, awaitClose))
onOpenAndThrow(e)
}.also {
// While in HalfOpen only a reset attempt is allowed to update the state, so setting this directly is safe
state.set(Closed(state.get().openingStrategy.resetFailuresCount()))
awaitClose.complete(Unit)
onClosed.invoke()
}
}

private suspend fun onOpenAndThrow(original: Throwable): Nothing {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@file:OptIn(ExperimentalTypeInference::class)
@file:OptIn(ExperimentalTypeInference::class, ExperimentalContracts::class)

package arrow.resilience

Expand All @@ -11,7 +11,7 @@ import arrow.core.merge
import arrow.core.nonFatalOrThrow
import arrow.core.raise.Raise
import arrow.core.raise.either
import arrow.core.raise.fold
import arrow.core.raise.recover
import arrow.core.right
import arrow.core.some
import arrow.resilience.Schedule.Companion.identity
Expand All @@ -21,6 +21,9 @@ import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.retry
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.experimental.ExperimentalTypeInference
import kotlin.math.pow
import kotlin.random.Random
Expand Down Expand Up @@ -413,34 +416,45 @@ public fun interface Schedule<in Input, out Output> {
* It will throw the last exception if the [Schedule] is exhausted, and ignores the output of the [Schedule].
*/
public suspend inline fun <reified E: Throwable, A> Schedule<E, *>.retry(
noinline action: suspend () -> A
): A = retry(E::class, action)
action: suspend () -> A
): A {
contract {
callsInPlace(action, InvocationKind.AT_LEAST_ONCE)
}
return retry(E::class, action)
}

/**
* Retries [action] using any [E] that occurred as the input to the [Schedule].
* It will throw the last exception if the [Schedule] is exhausted, and ignores the output of the [Schedule].
*/
public suspend fun <E: Throwable, A> Schedule<E, *>.retry(
@Suppress("LEAKED_IN_PLACE_LAMBDA", "WRONG_INVOCATION_KIND")
public suspend inline fun <E: Throwable, A> Schedule<E, *>.retry(
exceptionClass: KClass<E>,
action: suspend () -> A
): A = retryOrElse(exceptionClass, action) { e, _ -> throw e }
): A {
contract {
callsInPlace(action, InvocationKind.AT_LEAST_ONCE) // because if the schedule is exhausted, it will throw
}
return retryOrElse(exceptionClass, action) { e, _ -> throw e }
}

/**
* Retries [action] using any [E] that occurred as the input to the [Schedule].
* If the [Schedule] is exhausted,
* it will invoke [orElse] with the last exception and the output of the [Schedule] to produce a fallback [Input] value.
*/
public suspend inline fun <reified E: Throwable, Input, Output> Schedule<E, Output>.retryOrElse(
noinline action: suspend () -> Input,
noinline orElse: suspend (Throwable, Output) -> Input
action: suspend () -> Input,
orElse: suspend (Throwable, Output) -> Input
): Input = retryOrElse(E::class, action, orElse)

/**
* Retries [action] using any [E] that occurred as the input to the [Schedule].
* If the [Schedule] is exhausted,
* it will invoke [orElse] with the last exception and the output of the [Schedule] to produce a fallback [Input] value.
*/
public suspend fun <E: Throwable, Input, Output> Schedule<E, Output>.retryOrElse(
public suspend inline fun <E: Throwable, Input, Output> Schedule<E, Output>.retryOrElse(
exceptionClass: KClass<E>,
action: suspend () -> Input,
orElse: suspend (E, Output) -> Input
Expand All @@ -453,8 +467,8 @@ public suspend fun <E: Throwable, Input, Output> Schedule<E, Output>.retryOrElse
* Returns [Either] with the fallback value if the [Schedule] is exhausted, or the successful result of [action].
*/
public suspend inline fun <reified E: Throwable, Input, Output, A> Schedule<E, Output>.retryOrElseEither(
noinline action: suspend () -> Input,
noinline orElse: suspend (E, Output) -> A
action: suspend () -> Input,
orElse: suspend (E, Output) -> A
): Either<A, Input> = retryOrElseEither(E::class, action, orElse)

/**
Expand All @@ -463,7 +477,7 @@ public suspend inline fun <reified E: Throwable, Input, Output, A> Schedule<E, O
* it will invoke [orElse] with the last exception and the output of the [Schedule] to produce a fallback value of [A].
* Returns [Either] with the fallback value if the [Schedule] is exhausted, or the successful result of [action].
*/
public suspend fun <E: Throwable, Input, Output, A> Schedule<E, Output>.retryOrElseEither(
public suspend inline fun <E: Throwable, Input, Output, A> Schedule<E, Output>.retryOrElseEither(
exceptionClass: KClass<E>,
action: suspend () -> Input,
orElse: suspend (E, Output) -> A
Expand Down Expand Up @@ -522,25 +536,22 @@ public suspend inline fun <Error, Result, Output> Raise<Error>.retry(
schedule: Schedule<Error, Output>,
@BuilderInference action: Raise<Error>.() -> Result,
): Result {
contract {
callsInPlace(action, InvocationKind.AT_LEAST_ONCE)
}
var step = schedule.step

while (true) {
currentCoroutineContext().ensureActive()
fold(
action,
recover = { error ->
when (val decision = step(error)) {
is Continue -> {
if (decision.delay != ZERO) delay(decision.delay)
step = decision.step
}

is Done -> raise(error)
recover({ return action(this) }) { error ->
when (val decision = step(error)) {
is Continue -> {
if (decision.delay != ZERO) delay(decision.delay)
step = decision.step
}
},
transform = { result ->
return result
},
)

is Done -> raise(error)
}
}
}
}

0 comments on commit beca638

Please sign in to comment.