diff --git a/lib/beacon_api/controllers/v1/beacon_controller.ex b/lib/beacon_api/controllers/v1/beacon_controller.ex index f01258399..1f33036bf 100644 --- a/lib/beacon_api/controllers/v1/beacon_controller.ex +++ b/lib/beacon_api/controllers/v1/beacon_controller.ex @@ -5,7 +5,7 @@ defmodule BeaconApi.V1.BeaconController do alias BeaconApi.ErrorController alias BeaconApi.Helpers alias BeaconApi.Utils - alias LambdaEthereumConsensus.Store.BlockDb + alias LambdaEthereumConsensus.Store.BlockBySlot alias LambdaEthereumConsensus.Store.Blocks alias LambdaEthereumConsensus.Store.StoreDb @@ -94,7 +94,7 @@ defmodule BeaconApi.V1.BeaconController do def get_block_root(conn, %{block_id: block_id}) do with {slot, ""} when slot >= 0 <- Integer.parse(block_id), - {:ok, block_root} <- BlockDb.get_block_root_by_slot(slot) do + {:ok, block_root} <- BlockBySlot.get(slot) do conn |> root_response(block_root, true, false) else :not_found -> diff --git a/lib/beacon_api/helpers.ex b/lib/beacon_api/helpers.ex index 20b95772e..fc5511bc5 100644 --- a/lib/beacon_api/helpers.ex +++ b/lib/beacon_api/helpers.ex @@ -4,13 +4,15 @@ defmodule BeaconApi.Helpers do """ alias LambdaEthereumConsensus.ForkChoice + alias LambdaEthereumConsensus.Store.BlockBySlot alias LambdaEthereumConsensus.Store.BlockDb alias LambdaEthereumConsensus.Store.Blocks alias LambdaEthereumConsensus.Store.StateDb - alias Types.StateInfo - alias Types.BeaconState alias Types.SignedBeaconBlock + alias Types.StateInfo + + import Types.Guards @type named_root() :: :genesis | :justified | :finalized | :head @type block_id() :: named_root() | :invalid_id | Types.slot() | Types.root() @@ -78,11 +80,14 @@ defmodule BeaconApi.Helpers do def block_root_by_block_id(slot) when is_integer(slot) do with :ok <- check_valid_slot(slot, ForkChoice.get_current_chain_slot()), - {:ok, root} <- BlockDb.get_block_root_by_slot(slot) do + {:ok, root} when is_root(root) <- BlockBySlot.get(slot) do # TODO compute is_optimistic_or_invalid() and is_finalized() execution_optimistic = true finalized = false {:ok, {root, execution_optimistic, finalized}} + else + {:ok, :empty_slot} -> :empty_slot + other -> other end end diff --git a/lib/lambda_ethereum_consensus/store/block_by_slot.ex b/lib/lambda_ethereum_consensus/store/block_by_slot.ex new file mode 100644 index 000000000..765a180c7 --- /dev/null +++ b/lib/lambda_ethereum_consensus/store/block_by_slot.ex @@ -0,0 +1,65 @@ +defmodule LambdaEthereumConsensus.Store.BlockBySlot do + @moduledoc """ + KvSchema that stores block roots indexed by slot. As we store blocks by their root, this module + acts as an index if we need to look for them using their root. Some use cases are block pruning + (removing all blocks prior to a slot) or checking if a range of slots contain blocks, for + checkpoint sync checks. + """ + + alias LambdaEthereumConsensus.Store.KvSchema + use KvSchema, prefix: "blockSlot" + @type value_t :: Types.root() | :empty_slot + + ################################ + ### PUBLIC API + ################################ + + @doc """ + Checks if all the blocks between first_slot and last_slot are present in the db. + This iterates through the db checking each one individually, although it only checks + the keys, so it doesn't need to decode the values, making it a relatively cheap + linear O(last_slot - first_slot) operation. + """ + @spec all_present?(Types.slot(), Types.slot()) :: boolean() + def all_present?(first_slot, last_slot) do + fold_keys(last_slot, MapSet.new(), fn slot, set -> MapSet.put(set, slot) end, + include_first: true + ) + |> case do + {:ok, available} -> + Enum.all?(first_slot..last_slot, fn slot -> slot in available end) + + {:error, :invalid_iterator} -> + false + + {:error, "Failed to start iterator for table" <> _} -> + false + end + end + + ################################ + ### Schema implementation + ################################ + + @impl KvSchema + @spec encode_key(Types.slot()) :: {:ok, binary()} | {:error, binary()} + def encode_key(slot), do: {:ok, <>} + + @impl KvSchema + @spec decode_key(binary()) :: {:ok, integer()} | {:error, binary()} + def decode_key(<>), do: {:ok, slot} + + def decode_key(other) do + {:error, "[Block by slot] Could not decode slot, not 64 bit integer: #{other}"} + end + + @impl KvSchema + @spec encode_value(value_t()) :: {:ok, value_t()} | {:error, binary()} + def encode_value(:empty_slot), do: {:ok, <<>>} + def encode_value(<<_::256>> = root), do: {:ok, root} + + @impl KvSchema + @spec decode_value(value_t()) :: {:ok, value_t()} | {:error, binary()} + def decode_value(<<>>), do: {:ok, :empty_slot} + def decode_value(<<_::256>> = root), do: {:ok, root} +end diff --git a/lib/lambda_ethereum_consensus/store/block_db.ex b/lib/lambda_ethereum_consensus/store/block_db.ex index da170cf5d..a498188cd 100644 --- a/lib/lambda_ethereum_consensus/store/block_db.ex +++ b/lib/lambda_ethereum_consensus/store/block_db.ex @@ -3,12 +3,12 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do Storage and retrieval of blocks. """ require Logger + alias LambdaEthereumConsensus.Store.BlockBySlot alias LambdaEthereumConsensus.Store.Db alias LambdaEthereumConsensus.Store.Utils alias Types.BlockInfo @block_prefix "blockHash" - @blockslot_prefix "blockSlot" @block_status_prefix "blockStatus" @spec store_block_info(BlockInfo.t()) :: :ok @@ -22,8 +22,7 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do # TODO: this should apply fork-choice if not applied elsewhere # TODO: handle cases where slot is empty if not is_nil(block_info.signed_block) do - slothash_key = block_root_by_slot_key(block_info.signed_block.message.slot) - Db.put(slothash_key, block_info.root) + BlockBySlot.put(block_info.signed_block.message.slot, block_info.root) end end @@ -35,24 +34,15 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do end end - @spec get_block_root_by_slot(Types.slot()) :: - {:ok, Types.root()} | {:error, String.t()} | :not_found | :empty_slot - def get_block_root_by_slot(slot) do - key = block_root_by_slot_key(slot) - block = Db.get(key) - - case block do - {:ok, <<>>} -> :empty_slot - _ -> block - end - end - @spec get_block_info_by_slot(Types.slot()) :: {:ok, BlockInfo.t()} | {:error, String.t()} | :not_found | :empty_slot def get_block_info_by_slot(slot) do # WARN: this will return the latest block received for the given slot - with {:ok, root} <- get_block_root_by_slot(slot) do - get_block_info(root) + # TODO: Are we actually saving empty slots in this index? + case BlockBySlot.get(slot) do + {:ok, :empty_slot} -> :empty_slot + {:ok, root} -> get_block_info(root) + other -> other end end @@ -95,58 +85,33 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do @spec prune_blocks_older_than(non_neg_integer()) :: :ok | {:error, String.t()} | :not_found def prune_blocks_older_than(slot) do Logger.info("[BlockDb] Pruning started.", slot: slot) - initial_key = slot |> block_root_by_slot_key() - - slots_to_remove = - Stream.resource( - fn -> init_keycursor(initial_key) end, - &next_slot(&1, :prev), - &close_cursor/1 - ) - |> Enum.to_list() - - slots_to_remove |> Enum.each(&remove_block_by_slot/1) - Logger.info("[BlockDb] Pruning finished. #{Enum.count(slots_to_remove)} blocks removed.") - end - - @spec remove_block_by_slot(non_neg_integer()) :: :ok | :not_found - defp remove_block_by_slot(slot) do - slothash_key = block_root_by_slot_key(slot) - - with {:ok, block_root} <- Db.get(slothash_key) do - key_block = block_key(block_root) - Db.delete(slothash_key) - Db.delete(key_block) - end - end - - defp init_keycursor(initial_key) do - with {:ok, it} <- Db.iterate_keys(), - {:ok, _key} <- Exleveldb.iterator_move(it, initial_key) do - it - else - # DB is empty - {:error, :invalid_iterator} -> nil - end - end - defp next_slot(nil, _movement), do: {:halt, nil} - - defp next_slot(it, movement) do - case Exleveldb.iterator_move(it, movement) do - {:ok, @blockslot_prefix <> <>} -> - {[key], it} - - _ -> - {:halt, it} + result = + BlockBySlot.fold_keys(slot, 0, fn slot, acc -> + case BlockBySlot.get(slot) do + {:ok, :empty_slot} -> + BlockBySlot.delete(slot) + acc + 1 + + {:ok, block_root} -> + BlockBySlot.delete(slot) + Db.delete(block_key(block_root)) + acc + 1 + + other -> + Logger.error( + "[Block pruning] Failed to remove block from slot #{inspect(slot)}. Reason: #{inspect(other)}" + ) + end + end) + + # TODO: the separate get operation is avoided if we implement folding with values in KvSchema. + case result do + {:ok, n_removed} -> Logger.info("[BlockDb] Pruning finished. #{n_removed} blocks removed.") + {:error, reason} -> Logger.error("[BlockDb] Error pruning blocks: #{inspect(reason)}") end end - defp close_cursor(nil), do: :ok - defp close_cursor(it), do: :ok = Exleveldb.iterator_close(it) - defp block_key(root), do: Utils.get_key(@block_prefix, root) - defp block_root_by_slot_key(slot), do: Utils.get_key(@blockslot_prefix, slot) - defp block_status_key(status), do: Utils.get_key(@block_status_prefix, Atom.to_string(status)) end diff --git a/lib/lambda_ethereum_consensus/store/checkpoint_states_db.ex b/lib/lambda_ethereum_consensus/store/checkpoint_states_db.ex index 9548556de..3ad78baab 100644 --- a/lib/lambda_ethereum_consensus/store/checkpoint_states_db.ex +++ b/lib/lambda_ethereum_consensus/store/checkpoint_states_db.ex @@ -76,7 +76,7 @@ defmodule LambdaEthereumConsensus.Store.CheckpointStates do def prune(finalized_checkpoint) do Logger.debug("Pruning old checkpoint states") - case fold(finalized_checkpoint, 0, fn key, acc -> + case fold_keys(finalized_checkpoint, 0, fn key, acc -> delete(key) acc + 1 end) do diff --git a/lib/lambda_ethereum_consensus/store/kv_schema.ex b/lib/lambda_ethereum_consensus/store/kv_schema.ex index 9d5fe3592..299947fec 100644 --- a/lib/lambda_ethereum_consensus/store/kv_schema.ex +++ b/lib/lambda_ethereum_consensus/store/kv_schema.ex @@ -56,18 +56,21 @@ defmodule LambdaEthereumConsensus.Store.KvSchema do end) end - @spec fold(key(), acc(), (key(), acc() -> acc())) :: {:ok, acc()} | {:error, binary()} - def fold(start_key, starting_value, f) do - db_span("fold", fn -> - with {:ok, it} <- Db.iterate(), + @spec fold_keys(key(), acc(), (key(), acc() -> acc())) :: {:ok, acc()} | {:error, any()} + def fold_keys(start_key, starting_value, f, opts \\ []) do + db_span("fold_keys", fn -> + include_first? = Keyword.get(opts, :include_first, false) + direction = Keyword.get(opts, :direction, :prev) + + with {:ok, it} <- Db.iterate_keys(), {:ok, encoded_start} <- do_encode_key(start_key), - {:ok, ^encoded_start, _} <- Exleveldb.iterator_move(it, encoded_start) do - res = iterate(it, starting_value, f) + {:ok, ^encoded_start} <- Exleveldb.iterator_move(it, encoded_start) do + res = iterate(it, starting_value, f, direction, encoded_start, include_first?) Exleveldb.iterator_close(it) {:ok, res} else - # Failed at moving the iterator for the first time. - {:ok, some_key, _some_value} -> + # The iterator moved for the first time to a place where it wasn't expected. + {:ok, some_key} -> {:error, "Failed to start iterator for table #{@prefix}. The obtained key is: #{some_key}"} @@ -77,16 +80,32 @@ defmodule LambdaEthereumConsensus.Store.KvSchema do end) end - defp iterate(it, acc, f) do - case Exleveldb.iterator_move(it, :prev) do - # TODO: add option to get the value in the function too if needed. - {:ok, @prefix <> _ = k, v} -> - # TODO: plan for weird corner cases where the key can't be decoded. + defp iterate(it, acc, f, direction, _first_key, false) do + iterate(it, acc, f, direction) + end + + defp iterate(it, acc, f, direction, first_key, true) do + case accumulate(it, acc, f, first_key) do + {:cont, new_acc} -> iterate(it, new_acc, f, direction) + {:halt, new_acc} -> new_acc + end + end + + defp iterate(it, acc, f, direction) do + case accumulate(it, acc, f, direction) do + {:cont, acc} -> iterate(it, acc, f, direction) + {:halt, acc} -> acc + end + end + + defp accumulate(it, acc, f, direction) do + case Exleveldb.iterator_move(it, direction) do + {:ok, @prefix <> _ = k} -> {:ok, decoded_key} = do_decode_key(k) - iterate(it, f.(decoded_key, acc), f) + {:cont, f.(decoded_key, acc)} _ -> - acc + {:halt, acc} end end diff --git a/lib/types/types.ex b/lib/types/types.ex index dc7df1cf2..0d8ee00dd 100644 --- a/lib/types/types.ex +++ b/lib/types/types.ex @@ -48,4 +48,12 @@ defmodule Types do @type kzg_proof :: Kzg.proof() @type bls_signature :: Bls.signature() @type bls_pubkey :: Bls.pubkey() + + defmodule Guards do + @moduledoc """ + Module defining guards for some types. Added as needed. + """ + + defguard is_root(binary) when is_binary(binary) and byte_size(binary) == 32 + end end diff --git a/test/fixtures/block.ex b/test/fixtures/block.ex index d8fbc0c16..097e6f2bc 100644 --- a/test/fixtures/block.ex +++ b/test/fixtures/block.ex @@ -5,6 +5,7 @@ defmodule Fixtures.Block do alias Fixtures.Random alias LambdaEthereumConsensus.Utils.BitVector + alias Types.BlockInfo alias Types.BeaconBlock alias Types.BeaconBlockBody @@ -13,6 +14,11 @@ defmodule Fixtures.Block do alias Types.ExecutionPayloadHeader alias Types.SignedBeaconBlock + @spec block_info :: BlockInfo.t() + def block_info() do + signed_beacon_block() |> BlockInfo.from_block() + end + @spec signed_beacon_block :: SignedBeaconBlock.t() def signed_beacon_block() do %SignedBeaconBlock{ diff --git a/test/fixtures/utils.ex b/test/fixtures/utils.ex index a98d17bab..ffbf91f38 100644 --- a/test/fixtures/utils.ex +++ b/test/fixtures/utils.ex @@ -18,6 +18,9 @@ defmodule Fixtures.Random do binary(32) end + @spec slot() :: Types.slot() + def slot(), do: uint64() + @spec bls_signature :: binary def bls_signature() do binary(96) diff --git a/test/unit/store/block_by_slot_test.exs b/test/unit/store/block_by_slot_test.exs new file mode 100644 index 000000000..1f9a94e12 --- /dev/null +++ b/test/unit/store/block_by_slot_test.exs @@ -0,0 +1,60 @@ +defmodule Unit.Store.BlockBySlotTest do + alias Fixtures.Random + alias LambdaEthereumConsensus.Store.BlockBySlot + + use ExUnit.Case + + setup %{tmp_dir: tmp_dir} do + start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: tmp_dir}) + :ok + end + + @tag :tmp_dir + test "Basic saving a block root" do + root = Random.root() + slot = Random.slot() + assert :ok == BlockBySlot.put(slot, root) + assert {:ok, root} == BlockBySlot.get(slot) + end + + @tag :tmp_dir + test "all_present? should return true when checking on a subset or the full set, but false for elements outside" do + Enum.each(2..4, fn slot -> + root = Random.root() + assert :ok == BlockBySlot.put(slot, root) + end) + + assert BlockBySlot.all_present?(2, 4) + assert BlockBySlot.all_present?(3, 3) + refute BlockBySlot.all_present?(1, 4) + refute BlockBySlot.all_present?(2, 5) + refute BlockBySlot.all_present?(1, 1) + end + + @tag :tmp_dir + test "all_present? should return false when elements are missing in between" do + root = Random.root() + BlockBySlot.put(1, root) + BlockBySlot.put(3, root) + + assert BlockBySlot.all_present?(3, 3) + assert BlockBySlot.all_present?(1, 1) + refute BlockBySlot.all_present?(1, 3) + end + + @tag :tmp_dir + test "retrieving an empty slot" do + assert :ok == BlockBySlot.put(1, :empty_slot) + assert {:ok, :empty_slot} == BlockBySlot.get(1) + end + + @tag :tmp_dir + test "Trying to save an atom that's not :empty_slot fails" do + assert_raise(FunctionClauseError, fn -> BlockBySlot.put(1, :some_atom) end) + end + + @tag :tmp_dir + test "Trying to save a non-root binary fails" do + assert_raise(FunctionClauseError, fn -> BlockBySlot.put(1, "Hello") end) + end +end diff --git a/test/unit/store/block_db_test.exs b/test/unit/store/block_db_test.exs new file mode 100644 index 000000000..d1709b719 --- /dev/null +++ b/test/unit/store/block_db_test.exs @@ -0,0 +1,92 @@ +defmodule Unit.Store.BlockDbTest do + alias Fixtures.Block + alias LambdaEthereumConsensus.Store.BlockBySlot + alias LambdaEthereumConsensus.Store.BlockDb + + use ExUnit.Case + + setup %{tmp_dir: tmp_dir} do + Application.fetch_env!(:lambda_ethereum_consensus, ChainSpec) + |> Keyword.put(:config, MainnetConfig) + |> then(&Application.put_env(:lambda_ethereum_consensus, ChainSpec, &1)) + + start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: tmp_dir}) + :ok + end + + @tag :tmp_dir + test "Simple block saving and loading" do + block_info = Block.block_info() + BlockDb.store_block_info(block_info) + assert {:ok, block_info} == BlockDb.get_block_info(block_info.root) + end + + @tag :tmp_dir + test "A saved block's root can be retrieved using its slot" do + block_info = Block.block_info() + BlockDb.store_block_info(block_info) + + assert {:ok, block_info} == + BlockDb.get_block_info_by_slot(block_info.signed_block.message.slot) + end + + @tag :tmp_dir + test "Pruning deletes only blocks prior to the one selected as target" do + blocks = + [block_1, block_2, block_3] = + [Block.block_info(), Block.block_info(), Block.block_info()] + |> Enum.sort_by(& &1.signed_block.message.slot) + + Enum.each(blocks, &BlockDb.store_block_info/1) + + assert {:ok, block_1} == BlockDb.get_block_info(block_1.root) + assert {:ok, block_2} == BlockDb.get_block_info(block_2.root) + assert {:ok, block_3} == BlockDb.get_block_info(block_3.root) + + BlockDb.prune_blocks_older_than(block_2.signed_block.message.slot) + + assert :not_found == BlockDb.get_block_info(block_1.root) + assert {:ok, block_2} == BlockDb.get_block_info(block_2.root) + assert {:ok, block_3} == BlockDb.get_block_info(block_3.root) + end + + @tag :tmp_dir + test "Pruning on a non existent root returns and doesn't delete anything" do + blocks = + [block_1, block_2, block_3] = + [Block.block_info(), Block.block_info(), Block.block_info()] + |> Enum.sort_by(& &1.signed_block.message.slot) + + Enum.each(blocks, &BlockDb.store_block_info/1) + + random_slot = (blocks |> Enum.map(& &1.signed_block.message.slot) |> Enum.max()) + 1 + assert :ok == BlockDb.prune_blocks_older_than(random_slot) + assert {:ok, block_1} == BlockDb.get_block_info(block_1.root) + assert {:ok, block_2} == BlockDb.get_block_info(block_2.root) + assert {:ok, block_3} == BlockDb.get_block_info(block_3.root) + end + + @tag :tmp_dir + test "Empty blocks don't affect pruning" do + blocks = + [block_1, block_2, block_3] = + [Block.block_info(), Block.block_info(), Block.block_info()] + |> Enum.sort_by(& &1.signed_block.message.slot) + + Enum.each(blocks, &BlockDb.store_block_info/1) + + block_slots = Enum.map(blocks, & &1.signed_block.message.slot) + + min_slot = Enum.min(block_slots) - 1 + max_slot = Enum.max(block_slots) + 1 + BlockBySlot.put(max_slot, :empty_slot) + BlockBySlot.put(min_slot, :empty_slot) + + assert :ok == BlockDb.prune_blocks_older_than(max_slot) + assert :not_found == BlockDb.get_block_info(block_1.root) + assert :not_found == BlockDb.get_block_info(block_2.root) + assert :not_found == BlockDb.get_block_info(block_3.root) + assert {:ok, :empty_slot} == BlockBySlot.get(max_slot) + assert :not_found == BlockBySlot.get(min_slot) + end +end diff --git a/test/unit/store/kv_schema_test.exs b/test/unit/store/kv_schema_test.exs index c97b626a5..03c860d72 100644 --- a/test/unit/store/kv_schema_test.exs +++ b/test/unit/store/kv_schema_test.exs @@ -97,7 +97,18 @@ defmodule Unit.Store.KvSchemaTest do NumberSchema.put(5, %{b: 3}) NumberSchema.put(70, %{c: 5}) - assert {:ok, 6} == NumberSchema.fold(70, 0, fn n, acc -> acc + n end) + assert {:ok, 6} == NumberSchema.fold_keys(70, 0, fn n, acc -> acc + n end) + end + + @tag :tmp_dir + test "Folding includes the first value if so requested" do + TupleSchema.put({1, 2}, []) + NumberSchema.put(1, %{"a" => "b"}) + NumberSchema.put(5, %{b: 3}) + NumberSchema.put(70, %{c: 5}) + + assert {:ok, 76} == + NumberSchema.fold_keys(70, 0, fn n, acc -> acc + n end, include_first: true) end @tag :tmp_dir @@ -106,7 +117,7 @@ defmodule Unit.Store.KvSchemaTest do NumberSchema.put(200, %{b: 3}) NumberSchema.put(700, %{c: 5}) - assert {:ok, 300} == NumberSchema.fold(700, 0, fn n, acc -> acc + n end) + assert {:ok, 300} == NumberSchema.fold_keys(700, 0, fn n, acc -> acc + n end) end @tag :tmp_dir @@ -114,6 +125,6 @@ defmodule Unit.Store.KvSchemaTest do NumberSchema.put(1, %{"a" => "b"}) NumberSchema.put(5, %{b: 3}) - {:error, _} = NumberSchema.fold(3, 0, fn n, acc -> acc + n end) + {:error, _} = NumberSchema.fold_keys(3, 0, fn n, acc -> acc + n end) end end