Skip to content

Commit 3d0a47d

Browse files
committed
Allow task-based subscription processing via the subscription manager
- For high-throughput subscriptions, where the order of processing is not important, allow the subscription manager to be configured to handle subscriptions in a task-based manner: a separate asyncio task is created for each subscription message's handler call, so handlers run concurrently instead of being awaited one at a time. - Add a ``task_based`` configuration to the subscription manager, defaulting to ``False``, which can be set to ``True`` to enable task-based processing.
1 parent 78f404f commit 3d0a47d

File tree

2 files changed

+75
-7
lines changed

2 files changed

+75
-7
lines changed

web3/_utils/module_testing/persistent_connection_provider.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from dataclasses import (
44
dataclass,
55
)
6+
import time
67
from typing import (
78
TYPE_CHECKING,
89
Any,
@@ -15,6 +16,7 @@
1516
)
1617

1718
from eth_typing import (
19+
Address,
1820
ChecksumAddress,
1921
HexStr,
2022
)
@@ -38,6 +40,9 @@
3840
from web3.middleware import (
3941
ExtraDataToPOAMiddleware,
4042
)
43+
from web3.providers.persistent.request_processor import (
44+
TaskReliantQueue,
45+
)
4146
from web3.types import (
4247
BlockData,
4348
FormattedEthSubscriptionResponse,
@@ -914,3 +919,60 @@ async def test_run_forever_starts_with_0_subs_and_runs_until_task_cancelled(
914919

915920
# cleanup
916921
await clean_up_task(run_forever_task)
922+
923+
@pytest.mark.asyncio
async def test_high_throughput_subscription_task_based(
    self, async_w3: AsyncWeb3
) -> None:
    """
    Check that ``task_based`` subscription handling runs handlers concurrently.

    The handler queue is pre-filled with fabricated ``eth_subscription``
    messages whose handler sleeps briefly. Awaiting every handler one after
    another would take at least ``total_sleep_seconds``; the test asserts
    that handling finishes well below that and that every message was seen.
    """
    num_messages = 5_000
    total_sleep_seconds = 5
    # True division is required here: ``5 // 5_000`` floors to ``0``, which
    # would make every handler sleep for zero seconds and the timing
    # assertion below meaningless.
    sleep_per_message = total_sleep_seconds / num_messages

    request_processor = async_w3.provider._request_processor
    sub_queue = TaskReliantQueue(maxsize=num_messages)
    request_processor._handler_subscription_queue = sub_queue

    sub_manager = async_w3.subscription_manager
    sub_manager.task_based = True  # turn on task-based processing

    class Counter:
        val: int = 0

    counter = Counter()

    async def high_throughput_handler(
        handler_context: LogsSubscriptionContext,
    ) -> None:
        handler_context.counter.val += 1
        if handler_context.counter.val == num_messages:
            # last fabricated message: unsubscribe so handling can finish
            await handler_context.subscription.unsubscribe()
        # if every handler were awaited sequentially, the sleeps alone would
        # add up to at least ``total_sleep_seconds``
        await asyncio.sleep(sleep_per_message)

    high_throughput_subscription = LogsSubscription(
        # build a meaningless subscription since we are fabricating the messages
        Address(b"\x00" * 20),
        handler=high_throughput_handler,
        handler_context={"counter": counter},
    )
    sub_id = await sub_manager.subscribe(high_throughput_subscription)
    request_processor.cache_request_information(
        request_id=sub_id,
        method=RPCEndpoint("eth_subscribe"),
        params=[],
        response_formatters=((), (), ()),  # type: ignore
    )

    # put ``num_messages`` fabricated messages in the queue
    for _ in range(num_messages):
        sub_queue.put_nowait(
            {
                "jsonrpc": "2.0",
                "method": "eth_subscription",
                "params": {"subscription": sub_id, "result": {"foo": "bar"}},  # type: ignore # noqa: E501
            }
        )

    # use a monotonic clock so system clock adjustments cannot skew the result
    start = time.perf_counter()
    await sub_manager.handle_subscriptions()
    stop = time.perf_counter()

    assert counter.val == num_messages

    assert sub_manager.total_handler_calls == num_messages
    assert stop - start < 3, "subscription handling took too long!"

web3/providers/persistent/subscription_manager.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ def __init__(self, w3: "AsyncWeb3") -> None:
5757
self._provider = cast("PersistentConnectionProvider", w3.provider)
5858
self._subscription_container = SubscriptionContainer()
5959

60+
# turn on if order of subscription processing is not important
61+
self.task_based = False
62+
6063
# share the subscription container with the request processor so it can separate
6164
# subscriptions into different queues based on ``sub._handler`` presence
6265
self._provider._request_processor._subscription_container = (
@@ -281,14 +284,17 @@ async def handle_subscriptions(self, run_forever: bool = False) -> None:
281284
sub_id
282285
)
283286
if sub:
284-
await sub._handler(
285-
EthSubscriptionContext(
286-
self._w3,
287-
sub,
288-
formatted_sub_response["result"],
289-
**sub._handler_context,
290-
)
287+
sub_context = EthSubscriptionContext(
288+
self._w3,
289+
sub,
290+
formatted_sub_response["result"],
291+
**sub._handler_context,
291292
)
293+
if self.task_based:
294+
asyncio.create_task(sub._handler(sub_context))
295+
else:
296+
await sub._handler(sub_context)
297+
292298
except SubscriptionProcessingFinished:
293299
if not run_forever:
294300
self.logger.info(

0 commit comments

Comments
 (0)