diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 9d4f3b130..2af3735fd 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -8,6 +8,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do alias LambdaEthereumConsensus.Beacon.BeaconChain alias LambdaEthereumConsensus.ForkChoice.{Handlers, Helpers} + alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector alias LambdaEthereumConsensus.Store.Blocks alias LambdaEthereumConsensus.Store.StoreDb alias LambdaEthereumConsensus.Validator @@ -166,6 +167,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do Handlers.notify_forkchoice_update(store, head_block) + OperationsCollector.notify_new_block(head_block) Validator.notify_new_block(head_block.slot, head_root) BeaconChain.update_fork_choice_cache( diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex b/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex index 4dfa35fd3..3d3a5a222 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex @@ -18,12 +18,12 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do topics = [ {"beacon_block", SignedBeaconBlock, &Handler.handle_beacon_block/1}, {"beacon_aggregate_and_proof", Types.SignedAggregateAndProof, - &Handler.handle_beacon_aggregate_and_proof/1} - # {"beacon_attestation_0", Types.Attestation}, + &Handler.handle_beacon_aggregate_and_proof/1}, # {"voluntary_exit", Types.SignedVoluntaryExit}, # {"proposer_slashing", Types.ProposerSlashing}, # {"attester_slashing", Types.AttesterSlashing}, - # {"bls_to_execution_change", Types.SignedBLSToExecutionChange}, + {"bls_to_execution_change", Types.SignedBLSToExecutionChange, + &Handler.handle_bls_to_execution_change/1} # {"sync_committee_contribution_and_proof", Types.SignedContributionAndProof}, # {"sync_committee_0", Types.SyncCommitteeMessage} ] @@ -44,6 +44,8 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do {Consumer, %{topic: topic, ssz_type: ssz_type, handler: handler}} end + children = children ++ [LambdaEthereumConsensus.P2P.Gossip.OperationsCollector] + Supervisor.init(children, strategy: :one_for_one) end end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex b/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex index bcf25d701..3c28c9234 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex @@ -7,9 +7,17 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do alias LambdaEthereumConsensus.Beacon.BeaconChain alias LambdaEthereumConsensus.Beacon.PendingBlocks + alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector alias LambdaEthereumConsensus.Store.BlobDb alias LambdaEthereumConsensus.Utils.BitField - alias Types.{AggregateAndProof, BlobSidecar, SignedAggregateAndProof, SignedBeaconBlock} + + alias Types.{ + AggregateAndProof, + BlobSidecar, + SignedAggregateAndProof, + SignedBeaconBlock, + SignedBLSToExecutionChange + } def handle_beacon_block(%SignedBeaconBlock{message: block} = signed_block) do current_slot = BeaconChain.get_current_slot() @@ -31,7 +39,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do root = aggregate.data.beacon_block_root |> Base.encode16() # We are getting ~500 attestations in half a second. This is overwhelming the store GenServer at the moment. - # Store.on_attestation(aggregate) + # ForkChoice.on_attestation(aggregate) Logger.debug( "[Gossip] Aggregate decoded. Total attestations: #{votes}", @@ -40,6 +48,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do ) end + def handle_bls_to_execution_change(%SignedBLSToExecutionChange{} = message) do + # TODO: validate message first + OperationsCollector.notify_bls_to_execution_change_gossip(message) + end + def handle_blob_sidecar(%BlobSidecar{index: blob_index} = blob, blob_index) do BlobDb.store_blob(blob) Logger.debug("[Gossip] Blob sidecar received, with index #{blob_index}") diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex new file mode 100644 index 000000000..ed44b7811 --- /dev/null +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -0,0 +1,61 @@ +defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do + @moduledoc """ + Module that stores the operations received from gossipsub. + """ + alias Types.BeaconBlock + alias Types.SignedBLSToExecutionChange + + use GenServer + + def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__) + + @spec notify_bls_to_execution_change_gossip(SignedBLSToExecutionChange.t()) :: :ok + def notify_bls_to_execution_change_gossip(%SignedBLSToExecutionChange{} = msg) do + GenServer.cast(__MODULE__, {:bls_to_execution_change, msg}) + end + + @spec get_bls_to_execution_changes(non_neg_integer()) :: list(SignedBLSToExecutionChange.t()) + def get_bls_to_execution_changes(count) do + GenServer.call(__MODULE__, {:get_bls_to_execution_changes, count}) + end + + @spec notify_new_block(BeaconBlock.t()) :: :ok + def notify_new_block(%BeaconBlock{} = block) do + operations = %{bls_to_execution_changes: block.body.bls_to_execution_changes} + GenServer.cast(__MODULE__, {:new_block, operations}) + end + + @impl GenServer + def init(_init_arg) do + {:ok, %{bls_to_execution_change: []}} + end + + @impl GenServer + def handle_call({:get_bls_to_execution_changes, count}, _from, state) do + # NOTE: we don't remove these from the state, since after a block is built + # :new_block will be called + {:reply, Enum.take(state.bls_to_execution_change, count), state} + end + + @impl GenServer + def handle_cast({:bls_to_execution_change, msg}, state) do + new_msgs = [msg | state.bls_to_execution_change] + {:noreply, %{state | bls_to_execution_change: new_msgs}} + end + + def handle_cast({:new_block, operations}, state) do + {:noreply, filter_messages(state, operations)} + end + + defp filter_messages(state, operations) do + indices = + operations.bls_to_execution_changes + |> MapSet.new(& &1.message.validator_index) + + bls_to_execution_changes = + state.bls_to_execution_change + |> Enum.reject(&MapSet.member?(indices, &1.message.validator_index)) + + %{state | bls_to_execution_change: bls_to_execution_changes} + end +end