// Copyright 2024 Moonbeam Foundation
// This file is part of Moonbeam.

// Moonbeam is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Moonbeam is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.

use sp_blockchain::{CachedHeaderMetadata, HeaderMetadata};
use sp_core::storage::well_known_keys;
use sp_runtime::{
	generic::BlockId,
	traits::{Block as BlockT, HashingFor, Header as HeaderT, NumberFor, Zero},
	Justification, Justifications, StateVersion, Storage,
};
use sp_state_machine::{
	BackendTransaction, ChildStorageCollection, IndexOperation, StorageCollection, TrieBackend,
};
use std::future::Future;
use std::marker::PhantomData;
use std::ops::AddAssign;
use std::time::Duration;
use std::{
	collections::{HashMap, HashSet},
	ptr,
	sync::Arc,
};

use sc_client_api::{
	backend::{self, NewBlockState},
	blockchain::{self, BlockStatus, HeaderBackend},
	leaves::LeafSet,
	UsageInfo,
};

use jsonrpsee::http_client::HttpClient;
use sp_runtime::generic::SignedBlock;

use crate::chain_spec;
use crate::lazy_loading::lock::ReadWriteLock;
use crate::lazy_loading::state_overrides::StateEntry;
use crate::lazy_loading::{helpers, state_overrides};
use moonbeam_cli_opt::LazyLoadingConfig;
use moonbeam_core_primitives::BlockNumber;
use sc_client_api::StorageKey;
use sc_service::{Configuration, Error};
use serde::de::DeserializeOwned;
use sp_core::offchain::storage::InMemOffchainStorage;
use sp_core::{twox_128, H256};
use sp_rpc::list::ListOrValue;
use sp_rpc::number::NumberOrHex;
use sp_storage::{ChildInfo, StorageData};
use sp_trie::PrefixedMemoryDB;
use tokio_retry::strategy::FixedInterval;
use tokio_retry::Retry;

struct PendingBlock<B: BlockT> {
	block: StoredBlock<B>,
	state: NewBlockState,
}

#[derive(PartialEq, Eq, Clone)]
enum StoredBlock<B: BlockT> {
	Header(B::Header, Option<Justifications>),
	Full(B, Option<Justifications>),
}

impl<B: BlockT> StoredBlock<B> {
	fn new(
		header: B::Header,
		body: Option<Vec<B::Extrinsic>>,
		just: Option<Justifications>,
	) -> Self {
		match body {
			Some(body) => StoredBlock::Full(B::new(header, body), just),
			None => StoredBlock::Header(header, just),
		}
	}

	fn header(&self) -> &B::Header {
		match *self {
			StoredBlock::Header(ref h, _) => h,
			StoredBlock::Full(ref b, _) => b.header(),
		}
	}

	fn justifications(&self) -> Option<&Justifications> {
		match *self {
			StoredBlock::Header(_, ref j) | StoredBlock::Full(_, ref j) => j.as_ref(),
		}
	}

	fn extrinsics(&self) -> Option<&[B::Extrinsic]> {
		match *self {
			StoredBlock::Header(_, _) => None,
			StoredBlock::Full(ref b, _) => Some(b.extrinsics()),
		}
	}

	fn into_inner(self) -> (B::Header, Option<Vec<B::Extrinsic>>, Option<Justifications>) {
		match self {
			StoredBlock::Header(header, just) => (header, None, just),
			StoredBlock::Full(block, just) => {
				let (header, body) = block.deconstruct();
				(header, Some(body), just)
			}
		}
	}
}
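
// Illustrative sketch (not used by the backend itself): `StoredBlock::new`
// falls back to a header-only entry when no body is available, and
// `into_inner` recovers the original parts. The names below are hypothetical.
//
//   let stored = StoredBlock::new(header, Some(extrinsics), None);
//   let (header, body, _justifications) = stored.into_inner();
//   assert!(body.is_some());
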
#[derive(Clone)]
struct BlockchainStorage<Block: BlockT> {
	blocks: HashMap<Block::Hash, StoredBlock<Block>>,
	hashes: HashMap<NumberFor<Block>, Block::Hash>,
	best_hash: Block::Hash,
	best_number: NumberFor<Block>,
	finalized_hash: Block::Hash,
	finalized_number: NumberFor<Block>,
	genesis_hash: Block::Hash,
	header_cht_roots: HashMap<NumberFor<Block>, Block::Hash>,
	leaves: LeafSet<Block::Hash, NumberFor<Block>>,
	aux: HashMap<Vec<u8>, Vec<u8>>,
}

/// In-memory blockchain. Supports concurrent reads.
#[derive(Clone)]
pub struct Blockchain<Block: BlockT> {
	rpc_client: Arc<RPC>,
	storage: Arc<ReadWriteLock<BlockchainStorage<Block>>>,
}

impl<Block: BlockT + DeserializeOwned> Blockchain<Block> {
	/// Get the header hash of a given block.
	pub fn id(&self, id: BlockId<Block>) -> Option<Block::Hash> {
		match id {
			BlockId::Hash(h) => Some(h),
			BlockId::Number(n) => self.storage.read().hashes.get(&n).cloned(),
		}
	}

	/// Create new in-memory blockchain storage.
	fn new(rpc_client: Arc<RPC>) -> Blockchain<Block> {
		let storage = Arc::new(ReadWriteLock::new(BlockchainStorage {
			blocks: HashMap::new(),
			hashes: HashMap::new(),
			best_hash: Default::default(),
			best_number: Zero::zero(),
			finalized_hash: Default::default(),
			finalized_number: Zero::zero(),
			genesis_hash: Default::default(),
			header_cht_roots: HashMap::new(),
			leaves: LeafSet::new(),
			aux: HashMap::new(),
		}));
		Blockchain {
			rpc_client,
			storage,
		}
	}

	/// Insert a block header and associated data.
	pub fn insert(
		&self,
		hash: Block::Hash,
		header: <Block as BlockT>::Header,
		justifications: Option<Justifications>,
		body: Option<Vec<<Block as BlockT>::Extrinsic>>,
		new_state: NewBlockState,
	) -> sp_blockchain::Result<()> {
		let number = *header.number();
		if new_state.is_best() {
			self.apply_head(&header)?;
		}

		{
			let mut storage = self.storage.write();
			storage.leaves.import(hash, number, *header.parent_hash());
			storage
				.blocks
				.insert(hash, StoredBlock::new(header, body, justifications));

			if let NewBlockState::Final = new_state {
				storage.finalized_hash = hash;
				storage.finalized_number = number;
			}

			if number == Zero::zero() {
				storage.genesis_hash = hash;
			}
		}

		Ok(())
	}

	/// Get the total number of blocks.
	pub fn blocks_count(&self) -> usize {
		let count = self.storage.read().blocks.len();

		log::debug!(
			target: super::LAZY_LOADING_LOG_TARGET,
			"Total number of blocks: {:?}",
			count
		);

		count
	}

	/// Compare this blockchain with another in-memory blockchain.
	pub fn equals_to(&self, other: &Self) -> bool {
		// Check ptr equality first to avoid double read locks.
		if ptr::eq(self, other) {
			return true;
		}
		self.canon_equals_to(other) && self.storage.read().blocks == other.storage.read().blocks
	}

	/// Compare this canonical chain to another canonical chain.
	pub fn canon_equals_to(&self, other: &Self) -> bool {
		// Check ptr equality first to avoid double read locks.
		if ptr::eq(self, other) {
			return true;
		}
		let this = self.storage.read();
		let other = other.storage.read();
		this.hashes == other.hashes
			&& this.best_hash == other.best_hash
			&& this.best_number == other.best_number
			&& this.genesis_hash == other.genesis_hash
	}

	/// Insert header CHT root.
	pub fn insert_cht_root(&self, block: NumberFor<Block>, cht_root: Block::Hash) {
		self.storage
			.write()
			.header_cht_roots
			.insert(block, cht_root);
	}

	/// Set an existing block as head.
	pub fn set_head(&self, hash: Block::Hash) -> sp_blockchain::Result<()> {
		let header = self
			.header(hash)?
			.ok_or_else(|| sp_blockchain::Error::UnknownBlock(format!("{}", hash)))?;

		self.apply_head(&header)
	}

	fn apply_head(&self, header: &<Block as BlockT>::Header) -> sp_blockchain::Result<()> {
		let mut storage = self.storage.write();

		let hash = header.hash();
		let number = header.number();

		storage.best_hash = hash;
		storage.best_number = *number;
		storage.hashes.insert(*number, hash);

		Ok(())
	}

	fn finalize_header(
		&self,
		block: Block::Hash,
		justification: Option<Justification>,
	) -> sp_blockchain::Result<()> {
		let mut storage = self.storage.write();
		storage.finalized_hash = block;

		if justification.is_some() {
			let block = storage
				.blocks
				.get_mut(&block)
				.expect("hash was fetched from a block in the db; qed");

			let block_justifications = match block {
				StoredBlock::Header(_, ref mut j) | StoredBlock::Full(_, ref mut j) => j,
			};

			*block_justifications = justification.map(Justifications::from);
		}

		Ok(())
	}

	fn append_justification(
		&self,
		hash: Block::Hash,
		justification: Justification,
	) -> sp_blockchain::Result<()> {
		let mut storage = self.storage.write();

		let block = storage
			.blocks
			.get_mut(&hash)
			.expect("hash was fetched from a block in the db; qed");

		let block_justifications = match block {
			StoredBlock::Header(_, ref mut j) | StoredBlock::Full(_, ref mut j) => j,
		};

		if let Some(stored_justifications) = block_justifications {
			if !stored_justifications.append(justification) {
				return Err(sp_blockchain::Error::BadJustification(
					"Duplicate consensus engine ID".into(),
				));
			}
		} else {
			*block_justifications = Some(Justifications::from(justification));
		};

		Ok(())
	}

	fn write_aux(&self, ops: Vec<(Vec<u8>, Option<Vec<u8>>)>) {
		let mut storage = self.storage.write();
		for (k, v) in ops {
			match v {
				Some(v) => storage.aux.insert(k, v),
				None => storage.aux.remove(&k),
			};
		}
	}
}

impl<Block: BlockT + DeserializeOwned> HeaderBackend<Block> for Blockchain<Block> {
	fn header(
		&self,
		hash: Block::Hash,
	) -> sp_blockchain::Result<Option<<Block as BlockT>::Header>> {
		// First, try to get the header from local storage.
		if let Some(header) = self
			.storage
			.read()
			.blocks
			.get(&hash)
			.map(|b| b.header().clone())
		{
			return Ok(Some(header));
		}

		// If not found in local storage, fetch it from the RPC client.
		let header = self
			.rpc_client
			.block::<Block, _>(Some(hash))
			.ok()
			.flatten()
			.map(|full_block| {
				// Cache the block header.
				let block = full_block.block.clone();
				self.storage.write().blocks.insert(
					hash,
					StoredBlock::Full(block.clone(), full_block.justifications),
				);

				block.header().clone()
			});

		if header.is_none() {
			log::warn!(
				target: super::LAZY_LOADING_LOG_TARGET,
				"Expected block {:x?} to exist.",
				&hash
			);
		}

		Ok(header)
	}

	fn info(&self) -> blockchain::Info<Block> {
		let storage = self.storage.read();
		blockchain::Info {
			best_hash: storage.best_hash,
			best_number: storage.best_number,
			genesis_hash: storage.genesis_hash,
			finalized_hash: storage.finalized_hash,
			finalized_number: storage.finalized_number,
			finalized_state: Some((storage.finalized_hash, storage.finalized_number)),
			number_leaves: storage.leaves.count(),
			block_gap: None,
		}
	}

	fn status(&self, hash: Block::Hash) -> sp_blockchain::Result<BlockStatus> {
		match self.storage.read().blocks.contains_key(&hash) {
			true => Ok(BlockStatus::InChain),
			false => Ok(BlockStatus::Unknown),
		}
	}

	fn number(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<NumberFor<Block>>> {
		let number = match self.storage.read().blocks.get(&hash) {
			Some(block) => *block.header().number(),
			_ => match self.rpc_client.block::<Block, _>(Some(hash)) {
				Ok(Some(block)) => *block.block.header().number(),
				err => {
					return Err(sp_blockchain::Error::UnknownBlock(format!(
						"Failed to fetch block number from RPC: {:?}",
						err
					)));
				}
			},
		};

		Ok(Some(number))
	}

	fn hash(
		&self,
		number: <<Block as BlockT>::Header as HeaderT>::Number,
	) -> sp_blockchain::Result<Option<Block::Hash>> {
		Ok(self.id(BlockId::Number(number)))
	}
}
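
// Usage sketch (hypothetical hash, assuming a connected `rpc_client`):
// `header` first consults the in-memory cache and only then falls back to
// the RPC client, caching the full block locally on success.
//
//   let chain = Blockchain::<Block>::new(rpc_client);
//   let maybe_header = chain.header(hash)?; // may trigger a remote fetch
//   let info = chain.info(); // served entirely from local storage
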
impl<Block: BlockT + DeserializeOwned> HeaderMetadata<Block> for Blockchain<Block> {
	type Error = sp_blockchain::Error;

	fn header_metadata(
		&self,
		hash: Block::Hash,
	) -> Result<CachedHeaderMetadata<Block>, Self::Error> {
		self.header(hash)?
			.map(|header| CachedHeaderMetadata::from(&header))
			.ok_or_else(|| {
				sp_blockchain::Error::UnknownBlock(format!("header not found: {}", hash))
			})
	}

	fn insert_header_metadata(&self, _hash: Block::Hash, _metadata: CachedHeaderMetadata<Block>) {
		// No need to implement.
		unimplemented!("insert_header_metadata")
	}

	fn remove_header_metadata(&self, _hash: Block::Hash) {
		// No need to implement.
		unimplemented!("remove_header_metadata")
	}
}

impl<Block: BlockT + DeserializeOwned> blockchain::Backend<Block> for Blockchain<Block> {
	fn body(
		&self,
		hash: Block::Hash,
	) -> sp_blockchain::Result<Option<Vec<<Block as BlockT>::Extrinsic>>> {
		// First, try to get the block body from local storage.
		if let Some(extrinsics) = self
			.storage
			.read()
			.blocks
			.get(&hash)
			.and_then(|b| b.extrinsics().map(|x| x.to_vec()))
		{
			return Ok(Some(extrinsics));
		}

		// If not found in local storage, fetch it from the RPC client.
		let extrinsics = self
			.rpc_client
			.block::<Block, Block::Hash>(Some(hash))
			.ok()
			.flatten()
			.map(|b| b.block.extrinsics().to_vec());

		Ok(extrinsics)
	}

	fn justifications(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Justifications>> {
		Ok(self
			.storage
			.read()
			.blocks
			.get(&hash)
			.and_then(|b| b.justifications().cloned()))
	}

	fn last_finalized(&self) -> sp_blockchain::Result<Block::Hash> {
		let last_finalized = self.storage.read().finalized_hash;

		Ok(last_finalized)
	}

	fn leaves(&self) -> sp_blockchain::Result<Vec<Block::Hash>> {
		Ok(self.storage.read().leaves.hashes())
	}

	fn children(&self, _parent_hash: Block::Hash) -> sp_blockchain::Result<Vec<Block::Hash>> {
		unimplemented!("Not supported by the `lazy-loading` backend.")
	}

	fn indexed_transaction(&self, _hash: Block::Hash) -> sp_blockchain::Result<Option<Vec<u8>>> {
		unimplemented!("Not supported by the `lazy-loading` backend.")
	}

	fn block_indexed_body(
		&self,
		_hash: Block::Hash,
	) -> sp_blockchain::Result<Option<Vec<Vec<u8>>>> {
		unimplemented!("Not supported by the `lazy-loading` backend.")
	}
}

impl<Block: BlockT + DeserializeOwned> backend::AuxStore for Blockchain<Block> {
	fn insert_aux<
		'a,
		'b: 'a,
		'c: 'a,
		I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>,
		D: IntoIterator<Item = &'a &'b [u8]>,
	>(
		&self,
		insert: I,
		delete: D,
	) -> sp_blockchain::Result<()> {
		let mut storage = self.storage.write();
		for (k, v) in insert {
			storage.aux.insert(k.to_vec(), v.to_vec());
		}
		for k in delete {
			storage.aux.remove(*k);
		}
		Ok(())
	}

	fn get_aux(&self, key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> {
		Ok(self.storage.read().aux.get(key).cloned())
	}
}

pub struct BlockImportOperation<Block: BlockT> {
	pending_block: Option<PendingBlock<Block>>,
	old_state: ForkedLazyBackend<Block>,
	new_state: Option<BackendTransaction<HashingFor<Block>>>,
	aux: Vec<(Vec<u8>, Option<Vec<u8>>)>,
	storage_updates: StorageCollection,
	finalized_blocks: Vec<(Block::Hash, Option<Justification>)>,
	set_head: Option<Block::Hash>,
	pub(crate) before_fork: bool,
}

impl<Block: BlockT + DeserializeOwned> BlockImportOperation<Block> {
	fn apply_storage(
		&mut self,
		storage: Storage,
		commit: bool,
		state_version: StateVersion,
	) -> sp_blockchain::Result<Block::Hash> {
		use sp_state_machine::Backend;
		check_genesis_storage(&storage)?;

		let child_delta = storage.children_default.values().map(|child_content| {
			(
				&child_content.child_info,
				child_content
					.data
					.iter()
					.map(|(k, v)| (k.as_ref(), Some(v.as_ref()))),
			)
		});

		let (root, transaction) = self.old_state.full_storage_root(
			storage
				.top
				.iter()
				.map(|(k, v)| (k.as_ref(), Some(v.as_ref()))),
			child_delta,
			state_version,
		);

		if commit {
			self.new_state = Some(transaction);
			self.storage_updates = storage
				.top
				.iter()
				.map(|(k, v)| {
					if v.is_empty() {
						(k.clone(), None)
					} else {
						(k.clone(), Some(v.clone()))
					}
				})
				.collect();
		}
		Ok(root)
	}
}

impl<Block: BlockT + DeserializeOwned> backend::BlockImportOperation<Block>
	for BlockImportOperation<Block>
{
	type State = ForkedLazyBackend<Block>;

	fn state(&self) -> sp_blockchain::Result<Option<&Self::State>> {
		Ok(Some(&self.old_state))
	}

	fn set_block_data(
		&mut self,
		header: <Block as BlockT>::Header,
		body: Option<Vec<<Block as BlockT>::Extrinsic>>,
		_indexed_body: Option<Vec<Vec<u8>>>,
		justifications: Option<Justifications>,
		state: NewBlockState,
	) -> sp_blockchain::Result<()> {
		assert!(
			self.pending_block.is_none(),
			"Only one block per operation is allowed"
		);
		self.pending_block = Some(PendingBlock {
			block: StoredBlock::new(header, body, justifications),
			state,
		});
		Ok(())
	}

	fn update_db_storage(
		&mut self,
		update: BackendTransaction<HashingFor<Block>>,
	) -> sp_blockchain::Result<()> {
		self.new_state = Some(update);
		Ok(())
	}

	fn set_genesis_state(
		&mut self,
		storage: Storage,
		commit: bool,
		state_version: StateVersion,
	) -> sp_blockchain::Result<Block::Hash> {
		self.apply_storage(storage, commit, state_version)
	}

	fn reset_storage(
		&mut self,
		storage: Storage,
		state_version: StateVersion,
	) -> sp_blockchain::Result<Block::Hash> {
		self.apply_storage(storage, true, state_version)
	}

	fn insert_aux<I>(&mut self, ops: I) -> sp_blockchain::Result<()>
	where
		I: IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
	{
		self.aux.append(&mut ops.into_iter().collect());
		Ok(())
	}

	fn update_storage(
		&mut self,
		update: StorageCollection,
		_child_update: ChildStorageCollection,
	) -> sp_blockchain::Result<()> {
		self.storage_updates = update.clone();
		Ok(())
	}

	fn mark_finalized(
		&mut self,
		hash: Block::Hash,
		justification: Option<Justification>,
	) -> sp_blockchain::Result<()> {
		self.finalized_blocks.push((hash, justification));
		Ok(())
	}

	fn mark_head(&mut self, hash: Block::Hash) -> sp_blockchain::Result<()> {
		assert!(
			self.pending_block.is_none(),
			"Only one set block per operation is allowed"
		);
		self.set_head = Some(hash);
		Ok(())
	}

	fn update_transaction_index(
		&mut self,
		_index: Vec<IndexOperation>,
	) -> sp_blockchain::Result<()> {
		Ok(())
	}
}

/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
pub type DbState<B> = TrieBackend<Arc<dyn sp_state_machine::Storage<HashingFor<B>>>, HashingFor<B>>;

/// A struct containing arguments for iterating over the storage.
#[derive(Default)]
pub struct RawIterArgs {
	/// The prefix of the keys over which to iterate.
	pub prefix: Option<Vec<u8>>,

	/// The key from which to start the iteration.
	///
	/// This is inclusive and the iteration will include the key which is specified here.
	pub start_at: Option<Vec<u8>>,

	/// If this is `true` then the iteration will *not* include
	/// the key specified in `start_at`, if there is such a key.
	pub start_at_exclusive: bool,
}
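
// Example arguments (a sketch; `pallet_prefix` and `last_key` are
// hypothetical): iterate every key under a prefix, resuming right after a
// previously returned key.
//
//   let args = RawIterArgs {
//       prefix: Some(pallet_prefix),
//       start_at: Some(last_key),
//       start_at_exclusive: true,
//   };
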
/// A raw iterator over the storage of a `ForkedLazyBackend`.
pub struct RawIter<Block: BlockT> {
	pub(crate) args: RawIterArgs,
	complete: bool,
	_phantom: PhantomData<Block>,
}

impl<Block: BlockT + DeserializeOwned> sp_state_machine::StorageIterator<HashingFor<Block>>
	for RawIter<Block>
{
	type Backend = ForkedLazyBackend<Block>;
	type Error = String;

	fn next_key(
		&mut self,
		backend: &Self::Backend,
	) -> Option<Result<sp_state_machine::StorageKey, Self::Error>> {
		use sp_state_machine::Backend;

		let remote_fetch =
			|key: Option<StorageKey>, start_key: Option<StorageKey>, block: Option<Block::Hash>| {
				let result = backend
					.rpc_client
					.storage_keys_paged(key, 5, start_key, block);

				match result {
					Ok(keys) => keys.first().cloned(),
					Err(err) => {
						log::trace!(
							target: super::LAZY_LOADING_LOG_TARGET,
							"Failed to fetch `next key` from RPC: {:?}",
							err
						);

						None
					}
				}
			};

		let prefix = self.args.prefix.clone().map(StorageKey);
		let start_key = self.args.start_at.clone().map(StorageKey);

		let maybe_next_key = if backend.before_fork {
			remote_fetch(prefix, start_key, backend.block_hash)
		} else {
			let mut iter_args = sp_state_machine::backend::IterArgs::default();
			iter_args.prefix = self.args.prefix.as_ref().map(|b| b.as_slice());
			iter_args.start_at = self.args.start_at.as_ref().map(|b| b.as_slice());
			iter_args.start_at_exclusive = true;
			iter_args.stop_on_incomplete_database = true;

			let readable_db = backend.db.read();
			let next_storage_key = readable_db
				.raw_iter(iter_args)
				.map(|mut iter| iter.next_key(&readable_db))
				.map(|op| op.map(|result| result.ok()).flatten())
				.ok()
				.flatten();

			// IMPORTANT: free the storage read lock.
			drop(readable_db);

			let removed_key = start_key
				.clone()
				.or(prefix.clone())
				.map(|key| backend.removed_keys.read().contains_key(&key.0))
				.unwrap_or(false);
			if next_storage_key.is_none() && !removed_key {
				let maybe_next_key = remote_fetch(prefix, start_key, Some(backend.fork_block));
				match maybe_next_key {
					Some(key) if !backend.removed_keys.read().contains_key(&key) => Some(key),
					_ => None,
				}
			} else {
				next_storage_key
			}
		};

		log::trace!(
			target: super::LAZY_LOADING_LOG_TARGET,
			"next_key: (prefix: {:?}, start_at: {:?}, next_key: {:?})",
			self.args.prefix.clone().map(|key| hex::encode(key)),
			self.args.start_at.clone().map(|key| hex::encode(key)),
			maybe_next_key.clone().map(|key| hex::encode(key))
		);

		if let Some(next_key) = maybe_next_key {
			if self
				.args
				.prefix
				.clone()
				.map(|filter_key| next_key.starts_with(&filter_key))
				.unwrap_or(false)
			{
				self.args.start_at = Some(next_key.clone());
				Some(Ok(next_key))
			} else {
				self.complete = true;
				None
			}
		} else {
			self.complete = true;
			None
		}
	}

	fn next_pair(
		&mut self,
		backend: &Self::Backend,
	) -> Option<Result<(sp_state_machine::StorageKey, sp_state_machine::StorageValue), Self::Error>>
	{
		use sp_state_machine::Backend;

		let remote_fetch =
			|key: Option<StorageKey>, start_key: Option<StorageKey>, block: Option<Block::Hash>| {
				let result = backend
					.rpc_client
					.storage_keys_paged(key, 5, start_key, block);

				match result {
					Ok(keys) => keys.first().cloned(),
					Err(err) => {
						log::trace!(
							target: super::LAZY_LOADING_LOG_TARGET,
							"Failed to fetch `next key` from RPC: {:?}",
							err
						);

						None
					}
				}
			};

		let prefix = self.args.prefix.clone().map(StorageKey);
		let start_key = self.args.start_at.clone().map(StorageKey);

		let maybe_next_key = if backend.before_fork {
			remote_fetch(prefix, start_key, backend.block_hash)
		} else {
			let mut iter_args = sp_state_machine::backend::IterArgs::default();
			iter_args.prefix = self.args.prefix.as_ref().map(|b| b.as_slice());
			iter_args.start_at = self.args.start_at.as_ref().map(|b| b.as_slice());
			iter_args.start_at_exclusive = true;
			iter_args.stop_on_incomplete_database = true;

			let readable_db = backend.db.read();
			let next_storage_key = readable_db
				.raw_iter(iter_args)
				.map(|mut iter| iter.next_key(&readable_db))
				.map(|op| op.map(|result| result.ok()).flatten())
				.ok()
				.flatten();

			// IMPORTANT: free the storage read lock.
			drop(readable_db);

			let removed_key = start_key
				.clone()
				.or(prefix.clone())
				.map(|key| backend.removed_keys.read().contains_key(&key.0))
				.unwrap_or(false);
			if next_storage_key.is_none() && !removed_key {
				let maybe_next_key = remote_fetch(prefix, start_key, Some(backend.fork_block));
				match maybe_next_key {
					Some(key) if !backend.removed_keys.read().contains_key(&key) => Some(key),
					_ => None,
				}
			} else {
				next_storage_key
			}
		};

		log::trace!(
			target: super::LAZY_LOADING_LOG_TARGET,
			"next_pair: (prefix: {:?}, start_at: {:?}, next_key: {:?})",
			self.args.prefix.clone().map(|key| hex::encode(key)),
			self.args.start_at.clone().map(|key| hex::encode(key)),
			maybe_next_key.clone().map(|key| hex::encode(key))
		);

		let maybe_value = maybe_next_key
			.clone()
			.map(|key| (*backend).storage(key.as_slice()).ok())
			.flatten()
			.flatten();

		if let Some(next_key) = maybe_next_key {
			if self
				.args
				.prefix
				.clone()
				.map(|filter_key| next_key.starts_with(&filter_key))
				.unwrap_or(false)
			{
				self.args.start_at = Some(next_key.clone());

				match maybe_value {
					Some(value) => Some(Ok((next_key, value))),
					_ => None,
				}
			} else {
				self.complete = true;
				None
			}
		} else {
			self.complete = true;
			None
		}
	}

	fn was_complete(&self) -> bool {
		self.complete
	}
}

#[derive(Debug, Clone)]
pub struct ForkedLazyBackend<Block: BlockT> {
	rpc_client: Arc<RPC>,
	block_hash: Option<Block::Hash>,
	fork_block: Block::Hash,
	pub(crate) db: Arc<ReadWriteLock<sp_state_machine::InMemoryBackend<HashingFor<Block>>>>,
	pub(crate) removed_keys: Arc<ReadWriteLock<HashMap<Vec<u8>, ()>>>,
	before_fork: bool,
}

impl<Block: BlockT> ForkedLazyBackend<Block> {
	fn update_storage(&self, key: &[u8], value: &Option<Vec<u8>>) {
		if let Some(ref val) = value {
			let mut entries: HashMap<Option<ChildInfo>, StorageCollection> = Default::default();
			entries.insert(None, vec![(key.to_vec(), Some(val.clone()))]);

			self.db.write().insert(entries, StateVersion::V0);
		}
	}
}

impl<Block: BlockT + DeserializeOwned> sp_state_machine::Backend<HashingFor<Block>>
	for ForkedLazyBackend<Block>
{
	type Error = <DbState<Block> as sp_state_machine::Backend<HashingFor<Block>>>::Error;
	type TrieBackendStorage = PrefixedMemoryDB<HashingFor<Block>>;
	type RawIter = RawIter<Block>;

	fn storage(&self, key: &[u8]) -> Result<Option<sp_state_machine::StorageValue>, Self::Error> {
		let remote_fetch = |block: Option<Block::Hash>| {
			let result = self.rpc_client.storage(StorageKey(key.to_vec()), block);

			match result {
				Ok(data) => data.map(|v| v.0),
				Err(err) => {
					log::debug!(
						target: super::LAZY_LOADING_LOG_TARGET,
						"Failed to fetch storage from live network: {:?}",
						err
					);
					None
				}
			}
		};

		if self.before_fork {
			return Ok(remote_fetch(self.block_hash));
		}

		let readable_db = self.db.read();
		let maybe_storage = readable_db.storage(key);
		let value = match maybe_storage {
			Ok(Some(data)) => Some(data),
			_ if !self.removed_keys.read().contains_key(key) => {
				let result = remote_fetch(Some(self.fork_block));

				// Cache the fetched state.
				drop(readable_db);
				self.update_storage(key, &result);

				result
			}
			_ => None,
		};

		Ok(value)
	}

	fn storage_hash(
		&self,
		key: &[u8],
	) -> Result<Option<<HashingFor<Block> as sp_core::Hasher>::Out>, Self::Error> {
		let remote_fetch = |block: Option<Block::Hash>| {
			let result = self
				.rpc_client
				.storage_hash(StorageKey(key.to_vec()), block);

				Ok(hash) => Ok(hash),
				Err(err) => Err(format!("Failed to fetch storage hash from RPC: {:?}", err).into()),
			}
		};
		if self.before_fork {
			return remote_fetch(self.block_hash);
		}
		let storage_hash = self.db.read().storage_hash(key);
		match storage_hash {
			Ok(Some(hash)) => Ok(Some(hash)),
			_ if !self.removed_keys.read().contains_key(key) => remote_fetch(Some(self.fork_block)),
			_ => Ok(None),
		}
	}
	fn closest_merkle_value(
		&self,
		_key: &[u8],
	) -> Result<
		Option<sp_trie::MerkleValue<<HashingFor<Block> as sp_core::Hasher>::Out>>,
		Self::Error,
	> {
		unimplemented!("closest_merkle_value: unsupported feature for lazy loading")
	}
	fn child_closest_merkle_value(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<
		Option<sp_trie::MerkleValue<<HashingFor<Block> as sp_core::Hasher>::Out>>,
		Self::Error,
	> {
		unimplemented!("child_closest_merkle_value: unsupported feature for lazy loading")
	}
	fn child_storage(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<Option<sp_state_machine::StorageValue>, Self::Error> {
		unimplemented!("child_storage: unsupported feature for lazy loading");
	}
	fn child_storage_hash(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<Option<<HashingFor<Block> as sp_core::Hasher>::Out>, Self::Error> {
		unimplemented!("child_storage_hash: unsupported feature for lazy loading");
	}
	fn next_storage_key(
		&self,
		key: &[u8],
	) -> Result<Option<sp_state_machine::StorageKey>, Self::Error> {
		let remote_fetch = |block: Option<Block::Hash>| {
			let start_key = Some(StorageKey(key.to_vec()));
			let result = self
				.rpc_client
				.storage_keys_paged(start_key.clone(), 2, None, block);
			match result {
				Ok(keys) => keys.last().cloned(),
				Err(err) => {
					log::trace!(
						target: super::LAZY_LOADING_LOG_TARGET,
						"Failed to fetch `next storage key` from RPC: {:?}",
						err
					);
					None
				}
			}
		};
		let maybe_next_key = if self.before_fork {
			remote_fetch(self.block_hash)
		} else {
			let next_storage_key = self.db.read().next_storage_key(key);
			match next_storage_key {
				Ok(Some(key)) => Some(key),
				_ if !self.removed_keys.read().contains_key(key) => {
					remote_fetch(Some(self.fork_block))
				}
				_ => None,
			}
		};
		log::trace!(
			target: super::LAZY_LOADING_LOG_TARGET,
			"next_storage_key: (key: {:?}, next_key: {:?})",
			hex::encode(key),
			maybe_next_key.clone().map(|key| hex::encode(key))
		);
		Ok(maybe_next_key)
	}
	fn next_child_storage_key(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<Option<sp_state_machine::StorageKey>, Self::Error> {
		unimplemented!("next_child_storage_key: unsupported feature for lazy loading");
	}
	fn storage_root<'a>(
		&self,
		delta: impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)>,
		state_version: StateVersion,
	) -> (
		<HashingFor<Block> as sp_core::Hasher>::Out,
		BackendTransaction<HashingFor<Block>>,
	)
	where
		<HashingFor<Block> as sp_core::Hasher>::Out: Ord,
	{
		self.db.read().storage_root(delta, state_version)
	}
	fn child_storage_root<'a>(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_delta: impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)>,
		_state_version: StateVersion,
	) -> (
		<HashingFor<Block> as sp_core::Hasher>::Out,
		bool,
		BackendTransaction<HashingFor<Block>>,
	)
	where
		<HashingFor<Block> as sp_core::Hasher>::Out: Ord,
	{
		unimplemented!("child_storage_root: unsupported in lazy loading")
	}
	fn raw_iter(&self, args: sp_state_machine::IterArgs) -> Result<Self::RawIter, Self::Error> {
		let mut clone: RawIterArgs = Default::default();
		clone.start_at_exclusive = args.start_at_exclusive.clone();
		clone.prefix = args.prefix.map(|v| v.to_vec());
		clone.start_at = args.start_at.map(|v| v.to_vec());
		Ok(RawIter::<Block> {
			args: clone,
			complete: false,
			_phantom: Default::default(),
		})
	}
	fn register_overlay_stats(&self, stats: &sp_state_machine::StateMachineStats) {
		self.db.read().register_overlay_stats(stats)
	}
	fn usage_info(&self) -> sp_state_machine::UsageInfo {
		self.db.read().usage_info()
	}
}
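
// Resolution order used by the reads above (a sketch of the behaviour, not
// extra logic): before the fork point every lookup is forwarded to the
// remote node at `block_hash`; after the fork the local overlay wins, and
// the remote state at `fork_block` is only consulted when the key is not
// cached and was not removed locally.
//
//   let value = backend.storage(b"some_hypothetical_key")?; // cached on first remote hit
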
impl<B: BlockT> sp_state_machine::backend::AsTrieBackend<HashingFor<B>> for ForkedLazyBackend<B> {
	type TrieBackendStorage = PrefixedMemoryDB<HashingFor<B>>;

	fn as_trie_backend(
		&self,
	) -> &sp_state_machine::TrieBackend<Self::TrieBackendStorage, HashingFor<B>> {
		unimplemented!("`as_trie_backend` is not supported in lazy loading mode.")
	}
}

/// Lazy loading (in-memory) backend. Keeps all states and blocks in memory.
pub struct Backend<Block: BlockT> {
	pub(crate) rpc_client: Arc<RPC>,
	states: ReadWriteLock<HashMap<Block::Hash, ForkedLazyBackend<Block>>>,
	pub(crate) blockchain: Blockchain<Block>,
	import_lock: parking_lot::RwLock<()>,
	pinned_blocks: ReadWriteLock<HashMap<Block::Hash, i64>>,
	pub(crate) fork_checkpoint: Block::Header,
}

impl<Block: BlockT + DeserializeOwned> Backend<Block> {
	fn new(rpc_client: Arc<RPC>, fork_checkpoint: Block::Header) -> Self {
		Backend {
			rpc_client: rpc_client.clone(),
			states: Default::default(),
			blockchain: Blockchain::new(rpc_client),
			import_lock: Default::default(),
			pinned_blocks: Default::default(),
			fork_checkpoint,
		}
	}
}

impl<Block: BlockT + DeserializeOwned> backend::AuxStore for Backend<Block> {
	fn insert_aux<
		'a,
		'b: 'a,
		'c: 'a,
		I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>,
		D: IntoIterator<Item = &'a &'b [u8]>,
	>(
		&self,
		_insert: I,
		_delete: D,
	) -> sp_blockchain::Result<()> {
		unimplemented!("`insert_aux` is not supported in lazy loading mode.")
	}

	fn get_aux(&self, _key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> {
		unimplemented!("`get_aux` is not supported in lazy loading mode.")
	}
}

impl<Block: BlockT + DeserializeOwned> backend::Backend<Block> for Backend<Block> {
	type BlockImportOperation = BlockImportOperation<Block>;
	type Blockchain = Blockchain<Block>;
	type State = ForkedLazyBackend<Block>;
	type OffchainStorage = InMemOffchainStorage;

	fn begin_operation(&self) -> sp_blockchain::Result<Self::BlockImportOperation> {
		let old_state = self.state_at(Default::default())?;
		Ok(BlockImportOperation {
			pending_block: None,
			old_state,
			new_state: None,
			aux: Default::default(),
			storage_updates: Default::default(),
			finalized_blocks: Default::default(),
			set_head: None,
			before_fork: false,
		})
	}

	fn begin_state_operation(
		&self,
		operation: &mut Self::BlockImportOperation,
		block: Block::Hash,
	) -> sp_blockchain::Result<()> {
		operation.old_state = self.state_at(block)?;
		Ok(())
	}

	fn commit_operation(&self, operation: Self::BlockImportOperation) -> sp_blockchain::Result<()> {
		if !operation.finalized_blocks.is_empty() {
			for (block, justification) in operation.finalized_blocks {
				self.blockchain.finalize_header(block, justification)?;
			}
		}

		if let Some(pending_block) = operation.pending_block {
			let old_state = &operation.old_state;
			let (header, body, justification) = pending_block.block.into_inner();
			let hash = header.hash();

			let new_removed_keys = old_state.removed_keys.clone();
			for (key, value) in operation.storage_updates.clone() {
				if value.is_some() {
					new_removed_keys.write().remove(&key);
				} else {
					new_removed_keys.write().insert(key, ());
				}
			}

			let new_db = old_state.db.clone();
			new_db.write().insert(
				vec![(None::<ChildInfo>, operation.storage_updates)],
				StateVersion::V0,
			);
			let new_state = ForkedLazyBackend {
				rpc_client: self.rpc_client.clone(),
				block_hash: Some(hash),
				fork_block: self.fork_checkpoint.hash(),
				db: new_db,
				removed_keys: new_removed_keys,
				before_fork: operation.before_fork,
			};
			self.states.write().insert(hash, new_state);

			self.blockchain
				.insert(hash, header, justification, body, pending_block.state)?;
		}

		if !operation.aux.is_empty() {
			self.blockchain.write_aux(operation.aux);
		}

		if let Some(set_head) = operation.set_head {
			self.blockchain.set_head(set_head)?;
		}

		Ok(())
	}

	fn finalize_block(
		&self,
		hash: Block::Hash,
		justification: Option<Justification>,
	) -> sp_blockchain::Result<()> {
		self.blockchain.finalize_header(hash, justification)
	}

	fn append_justification(
		&self,
		hash: Block::Hash,
		justification: Justification,
	) -> sp_blockchain::Result<()> {
		self.blockchain.append_justification(hash, justification)
	}

	fn blockchain(&self) -> &Self::Blockchain {
		&self.blockchain
	}

	fn usage_info(&self) -> Option<UsageInfo> {
		None
	}

	fn offchain_storage(&self) -> Option<Self::OffchainStorage> {
		None
	}

	fn state_at(&self, hash: Block::Hash) -> sp_blockchain::Result<Self::State> {
		if hash == Default::default() {
			return Ok(ForkedLazyBackend::<Block> {
				rpc_client: self.rpc_client.clone(),
				block_hash: Some(hash),
				fork_block: self.fork_checkpoint.hash(),
				db: Default::default(),
				removed_keys: Default::default(),
				before_fork: true,
			});
		}

		let (backend, should_write) = self
			.states
			.read()
			.get(&hash)
			.cloned()
			.map(|state| (state, false))
			.unwrap_or_else(|| {
				let header: Block::Header = self
					.rpc_client
					.header::<Block>(Some(hash))
					.ok()
					.flatten()
					.expect("block header");

				let checkpoint = self.fork_checkpoint.clone();
				let state = if header.number().gt(checkpoint.number()) {
					let parent = self.state_at(*header.parent_hash()).ok();

					ForkedLazyBackend::<Block> {
						rpc_client: self.rpc_client.clone(),
						block_hash: Some(hash),
						fork_block: checkpoint.hash(),
						db: parent.clone().map_or(Default::default(), |p| p.db),
						removed_keys: parent.map_or(Default::default(), |p| p.removed_keys),
						before_fork: false,
					}
				} else {
					ForkedLazyBackend::<Block> {
						rpc_client: self.rpc_client.clone(),
						block_hash: Some(hash),
						fork_block: checkpoint.hash(),
						db: Default::default(),
						removed_keys: Default::default(),
						before_fork: true,
					}
				};

				(state, true)
			});

		if should_write {
			self.states.write().insert(hash, backend.clone());
		}

		Ok(backend)
	}

	fn revert(
		&self,
		_n: NumberFor<Block>,
		_revert_finalized: bool,
	) -> sp_blockchain::Result<(NumberFor<Block>, HashSet<Block::Hash>)> {
		Ok((Zero::zero(), HashSet::new()))
	}

	fn remove_leaf_block(&self, _hash: Block::Hash) -> sp_blockchain::Result<()> {
		Ok(())
	}

	fn get_import_lock(&self) -> &parking_lot::RwLock<()> {
		&self.import_lock
	}

	fn requires_full_sync(&self) -> bool {
		false
	}

	fn pin_block(&self, hash: <Block as BlockT>::Hash) -> blockchain::Result<()> {
		let mut blocks = self.pinned_blocks.write();
		*blocks.entry(hash).or_default() += 1;
		Ok(())
	}

	fn unpin_block(&self, hash: <Block as BlockT>::Hash) {
		let mut blocks = self.pinned_blocks.write();
		blocks
			.entry(hash)
			.and_modify(|counter| *counter -= 1)
			.or_insert(-1);
	}
}

impl<Block: BlockT + DeserializeOwned> backend::LocalBackend<Block> for Backend<Block> {}

/// Check that genesis storage is valid.
pub fn check_genesis_storage(storage: &Storage) -> sp_blockchain::Result<()> {
	if storage
		.top
		.iter()
		.any(|(k, _)| well_known_keys::is_child_storage_key(k))
	{
		return Err(sp_blockchain::Error::InvalidState);
	}

	if storage
		.children_default
		.keys()
		.any(|child_key| !well_known_keys::is_child_storage_key(child_key))
	{
		return Err(sp_blockchain::Error::InvalidState);
	}

	Ok(())
}
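
// Sketch: `check_genesis_storage` rejects misplaced entries, e.g. a child
// storage key stored in the top trie (the entry below is hypothetical).
//
//   let mut storage = Storage::default();
//   storage
//       .top
//       .insert(well_known_keys::CHILD_STORAGE_KEY_PREFIX.to_vec(), vec![]);
//   assert!(check_genesis_storage(&storage).is_err());
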
#[derive(Debug, Clone)]
pub struct RPC {
	http_client: HttpClient,
	delay_between_requests_ms: u64,
	max_retries_per_request: usize,
	counter: Arc<ReadWriteLock<u64>>,
}

impl RPC {
	pub fn new(
		http_client: HttpClient,
		delay_between_requests_ms: u64,
		max_retries_per_request: usize,
	) -> Self {
		Self {
			http_client,
			delay_between_requests_ms,
			max_retries_per_request,
			counter: Default::default(),
		}
	}

	pub fn system_chain(&self) -> Result<String, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::SystemApi::<H256, BlockNumber>::system_chain(&self.http_client)
		};

		self.block_on(request)
	}

	pub fn system_properties(
		&self,
	) -> Result<sc_chain_spec::Properties, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::SystemApi::<H256, BlockNumber>::system_properties(
				&self.http_client,
			)
		};

		self.block_on(request)
	}

	pub fn system_name(&self) -> Result<String, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::SystemApi::<H256, BlockNumber>::system_name(&self.http_client)
		};

		self.block_on(request)
	}

	pub fn block<Block, Hash: Clone>(
		&self,
		hash: Option<Hash>,
	) -> Result<Option<SignedBlock<Block>>, jsonrpsee::core::ClientError>
	where
		Block: BlockT + DeserializeOwned,
		Hash: 'static + Send + Sync + sp_runtime::Serialize + DeserializeOwned,
	{
		let request = &|| {
			substrate_rpc_client::ChainApi::<
				BlockNumber,
				Hash,
				Block::Header,
				SignedBlock<Block>,
			>::block(&self.http_client, hash.clone())
		};

		self.block_on(request)
	}

	pub fn block_hash<Block: BlockT + DeserializeOwned>(
		&self,
		block_number: Option<BlockNumber>,
	) -> Result<Option<Block::Hash>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::ChainApi::<
				BlockNumber,
				Block::Hash,
				Block::Header,
				SignedBlock<Block>,
			>::block_hash(
				&self.http_client,
				block_number.map(|n| ListOrValue::Value(NumberOrHex::Number(n.into()))),
			)
		};

		self.block_on(request).map(|ok| match ok {
			ListOrValue::List(v) => v.get(0).map_or(None, |some| *some),
			ListOrValue::Value(v) => v,
		})
	}

	pub fn header<Block: BlockT + DeserializeOwned>(
		&self,
		hash: Option<Block::Hash>,
	) -> Result<Option<Block::Header>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::ChainApi::<
				BlockNumber,
				Block::Hash,
				Block::Header,
				SignedBlock<Block>,
			>::header(&self.http_client, hash)
		};

		self.block_on(request)
	}

	pub fn storage_hash<
		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize,
	>(
		&self,
		key: StorageKey,
		at: Option<Hash>,
	) -> Result<Option<Hash>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::StateApi::<Hash>::storage_hash(
				&self.http_client,
				key.clone(),
				at.clone(),
			)
		};

		self.block_on(request)
	}

	pub fn storage<
		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize + core::fmt::Debug,
	>(
		&self,
		key: StorageKey,
		at: Option<Hash>,
	) -> Result<Option<StorageData>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::StateApi::<Hash>::storage(
				&self.http_client,
				key.clone(),
				at.clone(),
			)
		};

		self.block_on(request)
	}

	pub fn storage_keys_paged<
		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize,
	>(
		&self,
		key: Option<StorageKey>,
		count: u32,
		start_key: Option<StorageKey>,
		at: Option<Hash>,
	) -> Result<Vec<sp_state_machine::StorageKey>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::StateApi::<Hash>::storage_keys_paged(
				&self.http_client,
				key.clone(),
				count,
				start_key.clone(),
				at.clone(),
			)
		};

		let result = self.block_on(request);
		match result {
			Ok(result) => Ok(result.iter().map(|item| item.0.clone()).collect()),
			Err(err) => Err(err),
		}
	}

	pub fn query_storage_at<
		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize,
	>(
		&self,
		keys: Vec<StorageKey>,
		from_block: Option<Hash>,
	) -> Result<Vec<(StorageKey, Option<StorageData>)>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::StateApi::<Hash>::query_storage_at(
				&self.http_client,
				keys.clone(),
				from_block.clone(),
			)
		};

		let result = self.block_on(request);
		match result {
			Ok(result) => Ok(result
				.iter()
				.flat_map(|item| item.changes.clone())
				.collect()),
			Err(err) => Err(err),
		}
	}

	fn block_on<F, T, E>(&self, f: &dyn Fn() -> F) -> Result<T, E>
	where
		F: Future<Output = Result<T, E>>,
	{
		use tokio::runtime::Handle;

		tokio::task::block_in_place(move || {
			Handle::current().block_on(async move {
				let delay_between_requests = Duration::from_millis(self.delay_between_requests_ms);

				let start = std::time::Instant::now();
				self.counter.write().add_assign(1);
				log::debug!(
					target: super::LAZY_LOADING_LOG_TARGET,
					"Sending request: {}",
					self.counter.read()
				);

				// Explicit request delay, to avoid getting 429 errors.
				tokio::time::sleep(delay_between_requests).await;

				// Retry the request in case of failure. The maximum number of
				// retries is specified by `self.max_retries_per_request`.
				let retry_strategy =
					FixedInterval::new(delay_between_requests).take(self.max_retries_per_request);
				let result = Retry::spawn(retry_strategy, f).await;

				log::debug!(
					target: super::LAZY_LOADING_LOG_TARGET,
					"Completed request (id: {}, successful: {}, elapsed_time: {:?})",
					self.counter.read(),
					result.is_ok(),
					start.elapsed()
				);

				result
			})
		})
	}
}
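
// Usage sketch (the endpoint is hypothetical): every call goes through
// `block_on`, which spaces requests by `delay_between_requests_ms` and
// retries up to `max_retries_per_request` times on failure.
//
//   let client = HttpClientBuilder::default().build("https://rpc.example.net")?;
//   let rpc = RPC::new(client, 100, 10);
//   let chain_name = rpc.system_chain()?;
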
/// Create an instance of a lazy loading memory backend.
pub fn new_lazy_loading_backend<Block>(
	config: &mut Configuration,
	lazy_loading_config: &LazyLoadingConfig,
) -> Result<Arc<Backend<Block>>, Error>
where
	Block: BlockT + DeserializeOwned,
	Block::Hash: From<H256>,
{
	let uri: String = lazy_loading_config.state_rpc.clone().into();
	let http_client = jsonrpsee::http_client::HttpClientBuilder::default()
		.max_request_size(u32::MAX)
		.max_response_size(u32::MAX)
		.request_timeout(Duration::from_secs(10))
		.build(uri)
		.map_err(|e| {
			sp_blockchain::Error::Backend(format!("failed to build http client: {:?}", e))
		})?;

	let rpc = RPC::new(http_client, 100, 10);
	let block_hash = lazy_loading_config
		.from_block
		.map(|block| Into::<Block::Hash>::into(block));
	let checkpoint: Block = rpc
		.block::<Block, _>(block_hash)
		.ok()
		.flatten()
		.expect("Fetching fork checkpoint")
		.block;

	let backend = Arc::new(Backend::new(Arc::new(rpc), checkpoint.header().clone()));

	let chain_name = backend
		.rpc_client
		.system_chain()
		.expect("Should fetch chain id");
	let chain_properties = backend
		.rpc_client
		.system_properties()
		.expect("Should fetch chain properties");

	let spec_builder = chain_spec::test_spec::lazy_loading_spec_builder(Default::default())
		.with_name(chain_name.as_str())
		.with_properties(chain_properties);
	config.chain_spec = Box::new(spec_builder.build());

	let base_overrides =
		state_overrides::base_state_overrides(lazy_loading_config.runtime_override.clone());
	let custom_overrides = if let Some(path) = lazy_loading_config.state_overrides_path.clone() {
		state_overrides::read(path)?
	} else {
		Default::default()
	};
	let state_overrides: Vec<(Vec<u8>, Vec<u8>)> = [base_overrides, custom_overrides]
		.concat()
		.iter()
		.map(|entry| match entry {
			StateEntry::Concrete(v) => {
				let key = [
					&twox_128(v.pallet.as_bytes()),
					&twox_128(v.storage.as_bytes()),
					v.key.clone().unwrap_or(Vec::new()).as_slice(),
				]
				.concat();

				(key, v.value.clone())
			}
			StateEntry::Raw(raw) => (raw.key.clone(), raw.value.clone()),
		})
		.collect();

	let _ = helpers::produce_genesis_block(backend.clone());

	// Produce the first block after the fork.
	let _ = helpers::produce_first_block(backend.clone(), checkpoint, state_overrides)?;

	Ok(backend)
}
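
// Wiring sketch (assumes an existing `Configuration` plus a CLI-provided
// `LazyLoadingConfig`): the returned backend is handed to the service
// builder in place of a regular database backend.
//
//   let backend = new_lazy_loading_backend::<Block>(&mut config, &lazy_loading_config)?;
//   log::info!("Forked at block {:?}", backend.fork_checkpoint.hash());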