1
// Copyright 2019-2025 PureStake Inc.
2
// This file is part of Moonbeam.
3

            
4
// Moonbeam is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Moonbeam is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
//! `trace_filter` RPC handler and its associated service task.
18
//! The RPC handler rely on `CacheTask` which provides a future that must be run inside a tokio
19
//! executor.
20
//!
21
//! The implementation is composed of multiple tasks :
22
//! - Many calls the RPC handler `Trace::filter`, communicating with the main task.
23
//! - A main `CacheTask` managing the cache and the communication between tasks.
24
//! - For each traced block an async task responsible to wait for a permit, spawn a blocking
25
//!   task and waiting for the result, then send it to the main `CacheTask`.
26

            
27
use futures::{select, stream::FuturesUnordered, FutureExt, StreamExt};
28
use std::{collections::BTreeMap, future::Future, marker::PhantomData, sync::Arc, time::Duration};
29
use tokio::{
30
	sync::{mpsc, oneshot, Semaphore},
31
	time::sleep,
32
};
33
use tracing::{instrument, Instrument};
34

            
35
use sc_client_api::backend::{Backend, StateBackend, StorageProvider};
36
use sc_utils::mpsc::TracingUnboundedSender;
37
use sp_api::{ApiExt, Core, ProvideRuntimeApi};
38
use sp_block_builder::BlockBuilder;
39
use sp_blockchain::{
40
	Backend as BlockchainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
41
};
42
use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Header as HeaderT};
43
use substrate_prometheus_endpoint::{
44
	register, Counter, PrometheusError, Registry as PrometheusRegistry, U64,
45
};
46

            
47
use ethereum_types::H256;
48
use fc_storage::StorageOverride;
49
use fp_rpc::EthereumRuntimeRPCApi;
50

            
51
use moonbeam_client_evm_tracing::{
52
	formatters::ResponseFormatter,
53
	types::block::{self, TransactionTrace},
54
};
55
pub use moonbeam_rpc_core_trace::{FilterRequest, TraceServer};
56
use moonbeam_rpc_core_types::{RequestBlockId, RequestBlockTag};
57
use moonbeam_rpc_primitives_debug::DebugRuntimeApi;
58

            
59
type TxsTraceRes = Result<Vec<TransactionTrace>, String>;
60

            
61
/// RPC handler. Will communicate with a `CacheTask` through a `CacheRequester`.
62
pub struct Trace<B, C> {
63
	_phantom: PhantomData<B>,
64
	client: Arc<C>,
65
	requester: CacheRequester,
66
	max_count: u32,
67
	max_block_range: u32,
68
}
69

            
70
impl<B, C> Clone for Trace<B, C> {
71
	fn clone(&self) -> Self {
72
		Self {
73
			_phantom: PhantomData,
74
			client: Arc::clone(&self.client),
75
			requester: self.requester.clone(),
76
			max_count: self.max_count,
77
			max_block_range: self.max_block_range,
78
		}
79
	}
80
}
81

            
82
impl<B, C> Trace<B, C>
83
where
84
	B: BlockT<Hash = H256> + Send + Sync + 'static,
85
	B::Header: HeaderT<Number = u32>,
86
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
87
	C: Send + Sync + 'static,
88
{
89
	/// Create a new RPC handler.
90
	pub fn new(
91
		client: Arc<C>,
92
		requester: CacheRequester,
93
		max_count: u32,
94
		max_block_range: u32,
95
	) -> Self {
96
		Self {
97
			client,
98
			requester,
99
			max_count,
100
			max_block_range,
101
			_phantom: PhantomData,
102
		}
103
	}
104

            
105
	/// Convert an optional block ID (number or tag) to a block height.
106
	fn block_id(&self, id: Option<RequestBlockId>) -> Result<u32, &'static str> {
107
		match id {
108
			Some(RequestBlockId::Number(n)) => Ok(n),
109
			None | Some(RequestBlockId::Tag(RequestBlockTag::Latest)) => {
110
				Ok(self.client.info().best_number)
111
			}
112
			Some(RequestBlockId::Tag(RequestBlockTag::Earliest)) => Ok(0),
113
			Some(RequestBlockId::Tag(RequestBlockTag::Finalized)) => {
114
				Ok(self.client.info().finalized_number)
115
			}
116
			Some(RequestBlockId::Tag(RequestBlockTag::Pending)) => {
117
				Err("'pending' is not supported")
118
			}
119
			Some(RequestBlockId::Hash(_)) => Err("Block hash not supported"),
120
		}
121
	}
122

            
123
	/// `trace_filter` endpoint (wrapped in the trait implementation with futures compatibility)
124
	async fn filter(self, req: FilterRequest) -> TxsTraceRes {
125
		let from_block = self.block_id(req.from_block)?;
126
		let to_block = self.block_id(req.to_block)?;
127

            
128
		// Validate block range to prevent abuse
129
		let block_range = to_block.saturating_sub(from_block);
130
		if block_range > self.max_block_range {
131
			return Err(format!(
132
				"block range is too wide (maximum {})",
133
				self.max_block_range
134
			));
135
		}
136

            
137
		let block_heights = from_block..=to_block;
138

            
139
		let count = req.count.unwrap_or(self.max_count);
140
		if count > self.max_count {
141
			return Err(format!(
142
				"count ({}) can't be greater than maximum ({})",
143
				count, self.max_count
144
			));
145
		}
146

            
147
		// Build a list of all the Substrate block hashes that need to be traced.
148
		let mut block_hashes = vec![];
149
		for block_height in block_heights {
150
			if block_height == 0 {
151
				continue; // no traces for genesis block.
152
			}
153

            
154
			let block_hash = self
155
				.client
156
				.hash(block_height)
157
				.map_err(|e| {
158
					format!(
159
						"Error when fetching block {} header : {:?}",
160
						block_height, e
161
					)
162
				})?
163
				.ok_or_else(|| format!("Block with height {} don't exist", block_height))?;
164

            
165
			block_hashes.push(block_hash);
166
		}
167

            
168
		// Start a batch with these blocks.
169
		let batch_id = self.requester.start_batch(block_hashes.clone()).await?;
170
		// Fetch all the traces. It is done in another function to simplify error handling and allow
171
		// to call the following `stop_batch` regardless of the result. This is important for the
172
		// cache cleanup to work properly.
173
		let res = self.fetch_traces(req, &block_hashes, count as usize).await;
174
		// Stop the batch, allowing the cache task to remove useless non-started block traces and
175
		// start the expiration delay.
176
		self.requester.stop_batch(batch_id).await;
177

            
178
		res
179
	}
180

            
181
	async fn fetch_traces(
182
		&self,
183
		req: FilterRequest,
184
		block_hashes: &[H256],
185
		count: usize,
186
	) -> TxsTraceRes {
187
		let from_address = req.from_address.unwrap_or_default();
188
		let to_address = req.to_address.unwrap_or_default();
189

            
190
		let mut traces_amount: i64 = -(req.after.unwrap_or(0) as i64);
191
		let mut traces = vec![];
192

            
193
		for &block_hash in block_hashes {
194
			// Request the traces of this block to the cache service.
195
			// This will resolve quickly if the block is already cached, or wait until the block
196
			// has finished tracing.
197
			let block_traces = self.requester.get_traces(block_hash).await?;
198

            
199
			// Filter addresses.
200
			let mut block_traces: Vec<_> = block_traces
201
				.iter()
202
				.filter(|trace| match trace.action {
203
					block::TransactionTraceAction::Call { from, to, .. } => {
204
						(from_address.is_empty() || from_address.contains(&from))
205
							&& (to_address.is_empty() || to_address.contains(&to))
206
					}
207
					block::TransactionTraceAction::Create { from, .. } => {
208
						(from_address.is_empty() || from_address.contains(&from))
209
							&& to_address.is_empty()
210
					}
211
					block::TransactionTraceAction::Suicide { address, .. } => {
212
						(from_address.is_empty() || from_address.contains(&address))
213
							&& to_address.is_empty()
214
					}
215
				})
216
				.cloned()
217
				.collect();
218

            
219
			// Don't insert anything if we're still before "after"
220
			traces_amount += block_traces.len() as i64;
221
			if traces_amount > 0 {
222
				let traces_amount = traces_amount as usize;
223
				// If the current Vec of traces is across the "after" marker,
224
				// we skip some elements of it.
225
				if traces_amount < block_traces.len() {
226
					let skip = block_traces.len() - traces_amount;
227
					block_traces = block_traces.into_iter().skip(skip).collect();
228
				}
229

            
230
				traces.append(&mut block_traces);
231

            
232
				// If we go over "count" (the limit), we trim and exit the loop,
233
				// unless we used the default maximum, in which case we return an error.
234
				if traces_amount >= count {
235
					if req.count.is_none() {
236
						return Err(format!(
237
							"the amount of traces goes over the maximum ({}), please use 'after' \
238
							and 'count' in your request",
239
							self.max_count
240
						));
241
					}
242

            
243
					traces = traces.into_iter().take(count).collect();
244
					break;
245
				}
246
			}
247
		}
248

            
249
		Ok(traces)
250
	}
251
}
252

            
253
#[jsonrpsee::core::async_trait]
254
impl<B, C> TraceServer for Trace<B, C>
255
where
256
	B: BlockT<Hash = H256> + Send + Sync + 'static,
257
	B::Header: HeaderT<Number = u32>,
258
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
259
	C: Send + Sync + 'static,
260
{
261
	async fn filter(
262
		&self,
263
		filter: FilterRequest,
264
	) -> jsonrpsee::core::RpcResult<Vec<TransactionTrace>> {
265
		self.clone()
266
			.filter(filter)
267
			.await
268
			.map_err(fc_rpc::internal_err)
269
	}
270
}
271

            
272
/// An opaque batch ID.
273
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
274
pub struct CacheBatchId(u64);
275

            
276
/// Requests the cache task can accept.
277
enum CacheRequest {
278
	/// Request to start caching the provided range of blocks.
279
	/// The task will add to blocks to its pool and immediately return a new batch ID.
280
	StartBatch {
281
		/// Returns the ID of the batch for cancellation.
282
		sender: oneshot::Sender<CacheBatchId>,
283
		/// List of block hash to trace.
284
		blocks: Vec<H256>,
285
	},
286
	/// Fetch the traces for given block hash.
287
	/// The task will answer only when it has processed this block.
288
	GetTraces {
289
		/// Returns the array of traces or an error.
290
		sender: oneshot::Sender<TxsTraceRes>,
291
		/// Hash of the block.
292
		block: H256,
293
	},
294
	/// Notify the cache that it can stop the batch with that ID. Any block contained only in
295
	/// this batch and still not started will be discarded.
296
	StopBatch { batch_id: CacheBatchId },
297
}
298

            
299
/// Allows to interact with the cache task.
300
#[derive(Clone)]
301
pub struct CacheRequester(TracingUnboundedSender<CacheRequest>);
302

            
303
impl CacheRequester {
304
	/// Request to start caching the provided range of blocks.
305
	/// The task will add to blocks to its pool and immediately return the batch ID.
306
	#[instrument(skip(self))]
307
	pub async fn start_batch(&self, blocks: Vec<H256>) -> Result<CacheBatchId, String> {
308
		let (response_tx, response_rx) = oneshot::channel();
309
		let sender = self.0.clone();
310

            
311
		sender
312
			.unbounded_send(CacheRequest::StartBatch {
313
				sender: response_tx,
314
				blocks,
315
			})
316
			.map_err(|e| {
317
				format!(
318
					"Failed to send request to the trace cache task. Error : {:?}",
319
					e
320
				)
321
			})?;
322

            
323
		response_rx.await.map_err(|e| {
324
			format!(
325
				"Trace cache task closed the response channel. Error : {:?}",
326
				e
327
			)
328
		})
329
	}
330

            
331
	/// Fetch the traces for given block hash.
332
	/// The task will answer only when it has processed this block.
333
	/// The block should be part of a batch first. If no batch has requested the block it will
334
	/// return an error.
335
	#[instrument(skip(self))]
336
	pub async fn get_traces(&self, block: H256) -> TxsTraceRes {
337
		let (response_tx, response_rx) = oneshot::channel();
338
		let sender = self.0.clone();
339

            
340
		sender
341
			.unbounded_send(CacheRequest::GetTraces {
342
				sender: response_tx,
343
				block,
344
			})
345
			.map_err(|e| {
346
				format!(
347
					"Failed to send request to the trace cache task. Error : {:?}",
348
					e
349
				)
350
			})?;
351

            
352
		response_rx
353
			.await
354
			.map_err(|e| {
355
				format!(
356
					"Trace cache task closed the response channel. Error : {:?}",
357
					e
358
				)
359
			})?
360
			.map_err(|e| format!("Failed to replay block. Error : {:?}", e))
361
	}
362

            
363
	/// Notify the cache that it can stop the batch with that ID. Any block contained only in
364
	/// this batch and still in the waiting pool will be discarded.
365
	#[instrument(skip(self))]
366
	pub async fn stop_batch(&self, batch_id: CacheBatchId) {
367
		let sender = self.0.clone();
368

            
369
		// Here we don't care if the request has been accepted or refused, the caller can't
370
		// do anything with it.
371
		let _ = sender
372
			.unbounded_send(CacheRequest::StopBatch { batch_id })
373
			.map_err(|e| {
374
				format!(
375
					"Failed to send request to the trace cache task. Error : {:?}",
376
					e
377
				)
378
			});
379
	}
380
}
381

            
382
/// Data stored for each block in the cache.
383
/// `active_batch_count` represents the number of batches using this
384
/// block. It will increase immediately when a batch is created, but will be
385
/// decrease only after the batch ends and its expiration delay passes.
386
/// It allows to keep the data in the cache for following requests that would use
387
/// this block, which is important to handle pagination efficiently.
388
struct CacheBlock {
389
	active_batch_count: usize,
390
	state: CacheBlockState,
391
}
392

            
393
/// State of a cached block. It can either be polled to be traced or cached.
394
enum CacheBlockState {
395
	/// Block has been added to the pool blocks to be replayed.
396
	/// It may be currently waiting to be replayed or being replayed.
397
	Pooled {
398
		started: bool,
399
		/// Multiple requests might query the same block while it is pooled to be
400
		/// traced. They response channel is stored here, and the result will be
401
		/// sent in all of them when the tracing is finished.
402
		waiting_requests: Vec<oneshot::Sender<TxsTraceRes>>,
403
		/// Channel used to unqueue a tracing that has not yet started.
404
		/// A tracing will be unqueued if it has not yet been started and the last batch
405
		/// needing this block is ended (ignoring the expiration delay).
406
		/// It is not used directly, but dropping will wake up the receiver.
407
		#[allow(dead_code)]
408
		unqueue_sender: oneshot::Sender<()>,
409
	},
410
	/// Tracing has been completed and the result is available. No Runtime API call
411
	/// will be needed until this block cache is removed.
412
	Cached { traces: TxsTraceRes },
413
}
414

            
415
/// Tracing a block is done in a separate tokio blocking task to avoid clogging the async threads.
416
/// For this reason a channel using this type is used by the blocking task to communicate with the
417
/// main cache task.
418
enum BlockingTaskMessage {
419
	/// Notify the tracing for this block has started as the blocking task got a permit from
420
	/// the semaphore. This is used to prevent the deletion of a cache entry for a block that has
421
	/// started being traced.
422
	Started { block_hash: H256 },
423
	/// The tracing is finished and the result is sent to the main task.
424
	Finished {
425
		block_hash: H256,
426
		result: TxsTraceRes,
427
	},
428
}
429

            
430
/// Type wrapper for the cache task, generic over the Client, Block and Backend types.
431
pub struct CacheTask<B, C, BE> {
432
	client: Arc<C>,
433
	backend: Arc<BE>,
434
	blocking_permits: Arc<Semaphore>,
435
	cached_blocks: BTreeMap<H256, CacheBlock>,
436
	batches: BTreeMap<u64, Vec<H256>>,
437
	next_batch_id: u64,
438
	metrics: Option<Metrics>,
439
	_phantom: PhantomData<B>,
440
}
441

            
442
impl<B, C, BE> CacheTask<B, C, BE>
443
where
444
	BE: Backend<B> + 'static,
445
	BE::State: StateBackend<BlakeTwo256>,
446
	C: ProvideRuntimeApi<B>,
447
	C: StorageProvider<B, BE>,
448
	C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
449
	C: Send + Sync + 'static,
450
	B: BlockT<Hash = H256> + Send + Sync + 'static,
451
	B::Header: HeaderT<Number = u32>,
452
	C::Api: BlockBuilder<B>,
453
	C::Api: DebugRuntimeApi<B>,
454
	C::Api: EthereumRuntimeRPCApi<B>,
455
	C::Api: ApiExt<B>,
456
{
457
	/// Create a new cache task.
458
	///
459
	/// Returns a Future that needs to be added to a tokio executor, and a handle allowing to
460
	/// send requests to the task.
461
	pub fn create(
462
		client: Arc<C>,
463
		backend: Arc<BE>,
464
		cache_duration: Duration,
465
		blocking_permits: Arc<Semaphore>,
466
		overrides: Arc<dyn StorageOverride<B>>,
467
		prometheus: Option<PrometheusRegistry>,
468
	) -> (impl Future<Output = ()>, CacheRequester) {
469
		// Communication with the outside world :
470
		let (requester_tx, mut requester_rx) =
471
			sc_utils::mpsc::tracing_unbounded("trace-filter-cache", 100_000);
472

            
473
		// Task running in the service.
474
		let task = async move {
475
			// The following variables are polled by the select! macro, and thus cannot be
476
			// part of Self without introducing borrowing issues.
477
			let mut batch_expirations = FuturesUnordered::new();
478
			let (blocking_tx, mut blocking_rx) =
479
				mpsc::channel(blocking_permits.available_permits() * 2);
480
			let metrics = if let Some(registry) = prometheus {
481
				match Metrics::register(&registry) {
482
					Ok(metrics) => Some(metrics),
483
					Err(err) => {
484
						log::error!(target: "tracing", "Failed to register metrics {err:?}");
485
						None
486
					}
487
				}
488
			} else {
489
				None
490
			};
491
			// Contains the inner state of the cache task, excluding the pooled futures/channels.
492
			// Having this object allows to refactor each event into its own function, simplifying
493
			// the main loop.
494
			let mut inner = Self {
495
				client,
496
				backend,
497
				blocking_permits,
498
				cached_blocks: BTreeMap::new(),
499
				batches: BTreeMap::new(),
500
				next_batch_id: 0,
501
				metrics,
502
				_phantom: Default::default(),
503
			};
504

            
505
			// Main event loop. This loop must not contain any direct .await, as we want to
506
			// react to events as fast as possible.
507
			loop {
508
				select! {
509
					request = requester_rx.next() => {
510
						match request {
511
							None => break,
512
							Some(CacheRequest::StartBatch {sender, blocks})
513
								=> inner.request_start_batch(&blocking_tx, sender, blocks, overrides.clone()),
514
							Some(CacheRequest::GetTraces {sender, block})
515
								=> inner.request_get_traces(sender, block),
516
							Some(CacheRequest::StopBatch {batch_id}) => {
517
								// Cannot be refactored inside `request_stop_batch` because
518
								// it has an unnamable type :C
519
								batch_expirations.push(async move {
520
									sleep(cache_duration).await;
521
									batch_id
522
								});
523

            
524
								inner.request_stop_batch(batch_id);
525
							},
526
						}
527
					},
528
					message = blocking_rx.recv().fuse() => {
529
						match message {
530
							None => (),
531
							Some(BlockingTaskMessage::Started { block_hash })
532
								=> inner.blocking_started(block_hash),
533
							Some(BlockingTaskMessage::Finished { block_hash, result })
534
								=> inner.blocking_finished(block_hash, result),
535
						}
536
					},
537
					batch_id = batch_expirations.next() => {
538
						match batch_id {
539
							None => (),
540
							Some(batch_id) => inner.expired_batch(batch_id),
541
						}
542
					}
543
				}
544
			}
545
		}
546
		.instrument(tracing::debug_span!("trace_filter_cache"));
547

            
548
		(task, CacheRequester(requester_tx))
549
	}
550

            
551
	/// Handle the creation of a batch.
552
	/// Will start the tracing process for blocks that are not already in the cache.
553
	#[instrument(skip(self, blocking_tx, sender, blocks, overrides))]
554
	fn request_start_batch(
555
		&mut self,
556
		blocking_tx: &mpsc::Sender<BlockingTaskMessage>,
557
		sender: oneshot::Sender<CacheBatchId>,
558
		blocks: Vec<H256>,
559
		overrides: Arc<dyn StorageOverride<B>>,
560
	) {
561
		tracing::trace!("Starting batch {}", self.next_batch_id);
562
		self.batches.insert(self.next_batch_id, blocks.clone());
563

            
564
		for block in blocks {
565
			// The block is already in the cache, awesome!
566
			if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
567
				block_cache.active_batch_count += 1;
568
				tracing::trace!(
569
					"Cache hit for block {}, now used by {} batches.",
570
					block,
571
					block_cache.active_batch_count
572
				);
573
			}
574
			// Otherwise we need to queue this block for tracing.
575
			else {
576
				tracing::trace!("Cache miss for block {}, pooling it for tracing.", block);
577

            
578
				let blocking_permits = Arc::clone(&self.blocking_permits);
579
				let (unqueue_sender, unqueue_receiver) = oneshot::channel();
580
				let client = Arc::clone(&self.client);
581
				let backend = Arc::clone(&self.backend);
582
				let blocking_tx = blocking_tx.clone();
583
				let overrides = overrides.clone();
584

            
585
				// Spawn all block caching asynchronously.
586
				// It will wait to obtain a permit, then spawn a blocking task.
587
				// When the blocking task returns its result, it is sent
588
				// thought a channel to the main task loop.
589
				tokio::spawn(
590
					async move {
591
						tracing::trace!("Waiting for blocking permit or task cancellation");
592
						let _permit = select!(
593
							_ = unqueue_receiver.fuse() => {
594
							tracing::trace!("Tracing of the block has been cancelled.");
595
								return;
596
							},
597
							permit = blocking_permits.acquire().fuse() => permit,
598
						);
599

            
600
						// Warn the main task that block tracing as started, and
601
						// this block cache entry should not be removed.
602
						let _ = blocking_tx
603
							.send(BlockingTaskMessage::Started { block_hash: block })
604
							.await;
605

            
606
						tracing::trace!("Start block tracing in a blocking task.");
607

            
608
						// Perform block tracing in a tokio blocking task.
609
						let result = async {
610
							tokio::task::spawn_blocking(move || {
611
								Self::cache_block(client, backend, block, overrides.clone())
612
							})
613
							.await
614
							.map_err(|e| {
615
								format!("Tracing Substrate block {} panicked : {:?}", block, e)
616
							})?
617
						}
618
						.await
619
						.map_err(|e| e.to_string());
620

            
621
						tracing::trace!("Block tracing finished, sending result to main task.");
622

            
623
						// Send a response to the main task.
624
						let _ = blocking_tx
625
							.send(BlockingTaskMessage::Finished {
626
								block_hash: block,
627
								result,
628
							})
629
							.await;
630
					}
631
					.instrument(tracing::trace_span!("Block tracing", block = %block)),
632
				);
633

            
634
				// Insert the block in the cache.
635
				self.cached_blocks.insert(
636
					block,
637
					CacheBlock {
638
						active_batch_count: 1,
639
						state: CacheBlockState::Pooled {
640
							started: false,
641
							waiting_requests: vec![],
642
							unqueue_sender,
643
						},
644
					},
645
				);
646
			}
647
		}
648

            
649
		// Respond with the batch ID.
650
		let _ = sender.send(CacheBatchId(self.next_batch_id));
651

            
652
		// Increase batch ID for the next request.
653
		self.next_batch_id = self.next_batch_id.overflowing_add(1).0;
654
	}
655

            
656
	/// Handle a request to get the traces of the provided block.
657
	/// - If the result is stored in the cache, it sends it immediately.
658
	/// - If the block is currently being pooled, it is added to this block cache waiting list,
659
	///   and all requests concerning this block will be satisfied when the tracing for this block
660
	///   is finished.
661
	/// - If this block is missing from the cache, it means no batch asked for it. All requested
662
	///   blocks should be contained in a batch beforehand, and thus an error is returned.
663
	#[instrument(skip(self))]
664
	fn request_get_traces(&mut self, sender: oneshot::Sender<TxsTraceRes>, block: H256) {
665
		if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
666
			match &mut block_cache.state {
667
				CacheBlockState::Pooled {
668
					ref mut waiting_requests,
669
					..
670
				} => {
671
					tracing::warn!(
672
						"A request asked a pooled block ({}), adding it to the list of \
673
						waiting requests.",
674
						block
675
					);
676
					waiting_requests.push(sender);
677
					if let Some(metrics) = &self.metrics {
678
						metrics.tracing_cache_misses.inc();
679
					}
680
				}
681
				CacheBlockState::Cached { traces, .. } => {
682
					tracing::warn!(
683
						"A request asked a cached block ({}), sending the traces directly.",
684
						block
685
					);
686
					let _ = sender.send(traces.clone());
687
					if let Some(metrics) = &self.metrics {
688
						metrics.tracing_cache_hits.inc();
689
					}
690
				}
691
			}
692
		} else {
693
			tracing::warn!(
694
				"An RPC request asked to get a block ({}) which was not batched.",
695
				block
696
			);
697
			let _ = sender.send(Err(format!(
698
				"RPC request asked a block ({}) that was not batched",
699
				block
700
			)));
701
		}
702
	}
703

            
704
	/// Handle a request to stop a batch.
705
	/// For all blocks that needed to be traced, are only in this batch and not yet started, their
706
	/// tracing is cancelled to save CPU-time and avoid attacks requesting large amount of blocks.
707
	/// This batch data is not yet removed however. Instead a expiration delay timer is started
708
	/// after which the data will indeed be cleared. (the code for that is in the main loop code
709
	/// as it involved an unnamable type :C)
710
	#[instrument(skip(self))]
711
	fn request_stop_batch(&mut self, batch_id: CacheBatchId) {
712
		tracing::trace!("Stopping batch {}", batch_id.0);
713
		if let Some(blocks) = self.batches.get(&batch_id.0) {
714
			for block in blocks {
715
				let mut remove = false;
716

            
717
				// We remove early the block cache if this batch is the last
718
				// pooling this block.
719
				if let Some(block_cache) = self.cached_blocks.get_mut(block) {
720
					if block_cache.active_batch_count == 1
721
						&& matches!(
722
							block_cache.state,
723
							CacheBlockState::Pooled { started: false, .. }
724
						) {
725
						remove = true;
726
					}
727
				}
728

            
729
				if remove {
730
					tracing::trace!("Pooled block {} is no longer requested.", block);
731
					// Remove block from the cache. Drops the value,
732
					// closing all the channels contained in it.
733
					let _ = self.cached_blocks.remove(block);
734
				}
735
			}
736
		}
737
	}
738

            
739
	/// A tracing blocking task notifies it got a permit and is starting the tracing.
740
	/// This started status is stored to avoid removing this block entry.
741
	#[instrument(skip(self))]
742
	fn blocking_started(&mut self, block_hash: H256) {
743
		if let Some(block_cache) = self.cached_blocks.get_mut(&block_hash) {
744
			if let CacheBlockState::Pooled {
745
				ref mut started, ..
746
			} = block_cache.state
747
			{
748
				*started = true;
749
			}
750
		}
751
	}
752

            
753
	/// A tracing blocking task notifies it has finished the tracing and provide the result.
754
	#[instrument(skip(self, result))]
755
	fn blocking_finished(&mut self, block_hash: H256, result: TxsTraceRes) {
756
		// In some cases it might be possible to receive traces of a block
757
		// that has no entry in the cache because it was removed of the pool
758
		// and received a permit concurrently. We just ignore it.
759
		//
760
		// TODO : Should we add it back ? Should it have an active_batch_count
761
		// of 1 then ?
762
		if let Some(block_cache) = self.cached_blocks.get_mut(&block_hash) {
763
			if let CacheBlockState::Pooled {
764
				ref mut waiting_requests,
765
				..
766
			} = block_cache.state
767
			{
768
				tracing::trace!(
769
					"A new block ({}) has been traced, adding it to the cache and responding to \
770
					{} waiting requests.",
771
					block_hash,
772
					waiting_requests.len()
773
				);
774
				// Send result in waiting channels
775
				while let Some(channel) = waiting_requests.pop() {
776
					let _ = channel.send(result.clone());
777
				}
778

            
779
				// Update cache entry
780
				block_cache.state = CacheBlockState::Cached { traces: result };
781
			}
782
		}
783
	}
784

            
785
	/// A batch expiration delay timer has completed. It performs the cache cleaning for blocks
786
	/// not longer used by other batches.
787
	#[instrument(skip(self))]
788
	fn expired_batch(&mut self, batch_id: CacheBatchId) {
789
		if let Some(batch) = self.batches.remove(&batch_id.0) {
790
			for block in batch {
791
				// For each block of the batch, we remove it if it was the
792
				// last batch containing it.
793
				let mut remove = false;
794
				if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
795
					block_cache.active_batch_count -= 1;
796

            
797
					if block_cache.active_batch_count == 0 {
798
						remove = true;
799
					}
800
				}
801

            
802
				if remove {
803
					let _ = self.cached_blocks.remove(&block);
804
				}
805
			}
806
		}
807
	}
808

            
809
	/// (In blocking task) Use the Runtime API to trace the block.
810
	#[instrument(skip(client, backend, overrides))]
811
	fn cache_block(
812
		client: Arc<C>,
813
		backend: Arc<BE>,
814
		substrate_hash: H256,
815
		overrides: Arc<dyn StorageOverride<B>>,
816
	) -> TxsTraceRes {
817
		// Get Substrate block data.
818
		let api = client.runtime_api();
819
		let block_header = client
820
			.header(substrate_hash)
821
			.map_err(|e| {
822
				format!(
823
					"Error when fetching substrate block {} header : {:?}",
824
					substrate_hash, e
825
				)
826
			})?
827
			.ok_or_else(|| format!("Substrate block {} don't exist", substrate_hash))?;
828

            
829
		let height = *block_header.number();
830
		let substrate_parent_hash = *block_header.parent_hash();
831

            
832
		// Get Ethereum block data.
833
		let (eth_block, eth_transactions) = match (
834
			overrides.current_block(substrate_hash),
835
			overrides.current_transaction_statuses(substrate_hash),
836
		) {
837
			(Some(a), Some(b)) => (a, b),
838
			_ => {
839
				return Err(format!(
840
					"Failed to get Ethereum block data for Substrate block {}",
841
					substrate_hash
842
				))
843
			}
844
		};
845

            
846
		let eth_block_hash = eth_block.header.hash();
847
		let eth_tx_hashes = eth_transactions
848
			.iter()
849
			.map(|t| t.transaction_hash)
850
			.collect();
851

            
852
		// Get extrinsics (containing Ethereum ones)
853
		let extrinsics = backend
854
			.blockchain()
855
			.body(substrate_hash)
856
			.map_err(|e| {
857
				format!(
858
					"Blockchain error when fetching extrinsics of block {} : {:?}",
859
					height, e
860
				)
861
			})?
862
			.ok_or_else(|| format!("Could not find block {} when fetching extrinsics.", height))?;
863

            
864
		// Get DebugRuntimeApi version
865
		let trace_api_version = if let Ok(Some(api_version)) =
866
			api.api_version::<dyn DebugRuntimeApi<B>>(substrate_parent_hash)
867
		{
868
			api_version
869
		} else {
870
			return Err("Runtime api version call failed (trace)".to_string());
871
		};
872

            
873
		// Trace the block.
874
		let f = || -> Result<_, String> {
875
			let result = if trace_api_version >= 5 {
876
				api.trace_block(
877
					substrate_parent_hash,
878
					extrinsics,
879
					eth_tx_hashes,
880
					&block_header,
881
				)
882
			} else {
883
				// Get core runtime api version
884
				let core_api_version = if let Ok(Some(api_version)) =
885
					api.api_version::<dyn Core<B>>(substrate_parent_hash)
886
				{
887
					api_version
888
				} else {
889
					return Err("Runtime api version call failed (core)".to_string());
890
				};
891

            
892
				// Initialize block: calls the "on_initialize" hook on every pallet
893
				// in AllPalletsWithSystem
894
				// This was fine before pallet-message-queue because the XCM messages
895
				// were processed by the "setValidationData" inherent call and not on an
896
				// "on_initialize" hook, which runs before enabling XCM tracing
897
				if core_api_version >= 5 {
898
					api.initialize_block(substrate_parent_hash, &block_header)
899
						.map_err(|e| format!("Runtime api access error: {:?}", e))?;
900
				} else {
901
					#[allow(deprecated)]
902
					api.initialize_block_before_version_5(substrate_parent_hash, &block_header)
903
						.map_err(|e| format!("Runtime api access error: {:?}", e))?;
904
				}
905

            
906
				#[allow(deprecated)]
907
				api.trace_block_before_version_5(substrate_parent_hash, extrinsics, eth_tx_hashes)
908
			};
909

            
910
			result
911
				.map_err(|e| format!("Blockchain error when replaying block {} : {:?}", height, e))?
912
				.map_err(|e| {
913
					tracing::warn!(
914
						target: "tracing",
915
						"Internal runtime error when replaying block {} : {:?}",
916
						height,
917
						e
918
					);
919
					format!(
920
						"Internal runtime error when replaying block {} : {:?}",
921
						height, e
922
					)
923
				})?;
924

            
925
			Ok(moonbeam_rpc_primitives_debug::Response::Block)
926
		};
927

            
928
		let eth_transactions_by_index: BTreeMap<u32, H256> = eth_transactions
929
			.iter()
930
			.map(|t| (t.transaction_index, t.transaction_hash))
931
			.collect();
932

            
933
		let mut proxy = moonbeam_client_evm_tracing::listeners::CallList::default();
934
		proxy.using(f)?;
935

            
936
		let traces: Vec<TransactionTrace> =
937
			moonbeam_client_evm_tracing::formatters::TraceFilter::format(proxy)
938
				.ok_or("Fail to format proxy")?
939
				.into_iter()
940
				.filter_map(|mut trace| {
941
					match eth_transactions_by_index.get(&trace.transaction_position) {
942
						Some(transaction_hash) => {
943
							trace.block_hash = eth_block_hash;
944
							trace.block_number = height;
945
							trace.transaction_hash = *transaction_hash;
946

            
947
							// Reformat error messages.
948
							if let block::TransactionTraceOutput::Error(ref mut error) =
949
								trace.output
950
							{
951
								if error.as_slice() == b"execution reverted" {
952
									*error = b"Reverted".to_vec();
953
								}
954
							}
955

            
956
							Some(trace)
957
						}
958
						None => {
959
							log::warn!(
960
								target: "tracing",
961
								"A trace in block {} does not map to any known ethereum transaction. Trace: {:?}",
962
								height,
963
								trace,
964
							);
965
							None
966
						}
967
					}
968
				})
969
				.collect();
970

            
971
		Ok(traces)
972
	}
973
}
974

            
975
/// Prometheus metrics for tracing.
976
#[derive(Clone)]
977
pub(crate) struct Metrics {
978
	tracing_cache_hits: Counter<U64>,
979
	tracing_cache_misses: Counter<U64>,
980
}
981

            
982
impl Metrics {
983
	pub(crate) fn register(registry: &PrometheusRegistry) -> Result<Self, PrometheusError> {
984
		Ok(Self {
985
			tracing_cache_hits: register(
986
				Counter::new("tracing_cache_hits", "Number of tracing cache hits.")?,
987
				registry,
988
			)?,
989
			tracing_cache_misses: register(
990
				Counter::new("tracing_cache_misses", "Number of tracing cache misses.")?,
991
				registry,
992
			)?,
993
		})
994
	}
995
}