Skip to content

Commit 34646c1

Browse files
Fix block() issues with reactive transactions. (#1537)
* Fix block() issues with reactive transactions. Closes #1530. * Removing the form of checkForTransactionInThreadLocalStorage (#1538) that does not use TransactionMarkerOwner. As this bypasses a critical check on whether we are inside a blocking transaction. Two new tests in SDKTransactionsSaveIntegrationTests (reactiveSaveInBlockingTransaction and blockingSaveInBlockingTransaction) will fail without this, as the .save() does not realise it's in a transaction. Adding new tests, for .save() and for performing transactions in reactor blocking threads. Co-authored-by: Graham Pople <grahampople@gmail.com>
1 parent 0b40ca1 commit 34646c1

File tree

8 files changed

+259
-47
lines changed

8 files changed

+259
-47
lines changed

spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/ReactiveCouchbaseTemplate.java

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, fin
5959
}
6060

6161
public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, final CouchbaseConverter converter,
62-
final TranslationService translationService) {
62+
final TranslationService translationService) {
6363
this(clientFactory, converter, translationService, null);
6464
}
6565

@@ -75,35 +75,48 @@ public ReactiveCouchbaseTemplate(final CouchbaseClientFactory clientFactory, fin
7575
@Override
7676
public <T> Mono<T> save(T entity, String... scopeAndCollection) {
7777
Assert.notNull(entity, "Entity must not be null!");
78+
7879
String scope = scopeAndCollection.length > 0 ? scopeAndCollection[0] : null;
7980
String collection = scopeAndCollection.length > 1 ? scopeAndCollection[1] : null;
80-
Mono<T> result;
81-
final CouchbasePersistentEntity<?> mapperEntity = getConverter().getMappingContext()
82-
.getPersistentEntity(entity.getClass());
83-
final CouchbasePersistentProperty versionProperty = mapperEntity.getVersionProperty();
84-
final boolean versionPresent = versionProperty != null;
85-
final Long version = versionProperty == null || versionProperty.getField() == null ? null
86-
: (Long) ReflectionUtils.getField(versionProperty.getField(), entity);
87-
final boolean existingDocument = version != null && version > 0;
88-
89-
Class clazz = entity.getClass();
90-
91-
if (!versionPresent) { // the entity doesn't have a version property
92-
// No version field - no cas
93-
// If in a transaction, insert is the only thing that will work
94-
if (TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent()) {
95-
result = (Mono<T>) insertById(clazz).inScope(scope).inCollection(collection).one(entity);
96-
} else { // if not in a tx, then upsert will work
97-
result = (Mono<T>) upsertById(clazz).inScope(scope).inCollection(collection).one(entity);
81+
return Mono.defer(() -> {
82+
final CouchbasePersistentEntity<?> mapperEntity = getConverter().getMappingContext()
83+
.getPersistentEntity(entity.getClass());
84+
final CouchbasePersistentProperty versionProperty = mapperEntity.getVersionProperty();
85+
final boolean versionPresent = versionProperty != null;
86+
final Long version = versionProperty == null || versionProperty.getField() == null ? null
87+
: (Long) ReflectionUtils.getField(versionProperty.getField(),
88+
entity);
89+
final boolean existingDocument = version != null && version > 0;
90+
91+
Class clazz = entity.getClass();
92+
93+
if (!versionPresent) { // the entity doesn't have a version property
94+
// No version field - no cas
95+
// If in a transaction, insert is the only thing that will work
96+
return TransactionalSupport.checkForTransactionInThreadLocalStorage()
97+
.flatMap(ctx -> {
98+
if (ctx.isPresent()) {
99+
return (Mono<T>) insertById(clazz).inScope(scope)
100+
.inCollection(collection)
101+
.one(entity);
102+
} else { // if not in a tx, then upsert will work
103+
return (Mono<T>) upsertById(clazz).inScope(scope)
104+
.inCollection(collection)
105+
.one(entity);
106+
}
107+
});
108+
} else if (existingDocument) { // there is a version property, and it is non-zero
109+
// Updating existing document with cas
110+
return (Mono<T>) replaceById(clazz).inScope(scope)
111+
.inCollection(collection)
112+
.one(entity);
113+
} else { // there is a version property, but it's zero or not set.
114+
// Creating new document
115+
return (Mono<T>) insertById(clazz).inScope(scope)
116+
.inCollection(collection)
117+
.one(entity);
98118
}
99-
} else if (existingDocument) { // there is a version property, and it is non-zero
100-
// Updating existing document with cas
101-
result = (Mono<T>) replaceById(clazz).inScope(scope).inCollection(collection).one(entity);
102-
} else { // there is a version property, but it's zero or not set.
103-
// Creating new document
104-
result = (Mono<T>) insertById(clazz).inScope(scope).inCollection(collection).one(entity);
105-
}
106-
return result;
119+
});
107120
}
108121

109122
public <T> Mono<Long> count(Query query, Class<T> domainType) {

spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/TransactionalSupport.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,10 @@ public static Mono<Optional<CouchbaseResourceHolder>> checkForTransactionInThrea
5252
});
5353
}
5454

55-
public static Optional<CouchbaseResourceHolder> checkForTransactionInThreadLocalStorage(ContextView ctx) {
56-
return Optional.ofNullable(ctx.hasKey(TransactionMarker.class) ? new CouchbaseResourceHolder(ctx.get(TransactionMarker.class).context()) : null);
57-
}
58-
59-
//public static Optional<CouchbaseResourceHolder> blockingCheckForTransactionInThreadLocalStorage() {
60-
// return TransactionMarkerOwner.marker;
61-
// }
62-
6355
public static Mono<Void> verifyNotInTransaction(String methodName) {
6456
return checkForTransactionInThreadLocalStorage().flatMap(s -> {
6557
if (s.isPresent()) {
66-
return Mono.error(new IllegalArgumentException(methodName + "can not be used inside a transaction"));
58+
return Mono.error(new IllegalArgumentException(methodName + " can not be used inside a transaction"));
6759
} else {
6860
return Mono.empty();
6961
}

spring-data-couchbase/src/main/java/org/springframework/data/couchbase/core/mapping/CouchbaseList.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ private void verifyValueType(final Object value) {
202202
}
203203

204204
throw new IllegalArgumentException(
205-
"Attribute of type " + clazz.getCanonicalName() + "can not be stored and must be converted.");
205+
"Attribute of type " + clazz.getCanonicalName() + " can not be stored and must be converted.");
206206
}
207207

208208
/**

spring-data-couchbase/src/main/java/org/springframework/data/couchbase/transaction/CouchbaseCallbackTransactionManager.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import com.couchbase.client.java.transactions.config.TransactionOptions;
4545
import com.couchbase.client.java.transactions.error.TransactionCommitAmbiguousException;
4646
import com.couchbase.client.java.transactions.error.TransactionFailedException;
47-
import reactor.util.context.ContextView;
4847

4948
/**
5049
* The Couchbase transaction manager, providing support for @Transactional methods.
@@ -74,7 +73,9 @@ public CouchbaseCallbackTransactionManager(CouchbaseClientFactory couchbaseClien
7473

7574
@Override
7675
public <T> T execute(TransactionDefinition definition, TransactionCallback<T> callback) throws TransactionException {
77-
boolean createNewTransaction = handlePropagation(definition, null);
76+
boolean isInExistingTransaction = TransactionalSupport.checkForTransactionInThreadLocalStorage().block()
77+
.isPresent();
78+
boolean createNewTransaction = handlePropagation(definition, isInExistingTransaction);
7879

7980
setOptionsFromDefinition(definition);
8081

@@ -88,8 +89,9 @@ public <T> T execute(TransactionDefinition definition, TransactionCallback<T> ca
8889
@Stability.Internal
8990
<T> Flux<T> executeReactive(TransactionDefinition definition,
9091
org.springframework.transaction.reactive.TransactionCallback<T> callback) {
91-
return Flux.deferContextual((ctx) -> {
92-
boolean createNewTransaction = handlePropagation(definition, ctx);
92+
return TransactionalSupport.checkForTransactionInThreadLocalStorage().flatMapMany(isInTransaction -> {
93+
boolean isInExistingTransaction = isInTransaction.isPresent();
94+
boolean createNewTransaction = handlePropagation(definition, isInExistingTransaction);
9395

9496
setOptionsFromDefinition(definition);
9597

@@ -188,9 +190,7 @@ public boolean isCompleted() {
188190
}
189191

190192
// Propagation defines what happens when a @Transactional method is called from another @Transactional method.
191-
private boolean handlePropagation(TransactionDefinition definition, ContextView ctx) {
192-
boolean isExistingTransaction = ctx != null ? TransactionalSupport.checkForTransactionInThreadLocalStorage(ctx).isPresent() :
193-
TransactionalSupport.checkForTransactionInThreadLocalStorage().block().isPresent();
193+
private Boolean handlePropagation(TransactionDefinition definition, boolean isExistingTransaction) {
194194

195195
LOGGER.trace("Deciding propagation behaviour from {} and {}", definition.getPropagationBehavior(),
196196
isExistingTransaction);

spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionReactiveIntegrationTests.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.couchbase.client.java.query.QueryScanConsistency.REQUEST_PLUS;
2020

2121
import lombok.Data;
22+
import org.springframework.data.couchbase.domain.PersonWithoutVersion;
2223
import reactor.core.publisher.Mono;
2324
import reactor.test.StepVerifier;
2425

@@ -49,9 +50,9 @@
4950

5051
/**
5152
* todo gp: these tests are using the `.as(transactionalOperator::transactional)` method which is for the chopping
52-
* block, so presumably these tests are too
53-
* todo mr: I'm not sure how as(transactionalOperator::transactional) is different than
54-
* todo mr: transactionOperator.transaction(...)in CouchbaseTransactionalOperatorTemplateIntegrationTests ?
53+
* block, so presumably these tests are too todo mr: I'm not sure how as(transactionalOperator::transactional) is
54+
* different than todo mr: transactionOperator.transaction(...)in CouchbaseTransactionalOperatorTemplateIntegrationTests
55+
* ?
5556
*
5657
* @author Michael Reiche
5758
*/
@@ -72,6 +73,7 @@ public class CouchbasePersonTransactionReactiveIntegrationTests extends JavaInte
7273
String sName = "_default";
7374
String cName = "_default";
7475
Person WalterWhite;
76+
PersonWithoutVersion BobbyBlackWithoutVersion;
7577

7678
@BeforeAll
7779
public static void beforeAll() {
@@ -86,6 +88,7 @@ public static void afterAll() {
8688
@BeforeEach
8789
public void beforeEachTest() {
8890
WalterWhite = new Person("Walter", "White");
91+
BobbyBlackWithoutVersion = new PersonWithoutVersion("Bobby", "Black");
8992
TransactionTestUtil.assertNotInTransaction();
9093
List<RemoveResult> pr = operations.removeByQuery(Person.class).withConsistency(REQUEST_PLUS).all().collectList()
9194
.block();
@@ -140,6 +143,12 @@ public void commitShouldPersistTxEntriesOfTxAnnotatedMethod() {
140143

141144
}
142145

146+
@Test
147+
public void commitShouldPersistTxEntriesOfTxAnnotatedMethodNoVersion() {
148+
personService.declarativeSavePersonWithoutVersion(BobbyBlackWithoutVersion).as(StepVerifier::create) //
149+
.expectError(UnsupportedOperationException.class); //
150+
}
151+
143152
@Test
144153
public void commitShouldPersistTxEntriesAcrossCollections() {
145154

spring-data-couchbase/src/test/java/org/springframework/data/couchbase/transactions/PersonServiceReactive.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.junit.jupiter.api.Assertions.assertTrue;
2121

2222
import org.springframework.data.couchbase.core.TransactionalSupport;
23+
import org.springframework.data.couchbase.domain.PersonWithoutVersion;
2324
import org.springframework.stereotype.Service;
2425
import reactor.core.publisher.Flux;
2526
import reactor.core.publisher.Mono;
@@ -109,6 +110,11 @@ public Mono<Person> declarativeSavePerson(Person person) {
109110
return personOperationsRx.save(person);
110111
}
111112

113+
@Transactional
114+
public Mono<PersonWithoutVersion> declarativeSavePersonWithoutVersion(PersonWithoutVersion person) {
115+
return personOperationsRx.save(person);
116+
}
117+
112118
@Transactional
113119
public Mono<Person> declarativeSavePersonErrors(Person person) {
114120
return personOperationsRx.insertById(Person.class).one(person)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2022 the original author or authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.data.couchbase.transactions.sdk;
18+
19+
import static org.junit.jupiter.api.Assertions.assertTrue;
20+
import static org.junit.jupiter.api.Assertions.fail;
21+
import static org.springframework.data.couchbase.transactions.util.TransactionTestUtil.assertNotInTransaction;
22+
23+
import reactor.core.publisher.Mono;
24+
import reactor.core.scheduler.Schedulers;
25+
26+
import org.junit.jupiter.api.AfterEach;
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.DisplayName;
29+
import org.junit.jupiter.api.Test;
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.data.couchbase.CouchbaseClientFactory;
32+
import org.springframework.data.couchbase.core.CouchbaseTemplate;
33+
import org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate;
34+
import org.springframework.data.couchbase.domain.Person;
35+
import org.springframework.data.couchbase.transactions.TransactionsConfig;
36+
import org.springframework.data.couchbase.util.Capabilities;
37+
import org.springframework.data.couchbase.util.ClusterType;
38+
import org.springframework.data.couchbase.util.IgnoreWhen;
39+
import org.springframework.data.couchbase.util.JavaIntegrationTests;
40+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
41+
42+
/**
43+
* Added for issue 1527: Tests running regular SDK transactions (blocking and reactive) on a reactor non-blocking
44+
* thread.
45+
*
46+
* @author Graham Pople
47+
*/
48+
@IgnoreWhen(missesCapabilities = Capabilities.QUERY, clusterTypes = ClusterType.MOCKED)
49+
@SpringJUnitConfig(TransactionsConfig.class)
50+
public class SDKTransactionsNonBlockingThreadIntegrationTests extends JavaIntegrationTests {
51+
@Autowired private CouchbaseClientFactory couchbaseClientFactory;
52+
@Autowired private CouchbaseTemplate ops;
53+
@Autowired private ReactiveCouchbaseTemplate reactiveOps;
54+
55+
@BeforeEach
56+
public void beforeEachTest() {
57+
assertNotInTransaction();
58+
}
59+
60+
@AfterEach
61+
public void afterEachTest() {
62+
assertNotInTransaction();
63+
}
64+
65+
@DisplayName("Trying to run a blocking transaction (or anything blocking) on a non-blocking thread, will not work")
66+
@Test
67+
public void blockingTransactionOnNonBlockingThread() {
68+
try {
69+
Mono.just(1).publishOn(Schedulers.parallel()).flatMap(ignore -> {
70+
assertTrue(Schedulers.isInNonBlockingThread());
71+
assertTrue(Thread.currentThread().getName().contains("parallel"));
72+
73+
couchbaseClientFactory.getCluster().transactions().run(ctx -> {
74+
ops.insertById(Person.class).one(new Person("Walter", "White"));
75+
});
76+
return Mono.empty();
77+
}).block();
78+
fail();
79+
} catch (IllegalStateException ignored) {}
80+
}
81+
82+
@DisplayName("Trying to run a reactive transaction on a non-blocking thread should work")
83+
@Test
84+
public void reactiveTransactionOnNonBlockingThread() {
85+
Mono.just(1).publishOn(Schedulers.parallel()).flatMap(ignore -> {
86+
return couchbaseClientFactory.getCluster().reactive().transactions().run(ctx -> {
87+
return reactiveOps.insertById(Person.class).one(new Person("Walter", "White"));
88+
});
89+
}).block();
90+
}
91+
}

0 commit comments

Comments
 (0)