// Copyright 2019-2025 PureStake Inc.
// This file is part of Moonbeam.
// Moonbeam is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Moonbeam is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Moonbeam. If not, see <http://www.gnu.org/licenses/>.
//! `trace_filter` RPC handler and its associated service task.
//! The RPC handler relies on `CacheTask`, which provides a future that must be run inside a tokio
//! executor.
//!
//! The implementation is composed of multiple tasks:
//! - Many calls to the RPC handler `Trace::filter`, each communicating with the main task.
//! - A main `CacheTask` managing the cache and the communication between tasks.
//! - For each traced block, an async task responsible for waiting for a permit, spawning a
//!   blocking task, waiting for its result, and sending it to the main `CacheTask`.
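//!
//! Example wiring (a rough sketch: the surrounding service code, variable names and
//! parameter values below are assumptions, not part of this module):
//!
//! ```ignore
//! // Build the cache task and obtain its requester handle.
//! let (cache_task, cache_requester) = CacheTask::create(
//!     client.clone(),
//!     backend.clone(),
//!     200 * 1024 * 1024,            // cache size in bytes (assumed value)
//!     Arc::new(Semaphore::new(10)), // max concurrent block traces (assumed value)
//!     overrides.clone(),
//!     prometheus_registry.clone(),
//!     spawn_handle.clone(),
//! );
//! // The returned future must be driven by the node's tokio executor.
//! spawn_handle.spawn("trace-filter-cache", Some("trace-filter"), cache_task);
//!
//! // The RPC handler only needs the requester handle.
//! let trace = Trace::new(client, cache_requester, 500, 1000);
//! io.merge(trace.into_rpc())?;
//! ```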
use futures::{select, FutureExt};
use std::{
collections::{BTreeMap, HashMap},
future::Future,
marker::PhantomData,
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
	sync::{mpsc, oneshot, Semaphore},
	time::interval,
};
use tracing::{instrument, Instrument};
use sc_client_api::backend::{Backend, StateBackend, StorageProvider};
use sc_service::SpawnTaskHandle;
use sp_api::{ApiExt, Core, ProvideRuntimeApi};
use sp_block_builder::BlockBuilder;
use sp_blockchain::{
	Backend as BlockchainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
};
use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Header as HeaderT};
use substrate_prometheus_endpoint::Registry as PrometheusRegistry;
use ethereum_types::H256;
use fc_rpc::lru_cache::LRUCacheByteLimited;
use fc_storage::StorageOverride;
use fp_rpc::EthereumRuntimeRPCApi;
use moonbeam_client_evm_tracing::{
	formatters::ResponseFormatter,
	types::block::{self, TransactionTrace},
};
pub use moonbeam_rpc_core_trace::{FilterRequest, TraceServer};
use moonbeam_rpc_core_types::{RequestBlockId, RequestBlockTag};
use moonbeam_rpc_primitives_debug::DebugRuntimeApi;
/// Internal type for trace results from blocking tasks
type TxsTraceRes = Result<Vec<TransactionTrace>, String>;
/// Type for trace results sent to requesters (Arc-wrapped for zero-copy sharing)
/// Both success (traces) and error (message) are Arc-wrapped to avoid cloning
/// when multiple waiters are waiting for the same block.
type SharedTxsTraceRes = Result<Arc<Vec<TransactionTrace>>, Arc<String>>;
/// Log target for trace cache operations
const CACHE_LOG_TARGET: &str = "trace-cache";
/// Maximum time allowed for tracing a single block.
const TRACING_TIMEOUT_SECS: u64 = 60;
/// RPC handler. Will communicate with a `CacheTask` through a `CacheRequester`.
pub struct Trace<B, C> {
_phantom: PhantomData<B>,
client: Arc<C>,
requester: CacheRequester,
max_count: u32,
max_block_range: u32,
}
impl<B, C> Clone for Trace<B, C> {
fn clone(&self) -> Self {
Self {
_phantom: PhantomData,
client: Arc::clone(&self.client),
requester: self.requester.clone(),
max_count: self.max_count,
			max_block_range: self.max_block_range,
		}
	}
}

impl<B, C> Trace<B, C>
where
B: BlockT<Hash = H256> + Send + Sync + 'static,
B::Header: HeaderT<Number = u32>,
C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
C: Send + Sync + 'static,
{
/// Create a new RPC handler.
	pub fn new(client: Arc<C>, requester: CacheRequester, max_count: u32, max_block_range: u32) -> Self {
		Self {
			_phantom: PhantomData,
			client,
			requester,
			max_count,
			max_block_range,
		}
	}
/// Convert an optional block ID (number or tag) to a block height.
fn block_id(&self, id: Option<RequestBlockId>) -> Result<u32, &'static str> {
match id {
Some(RequestBlockId::Number(n)) => Ok(n),
None | Some(RequestBlockId::Tag(RequestBlockTag::Latest)) => {
				Ok(self.client.info().best_number)
			}
			Some(RequestBlockId::Tag(RequestBlockTag::Earliest)) => Ok(0),
			Some(RequestBlockId::Tag(RequestBlockTag::Finalized)) => {
				Ok(self.client.info().finalized_number)
			}
			Some(RequestBlockId::Tag(RequestBlockTag::Pending)) => {
				Err("'pending' is not supported")
			}
			Some(RequestBlockId::Hash(_)) => Err("Block hash not supported"),
		}
	}
/// `trace_filter` endpoint (wrapped in the trait implementation with futures compatibility)
async fn filter(self, req: FilterRequest) -> TxsTraceRes {
let from_block = self.block_id(req.from_block)?;
let to_block = self.block_id(req.to_block)?;
// Validate block range to prevent abuse
let block_range = to_block.saturating_sub(from_block);
		if block_range > self.max_block_range {
			return Err(format!(
				"block range is too wide (maximum {})",
				self.max_block_range
			));
		}

		let block_heights = from_block..=to_block;

		let count = req.count.unwrap_or(self.max_count);
		if count > self.max_count {
			return Err(format!(
				"count ({}) can't be greater than maximum ({})",
				count, self.max_count
			));
		}
// Build a list of all the Substrate block hashes that need to be traced.
let mut block_hashes = vec![];
for block_height in block_heights {
			if block_height == 0 {
				continue; // no traces for genesis block.
			}

			let block_hash = self
				.client
				.hash(block_height)
				.map_err(|e| {
					format!(
						"Error when fetching block {} header: {:?}",
						block_height, e
					)
				})?
				.ok_or_else(|| format!("Block with height {} doesn't exist", block_height))?;

			block_hashes.push(block_hash);
		}

		// Fetch traces for all blocks
		self.fetch_traces(req, &block_hashes, count as usize).await
	}
async fn fetch_traces(
&self,
req: FilterRequest,
block_hashes: &[H256],
count: usize,
) -> TxsTraceRes {
let from_address = req.from_address.unwrap_or_default();
let to_address = req.to_address.unwrap_or_default();
let mut traces_amount: i64 = -(req.after.unwrap_or(0) as i64);
let mut traces = vec![];
for &block_hash in block_hashes {
// Request the traces of this block to the cache service.
// This will resolve quickly if the block is already cached, or wait until the block
// has finished tracing.
let block_traces = self
.requester
.get_traces(block_hash)
.await
.map_err(|arc_error| (*arc_error).clone())?;
// Filter addresses.
let mut block_traces: Vec<_> = block_traces
.iter()
				.filter(|trace| match trace.action {
					block::TransactionTraceAction::Call { from, to, .. } => {
						(from_address.is_empty() || from_address.contains(&from))
							&& (to_address.is_empty() || to_address.contains(&to))
					}
					block::TransactionTraceAction::Create { from, .. } => {
						(from_address.is_empty() || from_address.contains(&from))
							&& to_address.is_empty()
					}
					block::TransactionTraceAction::Suicide { address, .. } => {
						(from_address.is_empty() || from_address.contains(&address))
							&& to_address.is_empty()
					}
				})
.cloned()
.collect();
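
			// Illustrative example of the "after" bookkeeping below: with `after = 10`
			// and two blocks holding 7 and 8 traces, the first block leaves
			// `traces_amount` at -3 (fully skipped), the second brings it to +5, so only
			// the last 5 of that block's 8 traces are appended.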
// Don't insert anything if we're still before "after"
traces_amount += block_traces.len() as i64;
			if traces_amount > 0 {
				let traces_amount = traces_amount as usize;
				// If the current Vec of traces is across the "after" marker,
				// we skip some elements of it.
				if traces_amount < block_traces.len() {
					let skip = block_traces.len() - traces_amount;
					block_traces = block_traces.into_iter().skip(skip).collect();
				}
				traces.append(&mut block_traces);

				// If we go over "count" (the limit), we trim and exit the loop,
				// unless we used the default maximum, in which case we return an error.
				if traces_amount >= count {
					if req.count.is_none() {
						return Err(format!(
							"the amount of traces goes over the maximum ({}), please use 'after' \
							and 'count' in your request",
							self.max_count
						));
					}
					traces = traces.into_iter().take(count).collect();
					break;
				}
			}
		}

		Ok(traces)
	}
#[jsonrpsee::core::async_trait]
impl<B, C> TraceServer for Trace<B, C>
where
	B: BlockT<Hash = H256> + Send + Sync + 'static,
	B::Header: HeaderT<Number = u32>,
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
	C: Send + Sync + 'static,
{
	async fn filter(
		&self,
		filter: FilterRequest,
	) -> jsonrpsee::core::RpcResult<Vec<TransactionTrace>> {
		self.clone().filter(filter).await.map_err(fc_rpc::internal_err)
	}
}
/// Requests the cache task can accept.
enum CacheRequest {
/// Fetch the traces for given block hash.
/// The task will answer only when it has processed this block.
GetTraces {
/// Returns the array of traces or an error (Arc-wrapped for zero-copy sharing).
sender: oneshot::Sender<SharedTxsTraceRes>,
/// Hash of the block.
block: H256,
	},
}

/// Allows interaction with the cache task.
#[derive(Clone)]
pub struct CacheRequester(mpsc::Sender<CacheRequest>);
impl CacheRequester {
/// If the block is already cached, returns immediately.
/// If the block is being traced, waits for the result.
/// If the block is not cached, triggers tracing and waits for the result.
/// Returns Arc-wrapped traces for zero-copy sharing.
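	///
	/// Usage sketch (illustrative; the surrounding async context and variable names
	/// are assumptions):
	///
	/// ```ignore
	/// // Two concurrent requests for the same block share one tracing run: whichever
	/// // call reaches the cache task second joins the wait list for that block
	/// // instead of spawning a new trace task.
	/// let (a, b) = futures::join!(
	///     requester.get_traces(block_hash),
	///     requester.get_traces(block_hash),
	/// );
	/// ```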
#[instrument(skip(self))]
pub async fn get_traces(&self, block: H256) -> SharedTxsTraceRes {
let (response_tx, response_rx) = oneshot::channel();
let sender = self.0.clone();
		sender
			.send(CacheRequest::GetTraces {
				sender: response_tx,
				block,
			})
			.await
			.map_err(|e| {
				Arc::new(format!(
					"Trace cache task is overloaded or closed. Error: {:?}",
					e
				))
			})?;

		response_rx
			.await
			.map_err(|e| {
				Arc::new(format!(
					"Trace cache task closed the response channel. Error: {:?}",
					e
				))
			})?
			.map_err(|arc_error| {
				Arc::new(format!("Failed to replay block. Error: {:?}", arc_error))
			})
	}
}
/// Entry in the wait list for a block being traced.
struct WaitListEntry {
/// Time when this entry was created
created_at: Instant,
/// All requests waiting for this block to be traced
	waiters: Vec<oneshot::Sender<SharedTxsTraceRes>>,
}
/// Wait list for requests pending the same block trace.
/// Multiple concurrent requests for the same block will be added to this list
/// and all will receive the result once tracing completes.
type WaitList = HashMap<H256, WaitListEntry>;
/// Message sent from blocking trace tasks back to the main cache task.
enum BlockingTaskMessage {
/// The tracing is finished and the result is sent to the main task.
Finished {
block_hash: H256,
result: TxsTraceRes,
		duration: Duration,
	},
}
/// Prometheus metrics for trace filter cache operations.
struct CacheMetrics {
/// Current size of the wait list (number of blocks being traced)
wait_list_size: substrate_prometheus_endpoint::Gauge<substrate_prometheus_endpoint::U64>,
/// Total requests that joined an existing wait list entry (deduplication)
wait_list_joins_total:
substrate_prometheus_endpoint::Counter<substrate_prometheus_endpoint::U64>,
/// Total trace tasks spawned
tasks_spawned_total: substrate_prometheus_endpoint::Counter<substrate_prometheus_endpoint::U64>,
/// Total trace operations that timed out
timeouts_total: substrate_prometheus_endpoint::Counter<substrate_prometheus_endpoint::U64>,
/// Histogram of trace operation durations in seconds
	trace_duration_seconds: substrate_prometheus_endpoint::Histogram,
}
impl CacheMetrics {
fn register(
registry: &PrometheusRegistry,
) -> Result<Self, substrate_prometheus_endpoint::PrometheusError> {
Ok(Self {
			wait_list_size: substrate_prometheus_endpoint::register(
				substrate_prometheus_endpoint::Gauge::new(
					"trace_filter_wait_list_size",
					"Current number of blocks in the wait list being traced",
				)?,
				registry,
			)?,
			wait_list_joins_total: substrate_prometheus_endpoint::register(
				substrate_prometheus_endpoint::Counter::new(
					"trace_filter_wait_list_joins_total",
					"Total requests that joined an existing wait list entry",
				)?,
				registry,
			)?,
			tasks_spawned_total: substrate_prometheus_endpoint::register(
				substrate_prometheus_endpoint::Counter::new(
					"trace_filter_tasks_spawned_total",
					"Total trace tasks spawned",
				)?,
				registry,
			)?,
			timeouts_total: substrate_prometheus_endpoint::register(
				substrate_prometheus_endpoint::Counter::new(
					"trace_filter_timeouts_total",
					"Total trace operations that timed out",
				)?,
				registry,
			)?,
			trace_duration_seconds: substrate_prometheus_endpoint::register(
				substrate_prometheus_endpoint::Histogram::with_opts(
					substrate_prometheus_endpoint::HistogramOpts::new(
						"trace_filter_trace_duration_seconds",
						"Histogram of trace operation durations in seconds",
					)
					.buckets(vec![0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]),
				)?,
				registry,
			)?,
		})
	}
}
/// Type wrapper for the cache task, generic over the Client, Block and Backend types.
pub struct CacheTask<B, C, BE> {
	client: Arc<C>,
	backend: Arc<BE>,
	blocking_permits: Arc<Semaphore>,
	cache: LRUCacheByteLimited<H256, Arc<Vec<TransactionTrace>>>,
	wait_list: WaitList,
	metrics: Option<CacheMetrics>,
	_phantom: PhantomData<B>,
}
impl<B, C, BE> CacheTask<B, C, BE>
where
	BE: Backend<B> + 'static,
	BE::State: StateBackend<BlakeTwo256>,
	C: ProvideRuntimeApi<B>,
	C: StorageProvider<B, BE>,
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
	C: Send + Sync + 'static,
	B: BlockT<Hash = H256> + Send + Sync + 'static,
	B::Header: HeaderT<Number = u32>,
	C::Api: BlockBuilder<B>,
	C::Api: DebugRuntimeApi<B>,
	C::Api: EthereumRuntimeRPCApi<B>,
	C::Api: ApiExt<B>,
{
/// Create a new cache task.
///
/// Returns a Future that needs to be added to a tokio executor, and a handle allowing to
/// send requests to the task.
	pub fn create(
		client: Arc<C>,
		backend: Arc<BE>,
		cache_size_bytes: u64,
		blocking_permits: Arc<Semaphore>,
		overrides: Arc<dyn StorageOverride<B>>,
		prometheus: Option<PrometheusRegistry>,
		spawn_handle: SpawnTaskHandle,
	) -> (impl Future<Output = ()>, CacheRequester) {
// Communication with the outside world - bounded channel to prevent memory exhaustion
let (requester_tx, mut requester_rx) = mpsc::channel(10_000);
// Task running in the service.
let task = async move {
let (blocking_tx, mut blocking_rx) =
mpsc::channel(blocking_permits.available_permits().saturating_mul(2));
// Periodic cleanup interval for orphaned wait list entries
let mut cleanup_interval = interval(Duration::from_secs(30));
cleanup_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
// Register metrics if prometheus registry is provided
let metrics =
prometheus
.as_ref()
.and_then(|registry| match CacheMetrics::register(registry) {
Ok(metrics) => Some(metrics),
					Err(e) => {
						log::warn!(
							target: CACHE_LOG_TARGET,
							"Failed to register trace filter metrics: {:?}",
							e
						);
						None
					}
				});
			let mut inner = Self {
				client,
				backend,
				blocking_permits,
				cache: LRUCacheByteLimited::new(
					"trace-filter-blocks-cache",
					cache_size_bytes,
					prometheus,
				),
				wait_list: HashMap::new(),
				metrics,
				_phantom: Default::default(),
			};

			loop {
				select! {
					request = requester_rx.recv().fuse() => {
						match request {
							None => break,
							Some(CacheRequest::GetTraces {sender, block}) =>
								inner.request_get_traces(&blocking_tx, sender, block, overrides.clone(), &spawn_handle),
						}
					},
					message = blocking_rx.recv().fuse() => {
						if let Some(BlockingTaskMessage::Finished { block_hash, result, duration }) = message {
							inner.blocking_finished(block_hash, result, duration);
						}
					},
					_ = cleanup_interval.tick().fuse() => {
						inner.cleanup_wait_list();
					},
				}
			}
		}
		.instrument(tracing::debug_span!("trace_filter_cache"));

		(task, CacheRequester(requester_tx))
	}
/// Handle a request to get traces for a specific block.
/// - If cached: respond immediately
/// - If pending: add to wait list
/// - If new: spawn trace task and add to wait list
	fn request_get_traces(
		&mut self,
		blocking_tx: &mpsc::Sender<BlockingTaskMessage>,
		sender: oneshot::Sender<SharedTxsTraceRes>,
		block: H256,
		overrides: Arc<dyn StorageOverride<B>>,
		spawn_handle: &SpawnTaskHandle,
	) {
		log::trace!(
			target: CACHE_LOG_TARGET,
			"Request received: block={}, wait_list_size={}",
			block,
			self.wait_list.len()
		);

		// Check if block is already cached
		if let Some(cached) = self.cache.get(&block) {
			log::trace!(
				target: CACHE_LOG_TARGET,
				"Cache hit: block={}",
				block
			);
			// Cache hit - respond immediately with Arc::clone (cheap)
			let _ = sender.send(Ok(Arc::clone(&cached)));
			return;
		}
		// Check if block is currently being traced
		if let Some(entry) = self.wait_list.get_mut(&block) {
			log::trace!(
				target: CACHE_LOG_TARGET,
				"Joining wait list: block={}, waiters={}",
				block,
				entry.waiters.len()
			);
			entry.waiters.push(sender);
			// Increment deduplication metric
			if let Some(ref metrics) = self.metrics {
				metrics.wait_list_joins_total.inc();
			}
			return;
		}
		// Add sender to wait list for this new block
		self.wait_list.insert(
			block,
			WaitListEntry {
				created_at: Instant::now(),
				waiters: vec![sender],
			},
		);

		log::debug!(
			target: CACHE_LOG_TARGET,
			"Spawning trace task: block={}, available_permits={}",
			block,
			self.blocking_permits.available_permits()
		);

		// Update metrics
		if let Some(ref metrics) = self.metrics {
			metrics.tasks_spawned_total.inc();
			metrics.wait_list_size.set(self.wait_list.len() as u64);
		}
		// Spawn worker task to trace the block
		let blocking_permits = Arc::clone(&self.blocking_permits);
		let client = Arc::clone(&self.client);
		let backend = Arc::clone(&self.backend);
		let blocking_tx = blocking_tx.clone();
		let start_time = Instant::now();

		spawn_handle.spawn(
			"trace-block",
			Some("trace-filter"),
			async move {
				// Wait for permit to limit concurrent tracing operations
				let _permit = blocking_permits.acquire().await;

				// Perform block tracing in blocking task with timeout
				let result = match tokio::time::timeout(
					Duration::from_secs(TRACING_TIMEOUT_SECS),
					tokio::task::spawn_blocking(move || {
						Self::cache_block(client, backend, block, overrides)
					}),
				)
				.await
				{
					// Timeout occurred
					Err(_elapsed) => {
						log::error!(
							target: CACHE_LOG_TARGET,
							"Tracing timeout for block {}",
							block
						);
						Err(format!(
							"Tracing timeout after {} seconds",
							TRACING_TIMEOUT_SECS
						))
					}
					// Task completed
					Ok(join_result) => {
						match join_result {
							// Task panicked
							Err(join_err) => Err(format!("Tracing panicked: {:?}", join_err)),
							// Task succeeded, return its result
							Ok(trace_result) => trace_result,
						}
					}
				};

				// Send result back to main task
				let duration = start_time.elapsed();
				let _ = blocking_tx
					.send(BlockingTaskMessage::Finished {
						block_hash: block,
						result,
						duration,
					})
					.await;
			}
			.instrument(tracing::trace_span!("trace_block", block = %block)),
		);
	}
/// Handle completion of a block trace task.
/// Sends result to all waiting requests and caches it.
/// Uses Arc for zero-copy sharing across multiple waiters.
fn blocking_finished(&mut self, block_hash: H256, result: TxsTraceRes, duration: Duration) {
// Get all waiting senders for this block
if let Some(entry) = self.wait_list.remove(&block_hash) {
let waiter_count = entry.waiters.len();
			// Update wait list size metric
			if let Some(ref metrics) = self.metrics {
				metrics.wait_list_size.set(self.wait_list.len() as u64);
				metrics
					.trace_duration_seconds
					.observe(duration.as_secs_f64());
			}

			match result {
				Ok(traces) => {
					let trace_count = traces.len();
					// Wrap successful result in Arc once
					let arc_traces = Arc::new(traces);
					log::debug!(
						target: CACHE_LOG_TARGET,
						"Trace completed: block={}, traces={}, waiters={}, cached=true, duration={:?}",
						block_hash,
						trace_count,
						waiter_count,
						duration
					);
					// Send Arc::clone to all waiters (cheap pointer copy, no data duplication)
					for sender in entry.waiters {
						let _ = sender.send(Ok(Arc::clone(&arc_traces)));
					}
					// Cache the Arc-wrapped result
					self.cache.put(block_hash, arc_traces);
				}
				Err(error) => {
					log::warn!(
						target: CACHE_LOG_TARGET,
						"Trace failed: block={}, waiters={}, error={}",
						block_hash,
						waiter_count,
						error
					);
					// Wrap error in Arc once
					let arc_error = Arc::new(error);
					// Send Arc::clone to all waiters (cheap pointer copy, no string duplication)
					for sender in entry.waiters {
						let _ = sender.send(Err(Arc::clone(&arc_error)));
					}
				}
			}
		}
	}
/// Clean up orphaned wait list entries that have been pending too long.
/// This handles cases where spawned tasks panic or get cancelled.
fn cleanup_wait_list(&mut self) {
let timeout = Duration::from_secs(TRACING_TIMEOUT_SECS + 10);
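		// Note: the extra 10 seconds are a grace period on top of TRACING_TIMEOUT_SECS,
		// so this sweep only reclaims entries whose worker task can no longer report back
		// (panic or cancellation), not traces that are merely close to their own deadline.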
let now = Instant::now();
let mut to_remove = Vec::new();
		for (block_hash, entry) in &self.wait_list {
			if now.duration_since(entry.created_at) > timeout {
				log::warn!(
					target: CACHE_LOG_TARGET,
					"Cleaning up orphaned wait list entry for block {}",
					block_hash
				);
				to_remove.push(*block_hash);
			}
		}
		log::trace!(
			target: CACHE_LOG_TARGET,
			"Wait list status: active_blocks={}, timed_out_block_requests={}",
			self.wait_list.len(),
			to_remove.len()
		);
		// Increment timeout metric for each timed out block
		if !to_remove.is_empty() {
			if let Some(ref metrics) = self.metrics {
				for _ in &to_remove {
					metrics.timeouts_total.inc();
				}
			}
			// Remove timed-out entries and notify waiters
			let timeout_error =
				Arc::new("Trace request timeout (task failed or was cancelled)".to_string());
			for block_hash in to_remove {
				if let Some(entry) = self.wait_list.remove(&block_hash) {
					for sender in entry.waiters {
						let _ = sender.send(Err(Arc::clone(&timeout_error)));
					}
				}
			}
			// Update wait list size metric after cleanup
			if let Some(ref metrics) = self.metrics {
				metrics.wait_list_size.set(self.wait_list.len() as u64);
			}
		}
	}
/// (In blocking task) Use the Runtime API to trace the block.
#[instrument(skip(client, backend, overrides))]
	fn cache_block(
		client: Arc<C>,
		backend: Arc<BE>,
		substrate_hash: H256,
		overrides: Arc<dyn StorageOverride<B>>,
	) -> TxsTraceRes {
		// Get Substrate block data.
		let api = client.runtime_api();
		let block_header = client
			.header(substrate_hash)
			.map_err(|e| {
				format!(
					"Error when fetching substrate block {} header: {:?}",
					substrate_hash, e
				)
			})?
			.ok_or_else(|| format!("Substrate block {} doesn't exist", substrate_hash))?;
let height = *block_header.number();
let substrate_parent_hash = *block_header.parent_hash();
// Get Ethereum block data.
		let (eth_block, eth_transactions) = match (
			overrides.current_block(substrate_hash),
			overrides.current_transaction_statuses(substrate_hash),
		) {
			(Some(a), Some(b)) => (a, b),
			_ => {
				return Err(format!(
					"Failed to get Ethereum block data for Substrate block {}",
					substrate_hash
				))
			}
		};

		let eth_block_hash = eth_block.header.hash();
		let eth_tx_hashes = eth_transactions
			.iter()
			.map(|t| t.transaction_hash)
			.collect();
// Get extrinsics (containing Ethereum ones)
let extrinsics = backend
.blockchain()
			.body(substrate_hash)
			.map_err(|e| {
				format!(
					"Blockchain error when fetching extrinsics of block {}: {:?}",
					height, e
				)
			})?
			.ok_or_else(|| format!("Could not find block {} when fetching extrinsics.", height))?;
// Get DebugRuntimeApi version
let trace_api_version = if let Ok(Some(api_version)) =
			api.api_version::<dyn DebugRuntimeApi<B>>(substrate_parent_hash)
		{
			api_version
		} else {
			return Err("Runtime api version call failed (trace)".to_string());
		};
// Trace the block.
let f = || -> Result<_, String> {
let result = if trace_api_version >= 5 {
api.trace_block(
substrate_parent_hash,
extrinsics,
eth_tx_hashes,
					&block_header,
				)
			} else {
				// Get core runtime api version
				let core_api_version = if let Ok(Some(api_version)) =
					api.api_version::<dyn Core<B>>(substrate_parent_hash)
				{
					api_version
				} else {
					return Err("Runtime api version call failed (core)".to_string());
				};

				// Initialize block: calls the "on_initialize" hook on every pallet
				// in AllPalletsWithSystem
				// This was fine before pallet-message-queue because the XCM messages
				// were processed by the "setValidationData" inherent call and not on an
				// "on_initialize" hook, which runs before enabling XCM tracing
				if core_api_version >= 5 {
					api.initialize_block(substrate_parent_hash, &block_header)
						.map_err(|e| format!("Runtime api access error: {:?}", e))?;
				} else {
					#[allow(deprecated)]
					api.initialize_block_before_version_5(substrate_parent_hash, &block_header)
						.map_err(|e| format!("Runtime api access error: {:?}", e))?;
				}

				#[allow(deprecated)]
				api.trace_block_before_version_5(substrate_parent_hash, extrinsics, eth_tx_hashes)
			};

			result
				.map_err(|e| format!("Blockchain error when replaying block {}: {:?}", height, e))?
				.map_err(|e| {
					tracing::warn!(
						target: "tracing",
						"Internal runtime error when replaying block {}: {:?}",
						height,
						e
					);
					format!("Internal runtime error when replaying block {}: {:?}", height, e)
				})?;
			Ok(moonbeam_rpc_primitives_debug::Response::Block)
		};

		let eth_transactions_by_index: BTreeMap<u32, H256> = eth_transactions
			.iter()
			.map(|t| (t.transaction_index, t.transaction_hash))
			.collect();
		let mut proxy = moonbeam_client_evm_tracing::listeners::CallList::default();
		proxy.using(f)?;
let traces: Vec<TransactionTrace> =
moonbeam_client_evm_tracing::formatters::TraceFilter::format(proxy)
.ok_or("Fail to format proxy")?
.into_iter()
			.filter_map(|mut trace| {
				match eth_transactions_by_index.get(&trace.transaction_position) {
					Some(transaction_hash) => {
						trace.block_hash = eth_block_hash;
						trace.block_number = height;
						trace.transaction_hash = *transaction_hash;

						// Reformat error messages.
						if let block::TransactionTraceOutput::Error(ref mut error) =
							trace.output
						{
							if error.as_slice() == b"execution reverted" {
								*error = b"Reverted".to_vec();
							}
						}

						Some(trace)
					}
					None => {
						log::warn!(
							"A trace in block {} does not map to any known ethereum transaction. Trace: {:?}",
							height,
							trace,