Skip to content

Commit 02693de

Browse files
rodrigo-oavilagaston9
authored andcommitted
refactor: validator's naive N GenServer and Supervisor removal (#1218)
1 parent d14f47a commit 02693de

File tree

4 files changed

+181
-136
lines changed

4 files changed

+181
-136
lines changed

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ kurtosis.setup.lambdaconsensus:
7777
kurtosis.start:
7878
kurtosis run --enclave lambdanet $(KURTOSIS_DIR) --args-file network_params.yaml
7979

80+
#💻 kurtosis.build-and-start: @ Builds the lambdaconsensus Docker image and starts the kurtosis environment.
81+
kurtosis.clean-start: kurtosis.clean kurtosis.setup.lambdaconsensus kurtosis.start
82+
8083
#💻 kurtosis.stop: @ Stops the kurtosis environment
8184
kurtosis.stop:
8285
kurtosis enclave stop lambdanet

lib/lambda_ethereum_consensus/validator/validator.ex

Lines changed: 113 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
defmodule LambdaEthereumConsensus.Validator do
22
@moduledoc """
3-
GenServer that performs validator duties.
3+
Module that performs validator duties.
44
"""
5-
use GenServer
65
require Logger
76

7+
defstruct [
8+
:slot,
9+
:root,
10+
:duties,
11+
:validator,
12+
:payload_builder
13+
]
14+
15+
alias LambdaEthereumConsensus.Beacon.Clock
816
alias LambdaEthereumConsensus.ForkChoice
917
alias LambdaEthereumConsensus.Libp2pPort
1018
alias LambdaEthereumConsensus.P2P.Gossip
@@ -23,40 +31,25 @@ defmodule LambdaEthereumConsensus.Validator do
2331

2432
@default_graffiti_message "Lambda, so gentle, so good"
2533

26-
##########################
27-
### Public API
28-
##########################
29-
30-
def start_link({_, _, {pubkey, _}} = opts) do
31-
# TODO: if possible, use validator index instead of pubkey?
32-
name =
33-
Atom.to_string(__MODULE__) <> LambdaEthereumConsensus.Utils.format_shorten_binary(pubkey)
34-
35-
GenServer.start_link(__MODULE__, opts, name: String.to_atom(name))
36-
end
37-
38-
def notify_new_block(slot, head_root),
39-
do: GenServer.cast(__MODULE__, {:new_block, slot, head_root})
40-
41-
##########################
42-
### GenServer Callbacks
43-
##########################
44-
45-
@type validator :: any()
34+
@type validator :: %{
35+
index: non_neg_integer() | nil,
36+
pubkey: Bls.pubkey(),
37+
privkey: Bls.privkey()
38+
}
4639

47-
@type state :: %{
40+
# TODO: Slot and Root are redundant, we should also have the duties separated and calculated
41+
# just at the begining of every epoch, and then just update them as needed.
42+
@type state :: %__MODULE__{
4843
slot: Types.slot(),
4944
root: Types.root(),
5045
duties: Duties.duties(),
5146
validator: validator(),
5247
payload_builder: {Types.slot(), Types.root(), BlockBuilder.payload_id()} | nil
5348
}
5449

55-
@impl true
56-
@spec init({Types.slot(), Types.root(), {Bls.pubkey(), Bls.privkey()}}) ::
57-
{:ok, state, {:continue, any}}
58-
def init({head_slot, head_root, {pubkey, privkey}}) do
59-
state = %{
50+
@spec new({Types.slot(), Types.root(), {Bls.pubkey(), Bls.privkey()}}) :: state
51+
def new({head_slot, head_root, {pubkey, privkey}}) do
52+
state = %__MODULE__{
6053
slot: head_slot,
6154
root: head_root,
6255
duties: Duties.empty_duties(),
@@ -68,19 +61,16 @@ defmodule LambdaEthereumConsensus.Validator do
6861
payload_builder: nil
6962
}
7063

71-
{:ok, state, {:continue, nil}}
72-
end
73-
74-
@impl true
75-
@spec handle_continue(any(), state) :: {:noreply, state}
76-
def handle_continue(_, %{slot: slot, root: root} = state) do
77-
case try_setup_validator(state, slot, root) do
64+
case try_setup_validator(state, head_slot, head_root) do
7865
nil ->
66+
# TODO: Previously this was handled by the validator continously trying to setup itself,
67+
# but now that they are processed syncronously, we should handle this case different.
68+
# Right now it's just omitted and logged.
7969
Logger.error("[Validator] Public key not found in the validator set")
80-
{:noreply, state}
70+
state
8171

8272
new_state ->
83-
{:noreply, new_state}
73+
new_state
8474
end
8575
end
8676

@@ -94,7 +84,7 @@ defmodule LambdaEthereumConsensus.Validator do
9484
nil
9585

9686
validator_index ->
97-
Logger.info("[Validator] Setup for validator number #{validator_index} complete")
87+
log_info(validator_index, "setup validator", slot: slot, root: root)
9888
validator = %{state.validator | index: validator_index}
9989
duties = Duties.maybe_update_duties(state.duties, beacon, epoch, validator)
10090
join_subnets_for_duties(duties)
@@ -103,49 +93,43 @@ defmodule LambdaEthereumConsensus.Validator do
10393
end
10494
end
10595

106-
@spec handle_cast(any, state) :: {:noreply, state}
107-
108-
@impl true
109-
def handle_cast(_, %{validator: nil} = state), do: {:noreply, state}
96+
@spec handle_new_block(Types.slot(), Types.root(), state) :: state
97+
def handle_new_block(slot, head_root, %{validator: %{index: nil}} = state) do
98+
log_error("-1", "setup validator", "index not present handle block",
99+
slot: slot,
100+
root: head_root
101+
)
110102

111-
# If we couldn't find the validator before, we just try again
112-
def handle_cast({:new_block, slot, head_root} = msg, %{validator: %{index: nil}} = state) do
113-
case try_setup_validator(state, slot, head_root) do
114-
nil -> {:noreply, state}
115-
new_state -> handle_cast(msg, new_state)
116-
end
103+
state
117104
end
118105

119-
def handle_cast({:new_block, slot, head_root}, state),
120-
do: {:noreply, handle_new_block(slot, head_root, state)}
121-
122-
def handle_cast({:on_tick, _}, %{validator: %{index: nil}} = state), do: {:noreply, state}
106+
def handle_new_block(slot, head_root, state) do
107+
log_debug(state.validator.index, "recieved new block", slot: slot, root: head_root)
123108

124-
def handle_cast({:on_tick, logical_time}, state),
125-
do: {:noreply, handle_tick(logical_time, state)}
126-
127-
##########################
128-
### Private Functions
129-
##########################
130-
131-
@spec handle_new_block(Types.slot(), Types.root(), state) :: state
132-
defp handle_new_block(slot, head_root, state) do
133109
# TODO: this doesn't take into account reorgs
134110
state
135111
|> update_state(slot, head_root)
136112
|> maybe_attest(slot)
137113
|> maybe_build_payload(slot + 1)
138114
end
139115

140-
defp handle_tick({slot, :first_third}, state) do
116+
@spec handle_tick(Clock.logical_time(), state) :: state
117+
def handle_tick(_logical_time, %{validator: %{index: nil}} = state) do
118+
log_error("-1", "setup validator", "index not present for handle tick")
119+
state
120+
end
121+
122+
def handle_tick({slot, :first_third}, state) do
123+
log_debug(state.validator.index, "started first third", slot: slot)
141124
# Here we may:
142125
# 1. propose our blocks
143126
# 2. (TODO) start collecting attestations for aggregation
144127
maybe_propose(state, slot)
145128
|> update_state(slot, state.root)
146129
end
147130

148-
defp handle_tick({slot, :second_third}, state) do
131+
def handle_tick({slot, :second_third}, state) do
132+
log_debug(state.validator.index, "started second third", slot: slot)
149133
# Here we may:
150134
# 1. send our attestation for an empty slot
151135
# 2. start building a payload
@@ -154,11 +138,16 @@ defmodule LambdaEthereumConsensus.Validator do
154138
|> maybe_build_payload(slot + 1)
155139
end
156140

157-
defp handle_tick({slot, :last_third}, state) do
141+
def handle_tick({slot, :last_third}, state) do
142+
log_debug(state.validator.index, "started last third", slot: slot)
158143
# Here we may publish our attestation aggregate
159144
maybe_publish_aggregate(state, slot)
160145
end
161146

147+
##########################
148+
### Private Functions
149+
##########################
150+
162151
@spec update_state(state, Types.slot(), Types.root()) :: state
163152

164153
defp update_state(%{slot: slot, root: root} = state, slot, root), do: state
@@ -256,16 +245,23 @@ defmodule LambdaEthereumConsensus.Validator do
256245
end
257246

258247
@spec attest(state, Duties.attester_duty()) :: :ok
259-
defp attest(state, current_duty) do
248+
defp attest(%{validator: validator} = state, current_duty) do
260249
subnet_id = current_duty.subnet_id
250+
log_debug(validator.index, "attesting", slot: current_duty.slot, subnet_id: subnet_id)
251+
261252
attestation = produce_attestation(current_duty, state.root, state.validator.privkey)
262253

263-
Logger.info("[Validator] Attesting in slot #{attestation.data.slot} on subnet #{subnet_id}")
254+
log_md = [slot: attestation.data.slot, attestation: attestation, subnet_id: subnet_id]
255+
log_debug(validator.index, "publishing attestation", log_md)
256+
264257
Gossip.Attestation.publish(subnet_id, attestation)
258+
|> log_debug_result(validator.index, "published attestation", log_md)
265259

266260
if current_duty.should_aggregate? do
267-
Logger.info("[Validator] Collecting messages for future aggregation...")
261+
log_debug(validator.index, "collecting for future aggregation", log_md)
262+
268263
Gossip.Attestation.collect(subnet_id, attestation)
264+
|> log_debug_result(validator.index, "collected attestation", log_md)
269265
end
270266
end
271267

@@ -288,14 +284,17 @@ defmodule LambdaEthereumConsensus.Validator do
288284
defp publish_aggregate(duty, validator) do
289285
case Gossip.Attestation.stop_collecting(duty.subnet_id) do
290286
{:ok, attestations} ->
291-
Logger.info("[Validator] Publishing aggregate of slot #{duty.slot}")
287+
log_md = [slot: duty.slot, attestations: attestations]
288+
log_debug(validator.index, "publishing aggregate", log_md)
292289

293290
aggregate_attestations(attestations)
294291
|> append_proof(duty.selection_proof, validator)
295292
|> append_signature(duty.signing_domain, validator)
296293
|> Gossip.Attestation.publish_aggregate()
294+
|> log_info_result(validator.index, "published aggregate", log_md)
297295

298-
_ ->
296+
{:error, reason} ->
297+
log_error(validator.index, "stop collecting attestations", reason)
299298
:ok
300299
end
301300
end
@@ -378,7 +377,7 @@ defmodule LambdaEthereumConsensus.Validator do
378377
BlockStates.get_state_info!(parent_root).beacon_state |> go_to_slot(slot)
379378
end
380379

381-
@spec fetch_validator_index(Types.BeaconState.t(), %{index: nil, pubkey: Bls.pubkey()}) ::
380+
@spec fetch_validator_index(Types.BeaconState.t(), validator()) ::
382381
non_neg_integer() | nil
383382
defp fetch_validator_index(beacon, %{index: nil, pubkey: pk}) do
384383
Enum.find_index(beacon.validators, &(&1.pubkey == pk))
@@ -399,18 +398,18 @@ defmodule LambdaEthereumConsensus.Validator do
399398

400399
defp start_payload_builder(%{payload_builder: {slot, root, _}} = state, slot, root), do: state
401400

402-
defp start_payload_builder(state, proposed_slot, head_root) do
401+
defp start_payload_builder(%{validator: validator} = state, proposed_slot, head_root) do
403402
# TODO: handle reorgs and late blocks
404-
Logger.info("[Validator] Starting to build payload for slot #{proposed_slot}")
403+
log_debug(validator.index, "starting building payload", slot: proposed_slot)
405404

406405
case BlockBuilder.start_building_payload(proposed_slot, head_root) do
407406
{:ok, payload_id} ->
407+
log_debug(validator.index, "payload built", slot: proposed_slot)
408+
408409
%{state | payload_builder: {proposed_slot, head_root, payload_id}}
409410

410411
{:error, reason} ->
411-
Logger.error(
412-
"[Validator] Failed to start building payload for slot #{proposed_slot}. Reason: #{reason}"
413-
)
412+
log_error(validator.index, "start building payload", reason, slot: proposed_slot)
414413

415414
%{state | payload_builder: nil}
416415
end
@@ -432,6 +431,8 @@ defmodule LambdaEthereumConsensus.Validator do
432431
} = state,
433432
proposed_slot
434433
) do
434+
log_debug(validator.index, "building block", slot: proposed_slot)
435+
435436
build_result =
436437
BlockBuilder.build_block(
437438
%BuildBlockRequest{
@@ -446,20 +447,19 @@ defmodule LambdaEthereumConsensus.Validator do
446447

447448
case build_result do
448449
{:ok, {signed_block, blob_sidecars}} ->
449-
publish_block(signed_block)
450-
Enum.each(blob_sidecars, &publish_sidecar/1)
450+
publish_block(validator.index, signed_block)
451+
Enum.each(blob_sidecars, &publish_sidecar(validator.index, &1))
451452

452453
{:error, reason} ->
453-
Logger.error(
454-
"[Validator] Failed to build block for slot #{proposed_slot}. Reason: #{reason}"
455-
)
454+
log_error(validator.index, "build block", reason, slot: proposed_slot)
456455
end
457456

458457
%{state | payload_builder: nil}
459458
end
460459

460+
# TODO: at least in kurtosis there are blocks that are proposed without a payload apparently, must investigate.
461461
defp propose(%{payload_builder: nil} = state, _proposed_slot) do
462-
Logger.error("[Validator] Tried to propose a block without an execution payload")
462+
log_error(state.validator.index, "propose block", "lack of execution payload")
463463
state
464464
end
465465

@@ -472,47 +472,57 @@ defmodule LambdaEthereumConsensus.Validator do
472472
end
473473

474474
# TODO: there's a lot of repeated code here. We should move this to a separate module
475-
defp publish_block(signed_block) do
475+
defp publish_block(validator_index, signed_block) do
476476
{:ok, ssz_encoded} = Ssz.to_ssz(signed_block)
477477
{:ok, encoded_msg} = :snappyer.compress(ssz_encoded)
478478
fork_context = ForkChoice.get_fork_digest() |> Base.encode16(case: :lower)
479479

480480
proposed_slot = signed_block.message.slot
481481

482-
# TODO: we might want to send the block to ForkChoice
483-
case Libp2pPort.publish("/eth2/#{fork_context}/beacon_block/ssz_snappy", encoded_msg) do
484-
:ok ->
485-
Logger.info("[Validator] Proposed block for slot #{proposed_slot}")
482+
log_debug(validator_index, "publishing block", slot: proposed_slot)
486483

487-
{:error, reason} ->
488-
Logger.error(
489-
"[Validator] Failed to publish block for slot #{proposed_slot}. Reason: #{reason}"
490-
)
491-
end
484+
# TODO: we might want to send the block to ForkChoice
485+
Libp2pPort.publish("/eth2/#{fork_context}/beacon_block/ssz_snappy", encoded_msg)
486+
|> log_info_result(validator_index, "published block", slot: proposed_slot)
492487
end
493488

494-
defp publish_sidecar(%Types.BlobSidecar{index: index} = sidecar) do
489+
defp publish_sidecar(validator_index, %Types.BlobSidecar{index: index} = sidecar) do
495490
{:ok, ssz_encoded} = Ssz.to_ssz(sidecar)
496491
{:ok, encoded_msg} = :snappyer.compress(ssz_encoded)
497492
fork_context = ForkChoice.get_fork_digest() |> Base.encode16(case: :lower)
498493

499494
subnet_id = compute_subnet_for_blob_sidecar(index)
500495

501-
case Libp2pPort.publish(
502-
"/eth2/#{fork_context}/blob_sidecar_#{subnet_id}/ssz_snappy",
503-
encoded_msg
504-
) do
505-
:ok ->
506-
:ok
496+
log_debug(validator_index, "publishing sidecar", sidecar_index: index)
507497

508-
{:error, reason} ->
509-
Logger.error(
510-
"[Validator] Failed to publish sidecar with index #{index}. Reason: #{reason}"
511-
)
512-
end
498+
Libp2pPort.publish("/eth2/#{fork_context}/blob_sidecar_#{subnet_id}/ssz_snappy", encoded_msg)
499+
|> log_debug_result(validator_index, "published sidecar", sidecar_index: index)
513500
end
514501

515502
defp compute_subnet_for_blob_sidecar(blob_index) do
516503
rem(blob_index, ChainSpec.get("BLOB_SIDECAR_SUBNET_COUNT"))
517504
end
505+
506+
# Some Log Helpers to avoid repetition
507+
508+
defp log_info_result(result, index, message, metadata),
509+
do: log_result(result, :info, index, message, metadata)
510+
511+
defp log_debug_result(result, index, message, metadata),
512+
do: log_result(result, :debug, index, message, metadata)
513+
514+
defp log_result(:ok, :info, index, message, metadata), do: log_info(index, message, metadata)
515+
defp log_result(:ok, :debug, index, message, metadata), do: log_debug(index, message, metadata)
516+
517+
defp log_result({:error, reason}, _level, index, message, metadata),
518+
do: log_error(index, message, reason, metadata)
519+
520+
defp log_info(index, message, metadata),
521+
do: Logger.info("[Validator] #{index} #{message}", metadata)
522+
523+
defp log_debug(index, message, metadata),
524+
do: Logger.debug("[Validator] #{index} #{message}", metadata)
525+
526+
defp log_error(index, message, reason, metadata \\ []),
527+
do: Logger.error("[Validator] #{index} Failed to #{message}. Reason: #{reason}", metadata)
518528
end

0 commit comments

Comments
 (0)