From d2d852923e5d8b17aa17ffeb29c694b4aad813d8 Mon Sep 17 00:00:00 2001 From: Swanny Date: Mon, 28 Oct 2024 11:51:53 -0400 Subject: [PATCH 1/4] feat: ctrl c ctrl v --- .gitignore | 6 +- Cargo.toml | 48 ++++++++ bin/builder.rs | 59 ++++++++++ src/config.rs | 211 +++++++++++++++++++++++++++++++++++ src/lib.rs | 4 + src/sequencer.openapi.yml | 143 ++++++++++++++++++++++++ src/service.rs | 183 +++++++++++++++++++++++++++++++ src/signer.rs | 110 +++++++++++++++++++ src/tasks/block.rs | 139 +++++++++++++++++++++++ src/tasks/mod.rs | 3 + src/tasks/submit.rs | 225 ++++++++++++++++++++++++++++++++++++++ src/tasks/tx_poller.rs | 117 ++++++++++++++++++++ tests/tx_poller_test.rs | 91 +++++++++++++++ 13 files changed, 1338 insertions(+), 1 deletion(-) create mode 100644 Cargo.toml create mode 100644 bin/builder.rs create mode 100644 src/config.rs create mode 100644 src/lib.rs create mode 100644 src/sequencer.openapi.yml create mode 100644 src/service.rs create mode 100644 src/signer.rs create mode 100644 src/tasks/block.rs create mode 100644 src/tasks/mod.rs create mode 100644 src/tasks/submit.rs create mode 100644 src/tasks/tx_poller.rs create mode 100644 tests/tx_poller_test.rs diff --git a/.gitignore b/.gitignore index d01bd1a..efe3eb1 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,8 @@ Cargo.lock # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ \ No newline at end of file +#.idea/ + +# Added by cargo + +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..bc2b433 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,48 @@ +[package] +name = "zenith-builder-example" +version = "0.1.0" +description = "Zenith Builder Example" + +edition = "2021" +rust-version = "1.81" +authors = ["init4"] +license = "Apache-2.0 OR MIT" +homepage = "https://github.com/init4tt/zenith" +repository = "https://github.com/init4tt/zenith" + +[lib] +name = "builder" + +[[bin]] +name = "zenith-builder-example" +path = "bin/builder.rs" + +[dependencies] +zenith-types = { git = "https://github.com/init4tech/zenith-rs", branch = "main" } + +alloy-primitives = { version = "=0.8.8", features = ["serde", "tiny-keccak"] } +alloy-sol-types = { version = "=0.8.8", features = ["json"] } +alloy-rlp = { version = "0.3.4" } + +alloy = { version = "0.5.4", features = ["full", "json-rpc", "signer-aws"] } + +aws-config = "1.1.7" +aws-sdk-kms = "1.15.0" + +hex = { package = "const-hex", version = "1", default-features = false, features = [ + "alloc", +] } +serde = { version = "1.0.197", features = ["derive"] } +tracing = "0.1.40" + +axum = "0.7.5" +eyre = "0.6.12" +openssl = { version = "0.10", features = ["vendored"] } +reqwest = { version = "0.11.24", features = ["blocking", "json"] } +ruint = "1.12.1" +serde_json = "1.0" +thiserror = "1.0.58" +tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] } +tracing-subscriber = "0.3.18" +async-trait = "0.1.80" +oauth2 = "4.4.2" diff --git a/bin/builder.rs b/bin/builder.rs new file mode 100644 index 0000000..daf8294 --- /dev/null +++ b/bin/builder.rs @@ -0,0 +1,59 @@ +#![allow(dead_code)] + +use builder::config::BuilderConfig; +use builder::service::serve_builder_with_span; +use builder::tasks::tx_poller::TxPoller; + +use tokio::select; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + tracing_subscriber::fmt::try_init().unwrap(); + let span = tracing::info_span!("zenith-builder"); + + let config = BuilderConfig::load_from_env()?; + let provider = config.connect_provider().await?; + + tracing::debug!(rpc_url = config.host_rpc_url.as_ref(), "instantiated provider"); + + let sequencer_signer = config.connect_sequencer_signer().await?; + let zenith = config.connect_zenith(provider.clone()); + + let port = config.builder_port; + + let tx_poller = TxPoller::new(&config); + let builder = builder::tasks::block::BlockBuilder::new(&config); + + let submit = builder::tasks::submit::SubmitTask { + provider, + zenith, + client: reqwest::Client::new(), + sequencer_signer, + config, + }; + + let (submit_channel, submit_jh) = submit.spawn(); + let (build_channel, build_jh) = builder.spawn(submit_channel); + let tx_poller_jh = tx_poller.spawn(build_channel.clone()); + + let server = serve_builder_with_span(build_channel, ([0, 0, 0, 0], port), span); + + select! { + _ = submit_jh => { + tracing::info!("submit finished"); + }, + _ = build_jh => { + tracing::info!("build finished"); + } + _ = server => { + tracing::info!("server finished"); + } + _ = tx_poller_jh => { + tracing::info!("tx_poller finished"); + } + } + + tracing::info!("shutting down"); + + Ok(()) +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..5edb398 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,211 @@ +use crate::signer::{LocalOrAws, SignerError}; +use alloy::network::{Ethereum, EthereumWallet}; +use alloy::providers::fillers::BlobGasFiller; +use alloy::providers::{ + fillers::{ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller, WalletFiller}, + Identity, ProviderBuilder, RootProvider, +}; +use alloy::transports::BoxTransport; +use alloy_primitives::Address; +use std::{borrow::Cow, env, num, str::FromStr}; +use zenith_types::Zenith; + +// Keys for .env variables that need to be set to configure the builder. +const HOST_CHAIN_ID: &str = "HOST_CHAIN_ID"; +const RU_CHAIN_ID: &str = "RU_CHAIN_ID"; +const HOST_RPC_URL: &str = "HOST_RPC_URL"; +const ZENITH_ADDRESS: &str = "ZENITH_ADDRESS"; +const QUINCEY_URL: &str = "QUINCEY_URL"; +const BUILDER_PORT: &str = "BUILDER_PORT"; +const SEQUENCER_KEY: &str = "SEQUENCER_KEY"; // empty (to use Quincey) OR AWS key ID (to use AWS signer) OR raw private key (to use local signer) +const BUILDER_KEY: &str = "BUILDER_KEY"; // AWS key ID (to use AWS signer) OR raw private key (to use local signer) +const INCOMING_TRANSACTIONS_BUFFER: &str = "INCOMING_TRANSACTIONS_BUFFER"; +const BLOCK_CONFIRMATION_BUFFER: &str = "BLOCK_CONFIRMATION_BUFFER"; +const BUILDER_REWARDS_ADDRESS: &str = "BUILDER_REWARDS_ADDRESS"; +const ROLLUP_BLOCK_GAS_LIMIT: &str = "ROLLUP_BLOCK_GAS_LIMIT"; +const TX_POOL_URL: &str = "TX_POOL_URL"; +const TX_POOL_POLL_INTERVAL: &str = "TX_POOL_POLL_INTERVAL"; +const TX_POOL_CACHE_DURATION: &str = "TX_POOL_CACHE_DURATION"; +const OAUTH_CLIENT_ID: &str = "OAUTH_CLIENT_ID"; +const OAUTH_CLIENT_SECRET: &str = "OAUTH_CLIENT_SECRET"; +const OAUTH_AUTHENTICATE_URL: &str = "OAUTH_AUTHENTICATE_URL"; +const OAUTH_TOKEN_URL: &str = "OAUTH_TOKEN_URL"; +const OAUTH_AUDIENCE: &str = "OAUTH_AUDIENCE"; + +/// Configuration for a builder running a specific rollup on a specific host +/// chain. +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub struct BuilderConfig { + /// The chain ID of the host chain + pub host_chain_id: u64, + /// The chain ID of the host chain + pub ru_chain_id: u64, + /// URL for Host RPC node. + pub host_rpc_url: Cow<'static, str>, + /// address of the Zenith contract on Host. + pub zenith_address: Address, + /// URL for remote Quincey Sequencer server to sign blocks. + /// Disregarded if a sequencer_signer is configured. + pub quincey_url: Cow<'static, str>, + /// Port for the Builder server. + pub builder_port: u16, + /// Key to access Sequencer Wallet - AWS Key ID _OR_ local private key. + /// Set IFF using local Sequencer signing instead of remote Quincey signing. + pub sequencer_key: Option, + /// Key to access Builder transaction submission wallet - AWS Key ID _OR_ local private key. + pub builder_key: String, + /// Buffer in seconds that Builder will wait & accept incoming transactions before bundling them and submitting as a block. + pub incoming_transactions_buffer: u64, + /// Buffer in seconds in which the `submitBlock` transaction must confirm on the Host chain. + pub block_confirmation_buffer: u64, + /// Address on Rollup to which Builder will receive user transaction fees. + pub builder_rewards_address: Address, + /// Gas limit for RU block. + /// NOTE: a "smart" builder would determine this programmatically by simulating the block. + pub rollup_block_gas_limit: u64, + /// URL of the tx pool to poll for incoming transactions. + pub tx_pool_url: Cow<'static, str>, + //// Interval in seconds to poll the tx-pool for new transactions. + pub tx_pool_poll_interval: u64, + /// Duration in seconds transactions can live in the tx-pool cache. + pub tx_pool_cache_duration: u64, + /// OAuth client ID for the builder. + pub oauth_client_id: String, + /// OAuth client secret for the builder. + pub oauth_client_secret: String, + /// OAuth authenticate URL for the builder for performing OAuth logins. + pub oauth_authenticate_url: String, + /// OAuth token URL for the builder to get an OAuth2 access token + pub oauth_token_url: String, + /// OAuth audience for the builder. + pub oauth_audience: String, +} + +#[derive(Debug, thiserror::Error)] +pub enum ConfigError { + /// Error loading from environment variable + #[error("missing or non-unicode environment variable: {0}")] + Var(String), + /// Error parsing environment variable + #[error("failed to parse environment variable: {0}")] + Parse(#[from] num::ParseIntError), + /// Error parsing boolean environment variable + #[error("failed to parse boolean environment variable")] + ParseBool, + /// Error parsing hex from environment variable + #[error("failed to parse hex: {0}")] + Hex(#[from] hex::FromHexError), + /// Error connecting to the provider + #[error("failed to connect to provider: {0}")] + Provider(#[from] alloy::transports::TransportError), + /// Error connecting to the signer + #[error("failed to connect to signer: {0}")] + Signer(#[from] SignerError), +} + +impl ConfigError { + /// Missing or non-unicode env var. + pub fn missing(s: &str) -> Self { + ConfigError::Var(s.to_string()) + } +} + +/// Provider type used by this transaction +pub type Provider = FillProvider< + JoinFill< + JoinFill< + Identity, + JoinFill>>, + >, + WalletFiller, + >, + RootProvider, + BoxTransport, + Ethereum, +>; + +pub type ZenithInstance = Zenith::ZenithInstance; + +impl BuilderConfig { + /// Load the builder configuration from environment variables. + pub fn load_from_env() -> Result { + Ok(BuilderConfig { + host_chain_id: load_u64(HOST_CHAIN_ID)?, + ru_chain_id: load_u64(RU_CHAIN_ID)?, + host_rpc_url: load_url(HOST_RPC_URL)?, + zenith_address: load_address(ZENITH_ADDRESS)?, + quincey_url: load_url(QUINCEY_URL)?, + builder_port: load_u16(BUILDER_PORT)?, + sequencer_key: load_string_option(SEQUENCER_KEY), + builder_key: load_string(BUILDER_KEY)?, + incoming_transactions_buffer: load_u64(INCOMING_TRANSACTIONS_BUFFER)?, + block_confirmation_buffer: load_u64(BLOCK_CONFIRMATION_BUFFER)?, + builder_rewards_address: load_address(BUILDER_REWARDS_ADDRESS)?, + rollup_block_gas_limit: load_u64(ROLLUP_BLOCK_GAS_LIMIT)?, + tx_pool_url: load_url(TX_POOL_URL)?, + tx_pool_poll_interval: load_u64(TX_POOL_POLL_INTERVAL)?, + tx_pool_cache_duration: load_u64(TX_POOL_CACHE_DURATION)?, + oauth_client_id: load_string(OAUTH_CLIENT_ID)?, + oauth_client_secret: load_string(OAUTH_CLIENT_SECRET)?, + oauth_authenticate_url: load_string(OAUTH_AUTHENTICATE_URL)?, + oauth_token_url: load_string(OAUTH_TOKEN_URL)?, + oauth_audience: load_string(OAUTH_AUDIENCE)?, + }) + } + + pub async fn connect_builder_signer(&self) -> Result { + LocalOrAws::load(&self.builder_key, Some(self.host_chain_id)).await.map_err(Into::into) + } + + pub async fn connect_sequencer_signer(&self) -> Result, ConfigError> { + match &self.sequencer_key { + Some(sequencer_key) => LocalOrAws::load(sequencer_key, Some(self.host_chain_id)) + .await + .map_err(Into::into) + .map(Some), + None => Ok(None), + } + } + + /// Connect to the provider using the configuration. + pub async fn connect_provider(&self) -> Result { + let builder_signer = self.connect_builder_signer().await?; + ProviderBuilder::new() + .with_recommended_fillers() + .wallet(EthereumWallet::from(builder_signer)) + .on_builtin(&self.host_rpc_url) + .await + .map_err(Into::into) + } + + pub fn connect_zenith(&self, provider: Provider) -> ZenithInstance { + Zenith::new(self.zenith_address, provider) + } +} + +fn load_string(key: &str) -> Result { + env::var(key).map_err(|_| ConfigError::missing(key)) +} + +fn load_string_option(key: &str) -> Option { + load_string(key).ok() +} + +fn load_u64(key: &str) -> Result { + let val = load_string(key)?; + val.parse::().map_err(Into::into) +} + +fn load_u16(key: &str) -> Result { + let val = load_string(key)?; + val.parse::().map_err(Into::into) +} + +fn load_url(key: &str) -> Result, ConfigError> { + load_string(key).map_err(Into::into).map(Into::into) +} + +fn load_address(key: &str) -> Result { + let address = load_string(key)?; + Address::from_str(&address).map_err(Into::into) +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..7f35f2c --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,4 @@ +pub mod config; +pub mod service; +pub mod signer; +pub mod tasks; diff --git a/src/sequencer.openapi.yml b/src/sequencer.openapi.yml new file mode 100644 index 0000000..a37cc88 --- /dev/null +++ b/src/sequencer.openapi.yml @@ -0,0 +1,143 @@ +openapi: 3.0.3 +info: + title: Signet Sequencer API + version: 1.0.0 + description: API to obtain a signature for a new Signet block. +servers: +- url: https://sequencer.api.init4.network +paths: + /signBlock: + post: + summary: Obtain a signature for a new Signet block + description: Obtain a signature for a new Signet block. + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + host_block_number: + type: string + description: The block number of the host formatted as a 0x-prefixed minimal hex string. + host_chain_id: + type: string + description: The chain ID of the host formatted as a 0x-prefixed minimal hex string. + ru_chain_id: + type: string + description: The chain ID of the rollup formatted as a 0x-prefixed minimal hex string. + gas_limit: + type: string + description: The gas limit of the rollup block formatted as a 0x-prefixed minimal hex string. + ru_reward_address: + type: string + description: The reward address on the rollup for the builder formatted as a 0x-prefixed minimal hex string. + contents: + type: string + description: keccak256 hash of rlp-encoded transactions in the block formatted as a 0x-prefixed minimal hex string. + required: + - host_block_number + - host_chain_id + - ru_chain_id + - gas_limit + - ru_reward_address + - contents + responses: + '200': + description: A JSON SignResponse + content: + application/json: + schema: + type: object + properties: + req: + type: object + description: The inputted SignRequest. + properties: + host_block_number: + type: string + description: The block number of the host formatted as a 0x-prefixed minimal hex string. + host_chain_id: + type: string + description: The chain ID of the host formatted as a 0x-prefixed minimal hex string. + ru_chain_id: + type: string + description: The chain ID of the rollup formatted as a 0x-prefixed minimal hex string. + gas_limit: + type: string + description: The gas limit of the rollup block formatted as a 0x-prefixed minimal hex string. + ru_reward_address: + type: string + description: The reward address on the rollup for the builder formatted as a 0x-prefixed minimal hex string. + contents: + type: string + description: keccak256 hash of rlp-encoded transactions in the block formatted as a 0x-prefixed minimal hex string. + sig: + type: object + description: The signature over the SignRequest. + properties: + yParity: + type: boolean + description: The parity of the y value of the signature. + r: + type: string + description: Signature R field. + s: + type: string + description: Signature S field. +components: + schemas: + SignRequest: + type: object + properties: + host_block_number: + type: string + description: The block number of the host. + host_chain_id: + type: string + description: The chain ID of the host. + ru_chain_id: + type: string + description: The chain ID of the rollup. + gas_limit: + type: string + description: The gas limit of the rollup block. + ru_reward_address: + type: string + description: The reward address on the rollup for the builder. + contents: + type: string + description: keccak256 hash of rlp-encoded transactions in the block. + required: + - host_block_number + - host_chain_id + - ru_chain_id + - gas_limit + - ru_reward_address + - contents + Signature: + type: object + properties: + yParity: + type: boolean + description: The parity of the y value of the signature. + r: + type: string + description: Signature R field. + s: + type: string + description: Signature S field. + required: + - yParity + - r + - s + SignResponse: + type: object + properties: + req: + $ref: '#/components/schemas/SignRequest' + sig: + $ref: '#/components/schemas/Signature' + required: + - req + - sig diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000..95e0fb3 --- /dev/null +++ b/src/service.rs @@ -0,0 +1,183 @@ +use std::{fmt::Debug, net::SocketAddr}; + +use alloy::consensus::TxEnvelope; +use alloy::network::eip2718::Decodable2718; +use alloy::rpc::json_rpc::{ErrorPayload, Id}; +use alloy_primitives::B256; +use axum::{ + extract::State, + http::StatusCode, + response::{IntoResponse, Response}, + routing::{get, post}, + Json, Router, +}; +use serde_json::Value; +use tokio::sync::mpsc; +use tracing::{Instrument, Span}; + +/// App result +pub type AppResult = Result; + +/// App error. This is a wrapper around eyre::Report that also includes an HTTP +/// status code. It implements [`IntoResponse`] so that it can be returned as an +/// error type from [`axum::handler::Handler`]s. +#[derive(Debug)] +pub struct AppError { + code: StatusCode, + eyre: eyre::Report, +} + +impl AppError { + /// Instantiate a new error with the bad request status code. + pub fn bad_req(e: E) -> Self { + Self { code: StatusCode::BAD_REQUEST, eyre: e.into() } + } + + /// Instantiate a new error with the bad request status code and an error + /// string. + pub fn bad_req_str(e: &str) -> Self { + Self { code: StatusCode::BAD_REQUEST, eyre: eyre::eyre!(e.to_owned()) } + } + + /// Instantiate a new error with the internal server error status code. + pub fn server_err(e: E) -> Self { + Self { code: StatusCode::INTERNAL_SERVER_ERROR, eyre: e.into() } + } +} + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + (self.code, format!("{}", self.eyre)).into_response() + } +} + +#[derive(Debug, Clone)] +pub struct ServiceState { + dispatch: mpsc::UnboundedSender, +} + +/// Return a 404 Not Found response +pub async fn return_404() -> Response { + (StatusCode::NOT_FOUND, "not found").into_response() +} + +/// Return a 200 OK response +pub async fn return_200() -> Response { + (StatusCode::OK, "ok").into_response() +} + +/// Dispatches a transaction to the backend. +pub async fn include_tx(state: ServiceState, tx: TxEnvelope) -> Result { + // Simple check to see if the transaction is signed correctly. + if let Err(e) = tx.recover_signer() { + return Err(AppError::bad_req(e)); + } + + let hash = *tx.tx_hash(); + // send it to the backend + state.dispatch.send(tx).map_err(AppError::server_err)?; + // return the hash + Ok(hash) +} + +/// Handler for the /sendTransaction endpoint +pub async fn ingest_handler( + State(state): State, + Json(tx): Json, +) -> Result { + let hash = include_tx(state, tx).await?; + Ok(hash.to_string().into_response()) +} + +/// Handler for the /sendRawTransaction endpoint +pub async fn ingest_raw_handler( + State(state): State, + body: String, +) -> Result { + let body = body.strip_prefix("0x").unwrap_or(&body); + let buf = hex::decode(body).map_err(AppError::bad_req)?; + + let envelope = TxEnvelope::decode_2718(&mut buf.as_slice()).map_err(AppError::bad_req)?; + + ingest_handler(State(state), Json(envelope)).await +} + +/// Handler for the /rpc endpoint. +/// Simulates the eth_sendRawTransaction JSON-RPC method +pub async fn ingest_rpc_handler( + State(state): State, + body: String, +) -> Result { + // parse JSON-RPC values from request + let json = serde_json::from_str::(&body).map_err(AppError::bad_req)?; + let method = json["method"].as_str().expect("method not found"); + let tx = json["params"][0].as_str().expect("params malformed"); + + let id = match &json["id"] { + Value::Number(n) => Id::Number(n.as_u64().unwrap_or_default()), + Value::String(s) => Id::String(s.clone()), + _ => Id::None, + }; + + // MUST be eth_sendRawTransaction method + if method != "eth_sendRawTransaction" { + return Ok(Json(alloy::rpc::json_rpc::Response { + payload: alloy::rpc::json_rpc::ResponsePayload::<(), ()>::Failure(ErrorPayload { + code: -6969, + message: "Method not found".into(), + data: None, + }), + id, + }) + .into_response()); + } + + // parse TxEnvelope + let body: &str = tx.strip_prefix("0x").unwrap_or(tx); + let buf = hex::decode(body).map_err(AppError::bad_req)?; + let tx = TxEnvelope::decode_2718(&mut buf.as_slice()).map_err(AppError::bad_req)?; + + let hash = include_tx(state, tx).await?; + + // return JSON-RPC response + let resp = alloy::rpc::json_rpc::Response { + payload: alloy::rpc::json_rpc::ResponsePayload::<_, ()>::Success(hash), + id, + }; + + Ok(Json(resp).into_response()) +} + +/// Serve a builder service on the given socket address. +pub fn serve_builder_with_span( + dispatch: mpsc::UnboundedSender, + socket: impl Into, + span: Span, +) -> tokio::task::JoinHandle<()> { + let state = ServiceState { dispatch }; + + let router: Router = Router::new() + .route("/sendTransaction", post(ingest_handler)) + .route("/sendRawTransaction", post(ingest_raw_handler)) + .route("/rpc", post(ingest_rpc_handler)) + .route("/healthcheck", get(return_200)) + .fallback(return_404); + let app = router.with_state(state); + + let addr = socket.into(); + tokio::spawn( + async move { + match tokio::net::TcpListener::bind(&addr).await { + Ok(listener) => { + if let Err(err) = axum::serve(listener, app).await { + tracing::error!(%err, "serve failed"); + } + } + Err(err) => { + tracing::error!(%err, "failed to bind to the address"); + } + }; + } + .instrument(span), + ) +} diff --git a/src/signer.rs b/src/signer.rs new file mode 100644 index 0000000..392d64d --- /dev/null +++ b/src/signer.rs @@ -0,0 +1,110 @@ +use alloy::consensus::SignableTransaction; +use alloy::signers::aws::{AwsSigner, AwsSignerError}; +use alloy::signers::local::{LocalSignerError, PrivateKeySigner}; +use alloy::signers::Signature; +use alloy_primitives::{Address, ChainId, B256}; +use aws_config::BehaviorVersion; + +/// Abstraction over local signer or +#[derive(Debug, Clone)] +pub enum LocalOrAws { + Local(PrivateKeySigner), + Aws(AwsSigner), +} + +#[derive(Debug, thiserror::Error)] +pub enum SignerError { + /// Error during [`AwsSigner`] instantiation + #[error("failed to connect AWS signer: {0}")] + AwsSigner(#[from] AwsSignerError), + /// Error loading the private key + #[error("failed to load private key: {0}")] + Wallet(#[from] LocalSignerError), + /// Error parsing hex + #[error("failed to parse hex: {0}")] + Hex(#[from] hex::FromHexError), +} + +impl LocalOrAws { + /// Load a privkey or AWS signer from environment variables. + pub async fn load(key: &str, chain_id: Option) -> Result { + if let Ok(wallet) = LocalOrAws::wallet(key) { + Ok(LocalOrAws::Local(wallet)) + } else { + let signer = LocalOrAws::aws_signer(key, chain_id).await?; + Ok(LocalOrAws::Aws(signer)) + } + } + + /// Load the wallet from environment variables. + /// + /// # Panics + /// + /// Panics if the env var contents is not a valid secp256k1 private key. + fn wallet(private_key: &str) -> Result { + let bytes = hex::decode(private_key.strip_prefix("0x").unwrap_or(private_key))?; + Ok(PrivateKeySigner::from_slice(&bytes).unwrap()) + } + + /// Load the AWS signer from environment variables./s + async fn aws_signer(key_id: &str, chain_id: Option) -> Result { + let config = aws_config::load_defaults(BehaviorVersion::latest()).await; + let client = aws_sdk_kms::Client::new(&config); + AwsSigner::new(client, key_id.to_string(), chain_id).await.map_err(Into::into) + } +} + +#[async_trait::async_trait] +impl alloy::network::TxSigner for LocalOrAws { + fn address(&self) -> Address { + match self { + LocalOrAws::Local(signer) => signer.address(), + LocalOrAws::Aws(signer) => signer.address(), + } + } + + async fn sign_transaction( + &self, + tx: &mut dyn SignableTransaction, + ) -> alloy::signers::Result { + match self { + LocalOrAws::Local(signer) => signer.sign_transaction(tx).await, + LocalOrAws::Aws(signer) => signer.sign_transaction(tx).await, + } + } +} + +#[async_trait::async_trait] +impl alloy::signers::Signer for LocalOrAws { + /// Signs the given hash. + async fn sign_hash(&self, hash: &B256) -> alloy::signers::Result { + match self { + LocalOrAws::Local(signer) => signer.sign_hash(hash).await, + LocalOrAws::Aws(signer) => signer.sign_hash(hash).await, + } + } + + /// Returns the signer's Ethereum Address. + fn address(&self) -> Address { + match self { + LocalOrAws::Local(signer) => signer.address(), + LocalOrAws::Aws(signer) => signer.address(), + } + } + + /// Returns the signer's chain ID. + fn chain_id(&self) -> Option { + match self { + LocalOrAws::Local(signer) => signer.chain_id(), + LocalOrAws::Aws(signer) => signer.chain_id(), + } + } + + /// Sets the signer's chain ID. + fn set_chain_id(&mut self, chain_id: Option) { + match self { + LocalOrAws::Local(signer) => signer.set_chain_id(chain_id), + LocalOrAws::Aws(signer) => signer.set_chain_id(chain_id), + } + } +} diff --git a/src/tasks/block.rs b/src/tasks/block.rs new file mode 100644 index 0000000..31f5956 --- /dev/null +++ b/src/tasks/block.rs @@ -0,0 +1,139 @@ +use alloy::consensus::{SidecarBuilder, SidecarCoder, TxEnvelope}; +use alloy_primitives::{keccak256, Bytes, B256}; +use std::{sync::OnceLock, time::Duration}; +use tokio::{select, sync::mpsc, task::JoinHandle}; +use tracing::Instrument; +use zenith_types::{encode_txns, Alloy2718Coder}; + +use crate::config::BuilderConfig; + +#[derive(Debug, Default, Clone)] +/// A block in progress. +pub struct InProgressBlock { + transactions: Vec, + + raw_encoding: OnceLock, + hash: OnceLock, +} + +impl InProgressBlock { + /// Create a new `InProgressBlock` + pub fn new() -> Self { + Self { transactions: Vec::new(), raw_encoding: OnceLock::new(), hash: OnceLock::new() } + } + + /// Get the number of transactions in the block. + pub fn len(&self) -> usize { + self.transactions.len() + } + + /// Check if the block is empty. + pub fn is_empty(&self) -> bool { + self.transactions.is_empty() + } + + /// Unseal the block + fn unseal(&mut self) { + self.raw_encoding.take(); + self.hash.take(); + } + + /// Seal the block by encoding the transactions and calculating the contentshash. + fn seal(&self) { + self.raw_encoding.get_or_init(|| encode_txns::(&self.transactions).into()); + self.hash.get_or_init(|| keccak256(self.raw_encoding.get().unwrap().as_ref())); + } + + /// Ingest a transaction into the in-progress block. Fails + pub fn ingest_tx(&mut self, tx: &TxEnvelope) { + tracing::info!(hash = %tx.tx_hash(), "ingesting tx"); + self.unseal(); + self.transactions.push(tx.clone()); + } + + /// Encode the in-progress block + fn encode_raw(&self) -> &Bytes { + self.seal(); + self.raw_encoding.get().unwrap() + } + + /// Calculate the hash of the in-progress block, finishing the block. + pub fn contents_hash(&self) -> alloy_primitives::B256 { + self.seal(); + *self.hash.get().unwrap() + } + + /// Convert the in-progress block to sign request contents. + pub fn encode_calldata(&self) -> &Bytes { + self.encode_raw() + } + + /// Convert the in-progress block to a blob transaction sidecar. + pub fn encode_blob(&self) -> SidecarBuilder { + let mut coder = SidecarBuilder::::default(); + coder.ingest(self.encode_raw()); + coder + } +} + +/// BlockBuilder is a task that periodically builds a block then sends it for signing and submission. +pub struct BlockBuilder { + pub incoming_transactions_buffer: u64, + pub config: BuilderConfig, +} + +impl BlockBuilder { + // create a new block builder with the given config. + pub fn new(config: &BuilderConfig) -> Self { + Self { + config: config.clone(), + incoming_transactions_buffer: config.incoming_transactions_buffer, + } + } + + /// Spawn the block builder task, returning the inbound channel to it, and + /// a handle to the running task. + pub fn spawn( + self, + outbound: mpsc::UnboundedSender, + ) -> (mpsc::UnboundedSender, JoinHandle<()>) { + let mut in_progress = InProgressBlock::default(); + + let (sender, mut inbound) = mpsc::unbounded_channel(); + + let handle = tokio::spawn( + async move { + loop { + let sleep: tokio::time::Sleep = tokio::time::sleep(Duration::from_secs(self.incoming_transactions_buffer)); + tokio::pin!(sleep); + + select! { + biased; + _ = &mut sleep => { + if !in_progress.is_empty() { + tracing::debug!(txns = in_progress.len(), "sending block to submit task"); + let in_progress_block = std::mem::take(&mut in_progress); + if outbound.send(in_progress_block).is_err() { + tracing::debug!("downstream task gone"); + break + } + } + } + item_res = inbound.recv() => { + match item_res { + Some(item) => in_progress.ingest_tx(&item), + None => { + tracing::debug!("upstream task gone"); + break + } + } + } + } + } + } + .in_current_span(), + ); + + (sender, handle) + } +} diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs new file mode 100644 index 0000000..1df8ef3 --- /dev/null +++ b/src/tasks/mod.rs @@ -0,0 +1,3 @@ +pub mod block; +pub mod submit; +pub mod tx_poller; diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs new file mode 100644 index 0000000..302b87a --- /dev/null +++ b/src/tasks/submit.rs @@ -0,0 +1,225 @@ +use crate::{ + config::{Provider, ZenithInstance}, + signer::LocalOrAws, + tasks::block::InProgressBlock, +}; +use alloy::consensus::SimpleCoder; +use alloy::network::{TransactionBuilder, TransactionBuilder4844}; +use alloy::providers::{Provider as _, WalletProvider}; +use alloy::rpc::types::eth::TransactionRequest; +use alloy::signers::Signer; +use alloy::sol_types::SolCall; +use alloy::transports::TransportError; +use alloy_primitives::{FixedBytes, U256}; +use eyre::bail; +use oauth2::{ + basic::BasicClient, basic::BasicTokenType, reqwest::http_client, AuthUrl, ClientId, + ClientSecret, EmptyExtraTokenFields, StandardTokenResponse, TokenResponse, TokenUrl, +}; +use tokio::{sync::mpsc, task::JoinHandle}; +use tracing::{debug, error, instrument, trace}; +use zenith_types::{SignRequest, SignResponse, Zenith}; + +/// OAuth Audience Claim Name, required param by IdP for client credential grant +const OAUTH_AUDIENCE_CLAIM: &str = "audience"; + +/// Submits sidecars in ethereum txns to mainnet ethereum +pub struct SubmitTask { + /// Ethereum Provider + pub provider: Provider, + + /// Zenity + pub zenith: ZenithInstance, + + /// Reqwest + pub client: reqwest::Client, + + /// Sequencer Signer + pub sequencer_signer: Option, + + /// Config + pub config: crate::config::BuilderConfig, +} + +impl SubmitTask { + async fn sup_quincey(&self, sig_request: &SignRequest) -> eyre::Result { + tracing::info!( + host_block_number = %sig_request.host_block_number, + ru_chain_id = %sig_request.ru_chain_id, + "pinging quincey for signature" + ); + + let token = self.fetch_oauth_token().await?; + + let resp: reqwest::Response = self + .client + .post(self.config.quincey_url.as_ref()) + .json(sig_request) + .bearer_auth(token.access_token().secret()) + .send() + .await? + .error_for_status()?; + + let body = resp.bytes().await?; + + debug!(bytes = body.len(), "retrieved response body"); + trace!(body = %String::from_utf8_lossy(&body), "response body"); + + serde_json::from_slice(&body).map_err(Into::into) + } + + async fn fetch_oauth_token( + &self, + ) -> eyre::Result> { + let client = BasicClient::new( + ClientId::new(self.config.oauth_client_id.clone()), + Some(ClientSecret::new(self.config.oauth_client_secret.clone())), + AuthUrl::new(self.config.oauth_authenticate_url.clone())?, + Some(TokenUrl::new(self.config.oauth_token_url.clone())?), + ); + + let token_result = client + .exchange_client_credentials() + .add_extra_param(OAUTH_AUDIENCE_CLAIM, self.config.oauth_audience.clone()) + .request(http_client)?; + + Ok(token_result) + } + + #[instrument(skip_all)] + async fn construct_sig_request(&self, contents: &InProgressBlock) -> eyre::Result { + let ru_chain_id = U256::from(self.config.ru_chain_id); + let block_height = self.host_block_height().await?; + + Ok(SignRequest { + host_block_number: U256::from(block_height), + host_chain_id: U256::from(self.config.host_chain_id), + ru_chain_id, + gas_limit: U256::from(self.config.rollup_block_gas_limit), + ru_reward_address: self.config.builder_rewards_address, + contents: contents.contents_hash(), + }) + } + + fn build_blob_tx( + &self, + header: Zenith::BlockHeader, + v: u8, + r: FixedBytes<32>, + s: FixedBytes<32>, + in_progress: &InProgressBlock, + ) -> eyre::Result { + let data = Zenith::submitBlockCall { header, v, r, s, _4: Default::default() }.abi_encode(); + let sidecar = in_progress.encode_blob::().build()?; + Ok(TransactionRequest::default().with_blob_sidecar(sidecar).with_input(data)) + } + + async fn host_block_height(&self) -> eyre::Result { + let result = self.provider.get_block_number().await?; + Ok(result) + } + + async fn submit_transaction( + &self, + resp: &SignResponse, + in_progress: &InProgressBlock, + ) -> eyre::Result<()> { + let v: u8 = resp.sig.v().y_parity_byte() + 27; + let r: FixedBytes<32> = resp.sig.r().into(); + let s: FixedBytes<32> = resp.sig.s().into(); + + let header = Zenith::BlockHeader { + hostBlockNumber: resp.req.host_block_number, + rollupChainId: U256::from(self.config.ru_chain_id), + gasLimit: resp.req.gas_limit, + rewardAddress: resp.req.ru_reward_address, + blockDataHash: in_progress.contents_hash(), + }; + + let tx = self + .build_blob_tx(header, v, r, s, in_progress)? + .with_from(self.provider.default_signer_address()) + .with_to(self.config.zenith_address); + + if let Err(TransportError::ErrorResp(e)) = self.provider.call(&tx).await { + error!( + code = e.code, + message = %e.message, + data = ?e.data, + "error in transaction submission" + ); + + bail!("bailing transaction submission") + } + + tracing::debug!( + host_block_number = %resp.req.host_block_number, + gas_limit = %resp.req.gas_limit, + "sending transaction to network" + ); + + let result = self.provider.send_transaction(tx).await?; + + let tx_hash = result.tx_hash(); + + tracing::info!( + %tx_hash, + ru_chain_id = %resp.req.ru_chain_id, + gas_limit = %resp.req.gas_limit, + "dispatched to network" + ); + + Ok(()) + } + + #[instrument(skip_all, err)] + async fn handle_inbound(&self, in_progress: &InProgressBlock) -> eyre::Result<()> { + tracing::info!(txns = in_progress.len(), "handling inbound block"); + let sig_request = self.construct_sig_request(in_progress).await?; + + tracing::debug!( + host_block_number = %sig_request.host_block_number, + ru_chain_id = %sig_request.ru_chain_id, + "constructed signature request for host block" + ); + + // If configured with a local signer, we use it. Otherwise, we ask + // quincey (politely) + let signed = if let Some(signer) = &self.sequencer_signer { + let sig = signer.sign_hash(&sig_request.signing_hash()).await?; + tracing::debug!( + sig = hex::encode(sig.as_bytes()), + "acquired signature from local signer" + ); + SignResponse { req: sig_request, sig } + } else { + let resp: SignResponse = self.sup_quincey(&sig_request).await?; + tracing::debug!( + sig = hex::encode(resp.sig.as_bytes()), + "acquired signature from quincey" + ); + resp + }; + + self.submit_transaction(&signed, in_progress).await + } + + /// Spawn the task. + pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { + let (sender, mut inbound) = mpsc::unbounded_channel(); + let handle = tokio::spawn(async move { + loop { + if let Some(in_progress) = inbound.recv().await { + if let Err(e) = self.handle_inbound(&in_progress).await { + error!(%e, "error in block submission. Dropping block."); + } + } else { + tracing::debug!("upstream task gone"); + break; + } + } + }); + + (sender, handle) + } +} diff --git a/src/tasks/tx_poller.rs b/src/tasks/tx_poller.rs new file mode 100644 index 0000000..ca7829b --- /dev/null +++ b/src/tasks/tx_poller.rs @@ -0,0 +1,117 @@ +use std::time::Duration; +use std::{collections::HashMap, time}; + +use alloy::consensus::TxEnvelope; +use alloy_primitives::TxHash; + +use eyre::Error; +use reqwest::{Client, Url}; +use serde::{Deserialize, Serialize}; +use serde_json::from_slice; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +pub use crate::config::BuilderConfig; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TxPoolResponse { + transactions: Vec, +} + +/// Implements a poller for the block builder to pull transactions from the transaction pool. +pub struct TxPoller { + // config for the builder + pub config: BuilderConfig, + // Reqwest client for fetching transactions from the tx-pool + pub client: Client, + // Maintain a set of transaction hashes to their expiration times + pub seen_txns: HashMap, +} + +/// TxPoller implements a poller that fetches unique transactions from the transaction pool. +impl TxPoller { + /// returns a new TxPoller with the given config. + pub fn new(config: &BuilderConfig) -> Self { + Self { config: config.clone(), client: Client::new(), seen_txns: HashMap::new() } + } + + /// polls the tx-pool for unique transactions and evicts expired transactions. + /// unique transactions that haven't been seen before are sent into the builder pipeline. + pub async fn check_tx_pool(&mut self) -> Result, Error> { + let mut unique: Vec = Vec::new(); + + let url: Url = Url::parse(&self.config.tx_pool_url)?.join("transactions")?; + let result = self.client.get(url).send().await?; + let response: TxPoolResponse = from_slice(result.text().await?.as_bytes())?; + + response.transactions.iter().for_each(|entry| { + self.check_cache(entry.clone(), &mut unique); + }); + + Ok(unique) + } + + /// checks if the transaction has been seen before and if not, adds it to the unique transactions list. + fn check_cache(&mut self, tx: TxEnvelope, unique: &mut Vec) { + self.seen_txns.entry(*tx.tx_hash()).or_insert_with(|| { + // add to unique transactions + unique.push(tx.clone()); + // expiry is now + cache_duration + time::Instant::now() + Duration::from_secs(self.config.tx_pool_cache_duration) + }); + } + + /// removes entries from seen_txns that have lived past expiry + fn evict(&mut self) { + let expired_keys: Vec = self + .seen_txns + .iter() + .filter_map( + |(key, &expiration)| { + if !expiration.elapsed().is_zero() { + Some(*key) + } else { + None + } + }, + ) + .collect(); + + for key in expired_keys { + self.seen_txns.remove(&key); + } + } + + /// spawns a task that polls the tx-pool for unique transactions and ingests them into the tx_channel. + pub fn spawn(mut self, tx_channel: mpsc::UnboundedSender) -> JoinHandle<()> { + let handle: JoinHandle<()> = tokio::spawn(async move { + loop { + let channel = tx_channel.clone(); + let txns = self.check_tx_pool().await; + + // send recently discovered transactions to the builder pipeline + match txns { + Ok(txns) => { + for txn in txns.into_iter() { + let result = channel.send(txn); + if result.is_err() { + tracing::debug!("tx_poller failed to send tx"); + continue; + } + } + } + Err(e) => { + println!("Error polling transactions: {}", e); + } + } + + // evict expired txns once every loop + self.evict(); + + tokio::time::sleep(Duration::from_secs(self.config.tx_pool_poll_interval)).await; + } + }); + + handle + } +} diff --git a/tests/tx_poller_test.rs b/tests/tx_poller_test.rs new file mode 100644 index 0000000..f9378b9 --- /dev/null +++ b/tests/tx_poller_test.rs @@ -0,0 +1,91 @@ +mod tests { + use std::str::FromStr; + + use alloy::consensus::{SignableTransaction, TxEip1559, TxEnvelope}; + use alloy::signers::{local::PrivateKeySigner, SignerSync}; + use alloy_primitives::{bytes, Address, TxKind, U256}; + use builder::config::BuilderConfig; + use builder::tasks::{block::BlockBuilder, tx_poller}; + use eyre::Result; + + #[ignore = "integration test"] + #[tokio::test] + async fn test_tx_roundtrip() -> Result<()> { + // Create a new test environment + let (_, config) = setup_test_builder().await?; + + // Post a transaction to the cache + post_tx(&config).await?; + + // Create a new poller + let mut poller = tx_poller::TxPoller::new(&config); + + // Fetch transactions the pool + let transactions = poller.check_tx_pool().await?; + + // Ensure at least one transaction exists + assert!(!transactions.is_empty()); + + Ok(()) + } + + async fn post_tx(config: &BuilderConfig) -> Result<()> { + let client = reqwest::Client::new(); + let wallet = PrivateKeySigner::random(); + let tx_envelope = new_test_tx(&wallet)?; + + let url = format!("{}/transactions", config.tx_pool_url); + let response = client.post(&url).json(&tx_envelope).send().await?; + + if !response.status().is_success() { + let error_text = response.text().await?; + eyre::bail!("Failed to post transaction: {}", error_text); + } + + Ok(()) + } + + // Returns a new signed test transaction with default values + fn new_test_tx(wallet: &PrivateKeySigner) -> Result { + let tx = TxEip1559 { + chain_id: 17001, + nonce: 1, + gas_limit: 50000, + to: TxKind::Call( + Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), + ), + value: U256::from(1_f64), + input: bytes!(""), + ..Default::default() + }; + let signature = wallet.sign_hash_sync(&tx.signature_hash())?; + Ok(TxEnvelope::Eip1559(tx.into_signed(signature))) + } + + // Sets up a block builder with test values + async fn setup_test_builder() -> Result<(BlockBuilder, BuilderConfig)> { + let config = BuilderConfig { + host_chain_id: 17000, + ru_chain_id: 17001, + host_rpc_url: "http://rpc.holesky.signet.sh".into(), + zenith_address: Address::default(), + quincey_url: "http://localhost:8080".into(), + builder_port: 8080, + sequencer_key: None, + builder_key: "0000000000000000000000000000000000000000000000000000000000000000".into(), + incoming_transactions_buffer: 1, + block_confirmation_buffer: 1, + builder_rewards_address: Address::default(), + rollup_block_gas_limit: 100_000, + tx_pool_url: "http://localhost:9000/".into(), + tx_pool_cache_duration: 5, + tx_pool_poll_interval: 5, + oauth_client_id: "some_client_id".into(), + oauth_client_secret: "some_client_secret".into(), + oauth_authenticate_url: "http://localhost:8080".into(), + oauth_token_url: "http://localhost:8080".into(), + oauth_audience: "https://transactions.holesky.signet.sh".into(), + }; + Ok((BlockBuilder::new(&config), config)) + } +} From aa92400a26fb2edb0bfab58ceae1fdd04a541c00 Mon Sep 17 00:00:00 2001 From: Swanny Date: Mon, 28 Oct 2024 12:02:54 -0400 Subject: [PATCH 2/4] feat: readme and actions --- .github/workflows/ecr-cd.yml | 24 ++++++++++++++++ .github/workflows/rust-ci.yml | 11 +++++++ Dockerfile | 38 ++++++++++++++++++++++++ README.md | 54 ++++++++++++++++++++++++++++++++++- 4 files changed, 126 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/ecr-cd.yml create mode 100644 .github/workflows/rust-ci.yml create mode 100644 Dockerfile diff --git a/.github/workflows/ecr-cd.yml b/.github/workflows/ecr-cd.yml new file mode 100644 index 0000000..c74a94b --- /dev/null +++ b/.github/workflows/ecr-cd.yml @@ -0,0 +1,24 @@ +name: Docker ECR Push + +on: + push: + branches: [main] + workflow_dispatch: + + +permissions: + packages: write + contents: read + id-token: write + +# simplest example of using the rust-base action +jobs: + docker-ecr-push: + uses: init4tech/actions/.github/workflows/ecr-build-and-push.yml@main + with: + rust-binary-name: zenith-builder-example + environment: dev + secrets: + aws-ecr-repository: ${{ secrets.AWS_ECR_REPOSITORY }} + aws-eks-cluster: ${{ secrets.AWS_EKS_CLUSTER }} + aws-ecr-deployer-role-arn: ${{ secrets.AWS_ECR_DEPLOYER_ROLE_ARN }} diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml new file mode 100644 index 0000000..66a8553 --- /dev/null +++ b/.github/workflows/rust-ci.yml @@ -0,0 +1,11 @@ +name: Rust CI + +on: + push: + branches: [main] + pull_request: + +# simplest example of using the rust-base action +jobs: + rust-base: + uses: init4tech/actions/.github/workflows/rust-base.yml@main diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..6418a61 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,38 @@ +# syntax=docker/dockerfile:1.7-labs +### STAGE 0: Create base chef image for building +### cargo chef is used to speed up the build process by caching dependencies using docker +FROM --platform=$TARGETPLATFORM lukemathwalker/cargo-chef:latest-rust-latest as chef + +RUN cargo install cargo-chef + +WORKDIR /app + +### Stage 1: cargo chef prepare +### Creates the recipe.json file which is a manifest of Cargo.toml files and +### the relevant Cargo.lock file +FROM chef as planner +COPY --exclude=target . . +RUN cargo chef prepare + +### Stage 2: Build the project +### This stage builds the deps of the project (not the code) using cargo chef cook +### and then it copies the source code and builds the actual crates +### this takes advantage of docker layer caching to the max +FROM chef as builder +COPY --from=planner /app/recipe.json recipe.json +RUN apt-get update && apt-get -y upgrade && apt-get install -y gcc libclang-dev pkg-config libssl-dev +RUN rustup target add x86_64-unknown-linux-gnu +RUN rustup toolchain install stable-x86_64-unknown-linux-gnu + +RUN cargo chef cook --release --target x86_64-unknown-linux-gnu --recipe-path recipe.json --bin zenith-builder-example +COPY --exclude=target . . + +RUN cargo build --release --target x86_64-unknown-linux-gnu --bin zenith-builder-example + +# Stage 3: Final image for running in the env +FROM --platform=$TARGETPLATFORM debian:bookworm-slim +RUN apt-get update && apt-get -y upgrade && apt-get install -y libssl-dev ca-certificates + +COPY --from=builder /app/target/x86_64-unknown-linux-gnu/release/zenith-builder-example /usr/local/bin/zenith-builder-example + +ENTRYPOINT [ "/usr/local/bin/zenith-builder-example" ] \ No newline at end of file diff --git a/README.md b/README.md index 0ff18a6..ab648f2 100644 --- a/README.md +++ b/README.md @@ -1 +1,53 @@ -# builder \ No newline at end of file +# builder + +Our sample signet builder implementation. + +## Development + +This crate contains an example block builder in the Signet ecosystem. + +### Requirements + +- Rust 1.81.0 +- Cargo [Lambda](https://www.cargo-lambda.info/) +- AWS CLI and credentials + +### Environment + +The following environment variables are exposed to configure the Builder: + +```bash +# Builder Configs +HOST_CHAIN_ID="17000" # Holesky Testnet +RU_CHAIN_ID="17001" +HOST_RPC_URL="http://host.url.here" +ZENITH_ADDRESS="ZENITH_ADDRESS_HERE" +QUINCEY_URL="http://signer.url.here" +BUILDER_PORT="8080" +BUILDER_KEY="YOUR_BUILDER_KEY_HERE" +INCOMING_TRANSACTIONS_BUFFER="10" +BLOCK_CONFIRMATION_BUFFER="10" +BUILDER_REWARDS_ADDRESS="BUILDER_REWARDS_ADDRESS_HERE" +ROLLUP_BLOCK_GAS_LIMIT="30000000" +# Transaction Pool Configs +TX_POOL_URL="http://pool.url.here/" # trailing slash is required +TX_POOL_POLL_INTERVAL="5" # seconds +TX_POOL_CACHE_DURATION="600" # seconds +``` + +## API + +### SignRequest + +Sign request example payload: + +```json +{ + "hostBlockNumber": "0x0", + "hostChainId": "0x1", + "ruChainId": "0x2", + "gasLimit": "0x5", + "ruRewardAddress": "0x0606060606060606060606060606060606060606", + "contents": "0x0707070707070707070707070707070707070707070707070707070707070707" +} +``` From af0df0f14b81f86dfd03bb57b40fcf5d84b76765 Mon Sep 17 00:00:00 2001 From: Swanny Date: Mon, 28 Oct 2024 12:04:38 -0400 Subject: [PATCH 3/4] chore: fmt --- bin/builder.rs | 5 ++++- src/config.rs | 4 +++- src/service.rs | 15 ++++++++++++--- src/signer.rs | 4 +++- src/tasks/block.rs | 12 +++++++++--- src/tasks/submit.rs | 18 +++++++++++++++--- src/tasks/tx_poller.rs | 22 ++++++++++++---------- 7 files changed, 58 insertions(+), 22 deletions(-) diff --git a/bin/builder.rs b/bin/builder.rs index daf8294..d805b53 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -14,7 +14,10 @@ async fn main() -> eyre::Result<()> { let config = BuilderConfig::load_from_env()?; let provider = config.connect_provider().await?; - tracing::debug!(rpc_url = config.host_rpc_url.as_ref(), "instantiated provider"); + tracing::debug!( + rpc_url = config.host_rpc_url.as_ref(), + "instantiated provider" + ); let sequencer_signer = config.connect_sequencer_signer().await?; let zenith = config.connect_zenith(provider.clone()); diff --git a/src/config.rs b/src/config.rs index 5edb398..f89dd60 100644 --- a/src/config.rs +++ b/src/config.rs @@ -154,7 +154,9 @@ impl BuilderConfig { } pub async fn connect_builder_signer(&self) -> Result { - LocalOrAws::load(&self.builder_key, Some(self.host_chain_id)).await.map_err(Into::into) + LocalOrAws::load(&self.builder_key, Some(self.host_chain_id)) + .await + .map_err(Into::into) } pub async fn connect_sequencer_signer(&self) -> Result, ConfigError> { diff --git a/src/service.rs b/src/service.rs index 95e0fb3..8182151 100644 --- a/src/service.rs +++ b/src/service.rs @@ -30,18 +30,27 @@ pub struct AppError { impl AppError { /// Instantiate a new error with the bad request status code. pub fn bad_req(e: E) -> Self { - Self { code: StatusCode::BAD_REQUEST, eyre: e.into() } + Self { + code: StatusCode::BAD_REQUEST, + eyre: e.into(), + } } /// Instantiate a new error with the bad request status code and an error /// string. pub fn bad_req_str(e: &str) -> Self { - Self { code: StatusCode::BAD_REQUEST, eyre: eyre::eyre!(e.to_owned()) } + Self { + code: StatusCode::BAD_REQUEST, + eyre: eyre::eyre!(e.to_owned()), + } } /// Instantiate a new error with the internal server error status code. pub fn server_err(e: E) -> Self { - Self { code: StatusCode::INTERNAL_SERVER_ERROR, eyre: e.into() } + Self { + code: StatusCode::INTERNAL_SERVER_ERROR, + eyre: e.into(), + } } } diff --git a/src/signer.rs b/src/signer.rs index 392d64d..52a5e7e 100644 --- a/src/signer.rs +++ b/src/signer.rs @@ -50,7 +50,9 @@ impl LocalOrAws { async fn aws_signer(key_id: &str, chain_id: Option) -> Result { let config = aws_config::load_defaults(BehaviorVersion::latest()).await; let client = aws_sdk_kms::Client::new(&config); - AwsSigner::new(client, key_id.to_string(), chain_id).await.map_err(Into::into) + AwsSigner::new(client, key_id.to_string(), chain_id) + .await + .map_err(Into::into) } } diff --git a/src/tasks/block.rs b/src/tasks/block.rs index 31f5956..6f27729 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -19,7 +19,11 @@ pub struct InProgressBlock { impl InProgressBlock { /// Create a new `InProgressBlock` pub fn new() -> Self { - Self { transactions: Vec::new(), raw_encoding: OnceLock::new(), hash: OnceLock::new() } + Self { + transactions: Vec::new(), + raw_encoding: OnceLock::new(), + hash: OnceLock::new(), + } } /// Get the number of transactions in the block. @@ -40,8 +44,10 @@ impl InProgressBlock { /// Seal the block by encoding the transactions and calculating the contentshash. fn seal(&self) { - self.raw_encoding.get_or_init(|| encode_txns::(&self.transactions).into()); - self.hash.get_or_init(|| keccak256(self.raw_encoding.get().unwrap().as_ref())); + self.raw_encoding + .get_or_init(|| encode_txns::(&self.transactions).into()); + self.hash + .get_or_init(|| keccak256(self.raw_encoding.get().unwrap().as_ref())); } /// Ingest a transaction into the in-progress block. Fails diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index 302b87a..ea153b8 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -109,9 +109,18 @@ impl SubmitTask { s: FixedBytes<32>, in_progress: &InProgressBlock, ) -> eyre::Result { - let data = Zenith::submitBlockCall { header, v, r, s, _4: Default::default() }.abi_encode(); + let data = Zenith::submitBlockCall { + header, + v, + r, + s, + _4: Default::default(), + } + .abi_encode(); let sidecar = in_progress.encode_blob::().build()?; - Ok(TransactionRequest::default().with_blob_sidecar(sidecar).with_input(data)) + Ok(TransactionRequest::default() + .with_blob_sidecar(sidecar) + .with_input(data)) } async fn host_block_height(&self) -> eyre::Result { @@ -191,7 +200,10 @@ impl SubmitTask { sig = hex::encode(sig.as_bytes()), "acquired signature from local signer" ); - SignResponse { req: sig_request, sig } + SignResponse { + req: sig_request, + sig, + } } else { let resp: SignResponse = self.sup_quincey(&sig_request).await?; tracing::debug!( diff --git a/src/tasks/tx_poller.rs b/src/tasks/tx_poller.rs index ca7829b..c1809fc 100644 --- a/src/tasks/tx_poller.rs +++ b/src/tasks/tx_poller.rs @@ -32,7 +32,11 @@ pub struct TxPoller { impl TxPoller { /// returns a new TxPoller with the given config. pub fn new(config: &BuilderConfig) -> Self { - Self { config: config.clone(), client: Client::new(), seen_txns: HashMap::new() } + Self { + config: config.clone(), + client: Client::new(), + seen_txns: HashMap::new(), + } } /// polls the tx-pool for unique transactions and evicts expired transactions. @@ -66,15 +70,13 @@ impl TxPoller { let expired_keys: Vec = self .seen_txns .iter() - .filter_map( - |(key, &expiration)| { - if !expiration.elapsed().is_zero() { - Some(*key) - } else { - None - } - }, - ) + .filter_map(|(key, &expiration)| { + if !expiration.elapsed().is_zero() { + Some(*key) + } else { + None + } + }) .collect(); for key in expired_keys { From 3a65f157640a92cc2187564f43251a018d92bb30 Mon Sep 17 00:00:00 2001 From: Swanny Date: Mon, 28 Oct 2024 12:08:26 -0400 Subject: [PATCH 4/4] fix: tags for ecr --- .github/workflows/ecr-cd.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ecr-cd.yml b/.github/workflows/ecr-cd.yml index c74a94b..ab74aa8 100644 --- a/.github/workflows/ecr-cd.yml +++ b/.github/workflows/ecr-cd.yml @@ -3,6 +3,8 @@ name: Docker ECR Push on: push: branches: [main] + tags: + - v** workflow_dispatch: