// Copyright 2019-2025 PureStake Inc.
// This file is part of Moonbeam.

// Moonbeam is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Moonbeam is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.

//! `trace_filter` RPC handler and its associated service task.
//! The RPC handler relies on `CacheTask`, which provides a future that must be run inside a
//! tokio executor.
//!
//! The implementation is composed of multiple tasks:
//! - Many calls to the RPC handler `Trace::filter`, each communicating with the main task.
//! - A main `CacheTask` managing the cache and the communication between tasks.
//! - For each traced block, an async task responsible for waiting for a permit, spawning a
//!   blocking task, waiting for its result, and sending it to the main `CacheTask`.
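//!
//! A minimal wiring sketch (illustrative only; `client`, `backend`, `overrides`,
//! `prometheus_registry` and `spawn_handle` are assumed to come from the node's service setup,
//! and the numeric limits are example values):
//!
//! ```ignore
//! let permits = Arc::new(Semaphore::new(10));
//! let (cache_task, requester) = CacheTask::create(
//!     client.clone(),
//!     backend.clone(),
//!     16 * 1024 * 1024, // cache size, in bytes
//!     permits,
//!     overrides,
//!     prometheus_registry, // Option<PrometheusRegistry>
//!     spawn_handle.clone(),
//! );
//! // The returned future must be spawned on the tokio executor.
//! spawn_handle.spawn("trace-filter-cache", Some("trace-filter"), cache_task);
//! // The RPC handler communicates with the task through the requester.
//! let rpc_handler = Trace::new(client, requester, 500, 1000);
//! ```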

use futures::{select, FutureExt};
use std::{
	collections::{BTreeMap, HashMap},
	future::Future,
	marker::PhantomData,
	sync::Arc,
	time::{Duration, Instant},
};
use tokio::{
	sync::{mpsc, oneshot, Semaphore},
	time::interval,
};
use tracing::{instrument, Instrument};

use sc_client_api::backend::{Backend, StateBackend, StorageProvider};
use sc_service::SpawnTaskHandle;
use sp_api::{ApiExt, Core, ProvideRuntimeApi};
use sp_block_builder::BlockBuilder;
use sp_blockchain::{
	Backend as BlockchainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
};
use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Header as HeaderT};
use substrate_prometheus_endpoint::Registry as PrometheusRegistry;

use ethereum_types::H256;
use fc_rpc::lru_cache::LRUCacheByteLimited;
use fc_storage::StorageOverride;
use fp_rpc::EthereumRuntimeRPCApi;

use moonbeam_client_evm_tracing::{
	formatters::ResponseFormatter,
	types::block::{self, TransactionTrace},
};
pub use moonbeam_rpc_core_trace::{FilterRequest, TraceServer};
use moonbeam_rpc_core_types::{RequestBlockId, RequestBlockTag};
use moonbeam_rpc_primitives_debug::DebugRuntimeApi;

/// Internal type for trace results from blocking tasks.
type TxsTraceRes = Result<Vec<TransactionTrace>, String>;

/// Type for trace results sent to requesters (Arc-wrapped for zero-copy sharing).
/// Both the success (traces) and error (message) variants are Arc-wrapped to avoid cloning
/// when multiple waiters are waiting for the same block.
type SharedTxsTraceRes = Result<Arc<Vec<TransactionTrace>>, Arc<String>>;

/// Log target for trace cache operations.
const CACHE_LOG_TARGET: &str = "trace-cache";

/// Maximum time allowed for tracing a single block.
const TRACING_TIMEOUT_SECS: u64 = 60;

/// RPC handler. Will communicate with a `CacheTask` through a `CacheRequester`.
pub struct Trace<B, C> {
	_phantom: PhantomData<B>,
	client: Arc<C>,
	requester: CacheRequester,
	max_count: u32,
	max_block_range: u32,
}

impl<B, C> Clone for Trace<B, C> {
	fn clone(&self) -> Self {
		Self {
			_phantom: PhantomData,
			client: Arc::clone(&self.client),
			requester: self.requester.clone(),
			max_count: self.max_count,
			max_block_range: self.max_block_range,
		}
	}
}

impl<B, C> Trace<B, C>
where
	B: BlockT<Hash = H256> + Send + Sync + 'static,
	B::Header: HeaderT<Number = u32>,
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
	C: Send + Sync + 'static,
{
	/// Create a new RPC handler.
	pub fn new(
		client: Arc<C>,
		requester: CacheRequester,
		max_count: u32,
		max_block_range: u32,
	) -> Self {
		Self {
			client,
			requester,
			max_count,
			max_block_range,
			_phantom: PhantomData,
		}
	}

	/// Convert an optional block ID (number or tag) to a block height.
	fn block_id(&self, id: Option<RequestBlockId>) -> Result<u32, &'static str> {
		match id {
			Some(RequestBlockId::Number(n)) => Ok(n),
			None | Some(RequestBlockId::Tag(RequestBlockTag::Latest)) => {
				Ok(self.client.info().best_number)
			}
			Some(RequestBlockId::Tag(RequestBlockTag::Earliest)) => Ok(0),
			Some(RequestBlockId::Tag(RequestBlockTag::Finalized)) => {
				Ok(self.client.info().finalized_number)
			}
			Some(RequestBlockId::Tag(RequestBlockTag::Pending)) => {
				Err("'pending' is not supported")
			}
			Some(RequestBlockId::Hash(_)) => Err("Block hash not supported"),
		}
	}

	/// `trace_filter` endpoint (wrapped in the trait implementation with futures compatibility).
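	///
	/// An illustrative JSON-RPC request (the standard `trace_filter` shape; exact field casing
	/// is determined by `FilterRequest`'s deserialization, and the values are examples only):
	///
	/// ```text
	/// {
	///   "jsonrpc": "2.0",
	///   "id": 1,
	///   "method": "trace_filter",
	///   "params": [{
	///     "fromBlock": "0x1",
	///     "toBlock": "latest",
	///     "fromAddress": ["0x6be02d1d3665660d22ff9624b7be0551ee1ac91b"],
	///     "after": 0,
	///     "count": 100
	///   }]
	/// }
	/// ```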
	async fn filter(self, req: FilterRequest) -> TxsTraceRes {
		let from_block = self.block_id(req.from_block)?;
		let to_block = self.block_id(req.to_block)?;

		// Validate the block range to prevent abuse.
		let block_range = to_block.saturating_sub(from_block);
		if block_range > self.max_block_range {
			return Err(format!(
				"block range is too wide (maximum {})",
				self.max_block_range
			));
		}

		let block_heights = from_block..=to_block;

		let count = req.count.unwrap_or(self.max_count);
		if count > self.max_count {
			return Err(format!(
				"count ({}) can't be greater than maximum ({})",
				count, self.max_count
			));
		}

		// Build a list of all the Substrate block hashes that need to be traced.
		let mut block_hashes = vec![];
		for block_height in block_heights {
			if block_height == 0 {
				continue; // no traces for the genesis block.
			}

			let block_hash = self
				.client
				.hash(block_height)
				.map_err(|e| {
					format!(
						"Error when fetching block {} header: {:?}",
						block_height, e
					)
				})?
				.ok_or_else(|| format!("Block with height {} doesn't exist", block_height))?;

			block_hashes.push(block_hash);
		}

		// Fetch traces for all blocks.
		self.fetch_traces(req, &block_hashes, count as usize).await
	}

	async fn fetch_traces(
		&self,
		req: FilterRequest,
		block_hashes: &[H256],
		count: usize,
	) -> TxsTraceRes {
		let from_address = req.from_address.unwrap_or_default();
		let to_address = req.to_address.unwrap_or_default();

		let mut traces_amount: i64 = -(req.after.unwrap_or(0) as i64);
		let mut traces = vec![];

		for &block_hash in block_hashes {
			// Request the traces of this block from the cache service.
			// This resolves quickly if the block is already cached, or waits until the block
			// has finished tracing.
			let block_traces = self
				.requester
				.get_traces(block_hash)
				.await
				.map_err(|arc_error| (*arc_error).clone())?;

			// Filter addresses.
			let mut block_traces: Vec<_> = block_traces
				.iter()
				.filter(|trace| match trace.action {
					block::TransactionTraceAction::Call { from, to, .. } => {
						(from_address.is_empty() || from_address.contains(&from))
							&& (to_address.is_empty() || to_address.contains(&to))
					}
					block::TransactionTraceAction::Create { from, .. } => {
						(from_address.is_empty() || from_address.contains(&from))
							&& to_address.is_empty()
					}
					block::TransactionTraceAction::Suicide { address, .. } => {
						(from_address.is_empty() || from_address.contains(&address))
							&& to_address.is_empty()
					}
				})
				.cloned()
				.collect();

			// Don't insert anything if we're still before "after".
			traces_amount += block_traces.len() as i64;
			if traces_amount > 0 {
				let traces_amount = traces_amount as usize;
				// If the current Vec of traces crosses the "after" marker,
				// we skip some of its elements.
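				// Worked example (illustrative values): with `after` = 5, `traces_amount` starts
				// at -5. A first block with 3 matching traces brings it to -2, so nothing is
				// kept yet. A second block with 4 matching traces brings it to 2, so
				// `skip` = 4 - 2 = 2 and only the last 2 traces of that block are appended.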
				if traces_amount < block_traces.len() {
					let skip = block_traces.len() - traces_amount;
					block_traces = block_traces.into_iter().skip(skip).collect();
				}

				traces.append(&mut block_traces);

				// If we go over "count" (the limit), we trim and exit the loop,
				// unless we used the default maximum, in which case we return an error.
				if traces_amount >= count {
					if req.count.is_none() {
						return Err(format!(
							"the amount of traces goes over the maximum ({}), please use 'after' \
							and 'count' in your request",
							self.max_count
						));
					}

					traces = traces.into_iter().take(count).collect();
					break;
				}
			}
		}

		Ok(traces)
	}
}

#[jsonrpsee::core::async_trait]
impl<B, C> TraceServer for Trace<B, C>
where
	B: BlockT<Hash = H256> + Send + Sync + 'static,
	B::Header: HeaderT<Number = u32>,
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
	C: Send + Sync + 'static,
{
	async fn filter(
		&self,
		filter: FilterRequest,
	) -> jsonrpsee::core::RpcResult<Vec<TransactionTrace>> {
		self.clone()
			.filter(filter)
			.await
			.map_err(fc_rpc::internal_err)
	}
}

/// Requests the cache task can accept.
enum CacheRequest {
	/// Fetch the traces for a given block hash.
	/// The task will answer only when it has processed this block.
	GetTraces {
		/// Returns the array of traces or an error (Arc-wrapped for zero-copy sharing).
		sender: oneshot::Sender<SharedTxsTraceRes>,
		/// Hash of the block.
		block: H256,
	},
}

/// Allows interacting with the cache task.
#[derive(Clone)]
pub struct CacheRequester(mpsc::Sender<CacheRequest>);

impl CacheRequester {
	/// Fetch the traces for a given block hash.
	/// If the block is already cached, returns immediately.
	/// If the block is being traced, waits for the result.
	/// If the block is not cached, triggers tracing and waits for the result.
	/// Returns Arc-wrapped traces for zero-copy sharing.
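	///
	/// A minimal usage sketch (illustrative only; assumes a `requester` obtained from
	/// `CacheTask::create` and a known Substrate block hash):
	///
	/// ```ignore
	/// let block_traces = requester
	/// 	.get_traces(block_hash)
	/// 	.await
	/// 	.map_err(|arc_error| (*arc_error).clone())?;
	/// ```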
	#[instrument(skip(self))]
	pub async fn get_traces(&self, block: H256) -> SharedTxsTraceRes {
		let (response_tx, response_rx) = oneshot::channel();
		let sender = self.0.clone();

		sender
			.send(CacheRequest::GetTraces {
				sender: response_tx,
				block,
			})
			.await
			.map_err(|e| {
				Arc::new(format!(
					"Trace cache task is overloaded or closed. Error: {:?}",
					e
				))
			})?;

		response_rx
			.await
			.map_err(|e| {
				Arc::new(format!(
					"Trace cache task closed the response channel. Error: {:?}",
					e
				))
			})?
			.map_err(|arc_error| {
				Arc::new(format!("Failed to replay block. Error: {:?}", arc_error))
			})
	}
}

/// Entry in the wait list for a block being traced.
struct WaitListEntry {
	/// Time when this entry was created.
	created_at: Instant,
	/// All requests waiting for this block to be traced.
	waiters: Vec<oneshot::Sender<SharedTxsTraceRes>>,
}

/// Wait list for requests pending the same block trace.
/// Multiple concurrent requests for the same block will be added to this list
/// and all will receive the result once tracing completes.
type WaitList = HashMap<H256, WaitListEntry>;

/// Message sent from blocking trace tasks back to the main cache task.
enum BlockingTaskMessage {
	/// The tracing is finished and the result is sent to the main task.
	Finished {
		block_hash: H256,
		result: TxsTraceRes,
		duration: Duration,
	},
}

/// Prometheus metrics for trace filter cache operations.
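///
/// Example Prometheus queries over these metrics (illustrative only; the `_bucket` series is
/// the standard histogram expansion of `trace_filter_trace_duration_seconds`):
///
/// ```text
/// # 95th percentile of block tracing time over the last 5 minutes
/// histogram_quantile(0.95, rate(trace_filter_trace_duration_seconds_bucket[5m]))
/// # Rate of spawned trace tasks
/// rate(trace_filter_tasks_spawned_total[5m])
/// ```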
struct CacheMetrics {
	/// Current size of the wait list (number of blocks being traced).
	wait_list_size: substrate_prometheus_endpoint::Gauge<substrate_prometheus_endpoint::U64>,
	/// Total requests that joined an existing wait list entry (deduplication).
	wait_list_joins_total:
		substrate_prometheus_endpoint::Counter<substrate_prometheus_endpoint::U64>,
	/// Total trace tasks spawned.
	tasks_spawned_total: substrate_prometheus_endpoint::Counter<substrate_prometheus_endpoint::U64>,
	/// Total trace operations that timed out.
	timeouts_total: substrate_prometheus_endpoint::Counter<substrate_prometheus_endpoint::U64>,
	/// Histogram of trace operation durations in seconds.
	trace_duration_seconds: substrate_prometheus_endpoint::Histogram,
}

impl CacheMetrics {
	fn register(
		registry: &PrometheusRegistry,
	) -> Result<Self, substrate_prometheus_endpoint::PrometheusError> {
		Ok(Self {
			wait_list_size: substrate_prometheus_endpoint::register(
				substrate_prometheus_endpoint::Gauge::new(
					"trace_filter_wait_list_size",
					"Current number of blocks in the wait list being traced",
				)?,
				registry,
			)?,
			wait_list_joins_total: substrate_prometheus_endpoint::register(
				substrate_prometheus_endpoint::Counter::new(
					"trace_filter_wait_list_joins_total",
					"Total requests that joined an existing wait list entry",
				)?,
				registry,
			)?,
			tasks_spawned_total: substrate_prometheus_endpoint::register(
				substrate_prometheus_endpoint::Counter::new(
					"trace_filter_tasks_spawned_total",
					"Total trace tasks spawned",
				)?,
				registry,
			)?,
			timeouts_total: substrate_prometheus_endpoint::register(
				substrate_prometheus_endpoint::Counter::new(
					"trace_filter_timeouts_total",
					"Total trace operations that timed out",
				)?,
				registry,
			)?,
			trace_duration_seconds: substrate_prometheus_endpoint::register(
				substrate_prometheus_endpoint::Histogram::with_opts(
					substrate_prometheus_endpoint::HistogramOpts::new(
						"trace_filter_trace_duration_seconds",
						"Histogram of trace operation durations in seconds",
					)
					.buckets(vec![0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]),
				)?,
				registry,
			)?,
		})
	}
}

/// Type wrapper for the cache task, generic over the Client, Block and Backend types.
pub struct CacheTask<B, C, BE> {
	client: Arc<C>,
	backend: Arc<BE>,
	blocking_permits: Arc<Semaphore>,
	cache: LRUCacheByteLimited<H256, Arc<Vec<TransactionTrace>>>,
	wait_list: WaitList,
	metrics: Option<CacheMetrics>,
	_phantom: PhantomData<B>,
}

impl<B, C, BE> CacheTask<B, C, BE>
where
	BE: Backend<B> + 'static,
	BE::State: StateBackend<BlakeTwo256>,
	C: ProvideRuntimeApi<B>,
	C: StorageProvider<B, BE>,
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
	C: Send + Sync + 'static,
	B: BlockT<Hash = H256> + Send + Sync + 'static,
	B::Header: HeaderT<Number = u32>,
	C::Api: BlockBuilder<B>,
	C::Api: DebugRuntimeApi<B>,
	C::Api: EthereumRuntimeRPCApi<B>,
	C::Api: ApiExt<B>,
{
	/// Create a new cache task.
	///
	/// Returns a future that needs to be added to a tokio executor, and a handle allowing
	/// requests to be sent to the task.
	pub fn create(
		client: Arc<C>,
		backend: Arc<BE>,
		cache_size_bytes: u64,
		blocking_permits: Arc<Semaphore>,
		overrides: Arc<dyn StorageOverride<B>>,
		prometheus: Option<PrometheusRegistry>,
		spawn_handle: SpawnTaskHandle,
	) -> (impl Future<Output = ()>, CacheRequester) {
		// Communication with the outside world - bounded channel to prevent memory exhaustion.
		let (requester_tx, mut requester_rx) = mpsc::channel(10_000);

		// Task running in the service.
		let task = async move {
			let (blocking_tx, mut blocking_rx) =
				mpsc::channel(blocking_permits.available_permits().saturating_mul(2));

			// Periodic cleanup interval for orphaned wait list entries.
			let mut cleanup_interval = interval(Duration::from_secs(30));
			cleanup_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

			// Register metrics if a prometheus registry is provided.
			let metrics =
				prometheus
					.as_ref()
					.and_then(|registry| match CacheMetrics::register(registry) {
						Ok(metrics) => Some(metrics),
						Err(e) => {
							log::warn!(
								target: CACHE_LOG_TARGET,
								"Failed to register trace filter metrics: {:?}",
								e
							);
							None
						}
					});

			let mut inner = Self {
				client,
				backend,
				blocking_permits,
				cache: LRUCacheByteLimited::new(
					"trace-filter-blocks-cache",
					cache_size_bytes,
					prometheus,
				),
				wait_list: HashMap::new(),
				metrics,
				_phantom: Default::default(),
			};

			loop {
				select! {
					request = requester_rx.recv().fuse() => {
						match request {
							None => break,
							Some(CacheRequest::GetTraces {sender, block}) =>
								inner.request_get_traces(&blocking_tx, sender, block, overrides.clone(), &spawn_handle),
						}
					},
					message = blocking_rx.recv().fuse() => {
						if let Some(BlockingTaskMessage::Finished { block_hash, result, duration }) = message {
							inner.blocking_finished(block_hash, result, duration);
						}
					},
					_ = cleanup_interval.tick().fuse() => {
						inner.cleanup_wait_list();
					},
				}
			}
		}
		.instrument(tracing::debug_span!("trace_filter_cache"));

		(task, CacheRequester(requester_tx))
	}

	/// Handle a request to get traces for a specific block.
	/// - If cached: respond immediately
	/// - If pending: add to wait list
	/// - If new: spawn trace task and add to wait list
	fn request_get_traces(
		&mut self,
		blocking_tx: &mpsc::Sender<BlockingTaskMessage>,
		sender: oneshot::Sender<SharedTxsTraceRes>,
		block: H256,
		overrides: Arc<dyn StorageOverride<B>>,
		spawn_handle: &SpawnTaskHandle,
	) {
		log::trace!(
			target: CACHE_LOG_TARGET,
			"Request received: block={}, wait_list_size={}",
			block,
			self.wait_list.len()
		);

		// Check if block is already cached
		if let Some(cached) = self.cache.get(&block) {
			log::trace!(
				target: CACHE_LOG_TARGET,
				"Cache hit: block={}",
				block
			);
			// Cache hit - respond immediately with Arc::clone (cheap)
			let _ = sender.send(Ok(Arc::clone(&cached)));
			return;
		}

		// Check if block is currently being traced
		if let Some(entry) = self.wait_list.get_mut(&block) {
			log::trace!(
				target: CACHE_LOG_TARGET,
				"Joining wait list: block={}, waiters={}",
				block,
				entry.waiters.len()
			);
			entry.waiters.push(sender);

			// Increment deduplication metric
			if let Some(ref metrics) = self.metrics {
				metrics.wait_list_joins_total.inc();
			}

			return;
		}

		// Add sender to wait list for this new block
		self.wait_list.insert(
			block,
			WaitListEntry {
				created_at: Instant::now(),
				waiters: vec![sender],
			},
		);

		log::debug!(
			target: CACHE_LOG_TARGET,
			"Spawning trace task: block={}, available_permits={}",
			block,
			self.blocking_permits.available_permits()
		);

		// Update metrics
		if let Some(ref metrics) = self.metrics {
			metrics.tasks_spawned_total.inc();
			metrics.wait_list_size.set(self.wait_list.len() as u64);
		}

		// Spawn worker task to trace the block
		let blocking_permits = Arc::clone(&self.blocking_permits);
		let client = Arc::clone(&self.client);
		let backend = Arc::clone(&self.backend);
		let blocking_tx = blocking_tx.clone();
		let start_time = Instant::now();

		spawn_handle.spawn(
			"trace-block",
			Some("trace-filter"),
			async move {
				// Wait for permit to limit concurrent tracing operations
				let _permit = blocking_permits.acquire().await;

				// Perform block tracing in blocking task with timeout
				let result = match tokio::time::timeout(
					Duration::from_secs(TRACING_TIMEOUT_SECS),
					tokio::task::spawn_blocking(move || {
						Self::cache_block(client, backend, block, overrides)
					}),
				)
				.await
				{
					// Timeout occurred
					Err(_elapsed) => {
						log::error!(
							target: CACHE_LOG_TARGET,
							"Tracing timeout for block {}",
							block
						);
						Err(format!(
							"Tracing timeout after {} seconds",
							TRACING_TIMEOUT_SECS
						))
					}
					// Task completed
					Ok(join_result) => {
						match join_result {
							// Task panicked
							Err(join_err) => Err(format!("Tracing panicked: {:?}", join_err)),
							// Task succeeded, return its result
							Ok(trace_result) => trace_result,
						}
					}
				};

				// Send result back to main task
				let duration = start_time.elapsed();
				let _ = blocking_tx
					.send(BlockingTaskMessage::Finished {
						block_hash: block,
						result,
						duration,
					})
					.await;
			}
			.instrument(tracing::trace_span!("trace_block", block = %block)),
		);
	}

	/// Handle completion of a block trace task.
	/// Sends result to all waiting requests and caches it.
	/// Uses Arc for zero-copy sharing across multiple waiters.
	fn blocking_finished(&mut self, block_hash: H256, result: TxsTraceRes, duration: Duration) {
		// Get all waiting senders for this block
		if let Some(entry) = self.wait_list.remove(&block_hash) {
			let waiter_count = entry.waiters.len();

			// Update wait list size metric
			if let Some(ref metrics) = self.metrics {
				metrics.wait_list_size.set(self.wait_list.len() as u64);
				metrics
					.trace_duration_seconds
					.observe(duration.as_secs_f64());
			}

			match result {
				Ok(traces) => {
					let trace_count = traces.len();
					// Wrap successful result in Arc once
					let arc_traces = Arc::new(traces);

					log::debug!(
						target: CACHE_LOG_TARGET,
						"Trace completed: block={}, traces={}, waiters={}, cached=true, duration={:?}",
						block_hash,
						trace_count,
						waiter_count,
						duration
					);

					// Send Arc::clone to all waiters (cheap pointer copy, no data duplication)
					for sender in entry.waiters {
						let _ = sender.send(Ok(Arc::clone(&arc_traces)));
					}

					// Cache the Arc-wrapped result
					self.cache.put(block_hash, arc_traces);
				}
				Err(error) => {
					log::warn!(
						target: CACHE_LOG_TARGET,
						"Trace failed: block={}, waiters={}, error={}",
						block_hash,
						waiter_count,
						error
					);

					// Wrap error in Arc once
					let arc_error = Arc::new(error);

					// Send Arc::clone to all waiters (cheap pointer copy, no string duplication)
					for sender in entry.waiters {
						let _ = sender.send(Err(Arc::clone(&arc_error)));
					}
				}
			}
		}
	}

	/// Clean up orphaned wait list entries that have been pending too long.
	/// This handles cases where spawned tasks panic or get cancelled.
	fn cleanup_wait_list(&mut self) {
		let timeout = Duration::from_secs(TRACING_TIMEOUT_SECS + 10);
		let now = Instant::now();

		let mut to_remove = Vec::new();

		for (block_hash, entry) in &self.wait_list {
			if now.duration_since(entry.created_at) > timeout {
				log::warn!(
					target: CACHE_LOG_TARGET,
					"Cleaning up orphaned wait list entry for block {}",
					block_hash
				);
				to_remove.push(*block_hash);
			}
		}

		log::debug!(
			target: CACHE_LOG_TARGET,
			"Wait list status: active_blocks={}, timed_out_block_requests={}",
			self.wait_list.len(),
			to_remove.len()
		);

		// Increment timeout metric for each timed-out block
		if !to_remove.is_empty() {
			if let Some(ref metrics) = self.metrics {
				for _ in &to_remove {
					metrics.timeouts_total.inc();
				}
			}
		}

		// Remove timed-out entries and notify waiters
		let timeout_error =
			Arc::new("Trace request timeout (task failed or was cancelled)".to_string());

		for block_hash in to_remove {
			if let Some(entry) = self.wait_list.remove(&block_hash) {
				for sender in entry.waiters {
					let _ = sender.send(Err(Arc::clone(&timeout_error)));
				}
			}
		}

		// Update wait list size metric after cleanup
		if let Some(ref metrics) = self.metrics {
			metrics.wait_list_size.set(self.wait_list.len() as u64);
		}
	}

	/// (In blocking task) Use the Runtime API to trace the block.
	#[instrument(skip(client, backend, overrides))]
	fn cache_block(
		client: Arc<C>,
		backend: Arc<BE>,
		substrate_hash: H256,
		overrides: Arc<dyn StorageOverride<B>>,
	) -> TxsTraceRes {
		// Get Substrate block data.
		let api = client.runtime_api();
		let block_header = client
			.header(substrate_hash)
			.map_err(|e| {
				format!(
					"Error when fetching substrate block {} header: {:?}",
					substrate_hash, e
				)
			})?
			.ok_or_else(|| format!("Substrate block {} doesn't exist", substrate_hash))?;

		let height = *block_header.number();
		let substrate_parent_hash = *block_header.parent_hash();

		// Get Ethereum block data.
		let (eth_block, eth_transactions) = match (
			overrides.current_block(substrate_hash),
			overrides.current_transaction_statuses(substrate_hash),
		) {
			(Some(a), Some(b)) => (a, b),
			_ => {
				return Err(format!(
					"Failed to get Ethereum block data for Substrate block {}",
					substrate_hash
				))
			}
		};

		let eth_block_hash = eth_block.header.hash();
		let eth_tx_hashes = eth_transactions
			.iter()
			.map(|t| t.transaction_hash)
			.collect();

		// Get extrinsics (containing the Ethereum ones).
		let extrinsics = backend
			.blockchain()
			.body(substrate_hash)
			.map_err(|e| {
				format!(
					"Blockchain error when fetching extrinsics of block {}: {:?}",
					height, e
				)
			})?
			.ok_or_else(|| format!("Could not find block {} when fetching extrinsics.", height))?;

		// Get the DebugRuntimeApi version.
		let trace_api_version = if let Ok(Some(api_version)) =
			api.api_version::<dyn DebugRuntimeApi<B>>(substrate_parent_hash)
		{
			api_version
		} else {
			return Err("Runtime api version call failed (trace)".to_string());
		};

		// Trace the block.
		let f = || -> Result<_, String> {
			let result = if trace_api_version >= 5 {
				api.trace_block(
					substrate_parent_hash,
					extrinsics,
					eth_tx_hashes,
					&block_header,
				)
			} else {
				// Get the core runtime api version.
				let core_api_version = if let Ok(Some(api_version)) =
					api.api_version::<dyn Core<B>>(substrate_parent_hash)
				{
					api_version
				} else {
					return Err("Runtime api version call failed (core)".to_string());
				};

				// Initialize the block: calls the "on_initialize" hook on every pallet
				// in AllPalletsWithSystem.
				// This was fine before pallet-message-queue because the XCM messages
				// were processed by the "setValidationData" inherent call and not in an
				// "on_initialize" hook, which runs before XCM tracing is enabled.
				if core_api_version >= 5 {
					api.initialize_block(substrate_parent_hash, &block_header)
						.map_err(|e| format!("Runtime api access error: {:?}", e))?;
				} else {
					#[allow(deprecated)]
					api.initialize_block_before_version_5(substrate_parent_hash, &block_header)
						.map_err(|e| format!("Runtime api access error: {:?}", e))?;
				}

				#[allow(deprecated)]
				api.trace_block_before_version_5(substrate_parent_hash, extrinsics, eth_tx_hashes)
			};

			result
				.map_err(|e| format!("Blockchain error when replaying block {}: {:?}", height, e))?
				.map_err(|e| {
					tracing::warn!(
						target: "tracing",
						"Internal runtime error when replaying block {}: {:?}",
						height,
						e
					);
					format!(
						"Internal runtime error when replaying block {}: {:?}",
						height, e
					)
				})?;

			Ok(moonbeam_rpc_primitives_debug::Response::Block)
		};

		let eth_transactions_by_index: BTreeMap<u32, H256> = eth_transactions
			.iter()
			.map(|t| (t.transaction_index, t.transaction_hash))
			.collect();

		let mut proxy = moonbeam_client_evm_tracing::listeners::CallList::default();
		proxy.using(f)?;

		let traces: Vec<TransactionTrace> =
			moonbeam_client_evm_tracing::formatters::TraceFilter::format(proxy)
				.ok_or("Failed to format proxy")?
				.into_iter()
				.filter_map(|mut trace| {
					match eth_transactions_by_index.get(&trace.transaction_position) {
						Some(transaction_hash) => {
							trace.block_hash = eth_block_hash;
							trace.block_number = height;
							trace.transaction_hash = *transaction_hash;

							// Reformat error messages.
							if let block::TransactionTraceOutput::Error(ref mut error) =
								trace.output
							{
								if error.as_slice() == b"execution reverted" {
									*error = b"Reverted".to_vec();
								}
							}

							Some(trace)
						}
						None => {
							log::warn!(
								target: "tracing",
								"A trace in block {} does not map to any known ethereum transaction. Trace: {:?}",
								height,
								trace,
							);
							None
						}
					}
				})
				.collect();

		Ok(traces)
	}
}