Skip to content

Commit 2d4690d

Browse files
committed
Merge branch 'main' into validator-state-management-refactor
2 parents 7675b4b + 84ee168 commit 2d4690d

26 files changed

+439
-340
lines changed

lib/lambda_ethereum_consensus/beacon/beacon_node.ex

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,31 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
1818
@impl true
1919
def init(_) do
2020
store = StoreSetup.setup!()
21-
deposit_tree_snapshot = StoreSetup.get_deposit_snapshot!()
2221

2322
LambdaEthereumConsensus.P2P.Metadata.init()
2423

2524
Cache.initialize_cache()
2625

2726
time = :os.system_time(:second)
2827

29-
ForkChoice.init_store(store, time)
30-
31-
init_execution_chain(deposit_tree_snapshot, store.head_root)
28+
store = ForkChoice.init_store(store, time)
3229

3330
validator_set = ValidatorSet.init(store.head_slot, store.head_root)
3431

32+
StoreSetup.get_deposit_snapshot!()
33+
|> init_execution_chain(store.head_root)
34+
3535
libp2p_args =
36-
[genesis_time: store.genesis_time, validator_set: validator_set] ++ get_libp2p_args()
36+
[genesis_time: store.genesis_time, validator_set: validator_set, store: store] ++
37+
get_libp2p_args()
3738

3839
children =
3940
[
4041
{LambdaEthereumConsensus.Libp2pPort, libp2p_args},
4142
{Task.Supervisor, name: PruneStatesSupervisor},
4243
{Task.Supervisor, name: PruneBlocksSupervisor},
43-
{Task.Supervisor, name: PruneBlobsSupervisor}
44+
{Task.Supervisor, name: PruneBlobsSupervisor},
45+
{Task.Supervisor, name: StoreStatesSupervisor}
4446
]
4547

4648
Supervisor.init(children, strategy: :one_for_all)

lib/lambda_ethereum_consensus/beacon/pending_blocks.ex

Lines changed: 51 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
77
require Logger
88

99
alias LambdaEthereumConsensus.ForkChoice
10-
alias LambdaEthereumConsensus.P2P.BlockDownloader
11-
1210
alias LambdaEthereumConsensus.Metrics
1311
alias LambdaEthereumConsensus.P2P.BlobDownloader
12+
alias LambdaEthereumConsensus.P2P.BlockDownloader
1413
alias LambdaEthereumConsensus.Store.BlobDb
1514
alias LambdaEthereumConsensus.Store.Blocks
1615
alias Types.BlockInfo
1716
alias Types.SignedBeaconBlock
17+
alias Types.Store
1818

1919
@type block_status :: :pending | :invalid | :download | :download_blobs | :unknown
2020
@type block_info ::
@@ -36,8 +36,8 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
3636
3737
If blobs are missing, they will be requested.
3838
"""
39-
@spec add_block(SignedBeaconBlock.t()) :: :ok
40-
def add_block(signed_block) do
39+
@spec add_block(Store.t(), SignedBeaconBlock.t()) :: Store.t()
40+
def add_block(store, signed_block) do
4141
block_info = BlockInfo.from_block(signed_block)
4242
loaded_block = Blocks.get_block_info(block_info.root)
4343

@@ -47,14 +47,18 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
4747

4848
if Enum.empty?(missing_blobs) do
4949
Blocks.new_block_info(block_info)
50-
process_block_and_check_children(block_info)
50+
process_block_and_check_children(store, block_info)
5151
else
52-
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/1, @download_retries)
52+
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/2, @download_retries)
5353

5454
block_info
5555
|> BlockInfo.change_status(:download_blobs)
5656
|> Blocks.new_block_info()
57+
58+
store
5759
end
60+
else
61+
store
5862
end
5963
end
6064

@@ -63,17 +67,22 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
6367
module after receiving a new block, but there are some other cases like at node startup, as there
6468
may be pending blocks from prior executions.
6569
"""
66-
def process_blocks() do
70+
def process_blocks(store) do
6771
case Blocks.get_blocks_with_status(:pending) do
6872
{:ok, blocks} ->
6973
blocks
7074
|> Enum.sort_by(fn %BlockInfo{} = block_info -> block_info.signed_block.message.slot end)
71-
|> Enum.each(&process_block/1)
75+
|> Enum.reduce(store, fn block_info, store ->
76+
{store, _state} = process_block(store, block_info)
77+
store
78+
end)
7279

7380
{:error, reason} ->
7481
Logger.error(
7582
"[Pending Blocks] Failed to get pending blocks to process. Reason: #{reason}"
7683
)
84+
85+
store
7786
end
7887
end
7988

@@ -85,13 +94,14 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
8594
# is called to check if there's any children that can now be processed. This function
8695
# is only to be called when a new block is saved as pending, not when processing blocks
8796
# in batch, to avoid unneeded recursion.
88-
defp process_block_and_check_children(block_info) do
89-
if process_block(block_info) in [:transitioned, :invalid] do
90-
process_blocks()
97+
defp process_block_and_check_children(store, block_info) do
98+
case process_block(store, block_info) do
99+
{store, result} when result in [:transitioned, :invalid] -> process_blocks(store)
100+
{store, _other} -> store
91101
end
92102
end
93103

94-
defp process_block(block_info) do
104+
defp process_block(store, block_info) do
95105
if block_info.status != :pending do
96106
Logger.error("Called process block for a block that's not ready: #{block_info}")
97107
end
@@ -105,9 +115,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
105115

106116
BlockDownloader.request_blocks_by_root(
107117
[parent_root],
108-
fn result ->
109-
process_downloaded_block(result)
110-
end,
118+
&process_downloaded_block/2,
111119
@download_retries
112120
)
113121

@@ -116,65 +124,67 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
116124
block_info.root
117125
)
118126

119-
:download_pending
127+
{store, :download_pending}
120128

121129
%BlockInfo{status: :invalid} ->
122130
Blocks.change_status(block_info, :invalid)
123-
:invalid
131+
{store, :invalid}
124132

125133
%BlockInfo{status: :transitioned} ->
126-
case ForkChoice.on_block(block_info) do
127-
:ok ->
134+
case ForkChoice.on_block(store, block_info) do
135+
{:ok, store} ->
128136
Blocks.change_status(block_info, :transitioned)
129-
:transitioned
137+
{store, :transitioned}
130138

131-
{:error, reason} ->
139+
{:error, reason, store} ->
132140
Logger.error("[PendingBlocks] Saving block as invalid #{reason}",
133141
slot: block_info.signed_block.message.slot,
134142
root: block_info.root
135143
)
136144

137145
Blocks.change_status(block_info, :invalid)
138-
:invalid
146+
{store, :invalid}
139147
end
140148

141149
_other ->
142-
:ok
150+
{store, :ok}
143151
end
144152
end
145153

146-
defp process_downloaded_block({:ok, [block]}) do
147-
add_block(block)
154+
defp process_downloaded_block(store, {:ok, [block]}) do
155+
{:ok, add_block(store, block)}
148156
end
149157

150-
defp process_downloaded_block({:error, reason}) do
151-
Logger.error("Error downloading block: #{inspect(reason)}")
152-
158+
defp process_downloaded_block(store, {:error, reason}) do
153159
# We might want to declare a block invalid here.
160+
Logger.error("Error downloading block: #{inspect(reason)}")
161+
{:ok, store}
154162
end
155163

156-
defp process_blobs({:ok, blobs}), do: add_blobs(blobs)
157-
158-
defp process_blobs({:error, reason}) do
159-
Logger.error("Error downloading blobs: #{inspect(reason)}")
164+
defp process_blobs(store, {:ok, blobs}), do: {:ok, add_blobs(store, blobs)}
160165

166+
defp process_blobs(store, {:error, reason}) do
161167
# We might want to declare a block invalid here.
168+
Logger.error("Error downloading blobs: #{inspect(reason)}")
169+
{:ok, store}
162170
end
163171

172+
def add_blob(store, blob), do: add_blobs(store, [blob])
173+
164174
# To be used when a series of blobs are downloaded. Stores each blob.
165175
# If there are blocks that can be processed, does so immediately.
166-
defp add_blobs(blobs) do
176+
defp add_blobs(store, blobs) do
167177
blobs
168178
|> Enum.map(&BlobDb.store_blob/1)
169179
|> Enum.uniq()
170-
|> Enum.each(fn root ->
171-
with %BlockInfo{} = block_info <- Blocks.get_block_info(root) do
172-
# TODO: add a new missing blobs call if some blobs are still missing for a block.
173-
if Enum.empty?(missing_blobs(block_info)) do
174-
block_info
175-
|> Blocks.change_status(:pending)
176-
|> process_block_and_check_children()
177-
end
180+
|> Enum.reduce(store, fn root, store ->
181+
with %BlockInfo{} = block_info <- Blocks.get_block_info(root),
182+
[] <- missing_blobs(block_info) do
183+
block_info
184+
|> Blocks.change_status(:pending)
185+
|> then(&process_block_and_check_children(store, &1))
186+
else
187+
_ -> store
178188
end
179189
end)
180190
end

lib/lambda_ethereum_consensus/beacon/sync_blocks.ex

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
5151
BlockDownloader.request_blocks_by_range(
5252
first_slot,
5353
count,
54-
&on_chunk_downloaded/1,
54+
&on_chunk_downloaded/2,
5555
@retries
5656
)
5757

@@ -61,11 +61,13 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
6161
end
6262
end
6363

64-
defp on_chunk_downloaded({:ok, range, blocks}) do
64+
defp on_chunk_downloaded(store, {:ok, range, blocks}) do
6565
Libp2pPort.notify_blocks_downloaded(range, blocks)
66+
{:ok, store}
6667
end
6768

68-
defp on_chunk_downloaded({:error, range, reason}) do
69+
defp on_chunk_downloaded(store, {:error, range, reason}) do
6970
Libp2pPort.notify_block_download_failed(range, reason)
71+
{:ok, store}
7072
end
7173
end

0 commit comments

Comments
 (0)