Merge pull request #590 from 47degrees/fix-traverse
Replace implicit traverse behavior with explicit batching
sloshy authored Jan 28, 2022
2 parents f23548b + 5e98932 commit e52274b
Showing 14 changed files with 301 additions and 173 deletions.
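In practice, the migration is mechanical at most call sites: code that previously leaned on `traverse` or `sequence` over lists of `Fetch` values for implicit batching now builds the fetches with `map` and combines them explicitly with `batchAll`. A minimal before/after sketch follows; the `ToString` data source is a hypothetical stand-in along the lines of the README example, not code from this diff.

```scala
import cats.data.NonEmptyList
import cats.effect._
import cats.syntax.all._
import fetch._
import fetch.syntax._

// Hypothetical data source that turns Ints into Strings and supports batched requests.
object ToString extends Data[Int, String] {
  def name = "To String"

  def source[F[_]: Async]: DataSource[F, Int, String] =
    new DataSource[F, Int, String] {
      override def data = ToString

      override def CF = Concurrent[F]

      override def fetch(id: Int): F[Option[String]] =
        CF.pure(Option(id.toString))

      override def batch(ids: NonEmptyList[Int]): F[Map[Int, String]] =
        CF.pure(ids.toList.map(i => i -> i.toString).toMap)
    }
}

def fetchString[F[_]: Async](n: Int): Fetch[F, String] =
  Fetch(n, ToString.source)

// Before (Fetch 2.x): traverse batched the three requests implicitly.
def before[F[_]: Async]: Fetch[F, List[String]] =
  List(1, 2, 3).traverse(fetchString[F])

// After (Fetch 3.x): batching is requested explicitly.
def after[F[_]: Async]: Fetch[F, List[String]] =
  List(1, 2, 3).map(fetchString[F]).batchAll
```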
25 changes: 10 additions & 15 deletions README.md
@@ -21,16 +21,16 @@ A library for Simple & Efficient data access in Scala and Scala.js

Add the following dependency to your project's build file.

For Scala 2.11.x and 2.12.x:
For Scala 2.12.x through 3.x:

```scala
"com.47deg" %% "fetch" % "2.1.1"
"com.47deg" %% "fetch" % "3.0.0"
```

Or, if using Scala.js (0.6.x):
Or, if using Scala.js (1.8.x):

```scala
"com.47deg" %%% "fetch" % "2.1.1"
"com.47deg" %%% "fetch" % "3.0.0"
```


@@ -105,20 +105,15 @@ def fetchString[F[_] : Async](n: Int): Fetch[F, String] =

## Creating a runtime

Since `Fetch` relies on `Concurrent` from the `cats-effect` library, we'll need a runtime for executing our effects. We'll be using `IO` from `cats-effect` to run fetches, but you can use any type that has a `Concurrent` instance.
Since we'll use `IO` from the `cats-effect` library to execute our fetches, we'll need an `IORuntime` for executing our `IO` instances.

For executing `IO`, we need a `ContextShift[IO]` used for running `IO` instances and a `Timer[IO]` that is used for scheduling. Let's go ahead and create them. We'll use a `java.util.concurrent.ScheduledThreadPoolExecutor` with a couple of threads to run our fetches.

```scala
import java.util.concurrent._
import scala.concurrent.ExecutionContext

val executor = new ScheduledThreadPoolExecutor(4)
val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)

import cats.effect.unsafe.implicits.global
```scala mdoc:silent
import cats.effect.unsafe.implicits.global //Gives us an IORuntime in places it is normally not provided
```

Normally, in your applications, this is provided by `IOApp`, and you should not need to import this except in limited scenarios such as test environments that do not have Cats Effect integration.
For more information, and particularly on why you would usually not want to make one of these yourself, [see this post by Daniel Spiewak](https://github.com/typelevel/cats-effect/discussions/1562#discussioncomment-254838).
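
To make the runtime's role concrete, here is a minimal sketch of running a fetch outside of `IOApp` (for example in a REPL or a test), assuming the `fetchString` constructor defined earlier in this README:

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // supplies the IORuntime used by unsafeRunSync
import fetch._

// Describing the fetch is pure; the IORuntime is only needed at the very end,
// when the resulting IO is actually executed.
val fetched: String = Fetch.run[IO](fetchString[IO](1)).unsafeRunSync()
```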

## Creating and running a fetch

Now that we can convert `Int` values to `Fetch[F, String]`, let's try creating a fetch.
23 changes: 7 additions & 16 deletions docs/README.md
@@ -13,13 +13,13 @@ A library for Simple & Efficient data access in Scala and Scala.js

Add the following dependency to your project's build file.

For Scala 2.11.x and 2.12.x:
For Scala 2.12.x through 3.x:

```scala
"com.47deg" %% "fetch" % "@VERSION@"
```

Or, if using Scala.js (0.6.x):
Or, if using Scala.js (1.8.x):

```scala
"com.47deg" %%% "fetch" % "@VERSION@"
@@ -106,20 +106,15 @@ def fetchString[F[_] : Async](n: Int): Fetch[F, String] =

## Creating a runtime

Since `Fetch` relies on `Concurrent` from the `cats-effect` library, we'll need a runtime for executing our effects. We'll be using `IO` from `cats-effect` to run fetches, but you can use any type that has a `Concurrent` instance.

For executing `IO`, we need a `ContextShift[IO]` used for running `IO` instances and a `Timer[IO]` that is used for scheduling. Let's go ahead and create them. We'll use a `java.util.concurrent.ScheduledThreadPoolExecutor` with a couple of threads to run our fetches.
Since we'll use `IO` from the `cats-effect` library to execute our fetches, we'll need an `IORuntime` for executing our `IO` instances.

```scala mdoc:silent
import java.util.concurrent._
import scala.concurrent.ExecutionContext

val executor = new ScheduledThreadPoolExecutor(4)
val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)

import cats.effect.unsafe.implicits.global
import cats.effect.unsafe.implicits.global //Gives us an IORuntime in places it is normally not provided
```

Normally, in your applications, this is provided by `IOApp`, and you should not need to import this except in limited scenarios such as test environments that do not have Cats Effect integration.
For more information, and particularly on why you would usually not want to make one of these yourself, [see this post by Daniel Spiewak](https://github.com/typelevel/cats-effect/discussions/1562#discussioncomment-254838).

## Creating and running a fetch

Now that we can convert `Int` values to `Fetch[F, String]`, let's try creating a fetch.
@@ -282,10 +277,6 @@ runFetchFourTimesSharedCache.unsafeRunTimed(5.seconds)

As you can see above, the cache will now work between calls and can be used to deduplicate requests over a period of time.
Note that this does not support any kind of automatic cache invalidation, so you will need to keep track of which values you want to re-fetch if you plan on sharing the cache.
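
As a rough sketch of that pattern, assuming `Fetch.runCache` (which returns the populated cache alongside the result) and a `fetchString` constructor as in the earlier examples:

```scala
import cats.effect.IO
import fetch._

// The first run populates the cache; the second run starts from that cache,
// so the data source is only hit once for id 1 across both runs.
val twoRunsSharedCache: IO[(String, String)] =
  Fetch.runCache[IO](fetchString[IO](1)).flatMap { case (cache, first) =>
    Fetch.run[IO](fetchString[IO](1), cache).map(second => (first, second))
  }
```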

```scala mdoc:invisible
executor.shutdownNow()
```
---

For more in-depth information, take a look at our [documentation](https://47degrees.github.io/fetch/docs.html).
3 changes: 2 additions & 1 deletion fetch-examples/src/test/scala/DoobieExample.scala
@@ -31,6 +31,7 @@ import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors

import fetch._
import fetch.syntax._

object DatabaseExample {
case class AuthorId(id: Int)
@@ -150,7 +151,7 @@ class DoobieExample extends AnyWordSpec with Matchers with BeforeAndAfterAll {

"We can fetch multiple authors from the DB in parallel" in {
def fetch[F[_]: Async]: Fetch[F, List[Author]] =
List(1, 2).traverse(Authors.fetchAuthor[F])
List(1, 2).map(Authors.fetchAuthor[F]).batchAll

val io: IO[(Log, List[Author])] = Fetch.runLog[IO](fetch)

3 changes: 2 additions & 1 deletion fetch-examples/src/test/scala/GithubExample.scala
@@ -17,6 +17,7 @@
import cats.effect._
import cats.syntax.all._
import fetch.{Data, DataSource, Fetch}
import fetch.syntax._
import io.circe._
import io.circe.generic.semiauto._
import org.http4s._
@@ -204,7 +205,7 @@ class GithubExample extends AnyWordSpec with Matchers {
def fetchOrg[F[_]: Async](org: String) =
for {
repos <- orgRepos(org)
projects <- repos.traverse(fetchProject[F])
projects <- repos.batchAllWith(fetchProject[F])
} yield projects

def fetchOrgStars[F[_]: Async](org: String): Fetch[F, Int] =
32 changes: 20 additions & 12 deletions fetch-examples/src/test/scala/GraphQLExample.scala
@@ -23,6 +23,7 @@ import atto._, Atto._
import cats.syntax.all._
import cats.data.NonEmptyList
import cats.effect._
import fetch.syntax._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

@@ -199,14 +200,15 @@ class GraphQLExample extends AnyWordSpec with Matchers {
case RepositoriesQuery(n, name, Some(_), Some(_)) =>
for {
repos <- Repos.fetch(org)
projects <-
repos
.take(n)
.traverse(repo =>
(Languages.fetch(repo), Collaborators.fetch(repo)).mapN { case (ls, cs) =>
Project(name >> Some(repo.name), ls, cs)
}
)
projects <- {
val nRepos = repos.take(n)
val fetches = nRepos.map { repo =>
(Languages.fetch(repo), Collaborators.fetch(repo)).mapN { case (ls, cs) =>
Project(name >> Some(repo.name), ls, cs)
}
}
fetches.batchAll
}
} yield projects

case RepositoriesQuery(n, name, None, None) =>
@@ -215,16 +217,22 @@
case RepositoriesQuery(n, name, Some(_), None) =>
for {
repos <- Repos.fetch(org)
projects <- repos.traverse { r =>
Languages.fetch(r).map(ls => Project(name >> Some(r.name), ls, List()))
projects <- {
val fetches = repos.map { r =>
Languages.fetch(r).map(ls => Project(name >> Some(r.name), ls, List()))
}
fetches.batchAll
}
} yield projects

case RepositoriesQuery(n, name, None, Some(_)) =>
for {
repos <- Repos.fetch(org)
projects <- repos.traverse { r =>
Collaborators.fetch(r).map(cs => Project(name >> Some(r.name), List(), cs))
projects <- {
val fetches = repos.map { r =>
Collaborators.fetch(r).map(cs => Project(name >> Some(r.name), List(), cs))
}
fetches.batchAll
}
} yield projects
}
5 changes: 3 additions & 2 deletions fetch-examples/src/test/scala/Http4sExample.scala
@@ -34,6 +34,7 @@ import org.scalatest.wordspec.AnyWordSpec
import java.util.concurrent._

import fetch._
import fetch.syntax._

object HttpExample {
case class UserId(id: Int)
@@ -114,7 +115,7 @@ object HttpExample {
fetchUserById(UserId(id))

def fetchManyUsers[F[_]: Async](ids: List[Int]): Fetch[F, List[User]] =
ids.traverse(i => fetchUserById(UserId(i)))
ids.map(i => fetchUserById(UserId(i))).batchAll

def fetchPosts[F[_]: Async](user: User): Fetch[F, (User, List[Post])] =
fetchPostsForUser(user.id).map(posts => (user, posts))
@@ -149,7 +150,7 @@ class Http4sExample extends AnyWordSpec with Matchers {
def fetch[F[_]: Async]: Fetch[F, List[(User, List[Post])]] =
for {
users <- fetchManyUsers(List(1, 2))
usersWithPosts <- users.traverse(fetchPosts[F])
usersWithPosts <- users.map(fetchPosts[F]).batchAll
} yield usersWithPosts

val io = Fetch.runLog[IO](fetch)
23 changes: 22 additions & 1 deletion fetch/src/main/scala/fetch.scala
@@ -303,7 +303,7 @@ object `package` {
}
} yield result)

override def product[A, B](fa: Fetch[F, A], fb: Fetch[F, B]): Fetch[F, (A, B)] =
override def product[A, B](fa: Fetch[F, A], fb: Fetch[F, B]): Fetch[F, (A, B)] = {
Unfetch[F, (A, B)](for {
fab <- (fa.run, fb.run).tupled
result = fab match {
@@ -321,6 +321,7 @@
Throw[F, (A, B)](e)
}
} yield result)
}

override def productR[A, B](fa: Fetch[F, A])(fb: Fetch[F, B]): Fetch[F, B] =
Unfetch[F, B](for {
@@ -360,6 +361,26 @@
def pure[F[_]: Applicative, A](a: A): Fetch[F, A] =
Unfetch(Applicative[F].pure(Done(a)))

/**
* Given a number of fetches, returns all of the results in a `List`. In the event that multiple
* fetches are made to the same data source, this will attempt to batch them together.
*
* This should be used in code that previously relied on the auto-batching behavior of calling
* `traverse` on lists of `Fetch` values.
*/
def batchAll[F[_]: Monad, A](fetches: Fetch[F, A]*): Fetch[F, List[A]] = {
fetches.toList.toNel
.map { nes =>
nes
.map(_.map(Chain.one(_)))
.reduceLeft { (fa, fb) =>
fetchM[F].map2(fa, fb)((a, b) => a ++ b)
}
.map(_.toList)
}
.getOrElse(Fetch.pure[F, List[A]](List.empty))
}

def exception[F[_]: Applicative, A](e: Log => FetchException): Fetch[F, A] =
Unfetch(Applicative[F].pure(Throw[F, A](e)))

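The practical difference shows up in the execution log: after this change, `traverse` produces one round per element, while `batchAll` collapses requests to the same data source into a single round (see the updated `FetchReportingTests` further down). A sketch, assuming the hypothetical `fetchString` constructor from the summary near the top of this page:

```scala
import cats.effect._
import cats.syntax.all._
import fetch._

// One round per element: traverse no longer batches implicitly.
def perElementRounds[F[_]: Async]: Fetch[F, List[String]] =
  List(1, 2, 3).traverse(fetchString[F])

// One batched round: the three requests hit the data source together.
def singleRound[F[_]: Async]: Fetch[F, List[String]] =
  Fetch.batchAll(List(1, 2, 3).map(fetchString[F]): _*)

// The Log returned by runLog exposes the rounds that were executed.
val roundCount: IO[Int] =
  Fetch.runLog[IO](singleRound[IO]).map { case (log, _) => log.rounds.size }
```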
11 changes: 11 additions & 0 deletions fetch/src/main/scala/syntax.scala
@@ -18,6 +18,7 @@ package fetch

import cats._
import cats.effect._
import fetch.Fetch

object syntax {

@@ -38,4 +39,14 @@
def fetch[F[_]: Concurrent]: Fetch[F, B] =
Fetch.error[F, B](a)
}

implicit class FetchSeqBatchSyntax[F[_]: Monad, A](fetches: Seq[Fetch[F, A]]) {

def batchAll: Fetch[F, List[A]] = Fetch.batchAll(fetches: _*)
}

implicit class SeqSyntax[A](val as: Seq[A]) extends AnyVal {

def batchAllWith[F[_]: Monad, B](f: A => Fetch[F, B]) = Fetch.batchAll(as.map(f): _*)
}
}
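Together with `Fetch.batchAll`, the two extension methods above give call sites a choice of shape. A short sketch, again assuming a hypothetical `fetchString` constructor as in the README example:

```scala
import cats.effect.IO
import fetch._
import fetch.syntax._

// Seq[Fetch[F, A]] => Fetch[F, List[A]]
val fromFetches: Fetch[IO, List[String]] =
  List(1, 2, 3).map(fetchString[IO]).batchAll

// Seq[A] plus A => Fetch[F, B] => Fetch[F, List[B]]
val fromIds: Fetch[IO, List[String]] =
  List(1, 2, 3).batchAllWith(fetchString[IO])
```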
19 changes: 10 additions & 9 deletions fetch/src/test/scala/FetchBatchingTests.scala
@@ -22,6 +22,7 @@ import cats.instances.list._
import cats.syntax.all._
import cats.effect._
import fetch._
import fetch.syntax._

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.{DurationInt, FiniteDuration}
@@ -152,7 +153,7 @@ class FetchBatchingTests extends FetchSpec {

"A large fetch to a datasource with a maximum batch size is split and executed in sequence" in {
def fetch[F[_]: Concurrent]: Fetch[F, List[Int]] =
List.range(1, 6).traverse(fetchBatchedDataSeq[F])
List.range(1, 6).map(fetchBatchedDataSeq[F]).batchAll

val io = Fetch.runLog[IO](fetch)

@@ -166,7 +167,7 @@

"A large fetch to a datasource with a maximum batch size is split and executed in parallel" in {
def fetch[F[_]: Concurrent]: Fetch[F, List[Int]] =
List.range(1, 6).traverse(fetchBatchedDataPar[F])
List.range(1, 6).map(fetchBatchedDataPar[F]).batchAll

val io = Fetch.runLog[IO](fetch)

@@ -180,8 +181,8 @@

"Fetches to datasources with a maximum batch size should be split and executed in parallel and sequentially when using productR" in {
def fetch[F[_]: Concurrent]: Fetch[F, List[Int]] =
List.range(1, 6).traverse(fetchBatchedDataPar[F]) *>
List.range(1, 6).traverse(fetchBatchedDataSeq[F])
List.range(1, 6).map(fetchBatchedDataPar[F]).batchAll *>
List.range(1, 6).map(fetchBatchedDataSeq[F]).batchAll

val io = Fetch.runLog[IO](fetch)

@@ -195,8 +196,8 @@

"Fetches to datasources with a maximum batch size should be split and executed in parallel and sequentially when using productL" in {
def fetch[F[_]: Concurrent]: Fetch[F, List[Int]] =
List.range(1, 6).traverse(fetchBatchedDataPar[F]) <*
List.range(1, 6).traverse(fetchBatchedDataSeq[F])
List.range(1, 6).map(fetchBatchedDataPar[F]).batchAll <*
List.range(1, 6).map(fetchBatchedDataSeq[F]).batchAll

val io = Fetch.runLog[IO](fetch)

@@ -210,7 +211,7 @@

"A large (many) fetch to a datasource with a maximum batch size is split and executed in sequence" in {
def fetch[F[_]: Concurrent]: Fetch[F, List[Int]] =
List(fetchBatchedDataSeq[F](1), fetchBatchedDataSeq[F](2), fetchBatchedDataSeq[F](3)).sequence
List(1, 2, 3).map(fetchBatchedDataSeq[F]).batchAll

val io = Fetch.runLog[IO](fetch)

@@ -224,7 +225,7 @@

"A large (many) fetch to a datasource with a maximum batch size is split and executed in parallel" in {
def fetch[F[_]: Concurrent]: Fetch[F, List[Int]] =
List(fetchBatchedDataPar[F](1), fetchBatchedDataPar[F](2), fetchBatchedDataPar[F](3)).sequence
List(1, 2, 3).map(fetchBatchedDataPar[F]).batchAll

val io = Fetch.runLog[IO](fetch)

@@ -247,7 +248,7 @@
)

val io = Fetch.runLog[IO](
ids.toList.traverse(fetchBatchedDataBigId[IO])
ids.toList.map(fetchBatchedDataBigId[IO]).batchAll
)

io.map { case (log, result) =>
18 changes: 16 additions & 2 deletions fetch/src/test/scala/FetchReportingTests.scala
@@ -84,11 +84,25 @@ class FetchReportingTests extends FetchSpec {
}.unsafeToFuture()
}

"Single fetches combined with traverse are run in one round" in {
"Single fetches combined with traverse are NOT run in one round" in {
def fetch[F[_]: Concurrent] =
for {
manies <- many(3) // round 1
ones <- manies.traverse(one[F]) // round 2
ones <- manies.traverse(one[F]) // rounds 2, 3, 4
} yield ones

val io = Fetch.runLog[IO](fetch)

io.map { case (log, result) =>
log.rounds.size shouldEqual 4
}.unsafeToFuture()
}

"Single fetches combined with Fetch.batchAll are run in one round" in {
def fetch[F[_]: Concurrent] =
for {
manies <- many(3) // round 1
ones <- Fetch.batchAll(manies.map(one[F]): _*) // round 2
} yield ones

val io = Fetch.runLog[IO](fetch)