Skip to content

Commit 9eed365

Browse files
committed
Added an async subscribe to topic to avoid issues in test
1 parent 55c5d90 commit 9eed365

File tree

5 files changed

+53
-39
lines changed

5 files changed

+53
-39
lines changed

lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
6666
def collect(subnet_id, attestation) do
6767
join(subnet_id)
6868
SubnetInfo.new_subnet_with_attestation(subnet_id, attestation)
69-
Libp2pPort.subscribe_to_topic(topic(subnet_id), __MODULE__)
69+
Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__)
7070
end
7171

7272
@spec stop_collecting(non_neg_integer()) ::

lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,19 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
3838
:ok
3939
end
4040

41-
# # TODO: Is anyone using this function?
42-
# @spec subscribe_to_topic() :: :ok | :error
43-
# def subscribe_to_topic() do
44-
# topic()
45-
# |> Libp2pPort.subscribe_to_topic(__MODULE__)
46-
# |> case do
47-
# :ok ->
48-
# :ok
41+
@spec subscribe_to_topic() :: :ok | :error
42+
def subscribe_to_topic() do
43+
topic()
44+
|> Libp2pPort.subscribe_to_topic(__MODULE__)
45+
|> case do
46+
:ok ->
47+
:ok
4948

50-
# {:error, reason} ->
51-
# Logger.error("[Gossip] Subscription failed: '#{reason}'")
52-
# :error
53-
# end
54-
# end
49+
{:error, reason} ->
50+
Logger.error("[Gossip] Subscription failed: '#{reason}'")
51+
:error
52+
end
53+
end
5554

5655
def topic() do
5756
fork_context = ForkChoice.get_fork_digest() |> Base.encode16(case: :lower)

lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,19 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do
2626
end
2727
end
2828

29-
# TODO: Is anyone using this function?
30-
# @spec subscribe_to_topics() :: :ok | {:error, String.t()}
31-
# def subscribe_to_topics() do
32-
# Enum.each(topics(), fn topic ->
33-
# case Libp2pPort.subscribe_to_topic(topic, __MODULE__) do
34-
# :ok ->
35-
# :ok
29+
@spec subscribe_to_topics() :: :ok | {:error, String.t()}
30+
def subscribe_to_topics() do
31+
Enum.each(topics(), fn topic ->
32+
case Libp2pPort.subscribe_to_topic(topic, __MODULE__) do
33+
:ok ->
34+
:ok
3635

37-
# {:error, reason} ->
38-
# Logger.error("[Gossip] Subscription failed: '#{reason}'")
39-
# {:error, reason}
40-
# end
41-
# end)
42-
# end
36+
{:error, reason} ->
37+
Logger.error("[Gossip] Subscription failed: '#{reason}'")
38+
{:error, reason}
39+
end
40+
end)
41+
end
4342

4443
def topics() do
4544
# TODO: this doesn't take into account fork digest changes

lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,17 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
4040
"bls_to_execution_change"
4141
]
4242

43-
# TODO: Is anyone using this function?
44-
# def subscribe_to_topics() do
45-
# Enum.reduce_while(topics(), :ok, fn topic, _acc ->
46-
# case Libp2pPort.subscribe_to_topic(topic, __MODULE__) do
47-
# :ok ->
48-
# {:cont, :ok}
49-
50-
# {:error, reason} ->
51-
# {:halt, {:error, "[OperationsCollector] Subscription failed: '#{reason}'"}}
52-
# end
53-
# end)
54-
# end
43+
def subscribe_to_topics() do
44+
Enum.reduce_while(topics(), :ok, fn topic, _acc ->
45+
case Libp2pPort.subscribe_to_topic(topic, __MODULE__) do
46+
:ok ->
47+
{:cont, :ok}
48+
49+
{:error, reason} ->
50+
{:halt, {:error, "[OperationsCollector] Subscription failed: '#{reason}'"}}
51+
end
52+
end)
53+
end
5554

5655
@spec get_bls_to_execution_changes(non_neg_integer()) :: list(SignedBLSToExecutionChange.t())
5756
def get_bls_to_execution_changes(count) do

lib/libp2p_port.ex

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,23 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
256256

257257
GenServer.cast(pid, {:new_subscriber, topic_name, module})
258258

259+
call_command(pid, {:subscribe, %SubscribeToTopic{name: topic_name}})
260+
end
261+
262+
@doc """
263+
Subscribes to the given topic async, not waiting for a response at the subscribe.
264+
After this, messages published to the topicwill be received by `self()`.
265+
"""
266+
@spec async_subscribe_to_topic(GenServer.server(), String.t(), module()) ::
267+
:ok | {:error, String.t()}
268+
def async_subscribe_to_topic(pid \\ __MODULE__, topic_name, module) do
269+
:telemetry.execute([:port, :message], %{}, %{
270+
function: "async_subscribe_to_topic",
271+
direction: "elixir->"
272+
})
273+
274+
GenServer.cast(pid, {:new_subscriber, topic_name, module})
275+
259276
cast_command(pid, {:subscribe, %SubscribeToTopic{name: topic_name}})
260277
end
261278

0 commit comments

Comments
 (0)