Skip to content

Commit 3c0b20f

Browse files
authored
feat: sse implementation for events beacon API (#1365)
1 parent 87ecfb9 commit 3c0b20f

File tree

12 files changed

+180
-4
lines changed

12 files changed

+180
-4
lines changed

config/config.exs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,13 @@ if System.get_env("RUSTLER_SKIP_COMPILE") do
5252
config :lambda_ethereum_consensus, Snappy, skip_compilation?: true
5353
config :lambda_ethereum_consensus, Ssz, skip_compilation?: true
5454
end
55+
56+
config :sse,
57+
keep_alive: {:system, "SSE_KEEP_ALIVE_IN_MS", 55000}
58+
59+
config :event_bus,
60+
topics: [:finalized_checkpoint, :block]
61+
62+
config :mime, :types, %{
63+
"text/event-stream" => ["sse"]
64+
}

config/runtime.exs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,9 @@ config :lambda_ethereum_consensus, EngineApi,
150150
# Beacon API
151151
config :lambda_ethereum_consensus, BeaconApi.Endpoint,
152152
server: enable_beacon_api,
153-
http: [port: beacon_api_port || 4000],
153+
# We use an infinit idle timeout to avoid closing sse connections, if needed we can
154+
# create a separate endpoint for them.
155+
http: [port: beacon_api_port || 4000, protocol_options: [idle_timeout: :infinity]],
154156
url: [host: "localhost"],
155157
render_errors: [
156158
formats: [json: BeaconApi.ErrorJSON],
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
defmodule BeaconApi.V1.EventsController do
2+
use BeaconApi, :controller
3+
4+
alias BeaconApi.ApiSpec
5+
alias BeaconApi.EventPubSub
6+
7+
require Logger
8+
9+
def open_api_operation(:subscribe),
10+
do: ApiSpec.spec().paths["/eth/v1/events"].get
11+
12+
@spec subscribe(Plug.Conn.t(), any) :: Plug.Conn.t()
13+
def subscribe(conn, %{"topics" => topics}) do
14+
case parse_topics(topics) do
15+
{:ok, topics} ->
16+
EventPubSub.sse_subscribe(conn, topics)
17+
18+
{:error, error} ->
19+
send_chunked_error(conn, error)
20+
end
21+
end
22+
23+
def subscribe(conn, _params) do
24+
error =
25+
Jason.encode!(%{
26+
code: 400,
27+
message: "Missing field topics"
28+
})
29+
30+
send_chunked_error(conn, error)
31+
end
32+
33+
defp parse_topics(topics_string) do
34+
# topics is a string list in the form of: "finalized_checkpoint, block" we need to split it
35+
topics = topics_string |> String.split(",") |> Enum.map(&String.trim/1)
36+
not_implemented_topics = Enum.reject(topics, &EventPubSub.implemented_topic?/1)
37+
38+
if Enum.empty?(not_implemented_topics) do
39+
{:ok, topics}
40+
else
41+
{:error,
42+
"Invalid topic/s #{inspect(not_implemented_topics)}. For now, only #{inspect(EventPubSub.implemented_topics())} are supported."}
43+
end
44+
end
45+
46+
defp send_chunked_error(conn, error) do
47+
conn
48+
|> Plug.Conn.send_chunked(400)
49+
|> Plug.Conn.chunk(error)
50+
|> case do
51+
{:ok, conn} -> Plug.Conn.halt(conn)
52+
{:error, _reason} -> Plug.Conn.halt(conn)
53+
end
54+
end
55+
end

lib/beacon_api/event_pubsub.ex

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
defmodule BeaconApi.EventPubSub do
2+
@moduledoc """
3+
Event listener for aggregating and sending events for SSE subscribers.
4+
5+
TODO: (#1368) This depends on `event_bus` and `sse`, but it could be easily switched later:
6+
- `event_bus` we could move to phoenix pubsub
7+
- `sse` we could just implement it ourselves using Plug.Conn.chunk and Plug.Conn.send_chunked
8+
9+
The idea is to have a single place to publish events, and then a method for a connection to subscribe to them.
10+
"""
11+
12+
require Logger
13+
alias EventBus.Model.Event
14+
alias LambdaEthereumConsensus.Store
15+
alias SSE.Chunk
16+
alias Types.StateInfo
17+
18+
@type topic() :: String.t() | atom()
19+
@type topics() :: list(topic())
20+
@type event_data() :: any()
21+
22+
# This is also dependant on the already needed event_bus compile time config,
23+
# we maintain them as strings for convienience
24+
@implemented_topics Application.compile_env!(:event_bus, :topics) |> Enum.map(&Atom.to_string/1)
25+
26+
@spec implemented_topics() :: topics()
27+
def implemented_topics(), do: @implemented_topics
28+
29+
@spec implemented_topic?(topic()) :: boolean()
30+
def implemented_topic?(topic) when is_atom(topic), do: implemented_topic?(Atom.to_string(topic))
31+
def implemented_topic?(topic) when is_binary(topic), do: topic in @implemented_topics
32+
33+
@doc """
34+
Publish an event to the event bus.
35+
36+
TODO: We might want a noop if there are no subscribers for a topic.
37+
"""
38+
@spec publish(topic(), event_data()) :: :ok | {:error, atom()}
39+
def publish(:finalized_checkpoint = topic, %{root: block_root, epoch: epoch}) do
40+
case Store.BlockStates.get_state_info(block_root) do
41+
%StateInfo{root: state_root} ->
42+
data = %{
43+
block: BeaconApi.Utils.hex_encode(block_root),
44+
state: BeaconApi.Utils.hex_encode(state_root),
45+
epoch: Integer.to_string(epoch),
46+
# TODO: this is a placeholder, we need to get if the execution is optimistic or not
47+
execution_optimistic: false
48+
}
49+
50+
chunk = %Chunk{event: topic, data: [Jason.encode!(data)]}
51+
event = %Event{id: UUID.uuid4(), data: chunk, topic: topic}
52+
53+
EventBus.notify(event)
54+
55+
nil ->
56+
Logger.error("State not available for block", root: block_root)
57+
58+
{:error, :state_not_available}
59+
end
60+
end
61+
62+
def publish(:block = topic, %{root: block_root, slot: slot}) do
63+
data = %{
64+
block: BeaconApi.Utils.hex_encode(block_root),
65+
slot: Integer.to_string(slot),
66+
# TODO: this is a placeholder, we need to get if the execution is optimistic or not
67+
execution_optimistic: false
68+
}
69+
70+
chunk = %Chunk{event: topic, data: [Jason.encode!(data)]}
71+
event = %Event{id: UUID.uuid4(), data: chunk, topic: topic}
72+
73+
EventBus.notify(event)
74+
end
75+
76+
def publish(_topic, _event_data), do: {:error, :unsupported_topic}
77+
78+
@doc """
79+
Subscribe to a topic for stream events in an sse connection.
80+
"""
81+
@spec sse_subscribe(Plug.Conn.t(), topics()) :: Plug.Conn.t()
82+
def sse_subscribe(conn, topics) when is_list(topics),
83+
do: SSE.stream(conn, {topics, %Chunk{data: []}})
84+
end

lib/beacon_api/router.ex

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule BeaconApi.Router do
22
use BeaconApi, :router
33

44
pipeline :api do
5-
plug(:accepts, ["json"])
5+
plug(:accepts, ["json", "sse"])
66
plug(OpenApiSpex.Plug.PutApiSpec, module: BeaconApi.ApiSpec)
77
end
88

@@ -22,6 +22,10 @@ defmodule BeaconApi.Router do
2222
get("/identity", NodeController, :identity)
2323
get("/version", NodeController, :version)
2424
end
25+
26+
scope "/events" do
27+
get("/", EventsController, :subscribe)
28+
end
2529
end
2630

2731
# Ethereum API Version 2

lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
44
"""
55

66
require Logger
7+
alias BeaconApi.EventPubSub
78
alias LambdaEthereumConsensus.Execution.ExecutionChain
89
alias LambdaEthereumConsensus.ForkChoice.Handlers
910
alias LambdaEthereumConsensus.ForkChoice.Head
@@ -65,6 +66,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
6566
|> tap(fn store ->
6667
StoreDb.persist_store(store)
6768
Logger.info("[Fork choice] Added new block", slot: slot, root: block_root)
69+
EventPubSub.publish(:block, %{root: block_root, slot: slot})
6870

6971
Logger.info("[Fork choice] Recomputed head",
7072
slot: store.head_slot,

lib/lambda_ethereum_consensus/fork_choice/handlers.ex

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
44
"""
55
require Logger
66

7+
alias BeaconApi.EventPubSub
78
alias LambdaEthereumConsensus.Execution.ExecutionClient
89
alias LambdaEthereumConsensus.ForkChoice
910
alias LambdaEthereumConsensus.StateTransition
@@ -281,7 +282,11 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
281282
|> if_then_update(
282283
finalized_checkpoint.epoch > store.finalized_checkpoint.epoch,
283284
# Update finalized checkpoint
284-
&%Store{&1 | finalized_checkpoint: finalized_checkpoint}
285+
fn store ->
286+
EventPubSub.publish(:finalized_checkpoint, finalized_checkpoint)
287+
288+
%Store{store | finalized_checkpoint: finalized_checkpoint}
289+
end
285290
)
286291
end
287292

mix.exs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ defmodule LambdaEthereumConsensus.MixProject do
6464
{:sentry, "~> 10.8.0"},
6565
{:prom_ex, "~> 1.11.0"},
6666
{:flama, git: "https://github.com/lambdaclass/ht1223_tracer"},
67-
{:uuid, "~> 1.1"}
67+
{:uuid, "~> 1.1"},
68+
# TODO: (#1368) We might want to use phoenix_pubsub instead and do our implementation of SSE.
69+
{:sse, "~> 0.4"},
70+
{:event_bus, ">= 1.6.0"}
6871
]
6972
end
7073

mix.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"elixir_make": {:hex, :elixir_make, "0.8.4", "4960a03ce79081dee8fe119d80ad372c4e7badb84c493cc75983f9d3bc8bde0f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "6e7f1d619b5f61dfabd0a20aa268e575572b542ac31723293a4c1a567d5ef040"},
2020
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
2121
"escape": {:hex, :escape, "0.1.0", "548edab75e6e6938b1e199ef59cb8e504bcfd3bcf83471d4ae9a3c7a7a3c7d45", [:mix], [], "hexpm", "a5d8e92db4677155df54bc1306d401b5233875d570d474201db03cb3047491cd"},
22+
"event_bus": {:hex, :event_bus, "1.7.0", "29a36fc09e8c4463c82206b6a300fa1d61cf4baf9a7b4e7cf0c3efb99c73998e", [:mix], [], "hexpm", "e556470f49f53060a0696c4bad81341252685011afc69eda25032c8a3a86eb2e"},
2223
"ex2ms": {:hex, :ex2ms, "1.7.0", "45b9f523d0b777667ded60070d82d871a37e294f0b6c5b8eca86771f00f82ee1", [:mix], [], "hexpm", "2589eee51f81f1b1caa6d08c990b1ad409215fe6f64c73f73c67d36ed10be827"},
2324
"exleveldb": {:hex, :exleveldb, "0.14.0", "8e9353bbce38482d6971d254c6b98ceb50f3f179c94732b5d17db1be426fca18", [:mix], [{:eleveldb, "~> 2.2.20", [hex: :eleveldb, repo: "hexpm", optional: false]}], "hexpm", "803cd3b4c826a1e17e7e28f6afe224837a743b475e1a48336f186af3dd8636ad"},
2425
"expo": {:hex, :expo, "0.5.2", "beba786aab8e3c5431813d7a44b828e7b922bfa431d6bfbada0904535342efe2", [:mix], [], "hexpm", "8c9bfa06ca017c9cb4020fabe980bc7fdb1aaec059fd004c2ab3bff03b1c599c"},
@@ -68,6 +69,7 @@
6869
"sentry": {:hex, :sentry, "10.8.1", "aa45309785e1521416225adb16e0b4d8b957578804527f3c7babb6fefbc5e456", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_ownership, "~> 0.3.0 or ~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.6", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.20 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "495b3cdadad90ba72eef973aa3dec39b3b8b2a362fe87e2f4ef32133ac3b4097"},
6970
"snappyer": {:hex, :snappyer, "1.2.10", "023e9ae00e969b0997208b5de7d3b12bb46ec6bc5411e8dc53e7b3f435b8f0fd", [:rebar3], [], "hexpm", "f55bd9ed147e7163cb3acd1e431a7ff2c9e31ceacbb8308786094fb64551c284"},
7071
"sourceror": {:hex, :sourceror, "1.5.0", "3e65d5fbb1a8e2864ad6411262c8018fee73474f5789dda12285c82999253d5d", [:mix], [], "hexpm", "4a32b5d189d8453f73278c15712f8731b89e9211e50726b798214b303b51bfc7"},
72+
"sse": {:hex, :sse, "0.4.0", "f17affacbc4618bac07590eec7bff849aa27d1f71bb3d41da3fd3cb255d16910", [:mix], [{:event_bus, ">= 1.6.0", [hex: :event_bus, repo: "hexpm", optional: false]}, {:plug, ">= 1.4.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "2dfb9923725b9d5292763c3de9b7798713f5771522823e961a250204917d7efb"},
7173
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
7274
"statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"},
7375
"stream_data": {:hex, :stream_data, "1.1.2", "05499eaec0443349ff877aaabc6e194e82bda6799b9ce6aaa1aadac15a9fdb4d", [:mix], [], "hexpm", "129558d2c77cbc1eb2f4747acbbea79e181a5da51108457000020a906813a1a9"},

network_params.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
participants:
22
- el_type: geth
3+
el_image: ethereum/client-go:v1.14.12
34
cl_type: lighthouse
5+
cl_image: sigp/lighthouse:v5.3.0
46
count: 2
57
validator_count: 32
68
- el_type: geth
9+
el_image: ethereum/client-go:v1.14.12
710
cl_type: lambda
811
cl_image: lambda_ethereum_consensus:latest
912
use_separate_vc: false

test/unit/beacon_api/beacon_api_v1_test.exs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ defmodule Unit.BeaconApiTest.V1 do
158158
test "node identity" do
159159
alias LambdaEthereumConsensus.Libp2pPort
160160
alias LambdaEthereumConsensus.P2P.Metadata
161+
162+
patch(BeaconApi.EventPubSub, :publish, fn _, _ -> :ok end)
161163
patch(ForkChoice, :get_fork_version, fn -> ChainSpec.get("DENEB_FORK_VERSION") end)
162164

163165
start_link_supervised!({Libp2pPort, genesis_time: :os.system_time(:second), store: %Store{}})

test/unit/fork_choice/handlers_test.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
defmodule Unit.ForkChoice.HandlersTest do
22
use ExUnit.Case
33

4+
use Patch
5+
46
alias LambdaEthereumConsensus.ForkChoice.Handlers
57
alias LambdaEthereumConsensus.Utils.Diff
68
alias Types.Store
@@ -44,6 +46,8 @@ defmodule Unit.ForkChoice.HandlersTest do
4446
end
4547

4648
test "upgrades unrealized checkpoints" do
49+
patch(BeaconApi.EventPubSub, :publish, fn _, _ -> :ok end)
50+
4751
start_time = 0
4852
end_time = start_time + ChainSpec.get("SECONDS_PER_SLOT") * ChainSpec.get("SLOTS_PER_EPOCH")
4953

0 commit comments

Comments
 (0)