Skip to content

Commit 1c76944

Browse files
cschockaertRobWin
authored andcommitted
fix(resilience4j-reactor): release semaphore on cancel (ReactiveX#333)
1 parent a11ef35 commit 1c76944

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ public void hookOnNext(T t) {
4545
}
4646
}
4747

48+
@Override
49+
public void hookOnCancel() {
50+
releaseBulkhead();
51+
}
52+
4853
@Override
4954
public void hookOnError(Throwable t) {
5055
if (wasCallPermitted()) {

resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/MonoBulkheadTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,18 @@ public void shouldEmitBulkheadFullExceptionEvenWhenErrorNotOnSubscribe() {
9797

9898
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(0);
9999
}
100+
101+
@Test
102+
public void shouldReleaseBulkheadSemaphoreOnCancel() {
103+
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
104+
StepVerifier.create(
105+
Mono.just("Event")
106+
.transform(BulkheadOperator.of(bulkhead)))
107+
.expectSubscription()
108+
.expectNext("Event")
109+
.thenCancel()
110+
.verify();
111+
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
112+
}
113+
100114
}

0 commit comments

Comments
 (0)