Skip to content

Commit e6f5210

Browse files
committed
Allow task-based subscription processing via the subscription manager
- For high-throughput subscriptions, where order of processing is not important, we should allow a configuration to be set to process tasks in a task-based manner - creating a task for each subscription and processing them in parallel. - Add ``task_based`` configuration to the subscription manager, defaulting to ``False``, but allowing this to be set to true to enable task-based processing.
1 parent 78f404f commit e6f5210

File tree

2 files changed

+76
-7
lines changed

2 files changed

+76
-7
lines changed

web3/_utils/module_testing/persistent_connection_provider.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from dataclasses import (
44
dataclass,
55
)
6+
import time
67
from typing import (
78
TYPE_CHECKING,
89
Any,
@@ -38,6 +39,9 @@
3839
from web3.middleware import (
3940
ExtraDataToPOAMiddleware,
4041
)
42+
from web3.providers.persistent.request_processor import (
43+
TaskReliantQueue,
44+
)
4145
from web3.types import (
4246
BlockData,
4347
FormattedEthSubscriptionResponse,
@@ -914,3 +918,62 @@ async def test_run_forever_starts_with_0_subs_and_runs_until_task_cancelled(
914918

915919
# cleanup
916920
await clean_up_task(run_forever_task)
921+
922+
@pytest.mark.asyncio
923+
async def test_high_throughput_subscription_task_based(
924+
self, async_w3: AsyncWeb3
925+
) -> None:
926+
async_w3.provider._request_processor._handler_subscription_queue = (
927+
TaskReliantQueue(maxsize=5_000)
928+
)
929+
sub_manager = async_w3.subscription_manager
930+
sub_manager.task_based = True
931+
932+
async def high_throughput_handler(
933+
handler_context: LogsSubscriptionContext,
934+
) -> None:
935+
handler_context.counter.val += 1
936+
if (
937+
handler_context.async_w3.subscription_manager.total_handler_calls
938+
== 5_000
939+
):
940+
await handler_context.subscription.unsubscribe()
941+
# 3 seconds / 5_000 = 0.0006 seconds. If we awaited, we would expect this to
942+
# take at least 3 seconds to process all 5_000 messages.
943+
await asyncio.sleep(0.0006)
944+
945+
class Counter:
946+
val: int = 0
947+
948+
counter = Counter()
949+
high_throughput_subscription = LogsSubscription(
950+
"0xdead00000000000000000000000000000000beef", # type: ignore
951+
handler=high_throughput_handler,
952+
handler_context={"counter": counter},
953+
)
954+
sub_id = await sub_manager.subscribe(high_throughput_subscription)
955+
async_w3.provider._request_processor.cache_request_information(
956+
request_id=sub_id,
957+
method=RPCEndpoint("eth_subscribe"),
958+
params=["testestest"],
959+
response_formatters=((), (), ()), # type: ignore
960+
)
961+
962+
# put 5_000 messages in the queue
963+
for _ in range(5_000):
964+
async_w3.provider._request_processor._handler_subscription_queue.put_nowait(
965+
{
966+
"jsonrpc": "2.0",
967+
"method": "eth_subscription",
968+
"params": {"subscription": sub_id, "result": {"foo": "bar"}}, # type: ignore # noqa: E501
969+
}
970+
)
971+
972+
start = time.time()
973+
await sub_manager.handle_subscriptions()
974+
stop = time.time()
975+
976+
assert counter.val == 5_000
977+
978+
assert sub_manager.total_handler_calls == 5_000
979+
assert stop - start < 3, "subscription handling took too long!"

web3/providers/persistent/subscription_manager.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ def __init__(self, w3: "AsyncWeb3") -> None:
5757
self._provider = cast("PersistentConnectionProvider", w3.provider)
5858
self._subscription_container = SubscriptionContainer()
5959

60+
# turn on if order of subscription processing is not important
61+
self.task_based = False
62+
6063
# share the subscription container with the request processor so it can separate
6164
# subscriptions into different queues based on ``sub._handler`` presence
6265
self._provider._request_processor._subscription_container = (
@@ -281,14 +284,17 @@ async def handle_subscriptions(self, run_forever: bool = False) -> None:
281284
sub_id
282285
)
283286
if sub:
284-
await sub._handler(
285-
EthSubscriptionContext(
286-
self._w3,
287-
sub,
288-
formatted_sub_response["result"],
289-
**sub._handler_context,
290-
)
287+
sub_context = EthSubscriptionContext(
288+
self._w3,
289+
sub,
290+
formatted_sub_response["result"],
291+
**sub._handler_context,
291292
)
293+
if self.task_based:
294+
asyncio.create_task(sub._handler(sub_context))
295+
else:
296+
await sub._handler(sub_context)
297+
292298
except SubscriptionProcessingFinished:
293299
if not run_forever:
294300
self.logger.info(

0 commit comments

Comments
 (0)