1
1
package io .javaoperatorsdk .operator .processing .event .source .polling ;
2
2
3
+ import java .time .Duration ;
3
4
import java .util .Collections ;
4
5
import java .util .Optional ;
5
6
import java .util .Set ;
16
17
import io .javaoperatorsdk .operator .sample .simple .TestCustomResource ;
17
18
18
19
import static org .assertj .core .api .Assertions .assertThat ;
20
+ import static org .awaitility .Awaitility .await ;
19
21
import static org .mockito .ArgumentMatchers .any ;
20
22
import static org .mockito .ArgumentMatchers .eq ;
21
23
import static org .mockito .Mockito .atLeast ;
@@ -47,45 +49,50 @@ public void setup() {
47
49
}
48
50
49
51
@ Test
50
- void pollsTheResourceAfterAwareOfIt () throws InterruptedException {
52
+ void pollsTheResourceAfterAwareOfIt () {
51
53
source .onResourceCreated (testCustomResource );
52
54
53
- Thread .sleep (3 * PERIOD );
54
- verify (supplier , atLeast (2 )).fetchResources (eq (testCustomResource ));
55
- verify (eventHandler , times (1 )).handleEvent (any ());
55
+ await ().pollDelay (Duration .ofMillis (3 * PERIOD )).untilAsserted (() -> {
56
+ verify (supplier , atLeast (2 )).fetchResources (eq (testCustomResource ));
57
+ verify (supplier , atLeast (2 )).fetchDelay (any (), eq (testCustomResource ));
58
+ verify (eventHandler , times (1 )).handleEvent (any ());
59
+ });
56
60
}
57
61
58
62
@ Test
59
- void registeringTaskOnAPredicate () throws InterruptedException {
63
+ void registeringTaskOnAPredicate () {
60
64
setUpSource (new PerResourcePollingEventSource <>(supplier , resourceCache , PERIOD ,
61
65
testCustomResource -> testCustomResource .getMetadata ().getGeneration () > 1 ,
62
66
SampleExternalResource .class , CacheKeyMapper .singleResourceCacheKeyMapper ()));
63
67
source .onResourceCreated (testCustomResource );
64
- Thread .sleep (2 * PERIOD );
65
68
66
- verify (supplier , times (0 )).fetchResources (eq (testCustomResource ));
69
+
70
+ await ().pollDelay (Duration .ofMillis (2 * PERIOD ))
71
+ .untilAsserted (() -> verify (supplier , times (0 )).fetchResources (eq (testCustomResource )));
72
+
67
73
testCustomResource .getMetadata ().setGeneration (2L );
68
74
source .onResourceUpdated (testCustomResource , testCustomResource );
69
75
70
- Thread .sleep (2 * PERIOD );
71
76
72
- verify (supplier , atLeast (1 )).fetchResources (eq (testCustomResource ));
77
+ await ().pollDelay (Duration .ofMillis (2 * PERIOD ))
78
+ .untilAsserted (() -> verify (supplier , atLeast (1 )).fetchResources (eq (testCustomResource )));
73
79
}
74
80
75
81
@ Test
76
- void propagateEventOnDeletedResource () throws InterruptedException {
82
+ void propagateEventOnDeletedResource () {
77
83
source .onResourceCreated (testCustomResource );
78
84
when (supplier .fetchResources (any ()))
79
85
.thenReturn (Set .of (SampleExternalResource .testResource1 ()))
80
86
.thenReturn (Collections .emptySet ());
81
87
82
- Thread .sleep (3 * PERIOD );
83
- verify (supplier , atLeast (2 )).fetchResources (eq (testCustomResource ));
84
- verify (eventHandler , times (2 )).handleEvent (any ());
88
+ await ().pollDelay (Duration .ofMillis (3 * PERIOD )).untilAsserted (() -> {
89
+ verify (supplier , atLeast (2 )).fetchResources (eq (testCustomResource ));
90
+ verify (eventHandler , times (2 )).handleEvent (any ());
91
+ });
85
92
}
86
93
87
94
@ Test
88
- void getSecondaryResourceInitiatesFetchJustForFirstTime () throws InterruptedException {
95
+ void getSecondaryResourceInitiatesFetchJustForFirstTime () {
89
96
source .onResourceCreated (testCustomResource );
90
97
when (supplier .fetchResources (any ()))
91
98
.thenReturn (Set .of (SampleExternalResource .testResource1 ()))
@@ -104,31 +111,31 @@ void getSecondaryResourceInitiatesFetchJustForFirstTime() throws InterruptedExce
104
111
verify (supplier , times (1 )).fetchResources (eq (testCustomResource ));
105
112
verify (eventHandler , never ()).handleEvent (any ());
106
113
107
- Thread . sleep ( PERIOD * 2 );
108
-
109
- verify ( supplier , atLeast ( 2 )). fetchResources ( eq ( testCustomResource ) );
110
- value = source . getSecondaryResources ( testCustomResource );
111
- assertThat ( value ). hasSize ( 2 );
114
+ await (). pollDelay ( Duration . ofMillis ( PERIOD * 2 )). untilAsserted (() -> {
115
+ verify ( supplier , atLeast ( 2 )). fetchResources ( eq ( testCustomResource ));
116
+ var val = source . getSecondaryResources ( testCustomResource );
117
+ assertThat ( val ). hasSize ( 2 );
118
+ } );
112
119
}
113
120
114
121
@ Test
115
- void getsValueFromCacheOrSupplier () throws InterruptedException {
122
+ void getsValueFromCacheOrSupplier () {
116
123
source .onResourceCreated (testCustomResource );
117
124
when (supplier .fetchResources (any ()))
118
125
.thenReturn (Collections .emptySet ())
119
126
.thenReturn (Set .of (SampleExternalResource .testResource1 ()));
120
127
121
- Thread . sleep ( PERIOD / 3 );
122
-
123
- var value = source . getSecondaryResources ( testCustomResource );
124
- verify ( eventHandler , times ( 0 )). handleEvent ( any () );
125
- assertThat ( value ). isEmpty ( );
126
-
127
- Thread . sleep ( PERIOD * 2 );
128
-
129
- value = source . getSecondaryResources ( testCustomResource );
130
- assertThat ( value ). hasSize ( 1 );
131
- verify ( eventHandler , times ( 1 )). handleEvent ( any () );
128
+ await (). pollDelay ( Duration . ofMillis ( PERIOD / 3 )). untilAsserted (() -> {
129
+ var value = source . getSecondaryResources ( testCustomResource );
130
+ verify ( eventHandler , times ( 0 )). handleEvent ( any () );
131
+ assertThat ( value ). isEmpty ( );
132
+ } );
133
+
134
+ await (). pollDelay ( Duration . ofMillis ( PERIOD * 2 )). untilAsserted (() -> {
135
+ var value2 = source . getSecondaryResources ( testCustomResource );
136
+ assertThat ( value2 ). hasSize ( 1 );
137
+ verify ( eventHandler , times ( 1 )). handleEvent ( any () );
138
+ } );
132
139
}
133
140
134
141
@ Test
0 commit comments