Skip to content

refactor: make block processing async inside PendingBlocks #709

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 53 additions & 73 deletions lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do

alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.P2P.BlockDownloader
alias LambdaEthereumConsensus.Store.Blocks
alias Types.SignedBeaconBlock

@type state :: %{
pending_blocks: %{Types.root() => SignedBeaconBlock.t()},
invalid_blocks: %{Types.root() => map()},
blocks_to_download: MapSet.t(Types.root())
}
@type block_status :: :pending | :invalid | :processing | :download | :unknown
@type state :: %{Types.root() => {SignedBeaconBlock.t() | nil, block_status()}}

##########################
### Public API
Expand All @@ -31,11 +29,6 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
GenServer.cast(__MODULE__, {:add_block, signed_block})
end

@spec pending_block?(Types.root()) :: boolean()
def pending_block?(block_root) do
GenServer.call(__MODULE__, {:pending_block?, block_root})
end

##########################
### GenServer Callbacks
##########################
Expand All @@ -45,19 +38,34 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
def init(_opts) do
schedule_blocks_processing()
schedule_blocks_download()
{:ok, %{pending_blocks: %{}, invalid_blocks: %{}, blocks_to_download: MapSet.new()}}

{:ok, Map.new()}
end

@spec handle_cast(any(), state()) :: {:noreply, state()}

@impl true
def handle_cast({:add_block, %SignedBeaconBlock{message: block} = signed_block}, state) do
block_root = Ssz.hash_tree_root!(block)
pending_blocks = Map.put(state.pending_blocks, block_root, signed_block)
{:noreply, Map.put(state, :pending_blocks, pending_blocks)}

if state |> Map.get(block_root) do
{:noreply, state}
else
{:noreply, state |> Map.put(block_root, {signed_block, :pending})}
end
end

@impl true
def handle_call({:pending_block?, block_root}, _from, state) do
{:reply, Map.has_key?(state.pending_blocks, block_root), state}
# Handles the `{:block_processed, block_root, is_valid?}` notification.
# A valid block is dropped from the state map; an invalid one is kept under its
# root as `{nil, :invalid}`, i.e. without the block body.
def handle_cast({:block_processed, block_root, is_valid?}, state) do
  updated_state =
    if is_valid? do
      Map.delete(state, block_root)
    else
      Map.put(state, block_root, {nil, :invalid})
    end

  {:noreply, updated_state}
end

@spec handle_info(any(), state()) :: {:noreply, state()}
Expand All @@ -68,44 +76,40 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
@impl true
@spec handle_info(atom(), state()) :: {:noreply, state()}
def handle_info(:process_blocks, state) do
state.pending_blocks
state
|> Map.filter(fn {_, {_, s}} -> s == :pending end)
|> Enum.map(fn {root, {block, _}} -> {root, block} end)
|> Enum.sort_by(fn {_, signed_block} -> signed_block.message.slot end)
|> Enum.reduce(state, fn {block_root, signed_block}, state ->
parent_root = signed_block.message.parent_root
parent_status = get_block_status(state, parent_root)

cond do
# If already processed, remove it
Blocks.get_block(block_root) ->
state |> Map.delete(block_root)

# If parent is invalid, block is invalid
state.invalid_blocks |> Map.has_key?(parent_root) ->
parent_status == :invalid ->
state |> Map.put(block_root, {nil, :invalid})

# If parent is processing, block is pending
parent_status == :processing ->
state
|> Map.update!(:pending_blocks, &Map.delete(&1, block_root))
|> Map.update!(
:invalid_blocks,
&Map.put(&1, block_root, signed_block.message |> Map.take([:slot, :parent_root]))
)

# If parent is pending, block is pending
state.pending_blocks |> Map.has_key?(parent_root) ->
parent_status == :pending ->
state

# If already in fork choice, remove from pending
ForkChoice.has_block?(block_root) ->
state |> Map.update!(:pending_blocks, &Map.delete(&1, block_root))

# If parent is not in fork choice, download parent
not ForkChoice.has_block?(parent_root) ->
state |> Map.update!(:blocks_to_download, &MapSet.put(&1, parent_root))
!Blocks.get_block(parent_root) ->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This introduces a bug: if the parent is in the DB, this condition would be false, and we'd try to process the block even if its parent wasn't processed, causing it to fail and be marked as an invalid block.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this assumes that a block is stored in the DB only if it is processed (and valid). We should not store invalid (or potentially invalid) blocks.

Do we store blocks before processing them today?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. But this can happen if the node restarts after processing some blocks

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this is probably out of scope for this PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For sure. We could patch it up by just nuking the database in the makefile to avoid this error when debugging, but we need to find a better way to handle this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you like me to add this to the PR?

Copy link
Collaborator

@Arkenan Arkenan Feb 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a more complex discussion, outside the scope of this PR: it is about supporting recoverability, which we don't do yet. To support it, we should save the state of a block in the DB.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify, I meant we need to find a better way to handle this in a following PR. For now, we can just go for the nuclear option.

state |> Map.put(parent_root, {nil, :download})

# If all the other conditions are false, add block to fork choice
true ->
new_state = send_block_to_forkchoice(state, signed_block, block_root)

# When on checkpoint sync, we might accumulate a couple of hundred blocks in the pending blocks queue.
# This can cause the ForkChoice to timeout on other call requests since it has to process all the
# pending blocks first.
# TODO: find a better way to handle this
Process.sleep(100)

new_state
Logger.info("Adding block to fork choice: ", root: block_root)
ForkChoice.on_block(signed_block, block_root)
state |> Map.put(block_root, {signed_block, :processing})
end
end)
|> then(fn state ->
Expand All @@ -114,22 +118,12 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
end)
end

@empty_mapset MapSet.new()

@impl true
def handle_info(:download_blocks, %{blocks_to_download: to_download} = state)
when to_download == @empty_mapset do
schedule_blocks_download()
{:noreply, state}
end

@impl true
def handle_info(:download_blocks, state) do
blocks_in_store = state.blocks_to_download |> MapSet.filter(&ForkChoice.has_block?/1)
blocks_to_download = state |> Map.filter(fn {_, {_, s}} -> s == :download end) |> Map.keys()

downloaded_blocks =
state.blocks_to_download
|> MapSet.difference(blocks_in_store)
blocks_to_download
|> Enum.take(16)
|> BlockDownloader.request_blocks_by_root()
|> case do
Expand All @@ -141,38 +135,24 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
[]
end

for signed_block <- downloaded_blocks do
add_block(signed_block)
end

roots_to_remove =
new_state =
downloaded_blocks
|> Enum.map(&Ssz.hash_tree_root!(&1.message))
|> MapSet.new()
|> MapSet.union(blocks_in_store)
|> Enum.reduce(state, fn signed_block, state ->
block_root = Ssz.hash_tree_root!(signed_block.message)
state |> Map.put(block_root, {signed_block, :pending})
end)

schedule_blocks_download()
{:noreply, Map.update!(state, :blocks_to_download, &MapSet.difference(&1, roots_to_remove))}
{:noreply, new_state}
end

##########################
### Private Functions
##########################

@spec send_block_to_forkchoice(state(), SignedBeaconBlock.t(), Types.root()) :: state()
defp send_block_to_forkchoice(state, signed_block, block_root) do
case ForkChoice.on_block(signed_block, block_root) do
:ok ->
state |> Map.update!(:pending_blocks, &Map.delete(&1, block_root))

:error ->
state
|> Map.update!(:pending_blocks, &Map.delete(&1, block_root))
|> Map.update!(
:invalid_blocks,
&Map.put(&1, block_root, signed_block.message |> Map.take([:slot, :parent_root]))
)
end
# Returns the status stored for `block_root` (the second element of its state
# entry). A root with no entry in the state map reports `:unknown`.
@spec get_block_status(state(), Types.root()) :: block_status()
defp get_block_status(state, block_root) do
  case Map.fetch(state, block_root) do
    {:ok, entry} -> elem(entry, 1)
    :error -> :unknown
  end
end

def schedule_blocks_processing do
Expand Down
28 changes: 7 additions & 21 deletions lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do
alias Types.SignedBeaconBlock
alias Types.Store

@default_timeout 100_000

##########################
### Public API
##########################
Expand All @@ -26,19 +24,14 @@ defmodule LambdaEthereumConsensus.ForkChoice do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

@spec has_block?(Types.root()) :: boolean()
def has_block?(block_root) do
GenServer.call(__MODULE__, {:has_block?, block_root}, @default_timeout)
end

# Sends `{:on_tick, time}` as a cast to the process registered under this
# module's name. The call does not wait for the message to be handled.
@spec on_tick(Types.uint64()) :: :ok
def on_tick(time) do
GenServer.cast(__MODULE__, {:on_tick, time})
end

@spec on_block(Types.SignedBeaconBlock.t(), Types.root()) :: :ok | :error
def on_block(signed_block, block_root) do
GenServer.call(__MODULE__, {:on_block, block_root, signed_block}, @default_timeout)
GenServer.cast(__MODULE__, {:on_block, block_root, signed_block, self()})
end

@spec on_attestation(Types.Attestation.t()) :: :ok
Expand Down Expand Up @@ -75,17 +68,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
end

@impl GenServer
def handle_call({:get_store_attrs, attrs}, _from, state) do
values = Enum.map(attrs, &Map.fetch!(state, &1))
{:reply, values, state}
end

def handle_call({:has_block?, block_root}, _from, state) do
{:reply, Store.has_block?(state, block_root), state}
end

@impl GenServer
def handle_call({:on_block, block_root, %SignedBeaconBlock{} = signed_block}, _from, store) do
def handle_cast({:on_block, block_root, %SignedBeaconBlock{} = signed_block, from}, store) do
slot = signed_block.message.slot

result =
Expand All @@ -99,11 +82,14 @@ defmodule LambdaEthereumConsensus.ForkChoice do
Logger.info("[Fork choice] New block added", slot: slot, root: block_root)

Task.async(__MODULE__, :recompute_head, [new_store])
{:reply, :ok, new_store}

GenServer.cast(from, {:block_processed, block_root, true})
{:noreply, new_store}

{:error, reason} ->
Logger.error("[Fork choice] Failed to add block: #{reason}", slot: slot)
{:reply, :error, store}
GenServer.cast(from, {:block_processed, block_root, false})
{:noreply, store}
end
end

Expand Down
8 changes: 7 additions & 1 deletion lib/lambda_ethereum_consensus/p2p/block_downloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,13 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do

@spec request_blocks_by_root([Types.root()], integer()) ::
{:ok, [Types.SignedBeaconBlock.t()]} | {:error, binary()}
def request_blocks_by_root(roots, retries \\ @default_retries) do
def request_blocks_by_root(roots, retries \\ @default_retries)

# With no roots to request, succeed immediately with an empty block list.
def request_blocks_by_root([], _retries), do: {:ok, []}

def request_blocks_by_root(roots, retries) do
Logger.debug("Requesting block for roots #{Enum.map_join(roots, ", ", &Base.encode16/1)}")

peer_id = get_some_peer()
Expand Down
40 changes: 0 additions & 40 deletions test/unit/pending_blocks_test.exs

This file was deleted.