Skip to content

Commit b8623ee

Browse files
committed
Make the clock a ticker
1 parent 3c5961f commit b8623ee

File tree

3 files changed

+70
-66
lines changed

3 files changed

+70
-66
lines changed

lib/lambda_ethereum_consensus/beacon/beacon_node.ex

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
2424

2525
Cache.initialize_cache()
2626

27-
libp2p_args = get_libp2p_args()
28-
2927
time = :os.system_time(:second)
3028

3129
ForkChoice.init_store(store, time)
3230

31+
libp2p_args = get_libp2p_args(store.genesis_time)
32+
3333
validator_manager =
3434
get_validator_manager(
3535
deposit_tree_snapshot,
@@ -39,8 +39,8 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
3939

4040
children =
4141
[
42-
{LambdaEthereumConsensus.Beacon.Clock, {store.genesis_time, time}},
4342
{LambdaEthereumConsensus.Libp2pPort, libp2p_args},
43+
{LambdaEthereumConsensus.Beacon.Ticker, [LambdaEthereumConsensus.Libp2pPort]},
4444
{Task.Supervisor, name: PruneStatesSupervisor},
4545
{Task.Supervisor, name: PruneBlocksSupervisor},
4646
{Task.Supervisor, name: PruneBlobsSupervisor}
@@ -63,7 +63,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
6363
]
6464
end
6565

66-
defp get_libp2p_args() do
66+
defp get_libp2p_args(genesis_time) do
6767
config = Application.fetch_env!(:lambda_ethereum_consensus, :libp2p)
6868
port = Keyword.fetch!(config, :port)
6969
bootnodes = Keyword.fetch!(config, :bootnodes)
@@ -75,6 +75,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
7575
end
7676

7777
[
78+
genesis_time: genesis_time,
7879
listen_addr: listen_addr,
7980
enable_discovery: true,
8081
discovery_addr: "0.0.0.0:#{port}",
Lines changed: 21 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,87 +1,52 @@
1-
defmodule LambdaEthereumConsensus.Beacon.Clock do
1+
defmodule LambdaEthereumConsensus.Beacon.Ticker do
22
@moduledoc false
33

44
use GenServer
55

6-
alias LambdaEthereumConsensus.Libp2pPort
7-
86
require Logger
97

10-
@type state :: %{
11-
genesis_time: Types.uint64(),
12-
time: Types.uint64()
13-
}
14-
15-
@spec start_link({Types.uint64(), Types.uint64()}) :: :ignore | {:error, any} | {:ok, pid}
8+
@spec start_link([atom()]) :: :ignore | {:error, any} | {:ok, pid}
169
def start_link(opts) do
1710
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
1811
end
1912

13+
@spec register_to_tick(atom() | [atom()]) :: :ok
14+
def register_to_tick(to_tick) when is_atom(to_tick), do: register_to_tick([to_tick])
15+
def register_to_tick(to_tick) when is_list(to_tick) do
16+
GenServer.cast(__MODULE__, {:register_to_tick, to_tick})
17+
end
18+
2019
##########################
2120
### GenServer Callbacks
2221
##########################
2322

24-
@impl GenServer
25-
@spec init({Types.uint64(), Types.uint64()}) ::
26-
{:ok, state()} | {:stop, any}
27-
def init({genesis_time, time}) do
23+
@impl true
24+
@spec init([atom()]) :: {:ok, [atom()]} | {:stop, any}
25+
def init(to_tick) when is_list(to_tick) do
2826
schedule_next_tick()
2927

30-
{:ok,
31-
%{
32-
genesis_time: genesis_time,
33-
time: time
34-
}}
28+
{:ok, to_tick}
3529
end
3630

3731
@impl true
38-
def handle_info(:on_tick, state) do
32+
def handle_cast({:register_to_tick, to_tick_additions}, to_tick) do
33+
new_to_tick = Enum.uniq(to_tick ++ to_tick_additions)
34+
{:noreply, new_to_tick}
35+
end
36+
37+
@impl true
38+
def handle_info(:on_tick, to_tick) do
3939
schedule_next_tick()
4040
time = :os.system_time(:second)
41-
new_state = %{state | time: time}
42-
43-
if time >= state.genesis_time do
44-
45-
# TODO: reduce time between ticks to account for gnosis' 5s slot time.
46-
old_logical_time = compute_logical_time(state)
47-
new_logical_time = compute_logical_time(new_state)
4841

49-
Libp2pPort.on_tick({time, new_logical_time, new_logical_time != old_logical_time})
50-
end
42+
Enum.each(to_tick, & &1.on_tick(time))
5143

52-
{:noreply, new_state}
44+
{:noreply, to_tick}
5345
end
5446

5547
def schedule_next_tick() do
5648
# For millisecond precision
5749
time_to_next_tick = 1000 - rem(:os.system_time(:millisecond), 1000)
5850
Process.send_after(__MODULE__, :on_tick, time_to_next_tick)
5951
end
60-
61-
@type slot_third :: :first_third | :second_third | :last_third
62-
@type logical_time :: {Types.slot(), slot_third()}
63-
64-
@spec compute_logical_time(state()) :: logical_time()
65-
defp compute_logical_time(state) do
66-
elapsed_time = state.time - state.genesis_time
67-
68-
slot_thirds = div(elapsed_time * 3, ChainSpec.get("SECONDS_PER_SLOT"))
69-
slot = div(slot_thirds, 3)
70-
71-
slot_third =
72-
case rem(slot_thirds, 3) do
73-
0 -> :first_third
74-
1 -> :second_third
75-
2 -> :last_third
76-
end
77-
78-
{slot, slot_third}
79-
end
80-
81-
defp log_new_slot({slot, :first_third}) do
82-
:telemetry.execute([:sync, :store], %{slot: slot})
83-
Logger.info("[Clock] Slot transition", slot: slot)
84-
end
85-
86-
defp log_new_slot(_), do: :ok
8752
end

lib/libp2p_port.ex

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,16 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
6262
]
6363

6464
@type init_arg ::
65-
{:listen_addr, [String.t()]}
65+
{:genesis_time, Types.uint64()}
66+
| {:listen_addr, [String.t()]}
6667
| {:enable_discovery, boolean()}
6768
| {:discovery_addr, String.t()}
6869
| {:bootnodes, [String.t()]}
6970
| {:join_init_topics, boolean()}
7071
| {:enable_request_handlers, boolean()}
7172

73+
@type slot_data() :: {Types.uint64(), :first_third | :second_third | :last_third}
74+
7275
@type node_identity() :: %{
7376
peer_id: binary(),
7477
# Pretty-printed version of the peer ID
@@ -350,6 +353,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
350353

351354
@impl GenServer
352355
def init(args) do
356+
{genesis_time, args} = Keyword.pop!(args, :genesis_time)
353357
{join_init_topics, args} = Keyword.pop(args, :join_init_topics, false)
354358
{enable_request_handlers, args} = Keyword.pop(args, :enable_request_handlers, false)
355359

@@ -374,6 +378,8 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
374378

375379
{:ok,
376380
%{
381+
genesis_time: genesis_time,
382+
slot_data: {0, :first_third},
377383
port: port,
378384
subscribers: %{},
379385
requests: Requests.new(),
@@ -393,18 +399,22 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
393399
end
394400

395401
@impl GenServer
396-
def handle_cast({:on_tick, {time, slot_data, changed_slot_data}}, state) do
402+
def handle_cast({:on_tick, time}, %{genesis_time: genesis_time} = state) when time <= genesis_time, do: {:noreply, state}
403+
def handle_cast({:on_tick, time}, %{genesis_time: genesis_time, slot_data: slot_data} = state) do
397404
# TODO: we probably want to remove this from here, but we keep it here to have this serialized
398405
# with respect to the other fork choice store modifications.
406+
399407
ForkChoice.on_tick(time)
400408

401-
# For testing that calling it from the libp2p works, and its just a matter of the notify new block,
402-
# not the clock being the one who calls the notify tick.
403-
if changed_slot_data do
409+
new_slot_data = compute_slot(genesis_time, time)
410+
411+
if slot_data != new_slot_data do
404412
ValidatorManager.notify_tick(slot_data)
405413
end
406414

407-
{:noreply, state}
415+
log_new_slot(new_slot_data)
416+
417+
{:noreply, %{state | slot_data: new_slot_data}}
408418
end
409419

410420
def handle_cast(
@@ -682,4 +692,32 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
682692
add_subscriber(state, topic, module)
683693
end)
684694
end
695+
696+
@spec compute_slot(Types.uint64(), Types.uint64()) :: slot_data()
697+
defp compute_slot(genesis_time, time) do
698+
# TODO: This was copied as it is from the Clock to convert it into just a Ticker,
699+
# slot calculations are spread across modules, we should probably centralize them.
700+
elapsed_time = time - genesis_time
701+
702+
slot_thirds = div(elapsed_time * 3, ChainSpec.get("SECONDS_PER_SLOT"))
703+
slot = div(slot_thirds, 3)
704+
705+
slot_third =
706+
case rem(slot_thirds, 3) do
707+
0 -> :first_third
708+
1 -> :second_third
709+
2 -> :last_third
710+
end
711+
712+
{slot, slot_third}
713+
end
714+
715+
defp log_new_slot({slot, :first_third}) do
716+
# TODO: as with the previous function, this was copied from the Clock module.
717+
# is use :sync, :store as the slot event, probably something to look into.
718+
:telemetry.execute([:sync, :store], %{slot: slot})
719+
Logger.info("[Libp2p] Slot transition", slot: slot)
720+
end
721+
722+
defp log_new_slot(_), do: :ok
685723
end

0 commit comments

Comments
 (0)