From 4c1b6b0c9ac82a95a0fc7749d5829fa741daa360 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Tue, 3 Oct 2023 16:08:39 -0600 Subject: [PATCH 1/7] Add async functions --- .../mongodb/internal/async/AsyncConsumer.java | 26 + .../mongodb/internal/async/AsyncFunction.java | 33 + .../mongodb/internal/async/AsyncRunnable.java | 158 +++++ .../mongodb/internal/async/AsyncSupplier.java | 138 ++++ .../async/function/AsyncCallbackRunnable.java | 13 - .../RetryingAsyncCallbackSupplier.java | 7 + .../com/mongodb/client/TestListener.java | 43 ++ .../internal/async/AsyncFunctionsTest.java | 657 ++++++++++++++++++ 8 files changed, 1062 insertions(+), 13 deletions(-) create mode 100644 driver-core/src/main/com/mongodb/internal/async/AsyncConsumer.java create mode 100644 driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java create mode 100644 driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java create mode 100644 driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java create mode 100644 driver-core/src/test/functional/com/mongodb/client/TestListener.java create mode 100644 driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncConsumer.java b/driver-core/src/main/com/mongodb/internal/async/AsyncConsumer.java new file mode 100644 index 00000000000..b385670ae88 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncConsumer.java @@ -0,0 +1,26 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.async; + +/** + * See tests for usage (AsyncFunctionsTest). + *

+ * This class is not part of the public API and may be removed or changed at any time + */ +@FunctionalInterface +public interface AsyncConsumer extends AsyncFunction { +} diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java new file mode 100644 index 00000000000..8caf176dce6 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java @@ -0,0 +1,33 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.async; + +import com.mongodb.lang.Nullable; + +/** + * See tests for usage (AsyncFunctionsTest). + *

+ * This class is not part of the public API and may be removed or changed at any time + */ +@FunctionalInterface +public interface AsyncFunction { + /** + * This should not be called externally, but should be implemented as a + * lambda. To "finish" an async chain, use one of the "finish" methods. + */ + void unsafeFinish(@Nullable T value, SingleResultCallback callback); +} diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java new file mode 100644 index 00000000000..b9089252f49 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java @@ -0,0 +1,158 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.async; + +import com.mongodb.internal.async.function.RetryState; +import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier; + +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * See tests for usage (AsyncFunctionsTest). + *

+ * This class is not part of the public API and may be removed or changed at any time + */ +@FunctionalInterface +public interface AsyncRunnable extends AsyncSupplier, AsyncConsumer { + + static AsyncRunnable beginAsync() { + return (c) -> c.onResult(null, null); + } + + /** + * Must be invoked at end of async chain + * @param runnable the sync code to invoke (under non-exceptional flow) + * prior to the callback + * @param callback the callback provided by the method the chain is used in + */ + default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback callback) { + this.finish((r, e) -> { + if (e != null) { + callback.onResult(null, e); + return; + } + try { + runnable.run(); + } catch (Throwable t) { + callback.onResult(null, t); + return; + } + callback.onResult(null, null); + }); + } + + /** + * See {@link #thenRunAndFinish(Runnable, SingleResultCallback)}, but the runnable + * will always be executed, including on the exceptional path. + * @param runnable the runnable + * @param callback the callback + */ + default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback callback) { + this.finish((r, e) -> { + try { + runnable.run(); + } catch (Throwable t) { + if (e != null) { + t.addSuppressed(e); + } + callback.onResult(null, t); + return; + } + callback.onResult(null, e); + }); + } + + /** + * @param runnable The async runnable to run after this runnable + * @return the composition of this runnable and the runnable, a runnable + */ + default AsyncRunnable thenRun(final AsyncRunnable runnable) { + return (c) -> { + this.unsafeFinish((r, e) -> { + if (e == null) { + runnable.unsafeFinish(c); + } else { + c.onResult(null, e); + } + }); + }; + } + + /** + * @param condition the condition to check + * @param runnable The async runnable to run after this runnable, + * if and only if the condition is met + * @return the composition of this runnable and the runnable, a runnable + */ + default AsyncRunnable thenRunIf(final Supplier condition, final AsyncRunnable runnable) { + return (callback) -> { + this.unsafeFinish((r, e) -> { + if (e != null) { + callback.onResult(null, e); + return; + } + boolean matched; + try { + matched = condition.get(); + } catch (Throwable t) { + callback.onResult(null, t); + return; + } + if (matched) { + runnable.unsafeFinish(callback); + } else { + callback.onResult(null, null); + } + }); + }; + } + + /** + * @param supplier The supplier to supply using after this runnable + * @return the composition of this runnable and the supplier, a supplier + * @param The return type of the resulting supplier + */ + default AsyncSupplier thenSupply(final AsyncSupplier supplier) { + return (c) -> { + this.unsafeFinish((r, e) -> { + if (e == null) { + supplier.unsafeFinish(c); + } else { + c.onResult(null, e); + } + }); + }; + } + + /** + * @param runnable the runnable to loop + * @param shouldRetry condition under which to retry + * @return the composition of this, and the looping branch + * @see RetryingAsyncCallbackSupplier + */ + default AsyncRunnable thenRunRetryingWhile( + final AsyncRunnable runnable, final Predicate shouldRetry) { + return thenRun(callback -> { + new RetryingAsyncCallbackSupplier( + new RetryState(), + (rs, lastAttemptFailure) -> shouldRetry.test(lastAttemptFailure), + cb -> runnable.finish(cb) // finish is required here, to handle exceptions + ).get(callback); + }); + } +} diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java new file mode 100644 index 00000000000..7b38595bda9 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java @@ -0,0 +1,138 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.async; + +import com.mongodb.lang.Nullable; + +import java.util.function.Predicate; + + +/** + * See tests for usage (AsyncFunctionsTest). + *

+ * This class is not part of the public API and may be removed or changed at any time + */ +@FunctionalInterface +public interface AsyncSupplier extends AsyncFunction { + /** + * This should not be called externally to this API. It should be + * implemented as a lambda. To "finish" an async chain, use one of + * the "finish" methods. + * + * @see #finish(SingleResultCallback) + */ + void unsafeFinish(SingleResultCallback callback); + + /** + * This method must only be used when this AsyncSupplier corresponds + * to a {@link java.util.function.Supplier} (and is therefore being + * used within an async chain method lambda). + * @param callback the callback + */ + default void getAsync(final SingleResultCallback callback) { + unsafeFinish(callback); + } + + @Override + default void unsafeFinish(@Nullable final Void value, final SingleResultCallback callback) { + unsafeFinish(callback); + } + + /** + * Must be invoked at end of async chain. + * @param callback the callback provided by the method the chain is used in + */ + default void finish(final SingleResultCallback callback) { + final boolean[] callbackInvoked = {false}; + try { + this.unsafeFinish((v, e) -> { + callbackInvoked[0] = true; + callback.onResult(v, e); + }); + } catch (Throwable t) { + if (callbackInvoked[0]) { + throw t; + } else { + callback.onResult(null, t); + } + } + } + + /** + * @param function The async function to run after this supplier + * @return the composition of this supplier and the function, a supplier + * @param The return type of the resulting supplier + */ + default AsyncSupplier thenApply(final AsyncFunction function) { + return (c) -> { + this.unsafeFinish((v, e) -> { + if (e == null) { + function.unsafeFinish(v, c); + } else { + c.onResult(null, e); + } + }); + }; + } + + + /** + * @param consumer The async consumer to run after this supplier + * @return the composition of this supplier and the consumer, a runnable + */ + default AsyncRunnable thenConsume(final AsyncConsumer consumer) { + return (c) -> { + this.unsafeFinish((v, e) -> { + if (e == null) { + consumer.unsafeFinish(v, c); + } else { + c.onResult(null, e); + } + }); + }; + } + + /** + * @param errorCheck A check, comparable to a catch-if/otherwise-rethrow + * @param supplier The branch to execute if the error matches + * @return The composition of this, and the conditional branch + */ + default AsyncSupplier onErrorIf( + final Predicate errorCheck, + final AsyncSupplier supplier) { + return (callback) -> this.unsafeFinish((r, e) -> { + if (e == null) { + callback.onResult(r, null); + return; + } + boolean errorMatched; + try { + errorMatched = errorCheck.test(e); + } catch (Throwable t) { + t.addSuppressed(e); + callback.onResult(null, t); + return; + } + if (errorMatched) { + supplier.unsafeFinish(callback); + } else { + callback.onResult(null, e); + } + }); + } + +} diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackRunnable.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackRunnable.java index 7304a9ef9b5..02fdbdf9699 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackRunnable.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackRunnable.java @@ -32,17 +32,4 @@ public interface AsyncCallbackRunnable { */ void run(SingleResultCallback callback); - /** - * Converts this {@link AsyncCallbackSupplier} to {@link AsyncCallbackSupplier}{@code }. - */ - default AsyncCallbackSupplier asSupplier() { - return this::run; - } - - /** - * @see AsyncCallbackSupplier#whenComplete(Runnable) - */ - default AsyncCallbackRunnable whenComplete(final Runnable after) { - return callback -> asSupplier().whenComplete(after).get(callback); - } } diff --git a/driver-core/src/main/com/mongodb/internal/async/function/RetryingAsyncCallbackSupplier.java b/driver-core/src/main/com/mongodb/internal/async/function/RetryingAsyncCallbackSupplier.java index 9ebe02f5aa7..92233a072be 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/RetryingAsyncCallbackSupplier.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/RetryingAsyncCallbackSupplier.java @@ -84,6 +84,13 @@ public RetryingAsyncCallbackSupplier( this.asyncFunction = asyncFunction; } + public RetryingAsyncCallbackSupplier( + final RetryState state, + final BiPredicate retryPredicate, + final AsyncCallbackSupplier asyncFunction) { + this(state, (previouslyChosenFailure, lastAttemptFailure) -> lastAttemptFailure, retryPredicate, asyncFunction); + } + @Override public void get(final SingleResultCallback callback) { /* `asyncFunction` and `callback` are the only externally provided pieces of code for which we do not need to care about diff --git a/driver-core/src/test/functional/com/mongodb/client/TestListener.java b/driver-core/src/test/functional/com/mongodb/client/TestListener.java new file mode 100644 index 00000000000..db68065432c --- /dev/null +++ b/driver-core/src/test/functional/com/mongodb/client/TestListener.java @@ -0,0 +1,43 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client; + +import com.mongodb.annotations.ThreadSafe; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A simple listener that consumes string events, which can be checked in tests. + */ +@ThreadSafe +public final class TestListener { + private final List events = Collections.synchronizedList(new ArrayList<>()); + + public void add(final String s) { + events.add(s); + } + + public List getEventStrings() { + return new ArrayList<>(events); + } + + public void clear() { + events.clear(); + } +} diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java new file mode 100644 index 00000000000..375f7b5c555 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java @@ -0,0 +1,657 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.internal.async; + +import com.mongodb.client.TestListener; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static com.mongodb.internal.async.AsyncRunnable.beginAsync; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + +final class AsyncFunctionsTest { + private final TestListener listener = new TestListener(); + private final InvocationTracker invocationTracker = new InvocationTracker(); + + @Test + void testVariations1() { + /* + In our async code: + 1. a callback is provided as a method parameter + 2. at least one sync method must be converted to async + + To use this API: + 1. start an async chain using the "beginAsync" static method + 2. use an appropriate chaining method (then...), which will provide "c" + 3. copy all sync code to that method + 4. at the async method, pass in "c" and start a new chaining method + 5. provide the original "callback" at the end of the chain via "finish" + + Async methods MUST be preceded by unaffected "plain" sync code (sync + code with no async counterpart), and this code MUST reside above the + affected method, as it appears in the sync code. Plain code after + the sync method should be supplied via one of the "finally" variants. + Safe "shared" plain code (variable and lambda declarations) which cannot + throw, may remain outside the chained invocations, for convenience. + + Plain sync code MAY throw exceptions, and SHOULD NOT attempt to handle + them asynchronously. The exceptions will be caught and handled by the + chaining methods that contain this sync code. + + Each async lambda MUST invoke its async method with "c", and MUST return + immediately after invoking that method. It MUST NOT, for example, have + a catch or finally (including close on try-with-resources) after the + invocation of the sync method. + + A braced lambda body (with no linebreak before "."), as shown below, + should be used, as this will be consistent with other usages, and allows + the async code to be more easily compared to the sync code. + */ + + // the number of expected variations is often: 1 + N methods invoked + // 1 variation with no exceptions, and N per an exception in each method + assertBehavesSameVariations(2, + () -> { + // single sync method invocations... + sync(1); + }, + (callback) -> { + // ...become a single async invocation, wrapped in begin-thenRun/finish: + beginAsync().thenRun(c -> { + async(1, c); + }).finish(callback); + }); + } + + @Test + void testVariations2() { + // tests pairs + // converting: plain-sync, sync-plain, sync-sync + // (plain-plain does not need an async chain) + + assertBehavesSameVariations(3, + () -> { + // plain (unaffected) invocations... + plain(1); + sync(2); + }, + (callback) -> { + beginAsync().thenRun(c -> { + // ...are preserved above affected methods + plain(1); + async(2, c); + }).finish(callback); + }); + + assertBehavesSameVariations(3, + () -> { + // when a plain invocation follows an affected method... + sync(1); + plain(2); + }, + (callback) -> { + // ...it is moved to its own block + beginAsync().thenRun(c -> { + async(1, c); + }).thenRunAndFinish(() -> { + plain(2); + }, callback); + }); + + assertBehavesSameVariations(3, + () -> { + // when an affected method follows an affected method + sync(1); + sync(2); + }, + (callback) -> { + // ...it is moved to its own block + beginAsync().thenRun(c -> { + async(1, c); + }).thenRun(c -> { + async(2, c); + }).finish(callback); + }); + } + + @Test + void testVariations4() { + // tests the sync-sync pair with preceding and ensuing plain methods: + assertBehavesSameVariations(5, + () -> { + plain(11); + sync(1); + plain(22); + sync(2); + }, + (callback) -> { + beginAsync().thenRun(c -> { + plain(11); + async(1, c); + }).thenRun(c -> { + plain(22); + async(2, c); + }).finish(callback); + }); + + assertBehavesSameVariations(5, + () -> { + sync(1); + plain(11); + sync(2); + plain(22); + }, + (callback) -> { + beginAsync().thenRun(c -> { + async(1, c); + }).thenRun(c -> { + plain(11); + async(2, c); + }).thenRunAndFinish(() ->{ + plain(22); + }, callback); + }); + } + + @Test + void testSupply() { + assertBehavesSameVariations(4, + () -> { + sync(0); + plain(1); + return syncReturns(2); + }, + (callback) -> { + beginAsync().thenRun(c -> { + async(0, c); + }).thenSupply(c -> { + plain(1); + asyncReturns(2, c); + }).finish(callback); + }); + } + + @SuppressWarnings("ConstantConditions") + @Test + void testFullChain() { + // tests a chain: runnable, producer, function, function, consumer + + assertBehavesSameVariations(14, + () -> { + plain(90); + sync(0); + plain(91); + sync(1); + plain(92); + int v = syncReturns(2); + plain(93); + v = syncReturns(v + 1); + plain(94); + v = syncReturns(v + 10); + plain(95); + sync(v + 100); + plain(96); + }, + (callback) -> { + beginAsync().thenRun(c -> { + plain(90); + async(0, c); + }).thenRun(c -> { + plain(91); + async(1, c); + }).thenSupply(c -> { + plain(92); + asyncReturns(2, c); + }).thenApply((v, c) -> { + plain(93); + asyncReturns(v + 1, c); + }).thenApply((v, c) -> { + plain(94); + asyncReturns(v + 10, c); + }).thenConsume((v, c) -> { + plain(95); + async(v + 100, c); + }).thenRunAndFinish(() -> { + plain(96); + }, callback); + }); + } + + @Test + void testVariationsBranching() { + assertBehavesSameVariations(5, + () -> { + if (plainTest(1)) { + sync(2); + } else { + sync(3); + } + }, + (callback) -> { + beginAsync().thenRun(c -> { + if (plainTest(1)) { + async(2, c); + } else { + async(3, c); + } + }).finish(callback); + }); + + // 2 : fail on first sync, fail on test + // 3 : true test, sync2, sync3 + // 2 : false test, sync3 + // 7 total + assertBehavesSameVariations(7, + () -> { + sync(0); + if (plainTest(1)) { + sync(2); + } + sync(3); + }, + (callback) -> { + beginAsync().thenRun(c -> { + async(0, c); + }).thenRunIf(() -> plainTest(1), c -> { + async(2, c); + }).thenRun(c -> { + async(3, c); + }).finish(callback); + }); + + // an additional affected method within the "if" branch + assertBehavesSameVariations(8, + () -> { + sync(0); + if (plainTest(1)) { + sync(21); + sync(22); + } + sync(3); + }, + (callback) -> { + beginAsync().thenRun(c -> { + async(0, c); + }).thenRunIf(() -> plainTest(1), + beginAsync().thenRun(c -> { + async(21, c); + }).thenRun((c) -> { + async(22, c); + }) + ).thenRun(c -> { + async(3, c); + }).finish(callback); + }); + } + + @Test + void testErrorIf() { + // thenSupply: + assertBehavesSameVariations(5, + () -> { + try { + return syncReturns(1); + } catch (Exception e) { + if (e.getMessage().equals(plainTest(1) ? "unexpected" : "exception-1")) { + return syncReturns(2); + } else { + throw e; + } + } + }, + (callback) -> { + beginAsync().thenSupply(c -> { + asyncReturns(1, c); + }).onErrorIf(e -> e.getMessage().equals(plainTest(1) ? "unexpected" : "exception-1"), c -> { + asyncReturns(2, c); + }).finish(callback); + }); + + // thenRun: + assertBehavesSameVariations(5, + () -> { + try { + sync(1); + } catch (Exception e) { + if (e.getMessage().equals(plainTest(1) ? "unexpected" : "exception-1")) { + sync(2); + } else { + throw e; + } + } + }, + (callback) -> { + beginAsync().thenRun(c -> { + async(1, c); + }).onErrorIf(e -> e.getMessage().equals(plainTest(1) ? "unexpected" : "exception-1"), c -> { + async(2, c); + }).finish(callback); + }); + } + + @Test + void testLoop() { + assertBehavesSameVariations(InvocationTracker.DEPTH_LIMIT * 2 + 1, + () -> { + while (true) { + try { + sync(plainTest(0) ? 1 : 2); + break; + } catch (RuntimeException e) { + if (e.getMessage().equals("exception-1")) { + continue; + } + throw e; + } + } + }, + (callback) -> { + beginAsync().thenRunRetryingWhile( + c -> sync(plainTest(0) ? 1 : 2), + e -> e.getMessage().equals("exception-1") + ).finish(callback); + }); + } + + @Test + void testFinally() { + // (in try: normal flow + exception + exception) * (in finally: normal + exception) = 6 + assertBehavesSameVariations(6, + () -> { + try { + plain(1); + sync(2); + } finally { + plain(3); + } + }, + (callback) -> { + beginAsync().thenRun(c -> { + plain(1); + async(2, c); + }).thenAlwaysRunAndFinish(() -> { + plain(3); + }, callback); + }); + } + + @Test + void testUsedAsLambda() { + assertBehavesSameVariations(4, + () -> { + Supplier s = () -> syncReturns(9); + sync(0); + plain(1); + return s.get(); + }, + (callback) -> { + AsyncSupplier s = (c) -> asyncReturns(9, c); + beginAsync().thenRun(c -> { + async(0, c); + }).thenSupply((c) -> { + plain(1); + s.getAsync(c); + }).finish(callback); + }); + } + + @Test + void testVariables() { + assertBehavesSameVariations(3, + () -> { + int something; + something = 90; + sync(something); + something = something + 10; + sync(something); + }, + (callback) -> { + // Certain variables may need to be shared; these can be + // declared (but not initialized) outside the async chain. + // Any container works (atomic allowed but not needed) + final int[] something = new int[1]; + beginAsync().thenRun(c -> { + something[0] = 90; + async(something[0], c); + }).thenRun((c) -> { + something[0] = something[0] + 10; + async(something[0], c); + }).finish(callback); + }); + } + + @Test + void testInvalid() { + assertThrows(IllegalStateException.class, () -> { + beginAsync().thenRun(c -> { + async(3, c); + throw new IllegalStateException("must not cause second callback invocation"); + }).finish((v, e) -> {}); + }); + assertThrows(IllegalStateException.class, () -> { + beginAsync().thenRun(c -> { + async(3, c); + }).finish((v, e) -> { + throw new IllegalStateException("must not cause second callback invocation"); + }); + }); + } + + // invoked methods: + + private void plain(final int i) { + int cur = invocationTracker.getNextOption(2); + if (cur == 0) { + listener.add("plain-exception-" + i); + throw new RuntimeException("affected method exception-" + i); + } else { + listener.add("plain-success-" + i); + } + } + + private boolean plainTest(final int i) { + int cur = invocationTracker.getNextOption(3); + if (cur == 0) { + listener.add("plain-exception-" + i); + throw new RuntimeException("affected method exception-" + i); + } else if (cur == 1) { + listener.add("plain-false-" + i); + return false; + } else { + listener.add("plain-true-" + i); + return true; + } + } + + private void sync(final int i) { + int cur = invocationTracker.getNextOption(2); + if (cur == 0) { + listener.add("affected-exception-" + i); + throw new RuntimeException("exception-" + i); + } else { + listener.add("affected-success-" + i); + } + } + + private Integer syncReturns(final int i) { + int cur = invocationTracker.getNextOption(2); + if (cur == 0) { + listener.add("affected-exception-" + i); + throw new RuntimeException("exception-" + i); + } else { + listener.add("affected-success-" + i); + return i; + } + } + + private void async(final int i, final SingleResultCallback callback) { + try { + sync(i); + callback.onResult(null, null); + } catch (Throwable t) { + callback.onResult(null, t); + } + } + + private void asyncReturns(final int i, final SingleResultCallback callback) { + try { + callback.onResult(syncReturns(i), null); + } catch (Throwable t) { + callback.onResult(null, t); + } + } + + // assert methods: + + private void assertBehavesSameVariations(final int expectedVariations, final Runnable sync, + final Consumer> async) { + assertBehavesSameVariations( + expectedVariations, + () -> { + sync.run(); + return null; + }, + (c) -> { + async.accept((v, e) -> c.onResult(v, e)); + }); + } + + private void assertBehavesSameVariations(final int expectedVariations, final Supplier sync, + final Consumer> async) { + invocationTracker.reset(); + do { + invocationTracker.startInitialStep(); + assertBehavesSame( + sync, + () -> invocationTracker.startMatchStep(), + async); + + } while (invocationTracker.countDown()); + assertEquals(expectedVariations, invocationTracker.getVariationCount()); + } + + private void assertBehavesSame(final Supplier sync, final Runnable between, final Consumer> async) { + T expectedValue = null; + Throwable expectedException = null; + try { + expectedValue = sync.get(); + } catch (Throwable e) { + expectedException = e; + } + List expectedEvents = listener.getEventStrings(); + + listener.clear(); + between.run(); + + AtomicReference actualValue = new AtomicReference<>(); + AtomicReference actualException = new AtomicReference<>(); + try { + async.accept((v, e) -> { + actualValue.set(v); + actualException.set(e); + }); + } catch (Throwable e) { + fail("async threw instead of using callback"); + } + + // The following code can be used to debug variations: + // System.out.println("==="); + // System.out.println(listener.getEventStrings()); + // System.out.println("==="); + + assertEquals(expectedEvents, listener.getEventStrings(), "steps did not match"); + assertEquals(expectedValue, actualValue.get()); + assertEquals(expectedException == null, actualException.get() == null); + if (expectedException != null) { + assertEquals(expectedException.getMessage(), actualException.get().getMessage()); + assertEquals(expectedException.getClass(), actualException.get().getClass()); + } + + listener.clear(); + } + + /** + * Tracks invocations: allows testing of all variations of a method calls + */ + private static class InvocationTracker { + public static final int DEPTH_LIMIT = 50; + private final List invocationResults = new ArrayList<>(); + private boolean isMatchStep = false; // vs initial step + private int item = 0; + private int variationCount = 0; + + public void reset() { + variationCount = 0; + } + + public void startInitialStep() { + variationCount++; + isMatchStep = false; + item = -1; + } + + public int getNextOption(final int myOptionsSize) { + item++; + if (item >= invocationResults.size()) { + if (isMatchStep) { + fail("result should have been pre-initialized: steps may not match"); + } + if (isWithinDepthLimit()) { + invocationResults.add(myOptionsSize - 1); + } else { + invocationResults.add(0); // choose "0" option, usually an exception + } + } + return invocationResults.get(item); + } + + public void startMatchStep() { + isMatchStep = true; + item = -1; + } + + private boolean countDown() { + while (!invocationResults.isEmpty()) { + int lastItemIndex = invocationResults.size() - 1; + int lastItem = invocationResults.get(lastItemIndex); + if (lastItem > 0) { + // count current digit down by 1, until 0 + invocationResults.set(lastItemIndex, lastItem - 1); + return true; + } else { + // current digit completed, remove (move left) + invocationResults.remove(lastItemIndex); + } + } + return false; + } + + public int getVariationCount() { + return variationCount; + } + + public boolean isWithinDepthLimit() { + return invocationResults.size() < DEPTH_LIMIT; + } + } +} From 36a30f11a05aaf06933e40ca721a1d6d94f1f471 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Tue, 14 Nov 2023 09:07:49 -0700 Subject: [PATCH 2/7] Update async functions --- .../connection/AsyncCompletionHandler.java | 14 + .../mongodb/internal/async/AsyncFunction.java | 4 +- .../mongodb/internal/async/AsyncSupplier.java | 13 +- .../internal/async/SingleResultCallback.java | 30 + .../internal/async/AsyncFunctionsTest.java | 707 +++++++++++++++--- 5 files changed, 662 insertions(+), 106 deletions(-) diff --git a/driver-core/src/main/com/mongodb/connection/AsyncCompletionHandler.java b/driver-core/src/main/com/mongodb/connection/AsyncCompletionHandler.java index 893c5f0eedf..a286f346427 100644 --- a/driver-core/src/main/com/mongodb/connection/AsyncCompletionHandler.java +++ b/driver-core/src/main/com/mongodb/connection/AsyncCompletionHandler.java @@ -16,6 +16,7 @@ package com.mongodb.connection; +import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.lang.Nullable; /** @@ -38,4 +39,17 @@ public interface AsyncCompletionHandler { * @param t the exception that describes the failure */ void failed(Throwable t); + + /** + * @return this handler as a callback + */ + default SingleResultCallback asCallback() { + return (r, t) -> { + if (t != null) { + failed(t); + } else { + completed(r); + } + }; + } } diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java index 8caf176dce6..76dbccc5081 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java @@ -16,8 +16,6 @@ package com.mongodb.internal.async; -import com.mongodb.lang.Nullable; - /** * See tests for usage (AsyncFunctionsTest). *

@@ -29,5 +27,5 @@ public interface AsyncFunction { * This should not be called externally, but should be implemented as a * lambda. To "finish" an async chain, use one of the "finish" methods. */ - void unsafeFinish(@Nullable T value, SingleResultCallback callback); + void unsafeFinish(T value, SingleResultCallback callback); } diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java index 7b38595bda9..ede848eb344 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java @@ -16,8 +16,6 @@ package com.mongodb.internal.async; -import com.mongodb.lang.Nullable; - import java.util.function.Predicate; @@ -38,6 +36,7 @@ public interface AsyncSupplier extends AsyncFunction { void unsafeFinish(SingleResultCallback callback); /** + * This is the async variant of a supplier's get method. * This method must only be used when this AsyncSupplier corresponds * to a {@link java.util.function.Supplier} (and is therefore being * used within an async chain method lambda). @@ -48,7 +47,7 @@ default void getAsync(final SingleResultCallback callback) { } @Override - default void unsafeFinish(@Nullable final Void value, final SingleResultCallback callback) { + default void unsafeFinish(final Void value, final SingleResultCallback callback) { unsafeFinish(callback); } @@ -108,13 +107,13 @@ default AsyncRunnable thenConsume(final AsyncConsumer consumer) { /** * @param errorCheck A check, comparable to a catch-if/otherwise-rethrow - * @param supplier The branch to execute if the error matches + * @param errorFunction The branch to execute if the error matches * @return The composition of this, and the conditional branch */ default AsyncSupplier onErrorIf( final Predicate errorCheck, - final AsyncSupplier supplier) { - return (callback) -> this.unsafeFinish((r, e) -> { + final AsyncFunction errorFunction) { + return (callback) -> this.finish((r, e) -> { if (e == null) { callback.onResult(r, null); return; @@ -128,7 +127,7 @@ default AsyncSupplier onErrorIf( return; } if (errorMatched) { - supplier.unsafeFinish(callback); + errorFunction.unsafeFinish(e, callback); } else { callback.onResult(null, e); } diff --git a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java index 573c1ba423c..224dae62179 100644 --- a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java +++ b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java @@ -16,6 +16,8 @@ package com.mongodb.internal.async; +import com.mongodb.assertions.Assertions; +import com.mongodb.connection.AsyncCompletionHandler; import com.mongodb.internal.async.function.AsyncCallbackFunction; import com.mongodb.lang.Nullable; @@ -34,4 +36,32 @@ public interface SingleResultCallback { * @throws Error Never, on the best effort basis. */ void onResult(@Nullable T result, @Nullable Throwable t); + + /** + * @return this callback as a handler + */ + default AsyncCompletionHandler asHandler() { + return new AsyncCompletionHandler() { + @Override + public void completed(@Nullable final T result) { + onResult(result, null); + } + @Override + public void failed(final Throwable t) { + onResult(null, t); + } + }; + } + + default void complete(final SingleResultCallback callback) { + // takes a void callback (itself) to help ensure that this method + // is not accidentally used when "complete(T)" should have been used + // instead, since results are not marked nullable. + Assertions.assertTrue(callback == this); + this.onResult(null, null); + } + + default void complete(final T result) { + this.onResult(result, null); + } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java index 375f7b5c555..ad575e1b372 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java @@ -17,55 +17,92 @@ import com.mongodb.client.TestListener; import org.junit.jupiter.api.Test; +import org.opentest4j.AssertionFailedError; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; +import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.internal.async.AsyncRunnable.beginAsync; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; final class AsyncFunctionsTest { private final TestListener listener = new TestListener(); private final InvocationTracker invocationTracker = new InvocationTracker(); + private boolean isTestingAbruptCompletion = false; @Test - void testVariations1() { + void testBasicVariations1() { /* - In our async code: - 1. a callback is provided as a method parameter - 2. at least one sync method must be converted to async - - To use this API: - 1. start an async chain using the "beginAsync" static method - 2. use an appropriate chaining method (then...), which will provide "c" - 3. copy all sync code to that method - 4. at the async method, pass in "c" and start a new chaining method + Some of our methods have "Async" counterparts. These "Async" methods + must implement the same behaviour asynchronously. In these "Async" + methods, a SingleResultCallback is provided as a parameter, and the + method calls at least one other "Async" method (or it invokes a + non-driver async API). + + The API tested here facilitates the writing of such methods using + standardized, tested, and non-nested boilerplate. For example, given + the following "sync" method: + + public T myMethod() + sync(1); + } + + The async counterpart would be: + + public void myMethodAsync(SingleResultCallback callback) + beginAsync().thenRun(c -> { // 1, 2 + async(1, c); // 3, 4 + }).finish(callback); // 5 + } + + Usage: + 1. Start an async chain using the "beginAsync" static method. + 2. Use an appropriate chaining method (then...), which will provide "c" + 3. copy all sync code into that method; convert sync methods to async + 4. at any async method, pass in "c", and end that "block" 5. provide the original "callback" at the end of the chain via "finish" - Async methods MUST be preceded by unaffected "plain" sync code (sync - code with no async counterpart), and this code MUST reside above the - affected method, as it appears in the sync code. Plain code after - the sync method should be supplied via one of the "finally" variants. - Safe "shared" plain code (variable and lambda declarations) which cannot - throw, may remain outside the chained invocations, for convenience. + (The above example is tested at the end of this method, and other tests + will provide additional examples.) - Plain sync code MAY throw exceptions, and SHOULD NOT attempt to handle - them asynchronously. The exceptions will be caught and handled by the - chaining methods that contain this sync code. + Requirements and conventions: Each async lambda MUST invoke its async method with "c", and MUST return immediately after invoking that method. It MUST NOT, for example, have a catch or finally (including close on try-with-resources) after the - invocation of the sync method. + invocation of the async method. - A braced lambda body (with no linebreak before "."), as shown below, - should be used, as this will be consistent with other usages, and allows - the async code to be more easily compared to the sync code. + In cases where the async method has "mixed" returns (some of which are + plain sync, some async), the "c" callback MUST be completed on the + plain sync path, `c.complete()`, followed by a return or end of method. + + Chains starting with "beginAsync" correspond roughly to code blocks. + This includes the method body, blocks used in if/try/catch/while/etc. + statements, and places where anonymous code blocks might be used. For + clarity, such nested/indented chains might be omitted (where possible, + as demonstrated in the tests/examples below). + + Plain sync code MAY throw exceptions, and SHOULD NOT attempt to handle + them asynchronously. The exceptions will be caught and handled by the + code blocks that contain this sync code. + + All code, including "plain" code (parameter checks) SHOULD be placed + within the "boilerplate". This ensures that exceptions are handled, + and facilitates comparison/review. This excludes code that must be + "shared", such as lambda and variable declarations. + + A curly-braced lambda body (with no linebreak before "."), as shown + below, SHOULD be used (for consistency, and ease of comparison/review). */ // the number of expected variations is often: 1 + N methods invoked @@ -81,10 +118,19 @@ A braced lambda body (with no linebreak before "."), as shown below, async(1, c); }).finish(callback); }); + /* + Code review checklist for async code: + + 1. Is everything inside the boilerplate? + 2. Is "callback" supplied to "finish"? + 3. In each block and nested block, is that same block's "c" always passed/completed at the end of execution? + 4. Is every c.complete followed by a return, to end execution? + 5. Have all sync method calls been converted to async, where needed? + */ } @Test - void testVariations2() { + void testBasicVariations2() { // tests pairs // converting: plain-sync, sync-plain, sync-sync // (plain-plain does not need an async chain) @@ -110,12 +156,13 @@ void testVariations2() { plain(2); }, (callback) -> { - // ...it is moved to its own block + // ...it is moved to its own block, and must be completed: beginAsync().thenRun(c -> { async(1, c); - }).thenRunAndFinish(() -> { + }).thenRun(c -> { plain(2); - }, callback); + c.complete(c); + }).finish(callback); }); assertBehavesSameVariations(3, @@ -135,7 +182,7 @@ void testVariations2() { } @Test - void testVariations4() { + void testBasicVariations4() { // tests the sync-sync pair with preceding and ensuing plain methods: assertBehavesSameVariations(5, () -> { @@ -191,11 +238,33 @@ void testSupply() { }); } + @Test + void testSupplyMixed() { + assertBehavesSameVariations(5, + () -> { + if (plainTest(1)) { + return syncReturns(11); + } else { + return plainReturns(22); + } + }, + (callback) -> { + beginAsync().thenSupply(c -> { + if (plainTest(1)) { + asyncReturns(11, c); + } else { + int r = plainReturns(22); + c.complete(r); // corresponds to a return, and + // must be followed by a return or end of method + } + }).finish(callback); + }); + } + @SuppressWarnings("ConstantConditions") @Test void testFullChain() { - // tests a chain: runnable, producer, function, function, consumer - + // tests a chain with: runnable, producer, function, function, consumer assertBehavesSameVariations(14, () -> { plain(90); @@ -238,7 +307,7 @@ void testFullChain() { } @Test - void testVariationsBranching() { + void testConditionalVariations() { assertBehavesSameVariations(5, () -> { if (plainTest(1)) { @@ -293,11 +362,11 @@ void testVariationsBranching() { beginAsync().thenRun(c -> { async(0, c); }).thenRunIf(() -> plainTest(1), - beginAsync().thenRun(c -> { - async(21, c); - }).thenRun((c) -> { - async(22, c); - }) + beginAsync().thenRun(c -> { + async(21, c); + }).thenRun((c) -> { + async(22, c); + }) ).thenRun(c -> { async(3, c); }).finish(callback); @@ -305,7 +374,289 @@ void testVariationsBranching() { } @Test - void testErrorIf() { + void testMixedConditionalCascade() { + assertBehavesSameVariations(9, + () -> { + boolean test1 = plainTest(1); + if (test1) { + return syncReturns(11); + } + boolean test2 = plainTest(2); + if (test2) { + return 22; + } + int x = syncReturns(33); + plain(x + 100); + return syncReturns(44); + }, + (callback) -> { + beginAsync().thenSupply(c -> { + boolean test1 = plainTest(1); + if (test1) { + asyncReturns(11, c); + return; + } + boolean test2 = plainTest(2); + if (test2) { + c.complete(22); + return; + } + beginAsync().thenSupply(c2 -> { + asyncReturns(33, c2); + }).thenApply((x, c2) -> { + plain(assertNotNull(x) + 100); + asyncReturns(44, c2); + }).finish(c); + }).finish(callback); + }); + } + + @Test + void testPlain() { + // for completeness; should not be used, since there is no async + assertBehavesSameVariations(2, + () -> { + plain(1); + }, + (callback) -> { + beginAsync().thenRun(c -> { + plain(1); + c.complete(c); + }).finish(callback); + }); + } + + @Test + void testTryCatch() { + // single method in both try and catch + assertBehavesSameVariations(3, + () -> { + try { + sync(1); + } catch (Throwable t) { + sync(2); + } + }, + (callback) -> { + beginAsync().thenRun(c -> { + async(1, c); + }).onErrorIf(t -> true, (t, c) -> { + async(2, c); + }).finish(callback); + }); + + // mixed sync/plain + assertBehavesSameVariations(3, + () -> { + try { + sync(1); + } catch (Throwable t) { + plain(2); + } + }, + (callback) -> { + beginAsync().thenRun(c -> { + async(1, c); + }).onErrorIf(t -> true, (t, c) -> { + plain(2); + c.complete(c); + }).finish(callback); + }); + + // chain of 2 in try + // "onErrorIf" will consider everything in + // the preceding chain to be part of the try + assertBehavesSameVariations(5, + () -> { + try { + sync(1); + sync(2); + } catch (Throwable t) { + sync(9); + } + }, + (callback) -> { + beginAsync().thenRun(c -> { + async(1, c); + }).thenRun(c -> { + async(2, c); + }).onErrorIf(t -> true, (t, c) -> { + async(9, c); + }).finish(callback); + }); + + // chain of 2 in catch + assertBehavesSameVariations(4, + () -> { + try { + sync(1); + } catch (Throwable t) { + sync(8); + sync(9); + } + }, + (callback) -> { + beginAsync().thenRun(c -> { + async(1, c); + }).onErrorIf(t -> true, (t, callback2) -> { + beginAsync().thenRun(c -> { + async(8, c); + }).thenRun(c -> { + async(9, c); + }).finish(callback2); + }).finish(callback); + }); + + // method after the try-catch block + // here, the try-catch must be nested (as a code block) + assertBehavesSameVariations(5, + () -> { + try { + sync(1); + } catch (Throwable t) { + sync(2); + } + sync(3); + }, + (callback) -> { + beginAsync().thenRun(c2 -> { + beginAsync().thenRun(c -> { + async(1, c); + }).onErrorIf(t -> true, (t, c) -> { + async(2, c); + }).finish(c2); + }).thenRun(c -> { + async(3, c); + }).finish(callback); + }); + + // multiple catch blocks + // WARNING: these are not exclusive; if multiple "onErrorIf" blocks + // match, they will all be executed. + assertBehavesSameVariations(5, + () -> { + try { + if (plainTest(1)) { + throw new UnsupportedOperationException("A"); + } else { + throw new IllegalStateException("B"); + } + } catch (UnsupportedOperationException t) { + sync(8); + } catch (IllegalStateException t) { + sync(9); + } + }, + (callback) -> { + beginAsync().thenRun(c -> { + if (plainTest(1)) { + throw new UnsupportedOperationException("A"); + } else { + throw new IllegalStateException("B"); + } + }).onErrorIf(t -> t instanceof UnsupportedOperationException, (t, c) -> { + async(8, c); + }).onErrorIf(t -> t instanceof IllegalStateException, (t, c) -> { + async(9, c); + }).finish(callback); + }); + } + + @Test + void testTryCatchWithVariables() { + // using supply etc. + assertBehavesSameVariations(12, + () -> { + try { + int i = plainTest(0) ? 1 : 2; + i = syncReturns(i + 10); + sync(i + 100); + } catch (Throwable t) { + sync(3); + } + }, + (callback) -> { + beginAsync().thenRun( + beginAsync().thenSupply(c -> { + int i = plainTest(0) ? 1 : 2; + asyncReturns(i + 10, c); + }).thenConsume((i, c) -> { + async(assertNotNull(i) + 100, c); + }) + ).onErrorIf(t -> true, (t, c) -> { + async(3, c); + }).finish(callback); + }); + + // using an externally-declared variable + assertBehavesSameVariations(17, + () -> { + int i = plainTest(0) ? 1 : 2; + try { + i = syncReturns(i + 10); + sync(i + 100); + } catch (Throwable t) { + sync(3); + } + sync(i + 1000); + }, + (callback) -> { + final int[] i = new int[1]; + beginAsync().thenRun(c -> { + i[0] = plainTest(0) ? 1 : 2; + c.complete(c); + }).thenRun(c -> { + beginAsync().thenSupply(c2 -> { + asyncReturns(i[0] + 10, c2); + }).thenConsume((i2, c2) -> { + i[0] = assertNotNull(i2); + async(i2 + 100, c2); + }).onErrorIf(t -> true, (t, c2) -> { + async(3, c2); + }).finish(c); + }).thenRun(c -> { + async(i[0] + 1000, c); + }).finish(callback); + }); + } + + @Test + void testTryCatchWithConditionInCatch() { + assertBehavesSameVariations(12, + () -> { + try { + sync(plainTest(0) ? 1 : 2); + sync(3); + } catch (Throwable t) { + sync(5); + if (t.getMessage().equals("exception-1")) { + throw t; + } else { + throw new RuntimeException("wrapped-" + t.getMessage(), t); + } + } + }, + (callback) -> { + beginAsync().thenRun(c -> { + async(plainTest(0) ? 1 : 2, c); + }).thenRun(c -> { + async(3, c); + }).onErrorIf(t -> true, (t, c) -> { + beginAsync().thenRun(c2 -> { + async(5, c2); + }).thenRun(c2 -> { + if (assertNotNull(t).getMessage().equals("exception-1")) { + throw (RuntimeException) t; + } else { + throw new RuntimeException("wrapped-" + t.getMessage(), t); + } + }).finish(c); + }).finish(callback); + }); + } + + @Test + void testTryCatchTestAndRethrow() { // thenSupply: assertBehavesSameVariations(5, () -> { @@ -322,7 +673,7 @@ void testErrorIf() { (callback) -> { beginAsync().thenSupply(c -> { asyncReturns(1, c); - }).onErrorIf(e -> e.getMessage().equals(plainTest(1) ? "unexpected" : "exception-1"), c -> { + }).onErrorIf(e -> e.getMessage().equals(plainTest(1) ? "unexpected" : "exception-1"), (t, c) -> { asyncReturns(2, c); }).finish(callback); }); @@ -343,7 +694,7 @@ void testErrorIf() { (callback) -> { beginAsync().thenRun(c -> { async(1, c); - }).onErrorIf(e -> e.getMessage().equals(plainTest(1) ? "unexpected" : "exception-1"), c -> { + }).onErrorIf(e -> e.getMessage().equals(plainTest(1) ? "unexpected" : "exception-1"), (t, c) -> { async(2, c); }).finish(callback); }); @@ -356,18 +707,18 @@ void testLoop() { while (true) { try { sync(plainTest(0) ? 1 : 2); - break; } catch (RuntimeException e) { if (e.getMessage().equals("exception-1")) { continue; } throw e; } + break; } }, (callback) -> { beginAsync().thenRunRetryingWhile( - c -> sync(plainTest(0) ? 1 : 2), + c -> async(plainTest(0) ? 1 : 2, c), e -> e.getMessage().equals("exception-1") ).finish(callback); }); @@ -442,6 +793,8 @@ void testVariables() { @Test void testInvalid() { + isTestingAbruptCompletion = false; + invocationTracker.isAsyncStep = true; assertThrows(IllegalStateException.class, () -> { beginAsync().thenRun(c -> { async(3, c); @@ -457,6 +810,94 @@ void testInvalid() { }); } + @Test + void testDerivation() { + // Demonstrates the progression from nested async to the API. + + // Stand-ins for sync-async methods; these "happily" do not throw + // exceptions, to avoid complicating this demo async code. + Consumer happySync = (i) -> { + invocationTracker.getNextOption(1); + listener.add("affected-success-" + i); + }; + BiConsumer> happyAsync = (i, c) -> { + happySync.accept(i); + c.complete(c); + }; + + // Standard nested async, no error handling: + assertBehavesSameVariations(1, + () -> { + happySync.accept(1); + happySync.accept(2); + }, + (callback) -> { + happyAsync.accept(1, (v, e) -> { + happyAsync.accept(2, callback); + }); + }); + + // When both methods are naively extracted, they are out of order: + assertBehavesSameVariations(1, + () -> { + happySync.accept(1); + happySync.accept(2); + }, + (callback) -> { + SingleResultCallback second = (v, e) -> { + happyAsync.accept(2, callback); + }; + SingleResultCallback first = (v, e) -> { + happyAsync.accept(1, second); + }; + first.onResult(null, null); + }); + + // We create an "AsyncRunnable" that takes a callback, which + // decouples any async methods from each other, allowing them + // to be declared in a sync-like order, and without nesting: + assertBehavesSameVariations(1, + () -> { + happySync.accept(1); + happySync.accept(2); + }, + (callback) -> { + AsyncRunnable first = (SingleResultCallback c) -> { + happyAsync.accept(1, c); + }; + AsyncRunnable second = (SingleResultCallback c) -> { + happyAsync.accept(2, c); + }; + // This is a simplified variant of the "then" methods; + // it has no error handling. It takes methods A and B, + // and returns C, which is B(A()). + AsyncRunnable combined = (c) -> { + first.unsafeFinish((r, e) -> { + second.unsafeFinish(c); + }); + }; + combined.unsafeFinish(callback); + }); + + // This combining method is added as a default method on AsyncRunnable, + // and a "finish" method wraps the resulting methods. This also adds + // exception handling and monadic short-circuiting of ensuing methods + // when an exception arises (comparable to how thrown exceptions "skip" + // ensuing code). + assertBehavesSameVariations(3, + () -> { + sync(1); + sync(2); + }, + (callback) -> { + beginAsync().thenRun(c -> { + async(1, c); + }).thenRun(c -> { + async(2, c); + }).finish(callback); + }); + } + // invoked methods: private void plain(final int i) { @@ -469,6 +910,17 @@ private void plain(final int i) { } } + private int plainReturns(final int i) { + int cur = invocationTracker.getNextOption(2); + if (cur == 0) { + listener.add("plain-exception-" + i); + throw new RuntimeException("affected method exception-" + i); + } else { + listener.add("plain-success-" + i); + return i; + } + } + private boolean plainTest(final int i) { int cur = invocationTracker.getNextOption(3); if (cur == 0) { @@ -484,6 +936,46 @@ private boolean plainTest(final int i) { } private void sync(final int i) { + assertFalse(invocationTracker.isAsyncStep); + affected(i); + } + + + private Integer syncReturns(final int i) { + assertFalse(invocationTracker.isAsyncStep); + return affectedReturns(i); + } + + private void async(final int i, final SingleResultCallback callback) { + assertTrue(invocationTracker.isAsyncStep); + if (isTestingAbruptCompletion) { + affected(i); + callback.complete(callback); + + } else { + try { + affected(i); + callback.complete(callback); + } catch (Throwable t) { + callback.onResult(null, t); + } + } + } + + private void asyncReturns(final int i, final SingleResultCallback callback) { + assertTrue(invocationTracker.isAsyncStep); + if (isTestingAbruptCompletion) { + callback.complete(affectedReturns(i)); + } else { + try { + callback.complete(affectedReturns(i)); + } catch (Throwable t) { + callback.onResult(null, t); + } + } + } + + private void affected(final int i) { int cur = invocationTracker.getNextOption(2); if (cur == 0) { listener.add("affected-exception-" + i); @@ -493,7 +985,7 @@ private void sync(final int i) { } } - private Integer syncReturns(final int i) { + private int affectedReturns(final int i) { int cur = invocationTracker.getNextOption(2); if (cur == 0) { listener.add("affected-exception-" + i); @@ -504,29 +996,11 @@ private Integer syncReturns(final int i) { } } - private void async(final int i, final SingleResultCallback callback) { - try { - sync(i); - callback.onResult(null, null); - } catch (Throwable t) { - callback.onResult(null, t); - } - } - - private void asyncReturns(final int i, final SingleResultCallback callback) { - try { - callback.onResult(syncReturns(i), null); - } catch (Throwable t) { - callback.onResult(null, t); - } - } - // assert methods: private void assertBehavesSameVariations(final int expectedVariations, final Runnable sync, final Consumer> async) { - assertBehavesSameVariations( - expectedVariations, + assertBehavesSameVariations(expectedVariations, () -> { sync.run(); return null; @@ -538,19 +1012,28 @@ private void assertBehavesSameVariations(final int expectedVariations, final Run private void assertBehavesSameVariations(final int expectedVariations, final Supplier sync, final Consumer> async) { - invocationTracker.reset(); - do { - invocationTracker.startInitialStep(); - assertBehavesSame( - sync, - () -> invocationTracker.startMatchStep(), - async); + // run the variation-trying code twice, with direct/indirect exceptions + for (int i = 0; i < 2; i++) { + isTestingAbruptCompletion = i == 0; + + // the variation-trying code: + invocationTracker.reset(); + do { + invocationTracker.startInitialStep(); + assertBehavesSame( + sync, + () -> invocationTracker.startMatchStep(), + async); + } while (invocationTracker.countDown()); + assertEquals(expectedVariations, invocationTracker.getVariationCount(), + "number of variations did not match"); + } - } while (invocationTracker.countDown()); - assertEquals(expectedVariations, invocationTracker.getVariationCount()); } - private void assertBehavesSame(final Supplier sync, final Runnable between, final Consumer> async) { + private void assertBehavesSame(final Supplier sync, final Runnable between, + final Consumer> async) { + T expectedValue = null; Throwable expectedException = null; try { @@ -565,23 +1048,40 @@ private void assertBehavesSame(final Supplier sync, final Runnable betwee AtomicReference actualValue = new AtomicReference<>(); AtomicReference actualException = new AtomicReference<>(); + AtomicBoolean wasCalled = new AtomicBoolean(false); try { async.accept((v, e) -> { actualValue.set(v); actualException.set(e); + if (wasCalled.get()) { + fail(); + } + wasCalled.set(true); }); } catch (Throwable e) { fail("async threw instead of using callback"); } // The following code can be used to debug variations: - // System.out.println("==="); - // System.out.println(listener.getEventStrings()); - // System.out.println("==="); +// System.out.println("===VARIATION START"); +// System.out.println("sync: " + expectedEvents); +// System.out.println("callback called?: " + wasCalled.get()); +// System.out.println("value -- sync: " + expectedValue + " -- async: " + actualValue.get()); +// System.out.println("excep -- sync: " + expectedException + " -- async: " + actualException.get()); +// System.out.println("exception mode: " + (isTestingAbruptCompletion +// ? "exceptions thrown directly (abrupt completion)" : "exceptions into callbacks")); +// System.out.println("===VARIATION END"); + + // show assertion failures arising in async tests + if (actualException.get() != null && actualException.get() instanceof AssertionFailedError) { + throw (AssertionFailedError) actualException.get(); + } - assertEquals(expectedEvents, listener.getEventStrings(), "steps did not match"); + assertTrue(wasCalled.get(), "callback should have been called"); + assertEquals(expectedEvents, listener.getEventStrings(), "steps should have matched"); assertEquals(expectedValue, actualValue.get()); - assertEquals(expectedException == null, actualException.get() == null); + assertEquals(expectedException == null, actualException.get() == null, + "both or neither should have produced an exception"); if (expectedException != null) { assertEquals(expectedException.getMessage(), actualException.get().getMessage()); assertEquals(expectedException.getClass(), actualException.get().getClass()); @@ -595,10 +1095,10 @@ private void assertBehavesSame(final Supplier sync, final Runnable betwee */ private static class InvocationTracker { public static final int DEPTH_LIMIT = 50; - private final List invocationResults = new ArrayList<>(); - private boolean isMatchStep = false; // vs initial step - private int item = 0; - private int variationCount = 0; + private final List invocationOptionSequence = new ArrayList<>(); + private boolean isAsyncStep; // async = matching, vs initial step = populating + private int currentInvocationIndex; + private int variationCount; public void reset() { variationCount = 0; @@ -606,41 +1106,56 @@ public void reset() { public void startInitialStep() { variationCount++; - isMatchStep = false; - item = -1; + isAsyncStep = false; + currentInvocationIndex = -1; } public int getNextOption(final int myOptionsSize) { - item++; - if (item >= invocationResults.size()) { - if (isMatchStep) { + /* + This method creates (or gets) the next invocation's option. Each + invoker of this method has the "option" to behave in various ways, + usually just success (option 1) and exceptional failure (option 0), + though some callers might have more options. A sequence of method + outcomes (options) is one "variation". Tests automatically test + all possible variations (up to a limit, to prevent infinite loops). + + Methods generally have labels, to ensure that corresponding + sync/async methods are called in the right order, but these labels + are unrelated to the "variation" logic here. There are two "modes" + (whether completion is abrupt, or not), which are also unrelated. + */ + + currentInvocationIndex++; // which invocation result we are dealing with + + if (currentInvocationIndex >= invocationOptionSequence.size()) { + if (isAsyncStep) { fail("result should have been pre-initialized: steps may not match"); } if (isWithinDepthLimit()) { - invocationResults.add(myOptionsSize - 1); + invocationOptionSequence.add(myOptionsSize - 1); } else { - invocationResults.add(0); // choose "0" option, usually an exception + invocationOptionSequence.add(0); // choose "0" option, should always be an exception } } - return invocationResults.get(item); + return invocationOptionSequence.get(currentInvocationIndex); } public void startMatchStep() { - isMatchStep = true; - item = -1; + isAsyncStep = true; + currentInvocationIndex = -1; } private boolean countDown() { - while (!invocationResults.isEmpty()) { - int lastItemIndex = invocationResults.size() - 1; - int lastItem = invocationResults.get(lastItemIndex); + while (!invocationOptionSequence.isEmpty()) { + int lastItemIndex = invocationOptionSequence.size() - 1; + int lastItem = invocationOptionSequence.get(lastItemIndex); if (lastItem > 0) { // count current digit down by 1, until 0 - invocationResults.set(lastItemIndex, lastItem - 1); + invocationOptionSequence.set(lastItemIndex, lastItem - 1); return true; } else { // current digit completed, remove (move left) - invocationResults.remove(lastItemIndex); + invocationOptionSequence.remove(lastItemIndex); } } return false; @@ -651,7 +1166,7 @@ public int getVariationCount() { } public boolean isWithinDepthLimit() { - return invocationResults.size() < DEPTH_LIMIT; + return invocationOptionSequence.size() < DEPTH_LIMIT; } } } From b662ac3d86e66fe11a16e2dee7d847c1bb6360ce Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Tue, 21 Nov 2023 13:08:48 -0700 Subject: [PATCH 3/7] Add completeExceptionally --- .../com/mongodb/assertions/Assertions.java | 4 ++-- .../mongodb/internal/async/AsyncRunnable.java | 22 +++++++++---------- .../mongodb/internal/async/AsyncSupplier.java | 12 +++++----- .../internal/async/SingleResultCallback.java | 8 +++++-- 4 files changed, 25 insertions(+), 21 deletions(-) diff --git a/driver-core/src/main/com/mongodb/assertions/Assertions.java b/driver-core/src/main/com/mongodb/assertions/Assertions.java index 98100f65b45..ae30c179e85 100644 --- a/driver-core/src/main/com/mongodb/assertions/Assertions.java +++ b/driver-core/src/main/com/mongodb/assertions/Assertions.java @@ -92,7 +92,7 @@ public static Iterable notNullElements(final String name, final Iterable< public static T notNull(final String name, final T value, final SingleResultCallback callback) { if (value == null) { IllegalArgumentException exception = new IllegalArgumentException(name + " can not be null"); - callback.onResult(null, exception); + callback.completeExceptionally(exception); throw exception; } return value; @@ -122,7 +122,7 @@ public static void isTrue(final String name, final boolean condition) { public static void isTrue(final String name, final boolean condition, final SingleResultCallback callback) { if (!condition) { IllegalStateException exception = new IllegalStateException("state should be: " + name); - callback.onResult(null, exception); + callback.completeExceptionally(exception); throw exception; } } diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java index b9089252f49..b13d50919d8 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java @@ -31,7 +31,7 @@ public interface AsyncRunnable extends AsyncSupplier, AsyncConsumer { static AsyncRunnable beginAsync() { - return (c) -> c.onResult(null, null); + return (c) -> c.complete(c); } /** @@ -43,16 +43,16 @@ static AsyncRunnable beginAsync() { default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback callback) { this.finish((r, e) -> { if (e != null) { - callback.onResult(null, e); + callback.completeExceptionally(e); return; } try { runnable.run(); } catch (Throwable t) { - callback.onResult(null, t); + callback.completeExceptionally(t); return; } - callback.onResult(null, null); + callback.complete(callback); }); } @@ -70,10 +70,10 @@ default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultC if (e != null) { t.addSuppressed(e); } - callback.onResult(null, t); + callback.completeExceptionally(t); return; } - callback.onResult(null, e); + callback.completeExceptionally(e); }); } @@ -87,7 +87,7 @@ default AsyncRunnable thenRun(final AsyncRunnable runnable) { if (e == null) { runnable.unsafeFinish(c); } else { - c.onResult(null, e); + c.completeExceptionally(e); } }); }; @@ -103,20 +103,20 @@ default AsyncRunnable thenRunIf(final Supplier condition, final AsyncRu return (callback) -> { this.unsafeFinish((r, e) -> { if (e != null) { - callback.onResult(null, e); + callback.completeExceptionally(e); return; } boolean matched; try { matched = condition.get(); } catch (Throwable t) { - callback.onResult(null, t); + callback.completeExceptionally(t); return; } if (matched) { runnable.unsafeFinish(callback); } else { - callback.onResult(null, null); + callback.complete(callback); } }); }; @@ -133,7 +133,7 @@ default AsyncSupplier thenSupply(final AsyncSupplier supplier) { if (e == null) { supplier.unsafeFinish(c); } else { - c.onResult(null, e); + c.completeExceptionally(e); } }); }; diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java index ede848eb344..2197684e295 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java @@ -66,7 +66,7 @@ default void finish(final SingleResultCallback callback) { if (callbackInvoked[0]) { throw t; } else { - callback.onResult(null, t); + callback.completeExceptionally(t); } } } @@ -82,7 +82,7 @@ default AsyncSupplier thenApply(final AsyncFunction function) { if (e == null) { function.unsafeFinish(v, c); } else { - c.onResult(null, e); + c.completeExceptionally(e); } }); }; @@ -99,7 +99,7 @@ default AsyncRunnable thenConsume(final AsyncConsumer consumer) { if (e == null) { consumer.unsafeFinish(v, c); } else { - c.onResult(null, e); + c.completeExceptionally(e); } }); }; @@ -115,7 +115,7 @@ default AsyncSupplier onErrorIf( final AsyncFunction errorFunction) { return (callback) -> this.finish((r, e) -> { if (e == null) { - callback.onResult(r, null); + callback.complete(r); return; } boolean errorMatched; @@ -123,13 +123,13 @@ default AsyncSupplier onErrorIf( errorMatched = errorCheck.test(e); } catch (Throwable t) { t.addSuppressed(e); - callback.onResult(null, t); + callback.completeExceptionally(t); return; } if (errorMatched) { errorFunction.unsafeFinish(e, callback); } else { - callback.onResult(null, e); + callback.completeExceptionally(e); } }); } diff --git a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java index 224dae62179..e2ce4f1a698 100644 --- a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java +++ b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java @@ -44,11 +44,11 @@ default AsyncCompletionHandler asHandler() { return new AsyncCompletionHandler() { @Override public void completed(@Nullable final T result) { - onResult(result, null); + complete(result); } @Override public void failed(final Throwable t) { - onResult(null, t); + completeExceptionally(t); } }; } @@ -64,4 +64,8 @@ default void complete(final SingleResultCallback callback) { default void complete(final T result) { this.onResult(result, null); } + + default void completeExceptionally(final Throwable t) { + this.onResult(null, t); + } } From d8d093de78572fcfe18173892e4ef37e2f09f121 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Tue, 21 Nov 2023 14:04:31 -0700 Subject: [PATCH 4/7] Fix NP warning --- .../com/mongodb/internal/async/SingleResultCallback.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java index e2ce4f1a698..0487ee1c61f 100644 --- a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java +++ b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java @@ -44,7 +44,11 @@ default AsyncCompletionHandler asHandler() { return new AsyncCompletionHandler() { @Override public void completed(@Nullable final T result) { - complete(result); + if (result != null) { + complete(result); + } else { + complete((SingleResultCallback) SingleResultCallback.this); + } } @Override public void failed(final Throwable t) { From 1c458d4dea1885bdbba01ecafdecd35f8a3e39f4 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Wed, 22 Nov 2023 10:25:07 -0700 Subject: [PATCH 5/7] Move and update docs, fix cast --- .../mongodb/internal/async/AsyncConsumer.java | 2 +- .../mongodb/internal/async/AsyncFunction.java | 2 +- .../mongodb/internal/async/AsyncRunnable.java | 91 +++++++++++++++- .../mongodb/internal/async/AsyncSupplier.java | 2 +- .../internal/async/SingleResultCallback.java | 6 +- .../internal/async/AsyncFunctionsTest.java | 101 +++--------------- 6 files changed, 107 insertions(+), 97 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncConsumer.java b/driver-core/src/main/com/mongodb/internal/async/AsyncConsumer.java index b385670ae88..93a10c9cd2d 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncConsumer.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncConsumer.java @@ -17,7 +17,7 @@ package com.mongodb.internal.async; /** - * See tests for usage (AsyncFunctionsTest). + * See {@link AsyncRunnable}. *

* This class is not part of the public API and may be removed or changed at any time */ diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java index 76dbccc5081..111b23f1027 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java @@ -17,7 +17,7 @@ package com.mongodb.internal.async; /** - * See tests for usage (AsyncFunctionsTest). + * See {@link AsyncRunnable} *

* This class is not part of the public API and may be removed or changed at any time */ diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java index b13d50919d8..2d0f1051b59 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java @@ -23,9 +23,94 @@ import java.util.function.Supplier; /** - * See tests for usage (AsyncFunctionsTest). - *

- * This class is not part of the public API and may be removed or changed at any time + *

See the test code (AsyncFunctionsTest) for API usage. + * + *

This API is used to write "Async" methods. These must exhibit the + * same behaviour as their sync counterparts, except asynchronously, + * and will make use of a {@link SingleResultCallback} parameter. + * + *

This API makes it easy to compare and verify async code against + * corresponding sync code, since the "shape" and ordering of the + * async code matches that of the sync code. For example, given the + * following "sync" method: + * + *

+ * public T myMethod()
+ *     method1();
+ *     method2();
+ * }
+ * + *

The async counterpart would be: + * + *

+ * public void myMethodAsync(SingleResultCallback<T> callback)
+ *     beginAsync().thenRun(c -> {
+ *         method1Async(c);
+ *     }).thenRun(c -> {
+ *         method2Async(c);
+ *     }).finish(callback);
+ * }
+ * 
+ * + *

The usage of this API is defined in its tests (AsyncFunctionsTest). + * Each test specifies the Async API code that must be used to formally + * replace a particular pattern of sync code. These tests, in a sense, + * define formal rules of replacement. + * + *

Requirements and conventions: + * + *

Each async method SHOULD start with {@link #beginAsync()}, which begins + * a chain of lambdas. Each lambda provides a callback "c" that MUST be passed + * or completed at the lambda's end of execution. The async method's "callback" + * parameter MUST be passed to {@link #finish(SingleResultCallback)}, and MUST + * NOT be used otherwise. + * + *

Consider refactoring corresponding sync code to reduce nesting or to + * otherwise improve clarity, since minor issues will often be amplified in + * the async code. + * + *

Each async lambda MUST invoke its async method with "c", and MUST return + * immediately after invoking that method. It MUST NOT, for example, have + * a catch or finally (including close on try-with-resources) after the + * invocation of the async method. + * + *

In cases where the async method has "mixed" returns (some of which are + * plain sync, some async), the "c" callback MUST be completed on the + * plain sync path, using {@link SingleResultCallback#complete(Object)} or + * {@link SingleResultCallback#complete(SingleResultCallback)}, followed by a + * return or end of method. + * + *

Chains starting with {@link #beginAsync()} correspond roughly to code + * blocks. This includes method bodies, blocks used in if/try/catch/while/etc. + * statements, and places where anonymous code blocks might be used. For + * clarity, such nested/indented chains might be omitted (where possible, + * as demonstrated in tests). + * + *

Plain sync code MAY throw exceptions, and SHOULD NOT attempt to handle + * them asynchronously. The exceptions will be caught and handled by the API. + * + *

All code, including "plain" code (parameter checks) SHOULD be placed + * within the "boilerplate". This ensures that exceptions are handled, + * and facilitates comparison/review. This excludes code that must be + * "shared", such as lambda and variable declarations. + * + *

For consistency, and ease of comparison/review, async chains SHOULD be + * formatted as in the tests; that is, with line-breaks at the curly-braces of + * lambda bodies, with no linebreak before the "." of any Async API method. + * + *

Code review checklist, for common mistakes: + * + *

    + *
  1. Is everything inside the boilerplate?
  2. + *
  3. Is "callback" supplied to "finish"?
  4. + *
  5. In each block and nested block, is that same block's "c" always + * passed/completed at the end of execution?
  6. + *
  7. Is every c.complete followed by a return, to end execution?
  8. + *
  9. Have all sync method calls been converted to async, where needed?
  10. + *
+ * + *

This class is not part of the public API and may be removed or changed + * at any time */ @FunctionalInterface public interface AsyncRunnable extends AsyncSupplier, AsyncConsumer { diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java index 2197684e295..112fbc9c61d 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java @@ -20,7 +20,7 @@ /** - * See tests for usage (AsyncFunctionsTest). + * See {@link AsyncRunnable} *

* This class is not part of the public API and may be removed or changed at any time */ diff --git a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java index 0487ee1c61f..6b3be3c77f9 100644 --- a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java +++ b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java @@ -44,11 +44,7 @@ default AsyncCompletionHandler asHandler() { return new AsyncCompletionHandler() { @Override public void completed(@Nullable final T result) { - if (result != null) { - complete(result); - } else { - complete((SingleResultCallback) SingleResultCallback.this); - } + onResult(result, null); } @Override public void failed(final Throwable t) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java index ad575e1b372..58fe10e0b29 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java @@ -41,70 +41,7 @@ final class AsyncFunctionsTest { private boolean isTestingAbruptCompletion = false; @Test - void testBasicVariations1() { - /* - Some of our methods have "Async" counterparts. These "Async" methods - must implement the same behaviour asynchronously. In these "Async" - methods, a SingleResultCallback is provided as a parameter, and the - method calls at least one other "Async" method (or it invokes a - non-driver async API). - - The API tested here facilitates the writing of such methods using - standardized, tested, and non-nested boilerplate. For example, given - the following "sync" method: - - public T myMethod() - sync(1); - } - - The async counterpart would be: - - public void myMethodAsync(SingleResultCallback callback) - beginAsync().thenRun(c -> { // 1, 2 - async(1, c); // 3, 4 - }).finish(callback); // 5 - } - - Usage: - 1. Start an async chain using the "beginAsync" static method. - 2. Use an appropriate chaining method (then...), which will provide "c" - 3. copy all sync code into that method; convert sync methods to async - 4. at any async method, pass in "c", and end that "block" - 5. provide the original "callback" at the end of the chain via "finish" - - (The above example is tested at the end of this method, and other tests - will provide additional examples.) - - Requirements and conventions: - - Each async lambda MUST invoke its async method with "c", and MUST return - immediately after invoking that method. It MUST NOT, for example, have - a catch or finally (including close on try-with-resources) after the - invocation of the async method. - - In cases where the async method has "mixed" returns (some of which are - plain sync, some async), the "c" callback MUST be completed on the - plain sync path, `c.complete()`, followed by a return or end of method. - - Chains starting with "beginAsync" correspond roughly to code blocks. - This includes the method body, blocks used in if/try/catch/while/etc. - statements, and places where anonymous code blocks might be used. For - clarity, such nested/indented chains might be omitted (where possible, - as demonstrated in the tests/examples below). - - Plain sync code MAY throw exceptions, and SHOULD NOT attempt to handle - them asynchronously. The exceptions will be caught and handled by the - code blocks that contain this sync code. - - All code, including "plain" code (parameter checks) SHOULD be placed - within the "boilerplate". This ensures that exceptions are handled, - and facilitates comparison/review. This excludes code that must be - "shared", such as lambda and variable declarations. - - A curly-braced lambda body (with no linebreak before "."), as shown - below, SHOULD be used (for consistency, and ease of comparison/review). - */ - + void test1Method() { // the number of expected variations is often: 1 + N methods invoked // 1 variation with no exceptions, and N per an exception in each method assertBehavesSameVariations(2, @@ -118,21 +55,11 @@ below, SHOULD be used (for consistency, and ease of comparison/review). async(1, c); }).finish(callback); }); - /* - Code review checklist for async code: - - 1. Is everything inside the boilerplate? - 2. Is "callback" supplied to "finish"? - 3. In each block and nested block, is that same block's "c" always passed/completed at the end of execution? - 4. Is every c.complete followed by a return, to end execution? - 5. Have all sync method calls been converted to async, where needed? - */ } @Test - void testBasicVariations2() { - // tests pairs - // converting: plain-sync, sync-plain, sync-sync + void test2Methods() { + // tests pairs, converting: plain-sync, sync-plain, sync-sync // (plain-plain does not need an async chain) assertBehavesSameVariations(3, @@ -182,8 +109,9 @@ void testBasicVariations2() { } @Test - void testBasicVariations4() { - // tests the sync-sync pair with preceding and ensuing plain methods: + void test4Methods() { + // tests the sync-sync pair with preceding and ensuing plain methods. + assertBehavesSameVariations(5, () -> { plain(11); @@ -239,7 +167,7 @@ void testSupply() { } @Test - void testSupplyMixed() { + void testSupplyWithMixedReturns() { assertBehavesSameVariations(5, () -> { if (plainTest(1)) { @@ -261,7 +189,6 @@ void testSupplyMixed() { }); } - @SuppressWarnings("ConstantConditions") @Test void testFullChain() { // tests a chain with: runnable, producer, function, function, consumer @@ -307,7 +234,7 @@ void testFullChain() { } @Test - void testConditionalVariations() { + void testConditionals() { assertBehavesSameVariations(5, () -> { if (plainTest(1)) { @@ -413,7 +340,7 @@ void testMixedConditionalCascade() { @Test void testPlain() { - // for completeness; should not be used, since there is no async + // For completeness. This should not be used, since there is no async. assertBehavesSameVariations(2, () -> { plain(1); @@ -463,9 +390,11 @@ void testTryCatch() { }).finish(callback); }); - // chain of 2 in try - // "onErrorIf" will consider everything in - // the preceding chain to be part of the try + // chain of 2 in try. + // WARNING: "onErrorIf" will consider everything in + // the preceding chain to be part of the try. + // Use nested async chains to define the beginning + // of the "try". assertBehavesSameVariations(5, () -> { try { @@ -701,7 +630,7 @@ void testTryCatchTestAndRethrow() { } @Test - void testLoop() { + void testRetryLoop() { assertBehavesSameVariations(InvocationTracker.DEPTH_LIMIT * 2 + 1, () -> { while (true) { From 8ace19c3a03d680287f38e08c925bed584808237 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Thu, 23 Nov 2023 12:15:01 -0700 Subject: [PATCH 6/7] Apply suggestions from code review Co-authored-by: Valentin Kovalenko --- .../src/main/com/mongodb/internal/async/AsyncRunnable.java | 2 +- .../src/main/com/mongodb/internal/async/AsyncSupplier.java | 2 +- .../main/com/mongodb/internal/async/SingleResultCallback.java | 4 ++-- .../src/test/functional/com/mongodb/client/TestListener.java | 4 +++- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java index 2d0f1051b59..78b21bb32c2 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java @@ -158,7 +158,7 @@ default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultC callback.completeExceptionally(t); return; } - callback.completeExceptionally(e); + callback.onResult(r, e); }); } diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java index 112fbc9c61d..2f690215a35 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java @@ -47,7 +47,7 @@ default void getAsync(final SingleResultCallback callback) { } @Override - default void unsafeFinish(final Void value, final SingleResultCallback callback) { + default void unsafeFinish(@Nullable final Void value, final SingleResultCallback callback) { unsafeFinish(callback); } diff --git a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java index 6b3be3c77f9..1d055253246 100644 --- a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java +++ b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java @@ -61,11 +61,11 @@ default void complete(final SingleResultCallback callback) { this.onResult(null, null); } - default void complete(final T result) { + default void complete(@Nullable final T result) { this.onResult(result, null); } default void completeExceptionally(final Throwable t) { - this.onResult(null, t); + this.onResult(null, assertNotNull(t)); } } diff --git a/driver-core/src/test/functional/com/mongodb/client/TestListener.java b/driver-core/src/test/functional/com/mongodb/client/TestListener.java index db68065432c..6b968f31f1b 100644 --- a/driver-core/src/test/functional/com/mongodb/client/TestListener.java +++ b/driver-core/src/test/functional/com/mongodb/client/TestListener.java @@ -34,7 +34,9 @@ public void add(final String s) { } public List getEventStrings() { - return new ArrayList<>(events); + synchronized (events) { + return new ArrayList<>(events); + } } public void clear() { From 1190273fca55c99b9dc3f88cc10c83e220cc70e7 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Thu, 23 Nov 2023 13:10:18 -0700 Subject: [PATCH 7/7] Address PR comments --- .../main/com/mongodb/internal/async/AsyncFunction.java | 5 +++++ .../main/com/mongodb/internal/async/AsyncRunnable.java | 9 ++++++--- .../main/com/mongodb/internal/async/AsyncSupplier.java | 6 +++++- .../com/mongodb/internal/async/SingleResultCallback.java | 2 ++ .../com/mongodb/internal/async/AsyncFunctionsTest.java | 2 +- 5 files changed, 19 insertions(+), 5 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java index 111b23f1027..5be92558ee0 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncFunction.java @@ -16,6 +16,8 @@ package com.mongodb.internal.async; +import com.mongodb.lang.Nullable; + /** * See {@link AsyncRunnable} *

@@ -26,6 +28,9 @@ public interface AsyncFunction { /** * This should not be called externally, but should be implemented as a * lambda. To "finish" an async chain, use one of the "finish" methods. + * + * @param value A {@code @}{@link Nullable} argument of the asynchronous function. + * @param callback the callback */ void unsafeFinish(T value, SingleResultCallback callback); } diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java index 78b21bb32c2..fcf8d61387d 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java @@ -90,7 +90,7 @@ * them asynchronously. The exceptions will be caught and handled by the API. * *

All code, including "plain" code (parameter checks) SHOULD be placed - * within the "boilerplate". This ensures that exceptions are handled, + * within the API's async lambdas. This ensures that exceptions are handled, * and facilitates comparison/review. This excludes code that must be * "shared", such as lambda and variable declarations. * @@ -101,7 +101,7 @@ *

Code review checklist, for common mistakes: * *

    - *
  1. Is everything inside the boilerplate?
  2. + *
  3. Is everything (that can be) inside the async lambdas?
  4. *
  5. Is "callback" supplied to "finish"?
  6. *
  7. In each block and nested block, is that same block's "c" always * passed/completed at the end of execution?
  8. @@ -236,7 +236,10 @@ default AsyncRunnable thenRunRetryingWhile( new RetryingAsyncCallbackSupplier( new RetryState(), (rs, lastAttemptFailure) -> shouldRetry.test(lastAttemptFailure), - cb -> runnable.finish(cb) // finish is required here, to handle exceptions + // `finish` is required here instead of `unsafeFinish` + // because only `finish` meets the contract of + // `AsyncCallbackSupplier.get`, which we implement here + cb -> runnable.finish(cb) ).get(callback); }); } diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java index 2f690215a35..b7d24dd3df5 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java @@ -16,6 +16,8 @@ package com.mongodb.internal.async; +import com.mongodb.lang.Nullable; + import java.util.function.Predicate; @@ -43,7 +45,7 @@ public interface AsyncSupplier extends AsyncFunction { * @param callback the callback */ default void getAsync(final SingleResultCallback callback) { - unsafeFinish(callback); + finish(callback); } @Override @@ -113,6 +115,8 @@ default AsyncRunnable thenConsume(final AsyncConsumer consumer) { default AsyncSupplier onErrorIf( final Predicate errorCheck, final AsyncFunction errorFunction) { + // finish is used here instead of unsafeFinish to ensure that + // exceptions thrown from the callback are properly handled return (callback) -> this.finish((r, e) -> { if (e == null) { callback.complete(r); diff --git a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java index 1d055253246..632e453d0c0 100644 --- a/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java +++ b/driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java @@ -21,6 +21,8 @@ import com.mongodb.internal.async.function.AsyncCallbackFunction; import com.mongodb.lang.Nullable; +import static com.mongodb.assertions.Assertions.assertNotNull; + /** * An interface to describe the completion of an asynchronous function, which may be represented as {@link AsyncCallbackFunction}. * diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java index 58fe10e0b29..b783b3de93b 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java @@ -943,7 +943,7 @@ private void assertBehavesSameVariations(final int expectedVariations, final final Consumer> async) { // run the variation-trying code twice, with direct/indirect exceptions for (int i = 0; i < 2; i++) { - isTestingAbruptCompletion = i == 0; + isTestingAbruptCompletion = i != 0; // the variation-trying code: invocationTracker.reset();