Skip to content

Commit 9af9808

Browse files
committed
Allow task-based subscription processing via the subscription manager
- For high-throughput subscriptions where the order of processing is not important, allow handlers to be run in a task-based manner: instead of awaiting each handler in turn, a task is created for each subscription message's handler call so that the handlers run concurrently. - Add a ``task_based`` configuration to the subscription manager, defaulting to ``False``; set it to ``True`` to enable task-based processing.
1 parent 78f404f commit 9af9808

File tree

2 files changed

+73
-7
lines changed

2 files changed

+73
-7
lines changed

web3/_utils/module_testing/persistent_connection_provider.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from dataclasses import (
44
dataclass,
55
)
6+
import time
67
from typing import (
78
TYPE_CHECKING,
89
Any,
@@ -38,6 +39,9 @@
3839
from web3.middleware import (
3940
ExtraDataToPOAMiddleware,
4041
)
42+
from web3.providers.persistent.request_processor import (
43+
TaskReliantQueue,
44+
)
4145
from web3.types import (
4246
BlockData,
4347
FormattedEthSubscriptionResponse,
@@ -914,3 +918,59 @@ async def test_run_forever_starts_with_0_subs_and_runs_until_task_cancelled(
914918

915919
# cleanup
916920
await clean_up_task(run_forever_task)
921+
922+
@pytest.mark.asyncio
async def test_high_throughput_subscription_task_based(
    self, async_w3: AsyncWeb3
) -> None:
    """
    Check that task-based subscription processing does not await each
    handler call in turn.

    5_000 fabricated ``eth_subscription`` messages are queued for a single
    subscription whose handler sleeps 1 ms per call. Awaiting the handlers
    one at a time would therefore take at least 5 seconds; the test asserts
    that ``handle_subscriptions()`` returns in under 3 seconds with every
    handler call counted.
    """
    message_count = 5_000

    async_w3.provider._request_processor._handler_subscription_queue = (
        TaskReliantQueue(maxsize=message_count)
    )
    sub_manager = async_w3.subscription_manager
    sub_manager.task_based = True  # turn on task-based processing

    class Counter:
        # number of times the handler has been entered
        val: int = 0

    counter = Counter()

    async def high_throughput_handler(
        handler_context: Any,
    ) -> None:
        handler_context.counter.val += 1
        if handler_context.counter.val == message_count:
            # last message: unsubscribe so that processing can finish
            await handler_context.subscription.unsubscribe()
        # if we awaited all 5_000 messages, we would sleep at least 5 seconds.
        # NOTE: this must be true division; ``5 // 5_000`` evaluates to 0,
        # which removes the delay and makes the timing assertion meaningless.
        await asyncio.sleep(5 / message_count)

    # build a meaningless subscription since we are fabricating the messages
    sub_id = await async_w3.eth.subscribe(
        "syncing",
        handler=high_throughput_handler,
        handler_context={"counter": counter},
    )
    async_w3.provider._request_processor.cache_request_information(
        request_id=sub_id,
        method=RPCEndpoint("eth_subscribe"),
        params=[],
        response_formatters=((), (), ()),  # type: ignore
    )

    # put 5_000 messages in the queue
    handler_queue = (
        async_w3.provider._request_processor._handler_subscription_queue
    )
    for _ in range(message_count):
        handler_queue.put_nowait(
            {
                "jsonrpc": "2.0",
                "method": "eth_subscription",
                "params": {"subscription": HexBytes(sub_id), "result": False},
            }
        )

    # use a monotonic clock so system clock adjustments cannot skew the result
    start = time.perf_counter()
    await sub_manager.handle_subscriptions()
    stop = time.perf_counter()

    assert counter.val == message_count

    assert sub_manager.total_handler_calls == message_count
    assert stop - start < 3, "subscription handling took too long!"

web3/providers/persistent/subscription_manager.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ def __init__(self, w3: "AsyncWeb3") -> None:
5757
self._provider = cast("PersistentConnectionProvider", w3.provider)
5858
self._subscription_container = SubscriptionContainer()
5959

60+
# turn on if order of subscription processing is not important
61+
self.task_based = False
62+
6063
# share the subscription container with the request processor so it can separate
6164
# subscriptions into different queues based on ``sub._handler`` presence
6265
self._provider._request_processor._subscription_container = (
@@ -281,14 +284,17 @@ async def handle_subscriptions(self, run_forever: bool = False) -> None:
281284
sub_id
282285
)
283286
if sub:
284-
await sub._handler(
285-
EthSubscriptionContext(
286-
self._w3,
287-
sub,
288-
formatted_sub_response["result"],
289-
**sub._handler_context,
290-
)
287+
sub_context = EthSubscriptionContext(
288+
self._w3,
289+
sub,
290+
formatted_sub_response["result"],
291+
**sub._handler_context,
291292
)
293+
if self.task_based:
294+
asyncio.create_task(sub._handler(sub_context))
295+
else:
296+
await sub._handler(sub_context)
297+
292298
except SubscriptionProcessingFinished:
293299
if not run_forever:
294300
self.logger.info(

0 commit comments

Comments
 (0)