Skip to content

Commit 55a6fac

Browse files
committed
Add parallelize option to async_w3.eth.subscribe API
- Add test for eth.subscribe with all kwargs and make sure they are passed through when creating the subscription - Clean up old ``_subscribe_with_args``
1 parent 586138c commit 55a6fac

File tree

2 files changed

+41
-13
lines changed

2 files changed

+41
-13
lines changed

tests/core/subscriptions/test_subscription_manager.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,3 +490,42 @@ async def parallel_handler(_ctx) -> None:
490490

491491
assert subscription_manager.total_handler_calls == 3
492492
assert len(manager_tasks) == 0 # all tasks cleaned up
493+
494+
495+
@pytest.mark.asyncio
496+
async def test_eth_subscribe_api_call_with_all_kwargs(subscription_manager):
497+
async_w3 = subscription_manager._w3
498+
provider = subscription_manager._w3.provider
499+
500+
label = "test_subscription"
501+
test_ctx = "test context"
502+
503+
async def fast_parallel_handler(context) -> None:
504+
assert asyncio.current_task() in subscription_manager._tasks
505+
assert context.test_ctx == test_ctx
506+
sub = subscription_manager.get_by_id(context.subscription.id)
507+
assert sub.label == label
508+
509+
await context.subscription.unsubscribe()
510+
511+
sub_id = await async_w3.eth.subscribe(
512+
"newHeads",
513+
handler=fast_parallel_handler,
514+
handler_context={"test_ctx": test_ctx},
515+
label=label,
516+
parallelize=True,
517+
)
518+
provider._request_processor.cache_request_information(
519+
request_id=sub_id,
520+
method="eth_subscribe",
521+
params=[],
522+
response_formatters=((), (), ()),
523+
)
524+
provider._request_processor._handler_subscription_queue.put_nowait(
525+
create_subscription_message(sub_id)
526+
)
527+
528+
await subscription_manager.handle_subscriptions()
529+
530+
assert subscription_manager.total_handler_calls == 1
531+
assert len(async_w3.subscription_manager._tasks) == 0

web3/eth/async_eth.py

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -720,19 +720,6 @@ async def uninstall_filter(self, filter_id: HexStr) -> bool:
720720
mungers=[default_root_munger],
721721
)
722722

723-
_subscribe_with_args: Method[
724-
Callable[
725-
[
726-
SubscriptionType,
727-
Optional[Union[LogsSubscriptionArg, bool]],
728-
],
729-
Awaitable[HexStr],
730-
]
731-
] = Method(
732-
RPC.eth_subscribe,
733-
mungers=[default_root_munger],
734-
)
735-
736723
async def subscribe(
737724
self,
738725
subscription_type: SubscriptionType,
@@ -745,6 +732,7 @@ async def subscribe(
745732
handler: Optional[EthSubscriptionHandler] = None,
746733
handler_context: Optional[Dict[str, Any]] = None,
747734
label: Optional[str] = None,
735+
parallelize: Optional[bool] = None,
748736
) -> HexStr:
749737
if not isinstance(self.w3.provider, PersistentConnectionProvider):
750738
raise MethodNotSupported(
@@ -757,6 +745,7 @@ async def subscribe(
757745
handler=handler,
758746
handler_context=handler_context or {},
759747
label=label,
748+
parallelize=parallelize,
760749
)
761750
return await self.w3.subscription_manager.subscribe(sub)
762751

0 commit comments

Comments
 (0)