Skip to content

Commit 4c1b6b0

Browse files
committed
Add async functions
1 parent 091a01f commit 4c1b6b0

File tree

8 files changed

+1062
-13
lines changed

8 files changed

+1062
-13
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.async;
18+
19+
/**
20+
* See tests for usage (AsyncFunctionsTest).
21+
* <p>
22+
* This class is not part of the public API and may be removed or changed at any time
23+
*/
24+
@FunctionalInterface
25+
public interface AsyncConsumer<T> extends AsyncFunction<T, Void> {
26+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.async;
18+
19+
import com.mongodb.lang.Nullable;
20+
21+
/**
22+
* See tests for usage (AsyncFunctionsTest).
23+
* <p>
24+
* This class is not part of the public API and may be removed or changed at any time
25+
*/
26+
@FunctionalInterface
27+
public interface AsyncFunction<T, R> {
28+
/**
29+
* This should not be called externally, but should be implemented as a
30+
* lambda. To "finish" an async chain, use one of the "finish" methods.
31+
*/
32+
void unsafeFinish(@Nullable T value, SingleResultCallback<R> callback);
33+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.async;
18+
19+
import com.mongodb.internal.async.function.RetryState;
20+
import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier;
21+
22+
import java.util.function.Predicate;
23+
import java.util.function.Supplier;
24+
25+
/**
26+
* See tests for usage (AsyncFunctionsTest).
27+
* <p>
28+
* This class is not part of the public API and may be removed or changed at any time
29+
*/
30+
@FunctionalInterface
31+
public interface AsyncRunnable extends AsyncSupplier<Void>, AsyncConsumer<Void> {
32+
33+
static AsyncRunnable beginAsync() {
34+
return (c) -> c.onResult(null, null);
35+
}
36+
37+
/**
38+
* Must be invoked at end of async chain
39+
* @param runnable the sync code to invoke (under non-exceptional flow)
40+
* prior to the callback
41+
* @param callback the callback provided by the method the chain is used in
42+
*/
43+
default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
44+
this.finish((r, e) -> {
45+
if (e != null) {
46+
callback.onResult(null, e);
47+
return;
48+
}
49+
try {
50+
runnable.run();
51+
} catch (Throwable t) {
52+
callback.onResult(null, t);
53+
return;
54+
}
55+
callback.onResult(null, null);
56+
});
57+
}
58+
59+
/**
60+
* See {@link #thenRunAndFinish(Runnable, SingleResultCallback)}, but the runnable
61+
* will always be executed, including on the exceptional path.
62+
* @param runnable the runnable
63+
* @param callback the callback
64+
*/
65+
default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
66+
this.finish((r, e) -> {
67+
try {
68+
runnable.run();
69+
} catch (Throwable t) {
70+
if (e != null) {
71+
t.addSuppressed(e);
72+
}
73+
callback.onResult(null, t);
74+
return;
75+
}
76+
callback.onResult(null, e);
77+
});
78+
}
79+
80+
/**
81+
* @param runnable The async runnable to run after this runnable
82+
* @return the composition of this runnable and the runnable, a runnable
83+
*/
84+
default AsyncRunnable thenRun(final AsyncRunnable runnable) {
85+
return (c) -> {
86+
this.unsafeFinish((r, e) -> {
87+
if (e == null) {
88+
runnable.unsafeFinish(c);
89+
} else {
90+
c.onResult(null, e);
91+
}
92+
});
93+
};
94+
}
95+
96+
/**
97+
* @param condition the condition to check
98+
* @param runnable The async runnable to run after this runnable,
99+
* if and only if the condition is met
100+
* @return the composition of this runnable and the runnable, a runnable
101+
*/
102+
default AsyncRunnable thenRunIf(final Supplier<Boolean> condition, final AsyncRunnable runnable) {
103+
return (callback) -> {
104+
this.unsafeFinish((r, e) -> {
105+
if (e != null) {
106+
callback.onResult(null, e);
107+
return;
108+
}
109+
boolean matched;
110+
try {
111+
matched = condition.get();
112+
} catch (Throwable t) {
113+
callback.onResult(null, t);
114+
return;
115+
}
116+
if (matched) {
117+
runnable.unsafeFinish(callback);
118+
} else {
119+
callback.onResult(null, null);
120+
}
121+
});
122+
};
123+
}
124+
125+
/**
126+
* @param supplier The supplier to supply using after this runnable
127+
* @return the composition of this runnable and the supplier, a supplier
128+
* @param <R> The return type of the resulting supplier
129+
*/
130+
default <R> AsyncSupplier<R> thenSupply(final AsyncSupplier<R> supplier) {
131+
return (c) -> {
132+
this.unsafeFinish((r, e) -> {
133+
if (e == null) {
134+
supplier.unsafeFinish(c);
135+
} else {
136+
c.onResult(null, e);
137+
}
138+
});
139+
};
140+
}
141+
142+
/**
143+
* @param runnable the runnable to loop
144+
* @param shouldRetry condition under which to retry
145+
* @return the composition of this, and the looping branch
146+
* @see RetryingAsyncCallbackSupplier
147+
*/
148+
default AsyncRunnable thenRunRetryingWhile(
149+
final AsyncRunnable runnable, final Predicate<Throwable> shouldRetry) {
150+
return thenRun(callback -> {
151+
new RetryingAsyncCallbackSupplier<Void>(
152+
new RetryState(),
153+
(rs, lastAttemptFailure) -> shouldRetry.test(lastAttemptFailure),
154+
cb -> runnable.finish(cb) // finish is required here, to handle exceptions
155+
).get(callback);
156+
});
157+
}
158+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.async;
18+
19+
import com.mongodb.lang.Nullable;
20+
21+
import java.util.function.Predicate;
22+
23+
24+
/**
25+
* See tests for usage (AsyncFunctionsTest).
26+
* <p>
27+
* This class is not part of the public API and may be removed or changed at any time
28+
*/
29+
@FunctionalInterface
30+
public interface AsyncSupplier<T> extends AsyncFunction<Void, T> {
31+
/**
32+
* This should not be called externally to this API. It should be
33+
* implemented as a lambda. To "finish" an async chain, use one of
34+
* the "finish" methods.
35+
*
36+
* @see #finish(SingleResultCallback)
37+
*/
38+
void unsafeFinish(SingleResultCallback<T> callback);
39+
40+
/**
41+
* This method must only be used when this AsyncSupplier corresponds
42+
* to a {@link java.util.function.Supplier} (and is therefore being
43+
* used within an async chain method lambda).
44+
* @param callback the callback
45+
*/
46+
default void getAsync(final SingleResultCallback<T> callback) {
47+
unsafeFinish(callback);
48+
}
49+
50+
@Override
51+
default void unsafeFinish(@Nullable final Void value, final SingleResultCallback<T> callback) {
52+
unsafeFinish(callback);
53+
}
54+
55+
/**
56+
* Must be invoked at end of async chain.
57+
* @param callback the callback provided by the method the chain is used in
58+
*/
59+
default void finish(final SingleResultCallback<T> callback) {
60+
final boolean[] callbackInvoked = {false};
61+
try {
62+
this.unsafeFinish((v, e) -> {
63+
callbackInvoked[0] = true;
64+
callback.onResult(v, e);
65+
});
66+
} catch (Throwable t) {
67+
if (callbackInvoked[0]) {
68+
throw t;
69+
} else {
70+
callback.onResult(null, t);
71+
}
72+
}
73+
}
74+
75+
/**
76+
* @param function The async function to run after this supplier
77+
* @return the composition of this supplier and the function, a supplier
78+
* @param <R> The return type of the resulting supplier
79+
*/
80+
default <R> AsyncSupplier<R> thenApply(final AsyncFunction<T, R> function) {
81+
return (c) -> {
82+
this.unsafeFinish((v, e) -> {
83+
if (e == null) {
84+
function.unsafeFinish(v, c);
85+
} else {
86+
c.onResult(null, e);
87+
}
88+
});
89+
};
90+
}
91+
92+
93+
/**
94+
* @param consumer The async consumer to run after this supplier
95+
* @return the composition of this supplier and the consumer, a runnable
96+
*/
97+
default AsyncRunnable thenConsume(final AsyncConsumer<T> consumer) {
98+
return (c) -> {
99+
this.unsafeFinish((v, e) -> {
100+
if (e == null) {
101+
consumer.unsafeFinish(v, c);
102+
} else {
103+
c.onResult(null, e);
104+
}
105+
});
106+
};
107+
}
108+
109+
/**
110+
* @param errorCheck A check, comparable to a catch-if/otherwise-rethrow
111+
* @param supplier The branch to execute if the error matches
112+
* @return The composition of this, and the conditional branch
113+
*/
114+
default AsyncSupplier<T> onErrorIf(
115+
final Predicate<Throwable> errorCheck,
116+
final AsyncSupplier<T> supplier) {
117+
return (callback) -> this.unsafeFinish((r, e) -> {
118+
if (e == null) {
119+
callback.onResult(r, null);
120+
return;
121+
}
122+
boolean errorMatched;
123+
try {
124+
errorMatched = errorCheck.test(e);
125+
} catch (Throwable t) {
126+
t.addSuppressed(e);
127+
callback.onResult(null, t);
128+
return;
129+
}
130+
if (errorMatched) {
131+
supplier.unsafeFinish(callback);
132+
} else {
133+
callback.onResult(null, e);
134+
}
135+
});
136+
}
137+
138+
}

driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackRunnable.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,4 @@ public interface AsyncCallbackRunnable {
3232
*/
3333
void run(SingleResultCallback<Void> callback);
3434

35-
/**
36-
* Converts this {@link AsyncCallbackSupplier} to {@link AsyncCallbackSupplier}{@code <Void>}.
37-
*/
38-
default AsyncCallbackSupplier<Void> asSupplier() {
39-
return this::run;
40-
}
41-
42-
/**
43-
* @see AsyncCallbackSupplier#whenComplete(Runnable)
44-
*/
45-
default AsyncCallbackRunnable whenComplete(final Runnable after) {
46-
return callback -> asSupplier().whenComplete(after).get(callback);
47-
}
4835
}

driver-core/src/main/com/mongodb/internal/async/function/RetryingAsyncCallbackSupplier.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ public RetryingAsyncCallbackSupplier(
8484
this.asyncFunction = asyncFunction;
8585
}
8686

87+
public RetryingAsyncCallbackSupplier(
88+
final RetryState state,
89+
final BiPredicate<RetryState, Throwable> retryPredicate,
90+
final AsyncCallbackSupplier<R> asyncFunction) {
91+
this(state, (previouslyChosenFailure, lastAttemptFailure) -> lastAttemptFailure, retryPredicate, asyncFunction);
92+
}
93+
8794
@Override
8895
public void get(final SingleResultCallback<R> callback) {
8996
/* `asyncFunction` and `callback` are the only externally provided pieces of code for which we do not need to care about

0 commit comments

Comments
 (0)