Skip to content

Commit

Permalink
Using zio-direct in Quill ZioJdbcContext
Browse files Browse the repository at this point in the history
  • Loading branch information
deusaquilus committed Feb 16, 2023
1 parent 979711d commit e8a08a4
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 54 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ lazy val `quill-zio` =
Test / fork := true,
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "2.0.8",
"dev.zio" %% "zio-streams" % "2.0.8"
"dev.zio" %% "zio-streams" % "2.0.8",
"dev.zio" %% "zio-direct" % "1.0.0-RC6",
"dev.zio" %% "zio-direct-streams" % "1.0.0-RC6"
)
)
.dependsOn(`quill-sql` % "compile->compile;test->test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ class CassandraZioContext[+N <: NamingStrategy](val naming: N)
}

private[getquill] def execute(cql: String, prepare: Prepare, csession: CassandraZioSession, fetchSize: Option[Int]) =
/*
val p = prepareRowAndLog(cql, prepare).run
attempt {
fetchSize match {
case Some(value) => p.setPageSize
case None => p
}
}
ZIO.fromCompletionStage(csession.session.executeAsync(p)).await
*/
simpleBlocking {
prepareRowAndLog(cql, prepare)
.mapAttempt { p =>
Expand Down Expand Up @@ -153,6 +163,15 @@ class CassandraZioContext[+N <: NamingStrategy](val naming: N)
rows <- ZIO.attempt(rs.currentPage())
singleRow <- ZIO.attempt(handleSingleResult(cql, rows.asScala.map(row => extractor(row, csession)).toList))
} yield singleRow

/*
val csession = ZIO.service[CassandraZioSession].run
val rs = execute(cql, prepare, csession, Some(1)).run
unsafe {
rows = rs.currentPage()
singleRow = handleSingleResult(cql, rows.asScala.map(row => extractor(row, csession)).toList)
}
*/
}

def executeAction(cql: String, prepare: Prepare = identityPrepare)(info: ExecutionInfo, dc: Runner): CIO[Unit] = simpleBlocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import io.getquill.*
import io.getquill.jdbczio.Quill
import zio.ZIO.attemptBlocking
import zio.ZIO.blocking
import zio.direct._
import zio.direct.core.metaprog.Verify
import zio.Scope

/**
* Quill context that executes JDBC queries inside of ZIO. Unlike most other contexts
Expand Down Expand Up @@ -180,47 +183,52 @@ abstract class ZioJdbcContext[+Dialect <: SqlIdiom, +Naming <: NamingStrategy] e
* </pre>
*/
def transaction[R <: DataSource, A](op: ZIO[R, Throwable, A]): ZIO[R, Throwable, A] = {
blocking(currentConnection.get.flatMap {
// We can just return the op in the case that there is already a connection set on the fiber ref
// because the op is execute___ which will lookup the connection from the fiber ref via onConnection/onConnectionStream
// This will typically happen for nested transactions e.g. transaction(transaction(a *> b) *> c)
case Some(connection) => op
case None =>
val connection = for {
env <- ZIO.service[DataSource]
connection <- scopedBestEffort(attemptBlocking(env.getConnection))
// Get the current value of auto-commit
prevAutoCommit <- attemptBlocking(connection.getAutoCommit)
// Disable auto-commit since we need to be able to roll back. Once everything is done, set it
// to whatever the previous value was.
_ <- ZIO.acquireRelease(attemptBlocking(connection.setAutoCommit(false))) { _ =>
attemptBlocking(connection.setAutoCommit(prevAutoCommit)).orDie
}
_ <- ZIO.acquireRelease(currentConnection.set(Some(connection))) { _ =>
// Note. We are failing the fiber if auto-commit reset fails. For some circumstances this may be too aggresive.
// If the connection pool e.g. Hikari resets this property for a recycled connection anyway doing it here
// might not be necessary
currentConnection.set(None)
}
// Once the `use` of this outer-Scoped is done, rollback the connection if needed
_ <- ZIO.addFinalizerExit {
case Success(_) => blocking(ZIO.succeed(connection.commit()))
case Failure(cause) => blocking(ZIO.succeed(connection.rollback()))
}
} yield ()

ZIO.scoped(connection *> op)
})
defer {
currentConnection.get.run match {
case Some(conn) => op.run
case None =>
ZIO.scoped(defer {
defer {
val env = ZIO.service[DataSource].run
val connection = scopedBestEffort(attemptBlocking(env.getConnection)).run
// Get the current value of auto-commit
val prevAutoCommit = attemptBlocking(connection.getAutoCommit).run
// Disable auto-commit since we need to be able to roll back. Once everything is done, set it
// to whatever the previous value was.
ZIO.acquireRelease(attemptBlocking(connection.setAutoCommit(false))) { _ =>
attemptBlocking(connection.setAutoCommit(prevAutoCommit)).orDie
}.run
ZIO.acquireRelease(currentConnection.set(Some(connection))) { _ =>
// Note. We are failing the fiber if auto-commit reset fails. For some circumstances this may be too aggresive.
// If the connection pool e.g. Hikari resets this property for a recycled connection anyway doing it here
// might not be necessary
currentConnection.set(None)
}.run
ZIO.addFinalizerExit {
case Success(_) => blocking(ZIO.succeed(connection.commit()))
case Failure(cause) => blocking(ZIO.succeed(connection.rollback()))
}.run
}.run
op.run
}).run
}
}
}

private def onConnection[T](qlio: ZIO[Connection, SQLException, T]): ZIO[DataSource, SQLException, T] =
currentConnection.get.flatMap {
case Some(connection) =>
blocking(qlio.provideEnvironment(ZEnvironment(connection)))
case None =>
blocking(qlio.provideLayer(Quill.Connection.acquireScoped))
defer {
currentConnection.get.run match {
case Some(connection) =>
blocking(qlio.provideEnvironment(ZEnvironment(connection))).run
case None =>
blocking(qlio.provideLayer(Quill.Connection.acquireScoped)).run
}
}

def foo(): Unit = {
val iter: ZIO[Scope, Nothing, Iterator[Either[Nothing, Option[Connection]]]] = ZStream.fromZIO(currentConnection.get).toIterator
}

private def onConnectionStream[T](qstream: ZStream[Connection, SQLException, T]): ZStream[DataSource, SQLException, T] =
streamBlocker *> ZStream.fromZIO(currentConnection.get).flatMap {
case Some(connection) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import javax.sql.DataSource
import scala.reflect.ClassTag
import scala.util.Try
import scala.annotation.targetName
import zio.direct._
import zio.direct.stream.{defer => deferStream, _}

abstract class ZioJdbcUnderlyingContext[+Dialect <: SqlIdiom, +Naming <: NamingStrategy] extends ZioContext[Dialect, Naming]
with JdbcContextVerbExecute[Dialect, Naming]
Expand Down Expand Up @@ -106,23 +108,23 @@ abstract class ZioJdbcUnderlyingContext[+Dialect <: SqlIdiom, +Naming <: NamingS
* they can be generalized to Something <: Connection. E.g. `Connection with OtherStuff` generalizes to `Something <: Connection`.
*/
private[getquill] def withoutAutoCommit[R <: Connection, A, E <: Throwable: ClassTag](f: ZIO[R, E, A]): ZIO[R, E, A] = {
for {
conn <- ZIO.service[Connection]
autoCommitPrev = conn.getAutoCommit
r <- ZIO.acquireReleaseWith(sqlEffect(conn))(conn => ZIO.succeed(conn.setAutoCommit(autoCommitPrev))) { conn =>
defer {
val conn = ZIO.service[Connection].run
val autoCommitPrev = conn.getAutoCommit
ZIO.acquireReleaseWith(sqlEffect(conn))(conn => ZIO.succeed(conn.setAutoCommit(autoCommitPrev))) { conn =>
sqlEffect(conn.setAutoCommit(false)).flatMap(_ => f)
}.refineToOrDie[E]
} yield r
}.refineToOrDie[E].run
}
}

private[getquill] def streamWithoutAutoCommit[A](f: ZStream[Connection, Throwable, A]): ZStream[Connection, Throwable, A] = {
for {
conn <- ZStream.service[Connection]
autoCommitPrev = conn.getAutoCommit
r <- ZStream.acquireReleaseWith(ZIO.attempt(conn.setAutoCommit(false)))(_ => {
deferStream {
val conn = ZStream.service[Connection].each
val autoCommitPrev = conn.getAutoCommit
ZStream.acquireReleaseWith(ZIO.attempt(conn.setAutoCommit(false)))(_ => {
ZIO.succeed(conn.setAutoCommit(autoCommitPrev))
}).flatMap(_ => f)
} yield r
}).flatMap(_ => f).each
}
}

def transaction[R <: Connection, A](f: ZIO[R, Throwable, A]): ZIO[R, Throwable, A] = {
Expand Down Expand Up @@ -179,11 +181,12 @@ abstract class ZioJdbcUnderlyingContext[+Dialect <: SqlIdiom, +Naming <: NamingS

val scopedEnv: ZStream[Connection, Throwable, (Connection, PrepareRow, ResultSet)] =
ZStream.scoped {
for {
conn <- ZIO.service[Connection]
ps <- scopedBestEffort(ZIO.attempt(prepareStatement(conn)))
rs <- scopedBestEffort(ZIO.attempt(ps.executeQuery()))
} yield (conn, ps, rs)
defer {
val conn = ZIO.service[Connection].run
val ps = scopedBestEffort(ZIO.attempt(prepareStatement(conn))).run
val rs = scopedBestEffort(ZIO.attempt(ps.executeQuery())).run
(conn, ps, rs)
}
}

val outStream: ZStream[Connection, Throwable, T] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ class Repo[T <: { def id: Int }](ds: DataSource) {
inline def getById(inline id: Int) =
run(query[T].filter(t => t.id == lift(id))).map(_.headOption).provideEnvironment(env)

inline def deleteById(inline id: Int) =
run(query[T].filter(t => t.id == lift(id)).delete).provideEnvironment(env)

inline def insert(inline t: T) =
run(query[T].insertValue(lift(t)).returning(_.id)).provideEnvironment(env)

Expand All @@ -46,6 +49,8 @@ object StructureBasedRepo extends ZIOAppDefault {
joeId <- peopleRepo.insert(joe)
joeNew <- peopleRepo.getById(joeId)
allJoes <- peopleRepo.searchByField(p => p.first == "Joe")
_ <- peopleRepo.deleteById(joeId)
allJoes1 <- peopleRepo.searchByField(p => p.first == "Joe")
_ <-
printLine("==== joe: " + joe) *>
printLine("==== joeId: " + joeId) *>
Expand Down

0 comments on commit e8a08a4

Please sign in to comment.