1
// Copyright 2019-2025 PureStake Inc.
2
// This file is part of Moonbeam.
3

            
4
// Moonbeam is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Moonbeam is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
//! `trace_filter` RPC handler and its associated service task.
18
//! The RPC handler rely on `CacheTask` which provides a future that must be run inside a tokio
19
//! executor.
20
//!
21
//! The implementation is composed of multiple tasks :
22
//! - Many calls the RPC handler `Trace::filter`, communicating with the main task.
23
//! - A main `CacheTask` managing the cache and the communication between tasks.
24
//! - For each traced block an async task responsible to wait for a permit, spawn a blocking
25
//!   task and waiting for the result, then send it to the main `CacheTask`.
26

            
27
use futures::{select, stream::FuturesUnordered, FutureExt, StreamExt};
28
use std::{collections::BTreeMap, future::Future, marker::PhantomData, sync::Arc, time::Duration};
29
use tokio::{
30
	sync::{mpsc, oneshot, Semaphore},
31
	time::sleep,
32
};
33
use tracing::{instrument, Instrument};
34

            
35
use sc_client_api::backend::{Backend, StateBackend, StorageProvider};
36
use sc_utils::mpsc::TracingUnboundedSender;
37
use sp_api::{ApiExt, Core, ProvideRuntimeApi};
38
use sp_block_builder::BlockBuilder;
39
use sp_blockchain::{
40
	Backend as BlockchainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
41
};
42
use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Header as HeaderT};
43
use substrate_prometheus_endpoint::{
44
	register, Counter, PrometheusError, Registry as PrometheusRegistry, U64,
45
};
46

            
47
use ethereum_types::H256;
48
use fc_storage::StorageOverride;
49
use fp_rpc::EthereumRuntimeRPCApi;
50

            
51
use moonbeam_client_evm_tracing::{
52
	formatters::ResponseFormatter,
53
	types::block::{self, TransactionTrace},
54
};
55
pub use moonbeam_rpc_core_trace::{FilterRequest, TraceServer};
56
use moonbeam_rpc_core_types::{RequestBlockId, RequestBlockTag};
57
use moonbeam_rpc_primitives_debug::DebugRuntimeApi;
58

            
59
type TxsTraceRes = Result<Vec<TransactionTrace>, String>;
60

            
61
/// RPC handler. Will communicate with a `CacheTask` through a `CacheRequester`.
62
pub struct Trace<B, C> {
63
	_phantom: PhantomData<B>,
64
	client: Arc<C>,
65
	requester: CacheRequester,
66
	max_count: u32,
67
}
68

            
69
impl<B, C> Clone for Trace<B, C> {
70
	fn clone(&self) -> Self {
71
		Self {
72
			_phantom: PhantomData,
73
			client: Arc::clone(&self.client),
74
			requester: self.requester.clone(),
75
			max_count: self.max_count,
76
		}
77
	}
78
}
79

            
80
impl<B, C> Trace<B, C>
81
where
82
	B: BlockT<Hash = H256> + Send + Sync + 'static,
83
	B::Header: HeaderT<Number = u32>,
84
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
85
	C: Send + Sync + 'static,
86
{
87
	/// Create a new RPC handler.
88
	pub fn new(client: Arc<C>, requester: CacheRequester, max_count: u32) -> Self {
89
		Self {
90
			client,
91
			requester,
92
			max_count,
93
			_phantom: PhantomData,
94
		}
95
	}
96

            
97
	/// Convert an optional block ID (number or tag) to a block height.
98
	fn block_id(&self, id: Option<RequestBlockId>) -> Result<u32, &'static str> {
99
		match id {
100
			Some(RequestBlockId::Number(n)) => Ok(n),
101
			None | Some(RequestBlockId::Tag(RequestBlockTag::Latest)) => {
102
				Ok(self.client.info().best_number)
103
			}
104
			Some(RequestBlockId::Tag(RequestBlockTag::Earliest)) => Ok(0),
105
			Some(RequestBlockId::Tag(RequestBlockTag::Finalized)) => {
106
				Ok(self.client.info().finalized_number)
107
			}
108
			Some(RequestBlockId::Tag(RequestBlockTag::Pending)) => {
109
				Err("'pending' is not supported")
110
			}
111
			Some(RequestBlockId::Hash(_)) => Err("Block hash not supported"),
112
		}
113
	}
114

            
115
	/// `trace_filter` endpoint (wrapped in the trait implementation with futures compatibility)
116
	async fn filter(self, req: FilterRequest) -> TxsTraceRes {
117
		let from_block = self.block_id(req.from_block)?;
118
		let to_block = self.block_id(req.to_block)?;
119
		let block_heights = from_block..=to_block;
120

            
121
		let count = req.count.unwrap_or(self.max_count);
122
		if count > self.max_count {
123
			return Err(format!(
124
				"count ({}) can't be greater than maximum ({})",
125
				count, self.max_count
126
			));
127
		}
128

            
129
		// Build a list of all the Substrate block hashes that need to be traced.
130
		let mut block_hashes = vec![];
131
		for block_height in block_heights {
132
			if block_height == 0 {
133
				continue; // no traces for genesis block.
134
			}
135

            
136
			let block_hash = self
137
				.client
138
				.hash(block_height)
139
				.map_err(|e| {
140
					format!(
141
						"Error when fetching block {} header : {:?}",
142
						block_height, e
143
					)
144
				})?
145
				.ok_or_else(|| format!("Block with height {} don't exist", block_height))?;
146

            
147
			block_hashes.push(block_hash);
148
		}
149

            
150
		// Start a batch with these blocks.
151
		let batch_id = self.requester.start_batch(block_hashes.clone()).await?;
152
		// Fetch all the traces. It is done in another function to simplify error handling and allow
153
		// to call the following `stop_batch` regardless of the result. This is important for the
154
		// cache cleanup to work properly.
155
		let res = self.fetch_traces(req, &block_hashes, count as usize).await;
156
		// Stop the batch, allowing the cache task to remove useless non-started block traces and
157
		// start the expiration delay.
158
		self.requester.stop_batch(batch_id).await;
159

            
160
		res
161
	}
162

            
163
	async fn fetch_traces(
164
		&self,
165
		req: FilterRequest,
166
		block_hashes: &[H256],
167
		count: usize,
168
	) -> TxsTraceRes {
169
		let from_address = req.from_address.unwrap_or_default();
170
		let to_address = req.to_address.unwrap_or_default();
171

            
172
		let mut traces_amount: i64 = -(req.after.unwrap_or(0) as i64);
173
		let mut traces = vec![];
174

            
175
		for &block_hash in block_hashes {
176
			// Request the traces of this block to the cache service.
177
			// This will resolve quickly if the block is already cached, or wait until the block
178
			// has finished tracing.
179
			let block_traces = self.requester.get_traces(block_hash).await?;
180

            
181
			// Filter addresses.
182
			let mut block_traces: Vec<_> = block_traces
183
				.iter()
184
				.filter(|trace| match trace.action {
185
					block::TransactionTraceAction::Call { from, to, .. } => {
186
						(from_address.is_empty() || from_address.contains(&from))
187
							&& (to_address.is_empty() || to_address.contains(&to))
188
					}
189
					block::TransactionTraceAction::Create { from, .. } => {
190
						(from_address.is_empty() || from_address.contains(&from))
191
							&& to_address.is_empty()
192
					}
193
					block::TransactionTraceAction::Suicide { address, .. } => {
194
						(from_address.is_empty() || from_address.contains(&address))
195
							&& to_address.is_empty()
196
					}
197
				})
198
				.cloned()
199
				.collect();
200

            
201
			// Don't insert anything if we're still before "after"
202
			traces_amount += block_traces.len() as i64;
203
			if traces_amount > 0 {
204
				let traces_amount = traces_amount as usize;
205
				// If the current Vec of traces is across the "after" marker,
206
				// we skip some elements of it.
207
				if traces_amount < block_traces.len() {
208
					let skip = block_traces.len() - traces_amount;
209
					block_traces = block_traces.into_iter().skip(skip).collect();
210
				}
211

            
212
				traces.append(&mut block_traces);
213

            
214
				// If we go over "count" (the limit), we trim and exit the loop,
215
				// unless we used the default maximum, in which case we return an error.
216
				if traces_amount >= count {
217
					if req.count.is_none() {
218
						return Err(format!(
219
							"the amount of traces goes over the maximum ({}), please use 'after' \
220
							and 'count' in your request",
221
							self.max_count
222
						));
223
					}
224

            
225
					traces = traces.into_iter().take(count).collect();
226
					break;
227
				}
228
			}
229
		}
230

            
231
		Ok(traces)
232
	}
233
}
234

            
235
#[jsonrpsee::core::async_trait]
236
impl<B, C> TraceServer for Trace<B, C>
237
where
238
	B: BlockT<Hash = H256> + Send + Sync + 'static,
239
	B::Header: HeaderT<Number = u32>,
240
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
241
	C: Send + Sync + 'static,
242
{
243
	async fn filter(
244
		&self,
245
		filter: FilterRequest,
246
	) -> jsonrpsee::core::RpcResult<Vec<TransactionTrace>> {
247
		self.clone()
248
			.filter(filter)
249
			.await
250
			.map_err(fc_rpc::internal_err)
251
	}
252
}
253

            
254
/// An opaque batch ID.
255
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
256
pub struct CacheBatchId(u64);
257

            
258
/// Requests the cache task can accept.
259
enum CacheRequest {
260
	/// Request to start caching the provided range of blocks.
261
	/// The task will add to blocks to its pool and immediately return a new batch ID.
262
	StartBatch {
263
		/// Returns the ID of the batch for cancellation.
264
		sender: oneshot::Sender<CacheBatchId>,
265
		/// List of block hash to trace.
266
		blocks: Vec<H256>,
267
	},
268
	/// Fetch the traces for given block hash.
269
	/// The task will answer only when it has processed this block.
270
	GetTraces {
271
		/// Returns the array of traces or an error.
272
		sender: oneshot::Sender<TxsTraceRes>,
273
		/// Hash of the block.
274
		block: H256,
275
	},
276
	/// Notify the cache that it can stop the batch with that ID. Any block contained only in
277
	/// this batch and still not started will be discarded.
278
	StopBatch { batch_id: CacheBatchId },
279
}
280

            
281
/// Allows to interact with the cache task.
282
#[derive(Clone)]
283
pub struct CacheRequester(TracingUnboundedSender<CacheRequest>);
284

            
285
impl CacheRequester {
286
	/// Request to start caching the provided range of blocks.
287
	/// The task will add to blocks to its pool and immediately return the batch ID.
288
	#[instrument(skip(self))]
289
	pub async fn start_batch(&self, blocks: Vec<H256>) -> Result<CacheBatchId, String> {
290
		let (response_tx, response_rx) = oneshot::channel();
291
		let sender = self.0.clone();
292

            
293
		sender
294
			.unbounded_send(CacheRequest::StartBatch {
295
				sender: response_tx,
296
				blocks,
297
			})
298
			.map_err(|e| {
299
				format!(
300
					"Failed to send request to the trace cache task. Error : {:?}",
301
					e
302
				)
303
			})?;
304

            
305
		response_rx.await.map_err(|e| {
306
			format!(
307
				"Trace cache task closed the response channel. Error : {:?}",
308
				e
309
			)
310
		})
311
	}
312

            
313
	/// Fetch the traces for given block hash.
314
	/// The task will answer only when it has processed this block.
315
	/// The block should be part of a batch first. If no batch has requested the block it will
316
	/// return an error.
317
	#[instrument(skip(self))]
318
	pub async fn get_traces(&self, block: H256) -> TxsTraceRes {
319
		let (response_tx, response_rx) = oneshot::channel();
320
		let sender = self.0.clone();
321

            
322
		sender
323
			.unbounded_send(CacheRequest::GetTraces {
324
				sender: response_tx,
325
				block,
326
			})
327
			.map_err(|e| {
328
				format!(
329
					"Failed to send request to the trace cache task. Error : {:?}",
330
					e
331
				)
332
			})?;
333

            
334
		response_rx
335
			.await
336
			.map_err(|e| {
337
				format!(
338
					"Trace cache task closed the response channel. Error : {:?}",
339
					e
340
				)
341
			})?
342
			.map_err(|e| format!("Failed to replay block. Error : {:?}", e))
343
	}
344

            
345
	/// Notify the cache that it can stop the batch with that ID. Any block contained only in
346
	/// this batch and still in the waiting pool will be discarded.
347
	#[instrument(skip(self))]
348
	pub async fn stop_batch(&self, batch_id: CacheBatchId) {
349
		let sender = self.0.clone();
350

            
351
		// Here we don't care if the request has been accepted or refused, the caller can't
352
		// do anything with it.
353
		let _ = sender
354
			.unbounded_send(CacheRequest::StopBatch { batch_id })
355
			.map_err(|e| {
356
				format!(
357
					"Failed to send request to the trace cache task. Error : {:?}",
358
					e
359
				)
360
			});
361
	}
362
}
363

            
364
/// Data stored for each block in the cache.
365
/// `active_batch_count` represents the number of batches using this
366
/// block. It will increase immediately when a batch is created, but will be
367
/// decrease only after the batch ends and its expiration delay passes.
368
/// It allows to keep the data in the cache for following requests that would use
369
/// this block, which is important to handle pagination efficiently.
370
struct CacheBlock {
371
	active_batch_count: usize,
372
	state: CacheBlockState,
373
}
374

            
375
/// State of a cached block. It can either be polled to be traced or cached.
376
enum CacheBlockState {
377
	/// Block has been added to the pool blocks to be replayed.
378
	/// It may be currently waiting to be replayed or being replayed.
379
	Pooled {
380
		started: bool,
381
		/// Multiple requests might query the same block while it is pooled to be
382
		/// traced. They response channel is stored here, and the result will be
383
		/// sent in all of them when the tracing is finished.
384
		waiting_requests: Vec<oneshot::Sender<TxsTraceRes>>,
385
		/// Channel used to unqueue a tracing that has not yet started.
386
		/// A tracing will be unqueued if it has not yet been started and the last batch
387
		/// needing this block is ended (ignoring the expiration delay).
388
		/// It is not used directly, but dropping will wake up the receiver.
389
		#[allow(dead_code)]
390
		unqueue_sender: oneshot::Sender<()>,
391
	},
392
	/// Tracing has been completed and the result is available. No Runtime API call
393
	/// will be needed until this block cache is removed.
394
	Cached { traces: TxsTraceRes },
395
}
396

            
397
/// Tracing a block is done in a separate tokio blocking task to avoid clogging the async threads.
398
/// For this reason a channel using this type is used by the blocking task to communicate with the
399
/// main cache task.
400
enum BlockingTaskMessage {
401
	/// Notify the tracing for this block has started as the blocking task got a permit from
402
	/// the semaphore. This is used to prevent the deletion of a cache entry for a block that has
403
	/// started being traced.
404
	Started { block_hash: H256 },
405
	/// The tracing is finished and the result is sent to the main task.
406
	Finished {
407
		block_hash: H256,
408
		result: TxsTraceRes,
409
	},
410
}
411

            
412
/// Type wrapper for the cache task, generic over the Client, Block and Backend types.
413
pub struct CacheTask<B, C, BE> {
414
	client: Arc<C>,
415
	backend: Arc<BE>,
416
	blocking_permits: Arc<Semaphore>,
417
	cached_blocks: BTreeMap<H256, CacheBlock>,
418
	batches: BTreeMap<u64, Vec<H256>>,
419
	next_batch_id: u64,
420
	metrics: Option<Metrics>,
421
	_phantom: PhantomData<B>,
422
}
423

            
424
impl<B, C, BE> CacheTask<B, C, BE>
425
where
426
	BE: Backend<B> + 'static,
427
	BE::State: StateBackend<BlakeTwo256>,
428
	C: ProvideRuntimeApi<B>,
429
	C: StorageProvider<B, BE>,
430
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
431
	C: Send + Sync + 'static,
432
	B: BlockT<Hash = H256> + Send + Sync + 'static,
433
	B::Header: HeaderT<Number = u32>,
434
	C::Api: BlockBuilder<B>,
435
	C::Api: DebugRuntimeApi<B>,
436
	C::Api: EthereumRuntimeRPCApi<B>,
437
	C::Api: ApiExt<B>,
438
{
439
	/// Create a new cache task.
440
	///
441
	/// Returns a Future that needs to be added to a tokio executor, and a handle allowing to
442
	/// send requests to the task.
443
	pub fn create(
444
		client: Arc<C>,
445
		backend: Arc<BE>,
446
		cache_duration: Duration,
447
		blocking_permits: Arc<Semaphore>,
448
		overrides: Arc<dyn StorageOverride<B>>,
449
		prometheus: Option<PrometheusRegistry>,
450
	) -> (impl Future<Output = ()>, CacheRequester) {
451
		// Communication with the outside world :
452
		let (requester_tx, mut requester_rx) =
453
			sc_utils::mpsc::tracing_unbounded("trace-filter-cache", 100_000);
454

            
455
		// Task running in the service.
456
		let task = async move {
457
			// The following variables are polled by the select! macro, and thus cannot be
458
			// part of Self without introducing borrowing issues.
459
			let mut batch_expirations = FuturesUnordered::new();
460
			let (blocking_tx, mut blocking_rx) =
461
				mpsc::channel(blocking_permits.available_permits() * 2);
462
			let metrics = if let Some(registry) = prometheus {
463
				match Metrics::register(&registry) {
464
					Ok(metrics) => Some(metrics),
465
					Err(err) => {
466
						log::error!(target: "tracing", "Failed to register metrics {err:?}");
467
						None
468
					}
469
				}
470
			} else {
471
				None
472
			};
473
			// Contains the inner state of the cache task, excluding the pooled futures/channels.
474
			// Having this object allows to refactor each event into its own function, simplifying
475
			// the main loop.
476
			let mut inner = Self {
477
				client,
478
				backend,
479
				blocking_permits,
480
				cached_blocks: BTreeMap::new(),
481
				batches: BTreeMap::new(),
482
				next_batch_id: 0,
483
				metrics,
484
				_phantom: Default::default(),
485
			};
486

            
487
			// Main event loop. This loop must not contain any direct .await, as we want to
488
			// react to events as fast as possible.
489
			loop {
490
				select! {
491
					request = requester_rx.next() => {
492
						match request {
493
							None => break,
494
							Some(CacheRequest::StartBatch {sender, blocks})
495
								=> inner.request_start_batch(&blocking_tx, sender, blocks, overrides.clone()),
496
							Some(CacheRequest::GetTraces {sender, block})
497
								=> inner.request_get_traces(sender, block),
498
							Some(CacheRequest::StopBatch {batch_id}) => {
499
								// Cannot be refactored inside `request_stop_batch` because
500
								// it has an unnamable type :C
501
								batch_expirations.push(async move {
502
									sleep(cache_duration).await;
503
									batch_id
504
								});
505

            
506
								inner.request_stop_batch(batch_id);
507
							},
508
						}
509
					},
510
					message = blocking_rx.recv().fuse() => {
511
						match message {
512
							None => (),
513
							Some(BlockingTaskMessage::Started { block_hash })
514
								=> inner.blocking_started(block_hash),
515
							Some(BlockingTaskMessage::Finished { block_hash, result })
516
								=> inner.blocking_finished(block_hash, result),
517
						}
518
					},
519
					batch_id = batch_expirations.next() => {
520
						match batch_id {
521
							None => (),
522
							Some(batch_id) => inner.expired_batch(batch_id),
523
						}
524
					}
525
				}
526
			}
527
		}
528
		.instrument(tracing::debug_span!("trace_filter_cache"));
529

            
530
		(task, CacheRequester(requester_tx))
531
	}
532

            
533
	/// Handle the creation of a batch.
534
	/// Will start the tracing process for blocks that are not already in the cache.
535
	#[instrument(skip(self, blocking_tx, sender, blocks, overrides))]
536
	fn request_start_batch(
537
		&mut self,
538
		blocking_tx: &mpsc::Sender<BlockingTaskMessage>,
539
		sender: oneshot::Sender<CacheBatchId>,
540
		blocks: Vec<H256>,
541
		overrides: Arc<dyn StorageOverride<B>>,
542
	) {
543
		tracing::trace!("Starting batch {}", self.next_batch_id);
544
		self.batches.insert(self.next_batch_id, blocks.clone());
545

            
546
		for block in blocks {
547
			// The block is already in the cache, awesome!
548
			if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
549
				block_cache.active_batch_count += 1;
550
				tracing::trace!(
551
					"Cache hit for block {}, now used by {} batches.",
552
					block,
553
					block_cache.active_batch_count
554
				);
555
			}
556
			// Otherwise we need to queue this block for tracing.
557
			else {
558
				tracing::trace!("Cache miss for block {}, pooling it for tracing.", block);
559

            
560
				let blocking_permits = Arc::clone(&self.blocking_permits);
561
				let (unqueue_sender, unqueue_receiver) = oneshot::channel();
562
				let client = Arc::clone(&self.client);
563
				let backend = Arc::clone(&self.backend);
564
				let blocking_tx = blocking_tx.clone();
565
				let overrides = overrides.clone();
566

            
567
				// Spawn all block caching asynchronously.
568
				// It will wait to obtain a permit, then spawn a blocking task.
569
				// When the blocking task returns its result, it is sent
570
				// thought a channel to the main task loop.
571
				tokio::spawn(
572
					async move {
573
						tracing::trace!("Waiting for blocking permit or task cancellation");
574
						let _permit = select!(
575
							_ = unqueue_receiver.fuse() => {
576
							tracing::trace!("Tracing of the block has been cancelled.");
577
								return;
578
							},
579
							permit = blocking_permits.acquire().fuse() => permit,
580
						);
581

            
582
						// Warn the main task that block tracing as started, and
583
						// this block cache entry should not be removed.
584
						let _ = blocking_tx
585
							.send(BlockingTaskMessage::Started { block_hash: block })
586
							.await;
587

            
588
						tracing::trace!("Start block tracing in a blocking task.");
589

            
590
						// Perform block tracing in a tokio blocking task.
591
						let result = async {
592
							tokio::task::spawn_blocking(move || {
593
								Self::cache_block(client, backend, block, overrides.clone())
594
							})
595
							.await
596
							.map_err(|e| {
597
								format!("Tracing Substrate block {} panicked : {:?}", block, e)
598
							})?
599
						}
600
						.await
601
						.map_err(|e| e.to_string());
602

            
603
						tracing::trace!("Block tracing finished, sending result to main task.");
604

            
605
						// Send a response to the main task.
606
						let _ = blocking_tx
607
							.send(BlockingTaskMessage::Finished {
608
								block_hash: block,
609
								result,
610
							})
611
							.await;
612
					}
613
					.instrument(tracing::trace_span!("Block tracing", block = %block)),
614
				);
615

            
616
				// Insert the block in the cache.
617
				self.cached_blocks.insert(
618
					block,
619
					CacheBlock {
620
						active_batch_count: 1,
621
						state: CacheBlockState::Pooled {
622
							started: false,
623
							waiting_requests: vec![],
624
							unqueue_sender,
625
						},
626
					},
627
				);
628
			}
629
		}
630

            
631
		// Respond with the batch ID.
632
		let _ = sender.send(CacheBatchId(self.next_batch_id));
633

            
634
		// Increase batch ID for the next request.
635
		self.next_batch_id = self.next_batch_id.overflowing_add(1).0;
636
	}
637

            
638
	/// Handle a request to get the traces of the provided block.
639
	/// - If the result is stored in the cache, it sends it immediately.
640
	/// - If the block is currently being pooled, it is added to this block cache waiting list,
641
	///   and all requests concerning this block will be satisfied when the tracing for this block
642
	///   is finished.
643
	/// - If this block is missing from the cache, it means no batch asked for it. All requested
644
	///   blocks should be contained in a batch beforehand, and thus an error is returned.
645
	#[instrument(skip(self))]
646
	fn request_get_traces(&mut self, sender: oneshot::Sender<TxsTraceRes>, block: H256) {
647
		if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
648
			match &mut block_cache.state {
649
				CacheBlockState::Pooled {
650
					ref mut waiting_requests,
651
					..
652
				} => {
653
					tracing::warn!(
654
						"A request asked a pooled block ({}), adding it to the list of \
655
						waiting requests.",
656
						block
657
					);
658
					waiting_requests.push(sender);
659
					if let Some(metrics) = &self.metrics {
660
						metrics.tracing_cache_misses.inc();
661
					}
662
				}
663
				CacheBlockState::Cached { traces, .. } => {
664
					tracing::warn!(
665
						"A request asked a cached block ({}), sending the traces directly.",
666
						block
667
					);
668
					let _ = sender.send(traces.clone());
669
					if let Some(metrics) = &self.metrics {
670
						metrics.tracing_cache_hits.inc();
671
					}
672
				}
673
			}
674
		} else {
675
			tracing::warn!(
676
				"An RPC request asked to get a block ({}) which was not batched.",
677
				block
678
			);
679
			let _ = sender.send(Err(format!(
680
				"RPC request asked a block ({}) that was not batched",
681
				block
682
			)));
683
		}
684
	}
685

            
686
	/// Handle a request to stop a batch.
687
	/// For all blocks that needed to be traced, are only in this batch and not yet started, their
688
	/// tracing is cancelled to save CPU-time and avoid attacks requesting large amount of blocks.
689
	/// This batch data is not yet removed however. Instead a expiration delay timer is started
690
	/// after which the data will indeed be cleared. (the code for that is in the main loop code
691
	/// as it involved an unnamable type :C)
692
	#[instrument(skip(self))]
693
	fn request_stop_batch(&mut self, batch_id: CacheBatchId) {
694
		tracing::trace!("Stopping batch {}", batch_id.0);
695
		if let Some(blocks) = self.batches.get(&batch_id.0) {
696
			for block in blocks {
697
				let mut remove = false;
698

            
699
				// We remove early the block cache if this batch is the last
700
				// pooling this block.
701
				if let Some(block_cache) = self.cached_blocks.get_mut(block) {
702
					if block_cache.active_batch_count == 1
703
						&& matches!(
704
							block_cache.state,
705
							CacheBlockState::Pooled { started: false, .. }
706
						) {
707
						remove = true;
708
					}
709
				}
710

            
711
				if remove {
712
					tracing::trace!("Pooled block {} is no longer requested.", block);
713
					// Remove block from the cache. Drops the value,
714
					// closing all the channels contained in it.
715
					let _ = self.cached_blocks.remove(block);
716
				}
717
			}
718
		}
719
	}
720

            
721
	/// A tracing blocking task notifies it got a permit and is starting the tracing.
722
	/// This started status is stored to avoid removing this block entry.
723
	#[instrument(skip(self))]
724
	fn blocking_started(&mut self, block_hash: H256) {
725
		if let Some(block_cache) = self.cached_blocks.get_mut(&block_hash) {
726
			if let CacheBlockState::Pooled {
727
				ref mut started, ..
728
			} = block_cache.state
729
			{
730
				*started = true;
731
			}
732
		}
733
	}
734

            
735
	/// A tracing blocking task notifies it has finished the tracing and provide the result.
736
	#[instrument(skip(self, result))]
737
	fn blocking_finished(&mut self, block_hash: H256, result: TxsTraceRes) {
738
		// In some cases it might be possible to receive traces of a block
739
		// that has no entry in the cache because it was removed of the pool
740
		// and received a permit concurrently. We just ignore it.
741
		//
742
		// TODO : Should we add it back ? Should it have an active_batch_count
743
		// of 1 then ?
744
		if let Some(block_cache) = self.cached_blocks.get_mut(&block_hash) {
745
			if let CacheBlockState::Pooled {
746
				ref mut waiting_requests,
747
				..
748
			} = block_cache.state
749
			{
750
				tracing::trace!(
751
					"A new block ({}) has been traced, adding it to the cache and responding to \
752
					{} waiting requests.",
753
					block_hash,
754
					waiting_requests.len()
755
				);
756
				// Send result in waiting channels
757
				while let Some(channel) = waiting_requests.pop() {
758
					let _ = channel.send(result.clone());
759
				}
760

            
761
				// Update cache entry
762
				block_cache.state = CacheBlockState::Cached { traces: result };
763
			}
764
		}
765
	}
766

            
767
	/// A batch expiration delay timer has completed. It performs the cache cleaning for blocks
768
	/// not longer used by other batches.
769
	#[instrument(skip(self))]
770
	fn expired_batch(&mut self, batch_id: CacheBatchId) {
771
		if let Some(batch) = self.batches.remove(&batch_id.0) {
772
			for block in batch {
773
				// For each block of the batch, we remove it if it was the
774
				// last batch containing it.
775
				let mut remove = false;
776
				if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
777
					block_cache.active_batch_count -= 1;
778

            
779
					if block_cache.active_batch_count == 0 {
780
						remove = true;
781
					}
782
				}
783

            
784
				if remove {
785
					let _ = self.cached_blocks.remove(&block);
786
				}
787
			}
788
		}
789
	}
790

            
791
	/// (In blocking task) Use the Runtime API to trace the block.
792
	#[instrument(skip(client, backend, overrides))]
793
	fn cache_block(
794
		client: Arc<C>,
795
		backend: Arc<BE>,
796
		substrate_hash: H256,
797
		overrides: Arc<dyn StorageOverride<B>>,
798
	) -> TxsTraceRes {
799
		// Get Substrate block data.
800
		let api = client.runtime_api();
801
		let block_header = client
802
			.header(substrate_hash)
803
			.map_err(|e| {
804
				format!(
805
					"Error when fetching substrate block {} header : {:?}",
806
					substrate_hash, e
807
				)
808
			})?
809
			.ok_or_else(|| format!("Substrate block {} don't exist", substrate_hash))?;
810

            
811
		let height = *block_header.number();
812
		let substrate_parent_hash = *block_header.parent_hash();
813

            
814
		// Get Ethereum block data.
815
		let (eth_block, eth_transactions) = match (
816
			overrides.current_block(substrate_hash),
817
			overrides.current_transaction_statuses(substrate_hash),
818
		) {
819
			(Some(a), Some(b)) => (a, b),
820
			_ => {
821
				return Err(format!(
822
					"Failed to get Ethereum block data for Substrate block {}",
823
					substrate_hash
824
				))
825
			}
826
		};
827

            
828
		let eth_block_hash = eth_block.header.hash();
829
		let eth_tx_hashes = eth_transactions
830
			.iter()
831
			.map(|t| t.transaction_hash)
832
			.collect();
833

            
834
		// Get extrinsics (containing Ethereum ones)
835
		let extrinsics = backend
836
			.blockchain()
837
			.body(substrate_hash)
838
			.map_err(|e| {
839
				format!(
840
					"Blockchain error when fetching extrinsics of block {} : {:?}",
841
					height, e
842
				)
843
			})?
844
			.ok_or_else(|| format!("Could not find block {} when fetching extrinsics.", height))?;
845

            
846
		// Get DebugRuntimeApi version
847
		let trace_api_version = if let Ok(Some(api_version)) =
848
			api.api_version::<dyn DebugRuntimeApi<B>>(substrate_parent_hash)
849
		{
850
			api_version
851
		} else {
852
			return Err("Runtime api version call failed (trace)".to_string());
853
		};
854

            
855
		// Trace the block.
856
		let f = || -> Result<_, String> {
857
			let result = if trace_api_version >= 5 {
858
				api.trace_block(
859
					substrate_parent_hash,
860
					extrinsics,
861
					eth_tx_hashes,
862
					&block_header,
863
				)
864
			} else {
865
				// Get core runtime api version
866
				let core_api_version = if let Ok(Some(api_version)) =
867
					api.api_version::<dyn Core<B>>(substrate_parent_hash)
868
				{
869
					api_version
870
				} else {
871
					return Err("Runtime api version call failed (core)".to_string());
872
				};
873

            
874
				// Initialize block: calls the "on_initialize" hook on every pallet
875
				// in AllPalletsWithSystem
876
				// This was fine before pallet-message-queue because the XCM messages
877
				// were processed by the "setValidationData" inherent call and not on an
878
				// "on_initialize" hook, which runs before enabling XCM tracing
879
				if core_api_version >= 5 {
880
					api.initialize_block(substrate_parent_hash, &block_header)
881
						.map_err(|e| format!("Runtime api access error: {:?}", e))?;
882
				} else {
883
					#[allow(deprecated)]
884
					api.initialize_block_before_version_5(substrate_parent_hash, &block_header)
885
						.map_err(|e| format!("Runtime api access error: {:?}", e))?;
886
				}
887

            
888
				#[allow(deprecated)]
889
				api.trace_block_before_version_5(substrate_parent_hash, extrinsics, eth_tx_hashes)
890
			};
891

            
892
			result
893
				.map_err(|e| format!("Blockchain error when replaying block {} : {:?}", height, e))?
894
				.map_err(|e| {
895
					tracing::warn!(
896
						target: "tracing",
897
						"Internal runtime error when replaying block {} : {:?}",
898
						height,
899
						e
900
					);
901
					format!(
902
						"Internal runtime error when replaying block {} : {:?}",
903
						height, e
904
					)
905
				})?;
906

            
907
			Ok(moonbeam_rpc_primitives_debug::Response::Block)
908
		};
909

            
910
		let eth_transactions_by_index: BTreeMap<u32, H256> = eth_transactions
911
			.iter()
912
			.map(|t| (t.transaction_index, t.transaction_hash))
913
			.collect();
914

            
915
		let mut proxy = moonbeam_client_evm_tracing::listeners::CallList::default();
916
		proxy.using(f)?;
917

            
918
		let traces: Vec<TransactionTrace> =
919
			moonbeam_client_evm_tracing::formatters::TraceFilter::format(proxy)
920
				.ok_or("Fail to format proxy")?
921
				.into_iter()
922
				.filter_map(|mut trace| {
923
					match eth_transactions_by_index.get(&trace.transaction_position) {
924
						Some(transaction_hash) => {
925
							trace.block_hash = eth_block_hash;
926
							trace.block_number = height;
927
							trace.transaction_hash = *transaction_hash;
928

            
929
							// Reformat error messages.
930
							if let block::TransactionTraceOutput::Error(ref mut error) =
931
								trace.output
932
							{
933
								if error.as_slice() == b"execution reverted" {
934
									*error = b"Reverted".to_vec();
935
								}
936
							}
937

            
938
							Some(trace)
939
						}
940
						None => {
941
							log::warn!(
942
								target: "tracing",
943
								"A trace in block {} does not map to any known ethereum transaction. Trace: {:?}",
944
								height,
945
								trace,
946
							);
947
							None
948
						}
949
					}
950
				})
951
				.collect();
952

            
953
		Ok(traces)
954
	}
955
}
956

            
957
/// Prometheus metrics for tracing.
958
#[derive(Clone)]
959
pub(crate) struct Metrics {
960
	tracing_cache_hits: Counter<U64>,
961
	tracing_cache_misses: Counter<U64>,
962
}
963

            
964
impl Metrics {
965
	pub(crate) fn register(registry: &PrometheusRegistry) -> Result<Self, PrometheusError> {
966
		Ok(Self {
967
			tracing_cache_hits: register(
968
				Counter::new("tracing_cache_hits", "Number of tracing cache hits.")?,
969
				registry,
970
			)?,
971
			tracing_cache_misses: register(
972
				Counter::new("tracing_cache_misses", "Number of tracing cache misses.")?,
973
				registry,
974
			)?,
975
		})
976
	}
977
}