From ef69f7387cb8fd19ff6dbe4940a581ef9c81114a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Fri, 12 Jul 2024 16:05:23 +0200 Subject: [PATCH 1/9] Implement fold keys including the first value --- .../store/checkpoint_states_db.ex | 2 +- .../store/kv_schema.ex | 47 +++++++++++++------ test/unit/store/kv_schema_test.exs | 17 +++++-- 3 files changed, 47 insertions(+), 19 deletions(-) diff --git a/lib/lambda_ethereum_consensus/store/checkpoint_states_db.ex b/lib/lambda_ethereum_consensus/store/checkpoint_states_db.ex index 9548556de..3ad78baab 100644 --- a/lib/lambda_ethereum_consensus/store/checkpoint_states_db.ex +++ b/lib/lambda_ethereum_consensus/store/checkpoint_states_db.ex @@ -76,7 +76,7 @@ defmodule LambdaEthereumConsensus.Store.CheckpointStates do def prune(finalized_checkpoint) do Logger.debug("Pruning old checkpoint states") - case fold(finalized_checkpoint, 0, fn key, acc -> + case fold_keys(finalized_checkpoint, 0, fn key, acc -> delete(key) acc + 1 end) do diff --git a/lib/lambda_ethereum_consensus/store/kv_schema.ex b/lib/lambda_ethereum_consensus/store/kv_schema.ex index 9d5fe3592..5f96e78e6 100644 --- a/lib/lambda_ethereum_consensus/store/kv_schema.ex +++ b/lib/lambda_ethereum_consensus/store/kv_schema.ex @@ -56,18 +56,30 @@ defmodule LambdaEthereumConsensus.Store.KvSchema do end) end - @spec fold(key(), acc(), (key(), acc() -> acc())) :: {:ok, acc()} | {:error, binary()} - def fold(start_key, starting_value, f) do - db_span("fold", fn -> - with {:ok, it} <- Db.iterate(), + @spec fold_keys(key(), acc(), (key(), acc() -> acc())) :: {:ok, acc()} | {:error, any()} + def fold_keys(start_key, starting_value, f, opts \\ []) do + db_span("fold_keys", fn -> + include_first? = Keyword.get(opts, :include_first, false) + direction = Keyword.get(opts, :direction, :prev) + + with {:ok, it} <- Db.iterate_keys(), {:ok, encoded_start} <- do_encode_key(start_key), - {:ok, ^encoded_start, _} <- Exleveldb.iterator_move(it, encoded_start) do - res = iterate(it, starting_value, f) + {:ok, ^encoded_start} <- Exleveldb.iterator_move(it, encoded_start) do + res = + if include_first? do + case accumulate(it, starting_value, f, encoded_start) do + {:cont, acc} -> iterate(it, acc, f, direction) + {:halt, acc} -> acc + end + else + iterate(it, starting_value, f, direction) + end + Exleveldb.iterator_close(it) {:ok, res} else - # Failed at moving the iterator for the first time. - {:ok, some_key, _some_value} -> + # The iterator moved for the first time to a place where it wasn't expected. + {:ok, some_key} -> {:error, "Failed to start iterator for table #{@prefix}. The obtained key is: #{some_key}"} @@ -77,16 +89,21 @@ defmodule LambdaEthereumConsensus.Store.KvSchema do end) end - defp iterate(it, acc, f) do - case Exleveldb.iterator_move(it, :prev) do - # TODO: add option to get the value in the function too if needed. - {:ok, @prefix <> _ = k, v} -> - # TODO: plan for weird corner cases where the key can't be decoded. + defp iterate(it, acc, f, direction) do + case accumulate(it, acc, f, direction) do + {:cont, acc} -> iterate(it, acc, f, direction) + {:halt, acc} -> acc + end + end + + defp accumulate(it, acc, f, direction) do + case Exleveldb.iterator_move(it, direction) do + {:ok, @prefix <> _ = k} -> {:ok, decoded_key} = do_decode_key(k) - iterate(it, f.(decoded_key, acc), f) + {:cont, f.(decoded_key, acc)} _ -> - acc + {:halt, acc} end end diff --git a/test/unit/store/kv_schema_test.exs b/test/unit/store/kv_schema_test.exs index c97b626a5..03c860d72 100644 --- a/test/unit/store/kv_schema_test.exs +++ b/test/unit/store/kv_schema_test.exs @@ -97,7 +97,18 @@ defmodule Unit.Store.KvSchemaTest do NumberSchema.put(5, %{b: 3}) NumberSchema.put(70, %{c: 5}) - assert {:ok, 6} == NumberSchema.fold(70, 0, fn n, acc -> acc + n end) + assert {:ok, 6} == NumberSchema.fold_keys(70, 0, fn n, acc -> acc + n end) + end + + @tag :tmp_dir + test "Folding includes the first value if so requested" do + TupleSchema.put({1, 2}, []) + NumberSchema.put(1, %{"a" => "b"}) + NumberSchema.put(5, %{b: 3}) + NumberSchema.put(70, %{c: 5}) + + assert {:ok, 76} == + NumberSchema.fold_keys(70, 0, fn n, acc -> acc + n end, include_first: true) end @tag :tmp_dir @@ -106,7 +117,7 @@ defmodule Unit.Store.KvSchemaTest do NumberSchema.put(200, %{b: 3}) NumberSchema.put(700, %{c: 5}) - assert {:ok, 300} == NumberSchema.fold(700, 0, fn n, acc -> acc + n end) + assert {:ok, 300} == NumberSchema.fold_keys(700, 0, fn n, acc -> acc + n end) end @tag :tmp_dir @@ -114,6 +125,6 @@ defmodule Unit.Store.KvSchemaTest do NumberSchema.put(1, %{"a" => "b"}) NumberSchema.put(5, %{b: 3}) - {:error, _} = NumberSchema.fold(3, 0, fn n, acc -> acc + n end) + {:error, _} = NumberSchema.fold_keys(3, 0, fn n, acc -> acc + n end) end end From d98f36a88a24958c57bfbe1068f9364bfb40b58b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Fri, 12 Jul 2024 16:38:11 +0200 Subject: [PATCH 2/9] Add block by slot KvSchema implementation. Make Block DB and others use it. Implement all_present? --- .../controllers/v1/beacon_controller.ex | 4 +- lib/beacon_api/helpers.ex | 3 +- .../store/block_by_slot.ex | 64 +++++++++++++ .../store/block_db.ex | 91 +++++++------------ test/fixtures/utils.ex | 3 + test/unit/store/block_by_slot_test.exs | 48 ++++++++++ 6 files changed, 150 insertions(+), 63 deletions(-) create mode 100644 lib/lambda_ethereum_consensus/store/block_by_slot.ex create mode 100644 test/unit/store/block_by_slot_test.exs diff --git a/lib/beacon_api/controllers/v1/beacon_controller.ex b/lib/beacon_api/controllers/v1/beacon_controller.ex index f01258399..1f33036bf 100644 --- a/lib/beacon_api/controllers/v1/beacon_controller.ex +++ b/lib/beacon_api/controllers/v1/beacon_controller.ex @@ -5,7 +5,7 @@ defmodule BeaconApi.V1.BeaconController do alias BeaconApi.ErrorController alias BeaconApi.Helpers alias BeaconApi.Utils - alias LambdaEthereumConsensus.Store.BlockDb + alias LambdaEthereumConsensus.Store.BlockBySlot alias LambdaEthereumConsensus.Store.Blocks alias LambdaEthereumConsensus.Store.StoreDb @@ -94,7 +94,7 @@ defmodule BeaconApi.V1.BeaconController do def get_block_root(conn, %{block_id: block_id}) do with {slot, ""} when slot >= 0 <- Integer.parse(block_id), - {:ok, block_root} <- BlockDb.get_block_root_by_slot(slot) do + {:ok, block_root} <- BlockBySlot.get(slot) do conn |> root_response(block_root, true, false) else :not_found -> diff --git a/lib/beacon_api/helpers.ex b/lib/beacon_api/helpers.ex index 20b95772e..56ab7bad6 100644 --- a/lib/beacon_api/helpers.ex +++ b/lib/beacon_api/helpers.ex @@ -4,6 +4,7 @@ defmodule BeaconApi.Helpers do """ alias LambdaEthereumConsensus.ForkChoice + alias LambdaEthereumConsensus.Store.BlockBySlot alias LambdaEthereumConsensus.Store.BlockDb alias LambdaEthereumConsensus.Store.Blocks alias LambdaEthereumConsensus.Store.StateDb @@ -78,7 +79,7 @@ defmodule BeaconApi.Helpers do def block_root_by_block_id(slot) when is_integer(slot) do with :ok <- check_valid_slot(slot, ForkChoice.get_current_chain_slot()), - {:ok, root} <- BlockDb.get_block_root_by_slot(slot) do + {:ok, root} <- BlockBySlot.get(slot) do # TODO compute is_optimistic_or_invalid() and is_finalized() execution_optimistic = true finalized = false diff --git a/lib/lambda_ethereum_consensus/store/block_by_slot.ex b/lib/lambda_ethereum_consensus/store/block_by_slot.ex new file mode 100644 index 000000000..91542a2f7 --- /dev/null +++ b/lib/lambda_ethereum_consensus/store/block_by_slot.ex @@ -0,0 +1,64 @@ +defmodule LambdaEthereumConsensus.Store.BlockBySlot do + alias LambdaEthereumConsensus.Store.KvSchema + use KvSchema, prefix: "blockSlot" + @type value_t :: Types.root() | <<>> + + ################################ + ### PUBLIC API + ################################ + + @doc """ + Checks if all the blocks between first_slot and last_slot are present in the db. + This iterates through the db checking each one individually, although it only checks + the keys, so it doesn't need to decode the values, making it a relatively cheap + linear O(last_slot - first_slot) operation. + """ + @spec all_present?(Types.slot(), Types.slot()) :: boolean() + def all_present?(first_slot, last_slot) do + fold_keys(last_slot, MapSet.new(), fn slot, set -> MapSet.put(set, slot) end, + include_first: true + ) + |> case do + {:ok, available} -> + Enum.all?(first_slot..last_slot, fn slot -> slot in available end) + + {:error, :invalid_iterator} -> + false + + {:error, "Failed to start iterator for table" <> _} -> + false + end + end + + ################################ + ### Schema implementation + ################################ + + @impl KvSchema + @spec encode_key(Types.slot()) :: {:ok, binary()} | {:error, binary()} + def encode_key(slot), do: {:ok, <>} + + @impl KvSchema + @spec decode_key(binary()) :: {:ok, integer()} | {:error, binary()} + def decode_key(<>), do: {:ok, slot} + + def decode_key(other) do + {:error, "[Block by slot] Could not decode slot, not 64 bit integer: #{other}"} + end + + @impl KvSchema + @spec encode_value(value_t()) :: {:ok, value_t()} | {:error, binary()} + def encode_value(root), do: check_root("Encoding", root) + + @impl KvSchema + @spec decode_value(value_t()) :: {:ok, value_t()} | {:error, binary()} + def decode_value(root), do: check_root("Decoding", root) + + ################################ + ### Private functions + ################################ + + defp check_root(_op, <<>>), do: {:ok, <<>>} + defp check_root(_op, <<_::256>> = root), do: {:ok, root} + defp check_root(op, other), do: {:error, "[Block by slot] #{op} error. Invalid root: #{other}"} +end diff --git a/lib/lambda_ethereum_consensus/store/block_db.ex b/lib/lambda_ethereum_consensus/store/block_db.ex index da170cf5d..ac41d32cf 100644 --- a/lib/lambda_ethereum_consensus/store/block_db.ex +++ b/lib/lambda_ethereum_consensus/store/block_db.ex @@ -3,12 +3,12 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do Storage and retrieval of blocks. """ require Logger + alias LambdaEthereumConsensus.Store.BlockBySlot alias LambdaEthereumConsensus.Store.Db alias LambdaEthereumConsensus.Store.Utils alias Types.BlockInfo @block_prefix "blockHash" - @blockslot_prefix "blockSlot" @block_status_prefix "blockStatus" @spec store_block_info(BlockInfo.t()) :: :ok @@ -22,8 +22,7 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do # TODO: this should apply fork-choice if not applied elsewhere # TODO: handle cases where slot is empty if not is_nil(block_info.signed_block) do - slothash_key = block_root_by_slot_key(block_info.signed_block.message.slot) - Db.put(slothash_key, block_info.root) + BlockBySlot.put(block_info.signed_block.message.slot, block_info.root) end end @@ -35,24 +34,14 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do end end - @spec get_block_root_by_slot(Types.slot()) :: - {:ok, Types.root()} | {:error, String.t()} | :not_found | :empty_slot - def get_block_root_by_slot(slot) do - key = block_root_by_slot_key(slot) - block = Db.get(key) - - case block do - {:ok, <<>>} -> :empty_slot - _ -> block - end - end - @spec get_block_info_by_slot(Types.slot()) :: {:ok, BlockInfo.t()} | {:error, String.t()} | :not_found | :empty_slot def get_block_info_by_slot(slot) do # WARN: this will return the latest block received for the given slot - with {:ok, root} <- get_block_root_by_slot(slot) do - get_block_info(root) + case BlockBySlot.get(slot) do + {:ok, <<>>} -> :empty_slot + {:ok, root} -> get_block_info(root) + other -> other end end @@ -95,58 +84,40 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do @spec prune_blocks_older_than(non_neg_integer()) :: :ok | {:error, String.t()} | :not_found def prune_blocks_older_than(slot) do Logger.info("[BlockDb] Pruning started.", slot: slot) - initial_key = slot |> block_root_by_slot_key() - - slots_to_remove = - Stream.resource( - fn -> init_keycursor(initial_key) end, - &next_slot(&1, :prev), - &close_cursor/1 - ) - |> Enum.to_list() - - slots_to_remove |> Enum.each(&remove_block_by_slot/1) - Logger.info("[BlockDb] Pruning finished. #{Enum.count(slots_to_remove)} blocks removed.") + + # TODO: this can be improved if we implement the folding with values in KvSchema. + n_removed = + BlockBySlot.fold_keys(slot, 0, fn slot, acc -> + case remove_block_by_slot(slot) do + :ok -> + acc + 1 + + other -> + Logger.error( + "[Block pruning] Failed to remove block from slot #{inspect(slot)}. Reason: #{inspect(other)}" + ) + end + end) + + Logger.info("[BlockDb] Pruning finished. #{n_removed} blocks removed.") end @spec remove_block_by_slot(non_neg_integer()) :: :ok | :not_found defp remove_block_by_slot(slot) do - slothash_key = block_root_by_slot_key(slot) - - with {:ok, block_root} <- Db.get(slothash_key) do - key_block = block_key(block_root) - Db.delete(slothash_key) - Db.delete(key_block) - end - end - - defp init_keycursor(initial_key) do - with {:ok, it} <- Db.iterate_keys(), - {:ok, _key} <- Exleveldb.iterator_move(it, initial_key) do - it - else - # DB is empty - {:error, :invalid_iterator} -> nil - end - end - - defp next_slot(nil, _movement), do: {:halt, nil} + case BlockBySlot.get(slot) do + {:ok, <<>>} -> + :ok - defp next_slot(it, movement) do - case Exleveldb.iterator_move(it, movement) do - {:ok, @blockslot_prefix <> <>} -> - {[key], it} + {:ok, block_root} -> + key_block = block_key(block_root) + BlockBySlot.delete(slot) + Db.delete(key_block) - _ -> - {:halt, it} + other -> + other end end - defp close_cursor(nil), do: :ok - defp close_cursor(it), do: :ok = Exleveldb.iterator_close(it) - defp block_key(root), do: Utils.get_key(@block_prefix, root) - defp block_root_by_slot_key(slot), do: Utils.get_key(@blockslot_prefix, slot) - defp block_status_key(status), do: Utils.get_key(@block_status_prefix, Atom.to_string(status)) end diff --git a/test/fixtures/utils.ex b/test/fixtures/utils.ex index a98d17bab..ffbf91f38 100644 --- a/test/fixtures/utils.ex +++ b/test/fixtures/utils.ex @@ -18,6 +18,9 @@ defmodule Fixtures.Random do binary(32) end + @spec slot() :: Types.slot() + def slot(), do: uint64() + @spec bls_signature :: binary def bls_signature() do binary(96) diff --git a/test/unit/store/block_by_slot_test.exs b/test/unit/store/block_by_slot_test.exs new file mode 100644 index 000000000..a671ffd7d --- /dev/null +++ b/test/unit/store/block_by_slot_test.exs @@ -0,0 +1,48 @@ +defmodule Unit.Store.BlockBySlotTest do + alias Fixtures.Random + alias LambdaEthereumConsensus.Store.BlockBySlot + + use ExUnit.Case + + setup %{tmp_dir: tmp_dir} do + Application.fetch_env!(:lambda_ethereum_consensus, ChainSpec) + |> Keyword.put(:config, MinimalConfig) + |> then(&Application.put_env(:lambda_ethereum_consensus, ChainSpec, &1)) + + start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: tmp_dir}) + :ok + end + + @tag :tmp_dir + test "Basic saving a block root" do + root = Random.root() + slot = Random.slot() + assert :ok == BlockBySlot.put(slot, root) + assert {:ok, root} == BlockBySlot.get(slot) + end + + @tag :tmp_dir + test "all_present? should return true when checking on a subset or the full set, but false for elements outside" do + Enum.each(2..4, fn slot -> + root = Random.root() + assert :ok == BlockBySlot.put(slot, root) + end) + + assert BlockBySlot.all_present?(2, 4) + assert BlockBySlot.all_present?(3, 3) + refute BlockBySlot.all_present?(1, 4) + refute BlockBySlot.all_present?(2, 5) + refute BlockBySlot.all_present?(1, 1) + end + + @tag :tmp_dir + test "all_present? should return false when elements are missing in between" do + root = Random.root() + BlockBySlot.put(1, root) + BlockBySlot.put(3, root) + + assert BlockBySlot.all_present?(3, 3) + assert BlockBySlot.all_present?(1, 1) + refute BlockBySlot.all_present?(1, 3) + end +end From 5aee827d449e499811f14bb26fc4df8f8e8f7a88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Sat, 13 Jul 2024 16:29:51 +0200 Subject: [PATCH 3/9] add tests for block db to make sure the refactor to block by slot schema didn't break anything --- test/fixtures/block.ex | 6 +++ test/unit/store/block_db_test.exs | 66 +++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 test/unit/store/block_db_test.exs diff --git a/test/fixtures/block.ex b/test/fixtures/block.ex index d8fbc0c16..097e6f2bc 100644 --- a/test/fixtures/block.ex +++ b/test/fixtures/block.ex @@ -5,6 +5,7 @@ defmodule Fixtures.Block do alias Fixtures.Random alias LambdaEthereumConsensus.Utils.BitVector + alias Types.BlockInfo alias Types.BeaconBlock alias Types.BeaconBlockBody @@ -13,6 +14,11 @@ defmodule Fixtures.Block do alias Types.ExecutionPayloadHeader alias Types.SignedBeaconBlock + @spec block_info :: BlockInfo.t() + def block_info() do + signed_beacon_block() |> BlockInfo.from_block() + end + @spec signed_beacon_block :: SignedBeaconBlock.t() def signed_beacon_block() do %SignedBeaconBlock{ diff --git a/test/unit/store/block_db_test.exs b/test/unit/store/block_db_test.exs new file mode 100644 index 000000000..b2ce47b37 --- /dev/null +++ b/test/unit/store/block_db_test.exs @@ -0,0 +1,66 @@ +defmodule Unit.Store.BlockDbTest do + alias Fixtures.Block + alias LambdaEthereumConsensus.Store.BlockDb + + use ExUnit.Case + + setup %{tmp_dir: tmp_dir} do + start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: tmp_dir}) + :ok + end + + @tag :tmp_dir + test "Simple block saving and loading" do + block_info = Block.block_info() + BlockDb.store_block_info(block_info) + assert {:ok, block_info} == BlockDb.get_block_info(block_info.root) + end + + @tag :tmp_dir + test "A saved block's root can be retrieved using its slot" do + block_info = Block.block_info() + BlockDb.store_block_info(block_info) + + assert {:ok, block_info} == + BlockDb.get_block_info_by_slot(block_info.signed_block.message.slot) + end + + @tag :tmp_dir + test "Pruning deletes only blocks prior to the one selected as target" do + blocks = + [block_1, block_2, block_3] = + [Block.block_info(), Block.block_info(), Block.block_info()] + |> Enum.sort_by(& &1.signed_block.message.slot) + + Enum.each(blocks, &BlockDb.store_block_info/1) + + assert {:ok, block_1} == BlockDb.get_block_info(block_1.root) + assert {:ok, block_2} == BlockDb.get_block_info(block_2.root) + assert {:ok, block_3} == BlockDb.get_block_info(block_3.root) + + BlockDb.prune_blocks_older_than(block_2.signed_block.message.slot) + + assert :not_found == BlockDb.get_block_info(block_1.root) + assert {:ok, block_2} == BlockDb.get_block_info(block_2.root) + assert {:ok, block_3} == BlockDb.get_block_info(block_3.root) + end + + @tag :tmp_dir + test "Pruning on a non existent root returns and doesn't delete anything" do + # TODO: this might not work for empty blocks. We may need to delete any block with lower slot + # anyway, even if the slot doesn't match anything. + + blocks = + [block_1, block_2, block_3] = + [Block.block_info(), Block.block_info(), Block.block_info()] + |> Enum.sort_by(& &1.signed_block.message.slot) + + Enum.each(blocks, &BlockDb.store_block_info/1) + + random_slot = (blocks |> Enum.map(& &1.signed_block.message.slot) |> Enum.max()) + 1 + assert :ok == BlockDb.prune_blocks_older_than(random_slot) + assert {:ok, block_1} == BlockDb.get_block_info(block_1.root) + assert {:ok, block_2} == BlockDb.get_block_info(block_2.root) + assert {:ok, block_3} == BlockDb.get_block_info(block_3.root) + end +end From 167a85a87ccc3fa58f8992ec95466ab8b1950637 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Sat, 13 Jul 2024 16:32:24 +0200 Subject: [PATCH 4/9] fix setup --- test/unit/store/block_by_slot_test.exs | 4 ---- test/unit/store/block_db_test.exs | 4 ++++ 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test/unit/store/block_by_slot_test.exs b/test/unit/store/block_by_slot_test.exs index a671ffd7d..997cbae89 100644 --- a/test/unit/store/block_by_slot_test.exs +++ b/test/unit/store/block_by_slot_test.exs @@ -5,10 +5,6 @@ defmodule Unit.Store.BlockBySlotTest do use ExUnit.Case setup %{tmp_dir: tmp_dir} do - Application.fetch_env!(:lambda_ethereum_consensus, ChainSpec) - |> Keyword.put(:config, MinimalConfig) - |> then(&Application.put_env(:lambda_ethereum_consensus, ChainSpec, &1)) - start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: tmp_dir}) :ok end diff --git a/test/unit/store/block_db_test.exs b/test/unit/store/block_db_test.exs index b2ce47b37..abe3e9ef8 100644 --- a/test/unit/store/block_db_test.exs +++ b/test/unit/store/block_db_test.exs @@ -5,6 +5,10 @@ defmodule Unit.Store.BlockDbTest do use ExUnit.Case setup %{tmp_dir: tmp_dir} do + Application.fetch_env!(:lambda_ethereum_consensus, ChainSpec) + |> Keyword.put(:config, MainnetConfig) + |> then(&Application.put_env(:lambda_ethereum_consensus, ChainSpec, &1)) + start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: tmp_dir}) :ok end From d256b36041e9c690b291e4fd54945e0f2f31cd20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Sat, 13 Jul 2024 17:04:22 +0200 Subject: [PATCH 5/9] fix linter --- .../store/block_by_slot.ex | 7 ++++++ .../store/kv_schema.ex | 22 ++++++++++--------- test/unit/store/block_db_test.exs | 2 ++ 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/lib/lambda_ethereum_consensus/store/block_by_slot.ex b/lib/lambda_ethereum_consensus/store/block_by_slot.ex index 91542a2f7..198ae8f65 100644 --- a/lib/lambda_ethereum_consensus/store/block_by_slot.ex +++ b/lib/lambda_ethereum_consensus/store/block_by_slot.ex @@ -1,4 +1,11 @@ defmodule LambdaEthereumConsensus.Store.BlockBySlot do + @moduledoc """ + KvSchema that stores block roots indexed by slot. As we store blocks by their root, this module + acts as an index if we need to look for them using their root. Some use cases are block pruning + (removing all blocks prior to a slot) or checking if a range of slots contain blocks, for + checkpoint sync checks. + """ + alias LambdaEthereumConsensus.Store.KvSchema use KvSchema, prefix: "blockSlot" @type value_t :: Types.root() | <<>> diff --git a/lib/lambda_ethereum_consensus/store/kv_schema.ex b/lib/lambda_ethereum_consensus/store/kv_schema.ex index 5f96e78e6..299947fec 100644 --- a/lib/lambda_ethereum_consensus/store/kv_schema.ex +++ b/lib/lambda_ethereum_consensus/store/kv_schema.ex @@ -65,16 +65,7 @@ defmodule LambdaEthereumConsensus.Store.KvSchema do with {:ok, it} <- Db.iterate_keys(), {:ok, encoded_start} <- do_encode_key(start_key), {:ok, ^encoded_start} <- Exleveldb.iterator_move(it, encoded_start) do - res = - if include_first? do - case accumulate(it, starting_value, f, encoded_start) do - {:cont, acc} -> iterate(it, acc, f, direction) - {:halt, acc} -> acc - end - else - iterate(it, starting_value, f, direction) - end - + res = iterate(it, starting_value, f, direction, encoded_start, include_first?) Exleveldb.iterator_close(it) {:ok, res} else @@ -89,6 +80,17 @@ defmodule LambdaEthereumConsensus.Store.KvSchema do end) end + defp iterate(it, acc, f, direction, _first_key, false) do + iterate(it, acc, f, direction) + end + + defp iterate(it, acc, f, direction, first_key, true) do + case accumulate(it, acc, f, first_key) do + {:cont, new_acc} -> iterate(it, new_acc, f, direction) + {:halt, new_acc} -> new_acc + end + end + defp iterate(it, acc, f, direction) do case accumulate(it, acc, f, direction) do {:cont, acc} -> iterate(it, acc, f, direction) diff --git a/test/unit/store/block_db_test.exs b/test/unit/store/block_db_test.exs index abe3e9ef8..d1c1ab3fa 100644 --- a/test/unit/store/block_db_test.exs +++ b/test/unit/store/block_db_test.exs @@ -67,4 +67,6 @@ defmodule Unit.Store.BlockDbTest do assert {:ok, block_2} == BlockDb.get_block_info(block_2.root) assert {:ok, block_3} == BlockDb.get_block_info(block_3.root) end + + # TODO: test for empty slots. end From 1a975134ecb00bfe3beebd8e020b93f9f87d1718 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Sun, 14 Jul 2024 20:14:41 +0200 Subject: [PATCH 6/9] Add tests for empty slots. Make decoded block by slot value be :empty_value --- .../store/block_by_slot.ex | 16 +++------- .../store/block_db.ex | 31 +++++++------------ test/unit/store/block_by_slot_test.exs | 16 ++++++++++ test/unit/store/block_db_test.exs | 28 ++++++++++++++--- 4 files changed, 56 insertions(+), 35 deletions(-) diff --git a/lib/lambda_ethereum_consensus/store/block_by_slot.ex b/lib/lambda_ethereum_consensus/store/block_by_slot.ex index 198ae8f65..765a180c7 100644 --- a/lib/lambda_ethereum_consensus/store/block_by_slot.ex +++ b/lib/lambda_ethereum_consensus/store/block_by_slot.ex @@ -8,7 +8,7 @@ defmodule LambdaEthereumConsensus.Store.BlockBySlot do alias LambdaEthereumConsensus.Store.KvSchema use KvSchema, prefix: "blockSlot" - @type value_t :: Types.root() | <<>> + @type value_t :: Types.root() | :empty_slot ################################ ### PUBLIC API @@ -55,17 +55,11 @@ defmodule LambdaEthereumConsensus.Store.BlockBySlot do @impl KvSchema @spec encode_value(value_t()) :: {:ok, value_t()} | {:error, binary()} - def encode_value(root), do: check_root("Encoding", root) + def encode_value(:empty_slot), do: {:ok, <<>>} + def encode_value(<<_::256>> = root), do: {:ok, root} @impl KvSchema @spec decode_value(value_t()) :: {:ok, value_t()} | {:error, binary()} - def decode_value(root), do: check_root("Decoding", root) - - ################################ - ### Private functions - ################################ - - defp check_root(_op, <<>>), do: {:ok, <<>>} - defp check_root(_op, <<_::256>> = root), do: {:ok, root} - defp check_root(op, other), do: {:error, "[Block by slot] #{op} error. Invalid root: #{other}"} + def decode_value(<<>>), do: {:ok, :empty_slot} + def decode_value(<<_::256>> = root), do: {:ok, root} end diff --git a/lib/lambda_ethereum_consensus/store/block_db.ex b/lib/lambda_ethereum_consensus/store/block_db.ex index ac41d32cf..31dd7a7fa 100644 --- a/lib/lambda_ethereum_consensus/store/block_db.ex +++ b/lib/lambda_ethereum_consensus/store/block_db.ex @@ -38,8 +38,9 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do {:ok, BlockInfo.t()} | {:error, String.t()} | :not_found | :empty_slot def get_block_info_by_slot(slot) do # WARN: this will return the latest block received for the given slot + # TODO: Are we actually saving empty slots in this index? case BlockBySlot.get(slot) do - {:ok, <<>>} -> :empty_slot + {:ok, :empty_slot} -> :empty_slot {:ok, root} -> get_block_info(root) other -> other end @@ -85,11 +86,17 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do def prune_blocks_older_than(slot) do Logger.info("[BlockDb] Pruning started.", slot: slot) - # TODO: this can be improved if we implement the folding with values in KvSchema. + # TODO: the separate get operation is avoided if we implement folding with values in KvSchema. n_removed = BlockBySlot.fold_keys(slot, 0, fn slot, acc -> - case remove_block_by_slot(slot) do - :ok -> + case BlockBySlot.get(slot) do + {:ok, :empty_slot} -> + BlockBySlot.delete(slot) + acc + 1 + + {:ok, block_root} -> + BlockBySlot.delete(slot) + Db.delete(block_key(block_root)) acc + 1 other -> @@ -102,22 +109,6 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do Logger.info("[BlockDb] Pruning finished. #{n_removed} blocks removed.") end - @spec remove_block_by_slot(non_neg_integer()) :: :ok | :not_found - defp remove_block_by_slot(slot) do - case BlockBySlot.get(slot) do - {:ok, <<>>} -> - :ok - - {:ok, block_root} -> - key_block = block_key(block_root) - BlockBySlot.delete(slot) - Db.delete(key_block) - - other -> - other - end - end - defp block_key(root), do: Utils.get_key(@block_prefix, root) defp block_status_key(status), do: Utils.get_key(@block_status_prefix, Atom.to_string(status)) end diff --git a/test/unit/store/block_by_slot_test.exs b/test/unit/store/block_by_slot_test.exs index 997cbae89..1f9a94e12 100644 --- a/test/unit/store/block_by_slot_test.exs +++ b/test/unit/store/block_by_slot_test.exs @@ -41,4 +41,20 @@ defmodule Unit.Store.BlockBySlotTest do assert BlockBySlot.all_present?(1, 1) refute BlockBySlot.all_present?(1, 3) end + + @tag :tmp_dir + test "retrieving an empty slot" do + assert :ok == BlockBySlot.put(1, :empty_slot) + assert {:ok, :empty_slot} == BlockBySlot.get(1) + end + + @tag :tmp_dir + test "Trying to save an atom that's not :empty_slot fails" do + assert_raise(FunctionClauseError, fn -> BlockBySlot.put(1, :some_atom) end) + end + + @tag :tmp_dir + test "Trying to save a non-root binary fails" do + assert_raise(FunctionClauseError, fn -> BlockBySlot.put(1, "Hello") end) + end end diff --git a/test/unit/store/block_db_test.exs b/test/unit/store/block_db_test.exs index d1c1ab3fa..d1709b719 100644 --- a/test/unit/store/block_db_test.exs +++ b/test/unit/store/block_db_test.exs @@ -1,5 +1,6 @@ defmodule Unit.Store.BlockDbTest do alias Fixtures.Block + alias LambdaEthereumConsensus.Store.BlockBySlot alias LambdaEthereumConsensus.Store.BlockDb use ExUnit.Case @@ -51,9 +52,6 @@ defmodule Unit.Store.BlockDbTest do @tag :tmp_dir test "Pruning on a non existent root returns and doesn't delete anything" do - # TODO: this might not work for empty blocks. We may need to delete any block with lower slot - # anyway, even if the slot doesn't match anything. - blocks = [block_1, block_2, block_3] = [Block.block_info(), Block.block_info(), Block.block_info()] @@ -68,5 +66,27 @@ defmodule Unit.Store.BlockDbTest do assert {:ok, block_3} == BlockDb.get_block_info(block_3.root) end - # TODO: test for empty slots. + @tag :tmp_dir + test "Empty blocks don't affect pruning" do + blocks = + [block_1, block_2, block_3] = + [Block.block_info(), Block.block_info(), Block.block_info()] + |> Enum.sort_by(& &1.signed_block.message.slot) + + Enum.each(blocks, &BlockDb.store_block_info/1) + + block_slots = Enum.map(blocks, & &1.signed_block.message.slot) + + min_slot = Enum.min(block_slots) - 1 + max_slot = Enum.max(block_slots) + 1 + BlockBySlot.put(max_slot, :empty_slot) + BlockBySlot.put(min_slot, :empty_slot) + + assert :ok == BlockDb.prune_blocks_older_than(max_slot) + assert :not_found == BlockDb.get_block_info(block_1.root) + assert :not_found == BlockDb.get_block_info(block_2.root) + assert :not_found == BlockDb.get_block_info(block_3.root) + assert {:ok, :empty_slot} == BlockBySlot.get(max_slot) + assert :not_found == BlockBySlot.get(min_slot) + end end From 05c95198661afe67c2875076429656d5eb24f0a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Mon, 15 Jul 2024 12:45:00 +0200 Subject: [PATCH 7/9] fix error handling in block db --- .../store/block_db.ex | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/lib/lambda_ethereum_consensus/store/block_db.ex b/lib/lambda_ethereum_consensus/store/block_db.ex index 31dd7a7fa..afc2e7d8b 100644 --- a/lib/lambda_ethereum_consensus/store/block_db.ex +++ b/lib/lambda_ethereum_consensus/store/block_db.ex @@ -87,26 +87,26 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do Logger.info("[BlockDb] Pruning started.", slot: slot) # TODO: the separate get operation is avoided if we implement folding with values in KvSchema. - n_removed = - BlockBySlot.fold_keys(slot, 0, fn slot, acc -> - case BlockBySlot.get(slot) do - {:ok, :empty_slot} -> - BlockBySlot.delete(slot) - acc + 1 - - {:ok, block_root} -> - BlockBySlot.delete(slot) - Db.delete(block_key(block_root)) - acc + 1 - - other -> - Logger.error( - "[Block pruning] Failed to remove block from slot #{inspect(slot)}. Reason: #{inspect(other)}" - ) - end - end) - - Logger.info("[BlockDb] Pruning finished. #{n_removed} blocks removed.") + case BlockBySlot.fold_keys(slot, 0, fn slot, acc -> + case BlockBySlot.get(slot) do + {:ok, :empty_slot} -> + BlockBySlot.delete(slot) + acc + 1 + + {:ok, block_root} -> + BlockBySlot.delete(slot) + Db.delete(block_key(block_root)) + acc + 1 + + other -> + Logger.error( + "[Block pruning] Failed to remove block from slot #{inspect(slot)}. Reason: #{inspect(other)}" + ) + end + end) do + {:ok, n_removed} -> Logger.info("[BlockDb] Pruning finished. #{n_removed} blocks removed.") + {:error, reason} -> Logger.error("[BlockDb] Error pruning blocks: #{inspect(reason)}") + end end defp block_key(root), do: Utils.get_key(@block_prefix, root) From e1656bbc4e9c76c6c4fd6cde5e864ba2072f6158 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Mon, 15 Jul 2024 15:51:20 +0200 Subject: [PATCH 8/9] fix linter --- .../store/block_db.ex | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/lib/lambda_ethereum_consensus/store/block_db.ex b/lib/lambda_ethereum_consensus/store/block_db.ex index afc2e7d8b..a498188cd 100644 --- a/lib/lambda_ethereum_consensus/store/block_db.ex +++ b/lib/lambda_ethereum_consensus/store/block_db.ex @@ -86,24 +86,27 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do def prune_blocks_older_than(slot) do Logger.info("[BlockDb] Pruning started.", slot: slot) + result = + BlockBySlot.fold_keys(slot, 0, fn slot, acc -> + case BlockBySlot.get(slot) do + {:ok, :empty_slot} -> + BlockBySlot.delete(slot) + acc + 1 + + {:ok, block_root} -> + BlockBySlot.delete(slot) + Db.delete(block_key(block_root)) + acc + 1 + + other -> + Logger.error( + "[Block pruning] Failed to remove block from slot #{inspect(slot)}. Reason: #{inspect(other)}" + ) + end + end) + # TODO: the separate get operation is avoided if we implement folding with values in KvSchema. - case BlockBySlot.fold_keys(slot, 0, fn slot, acc -> - case BlockBySlot.get(slot) do - {:ok, :empty_slot} -> - BlockBySlot.delete(slot) - acc + 1 - - {:ok, block_root} -> - BlockBySlot.delete(slot) - Db.delete(block_key(block_root)) - acc + 1 - - other -> - Logger.error( - "[Block pruning] Failed to remove block from slot #{inspect(slot)}. Reason: #{inspect(other)}" - ) - end - end) do + case result do {:ok, n_removed} -> Logger.info("[BlockDb] Pruning finished. #{n_removed} blocks removed.") {:error, reason} -> Logger.error("[BlockDb] Error pruning blocks: #{inspect(reason)}") end From d0eb1493e5610976e24e1629d039f9c83212a1a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Mon, 15 Jul 2024 16:09:44 +0200 Subject: [PATCH 9/9] fix dialyzer by adding a Types.Guards module --- lib/beacon_api/helpers.ex | 10 +++++++--- lib/types/types.ex | 8 ++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/lib/beacon_api/helpers.ex b/lib/beacon_api/helpers.ex index 56ab7bad6..fc5511bc5 100644 --- a/lib/beacon_api/helpers.ex +++ b/lib/beacon_api/helpers.ex @@ -8,10 +8,11 @@ defmodule BeaconApi.Helpers do alias LambdaEthereumConsensus.Store.BlockDb alias LambdaEthereumConsensus.Store.Blocks alias LambdaEthereumConsensus.Store.StateDb - alias Types.StateInfo - alias Types.BeaconState alias Types.SignedBeaconBlock + alias Types.StateInfo + + import Types.Guards @type named_root() :: :genesis | :justified | :finalized | :head @type block_id() :: named_root() | :invalid_id | Types.slot() | Types.root() @@ -79,11 +80,14 @@ defmodule BeaconApi.Helpers do def block_root_by_block_id(slot) when is_integer(slot) do with :ok <- check_valid_slot(slot, ForkChoice.get_current_chain_slot()), - {:ok, root} <- BlockBySlot.get(slot) do + {:ok, root} when is_root(root) <- BlockBySlot.get(slot) do # TODO compute is_optimistic_or_invalid() and is_finalized() execution_optimistic = true finalized = false {:ok, {root, execution_optimistic, finalized}} + else + {:ok, :empty_slot} -> :empty_slot + other -> other end end diff --git a/lib/types/types.ex b/lib/types/types.ex index dc7df1cf2..0d8ee00dd 100644 --- a/lib/types/types.ex +++ b/lib/types/types.ex @@ -48,4 +48,12 @@ defmodule Types do @type kzg_proof :: Kzg.proof() @type bls_signature :: Bls.signature() @type bls_pubkey :: Bls.pubkey() + + defmodule Guards do + @moduledoc """ + Module defining guards for some types. Added as needed. + """ + + defguard is_root(binary) when is_binary(binary) and byte_size(binary) == 32 + end end