-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathTransaction.kt
68 lines (57 loc) · 2.3 KB
/
Transaction.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package klite.jdbc
import klite.error
import klite.logger
import kotlinx.coroutines.ThreadContextElement
import java.sql.Connection
import java.sql.SQLException
import javax.sql.DataSource
import kotlin.annotation.AnnotationTarget.CLASS
import kotlin.annotation.AnnotationTarget.FUNCTION
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
/** Disable transaction for a route or a job. This will most likely leave the connection in auto-commit mode (depending on connection pool settings). */
@Target(FUNCTION, CLASS) annotation class NoTransaction
class Transaction(val db: DataSource): AutoCloseable {
companion object {
private val log = logger()
private val threadLocal = ThreadLocal<Transaction>()
fun current(): Transaction? = threadLocal.get()
}
private var conn: Connection? = null
val connection: Connection get() = conn ?: openConnection()
private fun openConnection() = db.connection.apply {
autoCommit = false
conn = this
}
override fun close() = close(true)
fun close(commit: Boolean = true) {
try {
conn?.apply {
if (!autoCommit) {
if (commit) commit() else rollback()
autoCommit = true
}
}
} catch (e: SQLException) {
log.error("Failed to ${if (commit) "commit" else "rollback"}", e)
} finally {
try { conn?.close() } catch (e: Exception) { log.error("Failed to close $conn: $e") }
conn = null
detachFromThread()
}
}
fun commit() = conn?.commit()
fun rollback() = conn?.rollback()
fun attachToThread() = this.also { threadLocal.set(it) }
fun detachFromThread() = threadLocal.remove()
}
fun <R> DataSource.withConnection(block: Connection.() -> R): R {
val tx = Transaction.current()
return if (tx?.db == this) tx.connection.block()
else connection.use(block)
}
class TransactionContext(val tx: Transaction? = Transaction.current()): ThreadContextElement<Transaction?>, AbstractCoroutineContextElement(Key) {
companion object Key: CoroutineContext.Key<TransactionContext>
override fun updateThreadContext(context: CoroutineContext) = Transaction.current().also { tx?.attachToThread() }
override fun restoreThreadContext(context: CoroutineContext, oldState: Transaction?) { oldState?.attachToThread() ?: tx?.detachFromThread() }
}