diff --git a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex index fa0b8eaa7..cf3d423b0 100644 --- a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex @@ -7,6 +7,10 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do require Logger alias LambdaEthereumConsensus.ForkChoice + alias LambdaEthereumConsensus.Libp2pPort + alias LambdaEthereumConsensus.P2P.BlockDownloader + + alias LambdaEthereumConsensus.Metrics alias LambdaEthereumConsensus.P2P.BlobDownloader alias LambdaEthereumConsensus.Store.BlobDb alias LambdaEthereumConsensus.Store.Blocks @@ -19,6 +23,8 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do | {nil, :invalid | :download} @type state :: nil + @download_retries 100 + @doc """ If the block is not present, it will be stored as pending. @@ -44,7 +50,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do Blocks.new_block_info(block_info) process_block_and_check_children(block_info) else - BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/1, 30) + BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/1, @download_retries) block_info |> BlockInfo.change_status(:download_blobs) @@ -90,7 +96,22 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do case Blocks.get_block_info(parent_root) do nil -> + Logger.debug("[PendingBlocks] Add parent to download #{inspect(parent_root)}") Blocks.add_block_to_download(parent_root) + + BlockDownloader.request_blocks_by_root( + [parent_root], + fn result -> + process_downloaded_block(result) + end, + @download_retries + ) + + Metrics.block_relationship( + parent_root, + block_info.root + ) + :download_pending %BlockInfo{status: :invalid} -> @@ -118,6 +139,16 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do end end + defp process_downloaded_block({:ok, [block]}) do + Libp2pPort.add_block(block) + end + + defp process_downloaded_block({:error, reason}) do + Logger.error("Error downloading block: #{inspect(reason)}") + + # We might want to declare a block invalid here. + end + defp process_blobs({:ok, blobs}), do: add_blobs(blobs) defp process_blobs({:error, reason}) do diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 2dc812e5e..c0869f1f3 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -9,6 +9,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do alias LambdaEthereumConsensus.Execution.ExecutionChain alias LambdaEthereumConsensus.ForkChoice.Handlers alias LambdaEthereumConsensus.ForkChoice.Head + alias LambdaEthereumConsensus.Metrics alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Store.BlobDb @@ -27,7 +28,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do ########################## @spec init_store(Store.t(), Types.uint64()) :: :ok | :error - def init_store(%Store{head_slot: head_slot} = store, time) do + def init_store(%Store{head_slot: head_slot, head_root: head_root} = store, time) do Logger.info("[Fork choice] Initialized store.", slot: head_slot) store = Handlers.on_tick(store, time) @@ -35,6 +36,8 @@ defmodule LambdaEthereumConsensus.ForkChoice do :telemetry.execute([:sync, :store], %{slot: Store.get_current_slot(store)}) :telemetry.execute([:sync, :on_block], %{slot: head_slot}) + Metrics.block_status(head_root, head_slot, :transitioned) + persist_store(store) end diff --git a/lib/lambda_ethereum_consensus/metrics.ex b/lib/lambda_ethereum_consensus/metrics.ex index 6d7748959..109e75f3a 100644 --- a/lib/lambda_ethereum_consensus/metrics.ex +++ b/lib/lambda_ethereum_consensus/metrics.ex @@ -2,6 +2,8 @@ defmodule LambdaEthereumConsensus.Metrics do @moduledoc """ Basic telemetry metric generation to be used across the node. """ + alias LambdaEthereumConsensus.Store.Blocks + require Logger def tracer({:add_peer, %{}}) do :telemetry.execute([:network, :pubsub_peers], %{}, %{result: "add"}) @@ -68,13 +70,59 @@ defmodule LambdaEthereumConsensus.Metrics do end end - def block_status(root, status) do - hex_root = root |> Base.encode16() + def block_status(root, slot, new_status) do + block_status_execute(root, new_status, slot, 1) + end - :telemetry.execute([:blocks, :status], %{}, %{ - mainstat: status, + @doc """ + - Sets the old status to '0' to deactivate it and sets the new status to '1' so that we can filter the Grafana table. + - If the old status is ':download', it will be deactivated with a 'nil' slot, since that's how it was activated. + """ + def block_status(root, slot, :download, new_status) do + block_status_execute(root, :download, nil, 0) + block_status_execute(root, new_status, slot, 1) + end + + def block_status(root, slot, old_status, new_status) do + block_status_execute(root, old_status, slot, 0) + block_status_execute(root, new_status, slot, 1) + end + + defp block_status_execute(root, status, slot, value) do + hex_root = Base.encode16(root) + + Logger.debug( + "[Metrics] slot = #{inspect(slot)}, status = #{inspect(status)}, value = #{inspect(value)}" + ) + + :telemetry.execute([:blocks, :status], %{total: value}, %{ id: hex_root, - title: hex_root + mainstat: status, + color: map_color(status), + title: slot, + detail__root: hex_root }) end + + def block_relationship(nil, _), do: :ok + + def block_relationship(parent_root, root) do + # If we try to add an edge to a non-existent node, it will crash. + if Blocks.get_block_info(parent_root) do + hex_parent_root = parent_root |> Base.encode16() + hex_root = root |> Base.encode16() + + :telemetry.execute([:blocks, :relationship], %{total: 1}, %{ + id: hex_root <> hex_parent_root, + source: hex_parent_root, + target: hex_root + }) + end + end + + defp map_color(:transitioned), do: "blue" + defp map_color(:pending), do: "green" + defp map_color(:download_blobs), do: "yellow" + defp map_color(:download), do: "orange" + defp map_color(:invalid), do: "red" end diff --git a/lib/lambda_ethereum_consensus/store/blocks.ex b/lib/lambda_ethereum_consensus/store/blocks.ex index 53f91cd02..a348410b9 100644 --- a/lib/lambda_ethereum_consensus/store/blocks.ex +++ b/lib/lambda_ethereum_consensus/store/blocks.ex @@ -82,6 +82,22 @@ defmodule LambdaEthereumConsensus.Store.Blocks do # list. If it's not in the list, the operation is equivalent to only adding it in the correct # one. BlockDb.change_root_status(block_info.root, :download, block_info.status) + + {slot, parent_root} = + if block_info.signed_block do + {block_info.signed_block.message.slot, block_info.signed_block.message.parent_root} + else + {nil, nil} + end + + Metrics.block_status( + block_info.root, + slot, + :download, + block_info.status + ) + + Metrics.block_relationship(parent_root, block_info.root) end @doc """ @@ -89,14 +105,21 @@ defmodule LambdaEthereumConsensus.Store.Blocks do """ @spec change_status(BlockInfo.t(), BlockInfo.block_status()) :: BlockInfo.t() def change_status(block_info, status) do - Metrics.block_status(block_info.root, status) - new_block_info = BlockInfo.change_status(block_info, status) store_block_info(new_block_info) old_status = block_info.status BlockDb.change_root_status(block_info.root, old_status, status) + Metrics.block_status( + block_info.root, + block_info.signed_block.message.slot, + old_status, + status + ) + + Metrics.block_relationship(block_info.signed_block.message.parent_root, block_info.root) + new_block_info end diff --git a/lib/lambda_ethereum_consensus/telemetry.ex b/lib/lambda_ethereum_consensus/telemetry.ex index 3f0bbec40..af38fa691 100644 --- a/lib/lambda_ethereum_consensus/telemetry.ex +++ b/lib/lambda_ethereum_consensus/telemetry.ex @@ -148,7 +148,10 @@ defmodule LambdaEthereumConsensus.Telemetry do last_value("fork_choice.recompute_head.exception.duration", unit: {:native, :millisecond} ), - counter("blocks.status.count", tags: [:title, :mainstat, :id]) + last_value("blocks.status.total", tags: [:id, :mainstat, :color, :title, :detail__root]), + last_value("blocks.relationship.total", + tags: [:id, :source, :target] + ) ] end diff --git a/metrics/grafana/provisioning/dashboards/home.json b/metrics/grafana/provisioning/dashboards/home.json index 2c9985225..b4156322f 100644 --- a/metrics/grafana/provisioning/dashboards/home.json +++ b/metrics/grafana/provisioning/dashboards/home.json @@ -28,6 +28,89 @@ "links": [], "liveNow": false, "panels": [ + { + "datasource": { + "uid": "PBFA97CFB590B2093", + "type": "prometheus" + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 31, + "options": { + "nodes": {}, + "edges": {} + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "blocks_status_total", + "format": "table", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": true, + "legendFormat": "__auto", + "range": false, + "refId": "A", + "useBackend": false, + "exemplar": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "blocks_relationship_total", + "format": "table", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": true, + "legendFormat": "__auto", + "range": false, + "refId": "B", + "useBackend": false, + "exemplar": false + } + ], + "title": "Blockchain View", + "transformations": [ + { + "id": "filterByValue", + "options": { + "filters": [ + { + "fieldName": "Value #A", + "config": { + "id": "equal", + "options": { + "value": 0 + } + } + } + ], + "type": "exclude", + "match": "any" + }, + "filter": { + "id": "byRefId", + "options": "A" + }, + "topic": "series" + } + ], + "type": "nodeGraph" + }, { "datasource": { "type": "prometheus", @@ -2698,4 +2781,4 @@ "uid": "90EXFQnIk", "version": 3, "weekStart": "" -} +} \ No newline at end of file