1
// Copyright 2019-2025 PureStake Inc.
2
// This file is part of Moonbeam.
3

            
4
// Moonbeam is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Moonbeam is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
//! `trace_filter` RPC handler and its associated service task.
18
//! The RPC handler relies on `CacheTask`, which provides a future that must be run inside a tokio
19
//! executor.
20
//!
21
//! The implementation is composed of multiple tasks :
22
//! - Many calls to the RPC handler `Trace::filter`, each communicating with the main task.
23
//! - A main `CacheTask` managing the cache and the communication between tasks.
24
//! - For each traced block an async task responsible to wait for a permit, spawn a blocking
25
//!   task and waiting for the result, then send it to the main `CacheTask`.
26

            
27
use futures::{select, stream::FuturesUnordered, FutureExt, StreamExt};
28
use std::{collections::BTreeMap, future::Future, marker::PhantomData, sync::Arc, time::Duration};
29
use tokio::{
30
	sync::{mpsc, oneshot, Semaphore},
31
	time::sleep,
32
};
33
use tracing::{instrument, Instrument};
34

            
35
use sc_client_api::backend::{Backend, StateBackend, StorageProvider};
36
use sc_utils::mpsc::TracingUnboundedSender;
37
use sp_api::{ApiExt, Core, ProvideRuntimeApi};
38
use sp_block_builder::BlockBuilder;
39
use sp_blockchain::{
40
	Backend as BlockchainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
41
};
42
use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Header as HeaderT};
43
use substrate_prometheus_endpoint::{
44
	register, Counter, PrometheusError, Registry as PrometheusRegistry, U64,
45
};
46

            
47
use ethereum_types::H256;
48
use fc_storage::StorageOverride;
49
use fp_rpc::EthereumRuntimeRPCApi;
50

            
51
use moonbeam_client_evm_tracing::{
52
	formatters::ResponseFormatter,
53
	types::block::{self, TransactionTrace},
54
};
55
pub use moonbeam_rpc_core_trace::{FilterRequest, TraceServer};
56
use moonbeam_rpc_core_types::{RequestBlockId, RequestBlockTag};
57
use moonbeam_rpc_primitives_debug::DebugRuntimeApi;
58

            
59
/// Result of a tracing operation: a list of transaction traces, or an error message.
type TxsTraceRes = Result<Vec<TransactionTrace>, String>;
60

            
61
/// RPC handler for `trace_filter`. Communicates with a `CacheTask` through a `CacheRequester`.
62
pub struct Trace<B, C> {
63
	/// Marker for the block type `B`; no value of that type is stored.
	_phantom: PhantomData<B>,
64
	/// Client used to resolve block heights into block hashes.
	client: Arc<C>,
65
	/// Handle used to send requests to the cache task.
	requester: CacheRequester,
66
	/// Upper bound on the `count` of a request; also the default when `count` is omitted.
	max_count: u32,
67
}
68

            
69
impl<B, C> Clone for Trace<B, C> {
70
	fn clone(&self) -> Self {
71
		Self {
72
			_phantom: PhantomData,
73
			client: Arc::clone(&self.client),
74
			requester: self.requester.clone(),
75
			max_count: self.max_count,
76
		}
77
	}
78
}
79

            
80
impl<B, C> Trace<B, C>
81
where
82
	B: BlockT<Hash = H256> + Send + Sync + 'static,
83
	B::Header: HeaderT<Number = u32>,
84
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
85
	C: Send + Sync + 'static,
86
{
87
	/// Create a new RPC handler.
88
	pub fn new(client: Arc<C>, requester: CacheRequester, max_count: u32) -> Self {
89
		Self {
90
			client,
91
			requester,
92
			max_count,
93
			_phantom: PhantomData,
94
		}
95
	}
96

            
97
	/// Convert an optional block ID (number or tag) to a block height.
98
	fn block_id(&self, id: Option<RequestBlockId>) -> Result<u32, &'static str> {
99
		match id {
100
			Some(RequestBlockId::Number(n)) => Ok(n),
101
			None | Some(RequestBlockId::Tag(RequestBlockTag::Latest)) => {
102
				Ok(self.client.info().best_number)
103
			}
104
			Some(RequestBlockId::Tag(RequestBlockTag::Earliest)) => Ok(0),
105
			Some(RequestBlockId::Tag(RequestBlockTag::Pending)) => {
106
				Err("'pending' is not supported")
107
			}
108
			Some(RequestBlockId::Hash(_)) => Err("Block hash not supported"),
109
		}
110
	}
111

            
112
	/// `trace_filter` endpoint (wrapped in the trait implementation with futures compatibility)
113
	async fn filter(self, req: FilterRequest) -> TxsTraceRes {
114
		let from_block = self.block_id(req.from_block)?;
115
		let to_block = self.block_id(req.to_block)?;
116
		let block_heights = from_block..=to_block;
117

            
118
		let count = req.count.unwrap_or(self.max_count);
119
		if count > self.max_count {
120
			return Err(format!(
121
				"count ({}) can't be greater than maximum ({})",
122
				count, self.max_count
123
			));
124
		}
125

            
126
		// Build a list of all the Substrate block hashes that need to be traced.
127
		let mut block_hashes = vec![];
128
		for block_height in block_heights {
129
			if block_height == 0 {
130
				continue; // no traces for genesis block.
131
			}
132

            
133
			let block_hash = self
134
				.client
135
				.hash(block_height)
136
				.map_err(|e| {
137
					format!(
138
						"Error when fetching block {} header : {:?}",
139
						block_height, e
140
					)
141
				})?
142
				.ok_or_else(|| format!("Block with height {} don't exist", block_height))?;
143

            
144
			block_hashes.push(block_hash);
145
		}
146

            
147
		// Start a batch with these blocks.
148
		let batch_id = self.requester.start_batch(block_hashes.clone()).await?;
149
		// Fetch all the traces. It is done in another function to simplify error handling and allow
150
		// to call the following `stop_batch` regardless of the result. This is important for the
151
		// cache cleanup to work properly.
152
		let res = self.fetch_traces(req, &block_hashes, count as usize).await;
153
		// Stop the batch, allowing the cache task to remove useless non-started block traces and
154
		// start the expiration delay.
155
		self.requester.stop_batch(batch_id).await;
156

            
157
		res
158
	}
159

            
160
	async fn fetch_traces(
161
		&self,
162
		req: FilterRequest,
163
		block_hashes: &[H256],
164
		count: usize,
165
	) -> TxsTraceRes {
166
		let from_address = req.from_address.unwrap_or_default();
167
		let to_address = req.to_address.unwrap_or_default();
168

            
169
		let mut traces_amount: i64 = -(req.after.unwrap_or(0) as i64);
170
		let mut traces = vec![];
171

            
172
		for &block_hash in block_hashes {
173
			// Request the traces of this block to the cache service.
174
			// This will resolve quickly if the block is already cached, or wait until the block
175
			// has finished tracing.
176
			let block_traces = self.requester.get_traces(block_hash).await?;
177

            
178
			// Filter addresses.
179
			let mut block_traces: Vec<_> = block_traces
180
				.iter()
181
				.filter(|trace| match trace.action {
182
					block::TransactionTraceAction::Call { from, to, .. } => {
183
						(from_address.is_empty() || from_address.contains(&from))
184
							&& (to_address.is_empty() || to_address.contains(&to))
185
					}
186
					block::TransactionTraceAction::Create { from, .. } => {
187
						(from_address.is_empty() || from_address.contains(&from))
188
							&& to_address.is_empty()
189
					}
190
					block::TransactionTraceAction::Suicide { address, .. } => {
191
						(from_address.is_empty() || from_address.contains(&address))
192
							&& to_address.is_empty()
193
					}
194
				})
195
				.cloned()
196
				.collect();
197

            
198
			// Don't insert anything if we're still before "after"
199
			traces_amount += block_traces.len() as i64;
200
			if traces_amount > 0 {
201
				let traces_amount = traces_amount as usize;
202
				// If the current Vec of traces is across the "after" marker,
203
				// we skip some elements of it.
204
				if traces_amount < block_traces.len() {
205
					let skip = block_traces.len() - traces_amount;
206
					block_traces = block_traces.into_iter().skip(skip).collect();
207
				}
208

            
209
				traces.append(&mut block_traces);
210

            
211
				// If we go over "count" (the limit), we trim and exit the loop,
212
				// unless we used the default maximum, in which case we return an error.
213
				if traces_amount >= count {
214
					if req.count.is_none() {
215
						return Err(format!(
216
							"the amount of traces goes over the maximum ({}), please use 'after' \
217
							and 'count' in your request",
218
							self.max_count
219
						));
220
					}
221

            
222
					traces = traces.into_iter().take(count).collect();
223
					break;
224
				}
225
			}
226
		}
227

            
228
		Ok(traces)
229
	}
230
}
231

            
232
#[jsonrpsee::core::async_trait]
233
impl<B, C> TraceServer for Trace<B, C>
234
where
235
	B: BlockT<Hash = H256> + Send + Sync + 'static,
236
	B::Header: HeaderT<Number = u32>,
237
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
238
	C: Send + Sync + 'static,
239
{
240
	async fn filter(
241
		&self,
242
		filter: FilterRequest,
243
	) -> jsonrpsee::core::RpcResult<Vec<TransactionTrace>> {
244
		self.clone()
245
			.filter(filter)
246
			.await
247
			.map_err(fc_rpc::internal_err)
248
	}
249
}
250

            
251
/// An opaque batch ID, handed out by the cache task when a batch is started and given back
/// to it to stop that batch.
252
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
253
pub struct CacheBatchId(u64);
254

            
255
/// Requests the cache task can accept.
256
enum CacheRequest {
257
	/// Request to start caching the provided list of blocks.
258
	/// The task will add the blocks to its pool and immediately return a new batch ID.
259
	StartBatch {
260
		/// Channel on which the ID of the new batch is returned (needed to stop it later).
261
		sender: oneshot::Sender<CacheBatchId>,
262
		/// List of block hashes to trace.
263
		blocks: Vec<H256>,
264
	},
265
	/// Fetch the traces for the given block hash.
266
	/// The task will answer only when it has processed this block.
267
	GetTraces {
268
		/// Channel on which the array of traces, or an error, is returned.
269
		sender: oneshot::Sender<TxsTraceRes>,
270
		/// Hash of the block.
271
		block: H256,
272
	},
273
	/// Notify the cache that it can stop the batch with that ID. Any block contained only in
274
	/// this batch and still not started will be discarded.
275
	StopBatch { batch_id: CacheBatchId },
276
}
277

            
278
/// Cloneable handle used to interact with the cache task; wraps the sending side of the
/// task's request channel.
279
#[derive(Clone)]
280
pub struct CacheRequester(TracingUnboundedSender<CacheRequest>);
281

            
282
impl CacheRequester {
283
	/// Request to start caching the provided range of blocks.
284
	/// The task will add to blocks to its pool and immediately return the batch ID.
285
	#[instrument(skip(self))]
286
	pub async fn start_batch(&self, blocks: Vec<H256>) -> Result<CacheBatchId, String> {
287
		let (response_tx, response_rx) = oneshot::channel();
288
		let sender = self.0.clone();
289

            
290
		sender
291
			.unbounded_send(CacheRequest::StartBatch {
292
				sender: response_tx,
293
				blocks,
294
			})
295
			.map_err(|e| {
296
				format!(
297
					"Failed to send request to the trace cache task. Error : {:?}",
298
					e
299
				)
300
			})?;
301

            
302
		response_rx.await.map_err(|e| {
303
			format!(
304
				"Trace cache task closed the response channel. Error : {:?}",
305
				e
306
			)
307
		})
308
	}
309

            
310
	/// Fetch the traces for given block hash.
311
	/// The task will answer only when it has processed this block.
312
	/// The block should be part of a batch first. If no batch has requested the block it will
313
	/// return an error.
314
	#[instrument(skip(self))]
315
	pub async fn get_traces(&self, block: H256) -> TxsTraceRes {
316
		let (response_tx, response_rx) = oneshot::channel();
317
		let sender = self.0.clone();
318

            
319
		sender
320
			.unbounded_send(CacheRequest::GetTraces {
321
				sender: response_tx,
322
				block,
323
			})
324
			.map_err(|e| {
325
				format!(
326
					"Failed to send request to the trace cache task. Error : {:?}",
327
					e
328
				)
329
			})?;
330

            
331
		response_rx
332
			.await
333
			.map_err(|e| {
334
				format!(
335
					"Trace cache task closed the response channel. Error : {:?}",
336
					e
337
				)
338
			})?
339
			.map_err(|e| format!("Failed to replay block. Error : {:?}", e))
340
	}
341

            
342
	/// Notify the cache that it can stop the batch with that ID. Any block contained only in
343
	/// this batch and still in the waiting pool will be discarded.
344
	#[instrument(skip(self))]
345
	pub async fn stop_batch(&self, batch_id: CacheBatchId) {
346
		let sender = self.0.clone();
347

            
348
		// Here we don't care if the request has been accepted or refused, the caller can't
349
		// do anything with it.
350
		let _ = sender
351
			.unbounded_send(CacheRequest::StopBatch { batch_id })
352
			.map_err(|e| {
353
				format!(
354
					"Failed to send request to the trace cache task. Error : {:?}",
355
					e
356
				)
357
			});
358
	}
359
}
360

            
361
/// Data stored for each block in the cache.
362
/// `active_batch_count` represents the number of batches using this
363
/// block. It is increased immediately when a batch is created, but is
364
/// decreased only after the batch ends and its expiration delay passes.
365
/// This keeps the data in the cache for following requests that would use
366
/// this block, which is important to handle pagination efficiently.
367
struct CacheBlock {
368
	/// Number of batches currently referencing this block.
	active_batch_count: usize,
369
	/// Whether the block is still pooled for tracing or already has a cached result.
	state: CacheBlockState,
370
}
371

            
372
/// State of a cached block. It is either pooled to be traced, or cached.
373
enum CacheBlockState {
374
	/// Block has been added to the pool of blocks to be replayed.
375
	/// It may be currently waiting to be replayed or being replayed.
376
	Pooled {
377
		/// Set once the tracing task of this block has reported that it started.
		started: bool,
378
		/// Multiple requests might query the same block while it is pooled to be
379
		/// traced. Their response channels are stored here, and the result will be
380
		/// sent to all of them when the tracing is finished.
381
		waiting_requests: Vec<oneshot::Sender<TxsTraceRes>>,
382
		/// Channel used to unqueue a tracing that has not yet started.
383
		/// A tracing will be unqueued if it has not yet been started and the last batch
384
		/// needing this block is ended (ignoring the expiration delay).
385
		/// It is not used directly, but dropping it will wake up the receiver.
386
		#[allow(dead_code)]
387
		unqueue_sender: oneshot::Sender<()>,
388
	},
389
	/// Tracing has been completed and the result is available. No Runtime API call
390
	/// will be needed until this block cache is removed.
391
	Cached { traces: TxsTraceRes },
392
}
393

            
394
/// Tracing a block is done in a separate tokio blocking task to avoid clogging the async threads.
395
/// For this reason a channel carrying this type is used by the tracing task to communicate
396
/// with the main cache task.
397
enum BlockingTaskMessage {
398
	/// Notify that the tracing of this block has started, as the task got a permit from
399
	/// the semaphore. This is used to prevent the deletion of a cache entry for a block that has
400
	/// started being traced.
401
	Started { block_hash: H256 },
402
	/// The tracing is finished and the result is sent to the main task.
403
	Finished {
404
		/// Hash of the block that was traced.
		block_hash: H256,
405
		/// Traces of the block, or the error message of the failed tracing.
		result: TxsTraceRes,
406
	},
407
}
408

            
409
/// Type wrapper for the cache task, generic over the Client, Block and Backend types.
/// Holds the inner state of the task, excluding the futures and channels polled by its loop.
410
pub struct CacheTask<B, C, BE> {
411
	/// Client handed to the tracing of each block.
	client: Arc<C>,
412
	/// Backend handed to the tracing of each block.
	backend: Arc<BE>,
413
	/// Semaphore a tracing task must get a permit from before it starts tracing.
	blocking_permits: Arc<Semaphore>,
414
	/// Cache entries, keyed by block hash.
	cached_blocks: BTreeMap<H256, CacheBlock>,
415
	/// Block hashes of each batch, keyed by the numeric batch ID.
	batches: BTreeMap<u64, Vec<H256>>,
416
	/// ID that will be given to the next started batch.
	next_batch_id: u64,
417
	/// Cache hit/miss counters; `None` when no metrics were registered.
	metrics: Option<Metrics>,
418
	/// Marker for the block type `B`; no value of that type is stored.
	_phantom: PhantomData<B>,
419
}
420

            
421
impl<B, C, BE> CacheTask<B, C, BE>
422
where
423
	BE: Backend<B> + 'static,
424
	BE::State: StateBackend<BlakeTwo256>,
425
	C: ProvideRuntimeApi<B>,
426
	C: StorageProvider<B, BE>,
427
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
428
	C: Send + Sync + 'static,
429
	B: BlockT<Hash = H256> + Send + Sync + 'static,
430
	B::Header: HeaderT<Number = u32>,
431
	C::Api: BlockBuilder<B>,
432
	C::Api: DebugRuntimeApi<B>,
433
	C::Api: EthereumRuntimeRPCApi<B>,
434
	C::Api: ApiExt<B>,
435
{
436
	/// Create a new cache task.
437
	///
438
	/// Returns a Future that needs to be added to a tokio executor, and a handle allowing to
439
	/// send requests to the task.
440
	pub fn create(
441
		client: Arc<C>,
442
		backend: Arc<BE>,
443
		cache_duration: Duration,
444
		blocking_permits: Arc<Semaphore>,
445
		overrides: Arc<dyn StorageOverride<B>>,
446
		prometheus: Option<PrometheusRegistry>,
447
	) -> (impl Future<Output = ()>, CacheRequester) {
448
		// Communication with the outside world :
449
		let (requester_tx, mut requester_rx) =
450
			sc_utils::mpsc::tracing_unbounded("trace-filter-cache", 100_000);
451

            
452
		// Task running in the service.
453
		let task = async move {
454
			// The following variables are polled by the select! macro, and thus cannot be
455
			// part of Self without introducing borrowing issues.
456
			let mut batch_expirations = FuturesUnordered::new();
457
			let (blocking_tx, mut blocking_rx) =
458
				mpsc::channel(blocking_permits.available_permits() * 2);
459
			let metrics = if let Some(registry) = prometheus {
460
				match Metrics::register(&registry) {
461
					Ok(metrics) => Some(metrics),
462
					Err(err) => {
463
						log::error!(target: "tracing", "Failed to register metrics {err:?}");
464
						None
465
					}
466
				}
467
			} else {
468
				None
469
			};
470
			// Contains the inner state of the cache task, excluding the pooled futures/channels.
471
			// Having this object allows to refactor each event into its own function, simplifying
472
			// the main loop.
473
			let mut inner = Self {
474
				client,
475
				backend,
476
				blocking_permits,
477
				cached_blocks: BTreeMap::new(),
478
				batches: BTreeMap::new(),
479
				next_batch_id: 0,
480
				metrics,
481
				_phantom: Default::default(),
482
			};
483

            
484
			// Main event loop. This loop must not contain any direct .await, as we want to
485
			// react to events as fast as possible.
486
			loop {
487
				select! {
488
					request = requester_rx.next() => {
489
						match request {
490
							None => break,
491
							Some(CacheRequest::StartBatch {sender, blocks})
492
								=> inner.request_start_batch(&blocking_tx, sender, blocks, overrides.clone()),
493
							Some(CacheRequest::GetTraces {sender, block})
494
								=> inner.request_get_traces(sender, block),
495
							Some(CacheRequest::StopBatch {batch_id}) => {
496
								// Cannot be refactored inside `request_stop_batch` because
497
								// it has an unnamable type :C
498
								batch_expirations.push(async move {
499
									sleep(cache_duration).await;
500
									batch_id
501
								});
502

            
503
								inner.request_stop_batch(batch_id);
504
							},
505
						}
506
					},
507
					message = blocking_rx.recv().fuse() => {
508
						match message {
509
							None => (),
510
							Some(BlockingTaskMessage::Started { block_hash })
511
								=> inner.blocking_started(block_hash),
512
							Some(BlockingTaskMessage::Finished { block_hash, result })
513
								=> inner.blocking_finished(block_hash, result),
514
						}
515
					},
516
					batch_id = batch_expirations.next() => {
517
						match batch_id {
518
							None => (),
519
							Some(batch_id) => inner.expired_batch(batch_id),
520
						}
521
					}
522
				}
523
			}
524
		}
525
		.instrument(tracing::debug_span!("trace_filter_cache"));
526

            
527
		(task, CacheRequester(requester_tx))
528
	}
529

            
530
	/// Handle the creation of a batch.
531
	/// Will start the tracing process for blocks that are not already in the cache.
532
	#[instrument(skip(self, blocking_tx, sender, blocks, overrides))]
533
	fn request_start_batch(
534
		&mut self,
535
		blocking_tx: &mpsc::Sender<BlockingTaskMessage>,
536
		sender: oneshot::Sender<CacheBatchId>,
537
		blocks: Vec<H256>,
538
		overrides: Arc<dyn StorageOverride<B>>,
539
	) {
540
		tracing::trace!("Starting batch {}", self.next_batch_id);
541
		self.batches.insert(self.next_batch_id, blocks.clone());
542

            
543
		for block in blocks {
544
			// The block is already in the cache, awesome!
545
			if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
546
				block_cache.active_batch_count += 1;
547
				tracing::trace!(
548
					"Cache hit for block {}, now used by {} batches.",
549
					block,
550
					block_cache.active_batch_count
551
				);
552
			}
553
			// Otherwise we need to queue this block for tracing.
554
			else {
555
				tracing::trace!("Cache miss for block {}, pooling it for tracing.", block);
556

            
557
				let blocking_permits = Arc::clone(&self.blocking_permits);
558
				let (unqueue_sender, unqueue_receiver) = oneshot::channel();
559
				let client = Arc::clone(&self.client);
560
				let backend = Arc::clone(&self.backend);
561
				let blocking_tx = blocking_tx.clone();
562
				let overrides = overrides.clone();
563

            
564
				// Spawn all block caching asynchronously.
565
				// It will wait to obtain a permit, then spawn a blocking task.
566
				// When the blocking task returns its result, it is sent
567
				// thought a channel to the main task loop.
568
				tokio::spawn(
569
					async move {
570
						tracing::trace!("Waiting for blocking permit or task cancellation");
571
						let _permit = select!(
572
							_ = unqueue_receiver.fuse() => {
573
							tracing::trace!("Tracing of the block has been cancelled.");
574
								return;
575
							},
576
							permit = blocking_permits.acquire().fuse() => permit,
577
						);
578

            
579
						// Warn the main task that block tracing as started, and
580
						// this block cache entry should not be removed.
581
						let _ = blocking_tx
582
							.send(BlockingTaskMessage::Started { block_hash: block })
583
							.await;
584

            
585
						tracing::trace!("Start block tracing in a blocking task.");
586

            
587
						// Perform block tracing in a tokio blocking task.
588
						let result = async {
589
							tokio::task::spawn_blocking(move || {
590
								Self::cache_block(client, backend, block, overrides.clone())
591
							})
592
							.await
593
							.map_err(|e| {
594
								format!("Tracing Substrate block {} panicked : {:?}", block, e)
595
							})?
596
						}
597
						.await
598
						.map_err(|e| e.to_string());
599

            
600
						tracing::trace!("Block tracing finished, sending result to main task.");
601

            
602
						// Send a response to the main task.
603
						let _ = blocking_tx
604
							.send(BlockingTaskMessage::Finished {
605
								block_hash: block,
606
								result,
607
							})
608
							.await;
609
					}
610
					.instrument(tracing::trace_span!("Block tracing", block = %block)),
611
				);
612

            
613
				// Insert the block in the cache.
614
				self.cached_blocks.insert(
615
					block,
616
					CacheBlock {
617
						active_batch_count: 1,
618
						state: CacheBlockState::Pooled {
619
							started: false,
620
							waiting_requests: vec![],
621
							unqueue_sender,
622
						},
623
					},
624
				);
625
			}
626
		}
627

            
628
		// Respond with the batch ID.
629
		let _ = sender.send(CacheBatchId(self.next_batch_id));
630

            
631
		// Increase batch ID for the next request.
632
		self.next_batch_id = self.next_batch_id.overflowing_add(1).0;
633
	}
634

            
635
	/// Handle a request to get the traces of the provided block.
636
	/// - If the result is stored in the cache, it sends it immediately.
637
	/// - If the block is currently being pooled, it is added to this block cache waiting list,
638
	///   and all requests concerning this block will be satisfied when the tracing for this block
639
	///   is finished.
640
	/// - If this block is missing from the cache, it means no batch asked for it. All requested
641
	///   blocks should be contained in a batch beforehand, and thus an error is returned.
642
	#[instrument(skip(self))]
643
	fn request_get_traces(&mut self, sender: oneshot::Sender<TxsTraceRes>, block: H256) {
644
		if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
645
			match &mut block_cache.state {
646
				CacheBlockState::Pooled {
647
					ref mut waiting_requests,
648
					..
649
				} => {
650
					tracing::warn!(
651
						"A request asked a pooled block ({}), adding it to the list of \
652
						waiting requests.",
653
						block
654
					);
655
					waiting_requests.push(sender);
656
					if let Some(metrics) = &self.metrics {
657
						metrics.tracing_cache_misses.inc();
658
					}
659
				}
660
				CacheBlockState::Cached { traces, .. } => {
661
					tracing::warn!(
662
						"A request asked a cached block ({}), sending the traces directly.",
663
						block
664
					);
665
					let _ = sender.send(traces.clone());
666
					if let Some(metrics) = &self.metrics {
667
						metrics.tracing_cache_hits.inc();
668
					}
669
				}
670
			}
671
		} else {
672
			tracing::warn!(
673
				"An RPC request asked to get a block ({}) which was not batched.",
674
				block
675
			);
676
			let _ = sender.send(Err(format!(
677
				"RPC request asked a block ({}) that was not batched",
678
				block
679
			)));
680
		}
681
	}
682

            
683
	/// Handle a request to stop a batch.
684
	/// For all blocks that needed to be traced, are only in this batch and not yet started, their
685
	/// tracing is cancelled to save CPU-time and avoid attacks requesting large amount of blocks.
686
	/// This batch data is not yet removed however. Instead a expiration delay timer is started
687
	/// after which the data will indeed be cleared. (the code for that is in the main loop code
688
	/// as it involved an unnamable type :C)
689
	#[instrument(skip(self))]
690
	fn request_stop_batch(&mut self, batch_id: CacheBatchId) {
691
		tracing::trace!("Stopping batch {}", batch_id.0);
692
		if let Some(blocks) = self.batches.get(&batch_id.0) {
693
			for block in blocks {
694
				let mut remove = false;
695

            
696
				// We remove early the block cache if this batch is the last
697
				// pooling this block.
698
				if let Some(block_cache) = self.cached_blocks.get_mut(block) {
699
					if block_cache.active_batch_count == 1
700
						&& matches!(
701
							block_cache.state,
702
							CacheBlockState::Pooled { started: false, .. }
703
						) {
704
						remove = true;
705
					}
706
				}
707

            
708
				if remove {
709
					tracing::trace!("Pooled block {} is no longer requested.", block);
710
					// Remove block from the cache. Drops the value,
711
					// closing all the channels contained in it.
712
					let _ = self.cached_blocks.remove(block);
713
				}
714
			}
715
		}
716
	}
717

            
718
	/// A tracing blocking task notifies it got a permit and is starting the tracing.
719
	/// This started status is stored to avoid removing this block entry.
720
	#[instrument(skip(self))]
721
	fn blocking_started(&mut self, block_hash: H256) {
722
		if let Some(block_cache) = self.cached_blocks.get_mut(&block_hash) {
723
			if let CacheBlockState::Pooled {
724
				ref mut started, ..
725
			} = block_cache.state
726
			{
727
				*started = true;
728
			}
729
		}
730
	}
731

            
732
	/// A tracing blocking task notifies it has finished the tracing and provide the result.
733
	#[instrument(skip(self, result))]
734
	fn blocking_finished(&mut self, block_hash: H256, result: TxsTraceRes) {
735
		// In some cases it might be possible to receive traces of a block
736
		// that has no entry in the cache because it was removed of the pool
737
		// and received a permit concurrently. We just ignore it.
738
		//
739
		// TODO : Should we add it back ? Should it have an active_batch_count
740
		// of 1 then ?
741
		if let Some(block_cache) = self.cached_blocks.get_mut(&block_hash) {
742
			if let CacheBlockState::Pooled {
743
				ref mut waiting_requests,
744
				..
745
			} = block_cache.state
746
			{
747
				tracing::trace!(
748
					"A new block ({}) has been traced, adding it to the cache and responding to \
749
					{} waiting requests.",
750
					block_hash,
751
					waiting_requests.len()
752
				);
753
				// Send result in waiting channels
754
				while let Some(channel) = waiting_requests.pop() {
755
					let _ = channel.send(result.clone());
756
				}
757

            
758
				// Update cache entry
759
				block_cache.state = CacheBlockState::Cached { traces: result };
760
			}
761
		}
762
	}
763

            
764
	/// A batch expiration delay timer has completed. It performs the cache cleaning for blocks
765
	/// not longer used by other batches.
766
	#[instrument(skip(self))]
767
	fn expired_batch(&mut self, batch_id: CacheBatchId) {
768
		if let Some(batch) = self.batches.remove(&batch_id.0) {
769
			for block in batch {
770
				// For each block of the batch, we remove it if it was the
771
				// last batch containing it.
772
				let mut remove = false;
773
				if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
774
					block_cache.active_batch_count -= 1;
775

            
776
					if block_cache.active_batch_count == 0 {
777
						remove = true;
778
					}
779
				}
780

            
781
				if remove {
782
					let _ = self.cached_blocks.remove(&block);
783
				}
784
			}
785
		}
786
	}
787

            
788
	/// (In blocking task) Use the Runtime API to trace the block.
789
	#[instrument(skip(client, backend, overrides))]
790
	fn cache_block(
791
		client: Arc<C>,
792
		backend: Arc<BE>,
793
		substrate_hash: H256,
794
		overrides: Arc<dyn StorageOverride<B>>,
795
	) -> TxsTraceRes {
796
		// Get Subtrate block data.
797
		let api = client.runtime_api();
798
		let block_header = client
799
			.header(substrate_hash)
800
			.map_err(|e| {
801
				format!(
802
					"Error when fetching substrate block {} header : {:?}",
803
					substrate_hash, e
804
				)
805
			})?
806
			.ok_or_else(|| format!("Substrate block {} don't exist", substrate_hash))?;
807

            
808
		let height = *block_header.number();
809
		let substrate_parent_hash = *block_header.parent_hash();
810

            
811
		// Get Ethereum block data.
812
		let (eth_block, eth_transactions) = match (
813
			overrides.current_block(substrate_hash),
814
			overrides.current_transaction_statuses(substrate_hash),
815
		) {
816
			(Some(a), Some(b)) => (a, b),
817
			_ => {
818
				return Err(format!(
819
					"Failed to get Ethereum block data for Substrate block {}",
820
					substrate_hash
821
				))
822
			}
823
		};
824

            
825
		let eth_block_hash = eth_block.header.hash();
826
		let eth_tx_hashes = eth_transactions
827
			.iter()
828
			.map(|t| t.transaction_hash)
829
			.collect();
830

            
831
		// Get extrinsics (containing Ethereum ones)
832
		let extrinsics = backend
833
			.blockchain()
834
			.body(substrate_hash)
835
			.map_err(|e| {
836
				format!(
837
					"Blockchain error when fetching extrinsics of block {} : {:?}",
838
					height, e
839
				)
840
			})?
841
			.ok_or_else(|| format!("Could not find block {} when fetching extrinsics.", height))?;
842

            
843
		// Get DebugRuntimeApi version
844
		let trace_api_version = if let Ok(Some(api_version)) =
845
			api.api_version::<dyn DebugRuntimeApi<B>>(substrate_parent_hash)
846
		{
847
			api_version
848
		} else {
849
			return Err("Runtime api version call failed (trace)".to_string());
850
		};
851

            
852
		// Trace the block.
853
		let f = || -> Result<_, String> {
854
			let result = if trace_api_version >= 5 {
855
				api.trace_block(
856
					substrate_parent_hash,
857
					extrinsics,
858
					eth_tx_hashes,
859
					&block_header,
860
				)
861
			} else {
862
				// Get core runtime api version
863
				let core_api_version = if let Ok(Some(api_version)) =
864
					api.api_version::<dyn Core<B>>(substrate_parent_hash)
865
				{
866
					api_version
867
				} else {
868
					return Err("Runtime api version call failed (core)".to_string());
869
				};
870

            
871
				// Initialize block: calls the "on_initialize" hook on every pallet
872
				// in AllPalletsWithSystem
873
				// This was fine before pallet-message-queue because the XCM messages
874
				// were processed by the "setValidationData" inherent call and not on an
875
				// "on_initialize" hook, which runs before enabling XCM tracing
876
				if core_api_version >= 5 {
877
					api.initialize_block(substrate_parent_hash, &block_header)
878
						.map_err(|e| format!("Runtime api access error: {:?}", e))?;
879
				} else {
880
					#[allow(deprecated)]
881
					api.initialize_block_before_version_5(substrate_parent_hash, &block_header)
882
						.map_err(|e| format!("Runtime api access error: {:?}", e))?;
883
				}
884

            
885
				#[allow(deprecated)]
886
				api.trace_block_before_version_5(substrate_parent_hash, extrinsics, eth_tx_hashes)
887
			};
888

            
889
			result
890
				.map_err(|e| format!("Blockchain error when replaying block {} : {:?}", height, e))?
891
				.map_err(|e| {
892
					tracing::warn!(
893
						target: "tracing",
894
						"Internal runtime error when replaying block {} : {:?}",
895
						height,
896
						e
897
					);
898
					format!(
899
						"Internal runtime error when replaying block {} : {:?}",
900
						height, e
901
					)
902
				})?;
903

            
904
			Ok(moonbeam_rpc_primitives_debug::Response::Block)
905
		};
906

            
907
		let eth_transactions_by_index: BTreeMap<u32, H256> = eth_transactions
908
			.iter()
909
			.map(|t| (t.transaction_index, t.transaction_hash))
910
			.collect();
911

            
912
		let mut proxy = moonbeam_client_evm_tracing::listeners::CallList::default();
913
		proxy.using(f)?;
914

            
915
		let traces: Vec<TransactionTrace> =
916
			moonbeam_client_evm_tracing::formatters::TraceFilter::format(proxy)
917
				.ok_or("Fail to format proxy")?
918
				.into_iter()
919
				.filter_map(|mut trace| {
920
					match eth_transactions_by_index.get(&trace.transaction_position) {
921
						Some(transaction_hash) => {
922
							trace.block_hash = eth_block_hash;
923
							trace.block_number = height;
924
							trace.transaction_hash = *transaction_hash;
925

            
926
							// Reformat error messages.
927
							if let block::TransactionTraceOutput::Error(ref mut error) =
928
								trace.output
929
							{
930
								if error.as_slice() == b"execution reverted" {
931
									*error = b"Reverted".to_vec();
932
								}
933
							}
934

            
935
							Some(trace)
936
						}
937
						None => {
938
							log::warn!(
939
								target: "tracing",
940
								"A trace in block {} does not map to any known ethereum transaction. Trace: {:?}",
941
								height,
942
								trace,
943
							);
944
							None
945
						}
946
					}
947
				})
948
				.collect();
949

            
950
		Ok(traces)
951
	}
952
}
953

            
954
/// Prometheus metrics for tracing.
955
#[derive(Clone)]
956
pub(crate) struct Metrics {
957
	/// Counter registered as `tracing_cache_hits`.
	tracing_cache_hits: Counter<U64>,
958
	/// Counter registered as `tracing_cache_misses`.
	tracing_cache_misses: Counter<U64>,
959
}
960

            
961
impl Metrics {
962
	pub(crate) fn register(registry: &PrometheusRegistry) -> Result<Self, PrometheusError> {
963
		Ok(Self {
964
			tracing_cache_hits: register(
965
				Counter::new("tracing_cache_hits", "Number of tracing cache hits.")?,
966
				registry,
967
			)?,
968
			tracing_cache_misses: register(
969
				Counter::new("tracing_cache_misses", "Number of tracing cache misses.")?,
970
				registry,
971
			)?,
972
		})
973
	}
974
}