From c35aebf08960f09a1c6876a77bdf0a9b0f29413c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 22 Mar 2024 15:54:53 -0300 Subject: [PATCH 1/7] feat: collect BLSToExecutionChange from gossip --- .../fork_choice/fork_choice.ex | 2 + .../p2p/gossip/gossipsub.ex | 8 ++-- .../p2p/gossip/handler.ex | 17 ++++++- .../p2p/gossip/operations_collector.ex | 46 +++++++++++++++++++ 4 files changed, 68 insertions(+), 5 deletions(-) create mode 100644 lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 9d4f3b130..2af3735fd 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -8,6 +8,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do alias LambdaEthereumConsensus.Beacon.BeaconChain alias LambdaEthereumConsensus.ForkChoice.{Handlers, Helpers} + alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector alias LambdaEthereumConsensus.Store.Blocks alias LambdaEthereumConsensus.Store.StoreDb alias LambdaEthereumConsensus.Validator @@ -166,6 +167,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do Handlers.notify_forkchoice_update(store, head_block) + OperationsCollector.notify_new_block(head_block) Validator.notify_new_block(head_block.slot, head_root) BeaconChain.update_fork_choice_cache( diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex b/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex index 4dfa35fd3..3d3a5a222 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex @@ -18,12 +18,12 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do topics = [ {"beacon_block", SignedBeaconBlock, &Handler.handle_beacon_block/1}, {"beacon_aggregate_and_proof", Types.SignedAggregateAndProof, - &Handler.handle_beacon_aggregate_and_proof/1} - # {"beacon_attestation_0", Types.Attestation}, + &Handler.handle_beacon_aggregate_and_proof/1}, # {"voluntary_exit", Types.SignedVoluntaryExit}, # {"proposer_slashing", Types.ProposerSlashing}, # {"attester_slashing", Types.AttesterSlashing}, - # {"bls_to_execution_change", Types.SignedBLSToExecutionChange}, + {"bls_to_execution_change", Types.SignedBLSToExecutionChange, + &Handler.handle_bls_to_execution_change/1} # {"sync_committee_contribution_and_proof", Types.SignedContributionAndProof}, # {"sync_committee_0", Types.SyncCommitteeMessage} ] @@ -44,6 +44,8 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do {Consumer, %{topic: topic, ssz_type: ssz_type, handler: handler}} end + children = children ++ [LambdaEthereumConsensus.P2P.Gossip.OperationsCollector] + Supervisor.init(children, strategy: :one_for_one) end end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex b/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex index bcf25d701..3c28c9234 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex @@ -7,9 +7,17 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do alias LambdaEthereumConsensus.Beacon.BeaconChain alias LambdaEthereumConsensus.Beacon.PendingBlocks + alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector alias LambdaEthereumConsensus.Store.BlobDb alias LambdaEthereumConsensus.Utils.BitField - alias Types.{AggregateAndProof, BlobSidecar, SignedAggregateAndProof, SignedBeaconBlock} + + alias Types.{ + AggregateAndProof, + BlobSidecar, + SignedAggregateAndProof, + SignedBeaconBlock, + SignedBLSToExecutionChange + } def handle_beacon_block(%SignedBeaconBlock{message: block} = signed_block) do current_slot = BeaconChain.get_current_slot() @@ -31,7 +39,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do root = aggregate.data.beacon_block_root |> Base.encode16() # We are getting ~500 attestations in half a second. This is overwhelming the store GenServer at the moment. - # Store.on_attestation(aggregate) + # ForkChoice.on_attestation(aggregate) Logger.debug( "[Gossip] Aggregate decoded. Total attestations: #{votes}", @@ -40,6 +48,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do ) end + def handle_bls_to_execution_change(%SignedBLSToExecutionChange{} = message) do + # TODO: validate message first + OperationsCollector.notify_bls_to_execution_change_gossip(message) + end + def handle_blob_sidecar(%BlobSidecar{index: blob_index} = blob, blob_index) do BlobDb.store_blob(blob) Logger.debug("[Gossip] Blob sidecar received, with index #{blob_index}") diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex new file mode 100644 index 000000000..da3eff23c --- /dev/null +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -0,0 +1,46 @@ +defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do + @moduledoc """ + Module that stores the operations received from gossipsub. + """ + alias Types.SignedBeaconBlock + alias Types.SignedBLSToExecutionChange + + use GenServer + + def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__) + + def notify_bls_to_execution_change_gossip(%SignedBLSToExecutionChange{} = msg) do + GenServer.cast(__MODULE__, {:bls_to_execution_change, msg}) + end + + def notify_new_block(%SignedBeaconBlock{} = block) do + GenServer.cast(__MODULE__, {:new_block, block}) + end + + @impl GenServer + def init(_init_arg) do + {:ok, %{bls_to_execution_change: []}} + end + + @impl GenServer + def handle_cast({:bls_to_execution_change, msg}, state) do + new_msgs = [msg | state.bls_to_execution_change] + {:noreply, %{state | bls_to_execution_change: new_msgs}} + end + + def handle_cast({:new_block, block}, state) do + {:noreply, filter_messages(state, block)} + end + + defp filter_messages(state, block) do + indices = + block.message.body.bls_to_execution_changes + |> MapSet.new(& &1.message.validator_index) + + bls_to_execution_changes = + state.bls_to_execution_change + |> Enum.reject(&MapSet.member?(indices, &1.message.validator_index)) + + %{state | bls_to_execution_change: bls_to_execution_changes} + end +end From c7601a5dca336edaa87da9249fd7503bfd3e37a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 22 Mar 2024 15:57:50 -0300 Subject: [PATCH 2/7] Reduce the amount of information sent --- .../p2p/gossip/operations_collector.ex | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex index da3eff23c..cf893ca96 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -14,7 +14,8 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do end def notify_new_block(%SignedBeaconBlock{} = block) do - GenServer.cast(__MODULE__, {:new_block, block}) + operations = %{bls_to_execution_changes: block.message.body.bls_to_execution_changes} + GenServer.cast(__MODULE__, {:new_block, operations}) end @impl GenServer @@ -28,13 +29,13 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do {:noreply, %{state | bls_to_execution_change: new_msgs}} end - def handle_cast({:new_block, block}, state) do - {:noreply, filter_messages(state, block)} + def handle_cast({:new_block, operations}, state) do + {:noreply, filter_messages(state, operations)} end - defp filter_messages(state, block) do + defp filter_messages(state, operations) do indices = - block.message.body.bls_to_execution_changes + operations.bls_to_execution_changes |> MapSet.new(& &1.message.validator_index) bls_to_execution_changes = From a98a36eb61833f964718072161ac9da5cc41ad60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 22 Mar 2024 18:49:29 -0300 Subject: [PATCH 3/7] Add typespecs --- .../p2p/gossip/operations_collector.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex index cf893ca96..f238ec28a 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -9,10 +9,12 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__) + @spec notify_bls_to_execution_change_gossip(SignedBLSToExecutionChange.t()) :: :ok def notify_bls_to_execution_change_gossip(%SignedBLSToExecutionChange{} = msg) do GenServer.cast(__MODULE__, {:bls_to_execution_change, msg}) end + @spec notify_new_block(SignedBeaconBlock.t()) :: :ok def notify_new_block(%SignedBeaconBlock{} = block) do operations = %{bls_to_execution_changes: block.message.body.bls_to_execution_changes} GenServer.cast(__MODULE__, {:new_block, operations}) From e08ef3d2a2dce2638ed5977a6fc82a55eb9ab2e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 22 Mar 2024 18:51:32 -0300 Subject: [PATCH 4/7] Fix dialyzer errors --- .../p2p/gossip/operations_collector.ex | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex index f238ec28a..6335dc4f4 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -2,7 +2,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do @moduledoc """ Module that stores the operations received from gossipsub. """ - alias Types.SignedBeaconBlock + alias Types.BeaconBlock alias Types.SignedBLSToExecutionChange use GenServer @@ -14,9 +14,9 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do GenServer.cast(__MODULE__, {:bls_to_execution_change, msg}) end - @spec notify_new_block(SignedBeaconBlock.t()) :: :ok - def notify_new_block(%SignedBeaconBlock{} = block) do - operations = %{bls_to_execution_changes: block.message.body.bls_to_execution_changes} + @spec notify_new_block(BeaconBlock.t()) :: :ok + def notify_new_block(%BeaconBlock{} = block) do + operations = %{bls_to_execution_changes: block.body.bls_to_execution_changes} GenServer.cast(__MODULE__, {:new_block, operations}) end From 2d41890a9a6561e1a50db2b66051d0f907e4faa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 25 Mar 2024 11:56:45 -0300 Subject: [PATCH 5/7] Add getter --- .../p2p/gossip/operations_collector.ex | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex index 6335dc4f4..6359f0a12 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -14,6 +14,12 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do GenServer.cast(__MODULE__, {:bls_to_execution_change, msg}) end + @spec notify_bls_to_execution_change_gossip(non_neg_integer()) :: + list(SignedBLSToExecutionChange.t()) + def get_bls_to_execution_changes(count) do + GenServer.call(__MODULE__, {:get_bls_to_execution_change, count}) + end + @spec notify_new_block(BeaconBlock.t()) :: :ok def notify_new_block(%BeaconBlock{} = block) do operations = %{bls_to_execution_changes: block.body.bls_to_execution_changes} @@ -25,6 +31,13 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do {:ok, %{bls_to_execution_change: []}} end + @impl GenServer + def handle_call({:get_bls_to_execution_change, count}, _from, state) do + # NOTE: we don't remove these from the state, since after a block is built + # :new_block will be called + {:reply, Enum.take(state.bls_to_execution_change, count), state} + end + @impl GenServer def handle_cast({:bls_to_execution_change, msg}, state) do new_msgs = [msg | state.bls_to_execution_change] From 7742e120c4255e2669b5eb884ae0e9b0763525d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 25 Mar 2024 11:57:20 -0300 Subject: [PATCH 6/7] fixup --- .../p2p/gossip/operations_collector.ex | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex index 6359f0a12..f7484b2aa 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -14,8 +14,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do GenServer.cast(__MODULE__, {:bls_to_execution_change, msg}) end - @spec notify_bls_to_execution_change_gossip(non_neg_integer()) :: - list(SignedBLSToExecutionChange.t()) + @spec get_bls_to_execution_changes(non_neg_integer()) :: list(SignedBLSToExecutionChange.t()) def get_bls_to_execution_changes(count) do GenServer.call(__MODULE__, {:get_bls_to_execution_change, count}) end From bb507765b99fd83e3b9068332df68bd7dc69fff3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 25 Mar 2024 12:11:14 -0300 Subject: [PATCH 7/7] Fix typo --- .../p2p/gossip/operations_collector.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex index f7484b2aa..ed44b7811 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -16,7 +16,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do @spec get_bls_to_execution_changes(non_neg_integer()) :: list(SignedBLSToExecutionChange.t()) def get_bls_to_execution_changes(count) do - GenServer.call(__MODULE__, {:get_bls_to_execution_change, count}) + GenServer.call(__MODULE__, {:get_bls_to_execution_changes, count}) end @spec notify_new_block(BeaconBlock.t()) :: :ok @@ -31,7 +31,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do end @impl GenServer - def handle_call({:get_bls_to_execution_change, count}, _from, state) do + def handle_call({:get_bls_to_execution_changes, count}, _from, state) do # NOTE: we don't remove these from the state, since after a block is built # :new_block will be called {:reply, Enum.take(state.bls_to_execution_change, count), state}