Skip to content

feat: add blocks node graph #1215

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
require Logger

alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.Libp2pPort
alias LambdaEthereumConsensus.P2P.BlockDownloader

alias LambdaEthereumConsensus.Metrics
alias LambdaEthereumConsensus.P2P.BlobDownloader
alias LambdaEthereumConsensus.Store.BlobDb
alias LambdaEthereumConsensus.Store.Blocks
Expand All @@ -19,6 +23,8 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
| {nil, :invalid | :download}
@type state :: nil

@download_retries 100

@doc """
If the block is not present, it will be stored as pending.

Expand All @@ -44,7 +50,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
Blocks.new_block_info(block_info)
process_block_and_check_children(block_info)
else
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/1, 30)
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/1, @download_retries)

block_info
|> BlockInfo.change_status(:download_blobs)
Expand Down Expand Up @@ -90,7 +96,22 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do

case Blocks.get_block_info(parent_root) do
nil ->
Logger.debug("[PendingBlocks] Add parent to download #{inspect(parent_root)}")
Blocks.add_block_to_download(parent_root)

BlockDownloader.request_blocks_by_root(
[parent_root],
fn result ->
process_downloaded_block(result)
end,
@download_retries
)

Metrics.block_relationship(
parent_root,
block_info.root
)

:download_pending

%BlockInfo{status: :invalid} ->
Expand Down Expand Up @@ -118,6 +139,16 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
end
end

defp process_downloaded_block({:ok, [block]}) do
Libp2pPort.add_block(block)
end

defp process_downloaded_block({:error, reason}) do
Logger.error("Error downloading block: #{inspect(reason)}")

# We might want to declare a block invalid here.
end

defp process_blobs({:ok, blobs}), do: add_blobs(blobs)

defp process_blobs({:error, reason}) do
Expand Down
5 changes: 4 additions & 1 deletion lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
alias LambdaEthereumConsensus.Execution.ExecutionChain
alias LambdaEthereumConsensus.ForkChoice.Handlers
alias LambdaEthereumConsensus.ForkChoice.Head
alias LambdaEthereumConsensus.Metrics
alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector
alias LambdaEthereumConsensus.StateTransition.Misc
alias LambdaEthereumConsensus.Store.BlobDb
Expand All @@ -27,14 +28,16 @@ defmodule LambdaEthereumConsensus.ForkChoice do
##########################

@spec init_store(Store.t(), Types.uint64()) :: :ok | :error
def init_store(%Store{head_slot: head_slot} = store, time) do
def init_store(%Store{head_slot: head_slot, head_root: head_root} = store, time) do
Logger.info("[Fork choice] Initialized store.", slot: head_slot)

store = Handlers.on_tick(store, time)

:telemetry.execute([:sync, :store], %{slot: Store.get_current_slot(store)})
:telemetry.execute([:sync, :on_block], %{slot: head_slot})

Metrics.block_status(head_root, head_slot, :transitioned)

persist_store(store)
end

Expand Down
58 changes: 53 additions & 5 deletions lib/lambda_ethereum_consensus/metrics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ defmodule LambdaEthereumConsensus.Metrics do
@moduledoc """
Basic telemetry metric generation to be used across the node.
"""
alias LambdaEthereumConsensus.Store.Blocks
require Logger

def tracer({:add_peer, %{}}) do
:telemetry.execute([:network, :pubsub_peers], %{}, %{result: "add"})
Expand Down Expand Up @@ -68,13 +70,59 @@ defmodule LambdaEthereumConsensus.Metrics do
end
end

def block_status(root, status) do
hex_root = root |> Base.encode16()
def block_status(root, slot, new_status) do
block_status_execute(root, new_status, slot, 1)
end

:telemetry.execute([:blocks, :status], %{}, %{
mainstat: status,
@doc """
- Sets the old status to '0' to deactivate it and sets the new status to '1' so that we can filter the Grafana table.
- If the old status is ':download', it will be deactivated with a 'nil' slot, since that's how it was activated.
"""
def block_status(root, slot, :download, new_status) do
block_status_execute(root, :download, nil, 0)
block_status_execute(root, new_status, slot, 1)
end

def block_status(root, slot, old_status, new_status) do
block_status_execute(root, old_status, slot, 0)
block_status_execute(root, new_status, slot, 1)
end

defp block_status_execute(root, status, slot, value) do
hex_root = Base.encode16(root)

Logger.debug(
"[Metrics] slot = #{inspect(slot)}, status = #{inspect(status)}, value = #{inspect(value)}"
)

:telemetry.execute([:blocks, :status], %{total: value}, %{
id: hex_root,
title: hex_root
mainstat: status,
color: map_color(status),
title: slot,
detail__root: hex_root
})
end

def block_relationship(nil, _), do: :ok

def block_relationship(parent_root, root) do
# If we try to add an edge to a non-existent node, it will crash.
if Blocks.get_block_info(parent_root) do
hex_parent_root = parent_root |> Base.encode16()
hex_root = root |> Base.encode16()

:telemetry.execute([:blocks, :relationship], %{total: 1}, %{
id: hex_root <> hex_parent_root,
source: hex_parent_root,
target: hex_root
})
end
end

defp map_color(:transitioned), do: "blue"
defp map_color(:pending), do: "green"
defp map_color(:download_blobs), do: "yellow"
defp map_color(:download), do: "orange"
defp map_color(:invalid), do: "red"
end
27 changes: 25 additions & 2 deletions lib/lambda_ethereum_consensus/store/blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,44 @@ defmodule LambdaEthereumConsensus.Store.Blocks do
# list. If it's not in the list, the operation is equivalent to only adding it in the correct
# one.
BlockDb.change_root_status(block_info.root, :download, block_info.status)

{slot, parent_root} =
if block_info.signed_block do
{block_info.signed_block.message.slot, block_info.signed_block.message.parent_root}
else
{nil, nil}
end

Metrics.block_status(
block_info.root,
slot,
:download,
block_info.status
)

Metrics.block_relationship(parent_root, block_info.root)
end

@doc """
Changes the status of a block in the db. Returns the block with the modified status.
"""
@spec change_status(BlockInfo.t(), BlockInfo.block_status()) :: BlockInfo.t()
def change_status(block_info, status) do
Metrics.block_status(block_info.root, status)

new_block_info = BlockInfo.change_status(block_info, status)
store_block_info(new_block_info)

old_status = block_info.status
BlockDb.change_root_status(block_info.root, old_status, status)

Metrics.block_status(
block_info.root,
block_info.signed_block.message.slot,
old_status,
status
)

Metrics.block_relationship(block_info.signed_block.message.parent_root, block_info.root)

new_block_info
end

Expand Down
5 changes: 4 additions & 1 deletion lib/lambda_ethereum_consensus/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ defmodule LambdaEthereumConsensus.Telemetry do
last_value("fork_choice.recompute_head.exception.duration",
unit: {:native, :millisecond}
),
counter("blocks.status.count", tags: [:title, :mainstat, :id])
last_value("blocks.status.total", tags: [:id, :mainstat, :color, :title, :detail__root]),
last_value("blocks.relationship.total",
tags: [:id, :source, :target]
)
]
end

Expand Down
85 changes: 84 additions & 1 deletion metrics/grafana/provisioning/dashboards/home.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,89 @@
"links": [],
"liveNow": false,
"panels": [
{
"datasource": {
"uid": "PBFA97CFB590B2093",
"type": "prometheus"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
},
"id": 31,
"options": {
"nodes": {},
"edges": {}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "blocks_status_total",
"format": "table",
"fullMetaSearch": false,
"includeNullMetadata": true,
"instant": true,
"legendFormat": "__auto",
"range": false,
"refId": "A",
"useBackend": false,
"exemplar": false
},
{
"datasource": {
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "blocks_relationship_total",
"format": "table",
"fullMetaSearch": false,
"hide": false,
"includeNullMetadata": true,
"instant": true,
"legendFormat": "__auto",
"range": false,
"refId": "B",
"useBackend": false,
"exemplar": false
}
],
"title": "Blockchain View",
"transformations": [
{
"id": "filterByValue",
"options": {
"filters": [
{
"fieldName": "Value #A",
"config": {
"id": "equal",
"options": {
"value": 0
}
}
}
],
"type": "exclude",
"match": "any"
},
"filter": {
"id": "byRefId",
"options": "A"
},
"topic": "series"
}
],
"type": "nodeGraph"
},
{
"datasource": {
"type": "prometheus",
Expand Down Expand Up @@ -2698,4 +2781,4 @@
"uid": "90EXFQnIk",
"version": 3,
"weekStart": ""
}
}
Loading