Skip to content

Commit e3d7327

Browse files
committed
ValidatorsManager Genserver removal
1 parent cb99b04 commit e3d7327

File tree

6 files changed

+89
-103
lines changed

6 files changed

+89
-103
lines changed

config/runtime.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ if keystore_pass_dir != nil and not File.dir?(keystore_pass_dir) do
171171
System.halt(2)
172172
end
173173

174-
config :lambda_ethereum_consensus, LambdaEthereumConsensus.Validator.ValidatorManager,
174+
config :lambda_ethereum_consensus, LambdaEthereumConsensus.Validator.Setup,
175175
keystore_dir: keystore_dir,
176176
keystore_pass_dir: keystore_pass_dir
177177

lib/lambda_ethereum_consensus/beacon/beacon_node.ex

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
88
alias LambdaEthereumConsensus.ForkChoice
99
alias LambdaEthereumConsensus.StateTransition.Cache
1010
alias LambdaEthereumConsensus.Store.BlockStates
11-
alias LambdaEthereumConsensus.Validator.ValidatorManager
11+
alias LambdaEthereumConsensus.Validator
1212
alias Types.BeaconState
1313

1414
def start_link(opts) do
@@ -24,43 +24,36 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
2424

2525
Cache.initialize_cache()
2626

27-
libp2p_args = get_libp2p_args()
28-
2927
time = :os.system_time(:second)
3028

3129
ForkChoice.init_store(store, time)
3230

33-
validator_manager =
34-
get_validator_manager(
35-
deposit_tree_snapshot,
36-
store.head_slot,
37-
store.head_root
38-
)
31+
init_execution_chain(deposit_tree_snapshot, store.head_root)
32+
33+
validators = Validator.Setup.init(store.head_slot, store.head_root)
34+
35+
libp2p_args = [genesis_time: store.genesis_time, validators: validators] ++ get_libp2p_args()
3936

4037
children =
4138
[
4239
{LambdaEthereumConsensus.Beacon.Ticker, [LambdaEthereumConsensus.Libp2pPort]},
43-
{LambdaEthereumConsensus.Libp2pPort, [{:genesis_time, store.genesis_time} | libp2p_args]},
40+
{LambdaEthereumConsensus.Libp2pPort, libp2p_args},
4441
{Task.Supervisor, name: PruneStatesSupervisor},
4542
{Task.Supervisor, name: PruneBlocksSupervisor},
4643
{Task.Supervisor, name: PruneBlobsSupervisor}
47-
] ++ validator_manager
44+
]
4845

4946
Supervisor.init(children, strategy: :one_for_all)
5047
end
5148

52-
defp get_validator_manager(nil, _, _) do
49+
defp init_execution_chain(nil, _) do
5350
Logger.warning("Deposit data not found. Validator will be disabled.")
5451
[]
5552
end
5653

57-
defp get_validator_manager(snapshot, slot, head_root) do
54+
defp init_execution_chain(snapshot, head_root) do
5855
%BeaconState{eth1_data_votes: votes} = BlockStates.get_state_info!(head_root).beacon_state
5956
LambdaEthereumConsensus.Execution.ExecutionChain.init(snapshot, votes)
60-
# TODO: move checkpoint sync outside and move this to application.ex
61-
[
62-
{ValidatorManager, {slot, head_root}}
63-
]
6457
end
6558

6659
defp get_libp2p_args() do

lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ defmodule LambdaEthereumConsensus.ForkChoice do
44
"""
55

66
require Logger
7-
8-
# alias LambdaEthereumConsensus.Beacon.Clock
97
alias LambdaEthereumConsensus.Execution.ExecutionChain
108
alias LambdaEthereumConsensus.ForkChoice.Handlers
119
alias LambdaEthereumConsensus.ForkChoice.Head
10+
alias LambdaEthereumConsensus.Libp2pPort
1211
alias LambdaEthereumConsensus.Metrics
1312
alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector
1413
alias LambdaEthereumConsensus.StateTransition.Misc
@@ -18,7 +17,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do
1817
alias LambdaEthereumConsensus.Store.CheckpointStates
1918
alias LambdaEthereumConsensus.Store.StateDb
2019
alias LambdaEthereumConsensus.Store.StoreDb
21-
alias LambdaEthereumConsensus.Validator.ValidatorManager
2220
alias Types.Attestation
2321
alias Types.BlockInfo
2422
alias Types.Checkpoint
@@ -271,7 +269,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
271269
store.finalized_checkpoint
272270
)
273271

274-
ValidatorManager.notify_new_block(slot, head_root)
272+
Libp2pPort.notify_new_block({slot, head_root})
275273

276274
Logger.info("[Fork choice] Updated fork choice cache", slot: slot)
277275

lib/lambda_ethereum_consensus/validator/validator_manager.ex

Lines changed: 10 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,13 @@
1-
defmodule LambdaEthereumConsensus.Validator.ValidatorManager do
1+
defmodule LambdaEthereumConsensus.Validator.Setup do
22
@moduledoc """
3-
Module that manage the validators state
3+
Module that setups the initial validators state
44
"""
5-
use GenServer
65

76
require Logger
8-
alias LambdaEthereumConsensus.Beacon.Clock
97
alias LambdaEthereumConsensus.Validator
108

11-
@spec start_link({Types.slot(), Types.root()}) :: :ignore | {:error, any} | {:ok, pid}
12-
def start_link({slot, head_root}) do
13-
GenServer.start_link(__MODULE__, {slot, head_root}, name: __MODULE__)
14-
end
15-
16-
@spec init({Types.slot(), Types.root()}) ::
17-
{:ok, %{Bls.pubkey() => Validator.state()}} | {:stop, any}
18-
def init({slot, head_root}) do
9+
@spec init(Types.slot(), Types.root()) :: %{Bls.pubkey() => Validator.state()}
10+
def init(slot, head_root) do
1911
config = Application.get_env(:lambda_ethereum_consensus, __MODULE__, [])
2012
keystore_dir = Keyword.get(config, :keystore_dir)
2113
keystore_pass_dir = Keyword.get(config, :keystore_pass_dir)
@@ -26,10 +18,10 @@ defmodule LambdaEthereumConsensus.Validator.ValidatorManager do
2618
defp setup_validators(_s, _r, keystore_dir, keystore_pass_dir)
2719
when is_nil(keystore_dir) or is_nil(keystore_pass_dir) do
2820
Logger.warning(
29-
"[Validator Manager] No keystore_dir or keystore_pass_dir provided. Validator will not start."
21+
"[Validator] No keystore_dir or keystore_pass_dir provided. Validator will not start."
3022
)
3123

32-
{:ok, []}
24+
%{}
3325
end
3426

3527
defp setup_validators(slot, head_root, keystore_dir, keystore_pass_dir) do
@@ -42,63 +34,11 @@ defmodule LambdaEthereumConsensus.Validator.ValidatorManager do
4234
end)
4335
|> Map.new()
4436

45-
Logger.info("[Validator Manager] Initialized #{Enum.count(validators)} validators")
46-
47-
{:ok, validators}
48-
end
49-
50-
@spec notify_new_block({Types.slot(), Types.root()}) :: :ok
51-
def notify_new_block({slot, head_root}) do
52-
# Making this alone a cast sometimes solves the issue for a while
53-
# GenServer.cast(__MODULE__, {:notify_all, {:new_block, slot, head_root}})
54-
notify_validators({:new_block, slot, head_root})
55-
end
56-
57-
@spec notify_tick(Clock.logical_time()) :: :ok
58-
def notify_tick(logical_time) do
59-
# Making this a cast alone doesn't solve the issue
60-
# GenServer.cast(__MODULE__, {:notify_all, {:on_tick, logical_time}})
61-
notify_validators({:on_tick, logical_time})
62-
end
63-
64-
# TODO: The use of a Genserver and cast is still needed to avoid locking at the clock level.
65-
# This is a temporary solution and will be taken off in a future PR.
66-
defp notify_validators(msg), do: GenServer.call(__MODULE__, {:notify_all, msg}, 20_000)
37+
Logger.info("[Validator] Initialized #{Enum.count(validators)} validators")
6738

68-
def handle_cast({:notify_all, msg}, validators) do
69-
validators = notify_all(validators, msg)
70-
71-
{:noreply, validators}
39+
validators
7240
end
7341

74-
def handle_call({:notify_all, msg}, _from, validators) do
75-
validators = notify_all(validators, msg)
76-
77-
{:reply, :ok, validators}
78-
end
79-
80-
defp notify_all(validators, msg) do
81-
start_time = System.monotonic_time(:millisecond)
82-
83-
Logger.info("[Validator Manager] Notifying all Validators with message: #{inspect(msg)}")
84-
85-
updated_validators = Enum.map(validators, &notify_validator(&1, msg))
86-
87-
end_time = System.monotonic_time(:millisecond)
88-
89-
Logger.debug(
90-
"[Validator Manager] #{inspect(msg)} notified to all Validators after #{end_time - start_time} ms"
91-
)
92-
93-
updated_validators
94-
end
95-
96-
defp notify_validator({pubkey, validator}, {:on_tick, logical_time}),
97-
do: {pubkey, Validator.handle_tick(logical_time, validator)}
98-
99-
defp notify_validator({pubkey, validator}, {:new_block, slot, head_root}),
100-
do: {pubkey, Validator.handle_new_block(slot, head_root, validator)}
101-
10242
@doc """
10343
Get validator keys from the keystore directory.
10444
This function expects two files for each validator:
@@ -119,7 +59,7 @@ defmodule LambdaEthereumConsensus.Validator.ValidatorManager do
11959

12060
{keystore_file, keystore_pass_file}
12161
else
122-
Logger.warning("[Validator Manager] Skipping file: #{filename}. Not a keystore file.")
62+
Logger.warning("[Validator] Skipping file: #{filename}. Not a keystore file.")
12363
nil
12464
end
12565
end)
@@ -131,7 +71,7 @@ defmodule LambdaEthereumConsensus.Validator.ValidatorManager do
13171
rescue
13272
error ->
13373
Logger.error(
134-
"[Validator Manager] Failed to decode keystore file: #{keystore_file}. Pass file: #{keystore_pass_file} Error: #{inspect(error)}"
74+
"[Validator] Failed to decode keystore file: #{keystore_file}. Pass file: #{keystore_pass_file} Error: #{inspect(error)}"
13575
)
13676

13777
nil

lib/libp2p_port.ex

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
99

1010
use GenServer
1111

12-
alias LambdaEthereumConsensus.Validator.ValidatorManager
1312
alias LambdaEthereumConsensus.Beacon.PendingBlocks
1413
alias LambdaEthereumConsensus.Beacon.SyncBlocks
1514
alias LambdaEthereumConsensus.ForkChoice
@@ -22,6 +21,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
2221
alias LambdaEthereumConsensus.P2p.Requests
2322
alias LambdaEthereumConsensus.StateTransition.Misc
2423
alias LambdaEthereumConsensus.Utils.BitVector
24+
alias LambdaEthereumConsensus.Validator
2525
alias Libp2pProto.AddPeer
2626
alias Libp2pProto.Command
2727
alias Libp2pProto.Enr
@@ -63,6 +63,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
6363

6464
@type init_arg ::
6565
{:genesis_time, Types.uint64()}
66+
| {:validators, %{}}
6667
| {:listen_addr, [String.t()]}
6768
| {:enable_discovery, boolean()}
6869
| {:discovery_addr, String.t()}
@@ -113,6 +114,14 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
113114
GenServer.cast(__MODULE__, {:on_tick, time})
114115
end
115116

117+
@spec notify_new_block({Types.slot(), Types.root()}) :: :ok
118+
def notify_new_block(data) do
119+
# TODO: This is quick workarround to notify the libp2p port about new blocks from within
120+
# the ForkChoice.recompute_head/1 without moving the validators to the store this
121+
# allows to deferr that move until we simplify the state and remove duplicates.
122+
send(self(), {:new_block, data})
123+
end
124+
116125
@doc """
117126
Retrieves identity info from the underlying LibP2P node.
118127
"""
@@ -354,6 +363,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
354363
@impl GenServer
355364
def init(args) do
356365
{genesis_time, args} = Keyword.pop!(args, :genesis_time)
366+
{validators, args} = Keyword.pop(args, :validators, %{})
357367
{join_init_topics, args} = Keyword.pop(args, :join_init_topics, false)
358368
{enable_request_handlers, args} = Keyword.pop(args, :enable_request_handlers, false)
359369

@@ -379,6 +389,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
379389
{:ok,
380390
%{
381391
genesis_time: genesis_time,
392+
validators: validators,
382393
slot_data: nil,
383394
port: port,
384395
subscribers: %{},
@@ -408,7 +419,10 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
408419
end
409420

410421
@impl GenServer
411-
def handle_cast({:on_tick, time}, %{genesis_time: genesis_time} = state) when time < genesis_time, do: {:noreply, state}
422+
def handle_cast({:on_tick, time}, %{genesis_time: genesis_time} = state)
423+
when time < genesis_time,
424+
do: {:noreply, state}
425+
412426
def handle_cast({:on_tick, time}, %{genesis_time: genesis_time, slot_data: slot_data} = state) do
413427
# TODO: we probably want to remove this from here, but we keep it here to have this serialized
414428
# with respect to the other fork choice store modifications.
@@ -417,13 +431,11 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
417431

418432
new_slot_data = compute_slot(genesis_time, time)
419433

420-
if slot_data != new_slot_data do
421-
ValidatorManager.notify_tick(slot_data)
422-
end
434+
updated_state = maybe_tick_validators(slot_data != new_slot_data, new_slot_data, state)
423435

424-
log_new_slot(new_slot_data)
436+
log_new_slot(slot_data, new_slot_data)
425437

426-
{:noreply, %{state | slot_data: new_slot_data}}
438+
{:noreply, updated_state}
427439
end
428440

429441
def handle_cast(
@@ -486,6 +498,13 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
486498
{:noreply, new_state}
487499
end
488500

501+
@impl GenServer
502+
def handle_info({:new_block, data}, %{validators: validators} = state) do
503+
updated_validators = notify_validators(validators, {:new_block, data})
504+
505+
{:noreply, %{state | validators: updated_validators}}
506+
end
507+
489508
@impl GenServer
490509
def handle_info({_port, {:data, data}}, state) do
491510
%Notification{n: {_, payload}} = Notification.decode(data)
@@ -707,6 +726,38 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
707726
end)
708727
end
709728

729+
# Validator related functions
730+
731+
defp maybe_tick_validators(false = _slot_data_changed, _slot_data, state), do: state
732+
733+
defp maybe_tick_validators(true, slot_data, %{validators: validators} = state) do
734+
updated_validators = notify_validators(validators, {:on_tick, slot_data})
735+
736+
%{state | slot_data: slot_data, validators: updated_validators}
737+
end
738+
739+
defp notify_validators(validators, msg) do
740+
start_time = System.monotonic_time(:millisecond)
741+
742+
Logger.info("[Libp2p] Notifying all Validators with message: #{inspect(msg)}")
743+
744+
updated_validators = Enum.map(validators, &notify_validator(&1, msg))
745+
746+
end_time = System.monotonic_time(:millisecond)
747+
748+
Logger.debug(
749+
"[Validator Manager] #{inspect(msg)} notified to all Validators after #{end_time - start_time} ms"
750+
)
751+
752+
updated_validators
753+
end
754+
755+
defp notify_validator({pubkey, validator}, {:on_tick, slot_data}),
756+
do: {pubkey, Validator.handle_tick(slot_data, validator)}
757+
758+
defp notify_validator({pubkey, validator}, {:new_block, {slot, head_root}}),
759+
do: {pubkey, Validator.handle_new_block(slot, head_root, validator)}
760+
710761
@spec compute_slot(Types.uint64(), Types.uint64()) :: slot_data()
711762
defp compute_slot(genesis_time, time) do
712763
# TODO: This was copied as it is from the Clock to convert it into just a Ticker,
@@ -726,12 +777,14 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
726777
{slot, slot_third}
727778
end
728779

729-
defp log_new_slot({slot, :first_third}) do
780+
defp log_new_slot({slot, _third}, {slot, _third}), do: :ok
781+
782+
defp log_new_slot({_prev_slot, _thrid}, {slot, :first_third}) do
730783
# TODO: as with the previous function, this was copied from the Clock module.
731-
# is use :sync, :store as the slot event, probably something to look into.
784+
# It use :sync, :store as the slot event, probably something to look into.
732785
:telemetry.execute([:sync, :store], %{slot: slot})
733786
Logger.info("[Libp2p] Slot transition", slot: slot)
734787
end
735788

736-
defp log_new_slot(_), do: :ok
789+
defp log_new_slot(_, _), do: :ok
737790
end

test/unit/libp2p_port_test.exs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ defmodule Unit.Libp2pPortTest do
1717
end
1818

1919
defp start_port(name \\ Libp2pPort, init_args \\ []) do
20-
start_link_supervised!({Libp2pPort, [opts: [name: name], genesis_time: 42] ++ init_args}, id: name)
20+
start_link_supervised!({Libp2pPort, [opts: [name: name], genesis_time: 42] ++ init_args},
21+
id: name
22+
)
2123
end
2224

2325
@tag :tmp_dir

0 commit comments

Comments
 (0)