Skip to content

Commit 2458ac7

Browse files
committed
Support @transactional options timeout and isolation level
1 parent 103e993 commit 2458ac7

File tree

2 files changed

+171
-0
lines changed

2 files changed

+171
-0
lines changed

src/main/java/org/springframework/data/couchbase/transaction/CouchbaseSimpleCallbackTransactionManager.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import reactor.core.publisher.Mono;
4343

4444
import java.lang.reflect.Field;
45+
import java.time.Duration;
4546
import java.util.concurrent.atomic.AtomicReference;
4647

4748
public class CouchbaseSimpleCallbackTransactionManager implements CallbackPreferringPlatformTransactionManager {
@@ -60,6 +61,8 @@ public CouchbaseSimpleCallbackTransactionManager(ReactiveCouchbaseClientFactory
6061
public <T> T execute(TransactionDefinition definition, TransactionCallback<T> callback) throws TransactionException {
6162
final AtomicReference<T> execResult = new AtomicReference<>();
6263

64+
setOptionsFromDefinition(definition);
65+
6366
TransactionResult result = couchbaseClientFactory.getCluster().block().transactions().run(ctx -> {
6467
CouchbaseTransactionStatus status = new CouchbaseTransactionStatus(null, true, false, false, true, null, null);
6568

@@ -78,6 +81,27 @@ public <T> T execute(TransactionDefinition definition, TransactionCallback<T> ca
7881
return execResult.get();
7982
}
8083

84+
/**
85+
* @param definition reflects the @Transactional options
86+
*/
87+
private void setOptionsFromDefinition(TransactionDefinition definition) {
88+
if (definition != null) {
89+
if (definition.getTimeout() != TransactionDefinition.TIMEOUT_DEFAULT) {
90+
options = options.timeout(Duration.ofSeconds(definition.getTimeout()));
91+
}
92+
93+
if (!(definition.getIsolationLevel() == TransactionDefinition.ISOLATION_DEFAULT
94+
|| definition.getIsolationLevel() == TransactionDefinition.ISOLATION_READ_COMMITTED)) {
95+
throw new IllegalArgumentException("Couchbase Transactions run at Read Committed isolation - other isolation levels are not supported");
96+
}
97+
98+
// readonly is ignored as it is documented as being a hint that won't necessarily cause writes to fail
99+
100+
// todo gpx what about propagation?
101+
}
102+
103+
}
104+
81105
// Setting ThreadLocal storage
82106
private void populateTransactionSynchronizationManager(TransactionAttemptContext ctx) {
83107
TransactionSynchronizationManager.setActualTransactionActive(true);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Copyright 2012-2022 the original author or authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.data.couchbase.transactions;
18+
19+
import com.couchbase.client.java.transactions.error.TransactionFailedException;
20+
import org.junit.jupiter.api.BeforeAll;
21+
import org.junit.jupiter.api.BeforeEach;
22+
import org.junit.jupiter.api.DisplayName;
23+
import org.junit.jupiter.api.Test;
24+
import org.springframework.beans.factory.annotation.Autowired;
25+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
26+
import org.springframework.context.support.GenericApplicationContext;
27+
import org.springframework.data.couchbase.CouchbaseClientFactory;
28+
import org.springframework.data.couchbase.config.BeanNames;
29+
import org.springframework.data.couchbase.core.CouchbaseOperations;
30+
import org.springframework.data.couchbase.core.CouchbaseTemplate;
31+
import org.springframework.data.couchbase.domain.Person;
32+
import org.springframework.data.couchbase.util.ClusterType;
33+
import org.springframework.data.couchbase.util.IgnoreWhen;
34+
import org.springframework.data.couchbase.util.JavaIntegrationTests;
35+
import org.springframework.stereotype.Component;
36+
import org.springframework.stereotype.Service;
37+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
38+
import org.springframework.transaction.annotation.EnableTransactionManagement;
39+
import org.springframework.transaction.annotation.Isolation;
40+
import org.springframework.transaction.annotation.Transactional;
41+
42+
import java.time.Duration;
43+
import java.util.concurrent.atomic.AtomicInteger;
44+
import java.util.function.Function;
45+
46+
import static org.junit.jupiter.api.Assertions.assertTrue;
47+
import static org.junit.jupiter.api.Assertions.fail;
48+
49+
/**
50+
* Tests for @Transactional methods, setting all the various options allowed by @Transactional.
51+
*/
52+
@IgnoreWhen(clusterTypes = ClusterType.MOCKED)
53+
@SpringJUnitConfig(Config.class)
54+
public class CouchbaseTransactionalOptionsIntegrationTests extends JavaIntegrationTests {
55+
56+
@Autowired CouchbaseClientFactory couchbaseClientFactory;
57+
PersonService personService;
58+
@Autowired
59+
CouchbaseTemplate operations;
60+
static GenericApplicationContext context;
61+
62+
@BeforeAll
63+
public static void beforeAll() {
64+
callSuperBeforeAll(new Object() {});
65+
context = new AnnotationConfigApplicationContext(Config.class, PersonService.class);
66+
}
67+
68+
@BeforeEach
69+
public void beforeEachTest() {
70+
personService = context.getBean(PersonService.class);
71+
72+
Person walterWhite = new Person(1, "Walter", "White");
73+
try {
74+
couchbaseClientFactory.getBucket().defaultCollection().remove(walterWhite.getId().toString());
75+
} catch (Exception ex) {
76+
// System.err.println(ex);
77+
}
78+
}
79+
80+
@DisplayName("@Transactional(timeout = 2) will timeout at around 2 seconds")
81+
@Test
82+
public void timeout() {
83+
long start = System.nanoTime();
84+
Person person = new Person(1, "Walter", "White");
85+
operations.insertById(Person.class).one(person);
86+
try {
87+
personService.timeout(person.getId().toString());
88+
fail();
89+
}
90+
catch (TransactionFailedException err) {
91+
}
92+
Duration timeTaken = Duration.ofNanos(System.nanoTime() - start);
93+
assertTrue(timeTaken.toMillis() >= 2000);
94+
assertTrue(timeTaken.toMillis() < 10_000); // Default transaction timeout is 15s
95+
}
96+
97+
@DisplayName("@Transactional(isolation = Isolation.ANYTHING_BUT_READ_COMMITTED) will fail")
98+
@Test
99+
public void unsupportedIsolation() {
100+
try {
101+
personService.unsupportedIsolation();
102+
fail();
103+
}
104+
catch (IllegalArgumentException err) {
105+
}
106+
}
107+
108+
@DisplayName("@Transactional(isolation = Isolation.READ_COMMITTED) will succeed")
109+
@Test
110+
public void supportedIsolation() {
111+
personService.supportedIsolation();
112+
}
113+
114+
@Service
115+
@Component
116+
@EnableTransactionManagement
117+
static
118+
class PersonService {
119+
final CouchbaseOperations ops;
120+
121+
public PersonService(CouchbaseOperations ops) {
122+
this.ops = ops;
123+
}
124+
125+
@Transactional(transactionManager = BeanNames.COUCHBASE_SIMPLE_CALLBACK_TRANSACTION_MANAGER)
126+
public <T> T doInTransaction(AtomicInteger tryCount, Function<CouchbaseOperations, T> callback) {
127+
tryCount.incrementAndGet();
128+
return callback.apply(ops);
129+
}
130+
131+
@Transactional(transactionManager = BeanNames.COUCHBASE_SIMPLE_CALLBACK_TRANSACTION_MANAGER, timeout = 2)
132+
public void timeout(String id) {
133+
while (true) {
134+
Person p = ops.findById(Person.class).one(id);
135+
ops.replaceById(Person.class).one(p);
136+
}
137+
}
138+
139+
@Transactional(transactionManager = BeanNames.COUCHBASE_SIMPLE_CALLBACK_TRANSACTION_MANAGER, isolation = Isolation.REPEATABLE_READ)
140+
public void unsupportedIsolation() {
141+
}
142+
143+
@Transactional(transactionManager = BeanNames.COUCHBASE_SIMPLE_CALLBACK_TRANSACTION_MANAGER, isolation = Isolation.READ_COMMITTED)
144+
public void supportedIsolation() {
145+
}
146+
}
147+
}

0 commit comments

Comments
 (0)