@@ -2,26 +2,125 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.SyncCommittee do
2
2
@ moduledoc """
3
3
This module handles sync committee from specific gossip subnets.
4
4
Used by validators to fulfill aggregation duties.
5
+
6
+ TODO: This module borrows almost all of its logic from Attestation,
7
+ this could be refactored to a common module if needed in the future.
5
8
"""
6
9
alias LambdaEthereumConsensus.ForkChoice
7
10
alias LambdaEthereumConsensus.Libp2pPort
11
+ alias LambdaEthereumConsensus.P2P
12
+ alias LambdaEthereumConsensus.P2P.Gossip.Handler
13
+ alias LambdaEthereumConsensus.StateTransition.Misc
14
+ alias Types.SyncSubnetInfo
15
+
16
+ @ behaviour Handler
8
17
9
18
require Logger
10
19
20
+ @ spec join ( [ non_neg_integer ( ) ] ) :: :ok
21
+ def join ( subnet_ids ) when is_list ( subnet_ids ) do
22
+ for subnet_id <- subnet_ids do
23
+ topic = topic ( subnet_id )
24
+ Libp2pPort . join_topic ( topic )
25
+
26
+ P2P.Metadata . set_syncnet ( subnet_id )
27
+ end
28
+
29
+ P2P.Metadata . get_metadata ( )
30
+ |> update_enr ( )
31
+ end
32
+
33
+ @ impl true
34
+ def handle_gossip_message ( store , topic , msg_id , message ) do
35
+ handle_gossip_message ( topic , msg_id , message )
36
+ store
37
+ end
38
+
39
+ def handle_gossip_message ( topic , msg_id , message ) do
40
+ subnet_id = extract_subnet_id ( topic )
41
+
42
+ with { :ok , uncompressed } <- :snappyer . decompress ( message ) ,
43
+ { :ok , sync_committee_msg } <- Ssz . from_ssz ( uncompressed , Types.SyncCommitteeMessage ) do
44
+ # TODO: validate before accepting
45
+ Libp2pPort . validate_message ( msg_id , :accept )
46
+
47
+ SyncSubnetInfo . add_message! ( subnet_id , sync_committee_msg )
48
+ else
49
+ { :error , _ } -> Libp2pPort . validate_message ( msg_id , :reject )
50
+ end
51
+ end
52
+
11
53
@ spec publish ( Types.SyncCommitteeMessage . t ( ) , [ non_neg_integer ( ) ] ) :: :ok
12
54
def publish ( % Types.SyncCommitteeMessage { } = sync_committee_msg , subnet_ids ) do
13
- Enum . each ( subnet_ids , fn subnet_id ->
55
+ for subnet_id <- subnet_ids do
14
56
topic = topic ( subnet_id )
15
57
16
58
{ :ok , encoded } = SszEx . encode ( sync_committee_msg , Types.SyncCommitteeMessage )
17
59
{ :ok , message } = :snappyer . compress ( encoded )
18
60
Libp2pPort . publish ( topic , message )
19
- end )
61
+ end
62
+
63
+ :ok
64
+ end
65
+
66
+ @ spec publish_contribution ( Types.SignedContributionAndProof . t ( ) ) :: :ok
67
+ def publish_contribution ( % Types.SignedContributionAndProof { } = signed_contribution ) do
68
+ fork_context = ForkChoice . get_fork_digest ( ) |> Base . encode16 ( case: :lower )
69
+ topic = "/eth2/#{ fork_context } /sync_committee_contribution_and_proof/ssz_snappy"
70
+ { :ok , encoded } = SszEx . encode ( signed_contribution , Types.SignedContributionAndProof )
71
+ { :ok , message } = :snappyer . compress ( encoded )
72
+ Libp2pPort . publish ( topic , message )
73
+ end
74
+
75
+ @ spec collect ( [ non_neg_integer ( ) ] , Types.SyncCommitteeMessage . t ( ) ) :: :ok
76
+ def collect ( subnet_ids , message ) do
77
+ join ( subnet_ids )
78
+
79
+ for subnet_id <- subnet_ids do
80
+ SyncSubnetInfo . new_subnet_with_message ( subnet_id , message )
81
+ Libp2pPort . async_subscribe_to_topic ( topic ( subnet_id ) , __MODULE__ )
82
+ end
83
+
84
+ :ok
85
+ end
86
+
87
+ @ spec stop_collecting ( non_neg_integer ( ) ) ::
88
+ { :ok , list ( Types.SyncCommitteeMessage . t ( ) ) } | { :error , String . t ( ) }
89
+ def stop_collecting ( subnet_id ) do
90
+ # TODO: (#1289) implement some way to unsubscribe without leaving the topic
91
+ topic = topic ( subnet_id )
92
+ Libp2pPort . leave_topic ( topic )
93
+ Libp2pPort . join_topic ( topic )
94
+ SyncSubnetInfo . stop_collecting ( subnet_id )
20
95
end
21
96
22
97
defp topic ( subnet_id ) do
23
98
# TODO: this doesn't take into account fork digest changes
24
99
fork_context = ForkChoice . get_fork_digest ( ) |> Base . encode16 ( case: :lower )
25
100
"/eth2/#{ fork_context } /sync_committee_#{ subnet_id } /ssz_snappy"
26
101
end
102
+
103
+ defp update_enr ( % { attnets: attnets , syncnets: syncnets } ) do
104
+ enr_fork_id = compute_enr_fork_id ( )
105
+ Libp2pPort . update_enr ( enr_fork_id , attnets , syncnets )
106
+ end
107
+
108
+ defp compute_enr_fork_id ( ) do
109
+ current_version = ForkChoice . get_fork_version ( )
110
+
111
+ fork_digest =
112
+ Misc . compute_fork_digest ( current_version , ChainSpec . get_genesis_validators_root ( ) )
113
+
114
+ % Types.EnrForkId {
115
+ fork_digest: fork_digest ,
116
+ next_fork_version: current_version ,
117
+ next_fork_epoch: Constants . far_future_epoch ( )
118
+ }
119
+ end
120
+
121
+ @ subnet_id_start byte_size ( "/eth2/00000000/sync_committee_" )
122
+
123
+ defp extract_subnet_id ( << _ :: binary - size ( @ subnet_id_start ) >> <> id_with_trailer ) do
124
+ id_with_trailer |> String . trim_trailing ( "/ssz_snappy" ) |> String . to_integer ( )
125
+ end
27
126
end
0 commit comments