|
| 1 | +defmodule LambdaEthereumConsensus.ValidatorPool do |
| 2 | + @moduledoc """ |
| 3 | + Module that holds the pool of validators and their states, |
| 4 | + it also manages the validator's duties as bitmaps to |
| 5 | + simplify the delegation of work. |
| 6 | + """ |
| 7 | + |
| 8 | + defstruct epoch: nil, slot: nil, head_root: nil, validators: %{uninitialized: []} |
| 9 | + |
| 10 | + require Logger |
| 11 | + |
| 12 | + alias LambdaEthereumConsensus.StateTransition.Misc |
| 13 | + alias LambdaEthereumConsensus.Validator |
| 14 | + |
| 15 | + @type validators :: %{atom() => list(Validator.state())} |
| 16 | + @type t :: %__MODULE__{ |
| 17 | + epoch: Types.epoch(), |
| 18 | + slot: Types.slot(), |
| 19 | + head_root: Types.root(), |
| 20 | + validators: validators() |
| 21 | + } |
| 22 | + |
| 23 | + @doc """ |
| 24 | + Initiate the pool of validators, given the slot and head root. |
| 25 | + """ |
| 26 | + @spec init(Types.slot(), Types.root()) :: t() |
| 27 | + def init(slot, head_root) do |
| 28 | + config = Application.get_env(:lambda_ethereum_consensus, __MODULE__, []) |
| 29 | + keystore_dir = Keyword.get(config, :keystore_dir) |
| 30 | + keystore_pass_dir = Keyword.get(config, :keystore_pass_dir) |
| 31 | + |
| 32 | + setup_validators(slot, head_root, keystore_dir, keystore_pass_dir) |
| 33 | + end |
| 34 | + |
| 35 | + defp setup_validators(_s, _r, keystore_dir, keystore_pass_dir) |
| 36 | + when is_nil(keystore_dir) or is_nil(keystore_pass_dir) do |
| 37 | + Logger.warning( |
| 38 | + "[Validator] No keystore_dir or keystore_pass_dir provided. Validators won't start." |
| 39 | + ) |
| 40 | + |
| 41 | + %__MODULE__{} |
| 42 | + end |
| 43 | + |
| 44 | + defp setup_validators(slot, head_root, keystore_dir, keystore_pass_dir) do |
| 45 | + validator_keys = decode_validator_keys(keystore_dir, keystore_pass_dir) |
| 46 | + |
| 47 | + validators = Enum.map(validator_keys, &Validator.new({slot, head_root, &1})) |
| 48 | + |
| 49 | + Logger.info("[Validator] Initialized #{Enum.count(validators)} validators") |
| 50 | + |
| 51 | + %__MODULE__{ |
| 52 | + epoch: Misc.compute_epoch_at_slot(slot), |
| 53 | + slot: slot, |
| 54 | + head_root: head_root, |
| 55 | + validators: %{uninitialized: validators} |
| 56 | + } |
| 57 | + end |
| 58 | + |
| 59 | + @doc """ |
| 60 | + Notify all validators of a new head. |
| 61 | + """ |
| 62 | + @spec notify_head(t(), Types.slot(), Types.root()) :: t() |
| 63 | + def notify_head(%{validators: %{uninitialized: validators}} = pool, slot, head_root) do |
| 64 | + uninitialized_validators = |
| 65 | + maybe_debug_notify( |
| 66 | + fn -> |
| 67 | + Enum.map(validators, &Validator.handle_new_head(slot, head_root, &1)) |
| 68 | + end, |
| 69 | + {:new_head, slot, head_root} |
| 70 | + ) |
| 71 | + |
| 72 | + %{pool | validators: %{uninitialized: uninitialized_validators}} |
| 73 | + end |
| 74 | + |
| 75 | + @doc """ |
| 76 | + Notify all validators of a new block. |
| 77 | + """ |
| 78 | + @spec notify_tick(t(), tuple()) :: t() |
| 79 | + def notify_tick(%{validators: %{uninitialized: validators}} = pool, slot_data) do |
| 80 | + uninitialized_validators = |
| 81 | + maybe_debug_notify( |
| 82 | + fn -> |
| 83 | + Enum.map(validators, &Validator.handle_tick(slot_data, &1)) |
| 84 | + end, |
| 85 | + {:on_tick, slot_data} |
| 86 | + ) |
| 87 | + |
| 88 | + %{pool | validators: %{uninitialized: uninitialized_validators}} |
| 89 | + end |
| 90 | + |
| 91 | + defp maybe_debug_notify(fun, data) do |
| 92 | + if Application.get_env(:logger, :level) == :debug do |
| 93 | + Logger.debug("[Validator] Notifying all Validators with message: #{inspect(data)}") |
| 94 | + |
| 95 | + start_time = System.monotonic_time(:millisecond) |
| 96 | + result = fun.() |
| 97 | + end_time = System.monotonic_time(:millisecond) |
| 98 | + |
| 99 | + Logger.debug( |
| 100 | + "[Validator] #{inspect(data)} notified to all Validators after #{end_time - start_time} ms" |
| 101 | + ) |
| 102 | + |
| 103 | + result |
| 104 | + else |
| 105 | + fun.() |
| 106 | + end |
| 107 | + end |
| 108 | + |
| 109 | + @doc """ |
| 110 | + Get validator keys from the keystore directory. |
| 111 | + This function expects two files for each validator: |
| 112 | + - <keystore_dir>/<public_key>.json |
| 113 | + - <keystore_pass_dir>/<public_key>.txt |
| 114 | + """ |
| 115 | + @spec decode_validator_keys(binary(), binary()) :: |
| 116 | + list({Bls.pubkey(), Bls.privkey()}) |
| 117 | + def decode_validator_keys(keystore_dir, keystore_pass_dir) |
| 118 | + when is_binary(keystore_dir) and is_binary(keystore_pass_dir) do |
| 119 | + keystore_dir |
| 120 | + |> File.ls!() |
| 121 | + |> Enum.flat_map(&paths_from_filename(keystore_dir, keystore_pass_dir, &1, Path.extname(&1))) |
| 122 | + |> Enum.flat_map(&decode_key/1) |
| 123 | + end |
| 124 | + |
| 125 | + defp paths_from_filename(keystore_dir, keystore_pass_dir, filename, ".json") do |
| 126 | + basename = Path.basename(filename, ".json") |
| 127 | + |
| 128 | + keystore_file = Path.join(keystore_dir, "#{basename}.json") |
| 129 | + keystore_pass_file = Path.join(keystore_pass_dir, "#{basename}.txt") |
| 130 | + |
| 131 | + [{keystore_file, keystore_pass_file}] |
| 132 | + end |
| 133 | + |
| 134 | + defp paths_from_filename(_keystore_dir, _keystore_pass_dir, basename, _ext) do |
| 135 | + Logger.warning("[Validator] Skipping file: #{basename}. Not a json keystore file.") |
| 136 | + [] |
| 137 | + end |
| 138 | + |
| 139 | + defp decode_key({keystore_file, keystore_pass_file}) do |
| 140 | + # TODO: remove `try` and handle errors properly |
| 141 | + [Keystore.decode_from_files!(keystore_file, keystore_pass_file)] |
| 142 | + rescue |
| 143 | + error -> |
| 144 | + Logger.error( |
| 145 | + "[Validator] Failed to decode keystore file: #{keystore_file}. Pass file: #{keystore_pass_file} Error: #{inspect(error)}" |
| 146 | + ) |
| 147 | + |
| 148 | + [] |
| 149 | + end |
| 150 | +end |
0 commit comments