diff --git a/build.sbt b/build.sbt index e00b5ba9..88c43bf2 100644 --- a/build.sbt +++ b/build.sbt @@ -29,7 +29,7 @@ compileDocumentation := { lazy val rootProject = (project in file(".")) .settings(commonSettings) .settings(publishArtifact := false, name := "ox") - .aggregate(core, examples, kafka, mdcLogback, flowReactiveStreams) + .aggregate(core, examples, kafka, mdcLogback, flowReactiveStreams, cron) lazy val core: Project = (project in file("core")) .settings(commonSettings) @@ -94,6 +94,17 @@ lazy val flowReactiveStreams: Project = (project in file("flow-reactive-streams" ) .dependsOn(core) +lazy val cron: Project = (project in file("cron")) + .settings(commonSettings) + .settings( + name := "cron", + libraryDependencies ++= Seq( + "com.github.alonsodomin.cron4s" %% "cron4s-core" % "0.7.0", + scalaTest + ) + ) + .dependsOn(core % "test->test;compile->compile") + lazy val documentation: Project = (project in file("generated-doc")) // important: it must not be doc/ .enablePlugins(MdocPlugin) .settings(commonSettings) @@ -113,5 +124,6 @@ lazy val documentation: Project = (project in file("generated-doc")) // importan core, kafka, mdcLogback, - flowReactiveStreams + flowReactiveStreams, + cron ) diff --git a/core/src/main/scala/ox/scheduling/Schedule.scala b/core/src/main/scala/ox/scheduling/Schedule.scala index e4e7d1fc..624d8917 100644 --- a/core/src/main/scala/ox/scheduling/Schedule.scala +++ b/core/src/main/scala/ox/scheduling/Schedule.scala @@ -16,6 +16,17 @@ object Schedule: private[scheduling] sealed trait Infinite extends Schedule + /** @param computeNextDuration + * computes time between next invocations of operation. Invocation = 0 represents initialDelay before invoking operation for the first + * time. + */ + private[scheduling] final case class ComputedInfinite( + computeNextDuration: (Int, Option[FiniteDuration]) => FiniteDuration + ) extends Infinite: + def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration = computeNextDuration(invocation, lastDuration) + + override def initialDelay: FiniteDuration = computeNextDuration(0, None) + /** A schedule that represents an initial delay applied before the first invocation of operation being scheduled. Usually used in * combination with other schedules using [[andThen]] * diff --git a/cron/src/main/scala/ox/scheduling/cron/CronSchedule.scala b/cron/src/main/scala/ox/scheduling/cron/CronSchedule.scala new file mode 100644 index 00000000..bdfde97b --- /dev/null +++ b/cron/src/main/scala/ox/scheduling/cron/CronSchedule.scala @@ -0,0 +1,40 @@ +package ox.scheduling.cron + +import cron4s.lib.javatime.* +import cron4s.{Cron, CronExpr, toDateTimeCronOps} +import ox.scheduling.Schedule + +import java.time.LocalDateTime +import java.time.temporal.ChronoUnit +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.{Duration, FiniteDuration} + +/** Methods in this object provide [[Schedule]] based on supplied cron expression. + */ +object CronSchedule: + /** @param expression + * cron expression to parse + * @return + * [[CronSchedule]] from cron expression + * @throws cron4s.Error + * in case of invalid expression + */ + def unsafeFromString(expression: String): Schedule = + fromCronExpr(Cron.unsafeParse(expression)) + + /** @param cron + * [[CronExpr]] to base [[Schedule]] on. + * @return + * [[Schedule]] from cron expression + */ + def fromCronExpr(cron: CronExpr): Schedule = + def computeNext(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration = + val now = LocalDateTime.now() + val next = cron.next(now) + val duration = next.map(n => ChronoUnit.MILLIS.between(now, n)) + duration.map(FiniteDuration.apply(_, TimeUnit.MILLISECONDS)).getOrElse(Duration.Zero) + end computeNext + + Schedule.ComputedInfinite(computeNext) + end fromCronExpr +end CronSchedule diff --git a/cron/src/test/scala/ox/scheduling/cron/CronScheduleTest.scala b/cron/src/test/scala/ox/scheduling/cron/CronScheduleTest.scala new file mode 100644 index 00000000..38451fe4 --- /dev/null +++ b/cron/src/test/scala/ox/scheduling/cron/CronScheduleTest.scala @@ -0,0 +1,49 @@ +package ox.scheduling.cron + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import cron4s.* +import ox.scheduling.{RepeatConfig, repeat} +import scala.concurrent.duration.* +import ox.util.ElapsedTime + +class CronScheduleTest extends AnyFlatSpec with Matchers with ElapsedTime: + behavior of "repeat with cron schedule" + + it should "repeat a function every second" in { + // given + val cronExpr = Cron.unsafeParse("* * * ? * *") // every second + val cronSchedule = CronSchedule.fromCronExpr(cronExpr) + + var counter = 0 + + def f = + if counter > 0 then throw new RuntimeException("boom") + else counter += 1 + + // when + val (ex, elapsedTime) = measure(the[RuntimeException] thrownBy repeat(RepeatConfig(cronSchedule))(f)) + + // then + ex.getMessage shouldBe "boom" + counter shouldBe 1 + elapsedTime.toMillis should be < 2200L // Run 2 times, so at most 2 secs - 200ms for tolerance + } + + it should "provide initial delay" in { + // give + val cronExpr = Cron.unsafeParse("* * * ? * *") // every second + val cronSchedule = CronSchedule.fromCronExpr(cronExpr) + + def f = + throw new RuntimeException("boom") + + // when + val (ex, elapsedTime) = measure(the[RuntimeException] thrownBy repeat(RepeatConfig(cronSchedule))(f)) + + // then + ex.getMessage shouldBe "boom" + elapsedTime.toMillis should be < 1200L // Run 1 time, so at most 1 sec - 200ms for tolerance + elapsedTime.toMillis should be > 0L + } +end CronScheduleTest diff --git a/doc/integrations/cron4s.md b/doc/integrations/cron4s.md new file mode 100644 index 00000000..c7c6a80c --- /dev/null +++ b/doc/integrations/cron4s.md @@ -0,0 +1,69 @@ +# Cron scheduler + +Dependency: + +```scala +"com.softwaremill.ox" %% "cron" % "@VERSION@" +``` + +This module allows to run schedules based on cron expressions from [cron4s](https://github.com/alonsodomin/cron4s). + +`CronSchedule` can be used in all places that requires `Schedule` especially in repeat scenarios. + +For defining `CronExpr` see [cron4s documentation](https://www.alonsodomin.me/cron4s/userguide/index.html). + +## Api + +The cron module exposes methods for creating `Schedule` based on `CronExpr`. + +```scala +import ox.scheduling.cron.* +import cron4s.* + +repeat(RepeatConfig(CronSchedule.unsafeFromString("10-35 2,4,6 * ? * *")))(operation) +``` + +## Operation definition + +Methods from `ox.scheduling.cron.CronSchedule` define `Schedule`, so they can be plugged into `RepeatConfig` and used with `repeat` API. + + +## Configuration + +All configuration beyond `CronExpr` is provided by the `repeat` API. If an error handling within the operation +is needed, you can use a `retry` inside it (see an example below) or use `scheduled` with `CronSchedule` instead of `repeat`, which allows +full customization. + + +## Examples + +```scala mdoc:compile-only +import ox.UnionMode +import ox.scheduling.cron.CronSchedule +import scala.concurrent.duration.* +import ox.resilience.{RetryConfig, retry} +import ox.scheduling.* +import cron4s.* + +def directOperation: Int = ??? +def eitherOperation: Either[String, Int] = ??? +def unionOperation: String | Int = ??? + +val cronExpr: CronExpr = Cron.unsafeParse("10-35 2,4,6 * ? * *") + +// various operation definitions - same syntax +repeat(RepeatConfig(CronSchedule.fromCronExpr(cronExpr)))(directOperation) +repeatEither(RepeatConfig(CronSchedule.fromCronExpr(cronExpr)))(eitherOperation) + +// infinite repeats with a custom strategy +def customStopStrategy: Int => Boolean = ??? +repeat(RepeatConfig(CronSchedule.fromCronExpr(cronExpr), customStopStrategy))(directOperation) + +// custom error mode +repeatWithErrorMode(UnionMode[String])(RepeatConfig(CronSchedule.fromCronExpr(cronExpr)))(unionOperation) + +// repeat with retry inside +repeat(RepeatConfig(CronSchedule.fromCronExpr(cronExpr))) { + retry(RetryConfig.backoff(3, 100.millis))(directOperation) +} +``` diff --git a/doc/utils/retries.md b/doc/utils/retries.md index 622d99b1..6108c0bb 100644 --- a/doc/utils/retries.md +++ b/doc/utils/retries.md @@ -160,7 +160,7 @@ Instance with default configuration can be obtained with `AdaptiveRetry.default` `retry` will attempt to retry an operation if it throws an exception; `retryEither` will additionally retry, if the result is a `Left`. Finally `retryWithErrorMode` is the most flexible, and allows retrying operations using custom failure modes (such as union types). -The methods have an additional parameter, `shouldPayPenaltyCost`, which determines if result `T` should be considered failure in terms of paying cost for retry. Penalty is paid only if it is decided to retry operation, the penalty will not be paid for successful operation. +The methods have an additional parameter, `shouldPayPenaltyCost`, which determines if result `Either[E, T]` should be considered as a failure in terms of paying cost for retry. Penalty is paid only if it is decided to retry operation, the penalty will not be paid for successful operation. ### Examples