Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule based on cron expressions #263

Merged
merged 5 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ compileDocumentation := {
lazy val rootProject = (project in file("."))
.settings(commonSettings)
.settings(publishArtifact := false, name := "ox")
.aggregate(core, examples, kafka, mdcLogback, flowReactiveStreams)
.aggregate(core, examples, kafka, mdcLogback, flowReactiveStreams, cron)

lazy val core: Project = (project in file("core"))
.settings(commonSettings)
Expand Down Expand Up @@ -94,6 +94,17 @@ lazy val flowReactiveStreams: Project = (project in file("flow-reactive-streams"
)
.dependsOn(core)

lazy val cron: Project = (project in file("cron"))
.settings(commonSettings)
.settings(
name := "cron",
libraryDependencies ++= Seq(
"com.github.alonsodomin.cron4s" %% "cron4s-core" % "0.7.0",
scalaTest
)
)
.dependsOn(core % "test->test;compile->compile")

lazy val documentation: Project = (project in file("generated-doc")) // important: it must not be doc/
.enablePlugins(MdocPlugin)
.settings(commonSettings)
Expand All @@ -113,5 +124,6 @@ lazy val documentation: Project = (project in file("generated-doc")) // importan
core,
kafka,
mdcLogback,
flowReactiveStreams
flowReactiveStreams,
cron
)
7 changes: 7 additions & 0 deletions core/src/main/scala/ox/scheduling/Schedule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ object Schedule:

private[scheduling] sealed trait Infinite extends Schedule

private[scheduling] final case class ExternalInfinite(
Kamil-Lontkowski marked this conversation as resolved.
Show resolved Hide resolved
computeNextDuration: (Int, Option[FiniteDuration]) => FiniteDuration
) extends Infinite:
def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration = computeNextDuration(invocation, lastDuration)

override def initialDelay: FiniteDuration = computeNextDuration(0, None)

/** A schedule that represents an initial delay applied before the first invocation of operation being scheduled. Usually used in
* combination with other schedules using [[andThen]]
*
Expand Down
40 changes: 40 additions & 0 deletions cron/src/main/scala/ox/scheduling/cron/CronSchedule.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ox.scheduling.cron

import cron4s.lib.javatime.*
import cron4s.{Cron, CronExpr, toDateTimeCronOps}
import ox.scheduling.Schedule

import java.time.LocalDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{Duration, FiniteDuration}

/** Methods in this object provide
Kamil-Lontkowski marked this conversation as resolved.
Show resolved Hide resolved
*/
object CronSchedule:
/** @param expression
* cron expression to parse
* @return
* [[CronSchedule]] from cron expression
* @throws cron4s.Error
* in case of invalid expression
*/
def unsafeFromString(expression: String): Schedule =
fromCronExpr(Cron.unsafeParse(expression))

/** @param cron
* expression to base [[Schedule]] on.
* @return
* [[Schedule]] from cron expression
*/
def fromCronExpr(cron: CronExpr): Schedule =
def computeNext(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration =
val now = LocalDateTime.now()
val next = cron.next(now)
val duration = next.map(n => ChronoUnit.MILLIS.between(now, n))
duration.map(FiniteDuration.apply(_, TimeUnit.MILLISECONDS)).getOrElse(Duration.Zero)
end computeNext

Schedule.ExternalInfinite(computeNext)
end fromCronExpr
end CronSchedule
49 changes: 49 additions & 0 deletions cron/src/test/scala/ox/scheduling/cron/CronScheduleTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ox.scheduling.cron

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import cron4s.*
import ox.scheduling.{RepeatConfig, repeat}
import scala.concurrent.duration.*
import ox.util.ElapsedTime

class CronScheduleTest extends AnyFlatSpec with Matchers with ElapsedTime:
behavior of "repeat with cron schedule"

it should "repeat a function every second" in {
// given
val cronExpr = Cron.unsafeParse("* * * ? * *") // every second
val cronSchedule = CronSchedule.fromCronExpr(cronExpr)

var counter = 0

def f =
if counter > 0 then throw new RuntimeException("boom")
else counter += 1

// when
val (ex, elapsedTime) = measure(the[RuntimeException] thrownBy repeat(RepeatConfig(cronSchedule))(f))

// then
ex.getMessage shouldBe "boom"
counter shouldBe 1
elapsedTime.toMillis should be < 2200L // Run 2 times, so at most 2 secs - 200ms for tolerance
}

it should "provide initial delay" in {
// give
val cronExpr = Cron.unsafeParse("* * * ? * *") // every second
val cronSchedule = CronSchedule.fromCronExpr(cronExpr)

def f =
throw new RuntimeException("boom")

// when
val (ex, elapsedTime) = measure(the[RuntimeException] thrownBy repeat(RepeatConfig(cronSchedule))(f))

// then
ex.getMessage shouldBe "boom"
elapsedTime.toMillis should be < 1200L // Run 1 times, so at most 1 sec - 200ms for tolerance
elapsedTime.toMillis should be > 0L
}
end CronScheduleTest
71 changes: 71 additions & 0 deletions doc/integrations/cron4s.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Cron scheduler

Dependency:

```scala
"com.softwaremill.ox" %% "cron" % "@VERSION@"
```

This module allows to run schedules based on cron expressions from [cron4s](https://github.com/alonsodomin/cron4s).

`CronSchedule` can be used in all places that requires `Schedule` especially in repeat scenarios.

For defining `CronExpr` see [cron4s documentation](https://www.alonsodomin.me/cron4s/userguide/index.html).

## Api

The cron module exposes methods for creating `Schedule` based on `CronExpr`. That `Schedule` can be plugged

```scala
import ox.scheduling.cron.*
import cron4s.*

repeat(RepeatConfig(CronSchedule.unsafeFromString("10-35 2,4,6 * ? * *")))(operation)
```

The API uses `Schedule` underneath, so it can be plugged wherever `Schedule` is needed.

## Operation definition

Methods from `ox.scheduling.cron.CronSchedule` define `Schedule`, so they can be plugged into `RepeatConfig` and used with `repeat` API.


## Configuration

All configuration beyond `CronExpr` is provided by the `repeat` API. If an error handling within the operation
is needed, you can use a `retry` inside it (see an example below) or use `scheduled` with `CronSchedule` instead of `repeat`, which allows
full customization.


## Examples

```scala mdoc:compile-only
import ox.UnionMode
import ox.scheduling.cron.CronSchedule
import scala.concurrent.duration.*
import ox.resilience.{RetryConfig, retry}
import ox.scheduling.*
import cron4s.*

def directOperation: Int = ???
def eitherOperation: Either[String, Int] = ???
def unionOperation: String | Int = ???

val cronExpr: CronExpr = Cron.unsafeParse("10-35 2,4,6 * ? * *")

// various operation definitions - same syntax
repeat(RepeatConfig(CronSchedule.fromCronExpr(cronExpr)))(directOperation)
repeatEither(RepeatConfig(CronSchedule.fromCronExpr(cronExpr)))(eitherOperation)

// infinite repeats with a custom strategy
def customStopStrategy: Int => Boolean = ???
repeat(RepeatConfig(CronSchedule.fromCronExpr(cronExpr), customStopStrategy))(directOperation)

// custom error mode
repeatWithErrorMode(UnionMode[String])(RepeatConfig(CronSchedule.fromCronExpr(cronExpr)))(unionOperation)

// repeat with retry inside
repeat(RepeatConfig(CronSchedule.fromCronExpr(cronExpr))) {
retry(RetryConfig.backoff(3, 100.millis))(directOperation)
}
```
2 changes: 1 addition & 1 deletion doc/utils/retries.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Instance with default configuration can be obtained with `AdaptiveRetry.default`

`retry` will attempt to retry an operation if it throws an exception; `retryEither` will additionally retry, if the result is a `Left`. Finally `retryWithErrorMode` is the most flexible, and allows retrying operations using custom failure modes (such as union types).

The methods have an additional parameter, `shouldPayPenaltyCost`, which determines if result `T` should be considered failure in terms of paying cost for retry. Penalty is paid only if it is decided to retry operation, the penalty will not be paid for successful operation.
The methods have an additional parameter, `shouldPayPenaltyCost`, which determines if result `Either[E, T]` should be considered as a failure in terms of paying cost for retry. Penalty is paid only if it is decided to retry operation, the penalty will not be paid for successful operation.

### Examples

Expand Down