Skip to content

Commit fd38d93

Browse files
authored
feat: validator proposal duties lookahead (#909)
1 parent ecd501d commit fd38d93

File tree

5 files changed

+154
-75
lines changed

5 files changed

+154
-75
lines changed

lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
6060
@spec stop_collecting(non_neg_integer()) ::
6161
{:ok, list(Types.Attestation.t())} | {:error, String.t()}
6262
def stop_collecting(subnet_id) do
63-
leave(subnet_id)
63+
# TODO: implement some way to unsubscribe without leaving the topic
64+
topic = get_topic_name(subnet_id)
65+
Libp2pPort.leave_topic(topic)
66+
Libp2pPort.join_topic(topic)
6467
GenServer.call(__MODULE__, {:stop_collecting, subnet_id})
6568
end
6669

@@ -99,6 +102,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
99102
attestations = Map.put(state.attestations, subnet_id, [attestation])
100103
attnets = Map.put(state.attnets, subnet_id, attestation.data)
101104
new_state = %{state | attnets: attnets, attestations: attestations}
105+
get_topic_name(subnet_id) |> Libp2pPort.subscribe_to_topic()
102106
{:reply, :ok, new_state}
103107
end
104108

lib/lambda_ethereum_consensus/state_transition/accessors.ex

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,8 +245,11 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do
245245
"""
246246
@spec get_beacon_proposer_index(BeaconState.t()) ::
247247
{:ok, Types.validator_index()} | {:error, String.t()}
248-
def get_beacon_proposer_index(%BeaconState{slot: slot} = state) do
249-
epoch = get_current_epoch(state)
248+
def get_beacon_proposer_index(%BeaconState{slot: state_slot} = state, slot \\ nil) do
249+
slot = if is_nil(slot), do: state_slot, else: slot
250+
# NOTE: slot should be within the state's current epoch, otherwise the result can change
251+
epoch = Misc.compute_epoch_at_slot(slot)
252+
250253
{:ok, root} = get_epoch_root(state, epoch)
251254

252255
Cache.lazily_compute(:beacon_proposer_index, {slot, root}, fn ->

lib/lambda_ethereum_consensus/validator/validator.ex

Lines changed: 110 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ defmodule LambdaEthereumConsensus.Validator do
3535
slot: slot,
3636
root: head_root,
3737
duties: %{
38-
attester: {:not_computed, :not_computed}
38+
# Order is: previous epoch, current epoch, next epoch
39+
attester: [:not_computed, :not_computed, :not_computed],
40+
proposer: :not_computed
3941
},
4042
validator: validator
4143
}
@@ -90,23 +92,30 @@ defmodule LambdaEthereumConsensus.Validator do
9092
# TODO: this doesn't take into account reorgs or empty slots
9193
new_state = update_state(state, slot, head_root)
9294

93-
if should_attest?(state, slot), do: attest(state)
94-
maybe_publish_aggregate(state, slot)
95+
case get_current_attester_duty(new_state, slot) do
96+
nil -> :ok
97+
duty -> attest(new_state, duty)
98+
end
99+
100+
new_state = maybe_publish_aggregate(new_state, slot)
101+
102+
if should_propose?(new_state, slot), do: propose(new_state)
95103

96104
{:noreply, new_state}
97105
end
98106

99107
defp update_state(%{slot: last_slot, root: last_root} = state, slot, head_root) do
100-
last_epoch = Misc.compute_epoch_at_slot(last_slot)
101-
epoch = Misc.compute_epoch_at_slot(slot)
108+
last_epoch = Misc.compute_epoch_at_slot(last_slot + 1)
109+
epoch = Misc.compute_epoch_at_slot(slot + 1)
102110

103111
if last_epoch == epoch do
104112
state
105113
else
106-
target_root =
107-
if slot == Misc.compute_start_slot_at_epoch(epoch), do: head_root, else: last_root
114+
start_slot = Misc.compute_start_slot_at_epoch(epoch)
115+
target_root = if slot == start_slot, do: head_root, else: last_root
108116

109-
new_beacon = fetch_target_state(epoch, target_root)
117+
# Process the start of the new epoch
118+
new_beacon = fetch_target_state(epoch, target_root) |> process_slots(start_slot)
110119

111120
new_duties =
112121
shift_duties(state.duties, epoch, last_epoch)
@@ -124,48 +133,48 @@ defmodule LambdaEthereumConsensus.Validator do
124133
state
125134
end
126135

127-
defp shift_duties(%{attester: {_, ep1}} = duties, epoch, current_epoch)
128-
when epoch + 1 == current_epoch do
129-
%{duties | attester: {ep1, :not_computed}}
136+
defp shift_duties(%{attester: [_ep0, ep1, ep2]} = duties, epoch, current_epoch) do
137+
case current_epoch - epoch do
138+
1 -> %{duties | attester: [ep1, ep2, :not_computed]}
139+
2 -> %{duties | attester: [ep2, :not_computed, :not_computed]}
140+
_ -> %{duties | attester: [:not_computed, :not_computed, :not_computed]}
141+
end
130142
end
131143

132-
defp shift_duties(duties, _, _), do: %{duties | attester: {:not_computed, :not_computed}}
133-
134144
defp maybe_update_duties(duties, beacon_state, epoch, validator) do
135145
attester_duties =
136146
maybe_update_attester_duties(duties.attester, beacon_state, epoch, validator)
137147

138-
%{duties | attester: attester_duties}
139-
end
148+
proposer_duties = compute_proposer_duties(beacon_state, epoch, validator.index)
140149

141-
defp maybe_update_attester_duties({:not_computed, _} = duties, beacon_state, epoch, validator) do
142-
compute_attester_duty(duties, 0, beacon_state, epoch, validator)
143-
|> maybe_update_attester_duties(beacon_state, epoch, validator)
150+
%{duties | attester: attester_duties, proposer: proposer_duties}
144151
end
145152

146-
defp maybe_update_attester_duties({_, :not_computed} = duties, beacon_state, epoch, validator) do
147-
compute_attester_duty(duties, 1, beacon_state, epoch, validator)
148-
|> maybe_update_attester_duties(beacon_state, epoch, validator)
149-
end
153+
defp maybe_update_attester_duties([epp, ep0, ep1], beacon_state, epoch, validator) do
154+
duties =
155+
Stream.with_index([ep0, ep1])
156+
|> Enum.map(fn
157+
{:not_computed, i} -> compute_attester_duty(beacon_state, epoch + i, validator)
158+
{d, _} -> d
159+
end)
150160

151-
defp maybe_update_attester_duties(duties, _, _, _), do: duties
161+
[epp | duties]
162+
end
152163

153-
defp compute_attester_duty(duties, index, beacon_state, base_epoch, validator)
154-
when index in 0..1 do
155-
epoch = base_epoch + index
164+
defp compute_attester_duty(beacon_state, epoch, validator) do
156165
# Can't fail
157166
{:ok, duty} = Utils.get_committee_assignment(beacon_state, epoch, validator.index)
158167

159-
duty =
160-
update_with_aggregation_duty(duty, beacon_state, validator.privkey)
161-
|> update_with_subnet_id(beacon_state, epoch)
162-
163-
put_elem(duties, index, duty)
168+
update_with_aggregation_duty(duty, beacon_state, validator.privkey)
169+
|> update_with_subnet_id(beacon_state, epoch)
164170
end
165171

166-
defp move_subnets(%{attester: {old_ep0, old_ep1}}, %{attester: {ep0, ep1}}) do
167-
old_subnets = MapSet.new([old_ep0.subnet_id, old_ep1.subnet_id])
168-
new_subnets = MapSet.new([ep0.subnet_id, ep1.subnet_id])
172+
defp get_subnet_ids(duties),
173+
do: duties |> Stream.reject(&(&1 == :not_computed)) |> Enum.map(& &1.subnet_id)
174+
175+
defp move_subnets(%{attester: old_duties}, %{attester: new_duties}) do
176+
old_subnets = old_duties |> get_subnet_ids() |> MapSet.new()
177+
new_subnets = new_duties |> get_subnet_ids() |> MapSet.new()
169178

170179
# leave old subnets (except for recurring ones)
171180
MapSet.difference(old_subnets, new_subnets) |> leave()
@@ -174,8 +183,8 @@ defmodule LambdaEthereumConsensus.Validator do
174183
MapSet.difference(new_subnets, old_subnets) |> join()
175184
end
176185

177-
defp join_subnets_for_duties(%{attester: {ep0, ep1}}) do
178-
join([ep0.subnet_id, ep1.subnet_id])
186+
defp join_subnets_for_duties(%{attester: duties}) do
187+
duties |> get_subnet_ids() |> join()
179188
end
180189

181190
defp join(subnets) do
@@ -192,54 +201,80 @@ defmodule LambdaEthereumConsensus.Validator do
192201
end
193202
end
194203

195-
defp log_duties(%{attester: attester_duties}, validator_index) do
196-
{%{index_in_committee: i0, committee_index: ci0, slot: slot0},
197-
%{index_in_committee: i1, committee_index: ci1, slot: slot1}} = attester_duties
198-
199-
Logger.info(
200-
"Validator #{validator_index} has to attest in committee #{ci0} of slot #{slot0} with index #{i0}," <>
201-
" and in committee #{ci1} of slot #{slot1} with index #{i1}"
202-
)
204+
defp log_duties(%{attester: attester_duties, proposer: proposer_duties}, validator_index) do
205+
attester_duties
206+
# Drop the first element, which is the previous epoch's duty
207+
|> Stream.drop(1)
208+
|> Enum.each(fn %{index_in_committee: i, committee_index: ci, slot: slot} ->
209+
Logger.info(
210+
"[Validator] #{validator_index} has to attest in committee #{ci} of slot #{slot} with index #{i}"
211+
)
212+
end)
213+
214+
Enum.each(proposer_duties, fn slot ->
215+
Logger.info("[Validator] #{validator_index} has to propose a block in slot #{slot}!")
216+
end)
203217
end
204218

205-
defp should_attest?(%{duties: %{attester: {%{slot: duty_slot}, _}}}, slot),
206-
do: duty_slot == slot
219+
defp get_current_attester_duty(state, current_slot) do
220+
find_attester_duty(state, &(&1.slot == current_slot))
221+
end
207222

208-
defp attest(state) do
209-
{current_duty, _} = state.duties.attester
223+
defp find_attester_duty(%{duties: %{attester: attester_duties}}, search_fn) do
224+
Enum.find(attester_duties, fn
225+
:not_computed -> false
226+
duty -> search_fn.(duty)
227+
end)
228+
end
210229

230+
defp attest(state, current_duty) do
211231
{subnet_id, attestation} =
212232
produce_attestation(current_duty, state.root, state.validator.privkey)
213233

214234
Logger.info("[Validator] Attesting in slot #{attestation.data.slot} on subnet #{subnet_id}")
215235
Gossip.Attestation.publish(subnet_id, attestation)
216236

217-
if current_duty.is_aggregator do
237+
if current_duty.should_aggregate? do
218238
Logger.info("[Validator] Collecting messages for future aggregation...")
219239
Gossip.Attestation.collect(subnet_id, attestation)
220240
end
221241
end
222242

223243
# We publish our aggregate on the next slot, and when we're an aggregator
224244
# TODO: we should publish it two-thirds of the way through the slot
225-
def maybe_publish_aggregate(%{duties: %{attester: {duty, _}}, validator: validator}, slot)
226-
when duty.slot == slot + 1 and duty.is_aggregator do
245+
defp maybe_publish_aggregate(%{duties: duties, validator: validator} = state, slot) do
246+
case find_attester_duty(state, &(&1.slot < slot and &1.should_aggregate?)) do
247+
nil ->
248+
state
249+
250+
duty ->
251+
publish_aggregate(duty, validator)
252+
253+
attester_duties =
254+
Enum.map(duties.attester, fn
255+
^duty -> %{duty | should_aggregate?: false}
256+
d -> d
257+
end)
258+
259+
%{state | duties: %{duties | attester: attester_duties}}
260+
end
261+
end
262+
263+
defp publish_aggregate(duty, validator) do
227264
case Gossip.Attestation.stop_collecting(duty.subnet_id) do
228265
{:ok, attestations} ->
229-
Logger.info("[Validator] Publishing aggregate of slot #{slot}")
266+
Logger.info("[Validator] Publishing aggregate of slot #{duty.slot}")
230267

231268
aggregate_attestations(attestations)
232269
|> append_proof(duty.selection_proof, validator)
233270
|> append_signature(duty.signing_domain, validator)
234271
|> Gossip.Attestation.publish_aggregate()
235272

236273
_ ->
237-
Logger.error("[Validator] Failed to publish aggregate")
274+
:ok
238275
end
239276
end
240277

241-
def maybe_publish_aggregate(_, _), do: :ok
242-
243278
defp aggregate_attestations(attestations) do
244279
aggregation_bits =
245280
attestations
@@ -325,11 +360,11 @@ defmodule LambdaEthereumConsensus.Validator do
325360
epoch = Misc.compute_epoch_at_slot(duty.slot)
326361
domain = Accessors.get_domain(beacon_state, Constants.domain_aggregate_and_proof(), epoch)
327362

328-
Map.put(duty, :is_aggregator, true)
363+
Map.put(duty, :should_aggregate?, true)
329364
|> Map.put(:selection_proof, proof)
330365
|> Map.put(:signing_domain, domain)
331366
else
332-
Map.put(duty, :is_aggregator, false)
367+
Map.put(duty, :should_aggregate?, false)
333368
end
334369
end
335370

@@ -345,4 +380,22 @@ defmodule LambdaEthereumConsensus.Validator do
345380
defp fetch_validator_index(beacon, %{index: nil, pubkey: pk}) do
346381
beacon.validators |> Enum.find_index(&(&1.pubkey == pk))
347382
end
383+
384+
defp compute_proposer_duties(beacon_state, epoch, validator_index) do
385+
start_slot = Misc.compute_start_slot_at_epoch(epoch)
386+
387+
start_slot..(start_slot + ChainSpec.get("SLOTS_PER_EPOCH") - 1)
388+
|> Enum.flat_map(fn slot ->
389+
# Can't fail
390+
{:ok, proposer_index} = Accessors.get_beacon_proposer_index(beacon_state, slot)
391+
if proposer_index == validator_index, do: [slot], else: []
392+
end)
393+
end
394+
395+
# If we are the proposer for the next slot, we should propose a block now
396+
defp should_propose?(%{duties: %{proposer: slots}}, slot), do: Enum.member?(slots, slot + 1)
397+
398+
defp propose(_state) do
399+
# TODO: implement block proposal
400+
end
348401
end

native/libp2p_port/internal/subscriptions/subscriptions.go

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type subscription struct {
2323
}
2424

2525
type Subscriber struct {
26-
subscriptions map[string]subscription
26+
subscriptions map[string]*subscription
2727
pendingMessages sync.Map
2828
gsub *pubsub.PubSub
2929
port *port.Port
@@ -157,7 +157,7 @@ func NewSubscriber(p *port.Port, h host.Host) Subscriber {
157157
utils.PanicIfError(err)
158158

159159
return Subscriber{
160-
subscriptions: make(map[string]subscription),
160+
subscriptions: make(map[string]*subscription),
161161
gsub: gsub,
162162
port: p,
163163
}
@@ -185,11 +185,14 @@ func (s *Subscriber) Subscribe(topicName string, handler []byte) error {
185185
}
186186
s.gsub.RegisterTopicValidator(topicName, validator)
187187
ctx, cancel := context.WithCancel(context.Background())
188-
sub.Cancel = cancel
189-
topicSub, err := sub.Topic.Subscribe()
190-
utils.PanicIfError(err)
191-
go subscribeToTopic(topicSub, ctx, s.gsub)
192-
s.subscriptions[topicName] = sub
188+
cancelChan := make(chan struct{})
189+
sub.Cancel = func() {
190+
cancel()
191+
<-cancelChan
192+
err := s.gsub.UnregisterTopicValidator(topicName)
193+
utils.PanicIfError(err)
194+
}
195+
go subscribeToTopic(sub.Topic, ctx, cancelChan)
193196
return nil
194197
}
195198

@@ -202,7 +205,8 @@ func (s *Subscriber) Leave(topicName string) {
202205
if sub.Cancel != nil {
203206
sub.Cancel()
204207
}
205-
sub.Topic.Close()
208+
err := sub.Topic.Close()
209+
utils.PanicIfError(err)
206210
}
207211

208212
func (s *Subscriber) Validate(msgId []byte, intResult int) {
@@ -220,29 +224,35 @@ func (s *Subscriber) Validate(msgId []byte, intResult int) {
220224

221225
func (s *Subscriber) Publish(topicName string, message []byte) {
222226
sub := s.getSubscription(topicName)
223-
err := sub.Topic.Publish(context.Background(), message)
224-
utils.PanicIfError(err)
227+
go func() {
228+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
229+
defer cancel()
230+
err := sub.Topic.Publish(ctx, message)
231+
utils.PanicIfError(err)
232+
}()
225233
}
226234

227235
// NOTE: we send the message to the port in the validator.
228236
// Here we just flush received messages and handle unsubscription.
229-
func subscribeToTopic(sub *pubsub.Subscription, ctx context.Context, gsub *pubsub.PubSub) {
230-
topic := sub.Topic()
237+
func subscribeToTopic(topic *pubsub.Topic, ctx context.Context, cancelChan chan struct{}) {
238+
sub, err := topic.Subscribe()
239+
utils.PanicIfError(err)
231240
for {
232241
_, err := sub.Next(ctx)
233242
if err == context.Canceled {
234243
break
235244
}
236245
}
237-
gsub.UnregisterTopicValidator(topic)
246+
sub.Cancel()
247+
cancelChan <- struct{}{}
238248
}
239249

240-
func (s *Subscriber) getSubscription(topicName string) subscription {
250+
func (s *Subscriber) getSubscription(topicName string) *subscription {
241251
sub, isSubscribed := s.subscriptions[topicName]
242252
if !isSubscribed {
243253
topic, err := s.gsub.Join(topicName)
244254
utils.PanicIfError(err)
245-
sub = subscription{
255+
sub = &subscription{
246256
Topic: topic,
247257
}
248258
s.subscriptions[topicName] = sub

0 commit comments

Comments
 (0)