1
// Copyright 2024 Moonbeam foundation
2
// This file is part of Moonbeam.
3

            
4
// Moonbeam is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Moonbeam is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use sp_blockchain::{CachedHeaderMetadata, HeaderMetadata};
18
use sp_core::storage::well_known_keys;
19
use sp_runtime::{
20
	generic::BlockId,
21
	traits::{Block as BlockT, HashingFor, Header as HeaderT, NumberFor, Zero},
22
	Justification, Justifications, StateVersion, Storage,
23
};
24
use sp_state_machine::{
25
	BackendTransaction, ChildStorageCollection, IndexOperation, StorageCollection, TrieBackend,
26
};
27
use std::future::Future;
28
use std::marker::PhantomData;
29
use std::ops::AddAssign;
30
use std::time::Duration;
31
use std::{
32
	collections::{HashMap, HashSet},
33
	ptr,
34
	sync::Arc,
35
};
36

            
37
use sc_client_api::{
38
	backend::{self, NewBlockState},
39
	blockchain::{self, BlockStatus, HeaderBackend},
40
	leaves::LeafSet,
41
	UsageInfo,
42
};
43

            
44
use jsonrpsee::http_client::HttpClient;
45
use sp_runtime::generic::SignedBlock;
46

            
47
use crate::chain_spec;
48
use crate::lazy_loading::lock::ReadWriteLock;
49
use crate::lazy_loading::state_overrides::StateEntry;
50
use crate::lazy_loading::{helpers, state_overrides};
51
use moonbeam_cli_opt::LazyLoadingConfig;
52
use moonbeam_core_primitives::BlockNumber;
53
use sc_client_api::StorageKey;
54
use sc_service::{Configuration, Error};
55
use serde::de::DeserializeOwned;
56
use sp_core::offchain::storage::InMemOffchainStorage;
57
use sp_core::{twox_128, H256};
58
use sp_rpc::list::ListOrValue;
59
use sp_rpc::number::NumberOrHex;
60
use sp_storage::{ChildInfo, StorageData};
61
use sp_trie::PrefixedMemoryDB;
62
use tokio_retry::strategy::FixedInterval;
63
use tokio_retry::Retry;
64

            
65
struct PendingBlock<B: BlockT> {
66
	block: StoredBlock<B>,
67
	state: NewBlockState,
68
}
69

            
70
#[derive(PartialEq, Eq, Clone)]
71
enum StoredBlock<B: BlockT> {
72
	Header(B::Header, Option<Justifications>),
73
	Full(B, Option<Justifications>),
74
}
75

            
76
impl<B: BlockT> StoredBlock<B> {
77
	fn new(
78
		header: B::Header,
79
		body: Option<Vec<B::Extrinsic>>,
80
		just: Option<Justifications>,
81
	) -> Self {
82
		match body {
83
			Some(body) => StoredBlock::Full(B::new(header, body), just),
84
			None => StoredBlock::Header(header, just),
85
		}
86
	}
87

            
88
	fn header(&self) -> &B::Header {
89
		match *self {
90
			StoredBlock::Header(ref h, _) => h,
91
			StoredBlock::Full(ref b, _) => b.header(),
92
		}
93
	}
94

            
95
	fn justifications(&self) -> Option<&Justifications> {
96
		match *self {
97
			StoredBlock::Header(_, ref j) | StoredBlock::Full(_, ref j) => j.as_ref(),
98
		}
99
	}
100

            
101
	fn extrinsics(&self) -> Option<&[B::Extrinsic]> {
102
		match *self {
103
			StoredBlock::Header(_, _) => None,
104
			StoredBlock::Full(ref b, _) => Some(b.extrinsics()),
105
		}
106
	}
107

            
108
	fn into_inner(self) -> (B::Header, Option<Vec<B::Extrinsic>>, Option<Justifications>) {
109
		match self {
110
			StoredBlock::Header(header, just) => (header, None, just),
111
			StoredBlock::Full(block, just) => {
112
				let (header, body) = block.deconstruct();
113
				(header, Some(body), just)
114
			}
115
		}
116
	}
117
}
118

            
119
#[derive(Clone)]
120
struct BlockchainStorage<Block: BlockT> {
121
	blocks: HashMap<Block::Hash, StoredBlock<Block>>,
122
	hashes: HashMap<NumberFor<Block>, Block::Hash>,
123
	best_hash: Block::Hash,
124
	best_number: NumberFor<Block>,
125
	finalized_hash: Block::Hash,
126
	finalized_number: NumberFor<Block>,
127
	genesis_hash: Block::Hash,
128
	header_cht_roots: HashMap<NumberFor<Block>, Block::Hash>,
129
	leaves: LeafSet<Block::Hash, NumberFor<Block>>,
130
	aux: HashMap<Vec<u8>, Vec<u8>>,
131
}
132

            
133
/// In-memory blockchain. Supports concurrent reads.
134
#[derive(Clone)]
135
pub struct Blockchain<Block: BlockT> {
136
	rpc_client: Arc<RPC>,
137
	storage: Arc<ReadWriteLock<BlockchainStorage<Block>>>,
138
}
139

            
140
impl<Block: BlockT + DeserializeOwned> Blockchain<Block> {
141
	/// Get header hash of given block.
142
	pub fn id(&self, id: BlockId<Block>) -> Option<Block::Hash> {
143
		match id {
144
			BlockId::Hash(h) => Some(h),
145
			BlockId::Number(n) => {
146
				let block_hash = self.storage.read().hashes.get(&n).cloned();
147
				match block_hash {
148
					None => {
149
						let block_hash =
150
							self.rpc_client.block_hash::<Block>(Some(n)).ok().flatten();
151

            
152
						block_hash.clone().map(|h| {
153
							self.storage.write().hashes.insert(n, h);
154
						});
155

            
156
						block_hash
157
					}
158
					block_hash => block_hash,
159
				}
160
			}
161
		}
162
	}
163

            
164
	/// Create new in-memory blockchain storage.
165
	fn new(rpc_client: Arc<RPC>) -> Blockchain<Block> {
166
		let storage = Arc::new(ReadWriteLock::new(BlockchainStorage {
167
			blocks: HashMap::new(),
168
			hashes: HashMap::new(),
169
			best_hash: Default::default(),
170
			best_number: Zero::zero(),
171
			finalized_hash: Default::default(),
172
			finalized_number: Zero::zero(),
173
			genesis_hash: Default::default(),
174
			header_cht_roots: HashMap::new(),
175
			leaves: LeafSet::new(),
176
			aux: HashMap::new(),
177
		}));
178
		Blockchain {
179
			rpc_client,
180
			storage,
181
		}
182
	}
183

            
184
	/// Insert a block header and associated data.
185
	pub fn insert(
186
		&self,
187
		hash: Block::Hash,
188
		header: <Block as BlockT>::Header,
189
		justifications: Option<Justifications>,
190
		body: Option<Vec<<Block as BlockT>::Extrinsic>>,
191
		new_state: NewBlockState,
192
	) -> sp_blockchain::Result<()> {
193
		let number = *header.number();
194
		if new_state.is_best() {
195
			self.apply_head(&header)?;
196
		}
197

            
198
		{
199
			let mut storage = self.storage.write();
200
			storage.leaves.import(hash, number, *header.parent_hash());
201
			storage
202
				.blocks
203
				.insert(hash, StoredBlock::new(header, body, justifications));
204

            
205
			if let NewBlockState::Final = new_state {
206
				storage.finalized_hash = hash;
207
				storage.finalized_number = number;
208
			}
209

            
210
			if number == Zero::zero() {
211
				storage.genesis_hash = hash;
212
			}
213
		}
214

            
215
		Ok(())
216
	}
217

            
218
	/// Get total number of blocks.
219
	pub fn blocks_count(&self) -> usize {
220
		let count = self.storage.read().blocks.len();
221

            
222
		log::debug!(
223
			target: super::LAZY_LOADING_LOG_TARGET,
224
			"Total number of blocks: {:?}",
225
			count
226
		);
227

            
228
		count
229
	}
230

            
231
	/// Compare this blockchain with another in-mem blockchain
232
	pub fn equals_to(&self, other: &Self) -> bool {
233
		// Check ptr equality first to avoid double read locks.
234
		if ptr::eq(self, other) {
235
			return true;
236
		}
237
		self.canon_equals_to(other) && self.storage.read().blocks == other.storage.read().blocks
238
	}
239

            
240
	/// Compare canonical chain to other canonical chain.
241
	pub fn canon_equals_to(&self, other: &Self) -> bool {
242
		// Check ptr equality first to avoid double read locks.
243
		if ptr::eq(self, other) {
244
			return true;
245
		}
246
		let this = self.storage.read();
247
		let other = other.storage.read();
248
		this.hashes == other.hashes
249
			&& this.best_hash == other.best_hash
250
			&& this.best_number == other.best_number
251
			&& this.genesis_hash == other.genesis_hash
252
	}
253

            
254
	/// Insert header CHT root.
255
	pub fn insert_cht_root(&self, block: NumberFor<Block>, cht_root: Block::Hash) {
256
		self.storage
257
			.write()
258
			.header_cht_roots
259
			.insert(block, cht_root);
260
	}
261

            
262
	/// Set an existing block as head.
263
	pub fn set_head(&self, hash: Block::Hash) -> sp_blockchain::Result<()> {
264
		let header = self
265
			.header(hash)?
266
			.ok_or_else(|| sp_blockchain::Error::UnknownBlock(format!("{}", hash)))?;
267

            
268
		self.apply_head(&header)
269
	}
270

            
271
	fn apply_head(&self, header: &<Block as BlockT>::Header) -> sp_blockchain::Result<()> {
272
		let mut storage = self.storage.write();
273

            
274
		let hash = header.hash();
275
		let number = header.number();
276

            
277
		storage.best_hash = hash;
278
		storage.best_number = *number;
279
		storage.hashes.insert(*number, hash);
280

            
281
		Ok(())
282
	}
283

            
284
	fn finalize_header(
285
		&self,
286
		block: Block::Hash,
287
		justification: Option<Justification>,
288
	) -> sp_blockchain::Result<()> {
289
		let mut storage = self.storage.write();
290
		storage.finalized_hash = block;
291

            
292
		if justification.is_some() {
293
			let block = storage
294
				.blocks
295
				.get_mut(&block)
296
				.expect("hash was fetched from a block in the db; qed");
297

            
298
			let block_justifications = match block {
299
				StoredBlock::Header(_, ref mut j) | StoredBlock::Full(_, ref mut j) => j,
300
			};
301

            
302
			*block_justifications = justification.map(Justifications::from);
303
		}
304

            
305
		Ok(())
306
	}
307

            
308
	fn append_justification(
309
		&self,
310
		hash: Block::Hash,
311
		justification: Justification,
312
	) -> sp_blockchain::Result<()> {
313
		let mut storage = self.storage.write();
314

            
315
		let block = storage
316
			.blocks
317
			.get_mut(&hash)
318
			.expect("hash was fetched from a block in the db; qed");
319

            
320
		let block_justifications = match block {
321
			StoredBlock::Header(_, ref mut j) | StoredBlock::Full(_, ref mut j) => j,
322
		};
323

            
324
		if let Some(stored_justifications) = block_justifications {
325
			if !stored_justifications.append(justification) {
326
				return Err(sp_blockchain::Error::BadJustification(
327
					"Duplicate consensus engine ID".into(),
328
				));
329
			}
330
		} else {
331
			*block_justifications = Some(Justifications::from(justification));
332
		};
333

            
334
		Ok(())
335
	}
336

            
337
	fn write_aux(&self, ops: Vec<(Vec<u8>, Option<Vec<u8>>)>) {
338
		let mut storage = self.storage.write();
339
		for (k, v) in ops {
340
			match v {
341
				Some(v) => storage.aux.insert(k, v),
342
				None => storage.aux.remove(&k),
343
			};
344
		}
345
	}
346
}
347

            
348
impl<Block: BlockT + DeserializeOwned> HeaderBackend<Block> for Blockchain<Block> {
349
	fn header(
350
		&self,
351
		hash: Block::Hash,
352
	) -> sp_blockchain::Result<Option<<Block as BlockT>::Header>> {
353
		// First, try to get the header from local storage
354
		if let Some(header) = self
355
			.storage
356
			.read()
357
			.blocks
358
			.get(&hash)
359
			.map(|b| b.header().clone())
360
		{
361
			return Ok(Some(header));
362
		}
363

            
364
		// If not found in local storage, fetch from RPC client
365
		let header = self
366
			.rpc_client
367
			.block::<Block, _>(Some(hash))
368
			.ok()
369
			.flatten()
370
			.map(|full_block| {
371
				// Cache block header
372
				let block = full_block.block.clone();
373
				self.storage.write().blocks.insert(
374
					hash,
375
					StoredBlock::Full(block.clone(), full_block.justifications),
376
				);
377

            
378
				block.header().clone()
379
			});
380

            
381
		if header.is_none() {
382
			log::warn!(
383
				target: super::LAZY_LOADING_LOG_TARGET,
384
				"Expected block {:x?} to exist.",
385
				&hash
386
			);
387
		}
388

            
389
		Ok(header)
390
	}
391

            
392
	fn info(&self) -> blockchain::Info<Block> {
393
		let storage = self.storage.read();
394
		blockchain::Info {
395
			best_hash: storage.best_hash,
396
			best_number: storage.best_number,
397
			genesis_hash: storage.genesis_hash,
398
			finalized_hash: storage.finalized_hash,
399
			finalized_number: storage.finalized_number,
400
			finalized_state: Some((storage.finalized_hash, storage.finalized_number)),
401
			number_leaves: storage.leaves.count(),
402
			block_gap: None,
403
		}
404
	}
405

            
406
	fn status(&self, hash: Block::Hash) -> sp_blockchain::Result<BlockStatus> {
407
		match self.storage.read().blocks.contains_key(&hash) {
408
			true => Ok(BlockStatus::InChain),
409
			false => Ok(BlockStatus::Unknown),
410
		}
411
	}
412

            
413
	fn number(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<NumberFor<Block>>> {
414
		let number = match self.storage.read().blocks.get(&hash) {
415
			Some(block) => *block.header().number(),
416
			_ => match self.rpc_client.block::<Block, _>(Some(hash)) {
417
				Ok(Some(block)) => *block.block.header().number(),
418
				err => {
419
					return Err(sp_blockchain::Error::UnknownBlock(
420
						format!("Failed to fetch block number from RPC: {:?}", err).into(),
421
					));
422
				}
423
			},
424
		};
425

            
426
		Ok(Some(number))
427
	}
428

            
429
	fn hash(
430
		&self,
431
		number: <<Block as BlockT>::Header as HeaderT>::Number,
432
	) -> sp_blockchain::Result<Option<Block::Hash>> {
433
		Ok(self.id(BlockId::Number(number)))
434
	}
435
}
436

            
437
impl<Block: BlockT + DeserializeOwned> HeaderMetadata<Block> for Blockchain<Block> {
438
	type Error = sp_blockchain::Error;
439

            
440
	fn header_metadata(
441
		&self,
442
		hash: Block::Hash,
443
	) -> Result<CachedHeaderMetadata<Block>, Self::Error> {
444
		self.header(hash)?
445
			.map(|header| CachedHeaderMetadata::from(&header))
446
			.ok_or_else(|| {
447
				sp_blockchain::Error::UnknownBlock(format!("header not found: {}", hash))
448
			})
449
	}
450

            
451
	fn insert_header_metadata(&self, _hash: Block::Hash, _metadata: CachedHeaderMetadata<Block>) {
452
		// No need to implement.
453
		unimplemented!("insert_header_metadata")
454
	}
455
	fn remove_header_metadata(&self, _hash: Block::Hash) {
456
		// No need to implement.
457
		unimplemented!("remove_header_metadata")
458
	}
459
}
460

            
461
impl<Block: BlockT + DeserializeOwned> blockchain::Backend<Block> for Blockchain<Block> {
462
	fn body(
463
		&self,
464
		hash: Block::Hash,
465
	) -> sp_blockchain::Result<Option<Vec<<Block as BlockT>::Extrinsic>>> {
466
		// First, try to get the header from local storage
467
		if let Some(extrinsics) = self
468
			.storage
469
			.read()
470
			.blocks
471
			.get(&hash)
472
			.and_then(|b| b.extrinsics().map(|x| x.to_vec()))
473
		{
474
			return Ok(Some(extrinsics));
475
		}
476
		let extrinsics = self
477
			.rpc_client
478
			.block::<Block, Block::Hash>(Some(hash))
479
			.ok()
480
			.flatten()
481
			.map(|b| b.block.extrinsics().to_vec());
482

            
483
		Ok(extrinsics)
484
	}
485

            
486
	fn justifications(&self, hash: Block::Hash) -> sp_blockchain::Result<Option<Justifications>> {
487
		Ok(self
488
			.storage
489
			.read()
490
			.blocks
491
			.get(&hash)
492
			.and_then(|b| b.justifications().cloned()))
493
	}
494

            
495
	fn last_finalized(&self) -> sp_blockchain::Result<Block::Hash> {
496
		let last_finalized = self.storage.read().finalized_hash;
497

            
498
		Ok(last_finalized)
499
	}
500

            
501
	fn leaves(&self) -> sp_blockchain::Result<Vec<Block::Hash>> {
502
		Ok(self.storage.read().leaves.hashes())
503
	}
504

            
505
	fn children(&self, _parent_hash: Block::Hash) -> sp_blockchain::Result<Vec<Block::Hash>> {
506
		unimplemented!("Not supported by the `lazy-loading` backend.")
507
	}
508

            
509
	fn indexed_transaction(&self, _hash: Block::Hash) -> sp_blockchain::Result<Option<Vec<u8>>> {
510
		unimplemented!("Not supported by the `lazy-loading` backend.")
511
	}
512

            
513
	fn block_indexed_body(
514
		&self,
515
		_hash: Block::Hash,
516
	) -> sp_blockchain::Result<Option<Vec<Vec<u8>>>> {
517
		unimplemented!("Not supported by the `lazy-loading` backend.")
518
	}
519
}
520

            
521
impl<Block: BlockT + DeserializeOwned> backend::AuxStore for Blockchain<Block> {
522
	fn insert_aux<
523
		'a,
524
		'b: 'a,
525
		'c: 'a,
526
		I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>,
527
		D: IntoIterator<Item = &'a &'b [u8]>,
528
	>(
529
		&self,
530
		insert: I,
531
		delete: D,
532
	) -> sp_blockchain::Result<()> {
533
		let mut storage = self.storage.write();
534
		for (k, v) in insert {
535
			storage.aux.insert(k.to_vec(), v.to_vec());
536
		}
537
		for k in delete {
538
			storage.aux.remove(*k);
539
		}
540
		Ok(())
541
	}
542

            
543
	fn get_aux(&self, key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> {
544
		Ok(self.storage.read().aux.get(key).cloned())
545
	}
546
}
547

            
548
pub struct BlockImportOperation<Block: BlockT> {
549
	pending_block: Option<PendingBlock<Block>>,
550
	old_state: ForkedLazyBackend<Block>,
551
	new_state: Option<BackendTransaction<HashingFor<Block>>>,
552
	aux: Vec<(Vec<u8>, Option<Vec<u8>>)>,
553
	storage_updates: StorageCollection,
554
	finalized_blocks: Vec<(Block::Hash, Option<Justification>)>,
555
	set_head: Option<Block::Hash>,
556
	pub(crate) before_fork: bool,
557
}
558

            
559
impl<Block: BlockT + DeserializeOwned> BlockImportOperation<Block> {
560
	fn apply_storage(
561
		&mut self,
562
		storage: Storage,
563
		commit: bool,
564
		state_version: StateVersion,
565
	) -> sp_blockchain::Result<Block::Hash> {
566
		use sp_state_machine::Backend;
567
		check_genesis_storage(&storage)?;
568

            
569
		let child_delta = storage.children_default.values().map(|child_content| {
570
			(
571
				&child_content.child_info,
572
				child_content
573
					.data
574
					.iter()
575
					.map(|(k, v)| (k.as_ref(), Some(v.as_ref()))),
576
			)
577
		});
578

            
579
		let (root, transaction) = self.old_state.full_storage_root(
580
			storage
581
				.top
582
				.iter()
583
				.map(|(k, v)| (k.as_ref(), Some(v.as_ref()))),
584
			child_delta,
585
			state_version,
586
		);
587

            
588
		if commit {
589
			self.new_state = Some(transaction);
590
			self.storage_updates = storage
591
				.top
592
				.iter()
593
				.map(|(k, v)| {
594
					if v.is_empty() {
595
						(k.clone(), None)
596
					} else {
597
						(k.clone(), Some(v.clone()))
598
					}
599
				})
600
				.collect();
601
		}
602
		Ok(root)
603
	}
604
}
605

            
606
impl<Block: BlockT + DeserializeOwned> backend::BlockImportOperation<Block>
607
	for BlockImportOperation<Block>
608
{
609
	type State = ForkedLazyBackend<Block>;
610

            
611
	fn state(&self) -> sp_blockchain::Result<Option<&Self::State>> {
612
		Ok(Some(&self.old_state))
613
	}
614

            
615
	fn set_block_data(
616
		&mut self,
617
		header: <Block as BlockT>::Header,
618
		body: Option<Vec<<Block as BlockT>::Extrinsic>>,
619
		_indexed_body: Option<Vec<Vec<u8>>>,
620
		justifications: Option<Justifications>,
621
		state: NewBlockState,
622
	) -> sp_blockchain::Result<()> {
623
		assert!(
624
			self.pending_block.is_none(),
625
			"Only one block per operation is allowed"
626
		);
627
		self.pending_block = Some(PendingBlock {
628
			block: StoredBlock::new(header, body, justifications),
629
			state,
630
		});
631
		Ok(())
632
	}
633

            
634
	fn update_db_storage(
635
		&mut self,
636
		update: BackendTransaction<HashingFor<Block>>,
637
	) -> sp_blockchain::Result<()> {
638
		self.new_state = Some(update);
639
		Ok(())
640
	}
641

            
642
	fn set_genesis_state(
643
		&mut self,
644
		storage: Storage,
645
		commit: bool,
646
		state_version: StateVersion,
647
	) -> sp_blockchain::Result<Block::Hash> {
648
		self.apply_storage(storage, commit, state_version)
649
	}
650

            
651
	fn reset_storage(
652
		&mut self,
653
		storage: Storage,
654
		state_version: StateVersion,
655
	) -> sp_blockchain::Result<Block::Hash> {
656
		self.apply_storage(storage, true, state_version)
657
	}
658

            
659
	fn insert_aux<I>(&mut self, ops: I) -> sp_blockchain::Result<()>
660
	where
661
		I: IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
662
	{
663
		self.aux.append(&mut ops.into_iter().collect());
664
		Ok(())
665
	}
666

            
667
	fn update_storage(
668
		&mut self,
669
		update: StorageCollection,
670
		_child_update: ChildStorageCollection,
671
	) -> sp_blockchain::Result<()> {
672
		self.storage_updates = update.clone();
673
		Ok(())
674
	}
675

            
676
	fn mark_finalized(
677
		&mut self,
678
		hash: Block::Hash,
679
		justification: Option<Justification>,
680
	) -> sp_blockchain::Result<()> {
681
		self.finalized_blocks.push((hash, justification));
682
		Ok(())
683
	}
684

            
685
	fn mark_head(&mut self, hash: Block::Hash) -> sp_blockchain::Result<()> {
686
		assert!(
687
			self.pending_block.is_none(),
688
			"Only one set block per operation is allowed"
689
		);
690
		self.set_head = Some(hash);
691
		Ok(())
692
	}
693

            
694
	fn update_transaction_index(
695
		&mut self,
696
		_index: Vec<IndexOperation>,
697
	) -> sp_blockchain::Result<()> {
698
		Ok(())
699
	}
700
}
701

            
702
/// DB-backed patricia trie state, transaction type is an overlay of changes to commit.
703
pub type DbState<B> = TrieBackend<Arc<dyn sp_state_machine::Storage<HashingFor<B>>>, HashingFor<B>>;
704

            
705
/// A struct containing arguments for iterating over the storage.
706
#[derive(Default)]
707
pub struct RawIterArgs {
708
	/// The prefix of the keys over which to iterate.
709
	pub prefix: Option<Vec<u8>>,
710

            
711
	/// The prefix from which to start the iteration from.
712
	///
713
	/// This is inclusive and the iteration will include the key which is specified here.
714
	pub start_at: Option<Vec<u8>>,
715

            
716
	/// If this is `true` then the iteration will *not* include
717
	/// the key specified in `start_at`, if there is such a key.
718
	pub start_at_exclusive: bool,
719
}
720

            
721
/// A raw iterator over the `BenchmarkingState`.
722
pub struct RawIter<Block: BlockT> {
723
	pub(crate) args: RawIterArgs,
724
	complete: bool,
725
	_phantom: PhantomData<Block>,
726
}
727

            
728
impl<Block: BlockT + DeserializeOwned> sp_state_machine::StorageIterator<HashingFor<Block>>
729
	for RawIter<Block>
730
{
731
	type Backend = ForkedLazyBackend<Block>;
732
	type Error = String;
733

            
734
	fn next_key(
735
		&mut self,
736
		backend: &Self::Backend,
737
	) -> Option<Result<sp_state_machine::StorageKey, Self::Error>> {
738
		use sp_state_machine::Backend;
739

            
740
		let remote_fetch =
741
			|key: Option<StorageKey>, start_key: Option<StorageKey>, block: Option<Block::Hash>| {
742
				let result = backend
743
					.rpc_client
744
					.storage_keys_paged(key, 5, start_key, block);
745

            
746
				match result {
747
					Ok(keys) => keys.first().map(|key| key.clone()),
748
					Err(err) => {
749
						log::trace!(
750
							target: super::LAZY_LOADING_LOG_TARGET,
751
							"Failed to fetch `next key` from RPC: {:?}",
752
							err
753
						);
754

            
755
						None
756
					}
757
				}
758
			};
759

            
760
		let prefix = self.args.prefix.clone().map(|k| StorageKey(k));
761
		let start_key = self.args.start_at.clone().map(|k| StorageKey(k));
762

            
763
		let maybe_next_key = if backend.before_fork {
764
			remote_fetch(prefix, start_key, backend.block_hash)
765
		} else {
766
			let mut iter_args = sp_state_machine::backend::IterArgs::default();
767
			iter_args.prefix = self.args.prefix.as_ref().map(|b| b.as_slice());
768
			iter_args.start_at = self.args.start_at.as_ref().map(|b| b.as_slice());
769
			iter_args.start_at_exclusive = true;
770
			iter_args.stop_on_incomplete_database = true;
771

            
772
			let readable_db = backend.db.read();
773
			let next_storage_key = readable_db
774
				.raw_iter(iter_args)
775
				.map(|mut iter| iter.next_key(&readable_db))
776
				.map(|op| op.map(|result| result.ok()).flatten())
777
				.ok()
778
				.flatten();
779

            
780
			// IMPORTANT: free storage read lock
781
			drop(readable_db);
782

            
783
			let removed_key = start_key
784
				.clone()
785
				.or(prefix.clone())
786
				.map(|key| backend.removed_keys.read().contains_key(&key.0))
787
				.unwrap_or(false);
788
			if next_storage_key.is_none() && !removed_key {
789
				let maybe_next_key = remote_fetch(prefix, start_key, Some(backend.fork_block));
790
				match maybe_next_key {
791
					Some(key) if !backend.removed_keys.read().contains_key(&key) => Some(key),
792
					_ => None,
793
				}
794
			} else {
795
				next_storage_key
796
			}
797
		};
798

            
799
		log::trace!(
800
			target: super::LAZY_LOADING_LOG_TARGET,
801
			"next_key: (prefix: {:?}, start_at: {:?}, next_key: {:?})",
802
			self.args.prefix.clone().map(|key| hex::encode(key)),
803
			self.args.start_at.clone().map(|key| hex::encode(key)),
804
			maybe_next_key.clone().map(|key| hex::encode(key))
805
		);
806

            
807
		if let Some(next_key) = maybe_next_key {
808
			if self
809
				.args
810
				.prefix
811
				.clone()
812
				.map(|filter_key| next_key.starts_with(&filter_key))
813
				.unwrap_or(false)
814
			{
815
				self.args.start_at = Some(next_key.clone());
816
				Some(Ok(next_key))
817
			} else {
818
				self.complete = true;
819
				None
820
			}
821
		} else {
822
			self.complete = true;
823
			None
824
		}
825
	}
826

            
827
	fn next_pair(
828
		&mut self,
829
		backend: &Self::Backend,
830
	) -> Option<Result<(sp_state_machine::StorageKey, sp_state_machine::StorageValue), Self::Error>>
831
	{
832
		use sp_state_machine::Backend;
833

            
834
		let remote_fetch =
835
			|key: Option<StorageKey>, start_key: Option<StorageKey>, block: Option<Block::Hash>| {
836
				let result = backend
837
					.rpc_client
838
					.storage_keys_paged(key, 5, start_key, block);
839

            
840
				match result {
841
					Ok(keys) => keys.first().map(|key| key.clone()),
842
					Err(err) => {
843
						log::trace!(
844
							target: super::LAZY_LOADING_LOG_TARGET,
845
							"Failed to fetch `next key` from RPC: {:?}",
846
							err
847
						);
848

            
849
						None
850
					}
851
				}
852
			};
853

            
854
		let prefix = self.args.prefix.clone().map(|k| StorageKey(k));
855
		let start_key = self.args.start_at.clone().map(|k| StorageKey(k));
856

            
857
		let maybe_next_key = if backend.before_fork {
858
			remote_fetch(prefix, start_key, backend.block_hash)
859
		} else {
860
			let mut iter_args = sp_state_machine::backend::IterArgs::default();
861
			iter_args.prefix = self.args.prefix.as_ref().map(|b| b.as_slice());
862
			iter_args.start_at = self.args.start_at.as_ref().map(|b| b.as_slice());
863
			iter_args.start_at_exclusive = true;
864
			iter_args.stop_on_incomplete_database = true;
865

            
866
			let readable_db = backend.db.read();
867
			let next_storage_key = readable_db
868
				.raw_iter(iter_args)
869
				.map(|mut iter| iter.next_key(&readable_db))
870
				.map(|op| op.map(|result| result.ok()).flatten())
871
				.ok()
872
				.flatten();
873

            
874
			// IMPORTANT: free storage read lock
875
			drop(readable_db);
876

            
877
			let removed_key = start_key
878
				.clone()
879
				.or(prefix.clone())
880
				.map(|key| backend.removed_keys.read().contains_key(&key.0))
881
				.unwrap_or(false);
882
			if next_storage_key.is_none() && !removed_key {
883
				let maybe_next_key = remote_fetch(prefix, start_key, Some(backend.fork_block));
884
				match maybe_next_key {
885
					Some(key) if !backend.removed_keys.read().contains_key(&key) => Some(key),
886
					_ => None,
887
				}
888
			} else {
889
				next_storage_key
890
			}
891
		};
892

            
893
		log::trace!(
894
			target: super::LAZY_LOADING_LOG_TARGET,
895
			"next_pair: (prefix: {:?}, start_at: {:?}, next_key: {:?})",
896
			self.args.prefix.clone().map(|key| hex::encode(key)),
897
			self.args.start_at.clone().map(|key| hex::encode(key)),
898
			maybe_next_key.clone().map(|key| hex::encode(key))
899
		);
900

            
901
		let maybe_value = maybe_next_key
902
			.clone()
903
			.map(|key| (*backend).storage(key.as_slice()).ok())
904
			.flatten()
905
			.flatten();
906

            
907
		if let Some(next_key) = maybe_next_key {
908
			if self
909
				.args
910
				.prefix
911
				.clone()
912
				.map(|filter_key| next_key.starts_with(&filter_key))
913
				.unwrap_or(false)
914
			{
915
				self.args.start_at = Some(next_key.clone());
916

            
917
				match maybe_value {
918
					Some(value) => Some(Ok((next_key, value))),
919
					_ => None,
920
				}
921
			} else {
922
				self.complete = true;
923
				None
924
			}
925
		} else {
926
			self.complete = true;
927
			None
928
		}
929
	}
930

            
931
	fn was_complete(&self) -> bool {
932
		self.complete
933
	}
934
}
935

            
936
#[derive(Debug, Clone)]
937
pub struct ForkedLazyBackend<Block: BlockT> {
938
	rpc_client: Arc<RPC>,
939
	block_hash: Option<Block::Hash>,
940
	fork_block: Block::Hash,
941
	pub(crate) db: Arc<ReadWriteLock<sp_state_machine::InMemoryBackend<HashingFor<Block>>>>,
942
	pub(crate) removed_keys: Arc<ReadWriteLock<HashMap<Vec<u8>, ()>>>,
943
	before_fork: bool,
944
}
945

            
946
impl<Block: BlockT> ForkedLazyBackend<Block> {
947
	fn update_storage(&self, key: &[u8], value: &Option<Vec<u8>>) {
948
		if let Some(ref val) = value {
949
			let mut entries: HashMap<Option<ChildInfo>, StorageCollection> = Default::default();
950
			entries.insert(None, vec![(key.to_vec(), Some(val.clone()))]);
951

            
952
			self.db.write().insert(entries, StateVersion::V0);
953
		}
954
	}
955
}
956

            
957
impl<Block: BlockT + DeserializeOwned> sp_state_machine::Backend<HashingFor<Block>>
958
	for ForkedLazyBackend<Block>
959
{
960
	type Error = <DbState<Block> as sp_state_machine::Backend<HashingFor<Block>>>::Error;
961
	type TrieBackendStorage = PrefixedMemoryDB<HashingFor<Block>>;
962
	type RawIter = RawIter<Block>;
963

            
964
	fn storage(&self, key: &[u8]) -> Result<Option<sp_state_machine::StorageValue>, Self::Error> {
965
		let remote_fetch = |block: Option<Block::Hash>| {
966
			let result = self.rpc_client.storage(StorageKey(key.to_vec()), block);
967

            
968
			match result {
969
				Ok(data) => data.map(|v| v.0),
970
				Err(err) => {
971
					log::debug!(
972
						target: super::LAZY_LOADING_LOG_TARGET,
973
						"Failed to fetch storage from live network: {:?}",
974
						err
975
					);
976
					None
977
				}
978
			}
979
		};
980

            
981
		if self.before_fork {
982
			return Ok(remote_fetch(self.block_hash));
983
		}
984

            
985
		let readable_db = self.db.read();
986
		let maybe_storage = readable_db.storage(key);
987
		let value = match maybe_storage {
988
			Ok(Some(data)) => Some(data),
989
			_ if !self.removed_keys.read().contains_key(key) => {
990
				let result = remote_fetch(Some(self.fork_block));
991

            
992
				// Cache state
993
				drop(readable_db);
994
				self.update_storage(key, &result);
995

            
996
				result
997
			}
998
			_ => None,
999
		};
		Ok(value)
	}
	fn storage_hash(
		&self,
		key: &[u8],
	) -> Result<Option<<HashingFor<Block> as sp_core::Hasher>::Out>, Self::Error> {
		let remote_fetch = |block: Option<Block::Hash>| {
			let result = self
				.rpc_client
				.storage_hash(StorageKey(key.to_vec()), block);
			match result {
				Ok(hash) => Ok(hash),
				Err(err) => Err(format!("Failed to fetch storage hash from RPC: {:?}", err).into()),
			}
		};
		if self.before_fork {
			return remote_fetch(self.block_hash);
		}
		let storage_hash = self.db.read().storage_hash(key);
		match storage_hash {
			Ok(Some(hash)) => Ok(Some(hash)),
			_ if !self.removed_keys.read().contains_key(key) => remote_fetch(Some(self.fork_block)),
			_ => Ok(None),
		}
	}
	fn closest_merkle_value(
		&self,
		_key: &[u8],
	) -> Result<
		Option<sp_trie::MerkleValue<<HashingFor<Block> as sp_core::Hasher>::Out>>,
		Self::Error,
	> {
		unimplemented!("closest_merkle_value: unsupported feature for lazy loading")
	}
	fn child_closest_merkle_value(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<
		Option<sp_trie::MerkleValue<<HashingFor<Block> as sp_core::Hasher>::Out>>,
		Self::Error,
	> {
		unimplemented!("child_closest_merkle_value: unsupported feature for lazy loading")
	}
	fn child_storage(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<Option<sp_state_machine::StorageValue>, Self::Error> {
		unimplemented!("child_storage: unsupported feature for lazy loading");
	}
	fn child_storage_hash(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<Option<<HashingFor<Block> as sp_core::Hasher>::Out>, Self::Error> {
		unimplemented!("child_storage_hash: unsupported feature for lazy loading");
	}
	fn next_storage_key(
		&self,
		key: &[u8],
	) -> Result<Option<sp_state_machine::StorageKey>, Self::Error> {
		let remote_fetch = |block: Option<Block::Hash>| {
			let start_key = Some(StorageKey(key.to_vec()));
			let result = self
				.rpc_client
				.storage_keys_paged(start_key.clone(), 2, None, block);
			match result {
				Ok(keys) => keys.last().cloned(),
				Err(err) => {
					log::trace!(
						target: super::LAZY_LOADING_LOG_TARGET,
						"Failed to fetch `next storage key` from RPC: {:?}",
						err
					);
					None
				}
			}
		};
		let maybe_next_key = if self.before_fork {
			remote_fetch(self.block_hash)
		} else {
			let next_storage_key = self.db.read().next_storage_key(key);
			match next_storage_key {
				Ok(Some(key)) => Some(key),
				_ if !self.removed_keys.read().contains_key(key) => {
					remote_fetch(Some(self.fork_block))
				}
				_ => None,
			}
		};
		log::trace!(
			target: super::LAZY_LOADING_LOG_TARGET,
			"next_storage_key: (key: {:?}, next_key: {:?})",
			hex::encode(key),
			maybe_next_key.clone().map(|key| hex::encode(key))
		);
		Ok(maybe_next_key)
	}
	fn next_child_storage_key(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_key: &[u8],
	) -> Result<Option<sp_state_machine::StorageKey>, Self::Error> {
		unimplemented!("next_child_storage_key: unsupported feature for lazy loading");
	}
	fn storage_root<'a>(
		&self,
		delta: impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)>,
		state_version: StateVersion,
	) -> (
		<HashingFor<Block> as sp_core::Hasher>::Out,
		BackendTransaction<HashingFor<Block>>,
	)
	where
		<HashingFor<Block> as sp_core::Hasher>::Out: Ord,
	{
		self.db.read().storage_root(delta, state_version)
	}
	fn child_storage_root<'a>(
		&self,
		_child_info: &sp_storage::ChildInfo,
		_delta: impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)>,
		_state_version: StateVersion,
	) -> (
		<HashingFor<Block> as sp_core::Hasher>::Out,
		bool,
		BackendTransaction<HashingFor<Block>>,
	)
	where
		<HashingFor<Block> as sp_core::Hasher>::Out: Ord,
	{
		unimplemented!("child_storage_root: unsupported in lazy loading")
	}
	fn raw_iter(&self, args: sp_state_machine::IterArgs) -> Result<Self::RawIter, Self::Error> {
		let mut clone: RawIterArgs = Default::default();
		clone.start_at_exclusive = args.start_at_exclusive.clone();
		clone.prefix = args.prefix.map(|v| v.to_vec());
		clone.start_at = args.start_at.map(|v| v.to_vec());
		Ok(RawIter::<Block> {
			args: clone,
			complete: false,
			_phantom: Default::default(),
		})
	}
	fn register_overlay_stats(&self, stats: &sp_state_machine::StateMachineStats) {
		self.db.read().register_overlay_stats(stats)
	}
	fn usage_info(&self) -> sp_state_machine::UsageInfo {
		self.db.read().usage_info()
	}
}
impl<B: BlockT> sp_state_machine::backend::AsTrieBackend<HashingFor<B>> for ForkedLazyBackend<B> {
	type TrieBackendStorage = PrefixedMemoryDB<HashingFor<B>>;
	fn as_trie_backend(
		&self,
	) -> &sp_state_machine::TrieBackend<Self::TrieBackendStorage, HashingFor<B>> {
		unimplemented!("`as_trie_backend` is not supported in lazy loading mode.")
	}
}
/// Lazy loading (In-memory) backend. Keeps all states and blocks in memory.
pub struct Backend<Block: BlockT> {
	pub(crate) rpc_client: Arc<RPC>,
	states: ReadWriteLock<HashMap<Block::Hash, ForkedLazyBackend<Block>>>,
	pub(crate) blockchain: Blockchain<Block>,
	import_lock: parking_lot::RwLock<()>,
	pinned_blocks: ReadWriteLock<HashMap<Block::Hash, i64>>,
	pub(crate) fork_checkpoint: Block::Header,
}
impl<Block: BlockT + DeserializeOwned> Backend<Block> {
	fn new(rpc_client: Arc<RPC>, fork_checkpoint: Block::Header) -> Self {
		Backend {
			rpc_client: rpc_client.clone(),
			states: Default::default(),
			blockchain: Blockchain::new(rpc_client),
			import_lock: Default::default(),
			pinned_blocks: Default::default(),
			fork_checkpoint,
		}
	}
}
impl<Block: BlockT + DeserializeOwned> backend::AuxStore for Backend<Block> {
	fn insert_aux<
		'a,
		'b: 'a,
		'c: 'a,
		I: IntoIterator<Item = &'a (&'c [u8], &'c [u8])>,
		D: IntoIterator<Item = &'a &'b [u8]>,
	>(
		&self,
		_insert: I,
		_delete: D,
	) -> sp_blockchain::Result<()> {
		unimplemented!("`insert_aux` is not supported in lazy loading mode.")
	}
	fn get_aux(&self, _key: &[u8]) -> sp_blockchain::Result<Option<Vec<u8>>> {
		unimplemented!("`get_aux` is not supported in lazy loading mode.")
	}
}
impl<Block: BlockT + DeserializeOwned> backend::Backend<Block> for Backend<Block> {
	type BlockImportOperation = BlockImportOperation<Block>;
	type Blockchain = Blockchain<Block>;
	type State = ForkedLazyBackend<Block>;
	type OffchainStorage = InMemOffchainStorage;
	fn begin_operation(&self) -> sp_blockchain::Result<Self::BlockImportOperation> {
		let old_state = self.state_at(Default::default())?;
		Ok(BlockImportOperation {
			pending_block: None,
			old_state,
			new_state: None,
			aux: Default::default(),
			storage_updates: Default::default(),
			finalized_blocks: Default::default(),
			set_head: None,
			before_fork: false,
		})
	}
	fn begin_state_operation(
		&self,
		operation: &mut Self::BlockImportOperation,
		block: Block::Hash,
	) -> sp_blockchain::Result<()> {
		operation.old_state = self.state_at(block)?;
		Ok(())
	}
	fn commit_operation(&self, operation: Self::BlockImportOperation) -> sp_blockchain::Result<()> {
		if !operation.finalized_blocks.is_empty() {
			for (block, justification) in operation.finalized_blocks {
				self.blockchain.finalize_header(block, justification)?;
			}
		}
		if let Some(pending_block) = operation.pending_block {
			let old_state = &operation.old_state;
			let (header, body, justification) = pending_block.block.into_inner();
			let hash = header.hash();
			let new_removed_keys = old_state.removed_keys.clone();
			for (key, value) in operation.storage_updates.clone() {
				if value.is_some() {
					new_removed_keys.write().remove(&key.clone());
				} else {
					new_removed_keys.write().insert(key.clone(), ());
				}
			}
			let new_db = old_state.db.clone();
			new_db.write().insert(
				vec![(None::<ChildInfo>, operation.storage_updates)],
				StateVersion::V0,
			);
			let new_state = ForkedLazyBackend {
				rpc_client: self.rpc_client.clone(),
				block_hash: Some(hash.clone()),
				fork_block: self.fork_checkpoint.hash(),
				db: new_db,
				removed_keys: new_removed_keys,
				before_fork: operation.before_fork,
			};
			self.states.write().insert(hash, new_state);
			self.blockchain
				.insert(hash, header, justification, body, pending_block.state)?;
		}
		if !operation.aux.is_empty() {
			self.blockchain.write_aux(operation.aux);
		}
		if let Some(set_head) = operation.set_head {
			self.blockchain.set_head(set_head)?;
		}
		Ok(())
	}
	fn finalize_block(
		&self,
		hash: Block::Hash,
		justification: Option<Justification>,
	) -> sp_blockchain::Result<()> {
		self.blockchain.finalize_header(hash, justification)
	}
	fn append_justification(
		&self,
		hash: Block::Hash,
		justification: Justification,
	) -> sp_blockchain::Result<()> {
		self.blockchain.append_justification(hash, justification)
	}
	fn blockchain(&self) -> &Self::Blockchain {
		&self.blockchain
	}
	fn usage_info(&self) -> Option<UsageInfo> {
		None
	}
	fn offchain_storage(&self) -> Option<Self::OffchainStorage> {
		None
	}
	fn state_at(&self, hash: Block::Hash) -> sp_blockchain::Result<Self::State> {
		if hash == Default::default() {
			return Ok(ForkedLazyBackend::<Block> {
				rpc_client: self.rpc_client.clone(),
				block_hash: Some(hash),
				fork_block: self.fork_checkpoint.hash(),
				db: Default::default(),
				removed_keys: Default::default(),
				before_fork: true,
			});
		}
		let (backend, should_write) = self
			.states
			.read()
			.get(&hash)
			.cloned()
			.map(|state| (state, false))
			.unwrap_or_else(|| {
				let header: Block::Header = self
					.rpc_client
					.header::<Block>(Some(hash))
					.ok()
					.flatten()
					.expect("block header");
				let checkpoint = self.fork_checkpoint.clone();
				let state = if header.number().gt(checkpoint.number()) {
					let parent = self.state_at(*header.parent_hash()).ok();
					ForkedLazyBackend::<Block> {
						rpc_client: self.rpc_client.clone(),
						block_hash: Some(hash),
						fork_block: checkpoint.hash(),
						db: parent.clone().map_or(Default::default(), |p| p.db),
						removed_keys: parent.map_or(Default::default(), |p| p.removed_keys),
						before_fork: false,
					}
				} else {
					ForkedLazyBackend::<Block> {
						rpc_client: self.rpc_client.clone(),
						block_hash: Some(hash),
						fork_block: checkpoint.hash(),
						db: Default::default(),
						removed_keys: Default::default(),
						before_fork: true,
					}
				};
				(state, true)
			});
		if should_write {
			self.states.write().insert(hash, backend.clone());
		}
		Ok(backend)
	}
	fn revert(
		&self,
		_n: NumberFor<Block>,
		_revert_finalized: bool,
	) -> sp_blockchain::Result<(NumberFor<Block>, HashSet<Block::Hash>)> {
		Ok((Zero::zero(), HashSet::new()))
	}
	fn remove_leaf_block(&self, _hash: Block::Hash) -> sp_blockchain::Result<()> {
		Ok(())
	}
	fn get_import_lock(&self) -> &parking_lot::RwLock<()> {
		&self.import_lock
	}
	fn requires_full_sync(&self) -> bool {
		false
	}
	fn pin_block(&self, hash: <Block as BlockT>::Hash) -> blockchain::Result<()> {
		let mut blocks = self.pinned_blocks.write();
		*blocks.entry(hash).or_default() += 1;
		Ok(())
	}
	fn unpin_block(&self, hash: <Block as BlockT>::Hash) {
		let mut blocks = self.pinned_blocks.write();
		blocks
			.entry(hash)
			.and_modify(|counter| *counter -= 1)
			.or_insert(-1);
	}
}
impl<Block: BlockT + DeserializeOwned> backend::LocalBackend<Block> for Backend<Block> {}
/// Check that genesis storage is valid.
pub fn check_genesis_storage(storage: &Storage) -> sp_blockchain::Result<()> {
	if storage
		.top
		.iter()
		.any(|(k, _)| well_known_keys::is_child_storage_key(k))
	{
		return Err(sp_blockchain::Error::InvalidState);
	}
	if storage
		.children_default
		.keys()
		.any(|child_key| !well_known_keys::is_child_storage_key(child_key))
	{
		return Err(sp_blockchain::Error::InvalidState);
	}
	Ok(())
}
#[derive(Debug, Clone)]
pub struct RPC {
	http_client: HttpClient,
	delay_between_requests_ms: u64,
	max_retries_per_request: usize,
	counter: Arc<ReadWriteLock<u64>>,
}
impl RPC {
	pub fn new(
		http_client: HttpClient,
		delay_between_requests_ms: u64,
		max_retries_per_request: usize,
	) -> Self {
		Self {
			http_client,
			delay_between_requests_ms,
			max_retries_per_request,
			counter: Default::default(),
		}
	}
	pub fn system_chain(&self) -> Result<String, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::SystemApi::<H256, BlockNumber>::system_chain(&self.http_client)
		};
		self.block_on(request)
	}
	pub fn system_properties(
		&self,
	) -> Result<sc_chain_spec::Properties, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::SystemApi::<H256, BlockNumber>::system_properties(
				&self.http_client,
			)
		};
		self.block_on(request)
	}
	pub fn system_name(&self) -> Result<String, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::SystemApi::<H256, BlockNumber>::system_name(&self.http_client)
		};
		self.block_on(request)
	}
	pub fn block<Block, Hash: Clone>(
		&self,
		hash: Option<Hash>,
	) -> Result<Option<SignedBlock<Block>>, jsonrpsee::core::ClientError>
	where
		Block: BlockT + DeserializeOwned,
		Hash: 'static + Send + Sync + sp_runtime::Serialize + DeserializeOwned,
	{
		let request = &|| {
			substrate_rpc_client::ChainApi::<
				BlockNumber,
				Hash,
				Block::Header,
				SignedBlock<Block>,
			>::block(&self.http_client, hash.clone())
		};
		self.block_on(request)
	}
	pub fn block_hash<Block: BlockT + DeserializeOwned>(
		&self,
		block_number: Option<<Block::Header as HeaderT>::Number>,
	) -> Result<Option<Block::Hash>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::ChainApi::<
				<Block::Header as HeaderT>::Number,
				Block::Hash,
				Block::Header,
				SignedBlock<Block>,
			>::block_hash(
				&self.http_client,
				block_number.map(|n| ListOrValue::Value(NumberOrHex::Hex(n.into()))),
			)
		};
		self.block_on(request).map(|ok| match ok {
			ListOrValue::List(v) => v.get(0).map_or(None, |some| *some),
			ListOrValue::Value(v) => v,
		})
	}
	pub fn header<Block: BlockT + DeserializeOwned>(
		&self,
		hash: Option<Block::Hash>,
	) -> Result<Option<Block::Header>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::ChainApi::<
				BlockNumber,
				Block::Hash,
				Block::Header,
				SignedBlock<Block>,
			>::header(&self.http_client, hash)
		};
		self.block_on(request)
	}
	pub fn storage_hash<
		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize,
	>(
		&self,
		key: StorageKey,
		at: Option<Hash>,
	) -> Result<Option<Hash>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::StateApi::<Hash>::storage_hash(
				&self.http_client,
				key.clone(),
				at.clone(),
			)
		};
		self.block_on(request)
	}
	pub fn storage<
		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize + core::fmt::Debug,
	>(
		&self,
		key: StorageKey,
		at: Option<Hash>,
	) -> Result<Option<StorageData>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::StateApi::<Hash>::storage(
				&self.http_client,
				key.clone(),
				at.clone(),
			)
		};
		self.block_on(request)
	}
	pub fn storage_keys_paged<
		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize,
	>(
		&self,
		key: Option<StorageKey>,
		count: u32,
		start_key: Option<StorageKey>,
		at: Option<Hash>,
	) -> Result<Vec<sp_state_machine::StorageKey>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::StateApi::<Hash>::storage_keys_paged(
				&self.http_client,
				key.clone(),
				count.clone(),
				start_key.clone(),
				at.clone(),
			)
		};
		let result = self.block_on(request);
		match result {
			Ok(result) => Ok(result.iter().map(|item| item.0.clone()).collect()),
			Err(err) => Err(err),
		}
	}
	pub fn query_storage_at<
		Hash: 'static + Clone + Sync + Send + DeserializeOwned + sp_runtime::Serialize,
	>(
		&self,
		keys: Vec<StorageKey>,
		from_block: Option<Hash>,
	) -> Result<Vec<(StorageKey, Option<StorageData>)>, jsonrpsee::core::ClientError> {
		let request = &|| {
			substrate_rpc_client::StateApi::<Hash>::query_storage_at(
				&self.http_client,
				keys.clone(),
				from_block.clone(),
			)
		};
		let result = self.block_on(request);
		match result {
			Ok(result) => Ok(result
				.iter()
				.flat_map(|item| item.changes.clone())
				.collect()),
			Err(err) => Err(err),
		}
	}
	fn block_on<F, T, E>(&self, f: &dyn Fn() -> F) -> Result<T, E>
	where
		F: Future<Output = Result<T, E>>,
	{
		use tokio::runtime::Handle;
		tokio::task::block_in_place(move || {
			Handle::current().block_on(async move {
				let delay_between_requests = Duration::from_millis(self.delay_between_requests_ms);
				let start = std::time::Instant::now();
				self.counter.write().add_assign(1);
				log::debug!(
					target: super::LAZY_LOADING_LOG_TARGET,
					"Sending request: {}",
					self.counter.read()
				);
				// Explicit request delay, to avoid getting 429 errors
				let _ = tokio::time::sleep(delay_between_requests).await;
				// Retry request in case of failure
				// The maximum number of retries is specified by `self.max_retries_per_request`
				let retry_strategy =
					FixedInterval::new(delay_between_requests).take(self.max_retries_per_request);
				let result = Retry::spawn(retry_strategy, f).await;
				log::debug!(
					target: super::LAZY_LOADING_LOG_TARGET,
					"Completed request (id: {}, successful: {}, elapsed_time: {:?})",
					self.counter.read(),
					result.is_ok(),
					start.elapsed()
				);
				result
			})
		})
	}
}
/// Create an instance of a lazy loading memory backend.
pub fn new_lazy_loading_backend<Block>(
	config: &mut Configuration,
	lazy_loading_config: &LazyLoadingConfig,
) -> Result<Arc<Backend<Block>>, Error>
where
	Block: BlockT + DeserializeOwned,
	Block::Hash: From<H256>,
{
	let uri: String = lazy_loading_config.state_rpc.clone().into();
	let http_client = jsonrpsee::http_client::HttpClientBuilder::default()
		.max_request_size(u32::MAX)
		.max_response_size(u32::MAX)
		.request_timeout(Duration::from_secs(10))
		.build(uri)
		.map_err(|e| {
			sp_blockchain::Error::Backend(
				format!("failed to build http client: {:?}", e).to_string(),
			)
		})?;
	let rpc = RPC::new(http_client, 100, 10);
	let block_hash = lazy_loading_config
		.from_block
		.map(|block| Into::<Block::Hash>::into(block));
	let checkpoint: Block = rpc
		.block::<Block, _>(block_hash)
		.ok()
		.flatten()
		.expect("Fetching fork checkpoint")
		.block;
	let backend = Arc::new(Backend::new(Arc::new(rpc), checkpoint.header().clone()));
	let chain_name = backend
		.rpc_client
		.system_chain()
		.expect("Should fetch chain id");
	let chain_properties = backend
		.rpc_client
		.system_properties()
		.expect("Should fetch chain properties");
	let spec_builder = chain_spec::test_spec::lazy_loading_spec_builder(Default::default())
		.with_name(chain_name.as_str())
		.with_properties(chain_properties);
	config.chain_spec = Box::new(spec_builder.build());
	let base_overrides =
		state_overrides::base_state_overrides(lazy_loading_config.runtime_override.clone());
	let custom_overrides = if let Some(path) = lazy_loading_config.state_overrides_path.clone() {
		state_overrides::read(path)?
	} else {
		Default::default()
	};
	let state_overrides: Vec<(Vec<u8>, Vec<u8>)> = [base_overrides, custom_overrides]
		.concat()
		.iter()
		.map(|entry| match entry {
			StateEntry::Concrete(v) => {
				let key = [
					&twox_128(v.pallet.as_bytes()),
					&twox_128(v.storage.as_bytes()),
					v.key.clone().unwrap_or(Vec::new()).as_slice(),
				]
				.concat();
				(key, v.value.clone())
			}
			StateEntry::Raw(raw) => (raw.key.clone(), raw.value.clone()),
		})
		.collect();
	let _ = helpers::produce_genesis_block(backend.clone());
	// Produce first block after the fork
	let _ = helpers::produce_first_block(backend.clone(), checkpoint, state_overrides)?;
	Ok(backend)
}