Skip to content

Commit e17070b

Browse files
authored
refactor: make BlockBySlot its own KvSchema (#1217)
1 parent fb52407 commit e17070b

File tree

12 files changed

+323
-89
lines changed

12 files changed

+323
-89
lines changed

lib/beacon_api/controllers/v1/beacon_controller.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ defmodule BeaconApi.V1.BeaconController do
55
alias BeaconApi.ErrorController
66
alias BeaconApi.Helpers
77
alias BeaconApi.Utils
8-
alias LambdaEthereumConsensus.Store.BlockDb
8+
alias LambdaEthereumConsensus.Store.BlockBySlot
99
alias LambdaEthereumConsensus.Store.Blocks
1010
alias LambdaEthereumConsensus.Store.StoreDb
1111

@@ -94,7 +94,7 @@ defmodule BeaconApi.V1.BeaconController do
9494

9595
def get_block_root(conn, %{block_id: block_id}) do
9696
with {slot, ""} when slot >= 0 <- Integer.parse(block_id),
97-
{:ok, block_root} <- BlockDb.get_block_root_by_slot(slot) do
97+
{:ok, block_root} <- BlockBySlot.get(slot) do
9898
conn |> root_response(block_root, true, false)
9999
else
100100
:not_found ->

lib/beacon_api/helpers.ex

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ defmodule BeaconApi.Helpers do
44
"""
55

66
alias LambdaEthereumConsensus.ForkChoice
7+
alias LambdaEthereumConsensus.Store.BlockBySlot
78
alias LambdaEthereumConsensus.Store.BlockDb
89
alias LambdaEthereumConsensus.Store.Blocks
910
alias LambdaEthereumConsensus.Store.StateDb
10-
alias Types.StateInfo
11-
1211
alias Types.BeaconState
1312
alias Types.SignedBeaconBlock
13+
alias Types.StateInfo
14+
15+
import Types.Guards
1416

1517
@type named_root() :: :genesis | :justified | :finalized | :head
1618
@type block_id() :: named_root() | :invalid_id | Types.slot() | Types.root()
@@ -78,11 +80,14 @@ defmodule BeaconApi.Helpers do
7880

7981
def block_root_by_block_id(slot) when is_integer(slot) do
8082
with :ok <- check_valid_slot(slot, ForkChoice.get_current_chain_slot()),
81-
{:ok, root} <- BlockDb.get_block_root_by_slot(slot) do
83+
{:ok, root} when is_root(root) <- BlockBySlot.get(slot) do
8284
# TODO compute is_optimistic_or_invalid() and is_finalized()
8385
execution_optimistic = true
8486
finalized = false
8587
{:ok, {root, execution_optimistic, finalized}}
88+
else
89+
{:ok, :empty_slot} -> :empty_slot
90+
other -> other
8691
end
8792
end
8893

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
defmodule LambdaEthereumConsensus.Store.BlockBySlot do
2+
@moduledoc """
3+
KvSchema that stores block roots indexed by slot. As we store blocks by their root, this module
4+
acts as an index if we need to look for them using their root. Some use cases are block pruning
5+
(removing all blocks prior to a slot) or checking if a range of slots contain blocks, for
6+
checkpoint sync checks.
7+
"""
8+
9+
alias LambdaEthereumConsensus.Store.KvSchema
10+
use KvSchema, prefix: "blockSlot"
11+
@type value_t :: Types.root() | :empty_slot
12+
13+
################################
14+
### PUBLIC API
15+
################################
16+
17+
@doc """
18+
Checks if all the blocks between first_slot and last_slot are present in the db.
19+
This iterates through the db checking each one individually, although it only checks
20+
the keys, so it doesn't need to decode the values, making it a relatively cheap
21+
linear O(last_slot - first_slot) operation.
22+
"""
23+
@spec all_present?(Types.slot(), Types.slot()) :: boolean()
24+
def all_present?(first_slot, last_slot) do
25+
fold_keys(last_slot, MapSet.new(), fn slot, set -> MapSet.put(set, slot) end,
26+
include_first: true
27+
)
28+
|> case do
29+
{:ok, available} ->
30+
Enum.all?(first_slot..last_slot, fn slot -> slot in available end)
31+
32+
{:error, :invalid_iterator} ->
33+
false
34+
35+
{:error, "Failed to start iterator for table" <> _} ->
36+
false
37+
end
38+
end
39+
40+
################################
41+
### Schema implementation
42+
################################
43+
44+
@impl KvSchema
45+
@spec encode_key(Types.slot()) :: {:ok, binary()} | {:error, binary()}
46+
def encode_key(slot), do: {:ok, <<slot::64>>}
47+
48+
@impl KvSchema
49+
@spec decode_key(binary()) :: {:ok, integer()} | {:error, binary()}
50+
def decode_key(<<slot::64>>), do: {:ok, slot}
51+
52+
def decode_key(other) do
53+
{:error, "[Block by slot] Could not decode slot, not 64 bit integer: #{other}"}
54+
end
55+
56+
@impl KvSchema
57+
@spec encode_value(value_t()) :: {:ok, value_t()} | {:error, binary()}
58+
def encode_value(:empty_slot), do: {:ok, <<>>}
59+
def encode_value(<<_::256>> = root), do: {:ok, root}
60+
61+
@impl KvSchema
62+
@spec decode_value(value_t()) :: {:ok, value_t()} | {:error, binary()}
63+
def decode_value(<<>>), do: {:ok, :empty_slot}
64+
def decode_value(<<_::256>> = root), do: {:ok, root}
65+
end

lib/lambda_ethereum_consensus/store/block_db.ex

Lines changed: 30 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do
33
Storage and retrieval of blocks.
44
"""
55
require Logger
6+
alias LambdaEthereumConsensus.Store.BlockBySlot
67
alias LambdaEthereumConsensus.Store.Db
78
alias LambdaEthereumConsensus.Store.Utils
89
alias Types.BlockInfo
910

1011
@block_prefix "blockHash"
11-
@blockslot_prefix "blockSlot"
1212
@block_status_prefix "blockStatus"
1313

1414
@spec store_block_info(BlockInfo.t()) :: :ok
@@ -22,8 +22,7 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do
2222
# TODO: this should apply fork-choice if not applied elsewhere
2323
# TODO: handle cases where slot is empty
2424
if not is_nil(block_info.signed_block) do
25-
slothash_key = block_root_by_slot_key(block_info.signed_block.message.slot)
26-
Db.put(slothash_key, block_info.root)
25+
BlockBySlot.put(block_info.signed_block.message.slot, block_info.root)
2726
end
2827
end
2928

@@ -35,24 +34,15 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do
3534
end
3635
end
3736

38-
@spec get_block_root_by_slot(Types.slot()) ::
39-
{:ok, Types.root()} | {:error, String.t()} | :not_found | :empty_slot
40-
def get_block_root_by_slot(slot) do
41-
key = block_root_by_slot_key(slot)
42-
block = Db.get(key)
43-
44-
case block do
45-
{:ok, <<>>} -> :empty_slot
46-
_ -> block
47-
end
48-
end
49-
5037
@spec get_block_info_by_slot(Types.slot()) ::
5138
{:ok, BlockInfo.t()} | {:error, String.t()} | :not_found | :empty_slot
5239
def get_block_info_by_slot(slot) do
5340
# WARN: this will return the latest block received for the given slot
54-
with {:ok, root} <- get_block_root_by_slot(slot) do
55-
get_block_info(root)
41+
# TODO: Are we actually saving empty slots in this index?
42+
case BlockBySlot.get(slot) do
43+
{:ok, :empty_slot} -> :empty_slot
44+
{:ok, root} -> get_block_info(root)
45+
other -> other
5646
end
5747
end
5848

@@ -95,58 +85,33 @@ defmodule LambdaEthereumConsensus.Store.BlockDb do
9585
@spec prune_blocks_older_than(non_neg_integer()) :: :ok | {:error, String.t()} | :not_found
9686
def prune_blocks_older_than(slot) do
9787
Logger.info("[BlockDb] Pruning started.", slot: slot)
98-
initial_key = slot |> block_root_by_slot_key()
99-
100-
slots_to_remove =
101-
Stream.resource(
102-
fn -> init_keycursor(initial_key) end,
103-
&next_slot(&1, :prev),
104-
&close_cursor/1
105-
)
106-
|> Enum.to_list()
107-
108-
slots_to_remove |> Enum.each(&remove_block_by_slot/1)
109-
Logger.info("[BlockDb] Pruning finished. #{Enum.count(slots_to_remove)} blocks removed.")
110-
end
111-
112-
@spec remove_block_by_slot(non_neg_integer()) :: :ok | :not_found
113-
defp remove_block_by_slot(slot) do
114-
slothash_key = block_root_by_slot_key(slot)
115-
116-
with {:ok, block_root} <- Db.get(slothash_key) do
117-
key_block = block_key(block_root)
118-
Db.delete(slothash_key)
119-
Db.delete(key_block)
120-
end
121-
end
122-
123-
defp init_keycursor(initial_key) do
124-
with {:ok, it} <- Db.iterate_keys(),
125-
{:ok, _key} <- Exleveldb.iterator_move(it, initial_key) do
126-
it
127-
else
128-
# DB is empty
129-
{:error, :invalid_iterator} -> nil
130-
end
131-
end
13288

133-
defp next_slot(nil, _movement), do: {:halt, nil}
134-
135-
defp next_slot(it, movement) do
136-
case Exleveldb.iterator_move(it, movement) do
137-
{:ok, @blockslot_prefix <> <<key::64>>} ->
138-
{[key], it}
139-
140-
_ ->
141-
{:halt, it}
89+
result =
90+
BlockBySlot.fold_keys(slot, 0, fn slot, acc ->
91+
case BlockBySlot.get(slot) do
92+
{:ok, :empty_slot} ->
93+
BlockBySlot.delete(slot)
94+
acc + 1
95+
96+
{:ok, block_root} ->
97+
BlockBySlot.delete(slot)
98+
Db.delete(block_key(block_root))
99+
acc + 1
100+
101+
other ->
102+
Logger.error(
103+
"[Block pruning] Failed to remove block from slot #{inspect(slot)}. Reason: #{inspect(other)}"
104+
)
105+
end
106+
end)
107+
108+
# TODO: the separate get operation is avoided if we implement folding with values in KvSchema.
109+
case result do
110+
{:ok, n_removed} -> Logger.info("[BlockDb] Pruning finished. #{n_removed} blocks removed.")
111+
{:error, reason} -> Logger.error("[BlockDb] Error pruning blocks: #{inspect(reason)}")
142112
end
143113
end
144114

145-
defp close_cursor(nil), do: :ok
146-
defp close_cursor(it), do: :ok = Exleveldb.iterator_close(it)
147-
148115
defp block_key(root), do: Utils.get_key(@block_prefix, root)
149-
defp block_root_by_slot_key(slot), do: Utils.get_key(@blockslot_prefix, slot)
150-
151116
defp block_status_key(status), do: Utils.get_key(@block_status_prefix, Atom.to_string(status))
152117
end

lib/lambda_ethereum_consensus/store/checkpoint_states_db.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ defmodule LambdaEthereumConsensus.Store.CheckpointStates do
7676
def prune(finalized_checkpoint) do
7777
Logger.debug("Pruning old checkpoint states")
7878

79-
case fold(finalized_checkpoint, 0, fn key, acc ->
79+
case fold_keys(finalized_checkpoint, 0, fn key, acc ->
8080
delete(key)
8181
acc + 1
8282
end) do

lib/lambda_ethereum_consensus/store/kv_schema.ex

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,21 @@ defmodule LambdaEthereumConsensus.Store.KvSchema do
5656
end)
5757
end
5858

59-
@spec fold(key(), acc(), (key(), acc() -> acc())) :: {:ok, acc()} | {:error, binary()}
60-
def fold(start_key, starting_value, f) do
61-
db_span("fold", fn ->
62-
with {:ok, it} <- Db.iterate(),
59+
@spec fold_keys(key(), acc(), (key(), acc() -> acc())) :: {:ok, acc()} | {:error, any()}
60+
def fold_keys(start_key, starting_value, f, opts \\ []) do
61+
db_span("fold_keys", fn ->
62+
include_first? = Keyword.get(opts, :include_first, false)
63+
direction = Keyword.get(opts, :direction, :prev)
64+
65+
with {:ok, it} <- Db.iterate_keys(),
6366
{:ok, encoded_start} <- do_encode_key(start_key),
64-
{:ok, ^encoded_start, _} <- Exleveldb.iterator_move(it, encoded_start) do
65-
res = iterate(it, starting_value, f)
67+
{:ok, ^encoded_start} <- Exleveldb.iterator_move(it, encoded_start) do
68+
res = iterate(it, starting_value, f, direction, encoded_start, include_first?)
6669
Exleveldb.iterator_close(it)
6770
{:ok, res}
6871
else
69-
# Failed at moving the iterator for the first time.
70-
{:ok, some_key, _some_value} ->
72+
# The iterator moved for the first time to a place where it wasn't expected.
73+
{:ok, some_key} ->
7174
{:error,
7275
"Failed to start iterator for table #{@prefix}. The obtained key is: #{some_key}"}
7376

@@ -77,16 +80,32 @@ defmodule LambdaEthereumConsensus.Store.KvSchema do
7780
end)
7881
end
7982

80-
defp iterate(it, acc, f) do
81-
case Exleveldb.iterator_move(it, :prev) do
82-
# TODO: add option to get the value in the function too if needed.
83-
{:ok, @prefix <> _ = k, v} ->
84-
# TODO: plan for weird corner cases where the key can't be decoded.
83+
defp iterate(it, acc, f, direction, _first_key, false) do
84+
iterate(it, acc, f, direction)
85+
end
86+
87+
defp iterate(it, acc, f, direction, first_key, true) do
88+
case accumulate(it, acc, f, first_key) do
89+
{:cont, new_acc} -> iterate(it, new_acc, f, direction)
90+
{:halt, new_acc} -> new_acc
91+
end
92+
end
93+
94+
defp iterate(it, acc, f, direction) do
95+
case accumulate(it, acc, f, direction) do
96+
{:cont, acc} -> iterate(it, acc, f, direction)
97+
{:halt, acc} -> acc
98+
end
99+
end
100+
101+
defp accumulate(it, acc, f, direction) do
102+
case Exleveldb.iterator_move(it, direction) do
103+
{:ok, @prefix <> _ = k} ->
85104
{:ok, decoded_key} = do_decode_key(k)
86-
iterate(it, f.(decoded_key, acc), f)
105+
{:cont, f.(decoded_key, acc)}
87106

88107
_ ->
89-
acc
108+
{:halt, acc}
90109
end
91110
end
92111

lib/types/types.ex

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,12 @@ defmodule Types do
4848
@type kzg_proof :: Kzg.proof()
4949
@type bls_signature :: Bls.signature()
5050
@type bls_pubkey :: Bls.pubkey()
51+
52+
defmodule Guards do
53+
@moduledoc """
54+
Module defining guards for some types. Added as needed.
55+
"""
56+
57+
defguard is_root(binary) when is_binary(binary) and byte_size(binary) == 32
58+
end
5159
end

test/fixtures/block.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ defmodule Fixtures.Block do
55

66
alias Fixtures.Random
77
alias LambdaEthereumConsensus.Utils.BitVector
8+
alias Types.BlockInfo
89

910
alias Types.BeaconBlock
1011
alias Types.BeaconBlockBody
@@ -13,6 +14,11 @@ defmodule Fixtures.Block do
1314
alias Types.ExecutionPayloadHeader
1415
alias Types.SignedBeaconBlock
1516

17+
@spec block_info :: BlockInfo.t()
18+
def block_info() do
19+
signed_beacon_block() |> BlockInfo.from_block()
20+
end
21+
1622
@spec signed_beacon_block :: SignedBeaconBlock.t()
1723
def signed_beacon_block() do
1824
%SignedBeaconBlock{

test/fixtures/utils.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ defmodule Fixtures.Random do
1818
binary(32)
1919
end
2020

21+
@spec slot() :: Types.slot()
22+
def slot(), do: uint64()
23+
2124
@spec bls_signature :: binary
2225
def bls_signature() do
2326
binary(96)

0 commit comments

Comments
 (0)