Skip to content

Commit

Permalink
fix: EXPOSED-694 Entities insertion could fail if batches have differ…
Browse files Browse the repository at this point in the history
…ent column sets (#2365)
  • Loading branch information
obabichevjb authored Jan 31, 2025
1 parent a76c7b8 commit 7beb628
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,15 @@ class EntityCache(private val transaction: Transaction) {

@Suppress("TooGenericExceptionCaught")
internal fun flushInserts(table: IdTable<*>) {
var toFlush: List<Entity<*>> = inserts.remove(table)?.toList().orEmpty()
while (toFlush.isNotEmpty()) {
val partition = toFlush.partition { entity ->
entity.writeValues.none {
val (key, value) = it
key.referee == table.id && value is EntityID<*> && value._value == null
}
}
toFlush = partition.first
var entitiesToInsert = inserts.remove(table)?.toList().orEmpty()

while (entitiesToInsert.isNotEmpty()) {
val (currentBatch, nextBatch) = partitionEntitiesForInsert(entitiesToInsert, table)
entitiesToInsert = nextBatch

val ids = try {
executeAsPartOfEntityLifecycle {
table.batchInsert(toFlush) { entry ->
table.batchInsert(currentBatch) { entry ->
for ((c, v) in entry.writeValues) {
this[c] = v
}
Expand All @@ -250,16 +247,15 @@ class EntityCache(private val transaction: Transaction) {
// this try/catch should help to get information about the flaky test.
// try/catch can be safely removed after the fixing the issue
// TooGenericExceptionCaught suppress also can be removed
val toFlushString = toFlush.joinToString("; ") {
entry ->
val toFlushString = currentBatch.joinToString("; ") { entry ->
entry.writeValues.map { writeValue -> "${writeValue.key.name}=${writeValue.value}" }.joinToString { ", " }
}

exposedLogger.error("ArrayIndexOutOfBoundsException on attempt to make flush inserts. Table: ${table.tableName}, entries: ($toFlushString)", cause)
throw cause
}

for ((entry, genValues) in toFlush.zip(ids)) {
for ((entry, genValues) in currentBatch.zip(ids)) {
if (entry.id._value == null) {
val id = genValues[table.id]
entry.id._value = id._value
Expand All @@ -274,12 +270,30 @@ class EntityCache(private val transaction: Transaction) {
transaction.registerChange(entry.klass, entry.id, EntityChangeType.Created)
pendingInitializationLambdas[entry]?.forEach { it(entry) }
}

toFlush = partition.second
}
transaction.alertSubscribers()
}

/**
* That method places the entities with different `writeValues` column sets into different partitions.
* It prevents the issues with inconsistent batch insert statement.
*
* The entities that have referee in the same table and these referee are not created yet
* are also put into the second partition
*/
private fun partitionEntitiesForInsert(entities: Collection<Entity<*>>, table: IdTable<*>): Pair<List<Entity<*>>, List<Entity<*>>> {
val firstEntityColumns = entities.first().writeValues.keys
return entities.partition { entity ->
val refereeFromSameTableAlreadyCreated = entity.writeValues.none { (key, value) ->
key.referee == table.id && value is EntityID<*> && value._value == null
}

val columnSetAlignedWithFirstEntity = entity.writeValues.keys == firstEntityColumns

refereeFromSameTableAlreadyCreated && columnSetAlignedWithFirstEntity
}
}

/**
* Clears this [EntityCache] of all stored data, including any reference mappings.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package org.jetbrains.exposed.sql.tests.shared.entities
import org.jetbrains.exposed.dao.IntEntity
import org.jetbrains.exposed.dao.IntEntityClass
import org.jetbrains.exposed.dao.entityCache
import org.jetbrains.exposed.dao.flushCache
import org.jetbrains.exposed.dao.id.EntityID
import org.jetbrains.exposed.dao.id.IdTable
import org.jetbrains.exposed.dao.id.IntIdTable
import org.jetbrains.exposed.sql.Column
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.selectAll
import org.jetbrains.exposed.sql.tests.DatabaseTestsBase
Expand Down Expand Up @@ -170,4 +173,46 @@ class EntityCacheTests : DatabaseTestsBase() {
assertEquals(entity, TestEntity.testCache(entity.id))
}
}

object TableWithDefaultValue : IdTable<Int>() {
val value = integer("value")
val valueWithDefault = integer("valueWithDefault")
.default(10)

override val id: Column<EntityID<Int>> = integer("id")
.clientDefault { Random.nextInt() }
.entityId()

override val primaryKey: PrimaryKey = PrimaryKey(id)
}

class TableWithDefaultValueEntity(id: EntityID<Int>) : IntEntity(id) {
var value by TableWithDefaultValue.value

var valueWithDefault by TableWithDefaultValue.valueWithDefault

companion object : IntEntityClass<TableWithDefaultValueEntity>(TableWithDefaultValue)
}

@Test
fun entitiesWithDifferentAmountOfFieldsCouldBeCreated() {
withTables(TableWithDefaultValue) {
TableWithDefaultValueEntity.new {
value = 1
}
TableWithDefaultValueEntity.new {
value = 2
valueWithDefault = 1
}

// It's the key flush. It must not fail with inconsistent batch insert statement.
// The table also should have client side default value. Otherwise the `writeValues`
// would be extended with default values inside `EntityClass::new()` method.
flushCache()
entityCache.clear()

val entity = TableWithDefaultValueEntity.find { TableWithDefaultValue.value eq 1 }.first()
assertEquals(10, entity.valueWithDefault)
}
}
}

0 comments on commit 7beb628

Please sign in to comment.