Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-346 Use reactive MRT operations #816

Merged
merged 2 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<maven.javadoc.plugin>3.3.0</maven.javadoc.plugin>
<maven.gpg.plugin>1.6</maven.gpg.plugin>
<aerospike-client-jdk8>9.0.2</aerospike-client-jdk8>
<aerospike-reactor-client>8.1.2</aerospike-reactor-client>
<aerospike-reactor-client>9.0.2</aerospike-reactor-client>
<reactor-test>3.7.0</reactor-test>
<embedded-aerospike>3.1.9</embedded-aerospike>
<jodatime>2.13.0</jodatime>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.springframework.data.aerospike.transaction.reactive;

import org.springframework.data.aerospike.transaction.sync.AerospikeTransactionResourceHolder;
import com.aerospike.client.AbortStatus;
import com.aerospike.client.CommitStatus;
import org.springframework.lang.Nullable;
import org.springframework.transaction.support.SmartTransactionObject;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;

/**
* A {@link SmartTransactionObject} implementation that has reactive transaction resource holder
Expand Down Expand Up @@ -39,26 +41,29 @@ void setResourceHolder(@Nullable AerospikeReactiveTransactionResourceHolder reso
this.resourceHolder = resourceHolder;
}

private void failIfNoTransaction() {
if (!hasResourceHolder()) {
throw new IllegalStateException("Error: expecting transaction to exist");
}
private Mono<AerospikeReactiveTransactionResourceHolder> getResourceHolder() {
return Mono.fromCallable(() -> {
if (!hasResourceHolder()) {
throw new IllegalStateException("Error: expecting transaction to exist");
}
return resourceHolder;
});
}

/**
* Commit the transaction
*/
public void commitTransaction() {
failIfNoTransaction();
resourceHolder.getClient().getAerospikeClient().commit(resourceHolder.getTransaction());
public Mono<CommitStatus> commitTransaction() {
return getResourceHolder()
.flatMap(h -> h.getClient().commit(h.getTransaction()));
}

/**
* Rollback (abort) the transaction
*/
public void abortTransaction() {
failIfNoTransaction();
resourceHolder.getClient().getAerospikeClient().abort(resourceHolder.getTransaction());
public Mono<AbortStatus> abortTransaction() {
return getResourceHolder()
.flatMap(h -> h.getClient().abort(h.getTransaction()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class AerospikeReactiveTransactionManager extends AbstractReactiveTransac
/**
* Create a new instance of {@link AerospikeReactiveTransactionManager}
*/

public AerospikeReactiveTransactionManager(IAerospikeReactorClient client) {
this.client = client;
}
Expand Down Expand Up @@ -73,7 +72,12 @@ protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationMa
rHolder.setSynchronizedWithTransaction(true);
synchronizationManager.bindResource(client, rHolder);
})
.onErrorMap(e -> new TransactionSystemException("Could not bind transaction resource", e))
.onErrorMap(e -> {
if (e instanceof TransactionSystemException) {
return e;
}
return new TransactionSystemException("Could not bind transaction resource", e);
})
.then();
});
}
Expand All @@ -89,28 +93,24 @@ private Mono<AerospikeReactiveTransactionResourceHolder> createResourceHolder(IA
@Override
protected Mono<Void> doCommit(TransactionSynchronizationManager synchronizationManager,
GenericReactiveTransaction status) {
return Mono.fromRunnable(() -> {
AerospikeReactiveTransaction transaction = getTransaction(status);
transaction.commitTransaction();
})
return Mono.fromSupplier(() -> getTransaction(status))
.flatMap(AerospikeReactiveTransaction::commitTransaction)
.onErrorMap(e -> new TransactionSystemException("Could not commit transaction", e))
.then();
}

@Override
protected Mono<Void> doRollback(TransactionSynchronizationManager synchronizationManager,
GenericReactiveTransaction status) {
return Mono.fromRunnable(() -> {
AerospikeReactiveTransaction transaction = getTransaction(status);
transaction.abortTransaction();
})
return Mono.fromSupplier(() -> getTransaction(status))
.flatMap(AerospikeReactiveTransaction::abortTransaction)
.onErrorMap(e -> new TransactionSystemException("Could not abort transaction", e))
.then();
}

@Override
protected Mono<Object> doSuspend(TransactionSynchronizationManager synchronizationManager, Object transaction)
throws TransactionException {
protected Mono<Object> doSuspend(TransactionSynchronizationManager synchronizationManager,
Object transaction) throws TransactionException {
return Mono.fromSupplier(() -> {
AerospikeReactiveTransaction aerospikeTransaction = toAerospikeTransaction(transaction);
aerospikeTransaction.setResourceHolder(null);
Expand All @@ -121,8 +121,7 @@ protected Mono<Object> doSuspend(TransactionSynchronizationManager synchronizati

@Override
protected Mono<Void> doResume(TransactionSynchronizationManager synchronizationManager,
@Nullable Object transaction,
Object suspendedResources) {
@Nullable Object transaction, Object suspendedResources) {
return Mono.fromRunnable(() -> synchronizationManager.bindResource(client, suspendedResources))
.onErrorMap(e -> new TransactionSystemException("Could not resume transaction", e))
.then();
Expand All @@ -135,7 +134,7 @@ protected Mono<Void> doSetRollbackOnly(TransactionSynchronizationManager synchro
AerospikeReactiveTransaction transaction = toAerospikeTransaction(status);
transaction.getRequiredResourceHolder().setRollbackOnly();
})
.onErrorMap(e -> new TransactionSystemException("Could not resume transaction", e))
.onErrorMap(e -> new TransactionSystemException("Could not set transaction to rollback-only", e))
.then();
}

Expand All @@ -149,7 +148,7 @@ protected Mono<Void> doCleanupAfterCompletion(TransactionSynchronizationManager
synchronizationManager.unbindResource(client);
aerospikeTransaction.getRequiredResourceHolder().clear();
})
.onErrorMap(e -> new TransactionSystemException("Could not resume transaction", e))
.onErrorMap(e -> new TransactionSystemException("Could not clean up transaction", e))
.then();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected void doSetRollbackOnly(DefaultTransactionStatus status) throws Transac
AerospikeTransaction transaction = getTransaction(status);
transaction.getResourceHolderOrFail().setRollbackOnly();
} catch (Exception e) {
throw new TransactionSystemException("Could not set rollback only for a transaction", e);
throw new TransactionSystemException("Could not set transaction to rollback-only", e);
}
}

Expand Down
Loading