Skip to content

feat: sse implementation for events beacon API #1365

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,13 @@ if System.get_env("RUSTLER_SKIP_COMPILE") do
config :lambda_ethereum_consensus, Snappy, skip_compilation?: true
config :lambda_ethereum_consensus, Ssz, skip_compilation?: true
end

config :sse,
keep_alive: {:system, "SSE_KEEP_ALIVE_IN_MS", 55000}

config :event_bus,
topics: [:finalized_checkpoint, :block]

config :mime, :types, %{
"text/event-stream" => ["sse"]
}
4 changes: 3 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ config :lambda_ethereum_consensus, EngineApi,
# Beacon API
config :lambda_ethereum_consensus, BeaconApi.Endpoint,
server: enable_beacon_api,
http: [port: beacon_api_port || 4000],
# We use an infinit idle timeout to avoid closing sse connections, if needed we can
# create a separate endpoint for them.
http: [port: beacon_api_port || 4000, protocol_options: [idle_timeout: :infinity]],
url: [host: "localhost"],
render_errors: [
formats: [json: BeaconApi.ErrorJSON],
Expand Down
55 changes: 55 additions & 0 deletions lib/beacon_api/controllers/v1/events_controller.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
defmodule BeaconApi.V1.EventsController do
use BeaconApi, :controller

alias BeaconApi.ApiSpec
alias BeaconApi.EventPubSub

require Logger

def open_api_operation(:subscribe),
do: ApiSpec.spec().paths["/eth/v1/events"].get

@spec subscribe(Plug.Conn.t(), any) :: Plug.Conn.t()
def subscribe(conn, %{"topics" => topics}) do
case parse_topics(topics) do
{:ok, topics} ->
EventPubSub.sse_subscribe(conn, topics)

{:error, error} ->
send_chunked_error(conn, error)
end
end

def subscribe(conn, _params) do
error =
Jason.encode!(%{
code: 400,
message: "Missing field topics"
})

send_chunked_error(conn, error)
end

defp parse_topics(topics_string) do
# topics is a string list in the form of: "finalized_checkpoint, block" we need to split it
topics = topics_string |> String.split(",") |> Enum.map(&String.trim/1)
not_implemented_topics = Enum.reject(topics, &EventPubSub.implemented_topic?/1)

if Enum.empty?(not_implemented_topics) do
{:ok, topics}
else
{:error,
"Invalid topic/s #{inspect(not_implemented_topics)}. For now, only #{inspect(EventPubSub.implemented_topics())} are supported."}
end
end

defp send_chunked_error(conn, error) do
conn
|> Plug.Conn.send_chunked(400)
|> Plug.Conn.chunk(error)
|> case do
{:ok, conn} -> Plug.Conn.halt(conn)
{:error, _reason} -> Plug.Conn.halt(conn)
end
end
end
84 changes: 84 additions & 0 deletions lib/beacon_api/event_pubsub.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
defmodule BeaconApi.EventPubSub do
@moduledoc """
Event listener for aggregating and sending events for SSE subscribers.

TODO: (#1368) This depends on `event_bus` and `sse`, but it could be easily switched later:
- `event_bus` we could move to phoenix pubsub
- `sse` we could just implement it ourselves using Plug.Conn.chunk and Plug.Conn.send_chunked

The idea is to have a single place to publish events, and then a method for a connection to subscribe to them.
"""

require Logger
alias EventBus.Model.Event
alias LambdaEthereumConsensus.Store
alias SSE.Chunk
alias Types.StateInfo

@type topic() :: String.t() | atom()
@type topics() :: list(topic())
@type event_data() :: any()

# This is also dependant on the already needed event_bus compile time config,
# we maintain them as strings for convienience
@implemented_topics Application.compile_env!(:event_bus, :topics) |> Enum.map(&Atom.to_string/1)

@spec implemented_topics() :: topics()
def implemented_topics(), do: @implemented_topics

@spec implemented_topic?(topic()) :: boolean()
def implemented_topic?(topic) when is_atom(topic), do: implemented_topic?(Atom.to_string(topic))
def implemented_topic?(topic) when is_binary(topic), do: topic in @implemented_topics

@doc """
Publish an event to the event bus.

TODO: We might want a noop if there are no subscribers for a topic.
"""
@spec publish(topic(), event_data()) :: :ok | {:error, atom()}
def publish(:finalized_checkpoint = topic, %{root: block_root, epoch: epoch}) do
case Store.BlockStates.get_state_info(block_root) do
%StateInfo{root: state_root} ->
data = %{
block: BeaconApi.Utils.hex_encode(block_root),
state: BeaconApi.Utils.hex_encode(state_root),
epoch: Integer.to_string(epoch),
# TODO: this is a placeholder, we need to get if the execution is optimistic or not
execution_optimistic: false
}

chunk = %Chunk{event: topic, data: [Jason.encode!(data)]}
event = %Event{id: UUID.uuid4(), data: chunk, topic: topic}

EventBus.notify(event)

nil ->
Logger.error("State not available for block", root: block_root)

{:error, :state_not_available}
end
end

def publish(:block = topic, %{root: block_root, slot: slot}) do
data = %{
block: BeaconApi.Utils.hex_encode(block_root),
slot: Integer.to_string(slot),
# TODO: this is a placeholder, we need to get if the execution is optimistic or not
execution_optimistic: false
}

chunk = %Chunk{event: topic, data: [Jason.encode!(data)]}
event = %Event{id: UUID.uuid4(), data: chunk, topic: topic}

EventBus.notify(event)
end

def publish(_topic, _event_data), do: {:error, :unsupported_topic}

@doc """
Subscribe to a topic for stream events in an sse connection.
"""
@spec sse_subscribe(Plug.Conn.t(), topics()) :: Plug.Conn.t()
def sse_subscribe(conn, topics) when is_list(topics),
do: SSE.stream(conn, {topics, %Chunk{data: []}})
end
6 changes: 5 additions & 1 deletion lib/beacon_api/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule BeaconApi.Router do
use BeaconApi, :router

pipeline :api do
plug(:accepts, ["json"])
plug(:accepts, ["json", "sse"])
plug(OpenApiSpex.Plug.PutApiSpec, module: BeaconApi.ApiSpec)
end

Expand All @@ -22,6 +22,10 @@ defmodule BeaconApi.Router do
get("/identity", NodeController, :identity)
get("/version", NodeController, :version)
end

scope "/events" do
get("/", EventsController, :subscribe)
end
end

# Ethereum API Version 2
Expand Down
2 changes: 2 additions & 0 deletions lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
"""

require Logger
alias BeaconApi.EventPubSub
alias LambdaEthereumConsensus.Execution.ExecutionChain
alias LambdaEthereumConsensus.ForkChoice.Handlers
alias LambdaEthereumConsensus.ForkChoice.Head
Expand Down Expand Up @@ -65,6 +66,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
|> tap(fn store ->
StoreDb.persist_store(store)
Logger.info("[Fork choice] Added new block", slot: slot, root: block_root)
EventPubSub.publish(:block, %{root: block_root, slot: slot})

Logger.info("[Fork choice] Recomputed head",
slot: store.head_slot,
Expand Down
7 changes: 6 additions & 1 deletion lib/lambda_ethereum_consensus/fork_choice/handlers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
"""
require Logger

alias BeaconApi.EventPubSub
alias LambdaEthereumConsensus.Execution.ExecutionClient
alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.StateTransition
Expand Down Expand Up @@ -281,7 +282,11 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
|> if_then_update(
finalized_checkpoint.epoch > store.finalized_checkpoint.epoch,
# Update finalized checkpoint
&%Store{&1 | finalized_checkpoint: finalized_checkpoint}
fn store ->
EventPubSub.publish(:finalized_checkpoint, finalized_checkpoint)

%Store{store | finalized_checkpoint: finalized_checkpoint}
end
)
end

Expand Down
5 changes: 4 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ defmodule LambdaEthereumConsensus.MixProject do
{:sentry, "~> 10.8.0"},
{:prom_ex, "~> 1.11.0"},
{:flama, git: "https://github.com/lambdaclass/ht1223_tracer"},
{:uuid, "~> 1.1"}
{:uuid, "~> 1.1"},
# TODO: (#1368) We might want to use phoenix_pubsub instead and do our implementation of SSE.
{:sse, "~> 0.4"},
{:event_bus, ">= 1.6.0"}
]
end

Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"elixir_make": {:hex, :elixir_make, "0.8.4", "4960a03ce79081dee8fe119d80ad372c4e7badb84c493cc75983f9d3bc8bde0f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "6e7f1d619b5f61dfabd0a20aa268e575572b542ac31723293a4c1a567d5ef040"},
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"escape": {:hex, :escape, "0.1.0", "548edab75e6e6938b1e199ef59cb8e504bcfd3bcf83471d4ae9a3c7a7a3c7d45", [:mix], [], "hexpm", "a5d8e92db4677155df54bc1306d401b5233875d570d474201db03cb3047491cd"},
"event_bus": {:hex, :event_bus, "1.7.0", "29a36fc09e8c4463c82206b6a300fa1d61cf4baf9a7b4e7cf0c3efb99c73998e", [:mix], [], "hexpm", "e556470f49f53060a0696c4bad81341252685011afc69eda25032c8a3a86eb2e"},
"ex2ms": {:hex, :ex2ms, "1.7.0", "45b9f523d0b777667ded60070d82d871a37e294f0b6c5b8eca86771f00f82ee1", [:mix], [], "hexpm", "2589eee51f81f1b1caa6d08c990b1ad409215fe6f64c73f73c67d36ed10be827"},
"exleveldb": {:hex, :exleveldb, "0.14.0", "8e9353bbce38482d6971d254c6b98ceb50f3f179c94732b5d17db1be426fca18", [:mix], [{:eleveldb, "~> 2.2.20", [hex: :eleveldb, repo: "hexpm", optional: false]}], "hexpm", "803cd3b4c826a1e17e7e28f6afe224837a743b475e1a48336f186af3dd8636ad"},
"expo": {:hex, :expo, "0.5.2", "beba786aab8e3c5431813d7a44b828e7b922bfa431d6bfbada0904535342efe2", [:mix], [], "hexpm", "8c9bfa06ca017c9cb4020fabe980bc7fdb1aaec059fd004c2ab3bff03b1c599c"},
Expand Down Expand Up @@ -68,6 +69,7 @@
"sentry": {:hex, :sentry, "10.8.1", "aa45309785e1521416225adb16e0b4d8b957578804527f3c7babb6fefbc5e456", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_ownership, "~> 0.3.0 or ~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.6", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.20 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "495b3cdadad90ba72eef973aa3dec39b3b8b2a362fe87e2f4ef32133ac3b4097"},
"snappyer": {:hex, :snappyer, "1.2.10", "023e9ae00e969b0997208b5de7d3b12bb46ec6bc5411e8dc53e7b3f435b8f0fd", [:rebar3], [], "hexpm", "f55bd9ed147e7163cb3acd1e431a7ff2c9e31ceacbb8308786094fb64551c284"},
"sourceror": {:hex, :sourceror, "1.5.0", "3e65d5fbb1a8e2864ad6411262c8018fee73474f5789dda12285c82999253d5d", [:mix], [], "hexpm", "4a32b5d189d8453f73278c15712f8731b89e9211e50726b798214b303b51bfc7"},
"sse": {:hex, :sse, "0.4.0", "f17affacbc4618bac07590eec7bff849aa27d1f71bb3d41da3fd3cb255d16910", [:mix], [{:event_bus, ">= 1.6.0", [hex: :event_bus, repo: "hexpm", optional: false]}, {:plug, ">= 1.4.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "2dfb9923725b9d5292763c3de9b7798713f5771522823e961a250204917d7efb"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
"statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"},
"stream_data": {:hex, :stream_data, "1.1.2", "05499eaec0443349ff877aaabc6e194e82bda6799b9ce6aaa1aadac15a9fdb4d", [:mix], [], "hexpm", "129558d2c77cbc1eb2f4747acbbea79e181a5da51108457000020a906813a1a9"},
Expand Down
3 changes: 3 additions & 0 deletions network_params.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
participants:
- el_type: geth
el_image: ethereum/client-go:v1.14.12
cl_type: lighthouse
cl_image: sigp/lighthouse:v5.3.0
count: 2
validator_count: 32
- el_type: geth
el_image: ethereum/client-go:v1.14.12
cl_type: lambda
cl_image: lambda_ethereum_consensus:latest
use_separate_vc: false
Expand Down
2 changes: 2 additions & 0 deletions test/unit/beacon_api/beacon_api_v1_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ defmodule Unit.BeaconApiTest.V1 do
test "node identity" do
alias LambdaEthereumConsensus.Libp2pPort
alias LambdaEthereumConsensus.P2P.Metadata

patch(BeaconApi.EventPubSub, :publish, fn _, _ -> :ok end)
patch(ForkChoice, :get_fork_version, fn -> ChainSpec.get("DENEB_FORK_VERSION") end)

start_link_supervised!({Libp2pPort, genesis_time: :os.system_time(:second), store: %Store{}})
Expand Down
4 changes: 4 additions & 0 deletions test/unit/fork_choice/handlers_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Unit.ForkChoice.HandlersTest do
use ExUnit.Case

use Patch

alias LambdaEthereumConsensus.ForkChoice.Handlers
alias LambdaEthereumConsensus.Utils.Diff
alias Types.Store
Expand Down Expand Up @@ -44,6 +46,8 @@ defmodule Unit.ForkChoice.HandlersTest do
end

test "upgrades unrealized checkpoints" do
patch(BeaconApi.EventPubSub, :publish, fn _, _ -> :ok end)

start_time = 0
end_time = start_time + ChainSpec.get("SECONDS_PER_SLOT") * ChainSpec.get("SLOTS_PER_EPOCH")

Expand Down
Loading