Skip to content

refactor: make BlockBySlot its own KvSchema #1217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/beacon_api/controllers/v1/beacon_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule BeaconApi.V1.BeaconController do
alias BeaconApi.ErrorController
alias BeaconApi.Helpers
alias BeaconApi.Utils
alias LambdaEthereumConsensus.Store.BlockDb
alias LambdaEthereumConsensus.Store.BlockBySlot
alias LambdaEthereumConsensus.Store.Blocks
alias LambdaEthereumConsensus.Store.StoreDb

Expand Down Expand Up @@ -94,7 +94,7 @@ defmodule BeaconApi.V1.BeaconController do

def get_block_root(conn, %{block_id: block_id}) do
with {slot, ""} when slot >= 0 <- Integer.parse(block_id),
{:ok, block_root} <- BlockDb.get_block_root_by_slot(slot) do
{:ok, block_root} <- BlockBySlot.get(slot) do
conn |> root_response(block_root, true, false)
else
:not_found ->
Expand Down
11 changes: 8 additions & 3 deletions lib/beacon_api/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ defmodule BeaconApi.Helpers do
"""

alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.Store.BlockBySlot
alias LambdaEthereumConsensus.Store.BlockDb
alias LambdaEthereumConsensus.Store.Blocks
alias LambdaEthereumConsensus.Store.StateDb
alias Types.StateInfo

alias Types.BeaconState
alias Types.SignedBeaconBlock
alias Types.StateInfo

import Types.Guards

@type named_root() :: :genesis | :justified | :finalized | :head
@type block_id() :: named_root() | :invalid_id | Types.slot() | Types.root()
Expand Down Expand Up @@ -78,11 +80,14 @@ defmodule BeaconApi.Helpers do

def block_root_by_block_id(slot) when is_integer(slot) do
with :ok <- check_valid_slot(slot, ForkChoice.get_current_chain_slot()),
{:ok, root} <- BlockDb.get_block_root_by_slot(slot) do
{:ok, root} when is_root(root) <- BlockBySlot.get(slot) do
# TODO compute is_optimistic_or_invalid() and is_finalized()
execution_optimistic = true
finalized = false
{:ok, {root, execution_optimistic, finalized}}
else
{:ok, :empty_slot} -> :empty_slot
other -> other
end
end

Expand Down
65 changes: 65 additions & 0 deletions lib/lambda_ethereum_consensus/store/block_by_slot.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
defmodule LambdaEthereumConsensus.Store.BlockBySlot do
@moduledoc """
KvSchema that stores block roots indexed by slot. As we store blocks by their root, this module
acts as an index if we need to look for them using their root. Some use cases are block pruning
(removing all blocks prior to a slot) or checking if a range of slots contain blocks, for
checkpoint sync checks.
"""

alias LambdaEthereumConsensus.Store.KvSchema
use KvSchema, prefix: "blockSlot"
@type value_t :: Types.root() | :empty_slot

################################
### PUBLIC API
################################

@doc """
Checks if all the blocks between first_slot and last_slot are present in the db.
This iterates through the db checking each one individually, although it only checks
the keys, so it doesn't need to decode the values, making it a relatively cheap
linear O(last_slot - first_slot) operation.
"""
@spec all_present?(Types.slot(), Types.slot()) :: boolean()
def all_present?(first_slot, last_slot) do
fold_keys(last_slot, MapSet.new(), fn slot, set -> MapSet.put(set, slot) end,
include_first: true
)
|> case do
{:ok, available} ->
Enum.all?(first_slot..last_slot, fn slot -> slot in available end)

{:error, :invalid_iterator} ->
false

{:error, "Failed to start iterator for table" <> _} ->
false
end
end

################################
### Schema implementation
################################

@impl KvSchema
@spec encode_key(Types.slot()) :: {:ok, binary()} | {:error, binary()}
def encode_key(slot), do: {:ok, <<slot::64>>}

@impl KvSchema
@spec decode_key(binary()) :: {:ok, integer()} | {:error, binary()}
def decode_key(<<slot::64>>), do: {:ok, slot}

def decode_key(other) do
{:error, "[Block by slot] Could not decode slot, not 64 bit integer: #{other}"}
end

@impl KvSchema
@spec encode_value(value_t()) :: {:ok, value_t()} | {:error, binary()}
def encode_value(:empty_slot), do: {:ok, <<>>}
def encode_value(<<_::256>> = root), do: {:ok, root}

@impl KvSchema
@spec decode_value(value_t()) :: {:ok, value_t()} | {:error, binary()}
def decode_value(<<>>), do: {:ok, :empty_slot}
def decode_value(<<_::256>> = root), do: {:ok, root}
end
95 changes: 30 additions & 65 deletions lib/lambda_ethereum_consensus/store/block_db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do
Storage and retrieval of blocks.
"""
require Logger
alias LambdaEthereumConsensus.Store.BlockBySlot
alias LambdaEthereumConsensus.Store.Db
alias LambdaEthereumConsensus.Store.Utils
alias Types.BlockInfo

@block_prefix "blockHash"
@blockslot_prefix "blockSlot"
@block_status_prefix "blockStatus"

@spec store_block_info(BlockInfo.t()) :: :ok
Expand All @@ -22,8 +22,7 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do
# TODO: this should apply fork-choice if not applied elsewhere
# TODO: handle cases where slot is empty
if not is_nil(block_info.signed_block) do
slothash_key = block_root_by_slot_key(block_info.signed_block.message.slot)
Db.put(slothash_key, block_info.root)
BlockBySlot.put(block_info.signed_block.message.slot, block_info.root)
end
end

Expand All @@ -35,24 +34,15 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do
end
end

@spec get_block_root_by_slot(Types.slot()) ::
{:ok, Types.root()} | {:error, String.t()} | :not_found | :empty_slot
def get_block_root_by_slot(slot) do
key = block_root_by_slot_key(slot)
block = Db.get(key)

case block do
{:ok, <<>>} -> :empty_slot
_ -> block
end
end

@spec get_block_info_by_slot(Types.slot()) ::
{:ok, BlockInfo.t()} | {:error, String.t()} | :not_found | :empty_slot
def get_block_info_by_slot(slot) do
# WARN: this will return the latest block received for the given slot
with {:ok, root} <- get_block_root_by_slot(slot) do
get_block_info(root)
# TODO: Are we actually saving empty slots in this index?
case BlockBySlot.get(slot) do
{:ok, :empty_slot} -> :empty_slot
{:ok, root} -> get_block_info(root)
other -> other
end
end

Expand Down Expand Up @@ -95,58 +85,33 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do
@spec prune_blocks_older_than(non_neg_integer()) :: :ok | {:error, String.t()} | :not_found
def prune_blocks_older_than(slot) do
Logger.info("[BlockDb] Pruning started.", slot: slot)
initial_key = slot |> block_root_by_slot_key()

slots_to_remove =
Stream.resource(
fn -> init_keycursor(initial_key) end,
&next_slot(&1, :prev),
&close_cursor/1
)
|> Enum.to_list()

slots_to_remove |> Enum.each(&remove_block_by_slot/1)
Logger.info("[BlockDb] Pruning finished. #{Enum.count(slots_to_remove)} blocks removed.")
end

@spec remove_block_by_slot(non_neg_integer()) :: :ok | :not_found
defp remove_block_by_slot(slot) do
slothash_key = block_root_by_slot_key(slot)

with {:ok, block_root} <- Db.get(slothash_key) do
key_block = block_key(block_root)
Db.delete(slothash_key)
Db.delete(key_block)
end
end

defp init_keycursor(initial_key) do
with {:ok, it} <- Db.iterate_keys(),
{:ok, _key} <- Exleveldb.iterator_move(it, initial_key) do
it
else
# DB is empty
{:error, :invalid_iterator} -> nil
end
end

defp next_slot(nil, _movement), do: {:halt, nil}

defp next_slot(it, movement) do
case Exleveldb.iterator_move(it, movement) do
{:ok, @blockslot_prefix <> <<key::64>>} ->
{[key], it}

_ ->
{:halt, it}
result =
BlockBySlot.fold_keys(slot, 0, fn slot, acc ->
case BlockBySlot.get(slot) do
{:ok, :empty_slot} ->
BlockBySlot.delete(slot)
acc + 1

{:ok, block_root} ->
BlockBySlot.delete(slot)
Db.delete(block_key(block_root))
acc + 1

other ->
Logger.error(
"[Block pruning] Failed to remove block from slot #{inspect(slot)}. Reason: #{inspect(other)}"
)
end
end)

# TODO: the separate get operation is avoided if we implement folding with values in KvSchema.
case result do
{:ok, n_removed} -> Logger.info("[BlockDb] Pruning finished. #{n_removed} blocks removed.")
{:error, reason} -> Logger.error("[BlockDb] Error pruning blocks: #{inspect(reason)}")
end
end

defp close_cursor(nil), do: :ok
defp close_cursor(it), do: :ok = Exleveldb.iterator_close(it)

defp block_key(root), do: Utils.get_key(@block_prefix, root)
defp block_root_by_slot_key(slot), do: Utils.get_key(@blockslot_prefix, slot)

defp block_status_key(status), do: Utils.get_key(@block_status_prefix, Atom.to_string(status))
end
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ defmodule LambdaEthereumConsensus.Store.CheckpointStates do
def prune(finalized_checkpoint) do
Logger.debug("Pruning old checkpoint states")

case fold(finalized_checkpoint, 0, fn key, acc ->
case fold_keys(finalized_checkpoint, 0, fn key, acc ->
delete(key)
acc + 1
end) do
Expand Down
49 changes: 34 additions & 15 deletions lib/lambda_ethereum_consensus/store/kv_schema.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,21 @@ defmodule LambdaEthereumConsensus.Store.KvSchema do
end)
end

@spec fold(key(), acc(), (key(), acc() -> acc())) :: {:ok, acc()} | {:error, binary()}
def fold(start_key, starting_value, f) do
db_span("fold", fn ->
with {:ok, it} <- Db.iterate(),
@spec fold_keys(key(), acc(), (key(), acc() -> acc())) :: {:ok, acc()} | {:error, any()}
def fold_keys(start_key, starting_value, f, opts \\ []) do
db_span("fold_keys", fn ->
include_first? = Keyword.get(opts, :include_first, false)
direction = Keyword.get(opts, :direction, :prev)

with {:ok, it} <- Db.iterate_keys(),
{:ok, encoded_start} <- do_encode_key(start_key),
{:ok, ^encoded_start, _} <- Exleveldb.iterator_move(it, encoded_start) do
res = iterate(it, starting_value, f)
{:ok, ^encoded_start} <- Exleveldb.iterator_move(it, encoded_start) do
res = iterate(it, starting_value, f, direction, encoded_start, include_first?)
Exleveldb.iterator_close(it)
{:ok, res}
else
# Failed at moving the iterator for the first time.
{:ok, some_key, _some_value} ->
# The iterator moved for the first time to a place where it wasn't expected.
{:ok, some_key} ->
{:error,
"Failed to start iterator for table #{@prefix}. The obtained key is: #{some_key}"}

Expand All @@ -77,16 +80,32 @@ defmodule LambdaEthereumConsensus.Store.KvSchema do
end)
end

defp iterate(it, acc, f) do
case Exleveldb.iterator_move(it, :prev) do
# TODO: add option to get the value in the function too if needed.
{:ok, @prefix <> _ = k, v} ->
# TODO: plan for weird corner cases where the key can't be decoded.
defp iterate(it, acc, f, direction, _first_key, false) do
iterate(it, acc, f, direction)
end

defp iterate(it, acc, f, direction, first_key, true) do
case accumulate(it, acc, f, first_key) do
{:cont, new_acc} -> iterate(it, new_acc, f, direction)
{:halt, new_acc} -> new_acc
end
end

defp iterate(it, acc, f, direction) do
case accumulate(it, acc, f, direction) do
{:cont, acc} -> iterate(it, acc, f, direction)
{:halt, acc} -> acc
end
end

defp accumulate(it, acc, f, direction) do
case Exleveldb.iterator_move(it, direction) do
{:ok, @prefix <> _ = k} ->
{:ok, decoded_key} = do_decode_key(k)
iterate(it, f.(decoded_key, acc), f)
{:cont, f.(decoded_key, acc)}

_ ->
acc
{:halt, acc}
end
end

Expand Down
8 changes: 8 additions & 0 deletions lib/types/types.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,12 @@ defmodule Types do
@type kzg_proof :: Kzg.proof()
@type bls_signature :: Bls.signature()
@type bls_pubkey :: Bls.pubkey()

defmodule Guards do
@moduledoc """
Module defining guards for some types. Added as needed.
"""

defguard is_root(binary) when is_binary(binary) and byte_size(binary) == 32
end
end
6 changes: 6 additions & 0 deletions test/fixtures/block.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Fixtures.Block do

alias Fixtures.Random
alias LambdaEthereumConsensus.Utils.BitVector
alias Types.BlockInfo

alias Types.BeaconBlock
alias Types.BeaconBlockBody
Expand All @@ -13,6 +14,11 @@ defmodule Fixtures.Block do
alias Types.ExecutionPayloadHeader
alias Types.SignedBeaconBlock

@spec block_info :: BlockInfo.t()
def block_info() do
signed_beacon_block() |> BlockInfo.from_block()
end

@spec signed_beacon_block :: SignedBeaconBlock.t()
def signed_beacon_block() do
%SignedBeaconBlock{
Expand Down
3 changes: 3 additions & 0 deletions test/fixtures/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ defmodule Fixtures.Random do
binary(32)
end

@spec slot() :: Types.slot()
def slot(), do: uint64()

@spec bls_signature :: binary
def bls_signature() do
binary(96)
Expand Down
Loading
Loading