Skip to content

Commit 7c04063

Browse files
authored
feat: collect BLSToExecutionChange from gossip (#920)
1 parent fd38d93 commit 7c04063

File tree

4 files changed

+83
-5
lines changed

4 files changed

+83
-5
lines changed

lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
88

99
alias LambdaEthereumConsensus.Beacon.BeaconChain
1010
alias LambdaEthereumConsensus.ForkChoice.{Handlers, Helpers}
11+
alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector
1112
alias LambdaEthereumConsensus.Store.Blocks
1213
alias LambdaEthereumConsensus.Store.StoreDb
1314
alias LambdaEthereumConsensus.Validator
@@ -166,6 +167,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
166167

167168
Handlers.notify_forkchoice_update(store, head_block)
168169

170+
OperationsCollector.notify_new_block(head_block)
169171
Validator.notify_new_block(head_block.slot, head_root)
170172

171173
BeaconChain.update_fork_choice_cache(

lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do
1818
topics = [
1919
{"beacon_block", SignedBeaconBlock, &Handler.handle_beacon_block/1},
2020
{"beacon_aggregate_and_proof", Types.SignedAggregateAndProof,
21-
&Handler.handle_beacon_aggregate_and_proof/1}
22-
# {"beacon_attestation_0", Types.Attestation},
21+
&Handler.handle_beacon_aggregate_and_proof/1},
2322
# {"voluntary_exit", Types.SignedVoluntaryExit},
2423
# {"proposer_slashing", Types.ProposerSlashing},
2524
# {"attester_slashing", Types.AttesterSlashing},
26-
# {"bls_to_execution_change", Types.SignedBLSToExecutionChange},
25+
{"bls_to_execution_change", Types.SignedBLSToExecutionChange,
26+
&Handler.handle_bls_to_execution_change/1}
2727
# {"sync_committee_contribution_and_proof", Types.SignedContributionAndProof},
2828
# {"sync_committee_0", Types.SyncCommitteeMessage}
2929
]
@@ -44,6 +44,8 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do
4444
{Consumer, %{topic: topic, ssz_type: ssz_type, handler: handler}}
4545
end
4646

47+
children = children ++ [LambdaEthereumConsensus.P2P.Gossip.OperationsCollector]
48+
4749
Supervisor.init(children, strategy: :one_for_one)
4850
end
4951
end

lib/lambda_ethereum_consensus/p2p/gossip/handler.ex

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,17 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do
77

88
alias LambdaEthereumConsensus.Beacon.BeaconChain
99
alias LambdaEthereumConsensus.Beacon.PendingBlocks
10+
alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector
1011
alias LambdaEthereumConsensus.Store.BlobDb
1112
alias LambdaEthereumConsensus.Utils.BitField
12-
alias Types.{AggregateAndProof, BlobSidecar, SignedAggregateAndProof, SignedBeaconBlock}
13+
14+
alias Types.{
15+
AggregateAndProof,
16+
BlobSidecar,
17+
SignedAggregateAndProof,
18+
SignedBeaconBlock,
19+
SignedBLSToExecutionChange
20+
}
1321

1422
def handle_beacon_block(%SignedBeaconBlock{message: block} = signed_block) do
1523
current_slot = BeaconChain.get_current_slot()
@@ -31,7 +39,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do
3139
root = aggregate.data.beacon_block_root |> Base.encode16()
3240

3341
# We are getting ~500 attestations in half a second. This is overwhelming the store GenServer at the moment.
34-
# Store.on_attestation(aggregate)
42+
# ForkChoice.on_attestation(aggregate)
3543

3644
Logger.debug(
3745
"[Gossip] Aggregate decoded. Total attestations: #{votes}",
@@ -40,6 +48,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do
4048
)
4149
end
4250

51+
def handle_bls_to_execution_change(%SignedBLSToExecutionChange{} = message) do
52+
# TODO: validate message first
53+
OperationsCollector.notify_bls_to_execution_change_gossip(message)
54+
end
55+
4356
def handle_blob_sidecar(%BlobSidecar{index: blob_index} = blob, blob_index) do
4457
BlobDb.store_blob(blob)
4558
Logger.debug("[Gossip] Blob sidecar received, with index #{blob_index}")
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
2+
@moduledoc """
3+
Module that stores the operations received from gossipsub.
4+
"""
5+
alias Types.BeaconBlock
6+
alias Types.SignedBLSToExecutionChange
7+
8+
use GenServer
9+
10+
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
11+
12+
@spec notify_bls_to_execution_change_gossip(SignedBLSToExecutionChange.t()) :: :ok
13+
def notify_bls_to_execution_change_gossip(%SignedBLSToExecutionChange{} = msg) do
14+
GenServer.cast(__MODULE__, {:bls_to_execution_change, msg})
15+
end
16+
17+
@spec get_bls_to_execution_changes(non_neg_integer()) :: list(SignedBLSToExecutionChange.t())
18+
def get_bls_to_execution_changes(count) do
19+
GenServer.call(__MODULE__, {:get_bls_to_execution_changes, count})
20+
end
21+
22+
@spec notify_new_block(BeaconBlock.t()) :: :ok
23+
def notify_new_block(%BeaconBlock{} = block) do
24+
operations = %{bls_to_execution_changes: block.body.bls_to_execution_changes}
25+
GenServer.cast(__MODULE__, {:new_block, operations})
26+
end
27+
28+
@impl GenServer
29+
def init(_init_arg) do
30+
{:ok, %{bls_to_execution_change: []}}
31+
end
32+
33+
@impl GenServer
34+
def handle_call({:get_bls_to_execution_changes, count}, _from, state) do
35+
# NOTE: we don't remove these from the state, since after a block is built
36+
# :new_block will be called
37+
{:reply, Enum.take(state.bls_to_execution_change, count), state}
38+
end
39+
40+
@impl GenServer
41+
def handle_cast({:bls_to_execution_change, msg}, state) do
42+
new_msgs = [msg | state.bls_to_execution_change]
43+
{:noreply, %{state | bls_to_execution_change: new_msgs}}
44+
end
45+
46+
def handle_cast({:new_block, operations}, state) do
47+
{:noreply, filter_messages(state, operations)}
48+
end
49+
50+
defp filter_messages(state, operations) do
51+
indices =
52+
operations.bls_to_execution_changes
53+
|> MapSet.new(& &1.message.validator_index)
54+
55+
bls_to_execution_changes =
56+
state.bls_to_execution_change
57+
|> Enum.reject(&MapSet.member?(indices, &1.message.validator_index))
58+
59+
%{state | bls_to_execution_change: bls_to_execution_changes}
60+
end
61+
end

0 commit comments

Comments
 (0)