Skip to content

Commit 98acdd8

Browse files
authored
refactor: make block processing async inside PendingBlocks (#709)
1 parent 29e3eaa commit 98acdd8

File tree

4 files changed

+67
-135
lines changed

4 files changed

+67
-135
lines changed

lib/lambda_ethereum_consensus/beacon/pending_blocks.ex

Lines changed: 53 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,11 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
1010

1111
alias LambdaEthereumConsensus.ForkChoice
1212
alias LambdaEthereumConsensus.P2P.BlockDownloader
13+
alias LambdaEthereumConsensus.Store.Blocks
1314
alias Types.SignedBeaconBlock
1415

15-
@type state :: %{
16-
pending_blocks: %{Types.root() => SignedBeaconBlock.t()},
17-
invalid_blocks: %{Types.root() => map()},
18-
blocks_to_download: MapSet.t(Types.root())
19-
}
16+
@type block_status :: :pending | :invalid | :processing | :download | :unknown
17+
@type state :: %{Types.root() => {SignedBeaconBlock.t() | nil, block_status()}}
2018

2119
##########################
2220
### Public API
@@ -31,11 +29,6 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
3129
GenServer.cast(__MODULE__, {:add_block, signed_block})
3230
end
3331

34-
@spec pending_block?(Types.root()) :: boolean()
35-
def pending_block?(block_root) do
36-
GenServer.call(__MODULE__, {:pending_block?, block_root})
37-
end
38-
3932
##########################
4033
### GenServer Callbacks
4134
##########################
@@ -45,19 +38,34 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
4538
def init(_opts) do
4639
schedule_blocks_processing()
4740
schedule_blocks_download()
48-
{:ok, %{pending_blocks: %{}, invalid_blocks: %{}, blocks_to_download: MapSet.new()}}
41+
42+
{:ok, Map.new()}
4943
end
5044

45+
@spec handle_cast(any(), state()) :: {:noreply, state()}
46+
5147
@impl true
5248
def handle_cast({:add_block, %SignedBeaconBlock{message: block} = signed_block}, state) do
5349
block_root = Ssz.hash_tree_root!(block)
54-
pending_blocks = Map.put(state.pending_blocks, block_root, signed_block)
55-
{:noreply, Map.put(state, :pending_blocks, pending_blocks)}
50+
51+
if state |> Map.get(block_root) do
52+
{:noreply, state}
53+
else
54+
{:noreply, state |> Map.put(block_root, {signed_block, :pending})}
55+
end
5656
end
5757

5858
@impl true
59-
def handle_call({:pending_block?, block_root}, _from, state) do
60-
{:reply, Map.has_key?(state.pending_blocks, block_root), state}
59+
def handle_cast({:block_processed, block_root, is_valid?}, state) do
60+
if is_valid? do
61+
state |> Map.delete(block_root)
62+
else
63+
state
64+
|> Map.put(block_root, {nil, :invalid})
65+
end
66+
|> then(fn state ->
67+
{:noreply, state}
68+
end)
6169
end
6270

6371
@spec handle_info(any(), state()) :: {:noreply, state()}
@@ -68,44 +76,40 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
6876
@impl true
6977
@spec handle_info(atom(), state()) :: {:noreply, state()}
7078
def handle_info(:process_blocks, state) do
71-
state.pending_blocks
79+
state
80+
|> Map.filter(fn {_, {_, s}} -> s == :pending end)
81+
|> Enum.map(fn {root, {block, _}} -> {root, block} end)
7282
|> Enum.sort_by(fn {_, signed_block} -> signed_block.message.slot end)
7383
|> Enum.reduce(state, fn {block_root, signed_block}, state ->
7484
parent_root = signed_block.message.parent_root
85+
parent_status = get_block_status(state, parent_root)
7586

7687
cond do
88+
# If already processed, remove it
89+
Blocks.get_block(block_root) ->
90+
state |> Map.delete(block_root)
91+
7792
# If parent is invalid, block is invalid
78-
state.invalid_blocks |> Map.has_key?(parent_root) ->
93+
parent_status == :invalid ->
94+
state |> Map.put(block_root, {nil, :invalid})
95+
96+
# If parent is processing, block is pending
97+
parent_status == :processing ->
7998
state
80-
|> Map.update!(:pending_blocks, &Map.delete(&1, block_root))
81-
|> Map.update!(
82-
:invalid_blocks,
83-
&Map.put(&1, block_root, signed_block.message |> Map.take([:slot, :parent_root]))
84-
)
8599

86100
# If parent is pending, block is pending
87-
state.pending_blocks |> Map.has_key?(parent_root) ->
101+
parent_status == :pending ->
88102
state
89103

90-
# If already in fork choice, remove from pending
91-
ForkChoice.has_block?(block_root) ->
92-
state |> Map.update!(:pending_blocks, &Map.delete(&1, block_root))
93-
94104
# If parent is not in fork choice, download parent
95-
not ForkChoice.has_block?(parent_root) ->
96-
state |> Map.update!(:blocks_to_download, &MapSet.put(&1, parent_root))
105+
!Blocks.get_block(parent_root) ->
106+
state |> Map.put(parent_root, {nil, :download})
97107

98108
# If all the other conditions are false, add block to fork choice
99109
true ->
100-
new_state = send_block_to_forkchoice(state, signed_block, block_root)
101-
102-
# When on checkpoint sync, we might accumulate a couple of hundred blocks in the pending blocks queue.
103-
# This can cause the ForkChoice to timeout on other call requests since it has to process all the
104-
# pending blocks first.
105-
# TODO: find a better way to handle this
106-
Process.sleep(100)
107-
108-
new_state
110+
Logger.info("Adding block to fork choice: ", root: block_root)
111+
ForkChoice.on_block(signed_block, block_root)
112+
state |> Map.put(block_root, {signed_block, :processing})
109113
end
110114
end)
111115
|> then(fn state ->
@@ -114,22 +118,12 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
114118
end)
115119
end
116120

117-
@empty_mapset MapSet.new()
118-
119-
@impl true
120-
def handle_info(:download_blocks, %{blocks_to_download: to_download} = state)
121-
when to_download == @empty_mapset do
122-
schedule_blocks_download()
123-
{:noreply, state}
124-
end
125-
126121
@impl true
127122
def handle_info(:download_blocks, state) do
128-
blocks_in_store = state.blocks_to_download |> MapSet.filter(&ForkChoice.has_block?/1)
123+
blocks_to_download = state |> Map.filter(fn {_, {_, s}} -> s == :download end) |> Map.keys()
129124

130125
downloaded_blocks =
131-
state.blocks_to_download
132-
|> MapSet.difference(blocks_in_store)
126+
blocks_to_download
133127
|> Enum.take(16)
134128
|> BlockDownloader.request_blocks_by_root()
135129
|> case do
@@ -141,38 +135,24 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
141135
[]
142136
end
143137

144-
for signed_block <- downloaded_blocks do
145-
add_block(signed_block)
146-
end
147-
148-
roots_to_remove =
138+
new_state =
149139
downloaded_blocks
150-
|> Enum.map(&Ssz.hash_tree_root!(&1.message))
151-
|> MapSet.new()
152-
|> MapSet.union(blocks_in_store)
140+
|> Enum.reduce(state, fn signed_block, state ->
141+
block_root = Ssz.hash_tree_root!(signed_block.message)
142+
state |> Map.put(block_root, {signed_block, :pending})
143+
end)
153144

154145
schedule_blocks_download()
155-
{:noreply, Map.update!(state, :blocks_to_download, &MapSet.difference(&1, roots_to_remove))}
146+
{:noreply, new_state}
156147
end
157148

158149
##########################
159150
### Private Functions
160151
##########################
161152

162-
@spec send_block_to_forkchoice(state(), SignedBeaconBlock.t(), Types.root()) :: state()
163-
defp send_block_to_forkchoice(state, signed_block, block_root) do
164-
case ForkChoice.on_block(signed_block, block_root) do
165-
:ok ->
166-
state |> Map.update!(:pending_blocks, &Map.delete(&1, block_root))
167-
168-
:error ->
169-
state
170-
|> Map.update!(:pending_blocks, &Map.delete(&1, block_root))
171-
|> Map.update!(
172-
:invalid_blocks,
173-
&Map.put(&1, block_root, signed_block.message |> Map.take([:slot, :parent_root]))
174-
)
175-
end
153+
@spec get_block_status(state(), Types.root()) :: block_status()
154+
defp get_block_status(state, block_root) do
155+
state |> Map.get(block_root, {nil, :unknown}) |> elem(1)
176156
end
177157

178158
def schedule_blocks_processing do

lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do
1414
alias Types.SignedBeaconBlock
1515
alias Types.Store
1616

17-
@default_timeout 100_000
18-
1917
##########################
2018
### Public API
2119
##########################
@@ -26,19 +24,14 @@ defmodule LambdaEthereumConsensus.ForkChoice do
2624
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
2725
end
2826

29-
@spec has_block?(Types.root()) :: boolean()
30-
def has_block?(block_root) do
31-
GenServer.call(__MODULE__, {:has_block?, block_root}, @default_timeout)
32-
end
33-
3427
@spec on_tick(Types.uint64()) :: :ok
3528
def on_tick(time) do
3629
GenServer.cast(__MODULE__, {:on_tick, time})
3730
end
3831

3932
@spec on_block(Types.SignedBeaconBlock.t(), Types.root()) :: :ok | :error
4033
def on_block(signed_block, block_root) do
41-
GenServer.call(__MODULE__, {:on_block, block_root, signed_block}, @default_timeout)
34+
GenServer.cast(__MODULE__, {:on_block, block_root, signed_block, self()})
4235
end
4336

4437
@spec on_attestation(Types.Attestation.t()) :: :ok
@@ -75,17 +68,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
7568
end
7669

7770
@impl GenServer
78-
def handle_call({:get_store_attrs, attrs}, _from, state) do
79-
values = Enum.map(attrs, &Map.fetch!(state, &1))
80-
{:reply, values, state}
81-
end
82-
83-
def handle_call({:has_block?, block_root}, _from, state) do
84-
{:reply, Store.has_block?(state, block_root), state}
85-
end
86-
87-
@impl GenServer
88-
def handle_call({:on_block, block_root, %SignedBeaconBlock{} = signed_block}, _from, store) do
71+
def handle_cast({:on_block, block_root, %SignedBeaconBlock{} = signed_block, from}, store) do
8972
slot = signed_block.message.slot
9073

9174
result =
@@ -99,11 +82,14 @@ defmodule LambdaEthereumConsensus.ForkChoice do
9982
Logger.info("[Fork choice] New block added", slot: slot, root: block_root)
10083

10184
Task.async(__MODULE__, :recompute_head, [new_store])
102-
{:reply, :ok, new_store}
85+
86+
GenServer.cast(from, {:block_processed, block_root, true})
87+
{:noreply, new_store}
10388

10489
{:error, reason} ->
10590
Logger.error("[Fork choice] Failed to add block: #{reason}", slot: slot)
106-
{:reply, :error, store}
91+
GenServer.cast(from, {:block_processed, block_root, false})
92+
{:noreply, store}
10793
end
10894
end
10995

lib/lambda_ethereum_consensus/p2p/block_downloader.ex

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,13 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do
7979

8080
@spec request_blocks_by_root([Types.root()], integer()) ::
8181
{:ok, [Types.SignedBeaconBlock.t()]} | {:error, binary()}
82-
def request_blocks_by_root(roots, retries \\ @default_retries) do
82+
def request_blocks_by_root(roots, retries \\ @default_retries)
83+
84+
def request_blocks_by_root([], _retries) do
85+
{:ok, []}
86+
end
87+
88+
def request_blocks_by_root(roots, retries) do
8389
Logger.debug("Requesting block for roots #{Enum.map_join(roots, ", ", &Base.encode16/1)}")
8490

8591
peer_id = get_some_peer()

test/unit/pending_blocks_test.exs

Lines changed: 0 additions & 40 deletions
This file was deleted.

0 commit comments

Comments
 (0)