Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve existing errors when using transaction #192

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 26 additions & 30 deletions quill-jdbc-zio/src/main/scala/io/getquill/context/ZioJdbc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,63 +74,59 @@ object ZioJdbc {
}
}

implicit class QuillZioDataSourceExt[T](qzio: ZIO[DataSource, Throwable, T]) {
implicit class QuillZioDataSourceExt[T, E](qzio: ZIO[DataSource, E, T]) {

import io.getquill.context.qzio.ImplicitSyntax._
import io.getquill.context.qzio.ImplicitSyntax.*

def implicitDS(implicit implicitEnv: Implicit[DataSource]): ZIO[Any, SQLException, T] =
(for {
def implicitDS(implicit implicitEnv: Implicit[DataSource]): ZIO[Any, E, T] =
for {
q <- qzio.provideEnvironment(ZEnvironment(implicitEnv.env))
} yield q).refineToOrDie[SQLException]
} yield q
}

implicit class QuillZioSomeDataSourceExt[T, R](qzio: ZIO[DataSource with R, Throwable, T])(implicit tag: Tag[R]) {
implicit class QuillZioSomeDataSourceExt[T, E, R](qzio: ZIO[DataSource with R, E, T])(implicit tag: Tag[R]) {

import io.getquill.context.qzio.ImplicitSyntax._
import io.getquill.context.qzio.ImplicitSyntax.*

def implicitSomeDS(implicit implicitEnv: Implicit[DataSource]): ZIO[R, SQLException, T] =
(for {
def implicitSomeDS(implicit implicitEnv: Implicit[DataSource]): ZIO[R, E, T] =
for {
r <- ZIO.environment[R]
q <- qzio
.provideSomeLayer[DataSource](ZLayer.succeedEnvironment(r))
.provideEnvironment(ZEnvironment(implicitEnv.env))
} yield q).refineToOrDie[SQLException]
} yield q
}

implicit class QuillZioExtPlain[T](qzio: ZIO[Connection, Throwable, T]) {
implicit class QuillZioExtPlain[E, T](qzio: ZIO[Connection, E, T]) {

import io.getquill.context.qzio.ImplicitSyntax._
import io.getquill.context.qzio.ImplicitSyntax.*

def onDataSource: ZIO[DataSource, SQLException, T] =
(for {
q <- qzio.provideSomeLayer(Quill.Connection.acquireScoped)
} yield q).refineToOrDie[SQLException]
def onDataSource: ZIO[DataSource, E | SQLException, T] = qzio.provideSomeLayer(Quill.Connection.acquireScoped)

def implicitDS(implicit implicitEnv: Implicit[DataSource]): ZIO[Any, SQLException, T] =
(for {
q <- qzio
.provideSomeLayer(Quill.Connection.acquireScoped)
.provideEnvironment(ZEnvironment(implicitEnv.env))
} yield q).refineToOrDie[SQLException]
def implicitDS(implicit implicitEnv: Implicit[DataSource]): ZIO[Any, E | SQLException, T] =
onDataSource.provideEnvironment(ZEnvironment(implicitEnv.env))
}

implicit class QuillZioExt[T, R](qzio: ZIO[Connection with R, Throwable, T])(implicit tag: Tag[R]) {
implicit class QuillZioExt[T, E, R](qzio: ZIO[Connection with R, E, T])(implicit tag: Tag[R]) {
/**
* Change `Connection` of a QIO to `DataSource with Closeable` by providing a `DataSourceLayer.live` instance
* which will grab a connection from the data-source, perform the QIO operation, and the immediately release the connection.
* This is used for data-sources that have pooled connections e.g. Hikari.
* {{{
* def ds: DataSource with Closeable = ...
* run(query[Person]).onDataSource.provide(Has(ds))
* run(query[Person]).onDataSource.provide(ds)
* }}}
*/
def onSomeDataSource: ZIO[DataSource with R, SQLException, T] =
(for {
def onSomeDataSource: ZIO[DataSource with R, E | SQLException, T] =
for {
r <- ZIO.environment[R]
q <- qzio
.provideSomeLayer[Connection](ZLayer.succeedEnvironment(r))
.provideSomeLayer(Quill.Connection.acquireScoped)
} yield q).refineToOrDie[SQLException]
// This needs to be typed explicitly
z: ZIO[DataSource, E | SQLException, T] =
qzio
.provideSomeLayer[Connection](ZLayer.succeedEnvironment(r))
.provideSomeLayer(Quill.Connection.acquireScoped)
q <- z
} yield q
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import io.getquill.context.sql.idiom.SqlIdiom
import io.getquill.context.{ContextVerbStream, ExecutionInfo, ProtoContextSecundus}
import zio.Exit.{Failure, Success}
import zio.stream.ZStream
import zio.{FiberRef, Runtime, UIO, Unsafe, ZEnvironment, ZIO}
import zio.{FiberRef, Runtime, Scope, UIO, Unsafe, ZEnvironment, ZIO}

import java.sql.{Array as _, *}
import javax.sql.DataSource
Expand All @@ -23,11 +23,11 @@ import zio.ZIO.blocking
* as a resource dependency which can be provided later (see `ZioJdbc` for helper methods
* that assist in doing this).
*
* The resource dependency itself is just a `Has[Connection]`. Since this is frequently used
* The type `QIO[T]` i.e. Quill-IO has been defined as an alias for `ZIO[Has[Connection], SQLException, T]`.
* The resource dependency itself is just a `Connection`. Since this is frequently used
* The type `QIO[T]` i.e. Quill-IO has been defined as an alias for `ZIO[Connection, SQLException, T]`.
*
* Since in most JDBC use-cases, a connection-pool datasource i.e. Hikari is used it would actually
* be much more useful to interact with `ZIO[Has[DataSource], SQLException, T]`.
* be much more useful to interact with `ZIO[DataSource, SQLException, T]`.
* The extension method `.onDataSource` in `io.getquill.context.ZioJdbc.QuillZioExt` will perform this conversion
* (for even more brevity use `onDS` which is an alias for this method).
* {{
Expand All @@ -41,7 +41,7 @@ import zio.ZIO.blocking
* Runtime.default.unsafeRun(MyZioContext.run(query[Person]).ContextTranslateProtoprovideLayer(zioDS))
* }}
*
* Note however that the one exception to these cases are the `prepare` methods where a `ZIO[Has[Connection], SQLException, PreparedStatement]`
* Note however that the one exception to these cases are the `prepare` methods where a `ZIO[Connection, SQLException, PreparedStatement]`
* is being returned. In those situations the acquire-action-release pattern does not make any sense because the `PrepareStatement`
* is only held open while it's host-connection exists.
*/
Expand Down Expand Up @@ -162,12 +162,12 @@ abstract class ZioJdbcContext[+Dialect <: SqlIdiom, +Naming <: NamingStrategy] e
* Execute instructions in a transaction. For example, to add a Person row to the database and return
* the contents of the Person table immediately after that:
* {{{
* val a = run(query[Person].insert(Person(...)): ZIO[Has[DataSource], SQLException, Long]
* val b = run(query[Person]): ZIO[Has[DataSource], SQLException, Person]
* transaction(a *> b): ZIO[Has[DataSource], SQLException, Person]
* val a = run(query[Person].insert(Person(...)): ZIO[DataSource, SQLException, Long]
* val b = run(query[Person]): ZIO[DataSource, SQLException, Person]
* transaction(a *> b): ZIO[DataSource, SQLException, Person]
* }}}
*
* The order of operations run in the case that a new connection needs to be aquired are as follows:
* The order of operations run in the case that a new connection needs to be acquired are as follows:
* <pre>
* getDS from env,
* acquire-connection,
Expand All @@ -179,14 +179,14 @@ abstract class ZioJdbcContext[+Dialect <: SqlIdiom, +Naming <: NamingStrategy] e
* release-conn
* </pre>
*/
def transaction[R <: DataSource, A](op: ZIO[R, Throwable, A]): ZIO[R, Throwable, A] = {
def transaction[R <: DataSource, E, A](op: ZIO[R, E, A]): ZIO[R, E | SQLException, A] = {
blocking(currentConnection.get.flatMap {
// We can just return the op in the case that there is already a connection set on the fiber ref
// because the op is execute___ which will lookup the connection from the fiber ref via onConnection/onConnectionStream
// This will typically happen for nested transactions e.g. transaction(transaction(a *> b) *> c)
case Some(connection) => op
case Some(_) => op
case None =>
val connection = for {
val connection: ZIO[Scope with DataSource, SQLException, Unit] = (for {
env <- ZIO.service[DataSource]
connection <- scopedBestEffort(attemptBlocking(env.getConnection))
// Get the current value of auto-commit
Expand All @@ -197,7 +197,7 @@ abstract class ZioJdbcContext[+Dialect <: SqlIdiom, +Naming <: NamingStrategy] e
attemptBlocking(connection.setAutoCommit(prevAutoCommit)).orDie
}
_ <- ZIO.acquireRelease(currentConnection.set(Some(connection))) { _ =>
// Note. We are failing the fiber if auto-commit reset fails. For some circumstances this may be too aggresive.
// Note. We are failing the fiber if auto-commit reset fails. For some circumstances this may be too aggressive.
// If the connection pool e.g. Hikari resets this property for a recycled connection anyway doing it here
// might not be necessary
currentConnection.set(None)
Expand All @@ -207,7 +207,7 @@ abstract class ZioJdbcContext[+Dialect <: SqlIdiom, +Naming <: NamingStrategy] e
case Success(_) => blocking(ZIO.succeed(connection.commit()))
case Failure(cause) => blocking(ZIO.succeed(connection.rollback()))
}
} yield ()
} yield ()).refineToOrDie[SQLException]

ZIO.scoped(connection *> op)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,16 @@ abstract class ZioJdbcUnderlyingContext[+Dialect <: SqlIdiom, +Naming <: NamingS
* Note that for ZIO 2.0 since the env is covariant, R can be a subtype of connection because if there are other with-clauses
* they can be generalized to Something <: Connection. E.g. `Connection with OtherStuff` generalizes to `Something <: Connection`.
*/
private[getquill] def withoutAutoCommit[R <: Connection, A, E <: Throwable: ClassTag](f: ZIO[R, E, A]): ZIO[R, E, A] = {
private[getquill] def withoutAutoCommit[R <: Connection, A, E](f: ZIO[R, E, A]): ZIO[R, E | SQLException, A] = {
for {
conn <- ZIO.service[Connection]
autoCommitPrev = conn.getAutoCommit
r <- ZIO.acquireReleaseWith(sqlEffect(conn))(conn => ZIO.succeed(conn.setAutoCommit(autoCommitPrev))) { conn =>
sqlEffect(conn.setAutoCommit(false)).flatMap(_ => f)
}.refineToOrDie[E]
} yield r
result <- ZIO.acquireReleaseWith(sqlEffect(conn))(conn => ZIO.succeed(conn.setAutoCommit(autoCommitPrev))) { conn =>
// type has to be explicitly defined
val innerResult: ZIO[R, E | SQLException, A] = sqlEffect(conn.setAutoCommit(false)) *> f
innerResult
}
} yield result
}

private[getquill] def streamWithoutAutoCommit[A](f: ZStream[Connection, Throwable, A]): ZStream[Connection, Throwable, A] = {
Expand All @@ -125,20 +127,20 @@ abstract class ZioJdbcUnderlyingContext[+Dialect <: SqlIdiom, +Naming <: NamingS
} yield r
}

def transaction[R <: Connection, A](f: ZIO[R, Throwable, A]): ZIO[R, Throwable, A] = {
def transaction[R <: Connection, E, A](f: ZIO[R, E, A]): ZIO[R, E | SQLException, A] = {
ZIO.environment[R].flatMap(env =>
blocking(withoutAutoCommit(
blocking(withoutAutoCommit {
f.onExit {
case Success(_) =>
ZIO.succeed(env.get[Connection].commit())
case Failure(cause) =>
ZIO.succeed(env.get[Connection].rollback()).foldCauseZIO(
sqlEffect(env.get[Connection].rollback()).foldCauseZIO(
// NOTE: cause.flatMap(Cause.die) means wrap up the throwable failures into die failures, can only do if E param is Throwable (can also do .orDie at the end)
rollbackFailCause => ZIO.failCause(cause.flatMap(Cause.die(_, StackTrace.none)) ++ rollbackFailCause),
_ => ZIO.failCause(cause.flatMap(Cause.die(_, StackTrace.none))) // or ZIO.halt(cause).orDie
rollbackFailCause => ZIO.failCause(cause.flatMap(e => Cause.fail(e, StackTrace.none)).stripFailures ++ rollbackFailCause.stripFailures),
_ => ZIO.failCause(cause.flatMap(e => Cause.fail[E](e, StackTrace.none).stripFailures)) // or ZIO.halt(cause).orDie
)
}.provideEnvironment(env)
)))
}))
}

def probingDataSource: Option[DataSource] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,17 @@ trait QuillBaseContext[+Dialect <: SqlIdiom, +Naming <: NamingStrategy] extends
* Execute instructions in a transaction. For example, to add a Person row to the database and return
* the contents of the Person table immediately after that:
* {{{
* val a = run(query[Person].insert(Person(...)): ZIO[Has[DataSource], SQLException, Long]
* val b = run(query[Person]): ZIO[Has[DataSource], SQLException, Person]
* transaction(a *> b): ZIO[Has[DataSource], SQLException, Person]
* val a = run(query[Person].insert(Person(...)): ZIO[DataSource, SQLException, Long]
* val b = run(query[Person]): ZIO[DataSource, SQLException, Person]
* transaction(a *> b): ZIO[DataSource, SQLException, Person]
* }}}
*/
def transaction[R, A](op: ZIO[R, Throwable, A]): ZIO[R, Throwable, A] =
def transaction[R, E ,A](op: ZIO[R, E, A]): ZIO[R, E | SQLException, A] =
dsDelegate.transaction(op).provideSomeEnvironment[R]((env: ZEnvironment[R]) => env.add[DataSource](ds: DataSource))

private def onDS[T](qio: ZIO[DataSource, SQLException, T]): ZIO[Any, SQLException, T] =
private def onDS[T, E](qio: ZIO[DataSource, E, T]): ZIO[Any, E, T] =
qio.provideEnvironment(ZEnvironment(ds))

private def onDSStream[T](qstream: ZStream[DataSource, SQLException, T]): ZStream[Any, SQLException, T] =
private def onDSStream[T, E](qstream: ZStream[DataSource, E, T]): ZStream[Any, E, T] =
qstream.provideEnvironment(ZEnvironment(ds))
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import zio.{ ZIO, ZLayer }
import io.getquill.context.ZioJdbc._
import io.getquill._

import java.sql.Connection
import javax.sql.DataSource

class OnDataSourceSpec extends PeopleZioProxySpec {
Expand Down Expand Up @@ -56,6 +57,15 @@ class OnDataSourceSpec extends PeopleZioProxySpec {

people mustEqual peopleEntries.filter(p => p.name == "Alex")
}
"should keep existing errors" in {
// This is how you import the encoders/decoders of `underlying` context without importing things that will conflict
// i.e. the quote and run methods
import testContext.underlying.{prepare => _, run => _, _}
import java.sql.SQLException

val zioThatCanFail: ZIO[Connection, String, Nothing] = ZIO.service[Connection] *> ZIO.fail("Custom Error")
"val result: ZIO[DataSource, String | SQLException, List[Person]] = zioThatCanFail.onDataSource" should compile
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote the tests in this style (something must compile) because the only thing that changed is the types. Let me know if you have any suggestions!

}
}

"implicitDS on underlying context" - {
Expand Down Expand Up @@ -99,5 +109,19 @@ class OnDataSourceSpec extends PeopleZioProxySpec {
svc <- ZIO.attempt(Service(ds))
} yield (svc.people)).runSyncUnsafe() mustEqual peopleEntries.filter(p => p.name == "Alex")
}
"should keep existing errors" in {
// This is how you import the encoders/decoders of `underlying` context without importing things that will conflict
// i.e. the quote and run methods
import testContext.underlying.{prepare => _, run => _, _}
import java.sql.SQLException

(for {
ds <- ZIO.service[DataSource]
given Implicit[DataSource] = Implicit(ds)
} yield {
val zioThatCanFail: ZIO[DataSource, String, Nothing] = ZIO.service[DataSource] *> ZIO.fail("Custom Error")
"val result: ZIO[DataSource, String | SQLException, List[Person]] = zioThatCanFail.implicitDS" should compile
}).runSyncUnsafe()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ class ZioJdbcUnderlyingContextSpec extends ZioProxySpec {
r <- testContext.underlying.run(qr1)
} yield r).onDataSource.runSyncUnsafe().map(_.i) mustEqual List(33)
}
"keep existing errors" in {
import java.sql.{Connection, SQLException}

val zioThatCanFail: ZIO[Any, String, Nothing] = ZIO.fail("Custom Error")
"val result: ZIO[Connection, String | SQLException, List[TestEntity]] = testContext.underlying.transaction(zioThatCanFail *> testContext.underlying.run(qr1))" must compile
}
// "prepare" in {
// testContext.underlying.prepareParams(
// "select * from Person where name=? and age > ?", (ps, session) => (List("Sarah", 127), ps)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,9 @@ class ZioJdbcContextSpec extends ZioSpec {
"select * from Person where name=? and age > ?", (ps, session) => (List("Sarah", 127), ps)
).runSyncUnsafe() mustEqual List("127", "'Sarah'")
}
"keep existing errors" in {
val zioThatCanFail: ZIO[Any, String, Nothing] = ZIO.fail("Custom Error")
"val result: ZIO[Any, String | SQLException, List[TestEntity]] = testContext.transaction(zioThatCanFail *> testContext.run(qr1))" must compile
}
}
}