diff --git a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex index 4d8abb8d3..537c7db73 100644 --- a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex @@ -10,13 +10,11 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do alias LambdaEthereumConsensus.ForkChoice alias LambdaEthereumConsensus.P2P.BlockDownloader + alias LambdaEthereumConsensus.Store.Blocks alias Types.SignedBeaconBlock - @type state :: %{ - pending_blocks: %{Types.root() => SignedBeaconBlock.t()}, - invalid_blocks: %{Types.root() => map()}, - blocks_to_download: MapSet.t(Types.root()) - } + @type block_status :: :pending | :invalid | :processing | :download | :unknown + @type state :: %{Types.root() => {SignedBeaconBlock.t() | nil, block_status()}} ########################## ### Public API @@ -31,11 +29,6 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do GenServer.cast(__MODULE__, {:add_block, signed_block}) end - @spec pending_block?(Types.root()) :: boolean() - def pending_block?(block_root) do - GenServer.call(__MODULE__, {:pending_block?, block_root}) - end - ########################## ### GenServer Callbacks ########################## @@ -45,19 +38,34 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do def init(_opts) do schedule_blocks_processing() schedule_blocks_download() - {:ok, %{pending_blocks: %{}, invalid_blocks: %{}, blocks_to_download: MapSet.new()}} + + {:ok, Map.new()} end + @spec handle_cast(any(), state()) :: {:noreply, state()} + @impl true def handle_cast({:add_block, %SignedBeaconBlock{message: block} = signed_block}, state) do block_root = Ssz.hash_tree_root!(block) - pending_blocks = Map.put(state.pending_blocks, block_root, signed_block) - {:noreply, Map.put(state, :pending_blocks, pending_blocks)} + + if state |> Map.get(block_root) do + {:noreply, state} + else + {:noreply, state |> Map.put(block_root, {signed_block, :pending})} + end end @impl true - def handle_call({:pending_block?, block_root}, _from, state) do - {:reply, Map.has_key?(state.pending_blocks, block_root), state} + def handle_cast({:block_processed, block_root, is_valid?}, state) do + if is_valid? do + state |> Map.delete(block_root) + else + state + |> Map.put(block_root, {nil, :invalid}) + end + |> then(fn state -> + {:noreply, state} + end) end @spec handle_info(any(), state()) :: {:noreply, state()} @@ -68,44 +76,40 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do @impl true @spec handle_info(atom(), state()) :: {:noreply, state()} def handle_info(:process_blocks, state) do - state.pending_blocks + state + |> Map.filter(fn {_, {_, s}} -> s == :pending end) + |> Enum.map(fn {root, {block, _}} -> {root, block} end) |> Enum.sort_by(fn {_, signed_block} -> signed_block.message.slot end) |> Enum.reduce(state, fn {block_root, signed_block}, state -> parent_root = signed_block.message.parent_root + parent_status = get_block_status(state, parent_root) cond do + # If already processed, remove it + Blocks.get_block(block_root) -> + state |> Map.delete(block_root) + # If parent is invalid, block is invalid - state.invalid_blocks |> Map.has_key?(parent_root) -> + parent_status == :invalid -> + state |> Map.put(block_root, {nil, :invalid}) + + # If parent is processing, block is pending + parent_status == :processing -> state - |> Map.update!(:pending_blocks, &Map.delete(&1, block_root)) - |> Map.update!( - :invalid_blocks, - &Map.put(&1, block_root, signed_block.message |> Map.take([:slot, :parent_root])) - ) # If parent is pending, block is pending - state.pending_blocks |> Map.has_key?(parent_root) -> + parent_status == :pending -> state - # If already in fork choice, remove from pending - ForkChoice.has_block?(block_root) -> - state |> Map.update!(:pending_blocks, &Map.delete(&1, block_root)) - # If parent is not in fork choice, download parent - not ForkChoice.has_block?(parent_root) -> - state |> Map.update!(:blocks_to_download, &MapSet.put(&1, parent_root)) + !Blocks.get_block(parent_root) -> + state |> Map.put(parent_root, {nil, :download}) # If all the other conditions are false, add block to fork choice true -> - new_state = send_block_to_forkchoice(state, signed_block, block_root) - - # When on checkpoint sync, we might accumulate a couple of hundred blocks in the pending blocks queue. - # This can cause the ForkChoice to timeout on other call requests since it has to process all the - # pending blocks first. - # TODO: find a better way to handle this - Process.sleep(100) - - new_state + Logger.info("Adding block to fork choice: ", root: block_root) + ForkChoice.on_block(signed_block, block_root) + state |> Map.put(block_root, {signed_block, :processing}) end end) |> then(fn state -> @@ -114,22 +118,12 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do end) end - @empty_mapset MapSet.new() - - @impl true - def handle_info(:download_blocks, %{blocks_to_download: to_download} = state) - when to_download == @empty_mapset do - schedule_blocks_download() - {:noreply, state} - end - @impl true def handle_info(:download_blocks, state) do - blocks_in_store = state.blocks_to_download |> MapSet.filter(&ForkChoice.has_block?/1) + blocks_to_download = state |> Map.filter(fn {_, {_, s}} -> s == :download end) |> Map.keys() downloaded_blocks = - state.blocks_to_download - |> MapSet.difference(blocks_in_store) + blocks_to_download |> Enum.take(16) |> BlockDownloader.request_blocks_by_root() |> case do @@ -141,38 +135,24 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do [] end - for signed_block <- downloaded_blocks do - add_block(signed_block) - end - - roots_to_remove = + new_state = downloaded_blocks - |> Enum.map(&Ssz.hash_tree_root!(&1.message)) - |> MapSet.new() - |> MapSet.union(blocks_in_store) + |> Enum.reduce(state, fn signed_block, state -> + block_root = Ssz.hash_tree_root!(signed_block.message) + state |> Map.put(block_root, {signed_block, :pending}) + end) schedule_blocks_download() - {:noreply, Map.update!(state, :blocks_to_download, &MapSet.difference(&1, roots_to_remove))} + {:noreply, new_state} end ########################## ### Private Functions ########################## - @spec send_block_to_forkchoice(state(), SignedBeaconBlock.t(), Types.root()) :: state() - defp send_block_to_forkchoice(state, signed_block, block_root) do - case ForkChoice.on_block(signed_block, block_root) do - :ok -> - state |> Map.update!(:pending_blocks, &Map.delete(&1, block_root)) - - :error -> - state - |> Map.update!(:pending_blocks, &Map.delete(&1, block_root)) - |> Map.update!( - :invalid_blocks, - &Map.put(&1, block_root, signed_block.message |> Map.take([:slot, :parent_root])) - ) - end + @spec get_block_status(state(), Types.root()) :: block_status() + defp get_block_status(state, block_root) do + state |> Map.get(block_root, {nil, :unknown}) |> elem(1) end def schedule_blocks_processing do diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 52cee7b9b..3c6443b79 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -14,8 +14,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do alias Types.SignedBeaconBlock alias Types.Store - @default_timeout 100_000 - ########################## ### Public API ########################## @@ -26,11 +24,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end - @spec has_block?(Types.root()) :: boolean() - def has_block?(block_root) do - GenServer.call(__MODULE__, {:has_block?, block_root}, @default_timeout) - end - @spec on_tick(Types.uint64()) :: :ok def on_tick(time) do GenServer.cast(__MODULE__, {:on_tick, time}) @@ -38,7 +31,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do @spec on_block(Types.SignedBeaconBlock.t(), Types.root()) :: :ok | :error def on_block(signed_block, block_root) do - GenServer.call(__MODULE__, {:on_block, block_root, signed_block}, @default_timeout) + GenServer.cast(__MODULE__, {:on_block, block_root, signed_block, self()}) end @spec on_attestation(Types.Attestation.t()) :: :ok @@ -75,17 +68,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do end @impl GenServer - def handle_call({:get_store_attrs, attrs}, _from, state) do - values = Enum.map(attrs, &Map.fetch!(state, &1)) - {:reply, values, state} - end - - def handle_call({:has_block?, block_root}, _from, state) do - {:reply, Store.has_block?(state, block_root), state} - end - - @impl GenServer - def handle_call({:on_block, block_root, %SignedBeaconBlock{} = signed_block}, _from, store) do + def handle_cast({:on_block, block_root, %SignedBeaconBlock{} = signed_block, from}, store) do slot = signed_block.message.slot result = @@ -99,11 +82,14 @@ defmodule LambdaEthereumConsensus.ForkChoice do Logger.info("[Fork choice] New block added", slot: slot, root: block_root) Task.async(__MODULE__, :recompute_head, [new_store]) - {:reply, :ok, new_store} + + GenServer.cast(from, {:block_processed, block_root, true}) + {:noreply, new_store} {:error, reason} -> Logger.error("[Fork choice] Failed to add block: #{reason}", slot: slot) - {:reply, :error, store} + GenServer.cast(from, {:block_processed, block_root, false}) + {:noreply, store} end end diff --git a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex index af035471f..5ed2caea0 100644 --- a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex @@ -79,7 +79,13 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do @spec request_blocks_by_root([Types.root()], integer()) :: {:ok, [Types.SignedBeaconBlock.t()]} | {:error, binary()} - def request_blocks_by_root(roots, retries \\ @default_retries) do + def request_blocks_by_root(roots, retries \\ @default_retries) + + def request_blocks_by_root([], _retries) do + {:ok, []} + end + + def request_blocks_by_root(roots, retries) do Logger.debug("Requesting block for roots #{Enum.map_join(roots, ", ", &Base.encode16/1)}") peer_id = get_some_peer() diff --git a/test/unit/pending_blocks_test.exs b/test/unit/pending_blocks_test.exs deleted file mode 100644 index b56e3e1b1..000000000 --- a/test/unit/pending_blocks_test.exs +++ /dev/null @@ -1,40 +0,0 @@ -defmodule Unit.PendingBlocks do - @moduledoc false - - use ExUnit.Case - use Patch - - alias LambdaEthereumConsensus.Beacon.PendingBlocks - alias LambdaEthereumConsensus.ForkChoice - alias LambdaEthereumConsensus.Store.BlockStore - - setup do - Application.put_env(:lambda_ethereum_consensus, ChainSpec, config: MainnetConfig) - - # Lets trigger the process_blocks manually - patch(PendingBlocks, :schedule_blocks_processing, fn -> :ok end) - patch(PendingBlocks, :schedule_blocks_download, fn -> :ok end) - - start_supervised!({PendingBlocks, []}) - :ok - end - - test "Adds a pending block to fork choice if the parent is there" do - signed_block = Fixtures.Block.signed_beacon_block() - block_root = Ssz.hash_tree_root!(signed_block.message) - - patch(ForkChoice, :has_block?, fn root -> root == signed_block.message.parent_root end) - patch(ForkChoice, :on_block, fn _block, _root -> :ok end) - - # Don't store the block in the DB, to avoid having to set it up - patch(BlockStore, :store_block, fn _block -> :ok end) - - PendingBlocks.add_block(signed_block) - - assert PendingBlocks.pending_block?(block_root) - send(PendingBlocks, :process_blocks) - - # If the block is not pending anymore, it means it was added to the fork choice - assert not PendingBlocks.pending_block?(block_root) - end -end