Skip to content

Commit 4dcf6dc

Browse files
committed
Limit trigger handling concurrency
- Change flatMap to use concurrency 1 in a trigger handling which should fix issues when events are sent fast. - Fixes #942
1 parent 5919b06 commit 4dcf6dc

File tree

3 files changed

+172
-8
lines changed

3 files changed

+172
-8
lines changed

spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveStateMachineExecutor.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -115,7 +115,8 @@ public ReactiveStateMachineExecutor(StateMachine<S, E> stateMachine, StateMachin
115115
@Override
116116
protected void onInit() throws Exception {
117117
triggerSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
118-
triggerFlux = triggerSink.asFlux().flatMap(trigger -> handleTrigger(trigger));
118+
// limit concurrency so that we get one by one handling
119+
triggerFlux = triggerSink.asFlux().flatMap(trigger -> handleTrigger(trigger), 1);
119120
}
120121

121122
@Override
@@ -328,10 +329,11 @@ private Mono<Void> handleTrigger(TriggerQueueItem queueItem) {
328329
}
329330
});
330331
}
331-
}))
332-
.contextWrite(Context.of(
333-
StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS, new ExecutorExceptionHolder(),
334-
REACTOR_CONTEXT_TRIGGER_ERRORS, new ExecutorExceptionHolder()));
332+
})
333+
)
334+
.contextWrite(Context.of(
335+
StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS, new ExecutorExceptionHolder(),
336+
REACTOR_CONTEXT_TRIGGER_ERRORS, new ExecutorExceptionHolder()));
335337
}
336338

337339

spring-statemachine-core/src/test/java/org/springframework/statemachine/TestUtils.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2020 the original author or authors.
2+
* Copyright 2015-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,8 @@
2323
import java.lang.reflect.Method;
2424
import java.time.Duration;
2525

26+
import org.apache.commons.logging.Log;
27+
import org.apache.commons.logging.LogFactory;
2628
import org.springframework.beans.factory.BeanFactory;
2729
import org.springframework.messaging.Message;
2830
import org.springframework.messaging.support.MessageBuilder;
@@ -44,6 +46,8 @@
4446
*/
4547
public class TestUtils {
4648

49+
private static Log log = LogFactory.getLog(TestUtils.class);
50+
4751
@SuppressWarnings("unchecked")
4852
public static <S, E> StateMachine<S, E> resolveMachine(BeanFactory beanFactory) {
4953
assertThat(beanFactory.containsBean(DEFAULT_ID_STATEMACHINE)).isTrue();
@@ -99,7 +103,10 @@ public static <T> Flux<Message<T>> eventsAsFlux(T... events) {
99103

100104
public static <S, E> void doSendEventAndConsumeAll(StateMachine<S, E> stateMachine, E event) {
101105
StepVerifier.create(stateMachine.sendEvent(eventAsMono(event)))
102-
.thenConsumeWhile(eventResult -> true)
106+
.thenConsumeWhile(eventResult -> {
107+
log.debug("Consume eventResult " + eventResult);
108+
return true;
109+
})
103110
.expectComplete()
104111
.verify(Duration.ofSeconds(5));
105112
}
@@ -126,6 +133,30 @@ public static <S, E> void doSendEventAndConsumeResultAsDenied(StateMachine<S, E>
126133
.verifyComplete();
127134
}
128135

136+
@SafeVarargs
137+
public static <S, E> void doSendEventsAndConsumeAll(StateMachine<S, E> stateMachine, E... events) {
138+
StepVerifier.create(stateMachine.sendEvents(eventsAsFlux(events)))
139+
.thenConsumeWhile(eventResult -> {
140+
log.debug("Consume eventResult " + eventResult);
141+
return true;
142+
})
143+
.expectComplete()
144+
.verify(Duration.ofSeconds(5));
145+
}
146+
147+
@SafeVarargs
148+
public static <S, E> void doSendEventsAndConsumeAllWithComplete(StateMachine<S, E> stateMachine, E... events) {
149+
Flux<Void> completions = stateMachine.sendEvents(eventsAsFlux(events))
150+
.doOnNext(result -> {
151+
log.debug("Consume eventResult " + result);
152+
})
153+
.flatMap(result -> result.complete());
154+
StepVerifier.create(completions)
155+
.thenConsumeWhile(complete -> true)
156+
.expectComplete()
157+
.verify(Duration.ofSeconds(10));
158+
}
159+
129160
@SuppressWarnings("unchecked")
130161
public static <T> T readField(String name, Object target) throws Exception {
131162
Field field = null;
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.statemachine.action;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.springframework.statemachine.TestUtils.doSendEventsAndConsumeAllWithComplete;
20+
import static org.springframework.statemachine.TestUtils.doStartAndAssert;
21+
import static org.springframework.statemachine.TestUtils.resolveMachine;
22+
23+
import java.time.Duration;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicLong;
27+
28+
import org.junit.jupiter.api.Test;
29+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.statemachine.AbstractStateMachineTests;
33+
import org.springframework.statemachine.StateContext;
34+
import org.springframework.statemachine.StateMachine;
35+
import org.springframework.statemachine.config.EnableStateMachine;
36+
import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter;
37+
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
38+
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;
39+
40+
import reactor.core.publisher.Mono;
41+
42+
/**
43+
* Tests for state machine reactive actions.
44+
*
45+
* @author Janne Valkealahti
46+
*
47+
*/
48+
public class ReactiveAction2Tests extends AbstractStateMachineTests {
49+
50+
@Test
51+
public void testSimpleReactiveActions() throws Exception {
52+
context.register(Config1.class);
53+
context.refresh();
54+
StateMachine<TestStates, TestEvents> machine = resolveMachine(context);
55+
doStartAndAssert(machine);
56+
57+
TestCountAction testAction3 = context.getBean("testAction3", TestCountAction.class);
58+
TestCountAction testAction4 = context.getBean("testAction4", TestCountAction.class);
59+
doSendEventsAndConsumeAllWithComplete(machine, TestEvents.E1, TestEvents.E2);
60+
assertThat(testAction3.latch.await(6, TimeUnit.SECONDS)).isTrue();
61+
assertThat(testAction4.latch.await(6, TimeUnit.SECONDS)).isTrue();
62+
assertThat(testAction4.time.get() - testAction3.time.get()).isGreaterThan(1000);
63+
}
64+
65+
@Configuration
66+
@EnableStateMachine
67+
static class Config1 extends EnumStateMachineConfigurerAdapter<TestStates, TestEvents> {
68+
69+
@Override
70+
public void configure(StateMachineStateConfigurer<TestStates, TestEvents> states) throws Exception {
71+
states
72+
.withStates()
73+
.initial(TestStates.S1)
74+
.stateEntryFunction(TestStates.S2, testAction3())
75+
.stateEntryFunction(TestStates.S3, testAction4());
76+
}
77+
78+
@Override
79+
public void configure(StateMachineTransitionConfigurer<TestStates, TestEvents> transitions) throws Exception {
80+
transitions
81+
.withExternal()
82+
.source(TestStates.S1)
83+
.target(TestStates.S2)
84+
.event(TestEvents.E1)
85+
.and()
86+
.withExternal()
87+
.source(TestStates.S2)
88+
.target(TestStates.S3)
89+
.event(TestEvents.E2);
90+
}
91+
92+
@Bean
93+
public TestCountAction testAction3() {
94+
return new TestCountAction("ACTION3");
95+
}
96+
97+
@Bean
98+
public TestCountAction testAction4() {
99+
return new TestCountAction("ACTION4");
100+
}
101+
}
102+
103+
@Override
104+
protected AnnotationConfigApplicationContext buildContext() {
105+
return new AnnotationConfigApplicationContext();
106+
}
107+
108+
private static class TestCountAction implements ReactiveAction<TestStates, TestEvents> {
109+
110+
private final String id;
111+
int count = 0;
112+
CountDownLatch latch = new CountDownLatch(1);
113+
AtomicLong time = new AtomicLong();
114+
115+
TestCountAction(String id) {
116+
this.id = id;
117+
}
118+
119+
@Override
120+
public Mono<Void> apply(StateContext<TestStates, TestEvents> context) {
121+
return Mono.delay(Duration.ofMillis(2000))
122+
.doFinally(x -> {
123+
count++;
124+
time.set(System.currentTimeMillis());
125+
latch.countDown();
126+
})
127+
.then()
128+
.log(id);
129+
}
130+
}
131+
}

0 commit comments

Comments
 (0)